OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CachingFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 #include <boost/filesystem.hpp>
25 #include <fstream>
26 #include "Shared/misc.h"
27 
28 namespace bf = boost::filesystem;
29 
30 namespace {
31 size_t size_of_dir(const std::string& dir) {
32  size_t space_used = 0;
33  if (bf::exists(dir)) {
34  for (const auto& file : bf::recursive_directory_iterator(dir)) {
35  if (bf::is_regular_file(file.path())) {
36  space_used += bf::file_size(file.path());
37  }
38  }
39  }
40  return space_used;
41 }
42 
44  ChunkKey ret;
45  try {
46  ret = alg.evictNextChunk();
47  } catch (const NoEntryFoundException& e) {
48  LOG(FATAL) << "Disk cache needs to evict data to make space, but no data found in "
49  "eviction queue.";
50  }
51  return ret;
52 }
53 
54 } // namespace
55 
56 namespace File_Namespace {
57 
58 std::string CachingFileMgr::dump() const {
59  std::stringstream ss;
60  ss << "Dump Cache:\n";
61  for (const auto& [key, buf] : chunkIndex_) {
62  ss << " " << show_chunk(key) << " num_pages: " << buf->pageCount()
63  << ", is dirty: " << buf->isDirty() << "\n";
64  }
65  ss << "Data Eviction Queue:\n" << chunk_evict_alg_.dumpEvictionQueue();
66  ss << "Metadata Eviction Queue:\n" << table_evict_alg_.dumpEvictionQueue();
67  ss << "\n";
68  return ss.str();
69 }
70 
72  : FileMgr(config.page_size, config.meta_page_size) {
73  fileMgrBasePath_ = config.path;
75  nextFileId_ = 0;
76  max_size_ = config.size_limit;
77  init(config.num_reader_threads);
78  setMaxSizes();
79 }
80 
82 
83 void CachingFileMgr::init(const size_t num_reader_threads) {
86  auto open_files_result = openFiles();
87  /* Sort headerVec so that all HeaderInfos
88  * from a chunk will be grouped together
89  * and in order of increasing PageId
90  * - Version Epoch */
91  auto& header_vec = open_files_result.header_infos;
92  std::sort(header_vec.begin(), header_vec.end());
93 
94  /* Goal of next section is to find sequences in the
95  * sorted headerVec of the same ChunkId, which we
96  * can then initiate a FileBuffer with */
97  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
98  if (header_vec.size() > 0) {
99  auto startIt = header_vec.begin();
100  ChunkKey lastChunkKey = startIt->chunkKey;
101  for (auto it = header_vec.begin() + 1; it != header_vec.end(); ++it) {
102  if (it->chunkKey != lastChunkKey) {
103  createBufferFromHeaders(lastChunkKey, startIt, it);
104  lastChunkKey = it->chunkKey;
105  startIt = it;
106  }
107  }
108  createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
109  }
110  nextFileId_ = open_files_result.max_file_id + 1;
112  freePages();
113  initializeNumThreads(num_reader_threads);
114  isFullyInitted_ = true;
115 }
116 
119  bf::path path(fileMgrBasePath_);
120  CHECK(bf::exists(path)) << "Cache path: " << fileMgrBasePath_ << " does not exit.";
121  CHECK(bf::is_directory(path))
122  << "Specified path '" << fileMgrBasePath_ << "' for disk cache is not a directory.";
123 
124  // Look for directories with table-specific names.
125  boost::regex table_filter("table_([0-9]+)_([0-9]+)");
126  for (const auto& file : bf::directory_iterator(path)) {
127  boost::smatch match;
128  auto file_name = file.path().filename().string();
129  if (boost::regex_match(file_name, match, table_filter)) {
130  int32_t db_id = std::stoi(match[1]);
131  int32_t tb_id = std::stoi(match[2]);
132  TablePair table_pair{db_id, tb_id};
133  CHECK(table_dirs_.find(table_pair) == table_dirs_.end())
134  << "Trying to read data for existing table";
135  table_dirs_.emplace(table_pair,
136  std::make_unique<TableFileMgr>(file.path().string()));
137  }
138  }
139 }
140 
141 int32_t CachingFileMgr::epoch(int32_t db_id, int32_t tb_id) const {
143  auto tables_it = table_dirs_.find({db_id, tb_id});
144  if (tables_it == table_dirs_.end()) {
145  // If there is no directory for this table, that means the cache does not recognize
146  // the table that is requested. This can happen if a table was dropped, and it's
147  // pages were invalidated but not yet freed and then the server crashed before they
148  // were freed. Upon re-starting the FileMgr will find these pages and attempt to
149  // compare their epoch to know if they are valid or not. In this case we should
150  // return an invalid epoch to indicate that any page for this table is not valid and
151  // should be freed.
153  }
154  auto& [pair, table_dir] = *tables_it;
155  return table_dir->getEpoch();
156 }
157 
158 void CachingFileMgr::incrementEpoch(int32_t db_id, int32_t tb_id) {
160  auto tables_it = table_dirs_.find({db_id, tb_id});
161  CHECK(tables_it != table_dirs_.end());
162  auto& [pair, table_dir] = *tables_it;
163  table_dir->incrementEpoch();
164 }
165 
166 void CachingFileMgr::writeAndSyncEpochToDisk(int32_t db_id, int32_t tb_id) {
168  auto table_it = table_dirs_.find({db_id, tb_id});
169  CHECK(table_it != table_dirs_.end());
170  table_it->second->writeAndSyncEpochToDisk();
171 }
172 
173 void CachingFileMgr::clearForTable(int32_t db_id, int32_t tb_id) {
174  removeTableBuffers(db_id, tb_id);
175  removeTableFileMgr(db_id, tb_id);
176  freePages();
177 }
178 
179 std::string CachingFileMgr::getTableFileMgrPath(int32_t db_id, int32_t tb_id) const {
180  return getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
181 }
182 
184  {
187  }
188  {
190  table_dirs_.clear();
191  }
192  bf::remove_all(getFileMgrBasePath());
193 }
194 
195 size_t CachingFileMgr::getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
197  size_t space_used = 0;
198  ChunkKey min_table_key{db_id, tb_id};
199  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
200  for (auto it = chunkIndex_.lower_bound(min_table_key);
201  it != chunkIndex_.upper_bound(max_table_key);
202  ++it) {
203  auto& [key, buffer] = *it;
204  space_used += (buffer->numChunkPages() * page_size_);
205  }
206  return space_used;
207 }
208 
210  int32_t tb_id) const {
212  size_t space_used = 0;
213  ChunkKey min_table_key{db_id, tb_id};
214  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
215  for (auto it = chunkIndex_.lower_bound(min_table_key);
216  it != chunkIndex_.upper_bound(max_table_key);
217  ++it) {
218  auto& [key, buffer] = *it;
219  space_used += (buffer->numMetadataPages() * metadata_page_size_);
220  }
221  return space_used;
222 }
223 
224 size_t CachingFileMgr::getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const {
226  size_t space = 0;
227  auto table_it = table_dirs_.find({db_id, tb_id});
228  if (table_it != table_dirs_.end()) {
229  space += table_it->second->getReservedSpace();
230  }
231  return space;
232 }
233 
234 size_t CachingFileMgr::getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
235  auto chunk_space = getChunkSpaceReservedByTable(db_id, tb_id);
236  auto meta_space = getMetadataSpaceReservedByTable(db_id, tb_id);
237  auto subdir_space = getTableFileMgrSpaceReserved(db_id, tb_id);
238  return chunk_space + meta_space + subdir_space;
239 }
240 
241 std::string CachingFileMgr::describeSelf() const {
242  return "cache";
243 }
244 
245 // Similar to FileMgr::checkpoint() but only writes a subset of buffers.
246 void CachingFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
247  {
249  CHECK(table_dirs_.find({db_id, tb_id}) != table_dirs_.end()) << "No data for table";
250  }
251  VLOG(2) << "Checkpointing " << describeSelf() << " (" << db_id << ", " << tb_id
252  << ") epoch: " << epoch(db_id, tb_id);
253  writeDirtyBuffers(db_id, tb_id);
254  syncFilesToDisk();
255  writeAndSyncEpochToDisk(db_id, tb_id);
256  incrementEpoch(db_id, tb_id);
257  freePages();
258 }
259 
260 void CachingFileMgr::createTableFileMgrIfNoneExists(const int32_t db_id,
261  const int32_t tb_id) {
262  heavyai::unique_lock<heavyai::shared_mutex> write_lock(table_dirs_mutex_);
263  TablePair table_pair{db_id, tb_id};
264  if (table_dirs_.find(table_pair) == table_dirs_.end()) {
265  table_dirs_.emplace(
266  table_pair, std::make_unique<TableFileMgr>(getTableFileMgrPath(db_id, tb_id)));
267  }
268 }
269 
270 FileBuffer* CachingFileMgr::createBufferUnlocked(const ChunkKey& key,
271  const size_t page_size,
272  const size_t num_bytes) {
273  touchKey(key);
274  auto [db_id, tb_id] = get_table_prefix(key);
275  createTableFileMgrIfNoneExists(db_id, tb_id);
276  return FileMgr::createBufferUnlocked(key, page_size, num_bytes);
277 }
278 
279 FileBuffer* CachingFileMgr::createBufferFromHeaders(
280  const ChunkKey& key,
281  const std::vector<HeaderInfo>::const_iterator& startIt,
282  const std::vector<HeaderInfo>::const_iterator& endIt) {
283  if (startIt->pageId != -1) {
284  // If the first pageId is not -1 then there is no metadata page for the
285  // current key (which means it was never checkpointed), so we should skip.
286  return nullptr;
287  }
288  touchKey(key);
289  auto [db_id, tb_id] = get_table_prefix(key);
290  createTableFileMgrIfNoneExists(db_id, tb_id);
291  auto buffer = FileMgr::createBufferFromHeaders(key, startIt, endIt);
292  if (buffer->isMissingPages()) {
293  // Detect the case where a page is missing by comparing the amount of pages read
294  // with the metadata size. If data are missing, discard the chunk.
295  buffer->freeChunkPages();
296  }
297  return buffer;
298 }
299 
306 FileBuffer* CachingFileMgr::putBuffer(const ChunkKey& key,
307  AbstractBuffer* src_buffer,
308  const size_t num_bytes) {
309  CHECK(!src_buffer->isDirty()) << "Cannot cache dirty buffers.";
310  deleteBufferIfExists(key);
311  // Since the buffer is not dirty we mark it as dirty if we are only writing metadata and
312  // appended if we are writing chunk data. We delete + append rather than write to make
313  // sure we don't write multiple page versions.
314  (src_buffer->size() == 0) ? src_buffer->setDirty() : src_buffer->setAppended();
315  return FileMgr::putBuffer(key, src_buffer, num_bytes);
316 }
317 
318 void CachingFileMgr::incrementAllEpochs() {
319  heavyai::shared_lock<heavyai::shared_mutex> read_lock(table_dirs_mutex_);
320  for (auto& table_dir : table_dirs_) {
321  table_dir.second->incrementEpoch();
322  }
323 }
324 
325 void CachingFileMgr::removeTableFileMgr(int32_t db_id, int32_t tb_id) {
326  // Delete table-specific directory (stores table epoch data and serialized data wrapper)
327  heavyai::unique_lock<heavyai::shared_mutex> write_lock(table_dirs_mutex_);
328  auto it = table_dirs_.find({db_id, tb_id});
329  if (it != table_dirs_.end()) {
330  it->second->removeDiskContent();
331  table_dirs_.erase(it);
332  }
333 }
334 
335 void CachingFileMgr::removeTableBuffers(int32_t db_id, int32_t tb_id) {
336  // Free associated FileBuffers and clear buffer entries.
337  heavyai::unique_lock<heavyai::shared_mutex> write_lock(chunkIndexMutex_);
338  ChunkKey min_table_key{db_id, tb_id};
339  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
340  for (auto it = chunkIndex_.lower_bound(min_table_key);
341  it != chunkIndex_.upper_bound(max_table_key);) {
342  it = deleteBufferUnlocked(it);
343  }
344 }
345 
346 CachingFileBuffer* CachingFileMgr::allocateBuffer(const size_t page_size,
347  const ChunkKey& key,
348  const size_t num_bytes) {
349  return new CachingFileBuffer(this, page_size, key, num_bytes);
350 }
351 
352 CachingFileBuffer* CachingFileMgr::allocateBuffer(
353  const ChunkKey& key,
354  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
355  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
356  return new CachingFileBuffer(this, key, headerStartIt, headerEndIt);
357 }
358 
359 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
360 bool CachingFileMgr::updatePageIfDeleted(FileInfo* file_info,
361  ChunkKey& chunk_key,
362  int32_t contingent,
363  int32_t page_epoch,
364  int32_t page_num) {
365  // These contingents are stored by overwriting the bytes used for chunkKeys. If
366  // we run into a key marked for deletion in a fileMgr with no fileMgrKey (i.e.
367  // CachingFileMgr) then we can't know if the epoch is valid because we don't know
368  // the key. At this point our only option is to free the page as though it was
369  // checkpointed (which should be fine since we only maintain one version of each
370  // page).
371  if (contingent == DELETE_CONTINGENT || contingent == ROLLOFF_CONTINGENT) {
372  file_info->freePage(page_num, false, page_epoch);
373  return true;
374  }
375  return false;
376 }
377 
378 void CachingFileMgr::writeDirtyBuffers(int32_t db_id, int32_t tb_id) {
379  heavyai::unique_lock<heavyai::shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
380  ChunkKey min_table_key{db_id, tb_id};
381  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
382 
383  for (auto chunk_it = chunkIndex_.lower_bound(min_table_key);
384  chunk_it != chunkIndex_.upper_bound(max_table_key);
385  ++chunk_it) {
386  if (auto [key, buf] = *chunk_it; buf->isDirty()) {
387  // Free previous versions first so we only have one metadata version.
388  buf->freeMetadataPages();
389  buf->writeMetadata(epoch(db_id, tb_id));
390  buf->clearDirtyBits();
391  touchKey(key);
392  }
393  }
394 }
395 
396 void CachingFileMgr::deleteBufferIfExists(const ChunkKey& key) {
397  heavyai::unique_lock<heavyai::shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
398  auto chunk_it = chunkIndex_.find(key);
399  if (chunk_it != chunkIndex_.end()) {
400  deleteBufferUnlocked(chunk_it);
401  }
402 }
403 
404 size_t CachingFileMgr::getNumDataChunks() const {
405  heavyai::shared_lock<heavyai::shared_mutex> read_lock(chunkIndexMutex_);
406  size_t num_chunks = 0;
407  for (auto [key, buf] : chunkIndex_) {
408  if (buf->hasDataPages()) {
409  num_chunks++;
410  }
411  }
412  return num_chunks;
413 }
414 
415 void CachingFileMgr::deleteCacheIfTooLarge() {
416  if (size_of_dir(fileMgrBasePath_) > max_size_) {
417  closeRemovePhysical();
418  bf::create_directory(fileMgrBasePath_);
419  LOG(INFO) << "Cache path over limit. Existing cache deleted.";
420  }
421 }
422 
423 Page CachingFileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
424  std::lock_guard<std::mutex> lock(getPageMutex_);
425  int32_t pageNum = -1;
426  // Splits files into metadata and regular data by size.
427  auto candidateFiles = fileIndex_.equal_range(pageSize);
428  // Check if there is a free page in an existing file.
429  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
430  FileInfo* fileInfo = getFileInfoForFileId(fileIt->second);
431  pageNum = fileInfo->getFreePage();
432  if (pageNum != -1) {
433  return (Page(fileInfo->fileId, pageNum));
434  }
435  }
436 
437  // Try to add a new file if there is free space available.
438  FileInfo* fileInfo = nullptr;
439  if (isMetadata) {
440  if (getMaxMetaFiles() > getNumMetaFiles()) {
441  fileInfo = createFileInfo(pageSize, num_pages_per_metadata_file_);
442  }
443  } else {
444  if (getMaxDataFiles() > getNumDataFiles()) {
445  fileInfo = createFileInfo(pageSize, num_pages_per_data_file_);
446  }
447  }
448 
449  if (!fileInfo) {
450  // We were not able to create a new file, so we try to evict space.
451  // Eviction will return the first file it evicted a page from (a file now guaranteed
452  // to have a free page).
453  fileInfo = isMetadata ? evictMetadataPages() : evictPages();
454  }
455  CHECK(fileInfo);
456 
457  pageNum = fileInfo->getFreePage();
458  CHECK(pageNum != -1);
459  return (Page(fileInfo->fileId, pageNum));
460 }
461 
462 std::vector<ChunkKey> CachingFileMgr::getKeysForTable(int32_t db_id,
463  int32_t tb_id) const {
464  std::vector<ChunkKey> keys;
465  ChunkKey min_table_key{db_id, tb_id};
466  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
467  for (auto it = chunkIndex_.lower_bound(min_table_key);
468  it != chunkIndex_.upper_bound(max_table_key);
469  ++it) {
470  keys.emplace_back(it->first);
471  }
472  return keys;
473 }
474 
475 FileInfo* CachingFileMgr::evictMetadataPages() {
476  // Locks should already be in place before calling this method.
477  FileInfo* file_info{nullptr};
478  auto key_to_evict = evict_chunk_or_fail(table_evict_alg_);
479  auto [db_id, tb_id] = get_table_prefix(key_to_evict);
480  const auto keys = getKeysForTable(db_id, tb_id);
481  for (const auto& key : keys) {
482  auto chunk_it = chunkIndex_.find(key);
483  CHECK(chunk_it != chunkIndex_.end());
484  auto& buf = chunk_it->second;
485  if (!file_info) {
486  // Return the FileInfo for the first file we are freeing a page from so that the
487  // caller does not have to search for a FileInfo guaranteed to have at least one
488  // free page.
489  CHECK(buf->getMetadataPage().pageVersions.size() > 0);
490  file_info =
491  getFileInfoForFileId(buf->getMetadataPage().pageVersions.front().page.fileId);
492  }
493  // We erase all pages and entries for the chunk, as without metadata all other
494  // entries are useless.
495  deleteBufferUnlocked(chunk_it);
496  }
497  // Serialized datawrappers require metadata to be in the cache.
498  deleteWrapperFile(db_id, tb_id);
499  CHECK(file_info) << "FileInfo with freed page not found";
500  return file_info;
501 }
502 
503 FileInfo* CachingFileMgr::evictPages() {
504  FileInfo* file_info{nullptr};
505  FileBuffer* buf{nullptr};
506  while (!file_info) {
507  buf = chunkIndex_.at(evict_chunk_or_fail(chunk_evict_alg_));
508  CHECK(buf);
509  if (!buf->hasDataPages()) {
510  // This buffer contains no chunk data (metadata only, uninitialized, size == 0,
511  // etc...) so we won't recover any space by evicting it. In this case it gets
512  // removed from the eviction queue (it will get re-added if it gets populated with
513  // data) and we look at the next chunk in queue until we find a buffer with page
514  // data.
515  continue;
516  }
517  // Return the FileInfo for the first file we are freeing a page from so that the
518  // caller does not have to search for a FileInfo guaranteed to have at least one free
519  // page.
520  CHECK(buf->getMultiPage().front().pageVersions.size() > 0);
521  file_info = getFileInfoForFileId(
522  buf->getMultiPage().front().pageVersions.front().page.fileId);
523  }
524  auto pages_freed = buf->freeChunkPages();
525  CHECK(pages_freed > 0) << "failed to evict a page";
526  CHECK(file_info) << "FileInfo with freed page not found";
527  return file_info;
528 }
529 
530 void CachingFileMgr::touchKey(const ChunkKey& key) const {
531  chunk_evict_alg_.touchChunk(key);
532  table_evict_alg_.touchChunk(get_table_key(key));
533 }
534 
535 void CachingFileMgr::removeKey(const ChunkKey& key) const {
536  // chunkIndex lock should already be acquired.
537  chunk_evict_alg_.removeChunk(key);
538  auto [db_id, tb_id] = get_table_prefix(key);
539  ChunkKey table_key{db_id, tb_id};
540  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
541  for (auto it = chunkIndex_.lower_bound(table_key);
542  it != chunkIndex_.upper_bound(max_table_key);
543  ++it) {
544  if (it->first != key) {
545  // If there are any keys in this table other than that one we are removing, then
546  // keep the table in the eviction queue.
547  return;
548  }
549  }
550  // No other keys exist for this table, so remove it from the queue.
551  table_evict_alg_.removeChunk(table_key);
552 }
553 
554 size_t CachingFileMgr::getFilesSize() const {
555  heavyai::shared_lock<heavyai::shared_mutex> read_lock(files_rw_mutex_);
556  size_t sum = 0;
557  for (const auto& [id, file] : files_) {
558  sum += file->size();
559  }
560  return sum;
561 }
562 
563 size_t CachingFileMgr::getTableFileMgrsSize() const {
564  heavyai::shared_lock<heavyai::shared_mutex> read_lock(table_dirs_mutex_);
565  size_t space_used = 0;
566  for (const auto& [pair, table_dir] : table_dirs_) {
567  space_used += table_dir->getReservedSpace();
568  }
569  return space_used;
570 }
571 
572 size_t CachingFileMgr::getNumDataFiles() const {
573  heavyai::shared_lock<heavyai::shared_mutex> read_lock(files_rw_mutex_);
574  return fileIndex_.count(page_size_);
575 }
576 
577 size_t CachingFileMgr::getNumMetaFiles() const {
578  heavyai::shared_lock<heavyai::shared_mutex> read_lock(files_rw_mutex_);
579  return fileIndex_.count(metadata_page_size_);
580 }
581 
582 std::vector<ChunkKey> CachingFileMgr::getChunkKeysForPrefix(
583  const ChunkKey& prefix) const {
584  heavyai::shared_lock<heavyai::shared_mutex> read_lock(chunkIndexMutex_);
585  std::vector<ChunkKey> chunks;
586  for (auto [key, buf] : chunkIndex_) {
587  if (in_same_table(key, prefix)) {
588  if (buf->hasDataPages()) {
589  chunks.emplace_back(key);
590  touchKey(key);
591  }
592  }
593  }
594  return chunks;
595 }
596 
597 void CachingFileMgr::removeChunkKeepMetadata(const ChunkKey& key) {
598  if (isBufferOnDevice(key)) {
599  auto chunkIt = chunkIndex_.find(key);
600  CHECK(chunkIt != chunkIndex_.end());
601  auto& buf = chunkIt->second;
602  if (buf->hasDataPages()) {
603  buf->freeChunkPages();
604  chunk_evict_alg_.removeChunk(key);
605  }
606  }
607 }
608 
609 size_t CachingFileMgr::getNumChunksWithMetadata() const {
610  heavyai::shared_lock<heavyai::shared_mutex> read_lock(chunkIndexMutex_);
611  size_t sum = 0;
612  for (const auto& [key, buf] : chunkIndex_) {
613  if (buf->hasEncoder()) {
614  sum++;
615  }
616  }
617  return sum;
618 }
619 
620 std::string CachingFileMgr::dumpKeysWithMetadata() const {
621  heavyai::shared_lock<heavyai::shared_mutex> read_lock(chunkIndexMutex_);
622  std::string ret_string = "CFM keys with metadata:\n";
623  for (const auto& [key, buf] : chunkIndex_) {
624  if (buf->hasEncoder()) {
625  ret_string += " " + show_chunk(key) + "\n";
626  }
627  }
628  return ret_string;
629 }
630 
631 std::string CachingFileMgr::dumpKeysWithChunkData() const {
632  heavyai::shared_lock<heavyai::shared_mutex> read_lock(chunkIndexMutex_);
633  std::string ret_string = "CFM keys with chunk data:\n";
634  for (const auto& [key, buf] : chunkIndex_) {
635  if (buf->hasDataPages()) {
636  ret_string += " " + show_chunk(key) + "\n";
637  }
638  }
639  return ret_string;
640 }
641 
642 std::unique_ptr<CachingFileMgr> CachingFileMgr::reconstruct() const {
643  DiskCacheConfig config{fileMgrBasePath_,
644  DiskCacheLevel::none,
645  num_reader_threads_,
646  max_size_,
647  page_size_,
648  metadata_page_size_};
649  return std::make_unique<CachingFileMgr>(config);
650 }
651 
652 void CachingFileMgr::deleteWrapperFile(int32_t db, int32_t tb) {
653  heavyai::shared_lock<heavyai::shared_mutex> read_lock(table_dirs_mutex_);
654  auto it = table_dirs_.find({db, tb});
655  CHECK(it != table_dirs_.end()) << "Wrapper does not exist.";
656  it->second->deleteWrapperFile();
657 }
658 
659 void CachingFileMgr::writeWrapperFile(const std::string& doc, int32_t db, int32_t tb) {
660  createTableFileMgrIfNoneExists(db, tb);
661  auto wrapper_size = doc.size();
662  CHECK_LE(wrapper_size, getMaxWrapperSize())
663  << "Wrapper is too big to fit into the cache";
664  while (wrapper_size > getAvailableWrapperSpace()) {
665  evictMetadataPages();
666  }
667  heavyai::shared_lock<heavyai::shared_mutex> read_lock(table_dirs_mutex_);
668  table_dirs_.at({db, tb})->writeWrapperFile(doc);
669 }
670 
671 bool CachingFileMgr::hasWrapperFile(int32_t db_id, int32_t table_id) const {
672  heavyai::shared_lock<heavyai::shared_mutex> read_lock(table_dirs_mutex_);
673  auto it = table_dirs_.find({db_id, table_id});
674  if (it != table_dirs_.end()) {
675  return it->second->hasWrapperFile();
676  }
677  return false;
678 }
679 
680 /*
681  * While the CFM allows multiple tables to share the same allocated files for chunk
682  * data and metadata, space cannot be reallocated between metadata files and data files
683  * (once space has been reserved for a data file, the file won't be deleted unless the
684  * cache is deleted). To prevent a case where we allocate too many files of one
685  * type to the detriment of the other, a portion of the cache is
686  * reserved for each type. The default ratio gives 9% of space to data wrappers, 1% to
687  * metadata files, and 90% to data files.
688  */
689 void CachingFileMgr::setMaxSizes() {
690  size_t max_meta_space = std::floor(max_size_ * METADATA_SPACE_PERCENTAGE);
691  size_t max_meta_file_space = std::floor(max_size_ * METADATA_FILE_SPACE_PERCENTAGE);
692  max_wrapper_space_ = max_meta_space - max_meta_file_space;
693  auto max_data_space = max_size_ - max_meta_space;
694  auto meta_file_size = metadata_page_size_ * num_pages_per_metadata_file_;
695  auto data_file_size = page_size_ * num_pages_per_data_file_;
696  max_num_data_files_ = max_data_space / data_file_size;
697  max_num_meta_files_ = max_meta_file_space / meta_file_size;
698  CHECK_GT(max_num_data_files_, 0U) << "Cannot create a cache of size " << max_size_
699  << ". Not enough space to create a data file.";
700  CHECK_GT(max_num_meta_files_, 0U) << "Cannot create a cache of size " << max_size_
701  << ". Not enough space to create a metadata file.";
702 }
703 
704 std::optional<FileBuffer*> CachingFileMgr::getBufferIfExists(const ChunkKey& key) {
705  heavyai::shared_lock<heavyai::shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
706  auto chunk_it = chunkIndex_.find(key);
707  if (chunk_it == chunkIndex_.end()) {
708  return {};
709  }
710  return getBufferUnlocked(key);
711 }
712 
713 ChunkKeyToChunkMap::iterator CachingFileMgr::deleteBufferUnlocked(
714  const ChunkKeyToChunkMap::iterator chunk_it,
715  const bool purge) {
716  removeKey(chunk_it->first);
717  return FileMgr::deleteBufferUnlocked(chunk_it, purge);
718 }
719 
720 void CachingFileMgr::getChunkMetadataVecForKeyPrefix(
721  ChunkMetadataVector& chunkMetadataVec,
722  const ChunkKey& keyPrefix) {
723  FileMgr::getChunkMetadataVecForKeyPrefix(chunkMetadataVec, keyPrefix);
724  for (const auto& [key, meta] : chunkMetadataVec) {
725  touchKey(key);
726  }
727 }
728 
729 FileBuffer* CachingFileMgr::getBufferUnlocked(const ChunkKey& key,
730  const size_t num_bytes) const {
731  touchKey(key);
732  return FileMgr::getBufferUnlocked(key, num_bytes);
733 }
734 
735 void CachingFileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
736  page.first->freePageDeferred(page.second);
737 }
738 
739 std::set<ChunkKey> CachingFileMgr::getKeysWithMetadata() const {
740  heavyai::shared_lock<heavyai::shared_mutex> read_lock(chunkIndexMutex_);
741  std::set<ChunkKey> ret;
742  for (const auto& [key, buf] : chunkIndex_) {
743  if (buf->hasEncoder()) {
744  ret.emplace(key);
745  }
746  }
747  return ret;
748 }
749 
750 size_t CachingFileMgr::getMaxDataFilesSize() const {
751  if (limit_data_size_) {
752  return *limit_data_size_;
753  }
754  return getMaxDataFiles() * getDataFileSize();
755 }
756 
757 TableFileMgr::TableFileMgr(const std::string& table_path)
758  : table_path_(table_path)
759  , epoch_file_path_(table_path_ + "/" + FileMgr::EPOCH_FILENAME)
760  , wrapper_file_path_(table_path_ + "/" + CachingFileMgr::WRAPPER_FILE_NAME)
761  , epoch_(Epoch())
762  , is_checkpointed_(true) {
763  if (!bf::exists(table_path_)) {
764  bf::create_directory(table_path_);
765  } else {
766  CHECK(bf::is_directory(table_path_)) << "Specified path '" << table_path_
767  << "' for cache table data is not a directory.";
768  }
769  if (bf::exists(epoch_file_path_)) {
770  CHECK(bf::is_regular_file(epoch_file_path_))
771  << "Found epoch file '" << epoch_file_path_ << "' which is not a regular file";
773  << "Found epoch file '" << epoch_file_path_ << "' which is not of expected size";
776  } else {
779  incrementEpoch();
780  }
781 }
782 
785  epoch_.increment();
786  is_checkpointed_ = false;
788  << "Epoch greater than maximum allowed value (" << epoch_.ceiling() << " > "
789  << Epoch::max_allowable_epoch() << ").";
790 }
791 
792 int32_t TableFileMgr::getEpoch() const {
794  return static_cast<int32_t>(epoch_.ceiling());
795 }
796 
800  int32_t status = fflush(epoch_file_);
801  CHECK(status == 0) << "Could not flush epoch file to disk";
802 #ifdef __APPLE__
803  status = fcntl(fileno(epoch_file_), 51);
804 #else
805  status = heavyai::fsync(fileno(epoch_file_));
806 #endif
807  CHECK(status == 0) << "Could not sync epoch file to disk";
808  is_checkpointed_ = true;
809 }
810 
813  bf::remove_all(table_path_);
814 }
815 
818  size_t space = 0;
819  for (const auto& file : bf::recursive_directory_iterator(table_path_)) {
820  if (bf::is_regular_file(file.path())) {
821  space += bf::file_size(file.path());
822  }
823  }
824  return space;
825 }
826 
829  bf::remove_all(wrapper_file_path_);
830 }
831 
832 void TableFileMgr::writeWrapperFile(const std::string& doc) const {
834  std::ofstream ofs(wrapper_file_path_);
835  if (!ofs) {
836  throw std::runtime_error{"Error trying to create file \"" + wrapper_file_path_ +
837  "\". The error was: " + std::strerror(errno)};
838  }
839  ofs << doc;
840 }
841 
844  return bf::exists(wrapper_file_path_);
845 }
846 
847 std::ostream& operator<<(std::ostream& os, DiskCacheLevel disk_cache_level) {
848  if (disk_cache_level == DiskCacheLevel::none) {
849  os << "None";
850  } else if (disk_cache_level == DiskCacheLevel::fsi) {
851  os << "ForeignTables";
852  } else if (disk_cache_level == DiskCacheLevel::non_fsi) {
853  os << "LocalTables";
854  } else if (disk_cache_level == DiskCacheLevel::all) {
855  os << "All";
856  } else {
857  UNREACHABLE() << "Unexpected disk cache level: "
858  << static_cast<int32_t>(disk_cache_level);
859  }
860  return os;
861 }
862 } // namespace File_Namespace
const size_t metadata_page_size_
Definition: FileMgr.h:552
size_t getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const
std::vector< int > ChunkKey
Definition: types.h:36
OpenFilesResult openFiles()
Definition: FileMgr.cpp:200
std::ostream & operator<<(std::ostream &os, DiskCacheLevel disk_cache_level)
LRUEvictionAlgorithm table_evict_alg_
int8_t * storage_ptr()
Definition: Epoch.h:61
void removeDiskContent() const
Removes all disk data for the subdir.
const size_t page_size_
Definition: FileMgr.h:551
const ChunkKey evictNextChunk() override
This file details an extension of the FileMgr that can contain pages from multiple tables (CachingFil...
size_t size_of_dir(const std::string &dir)
heavyai::shared_mutex table_dirs_mutex_
std::string get_dir_name_for_table(int db_id, int tb_id)
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:285
void setMaxSizes()
Sets the maximum number of files/space for each type of storage based on the maximum size...
void writeAndSyncEpochToDisk()
Write and flush the epoch to the epoch file on disk.
#define UNREACHABLE()
Definition: Logger.h:338
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
std::string describeSelf() const override
describes this FileMgr for logging purposes.
size_t getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
void closeRemovePhysical() override
Closes files and removes the caching directory.
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:334
size_t getMetadataSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
int32_t ceiling() const
Definition: Epoch.h:44
std::pair< FILE *, std::string > create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:55
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:57
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:706
static int64_t max_allowable_epoch()
Definition: Epoch.h:69
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::string fileMgrBasePath_
Definition: FileMgr.h:411
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:57
std::string show_chunk(const ChunkKey &key)
Definition: types.h:98
CachingFileMgr(const DiskCacheConfig &config)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
Definition: FileInfo.cpp:187
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:143
std::shared_lock< T > shared_lock
void deleteCacheIfTooLarge()
When the cache is read from disk, we don't know which chunks were least recently used. Rather than try to evict random pages to get down to size, we just reset the cache to make sure we have space.
heavyai::shared_mutex table_mutex_
void incrementAllEpochs()
Increment epochs for each table in the CFM.
int32_t incrementEpoch()
Definition: FileMgr.h:285
int32_t increment()
Definition: Epoch.h:54
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
Definition: FileInfo.h:51
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:330
void removeTableBuffers(int32_t db_id, int32_t tb_id)
Erases and cleans up all buffers for a table.
std::unique_lock< T > unique_lock
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void deleteWrapperFile() const
Deletes only the wrapper file on disk.
An AbstractBuffer is a unit of data management for a data manager.
void incrementEpoch()
increment the epoch for this subdir (not synced to disk).
bool g_enable_smem_group_by true
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf, const std::string &file_path)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:125
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:659
void removeTableFileMgr(int32_t db_id, int32_t tb_id)
Removes the subdirectory content for a table.
constexpr int32_t ROLLOFF_CONTINGENT
Definition: FileInfo.h:52
ChunkKey evict_chunk_or_fail(LRUEvictionAlgorithm &alg)
heavyai::shared_mutex chunkIndexMutex_
Definition: FileMgr.h:420
int32_t maxRollbackEpochs_
Definition: FileMgr.h:410
void writeWrapperFile(const std::string &doc) const
Writes wrapper file to disk.
#define CHECK_LE(x, y)
Definition: Logger.h:304
Definition: Epoch.h:30
static size_t byte_size()
Definition: Epoch.h:63
size_t getReservedSpace() const
Returns the disk space used (in bytes) for the subdir.
std::map< TablePair, std::unique_ptr< TableFileMgr > > table_dirs_
FILE * open(int file_id)
Opens the file with the given id; fatal crash on error.
Definition: File.cpp:100
void readTableFileMgrs()
Checks for any sub-directories containing table-specific data and creates epochs from found files...
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:62
int fsync(int fd)
Definition: heavyai_fs.cpp:62
int32_t epoch() const
Definition: FileMgr.h:530
#define CHECK(condition)
Definition: Logger.h:291
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1604
void init(const size_t num_reader_threads)
Initializes a CFM, parsing any existing files and initializing data structures appropriately (current...
heavyai::shared_mutex files_rw_mutex_
Definition: FileMgr.h:421
bool in_same_table(const ChunkKey &left_key, const ChunkKey &right_key)
Definition: types.h:83
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:98
unsigned nextFileId_
number of threads used when loading data
Definition: FileMgr.h:416
size_t getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
size_t file_size(const int fd)
Definition: heavyai_fs.cpp:33
int32_t getEpoch() const
Returns the current epoch (locked)
std::string getTableFileMgrPath(int32_t db, int32_t tb) const
void clearForTable(int32_t db_id, int32_t tb_id)
Removes all data related to the given table (pages and subdirectories).
A FileMgr capable of limiting its size and storing data from multiple tables in a shared directory...
#define VLOG(n)
Definition: Logger.h:388
LRUEvictionAlgorithm chunk_evict_alg_
FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &startIt, const std::vector< HeaderInfo >::const_iterator &endIt) override
Creates a buffer and initializes it with info read from files on disk.