OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CachingFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 Omnisci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
22 #include <boost/filesystem.hpp>
23 #include "Shared/misc.h"
24 
25 namespace File_Namespace {
26 namespace bf = boost::filesystem;
27 
/// Constructs a CachingFileMgr rooted at `base_path`, then runs full
/// initialization (directory scan, file open, buffer reconstruction) via
/// init().
CachingFileMgr::CachingFileMgr(const std::string& base_path,
                               const size_t num_reader_threads,
                               const size_t default_page_size) {
  fileMgrBasePath_ = base_path;
  // NOTE(review): the documentation scrape skips original line 32 here; the
  // cross-reference list suggests it initialized maxRollbackEpochs_ -- confirm
  // against the upstream source before relying on this constructor.
  defaultPageSize_ = default_page_size;
  nextFileId_ = 0;
  init(num_reader_threads);
}
37 
39 
/// Initializes the cache: reads per-table epoch files, opens existing data
/// files, rebuilds one FileBuffer per run of identical chunk keys found in the
/// sorted page headers, and finally frees orphaned pages.
void CachingFileMgr::init(const size_t num_reader_threads) {
  readTableDirs();
  auto open_files_result = openFiles();
  /* Sort headerVec so that all HeaderInfos
   * from a chunk will be grouped together
   * and in order of increasing PageId
   * - Version Epoch */
  auto& header_vec = open_files_result.header_infos;
  std::sort(header_vec.begin(), header_vec.end());

  /* Goal of next section is to find sequences in the
   * sorted headerVec of the same ChunkId, which we
   * can then initiate a FileBuffer with */
  VLOG(3) << "Number of Headers in Vector: " << header_vec.size();
  if (header_vec.size() > 0) {
    auto startIt = header_vec.begin();
    ChunkKey lastChunkKey = startIt->chunkKey;
    // Each maximal run of equal chunk keys becomes one buffer.
    for (auto it = header_vec.begin() + 1; it != header_vec.end(); ++it) {
      if (it->chunkKey != lastChunkKey) {
        createBufferFromHeaders(lastChunkKey, startIt, it);
        lastChunkKey = it->chunkKey;
        startIt = it;
      }
    }
    // The loop above never emits the final run; flush it here.
    createBufferFromHeaders(lastChunkKey, startIt, header_vec.end());
  }

  nextFileId_ = open_files_result.max_file_id + 1;
  // NOTE(review): the documentation scrape skips original line 68 here --
  // confirm its contents against the upstream source.
  freePages();
  initializeNumThreads(num_reader_threads);
  isFullyInitted_ = true;
}
73 
79  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
80  bf::path path(fileMgrBasePath_);
81  CHECK(bf::exists(path)) << "Cache path: " << fileMgrBasePath_ << " does not exit.";
82  CHECK(bf::is_directory(path))
83  << "Specified path '" << fileMgrBasePath_ << "' for disk cache is not a directory.";
84 
85  // Look for directories with table-specific names.
86  boost::regex table_filter("table_([0-9]+)_([0-9]+)");
87  for (const auto& file : bf::directory_iterator(path)) {
88  boost::smatch match;
89  auto file_name = file.path().filename().string();
90  if (boost::regex_match(file_name, match, table_filter)) {
91  int32_t db_id = std::stoi(match[1]);
92  int32_t tb_id = std::stoi(match[2]);
93  CHECK(table_epochs_.find({db_id, tb_id}) == table_epochs_.end())
94  << "Trying to read epoch for existing table";
95  openOrCreateEpochIfNotExists(db_id, tb_id);
96  }
97  }
98 }
99 
100 int32_t CachingFileMgr::epoch(int32_t db_id, int32_t tb_id) const {
101  mapd_shared_lock<mapd_shared_mutex> read_lock(epochs_mutex_);
102  auto table_epoch_it = table_epochs_.find({db_id, tb_id});
103  CHECK(table_epoch_it != table_epochs_.end());
104  auto& [pair, epochInfo] = *table_epoch_it;
105  return static_cast<int32_t>(epochInfo->epoch.ceiling());
106 }
107 
108 void CachingFileMgr::incrementEpoch(int32_t db_id, int32_t tb_id) {
109  mapd_shared_lock<mapd_shared_mutex> read_lock(epochs_mutex_);
110  auto epochs_it = table_epochs_.find({db_id, tb_id});
111  CHECK(epochs_it != table_epochs_.end());
112  auto& [pair, epochInfo] = *epochs_it;
113  epochInfo->increment();
114 }
115 
116 void CachingFileMgr::openOrCreateEpochIfNotExists(int32_t db_id, int32_t tb_id) {
117  mapd_unique_lock<mapd_shared_mutex> epoch_lock(epochs_mutex_);
118  TablePair table_pair{db_id, tb_id};
119  if (table_epochs_.find(table_pair) == table_epochs_.end()) {
120  openAndReadEpochFileUnlocked(db_id, tb_id);
121  }
122 }
123 
/// Registers an epoch entry for (db_id, tb_id) if absent -- creating a fresh
/// epoch file or opening an existing, size-validated one -- then reads the
/// on-disk epoch value into memory.  "Unlocked": the caller must already hold
/// epochs_mutex_ exclusively.
void CachingFileMgr::openAndReadEpochFileUnlocked(int32_t db_id, int32_t tb_id) {
  TablePair table_pair{db_id, tb_id};
  auto table_epoch_it = table_epochs_.find(table_pair);
  if (table_epoch_it == table_epochs_.end()) {
    std::string epoch_file_path(getOrAddTableDirUnlocked(db_id, tb_id) + "/" +
                                EPOCH_FILENAME);
    if (!bf::exists(epoch_file_path)) {
      // Epoch file was missing. Create a new one (createEpochFileUnlocked also
      // registers the table_epochs_ entry and syncs it to disk), then return --
      // no read needed for a freshly created epoch.
      createEpochFileUnlocked(db_id, tb_id);
      return;
    } else {
      // Validate the existing file before trusting its contents.
      CHECK(bf::is_regular_file(epoch_file_path))
          << "Found epoch file '" << epoch_file_path << "' which is not a regular file";
      CHECK(bf::file_size(epoch_file_path) == Epoch::byte_size())
          << "Found epoch file '" << epoch_file_path << "' which is not of expected size";
    }
    table_epochs_.emplace(table_pair, std::make_unique<EpochInfo>(open(epoch_file_path)));
  }
  // Re-find: the entry may have just been emplaced above.
  table_epoch_it = table_epochs_.find(table_pair);
  auto& [epoch, epoch_file, is_checkpointed] = *(table_epoch_it->second);
  read(epoch_file, 0, Epoch::byte_size(), epoch.storage_ptr());
}
146 
147 void CachingFileMgr::createEpochFileUnlocked(int32_t db_id, int32_t tb_id) {
148  std::string epoch_file_path(getOrAddTableDirUnlocked(db_id, tb_id) + "/" +
149  EPOCH_FILENAME);
150  CHECK(!bf::exists(epoch_file_path)) << "Can't create epoch file. File already exists";
151  TablePair table_pair{db_id, tb_id};
152  table_epochs_.emplace(
153  table_pair,
154  std::make_unique<EpochInfo>(create(epoch_file_path, sizeof(Epoch::byte_size()))));
155  writeAndSyncEpochToDisk(db_id, tb_id);
156  table_epochs_.at(table_pair)->increment();
157 }
158 
/// Serializes the table's in-memory epoch to its epoch file and forces it to
/// durable storage, then marks the epoch as checkpointed.
/// NOTE(review): no lock is taken here -- assumes the caller synchronizes
/// access to table_epochs_; confirm at call sites (createEpochFileUnlocked
/// holds epochs_mutex_, checkpoint() does not appear to).
void CachingFileMgr::writeAndSyncEpochToDisk(int32_t db_id, int32_t tb_id) {
  auto epochs_it = table_epochs_.find({db_id, tb_id});
  CHECK(epochs_it != table_epochs_.end());
  auto& [pair, epoch_info] = *epochs_it;
  auto& [epoch, epoch_file, is_checkpointed] = *epoch_info;
  write(epoch_file, 0, Epoch::byte_size(), epoch.storage_ptr());
  // Flush stdio buffers before asking the OS to sync the descriptor.
  int32_t status = fflush(epoch_file);
  CHECK(status == 0) << "Could not flush epoch file to disk";
#ifdef __APPLE__
  // 51 == F_FULLFSYNC on macOS: plain fsync there does not guarantee the
  // drive flushes its write cache to permanent storage.
  status = fcntl(fileno(epoch_file), 51);
#else
  status = omnisci::fsync(fileno(epoch_file));
#endif
  CHECK(status == 0) << "Could not sync epoch file to disk";
  is_checkpointed = true;
}
175 
/// Evicts all cached state for a table: in-memory buffers first, then the
/// on-disk table directory (epoch + serialized wrappers), then returns freed
/// pages for reuse.  NOTE(review): the ordering appears deliberate -- confirm
/// before reordering.
void CachingFileMgr::clearForTable(int32_t db_id, int32_t tb_id) {
  removeTableBuffers(db_id, tb_id);
  removeTableDirectory(db_id, tb_id);
  freePages();
}
181 
/// Thread-safe wrapper: takes the files write lock, then delegates to the
/// unlocked implementation.  Returns the table's cache directory path.
std::string CachingFileMgr::getOrAddTableDir(int db_id, int tb_id) {
  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
  return getOrAddTableDirUnlocked(db_id, tb_id);
}
186 
187 std::string CachingFileMgr::getOrAddTableDirUnlocked(int db_id, int tb_id) {
188  std::string table_dir =
189  getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
190  if (!bf::exists(table_dir)) {
191  bf::create_directory(table_dir);
192  } else {
193  if (!bf::is_directory(table_dir)) {
194  LOG(FATAL) << "Specified path '" << table_dir
195  << "' for cache table data is not a directory.";
196  }
197  }
198  return table_dir;
199 }
200 
201 void CachingFileMgr::closeRemovePhysical() {
202  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
203  closePhysicalUnlocked();
204  table_epochs_.clear();
205  auto dir_name = getFileMgrBasePath();
206  if (bf::exists(dir_name)) {
207  bf::remove_all(dir_name);
208  }
209 }
210 
211 uint64_t CachingFileMgr::getChunkSpaceReservedByTable(int db_id, int tb_id) {
212  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
213  uint64_t space_used = 0;
214  ChunkKey min_table_key{db_id, tb_id};
215  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
216  for (auto it = chunkIndex_.lower_bound(min_table_key);
217  it != chunkIndex_.upper_bound(max_table_key);
218  ++it) {
219  auto& [key, buffer] = *it;
220  space_used += (buffer->numChunkPages() * defaultPageSize_);
221  }
222  return space_used;
223 }
224 
225 uint64_t CachingFileMgr::getMetadataSpaceReservedByTable(int db_id, int tb_id) {
226  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
227  uint64_t space_used = 0;
228  ChunkKey min_table_key{db_id, tb_id};
229  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
230  for (auto it = chunkIndex_.lower_bound(min_table_key);
231  it != chunkIndex_.upper_bound(max_table_key);
232  ++it) {
233  auto& [key, buffer] = *it;
234  space_used += (buffer->numMetadataPages() * METADATA_PAGE_SIZE);
235  }
236  return space_used;
237 }
238 
239 uint64_t CachingFileMgr::getWrapperSpaceReservedByTable(int db_id, int tb_id) {
240  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
241  uint64_t space_used = 0;
242  std::string table_dir =
243  getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
244  if (bf::exists(table_dir)) {
245  for (const auto& file : bf::recursive_directory_iterator(table_dir)) {
246  if (bf::is_regular_file(file.path())) {
247  space_used += bf::file_size(file.path());
248  }
249  }
250  }
251  return space_used;
252 }
253 
254 uint64_t CachingFileMgr::getSpaceReservedByTable(int db_id, int tb_id) {
255  auto chunk_space = getChunkSpaceReservedByTable(db_id, tb_id);
256  auto meta_space = getMetadataSpaceReservedByTable(db_id, tb_id);
257  auto wrapper_space = getWrapperSpaceReservedByTable(db_id, tb_id);
258  return chunk_space + meta_space + wrapper_space;
259 }
260 
/// Short human-readable name for this manager, used in log output (see the
/// VLOG in checkpoint()).
std::string CachingFileMgr::describeSelf() {
  return "cache";
}
264 
265 // Similar to FileMgr::checkpoint() but only writes a subset of buffers.
// Similar to FileMgr::checkpoint() but only writes a subset of buffers.
/// Persists the given table's dirty buffers, syncs the data files and the
/// table's epoch to disk, then advances the in-memory epoch and frees
/// reclaimable pages.  NOTE(review): step order appears deliberate (epoch is
/// synced only after the data it covers) -- confirm before reordering.
void CachingFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
  VLOG(2) << "Checkpointing " << describeSelf() << " (" << db_id << ", " << tb_id
          << ") epoch: " << epoch(db_id, tb_id);
  writeDirtyBuffers(db_id, tb_id);
  syncFilesToDisk();
  writeAndSyncEpochToDisk(db_id, tb_id);
  incrementEpoch(db_id, tb_id);
  freePages();
}
275 
276 FileBuffer* CachingFileMgr::createBufferUnlocked(const ChunkKey& key,
277  size_t page_size,
278  const size_t num_bytes) {
279  auto [db_id, tb_id] = get_table_prefix(key);
280  // We need to have an epoch to correspond to each table for which we have buffers.
281  openOrCreateEpochIfNotExists(db_id, tb_id);
282  return FileMgr::createBufferUnlocked(key, page_size, num_bytes);
283 }
284 
/// Reconstructs a CachingFileBuffer for `key` from the header run
/// [startIt, endIt) discovered at startup.  A chunk is only rebuilt when its
/// first header is the metadata page (pageId == -1); a chunk with no metadata
/// page was never checkpointed and is skipped entirely.
void CachingFileMgr::createBufferFromHeaders(
    const ChunkKey& key,
    const std::vector<HeaderInfo>::const_iterator& startIt,
    const std::vector<HeaderInfo>::const_iterator& endIt) {
  if (startIt->pageId == -1) {
    // pageId == -1 marks the chunk's metadata page.  If the first pageId is
    // not -1 then there is no metadata page for the current key (which means
    // it was never checkpointed), so we skip it by not entering this branch.

    // Need to acquire chunkIndexMutex_ lock first to avoid lock order cycles.
    mapd_unique_lock<mapd_shared_mutex> chunk_lock(chunkIndexMutex_);
    auto [db_id, tb_id] = get_table_prefix(key);
    mapd_shared_lock<mapd_shared_mutex> epochs_lock(epochs_mutex_);
    CHECK(table_epochs_.find({db_id, tb_id}) != table_epochs_.end());
    CHECK(chunkIndex_.find(key) == chunkIndex_.end());
    // chunkIndex_ takes ownership of this raw pointer (deleted in
    // removeTableBuffers()).
    chunkIndex_[key] = new CachingFileBuffer(this, key, startIt, endIt);

    auto buffer = chunkIndex_.at(key);
    if (buffer->isMissingPages()) {
      // Detect the case where a page is missing by comparing the amount of pages read
      // with the metadata size. If data are missing, discard the chunk.
      buffer->freeChunkPages();
    }
  }
}
309 
/// Replaces any cached buffer for `key` with the contents of `src_buffer`.
/// The existing buffer (if any) is dropped first so the cache holds at most
/// one version per chunk, then the write is delegated to FileMgr::putBuffer.
FileBuffer* CachingFileMgr::putBuffer(const ChunkKey& key,
                                      AbstractBuffer* src_buffer,
                                      const size_t num_bytes) {
  deleteBufferIfExists(key);
  return FileMgr::putBuffer(key, src_buffer, num_bytes);
}
322 
323 void CachingFileMgr::incrementAllEpochs() {
324  mapd_shared_lock<mapd_shared_mutex> read_lock(epochs_mutex_);
325  for (auto& [key, epochInfo] : table_epochs_) {
326  epochInfo->increment();
327  }
328 }
329 
330 void CachingFileMgr::removeTableDirectory(int32_t db_id, int32_t tb_id) {
331  // Delete table-specific directory (stores table epoch data and serialized data wrapper)
332  mapd_unique_lock<mapd_shared_mutex> write_lock(epochs_mutex_);
333  table_epochs_.erase({db_id, tb_id});
334  auto dir_name = getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
335  bf::remove_all(dir_name);
336 }
337 
338 void CachingFileMgr::removeTableBuffers(int32_t db_id, int32_t tb_id) {
339  // Free associated FileBuffers and clear buffer entries.
340  mapd_unique_lock<mapd_shared_mutex> write_lock(chunkIndexMutex_);
341  ChunkKey min_table_key{db_id, tb_id};
342  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
343  for (auto it = chunkIndex_.lower_bound(min_table_key);
344  it != chunkIndex_.upper_bound(max_table_key);) {
345  auto& [key, buffer] = *it;
346  buffer->freePages();
347  delete buffer;
348  it = chunkIndex_.erase(it);
349  }
350 }
351 
/// Factory override so the FileMgr machinery allocates CachingFileBuffers.
/// Ownership: the returned raw pointer is stored in (and later deleted via)
/// chunkIndex_ -- see removeTableBuffers().
CachingFileBuffer* CachingFileMgr::allocateBuffer(const size_t page_size,
                                                  const ChunkKey& key,
                                                  const size_t num_bytes) {
  return new CachingFileBuffer(this, page_size, key, num_bytes);
}
357 
358 // Checks if a page should be deleted or recovered. Returns true if page was deleted.
359 bool CachingFileMgr::updatePageIfDeleted(FileInfo* file_info,
360  ChunkKey& chunk_key,
361  int32_t contingent,
362  int32_t page_epoch,
363  int32_t page_num) {
364  // These contingents are stored by overwriting the bytes used for chunkKeys. If
365  // we run into a key marked for deletion in a fileMgr with no fileMgrKey (i.e.
366  // CachingFileMgr) then we can't know if the epoch is valid because we don't know
367  // the key. At this point our only option is to free the page as though it was
368  // checkpointed (which should be fine since we only maintain one version of each
369  // page).
370  if (contingent == DELETE_CONTINGENT || contingent == ROLLOFF_CONTINGENT) {
371  file_info->freePageImmediate(page_num);
372  return true;
373  }
374  return false;
375 }
376 
377 void CachingFileMgr::writeDirtyBuffers(int32_t db_id, int32_t tb_id) {
378  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
379  ChunkKey min_table_key{db_id, tb_id};
380  ChunkKey max_table_key{db_id, tb_id, std::numeric_limits<int32_t>::max()};
381 
382  for (auto chunkIt = chunkIndex_.lower_bound(min_table_key);
383  chunkIt != chunkIndex_.upper_bound(max_table_key);
384  ++chunkIt) {
385  if (chunkIt->second->isDirty()) {
386  // Free previous versions first so we only have one metadata version.
387  chunkIt->second->freeMetadataPages();
388  chunkIt->second->writeMetadata(epoch(db_id, tb_id));
389  chunkIt->second->clearDirtyBits();
390  }
391  }
392 }
393 
394 void CachingFileMgr::deleteBufferIfExists(const ChunkKey& key) {
395  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunkIndexMutex_);
396  auto chunk_it = chunkIndex_.find(key);
397  if (chunk_it != chunkIndex_.end()) {
398  deleteBufferUnlocked(chunk_it);
399  }
400 }
401 
402 } // namespace File_Namespace
#define METADATA_PAGE_SIZE
Definition: FileBuffer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
OpenFilesResult openFiles()
Definition: FileMgr.cpp:189
int open(const char *path, int flags, int mode)
Definition: omnisci_fs.cpp:64
std::string get_dir_name_for_table(int db_id, int tb_id)
#define LOG(tag)
Definition: Logger.h:194
void freePageImmediate(int32_t page_num)
Definition: FileInfo.cpp:244
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:58
std::string fileMgrBasePath_
Definition: FileMgr.h:388
FILE * create(const std::string &basePath, const int fileId, const size_t pageSize, const size_t numPages)
Definition: File.cpp:49
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:141
void createBufferFromHeaders(const ChunkKey &key, const std::vector< HeaderInfo >::const_iterator &startIt, const std::vector< HeaderInfo >::const_iterator &endIt)
size_t read(FILE *f, const size_t offset, const size_t size, int8_t *buf)
Reads the specified number of bytes from the offset position in file f into buf.
Definition: File.cpp:133
constexpr int32_t DELETE_CONTINGENT
A FileInfo type has a file pointer and metadata about a file.
Definition: FileInfo.h:51
An AbstractBuffer is a unit of data management for a data manager.
int fsync(int fd)
Definition: omnisci_fs.cpp:60
void openOrCreateEpochIfNotExists(int32_t db_id, int32_t tb_id)
constexpr int32_t ROLLOFF_CONTINGENT
Definition: FileInfo.h:52
size_t defaultPageSize_
number of threads used when loading data
Definition: FileMgr.h:394
int32_t maxRollbackEpochs_
Definition: FileMgr.h:387
static size_t byte_size()
Definition: Epoch.h:63
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:57
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:203
void initializeNumThreads(size_t num_reader_threads=0)
Definition: FileMgr.cpp:1544
void init(const size_t num_reader_threads)
mapd_unique_lock< mapd_shared_mutex > write_lock
CachingFileMgr(const std::string &base_path, const size_t num_reader_threads=0, const size_t default_page_size=DEFAULT_PAGE_SIZE)
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:86
std::map< TablePair, std::unique_ptr< EpochInfo > > table_epochs_
mapd_shared_mutex files_rw_mutex_
Definition: FileMgr.h:403
#define VLOG(n)
Definition: Logger.h:297
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31