OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
InternalSystemDataWrapper.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Catalog/Catalog.h"
20 #include "Catalog/SysCatalog.h"
22 #include "ForeignTableSchema.h"
23 #include "FsiChunkUtils.h"
24 #include "ImportExport/Importer.h"
25 #include "Shared/SysDefinitions.h"
26 #include "Shared/distributed.h"
27 #include "TextFileBufferParser.h"
28 #include "UserMapping.h"
29 
30 namespace foreign_storage {
31 std::string get_db_name(int32_t db_id) {
33  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
34  if (sys_catalog.getMetadataForDBById(db_id, db_metadata)) {
35  return db_metadata.dbName;
36  } else {
37  // Database has been deleted.
39  }
40 }
41 
42 std::string get_table_name(int32_t db_id, int32_t table_id) {
44  CHECK(catalog);
45  auto table_name = catalog->getTableName(table_id);
46  if (table_name.has_value()) {
47  return table_name.value();
48  } else {
49  // It is possible for the table to be concurrently deleted while querying the system
50  // table.
52  }
53 }
54 
56  std::map<std::string, import_export::TypedImportBuffer*>& import_buffers) {
57  if (import_buffers.find("node") != import_buffers.end()) {
58  if (dist::is_leaf_node()) {
59  std::string leaf_string{"Leaf " + to_string(g_distributed_leaf_idx)};
60  import_buffers["node"]->addString(leaf_string);
61  } else {
62  import_buffers["node"]->addString("Server");
63  }
64  }
65 }
66 
68  : db_id_(-1), foreign_table_(nullptr) {}
69 
71  const ForeignTable* foreign_table)
72  : db_id_(db_id), foreign_table_(foreign_table) {}
73 
75  const ForeignServer* foreign_server) const {
76  CHECK(foreign_server->options.empty());
77 }
78 
80  const ForeignTable* foreign_table) const {}
81 
82 const std::set<std::string_view>& InternalSystemDataWrapper::getSupportedTableOptions()
83  const {
84  static const std::set<std::string_view> supported_table_options{};
85  return supported_table_options;
86 }
87 
89  const UserMapping* user_mapping,
90  const ForeignServer* foreign_server) const {
91  CHECK(user_mapping->options.empty());
92 }
93 
94 const std::set<std::string_view>&
96  static const std::set<std::string_view> supported_user_mapping_options{};
97  return supported_user_mapping_options;
98 }
99 
100 namespace {
101 void initialize_chunks(std::map<ChunkKey, Chunk_NS::Chunk>& chunks,
102  const ChunkToBufferMap& buffers,
103  size_t row_count,
104  std::set<const ColumnDescriptor*>& columns_to_parse,
105  int32_t fragment_id,
106  const Catalog_Namespace::Catalog& catalog) {
107  for (auto& [chunk_key, buffer] : buffers) {
108  CHECK_EQ(fragment_id, chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
109  const auto column = catalog.getMetadataForColumn(chunk_key[CHUNK_KEY_TABLE_IDX],
110  chunk_key[CHUNK_KEY_COLUMN_IDX]);
111  if (is_varlen_index_key(chunk_key)) {
112  continue;
113  }
114  chunks[chunk_key] = Chunk_NS::Chunk{column};
115  if (column->columnType.is_varlen_indeed()) {
116  CHECK(is_varlen_data_key(chunk_key));
117  size_t index_offset_size{0};
118  if (column->columnType.is_string()) {
119  index_offset_size = sizeof(StringOffsetT);
120  } else if (column->columnType.is_array()) {
121  index_offset_size = sizeof(ArrayOffsetT);
122  } else {
123  UNREACHABLE() << "Unexpected column type: " << column->columnType.to_string();
124  }
125  ChunkKey index_chunk_key = chunk_key;
126  index_chunk_key[CHUNK_KEY_VARLEN_IDX] = 2;
127  CHECK(buffers.find(index_chunk_key) != buffers.end());
128  AbstractBuffer* index_buffer = buffers.find(index_chunk_key)->second;
129  index_buffer->reserve(index_offset_size * row_count + 1);
130  chunks[chunk_key].setIndexBuffer(index_buffer);
131  }
132 
133  if (!column->columnType.is_varlen_indeed()) {
134  buffer->reserve(column->columnType.get_size() * row_count);
135  }
136  chunks[chunk_key].setBuffer(buffer);
137  chunks[chunk_key].initEncoder();
138  columns_to_parse.emplace(column);
139  }
140 }
141 
143  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
144  std::map<std::string, import_export::TypedImportBuffer*>& import_buffers_map,
145  const std::set<const ColumnDescriptor*>& columns_to_parse,
146  const Catalog_Namespace::Catalog& catalog) {
147  for (const auto column : columns_to_parse) {
148  StringDictionary* string_dictionary = nullptr;
149  if (column->columnType.is_dict_encoded_string() ||
150  (column->columnType.is_array() && IS_STRING(column->columnType.get_subtype()) &&
151  column->columnType.get_compression() == kENCODING_DICT)) {
152  auto dict_descriptor =
153  catalog.getMetadataForDict(column->columnType.get_comp_param(), true);
154  string_dictionary = dict_descriptor->stringDict.get();
155  }
156  import_buffers.emplace_back(
157  std::make_unique<import_export::TypedImportBuffer>(column, string_dictionary));
158  import_buffers_map[column->columnName] = import_buffers.back().get();
159  }
160 }
161 } // namespace
162 
164  ChunkMetadataVector& chunk_metadata_vector) {
165  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
166  auto catalog = sys_catalog.getCatalog(db_id_);
167  CHECK(catalog);
168  CHECK_EQ(catalog->name(), shared::kInfoSchemaDbName);
169 
171  if (row_count_ > static_cast<size_t>(foreign_table_->maxFragRows)) {
173  "System table size exceeds the maximum supported size."};
174  }
176  for (auto column : schema.getLogicalColumns()) {
177  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column->columnId, 0};
178  if (column->columnType.is_varlen_indeed()) {
179  chunk_key.emplace_back(1);
180  }
181  ForeignStorageBuffer empty_buffer;
182  // Use default encoder metadata
183  empty_buffer.initEncoder(column->columnType);
184  auto chunk_metadata = empty_buffer.getEncoder()->getMetadata(column->columnType);
185  chunk_metadata->numElements = row_count_;
186  if (!column->columnType.is_varlen_indeed()) {
187  chunk_metadata->numBytes = column->columnType.get_size() * row_count_;
188  }
189  if (column->columnType.is_array()) {
190  ForeignStorageBuffer scalar_buffer;
191  scalar_buffer.initEncoder(column->columnType.get_elem_type());
192  auto scalar_metadata =
193  scalar_buffer.getEncoder()->getMetadata(column->columnType.get_elem_type());
194  chunk_metadata->chunkStats.min = scalar_metadata->chunkStats.min;
195  chunk_metadata->chunkStats.max = scalar_metadata->chunkStats.max;
196  }
197  chunk_metadata->chunkStats.has_nulls = true;
198  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
199  }
200 }
201 
203  const ChunkToBufferMap& required_buffers,
204  const ChunkToBufferMap& optional_buffers,
205  AbstractBuffer* delete_buffer) {
206  auto timer = DEBUG_TIMER(__func__);
207  CHECK(optional_buffers.empty());
208 
209  auto& sys_catalog = Catalog_Namespace::SysCatalog::instance();
210  auto catalog = sys_catalog.getCatalog(db_id_);
211  CHECK(catalog);
212  CHECK_EQ(catalog->name(), shared::kInfoSchemaDbName);
213 
214  auto fragment_id = required_buffers.begin()->first[CHUNK_KEY_FRAGMENT_IDX];
215  CHECK_EQ(fragment_id, 0)
216  << "In-memory system tables are expected to have a single fragment.";
217 
218  std::map<ChunkKey, Chunk_NS::Chunk> chunks;
219  std::set<const ColumnDescriptor*> columns_to_parse;
221  chunks, required_buffers, row_count_, columns_to_parse, fragment_id, *catalog);
222 
223  // initialize import buffers from columns.
224  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
225  std::map<std::string, import_export::TypedImportBuffer*> import_buffers_map;
227  import_buffers, import_buffers_map, columns_to_parse, *catalog);
229 
230  auto column_id_to_data_blocks_map =
232  for (auto& [chunk_key, chunk] : chunks) {
233  auto data_block_entry =
234  column_id_to_data_blocks_map.find(chunk_key[CHUNK_KEY_COLUMN_IDX]);
235  CHECK(data_block_entry != column_id_to_data_blocks_map.end());
236  chunk.appendData(data_block_entry->second, row_count_, 0);
237  chunk.setBuffer(nullptr);
238  chunk.setIndexBuffer(nullptr);
239  }
240 }
241 
243  return {};
244 }
245 
247  const std::string& file_path,
248  const ChunkMetadataVector& chunk_metadata) {}
249 
251  return false;
252 }
253 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::vector< int > ChunkKey
Definition: types.h:37
static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks(const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:114
std::string tableName
virtual void initializeObjectsForTable(const std::string &table_name)=0
bool is_varlen_data_key(const ChunkKey &key)
Definition: types.h:76
constexpr const char * kDeletedValueIndicator
const std::set< std::string_view > & getSupportedTableOptions() const override
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
#define UNREACHABLE()
Definition: Logger.h:267
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
const std::string kInfoSchemaDbName
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string to_string(char const *&&v)
int32_t StringOffsetT
Definition: sqltypes.h:1113
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:227
void validateTableOptions(const ForeignTable *foreign_table) const override
This file contains the class specification and related data structures for Catalog.
void restoreDataWrapperInternals(const std::string &file_path, const ChunkMetadataVector &chunk_metadata) override
static SysCatalog & instance()
Definition: SysCatalog.h:337
This file contains the class specification and related data structures for SysCatalog.
void validateUserMappingOptions(const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
void initialize_chunks(std::map< ChunkKey, Chunk_NS::Chunk > &chunks, const ChunkToBufferMap &buffers, size_t row_count, std::set< const ColumnDescriptor * > &columns_to_parse, int32_t fragment_id, const Catalog_Namespace::Catalog &catalog)
bool is_varlen_index_key(const ChunkKey &key)
Definition: types.h:80
void set_node_name(std::map< std::string, import_export::TypedImportBuffer * > &import_buffers)
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
bool is_leaf_node()
Definition: distributed.cpp:29
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void populateChunkBuffersForTable(const std::string &table_name, std::map< std::string, import_export::TypedImportBuffer * > &import_buffers)=0
An AbstractBuffer is a unit of data management for a data manager.
const DictDescriptor * getMetadataForDict(int dict_ref, bool loadDict=true) const
Definition: Catalog.cpp:1673
void populateChunkMetadata(ChunkMetadataVector &chunk_metadata_vector) override
void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
int32_t g_distributed_leaf_idx
Definition: Catalog.cpp:99
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
int32_t ArrayOffsetT
Definition: sqltypes.h:1114
void validateServerOptions(const ForeignServer *foreign_server) const override
#define CHUNK_KEY_VARLEN_IDX
Definition: types.h:43
#define IS_STRING(T)
Definition: sqltypes.h:250
#define CHECK(condition)
Definition: Logger.h:223
#define DEBUG_TIMER(name)
Definition: Logger.h:370
std::string get_db_name(int32_t db_id)
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
const std::set< std::string_view > & getSupportedUserMappingOptions() const override
void initialize_import_buffers(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, std::map< std::string, import_export::TypedImportBuffer * > &import_buffers_map, const std::set< const ColumnDescriptor * > &columns_to_parse, const Catalog_Namespace::Catalog &catalog)
virtual void reserve(size_t num_bytes)=0
std::string get_table_name(int32_t db_id, int32_t table_id)