OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ForeignStorageMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignStorageMgr.h"
18 
20 #include "Catalog/Catalog.h"
24 #include "ForeignTableSchema.h"
25 #include "Shared/distributed.h"
26 
27 extern bool g_enable_fsi;
28 extern bool g_enable_s3_fsi;
29 
30 namespace {
31 void filter_metadata_by_leaf(ChunkMetadataVector& meta_vec, const ChunkKey& key_prefix) {
32  if (!foreign_storage::is_shardable_key(key_prefix)) {
33  return;
34  }
35  for (auto it = meta_vec.begin(); it != meta_vec.end();) {
36  it = foreign_storage::fragment_maps_to_leaf(it->first) ? std::next(it)
37  : meta_vec.erase(it);
38  }
39 }
40 } // namespace
41 
42 namespace foreign_storage {
43 ForeignStorageMgr::ForeignStorageMgr() : AbstractBufferMgr(0), data_wrapper_map_({}) {}
44 
46  const size_t num_bytes) {
47  UNREACHABLE();
48  return nullptr; // Added to avoid "no return statement" compiler warning
49 }
50 
52  CHECK(has_table_prefix(chunk_key));
53  auto catalog =
55  CHECK(catalog);
56  auto foreign_table = catalog->getForeignTable(chunk_key[CHUNK_KEY_TABLE_IDX]);
57  auto storage_type_entry = foreign_table->foreign_server->options.find(
59 
60  if (storage_type_entry == foreign_table->foreign_server->options.end()) {
61  // Some FSI servers such as ODBC do not have a storage_type
62  return;
63  }
64  bool is_s3_storage_type =
65  (storage_type_entry->second == AbstractFileStorageDataWrapper::S3_STORAGE_TYPE);
66  if (is_s3_storage_type) {
67  throw ForeignStorageException{
68  "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
69  "currently disabled."};
70  }
71 }
72 
74  int table_id = chunk_key[CHUNK_KEY_TABLE_IDX];
75  column_id_ = chunk_key[CHUNK_KEY_COLUMN_IDX];
76  catalog_ =
77  Catalog_Namespace::SysCatalog::instance().getCatalog(chunk_key[CHUNK_KEY_DB_IDX]);
78  column_ = catalog_->getMetadataForColumn(table_id, column_id_);
79  foreign_table_ = catalog_->getForeignTable(table_id);
81 }
82 
83 void ChunkSizeValidator::validateChunkSize(const AbstractBuffer* buffer) const {
84  CHECK(buffer);
85  int64_t actual_chunk_size = buffer->size();
86  if (actual_chunk_size > max_chunk_size_) {
87  throwChunkSizeViolatedError(actual_chunk_size);
88  }
89 }
90 
92  for (const auto& [chunk_key, buffer] : buffers) {
93  int64_t actual_chunk_size = buffer->size();
94  if (actual_chunk_size > max_chunk_size_) {
95  throwChunkSizeViolatedError(actual_chunk_size, chunk_key[CHUNK_KEY_COLUMN_IDX]);
96  }
97  }
98 }
99 
// Formats and throws a ForeignStorageException for a chunk whose populated
// size exceeds the table's maximum chunk size.
//
// column_id: defaults to -1 (see declaration), which reports the column
// captured at construction; a positive id looks up that column's name instead
// (used when validating a whole map of buffers).
void ChunkSizeValidator::throwChunkSizeViolatedError(const int64_t actual_chunk_size,
                                                     const int column_id) const {
  std::string column_name = column_->columnName;
  if (column_id > 0) {
    column_name =
        catalog_->getMetadataForColumn(foreign_table_->tableId, column_id)->columnName;
  }
  std::stringstream error_stream;
  error_stream << "Chunk populated by data wrapper which is " << actual_chunk_size
               << " bytes exceeds maximum byte size limit of " << max_chunk_size_ << "."
               << " Foreign table: " << foreign_table_->tableName
               << ", column name : " << column_name;
  throw ForeignStorageException(error_stream.str());
}
114 
115 void ForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
116  AbstractBuffer* destination_buffer,
117  const size_t num_bytes) {
118  ChunkSizeValidator chunk_size_validator(chunk_key);
119 
120  checkIfS3NeedsToBeEnabled(chunk_key);
121  CHECK(destination_buffer);
122  CHECK(!destination_buffer->isDirty());
123  // Use a temp buffer if we have no cache buffers and have one mapped for this chunk.
124  if (fetchBufferIfTempBufferMapEntryExists(chunk_key, destination_buffer, num_bytes)) {
125  chunk_size_validator.validateChunkSize(destination_buffer);
126  return;
127  }
128 
129  { // Clear any temp buffers if we've moved on to a new fragment
130  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
131  if (temp_chunk_buffer_map_.size() > 0 &&
133  chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
135  }
136  }
137 
138  auto column_keys = get_column_key_set(chunk_key);
139 
140  // Use hints to prefetch other chunks in fragment
141  ChunkToBufferMap optional_buffers;
142 
143  // Use hints to prefetch other chunks in fragment into cache
144  auto optional_keys = getOptionalChunkKeySet(
145  chunk_key, column_keys, getDataWrapper(chunk_key)->getNonCachedParallelismLevel());
146  if (optional_keys.size()) {
147  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
148  // Erase anything already in temp_chunk_buffer_map_
149  // TODO(Misiu): Change to use std::erase_if when we get c++20
150  for (auto it = optional_keys.begin(); it != optional_keys.end();) {
151  if (temp_chunk_buffer_map_.find(*it) != temp_chunk_buffer_map_.end()) {
152  it = optional_keys.erase(it);
153  } else {
154  ++it;
155  }
156  }
157  }
158  if (optional_keys.size()) {
159  optional_buffers = allocateTempBuffersForChunks(optional_keys);
160  }
161 
162  // Remove the original key as it will be replaced by the destination_buffer.
163  column_keys.erase(chunk_key);
164  auto required_buffers = allocateTempBuffersForChunks(column_keys);
165  required_buffers[chunk_key] = destination_buffer;
166  // populate will write directly to destination_buffer so no need to copy.
167  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
168  chunk_size_validator.validateChunkSizes(required_buffers);
169  chunk_size_validator.validateChunkSizes(optional_buffers);
170  updateFragmenterMetadata(required_buffers);
171  updateFragmenterMetadata(optional_buffers);
172 }
173 
175  for (const auto& [key, buffer] : buffers) {
176  auto catalog =
177  Catalog_Namespace::SysCatalog::instance().getCatalog(key[CHUNK_KEY_DB_IDX]);
178  auto column = catalog->getMetadataForColumn(key[CHUNK_KEY_TABLE_IDX],
179  key[CHUNK_KEY_COLUMN_IDX]);
180  if (column->columnType.is_varlen_indeed() &&
181  key[CHUNK_KEY_VARLEN_IDX] == 2) { // skip over index buffers
182  continue;
183  }
184  auto foreign_table = catalog->getForeignTable(key[CHUNK_KEY_TABLE_IDX]);
185  auto fragmenter = foreign_table->fragmenter;
186  if (!fragmenter) {
187  continue;
188  }
189  auto encoder = buffer->getEncoder();
190  CHECK(encoder);
191  auto chunk_metadata = std::make_shared<ChunkMetadata>();
192  encoder->getMetadata(chunk_metadata);
193  fragmenter->updateColumnChunkMetadata(
194  column, key[CHUNK_KEY_FRAGMENT_IDX], chunk_metadata);
195  }
196 }
197 
199  const ChunkKey& chunk_key,
200  AbstractBuffer* destination_buffer,
201  size_t num_bytes) {
202  AbstractBuffer* buffer{nullptr};
203  {
204  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
205  if (temp_chunk_buffer_map_.find(chunk_key) == temp_chunk_buffer_map_.end()) {
206  return false;
207  }
208  buffer = temp_chunk_buffer_map_[chunk_key].get();
209  }
210  // For the index key, calls with size 0 get 1 added as
211  // empty index buffers start with one entry
212  // Set to 0 here to copy entire buffer
213  if (is_varlen_index_key(chunk_key) && (num_bytes == sizeof(StringOffsetT))) {
214  num_bytes = 0;
215  }
216  CHECK(buffer);
217  buffer->copyTo(destination_buffer, num_bytes);
218  {
219  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
220  temp_chunk_buffer_map_.erase(chunk_key);
221  }
222  return true;
223 }
224 
226  ChunkMetadataVector& chunk_metadata,
227  const ChunkKey& key_prefix) {
228  if (!g_enable_fsi) {
229  throw ForeignStorageException{
230  "Query cannot be executed for foreign table because FSI is currently disabled."};
231  }
232  CHECK(is_table_key(key_prefix));
233 
234  if (!is_table_enabled_on_node(key_prefix)) {
235  // If the table is not enabled for this node then the request should do nothing.
236  return;
237  }
238 
239  checkIfS3NeedsToBeEnabled(key_prefix);
240  createDataWrapperIfNotExists(key_prefix);
241 
242  try {
243  getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
244  filter_metadata_by_leaf(chunk_metadata, key_prefix);
245  } catch (...) {
246  eraseDataWrapper(key_prefix);
247  throw;
248  }
249 }
250 
251 void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
252  const ChunkKey table_key{db_id, table_id};
253  {
254  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
255  data_wrapper_map_.erase(table_key);
256  }
258 }
259 
261  return FOREIGN_STORAGE_MGR;
262 }
263 
265  return ToString(FOREIGN_STORAGE_MGR);
266 }
267 
268 bool ForeignStorageMgr::hasDataWrapperForChunk(const ChunkKey& chunk_key) const {
269  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
270  CHECK(has_table_prefix(chunk_key));
271  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
272  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
273 }
274 
275 std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
276  const ChunkKey& chunk_key) const {
277  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
278  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
279  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
280  return data_wrapper_map_.at(table_key);
281 }
282 
284  const ChunkKey& table_key,
285  std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
286  CHECK(is_table_key(table_key));
287  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
288  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
289  data_wrapper->setParentWrapper(data_wrapper_map_.at(table_key));
290  data_wrapper_map_[table_key] = data_wrapper;
291 }
292 
293 void ForeignStorageMgr::createDataWrapperUnlocked(int32_t db_id, int32_t tb_id) {
294  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
295  CHECK(catalog);
296  auto foreign_table = catalog->getForeignTable(tb_id);
297  ChunkKey table_key{db_id, tb_id};
299  foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
300 }
301 
303  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
304  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
305  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
306  auto [db_id, tb_id] = get_table_prefix(chunk_key);
307  createDataWrapperUnlocked(db_id, tb_id);
308  return true;
309  }
310  return false;
311 }
312 
313 void ForeignStorageMgr::refreshTable(const ChunkKey& table_key,
314  const bool evict_cached_entries) {
315  auto catalog =
316  Catalog_Namespace::SysCatalog::instance().getCatalog(table_key[CHUNK_KEY_DB_IDX]);
317  CHECK(catalog);
318  // Clear datawrapper unless table is non-append and evict is false
319  if (evict_cached_entries ||
320  !catalog->getForeignTable(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode()) {
321  eraseDataWrapper(table_key);
322  }
323 }
324 
325 void ForeignStorageMgr::eraseDataWrapper(const ChunkKey& table_key) {
326  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
327  // May not be created yet
328  if (data_wrapper_map_.find(table_key) != data_wrapper_map_.end()) {
329  data_wrapper_map_.erase(table_key);
330  }
331 }
332 
334  const ChunkKey& table_key) {
335  CHECK(is_table_key(table_key));
336  auto start_it = temp_chunk_buffer_map_.lower_bound(table_key);
337  ChunkKey upper_bound_prefix{table_key[CHUNK_KEY_DB_IDX],
338  table_key[CHUNK_KEY_TABLE_IDX],
339  std::numeric_limits<int>::max()};
340  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
341  temp_chunk_buffer_map_.erase(start_it, end_it);
342 }
343 
345  const ChunkKey& table_key) {
346  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
348 }
349 
350 bool ForeignStorageMgr::isDatawrapperRestored(const ChunkKey& chunk_key) {
351  if (!hasDataWrapperForChunk(chunk_key)) {
352  return false;
353  }
354  return getDataWrapper(chunk_key)->isRestored();
355 }
356 
// Per-chunk deletion is not supported through this API; table-level cleanup
// goes through removeTableRelatedDS() instead.
void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
  UNREACHABLE();
}
360 
// Prefix-based buffer deletion is likewise unsupported for foreign storage.
void ForeignStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
                                                const bool purge) {
  UNREACHABLE();
}
365 
// Device residency queries do not apply to foreign storage.
bool ForeignStorageMgr::isBufferOnDevice(const ChunkKey& chunk_key) {
  UNREACHABLE();
  return false;  // Added to avoid "no return statement" compiler warning
}
370 
372  UNREACHABLE();
373  return 0; // Added to avoid "no return statement" compiler warning
374 }
375 
377  const size_t page_size,
378  const size_t initial_size) {
379  UNREACHABLE();
380  return nullptr; // Added to avoid "no return statement" compiler warning
381 }
382 
384  AbstractBuffer* source_buffer,
385  const size_t num_bytes) {
386  UNREACHABLE();
387  return nullptr; // Added to avoid "no return statement" compiler warning
388 }
389 
// Slab reporting does not apply to foreign storage.
std::string ForeignStorageMgr::printSlabs() {
  UNREACHABLE();
  return {};  // Added to avoid "no return statement" compiler warning
}
394 
396  UNREACHABLE();
397  return 0; // Added to avoid "no return statement" compiler warning
398 }
399 
401  UNREACHABLE();
402  return 0; // Added to avoid "no return statement" compiler warning
403 }
404 
406  UNREACHABLE();
407  return 0; // Added to avoid "no return statement" compiler warning
408 }
409 
411  UNREACHABLE();
412  return false; // Added to avoid "no return statement" compiler warning
413 }
414 
416  UNREACHABLE();
417 }
418 
// Per-table checkpointing does not apply to read-only foreign storage.
void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
  UNREACHABLE();
}
422 
// Raw allocation is not part of the foreign storage contract.
AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}
427 
429  UNREACHABLE();
430 }
431 
432 size_t get_max_chunk_size(const ChunkKey& key) {
433  auto [db_id, table_id] = get_table_prefix(key);
434  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
435  CHECK(catalog);
436  return catalog->getForeignTable(table_id)->maxChunkSize;
437 }
438 
439 std::set<ChunkKey> get_column_key_set(const ChunkKey& destination_chunk_key) {
440  std::set<ChunkKey> chunk_keys;
441  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
442  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
443  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
444  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
445  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
446  CHECK(catalog);
447  auto foreign_table = catalog->getForeignTable(table_id);
448 
449  ForeignTableSchema schema{db_id, foreign_table};
450  auto logical_column = schema.getLogicalColumn(destination_column_id);
451  auto logical_column_id = logical_column->columnId;
452 
453  for (auto column_id = logical_column_id;
454  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
455  column_id++) {
456  auto column = schema.getColumnDescriptor(column_id);
457  if (column->columnType.is_varlen_indeed()) {
458  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
459  chunk_keys.emplace(data_chunk_key);
460 
461  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
462  chunk_keys.emplace(index_chunk_key);
463  } else {
464  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
465  chunk_keys.emplace(data_chunk_key);
466  }
467  }
468  return chunk_keys;
469 }
470 
471 std::vector<ChunkKey> get_column_key_vec(const ChunkKey& destination_chunk_key) {
472  std::vector<ChunkKey> chunk_keys;
473  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
474  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
475  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
476  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
477  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
478  CHECK(catalog);
479  auto foreign_table = catalog->getForeignTable(table_id);
480 
481  ForeignTableSchema schema{db_id, foreign_table};
482  auto logical_column = schema.getLogicalColumn(destination_column_id);
483  auto logical_column_id = logical_column->columnId;
484 
485  for (auto column_id = logical_column_id;
486  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
487  column_id++) {
488  auto column = schema.getColumnDescriptor(column_id);
489  if (column->columnType.is_varlen_indeed()) {
490  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
491  chunk_keys.emplace_back(data_chunk_key);
492 
493  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
494  chunk_keys.emplace_back(index_chunk_key);
495  } else {
496  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
497  chunk_keys.emplace_back(data_chunk_key);
498  }
499  }
500  return chunk_keys;
501 }
502 
503 // Defines the "<" operator to use as a comparator.
504 // This is similar to comparing chunks normally, but we want to give fragments a higher
505 // priority than columns so that if we have to exit early we prioritize same fragment
506 // fetching.
507 bool set_comp(const ChunkKey& left, const ChunkKey& right) {
508  CHECK_GE(left.size(), 4ULL);
509  CHECK_GE(right.size(), 4ULL);
510  if ((left[CHUNK_KEY_DB_IDX] < right[CHUNK_KEY_DB_IDX]) ||
511  (left[CHUNK_KEY_TABLE_IDX] < right[CHUNK_KEY_TABLE_IDX]) ||
513  (left[CHUNK_KEY_COLUMN_IDX] < right[CHUNK_KEY_COLUMN_IDX])) {
514  return true;
515  }
516  if (left.size() < right.size()) {
517  return true;
518  }
519  if (is_varlen_key(left) && is_varlen_key(right) &&
521  return true;
522  }
523  return false;
524 }
525 
526 bool contains_fragment_key(const std::set<ChunkKey>& key_set,
527  const ChunkKey& target_key) {
528  for (const auto& key : key_set) {
529  if (get_fragment_key(target_key) == get_fragment_key(key)) {
530  return true;
531  }
532  }
533  return false;
534 }
535 
537  const std::set<ChunkKey>& chunk_keys) {
538  ChunkToBufferMap chunk_buffer_map;
539  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
540  for (const auto& chunk_key : chunk_keys) {
541  temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
542  chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
543  chunk_buffer_map[chunk_key]->resetToEmpty();
544  }
545  return chunk_buffer_map;
546 }
547 
549  const std::map<ChunkKey, std::set<ParallelismHint>>& hints_per_table) {
550  std::unique_lock data_wrapper_lock(parallelism_hints_mutex_);
551  parallelism_hints_per_table_ = hints_per_table;
552 }
553 
554 std::pair<std::set<ChunkKey, decltype(set_comp)*>,
555  std::set<ChunkKey, decltype(set_comp)*>>
557  const ChunkKey& chunk_key,
558  const std::set<ChunkKey>& required_chunk_keys,
559  const ForeignDataWrapper::ParallelismLevel parallelism_level) const {
560  std::shared_lock data_wrapper_lock(parallelism_hints_mutex_);
561  auto same_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(set_comp);
562  auto diff_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(set_comp);
563 
564  auto table_hints = parallelism_hints_per_table_.find(get_table_key(chunk_key));
565  if (table_hints == parallelism_hints_per_table_.end()) {
566  return {{}, {}};
567  }
568  for (const auto& hint : table_hints->second) {
569  const auto& [column_id, fragment_id] = hint;
570  auto optional_chunk_key = get_table_key(chunk_key);
571  optional_chunk_key.push_back(column_id);
572  if (parallelism_level == ForeignDataWrapper::INTRA_FRAGMENT) {
573  optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
574  } else if (parallelism_level == ForeignDataWrapper::INTER_FRAGMENT) {
575  optional_chunk_key.push_back(fragment_id);
576  } else {
577  UNREACHABLE() << "Unknown parallelism level.";
578  }
579 
580  CHECK(!key_does_not_shard_to_leaf(optional_chunk_key));
581 
582  if (!contains_fragment_key(required_chunk_keys, optional_chunk_key)) {
583  // Do not insert an optional key if it is already a required key.
584  if (optional_chunk_key[CHUNK_KEY_FRAGMENT_IDX] ==
585  chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
586  same_fragment_keys.emplace(optional_chunk_key);
587  } else {
588  diff_fragment_keys.emplace(optional_chunk_key);
589  }
590  }
591  }
592  return {same_fragment_keys, diff_fragment_keys};
593 }
594 
596  const ChunkKey& chunk_key,
597  const std::set<ChunkKey, decltype(set_comp)*>& same_fragment_keys,
598  const std::set<ChunkKey, decltype(set_comp)*>& diff_fragment_keys) const {
599  std::set<ChunkKey> optional_keys;
600  for (const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
601  for (auto key : keys) {
602  auto column_keys = get_column_key_set(key);
603  for (auto column_key : column_keys) {
604  optional_keys.emplace(column_key);
605  }
606  }
607  }
608  return optional_keys;
609 }
610 
612  const ChunkKey& chunk_key,
613  const std::set<ChunkKey>& required_chunk_keys,
614  const ForeignDataWrapper::ParallelismLevel parallelism_level) const {
615  if (parallelism_level == ForeignDataWrapper::NONE) {
616  return {};
617  }
618 
619  auto [same_fragment_keys, diff_fragment_keys] =
620  getPrefetchSets(chunk_key, required_chunk_keys, parallelism_level);
621 
623  chunk_key, same_fragment_keys, diff_fragment_keys);
624 }
625 
// Virtual default: 0, i.e. no per-database maximum fetch size. Subclasses
// that enforce a limit override this together with hasMaxFetchSize().
size_t ForeignStorageMgr::maxFetchSize(int32_t db_id) const {
  return 0;
}
629 
631  return false;
632 }
633 
634 // Determine if a wrapper is enabled on the current distributed node.
635 bool is_table_enabled_on_node(const ChunkKey& chunk_key) {
636  CHECK(is_table_key(chunk_key));
637 
638  // Replicated tables, system tables, and non-distributed tables are on all nodes by
639  // default. Leaf nodes are on, but will filter their results later by their node index.
641  is_replicated_table_chunk_key(chunk_key) || is_system_table_chunk_key(chunk_key)) {
642  return true;
643  }
644 
645  // If we aren't a leaf node then we are the aggregator, and the aggregator should not
646  // have sharded data.
647  return false;
648 }
649 } // namespace foreign_storage
bool set_comp(const ChunkKey &left, const ChunkKey &right)
void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey &table_key)
std::vector< int > ChunkKey
Definition: types.h:37
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
AbstractBuffer * alloc(const size_t num_bytes) override
virtual std::set< ChunkKey > getOptionalKeysWithinSizeLimit(const ChunkKey &chunk_key, const std::set< ChunkKey, decltype(set_comp)* > &same_fragment_keys, const std::set< ChunkKey, decltype(set_comp)* > &diff_fragment_keys) const
std::string tableName
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_system_table_chunk_key(const ChunkKey &chunk_key)
bool is_table_key(const ChunkKey &key)
Definition: types.h:45
virtual bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
void filter_metadata_by_leaf(ChunkMetadataVector &meta_vec, const ChunkKey &key_prefix)
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key) const
void validateChunkSize(const AbstractBuffer *buffer) const
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
ChunkSizeValidator(const ChunkKey &chunk_key)
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
#define UNREACHABLE()
Definition: Logger.h:267
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
#define CHECK_GE(x, y)
Definition: Logger.h:236
void free(AbstractBuffer *buffer) override
std::shared_ptr< Catalog_Namespace::Catalog > catalog_
std::set< ChunkKey > get_column_key_set(const ChunkKey &destination_chunk_key)
const ColumnDescriptor * column_
void removeTableRelatedDS(const int db_id, const int table_id) override
bool is_replicated_table_chunk_key(const ChunkKey &chunk_key)
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:58
std::pair< std::set< ChunkKey, decltype(set_comp)* >, std::set< ChunkKey, decltype(set_comp)* > > getPrefetchSets(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level) const
int32_t StringOffsetT
Definition: sqltypes.h:1113
bool is_table_enabled_on_node(const ChunkKey &key)
This file contains the class specification and related data structures for Catalog.
void updateFragmenterMetadata(const ChunkToBufferMap &) const
bool key_does_not_shard_to_leaf(const ChunkKey &key)
static SysCatalog & instance()
Definition: SysCatalog.h:337
bool g_enable_s3_fsi
Definition: Catalog.cpp:98
bool contains_fragment_key(const std::set< ChunkKey > &key_set, const ChunkKey &target_key)
std::set< ChunkKey > getOptionalChunkKeySet(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level) const
bool is_varlen_index_key(const ChunkKey &key)
Definition: types.h:80
bool hasDataWrapperForChunk(const ChunkKey &chunk_key) const
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
std::map< ChunkKey, std::set< ParallelismHint > > parallelism_hints_per_table_
bool is_leaf_node()
Definition: distributed.cpp:29
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool has_table_prefix(const ChunkKey &key)
Definition: types.h:49
An AbstractBuffer is a unit of data management for a data manager.
std::string getStringMgrType() override
bool fragment_maps_to_leaf(const ChunkKey &key)
bool is_shardable_key(const ChunkKey &key)
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_max_chunk_size(const ChunkKey &key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
ChunkToBufferMap allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:43
virtual void eraseDataWrapper(const ChunkKey &table_key)
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:63
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
void setParallelismHints(const std::map< ChunkKey, std::set< ParallelismHint >> &hints_per_table)
void validateChunkSizes(const ChunkToBufferMap &buffers) const
void createDataWrapperUnlocked(int32_t db, int32_t tb)
#define CHECK(condition)
Definition: Logger.h:223
virtual bool hasMaxFetchSize() const
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
void throwChunkSizeViolatedError(const int64_t actual_chunk_size, const int column_id=-1) const
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
bool g_enable_fsi
Definition: Catalog.cpp:97
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
std::string columnName
ChunkKey get_fragment_key(const ChunkKey &key)
Definition: types.h:91
std::vector< ChunkKey > get_column_key_vec(const ChunkKey &destination_chunk_key)
virtual size_t maxFetchSize(int32_t db_id) const
std::string printSlabs() override
bool is_distributed()
Definition: distributed.cpp:21
bool is_varlen_key(const ChunkKey &key)
Definition: types.h:72
bool isBufferOnDevice(const ChunkKey &chunk_key) override