OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CachingFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 Omnisci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
22 #include <boost/filesystem.hpp>
23 #include "Shared/misc.h"
24 
25 namespace bf = boost::filesystem;
26 
27 namespace {
28 size_t size_of_dir(const std::string& dir) {
29  size_t space_used = 0;
30  if (bf::exists(dir)) {
31  for (const auto& file : bf::recursive_directory_iterator(dir)) {
32  if (bf::is_regular_file(file.path())) {
33  space_used += bf::file_size(file.path());
34  }
35  }
36  }
37  return space_used;
38 }
39 
  // (function signature truncated in this listing; takes the eviction
  // algorithm by reference and returns the chosen ChunkKey)
  // Ask the eviction algorithm for the next chunk to evict.  An empty
  // eviction queue while the cache still needs space is unrecoverable.
  ChunkKey ret;
  try {
    ret = alg.evictNextChunk();
  } catch (const NoEntryFoundException& e) {
    LOG(FATAL) << "Disk cache needs to evict data to make space, but no data found in "
                  "eviction queue.";
  }
  return ret;
}
50 } // namespace
51 
52 namespace File_Namespace {
53 
  // (constructor signature and one member-init line are truncated in this
  // listing)
  fileMgrBasePath_ = config.path;       // root directory of the disk cache
  defaultPageSize_ = config.page_size;  // page size used for data files
  nextFileId_ = 0;
  max_size_ = config.size_limit;        // overall cache-size budget in bytes
  init(config.num_reader_threads);      // scan existing files, rebuild state
  setMaxSizes();                        // derive per-category file/space limits
}
63 
65 
void CachingFileMgr::init(const size_t num_reader_threads) {
  // (some lines are truncated in this listing between the signature and the
  // openFiles() call)
  auto open_files_result = openFiles();
  /* Sort headerVec so that all HeaderInfos
   * from a chunk will be grouped together
   * and in order of increasing PageId
   * - Version Epoch */
  auto& header_vec = open_files_result.header_infos;
  std::sort(header_vec.begin(), header_vec.end());

  /* Goal of next section is to find sequences in the
   * sorted headerVec of the same ChunkId, which we
   * can then initiate a FileBuffer with */
  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
  if (header_vec.size() > 0) {
    // Walk the sorted headers; each maximal run with the same chunk key
    // becomes one buffer via createBufferFromHeaders().
    auto startIt = header_vec.begin();
    ChunkKey lastChunkKey = startIt->chunkKey;
    for (auto it = header_vec.begin() + 1; it != header_vec.end(); ++it) {
      if (it->chunkKey != lastChunkKey) {
        createBufferFromHeaders(lastChunkKey, startIt, it);
        lastChunkKey = it->chunkKey;
        startIt = it;
      }
    }
    // Flush the final run; the loop above only emits on a key change.
    createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
  }
  nextFileId_ = open_files_result.max_file_id + 1;
  // (a line is truncated in this listing before freePages())
  freePages();
  initializeNumThreads(num_reader_threads);
  isFullyInitted_ = true;
}
99 
  // (function signature truncated in this listing)
  // Scans the cache directory for table subdirectories named
  // "table_<db_id>_<tb_id>" and constructs a TableFileMgr for each.
  mapd_unique_lock<mapd_shared_mutex> write_lock(table_dirs_mutex_);
  bf::path path(fileMgrBasePath_);
  // NOTE(review): message typo — "does not exit." should read "does not
  // exist." (left unchanged here; string is runtime behavior).
  CHECK(bf::exists(path)) << "Cache path: " << fileMgrBasePath_ << " does not exit.";
  CHECK(bf::is_directory(path))
      << "Specified path '" << fileMgrBasePath_ << "' for disk cache is not a directory.";

  // Look for directories with table-specific names.
  boost::regex table_filter("table_([0-9]+)_([0-9]+)");
  for (const auto& file : bf::directory_iterator(path)) {
    boost::smatch match;
    auto file_name = file.path().filename().string();
    if (boost::regex_match(file_name, match, table_filter)) {
      int32_t db_id = std::stoi(match[1]);
      int32_t tb_id = std::stoi(match[2]);
      TablePair table_pair{db_id, tb_id};
      // Two directories mapping to the same table would corrupt state.
      CHECK(table_dirs_.find(table_pair) == table_dirs_.end())
          << "Trying to read data for existing table";
      table_dirs_.emplace(table_pair,
                          std::make_unique<TableFileMgr>(file.path().string()));
    }
  }
}
123 
124 int32_t CachingFileMgr::epoch(int32_t db_id, int32_t tb_id) const {
125  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
126  auto tables_it = table_dirs_.find({db_id, tb_id});
127  CHECK(tables_it != table_dirs_.end());
128  auto& [pair, table_dir] = *tables_it;
129  return table_dir->getEpoch();
130 }
131 
132 void CachingFileMgr::incrementEpoch(int32_t db_id, int32_t tb_id) {
133  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
134  auto tables_it = table_dirs_.find({db_id, tb_id});
135  CHECK(tables_it != table_dirs_.end());
136  auto& [pair, table_dir] = *tables_it;
137  table_dir->incrementEpoch();
138 }
139 
140 void CachingFileMgr::writeAndSyncEpochToDisk(int32_t db_id, int32_t tb_id) {
141  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
142  auto table_it = table_dirs_.find({db_id, tb_id});
143  CHECK(table_it != table_dirs_.end());
144  table_it->second->writeAndSyncEpochToDisk();
145 }
146 
147 void CachingFileMgr::clearForTable(int32_t db_id, int32_t tb_id) {
148  removeTableBuffers(db_id, tb_id);
149  removeTableFileMgr(db_id, tb_id);
150  freePages();
151 }
152 
153 std::string CachingFileMgr::getTableFileMgrPath(int32_t db_id, int32_t tb_id) const {
154  return getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
155 }
156 
  // (function signature truncated in this listing; closes all open files and
  // deletes the cache directory from disk)
  {
    mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
    // (a line is truncated here in this listing)
  }
  {
    mapd_unique_lock<mapd_shared_mutex> tables_lock(table_dirs_mutex_);
    table_dirs_.clear();  // destroy TableFileMgrs before removing the tree
  }
  bf::remove_all(getFileMgrBasePath());  // delete the entire cache directory
}
168 
169 size_t CachingFileMgr::getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
170  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
171  size_t space_used = 0;
172  ChunkKey min_table_key{db_id, tb_id};
173  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
174  for (auto it = chunkIndex_.lower_bound(min_table_key);
175  it != chunkIndex_.upper_bound(max_table_key);
176  ++it) {
177  auto& [key, buffer] = *it;
178  space_used += (buffer->numChunkPages() * defaultPageSize_);
179  }
180  return space_used;
181 }
182 
                                                          int32_t tb_id) const {
  // (first line of the signature is truncated in this listing)
  // Sums METADATA_PAGE_SIZE over the metadata pages of every chunk that
  // belongs to table (db_id, tb_id).
  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
  size_t space_used = 0;
  ChunkKey min_table_key{db_id, tb_id};
  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
  for (auto it = chunkIndex_.lower_bound(min_table_key);
       it != chunkIndex_.upper_bound(max_table_key);
       ++it) {
    auto& [key, buffer] = *it;
    space_used += (buffer->numMetadataPages() * METADATA_PAGE_SIZE);
  }
  return space_used;
}
197 
198 size_t CachingFileMgr::getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const {
199  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
200  size_t space = 0;
201  auto table_it = table_dirs_.find({db_id, tb_id});
202  if (table_it != table_dirs_.end()) {
203  space += table_it->second->getReservedSpace();
204  }
205  return space;
206 }
207 
208 size_t CachingFileMgr::getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const {
209  auto chunk_space = getChunkSpaceReservedByTable(db_id, tb_id);
210  auto meta_space = getMetadataSpaceReservedByTable(db_id, tb_id);
211  auto subdir_space = getTableFileMgrSpaceReserved(db_id, tb_id);
212  return chunk_space + meta_space + subdir_space;
213 }
214 
215 std::string CachingFileMgr::describeSelf() const {
216  return "cache";
217 }
218 
// Similar to FileMgr::checkpoint() but only writes a subset of buffers.
// Persists all dirty buffers of one table, fsyncs the data files, commits the
// table epoch to disk, then advances the in-memory epoch and releases freed
// pages.  Order matters: data must reach disk before the epoch is committed.
void CachingFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
  {
    // The table must already have a TableFileMgr (created on first write).
    mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
    CHECK(table_dirs_.find({db_id, tb_id}) != table_dirs_.end());
  }
  VLOG(2) << "Checkpointing " << describeSelf() << " (" << db_id << ", " << tb_id
          << ") epoch: " << epoch(db_id, tb_id);
  writeDirtyBuffers(db_id, tb_id);  // write metadata for dirty chunks
  syncFilesToDisk();                // fsync data files first...
  writeAndSyncEpochToDisk(db_id, tb_id);  // ...then commit the epoch
  incrementEpoch(db_id, tb_id);
  freePages();  // pages freed during the checkpoint become reusable now
}
233 
234 void CachingFileMgr::createTableFileMgrIfNoneExists(const int32_t db_id,
235  const int32_t tb_id) {
236  mapd_unique_lock<mapd_shared_mutex> write_lock(table_dirs_mutex_);
237  TablePair table_pair{db_id, tb_id};
238  if (table_dirs_.find(table_pair) == table_dirs_.end()) {
239  table_dirs_.emplace(
240  table_pair, std::make_unique<TableFileMgr>(getTableFileMgrPath(db_id, tb_id)));
241  }
242 }
243 
244 FileBuffer* CachingFileMgr::createBufferUnlocked(const ChunkKey& key,
245  const size_t page_size,
246  const size_t num_bytes) {
247  touchKey(key);
248  auto [db_id, tb_id] = get_table_prefix(key);
249  createTableFileMgrIfNoneExists(db_id, tb_id);
250  return FileMgr::createBufferUnlocked(key, page_size, num_bytes);
251 }
252 
// Creates a buffer for |key| from headers read off disk during init.
// Returns nullptr (chunk skipped) when the chunk has no metadata page, and
// discards chunk data when pages are missing — both indicate the chunk was
// never fully checkpointed.
FileBuffer* CachingFileMgr::createBufferFromHeaders(
    const ChunkKey& key,
    const std::vector<HeaderInfo>::const_iterator& startIt,
    const std::vector<HeaderInfo>::const_iterator& endIt) {
  if (startIt->pageId != -1) {
    // If the first pageId is not -1 then there is no metadata page for the
    // current key (which means it was never checkpointed), so we should skip.
    return nullptr;
  }
  touchKey(key);  // register the restored chunk with the eviction queues
  auto [db_id, tb_id] = get_table_prefix(key);
  createTableFileMgrIfNoneExists(db_id, tb_id);
  auto buffer = FileMgr::createBufferFromHeaders(key, startIt, endIt);
  if (buffer->isMissingPages()) {
    // Detect the case where a page is missing by comparing the amount of pages read
    // with the metadata size. If data are missing, discard the chunk.
    buffer->freeChunkPages();
  }
  return buffer;
}
273 
// Caches |src_buffer| under |key|.  Only clean (already checkpointed) buffers
// may be cached; any previously cached version is deleted first so the cache
// never holds more than one page version of a chunk.
FileBuffer* CachingFileMgr::putBuffer(const ChunkKey& key,
                                      AbstractBuffer* src_buffer,
                                      const size_t num_bytes) {
  CHECK(!src_buffer->isDirty()) << "Cannot cache dirty buffers.";
  deleteBufferIfExists(key);
  // Since the buffer is not dirty we mark it as dirty if we are only writing metadata and
  // appended if we are writing chunk data. We delete + append rather than write to make
  // sure we don't write multiple page versions.
  (src_buffer->size() == 0) ? src_buffer->setDirty() : src_buffer->setAppended();
  return FileMgr::putBuffer(key, src_buffer, num_bytes);
}
291 
292 void CachingFileMgr::incrementAllEpochs() {
293  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
294  for (auto& table_dir : table_dirs_) {
295  table_dir.second->incrementEpoch();
296  }
297 }
298 
299 void CachingFileMgr::removeTableFileMgr(int32_t db_id, int32_t tb_id) {
300  // Delete table-specific directory (stores table epoch data and serialized data wrapper)
301  mapd_unique_lock<mapd_shared_mutex> write_lock(table_dirs_mutex_);
302  auto it = table_dirs_.find({db_id, tb_id});
303  if (it != table_dirs_.end()) {
304  it->second->removeDiskContent();
305  table_dirs_.erase(it);
306  }
307 }
308 
309 void CachingFileMgr::removeTableBuffers(int32_t db_id, int32_t tb_id) {
310  // Free associated FileBuffers and clear buffer entries.
311  mapd_unique_lock<mapd_shared_mutex> write_lock(chunkIndexMutex_);
312  ChunkKey min_table_key{db_id, tb_id};
313  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
314  for (auto it = chunkIndex_.lower_bound(min_table_key);
315  it != chunkIndex_.upper_bound(max_table_key);) {
316  it = deleteBufferUnlocked(it);
317  }
318 }
319 
320 CachingFileBuffer* CachingFileMgr::allocateBuffer(const size_t page_size,
321  const ChunkKey& key,
322  const size_t num_bytes) {
323  return new CachingFileBuffer(this, page_size, key, num_bytes);
324 }
325 
326 CachingFileBuffer* CachingFileMgr::allocateBuffer(
327  const ChunkKey& key,
328  const std::vector<HeaderInfo>::const_iterator& headerStartIt,
329  const std::vector<HeaderInfo>::const_iterator& headerEndIt) {
330  return new CachingFileBuffer(this, key, headerStartIt, headerEndIt);
331 }
332 
333 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
334 bool CachingFileMgr::updatePageIfDeleted(FileInfo* file_info,
335  ChunkKey& chunk_key,
336  int32_t contingent,
337  int32_t page_epoch,
338  int32_t page_num) {
339  // These contingents are stored by overwriting the bytes used for chunkKeys. If
340  // we run into a key marked for deletion in a fileMgr with no fileMgrKey (i.e.
341  // CachingFileMgr) then we can't know if the epoch is valid because we don't know
342  // the key. At this point our only option is to free the page as though it was
343  // checkpointed (which should be fine since we only maintain one version of each
344  // page).
345  if (contingent == DELETE_CONTINGENT || contingent == ROLLOFF_CONTINGENT) {
346  file_info->freePage(page_num, false, page_epoch);
347  return true;
348  }
349  return false;
350 }
351 
352 void CachingFileMgr::writeDirtyBuffers(int32_t db_id, int32_t tb_id) {
353  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
354  ChunkKey min_table_key{db_id, tb_id};
355  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
356 
357  for (auto chunk_it = chunkIndex_.lower_bound(min_table_key);
358  chunk_it != chunkIndex_.upper_bound(max_table_key);
359  ++chunk_it) {
360  if (auto [key, buf] = *chunk_it; buf->isDirty()) {
361  // Free previous versions first so we only have one metadata version.
362  buf->freeMetadataPages();
363  buf->writeMetadata(epoch(db_id, tb_id));
364  buf->clearDirtyBits();
365  touchKey(key);
366  }
367  }
368 }
369 
370 void CachingFileMgr::deleteBufferIfExists(const ChunkKey& key) {
371  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
372  auto chunk_it = chunkIndex_.find(key);
373  if (chunk_it != chunkIndex_.end()) {
374  deleteBufferUnlocked(chunk_it);
375  }
376 }
377 
378 size_t CachingFileMgr::getNumDataChunks() const {
379  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
380  size_t num_chunks = 0;
381  for (auto [key, buf] : chunkIndex_) {
382  if (buf->hasDataPages()) {
383  num_chunks++;
384  }
385  }
386  return num_chunks;
387 }
388 
389 void CachingFileMgr::deleteCacheIfTooLarge() {
390  if (size_of_dir(fileMgrBasePath_) > max_size_) {
391  closeRemovePhysical();
392  bf::create_directory(fileMgrBasePath_);
393  LOG(INFO) << "Cache path over limit. Existing cache deleted.";
394  }
395 }
396 
// Returns a free page of |pageSize|, trying in order: (1) a free page in an
// existing file of that page size, (2) a newly created file if the
// per-category (data vs. metadata) file budget allows, (3) a file freed up by
// evicting cached data.
Page CachingFileMgr::requestFreePage(size_t pageSize, const bool isMetadata) {
  std::lock_guard<std::mutex> lock(getPageMutex_);
  int32_t pageNum = -1;
  // Splits files into metadata and regular data by size.
  auto candidateFiles = fileIndex_.equal_range(pageSize);
  // Check if there is a free page in an existing file.
  for (auto fileIt = candidateFiles.first; fileIt != candidateFiles.second; ++fileIt) {
    FileInfo* fileInfo = files_.at(fileIt->second);
    pageNum = fileInfo->getFreePage();
    if (pageNum != -1) {
      return (Page(fileInfo->fileId, pageNum));
    }
  }

  // Try to add a new file if there is free space available.
  FileInfo* fileInfo = nullptr;
  if (isMetadata) {
    if (getMaxMetaFiles() > getNumMetaFiles()) {
      fileInfo = createFile(pageSize, num_pages_per_metadata_file_);
    }
  } else {
    if (getMaxDataFiles() > getNumDataFiles()) {
      fileInfo = createFile(pageSize, num_pages_per_data_file_);
    }
  }

  if (!fileInfo) {
    // We were not able to create a new file, so we try to evict space.
    // Eviction will return the first file it evicted a page from (a file now guaranteed
    // to have a free page).
    fileInfo = isMetadata ? evictMetadataPages() : evictPages();
  }
  CHECK(fileInfo);

  pageNum = fileInfo->getFreePage();
  CHECK(pageNum != -1);  // the file chosen above must have a free page
  return (Page(fileInfo->fileId, pageNum));
}
435 
436 std::vector<ChunkKey> CachingFileMgr::getKeysForTable(int32_t db_id,
437  int32_t tb_id) const {
438  std::vector<ChunkKey> keys;
439  ChunkKey min_table_key{db_id, tb_id};
440  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
441  for (auto it = chunkIndex_.lower_bound(min_table_key);
442  it != chunkIndex_.upper_bound(max_table_key);
443  ++it) {
444  keys.emplace_back(it->first);
445  }
446  return keys;
447 }
448 
// Evicts every cached entry (metadata pages, chunk pages, and the serialized
// wrapper file) for the least-recently-used table; chunk data is useless
// without its metadata, so the whole table goes at once.  Returns a FileInfo
// that now has at least one free page.
FileInfo* CachingFileMgr::evictMetadataPages() {
  // Locks should already be in place before calling this method.
  FileInfo* file_info{nullptr};
  auto key_to_evict = evict_chunk_or_fail(table_evict_alg_);
  auto [db_id, tb_id] = get_table_prefix(key_to_evict);
  const auto keys = getKeysForTable(db_id, tb_id);
  for (const auto& key : keys) {
    auto chunk_it = chunkIndex_.find(key);
    CHECK(chunk_it != chunkIndex_.end());
    auto& buf = chunk_it->second;
    if (!file_info) {
      // Return the FileInfo for the first file we are freeing a page from so that the
      // caller does not have to search for a FileInfo guaranteed to have at least one
      // free page.
      CHECK(buf->getMetadataPage().pageVersions.size() > 0);
      file_info =
          getFileInfoForFileId(buf->getMetadataPage().pageVersions.front().page.fileId);
    }
    // We erase all pages and entries for the chunk, as without metadata all other
    // entries are useless.
    deleteBufferUnlocked(chunk_it);
  }
  // Serialized datawrappers require metadata to be in the cache.
  deleteWrapperFile(db_id, tb_id);
  CHECK(file_info) << "FileInfo with freed page not found";
  return file_info;
}
476 
// Evicts the data pages of the least-recently-used chunk that actually holds
// data, returning a FileInfo that now has at least one free page.
// NOTE(review): chunkIndex_ is accessed without chunkIndexMutex_ visibly held
// here — presumably guarded by callers (requestFreePage); verify.
FileInfo* CachingFileMgr::evictPages() {
  FileInfo* file_info{nullptr};
  FileBuffer* buf{nullptr};
  while (!file_info) {
    buf = chunkIndex_.at(evict_chunk_or_fail(chunk_evict_alg_));
    CHECK(buf);
    if (!buf->hasDataPages()) {
      // This buffer contains no chunk data (metadata only, uninitialized, size == 0,
      // etc...) so we won't recover any space by evicting it. In this case it gets
      // removed from the eviction queue (it will get re-added if it gets populated with
      // data) and we look at the next chunk in queue until we find a buffer with page
      // data.
      continue;
    }
    // Return the FileInfo for the first file we are freeing a page from so that the
    // caller does not have to search for a FileInfo guaranteed to have at least one free
    // page.
    CHECK(buf->getMultiPage().front().pageVersions.size() > 0);
    file_info = getFileInfoForFileId(
        buf->getMultiPage().front().pageVersions.front().page.fileId);
  }
  auto pages_freed = buf->freeChunkPages();
  CHECK(pages_freed > 0) << "failed to evict a page";
  CHECK(file_info) << "FileInfo with freed page not found";
  return file_info;
}
503 
504 void CachingFileMgr::touchKey(const ChunkKey& key) const {
505  chunk_evict_alg_.touchChunk(key);
506  table_evict_alg_.touchChunk(get_table_key(key));
507 }
508 
509 void CachingFileMgr::removeKey(const ChunkKey& key) const {
510  // chunkIndex lock should already be acquired.
511  chunk_evict_alg_.removeChunk(key);
512  auto [db_id, tb_id] = get_table_prefix(key);
513  ChunkKey table_key{db_id, tb_id};
514  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
515  for (auto it = chunkIndex_.lower_bound(table_key);
516  it != chunkIndex_.upper_bound(max_table_key);
517  ++it) {
518  if (it->first != key) {
519  // If there are any keys in this table other than that one we are removing, then
520  // keep the table in the eviction queue.
521  return;
522  }
523  }
524  // No other keys exist for this table, so remove it from the queue.
525  table_evict_alg_.removeChunk(table_key);
526 }
527 
528 size_t CachingFileMgr::getFilesSize() const {
529  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
530  size_t sum = 0;
531  for (auto [id, file] : files_) {
532  sum += file->size();
533  }
534  return sum;
535 }
536 
537 size_t CachingFileMgr::getTableFileMgrsSize() const {
538  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
539  size_t space_used = 0;
540  for (const auto& [pair, table_dir] : table_dirs_) {
541  space_used += table_dir->getReservedSpace();
542  }
543  return space_used;
544 }
545 
546 size_t CachingFileMgr::getNumDataFiles() const {
547  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
548  return fileIndex_.count(defaultPageSize_);
549 }
550 
551 size_t CachingFileMgr::getNumMetaFiles() const {
552  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
553  return fileIndex_.count(METADATA_PAGE_SIZE);
554 }
555 
556 std::vector<ChunkKey> CachingFileMgr::getChunkKeysForPrefix(
557  const ChunkKey& prefix) const {
558  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
559  std::vector<ChunkKey> chunks;
560  for (auto [key, buf] : chunkIndex_) {
561  if (in_same_table(key, prefix)) {
562  if (buf->hasDataPages()) {
563  chunks.emplace_back(key);
564  touchKey(key);
565  }
566  }
567  }
568  return chunks;
569 }
570 
// Frees the data pages of a cached chunk while keeping its metadata (and
// index entry) resident, so the chunk can be re-populated later.
void CachingFileMgr::removeChunkKeepMetadata(const ChunkKey& key) {
  if (isBufferOnDevice(key)) {
    // NOTE(review): chunkIndex_ is read here without chunkIndexMutex_ being
    // visibly held in this function — presumably the caller or
    // isBufferOnDevice provides the necessary locking; verify.
    auto chunkIt = chunkIndex_.find(key);
    CHECK(chunkIt != chunkIndex_.end());
    auto& buf = chunkIt->second;
    if (buf->hasDataPages()) {
      buf->freeChunkPages();
      // The chunk no longer holds data, so drop it from the data-eviction
      // queue (the table-level queue entry is left alone).
      chunk_evict_alg_.removeChunk(key);
    }
  }
}
582 
583 size_t CachingFileMgr::getNumChunksWithMetadata() const {
584  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
585  size_t sum = 0;
586  for (const auto& [key, buf] : chunkIndex_) {
587  if (buf->hasEncoder()) {
588  sum++;
589  }
590  }
591  return sum;
592 }
593 
594 std::string CachingFileMgr::dumpKeysWithMetadata() const {
595  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
596  std::string ret_string = "CFM keys with metadata:\n";
597  for (const auto& [key, buf] : chunkIndex_) {
598  if (buf->hasEncoder()) {
599  ret_string += " " + show_chunk(key) + "\n";
600  }
601  }
602  return ret_string;
603 }
604 
605 std::string CachingFileMgr::dumpKeysWithChunkData() const {
606  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
607  std::string ret_string = "CFM keys with chunk data:\n";
608  for (const auto& [key, buf] : chunkIndex_) {
609  if (buf->hasDataPages()) {
610  ret_string += " " + show_chunk(key) + "\n";
611  }
612  }
613  return ret_string;
614 }
615 
616 std::unique_ptr<CachingFileMgr> CachingFileMgr::reconstruct() const {
617  DiskCacheConfig config{fileMgrBasePath_,
618  DiskCacheLevel::none,
619  num_reader_threads_,
620  max_size_,
621  defaultPageSize_};
622  return std::make_unique<CachingFileMgr>(config);
623 }
624 
625 void CachingFileMgr::deleteWrapperFile(int32_t db, int32_t tb) {
626  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
627  auto it = table_dirs_.find({db, tb});
628  CHECK(it != table_dirs_.end());
629  it->second->deleteWrapperFile();
630 }
631 
// Serializes a data-wrapper document into the table's subdirectory, evicting
// whole tables' metadata until the document fits in the wrapper-space budget.
void CachingFileMgr::writeWrapperFile(const std::string& doc, int32_t db, int32_t tb) {
  createTableFileMgrIfNoneExists(db, tb);
  auto wrapper_size = doc.size();
  // A document larger than the total wrapper budget can never fit.
  CHECK_LE(wrapper_size, getMaxWrapperSize())
      << "Wrapper is too big to fit into the cache";
  // Each eviction frees an entire table's metadata and wrapper file.
  while (wrapper_size > getAvailableWrapperSpace()) {
    evictMetadataPages();
  }
  mapd_shared_lock<mapd_shared_mutex> read_lock(table_dirs_mutex_);
  table_dirs_.at({db, tb})->writeWrapperFile(doc);
}
643 
/*
 * While the CFM allows for multiple tables to share the same allocated files for chunk
 * data and metadata, space cannot be reallocated between metadata files and data files
 * (once the space has been reserved for a data file the file won't be deleted unless the
 * cache is deleted). To prevent a case where we have allocated too many files of one
 * type to the detriment of the other, we have a minimum portion of the cache that is
 * reserved for each type. This default ratio gives 9% of space to data wrappers, 1% to
 * metadata files, and 90% to data files.
 */
// Partitions max_size_ into per-category budgets: metadata space (split
// between wrapper files and metadata page files) and data-file space, then
// converts the file budgets into whole-file counts.
void CachingFileMgr::setMaxSizes() {
  size_t max_meta_space = std::floor(max_size_ * METADATA_SPACE_PERCENTAGE);
  size_t max_meta_file_space = std::floor(max_size_ * METADATA_FILE_SPACE_PERCENTAGE);
  // Wrapper budget is whatever metadata space is not given to metadata files.
  max_wrapper_space_ = max_meta_space - max_meta_file_space;
  auto max_data_space = max_size_ - max_meta_space;
  auto meta_file_size = METADATA_PAGE_SIZE * num_pages_per_metadata_file_;
  auto data_file_size = defaultPageSize_ * num_pages_per_data_file_;
  // Integer division truncates: budgets are expressed in whole files.
  max_num_data_files_ = max_data_space / data_file_size;
  max_num_meta_files_ = max_meta_file_space / meta_file_size;
  // A cache that cannot hold even one file of each kind is unusable.
  CHECK_GT(max_num_data_files_, 0U) << "Cannot create a cache of size " << max_size_
                                    << ". Not enough space to create a data file.";
  CHECK_GT(max_num_meta_files_, 0U) << "Cannot create a cache of size " << max_size_
                                    << ". Not enough space to create a metadata file.";
}
667 
668 std::optional<FileBuffer*> CachingFileMgr::getBufferIfExists(const ChunkKey& key) {
669  mapd_shared_lock<mapd_shared_mutex> chunk_index_read_lock(chunkIndexMutex_);
670  auto chunk_it = chunkIndex_.find(key);
671  if (chunk_it == chunkIndex_.end()) {
672  return {};
673  }
674  return getBufferUnlocked(chunk_it);
675 }
676 
677 ChunkKeyToChunkMap::iterator CachingFileMgr::deleteBufferUnlocked(
678  const ChunkKeyToChunkMap::iterator chunk_it,
679  const bool purge) {
680  removeKey(chunk_it->first);
681  return FileMgr::deleteBufferUnlocked(chunk_it, purge);
682 }
683 
684 void CachingFileMgr::getChunkMetadataVecForKeyPrefix(
685  ChunkMetadataVector& chunkMetadataVec,
686  const ChunkKey& keyPrefix) {
687  FileMgr::getChunkMetadataVecForKeyPrefix(chunkMetadataVec, keyPrefix);
688  for (const auto& [key, meta] : chunkMetadataVec) {
689  touchKey(key);
690  }
691 }
692 
693 FileBuffer* CachingFileMgr::getBufferUnlocked(const ChunkKeyToChunkMap::iterator chunk_it,
694  const size_t num_bytes) {
695  touchKey(chunk_it->first);
696  return FileMgr::getBufferUnlocked(chunk_it, num_bytes);
697 }
698 
699 void CachingFileMgr::free_page(std::pair<FileInfo*, int32_t>&& page) {
700  page.first->freePageDeferred(page.second);
701 }
702 
703 std::set<ChunkKey> CachingFileMgr::getKeysWithMetadata() const {
704  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
705  std::set<ChunkKey> ret;
706  for (const auto& [key, buf] : chunkIndex_) {
707  if (buf->hasEncoder()) {
708  ret.emplace(key);
709  }
710  }
711  return ret;
712 }
713 
// Sets up (or reattaches to) a table's cache subdirectory: creates the
// directory if absent and either validates/reads an existing epoch file or
// starts a fresh epoch.
TableFileMgr::TableFileMgr(const std::string& table_path)
    : table_path_(table_path)
    , epoch_file_path_(table_path_ + "/" + FileMgr::EPOCH_FILENAME)
    , wrapper_file_path_(table_path_ + "/" + CachingFileMgr::WRAPPER_FILE_NAME)
    , epoch_(Epoch())
    , is_checkpointed_(true) {
  if (!bf::exists(table_path_)) {
    bf::create_directory(table_path_);
  } else {
    CHECK(bf::is_directory(table_path_)) << "Specified path '" << table_path_
                                         << "' for cache table data is not a directory.";
  }
  if (bf::exists(epoch_file_path_)) {
    CHECK(bf::is_regular_file(epoch_file_path_))
        << "Found epoch file '" << epoch_file_path_ << "' which is not a regular file";
    // (lines truncated in this listing: an epoch-file size CHECK ...)
        << "Found epoch file '" << epoch_file_path_ << "' which is not of expected size";
    // (lines truncated in this listing: opening/reading the epoch file)
  } else {
    // (lines truncated in this listing: creation of a new epoch file)
    incrementEpoch();
  }
}
739 
  // (function signature truncated in this listing)
  // Bumps the in-memory epoch only; the on-disk epoch becomes stale until
  // writeAndSyncEpochToDisk() runs.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  epoch_.increment();
  is_checkpointed_ = false;
  // (a CHECK line is truncated in this listing before the message below)
      << "Epoch greater than maximum allowed value (" << epoch_.ceiling() << " > "
      << Epoch::max_allowable_epoch() << ").";
}
748 
749 int32_t TableFileMgr::getEpoch() const {
750  mapd_shared_lock<mapd_shared_mutex> r_lock(table_mutex_);
751  return static_cast<int32_t>(epoch_.ceiling());
752 }
753 
  // (function signature truncated in this listing)
  // Flushes the epoch file through the stdio buffer and the OS cache so the
  // epoch is durable, then marks the table as checkpointed.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  // (a line writing the epoch bytes to epoch_file_ is truncated here)
  int32_t status = fflush(epoch_file_);
  CHECK(status == 0) << "Could not flush epoch file to disk";
#ifdef __APPLE__
  // 51 is presumably F_FULLFSYNC (forces a full flush to physical media on
  // macOS) — TODO confirm against <fcntl.h>.
  status = fcntl(fileno(epoch_file_), 51);
#else
  status = omnisci::fsync(fileno(epoch_file_));
#endif
  CHECK(status == 0) << "Could not sync epoch file to disk";
  is_checkpointed_ = true;  // on-disk epoch now matches the in-memory value
}
767 
  // (function signature truncated in this listing)
  // Deletes the table's entire subdirectory (epoch + wrapper files) on disk.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  bf::remove_all(table_path_);
}
772 
  // (function signature truncated in this listing)
  // Returns the bytes used on disk by this table's subdirectory, summing all
  // regular files found by a recursive walk.
  mapd_shared_lock<mapd_shared_mutex> r_lock(table_mutex_);
  size_t space = 0;
  for (const auto& file : bf::recursive_directory_iterator(table_path_)) {
    if (bf::is_regular_file(file.path())) {
      space += bf::file_size(file.path());
    }
  }
  return space;
}
783 
  // (function signature truncated in this listing)
  // Removes only the serialized data-wrapper file; epoch data is untouched.
  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
  bf::remove_all(wrapper_file_path_);
}
788 
789 void TableFileMgr::writeWrapperFile(const std::string& doc) const {
790  mapd_unique_lock<mapd_shared_mutex> w_lock(table_mutex_);
791  std::ofstream ofs(wrapper_file_path_);
792  if (!ofs) {
793  throw std::runtime_error{"Error trying to create file \"" + wrapper_file_path_ +
794  "\". The error was: " + std::strerror(errno)};
795  }
796  ofs << doc;
797 }
798 
799 } // namespace File_Namespace
size_t getTableFileMgrSpaceReserved(int32_t db_id, int32_t tb_id) const
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
OpenFilesResult openFiles()
Definition: FileMgr.cpp:189
int8_t * storage_ptr()
Definition: Epoch.h:61
void removeDiskContent() const
Removes all disk data for the subdir.
const ChunkKey evictNextChunk() override
size_t size_of_dir(const std::string &dir)
std::string get_dir_name_for_table(int db_id, int tb_id)
A logical page (Page) belongs to a file on disk.
Definition: Page.h:46
#define LOG(tag)
Definition: Logger.h:203
void setMaxSizes()
Sets the maximum number of files/space for each type of storage based on the maximum size...
mapd_shared_mutex table_dirs_mutex_
void writeAndSyncEpochToDisk()
Write and flush the epoch to the epoch file on disk.
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
std::string describeSelf() const override
describes this FileMgr for logging purposes.
size_t getSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
void closeRemovePhysical() override
Closes files and removes the caching directory.
std::string getFileMgrBasePath() const
Definition: FileMgr.h:325
size_t getMetadataSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
int32_t ceiling() const
Definition: Epoch.h:44
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:58
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:678
static int64_t max_allowable_epoch()
Definition: Epoch.h:69
#define CHECK_GT(x, y)
Definition: Logger.h:221
std::string fileMgrBasePath_
Definition: FileMgr.h:388
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:53
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:49
std::string show_chunk(const ChunkKey &key)
Definition: types.h:86
CachingFileMgr(const DiskCacheConfig &config)
void freePage(int32_t pageId, const bool isRolloff, int32_t epoch)
Definition: FileInfo.cpp:178
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
mapd_shared_mutex table_mutex_
void deleteCacheIfTooLarge()
When the cache is read from disk, we don&#39;t know which chunks were least recently used. Rather than try to evict random pages to get down to size we just reset the cache to make sure we have space.
void incrementAllEpochs()
Increment epochs for each table in the CFM.
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:133
int32_t incrementEpoch()
Definition: FileMgr.h:275
int32_t increment()
Definition: Epoch.h:54
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
Definition: FileInfo.h:51
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:320
void removeTableBuffers(int32_t db_id, int32_t tb_id)
Erases and cleans up all buffers for a table.
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
void deleteWrapperFile() const
Deletes only the wrapper file on disk.
An AbstractBuffer is a unit of data management for a data manager.
void incrementEpoch()
increment the epoch for this subdir (not synced to disk).
int fsync(int fd)
Definition: omnisci_fs.cpp:60
bool g_enable_smem_group_by true
void writeAndSyncEpochToDisk()
Definition: FileMgr.cpp:631
void removeTableFileMgr(int32_t db_id, int32_t tb_id)
Removes the subdirectory content for a table.
constexpr int32_t ROLLOFF_CONTINGENT
Definition: FileInfo.h:52
ChunkKey evict_chunk_or_fail(LRUEvictionAlgorithm &alg)
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:394
int32_t maxRollbackEpochs_
Definition: FileMgr.h:387
void writeWrapperFile(const std::string &doc) const
Writes wrapper file to disk.
#define CHECK_LE(x, y)
Definition: Logger.h:220
Definition: Epoch.h:30
static size_t byte_size()
Definition: Epoch.h:63
size_t getReservedSpace() const
Returns the disk space used (in bytes) for the subdir.
std::map< TablePair, std::unique_ptr< TableFileMgr > > table_dirs_
void readTableFileMgrs()
Checks for any sub-directories containing table-specific data and creates epochs from found files...
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:58
mapd_shared_lock< mapd_shared_mutex > read_lock
FILE * open(int fileId)
Opens/creates the file with the given id; returns NULL on error.
Definition: File.cpp:98
int32_t epoch() const
Definition: FileMgr.h:508
#define CHECK(condition)
Definition: Logger.h:209
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1513
void init(const size_t num_reader_threads)
Initializes a CFM, parsing any existing files and initializing data structures appropriately (current...
mapd_unique_lock< mapd_shared_mutex > write_lock
bool in_same_table(const ChunkKey &left_key, const ChunkKey &right_key)
Definition: types.h:79
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:86
size_t getChunkSpaceReservedByTable(int32_t db_id, int32_t tb_id) const
mapd_shared_mutex chunkIndexMutex_
Definition: FileMgr.h:402
int32_t getEpoch() const
Returns the current epoch (locked)
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:403
std::string getTableFileMgrPath(int32_t db, int32_t tb) const
void clearForTable(int32_t db_id, int32_t tb_id)
Removes all data related to the given table (pages and subdirectories).
A FileMgr capable of limiting it&#39;s size and storing data from multiple tables in a shared directory...
#define VLOG(n)
Definition: Logger.h:303
FileBuffer * createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &startIt, const std::vector< HeaderInfo >::const_iterator &endIt) override
Creates a buffer and initializes it with info read from files on disk.
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31