OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignStorageMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignStorageMgr.h"
18 
20 #include "Catalog/Catalog.h"
24 #include "ForeignTableSchema.h"
25 #include "Shared/distributed.h"
26 
27 extern bool g_enable_fsi;
28 extern bool g_enable_s3_fsi;
29 
30 namespace {
31 void filter_metadata_by_leaf(ChunkMetadataVector& meta_vec, const ChunkKey& key_prefix) {
32  if (!foreign_storage::is_shardable_key(key_prefix)) {
33  return;
34  }
35  for (auto it = meta_vec.begin(); it != meta_vec.end();) {
36  it = foreign_storage::fragment_maps_to_leaf(it->first) ? std::next(it)
37  : meta_vec.erase(it);
38  }
39 }
40 } // namespace
41 
42 namespace foreign_storage {
43 ForeignStorageMgr::ForeignStorageMgr() : AbstractBufferMgr(0), data_wrapper_map_({}) {}
44 
46  const size_t num_bytes) {
47  UNREACHABLE();
48  return nullptr; // Added to avoid "no return statement" compiler warning
49 }
50 
52  CHECK(has_table_prefix(chunk_key));
53  auto catalog =
55  CHECK(catalog);
56  auto foreign_table = catalog->getForeignTable(chunk_key[CHUNK_KEY_TABLE_IDX]);
57  auto storage_type_entry = foreign_table->foreign_server->options.find(
59 
60  if (storage_type_entry == foreign_table->foreign_server->options.end()) {
61  // Some FSI servers such as ODBC do not have a storage_type
62  return;
63  }
64  bool is_s3_storage_type =
65  (storage_type_entry->second == AbstractFileStorageDataWrapper::S3_STORAGE_TYPE);
66  if (is_s3_storage_type) {
67  throw ForeignStorageException{
68  "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
69  "currently disabled."};
70  }
71 }
72 
74  int table_id = chunk_key[CHUNK_KEY_TABLE_IDX];
75  column_id_ = chunk_key[CHUNK_KEY_COLUMN_IDX];
76  catalog_ =
77  Catalog_Namespace::SysCatalog::instance().getCatalog(chunk_key[CHUNK_KEY_DB_IDX]);
78  column_ = catalog_->getMetadataForColumn(table_id, column_id_);
79  foreign_table_ = catalog_->getForeignTable(table_id);
81 }
82 
83 void ChunkSizeValidator::validateChunkSize(const AbstractBuffer* buffer) const {
84  CHECK(buffer);
85  int64_t actual_chunk_size = buffer->size();
86  if (actual_chunk_size > max_chunk_size_) {
87  throwChunkSizeViolatedError(actual_chunk_size);
88  }
89 }
90 
92  for (const auto& [chunk_key, buffer] : buffers) {
93  int64_t actual_chunk_size = buffer->size();
94  if (actual_chunk_size > max_chunk_size_) {
95  throwChunkSizeViolatedError(actual_chunk_size, chunk_key[CHUNK_KEY_COLUMN_IDX]);
96  }
97  }
98 }
99 
100 void ChunkSizeValidator::throwChunkSizeViolatedError(const int64_t actual_chunk_size,
101  const int column_id) const {
102  std::string column_name = column_->columnName;
103  if (column_id > 0) {
104  column_name =
105  catalog_->getMetadataForColumn(foreign_table_->tableId, column_id)->columnName;
106  }
107  std::stringstream error_stream;
108  error_stream << "Chunk populated by data wrapper which is " << actual_chunk_size
109  << " bytes exceeds maximum byte size limit of " << max_chunk_size_ << "."
110  << " Foreign table: " << foreign_table_->tableName
111  << ", column name : " << column_name;
112  throw ForeignStorageException(error_stream.str());
113 }
114 
115 void ForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
116  AbstractBuffer* destination_buffer,
117  const size_t num_bytes) {
118  ChunkSizeValidator chunk_size_validator(chunk_key);
119 
120  checkIfS3NeedsToBeEnabled(chunk_key);
121  CHECK(destination_buffer);
122  CHECK(!destination_buffer->isDirty());
123  // Use a temp buffer if we have no cache buffers and have one mapped for this chunk.
124  if (fetchBufferIfTempBufferMapEntryExists(chunk_key, destination_buffer, num_bytes)) {
125  chunk_size_validator.validateChunkSize(destination_buffer);
126  return;
127  }
128 
129  { // Clear any temp buffers if we've moved on to a new fragment
130  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
131  if (temp_chunk_buffer_map_.size() > 0 &&
133  chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
135  }
136  }
137 
138  auto column_keys = get_column_key_set(chunk_key);
139 
140  // Use hints to prefetch other chunks in fragment
141  ChunkToBufferMap optional_buffers;
142 
143  // Use hints to prefetch other chunks in fragment into cache
144  auto optional_keys = getOptionalChunkKeySetAndNormalizeCache(
145  chunk_key, column_keys, getDataWrapper(chunk_key)->getNonCachedParallelismLevel());
146  if (optional_keys.size()) {
147  optional_buffers = allocateTempBuffersForChunks(optional_keys);
148  }
149 
150  // Remove the original key as it will be replaced by the destination_buffer.
151  column_keys.erase(chunk_key);
152  auto required_buffers = allocateTempBuffersForChunks(column_keys);
153  required_buffers[chunk_key] = destination_buffer;
154  // populate will write directly to destination_buffer so no need to copy.
155  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
156  chunk_size_validator.validateChunkSizes(required_buffers);
157  chunk_size_validator.validateChunkSizes(optional_buffers);
158  updateFragmenterMetadata(required_buffers);
159  updateFragmenterMetadata(optional_buffers);
160 }
161 
163  for (const auto& [key, buffer] : buffers) {
164  auto catalog =
165  Catalog_Namespace::SysCatalog::instance().getCatalog(key[CHUNK_KEY_DB_IDX]);
166  auto column = catalog->getMetadataForColumn(key[CHUNK_KEY_TABLE_IDX],
167  key[CHUNK_KEY_COLUMN_IDX]);
168  if (column->columnType.is_varlen_indeed() &&
169  key[CHUNK_KEY_VARLEN_IDX] == 2) { // skip over index buffers
170  continue;
171  }
172  auto foreign_table = catalog->getForeignTable(key[CHUNK_KEY_TABLE_IDX]);
173  auto fragmenter = foreign_table->fragmenter;
174  if (!fragmenter) {
175  continue;
176  }
177  auto encoder = buffer->getEncoder();
178  CHECK(encoder);
179  auto chunk_metadata = std::make_shared<ChunkMetadata>();
180  encoder->getMetadata(chunk_metadata);
181  fragmenter->updateColumnChunkMetadata(
182  column, key[CHUNK_KEY_FRAGMENT_IDX], chunk_metadata);
183  }
184 }
185 
187  const ChunkKey& chunk_key,
188  AbstractBuffer* destination_buffer,
189  size_t num_bytes) {
190  AbstractBuffer* buffer{nullptr};
191  {
192  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
193  if (temp_chunk_buffer_map_.find(chunk_key) == temp_chunk_buffer_map_.end()) {
194  return false;
195  }
196  buffer = temp_chunk_buffer_map_[chunk_key].get();
197  }
198  // For the index key, calls with size 0 get 1 added as
199  // empty index buffers start with one entry
200  // Set to 0 here to copy entire buffer
201  if (is_varlen_index_key(chunk_key) && (num_bytes == sizeof(StringOffsetT))) {
202  num_bytes = 0;
203  }
204  CHECK(buffer);
205  buffer->copyTo(destination_buffer, num_bytes);
206  {
207  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
208  temp_chunk_buffer_map_.erase(chunk_key);
209  }
210  return true;
211 }
212 
214  ChunkMetadataVector& chunk_metadata,
215  const ChunkKey& key_prefix) {
216  if (!g_enable_fsi) {
217  throw ForeignStorageException{
218  "Query cannot be executed for foreign table because FSI is currently disabled."};
219  }
220  CHECK(is_table_key(key_prefix));
221 
222  if (!is_table_enabled_on_node(key_prefix)) {
223  // If the table is not enabled for this node then the request should do nothing.
224  return;
225  }
226 
227  checkIfS3NeedsToBeEnabled(key_prefix);
228  createDataWrapperIfNotExists(key_prefix);
229 
230  try {
231  getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
232  filter_metadata_by_leaf(chunk_metadata, key_prefix);
233  } catch (...) {
234  eraseDataWrapper(key_prefix);
235  throw;
236  }
237 }
238 
239 void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
240  const ChunkKey table_key{db_id, table_id};
241  {
242  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
243  if (auto mock_it = mocked_wrapper_map_.find(table_key);
244  mock_it != mocked_wrapper_map_.end()) {
245  mock_it->second->unsetParentWrapper();
246  }
247  data_wrapper_map_.erase(table_key);
248  }
250 }
251 
253  return FOREIGN_STORAGE_MGR;
254 }
255 
257  return ToString(FOREIGN_STORAGE_MGR);
258 }
259 
260 bool ForeignStorageMgr::hasDataWrapperForChunk(const ChunkKey& chunk_key) const {
261  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
262  CHECK(has_table_prefix(chunk_key));
263  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
264  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
265 }
266 
267 std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
268  const ChunkKey& chunk_key) const {
269  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
270  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
271  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
272  return data_wrapper_map_.at(table_key);
273 }
274 
276  const ChunkKey& table_key,
277  std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
278  CHECK(is_table_key(table_key));
279  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
280  if (auto wrapper_iter = data_wrapper_map_.find(table_key);
281  wrapper_iter != data_wrapper_map_.end()) {
282  data_wrapper->setParentWrapper(wrapper_iter->second);
283  data_wrapper_map_[table_key] = data_wrapper;
284  }
285  // If a wrapper does not yet exist, then delay setting the mock until we actually
286  // create the wrapper. Preserve mock wrappers separately so they can persist the parent
287  // being re-created.
288  mocked_wrapper_map_[table_key] = data_wrapper;
289 }
290 
291 void ForeignStorageMgr::createDataWrapperUnlocked(int32_t db_id, int32_t tb_id) {
292  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
293  CHECK(catalog);
294  auto foreign_table = catalog->getForeignTable(tb_id);
295  ChunkKey table_key{db_id, tb_id};
297  foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
298 
299  // If we are testing with mocks, then we want to re-wrap new wrappers with mocks if a
300  // table was given a mock wrapper earlier and destroyed.
301  if (auto mock_it = mocked_wrapper_map_.find(table_key);
302  mock_it != mocked_wrapper_map_.end()) {
303  mock_it->second->setParentWrapper(data_wrapper_map_.at(table_key));
304  data_wrapper_map_[table_key] = mock_it->second;
305  }
306 }
307 
309  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
310  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
311  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
312  auto [db_id, tb_id] = get_table_prefix(chunk_key);
313  createDataWrapperUnlocked(db_id, tb_id);
314  return true;
315  }
316  return false;
317 }
318 
319 void ForeignStorageMgr::refreshTable(const ChunkKey& table_key,
320  const bool evict_cached_entries) {
321  auto catalog =
322  Catalog_Namespace::SysCatalog::instance().getCatalog(table_key[CHUNK_KEY_DB_IDX]);
323  CHECK(catalog);
324  // Clear datawrapper unless table is non-append and evict is false
325  if (evict_cached_entries ||
326  !catalog->getForeignTable(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode()) {
327  eraseDataWrapper(table_key);
328  }
329 }
330 
331 void ForeignStorageMgr::eraseDataWrapper(const ChunkKey& table_key) {
332  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
333  // May not be created yet
334  if (data_wrapper_map_.find(table_key) != data_wrapper_map_.end()) {
335  data_wrapper_map_.erase(table_key);
336  }
337 }
338 
340  const ChunkKey& table_key) {
341  CHECK(is_table_key(table_key));
342  auto start_it = temp_chunk_buffer_map_.lower_bound(table_key);
343  ChunkKey upper_bound_prefix{table_key[CHUNK_KEY_DB_IDX],
344  table_key[CHUNK_KEY_TABLE_IDX],
345  std::numeric_limits<int>::max()};
346  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
347  temp_chunk_buffer_map_.erase(start_it, end_it);
348 }
349 
351  const ChunkKey& table_key) {
352  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
354 }
355 
356 bool ForeignStorageMgr::isDatawrapperRestored(const ChunkKey& chunk_key) {
357  if (!hasDataWrapperForChunk(chunk_key)) {
358  return false;
359  }
360  return getDataWrapper(chunk_key)->isRestored();
361 }
362 
363 void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
364  UNREACHABLE();
365 }
366 
367 void ForeignStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
368  const bool purge) {
369  UNREACHABLE();
370 }
371 
372 bool ForeignStorageMgr::isBufferOnDevice(const ChunkKey& chunk_key) {
373  UNREACHABLE();
374  return false; // Added to avoid "no return statement" compiler warning
375 }
376 
378  UNREACHABLE();
379  return 0; // Added to avoid "no return statement" compiler warning
380 }
381 
383  const size_t page_size,
384  const size_t initial_size) {
385  UNREACHABLE();
386  return nullptr; // Added to avoid "no return statement" compiler warning
387 }
388 
390  AbstractBuffer* source_buffer,
391  const size_t num_bytes) {
392  UNREACHABLE();
393  return nullptr; // Added to avoid "no return statement" compiler warning
394 }
395 
396 std::string ForeignStorageMgr::printSlabs() {
397  UNREACHABLE();
398  return {}; // Added to avoid "no return statement" compiler warning
399 }
400 
402  UNREACHABLE();
403  return 0; // Added to avoid "no return statement" compiler warning
404 }
405 
407  UNREACHABLE();
408  return 0; // Added to avoid "no return statement" compiler warning
409 }
410 
412  UNREACHABLE();
413  return 0; // Added to avoid "no return statement" compiler warning
414 }
415 
417  UNREACHABLE();
418  return false; // Added to avoid "no return statement" compiler warning
419 }
420 
422  UNREACHABLE();
423 }
424 
425 void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
426  UNREACHABLE();
427 }
428 
429 AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
430  UNREACHABLE();
431  return nullptr; // Added to avoid "no return statement" compiler warning
432 }
433 
435  UNREACHABLE();
436 }
437 
438 size_t get_max_chunk_size(const ChunkKey& key) {
439  auto [db_id, table_id] = get_table_prefix(key);
440  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
441  CHECK(catalog);
442  return catalog->getForeignTable(table_id)->maxChunkSize;
443 }
444 
445 std::set<ChunkKey> get_column_key_set(const ChunkKey& destination_chunk_key) {
446  std::set<ChunkKey> chunk_keys;
447  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
448  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
449  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
450  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
451  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
452  CHECK(catalog);
453  auto foreign_table = catalog->getForeignTable(table_id);
454 
455  ForeignTableSchema schema{db_id, foreign_table};
456  auto logical_column = schema.getLogicalColumn(destination_column_id);
457  auto logical_column_id = logical_column->columnId;
458 
459  for (auto column_id = logical_column_id;
460  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
461  column_id++) {
462  auto column = schema.getColumnDescriptor(column_id);
463  if (column->columnType.is_varlen_indeed()) {
464  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
465  chunk_keys.emplace(data_chunk_key);
466 
467  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
468  chunk_keys.emplace(index_chunk_key);
469  } else {
470  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
471  chunk_keys.emplace(data_chunk_key);
472  }
473  }
474  return chunk_keys;
475 }
476 
477 std::vector<ChunkKey> get_column_key_vec(const ChunkKey& destination_chunk_key) {
478  std::vector<ChunkKey> chunk_keys;
479  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
480  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
481  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
482  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
483  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
484  CHECK(catalog);
485  auto foreign_table = catalog->getForeignTable(table_id);
486 
487  ForeignTableSchema schema{db_id, foreign_table};
488  auto logical_column = schema.getLogicalColumn(destination_column_id);
489  auto logical_column_id = logical_column->columnId;
490 
491  for (auto column_id = logical_column_id;
492  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
493  column_id++) {
494  auto column = schema.getColumnDescriptor(column_id);
495  if (column->columnType.is_varlen_indeed()) {
496  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
497  chunk_keys.emplace_back(data_chunk_key);
498 
499  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
500  chunk_keys.emplace_back(index_chunk_key);
501  } else {
502  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
503  chunk_keys.emplace_back(data_chunk_key);
504  }
505  }
506  return chunk_keys;
507 }
508 
509 // Defines the "<" operator to use as a comparator.
510 // This is similar to comparing chunks normally, but we want to give fragments a higher
511 // priority than columns so that if we have to exit early we prioritize same fragment
512 // fetching.
513 bool set_comp(const ChunkKey& left, const ChunkKey& right) {
514  CHECK_GE(left.size(), 4ULL);
515  CHECK_GE(right.size(), 4ULL);
516  if ((left[CHUNK_KEY_DB_IDX] < right[CHUNK_KEY_DB_IDX]) ||
517  (left[CHUNK_KEY_TABLE_IDX] < right[CHUNK_KEY_TABLE_IDX]) ||
519  (left[CHUNK_KEY_COLUMN_IDX] < right[CHUNK_KEY_COLUMN_IDX])) {
520  return true;
521  }
522  if (left.size() < right.size()) {
523  return true;
524  }
525  if (is_varlen_key(left) && is_varlen_key(right) &&
527  return true;
528  }
529  return false;
530 }
531 
532 bool contains_fragment_key(const std::set<ChunkKey>& key_set,
533  const ChunkKey& target_key) {
534  for (const auto& key : key_set) {
535  if (get_fragment_key(target_key) == get_fragment_key(key)) {
536  return true;
537  }
538  }
539  return false;
540 }
541 
543  const std::set<ChunkKey>& chunk_keys) {
544  ChunkToBufferMap chunk_buffer_map;
545  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
546  for (const auto& chunk_key : chunk_keys) {
547  temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
548  chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
549  chunk_buffer_map[chunk_key]->resetToEmpty();
550  }
551  return chunk_buffer_map;
552 }
553 
555  const std::map<ChunkKey, std::set<ParallelismHint>>& hints_per_table) {
556  std::unique_lock data_wrapper_lock(parallelism_hints_mutex_);
557  parallelism_hints_per_table_ = hints_per_table;
558 }
559 
560 std::pair<std::set<ChunkKey, decltype(set_comp)*>,
561  std::set<ChunkKey, decltype(set_comp)*>>
563  const ChunkKey& chunk_key,
564  const std::set<ChunkKey>& required_chunk_keys,
565  const ForeignDataWrapper::ParallelismLevel parallelism_level) const {
566  std::shared_lock data_wrapper_lock(parallelism_hints_mutex_);
567  auto same_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(set_comp);
568  auto diff_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(set_comp);
569 
570  auto table_hints = parallelism_hints_per_table_.find(get_table_key(chunk_key));
571  if (table_hints == parallelism_hints_per_table_.end()) {
572  return {{}, {}};
573  }
574  for (const auto& hint : table_hints->second) {
575  const auto& [column_id, fragment_id] = hint;
576  auto optional_chunk_key = get_table_key(chunk_key);
577  optional_chunk_key.push_back(column_id);
578  if (parallelism_level == ForeignDataWrapper::INTRA_FRAGMENT) {
579  optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
580  } else if (parallelism_level == ForeignDataWrapper::INTER_FRAGMENT) {
581  optional_chunk_key.push_back(fragment_id);
582  } else {
583  UNREACHABLE() << "Unknown parallelism level.";
584  }
585 
586  CHECK(!key_does_not_shard_to_leaf(optional_chunk_key));
587 
588  if (!contains_fragment_key(required_chunk_keys, optional_chunk_key)) {
589  // Do not insert an optional key if it is already a required key.
590  if (optional_chunk_key[CHUNK_KEY_FRAGMENT_IDX] ==
591  chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
592  same_fragment_keys.emplace(optional_chunk_key);
593  } else {
594  diff_fragment_keys.emplace(optional_chunk_key);
595  }
596  }
597  }
598  return {same_fragment_keys, diff_fragment_keys};
599 }
600 
602  const ChunkKey& chunk_key,
603  const std::set<ChunkKey, decltype(set_comp)*>& same_fragment_keys,
604  const std::set<ChunkKey, decltype(set_comp)*>& diff_fragment_keys) const {
605  std::set<ChunkKey> optional_keys;
606  for (const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
607  for (auto key : keys) {
608  auto column_keys = get_column_key_set(key);
609  for (auto column_key : column_keys) {
610  optional_keys.emplace(column_key);
611  }
612  }
613  }
614  return optional_keys;
615 }
616 
618  const ChunkKey& chunk_key,
619  const std::set<ChunkKey>& required_chunk_keys,
620  const ForeignDataWrapper::ParallelismLevel parallelism_level) {
621  if (parallelism_level == ForeignDataWrapper::NONE) {
622  return {};
623  }
624 
625  auto [same_fragment_keys, diff_fragment_keys] =
626  getPrefetchSets(chunk_key, required_chunk_keys, parallelism_level);
627 
628  auto optional_keys =
629  getOptionalKeysWithinSizeLimit(chunk_key, same_fragment_keys, diff_fragment_keys);
630 
631  std::set<ChunkKey> optional_keys_to_delete;
632  if (!optional_keys.empty()) {
633  for (const auto& key : optional_keys) {
634  if (!shared::contains(optional_keys_to_delete, key)) {
635  auto key_set = get_column_key_set(key);
636  auto all_keys_cached =
637  std::all_of(key_set.begin(), key_set.end(), [this](const ChunkKey& key) {
638  return isChunkCached(key);
639  });
640  // Avoid cases where the optional_keys set or cache only has a subset of the
641  // column key set.
642  if (all_keys_cached) {
643  optional_keys_to_delete.insert(key_set.begin(), key_set.end());
644  } else {
645  evictChunkFromCache(key);
646  }
647  }
648  }
649  }
650  for (const auto& key : optional_keys_to_delete) {
651  optional_keys.erase(key);
652  }
653  return optional_keys;
654 }
655 
656 size_t ForeignStorageMgr::maxFetchSize(int32_t db_id) const {
657  return 0;
658 }
659 
661  return false;
662 }
663 
664 bool ForeignStorageMgr::isChunkCached(const ChunkKey& chunk_key) const {
665  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
666  return temp_chunk_buffer_map_.find(chunk_key) != temp_chunk_buffer_map_.end();
667 }
668 
669 void ForeignStorageMgr::evictChunkFromCache(const ChunkKey& chunk_key) {
670  std::unique_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
671  auto it = temp_chunk_buffer_map_.find(chunk_key);
672  if (it != temp_chunk_buffer_map_.end()) {
673  temp_chunk_buffer_map_.erase(it);
674  }
675 }
676 
677 // Determine if a wrapper is enabled on the current distributed node.
678 bool is_table_enabled_on_node(const ChunkKey& chunk_key) {
679  CHECK(is_table_key(chunk_key));
680 
681  // Replicated tables, system tables, and non-distributed tables are on all nodes by
682  // default. Leaf nodes are on, but will filter their results later by their node index.
684  is_replicated_table_chunk_key(chunk_key) || is_system_table_chunk_key(chunk_key)) {
685  return true;
686  }
687 
688  // If we aren't a leaf node then we are the aggregator, and the aggregator should not
689  // have sharded data.
690  return false;
691 }
692 } // namespace foreign_storage
std::lock_guard< T > lock_guard
bool set_comp(const ChunkKey &left, const ChunkKey &right)
void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey &table_key)
bool contains(const T &container, const U &element)
Definition: misc.h:195
std::vector< int > ChunkKey
Definition: types.h:36
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
AbstractBuffer * alloc(const size_t num_bytes) override
virtual std::set< ChunkKey > getOptionalKeysWithinSizeLimit(const ChunkKey &chunk_key, const std::set< ChunkKey, decltype(set_comp)* > &same_fragment_keys, const std::set< ChunkKey, decltype(set_comp)* > &diff_fragment_keys) const
std::string tableName
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_system_table_chunk_key(const ChunkKey &chunk_key)
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
virtual bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
std::set< ChunkKey > getOptionalChunkKeySetAndNormalizeCache(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level)
void filter_metadata_by_leaf(ChunkMetadataVector &meta_vec, const ChunkKey &key_prefix)
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key) const
void validateChunkSize(const AbstractBuffer *buffer) const
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
ChunkSizeValidator(const ChunkKey &chunk_key)
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
#define UNREACHABLE()
Definition: Logger.h:338
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
#define CHECK_GE(x, y)
Definition: Logger.h:306
void free(AbstractBuffer *buffer) override
std::shared_ptr< Catalog_Namespace::Catalog > catalog_
std::set< ChunkKey > get_column_key_set(const ChunkKey &destination_chunk_key)
const ColumnDescriptor * column_
void removeTableRelatedDS(const int db_id, const int table_id) override
virtual bool isChunkCached(const ChunkKey &chunk_key) const
bool is_replicated_table_chunk_key(const ChunkKey &chunk_key)
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:57
std::pair< std::set< ChunkKey, decltype(set_comp)* >, std::set< ChunkKey, decltype(set_comp)* > > getPrefetchSets(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level) const
int32_t StringOffsetT
Definition: sqltypes.h:1493
bool is_table_enabled_on_node(const ChunkKey &key)
std::shared_lock< T > shared_lock
This file contains the class specification and related data structures for Catalog.
void updateFragmenterMetadata(const ChunkToBufferMap &) const
bool key_does_not_shard_to_leaf(const ChunkKey &key)
static SysCatalog & instance()
Definition: SysCatalog.h:343
bool g_enable_s3_fsi
Definition: Catalog.cpp:97
bool contains_fragment_key(const std::set< ChunkKey > &key_set, const ChunkKey &target_key)
bool is_varlen_index_key(const ChunkKey &key)
Definition: types.h:79
bool hasDataWrapperForChunk(const ChunkKey &chunk_key) const
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
std::map< ChunkKey, std::set< ParallelismHint > > parallelism_hints_per_table_
std::unique_lock< T > unique_lock
bool is_leaf_node()
Definition: distributed.cpp:29
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool has_table_prefix(const ChunkKey &key)
Definition: types.h:48
An AbstractBuffer is a unit of data management for a data manager.
std::string getStringMgrType() override
bool fragment_maps_to_leaf(const ChunkKey &key)
bool is_shardable_key(const ChunkKey &key)
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_max_chunk_size(const ChunkKey &key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
ChunkToBufferMap allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:42
virtual void eraseDataWrapper(const ChunkKey &table_key)
std::map< ChunkKey, std::shared_ptr< MockForeignDataWrapper > > mocked_wrapper_map_
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:62
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
void setParallelismHints(const std::map< ChunkKey, std::set< ParallelismHint >> &hints_per_table)
void validateChunkSizes(const ChunkToBufferMap &buffers) const
void createDataWrapperUnlocked(int32_t db, int32_t tb)
#define CHECK(condition)
Definition: Logger.h:291
virtual bool hasMaxFetchSize() const
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
void throwChunkSizeViolatedError(const int64_t actual_chunk_size, const int column_id=-1) const
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
bool g_enable_fsi
Definition: Catalog.cpp:96
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
std::string columnName
virtual void evictChunkFromCache(const ChunkKey &chunk_key)
ChunkKey get_fragment_key(const ChunkKey &key)
Definition: types.h:90
std::vector< ChunkKey > get_column_key_vec(const ChunkKey &destination_chunk_key)
virtual size_t maxFetchSize(int32_t db_id) const
std::string printSlabs() override
bool is_distributed()
Definition: distributed.cpp:21
bool is_varlen_key(const ChunkKey &key)
Definition: types.h:71
bool isBufferOnDevice(const ChunkKey &chunk_key) override