OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
InternalSystemDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Catalog/Catalog.h"
20 #include "Catalog/SysCatalog.h"
22 #include "ForeignTableSchema.h"
23 #include "FsiChunkUtils.h"
24 #include "ImportExport/Importer.h"
25 #include "Shared/SysDefinitions.h"
26 #include "Shared/distributed.h"
27 #include "TextFileBufferParser.h"
28 #include "UserMapping.h"
29 
30 namespace foreign_storage {
// Resolves a database id to its name using the SysCatalog singleton.
// When SysCatalog::getMetadataForDBById() reports success, the dbName stored in
// the filled-in metadata is returned.
31 std::string get_db_name(int32_t db_id) {
33  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
34  if (sys_catalog.getMetadataForDBById(db_id, db_metadata)) {
35  return db_metadata.dbName;
36  } else {
37  // Database has been deleted.
// NOTE(review): the declaration of db_metadata and the statement for this branch
// are not visible here; presumably a "deleted" placeholder value is returned --
// confirm against the full file.
39  }
40 }
41 
// Resolves a table id to its table name. The catalog's getTableName() returns an
// optional-like value; when it holds a value, that name is returned.
// NOTE(review): how `catalog` is obtained from db_id is not visible here --
// confirm against the full file.
42 std::string get_table_name(int32_t db_id, int32_t table_id) {
// A missing catalog is treated as a fatal invariant violation.
44  CHECK(catalog);
45  auto table_name = catalog->getTableName(table_id);
46  if (table_name.has_value()) {
47  return table_name.value();
48  } else {
49  // It is possible for the table to be concurrently deleted while querying the system
50  // table.
// NOTE(review): the statement for this branch is not visible here; presumably a
// "deleted" placeholder value is returned -- confirm against the full file.
52  }
53 }
54 
56  std::map<std::string, import_export::TypedImportBuffer*>& import_buffers) {
// Nothing is written unless the caller supplied an import buffer keyed "node".
57  if (import_buffers.find("node") != import_buffers.end()) {
58  if (dist::is_leaf_node()) {
// On a leaf node the value is "Leaf " followed by g_distributed_leaf_idx.
59  std::string leaf_string{"Leaf " + to_string(g_distributed_leaf_idx)};
60  import_buffers["node"]->addString(leaf_string);
61  } else {
// On any non-leaf node the fixed string "Server" is used.
62  import_buffers["node"]->addString("Server");
63  }
64  }
65 }
66 
68  : db_id_(-1), foreign_table_(nullptr) {}
69 
71  const ForeignTable* foreign_table)
72  : db_id_(db_id), foreign_table_(foreign_table) {}
73 
75  const ForeignServer* foreign_server) const {
// The foreign server is expected to carry no options at all; a non-empty options
// container trips the CHECK.
76  CHECK(foreign_server->options.empty());
77 }
78 
80  const ForeignTable* foreign_table) const {}
81 
82 const std::set<std::string_view>& InternalSystemDataWrapper::getSupportedTableOptions()
83  const {
84  static const std::set<std::string_view> supported_table_options{};
85  return supported_table_options;
86 }
87 
89  const UserMapping* user_mapping,
90  const ForeignServer* foreign_server) const {
// The user mapping is expected to carry no options; a non-empty options container
// trips the CHECK. foreign_server is not consulted.
91  CHECK(user_mapping->options.empty());
92 }
93 
// Returns a reference to a function-local static set of supported user mapping
// option names; the set is empty.
94 const std::set<std::string_view>&
96  static const std::set<std::string_view> supported_user_mapping_options{};
97  return supported_user_mapping_options;
98 }
99 
100 namespace {
101 void initialize_chunks(std::map<ChunkKey, Chunk_NS::Chunk>& chunks,
102  const ChunkToBufferMap& buffers,
103  size_t row_count,
104  std::set<const ColumnDescriptor*>& columns_to_parse,
105  int32_t fragment_id,
106  const Catalog_Namespace::Catalog& catalog) {
107  for (auto& [chunk_key, buffer] : buffers) {
108  CHECK_EQ(fragment_id, chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
109  const auto column = catalog.getMetadataForColumn(chunk_key[CHUNK_KEY_TABLE_IDX],
110  chunk_key[CHUNK_KEY_COLUMN_IDX]);
111  if (is_varlen_index_key(chunk_key)) {
112  continue;
113  }
114  chunks[chunk_key] = Chunk_NS::Chunk{column};
115  if (column->columnType.is_varlen_indeed()) {
116  CHECK(is_varlen_data_key(chunk_key));
117  size_t index_offset_size{0};
118  if (column->columnType.is_string()) {
119  index_offset_size = sizeof(StringOffsetT);
120  } else if (column->columnType.is_array()) {
121  index_offset_size = sizeof(ArrayOffsetT);
122  } else {
123  UNREACHABLE() << "Unexpected column type: " << column->columnType.to_string();
124  }
125  ChunkKey index_chunk_key = chunk_key;
126  index_chunk_key[CHUNK_KEY_VARLEN_IDX] = 2;
127  CHECK(buffers.find(index_chunk_key) != buffers.end());
128  AbstractBuffer* index_buffer = buffers.find(index_chunk_key)->second;
129  index_buffer->reserve(index_offset_size * row_count + 1);
130  chunks[chunk_key].setIndexBuffer(index_buffer);
131  }
132 
133  if (!column->columnType.is_varlen_indeed()) {
134  buffer->reserve(column->columnType.get_size() * row_count);
135  }
136  chunks[chunk_key].setBuffer(buffer);
137  chunks[chunk_key].initEncoder();
138  columns_to_parse.emplace(column);
139  }
140 }
141 
143  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
144  std::map<std::string, import_export::TypedImportBuffer*>& import_buffers_map,
145  const std::set<const ColumnDescriptor*>& columns_to_parse,
146  const Catalog_Namespace::Catalog& catalog) {
// Creates one TypedImportBuffer per column in columns_to_parse. Ownership stays in
// import_buffers; import_buffers_map stores a raw pointer per column name.
147  for (const auto column : columns_to_parse) {
// Stays null unless the column is dictionary encoded (see the condition below).
148  StringDictionary* string_dictionary = nullptr;
// Dictionary-encoded string columns, and arrays whose element type is a string
// with dictionary compression, are given the column's string dictionary.
149  if (column->columnType.is_dict_encoded_string() ||
150  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
151  column->columnType.get_compression() == kENCODING_DICT)) {
152  auto dict_descriptor =
153  catalog.getMetadataForDict(column->columnType.get_comp_param(), true);
// NOTE(review): dict_descriptor is dereferenced without a null check -- confirm
// that getMetadataForDict cannot return null for these columns.
154  string_dictionary = dict_descriptor->stringDict.get();
155  }
156  import_buffers.emplace_back(
157  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
// back() is the buffer just appended; the map only borrows the pointer.
158  import_buffers_map[column->columnName] = import_buffers.back().get();
159  }
160 }
161 } // namespace
162 
164  ChunkMetadataVector& chunk_metadata_vector) {
// Appends one placeholder chunk metadata entry per logical column of the table.
// NOTE(review): several statements of this function (where row_count_ and
// `schema` are set up, and the start of the throw below) are not visible here --
// confirm against the full file.
165  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
166  auto catalog = sys_catalog.getCatalog(db_id_);
167  CHECK(catalog);
// The catalog for db_id_ must be the one named shared::kInfoSchemaDbName.
168  CHECK_EQ(catalog->name(), shared::kInfoSchemaDbName);
169 
// A row count larger than the table's maxFragRows is reported as an error.
171  if (row_count_ > static_cast<size_t>(foreign_table_->maxFragRows)) {
173  "System table size exceeds the maximum supported size."};
174  }
176  for (auto column : schema.getLogicalColumns()) {
// Key layout: {db id, table id, column id, 0}; the trailing 0 is presumably the
// fragment id (populateChunkBuffers expects fragment 0) -- TODO confirm.
177  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column->columnId, 0};
178  if (column->columnType.is_varlen_indeed()) {
// Variable-length columns get an extra key component of 1; presumably this marks
// the data chunk rather than the index chunk -- TODO confirm.
179  chunk_key.emplace_back(1);
180  }
181  chunk_metadata_vector.emplace_back(
182  chunk_key, get_placeholder_metadata(column->columnType, row_count_));
183  }
184 }
185 
187  const ChunkToBufferMap& required_buffers,
188  const ChunkToBufferMap& optional_buffers,
189  AbstractBuffer* delete_buffer) {
// Fills the caller-provided required_buffers with the table's data.
// NOTE(review): the lines that begin several calls below (the calls taking the
// argument lists on source lines 205 and 211, the step between them, and the
// initializer of column_id_to_data_blocks_map) are not visible here -- confirm
// against the full file.
190  auto timer = DEBUG_TIMER(__func__);
// Optional buffers are not supported by this wrapper; any entry trips the CHECK.
191  CHECK(optional_buffers.empty());
192 
193  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
194  auto catalog = sys_catalog.getCatalog(db_id_);
195  CHECK(catalog);
// The catalog for db_id_ must be the one named shared::kInfoSchemaDbName.
196  CHECK_EQ(catalog->name(), shared::kInfoSchemaDbName);
197 
// NOTE(review): begin() is dereferenced without checking that required_buffers is
// non-empty -- confirm callers always pass at least one buffer.
198  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
199  CHECK_EQ(fragment_id, 0)
200  << "In-memory system tables are expected to have a single fragment.";
201 
202  std::map<ChunkKey, Chunk_NS::Chunk> chunks;
203  std::set<const ColumnDescriptor*> columns_to_parse;
205  chunks, required_buffers, row_count_, columns_to_parse, fragment_id, *catalog);
206 
207  // initialize import buffers from columns.
208  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
209  std::map<std::string, import_export::TypedImportBuffer*> import_buffers_map;
211  import_buffers, import_buffers_map, columns_to_parse, *catalog);
213 
214  auto column_id_to_data_blocks_map =
// Append each column's data block to its chunk; blocks are looked up by column id.
216  for (auto& [chunk_key, chunk] : chunks) {
217  auto data_block_entry =
218  column_id_to_data_blocks_map.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
219  CHECK(data_block_entry != column_id_to_data_blocks_map.end());
220  chunk.appendData(data_block_entry->second, row_count_, 0);
// Clear the chunk's buffer pointers after appending; presumably so the local
// Chunk objects do not keep referring to caller-owned buffers -- TODO confirm.
221  chunk.setBuffer(nullptr);
222  chunk.setIndexBuffer(nullptr);
223  }
224 }
225 
227  return {};
228 }
229 
231  const std::string& file_path,
232  const ChunkMetadataVector& chunk_metadata) {}
233 
235  return false;
236 }
237 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:297
std::vector< int > ChunkKey
Definition: types.h:36
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::string tableName
virtual void initializeObjectsForTable(const std::string &table_name)=0
bool is_varlen_data_key(const ChunkKey &key)
Definition: types.h:75
constexpr const char * kDeletedValueIndicator
const std::set< std::string_view > & getSupportedTableOptions() const override
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
#define UNREACHABLE()
Definition: Logger.h:333
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
const std::string kInfoSchemaDbName
std::string to_string(char const *&&v)
int32_t StringOffsetT
Definition: sqltypes.h:1232
void validateTableOptions(const ForeignTable *foreign_table) const override
This file contains the class specification and related data structures for Catalog.
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const SQLTypeInfo &type, size_t num_elements)
static SysCatalog & instance()
Definition: SysCatalog.h:341
This file contains the class specification and related data structures for SysCatalog.
void validateUserMappingOptions(const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
void initialize_chunks(std::map< ChunkKey, Chunk_NS::Chunk > &chunks, const ChunkToBufferMap &buffers, size_t row_count, std::set< const ColumnDescriptor * > &columns_to_parse, int32_t fragment_id, const Catalog_Namespace::Catalog &catalog)
bool is_varlen_index_key(const ChunkKey &key)
Definition: types.h:79
void set_node_name(std::map< std::string, import_export::TypedImportBuffer * > &import_buffers)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
bool is_leaf_node()
Definition: distributed.cpp:29
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void populateChunkBuffersForTable(const std::string &table_name, std::map< std::string, import_export::TypedImportBuffer * > &import_buffers)=0
An AbstractBuffer is a unit of data management for a data manager.
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1986
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
int32_t g_distributed_leaf_idx
Definition: Catalog.cpp:98
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
int32_t ArrayOffsetT
Definition: sqltypes.h:1233
void validateServerOptions(const ForeignServer *foreign_server) const override
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:42
#define IS_STRING(T)
Definition: sqltypes.h:297
#define CHECK(condition)
Definition: Logger.h:289
#define DEBUG_TIMER(name)
Definition: Logger.h:407
std::string get_db_name(int32_t db_id)
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
const std::set< std::string_view > & getSupportedUserMappingOptions() const override
void initialize_import_buffers(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, std::map< std::string, import_export::TypedImportBuffer * > &import_buffers_map, const std::set< const ColumnDescriptor * > &columns_to_parse, const Catalog_Namespace::Catalog &catalog)
virtual void reserve(size_t num_bytes)=0
std::string get_table_name(int32_t db_id, int32_t table_id)