OmniSciDB  94e8789169
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <future>
28 #include <string>
29 #include <thread>
30 #include <utility>
31 #include <vector>
32 
33 #include <boost/filesystem.hpp>
34 #include <boost/lexical_cast.hpp>
35 #include <boost/system/error_code.hpp>
36 
38 #include "Shared/File.h"
39 #include "Shared/checked_alloc.h"
40 #include "Shared/measure.h"
41 
42 constexpr char LEGACY_EPOCH_FILENAME[] = "epoch";
43 constexpr char EPOCH_FILENAME[] = "epoch_metadata";
44 constexpr char DB_META_FILENAME[] = "dbmeta";
45 constexpr char FILE_MGR_VERSION_FILENAME[] = "filemgr_version";
46 
47 constexpr int32_t INVALID_VERSION = -1;
48 
49 using namespace std;
50 
51 namespace File_Namespace {
52 
53 bool headerCompare(const HeaderInfo& firstElem, const HeaderInfo& secondElem) {
54  // HeaderInfo.first is a pair of Chunk key with a vector containing
55  // pageId and version
56  if (firstElem.chunkKey != secondElem.chunkKey) {
57  return firstElem.chunkKey < secondElem.chunkKey;
58  }
59  if (firstElem.pageId != secondElem.pageId) {
60  return firstElem.pageId < secondElem.pageId;
61  }
62  return firstElem.versionEpoch < secondElem.versionEpoch;
63 }
64 
// Constructs a FileMgr for the table identified by fileMgrKey and runs the
// reader-thread flavour of init().
//
// deviceId           - forwarded to the AbstractBufferMgr base
// gfm                - owning GlobalFileMgr, stored in gfm_
// fileMgrKey         - (db_id, tb_id) pair, stored in fileMgrKey_
// maxRollbackEpochs  - stored in maxRollbackEpochs_
// num_reader_threads - forwarded to init()
// epoch              - forwarded to init() as its epoch override
// defaultPageSize    - stored in defaultPageSize_
//
// nextFileId_ starts at 0 before init() runs.
65 FileMgr::FileMgr(const int32_t deviceId,
66  GlobalFileMgr* gfm,
67  const std::pair<const int32_t, const int> fileMgrKey,
68  const int32_t maxRollbackEpochs,
69  const size_t num_reader_threads,
70  const int32_t epoch,
71  const size_t defaultPageSize)
72  : AbstractBufferMgr(deviceId)
73  , gfm_(gfm)
74  , fileMgrKey_(fileMgrKey)
75  , maxRollbackEpochs_(maxRollbackEpochs)
76  , defaultPageSize_(defaultPageSize)
77  , nextFileId_(0) {
78  init(num_reader_threads, epoch);
79 }
80 
81 // used only to initialize enough to drop
82 FileMgr::FileMgr(const int32_t deviceId,
83  GlobalFileMgr* gfm,
84  const std::pair<const int32_t, const int32_t> fileMgrKey,
85  const size_t defaultPageSize,
86  const bool runCoreInit)
87  : AbstractBufferMgr(deviceId)
88  , gfm_(gfm)
89  , fileMgrKey_(fileMgrKey)
90  , maxRollbackEpochs_(-1)
91  , defaultPageSize_(defaultPageSize)
92  , nextFileId_(0) {
93  const std::string fileMgrDirPrefix("table");
94  const std::string FileMgrDirDelim("_");
95  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
96  std::to_string(fileMgrKey_.first) + // db_id
97  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
98  epochFile_ = nullptr;
99  files_.clear();
100  if (runCoreInit) {
101  coreInit();
102  }
103 }
104 
// Constructs a FileMgr rooted directly at basePath and runs the path-based
// init() overload on that same path with an epoch override of -1.
//
// The base class is initialised with device id 0, the key is fixed to (0, 0)
// and maxRollbackEpochs_ is set to -1.
105 FileMgr::FileMgr(GlobalFileMgr* gfm, const size_t defaultPageSize, std::string basePath)
106  : AbstractBufferMgr(0)
107  , gfm_(gfm)
108  , fileMgrKey_(0, 0)
109  , maxRollbackEpochs_(-1)
110  , fileMgrBasePath_(basePath)
111  , defaultPageSize_(defaultPageSize)
112  , nextFileId_(0) {
113  init(basePath, -1);
114 }
115 
// Test-only constructor: initialises the base class with device id -1 and sets
// only the epoch ceiling. No other member is assigned here, and neither init()
// overload is called.
116 // For testing purposes only
117 FileMgr::FileMgr(const int epoch) : AbstractBufferMgr(-1) {
118  epoch_.ceiling(epoch);
119 }
120 
122  // free memory used by FileInfo objects
123  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
124  delete chunkIt->second;
125  }
126  for (auto file_info : files_) {
127  delete file_info;
128  }
129 
130  if (epochFile_) {
131  close(epochFile_);
132  epochFile_ = nullptr;
133  }
134 
135  if (DBMetaFile_) {
137  DBMetaFile_ = nullptr;
138  }
139 }
140 
142  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
143  const std::string fileMgrDirPrefix("table");
144  const std::string FileMgrDirDelim("_");
145  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
146  std::to_string(fileMgrKey_.first) + // db_id
147  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
148  boost::filesystem::path path(fileMgrBasePath_);
149  if (boost::filesystem::exists(path)) {
150  if (!boost::filesystem::is_directory(path)) {
151  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
152  << "' for table data is not a directory.";
153  }
156  return true;
157  }
158  return false;
159 }
160 
162  const boost::filesystem::directory_iterator& fileIterator) {
163  FileMetadata fileMetadata;
164  fileMetadata.is_data_file = false;
165  fileMetadata.file_path = fileIterator->path().string();
166  if (!boost::filesystem::is_regular_file(fileIterator->status())) {
167  return fileMetadata;
168  }
169  // note that boost::filesystem leaves preceding dot on
170  // extension - hence MAPD_FILE_EXT is ".mapd"
171  std::string extension(fileIterator->path().extension().string());
172  if (extension == MAPD_FILE_EXT) {
173  std::string fileStem(fileIterator->path().stem().string());
174  // remove trailing dot if any
175  if (fileStem.size() > 0 && fileStem.back() == '.') {
176  fileStem = fileStem.substr(0, fileStem.size() - 1);
177  }
178  size_t dotPos = fileStem.find_last_of("."); // should only be one
179  if (dotPos == std::string::npos) {
180  LOG(FATAL) << "File `" << fileIterator->path()
181  << "` does not carry page size information in the filename.";
182  }
183  fileMetadata.is_data_file = true;
184  fileMetadata.file_id = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
185  fileMetadata.page_size =
186  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
187 
188  fileMetadata.file_size = boost::filesystem::file_size(fileMetadata.file_path);
189  CHECK_EQ(fileMetadata.file_size % fileMetadata.page_size,
190  size_t(0)); // should be no partial pages
191  fileMetadata.num_pages = fileMetadata.file_size / fileMetadata.page_size;
192  }
193  return fileMetadata;
194 }
195 
196 void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) {
197  // if epochCeiling = -1 this means open from epoch file
198 
199  const bool dataExists = coreInit();
200  if (dataExists) {
201  if (epochOverride != -1) { // if opening at specified epoch
202  setEpoch(epochOverride);
203  }
204  auto clock_begin = timer_start();
205 
206  boost::filesystem::directory_iterator
207  endItr; // default construction yields past-the-end
208  int32_t maxFileId = -1;
209  int32_t fileCount = 0;
210  int32_t threadCount = std::thread::hardware_concurrency();
211  std::vector<HeaderInfo> headerVec;
212  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
213  boost::filesystem::path path(fileMgrBasePath_);
214  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
215  FileMetadata fileMetadata = getMetadataForFile(fileIt);
216  if (fileMetadata.is_data_file) {
217  maxFileId = std::max(maxFileId, fileMetadata.file_id);
218  file_futures.emplace_back(std::async(std::launch::async, [fileMetadata, this] {
219  std::vector<HeaderInfo> tempHeaderVec;
220  openExistingFile(fileMetadata.file_path,
221  fileMetadata.file_id,
222  fileMetadata.page_size,
223  fileMetadata.num_pages,
224  tempHeaderVec);
225  return tempHeaderVec;
226  }));
227  fileCount++;
228  if (fileCount % threadCount == 0) {
229  processFileFutures(file_futures, headerVec);
230  }
231  }
232  }
233 
234  if (file_futures.size() > 0) {
235  processFileFutures(file_futures, headerVec);
236  }
237  int64_t queue_time_ms = timer_stop(clock_begin);
238 
239  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : "
240  << queue_time_ms << "ms Epoch: " << epoch_.ceiling()
241  << " files read: " << fileCount << " table location: '" << fileMgrBasePath_
242  << "'";
243 
244  /* Sort headerVec so that all HeaderInfos
245  * from a chunk will be grouped together
246  * and in order of increasing PageId
247  * - Version Epoch */
248 
249  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
250 
251  /* Goal of next section is to find sequences in the
252  * sorted headerVec of the same ChunkId, which we
253  * can then initiate a FileBuffer with */
254 
255  VLOG(4) << "Number of Headers in Vector: " << headerVec.size();
256  if (headerVec.size() > 0) {
257  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
258  auto startIt = headerVec.begin();
259 
260  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
261  ++headerIt) {
262  if (headerIt->chunkKey != lastChunkKey) {
263  chunkIndex_[lastChunkKey] =
264  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerIt);
265  lastChunkKey = headerIt->chunkKey;
266  startIt = headerIt;
267  }
268  }
269  // now need to insert last Chunk
270  chunkIndex_[lastChunkKey] =
271  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerVec.end());
272  }
273  nextFileId_ = maxFileId + 1;
275  true /* shouldCheckpoint - only happens if data is rolled off */);
276  incrementEpoch();
277  mapd_unique_lock<mapd_shared_mutex> freePagesWriteLock(mutex_free_page_);
278  for (auto& free_page : free_pages_) {
279  free_page.first->freePageDeferred(free_page.second);
280  }
281  free_pages_.clear();
282  } else {
283  boost::filesystem::path path(fileMgrBasePath_);
284  if (!boost::filesystem::create_directory(path)) {
285  LOG(FATAL) << "Could not create data directory: " << path;
286  }
288  if (epochOverride != -1) {
289  epoch_.floor(epochOverride);
290  epoch_.ceiling(epochOverride);
291  } else {
292  // These are default constructor values for epoch_, but resetting here for clarity
293  epoch_.floor(0);
294  epoch_.ceiling(0);
295  }
296 
300  incrementEpoch();
301  }
302 
303  /* define number of reader threads to be used */
304  size_t num_hardware_based_threads =
305  std::thread::hardware_concurrency(); // # of threads is based on # of cores on the
306  // host
307  if (num_reader_threads == 0) { // # of threads has not been defined by user
308  num_reader_threads_ = num_hardware_based_threads;
309  } else {
310  if (num_reader_threads > num_hardware_based_threads) {
311  num_reader_threads_ = num_hardware_based_threads;
312  } else {
313  num_reader_threads_ = num_reader_threads;
314  }
315  }
316  isFullyInitted_ = true;
317 }
318 
320  StorageStats storage_stats;
321  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
322  if (!isFullyInitted_) {
323  CHECK(!fileMgrBasePath_.empty());
324  boost::filesystem::path path(fileMgrBasePath_);
325  if (boost::filesystem::exists(path)) {
326  if (!boost::filesystem::is_directory(path)) {
327  LOG(FATAL) << "getStorageStats: Specified path '" << fileMgrBasePath_
328  << "' for table data is not a directory.";
329  }
330 
331  storage_stats.epoch = lastCheckpointedEpoch();
332  storage_stats.epoch_floor = epochFloor();
333  boost::filesystem::directory_iterator
334  endItr; // default construction yields past-the-end
335  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
336  ++fileIt) {
337  FileMetadata file_metadata = getMetadataForFile(fileIt);
338  if (file_metadata.is_data_file) {
339  if (file_metadata.page_size == METADATA_PAGE_SIZE) {
340  storage_stats.metadata_file_count++;
341  storage_stats.total_metadata_file_size += file_metadata.file_size;
342  storage_stats.total_metadata_page_count += file_metadata.num_pages;
343  } else if (file_metadata.page_size == defaultPageSize_) {
344  storage_stats.data_file_count++;
345  storage_stats.total_data_file_size += file_metadata.file_size;
346  storage_stats.total_data_page_count += file_metadata.num_pages;
347  } else {
348  UNREACHABLE() << "Found file with unexpected page size. Page size: "
349  << file_metadata.page_size
350  << ", file path: " << file_metadata.file_path;
351  }
352  }
353  }
354  }
355  } else {
356  storage_stats.epoch = lastCheckpointedEpoch();
357  storage_stats.epoch_floor = epochFloor();
358 
359  // We already initialized this table so take the faster path of walking through the
360  // FileInfo objects and getting metadata from there
361  for (const auto& file_info : files_) {
362  if (file_info->pageSize == METADATA_PAGE_SIZE) {
363  storage_stats.metadata_file_count++;
364  storage_stats.total_metadata_file_size +=
365  file_info->pageSize * file_info->numPages;
366  storage_stats.total_metadata_page_count += file_info->numPages;
367  if (storage_stats.total_free_metadata_page_count) {
368  storage_stats.total_free_metadata_page_count.value() +=
369  file_info->freePages.size();
370  } else {
371  storage_stats.total_free_metadata_page_count = file_info->freePages.size();
372  }
373  } else if (file_info->pageSize == defaultPageSize_) {
374  storage_stats.data_file_count++;
375  storage_stats.total_data_file_size += file_info->pageSize * file_info->numPages;
376  storage_stats.total_data_page_count += file_info->numPages;
377  if (storage_stats.total_free_data_page_count) {
378  storage_stats.total_free_data_page_count.value() += file_info->freePages.size();
379  } else {
380  storage_stats.total_free_data_page_count = file_info->freePages.size();
381  }
382  } else {
383  UNREACHABLE() << "Found file with unexpected page size. Page size: "
384  << file_info->pageSize;
385  }
386  }
387  }
388  return storage_stats;
389 }
390 
392  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
393  std::vector<HeaderInfo>& headerVec) {
394  for (auto& file_future : file_futures) {
395  file_future.wait();
396  }
397  // concatenate the vectors after thread completes
398  for (auto& file_future : file_futures) {
399  auto tempHeaderVec = file_future.get();
400  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
401  }
402  file_futures.clear();
403 }
404 
405 void FileMgr::init(const std::string& dataPathToConvertFrom,
406  const int32_t epochOverride) {
407  int32_t converted_data_epoch = 0;
408  boost::filesystem::path path(dataPathToConvertFrom);
409  if (boost::filesystem::exists(path)) {
410  if (!boost::filesystem::is_directory(path)) {
411  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
412  }
414 
415  if (epochOverride != -1) { // if opening at previous epoch
416  setEpoch(epochOverride);
417  }
418 
419  boost::filesystem::directory_iterator
420  endItr; // default construction yields past-the-end
421  int32_t maxFileId = -1;
422  int32_t fileCount = 0;
423  int32_t threadCount = std::thread::hardware_concurrency();
424  std::vector<HeaderInfo> headerVec;
425  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
426  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
427  FileMetadata fileMetadata = getMetadataForFile(fileIt);
428  if (fileMetadata.is_data_file) {
429  maxFileId = std::max(maxFileId, fileMetadata.file_id);
430  file_futures.emplace_back(std::async(std::launch::async, [fileMetadata, this] {
431  std::vector<HeaderInfo> tempHeaderVec;
432  openExistingFile(fileMetadata.file_path,
433  fileMetadata.file_id,
434  fileMetadata.page_size,
435  fileMetadata.num_pages,
436  tempHeaderVec);
437  return tempHeaderVec;
438  }));
439  fileCount++;
440  if (fileCount % threadCount) {
441  processFileFutures(file_futures, headerVec);
442  }
443  }
444  }
445 
446  if (file_futures.size() > 0) {
447  processFileFutures(file_futures, headerVec);
448  }
449 
450  /* Sort headerVec so that all HeaderInfos
451  * from a chunk will be grouped together
452  * and in order of increasing PageId
453  * - Version Epoch */
454 
455  std::sort(headerVec.begin(), headerVec.end(), headerCompare);
456 
457  /* Goal of next section is to find sequences in the
458  * sorted headerVec of the same ChunkId, which we
459  * can then initiate a FileBuffer with */
460 
461  if (headerVec.size() > 0) {
462  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
463  auto startIt = headerVec.begin();
464 
465  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
466  ++headerIt) {
467  if (headerIt->chunkKey != lastChunkKey) {
468  FileMgr* c_fm_ =
469  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
470  CHECK(c_fm_);
471  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerIt);
472  chunkIndex_[lastChunkKey] = srcBuf;
473  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
474  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
475  destBuf->syncEncoder(srcBuf);
476  destBuf->setSize(srcBuf->size());
477  destBuf->setDirty(); // this needs to be set to force writing out metadata
478  // files from "checkpoint()" call
479 
480  size_t totalNumPages = srcBuf->getMultiPage().size();
481  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
482  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
483  Page destPage = c_fm_->requestFreePage(
484  srcBuf->pageSize(),
485  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
486  MultiPage multiPage(srcBuf->pageSize());
487  multiPage.push(destPage, converted_data_epoch);
488  destBuf->multiPages_.push_back(multiPage);
489  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
490  copyPage(
491  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
492  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
493  }
494  lastChunkKey = headerIt->chunkKey;
495  startIt = headerIt;
496  }
497  }
498 
499  // now need to insert last Chunk
500  FileMgr* c_fm_ =
501  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
502  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerVec.end());
503  chunkIndex_[lastChunkKey] = srcBuf;
504  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
505  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
506  destBuf->syncEncoder(srcBuf);
507  destBuf->setSize(srcBuf->size());
508  destBuf->setDirty(); // this needs to be set to write out metadata file from the
509  // "checkpoint()" call
510 
511  size_t totalNumPages = srcBuf->getMultiPage().size();
512  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
513  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
514  Page destPage = c_fm_->requestFreePage(
515  srcBuf->pageSize(),
516  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
517  MultiPage multiPage(srcBuf->pageSize());
518  multiPage.push(destPage, converted_data_epoch);
519  destBuf->multiPages_.push_back(multiPage);
520  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
521  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
522  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
523  }
524  }
525  nextFileId_ = maxFileId + 1;
526  } else {
527  if (!boost::filesystem::create_directory(path)) {
528  LOG(FATAL) << "Specified path does not exist: " << path;
529  }
530  }
531  isFullyInitted_ = true;
532 }
533 
535  for (auto file_info : files_) {
536  if (file_info->f) {
537  close(file_info->f);
538  file_info->f = nullptr;
539  }
540  }
541 
542  if (DBMetaFile_) {
544  DBMetaFile_ = nullptr;
545  }
546 
547  if (epochFile_) {
548  close(epochFile_);
549  epochFile_ = nullptr;
550  }
551 
552  /* rename for later deletion the directory containing table related data */
554 }
555 
556 void FileMgr::copyPage(Page& srcPage,
557  FileMgr* destFileMgr,
558  Page& destPage,
559  const size_t reservedHeaderSize,
560  const size_t numBytes,
561  const size_t offset) {
562  CHECK(offset + numBytes <= defaultPageSize_);
563  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
564  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
565  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
566 
567  size_t bytesRead = srcFileInfo->read(
568  srcPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize, numBytes, buffer);
569  CHECK(bytesRead == numBytes);
570  size_t bytesWritten = destFileInfo->write(
571  destPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize,
572  numBytes,
573  buffer);
574  CHECK(bytesWritten == numBytes);
575  ::free(buffer);
576 }
577 
578 void FileMgr::createEpochFile(const std::string& epochFileName) {
579  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
580  if (boost::filesystem::exists(epochFilePath)) {
581  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
582  }
583  epochFile_ = create(epochFilePath, sizeof(Epoch::byte_size()));
584  // Write out current epoch to file - which if this
585  // function is being called should be 0
587 }
588 
589 int32_t FileMgr::openAndReadLegacyEpochFile(const std::string& epochFileName) {
590  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
591  if (!boost::filesystem::exists(epochFilePath)) {
592  return 0;
593  }
594 
595  if (!boost::filesystem::is_regular_file(epochFilePath)) {
596  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
597  }
598  if (boost::filesystem::file_size(epochFilePath) < 4) {
599  LOG(FATAL) << "Epoch file `" << epochFilePath
600  << "` is not sized properly (current size: "
601  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
602  }
603  FILE* legacyEpochFile = open(epochFilePath);
604  int32_t epoch;
605  read(legacyEpochFile, 0, sizeof(int32_t), (int8_t*)&epoch);
606  return epoch;
607 }
608 
// Opens the epoch file `epochFileName` located under fileMgrBasePath_ and stores
// the handle in epochFile_, unless epochFile_ is already set.
//
// Before opening, the file is validated; LOG(FATAL) is raised when it does not
// exist, is not a regular file, or is not exactly 16 bytes long.
// NOTE(review): the literal 16 presumably matches Epoch::byte_size() - confirm
// and consider using that value here.
609 void FileMgr::openAndReadEpochFile(const std::string& epochFileName) {
610  if (!epochFile_) { // Check to see if already open
611  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
612  if (!boost::filesystem::exists(epochFilePath)) {
613  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
614  }
615  if (!boost::filesystem::is_regular_file(epochFilePath)) {
616  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
617  }
618  if (boost::filesystem::file_size(epochFilePath) != 16) {
619  LOG(FATAL) << "Epoch file `" << epochFilePath
620  << "` is not sized properly (current size: "
621  << boost::filesystem::file_size(epochFilePath) << ", expected size: 16)";
622  }
623  epochFile_ = open(epochFilePath);
624  }
626 }
627 
629  CHECK(epochFile_);
631  int32_t status = fflush(epochFile_);
632  if (status != 0) {
633  LOG(FATAL) << "Could not flush epoch file to disk";
634  }
635 #ifdef __APPLE__
636  status = fcntl(fileno(epochFile_), 51);
637 #else
638  status = omnisci::fsync(fileno(epochFile_));
639 #endif
640  if (status != 0) {
641  LOG(FATAL) << "Could not sync epoch file to disk";
642  }
643  epochIsCheckpointed_ = true;
644 }
645 
646 void FileMgr::freePagesBeforeEpoch(const int32_t minRollbackEpoch) {
647  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
648  chunkIt->second->freePagesBeforeEpoch(minRollbackEpoch);
649  }
650 }
651 
652 void FileMgr::rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint) {
653  if (maxRollbackEpochs_ >= 0) {
654  const int32_t minRollbackEpoch =
655  std::max(epochCeiling - maxRollbackEpochs_, epoch_.floor());
656  if (minRollbackEpoch > epoch_.floor()) {
657  freePagesBeforeEpoch(minRollbackEpoch);
658  epoch_.floor(minRollbackEpoch);
659  if (shouldCheckpoint) {
660  checkpoint();
661  }
662  }
663  }
664 }
665 
667  VLOG(2) << "Checkpointing table (" << fileMgrKey_.first << ", " << fileMgrKey_.second
668  << " epoch: " << epoch();
669  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
670  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
671  if (chunkIt->second->isDirty()) {
672  chunkIt->second->writeMetadata(epoch());
673  chunkIt->second->clearDirtyBits();
674  }
675  }
676  chunkIndexWriteLock.unlock();
677 
678  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
679  for (auto fileIt = files_.begin(); fileIt != files_.end(); ++fileIt) {
680  int32_t status = (*fileIt)->syncToDisk();
681  if (status != 0) {
682  LOG(FATAL) << "Could not sync file to disk";
683  }
684  }
685 
687  incrementEpoch();
688  rollOffOldData(lastCheckpointedEpoch(), false /* shouldCheckpoint */);
689 
690  mapd_unique_lock<mapd_shared_mutex> freePagesWriteLock(mutex_free_page_);
691  for (auto& free_page : free_pages_) {
692  free_page.first->freePageDeferred(free_page.second);
693  }
694  free_pages_.clear();
695 }
696 
698  const size_t pageSize,
699  const size_t numBytes) {
700  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
701  return createBufferUnlocked(key, pageSize, numBytes);
702 }
703 
704 // The underlying implementation of createBuffer needs to be lockless since
705 // some of the codepaths that call it will have already obtained a write lock
706 // and should not release it until they are complete.
708  const size_t pageSize,
709  const size_t numBytes) {
710  size_t actualPageSize = pageSize;
711  if (actualPageSize == 0) {
712  actualPageSize = defaultPageSize_;
713  }
715  // we will do this lazily and not allocate space for the Chunk (i.e.
716  // FileBuffer yet)
717 
718  if (chunkIndex_.find(key) != chunkIndex_.end()) {
719  LOG(FATAL) << "Chunk already exists for key: " << show_chunk(key);
720  }
721  chunkIndex_[key] = new FileBuffer(this, actualPageSize, key, numBytes);
722  return (chunkIndex_[key]);
723 }
724 
726  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
727  return chunkIndex_.find(key) != chunkIndex_.end();
728 }
729 
730 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
731  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
732  auto chunkIt = chunkIndex_.find(key);
733  // ensure the Chunk exists
734  if (chunkIt == chunkIndex_.end()) {
735  LOG(FATAL) << "Chunk does not exist for key: " << show_chunk(key);
736  }
737  chunkIndexWriteLock.unlock();
738  // chunkIt->second->writeMetadata(-1); // writes -1 as epoch - signifies deleted
739  if (purge) {
740  chunkIt->second->freePages();
741  }
742  //@todo need a way to represent delete in non purge case
743  delete chunkIt->second;
744  chunkIndex_.erase(chunkIt);
745 }
746 
747 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
748  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
749  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
750  if (chunkIt == chunkIndex_.end()) {
751  return; // should we throw?
752  }
753  while (chunkIt != chunkIndex_.end() &&
754  std::search(chunkIt->first.begin(),
755  chunkIt->first.begin() + keyPrefix.size(),
756  keyPrefix.begin(),
757  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
758  if (purge) {
759  chunkIt->second->freePages();
760  }
761  //@todo need a way to represent delete in non purge case
762  delete chunkIt->second;
763  chunkIndex_.erase(chunkIt++);
764  }
765 }
766 
// Returns the FileBuffer registered under `key`.
//
// The lookup is done under a shared (read) lock on chunkIndexMutex_; the lock is
// released when the function returns. CHECK-fails if no buffer exists for `key`.
// `numBytes` is not used by this implementation.
767 FileBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t numBytes) {
768  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
769  auto chunkIt = chunkIndex_.find(key);
770  CHECK(chunkIt != chunkIndex_.end())
771  << "Chunk does not exist for key: " << show_chunk(key);
772  return chunkIt->second;
773 }
774 
776  AbstractBuffer* destBuffer,
777  const size_t numBytes) {
778  // reads chunk specified by ChunkKey into AbstractBuffer provided by
779  // destBuffer
780  if (destBuffer->isDirty()) {
781  LOG(FATAL)
782  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
783  << show_chunk(key);
784  }
785  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
786  auto chunkIt = chunkIndex_.find(key);
787  if (chunkIt == chunkIndex_.end()) {
788  LOG(FATAL) << "Chunk does not exist for key: " << show_chunk(key);
789  }
790  chunkIndexReadLock.unlock();
791 
792  AbstractBuffer* chunk = chunkIt->second;
793  // chunk's size is either specified in function call with numBytes or we
794  // just look at pageSize * numPages in FileBuffer
795  if (numBytes > 0 && numBytes > chunk->size()) {
796  LOG(FATAL) << "Chunk retrieved for key `" << show_chunk(key) << "` is smaller ("
797  << chunk->size() << ") than number of bytes requested (" << numBytes
798  << ")";
799  }
800 
801  chunk->copyTo(destBuffer, numBytes);
802 }
803 
805  AbstractBuffer* srcBuffer,
806  const size_t numBytes) {
807  // obtain a pointer to the Chunk
808  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
809  auto chunkIt = chunkIndex_.find(key);
810  FileBuffer* chunk;
811  if (chunkIt == chunkIndex_.end()) {
813  } else {
814  chunk = chunkIt->second;
815  }
816  chunkIndexWriteLock.unlock();
817  size_t oldChunkSize = chunk->size();
818  // write the buffer's data to the Chunk
819  // size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
820  size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
821  if (chunk->isDirty()) {
822  // multiple appends are allowed,
823  // but only single update is allowed
824  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
825  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
826  "for key: "
827  << show_chunk(key);
828  }
829  }
830  if (srcBuffer->isUpdated()) {
831  // chunk size is not changed when fixed rows are updated or are marked as deleted.
832  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
833  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
834  // For varlen update, it takes another route via fragmenter using disk-level buffer.
835  if (0 == numBytes && !chunk->isDirty()) {
836  chunk->setSize(newChunkSize);
837  }
838  //@todo use dirty flags to only flush pages of chunk that need to
839  // be flushed
840  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
841  newChunkSize,
842  0,
843  srcBuffer->getType(),
844  srcBuffer->getDeviceId());
845  } else if (srcBuffer->isAppended()) {
846  CHECK_LT(oldChunkSize, newChunkSize);
847  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
848  newChunkSize - oldChunkSize,
849  srcBuffer->getType(),
850  srcBuffer->getDeviceId());
851  } else {
852  UNREACHABLE() << "putBuffer() expects a buffer marked is_updated or is_appended";
853  }
854  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
855  //@todo commenting out line above will make sure this metadata is set
856  // but will trigger error on fetch chunk
857  srcBuffer->clearDirtyBits();
858  chunk->syncEncoder(srcBuffer);
859  return chunk;
860 }
861 
// Unsupported operation for FileMgr: always logs FATAL. `numBytes` is ignored.
// The nullptr return only exists so the function has a return statement.
862 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
863  LOG(FATAL) << "Operation not supported";
864  return nullptr; // satisfy return-type warning
865 }
866 
868  LOG(FATAL) << "Operation not supported";
869 }
870 
871 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
872  std::lock_guard<std::mutex> lock(getPageMutex_);
873 
874  auto candidateFiles = fileIndex_.equal_range(pageSize);
875  int32_t pageNum = -1;
876  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
877  FileInfo* fileInfo = files_[fileIt->second];
878  pageNum = fileInfo->getFreePage();
879  if (pageNum != -1) {
880  return (Page(fileInfo->fileId, pageNum));
881  }
882  }
883  // if here then we need to add a file
884  FileInfo* fileInfo;
885  if (isMetadata) {
886  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
887  } else {
888  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
889  }
890  pageNum = fileInfo->getFreePage();
891  CHECK(pageNum != -1);
892  return (Page(fileInfo->fileId, pageNum));
893 }
894 
895 void FileMgr::requestFreePages(size_t numPagesRequested,
896  size_t pageSize,
897  std::vector<Page>& pages,
898  const bool isMetadata) {
899  // not used currently
900  // @todo add method to FileInfo to get more than one page
901  std::lock_guard<std::mutex> lock(getPageMutex_);
902  auto candidateFiles = fileIndex_.equal_range(pageSize);
903  size_t numPagesNeeded = numPagesRequested;
904  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
905  FileInfo* fileInfo = files_[fileIt->second];
906  int32_t pageNum;
907  do {
908  pageNum = fileInfo->getFreePage();
909  if (pageNum != -1) {
910  pages.emplace_back(fileInfo->fileId, pageNum);
911  numPagesNeeded--;
912  }
913  } while (pageNum != -1 && numPagesNeeded > 0);
914  if (numPagesNeeded == 0) {
915  break;
916  }
917  }
918  while (numPagesNeeded > 0) {
919  FileInfo* fileInfo;
920  if (isMetadata) {
921  fileInfo = createFile(pageSize, MAX_FILE_N_METADATA_PAGES);
922  } else {
923  fileInfo = createFile(pageSize, MAX_FILE_N_PAGES);
924  }
925  int32_t pageNum;
926  do {
927  pageNum = fileInfo->getFreePage();
928  if (pageNum != -1) {
929  pages.emplace_back(fileInfo->fileId, pageNum);
930  numPagesNeeded--;
931  }
932  } while (pageNum != -1 && numPagesNeeded > 0);
933  if (numPagesNeeded == 0) {
934  break;
935  }
936  }
937  CHECK(pages.size() == numPagesRequested);
938 }
939 
940 FileInfo* FileMgr::openExistingFile(const std::string& path,
941  const int fileId,
942  const size_t pageSize,
943  const size_t numPages,
944  std::vector<HeaderInfo>& headerVec) {
945  FILE* f = open(path);
946  FileInfo* fInfo = new FileInfo(
947  this, fileId, f, pageSize, numPages, false); // false means don't init file
948 
949  fInfo->openExistingFile(headerVec, epoch());
950  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
951  if (fileId >= static_cast<int32_t>(files_.size())) {
952  files_.resize(fileId + 1);
953  }
954  files_[fileId] = fInfo;
955  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
956  return fInfo;
957 }
958 
959 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
960  // check arguments
961  if (pageSize == 0 || numPages == 0) {
962  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
963  }
964 
965  // create the new file
966  FILE* f = create(fileMgrBasePath_,
967  nextFileId_,
968  pageSize,
969  numPages); // TM: not sure if I like naming scheme here - should be in
970  // separate namespace?
971  CHECK(f);
972 
973  // instantiate a new FileInfo for the newly created file
974  int32_t fileId = nextFileId_++;
975  FileInfo* fInfo =
976  new FileInfo(this, fileId, f, pageSize, numPages, true); // true means init file
977  CHECK(fInfo);
978 
979  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
980  // update file manager data structures
981  files_.push_back(fInfo);
982  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
983 
984  CHECK(files_.back() == fInfo); // postcondition
985  return fInfo;
986 }
987 
988 FILE* FileMgr::getFileForFileId(const int32_t fileId) {
989  CHECK(fileId >= 0 && static_cast<size_t>(fileId) < files_.size());
990  return files_[fileId]->f;
991 }
992 
994  const ChunkKey& keyPrefix) {
995  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(
996  chunkIndexMutex_); // is this guarding the right structure? it look slike we oly
997  // read here for chunk
998  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
999  if (chunkIt == chunkIndex_.end()) {
1000  return; // throw?
1001  }
1002  while (chunkIt != chunkIndex_.end() &&
1003  std::search(chunkIt->first.begin(),
1004  chunkIt->first.begin() + keyPrefix.size(),
1005  keyPrefix.begin(),
1006  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
1007  if (chunkIt->second->hasEncoder()) {
1008  auto chunk_metadata = std::make_shared<ChunkMetadata>();
1009  chunkIt->second->encoder_->getMetadata(chunk_metadata);
1010  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
1011  }
1012  chunkIt++;
1013  }
1014 }
1015 
1017  size_t num_used_pages = 0;
1018  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
1019  for (const auto file : files_) {
1020  num_used_pages += (file->numPages - file->freePages.size());
1021  }
1022  return num_used_pages;
1023 }
1024 
1026  size_t num_used_metadata_pages = 0;
1027  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
1028  for (const auto& chunkIt : chunkIndex_) {
1029  num_used_metadata_pages += chunkIt.second->numMetadataPages();
1030  }
1031  return num_used_metadata_pages;
1032 }
1033 
1035  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
1036  const auto& chunkIt = chunkIndex_.find(chunkKey);
1037  if (chunkIt != chunkIndex_.end()) {
1038  return chunkIt->second->numMetadataPages();
1039  } else {
1040  throw std::runtime_error("Chunk was not found.");
1041  }
1042 }
1043 
1044 int32_t FileMgr::getDBVersion() const {
1045  return gfm_->getDBVersion();
1046 }
1047 
1049  return gfm_->getDBConvert();
1050 }
1051 
1054 
1055  if (db_version_ > getDBVersion()) {
1056  LOG(FATAL) << "DB forward compatibility is not supported. Version of OmniSci "
1057  "software used is older than the version of DB being read: "
1058  << db_version_;
1059  }
1061  // new system, or we are moving forward versions
1062  // system wide migration would go here if required
1064  return;
1065  }
1066 }
1067 
1068 int32_t FileMgr::readVersionFromDisk(const std::string& versionFileName) const {
1069  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1070  if (!boost::filesystem::exists(versionFilePath)) {
1071  return -1;
1072  }
1073  if (!boost::filesystem::is_regular_file(versionFilePath)) {
1074  return -1;
1075  }
1076  if (boost::filesystem::file_size(versionFilePath) < 4) {
1077  return -1;
1078  }
1079  FILE* versionFile = open(versionFilePath);
1080  int32_t version;
1081  read(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1082  close(versionFile);
1083  return version;
1084 }
1085 
1086 void FileMgr::writeAndSyncVersionToDisk(const std::string& versionFileName,
1087  const int32_t version) {
1088  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1089  FILE* versionFile;
1090  if (boost::filesystem::exists(versionFilePath)) {
1091  int32_t oldVersion = readVersionFromDisk(versionFileName);
1092  LOG(INFO) << "Storage version file `" << versionFilePath
1093  << "` already exists, its current version is " << oldVersion;
1094  versionFile = open(versionFilePath);
1095  } else {
1096  versionFile = create(versionFilePath, sizeof(int32_t));
1097  }
1098  write(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1099  int32_t status = fflush(versionFile);
1100  if (status != 0) {
1101  LOG(FATAL) << "Could not flush version file " << versionFilePath << " to disk";
1102  }
1103 #ifdef __APPLE__
1104  status = fcntl(fileno(epochFile_), 51);
1105 #else
1106  status = omnisci::fsync(fileno(versionFile));
1107 #endif
1108  if (status != 0) {
1109  LOG(FATAL) << "Could not sync version file " << versionFilePath << " to disk";
1110  }
1111  close(versionFile);
1112 }
1113 
1115  const std::string versionFilePath(fileMgrBasePath_ + "/" + FILE_MGR_VERSION_FILENAME);
1116  LOG(INFO) << "Migrating file format version from 0 to 1 for `" << versionFilePath;
1121  int32_t migrationCompleteVersion = 1;
1122  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migrationCompleteVersion);
1123 }
1124 
1128  fileMgrVersion_ = 0;
1130  } else if (fileMgrVersion_ > latestFileMgrVersion_) {
1131  LOG(FATAL)
1132  << "Table storage forward compatibility is not supported. Version of OmniSci "
1133  "software used is older than the version of table being read: "
1134  << fileMgrVersion_;
1135  }
1136 
1139  switch (fileMgrVersion_) {
1140  case 0: {
1142  break;
1143  }
1144  default: {
1145  UNREACHABLE();
1146  }
1147  }
1148  fileMgrVersion_++;
1149  }
1150  }
1151 }
1152 
1153 /*
1154  * @brief sets the epoch to a user-specified value
1155  *
1156  * With the introduction of optional capped history on files, the possibility of
1157  * multiple successive rollbacks to earlier epochs means that we cannot rely on
1158  * the maxRollbackEpochs_ variable alone (initialized from a value stored in Catalog) to
1159  * guarantee that we can set an epoch at any given value. This function checks the
1160  * user-specified epoch value to ensure it is both not higher than the last checkpointed
1161  * epoch AND it is >= the epoch floor, which on an uncapped value will be the epoch the
1162  * table was created at (default 0), and for capped tables, the lowest epoch for
1163  * which we have complete data, now materialized in the epoch metadata itself. */
1164 
1165 void FileMgr::setEpoch(const int32_t newEpoch) {
1166  if (newEpoch < epoch_.floor()) {
1167  std::stringstream error_message;
1168  error_message << "Cannot set epoch for table (" << fileMgrKey_.first << ","
1169  << fileMgrKey_.second << ") lower than the minimum rollback epoch ("
1170  << epoch_.floor() << ").";
1171  throw std::runtime_error(error_message.str());
1172  }
1173  epoch_.ceiling(newEpoch);
1175 }
1176 
1177 void FileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
1178  std::unique_lock<mapd_shared_mutex> lock(mutex_free_page_);
1179  free_pages_.push_back(page);
1180 }
1181 
1182 void FileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t table_id) {
1183  UNREACHABLE();
1184 }
1185 
1186 uint64_t FileMgr::getTotalFileSize() const {
1187  uint64_t total_size = 0;
1188  for (const auto& file : files_) {
1189  total_size += file->size();
1190  }
1191  if (epochFile_) {
1192  total_size += fileSize(epochFile_);
1193  }
1194  if (DBMetaFile_) {
1195  total_size += fileSize(DBMetaFile_);
1196  }
1197  return total_size;
1198 }
1199 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:141
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:589
size_t getNumUsedPages() const
Definition: FileMgr.cpp:1016
int32_t readVersionFromDisk(const std::string &versionFileName) const
Definition: FileMgr.cpp:1068
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator)
Definition: FileMgr.cpp:161
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:862
std::string getBasePath() const
int8_t * storage_ptr()
Definition: Epoch.h:61
mapd_shared_mutex mutex_free_page_
Definition: FileMgr.h:342
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:578
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:871
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
Definition: FileMgr.cpp:1034
void syncEncoder(const AbstractBuffer *src_buffer)
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:697
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:188
uint64_t getTotalFileSize() const
Definition: FileMgr.cpp:1186
Stores Pair of ChunkKey and Page id and version, in a pair with a Page struct itself (File id and Pag...
Definition: Page.h:133
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:867
virtual MemoryLevel getType() const =0
void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Writes the contents of source (src) into new versions of the affected logical pages.
Definition: FileBuffer.cpp:516
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:556
void migrateToLatestFileMgrVersion()
Definition: FileMgr.cpp:1125
#define MAPD_FILE_EXT
Definition: File.h:25
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
Definition: FileMgr.cpp:652
#define UNREACHABLE()
Definition: Logger.h:241
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
void init(const size_t num_reader_threads, const int32_t epochOverride)
Definition: FileMgr.cpp:196
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:730
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:176
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:302
GlobalFileMgr * gfm_
Definition: FileMgr.h:317
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:338
int32_t floor() const
Definition: Epoch.h:43
int32_t ceiling() const
Definition: Epoch.h:44
std::optional< uint64_t > total_free_data_page_count
Definition: FileMgr.h:103
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:56
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:666
std::string fileMgrBasePath_
Definition: FileMgr.h:320
bool headerCompare(const HeaderInfo &firstElem, const HeaderInfo &secondElem)
Definition: FileMgr.cpp:53
std::string to_string(char const *&&v)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:40
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:775
std::string show_chunk(const ChunkKey &key)
Definition: types.h:73
FileMgr(const int32_t deviceId, GlobalFileMgr *gfm, const std::pair< const int32_t, const int32_t > fileMgrKey, const int32_t max_rollback_epochs=-1, const size_t num_reader_threads=0, const int32_t epoch=-1, const size_t defaultPageSize=DEFAULT_PAGE_SIZE)
Constructor.
int32_t epochFloor()
Definition: FileMgr.h:247
const int32_t latestFileMgrVersion_
Definition: FileMgr.h:335
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:60
std::vector< FileInfo * > files_
Definition: FileMgr.h:322
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
Definition: FileMgr.cpp:1086
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:118
int32_t incrementEpoch()
Definition: FileMgr.h:249
constexpr char LEGACY_EPOCH_FILENAME[]
Definition: FileMgr.cpp:42
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
Definition: FileMgr.cpp:1182
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:391
StorageStats getStorageStats()
Definition: FileMgr.cpp:319
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:44
std::optional< uint64_t > total_free_metadata_page_count
Definition: FileMgr.h:99
std::pair< const int32_t, const int32_t > fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:318
int32_t fileId
Definition: Page.h:47
int32_t getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:1044
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:131
#define MAX_FILE_N_METADATA_PAGES
Definition: File.h:27
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:297
PageSizeFileMMap fileIndex_
A vector of files accessible via a file identifier.
Definition: FileMgr.h:323
constexpr char EPOCH_FILENAME[]
Definition: FileMgr.cpp:43
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
uint64_t total_metadata_page_count
Definition: FileMgr.h:98
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:988
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
Definition: FileMgr.h:343
An AbstractBuffer is a unit of data management for a data manager.
size_t getNumUsedMetadataPages() const
Definition: FileMgr.cpp:1025
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:324
int fsync(int fd)
Definition: omnisci_fs.cpp:60
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:725
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:940
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:628
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
void append(int8_t *src, const size_t numBytes, const MemoryLevel srcMemoryLevel=CPU_LEVEL, const int32_t deviceId=-1) override
Definition: FileBuffer.cpp:472
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:747
size_t fileSize(FILE *f)
Returns the size of the specified file.
Definition: File.cpp:176
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:993
FileInfo * getFileInfoForFileId(const int32_t fileId)
Definition: FileMgr.h:187
#define CHECK_LT(x, y)
Definition: Logger.h:207
#define MAX_FILE_N_PAGES
Definition: File.h:26
version
Definition: setup.py:65
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:325
FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:707
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Write header writes header at top of page in format.
Definition: FileBuffer.cpp:396
int32_t maxRollbackEpochs_
Definition: FileMgr.h:319
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:138
uint64_t total_metadata_file_size
Definition: FileMgr.h:97
void openExistingFile(std::vector< HeaderInfo > &headerVec, const int32_t fileMgrEpoch)
Definition: FileInfo.cpp:71
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:95
void openAndReadEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:609
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:66
constexpr int32_t INVALID_VERSION
Definition: FileMgr.cpp:47
static size_t byte_size()
Definition: Epoch.h:63
int32_t epoch()
Returns current value of epoch - should be one greater than recorded at last checkpoint.
Definition: FileMgr.h:245
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:121
bool getDBConvert() const
Definition: FileMgr.cpp:1048
void freePagesBeforeEpoch(const int32_t minRollbackEpoch)
Definition: FileMgr.cpp:646
void setSize(const size_t size)
void free_page(std::pair< FileInfo *, int32_t > &&page)
Definition: FileMgr.cpp:1177
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:88
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:959
size_t write(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:126
#define CHECK(condition)
Definition: Logger.h:197
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:767
constexpr char FILE_MGR_VERSION_FILENAME[]
Definition: FileMgr.cpp:45
constexpr char DB_META_FILENAME[]
Definition: FileMgr.cpp:44
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:107
void setEpoch(const int32_t newEpoch)
Definition: FileMgr.cpp:1165
bool coreInit()
Determines file path, and if exists, runs file migration and opens and reads epoch file...
Definition: FileMgr.cpp:141
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:895
mapd_unique_lock< mapd_shared_mutex > write_lock
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
Definition: File.cpp:188
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:339
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:134
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:340
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:72
A selection of helper methods for File I/O.
Epoch epoch_
the index of the next file id
Definition: FileMgr.h:327
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:804
#define VLOG(n)
Definition: Logger.h:291
Type timer_start()
Definition: measure.h:42
int32_t lastCheckpointedEpoch()
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:265
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31