35 for (
auto it = meta_vec.begin(); it != meta_vec.end();) {
42 namespace foreign_storage {
46 const size_t num_bytes) {
57 auto storage_type_entry = foreign_table->foreign_server->options.find(
60 if (storage_type_entry == foreign_table->foreign_server->options.end()) {
64 bool is_s3_storage_type =
66 if (is_s3_storage_type) {
67 throw ForeignStorageException{
68 "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
69 "currently disabled."};
85 int64_t actual_chunk_size = buffer->
size();
92 for (
const auto& [chunk_key, buffer] : buffers) {
93 int64_t actual_chunk_size = buffer->
size();
101 const int column_id)
const {
107 std::stringstream error_stream;
108 error_stream <<
"Chunk populated by data wrapper which is " << actual_chunk_size
109 <<
" bytes exceeds maximum byte size limit of " <<
max_chunk_size_ <<
"."
111 <<
", column name : " << column_name;
112 throw ForeignStorageException(error_stream.str());
117 const size_t num_bytes) {
118 ChunkSizeValidator chunk_size_validator(chunk_key);
121 CHECK(destination_buffer);
125 chunk_size_validator.validateChunkSize(destination_buffer);
145 chunk_key, column_keys,
getDataWrapper(chunk_key)->getNonCachedParallelismLevel());
146 if (optional_keys.size()) {
150 for (
auto it = optional_keys.begin(); it != optional_keys.end();) {
152 it = optional_keys.erase(it);
158 if (optional_keys.size()) {
163 column_keys.erase(chunk_key);
165 required_buffers[chunk_key] = destination_buffer;
167 getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
168 chunk_size_validator.validateChunkSizes(required_buffers);
169 chunk_size_validator.validateChunkSizes(optional_buffers);
175 for (
const auto& [key, buffer] : buffers) {
178 auto column = catalog->getMetadataForColumn(key[CHUNK_KEY_TABLE_IDX],
180 if (column->columnType.is_varlen_indeed() &&
184 auto foreign_table = catalog->getForeignTable(key[CHUNK_KEY_TABLE_IDX]);
185 auto fragmenter = foreign_table->fragmenter;
191 auto chunk_metadata = std::make_shared<ChunkMetadata>();
192 encoder->getMetadata(chunk_metadata);
193 fragmenter->updateColumnChunkMetadata(
217 buffer->
copyTo(destination_buffer, num_bytes);
229 throw ForeignStorageException{
230 "Query cannot be executed for foreign table because FSI is currently disabled."};
243 getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
252 const ChunkKey table_key{db_id, table_id};
257 mock_it->second->unsetParentWrapper();
265 return FOREIGN_STORAGE_MGR;
269 return ToString(FOREIGN_STORAGE_MGR);
289 std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
294 data_wrapper->setParentWrapper(wrapper_iter->second);
306 auto foreign_table = catalog->getForeignTable(tb_id);
309 foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
332 const bool evict_cached_entries) {
337 if (evict_cached_entries ||
338 !catalog->getForeignTable(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode()) {
357 std::numeric_limits<int>::max()};
395 const size_t page_size,
396 const size_t initial_size) {
403 const size_t num_bytes) {
454 return catalog->getForeignTable(table_id)->maxChunkSize;
458 std::set<ChunkKey> chunk_keys;
465 auto foreign_table = catalog->getForeignTable(table_id);
467 ForeignTableSchema schema{db_id, foreign_table};
468 auto logical_column = schema.getLogicalColumn(destination_column_id);
469 auto logical_column_id = logical_column->columnId;
471 for (
auto column_id = logical_column_id;
472 column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
474 auto column = schema.getColumnDescriptor(column_id);
475 if (column->columnType.is_varlen_indeed()) {
476 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
477 chunk_keys.emplace(data_chunk_key);
479 ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
480 chunk_keys.emplace(index_chunk_key);
482 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
483 chunk_keys.emplace(data_chunk_key);
490 std::vector<ChunkKey> chunk_keys;
497 auto foreign_table = catalog->getForeignTable(table_id);
499 ForeignTableSchema schema{db_id, foreign_table};
500 auto logical_column = schema.getLogicalColumn(destination_column_id);
501 auto logical_column_id = logical_column->columnId;
503 for (
auto column_id = logical_column_id;
504 column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
506 auto column = schema.getColumnDescriptor(column_id);
507 if (column->columnType.is_varlen_indeed()) {
508 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
509 chunk_keys.emplace_back(data_chunk_key);
511 ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
512 chunk_keys.emplace_back(index_chunk_key);
514 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
515 chunk_keys.emplace_back(data_chunk_key);
528 if ((left[CHUNK_KEY_DB_IDX] < right[CHUNK_KEY_DB_IDX]) ||
534 if (left.size() < right.size()) {
546 for (
const auto& key : key_set) {
555 const std::set<ChunkKey>& chunk_keys) {
558 for (
const auto& chunk_key : chunk_keys) {
561 chunk_buffer_map[chunk_key]->resetToEmpty();
563 return chunk_buffer_map;
567 const std::map<
ChunkKey, std::set<ParallelismHint>>& hints_per_table) {
572 std::pair<std::set<ChunkKey, decltype(set_comp)*>,
573 std::set<ChunkKey, decltype(set_comp)*>>
576 const std::set<ChunkKey>& required_chunk_keys,
579 auto same_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(
set_comp);
580 auto diff_fragment_keys = std::set<ChunkKey, decltype(set_comp)*>(
set_comp);
586 for (
const auto& hint : table_hints->second) {
587 const auto& [column_id, fragment_id] = hint;
589 optional_chunk_key.push_back(column_id);
591 optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
593 optional_chunk_key.push_back(fragment_id);
602 if (optional_chunk_key[CHUNK_KEY_FRAGMENT_IDX] ==
603 chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
604 same_fragment_keys.emplace(optional_chunk_key);
606 diff_fragment_keys.emplace(optional_chunk_key);
610 return {same_fragment_keys, diff_fragment_keys};
617 std::set<ChunkKey> optional_keys;
618 for (
const auto& keys : {same_fragment_keys, diff_fragment_keys}) {
619 for (
auto key : keys) {
621 for (
auto column_key : column_keys) {
622 optional_keys.emplace(column_key);
626 return optional_keys;
631 const std::set<ChunkKey>& required_chunk_keys,
637 auto [same_fragment_keys, diff_fragment_keys] =
641 chunk_key, same_fragment_keys, diff_fragment_keys);
std::lock_guard< T > lock_guard
bool set_comp(const ChunkKey &left, const ChunkKey &right)
void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey &table_key)
std::vector< int > ChunkKey
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
AbstractBuffer * alloc(const size_t num_bytes) override
virtual std::set< ChunkKey > getOptionalKeysWithinSizeLimit(const ChunkKey &chunk_key, const std::set< ChunkKey, decltype(set_comp)* > &same_fragment_keys, const std::set< ChunkKey, decltype(set_comp)* > &diff_fragment_keys) const
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_system_table_chunk_key(const ChunkKey &chunk_key)
bool is_table_key(const ChunkKey &key)
virtual bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
void filter_metadata_by_leaf(ChunkMetadataVector &meta_vec, const ChunkKey &key_prefix)
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key) const
void validateChunkSize(const AbstractBuffer *buffer) const
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
ChunkSizeValidator(const ChunkKey &chunk_key)
#define CHUNK_KEY_FRAGMENT_IDX
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
size_t getNumChunks() override
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void free(AbstractBuffer *buffer) override
std::shared_ptr< Catalog_Namespace::Catalog > catalog_
std::set< ChunkKey > get_column_key_set(const ChunkKey &destination_chunk_key)
const ColumnDescriptor * column_
void removeTableRelatedDS(const int db_id, const int table_id) override
bool is_replicated_table_chunk_key(const ChunkKey &chunk_key)
ChunkKey get_table_key(const ChunkKey &key)
std::pair< std::set< ChunkKey, decltype(set_comp)* >, std::set< ChunkKey, decltype(set_comp)* > > getPrefetchSets(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level) const
bool is_table_enabled_on_node(const ChunkKey &key)
std::shared_lock< T > shared_lock
This file contains the class specification and related data structures for Catalog.
std::shared_mutex parallelism_hints_mutex_
bool isAllocationCapped() override
void updateFragmenterMetadata(const ChunkToBufferMap &) const
bool key_does_not_shard_to_leaf(const ChunkKey &key)
static SysCatalog & instance()
static const std::string STORAGE_TYPE_KEY
bool contains_fragment_key(const std::set< ChunkKey > &key_set, const ChunkKey &target_key)
std::set< ChunkKey > getOptionalChunkKeySet(const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level) const
bool is_varlen_index_key(const ChunkKey &key)
bool hasDataWrapperForChunk(const ChunkKey &chunk_key) const
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
std::map< ChunkKey, std::set< ParallelismHint > > parallelism_hints_per_table_
std::unique_lock< T > unique_lock
#define CHUNK_KEY_TABLE_IDX
bool has_table_prefix(const ChunkKey &key)
An AbstractBuffer is a unit of data management for a data manager.
size_t getAllocated() override
std::string getStringMgrType() override
const ForeignTable * foreign_table_
bool fragment_maps_to_leaf(const ChunkKey &key)
bool is_shardable_key(const ChunkKey &key)
size_t getMaxSize() override
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
size_t get_max_chunk_size(const ChunkKey &key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
ChunkToBufferMap allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
#define CHUNK_KEY_VARLEN_IDX
virtual void eraseDataWrapper(const ChunkKey &table_key)
std::map< ChunkKey, std::shared_ptr< MockForeignDataWrapper > > mocked_wrapper_map_
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
Encoder * getEncoder() const
void setParallelismHints(const std::map< ChunkKey, std::set< ParallelismHint >> &hints_per_table)
void validateChunkSizes(const ChunkToBufferMap &buffers) const
void createDataWrapperUnlocked(int32_t db, int32_t tb)
virtual bool hasMaxFetchSize() const
MgrType getMgrType() override
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
void throwChunkSizeViolatedError(const int64_t actual_chunk_size, const int column_id=-1) const
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
size_t getInUseSize() override
#define CHUNK_KEY_COLUMN_IDX
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
void checkpoint() override
ChunkKey get_fragment_key(const ChunkKey &key)
static const std::string S3_STORAGE_TYPE
std::vector< ChunkKey > get_column_key_vec(const ChunkKey &destination_chunk_key)
std::shared_mutex data_wrapper_mutex_
virtual size_t maxFetchSize(int32_t db_id) const
std::string printSlabs() override
bool is_varlen_key(const ChunkKey &key)
bool isBufferOnDevice(const ChunkKey &chunk_key) override