ForeignStorageMgr.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ForeignStorageMgr.h"

#include "AbstractFileStorageDataWrapper.h"
#include "Catalog/Catalog.h"
#include "ForeignDataWrapperFactory.h"
#include "ForeignTableSchema.h"

extern bool g_enable_fsi;
extern bool g_enable_s3_fsi;

namespace foreign_storage {
ForeignStorageMgr::ForeignStorageMgr() : AbstractBufferMgr(0), data_wrapper_map_({}) {}

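// ForeignStorageMgr implements the AbstractBufferMgr interface, but it does not manage
// buffer storage itself: chunk data is materialized on demand by per-table
// ForeignDataWrapper instances (see data_wrapper_map_). Buffer-manager entry points that
// have no meaning for foreign storage (getBuffer, createBuffer, putBuffer, alloc, etc.)
// are therefore stubbed out with UNREACHABLE() below.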
AbstractBuffer* ForeignStorageMgr::getBuffer(const ChunkKey& chunk_key,
                                             const size_t num_bytes) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

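// A ChunkKey is a std::vector<int> of the form
//   {db_id, table_id, column_id, fragment_id[, varlen_sub_key]}
// where the optional trailing value distinguishes a varlen column's data chunk (1) from
// its index chunk (2). For example, {1, 5, 3, 0} (illustrative values) addresses
// fragment 0 of column 3 in table 5 of database 1; the CHUNK_KEY_*_IDX macros used
// throughout this file index into that vector.
//
// checkIfS3NeedsToBeEnabled() looks up the foreign server behind the chunk's table and
// throws if its storage type option is S3, since AWS S3 support is disabled here.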
void ForeignStorageMgr::checkIfS3NeedsToBeEnabled(const ChunkKey& chunk_key) {
  CHECK(has_table_prefix(chunk_key));
  auto catalog =
      Catalog_Namespace::SysCatalog::instance().getCatalog(chunk_key[CHUNK_KEY_DB_IDX]);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(chunk_key[CHUNK_KEY_TABLE_IDX]);
  auto storage_type_entry = foreign_table->foreign_server->options.find(
      AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY);

  if (storage_type_entry == foreign_table->foreign_server->options.end()) {
    // Some FSI servers such as ODBC do not have a storage_type
    return;
  }
  bool is_s3_storage_type =
      (storage_type_entry->second == AbstractFileStorageDataWrapper::S3_STORAGE_TYPE);
  if (is_s3_storage_type) {
    throw ForeignStorageException{
        "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
        "currently disabled."};
  }
}

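// fetchBuffer() is the main read path. The destination buffer is handed to the table's
// data wrapper together with placeholder buffers for every other chunk of the same
// logical column and fragment ("required" buffers, since the wrapper parses them as a
// unit). Parallelism hints may add further "optional" chunks, which are prefetched into
// temp_chunk_buffer_map_ so that later fetchBuffer() calls can be served from
// fetchBufferIfTempBufferMapEntryExists() without going back to the wrapper.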
void ForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
                                    AbstractBuffer* destination_buffer,
                                    const size_t num_bytes) {
  checkIfS3NeedsToBeEnabled(chunk_key);
  CHECK(destination_buffer);
  CHECK(!destination_buffer->isDirty());
  // Use a temp buffer if we have no cache buffers and have one mapped for this chunk.
  if (fetchBufferIfTempBufferMapEntryExists(chunk_key, destination_buffer, num_bytes)) {
    return;
  }

  {  // Clear any temp buffers if we've moved on to a new fragment
    std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
    if (temp_chunk_buffer_map_.size() > 0 &&
        temp_chunk_buffer_map_.begin()->first[CHUNK_KEY_FRAGMENT_IDX] !=
            chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
      clearTempChunkBufferMapEntriesForTableUnlocked(get_table_key(chunk_key));
    }
  }

  std::set<ChunkKey> chunk_keys = get_keys_set_from_table(chunk_key);
  chunk_keys.erase(chunk_key);

  // Use hints to prefetch other chunks in fragment
  ChunkToBufferMap optional_buffers;

  // Use hints to prefetch other chunks in fragment into cache
  auto& data_wrapper = *getDataWrapper(chunk_key);
  std::set<ChunkKey> optional_keys;
  getOptionalChunkKeySet(optional_keys,
                         chunk_key,
                         get_keys_set_from_table(chunk_key),
                         data_wrapper.getNonCachedParallelismLevel());
  if (optional_keys.size()) {
    {
      std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
      // Erase anything already in temp_chunk_buffer_map_
      for (auto it = optional_keys.begin(); it != optional_keys.end();) {
        if (temp_chunk_buffer_map_.find(*it) != temp_chunk_buffer_map_.end()) {
          it = optional_keys.erase(it);
        } else {
          ++it;
        }
      }
    }
    if (optional_keys.size()) {
      optional_buffers = allocateTempBuffersForChunks(optional_keys);
    }
  }

  auto required_buffers = allocateTempBuffersForChunks(chunk_keys);
  required_buffers[chunk_key] = destination_buffer;
  // populate will write directly to destination_buffer so no need to copy.
  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
}

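// Serves a fetch from a previously prefetched temporary buffer, if one exists. The
// temporary buffer is copied into the destination and then erased, so each prefetched
// chunk can be consumed exactly once.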
bool ForeignStorageMgr::fetchBufferIfTempBufferMapEntryExists(
    const ChunkKey& chunk_key,
    AbstractBuffer* destination_buffer,
    size_t num_bytes) {
  AbstractBuffer* buffer{nullptr};
  {
    std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
    if (temp_chunk_buffer_map_.find(chunk_key) == temp_chunk_buffer_map_.end()) {
      return false;
    }
    buffer = temp_chunk_buffer_map_[chunk_key].get();
  }
  // For the index key, calls with size 0 get 1 added as
  // empty index buffers start with one entry.
  // Set num_bytes to 0 here to copy the entire buffer.
  if (is_varlen_index_key(chunk_key) && (num_bytes == sizeof(StringOffsetT))) {
    num_bytes = 0;
  }
  CHECK(buffer);
  buffer->copyTo(destination_buffer, num_bytes);
  {
    std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
    temp_chunk_buffer_map_.erase(chunk_key);
  }
  return true;
}

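// Metadata scans drive table instantiation: the data wrapper is created lazily for the
// table prefix and asked to populate chunk metadata. If metadata population fails, the
// wrapper is discarded so that a later retry starts from a clean state.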
void ForeignStorageMgr::getChunkMetadataVecForKeyPrefix(
    ChunkMetadataVector& chunk_metadata,
    const ChunkKey& key_prefix) {
  if (!g_enable_fsi) {
    throw ForeignStorageException{
        "Query cannot be executed for foreign table because FSI is currently disabled."};
  }
  CHECK(is_table_key(key_prefix));
  checkIfS3NeedsToBeEnabled(key_prefix);
  createDataWrapperIfNotExists(key_prefix);
  try {
    getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
  } catch (...) {
    clearDataWrapper(key_prefix);
    throw;
  }
}

void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
  const ChunkKey table_key{db_id, table_id};
  {
    std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
    data_wrapper_map_.erase(table_key);
  }
  clearTempChunkBufferMapEntriesForTable(table_key);
}

MgrType ForeignStorageMgr::getMgrType() {
  return FOREIGN_STORAGE_MGR;
}

std::string ForeignStorageMgr::getStringMgrType() {
  return ToString(FOREIGN_STORAGE_MGR);
}

bool ForeignStorageMgr::hasDataWrapperForChunk(const ChunkKey& chunk_key) {
  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
  CHECK(has_table_prefix(chunk_key));
  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
}

std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
    const ChunkKey& chunk_key) {
  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
  return data_wrapper_map_.at(table_key);
}

void ForeignStorageMgr::setDataWrapper(
    const ChunkKey& table_key,
    std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
  CHECK(is_table_key(table_key));
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
  data_wrapper->setParentWrapper(data_wrapper_map_.at(table_key));
  data_wrapper_map_[table_key] = data_wrapper;
}

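// Data wrappers are created lazily, one per foreign table, from the data_wrapper_type
// recorded on the table's foreign server (e.g. a CSV or Parquet wrapper).
// createDataWrapperIfNotExists() returns true only when a new wrapper was created, which
// lets callers distinguish first-time initialization from reuse of an existing wrapper.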
void ForeignStorageMgr::createDataWrapperUnlocked(int32_t db_id, int32_t tb_id) {
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(tb_id);
  ChunkKey table_key{db_id, tb_id};
  data_wrapper_map_[table_key] = ForeignDataWrapperFactory::create(
      foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
}

bool ForeignStorageMgr::createDataWrapperIfNotExists(const ChunkKey& chunk_key) {
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
    auto [db_id, tb_id] = get_table_prefix(chunk_key);
    createDataWrapperUnlocked(db_id, tb_id);
    return true;
  }
  return false;
}

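// Refreshing a table drops its data wrapper so that the next access re-reads the foreign
// source. The one exception is an append-mode table refreshed without eviction, where
// the existing wrapper is kept so it can continue from the data it has already seen.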
void ForeignStorageMgr::refreshTable(const ChunkKey& table_key,
                                     const bool evict_cached_entries) {
  auto catalog =
      Catalog_Namespace::SysCatalog::instance().getCatalog(table_key[CHUNK_KEY_DB_IDX]);
  CHECK(catalog);
  // Clear the data wrapper unless the table is in append mode and we are not evicting.
  if (evict_cached_entries ||
      !catalog->getForeignTable(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode()) {
    clearDataWrapper(table_key);
  }
}

void ForeignStorageMgr::clearDataWrapper(const ChunkKey& table_key) {
  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
  // May not be created yet
  if (data_wrapper_map_.find(table_key) != data_wrapper_map_.end()) {
    data_wrapper_map_.erase(table_key);
  }
}

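// ChunkKeys sort lexicographically, so every temp-buffer entry belonging to a table lies
// in the contiguous key range [{db_id, table_id}, {db_id, table_id, INT_MAX}); the helper
// below erases that whole range with a single lower_bound/upper_bound pair instead of
// scanning the map.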
void ForeignStorageMgr::clearTempChunkBufferMapEntriesForTableUnlocked(
    const ChunkKey& table_key) {
  CHECK(is_table_key(table_key));
  auto start_it = temp_chunk_buffer_map_.lower_bound(table_key);
  ChunkKey upper_bound_prefix{table_key[CHUNK_KEY_DB_IDX],
                              table_key[CHUNK_KEY_TABLE_IDX],
                              std::numeric_limits<int>::max()};
  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
  temp_chunk_buffer_map_.erase(start_it, end_it);
}

void ForeignStorageMgr::clearTempChunkBufferMapEntriesForTable(
    const ChunkKey& table_key) {
  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
  clearTempChunkBufferMapEntriesForTableUnlocked(table_key);
}

bool ForeignStorageMgr::isDatawrapperRestored(const ChunkKey& chunk_key) {
  if (!hasDataWrapperForChunk(chunk_key)) {
    return false;
  }
  return getDataWrapper(chunk_key)->isRestored();
}

void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
  UNREACHABLE();
}

void ForeignStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
                                                const bool purge) {
  UNREACHABLE();
}

bool ForeignStorageMgr::isBufferOnDevice(const ChunkKey& chunk_key) {
  UNREACHABLE();
  return false;  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getNumChunks() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

AbstractBuffer* ForeignStorageMgr::createBuffer(const ChunkKey& chunk_key,
                                                const size_t page_size,
                                                const size_t initial_size) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

AbstractBuffer* ForeignStorageMgr::putBuffer(const ChunkKey& chunk_key,
                                             AbstractBuffer* source_buffer,
                                             const size_t num_bytes) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

std::string ForeignStorageMgr::printSlabs() {
  UNREACHABLE();
  return {};  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getMaxSize() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getInUseSize() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

size_t ForeignStorageMgr::getAllocated() {
  UNREACHABLE();
  return 0;  // Added to avoid "no return statement" compiler warning
}

bool ForeignStorageMgr::isAllocationCapped() {
  UNREACHABLE();
  return false;  // Added to avoid "no return statement" compiler warning
}

void ForeignStorageMgr::checkpoint() {
  UNREACHABLE();
}

void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
  UNREACHABLE();
}

AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
  UNREACHABLE();
  return nullptr;  // Added to avoid "no return statement" compiler warning
}

void ForeignStorageMgr::free(AbstractBuffer* buffer) {
  UNREACHABLE();
}

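// Expands a destination chunk key into the full set of chunk keys the data wrapper must
// populate together: the logical column plus any physical columns backing it, all for
// the same fragment. Variable-length columns contribute two keys per column, a data
// chunk (sub-key 1) and an index chunk (sub-key 2); fixed-length columns contribute one.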
std::set<ChunkKey> get_keys_set_from_table(const ChunkKey& destination_chunk_key) {
  std::set<ChunkKey> chunk_keys;
  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(table_id);

  ForeignTableSchema schema{db_id, foreign_table};
  auto logical_column = schema.getLogicalColumn(destination_column_id);
  auto logical_column_id = logical_column->columnId;

  for (auto column_id = logical_column_id;
       column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
       column_id++) {
    auto column = schema.getColumnDescriptor(column_id);
    if (column->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
      chunk_keys.emplace(data_chunk_key);

      ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
      chunk_keys.emplace(index_chunk_key);
    } else {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
      chunk_keys.emplace(data_chunk_key);
    }
  }
  return chunk_keys;
}

std::vector<ChunkKey> get_keys_vec_from_table(const ChunkKey& destination_chunk_key) {
  std::vector<ChunkKey> chunk_keys;
  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
  CHECK(catalog);
  auto foreign_table = catalog->getForeignTable(table_id);

  ForeignTableSchema schema{db_id, foreign_table};
  auto logical_column = schema.getLogicalColumn(destination_column_id);
  auto logical_column_id = logical_column->columnId;

  for (auto column_id = logical_column_id;
       column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
       column_id++) {
    auto column = schema.getColumnDescriptor(column_id);
    if (column->columnType.is_varlen_indeed()) {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
      chunk_keys.emplace_back(data_chunk_key);

      ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
      chunk_keys.emplace_back(index_chunk_key);
    } else {
      ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
      chunk_keys.emplace_back(data_chunk_key);
    }
  }
  return chunk_keys;
}

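// Allocates an in-memory ForeignStorageBuffer for each requested chunk, registers it in
// temp_chunk_buffer_map_, and hands raw pointers back to the caller so the data wrapper
// can populate them. Ownership stays with the map; entries are released either when the
// chunk is consumed or when the fragment/table entries are cleared.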
ChunkToBufferMap ForeignStorageMgr::allocateTempBuffersForChunks(
    const std::set<ChunkKey>& chunk_keys) {
  ChunkToBufferMap chunk_buffer_map;
  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
  for (const auto& chunk_key : chunk_keys) {
    temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
    chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
    chunk_buffer_map[chunk_key]->resetToEmpty();
  }
  return chunk_buffer_map;
}

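// Parallelism hints are (column_id, fragment_id) pairs registered per table. When a
// wrapper reports INTRA_FRAGMENT parallelism, each hinted column is prefetched for the
// fragment currently being read; with INTER_FRAGMENT parallelism, the hinted fragment is
// used instead. Illustrative example: given a hint of (column_id = 4, fragment_id = 7)
// and a fetch of chunk {db, tb, 3, 0}, INTRA_FRAGMENT adds the chunks for column 4 of
// fragment 0, while INTER_FRAGMENT adds the chunks for column 4 of fragment 7. Keys that
// are already required are skipped.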
void ForeignStorageMgr::setParallelismHints(
    const std::map<ChunkKey, std::set<ParallelismHint>>& hints_per_table) {
  std::unique_lock data_wrapper_lock(parallelism_hints_mutex_);
  parallelism_hints_per_table_ = hints_per_table;
}

void ForeignStorageMgr::getOptionalChunkKeySet(
    std::set<ChunkKey>& optional_chunk_keys,
    const ChunkKey& chunk_key,
    const std::set<ChunkKey>& required_chunk_keys,
    const ForeignDataWrapper::ParallelismLevel parallelism_level) {
  if (parallelism_level == ForeignDataWrapper::NONE) {
    return;
  }
  std::shared_lock data_wrapper_lock(parallelism_hints_mutex_);
  for (const auto& hint : parallelism_hints_per_table_[get_table_key(chunk_key)]) {
    const auto& [column_id, fragment_id] = hint;
    ChunkKey optional_chunk_key_prefix = get_table_key(chunk_key);
    optional_chunk_key_prefix.push_back(column_id);
    auto optional_chunk_key = optional_chunk_key_prefix;
    if (parallelism_level == ForeignDataWrapper::INTRA_FRAGMENT) {
      optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
    } else if (parallelism_level == ForeignDataWrapper::INTER_FRAGMENT) {
      optional_chunk_key.push_back(fragment_id);
    } else {
      UNREACHABLE();
    }
    std::set<ChunkKey> keys = get_keys_set_from_table(optional_chunk_key);
    for (const auto& key : keys) {
      if (required_chunk_keys.find(key) == required_chunk_keys.end()) {
        optional_chunk_keys.insert(key);
      }
    }
  }
}

}  // namespace foreign_storage