OmniSciDB  4201147b46
CachingForeignStorageMgr.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "CachingForeignStorageMgr.h"

#include <boost/filesystem.hpp>

#include "Catalog/Catalog.h"
#include "Catalog/ForeignTable.h"
#include "ForeignTableSchema.h"
#ifdef ENABLE_IMPORT_PARQUET
#include "ParquetDataWrapper.h"
#endif
#include "Shared/distributed.h"

namespace foreign_storage {

namespace {
constexpr int64_t MAX_REFRESH_TIME_IN_SECONDS = 60 * 60;
}  // namespace

CachingForeignStorageMgr::CachingForeignStorageMgr(ForeignStorageCache* cache)
    : ForeignStorageMgr(), disk_cache_(cache) {
  CHECK(disk_cache_);
}

void CachingForeignStorageMgr::populateChunkBuffersSafely(
    ForeignDataWrapper& data_wrapper,
    ChunkToBufferMap& required_buffers,
    ChunkToBufferMap& optional_buffers) {
  CHECK_GT(required_buffers.size(), 0U) << "Must populate at least one buffer";
  try {
    ChunkSizeValidator chunk_size_validator(required_buffers.begin()->first);
    data_wrapper.populateChunkBuffers(required_buffers, optional_buffers);
    chunk_size_validator.validateChunkSizes(required_buffers);
    chunk_size_validator.validateChunkSizes(optional_buffers);
    updateFragmenterMetadata(required_buffers);
    updateFragmenterMetadata(optional_buffers);
  } catch (const std::runtime_error& error) {
    // Clear any partially loaded but failed chunks (there may be some
    // fully-loaded chunks as well, but they will be cleared conservatively
    // anyway).
    for (const auto& [chunk_key, buffer] : required_buffers) {
      if (auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
        file_buffer->freeChunkPages();
      }
    }
    for (const auto& [chunk_key, buffer] : optional_buffers) {
      if (auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
        file_buffer->freeChunkPages();
      }
    }

    throw ForeignStorageException(error.what());
  }
  // All required buffers should be from the same table.
  auto [db, tb] = get_table_prefix(required_buffers.begin()->first);
  disk_cache_->checkpoint(db, tb);
}
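
// Usage sketch (illustrative only; `cache`, `wrapper`, and the key sets are
// hypothetical): callers reserve cache buffers up front, then let this method
// handle validation, partial-failure cleanup, and metadata updates:
//
//   auto required = cache->getChunkBuffersForCaching(required_keys);
//   auto optional = cache->getChunkBuffersForCaching(optional_keys);
//   mgr.populateChunkBuffersSafely(*wrapper, required, optional);
//   // On failure, any partially written chunk pages have been freed and a
//   // ForeignStorageException rethrown.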

void CachingForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
                                           AbstractBuffer* destination_buffer,
                                           const size_t num_bytes) {
  ChunkSizeValidator chunk_size_validator(chunk_key);
  if (is_system_table_chunk_key(chunk_key)) {
    ForeignStorageMgr::fetchBuffer(chunk_key, destination_buffer, num_bytes);
    chunk_size_validator.validateChunkSize(destination_buffer);
    return;
  }
  CHECK(destination_buffer);
  CHECK(!destination_buffer->isDirty());

  auto buffer = disk_cache_->getCachedChunkIfExists(chunk_key);
  if (buffer) {
    chunk_size_validator.validateChunkSize(buffer);
    buffer->copyTo(destination_buffer, num_bytes);
    return;
  } else {
    auto required_size = getRequiredBuffersSize(chunk_key);
    if (required_size > maxFetchSize(chunk_key[CHUNK_KEY_DB_IDX])) {
      // If we don't have space in the cache, skip caching altogether.
      ForeignStorageMgr::fetchBuffer(chunk_key, destination_buffer, num_bytes);
      return;
    }

    auto column_keys = get_column_key_set(chunk_key);

    // Use hints to prefetch other chunks in the fragment into the cache.
    auto& data_wrapper = *getDataWrapper(chunk_key);
    auto optional_set = getOptionalChunkKeySet(
        chunk_key, column_keys, data_wrapper.getCachedParallelismLevel());

    // Remove any chunks that are already cached.
    // TODO(Misiu): Change to use std::erase_if when we get C++20.
    for (auto it = optional_set.begin(); it != optional_set.end();) {
      if (disk_cache_->getCachedChunkIfExists(*it) != nullptr) {
        it = optional_set.erase(it);
      } else {
        ++it;
      }
    }

    auto optional_buffers = disk_cache_->getChunkBuffersForCaching(optional_set);
    auto required_buffers = disk_cache_->getChunkBuffersForCaching(column_keys);
    CHECK(required_buffers.find(chunk_key) != required_buffers.end());
    populateChunkBuffersSafely(data_wrapper, required_buffers, optional_buffers);

    AbstractBuffer* buffer = required_buffers.at(chunk_key);
    CHECK(buffer);
    buffer->copyTo(destination_buffer, num_bytes);
  }
}
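
// The method above is a cache-aside read. A minimal sketch of the same
// control flow, with hypothetical names and validation/prefetch details
// elided:
//
//   if (auto* cached = cache.lookup(key)) {   // hit: serve from the cache
//     cached->copyTo(dest, n);
//   } else if (fits_in_cache(key)) {          // miss: populate cache, serve
//     auto buffers = cache.reserve(column_keys_for(key));
//     wrapper.populate(buffers);
//     buffers.at(key)->copyTo(dest, n);
//   } else {                                  // too large: bypass the cache
//     wrapper.fetch(key, dest, n);
//   }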

void CachingForeignStorageMgr::getChunkMetadataVecForKeyPrefix(
    ChunkMetadataVector& chunk_metadata,
    const ChunkKey& key_prefix) {
  if (is_system_table_chunk_key(key_prefix)) {
    ForeignStorageMgr::getChunkMetadataVecForKeyPrefix(chunk_metadata, key_prefix);
    return;
  }
  CHECK(has_table_prefix(key_prefix));
  // If the disk cache has any cached metadata for a prefix, then it is
  // guaranteed to have all metadata for that table, so we can return a complete
  // set. If it has no metadata, then the table may have no data, or it may
  // simply not be cached, so we need to go to storage to check.
  if (disk_cache_->hasCachedMetadataForKeyPrefix(key_prefix)) {
    disk_cache_->getCachedMetadataVecForKeyPrefix(chunk_metadata, key_prefix);

    // Assert all metadata in the cache is mapped to this leaf node in
    // distributed mode.
    if (is_shardable_key(key_prefix)) {
      for (auto& [key, meta] : chunk_metadata) {
        CHECK(fragment_maps_to_leaf(key)) << show_chunk(key);
      }
    }

    // If the data in the cache was restored from disk, then it is possible that
    // the wrapper does not exist yet. In this case the wrapper will be restored
    // from disk if possible.
    createDataWrapperIfNotExists(key_prefix);
    return;
  } else if (dist::is_distributed() &&
             disk_cache_->hasStoredDataWrapperMetadata(
                 key_prefix[CHUNK_KEY_DB_IDX], key_prefix[CHUNK_KEY_TABLE_IDX])) {
    // In distributed mode, it is possible to have all the chunk metadata
    // filtered out for this node after previously getting the chunk metadata
    // from the wrapper and caching the wrapper metadata. In this case, return
    // immediately and avoid doing a redundant metadata scan.
    return;
  }

  // If we have no cached data, then either the data was evicted, was never
  // populated, or the table's data is an empty set (no chunks). The first two
  // cases require repopulating the data wrapper, so just do it in all cases.
  auto table_key = get_table_key(key_prefix);
  eraseDataWrapper(table_key);
  createDataWrapperIfNotExists(table_key);

  getChunkMetadataVecFromDataWrapper(chunk_metadata, key_prefix);
  disk_cache_->cacheMetadataVec(chunk_metadata);
}
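
// For reference, key prefixes here are table-level {db_id, table_id}, while
// full chunk keys extend to column and fragment; varlen columns add a final
// 1 (data) or 2 (index) marker. Illustrative values:
//
//   ChunkKey table_prefix{1, 5};        // all chunks of table 5 in database 1
//   ChunkKey data_key{1, 5, 3, 0, 1};   // column 3, fragment 0, data buffer
//   ChunkKey index_key{1, 5, 3, 0, 2};  // column 3, fragment 0, index buffer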

void CachingForeignStorageMgr::getChunkMetadataVecFromDataWrapper(
    ChunkMetadataVector& chunk_metadata,
    const ChunkKey& chunk_key_prefix) {
  CHECK(has_table_prefix(chunk_key_prefix));
  auto [db_id, tb_id] = get_table_prefix(chunk_key_prefix);
  try {
    ForeignStorageMgr::getChunkMetadataVecForKeyPrefix(chunk_metadata, chunk_key_prefix);
  } catch (...) {
    clearTable({db_id, tb_id});
    throw;
  }
  // If the table was disabled, then we will have no wrapper to serialize.
  if (is_table_enabled_on_node(chunk_key_prefix)) {
    auto doc = getDataWrapper(chunk_key_prefix)->getSerializedDataWrapper();
    disk_cache_->storeDataWrapper(doc, db_id, tb_id);

    // If the wrapper populated buffers, we want that action to be checkpointed.
    disk_cache_->checkpoint(db_id, tb_id);
  }
}

void CachingForeignStorageMgr::refreshTable(const ChunkKey& table_key,
                                            const bool evict_cached_entries) {
  CHECK(is_table_key(table_key));
  ForeignStorageMgr::checkIfS3NeedsToBeEnabled(table_key);
  clearTempChunkBufferMapEntriesForTable(table_key);
  if (evict_cached_entries) {
    clearTable(table_key);
  } else {
    refreshTableInCache(table_key);
  }
}
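
// Usage sketch (illustrative): the two refresh modes map onto the flag as
// follows, for a hypothetical manager `mgr` and table {db_id, table_id}:
//
//   mgr.refreshTable({db_id, table_id}, /*evict_cached_entries=*/true);   // evict only
//   mgr.refreshTable({db_id, table_id}, /*evict_cached_entries=*/false);  // re-cache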

void CachingForeignStorageMgr::refreshTableInCache(const ChunkKey& table_key) {
  CHECK(is_table_key(table_key));

  // Preserve the list of which chunks were cached for the table so they can be
  // refreshed after the clear.
  std::vector<ChunkKey> old_chunk_keys =
      disk_cache_->getCachedChunksForKeyPrefix(table_key);

  // Assert all data in the cache is mapped to this leaf node in distributed mode.
  if (is_shardable_key(table_key)) {
    for (auto& key : old_chunk_keys) {
      CHECK(fragment_maps_to_leaf(key)) << show_chunk(key);
    }
  }

  auto append_mode = is_append_table_chunk_key(table_key);

  append_mode ? refreshAppendTableInCache(table_key, old_chunk_keys)
              : refreshNonAppendTableInCache(table_key, old_chunk_keys);
}
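
// Note the ordering above: the previously cached chunk keys are captured
// before any clearing occurs, so a non-append refresh can re-cache exactly
// the set of chunks that was hot before the refresh began.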

void CachingForeignStorageMgr::eraseDataWrapper(const ChunkKey& key) {
  CHECK(is_table_key(key));
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  // The wrapper may not have been created yet.
  if (data_wrapper_map_.find(key) != data_wrapper_map_.end()) {
    auto [db, tb] = get_table_prefix(key);
    // We need to erase any serialized version on disk so that we don't
    // accidentally recover it after deleting.
    boost::filesystem::remove_all(disk_cache_->getSerializedWrapperPath(db, tb));
    data_wrapper_map_.erase(key);
  }
}

void CachingForeignStorageMgr::clearTable(const ChunkKey& table_key) {
  disk_cache_->clearForTablePrefix(table_key);
  ForeignStorageMgr::eraseDataWrapper(table_key);
}

int CachingForeignStorageMgr::getHighestCachedFragId(const ChunkKey& table_key) {
  // Determine the last fragment ID.
  int last_frag_id = 0;
  if (disk_cache_->hasCachedMetadataForKeyPrefix(table_key)) {
    ChunkMetadataVector cached_metadata;
    disk_cache_->getCachedMetadataVecForKeyPrefix(cached_metadata, table_key);
    for (const auto& [key, metadata] : cached_metadata) {
      last_frag_id = std::max(last_frag_id, key[CHUNK_KEY_FRAGMENT_IDX]);
    }
  }
  return last_frag_id;
}
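
// Worked example (illustrative): if fragments 0..7 of a table are cached,
// this returns 7, and the append-mode refresh below re-fetches only chunk
// keys with fragment id >= 7; earlier fragments are assumed immutable in
// append mode and are left untouched.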

void CachingForeignStorageMgr::refreshAppendTableInCache(
    const ChunkKey& table_key,
    const std::vector<ChunkKey>& old_chunk_keys) {
  CHECK(is_table_key(table_key));
  int last_frag_id = getHighestCachedFragId(table_key);

  ChunkMetadataVector storage_metadata;
  getChunkMetadataVecFromDataWrapper(storage_metadata, table_key);
  try {
    disk_cache_->cacheMetadataVec(storage_metadata);
    refreshChunksInCacheByFragment(old_chunk_keys, last_frag_id);
  } catch (std::runtime_error& e) {
    throw PostEvictionRefreshException(e);
  }
}

void CachingForeignStorageMgr::refreshNonAppendTableInCache(
    const ChunkKey& table_key,
    const std::vector<ChunkKey>& old_chunk_keys) {
  CHECK(is_table_key(table_key));
  ChunkMetadataVector storage_metadata;
  clearTable(table_key);
  getChunkMetadataVecFromDataWrapper(storage_metadata, table_key);

  try {
    disk_cache_->cacheMetadataVec(storage_metadata);
    refreshChunksInCacheByFragment(old_chunk_keys, 0);
  } catch (std::runtime_error& e) {
    throw PostEvictionRefreshException(e);
  }
}

void CachingForeignStorageMgr::refreshChunksInCacheByFragment(
    const std::vector<ChunkKey>& old_chunk_keys,
    int start_frag_id) {
  int64_t total_time{0};
  auto fragment_refresh_start_time = std::chrono::high_resolution_clock::now();

  if (old_chunk_keys.empty()) {
    return;
  }
  // Iterate through previously cached chunks and re-cache them. Caching is
  // done one fragment at a time, for all applicable chunks in the fragment.
  ChunkToBufferMap optional_buffers;
  std::set<ChunkKey> chunk_keys_to_be_cached;
  auto fragment_id = old_chunk_keys[0][CHUNK_KEY_FRAGMENT_IDX];
  const ChunkKey table_key{get_table_key(old_chunk_keys[0])};
  std::set<ChunkKey> chunk_keys_in_fragment;
  for (const auto& chunk_key : old_chunk_keys) {
    CHECK(chunk_key[CHUNK_KEY_TABLE_IDX] == table_key[CHUNK_KEY_TABLE_IDX]);
    if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] < start_frag_id) {
      continue;
    }
    if (disk_cache_->isMetadataCached(chunk_key)) {
      if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] != fragment_id) {
        if (chunk_keys_in_fragment.size() > 0) {
          auto required_buffers =
              disk_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
          populateChunkBuffersSafely(
              *getDataWrapper(table_key), required_buffers, optional_buffers);
          chunk_keys_in_fragment.clear();
        }
        // At this point, cache buffers for refreshable chunks in the last
        // fragment have been populated. Exit if the max refresh time has been
        // exceeded; otherwise, move on to the next fragment.
        auto current_time = std::chrono::high_resolution_clock::now();
        total_time += std::chrono::duration_cast<std::chrono::seconds>(
                          current_time - fragment_refresh_start_time)
                          .count();
        if (total_time >= MAX_REFRESH_TIME_IN_SECONDS) {
          LOG(WARNING) << "Refresh time exceeded for table key: { " << table_key[0]
                       << ", " << table_key[1] << " } after fragment id: " << fragment_id;
          break;
        } else {
          fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
        }
        fragment_id = chunk_key[CHUNK_KEY_FRAGMENT_IDX];
      }
      // The key may have been cached during the scan.
      if (disk_cache_->getCachedChunkIfExists(chunk_key) == nullptr) {
        if (is_varlen_key(chunk_key)) {
          CHECK(is_varlen_data_key(chunk_key));
          ChunkKey index_chunk_key{chunk_key[CHUNK_KEY_DB_IDX],
                                   chunk_key[CHUNK_KEY_TABLE_IDX],
                                   chunk_key[CHUNK_KEY_COLUMN_IDX],
                                   chunk_key[CHUNK_KEY_FRAGMENT_IDX],
                                   2};
          chunk_keys_in_fragment.emplace(index_chunk_key);
          chunk_keys_to_be_cached.emplace(index_chunk_key);
        }
        chunk_keys_in_fragment.emplace(chunk_key);
        chunk_keys_to_be_cached.emplace(chunk_key);
      }
    }
  }
  if (chunk_keys_in_fragment.size() > 0) {
    auto required_buffers =
        disk_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
    populateChunkBuffersSafely(
        *getDataWrapper(table_key), required_buffers, optional_buffers);
  }
}
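
// Illustrative restatement of the loop's time budget, with hypothetical
// helper names: refresh proceeds fragment by fragment and stops early once
// cumulative wall-clock time crosses MAX_REFRESH_TIME_IN_SECONDS (one hour):
//
//   int64_t spent_s = 0;
//   for (const auto& frag : fragments_to_refresh) {
//     auto t0 = std::chrono::high_resolution_clock::now();
//     recache_fragment(frag);
//     spent_s += std::chrono::duration_cast<std::chrono::seconds>(
//                    std::chrono::high_resolution_clock::now() - t0)
//                    .count();
//     if (spent_s >= MAX_REFRESH_TIME_IN_SECONDS) {
//       break;  // warn and leave remaining fragments for a later refresh
//     }
//   }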

bool CachingForeignStorageMgr::createDataWrapperIfNotExists(const ChunkKey& chunk_key) {
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  ChunkKey table_key = get_table_key(chunk_key);
  auto data_wrapper_it = data_wrapper_map_.find(table_key);
  if (data_wrapper_it != data_wrapper_map_.end()) {
    return false;
  }
  auto [db, tb] = get_table_prefix(chunk_key);
  createDataWrapperUnlocked(db, tb);
  auto wrapper_file = disk_cache_->getSerializedWrapperPath(db, tb);
  if (boost::filesystem::exists(wrapper_file)) {
    ChunkMetadataVector chunk_metadata;
    disk_cache_->getCachedMetadataVecForKeyPrefix(chunk_metadata, table_key);
    data_wrapper_map_.at(table_key)->restoreDataWrapperInternals(
        disk_cache_->getSerializedWrapperPath(db, tb), chunk_metadata);
  }
  return true;
}
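
// Restore-path note: after a restart, cached data can outlive the in-memory
// wrapper. The serialized wrapper file (if present) plus the cached metadata
// are sufficient to rebuild the wrapper's internal state, which is why the
// existence check above gates restoreDataWrapperInternals().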

void CachingForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
  disk_cache_->clearForTablePrefix({db_id, table_id});
  ForeignStorageMgr::removeTableRelatedDS(db_id, table_id);
}

size_t CachingForeignStorageMgr::maxFetchSize(int32_t db_id) const {
  return disk_cache_->getMaxChunkDataSize(db_id);
}

bool CachingForeignStorageMgr::hasMaxFetchSize() const {
  return true;
}

size_t CachingForeignStorageMgr::getRequiredBuffersSize(const ChunkKey& chunk_key) const {
  auto key_set = get_column_key_set(chunk_key);
  size_t total_size = 0U;
  for (const auto& key : key_set) {
    total_size += getBufferSize(key);
  }
  return total_size;
}

std::set<ChunkKey> CachingForeignStorageMgr::getOptionalKeysWithinSizeLimit(
    const ChunkKey& chunk_key,
    const std::set<ChunkKey, decltype(set_comp)*>& same_fragment_keys,
    const std::set<ChunkKey, decltype(set_comp)*>& diff_fragment_keys) const {
  std::set<ChunkKey> optional_keys;
  auto total_chunk_size = getRequiredBuffersSize(chunk_key);
  auto max_size = maxFetchSize(chunk_key[CHUNK_KEY_DB_IDX]);
  // Add keys to the list of optional keys, starting with the same fragment. If
  // we run out of space, then exit early with what we have added so far.
  for (const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
    for (const auto& key : keys) {
      auto column_keys = get_column_key_set(key);
      for (const auto& column_key : column_keys) {
        total_chunk_size += getBufferSize(column_key);
      }
      // Exit early if we exceed the size limit.
      if (total_chunk_size > max_size) {
        return optional_keys;
      }
      for (const auto& column_key : column_keys) {
        optional_keys.emplace(column_key);
      }
    }
  }
  return optional_keys;
}
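
// Worked example (illustrative sizes): with max_size = 100MB and a required
// chunk totaling 40MB, candidate column groups of 30MB and then 40MB are
// considered in order. 40 + 30 = 70MB fits, so the first group's keys are
// added; 70 + 40 = 110MB exceeds the limit, so the function returns early
// with only the first group. Groups are added whole or not at all.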

size_t CachingForeignStorageMgr::getBufferSize(const ChunkKey& key) const {
  size_t num_bytes = 0;
  ChunkMetadataVector meta;
  disk_cache_->getCachedMetadataVecForKeyPrefix(meta, get_fragment_key(key));
  CHECK_EQ(meta.size(), 1U) << show_chunk(key);
  auto metadata = meta.begin()->second;

  if (is_varlen_key(key)) {
    if (is_varlen_data_key(key)) {
      num_bytes = get_max_chunk_size(key);
    } else {
      num_bytes = (metadata->sqlType.is_string())
                      ? sizeof(StringOffsetT) * (metadata->numElements + 1)
                      : sizeof(ArrayOffsetT) * (metadata->numElements + 1);
    }
  } else {
    num_bytes = metadata->numBytes;
  }
  return num_bytes;
}
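
// Worked example of the sizing rules above (illustrative): a string-column
// chunk with numElements = 100 gets an index buffer of
// sizeof(StringOffsetT) * 101 = 404 bytes (offsets bracket each value). Its
// data buffer is sized pessimistically at the chunk's maximum size, since
// the true byte count of variable-length data is not derivable from the
// metadata alone. Fixed-width chunks simply use metadata->numBytes.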

}  // namespace foreign_storage