OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ForeignStorageMgr.h"
18 
19 #include "Catalog/ForeignTable.h"
20 #include "CsvDataWrapper.h"
21 #include "ForeignTableSchema.h"
22 #include "ParquetDataWrapper.h"
23 
24 namespace foreign_storage {
25 ForeignStorageMgr::ForeignStorageMgr() : AbstractBufferMgr(0), data_wrapper_map_({}) {}
26 
28  const size_t num_bytes) {
  // NOTE(review): the signature line (27) is absent from this listing; judging by the
  // num_bytes parameter and nullptr return this is presumably
  // getBuffer(chunk_key, num_bytes). Unsupported by this manager: any call is treated
  // as a programming error via UNREACHABLE().
29  UNREACHABLE();
30  return nullptr; // Added to avoid "no return statement" compiler warning
31 }
32 
// Fills `destination_buffer` with the data for `chunk_key`.
//
// Fast path: if temp_chunk_buffer_map_ already holds a temporary buffer for this exact
// chunk, its contents are copied into `destination_buffer` and the temporary entry is
// discarded (see fetchBufferIfTempBufferMapEntryExists).
// Slow path: asks the table's data wrapper to populate `destination_buffer` together
// with freshly allocated temporary buffers for every other chunk key returned by
// get_keys_set_from_table() (the remaining physical / varlen data+index chunks of the
// same logical column and fragment). Later fetches of those sibling chunks can then be
// served from the fast path.
//
// CHECK-fails if `destination_buffer` is null or already dirty.
// NOTE(review): line 42 of the source file is missing from this listing (likely a
// hyperlinked line dropped by the documentation export) — confirm against the source.
33 void ForeignStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
34  AbstractBuffer* destination_buffer,
35  const size_t num_bytes) {
36  CHECK(destination_buffer);
37  CHECK(!destination_buffer->isDirty());
38  // Use a temp buffer if we have no cache buffers and have one mapped for this chunk.
39  if (fetchBufferIfTempBufferMapEntryExists(chunk_key, destination_buffer, num_bytes)) {
40  return;
41  }
43

44  // TODO: Populate optional buffers as part of CSV performance improvement
45  std::set<ChunkKey> chunk_keys = get_keys_set_from_table(chunk_key);
  // The requested chunk is written straight into destination_buffer below, so it must
  // not also receive a temporary buffer.
46  chunk_keys.erase(chunk_key);
47  std::map<ChunkKey, AbstractBuffer*> optional_buffers;
48  auto required_buffers = allocateTempBuffersForChunks(chunk_keys);
49  required_buffers[chunk_key] = destination_buffer;
50  // populate will write directly to destination_buffer so no need to copy.
51  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
52 }
53 
55  const ChunkKey& chunk_key,
56  AbstractBuffer* destination_buffer,
57  const size_t num_bytes) {
  // fetchBufferIfTempBufferMapEntryExists() (signature line 54 is absent from this
  // listing): if temp_chunk_buffer_map_ holds a buffer for `chunk_key`, copies it into
  // `destination_buffer` via copyTo(destination_buffer, num_bytes), erases the temp
  // entry and returns true; otherwise returns false without touching
  // `destination_buffer`.
58  AbstractBuffer* buffer{nullptr};
59  {
60  std::shared_lock temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
61  if (temp_chunk_buffer_map_.find(chunk_key) == temp_chunk_buffer_map_.end()) {
62  return false;
63  }
  // NOTE(review): std::map::operator[] is a non-const (potentially inserting) member,
  // called here while holding only a shared lock; reusing the iterator returned by
  // find() would avoid that and the second lookup.
64  buffer = temp_chunk_buffer_map_[chunk_key].get();
65  }
66  CHECK(buffer);
  // NOTE(review): the lock is released before copying. allocateTempBuffersForChunks()
  // (which replaces the owning unique_ptr) or clearTempChunkBufferMapEntriesForTable()
  // (which erases it) would destroy `buffer` if run concurrently for this chunk —
  // confirm callers cannot overlap.
67  buffer->copyTo(destination_buffer, num_bytes);
68  {
69  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
70  temp_chunk_buffer_map_.erase(chunk_key);
71  }
72  return true;
73 }
74 
76  ChunkMetadataVector& chunk_metadata,
77  const ChunkKey& keyPrefix) {
  // getChunkMetadataVecForKeyPrefix() (signature line 75 absent from this listing):
  // `keyPrefix` must be a table key (CHECKed); the table's data wrapper fills
  // `chunk_metadata` via populateChunkMetadata().
  // NOTE(review): line 79 is also absent from this listing — confirm against the source.
78  CHECK(is_table_key(keyPrefix));
80  getDataWrapper(keyPrefix)->populateChunkMetadata(chunk_metadata);
81 }
82
// Drops the data wrapper registered for table (db_id, table_id), holding
// data_wrapper_mutex_ exclusively only for the erase.
// NOTE(review): line 89 is absent from this listing (presumably further per-table
// cleanup) — confirm against the source.
83 void ForeignStorageMgr::removeTableRelatedDS(const int db_id, const int table_id) {
84  const ChunkKey table_key{db_id, table_id};
85  {
86  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
87  data_wrapper_map_.erase(table_key);
88  }
90 }
91 
  // Presumably getMgrType() (signature line 92 absent from this listing): identifies
  // this manager as FOREIGN_STORAGE_MGR.
93  return FOREIGN_STORAGE_MGR;
94 }
95
  // getStringMgrType() (signature line 96 absent): string form of FOREIGN_STORAGE_MGR.
97  return ToString(FOREIGN_STORAGE_MGR);
98 }
99
  // Presumably hasDataWrapperForChunk() (signature line 100 absent): returns true iff
  // a data wrapper is registered for the {db_id, table_id} prefix of `chunk_key`.
  // The lookup is read-only, so a shared lock suffices.
101  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
102  CHECK(has_table_prefix(chunk_key));
103  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
104  return data_wrapper_map_.find(table_key) != data_wrapper_map_.end();
105 }
106 
107 std::shared_ptr<ForeignDataWrapper> ForeignStorageMgr::getDataWrapper(
108  const ChunkKey& chunk_key) {
109  std::shared_lock data_wrapper_lock(data_wrapper_mutex_);
110  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
111  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
112  return data_wrapper_map_[table_key];
113 }
114 
116  const ChunkKey& table_key,
117  std::shared_ptr<MockForeignDataWrapper> data_wrapper) {
  // setDataWrapper() (signature line 115 absent from this listing): registers the mock
  // `data_wrapper` in place of the table's existing wrapper, first installing the
  // existing wrapper as the mock's parent via setParentWrapper(). CHECK-fails if
  // `table_key` is not a table key or no wrapper is registered for it yet.
118  CHECK(is_table_key(table_key));
119  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
120  CHECK(data_wrapper_map_.find(table_key) != data_wrapper_map_.end());
121  data_wrapper->setParentWrapper(data_wrapper_map_[table_key]);
122  data_wrapper_map_[table_key] = data_wrapper;
123 }
124
  // createDataWrapperIfNotExists() (signature line 125 absent from this listing):
  // if no wrapper is registered for the table of `chunk_key`, looks the foreign table up
  // in the catalog and creates a CsvDataWrapper or ParquetDataWrapper depending on
  // foreign_table->foreign_server->data_wrapper_type. Returns true if a wrapper was
  // created, false if one already existed; throws std::runtime_error for any other
  // wrapper type. The exclusive lock spans the check, catalog lookup and insertion, so
  // concurrent callers cannot both create a wrapper for the same table.
  // NOTE(review): lines 135 and 139 (the right-hand sides of the type comparisons) are
  // absent from this listing.
126  std::lock_guard data_wrapper_lock(data_wrapper_mutex_);
127  ChunkKey table_key{chunk_key[CHUNK_KEY_DB_IDX], chunk_key[CHUNK_KEY_TABLE_IDX]};
128  if (data_wrapper_map_.find(table_key) == data_wrapper_map_.end()) {
129  auto db_id = chunk_key[CHUNK_KEY_DB_IDX];
130  auto foreign_table =
131  Catalog_Namespace::Catalog::checkedGet(db_id)->getForeignTableUnlocked(
132  chunk_key[CHUNK_KEY_TABLE_IDX]);
133
134  if (foreign_table->foreign_server->data_wrapper_type ==
136  data_wrapper_map_[table_key] =
137  std::make_shared<CsvDataWrapper>(db_id, foreign_table);
138  } else if (foreign_table->foreign_server->data_wrapper_type ==
140  data_wrapper_map_[table_key] =
141  std::make_shared<ParquetDataWrapper>(db_id, foreign_table);
142  } else {
143  throw std::runtime_error("Unsupported data wrapper");
144  }
145  return true;
146  }
147  return false;
148 }
149 
150 void ForeignStorageMgr::refreshTable(const ChunkKey& table_key,
151  const bool evict_cached_entries) {
152  // Noop - If the cache is not enabled then a refresh does nothing.
153 }
154 
156  const ChunkKey& table_key) {
  // clearTempChunkBufferMapEntriesForTable() (signature line 155 absent from this
  // listing): erases every temp_chunk_buffer_map_ entry whose key starts with the table
  // key {db_id, table_id}, destroying the owned temporary buffers.
157  CHECK(is_table_key(table_key));
158  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
  // ChunkKey is a std::vector<int>, ordered lexicographically, so {db, table} sorts
  // before every longer key of that table.
159  auto start_it = temp_chunk_buffer_map_.lower_bound(table_key);
  // NOTE(review): upper_bound({db, table, INT_MAX}) stops before keys such as
  // {db, table, INT_MAX, frag}, which compare greater than the 3-element bound; this is
  // harmless unless a column id can equal INT_MAX.
160  ChunkKey upper_bound_prefix{table_key[CHUNK_KEY_DB_IDX],
161  table_key[CHUNK_KEY_TABLE_IDX],
162  std::numeric_limits<int>::max()};
163  auto end_it = temp_chunk_buffer_map_.upper_bound(upper_bound_prefix);
164  temp_chunk_buffer_map_.erase(start_it, end_it);
165 }
166 
167 bool ForeignStorageMgr::isDatawrapperRestored(const ChunkKey& chunk_key) {
168  if (!hasDataWrapperForChunk(chunk_key)) {
169  return false;
170  }
171  return getDataWrapper(chunk_key)->isRestored();
172 }
173 
// The following AbstractBufferMgr operations are not supported by ForeignStorageMgr.
// Every body is UNREACHABLE(), so any call is treated as a programming error; the
// trailing `return` statements exist only to silence missing-return warnings.
// NOTE(review): signature lines 188, 193, 200, 212, 216, 221, 226, 231, 236 and 249 are
// absent from this listing. Judging by their parameters, the fragments at 194 and 201
// are presumably createBuffer() and putBuffer(), and 250 presumably free().
174 void ForeignStorageMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
175  UNREACHABLE();
176 }
177
178 void ForeignStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_prefix,
179  const bool purge) {
180  UNREACHABLE();
181 }
182
183 bool ForeignStorageMgr::isBufferOnDevice(const ChunkKey& chunk_key) {
184  UNREACHABLE();
185  return false; // Added to avoid "no return statement" compiler warning
186 }
187
189  UNREACHABLE();
190  return 0; // Added to avoid "no return statement" compiler warning
191 }
192
194  const size_t page_size,
195  const size_t initial_size) {
196  UNREACHABLE();
197  return nullptr; // Added to avoid "no return statement" compiler warning
198 }
199
201  AbstractBuffer* source_buffer,
202  const size_t num_bytes) {
203  UNREACHABLE();
204  return nullptr; // Added to avoid "no return statement" compiler warning
205 }
206
207 std::string ForeignStorageMgr::printSlabs() {
208  UNREACHABLE();
209  return {}; // Added to avoid "no return statement" compiler warning
210 }
211
213  UNREACHABLE();
214 }
215
217  UNREACHABLE();
218  return 0; // Added to avoid "no return statement" compiler warning
219 }
220
222  UNREACHABLE();
223  return 0; // Added to avoid "no return statement" compiler warning
224 }
225
227  UNREACHABLE();
228  return 0; // Added to avoid "no return statement" compiler warning
229 }
230
232  UNREACHABLE();
233  return false; // Added to avoid "no return statement" compiler warning
234 }
235
237  UNREACHABLE();
238 }
239
240 void ForeignStorageMgr::checkpoint(const int db_id, const int tb_id) {
241  UNREACHABLE();
242 }
243
244 AbstractBuffer* ForeignStorageMgr::alloc(const size_t num_bytes) {
245  UNREACHABLE();
246  return nullptr; // Added to avoid "no return statement" compiler warning
247 }
248
250  UNREACHABLE();
251 }
252 
254  const ChunkKey& chunk_key) {
  // createAndPopulateDataWrapperIfNotExists() (signature line 253 absent from this
  // listing): ensures a data wrapper exists for the table of `chunk_key`. When one was
  // just created, it immediately runs populateChunkMetadata() on it. The collected
  // metadata is discarded — presumably the call is made for its side effect of
  // initializing the new wrapper's internal state (TODO confirm).
255  ChunkKey table_key = get_table_key(chunk_key);
256  if (createDataWrapperIfNotExists(table_key)) {
257  ChunkMetadataVector chunk_metadata;
258  getDataWrapper(table_key)->populateChunkMetadata(chunk_metadata);
259  }
260 }
261 
262 std::set<ChunkKey> get_keys_set_from_table(const ChunkKey& destination_chunk_key) {
263  std::set<ChunkKey> chunk_keys;
264  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
265  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
266  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
267  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
268  auto foreign_table =
269  Catalog_Namespace::Catalog::checkedGet(db_id)->getForeignTableUnlocked(table_id);
270 
271  ForeignTableSchema schema{db_id, foreign_table};
272  auto logical_column = schema.getLogicalColumn(destination_column_id);
273  auto logical_column_id = logical_column->columnId;
274 
275  for (auto column_id = logical_column_id;
276  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
277  column_id++) {
278  auto column = schema.getColumnDescriptor(column_id);
279  if (column->columnType.is_varlen_indeed()) {
280  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
281  chunk_keys.emplace(data_chunk_key);
282 
283  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
284  chunk_keys.emplace(index_chunk_key);
285  } else {
286  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
287  chunk_keys.emplace(data_chunk_key);
288  }
289  }
290  return chunk_keys;
291 }
292 
293 std::vector<ChunkKey> get_keys_vec_from_table(const ChunkKey& destination_chunk_key) {
294  std::vector<ChunkKey> chunk_keys;
295  auto db_id = destination_chunk_key[CHUNK_KEY_DB_IDX];
296  auto table_id = destination_chunk_key[CHUNK_KEY_TABLE_IDX];
297  auto destination_column_id = destination_chunk_key[CHUNK_KEY_COLUMN_IDX];
298  auto fragment_id = destination_chunk_key[CHUNK_KEY_FRAGMENT_IDX];
299  auto foreign_table =
300  Catalog_Namespace::Catalog::checkedGet(db_id)->getForeignTableUnlocked(table_id);
301 
302  ForeignTableSchema schema{db_id, foreign_table};
303  auto logical_column = schema.getLogicalColumn(destination_column_id);
304  auto logical_column_id = logical_column->columnId;
305 
306  for (auto column_id = logical_column_id;
307  column_id <= (logical_column_id + logical_column->columnType.get_physical_cols());
308  column_id++) {
309  auto column = schema.getColumnDescriptor(column_id);
310  if (column->columnType.is_varlen_indeed()) {
311  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id, 1};
312  chunk_keys.emplace_back(data_chunk_key);
313 
314  ChunkKey index_chunk_key{db_id, table_id, column->columnId, fragment_id, 2};
315  chunk_keys.emplace_back(index_chunk_key);
316  } else {
317  ChunkKey data_chunk_key = {db_id, table_id, column->columnId, fragment_id};
318  chunk_keys.emplace_back(data_chunk_key);
319  }
320  }
321  return chunk_keys;
322 }
323 
324 std::map<ChunkKey, AbstractBuffer*> ForeignStorageMgr::allocateTempBuffersForChunks(
325  const std::set<ChunkKey>& chunk_keys) {
326  std::map<ChunkKey, AbstractBuffer*> chunk_buffer_map;
327  std::lock_guard temp_chunk_buffer_map_lock(temp_chunk_buffer_map_mutex_);
328  for (const auto& chunk_key : chunk_keys) {
329  temp_chunk_buffer_map_[chunk_key] = std::make_unique<ForeignStorageBuffer>();
330  chunk_buffer_map[chunk_key] = temp_chunk_buffer_map_[chunk_key].get();
331  }
332  return chunk_buffer_map;
333 }
334 } // namespace foreign_storage
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
std::vector< int > ChunkKey
Definition: types.h:37
AbstractBuffer * alloc(const size_t num_bytes) override
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
bool fetchBufferIfTempBufferMapEntryExists(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes)
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
#define UNREACHABLE()
Definition: Logger.h:241
void free(AbstractBuffer *buffer) override
void removeTableRelatedDS(const int db_id, const int table_id) override
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
void createAndPopulateDataWrapperIfNotExists(const ChunkKey &chunk_key)
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool isDatawrapperRestored(const ChunkKey &chunk_key)
std::shared_mutex temp_chunk_buffer_map_mutex_
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool has_table_prefix(const ChunkKey &key)
Definition: types.h:48
An AbstractBuffer is a unit of data management for a data manager.
static std::shared_ptr< Catalog > checkedGet(const int32_t db_id)
Definition: Catalog.cpp:3846
std::string getStringMgrType() override
std::map< ChunkKey, std::shared_ptr< ForeignDataWrapper > > data_wrapper_map_
bool hasDataWrapperForChunk(const ChunkKey &chunk_key)
virtual void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries)
void setDataWrapper(const ChunkKey &table_key, std::shared_ptr< MockForeignDataWrapper > data_wrapper)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
AbstractBuffer * getBuffer(const ChunkKey &chunk_key, const size_t num_bytes) override
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
#define CHECK(condition)
Definition: Logger.h:197
static constexpr char const * CSV
Definition: ForeignServer.h:35
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
std::map< ChunkKey, std::unique_ptr< AbstractBuffer > > temp_chunk_buffer_map_
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
static constexpr char const * PARQUET
Definition: ForeignServer.h:36
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
std::string printSlabs() override
std::map< ChunkKey, AbstractBuffer * > allocateTempBuffersForChunks(const std::set< ChunkKey > &chunk_keys)
bool isBufferOnDevice(const ChunkKey &chunk_key) override