OmniSciDB c1a53651b2
ForeignStorageMgr.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ForeignStorageMgr.h"

#include "AbstractFileStorageDataWrapper.h"
#include "Catalog/Catalog.h"
#include "Catalog/ForeignTable.h"
#include "ForeignDataWrapperFactory.h"
#include "ForeignStorageException.h"
#include "ForeignTableSchema.h"
#include "Shared/distributed.h"

extern bool g_enable_fsi;
extern bool g_enable_s3_fsi;

namespace {
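// In distributed mode, a leaf node should only report metadata for fragments that
// shard to it; erase every other fragment's metadata from the vector.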
void filter_metadata_by_leaf(ChunkMetadataVector& meta_vec, const ChunkKey& key_prefix) {
  if (!foreign_storage::is_shardable_key(key_prefix)) {
    return;
  }
  for (auto it = meta_vec.begin(); it != meta_vec.end();) {
    it = foreign_storage::fragment_maps_to_leaf(it->first) ? std::next(it)
                                                           : meta_vec.erase(it);
  }
}
}  // namespace

namespace foreign_storage {
ForeignStorageMgr::ForeignStorageMgr() : AbstractBufferMgr(0), data_wrapper_map_({}) {}

AbstractBuffer* ForeignStorageMgr::getBuffer(const ChunkKey& chunk_key,
                                             const size_t num_bytes) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

void ForeignStorageMgr::checkIfS3NeedsToBeEnabled(const ChunkKey& chunk_key) {
  CHECK(has_table_prefix(chunk_key));
  auto catalog =
      Catalog_Namespace::SysCatalog::instance().getCatalog(chunk_key[CHUNK_KEY_DB_IDX]);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(chunk_key[CHUNK_KEY_TABLE_IDX]);
  auto storage_type_entry = foreign_table->foreign_server->options.find(
      AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY);

  if (storage_type_entry == foreign_table->foreign_server->options.end()) {
    // Some FSI servers, such as ODBC, do not have a storage_type option.
    return;
  }
  bool is_s3_storage_type =
      (storage_type_entry->second == AbstractFileStorageDataWrapper::S3_STORAGE_TYPE);
  if (is_s3_storage_type) {
    throw ForeignStorageException{
        "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
        "currently disabled."};
  }
}

ChunkSizeValidator::ChunkSizeValidator(const ChunkKey& chunk_key) {
  int table_id = chunk_key[CHUNK_KEY_TABLE_IDX];
  column_id_ = chunk_key[CHUNK_KEY_COLUMN_IDX];
  catalog_ =
      Catalog_Namespace::SysCatalog::instance().getCatalog(chunk_key[CHUNK_KEY_DB_IDX]);
  column_ = catalog_->getMetadataForColumn(table_id, column_id_);
  foreign_table_ = catalog_->getForeignTable(table_id);
  max_chunk_size_ = foreign_table_->maxChunkSize;
}

void ChunkSizeValidator::validateChunkSize(const AbstractBuffer* buffer) const {
  CHECK(buffer);
  int64_t actual_chunk_size = buffer->size();
  if (actual_chunk_size > max_chunk_size_) {
    throwChunkSizeViolatedError(actual_chunk_size);
  }
}

void ChunkSizeValidator::validateChunkSizes(const ChunkToBufferMap& buffers) const {
  for (const auto& [chunk_key, buffer] : buffers) {
    int64_t actual_chunk_size = buffer->size();
    if (actual_chunk_size > max_chunk_size_) {
      throwChunkSizeViolatedError(actual_chunk_size, chunk_key[CHUNK_KEY_COLUMN_IDX]);
    }
  }
}

void ChunkSizeValidator::throwChunkSizeViolatedError(const int64_t actual_chunk_size,
                                                     const int column_id) const {
  std::string column_name = column_->columnName;
  if (column_id > 0) {
    column_name =
        catalog_->getMetadataForColumn(foreign_table_->tableId, column_id)->columnName;
  }
  std::stringstream error_stream;
  error_stream << "Chunk populated by data wrapper which is " << actual_chunk_size
               << " bytes exceeds maximum byte size limit of " << max_chunk_size_ << "."
               << " Foreign table: " << foreign_table_->tableName
               << ", column name: " << column_name;
  throw ForeignStorageException(error_stream.str());
}

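// Fetches a chunk into destination_buffer. The fast path copies from a previously
// populated temporary buffer. Otherwise the data wrapper populates all chunks in the
// requested chunk's column key set (writing the requested chunk directly into
// destination_buffer), plus any optional chunks suggested by parallelism hints.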
void ForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
                                    AbstractBuffer* destination_buffer,
                                    const size_t num_bytes) {
  ChunkSizeValidator chunk_size_validator(chunk_key);

  checkIfS3NeedsToBeEnabled(chunk_key);
  CHECK(destination_buffer);
  CHECK(!destination_buffer->isDirty());
  // Use a temp buffer if we have no cache buffers and have one mapped for this chunk.
  if (fetchBufferIfTempBufferMapEntryExists(chunk_key, destination_buffer, num_bytes)) {
    chunk_size_validator.validateChunkSize(destination_buffer);
    return;
  }

  {  // Clear any temp buffers if we've moved on to a new fragment
    std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
    if (temp_chunk_buffer_map_.size() > 0 &&
        temp_chunk_buffer_map_.begin()->first[CHUNK_KEY_FRAGMENT_IDX] !=
            chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
      clearTempChunkBufferMapEntriesForTableUnlocked(get_table_key(chunk_key));
    }
  }

  auto column_keys = get_column_key_set(chunk_key);

  // Buffers that hold chunks prefetched based on parallelism hints.
  ChunkToBufferMap optional_buffers;

  // Use hints to prefetch other chunks in the fragment into the cache.
  auto optional_keys = getOptionalChunkKeySet(
      chunk_key, column_keys, getDataWrapper(chunk_key)->getNonCachedParallelismLevel());
  if (optional_keys.size()) {
    std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
    // Erase anything already in temp_chunk_buffer_map_.
    // TODO(Misiu): Change to use std::erase_if when we get c++20.
    for (auto it = optional_keys.begin(); it != optional_keys.end();) {
      if (temp_chunk_buffer_map_.find(*it) != temp_chunk_buffer_map_.end()) {
        it = optional_keys.erase(it);
      } else {
        ++it;
      }
    }
  }
  if (optional_keys.size()) {
    optional_buffers = allocateTempBuffersForChunks(optional_keys);
  }

  // Remove the original key as it will be replaced by the destination_buffer.
  column_keys.erase(chunk_key);
  auto required_buffers = allocateTempBuffersForChunks(column_keys);
  required_buffers[chunk_key] = destination_buffer;
  // populateChunkBuffers writes directly to destination_buffer, so no copy is needed.
  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
  chunk_size_validator.validateChunkSizes(required_buffers);
  chunk_size_validator.validateChunkSizes(optional_buffers);
  updateFragmenterMetadata(required_buffers);
  updateFragmenterMetadata(optional_buffers);
}

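// Propagates the chunk metadata recorded by each buffer's encoder to the owning
// table's fragmenter, skipping varlen index buffers.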
void ForeignStorageMgr::updateFragmenterMetadata(const ChunkToBufferMap& buffers) const {
  for (const auto& [key, buffer] : buffers) {
    auto catalog =
        Catalog_Namespace::SysCatalog::instance().getCatalog(key[CHUNK_KEY_DB_IDX]);
    auto column = catalog->getMetadataForColumn(key[CHUNK_KEY_TABLE_IDX],
                                                key[CHUNK_KEY_COLUMN_IDX]);
    if (column->columnType.is_varlen_indeed() &&
        key[CHUNK_KEY_VARLEN_IDX] == 2) {  // skip over index buffers
      continue;
    }
    auto foreign_table = catalog->getForeignTable(key[CHUNK_KEY_TABLE_IDX]);
    auto fragmenter = foreign_table->fragmenter;
    if (!fragmenter) {
      continue;
    }
    auto encoder = buffer->getEncoder();
    CHECK(encoder);
    auto chunk_metadata = std::make_shared<ChunkMetadata>();
    encoder->getMetadata(chunk_metadata);
    fragmenter->updateColumnChunkMetadata(
        column, key[CHUNK_KEY_FRAGMENT_IDX], chunk_metadata);
  }
}

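// If a temporary buffer exists for this chunk, copies it into destination_buffer and
// erases the map entry (temp buffers are consumed on first fetch). Returns false if
// no entry exists.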
bool ForeignStorageMgr::fetchBufferIfTempBufferMapEntryExists(
    const ChunkKey& chunk_key,
    AbstractBuffer* destination_buffer,
    size_t num_bytes) {
  AbstractBuffer* buffer{nullptr};
  {
    std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
    if (temp_chunk_buffer_map_.find(chunk_key) == temp_chunk_buffer_map_.end()) {
      return false;
    }
    buffer = temp_chunk_buffer_map_[chunk_key].get();
  }
  // For index keys, a size-0 request arrives as sizeof(StringOffsetT) because empty
  // index buffers still start with one entry. Reset num_bytes to 0 here so that
  // copyTo copies the entire buffer.
  if (is_varlen_index_key(chunk_key) && (num_bytes == sizeof(StringOffsetT))) {
    num_bytes = 0;
  }
  CHECK(buffer);
  buffer->copyTo(destination_buffer, num_bytes);
  {
    std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
    temp_chunk_buffer_map_.erase(chunk_key);
  }
  return true;
}

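// Populates chunk metadata for the given table key prefix, creating the table's data
// wrapper on demand. If metadata population fails, the wrapper is erased so a later
// request can start from a clean state.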
void ForeignStorageMgr::getChunkMetadataVecForKeyPrefix(
    ChunkMetadataVector& chunk_metadata,
    const ChunkKey& key_prefix) {
  if (!g_enable_fsi) {
    throw ForeignStorageException{
        "Query cannot be executed for foreign table because FSI is currently disabled."};
  }
  CHECK(is_table_key(key_prefix));

  if (!is_table_enabled_on_node(key_prefix)) {
    // If the table is not enabled for this node then the request should do nothing.
    return;
  }

  checkIfS3NeedsToBeEnabled(key_prefix);
  createDataWrapperIfNotExists(key_prefix);

  try {
    getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
    filter_metadata_by_leaf(chunk_metadata, key_prefix);
  } catch (...) {
    eraseDataWrapper(key_prefix);
    throw;
  }
}

void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
  const ChunkKey table_key{db_id, table_id};
  {
    std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
    if (auto mock_it = mocked_wrapper_map_.find(table_key);
        mock_it != mocked_wrapper_map_.end()) {
      mock_it->second->unsetParentWrapper();
    }
    data_wrapper_map_.erase(table_key);
  }
  clearTempChunkBufferMapEntriesForTable(table_key);
}

MgrType ForeignStorageMgr::getMgrType() {
  return FOREIGN_STORAGE_MGR;
}

std::string ForeignStorageMgr::getStringMgrType() {
  return ToString(FOREIGN_STORAGE_MGR);
}

bool ForeignStorageMgr::hasDataWrapperForChunk(const ChunkKey& chunk_key) const {
  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
  CHECK(has_table_prefix(chunk_key));
  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
}

std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
    const ChunkKey& chunk_key) const {
  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
  return data_wrapper_map_.at(table_key);
}

void ForeignStorageMgr::setDataWrapper(
    const ChunkKey& table_key,
    std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
  CHECK(is_table_key(table_key));
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  if (auto wrapper_iter = data_wrapper_map_.find(table_key);
      wrapper_iter != data_wrapper_map_.end()) {
    data_wrapper->setParentWrapper(wrapper_iter->second);
    data_wrapper_map_[table_key] = data_wrapper;
  }
  // If a wrapper does not yet exist, delay setting the mock until the wrapper is
  // actually created. Mock wrappers are preserved separately so that they persist
  // across the parent wrapper being destroyed and re-created.
  mocked_wrapper_map_[table_key] = data_wrapper;
}

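// Creates the data wrapper for a table via ForeignDataWrapperFactory. Callers are
// expected to hold data_wrapper_mutex_, per the "Unlocked" naming convention used
// in this file.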
void ForeignStorageMgr::createDataWrapperUnlocked(int32_t db_id, int32_t tb_id) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(tb_id);
  ChunkKey table_key{db_id, tb_id};
  data_wrapper_map_[table_key] = ForeignDataWrapperFactory::create(
      foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);

  // If we are testing with mocks, then we want to re-wrap new wrappers with mocks if a
  // table was given a mock wrapper earlier and destroyed.
  if (auto mock_it = mocked_wrapper_map_.find(table_key);
      mock_it != mocked_wrapper_map_.end()) {
    mock_it->second->setParentWrapper(data_wrapper_map_.at(table_key));
    data_wrapper_map_[table_key] = mock_it->second;
  }
}

bool ForeignStorageMgr::createDataWrapperIfNotExists(const ChunkKey& chunk_key) {
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
    auto [db_id, tb_id] = get_table_prefix(chunk_key);
    createDataWrapperUnlocked(db_id, tb_id);
    return true;
  }
  return false;
}

void ForeignStorageMgr::refreshTable(const ChunkKey& table_key,
                                     const bool evict_cached_entries) {
  auto catalog =
      Catalog_Namespace::SysCatalog::instance().getCatalog(table_key[CHUNK_KEY_DB_IDX]);
  CHECK(catalog);
  // Clear the data wrapper unless the table is in append mode and we are not evicting.
  if (evict_cached_entries ||
      !catalog->getForeignTable(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode()) {
    eraseDataWrapper(table_key);
  }
}

void ForeignStorageMgr::eraseDataWrapper(const ChunkKey& table_key) {
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  // May not be created yet
  if (data_wrapper_map_.find(table_key) != data_wrapper_map_.end()) {
    data_wrapper_map_.erase(table_key);
  }
}

void ForeignStorageMgr::clearTempChunkBufferMapEntriesForTableUnlocked(
    const ChunkKey& table_key) {
  CHECK(is_table_key(table_key));
  auto start_it = temp_chunk_buffer_map_.lower_bound(table_key);
  ChunkKey upper_bound_prefix{table_key[CHUNK_KEY_DB_IDX],
                              table_key[CHUNK_KEY_TABLE_IDX],
                              std::numeric_limits<int>::max()};
  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
  temp_chunk_buffer_map_.erase(start_it, end_it);
}

void ForeignStorageMgr::clearTempChunkBufferMapEntriesForTable(
    const ChunkKey& table_key) {
  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
  clearTempChunkBufferMapEntriesForTableUnlocked(table_key);
}

bool ForeignStorageMgr::isDatawrapperRestored(const ChunkKey& chunk_key) {
  if (!hasDataWrapperForChunk(chunk_key)) {
    return false;
  }
  return getDataWrapper(chunk_key)->isRestored();
}

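// ForeignStorageMgr is a read-only manager: the buffer-mutation and allocation entry
// points inherited from AbstractBufferMgr below are never expected to be called, so
// each one simply asserts UNREACHABLE().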
void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
  UNREACHABLE();
}

void ForeignStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
                                                const bool purge) {
  UNREACHABLE();
}

bool ForeignStorageMgr::isBufferOnDevice(const ChunkKey& chunk_key) {
  UNREACHABLE();
  return false;  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getNumChunks() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

AbstractBuffer* ForeignStorageMgr::createBuffer(const ChunkKey& chunk_key,
                                                const size_t page_size,
                                                const size_t initial_size) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

AbstractBuffer* ForeignStorageMgr::putBuffer(const ChunkKey& chunk_key,
                                             AbstractBuffer* source_buffer,
                                             const size_t num_bytes) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

std::string ForeignStorageMgr::printSlabs() {
  UNREACHABLE();
  return {};  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getMaxSize() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getInUseSize() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getAllocated() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

bool ForeignStorageMgr::isAllocationCapped() {
  UNREACHABLE();
  return false;  // Added to avoid "no return statement" compiler warning
}

void ForeignStorageMgr::checkpoint() {
  UNREACHABLE();
}

void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
  UNREACHABLE();
}

AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

void ForeignStorageMgr::free(AbstractBuffer* buffer) {
  UNREACHABLE();
}

size_t get_max_chunk_size(const ChunkKey& key) {
  auto [db_id, table_id] = get_table_prefix(key);
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
  CHECK(catalog);
  return catalog->getForeignTable(table_id)->maxChunkSize;
}

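// Expands a chunk key to cover every physical column of its logical column. Scalar
// columns yield one {db, table, column, fragment} key; varlen columns yield two, with
// a trailing 1 for the data buffer and 2 for the offset-index buffer.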
std::set<ChunkKey> get_column_key_set(const ChunkKey& destination_chunk_key) {
  std::set<ChunkKey> chunk_keys;
  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(table_id);

  ForeignTableSchema schema{db_id, foreign_table};
  auto logical_column = schema.getLogicalColumn(destination_column_id);
  auto logical_column_id = logical_column->columnId;

  for (auto column_id = logical_column_id;
       column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
       column_id++) {
    auto column = schema.getColumnDescriptor(column_id);
    if (column->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
      chunk_keys.emplace(data_chunk_key);

      ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
      chunk_keys.emplace(index_chunk_key);
    } else {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
      chunk_keys.emplace(data_chunk_key);
    }
  }
  return chunk_keys;
}

std::vector<ChunkKey> get_column_key_vec(const ChunkKey& destination_chunk_key) {
  std::vector<ChunkKey> chunk_keys;
  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(table_id);

  ForeignTableSchema schema{db_id, foreign_table};
  auto logical_column = schema.getLogicalColumn(destination_column_id);
  auto logical_column_id = logical_column->columnId;

  for (auto column_id = logical_column_id;
       column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
       column_id++) {
    auto column = schema.getColumnDescriptor(column_id);
    if (column->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
      chunk_keys.emplace_back(data_chunk_key);

      ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
      chunk_keys.emplace_back(index_chunk_key);
    } else {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
      chunk_keys.emplace_back(data_chunk_key);
    }
  }
  return chunk_keys;
}

// Defines the "<" operator to use as a comparator. This is similar to comparing chunk
// keys normally, except that fragments are given a higher priority than columns, so
// that if we have to exit early we prioritize same-fragment fetching.
bool set_comp(const ChunkKey& left, const ChunkKey& right) {
  CHECK_GE(left.size(), 4ULL);
  CHECK_GE(right.size(), 4ULL);
  if ((left[CHUNK_KEY_DB_IDX] < right[CHUNK_KEY_DB_IDX]) ||
      (left[CHUNK_KEY_TABLE_IDX] < right[CHUNK_KEY_TABLE_IDX]) ||
      (left[CHUNK_KEY_FRAGMENT_IDX] < right[CHUNK_KEY_FRAGMENT_IDX]) ||
      (left[CHUNK_KEY_COLUMN_IDX] < right[CHUNK_KEY_COLUMN_IDX])) {
    return true;
  }
  if (left.size() < right.size()) {
    return true;
  }
  if (is_varlen_key(left) && is_varlen_key(right) &&
      left[CHUNK_KEY_VARLEN_IDX] < right[CHUNK_KEY_VARLEN_IDX]) {
    return true;
  }
  return false;
}

bool contains_fragment_key(const std::set<ChunkKey>& key_set,
                           const ChunkKey& target_key) {
  for (const auto& key : key_set) {
    if (get_fragment_key(target_key) == get_fragment_key(key)) {
      return true;
    }
  }
  return false;
}

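// Allocates an empty ForeignStorageBuffer (owned by temp_chunk_buffer_map_) for each
// requested chunk key, and returns raw pointers for the data wrapper to populate.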
ChunkToBufferMap ForeignStorageMgr::allocateTempBuffersForChunks(
    const std::set<ChunkKey>& chunk_keys) {
  ChunkToBufferMap chunk_buffer_map;
  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
  for (const auto& chunk_key : chunk_keys) {
    temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
    chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
    chunk_buffer_map[chunk_key]->resetToEmpty();
  }
  return chunk_buffer_map;
}

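// Parallelism hints are (column id, fragment id) pairs, keyed by table, that suggest
// which additional chunks are worth prefetching alongside a request.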
void ForeignStorageMgr::setParallelismHints(
    const std::map<ChunkKey, std::set<ParallelismHint>>& hints_per_table) {
  std::unique_lock data_wrapper_lock(parallelism_hints_mutex_);
  parallelism_hints_per_table_ = hints_per_table;
}

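// Converts a table's parallelism hints into concrete optional chunk keys, split into
// keys on the same fragment as chunk_key and keys on other fragments. INTRA_FRAGMENT
// hints pin the requested fragment; INTER_FRAGMENT hints use the hinted fragment.
// Hinted keys that are already required are skipped.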
std::pair<std::set<ChunkKey, decltype(set_comp)*>,
          std::set<ChunkKey, decltype(set_comp)*>>
ForeignStorageMgr::getPrefetchSets(
    const ChunkKey& chunk_key,
    const std::set<ChunkKey>& required_chunk_keys,
    const ForeignDataWrapper::ParallelismLevel parallelism_level) const {
  std::shared_lock data_wrapper_lock(parallelism_hints_mutex_);
  auto same_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(set_comp);
  auto diff_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(set_comp);

  auto table_hints = parallelism_hints_per_table_.find(get_table_key(chunk_key));
  if (table_hints == parallelism_hints_per_table_.end()) {
    return {{}, {}};
  }
  for (const auto& hint : table_hints->second) {
    const auto& [column_id, fragment_id] = hint;
    auto optional_chunk_key = get_table_key(chunk_key);
    optional_chunk_key.push_back(column_id);
    if (parallelism_level == ForeignDataWrapper::INTRA_FRAGMENT) {
      optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
    } else if (parallelism_level == ForeignDataWrapper::INTER_FRAGMENT) {
      optional_chunk_key.push_back(fragment_id);
    } else {
      UNREACHABLE() << "Unknown parallelism level.";
    }

    CHECK(!key_does_not_shard_to_leaf(optional_chunk_key));

    if (!contains_fragment_key(required_chunk_keys, optional_chunk_key)) {
      // Do not insert an optional key if it is already a required key.
      if (optional_chunk_key[CHUNK_KEY_FRAGMENT_IDX] ==
          chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
        same_fragment_keys.emplace(optional_chunk_key);
      } else {
        diff_fragment_keys.emplace(optional_chunk_key);
      }
    }
  }
  return {same_fragment_keys, diff_fragment_keys};
}

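// The base implementation applies no size limit: every hinted key is expanded to its
// full column key set. Subclasses may override this to bound prefetch volume.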
std::set<ChunkKey> ForeignStorageMgr::getOptionalKeysWithinSizeLimit(
    const ChunkKey& chunk_key,
    const std::set<ChunkKey, decltype(set_comp)*>& same_fragment_keys,
    const std::set<ChunkKey, decltype(set_comp)*>& diff_fragment_keys) const {
  std::set<ChunkKey> optional_keys;
  for (const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
    for (auto key : keys) {
      auto column_keys = get_column_key_set(key);
      for (auto column_key : column_keys) {
        optional_keys.emplace(column_key);
      }
    }
  }
  return optional_keys;
}

std::set<ChunkKey> ForeignStorageMgr::getOptionalChunkKeySet(
    const ChunkKey& chunk_key,
    const std::set<ChunkKey>& required_chunk_keys,
    const ForeignDataWrapper::ParallelismLevel parallelism_level) const {
  if (parallelism_level == ForeignDataWrapper::NONE) {
    return {};
  }

  auto [same_fragment_keys, diff_fragment_keys] =
      getPrefetchSets(chunk_key, required_chunk_keys, parallelism_level);

  return getOptionalKeysWithinSizeLimit(
      chunk_key, same_fragment_keys, diff_fragment_keys);
}

size_t ForeignStorageMgr::maxFetchSize(int32_t db_id) const {
  return 0;
}

bool ForeignStorageMgr::hasMaxFetchSize() const {
  return false;
}

// Determine if a wrapper is enabled on the current distributed node.
bool is_table_enabled_on_node(const ChunkKey& chunk_key) {
  CHECK(is_table_key(chunk_key));

  // Replicated tables, system tables, and non-distributed tables are on all nodes by
  // default. Leaf nodes are on, but will filter their results later by their node index.
  if (!dist::is_distributed() || dist::is_leaf_node() ||
      is_replicated_table_chunk_key(chunk_key) || is_system_table_chunk_key(chunk_key)) {
    return true;
  }

  // If we aren't a leaf node then we are the aggregator, and the aggregator should not
  // have sharded data.
  return false;
}
}  // namespace foreign_storage