OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignStorageMgr.h"
18 
22 #include "ForeignTableSchema.h"
23 
24 extern bool g_enable_fsi;
25 extern bool g_enable_s3_fsi;
26 
27 namespace foreign_storage {
28 ForeignStorageMgr::ForeignStorageMgr() : AbstractBufferMgr(0), data_wrapper_map_({}) {}
29 
31  const size_t num_bytes) {
32  UNREACHABLE();
33  return nullptr; // Added to avoid "no return statement" compiler warning
34 }
35 
37  CHECK(has_table_prefix(chunk_key));
38  auto catalog =
40  CHECK(catalog);
41  auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
42  auto storage_type_entry = foreign_table->foreign_server->options.find(
44 
45  if (storage_type_entry == foreign_table->foreign_server->options.end()) {
46  // Some FSI servers such as ODBC do not have a storage_type
47  return;
48  }
49  bool is_s3_storage_type =
50  (storage_type_entry->second == AbstractFileStorageDataWrapper::S3_STORAGE_TYPE);
51  if (is_s3_storage_type) {
52  throw ForeignStorageException{
53  "Query cannot be executed for S3 backed foreign table because AWS S3 support is "
54  "currently disabled."};
55  }
56 }
57 
58 void ForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
59  AbstractBuffer* destination_buffer,
60  const size_t num_bytes) {
61  checkIfS3NeedsToBeEnabled(chunk_key);
62  CHECK(destination_buffer);
63  CHECK(!destination_buffer->isDirty());
64  // Use a temp buffer if we have no cache buffers and have one mapped for this chunk.
65  if (fetchBufferIfTempBufferMapEntryExists(chunk_key, destination_buffer, num_bytes)) {
66  return;
67  }
68 
69  { // Clear any temp buffers if we've moved on to a new fragment
70  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
71  if (temp_chunk_buffer_map_.size() > 0 &&
73  chunk_key[CHUNK_KEY_FRAGMENT_IDX]) {
75  }
76  }
77 
79 
80  // TODO: Populate optional buffers as part of CSV performance improvement
81  std::set<ChunkKey> chunk_keys = get_keys_set_from_table(chunk_key);
82  chunk_keys.erase(chunk_key);
83 
84  // Use hints to prefetch other chunks in fragment
85  ChunkToBufferMap optional_buffers;
86 
87  // Use hints to prefetch other chunks in fragment into cache
88  auto& data_wrapper = *getDataWrapper(chunk_key);
89  std::set<ChunkKey> optional_keys;
90  getOptionalChunkKeySet(optional_keys,
91  chunk_key,
92  get_keys_set_from_table(chunk_key),
93  data_wrapper.getNonCachedParallelismLevel());
94  if (optional_keys.size()) {
95  {
96  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
97  // Erase anything already in temp_chunk_buffer_map_
98  for (auto it = optional_keys.begin(); it != optional_keys.end();) {
99  if (temp_chunk_buffer_map_.find(*it) != temp_chunk_buffer_map_.end()) {
100  it = optional_keys.erase(it);
101  } else {
102  ++it;
103  }
104  }
105  }
106  if (optional_keys.size()) {
107  optional_buffers = allocateTempBuffersForChunks(optional_keys);
108  }
109  }
110 
111  auto required_buffers = allocateTempBuffersForChunks(chunk_keys);
112  required_buffers[chunk_key] = destination_buffer;
113  // populate will write directly to destination_buffer so no need to copy.
114  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
115 }
116 
118  const ChunkKey& chunk_key,
119  AbstractBuffer* destination_buffer,
120  size_t num_bytes) {
121  AbstractBuffer* buffer{nullptr};
122  {
123  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
124  if (temp_chunk_buffer_map_.find(chunk_key) == temp_chunk_buffer_map_.end()) {
125  return false;
126  }
127  buffer = temp_chunk_buffer_map_[chunk_key].get();
128  }
129  // For the index key, calls with size 0 get 1 added as
130  // empty index buffers start with one entry
131  // Set to 0 here to copy entire buffer
132  if (is_varlen_index_key(chunk_key) && (num_bytes == sizeof(StringOffsetT))) {
133  num_bytes = 0;
134  }
135  CHECK(buffer);
136  buffer->copyTo(destination_buffer, num_bytes);
137  {
138  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
139  temp_chunk_buffer_map_.erase(chunk_key);
140  }
141  return true;
142 }
143 
145  ChunkMetadataVector& chunk_metadata,
146  const ChunkKey& key_prefix) {
147  if (!g_enable_fsi) {
148  throw ForeignStorageException{
149  "Query cannot be executed for foreign table because FSI is currently disabled."};
150  }
151  CHECK(is_table_key(key_prefix));
152  checkIfS3NeedsToBeEnabled(key_prefix);
153  createDataWrapperIfNotExists(key_prefix);
154  getDataWrapper(key_prefix)->populateChunkMetadata(chunk_metadata);
155 }
156 
157 void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
158  const ChunkKey table_key{db_id, table_id};
159  {
160  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
161  data_wrapper_map_.erase(table_key);
162  }
164 }
165 
167  return FOREIGN_STORAGE_MGR;
168 }
169 
171  return ToString(FOREIGN_STORAGE_MGR);
172 }
173 
175  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
176  CHECK(has_table_prefix(chunk_key));
177  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
178  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
179 }
180 
181 std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
182  const ChunkKey& chunk_key) {
183  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
184  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
185  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
186  return data_wrapper_map_[table_key];
187 }
188 
190  const ChunkKey& table_key,
191  std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
192  CHECK(is_table_key(table_key));
193  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
194  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
195  data_wrapper->setParentWrapper(data_wrapper_map_[table_key]);
196  data_wrapper_map_[table_key] = data_wrapper;
197 }
198 
200  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
201  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
202  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
203  auto db_id = chunk_key[CHUNK_KEY_DB_IDX];
204  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
205  CHECK(catalog);
206  auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
208  foreign_table->foreign_server->data_wrapper_type, db_id, foreign_table);
209  return true;
210  }
211  return false;
212 }
213 
214 void ForeignStorageMgr::refreshTable(const ChunkKey& table_key,
215  const bool evict_cached_entries) {
216  auto catalog =
217  Catalog_Namespace::SysCatalog::instance().getCatalog(table_key[CHUNK_KEY_DB_IDX]);
218  CHECK(catalog);
219  // Clear datawrapper unless table is non-append and evict is false
220  if (evict_cached_entries ||
221  !catalog->getForeignTableUnlocked(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode()) {
222  clearDataWrapper(table_key);
223  }
224 }
225 
226 void ForeignStorageMgr::clearDataWrapper(const ChunkKey& table_key) {
227  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
228  // May not be created yet
229  if (data_wrapper_map_.find(table_key) != data_wrapper_map_.end()) {
230  data_wrapper_map_.erase(table_key);
231  }
232 }
233 
235  const ChunkKey& table_key) {
236  CHECK(is_table_key(table_key));
237  auto start_it = temp_chunk_buffer_map_.lower_bound(table_key);
238  ChunkKey upper_bound_prefix{table_key[CHUNK_KEY_DB_IDX],
239  table_key[CHUNK_KEY_TABLE_IDX],
240  std::numeric_limits<int>::max()};
241  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
242  temp_chunk_buffer_map_.erase(start_it, end_it);
243 }
244 
246  const ChunkKey& table_key) {
247  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
249 }
250 
251 bool ForeignStorageMgr::isDatawrapperRestored(const ChunkKey& chunk_key) {
252  if (!hasDataWrapperForChunk(chunk_key)) {
253  return false;
254  }
255  return getDataWrapper(chunk_key)->isRestored();
256 }
257 
258 void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
259  UNREACHABLE();
260 }
261 
262 void ForeignStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
263  const bool purge) {
264  UNREACHABLE();
265 }
266 
267 bool ForeignStorageMgr::isBufferOnDevice(const ChunkKey& chunk_key) {
268  UNREACHABLE();
269  return false; // Added to avoid "no return statement" compiler warning
270 }
271 
273  UNREACHABLE();
274  return 0; // Added to avoid "no return statement" compiler warning
275 }
276 
278  const size_t page_size,
279  const size_t initial_size) {
280  UNREACHABLE();
281  return nullptr; // Added to avoid "no return statement" compiler warning
282 }
283 
285  AbstractBuffer* source_buffer,
286  const size_t num_bytes) {
287  UNREACHABLE();
288  return nullptr; // Added to avoid "no return statement" compiler warning
289 }
290 
291 std::string ForeignStorageMgr::printSlabs() {
292  UNREACHABLE();
293  return {}; // Added to avoid "no return statement" compiler warning
294 }
295 
297  UNREACHABLE();
298 }
299 
301  UNREACHABLE();
302  return 0; // Added to avoid "no return statement" compiler warning
303 }
304 
306  UNREACHABLE();
307  return 0; // Added to avoid "no return statement" compiler warning
308 }
309 
311  UNREACHABLE();
312  return 0; // Added to avoid "no return statement" compiler warning
313 }
314 
316  UNREACHABLE();
317  return false; // Added to avoid "no return statement" compiler warning
318 }
319 
321  UNREACHABLE();
322 }
323 
324 void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
325  UNREACHABLE();
326 }
327 
328 AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
329  UNREACHABLE();
330  return nullptr; // Added to avoid "no return statement" compiler warning
331 }
332 
334  UNREACHABLE();
335 }
336 
338  const ChunkKey& chunk_key) {
339  ChunkKey table_key = get_table_key(chunk_key);
340  if (createDataWrapperIfNotExists(table_key)) {
341  ChunkMetadataVector chunk_metadata;
342  getDataWrapper(table_key)->populateChunkMetadata(chunk_metadata);
343  }
344 }
345 
346 std::set<ChunkKey> get_keys_set_from_table(const ChunkKey& destination_chunk_key) {
347  std::set<ChunkKey> chunk_keys;
348  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
349  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
350  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
351  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
352  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
353  CHECK(catalog);
354  auto foreign_table = catalog->getForeignTableUnlocked(table_id);
355 
356  ForeignTableSchema schema{db_id, foreign_table};
357  auto logical_column = schema.getLogicalColumn(destination_column_id);
358  auto logical_column_id = logical_column->columnId;
359 
360  for (auto column_id = logical_column_id;
361  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
362  column_id++) {
363  auto column = schema.getColumnDescriptor(column_id);
364  if (column->columnType.is_varlen_indeed()) {
365  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
366  chunk_keys.emplace(data_chunk_key);
367 
368  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
369  chunk_keys.emplace(index_chunk_key);
370  } else {
371  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
372  chunk_keys.emplace(data_chunk_key);
373  }
374  }
375  return chunk_keys;
376 }
377 
378 std::vector<ChunkKey> get_keys_vec_from_table(const ChunkKey& destination_chunk_key) {
379  std::vector<ChunkKey> chunk_keys;
380  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
381  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
382  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
383  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
384  auto catalog = Catalog_Namespace::SysCatalog::instance().getCatalog(db_id);
385  CHECK(catalog);
386  auto foreign_table = catalog->getForeignTableUnlocked(table_id);
387 
388  ForeignTableSchema schema{db_id, foreign_table};
389  auto logical_column = schema.getLogicalColumn(destination_column_id);
390  auto logical_column_id = logical_column->columnId;
391 
392  for (auto column_id = logical_column_id;
393  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
394  column_id++) {
395  auto column = schema.getColumnDescriptor(column_id);
396  if (column->columnType.is_varlen_indeed()) {
397  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
398  chunk_keys.emplace_back(data_chunk_key);
399 
400  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
401  chunk_keys.emplace_back(index_chunk_key);
402  } else {
403  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
404  chunk_keys.emplace_back(data_chunk_key);
405  }
406  }
407  return chunk_keys;
408 }
409 
411  const std::set<ChunkKey>& chunk_keys) {
412  ChunkToBufferMap chunk_buffer_map;
413  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
414  for (const auto& chunk_key : chunk_keys) {
415  temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
416  chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
417  chunk_buffer_map[chunk_key]->resetToEmpty();
418  }
419  return chunk_buffer_map;
420 }
421 
423  const std::map<ChunkKey, std::set<ParallelismHint>>& hints_per_table) {
424  std::shared_lock data_wrapper_lock(parallelism_hints_mutex_);
425  parallelism_hints_per_table_ = hints_per_table;
426 }
427 
429  std::set<ChunkKey>& optional_chunk_keys,
430  const ChunkKey& chunk_key,
431  const std::set<ChunkKey>& required_chunk_keys,
432  const ForeignDataWrapper::ParallelismLevel parallelism_level) {
433  if (parallelism_level == ForeignDataWrapper::NONE) {
434  return;
435  }
436  std::shared_lock data_wrapper_lock(parallelism_hints_mutex_);
437  for (const auto& hint : parallelism_hints_per_table_[get_table_key(chunk_key)]) {
438  const auto& [column_id, fragment_id] = hint;
439  ChunkKey optional_chunk_key_key = get_table_key(chunk_key);
440  optional_chunk_key_key.push_back(column_id);
441  auto optional_chunk_key = optional_chunk_key_key;
442  if (parallelism_level == ForeignDataWrapper::INTRA_FRAGMENT) {
443  optional_chunk_key.push_back(chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
444  } else if (parallelism_level == ForeignDataWrapper::INTER_FRAGMENT) {
445  optional_chunk_key.push_back(fragment_id);
446  } else {
447  UNREACHABLE();
448  }
449  std::set<ChunkKey> keys = get_keys_set_from_table(optional_chunk_key);
450  for (const auto& key : keys) {
451  if (required_chunk_keys.find(key) == required_chunk_keys.end()) {
452  optional_chunk_keys.insert(key);
453  }
454  }
455  }
456 }
457 
458 } // namespace foreign_storage
void clearTempChunkBufferMapEntriesForTableUnlocked(const ChunkKey &table_key)
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
std::vector< int > ChunkKey
Definition: types.h:37
static std::unique_ptr< ForeignDataWrapper > create(const std::string &data_wrapper_type, const int db_id, const ForeignTable *foreign_table)
AbstractBuffer * alloc(const size_t num_bytes) override
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
#define UNREACHABLE()
Definition: Logger.h:247
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void free(AbstractBuffer *buffer) override
void removeTableRelatedDS(const int db_id, const int table_id) override
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
int32_t StringOffsetT
Definition: sqltypes.h:937
static SysCatalog & instance()
Definition: SysCatalog.h:292
bool g_enable_s3_fsi
Definition: Catalog.cpp:93
void getOptionalChunkKeySet(std::set< ChunkKey > &optional_chunk_keys, const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level)
bool is_varlen_index_key(const ChunkKey &key)
Definition: types.h:74
void createAndPopulateDataWrapperIfNotExists(const ChunkKey &chunk_key)
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
std::map< ChunkKey, std::set< ParallelismHint > > parallelism_hints_per_table_
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool has_table_prefix(const ChunkKey &key)
Definition: types.h:48
An AbstractBuffer is a unit of data management for a data manager.
std::string getStringMgrType() override
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
bool hasDataWrapperForChunk(const ChunkKey &chunk_key)
void clearDataWrapper(const ChunkKey &table_key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
ChunkToBufferMap allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
void setParallelismHints(const std::map< ChunkKey, std::set< ParallelismHint >> &hints_per_table)
#define CHECK(condition)
Definition: Logger.h:203
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
bool g_enable_fsi
Definition: Catalog.cpp:92
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
std::string printSlabs() override
bool isBufferOnDevice(const ChunkKey &chunk_key) override