OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
FileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <fstream>
28 #include <future>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
37 
39 #include "Shared/File.h"
40 #include "Shared/checked_alloc.h"
41 #include "Shared/measure.h"
42 
43 using namespace std;
44 
45 namespace File_Namespace {
46 
47 FileMgr::FileMgr(const int32_t deviceId,
48  GlobalFileMgr* gfm,
49  const TablePair fileMgrKey,
50  const int32_t maxRollbackEpochs,
51  const size_t num_reader_threads,
52  const int32_t epoch,
53  const size_t defaultPageSize)
54  : AbstractBufferMgr(deviceId)
55  , maxRollbackEpochs_(maxRollbackEpochs)
56  , defaultPageSize_(defaultPageSize)
57  , nextFileId_(0)
58  , gfm_(gfm)
59  , fileMgrKey_(fileMgrKey) {
60  init(num_reader_threads, epoch);
61 }
62 
63 // used only to initialize enough to drop
64 FileMgr::FileMgr(const int32_t deviceId,
65  GlobalFileMgr* gfm,
66  const TablePair fileMgrKey,
67  const size_t defaultPageSize,
68  const bool runCoreInit)
69  : AbstractBufferMgr(deviceId)
70  , maxRollbackEpochs_(-1)
71  , defaultPageSize_(defaultPageSize)
72  , nextFileId_(0)
73  , gfm_(gfm)
74  , fileMgrKey_(fileMgrKey) {
75  const std::string fileMgrDirPrefix("table");
76  const std::string FileMgrDirDelim("_");
77  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
78  std::to_string(fileMgrKey_.first) + // db_id
79  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
80  epochFile_ = nullptr;
81  files_.clear();
82  if (runCoreInit) {
83  coreInit();
84  }
85 }
86 
87 FileMgr::FileMgr(GlobalFileMgr* gfm, const size_t defaultPageSize, std::string basePath)
88  : AbstractBufferMgr(0)
89  , maxRollbackEpochs_(-1)
90  , fileMgrBasePath_(basePath)
91  , defaultPageSize_(defaultPageSize)
92  , nextFileId_(0)
93  , gfm_(gfm)
94  , fileMgrKey_(0, 0) {
95  init(basePath, -1);
96 }
97 
98 // For testing purposes only
99 FileMgr::FileMgr(const int epoch) : AbstractBufferMgr(-1) {
100  epoch_.ceiling(epoch);
101 }
102 
103 // Used to initialize CachingFileMgr.
104 FileMgr::FileMgr() : AbstractBufferMgr(0) {}
105 
107  // free memory used by FileInfo objects
108  for (auto chunkIt = chunkIndex_.begin(); chunkIt != chunkIndex_.end(); ++chunkIt) {
109  delete chunkIt->second;
110  }
111  for (auto file_info_entry : files_) {
112  delete file_info_entry.second;
113  }
114 
115  if (epochFile_) {
116  close(epochFile_);
117  epochFile_ = nullptr;
118  }
119 
120  if (DBMetaFile_) {
122  DBMetaFile_ = nullptr;
123  }
124 }
125 
127  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
128  const std::string fileMgrDirPrefix("table");
129  const std::string FileMgrDirDelim("_");
130  fileMgrBasePath_ = (gfm_->getBasePath() + fileMgrDirPrefix + FileMgrDirDelim +
131  std::to_string(fileMgrKey_.first) + // db_id
132  FileMgrDirDelim + std::to_string(fileMgrKey_.second)); // tb_id
133  boost::filesystem::path path(fileMgrBasePath_);
134  if (boost::filesystem::exists(path)) {
135  if (!boost::filesystem::is_directory(path)) {
136  LOG(FATAL) << "Specified path '" << fileMgrBasePath_
137  << "' for table data is not a directory.";
138  }
141  return true;
142  }
143  return false;
144 }
145 
147  const boost::filesystem::directory_iterator& fileIterator) const {
148  FileMetadata fileMetadata;
149  fileMetadata.is_data_file = false;
150  fileMetadata.file_path = fileIterator->path().string();
151  if (!boost::filesystem::is_regular_file(fileIterator->status())) {
152  return fileMetadata;
153  }
154  // note that boost::filesystem leaves preceding dot on
155  // extension - hence DATA_FILE_EXT is ".data"
156  std::string extension(fileIterator->path().extension().string());
157  if (extension == DATA_FILE_EXT) {
158  std::string fileStem(fileIterator->path().stem().string());
159  // remove trailing dot if any
160  if (fileStem.size() > 0 && fileStem.back() == '.') {
161  fileStem = fileStem.substr(0, fileStem.size() - 1);
162  }
163  size_t dotPos = fileStem.find_last_of("."); // should only be one
164  if (dotPos == std::string::npos) {
165  LOG(FATAL) << "File `" << fileIterator->path()
166  << "` does not carry page size information in the filename.";
167  }
168  fileMetadata.is_data_file = true;
169  fileMetadata.file_id = boost::lexical_cast<int>(fileStem.substr(0, dotPos));
170  fileMetadata.page_size =
171  boost::lexical_cast<size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
172 
173  fileMetadata.file_size = boost::filesystem::file_size(fileMetadata.file_path);
174  CHECK_EQ(fileMetadata.file_size % fileMetadata.page_size,
175  size_t(0)); // should be no partial pages
176  fileMetadata.num_pages = fileMetadata.file_size / fileMetadata.page_size;
177  }
178  return fileMetadata;
179 }
180 
181 namespace {
182 bool is_compaction_status_file(const std::string& file_name) {
183  return (file_name == FileMgr::COPY_PAGES_STATUS ||
186 }
187 } // namespace
188 
190  auto clock_begin = timer_start();
191  boost::filesystem::directory_iterator
192  end_itr; // default construction yields past-the-end
194  result.max_file_id = -1;
195  int32_t file_count = 0;
196  int32_t thread_count = std::thread::hardware_concurrency();
197  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
198  boost::filesystem::path path(fileMgrBasePath_);
199  for (boost::filesystem::directory_iterator file_it(path); file_it != end_itr;
200  ++file_it) {
201  FileMetadata file_metadata = getMetadataForFile(file_it);
202  if (file_metadata.is_data_file) {
203  result.max_file_id = std::max(result.max_file_id, file_metadata.file_id);
204  file_futures.emplace_back(std::async(std::launch::async, [file_metadata, this] {
205  std::vector<HeaderInfo> temp_header_vec;
206  openExistingFile(file_metadata.file_path,
207  file_metadata.file_id,
208  file_metadata.page_size,
209  file_metadata.num_pages,
210  temp_header_vec);
211  return temp_header_vec;
212  }));
213  file_count++;
214  if (file_count % thread_count == 0) {
215  processFileFutures(file_futures, result.header_infos);
216  }
217  }
218 
219  if (is_compaction_status_file(file_it->path().filename().string())) {
220  CHECK(result.compaction_status_file_name.empty());
221  result.compaction_status_file_name = file_it->path().filename().string();
222  }
223  }
224 
225  if (file_futures.size() > 0) {
226  processFileFutures(file_futures, result.header_infos);
227  }
228 
229  int64_t queue_time_ms = timer_stop(clock_begin);
230  LOG(INFO) << "Completed Reading table's file metadata, Elapsed time : " << queue_time_ms
231  << "ms Epoch: " << epoch_.ceiling() << " files read: " << file_count
232  << " table location: '" << fileMgrBasePath_ << "'";
233  return result;
234 }
235 
237  for (auto file_info_entry : files_) {
238  auto file_info = file_info_entry.second;
239  if (file_info->f) {
240  close(file_info->f);
241  file_info->f = nullptr;
242  }
243  delete file_info;
244  }
245  files_.clear();
246  fileIndex_.clear();
247 }
248 
249 void FileMgr::init(const size_t num_reader_threads, const int32_t epochOverride) {
250  // if epochCeiling = -1 this means open from epoch file
251 
252  const bool dataExists = coreInit();
253  if (dataExists) {
254  if (epochOverride != -1) { // if opening at specified epoch
255  setEpoch(epochOverride);
256  }
257 
258  auto open_files_result = openFiles();
259  if (!open_files_result.compaction_status_file_name.empty()) {
260  resumeFileCompaction(open_files_result.compaction_status_file_name);
261  clearFileInfos();
262  open_files_result = openFiles();
263  CHECK(open_files_result.compaction_status_file_name.empty());
264  }
265 
266  /* Sort headerVec so that all HeaderInfos
267  * from a chunk will be grouped together
268  * and in order of increasing PageId
269  * - Version Epoch */
270  auto& header_vec = open_files_result.header_infos;
271  std::sort(header_vec.begin(), header_vec.end());
272 
273  /* Goal of next section is to find sequences in the
274  * sorted headerVec of the same ChunkId, which we
275  * can then initiate a FileBuffer with */
276 
277  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
278  if (header_vec.size() > 0) {
279  ChunkKey lastChunkKey = header_vec.begin()->chunkKey;
280  auto startIt = header_vec.begin();
281 
282  for (auto headerIt = header_vec.begin() + 1; headerIt != header_vec.end();
283  ++headerIt) {
284  if (headerIt->chunkKey != lastChunkKey) {
285  createBufferFromHeaders(lastChunkKey, startIt, headerIt);
286  lastChunkKey = headerIt->chunkKey;
287  startIt = headerIt;
288  }
289  }
290  // now need to insert last Chunk
291  createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
292  }
293  nextFileId_ = open_files_result.max_file_id + 1;
294  rollOffOldData(epoch(), true /* only checkpoint if data is rolled off */);
295  incrementEpoch();
296  freePages();
297  } else {
298  boost::filesystem::path path(fileMgrBasePath_);
299  if (!boost::filesystem::create_directory(path)) {
300  LOG(FATAL) << "Could not create data directory: " << path;
301  }
303  if (epochOverride != -1) {
304  epoch_.floor(epochOverride);
305  epoch_.ceiling(epochOverride);
306  } else {
307  // These are default constructor values for epoch_, but resetting here for clarity
308  epoch_.floor(0);
309  epoch_.ceiling(0);
310  }
313  incrementEpoch();
314  }
315 
316  initializeNumThreads(num_reader_threads);
317  isFullyInitted_ = true;
318 }
319 
320 namespace {
322  size_t page_size,
323  size_t num_pages_per_metadata_file) {
324  return (file_size == (METADATA_PAGE_SIZE * num_pages_per_metadata_file) &&
325  page_size == METADATA_PAGE_SIZE);
326 }
327 } // namespace
328 
330  StorageStats storage_stats;
331  setDataAndMetadataFileStats(storage_stats);
332  if (isFullyInitted_) {
333  storage_stats.fragment_count = getFragmentCount();
334  }
335  return storage_stats;
336 }
337 
339  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
340  if (!isFullyInitted_) {
341  CHECK(!fileMgrBasePath_.empty());
342  boost::filesystem::path path(fileMgrBasePath_);
343  if (boost::filesystem::exists(path)) {
344  if (!boost::filesystem::is_directory(path)) {
345  LOG(FATAL) << "getStorageStats: Specified path '" << fileMgrBasePath_
346  << "' for table data is not a directory.";
347  }
348 
349  storage_stats.epoch = lastCheckpointedEpoch();
350  storage_stats.epoch_floor = epochFloor();
351  boost::filesystem::directory_iterator
352  endItr; // default construction yields past-the-end
353  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
354  ++fileIt) {
355  FileMetadata file_metadata = getMetadataForFile(fileIt);
356  if (file_metadata.is_data_file) {
357  if (is_metadata_file(file_metadata.file_size,
358  file_metadata.page_size,
360  storage_stats.metadata_file_count++;
361  storage_stats.total_metadata_file_size += file_metadata.file_size;
362  storage_stats.total_metadata_page_count += file_metadata.num_pages;
363  } else {
364  storage_stats.data_file_count++;
365  storage_stats.total_data_file_size += file_metadata.file_size;
366  storage_stats.total_data_page_count += file_metadata.num_pages;
367  }
368  }
369  }
370  }
371  } else {
372  storage_stats.epoch = lastCheckpointedEpoch();
373  storage_stats.epoch_floor = epochFloor();
374  storage_stats.total_free_metadata_page_count = 0;
375  storage_stats.total_free_data_page_count = 0;
376 
377  // We already initialized this table so take the faster path of walking through the
378  // FileInfo objects and getting metadata from there
379  for (const auto& file_info_entry : files_) {
380  const auto file_info = file_info_entry.second;
381  if (is_metadata_file(
382  file_info->size(), file_info->pageSize, num_pages_per_metadata_file_)) {
383  storage_stats.metadata_file_count++;
384  storage_stats.total_metadata_file_size +=
385  file_info->pageSize * file_info->numPages;
386  storage_stats.total_metadata_page_count += file_info->numPages;
387  storage_stats.total_free_metadata_page_count.value() +=
388  file_info->freePages.size();
389  } else {
390  storage_stats.data_file_count++;
391  storage_stats.total_data_file_size += file_info->pageSize * file_info->numPages;
392  storage_stats.total_data_page_count += file_info->numPages;
393  storage_stats.total_free_data_page_count.value() += file_info->freePages.size();
394  }
395  }
396  }
397 }
398 
399 uint32_t FileMgr::getFragmentCount() const {
400  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
401  std::set<int32_t> fragment_ids;
402  for (const auto& [chunk_key, file_buffer] : chunkIndex_) {
403  fragment_ids.emplace(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
404  }
405  return static_cast<uint32_t>(fragment_ids.size());
406 }
407 
409  std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
410  std::vector<HeaderInfo>& headerVec) {
411  for (auto& file_future : file_futures) {
412  file_future.wait();
413  }
414  // concatenate the vectors after thread completes
415  for (auto& file_future : file_futures) {
416  auto tempHeaderVec = file_future.get();
417  headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
418  }
419  file_futures.clear();
420 }
421 
422 void FileMgr::init(const std::string& dataPathToConvertFrom,
423  const int32_t epochOverride) {
424  int32_t converted_data_epoch = 0;
425  boost::filesystem::path path(dataPathToConvertFrom);
426  if (boost::filesystem::exists(path)) {
427  if (!boost::filesystem::is_directory(path)) {
428  LOG(FATAL) << "Specified path `" << path << "` is not a directory.";
429  }
431 
432  if (epochOverride != -1) { // if opening at previous epoch
433  setEpoch(epochOverride);
434  }
435 
436  boost::filesystem::directory_iterator
437  endItr; // default construction yields past-the-end
438  int32_t maxFileId = -1;
439  int32_t fileCount = 0;
440  int32_t threadCount = std::thread::hardware_concurrency();
441  std::vector<HeaderInfo> headerVec;
442  std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
443  for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
444  FileMetadata fileMetadata = getMetadataForFile(fileIt);
445  if (fileMetadata.is_data_file) {
446  maxFileId = std::max(maxFileId, fileMetadata.file_id);
447  file_futures.emplace_back(std::async(std::launch::async, [fileMetadata, this] {
448  std::vector<HeaderInfo> tempHeaderVec;
449  openExistingFile(fileMetadata.file_path,
450  fileMetadata.file_id,
451  fileMetadata.page_size,
452  fileMetadata.num_pages,
453  tempHeaderVec);
454  return tempHeaderVec;
455  }));
456  fileCount++;
457  if (fileCount % threadCount) {
458  processFileFutures(file_futures, headerVec);
459  }
460  }
461  }
462 
463  if (file_futures.size() > 0) {
464  processFileFutures(file_futures, headerVec);
465  }
466 
467  /* Sort headerVec so that all HeaderInfos
468  * from a chunk will be grouped together
469  * and in order of increasing PageId
470  * - Version Epoch */
471 
472  std::sort(headerVec.begin(), headerVec.end());
473 
474  /* Goal of next section is to find sequences in the
475  * sorted headerVec of the same ChunkId, which we
476  * can then initiate a FileBuffer with */
477 
478  if (headerVec.size() > 0) {
479  ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
480  auto startIt = headerVec.begin();
481 
482  for (auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
483  ++headerIt) {
484  if (headerIt->chunkKey != lastChunkKey) {
485  FileMgr* c_fm_ =
486  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
487  CHECK(c_fm_);
488  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerIt);
489  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
490  destBuf->syncEncoder(srcBuf);
491  destBuf->setSize(srcBuf->size());
492  destBuf->setDirty(); // this needs to be set to force writing out metadata
493  // files from "checkpoint()" call
494 
495  size_t totalNumPages = srcBuf->getMultiPage().size();
496  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
497  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
498  Page destPage = c_fm_->requestFreePage(
499  srcBuf->pageSize(),
500  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
501  MultiPage multiPage(srcBuf->pageSize());
502  multiPage.push(destPage, converted_data_epoch);
503  destBuf->multiPages_.push_back(multiPage);
504  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
505  copyPage(
506  srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
507  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
508  }
509  lastChunkKey = headerIt->chunkKey;
510  startIt = headerIt;
511  }
512  }
513 
514  // now need to insert last Chunk
515  FileMgr* c_fm_ =
516  dynamic_cast<File_Namespace::FileMgr*>(gfm_->getFileMgr(lastChunkKey));
517  auto srcBuf = createBufferFromHeaders(lastChunkKey, startIt, headerVec.end());
518  auto destBuf = c_fm_->createBuffer(lastChunkKey, srcBuf->pageSize());
519  destBuf->syncEncoder(srcBuf);
520  destBuf->setSize(srcBuf->size());
521  destBuf->setDirty(); // this needs to be set to write out metadata file from the
522  // "checkpoint()" call
523 
524  size_t totalNumPages = srcBuf->getMultiPage().size();
525  for (size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
526  Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
527  Page destPage = c_fm_->requestFreePage(
528  srcBuf->pageSize(),
529  false); // may modify and use api "FileBuffer::addNewMultiPage" instead
530  MultiPage multiPage(srcBuf->pageSize());
531  multiPage.push(destPage, converted_data_epoch);
532  destBuf->multiPages_.push_back(multiPage);
533  size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
534  copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
535  destBuf->writeHeader(destPage, pageNum, converted_data_epoch, false);
536  }
537  }
538  nextFileId_ = maxFileId + 1;
539  } else {
540  if (!boost::filesystem::create_directory(path)) {
541  LOG(FATAL) << "Specified path does not exist: " << path;
542  }
543  }
544  isFullyInitted_ = true;
545 }
546 
548  for (auto& [idx, file_info] : files_) {
549  if (file_info->f) {
550  close(file_info->f);
551  file_info->f = nullptr;
552  }
553  }
554 
555  if (DBMetaFile_) {
557  DBMetaFile_ = nullptr;
558  }
559 
560  if (epochFile_) {
561  close(epochFile_);
562  epochFile_ = nullptr;
563  }
564 }
565 
567  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
569  /* rename for later deletion the directory containing table related data */
571 }
572 
573 void FileMgr::copyPage(Page& srcPage,
574  FileMgr* destFileMgr,
575  Page& destPage,
576  const size_t reservedHeaderSize,
577  const size_t numBytes,
578  const size_t offset) {
579  CHECK(offset + numBytes <= defaultPageSize_);
580  FileInfo* srcFileInfo = getFileInfoForFileId(srcPage.fileId);
581  FileInfo* destFileInfo = destFileMgr->getFileInfoForFileId(destPage.fileId);
582  int8_t* buffer = reinterpret_cast<int8_t*>(checked_malloc(numBytes));
583 
584  size_t bytesRead = srcFileInfo->read(
585  srcPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize, numBytes, buffer);
586  CHECK(bytesRead == numBytes);
587  size_t bytesWritten = destFileInfo->write(
588  destPage.pageNum * defaultPageSize_ + offset + reservedHeaderSize,
589  numBytes,
590  buffer);
591  CHECK(bytesWritten == numBytes);
592  ::free(buffer);
593 }
594 
595 void FileMgr::createEpochFile(const std::string& epochFileName) {
596  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
597  if (boost::filesystem::exists(epochFilePath)) {
598  LOG(FATAL) << "Epoch file `" << epochFilePath << "` already exists";
599  }
600  epochFile_ = create(epochFilePath, sizeof(Epoch::byte_size()));
601  // Write out current epoch to file - which if this
602  // function is being called should be 0
604 }
605 
606 int32_t FileMgr::openAndReadLegacyEpochFile(const std::string& epochFileName) {
607  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
608  if (!boost::filesystem::exists(epochFilePath)) {
609  return 0;
610  }
611 
612  if (!boost::filesystem::is_regular_file(epochFilePath)) {
613  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
614  }
615  if (boost::filesystem::file_size(epochFilePath) < 4) {
616  LOG(FATAL) << "Epoch file `" << epochFilePath
617  << "` is not sized properly (current size: "
618  << boost::filesystem::file_size(epochFilePath) << ", expected size: 4)";
619  }
620  FILE* legacyEpochFile = open(epochFilePath);
621  int32_t epoch;
622  read(legacyEpochFile, 0, sizeof(int32_t), (int8_t*)&epoch);
623  close(legacyEpochFile);
624  return epoch;
625 }
626 
627 void FileMgr::openAndReadEpochFile(const std::string& epochFileName) {
628  if (!epochFile_) { // Check to see if already open
629  std::string epochFilePath(fileMgrBasePath_ + "/" + epochFileName);
630  if (!boost::filesystem::exists(epochFilePath)) {
631  LOG(FATAL) << "Epoch file `" << epochFilePath << "` does not exist";
632  }
633  if (!boost::filesystem::is_regular_file(epochFilePath)) {
634  LOG(FATAL) << "Epoch file `" << epochFilePath << "` is not a regular file";
635  }
636  if (boost::filesystem::file_size(epochFilePath) != Epoch::byte_size()) {
637  LOG(FATAL) << "Epoch file `" << epochFilePath
638  << "` is not sized properly (current size: "
639  << boost::filesystem::file_size(epochFilePath)
640  << ", expected size: " << Epoch::byte_size() << ")";
641  }
642  epochFile_ = open(epochFilePath);
643  }
645 }
646 
648  CHECK(epochFile_);
650  int32_t status = fflush(epochFile_);
651  CHECK(status == 0) << "Could not flush epoch file to disk";
652 #ifdef __APPLE__
653  status = fcntl(fileno(epochFile_), 51);
654 #else
655  status = heavyai::fsync(fileno(epochFile_));
656 #endif
657  CHECK(status == 0) << "Could not sync epoch file to disk";
658  epochIsCheckpointed_ = true;
659 }
660 
661 void FileMgr::freePagesBeforeEpoch(const int32_t min_epoch) {
662  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
663  freePagesBeforeEpochUnlocked(min_epoch, chunkIndex_.begin(), chunkIndex_.end());
664 }
665 
667  const int32_t min_epoch,
668  const ChunkKeyToChunkMap::iterator lower_bound,
669  const ChunkKeyToChunkMap::iterator upper_bound) {
670  for (auto chunkIt = lower_bound; chunkIt != upper_bound; ++chunkIt) {
671  chunkIt->second->freePagesBeforeEpoch(min_epoch);
672  }
673 }
674 
675 void FileMgr::rollOffOldData(const int32_t epoch_ceiling, const bool should_checkpoint) {
676  if (maxRollbackEpochs_ >= 0) {
677  auto min_epoch = std::max(epoch_ceiling - maxRollbackEpochs_, epoch_.floor());
678  if (min_epoch > epoch_.floor()) {
679  freePagesBeforeEpoch(min_epoch);
680  epoch_.floor(min_epoch);
681  if (should_checkpoint) {
682  checkpoint();
683  }
684  }
685  }
686 }
687 
688 std::string FileMgr::describeSelf() const {
689  stringstream ss;
690  ss << "table (" << fileMgrKey_.first << ", " << fileMgrKey_.second << ")";
691  return ss.str();
692 }
693 
695  VLOG(2) << "Checkpointing " << describeSelf() << " epoch: " << epoch();
697  rollOffOldData(epoch(), false /* shouldCheckpoint */);
698  syncFilesToDisk();
700  incrementEpoch();
701  freePages();
702 }
703 
705  const size_t page_size,
706  const size_t num_bytes) {
707  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
708  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
709  << "Chunk already exists for key: " << show_chunk(key);
710  return createBufferUnlocked(key, page_size, num_bytes);
711 }
712 
713 // Assumes checks for pre-existing key have already occurred.
715  const size_t page_size,
716  const size_t num_bytes) {
717  size_t actual_page_size = page_size;
718  if (actual_page_size == 0) {
719  actual_page_size = defaultPageSize_;
720  }
721  chunkIndex_[key] = allocateBuffer(actual_page_size, key, num_bytes);
722  return (chunkIndex_[key]);
723 }
724 
726  const ChunkKey& key,
727  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
728  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
729  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
730  CHECK(chunkIndex_.find(key) == chunkIndex_.end())
731  << "Chunk already exists for key: " << show_chunk(key);
732  chunkIndex_[key] = allocateBuffer(key, headerStartIt, headerEndIt);
733  return (chunkIndex_[key]);
734 }
735 
737  mapd_shared_lock<mapd_shared_mutex> chunkIndexReadLock(chunkIndexMutex_);
738  return chunkIndex_.find(key) != chunkIndex_.end();
739 }
740 
741 void FileMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
742  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
743  auto chunk_it = chunkIndex_.find(key);
744  CHECK(chunk_it != chunkIndex_.end())
745  << "Chunk does not exist for key: " << show_chunk(key);
746  deleteBufferUnlocked(chunk_it, purge);
747 }
748 
749 ChunkKeyToChunkMap::iterator FileMgr::deleteBufferUnlocked(
750  const ChunkKeyToChunkMap::iterator chunk_it,
751  const bool purge) {
752  if (purge) {
753  chunk_it->second->freePages();
754  }
755  delete chunk_it->second;
756  return chunkIndex_.erase(chunk_it);
757 }
758 
759 void FileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
760  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
761  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
762  if (chunkIt == chunkIndex_.end()) {
763  return; // should we throw?
764  }
765  while (chunkIt != chunkIndex_.end() &&
766  std::search(chunkIt->first.begin(),
767  chunkIt->first.begin() + keyPrefix.size(),
768  keyPrefix.begin(),
769  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
770  deleteBufferUnlocked(chunkIt++, purge);
771  }
772 }
773 
774 FileBuffer* FileMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
775  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
776  return getBufferUnlocked(key, num_bytes);
777 }
778 
780  const size_t num_bytes) const {
781  auto chunk_it = chunkIndex_.find(key);
782  CHECK(chunk_it != chunkIndex_.end()) << "Chunk does not exist: " << show_chunk(key);
783  return chunk_it->second;
784 }
785 
787  AbstractBuffer* destBuffer,
788  const size_t numBytes) {
789  // reads chunk specified by ChunkKey into AbstractBuffer provided by
790  // destBuffer
791  CHECK(!destBuffer->isDirty())
792  << "Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
793  << show_chunk(key);
794  AbstractBuffer* chunk = getBuffer(key);
795  // chunk's size is either specified in function call with numBytes or we
796  // just look at pageSize * numPages in FileBuffer
797  if (numBytes > 0 && numBytes > chunk->size()) {
798  LOG(FATAL) << "Chunk retrieved for key `" << show_chunk(key) << "` is smaller ("
799  << chunk->size() << ") than number of bytes requested (" << numBytes
800  << ")";
801  }
802  chunk->copyTo(destBuffer, numBytes);
803 }
804 
806  AbstractBuffer* srcBuffer,
807  const size_t numBytes) {
808  auto chunk = getOrCreateBuffer(key);
809  size_t oldChunkSize = chunk->size();
810  // write the buffer's data to the Chunk
811  size_t newChunkSize = (numBytes == 0) ? srcBuffer->size() : numBytes;
812  if (chunk->isDirty()) {
813  // multiple appends are allowed,
814  // but only single update is allowed
815  if (srcBuffer->isUpdated() && chunk->isUpdated()) {
816  LOG(FATAL) << "Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
817  "for key: "
818  << show_chunk(key);
819  }
820  }
821  CHECK(srcBuffer->isDirty()) << "putBuffer expects a dirty buffer";
822  if (srcBuffer->isUpdated()) {
823  // chunk size is not changed when fixed rows are updated or are marked as deleted.
824  // but when rows are vacuumed or varlen rows are updated (new rows are appended),
825  // chunk size will change. For vacuum, checkpoint should sync size from cpu to disk.
826  // For varlen update, it takes another route via fragmenter using disk-level buffer.
827  if (0 == numBytes && !chunk->isDirty()) {
828  chunk->setSize(newChunkSize);
829  }
830  //@todo use dirty flags to only flush pages of chunk that need to
831  // be flushed
832  chunk->write((int8_t*)srcBuffer->getMemoryPtr(),
833  newChunkSize,
834  0,
835  srcBuffer->getType(),
836  srcBuffer->getDeviceId());
837  } else if (srcBuffer->isAppended()) {
838  CHECK_LT(oldChunkSize, newChunkSize);
839  chunk->append((int8_t*)srcBuffer->getMemoryPtr() + oldChunkSize,
840  newChunkSize - oldChunkSize,
841  srcBuffer->getType(),
842  srcBuffer->getDeviceId());
843  } else {
844  // If dirty buffer comes in unmarked, it must be empty.
845  // Encoder sync is still required to flush the metadata.
846  CHECK(numBytes == 0)
847  << "Dirty buffer with size > 0 must be marked as isAppended() or isUpdated()";
848  }
849  // chunk->clearDirtyBits(); // Hack: because write and append will set dirty bits
850  //@todo commenting out line above will make sure this metadata is set
851  // but will trigger error on fetch chunk
852  srcBuffer->clearDirtyBits();
853  chunk->syncEncoder(srcBuffer);
854  return chunk;
855 }
856 
857 AbstractBuffer* FileMgr::alloc(const size_t numBytes = 0) {
858  LOG(FATAL) << "Operation not supported";
859  return nullptr; // satisfy return-type warning
860 }
861 
863  LOG(FATAL) << "Operation not supported";
864 }
865 
866 Page FileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
867  std::lock_guard<std::mutex> lock(getPageMutex_);
868 
869  auto candidateFiles = fileIndex_.equal_range(pageSize);
870  int32_t pageNum = -1;
871  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
872  FileInfo* fileInfo = files_.at(fileIt->second);
873  pageNum = fileInfo->getFreePage();
874  if (pageNum != -1) {
875  return (Page(fileInfo->fileId, pageNum));
876  }
877  }
878  // if here then we need to add a file
879  FileInfo* fileInfo;
880  if (isMetadata) {
881  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
882  } else {
883  fileInfo = createFile(pageSize, num_pages_per_data_file_);
884  }
885  pageNum = fileInfo->getFreePage();
886  CHECK(pageNum != -1);
887  return (Page(fileInfo->fileId, pageNum));
888 }
889 
890 void FileMgr::requestFreePages(size_t numPagesRequested,
891  size_t pageSize,
892  std::vector<Page>& pages,
893  const bool isMetadata) {
894  // not used currently
895  // @todo add method to FileInfo to get more than one page
896  std::lock_guard<std::mutex> lock(getPageMutex_);
897  auto candidateFiles = fileIndex_.equal_range(pageSize);
898  size_t numPagesNeeded = numPagesRequested;
899  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
900  FileInfo* fileInfo = files_.at(fileIt->second);
901  int32_t pageNum;
902  do {
903  pageNum = fileInfo->getFreePage();
904  if (pageNum != -1) {
905  pages.emplace_back(fileInfo->fileId, pageNum);
906  numPagesNeeded--;
907  }
908  } while (pageNum != -1 && numPagesNeeded > 0);
909  if (numPagesNeeded == 0) {
910  break;
911  }
912  }
913  while (numPagesNeeded > 0) {
914  FileInfo* fileInfo;
915  if (isMetadata) {
916  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
917  } else {
918  fileInfo = createFile(pageSize, num_pages_per_data_file_);
919  }
920  int32_t pageNum;
921  do {
922  pageNum = fileInfo->getFreePage();
923  if (pageNum != -1) {
924  pages.emplace_back(fileInfo->fileId, pageNum);
925  numPagesNeeded--;
926  }
927  } while (pageNum != -1 && numPagesNeeded > 0);
928  if (numPagesNeeded == 0) {
929  break;
930  }
931  }
932  CHECK(pages.size() == numPagesRequested);
933 }
934 
935 FileInfo* FileMgr::openExistingFile(const std::string& path,
936  const int fileId,
937  const size_t pageSize,
938  const size_t numPages,
939  std::vector<HeaderInfo>& headerVec) {
940  FILE* f = open(path);
941  FileInfo* fInfo = new FileInfo(
942  this, fileId, f, pageSize, numPages, false); // false means don't init file
943 
944  fInfo->openExistingFile(headerVec);
945  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
946  files_[fileId] = fInfo;
947  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
948  return fInfo;
949 }
950 
951 FileInfo* FileMgr::createFile(const size_t pageSize, const size_t numPages) {
952  // check arguments
953  if (pageSize == 0 || numPages == 0) {
954  LOG(FATAL) << "File creation failed: pageSize and numPages must be greater than 0.";
955  }
956 
957  // create the new file
958  FILE* f = create(fileMgrBasePath_,
959  nextFileId_,
960  pageSize,
961  numPages); // TM: not sure if I like naming scheme here - should be in
962  // separate namespace?
963  CHECK(f);
964 
965  // instantiate a new FileInfo for the newly created file
966  int32_t fileId = nextFileId_++;
967  FileInfo* fInfo =
968  new FileInfo(this, fileId, f, pageSize, numPages, true); // true means init file
969  CHECK(fInfo);
970 
971  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
972  // update file manager data structures
973  files_[fileId] = fInfo;
974  fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
975 
976  return fInfo;
977 }
978 
979 FILE* FileMgr::getFileForFileId(const int32_t fileId) {
980  CHECK(fileId >= 0);
981  CHECK(files_.find(fileId) != files_.end());
982  return files_.at(fileId)->f;
983 }
984 
986  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
987  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
988  return (chunkIt != chunkIndex_.end());
989 }
990 
992  const ChunkKey& keyPrefix) {
993  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
994  auto chunkIt = chunkIndex_.lower_bound(keyPrefix);
995  if (chunkIt == chunkIndex_.end()) {
996  return; // throw?
997  }
998  while (chunkIt != chunkIndex_.end() &&
999  std::search(chunkIt->first.begin(),
1000  chunkIt->first.begin() + keyPrefix.size(),
1001  keyPrefix.begin(),
1002  keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
1003  if (chunkIt->second->hasEncoder()) {
1004  auto chunk_metadata = std::make_shared<ChunkMetadata>();
1005  chunkIt->second->encoder_->getMetadata(chunk_metadata);
1006  chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
1007  }
1008  chunkIt++;
1009  }
1010 }
1011 
1013  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
1014  const auto& chunkIt = chunkIndex_.find(chunkKey);
1015  if (chunkIt != chunkIndex_.end()) {
1016  return chunkIt->second->numMetadataPages();
1017  } else {
1018  throw std::runtime_error("Chunk was not found.");
1019  }
1020 }
1021 
1022 int32_t FileMgr::getDBVersion() const {
1023  return gfm_->getDBVersion();
1024 }
1025 
1027  return gfm_->getDBConvert();
1028 }
1029 
1032 
1033  if (db_version_ > getDBVersion()) {
1034  LOG(FATAL) << "DB forward compatibility is not supported. Version of HeavyDB "
1035  "software used is older than the version of DB being read: "
1036  << db_version_;
1037  }
1039  // new system, or we are moving forward versions
1040  // system wide migration would go here if required
1042  return;
1043  }
1044 }
1045 
1046 int32_t FileMgr::readVersionFromDisk(const std::string& versionFileName) const {
1047  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1048  if (!boost::filesystem::exists(versionFilePath)) {
1049  return -1;
1050  }
1051  if (!boost::filesystem::is_regular_file(versionFilePath)) {
1052  return -1;
1053  }
1054  if (boost::filesystem::file_size(versionFilePath) < 4) {
1055  return -1;
1056  }
1057  FILE* versionFile = open(versionFilePath);
1058  int32_t version;
1059  read(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1060  close(versionFile);
1061  return version;
1062 }
1063 
1064 void FileMgr::writeAndSyncVersionToDisk(const std::string& versionFileName,
1065  const int32_t version) {
1066  const std::string versionFilePath(fileMgrBasePath_ + "/" + versionFileName);
1067  FILE* versionFile;
1068  if (boost::filesystem::exists(versionFilePath)) {
1069  int32_t oldVersion = readVersionFromDisk(versionFileName);
1070  LOG(INFO) << "Storage version file `" << versionFilePath
1071  << "` already exists, its current version is " << oldVersion;
1072  versionFile = open(versionFilePath);
1073  } else {
1074  versionFile = create(versionFilePath, sizeof(int32_t));
1075  }
1076  write(versionFile, 0, sizeof(int32_t), (int8_t*)&version);
1077  int32_t status = fflush(versionFile);
1078  if (status != 0) {
1079  LOG(FATAL) << "Could not flush version file " << versionFilePath << " to disk";
1080  }
1081 #ifdef __APPLE__
1082  status = fcntl(fileno(epochFile_), 51);
1083 #else
1084  status = heavyai::fsync(fileno(versionFile));
1085 #endif
1086  if (status != 0) {
1087  LOG(FATAL) << "Could not sync version file " << versionFilePath << " to disk";
1088  }
1089  close(versionFile);
1090 }
1091 
1093  const std::string versionFilePath(fileMgrBasePath_ + "/" + FILE_MGR_VERSION_FILENAME);
1094  LOG(INFO) << "Migrating file format version from 0 to 1 for `" << versionFilePath;
1099  int32_t migrationCompleteVersion = 1;
1100  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migrationCompleteVersion);
1101 }
1102 
1104  LOG(INFO) << "Migrating file format version from 1 to 2";
1106  constexpr int32_t migration_complete_version{2};
1107  writeAndSyncVersionToDisk(FILE_MGR_VERSION_FILENAME, migration_complete_version);
1108 }
1109 
1110 void FileMgr::renameAndSymlinkLegacyFiles(const std::string& table_data_dir) {
1111  std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
1112  for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
1113  it++) {
1114  const auto old_path = boost::filesystem::canonical(it->path());
1115  if (boost::filesystem::is_regular_file(it->status()) &&
1116  old_path.extension().string() == kLegacyDataFileExtension) {
1117  auto new_path = old_path;
1118  new_path.replace_extension(DATA_FILE_EXT);
1119  old_to_new_paths[old_path] = new_path;
1120  }
1121  }
1122  for (const auto& [old_path, new_path] : old_to_new_paths) {
1123  boost::filesystem::rename(old_path, new_path);
1124  LOG(INFO) << "Rebrand migration: Renamed " << old_path << " to " << new_path;
1125  boost::filesystem::create_symlink(new_path.filename(), old_path);
1126  LOG(INFO) << "Rebrand migration: Added symlink from " << old_path << " to "
1127  << new_path.filename();
1128  }
1129 }
1130 
1134  fileMgrVersion_ = 0;
1136  } else if (fileMgrVersion_ > latestFileMgrVersion_) {
1137  LOG(FATAL)
1138  << "Table storage forward compatibility is not supported. Version of HeavyDB "
1139  "software used is older than the version of table being read: "
1140  << fileMgrVersion_;
1141  }
1142 
1145  switch (fileMgrVersion_) {
1146  case 0: {
1148  break;
1149  }
1150  case 1: {
1152  break;
1153  }
1154  default: {
1155  UNREACHABLE();
1156  }
1157  }
1158  fileMgrVersion_++;
1159  }
1160  }
1161 }
1162 
1163 /*
1164  * @brief sets the epoch to a user-specified value
1165  *
1166  * With the introduction of optional capped history on files, the possibility of
1167  * multiple successive rollbacks to earlier epochs means that we cannot rely on
1168  * the maxRollbackEpochs_ variable alone (initialized from a value stored in Catalog) to
1169  * guarantee that we can set an epoch at any given value. This function checks the
1170  * user-specified epoch value to ensure it is both not higher than the last checkpointed
1171  * epoch AND it is >= the epoch floor, which on an uncapped value will be the epoch the
1172  * table was created at (default 0), and for capped tables, the lowest epoch for
1173  * which we have complete data, now materialized in the epoch metadata itself. */
1174 
1175 void FileMgr::setEpoch(const int32_t newEpoch) {
1176  if (newEpoch < epoch_.floor()) {
1177  std::stringstream error_message;
1178  error_message << "Cannot set epoch for " << describeSelf()
1179  << " lower than the minimum rollback epoch (" << epoch_.floor() << ").";
1180  throw std::runtime_error(error_message.str());
1181  }
1182  epoch_.ceiling(newEpoch);
1184 }
1185 
1186 void FileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
1187  std::unique_lock<mapd_shared_mutex> lock(mutex_free_page_);
1188  free_pages_.push_back(page);
1189 }
1190 
1191 void FileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t table_id) {
1192  UNREACHABLE();
1193 }
1194 
1200 void FileMgr::resumeFileCompaction(const std::string& status_file_name) {
1201  if (status_file_name == COPY_PAGES_STATUS) {
1202  // Delete status file and restart data compaction process
1203  auto file_path = getFilePath(status_file_name);
1204  CHECK(boost::filesystem::exists(file_path));
1205  boost::filesystem::remove(file_path);
1206  compactFiles();
1207  } else if (status_file_name == UPDATE_PAGE_VISIBILITY_STATUS) {
1208  // Execute second and third phases of data compaction
1209  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1210  auto page_mappings = readPageMappingsFromStatusFile();
1211  updateMappedPagesVisibility(page_mappings);
1213  deleteEmptyFiles();
1214  } else if (status_file_name == DELETE_EMPTY_FILES_STATUS) {
1215  // Execute last phase of data compaction
1216  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1217  deleteEmptyFiles();
1218  } else {
1219  UNREACHABLE() << "Unexpected status file name: " << status_file_name;
1220  }
1221 }
1222 
1251  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
1252  if (files_.empty()) {
1253  return;
1254  }
1255 
1256  auto copy_pages_status_file_path = getFilePath(COPY_PAGES_STATUS);
1257  CHECK(!boost::filesystem::exists(copy_pages_status_file_path));
1258  std::ofstream status_file(copy_pages_status_file_path.string(),
1259  std::ios::out | std::ios::binary);
1260  status_file.close();
1261 
1262  std::vector<PageMapping> page_mappings;
1263  std::set<Page> touched_pages;
1264  std::set<size_t> page_sizes;
1265  for (auto [file_id, file_info] : files_) {
1266  page_sizes.emplace(file_info->pageSize);
1267  }
1268  for (auto page_size : page_sizes) {
1269  sortAndCopyFilePagesForCompaction(page_size, page_mappings, touched_pages);
1270  }
1271 
1272  writePageMappingsToStatusFile(page_mappings);
1274 
1275  updateMappedPagesVisibility(page_mappings);
1277 
1278  deleteEmptyFiles();
1279 }
1280 
1288  std::vector<PageMapping>& page_mappings,
1289  std::set<Page>& touched_pages) {
1290  std::vector<FileInfo*> sorted_file_infos;
1291  auto range = fileIndex_.equal_range(page_size);
1292  for (auto it = range.first; it != range.second; it++) {
1293  sorted_file_infos.emplace_back(files_.at(it->second));
1294  }
1295  if (sorted_file_infos.empty()) {
1296  return;
1297  }
1298 
1299  // Sort file infos in ascending order of free pages count i.e. from files with
1300  // the least number of free pages to those with the highest number of free pages.
1301  std::sort(sorted_file_infos.begin(),
1302  sorted_file_infos.end(),
1303  [](const FileInfo* file_1, const FileInfo* file_2) {
1304  return file_1->freePages.size() < file_2->freePages.size();
1305  });
1306 
1307  size_t destination_index = 0, source_index = sorted_file_infos.size() - 1;
1308 
1309  // For page copy destinations, skip files without free pages.
1310  while (destination_index < source_index &&
1311  sorted_file_infos[destination_index]->freePages.empty()) {
1312  destination_index++;
1313  }
1314 
1315  // For page copy sources, skip files with only free pages.
1316  while (destination_index < source_index &&
1317  sorted_file_infos[source_index]->freePages.size() ==
1318  sorted_file_infos[source_index]->numPages) {
1319  source_index--;
1320  }
1321 
1322  std::set<size_t> source_used_pages;
1323  CHECK(destination_index <= source_index);
1324 
1325  // Get the total number of free pages available for compaction
1326  int64_t total_free_pages{0};
1327  for (size_t i = destination_index; i <= source_index; i++) {
1328  total_free_pages += sorted_file_infos[i]->numFreePages();
1329  }
1330 
1331  while (destination_index < source_index) {
1332  if (source_used_pages.empty()) {
1333  // Populate source_used_pages with only used pages in the source file.
1334  auto source_file_info = sorted_file_infos[source_index];
1335  auto& free_pages = source_file_info->freePages;
1336  for (size_t page_num = 0; page_num < source_file_info->numPages; page_num++) {
1337  if (free_pages.find(page_num) == free_pages.end()) {
1338  source_used_pages.emplace(page_num);
1339  }
1340  }
1341 
1342  // Free pages of current source file will not be copy destinations
1343  total_free_pages -= source_file_info->numFreePages();
1344  }
1345 
1346  // Exit early if there are not enough free pages to empty the next file
1347  if (total_free_pages - static_cast<int64_t>(source_used_pages.size()) < 0) {
1348  return;
1349  }
1350 
1351  // Copy pages from source files to destination files
1352  auto dest_file_info = sorted_file_infos[destination_index];
1353  while (!source_used_pages.empty() && !dest_file_info->freePages.empty()) {
1354  // Get next page to copy
1355  size_t source_page_num = *source_used_pages.begin();
1356  source_used_pages.erase(source_page_num);
1357 
1358  Page source_page{sorted_file_infos[source_index]->fileId, source_page_num};
1359  copySourcePageForCompaction(source_page,
1360  sorted_file_infos[destination_index],
1361  page_mappings,
1362  touched_pages);
1363  total_free_pages--;
1364  }
1365 
1366  if (source_used_pages.empty()) {
1367  source_index--;
1368  }
1369 
1370  if (dest_file_info->freePages.empty()) {
1371  destination_index++;
1372  }
1373  }
1374 }
1375 
1383  FileInfo* destination_file_info,
1384  std::vector<PageMapping>& page_mappings,
1385  std::set<Page>& touched_pages) {
1386  size_t destination_page_num = destination_file_info->getFreePage();
1387  CHECK_NE(destination_page_num, static_cast<size_t>(-1));
1388  Page destination_page{destination_file_info->fileId, destination_page_num};
1389 
1390  // Assert that the same pages are not copied or overridden multiple times
1391  CHECK(touched_pages.find(source_page) == touched_pages.end());
1392  touched_pages.emplace(source_page);
1393 
1394  CHECK(touched_pages.find(destination_page) == touched_pages.end());
1395  touched_pages.emplace(destination_page);
1396 
1397  auto header_size = copyPageWithoutHeaderSize(source_page, destination_page);
1398  page_mappings.emplace_back(static_cast<size_t>(source_page.fileId),
1399  source_page.pageNum,
1400  header_size,
1401  static_cast<size_t>(destination_page.fileId),
1402  destination_page.pageNum);
1403 }
1404 
1412 int32_t FileMgr::copyPageWithoutHeaderSize(const Page& source_page,
1413  const Page& destination_page) {
1414  FileInfo* source_file_info = files_.at(source_page.fileId);
1415  CHECK(source_file_info);
1416  CHECK_EQ(source_file_info->fileId, source_page.fileId);
1417 
1418  FileInfo* destination_file_info = files_.at(destination_page.fileId);
1419  CHECK(destination_file_info);
1420  CHECK_EQ(destination_file_info->fileId, destination_page.fileId);
1421  CHECK_EQ(source_file_info->pageSize, destination_file_info->pageSize);
1422 
1423  auto page_size = source_file_info->pageSize;
1424  auto buffer = std::make_unique<int8_t[]>(page_size);
1425  size_t bytes_read =
1426  source_file_info->read(source_page.pageNum * page_size, page_size, buffer.get());
1427  CHECK_EQ(page_size, bytes_read);
1428 
1429  auto header_size_offset = sizeof(int32_t);
1430  size_t bytes_written = destination_file_info->write(
1431  (destination_page.pageNum * page_size) + header_size_offset,
1432  page_size - header_size_offset,
1433  buffer.get() + header_size_offset);
1434  CHECK_EQ(page_size - header_size_offset, bytes_written);
1435  return reinterpret_cast<int32_t*>(buffer.get())[0];
1436 }
1437 
1442 void FileMgr::updateMappedPagesVisibility(const std::vector<PageMapping>& page_mappings) {
1443  for (const auto& page_mapping : page_mappings) {
1444  auto destination_file = files_.at(page_mapping.destination_file_id);
1445 
1446  // Set destination page header size
1447  auto header_size = page_mapping.source_page_header_size;
1448  CHECK_GT(header_size, 0);
1449  destination_file->write(
1450  page_mapping.destination_page_num * destination_file->pageSize,
1451  sizeof(PageHeaderSizeType),
1452  reinterpret_cast<int8_t*>(&header_size));
1453  auto source_file = files_.at(page_mapping.source_file_id);
1454 
1455  // Free source page
1456  PageHeaderSizeType free_page_header_size{0};
1457  source_file->write(page_mapping.source_page_num * source_file->pageSize,
1458  sizeof(PageHeaderSizeType),
1459  reinterpret_cast<int8_t*>(&free_page_header_size));
1460  source_file->freePageDeferred(page_mapping.source_page_num);
1461  }
1462 
1463  for (auto file_info_entry : files_) {
1464  int32_t status = file_info_entry.second->syncToDisk();
1465  if (status != 0) {
1466  LOG(FATAL) << "Could not sync file to disk";
1467  }
1468  }
1469 }
1470 
1476  for (auto [file_id, file_info] : files_) {
1477  CHECK_EQ(file_id, file_info->fileId);
1478  if (file_info->freePages.size() == file_info->numPages) {
1479  fclose(file_info->f);
1480  file_info->f = nullptr;
1481  auto file_path = get_data_file_path(fileMgrBasePath_, file_id, file_info->pageSize);
1482  boost::filesystem::remove(get_legacy_data_file_path(file_path));
1483  boost::filesystem::remove(file_path);
1484  }
1485  }
1486 
1487  auto status_file_path = getFilePath(DELETE_EMPTY_FILES_STATUS);
1488  CHECK(boost::filesystem::exists(status_file_path));
1489  boost::filesystem::remove(status_file_path);
1490 }
1491 
1498  const std::vector<PageMapping>& page_mappings) {
1499  auto file_path = getFilePath(COPY_PAGES_STATUS);
1500  CHECK(boost::filesystem::exists(file_path));
1501  CHECK(boost::filesystem::is_empty(file_path));
1502  std::ofstream status_file{file_path.string(), std::ios::out | std::ios::binary};
1503  int64_t page_mappings_count = page_mappings.size();
1504  status_file.write(reinterpret_cast<const char*>(&page_mappings_count), sizeof(int64_t));
1505  status_file.write(reinterpret_cast<const char*>(page_mappings.data()),
1506  page_mappings_count * sizeof(PageMapping));
1507  status_file.close();
1508 }
1509 
1513 std::vector<PageMapping> FileMgr::readPageMappingsFromStatusFile() {
1514  auto file_path = getFilePath(UPDATE_PAGE_VISIBILITY_STATUS);
1515  CHECK(boost::filesystem::exists(file_path));
1516  std::ifstream status_file{file_path.string(),
1517  std::ios::in | std::ios::binary | std::ios::ate};
1518  CHECK(status_file.is_open());
1519  size_t file_size = status_file.tellg();
1520  status_file.seekg(0, std::ios::beg);
1521  CHECK_GE(file_size, sizeof(int64_t));
1522 
1523  int64_t page_mappings_count;
1524  status_file.read(reinterpret_cast<char*>(&page_mappings_count), sizeof(int64_t));
1525  auto page_mappings_byte_size = file_size - sizeof(int64_t);
1526  CHECK_EQ(page_mappings_byte_size % sizeof(PageMapping), static_cast<size_t>(0));
1527  CHECK_EQ(static_cast<size_t>(page_mappings_count),
1528  page_mappings_byte_size / sizeof(PageMapping));
1529 
1530  std::vector<PageMapping> page_mappings(page_mappings_count);
1531  status_file.read(reinterpret_cast<char*>(page_mappings.data()),
1532  page_mappings_byte_size);
1533  status_file.close();
1534  return page_mappings;
1535 }
1536 
1540 void FileMgr::renameCompactionStatusFile(const char* const from_status,
1541  const char* const to_status) {
1542  auto from_status_file_path = getFilePath(from_status);
1543  auto to_status_file_path = getFilePath(to_status);
1544  CHECK(boost::filesystem::exists(from_status_file_path));
1545  CHECK(!boost::filesystem::exists(to_status_file_path));
1546  boost::filesystem::rename(from_status_file_path, to_status_file_path);
1547 }
1548 
1549 // Methods that enable override of number of pages per data/metadata files
1550 // for use in unit tests.
1551 void FileMgr::setNumPagesPerDataFile(size_t num_pages) {
1552  num_pages_per_data_file_ = num_pages;
1553 }
1554 
1555 void FileMgr::setNumPagesPerMetadataFile(size_t num_pages) {
1556  num_pages_per_metadata_file_ = num_pages;
1557 }
1558 
1560  mapd_shared_lock<mapd_shared_mutex> files_read_lock(files_rw_mutex_);
1561  for (auto file_info_entry : files_) {
1562  int32_t status = file_info_entry.second->syncToDisk();
1563  CHECK(status == 0) << "Could not sync file to disk";
1564  }
1565 }
1566 
1567 void FileMgr::initializeNumThreads(size_t num_reader_threads) {
1568  // # of threads is based on # of cores on the host
1569  size_t num_hardware_based_threads = std::thread::hardware_concurrency();
1570  if (num_reader_threads == 0 || num_reader_threads > num_hardware_based_threads) {
1571  // # of threads has not been defined by user
1572  num_reader_threads_ = num_hardware_based_threads;
1573  } else {
1574  num_reader_threads_ = num_reader_threads;
1575  }
1576 }
1577 
1579  mapd_unique_lock<mapd_shared_mutex> free_pages_write_lock(mutex_free_page_);
1580  for (auto& free_page : free_pages_) {
1581  free_page.first->freePageDeferred(free_page.second);
1582  }
1583  free_pages_.clear();
1584 }
1585 
1586 FileBuffer* FileMgr::allocateBuffer(const size_t page_size,
1587  const ChunkKey& key,
1588  const size_t num_bytes) {
1589  return new FileBuffer(this, page_size, key, num_bytes);
1590 }
1591 
1593  const ChunkKey& key,
1594  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
1595  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
1596  return new FileBuffer(this, key, headerStartIt, headerEndIt);
1597 }
1598 
1599 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
1601  ChunkKey& chunk_key,
1602  int32_t contingent,
1603  int32_t page_epoch,
1604  int32_t page_num) {
1605  // If the parent FileMgr has a fileMgrKey, then all keys are locked to one table and
1606  // can be set from the manager.
1607  auto [db_id, tb_id] = get_fileMgrKey();
1608  chunk_key[CHUNK_KEY_DB_IDX] = db_id;
1609  chunk_key[CHUNK_KEY_TABLE_IDX] = tb_id;
1610 
1611  auto table_epoch = epoch(db_id, tb_id);
1612 
1613  if (is_page_deleted_with_checkpoint(table_epoch, page_epoch, contingent)) {
1614  file_info->freePageImmediate(page_num);
1615  return true;
1616  }
1617 
1618  // Recover page if it was deleted but not checkpointed.
1619  if (is_page_deleted_without_checkpoint(table_epoch, page_epoch, contingent)) {
1620  file_info->recoverPage(chunk_key, page_num);
1621  }
1622  return false;
1623 }
1624 
1626  FileBuffer* buf;
1627  mapd_unique_lock<mapd_shared_mutex> chunkIndexWriteLock(chunkIndexMutex_);
1628  auto chunk_it = chunkIndex_.find(key);
1629  if (chunk_it == chunkIndex_.end()) {
1630  buf = createBufferUnlocked(key);
1631  } else {
1632  buf = getBufferUnlocked(key);
1633  }
1634  return buf;
1635 }
1636 
1638  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
1639  for (auto [key, buf] : chunkIndex_) {
1640  if (buf->isDirty()) {
1641  buf->writeMetadata(epoch());
1642  buf->clearDirtyBits();
1643  }
1644  }
1645 }
1646 
1648  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
1649  return chunkIndex_.size();
1650 }
1651 
1652 boost::filesystem::path FileMgr::getFilePath(const std::string& file_name) const {
1653  return boost::filesystem::path(fileMgrBasePath_) / file_name;
1654 }
1655 
1656 size_t FileMgr::num_pages_per_data_file_{DEFAULT_NUM_PAGES_PER_DATA_FILE};
1657 size_t FileMgr::num_pages_per_metadata_file_{DEFAULT_NUM_PAGES_PER_METADATA_FILE};
1658 } // namespace File_Namespace
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:606
std::vector< PageMapping > readPageMappingsFromStatusFile()
Definition: FileMgr.cpp:1513
virtual FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
Definition: FileMgr.cpp:725
int32_t readVersionFromDisk(const std::string &versionFileName) const
Definition: FileMgr.cpp:1046
#define CHECK_EQ(x, y)
Definition: Logger.h:231
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
OpenFilesResult openFiles()
Definition: FileMgr.cpp:189
AbstractBuffer * alloc(const size_t numBytes) override
Definition: FileMgr.cpp:857
size_t write(const size_t offset, const size_t size, const int8_t *buf)
Definition: FileInfo.cpp:60
TablePair fileMgrKey_
Global FileMgr.
Definition: FileMgr.h:525
std::string getBasePath() const
int8_t * storage_ptr()
Definition: Epoch.h:61
mapd_shared_mutex mutex_free_page_
Definition: FileMgr.h:414
void createEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:595
bool is_page_deleted_without_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:279
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
Definition: FileMgr.cpp:866
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
Definition: FileMgr.cpp:1012
void syncEncoder(const AbstractBuffer *src_buffer)
std::string get_legacy_data_file_path(const std::string &new_data_file_path)
Definition: File.cpp:51
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: FileMgr.cpp:704
void sortAndCopyFilePagesForCompaction(size_t page_size, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1287
std::vector< HeaderInfo > header_infos
Definition: FileMgr.h:122
virtual FileBuffer * allocateBuffer(const size_t page_size, const ChunkKey &key, const size_t num_bytes=0)
Definition: FileMgr.cpp:1586
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:217
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
void copySourcePageForCompaction(const Page &source_page, FileInfo *destination_file_info, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
Definition: FileMgr.cpp:1382
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
Definition: FileMgr.cpp:862
virtual MemoryLevel getType() const =0
void freePageImmediate(int32_t page_num)
Definition: FileInfo.cpp:242
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
Definition: FileMgr.cpp:573
void setDataAndMetadataFileStats(StorageStats &storage_stats) const
Definition: FileMgr.cpp:338
void migrateToLatestFileMgrVersion()
Definition: FileMgr.cpp:1131
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
Definition: FileMgr.cpp:675
#define UNREACHABLE()
Definition: Logger.h:267
#define DATA_FILE_EXT
Definition: File.h:25
static constexpr char const * UPDATE_PAGE_VISIBILITY_STATUS
Definition: FileMgr.h:376
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:236
void init(const size_t num_reader_threads, const int32_t epochOverride)
Definition: FileMgr.cpp:249
TypeR::rep timer_stop(Type clock_begin)
Definition: measure.h:48
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: FileMgr.cpp:741
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:334
GlobalFileMgr * gfm_
Definition: FileMgr.h:524
bool is_compaction_status_file(const std::string &file_name)
Definition: FileMgr.cpp:182
std::mutex getPageMutex_
pointer to DB level metadata
Definition: FileMgr.h:410
int32_t floor() const
Definition: Epoch.h:43
int32_t ceiling() const
Definition: Epoch.h:44
virtual bool updatePageIfDeleted(FileInfo *file_info, ChunkKey &chunk_key, int32_t contingent, int32_t page_epoch, int32_t page_num)
deletes or recovers a page based on last checkpointed epoch.
Definition: FileMgr.cpp:1600
std::optional< uint64_t > total_free_data_page_count
Definition: FileMgr.h:113
void writePageMappingsToStatusFile(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1497
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:58
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:694
int32_t copyPageWithoutHeaderSize(const Page &source_page, const Page &destination_page)
Definition: FileMgr.cpp:1412
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::string fileMgrBasePath_
Definition: FileMgr.h:397
static void setNumPagesPerMetadataFile(size_t num_pages)
Definition: FileMgr.cpp:1555
int32_t lastCheckpointedEpoch() const
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:300
constexpr double f
Definition: Utm.h:31
std::string to_string(char const *&&v)
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:57
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: FileMgr.cpp:786
std::string show_chunk(const ChunkKey &key)
Definition: types.h:99
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:150
static size_t num_pages_per_data_file_
Definition: FileMgr.h:418
const int32_t latestFileMgrVersion_
Definition: FileMgr.h:408
int32_t PageHeaderSizeType
Definition: FileMgr.h:128
int32_t db_version_
the index of the next file id
Definition: FileMgr.h:405
std::set< size_t > freePages
Definition: FileInfo.h:62
future< Result > async(Fn &&fn, Args &&...args)
size_t pageSize
file stream object for the represented file
Definition: FileInfo.h:59
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
Definition: FileMgr.cpp:1064
size_t getNumChunks() override
Definition: FileMgr.cpp:1647
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:142
int32_t incrementEpoch()
Definition: FileMgr.h:284
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
Definition: FileMgr.cpp:1191
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:408
FileBuffer * getOrCreateBuffer(const ChunkKey &key)
Definition: FileMgr.cpp:1625
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:45
#define CHECK_NE(x, y)
Definition: Logger.h:232
static void setNumPagesPerDataFile(size_t num_pages)
Definition: FileMgr.cpp:1551
std::optional< uint64_t > total_free_metadata_page_count
Definition: FileMgr.h:109
int32_t fileId
Definition: Page.h:47
int32_t getDBVersion() const
Index for looking up chunks.
Definition: FileMgr.cpp:1022
string version
Definition: setup.in.py:73
void freePagesBeforeEpoch(const int32_t min_epoch)
Definition: FileMgr.cpp:661
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:329
PageSizeFileMMap fileIndex_
Maps page sizes to FileInfo objects.
Definition: FileMgr.h:401
virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const bool purge=true)
Definition: FileMgr.cpp:749
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
uint64_t total_metadata_page_count
Definition: FileMgr.h:108
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
Definition: FileMgr.cpp:979
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
Definition: FileMgr.h:415
static constexpr char FILE_MGR_VERSION_FILENAME[]
Definition: FileMgr.h:389
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Number of threads used when loading data.
Definition: FileMgr.h:402
static size_t num_pages_per_metadata_file_
Definition: FileMgr.h:419
bool isBufferOnDevice(const ChunkKey &key) override
Definition: FileMgr.cpp:736
uint32_t getFragmentCount() const
Definition: FileMgr.cpp:399
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
Definition: FileMgr.cpp:935
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:647
boost::filesystem::path getFilePath(const std::string &file_name) const
Definition: FileMgr.cpp:1652
size_t pageNum
unique identifier of the owning file
Definition: Page.h:48
std::string compaction_status_file_name
Definition: FileMgr.h:124
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:759
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: FileMgr.cpp:991
const TablePair get_fileMgrKey() const
Definition: FileMgr.h:341
#define CHECK_LT(x, y)
Definition: Logger.h:233
static constexpr char EPOCH_FILENAME[]
Definition: FileMgr.h:387
virtual std::string describeSelf() const
Definition: FileMgr.cpp:688
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:403
virtual FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
Definition: FileMgr.cpp:714
int32_t maxRollbackEpochs_
Definition: FileMgr.h:396
void freePagesBeforeEpochUnlocked(const int32_t min_epoch, const ChunkKeyToChunkMap::iterator lower_bound, const ChunkKeyToChunkMap::iterator upper_bound)
Definition: FileMgr.cpp:666
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
uint64_t total_metadata_file_size
Definition: FileMgr.h:107
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
Definition: Page.h:102
void openAndReadEpochFile(const std::string &epochFileName)
Definition: FileMgr.cpp:627
size_t read(const size_t offset, const size_t size, int8_t *buf)
Definition: FileInfo.cpp:66
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
Definition: FileInfo.cpp:267
StorageStats getStorageStats() const
Definition: FileMgr.cpp:329
bool is_metadata_file(size_t file_size, size_t page_size, size_t num_pages_per_metadata_file)
Definition: FileMgr.cpp:321
bool hasChunkMetadataForKeyPrefix(const ChunkKey &keyPrefix)
Definition: FileMgr.cpp:985
static size_t byte_size()
Definition: Epoch.h:63
static constexpr char const * DELETE_EMPTY_FILES_STATUS
Definition: FileMgr.h:377
~FileMgr() override
Destructor.
Definition: FileMgr.cpp:106
bool getDBConvert() const
Definition: FileMgr.cpp:1026
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
Definition: FileMgr.cpp:1110
virtual void free_page(std::pair< FileInfo *, int32_t > &&page)
Definition: FileMgr.cpp:1186
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
virtual void closeRemovePhysical()
Definition: FileMgr.cpp:566
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:107
static constexpr char const * COPY_PAGES_STATUS
Definition: FileMgr.h:375
int fsync(int fd)
Definition: omnisci_fs.cpp:60
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator) const
Definition: FileMgr.cpp:146
int32_t epoch() const
Definition: FileMgr.h:518
void openExistingFile(std::vector< HeaderInfo > &headerVec)
Definition: FileInfo.cpp:71
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
Definition: FileMgr.cpp:951
std::map< int32_t, FileInfo * > files_
Definition: FileMgr.h:400
void updateMappedPagesVisibility(const std::vector< PageMapping > &page_mappings)
Definition: FileMgr.cpp:1442
#define CHECK(condition)
Definition: Logger.h:223
std::optional< uint32_t > fragment_count
Definition: FileMgr.h:114
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
Definition: FileMgr.cpp:774
void recoverPage(const ChunkKey &chunk_key, int32_t page_num)
Definition: FileInfo.cpp:255
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
Definition: File.cpp:128
virtual FileBuffer * getBufferUnlocked(const ChunkKey &key, const size_t numBytes=0) const
Definition: FileMgr.cpp:779
void setEpoch(const int32_t newEpoch)
Definition: FileMgr.cpp:1175
bool coreInit()
Determines file path, and if exists, runs file migration and opens and reads epoch file...
Definition: FileMgr.cpp:126
FileInfo * getFileInfoForFileId(const int32_t fileId) const
Definition: FileMgr.h:225
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1567
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
Definition: FileMgr.cpp:890
mapd_unique_lock< mapd_shared_mutex > write_lock
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:92
std::string get_data_file_path(const std::string &base_path, int file_id, size_t page_size)
Definition: File.cpp:44
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
Definition: File.cpp:227
int32_t epochFloor() const
Definition: FileMgr.h:282
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:411
void renameCompactionStatusFile(const char *const from_status, const char *const to_status)
Definition: FileMgr.cpp:1540
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:412
The MultiPage stores versions of the same logical page in a deque.
Definition: Page.h:79
constexpr auto kLegacyDataFileExtension
Definition: File.h:36
static constexpr char LEGACY_EPOCH_FILENAME[]
Definition: FileMgr.h:386
A selection of helper methods for File I/O.
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
Definition: FileMgr.cpp:805
#define VLOG(n)
Definition: Logger.h:317
Type timer_start()
Definition: measure.h:42
void resumeFileCompaction(const std::string &status_file_name)
Definition: FileMgr.cpp:1200
static constexpr char DB_META_FILENAME[]
Definition: FileMgr.h:388
static constexpr int32_t INVALID_VERSION
Definition: FileMgr.h:390