OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <fstream>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/checked_alloc.h"
41 #include "Shared/measure.h"
42 
43 using namespace std;
44 
45 namespace File_Namespace {
46 
47 FileMgr::FileMgr(const int32_t deviceId,
48  GlobalFileMgr* gfm,
49  const TablePair fileMgrKey,
50  const int32_t maxRollbackEpochs,
51  const size_t num_reader_threads,
52  const int32_t epoch,
53  const size_t defaultPageSize)
54  : AbstractBufferMgr(deviceId)
55  , maxRollbackEpochs_(maxRollbackEpochs)
56  , defaultPageSize_(defaultPageSize)
57  , nextFileId_(0)
58  , gfm_(gfm)
59  , fileMgrKey_(fileMgrKey) {
60  init(num_reader_threads, epoch);
61 }
62 
63 // used only to initialize enough to drop
64 FileMgr::FileMgr(const int32_t deviceId,
65  GlobalFileMgr* gfm,
66  const TablePair fileMgrKey,
67  const size_t defaultPageSize,
68  const bool runCoreInit)
69  : AbstractBufferMgr(deviceId)
70  , maxRollbackEpochs_(-1)
71  , defaultPageSize_(defaultPageSize)
72  , nextFileId_(0)
73  , gfm_(gfm)
74  , fileMgrKey_(fileMgrKey) {
75  const std::string fileMgrDirPrefix("table");
76  const std::string FileMgrDirDelim("_");
77  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
78  std::to_string(fileMgrKey_.first) + // db_id
79  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
80  epochFile_ = nullptr;
81  files_.clear();
82  if (runCoreInit) {
83  coreInit();
84  }
85 }
86 
87 FileMgr::FileMgr(GlobalFileMgr* gfm, const size_t defaultPageSize, std::string basePath)
88  : AbstractBufferMgr(0)
89  , maxRollbackEpochs_(-1)
90  , fileMgrBasePath_(basePath)
91  , defaultPageSize_(defaultPageSize)
92  , nextFileId_(0)
93  , gfm_(gfm)
94  , fileMgrKey_(0, 0) {
95  init(basePath, -1);
96 }
97 
98 // For testing purposes only
99 FileMgr::FileMgr(const int epoch) : AbstractBufferMgr(-1) {
100  epoch_.ceiling(epoch);
101 }
102 
103 // Used to initialize CachingFileMgr.
104 FileMgr::FileMgr() : AbstractBufferMgr(0) {}
105 
// NOTE(review): presumably the body of FileMgr::~FileMgr(); its signature line
// is not present in this listing. It deletes every FileBuffer held in
// chunkIndex_ and every FileInfo held in files_, closes and clears epochFile_,
// and clears DBMetaFile_.
107  // free memory used by FileBuffer objects (chunkIndex_) and FileInfo objects (files_)
108  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
109  delete chunkIt->second;
110  }
111  for (auto file_info_entry : files_) {
112  delete file_info_entry.second;
113  }
114 
115  if (epochFile_) {
116  close(epochFile_);
117  epochFile_ = nullptr;
118  }
119 
120  if (DBMetaFile_) {
// (original source line 121, not shown here, presumably closes DBMetaFile_ — confirm)
122  DBMetaFile_ = nullptr;
123  }
124 }
125 
// NOTE(review): presumably the body of FileMgr::coreInit(), which init() calls
// to learn whether table data already exists. Under an exclusive lock on
// files_rw_mutex_, it derives fileMgrBasePath_ as
// <gfm base path>table_<db_id>_<tb_id>. It returns true when that path exists
// (aborting if it is not a directory) and false otherwise. Source lines
// 139-140, not shown here, run only in the "exists" branch.
127  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
128  const std::string fileMgrDirPrefix("table");
129  const std::string FileMgrDirDelim("_");
130  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
131  std::to_string(fileMgrKey_.first) + // db_id
132  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
133  boost::filesystem::path path(fileMgrBasePath_);
134  if (boost::filesystem::exists(path)) {
135  if (!boost::filesystem::is_directory(path)) {
136  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
137  << "' for table data is not a directory.";
138  }
141  return true;
142  }
143  return false;
144 }
145 
// getMetadataForFile: describes one directory entry.
// Non-regular files, and files whose extension is not MAPD_FILE_EXT, come back
// with is_data_file = false and only file_path set. Data files are expected to
// be named "<file_id>.<page_size><MAPD_FILE_EXT>"; both numbers are parsed from
// the stem. A stem without a '.' is fatal, and a size that is not a multiple of
// the page size fails CHECK_EQ.
147  const boost::filesystem::directory_iterator& fileIterator) {
148  FileMetadata fileMetadata;
149  fileMetadata.is_data_file = false;
150  fileMetadata.file_path = fileIterator->path().string();
151  if (!boost::filesystem::is_regular_file(fileIterator->status())) {
152  return fileMetadata;
153  }
154  // note that boost::filesystem leaves preceding dot on
155  // extension - hence MAPD_FILE_EXT is ".mapd"
156  std::string extension(fileIterator->path().extension().string());
157  if (extension == MAPD_FILE_EXT) {
158  std::string fileStem(fileIterator->path().stem().string());
159  // remove trailing dot if any
160  if (fileStem.size() > 0 && fileStem.back() == '.') {
161  fileStem = fileStem.substr(0, fileStem.size() - 1);
162  }
163  size_t dotPos = fileStem.find_last_of("."); // should only be one
164  if (dotPos == std::string::npos) {
165  LOG(FATAL) << "File `" << fileIterator->path()
166  << "` does not carry page size information in the filename.";
167  }
// Text before the last '.' is the file id; text after it is the page size.
168  fileMetadata.is_data_file = true;
169  fileMetadata.file_id = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
170  fileMetadata.page_size =
171  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
172 
173  fileMetadata.file_size = boost::filesystem::file_size(fileMetadata.file_path);
174  CHECK_EQ(fileMetadata.file_size % fileMetadata.page_size,
175  size_t(0)); // should be no partial pages
176  fileMetadata.num_pages = fileMetadata.file_size / fileMetadata.page_size;
177  }
178  return fileMetadata;
179 }
180 
181 namespace {
// Returns true when `file_name` equals FileMgr::COPY_PAGES_STATUS or one of the
// other compaction status names compared on source lines 184-185 (not shown here).
182 bool is_compaction_status_file(const std::string& file_name) {
183  return (file_name == FileMgr::COPY_PAGES_STATUS ||
186 }
187 } // namespace
188 
// NOTE(review): presumably the body of FileMgr::openFiles() (called from init()).
// It scans fileMgrBasePath_. Each data file is opened on its own std::async
// task via openExistingFile(), which collects that file's HeaderInfos. Tasks
// are drained in batches of `thread_count` through processFileFutures(). The
// result records the largest file id seen and at most one compaction status
// file name.
190  auto clock_begin = timer_start();
191  boost::filesystem::directory_iterator
192  end_itr; // default construction yields past-the-end
194  result.max_file_id = -1;
195  int32_t file_count = 0;
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the value
// is not computable, which would make `file_count % thread_count` below a
// division by zero. Consider clamping to at least 1.
196  int32_t thread_count = std::thread::hardware_concurrency();
197  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
198  boost::filesystem::path path(fileMgrBasePath_);
199  for (boost::filesystem::directory_iterator file_it(path); file_it != end_itr;
200  ++file_it) {
201  FileMetadata file_metadata = getMetadataForFile(file_it);
202  if (file_metadata.is_data_file) {
203  result.max_file_id = std::max(result.max_file_id, file_metadata.file_id);
// file_metadata is captured by value so each task owns its copy.
204  file_futures.emplace_back(std::async(std::launch::async, [file_metadata, this] {
205  std::vector<HeaderInfo> temp_header_vec;
206  openExistingFile(file_metadata.file_path,
207  file_metadata.file_id,
208  file_metadata.page_size,
209  file_metadata.num_pages,
210  temp_header_vec);
211  return temp_header_vec;
212  }));
213  file_count++;
// Bound the number of in-flight tasks: wait for a full batch every thread_count files.
214  if (file_count % thread_count == 0) {
215  processFileFutures(file_futures, result.header_infos);
216  }
217  }
218 
219  if (is_compaction_status_file(file_it->path().filename().string())) {
220  CHECK(result.compaction_status_file_name.empty());
221  result.compaction_status_file_name = file_it->path().filename().string();
222  }
223  }
224 
// Drain the final, partial batch.
225  if (file_futures.size() > 0) {
226  processFileFutures(file_futures, result.header_infos);
227  }
228 
229  int64_t queue_time_ms = timer_stop(clock_begin);
230  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : " << queue_time_ms
231  << "ms Epoch: " << epoch_.ceiling() << " files read: " << file_count
232  << " table location: '" << fileMgrBasePath_ << "'";
233  return result;
234 }
235 
// NOTE(review): presumably FileMgr::clearFileInfos() (init() calls it before
// re-running openFiles()). For each FileInfo in files_, it closes the file
// handle if open and deletes the FileInfo. It then empties files_ and fileIndex_.
237  for (auto file_info_entry : files_) {
238  auto file_info = file_info_entry.second;
239  if (file_info->f) {
240  close(file_info->f);
241  file_info->f = nullptr;
242  }
243  delete file_info;
244  }
245  files_.clear();
246  fileIndex_.clear();
247 }
248 
// Initializes this FileMgr for its table.
// If the table directory exists (coreInit() returns true), it:
//   - optionally forces the epoch;
//   - opens all data files, resuming an interrupted compaction first if a
//     status file is found;
//   - rebuilds chunkIndex_ from the sorted page headers;
//   - sets nextFileId_;
//   - rolls off old data, then bumps the epoch.
// Otherwise it creates the directory and starts at `epochOverride`, or 0, then
// bumps the epoch. Source lines 304 and 313-315 are not shown in this listing.
// In both cases it finishes by setting the reader-thread count and marking the
// manager fully initialized.
249 void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) {
250  // if epochCeiling = -1 this means open from epoch file
251 
252  const bool dataExists = coreInit();
253  if (dataExists) {
254  if (epochOverride != -1) { // if opening at specified epoch
255  setEpoch(epochOverride);
256  }
257 
258  auto open_files_result = openFiles();
// A leftover compaction status file means a compaction was interrupted: finish
// it, drop the now-stale FileInfos, and re-scan the directory.
259  if (!open_files_result.compaction_status_file_name.empty()) {
260  resumeFileCompaction(open_files_result.compaction_status_file_name);
261  clearFileInfos();
262  open_files_result = openFiles();
263  CHECK(open_files_result.compaction_status_file_name.empty());
264  }
265 
266  /* Sort headerVec so that all HeaderInfos
267  * from a chunk will be grouped together
268  * and in order of increasing PageId
269  * - Version Epoch */
270  auto& header_vec = open_files_result.header_infos;
271  std::sort(header_vec.begin(), header_vec.end());
272 
273  /* Goal of next section is to find sequences in the
274  * sorted headerVec of the same ChunkId, which we
275  * can then initiate a FileBuffer with */
276 
277  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
278  if (header_vec.size() > 0) {
279  ChunkKey lastChunkKey = header_vec.begin()->chunkKey;
280  auto startIt = header_vec.begin();
281 
282  for (auto headerIt = header_vec.begin() + 1; headerIt != header_vec.end();
283  ++headerIt) {
284  if (headerIt->chunkKey != lastChunkKey) {
285  chunkIndex_[lastChunkKey] =
286  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, headerIt);
287  lastChunkKey = headerIt->chunkKey;
288  startIt = headerIt;
289  }
290  }
291  // now need to insert last Chunk
292  chunkIndex_[lastChunkKey] =
293  new FileBuffer(this, /*pageSize,*/ lastChunkKey, startIt, header_vec.end());
294  }
295  nextFileId_ = open_files_result.max_file_id + 1;
296  rollOffOldData(epoch(), true /* only checkpoint if data is rolled off */);
297  incrementEpoch();
298  freePages();
299  } else {
300  boost::filesystem::path path(fileMgrBasePath_);
301  if (!boost::filesystem::create_directory(path)) {
302  LOG(FATAL) << "Could not create data directory: " << path;
303  }
305  if (epochOverride != -1) {
306  epoch_.floor(epochOverride);
307  epoch_.ceiling(epochOverride);
308  } else {
309  // These are default constructor values for epoch_, but resetting here for clarity
310  epoch_.floor(0);
311  epoch_.ceiling(0);
312  }
316  incrementEpoch();
317  }
318 
319  initializeNumThreads(num_reader_threads);
320  isFullyInitted_ = true;
321 }
322 
323 namespace {
// is_metadata_file: classifies a file as a metadata file when both of these hold:
//   - its page size is METADATA_PAGE_SIZE;
//   - its total size is exactly METADATA_PAGE_SIZE * num_pages_per_metadata_file.
// Any other file is treated by callers as a data file.
325  size_t page_size,
326  size_t num_pages_per_metadata_file) {
327  return (file_size == (METADATA_PAGE_SIZE * num_pages_per_metadata_file) &&
328  page_size == METADATA_PAGE_SIZE);
329 }
330 } // namespace
331 
// NOTE(review): presumably the body of FileMgr::getStorageStats(). It counts
// metadata vs. data files, their total sizes, and their page counts, under a
// shared lock on files_rw_mutex_. Before full initialization it scans the
// directory on disk; free-page counts are not filled in on that path. After
// initialization it walks the in-memory FileInfo objects and also reports
// free-page counts.
333  StorageStats storage_stats;
334  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
335  if (!isFullyInitted_) {
336  CHECK(!fileMgrBasePath_.empty());
337  boost::filesystem::path path(fileMgrBasePath_);
338  if (boost::filesystem::exists(path)) {
339  if (!boost::filesystem::is_directory(path)) {
340  LOG(FATAL) << "getStorageStats: Specified path '" << fileMgrBasePath_
341  << "' for table data is not a directory.";
342  }
343 
344  storage_stats.epoch = lastCheckpointedEpoch();
345  storage_stats.epoch_floor = epochFloor();
346  boost::filesystem::directory_iterator
347  endItr; // default construction yields past-the-end
348  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
349  ++fileIt) {
350  FileMetadata file_metadata = getMetadataForFile(fileIt);
351  if (file_metadata.is_data_file) {
352  if (is_metadata_file(file_metadata.file_size,
353  file_metadata.page_size,
355  storage_stats.metadata_file_count++;
356  storage_stats.total_metadata_file_size += file_metadata.file_size;
357  storage_stats.total_metadata_page_count += file_metadata.num_pages;
358  } else {
359  storage_stats.data_file_count++;
360  storage_stats.total_data_file_size += file_metadata.file_size;
361  storage_stats.total_data_page_count += file_metadata.num_pages;
362  }
363  }
364  }
365  }
366  } else {
367  storage_stats.epoch = lastCheckpointedEpoch();
368  storage_stats.epoch_floor = epochFloor();
// The free-page counters are seeded with 0 so that .value() below can accumulate into them.
369  storage_stats.total_free_metadata_page_count = 0;
370  storage_stats.total_free_data_page_count = 0;
371 
372  // We already initialized this table so take the faster path of walking through the
373  // FileInfo objects and getting metadata from there
374  for (const auto& file_info_entry : files_) {
375  const auto file_info = file_info_entry.second;
376  if (is_metadata_file(
377  file_info->size(), file_info->pageSize, num_pages_per_metadata_file_)) {
378  storage_stats.metadata_file_count++;
379  storage_stats.total_metadata_file_size +=
380  file_info->pageSize * file_info->numPages;
381  storage_stats.total_metadata_page_count += file_info->numPages;
382  storage_stats.total_free_metadata_page_count.value() +=
383  file_info->freePages.size();
384  } else {
385  storage_stats.data_file_count++;
386  storage_stats.total_data_file_size += file_info->pageSize * file_info->numPages;
387  storage_stats.total_data_page_count += file_info->numPages;
388  storage_stats.total_free_data_page_count.value() += file_info->freePages.size();
389  }
390  }
391  }
392  return storage_stats;
393 }
394 
// processFileFutures: waits for every pending per-file task and appends each
// task's HeaderInfo vector to `headerVec`, in the order the futures were
// queued. It then empties `file_futures` so the caller can start a new batch.
396  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
397  std::vector<HeaderInfo>& headerVec) {
398  for (auto& file_future : file_futures) {
399  file_future.wait();
400  }
401  // concatenate the vectors after thread completes
402  for (auto& file_future : file_futures) {
403  auto tempHeaderVec = file_future.get();
404  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
405  }
406  file_futures.clear();
407 }
408 
// Path-based init used by the basePath constructor.
// If `dataPathToConvertFrom` exists, it reads every data file's page headers and
// builds a source FileBuffer per chunk in this FileMgr. For each chunk it also:
//   - creates a matching FileBuffer in the per-table FileMgr returned by
//     gfm_->getFileMgr(chunkKey);
//   - copies every page's payload into it with copyPage();
//   - writes page headers tagged with converted_data_epoch (0).
// If the path does not exist, it creates the directory. Source line 417 is not
// shown in this listing.
409 void FileMgr::init(const std::string& dataPathToConvertFrom,
410  const int32_t epochOverride) {
411  int32_t converted_data_epoch = 0;
412  boost::filesystem::path path(dataPathToConvertFrom);
413  if (boost::filesystem::exists(path)) {
414  if (!boost::filesystem::is_directory(path)) {
415  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
416  }
418 
419  if (epochOverride != -1) { // if opening at previous epoch
420  setEpoch(epochOverride);
421  }
422 
423  boost::filesystem::directory_iterator
424  endItr; // default construction yields past-the-end
425  int32_t maxFileId = -1;
426  int32_t fileCount = 0;
// NOTE(review): hardware_concurrency() may return 0, which makes the modulo
// below a division by zero.
427  int32_t threadCount = std::thread::hardware_concurrency();
428  std::vector<HeaderInfo> headerVec;
429  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
430  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
431  FileMetadata fileMetadata = getMetadataForFile(fileIt);
432  if (fileMetadata.is_data_file) {
433  maxFileId = std::max(maxFileId, fileMetadata.file_id);
434  file_futures.emplace_back(std::async(std::launch::async, [fileMetadata, this] {
435  std::vector<HeaderInfo> tempHeaderVec;
436  openExistingFile(fileMetadata.file_path,
437  fileMetadata.file_id,
438  fileMetadata.page_size,
439  fileMetadata.num_pages,
440  tempHeaderVec);
441  return tempHeaderVec;
442  }));
443  fileCount++;
// NOTE(review): this condition is inverted relative to openFiles(), which uses
// `% thread_count == 0`. As written, futures are drained after almost every file,
// so files are read nearly one at a time instead of in batches of threadCount.
444  if (fileCount % threadCount) {
445  processFileFutures(file_futures, headerVec);
446  }
447  }
448  }
449 
450  if (file_futures.size() > 0) {
451  processFileFutures(file_futures, headerVec);
452  }
453 
454  /* Sort headerVec so that all HeaderInfos
455  * from a chunk will be grouped together
456  * and in order of increasing PageId
457  * - Version Epoch */
458 
459  std::sort(headerVec.begin(), headerVec.end());
460 
461  /* Goal of next section is to find sequences in the
462  * sorted headerVec of the same ChunkId, which we
463  * can then initiate a FileBuffer with */
464 
465  if (headerVec.size() > 0) {
466  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
467  auto startIt = headerVec.begin();
468 
469  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
470  ++headerIt) {
471  if (headerIt->chunkKey != lastChunkKey) {
472  FileMgr* c_fm_ =
473  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
474  CHECK(c_fm_);
475  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerIt);
476  chunkIndex_[lastChunkKey] = srcBuf;
477  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
478  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
479  destBuf->syncEncoder(srcBuf);
480  destBuf->setSize(srcBuf->size());
481  destBuf->setDirty(); // this needs to be set to force writing out metadata
482  // files from "checkpoint()" call
483 
// Copy the current version of every page into a freshly allocated page in the target FileMgr.
484  size_t totalNumPages = srcBuf->getMultiPage().size();
485  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
486  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
487  Page destPage = c_fm_->requestFreePage(
488  srcBuf->pageSize(),
489  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
490  MultiPage multiPage(srcBuf->pageSize());
491  multiPage.push(destPage, converted_data_epoch);
492  destBuf->multiPages_.push_back(multiPage);
493  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
494  copyPage(
495  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
496  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
497  }
498  lastChunkKey = headerIt->chunkKey;
499  startIt = headerIt;
500  }
501  }
502 
503  // now need to insert last Chunk
// NOTE(review): unlike the loop above, c_fm_ is not CHECKed here before it is dereferenced.
504  FileMgr* c_fm_ =
505  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
506  FileBuffer* srcBuf = new FileBuffer(this, lastChunkKey, startIt, headerVec.end());
507  chunkIndex_[lastChunkKey] = srcBuf;
508  FileBuffer* destBuf = new FileBuffer(c_fm_, srcBuf->pageSize(), lastChunkKey);
509  c_fm_->chunkIndex_[lastChunkKey] = destBuf;
510  destBuf->syncEncoder(srcBuf);
511  destBuf->setSize(srcBuf->size());
512  destBuf->setDirty(); // this needs to be set to write out metadata file from the
513  // "checkpoint()" call
514 
515  size_t totalNumPages = srcBuf->getMultiPage().size();
516  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
517  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
518  Page destPage = c_fm_->requestFreePage(
519  srcBuf->pageSize(),
520  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
521  MultiPage multiPage(srcBuf->pageSize());
522  multiPage.push(destPage, converted_data_epoch);
523  destBuf->multiPages_.push_back(multiPage);
524  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
525  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
526  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
527  }
528  }
529  nextFileId_ = maxFileId + 1;
530  } else {
531  if (!boost::filesystem::create_directory(path)) {
532  LOG(FATAL) << "Specified path does not exist: " << path;
533  }
534  }
535  isFullyInitted_ = true;
536 }
537 
// NOTE(review): name not visible in this listing. It closes the FILE handle of
// every FileInfo in files_ (leaving the FileInfo objects and map entries in
// place), clears DBMetaFile_ (source line 547, not shown, presumably closes it),
// and closes and clears epochFile_.
539  for (auto& [idx, file_info] : files_) {
540  if (file_info->f) {
541  close(file_info->f);
542  file_info->f = nullptr;
543  }
544  }
545 
546  if (DBMetaFile_) {
548  DBMetaFile_ = nullptr;
549  }
550 
551  if (epochFile_) {
552  close(epochFile_);
553  epochFile_ = nullptr;
554  }
555 }
556 
// NOTE(review): name and most statements are not visible in this listing. It
// takes files_rw_mutex_ exclusively. Per the comment below, it renames the
// table's data directory so the directory can be deleted later.
558  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
560  /* rename for later deletion the directory containing table related data */
562 }
563 
564 void FileMgr::copyPage(Page& srcPage,
565  FileMgr* destFileMgr,
566  Page& destPage,
567  const size_t reservedHeaderSize,
568  const size_t numBytes,
569  const size_t offset) {
570  CHECK(offset + numBytes <= defaultPageSize_);
571  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
572  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
573  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
574 
575  size_t bytesRead = srcFileInfo->read(
576  srcPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize, numBytes, buffer);
577  CHECK(bytesRead == numBytes);
578  size_t bytesWritten = destFileInfo->write(
579  destPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize,
580  numBytes,
581  buffer);
582  CHECK(bytesWritten == numBytes);
583  ::free(buffer);
584 }
585 
// Creates a new epoch file `<fileMgrBasePath_>/<epochFileName>` and stores its
// handle in epochFile_. Aborts if the file already exists. Source line 594,
// not shown here, follows the "Write out current epoch" comment below.
586 void FileMgr::createEpochFile(const std::string& epochFileName) {
587  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
588  if (boost::filesystem::exists(epochFilePath)) {
589  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
590  }
// NOTE(review): sizeof(Epoch::byte_size()) is the size of byte_size()'s return
// type, not Epoch::byte_size() itself. openAndReadEpochFile() requires
// file_size == Epoch::byte_size(). This was likely meant to be
// create(epochFilePath, Epoch::byte_size()). Confirm whether the write on
// source line 594 happens to extend the file to the right size.
591  epochFile_ = create(epochFilePath, sizeof(Epoch::byte_size()));
592  // Write out current epoch to file - which if this
593  // function is being called should be 0
595 }
596 
597 int32_t FileMgr::openAndReadLegacyEpochFile(const std::string& epochFileName) {
598  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
599  if (!boost::filesystem::exists(epochFilePath)) {
600  return 0;
601  }
602 
603  if (!boost::filesystem::is_regular_file(epochFilePath)) {
604  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
605  }
606  if (boost::filesystem::file_size(epochFilePath) < 4) {
607  LOG(FATAL) << "Epoch file `" << epochFilePath
608  << "` is not sized properly (current size: "
609  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
610  }
611  FILE* legacyEpochFile = open(epochFilePath);
612  int32_t epoch;
613  read(legacyEpochFile, 0, sizeof(int32_t), (int8_t*)&epoch);
614  close(legacyEpochFile);
615  return epoch;
616 }
617 
// Opens `<fileMgrBasePath_>/<epochFileName>` into epochFile_ unless it is
// already open. Aborts if the file is missing, is not a regular file, or is not
// exactly Epoch::byte_size() bytes. Source line 635, not shown here, runs after
// the handle is available.
618 void FileMgr::openAndReadEpochFile(const std::string& epochFileName) {
619  if (!epochFile_) { // Check to see if already open
620  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
621  if (!boost::filesystem::exists(epochFilePath)) {
622  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
623  }
624  if (!boost::filesystem::is_regular_file(epochFilePath)) {
625  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
626  }
627  if (boost::filesystem::file_size(epochFilePath) != Epoch::byte_size()) {
628  LOG(FATAL) << "Epoch file `" << epochFilePath
629  << "` is not sized properly (current size: "
630  << boost::filesystem::file_size(epochFilePath)
631  << ", expected size: " << Epoch::byte_size() << ")";
632  }
633  epochFile_ = open(epochFilePath);
634  }
636 }
637 
// NOTE(review): signature not visible in this listing. Requires an open
// epochFile_. After source line 640 (not shown), it flushes the stdio buffer
// and forces the data to stable storage. On success it sets
// epochIsCheckpointed_ = true; a failed flush or sync CHECK-fails.
639  CHECK(epochFile_);
641  int32_t status = fflush(epochFile_);
642  CHECK(status == 0) << "Could not flush epoch file to disk";
643 #ifdef __APPLE__
// 51 is presumably F_FULLFSYNC on macOS (plain fsync does not flush the drive cache there) — confirm.
644  status = fcntl(fileno(epochFile_), 51);
645 #else
646  status = omnisci::fsync(fileno(epochFile_));
647 #endif
648  CHECK(status == 0) << "Could not sync epoch file to disk";
649  epochIsCheckpointed_ = true;
650 }
651 
652 void FileMgr::freePagesBeforeEpoch(const int32_t min_epoch) {
653  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
654  freePagesBeforeEpochUnlocked(min_epoch, chunkIndex_.begin(), chunkIndex_.end());
655 }
656 
// freePagesBeforeEpochUnlocked: calls freePagesBeforeEpoch(min_epoch) on each
// chunk in [lower_bound, upper_bound). It takes no lock itself; the visible
// caller (freePagesBeforeEpoch) holds chunkIndexMutex_ in shared mode.
658  const int32_t min_epoch,
659  const ChunkKeyToChunkMap::iterator lower_bound,
660  const ChunkKeyToChunkMap::iterator upper_bound) {
661  for (auto chunkIt = lower_bound; chunkIt != upper_bound; ++chunkIt) {
662  chunkIt->second->freePagesBeforeEpoch(min_epoch);
663  }
664 }
665 
666 void FileMgr::rollOffOldData(const int32_t epoch_ceiling, const bool should_checkpoint) {
667  if (maxRollbackEpochs_ >= 0) {
668  auto min_epoch = std::max(epoch_ceiling - maxRollbackEpochs_, epoch_.floor());
669  if (min_epoch > epoch_.floor()) {
670  freePagesBeforeEpoch(min_epoch);
671  epoch_.floor(min_epoch);
672  if (should_checkpoint) {
673  checkpoint();
674  }
675  }
676  }
677 }
678 
679 std::string FileMgr::describeSelf() {
680  stringstream ss;
681  ss << "table (" << fileMgrKey_.first << ", " << fileMgrKey_.second << ")";
682  return ss.str();
683 }
684 
// NOTE(review): presumably the body of FileMgr::checkpoint() (its signature and
// source lines 687 and 690 are not shown). It logs the checkpoint, then in order:
//   - rolls off data outside the rollback window without a nested checkpoint;
//   - syncs files to disk;
//   - bumps the epoch;
//   - frees pending pages.
686  VLOG(2) << "Checkpointing " << describeSelf() << " epoch: " << epoch();
688  rollOffOldData(epoch(), false /* shouldCheckpoint */);
689  syncFilesToDisk();
691  incrementEpoch();
692  freePages();
693 }
694 
// createBuffer: locking wrapper that takes chunkIndexMutex_ exclusively and
// delegates to createBufferUnlocked(key, pageSize, numBytes).
696  const size_t pageSize,
697  const size_t numBytes) {
698  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
699  return createBufferUnlocked(key, pageSize, numBytes);
700 }
701 
702 // The underlying implementation of createBuffer needs to be lockless since
703 // some of the codepaths that call it will have already obtained a write lock
704 // and should not release it until they are complete.
// Registers a new buffer for `key` via allocateBuffer(). A pageSize of 0 means
// defaultPageSize_. CHECK-fails if a chunk already exists for `key`. Source line
// 712 is not shown in this listing.
706  const size_t pageSize,
707  const size_t numBytes) {
708  size_t actualPageSize = pageSize;
709  if (actualPageSize == 0) {
710  actualPageSize = defaultPageSize_;
711  }
713  // we will do this lazily and not allocate space for the Chunk (i.e.
714  // FileBuffer yet)
715 
716  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
717  << "Chunk already exists for key: " << show_chunk(key);
718 
719  chunkIndex_[key] = allocateBuffer(actualPageSize, key, numBytes);
720  return (chunkIndex_[key]);
721 }
722 
// Returns whether chunkIndex_ contains `key`, checked under a shared lock on
// chunkIndexMutex_. Signature not visible in this listing.
724  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
725  return chunkIndex_.find(key) != chunkIndex_.end();
726 }
727 
728 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
729  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
730  auto chunk_it = chunkIndex_.find(key);
731  CHECK(chunk_it != chunkIndex_.end())
732  << "Chunk does not exist for key: " << show_chunk(key);
733  deleteBufferUnlocked(chunk_it, purge);
734 }
735 
736 void FileMgr::deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it,
737  const bool purge) {
738  if (purge) {
739  chunk_it->second->freePages();
740  }
741  //@todo need a way to represent delete in non purge case
742  delete chunk_it->second;
743  chunkIndex_.erase(chunk_it);
744 }
745 
746 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
747  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
748  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
749  if (chunkIt == chunkIndex_.end()) {
750  return; // should we throw?
751  }
752  while (chunkIt != chunkIndex_.end() &&
753  std::search(chunkIt->first.begin(),
754  chunkIt->first.begin() + keyPrefix.size(),
755  keyPrefix.begin(),
756  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
757  if (purge) {
758  chunkIt->second->freePages();
759  }
760  //@todo need a way to represent delete in non purge case
761  delete chunkIt->second;
762  chunkIndex_.erase(chunkIt++);
763  }
764 }
765 
766 FileBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t numBytes) {
767  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
768  auto chunkIt = chunkIndex_.find(key);
769  CHECK(chunkIt != chunkIndex_.end())
770  << "Chunk does not exist for key: " << show_chunk(key);
771  return chunkIt->second;
772 }
773 
// NOTE(review): signature not visible in this listing. Copies the chunk stored
// for `key` into `destBuffer` (numBytes bytes, or the whole chunk when
// numBytes is 0 — per the comment below). Aborts if `destBuffer` is dirty, if
// the chunk is missing, or if the chunk is smaller than numBytes.
775  AbstractBuffer* destBuffer,
776  const size_t numBytes) {
777  // reads chunk specified by ChunkKey into AbstractBuffer provided by
778  // destBuffer
779  if (destBuffer->isDirty()) {
780  LOG(FATAL)
781  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
782  << show_chunk(key);
783  }
784  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
785  auto chunkIt = chunkIndex_.find(key);
786  if (chunkIt == chunkIndex_.end()) {
787  LOG(FATAL) << "Chunk does not exist for key: " << show_chunk(key);
788  }
// NOTE(review): the lock is released before chunkIt is dereferenced below. A
// concurrent deleteBuffer() could erase the entry and delete the buffer in
// between. Consider reading chunkIt->second before unlocking.
789  chunkIndexReadLock.unlock();
790 
791  AbstractBuffer* chunk = chunkIt->second;
792  // chunk's size is either specified in function call with numBytes or we
793  // just look at pageSize * numPages in FileBuffer
794  if (numBytes > 0 && numBytes > chunk->size()) {
795  LOG(FATAL) << "Chunk retrieved for key `" << show_chunk(key) << "` is smaller ("
796  << chunk->size() << ") than number of bytes requested (" << numBytes
797  << ")";
798  }
799 
800  chunk->copyTo(destBuffer, numBytes);
801 }
802 
// NOTE(review): signature not visible in this listing. Writes `srcBuffer` into
// the chunk for `key`, creating the chunk if needed via getOrCreateBuffer().
//   - An updated source overwrites from offset 0.
//   - An appended source appends only the bytes beyond the chunk's current size.
//   - Anything else is UNREACHABLE.
// Afterwards it clears the source's dirty bits, syncs the chunk's encoder, and
// returns the chunk.
804  AbstractBuffer* srcBuffer,
805  const size_t numBytes) {
806  auto chunk = getOrCreateBuffer(key);
807  size_t oldChunkSize = chunk->size();
808  // write the buffer's data to the Chunk
809  // size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
810  size_t newChunkSize = numBytes == 0 ? srcBuffer->size() : numBytes;
811  if (chunk->isDirty()) {
812  // multiple appends are allowed,
813  // but only single update is allowed
814  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
815  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
816  "for key: "
817  << show_chunk(key);
818  }
819  }
820  if (srcBuffer->isUpdated()) {
821  // chunk size is not changed when fixed rows are updated or are marked as deleted.
822  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
823  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
824  // For varlen update, it takes another route via fragmenter using disk-level buffer.
825  if (0 == numBytes && !chunk->isDirty()) {
826  chunk->setSize(newChunkSize);
827  }
828  //@todo use dirty flags to only flush pages of chunk that need to
829  // be flushed
830  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
831  newChunkSize,
832  0,
833  srcBuffer->getType(),
834  srcBuffer->getDeviceId());
835  } else if (srcBuffer->isAppended()) {
// Append only the tail of the source that the chunk does not have yet.
836  CHECK_LT(oldChunkSize, newChunkSize);
837  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
838  newChunkSize - oldChunkSize,
839  srcBuffer->getType(),
840  srcBuffer->getDeviceId());
841  } else {
842  UNREACHABLE() << "putBuffer() expects a buffer marked is_updated or is_appended";
843  }
844  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
845  //@todo commenting out line above will make sure this metadata is set
846  // but will trigger error on fetch chunk
847  srcBuffer->clearDirtyBits();
848  chunk->syncEncoder(srcBuffer);
849  return chunk;
850 }
851 
852 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
853  LOG(FATAL) << "Operation not supported";
854  return nullptr; // satisfy return-type warning
855 }
856 
// Unsupported operation on FileMgr: always aborts via LOG(FATAL).
// NOTE(review): signature not visible in this listing.
858  LOG(FATAL) << "Operation not supported";
859 }
860 
861 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
862  std::lock_guard<std::mutex> lock(getPageMutex_);
863 
864  auto candidateFiles = fileIndex_.equal_range(pageSize);
865  int32_t pageNum = -1;
866  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
867  FileInfo* fileInfo = files_[fileIt->second];
868  pageNum = fileInfo->getFreePage();
869  if (pageNum != -1) {
870  return (Page(fileInfo->fileId, pageNum));
871  }
872  }
873  // if here then we need to add a file
874  FileInfo* fileInfo;
875  if (isMetadata) {
876  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
877  } else {
878  fileInfo = createFile(pageSize, num_pages_per_data_file_);
879  }
880  pageNum = fileInfo->getFreePage();
881  CHECK(pageNum != -1);
882  return (Page(fileInfo->fileId, pageNum));
883 }
884 
885 void FileMgr::requestFreePages(size_t numPagesRequested,
886  size_t pageSize,
887  std::vector<Page>& pages,
888  const bool isMetadata) {
889  // not used currently
890  // @todo add method to FileInfo to get more than one page
891  std::lock_guard<std::mutex> lock(getPageMutex_);
892  auto candidateFiles = fileIndex_.equal_range(pageSize);
893  size_t numPagesNeeded = numPagesRequested;
894  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
895  FileInfo* fileInfo = files_[fileIt->second];
896  int32_t pageNum;
897  do {
898  pageNum = fileInfo->getFreePage();
899  if (pageNum != -1) {
900  pages.emplace_back(fileInfo->fileId, pageNum);
901  numPagesNeeded--;
902  }
903  } while (pageNum != -1 && numPagesNeeded > 0);
904  if (numPagesNeeded == 0) {
905  break;
906  }
907  }
908  while (numPagesNeeded > 0) {
909  FileInfo* fileInfo;
910  if (isMetadata) {
911  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
912  } else {
913  fileInfo = createFile(pageSize, num_pages_per_data_file_);
914  }
915  int32_t pageNum;
916  do {
917  pageNum = fileInfo->getFreePage();
918  if (pageNum != -1) {
919  pages.emplace_back(fileInfo->fileId, pageNum);
920  numPagesNeeded--;
921  }
922  } while (pageNum != -1 && numPagesNeeded > 0);
923  if (numPagesNeeded == 0) {
924  break;
925  }
926  }
927  CHECK(pages.size() == numPagesRequested);
928 }
929 
930 FileInfo* FileMgr::openExistingFile(const std::string& path,
931  const int fileId,
932  const size_t pageSize,
933  const size_t numPages,
934  std::vector<HeaderInfo>& headerVec) {
935  FILE* f = open(path);
936  FileInfo* fInfo = new FileInfo(
937  this, fileId, f, pageSize, numPages, false); // false means don't init file
938 
939  fInfo->openExistingFile(headerVec);
940  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
941  files_[fileId] = fInfo;
942  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
943  return fInfo;
944 }
945 
946 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
947  // check arguments
948  if (pageSize == 0 || numPages == 0) {
949  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
950  }
951 
952  // create the new file
953  FILE* f = create(fileMgrBasePath_,
954  nextFileId_,
955  pageSize,
956  numPages); // TM: not sure if I like naming scheme here - should be in
957  // separate namespace?
958  CHECK(f);
959 
960  // instantiate a new FileInfo for the newly created file
961  int32_t fileId = nextFileId_++;
962  FileInfo* fInfo =
963  new FileInfo(this, fileId, f, pageSize, numPages, true); // true means init file
964  CHECK(fInfo);
965 
966  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
967  // update file manager data structures
968  files_[fileId] = fInfo;
969  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
970 
971  return fInfo;
972 }
973 
974 FILE* FileMgr::getFileForFileId(const int32_t fileId) {
975  CHECK(fileId >= 0);
976  CHECK(files_.find(fileId) != files_.end());
977  return files_[fileId]->f;
978 }
979 
981  const ChunkKey& keyPrefix) {
982  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(
983  chunkIndexMutex_); // is this guarding the right structure? it look slike we oly
984  // read here for chunk
985  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
986  if (chunkIt == chunkIndex_.end()) {
987  return; // throw?
988  }
989  while (chunkIt != chunkIndex_.end() &&
990  std::search(chunkIt->first.begin(),
991  chunkIt->first.begin() + keyPrefix.size(),
992  keyPrefix.begin(),
993  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
994  if (chunkIt->second->hasEncoder()) {
995  auto chunk_metadata = std::make_shared<ChunkMetadata>();
996  chunkIt->second->encoder_->getMetadata(chunk_metadata);
997  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
998  }
999  chunkIt++;
1000  }
1001 }
1002 
1004  size_t num_used_pages = 0;
1005  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
1006  for (const auto file_info_entry : files_) {
1007  num_used_pages +=
1008  (file_info_entry.second->numPages - file_info_entry.second->freePages.size());
1009  }
1010  return num_used_pages;
1011 }
1012 
1014  size_t num_used_metadata_pages = 0;
1015  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
1016  for (const auto& chunkIt : chunkIndex_) {
1017  num_used_metadata_pages += chunkIt.second->numMetadataPages();
1018  }
1019  return num_used_metadata_pages;
1020 }
1021 
1023  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
1024  const auto& chunkIt = chunkIndex_.find(chunkKey);
1025  if (chunkIt != chunkIndex_.end()) {
1026  return chunkIt->second->numMetadataPages();
1027  } else {
1028  throw std::runtime_error("Chunk was not found.");
1029  }
1030 }
1031 
1032 int32_t FileMgr::getDBVersion() const {
1033  return gfm_->getDBVersion();
1034 }
1035 
1037  return gfm_->getDBConvert();
1038 }
1039 
1042 
1043  if (db_version_ > getDBVersion()) {
1044  LOG(FATAL) << "DB forward compatibility is not supported. Version of OmniSci "
1045  "software used is older than the version of DB being read: "
1046  << db_version_;
1047  }
1049  // new system, or we are moving forward versions
1050  // system wide migration would go here if required
1052  return;
1053  }
1054 }
1055 
1056 int32_t FileMgr::readVersionFromDisk(const std::string& versionFileName) const {
1057  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1058  if (!boost::filesystem::exists(versionFilePath)) {
1059  return -1;
1060  }
1061  if (!boost::filesystem::is_regular_file(versionFilePath)) {
1062  return -1;
1063  }
1064  if (boost::filesystem::file_size(versionFilePath) < 4) {
1065  return -1;
1066  }
1067  FILE* versionFile = open(versionFilePath);
1068  int32_t version;
1069  read(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1070  close(versionFile);
1071  return version;
1072 }
1073 
1074 void FileMgr::writeAndSyncVersionToDisk(const std::string& versionFileName,
1075  const int32_t version) {
1076  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1077  FILE* versionFile;
1078  if (boost::filesystem::exists(versionFilePath)) {
1079  int32_t oldVersion = readVersionFromDisk(versionFileName);
1080  LOG(INFO) << "Storage version file `" << versionFilePath
1081  << "` already exists, its current version is " << oldVersion;
1082  versionFile = open(versionFilePath);
1083  } else {
1084  versionFile = create(versionFilePath, sizeof(int32_t));
1085  }
1086  write(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1087  int32_t status = fflush(versionFile);
1088  if (status != 0) {
1089  LOG(FATAL) << "Could not flush version file " << versionFilePath << " to disk";
1090  }
1091 #ifdef __APPLE__
1092  status = fcntl(fileno(epochFile_), 51);
1093 #else
1094  status = omnisci::fsync(fileno(versionFile));
1095 #endif
1096  if (status != 0) {
1097  LOG(FATAL) << "Could not sync version file " << versionFilePath << " to disk";
1098  }
1099  close(versionFile);
1100 }
1101 
1103  const std::string versionFilePath(fileMgrBasePath_ + "/" + FILE_MGR_VERSION_FILENAME);
1104  LOG(INFO) << "Migrating file format version from 0 to 1 for `" << versionFilePath;
1109  int32_t migrationCompleteVersion = 1;
1110  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migrationCompleteVersion);
1111 }
1112 
1116  fileMgrVersion_ = 0;
1118  } else if (fileMgrVersion_ > latestFileMgrVersion_) {
1119  LOG(FATAL)
1120  << "Table storage forward compatibility is not supported. Version of OmniSci "
1121  "software used is older than the version of table being read: "
1122  << fileMgrVersion_;
1123  }
1124 
1127  switch (fileMgrVersion_) {
1128  case 0: {
1130  break;
1131  }
1132  default: {
1133  UNREACHABLE();
1134  }
1135  }
1136  fileMgrVersion_++;
1137  }
1138  }
1139 }
1140 
/*
 * @brief sets the epoch to a user-specified value
 *
 * With the introduction of optional capped history on files, the possibility of
 * multiple successive rollbacks to earlier epochs means that we cannot rely on
 * the maxRollbackEpochs_ variable alone (initialized from a value stored in Catalog) to
 * guarantee that we can set an epoch at any given value. This function checks the
 * user-specified epoch value to ensure it is both not higher than the last checkpointed
 * epoch AND it is >= the epoch floor. For an uncapped table, the floor is the epoch the
 * table was created at (default 0); for a capped table, it is the lowest epoch for
 * which we have complete data, now materialized in the epoch metadata itself. */
1152 
1153 void FileMgr::setEpoch(const int32_t newEpoch) {
1154  if (newEpoch < epoch_.floor()) {
1155  std::stringstream error_message;
1156  error_message << "Cannot set epoch for " << describeSelf()
1157  << " lower than the minimum rollback epoch (" << epoch_.floor() << ").";
1158  throw std::runtime_error(error_message.str());
1159  }
1160  epoch_.ceiling(newEpoch);
1162 }
1163 
1164 void FileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
1165  std::unique_lock<mapd_shared_mutex> lock(mutex_free_page_);
1166  free_pages_.push_back(page);
1167 }
1168 
1169 void FileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t table_id) {
1170  UNREACHABLE();
1171 }
1172 
1178 void FileMgr::resumeFileCompaction(const std::string& status_file_name) {
1179  if (status_file_name == COPY_PAGES_STATUS) {
1180  // Delete status file and restart data compaction process
1181  auto file_path = getFilePath(status_file_name);
1182  CHECK(boost::filesystem::exists(file_path));
1183  boost::filesystem::remove(file_path);
1184  compactFiles();
1185  } else if (status_file_name == UPDATE_PAGE_VISIBILITY_STATUS) {
1186  // Execute second and third phases of data compaction
1187  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1188  auto page_mappings = readPageMappingsFromStatusFile();
1189  updateMappedPagesVisibility(page_mappings);
1191  deleteEmptyFiles();
1192  } else if (status_file_name == DELETE_EMPTY_FILES_STATUS) {
1193  // Execute last phase of data compaction
1194  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1195  deleteEmptyFiles();
1196  } else {
1197  UNREACHABLE() << "Unexpected status file name: " << status_file_name;
1198  }
1199 }
1200 
1229  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1230  if (files_.empty()) {
1231  return;
1232  }
1233 
1234  auto copy_pages_status_file_path = getFilePath(COPY_PAGES_STATUS);
1235  CHECK(!boost::filesystem::exists(copy_pages_status_file_path));
1236  std::ofstream status_file(copy_pages_status_file_path.string(),
1237  std::ios::out | std::ios::binary);
1238  status_file.close();
1239 
1240  std::vector<PageMapping> page_mappings;
1241  std::set<Page> touched_pages;
1242  std::set<size_t> page_sizes;
1243  for (auto [file_id, file_info] : files_) {
1244  page_sizes.emplace(file_info->pageSize);
1245  }
1246  for (auto page_size : page_sizes) {
1247  sortAndCopyFilePagesForCompaction(page_size, page_mappings, touched_pages);
1248  }
1249 
1250  writePageMappingsToStatusFile(page_mappings);
1252 
1253  updateMappedPagesVisibility(page_mappings);
1255 
1256  deleteEmptyFiles();
1257 }
1258 
1266  std::vector<PageMapping>& page_mappings,
1267  std::set<Page>& touched_pages) {
1268  std::vector<FileInfo*> sorted_file_infos;
1269  auto range = fileIndex_.equal_range(page_size);
1270  for (auto it = range.first; it != range.second; it++) {
1271  sorted_file_infos.emplace_back(files_[it->second]);
1272  }
1273  if (sorted_file_infos.empty()) {
1274  return;
1275  }
1276 
1277  // Sort file infos in ascending order of free pages count i.e. from files with
1278  // the least number of free pages to those with the highest number of free pages.
1279  std::sort(sorted_file_infos.begin(),
1280  sorted_file_infos.end(),
1281  [](const FileInfo* file_1, const FileInfo* file_2) {
1282  return file_1->freePages.size() < file_2->freePages.size();
1283  });
1284 
1285  size_t destination_index = 0, source_index = sorted_file_infos.size() - 1;
1286 
1287  // For page copy destinations, skip files without free pages.
1288  while (destination_index < source_index &&
1289  sorted_file_infos[destination_index]->freePages.empty()) {
1290  destination_index++;
1291  }
1292 
1293  // For page copy sources, skip files with only free pages.
1294  while (destination_index < source_index &&
1295  sorted_file_infos[source_index]->freePages.size() ==
1296  sorted_file_infos[source_index]->numPages) {
1297  source_index--;
1298  }
1299 
1300  std::set<size_t> source_used_pages;
1301  CHECK(destination_index <= source_index);
1302 
1303  // Get the total number of free pages available for compaction
1304  int64_t total_free_pages{0};
1305  for (size_t i = destination_index; i <= source_index; i++) {
1306  total_free_pages += sorted_file_infos[i]->numFreePages();
1307  }
1308 
1309  while (destination_index < source_index) {
1310  if (source_used_pages.empty()) {
1311  // Populate source_used_pages with only used pages in the source file.
1312  auto source_file_info = sorted_file_infos[source_index];
1313  auto& free_pages = source_file_info->freePages;
1314  for (size_t page_num = 0; page_num < source_file_info->numPages; page_num++) {
1315  if (free_pages.find(page_num) == free_pages.end()) {
1316  source_used_pages.emplace(page_num);
1317  }
1318  }
1319 
1320  // Free pages of current source file will not be copy destinations
1321  total_free_pages -= source_file_info->numFreePages();
1322  }
1323 
1324  // Exit early if there are not enough free pages to empty the next file
1325  if (total_free_pages - static_cast<int64_t>(source_used_pages.size()) < 0) {
1326  return;
1327  }
1328 
1329  // Copy pages from source files to destination files
1330  auto dest_file_info = sorted_file_infos[destination_index];
1331  while (!source_used_pages.empty() && !dest_file_info->freePages.empty()) {
1332  // Get next page to copy
1333  size_t source_page_num = *source_used_pages.begin();
1334  source_used_pages.erase(source_page_num);
1335 
1336  Page source_page{sorted_file_infos[source_index]->fileId, source_page_num};
1337  copySourcePageForCompaction(source_page,
1338  sorted_file_infos[destination_index],
1339  page_mappings,
1340  touched_pages);
1341  total_free_pages--;
1342  }
1343 
1344  if (source_used_pages.empty()) {
1345  source_index--;
1346  }
1347 
1348  if (dest_file_info->freePages.empty()) {
1349  destination_index++;
1350  }
1351  }
1352 }
1353 
1361  FileInfo* destination_file_info,
1362  std::vector<PageMapping>& page_mappings,
1363  std::set<Page>& touched_pages) {
1364  size_t destination_page_num = destination_file_info->getFreePage();
1365  CHECK_NE(destination_page_num, static_cast<size_t>(-1));
1366  Page destination_page{destination_file_info->fileId, destination_page_num};
1367 
1368  // Assert that the same pages are not copied or overridden multiple times
1369  CHECK(touched_pages.find(source_page) == touched_pages.end());
1370  touched_pages.emplace(source_page);
1371 
1372  CHECK(touched_pages.find(destination_page) == touched_pages.end());
1373  touched_pages.emplace(destination_page);
1374 
1375  auto header_size = copyPageWithoutHeaderSize(source_page, destination_page);
1376  page_mappings.emplace_back(static_cast<size_t>(source_page.fileId),
1377  source_page.pageNum,
1378  header_size,
1379  static_cast<size_t>(destination_page.fileId),
1380  destination_page.pageNum);
1381 }
1382 
1390 int32_t FileMgr::copyPageWithoutHeaderSize(const Page& source_page,
1391  const Page& destination_page) {
1392  FileInfo* source_file_info = files_[source_page.fileId];
1393  CHECK(source_file_info);
1394  CHECK_EQ(source_file_info->fileId, source_page.fileId);
1395 
1396  FileInfo* destination_file_info = files_[destination_page.fileId];
1397  CHECK(destination_file_info);
1398  CHECK_EQ(destination_file_info->fileId, destination_page.fileId);
1399  CHECK_EQ(source_file_info->pageSize, destination_file_info->pageSize);
1400 
1401  auto page_size = source_file_info->pageSize;
1402  auto buffer = std::make_unique<int8_t[]>(page_size);
1403  size_t bytes_read =
1404  source_file_info->read(source_page.pageNum * page_size, page_size, buffer.get());
1405  CHECK_EQ(page_size, bytes_read);
1406 
1407  auto header_size_offset = sizeof(int32_t);
1408  size_t bytes_written = destination_file_info->write(
1409  (destination_page.pageNum * page_size) + header_size_offset,
1410  page_size - header_size_offset,
1411  buffer.get() + header_size_offset);
1412  CHECK_EQ(page_size - header_size_offset, bytes_written);
1413  return reinterpret_cast<int32_t*>(buffer.get())[0];
1414 }
1415 
1420 void FileMgr::updateMappedPagesVisibility(const std::vector<PageMapping>& page_mappings) {
1421  for (const auto& page_mapping : page_mappings) {
1422  auto destination_file = files_[page_mapping.destination_file_id];
1423 
1424  // Set destination page header size
1425  auto header_size = page_mapping.source_page_header_size;
1426  CHECK_GT(header_size, 0);
1427  destination_file->write(
1428  page_mapping.destination_page_num * destination_file->pageSize,
1429  sizeof(PageHeaderSizeType),
1430  reinterpret_cast<int8_t*>(&header_size));
1431  auto source_file = files_[page_mapping.source_file_id];
1432 
1433  // Free source page
1434  PageHeaderSizeType free_page_header_size{0};
1435  source_file->write(page_mapping.source_page_num * source_file->pageSize,
1436  sizeof(PageHeaderSizeType),
1437  reinterpret_cast<int8_t*>(&free_page_header_size));
1438  source_file->freePageDeferred(page_mapping.source_page_num);
1439  }
1440 
1441  for (auto file_info_entry : files_) {
1442  int32_t status = file_info_entry.second->syncToDisk();
1443  if (status != 0) {
1444  LOG(FATAL) << "Could not sync file to disk";
1445  }
1446  }
1447 }
1448 
1454  for (auto [file_id, file_info] : files_) {
1455  CHECK_EQ(file_id, file_info->fileId);
1456  if (file_info->freePages.size() == file_info->numPages) {
1457  fclose(file_info->f);
1458  file_info->f = nullptr;
1459  auto file_path = get_data_file_path(fileMgrBasePath_, file_id, file_info->pageSize);
1460  boost::filesystem::remove(file_path);
1461  }
1462  }
1463 
1464  auto status_file_path = getFilePath(DELETE_EMPTY_FILES_STATUS);
1465  CHECK(boost::filesystem::exists(status_file_path));
1466  boost::filesystem::remove(status_file_path);
1467 }
1468 
1475  const std::vector<PageMapping>& page_mappings) {
1476  auto file_path = getFilePath(COPY_PAGES_STATUS);
1477  CHECK(boost::filesystem::exists(file_path));
1478  CHECK(boost::filesystem::is_empty(file_path));
1479  std::ofstream status_file{file_path.string(), std::ios::out | std::ios::binary};
1480  int64_t page_mappings_count = page_mappings.size();
1481  status_file.write(reinterpret_cast<const char*>(&page_mappings_count), sizeof(int64_t));
1482  status_file.write(reinterpret_cast<const char*>(page_mappings.data()),
1483  page_mappings_count * sizeof(PageMapping));
1484  status_file.close();
1485 }
1486 
1490 std::vector<PageMapping> FileMgr::readPageMappingsFromStatusFile() {
1491  auto file_path = getFilePath(UPDATE_PAGE_VISIBILITY_STATUS);
1492  CHECK(boost::filesystem::exists(file_path));
1493  std::ifstream status_file{file_path.string(),
1494  std::ios::in | std::ios::binary | std::ios::ate};
1495  CHECK(status_file.is_open());
1496  size_t file_size = status_file.tellg();
1497  status_file.seekg(0, std::ios::beg);
1498  CHECK_GE(file_size, sizeof(int64_t));
1499 
1500  int64_t page_mappings_count;
1501  status_file.read(reinterpret_cast<char*>(&page_mappings_count), sizeof(int64_t));
1502  auto page_mappings_byte_size = file_size - sizeof(int64_t);
1503  CHECK_EQ(page_mappings_byte_size % sizeof(PageMapping), static_cast<size_t>(0));
1504  CHECK_EQ(static_cast<size_t>(page_mappings_count),
1505  page_mappings_byte_size / sizeof(PageMapping));
1506 
1507  std::vector<PageMapping> page_mappings(page_mappings_count);
1508  status_file.read(reinterpret_cast<char*>(page_mappings.data()),
1509  page_mappings_byte_size);
1510  status_file.close();
1511  return page_mappings;
1512 }
1513 
1517 void FileMgr::renameCompactionStatusFile(const char* const from_status,
1518  const char* const to_status) {
1519  auto from_status_file_path = getFilePath(from_status);
1520  auto to_status_file_path = getFilePath(to_status);
1521  CHECK(boost::filesystem::exists(from_status_file_path));
1522  CHECK(!boost::filesystem::exists(to_status_file_path));
1523  boost::filesystem::rename(from_status_file_path, to_status_file_path);
1524 }
1525 
1526 // Methods that enable override of number of pages per data/metadata files
1527 // for use in unit tests.
1528 void FileMgr::setNumPagesPerDataFile(size_t num_pages) {
1529  num_pages_per_data_file_ = num_pages;
1530 }
1531 
1532 void FileMgr::setNumPagesPerMetadataFile(size_t num_pages) {
1533  num_pages_per_metadata_file_ = num_pages;
1534 }
1535 
1537  mapd_shared_lock<mapd_shared_mutex> files_read_lock(files_rw_mutex_);
1538  for (auto file_info_entry : files_) {
1539  int32_t status = file_info_entry.second->syncToDisk();
1540  CHECK(status == 0) << "Could not sync file to disk";
1541  }
1542 }
1543 
1544 void FileMgr::initializeNumThreads(size_t num_reader_threads) {
1545  // # of threads is based on # of cores on the host
1546  size_t num_hardware_based_threads = std::thread::hardware_concurrency();
1547  if (num_reader_threads == 0 || num_reader_threads > num_hardware_based_threads) {
1548  // # of threads has not been defined by user
1549  num_reader_threads_ = num_hardware_based_threads;
1550  } else {
1551  num_reader_threads_ = num_reader_threads;
1552  }
1553 }
1554 
1556  mapd_unique_lock<mapd_shared_mutex> free_pages_write_lock(mutex_free_page_);
1557  for (auto& free_page : free_pages_) {
1558  free_page.first->freePageDeferred(free_page.second);
1559  }
1560  free_pages_.clear();
1561 }
1562 
1563 FileBuffer* FileMgr::allocateBuffer(const size_t page_size,
1564  const ChunkKey& key,
1565  const size_t num_bytes) {
1566  return new FileBuffer(this, page_size, key, num_bytes);
1567 }
1568 
1569 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
1571  ChunkKey& chunk_key,
1572  int32_t contingent,
1573  int32_t page_epoch,
1574  int32_t page_num) {
1575  // If the parent FileMgr has a fileMgrKey, then all keys are locked to one table and
1576  // can be set from the manager.
1577  auto [db_id, tb_id] = get_fileMgrKey();
1578  chunk_key[CHUNK_KEY_DB_IDX] = db_id;
1579  chunk_key[CHUNK_KEY_TABLE_IDX] = tb_id;
1580  const bool delete_contingent =
1581  (contingent == DELETE_CONTINGENT || contingent == ROLLOFF_CONTINGENT);
1582  // Check if page was deleted with a checkpointed epoch
1583  if (delete_contingent && epoch(db_id, tb_id) >= page_epoch) {
1584  file_info->freePageImmediate(page_num);
1585  return true;
1586  }
1587  // Recover page if it was deleted but not checkpointed.
1588  if (!g_read_only && delete_contingent) {
1589  file_info->recoverPage(chunk_key, page_num);
1590  }
1591  return false;
1592 }
1593 
1595  FileBuffer* buf;
1596  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
1597  auto chunkIt = chunkIndex_.find(key);
1598  if (chunkIt == chunkIndex_.end()) {
1600  } else {
1601  buf = chunkIt->second;
1602  }
1603  return buf;
1604 }
1605 
1607  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
1608  for (auto [key, buf] : chunkIndex_) {
1609  if (buf->isDirty()) {
1610  buf->writeMetadata(epoch());
1611  buf->clearDirtyBits();
1612  }
1613  }
1614 }
1615 
1616 size_t FileMgr::num_pages_per_data_file_{DEFAULT_NUM_PAGES_PER_DATA_FILE};
1617 size_t FileMgr::num_pages_per_metadata_file_{DEFAULT_NUM_PAGES_PER_METADATA_FILE};
1618 } // namespace File_Namespace
virtual std::vector< MultiPage > getMultiPage() const
Returns vector of MultiPages in the FileBuffer.
Definition: FileBuffer.h:144
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:597
size_t getNumUsedPages() const
Definition: FileMgr.cpp:1003
std::vector< PageMapping > readPageMappingsFromStatusFile()
Definition: FileMgr.cpp:1490
int32_t readVersionFromDisk(const std::string &versionFileName) const
Definition: FileMgr.cpp:1056
#define CHECK_EQ(x, y)
Definition: Logger.h:211
void deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const bool purge=true)
Definition: FileMgr.cpp:736
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator)
Definition: FileMgr.cpp:146
OpenFilesResult openFiles()
Definition: FileMgr.cpp:189
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:852
TablePair fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:504
std::string getBasePath() const
int8_t * storage_ptr()
Definition: Epoch.h:61
mapd_shared_mutex mutex_free_page_
Definition: FileMgr.h:405
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:586
Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:861
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
Definition: FileMgr.cpp:1022
void syncEncoder(const AbstractBuffer *src_buffer)
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:695
void sortAndCopyFilePagesForCompaction(size_t page_size, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1265
std::vector< HeaderInfo > header_infos
Definition: FileMgr.h:115
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:194
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
void copySourcePageForCompaction(const Page &source_page, FileInfo *destination_file_info, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1360
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:857
virtual MemoryLevel getType() const =0
void freePageImmediate(int32_t page_num)
Definition: FileInfo.cpp:244
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:564
void migrateToLatestFileMgrVersion()
Definition: FileMgr.cpp:1113
#define MAPD_FILE_EXT
Definition: File.h:25
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
Definition: FileMgr.cpp:666
#define UNREACHABLE()
Definition: Logger.h:247
static constexpr char const * UPDATE_PAGE_VISIBILITY_STATUS
Definition: FileMgr.h:369
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:216
void init(const size_t num_reader_threads, const int32_t epochOverride)
Definition: FileMgr.cpp:249
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:728
std::vector< MultiPage > multiPages_
Definition: FileBuffer.h:197
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:328
GlobalFileMgr * gfm_
Definition: FileMgr.h:503
bool is_compaction_status_file(const std::string &file_name)
Definition: FileMgr.cpp:182
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:401
int32_t floor() const
Definition: Epoch.h:43
int32_t ceiling() const
Definition: Epoch.h:44
virtual bool updatePageIfDeleted(FileInfo *file_info, ChunkKey &chunk_key, int32_t contingent, int32_t page_epoch, int32_t page_num)
deletes or recovers a page based on last checkpointed epoch.
Definition: FileMgr.cpp:1570
std::optional< uint64_t > total_free_data_page_count
Definition: FileMgr.h:107
void writePageMappingsToStatusFile(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1474
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:58
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:685
int32_t copyPageWithoutHeaderSize(const Page &source_page, const Page &destination_page)
Definition: FileMgr.cpp:1390
#define CHECK_GT(x, y)
Definition: Logger.h:215
std::string fileMgrBasePath_
Definition: FileMgr.h:388
static void setNumPagesPerMetadataFile(size_t num_pages)
Definition: FileMgr.cpp:1532
std::string to_string(char const *&&v)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:49
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:774
std::string show_chunk(const ChunkKey &key)
Definition: types.h:85
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
static size_t num_pages_per_data_file_
Definition: FileMgr.h:409
const int32_t latestFileMgrVersion_
Definition: FileMgr.h:399
int32_t PageHeaderSizeType
Definition: FileMgr.h:121
size_t write(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:60
int32_t db_version_
the index of the next file id
Definition: FileMgr.h:396
std::set< size_t > freePages
Definition: FileInfo.h:62
size_t pageSize
file stream object for the represented file
Definition: FileInfo.h:59
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
Definition: FileMgr.cpp:1074
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:133
int32_t incrementEpoch()
Definition: FileMgr.h:275
virtual std::string describeSelf()
Definition: FileMgr.cpp:679
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
Definition: FileMgr.cpp:1169
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
Definition: FileInfo.h:51
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:395
StorageStats getStorageStats()
Definition: FileMgr.cpp:332
FileBuffer * getOrCreateBuffer(const ChunkKey &key)
Definition: FileMgr.cpp:1594
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
#define CHECK_NE(x, y)
Definition: Logger.h:212
static void setNumPagesPerDataFile(size_t num_pages)
Definition: FileMgr.cpp:1528
std::optional< uint64_t > total_free_metadata_page_count
Definition: FileMgr.h:103
int32_t fileId
Definition: Page.h:47
int32_t getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:1032
size_t pageSize() const override
Returns the size in bytes of each page in the FileBuffer.
Definition: FileBuffer.h:134
void freePagesBeforeEpoch(const int32_t min_epoch)
Definition: FileMgr.cpp:652
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:323
PageSizeFileMMap fileIndex_
A map of files accessible via a file identifier.
Definition: FileMgr.h:392
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
uint64_t total_metadata_page_count
Definition: FileMgr.h:102
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:974
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
Definition: FileMgr.h:406
static constexpr char FILE_MGR_VERSION_FILENAME[]
Definition: FileMgr.h:380
An AbstractBuffer is a unit of data management for a data manager.
size_t getNumUsedMetadataPages() const
Definition: FileMgr.cpp:1013
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:393
static size_t num_pages_per_metadata_file_
Definition: FileMgr.h:410
int fsync(int fd)
Definition: omnisci_fs.cpp:60
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:723
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:930
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:638
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
constexpr int32_t ROLLOFF_CONTINGENT
Definition: FileInfo.h:52
std::string compaction_status_file_name
Definition: FileMgr.h:117
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:746
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:980
FileInfo * getFileInfoForFileId(const int32_t fileId)
Definition: FileMgr.h:220
const TablePair get_fileMgrKey() const
Definition: FileMgr.h:335
#define CHECK_LT(x, y)
Definition: Logger.h:213
static constexpr char EPOCH_FILENAME[]
Definition: FileMgr.h:378
string version
Definition: setup.in.py:73
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:394
virtual FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:705
void writeHeader(Page &page, const int32_t pageId, const int32_t epoch, const bool writeMetadata=false)
Writes the header at the top of the page in the expected header format.
Definition: FileBuffer.cpp:413
int32_t maxRollbackEpochs_
Definition: FileMgr.h:387
void freePagesBeforeEpochUnlocked(const int32_t min_epoch, const ChunkKeyToChunkMap::iterator lower_bound, const ChunkKeyToChunkMap::iterator upper_bound)
Definition: FileMgr.cpp:657
virtual size_t reservedHeaderSize() const
Definition: FileBuffer.h:141
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
uint64_t total_metadata_file_size
Definition: FileMgr.h:101
bool g_read_only
Definition: File.cpp:38
virtual FileBuffer * allocateBuffer(const size_t page_size, const ChunkKey &key, const size_t num_bytes)
Definition: FileMgr.cpp:1563
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:102
void openAndReadEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:618
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:66
bool is_metadata_file(size_t file_size, size_t page_size, size_t num_pages_per_metadata_file)
Definition: FileMgr.cpp:324
static size_t byte_size()
Definition: Epoch.h:63
static constexpr char const * DELETE_EMPTY_FILES_STATUS
Definition: FileMgr.h:370
virtual ~FileMgr() override
Destructor.
Definition: FileMgr.cpp:106
bool getDBConvert() const
Definition: FileMgr.cpp:1036
void setSize(const size_t size)
void free_page(std::pair< FileInfo *, int32_t > &&page)
Definition: FileMgr.cpp:1164
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
virtual void closeRemovePhysical()
Definition: FileMgr.cpp:557
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:98
static constexpr char const * COPY_PAGES_STATUS
Definition: FileMgr.h:368
int32_t epoch() const
Definition: FileMgr.h:500
void openExistingFile(std::vector< HeaderInfo > &headerVec)
Definition: FileInfo.cpp:71
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:946
std::map< int32_t, FileInfo * > files_
Definition: FileMgr.h:391
void updateMappedPagesVisibility(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1420
#define CHECK(condition)
Definition: Logger.h:203
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:766
void recoverPage(const ChunkKey &chunk_key, int32_t page_num)
Definition: FileInfo.cpp:253
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:119
char * f
void setEpoch(const int32_t newEpoch)
Definition: FileMgr.cpp:1153
bool coreInit()
Determines the file path and, if it exists, runs file migration and opens and reads the epoch file...
Definition: FileMgr.cpp:126
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1544
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages of the requested size, creating new files if necessary.
Definition: FileMgr.cpp:885
mapd_unique_lock< mapd_shared_mutex > write_lock
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:86
std::string get_data_file_path(const std::string &base_path, int file_id, size_t page_size)
Definition: File.cpp:42
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
Definition: File.cpp:218
int32_t epochFloor() const
Definition: FileMgr.h:273
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:402
void renameCompactionStatusFile(const char *const from_status, const char *const to_status)
Definition: FileMgr.cpp:1517
virtual size_t pageDataSize() const
Returns the size in bytes of the data portion of each page in the FileBuffer.
Definition: FileBuffer.h:137
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:403
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:79
static constexpr char LEGACY_EPOCH_FILENAME[]
Definition: FileMgr.h:377
A selection of helper methods for File I/O.
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:803
#define VLOG(n)
Definition: Logger.h:297
Type timer_start()
Definition: measure.h:42
int32_t lastCheckpointedEpoch()
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:291
void resumeFileCompaction(const std::string &status_file_name)
Definition: FileMgr.cpp:1178
static constexpr char DB_META_FILENAME[]
Definition: FileMgr.h:379
static constexpr int32_t INVALID_VERSION
Definition: FileMgr.h:381
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
boost::filesystem::path getFilePath(const std::string &file_name)
Definition: FileMgr.h:337