OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <fstream>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/checked_alloc.h"
41 #include "Shared/measure.h"
42 #include "Shared/scope.h"
43 
44 using namespace std;
45 
46 extern bool g_read_only;
47 extern bool g_multi_instance;
48 
49 namespace File_Namespace {
50 
51 FileMgr::FileMgr(const int32_t device_id,
52  GlobalFileMgr* gfm,
53  const TablePair file_mgr_key,
54  const int32_t max_rollback_epochs,
55  const size_t num_reader_threads,
56  const int32_t epoch)
57  : AbstractBufferMgr(device_id)
58  , maxRollbackEpochs_(max_rollback_epochs)
59  , nextFileId_(0)
60  , gfm_(gfm)
61  , fileMgrKey_(file_mgr_key)
62  , page_size_(gfm->getPageSize())
63  , metadata_page_size_(gfm->getMetadataPageSize()) {
64  init(num_reader_threads, epoch);
65 }
66 
67 // used only to initialize enough to drop
68 FileMgr::FileMgr(const int32_t device_id,
69  GlobalFileMgr* gfm,
70  const TablePair file_mgr_key,
71  const bool run_core_init)
72  : AbstractBufferMgr(device_id)
73  , maxRollbackEpochs_(-1)
74  , nextFileId_(0)
75  , gfm_(gfm)
76  , fileMgrKey_(file_mgr_key)
77  , page_size_(gfm->getPageSize())
78  , metadata_page_size_(gfm->getMetadataPageSize()) {
79  // TODO(Misiu): Standardize prefix and delim.
80  const std::string fileMgrDirPrefix("table");
81  const std::string fileMgrDirDelim("_");
82  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + fileMgrDirDelim +
83  std::to_string(fileMgrKey_.first) + // db_id
84  fileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
85  epochFile_ = nullptr;
86  files_.clear();
87  if (run_core_init) {
88  coreInit();
89  }
90 }
91 
92 FileMgr::FileMgr(GlobalFileMgr* gfm, std::string base_path)
93  : AbstractBufferMgr(0)
94  , maxRollbackEpochs_(-1)
95  , fileMgrBasePath_(base_path)
96  , nextFileId_(0)
97  , gfm_(gfm)
98  , fileMgrKey_(0, 0)
99  , page_size_(gfm->getPageSize())
100  , metadata_page_size_(gfm->getMetadataPageSize()) {
101  init(base_path, -1);
102 }
103 
104 // For testing purposes only
105 FileMgr::FileMgr(const int epoch)
106  : AbstractBufferMgr(-1)
107  , page_size_(DEFAULT_PAGE_SIZE)
108  , metadata_page_size_(DEFAULT_METADATA_PAGE_SIZE) {
109  epoch_.ceiling(epoch);
110 }
111 
112 // Used to initialize CachingFileMgr.
113 FileMgr::FileMgr(const size_t page_size, const size_t metadata_page_size)
114  : AbstractBufferMgr(0)
115  , page_size_(page_size)
116  , metadata_page_size_(metadata_page_size) {}
117 
119  // free memory used by FileInfo objects
120  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
121  delete chunkIt->second;
122  }
123 
124  files_.clear();
125 
126  if (epochFile_) {
127  close(epochFile_);
128  epochFile_ = nullptr;
129  }
130 
131  if (DBMetaFile_) {
133  DBMetaFile_ = nullptr;
134  }
135 }
136 
139  const std::string fileMgrDirPrefix("table");
140  const std::string FileMgrDirDelim("_");
141  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
142  std::to_string(fileMgrKey_.first) + // db_id
143  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
144  boost::filesystem::path path(fileMgrBasePath_);
145  if (boost::filesystem::exists(path)) {
146  if (!boost::filesystem::is_directory(path)) {
147  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
148  << "' for table data is not a directory.";
149  }
152  return true;
153  }
154  return false;
155 }
156 
158  const boost::filesystem::directory_iterator& fileIterator) const {
159  FileMetadata fileMetadata;
160  fileMetadata.is_data_file = false;
161  fileMetadata.file_path = fileIterator->path().string();
162  if (!boost::filesystem::is_regular_file(fileIterator->status())) {
163  return fileMetadata;
164  }
165  // note that boost::filesystem leaves preceding dot on
166  // extension - hence DATA_FILE_EXT is ".data"
167  std::string extension(fileIterator->path().extension().string());
168  if (extension == DATA_FILE_EXT) {
169  std::string fileStem(fileIterator->path().stem().string());
170  // remove trailing dot if any
171  if (fileStem.size() > 0 && fileStem.back() == '.') {
172  fileStem = fileStem.substr(0, fileStem.size() - 1);
173  }
174  size_t dotPos = fileStem.find_last_of("."); // should only be one
175  if (dotPos == std::string::npos) {
176  LOG(FATAL) << "File `" << fileIterator->path()
177  << "` does not carry page size information in the filename.";
178  }
179  fileMetadata.is_data_file = true;
180  fileMetadata.file_id = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
181  fileMetadata.page_size =
182  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
183 
184  fileMetadata.file_size = boost::filesystem::file_size(fileMetadata.file_path);
185  CHECK_EQ(fileMetadata.file_size % fileMetadata.page_size,
186  size_t(0)); // should be no partial pages
187  fileMetadata.num_pages = fileMetadata.file_size / fileMetadata.page_size;
188  }
189  return fileMetadata;
190 }
191 
192 namespace {
193 bool is_compaction_status_file(const std::string& file_name) {
194  return (file_name == FileMgr::COPY_PAGES_STATUS ||
197 }
198 } // namespace
199 
201  auto clock_begin = timer_start();
202  boost::filesystem::directory_iterator
203  end_itr; // default construction yields past-the-end
205  result.max_file_id = -1;
206  int32_t file_count = 0;
207  int32_t thread_count = std::thread::hardware_concurrency();
208  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
209  boost::filesystem::path path(fileMgrBasePath_);
210  for (boost::filesystem::directory_iterator file_it(path); file_it != end_itr;
211  ++file_it) {
212  FileMetadata file_metadata = getMetadataForFile(file_it);
213  if (file_metadata.is_data_file) {
214  result.max_file_id = std::max(result.max_file_id, file_metadata.file_id);
215  file_futures.emplace_back(std::async(std::launch::async, [file_metadata, this] {
216  std::vector<HeaderInfo> temp_header_vec;
217  openExistingFile(file_metadata.file_path,
218  file_metadata.file_id,
219  file_metadata.page_size,
220  file_metadata.num_pages,
221  temp_header_vec);
222  return temp_header_vec;
223  }));
224  file_count++;
225  if (file_count % thread_count == 0) {
226  processFileFutures(file_futures, result.header_infos);
227  }
228  }
229 
230  if (is_compaction_status_file(file_it->path().filename().string())) {
231  CHECK(result.compaction_status_file_name.empty());
232  result.compaction_status_file_name = file_it->path().filename().string();
233  }
234  }
235 
236  if (file_futures.size() > 0) {
237  processFileFutures(file_futures, result.header_infos);
238  }
239 
240  int64_t queue_time_ms = timer_stop(clock_begin);
241  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : " << queue_time_ms
242  << "ms Epoch: " << epoch_.ceiling() << " files read: " << file_count
243  << " table location: '" << fileMgrBasePath_ << "'";
244  return result;
245 }
246 
248  files_.clear();
249  fileIndex_.clear();
250 }
251 
252 void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) {
253  // if epochCeiling = -1 this means open from epoch file
254 
255  const bool dataExists = coreInit();
256  if (dataExists) {
257  if (epochOverride != -1) { // if opening at specified epoch
258  setEpoch(epochOverride);
259  }
260 
261  auto open_files_result = openFiles();
262  if (!open_files_result.compaction_status_file_name.empty()) {
263  resumeFileCompaction(open_files_result.compaction_status_file_name);
264  clearFileInfos();
265  open_files_result = openFiles();
266  CHECK(open_files_result.compaction_status_file_name.empty());
267  }
268 
269  /* Sort headerVec so that all HeaderInfos
270  * from a chunk will be grouped together
271  * and in order of increasing PageId
272  * - Version Epoch */
273  auto& header_vec = open_files_result.header_infos;
274  std::sort(header_vec.begin(), header_vec.end());
275 
276  /* Goal of next section is to find sequences in the
277  * sorted headerVec of the same ChunkId, which we
278  * can then initiate a FileBuffer with */
279 
280  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
281  if (header_vec.size() > 0) {
282  ChunkKey lastChunkKey = header_vec.begin()->chunkKey;
283  auto startIt = header_vec.begin();
284 
285  for (auto headerIt = header_vec.begin() + 1; headerIt != header_vec.end();
286  ++headerIt) {
287  if (headerIt->chunkKey != lastChunkKey) {
288  createBufferFromHeaders(lastChunkKey, startIt, headerIt);
289  lastChunkKey = headerIt->chunkKey;
290  startIt = headerIt;
291  }
292  }
293  // now need to insert last Chunk
294  createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
295  }
296  nextFileId_ = open_files_result.max_file_id + 1;
297  rollOffOldData(epoch(), true /* only checkpoint if data is rolled off */);
298  incrementEpoch();
299  freePages();
300  } else {
301  boost::filesystem::path path(fileMgrBasePath_);
302  readOnlyCheck("create file", path.string());
303  if (!boost::filesystem::create_directory(path)) {
304  LOG(FATAL) << "Could not create data directory: " << path;
305  }
307  if (epochOverride != -1) {
308  epoch_.floor(epochOverride);
309  epoch_.ceiling(epochOverride);
310  } else {
311  // These are default constructor values for epoch_, but resetting here for clarity
312  epoch_.floor(0);
313  epoch_.ceiling(0);
314  }
317  incrementEpoch();
318  }
319 
320  initializeNumThreads(num_reader_threads);
321  isFullyInitted_ = true;
322 }
323 
324 namespace {
326  size_t page_size,
327  size_t metadata_page_size,
328  size_t num_pages_per_metadata_file) {
329  return (file_size == (metadata_page_size * num_pages_per_metadata_file) &&
330  page_size == metadata_page_size);
331 }
332 } // namespace
333 
335  StorageStats storage_stats;
336  setDataAndMetadataFileStats(storage_stats);
337  if (isFullyInitted_) {
338  storage_stats.fragment_count = getFragmentCount();
339  }
340  return storage_stats;
341 }
342 
345  if (!isFullyInitted_) {
346  CHECK(!fileMgrBasePath_.empty());
347  boost::filesystem::path path(fileMgrBasePath_);
348  if (boost::filesystem::exists(path)) {
349  if (!boost::filesystem::is_directory(path)) {
350  LOG(FATAL) << "getStorageStats: Specified path '" << fileMgrBasePath_
351  << "' for table data is not a directory.";
352  }
353 
354  storage_stats.epoch = lastCheckpointedEpoch();
355  storage_stats.epoch_floor = epochFloor();
356  boost::filesystem::directory_iterator
357  endItr; // default construction yields past-the-end
358  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
359  ++fileIt) {
360  FileMetadata file_metadata = getMetadataForFile(fileIt);
361  if (file_metadata.is_data_file) {
362  if (is_metadata_file(file_metadata.file_size,
363  file_metadata.page_size,
366  storage_stats.metadata_file_count++;
367  storage_stats.total_metadata_file_size += file_metadata.file_size;
368  storage_stats.total_metadata_page_count += file_metadata.num_pages;
369  } else {
370  storage_stats.data_file_count++;
371  storage_stats.total_data_file_size += file_metadata.file_size;
372  storage_stats.total_data_page_count += file_metadata.num_pages;
373  }
374  }
375  }
376  }
377  } else {
378  storage_stats.epoch = lastCheckpointedEpoch();
379  storage_stats.epoch_floor = epochFloor();
380  storage_stats.total_free_metadata_page_count = 0;
381  storage_stats.total_free_data_page_count = 0;
382 
383  // We already initialized this table so take the faster path of walking through the
384  // FileInfo objects and getting metadata from there
385  for (const auto& file_info_entry : files_) {
386  const auto file_info = file_info_entry.second.get();
387  if (is_metadata_file(file_info->size(),
388  file_info->pageSize,
391  storage_stats.metadata_file_count++;
392  storage_stats.total_metadata_file_size +=
393  file_info->pageSize * file_info->numPages;
394  storage_stats.total_metadata_page_count += file_info->numPages;
395  storage_stats.total_free_metadata_page_count.value() +=
396  file_info->freePages.size();
397  } else {
398  storage_stats.data_file_count++;
399  storage_stats.total_data_file_size += file_info->pageSize * file_info->numPages;
400  storage_stats.total_data_page_count += file_info->numPages;
401  storage_stats.total_free_data_page_count.value() += file_info->freePages.size();
402  }
403  }
404  }
405 }
406 
407 uint32_t FileMgr::getFragmentCount() const {
409  std::set<int32_t> fragment_ids;
410  for (const auto& [chunk_key, file_buffer] : chunkIndex_) {
411  fragment_ids.emplace(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
412  }
413  return static_cast<uint32_t>(fragment_ids.size());
414 }
415 
417  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
418  std::vector<HeaderInfo>& headerVec) {
419  for (auto& file_future : file_futures) {
420  file_future.wait();
421  }
422  // concatenate the vectors after thread completes
423  for (auto& file_future : file_futures) {
424  auto tempHeaderVec = file_future.get();
425  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
426  }
427  file_futures.clear();
428 }
429 
430 void FileMgr::init(const std::string& dataPathToConvertFrom,
431  const int32_t epochOverride) {
432  int32_t converted_data_epoch = 0;
433  boost::filesystem::path path(dataPathToConvertFrom);
434  if (boost::filesystem::exists(path)) {
435  if (!boost::filesystem::is_directory(path)) {
436  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
437  }
439 
440  if (epochOverride != -1) { // if opening at previous epoch
441  setEpoch(epochOverride);
442  }
443 
444  boost::filesystem::directory_iterator
445  endItr; // default construction yields past-the-end
446  int32_t maxFileId = -1;
447  int32_t fileCount = 0;
448  int32_t threadCount = std::thread::hardware_concurrency();
449  std::vector<HeaderInfo> headerVec;
450  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
451  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
452  FileMetadata fileMetadata = getMetadataForFile(fileIt);
453  if (fileMetadata.is_data_file) {
454  maxFileId = std::max(maxFileId, fileMetadata.file_id);
455  file_futures.emplace_back(std::async(std::launch::async, [fileMetadata, this] {
456  std::vector<HeaderInfo> tempHeaderVec;
457  openExistingFile(fileMetadata.file_path,
458  fileMetadata.file_id,
459  fileMetadata.page_size,
460  fileMetadata.num_pages,
461  tempHeaderVec);
462  return tempHeaderVec;
463  }));
464  fileCount++;
465  if (fileCount % threadCount) {
466  processFileFutures(file_futures, headerVec);
467  }
468  }
469  }
470 
471  if (file_futures.size() > 0) {
472  processFileFutures(file_futures, headerVec);
473  }
474 
475  /* Sort headerVec so that all HeaderInfos
476  * from a chunk will be grouped together
477  * and in order of increasing PageId
478  * - Version Epoch */
479 
480  std::sort(headerVec.begin(), headerVec.end());
481 
482  /* Goal of next section is to find sequences in the
483  * sorted headerVec of the same ChunkId, which we
484  * can then initiate a FileBuffer with */
485 
486  if (headerVec.size() > 0) {
487  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
488  auto startIt = headerVec.begin();
489 
490  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
491  ++headerIt) {
492  if (headerIt->chunkKey != lastChunkKey) {
493  FileMgr* c_fm_ =
494  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
495  CHECK(c_fm_);
496  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerIt);
497  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
498  destBuf->syncEncoder(srcBuf);
499  destBuf->setSize(srcBuf->size());
500  destBuf->setDirty(); // this needs to be set to force writing out metadata
501  // files from "checkpoint()" call
502 
503  size_t totalNumPages = srcBuf->getMultiPage().size();
504  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
505  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
506  Page destPage = c_fm_->requestFreePage(
507  srcBuf->pageSize(),
508  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
509  MultiPage multiPage(srcBuf->pageSize());
510  multiPage.push(destPage, converted_data_epoch);
511  destBuf->multiPages_.push_back(multiPage);
512  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
513  copyPage(
514  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
515  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
516  }
517  lastChunkKey = headerIt->chunkKey;
518  startIt = headerIt;
519  }
520  }
521 
522  // now need to insert last Chunk
523  FileMgr* c_fm_ =
524  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
525  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerVec.end());
526  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
527  destBuf->syncEncoder(srcBuf);
528  destBuf->setSize(srcBuf->size());
529  destBuf->setDirty(); // this needs to be set to write out metadata file from the
530  // "checkpoint()" call
531 
532  size_t totalNumPages = srcBuf->getMultiPage().size();
533  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
534  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
535  Page destPage = c_fm_->requestFreePage(
536  srcBuf->pageSize(),
537  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
538  MultiPage multiPage(srcBuf->pageSize());
539  multiPage.push(destPage, converted_data_epoch);
540  destBuf->multiPages_.push_back(multiPage);
541  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
542  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
543  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
544  }
545  }
546  nextFileId_ = maxFileId + 1;
547  } else {
548  readOnlyCheck("create file", path.string());
549  if (!boost::filesystem::create_directory(path)) {
550  LOG(FATAL) << "Specified path does not exist: " << path;
551  }
552  }
553  isFullyInitted_ = true;
554 }
555 
557  for (const auto& [idx, file_info] : files_) {
558  if (file_info->f) {
559  close(file_info->f);
560  file_info->f = nullptr;
561  }
562  }
563 
564  if (DBMetaFile_) {
566  DBMetaFile_ = nullptr;
567  }
568 
569  if (epochFile_) {
570  close(epochFile_);
571  epochFile_ = nullptr;
572  }
573 }
574 
577  auto path = getFileMgrBasePath();
578  readOnlyCheck("rename file", path);
579 
581  /* rename for later deletion the directory containing table related data */
583 }
584 
585 // TODO(Misiu): This function is almost identical to FileInfo::copyPage. Deduplicate.
586 void FileMgr::copyPage(Page& srcPage,
587  FileMgr* destFileMgr,
588  Page& destPage,
589  const size_t reservedHeaderSize,
590  const size_t numBytes,
591  const size_t offset) {
592  CHECK(offset + numBytes <= page_size_);
593  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
594  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
595  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
596  ScopeGuard guard = [&buffer] { ::free(buffer); };
597 
598  size_t bytesRead = srcFileInfo->read(
599  srcPage.pageNum * page_size_ + offset + reservedHeaderSize, numBytes, buffer);
600  CHECK(bytesRead == numBytes);
601  size_t bytesWritten = destFileInfo->write(
602  destPage.pageNum * page_size_ + offset + reservedHeaderSize, numBytes, buffer);
603  CHECK(bytesWritten == numBytes);
604 }
605 
606 void FileMgr::createEpochFile(const std::string& epochFileName) {
607  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
608  readOnlyCheck("create file", epochFilePath);
609  if (boost::filesystem::exists(epochFilePath)) {
610  LOG(FATAL) << "Epoch file '" << epochFilePath << "' already exists";
611  }
612  epochFile_ = create(epochFilePath, sizeof(Epoch::byte_size()));
613  // Write out current epoch to file - which if this
614  // function is being called should be 0
616 }
617 
618 int32_t FileMgr::openAndReadLegacyEpochFile(const std::string& epochFileName) {
619  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
620  if (!boost::filesystem::exists(epochFilePath)) {
621  return 0;
622  }
623 
624  if (!boost::filesystem::is_regular_file(epochFilePath)) {
625  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
626  }
627  if (boost::filesystem::file_size(epochFilePath) < 4) {
628  LOG(FATAL) << "Epoch file `" << epochFilePath
629  << "` is not sized properly (current size: "
630  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
631  }
632  FILE* legacyEpochFile = open(epochFilePath);
633  int32_t epoch;
634  read(legacyEpochFile, 0, sizeof(int32_t), (int8_t*)&epoch, epochFilePath);
635  close(legacyEpochFile);
636  return epoch;
637 }
638 
639 void FileMgr::openAndReadEpochFile(const std::string& epochFileName) {
640  if (!epochFile_) { // Check to see if already open
641  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
642  if (!boost::filesystem::exists(epochFilePath)) {
643  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
644  }
645  if (!boost::filesystem::is_regular_file(epochFilePath)) {
646  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
647  }
648  if (boost::filesystem::file_size(epochFilePath) != Epoch::byte_size()) {
649  LOG(FATAL) << "Epoch file `" << epochFilePath
650  << "` is not sized properly (current size: "
651  << boost::filesystem::file_size(epochFilePath)
652  << ", expected size: " << Epoch::byte_size() << ")";
653  }
654  epochFile_ = open(epochFilePath);
655  }
656  read(epochFile_, 0, Epoch::byte_size(), epoch_.storage_ptr(), epochFileName);
657 }
658 
660  CHECK(epochFile_);
662  int32_t status = fflush(epochFile_);
663  CHECK(status == 0) << "Could not flush epoch file to disk";
664 #ifdef __APPLE__
665  status = fcntl(fileno(epochFile_), 51);
666 #else
667  status = heavyai::fsync(fileno(epochFile_));
668 #endif
669  CHECK(status == 0) << "Could not sync epoch file to disk";
670  epochIsCheckpointed_ = true;
671 }
672 
673 void FileMgr::freePagesBeforeEpoch(const int32_t min_epoch) {
675  freePagesBeforeEpochUnlocked(min_epoch, chunkIndex_.begin(), chunkIndex_.end());
676 }
677 
679  const int32_t min_epoch,
680  const ChunkKeyToChunkMap::iterator lower_bound,
681  const ChunkKeyToChunkMap::iterator upper_bound) {
682  for (auto chunkIt = lower_bound; chunkIt != upper_bound; ++chunkIt) {
683  chunkIt->second->freePagesBeforeEpoch(min_epoch);
684  }
685 }
686 
687 void FileMgr::rollOffOldData(const int32_t epoch_ceiling, const bool should_checkpoint) {
688  if (maxRollbackEpochs_ >= 0) {
689  auto min_epoch = std::max(epoch_ceiling - maxRollbackEpochs_, epoch_.floor());
690  if (min_epoch > epoch_.floor()) {
691  freePagesBeforeEpoch(min_epoch);
692  epoch_.floor(min_epoch);
693  if (should_checkpoint) {
694  checkpoint();
695  }
696  }
697  }
698 }
699 
700 std::string FileMgr::describeSelf() const {
701  stringstream ss;
702  ss << "table (" << fileMgrKey_.first << ", " << fileMgrKey_.second << ")";
703  return ss.str();
704 }
705 
707  VLOG(2) << "Checkpointing " << describeSelf() << " epoch: " << epoch();
709  rollOffOldData(epoch(), false /* shouldCheckpoint */);
710  syncFilesToDisk();
712  incrementEpoch();
713  freePages();
714 }
715 
717  const size_t page_size,
718  const size_t num_bytes) {
720  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
721  << "Chunk already exists: " + show_chunk(key);
722  return createBufferUnlocked(key, page_size, num_bytes);
723 }
724 
725 // Assumes checks for pre-existing key have already occured.
727  const size_t page_size,
728  const size_t num_bytes) {
729  size_t actual_page_size = page_size;
730  if (actual_page_size == 0) {
731  actual_page_size = page_size_;
732  }
733  chunkIndex_[key] = allocateBuffer(actual_page_size, key, num_bytes);
734  return (chunkIndex_[key]);
735 }
736 
738  const ChunkKey& key,
739  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
740  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
742  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
743  << "Chunk already exists for key: " << show_chunk(key);
744  chunkIndex_[key] = allocateBuffer(key, headerStartIt, headerEndIt);
745  return (chunkIndex_[key]);
746 }
747 
750  return chunkIndex_.find(key) != chunkIndex_.end();
751 }
752 
753 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
755  auto chunk_it = chunkIndex_.find(key);
756  CHECK(chunk_it != chunkIndex_.end()) << "Chunk does not exist: " << show_chunk(key);
757  deleteBufferUnlocked(chunk_it, purge);
758 }
759 
760 ChunkKeyToChunkMap::iterator FileMgr::deleteBufferUnlocked(
761  const ChunkKeyToChunkMap::iterator chunk_it,
762  const bool purge) {
763  if (purge) {
764  chunk_it->second->freePages();
765  }
766  delete chunk_it->second;
767  return chunkIndex_.erase(chunk_it);
768 }
769 
770 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
772  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
773  if (chunkIt == chunkIndex_.end()) {
774  return; // should we throw?
775  }
776  while (chunkIt != chunkIndex_.end() &&
777  std::search(chunkIt->first.begin(),
778  chunkIt->first.begin() + keyPrefix.size(),
779  keyPrefix.begin(),
780  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
781  deleteBufferUnlocked(chunkIt++, purge);
782  }
783 }
784 
785 FileBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
787  return getBufferUnlocked(key, num_bytes);
788 }
789 
791  const size_t num_bytes) const {
792  auto chunk_it = chunkIndex_.find(key);
793  CHECK(chunk_it != chunkIndex_.end()) << "Chunk does not exist: " << show_chunk(key);
794  return chunk_it->second;
795 }
796 
798  AbstractBuffer* destBuffer,
799  const size_t numBytes) {
800  // reads chunk specified by ChunkKey into AbstractBuffer provided by
801  // destBuffer
802  CHECK(!destBuffer->isDirty())
803  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
804  << show_chunk(key);
805  AbstractBuffer* chunk = getBuffer(key);
806  // chunk's size is either specified in function call with numBytes or we
807  // just look at pageSize * numPages in FileBuffer
808  if (numBytes > 0 && numBytes > chunk->size()) {
809  LOG(FATAL) << "Chunk retrieved for key `" << show_chunk(key) << "` is smaller ("
810  << chunk->size() << ") than number of bytes requested (" << numBytes
811  << ")";
812  }
813  chunk->copyTo(destBuffer, numBytes);
814 }
815 
817  AbstractBuffer* srcBuffer,
818  const size_t numBytes) {
819  auto chunk = getOrCreateBuffer(key);
820  size_t oldChunkSize = chunk->size();
821  // write the buffer's data to the Chunk
822  size_t newChunkSize = (numBytes == 0) ? srcBuffer->size() : numBytes;
823  if (chunk->isDirty()) {
824  // multiple appends are allowed,
825  // but only single update is allowed
826  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
827  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
828  "for key: "
829  << show_chunk(key);
830  }
831  }
832  CHECK(srcBuffer->isDirty()) << "putBuffer expects a dirty buffer";
833  if (srcBuffer->isUpdated()) {
834  // chunk size is not changed when fixed rows are updated or are marked as deleted.
835  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
836  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
837  // For varlen update, it takes another route via fragmenter using disk-level buffer.
838  if (0 == numBytes && !chunk->isDirty()) {
839  chunk->setSize(newChunkSize);
840  }
841  //@todo use dirty flags to only flush pages of chunk that need to
842  // be flushed
843  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
844  newChunkSize,
845  0,
846  srcBuffer->getType(),
847  srcBuffer->getDeviceId());
848  } else if (srcBuffer->isAppended()) {
849  CHECK_LT(oldChunkSize, newChunkSize);
850  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
851  newChunkSize - oldChunkSize,
852  srcBuffer->getType(),
853  srcBuffer->getDeviceId());
854  } else {
855  // If dirty buffer comes in unmarked, it must be empty.
856  // Encoder sync is still required to flush the metadata.
857  CHECK(numBytes == 0)
858  << "Dirty buffer with size > 0 must be marked as isAppended() or isUpdated()";
859  }
860  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
861  //@todo commenting out line above will make sure this metadata is set
862  // but will trigger error on fetch chunk
863  srcBuffer->clearDirtyBits();
864  chunk->syncEncoder(srcBuffer);
865  return chunk;
866 }
867 
868 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
869  LOG(FATAL) << "Operation not supported";
870  return nullptr; // satisfy return-type warning
871 }
872 
874  LOG(FATAL) << "Operation not supported";
875 }
876 
877 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
878  std::lock_guard<std::mutex> lock(getPageMutex_);
879 
880  auto candidateFiles = fileIndex_.equal_range(pageSize);
881  int32_t pageNum = -1;
882  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
883  FileInfo* fileInfo = getFileInfoForFileId(fileIt->second);
884  pageNum = fileInfo->getFreePage();
885  if (pageNum != -1) {
886  return (Page(fileInfo->fileId, pageNum));
887  }
888  }
889  // if here then we need to add a file
890  FileInfo* fileInfo;
891  if (isMetadata) {
892  fileInfo = createFileInfo(pageSize, num_pages_per_metadata_file_);
893  } else {
894  fileInfo = createFileInfo(pageSize, num_pages_per_data_file_);
895  }
896  pageNum = fileInfo->getFreePage();
897  CHECK(pageNum != -1);
898  return (Page(fileInfo->fileId, pageNum));
899 }
900 
901 void FileMgr::requestFreePages(size_t numPagesRequested,
902  size_t pageSize,
903  std::vector<Page>& pages,
904  const bool isMetadata) {
905  // not used currently
906  // @todo add method to FileInfo to get more than one page
907  std::lock_guard<std::mutex> lock(getPageMutex_);
908  auto candidateFiles = fileIndex_.equal_range(pageSize);
909  size_t numPagesNeeded = numPagesRequested;
910  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
911  FileInfo* fileInfo = getFileInfoForFileId(fileIt->second);
912  int32_t pageNum;
913  do {
914  pageNum = fileInfo->getFreePage();
915  if (pageNum != -1) {
916  pages.emplace_back(fileInfo->fileId, pageNum);
917  numPagesNeeded--;
918  }
919  } while (pageNum != -1 && numPagesNeeded > 0);
920  if (numPagesNeeded == 0) {
921  break;
922  }
923  }
924  while (numPagesNeeded > 0) {
925  FileInfo* fileInfo;
926  if (isMetadata) {
927  fileInfo = createFileInfo(pageSize, num_pages_per_metadata_file_);
928  } else {
929  fileInfo = createFileInfo(pageSize, num_pages_per_data_file_);
930  }
931  int32_t pageNum;
932  do {
933  pageNum = fileInfo->getFreePage();
934  if (pageNum != -1) {
935  pages.emplace_back(fileInfo->fileId, pageNum);
936  numPagesNeeded--;
937  }
938  } while (pageNum != -1 && numPagesNeeded > 0);
939  if (numPagesNeeded == 0) {
940  break;
941  }
942  }
943  CHECK(pages.size() == numPagesRequested);
944 }
945 
946 FileInfo* FileMgr::openExistingFile(const std::string& path,
947  const int fileId,
948  const size_t pageSize,
949  const size_t numPages,
950  std::vector<HeaderInfo>& headerVec) {
951  FILE* f = open(path);
952  auto file_info = std::make_unique<FileInfo>(this,
953  fileId,
954  f,
955  pageSize,
956  numPages,
957  path,
958  false); // false means don't init file
959 
960  file_info->openExistingFile(headerVec);
962  CHECK(files_.find(fileId) == files_.end()) << "Attempting to re-open file";
963  files_.emplace(fileId, std::move(file_info));
964  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
965  return getFileInfoForFileId(fileId);
966 }
967 
968 FileInfo* FileMgr::createFileInfo(const size_t pageSize, const size_t numPages) {
969  readOnlyCheck("create file",
971  // check arguments
972  if (pageSize == 0 || numPages == 0) {
973  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
974  }
975 
976  // create the new file
977  auto [f, file_path] = create(fileMgrBasePath_,
978  nextFileId_,
979  pageSize,
980  numPages); // TM: not sure if I like naming scheme here -
981  // should be in separate namespace?
982  CHECK(f);
983 
984  // instantiate a new FileInfo for the newly created file
985  int32_t fileId = nextFileId_++;
986  auto fInfo = std::make_unique<FileInfo>(this,
987  fileId,
988  f,
989  pageSize,
990  numPages,
991  file_path,
992  true); // true means init file
993  CHECK(fInfo);
994 
996  // update file manager data structures
997  files_[fileId] = std::move(fInfo);
998  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
999 
1000  return getFileInfoForFileId(fileId);
1001 }
1002 
1003 FILE* FileMgr::getFileForFileId(const int32_t fileId) {
1004  CHECK(fileId >= 0);
1005  CHECK(files_.find(fileId) != files_.end()) << "File does not exist for id: " << fileId;
1006  return files_.at(fileId)->f;
1007 }
1008 
1011  auto chunk_it = chunkIndex_.lower_bound(key_prefix);
1012  if (chunk_it == chunkIndex_.end()) {
1013  return false;
1014  } else {
1015  auto it_pair =
1016  std::mismatch(key_prefix.begin(), key_prefix.end(), chunk_it->first.begin());
1017  return it_pair.first == key_prefix.end();
1018  }
1019 }
1020 
1022  const ChunkKey& keyPrefix) {
1024  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
1025  if (chunkIt == chunkIndex_.end()) {
1026  return; // throw?
1027  }
1028  while (chunkIt != chunkIndex_.end() &&
1029  std::search(chunkIt->first.begin(),
1030  chunkIt->first.begin() + keyPrefix.size(),
1031  keyPrefix.begin(),
1032  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
1033  if (chunkIt->second->hasEncoder()) {
1034  auto chunk_metadata = std::make_shared<ChunkMetadata>();
1035  chunkIt->second->encoder_->getMetadata(chunk_metadata);
1036  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
1037  }
1038  chunkIt++;
1039  }
1040 }
1041 
1044  const auto& chunkIt = chunkIndex_.find(chunkKey);
1045  if (chunkIt != chunkIndex_.end()) {
1046  return chunkIt->second->numMetadataPages();
1047  } else {
1048  throw std::runtime_error("Chunk was not found.");
1049  }
1050 }
1051 
1053  return gfm_->getDBConvert();
1054 }
1055 
1057  auto file_version = readVersionFromDisk(DB_META_FILENAME);
1058  auto gfm_version = gfm_->db_version_;
1059 
1060  if (file_version > gfm_version) {
1061  LOG(FATAL) << "DB forward compatibility is not supported. Version of HeavyDB "
1062  "software used is older than the version of DB being read: "
1063  << file_version;
1064  }
1065  if (file_version == INVALID_VERSION || file_version < gfm_version) {
1066  // new system, or we are moving forward versions
1067  // system wide migration would go here if required
1069  return;
1070  }
1071 }
1072 
1073 int32_t FileMgr::readVersionFromDisk(const std::string& versionFileName) const {
1074  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1075  if (!boost::filesystem::exists(versionFilePath) ||
1076  !boost::filesystem::is_regular_file(versionFilePath) ||
1077  boost::filesystem::file_size(versionFilePath) < 4) {
1078  return INVALID_VERSION;
1079  }
1080  FILE* versionFile = open(versionFilePath);
1081  int32_t version;
1082  read(versionFile, 0, sizeof(int32_t), (int8_t*)&version, versionFilePath);
1083  close(versionFile);
1084  return version;
1085 }
1086 
1087 void FileMgr::writeAndSyncVersionToDisk(const std::string& versionFileName,
1088  const int32_t version) {
1089  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1090  readOnlyCheck("write file", versionFilePath);
1091  FILE* versionFile;
1092  if (boost::filesystem::exists(versionFilePath)) {
1093  int32_t oldVersion = readVersionFromDisk(versionFileName);
1094  LOG(INFO) << "Storage version file `" << versionFilePath
1095  << "` already exists, its current version is " << oldVersion;
1096  versionFile = open(versionFilePath);
1097  } else {
1098  versionFile = create(versionFilePath, sizeof(int32_t));
1099  }
1100  write(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1101  int32_t status = fflush(versionFile);
1102  if (status != 0) {
1103  LOG(FATAL) << "Could not flush version file " << versionFilePath << " to disk";
1104  }
1105 #ifdef __APPLE__
1106  status = fcntl(fileno(epochFile_), 51);
1107 #else
1108  status = heavyai::fsync(fileno(versionFile));
1109 #endif
1110  if (status != 0) {
1111  LOG(FATAL) << "Could not sync version file " << versionFilePath << " to disk";
1112  }
1113  close(versionFile);
1114 }
1115 
1117  const std::string versionFilePath(fileMgrBasePath_ + "/" + FILE_MGR_VERSION_FILENAME);
1118  LOG(INFO) << "Migrating file format version from 0 to 1 for `" << versionFilePath;
1119  readOnlyCheck("migrate epoch file", versionFilePath);
1124  int32_t migrationCompleteVersion = 1;
1125  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migrationCompleteVersion);
1126 }
1127 
1129  LOG(INFO) << "Migrating file format version from 1 to 2";
1130  readOnlyCheck("migrate file format version");
1132  constexpr int32_t migration_complete_version{2};
1133  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migration_complete_version);
1134 }
1135 
1136 void FileMgr::renameAndSymlinkLegacyFiles(const std::string& table_data_dir) {
1137  std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
1138  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
1139  it++) {
1140  const auto old_path = boost::filesystem::canonical(it->path());
1141  if (boost::filesystem::is_regular_file(it->status()) &&
1142  old_path.extension().string() == kLegacyDataFileExtension) {
1143  auto new_path = old_path;
1144  new_path.replace_extension(DATA_FILE_EXT);
1145  old_to_new_paths[old_path] = new_path;
1146  }
1147  }
1148  for (const auto& [old_path, new_path] : old_to_new_paths) {
1149  boost::filesystem::rename(old_path, new_path);
1150  LOG(INFO) << "Rebrand migration: Renamed " << old_path << " to " << new_path;
1151  boost::filesystem::create_symlink(new_path.filename(), old_path);
1152  LOG(INFO) << "Rebrand migration: Added symlink from " << old_path << " to "
1153  << new_path.filename();
1154  }
1155 }
1156 
1160  fileMgrVersion_ = 0;
1163  LOG(FATAL)
1164  << "Table storage forward compatibility is not supported. Version of HeavyDB "
1165  "software used is older than the version of table being read: "
1166  << fileMgrVersion_;
1167  }
1168 
1170  switch (fileMgrVersion_) {
1171  case 0: {
1173  break;
1174  }
1175  case 1: {
1177  break;
1178  }
1179  default: {
1180  UNREACHABLE();
1181  }
1182  }
1183  fileMgrVersion_++;
1184  }
1185 }
1186 
1187 /*
1188  * @brief sets the epoch to a user-specified value
1189  *
1190  * With the introduction of optional capped history on files, the possibility of
1191  * multiple successive rollbacks to earlier epochs means that we cannot rely on
1192  * the maxRollbackEpochs_ variable alone (initialized from a value stored in Catalog) to
1193  * guarantee that we can set an epoch at any given value. This function checks the
1194  * user-specified epoch value to ensure it is both not higher than the last checkpointed
1195  * epoch AND it is >= the epoch floor, which on an uncapped value will be the epoch the
1196  * table was created at (default 0), and for capped tables, the lowest epoch for
1197  * which we have complete data, now materialized in the epoch metadata itself. */
1198 
1199 void FileMgr::setEpoch(const int32_t newEpoch) {
1200  if (newEpoch < epoch_.floor()) {
1201  std::stringstream error_message;
1202  error_message << "Cannot set epoch for " << describeSelf()
1203  << " lower than the minimum rollback epoch (" << epoch_.floor() << ").";
1204  throw std::runtime_error(error_message.str());
1205  }
1206  epoch_.ceiling(newEpoch);
1208 }
1209 
1210 void FileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
1211  std::unique_lock<heavyai::shared_mutex> lock(mutex_free_page_);
1212  free_pages_.push_back(page);
1213 }
1214 
1215 void FileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t table_id) {
1216  UNREACHABLE();
1217 }
1218 
1224 void FileMgr::resumeFileCompaction(const std::string& status_file_name) {
1225  readOnlyCheck("resume file compaction", status_file_name);
1226 
1227  if (status_file_name == COPY_PAGES_STATUS) {
1228  // Delete status file and restart data compaction process
1229  auto file_path = getFilePath(status_file_name);
1230  CHECK(boost::filesystem::exists(file_path));
1231  boost::filesystem::remove(file_path);
1232  compactFiles();
1233  } else if (status_file_name == UPDATE_PAGE_VISIBILITY_STATUS) {
1234  // Execute second and third phases of data compaction
1236  auto page_mappings = readPageMappingsFromStatusFile();
1237  updateMappedPagesVisibility(page_mappings);
1239  deleteEmptyFiles();
1240  } else if (status_file_name == DELETE_EMPTY_FILES_STATUS) {
1241  // Execute last phase of data compaction
1243  deleteEmptyFiles();
1244  } else {
1245  UNREACHABLE() << "Unexpected status file name: " << status_file_name;
1246  }
1247 }
1248 
1278 
1279  readOnlyCheck("run file compaction");
1280 
1281  if (files_.empty()) {
1282  return;
1283  }
1284 
1285  auto copy_pages_status_file_path = getFilePath(COPY_PAGES_STATUS);
1286  CHECK(!boost::filesystem::exists(copy_pages_status_file_path));
1287  std::ofstream status_file(copy_pages_status_file_path.string(),
1288  std::ios::out | std::ios::binary);
1289  status_file.close();
1290 
1291  std::vector<PageMapping> page_mappings;
1292  std::set<Page> touched_pages;
1293  std::set<size_t> page_sizes;
1294  for (const auto& [file_id, file_info] : files_) {
1295  page_sizes.emplace(file_info->pageSize);
1296  }
1297  for (auto page_size : page_sizes) {
1298  sortAndCopyFilePagesForCompaction(page_size, page_mappings, touched_pages);
1299  }
1300 
1301  writePageMappingsToStatusFile(page_mappings);
1303 
1304  updateMappedPagesVisibility(page_mappings);
1306 
1307  deleteEmptyFiles();
1308 }
1309 
1317  std::vector<PageMapping>& page_mappings,
1318  std::set<Page>& touched_pages) {
1319  std::vector<FileInfo*> sorted_file_infos;
1320  auto range = fileIndex_.equal_range(page_size);
1321  for (auto it = range.first; it != range.second; it++) {
1322  sorted_file_infos.emplace_back(files_.at(it->second).get());
1323  }
1324  if (sorted_file_infos.empty()) {
1325  return;
1326  }
1327 
1328  // Sort file infos in ascending order of free pages count i.e. from files with
1329  // the least number of free pages to those with the highest number of free pages.
1330  std::sort(sorted_file_infos.begin(),
1331  sorted_file_infos.end(),
1332  [](const FileInfo* file_1, const FileInfo* file_2) {
1333  return file_1->freePages.size() < file_2->freePages.size();
1334  });
1335 
1336  size_t destination_index = 0, source_index = sorted_file_infos.size() - 1;
1337 
1338  // For page copy destinations, skip files without free pages.
1339  while (destination_index < source_index &&
1340  sorted_file_infos[destination_index]->freePages.empty()) {
1341  destination_index++;
1342  }
1343 
1344  // For page copy sources, skip files with only free pages.
1345  while (destination_index < source_index &&
1346  sorted_file_infos[source_index]->freePages.size() ==
1347  sorted_file_infos[source_index]->numPages) {
1348  source_index--;
1349  }
1350 
1351  std::set<size_t> source_used_pages;
1352  CHECK(destination_index <= source_index);
1353 
1354  // Get the total number of free pages available for compaction
1355  int64_t total_free_pages{0};
1356  for (size_t i = destination_index; i <= source_index; i++) {
1357  total_free_pages += sorted_file_infos[i]->numFreePages();
1358  }
1359 
1360  while (destination_index < source_index) {
1361  if (source_used_pages.empty()) {
1362  // Populate source_used_pages with only used pages in the source file.
1363  auto source_file_info = sorted_file_infos[source_index];
1364  auto& free_pages = source_file_info->freePages;
1365  for (size_t page_num = 0; page_num < source_file_info->numPages; page_num++) {
1366  if (free_pages.find(page_num) == free_pages.end()) {
1367  source_used_pages.emplace(page_num);
1368  }
1369  }
1370 
1371  // Free pages of current source file will not be copy destinations
1372  total_free_pages -= source_file_info->numFreePages();
1373  }
1374 
1375  // Exit early if there are not enough free pages to empty the next file
1376  if (total_free_pages - static_cast<int64_t>(source_used_pages.size()) < 0) {
1377  return;
1378  }
1379 
1380  // Copy pages from source files to destination files
1381  auto dest_file_info = sorted_file_infos[destination_index];
1382  while (!source_used_pages.empty() && !dest_file_info->freePages.empty()) {
1383  // Get next page to copy
1384  size_t source_page_num = *source_used_pages.begin();
1385  source_used_pages.erase(source_page_num);
1386 
1387  Page source_page{sorted_file_infos[source_index]->fileId, source_page_num};
1388  copySourcePageForCompaction(source_page,
1389  sorted_file_infos[destination_index],
1390  page_mappings,
1391  touched_pages);
1392  total_free_pages--;
1393  }
1394 
1395  if (source_used_pages.empty()) {
1396  source_index--;
1397  }
1398 
1399  if (dest_file_info->freePages.empty()) {
1400  destination_index++;
1401  }
1402  }
1403 }
1404 
1412  FileInfo* destination_file_info,
1413  std::vector<PageMapping>& page_mappings,
1414  std::set<Page>& touched_pages) {
1415  size_t destination_page_num = destination_file_info->getFreePage();
1416  CHECK_NE(destination_page_num, static_cast<size_t>(-1));
1417  Page destination_page{destination_file_info->fileId, destination_page_num};
1418 
1419  // Assert that the same pages are not copied or overridden multiple times
1420  CHECK(touched_pages.find(source_page) == touched_pages.end());
1421  touched_pages.emplace(source_page);
1422 
1423  CHECK(touched_pages.find(destination_page) == touched_pages.end());
1424  touched_pages.emplace(destination_page);
1425 
1426  auto header_size = copyPageWithoutHeaderSize(source_page, destination_page);
1427  page_mappings.emplace_back(static_cast<size_t>(source_page.fileId),
1428  source_page.pageNum,
1429  header_size,
1430  static_cast<size_t>(destination_page.fileId),
1431  destination_page.pageNum);
1432 }
1433 
1441 int32_t FileMgr::copyPageWithoutHeaderSize(const Page& source_page,
1442  const Page& destination_page) {
1443  FileInfo* source_file_info = getFileInfoForFileId(source_page.fileId);
1444  CHECK(source_file_info);
1445  CHECK_EQ(source_file_info->fileId, source_page.fileId);
1446 
1447  FileInfo* destination_file_info = getFileInfoForFileId(destination_page.fileId);
1448  CHECK(destination_file_info);
1449  CHECK_EQ(destination_file_info->fileId, destination_page.fileId);
1450  CHECK_EQ(source_file_info->pageSize, destination_file_info->pageSize);
1451 
1452  auto page_size = source_file_info->pageSize;
1453  auto buffer = std::make_unique<int8_t[]>(page_size);
1454  size_t bytes_read =
1455  source_file_info->read(source_page.pageNum * page_size, page_size, buffer.get());
1456  CHECK_EQ(page_size, bytes_read);
1457 
1458  auto header_size_offset = sizeof(int32_t);
1459  size_t bytes_written = destination_file_info->write(
1460  (destination_page.pageNum * page_size) + header_size_offset,
1461  page_size - header_size_offset,
1462  buffer.get() + header_size_offset);
1463  CHECK_EQ(page_size - header_size_offset, bytes_written);
1464  return reinterpret_cast<int32_t*>(buffer.get())[0];
1465 }
1466 
1471 void FileMgr::updateMappedPagesVisibility(const std::vector<PageMapping>& page_mappings) {
1472  for (const auto& page_mapping : page_mappings) {
1473  auto destination_file = getFileInfoForFileId(page_mapping.destination_file_id);
1474 
1475  // Set destination page header size
1476  auto header_size = page_mapping.source_page_header_size;
1477  CHECK_GT(header_size, 0);
1478  destination_file->write(
1479  page_mapping.destination_page_num * destination_file->pageSize,
1480  sizeof(PageHeaderSizeType),
1481  reinterpret_cast<int8_t*>(&header_size));
1482  auto source_file = getFileInfoForFileId(page_mapping.source_file_id);
1483 
1484  // Free source page
1485  PageHeaderSizeType free_page_header_size{0};
1486  source_file->write(page_mapping.source_page_num * source_file->pageSize,
1487  sizeof(PageHeaderSizeType),
1488  reinterpret_cast<int8_t*>(&free_page_header_size));
1489  source_file->freePageDeferred(page_mapping.source_page_num);
1490  }
1491 
1492  for (const auto& file_info_entry : files_) {
1493  int32_t status = file_info_entry.second->syncToDisk();
1494  if (status != 0) {
1495  LOG(FATAL) << "Could not sync file to disk";
1496  }
1497  }
1498 }
1499 
1505  for (const auto& [file_id, file_info] : files_) {
1506  CHECK_EQ(file_id, file_info->fileId);
1507  if (file_info->freePages.size() == file_info->numPages) {
1508  fclose(file_info->f);
1509  file_info->f = nullptr;
1510  auto file_path = get_data_file_path(fileMgrBasePath_, file_id, file_info->pageSize);
1511  readOnlyCheck("delete file", file_path);
1512  boost::filesystem::remove(get_legacy_data_file_path(file_path));
1513  boost::filesystem::remove(file_path);
1514  }
1515  }
1516 
1517  auto status_file_path = getFilePath(DELETE_EMPTY_FILES_STATUS);
1518  CHECK(boost::filesystem::exists(status_file_path));
1519  readOnlyCheck("delete file", status_file_path.string());
1520  boost::filesystem::remove(status_file_path);
1521 }
1522 
1529  const std::vector<PageMapping>& page_mappings) {
1530  auto file_path = getFilePath(COPY_PAGES_STATUS);
1531 
1532  readOnlyCheck("write file", file_path.string());
1533 
1534  CHECK(boost::filesystem::exists(file_path));
1535  CHECK(boost::filesystem::is_empty(file_path));
1536  std::ofstream status_file{file_path.string(), std::ios::out | std::ios::binary};
1537  int64_t page_mappings_count = page_mappings.size();
1538  status_file.write(reinterpret_cast<const char*>(&page_mappings_count), sizeof(int64_t));
1539  status_file.write(reinterpret_cast<const char*>(page_mappings.data()),
1540  page_mappings_count * sizeof(PageMapping));
1541  status_file.close();
1542 }
1543 
1547 std::vector<PageMapping> FileMgr::readPageMappingsFromStatusFile() {
1548  auto file_path = getFilePath(UPDATE_PAGE_VISIBILITY_STATUS);
1549  CHECK(boost::filesystem::exists(file_path));
1550  std::ifstream status_file{file_path.string(),
1551  std::ios::in | std::ios::binary | std::ios::ate};
1552  CHECK(status_file.is_open());
1553  size_t file_size = status_file.tellg();
1554  status_file.seekg(0, std::ios::beg);
1555  CHECK_GE(file_size, sizeof(int64_t));
1556 
1557  int64_t page_mappings_count;
1558  status_file.read(reinterpret_cast<char*>(&page_mappings_count), sizeof(int64_t));
1559  auto page_mappings_byte_size = file_size - sizeof(int64_t);
1560  CHECK_EQ(page_mappings_byte_size % sizeof(PageMapping), static_cast<size_t>(0));
1561  CHECK_EQ(static_cast<size_t>(page_mappings_count),
1562  page_mappings_byte_size / sizeof(PageMapping));
1563 
1564  std::vector<PageMapping> page_mappings(page_mappings_count);
1565  status_file.read(reinterpret_cast<char*>(page_mappings.data()),
1566  page_mappings_byte_size);
1567  status_file.close();
1568  return page_mappings;
1569 }
1570 
1574 void FileMgr::renameCompactionStatusFile(const char* const from_status,
1575  const char* const to_status) {
1576  auto from_status_file_path = getFilePath(from_status);
1577  auto to_status_file_path = getFilePath(to_status);
1578 
1579  readOnlyCheck("write file", from_status_file_path.string());
1580 
1581  CHECK(boost::filesystem::exists(from_status_file_path));
1582  CHECK(!boost::filesystem::exists(to_status_file_path));
1583  boost::filesystem::rename(from_status_file_path, to_status_file_path);
1584 }
1585 
1586 // Methods that enable override of number of pages per data/metadata files
1587 // for use in unit tests.
1588 void FileMgr::setNumPagesPerDataFile(size_t num_pages) {
1589  num_pages_per_data_file_ = num_pages;
1590 }
1591 
1592 void FileMgr::setNumPagesPerMetadataFile(size_t num_pages) {
1593  num_pages_per_metadata_file_ = num_pages;
1594 }
1595 
1598  for (const auto& file_info_entry : files_) {
1599  int32_t status = file_info_entry.second->syncToDisk();
1600  CHECK(status == 0) << "Could not sync file to disk";
1601  }
1602 }
1603 
1604 void FileMgr::initializeNumThreads(size_t num_reader_threads) {
1605  // # of threads is based on # of cores on the host
1606  size_t num_hardware_based_threads = std::thread::hardware_concurrency();
1607  if (num_reader_threads == 0 || num_reader_threads > num_hardware_based_threads) {
1608  // # of threads has not been defined by user
1609  num_reader_threads_ = num_hardware_based_threads;
1610  } else {
1611  num_reader_threads_ = num_reader_threads;
1612  }
1613 }
1614 
1617  for (auto& free_page : free_pages_) {
1618  free_page.first->freePageDeferred(free_page.second);
1619  }
1620  free_pages_.clear();
1621 }
1622 
1623 FileBuffer* FileMgr::allocateBuffer(const size_t page_size,
1624  const ChunkKey& key,
1625  const size_t num_bytes) {
1626  return new FileBuffer(this, page_size, key, num_bytes);
1627 }
1628 
1630  const ChunkKey& key,
1631  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
1632  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
1633  return new FileBuffer(this, key, headerStartIt, headerEndIt);
1634 }
1635 
1636 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
1638  ChunkKey& chunk_key,
1639  int32_t contingent,
1640  int32_t page_epoch,
1641  int32_t page_num) {
1642  // If the parent FileMgr has a fileMgrKey, then all keys are locked to one table and
1643  // can be set from the manager.
1644  auto [db_id, tb_id] = get_fileMgrKey();
1645  chunk_key[CHUNK_KEY_DB_IDX] = db_id;
1646  chunk_key[CHUNK_KEY_TABLE_IDX] = tb_id;
1647 
1648  auto table_epoch = epoch(db_id, tb_id);
1649 
1650  if (is_page_deleted_with_checkpoint(table_epoch, page_epoch, contingent)) {
1651  if (!g_read_only && !g_multi_instance) {
1652  // Read-only mode can find pages like this if the server was previously run in
1653  // write-mode but is not allowed to free them.
1654  file_info->freePageImmediate(page_num);
1655  }
1656  return true;
1657  }
1658 
1659  // Recover page if it was deleted but not checkpointed.
1660  if (is_page_deleted_without_checkpoint(table_epoch, page_epoch, contingent)) {
1661  if (!g_read_only && !g_multi_instance) {
1662  // Read-only mode can find pages like this if the server was previously run in
1663  // write-mode but is not allowed to free them.
1664  file_info->recoverPage(chunk_key, page_num);
1665  }
1666  }
1667  return false;
1668 }
1669 
1671  FileBuffer* buf;
1673  auto chunk_it = chunkIndex_.find(key);
1674  if (chunk_it == chunkIndex_.end()) {
1675  buf = createBufferUnlocked(key);
1676  } else {
1677  buf = getBufferUnlocked(key);
1678  }
1679  return buf;
1680 }
1681 
1684  for (auto [key, buf] : chunkIndex_) {
1685  if (buf->isDirty()) {
1686  buf->writeMetadata(epoch());
1687  buf->clearDirtyBits();
1688  }
1689  }
1690 }
1691 
1694  return chunkIndex_.size();
1695 }
1696 
1697 boost::filesystem::path FileMgr::getFilePath(const std::string& file_name) const {
1698  return boost::filesystem::path(fileMgrBasePath_) / file_name;
1699 }
1700 
1701 FILE* FileMgr::createFile(const std::string& full_path,
1702  const size_t requested_file_size) const {
1703  readOnlyCheck("create file", full_path);
1704  return create(full_path, requested_file_size);
1705 }
1706 
1707 std::pair<FILE*, std::string> FileMgr::createFile(const std::string& base_path,
1708  const int file_id,
1709  const size_t page_size,
1710  const size_t num_pages) const {
1711  readOnlyCheck("create file", get_data_file_path(base_path, file_id, page_size));
1712  return create(base_path, file_id, page_size, num_pages);
1713 }
1714 
1715 size_t FileMgr::writeFile(FILE* f,
1716  const size_t offset,
1717  const size_t size,
1718  const int8_t* buf) const {
1719  readOnlyCheck("write file");
1720  return write(f, offset, size, buf);
1721 }
1722 
1723 void FileMgr::readOnlyCheck(const std::string& action,
1724  const std::optional<std::string>& file_name) const {
1725  CHECK(!g_read_only) << "Error trying to " << action
1726  << (file_name.has_value() ? (": '" + file_name.value() + "'") : "")
1727  << ". Not allowed in read only mode.";
1728 }
1729 
1730 size_t FileMgr::num_pages_per_data_file_{DEFAULT_NUM_PAGES_PER_DATA_FILE};
1731 size_t FileMgr::num_pages_per_metadata_file_{DEFAULT_NUM_PAGES_PER_METADATA_FILE};
1732 } // namespace File_Namespace
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:618
const size_t metadata_page_size_
Definition: FileMgr.h:552
std::vector< PageMapping > readPageMappingsFromStatusFile()
Definition: FileMgr.cpp:1547
virtual FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
Definition: FileMgr.cpp:737
int32_t readVersionFromDisk(const std::string &versionFileName) const
Definition: FileMgr.cpp:1073
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
OpenFilesResult openFiles()
Definition: FileMgr.cpp:200
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:868
size_t write(const size_t offset, const size_t size, const int8_t *buf)
Definition: FileInfo.cpp:64
TablePair fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:540
std::string getBasePath() const
bool g_multi_instance
Definition: heavyai_locks.h:22
int8_t * storage_ptr()
Definition: Epoch.h:61
FileMgr(const int32_t device_id, GlobalFileMgr *gfm, const TablePair file_mgr_key, const int32_t max_rollback_epochs=-1, const size_t num_reader_threads=0, const int32_t epoch=-1)
Constructor.
Definition: FileMgr.cpp:51
static constexpr int32_t LATEST_FILE_MGR_VERSION
Definition: FileMgr.h:404
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:606
bool is_page_deleted_without_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:271
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:877
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
Definition: FileMgr.cpp:1042
void syncEncoder(const AbstractBuffer *src_buffer)
std::string get_legacy_data_file_path(const std::string &new_data_file_path)
Definition: File.cpp:49
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:716
const size_t page_size_
Definition: FileMgr.h:551
void sortAndCopyFilePagesForCompaction(size_t page_size, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1316
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
std::vector< HeaderInfo > header_infos
Definition: FileMgr.h:128
virtual FileBuffer * allocateBuffer(const size_t page_size, const ChunkKey &key, const size_t num_bytes=0)
Definition: FileMgr.cpp:1623
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
FileInfo * createFileInfo(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:968
#define LOG(tag)
Definition: Logger.h:285
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
void copySourcePageForCompaction(const Page &source_page, FileInfo *destination_file_info, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1411
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:873
virtual MemoryLevel getType() const =0
void freePageImmediate(int32_t page_num)
Definition: FileInfo.cpp:245
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:586
void setDataAndMetadataFileStats(StorageStats &storage_stats) const
Definition: FileMgr.cpp:343
void migrateToLatestFileMgrVersion()
Definition: FileMgr.cpp:1157
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
Definition: FileMgr.cpp:687
#define UNREACHABLE()
Definition: Logger.h:338
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
#define DATA_FILE_EXT
Definition: File.h:25
static constexpr char const * UPDATE_PAGE_VISIBILITY_STATUS
Definition: FileMgr.h:389
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:306
void init(const size_t num_reader_threads, const int32_t epochOverride)
Definition: FileMgr.cpp:252
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:753
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:334
GlobalFileMgr * gfm_
Definition: FileMgr.h:539
bool is_compaction_status_file(const std::string &file_name)
Definition: FileMgr.cpp:193
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:419
#define DEFAULT_METADATA_PAGE_SIZE
int32_t floor() const
Definition: Epoch.h:43
int32_t ceiling() const
Definition: Epoch.h:44
static constexpr int32_t db_version_
virtual bool updatePageIfDeleted(FileInfo *file_info, ChunkKey &chunk_key, int32_t contingent, int32_t page_epoch, int32_t page_num)
deletes or recovers a page based on last checkpointed epoch.
Definition: FileMgr.cpp:1637
std::optional< uint64_t > total_free_data_page_count
Definition: FileMgr.h:119
void writePageMappingsToStatusFile(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1528
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:55
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:57
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:706
int32_t copyPageWithoutHeaderSize(const Page &source_page, const Page &destination_page)
Definition: FileMgr.cpp:1441
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string fileMgrBasePath_
Definition: FileMgr.h:411
static void setNumPagesPerMetadataFile(size_t num_pages)
Definition: FileMgr.cpp:1592
int32_t lastCheckpointedEpoch() const
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:301
std::string to_string(char const *&&v)
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:797
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:143
static size_t num_pages_per_data_file_
Definition: FileMgr.h:427
int32_t PageHeaderSizeType
Definition: FileMgr.h:134
std::shared_lock< T > shared_lock
std::set< size_t > freePages
Definition: FileInfo.h:62
future< Result > async(Fn &&fn, Args &&...args)
size_t pageSize
file stream object for the represented file
Definition: FileInfo.h:59
int32_t fileMgrVersion_
the index of the next file id
Definition: FileMgr.h:417
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
Definition: FileMgr.cpp:1087
size_t getNumChunks() override
Definition: FileMgr.cpp:1692
int32_t incrementEpoch()
Definition: FileMgr.h:285
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
Definition: FileMgr.cpp:1215
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:416
FileBuffer * getOrCreateBuffer(const ChunkKey &key)
Definition: FileMgr.cpp:1670
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
#define CHECK_NE(x, y)
Definition: Logger.h:302
static void setNumPagesPerDataFile(size_t num_pages)
Definition: FileMgr.cpp:1588
std::optional< uint64_t > total_free_metadata_page_count
Definition: FileMgr.h:115
int32_t fileId
Definition: Page.h:47
string version
Definition: setup.in.py:73
void freePagesBeforeEpoch(const int32_t min_epoch)
Definition: FileMgr.cpp:673
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:330
PageSizeFileMMap fileIndex_
Definition: FileMgr.h:414
std::unique_lock< T > unique_lock
virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const bool purge=true)
Definition: FileMgr.cpp:760
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
uint64_t total_metadata_page_count
Definition: FileMgr.h:114
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:1003
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
Definition: FileMgr.h:424
static constexpr char FILE_MGR_VERSION_FILENAME[]
Definition: FileMgr.h:402
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:415
static size_t num_pages_per_metadata_file_
Definition: FileMgr.h:428
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:748
uint32_t getFragmentCount() const
Definition: FileMgr.cpp:407
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:946
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf, const std::string &file_path)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:125
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:659
boost::filesystem::path getFilePath(const std::string &file_name) const
Definition: FileMgr.cpp:1697
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
#define DEFAULT_PAGE_SIZE
std::string compaction_status_file_name
Definition: FileMgr.h:130
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:770
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:1021
heavyai::shared_mutex chunkIndexMutex_
Definition: FileMgr.h:420
const TablePair get_fileMgrKey() const
Definition: FileMgr.h:341
#define CHECK_LT(x, y)
Definition: Logger.h:303
static constexpr char EPOCH_FILENAME[]
Definition: FileMgr.h:400
virtual std::string describeSelf() const
Definition: FileMgr.cpp:700
virtual FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:726
heavyai::shared_mutex mutex_free_page_
Definition: FileMgr.h:423
int32_t maxRollbackEpochs_
Definition: FileMgr.h:410
FILE * createFile(const std::string &full_path, const size_t requested_file_size) const
Definition: FileMgr.cpp:1701
void freePagesBeforeEpochUnlocked(const int32_t min_epoch, const ChunkKeyToChunkMap::iterator lower_bound, const ChunkKeyToChunkMap::iterator upper_bound)
Definition: FileMgr.cpp:678
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
uint64_t total_metadata_file_size
Definition: FileMgr.h:113
bool g_read_only
Definition: heavyai_locks.h:21
virtual void readOnlyCheck(const std::string &action, const std::optional< std::string > &file_name={}) const
Definition: FileMgr.cpp:1723
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:102
void openAndReadEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:639
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:70
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:259
StorageStats getStorageStats() const
Definition: FileMgr.cpp:334
bool hasChunkMetadataForKeyPrefix(const ChunkKey &keyPrefix)
Definition: FileMgr.cpp:1009
static size_t byte_size()
Definition: Epoch.h:63
static constexpr char const * DELETE_EMPTY_FILES_STATUS
Definition: FileMgr.h:390
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:118
FILE * open(int file_id)
Opens the file with the given id; fatal crash on error.
Definition: File.cpp:100
bool getDBConvert() const
Index for looking up chunks.
Definition: FileMgr.cpp:1052
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
Definition: FileMgr.cpp:1136
virtual void free_page(std::pair< FileInfo *, int32_t > &&page)
Definition: FileMgr.cpp:1210
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
virtual void closeRemovePhysical()
Definition: FileMgr.cpp:575
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
static constexpr char const * COPY_PAGES_STATUS
Definition: FileMgr.h:388
int fsync(int fd)
Definition: heavyai_fs.cpp:62
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator) const
Definition: FileMgr.cpp:157
int32_t epoch() const
Definition: FileMgr.h:530
void updateMappedPagesVisibility(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1471
#define CHECK(condition)
Definition: Logger.h:291
std::optional< uint32_t > fragment_count
Definition: FileMgr.h:120
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns the a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:785
void recoverPage(const ChunkKey &chunk_key, int32_t page_num)
Definition: FileInfo.cpp:252
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:114
virtual FileBuffer * getBufferUnlocked(const ChunkKey &key, const size_t numBytes=0) const
Definition: FileMgr.cpp:790
void setEpoch(const int32_t newEpoch)
Definition: FileMgr.cpp:1199
bool coreInit()
Determines file path, and if exists, runs file migration and opens and reads epoch file...
Definition: FileMgr.cpp:137
FileInfo * getFileInfoForFileId(const int32_t fileId) const
Definition: FileMgr.h:229
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1604
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:901
void createOrMigrateTopLevelMetadata()
Definition: FileMgr.cpp:1056
heavyai::shared_mutex files_rw_mutex_
Definition: FileMgr.h:421
size_t writeFile(FILE *f, const size_t offset, const size_t size, const int8_t *buf) const
Definition: FileMgr.cpp:1715
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:98
std::map< int32_t, std::unique_ptr< FileInfo > > files_
Definition: FileMgr.h:413
std::string get_data_file_path(const std::string &base_path, int file_id, size_t page_size)
Definition: File.cpp:42
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_&lt;EPOCH&gt;_&lt;oldname&gt;.
Definition: File.cpp:210
unsigned nextFileId_
number of threads used when loading data
Definition: FileMgr.h:416
int32_t epochFloor() const
Definition: FileMgr.h:283
bool is_metadata_file(size_t file_size, size_t page_size, size_t metadata_page_size, size_t num_pages_per_metadata_file)
Definition: FileMgr.cpp:325
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
void renameCompactionStatusFile(const char *const from_status, const char *const to_status)
Definition: FileMgr.cpp:1574
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:79
constexpr auto kLegacyDataFileExtension
Definition: File.h:36
static constexpr char LEGACY_EPOCH_FILENAME[]
Definition: FileMgr.h:399
A selection of helper methods for File I/O.
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:816
#define VLOG(n)
Definition: Logger.h:388
Type timer_start()
Definition: measure.h:42
void resumeFileCompaction(const std::string &status_file_name)
Definition: FileMgr.cpp:1224
static constexpr char DB_META_FILENAME[]
Definition: FileMgr.h:401
static constexpr int32_t INVALID_VERSION
Definition: FileMgr.h:403