OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <fstream>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/checked_alloc.h"
41 #include "Shared/measure.h"
42 
43 using namespace std;
44 
45 namespace File_Namespace {
46 
47 FileMgr::FileMgr(const int32_t device_id,
48  GlobalFileMgr* gfm,
49  const TablePair file_mgr_key,
50  const int32_t max_rollback_epochs,
51  const size_t num_reader_threads,
52  const int32_t epoch)
53  : AbstractBufferMgr(device_id)
54  , maxRollbackEpochs_(max_rollback_epochs)
55  , nextFileId_(0)
56  , gfm_(gfm)
57  , fileMgrKey_(file_mgr_key)
58  , page_size_(gfm->getPageSize())
59  , metadata_page_size_(gfm->getMetadataPageSize()) {
60  init(num_reader_threads, epoch);
61 }
62 
63 // used only to initialize enough to drop
64 FileMgr::FileMgr(const int32_t device_id,
65  GlobalFileMgr* gfm,
66  const TablePair file_mgr_key,
67  const bool run_core_init)
68  : AbstractBufferMgr(device_id)
69  , maxRollbackEpochs_(-1)
70  , nextFileId_(0)
71  , gfm_(gfm)
72  , fileMgrKey_(file_mgr_key)
73  , page_size_(gfm->getPageSize())
74  , metadata_page_size_(gfm->getMetadataPageSize()) {
75  const std::string fileMgrDirPrefix("table");
76  const std::string fileMgrDirDelim("_");
77  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + fileMgrDirDelim +
78  std::to_string(fileMgrKey_.first) + // db_id
79  fileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
80  epochFile_ = nullptr;
81  files_.clear();
82  if (run_core_init) {
83  coreInit();
84  }
85 }
86 
87 FileMgr::FileMgr(GlobalFileMgr* gfm, std::string base_path)
88  : AbstractBufferMgr(0)
89  , maxRollbackEpochs_(-1)
90  , fileMgrBasePath_(base_path)
91  , nextFileId_(0)
92  , gfm_(gfm)
93  , fileMgrKey_(0, 0)
94  , page_size_(gfm->getPageSize())
95  , metadata_page_size_(gfm->getMetadataPageSize()) {
96  init(base_path, -1);
97 }
98 
99 // For testing purposes only
100 FileMgr::FileMgr(const int epoch)
101  : AbstractBufferMgr(-1)
102  , page_size_(DEFAULT_PAGE_SIZE)
103  , metadata_page_size_(DEFAULT_METADATA_PAGE_SIZE) {
104  epoch_.ceiling(epoch);
105 }
106 
107 // Used to initialize CachingFileMgr.
108 FileMgr::FileMgr(const size_t page_size, const size_t metadata_page_size)
109  : AbstractBufferMgr(0)
110  , page_size_(page_size)
111  , metadata_page_size_(metadata_page_size) {}
112 
114  // free memory used by FileInfo objects
115  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
116  delete chunkIt->second;
117  }
118  for (auto file_info_entry : files_) {
119  delete file_info_entry.second;
120  }
121 
122  if (epochFile_) {
123  close(epochFile_);
124  epochFile_ = nullptr;
125  }
126 
127  if (DBMetaFile_) {
129  DBMetaFile_ = nullptr;
130  }
131 }
132 
135  const std::string fileMgrDirPrefix("table");
136  const std::string FileMgrDirDelim("_");
137  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
138  std::to_string(fileMgrKey_.first) + // db_id
139  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
140  boost::filesystem::path path(fileMgrBasePath_);
141  if (boost::filesystem::exists(path)) {
142  if (!boost::filesystem::is_directory(path)) {
143  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
144  << "' for table data is not a directory.";
145  }
148  return true;
149  }
150  return false;
151 }
152 
154  const boost::filesystem::directory_iterator& fileIterator) const {
155  FileMetadata fileMetadata;
156  fileMetadata.is_data_file = false;
157  fileMetadata.file_path = fileIterator->path().string();
158  if (!boost::filesystem::is_regular_file(fileIterator->status())) {
159  return fileMetadata;
160  }
161  // note that boost::filesystem leaves preceding dot on
162  // extension - hence DATA_FILE_EXT is ".data"
163  std::string extension(fileIterator->path().extension().string());
164  if (extension == DATA_FILE_EXT) {
165  std::string fileStem(fileIterator->path().stem().string());
166  // remove trailing dot if any
167  if (fileStem.size() > 0 && fileStem.back() == '.') {
168  fileStem = fileStem.substr(0, fileStem.size() - 1);
169  }
170  size_t dotPos = fileStem.find_last_of("."); // should only be one
171  if (dotPos == std::string::npos) {
172  LOG(FATAL) << "File `" << fileIterator->path()
173  << "` does not carry page size information in the filename.";
174  }
175  fileMetadata.is_data_file = true;
176  fileMetadata.file_id = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
177  fileMetadata.page_size =
178  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
179 
180  fileMetadata.file_size = boost::filesystem::file_size(fileMetadata.file_path);
181  CHECK_EQ(fileMetadata.file_size % fileMetadata.page_size,
182  size_t(0)); // should be no partial pages
183  fileMetadata.num_pages = fileMetadata.file_size / fileMetadata.page_size;
184  }
185  return fileMetadata;
186 }
187 
188 namespace {
189 bool is_compaction_status_file(const std::string& file_name) {
190  return (file_name == FileMgr::COPY_PAGES_STATUS ||
193 }
194 } // namespace
195 
197  auto clock_begin = timer_start();
198  boost::filesystem::directory_iterator
199  end_itr; // default construction yields past-the-end
201  result.max_file_id = -1;
202  int32_t file_count = 0;
203  int32_t thread_count = std::thread::hardware_concurrency();
204  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
205  boost::filesystem::path path(fileMgrBasePath_);
206  for (boost::filesystem::directory_iterator file_it(path); file_it != end_itr;
207  ++file_it) {
208  FileMetadata file_metadata = getMetadataForFile(file_it);
209  if (file_metadata.is_data_file) {
210  result.max_file_id = std::max(result.max_file_id, file_metadata.file_id);
211  file_futures.emplace_back(std::async(std::launch::async, [file_metadata, this] {
212  std::vector<HeaderInfo> temp_header_vec;
213  openExistingFile(file_metadata.file_path,
214  file_metadata.file_id,
215  file_metadata.page_size,
216  file_metadata.num_pages,
217  temp_header_vec);
218  return temp_header_vec;
219  }));
220  file_count++;
221  if (file_count % thread_count == 0) {
222  processFileFutures(file_futures, result.header_infos);
223  }
224  }
225 
226  if (is_compaction_status_file(file_it->path().filename().string())) {
227  CHECK(result.compaction_status_file_name.empty());
228  result.compaction_status_file_name = file_it->path().filename().string();
229  }
230  }
231 
232  if (file_futures.size() > 0) {
233  processFileFutures(file_futures, result.header_infos);
234  }
235 
236  int64_t queue_time_ms = timer_stop(clock_begin);
237  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : " << queue_time_ms
238  << "ms Epoch: " << epoch_.ceiling() << " files read: " << file_count
239  << " table location: '" << fileMgrBasePath_ << "'";
240  return result;
241 }
242 
244  for (auto file_info_entry : files_) {
245  auto file_info = file_info_entry.second;
246  if (file_info->f) {
247  close(file_info->f);
248  file_info->f = nullptr;
249  }
250  delete file_info;
251  }
252  files_.clear();
253  fileIndex_.clear();
254 }
255 
256 void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) {
257  // if epochCeiling = -1 this means open from epoch file
258 
259  const bool dataExists = coreInit();
260  if (dataExists) {
261  if (epochOverride != -1) { // if opening at specified epoch
262  setEpoch(epochOverride);
263  }
264 
265  auto open_files_result = openFiles();
266  if (!open_files_result.compaction_status_file_name.empty()) {
267  resumeFileCompaction(open_files_result.compaction_status_file_name);
268  clearFileInfos();
269  open_files_result = openFiles();
270  CHECK(open_files_result.compaction_status_file_name.empty());
271  }
272 
273  /* Sort headerVec so that all HeaderInfos
274  * from a chunk will be grouped together
275  * and in order of increasing PageId
276  * - Version Epoch */
277  auto& header_vec = open_files_result.header_infos;
278  std::sort(header_vec.begin(), header_vec.end());
279 
280  /* Goal of next section is to find sequences in the
281  * sorted headerVec of the same ChunkId, which we
282  * can then initiate a FileBuffer with */
283 
284  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
285  if (header_vec.size() > 0) {
286  ChunkKey lastChunkKey = header_vec.begin()->chunkKey;
287  auto startIt = header_vec.begin();
288 
289  for (auto headerIt = header_vec.begin() + 1; headerIt != header_vec.end();
290  ++headerIt) {
291  if (headerIt->chunkKey != lastChunkKey) {
292  createBufferFromHeaders(lastChunkKey, startIt, headerIt);
293  lastChunkKey = headerIt->chunkKey;
294  startIt = headerIt;
295  }
296  }
297  // now need to insert last Chunk
298  createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
299  }
300  nextFileId_ = open_files_result.max_file_id + 1;
301  rollOffOldData(epoch(), true /* only checkpoint if data is rolled off */);
302  incrementEpoch();
303  freePages();
304  } else {
305  boost::filesystem::path path(fileMgrBasePath_);
306  if (!boost::filesystem::create_directory(path)) {
307  LOG(FATAL) << "Could not create data directory: " << path;
308  }
310  if (epochOverride != -1) {
311  epoch_.floor(epochOverride);
312  epoch_.ceiling(epochOverride);
313  } else {
314  // These are default constructor values for epoch_, but resetting here for clarity
315  epoch_.floor(0);
316  epoch_.ceiling(0);
317  }
320  incrementEpoch();
321  }
322 
323  initializeNumThreads(num_reader_threads);
324  isFullyInitted_ = true;
325 }
326 
327 namespace {
329  size_t page_size,
330  size_t metadata_page_size,
331  size_t num_pages_per_metadata_file) {
332  return (file_size == (metadata_page_size * num_pages_per_metadata_file) &&
333  page_size == metadata_page_size);
334 }
335 } // namespace
336 
338  StorageStats storage_stats;
339  setDataAndMetadataFileStats(storage_stats);
340  if (isFullyInitted_) {
341  storage_stats.fragment_count = getFragmentCount();
342  }
343  return storage_stats;
344 }
345 
348  if (!isFullyInitted_) {
349  CHECK(!fileMgrBasePath_.empty());
350  boost::filesystem::path path(fileMgrBasePath_);
351  if (boost::filesystem::exists(path)) {
352  if (!boost::filesystem::is_directory(path)) {
353  LOG(FATAL) << "getStorageStats: Specified path '" << fileMgrBasePath_
354  << "' for table data is not a directory.";
355  }
356 
357  storage_stats.epoch = lastCheckpointedEpoch();
358  storage_stats.epoch_floor = epochFloor();
359  boost::filesystem::directory_iterator
360  endItr; // default construction yields past-the-end
361  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
362  ++fileIt) {
363  FileMetadata file_metadata = getMetadataForFile(fileIt);
364  if (file_metadata.is_data_file) {
365  if (is_metadata_file(file_metadata.file_size,
366  file_metadata.page_size,
369  storage_stats.metadata_file_count++;
370  storage_stats.total_metadata_file_size += file_metadata.file_size;
371  storage_stats.total_metadata_page_count += file_metadata.num_pages;
372  } else {
373  storage_stats.data_file_count++;
374  storage_stats.total_data_file_size += file_metadata.file_size;
375  storage_stats.total_data_page_count += file_metadata.num_pages;
376  }
377  }
378  }
379  }
380  } else {
381  storage_stats.epoch = lastCheckpointedEpoch();
382  storage_stats.epoch_floor = epochFloor();
383  storage_stats.total_free_metadata_page_count = 0;
384  storage_stats.total_free_data_page_count = 0;
385 
386  // We already initialized this table so take the faster path of walking through the
387  // FileInfo objects and getting metadata from there
388  for (const auto& file_info_entry : files_) {
389  const auto file_info = file_info_entry.second;
390  if (is_metadata_file(file_info->size(),
391  file_info->pageSize,
394  storage_stats.metadata_file_count++;
395  storage_stats.total_metadata_file_size +=
396  file_info->pageSize * file_info->numPages;
397  storage_stats.total_metadata_page_count += file_info->numPages;
398  storage_stats.total_free_metadata_page_count.value() +=
399  file_info->freePages.size();
400  } else {
401  storage_stats.data_file_count++;
402  storage_stats.total_data_file_size += file_info->pageSize * file_info->numPages;
403  storage_stats.total_data_page_count += file_info->numPages;
404  storage_stats.total_free_data_page_count.value() += file_info->freePages.size();
405  }
406  }
407  }
408 }
409 
410 uint32_t FileMgr::getFragmentCount() const {
412  std::set<int32_t> fragment_ids;
413  for (const auto& [chunk_key, file_buffer] : chunkIndex_) {
414  fragment_ids.emplace(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
415  }
416  return static_cast<uint32_t>(fragment_ids.size());
417 }
418 
420  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
421  std::vector<HeaderInfo>& headerVec) {
422  for (auto& file_future : file_futures) {
423  file_future.wait();
424  }
425  // concatenate the vectors after thread completes
426  for (auto& file_future : file_futures) {
427  auto tempHeaderVec = file_future.get();
428  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
429  }
430  file_futures.clear();
431 }
432 
433 void FileMgr::init(const std::string& dataPathToConvertFrom,
434  const int32_t epochOverride) {
435  int32_t converted_data_epoch = 0;
436  boost::filesystem::path path(dataPathToConvertFrom);
437  if (boost::filesystem::exists(path)) {
438  if (!boost::filesystem::is_directory(path)) {
439  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
440  }
442 
443  if (epochOverride != -1) { // if opening at previous epoch
444  setEpoch(epochOverride);
445  }
446 
447  boost::filesystem::directory_iterator
448  endItr; // default construction yields past-the-end
449  int32_t maxFileId = -1;
450  int32_t fileCount = 0;
451  int32_t threadCount = std::thread::hardware_concurrency();
452  std::vector<HeaderInfo> headerVec;
453  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
454  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
455  FileMetadata fileMetadata = getMetadataForFile(fileIt);
456  if (fileMetadata.is_data_file) {
457  maxFileId = std::max(maxFileId, fileMetadata.file_id);
458  file_futures.emplace_back(std::async(std::launch::async, [fileMetadata, this] {
459  std::vector<HeaderInfo> tempHeaderVec;
460  openExistingFile(fileMetadata.file_path,
461  fileMetadata.file_id,
462  fileMetadata.page_size,
463  fileMetadata.num_pages,
464  tempHeaderVec);
465  return tempHeaderVec;
466  }));
467  fileCount++;
468  if (fileCount % threadCount) {
469  processFileFutures(file_futures, headerVec);
470  }
471  }
472  }
473 
474  if (file_futures.size() > 0) {
475  processFileFutures(file_futures, headerVec);
476  }
477 
478  /* Sort headerVec so that all HeaderInfos
479  * from a chunk will be grouped together
480  * and in order of increasing PageId
481  * - Version Epoch */
482 
483  std::sort(headerVec.begin(), headerVec.end());
484 
485  /* Goal of next section is to find sequences in the
486  * sorted headerVec of the same ChunkId, which we
487  * can then initiate a FileBuffer with */
488 
489  if (headerVec.size() > 0) {
490  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
491  auto startIt = headerVec.begin();
492 
493  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
494  ++headerIt) {
495  if (headerIt->chunkKey != lastChunkKey) {
496  FileMgr* c_fm_ =
497  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
498  CHECK(c_fm_);
499  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerIt);
500  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
501  destBuf->syncEncoder(srcBuf);
502  destBuf->setSize(srcBuf->size());
503  destBuf->setDirty(); // this needs to be set to force writing out metadata
504  // files from "checkpoint()" call
505 
506  size_t totalNumPages = srcBuf->getMultiPage().size();
507  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
508  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
509  Page destPage = c_fm_->requestFreePage(
510  srcBuf->pageSize(),
511  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
512  MultiPage multiPage(srcBuf->pageSize());
513  multiPage.push(destPage, converted_data_epoch);
514  destBuf->multiPages_.push_back(multiPage);
515  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
516  copyPage(
517  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
518  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
519  }
520  lastChunkKey = headerIt->chunkKey;
521  startIt = headerIt;
522  }
523  }
524 
525  // now need to insert last Chunk
526  FileMgr* c_fm_ =
527  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
528  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerVec.end());
529  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
530  destBuf->syncEncoder(srcBuf);
531  destBuf->setSize(srcBuf->size());
532  destBuf->setDirty(); // this needs to be set to write out metadata file from the
533  // "checkpoint()" call
534 
535  size_t totalNumPages = srcBuf->getMultiPage().size();
536  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
537  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
538  Page destPage = c_fm_->requestFreePage(
539  srcBuf->pageSize(),
540  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
541  MultiPage multiPage(srcBuf->pageSize());
542  multiPage.push(destPage, converted_data_epoch);
543  destBuf->multiPages_.push_back(multiPage);
544  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
545  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
546  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
547  }
548  }
549  nextFileId_ = maxFileId + 1;
550  } else {
551  if (!boost::filesystem::create_directory(path)) {
552  LOG(FATAL) << "Specified path does not exist: " << path;
553  }
554  }
555  isFullyInitted_ = true;
556 }
557 
559  for (auto& [idx, file_info] : files_) {
560  if (file_info->f) {
561  close(file_info->f);
562  file_info->f = nullptr;
563  }
564  }
565 
566  if (DBMetaFile_) {
568  DBMetaFile_ = nullptr;
569  }
570 
571  if (epochFile_) {
572  close(epochFile_);
573  epochFile_ = nullptr;
574  }
575 }
576 
580  /* rename for later deletion the directory containing table related data */
582 }
583 
584 void FileMgr::copyPage(Page& srcPage,
585  FileMgr* destFileMgr,
586  Page& destPage,
587  const size_t reservedHeaderSize,
588  const size_t numBytes,
589  const size_t offset) {
590  CHECK(offset + numBytes <= page_size_);
591  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
592  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
593  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
594 
595  size_t bytesRead = srcFileInfo->read(
596  srcPage.pageNum * page_size_ + offset + reservedHeaderSize, numBytes, buffer);
597  CHECK(bytesRead == numBytes);
598  size_t bytesWritten = destFileInfo->write(
599  destPage.pageNum * page_size_ + offset + reservedHeaderSize, numBytes, buffer);
600  CHECK(bytesWritten == numBytes);
601  ::free(buffer);
602 }
603 
604 void FileMgr::createEpochFile(const std::string& epochFileName) {
605  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
606  if (boost::filesystem::exists(epochFilePath)) {
607  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
608  }
609  epochFile_ = create(epochFilePath, sizeof(Epoch::byte_size()));
610  // Write out current epoch to file - which if this
611  // function is being called should be 0
613 }
614 
615 int32_t FileMgr::openAndReadLegacyEpochFile(const std::string& epochFileName) {
616  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
617  if (!boost::filesystem::exists(epochFilePath)) {
618  return 0;
619  }
620 
621  if (!boost::filesystem::is_regular_file(epochFilePath)) {
622  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
623  }
624  if (boost::filesystem::file_size(epochFilePath) < 4) {
625  LOG(FATAL) << "Epoch file `" << epochFilePath
626  << "` is not sized properly (current size: "
627  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
628  }
629  FILE* legacyEpochFile = open(epochFilePath);
630  int32_t epoch;
631  read(legacyEpochFile, 0, sizeof(int32_t), (int8_t*)&epoch, epochFilePath);
632  close(legacyEpochFile);
633  return epoch;
634 }
635 
636 void FileMgr::openAndReadEpochFile(const std::string& epochFileName) {
637  if (!epochFile_) { // Check to see if already open
638  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
639  if (!boost::filesystem::exists(epochFilePath)) {
640  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
641  }
642  if (!boost::filesystem::is_regular_file(epochFilePath)) {
643  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
644  }
645  if (boost::filesystem::file_size(epochFilePath) != Epoch::byte_size()) {
646  LOG(FATAL) << "Epoch file `" << epochFilePath
647  << "` is not sized properly (current size: "
648  << boost::filesystem::file_size(epochFilePath)
649  << ", expected size: " << Epoch::byte_size() << ")";
650  }
651  epochFile_ = open(epochFilePath);
652  }
653  read(epochFile_, 0, Epoch::byte_size(), epoch_.storage_ptr(), epochFileName);
654 }
655 
657  CHECK(epochFile_);
659  int32_t status = fflush(epochFile_);
660  CHECK(status == 0) << "Could not flush epoch file to disk";
661 #ifdef __APPLE__
662  status = fcntl(fileno(epochFile_), 51);
663 #else
664  status = heavyai::fsync(fileno(epochFile_));
665 #endif
666  CHECK(status == 0) << "Could not sync epoch file to disk";
667  epochIsCheckpointed_ = true;
668 }
669 
670 void FileMgr::freePagesBeforeEpoch(const int32_t min_epoch) {
672  freePagesBeforeEpochUnlocked(min_epoch, chunkIndex_.begin(), chunkIndex_.end());
673 }
674 
676  const int32_t min_epoch,
677  const ChunkKeyToChunkMap::iterator lower_bound,
678  const ChunkKeyToChunkMap::iterator upper_bound) {
679  for (auto chunkIt = lower_bound; chunkIt != upper_bound; ++chunkIt) {
680  chunkIt->second->freePagesBeforeEpoch(min_epoch);
681  }
682 }
683 
684 void FileMgr::rollOffOldData(const int32_t epoch_ceiling, const bool should_checkpoint) {
685  if (maxRollbackEpochs_ >= 0) {
686  auto min_epoch = std::max(epoch_ceiling - maxRollbackEpochs_, epoch_.floor());
687  if (min_epoch > epoch_.floor()) {
688  freePagesBeforeEpoch(min_epoch);
689  epoch_.floor(min_epoch);
690  if (should_checkpoint) {
691  checkpoint();
692  }
693  }
694  }
695 }
696 
697 std::string FileMgr::describeSelf() const {
698  stringstream ss;
699  ss << "table (" << fileMgrKey_.first << ", " << fileMgrKey_.second << ")";
700  return ss.str();
701 }
702 
704  VLOG(2) << "Checkpointing " << describeSelf() << " epoch: " << epoch();
706  rollOffOldData(epoch(), false /* shouldCheckpoint */);
707  syncFilesToDisk();
709  incrementEpoch();
710  freePages();
711 }
712 
714  const size_t page_size,
715  const size_t num_bytes) {
717  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
718  << "Chunk already exists for key: " << show_chunk(key);
719  return createBufferUnlocked(key, page_size, num_bytes);
720 }
721 
722 // Assumes checks for pre-existing key have already occured.
724  const size_t page_size,
725  const size_t num_bytes) {
726  size_t actual_page_size = page_size;
727  if (actual_page_size == 0) {
728  actual_page_size = page_size_;
729  }
730  chunkIndex_[key] = allocateBuffer(actual_page_size, key, num_bytes);
731  return (chunkIndex_[key]);
732 }
733 
735  const ChunkKey& key,
736  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
737  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
739  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
740  << "Chunk already exists for key: " << show_chunk(key);
741  chunkIndex_[key] = allocateBuffer(key, headerStartIt, headerEndIt);
742  return (chunkIndex_[key]);
743 }
744 
747  return chunkIndex_.find(key) != chunkIndex_.end();
748 }
749 
750 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
752  auto chunk_it = chunkIndex_.find(key);
753  CHECK(chunk_it != chunkIndex_.end())
754  << "Chunk does not exist for key: " << show_chunk(key);
755  deleteBufferUnlocked(chunk_it, purge);
756 }
757 
758 ChunkKeyToChunkMap::iterator FileMgr::deleteBufferUnlocked(
759  const ChunkKeyToChunkMap::iterator chunk_it,
760  const bool purge) {
761  if (purge) {
762  chunk_it->second->freePages();
763  }
764  delete chunk_it->second;
765  return chunkIndex_.erase(chunk_it);
766 }
767 
768 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
770  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
771  if (chunkIt == chunkIndex_.end()) {
772  return; // should we throw?
773  }
774  while (chunkIt != chunkIndex_.end() &&
775  std::search(chunkIt->first.begin(),
776  chunkIt->first.begin() + keyPrefix.size(),
777  keyPrefix.begin(),
778  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
779  deleteBufferUnlocked(chunkIt++, purge);
780  }
781 }
782 
783 FileBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
785  return getBufferUnlocked(key, num_bytes);
786 }
787 
789  const size_t num_bytes) const {
790  auto chunk_it = chunkIndex_.find(key);
791  CHECK(chunk_it != chunkIndex_.end()) << "Chunk does not exist: " << show_chunk(key);
792  return chunk_it->second;
793 }
794 
796  AbstractBuffer* destBuffer,
797  const size_t numBytes) {
798  // reads chunk specified by ChunkKey into AbstractBuffer provided by
799  // destBuffer
800  CHECK(!destBuffer->isDirty())
801  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
802  << show_chunk(key);
803  AbstractBuffer* chunk = getBuffer(key);
804  // chunk's size is either specified in function call with numBytes or we
805  // just look at pageSize * numPages in FileBuffer
806  if (numBytes > 0 && numBytes > chunk->size()) {
807  LOG(FATAL) << "Chunk retrieved for key `" << show_chunk(key) << "` is smaller ("
808  << chunk->size() << ") than number of bytes requested (" << numBytes
809  << ")";
810  }
811  chunk->copyTo(destBuffer, numBytes);
812 }
813 
815  AbstractBuffer* srcBuffer,
816  const size_t numBytes) {
817  auto chunk = getOrCreateBuffer(key);
818  size_t oldChunkSize = chunk->size();
819  // write the buffer's data to the Chunk
820  size_t newChunkSize = (numBytes == 0) ? srcBuffer->size() : numBytes;
821  if (chunk->isDirty()) {
822  // multiple appends are allowed,
823  // but only single update is allowed
824  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
825  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
826  "for key: "
827  << show_chunk(key);
828  }
829  }
830  CHECK(srcBuffer->isDirty()) << "putBuffer expects a dirty buffer";
831  if (srcBuffer->isUpdated()) {
832  // chunk size is not changed when fixed rows are updated or are marked as deleted.
833  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
834  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
835  // For varlen update, it takes another route via fragmenter using disk-level buffer.
836  if (0 == numBytes && !chunk->isDirty()) {
837  chunk->setSize(newChunkSize);
838  }
839  //@todo use dirty flags to only flush pages of chunk that need to
840  // be flushed
841  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
842  newChunkSize,
843  0,
844  srcBuffer->getType(),
845  srcBuffer->getDeviceId());
846  } else if (srcBuffer->isAppended()) {
847  CHECK_LT(oldChunkSize, newChunkSize);
848  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
849  newChunkSize - oldChunkSize,
850  srcBuffer->getType(),
851  srcBuffer->getDeviceId());
852  } else {
853  // If dirty buffer comes in unmarked, it must be empty.
854  // Encoder sync is still required to flush the metadata.
855  CHECK(numBytes == 0)
856  << "Dirty buffer with size > 0 must be marked as isAppended() or isUpdated()";
857  }
858  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
859  //@todo commenting out line above will make sure this metadata is set
860  // but will trigger error on fetch chunk
861  srcBuffer->clearDirtyBits();
862  chunk->syncEncoder(srcBuffer);
863  return chunk;
864 }
865 
866 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
867  LOG(FATAL) << "Operation not supported";
868  return nullptr; // satisfy return-type warning
869 }
870 
872  LOG(FATAL) << "Operation not supported";
873 }
874 
875 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
876  std::lock_guard<std::mutex> lock(getPageMutex_);
877 
878  auto candidateFiles = fileIndex_.equal_range(pageSize);
879  int32_t pageNum = -1;
880  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
881  FileInfo* fileInfo = files_.at(fileIt->second);
882  pageNum = fileInfo->getFreePage();
883  if (pageNum != -1) {
884  return (Page(fileInfo->fileId, pageNum));
885  }
886  }
887  // if here then we need to add a file
888  FileInfo* fileInfo;
889  if (isMetadata) {
890  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
891  } else {
892  fileInfo = createFile(pageSize, num_pages_per_data_file_);
893  }
894  pageNum = fileInfo->getFreePage();
895  CHECK(pageNum != -1);
896  return (Page(fileInfo->fileId, pageNum));
897 }
898 
899 void FileMgr::requestFreePages(size_t numPagesRequested,
900  size_t pageSize,
901  std::vector<Page>& pages,
902  const bool isMetadata) {
903  // not used currently
904  // @todo add method to FileInfo to get more than one page
905  std::lock_guard<std::mutex> lock(getPageMutex_);
906  auto candidateFiles = fileIndex_.equal_range(pageSize);
907  size_t numPagesNeeded = numPagesRequested;
908  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
909  FileInfo* fileInfo = files_.at(fileIt->second);
910  int32_t pageNum;
911  do {
912  pageNum = fileInfo->getFreePage();
913  if (pageNum != -1) {
914  pages.emplace_back(fileInfo->fileId, pageNum);
915  numPagesNeeded--;
916  }
917  } while (pageNum != -1 && numPagesNeeded > 0);
918  if (numPagesNeeded == 0) {
919  break;
920  }
921  }
922  while (numPagesNeeded > 0) {
923  FileInfo* fileInfo;
924  if (isMetadata) {
925  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
926  } else {
927  fileInfo = createFile(pageSize, num_pages_per_data_file_);
928  }
929  int32_t pageNum;
930  do {
931  pageNum = fileInfo->getFreePage();
932  if (pageNum != -1) {
933  pages.emplace_back(fileInfo->fileId, pageNum);
934  numPagesNeeded--;
935  }
936  } while (pageNum != -1 && numPagesNeeded > 0);
937  if (numPagesNeeded == 0) {
938  break;
939  }
940  }
941  CHECK(pages.size() == numPagesRequested);
942 }
943 
944 FileInfo* FileMgr::openExistingFile(const std::string& path,
945  const int fileId,
946  const size_t pageSize,
947  const size_t numPages,
948  std::vector<HeaderInfo>& headerVec) {
949  FILE* f = open(path);
950  FileInfo* fInfo = new FileInfo(
951  this, fileId, f, pageSize, numPages, path, false); // false means don't init file
952 
953  fInfo->openExistingFile(headerVec);
955  files_[fileId] = fInfo;
956  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
957  return fInfo;
958 }
959 
960 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
961  // check arguments
962  if (pageSize == 0 || numPages == 0) {
963  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
964  }
965 
966  // create the new file
967  auto [f, file_path] = create(fileMgrBasePath_,
968  nextFileId_,
969  pageSize,
970  numPages); // TM: not sure if I like naming scheme here -
971  // should be in separate namespace?
972  CHECK(f);
973 
974  // instantiate a new FileInfo for the newly created file
975  int32_t fileId = nextFileId_++;
976  FileInfo* fInfo = new FileInfo(
977  this, fileId, f, pageSize, numPages, file_path, true); // true means init file
978  CHECK(fInfo);
979 
981  // update file manager data structures
982  files_[fileId] = fInfo;
983  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
984 
985  return fInfo;
986 }
987 
988 FILE* FileMgr::getFileForFileId(const int32_t fileId) {
989  CHECK(fileId >= 0);
990  CHECK(files_.find(fileId) != files_.end());
991  return files_.at(fileId)->f;
992 }
993 
996  auto chunk_it = chunkIndex_.lower_bound(key_prefix);
997  if (chunk_it == chunkIndex_.end()) {
998  return false;
999  } else {
1000  auto it_pair =
1001  std::mismatch(key_prefix.begin(), key_prefix.end(), chunk_it->first.begin());
1002  return it_pair.first == key_prefix.end();
1003  }
1004 }
1005 
1007  const ChunkKey& keyPrefix) {
1009  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
1010  if (chunkIt == chunkIndex_.end()) {
1011  return; // throw?
1012  }
1013  while (chunkIt != chunkIndex_.end() &&
1014  std::search(chunkIt->first.begin(),
1015  chunkIt->first.begin() + keyPrefix.size(),
1016  keyPrefix.begin(),
1017  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
1018  if (chunkIt->second->hasEncoder()) {
1019  auto chunk_metadata = std::make_shared<ChunkMetadata>();
1020  chunkIt->second->encoder_->getMetadata(chunk_metadata);
1021  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
1022  }
1023  chunkIt++;
1024  }
1025 }
1026 
1029  const auto& chunkIt = chunkIndex_.find(chunkKey);
1030  if (chunkIt != chunkIndex_.end()) {
1031  return chunkIt->second->numMetadataPages();
1032  } else {
1033  throw std::runtime_error("Chunk was not found.");
1034  }
1035 }
1036 
1037 int32_t FileMgr::getDBVersion() const {
1038  return gfm_->getDBVersion();
1039 }
1040 
1042  return gfm_->getDBConvert();
1043 }
1044 
1047 
1048  if (db_version_ > getDBVersion()) {
1049  LOG(FATAL) << "DB forward compatibility is not supported. Version of HeavyDB "
1050  "software used is older than the version of DB being read: "
1051  << db_version_;
1052  }
1054  // new system, or we are moving forward versions
1055  // system wide migration would go here if required
1057  return;
1058  }
1059 }
1060 
1061 int32_t FileMgr::readVersionFromDisk(const std::string& versionFileName) const {
1062  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1063  if (!boost::filesystem::exists(versionFilePath)) {
1064  return -1;
1065  }
1066  if (!boost::filesystem::is_regular_file(versionFilePath)) {
1067  return -1;
1068  }
1069  if (boost::filesystem::file_size(versionFilePath) < 4) {
1070  return -1;
1071  }
1072  FILE* versionFile = open(versionFilePath);
1073  int32_t version;
1074  read(versionFile, 0, sizeof(int32_t), (int8_t*)&version, versionFilePath);
1075  close(versionFile);
1076  return version;
1077 }
1078 
1079 void FileMgr::writeAndSyncVersionToDisk(const std::string& versionFileName,
1080  const int32_t version) {
1081  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1082  FILE* versionFile;
1083  if (boost::filesystem::exists(versionFilePath)) {
1084  int32_t oldVersion = readVersionFromDisk(versionFileName);
1085  LOG(INFO) << "Storage version file `" << versionFilePath
1086  << "` already exists, its current version is " << oldVersion;
1087  versionFile = open(versionFilePath);
1088  } else {
1089  versionFile = create(versionFilePath, sizeof(int32_t));
1090  }
1091  write(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1092  int32_t status = fflush(versionFile);
1093  if (status != 0) {
1094  LOG(FATAL) << "Could not flush version file " << versionFilePath << " to disk";
1095  }
1096 #ifdef __APPLE__
1097  status = fcntl(fileno(epochFile_), 51);
1098 #else
1099  status = heavyai::fsync(fileno(versionFile));
1100 #endif
1101  if (status != 0) {
1102  LOG(FATAL) << "Could not sync version file " << versionFilePath << " to disk";
1103  }
1104  close(versionFile);
1105 }
1106 
1108  const std::string versionFilePath(fileMgrBasePath_ + "/" + FILE_MGR_VERSION_FILENAME);
1109  LOG(INFO) << "Migrating file format version from 0 to 1 for `" << versionFilePath;
1114  int32_t migrationCompleteVersion = 1;
1115  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migrationCompleteVersion);
1116 }
1117 
1119  LOG(INFO) << "Migrating file format version from 1 to 2";
1121  constexpr int32_t migration_complete_version{2};
1122  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migration_complete_version);
1123 }
1124 
1125 void FileMgr::renameAndSymlinkLegacyFiles(const std::string& table_data_dir) {
1126  std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
1127  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
1128  it++) {
1129  const auto old_path = boost::filesystem::canonical(it->path());
1130  if (boost::filesystem::is_regular_file(it->status()) &&
1131  old_path.extension().string() == kLegacyDataFileExtension) {
1132  auto new_path = old_path;
1133  new_path.replace_extension(DATA_FILE_EXT);
1134  old_to_new_paths[old_path] = new_path;
1135  }
1136  }
1137  for (const auto& [old_path, new_path] : old_to_new_paths) {
1138  boost::filesystem::rename(old_path, new_path);
1139  LOG(INFO) << "Rebrand migration: Renamed " << old_path << " to " << new_path;
1140  boost::filesystem::create_symlink(new_path.filename(), old_path);
1141  LOG(INFO) << "Rebrand migration: Added symlink from " << old_path << " to "
1142  << new_path.filename();
1143  }
1144 }
1145 
1149  fileMgrVersion_ = 0;
1151  } else if (fileMgrVersion_ > latestFileMgrVersion_) {
1152  LOG(FATAL)
1153  << "Table storage forward compatibility is not supported. Version of HeavyDB "
1154  "software used is older than the version of table being read: "
1155  << fileMgrVersion_;
1156  }
1157 
1160  switch (fileMgrVersion_) {
1161  case 0: {
1163  break;
1164  }
1165  case 1: {
1167  break;
1168  }
1169  default: {
1170  UNREACHABLE();
1171  }
1172  }
1173  fileMgrVersion_++;
1174  }
1175  }
1176 }
1177 
1178 /*
1179  * @brief sets the epoch to a user-specified value
1180  *
1181  * With the introduction of optional capped history on files, the possibility of
1182  * multiple successive rollbacks to earlier epochs means that we cannot rely on
1183  * the maxRollbackEpochs_ variable alone (initialized from a value stored in Catalog) to
1184  * guarantee that we can set an epoch at any given value. This function checks the
1185  * user-specified epoch value to ensure it is both not higher than the last checkpointed
1186  * epoch AND it is >= the epoch floor, which on an uncapped value will be the epoch the
1187  * table was created at (default 0), and for capped tables, the lowest epoch for
1188  * which we have complete data, now materialized in the epoch metadata itself. */
1189 
1190 void FileMgr::setEpoch(const int32_t newEpoch) {
1191  if (newEpoch < epoch_.floor()) {
1192  std::stringstream error_message;
1193  error_message << "Cannot set epoch for " << describeSelf()
1194  << " lower than the minimum rollback epoch (" << epoch_.floor() << ").";
1195  throw std::runtime_error(error_message.str());
1196  }
1197  epoch_.ceiling(newEpoch);
1199 }
1200 
1201 void FileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
1202  std::unique_lock<heavyai::shared_mutex> lock(mutex_free_page_);
1203  free_pages_.push_back(page);
1204 }
1205 
1206 void FileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t table_id) {
1207  UNREACHABLE();
1208 }
1209 
1215 void FileMgr::resumeFileCompaction(const std::string& status_file_name) {
1216  if (status_file_name == COPY_PAGES_STATUS) {
1217  // Delete status file and restart data compaction process
1218  auto file_path = getFilePath(status_file_name);
1219  CHECK(boost::filesystem::exists(file_path));
1220  boost::filesystem::remove(file_path);
1221  compactFiles();
1222  } else if (status_file_name == UPDATE_PAGE_VISIBILITY_STATUS) {
1223  // Execute second and third phases of data compaction
1225  auto page_mappings = readPageMappingsFromStatusFile();
1226  updateMappedPagesVisibility(page_mappings);
1228  deleteEmptyFiles();
1229  } else if (status_file_name == DELETE_EMPTY_FILES_STATUS) {
1230  // Execute last phase of data compaction
1232  deleteEmptyFiles();
1233  } else {
1234  UNREACHABLE() << "Unexpected status file name: " << status_file_name;
1235  }
1236 }
1237 
1267  if (files_.empty()) {
1268  return;
1269  }
1270 
1271  auto copy_pages_status_file_path = getFilePath(COPY_PAGES_STATUS);
1272  CHECK(!boost::filesystem::exists(copy_pages_status_file_path));
1273  std::ofstream status_file(copy_pages_status_file_path.string(),
1274  std::ios::out | std::ios::binary);
1275  status_file.close();
1276 
1277  std::vector<PageMapping> page_mappings;
1278  std::set<Page> touched_pages;
1279  std::set<size_t> page_sizes;
1280  for (auto [file_id, file_info] : files_) {
1281  page_sizes.emplace(file_info->pageSize);
1282  }
1283  for (auto page_size : page_sizes) {
1284  sortAndCopyFilePagesForCompaction(page_size, page_mappings, touched_pages);
1285  }
1286 
1287  writePageMappingsToStatusFile(page_mappings);
1289 
1290  updateMappedPagesVisibility(page_mappings);
1292 
1293  deleteEmptyFiles();
1294 }
1295 
1303  std::vector<PageMapping>& page_mappings,
1304  std::set<Page>& touched_pages) {
1305  std::vector<FileInfo*> sorted_file_infos;
1306  auto range = fileIndex_.equal_range(page_size);
1307  for (auto it = range.first; it != range.second; it++) {
1308  sorted_file_infos.emplace_back(files_.at(it->second));
1309  }
1310  if (sorted_file_infos.empty()) {
1311  return;
1312  }
1313 
1314  // Sort file infos in ascending order of free pages count i.e. from files with
1315  // the least number of free pages to those with the highest number of free pages.
1316  std::sort(sorted_file_infos.begin(),
1317  sorted_file_infos.end(),
1318  [](const FileInfo* file_1, const FileInfo* file_2) {
1319  return file_1->freePages.size() < file_2->freePages.size();
1320  });
1321 
1322  size_t destination_index = 0, source_index = sorted_file_infos.size() - 1;
1323 
1324  // For page copy destinations, skip files without free pages.
1325  while (destination_index < source_index &&
1326  sorted_file_infos[destination_index]->freePages.empty()) {
1327  destination_index++;
1328  }
1329 
1330  // For page copy sources, skip files with only free pages.
1331  while (destination_index < source_index &&
1332  sorted_file_infos[source_index]->freePages.size() ==
1333  sorted_file_infos[source_index]->numPages) {
1334  source_index--;
1335  }
1336 
1337  std::set<size_t> source_used_pages;
1338  CHECK(destination_index <= source_index);
1339 
1340  // Get the total number of free pages available for compaction
1341  int64_t total_free_pages{0};
1342  for (size_t i = destination_index; i <= source_index; i++) {
1343  total_free_pages += sorted_file_infos[i]->numFreePages();
1344  }
1345 
1346  while (destination_index < source_index) {
1347  if (source_used_pages.empty()) {
1348  // Populate source_used_pages with only used pages in the source file.
1349  auto source_file_info = sorted_file_infos[source_index];
1350  auto& free_pages = source_file_info->freePages;
1351  for (size_t page_num = 0; page_num < source_file_info->numPages; page_num++) {
1352  if (free_pages.find(page_num) == free_pages.end()) {
1353  source_used_pages.emplace(page_num);
1354  }
1355  }
1356 
1357  // Free pages of current source file will not be copy destinations
1358  total_free_pages -= source_file_info->numFreePages();
1359  }
1360 
1361  // Exit early if there are not enough free pages to empty the next file
1362  if (total_free_pages - static_cast<int64_t>(source_used_pages.size()) < 0) {
1363  return;
1364  }
1365 
1366  // Copy pages from source files to destination files
1367  auto dest_file_info = sorted_file_infos[destination_index];
1368  while (!source_used_pages.empty() && !dest_file_info->freePages.empty()) {
1369  // Get next page to copy
1370  size_t source_page_num = *source_used_pages.begin();
1371  source_used_pages.erase(source_page_num);
1372 
1373  Page source_page{sorted_file_infos[source_index]->fileId, source_page_num};
1374  copySourcePageForCompaction(source_page,
1375  sorted_file_infos[destination_index],
1376  page_mappings,
1377  touched_pages);
1378  total_free_pages--;
1379  }
1380 
1381  if (source_used_pages.empty()) {
1382  source_index--;
1383  }
1384 
1385  if (dest_file_info->freePages.empty()) {
1386  destination_index++;
1387  }
1388  }
1389 }
1390 
1398  FileInfo* destination_file_info,
1399  std::vector<PageMapping>& page_mappings,
1400  std::set<Page>& touched_pages) {
1401  size_t destination_page_num = destination_file_info->getFreePage();
1402  CHECK_NE(destination_page_num, static_cast<size_t>(-1));
1403  Page destination_page{destination_file_info->fileId, destination_page_num};
1404 
1405  // Assert that the same pages are not copied or overridden multiple times
1406  CHECK(touched_pages.find(source_page) == touched_pages.end());
1407  touched_pages.emplace(source_page);
1408 
1409  CHECK(touched_pages.find(destination_page) == touched_pages.end());
1410  touched_pages.emplace(destination_page);
1411 
1412  auto header_size = copyPageWithoutHeaderSize(source_page, destination_page);
1413  page_mappings.emplace_back(static_cast<size_t>(source_page.fileId),
1414  source_page.pageNum,
1415  header_size,
1416  static_cast<size_t>(destination_page.fileId),
1417  destination_page.pageNum);
1418 }
1419 
1427 int32_t FileMgr::copyPageWithoutHeaderSize(const Page& source_page,
1428  const Page& destination_page) {
1429  FileInfo* source_file_info = files_.at(source_page.fileId);
1430  CHECK(source_file_info);
1431  CHECK_EQ(source_file_info->fileId, source_page.fileId);
1432 
1433  FileInfo* destination_file_info = files_.at(destination_page.fileId);
1434  CHECK(destination_file_info);
1435  CHECK_EQ(destination_file_info->fileId, destination_page.fileId);
1436  CHECK_EQ(source_file_info->pageSize, destination_file_info->pageSize);
1437 
1438  auto page_size = source_file_info->pageSize;
1439  auto buffer = std::make_unique<int8_t[]>(page_size);
1440  size_t bytes_read =
1441  source_file_info->read(source_page.pageNum * page_size, page_size, buffer.get());
1442  CHECK_EQ(page_size, bytes_read);
1443 
1444  auto header_size_offset = sizeof(int32_t);
1445  size_t bytes_written = destination_file_info->write(
1446  (destination_page.pageNum * page_size) + header_size_offset,
1447  page_size - header_size_offset,
1448  buffer.get() + header_size_offset);
1449  CHECK_EQ(page_size - header_size_offset, bytes_written);
1450  return reinterpret_cast<int32_t*>(buffer.get())[0];
1451 }
1452 
1457 void FileMgr::updateMappedPagesVisibility(const std::vector<PageMapping>& page_mappings) {
1458  for (const auto& page_mapping : page_mappings) {
1459  auto destination_file = files_.at(page_mapping.destination_file_id);
1460 
1461  // Set destination page header size
1462  auto header_size = page_mapping.source_page_header_size;
1463  CHECK_GT(header_size, 0);
1464  destination_file->write(
1465  page_mapping.destination_page_num * destination_file->pageSize,
1466  sizeof(PageHeaderSizeType),
1467  reinterpret_cast<int8_t*>(&header_size));
1468  auto source_file = files_.at(page_mapping.source_file_id);
1469 
1470  // Free source page
1471  PageHeaderSizeType free_page_header_size{0};
1472  source_file->write(page_mapping.source_page_num * source_file->pageSize,
1473  sizeof(PageHeaderSizeType),
1474  reinterpret_cast<int8_t*>(&free_page_header_size));
1475  source_file->freePageDeferred(page_mapping.source_page_num);
1476  }
1477 
1478  for (auto file_info_entry : files_) {
1479  int32_t status = file_info_entry.second->syncToDisk();
1480  if (status != 0) {
1481  LOG(FATAL) << "Could not sync file to disk";
1482  }
1483  }
1484 }
1485 
1491  for (auto [file_id, file_info] : files_) {
1492  CHECK_EQ(file_id, file_info->fileId);
1493  if (file_info->freePages.size() == file_info->numPages) {
1494  fclose(file_info->f);
1495  file_info->f = nullptr;
1496  auto file_path = get_data_file_path(fileMgrBasePath_, file_id, file_info->pageSize);
1497  boost::filesystem::remove(get_legacy_data_file_path(file_path));
1498  boost::filesystem::remove(file_path);
1499  }
1500  }
1501 
1502  auto status_file_path = getFilePath(DELETE_EMPTY_FILES_STATUS);
1503  CHECK(boost::filesystem::exists(status_file_path));
1504  boost::filesystem::remove(status_file_path);
1505 }
1506 
1513  const std::vector<PageMapping>& page_mappings) {
1514  auto file_path = getFilePath(COPY_PAGES_STATUS);
1515  CHECK(boost::filesystem::exists(file_path));
1516  CHECK(boost::filesystem::is_empty(file_path));
1517  std::ofstream status_file{file_path.string(), std::ios::out | std::ios::binary};
1518  int64_t page_mappings_count = page_mappings.size();
1519  status_file.write(reinterpret_cast<const char*>(&page_mappings_count), sizeof(int64_t));
1520  status_file.write(reinterpret_cast<const char*>(page_mappings.data()),
1521  page_mappings_count * sizeof(PageMapping));
1522  status_file.close();
1523 }
1524 
1528 std::vector<PageMapping> FileMgr::readPageMappingsFromStatusFile() {
1529  auto file_path = getFilePath(UPDATE_PAGE_VISIBILITY_STATUS);
1530  CHECK(boost::filesystem::exists(file_path));
1531  std::ifstream status_file{file_path.string(),
1532  std::ios::in | std::ios::binary | std::ios::ate};
1533  CHECK(status_file.is_open());
1534  size_t file_size = status_file.tellg();
1535  status_file.seekg(0, std::ios::beg);
1536  CHECK_GE(file_size, sizeof(int64_t));
1537 
1538  int64_t page_mappings_count;
1539  status_file.read(reinterpret_cast<char*>(&page_mappings_count), sizeof(int64_t));
1540  auto page_mappings_byte_size = file_size - sizeof(int64_t);
1541  CHECK_EQ(page_mappings_byte_size % sizeof(PageMapping), static_cast<size_t>(0));
1542  CHECK_EQ(static_cast<size_t>(page_mappings_count),
1543  page_mappings_byte_size / sizeof(PageMapping));
1544 
1545  std::vector<PageMapping> page_mappings(page_mappings_count);
1546  status_file.read(reinterpret_cast<char*>(page_mappings.data()),
1547  page_mappings_byte_size);
1548  status_file.close();
1549  return page_mappings;
1550 }
1551 
1555 void FileMgr::renameCompactionStatusFile(const char* const from_status,
1556  const char* const to_status) {
1557  auto from_status_file_path = getFilePath(from_status);
1558  auto to_status_file_path = getFilePath(to_status);
1559  CHECK(boost::filesystem::exists(from_status_file_path));
1560  CHECK(!boost::filesystem::exists(to_status_file_path));
1561  boost::filesystem::rename(from_status_file_path, to_status_file_path);
1562 }
1563 
1564 // Methods that enable override of number of pages per data/metadata files
1565 // for use in unit tests.
1566 void FileMgr::setNumPagesPerDataFile(size_t num_pages) {
1567  num_pages_per_data_file_ = num_pages;
1568 }
1569 
1570 void FileMgr::setNumPagesPerMetadataFile(size_t num_pages) {
1571  num_pages_per_metadata_file_ = num_pages;
1572 }
1573 
1576  for (auto file_info_entry : files_) {
1577  int32_t status = file_info_entry.second->syncToDisk();
1578  CHECK(status == 0) << "Could not sync file to disk";
1579  }
1580 }
1581 
1582 void FileMgr::initializeNumThreads(size_t num_reader_threads) {
1583  // # of threads is based on # of cores on the host
1584  size_t num_hardware_based_threads = std::thread::hardware_concurrency();
1585  if (num_reader_threads == 0 || num_reader_threads > num_hardware_based_threads) {
1586  // # of threads has not been defined by user
1587  num_reader_threads_ = num_hardware_based_threads;
1588  } else {
1589  num_reader_threads_ = num_reader_threads;
1590  }
1591 }
1592 
1595  for (auto& free_page : free_pages_) {
1596  free_page.first->freePageDeferred(free_page.second);
1597  }
1598  free_pages_.clear();
1599 }
1600 
1601 FileBuffer* FileMgr::allocateBuffer(const size_t page_size,
1602  const ChunkKey& key,
1603  const size_t num_bytes) {
1604  return new FileBuffer(this, page_size, key, num_bytes);
1605 }
1606 
1608  const ChunkKey& key,
1609  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
1610  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
1611  return new FileBuffer(this, key, headerStartIt, headerEndIt);
1612 }
1613 
1614 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
1616  ChunkKey& chunk_key,
1617  int32_t contingent,
1618  int32_t page_epoch,
1619  int32_t page_num) {
1620  // If the parent FileMgr has a fileMgrKey, then all keys are locked to one table and
1621  // can be set from the manager.
1622  auto [db_id, tb_id] = get_fileMgrKey();
1623  chunk_key[CHUNK_KEY_DB_IDX] = db_id;
1624  chunk_key[CHUNK_KEY_TABLE_IDX] = tb_id;
1625 
1626  auto table_epoch = epoch(db_id, tb_id);
1627 
1628  if (is_page_deleted_with_checkpoint(table_epoch, page_epoch, contingent)) {
1629  file_info->freePageImmediate(page_num);
1630  return true;
1631  }
1632 
1633  // Recover page if it was deleted but not checkpointed.
1634  if (is_page_deleted_without_checkpoint(table_epoch, page_epoch, contingent)) {
1635  file_info->recoverPage(chunk_key, page_num);
1636  }
1637  return false;
1638 }
1639 
1641  FileBuffer* buf;
1643  auto chunk_it = chunkIndex_.find(key);
1644  if (chunk_it == chunkIndex_.end()) {
1645  buf = createBufferUnlocked(key);
1646  } else {
1647  buf = getBufferUnlocked(key);
1648  }
1649  return buf;
1650 }
1651 
1654  for (auto [key, buf] : chunkIndex_) {
1655  if (buf->isDirty()) {
1656  buf->writeMetadata(epoch());
1657  buf->clearDirtyBits();
1658  }
1659  }
1660 }
1661 
1664  return chunkIndex_.size();
1665 }
1666 
1667 boost::filesystem::path FileMgr::getFilePath(const std::string& file_name) const {
1668  return boost::filesystem::path(fileMgrBasePath_) / file_name;
1669 }
1670 
1671 size_t FileMgr::num_pages_per_data_file_{DEFAULT_NUM_PAGES_PER_DATA_FILE};
1672 size_t FileMgr::num_pages_per_metadata_file_{DEFAULT_NUM_PAGES_PER_METADATA_FILE};
1673 } // namespace File_Namespace
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:615
const size_t metadata_page_size_
Definition: FileMgr.h:536
std::vector< PageMapping > readPageMappingsFromStatusFile()
Definition: FileMgr.cpp:1528
virtual FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
Definition: FileMgr.cpp:734
int32_t readVersionFromDisk(const std::string &versionFileName) const
Definition: FileMgr.cpp:1061
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
OpenFilesResult openFiles()
Definition: FileMgr.cpp:196
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:866
size_t write(const size_t offset, const size_t size, const int8_t *buf)
Definition: FileInfo.cpp:69
TablePair fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:524
std::string getBasePath() const
int8_t * storage_ptr()
Definition: Epoch.h:61
FileMgr(const int32_t device_id, GlobalFileMgr *gfm, const TablePair file_mgr_key, const int32_t max_rollback_epochs=-1, const size_t num_reader_threads=0, const int32_t epoch=-1)
Constructor.
Definition: FileMgr.cpp:47
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:604
bool is_page_deleted_without_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:289
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:875
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
Definition: FileMgr.cpp:1027
void syncEncoder(const AbstractBuffer *src_buffer)
std::string get_legacy_data_file_path(const std::string &new_data_file_path)
Definition: File.cpp:51
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:713
const size_t page_size_
Definition: FileMgr.h:535
heavyai::shared_lock< heavyai::shared_mutex > read_lock
void sortAndCopyFilePagesForCompaction(size_t page_size, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1302
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
std::vector< HeaderInfo > header_infos
Definition: FileMgr.h:121
virtual FileBuffer * allocateBuffer(const size_t page_size, const ChunkKey &key, const size_t num_bytes=0)
Definition: FileMgr.cpp:1601
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:285
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
void copySourcePageForCompaction(const Page &source_page, FileInfo *destination_file_info, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1397
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:871
virtual MemoryLevel getType() const =0
void freePageImmediate(int32_t page_num)
Definition: FileInfo.cpp:252
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:584
void setDataAndMetadataFileStats(StorageStats &storage_stats) const
Definition: FileMgr.cpp:346
void migrateToLatestFileMgrVersion()
Definition: FileMgr.cpp:1146
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
Definition: FileMgr.cpp:684
#define UNREACHABLE()
Definition: Logger.h:337
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
#define DATA_FILE_EXT
Definition: File.h:25
static constexpr char const * UPDATE_PAGE_VISIBILITY_STATUS
Definition: FileMgr.h:376
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:306
void init(const size_t num_reader_threads, const int32_t epochOverride)
Definition: FileMgr.cpp:256
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:750
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:331
GlobalFileMgr * gfm_
Definition: FileMgr.h:523
bool is_compaction_status_file(const std::string &file_name)
Definition: FileMgr.cpp:189
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:409
#define DEFAULT_METADATA_PAGE_SIZE
int32_t floor() const
Definition: Epoch.h:43
int32_t ceiling() const
Definition: Epoch.h:44
heavyai::unique_lock< heavyai::shared_mutex > write_lock
virtual bool updatePageIfDeleted(FileInfo *file_info, ChunkKey &chunk_key, int32_t contingent, int32_t page_epoch, int32_t page_num)
deletes or recovers a page based on last checkpointed epoch.
Definition: FileMgr.cpp:1615
std::optional< uint64_t > total_free_data_page_count
Definition: FileMgr.h:112
void writePageMappingsToStatusFile(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1512
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:57
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:57
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:703
int32_t copyPageWithoutHeaderSize(const Page &source_page, const Page &destination_page)
Definition: FileMgr.cpp:1427
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string fileMgrBasePath_
Definition: FileMgr.h:397
static void setNumPagesPerMetadataFile(size_t num_pages)
Definition: FileMgr.cpp:1570
int32_t lastCheckpointedEpoch() const
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:297
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:795
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:160
static size_t num_pages_per_data_file_
Definition: FileMgr.h:417
const int32_t latestFileMgrVersion_
Definition: FileMgr.h:407
int32_t PageHeaderSizeType
Definition: FileMgr.h:127
std::shared_lock< T > shared_lock
int32_t db_version_
the index of the next file id
Definition: FileMgr.h:404
std::set< size_t > freePages
Definition: FileInfo.h:62
future< Result > async(Fn &&fn, Args &&...args)
size_t pageSize
file stream object for the represented file
Definition: FileInfo.h:59
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
Definition: FileMgr.cpp:1079
size_t getNumChunks() override
Definition: FileMgr.cpp:1662
int32_t incrementEpoch()
Definition: FileMgr.h:281
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
Definition: FileMgr.cpp:1206
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:419
FileBuffer * getOrCreateBuffer(const ChunkKey &key)
Definition: FileMgr.cpp:1640
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
#define CHECK_NE(x, y)
Definition: Logger.h:302
static void setNumPagesPerDataFile(size_t num_pages)
Definition: FileMgr.cpp:1566
std::optional< uint64_t > total_free_metadata_page_count
Definition: FileMgr.h:108
int32_t fileId
Definition: Page.h:47
int32_t getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:1037
string version
Definition: setup.in.py:73
void freePagesBeforeEpoch(const int32_t min_epoch)
Definition: FileMgr.cpp:670
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:326
PageSizeFileMMap fileIndex_
A map of files accessible via a file identifier.
Definition: FileMgr.h:401
std::unique_lock< T > unique_lock
virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const bool purge=true)
Definition: FileMgr.cpp:758
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
uint64_t total_metadata_page_count
Definition: FileMgr.h:107
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:988
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
Definition: FileMgr.h:414
static constexpr char FILE_MGR_VERSION_FILENAME[]
Definition: FileMgr.h:389
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:402
static size_t num_pages_per_metadata_file_
Definition: FileMgr.h:418
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:745
uint32_t getFragmentCount() const
Definition: FileMgr.cpp:410
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:944
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf, const std::string &file_path)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:142
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:656
boost::filesystem::path getFilePath(const std::string &file_name) const
Definition: FileMgr.cpp:1667
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
#define DEFAULT_PAGE_SIZE
std::string compaction_status_file_name
Definition: FileMgr.h:123
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:768
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:1006
heavyai::shared_mutex chunkIndexMutex_
Definition: FileMgr.h:410
const TablePair get_fileMgrKey() const
Definition: FileMgr.h:338
#define CHECK_LT(x, y)
Definition: Logger.h:303
static constexpr char EPOCH_FILENAME[]
Definition: FileMgr.h:387
virtual std::string describeSelf() const
Definition: FileMgr.cpp:697
virtual FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:723
heavyai::shared_mutex mutex_free_page_
Definition: FileMgr.h:413
int32_t maxRollbackEpochs_
Definition: FileMgr.h:396
void freePagesBeforeEpochUnlocked(const int32_t min_epoch, const ChunkKeyToChunkMap::iterator lower_bound, const ChunkKeyToChunkMap::iterator upper_bound)
Definition: FileMgr.cpp:675
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
uint64_t total_metadata_file_size
Definition: FileMgr.h:106
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:102
void openAndReadEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:636
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:75
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:277
StorageStats getStorageStats() const
Definition: FileMgr.cpp:337
bool hasChunkMetadataForKeyPrefix(const ChunkKey &keyPrefix)
Definition: FileMgr.cpp:994
static size_t byte_size()
Definition: Epoch.h:63
static constexpr char const * DELETE_EMPTY_FILES_STATUS
Definition: FileMgr.h:377
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:113
bool getDBConvert() const
Definition: FileMgr.cpp:1041
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
Definition: FileMgr.cpp:1125
virtual void free_page(std::pair< FileInfo *, int32_t > &&page)
Definition: FileMgr.cpp:1201
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
virtual void closeRemovePhysical()
Definition: FileMgr.cpp:577
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:107
static constexpr char const * COPY_PAGES_STATUS
Definition: FileMgr.h:375
int fsync(int fd)
Definition: heavyai_fs.cpp:62
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator) const
Definition: FileMgr.cpp:153
int32_t epoch() const
Definition: FileMgr.h:517
void openExistingFile(std::vector< HeaderInfo > &headerVec)
Definition: FileInfo.cpp:80
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:960
std::map< int32_t, FileInfo * > files_
Definition: FileMgr.h:400
void updateMappedPagesVisibility(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1457
#define CHECK(condition)
Definition: Logger.h:291
std::optional< uint32_t > fragment_count
Definition: FileMgr.h:113
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:783
void recoverPage(const ChunkKey &chunk_key, int32_t page_num)
Definition: FileInfo.cpp:265
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:128
virtual FileBuffer * getBufferUnlocked(const ChunkKey &key, const size_t numBytes=0) const
Definition: FileMgr.cpp:788
void setEpoch(const int32_t newEpoch)
Definition: FileMgr.cpp:1190
bool coreInit()
Determines file path, and if exists, runs file migration and opens and reads epoch file...
Definition: FileMgr.cpp:133
FileInfo * getFileInfoForFileId(const int32_t fileId) const
Definition: FileMgr.h:222
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1582
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:899
heavyai::shared_mutex files_rw_mutex_
Definition: FileMgr.h:411
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:91
std::string get_data_file_path(const std::string &base_path, int file_id, size_t page_size)
Definition: File.cpp:44
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
Definition: File.cpp:242
unsigned nextFileId_
number of threads used when loading data
Definition: FileMgr.h:403
int32_t epochFloor() const
Definition: FileMgr.h:279
bool is_metadata_file(size_t file_size, size_t page_size, size_t metadata_page_size, size_t num_pages_per_metadata_file)
Definition: FileMgr.cpp:328
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
void renameCompactionStatusFile(const char *const from_status, const char *const to_status)
Definition: FileMgr.cpp:1555
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:79
constexpr auto kLegacyDataFileExtension
Definition: File.h:36
static constexpr char LEGACY_EPOCH_FILENAME[]
Definition: FileMgr.h:386
A selection of helper methods for File I/O.
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:814
#define VLOG(n)
Definition: Logger.h:387
Type timer_start()
Definition: measure.h:42
void resumeFileCompaction(const std::string &status_file_name)
Definition: FileMgr.cpp:1215
static constexpr char DB_META_FILENAME[]
Definition: FileMgr.h:388
static constexpr int32_t INVALID_VERSION
Definition: FileMgr.h:390