26 #include <boost/filesystem.hpp>
30 namespace bf = boost::filesystem;
34 size_t space_used = 0;
35 if (bf::exists(dir)) {
36 for (const auto& file : bf::recursive_directory_iterator(dir)) {
37 if (bf::is_regular_file(file.path())) {
50 LOG(FATAL) << "Disk cache needs to evict data to make space, but no data found in "
58 namespace File_Namespace {
62 ss << "Dump Cache:\n";
64 ss << " " << show_chunk(key) << " num_pages: " << buf->pageCount()
65 << ", is dirty: " << buf->isDirty() << "\n";
93 auto& header_vec = open_files_result.header_infos;
94 std::sort(header_vec.begin(), header_vec.end());
99 VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
100 if (header_vec.size() > 0) {
101 auto startIt = header_vec.begin();
102 ChunkKey lastChunkKey = startIt->chunkKey;
103 for (auto it = header_vec.begin() + 1; it != header_vec.end(); ++it) {
104 if (it->chunkKey != lastChunkKey) {
106 lastChunkKey = it->chunkKey;
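// header_vec is sorted above, so all HeaderInfo entries for a given chunk key are
// contiguous; this loop detects the boundary between runs, presumably so that each
// [startIt, it) range can be handed to createBufferFromHeaders() as a single buffer.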
123 CHECK(bf::is_directory(path))
124 << "Specified path '" << fileMgrBasePath_ << "' for disk cache is not a directory.";
127 boost::regex table_filter("table_([0-9]+)_([0-9]+)");
128 for (const auto& file : bf::directory_iterator(path)) {
130 auto file_name = file.path().filename().string();
131 if (boost::regex_match(file_name, match, table_filter)) {
132 int32_t db_id = std::stoi(match[1]);
133 int32_t tb_id = std::stoi(match[2]);
136 << "Trying to read data for existing table";
138 std::make_unique<TableFileMgr>(file.path().string()));
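// Each subdirectory matching "table_<db_id>_<tb_id>" appears to hold per-table cache
// state (epoch and wrapper files); a TableFileMgr is constructed for every match found.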
156 auto& [pair, table_dir] = *tables_it;
157 return table_dir->getEpoch();
164 auto& [pair, table_dir] = *tables_it;
165 table_dir->incrementEpoch();
172 table_it->second->writeAndSyncEpochToDisk();
199 size_t space_used = 0;
200 ChunkKey min_table_key{db_id, tb_id};
201 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
202 for (auto it = chunkIndex_.lower_bound(min_table_key);
205 auto& [key, buffer] = *it;
206 space_used += (buffer->numChunkPages() * page_size_);
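// ChunkKeys sort lexicographically with {db_id, tb_id} as a prefix, so the half-open
// range [lower_bound({db, tb}), upper_bound({db, tb, INT32_MAX})) spans exactly the
// chunks of one table. The same bounding pattern recurs throughout this file.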
212 int32_t tb_id) const {
214 size_t space_used = 0;
215 ChunkKey min_table_key{db_id, tb_id};
216 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
217 for (auto it = chunkIndex_.lower_bound(min_table_key);
220 auto& [key, buffer] = *it;
231 space += table_it->second->getReservedSpace();
240 return chunk_space + meta_space + subdir_space;
253 VLOG(2) << "Checkpointing " << describeSelf() << " (" << db_id << ", " << tb_id
254 << ") epoch: " << epoch(db_id, tb_id);
255 writeDirtyBuffers(db_id, tb_id);
257 writeAndSyncEpochToDisk(db_id, tb_id);
258 incrementEpoch(db_id, tb_id);
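// Checkpoint ordering: dirty buffers are flushed first, then the epoch is written and
// synced to disk, and only afterwards is the in-memory epoch incremented, so an
// interrupted checkpoint should leave the cache at the previous durable epoch.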
262 void CachingFileMgr::createTableFileMgrIfNoneExists(const int32_t db_id,
263 const int32_t tb_id) {
266 if (table_dirs_.find(table_pair) == table_dirs_.end()) {
268 table_pair, std::make_unique<TableFileMgr>(getTableFileMgrPath(db_id, tb_id)));
273 const size_t page_size,
274 const size_t num_bytes) {
277 createTableFileMgrIfNoneExists(db_id, tb_id);
278 return FileMgr::createBufferUnlocked(key, page_size, num_bytes);
283 const std::vector<HeaderInfo>::const_iterator& startIt,
284 const std::vector<HeaderInfo>::const_iterator& endIt) {
285 if (startIt->pageId != -1) {
292 createTableFileMgrIfNoneExists(db_id, tb_id);
293 auto buffer = FileMgr::createBufferFromHeaders(key, startIt, endIt);
294 if (buffer->isMissingPages()) {
297 buffer->freeChunkPages();
310 const size_t num_bytes) {
311 CHECK(!src_buffer->isDirty()) << "Cannot cache dirty buffers.";
312 deleteBufferIfExists(key);
317 return FileMgr::putBuffer(key, src_buffer, num_bytes);
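// Cache-insert contract: only clean (non-dirty) buffers may be cached, and any stale
// copy under the same key is removed before delegating to FileMgr::putBuffer().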
320 void CachingFileMgr::incrementAllEpochs() {
322 for (auto& table_dir : table_dirs_) {
323 table_dir.second->incrementEpoch();
327 void CachingFileMgr::removeTableFileMgr(int32_t db_id, int32_t tb_id) {
330 auto it = table_dirs_.find({db_id, tb_id});
331 if (it != table_dirs_.end()) {
332 it->second->removeDiskContent();
333 table_dirs_.erase(it);
337 void CachingFileMgr::removeTableBuffers(int32_t db_id, int32_t tb_id) {
340 ChunkKey min_table_key{db_id, tb_id};
341 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
342 for (auto it = chunkIndex_.lower_bound(min_table_key);
343 it != chunkIndex_.upper_bound(max_table_key);) {
344 it = deleteBufferUnlocked(it);
350 const size_t num_bytes) {
356 const std::vector<HeaderInfo>::const_iterator& headerStartIt,
357 const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
362 bool CachingFileMgr::updatePageIfDeleted(FileInfo* file_info,
374 file_info->freePage(page_num, false, page_epoch);
380 void CachingFileMgr::writeDirtyBuffers(int32_t db_id, int32_t tb_id) {
382 ChunkKey min_table_key{db_id, tb_id};
383 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
385 for (auto chunk_it = chunkIndex_.lower_bound(min_table_key);
386 chunk_it != chunkIndex_.upper_bound(max_table_key);
388 if (auto [key, buf] = *chunk_it; buf->isDirty()) {
390 buf->freeMetadataPages();
391 buf->writeMetadata(epoch(db_id, tb_id));
392 buf->clearDirtyBits();
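// For each dirty buffer in the table's key range, metadata is rewritten against the
// current epoch and the dirty bits are cleared, so buffers are clean before the epoch
// itself is persisted by checkpoint().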
398 void CachingFileMgr::deleteBufferIfExists(const ChunkKey& key) {
400 auto chunk_it = chunkIndex_.find(key);
401 if (chunk_it != chunkIndex_.end()) {
402 deleteBufferUnlocked(chunk_it);
406 size_t CachingFileMgr::getNumDataChunks() const {
408 size_t num_chunks = 0;
409 for (auto [key, buf] : chunkIndex_) {
410 if (buf->hasDataPages()) {
417 void CachingFileMgr::deleteCacheIfTooLarge() {
419 closeRemovePhysical();
420 bf::create_directory(fileMgrBasePath_);
421 LOG(INFO) << "Cache path over limit. Existing cache deleted.";
425 Page CachingFileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
426 std::lock_guard<std::mutex> lock(getPageMutex_);
427 int32_t pageNum = -1;
429 auto candidateFiles = fileIndex_.equal_range(pageSize);
431 for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
432 FileInfo* fileInfo = files_.at(fileIt->second);
442 if (getMaxMetaFiles() > getNumMetaFiles()) {
443 fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
446 if (getMaxDataFiles() > getNumDataFiles()) {
447 fileInfo = createFile(pageSize, num_pages_per_data_file_);
455 fileInfo = isMetadata ? evictMetadataPages() : evictPages();
460 CHECK(pageNum != -1);
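// The allocation order appears to be: reuse a free page from an existing file of the
// requested page size; failing that, create a new file if the metadata/data file cap
// allows; failing that, evict pages (metadata or data, per isMetadata) until a page
// frees up.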
464 std::vector<ChunkKey> CachingFileMgr::getKeysForTable(int32_t db_id,
465 int32_t tb_id) const {
466 std::vector<ChunkKey> keys;
467 ChunkKey min_table_key{db_id, tb_id};
468 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
469 for (auto it = chunkIndex_.lower_bound(min_table_key);
470 it != chunkIndex_.upper_bound(max_table_key);
472 keys.emplace_back(it->first);
482 const auto keys = getKeysForTable(db_id, tb_id);
483 for (const auto& key : keys) {
484 auto chunk_it = chunkIndex_.find(key);
485 CHECK(chunk_it != chunkIndex_.end());
486 auto& buf = chunk_it->second;
491 CHECK(buf->getMetadataPage().pageVersions.size() > 0);
493 getFileInfoForFileId(buf->getMetadataPage().pageVersions.front().page.fileId);
497 deleteBufferUnlocked(chunk_it);
500 deleteWrapperFile(db_id, tb_id);
501 CHECK(file_info) << "FileInfo with freed page not found";
511 if (!buf->hasDataPages()) {
522 CHECK(buf->getMultiPage().front().pageVersions.size() > 0);
523 file_info = getFileInfoForFileId(
524 buf->getMultiPage().front().pageVersions.front().page.fileId);
526 auto pages_freed = buf->freeChunkPages();
527 CHECK(pages_freed > 0) << "failed to evict a page";
528 CHECK(file_info) << "FileInfo with freed page not found";
532 void CachingFileMgr::touchKey(const ChunkKey& key) const {
533 chunk_evict_alg_.touchChunk(key);
537 void CachingFileMgr::removeKey(const ChunkKey& key) const {
539 chunk_evict_alg_.removeChunk(key);
542 ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
543 for (auto it = chunkIndex_.lower_bound(table_key);
544 it != chunkIndex_.upper_bound(max_table_key);
546 if (it->first != key) {
553 table_evict_alg_.removeChunk(table_key);
556 size_t CachingFileMgr::getFilesSize() const {
559 for (auto [id, file] : files_) {
565 size_t CachingFileMgr::getTableFileMgrsSize() const {
567 size_t space_used = 0;
568 for (const auto& [pair, table_dir] : table_dirs_) {
569 space_used += table_dir->getReservedSpace();
574 size_t CachingFileMgr::getNumDataFiles() const {
576 return fileIndex_.count(page_size_);
579 size_t CachingFileMgr::getNumMetaFiles() const {
581 return fileIndex_.count(metadata_page_size_);
584 std::vector<ChunkKey> CachingFileMgr::getChunkKeysForPrefix(
587 std::vector<ChunkKey> chunks;
588 for (auto [key, buf] : chunkIndex_) {
590 if (buf->hasDataPages()) {
591 chunks.emplace_back(key);
599 void CachingFileMgr::removeChunkKeepMetadata(const ChunkKey& key) {
600 if (isBufferOnDevice(key)) {
601 auto chunkIt = chunkIndex_.find(key);
602 CHECK(chunkIt != chunkIndex_.end());
603 auto& buf = chunkIt->second;
604 if (buf->hasDataPages()) {
605 buf->freeChunkPages();
606 chunk_evict_alg_.removeChunk(key);
611 size_t CachingFileMgr::getNumChunksWithMetadata() const {
614 for (const auto& [key, buf] : chunkIndex_) {
615 if (buf->hasEncoder()) {
622 std::string CachingFileMgr::dumpKeysWithMetadata() const {
624 std::string ret_string = "CFM keys with metadata:\n";
625 for (const auto& [key, buf] : chunkIndex_) {
626 if (buf->hasEncoder()) {
633 std::string CachingFileMgr::dumpKeysWithChunkData() const {
635 std::string ret_string = "CFM keys with chunk data:\n";
636 for (const auto& [key, buf] : chunkIndex_) {
637 if (buf->hasDataPages()) {
644 std::unique_ptr<CachingFileMgr> CachingFileMgr::reconstruct() const {
646 fileMgrBasePath_, DiskCacheLevel::none, num_reader_threads_, max_size_, page_size_};
647 return std::make_unique<CachingFileMgr>(config);
650 void CachingFileMgr::deleteWrapperFile(int32_t db, int32_t tb) {
652 auto it = table_dirs_.find({db, tb});
653 CHECK(it != table_dirs_.end());
654 it->second->deleteWrapperFile();
657 void CachingFileMgr::writeWrapperFile(const std::string& doc, int32_t db, int32_t tb) {
658 createTableFileMgrIfNoneExists(db, tb);
659 auto wrapper_size = doc.size();
660 CHECK_LE(wrapper_size, getMaxWrapperSize())
661 << "Wrapper is too big to fit into the cache";
662 while (wrapper_size > getAvailableWrapperSpace()) {
663 evictMetadataPages();
666 table_dirs_.at({db, tb})->writeWrapperFile(doc);
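// The serialized wrapper shares the metadata space budget: metadata pages are evicted
// until enough wrapper space is available, then the file is written into the table's
// subdirectory via its TableFileMgr.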
669 bool CachingFileMgr::hasWrapperFile(int32_t db_id, int32_t table_id) const {
671 auto it = table_dirs_.find({db_id, table_id});
672 if (it != table_dirs_.end()) {
673 return it->second->hasWrapperFile();
687 void CachingFileMgr::setMaxSizes() {
688 size_t max_meta_space = std::floor(max_size_ * METADATA_SPACE_PERCENTAGE);
689 size_t max_meta_file_space = std::floor(max_size_ * METADATA_FILE_SPACE_PERCENTAGE);
690 max_wrapper_space_ = max_meta_space - max_meta_file_space;
691 auto max_data_space = max_size_ - max_meta_space;
692 auto meta_file_size = metadata_page_size_ * num_pages_per_metadata_file_;
693 auto data_file_size = page_size_ * num_pages_per_data_file_;
694 max_num_data_files_ = max_data_space / data_file_size;
695 max_num_meta_files_ = max_meta_file_space / meta_file_size;
696 CHECK_GT(max_num_data_files_, 0U) << "Cannot create a cache of size " << max_size_
697 << ". Not enough space to create a data file.";
698 CHECK_GT(max_num_meta_files_, 0U) << "Cannot create a cache of size " << max_size_
699 << ". Not enough space to create a metadata file.";
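// Illustrative arithmetic (hypothetical numbers, not the real defaults): with
// max_size_ = 1000 MB, METADATA_SPACE_PERCENTAGE = 0.10 and
// METADATA_FILE_SPACE_PERCENTAGE = 0.01, the split is 100 MB of metadata space
// (10 MB for metadata files, 90 MB for wrapper files) and 900 MB for data files;
// max_num_meta_files_ and max_num_data_files_ are those byte budgets divided by
// metadata_page_size_ * num_pages_per_metadata_file_ and
// page_size_ * num_pages_per_data_file_, respectively.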
702 std::optional<FileBuffer*> CachingFileMgr::getBufferIfExists(const ChunkKey& key) {
704 auto chunk_it = chunkIndex_.find(key);
705 if (chunk_it == chunkIndex_.end()) {
708 return getBufferUnlocked(key);
711 ChunkKeyToChunkMap::iterator CachingFileMgr::deleteBufferUnlocked(
712 const ChunkKeyToChunkMap::iterator chunk_it,
714 removeKey(chunk_it->first);
715 return FileMgr::deleteBufferUnlocked(chunk_it, purge);
718 void CachingFileMgr::getChunkMetadataVecForKeyPrefix(
721 FileMgr::getChunkMetadataVecForKeyPrefix(chunkMetadataVec, keyPrefix);
722 for (const auto& [key, meta] : chunkMetadataVec) {
728 const size_t num_bytes) const {
730 return FileMgr::getBufferUnlocked(key, num_bytes);
733 void CachingFileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
734 page.first->freePageDeferred(page.second);
737 std::set<ChunkKey> CachingFileMgr::getKeysWithMetadata() const {
739 std::set<ChunkKey> ret;
740 for (const auto& [key, buf] : chunkIndex_) {
741 if (buf->hasEncoder()) {
748 size_t CachingFileMgr::getMaxDataFilesSize() const {
749 if (limit_data_size_) {
750 return *limit_data_size_;
752 return getMaxDataFiles() * getDataFileSize();
755 TableFileMgr::TableFileMgr(const std::string& table_path)
756 : table_path_(table_path)
757 , epoch_file_path_(table_path_ + "/" + FileMgr::EPOCH_FILENAME)
758 , wrapper_file_path_(table_path_ + "/" + CachingFileMgr::WRAPPER_FILE_NAME)
760 , is_checkpointed_(true) {
765 << "' for cache table data is not a directory.";
769 << "Found epoch file '" << epoch_file_path_ << "' which is not a regular file";
771 << "Found epoch file '" << epoch_file_path_ << "' which is not of expected size";
786 << "Epoch greater than maximum allowed value (" << epoch_.ceiling() << " > "
799 CHECK(status == 0) << "Could not flush epoch file to disk";
805 CHECK(status == 0) << "Could not sync epoch file to disk";
817 for (const auto& file : bf::recursive_directory_iterator(table_path_)) {
818 if (bf::is_regular_file(file.path())) {
835 "\". The error was: " + std::strerror(errno)};