OmniSciDB  85c2d10cdc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageCache.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
 18  TODO(Misiu): A lot of methods here can be made asynchronous. It may be worth an
19  investigation to determine if it's worth adding async versions of them for performance
20  reasons.
21 */
22 
23 #include "ForeignStorageCache.h"
24 #include "Shared/File.h"
25 #include "Shared/measure.h"
26 
27 namespace foreign_storage {
28 using read_lock = mapd_shared_lock<mapd_shared_mutex>;
29 using write_lock = mapd_unique_lock<mapd_shared_mutex>;
30 
31 namespace {
32 template <typename Func, typename T>
34  T& chunk_collection,
35  const ChunkKey& chunk_prefix) {
36  ChunkKey upper_prefix(chunk_prefix);
37  upper_prefix.push_back(std::numeric_limits<int>::max());
38  auto end_it = chunk_collection.upper_bound(static_cast<const ChunkKey>(upper_prefix));
39  for (auto chunk_it = chunk_collection.lower_bound(chunk_prefix); chunk_it != end_it;
40  ++chunk_it) {
41  func(*chunk_it);
42  }
43 }
44 
46  buffer->initEncoder(meta->sqlType);
47  buffer->setSize(meta->numBytes);
48  buffer->getEncoder()->setNumElems(meta->numElements);
49  buffer->getEncoder()->resetChunkStats(meta->chunkStats);
50  buffer->setUpdated();
51 }
52 } // namespace
53 
55  : num_chunks_added_(0), num_metadata_added_(0) {
56  validatePath(config.path);
57  caching_file_mgr_ = std::make_unique<File_Namespace::CachingFileMgr>(
58  config.path, config.num_reader_threads);
59 }
60 
62  write_lock meta_lock(metadata_mutex_);
63  write_lock chunk_lock(chunks_mutex_);
64  if (cached_metadata_.find(chunk_key) != cached_metadata_.end()) {
65  caching_file_mgr_->deleteBuffer(chunk_key);
66  cached_chunks_.erase(chunk_key);
67  cached_metadata_.erase(chunk_key);
68  }
69 }
70 
// Writes `buffer` into the disk cache under `chunk_key`, checkpoints, and
// records the key in both tracking sets.  The buffer must already be in sync
// with storage (asserted on entry and exit).
// NOTE(review): listing line 77 is missing from this extraction — verify
// against the original source that nothing was dropped between setUpdated()
// and putBuffer().
void ForeignStorageCache::cacheChunk(const ChunkKey& chunk_key, AbstractBuffer* buffer) {
  write_lock meta_lock(metadata_mutex_);
  write_lock chunk_lock(chunks_mutex_);
  // We should only be caching buffers that are in sync with storage.
  CHECK(!buffer->isDirty());
  buffer->setUpdated();  // presumably flags the buffer so putBuffer writes it through — confirm
  caching_file_mgr_->putBuffer(chunk_key, buffer);
  caching_file_mgr_->checkpoint();
  cached_metadata_.emplace(chunk_key);
  cached_chunks_.emplace(chunk_key);
  CHECK(!buffer->isDirty());
}
84 
// Marks the given chunks — all required to belong to the same table and already
// present in the CachingFileMgr — as cached, then checkpoints the file manager.
// NOTE(review): listing lines 87 and 98 are missing from this extraction (one
// is plausibly a lock acquisition, the other per-chunk bookkeeping) — restore
// from the original source.  `table_key` is unused in the visible code; it may
// be consumed by one of the dropped lines — confirm.
void ForeignStorageCache::cacheTableChunks(const std::vector<ChunkKey>& chunk_keys) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(!chunk_keys.empty());

  auto db_id = chunk_keys[0][CHUNK_KEY_DB_IDX];
  auto table_id = chunk_keys[0][CHUNK_KEY_TABLE_IDX];
  const ChunkKey table_key{db_id, table_id};

  for (const auto& chunk_key : chunk_keys) {
    // Every key must come from the same table as the first key.
    CHECK_EQ(db_id, chunk_key[CHUNK_KEY_DB_IDX]);
    CHECK_EQ(table_id, chunk_key[CHUNK_KEY_TABLE_IDX]);
    CHECK(caching_file_mgr_->isBufferOnDevice(chunk_key));
    cached_chunks_.emplace(chunk_key);
  }
  caching_file_mgr_->checkpoint();
}
103 
105  {
106  read_lock lock(chunks_mutex_);
107  // We do this instead of calling getBuffer so that we don't create a fileMgr if the
108  // chunk doesn't exist.
109  if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
110  return nullptr;
111  }
112  }
113  return caching_file_mgr_->getBuffer(chunk_key);
114 }
115 
// Returns true if metadata for `chunk_key` is tracked by the cache.
// NOTE(review): listing line 117 is missing from this extraction — likely a
// read-lock acquisition; restore from the original source.
bool ForeignStorageCache::isMetadataCached(const ChunkKey& chunk_key) const {
  return (cached_metadata_.find(chunk_key) != cached_metadata_.end());
}
120 
// recoverCacheForTable(ChunkMetadataVector& meta_vec, const ChunkKey& table_key):
// repopulates the in-memory tracking sets from what the CachingFileMgr already
// holds on disk for `table_key`; returns true if any metadata was recovered.
// NOTE(review): the signature start (listing 121) and listing line 123 were
// lost in extraction — restore them from the original source.
    const ChunkKey& table_key) {
  CHECK(meta_vec.size() == 0);
  CHECK(is_table_key(table_key));

  caching_file_mgr_->getChunkMetadataVecForKeyPrefix(meta_vec, table_key);
  for (auto& [chunk_key, metadata] : meta_vec) {
    cached_metadata_.emplace(chunk_key);
    // If there is no page count then the chunk was metadata only and should not be
    // cached.
    if (const auto& buf = caching_file_mgr_->getBuffer(chunk_key); buf->pageCount() > 0) {
      cached_chunks_.emplace(chunk_key);
    }

    if (is_varlen_key(chunk_key)) {
      // Metadata is only available for the data chunk, but look for the index as well
      CHECK(is_varlen_data_key(chunk_key));
      ChunkKey index_chunk_key = {chunk_key[CHUNK_KEY_DB_IDX],
                                  chunk_key[CHUNK_KEY_TABLE_IDX],
                                  chunk_key[CHUNK_KEY_COLUMN_IDX],
                                  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
                                  2};

      // Track the index chunk too when it has data pages on disk.
      if (const auto& buf = caching_file_mgr_->getBuffer(index_chunk_key);
          buf->pageCount() > 0) {
        cached_chunks_.emplace(index_chunk_key);
      }
    }
  }
  return (meta_vec.size() > 0);
}
153 
// eraseChunk(const ChunkKey&): evicts the chunk's disk pages and drops it from
// the tracking set, under the chunk write lock.
// NOTE(review): the signature line (listing 154) was lost in extraction —
// restore it from the original source.
  write_lock chunk_lock(chunks_mutex_);
  evictThenEraseChunkUnlocked(chunk_key);
}
158 
// NOTE(review): the signature line (listing 159) was lost in extraction; this
// body derives a chunk's table prefix and then erases the chunk.
// `table_prefix` is unused in the visible code — confirm against the original
// source whether it is dead or consumed by a dropped line.
  const ChunkKey table_prefix = get_table_key(chunk_key);
  eraseChunk(chunk_key);
}
163 
// cacheMetadataVec(const ChunkMetadataVector&): caches metadata for each chunk,
// creating (or reusing) the corresponding file-manager buffers.  A chunk whose
// previously cached metadata no longer matches the incoming metadata is
// evicted so its data will be re-fetched.
// NOTE(review): the signature line (listing 164) and listing line 220 were
// lost in extraction — restore them from the original source.
  auto timer = DEBUG_TIMER(__func__);
  write_lock meta_lock(metadata_mutex_);
  write_lock chunk_lock(chunks_mutex_);
  for (auto& [chunk_key, metadata] : metadata_vec) {
    cached_metadata_.emplace(chunk_key);
    AbstractBuffer* buf;
    AbstractBuffer* index_buffer = nullptr;
    ChunkKey index_chunk_key;
    if (is_varlen_key(chunk_key)) {
      // For variable length chunks, metadata is associated with the data chunk.
      CHECK(is_varlen_data_key(chunk_key));
      index_chunk_key = {chunk_key[CHUNK_KEY_DB_IDX],
                         chunk_key[CHUNK_KEY_TABLE_IDX],
                         chunk_key[CHUNK_KEY_COLUMN_IDX],
                         chunk_key[CHUNK_KEY_FRAGMENT_IDX],
                         2};
    }
    bool chunk_in_cache = false;
    if (!caching_file_mgr_->isBufferOnDevice(chunk_key)) {
      // Chunk not on disk yet: create empty buffers for it (and its varlen
      // index, if any).
      buf = caching_file_mgr_->createBuffer(chunk_key);

      if (!index_chunk_key.empty()) {
        CHECK(!caching_file_mgr_->isBufferOnDevice(index_chunk_key));
        index_buffer = caching_file_mgr_->createBuffer(index_chunk_key);
        CHECK(index_buffer);
      }
    } else {
      buf = caching_file_mgr_->getBuffer(chunk_key);

      if (!index_chunk_key.empty()) {
        CHECK(caching_file_mgr_->isBufferOnDevice(index_chunk_key));
        index_buffer = caching_file_mgr_->getBuffer(index_chunk_key);
        CHECK(index_buffer);
      }

      // We should have already cleared the data unless we are appending
      // If the buffer metadata has changed, we need to remove this chunk
      if (buf->getEncoder() != nullptr) {
        const std::shared_ptr<ChunkMetadata> buf_metadata =
            std::make_shared<ChunkMetadata>();
        buf->getEncoder()->getMetadata(buf_metadata);
        // Chunk data stays cached only when stored metadata matches exactly.
        chunk_in_cache = *metadata.get() == *buf_metadata;
      }
    }

    if (!chunk_in_cache) {
      // New or stale: stamp the buffer with the incoming metadata and evict any
      // cached chunk data.
      set_metadata_for_buffer(buf, metadata.get());
      evictThenEraseChunkUnlocked(chunk_key);

      if (!index_chunk_key.empty()) {
        CHECK(index_buffer);
        index_buffer->setUpdated();
        evictThenEraseChunkUnlocked(index_chunk_key);
      }
    }
  }
  caching_file_mgr_->checkpoint();
}
224 
// getCachedMetadataVecForKeyPrefix: appends a (key, metadata) pair for every
// cached chunk whose key matches `chunk_prefix`, under the metadata read lock.
// NOTE(review): listing lines 225 (signature start), 230 and 236 (the
// iterate_over_matching_prefix call line and its collection argument,
// presumably cached_metadata_) were lost in extraction — restore from the
// original source.
    ChunkMetadataVector& metadata_vec,
    const ChunkKey& chunk_prefix) const {
  auto timer = DEBUG_TIMER(__func__);
  read_lock r_lock(metadata_mutex_);
      [&metadata_vec, this](auto chunk) {
        // Pull metadata back out of the cached buffer's encoder.
        std::shared_ptr<ChunkMetadata> buf_metadata = std::make_shared<ChunkMetadata>();
        caching_file_mgr_->getBuffer(chunk)->getEncoder()->getMetadata(buf_metadata);
        metadata_vec.push_back(std::make_pair(chunk, buf_metadata));
      },
      chunk_prefix);
}
239 
// hasCachedMetadataForKeyPrefix: returns true if any cached metadata key
// matches `chunk_prefix`.
// NOTE(review): listing lines 240 (signature start) and 242 (likely a
// read-lock acquisition) were lost in extraction — restore from the original
// source.
    const ChunkKey& chunk_prefix) const {
  // We don't use iterateOvermatchingPrefix() here because we want to exit early if
  // possible.
  ChunkKey upper_prefix(chunk_prefix);
  upper_prefix.push_back(std::numeric_limits<int>::max());
  auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
  for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;
       ++meta_it) {
    // Any element inside the prefix range proves existence — return immediately.
    return true;
  }
  return false;
}
254 
// Evicts every cached chunk and metadata entry for one table (the key must be a
// table key) and clears the table's data in the underlying file manager.
// NOTE(review): the signature line (listing 255) was lost in extraction —
// restore it from the original source.
  CHECK(is_table_key(chunk_prefix));
  auto timer = DEBUG_TIMER(__func__);
  ChunkKey upper_prefix(chunk_prefix);
  upper_prefix.push_back(std::numeric_limits<int>::max());
  {
    write_lock w_lock(chunks_mutex_);
    // Delete chunks for prefix
    auto end_it = cached_chunks_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
    for (auto chunk_it = cached_chunks_.lower_bound(chunk_prefix); chunk_it != end_it;) {
      chunk_it = evictChunkByIterator(chunk_it);
    }
  }
  {
    write_lock w_lock(metadata_mutex_);
    // Delete metadata for prefix
    auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
    for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;) {
      meta_it = cached_metadata_.erase(meta_it);
    }
  }
  // Drop the table's on-disk cache data as well (db id, table id).
  caching_file_mgr_->clearForTable(chunk_prefix[0], chunk_prefix[1]);
}
278 
// Evicts every cached chunk and metadata entry, then tears down and rebuilds
// the CachingFileMgr to release its disk resources.
// NOTE(review): the signature line (listing 279) was lost in extraction —
// restore it from the original source.
  auto timer = DEBUG_TIMER(__func__);
  std::set<ChunkKey> table_keys;
  {
    write_lock w_lock(chunks_mutex_);
    for (auto chunk_it = cached_chunks_.begin(); chunk_it != cached_chunks_.end();) {
      chunk_it = evictChunkByIterator(chunk_it);
    }
  }
  {
    write_lock w_lock(metadata_mutex_);
    for (auto meta_it = cached_metadata_.begin(); meta_it != cached_metadata_.end();) {
      // Collect the table key of each entry.  `table_keys` is unused in the
      // visible code — confirm against the original source.
      table_keys.emplace(ChunkKey{(*meta_it)[0], (*meta_it)[1]});
      meta_it = cached_metadata_.erase(meta_it);
    }
  }
  // FileMgrs do not clean up after themselves nicely, so we need to close all their disk
  // resources and then re-create the CachingFileMgr to reset it.
  caching_file_mgr_->closeRemovePhysical();
  boost::filesystem::create_directory(caching_file_mgr_->getFileMgrBasePath());
  caching_file_mgr_ = std::make_unique<File_Namespace::CachingFileMgr>(
      caching_file_mgr_->getFileMgrBasePath(), caching_file_mgr_->getNumReaderThreads());
}
302 
// getCachedChunksForKeyPrefix: returns all cached chunk keys matching
// `chunk_prefix`, under the chunk read lock.
// NOTE(review): listing lines 303 (signature start) and 307 (the
// iterate_over_matching_prefix call line) were lost in extraction — restore
// from the original source.
    const ChunkKey& chunk_prefix) const {
  read_lock r_lock(chunks_mutex_);
  std::vector<ChunkKey> ret_vec;
      [&ret_vec](auto chunk) { ret_vec.push_back(chunk); }, cached_chunks_, chunk_prefix);
  return ret_vec;
}
311 
// getChunkBuffersForCaching: for chunks that exist in the file manager but are
// not yet tracked as cached, returns their buffers (reset to empty) keyed by
// chunk key, ready to be filled by the caller.
// NOTE(review): the signature line (listing 312) was lost in extraction —
// restore it from the original source.
    const std::vector<ChunkKey>& chunk_keys) const {
  ChunkToBufferMap chunk_buffer_map;
  read_lock lock(chunks_mutex_);
  for (const auto& chunk_key : chunk_keys) {
    // Each requested chunk must not already be tracked, but must exist on disk
    // as an empty FileBuffer.
    CHECK(cached_chunks_.find(chunk_key) == cached_chunks_.end());
    CHECK(caching_file_mgr_->isBufferOnDevice(chunk_key));
    chunk_buffer_map[chunk_key] = caching_file_mgr_->getBuffer(chunk_key);
    CHECK(dynamic_cast<File_Namespace::FileBuffer*>(chunk_buffer_map[chunk_key]));
    CHECK_EQ(chunk_buffer_map[chunk_key]->pageCount(), static_cast<size_t>(0));

    // Clear all buffer metadata
    chunk_buffer_map[chunk_key]->resetToEmpty();
  }
  return chunk_buffer_map;
}
328 
329 // Private functions. Locks should be acquired in the public interface before calling
330 // these functions.
332  if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
333  return;
334  }
335  File_Namespace::FileBuffer* file_buffer =
336  static_cast<File_Namespace::FileBuffer*>(caching_file_mgr_->getBuffer(chunk_key));
337  file_buffer->freeChunkPages();
338  cached_chunks_.erase(chunk_key);
339 }
340 
341 std::set<ChunkKey>::iterator ForeignStorageCache::evictChunkByIterator(
342  const std::set<ChunkKey>::iterator& chunk_it) {
343  File_Namespace::FileBuffer* file_buffer =
344  static_cast<File_Namespace::FileBuffer*>(caching_file_mgr_->getBuffer(*chunk_it));
345  file_buffer->freeChunkPages();
346  return cached_chunks_.erase(chunk_it);
347 }
348 
// Builds a human-readable listing of all cached chunk keys (debugging aid).
// NOTE(review): the signature line (listing 349) was lost in extraction —
// restore it from the original source.
  std::string ret_string = "Cached chunks:\n";
  for (const auto& chunk_key : cached_chunks_) {
    ret_string += " " + show_chunk(chunk_key) + "\n";
  }
  return ret_string;
}
356 
// Builds a human-readable listing of all cached metadata keys (debugging aid).
// NOTE(review): the signature line (listing 357) was lost in extraction —
// restore it from the original source.
  std::string ret_string = "Cached ChunkMetadata:\n";
  for (const auto& meta_key : cached_metadata_) {
    ret_string += " " + show_chunk(meta_key) + "\n";
  }
  return ret_string;
}
364 
365 void ForeignStorageCache::validatePath(const std::string& base_path) const {
366  // check if base_path already exists, and if not create one
367  boost::filesystem::path path(base_path);
368  if (boost::filesystem::exists(path)) {
369  if (!boost::filesystem::is_directory(path)) {
370  throw std::runtime_error{
371  "cache path \"" + base_path +
372  "\" is not a directory. Please specify a valid directory "
373  "with --disk_cache_path=<path>, or use the default location."};
374  }
375  } else { // data directory does not exist
376  if (!boost::filesystem::create_directory(path)) {
377  throw std::runtime_error{
378  "could not create directory at cache path \"" + base_path +
379  "\". Please specify a valid directory location "
380  "with --disk_cache_path=<path> or use the default location."};
381  }
382  }
383 }
384 
386  const ChunkMetadataVector& metadata_vec,
387  const int frag_id) {
388  // Only re-cache last fragment and above
389  ChunkMetadataVector new_metadata_vec;
390  for (const auto& chunk_metadata : metadata_vec) {
391  if (chunk_metadata.first[CHUNK_KEY_FRAGMENT_IDX] >= frag_id) {
392  new_metadata_vec.push_back(chunk_metadata);
393  }
394  }
395  cacheMetadataVec(new_metadata_vec);
396 }
397 
399  const ChunkKey& chunk_key,
400  bool is_new_buffer) {
401  if (!is_new_buffer) {
402  CHECK(caching_file_mgr_->isBufferOnDevice(chunk_key));
403  return caching_file_mgr_->getBuffer(chunk_key);
404  } else {
405  CHECK(!caching_file_mgr_->isBufferOnDevice(chunk_key));
406  return caching_file_mgr_->createBuffer(chunk_key);
407  }
408 }
409 
410 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
bool isMetadataCached(const ChunkKey &) const
std::set< ChunkKey >::iterator evictChunkByIterator(const std::set< ChunkKey >::iterator &chunk_it)
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
bool is_varlen_data_key(const ChunkKey &key)
Definition: types.h:70
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:57
void initEncoder(const SQLTypeInfo &tmp_sql_type)
void setNumElems(const size_t num_elems)
Definition: Encoder.h:234
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
std::string show_chunk(const ChunkKey &key)
Definition: types.h:85
virtual bool resetChunkStats(const ChunkStats &)
: Reset chunk level stats (min, max, nulls) using new values from the argument.
Definition: Encoder.h:223
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:227
ForeignStorageCache(const DiskCacheConfig &config)
AbstractBuffer * getCachedChunkIfExists(const ChunkKey &)
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
void iterate_over_matching_prefix(Func func, T &chunk_collection, const ChunkKey &chunk_prefix)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
An AbstractBuffer is a unit of data management for a data manager.
void cacheMetadataWithFragIdGreaterOrEqualTo(const ChunkMetadataVector &metadata_vec, const int frag_id)
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
void cacheMetadataVec(const ChunkMetadataVector &)
std::set< ChunkKey >::iterator eraseChunk(const std::set< ChunkKey >::iterator &)
std::unique_ptr< File_Namespace::CachingFileMgr > caching_file_mgr_
void evictThenEraseChunkUnlocked(const ChunkKey &)
void deleteBufferIfExists(const ChunkKey &chunk_key)
void set_metadata_for_buffer(AbstractBuffer *buffer, ChunkMetadata *meta)
void validatePath(const std::string &) const
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
AbstractBuffer * getChunkBufferForPrecaching(const ChunkKey &chunk_key, bool is_new_buffer)
void setSize(const size_t size)
mapd_shared_lock< mapd_shared_mutex > read_lock
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
ChunkToBufferMap getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys) const
mapd_unique_lock< mapd_shared_mutex > write_lock
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
SQLTypeInfo sqlType
Definition: ChunkMetadata.h:32
A selection of helper methods for File I/O.
void cacheChunk(const ChunkKey &, AbstractBuffer *)
bool is_varlen_key(const ChunkKey &key)
Definition: types.h:66
size_t numElements
Definition: ChunkMetadata.h:34