OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CachingFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 Omnisci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
22 #include <boost/filesystem.hpp>
23 #include "Shared/misc.h"
24 
25 namespace bf = boost::filesystem;
26 
27 namespace {
28 size_t size_of_dir(const std::string& dir) {
29  size_t space_used = 0;
30  if (bf::exists(dir)) {
31  for (const auto& file : bf::recursive_directory_iterator(dir)) {
32  if (bf::is_regular_file(file.path())) {
33  space_used += bf::file_size(file.path());
34  }
35  }
36  }
37  return space_used;
38 }
39 
  // NOTE(review): the function signature line is not visible in this listing;
  // per the call sites it takes an eviction-algorithm reference `alg` and
  // returns the evicted ChunkKey -- confirm against the original source.
  ChunkKey ret;
  try {
    ret = alg.evictNextChunk();
  } catch (const NoEntryFoundException& e) {
    // An empty eviction queue when we must evict is unrecoverable for the
    // cache, so abort hard.
    LOG(FATAL) << "Disk cache needs to evict data to make space, but no data found in "
                  "eviction queue.";
  }
  return ret;
}
50 
51 } // namespace
52 
53 namespace File_Namespace {
54 
55 std::string CachingFileMgr::dump() const {
56  std::stringstream ss;
57  ss << "Dump Cache:\n";
58  for (const auto& [key, buf] : chunkIndex_) {
59  ss << " " << show_chunk(key) << " num_pages: " << buf->pageCount()
60  << ", is dirty: " << buf->isDirty() << "\n";
61  }
62  ss << "Data Eviction Queue:\n" << chunk_evict_alg_.dumpEvictionQueue();
63  ss << "Metadata Eviction Queue:\n" << table_evict_alg_.dumpEvictionQueue();
64  ss << "\n";
65  return ss.str();
66 }
67 
  // NOTE(review): the constructor signature and one line of the body are not
  // visible in this listing. The visible body configures the cache from the
  // DiskCacheConfig, initializes from disk, and sets the per-type size caps.
  fileMgrBasePath_ = config.path;
  defaultPageSize_ = config.page_size;
  nextFileId_ = 0;
  max_size_ = config.size_limit;
  init(config.num_reader_threads);
  setMaxSizes();
}
77 
79 
// Initializes the cache from existing on-disk state: opens files, rebuilds
// FileBuffers from page headers, and sets up reader threads.
// NOTE(review): this listing omits one or two statements at the top of the
// body and one before freePages() -- confirm against the original source.
void CachingFileMgr::init(const size_t num_reader_threads) {
  auto open_files_result = openFiles();
  /* Sort headerVec so that all HeaderInfos
   * from a chunk will be grouped together
   * and in order of increasing PageId
   * - Version Epoch */
  auto& header_vec = open_files_result.header_infos;
  std::sort(header_vec.begin(), header_vec.end());

  /* Goal of next section is to find sequences in the
   * sorted headerVec of the same ChunkId, which we
   * can then initiate a FileBuffer with */
  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
  if (header_vec.size() > 0) {
    // Each run of consecutive headers with the same chunk key becomes one
    // buffer; the final run is flushed after the loop.
    auto startIt = header_vec.begin();
    ChunkKey lastChunkKey = startIt->chunkKey;
    for (auto it = header_vec.begin() + 1; it != header_vec.end(); ++it) {
      if (it->chunkKey != lastChunkKey) {
        createBufferFromHeaders(lastChunkKey, startIt, it);
        lastChunkKey = it->chunkKey;
        startIt = it;
      }
    }
    createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
  }
  // Continue numbering after the highest file id found on disk.
  nextFileId_ = open_files_result.max_file_id + 1;
  freePages();
  initializeNumThreads(num_reader_threads);
  isFullyInitted_ = true;
}
113 
  // NOTE(review): the function signature line is not visible in this listing.
  // The visible body scans the cache directory for per-table subdirectories
  // named "table_<db>_<tb>" and constructs a TableFileMgr for each.
  mapd_unique_lock<mapd_shared_mutex> write_lock(table_dirs_mutex_);
  bf::path path(fileMgrBasePath_);
  CHECK(bf::exists(path)) << "Cache path: " << fileMgrBasePath_ << " does not exit.";
  CHECK(bf::is_directory(path))
      << "Specified path '" << fileMgrBasePath_ << "' for disk cache is not a directory.";

  // Look for directories with table-specific names.
  boost::regex table_filter("table_([0-9]+)_([0-9]+)");
  for (const auto& file : bf::directory_iterator(path)) {
    boost::smatch match;
    auto file_name = file.path().filename().string();
    if (boost::regex_match(file_name, match, table_filter)) {
      // match[1] is the db id, match[2] the table id (per the regex groups).
      int32_t db_id = std::stoi(match[1]);
      int32_t tb_id = std::stoi(match[2]);
      TablePair table_pair{db_id, tb_id};
      CHECK(table_dirs_.find(table_pair) == table_dirs_.end())
          << "Trying to read data for existing table";
      table_dirs_.emplace(table_pair,
                          std::make_unique<TableFileMgr>(file.path().string()));
    }
  }
}
137 
// Returns the current epoch for a table, or an invalid epoch if the table is
// unknown to the cache (see comment below).
int32_t CachingFileMgr::epoch(int32_t db_id, int32_t tb_id) const {
  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
  auto tables_it = table_dirs_.find({db_id, tb_id});
  if (tables_it == table_dirs_.end()) {
    // If there is no directory for this table, that means the cache does not recognize
    // the table that is requested. This can happen if a table was dropped, and it's
    // pages were invalidated but not yet freed and then the server crashed before they
    // were freed. Upon re-starting the FileMgr will find these pages and attempt to
    // compare their epoch to know if they are valid or not. In this case we should
    // return an invalid epoch to indicate that any page for this table is not valid and
    // should be freed.
    // NOTE(review): the return statement for this branch is not visible in
    // this listing (likely returning an invalid/minimum epoch) -- confirm.
  }
  auto& [pair, table_dir] = *tables_it;
  return table_dir->getEpoch();
}
154 
155 void CachingFileMgr::incrementEpoch(int32_t db_id, int32_t tb_id) {
156  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
157  auto tables_it = table_dirs_.find({db_id, tb_id});
158  CHECK(tables_it != table_dirs_.end());
159  auto& [pair, table_dir] = *tables_it;
160  table_dir->incrementEpoch();
161 }
162 
163 void CachingFileMgr::writeAndSyncEpochToDisk(int32_t db_id, int32_t tb_id) {
164  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
165  auto table_it = table_dirs_.find({db_id, tb_id});
166  CHECK(table_it != table_dirs_.end());
167  table_it->second->writeAndSyncEpochToDisk();
168 }
169 
// Fully removes a table from the cache: first drops its cached buffers, then
// its on-disk subdirectory (epoch + data wrapper), then calls freePages()
// (defined outside this view; presumably processes deferred page frees --
// confirm).
void CachingFileMgr::clearForTable(int32_t db_id, int32_t tb_id) {
  removeTableBuffers(db_id, tb_id);
  removeTableFileMgr(db_id, tb_id);
  freePages();
}
175 
176 std::string CachingFileMgr::getTableFileMgrPath(int32_t db_id, int32_t tb_id) const {
177  return getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
178 }
179 
  // NOTE(review): the function signature and one statement inside the first
  // scope are not visible in this listing (likely the call that closes the
  // open files) -- confirm against the original source.
  {
    // Scoped so the files lock is released before removing the directory.
    mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
  }
  {
    mapd_unique_lock<mapd_shared_mutex> tables_lock(table_dirs_mutex_);
    table_dirs_.clear();
  }
  // Remove the entire cache directory from disk.
  bf::remove_all(getFileMgrBasePath());
}
191 
192 size_t CachingFileMgr::getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
193  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
194  size_t space_used = 0;
195  ChunkKey min_table_key{db_id, tb_id};
196  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
197  for (auto it = chunkIndex_.lower_bound(min_table_key);
198  it != chunkIndex_.upper_bound(max_table_key);
199  ++it) {
200  auto& [key, buffer] = *it;
201  space_used += (buffer->numChunkPages() * defaultPageSize_);
202  }
203  return space_used;
204 }
205 
    int32_t tb_id) const {
  // NOTE(review): the first line of this signature (function name and db_id
  // parameter) is not visible in this listing; per the index it is
  // CachingFileMgr::getMetadataSpaceReservedByTable(int32_t, int32_t).
  // Sums bytes reserved for metadata pages across all cached chunks of the
  // table; metadata pages are fixed at METADATA_PAGE_SIZE.
  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
  size_t space_used = 0;
  ChunkKey min_table_key{db_id, tb_id};
  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
  for (auto it = chunkIndex_.lower_bound(min_table_key);
       it != chunkIndex_.upper_bound(max_table_key);
       ++it) {
    auto& [key, buffer] = *it;
    space_used += (buffer->numMetadataPages() * METADATA_PAGE_SIZE);
  }
  return space_used;
}
220 
221 size_t CachingFileMgr::getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const {
222  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
223  size_t space = 0;
224  auto table_it = table_dirs_.find({db_id, tb_id});
225  if (table_it != table_dirs_.end()) {
226  space += table_it->second->getReservedSpace();
227  }
228  return space;
229 }
230 
231 size_t CachingFileMgr::getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
232  auto chunk_space = getChunkSpaceReservedByTable(db_id, tb_id);
233  auto meta_space = getMetadataSpaceReservedByTable(db_id, tb_id);
234  auto subdir_space = getTableFileMgrSpaceReserved(db_id, tb_id);
235  return chunk_space + meta_space + subdir_space;
236 }
237 
238 std::string CachingFileMgr::describeSelf() const {
239  return "cache";
240 }
241 
// Similar to FileMgr::checkpoint() but only writes a subset of buffers.
void CachingFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
  {
    // Scoped so the shared lock is released before the write work below,
    // which re-acquires table_dirs_mutex_ internally.
    mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
    CHECK(table_dirs_.find({db_id, tb_id}) != table_dirs_.end());
  }
  VLOG(2) << "Checkpointing " << describeSelf() << " (" << db_id << ", " << tb_id
          << ") epoch: " << epoch(db_id, tb_id);
  // Order matters: write dirty buffers at the current epoch, sync files,
  // persist that epoch, and only then advance the in-memory epoch.
  writeDirtyBuffers(db_id, tb_id);
  syncFilesToDisk();
  writeAndSyncEpochToDisk(db_id, tb_id);
  incrementEpoch(db_id, tb_id);
  freePages();
}
256 
257 void CachingFileMgr::createTableFileMgrIfNoneExists(const int32_t db_id,
258  const int32_t tb_id) {
259  mapd_unique_lock<mapd_shared_mutex> write_lock(table_dirs_mutex_);
260  TablePair table_pair{db_id, tb_id};
261  if (table_dirs_.find(table_pair) == table_dirs_.end()) {
262  table_dirs_.emplace(
263  table_pair, std::make_unique<TableFileMgr>(getTableFileMgrPath(db_id, tb_id)));
264  }
265 }
266 
267 FileBuffer* CachingFileMgr::createBufferUnlocked(const ChunkKey& key,
268  const size_t page_size,
269  const size_t num_bytes) {
270  touchKey(key);
271  auto [db_id, tb_id] = get_table_prefix(key);
272  createTableFileMgrIfNoneExists(db_id, tb_id);
273  return FileMgr::createBufferUnlocked(key, page_size, num_bytes);
274 }
275 
276 FileBuffer* CachingFileMgr::createBufferFromHeaders(
277  const ChunkKey& key,
278  const std::vector<HeaderInfo>::const_iterator& startIt,
279  const std::vector<HeaderInfo>::const_iterator& endIt) {
280  if (startIt->pageId != -1) {
281  // If the first pageId is not -1 then there is no metadata page for the
282  // current key (which means it was never checkpointed), so we should skip.
283  return nullptr;
284  }
285  touchKey(key);
286  auto [db_id, tb_id] = get_table_prefix(key);
287  createTableFileMgrIfNoneExists(db_id, tb_id);
288  auto buffer = FileMgr::createBufferFromHeaders(key, startIt, endIt);
289  if (buffer->isMissingPages()) {
290  // Detect the case where a page is missing by comparing the amount of pages read
291  // with the metadata size. If data are missing, discard the chunk.
292  buffer->freeChunkPages();
293  }
294  return buffer;
295 }
296 
303 FileBuffer* CachingFileMgr::putBuffer(const ChunkKey& key,
304  AbstractBuffer* src_buffer,
305  const size_t num_bytes) {
306  CHECK(!src_buffer->isDirty()) << "Cannot cache dirty buffers.";
307  deleteBufferIfExists(key);
308  // Since the buffer is not dirty we mark it as dirty if we are only writing metadata and
309  // appended if we are writing chunk data. We delete + append rather than write to make
310  // sure we don't write multiple page versions.
311  (src_buffer->size() == 0) ? src_buffer->setDirty() : src_buffer->setAppended();
312  return FileMgr::putBuffer(key, src_buffer, num_bytes);
313 }
314 
315 void CachingFileMgr::incrementAllEpochs() {
316  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
317  for (auto& table_dir : table_dirs_) {
318  table_dir.second->incrementEpoch();
319  }
320 }
321 
322 void CachingFileMgr::removeTableFileMgr(int32_t db_id, int32_t tb_id) {
323  // Delete table-specific directory (stores table epoch data and serialized data wrapper)
324  mapd_unique_lock<mapd_shared_mutex> write_lock(table_dirs_mutex_);
325  auto it = table_dirs_.find({db_id, tb_id});
326  if (it != table_dirs_.end()) {
327  it->second->removeDiskContent();
328  table_dirs_.erase(it);
329  }
330 }
331 
// Frees all FileBuffers for a table and erases their chunk index entries.
void CachingFileMgr::removeTableBuffers(int32_t db_id, int32_t tb_id) {
  // Free associated FileBuffers and clear buffer entries.
  mapd_unique_lock<mapd_shared_mutex> write_lock(chunkIndexMutex_);
  ChunkKey min_table_key{db_id, tb_id};
  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
  // deleteBufferUnlocked() erases the current entry and returns the next
  // iterator; the end bound is re-evaluated each pass while the map shrinks.
  for (auto it = chunkIndex_.lower_bound(min_table_key);
       it != chunkIndex_.upper_bound(max_table_key);) {
    it = deleteBufferUnlocked(it);
  }
}
342 
343 CachingFileBuffer* CachingFileMgr::allocateBuffer(const size_t page_size,
344  const ChunkKey& key,
345  const size_t num_bytes) {
346  return new CachingFileBuffer(this, page_size, key, num_bytes);
347 }
348 
349 CachingFileBuffer* CachingFileMgr::allocateBuffer(
350  const ChunkKey& key,
351  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
352  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
353  return new CachingFileBuffer(this, key, headerStartIt, headerEndIt);
354 }
355 
356 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
357 bool CachingFileMgr::updatePageIfDeleted(FileInfo* file_info,
358  ChunkKey& chunk_key,
359  int32_t contingent,
360  int32_t page_epoch,
361  int32_t page_num) {
362  // These contingents are stored by overwriting the bytes used for chunkKeys. If
363  // we run into a key marked for deletion in a fileMgr with no fileMgrKey (i.e.
364  // CachingFileMgr) then we can't know if the epoch is valid because we don't know
365  // the key. At this point our only option is to free the page as though it was
366  // checkpointed (which should be fine since we only maintain one version of each
367  // page).
368  if (contingent == DELETE_CONTINGENT || contingent == ROLLOFF_CONTINGENT) {
369  file_info->freePage(page_num, false, page_epoch);
370  return true;
371  }
372  return false;
373 }
374 
375 void CachingFileMgr::writeDirtyBuffers(int32_t db_id, int32_t tb_id) {
376  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
377  ChunkKey min_table_key{db_id, tb_id};
378  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
379 
380  for (auto chunk_it = chunkIndex_.lower_bound(min_table_key);
381  chunk_it != chunkIndex_.upper_bound(max_table_key);
382  ++chunk_it) {
383  if (auto [key, buf] = *chunk_it; buf->isDirty()) {
384  // Free previous versions first so we only have one metadata version.
385  buf->freeMetadataPages();
386  buf->writeMetadata(epoch(db_id, tb_id));
387  buf->clearDirtyBits();
388  touchKey(key);
389  }
390  }
391 }
392 
393 void CachingFileMgr::deleteBufferIfExists(const ChunkKey& key) {
394  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
395  auto chunk_it = chunkIndex_.find(key);
396  if (chunk_it != chunkIndex_.end()) {
397  deleteBufferUnlocked(chunk_it);
398  }
399 }
400 
401 size_t CachingFileMgr::getNumDataChunks() const {
402  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
403  size_t num_chunks = 0;
404  for (auto [key, buf] : chunkIndex_) {
405  if (buf->hasDataPages()) {
406  num_chunks++;
407  }
408  }
409  return num_chunks;
410 }
411 
412 void CachingFileMgr::deleteCacheIfTooLarge() {
413  if (size_of_dir(fileMgrBasePath_) > max_size_) {
414  closeRemovePhysical();
415  bf::create_directory(fileMgrBasePath_);
416  LOG(INFO) << "Cache path over limit. Existing cache deleted.";
417  }
418 }
419 
420 Page CachingFileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
421  std::lock_guard<std::mutex> lock(getPageMutex_);
422  int32_t pageNum = -1;
423  // Splits files into metadata and regular data by size.
424  auto candidateFiles = fileIndex_.equal_range(pageSize);
425  // Check if there is a free page in an existing file.
426  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
427  FileInfo* fileInfo = files_.at(fileIt->second);
428  pageNum = fileInfo->getFreePage();
429  if (pageNum != -1) {
430  return (Page(fileInfo->fileId, pageNum));
431  }
432  }
433 
434  // Try to add a new file if there is free space available.
435  FileInfo* fileInfo = nullptr;
436  if (isMetadata) {
437  if (getMaxMetaFiles() > getNumMetaFiles()) {
438  fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
439  }
440  } else {
441  if (getMaxDataFiles() > getNumDataFiles()) {
442  fileInfo = createFile(pageSize, num_pages_per_data_file_);
443  }
444  }
445 
446  if (!fileInfo) {
447  // We were not able to create a new file, so we try to evict space.
448  // Eviction will return the first file it evicted a page from (a file now guaranteed
449  // to have a free page).
450  fileInfo = isMetadata ? evictMetadataPages() : evictPages();
451  }
452  CHECK(fileInfo);
453 
454  pageNum = fileInfo->getFreePage();
455  CHECK(pageNum != -1);
456  return (Page(fileInfo->fileId, pageNum));
457 }
458 
459 std::vector<ChunkKey> CachingFileMgr::getKeysForTable(int32_t db_id,
460  int32_t tb_id) const {
461  std::vector<ChunkKey> keys;
462  ChunkKey min_table_key{db_id, tb_id};
463  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
464  for (auto it = chunkIndex_.lower_bound(min_table_key);
465  it != chunkIndex_.upper_bound(max_table_key);
466  ++it) {
467  keys.emplace_back(it->first);
468  }
469  return keys;
470 }
471 
// Evicts an entire table's cached entries (chosen by the table eviction
// queue) to reclaim metadata pages, and returns a FileInfo that now has a
// free page.
FileInfo* CachingFileMgr::evictMetadataPages() {
  // Locks should already be in place before calling this method.
  FileInfo* file_info{nullptr};
  auto key_to_evict = evict_chunk_or_fail(table_evict_alg_);
  auto [db_id, tb_id] = get_table_prefix(key_to_evict);
  const auto keys = getKeysForTable(db_id, tb_id);
  for (const auto& key : keys) {
    auto chunk_it = chunkIndex_.find(key);
    CHECK(chunk_it != chunkIndex_.end());
    auto& buf = chunk_it->second;
    if (!file_info) {
      // Return the FileInfo for the first file we are freeing a page from so that the
      // caller does not have to search for a FileInfo guaranteed to have at least one
      // free page.
      CHECK(buf->getMetadataPage().pageVersions.size() > 0);
      file_info =
          getFileInfoForFileId(buf->getMetadataPage().pageVersions.front().page.fileId);
    }
    // We erase all pages and entries for the chunk, as without metadata all other
    // entries are useless.
    deleteBufferUnlocked(chunk_it);
  }
  // Serialized datawrappers require metadata to be in the cache.
  deleteWrapperFile(db_id, tb_id);
  CHECK(file_info) << "FileInfo with freed page not found";
  return file_info;
}
499 
// Evicts the chunk data pages of the least-recently-used chunk that actually
// holds data, and returns a FileInfo that now has at least one free page.
// NOTE(review): no lock-expectation comment here unlike evictMetadataPages();
// presumably the same "locks already held" contract applies -- confirm.
FileInfo* CachingFileMgr::evictPages() {
  FileInfo* file_info{nullptr};
  FileBuffer* buf{nullptr};
  while (!file_info) {
    buf = chunkIndex_.at(evict_chunk_or_fail(chunk_evict_alg_));
    CHECK(buf);
    if (!buf->hasDataPages()) {
      // This buffer contains no chunk data (metadata only, uninitialized, size == 0,
      // etc...) so we won't recover any space by evicting it. In this case it gets
      // removed from the eviction queue (it will get re-added if it gets populated with
      // data) and we look at the next chunk in queue until we find a buffer with page
      // data.
      continue;
    }
    // Return the FileInfo for the first file we are freeing a page from so that the
    // caller does not have to search for a FileInfo guaranteed to have at least one free
    // page.
    CHECK(buf->getMultiPage().front().pageVersions.size() > 0);
    file_info = getFileInfoForFileId(
        buf->getMultiPage().front().pageVersions.front().page.fileId);
  }
  auto pages_freed = buf->freeChunkPages();
  CHECK(pages_freed > 0) << "failed to evict a page";
  CHECK(file_info) << "FileInfo with freed page not found";
  return file_info;
}
526 
527 void CachingFileMgr::touchKey(const ChunkKey& key) const {
528  chunk_evict_alg_.touchChunk(key);
529  table_evict_alg_.touchChunk(get_table_key(key));
530 }
531 
532 void CachingFileMgr::removeKey(const ChunkKey& key) const {
533  // chunkIndex lock should already be acquired.
534  chunk_evict_alg_.removeChunk(key);
535  auto [db_id, tb_id] = get_table_prefix(key);
536  ChunkKey table_key{db_id, tb_id};
537  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
538  for (auto it = chunkIndex_.lower_bound(table_key);
539  it != chunkIndex_.upper_bound(max_table_key);
540  ++it) {
541  if (it->first != key) {
542  // If there are any keys in this table other than that one we are removing, then
543  // keep the table in the eviction queue.
544  return;
545  }
546  }
547  // No other keys exist for this table, so remove it from the queue.
548  table_evict_alg_.removeChunk(table_key);
549 }
550 
551 size_t CachingFileMgr::getFilesSize() const {
552  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
553  size_t sum = 0;
554  for (auto [id, file] : files_) {
555  sum += file->size();
556  }
557  return sum;
558 }
559 
560 size_t CachingFileMgr::getTableFileMgrsSize() const {
561  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
562  size_t space_used = 0;
563  for (const auto& [pair, table_dir] : table_dirs_) {
564  space_used += table_dir->getReservedSpace();
565  }
566  return space_used;
567 }
568 
569 size_t CachingFileMgr::getNumDataFiles() const {
570  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
571  return fileIndex_.count(defaultPageSize_);
572 }
573 
574 size_t CachingFileMgr::getNumMetaFiles() const {
575  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
576  return fileIndex_.count(METADATA_PAGE_SIZE);
577 }
578 
579 std::vector<ChunkKey> CachingFileMgr::getChunkKeysForPrefix(
580  const ChunkKey& prefix) const {
581  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
582  std::vector<ChunkKey> chunks;
583  for (auto [key, buf] : chunkIndex_) {
584  if (in_same_table(key, prefix)) {
585  if (buf->hasDataPages()) {
586  chunks.emplace_back(key);
587  touchKey(key);
588  }
589  }
590  }
591  return chunks;
592 }
593 
594 void CachingFileMgr::removeChunkKeepMetadata(const ChunkKey& key) {
595  if (isBufferOnDevice(key)) {
596  auto chunkIt = chunkIndex_.find(key);
597  CHECK(chunkIt != chunkIndex_.end());
598  auto& buf = chunkIt->second;
599  if (buf->hasDataPages()) {
600  buf->freeChunkPages();
601  chunk_evict_alg_.removeChunk(key);
602  }
603  }
604 }
605 
606 size_t CachingFileMgr::getNumChunksWithMetadata() const {
607  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
608  size_t sum = 0;
609  for (const auto& [key, buf] : chunkIndex_) {
610  if (buf->hasEncoder()) {
611  sum++;
612  }
613  }
614  return sum;
615 }
616 
617 std::string CachingFileMgr::dumpKeysWithMetadata() const {
618  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
619  std::string ret_string = "CFM keys with metadata:\n";
620  for (const auto& [key, buf] : chunkIndex_) {
621  if (buf->hasEncoder()) {
622  ret_string += " " + show_chunk(key) + "\n";
623  }
624  }
625  return ret_string;
626 }
627 
628 std::string CachingFileMgr::dumpKeysWithChunkData() const {
629  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
630  std::string ret_string = "CFM keys with chunk data:\n";
631  for (const auto& [key, buf] : chunkIndex_) {
632  if (buf->hasDataPages()) {
633  ret_string += " " + show_chunk(key) + "\n";
634  }
635  }
636  return ret_string;
637 }
638 
639 std::unique_ptr<CachingFileMgr> CachingFileMgr::reconstruct() const {
640  DiskCacheConfig config{fileMgrBasePath_,
641  DiskCacheLevel::none,
642  num_reader_threads_,
643  max_size_,
644  defaultPageSize_};
645  return std::make_unique<CachingFileMgr>(config);
646 }
647 
648 void CachingFileMgr::deleteWrapperFile(int32_t db, int32_t tb) {
649  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
650  auto it = table_dirs_.find({db, tb});
651  CHECK(it != table_dirs_.end());
652  it->second->deleteWrapperFile();
653 }
654 
// Persists a serialized data-wrapper document for the table, evicting other
// tables' metadata until the wrapper space budget has room.
void CachingFileMgr::writeWrapperFile(const std::string& doc, int32_t db, int32_t tb) {
  createTableFileMgrIfNoneExists(db, tb);
  auto wrapper_size = doc.size();
  CHECK_LE(wrapper_size, getMaxWrapperSize())
      << "Wrapper is too big to fit into the cache";
  // NOTE(review): evictMetadataPages() documents that "locks should already
  // be in place", but no chunkIndexMutex_ is held here -- confirm the
  // locking contract for this call path.
  while (wrapper_size > getAvailableWrapperSpace()) {
    evictMetadataPages();
  }
  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
  table_dirs_.at({db, tb})->writeWrapperFile(doc);
}
666 
/*
 * While the CFM allows for multiple tables to share the same allocated files for chunk
 * data and metadata, space cannot be reallocated between metadata files and data files
 * (once the space has been reserved for a data file the file won't be deleted unless the
 * cache is deleted). To prevent a case where we have allocated too many files of one
 * type to the detriment of the other, we have a minimum portion of the cache that is
 * reserved for each type. This default ratio gives 9% of space to data wrappers, 1% to
 * metadata files, and 90% to data files.
 */
676 void CachingFileMgr::setMaxSizes() {
677  size_t max_meta_space = std::floor(max_size_ * METADATA_SPACE_PERCENTAGE);
678  size_t max_meta_file_space = std::floor(max_size_ * METADATA_FILE_SPACE_PERCENTAGE);
679  max_wrapper_space_ = max_meta_space - max_meta_file_space;
680  auto max_data_space = max_size_ - max_meta_space;
681  auto meta_file_size = METADATA_PAGE_SIZE * num_pages_per_metadata_file_;
682  auto data_file_size = defaultPageSize_ * num_pages_per_data_file_;
683  max_num_data_files_ = max_data_space / data_file_size;
684  max_num_meta_files_ = max_meta_file_space / meta_file_size;
685  CHECK_GT(max_num_data_files_, 0U) << "Cannot create a cache of size " << max_size_
686  << ". Not enough space to create a data file.";
687  CHECK_GT(max_num_meta_files_, 0U) << "Cannot create a cache of size " << max_size_
688  << ". Not enough space to create a metadata file.";
689 }
690 
691 std::optional<FileBuffer*> CachingFileMgr::getBufferIfExists(const ChunkKey& key) {
692  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
693  auto chunk_it = chunkIndex_.find(key);
694  if (chunk_it == chunkIndex_.end()) {
695  return {};
696  }
697  return getBufferUnlocked(key);
698 }
699 
// Drops eviction-queue bookkeeping for this key before the base class frees
// the buffer and erases the index entry; returns the next valid iterator.
ChunkKeyToChunkMap::iterator CachingFileMgr::deleteBufferUnlocked(
    const ChunkKeyToChunkMap::iterator chunk_it,
    const bool purge) {
  removeKey(chunk_it->first);
  return FileMgr::deleteBufferUnlocked(chunk_it, purge);
}
706 
707 void CachingFileMgr::getChunkMetadataVecForKeyPrefix(
708  ChunkMetadataVector& chunkMetadataVec,
709  const ChunkKey& keyPrefix) {
710  FileMgr::getChunkMetadataVecForKeyPrefix(chunkMetadataVec, keyPrefix);
711  for (const auto& [key, meta] : chunkMetadataVec) {
712  touchKey(key);
713  }
714 }
715 
// Same as the base implementation, but marks the key as recently used so the
// eviction queues track the access.
FileBuffer* CachingFileMgr::getBufferUnlocked(const ChunkKey& key,
                                              const size_t num_bytes) const {
  touchKey(key);
  return FileMgr::getBufferUnlocked(key, num_bytes);
}
721 
722 void CachingFileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
723  page.first->freePageDeferred(page.second);
724 }
725 
726 std::set<ChunkKey> CachingFileMgr::getKeysWithMetadata() const {
727  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
728  std::set<ChunkKey> ret;
729  for (const auto& [key, buf] : chunkIndex_) {
730  if (buf->hasEncoder()) {
731  ret.emplace(key);
732  }
733  }
734  return ret;
735 }
736 
737 size_t CachingFileMgr::getMaxDataFilesSize() const {
738  if (limit_data_size_) {
739  return *limit_data_size_;
740  }
741  return getMaxDataFiles() * getDataFileSize();
742 }
743 
// Manages one table's cache subdirectory: creates the directory if missing
// and loads or initializes the table's epoch file.
// NOTE(review): this listing omits several lines of the epoch-file handling
// (a size CHECK and the open/read of the existing epoch file, plus the
// creation path for a missing epoch file) -- confirm against the original
// source before relying on this view.
TableFileMgr::TableFileMgr(const std::string& table_path)
    : table_path_(table_path)
    , epoch_file_path_(table_path_ + "/" + FileMgr::EPOCH_FILENAME)
    , wrapper_file_path_(table_path_ + "/" + CachingFileMgr::WRAPPER_FILE_NAME)
    , epoch_(Epoch())
    , is_checkpointed_(true) {
  if (!bf::exists(table_path_)) {
    bf::create_directory(table_path_);
  } else {
    CHECK(bf::is_directory(table_path_)) << "Specified path '" << table_path_
                                         << "' for cache table data is not a directory.";
  }
  if (bf::exists(epoch_file_path_)) {
    CHECK(bf::is_regular_file(epoch_file_path_))
        << "Found epoch file '" << epoch_file_path_ << "' which is not a regular file";
        // NOTE(review): the CHECK statement this streams into is missing from
        // this listing.
        << "Found epoch file '" << epoch_file_path_ << "' which is not of expected size";
  } else {
    incrementEpoch();
  }
}
769 
  // NOTE(review): the function signature line (TableFileMgr::incrementEpoch)
  // and the CHECK statement the trailing stream operators belong to are not
  // visible in this listing -- confirm against the original source.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  epoch_.increment();
  // The new epoch has not been written to disk yet.
  is_checkpointed_ = false;
      << "Epoch greater than maximum allowed value (" << epoch_.ceiling() << " > "
      << Epoch::max_allowable_epoch() << ").";
}
778 
779 int32_t TableFileMgr::getEpoch() const {
780  mapd_shared_lock<mapd_shared_mutex> r_lock(table_mutex_);
781  return static_cast<int32_t>(epoch_.ceiling());
782 }
783 
  // NOTE(review): the function signature line and the statement that writes
  // the epoch bytes to the file are not visible in this listing -- confirm
  // against the original source.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  int32_t status = fflush(epoch_file_);
  CHECK(status == 0) << "Could not flush epoch file to disk";
#ifdef __APPLE__
  // NOTE(review): 51 is presumably F_FULLFSYNC on macOS (forces a full flush
  // to physical media) -- confirm.
  status = fcntl(fileno(epoch_file_), 51);
#else
  status = heavyai::fsync(fileno(epoch_file_));
#endif
  CHECK(status == 0) << "Could not sync epoch file to disk";
  is_checkpointed_ = true;
}
797 
  // NOTE(review): the function signature line is not visible in this listing;
  // per the index this is TableFileMgr::removeDiskContent(), which removes
  // all disk data for the table's subdirectory.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  bf::remove_all(table_path_);
}
802 
  // NOTE(review): the function signature line is not visible in this listing;
  // per the call sites this is TableFileMgr::getReservedSpace(), returning
  // the total bytes of regular files under the table's directory.
  mapd_shared_lock<mapd_shared_mutex> r_lock(table_mutex_);
  size_t space = 0;
  for (const auto& file : bf::recursive_directory_iterator(table_path_)) {
    if (bf::is_regular_file(file.path())) {
      space += bf::file_size(file.path());
    }
  }
  return space;
}
813 
  // NOTE(review): the function signature line is not visible in this listing;
  // per the index this is TableFileMgr::deleteWrapperFile(), which deletes
  // only the serialized data-wrapper file on disk.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  bf::remove_all(wrapper_file_path_);
}
818 
819 void TableFileMgr::writeWrapperFile(const std::string& doc) const {
820  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
821  std::ofstream ofs(wrapper_file_path_);
822  if (!ofs) {
823  throw std::runtime_error{"Error trying to create file \"" + wrapper_file_path_ +
824  "\". The error was: " + std::strerror(errno)};
825  }
826  ofs << doc;
827 }
828 
829 } // namespace File_Namespace
size_t getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
OpenFilesResult openFiles()
Definition: FileMgr.cpp:189
LRUEvictionAlgorithm table_evict_alg_
int8_t * storage_ptr()
Definition: Epoch.h:61
void removeDiskContent() const
Removes all disk data for the subdir.
const ChunkKey evictNextChunk() override
size_t size_of_dir(const std::string &dir)
std::string get_dir_name_for_table(int db_id, int tb_id)
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:217
void setMaxSizes()
Sets the maximum number of files/space for each type of storage based on the maximum size...
mapd_shared_mutex table_dirs_mutex_
void writeAndSyncEpochToDisk()
Write and flush the epoch to the epoch file on disk.
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
std::string describeSelf() const override
describes this FileMgr for logging purposes.
size_t getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
void closeRemovePhysical() override
Closes files and removes the caching directory.
static int64_t min_allowable_epoch()
Definition: Epoch.h:65
std::string getFileMgrBasePath() const
Definition: FileMgr.h:334
size_t getMetadataSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
int32_t ceiling() const
Definition: Epoch.h:44
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:58
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:694
static int64_t max_allowable_epoch()
Definition: Epoch.h:69
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::string fileMgrBasePath_
Definition: FileMgr.h:397
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:58
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:57
std::string show_chunk(const ChunkKey &key)
Definition: types.h:99
CachingFileMgr(const DiskCacheConfig &config)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
Definition: FileInfo.cpp:181
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:150
mapd_shared_mutex table_mutex_
void deleteCacheIfTooLarge()
Definition: When the cache is read from disk, we don't know which chunks were least recently used. Rather than try to evict random pages to get down to size we just reset the cache to make sure we have space.
void incrementAllEpochs()
Increment epochs for each table in the CFM.
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:142
int32_t incrementEpoch()
Definition: FileMgr.h:284
int32_t increment()
Definition: Epoch.h:54
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
Definition: FileInfo.h:51
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:329
void removeTableBuffers(int32_t db_id, int32_t tb_id)
Erases and cleans up all buffers for a table.
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void deleteWrapperFile() const
Deletes only the wrapper file on disk.
An AbstractBuffer is a unit of data management for a data manager.
void incrementEpoch()
increment the epoch for this subdir (not synced to disk).
bool g_enable_smem_group_by true
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:647
void removeTableFileMgr(int32_t db_id, int32_t tb_id)
Removes the subdirectory content for a table.
constexpr int32_t ROLLOFF_CONTINGENT
Definition: FileInfo.h:52
ChunkKey evict_chunk_or_fail(LRUEvictionAlgorithm &alg)
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:403
int32_t maxRollbackEpochs_
Definition: FileMgr.h:396
void writeWrapperFile(const std::string &doc) const
Writes wrapper file to disk.
#define CHECK_LE(x, y)
Definition: Logger.h:234
Definition: Epoch.h:30
static size_t byte_size()
Definition: Epoch.h:63
size_t getReservedSpace() const
Returns the disk space used (in bytes) for the subdir.
std::map< TablePair, std::unique_ptr< TableFileMgr > > table_dirs_
void readTableFileMgrs()
Checks for any sub-directories containing table-specific data and creates epochs from found files...
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:63
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:107
int fsync(int fd)
Definition: omnisci_fs.cpp:60
int32_t epoch() const
Definition: FileMgr.h:518
#define CHECK(condition)
Definition: Logger.h:223
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1567
void init(const size_t num_reader_threads)
Initializes a CFM, parsing any existing files and initializing data structures appropriately (current...
mapd_unique_lock< mapd_shared_mutex > write_lock
bool in_same_table(const ChunkKey &left_key, const ChunkKey &right_key)
Definition: types.h:84
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:92
size_t getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:411
int32_t getEpoch() const
Returns the current epoch (locked)
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:412
std::string getTableFileMgrPath(int32_t db, int32_t tb) const
void clearForTable(int32_t db_id, int32_t tb_id)
Removes all data related to the given table (pages and subdirectories).
A FileMgr capable of limiting its size and storing data from multiple tables in a shared directory...
#define VLOG(n)
Definition: Logger.h:317
LRUEvictionAlgorithm chunk_evict_alg_
FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &startIt, const std::vector< HeaderInfo >::const_iterator &endIt) override
Creates a buffer and initializes it with info read from files on disk.