OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <algorithm>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/measure.h"
41 
42 #define EPOCH_FILENAME "epoch"
43 #define DB_META_FILENAME "dbmeta"
44 
45 using namespace std;
46 
47 namespace File_Namespace {
48 
49 bool headerCompare(const HeaderInfo& firstElem, const HeaderInfo& secondElem) {
50  // HeaderInfo.first is a pair of Chunk key with a vector containing
51  // pageId and version
52  if (firstElem.chunkKey != secondElem.chunkKey) {
53  return firstElem.chunkKey < secondElem.chunkKey;
54  }
55  if (firstElem.pageId != secondElem.pageId) {
56  return firstElem.pageId < secondElem.pageId;
57  }
58  return firstElem.versionEpoch < secondElem.versionEpoch;
59 
60  /*
61  if (firstElem.first.first != secondElem.first.first)
62  return firstElem.first.first < secondElem.first.first;
63  return firstElem.first.second < secondElem.first.second;
64  */
65 }
66 
67 FileMgr::FileMgr(const int deviceId,
68  GlobalFileMgr* gfm,
69  const std::pair<const int, const int> fileMgrKey,
70  const size_t num_reader_threads,
71  const int epoch,
72  const size_t defaultPageSize)
73  : AbstractBufferMgr(deviceId)
74  , gfm_(gfm)
75  , fileMgrKey_(fileMgrKey)
76  , defaultPageSize_(defaultPageSize)
77  , nextFileId_(0)
78  , epoch_(epoch) {
79  init(num_reader_threads);
80 }
81 
82 // used only to initialize enough to drop
83 FileMgr::FileMgr(const int deviceId,
84  GlobalFileMgr* gfm,
85  const std::pair<const int, const int> fileMgrKey,
86  const bool initOnly)
87  : AbstractBufferMgr(deviceId)
88  , gfm_(gfm)
89  , fileMgrKey_(fileMgrKey)
90  , defaultPageSize_(0)
91  , nextFileId_(0)
92  , epoch_(0) {
93  const std::string fileMgrDirPrefix("table");
94  const std::string FileMgrDirDelim("_");
95  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
96  std::to_string(fileMgrKey_.first) + // db_id
97  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
98  epochFile_ = nullptr;
99  files_.clear();
100 }
101 
102 FileMgr::FileMgr(GlobalFileMgr* gfm, const size_t defaultPageSize, std::string basePath)
103  : AbstractBufferMgr(0)
104  , gfm_(gfm)
105  , fileMgrKey_(0, 0)
106  , fileMgrBasePath_(basePath)
107  , defaultPageSize_(defaultPageSize)
108  , nextFileId_(0)
109  , epoch_(-1) {
110  init(basePath);
111 }
112 
114  // checkpoint();
115  // free memory used by FileInfo objects
116  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
117  delete chunkIt->second;
118  }
119  for (auto file_info : files_) {
120  delete file_info;
121  }
122 }
123 
124 void FileMgr::init(const size_t num_reader_threads) {
125  // if epoch = -1 this means open from epoch file
126  const std::string fileMgrDirPrefix("table");
127  const std::string FileMgrDirDelim("_");
128  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
129  std::to_string(fileMgrKey_.first) + // db_id
130  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
131  boost::filesystem::path path(fileMgrBasePath_);
132  if (boost::filesystem::exists(path)) {
133  if (!boost::filesystem::is_directory(path)) {
134  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
135  << "' for table data is not a directory.";
136  }
137  if (epoch_ != -1) { // if opening at previous epoch
138  int epochCopy = epoch_;
140  epoch_ = epochCopy;
141  } else {
143  }
144 
145  auto clock_begin = timer_start();
146 
147  boost::filesystem::directory_iterator
148  endItr; // default construction yields past-the-end
149  int maxFileId = -1;
150  int fileCount = 0;
151  int threadCount = std::thread::hardware_concurrency();
152  std::vector<HeaderInfo> headerVec;
153  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
154  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
155  if (boost::filesystem::is_regular_file(fileIt->status())) {
156  // note that boost::filesystem leaves preceding dot on
157  // extension - hence MAPD_FILE_EXT is ".mapd"
158  std::string extension(fileIt->path().extension().string());
159 
160  if (extension == MAPD_FILE_EXT) {
161  std::string fileStem(fileIt->path().stem().string());
162  // remove trailing dot if any
163  if (fileStem.size() > 0 && fileStem.back() == '.') {
164  fileStem = fileStem.substr(0, fileStem.size() - 1);
165  }
166  size_t dotPos = fileStem.find_last_of("."); // should only be one
167  if (dotPos == std::string::npos) {
168  LOG(FATAL) << "File `" << fileIt->path()
169  << "` does not carry page size information in the filename.";
170  }
171  int fileId = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
172  if (fileId > maxFileId) {
173  maxFileId = fileId;
174  }
175  size_t pageSize =
176  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
177  std::string filePath(fileIt->path().string());
178  size_t fileSize = boost::filesystem::file_size(filePath);
179  CHECK_EQ(fileSize % pageSize, size_t(0)); // should be no partial pages
180  size_t numPages = fileSize / pageSize;
181 
182  VLOG(4) << "File id: " << fileId << " Page size: " << pageSize
183  << " Num pages: " << numPages;
184 
185  file_futures.emplace_back(std::async(
186  std::launch::async, [filePath, fileId, pageSize, numPages, this] {
187  std::vector<HeaderInfo> tempHeaderVec;
188  openExistingFile(filePath, fileId, pageSize, numPages, tempHeaderVec);
189  return tempHeaderVec;
190  }));
191  fileCount++;
192  if (fileCount % threadCount == 0) {
193  processFileFutures(file_futures, headerVec);
194  }
195  }
196  }
197  }
198 
199  if (file_futures.size() > 0) {
200  processFileFutures(file_futures, headerVec);
201  }
202  int64_t queue_time_ms = timer_stop(clock_begin);
203 
204  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : "
205  << queue_time_ms << "ms Epoch: " << epoch_ << " files read: " << fileCount
206  << " table location: '" << fileMgrBasePath_ << "'";
207 
208  /* Sort headerVec so that all HeaderInfos
209  * from a chunk will be grouped together
210  * and in order of increasing PageId
211  * - Version Epoch */
212 
213  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
214 
215  /* Goal of next section is to find sequences in the
216  * sorted headerVec of the same ChunkId, which we
217  * can then initiate a FileBuffer with */
218 
219  VLOG(4) << "Number of Headers in Vector: " << headerVec.size();
220  if (headerVec.size() > 0) {
221  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
222  auto startIt = headerVec.begin();
223 
224  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
225  ++headerIt) {
226  // for (auto chunkIt = headerIt->chunkKey.begin(); chunkIt !=
227  // headerIt->chunkKey.end(); ++chunkIt) {
228  // std::cout << *chunkIt << " ";
229  //}
230 
231  if (headerIt->chunkKey != lastChunkKey) {
232  chunkIndex_[lastChunkKey] =
233  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerIt);
234  /*
235  if (startIt->versionEpoch != -1) {
236  cout << "not skipping bc version != -1" << endl;
237  // -1 means that chunk was deleted
238  // lets not read it in
239  chunkIndex_[lastChunkKey] = new FileBuffer
240  (this,/lastChunkKey,startIt,headerIt);
241 
242  }
243  else {
244  cout << "Skipping bc version == -1" << endl;
245  }
246  */
247  lastChunkKey = headerIt->chunkKey;
248  startIt = headerIt;
249  }
250  }
251  // now need to insert last Chunk
252  // size_t pageSize = files_[startIt->page.fileId]->pageSize;
253  // cout << "Inserting last chunk" << endl;
254  // if (startIt->versionEpoch != -1) {
255  chunkIndex_[lastChunkKey] =
256  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerVec.end());
257  //}
258  }
259  nextFileId_ = maxFileId + 1;
260  // std::cout << "next file id: " << nextFileId_ << std::endl;
261  } else {
262  if (!boost::filesystem::create_directory(path)) {
263  LOG(FATAL) << "Could not create data directory: " << path;
264  }
266  }
267 
268  /* define number of reader threads to be used */
269  size_t num_hardware_based_threads =
270  std::thread::hardware_concurrency(); // # of threads is based on # of cores on the
271  // host
272  if (num_reader_threads == 0) { // # of threads has not been defined by user
273  num_reader_threads_ = num_hardware_based_threads;
274  } else {
275  if (num_reader_threads > num_hardware_based_threads) {
276  num_reader_threads_ = num_hardware_based_threads;
277  } else {
278  num_reader_threads_ = num_reader_threads;
279  }
280  }
281 }
282 
284  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
285  std::vector<HeaderInfo>& headerVec) {
286  for (auto& file_future : file_futures) {
287  file_future.wait();
288  }
289  // concatenate the vectors after thread completes
290  for (auto& file_future : file_futures) {
291  auto tempHeaderVec = file_future.get();
292  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
293  }
294  file_futures.clear();
295 }
296 
297 void FileMgr::init(const std::string dataPathToConvertFrom) {
298  int converted_data_epoch = 0;
299  boost::filesystem::path path(dataPathToConvertFrom);
300  if (boost::filesystem::exists(path)) {
301  if (!boost::filesystem::is_directory(path)) {
302  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
303  }
304 
305  if (epoch_ != -1) { // if opening at previous epoch
306  int epochCopy = epoch_;
308  epoch_ = epochCopy;
309  } else {
311  }
312 
313  boost::filesystem::directory_iterator
314  endItr; // default construction yields past-the-end
315  int maxFileId = -1;
316  int fileCount = 0;
317  int threadCount = std::thread::hardware_concurrency();
318  std::vector<HeaderInfo> headerVec;
319  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
320  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
321  if (boost::filesystem::is_regular_file(fileIt->status())) {
322  // note that boost::filesystem leaves preceding dot on
323  // extension - hence MAPD_FILE_EXT is ".mapd"
324  std::string extension(fileIt->path().extension().string());
325 
326  if (extension == MAPD_FILE_EXT) {
327  std::string fileStem(fileIt->path().stem().string());
328  // remove trailing dot if any
329  if (fileStem.size() > 0 && fileStem.back() == '.') {
330  fileStem = fileStem.substr(0, fileStem.size() - 1);
331  }
332  size_t dotPos = fileStem.find_last_of("."); // should only be one
333  if (dotPos == std::string::npos) {
334  LOG(FATAL) << "File `" + fileIt->path().string() +
335  "` does not carry page size information in the filename.";
336  }
337  int fileId = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
338  if (fileId > maxFileId) {
339  maxFileId = fileId;
340  }
341  size_t pageSize =
342  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
343  std::string filePath(fileIt->path().string());
344  size_t fileSize = boost::filesystem::file_size(filePath);
345  CHECK(fileSize % pageSize == 0); // should be no partial pages
346  size_t numPages = fileSize / pageSize;
347 
348  file_futures.emplace_back(std::async(
349  std::launch::async, [filePath, fileId, pageSize, numPages, this] {
350  std::vector<HeaderInfo> tempHeaderVec;
351  openExistingFile(filePath, fileId, pageSize, numPages, tempHeaderVec);
352  return tempHeaderVec;
353  }));
354  fileCount++;
355  if (fileCount % threadCount) {
356  processFileFutures(file_futures, headerVec);
357  }
358  }
359  }
360  }
361 
362  if (file_futures.size() > 0) {
363  processFileFutures(file_futures, headerVec);
364  }
365 
366  /* Sort headerVec so that all HeaderInfos
367  * from a chunk will be grouped together
368  * and in order of increasing PageId
369  * - Version Epoch */
370 
371  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
372 
373  /* Goal of next section is to find sequences in the
374  * sorted headerVec of the same ChunkId, which we
375  * can then initiate a FileBuffer with */
376 
377  if (headerVec.size() > 0) {
378  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
379  auto startIt = headerVec.begin();
380 
381  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
382  ++headerIt) {
383  if (headerIt->chunkKey != lastChunkKey) {
384  FileMgr* c_fm_ = gfm_->getFileMgr(lastChunkKey);
385  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerIt);
386  chunkIndex_[lastChunkKey] = srcBuf;
387  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
388  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
389  destBuf->syncEncoder(srcBuf);
390  destBuf->setSize(srcBuf->size());
391  destBuf->setDirty(); // this needs to be set to force writing out metadata
392  // files from "checkpoint()" call
393 
394  size_t totalNumPages = srcBuf->getMultiPage().size();
395  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
396  Page srcPage = srcBuf->getMultiPage()[pageNum].current();
397  Page destPage = c_fm_->requestFreePage(
398  srcBuf->pageSize(),
399  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
400  MultiPage multiPage(srcBuf->pageSize());
401  multiPage.epochs.push_back(converted_data_epoch);
402  multiPage.pageVersions.push_back(destPage);
403  destBuf->multiPages_.push_back(multiPage);
404  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
405  copyPage(
406  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
407  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
408  }
409  lastChunkKey = headerIt->chunkKey;
410  startIt = headerIt;
411  }
412  }
413 
414  // now need to insert last Chunk
415  FileMgr* c_fm_ = gfm_->getFileMgr(lastChunkKey);
416  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerVec.end());
417  chunkIndex_[lastChunkKey] = srcBuf;
418  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
419  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
420  destBuf->syncEncoder(srcBuf);
421  destBuf->setSize(srcBuf->size());
422  destBuf->setDirty(); // this needs to be set to write out metadata file from the
423  // "checkpoint()" call
424 
425  size_t totalNumPages = srcBuf->getMultiPage().size();
426  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
427  Page srcPage = srcBuf->getMultiPage()[pageNum].current();
428  Page destPage = c_fm_->requestFreePage(
429  srcBuf->pageSize(),
430  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
431  MultiPage multiPage(srcBuf->pageSize());
432  multiPage.epochs.push_back(converted_data_epoch);
433  multiPage.pageVersions.push_back(destPage);
434  destBuf->multiPages_.push_back(multiPage);
435  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
436  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
437  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
438  }
439  }
440  nextFileId_ = maxFileId + 1;
441  } else {
442  if (!boost::filesystem::create_directory(path)) {
443  LOG(FATAL) << "Specified path does not exist: " << path;
444  }
445  }
446 }
447 
449  for (auto file_info : files_) {
450  if (file_info->f) {
451  close(file_info->f);
452  file_info->f = nullptr;
453  }
454  }
455 
456  if (epochFile_) {
457  close(epochFile_);
458  epochFile_ = nullptr;
459  }
460 
461  /* rename for later deletion the directory containing table related data */
463 }
464 
465 void FileMgr::copyPage(Page& srcPage,
466  FileMgr* destFileMgr,
467  Page& destPage,
468  const size_t reservedHeaderSize,
469  const size_t numBytes,
470  const size_t offset) {
471  CHECK(offset + numBytes <= defaultPageSize_);
472  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
473  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
474  int8_t* buffer = new int8_t[numBytes];
475 
476  size_t bytesRead = srcFileInfo->read(
477  srcPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize, numBytes, buffer);
478  CHECK(bytesRead == numBytes);
479  size_t bytesWritten = destFileInfo->write(
480  destPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize,
481  numBytes,
482  buffer);
483  CHECK(bytesWritten == numBytes);
484  delete[] buffer;
485 }
486 
487 void FileMgr::createEpochFile(const std::string& epochFileName) {
488  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
489  if (boost::filesystem::exists(epochFilePath)) {
490  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
491  }
492  epochFile_ = create(epochFilePath, sizeof(int));
493  // Write out current epoch to file - which if this
494  // function is being called should be 0
495  write(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
496  epoch_++;
497 }
498 
499 void FileMgr::openEpochFile(const std::string& epochFileName) {
500  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
501  if (!boost::filesystem::exists(epochFilePath)) {
502  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
503  }
504  if (!boost::filesystem::is_regular_file(epochFilePath)) {
505  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
506  }
507  if (boost::filesystem::file_size(epochFilePath) < 4) {
508  LOG(FATAL) << "Epoch file `" << epochFilePath
509  << "` is not sized properly (current size: "
510  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
511  }
512  epochFile_ = open(epochFilePath);
513  read(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
514  // std::cout << "Epoch after open file: " << epoch_ << std::endl;
515  epoch_++; // we are in new epoch from last checkpoint
516 }
517 
519  write(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
520  int status = fflush(epochFile_);
521  // int status = fcntl(fileno(epochFile_),51);
522  if (status != 0) {
523  LOG(FATAL) << "Could not flush epoch file to disk";
524  }
525 #ifdef __APPLE__
526  status = fcntl(fileno(epochFile_), 51);
527 #else
528  status = fsync(fileno(epochFile_));
529 #endif
530  if (status != 0) {
531  LOG(FATAL) << "Could not sync epoch file to disk";
532  }
533  ++epoch_;
534 }
535 
536 void FileMgr::createDBMetaFile(const std::string& DBMetaFileName) {
537  std::string DBMetaFilePath(fileMgrBasePath_ + "/" + DBMetaFileName);
538  if (boost::filesystem::exists(DBMetaFilePath)) {
539  LOG(FATAL) << "DB metadata file `" << DBMetaFilePath << "` already exists.";
540  }
541  DBMetaFile_ = create(DBMetaFilePath, sizeof(int));
542  int db_ver = getDBVersion();
543  write(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_ver);
544  // LOG(INFO) << "DB metadata file has been created.";
545 }
546 
547 bool FileMgr::openDBMetaFile(const std::string& DBMetaFileName) {
548  std::string DBMetaFilePath(fileMgrBasePath_ + "/" + DBMetaFileName);
549 
550  if (!boost::filesystem::exists(DBMetaFilePath)) {
551  // LOG(INFO) << "DB metadata file does not exist, one will be created.";
552  return false;
553  }
554  if (!boost::filesystem::is_regular_file(DBMetaFilePath)) {
555  // LOG(INFO) << "DB metadata file is not a regular file, one will be created.";
556  return false;
557  }
558  if (boost::filesystem::file_size(DBMetaFilePath) < 4) {
559  // LOG(INFO) << "DB metadata file is not sized properly, one will be created.";
560  return false;
561  }
562  DBMetaFile_ = open(DBMetaFilePath);
563  read(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_version_);
564 
565  return true;
566 }
567 
569  int db_ver = getDBVersion();
570  write(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_ver);
571  int status = fflush(DBMetaFile_);
572  if (status != 0) {
573  LOG(FATAL) << "Could not sync DB metadata file to disk";
574  }
575 }
576 
578  VLOG(2) << "Checkpointing epoch: " << epoch_;
579  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
580  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
581  /*
582  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
583  std::cout << *vecIt << ",";
584  }
585  cout << "Is dirty: " << chunkIt->second->isDirty_ << endl;
586  */
587  if (chunkIt->second->is_dirty_) {
588  chunkIt->second->writeMetadata(epoch_);
589  chunkIt->second->clearDirtyBits();
590  }
591  }
592  chunkIndexWriteLock.unlock();
593 
594  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
595  for (auto fileIt = files_.begin(); fileIt != files_.end(); ++fileIt) {
596  int status = (*fileIt)->syncToDisk();
597  if (status != 0) {
598  LOG(FATAL) << "Could not sync file to disk";
599  }
600  }
601 
603 
604  mapd_unique_lock<mapd_shared_mutex> freePagesWriteLock(mutex_free_page);
605  for (auto& free_page : free_pages) {
606  free_page.first->freePageDeferred(free_page.second);
607  }
608  free_pages.clear();
609 }
610 
612  const size_t pageSize,
613  const size_t numBytes) {
614  size_t actualPageSize = pageSize;
615  if (actualPageSize == 0) {
616  actualPageSize = defaultPageSize_;
617  }
619  // we will do this lazily and not allocate space for the Chunk (i.e.
620  // FileBuffer yet)
621  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
622 
623  if (chunkIndex_.find(key) != chunkIndex_.end()) {
624  LOG(FATAL) << "Chunk already exists for key: " << showChunk(key);
625  }
626  chunkIndex_[key] = new FileBuffer(this, actualPageSize, key, numBytes);
627  chunkIndexWriteLock.unlock();
628  return (chunkIndex_[key]);
629 }
630 
632  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
633  return chunkIndex_.find(key) != chunkIndex_.end();
634 }
635 
636 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
637  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
638  auto chunkIt = chunkIndex_.find(key);
639  // ensure the Chunk exists
640  if (chunkIt == chunkIndex_.end()) {
641  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
642  }
643  chunkIndexWriteLock.unlock();
644  // chunkIt->second->writeMetadata(-1); // writes -1 as epoch - signifies deleted
645  if (purge) {
646  chunkIt->second->freePages();
647  }
648  //@todo need a way to represent delete in non purge case
649  delete chunkIt->second;
650  chunkIndex_.erase(chunkIt);
651 }
652 
653 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
654  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
655  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
656  if (chunkIt == chunkIndex_.end()) {
657  return; // should we throw?
658  }
659  while (chunkIt != chunkIndex_.end() &&
660  std::search(chunkIt->first.begin(),
661  chunkIt->first.begin() + keyPrefix.size(),
662  keyPrefix.begin(),
663  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
664  /*
665  cout << "Freeing pages for chunk ";
666  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
667  std::cout << *vecIt << ",";
668  }
669  cout << endl;
670  */
671  if (purge) {
672  chunkIt->second->freePages();
673  }
674  //@todo need a way to represent delete in non purge case
675  delete chunkIt->second;
676  chunkIndex_.erase(chunkIt++);
677  }
678 }
679 
680 AbstractBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t numBytes) {
681  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
682  auto chunkIt = chunkIndex_.find(key);
683  if (chunkIt == chunkIndex_.end()) {
684  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
685  }
686  return chunkIt->second;
687 }
688 
690  AbstractBuffer* destBuffer,
691  const size_t numBytes) {
692  // reads chunk specified by ChunkKey into AbstractBuffer provided by
693  // destBuffer
694  if (destBuffer->isDirty()) {
695  LOG(FATAL)
696  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
697  << showChunk(key);
698  }
699  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
700  auto chunkIt = chunkIndex_.find(key);
701  if (chunkIt == chunkIndex_.end()) {
702  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
703  }
704  chunkIndexReadLock.unlock();
705 
706  AbstractBuffer* chunk = chunkIt->second;
707  // ChunkSize is either specified in function call with numBytes or we
708  // just look at pageSize * numPages in FileBuffer
709  size_t chunkSize = numBytes == 0 ? chunk->size() : numBytes;
710  if (numBytes > 0 && numBytes > chunk->size()) {
711  LOG(FATAL) << "Chunk retrieved for key `" << showChunk(key) << "` is smaller ("
712  << chunk->size() << ") than number of bytes requested (" << numBytes
713  << ")";
714  }
715  destBuffer->reserve(chunkSize);
716  // std::cout << "After reserve chunksize: " << chunkSize << std::endl;
717  if (chunk->isUpdated()) {
718  chunk->read(destBuffer->getMemoryPtr(),
719  chunkSize,
720  0,
721  destBuffer->getType(),
722  destBuffer->getDeviceId());
723  } else {
724  chunk->read(destBuffer->getMemoryPtr() + destBuffer->size(),
725  chunkSize - destBuffer->size(),
726  destBuffer->size(),
727  destBuffer->getType(),
728  destBuffer->getDeviceId());
729  }
730  destBuffer->setSize(chunkSize);
731  destBuffer->syncEncoder(chunk);
732 }
733 
735  AbstractBuffer* srcBuffer,
736  const size_t numBytes) {
737  // obtain a pointer to the Chunk
738  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
739  auto chunkIt = chunkIndex_.find(key);
740  AbstractBuffer* chunk;
741  if (chunkIt == chunkIndex_.end()) {
742  chunk = createBuffer(key, defaultPageSize_);
743  } else {
744  chunk = chunkIt->second;
745  }
746  chunkIndexWriteLock.unlock();
747  size_t oldChunkSize = chunk->size();
748  // write the buffer's data to the Chunk
749  // size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
750  size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
751  if (chunk->isDirty()) {
752  // multiple appends are allowed,
753  // but only single update is allowed
754  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
755  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
756  "for key: "
757  << showChunk(key);
758  }
759  }
760  if (srcBuffer->isUpdated()) {
761  // chunk size is not changed when fixed rows are updated or are marked as deleted.
762  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
763  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
764  // For varlen update, it takes another route via fragmenter using disk-level buffer.
765  if (0 == numBytes && !chunk->isDirty()) {
766  chunk->setSize(newChunkSize);
767  }
768  //@todo use dirty flags to only flush pages of chunk that need to
769  // be flushed
770  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
771  newChunkSize,
772  0,
773  srcBuffer->getType(),
774  srcBuffer->getDeviceId());
775  } else if (srcBuffer->isAppended()) {
776  CHECK_LT(oldChunkSize, newChunkSize);
777  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
778  newChunkSize - oldChunkSize,
779  srcBuffer->getType(),
780  srcBuffer->getDeviceId());
781  }
782  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
783  //@todo commenting out line above will make sure this metadata is set
784  // but will trigger error on fetch chunk
785  srcBuffer->clearDirtyBits();
786  chunk->syncEncoder(srcBuffer);
787  return chunk;
788 }
789 
790 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
791  LOG(FATAL) << "Operation not supported";
792  return nullptr; // satisfy return-type warning
793 }
794 
796  LOG(FATAL) << "Operation not supported";
797 }
798 
799 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
800  std::lock_guard<std::mutex> lock(getPageMutex_);
801 
802  auto candidateFiles = fileIndex_.equal_range(pageSize);
803  int pageNum = -1;
804  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
805  FileInfo* fileInfo = files_[fileIt->second];
806  pageNum = fileInfo->getFreePage();
807  if (pageNum != -1) {
808  return (Page(fileInfo->fileId, pageNum));
809  }
810  }
811  // if here then we need to add a file
812  FileInfo* fileInfo;
813  if (isMetadata) {
814  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
815  } else {
816  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
817  }
818  pageNum = fileInfo->getFreePage();
819  CHECK(pageNum != -1);
820  return (Page(fileInfo->fileId, pageNum));
821 }
822 
823 void FileMgr::requestFreePages(size_t numPagesRequested,
824  size_t pageSize,
825  std::vector<Page>& pages,
826  const bool isMetadata) {
827  // not used currently
828  // @todo add method to FileInfo to get more than one page
829  std::lock_guard<std::mutex> lock(getPageMutex_);
830  auto candidateFiles = fileIndex_.equal_range(pageSize);
831  size_t numPagesNeeded = numPagesRequested;
832  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
833  FileInfo* fileInfo = files_[fileIt->second];
834  int pageNum;
835  do {
836  pageNum = fileInfo->getFreePage();
837  if (pageNum != -1) {
838  pages.emplace_back(fileInfo->fileId, pageNum);
839  numPagesNeeded--;
840  }
841  } while (pageNum != -1 && numPagesNeeded > 0);
842  if (numPagesNeeded == 0) {
843  break;
844  }
845  }
846  while (numPagesNeeded > 0) {
847  FileInfo* fileInfo;
848  if (isMetadata) {
849  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
850  } else {
851  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
852  }
853  int pageNum;
854  do {
855  pageNum = fileInfo->getFreePage();
856  if (pageNum != -1) {
857  pages.emplace_back(fileInfo->fileId, pageNum);
858  numPagesNeeded--;
859  }
860  } while (pageNum != -1 && numPagesNeeded > 0);
861  if (numPagesNeeded == 0) {
862  break;
863  }
864  }
865  CHECK(pages.size() == numPagesRequested);
866 }
867 
868 FileInfo* FileMgr::openExistingFile(const std::string& path,
869  const int fileId,
870  const size_t pageSize,
871  const size_t numPages,
872  std::vector<HeaderInfo>& headerVec) {
873  FILE* f = open(path);
874  FileInfo* fInfo = new FileInfo(
875  this, fileId, f, pageSize, numPages, false); // false means don't init file
876 
877  fInfo->openExistingFile(headerVec, epoch_);
878  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
879  if (fileId >= static_cast<int>(files_.size())) {
880  files_.resize(fileId + 1);
881  }
882  files_[fileId] = fInfo;
883  fileIndex_.insert(std::pair<size_t, int>(pageSize, fileId));
884  return fInfo;
885 }
886 
887 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
888  // check arguments
889  if (pageSize == 0 || numPages == 0) {
890  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
891  }
892 
893  // create the new file
894  FILE* f = create(fileMgrBasePath_,
895  nextFileId_,
896  pageSize,
897  numPages); // TM: not sure if I like naming scheme here - should be in
898  // separate namespace?
899  CHECK(f);
900 
901  // instantiate a new FileInfo for the newly created file
902  int fileId = nextFileId_++;
903  FileInfo* fInfo =
904  new FileInfo(this, fileId, f, pageSize, numPages, true); // true means init file
905  CHECK(fInfo);
906 
907  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
908  // update file manager data structures
909  files_.push_back(fInfo);
910  fileIndex_.insert(std::pair<size_t, int>(pageSize, fileId));
911 
912  CHECK(files_.back() == fInfo); // postcondition
913  return fInfo;
914 }
915 
916 FILE* FileMgr::getFileForFileId(const int fileId) {
917  CHECK(fileId >= 0 && static_cast<size_t>(fileId) < files_.size());
918  return files_[fileId]->f;
919 }
920 /*
921 void FileMgr::getAllChunkMetaInfo(std::vector<std::pair<ChunkKey, int64_t> > &metadata) {
922  metadata.reserve(chunkIndex_.size());
923  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
924  metadata.push_back(std::make_pair(chunkIt->first,
925 chunkIt->second->encoder->numElems));
926  }
927 }
928 */
930  std::vector<std::pair<ChunkKey, ChunkMetadata>>& chunkMetadataVec) {
931  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
932  chunkMetadataVec.reserve(chunkIndex_.size());
933  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
934  if (chunkIt->second->has_encoder) {
935  ChunkMetadata chunkMetadata;
936  chunkIt->second->encoder->getMetadata(chunkMetadata);
937  chunkMetadataVec.emplace_back(chunkIt->first, chunkMetadata);
938  }
939  }
940 }
941 
943  std::vector<std::pair<ChunkKey, ChunkMetadata>>& chunkMetadataVec,
944  const ChunkKey& keyPrefix) {
945  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(
946  chunkIndexMutex_); // is this guarding the right structure? it look slike we oly
947  // read here for chunk
948  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
949  if (chunkIt == chunkIndex_.end()) {
950  return; // throw?
951  }
952  while (chunkIt != chunkIndex_.end() &&
953  std::search(chunkIt->first.begin(),
954  chunkIt->first.begin() + keyPrefix.size(),
955  keyPrefix.begin(),
956  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
957  /*
958  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
959  std::cout << *vecIt << ",";
960  }
961  cout << endl;
962  */
963  if (chunkIt->second->has_encoder) {
964  ChunkMetadata chunkMetadata;
965  chunkIt->second->encoder->getMetadata(chunkMetadata);
966  chunkMetadataVec.emplace_back(chunkIt->first, chunkMetadata);
967  }
968  chunkIt++;
969  }
970 }
971 
973  return gfm_->getDBVersion();
974 }
975 
976 bool FileMgr::getDBConvert() const {
977  return gfm_->getDBConvert();
978 }
979 
982  if (db_version_ > getDBVersion()) {
983  LOG(FATAL) << "DB forward compatibility is not supported. Version of OmniSci "
984  "software used is older than the version of DB being read: "
985  << db_version_;
986  }
987  } else {
989  }
990 }
991 
992 void FileMgr::setEpoch(int epoch) {
993  epoch_ = epoch;
995 }
996 
997 void FileMgr::free_page(std::pair<FileInfo*, int>&& page) {
998  std::unique_lock<mapd_shared_mutex> lock(mutex_free_page);
999  free_pages.push_back(page);
1000 }
1001 
1002 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:137
#define CHECK_EQ(x, y)
Definition: Logger.h:198
std::vector< int > ChunkKey
Definition: types.h:35
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:790
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:155
std::string getBasePath() const
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:487
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:799
size_t size() const override
Definition: FileBuffer.h:139
void syncEncoder(const AbstractBuffer *src_buffer)
void openEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:499
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:370
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:185
int getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:972
virtual size_t size() const =0
FileMgr * getFileMgr(const int db_id, const int tb_id)
Stores Pair of ChunkKey and Page id and version, in a pair with a Page struct itself (File id and Pag...
Definition: Page.h:121
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:795
virtual MemoryLevel getType() const =0
FILE * getFileForFileId(const int fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:916
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:465
#define MAPD_FILE_EXT
Definition: File.h:26
std::vector< std::pair< FileInfo *, int > > free_pages
Definition: FileMgr.h:257
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:636
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:171
std::string getFileMgrBasePath() const
Definition: FileMgr.h:230
GlobalFileMgr * gfm_
Definition: FileMgr.h:237
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:252
virtual bool isAppended() const
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:55
void createDBMetaFile(const std::string &DBMetaFileName)
Definition: FileMgr.cpp:536
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:577
std::string fileMgrBasePath_
Definition: FileMgr.h:239
bool headerCompare(const HeaderInfo &firstElem, const HeaderInfo &secondElem)
Definition: FileMgr.cpp:49
virtual bool isDirty() const
std::string to_string(char const *&&v)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:35
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:689
AbstractBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:611
virtual void read(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int dst_device_id=-1)=0
FILE * epochFile_
the current epoch (time of last checkpoint)
Definition: FileMgr.h:247
int epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:204
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:59
void free_page(std::pair< FileInfo *, int > &&page)
Definition: FileMgr.cpp:997
FileMgr(const int deviceId, GlobalFileMgr *gfm, const std::pair< const int, const int > fileMgrKey, const size_t num_reader_threads=0, const int epoch=-1, const size_t defaultPageSize=2097152)
Constructor.
Definition: FileMgr.cpp:67
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns the a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:680
std::vector< FileInfo * > files_
Definition: FileMgr.h:241
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:113
CHECK(cgen_state)
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:283
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:127
void init(const size_t num_reader_threads)
Definition: FileMgr.cpp:124
int epoch_
the index of the next file id
Definition: FileMgr.h:246
#define MAX_FILE_N_METADATA_PAGES
Definition: File.h:28
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:225
PageSizeFileMMap fileIndex_
A vector of files accessible via a file identifier.
Definition: FileMgr.h:242
#define DB_META_FILENAME
Definition: FileMgr.cpp:43
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:243
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:631
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:518
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:653
size_t fileSize(FILE *f)
Returns the size of the specified file.
Definition: File.cpp:171
virtual bool isUpdated() const
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:734
#define CHECK_LT(x, y)
Definition: Logger.h:200
bool openDBMetaFile(const std::string &DBMetaFileName)
Definition: FileMgr.cpp:547
#define MAX_FILE_N_PAGES
Definition: File.h:27
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:244
std::deque< int > epochs
Definition: Page.h:72
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:134
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:64
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:113
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
bool getDBConvert() const
Definition: FileMgr.cpp:976
void setSize(const size_t size)
void getChunkMetadataVec(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec) override
Definition: FileMgr.cpp:929
void openExistingFile(std::vector< HeaderInfo > &headerVec, const int fileMgrEpoch)
Definition: FileInfo.cpp:69
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:83
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:887
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:121
void getChunkMetadataVecForKeyPrefix(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:942
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:102
mapd_shared_mutex mutex_free_page
Definition: FileMgr.h:256
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:823
void setEpoch(int epoch)
Definition: FileMgr.cpp:992
virtual int getDeviceId() const
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_&lt;EPOCH&gt;_&lt;oldname&gt;.
Definition: File.cpp:183
void writeAndSyncDBMetaToDisk()
Definition: FileMgr.cpp:568
virtual void reserve(size_t num_bytes)=0
#define EPOCH_FILENAME
Definition: FileMgr.cpp:42
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:253
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:130
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:254
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:69
A selection of helper methods for File I/O.
std::pair< const int, const int > fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:238
#define VLOG(n)
Definition: Logger.h:280
Type timer_start()
Definition: measure.h:40
FileInfo * openExistingFile(const std::string &path, const int fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:868