OmniSciDB  72180abbfe
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <algorithm>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/checked_alloc.h"
41 #include "Shared/measure.h"
42 
43 #define EPOCH_FILENAME "epoch"
44 #define DB_META_FILENAME "dbmeta"
45 
46 using namespace std;
47 
48 namespace File_Namespace {
49 
50 bool headerCompare(const HeaderInfo& firstElem, const HeaderInfo& secondElem) {
51  // HeaderInfo.first is a pair of Chunk key with a vector containing
52  // pageId and version
53  if (firstElem.chunkKey != secondElem.chunkKey) {
54  return firstElem.chunkKey < secondElem.chunkKey;
55  }
56  if (firstElem.pageId != secondElem.pageId) {
57  return firstElem.pageId < secondElem.pageId;
58  }
59  return firstElem.versionEpoch < secondElem.versionEpoch;
60 
61  /*
62  if (firstElem.first.first != secondElem.first.first)
63  return firstElem.first.first < secondElem.first.first;
64  return firstElem.first.second < secondElem.first.second;
65  */
66 }
67 
68 FileMgr::FileMgr(const int deviceId,
69  GlobalFileMgr* gfm,
70  const std::pair<const int, const int> fileMgrKey,
71  const size_t num_reader_threads,
72  const int epoch,
73  const size_t defaultPageSize)
74  : AbstractBufferMgr(deviceId)
75  , gfm_(gfm)
76  , fileMgrKey_(fileMgrKey)
77  , defaultPageSize_(defaultPageSize)
78  , nextFileId_(0)
79  , epoch_(epoch) {
80  init(num_reader_threads);
81 }
82 
83 // used only to initialize enough to drop
84 FileMgr::FileMgr(const int deviceId,
85  GlobalFileMgr* gfm,
86  const std::pair<const int, const int> fileMgrKey,
87  const bool initOnly)
88  : AbstractBufferMgr(deviceId)
89  , gfm_(gfm)
90  , fileMgrKey_(fileMgrKey)
91  , defaultPageSize_(0)
92  , nextFileId_(0)
93  , epoch_(0) {
94  const std::string fileMgrDirPrefix("table");
95  const std::string FileMgrDirDelim("_");
96  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
97  std::to_string(fileMgrKey_.first) + // db_id
98  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
99  epochFile_ = nullptr;
100  files_.clear();
101 }
102 
103 FileMgr::FileMgr(GlobalFileMgr* gfm, const size_t defaultPageSize, std::string basePath)
104  : AbstractBufferMgr(0)
105  , gfm_(gfm)
106  , fileMgrKey_(0, 0)
107  , fileMgrBasePath_(basePath)
108  , defaultPageSize_(defaultPageSize)
109  , nextFileId_(0)
110  , epoch_(-1) {
111  init(basePath);
112 }
113 
115  // checkpoint();
116  // free memory used by FileInfo objects
117  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
118  delete chunkIt->second;
119  }
120  for (auto file_info : files_) {
121  delete file_info;
122  }
123 }
124 
125 void FileMgr::init(const size_t num_reader_threads) {
126  // if epoch = -1 this means open from epoch file
127  const std::string fileMgrDirPrefix("table");
128  const std::string FileMgrDirDelim("_");
129  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
130  std::to_string(fileMgrKey_.first) + // db_id
131  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
132  boost::filesystem::path path(fileMgrBasePath_);
133  if (boost::filesystem::exists(path)) {
134  if (!boost::filesystem::is_directory(path)) {
135  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
136  << "' for table data is not a directory.";
137  }
138  if (epoch_ != -1) { // if opening at previous epoch
139  int epochCopy = epoch_;
141  epoch_ = epochCopy;
142  } else {
144  }
145 
146  auto clock_begin = timer_start();
147 
148  boost::filesystem::directory_iterator
149  endItr; // default construction yields past-the-end
150  int maxFileId = -1;
151  int fileCount = 0;
152  int threadCount = std::thread::hardware_concurrency();
153  std::vector<HeaderInfo> headerVec;
154  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
155  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
156  if (boost::filesystem::is_regular_file(fileIt->status())) {
157  // note that boost::filesystem leaves preceding dot on
158  // extension - hence MAPD_FILE_EXT is ".mapd"
159  std::string extension(fileIt->path().extension().string());
160 
161  if (extension == MAPD_FILE_EXT) {
162  std::string fileStem(fileIt->path().stem().string());
163  // remove trailing dot if any
164  if (fileStem.size() > 0 && fileStem.back() == '.') {
165  fileStem = fileStem.substr(0, fileStem.size() - 1);
166  }
167  size_t dotPos = fileStem.find_last_of("."); // should only be one
168  if (dotPos == std::string::npos) {
169  LOG(FATAL) << "File `" << fileIt->path()
170  << "` does not carry page size information in the filename.";
171  }
172  int fileId = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
173  if (fileId > maxFileId) {
174  maxFileId = fileId;
175  }
176  size_t pageSize =
177  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
178  std::string filePath(fileIt->path().string());
179  size_t fileSize = boost::filesystem::file_size(filePath);
180  CHECK_EQ(fileSize % pageSize, size_t(0)); // should be no partial pages
181  size_t numPages = fileSize / pageSize;
182 
183  VLOG(4) << "File id: " << fileId << " Page size: " << pageSize
184  << " Num pages: " << numPages;
185 
186  file_futures.emplace_back(std::async(
187  std::launch::async, [filePath, fileId, pageSize, numPages, this] {
188  std::vector<HeaderInfo> tempHeaderVec;
189  openExistingFile(filePath, fileId, pageSize, numPages, tempHeaderVec);
190  return tempHeaderVec;
191  }));
192  fileCount++;
193  if (fileCount % threadCount == 0) {
194  processFileFutures(file_futures, headerVec);
195  }
196  }
197  }
198  }
199 
200  if (file_futures.size() > 0) {
201  processFileFutures(file_futures, headerVec);
202  }
203  int64_t queue_time_ms = timer_stop(clock_begin);
204 
205  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : "
206  << queue_time_ms << "ms Epoch: " << epoch_ << " files read: " << fileCount
207  << " table location: '" << fileMgrBasePath_ << "'";
208 
209  /* Sort headerVec so that all HeaderInfos
210  * from a chunk will be grouped together
211  * and in order of increasing PageId
212  * - Version Epoch */
213 
214  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
215 
216  /* Goal of next section is to find sequences in the
217  * sorted headerVec of the same ChunkId, which we
218  * can then initiate a FileBuffer with */
219 
220  VLOG(4) << "Number of Headers in Vector: " << headerVec.size();
221  if (headerVec.size() > 0) {
222  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
223  auto startIt = headerVec.begin();
224 
225  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
226  ++headerIt) {
227  // for (auto chunkIt = headerIt->chunkKey.begin(); chunkIt !=
228  // headerIt->chunkKey.end(); ++chunkIt) {
229  // std::cout << *chunkIt << " ";
230  //}
231 
232  if (headerIt->chunkKey != lastChunkKey) {
233  chunkIndex_[lastChunkKey] =
234  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerIt);
235  /*
236  if (startIt->versionEpoch != -1) {
237  cout << "not skipping bc version != -1" << endl;
238  // -1 means that chunk was deleted
239  // lets not read it in
240  chunkIndex_[lastChunkKey] = new FileBuffer
241  (this,/lastChunkKey,startIt,headerIt);
242 
243  }
244  else {
245  cout << "Skipping bc version == -1" << endl;
246  }
247  */
248  lastChunkKey = headerIt->chunkKey;
249  startIt = headerIt;
250  }
251  }
252  // now need to insert last Chunk
253  // size_t pageSize = files_[startIt->page.fileId]->pageSize;
254  // cout << "Inserting last chunk" << endl;
255  // if (startIt->versionEpoch != -1) {
256  chunkIndex_[lastChunkKey] =
257  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerVec.end());
258  //}
259  }
260  nextFileId_ = maxFileId + 1;
261  // std::cout << "next file id: " << nextFileId_ << std::endl;
262  } else {
263  if (!boost::filesystem::create_directory(path)) {
264  LOG(FATAL) << "Could not create data directory: " << path;
265  }
267  }
268 
269  /* define number of reader threads to be used */
270  size_t num_hardware_based_threads =
271  std::thread::hardware_concurrency(); // # of threads is based on # of cores on the
272  // host
273  if (num_reader_threads == 0) { // # of threads has not been defined by user
274  num_reader_threads_ = num_hardware_based_threads;
275  } else {
276  if (num_reader_threads > num_hardware_based_threads) {
277  num_reader_threads_ = num_hardware_based_threads;
278  } else {
279  num_reader_threads_ = num_reader_threads;
280  }
281  }
282 }
283 
285  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
286  std::vector<HeaderInfo>& headerVec) {
287  for (auto& file_future : file_futures) {
288  file_future.wait();
289  }
290  // concatenate the vectors after thread completes
291  for (auto& file_future : file_futures) {
292  auto tempHeaderVec = file_future.get();
293  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
294  }
295  file_futures.clear();
296 }
297 
298 void FileMgr::init(const std::string dataPathToConvertFrom) {
299  int converted_data_epoch = 0;
300  boost::filesystem::path path(dataPathToConvertFrom);
301  if (boost::filesystem::exists(path)) {
302  if (!boost::filesystem::is_directory(path)) {
303  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
304  }
305 
306  if (epoch_ != -1) { // if opening at previous epoch
307  int epochCopy = epoch_;
309  epoch_ = epochCopy;
310  } else {
312  }
313 
314  boost::filesystem::directory_iterator
315  endItr; // default construction yields past-the-end
316  int maxFileId = -1;
317  int fileCount = 0;
318  int threadCount = std::thread::hardware_concurrency();
319  std::vector<HeaderInfo> headerVec;
320  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
321  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
322  if (boost::filesystem::is_regular_file(fileIt->status())) {
323  // note that boost::filesystem leaves preceding dot on
324  // extension - hence MAPD_FILE_EXT is ".mapd"
325  std::string extension(fileIt->path().extension().string());
326 
327  if (extension == MAPD_FILE_EXT) {
328  std::string fileStem(fileIt->path().stem().string());
329  // remove trailing dot if any
330  if (fileStem.size() > 0 && fileStem.back() == '.') {
331  fileStem = fileStem.substr(0, fileStem.size() - 1);
332  }
333  size_t dotPos = fileStem.find_last_of("."); // should only be one
334  if (dotPos == std::string::npos) {
335  LOG(FATAL) << "File `" + fileIt->path().string() +
336  "` does not carry page size information in the filename.";
337  }
338  int fileId = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
339  if (fileId > maxFileId) {
340  maxFileId = fileId;
341  }
342  size_t pageSize =
343  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
344  std::string filePath(fileIt->path().string());
345  size_t fileSize = boost::filesystem::file_size(filePath);
346  CHECK(fileSize % pageSize == 0); // should be no partial pages
347  size_t numPages = fileSize / pageSize;
348 
349  file_futures.emplace_back(std::async(
350  std::launch::async, [filePath, fileId, pageSize, numPages, this] {
351  std::vector<HeaderInfo> tempHeaderVec;
352  openExistingFile(filePath, fileId, pageSize, numPages, tempHeaderVec);
353  return tempHeaderVec;
354  }));
355  fileCount++;
356  if (fileCount % threadCount) {
357  processFileFutures(file_futures, headerVec);
358  }
359  }
360  }
361  }
362 
363  if (file_futures.size() > 0) {
364  processFileFutures(file_futures, headerVec);
365  }
366 
367  /* Sort headerVec so that all HeaderInfos
368  * from a chunk will be grouped together
369  * and in order of increasing PageId
370  * - Version Epoch */
371 
372  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
373 
374  /* Goal of next section is to find sequences in the
375  * sorted headerVec of the same ChunkId, which we
376  * can then initiate a FileBuffer with */
377 
378  if (headerVec.size() > 0) {
379  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
380  auto startIt = headerVec.begin();
381 
382  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
383  ++headerIt) {
384  if (headerIt->chunkKey != lastChunkKey) {
385  FileMgr* c_fm_ =
386  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
387  CHECK(c_fm_);
388  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerIt);
389  chunkIndex_[lastChunkKey] = srcBuf;
390  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
391  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
392  destBuf->syncEncoder(srcBuf);
393  destBuf->setSize(srcBuf->size());
394  destBuf->setDirty(); // this needs to be set to force writing out metadata
395  // files from "checkpoint()" call
396 
397  size_t totalNumPages = srcBuf->getMultiPage().size();
398  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
399  Page srcPage = srcBuf->getMultiPage()[pageNum].current();
400  Page destPage = c_fm_->requestFreePage(
401  srcBuf->pageSize(),
402  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
403  MultiPage multiPage(srcBuf->pageSize());
404  multiPage.epochs.push_back(converted_data_epoch);
405  multiPage.pageVersions.push_back(destPage);
406  destBuf->multiPages_.push_back(multiPage);
407  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
408  copyPage(
409  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
410  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
411  }
412  lastChunkKey = headerIt->chunkKey;
413  startIt = headerIt;
414  }
415  }
416 
417  // now need to insert last Chunk
418  FileMgr* c_fm_ =
419  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
420  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerVec.end());
421  chunkIndex_[lastChunkKey] = srcBuf;
422  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
423  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
424  destBuf->syncEncoder(srcBuf);
425  destBuf->setSize(srcBuf->size());
426  destBuf->setDirty(); // this needs to be set to write out metadata file from the
427  // "checkpoint()" call
428 
429  size_t totalNumPages = srcBuf->getMultiPage().size();
430  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
431  Page srcPage = srcBuf->getMultiPage()[pageNum].current();
432  Page destPage = c_fm_->requestFreePage(
433  srcBuf->pageSize(),
434  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
435  MultiPage multiPage(srcBuf->pageSize());
436  multiPage.epochs.push_back(converted_data_epoch);
437  multiPage.pageVersions.push_back(destPage);
438  destBuf->multiPages_.push_back(multiPage);
439  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
440  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
441  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
442  }
443  }
444  nextFileId_ = maxFileId + 1;
445  } else {
446  if (!boost::filesystem::create_directory(path)) {
447  LOG(FATAL) << "Specified path does not exist: " << path;
448  }
449  }
450 }
451 
453  for (auto file_info : files_) {
454  if (file_info->f) {
455  close(file_info->f);
456  file_info->f = nullptr;
457  }
458  }
459 
460  if (epochFile_) {
461  close(epochFile_);
462  epochFile_ = nullptr;
463  }
464 
465  /* rename for later deletion the directory containing table related data */
467 }
468 
469 void FileMgr::copyPage(Page& srcPage,
470  FileMgr* destFileMgr,
471  Page& destPage,
472  const size_t reservedHeaderSize,
473  const size_t numBytes,
474  const size_t offset) {
475  CHECK(offset + numBytes <= defaultPageSize_);
476  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
477  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
478  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
479 
480  size_t bytesRead = srcFileInfo->read(
481  srcPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize, numBytes, buffer);
482  CHECK(bytesRead == numBytes);
483  size_t bytesWritten = destFileInfo->write(
484  destPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize,
485  numBytes,
486  buffer);
487  CHECK(bytesWritten == numBytes);
488  ::free(buffer);
489 }
490 
491 void FileMgr::createEpochFile(const std::string& epochFileName) {
492  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
493  if (boost::filesystem::exists(epochFilePath)) {
494  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
495  }
496  epochFile_ = create(epochFilePath, sizeof(int));
497  // Write out current epoch to file - which if this
498  // function is being called should be 0
499  write(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
500  epoch_++;
501 }
502 
503 void FileMgr::openEpochFile(const std::string& epochFileName) {
504  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
505  if (!boost::filesystem::exists(epochFilePath)) {
506  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
507  }
508  if (!boost::filesystem::is_regular_file(epochFilePath)) {
509  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
510  }
511  if (boost::filesystem::file_size(epochFilePath) < 4) {
512  LOG(FATAL) << "Epoch file `" << epochFilePath
513  << "` is not sized properly (current size: "
514  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
515  }
516  epochFile_ = open(epochFilePath);
517  read(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
518  // std::cout << "Epoch after open file: " << epoch_ << std::endl;
519  epoch_++; // we are in new epoch from last checkpoint
520 }
521 
523  write(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
524  int status = fflush(epochFile_);
525  // int status = fcntl(fileno(epochFile_),51);
526  if (status != 0) {
527  LOG(FATAL) << "Could not flush epoch file to disk";
528  }
529 #ifdef __APPLE__
530  status = fcntl(fileno(epochFile_), 51);
531 #else
532  status = fsync(fileno(epochFile_));
533 #endif
534  if (status != 0) {
535  LOG(FATAL) << "Could not sync epoch file to disk";
536  }
537  ++epoch_;
538 }
539 
540 void FileMgr::createDBMetaFile(const std::string& DBMetaFileName) {
541  std::string DBMetaFilePath(fileMgrBasePath_ + "/" + DBMetaFileName);
542  if (boost::filesystem::exists(DBMetaFilePath)) {
543  LOG(FATAL) << "DB metadata file `" << DBMetaFilePath << "` already exists.";
544  }
545  DBMetaFile_ = create(DBMetaFilePath, sizeof(int));
546  int db_ver = getDBVersion();
547  write(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_ver);
548  // LOG(INFO) << "DB metadata file has been created.";
549 }
550 
551 bool FileMgr::openDBMetaFile(const std::string& DBMetaFileName) {
552  std::string DBMetaFilePath(fileMgrBasePath_ + "/" + DBMetaFileName);
553 
554  if (!boost::filesystem::exists(DBMetaFilePath)) {
555  // LOG(INFO) << "DB metadata file does not exist, one will be created.";
556  return false;
557  }
558  if (!boost::filesystem::is_regular_file(DBMetaFilePath)) {
559  // LOG(INFO) << "DB metadata file is not a regular file, one will be created.";
560  return false;
561  }
562  if (boost::filesystem::file_size(DBMetaFilePath) < 4) {
563  // LOG(INFO) << "DB metadata file is not sized properly, one will be created.";
564  return false;
565  }
566  DBMetaFile_ = open(DBMetaFilePath);
567  read(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_version_);
568 
569  return true;
570 }
571 
573  int db_ver = getDBVersion();
574  write(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_ver);
575  int status = fflush(DBMetaFile_);
576  if (status != 0) {
577  LOG(FATAL) << "Could not sync DB metadata file to disk";
578  }
579 }
580 
582  VLOG(2) << "Checkpointing epoch: " << epoch_;
583  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
584  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
585  /*
586  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
587  std::cout << *vecIt << ",";
588  }
589  cout << "Is dirty: " << chunkIt->second->isDirty_ << endl;
590  */
591  if (chunkIt->second->is_dirty_) {
592  chunkIt->second->writeMetadata(epoch_);
593  chunkIt->second->clearDirtyBits();
594  }
595  }
596  chunkIndexWriteLock.unlock();
597 
598  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
599  for (auto fileIt = files_.begin(); fileIt != files_.end(); ++fileIt) {
600  int status = (*fileIt)->syncToDisk();
601  if (status != 0) {
602  LOG(FATAL) << "Could not sync file to disk";
603  }
604  }
605 
607 
608  mapd_unique_lock<mapd_shared_mutex> freePagesWriteLock(mutex_free_page);
609  for (auto& free_page : free_pages) {
610  free_page.first->freePageDeferred(free_page.second);
611  }
612  free_pages.clear();
613 }
614 
616  const size_t pageSize,
617  const size_t numBytes) {
618  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
619  return createBufferUnlocked(key, pageSize, numBytes);
620 }
621 
622 // The underlying implementation of createBuffer needs to be lockless since
623 // some of the codepaths that call it will have already obtained a write lock
624 // and should not release it until they are complete.
626  const size_t pageSize,
627  const size_t numBytes) {
628  size_t actualPageSize = pageSize;
629  if (actualPageSize == 0) {
630  actualPageSize = defaultPageSize_;
631  }
633  // we will do this lazily and not allocate space for the Chunk (i.e.
634  // FileBuffer yet)
635 
636  if (chunkIndex_.find(key) != chunkIndex_.end()) {
637  LOG(FATAL) << "Chunk already exists for key: " << showChunk(key);
638  }
639  chunkIndex_[key] = new FileBuffer(this, actualPageSize, key, numBytes);
640  return (chunkIndex_[key]);
641 }
642 
644  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
645  return chunkIndex_.find(key) != chunkIndex_.end();
646 }
647 
648 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
649  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
650  auto chunkIt = chunkIndex_.find(key);
651  // ensure the Chunk exists
652  if (chunkIt == chunkIndex_.end()) {
653  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
654  }
655  chunkIndexWriteLock.unlock();
656  // chunkIt->second->writeMetadata(-1); // writes -1 as epoch - signifies deleted
657  if (purge) {
658  chunkIt->second->freePages();
659  }
660  //@todo need a way to represent delete in non purge case
661  delete chunkIt->second;
662  chunkIndex_.erase(chunkIt);
663 }
664 
665 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
666  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
667  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
668  if (chunkIt == chunkIndex_.end()) {
669  return; // should we throw?
670  }
671  while (chunkIt != chunkIndex_.end() &&
672  std::search(chunkIt->first.begin(),
673  chunkIt->first.begin() + keyPrefix.size(),
674  keyPrefix.begin(),
675  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
676  /*
677  cout << "Freeing pages for chunk ";
678  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
679  std::cout << *vecIt << ",";
680  }
681  cout << endl;
682  */
683  if (purge) {
684  chunkIt->second->freePages();
685  }
686  //@todo need a way to represent delete in non purge case
687  delete chunkIt->second;
688  chunkIndex_.erase(chunkIt++);
689  }
690 }
691 
692 AbstractBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t numBytes) {
693  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
694  auto chunkIt = chunkIndex_.find(key);
695  if (chunkIt == chunkIndex_.end()) {
696  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
697  }
698  return chunkIt->second;
699 }
700 
702  AbstractBuffer* destBuffer,
703  const size_t numBytes) {
704  // reads chunk specified by ChunkKey into AbstractBuffer provided by
705  // destBuffer
706  if (destBuffer->isDirty()) {
707  LOG(FATAL)
708  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
709  << showChunk(key);
710  }
711  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
712  auto chunkIt = chunkIndex_.find(key);
713  if (chunkIt == chunkIndex_.end()) {
714  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
715  }
716  chunkIndexReadLock.unlock();
717 
718  AbstractBuffer* chunk = chunkIt->second;
719  // ChunkSize is either specified in function call with numBytes or we
720  // just look at pageSize * numPages in FileBuffer
721  size_t chunkSize = numBytes == 0 ? chunk->size() : numBytes;
722  if (numBytes > 0 && numBytes > chunk->size()) {
723  LOG(FATAL) << "Chunk retrieved for key `" << showChunk(key) << "` is smaller ("
724  << chunk->size() << ") than number of bytes requested (" << numBytes
725  << ")";
726  }
727  destBuffer->reserve(chunkSize);
728  // std::cout << "After reserve chunksize: " << chunkSize << std::endl;
729  if (chunk->isUpdated()) {
730  chunk->read(destBuffer->getMemoryPtr(),
731  chunkSize,
732  0,
733  destBuffer->getType(),
734  destBuffer->getDeviceId());
735  } else {
736  chunk->read(destBuffer->getMemoryPtr() + destBuffer->size(),
737  chunkSize - destBuffer->size(),
738  destBuffer->size(),
739  destBuffer->getType(),
740  destBuffer->getDeviceId());
741  }
742  destBuffer->setSize(chunkSize);
743  destBuffer->syncEncoder(chunk);
744 }
745 
747  AbstractBuffer* srcBuffer,
748  const size_t numBytes) {
749  // obtain a pointer to the Chunk
750  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
751  auto chunkIt = chunkIndex_.find(key);
752  AbstractBuffer* chunk;
753  if (chunkIt == chunkIndex_.end()) {
755  } else {
756  chunk = chunkIt->second;
757  }
758  chunkIndexWriteLock.unlock();
759  size_t oldChunkSize = chunk->size();
760  // write the buffer's data to the Chunk
761  // size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
762  size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
763  if (chunk->isDirty()) {
764  // multiple appends are allowed,
765  // but only single update is allowed
766  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
767  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
768  "for key: "
769  << showChunk(key);
770  }
771  }
772  if (srcBuffer->isUpdated()) {
773  // chunk size is not changed when fixed rows are updated or are marked as deleted.
774  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
775  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
776  // For varlen update, it takes another route via fragmenter using disk-level buffer.
777  if (0 == numBytes && !chunk->isDirty()) {
778  chunk->setSize(newChunkSize);
779  }
780  //@todo use dirty flags to only flush pages of chunk that need to
781  // be flushed
782  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
783  newChunkSize,
784  0,
785  srcBuffer->getType(),
786  srcBuffer->getDeviceId());
787  } else if (srcBuffer->isAppended()) {
788  CHECK_LT(oldChunkSize, newChunkSize);
789  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
790  newChunkSize - oldChunkSize,
791  srcBuffer->getType(),
792  srcBuffer->getDeviceId());
793  } else {
794  UNREACHABLE() << "putBuffer() expects a buffer marked is_updated or is_appended";
795  }
796  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
797  //@todo commenting out line above will make sure this metadata is set
798  // but will trigger error on fetch chunk
799  srcBuffer->clearDirtyBits();
800  chunk->syncEncoder(srcBuffer);
801  return chunk;
802 }
803 
804 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
805  LOG(FATAL) << "Operation not supported";
806  return nullptr; // satisfy return-type warning
807 }
808 
810  LOG(FATAL) << "Operation not supported";
811 }
812 
813 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
814  std::lock_guard<std::mutex> lock(getPageMutex_);
815 
816  auto candidateFiles = fileIndex_.equal_range(pageSize);
817  int pageNum = -1;
818  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
819  FileInfo* fileInfo = files_[fileIt->second];
820  pageNum = fileInfo->getFreePage();
821  if (pageNum != -1) {
822  return (Page(fileInfo->fileId, pageNum));
823  }
824  }
825  // if here then we need to add a file
826  FileInfo* fileInfo;
827  if (isMetadata) {
828  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
829  } else {
830  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
831  }
832  pageNum = fileInfo->getFreePage();
833  CHECK(pageNum != -1);
834  return (Page(fileInfo->fileId, pageNum));
835 }
836 
837 void FileMgr::requestFreePages(size_t numPagesRequested,
838  size_t pageSize,
839  std::vector<Page>& pages,
840  const bool isMetadata) {
841  // not used currently
842  // @todo add method to FileInfo to get more than one page
843  std::lock_guard<std::mutex> lock(getPageMutex_);
844  auto candidateFiles = fileIndex_.equal_range(pageSize);
845  size_t numPagesNeeded = numPagesRequested;
846  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
847  FileInfo* fileInfo = files_[fileIt->second];
848  int pageNum;
849  do {
850  pageNum = fileInfo->getFreePage();
851  if (pageNum != -1) {
852  pages.emplace_back(fileInfo->fileId, pageNum);
853  numPagesNeeded--;
854  }
855  } while (pageNum != -1 && numPagesNeeded > 0);
856  if (numPagesNeeded == 0) {
857  break;
858  }
859  }
860  while (numPagesNeeded > 0) {
861  FileInfo* fileInfo;
862  if (isMetadata) {
863  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
864  } else {
865  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
866  }
867  int pageNum;
868  do {
869  pageNum = fileInfo->getFreePage();
870  if (pageNum != -1) {
871  pages.emplace_back(fileInfo->fileId, pageNum);
872  numPagesNeeded--;
873  }
874  } while (pageNum != -1 && numPagesNeeded > 0);
875  if (numPagesNeeded == 0) {
876  break;
877  }
878  }
879  CHECK(pages.size() == numPagesRequested);
880 }
881 
882 FileInfo* FileMgr::openExistingFile(const std::string& path,
883  const int fileId,
884  const size_t pageSize,
885  const size_t numPages,
886  std::vector<HeaderInfo>& headerVec) {
887  FILE* f = open(path);
888  FileInfo* fInfo = new FileInfo(
889  this, fileId, f, pageSize, numPages, false); // false means don't init file
890 
891  fInfo->openExistingFile(headerVec, epoch_);
892  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
893  if (fileId >= static_cast<int>(files_.size())) {
894  files_.resize(fileId + 1);
895  }
896  files_[fileId] = fInfo;
897  fileIndex_.insert(std::pair<size_t, int>(pageSize, fileId));
898  return fInfo;
899 }
900 
901 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
902  // check arguments
903  if (pageSize == 0 || numPages == 0) {
904  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
905  }
906 
907  // create the new file
908  FILE* f = create(fileMgrBasePath_,
909  nextFileId_,
910  pageSize,
911  numPages); // TM: not sure if I like naming scheme here - should be in
912  // separate namespace?
913  CHECK(f);
914 
915  // instantiate a new FileInfo for the newly created file
916  int fileId = nextFileId_++;
917  FileInfo* fInfo =
918  new FileInfo(this, fileId, f, pageSize, numPages, true); // true means init file
919  CHECK(fInfo);
920 
921  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
922  // update file manager data structures
923  files_.push_back(fInfo);
924  fileIndex_.insert(std::pair<size_t, int>(pageSize, fileId));
925 
926  CHECK(files_.back() == fInfo); // postcondition
927  return fInfo;
928 }
929 
930 FILE* FileMgr::getFileForFileId(const int fileId) {
931  CHECK(fileId >= 0 && static_cast<size_t>(fileId) < files_.size());
932  return files_[fileId]->f;
933 }
934 /*
935 void FileMgr::getAllChunkMetaInfo(std::vector<std::pair<ChunkKey, int64_t> > &metadata) {
936  metadata.reserve(chunkIndex_.size());
937  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
938  metadata.push_back(std::make_pair(chunkIt->first,
939 chunkIt->second->encoder->numElems));
940  }
941 }
942 */
944  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
945  chunkMetadataVec.reserve(chunkIndex_.size());
946  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
947  if (chunkIt->second->has_encoder) {
948  auto chunk_metadata = std::make_shared<ChunkMetadata>();
949  chunkIt->second->encoder->getMetadata(chunk_metadata);
950  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
951  }
952  }
953 }
954 
956  const ChunkKey& keyPrefix) {
957  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(
958  chunkIndexMutex_); // is this guarding the right structure? it look slike we oly
959  // read here for chunk
960  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
961  if (chunkIt == chunkIndex_.end()) {
962  return; // throw?
963  }
964  while (chunkIt != chunkIndex_.end() &&
965  std::search(chunkIt->first.begin(),
966  chunkIt->first.begin() + keyPrefix.size(),
967  keyPrefix.begin(),
968  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
969  /*
970  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
971  std::cout << *vecIt << ",";
972  }
973  cout << endl;
974  */
975  if (chunkIt->second->has_encoder) {
976  auto chunk_metadata = std::make_shared<ChunkMetadata>();
977  chunkIt->second->encoder->getMetadata(chunk_metadata);
978  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
979  }
980  chunkIt++;
981  }
982 }
983 
985  return gfm_->getDBVersion();
986 }
987 
988 bool FileMgr::getDBConvert() const {
989  return gfm_->getDBConvert();
990 }
991 
994  if (db_version_ > getDBVersion()) {
995  LOG(FATAL) << "DB forward compatibility is not supported. Version of OmniSci "
996  "software used is older than the version of DB being read: "
997  << db_version_;
998  }
999  } else {
1001  }
1002 }
1003 
1004 void FileMgr::setEpoch(int epoch) {
1005  epoch_ = epoch;
1007 }
1008 
1009 void FileMgr::free_page(std::pair<FileInfo*, int>&& page) {
1010  std::unique_lock<mapd_shared_mutex> lock(mutex_free_page);
1011  free_pages.push_back(page);
1012 }
1013 
1014 void FileMgr::removeTableRelatedDS(const int db_id, const int table_id) {
1015  UNREACHABLE();
1016 }
1017 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:137
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:35
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:804
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:155
std::string getBasePath() const
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:491
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:813
size_t size() const override
Definition: FileBuffer.h:139
void syncEncoder(const AbstractBuffer *src_buffer)
void openEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:503
void removeTableRelatedDS(const int db_id, const int table_id) override
Definition: FileMgr.cpp:1014
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:371
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:188
int getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:984
virtual size_t size() const =0
Stores Pair of ChunkKey and Page id and version, in a pair with a Page struct itself (File id and Pag...
Definition: Page.h:121
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:809
virtual MemoryLevel getType() const =0
FILE * getFileForFileId(const int fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:930
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:469
#define MAPD_FILE_EXT
Definition: File.h:26
#define UNREACHABLE()
Definition: Logger.h:241
std::vector< std::pair< FileInfo *, int > > free_pages
Definition: FileMgr.h:257
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:648
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:171
std::string getFileMgrBasePath() const
Definition: FileMgr.h:228
GlobalFileMgr * gfm_
Definition: FileMgr.h:237
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:252
virtual bool isAppended() const
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:55
void createDBMetaFile(const std::string &DBMetaFileName)
Definition: FileMgr.cpp:540
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:581
std::string fileMgrBasePath_
Definition: FileMgr.h:239
bool headerCompare(const HeaderInfo &firstElem, const HeaderInfo &secondElem)
Definition: FileMgr.cpp:50
virtual bool isDirty() const
std::string to_string(char const *&&v)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:34
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:701
AbstractBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:615
virtual void read(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int dst_device_id=-1)=0
FILE * epochFile_
the current epoch (time of last checkpoint)
Definition: FileMgr.h:247
int epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:202
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:59
void free_page(std::pair< FileInfo *, int > &&page)
Definition: FileMgr.cpp:1009
FileMgr(const int deviceId, GlobalFileMgr *gfm, const std::pair< const int, const int > fileMgrKey, const size_t num_reader_threads=0, const int epoch=-1, const size_t defaultPageSize=2097152)
Constructor.
Definition: FileMgr.cpp:68
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:692
std::vector< FileInfo * > files_
Definition: FileMgr.h:241
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:112
CHECK(cgen_state)
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:284
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:127
void init(const size_t num_reader_threads)
Definition: FileMgr.cpp:125
int epoch_
the index of the next file id
Definition: FileMgr.h:246
#define MAX_FILE_N_METADATA_PAGES
Definition: File.h:28
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:223
PageSizeFileMMap fileIndex_
A vector of files accessible via a file identifier.
Definition: FileMgr.h:242
#define DB_META_FILENAME
Definition: FileMgr.cpp:44
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:243
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:643
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:522
AbstractBufferMgr * getFileMgr(const int db_id, const int tb_id)
std::string showChunk(const ChunkKey &key)
Definition: types.h:37
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:665
size_t fileSize(FILE *f)
Returns the size of the specified file.
Definition: File.cpp:170
virtual bool isUpdated() const
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:746
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:955
#define CHECK_LT(x, y)
Definition: Logger.h:207
bool openDBMetaFile(const std::string &DBMetaFileName)
Definition: FileMgr.cpp:551
#define MAX_FILE_N_PAGES
Definition: File.h:27
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:244
std::deque< int > epochs
Definition: Page.h:72
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:134
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:64
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:114
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
bool getDBConvert() const
Definition: FileMgr.cpp:988
void setSize(const size_t size)
void openExistingFile(std::vector< HeaderInfo > &headerVec, const int fileMgrEpoch)
Definition: FileInfo.cpp:69
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:82
void getChunkMetadataVec(ChunkMetadataVector &chunkMetadataVec) override
Definition: FileMgr.cpp:943
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:901
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:120
AbstractBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:625
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:101
mapd_shared_mutex mutex_free_page
Definition: FileMgr.h:256
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:837
mapd_unique_lock< mapd_shared_mutex > write_lock
void setEpoch(int epoch)
Definition: FileMgr.cpp:1004
virtual int getDeviceId() const
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
Definition: File.cpp:182
void writeAndSyncDBMetaToDisk()
Definition: FileMgr.cpp:572
virtual void reserve(size_t num_bytes)=0
#define EPOCH_FILENAME
Definition: FileMgr.cpp:43
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:253
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:130
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:254
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:69
A selection of helper methods for File I/O.
std::pair< const int, const int > fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:238
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:40
FileInfo * openExistingFile(const std::string &path, const int fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:882