27 namespace foreign_storage {
31 const size_t num_bytes) {
42 auto storage_type_entry = foreign_table->foreign_server->options.find(
44 CHECK(storage_type_entry != foreign_table->foreign_server->options.end());
45 bool is_s3_storage_type =
47 if (is_s3_storage_type) {
48 throw ForeignStorageException{
49 "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
50 "currently disabled."};
56 const size_t num_bytes) {
58 CHECK(destination_buffer);
78 chunk_keys.erase(chunk_key);
81 std::map<ChunkKey, AbstractBuffer*> optional_buffers;
85 auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
86 if (foreign_table->foreign_server->data_wrapper_type ==
90 std::set<ChunkKey> optional_chunk_keys;
96 for (
auto it = optional_chunk_keys.begin(); it != optional_chunk_keys.end();) {
98 it = optional_chunk_keys.erase(it);
104 if (optional_chunk_keys.size()) {
110 required_buffers[chunk_key] = destination_buffer;
112 getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
134 buffer->copyTo(destination_buffer, num_bytes);
146 throw ForeignStorageException{
147 "Query cannot be executed for foreign table because FSI is currently disabled."};
152 getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
156 const ChunkKey table_key{db_id, table_id};
165 return FOREIGN_STORAGE_MGR;
169 return ToString(FOREIGN_STORAGE_MGR);
189 std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
204 auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
206 foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
213 const bool evict_cached_entries) {
223 std::numeric_limits<int>::max()};
261 const size_t page_size,
262 const size_t initial_size) {
269 const size_t num_bytes) {
330 std::set<ChunkKey> chunk_keys;
337 auto foreign_table = catalog->getForeignTableUnlocked(table_id);
339 ForeignTableSchema schema{db_id, foreign_table};
340 auto logical_column = schema.getLogicalColumn(destination_column_id);
341 auto logical_column_id = logical_column->columnId;
343 for (
auto column_id = logical_column_id;
344 column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
346 auto column = schema.getColumnDescriptor(column_id);
347 if (column->columnType.is_varlen_indeed()) {
348 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
349 chunk_keys.emplace(data_chunk_key);
351 ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
352 chunk_keys.emplace(index_chunk_key);
354 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
355 chunk_keys.emplace(data_chunk_key);
362 std::vector<ChunkKey> chunk_keys;
369 auto foreign_table = catalog->getForeignTableUnlocked(table_id);
371 ForeignTableSchema schema{db_id, foreign_table};
372 auto logical_column = schema.getLogicalColumn(destination_column_id);
373 auto logical_column_id = logical_column->columnId;
375 for (
auto column_id = logical_column_id;
376 column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
378 auto column = schema.getColumnDescriptor(column_id);
379 if (column->columnType.is_varlen_indeed()) {
380 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
381 chunk_keys.emplace_back(data_chunk_key);
383 ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
384 chunk_keys.emplace_back(index_chunk_key);
386 ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
387 chunk_keys.emplace_back(data_chunk_key);
394 const std::set<ChunkKey>& chunk_keys) {
395 std::map<ChunkKey, AbstractBuffer*> chunk_buffer_map;
397 for (
const auto& chunk_key : chunk_keys) {
400 chunk_buffer_map[chunk_key]->resetToEmpty();
402 return chunk_buffer_map;
406 std::map<
ChunkKey, std::vector<int> >& columns_per_table) {
412 std::set<ChunkKey>& optional_chunk_keys,
414 const std::set<ChunkKey>& required_chunk_keys) {
418 optional_chunk_key.push_back(column_id);
421 for (
const auto& key : keys) {
422 if (required_chunk_keys.find(key) == required_chunk_keys.end()) {
423 optional_chunk_keys.insert(key);
void getOptionalChunkKeySet(std::set< ChunkKey > &optional_chunk_keys, const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys)
void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey &table_key)
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
std::vector< int > ChunkKey
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
AbstractBuffer * alloc(const size_t num_bytes) override
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_table_key(const ChunkKey &key)
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
#define CHUNK_KEY_FRAGMENT_IDX
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
size_t getNumChunks() override
void free(AbstractBuffer *buffer) override
void clearSlabs() override
void removeTableRelatedDS(const int db_id, const int table_id) override
ChunkKey get_table_key(const ChunkKey &key)
bool isAllocationCapped() override
static SysCatalog & instance()
static const std::string STORAGE_TYPE_KEY
bool is_varlen_index_key(const ChunkKey &key)
void createAndPopulateDataWrapperIfNotExists(const ChunkKey &chunk_key)
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
#define CHUNK_KEY_TABLE_IDX
std::map< ChunkKey, std::vector< int > > columns_hints_per_table_
bool has_table_prefix(const ChunkKey &key)
An AbstractBuffer is a unit of data management for a data manager.
size_t getAllocated() override
std::string getStringMgrType() override
size_t getMaxSize() override
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
bool hasDataWrapperForChunk(const ChunkKey &chunk_key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
std::shared_mutex columns_hints_mutex_
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
void setColumnHints(std::map< ChunkKey, std::vector< int >> &columns_per_table)
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
static constexpr char const * CSV
MgrType getMgrType() override
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
size_t getInUseSize() override
#define CHUNK_KEY_COLUMN_IDX
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
void checkpoint() override
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
static const std::string S3_STORAGE_TYPE
std::shared_mutex data_wrapper_mutex_
std::string printSlabs() override
std::map< ChunkKey, AbstractBuffer * > allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
bool isBufferOnDevice(const ChunkKey &chunk_key) override