OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CachingForeignStorageMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "CachingForeignStorageMgr.h"

#include "Catalog/ForeignTable.h"
#include "CsvDataWrapper.h"
#include "ForeignStorageException.h"
#include "ForeignTableSchema.h"
#ifdef ENABLE_IMPORT_PARQUET
#include "ParquetDataWrapper.h"
#endif
25 
26 namespace foreign_storage {
27 
namespace {
// Upper bound (1 hour) on the cumulative time a table refresh may spend
// re-caching chunk data. refreshChunksInCacheByFragment() checks this budget
// between fragments and stops early once it is exceeded.
constexpr int64_t MAX_REFRESH_TIME_IN_SECONDS = 60 * 60;
}  // namespace
31 
33  : ForeignStorageMgr(), disk_cache_(cache) {
35 }
36 
38  ForeignDataWrapper& data_wrapper,
39  ChunkToBufferMap& required_buffers,
40  ChunkToBufferMap& optional_buffers) {
41  try {
42  data_wrapper.populateChunkBuffers(required_buffers, optional_buffers);
43  } catch (const std::runtime_error& error) {
44  // clear any partially loaded but failed chunks (there may be some
45  // fully-loaded chunks as well but they will be cleared conservatively
46  // anyways)
47  for (const auto& [chunk_key, buffer] : required_buffers) {
48  if (auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
49  file_buffer->freeChunkPages();
50  }
51  }
52  for (const auto& [chunk_key, buffer] : optional_buffers) {
53  if (auto file_buffer = dynamic_cast<File_Namespace::FileBuffer*>(buffer)) {
54  file_buffer->freeChunkPages();
55  }
56  }
57 
58  throw ForeignStorageException(error.what());
59  }
60 }
61 
63  AbstractBuffer* destination_buffer,
64  const size_t num_bytes) {
65  CHECK(destination_buffer);
66  CHECK(!destination_buffer->isDirty());
67 
69 
70  // TODO: Populate optional buffers as part of CSV performance improvement
71  std::vector<ChunkKey> chunk_keys = get_keys_vec_from_table(chunk_key);
72  std::vector<ChunkKey> optional_keys;
73  ChunkToBufferMap optional_buffers;
74 
75  // Use hints to prefetch other chunks in fragment into cache
76  auto& data_wrapper = *getDataWrapper(chunk_key);
77  std::set<ChunkKey> optional_set;
78  getOptionalChunkKeySet(optional_set,
79  chunk_key,
80  get_keys_set_from_table(chunk_key),
81  data_wrapper.getCachedParallelismLevel());
82  for (const auto& key : optional_set) {
83  if (disk_cache_->getCachedChunkIfExists(key) == nullptr) {
84  optional_keys.emplace_back(key);
85  }
86  }
87 
88  if (optional_keys.size()) {
89  optional_buffers = disk_cache_->getChunkBuffersForCaching(optional_keys);
90  }
91 
92  ChunkToBufferMap required_buffers = disk_cache_->getChunkBuffersForCaching(chunk_keys);
93  CHECK(required_buffers.find(chunk_key) != required_buffers.end());
94  populateChunkBuffersSafely(data_wrapper, required_buffers, optional_buffers);
95  disk_cache_->cacheTableChunks(chunk_keys);
96  if (optional_keys.size()) {
97  disk_cache_->cacheTableChunks(optional_keys);
98  }
99 
100  AbstractBuffer* buffer = required_buffers.at(chunk_key);
101  CHECK(buffer);
102 
103  buffer->copyTo(destination_buffer, num_bytes);
104 }
105 
107  ChunkMetadataVector& chunk_metadata,
108  const ChunkKey& keyPrefix) {
109  auto [db_id, tb_id] = get_table_prefix(keyPrefix);
110  ForeignStorageMgr::getChunkMetadataVecForKeyPrefix(chunk_metadata, keyPrefix);
111  getDataWrapper(keyPrefix)->serializeDataWrapperInternals(
113 }
114 
116  const ChunkKey& table_key,
117  const ChunkMetadataVector& chunk_metadata) {
118  auto [db_id, tb_id] = get_table_prefix(table_key);
119  getDataWrapper(table_key)->restoreDataWrapperInternals(
121  chunk_metadata);
122 }
123 
125  const bool evict_cached_entries) {
126  CHECK(is_table_key(table_key));
129  if (evict_cached_entries) {
130  clearTable(table_key);
131  } else {
132  refreshTableInCache(table_key);
133  }
134 }
135 
137  CHECK(is_table_key(table_key));
138 
139  // Before we can refresh a table we should make sure it has recovered any data
140  // if the table has been unused since the last server restart.
141  if (!disk_cache_->hasCachedMetadataForKeyPrefix(table_key)) {
142  ChunkMetadataVector old_cached_metadata;
143  disk_cache_->recoverCacheForTable(old_cached_metadata, table_key);
144  }
145 
146  // Preserve the list of which chunks were cached per table to refresh after clear.
147  std::vector<ChunkKey> old_chunk_keys =
149  auto catalog =
151  CHECK(catalog);
152  bool append_mode =
153  catalog->getForeignTableUnlocked(table_key[CHUNK_KEY_TABLE_IDX])->isAppendMode();
154 
155  append_mode ? refreshAppendTableInCache(table_key, old_chunk_keys)
156  : refreshNonAppendTableInCache(table_key, old_chunk_keys);
157 }
158 
160  disk_cache_->clearForTablePrefix(table_key);
161 
163  auto [db_id, tb_id] = get_table_prefix(table_key);
164  // Make sure metadata file is gone
165  CHECK(!boost::filesystem::exists(disk_cache_->getCacheDirectoryForTable(db_id, tb_id) +
167 }
168 
170  // Determine last fragment ID
171  int last_frag_id = 0;
172  if (disk_cache_->hasCachedMetadataForKeyPrefix(table_key)) {
173  ChunkMetadataVector cached_metadata;
174  disk_cache_->getCachedMetadataVecForKeyPrefix(cached_metadata, table_key);
175  for (const auto& [key, metadata] : cached_metadata) {
176  last_frag_id = std::max(last_frag_id, key[CHUNK_KEY_FRAGMENT_IDX]);
177  }
178  }
179  return last_frag_id;
180 }
181 
183  const ChunkKey& table_key,
184  const std::vector<ChunkKey>& old_chunk_keys) {
185  CHECK(is_table_key(table_key));
187  int last_frag_id = getHighestCachedFragId(table_key);
188 
189  ChunkMetadataVector storage_metadata;
190  getChunkMetadataVecForKeyPrefix(storage_metadata, table_key);
191  try {
192  disk_cache_->cacheMetadataWithFragIdGreaterOrEqualTo(storage_metadata, last_frag_id);
193  refreshChunksInCacheByFragment(old_chunk_keys, last_frag_id);
194  } catch (std::runtime_error& e) {
196  }
197 }
198 
200  const ChunkKey& table_key,
201  const std::vector<ChunkKey>& old_chunk_keys) {
202  CHECK(is_table_key(table_key));
203  ChunkMetadataVector storage_metadata;
204  clearTable(table_key);
205  getChunkMetadataVecForKeyPrefix(storage_metadata, table_key);
206 
207  try {
208  disk_cache_->cacheMetadataVec(storage_metadata);
209  refreshChunksInCacheByFragment(old_chunk_keys, 0);
210  } catch (std::runtime_error& e) {
212  }
213 }
214 
216  const std::vector<ChunkKey>& old_chunk_keys,
217  int start_frag_id) {
218  int64_t total_time{0};
219  auto fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
220 
221  if (old_chunk_keys.empty()) {
222  return;
223  }
224  // Iterate through previously cached chunks and re-cache them. Caching is
225  // done one fragment at a time, for all applicable chunks in the fragment.
226  ChunkToBufferMap optional_buffers;
227  std::vector<ChunkKey> chunk_keys_to_be_cached;
228  auto fragment_id = old_chunk_keys[0][CHUNK_KEY_FRAGMENT_IDX];
229  const ChunkKey table_key{get_table_key(old_chunk_keys[0])};
230  std::vector<ChunkKey> chunk_keys_in_fragment;
231  for (const auto& chunk_key : old_chunk_keys) {
232  if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] < start_frag_id) {
233  continue;
234  }
235  if (disk_cache_->isMetadataCached(chunk_key)) {
236  if (chunk_key[CHUNK_KEY_FRAGMENT_IDX] != fragment_id) {
237  if (chunk_keys_in_fragment.size() > 0) {
238  auto required_buffers =
239  disk_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
241  *getDataWrapper(table_key), required_buffers, optional_buffers);
242  chunk_keys_in_fragment.clear();
243  }
244  // At this point, cache buffers for refreshable chunks in the last fragment
245  // have been populated. Exit if the max refresh time has been exceeded.
246  // Otherwise, move to the next fragment.
247  auto current_time = std::chrono::high_resolution_clock::now();
248  total_time += std::chrono::duration_cast<std::chrono::seconds>(
249  current_time - fragment_refresh_start_time)
250  .count();
251  if (total_time >= MAX_REFRESH_TIME_IN_SECONDS) {
252  LOG(WARNING) << "Refresh time exceeded for table key: { " << table_key[0]
253  << ", " << table_key[1] << " } after fragment id: " << fragment_id;
254  break;
255  } else {
256  fragment_refresh_start_time = std::chrono::high_resolution_clock::now();
257  }
258  fragment_id = chunk_key[CHUNK_KEY_FRAGMENT_IDX];
259  }
260  // Key may have been cached during scan
261  if (disk_cache_->getCachedChunkIfExists(chunk_key) == nullptr) {
262  if (is_varlen_key(chunk_key)) {
263  CHECK(is_varlen_data_key(chunk_key));
264  ChunkKey index_chunk_key{chunk_key[CHUNK_KEY_DB_IDX],
265  chunk_key[CHUNK_KEY_TABLE_IDX],
266  chunk_key[CHUNK_KEY_COLUMN_IDX],
267  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
268  2};
269  chunk_keys_in_fragment.emplace_back(index_chunk_key);
270  chunk_keys_to_be_cached.emplace_back(index_chunk_key);
271  }
272  chunk_keys_in_fragment.emplace_back(chunk_key);
273  chunk_keys_to_be_cached.emplace_back(chunk_key);
274  }
275  }
276  }
277  if (chunk_keys_in_fragment.size() > 0) {
278  auto required_buffers =
279  disk_cache_->getChunkBuffersForCaching(chunk_keys_in_fragment);
281  *getDataWrapper(table_key), required_buffers, optional_buffers);
282  }
283  if (chunk_keys_to_be_cached.size() > 0) {
284  disk_cache_->cacheTableChunks(chunk_keys_to_be_cached);
285  }
286 }
287 
289  const ChunkKey& chunk_key) {
290  ChunkKey table_key = get_table_key(chunk_key);
291  if (createDataWrapperIfNotExists(table_key)) {
292  ChunkMetadataVector chunk_metadata;
293  if (disk_cache_->hasCachedMetadataForKeyPrefix(table_key)) {
294  disk_cache_->getCachedMetadataVecForKeyPrefix(chunk_metadata, table_key);
295  recoverDataWrapperFromDisk(table_key, chunk_metadata);
296  } else {
297  getDataWrapper(table_key)->populateChunkMetadata(chunk_metadata);
298  }
299  }
300 }
301 
302 } // namespace foreign_storage
std::vector< ChunkKey > get_keys_vec_from_table(const ChunkKey &destination_chunk_key)
void refreshAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
std::vector< int > ChunkKey
Definition: types.h:37
bool isMetadataCached(const ChunkKey &) const
std::shared_ptr< ForeignDataWrapper > getDataWrapper(const ChunkKey &chunk_key)
std::set< ChunkKey > get_keys_set_from_table(const ChunkKey &destination_chunk_key)
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
bool is_varlen_data_key(const ChunkKey &key)
Definition: types.h:70
#define LOG(tag)
Definition: Logger.h:194
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
static void checkIfS3NeedsToBeEnabled(const ChunkKey &chunk_key)
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
void refreshTableInCache(const ChunkKey &table_key)
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
void refreshNonAppendTableInCache(const ChunkKey &table_key, const std::vector< ChunkKey > &old_chunk_keys)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
static SysCatalog & instance()
Definition: SysCatalog.h:292
void createOrRecoverDataWrapperIfNotExists(const ChunkKey &chunk_key)
void getOptionalChunkKeySet(std::set< ChunkKey > &optional_chunk_keys, const ChunkKey &chunk_key, const std::set< ChunkKey > &required_chunk_keys, const ForeignDataWrapper::ParallelismLevel parallelism_level)
int count
void refreshChunksInCacheByFragment(const std::vector< ChunkKey > &old_chunk_keys, int last_frag_id)
void recoverDataWrapperFromDisk(const ChunkKey &table_key, const ChunkMetadataVector &chunk_metadata)
const std::string wrapper_file_name
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
void cacheMetadataWithFragIdGreaterOrEqualTo(const ChunkMetadataVector &metadata_vec, const int frag_id)
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
void populateChunkBuffersSafely(ForeignDataWrapper &data_wrapper, ChunkToBufferMap &required_buffers, ChunkToBufferMap &optional_buffers)
void refreshTable(const ChunkKey &table_key, const bool evict_cached_entries) override
void cacheMetadataVec(const ChunkMetadataVector &)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void clearDataWrapper(const ChunkKey &table_key)
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &chunk_key_prefix) override
std::string getCacheDirectoryForTable(int db_id, int tb_id) const
virtual void populateChunkBuffers(const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers)=0
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:57
#define CHECK(condition)
Definition: Logger.h:203
File_Namespace::FileBuffer * getCachedChunkIfExists(const ChunkKey &)
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
void clearTempChunkBufferMapEntriesForTable(const ChunkKey &table_key)
ChunkToBufferMap getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys) const
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
bool createDataWrapperIfNotExists(const ChunkKey &chunk_key)
bool is_varlen_key(const ChunkKey &key)
Definition: types.h:66
int getHighestCachedFragId(const ChunkKey &table_key)