OmniSciDB  a575cb28ea
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignStorageMgr.h"
18 
22 #include "ForeignTableSchema.h"
23 
24 extern bool g_enable_fsi;
25 extern bool g_enable_s3_fsi;
26 
27 namespace foreign_storage {
28 ForeignStorageMgr::ForeignStorageMgr() : AbstractBufferMgr(0), data_wrapper_map_({}) {}
29 
31  const size_t num_bytes) {
32  UNREACHABLE();
33  return nullptr; // Added to avoid "no return statement" compiler warning
34 }
35 
37  CHECK(has_table_prefix(chunk_key));
38  auto catalog =
40  CHECK(catalog);
41  auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
42  auto storage_type_entry = foreign_table->foreign_server->options.find(
44  CHECK(storage_type_entry != foreign_table->foreign_server->options.end());
45  bool is_s3_storage_type =
46  (storage_type_entry->second == AbstractFileStorageDataWrapper::S3_STORAGE_TYPE);
47  if (is_s3_storage_type) {
48  throw ForeignStorageException{
49  "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
50  "currently disabled."};
51  }
52 }
53 
// Fetches the chunk identified by chunk_key into destination_buffer.
//
// Visible flow:
//  1. Calls checkIfS3NeedsToBeEnabled() and asserts that destination_buffer is non-null
//     and not dirty.
//  2. If a temp buffer is already mapped for chunk_key, its contents are copied into
//     destination_buffer and the function returns.
//  3. Otherwise temp buffers are allocated for the other keys returned by
//     get_keys_set_from_table(chunk_key), optional buffers may be allocated from column
//     hints, and the table's data wrapper is asked to populate all of them, writing the
//     requested chunk straight into destination_buffer.
//
// NOTE(review): the embedded listing numbers skip 68, 70, 74 and 91 inside this
// function, so some statements are not visible here. The comments below describe only
// the code that is shown.
54 void ForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
55  AbstractBuffer* destination_buffer,
56  const size_t num_bytes) {
57  checkIfS3NeedsToBeEnabled(chunk_key);
58  CHECK(destination_buffer);
59  CHECK(!destination_buffer->isDirty());
60  // Use a temp buffer if we have no cache buffers and have one mapped for this chunk.
61  if (fetchBufferIfTempBufferMapEntryExists(chunk_key, destination_buffer, num_bytes)) {
62  return;
63  }
64 
65  { // Clear any temp buffers if we've moved on to a new fragment
// Exclusive lock on the temp buffer map for the duration of this scope.
66  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
// NOTE(review): the left-hand side of the fragment comparison (listing line 68) and
// the body of this `if` (listing line 70) are not visible in this listing.
67  if (temp_chunk_buffer_map_.size() > 0 &&
69  chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
71  }
72  }
73 
75 
76  // TODO: Populate optional buffers as part of CSV performance improvement
// Keys returned for the requested chunk's column group, minus the requested chunk
// itself (which is populated directly into destination_buffer further down).
77  std::set<ChunkKey> chunk_keys = get_keys_set_from_table(chunk_key);
78  chunk_keys.erase(chunk_key);
79 
80  // Use hints to prefetch other chunks in fragment
81  std::map<ChunkKey, AbstractBuffer*> optional_buffers;
82  auto catalog =
83  Catalog_Namespace::SysCatalog::instance().getCatalog(chunk_key[CHUNK_KEY_DB_IDX]);
84  CHECK(catalog);
85  auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
// NOTE(review): the trailing comment says this prefetch is only useful for column
// oriented formats, yet the condition selects the CSV wrapper type. Confirm which
// wrapper type is intended here.
86  if (foreign_table->foreign_server->data_wrapper_type ==
87  foreign_storage::DataWrapperType::CSV) // optimization only useful for column
88  // oriented formats
89  {
90  std::set<ChunkKey> optional_chunk_keys;
// NOTE(review): the callee name (listing line 91) is not visible; only its argument
// list is shown. get_keys_set_from_table(chunk_key) is evaluated a second time here.
92  optional_chunk_keys, chunk_key, get_keys_set_from_table(chunk_key));
93  {
94  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
95  // Erase anything already in temp_chunk_buffer_map_
96  for (auto it = optional_chunk_keys.begin(); it != optional_chunk_keys.end();) {
97  if (temp_chunk_buffer_map_.find(*it) != temp_chunk_buffer_map_.end()) {
98  it = optional_chunk_keys.erase(it);
99  } else {
100  ++it;
101  }
102  }
103  }
// Only allocate temp buffers when at least one optional key survived the filtering.
104  if (optional_chunk_keys.size()) {
105  optional_buffers = allocateTempBuffersForChunks(optional_chunk_keys);
106  }
107  }
108 
// Temp buffers for the sibling keys; the requested key maps to the caller's buffer.
109  auto required_buffers = allocateTempBuffersForChunks(chunk_keys);
110  required_buffers[chunk_key] = destination_buffer;
111  // populate will write directly to destination_buffer so no need to copy.
112  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
113 }
114 
// Parameter list and body of fetchBufferIfTempBufferMapEntryExists (the line carrying
// the function name is not visible in this listing).
//
// If temp_chunk_buffer_map_ holds a buffer for chunk_key, copies it into
// destination_buffer, erases the map entry and returns true. Returns false, without
// touching destination_buffer, when no entry exists.
116  const ChunkKey& chunk_key,
117  AbstractBuffer* destination_buffer,
118  size_t num_bytes) {
119  AbstractBuffer* buffer{nullptr};
120  {
// Lookup is done under a shared (reader) lock that ends with this scope.
121  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
122  if (temp_chunk_buffer_map_.find(chunk_key) == temp_chunk_buffer_map_.end()) {
123  return false;
124  }
// The entry was just found above, so operator[] is not expected to insert here.
125  buffer = temp_chunk_buffer_map_[chunk_key].get();
126  }
127  // For the index key, calls with size 0 get 1 added as
128  // empty index buffers start with one entry
129  // Set to 0 here to copy entire buffer
130  if (is_varlen_index_key(chunk_key) && (num_bytes == sizeof(StringOffsetT))) {
131  num_bytes = 0;
132  }
133  CHECK(buffer);
// NOTE(review): `buffer` is a raw pointer into the map, and it is dereferenced here
// after the shared lock has been released. Confirm that no other thread can erase or
// replace this entry between the lookup and the copy.
134  buffer->copyTo(destination_buffer, num_bytes);
135  {
// Exclusive lock for the erase; the temp buffer is dropped once it has been copied.
136  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
137  temp_chunk_buffer_map_.erase(chunk_key);
138  }
139  return true;
140 }
141 
143  ChunkMetadataVector& chunk_metadata,
144  const ChunkKey& key_prefix) {
145  if (!g_enable_fsi) {
146  throw ForeignStorageException{
147  "Query cannot be executed for foreign table because FSI is currently disabled."};
148  }
149  CHECK(is_table_key(key_prefix));
150  checkIfS3NeedsToBeEnabled(key_prefix);
151  createDataWrapperIfNotExists(key_prefix);
152  getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
153 }
154 
// Drops the data wrapper registered for the table identified by (db_id, table_id).
// NOTE(review): listing line 161 (between the lock scope and the closing brace) is not
// visible here, so any further cleanup done by this function is not shown.
155 void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
156  const ChunkKey table_key{db_id, table_id};
157  {
// Exclusive lock held only while the map entry is erased.
158  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
159  data_wrapper_map_.erase(table_key);
160  }
162 }
163 
165  return FOREIGN_STORAGE_MGR;
166 }
167 
169  return ToString(FOREIGN_STORAGE_MGR);
170 }
171 
173  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
174  CHECK(has_table_prefix(chunk_key));
175  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
176  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
177 }
178 
179 std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
180  const ChunkKey& chunk_key) {
181  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
182  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
183  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
184  return data_wrapper_map_[table_key];
185 }
186 
188  const ChunkKey& table_key,
189  std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
190  CHECK(is_table_key(table_key));
191  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
192  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
193  data_wrapper->setParentWrapper(data_wrapper_map_[table_key]);
194  data_wrapper_map_[table_key] = data_wrapper;
195 }
196 
198  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
199  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
200  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
201  auto db_id = chunk_key[CHUNK_KEY_DB_IDX];
202  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
203  CHECK(catalog);
204  auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
206  foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
207  return true;
208  }
209  return false;
210 }
211 
212 void ForeignStorageMgr::refreshTable(const ChunkKey& table_key,
213  const bool evict_cached_entries) {
214  // Noop - If the cache is not enabled then a refresh does nothing.
215 }
216 
218  const ChunkKey& table_key) {
219  CHECK(is_table_key(table_key));
220  auto start_it = temp_chunk_buffer_map_.lower_bound(table_key);
221  ChunkKey upper_bound_prefix{table_key[CHUNK_KEY_DB_IDX],
222  table_key[CHUNK_KEY_TABLE_IDX],
223  std::numeric_limits<int>::max()};
224  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
225  temp_chunk_buffer_map_.erase(start_it, end_it);
226 }
227 
229  const ChunkKey& table_key) {
230  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
232 }
233 
234 bool ForeignStorageMgr::isDatawrapperRestored(const ChunkKey& chunk_key) {
235  if (!hasDataWrapperForChunk(chunk_key)) {
236  return false;
237  }
238  return getDataWrapper(chunk_key)->isRestored();
239 }
240 
241 void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
242  UNREACHABLE();
243 }
244 
245 void ForeignStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
246  const bool purge) {
247  UNREACHABLE();
248 }
249 
250 bool ForeignStorageMgr::isBufferOnDevice(const ChunkKey& chunk_key) {
251  UNREACHABLE();
252  return false; // Added to avoid "no return statement" compiler warning
253 }
254 
256  UNREACHABLE();
257  return 0; // Added to avoid "no return statement" compiler warning
258 }
259 
261  const size_t page_size,
262  const size_t initial_size) {
263  UNREACHABLE();
264  return nullptr; // Added to avoid "no return statement" compiler warning
265 }
266 
268  AbstractBuffer* source_buffer,
269  const size_t num_bytes) {
270  UNREACHABLE();
271  return nullptr; // Added to avoid "no return statement" compiler warning
272 }
273 
274 std::string ForeignStorageMgr::printSlabs() {
275  UNREACHABLE();
276  return {}; // Added to avoid "no return statement" compiler warning
277 }
278 
280  UNREACHABLE();
281 }
282 
284  UNREACHABLE();
285  return 0; // Added to avoid "no return statement" compiler warning
286 }
287 
289  UNREACHABLE();
290  return 0; // Added to avoid "no return statement" compiler warning
291 }
292 
294  UNREACHABLE();
295  return 0; // Added to avoid "no return statement" compiler warning
296 }
297 
299  UNREACHABLE();
300  return false; // Added to avoid "no return statement" compiler warning
301 }
302 
304  UNREACHABLE();
305 }
306 
307 void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
308  UNREACHABLE();
309 }
310 
311 AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
312  UNREACHABLE();
313  return nullptr; // Added to avoid "no return statement" compiler warning
314 }
315 
317  UNREACHABLE();
318 }
319 
321  const ChunkKey& chunk_key) {
322  ChunkKey table_key = get_table_key(chunk_key);
323  if (createDataWrapperIfNotExists(table_key)) {
324  ChunkMetadataVector chunk_metadata;
325  getDataWrapper(table_key)->populateChunkMetadata(chunk_metadata);
326  }
327 }
328 
329 std::set<ChunkKey> get_keys_set_from_table(const ChunkKey& destination_chunk_key) {
330  std::set<ChunkKey> chunk_keys;
331  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
332  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
333  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
334  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
335  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
336  CHECK(catalog);
337  auto foreign_table = catalog->getForeignTableUnlocked(table_id);
338 
339  ForeignTableSchema schema{db_id, foreign_table};
340  auto logical_column = schema.getLogicalColumn(destination_column_id);
341  auto logical_column_id = logical_column->columnId;
342 
343  for (auto column_id = logical_column_id;
344  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
345  column_id++) {
346  auto column = schema.getColumnDescriptor(column_id);
347  if (column->columnType.is_varlen_indeed()) {
348  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
349  chunk_keys.emplace(data_chunk_key);
350 
351  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
352  chunk_keys.emplace(index_chunk_key);
353  } else {
354  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
355  chunk_keys.emplace(data_chunk_key);
356  }
357  }
358  return chunk_keys;
359 }
360 
361 std::vector<ChunkKey> get_keys_vec_from_table(const ChunkKey& destination_chunk_key) {
362  std::vector<ChunkKey> chunk_keys;
363  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
364  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
365  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
366  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
367  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
368  CHECK(catalog);
369  auto foreign_table = catalog->getForeignTableUnlocked(table_id);
370 
371  ForeignTableSchema schema{db_id, foreign_table};
372  auto logical_column = schema.getLogicalColumn(destination_column_id);
373  auto logical_column_id = logical_column->columnId;
374 
375  for (auto column_id = logical_column_id;
376  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
377  column_id++) {
378  auto column = schema.getColumnDescriptor(column_id);
379  if (column->columnType.is_varlen_indeed()) {
380  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
381  chunk_keys.emplace_back(data_chunk_key);
382 
383  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
384  chunk_keys.emplace_back(index_chunk_key);
385  } else {
386  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
387  chunk_keys.emplace_back(data_chunk_key);
388  }
389  }
390  return chunk_keys;
391 }
392 
393 std::map<ChunkKey, AbstractBuffer*> ForeignStorageMgr::allocateTempBuffersForChunks(
394  const std::set<ChunkKey>& chunk_keys) {
395  std::map<ChunkKey, AbstractBuffer*> chunk_buffer_map;
396  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
397  for (const auto& chunk_key : chunk_keys) {
398  temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
399  chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
400  chunk_buffer_map[chunk_key]->resetToEmpty();
401  }
402  return chunk_buffer_map;
403 }
404 
// Parameter list and body of setColumnHints (the line carrying the function name is
// not visible in this listing).
// Replaces columns_hints_per_table_ with a copy of columns_per_table.
406  std::map<ChunkKey, std::vector<int> >& columns_per_table) {
// NOTE(review): the assignment below mutates columns_hints_per_table_ while holding
// only a std::shared_lock, which several threads can hold at the same time. An
// exclusive lock (e.g. std::lock_guard, as used for the other maps in this file) looks
// intended here - confirm.
407  std::shared_lock data_wrapper_lock(columns_hints_mutex_);
408  columns_hints_per_table_ = columns_per_table;
409 }
410 
// Parameter list and body of getOptionalChunkKeySet (the line carrying the function
// name is not visible in this listing).
//
// For every column id hinted for the table of chunk_key, builds a key for that column
// in the fragment of chunk_key, expands it with get_keys_set_from_table(), and adds
// each resulting key that is not in required_chunk_keys to optional_chunk_keys.
// optional_chunk_keys is only added to, never cleared.
412  std::set<ChunkKey>& optional_chunk_keys,
413  const ChunkKey& chunk_key,
414  const std::set<ChunkKey>& required_chunk_keys) {
415  std::shared_lock data_wrapper_lock(columns_hints_mutex_);
// NOTE(review): operator[] inserts an empty entry when the table has no hints yet,
// which modifies columns_hints_per_table_ while only a shared lock is held. Confirm
// whether a find() based lookup is wanted here.
416  for (const int column_id : columns_hints_per_table_[get_table_key(chunk_key)]) {
// Build {table key components..., column_id, fragment id}.
417  ChunkKey optional_chunk_key = get_table_key(chunk_key);
418  optional_chunk_key.push_back(column_id);
419  optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
420  std::set<ChunkKey> keys = get_keys_set_from_table(optional_chunk_key);
421  for (const auto& key : keys) {
// Skip keys the caller already treats as required.
422  if (required_chunk_keys.find(key) == required_chunk_keys.end()) {
423  optional_chunk_keys.insert(key);
424  }
425  }
426  }
427 }
428 
429 } // namespace foreign_storage
void getOptionalChunkKeySet(std::set< ChunkKey > &optional_chunk_keys, const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys)
void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey &table_key)
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
std::vector< int > ChunkKey
Definition: types.h:37
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
AbstractBuffer * alloc(const size_t num_bytes) override
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
#define UNREACHABLE()
Definition: Logger.h:241
void free(AbstractBuffer *buffer) override
void removeTableRelatedDS(const int db_id, const int table_id) override
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
int32_t StringOffsetT
Definition: sqltypes.h:919
static SysCatalog & instance()
Definition: SysCatalog.h:286
bool g_enable_s3_fsi
Definition: Catalog.cpp:93
bool is_varlen_index_key(const ChunkKey &key)
Definition: types.h:69
void createAndPopulateDataWrapperIfNotExists(const ChunkKey &chunk_key)
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
std::map< ChunkKey, std::vector< int > > columns_hints_per_table_
bool has_table_prefix(const ChunkKey &key)
Definition: types.h:48
An AbstractBuffer is a unit of data management for a data manager.
std::string getStringMgrType() override
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
bool hasDataWrapperForChunk(const ChunkKey &chunk_key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
void setColumnHints(std::map< ChunkKey, std::vector< int >> &columns_per_table)
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
#define CHECK(condition)
Definition: Logger.h:197
static constexpr char const * CSV
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
bool g_enable_fsi
Definition: Catalog.cpp:92
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
std::string printSlabs() override
std::map< ChunkKey, AbstractBuffer * > allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
bool isBufferOnDevice(const ChunkKey &chunk_key) override