OmniSciDB  2e3a973ef4
ForeignStorageCache.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 TODO(Misiu): A lot of the methods here could be made asynchronous. It may be worth
 investigating whether async versions of them would improve performance.
*/

#include "ForeignStorageCache.h"
#include "Shared/File.h"
#include "Shared/measure.h"

namespace foreign_storage {
using read_lock = mapd_shared_lock<mapd_shared_mutex>;
using write_lock = mapd_unique_lock<mapd_shared_mutex>;

namespace {
template <typename Func, typename T>
void iterate_over_matching_prefix(Func func,
                                  T& chunk_collection,
                                  const ChunkKey& chunk_prefix) {
  ChunkKey upper_prefix(chunk_prefix);
  upper_prefix.push_back(std::numeric_limits<int>::max());
  auto end_it = chunk_collection.upper_bound(static_cast<const ChunkKey>(upper_prefix));
  for (auto chunk_it = chunk_collection.lower_bound(chunk_prefix); chunk_it != end_it;
       ++chunk_it) {
    func(*chunk_it);
  }
}

void set_metadata_for_buffer(AbstractBuffer* buffer, ChunkMetadata* meta) {
  buffer->initEncoder(meta->sqlType);
  buffer->setSize(meta->numBytes);
  buffer->getEncoder()->setNumElems(meta->numElements);
  buffer->getEncoder()->resetChunkStats(meta->chunkStats);
  buffer->setUpdated();
}
}  // namespace

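// Validates (or creates) the cache directory, constructs the GlobalFileMgr that backs
// the cache, and applies the configured size limit.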
ForeignStorageCache::ForeignStorageCache(const DiskCacheConfig& config)
    : num_chunks_added_(0), num_metadata_added_(0), max_cached_bytes_(config.size_limit) {
  validatePath(config.path);
  global_file_mgr_ = std::make_unique<File_Namespace::GlobalFileMgr>(
      0, config.path, config.num_reader_threads);
  setLimit(config.size_limit);
}

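// Removes a chunk and its metadata from the cache (and from its table's eviction
// tracker), if present.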
void ForeignStorageCache::deleteBufferIfExists(const ChunkKey& chunk_key) {
  write_lock meta_lock(metadata_mutex_);
  write_lock chunk_lock(chunks_mutex_);
  if (cached_metadata_.find(chunk_key) != cached_metadata_.end()) {
    const auto& tracker_it = eviction_tracker_map_.find(get_table_key(chunk_key));
    if (tracker_it != eviction_tracker_map_.end()) {
      tracker_it->second.eviction_alg_->removeChunk(chunk_key);
    }
    global_file_mgr_->deleteBuffer(chunk_key);
    cached_chunks_.erase(chunk_key);
    cached_metadata_.erase(chunk_key);
  }
}

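// Writes a single clean buffer into the cache and registers it with the eviction
// algorithm for its table.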
void ForeignStorageCache::cacheChunk(const ChunkKey& chunk_key, AbstractBuffer* buffer) {
  auto timer = DEBUG_TIMER(__func__);
  write_lock meta_lock(metadata_mutex_);
  write_lock chunk_lock(chunks_mutex_);
  // We should only be caching buffers that are in sync with storage.
  CHECK(!buffer->isDirty());
  if (insertChunkIntoEvictionAlg(chunk_key, buffer->size())) {
    buffer->setUpdated();
    global_file_mgr_->putBuffer(chunk_key, buffer);
    global_file_mgr_->checkpoint();
    // TODO(Misiu): This needs to happen even if insertChunkIntoEvictionAlg() fails.
    cached_metadata_.emplace(chunk_key);
  }
  CHECK(!buffer->isDirty());
}

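// Registers a set of already-written chunks for a single table with the eviction
// algorithm and checkpoints that table.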
void ForeignStorageCache::cacheTableChunks(const std::vector<ChunkKey>& chunk_keys) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(!chunk_keys.empty());

  auto db_id = chunk_keys[0][CHUNK_KEY_DB_IDX];
  auto table_id = chunk_keys[0][CHUNK_KEY_TABLE_IDX];
  const ChunkKey table_key{db_id, table_id};

  for (const auto& chunk_key : chunk_keys) {
    CHECK_EQ(db_id, chunk_key[CHUNK_KEY_DB_IDX]);
    CHECK_EQ(table_id, chunk_key[CHUNK_KEY_TABLE_IDX]);
    CHECK(global_file_mgr_->isBufferOnDevice(chunk_key));
    insertChunkIntoEvictionAlg(chunk_key, global_file_mgr_->getBuffer(chunk_key)->size());
  }
  global_file_mgr_->checkpoint(db_id, table_id);
}

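// Returns the cached buffer for a chunk, touching it in the eviction queue, or nullptr
// if the chunk is not cached.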
AbstractBuffer* ForeignStorageCache::getCachedChunkIfExists(const ChunkKey& chunk_key) {
  auto timer = DEBUG_TIMER(__func__);
  {
    read_lock lock(chunks_mutex_);
    if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
      return nullptr;
    }
  }
  const auto& eviction_tracker_it = eviction_tracker_map_.find(get_table_key(chunk_key));
  if (eviction_tracker_it != eviction_tracker_map_.end()) {
    eviction_tracker_it->second.eviction_alg_->touchChunk(chunk_key);
  }

  return global_file_mgr_->getBuffer(chunk_key);
}

bool ForeignStorageCache::isMetadataCached(const ChunkKey& chunk_key) const {
  auto timer = DEBUG_TIMER(__func__);
  return (cached_metadata_.find(chunk_key) != cached_metadata_.end());
}

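// Rebuilds the cache's in-memory bookkeeping (cached metadata and eviction queues) for
// a table from buffers already on disk; returns true if any metadata was recovered.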
bool ForeignStorageCache::recoverCacheForTable(ChunkMetadataVector& meta_vec,
                                               const ChunkKey& table_key) {
  CHECK(meta_vec.size() == 0);
  CHECK(is_table_key(table_key));
  CHECK(dynamic_cast<File_Namespace::FileMgr*>(global_file_mgr_->getFileMgr(table_key)));

  global_file_mgr_->getChunkMetadataVecForKeyPrefix(meta_vec, table_key);
  for (auto& [chunk_key, metadata] : meta_vec) {
    cached_metadata_.emplace(chunk_key);
    // If there is no page count, then the chunk was metadata-only and should not be
    // cached.
    if (const auto& buf = global_file_mgr_->getBuffer(chunk_key); buf->pageCount() > 0) {
      insertChunkIntoEvictionAlg(chunk_key, buf->size());
    }
  }
  return (meta_vec.size() > 0);
}

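// Takes the chunk lock, then evicts and erases a single chunk.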
void ForeignStorageCache::evictThenEraseChunk(const ChunkKey& chunk_key) {
  write_lock chunk_lock(chunks_mutex_);
  evictThenEraseChunkUnlocked(chunk_key);
}

void ForeignStorageCache::evictThenEraseChunkUnlocked(const ChunkKey& chunk_key) {
  const ChunkKey table_prefix = get_table_key(chunk_key);
  const auto& eviction_tracker_it = eviction_tracker_map_.find(table_prefix);
  if (eviction_tracker_it != eviction_tracker_map_.end()) {
    eviction_tracker_it->second.eviction_alg_->removeChunk(chunk_key);
    eraseChunk(chunk_key, eviction_tracker_it->second);
  }
}

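// Caches a vector of chunk metadata: creates (or reuses) the backing buffers, applies
// the metadata to their encoders, and invalidates any previously cached chunk data for
// those keys.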
void ForeignStorageCache::cacheMetadataVec(const ChunkMetadataVector& metadata_vec) {
  auto timer = DEBUG_TIMER(__func__);
  write_lock meta_lock(metadata_mutex_);
  write_lock chunk_lock(chunks_mutex_);
  for (auto& [chunk_key, metadata] : metadata_vec) {
    cached_metadata_.emplace(chunk_key);
    AbstractBuffer* buf;
    AbstractBuffer* index_buffer = nullptr;
    ChunkKey index_chunk_key;
    if (is_varlen_key(chunk_key)) {
      // For variable length chunks, metadata is associated with the data chunk.
      CHECK(is_varlen_data_key(chunk_key));
      index_chunk_key = {chunk_key[CHUNK_KEY_DB_IDX],
                         chunk_key[CHUNK_KEY_TABLE_IDX],
                         chunk_key[CHUNK_KEY_COLUMN_IDX],
                         chunk_key[CHUNK_KEY_FRAGMENT_IDX],
                         2};
    }

    if (!global_file_mgr_->isBufferOnDevice(chunk_key)) {
      buf = global_file_mgr_->createBuffer(chunk_key);

      if (!index_chunk_key.empty()) {
        CHECK(!global_file_mgr_->isBufferOnDevice(index_chunk_key));
        index_buffer = global_file_mgr_->createBuffer(index_chunk_key);
        CHECK(index_buffer);
      }
    } else {
      buf = global_file_mgr_->getBuffer(chunk_key);

      if (!index_chunk_key.empty()) {
        CHECK(global_file_mgr_->isBufferOnDevice(index_chunk_key));
        index_buffer = global_file_mgr_->getBuffer(index_chunk_key);
        CHECK(index_buffer);
      }
    }

    set_metadata_for_buffer(buf, metadata.get());
    evictThenEraseChunkUnlocked(chunk_key);

    if (!index_chunk_key.empty()) {
      CHECK(index_buffer);
      index_buffer->setUpdated();
      evictThenEraseChunkUnlocked(index_chunk_key);
    }
  }
  global_file_mgr_->checkpoint();
}

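// Collects the metadata of every cached chunk whose key matches the given prefix.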
void ForeignStorageCache::getCachedMetadataVecForKeyPrefix(
    ChunkMetadataVector& metadata_vec,
    const ChunkKey& chunk_prefix) const {
  auto timer = DEBUG_TIMER(__func__);
  read_lock r_lock(metadata_mutex_);
  iterate_over_matching_prefix(
      [&metadata_vec, this](auto chunk) {
        std::shared_ptr<ChunkMetadata> buf_metadata = std::make_shared<ChunkMetadata>();
        global_file_mgr_->getBuffer(chunk)->getEncoder()->getMetadata(buf_metadata);
        metadata_vec.push_back(std::make_pair(chunk, buf_metadata));
      },
      cached_metadata_,
      chunk_prefix);
}

bool ForeignStorageCache::hasCachedMetadataForKeyPrefix(
    const ChunkKey& chunk_prefix) const {
  auto timer = DEBUG_TIMER(__func__);
  // We don't use iterate_over_matching_prefix() here because we want to exit early if
  // possible.
  ChunkKey upper_prefix(chunk_prefix);
  upper_prefix.push_back(std::numeric_limits<int>::max());
  auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
  for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;
       ++meta_it) {
    return true;
  }
  return false;
}

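// Removes all cached chunks and metadata for a single table, then drops the table's
// data structures from the file manager.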
void ForeignStorageCache::clearForTablePrefix(const ChunkKey& chunk_prefix) {
  CHECK(is_table_key(chunk_prefix));
  auto timer = DEBUG_TIMER(__func__);
  ChunkKey upper_prefix(chunk_prefix);
  upper_prefix.push_back(std::numeric_limits<int>::max());
  {
    write_lock w_lock(chunks_mutex_);
    // Delete chunks for prefix
    auto end_it = cached_chunks_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
    for (auto chunk_it = cached_chunks_.lower_bound(chunk_prefix); chunk_it != end_it;) {
      chunk_it = eraseChunkByIterator(chunk_it);
    }
  }
  {
    write_lock w_lock(metadata_mutex_);
    // Delete metadata for prefix
    auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
    for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;) {
      meta_it = cached_metadata_.erase(meta_it);
    }
  }
  global_file_mgr_->removeTableRelatedDS(chunk_prefix[0], chunk_prefix[1]);
}

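// Empties the entire cache and drops the file manager data structures for every table
// that had cached metadata.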
void ForeignStorageCache::clear() {
  auto timer = DEBUG_TIMER(__func__);
  std::set<ChunkKey> table_keys;
  {
    write_lock w_lock(chunks_mutex_);
    for (auto chunk_it = cached_chunks_.begin(); chunk_it != cached_chunks_.end();) {
      chunk_it = eraseChunkByIterator(chunk_it);
    }
  }
  {
    write_lock w_lock(metadata_mutex_);
    for (auto meta_it = cached_metadata_.begin(); meta_it != cached_metadata_.end();) {
      table_keys.emplace(ChunkKey{(*meta_it)[0], (*meta_it)[1]});
      meta_it = cached_metadata_.erase(meta_it);
    }
  }
  for (const auto& table_key : table_keys) {
    global_file_mgr_->removeTableRelatedDS(table_key[0], table_key[1]);
  }
}

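// Applies a new cache size limit (in bytes), evicting chunks until each table fits
// within the resulting per-table page budget. Throws if the requested limit is smaller
// than a single cache file.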
void ForeignStorageCache::setLimit(uint64_t limit) {
  auto timer = DEBUG_TIMER(__func__);
  write_lock w_lock(chunks_mutex_);
  size_t file_size = global_file_mgr_->getDefaultPageSize() * MAX_FILE_N_PAGES;
  if (limit < file_size) {
    throw CacheTooSmallException("Could not create cache with size " + to_string(limit) +
                                 ". Minimum cache size is " + to_string(file_size));
  }
  size_t max_num_files = (limit + (file_size - 1)) / file_size;
  max_pages_per_table_ = max_num_files * MAX_FILE_N_PAGES;
  for (auto& [table_key, tracker] : eviction_tracker_map_) {
    auto& [eviction_alg, num_pages] = tracker;
    while (num_pages > max_pages_per_table_) {
      eraseChunk(eviction_alg->evictNextChunk(), tracker);
    }
  }
  global_file_mgr_->checkpoint();
  max_cached_bytes_ = limit;
}

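// Returns the keys of all cached chunks that match the given prefix.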
std::vector<ChunkKey> ForeignStorageCache::getCachedChunksForKeyPrefix(
    const ChunkKey& chunk_prefix) const {
  read_lock r_lock(chunks_mutex_);
  std::vector<ChunkKey> ret_vec;
  iterate_over_matching_prefix(
      [&ret_vec](auto chunk) { ret_vec.push_back(chunk); }, cached_chunks_, chunk_prefix);
  return ret_vec;
}

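// Hands out empty, device-backed buffers for the given (not yet cached) chunk keys so
// that they can be populated; each buffer is reset before being returned.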
std::map<ChunkKey, AbstractBuffer*> ForeignStorageCache::getChunkBuffersForCaching(
    const std::vector<ChunkKey>& chunk_keys) const {
  auto timer = DEBUG_TIMER(__func__);
  std::map<ChunkKey, AbstractBuffer*> chunk_buffer_map;
  read_lock lock(chunks_mutex_);
  for (const auto& chunk_key : chunk_keys) {
    CHECK(cached_chunks_.find(chunk_key) == cached_chunks_.end());
    CHECK(global_file_mgr_->isBufferOnDevice(chunk_key));
    chunk_buffer_map[chunk_key] = global_file_mgr_->getBuffer(chunk_key);
    CHECK(dynamic_cast<File_Namespace::FileBuffer*>(chunk_buffer_map[chunk_key]));
    CHECK_EQ(chunk_buffer_map[chunk_key]->pageCount(), static_cast<size_t>(0));

    // Clear all buffer metadata
    chunk_buffer_map[chunk_key]->resetToEmpty();
  }
  return chunk_buffer_map;
}

// Private functions. Locks should be acquired in the public interface before calling
// these functions.
// This function assumes the chunk has been erased from the eviction algorithm already.
void ForeignStorageCache::eraseChunk(const ChunkKey& chunk_key,
                                     TableEvictionTracker& tracker) {
  auto timer = DEBUG_TIMER(__func__);
  if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
    return;
  }
  File_Namespace::FileBuffer* file_buffer =
      static_cast<File_Namespace::FileBuffer*>(global_file_mgr_->getBuffer(chunk_key));
  tracker.num_pages_ -= file_buffer->freeChunkPages();
  cached_chunks_.erase(chunk_key);
}

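// Erases a chunk through a set iterator: removes it from its table's eviction queue,
// releases its pages from the tracker, and returns the next iterator.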
std::set<ChunkKey>::iterator ForeignStorageCache::eraseChunkByIterator(
    const std::set<ChunkKey>::iterator& chunk_it) {
  auto timer = DEBUG_TIMER(__func__);
  const ChunkKey table_key = get_table_key(*chunk_it);
  auto& [eviction_alg, num_pages] = eviction_tracker_map_.at(table_key);
  eviction_alg->removeChunk(*chunk_it);
  File_Namespace::FileBuffer* file_buffer =
      static_cast<File_Namespace::FileBuffer*>(global_file_mgr_->getBuffer(*chunk_it));
  num_pages -= file_buffer->freeChunkPages();
  return cached_chunks_.erase(chunk_it);
}

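// Debug helpers that dump cached chunk keys, cached metadata keys, and per-table
// eviction queues as human-readable strings.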
std::string ForeignStorageCache::dumpCachedChunkEntries() const {
  auto timer = DEBUG_TIMER(__func__);
  std::string ret_string = "Cached chunks:\n";
  for (const auto& chunk_key : cached_chunks_) {
    ret_string += " " + show_chunk(chunk_key) + "\n";
  }
  return ret_string;
}

std::string ForeignStorageCache::dumpCachedMetadataEntries() const {
  auto timer = DEBUG_TIMER(__func__);
  std::string ret_string = "Cached ChunkMetadata:\n";
  for (const auto& meta_key : cached_metadata_) {
    ret_string += " " + show_chunk(meta_key) + "\n";
  }
  return ret_string;
}

std::string ForeignStorageCache::dumpEvictionQueue() const {
  std::string ret;
  for (auto& [key, tracker] : eviction_tracker_map_) {
    auto& [alg, num_pages] = tracker;
    ret += "queue for table_key: " + show_chunk(key) + "\n" +
           ((LRUEvictionAlgorithm*)alg.get())->dumpEvictionQueue();
  }

  return ret;
}

void ForeignStorageCache::validatePath(const std::string& base_path) const {
  // Check if base_path already exists, and if not, create it.
  boost::filesystem::path path(base_path);
  if (boost::filesystem::exists(path)) {
    if (!boost::filesystem::is_directory(path)) {
      throw std::runtime_error{
          "cache path \"" + base_path +
          "\" is not a directory. Please specify a valid directory "
          "with --disk_cache_path=<path>, or use the default location."};
    }
  } else {  // data directory does not exist
    if (!boost::filesystem::create_directory(path)) {
      throw std::runtime_error{
          "could not create directory at cache path \"" + base_path +
          "\". Please specify a valid directory location "
          "with --disk_cache_path=<path> or use the default location."};
    }
  }
}

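// Returns the base path of the file manager that backs the given table prefix.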
std::string ForeignStorageCache::getCacheDirectoryForTablePrefix(
    const ChunkKey& table_prefix) const {
  CHECK(table_prefix.size() >= 2);
  auto fileMgr = dynamic_cast<File_Namespace::FileMgr*>(
      getGlobalFileMgr()->getFileMgr(table_prefix));
  CHECK(fileMgr);
  return fileMgr->getFileMgrBasePath();
}

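// Reserves pages for a chunk in its table's eviction tracker, evicting older chunks as
// needed. Returns false if the chunk can never fit within the per-table page limit.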
bool ForeignStorageCache::insertChunkIntoEvictionAlg(const ChunkKey& chunk_key,
                                                     const size_t chunk_size) {
  size_t page_size = global_file_mgr_->getDefaultPageSize();
  const ChunkKey table_key = get_table_key(chunk_key);
  auto&& tracker_it = eviction_tracker_map_.find(table_key);
  CHECK(tracker_it != eviction_tracker_map_.end());
  auto& tracker = tracker_it->second;
  // Number of pages per chunk, rounded up.
  size_t num_pages_for_chunk = (chunk_size + (page_size - 1)) / page_size;
  if (num_pages_for_chunk > max_pages_per_table_) {
    // Can't fit the chunk in the cache, so bail.
    return false;
  }
  while (tracker.num_pages_ + num_pages_for_chunk > max_pages_per_table_) {
    eraseChunk(tracker.eviction_alg_->evictNextChunk(), tracker);
  }

  tracker.eviction_alg_->touchChunk(chunk_key);
  cached_chunks_.emplace(chunk_key);
  tracker.num_pages_ += num_pages_for_chunk;
  return true;
}

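// Adds a default eviction tracker for the given table key if one does not already
// exist.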
void ForeignStorageCache::createTrackerMapEntryIfNoneExists(const ChunkKey& table_key) {
  CHECK(is_table_key(table_key));
  if (eviction_tracker_map_.find(table_key) == eviction_tracker_map_.end()) {
    eviction_tracker_map_.emplace(table_key, TableEvictionTracker{});
  }
}

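// Re-caches only the metadata entries whose fragment id is greater than or equal to
// frag_id.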
void ForeignStorageCache::cacheMetadataWithFragIdGreaterOrEqualTo(
    const ChunkMetadataVector& metadata_vec,
    const int frag_id) {
  // Only re-cache the last fragment and above.
  ChunkMetadataVector new_metadata_vec;
  for (const auto& chunk_metadata : metadata_vec) {
    if (chunk_metadata.first[CHUNK_KEY_FRAGMENT_IDX] >= frag_id) {
      new_metadata_vec.push_back(chunk_metadata);
    }
  }
  cacheMetadataVec(new_metadata_vec);
}

}  // namespace foreign_storage