OmniSciDB  3a86f6ec37
CachingForeignStorageMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include "Catalog/ForeignTable.h"
20 #include "CsvDataWrapper.h"
21 #include "ForeignTableSchema.h"
22 #include "ParquetDataWrapper.h"
23 
24 namespace foreign_storage {
25 
namespace {
// Time budget (in seconds) for re-populating cached chunks during a table refresh;
// checked against the elapsed refresh time in refreshChunksInCacheByFragment().
constexpr int64_t MAX_REFRESH_TIME_IN_SECONDS{60 * 60};  // one hour
// File name (including the leading '/') presumably joined to a table's cache
// directory for the serialized data-wrapper internals; use site not fully visible
// here — confirm.
const std::string wrapper_file_name{"/wrapper_metadata.json"};
}  // namespace
30 
32  : ForeignStorageMgr(), disk_cache_(cache) {
34 }
35 
37  AbstractBuffer* destination_buffer,
38  const size_t num_bytes) {
39  CHECK(destination_buffer);
40  CHECK(!destination_buffer->isDirty());
41 
43 
44  // TODO: Populate optional buffers as part of CSV performance improvement
45  std::vector<ChunkKey> chunk_keys = get_keys_vec_from_table(chunk_key);
46  std::vector<ChunkKey> optional_chunk_keys;
47  std::map<ChunkKey, AbstractBuffer*> optional_buffers;
48 
49  // Use hints to prefetch other chunks in fragment into cache
50  auto catalog =
52  CHECK(catalog);
53  auto foreign_table = catalog->getForeignTableUnlocked(chunk_key[CHUNK_KEY_TABLE_IDX]);
54  if (foreign_table->foreign_server->data_wrapper_type ==
55  foreign_storage::DataWrapperType::CSV) // optimization only useful for column based
56  // formats
57  {
58  std::set<ChunkKey> optional_chunk_key_set;
60  optional_chunk_key_set, chunk_key, get_keys_set_from_table(chunk_key));
61  for (const auto& key : optional_chunk_key_set) {
62  if (disk_cache_->getCachedChunkIfExists(key) == nullptr) {
63  optional_chunk_keys.emplace_back(key);
64  }
65  }
66 
67  if (optional_chunk_keys.size()) {
68  optional_buffers = disk_cache_->getChunkBuffersForCaching(optional_chunk_keys);
69  }
70  }
71 
72  std::map<ChunkKey, AbstractBuffer*> required_buffers =
74  CHECK(required_buffers.find(chunk_key) != required_buffers.end());
75  getDataWrapper(chunk_key)->populateChunkBuffers(required_buffers, optional_buffers);
76  disk_cache_->cacheTableChunks(chunk_keys);
77  if (optional_chunk_keys.size()) {
78  disk_cache_->cacheTableChunks(optional_chunk_keys);
79  }
80 
81  AbstractBuffer* buffer = required_buffers.at(chunk_key);
82  CHECK(buffer);
83 
84  buffer->copyTo(destination_buffer, num_bytes);
85 }
86 
88  ChunkMetadataVector& chunk_metadata,
89  const ChunkKey& keyPrefix) {
90  ForeignStorageMgr::getChunkMetadataVecForKeyPrefix(chunk_metadata, keyPrefix);
91  getDataWrapper(keyPrefix)->serializeDataWrapperInternals(
93 }
94 
96  const ChunkKey& table_key,
97  const ChunkMetadataVector& chunk_metadata) {
98  getDataWrapper(table_key)->restoreDataWrapperInternals(
100  chunk_metadata);
101 }
102 
104  const bool evict_cached_entries) {
105  CHECK(is_table_key(table_key));
108  evict_cached_entries ? disk_cache_->clearForTablePrefix(table_key)
109  : refreshTableInCache(table_key);
110 }
111 
113  CHECK(is_table_key(table_key));
114 
115  // Before we can refresh a table we should make sure it has recovered any data
116  // if the table has been unused since the last server restart.
117  if (!disk_cache_->hasCachedMetadataForKeyPrefix(table_key)) {
118  ChunkMetadataVector old_cached_metadata;
119  disk_cache_->recoverCacheForTable(old_cached_metadata, table_key);
120  }
121 
122  // Preserve the list of which chunks were cached per table to refresh after clear.
123  std::vector<ChunkKey> old_chunk_keys =
125  auto catalog =
127  CHECK(catalog);
128  bool append_mode =
129  catalog->getForeignTableUnlocked(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode();
130 
131  append_mode ? refreshAppendTableInCache(table_key, old_chunk_keys)
132  : refreshNonAppendTableInCache(table_key, old_chunk_keys);
133 }
134 
136  // Determine last fragment ID
137  int last_frag_id = 0;
138  if (disk_cache_->hasCachedMetadataForKeyPrefix(table_key)) {
139  ChunkMetadataVector cached_metadata;
140  disk_cache_->getCachedMetadataVecForKeyPrefix(cached_metadata, table_key);
141  for (const auto& [key, metadata] : cached_metadata) {
142  last_frag_id = std::max(last_frag_id, key[CHUNK_KEY_FRAGMENT_IDX]);
143  }
144  }
145  return last_frag_id;
146 }
147 
149  const ChunkKey& table_key,
150  const std::vector<ChunkKey>& old_chunk_keys) {
151  CHECK(is_table_key(table_key));
153  int last_frag_id = getHighestCachedFragId(table_key);
154 
155  ChunkMetadataVector storage_metadata;
156  getChunkMetadataVecForKeyPrefix(storage_metadata, table_key);
157  try {
158  disk_cache_->cacheMetadataWithFragIdGreaterOrEqualTo(storage_metadata, last_frag_id);
159  refreshChunksInCacheByFragment(old_chunk_keys, last_frag_id);
160  } catch (std::runtime_error& e) {
162  }
163 }
164 
166  const ChunkKey& table_key,
167  const std::vector<ChunkKey>& old_chunk_keys) {
168  CHECK(is_table_key(table_key));
169  ChunkMetadataVector storage_metadata;
170  disk_cache_->clearForTablePrefix(table_key);
171  getChunkMetadataVecForKeyPrefix(storage_metadata, table_key);
172 
173  try {
174  disk_cache_->cacheMetadataVec(storage_metadata);
175  refreshChunksInCacheByFragment(old_chunk_keys, 0);
176  } catch (std::runtime_error& e) {
178  }
179 }
180 
182  const std::vector<ChunkKey>& old_chunk_keys,
183  int start_frag_id) {
184  int64_t total_time{0};
185  auto fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
186 
187  if (old_chunk_keys.empty()) {
188  return;
189  }
190  // Iterate through previously cached chunks and re-cache them. Caching is
191  // done one fragment at a time, for all applicable chunks in the fragment.
192  std::map<ChunkKey, AbstractBuffer*> optional_buffers;
193  std::vector<ChunkKey> chunk_keys_to_be_cached;
194  auto fragment_id = old_chunk_keys[0][CHUNK_KEY_FRAGMENT_IDX];
195  const ChunkKey table_key{get_table_key(old_chunk_keys[0])};
196  std::vector<ChunkKey> chunk_keys_in_fragment;
197  for (const auto& chunk_key : old_chunk_keys) {
198  if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] < start_frag_id) {
199  continue;
200  }
201  if (disk_cache_->isMetadataCached(chunk_key)) {
202  if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] != fragment_id) {
203  if (chunk_keys_in_fragment.size() > 0) {
204  auto required_buffers =
205  disk_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
206  getDataWrapper(table_key)->populateChunkBuffers(required_buffers,
207  optional_buffers);
208  chunk_keys_in_fragment.clear();
209  }
210  // At this point, cache buffers for refreshable chunks in the last fragment
211  // have been populated. Exit if the max refresh time has been exceeded.
212  // Otherwise, move to the next fragment.
213  auto current_time = std::chrono::high_resolution_clock::now();
214  total_time += std::chrono::duration_cast<std::chrono::seconds>(
215  current_time - fragment_refresh_start_time)
216  .count();
217  if (total_time >= MAX_REFRESH_TIME_IN_SECONDS) {
218  LOG(WARNING) << "Refresh time exceeded for table key: { " << table_key[0]
219  << ", " << table_key[1] << " } after fragment id: " << fragment_id;
220  break;
221  } else {
222  fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
223  }
224  fragment_id = chunk_key[CHUNK_KEY_FRAGMENT_IDX];
225  }
226  // Key may have been cached during scan
227  if (disk_cache_->getCachedChunkIfExists(chunk_key) == nullptr) {
228  if (is_varlen_key(chunk_key)) {
229  CHECK(is_varlen_data_key(chunk_key));
230  ChunkKey index_chunk_key{chunk_key[CHUNK_KEY_DB_IDX],
231  chunk_key[CHUNK_KEY_TABLE_IDX],
232  chunk_key[CHUNK_KEY_COLUMN_IDX],
233  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
234  2};
235  chunk_keys_in_fragment.emplace_back(index_chunk_key);
236  chunk_keys_to_be_cached.emplace_back(index_chunk_key);
237  }
238  chunk_keys_in_fragment.emplace_back(chunk_key);
239  chunk_keys_to_be_cached.emplace_back(chunk_key);
240  }
241  }
242  }
243  if (chunk_keys_in_fragment.size() > 0) {
244  auto required_buffers =
245  disk_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
246  getDataWrapper(table_key)->populateChunkBuffers(required_buffers, optional_buffers);
247  }
248  if (chunk_keys_to_be_cached.size() > 0) {
249  disk_cache_->cacheTableChunks(chunk_keys_to_be_cached);
250  }
251 }
252 
254  const ChunkKey& chunk_key) {
255  ChunkKey table_key = get_table_key(chunk_key);
256  if (createDataWrapperIfNotExists(table_key)) {
257  ChunkMetadataVector chunk_metadata;
258  if (disk_cache_->hasCachedMetadataForKeyPrefix(table_key)) {
259  disk_cache_->getCachedMetadataVecForKeyPrefix(chunk_metadata, table_key);
260  recoverDataWrapperFromDisk(table_key, chunk_metadata);
261  } else {
262  getDataWrapper(table_key)->populateChunkMetadata(chunk_metadata);
263  }
264  }
265 }
266 
267 } // namespace foreign_storage
void getOptionalChunkKeySet(std::set< ChunkKey > &optional_chunk_keys, const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys)
void refreshAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
#define LOG(tag)
Definition: Logger.h:188
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
void refreshTableInCache(const ChunkKey &table_key)
AbstractBuffer * getCachedChunkIfExists(const ChunkKey &)
void refreshNonAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
std::string getCacheDirectoryForTablePrefix(const ChunkKey &) const
static SysCatalog & instance()
Definition: SysCatalog.h:291
void createOrRecoverDataWrapperIfNotExists(const ChunkKey &chunk_key)
std::map< ChunkKey, AbstractBuffer * > getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys) const
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
void refreshChunksInCacheByFragment(const std::vector< ChunkKey > &old_chunk_keys, int last_frag_id)
void recoverDataWrapperFromDisk(const ChunkKey &table_key, const ChunkMetadataVector &chunk_metadata)
bool is_varlen_key(const ChunkKey &key)
Definition: types.h:61
An AbstractBuffer is a unit of data management for a data manager.
void cacheMetadataWithFragIdGreaterOrEqualTo(const ChunkMetadataVector &metadata_vec, const int frag_id)
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries) override
void cacheMetadataVec(const ChunkMetadataVector &)
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
bool isMetadataCached(const ChunkKey &) const
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
bool is_varlen_data_key(const ChunkKey &key)
Definition: types.h:65
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
#define CHECK(condition)
Definition: Logger.h:197
static constexpr char const * CSV
Definition: ForeignServer.h:37
std::vector< int > ChunkKey
Definition: types.h:37
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata > >> ChunkMetadataVector
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
int getHighestCachedFragId(const ChunkKey &table_key)