OmniSciDB  8a228a1076
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <algorithm>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/checked_alloc.h"
41 #include "Shared/measure.h"
42 
43 #define EPOCH_FILENAME "epoch"
44 #define DB_META_FILENAME "dbmeta"
45 
46 using namespace std;
47 
48 namespace File_Namespace {
49 
50 bool headerCompare(const HeaderInfo& firstElem, const HeaderInfo& secondElem) {
51  // HeaderInfo.first is a pair of Chunk key with a vector containing
52  // pageId and version
53  if (firstElem.chunkKey != secondElem.chunkKey) {
54  return firstElem.chunkKey < secondElem.chunkKey;
55  }
56  if (firstElem.pageId != secondElem.pageId) {
57  return firstElem.pageId < secondElem.pageId;
58  }
59  return firstElem.versionEpoch < secondElem.versionEpoch;
60 
61  /*
62  if (firstElem.first.first != secondElem.first.first)
63  return firstElem.first.first < secondElem.first.first;
64  return firstElem.first.second < secondElem.first.second;
65  */
66 }
67 
// Builds the file manager for table fileMgrKey = (db_id, tb_id) and delegates to
// init(num_reader_threads). That call derives the table directory under gfm's base
// path and either loads the data files found there or creates the directory.
// An epoch of -1 means "use the epoch stored on disk" (see init()).
FileMgr::FileMgr(const int deviceId,
                 GlobalFileMgr* gfm,
                 const std::pair<const int, const int> fileMgrKey,
                 const size_t num_reader_threads,
                 const int epoch,
                 const size_t defaultPageSize)
    : AbstractBufferMgr(deviceId)
    , gfm_(gfm)
    , fileMgrKey_(fileMgrKey)
    , defaultPageSize_(defaultPageSize)
    , nextFileId_(0)
    , epoch_(epoch) {
  init(num_reader_threads);
}
82 
83 // used only to initialize enough to drop
84 FileMgr::FileMgr(const int deviceId,
85  GlobalFileMgr* gfm,
86  const std::pair<const int, const int> fileMgrKey,
87  const bool initOnly)
88  : AbstractBufferMgr(deviceId)
89  , gfm_(gfm)
90  , fileMgrKey_(fileMgrKey)
91  , defaultPageSize_(0)
92  , nextFileId_(0)
93  , epoch_(0) {
94  const std::string fileMgrDirPrefix("table");
95  const std::string FileMgrDirDelim("_");
96  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
97  std::to_string(fileMgrKey_.first) + // db_id
98  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
99  epochFile_ = nullptr;
100  files_.clear();
101 }
102 
// Builds a file manager over an arbitrary directory (basePath) with key (0, 0) and
// epoch -1, then runs init(basePath). That call reads the .mapd files under
// basePath and copies each chunk into the per-table FileMgr that gfm returns
// for the chunk key.
FileMgr::FileMgr(GlobalFileMgr* gfm, const size_t defaultPageSize, std::string basePath)
    : AbstractBufferMgr(0)
    , gfm_(gfm)
    , fileMgrKey_(0, 0)
    , fileMgrBasePath_(basePath)
    , defaultPageSize_(defaultPageSize)
    , nextFileId_(0)
    , epoch_(-1) {
  init(basePath);
}
113 
// NOTE(review): the signature line of this definition does not appear in this
// listing (the body below begins mid-definition); presumably it is the FileMgr
// destructor — restore from the real source.
  // checkpoint();
  // Free every FileBuffer owned by chunkIndex_, then every FileInfo in files_.
  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
    delete chunkIt->second;
  }
  for (auto file_info : files_) {
    delete file_info;
  }

  // Close the epoch file if one was opened or created.
  if (epochFile_) {
    close(epochFile_);
    epochFile_ = nullptr;
  }

  // NOTE(review): as listed, DBMetaFile_ is only reset, not closed, which would leak
  // the FILE*; a close(DBMetaFile_) line appears to be missing here — confirm.
  if (DBMetaFile_) {
    DBMetaFile_ = nullptr;
  }
}
134 
135 void FileMgr::init(const size_t num_reader_threads) {
136  // if epoch = -1 this means open from epoch file
137  const std::string fileMgrDirPrefix("table");
138  const std::string FileMgrDirDelim("_");
139  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
140  std::to_string(fileMgrKey_.first) + // db_id
141  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
142  boost::filesystem::path path(fileMgrBasePath_);
143  if (boost::filesystem::exists(path)) {
144  if (!boost::filesystem::is_directory(path)) {
145  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
146  << "' for table data is not a directory.";
147  }
148  if (epoch_ != -1) { // if opening at previous epoch
149  int epochCopy = epoch_;
151  epoch_ = epochCopy;
152  } else {
154  }
155 
156  auto clock_begin = timer_start();
157 
158  boost::filesystem::directory_iterator
159  endItr; // default construction yields past-the-end
160  int maxFileId = -1;
161  int fileCount = 0;
162  int threadCount = std::thread::hardware_concurrency();
163  std::vector<HeaderInfo> headerVec;
164  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
165  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
166  if (boost::filesystem::is_regular_file(fileIt->status())) {
167  // note that boost::filesystem leaves preceding dot on
168  // extension - hence MAPD_FILE_EXT is ".mapd"
169  std::string extension(fileIt->path().extension().string());
170 
171  if (extension == MAPD_FILE_EXT) {
172  std::string fileStem(fileIt->path().stem().string());
173  // remove trailing dot if any
174  if (fileStem.size() > 0 && fileStem.back() == '.') {
175  fileStem = fileStem.substr(0, fileStem.size() - 1);
176  }
177  size_t dotPos = fileStem.find_last_of("."); // should only be one
178  if (dotPos == std::string::npos) {
179  LOG(FATAL) << "File `" << fileIt->path()
180  << "` does not carry page size information in the filename.";
181  }
182  int fileId = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
183  if (fileId > maxFileId) {
184  maxFileId = fileId;
185  }
186  size_t pageSize =
187  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
188  std::string filePath(fileIt->path().string());
189  size_t fileSize = boost::filesystem::file_size(filePath);
190  CHECK_EQ(fileSize % pageSize, size_t(0)); // should be no partial pages
191  size_t numPages = fileSize / pageSize;
192 
193  VLOG(4) << "File id: " << fileId << " Page size: " << pageSize
194  << " Num pages: " << numPages;
195 
196  file_futures.emplace_back(std::async(
197  std::launch::async, [filePath, fileId, pageSize, numPages, this] {
198  std::vector<HeaderInfo> tempHeaderVec;
199  openExistingFile(filePath, fileId, pageSize, numPages, tempHeaderVec);
200  return tempHeaderVec;
201  }));
202  fileCount++;
203  if (fileCount % threadCount == 0) {
204  processFileFutures(file_futures, headerVec);
205  }
206  }
207  }
208  }
209 
210  if (file_futures.size() > 0) {
211  processFileFutures(file_futures, headerVec);
212  }
213  int64_t queue_time_ms = timer_stop(clock_begin);
214 
215  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : "
216  << queue_time_ms << "ms Epoch: " << epoch_ << " files read: " << fileCount
217  << " table location: '" << fileMgrBasePath_ << "'";
218 
219  /* Sort headerVec so that all HeaderInfos
220  * from a chunk will be grouped together
221  * and in order of increasing PageId
222  * - Version Epoch */
223 
224  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
225 
226  /* Goal of next section is to find sequences in the
227  * sorted headerVec of the same ChunkId, which we
228  * can then initiate a FileBuffer with */
229 
230  VLOG(4) << "Number of Headers in Vector: " << headerVec.size();
231  if (headerVec.size() > 0) {
232  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
233  auto startIt = headerVec.begin();
234 
235  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
236  ++headerIt) {
237  // for (auto chunkIt = headerIt->chunkKey.begin(); chunkIt !=
238  // headerIt->chunkKey.end(); ++chunkIt) {
239  // std::cout << *chunkIt << " ";
240  //}
241 
242  if (headerIt->chunkKey != lastChunkKey) {
243  chunkIndex_[lastChunkKey] =
244  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerIt);
245  /*
246  if (startIt->versionEpoch != -1) {
247  cout << "not skipping bc version != -1" << endl;
248  // -1 means that chunk was deleted
249  // lets not read it in
250  chunkIndex_[lastChunkKey] = new FileBuffer
251  (this,/lastChunkKey,startIt,headerIt);
252 
253  }
254  else {
255  cout << "Skipping bc version == -1" << endl;
256  }
257  */
258  lastChunkKey = headerIt->chunkKey;
259  startIt = headerIt;
260  }
261  }
262  // now need to insert last Chunk
263  // size_t pageSize = files_[startIt->page.fileId]->pageSize;
264  // cout << "Inserting last chunk" << endl;
265  // if (startIt->versionEpoch != -1) {
266  chunkIndex_[lastChunkKey] =
267  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerVec.end());
268  //}
269  }
270  nextFileId_ = maxFileId + 1;
271  // std::cout << "next file id: " << nextFileId_ << std::endl;
272  } else {
273  if (!boost::filesystem::create_directory(path)) {
274  LOG(FATAL) << "Could not create data directory: " << path;
275  }
277  }
278 
279  /* define number of reader threads to be used */
280  size_t num_hardware_based_threads =
281  std::thread::hardware_concurrency(); // # of threads is based on # of cores on the
282  // host
283  if (num_reader_threads == 0) { // # of threads has not been defined by user
284  num_reader_threads_ = num_hardware_based_threads;
285  } else {
286  if (num_reader_threads > num_hardware_based_threads) {
287  num_reader_threads_ = num_hardware_based_threads;
288  } else {
289  num_reader_threads_ = num_reader_threads;
290  }
291  }
292 }
293 
// NOTE(review): the opening signature line of this definition does not appear in
// this listing (the parameter list below begins mid-declaration) — restore from
// the real source.
// Drains a batch of header-reading futures: blocks until every task has finished,
// appends each task's headers to headerVec, then empties file_futures so the
// caller can start the next batch.
    std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
    std::vector<HeaderInfo>& headerVec) {
  // Wait for the whole batch first (get() below would also block per future).
  for (auto& file_future : file_futures) {
    file_future.wait();
  }
  // concatenate the vectors after thread completes
  for (auto& file_future : file_futures) {
    auto tempHeaderVec = file_future.get();
    headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
  }
  file_futures.clear();
}
307 
308 void FileMgr::init(const std::string dataPathToConvertFrom) {
309  int converted_data_epoch = 0;
310  boost::filesystem::path path(dataPathToConvertFrom);
311  if (boost::filesystem::exists(path)) {
312  if (!boost::filesystem::is_directory(path)) {
313  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
314  }
315 
316  if (epoch_ != -1) { // if opening at previous epoch
317  int epochCopy = epoch_;
319  epoch_ = epochCopy;
320  } else {
322  }
323 
324  boost::filesystem::directory_iterator
325  endItr; // default construction yields past-the-end
326  int maxFileId = -1;
327  int fileCount = 0;
328  int threadCount = std::thread::hardware_concurrency();
329  std::vector<HeaderInfo> headerVec;
330  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
331  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
332  if (boost::filesystem::is_regular_file(fileIt->status())) {
333  // note that boost::filesystem leaves preceding dot on
334  // extension - hence MAPD_FILE_EXT is ".mapd"
335  std::string extension(fileIt->path().extension().string());
336 
337  if (extension == MAPD_FILE_EXT) {
338  std::string fileStem(fileIt->path().stem().string());
339  // remove trailing dot if any
340  if (fileStem.size() > 0 && fileStem.back() == '.') {
341  fileStem = fileStem.substr(0, fileStem.size() - 1);
342  }
343  size_t dotPos = fileStem.find_last_of("."); // should only be one
344  if (dotPos == std::string::npos) {
345  LOG(FATAL) << "File `" + fileIt->path().string() +
346  "` does not carry page size information in the filename.";
347  }
348  int fileId = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
349  if (fileId > maxFileId) {
350  maxFileId = fileId;
351  }
352  size_t pageSize =
353  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
354  std::string filePath(fileIt->path().string());
355  size_t fileSize = boost::filesystem::file_size(filePath);
356  CHECK(fileSize % pageSize == 0); // should be no partial pages
357  size_t numPages = fileSize / pageSize;
358 
359  file_futures.emplace_back(std::async(
360  std::launch::async, [filePath, fileId, pageSize, numPages, this] {
361  std::vector<HeaderInfo> tempHeaderVec;
362  openExistingFile(filePath, fileId, pageSize, numPages, tempHeaderVec);
363  return tempHeaderVec;
364  }));
365  fileCount++;
366  if (fileCount % threadCount) {
367  processFileFutures(file_futures, headerVec);
368  }
369  }
370  }
371  }
372 
373  if (file_futures.size() > 0) {
374  processFileFutures(file_futures, headerVec);
375  }
376 
377  /* Sort headerVec so that all HeaderInfos
378  * from a chunk will be grouped together
379  * and in order of increasing PageId
380  * - Version Epoch */
381 
382  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
383 
384  /* Goal of next section is to find sequences in the
385  * sorted headerVec of the same ChunkId, which we
386  * can then initiate a FileBuffer with */
387 
388  if (headerVec.size() > 0) {
389  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
390  auto startIt = headerVec.begin();
391 
392  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
393  ++headerIt) {
394  if (headerIt->chunkKey != lastChunkKey) {
395  FileMgr* c_fm_ =
396  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
397  CHECK(c_fm_);
398  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerIt);
399  chunkIndex_[lastChunkKey] = srcBuf;
400  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
401  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
402  destBuf->syncEncoder(srcBuf);
403  destBuf->setSize(srcBuf->size());
404  destBuf->setDirty(); // this needs to be set to force writing out metadata
405  // files from "checkpoint()" call
406 
407  size_t totalNumPages = srcBuf->getMultiPage().size();
408  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
409  Page srcPage = srcBuf->getMultiPage()[pageNum].current();
410  Page destPage = c_fm_->requestFreePage(
411  srcBuf->pageSize(),
412  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
413  MultiPage multiPage(srcBuf->pageSize());
414  multiPage.epochs.push_back(converted_data_epoch);
415  multiPage.pageVersions.push_back(destPage);
416  destBuf->multiPages_.push_back(multiPage);
417  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
418  copyPage(
419  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
420  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
421  }
422  lastChunkKey = headerIt->chunkKey;
423  startIt = headerIt;
424  }
425  }
426 
427  // now need to insert last Chunk
428  FileMgr* c_fm_ =
429  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
430  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerVec.end());
431  chunkIndex_[lastChunkKey] = srcBuf;
432  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
433  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
434  destBuf->syncEncoder(srcBuf);
435  destBuf->setSize(srcBuf->size());
436  destBuf->setDirty(); // this needs to be set to write out metadata file from the
437  // "checkpoint()" call
438 
439  size_t totalNumPages = srcBuf->getMultiPage().size();
440  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
441  Page srcPage = srcBuf->getMultiPage()[pageNum].current();
442  Page destPage = c_fm_->requestFreePage(
443  srcBuf->pageSize(),
444  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
445  MultiPage multiPage(srcBuf->pageSize());
446  multiPage.epochs.push_back(converted_data_epoch);
447  multiPage.pageVersions.push_back(destPage);
448  destBuf->multiPages_.push_back(multiPage);
449  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
450  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
451  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
452  }
453  }
454  nextFileId_ = maxFileId + 1;
455  } else {
456  if (!boost::filesystem::create_directory(path)) {
457  LOG(FATAL) << "Specified path does not exist: " << path;
458  }
459  }
460 }
461 
// NOTE(review): the signature line of this definition does not appear in this
// listing (the body below begins mid-definition) — restore from the real source.
  // Close (but do not free) every data file's FILE* handle.
  for (auto file_info : files_) {
    if (file_info->f) {
      close(file_info->f);
      file_info->f = nullptr;
    }
  }

  // Close the epoch file as well.
  if (epochFile_) {
    close(epochFile_);
    epochFile_ = nullptr;
  }

  /* rename for later deletion the directory containing table related data */
  // NOTE(review): the statement implementing the rename described above is missing
  // from this listing — confirm against the real file.
}
478 
479 void FileMgr::copyPage(Page& srcPage,
480  FileMgr* destFileMgr,
481  Page& destPage,
482  const size_t reservedHeaderSize,
483  const size_t numBytes,
484  const size_t offset) {
485  CHECK(offset + numBytes <= defaultPageSize_);
486  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
487  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
488  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
489 
490  size_t bytesRead = srcFileInfo->read(
491  srcPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize, numBytes, buffer);
492  CHECK(bytesRead == numBytes);
493  size_t bytesWritten = destFileInfo->write(
494  destPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize,
495  numBytes,
496  buffer);
497  CHECK(bytesWritten == numBytes);
498  ::free(buffer);
499 }
500 
501 void FileMgr::createEpochFile(const std::string& epochFileName) {
502  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
503  if (boost::filesystem::exists(epochFilePath)) {
504  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
505  }
506  epochFile_ = create(epochFilePath, sizeof(int));
507  // Write out current epoch to file - which if this
508  // function is being called should be 0
509  write(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
510  epoch_++;
511 }
512 
513 void FileMgr::openEpochFile(const std::string& epochFileName) {
514  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
515  if (!boost::filesystem::exists(epochFilePath)) {
516  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
517  }
518  if (!boost::filesystem::is_regular_file(epochFilePath)) {
519  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
520  }
521  if (boost::filesystem::file_size(epochFilePath) < 4) {
522  LOG(FATAL) << "Epoch file `" << epochFilePath
523  << "` is not sized properly (current size: "
524  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
525  }
526  epochFile_ = open(epochFilePath);
527  read(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
528  // std::cout << "Epoch after open file: " << epoch_ << std::endl;
529  epoch_++; // we are in new epoch from last checkpoint
530 }
531 
// NOTE(review): the signature line of this definition does not appear in this
// listing (the body below begins mid-definition) — restore from the real source.
  // Overwrite the stored epoch with the current one and push it to stable storage.
  // Any failure is fatal. Afterwards the in-memory epoch advances by one.
  write(epochFile_, 0, sizeof(int), (int8_t*)&epoch_);
  // Flush stdio's user-space buffer to the kernel first...
  int status = fflush(epochFile_);
  // int status = fcntl(fileno(epochFile_),51);
  if (status != 0) {
    LOG(FATAL) << "Could not flush epoch file to disk";
  }
  // ...then ask the OS to commit it to the device. On Apple platforms fcntl command
  // 51 is used instead of fsync (presumably F_FULLFSYNC — confirm).
#ifdef __APPLE__
  status = fcntl(fileno(epochFile_), 51);
#else
  status = fsync(fileno(epochFile_));
#endif
  if (status != 0) {
    LOG(FATAL) << "Could not sync epoch file to disk";
  }
  ++epoch_;
}
549 
550 void FileMgr::createDBMetaFile(const std::string& DBMetaFileName) {
551  std::string DBMetaFilePath(fileMgrBasePath_ + "/" + DBMetaFileName);
552  if (boost::filesystem::exists(DBMetaFilePath)) {
553  LOG(FATAL) << "DB metadata file `" << DBMetaFilePath << "` already exists.";
554  }
555  DBMetaFile_ = create(DBMetaFilePath, sizeof(int));
556  int db_ver = getDBVersion();
557  write(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_ver);
558  // LOG(INFO) << "DB metadata file has been created.";
559 }
560 
561 bool FileMgr::openDBMetaFile(const std::string& DBMetaFileName) {
562  std::string DBMetaFilePath(fileMgrBasePath_ + "/" + DBMetaFileName);
563 
564  if (!boost::filesystem::exists(DBMetaFilePath)) {
565  // LOG(INFO) << "DB metadata file does not exist, one will be created.";
566  return false;
567  }
568  if (!boost::filesystem::is_regular_file(DBMetaFilePath)) {
569  // LOG(INFO) << "DB metadata file is not a regular file, one will be created.";
570  return false;
571  }
572  if (boost::filesystem::file_size(DBMetaFilePath) < 4) {
573  // LOG(INFO) << "DB metadata file is not sized properly, one will be created.";
574  return false;
575  }
576  DBMetaFile_ = open(DBMetaFilePath);
577  read(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_version_);
578 
579  return true;
580 }
581 
// NOTE(review): the signature line of this definition does not appear in this
// listing (the body below begins mid-definition) — restore from the real source.
  // Rewrite the DB metadata file with the current getDBVersion() value.
  int db_ver = getDBVersion();
  write(DBMetaFile_, 0, sizeof(int), (int8_t*)&db_ver);
  // NOTE(review): only fflush() is called here, which hands the data to the OS but
  // does not force it to the device. The epoch file path additionally calls
  // fsync/fcntl, so the "sync" wording of the message below may overstate what is
  // done — confirm whether a device sync is intended.
  int status = fflush(DBMetaFile_);
  if (status != 0) {
    LOG(FATAL) << "Could not sync DB metadata file to disk";
  }
}
590 
// NOTE(review): the signature line of this definition does not appear in this
// listing (the body below begins mid-definition) — restore from the real source.
  // Checkpoint the current epoch_ in three steps:
  //  1. write metadata for every dirty FileBuffer;
  //  2. sync every data file to disk;
  //  3. apply the page frees that were deferred into free_pages.
  VLOG(2) << "Checkpointing epoch: " << epoch_;
  // Step 1 runs under the exclusive chunk-index lock.
  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
    if (chunkIt->second->is_dirty_) {
      chunkIt->second->writeMetadata(epoch_);
      chunkIt->second->clearDirtyBits();
    }
  }
  chunkIndexWriteLock.unlock();

  // Step 2 only needs shared access to files_.
  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
  for (auto fileIt = files_.begin(); fileIt != files_.end(); ++fileIt) {
    int status = (*fileIt)->syncToDisk();
    if (status != 0) {
      LOG(FATAL) << "Could not sync file to disk";
    }
  }

  // NOTE(review): a statement appears to be missing here in this listing (presumably
  // the epoch-file write/sync that finalizes the checkpoint) — confirm.

  // Step 3: pages freed since the last checkpoint are released only now.
  mapd_unique_lock<mapd_shared_mutex> freePagesWriteLock(mutex_free_page);
  for (auto& free_page : free_pages) {
    free_page.first->freePageDeferred(free_page.second);
  }
  free_pages.clear();
}
624 
// NOTE(review): the opening signature line of this definition does not appear in
// this listing (the parameter list below begins mid-declaration) — restore from
// the real source.
    const size_t pageSize,
    const size_t numBytes) {
  // Locking wrapper: takes the exclusive chunk-index lock and delegates to
  // createBufferUnlocked().
  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
  return createBufferUnlocked(key, pageSize, numBytes);
}
631 
// The underlying implementation of createBuffer needs to be lockless since
// some of the codepaths that call it will have already obtained a write lock
// and should not release it until they are complete.
// NOTE(review): the opening signature line of this definition does not appear in
// this listing (the parameter list below begins mid-declaration) — restore from
// the real source.
    const size_t pageSize,
    const size_t numBytes) {
  // Registers a new FileBuffer for key. A pageSize of 0 selects defaultPageSize_.
  // Creating a buffer for a key that already exists is fatal. Callers must already
  // hold chunkIndexMutex_ exclusively.
  size_t actualPageSize = pageSize;
  if (actualPageSize == 0) {
    actualPageSize = defaultPageSize_;
  }
  // NOTE(review): one line is missing here in this listing — confirm.
  // we will do this lazily and not allocate space for the Chunk (i.e.
  // FileBuffer yet)

  if (chunkIndex_.find(key) != chunkIndex_.end()) {
    LOG(FATAL) << "Chunk already exists for key: " << showChunk(key);
  }
  chunkIndex_[key] = new FileBuffer(this, actualPageSize, key, numBytes);
  return (chunkIndex_[key]);
}
652 
// NOTE(review): the signature line of this definition does not appear in this
// listing (the body below begins mid-definition) — restore from the real source.
  // True when a FileBuffer is registered for key (checked under a shared lock).
  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
  return chunkIndex_.find(key) != chunkIndex_.end();
}
657 
658 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
659  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
660  auto chunkIt = chunkIndex_.find(key);
661  // ensure the Chunk exists
662  if (chunkIt == chunkIndex_.end()) {
663  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
664  }
665  chunkIndexWriteLock.unlock();
666  // chunkIt->second->writeMetadata(-1); // writes -1 as epoch - signifies deleted
667  if (purge) {
668  chunkIt->second->freePages();
669  }
670  //@todo need a way to represent delete in non purge case
671  delete chunkIt->second;
672  chunkIndex_.erase(chunkIt);
673 }
674 
675 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
676  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
677  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
678  if (chunkIt == chunkIndex_.end()) {
679  return; // should we throw?
680  }
681  while (chunkIt != chunkIndex_.end() &&
682  std::search(chunkIt->first.begin(),
683  chunkIt->first.begin() + keyPrefix.size(),
684  keyPrefix.begin(),
685  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
686  /*
687  cout << "Freeing pages for chunk ";
688  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
689  std::cout << *vecIt << ",";
690  }
691  cout << endl;
692  */
693  if (purge) {
694  chunkIt->second->freePages();
695  }
696  //@todo need a way to represent delete in non purge case
697  delete chunkIt->second;
698  chunkIndex_.erase(chunkIt++);
699  }
700 }
701 
702 AbstractBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t numBytes) {
703  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
704  auto chunkIt = chunkIndex_.find(key);
705  if (chunkIt == chunkIndex_.end()) {
706  LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
707  }
708  return chunkIt->second;
709 }
710 
// NOTE(review): the opening signature line of this definition does not appear in
// this listing (the parameter list below begins mid-declaration) — restore from
// the real source.
    AbstractBuffer* destBuffer,
    const size_t numBytes) {
  // reads chunk specified by ChunkKey into AbstractBuffer provided by
  // destBuffer. A dirty destBuffer or a missing key is fatal.
  if (destBuffer->isDirty()) {
    LOG(FATAL)
        << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
        << showChunk(key);
  }
  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
  auto chunkIt = chunkIndex_.find(key);
  if (chunkIt == chunkIndex_.end()) {
    LOG(FATAL) << "Chunk does not exist for key: " << showChunk(key);
  }
  chunkIndexReadLock.unlock();
  // NOTE(review): chunkIt is used after the lock is released; a concurrent
  // deleteBuffer() of the same key would invalidate it.

  AbstractBuffer* chunk = chunkIt->second;
  // Bytes to fetch: numBytes when non-zero, otherwise the chunk's full size().
  // Requesting more than the chunk holds is fatal.
  size_t chunkSize = numBytes == 0 ? chunk->size() : numBytes;
  if (numBytes > 0 && numBytes > chunk->size()) {
    LOG(FATAL) << "Chunk retrieved for key `" << showChunk(key) << "` is smaller ("
               << chunk->size() << ") than number of bytes requested (" << numBytes
               << ")";
  }
  destBuffer->reserve(chunkSize);
  if (chunk->isUpdated()) {
    // Updated chunk: re-read everything from offset 0.
    chunk->read(destBuffer->getMemoryPtr(),
                chunkSize,
                0,
                destBuffer->getType(),
                destBuffer->getDeviceId());
  } else {
    // Otherwise only read the tail past what destBuffer already holds.
    // NOTE(review): assumes destBuffer->size() <= chunkSize; otherwise the unsigned
    // subtraction below wraps — confirm callers guarantee this.
    chunk->read(destBuffer->getMemoryPtr() + destBuffer->size(),
                chunkSize - destBuffer->size(),
                destBuffer->size(),
                destBuffer->getType(),
                destBuffer->getDeviceId());
  }
  destBuffer->setSize(chunkSize);
  destBuffer->syncEncoder(chunk);
}
755 
// NOTE(review): the opening signature line of this definition does not appear in
// this listing (the parameter list below begins mid-declaration) — restore from
// the real source.
    AbstractBuffer* srcBuffer,
    const size_t numBytes) {
  // Writes srcBuffer's contents into the FileBuffer for key. Updated buffers are
  // rewritten from offset 0; appended buffers only write the bytes past the chunk's
  // old size. Returns the chunk.
  // obtain a pointer to the Chunk
  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
  auto chunkIt = chunkIndex_.find(key);
  AbstractBuffer* chunk;
  if (chunkIt == chunkIndex_.end()) {
    // NOTE(review): this branch's body (which should create the missing chunk and
    // assign `chunk`) is missing from this listing; as listed, `chunk` would stay
    // uninitialized — confirm against the real file.
  } else {
    chunk = chunkIt->second;
  }
  chunkIndexWriteLock.unlock();
  size_t oldChunkSize = chunk->size();
  // write the buffer's data to the Chunk
  size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
  if (chunk->isDirty()) {
    // multiple appends are allowed,
    // but only single update is allowed
    if (srcBuffer->isUpdated() && chunk->isUpdated()) {
      LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
                    "for key: "
                 << showChunk(key);
    }
  }
  if (srcBuffer->isUpdated()) {
    // chunk size is not changed when fixed rows are updated or are marked as deleted.
    // but when rows are vacuumed or varlen rows are updated (new rows are appended),
    // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
    // For varlen update, it takes another route via fragmenter using disk-level buffer.
    if (0 == numBytes && !chunk->isDirty()) {
      chunk->setSize(newChunkSize);
    }
    //@todo use dirty flags to only flush pages of chunk that need to
    // be flushed
    chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
                 newChunkSize,
                 0,
                 srcBuffer->getType(),
                 srcBuffer->getDeviceId());
  } else if (srcBuffer->isAppended()) {
    // Appends must grow the chunk; only the new tail is written.
    CHECK_LT(oldChunkSize, newChunkSize);
    chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
                  newChunkSize - oldChunkSize,
                  srcBuffer->getType(),
                  srcBuffer->getDeviceId());
  } else {
    UNREACHABLE() << "putBuffer() expects a buffer marked is_updated or is_appended";
  }
  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
  //@todo commenting out line above will make sure this metadata is set
  // but will trigger error on fetch chunk
  srcBuffer->clearDirtyBits();
  chunk->syncEncoder(srcBuffer);
  return chunk;
}
813 
814 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
815  LOG(FATAL) << "Operation not supported";
816  return nullptr; // satisfy return-type warning
817 }
818 
// NOTE(review): the signature line of this definition does not appear in this
// listing (the body below begins mid-definition) — restore from the real source.
  // Not supported by FileMgr: any call is fatal.
  LOG(FATAL) << "Operation not supported";
}
822 
823 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
824  std::lock_guard<std::mutex> lock(getPageMutex_);
825 
826  auto candidateFiles = fileIndex_.equal_range(pageSize);
827  int pageNum = -1;
828  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
829  FileInfo* fileInfo = files_[fileIt->second];
830  pageNum = fileInfo->getFreePage();
831  if (pageNum != -1) {
832  return (Page(fileInfo->fileId, pageNum));
833  }
834  }
835  // if here then we need to add a file
836  FileInfo* fileInfo;
837  if (isMetadata) {
838  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
839  } else {
840  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
841  }
842  pageNum = fileInfo->getFreePage();
843  CHECK(pageNum != -1);
844  return (Page(fileInfo->fileId, pageNum));
845 }
846 
847 void FileMgr::requestFreePages(size_t numPagesRequested,
848  size_t pageSize,
849  std::vector<Page>& pages,
850  const bool isMetadata) {
851  // not used currently
852  // @todo add method to FileInfo to get more than one page
853  std::lock_guard<std::mutex> lock(getPageMutex_);
854  auto candidateFiles = fileIndex_.equal_range(pageSize);
855  size_t numPagesNeeded = numPagesRequested;
856  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
857  FileInfo* fileInfo = files_[fileIt->second];
858  int pageNum;
859  do {
860  pageNum = fileInfo->getFreePage();
861  if (pageNum != -1) {
862  pages.emplace_back(fileInfo->fileId, pageNum);
863  numPagesNeeded--;
864  }
865  } while (pageNum != -1 && numPagesNeeded > 0);
866  if (numPagesNeeded == 0) {
867  break;
868  }
869  }
870  while (numPagesNeeded > 0) {
871  FileInfo* fileInfo;
872  if (isMetadata) {
873  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
874  } else {
875  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
876  }
877  int pageNum;
878  do {
879  pageNum = fileInfo->getFreePage();
880  if (pageNum != -1) {
881  pages.emplace_back(fileInfo->fileId, pageNum);
882  numPagesNeeded--;
883  }
884  } while (pageNum != -1 && numPagesNeeded > 0);
885  if (numPagesNeeded == 0) {
886  break;
887  }
888  }
889  CHECK(pages.size() == numPagesRequested);
890 }
891 
892 FileInfo* FileMgr::openExistingFile(const std::string& path,
893  const int fileId,
894  const size_t pageSize,
895  const size_t numPages,
896  std::vector<HeaderInfo>& headerVec) {
897  FILE* f = open(path);
898  FileInfo* fInfo = new FileInfo(
899  this, fileId, f, pageSize, numPages, false); // false means don't init file
900 
901  fInfo->openExistingFile(headerVec, epoch_);
902  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
903  if (fileId >= static_cast<int>(files_.size())) {
904  files_.resize(fileId + 1);
905  }
906  files_[fileId] = fInfo;
907  fileIndex_.insert(std::pair<size_t, int>(pageSize, fileId));
908  return fInfo;
909 }
910 
911 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
912  // check arguments
913  if (pageSize == 0 || numPages == 0) {
914  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
915  }
916 
917  // create the new file
918  FILE* f = create(fileMgrBasePath_,
919  nextFileId_,
920  pageSize,
921  numPages); // TM: not sure if I like naming scheme here - should be in
922  // separate namespace?
923  CHECK(f);
924 
925  // instantiate a new FileInfo for the newly created file
926  int fileId = nextFileId_++;
927  FileInfo* fInfo =
928  new FileInfo(this, fileId, f, pageSize, numPages, true); // true means init file
929  CHECK(fInfo);
930 
931  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
932  // update file manager data structures
933  files_.push_back(fInfo);
934  fileIndex_.insert(std::pair<size_t, int>(pageSize, fileId));
935 
936  CHECK(files_.back() == fInfo); // postcondition
937  return fInfo;
938 }
939 
940 FILE* FileMgr::getFileForFileId(const int fileId) {
941  CHECK(fileId >= 0 && static_cast<size_t>(fileId) < files_.size());
942  return files_[fileId]->f;
943 }
944 /*
945 void FileMgr::getAllChunkMetaInfo(std::vector<std::pair<ChunkKey, int64_t> > &metadata) {
946  metadata.reserve(chunkIndex_.size());
947  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
948  metadata.push_back(std::make_pair(chunkIt->first,
949 chunkIt->second->encoder->numElems));
950  }
951 }
952 */
954  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
955  chunkMetadataVec.reserve(chunkIndex_.size());
956  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
957  if (chunkIt->second->has_encoder) {
958  auto chunk_metadata = std::make_shared<ChunkMetadata>();
959  chunkIt->second->encoder->getMetadata(chunk_metadata);
960  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
961  }
962  }
963 }
964 
966  const ChunkKey& keyPrefix) {
967  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(
968  chunkIndexMutex_); // is this guarding the right structure? it look slike we oly
969  // read here for chunk
970  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
971  if (chunkIt == chunkIndex_.end()) {
972  return; // throw?
973  }
974  while (chunkIt != chunkIndex_.end() &&
975  std::search(chunkIt->first.begin(),
976  chunkIt->first.begin() + keyPrefix.size(),
977  keyPrefix.begin(),
978  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
979  /*
980  for (auto vecIt = chunkIt->first.begin(); vecIt != chunkIt->first.end(); ++vecIt) {
981  std::cout << *vecIt << ",";
982  }
983  cout << endl;
984  */
985  if (chunkIt->second->has_encoder) {
986  auto chunk_metadata = std::make_shared<ChunkMetadata>();
987  chunkIt->second->encoder->getMetadata(chunk_metadata);
988  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
989  }
990  chunkIt++;
991  }
992 }
993 
995  return gfm_->getDBVersion();
996 }
997 
998 bool FileMgr::getDBConvert() const {
999  return gfm_->getDBConvert();
1000 }
1001 
1004  if (db_version_ > getDBVersion()) {
1005  LOG(FATAL) << "DB forward compatibility is not supported. Version of OmniSci "
1006  "software used is older than the version of DB being read: "
1007  << db_version_;
1008  }
1009  } else {
1011  }
1012 }
1013 
1015  epoch_ = epoch;
1017 }
1018 
1019 void FileMgr::free_page(std::pair<FileInfo*, int>&& page) {
1020  std::unique_lock<mapd_shared_mutex> lock(mutex_free_page);
1021  free_pages.push_back(page);
1022 }
1023 
1024 void FileMgr::removeTableRelatedDS(const int db_id, const int table_id) {
1025  UNREACHABLE();
1026 }
1027 } // namespace File_Namespace
std::string getBasePath() const
#define CHECK_EQ(x, y)
Definition: Logger.h:205
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:132
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:814
FileInfo * getFileInfoForFileId(const int fileId)
Definition: FileMgr.h:155
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:501
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:823
size_t size() const override
Definition: FileBuffer.h:141
void syncEncoder(const AbstractBuffer *src_buffer)
void openEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:513
void removeTableRelatedDS(const int db_id, const int table_id) override
Definition: FileMgr.cpp:1024
void writeHeader(Page &page, const int pageId, const int epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:374
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:188
virtual size_t size() const =0
Stores Pair of ChunkKey and Page id and version, in a pair with a Page struct itself (File id and Pag...
Definition: Page.h:121
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:819
virtual MemoryLevel getType() const =0
FILE * getFileForFileId(const int fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:940
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:479
#define MAPD_FILE_EXT
Definition: File.h:25
#define UNREACHABLE()
Definition: Logger.h:241
int getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:994
std::vector< std::pair< FileInfo *, int > > free_pages
Definition: FileMgr.h:257
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:46
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:658
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:173
GlobalFileMgr * gfm_
Definition: FileMgr.h:237
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:252
bool getDBConvert() const
Definition: FileMgr.cpp:998
std::string showChunk(const ChunkKey &key)
Definition: types.h:62
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:55
void createDBMetaFile(const std::string &DBMetaFileName)
Definition: FileMgr.cpp:550
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:591
std::string fileMgrBasePath_
Definition: FileMgr.h:239
bool headerCompare(const HeaderInfo &firstElem, const HeaderInfo &secondElem)
Definition: FileMgr.cpp:50
std::string to_string(char const *&&v)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:39
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:711
AbstractBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:625
virtual void read(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int dst_device_id=-1)=0
FILE * epochFile_
the current epoch (time of last checkpoint)
Definition: FileMgr.h:247
int epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:202
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:59
void free_page(std::pair< FileInfo *, int > &&page)
Definition: FileMgr.cpp:1019
FileMgr(const int deviceId, GlobalFileMgr *gfm, const std::pair< const int, const int > fileMgrKey, const size_t num_reader_threads=0, const int epoch=-1, const size_t defaultPageSize=2097152)
Constructor.
Definition: FileMgr.cpp:68
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:702
std::vector< FileInfo * > files_
Definition: FileMgr.h:241
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:117
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:294
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:129
void init(const size_t num_reader_threads)
Definition: FileMgr.cpp:135
int epoch_
the index of the next file id
Definition: FileMgr.h:246
#define MAX_FILE_N_METADATA_PAGES
Definition: File.h:27
std::string getFileMgrBasePath() const
Definition: FileMgr.h:228
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:223
PageSizeFileMMap fileIndex_
A vector of files accessible via a file identifier.
Definition: FileMgr.h:242
#define DB_META_FILENAME
Definition: FileMgr.cpp:44
virtual void setSize(const size_t size)
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:243
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:653
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:532
AbstractBufferMgr * getFileMgr(const int db_id, const int tb_id)
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:139
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:675
size_t fileSize(FILE *f)
Returns the size of the specified file.
Definition: File.cpp:175
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:756
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:965
#define CHECK_LT(x, y)
Definition: Logger.h:207
bool openDBMetaFile(const std::string &DBMetaFileName)
Definition: FileMgr.cpp:561
#define MAX_FILE_N_PAGES
Definition: File.h:26
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:244
std::deque< int > epochs
Definition: Page.h:72
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:64
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:114
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
virtual int getDeviceId() const
void openExistingFile(std::vector< HeaderInfo > &headerVec, const int fileMgrEpoch)
Definition: FileInfo.cpp:69
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:87
void getChunkMetadataVec(ChunkMetadataVector &chunkMetadataVec) override
Definition: FileMgr.cpp:953
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:911
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:125
#define CHECK(condition)
Definition: Logger.h:197
virtual bool isAppended() const
AbstractBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:635
std::vector< int > ChunkKey
Definition: types.h:35
virtual bool isDirty() const
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:106
mapd_shared_mutex mutex_free_page
Definition: FileMgr.h:256
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:847
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata > >> ChunkMetadataVector
mapd_unique_lock< mapd_shared_mutex > write_lock
void setEpoch(int epoch)
Definition: FileMgr.cpp:1014
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:136
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
Definition: File.cpp:187
void writeAndSyncDBMetaToDisk()
Definition: FileMgr.cpp:582
virtual void reserve(size_t num_bytes)=0
#define EPOCH_FILENAME
Definition: FileMgr.cpp:43
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:253
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:254
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:69
A selection of helper methods for File I/O.
virtual bool isUpdated() const
std::pair< const int, const int > fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:238
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:40
FileInfo * openExistingFile(const std::string &path, const int fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:892