OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <fstream>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/checked_alloc.h"
41 #include "Shared/measure.h"
42 
43 using namespace std;
44 
45 namespace File_Namespace {
46 
47 FileMgr::FileMgr(const int32_t deviceId,
48  GlobalFileMgr* gfm,
49  const TablePair fileMgrKey,
50  const int32_t maxRollbackEpochs,
51  const size_t num_reader_threads,
52  const int32_t epoch,
53  const size_t defaultPageSize)
54  : AbstractBufferMgr(deviceId)
55  , maxRollbackEpochs_(maxRollbackEpochs)
56  , defaultPageSize_(defaultPageSize)
57  , nextFileId_(0)
58  , gfm_(gfm)
59  , fileMgrKey_(fileMgrKey) {
60  init(num_reader_threads, epoch);
61 }
62 
63 // used only to initialize enough to drop
64 FileMgr::FileMgr(const int32_t deviceId,
65  GlobalFileMgr* gfm,
66  const TablePair fileMgrKey,
67  const size_t defaultPageSize,
68  const bool runCoreInit)
69  : AbstractBufferMgr(deviceId)
70  , maxRollbackEpochs_(-1)
71  , defaultPageSize_(defaultPageSize)
72  , nextFileId_(0)
73  , gfm_(gfm)
74  , fileMgrKey_(fileMgrKey) {
75  const std::string fileMgrDirPrefix("table");
76  const std::string FileMgrDirDelim("_");
77  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
78  std::to_string(fileMgrKey_.first) + // db_id
79  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
80  epochFile_ = nullptr;
81  files_.clear();
82  if (runCoreInit) {
83  coreInit();
84  }
85 }
86 
87 FileMgr::FileMgr(GlobalFileMgr* gfm, const size_t defaultPageSize, std::string basePath)
88  : AbstractBufferMgr(0)
89  , maxRollbackEpochs_(-1)
90  , fileMgrBasePath_(basePath)
91  , defaultPageSize_(defaultPageSize)
92  , nextFileId_(0)
93  , gfm_(gfm)
94  , fileMgrKey_(0, 0) {
95  init(basePath, -1);
96 }
97 
98 // For testing purposes only
99 FileMgr::FileMgr(const int epoch) : AbstractBufferMgr(-1) {
100  epoch_.ceiling(epoch);
101 }
102 
103 // Used to initialize CachingFileMgr.
104 FileMgr::FileMgr() : AbstractBufferMgr(0) {}
105 
107  // free memory used by FileInfo objects
108  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
109  delete chunkIt->second;
110  }
111  for (auto file_info_entry : files_) {
112  delete file_info_entry.second;
113  }
114 
115  if (epochFile_) {
116  close(epochFile_);
117  epochFile_ = nullptr;
118  }
119 
120  if (DBMetaFile_) {
122  DBMetaFile_ = nullptr;
123  }
124 }
125 
127  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
128  const std::string fileMgrDirPrefix("table");
129  const std::string FileMgrDirDelim("_");
130  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
131  std::to_string(fileMgrKey_.first) + // db_id
132  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
133  boost::filesystem::path path(fileMgrBasePath_);
134  if (boost::filesystem::exists(path)) {
135  if (!boost::filesystem::is_directory(path)) {
136  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
137  << "' for table data is not a directory.";
138  }
141  return true;
142  }
143  return false;
144 }
145 
147  const boost::filesystem::directory_iterator& fileIterator) {
148  FileMetadata fileMetadata;
149  fileMetadata.is_data_file = false;
150  fileMetadata.file_path = fileIterator->path().string();
151  if (!boost::filesystem::is_regular_file(fileIterator->status())) {
152  return fileMetadata;
153  }
154  // note that boost::filesystem leaves preceding dot on
155  // extension - hence MAPD_FILE_EXT is ".mapd"
156  std::string extension(fileIterator->path().extension().string());
157  if (extension == MAPD_FILE_EXT) {
158  std::string fileStem(fileIterator->path().stem().string());
159  // remove trailing dot if any
160  if (fileStem.size() > 0 && fileStem.back() == '.') {
161  fileStem = fileStem.substr(0, fileStem.size() - 1);
162  }
163  size_t dotPos = fileStem.find_last_of("."); // should only be one
164  if (dotPos == std::string::npos) {
165  LOG(FATAL) << "File `" << fileIterator->path()
166  << "` does not carry page size information in the filename.";
167  }
168  fileMetadata.is_data_file = true;
169  fileMetadata.file_id = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
170  fileMetadata.page_size =
171  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
172 
173  fileMetadata.file_size = boost::filesystem::file_size(fileMetadata.file_path);
174  CHECK_EQ(fileMetadata.file_size % fileMetadata.page_size,
175  size_t(0)); // should be no partial pages
176  fileMetadata.num_pages = fileMetadata.file_size / fileMetadata.page_size;
177  }
178  return fileMetadata;
179 }
180 
181 namespace {
182 bool is_compaction_status_file(const std::string& file_name) {
183  return (file_name == FileMgr::COPY_PAGES_STATUS ||
186 }
187 } // namespace
188 
190  auto clock_begin = timer_start();
191  boost::filesystem::directory_iterator
192  end_itr; // default construction yields past-the-end
194  result.max_file_id = -1;
195  int32_t file_count = 0;
196  int32_t thread_count = std::thread::hardware_concurrency();
197  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
198  boost::filesystem::path path(fileMgrBasePath_);
199  for (boost::filesystem::directory_iterator file_it(path); file_it != end_itr;
200  ++file_it) {
201  FileMetadata file_metadata = getMetadataForFile(file_it);
202  if (file_metadata.is_data_file) {
203  result.max_file_id = std::max(result.max_file_id, file_metadata.file_id);
204  file_futures.emplace_back(std::async(std::launch::async, [file_metadata, this] {
205  std::vector<HeaderInfo> temp_header_vec;
206  openExistingFile(file_metadata.file_path,
207  file_metadata.file_id,
208  file_metadata.page_size,
209  file_metadata.num_pages,
210  temp_header_vec);
211  return temp_header_vec;
212  }));
213  file_count++;
214  if (file_count % thread_count == 0) {
215  processFileFutures(file_futures, result.header_infos);
216  }
217  }
218 
219  if (is_compaction_status_file(file_it->path().filename().string())) {
220  CHECK(result.compaction_status_file_name.empty());
221  result.compaction_status_file_name = file_it->path().filename().string();
222  }
223  }
224 
225  if (file_futures.size() > 0) {
226  processFileFutures(file_futures, result.header_infos);
227  }
228 
229  int64_t queue_time_ms = timer_stop(clock_begin);
230  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : " << queue_time_ms
231  << "ms Epoch: " << epoch_.ceiling() << " files read: " << file_count
232  << " table location: '" << fileMgrBasePath_ << "'";
233  return result;
234 }
235 
237  for (auto file_info_entry : files_) {
238  auto file_info = file_info_entry.second;
239  if (file_info->f) {
240  close(file_info->f);
241  file_info->f = nullptr;
242  }
243  delete file_info;
244  }
245  files_.clear();
246  fileIndex_.clear();
247 }
248 
// Opens (or creates) this table's on-disk state.
//
// If coreInit() reports that data already exists:
//   - applies `epochOverride` via setEpoch() unless it is -1;
//   - opens every data file via openFiles();
//   - if a compaction status file was found, resumes that compaction, drops
//     the FileInfos, and reopens the files;
//   - rebuilds one FileBuffer per chunk from the sorted page headers;
//   - sets nextFileId_ past the largest file id seen;
//   - calls rollOffOldData(), incrementEpoch() and freePages().
// Otherwise it creates the table directory (FATAL on failure), seeds the
// epoch floor and ceiling with `epochOverride` (or 0 when it is -1), and calls
// incrementEpoch().
// In both cases `num_reader_threads` is passed to initializeNumThreads() and
// the manager is marked fully initialized.
249 void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) {
250  // if epochCeiling = -1 this means open from epoch file
251 
252  const bool dataExists = coreInit();
253  if (dataExists) {
254  if (epochOverride != -1) { // if opening at specified epoch
255  setEpoch(epochOverride);
256  }
257 
258  auto open_files_result = openFiles();
// A leftover compaction status file means a compaction was interrupted:
// finish it, then reopen from scratch. The second pass must find no status file.
259  if (!open_files_result.compaction_status_file_name.empty()) {
260  resumeFileCompaction(open_files_result.compaction_status_file_name);
261  clearFileInfos();
262  open_files_result = openFiles();
263  CHECK(open_files_result.compaction_status_file_name.empty());
264  }
265 
266  /* Sort headerVec so that all HeaderInfos
267  * from a chunk will be grouped together
268  * and in order of increasing PageId
269  * - Version Epoch */
270  auto& header_vec = open_files_result.header_infos;
271  std::sort(header_vec.begin(), header_vec.end());
272 
273  /* Goal of next section is to find sequences in the
274  * sorted headerVec of the same ChunkId, which we
275  * can then initiate a FileBuffer with */
276 
277  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
278  if (header_vec.size() > 0) {
279  ChunkKey lastChunkKey = header_vec.begin()->chunkKey;
280  auto startIt = header_vec.begin();
281 
// Each change of chunk key closes the run [startIt, headerIt) for the
// previous chunk.
282  for (auto headerIt = header_vec.begin() + 1; headerIt != header_vec.end();
283  ++headerIt) {
284  if (headerIt->chunkKey != lastChunkKey) {
285  createBufferFromHeaders(lastChunkKey, startIt, headerIt);
286  lastChunkKey = headerIt->chunkKey;
287  startIt = headerIt;
288  }
289  }
290  // now need to insert last Chunk
291  createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
292  }
293  nextFileId_ = open_files_result.max_file_id + 1;
294  rollOffOldData(epoch(), true /* only checkpoint if data is rolled off */);
295  incrementEpoch();
296  freePages();
297  } else {
// No existing data: create the table directory and start a fresh epoch.
// NOTE(review): this listing omits original lines 302 and 311-312 of this
// branch; their statements are not visible here.
298  boost::filesystem::path path(fileMgrBasePath_);
299  if (!boost::filesystem::create_directory(path)) {
300  LOG(FATAL) << "Could not create data directory: " << path;
301  }
303  if (epochOverride != -1) {
304  epoch_.floor(epochOverride);
305  epoch_.ceiling(epochOverride);
306  } else {
307  // These are default constructor values for epoch_, but resetting here for clarity
308  epoch_.floor(0);
309  epoch_.ceiling(0);
310  }
313  incrementEpoch();
314  }
315 
316  initializeNumThreads(num_reader_threads);
317  isFullyInitted_ = true;
318 }
319 
320 namespace {
322  size_t page_size,
323  size_t num_pages_per_metadata_file) {
324  return (file_size == (METADATA_PAGE_SIZE * num_pages_per_metadata_file) &&
325  page_size == METADATA_PAGE_SIZE);
326 }
327 } // namespace
328 
330  StorageStats storage_stats;
331  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
332  if (!isFullyInitted_) {
333  CHECK(!fileMgrBasePath_.empty());
334  boost::filesystem::path path(fileMgrBasePath_);
335  if (boost::filesystem::exists(path)) {
336  if (!boost::filesystem::is_directory(path)) {
337  LOG(FATAL) << "getStorageStats: Specified path '" << fileMgrBasePath_
338  << "' for table data is not a directory.";
339  }
340 
341  storage_stats.epoch = lastCheckpointedEpoch();
342  storage_stats.epoch_floor = epochFloor();
343  boost::filesystem::directory_iterator
344  endItr; // default construction yields past-the-end
345  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
346  ++fileIt) {
347  FileMetadata file_metadata = getMetadataForFile(fileIt);
348  if (file_metadata.is_data_file) {
349  if (is_metadata_file(file_metadata.file_size,
350  file_metadata.page_size,
352  storage_stats.metadata_file_count++;
353  storage_stats.total_metadata_file_size += file_metadata.file_size;
354  storage_stats.total_metadata_page_count += file_metadata.num_pages;
355  } else {
356  storage_stats.data_file_count++;
357  storage_stats.total_data_file_size += file_metadata.file_size;
358  storage_stats.total_data_page_count += file_metadata.num_pages;
359  }
360  }
361  }
362  }
363  } else {
364  storage_stats.epoch = lastCheckpointedEpoch();
365  storage_stats.epoch_floor = epochFloor();
366  storage_stats.total_free_metadata_page_count = 0;
367  storage_stats.total_free_data_page_count = 0;
368 
369  // We already initialized this table so take the faster path of walking through the
370  // FileInfo objects and getting metadata from there
371  for (const auto& file_info_entry : files_) {
372  const auto file_info = file_info_entry.second;
373  if (is_metadata_file(
374  file_info->size(), file_info->pageSize, num_pages_per_metadata_file_)) {
375  storage_stats.metadata_file_count++;
376  storage_stats.total_metadata_file_size +=
377  file_info->pageSize * file_info->numPages;
378  storage_stats.total_metadata_page_count += file_info->numPages;
379  storage_stats.total_free_metadata_page_count.value() +=
380  file_info->freePages.size();
381  } else {
382  storage_stats.data_file_count++;
383  storage_stats.total_data_file_size += file_info->pageSize * file_info->numPages;
384  storage_stats.total_data_page_count += file_info->numPages;
385  storage_stats.total_free_data_page_count.value() += file_info->freePages.size();
386  }
387  }
388  }
389  return storage_stats;
390 }
391 
393  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
394  std::vector<HeaderInfo>& headerVec) {
395  for (auto& file_future : file_futures) {
396  file_future.wait();
397  }
398  // concatenate the vectors after thread completes
399  for (auto& file_future : file_futures) {
400  auto tempHeaderVec = file_future.get();
401  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
402  }
403  file_futures.clear();
404 }
405 
406 void FileMgr::init(const std::string& dataPathToConvertFrom,
407  const int32_t epochOverride) {
408  int32_t converted_data_epoch = 0;
409  boost::filesystem::path path(dataPathToConvertFrom);
410  if (boost::filesystem::exists(path)) {
411  if (!boost::filesystem::is_directory(path)) {
412  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
413  }
415 
416  if (epochOverride != -1) { // if opening at previous epoch
417  setEpoch(epochOverride);
418  }
419 
420  boost::filesystem::directory_iterator
421  endItr; // default construction yields past-the-end
422  int32_t maxFileId = -1;
423  int32_t fileCount = 0;
424  int32_t threadCount = std::thread::hardware_concurrency();
425  std::vector<HeaderInfo> headerVec;
426  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
427  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
428  FileMetadata fileMetadata = getMetadataForFile(fileIt);
429  if (fileMetadata.is_data_file) {
430  maxFileId = std::max(maxFileId, fileMetadata.file_id);
431  file_futures.emplace_back(std::async(std::launch::async, [fileMetadata, this] {
432  std::vector<HeaderInfo> tempHeaderVec;
433  openExistingFile(fileMetadata.file_path,
434  fileMetadata.file_id,
435  fileMetadata.page_size,
436  fileMetadata.num_pages,
437  tempHeaderVec);
438  return tempHeaderVec;
439  }));
440  fileCount++;
441  if (fileCount % threadCount) {
442  processFileFutures(file_futures, headerVec);
443  }
444  }
445  }
446 
447  if (file_futures.size() > 0) {
448  processFileFutures(file_futures, headerVec);
449  }
450 
451  /* Sort headerVec so that all HeaderInfos
452  * from a chunk will be grouped together
453  * and in order of increasing PageId
454  * - Version Epoch */
455 
456  std::sort(headerVec.begin(), headerVec.end());
457 
458  /* Goal of next section is to find sequences in the
459  * sorted headerVec of the same ChunkId, which we
460  * can then initiate a FileBuffer with */
461 
462  if (headerVec.size() > 0) {
463  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
464  auto startIt = headerVec.begin();
465 
466  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
467  ++headerIt) {
468  if (headerIt->chunkKey != lastChunkKey) {
469  FileMgr* c_fm_ =
470  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
471  CHECK(c_fm_);
472  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerIt);
473  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
474  destBuf->syncEncoder(srcBuf);
475  destBuf->setSize(srcBuf->size());
476  destBuf->setDirty(); // this needs to be set to force writing out metadata
477  // files from "checkpoint()" call
478 
479  size_t totalNumPages = srcBuf->getMultiPage().size();
480  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
481  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
482  Page destPage = c_fm_->requestFreePage(
483  srcBuf->pageSize(),
484  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
485  MultiPage multiPage(srcBuf->pageSize());
486  multiPage.push(destPage, converted_data_epoch);
487  destBuf->multiPages_.push_back(multiPage);
488  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
489  copyPage(
490  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
491  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
492  }
493  lastChunkKey = headerIt->chunkKey;
494  startIt = headerIt;
495  }
496  }
497 
498  // now need to insert last Chunk
499  FileMgr* c_fm_ =
500  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
501  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerVec.end());
502  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
503  destBuf->syncEncoder(srcBuf);
504  destBuf->setSize(srcBuf->size());
505  destBuf->setDirty(); // this needs to be set to write out metadata file from the
506  // "checkpoint()" call
507 
508  size_t totalNumPages = srcBuf->getMultiPage().size();
509  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
510  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
511  Page destPage = c_fm_->requestFreePage(
512  srcBuf->pageSize(),
513  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
514  MultiPage multiPage(srcBuf->pageSize());
515  multiPage.push(destPage, converted_data_epoch);
516  destBuf->multiPages_.push_back(multiPage);
517  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
518  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
519  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
520  }
521  }
522  nextFileId_ = maxFileId + 1;
523  } else {
524  if (!boost::filesystem::create_directory(path)) {
525  LOG(FATAL) << "Specified path does not exist: " << path;
526  }
527  }
528  isFullyInitted_ = true;
529 }
530 
532  for (auto& [idx, file_info] : files_) {
533  if (file_info->f) {
534  close(file_info->f);
535  file_info->f = nullptr;
536  }
537  }
538 
539  if (DBMetaFile_) {
541  DBMetaFile_ = nullptr;
542  }
543 
544  if (epochFile_) {
545  close(epochFile_);
546  epochFile_ = nullptr;
547  }
548 }
549 
551  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
553  /* rename for later deletion the directory containing table related data */
555 }
556 
557 void FileMgr::copyPage(Page& srcPage,
558  FileMgr* destFileMgr,
559  Page& destPage,
560  const size_t reservedHeaderSize,
561  const size_t numBytes,
562  const size_t offset) {
563  CHECK(offset + numBytes <= defaultPageSize_);
564  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
565  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
566  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
567 
568  size_t bytesRead = srcFileInfo->read(
569  srcPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize, numBytes, buffer);
570  CHECK(bytesRead == numBytes);
571  size_t bytesWritten = destFileInfo->write(
572  destPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize,
573  numBytes,
574  buffer);
575  CHECK(bytesWritten == numBytes);
576  ::free(buffer);
577 }
578 
579 void FileMgr::createEpochFile(const std::string& epochFileName) {
580  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
581  if (boost::filesystem::exists(epochFilePath)) {
582  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
583  }
584  epochFile_ = create(epochFilePath, sizeof(Epoch::byte_size()));
585  // Write out current epoch to file - which if this
586  // function is being called should be 0
588 }
589 
590 int32_t FileMgr::openAndReadLegacyEpochFile(const std::string& epochFileName) {
591  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
592  if (!boost::filesystem::exists(epochFilePath)) {
593  return 0;
594  }
595 
596  if (!boost::filesystem::is_regular_file(epochFilePath)) {
597  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
598  }
599  if (boost::filesystem::file_size(epochFilePath) < 4) {
600  LOG(FATAL) << "Epoch file `" << epochFilePath
601  << "` is not sized properly (current size: "
602  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
603  }
604  FILE* legacyEpochFile = open(epochFilePath);
605  int32_t epoch;
606  read(legacyEpochFile, 0, sizeof(int32_t), (int8_t*)&epoch);
607  close(legacyEpochFile);
608  return epoch;
609 }
610 
// Opens the epoch file `epochFileName` in the table directory, unless
// epochFile_ is already open. Before opening, verifies that the file exists,
// is a regular file, and is exactly Epoch::byte_size() bytes; each failed
// check is LOG(FATAL).
// NOTE(review): original line 628 (presumably reading the epoch contents, per
// the function name) is not visible in this listing.
611 void FileMgr::openAndReadEpochFile(const std::string& epochFileName) {
612  if (!epochFile_) { // Check to see if already open
613  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
614  if (!boost::filesystem::exists(epochFilePath)) {
615  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
616  }
617  if (!boost::filesystem::is_regular_file(epochFilePath)) {
618  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
619  }
// The file must hold exactly one serialized Epoch.
620  if (boost::filesystem::file_size(epochFilePath) != Epoch::byte_size()) {
621  LOG(FATAL) << "Epoch file `" << epochFilePath
622  << "` is not sized properly (current size: "
623  << boost::filesystem::file_size(epochFilePath)
624  << ", expected size: " << Epoch::byte_size() << ")";
625  }
626  epochFile_ = open(epochFilePath);
627  }
629 }
630 
632  CHECK(epochFile_);
634  int32_t status = fflush(epochFile_);
635  CHECK(status == 0) << "Could not flush epoch file to disk";
636 #ifdef __APPLE__
637  status = fcntl(fileno(epochFile_), 51);
638 #else
639  status = omnisci::fsync(fileno(epochFile_));
640 #endif
641  CHECK(status == 0) << "Could not sync epoch file to disk";
642  epochIsCheckpointed_ = true;
643 }
644 
645 void FileMgr::freePagesBeforeEpoch(const int32_t min_epoch) {
646  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
647  freePagesBeforeEpochUnlocked(min_epoch, chunkIndex_.begin(), chunkIndex_.end());
648 }
649 
651  const int32_t min_epoch,
652  const ChunkKeyToChunkMap::iterator lower_bound,
653  const ChunkKeyToChunkMap::iterator upper_bound) {
654  for (auto chunkIt = lower_bound; chunkIt != upper_bound; ++chunkIt) {
655  chunkIt->second->freePagesBeforeEpoch(min_epoch);
656  }
657 }
658 
659 void FileMgr::rollOffOldData(const int32_t epoch_ceiling, const bool should_checkpoint) {
660  if (maxRollbackEpochs_ >= 0) {
661  auto min_epoch = std::max(epoch_ceiling - maxRollbackEpochs_, epoch_.floor());
662  if (min_epoch > epoch_.floor()) {
663  freePagesBeforeEpoch(min_epoch);
664  epoch_.floor(min_epoch);
665  if (should_checkpoint) {
666  checkpoint();
667  }
668  }
669  }
670 }
671 
672 std::string FileMgr::describeSelf() const {
673  stringstream ss;
674  ss << "table (" << fileMgrKey_.first << ", " << fileMgrKey_.second << ")";
675  return ss.str();
676 }
677 
679  VLOG(2) << "Checkpointing " << describeSelf() << " epoch: " << epoch();
681  rollOffOldData(epoch(), false /* shouldCheckpoint */);
682  syncFilesToDisk();
684  incrementEpoch();
685  freePages();
686 }
687 
689  const size_t page_size,
690  const size_t num_bytes) {
691  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
692  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
693  << "Chunk already exists for key: " << show_chunk(key);
694  return createBufferUnlocked(key, page_size, num_bytes);
695 }
696 
697 // Assumes checks for pre-existing key have already occured.
699  const size_t page_size,
700  const size_t num_bytes) {
701  size_t actual_page_size = page_size;
702  if (actual_page_size == 0) {
703  actual_page_size = defaultPageSize_;
704  }
705  chunkIndex_[key] = allocateBuffer(actual_page_size, key, num_bytes);
706  return (chunkIndex_[key]);
707 }
708 
710  const ChunkKey& key,
711  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
712  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
713  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
714  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
715  << "Chunk already exists for key: " << show_chunk(key);
716  chunkIndex_[key] = allocateBuffer(key, headerStartIt, headerEndIt);
717  return (chunkIndex_[key]);
718 }
719 
721  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
722  return chunkIndex_.find(key) != chunkIndex_.end();
723 }
724 
725 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
726  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
727  auto chunk_it = chunkIndex_.find(key);
728  CHECK(chunk_it != chunkIndex_.end())
729  << "Chunk does not exist for key: " << show_chunk(key);
730  deleteBufferUnlocked(chunk_it, purge);
731 }
732 
733 ChunkKeyToChunkMap::iterator FileMgr::deleteBufferUnlocked(
734  const ChunkKeyToChunkMap::iterator chunk_it,
735  const bool purge) {
736  if (purge) {
737  chunk_it->second->freePages();
738  }
739  delete chunk_it->second;
740  return chunkIndex_.erase(chunk_it);
741 }
742 
743 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
744  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
745  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
746  if (chunkIt == chunkIndex_.end()) {
747  return; // should we throw?
748  }
749  while (chunkIt != chunkIndex_.end() &&
750  std::search(chunkIt->first.begin(),
751  chunkIt->first.begin() + keyPrefix.size(),
752  keyPrefix.begin(),
753  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
754  deleteBufferUnlocked(chunkIt++, purge);
755  }
756 }
757 
758 FileBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
759  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
760  auto chunk_it = chunkIndex_.find(key);
761  return getBufferUnlocked(chunk_it, num_bytes);
762 }
763 
764 FileBuffer* FileMgr::getBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it,
765  const size_t num_bytes) {
766  CHECK(chunk_it != chunkIndex_.end())
767  << "Chunk does not exist for key: " << show_chunk(chunk_it->first);
768  return chunk_it->second;
769 }
770 
772  AbstractBuffer* destBuffer,
773  const size_t numBytes) {
774  // reads chunk specified by ChunkKey into AbstractBuffer provided by
775  // destBuffer
776  CHECK(!destBuffer->isDirty())
777  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
778  << show_chunk(key);
779  AbstractBuffer* chunk = getBuffer(key);
780  // chunk's size is either specified in function call with numBytes or we
781  // just look at pageSize * numPages in FileBuffer
782  if (numBytes > 0 && numBytes > chunk->size()) {
783  LOG(FATAL) << "Chunk retrieved for key `" << show_chunk(key) << "` is smaller ("
784  << chunk->size() << ") than number of bytes requested (" << numBytes
785  << ")";
786  }
787  chunk->copyTo(destBuffer, numBytes);
788 }
789 
791  AbstractBuffer* srcBuffer,
792  const size_t numBytes) {
793  auto chunk = getOrCreateBuffer(key);
794  size_t oldChunkSize = chunk->size();
795  // write the buffer's data to the Chunk
796  size_t newChunkSize = (numBytes == 0) ? srcBuffer->size() : numBytes;
797  if (chunk->isDirty()) {
798  // multiple appends are allowed,
799  // but only single update is allowed
800  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
801  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
802  "for key: "
803  << show_chunk(key);
804  }
805  }
806  CHECK(srcBuffer->isDirty()) << "putBuffer expects a dirty buffer";
807  if (srcBuffer->isUpdated()) {
808  // chunk size is not changed when fixed rows are updated or are marked as deleted.
809  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
810  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
811  // For varlen update, it takes another route via fragmenter using disk-level buffer.
812  if (0 == numBytes && !chunk->isDirty()) {
813  chunk->setSize(newChunkSize);
814  }
815  //@todo use dirty flags to only flush pages of chunk that need to
816  // be flushed
817  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
818  newChunkSize,
819  0,
820  srcBuffer->getType(),
821  srcBuffer->getDeviceId());
822  } else if (srcBuffer->isAppended()) {
823  CHECK_LT(oldChunkSize, newChunkSize);
824  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
825  newChunkSize - oldChunkSize,
826  srcBuffer->getType(),
827  srcBuffer->getDeviceId());
828  } else {
829  // If dirty buffer comes in unmarked, it must be empty.
830  // Encoder sync is still required to flush the metadata.
831  CHECK(numBytes == 0)
832  << "Dirty buffer with size > 0 must be marked as isAppended() or isUpdated()";
833  }
834  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
835  //@todo commenting out line above will make sure this metadata is set
836  // but will trigger error on fetch chunk
837  srcBuffer->clearDirtyBits();
838  chunk->syncEncoder(srcBuffer);
839  return chunk;
840 }
841 
842 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
843  LOG(FATAL) << "Operation not supported";
844  return nullptr; // satisfy return-type warning
845 }
846 
848  LOG(FATAL) << "Operation not supported";
849 }
850 
851 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
852  std::lock_guard<std::mutex> lock(getPageMutex_);
853 
854  auto candidateFiles = fileIndex_.equal_range(pageSize);
855  int32_t pageNum = -1;
856  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
857  FileInfo* fileInfo = files_.at(fileIt->second);
858  pageNum = fileInfo->getFreePage();
859  if (pageNum != -1) {
860  return (Page(fileInfo->fileId, pageNum));
861  }
862  }
863  // if here then we need to add a file
864  FileInfo* fileInfo;
865  if (isMetadata) {
866  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
867  } else {
868  fileInfo = createFile(pageSize, num_pages_per_data_file_);
869  }
870  pageNum = fileInfo->getFreePage();
871  CHECK(pageNum != -1);
872  return (Page(fileInfo->fileId, pageNum));
873 }
874 
875 void FileMgr::requestFreePages(size_t numPagesRequested,
876  size_t pageSize,
877  std::vector<Page>& pages,
878  const bool isMetadata) {
879  // not used currently
880  // @todo add method to FileInfo to get more than one page
881  std::lock_guard<std::mutex> lock(getPageMutex_);
882  auto candidateFiles = fileIndex_.equal_range(pageSize);
883  size_t numPagesNeeded = numPagesRequested;
884  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
885  FileInfo* fileInfo = files_.at(fileIt->second);
886  int32_t pageNum;
887  do {
888  pageNum = fileInfo->getFreePage();
889  if (pageNum != -1) {
890  pages.emplace_back(fileInfo->fileId, pageNum);
891  numPagesNeeded--;
892  }
893  } while (pageNum != -1 && numPagesNeeded > 0);
894  if (numPagesNeeded == 0) {
895  break;
896  }
897  }
898  while (numPagesNeeded > 0) {
899  FileInfo* fileInfo;
900  if (isMetadata) {
901  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
902  } else {
903  fileInfo = createFile(pageSize, num_pages_per_data_file_);
904  }
905  int32_t pageNum;
906  do {
907  pageNum = fileInfo->getFreePage();
908  if (pageNum != -1) {
909  pages.emplace_back(fileInfo->fileId, pageNum);
910  numPagesNeeded--;
911  }
912  } while (pageNum != -1 && numPagesNeeded > 0);
913  if (numPagesNeeded == 0) {
914  break;
915  }
916  }
917  CHECK(pages.size() == numPagesRequested);
918 }
919 
920 FileInfo* FileMgr::openExistingFile(const std::string& path,
921  const int fileId,
922  const size_t pageSize,
923  const size_t numPages,
924  std::vector<HeaderInfo>& headerVec) {
925  FILE* f = open(path);
926  FileInfo* fInfo = new FileInfo(
927  this, fileId, f, pageSize, numPages, false); // false means don't init file
928 
929  fInfo->openExistingFile(headerVec);
930  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
931  files_[fileId] = fInfo;
932  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
933  return fInfo;
934 }
935 
936 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
937  // check arguments
938  if (pageSize == 0 || numPages == 0) {
939  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
940  }
941 
942  // create the new file
943  FILE* f = create(fileMgrBasePath_,
944  nextFileId_,
945  pageSize,
946  numPages); // TM: not sure if I like naming scheme here - should be in
947  // separate namespace?
948  CHECK(f);
949 
950  // instantiate a new FileInfo for the newly created file
951  int32_t fileId = nextFileId_++;
952  FileInfo* fInfo =
953  new FileInfo(this, fileId, f, pageSize, numPages, true); // true means init file
954  CHECK(fInfo);
955 
956  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
957  // update file manager data structures
958  files_[fileId] = fInfo;
959  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
960 
961  return fInfo;
962 }
963 
964 FILE* FileMgr::getFileForFileId(const int32_t fileId) {
965  CHECK(fileId >= 0);
966  CHECK(files_.find(fileId) != files_.end());
967  return files_.at(fileId)->f;
968 }
969 
971  const ChunkKey& keyPrefix) {
972  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
973  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
974  if (chunkIt == chunkIndex_.end()) {
975  return; // throw?
976  }
977  while (chunkIt != chunkIndex_.end() &&
978  std::search(chunkIt->first.begin(),
979  chunkIt->first.begin() + keyPrefix.size(),
980  keyPrefix.begin(),
981  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
982  if (chunkIt->second->hasEncoder()) {
983  auto chunk_metadata = std::make_shared<ChunkMetadata>();
984  chunkIt->second->encoder_->getMetadata(chunk_metadata);
985  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
986  }
987  chunkIt++;
988  }
989 }
990 
992  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
993  const auto& chunkIt = chunkIndex_.find(chunkKey);
994  if (chunkIt != chunkIndex_.end()) {
995  return chunkIt->second->numMetadataPages();
996  } else {
997  throw std::runtime_error("Chunk was not found.");
998  }
999 }
1000 
1001 int32_t FileMgr::getDBVersion() const {
1002  return gfm_->getDBVersion();
1003 }
1004 
1006  return gfm_->getDBConvert();
1007 }
1008 
1011 
1012  if (db_version_ > getDBVersion()) {
1013  LOG(FATAL) << "DB forward compatibility is not supported. Version of OmniSci "
1014  "software used is older than the version of DB being read: "
1015  << db_version_;
1016  }
1018  // new system, or we are moving forward versions
1019  // system wide migration would go here if required
1021  return;
1022  }
1023 }
1024 
1025 int32_t FileMgr::readVersionFromDisk(const std::string& versionFileName) const {
1026  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1027  if (!boost::filesystem::exists(versionFilePath)) {
1028  return -1;
1029  }
1030  if (!boost::filesystem::is_regular_file(versionFilePath)) {
1031  return -1;
1032  }
1033  if (boost::filesystem::file_size(versionFilePath) < 4) {
1034  return -1;
1035  }
1036  FILE* versionFile = open(versionFilePath);
1037  int32_t version;
1038  read(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1039  close(versionFile);
1040  return version;
1041 }
1042 
1043 void FileMgr::writeAndSyncVersionToDisk(const std::string& versionFileName,
1044  const int32_t version) {
1045  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1046  FILE* versionFile;
1047  if (boost::filesystem::exists(versionFilePath)) {
1048  int32_t oldVersion = readVersionFromDisk(versionFileName);
1049  LOG(INFO) << "Storage version file `" << versionFilePath
1050  << "` already exists, its current version is " << oldVersion;
1051  versionFile = open(versionFilePath);
1052  } else {
1053  versionFile = create(versionFilePath, sizeof(int32_t));
1054  }
1055  write(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1056  int32_t status = fflush(versionFile);
1057  if (status != 0) {
1058  LOG(FATAL) << "Could not flush version file " << versionFilePath << " to disk";
1059  }
1060 #ifdef __APPLE__
1061  status = fcntl(fileno(epochFile_), 51);
1062 #else
1063  status = omnisci::fsync(fileno(versionFile));
1064 #endif
1065  if (status != 0) {
1066  LOG(FATAL) << "Could not sync version file " << versionFilePath << " to disk";
1067  }
1068  close(versionFile);
1069 }
1070 
1072  const std::string versionFilePath(fileMgrBasePath_ + "/" + FILE_MGR_VERSION_FILENAME);
1073  LOG(INFO) << "Migrating file format version from 0 to 1 for `" << versionFilePath;
1078  int32_t migrationCompleteVersion = 1;
1079  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migrationCompleteVersion);
1080 }
1081 
1085  fileMgrVersion_ = 0;
1087  } else if (fileMgrVersion_ > latestFileMgrVersion_) {
1088  LOG(FATAL)
1089  << "Table storage forward compatibility is not supported. Version of OmniSci "
1090  "software used is older than the version of table being read: "
1091  << fileMgrVersion_;
1092  }
1093 
1096  switch (fileMgrVersion_) {
1097  case 0: {
1099  break;
1100  }
1101  default: {
1102  UNREACHABLE();
1103  }
1104  }
1105  fileMgrVersion_++;
1106  }
1107  }
1108 }
1109 
1110 /*
1111  * @brief sets the epoch to a user-specified value
1112  *
1113  * With the introduction of optional capped history on files, the possibility of
1114  * multiple successive rollbacks to earlier epochs means that we cannot rely on
1115  * the maxRollbackEpochs_ variable alone (initialized from a value stored in Catalog) to
1116  * guarantee that we can set an epoch at any given value. This function checks the
1117  * user-specified epoch value to ensure it is both not higher than the last checkpointed
1118  * epoch AND it is >= the epoch floor, which on an uncapped value will be the epoch the
1119  * table table was created at (default 0), and for capped tables, the lowest epoch for
1120  * which we have complete data, now materialized in the epoch metadata itself. */
1121 
1122 void FileMgr::setEpoch(const int32_t newEpoch) {
1123  if (newEpoch < epoch_.floor()) {
1124  std::stringstream error_message;
1125  error_message << "Cannot set epoch for " << describeSelf()
1126  << " lower than the minimum rollback epoch (" << epoch_.floor() << ").";
1127  throw std::runtime_error(error_message.str());
1128  }
1129  epoch_.ceiling(newEpoch);
1131 }
1132 
1133 void FileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
1134  std::unique_lock<mapd_shared_mutex> lock(mutex_free_page_);
1135  free_pages_.push_back(page);
1136 }
1137 
1138 void FileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t table_id) {
1139  UNREACHABLE();
1140 }
1141 
1147 void FileMgr::resumeFileCompaction(const std::string& status_file_name) {
1148  if (status_file_name == COPY_PAGES_STATUS) {
1149  // Delete status file and restart data compaction process
1150  auto file_path = getFilePath(status_file_name);
1151  CHECK(boost::filesystem::exists(file_path));
1152  boost::filesystem::remove(file_path);
1153  compactFiles();
1154  } else if (status_file_name == UPDATE_PAGE_VISIBILITY_STATUS) {
1155  // Execute second and third phases of data compaction
1156  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1157  auto page_mappings = readPageMappingsFromStatusFile();
1158  updateMappedPagesVisibility(page_mappings);
1160  deleteEmptyFiles();
1161  } else if (status_file_name == DELETE_EMPTY_FILES_STATUS) {
1162  // Execute last phase of data compaction
1163  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1164  deleteEmptyFiles();
1165  } else {
1166  UNREACHABLE() << "Unexpected status file name: " << status_file_name;
1167  }
1168 }
1169 
1198  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1199  if (files_.empty()) {
1200  return;
1201  }
1202 
1203  auto copy_pages_status_file_path = getFilePath(COPY_PAGES_STATUS);
1204  CHECK(!boost::filesystem::exists(copy_pages_status_file_path));
1205  std::ofstream status_file(copy_pages_status_file_path.string(),
1206  std::ios::out | std::ios::binary);
1207  status_file.close();
1208 
1209  std::vector<PageMapping> page_mappings;
1210  std::set<Page> touched_pages;
1211  std::set<size_t> page_sizes;
1212  for (auto [file_id, file_info] : files_) {
1213  page_sizes.emplace(file_info->pageSize);
1214  }
1215  for (auto page_size : page_sizes) {
1216  sortAndCopyFilePagesForCompaction(page_size, page_mappings, touched_pages);
1217  }
1218 
1219  writePageMappingsToStatusFile(page_mappings);
1221 
1222  updateMappedPagesVisibility(page_mappings);
1224 
1225  deleteEmptyFiles();
1226 }
1227 
1235  std::vector<PageMapping>& page_mappings,
1236  std::set<Page>& touched_pages) {
1237  std::vector<FileInfo*> sorted_file_infos;
1238  auto range = fileIndex_.equal_range(page_size);
1239  for (auto it = range.first; it != range.second; it++) {
1240  sorted_file_infos.emplace_back(files_.at(it->second));
1241  }
1242  if (sorted_file_infos.empty()) {
1243  return;
1244  }
1245 
1246  // Sort file infos in ascending order of free pages count i.e. from files with
1247  // the least number of free pages to those with the highest number of free pages.
1248  std::sort(sorted_file_infos.begin(),
1249  sorted_file_infos.end(),
1250  [](const FileInfo* file_1, const FileInfo* file_2) {
1251  return file_1->freePages.size() < file_2->freePages.size();
1252  });
1253 
1254  size_t destination_index = 0, source_index = sorted_file_infos.size() - 1;
1255 
1256  // For page copy destinations, skip files without free pages.
1257  while (destination_index < source_index &&
1258  sorted_file_infos[destination_index]->freePages.empty()) {
1259  destination_index++;
1260  }
1261 
1262  // For page copy sources, skip files with only free pages.
1263  while (destination_index < source_index &&
1264  sorted_file_infos[source_index]->freePages.size() ==
1265  sorted_file_infos[source_index]->numPages) {
1266  source_index--;
1267  }
1268 
1269  std::set<size_t> source_used_pages;
1270  CHECK(destination_index <= source_index);
1271 
1272  // Get the total number of free pages available for compaction
1273  int64_t total_free_pages{0};
1274  for (size_t i = destination_index; i <= source_index; i++) {
1275  total_free_pages += sorted_file_infos[i]->numFreePages();
1276  }
1277 
1278  while (destination_index < source_index) {
1279  if (source_used_pages.empty()) {
1280  // Populate source_used_pages with only used pages in the source file.
1281  auto source_file_info = sorted_file_infos[source_index];
1282  auto& free_pages = source_file_info->freePages;
1283  for (size_t page_num = 0; page_num < source_file_info->numPages; page_num++) {
1284  if (free_pages.find(page_num) == free_pages.end()) {
1285  source_used_pages.emplace(page_num);
1286  }
1287  }
1288 
1289  // Free pages of current source file will not be copy destinations
1290  total_free_pages -= source_file_info->numFreePages();
1291  }
1292 
1293  // Exit early if there are not enough free pages to empty the next file
1294  if (total_free_pages - static_cast<int64_t>(source_used_pages.size()) < 0) {
1295  return;
1296  }
1297 
1298  // Copy pages from source files to destination files
1299  auto dest_file_info = sorted_file_infos[destination_index];
1300  while (!source_used_pages.empty() && !dest_file_info->freePages.empty()) {
1301  // Get next page to copy
1302  size_t source_page_num = *source_used_pages.begin();
1303  source_used_pages.erase(source_page_num);
1304 
1305  Page source_page{sorted_file_infos[source_index]->fileId, source_page_num};
1306  copySourcePageForCompaction(source_page,
1307  sorted_file_infos[destination_index],
1308  page_mappings,
1309  touched_pages);
1310  total_free_pages--;
1311  }
1312 
1313  if (source_used_pages.empty()) {
1314  source_index--;
1315  }
1316 
1317  if (dest_file_info->freePages.empty()) {
1318  destination_index++;
1319  }
1320  }
1321 }
1322 
1330  FileInfo* destination_file_info,
1331  std::vector<PageMapping>& page_mappings,
1332  std::set<Page>& touched_pages) {
1333  size_t destination_page_num = destination_file_info->getFreePage();
1334  CHECK_NE(destination_page_num, static_cast<size_t>(-1));
1335  Page destination_page{destination_file_info->fileId, destination_page_num};
1336 
1337  // Assert that the same pages are not copied or overridden multiple times
1338  CHECK(touched_pages.find(source_page) == touched_pages.end());
1339  touched_pages.emplace(source_page);
1340 
1341  CHECK(touched_pages.find(destination_page) == touched_pages.end());
1342  touched_pages.emplace(destination_page);
1343 
1344  auto header_size = copyPageWithoutHeaderSize(source_page, destination_page);
1345  page_mappings.emplace_back(static_cast<size_t>(source_page.fileId),
1346  source_page.pageNum,
1347  header_size,
1348  static_cast<size_t>(destination_page.fileId),
1349  destination_page.pageNum);
1350 }
1351 
1359 int32_t FileMgr::copyPageWithoutHeaderSize(const Page& source_page,
1360  const Page& destination_page) {
1361  FileInfo* source_file_info = files_.at(source_page.fileId);
1362  CHECK(source_file_info);
1363  CHECK_EQ(source_file_info->fileId, source_page.fileId);
1364 
1365  FileInfo* destination_file_info = files_.at(destination_page.fileId);
1366  CHECK(destination_file_info);
1367  CHECK_EQ(destination_file_info->fileId, destination_page.fileId);
1368  CHECK_EQ(source_file_info->pageSize, destination_file_info->pageSize);
1369 
1370  auto page_size = source_file_info->pageSize;
1371  auto buffer = std::make_unique<int8_t[]>(page_size);
1372  size_t bytes_read =
1373  source_file_info->read(source_page.pageNum * page_size, page_size, buffer.get());
1374  CHECK_EQ(page_size, bytes_read);
1375 
1376  auto header_size_offset = sizeof(int32_t);
1377  size_t bytes_written = destination_file_info->write(
1378  (destination_page.pageNum * page_size) + header_size_offset,
1379  page_size - header_size_offset,
1380  buffer.get() + header_size_offset);
1381  CHECK_EQ(page_size - header_size_offset, bytes_written);
1382  return reinterpret_cast<int32_t*>(buffer.get())[0];
1383 }
1384 
1389 void FileMgr::updateMappedPagesVisibility(const std::vector<PageMapping>& page_mappings) {
1390  for (const auto& page_mapping : page_mappings) {
1391  auto destination_file = files_.at(page_mapping.destination_file_id);
1392 
1393  // Set destination page header size
1394  auto header_size = page_mapping.source_page_header_size;
1395  CHECK_GT(header_size, 0);
1396  destination_file->write(
1397  page_mapping.destination_page_num * destination_file->pageSize,
1398  sizeof(PageHeaderSizeType),
1399  reinterpret_cast<int8_t*>(&header_size));
1400  auto source_file = files_.at(page_mapping.source_file_id);
1401 
1402  // Free source page
1403  PageHeaderSizeType free_page_header_size{0};
1404  source_file->write(page_mapping.source_page_num * source_file->pageSize,
1405  sizeof(PageHeaderSizeType),
1406  reinterpret_cast<int8_t*>(&free_page_header_size));
1407  source_file->freePageDeferred(page_mapping.source_page_num);
1408  }
1409 
1410  for (auto file_info_entry : files_) {
1411  int32_t status = file_info_entry.second->syncToDisk();
1412  if (status != 0) {
1413  LOG(FATAL) << "Could not sync file to disk";
1414  }
1415  }
1416 }
1417 
1423  for (auto [file_id, file_info] : files_) {
1424  CHECK_EQ(file_id, file_info->fileId);
1425  if (file_info->freePages.size() == file_info->numPages) {
1426  fclose(file_info->f);
1427  file_info->f = nullptr;
1428  auto file_path = get_data_file_path(fileMgrBasePath_, file_id, file_info->pageSize);
1429  boost::filesystem::remove(file_path);
1430  }
1431  }
1432 
1433  auto status_file_path = getFilePath(DELETE_EMPTY_FILES_STATUS);
1434  CHECK(boost::filesystem::exists(status_file_path));
1435  boost::filesystem::remove(status_file_path);
1436 }
1437 
1444  const std::vector<PageMapping>& page_mappings) {
1445  auto file_path = getFilePath(COPY_PAGES_STATUS);
1446  CHECK(boost::filesystem::exists(file_path));
1447  CHECK(boost::filesystem::is_empty(file_path));
1448  std::ofstream status_file{file_path.string(), std::ios::out | std::ios::binary};
1449  int64_t page_mappings_count = page_mappings.size();
1450  status_file.write(reinterpret_cast<const char*>(&page_mappings_count), sizeof(int64_t));
1451  status_file.write(reinterpret_cast<const char*>(page_mappings.data()),
1452  page_mappings_count * sizeof(PageMapping));
1453  status_file.close();
1454 }
1455 
1459 std::vector<PageMapping> FileMgr::readPageMappingsFromStatusFile() {
1460  auto file_path = getFilePath(UPDATE_PAGE_VISIBILITY_STATUS);
1461  CHECK(boost::filesystem::exists(file_path));
1462  std::ifstream status_file{file_path.string(),
1463  std::ios::in | std::ios::binary | std::ios::ate};
1464  CHECK(status_file.is_open());
1465  size_t file_size = status_file.tellg();
1466  status_file.seekg(0, std::ios::beg);
1467  CHECK_GE(file_size, sizeof(int64_t));
1468 
1469  int64_t page_mappings_count;
1470  status_file.read(reinterpret_cast<char*>(&page_mappings_count), sizeof(int64_t));
1471  auto page_mappings_byte_size = file_size - sizeof(int64_t);
1472  CHECK_EQ(page_mappings_byte_size % sizeof(PageMapping), static_cast<size_t>(0));
1473  CHECK_EQ(static_cast<size_t>(page_mappings_count),
1474  page_mappings_byte_size / sizeof(PageMapping));
1475 
1476  std::vector<PageMapping> page_mappings(page_mappings_count);
1477  status_file.read(reinterpret_cast<char*>(page_mappings.data()),
1478  page_mappings_byte_size);
1479  status_file.close();
1480  return page_mappings;
1481 }
1482 
1486 void FileMgr::renameCompactionStatusFile(const char* const from_status,
1487  const char* const to_status) {
1488  auto from_status_file_path = getFilePath(from_status);
1489  auto to_status_file_path = getFilePath(to_status);
1490  CHECK(boost::filesystem::exists(from_status_file_path));
1491  CHECK(!boost::filesystem::exists(to_status_file_path));
1492  boost::filesystem::rename(from_status_file_path, to_status_file_path);
1493 }
1494 
1495 // Methods that enable override of number of pages per data/metadata files
1496 // for use in unit tests.
1497 void FileMgr::setNumPagesPerDataFile(size_t num_pages) {
1498  num_pages_per_data_file_ = num_pages;
1499 }
1500 
1501 void FileMgr::setNumPagesPerMetadataFile(size_t num_pages) {
1502  num_pages_per_metadata_file_ = num_pages;
1503 }
1504 
1506  mapd_shared_lock<mapd_shared_mutex> files_read_lock(files_rw_mutex_);
1507  for (auto file_info_entry : files_) {
1508  int32_t status = file_info_entry.second->syncToDisk();
1509  CHECK(status == 0) << "Could not sync file to disk";
1510  }
1511 }
1512 
1513 void FileMgr::initializeNumThreads(size_t num_reader_threads) {
1514  // # of threads is based on # of cores on the host
1515  size_t num_hardware_based_threads = std::thread::hardware_concurrency();
1516  if (num_reader_threads == 0 || num_reader_threads > num_hardware_based_threads) {
1517  // # of threads has not been defined by user
1518  num_reader_threads_ = num_hardware_based_threads;
1519  } else {
1520  num_reader_threads_ = num_reader_threads;
1521  }
1522 }
1523 
1525  mapd_unique_lock<mapd_shared_mutex> free_pages_write_lock(mutex_free_page_);
1526  for (auto& free_page : free_pages_) {
1527  free_page.first->freePageDeferred(free_page.second);
1528  }
1529  free_pages_.clear();
1530 }
1531 
1532 FileBuffer* FileMgr::allocateBuffer(const size_t page_size,
1533  const ChunkKey& key,
1534  const size_t num_bytes) {
1535  return new FileBuffer(this, page_size, key, num_bytes);
1536 }
1537 
1539  const ChunkKey& key,
1540  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
1541  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
1542  return new FileBuffer(this, key, headerStartIt, headerEndIt);
1543 }
1544 
1545 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
1547  ChunkKey& chunk_key,
1548  int32_t contingent,
1549  int32_t page_epoch,
1550  int32_t page_num) {
1551  // If the parent FileMgr has a fileMgrKey, then all keys are locked to one table and
1552  // can be set from the manager.
1553  auto [db_id, tb_id] = get_fileMgrKey();
1554  chunk_key[CHUNK_KEY_DB_IDX] = db_id;
1555  chunk_key[CHUNK_KEY_TABLE_IDX] = tb_id;
1556  const bool delete_contingent =
1557  (contingent == DELETE_CONTINGENT || contingent == ROLLOFF_CONTINGENT);
1558  // Check if page was deleted with a checkpointed epoch
1559  if (delete_contingent && epoch(db_id, tb_id) >= page_epoch) {
1560  file_info->freePageImmediate(page_num);
1561  return true;
1562  }
1563  // Recover page if it was deleted but not checkpointed.
1564  if (delete_contingent) {
1565  file_info->recoverPage(chunk_key, page_num);
1566  }
1567  return false;
1568 }
1569 
1571  FileBuffer* buf;
1572  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
1573  auto chunk_it = chunkIndex_.find(key);
1574  if (chunk_it == chunkIndex_.end()) {
1575  buf = createBufferUnlocked(key);
1576  } else {
1577  buf = getBufferUnlocked(chunk_it);
1578  }
1579  return buf;
1580 }
1581 
1583  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
1584  for (auto [key, buf] : chunkIndex_) {
1585  if (buf->isDirty()) {
1586  buf->writeMetadata(epoch());
1587  buf->clearDirtyBits();
1588  }
1589  }
1590 }
1591 
1593  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
1594  return chunkIndex_.size();
1595 }
1596 
1597 size_t FileMgr::num_pages_per_data_file_{DEFAULT_NUM_PAGES_PER_DATA_FILE};
1598 size_t FileMgr::num_pages_per_metadata_file_{DEFAULT_NUM_PAGES_PER_METADATA_FILE};
1599 } // namespace File_Namespace
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:590
std::vector< PageMapping > readPageMappingsFromStatusFile()
Definition: FileMgr.cpp:1459
virtual FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
Definition: FileMgr.cpp:709
int32_t readVersionFromDisk(const std::string &versionFileName) const
Definition: FileMgr.cpp:1025
#define CHECK_EQ(x, y)
Definition: Logger.h:217
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator)
Definition: FileMgr.cpp:146
OpenFilesResult openFiles()
Definition: FileMgr.cpp:189
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:842
size_t write(const size_t offset, const size_t size, const int8_t *buf)
Definition: FileInfo.cpp:60
TablePair fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:512
std::string getBasePath() const
int8_t * storage_ptr()
Definition: Epoch.h:61
mapd_shared_mutex mutex_free_page_
Definition: FileMgr.h:405
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:579
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:851
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
Definition: FileMgr.cpp:991
void syncEncoder(const AbstractBuffer *src_buffer)
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:688
void sortAndCopyFilePagesForCompaction(size_t page_size, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1234
std::vector< HeaderInfo > header_infos
Definition: FileMgr.h:115
virtual FileBuffer * allocateBuffer(const size_t page_size, const ChunkKey &key, const size_t num_bytes=0)
Definition: FileMgr.cpp:1532
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:203
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
void copySourcePageForCompaction(const Page &source_page, FileInfo *destination_file_info, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1329
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:847
virtual MemoryLevel getType() const =0
void freePageImmediate(int32_t page_num)
Definition: FileInfo.cpp:239
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:557
void migrateToLatestFileMgrVersion()
Definition: FileMgr.cpp:1082
#define MAPD_FILE_EXT
Definition: File.h:25
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
Definition: FileMgr.cpp:659
#define UNREACHABLE()
Definition: Logger.h:253
static constexpr char const * UPDATE_PAGE_VISIBILITY_STATUS
Definition: FileMgr.h:369
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:222
void init(const size_t num_reader_threads, const int32_t epochOverride)
Definition: FileMgr.cpp:249
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:725
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:325
GlobalFileMgr * gfm_
Definition: FileMgr.h:511
bool is_compaction_status_file(const std::string &file_name)
Definition: FileMgr.cpp:182
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:401
int32_t floor() const
Definition: Epoch.h:43
int32_t ceiling() const
Definition: Epoch.h:44
virtual bool updatePageIfDeleted(FileInfo *file_info, ChunkKey &chunk_key, int32_t contingent, int32_t page_epoch, int32_t page_num)
deletes or recovers a page based on last checkpointed epoch.
Definition: FileMgr.cpp:1546
std::optional< uint64_t > total_free_data_page_count
Definition: FileMgr.h:107
void writePageMappingsToStatusFile(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1443
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:58
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:678
int32_t copyPageWithoutHeaderSize(const Page &source_page, const Page &destination_page)
Definition: FileMgr.cpp:1359
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::string fileMgrBasePath_
Definition: FileMgr.h:388
static void setNumPagesPerMetadataFile(size_t num_pages)
Definition: FileMgr.cpp:1501
std::string to_string(char const *&&v)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:49
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:771
std::string show_chunk(const ChunkKey &key)
Definition: types.h:86
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
static size_t num_pages_per_data_file_
Definition: FileMgr.h:409
const int32_t latestFileMgrVersion_
Definition: FileMgr.h:399
int32_t PageHeaderSizeType
Definition: FileMgr.h:121
int32_t db_version_
the index of the next file id
Definition: FileMgr.h:396
std::set< size_t > freePages
Definition: FileInfo.h:62
future< Result > async(Fn &&fn, Args &&...args)
size_t pageSize
file stream object for the represented file
Definition: FileInfo.h:59
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
Definition: FileMgr.cpp:1043
size_t getNumChunks() override
Definition: FileMgr.cpp:1592
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:133
int32_t incrementEpoch()
Definition: FileMgr.h:275
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
Definition: FileMgr.cpp:1138
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
Definition: FileInfo.h:51
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:392
StorageStats getStorageStats()
Definition: FileMgr.cpp:329
FileBuffer * getOrCreateBuffer(const ChunkKey &key)
Definition: FileMgr.cpp:1570
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
#define CHECK_NE(x, y)
Definition: Logger.h:218
static void setNumPagesPerDataFile(size_t num_pages)
Definition: FileMgr.cpp:1497
std::optional< uint64_t > total_free_metadata_page_count
Definition: FileMgr.h:103
int32_t fileId
Definition: Page.h:47
int32_t getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:1001
void freePagesBeforeEpoch(const int32_t min_epoch)
Definition: FileMgr.cpp:645
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:320
PageSizeFileMMap fileIndex_
A map of files accessible via a file identifier.
Definition: FileMgr.h:392
virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const bool purge=true)
Definition: FileMgr.cpp:733
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
uint64_t total_metadata_page_count
Definition: FileMgr.h:102
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:964
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
Definition: FileMgr.h:406
static constexpr char FILE_MGR_VERSION_FILENAME[]
Definition: FileMgr.h:380
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:393
static size_t num_pages_per_metadata_file_
Definition: FileMgr.h:410
int fsync(int fd)
Definition: omnisci_fs.cpp:60
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:720
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:920
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:631
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
constexpr int32_t ROLLOFF_CONTINGENT
Definition: FileInfo.h:52
std::string compaction_status_file_name
Definition: FileMgr.h:117
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:743
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:970
const TablePair get_fileMgrKey() const
Definition: FileMgr.h:332
#define CHECK_LT(x, y)
Definition: Logger.h:219
static constexpr char EPOCH_FILENAME[]
Definition: FileMgr.h:378
string version
Definition: setup.in.py:73
virtual std::string describeSelf() const
Definition: FileMgr.cpp:672
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:394
virtual FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:698
int32_t maxRollbackEpochs_
Definition: FileMgr.h:387
void freePagesBeforeEpochUnlocked(const int32_t min_epoch, const ChunkKeyToChunkMap::iterator lower_bound, const ChunkKeyToChunkMap::iterator upper_bound)
Definition: FileMgr.cpp:650
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
uint64_t total_metadata_file_size
Definition: FileMgr.h:101
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:102
void openAndReadEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:611
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:66
bool is_metadata_file(size_t file_size, size_t page_size, size_t num_pages_per_metadata_file)
Definition: FileMgr.cpp:321
static size_t byte_size()
Definition: Epoch.h:63
static constexpr char const * DELETE_EMPTY_FILES_STATUS
Definition: FileMgr.h:370
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:106
bool getDBConvert() const
Definition: FileMgr.cpp:1005
virtual void free_page(std::pair< FileInfo *, int32_t > &&page)
Definition: FileMgr.cpp:1133
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
virtual void closeRemovePhysical()
Definition: FileMgr.cpp:550
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:98
static constexpr char const * COPY_PAGES_STATUS
Definition: FileMgr.h:368
int32_t epoch() const
Definition: FileMgr.h:508
void openExistingFile(std::vector< HeaderInfo > &headerVec)
Definition: FileInfo.cpp:71
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:936
std::map< int32_t, FileInfo * > files_
Definition: FileMgr.h:391
void updateMappedPagesVisibility(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1389
#define CHECK(condition)
Definition: Logger.h:209
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns the a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:758
void recoverPage(const ChunkKey &chunk_key, int32_t page_num)
Definition: FileInfo.cpp:252
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:119
char * f
void setEpoch(const int32_t newEpoch)
Definition: FileMgr.cpp:1122
bool coreInit()
Determines file path, and if exists, runs file migration and opens and reads epoch file...
Definition: FileMgr.cpp:126
FileInfo * getFileInfoForFileId(const int32_t fileId) const
Definition: FileMgr.h:218
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1513
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:875
mapd_unique_lock< mapd_shared_mutex > write_lock
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:86
std::string get_data_file_path(const std::string &base_path, int file_id, size_t page_size)
Definition: File.cpp:42
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_&lt;EPOCH&gt;_&lt;oldname&gt;.
Definition: File.cpp:218
int32_t epochFloor() const
Definition: FileMgr.h:273
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:402
void renameCompactionStatusFile(const char *const from_status, const char *const to_status)
Definition: FileMgr.cpp:1486
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:403
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:79
virtual FileBuffer * getBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const size_t numBytes=0)
Definition: FileMgr.cpp:764
static constexpr char LEGACY_EPOCH_FILENAME[]
Definition: FileMgr.h:377
A selection of helper methods for File I/O.
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:790
#define VLOG(n)
Definition: Logger.h:303
Type timer_start()
Definition: measure.h:42
int32_t lastCheckpointedEpoch()
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:291
void resumeFileCompaction(const std::string &status_file_name)
Definition: FileMgr.cpp:1147
static constexpr char DB_META_FILENAME[]
Definition: FileMgr.h:379
static constexpr int32_t INVALID_VERSION
Definition: FileMgr.h:381
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
boost::filesystem::path getFilePath(const std::string &file_name)
Definition: FileMgr.h:334