43 using namespace Data_Namespace;
46 namespace filesystem {
47 class directory_iterator;
51 namespace File_Namespace {
/// Pair of two const int32_t values; it is the type of the fileMgrKey_ member.
/// NOTE(review): presumably (database id, table id) — confirm against the callers that build the key.
91 using TablePair = std::pair<const int32_t, const int32_t>;
// Storage statistics fields. The header of the enclosing aggregate is not part of this
// listing. NOTE(review): presumably these belong to StorageStats, the type passed to
// setDataAndMetadataFileStats() — confirm.
// Plain counters are brace-initialised to 0; the std::optional members start out empty.
104 int32_t epoch_floor{0};
// Metadata files: file count, total size, total page count, and (optionally) free page count.
105 uint32_t metadata_file_count{0};
106 uint64_t total_metadata_file_size{0};
107 uint64_t total_metadata_page_count{0};
108 std::optional<uint64_t> total_free_metadata_page_count{};
// Data files: the same four figures as for the metadata files above.
109 uint32_t data_file_count{0};
110 uint64_t total_data_file_size{0};
111 uint64_t total_data_page_count{0};
112 std::optional<uint64_t> total_free_data_page_count{};
// Optional fragment count; empty until a value is assigned.
113 std::optional<uint32_t> fragment_count{};
133 size_t source_page_num,
135 int32_t destination_file_id,
136 size_t destination_page_num)
137 : source_file_id(source_file_id)
138 , source_page_num(source_page_num)
139 , source_page_header_size(source_page_header_size)
140 , destination_file_id(destination_file_id)
141 , destination_page_num(destination_page_num) {}
/// Constructor declaration.
/// Defaults visible here: max_rollback_epochs = -1, num_reader_threads = 0, epoch = -1.
/// NOTE(review): listing lines 160-161 are absent from this view, so parameters between
/// device_id and max_rollback_epochs may be missing — confirm against the real header.
159 FileMgr(
const int32_t device_id,
162 const int32_t max_rollback_epochs = -1,
163 const size_t num_reader_threads = 0,
164 const int32_t epoch = -1);
/// Constructor overload whose last parameter is the run_core_init flag.
/// NOTE(review): listing lines 168-169 are absent from this view, so parameters between
/// device_id and run_core_init may be missing; the flag presumably controls whether core
/// initialisation runs inside the constructor — confirm.
167 FileMgr(
const int32_t device_id,
170 const bool run_core_init);
181 const size_t numBytes = 0)
override;
/// Override that takes a chunk key and returns a bool.
/// NOTE(review): presumably true when a buffer for `key` is held by this manager — confirm in the implementation.
183 bool isBufferOnDevice(
const ChunkKey& key)
override;
/// Override that deletes the buffer identified by `key`.
/// @param key    chunk key of the buffer.
/// @param purge  defaults to true. NOTE(review): exact purge semantics are not visible here — confirm.
188 void deleteBuffer(
const ChunkKey& key,
const bool purge =
true)
override;
/// Override that deletes buffers selected by a chunk-key prefix rather than by a full key.
/// @param keyPrefix  chunk-key prefix. NOTE(review): presumably every buffer whose key starts with it is removed — confirm.
/// @param purge      defaults to true, matching deleteBuffer().
189 void deleteBuffersWithPrefix(
const ChunkKey& keyPrefix,
190 const bool purge =
true)
override;
/// Override that fetches the buffer identified by `key`; numBytes has no default here.
/// NOTE(review): listing line 196 is absent from this view, so a parameter between `key`
/// and `numBytes` (likely the destination buffer) is probably missing — confirm.
195 void fetchBuffer(
const ChunkKey& key,
197 const size_t numBytes)
override;
207 const size_t numBytes = 0)
override;
/// Virtual; returns a Page by value for the requested page size.
/// @param pagesize    requested page size.
/// @param isMetadata  NOTE(review): presumably selects between metadata and data files — confirm.
212 virtual Page requestFreePage(
size_t pagesize,
const bool isMetadata);
/// Override that is not implemented for this manager: it always returns the
/// literal string "Not Implemented".
216 inline std::string
printSlabs()
override {
return "Not Implemented"; }
223 return files_.at(fileId);
227 const boost::filesystem::directory_iterator& fileIterator)
const;
/// Initialisation overload taking a reader-thread count and an epoch override.
/// NOTE(review): how epochOverride is interpreted is not visible here — confirm.
229 void init(
const size_t num_reader_threads,
const int32_t epochOverride);
/// Initialisation overload taking a data path to convert from instead of a thread count.
/// NOTE(review): presumably used when converting existing data found at that path — confirm.
230 void init(
const std::string& dataPathToConvertFrom,
const int32_t epochOverride);
/// Copies from srcPage; reservedHeaderSize, numBytes and offset parameterise the copy.
/// NOTE(review): listing lines 233-234 are absent from this view, so parameters between
/// srcPage and reservedHeaderSize (likely the destination) are probably missing — confirm.
232 void copyPage(
Page& srcPage,
235 const size_t reservedHeaderSize,
236 const size_t numBytes,
237 const size_t offset);
/// Multi-page counterpart of requestFreePage(): takes a page count and a non-const
/// vector of Page. NOTE(review): `pages` is presumably the output container — confirm.
/// NOTE(review): listing line 253 is absent from this view, so a parameter between npages
/// and pages (likely the page size) is probably missing.
252 void requestFreePages(
size_t npages,
254 std::vector<Page>& pages,
255 const bool isMetadata);
258 const ChunkKey& keyPrefix)
override;
/// Takes a chunk-key prefix and returns a bool.
/// NOTE(review): presumably true when chunk metadata exists for a key with this prefix — confirm.
260 bool hasChunkMetadataForKeyPrefix(
const ChunkKey& keyPrefix);
/// Argument-free checkpoint override. The log message in the (db_id, tb_id) overload
/// names this one as the API to use instead.
267 void checkpoint()
override;
268 void checkpoint(
const int32_t db_id,
const int32_t tb_id)
override {
269 LOG(
FATAL) <<
"Operation not supported, api checkpoint() should be used instead";
/// Returns the current epoch by forwarding to the argument-free epoch(); db_id and
/// tb_id are ignored. Per the listing's own description, this is safe because a FileMgr
/// only contains buffers from one table, so no table-specific lookup is needed.
277 inline virtual int32_t
epoch(int32_t db_id, int32_t tb_id)
const {
return epoch(); }
/// Returns epoch_.floor() converted to int32_t with static_cast.
279 inline int32_t
epochFloor()
const {
return static_cast<int32_t
>(epoch_.floor()); }
282 int32_t newEpoch = epoch_.increment();
283 epochIsCheckpointed_ =
false;
287 LOG(
FATAL) <<
"Epoch for table (" << fileMgrKey_.first <<
", " << fileMgrKey_.second
288 <<
") greater than maximum allowed value of "
298 return epoch() - (epochIsCheckpointed_ ? 0 : 1);
/// Returns the C stdio FILE* associated with the given file id.
/// NOTE(review): behaviour for an unknown fileId is not visible here — confirm.
321 FILE* getFileForFileId(
const int32_t fileId);
/// Override returning a chunk count as size_t.
/// NOTE(review): presumably the number of entries in chunkIndex_ — confirm.
323 size_t getNumChunks()
override;
/// Const query returning the number of used metadata pages for the given chunk key.
324 size_t getNumUsedMetadataPagesForChunkKey(
const ChunkKey& chunkKey)
const;
/// Const accessor returning the DB version as int32_t.
/// NOTE(review): presumably backed by the db_version_ member — confirm.
328 int32_t getDBVersion()
const;
/// Const accessor returning a bool "DB convert" flag.
/// NOTE(review): the source of the flag is not visible in this listing — confirm.
329 bool getDBConvert()
const;
/// Creates the top-level metadata; takes no arguments and returns nothing.
/// NOTE(review): presumably writes the DB_META_FILENAME file — confirm.
330 void createTopLevelMetadata();
/// Virtual hook. NOTE(review): by its name, it closes this manager's open files and removes
/// their physical storage — confirm in the implementation before relying on it.
332 virtual void closeRemovePhysical();
/// Override that removes the data structures related to the table identified by
/// (db_id, table_id).
334 void removeTableRelatedDS(
const int32_t db_id,
const int32_t table_id)
override;
/// Virtual; takes a (FileInfo*, int32_t) pair by rvalue reference, the same element type
/// as the free_pages_ container.
/// NOTE(review): presumably the int32_t is a page number and the pair is queued in free_pages_ — confirm.
336 virtual void free_page(std::pair<FileInfo*, int32_t>&& page);
/// Const helper that turns a file name into a boost::filesystem::path.
/// NOTE(review): presumably resolved relative to this manager's base path — confirm.
340 boost::filesystem::path getFilePath(
const std::string& file_name)
const;
/// Writes the given page mappings to the compaction status file
/// (counterpart of readPageMappingsFromStatusFile()).
343 void writePageMappingsToStatusFile(
const std::vector<PageMapping>& page_mappings);
/// Renames the compaction status file from one status name to another.
/// NOTE(review): the arguments are presumably the *_STATUS string constants declared in this class — confirm.
346 void renameCompactionStatusFile(
const char*
const from_status,
347 const char*
const to_status);
354 virtual bool updatePageIfDeleted(
FileInfo* file_info,
/// Virtual const method returning a std::string description of this manager.
/// NOTE(review): the exact format is defined in the implementation, not in this listing.
369 virtual std::string describeSelf()
const;
/// Compile-time default number of pages per data file: 256.
371 static constexpr
size_t DEFAULT_NUM_PAGES_PER_DATA_FILE{256};
/// Compile-time default number of pages per metadata file: 4096.
372 static constexpr
size_t DEFAULT_NUM_PAGES_PER_METADATA_FILE{4096};
// Compaction status names; the numeric suffix rises 0 -> 1 -> 2 across the three constants.
// NOTE(review): presumably these are successive phases of file compaction (copy pages,
// update page visibility, delete empty files) and double as status-file names — confirm.
375 static constexpr
char const* COPY_PAGES_STATUS{
"pending_data_compaction_0"};
376 static constexpr
char const* UPDATE_PAGE_VISIBILITY_STATUS{
"pending_data_compaction_1"};
377 static constexpr
char const* DELETE_EMPTY_FILES_STATUS{
"pending_data_compaction_2"};
/// Static setter for the pages-per-data-file value.
/// NOTE(review): presumably assigns the static num_pages_per_data_file_ — confirm.
381 static void setNumPagesPerDataFile(
size_t num_pages);
/// Static setter for the pages-per-metadata-file value.
/// NOTE(review): presumably assigns the static num_pages_per_metadata_file_ — confirm.
382 static void setNumPagesPerMetadataFile(
size_t num_pages);
/// Static helper operating on a table data directory path.
/// NOTE(review): by its name, it renames legacy-named files and leaves symlinks under the
/// old names — confirm in the implementation.
384 static void renameAndSymlinkLegacyFiles(
const std::string& table_data_dir);
// Names of the files this manager works with, as compile-time char arrays.
/// Legacy epoch file name: "epoch".
386 static constexpr
char LEGACY_EPOCH_FILENAME[] =
"epoch";
/// Current epoch file name: "epoch_metadata".
387 static constexpr
char EPOCH_FILENAME[] =
"epoch_metadata";
/// DB metadata file name: "dbmeta".
388 static constexpr
char DB_META_FILENAME[] =
"dbmeta";
/// FileMgr version file name: "filemgr_version".
389 static constexpr
char FILE_MGR_VERSION_FILENAME[] =
"filemgr_version";
/// Sentinel version value (-1).
/// NOTE(review): presumably what readVersionFromDisk() yields when no valid version is found — confirm.
390 static constexpr int32_t INVALID_VERSION = -1;
/// Constructor overload that takes only the default data page size and the default
/// metadata page size.
/// NOTE(review): the access level and intended callers are not visible in this listing — confirm.
394 FileMgr(
const size_t defaultPageSize,
const size_t defaultMetadataPageSize);
398 std::map<int32_t, FileInfo*>
/// Version of this FileMgr's on-disk layout; it has no in-class initialiser.
405 int32_t fileMgrVersion_;
/// Latest FileMgr version, a per-instance constant initialised to 2.
/// NOTE(review): presumably the target of migrateToLatestFileMgrVersion() — confirm.
407 const int32_t latestFileMgrVersion_{2};
/// C stdio handle, null until assigned.
/// NOTE(review): the listing's cross-reference text "pointer to DB level metadata" appears
/// under getPageMutex_ but reads as if it describes this member — confirm.
408 FILE* DBMetaFile_ =
nullptr;
/// Initialisation-complete flag; starts false.
/// NOTE(review): where it is set to true is not visible in this listing.
415 bool isFullyInitted_{
false};
/// Creates a file with the given page size and page count and returns its FileInfo*.
/// NOTE(review): ownership of the returned pointer is not visible here — presumably tracked in files_; confirm.
436 FileInfo* createFile(
const size_t pageSize,
const size_t numPages);
/// Opens an already-existing file at `path` under the given file id, page size and page
/// count, and returns its FileInfo*.
/// @param headerVec  non-const vector of HeaderInfo. NOTE(review): presumably extended with the headers read from the file — confirm.
437 FileInfo* openExistingFile(
const std::string& path,
438 const int32_t fileId,
439 const size_t pageSize,
440 const size_t numPages,
441 std::vector<HeaderInfo>& headerVec);
// Epoch-file helpers.
// NOTE(review): epochFileName is presumably EPOCH_FILENAME or LEGACY_EPOCH_FILENAME — confirm at the call sites.
/// Creates the epoch file with the given name.
442 void createEpochFile(
const std::string& epochFileName);
/// Opens and reads a legacy-format epoch file; unlike the non-legacy variant it returns an int32_t.
/// NOTE(review): presumably the epoch value that was read — confirm.
443 int32_t openAndReadLegacyEpochFile(
const std::string& epochFileName);
/// Opens and reads the current-format epoch file; returns nothing.
444 void openAndReadEpochFile(
const std::string& epochFileName);
/// Writes the epoch to disk and syncs it.
445 void writeAndSyncEpochToDisk();
/// Sets the epoch to newEpoch.
/// NOTE(review): whether this also persists the value is not visible here.
446 void setEpoch(
const int32_t newEpoch);
/// Const helper that reads a version number from the named version file.
/// NOTE(review): presumably returns INVALID_VERSION when the file is missing or unreadable — confirm.
447 int32_t readVersionFromDisk(
const std::string& versionFileName)
const;
448 void writeAndSyncVersionToDisk(
const std::string& versionFileName,
/// Takes a vector of futures, each yielding a vector of HeaderInfo, plus a non-const headerVec.
/// NOTE(review): presumably waits on every future and appends its headers to headerVec — confirm.
450 void processFileFutures(std::vector<std::future<std::vector<HeaderInfo>>>& file_futures,
451 std::vector<HeaderInfo>& headerVec);
454 const size_t numBytes = 0);
457 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
458 const std::vector<HeaderInfo>::const_iterator& headerEndIt);
// On-disk format migration.
/// Entry point for migration.
/// NOTE(review): presumably steps fileMgrVersion_ up to latestFileMgrVersion_ — confirm.
461 void migrateToLatestFileMgrVersion();
/// Migration step named for version 0, concerning the epoch file.
/// NOTE(review): presumably converts LEGACY_EPOCH_FILENAME to EPOCH_FILENAME — confirm.
462 void migrateEpochFileV0();
/// Migration step named for version 1, concerning legacy files.
/// NOTE(review): details are not visible in this listing.
463 void migrateLegacyFilesV1();
/// NOTE(review): by its name, releases the FileInfo objects held by this manager — confirm
/// what happens to the underlying files.
467 void clearFileInfos();
/// Compaction helper for a single source page; page_mappings and touched_pages are
/// non-const. NOTE(review): presumably both are updated to record the copy — confirm.
/// NOTE(review): listing line 471 is absent from this view, so a parameter between
/// source_page and page_mappings is probably missing.
470 void copySourcePageForCompaction(
const Page& source_page,
472 std::vector<PageMapping>& page_mappings,
473 std::set<Page>& touched_pages);
/// Copies source_page to destination_page and returns an int32_t.
/// NOTE(review): by its name, the header-size field is left out of the copy and the return
/// value is presumably that header size — confirm.
474 int32_t copyPageWithoutHeaderSize(
const Page& source_page,
475 const Page& destination_page);
/// Compaction step for files of the given page size; takes the same non-const
/// page_mappings and touched_pages containers as copySourcePageForCompaction().
/// NOTE(review): the sort criterion is not visible in this listing.
476 void sortAndCopyFilePagesForCompaction(
size_t page_size,
477 std::vector<PageMapping>& page_mappings,
478 std::set<Page>& touched_pages);
// Remaining compaction helpers.
/// Updates the visibility of the pages named in page_mappings.
/// NOTE(review): presumably the step tracked by UPDATE_PAGE_VISIBILITY_STATUS — confirm.
479 void updateMappedPagesVisibility(
const std::vector<PageMapping>& page_mappings);
/// Deletes files that are empty.
/// NOTE(review): presumably the step tracked by DELETE_EMPTY_FILES_STATUS — confirm.
480 void deleteEmptyFiles();
/// Resumes a file compaction, given the name of a status file.
/// NOTE(review): presumably the name tells which step to continue from — confirm.
481 void resumeFileCompaction(
const std::string& status_file_name);
/// Reads the page mappings back from the status file
/// (counterpart of writePageMappingsToStatusFile()).
482 std::vector<PageMapping> readPageMappingsFromStatusFile();
/// NOTE(review): the "Unlocked" suffix suggests the caller must already hold the relevant
/// lock; which lock, and what exactly is closed, is not visible here — confirm.
487 void closePhysicalUnlocked();
/// NOTE(review): by its name, flushes/syncs this manager's open files to disk — confirm.
488 void syncFilesToDisk();
/// Initialises the reader-thread count; the argument defaults to 0.
/// NOTE(review): presumably sets num_reader_threads_, with 0 meaning "pick a default" — confirm.
490 void initializeNumThreads(
size_t num_reader_threads = 0);
/// Virtual factory returning a FileBuffer*; num_bytes defaults to 0.
/// NOTE(review): listing line 492 is absent from this view, so a parameter between
/// page_size and num_bytes (likely the chunk key) is probably missing — confirm.
491 virtual FileBuffer* allocateBuffer(
const size_t page_size,
493 const size_t num_bytes = 0);
496 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
497 const std::vector<HeaderInfo>::const_iterator& headerEndIt);
/// Virtual; deletes the buffer at the given chunk-map iterator and returns a chunk-map
/// iterator. `purge` defaults to true, as in deleteBuffer().
/// NOTE(review): the "Unlocked" suffix suggests the caller must already hold the chunk-index
/// lock, and the returned iterator is presumably the element after the erased one — confirm.
498 virtual ChunkKeyToChunkMap::iterator deleteBufferUnlocked(
499 const ChunkKeyToChunkMap::iterator chunk_it,
500 const bool purge =
true);
502 const size_t numBytes = 0)
const;
/// Rolls off old data relative to epochCeiling; shouldCheckpoint is a bool flag.
/// NOTE(review): presumably bounded by maxRollbackEpochs_, with a checkpoint issued
/// afterwards when the flag is set — confirm.
505 void rollOffOldData(
const int32_t epochCeiling,
const bool shouldCheckpoint);
/// Frees pages belonging to epochs before min_epoch.
/// NOTE(review): presumably the locking wrapper around freePagesBeforeEpochUnlocked() — confirm.
506 void freePagesBeforeEpoch(
const int32_t min_epoch);
507 void freePagesBeforeEpochUnlocked(
const int32_t min_epoch,
/// Returns epoch_.ceiling() converted to int32_t with static_cast. This is the
/// argument-free overload that epoch(db_id, tb_id) forwards to.
517 inline int32_t
epoch()
const {
return static_cast<int32_t
>(epoch_.ceiling()); }
/// NOTE(review): by its name, writes out buffers that have unsaved changes — confirm which
/// buffers count as dirty in the implementation.
518 void writeDirtyBuffers();
/// Const method that fills the data-file and metadata-file statistics of the
/// caller-provided StorageStats object.
520 void setDataAndMetadataFileStats(
StorageStats& storage_stats)
const;
/// Const method returning the fragment count as uint32_t.
/// NOTE(review): how fragments are counted is not visible in this listing.
521 uint32_t getFragmentCount()
const;
/// Whether the current epoch has been checkpointed; starts true. The visible listing
/// sets it to false right after epoch_.increment() and reads it when computing the
/// last checkpointed epoch (epoch() minus 1 when it is false).
527 bool epochIsCheckpointed_ =
true;
/// C stdio handle, null until assigned.
/// NOTE(review): presumably the open epoch file — confirm.
528 FILE* epochFile_ =
nullptr;
DEVICE auto upper_bound(ARGS &&...args)
const size_t metadata_page_size_
virtual int32_t epoch(int32_t db_id, int32_t tb_id) const
Returns the current value of the epoch, which should be one greater than the value recorded at the last checkpoint. Because a FileMgr only contains buffers from one table, we can just return the FileMgr's epoch instead of finding a table-specific epoch.
std::vector< int > ChunkKey
TablePair fileMgrKey_
Global FileMgr.
size_t getPageSize() const
virtual bool hasFileMgrKey() const
std::vector< HeaderInfo > header_infos
A logical page (Page) belongs to a file on disk.
int32_t destination_file_id
std::string printSlabs() override
size_t getMaxSize() override
std::string getFileMgrBasePath() const
std::mutex getPageMutex_
pointer to DB level metadata
Represents/provides access to contiguous data stored in the file system.
static int64_t max_allowable_epoch()
MgrType getMgrType() override
std::string fileMgrBasePath_
std::multimap< size_t, int32_t > PageSizeFileMMap
Maps logical page sizes to files.
int32_t lastCheckpointedEpoch() const
Returns the value of the epoch at the last checkpoint.
size_t getInUseSize() override
static size_t num_pages_per_data_file_
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
int32_t PageHeaderSizeType
int32_t db_version_
the index of the next file id
PageMapping(int32_t source_file_id, size_t source_page_num, PageHeaderSizeType source_page_header_size, int32_t destination_file_id, size_t destination_page_num)
void init(LogOptions const &log_opts)
ChunkKeyToChunkMap chunkIndex_
PageSizeFileMMap fileIndex_
A map of files accessible via a file identifier.
std::vector< std::pair< FileInfo *, int32_t > > free_pages_
An AbstractBuffer is a unit of data management for a data manager.
size_t num_reader_threads_
Maps page sizes to FileInfo objects.
static size_t num_pages_per_metadata_file_
std::map< ChunkKey, FileBuffer * > ChunkKeyToChunkMap
Maps ChunkKeys (unique ids for Chunks) to Chunk objects.
std::string compaction_status_file_name
heavyai::shared_mutex chunkIndexMutex_
const TablePair get_fileMgrKey() const
heavyai::shared_mutex mutex_free_page_
int32_t maxRollbackEpochs_
DEVICE auto lower_bound(ARGS &&...args)
void checkpoint(const int32_t db_id, const int32_t tb_id) override
virtual bool failOnReadError() const
True if a read error should cause a fatal error.
std::string getStringMgrType() override
size_t getAllocated() override
This file contains the declaration and definition of a Page type and a MultiPage type.
std::map< int32_t, FileInfo * > files_
FileInfo * getFileInfoForFileId(const int32_t fileId) const
heavyai::shared_mutex files_rw_mutex_
size_t destination_page_num
int32_t maxRollbackEpochs()
Returns the value of max_rollback_epochs.
std::shared_timed_mutex shared_mutex
std::pair< const int32_t, const int32_t > TablePair
unsigned nextFileId_
number of threads used when loading data
int32_t epochFloor() const
PageHeaderSizeType source_page_header_size
bool isAllocationCapped() override
size_t getMetadataPageSize() const
size_t getNumReaderThreads()
Returns number of threads defined by parameter num-reader-threads which should be used during initial...