OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CachingFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 #include "Shared/misc.h"
25 
26 #include <boost/filesystem.hpp>
27 
28 #include <fstream>
29 
30 namespace bf = boost::filesystem;
31 
32 namespace {
33 size_t size_of_dir(const std::string& dir) {
34  size_t space_used = 0;
35  if (bf::exists(dir)) {
36  for (const auto& file : bf::recursive_directory_iterator(dir)) {
37  if (bf::is_regular_file(file.path())) {
38  space_used += bf::file_size(file.path());
39  }
40  }
41  }
42  return space_used;
43 }
44 
46  ChunkKey ret;
47  try {
48  ret = alg.evictNextChunk();
49  } catch (const NoEntryFoundException& e) {
50  LOG(FATAL) << "Disk cache needs to evict data to make space, but no data found in "
51  "eviction queue.";
52  }
53  return ret;
54 }
55 
56 } // namespace
57 
58 namespace File_Namespace {
59 
60 std::string CachingFileMgr::dump() const {
61  std::stringstream ss;
62  ss << "Dump Cache:\n";
63  for (const auto& [key, buf] : chunkIndex_) {
64  ss << " " << show_chunk(key) << " num_pages: " << buf->pageCount()
65  << ", is dirty: " << buf->isDirty() << "\n";
66  }
67  ss << "Data Eviction Queue:\n" << chunk_evict_alg_.dumpEvictionQueue();
68  ss << "Metadata Eviction Queue:\n" << table_evict_alg_.dumpEvictionQueue();
69  ss << "\n";
70  return ss.str();
71 }
72 
74  : FileMgr(config.page_size, DEFAULT_METADATA_PAGE_SIZE) {
75  fileMgrBasePath_ = config.path;
77  nextFileId_ = 0;
78  max_size_ = config.size_limit;
79  init(config.num_reader_threads);
80  setMaxSizes();
81 }
82 
84 
85 void CachingFileMgr::init(const size_t num_reader_threads) {
88  auto open_files_result = openFiles();
89  /* Sort headerVec so that all HeaderInfos
90  * from a chunk will be grouped together
91  * and in order of increasing PageId
92  * - Version Epoch */
93  auto& header_vec = open_files_result.header_infos;
94  std::sort(header_vec.begin(), header_vec.end());
95 
96  /* Goal of next section is to find sequences in the
97  * sorted headerVec of the same ChunkId, which we
98  * can then initiate a FileBuffer with */
99  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
100  if (header_vec.size() > 0) {
101  auto startIt = header_vec.begin();
102  ChunkKey lastChunkKey = startIt->chunkKey;
103  for (auto it = header_vec.begin() + 1; it != header_vec.end(); ++it) {
104  if (it->chunkKey != lastChunkKey) {
105  createBufferFromHeaders(lastChunkKey, startIt, it);
106  lastChunkKey = it->chunkKey;
107  startIt = it;
108  }
109  }
110  createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
111  }
112  nextFileId_ = open_files_result.max_file_id + 1;
114  freePages();
115  initializeNumThreads(num_reader_threads);
116  isFullyInitted_ = true;
117 }
118 
121  bf::path path(fileMgrBasePath_);
122  CHECK(bf::exists(path)) << "Cache path: " << fileMgrBasePath_ << " does not exit.";
123  CHECK(bf::is_directory(path))
124  << "Specified path '" << fileMgrBasePath_ << "' for disk cache is not a directory.";
125 
126  // Look for directories with table-specific names.
127  boost::regex table_filter("table_([0-9]+)_([0-9]+)");
128  for (const auto& file : bf::directory_iterator(path)) {
129  boost::smatch match;
130  auto file_name = file.path().filename().string();
131  if (boost::regex_match(file_name, match, table_filter)) {
132  int32_t db_id = std::stoi(match[1]);
133  int32_t tb_id = std::stoi(match[2]);
134  TablePair table_pair{db_id, tb_id};
135  CHECK(table_dirs_.find(table_pair) == table_dirs_.end())
136  << "Trying to read data for existing table";
137  table_dirs_.emplace(table_pair,
138  std::make_unique<TableFileMgr>(file.path().string()));
139  }
140  }
141 }
142 
143 int32_t CachingFileMgr::epoch(int32_t db_id, int32_t tb_id) const {
145  auto tables_it = table_dirs_.find({db_id, tb_id});
146  if (tables_it == table_dirs_.end()) {
147  // If there is no directory for this table, that means the cache does not recognize
148  // the table that is requested. This can happen if a table was dropped, and it's
149  // pages were invalidated but not yet freed and then the server crashed before they
150  // were freed. Upon re-starting the FileMgr will find these pages and attempt to
151  // compare their epoch to know if they are valid or not. In this case we should
152  // return an invalid epoch to indicate that any page for this table is not valid and
153  // should be freed.
155  }
156  auto& [pair, table_dir] = *tables_it;
157  return table_dir->getEpoch();
158 }
159 
160 void CachingFileMgr::incrementEpoch(int32_t db_id, int32_t tb_id) {
162  auto tables_it = table_dirs_.find({db_id, tb_id});
163  CHECK(tables_it != table_dirs_.end());
164  auto& [pair, table_dir] = *tables_it;
165  table_dir->incrementEpoch();
166 }
167 
168 void CachingFileMgr::writeAndSyncEpochToDisk(int32_t db_id, int32_t tb_id) {
170  auto table_it = table_dirs_.find({db_id, tb_id});
171  CHECK(table_it != table_dirs_.end());
172  table_it->second->writeAndSyncEpochToDisk();
173 }
174 
175 void CachingFileMgr::clearForTable(int32_t db_id, int32_t tb_id) {
176  removeTableBuffers(db_id, tb_id);
177  removeTableFileMgr(db_id, tb_id);
178  freePages();
179 }
180 
181 std::string CachingFileMgr::getTableFileMgrPath(int32_t db_id, int32_t tb_id) const {
182  return getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
183 }
184 
186  {
189  }
190  {
192  table_dirs_.clear();
193  }
194  bf::remove_all(getFileMgrBasePath());
195 }
196 
197 size_t CachingFileMgr::getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
199  size_t space_used = 0;
200  ChunkKey min_table_key{db_id, tb_id};
201  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
202  for (auto it = chunkIndex_.lower_bound(min_table_key);
203  it != chunkIndex_.upper_bound(max_table_key);
204  ++it) {
205  auto& [key, buffer] = *it;
206  space_used += (buffer->numChunkPages() * page_size_);
207  }
208  return space_used;
209 }
210 
212  int32_t tb_id) const {
214  size_t space_used = 0;
215  ChunkKey min_table_key{db_id, tb_id};
216  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
217  for (auto it = chunkIndex_.lower_bound(min_table_key);
218  it != chunkIndex_.upper_bound(max_table_key);
219  ++it) {
220  auto& [key, buffer] = *it;
221  space_used += (buffer->numMetadataPages() * metadata_page_size_);
222  }
223  return space_used;
224 }
225 
226 size_t CachingFileMgr::getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const {
228  size_t space = 0;
229  auto table_it = table_dirs_.find({db_id, tb_id});
230  if (table_it != table_dirs_.end()) {
231  space += table_it->second->getReservedSpace();
232  }
233  return space;
234 }
235 
236 size_t CachingFileMgr::getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
237  auto chunk_space = getChunkSpaceReservedByTable(db_id, tb_id);
238  auto meta_space = getMetadataSpaceReservedByTable(db_id, tb_id);
239  auto subdir_space = getTableFileMgrSpaceReserved(db_id, tb_id);
240  return chunk_space + meta_space + subdir_space;
241 }
242 
243 std::string CachingFileMgr::describeSelf() const {
244  return "cache";
245 }
246 
247 // Similar to FileMgr::checkpoint() but only writes a subset of buffers.
248 void CachingFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
249  {
251  CHECK(table_dirs_.find({db_id, tb_id}) != table_dirs_.end());
252  }
253  VLOG(2) << "Checkpointing " << describeSelf() << " (" << db_id << ", " << tb_id
254  << ") epoch: " << epoch(db_id, tb_id);
255  writeDirtyBuffers(db_id, tb_id);
256  syncFilesToDisk();
257  writeAndSyncEpochToDisk(db_id, tb_id);
258  incrementEpoch(db_id, tb_id);
259  freePages();
260 }
261 
262 void CachingFileMgr::createTableFileMgrIfNoneExists(const int32_t db_id,
263  const int32_t tb_id) {
265  TablePair table_pair{db_id, tb_id};
266  if (table_dirs_.find(table_pair) == table_dirs_.end()) {
267  table_dirs_.emplace(
268  table_pair, std::make_unique<TableFileMgr>(getTableFileMgrPath(db_id, tb_id)));
269  }
270 }
271 
272 FileBuffer* CachingFileMgr::createBufferUnlocked(const ChunkKey& key,
273  const size_t page_size,
274  const size_t num_bytes) {
275  touchKey(key);
276  auto [db_id, tb_id] = get_table_prefix(key);
277  createTableFileMgrIfNoneExists(db_id, tb_id);
278  return FileMgr::createBufferUnlocked(key, page_size, num_bytes);
279 }
280 
281 FileBuffer* CachingFileMgr::createBufferFromHeaders(
282  const ChunkKey& key,
283  const std::vector<HeaderInfo>::const_iterator& startIt,
284  const std::vector<HeaderInfo>::const_iterator& endIt) {
285  if (startIt->pageId != -1) {
286  // If the first pageId is not -1 then there is no metadata page for the
287  // current key (which means it was never checkpointed), so we should skip.
288  return nullptr;
289  }
290  touchKey(key);
291  auto [db_id, tb_id] = get_table_prefix(key);
292  createTableFileMgrIfNoneExists(db_id, tb_id);
293  auto buffer = FileMgr::createBufferFromHeaders(key, startIt, endIt);
294  if (buffer->isMissingPages()) {
295  // Detect the case where a page is missing by comparing the amount of pages read
296  // with the metadata size. If data are missing, discard the chunk.
297  buffer->freeChunkPages();
298  }
299  return buffer;
300 }
301 
308 FileBuffer* CachingFileMgr::putBuffer(const ChunkKey& key,
309  AbstractBuffer* src_buffer,
310  const size_t num_bytes) {
311  CHECK(!src_buffer->isDirty()) << "Cannot cache dirty buffers.";
312  deleteBufferIfExists(key);
313  // Since the buffer is not dirty we mark it as dirty if we are only writing metadata and
314  // appended if we are writing chunk data. We delete + append rather than write to make
315  // sure we don't write multiple page versions.
316  (src_buffer->size() == 0) ? src_buffer->setDirty() : src_buffer->setAppended();
317  return FileMgr::putBuffer(key, src_buffer, num_bytes);
318 }
319 
320 void CachingFileMgr::incrementAllEpochs() {
322  for (auto& table_dir : table_dirs_) {
323  table_dir.second->incrementEpoch();
324  }
325 }
326 
327 void CachingFileMgr::removeTableFileMgr(int32_t db_id, int32_t tb_id) {
328  // Delete table-specific directory (stores table epoch data and serialized data wrapper)
330  auto it = table_dirs_.find({db_id, tb_id});
331  if (it != table_dirs_.end()) {
332  it->second->removeDiskContent();
333  table_dirs_.erase(it);
334  }
335 }
336 
337 void CachingFileMgr::removeTableBuffers(int32_t db_id, int32_t tb_id) {
338  // Free associated FileBuffers and clear buffer entries.
340  ChunkKey min_table_key{db_id, tb_id};
341  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
342  for (auto it = chunkIndex_.lower_bound(min_table_key);
343  it != chunkIndex_.upper_bound(max_table_key);) {
344  it = deleteBufferUnlocked(it);
345  }
346 }
347 
348 CachingFileBuffer* CachingFileMgr::allocateBuffer(const size_t page_size,
349  const ChunkKey& key,
350  const size_t num_bytes) {
351  return new CachingFileBuffer(this, page_size, key, num_bytes);
352 }
353 
354 CachingFileBuffer* CachingFileMgr::allocateBuffer(
355  const ChunkKey& key,
356  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
357  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
358  return new CachingFileBuffer(this, key, headerStartIt, headerEndIt);
359 }
360 
361 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
362 bool CachingFileMgr::updatePageIfDeleted(FileInfo* file_info,
363  ChunkKey& chunk_key,
364  int32_t contingent,
365  int32_t page_epoch,
366  int32_t page_num) {
367  // These contingents are stored by overwriting the bytes used for chunkKeys. If
368  // we run into a key marked for deletion in a fileMgr with no fileMgrKey (i.e.
369  // CachingFileMgr) then we can't know if the epoch is valid because we don't know
370  // the key. At this point our only option is to free the page as though it was
371  // checkpointed (which should be fine since we only maintain one version of each
372  // page).
373  if (contingent == DELETE_CONTINGENT || contingent == ROLLOFF_CONTINGENT) {
374  file_info->freePage(page_num, false, page_epoch);
375  return true;
376  }
377  return false;
378 }
379 
380 void CachingFileMgr::writeDirtyBuffers(int32_t db_id, int32_t tb_id) {
381  heavyai::unique_lock<heavyai::shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
382  ChunkKey min_table_key{db_id, tb_id};
383  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
384 
385  for (auto chunk_it = chunkIndex_.lower_bound(min_table_key);
386  chunk_it != chunkIndex_.upper_bound(max_table_key);
387  ++chunk_it) {
388  if (auto [key, buf] = *chunk_it; buf->isDirty()) {
389  // Free previous versions first so we only have one metadata version.
390  buf->freeMetadataPages();
391  buf->writeMetadata(epoch(db_id, tb_id));
392  buf->clearDirtyBits();
393  touchKey(key);
394  }
395  }
396 }
397 
398 void CachingFileMgr::deleteBufferIfExists(const ChunkKey& key) {
399  heavyai::unique_lock<heavyai::shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
400  auto chunk_it = chunkIndex_.find(key);
401  if (chunk_it != chunkIndex_.end()) {
402  deleteBufferUnlocked(chunk_it);
403  }
404 }
405 
406 size_t CachingFileMgr::getNumDataChunks() const {
408  size_t num_chunks = 0;
409  for (auto [key, buf] : chunkIndex_) {
410  if (buf->hasDataPages()) {
411  num_chunks++;
412  }
413  }
414  return num_chunks;
415 }
416 
417 void CachingFileMgr::deleteCacheIfTooLarge() {
418  if (size_of_dir(fileMgrBasePath_) > max_size_) {
419  closeRemovePhysical();
420  bf::create_directory(fileMgrBasePath_);
421  LOG(INFO) << "Cache path over limit. Existing cache deleted.";
422  }
423 }
424 
425 Page CachingFileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
426  std::lock_guard<std::mutex> lock(getPageMutex_);
427  int32_t pageNum = -1;
428  // Splits files into metadata and regular data by size.
429  auto candidateFiles = fileIndex_.equal_range(pageSize);
430  // Check if there is a free page in an existing file.
431  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
432  FileInfo* fileInfo = files_.at(fileIt->second);
433  pageNum = fileInfo->getFreePage();
434  if (pageNum != -1) {
435  return (Page(fileInfo->fileId, pageNum));
436  }
437  }
438 
439  // Try to add a new file if there is free space available.
440  FileInfo* fileInfo = nullptr;
441  if (isMetadata) {
442  if (getMaxMetaFiles() > getNumMetaFiles()) {
443  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
444  }
445  } else {
446  if (getMaxDataFiles() > getNumDataFiles()) {
447  fileInfo = createFile(pageSize, num_pages_per_data_file_);
448  }
449  }
450 
451  if (!fileInfo) {
452  // We were not able to create a new file, so we try to evict space.
453  // Eviction will return the first file it evicted a page from (a file now guaranteed
454  // to have a free page).
455  fileInfo = isMetadata ? evictMetadataPages() : evictPages();
456  }
457  CHECK(fileInfo);
458 
459  pageNum = fileInfo->getFreePage();
460  CHECK(pageNum != -1);
461  return (Page(fileInfo->fileId, pageNum));
462 }
463 
464 std::vector<ChunkKey> CachingFileMgr::getKeysForTable(int32_t db_id,
465  int32_t tb_id) const {
466  std::vector<ChunkKey> keys;
467  ChunkKey min_table_key{db_id, tb_id};
468  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
469  for (auto it = chunkIndex_.lower_bound(min_table_key);
470  it != chunkIndex_.upper_bound(max_table_key);
471  ++it) {
472  keys.emplace_back(it->first);
473  }
474  return keys;
475 }
476 
477 FileInfo* CachingFileMgr::evictMetadataPages() {
478  // Locks should already be in place before calling this method.
479  FileInfo* file_info{nullptr};
480  auto key_to_evict = evict_chunk_or_fail(table_evict_alg_);
481  auto [db_id, tb_id] = get_table_prefix(key_to_evict);
482  const auto keys = getKeysForTable(db_id, tb_id);
483  for (const auto& key : keys) {
484  auto chunk_it = chunkIndex_.find(key);
485  CHECK(chunk_it != chunkIndex_.end());
486  auto& buf = chunk_it->second;
487  if (!file_info) {
488  // Return the FileInfo for the first file we are freeing a page from so that the
489  // caller does not have to search for a FileInfo guaranteed to have at least one
490  // free page.
491  CHECK(buf->getMetadataPage().pageVersions.size() > 0);
492  file_info =
493  getFileInfoForFileId(buf->getMetadataPage().pageVersions.front().page.fileId);
494  }
495  // We erase all pages and entries for the chunk, as without metadata all other
496  // entries are useless.
497  deleteBufferUnlocked(chunk_it);
498  }
499  // Serialized datawrappers require metadata to be in the cache.
500  deleteWrapperFile(db_id, tb_id);
501  CHECK(file_info) << "FileInfo with freed page not found";
502  return file_info;
503 }
504 
505 FileInfo* CachingFileMgr::evictPages() {
506  FileInfo* file_info{nullptr};
507  FileBuffer* buf{nullptr};
508  while (!file_info) {
509  buf = chunkIndex_.at(evict_chunk_or_fail(chunk_evict_alg_));
510  CHECK(buf);
511  if (!buf->hasDataPages()) {
512  // This buffer contains no chunk data (metadata only, uninitialized, size == 0,
513  // etc...) so we won't recover any space by evicting it. In this case it gets
514  // removed from the eviction queue (it will get re-added if it gets populated with
515  // data) and we look at the next chunk in queue until we find a buffer with page
516  // data.
517  continue;
518  }
519  // Return the FileInfo for the first file we are freeing a page from so that the
520  // caller does not have to search for a FileInfo guaranteed to have at least one free
521  // page.
522  CHECK(buf->getMultiPage().front().pageVersions.size() > 0);
523  file_info = getFileInfoForFileId(
524  buf->getMultiPage().front().pageVersions.front().page.fileId);
525  }
526  auto pages_freed = buf->freeChunkPages();
527  CHECK(pages_freed > 0) << "failed to evict a page";
528  CHECK(file_info) << "FileInfo with freed page not found";
529  return file_info;
530 }
531 
532 void CachingFileMgr::touchKey(const ChunkKey& key) const {
533  chunk_evict_alg_.touchChunk(key);
534  table_evict_alg_.touchChunk(get_table_key(key));
535 }
536 
537 void CachingFileMgr::removeKey(const ChunkKey& key) const {
538  // chunkIndex lock should already be acquired.
539  chunk_evict_alg_.removeChunk(key);
540  auto [db_id, tb_id] = get_table_prefix(key);
541  ChunkKey table_key{db_id, tb_id};
542  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
543  for (auto it = chunkIndex_.lower_bound(table_key);
544  it != chunkIndex_.upper_bound(max_table_key);
545  ++it) {
546  if (it->first != key) {
547  // If there are any keys in this table other than that one we are removing, then
548  // keep the table in the eviction queue.
549  return;
550  }
551  }
552  // No other keys exist for this table, so remove it from the queue.
553  table_evict_alg_.removeChunk(table_key);
554 }
555 
556 size_t CachingFileMgr::getFilesSize() const {
558  size_t sum = 0;
559  for (auto [id, file] : files_) {
560  sum += file->size();
561  }
562  return sum;
563 }
564 
565 size_t CachingFileMgr::getTableFileMgrsSize() const {
567  size_t space_used = 0;
568  for (const auto& [pair, table_dir] : table_dirs_) {
569  space_used += table_dir->getReservedSpace();
570  }
571  return space_used;
572 }
573 
574 size_t CachingFileMgr::getNumDataFiles() const {
576  return fileIndex_.count(page_size_);
577 }
578 
579 size_t CachingFileMgr::getNumMetaFiles() const {
581  return fileIndex_.count(metadata_page_size_);
582 }
583 
584 std::vector<ChunkKey> CachingFileMgr::getChunkKeysForPrefix(
585  const ChunkKey& prefix) const {
587  std::vector<ChunkKey> chunks;
588  for (auto [key, buf] : chunkIndex_) {
589  if (in_same_table(key, prefix)) {
590  if (buf->hasDataPages()) {
591  chunks.emplace_back(key);
592  touchKey(key);
593  }
594  }
595  }
596  return chunks;
597 }
598 
599 void CachingFileMgr::removeChunkKeepMetadata(const ChunkKey& key) {
600  if (isBufferOnDevice(key)) {
601  auto chunkIt = chunkIndex_.find(key);
602  CHECK(chunkIt != chunkIndex_.end());
603  auto& buf = chunkIt->second;
604  if (buf->hasDataPages()) {
605  buf->freeChunkPages();
606  chunk_evict_alg_.removeChunk(key);
607  }
608  }
609 }
610 
611 size_t CachingFileMgr::getNumChunksWithMetadata() const {
613  size_t sum = 0;
614  for (const auto& [key, buf] : chunkIndex_) {
615  if (buf->hasEncoder()) {
616  sum++;
617  }
618  }
619  return sum;
620 }
621 
622 std::string CachingFileMgr::dumpKeysWithMetadata() const {
624  std::string ret_string = "CFM keys with metadata:\n";
625  for (const auto& [key, buf] : chunkIndex_) {
626  if (buf->hasEncoder()) {
627  ret_string += " " + show_chunk(key) + "\n";
628  }
629  }
630  return ret_string;
631 }
632 
633 std::string CachingFileMgr::dumpKeysWithChunkData() const {
635  std::string ret_string = "CFM keys with chunk data:\n";
636  for (const auto& [key, buf] : chunkIndex_) {
637  if (buf->hasDataPages()) {
638  ret_string += " " + show_chunk(key) + "\n";
639  }
640  }
641  return ret_string;
642 }
643 
644 std::unique_ptr<CachingFileMgr> CachingFileMgr::reconstruct() const {
645  DiskCacheConfig config{
646  fileMgrBasePath_, DiskCacheLevel::none, num_reader_threads_, max_size_, page_size_};
647  return std::make_unique<CachingFileMgr>(config);
648 }
649 
650 void CachingFileMgr::deleteWrapperFile(int32_t db, int32_t tb) {
652  auto it = table_dirs_.find({db, tb});
653  CHECK(it != table_dirs_.end());
654  it->second->deleteWrapperFile();
655 }
656 
657 void CachingFileMgr::writeWrapperFile(const std::string& doc, int32_t db, int32_t tb) {
658  createTableFileMgrIfNoneExists(db, tb);
659  auto wrapper_size = doc.size();
660  CHECK_LE(wrapper_size, getMaxWrapperSize())
661  << "Wrapper is too big to fit into the cache";
662  while (wrapper_size > getAvailableWrapperSpace()) {
663  evictMetadataPages();
664  }
666  table_dirs_.at({db, tb})->writeWrapperFile(doc);
667 }
668 
669 bool CachingFileMgr::hasWrapperFile(int32_t db_id, int32_t table_id) const {
671  auto it = table_dirs_.find({db_id, table_id});
672  if (it != table_dirs_.end()) {
673  return it->second->hasWrapperFile();
674  }
675  return false;
676 }
677 
678 /*
679  * While the CFM allows for multiple tables to share the same allocated files for chunk
680  * data and metadata, space cannot be reallocated between metadata files and data files
681  * (once the space has been reserve for a data file the file won't be deleted unless the
682  * cache is deleted). To prevent a case where we have allocated too many files of one
683  * type to the detrement of the other, we have a minimum portion of the cache that is
684  * reserved for each type. This default ratio gives %9 of space to data wrappers, %1 to
685  * metadata files, and %90 to data files.
686  */
687 void CachingFileMgr::setMaxSizes() {
688  size_t max_meta_space = std::floor(max_size_ * METADATA_SPACE_PERCENTAGE);
689  size_t max_meta_file_space = std::floor(max_size_ * METADATA_FILE_SPACE_PERCENTAGE);
690  max_wrapper_space_ = max_meta_space - max_meta_file_space;
691  auto max_data_space = max_size_ - max_meta_space;
692  auto meta_file_size = metadata_page_size_ * num_pages_per_metadata_file_;
693  auto data_file_size = page_size_ * num_pages_per_data_file_;
694  max_num_data_files_ = max_data_space / data_file_size;
695  max_num_meta_files_ = max_meta_file_space / meta_file_size;
696  CHECK_GT(max_num_data_files_, 0U) << "Cannot create a cache of size " << max_size_
697  << ". Not enough space to create a data file.";
698  CHECK_GT(max_num_meta_files_, 0U) << "Cannot create a cache of size " << max_size_
699  << ". Not enough space to create a metadata file.";
700 }
701 
702 std::optional<FileBuffer*> CachingFileMgr::getBufferIfExists(const ChunkKey& key) {
703  heavyai::shared_lock<heavyai::shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
704  auto chunk_it = chunkIndex_.find(key);
705  if (chunk_it == chunkIndex_.end()) {
706  return {};
707  }
708  return getBufferUnlocked(key);
709 }
710 
711 ChunkKeyToChunkMap::iterator CachingFileMgr::deleteBufferUnlocked(
712  const ChunkKeyToChunkMap::iterator chunk_it,
713  const bool purge) {
714  removeKey(chunk_it->first);
715  return FileMgr::deleteBufferUnlocked(chunk_it, purge);
716 }
717 
718 void CachingFileMgr::getChunkMetadataVecForKeyPrefix(
719  ChunkMetadataVector& chunkMetadataVec,
720  const ChunkKey& keyPrefix) {
721  FileMgr::getChunkMetadataVecForKeyPrefix(chunkMetadataVec, keyPrefix);
722  for (const auto& [key, meta] : chunkMetadataVec) {
723  touchKey(key);
724  }
725 }
726 
727 FileBuffer* CachingFileMgr::getBufferUnlocked(const ChunkKey& key,
728  const size_t num_bytes) const {
729  touchKey(key);
730  return FileMgr::getBufferUnlocked(key, num_bytes);
731 }
732 
733 void CachingFileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
734  page.first->freePageDeferred(page.second);
735 }
736 
737 std::set<ChunkKey> CachingFileMgr::getKeysWithMetadata() const {
739  std::set<ChunkKey> ret;
740  for (const auto& [key, buf] : chunkIndex_) {
741  if (buf->hasEncoder()) {
742  ret.emplace(key);
743  }
744  }
745  return ret;
746 }
747 
748 size_t CachingFileMgr::getMaxDataFilesSize() const {
749  if (limit_data_size_) {
750  return *limit_data_size_;
751  }
752  return getMaxDataFiles() * getDataFileSize();
753 }
754 
755 TableFileMgr::TableFileMgr(const std::string& table_path)
756  : table_path_(table_path)
757  , epoch_file_path_(table_path_ + "/" + FileMgr::EPOCH_FILENAME)
758  , wrapper_file_path_(table_path_ + "/" + CachingFileMgr::WRAPPER_FILE_NAME)
759  , epoch_(Epoch())
760  , is_checkpointed_(true) {
761  if (!bf::exists(table_path_)) {
762  bf::create_directory(table_path_);
763  } else {
764  CHECK(bf::is_directory(table_path_)) << "Specified path '" << table_path_
765  << "' for cache table data is not a directory.";
766  }
767  if (bf::exists(epoch_file_path_)) {
768  CHECK(bf::is_regular_file(epoch_file_path_))
769  << "Found epoch file '" << epoch_file_path_ << "' which is not a regular file";
771  << "Found epoch file '" << epoch_file_path_ << "' which is not of expected size";
774  } else {
777  incrementEpoch();
778  }
779 }
780 
783  epoch_.increment();
784  is_checkpointed_ = false;
786  << "Epoch greater than maximum allowed value (" << epoch_.ceiling() << " > "
787  << Epoch::max_allowable_epoch() << ").";
788 }
789 
790 int32_t TableFileMgr::getEpoch() const {
792  return static_cast<int32_t>(epoch_.ceiling());
793 }
794 
798  int32_t status = fflush(epoch_file_);
799  CHECK(status == 0) << "Could not flush epoch file to disk";
800 #ifdef __APPLE__
801  status = fcntl(fileno(epoch_file_), 51);
802 #else
803  status = heavyai::fsync(fileno(epoch_file_));
804 #endif
805  CHECK(status == 0) << "Could not sync epoch file to disk";
806  is_checkpointed_ = true;
807 }
808 
811  bf::remove_all(table_path_);
812 }
813 
816  size_t space = 0;
817  for (const auto& file : bf::recursive_directory_iterator(table_path_)) {
818  if (bf::is_regular_file(file.path())) {
819  space += bf::file_size(file.path());
820  }
821  }
822  return space;
823 }
824 
827  bf::remove_all(wrapper_file_path_);
828 }
829 
830 void TableFileMgr::writeWrapperFile(const std::string& doc) const {
832  std::ofstream ofs(wrapper_file_path_);
833  if (!ofs) {
834  throw std::runtime_error{"Error trying to create file \"" + wrapper_file_path_ +
835  "\". The error was: " + std::strerror(errno)};
836  }
837  ofs << doc;
838 }
839 
842  return bf::exists(wrapper_file_path_);
843 }
844 
845 } // namespace File_Namespace
const size_t metadata_page_size_
Definition: FileMgr.h:536
size_t getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const
std::vector< int > ChunkKey
Definition: types.h:36
OpenFilesResult openFiles()
Definition: FileMgr.cpp:196
LRUEvictionAlgorithm table_evict_alg_
int8_t * storage_ptr()
Definition: Epoch.h:61
void removeDiskContent() const
Removes all disk data for the subdir.
const size_t page_size_
Definition: FileMgr.h:535
heavyai::shared_lock< heavyai::shared_mutex > read_lock
const ChunkKey evictNextChunk() override
This file details an extension of the FileMgr that can contain pages from multiple tables (CachingFil...
size_t size_of_dir(const std::string &dir)
heavyai::shared_mutex table_dirs_mutex_
std::string get_dir_name_for_table(int db_id, int tb_id)
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:283
void setMaxSizes()
Sets the maximum number of files/space for each type of storage based on the maximum size...
void writeAndSyncEpochToDisk()
Write and flush the epoch to the epoch file on disk.
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
std::string describeSelf() const override
describes this FileMgr for logging purposes.
size_t getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
void closeRemovePhysical() override
Closes files and removes the caching directory.
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:331
#define DEFAULT_METADATA_PAGE_SIZE
size_t getMetadataSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
int32_t ceiling() const
Definition: Epoch.h:44
heavyai::unique_lock< heavyai::shared_mutex > write_lock
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:57
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:703
static int64_t max_allowable_epoch()
Definition: Epoch.h:69
#define CHECK_GT(x, y)
Definition: Logger.h:301
std::string fileMgrBasePath_
Definition: FileMgr.h:397
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:57
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:57
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
CachingFileMgr(const DiskCacheConfig &config)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
Definition: FileInfo.cpp:185
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:150
std::shared_lock< T > shared_lock
void deleteCacheIfTooLarge()
When the cache is read from disk, we don't know which chunks were least recently used. Rather than try to evict random pages to get down to size we just reset the cache to make sure we have space.
heavyai::shared_mutex table_mutex_
void incrementAllEpochs()
Increment epochs for each table in the CFM.
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:142
int32_t incrementEpoch()
Definition: FileMgr.h:281
int32_t increment()
Definition: Epoch.h:54
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
Definition: FileInfo.h:51
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:326
void removeTableBuffers(int32_t db_id, int32_t tb_id)
Erases and cleans up all buffers for a table.
std::unique_lock< T > unique_lock
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void deleteWrapperFile() const
Deletes only the wrapper file on disk.
An AbstractBuffer is a unit of data management for a data manager.
void incrementEpoch()
increment the epoch for this subdir (not synced to disk).
bool g_enable_smem_group_by true
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:656
void removeTableFileMgr(int32_t db_id, int32_t tb_id)
Removes the subdirectory content for a table.
constexpr int32_t ROLLOFF_CONTINGENT
Definition: FileInfo.h:52
num_reader_threads_(num_reader_threads)
ChunkKey evict_chunk_or_fail(LRUEvictionAlgorithm &alg)
heavyai::shared_mutex chunkIndexMutex_
Definition: FileMgr.h:410
int32_t maxRollbackEpochs_
Definition: FileMgr.h:396
void writeWrapperFile(const std::string &doc) const
Writes wrapper file to disk.
#define CHECK_LE(x, y)
Definition: Logger.h:300
Definition: Epoch.h:30
static size_t byte_size()
Definition: Epoch.h:63
size_t getReservedSpace() const
Returns the disk space used (in bytes) for the subdir.
std::map< TablePair, std::unique_ptr< TableFileMgr > > table_dirs_
void readTableFileMgrs()
Checks for any sub-directories containing table-specific data and creates epochs from found files...
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:62
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:107
int fsync(int fd)
Definition: heavyai_fs.cpp:62
int32_t epoch() const
Definition: FileMgr.h:517
#define CHECK(condition)
Definition: Logger.h:289
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1582
void init(const size_t num_reader_threads)
Initializes a CFM, parsing any existing files and initializing data structures appropriately (current...
heavyai::shared_mutex files_rw_mutex_
Definition: FileMgr.h:411
bool in_same_table(const ChunkKey &left_key, const ChunkKey &right_key)
Definition: types.h:83
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:91
unsigned nextFileId_
number of threads used when loading data
Definition: FileMgr.h:403
size_t getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
int32_t getEpoch() const
Returns the current epoch (locked)
std::string getTableFileMgrPath(int32_t db, int32_t tb) const
void clearForTable(int32_t db_id, int32_t tb_id)
Removes all data related to the given table (pages and subdirectories).
A FileMgr capable of limiting its size and storing data from multiple tables in a shared directory...
#define VLOG(n)
Definition: Logger.h:383
LRUEvictionAlgorithm chunk_evict_alg_
FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &startIt, const std::vector< HeaderInfo >::const_iterator &endIt) override
Creates a buffer and initializes it with info read from files on disk.