OmniSciDB  8a228a1076
ForeignStorageMgr.cpp
Go to the documentation of this file.
1 
2 /*
3  * Copyright 2020 OmniSci, Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #include "ForeignStorageMgr.h"
19 
20 #include "Catalog/ForeignTable.h"
21 #include "CsvDataWrapper.h"
22 #include "ForeignTableSchema.h"
23 #include "ParquetDataWrapper.h"
24 
25 namespace foreign_storage {
27  : AbstractBufferMgr(0), data_wrapper_map_({}), foreign_storage_cache_(fsc) {
29 }
30 
32  const size_t num_bytes) {
33  UNREACHABLE();
34  return nullptr; // Added to avoid "no return statement" compiler warning
35 }
36 
38  AbstractBuffer* destination_buffer,
39  const size_t num_bytes) {
40  CHECK(!destination_buffer->isDirty());
41  bool cached = true;
42  // This code is inlined from getBuffer() because we need to know if we had a cache hit
43  // to know if we need to write back to the cache later.
46  : nullptr;
47  bool is_buffer_from_map{false};
48  if (!is_cache_enabled_) {
49  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
50  if (temp_chunk_buffer_map_.find(chunk_key) != temp_chunk_buffer_map_.end()) {
51  buffer = temp_chunk_buffer_map_[chunk_key].get();
52  CHECK(buffer);
53  is_buffer_from_map = true;
54  }
55  }
56 
57  // TODO: Populate optional buffers as part of CSV performance improvement
58  std::map<ChunkKey, AbstractBuffer*> optional_buffers;
59  std::map<ChunkKey, AbstractBuffer*> required_buffers;
60  std::vector<ChunkKey> chunk_keys;
61  if (buffer == nullptr) {
62  cached = false;
63  required_buffers =
64  getChunkBuffersToPopulate(chunk_key, destination_buffer, chunk_keys);
65  CHECK(required_buffers.find(chunk_key) != required_buffers.end());
67  chunk_key, required_buffers, optional_buffers);
68  buffer = required_buffers[chunk_key];
69  }
70  CHECK(buffer);
71 
72  // Read the contents of the source buffer into the destination buffer, if the
73  // destination buffer was not directly populated by the data wrapper
74  if (is_cache_enabled_ || is_buffer_from_map) {
75  size_t chunk_size = (num_bytes == 0) ? buffer->size() : num_bytes;
76  destination_buffer->reserve(chunk_size);
77  buffer->read(destination_buffer->getMemoryPtr() + destination_buffer->size(),
78  chunk_size - destination_buffer->size(),
79  destination_buffer->size(),
80  destination_buffer->getType(),
81  destination_buffer->getDeviceId());
82  destination_buffer->setSize(chunk_size);
83  destination_buffer->syncEncoder(buffer);
84  }
85 
86  if (is_buffer_from_map) {
87  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
88  temp_chunk_buffer_map_.erase(chunk_key);
89  }
90 
91  // We only write back to the cache if we did not get the buffer from the cache.
92  if (is_cache_enabled_ && !cached) {
94  }
95 }
96 
97 std::map<ChunkKey, AbstractBuffer*> ForeignStorageMgr::getChunkBuffersToPopulate(
98  const ChunkKey& destination_chunk_key,
99  AbstractBuffer* destination_buffer,
100  std::vector<ChunkKey>& chunk_keys) {
101  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
102  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
103  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
104  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
105 
106  auto catalog = Catalog_Namespace::Catalog::get(db_id);
107  CHECK(catalog);
108 
109  auto table = catalog->getMetadataForTableImpl(table_id, false);
110  CHECK(table);
111 
112  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
113  CHECK(foreign_table);
114 
115  ForeignTableSchema schema{db_id, foreign_table};
116  auto logical_column = schema.getLogicalColumn(destination_column_id);
117  auto logical_column_id = logical_column->columnId;
118 
119  for (auto column_id = logical_column_id;
120  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
121  column_id++) {
122  auto column = schema.getColumnDescriptor(column_id);
123  if (column->columnType.is_varlen_indeed()) {
124  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
125  chunk_keys.emplace_back(data_chunk_key);
126 
127  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
128  chunk_keys.emplace_back(index_chunk_key);
129  } else {
130  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
131  chunk_keys.emplace_back(data_chunk_key);
132  }
133  }
134 
135  std::map<ChunkKey, AbstractBuffer*> chunk_buffer_map;
136  if (is_cache_enabled_) {
137  chunk_buffer_map = foreign_storage_cache_->getChunkBuffersForCaching(chunk_keys);
138  } else {
139  chunk_buffer_map[destination_chunk_key] = destination_buffer;
140  if (chunk_keys.size() > 1) {
141  for (const auto& chunk_key : chunk_keys) {
142  if (chunk_key != destination_chunk_key) {
143  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
144  temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
145  chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
146  }
147  }
148  } else {
149  CHECK_EQ(chunk_keys.size(), static_cast<size_t>(1));
150  CHECK(destination_chunk_key == chunk_keys[0]);
151  }
152  }
153  return chunk_buffer_map;
154 }
155 
157  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
158  for (auto& [table_chunk_key, data_wrapper] : data_wrapper_map_) {
159  data_wrapper->populateChunkMetadata(chunk_metadata);
160  }
161 
162  if (is_cache_enabled_) {
163  foreign_storage_cache_->cacheMetadataVec(chunk_metadata);
164  }
165 }
166 
168  ChunkMetadataVector& chunk_metadata,
169  const ChunkKey& keyPrefix) {
170  CHECK(isTableKey(keyPrefix));
171  if (is_cache_enabled_ &&
173  foreign_storage_cache_->getCachedMetadataVecForKeyPrefix(chunk_metadata, keyPrefix);
174  return;
175  }
176  // If we haven't created a data wrapper yet then check to see if we can recover data.
177  if (is_cache_enabled_) {
178  if (data_wrapper_map_.find(keyPrefix) == data_wrapper_map_.end()) {
179  if (foreign_storage_cache_->recoverCacheForTable(chunk_metadata, keyPrefix)) {
180  // If we recovered table data from disk then no need to create data wrappers yet.
181  return;
182  }
183  }
184  }
185  createDataWrapperIfNotExists(keyPrefix);
186  getDataWrapper(keyPrefix)->populateChunkMetadata(chunk_metadata);
187 
188  if (is_cache_enabled_) {
189  foreign_storage_cache_->cacheMetadataVec(chunk_metadata);
190  }
191 }
192 
193 void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
194  {
195  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
196  data_wrapper_map_.erase({db_id, table_id});
197  }
198 
199  // Clear regardless of is_cache_enabled_
200  if (is_cache_enabled_) {
201  foreign_storage_cache_->clearForTablePrefix({db_id, table_id});
202  }
203 
205 }
206 
208  return FOREIGN_STORAGE_MGR;
209 }
210 
212  return ToString(FOREIGN_STORAGE_MGR);
213 }
214 
216  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
217  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
218  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
219 }
220 
221 std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
222  const ChunkKey& chunk_key) {
223  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
224  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
225  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
226  return data_wrapper_map_[table_key];
227 }
228 
230  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
231  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
232  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
233  auto db_id = chunk_key[CHUNK_KEY_DB_IDX];
234  auto table_id = chunk_key[CHUNK_KEY_TABLE_IDX];
235 
236  auto catalog = Catalog_Namespace::Catalog::get(db_id);
237  CHECK(catalog);
238 
239  auto table = catalog->getMetadataForTableImpl(table_id, false);
240  CHECK(table);
241 
242  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
243  CHECK(foreign_table);
244 
245  if (foreign_table->foreign_server->data_wrapper_type ==
247  data_wrapper_map_[table_key] =
248  std::make_shared<CsvDataWrapper>(db_id, foreign_table);
249  } else if (foreign_table->foreign_server->data_wrapper_type ==
251  data_wrapper_map_[table_key] =
252  std::make_shared<ParquetDataWrapper>(db_id, foreign_table);
253  } else {
254  throw std::runtime_error("Unsupported data wrapper");
255  }
256  return true;
257  }
258  return false;
259 }
260 
262  return foreign_storage_cache_;
263 }
264 
265 void ForeignStorageMgr::refreshTables(const std::vector<ChunkKey>& table_keys,
266  const bool evict_cached_entries) {
267  clearTempChunkBufferMap(table_keys);
268  if (evict_cached_entries) {
269  evictTablesFromCache(table_keys);
270  } else {
271  refreshTablesInCache(table_keys);
272  }
273 }
274 
275 void ForeignStorageMgr::refreshTablesInCache(const std::vector<ChunkKey>& table_keys) {
276  if (!is_cache_enabled_) {
277  return;
278  }
279  for (const auto& table_key : table_keys) {
280  CHECK(isTableKey(table_key));
281  // Get a list of which chunks were cached for a table.
282  std::vector<ChunkKey> old_chunk_keys =
285 
286  // Refresh metadata.
287  ChunkMetadataVector metadata_vec;
288  getDataWrapper(table_key)->populateChunkMetadata(metadata_vec);
290 
291  // Iterate through previously cached chunks and re-cache them. Caching is
292  // done one fragment at a time, for all applicable chunks in the fragment.
293  std::map<ChunkKey, AbstractBuffer*> optional_buffers;
294  std::vector<ChunkKey> chunk_keys_to_be_cached;
295  if (!old_chunk_keys.empty()) {
296  auto fragment_id = old_chunk_keys[0][CHUNK_KEY_FRAGMENT_IDX];
297  std::vector<ChunkKey> chunk_keys_in_fragment;
298  for (const auto& chunk_key : old_chunk_keys) {
299  if (foreign_storage_cache_->isMetadataCached(chunk_key)) {
300  if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] != fragment_id) {
301  auto required_buffers =
302  foreign_storage_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
303  getDataWrapper(table_key)->populateChunkBuffers(required_buffers,
304  optional_buffers);
305  chunk_keys_in_fragment.clear();
306  fragment_id = chunk_key[CHUNK_KEY_FRAGMENT_IDX];
307  }
308  if (isVarLenKey(chunk_key)) {
309  CHECK(isVarLenDataKey(chunk_key));
310  ChunkKey index_chunk_key{chunk_key[CHUNK_KEY_DB_IDX],
311  chunk_key[CHUNK_KEY_TABLE_IDX],
312  chunk_key[CHUNK_KEY_COLUMN_IDX],
313  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
314  2};
315  chunk_keys_in_fragment.emplace_back(index_chunk_key);
316  chunk_keys_to_be_cached.emplace_back(index_chunk_key);
317  }
318  chunk_keys_in_fragment.emplace_back(chunk_key);
319  chunk_keys_to_be_cached.emplace_back(chunk_key);
320  }
321  }
322  auto required_buffers =
323  foreign_storage_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
324  getDataWrapper(table_key)->populateChunkBuffers(required_buffers, optional_buffers);
325  foreign_storage_cache_->cacheTableChunks(chunk_keys_to_be_cached);
326  }
327  }
328 }
329 
330 void ForeignStorageMgr::evictTablesFromCache(const std::vector<ChunkKey>& table_keys) {
331  if (!is_cache_enabled_) {
332  return;
333  }
334 
335  for (auto& table_key : table_keys) {
336  CHECK(isTableKey(table_key));
338  }
339 }
340 
341 void ForeignStorageMgr::clearTempChunkBufferMap(const std::vector<ChunkKey>& table_keys) {
342  for (const auto& table_key : table_keys) {
343  CHECK(isTableKey(table_key));
345  table_key[CHUNK_KEY_TABLE_IDX]);
346  }
347 }
348 
350  const int table_id) {
351  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
352  auto start_it = temp_chunk_buffer_map_.lower_bound({db_id, table_id});
353  ChunkKey upper_bound_prefix{db_id, table_id, std::numeric_limits<int>::max()};
354  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
355  temp_chunk_buffer_map_.erase(start_it, end_it);
356 }
357 
359  const ChunkKey& chunk_key,
360  std::map<ChunkKey, AbstractBuffer*>& required_buffers,
361  std::map<ChunkKey, AbstractBuffer*>& optional_buffers) {
362  if (createDataWrapperIfNotExists(chunk_key)) {
363  ChunkMetadataVector chunk_metadata;
364  getDataWrapper(chunk_key)->populateChunkMetadata(chunk_metadata);
365  }
366  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
367 }
368 
369 void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
370  UNREACHABLE();
371 }
372 
374  const bool purge) {
375  UNREACHABLE();
376 }
377 
379  UNREACHABLE();
380  return false; // Added to avoid "no return statement" compiler warning
381 }
382 
384  UNREACHABLE();
385  return 0; // Added to avoid "no return statement" compiler warning
386 }
387 
389  const size_t page_size,
390  const size_t initial_size) {
391  UNREACHABLE();
392  return nullptr; // Added to avoid "no return statement" compiler warning
393 }
394 
396  AbstractBuffer* source_buffer,
397  const size_t num_bytes) {
398  UNREACHABLE();
399  return nullptr; // Added to avoid "no return statement" compiler warning
400 }
401 
403  UNREACHABLE();
404  return {}; // Added to avoid "no return statement" compiler warning
405 }
406 
408  UNREACHABLE();
409 }
410 
412  UNREACHABLE();
413  return 0; // Added to avoid "no return statement" compiler warning
414 }
415 
417  UNREACHABLE();
418  return 0; // Added to avoid "no return statement" compiler warning
419 }
420 
422  UNREACHABLE();
423  return 0; // Added to avoid "no return statement" compiler warning
424 }
425 
427  UNREACHABLE();
428  return false; // Added to avoid "no return statement" compiler warning
429 }
430 
432  UNREACHABLE();
433 }
434 
435 void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
436  UNREACHABLE();
437 }
438 
439 AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
440  UNREACHABLE();
441  return nullptr; // Added to avoid "no return statement" compiler warning
442 }
443 
445  UNREACHABLE();
446 }
447 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:205
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
void syncEncoder(const AbstractBuffer *src_buffer)
ForeignStorageMgr(ForeignStorageCache *fsc=nullptr)
bool isVarLenKey(const ChunkKey &key)
Definition: types.h:50
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:38
static std::shared_ptr< Catalog > get(const std::string &dbName)
Definition: Catalog.cpp:3671
virtual size_t size() const =0
virtual int8_t * getMemoryPtr()=0
virtual MemoryLevel getType() const =0
#define UNREACHABLE()
Definition: Logger.h:241
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
void free(AbstractBuffer *buffer) override
void clearTempChunkBufferMap(const std::vector< ChunkKey > &table_keys)
std::map< ChunkKey, AbstractBuffer * > getChunkBuffersToPopulate(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, std::vector< ChunkKey > &chunk_keys)
ForeignStorageCache * getForeignStorageCache() const
void removeTableRelatedDS(const int db_id, const int table_id) override
virtual void read(int8_t *const dst, const size_t num_bytes, const size_t offset=0, const MemoryLevel dst_buffer_type=CPU_LEVEL, const int dst_device_id=-1)=0
ForeignStorageCache * foreign_storage_cache_
AbstractBuffer * getCachedChunkIfExists(const ChunkKey &)
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &)
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
AbstractBuffer * alloc(const size_t num_bytes) override
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool hasCachedMetadataForKeyPrefix(const ChunkKey &)
virtual void setSize(const size_t size)
std::shared_mutex temp_chunk_buffer_map_mutex_
bool isTableKey(const ChunkKey &key)
Definition: types.h:42
An AbstractBuffer is a unit of data management for a data manager.
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
void clearTempChunkBufferMapEntriesForTable(const int db_id, const int table_id)
void populateBuffersFromOptionallyCreatedWrapper(const ChunkKey &chunk_key, std::map< ChunkKey, AbstractBuffer *> &required_buffers, std::map< ChunkKey, AbstractBuffer *> &optional_buffers)
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
void cacheMetadataVec(const ChunkMetadataVector &)
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:40
bool isVarLenDataKey(const ChunkKey &key)
Definition: types.h:54
void evictTablesFromCache(const std::vector< ChunkKey > &table_keys)
bool hasDataWrapperForChunk(const ChunkKey &chunk_key)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
virtual int getDeviceId() const
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:39
#define CHECK(condition)
Definition: Logger.h:197
static constexpr char const * CSV
Definition: ForeignServer.h:35
void refreshTables(const std::vector< ChunkKey > &table_keys, const bool evict_cached_entries)
std::vector< int > ChunkKey
Definition: types.h:35
virtual bool isDirty() const
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
std::map< ChunkKey, AbstractBuffer * > getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata > >> ChunkMetadataVector
void getChunkMetadataVec(ChunkMetadataVector &chunk_metadata) override
#define CHUNK_KEY_DB_IDX
Definition: types.h:37
const ColumnDescriptor * getLogicalColumn(const int column_id) const
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
virtual void reserve(size_t num_bytes)=0
static constexpr char const * PARQUET
Definition: ForeignServer.h:36
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
void refreshTablesInCache(const std::vector< ChunkKey > &table_keys)
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &)
bool isBufferOnDevice(const ChunkKey &chunk_key) override