OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageCache.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
 18  TODO(Misiu): A lot of methods here can be made asynchronous. It may be worth an
19  investigation to determine if it's worth adding async versions of them for performance
20  reasons.
21 */
22 
23 #include "ForeignStorageCache.h"
24 #include "Shared/File.h"
25 #include "Shared/measure.h"
26 
27 namespace foreign_storage {
28 using read_lock = mapd_shared_lock<mapd_shared_mutex>;
29 using write_lock = mapd_unique_lock<mapd_shared_mutex>;
30 
31 namespace {
32 template <typename Func, typename T>
34  T& chunk_collection,
35  const ChunkKey& chunk_prefix) {
36  ChunkKey upper_prefix(chunk_prefix);
37  upper_prefix.push_back(std::numeric_limits<int>::max());
38  auto end_it = chunk_collection.upper_bound(static_cast<const ChunkKey>(upper_prefix));
39  for (auto chunk_it = chunk_collection.lower_bound(chunk_prefix); chunk_it != end_it;
40  ++chunk_it) {
41  func(*chunk_it);
42  }
43 }
44 
46  buffer->initEncoder(meta->sqlType);
47  buffer->setSize(meta->numBytes);
48  buffer->getEncoder()->setNumElems(meta->numElements);
49  buffer->getEncoder()->resetChunkStats(meta->chunkStats);
50  buffer->setUpdated();
51 }
52 } // namespace
53 
55  : num_chunks_added_(0), num_metadata_added_(0) {
56  validatePath(config.path);
57  caching_file_mgr_ = std::make_unique<File_Namespace::CachingFileMgr>(
58  config.path, config.num_reader_threads, config.page_size);
59 }
60 
62  write_lock meta_lock(metadata_mutex_);
63  write_lock chunk_lock(chunks_mutex_);
64  caching_file_mgr_->deleteBufferIfExists(chunk_key);
65  cached_chunks_.erase(chunk_key);
66  cached_metadata_.erase(chunk_key);
67 }
68 
69 void ForeignStorageCache::cacheChunk(const ChunkKey& chunk_key, AbstractBuffer* buffer) {
70  // We should only be caching buffers that are in sync with storage.
71  CHECK(!buffer->isDirty());
73  if (buffer->size() == 0) {
74  // If we are writing an empty buffer, just delete it from the cache entirely.
75  deleteBufferIfExists(chunk_key);
76  } else {
77  // Replace the existing chunk with a new version.
78  write_lock meta_lock(metadata_mutex_);
79  write_lock chunk_lock(chunks_mutex_);
80  buffer->setAppended();
81  caching_file_mgr_->putBuffer(chunk_key, buffer);
82  cached_metadata_.emplace(chunk_key);
83  cached_chunks_.emplace(chunk_key);
84  CHECK(!buffer->isDirty());
85  }
86  caching_file_mgr_->checkpoint(chunk_key[CHUNK_KEY_DB_IDX],
87  chunk_key[CHUNK_KEY_TABLE_IDX]);
88 }
89 
90 void ForeignStorageCache::cacheTableChunks(const std::vector<ChunkKey>& chunk_keys) {
91  auto timer = DEBUG_TIMER(__func__);
93  CHECK(!chunk_keys.empty());
94 
95  auto db_id = chunk_keys[0][CHUNK_KEY_DB_IDX];
96  auto table_id = chunk_keys[0][CHUNK_KEY_TABLE_IDX];
97  const ChunkKey table_key{db_id, table_id};
98 
99  for (const auto& chunk_key : chunk_keys) {
100  CHECK_EQ(db_id, chunk_key[CHUNK_KEY_DB_IDX]);
101  CHECK_EQ(table_id, chunk_key[CHUNK_KEY_TABLE_IDX]);
102  CHECK(caching_file_mgr_->isBufferOnDevice(chunk_key));
104  cached_chunks_.emplace(chunk_key);
105  }
106  caching_file_mgr_->checkpoint(db_id, table_id);
107 }
108 
110  const ChunkKey& chunk_key) {
111  {
112  read_lock lock(chunks_mutex_);
113  // We do this instead of calling getBuffer so that we don't create a fileMgr if the
114  // chunk doesn't exist.
115  if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
116  return nullptr;
117  }
118  }
119  return caching_file_mgr_->getBuffer(chunk_key);
120 }
121 
122 bool ForeignStorageCache::isMetadataCached(const ChunkKey& chunk_key) const {
124  return (cached_metadata_.find(chunk_key) != cached_metadata_.end());
125 }
126 
128  const ChunkKey& table_key) {
130  CHECK(meta_vec.size() == 0);
131  CHECK(is_table_key(table_key));
132 
133  caching_file_mgr_->getChunkMetadataVecForKeyPrefix(meta_vec, table_key);
134  for (auto& [chunk_key, metadata] : meta_vec) {
135  cached_metadata_.emplace(chunk_key);
136  // If there is no page count then the chunk was metadata only and should not be
137  // cached.
138  if (const auto& buf = caching_file_mgr_->getBuffer(chunk_key); buf->pageCount() > 0) {
139  cached_chunks_.emplace(chunk_key);
140  }
141 
142  if (is_varlen_key(chunk_key)) {
143  // Metadata is only available for the data chunk, but look for the index as well
144  CHECK(is_varlen_data_key(chunk_key));
145  ChunkKey index_chunk_key = {chunk_key[CHUNK_KEY_DB_IDX],
146  chunk_key[CHUNK_KEY_TABLE_IDX],
147  chunk_key[CHUNK_KEY_COLUMN_IDX],
148  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
149  2};
150 
151  if (const auto& buf = caching_file_mgr_->getBuffer(index_chunk_key);
152  buf->pageCount() > 0) {
153  cached_chunks_.emplace(index_chunk_key);
154  }
155  }
156  }
157  return (meta_vec.size() > 0);
158 }
159 
161  write_lock chunk_lock(chunks_mutex_);
162  evictThenEraseChunkUnlocked(chunk_key);
163 }
164 
166  const ChunkKey table_prefix = get_table_key(chunk_key);
167  eraseChunk(chunk_key);
168 }
169 
171  auto timer = DEBUG_TIMER(__func__);
172  if (metadata_vec.empty()) {
173  return;
174  }
175  auto first_chunk_key = metadata_vec.begin()->first;
176  write_lock meta_lock(metadata_mutex_);
177  write_lock chunk_lock(chunks_mutex_);
178  for (auto& [chunk_key, metadata] : metadata_vec) {
179  CHECK(in_same_table(chunk_key, first_chunk_key));
180  cached_metadata_.emplace(chunk_key);
181  AbstractBuffer* buf;
182  AbstractBuffer* index_buffer = nullptr;
183  ChunkKey index_chunk_key;
184  if (is_varlen_key(chunk_key)) {
185  // For variable length chunks, metadata is associated with the data chunk.
186  CHECK(is_varlen_data_key(chunk_key));
187  index_chunk_key = {chunk_key[CHUNK_KEY_DB_IDX],
188  chunk_key[CHUNK_KEY_TABLE_IDX],
189  chunk_key[CHUNK_KEY_COLUMN_IDX],
190  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
191  2};
192  }
193  bool chunk_in_cache = false;
194  if (!caching_file_mgr_->isBufferOnDevice(chunk_key)) {
195  buf = caching_file_mgr_->createBuffer(chunk_key);
196 
197  if (!index_chunk_key.empty()) {
198  CHECK(!caching_file_mgr_->isBufferOnDevice(index_chunk_key));
199  index_buffer = caching_file_mgr_->createBuffer(index_chunk_key);
200  CHECK(index_buffer);
201  }
202  } else {
203  buf = caching_file_mgr_->getBuffer(chunk_key);
204 
205  if (!index_chunk_key.empty()) {
206  CHECK(caching_file_mgr_->isBufferOnDevice(index_chunk_key));
207  index_buffer = caching_file_mgr_->getBuffer(index_chunk_key);
208  CHECK(index_buffer);
209  }
210 
211  // We should have already cleared the data unless we are appending
212  // If the buffer metadata has changed, we need to remove this chunk
213  if (buf->getEncoder() != nullptr) {
214  const std::shared_ptr<ChunkMetadata> buf_metadata =
215  std::make_shared<ChunkMetadata>();
216  buf->getEncoder()->getMetadata(buf_metadata);
217  chunk_in_cache = *metadata.get() == *buf_metadata;
218  }
219  }
220 
221  if (!chunk_in_cache) {
222  set_metadata_for_buffer(buf, metadata.get());
223  evictThenEraseChunkUnlocked(chunk_key);
224 
225  if (!index_chunk_key.empty()) {
226  CHECK(index_buffer);
227  index_buffer->setUpdated();
228  evictThenEraseChunkUnlocked(index_chunk_key);
229  }
230  }
232  }
233  caching_file_mgr_->checkpoint(first_chunk_key[CHUNK_KEY_DB_IDX],
234  first_chunk_key[CHUNK_KEY_TABLE_IDX]);
235 }
236 
238  ChunkMetadataVector& metadata_vec,
239  const ChunkKey& chunk_prefix) const {
240  auto timer = DEBUG_TIMER(__func__);
241  read_lock r_lock(metadata_mutex_);
243  [&metadata_vec, this](auto chunk) {
244  std::shared_ptr<ChunkMetadata> buf_metadata = std::make_shared<ChunkMetadata>();
245  caching_file_mgr_->getBuffer(chunk)->getEncoder()->getMetadata(buf_metadata);
246  metadata_vec.push_back(std::make_pair(chunk, buf_metadata));
247  },
249  chunk_prefix);
250 }
251 
253  const ChunkKey& chunk_prefix) const {
255  // We don't use iterateOvermatchingPrefix() here because we want to exit early if
256  // possible.
257  ChunkKey upper_prefix(chunk_prefix);
258  upper_prefix.push_back(std::numeric_limits<int>::max());
259  auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
260  for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;
261  ++meta_it) {
262  return true;
263  }
264  return false;
265 }
266 
268  CHECK(is_table_key(chunk_prefix));
269  auto timer = DEBUG_TIMER(__func__);
270  ChunkKey upper_prefix(chunk_prefix);
271  upper_prefix.push_back(std::numeric_limits<int>::max());
272  {
273  write_lock w_lock(chunks_mutex_);
274  // Delete chunks for prefix
275  auto end_it = cached_chunks_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
276  for (auto chunk_it = cached_chunks_.lower_bound(chunk_prefix); chunk_it != end_it;) {
277  chunk_it = evictChunkByIterator(chunk_it);
278  }
279  }
280  {
281  write_lock w_lock(metadata_mutex_);
282  // Delete metadata for prefix
283  auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
284  for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;) {
285  meta_it = cached_metadata_.erase(meta_it);
286  }
287  }
288  caching_file_mgr_->clearForTable(chunk_prefix[CHUNK_KEY_DB_IDX],
289  chunk_prefix[CHUNK_KEY_TABLE_IDX]);
290 }
291 
293  auto timer = DEBUG_TIMER(__func__);
294  std::set<ChunkKey> table_keys;
295  {
296  write_lock w_lock(chunks_mutex_);
297  for (auto chunk_it = cached_chunks_.begin(); chunk_it != cached_chunks_.end();) {
298  chunk_it = evictChunkByIterator(chunk_it);
299  }
300  }
301  {
302  write_lock w_lock(metadata_mutex_);
303  for (auto meta_it = cached_metadata_.begin(); meta_it != cached_metadata_.end();) {
304  table_keys.emplace(ChunkKey{(*meta_it)[0], (*meta_it)[1]});
305  meta_it = cached_metadata_.erase(meta_it);
306  }
307  }
308  // FileMgrs do not clean up after themselves nicely, so we need to close all their disk
309  // resources and then re-create the CachingFileMgr to reset it.
310  caching_file_mgr_->closeRemovePhysical();
311  boost::filesystem::create_directory(caching_file_mgr_->getFileMgrBasePath());
312  caching_file_mgr_ = std::make_unique<File_Namespace::CachingFileMgr>(
313  caching_file_mgr_->getFileMgrBasePath(), caching_file_mgr_->getNumReaderThreads());
314 }
315 
317  const ChunkKey& chunk_prefix) const {
318  read_lock r_lock(chunks_mutex_);
319  std::vector<ChunkKey> ret_vec;
321  [&ret_vec](auto chunk) { ret_vec.push_back(chunk); }, cached_chunks_, chunk_prefix);
322  return ret_vec;
323 }
324 
326  const std::vector<ChunkKey>& chunk_keys) const {
327  ChunkToBufferMap chunk_buffer_map;
328  read_lock lock(chunks_mutex_);
329  for (const auto& chunk_key : chunk_keys) {
330  CHECK(cached_chunks_.find(chunk_key) == cached_chunks_.end());
331  CHECK(caching_file_mgr_->isBufferOnDevice(chunk_key));
332  chunk_buffer_map[chunk_key] = caching_file_mgr_->getBuffer(chunk_key);
333  CHECK(dynamic_cast<File_Namespace::FileBuffer*>(chunk_buffer_map[chunk_key]));
334  CHECK_EQ(chunk_buffer_map[chunk_key]->pageCount(), static_cast<size_t>(0));
335 
336  // Clear all buffer metadata
337  chunk_buffer_map[chunk_key]->resetToEmpty();
338  }
339  return chunk_buffer_map;
340 }
341 
342 // Private functions. Locks should be acquired in the public interface before calling
343 // these functions.
345  if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
346  return;
347  }
348  File_Namespace::FileBuffer* file_buffer =
349  static_cast<File_Namespace::FileBuffer*>(caching_file_mgr_->getBuffer(chunk_key));
350  file_buffer->freeChunkPages();
351  cached_chunks_.erase(chunk_key);
352 }
353 
354 std::set<ChunkKey>::iterator ForeignStorageCache::evictChunkByIterator(
355  const std::set<ChunkKey>::iterator& chunk_it) {
356  File_Namespace::FileBuffer* file_buffer =
357  static_cast<File_Namespace::FileBuffer*>(caching_file_mgr_->getBuffer(*chunk_it));
358  file_buffer->freeChunkPages();
359  return cached_chunks_.erase(chunk_it);
360 }
361 
363  std::string ret_string = "Cached chunks:\n";
364  for (const auto& chunk_key : cached_chunks_) {
365  ret_string += " " + show_chunk(chunk_key) + "\n";
366  }
367  return ret_string;
368 }
369 
371  std::string ret_string = "Cached ChunkMetadata:\n";
372  for (const auto& meta_key : cached_metadata_) {
373  ret_string += " " + show_chunk(meta_key) + "\n";
374  }
375  return ret_string;
376 }
377 
378 void ForeignStorageCache::validatePath(const std::string& base_path) const {
379  // check if base_path already exists, and if not create one
380  boost::filesystem::path path(base_path);
381  if (boost::filesystem::exists(path)) {
382  if (!boost::filesystem::is_directory(path)) {
383  throw std::runtime_error{
384  "cache path \"" + base_path +
385  "\" is not a directory. Please specify a valid directory "
386  "with --disk_cache_path=<path>, or use the default location."};
387  }
388  } else { // data directory does not exist
389  if (!boost::filesystem::create_directory(path)) {
390  throw std::runtime_error{
391  "could not create directory at cache path \"" + base_path +
392  "\". Please specify a valid directory location "
393  "with --disk_cache_path=<path> or use the default location."};
394  }
395  }
396 }
397 
399  const ChunkMetadataVector& metadata_vec,
400  const int frag_id) {
401  // Only re-cache last fragment and above
402  ChunkMetadataVector new_metadata_vec;
403  for (const auto& chunk_metadata : metadata_vec) {
404  if (chunk_metadata.first[CHUNK_KEY_FRAGMENT_IDX] >= frag_id) {
405  new_metadata_vec.push_back(chunk_metadata);
406  }
407  }
408  cacheMetadataVec(new_metadata_vec);
409 }
410 
412  const ChunkKey& chunk_key,
413  bool is_new_buffer) {
414  if (!is_new_buffer) {
415  CHECK(caching_file_mgr_->isBufferOnDevice(chunk_key));
416  return caching_file_mgr_->getBuffer(chunk_key);
417  } else {
418  CHECK(!caching_file_mgr_->isBufferOnDevice(chunk_key));
419  return caching_file_mgr_->createBuffer(chunk_key);
420  }
421 }
422 
423 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< int > ChunkKey
Definition: types.h:37
bool isMetadataCached(const ChunkKey &) const
std::set< ChunkKey >::iterator evictChunkByIterator(const std::set< ChunkKey >::iterator &chunk_it)
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
bool is_varlen_data_key(const ChunkKey &key)
Definition: types.h:70
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:58
void initEncoder(const SQLTypeInfo &tmp_sql_type)
void setNumElems(const size_t num_elems)
Definition: Encoder.h:234
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
std::string show_chunk(const ChunkKey &key)
Definition: types.h:85
virtual bool resetChunkStats(const ChunkStats &)
: Reset chunk level stats (min, max, nulls) using new values from the argument.
Definition: Encoder.h:223
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:227
ForeignStorageCache(const DiskCacheConfig &config)
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
void iterate_over_matching_prefix(Func func, T &chunk_collection, const ChunkKey &chunk_prefix)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
void cacheMetadataWithFragIdGreaterOrEqualTo(const ChunkMetadataVector &metadata_vec, const int frag_id)
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
void cacheMetadataVec(const ChunkMetadataVector &)
std::set< ChunkKey >::iterator eraseChunk(const std::set< ChunkKey >::iterator &)
std::unique_ptr< File_Namespace::CachingFileMgr > caching_file_mgr_
void evictThenEraseChunkUnlocked(const ChunkKey &)
void deleteBufferIfExists(const ChunkKey &chunk_key)
void set_metadata_for_buffer(AbstractBuffer *buffer, ChunkMetadata *meta)
void validatePath(const std::string &) const
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
AbstractBuffer * getChunkBufferForPrecaching(const ChunkKey &chunk_key, bool is_new_buffer)
void setSize(const size_t size)
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
File_Namespace::FileBuffer * getCachedChunkIfExists(const ChunkKey &)
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
ChunkToBufferMap getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys) const
mapd_unique_lock< mapd_shared_mutex > write_lock
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
bool in_same_table(const ChunkKey &left_key, const ChunkKey &right_key)
Definition: types.h:78
SQLTypeInfo sqlType
Definition: ChunkMetadata.h:32
A selection of helper methods for File I/O.
void cacheChunk(const ChunkKey &, AbstractBuffer *)
bool is_varlen_key(const ChunkKey &key)
Definition: types.h:66
size_t numElements
Definition: ChunkMetadata.h:34