34 #include <boost/filesystem.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/system/error_code.hpp>
45 namespace File_Namespace {
47 FileMgr::FileMgr(
const int32_t device_id,
50 const int32_t max_rollback_epochs,
51 const size_t num_reader_threads,
53 : AbstractBufferMgr(device_id)
54 , maxRollbackEpochs_(max_rollback_epochs)
57 , fileMgrKey_(file_mgr_key)
58 , page_size_(gfm->getPageSize())
59 , metadata_page_size_(gfm->getMetadataPageSize()) {
60 init(num_reader_threads, epoch);
67 const bool run_core_init)
68 : AbstractBufferMgr(device_id)
69 , maxRollbackEpochs_(-1)
72 , fileMgrKey_(file_mgr_key)
73 , page_size_(gfm->getPageSize())
74 , metadata_page_size_(gfm->getMetadataPageSize()) {
75 const std::string fileMgrDirPrefix(
"table");
76 const std::string fileMgrDirDelim(
"_");
88 : AbstractBufferMgr(0)
89 , maxRollbackEpochs_(-1)
90 , fileMgrBasePath_(base_path)
94 , page_size_(gfm->getPageSize())
95 , metadata_page_size_(gfm->getMetadataPageSize()) {
101 : AbstractBufferMgr(-1)
109 : AbstractBufferMgr(0)
110 , page_size_(page_size)
111 , metadata_page_size_(metadata_page_size) {}
116 delete chunkIt->second;
118 for (
auto file_info_entry :
files_) {
119 delete file_info_entry.second;
135 const std::string fileMgrDirPrefix(
"table");
136 const std::string FileMgrDirDelim(
"_");
141 if (boost::filesystem::exists(path)) {
142 if (!boost::filesystem::is_directory(path)) {
144 <<
"' for table data is not a directory.";
154 const boost::filesystem::directory_iterator& fileIterator)
const {
157 fileMetadata.
file_path = fileIterator->path().string();
158 if (!boost::filesystem::is_regular_file(fileIterator->status())) {
163 std::string extension(fileIterator->path().extension().string());
165 std::string fileStem(fileIterator->path().stem().string());
167 if (fileStem.size() > 0 && fileStem.back() ==
'.') {
168 fileStem = fileStem.substr(0, fileStem.size() - 1);
170 size_t dotPos = fileStem.find_last_of(
".");
171 if (dotPos == std::string::npos) {
172 LOG(
FATAL) <<
"File `" << fileIterator->path()
173 <<
"` does not carry page size information in the filename.";
176 fileMetadata.
file_id = boost::lexical_cast<
int>(fileStem.substr(0, dotPos));
178 boost::lexical_cast<
size_t>(fileStem.substr(dotPos + 1, fileStem.size()));
198 boost::filesystem::directory_iterator
202 int32_t file_count = 0;
203 int32_t thread_count = std::thread::hardware_concurrency();
204 std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
206 for (boost::filesystem::directory_iterator file_it(path); file_it != end_itr;
212 std::vector<HeaderInfo> temp_header_vec;
218 return temp_header_vec;
221 if (file_count % thread_count == 0) {
232 if (file_futures.size() > 0) {
236 int64_t queue_time_ms =
timer_stop(clock_begin);
237 LOG(
INFO) <<
"Completed Reading table's file metadata, Elapsed time : " << queue_time_ms
238 <<
"ms Epoch: " <<
epoch_.
ceiling() <<
" files read: " << file_count
244 for (
auto file_info_entry :
files_) {
245 auto file_info = file_info_entry.second;
248 file_info->f =
nullptr;
256 void FileMgr::init(
const size_t num_reader_threads,
const int32_t epochOverride) {
261 if (epochOverride != -1) {
266 if (!open_files_result.compaction_status_file_name.empty()) {
270 CHECK(open_files_result.compaction_status_file_name.empty());
277 auto& header_vec = open_files_result.header_infos;
278 std::sort(header_vec.begin(), header_vec.end());
284 VLOG(3) <<
"Number of Headers in Vector: " << header_vec.size();
285 if (header_vec.size() > 0) {
286 ChunkKey lastChunkKey = header_vec.begin()->chunkKey;
287 auto startIt = header_vec.begin();
289 for (
auto headerIt = header_vec.begin() + 1; headerIt != header_vec.end();
291 if (headerIt->chunkKey != lastChunkKey) {
293 lastChunkKey = headerIt->chunkKey;
306 if (!boost::filesystem::create_directory(path)) {
307 LOG(
FATAL) <<
"Could not create data directory: " << path;
310 if (epochOverride != -1) {
330 size_t metadata_page_size,
331 size_t num_pages_per_metadata_file) {
332 return (file_size == (metadata_page_size * num_pages_per_metadata_file) &&
333 page_size == metadata_page_size);
343 return storage_stats;
351 if (boost::filesystem::exists(path)) {
352 if (!boost::filesystem::is_directory(path)) {
354 <<
"' for table data is not a directory.";
359 boost::filesystem::directory_iterator
361 for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr;
388 for (
const auto& file_info_entry :
files_) {
389 const auto file_info = file_info_entry.second;
396 file_info->pageSize * file_info->numPages;
399 file_info->freePages.size();
412 std::set<int32_t> fragment_ids;
413 for (
const auto& [chunk_key, file_buffer] :
chunkIndex_) {
416 return static_cast<uint32_t
>(fragment_ids.size());
420 std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
421 std::vector<HeaderInfo>& headerVec) {
422 for (
auto& file_future : file_futures) {
426 for (
auto& file_future : file_futures) {
427 auto tempHeaderVec = file_future.get();
428 headerVec.insert(headerVec.end(), tempHeaderVec.begin(), tempHeaderVec.end());
430 file_futures.clear();
434 const int32_t epochOverride) {
435 int32_t converted_data_epoch = 0;
436 boost::filesystem::path path(dataPathToConvertFrom);
437 if (boost::filesystem::exists(path)) {
438 if (!boost::filesystem::is_directory(path)) {
439 LOG(
FATAL) <<
"Specified path `" << path <<
"` is not a directory.";
443 if (epochOverride != -1) {
447 boost::filesystem::directory_iterator
449 int32_t maxFileId = -1;
450 int32_t fileCount = 0;
451 int32_t threadCount = std::thread::hardware_concurrency();
452 std::vector<HeaderInfo> headerVec;
453 std::vector<std::future<std::vector<HeaderInfo>>> file_futures;
454 for (boost::filesystem::directory_iterator fileIt(path); fileIt != endItr; ++fileIt) {
457 maxFileId = std::max(maxFileId, fileMetadata.
file_id);
459 std::vector<HeaderInfo> tempHeaderVec;
465 return tempHeaderVec;
468 if (fileCount % threadCount) {
474 if (file_futures.size() > 0) {
483 std::sort(headerVec.begin(), headerVec.end());
489 if (headerVec.size() > 0) {
490 ChunkKey lastChunkKey = headerVec.begin()->chunkKey;
491 auto startIt = headerVec.begin();
493 for (
auto headerIt = headerVec.begin() + 1; headerIt != headerVec.end();
495 if (headerIt->chunkKey != lastChunkKey) {
500 auto destBuf = c_fm_->
createBuffer(lastChunkKey, srcBuf->pageSize());
502 destBuf->setSize(srcBuf->size());
506 size_t totalNumPages = srcBuf->getMultiPage().size();
507 for (
size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
508 Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
513 multiPage.
push(destPage, converted_data_epoch);
514 destBuf->multiPages_.push_back(multiPage);
515 size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
517 srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
518 destBuf->writeHeader(destPage, pageNum, converted_data_epoch,
false);
520 lastChunkKey = headerIt->chunkKey;
529 auto destBuf = c_fm_->
createBuffer(lastChunkKey, srcBuf->pageSize());
531 destBuf->setSize(srcBuf->size());
535 size_t totalNumPages = srcBuf->getMultiPage().size();
536 for (
size_t pageNum = 0; pageNum < totalNumPages; pageNum++) {
537 Page srcPage = srcBuf->getMultiPage()[pageNum].current().page;
542 multiPage.
push(destPage, converted_data_epoch);
543 destBuf->multiPages_.push_back(multiPage);
544 size_t reservedHeaderSize = srcBuf->reservedHeaderSize();
545 copyPage(srcPage, c_fm_, destPage, reservedHeaderSize, srcBuf->pageDataSize(), 0);
546 destBuf->writeHeader(destPage, pageNum, converted_data_epoch,
false);
551 if (!boost::filesystem::create_directory(path)) {
552 LOG(
FATAL) <<
"Specified path does not exist: " << path;
559 for (
auto& [idx, file_info] :
files_) {
562 file_info->f =
nullptr;
587 const size_t reservedHeaderSize,
588 const size_t numBytes,
589 const size_t offset) {
593 int8_t* buffer =
reinterpret_cast<int8_t*
>(
checked_malloc(numBytes));
595 size_t bytesRead = srcFileInfo->
read(
597 CHECK(bytesRead == numBytes);
598 size_t bytesWritten = destFileInfo->
write(
600 CHECK(bytesWritten == numBytes);
606 if (boost::filesystem::exists(epochFilePath)) {
607 LOG(
FATAL) <<
"Epoch file `" << epochFilePath <<
"` already exists";
617 if (!boost::filesystem::exists(epochFilePath)) {
621 if (!boost::filesystem::is_regular_file(epochFilePath)) {
622 LOG(
FATAL) <<
"Epoch file `" << epochFilePath <<
"` is not a regular file";
625 LOG(
FATAL) <<
"Epoch file `" << epochFilePath
626 <<
"` is not sized properly (current size: "
629 FILE* legacyEpochFile =
open(epochFilePath);
631 read(legacyEpochFile, 0,
sizeof(int32_t), (int8_t*)&epoch, epochFilePath);
632 close(legacyEpochFile);
639 if (!boost::filesystem::exists(epochFilePath)) {
640 LOG(
FATAL) <<
"Epoch file `" << epochFilePath <<
"` does not exist";
642 if (!boost::filesystem::is_regular_file(epochFilePath)) {
643 LOG(
FATAL) <<
"Epoch file `" << epochFilePath <<
"` is not a regular file";
646 LOG(
FATAL) <<
"Epoch file `" << epochFilePath
647 <<
"` is not sized properly (current size: "
660 CHECK(status == 0) <<
"Could not flush epoch file to disk";
666 CHECK(status == 0) <<
"Could not sync epoch file to disk";
676 const int32_t min_epoch,
679 for (
auto chunkIt = lower_bound; chunkIt !=
upper_bound; ++chunkIt) {
680 chunkIt->second->freePagesBeforeEpoch(min_epoch);
690 if (should_checkpoint) {
714 const size_t page_size,
715 const size_t num_bytes) {
718 <<
"Chunk already exists for key: " <<
show_chunk(key);
724 const size_t page_size,
725 const size_t num_bytes) {
726 size_t actual_page_size = page_size;
727 if (actual_page_size == 0) {
736 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
737 const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
740 <<
"Chunk already exists for key: " <<
show_chunk(key);
754 <<
"Chunk does not exist for key: " <<
show_chunk(key);
759 const ChunkKeyToChunkMap::iterator chunk_it,
762 chunk_it->second->freePages();
764 delete chunk_it->second;
775 std::search(chunkIt->first.begin(),
776 chunkIt->first.begin() + keyPrefix.size(),
778 keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
789 const size_t num_bytes)
const {
792 return chunk_it->second;
797 const size_t numBytes) {
801 <<
"Aborting attempt to fetch a chunk marked dirty. Chunk inconsistency for key: "
806 if (numBytes > 0 && numBytes > chunk->
size()) {
808 << chunk->
size() <<
") than number of bytes requested (" << numBytes
811 chunk->
copyTo(destBuffer, numBytes);
816 const size_t numBytes) {
818 size_t oldChunkSize = chunk->size();
820 size_t newChunkSize = (numBytes == 0) ? srcBuffer->
size() : numBytes;
821 if (chunk->isDirty()) {
824 if (srcBuffer->
isUpdated() && chunk->isUpdated()) {
825 LOG(
FATAL) <<
"Aborting attempt to write a chunk marked dirty. Chunk inconsistency "
830 CHECK(srcBuffer->
isDirty()) <<
"putBuffer expects a dirty buffer";
836 if (0 == numBytes && !chunk->isDirty()) {
837 chunk->setSize(newChunkSize);
847 CHECK_LT(oldChunkSize, newChunkSize);
848 chunk->append((int8_t*)srcBuffer->
getMemoryPtr() + oldChunkSize,
849 newChunkSize - oldChunkSize,
856 <<
"Dirty buffer with size > 0 must be marked as isAppended() or isUpdated()";
862 chunk->syncEncoder(srcBuffer);
867 LOG(
FATAL) <<
"Operation not supported";
872 LOG(
FATAL) <<
"Operation not supported";
878 auto candidateFiles =
fileIndex_.equal_range(pageSize);
879 int32_t pageNum = -1;
880 for (
auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
895 CHECK(pageNum != -1);
901 std::vector<Page>& pages,
902 const bool isMetadata) {
906 auto candidateFiles =
fileIndex_.equal_range(pageSize);
907 size_t numPagesNeeded = numPagesRequested;
908 for (
auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
914 pages.emplace_back(fileInfo->
fileId, pageNum);
917 }
while (pageNum != -1 && numPagesNeeded > 0);
918 if (numPagesNeeded == 0) {
922 while (numPagesNeeded > 0) {
933 pages.emplace_back(fileInfo->
fileId, pageNum);
936 }
while (pageNum != -1 && numPagesNeeded > 0);
937 if (numPagesNeeded == 0) {
941 CHECK(pages.size() == numPagesRequested);
946 const size_t pageSize,
947 const size_t numPages,
948 std::vector<HeaderInfo>& headerVec) {
949 FILE*
f =
open(path);
951 this, fileId, f, pageSize, numPages, path,
false);
956 fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
962 if (pageSize == 0 || numPages == 0) {
963 LOG(
FATAL) <<
"File creation failed: pageSize and numPages must be greater than 0.";
977 this, fileId,
f, pageSize, numPages, file_path,
true);
983 fileIndex_.insert(std::pair<size_t, int32_t>(pageSize, fileId));
991 return files_.at(fileId)->f;
996 auto chunk_it =
chunkIndex_.lower_bound(key_prefix);
1001 std::mismatch(key_prefix.begin(), key_prefix.end(), chunk_it->first.begin());
1002 return it_pair.first == key_prefix.end();
1009 auto chunkIt =
chunkIndex_.lower_bound(keyPrefix);
1014 std::search(chunkIt->first.begin(),
1015 chunkIt->first.begin() + keyPrefix.size(),
1017 keyPrefix.end()) != chunkIt->first.begin() + keyPrefix.size()) {
1018 if (chunkIt->second->hasEncoder()) {
1019 auto chunk_metadata = std::make_shared<ChunkMetadata>();
1020 chunkIt->second->encoder_->getMetadata(chunk_metadata);
1021 chunkMetadataVec.emplace_back(chunkIt->first, chunk_metadata);
1031 return chunkIt->second->numMetadataPages();
1033 throw std::runtime_error(
"Chunk was not found.");
1049 LOG(
FATAL) <<
"DB forward compatibility is not supported. Version of HeavyDB "
1050 "software used is older than the version of DB being read: "
1062 const std::string versionFilePath(
fileMgrBasePath_ +
"/" + versionFileName);
1063 if (!boost::filesystem::exists(versionFilePath)) {
1066 if (!boost::filesystem::is_regular_file(versionFilePath)) {
1072 FILE* versionFile =
open(versionFilePath);
1074 read(versionFile, 0,
sizeof(int32_t), (int8_t*)&version, versionFilePath);
1081 const std::string versionFilePath(
fileMgrBasePath_ +
"/" + versionFileName);
1083 if (boost::filesystem::exists(versionFilePath)) {
1085 LOG(
INFO) <<
"Storage version file `" << versionFilePath
1086 <<
"` already exists, its current version is " << oldVersion;
1087 versionFile =
open(versionFilePath);
1089 versionFile =
create(versionFilePath,
sizeof(int32_t));
1091 write(versionFile, 0,
sizeof(int32_t), (int8_t*)&version);
1092 int32_t status = fflush(versionFile);
1094 LOG(
FATAL) <<
"Could not flush version file " << versionFilePath <<
" to disk";
1102 LOG(
FATAL) <<
"Could not sync version file " << versionFilePath <<
" to disk";
1109 LOG(
INFO) <<
"Migrating file format version from 0 to 1 for `" << versionFilePath;
1114 int32_t migrationCompleteVersion = 1;
1119 LOG(
INFO) <<
"Migrating file format version from 1 to 2";
1121 constexpr int32_t migration_complete_version{2};
1126 std::map<boost::filesystem::path, boost::filesystem::path> old_to_new_paths;
1127 for (boost::filesystem::directory_iterator it(table_data_dir), end_it; it != end_it;
1129 const auto old_path = boost::filesystem::canonical(it->path());
1130 if (boost::filesystem::is_regular_file(it->status()) &&
1132 auto new_path = old_path;
1134 old_to_new_paths[old_path] = new_path;
1137 for (
const auto& [old_path, new_path] : old_to_new_paths) {
1138 boost::filesystem::rename(old_path, new_path);
1139 LOG(
INFO) <<
"Rebrand migration: Renamed " << old_path <<
" to " << new_path;
1140 boost::filesystem::create_symlink(new_path.filename(), old_path);
1141 LOG(
INFO) <<
"Rebrand migration: Added symlink from " << old_path <<
" to "
1142 << new_path.filename();
1153 <<
"Table storage forward compatibility is not supported. Version of HeavyDB "
1154 "software used is older than the version of table being read: "
1192 std::stringstream error_message;
1193 error_message <<
"Cannot set epoch for " <<
describeSelf()
1194 <<
" lower than the minimum rollback epoch (" <<
epoch_.
floor() <<
").";
1195 throw std::runtime_error(error_message.str());
1219 CHECK(boost::filesystem::exists(file_path));
1220 boost::filesystem::remove(file_path);
1234 UNREACHABLE() <<
"Unexpected status file name: " << status_file_name;
1272 CHECK(!boost::filesystem::exists(copy_pages_status_file_path));
1273 std::ofstream status_file(copy_pages_status_file_path.string(),
1274 std::ios::out | std::ios::binary);
1275 status_file.close();
1277 std::vector<PageMapping> page_mappings;
1278 std::set<Page> touched_pages;
1279 std::set<size_t> page_sizes;
1280 for (
auto [file_id, file_info] :
files_) {
1281 page_sizes.emplace(file_info->pageSize);
1283 for (
auto page_size : page_sizes) {
1303 std::vector<PageMapping>& page_mappings,
1304 std::set<Page>& touched_pages) {
1305 std::vector<FileInfo*> sorted_file_infos;
1306 auto range =
fileIndex_.equal_range(page_size);
1307 for (
auto it = range.first; it != range.second; it++) {
1308 sorted_file_infos.emplace_back(
files_.at(it->second));
1310 if (sorted_file_infos.empty()) {
1317 sorted_file_infos.end(),
1319 return file_1->
freePages.size() < file_2->freePages.size();
1322 size_t destination_index = 0, source_index = sorted_file_infos.size() - 1;
1325 while (destination_index < source_index &&
1326 sorted_file_infos[destination_index]->
freePages.empty()) {
1327 destination_index++;
1331 while (destination_index < source_index &&
1332 sorted_file_infos[source_index]->
freePages.size() ==
1333 sorted_file_infos[source_index]->numPages) {
1337 std::set<size_t> source_used_pages;
1338 CHECK(destination_index <= source_index);
1341 int64_t total_free_pages{0};
1342 for (
size_t i = destination_index; i <= source_index; i++) {
1343 total_free_pages += sorted_file_infos[i]->numFreePages();
1346 while (destination_index < source_index) {
1347 if (source_used_pages.empty()) {
1349 auto source_file_info = sorted_file_infos[source_index];
1350 auto& free_pages = source_file_info->freePages;
1351 for (
size_t page_num = 0; page_num < source_file_info->numPages; page_num++) {
1352 if (free_pages.find(page_num) == free_pages.end()) {
1353 source_used_pages.emplace(page_num);
1358 total_free_pages -= source_file_info->numFreePages();
1362 if (total_free_pages - static_cast<int64_t>(source_used_pages.size()) < 0) {
1367 auto dest_file_info = sorted_file_infos[destination_index];
1368 while (!source_used_pages.empty() && !dest_file_info->freePages.empty()) {
1370 size_t source_page_num = *source_used_pages.begin();
1371 source_used_pages.erase(source_page_num);
1373 Page source_page{sorted_file_infos[source_index]->
fileId, source_page_num};
1375 sorted_file_infos[destination_index],
1381 if (source_used_pages.empty()) {
1385 if (dest_file_info->freePages.empty()) {
1386 destination_index++;
1399 std::vector<PageMapping>& page_mappings,
1400 std::set<Page>& touched_pages) {
1401 size_t destination_page_num = destination_file_info->
getFreePage();
1402 CHECK_NE(destination_page_num, static_cast<size_t>(-1));
1403 Page destination_page{destination_file_info->
fileId, destination_page_num};
1406 CHECK(touched_pages.find(source_page) == touched_pages.end());
1407 touched_pages.emplace(source_page);
1409 CHECK(touched_pages.find(destination_page) == touched_pages.end());
1410 touched_pages.emplace(destination_page);
1413 page_mappings.emplace_back(static_cast<size_t>(source_page.
fileId),
1416 static_cast<size_t>(destination_page.fileId),
1417 destination_page.pageNum);
1428 const Page& destination_page) {
1430 CHECK(source_file_info);
1434 CHECK(destination_file_info);
1438 auto page_size = source_file_info->
pageSize;
1439 auto buffer = std::make_unique<int8_t[]>(page_size);
1441 source_file_info->
read(source_page.
pageNum * page_size, page_size, buffer.get());
1444 auto header_size_offset =
sizeof(int32_t);
1445 size_t bytes_written = destination_file_info->
write(
1446 (destination_page.
pageNum * page_size) + header_size_offset,
1447 page_size - header_size_offset,
1448 buffer.get() + header_size_offset);
1449 CHECK_EQ(page_size - header_size_offset, bytes_written);
1450 return reinterpret_cast<int32_t*
>(buffer.get())[0];
1458 for (
const auto& page_mapping : page_mappings) {
1459 auto destination_file =
files_.at(page_mapping.destination_file_id);
1462 auto header_size = page_mapping.source_page_header_size;
1464 destination_file->write(
1465 page_mapping.destination_page_num * destination_file->pageSize,
1467 reinterpret_cast<int8_t*>(&header_size));
1468 auto source_file =
files_.at(page_mapping.source_file_id);
1472 source_file->write(page_mapping.source_page_num * source_file->pageSize,
1474 reinterpret_cast<int8_t*>(&free_page_header_size));
1475 source_file->freePageDeferred(page_mapping.source_page_num);
1478 for (
auto file_info_entry :
files_) {
1479 int32_t status = file_info_entry.second->syncToDisk();
1481 LOG(
FATAL) <<
"Could not sync file to disk";
1491 for (
auto [file_id, file_info] :
files_) {
1492 CHECK_EQ(file_id, file_info->fileId);
1493 if (file_info->freePages.size() == file_info->numPages) {
1494 fclose(file_info->f);
1495 file_info->f =
nullptr;
1498 boost::filesystem::remove(file_path);
1503 CHECK(boost::filesystem::exists(status_file_path));
1504 boost::filesystem::remove(status_file_path);
1513 const std::vector<PageMapping>& page_mappings) {
1515 CHECK(boost::filesystem::exists(file_path));
1516 CHECK(boost::filesystem::is_empty(file_path));
1517 std::ofstream status_file{file_path.string(), std::ios::out | std::ios::binary};
1518 int64_t page_mappings_count = page_mappings.size();
1519 status_file.write(reinterpret_cast<const char*>(&page_mappings_count),
sizeof(int64_t));
1520 status_file.write(reinterpret_cast<const char*>(page_mappings.data()),
1522 status_file.close();
1530 CHECK(boost::filesystem::exists(file_path));
1531 std::ifstream status_file{file_path.string(),
1532 std::ios::in | std::ios::binary | std::ios::ate};
1533 CHECK(status_file.is_open());
1535 status_file.seekg(0, std::ios::beg);
1536 CHECK_GE(file_size,
sizeof(int64_t));
1538 int64_t page_mappings_count;
1539 status_file.read(reinterpret_cast<char*>(&page_mappings_count),
sizeof(int64_t));
1540 auto page_mappings_byte_size = file_size -
sizeof(int64_t);
1542 CHECK_EQ(static_cast<size_t>(page_mappings_count),
1545 std::vector<PageMapping> page_mappings(page_mappings_count);
1546 status_file.read(reinterpret_cast<char*>(page_mappings.data()),
1547 page_mappings_byte_size);
1548 status_file.close();
1549 return page_mappings;
1556 const char*
const to_status) {
1557 auto from_status_file_path =
getFilePath(from_status);
1558 auto to_status_file_path =
getFilePath(to_status);
1559 CHECK(boost::filesystem::exists(from_status_file_path));
1560 CHECK(!boost::filesystem::exists(to_status_file_path));
1561 boost::filesystem::rename(from_status_file_path, to_status_file_path);
1576 for (
auto file_info_entry :
files_) {
1577 int32_t status = file_info_entry.second->syncToDisk();
1578 CHECK(status == 0) <<
"Could not sync file to disk";
1584 size_t num_hardware_based_threads = std::thread::hardware_concurrency();
1585 if (num_reader_threads == 0 || num_reader_threads > num_hardware_based_threads) {
1598 free_pages_.clear();
1603 const size_t num_bytes) {
1604 return new FileBuffer(
this, page_size, key, num_bytes);
1609 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
1610 const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
1611 return new FileBuffer(
this, key, headerStartIt, headerEndIt);
1626 auto table_epoch =
epoch(db_id, tb_id);
1655 if (buf->isDirty()) {
1656 buf->writeMetadata(
epoch());
1657 buf->clearDirtyBits();
DEVICE auto upper_bound(ARGS &&...args)
int32_t openAndReadLegacyEpochFile(const std::string &epochFileName)
void createTopLevelMetadata()
const size_t metadata_page_size_
std::vector< PageMapping > readPageMappingsFromStatusFile()
virtual FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &headerStartIt, const std::vector< HeaderInfo >::const_iterator &headerEndIt)
int32_t readVersionFromDisk(const std::string &versionFileName) const
std::vector< int > ChunkKey
OpenFilesResult openFiles()
AbstractBuffer * alloc(const size_t numBytes) override
size_t write(const size_t offset, const size_t size, const int8_t *buf)
TablePair fileMgrKey_
Global FileMgr.
uint64_t total_data_page_count
std::string getBasePath() const
FileMgr(const int32_t device_id, GlobalFileMgr *gfm, const TablePair file_mgr_key, const int32_t max_rollback_epochs=-1, const size_t num_reader_threads=0, const int32_t epoch=-1)
Constructor.
void createEpochFile(const std::string &epochFileName)
bool is_page_deleted_without_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
virtual Page requestFreePage(size_t pagesize, const bool isMetadata)
size_t getNumUsedMetadataPagesForChunkKey(const ChunkKey &chunkKey) const
void syncEncoder(const AbstractBuffer *src_buffer)
std::string get_legacy_data_file_path(const std::string &new_data_file_path)
FileBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
heavyai::shared_lock< heavyai::shared_mutex > read_lock
void sortAndCopyFilePagesForCompaction(size_t page_size, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
uint32_t metadata_file_count
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
std::vector< HeaderInfo > header_infos
virtual FileBuffer * allocateBuffer(const size_t page_size, const ChunkKey &key, const size_t num_bytes=0)
A logical page (Page) belongs to a file on disk.
#define CHUNK_KEY_FRAGMENT_IDX
void copySourcePageForCompaction(const Page &source_page, FileInfo *destination_file_info, std::vector< PageMapping > &page_mappings, std::set< Page > &touched_pages)
virtual int8_t * getMemoryPtr()=0
void free(AbstractBuffer *buffer) override
virtual MemoryLevel getType() const =0
void freePageImmediate(int32_t page_num)
void copyPage(Page &srcPage, FileMgr *destFileMgr, Page &destPage, const size_t reservedHeaderSize, const size_t numBytes, const size_t offset)
void setDataAndMetadataFileStats(StorageStats &storage_stats) const
void migrateToLatestFileMgrVersion()
void rollOffOldData(const int32_t epochCeiling, const bool shouldCheckpoint)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
static constexpr char const * UPDATE_PAGE_VISIBILITY_STATUS
DEVICE void sort(ARGS &&...args)
void init(const size_t num_reader_threads, const int32_t epochOverride)
TypeR::rep timer_stop(Type clock_begin)
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
bool getDBConvert() const
static int64_t min_allowable_epoch()
std::string getFileMgrBasePath() const
bool is_compaction_status_file(const std::string &file_name)
std::mutex getPageMutex_
pointer to DB level metadata
#define DEFAULT_METADATA_PAGE_SIZE
heavyai::unique_lock< heavyai::shared_mutex > write_lock
virtual bool updatePageIfDeleted(FileInfo *file_info, ChunkKey &chunk_key, int32_t contingent, int32_t page_epoch, int32_t page_num)
deletes or recovers a page based on last checkpointed epoch.
std::optional< uint64_t > total_free_data_page_count
void writePageMappingsToStatusFile(const std::vector< PageMapping > &page_mappings)
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Represents/provides access to contiguous data stored in the file system.
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
int32_t copyPageWithoutHeaderSize(const Page &source_page, const Page &destination_page)
int32_t getDBVersion() const
std::string fileMgrBasePath_
static void setNumPagesPerMetadataFile(size_t num_pages)
int32_t lastCheckpointedEpoch() const
Returns value of epoch at last checkpoint.
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
std::string show_chunk(const ChunkKey &key)
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
static size_t num_pages_per_data_file_
const int32_t latestFileMgrVersion_
int32_t PageHeaderSizeType
std::shared_lock< T > shared_lock
int32_t db_version_
the index of the next file id
std::set< size_t > freePages
future< Result > async(Fn &&fn, Args &&...args)
size_t pageSize
file stream object for the represented file
void writeAndSyncVersionToDisk(const std::string &versionFileName, const int32_t version)
size_t getNumChunks() override
void removeTableRelatedDS(const int32_t db_id, const int32_t table_id) override
void processFileFutures(std::vector< std::future< std::vector< HeaderInfo >>> &file_futures, std::vector< HeaderInfo > &headerVec)
FileBuffer * getOrCreateBuffer(const ChunkKey &key)
void * checked_malloc(const size_t size)
static void setNumPagesPerDataFile(size_t num_pages)
std::optional< uint64_t > total_free_metadata_page_count
int32_t getDBVersion() const
Index for looking up chunks.
void freePagesBeforeEpoch(const int32_t min_epoch)
ChunkKeyToChunkMap chunkIndex_
PageSizeFileMMap fileIndex_
A map of files accessible via a file identifier.
std::unique_lock< T > unique_lock
virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it, const bool purge=true)
#define CHUNK_KEY_TABLE_IDX
uint64_t total_metadata_page_count
FILE * getFileForFileId(const int32_t fileId)
Returns FILE pointer associated with requested fileId.
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
static constexpr char FILE_MGR_VERSION_FILENAME[]
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
static size_t num_pages_per_metadata_file_
bool isBufferOnDevice(const ChunkKey &key) override
uint32_t getFragmentCount() const
FileInfo * openExistingFile(const std::string &path, const int32_t fileId, const size_t pageSize, const size_t numPages, std::vector< HeaderInfo > &headerVec)
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf, const std::string &file_path)
Reads the specified number of bytes from the offset position in file f into buf.
void writeAndSyncEpochToDisk()
boost::filesystem::path getFilePath(const std::string &file_name) const
size_t pageNum
unique identifier of the owning file
#define DEFAULT_PAGE_SIZE
std::string compaction_status_file_name
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
heavyai::shared_mutex chunkIndexMutex_
const TablePair get_fileMgrKey() const
static constexpr char EPOCH_FILENAME[]
virtual std::string describeSelf() const
virtual FileBuffer * createBufferUnlocked(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0)
heavyai::shared_mutex mutex_free_page_
int32_t maxRollbackEpochs_
void freePagesBeforeEpochUnlocked(const int32_t min_epoch, const ChunkKeyToChunkMap::iterator lower_bound, const ChunkKeyToChunkMap::iterator upper_bound)
DEVICE auto lower_bound(ARGS &&...args)
uint64_t total_metadata_file_size
void migrateEpochFileV0()
void push(const Page &page, const int epoch)
Pushes a new page with epoch value.
void openAndReadEpochFile(const std::string &epochFileName)
size_t read(const size_t offset, const size_t size, int8_t *buf)
bool epochIsCheckpointed_
bool is_page_deleted_with_checkpoint(int32_t table_epoch, int32_t page_epoch, int32_t contingent)
StorageStats getStorageStats() const
uint64_t total_data_file_size
bool hasChunkMetadataForKeyPrefix(const ChunkKey &keyPrefix)
static size_t byte_size()
static constexpr char const * DELETE_EMPTY_FILES_STATUS
~FileMgr() override
Destructor.
bool getDBConvert() const
static void renameAndSymlinkLegacyFiles(const std::string &table_data_dir)
virtual void free_page(std::pair< FileInfo *, int32_t > &&page)
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
virtual void closeRemovePhysical()
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
static constexpr char const * COPY_PAGES_STATUS
FileMetadata getMetadataForFile(const boost::filesystem::directory_iterator &fileIterator) const
void openExistingFile(std::vector< HeaderInfo > &headerVec)
FileInfo * createFile(const size_t pageSize, const size_t numPages)
Adds a file to the file manager repository.
std::map< int32_t, FileInfo * > files_
void updateMappedPagesVisibility(const std::vector< PageMapping > &page_mappings)
std::optional< uint32_t > fragment_count
FileBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
void recoverPage(const ChunkKey &chunk_key, int32_t page_num)
void close(FILE *f)
Closes the file pointed to by the FILE pointer.
virtual FileBuffer * getBufferUnlocked(const ChunkKey &key, const size_t numBytes=0) const
void setEpoch(const int32_t newEpoch)
bool coreInit()
Determines the file path and, if it exists, runs file migration and opens and reads the epoch file...
FileInfo * getFileInfoForFileId(const int32_t fileId) const
void initializeNumThreads(size_t num_reader_threads=0)
void requestFreePages(size_t npages, size_t pagesize, std::vector< Page > &pages, const bool isMetadata)
Obtains free pages – creates new files if necessary – of the requested size.
heavyai::shared_mutex files_rw_mutex_
void closePhysicalUnlocked()
std::pair< const int32_t, const int32_t > TablePair
std::string get_data_file_path(const std::string &base_path, int file_id, size_t page_size)
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
unsigned nextFileId_
number of threads used when loading data
int32_t epochFloor() const
bool is_metadata_file(size_t file_size, size_t page_size, size_t metadata_page_size, size_t num_pages_per_metadata_file)
size_t file_size(const int fd)
void renameCompactionStatusFile(const char *const from_status, const char *const to_status)
The MultiPage stores versions of the same logical page in a deque.
constexpr auto kLegacyDataFileExtension
static constexpr char LEGACY_EPOCH_FILENAME[]
A selection of helper methods for File I/O.
FileBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
void resumeFileCompaction(const std::string &status_file_name)
static constexpr char DB_META_FILENAME[]
static constexpr int32_t INVALID_VERSION
void migrateLegacyFilesV1()