OmniSciDB  3a86f6ec37
ForeignStorageCache.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  TODO(Misiu): A lot of methods here can be made asynchronous. It may be worth an
19  investigation to determine if it's worth adding async versions of them for performance
20  reasons.
21 */
22 
23 #include "ForeignStorageCache.h"
24 #include "Shared/File.h"
25 #include "Shared/measure.h"
26 
27 namespace foreign_storage {
28 using read_lock = mapd_shared_lock<mapd_shared_mutex>;
29 using write_lock = mapd_unique_lock<mapd_shared_mutex>;
30 
31 namespace {
// Applies `func` to every ChunkKey in `chunk_collection` whose key begins with
// `chunk_prefix`. Works on any ordered, set-like collection of ChunkKeys.
// NOTE(review): the line declaring the function name and the `func` parameter
// (original line 33) is missing from this listing; per the symbol index the
// signature is iterate_over_matching_prefix(Func, T&, const ChunkKey&).
32 template <typename Func, typename T>
34  T& chunk_collection,
35  const ChunkKey& chunk_prefix) {
// Extend the prefix with INT_MAX to form an exclusive upper bound: since
// ChunkKeys (vectors of int) compare lexicographically, every key starting
// with the prefix falls in [lower_bound(prefix), upper_bound(prefix+INT_MAX)).
36  ChunkKey upper_prefix(chunk_prefix);
37  upper_prefix.push_back(std::numeric_limits<int>::max());
38  auto end_it = chunk_collection.upper_bound(static_cast<const ChunkKey>(upper_prefix));
39  for (auto chunk_it = chunk_collection.lower_bound(chunk_prefix); chunk_it != end_it;
40  ++chunk_it) {
41  func(*chunk_it);
42  }
43 }
44 
// Copies chunk metadata (SQL type, byte size, element count, chunk stats) onto
// `buffer`'s encoder and marks the buffer as updated.
// NOTE(review): the signature line (original line 45) is missing from this
// listing; per the symbol index it is
// set_metadata_for_buffer(AbstractBuffer* buffer, ChunkMetadata* meta).
46  buffer->initEncoder(meta->sqlType);
47  buffer->setSize(meta->numBytes);
48  buffer->getEncoder()->setNumElems(meta->numElements);
49  buffer->getEncoder()->resetChunkStats(meta->chunkStats);
50  buffer->setUpdated();
51 }
52 } // namespace
53 
// ForeignStorageCache constructor (signature line, original line 54, is
// missing from this listing; per the symbol index it takes a
// const DiskCacheConfig&). Validates/creates the on-disk cache directory and
// builds the GlobalFileMgr (device id 0) that backs the cache.
55  : num_chunks_added_(0), num_metadata_added_(0) {
56  validatePath(config.path);
57  global_file_mgr_ = std::make_unique<File_Namespace::GlobalFileMgr>(
58  0, config.path, config.num_reader_threads);
59 }
60 
// deleteBufferIfExists: if `chunk_key` is known to the cache, deletes its
// on-disk buffer and removes it from both tracking sets. Takes both write
// locks (metadata first, then chunks) to keep the sets consistent.
// (Signature line, original line 61, is missing from this listing.)
62  write_lock meta_lock(metadata_mutex_);
63  write_lock chunk_lock(chunks_mutex_);
64  if (cached_metadata_.find(chunk_key) != cached_metadata_.end()) {
65  global_file_mgr_->deleteBuffer(chunk_key);
66  cached_chunks_.erase(chunk_key);
67  cached_metadata_.erase(chunk_key);
68  }
70 
71 void ForeignStorageCache::cacheChunk(const ChunkKey& chunk_key, AbstractBuffer* buffer) {
72  auto timer = DEBUG_TIMER(__func__);
73  write_lock meta_lock(metadata_mutex_);
74  write_lock chunk_lock(chunks_mutex_);
75  // We should only be caching buffers that are in sync with storage.
76  CHECK(!buffer->isDirty());
// Mark updated so the file manager persists the buffer contents on putBuffer.
77  buffer->setUpdated();
// NOTE(review): original line 78 is missing from this listing (doc-scrape gap).
79  global_file_mgr_->putBuffer(chunk_key, buffer);
// Checkpoint immediately so the newly cached chunk survives a restart.
80  global_file_mgr_->checkpoint();
81  cached_metadata_.emplace(chunk_key);
82  cached_chunks_.emplace(chunk_key);
// putBuffer/checkpoint must leave the source buffer clean again.
83  CHECK(!buffer->isDirty());
84 }
85 
86 void ForeignStorageCache::cacheTableChunks(const std::vector<ChunkKey>& chunk_keys) {
87  auto timer = DEBUG_TIMER(__func__);
// NOTE(review): original line 88 is missing from this listing — presumably a
// chunks_mutex_ write-lock acquisition; confirm against the repository.
89  CHECK(!chunk_keys.empty());
90 
// All keys must belong to one table; db/table ids are taken from the first key
// and asserted against every other key below.
91  auto db_id = chunk_keys[0][CHUNK_KEY_DB_IDX];
92  auto table_id = chunk_keys[0][CHUNK_KEY_TABLE_IDX];
93  const ChunkKey table_key{db_id, table_id};
94 
95  for (const auto& chunk_key : chunk_keys) {
96  CHECK_EQ(db_id, chunk_key[CHUNK_KEY_DB_IDX]);
97  CHECK_EQ(table_id, chunk_key[CHUNK_KEY_TABLE_IDX]);
98  CHECK(global_file_mgr_->isBufferOnDevice(chunk_key));
100  cached_chunks_.emplace(chunk_key);
101  }
// One checkpoint for the whole table rather than one per chunk.
102  global_file_mgr_->checkpoint(db_id, table_id);
103 }
104 
// getCachedChunkIfExists: returns the cached buffer for `chunk_key`, or
// nullptr when the chunk is not in the cached-chunk set.
// (Signature line, original line 105, is missing from this listing.)
106  auto timer = DEBUG_TIMER(__func__);
107  {
108  read_lock lock(chunks_mutex_);
109  // We do this instead of calling getBuffer so that we don't create a fileMgr if the
110  // chunk doesn't exist.
111  if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
112  return nullptr;
113  }
114  }
// NOTE(review): getBuffer runs after the read lock is released, so a
// concurrent eviction could race this lookup — confirm callers' locking.
115  return global_file_mgr_->getBuffer(chunk_key);
116 }
117 
118 bool ForeignStorageCache::isMetadataCached(const ChunkKey& chunk_key) const {
119  auto timer = DEBUG_TIMER(__func__);
// NOTE(review): original line 120 is missing from this listing — presumably a
// metadata_mutex_ read-lock acquisition; confirm against the repository.
121  return (cached_metadata_.find(chunk_key) != cached_metadata_.end());
122 }
123 
// recoverCacheForTable: rebuilds the in-memory cached_metadata_/cached_chunks_
// sets for `table_key` from what the GlobalFileMgr already holds on disk,
// filling `meta_vec` (which must arrive empty) with the recovered metadata.
// Returns true when any metadata was recovered.
// NOTE(review): original lines 124 and 126 (signature and, presumably, a lock
// acquisition) are missing from this listing.
125  const ChunkKey& table_key) {
127  CHECK(meta_vec.size() == 0);
128  CHECK(is_table_key(table_key));
129  CHECK(dynamic_cast<File_Namespace::FileMgr*>(global_file_mgr_->getFileMgr(table_key)));
130 
131  global_file_mgr_->getChunkMetadataVecForKeyPrefix(meta_vec, table_key);
132  for (auto& [chunk_key, metadata] : meta_vec) {
133  cached_metadata_.emplace(chunk_key);
134  // If there is no page count then the chunk was metadata only and should not be
135  // cached.
136  if (const auto& buf = global_file_mgr_->getBuffer(chunk_key); buf->pageCount() > 0) {
137  cached_chunks_.emplace(chunk_key);
138  }
139 
140  if (is_varlen_key(chunk_key)) {
141  // Metadata is only available for the data chunk, but look for the index as well
142  CHECK(is_varlen_data_key(chunk_key));
// Trailing 2 selects the companion index chunk of this varlen column
// (the data key is asserted above; presumably 1 = data, 2 = index — confirm).
143  ChunkKey index_chunk_key = {chunk_key[CHUNK_KEY_DB_IDX],
144  chunk_key[CHUNK_KEY_TABLE_IDX],
145  chunk_key[CHUNK_KEY_COLUMN_IDX],
146  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
147  2};
148 
// Index chunk is also only cached when it has pages on disk.
149  if (const auto& buf = global_file_mgr_->getBuffer(index_chunk_key);
150  buf->pageCount() > 0) {
151  cached_chunks_.emplace(index_chunk_key);
152  }
153  }
154  }
155  return (meta_vec.size() > 0);
156 }
157 
// Locking wrapper: takes the chunks write lock and delegates to
// evictThenEraseChunkUnlocked. (The enclosing function's signature line,
// original line 158, is missing from this listing.)
159  write_lock chunk_lock(chunks_mutex_);
160  evictThenEraseChunkUnlocked(chunk_key);
161 }
162 
// Erases a single chunk by key via eraseChunk. (The enclosing function's
// signature line, original line 163, is missing from this listing.)
// NOTE(review): table_prefix is computed but never used in the visible body —
// apparent dead local; confirm against the repository and remove upstream.
164  const ChunkKey table_prefix = get_table_key(chunk_key);
165  eraseChunk(chunk_key);
166 }
167 
// cacheMetadataVec: records metadata for each (key, metadata) pair, creating
// or reusing the backing buffer for the chunk (and, for varlen columns, its
// companion index chunk). If a cached buffer's existing metadata no longer
// matches the incoming metadata, the stale chunk data is evicted so it will be
// re-fetched. Finishes with a checkpoint so the metadata is durable.
// NOTE(review): original lines 168 (signature) and 224 are missing from this
// listing (doc-scrape gaps).
169  auto timer = DEBUG_TIMER(__func__);
170  write_lock meta_lock(metadata_mutex_);
171  write_lock chunk_lock(chunks_mutex_);
172  for (auto& [chunk_key, metadata] : metadata_vec) {
173  cached_metadata_.emplace(chunk_key);
174  AbstractBuffer* buf;
175  AbstractBuffer* index_buffer = nullptr;
176  ChunkKey index_chunk_key;
177  if (is_varlen_key(chunk_key)) {
178  // For variable length chunks, metadata is associated with the data chunk.
179  CHECK(is_varlen_data_key(chunk_key));
// Trailing 2 selects the companion index chunk of this varlen column.
180  index_chunk_key = {chunk_key[CHUNK_KEY_DB_IDX],
181  chunk_key[CHUNK_KEY_TABLE_IDX],
182  chunk_key[CHUNK_KEY_COLUMN_IDX],
183  chunk_key[CHUNK_KEY_FRAGMENT_IDX],
184  2};
185  }
186  bool chunk_in_cache = false;
// Create the buffer(s) if absent; otherwise reuse and compare metadata.
187  if (!global_file_mgr_->isBufferOnDevice(chunk_key)) {
188  buf = global_file_mgr_->createBuffer(chunk_key);
189 
190  if (!index_chunk_key.empty()) {
191  CHECK(!global_file_mgr_->isBufferOnDevice(index_chunk_key));
192  index_buffer = global_file_mgr_->createBuffer(index_chunk_key);
193  CHECK(index_buffer);
194  }
195  } else {
196  buf = global_file_mgr_->getBuffer(chunk_key);
197 
198  if (!index_chunk_key.empty()) {
199  CHECK(global_file_mgr_->isBufferOnDevice(index_chunk_key));
200  index_buffer = global_file_mgr_->getBuffer(index_chunk_key);
201  CHECK(index_buffer);
202  }
203 
204  // We should have already cleared the data unless we are appending
205  // If the buffer metadata has changed, we need to remove this chunk
206  if (buf->getEncoder() != nullptr) {
207  const std::shared_ptr<ChunkMetadata> buf_metadata =
208  std::make_shared<ChunkMetadata>();
209  buf->getEncoder()->getMetadata(buf_metadata);
// chunk_in_cache is true only when on-disk metadata matches the incoming one.
210  chunk_in_cache = *metadata.get() == *buf_metadata;
211  }
212  }
213 
// Stale or fresh buffer: install the new metadata and drop any cached data.
214  if (!chunk_in_cache) {
215  set_metadata_for_buffer(buf, metadata.get());
216  evictThenEraseChunkUnlocked(chunk_key);
217 
218  if (!index_chunk_key.empty()) {
219  CHECK(index_buffer);
220  index_buffer->setUpdated();
221  evictThenEraseChunkUnlocked(index_chunk_key);
222  }
223  }
225  }
226  global_file_mgr_->checkpoint();
227 }
228 
// getCachedMetadataVecForKeyPrefix: appends a (key, metadata) pair to
// `metadata_vec` for every cached chunk whose key matches `chunk_prefix`,
// reading the metadata back out of each buffer's encoder under the metadata
// read lock. NOTE(review): original lines 229 (signature), 234 and 240 (the
// iterate_over_matching_prefix call wrapping this lambda and its
// cached_metadata_ argument) are missing from this listing.
230  ChunkMetadataVector& metadata_vec,
231  const ChunkKey& chunk_prefix) const {
232  auto timer = DEBUG_TIMER(__func__);
233  read_lock r_lock(metadata_mutex_);
235  [&metadata_vec, this](auto chunk) {
236  std::shared_ptr<ChunkMetadata> buf_metadata = std::make_shared<ChunkMetadata>();
237  global_file_mgr_->getBuffer(chunk)->getEncoder()->getMetadata(buf_metadata);
238  metadata_vec.push_back(std::make_pair(chunk, buf_metadata));
239  },
241  chunk_prefix);
242 }
243 
// hasCachedMetadataForKeyPrefix: true when at least one cached-metadata key
// matches `chunk_prefix`. NOTE(review): original lines 244 (signature) and 247
// (presumably a metadata read lock) are missing from this listing.
245  const ChunkKey& chunk_prefix) const {
246  auto timer = DEBUG_TIMER(__func__);
248  // We don't use iterateOvermatchingPrefix() here because we want to exit early if
249  // possible.
// Same prefix-range trick as iterate_over_matching_prefix: INT_MAX-extended
// upper bound brackets all keys starting with the prefix.
250  ChunkKey upper_prefix(chunk_prefix);
251  upper_prefix.push_back(std::numeric_limits<int>::max());
252  auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
// Loop returns on the first match, giving the early exit described above.
253  for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;
254  ++meta_it) {
255  return true;
256  }
257  return false;
258 }
259 
// clearForTablePrefix: evicts every cached chunk and erases all cached
// metadata for the table identified by `chunk_prefix`, then removes the
// table's data structures from the GlobalFileMgr. The two locks are taken in
// separate scopes, one set at a time.
// (Signature line, original line 260, is missing from this listing.)
261  CHECK(is_table_key(chunk_prefix));
262  auto timer = DEBUG_TIMER(__func__);
263  ChunkKey upper_prefix(chunk_prefix);
264  upper_prefix.push_back(std::numeric_limits<int>::max());
265  {
266  write_lock w_lock(chunks_mutex_);
267  // Delete chunks for prefix
268  auto end_it = cached_chunks_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
269  for (auto chunk_it = cached_chunks_.lower_bound(chunk_prefix); chunk_it != end_it;) {
270  chunk_it = evictChunkByIterator(chunk_it);
271  }
272  }
273  {
274  write_lock w_lock(metadata_mutex_);
275  // Delete metadata for prefix
276  auto end_it = cached_metadata_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
277  for (auto meta_it = cached_metadata_.lower_bound(chunk_prefix); meta_it != end_it;) {
278  meta_it = cached_metadata_.erase(meta_it);
279  }
280  }
281  global_file_mgr_->removeTableRelatedDS(chunk_prefix[0], chunk_prefix[1]);
282 }
283 
// clear: evicts every cached chunk, erases all cached metadata (collecting the
// distinct table keys seen along the way), then removes each table's data
// structures from the GlobalFileMgr.
// (Signature line, original line 284, is missing from this listing.)
285  auto timer = DEBUG_TIMER(__func__);
286  std::set<ChunkKey> table_keys;
287  {
288  write_lock w_lock(chunks_mutex_);
289  for (auto chunk_it = cached_chunks_.begin(); chunk_it != cached_chunks_.end();) {
290  chunk_it = evictChunkByIterator(chunk_it);
291  }
292  }
293  {
294  write_lock w_lock(metadata_mutex_);
295  for (auto meta_it = cached_metadata_.begin(); meta_it != cached_metadata_.end();) {
// Remember each chunk's {db_id, table_id} so table teardown can run once per
// table after the locks are released.
296  table_keys.emplace(ChunkKey{(*meta_it)[0], (*meta_it)[1]});
297  meta_it = cached_metadata_.erase(meta_it);
298  }
299  }
300  for (const auto& table_key : table_keys) {
301  global_file_mgr_->removeTableRelatedDS(table_key[0], table_key[1]);
302  }
303 }
304 
// getCachedChunksForKeyPrefix: returns all cached chunk keys matching
// `chunk_prefix`, collected under the chunks read lock.
// NOTE(review): original lines 305 (signature) and 309 (the
// iterate_over_matching_prefix call line) are missing from this listing.
306  const ChunkKey& chunk_prefix) const {
307  read_lock r_lock(chunks_mutex_);
308  std::vector<ChunkKey> ret_vec;
310  [&ret_vec](auto chunk) { ret_vec.push_back(chunk); }, cached_chunks_, chunk_prefix);
311  return ret_vec;
312 }
313 
314 std::map<ChunkKey, AbstractBuffer*> ForeignStorageCache::getChunkBuffersForCaching(
315  const std::vector<ChunkKey>& chunk_keys) const {
316  auto timer = DEBUG_TIMER(__func__);
317  std::map<ChunkKey, AbstractBuffer*> chunk_buffer_map;
318  read_lock lock(chunks_mutex_);
319  for (const auto& chunk_key : chunk_keys) {
320  CHECK(cached_chunks_.find(chunk_key) == cached_chunks_.end());
321  CHECK(global_file_mgr_->isBufferOnDevice(chunk_key));
322  chunk_buffer_map[chunk_key] = global_file_mgr_->getBuffer(chunk_key);
323  CHECK(dynamic_cast<File_Namespace::FileBuffer*>(chunk_buffer_map[chunk_key]));
324  CHECK_EQ(chunk_buffer_map[chunk_key]->pageCount(), static_cast<size_t>(0));
325 
326  // Clear all buffer metadata
327  chunk_buffer_map[chunk_key]->resetToEmpty();
328  }
329  return chunk_buffer_map;
330 }
331 
332 // Private functions. Locks should be acquired in the public interface before calling
333 // these functions.
// evictThenEraseChunkUnlocked: no-op when the key is not cached; otherwise
// frees the chunk's file pages and removes it from the cached-chunk set.
// Caller must already hold chunks_mutex_ (see note above).
// (Signature line, original line 334, is missing from this listing.)
335  auto timer = DEBUG_TIMER(__func__);
336  if (cached_chunks_.find(chunk_key) == cached_chunks_.end()) {
337  return;
338  }
339  File_Namespace::FileBuffer* file_buffer =
340  static_cast<File_Namespace::FileBuffer*>(global_file_mgr_->getBuffer(chunk_key));
341  file_buffer->freeChunkPages();
342  cached_chunks_.erase(chunk_key);
343 }
344 
345 std::set<ChunkKey>::iterator ForeignStorageCache::evictChunkByIterator(
346  const std::set<ChunkKey>::iterator& chunk_it) {
347  auto timer = DEBUG_TIMER(__func__);
348  File_Namespace::FileBuffer* file_buffer =
349  static_cast<File_Namespace::FileBuffer*>(global_file_mgr_->getBuffer(*chunk_it));
350  file_buffer->freeChunkPages();
351  return cached_chunks_.erase(chunk_it);
352 }
353 
// dumpCachedChunkEntries: builds a human-readable, newline-separated listing
// of every key in the cached-chunk set (debugging aid).
// (Signature line, original line 354, is missing from this listing.)
355  auto timer = DEBUG_TIMER(__func__);
356  std::string ret_string = "Cached chunks:\n";
357  for (const auto& chunk_key : cached_chunks_) {
358  ret_string += " " + show_chunk(chunk_key) + "\n";
359  }
360  return ret_string;
361 }
362 
// dumpCachedMetadata: builds a human-readable, newline-separated listing of
// every key in the cached-metadata set (debugging aid).
// (Signature line, original line 363, is missing from this listing.)
364  auto timer = DEBUG_TIMER(__func__);
365  std::string ret_string = "Cached ChunkMetadata:\n";
366  for (const auto& meta_key : cached_metadata_) {
367  ret_string += " " + show_chunk(meta_key) + "\n";
368  }
369  return ret_string;
370 }
371 
372 void ForeignStorageCache::validatePath(const std::string& base_path) const {
373  // check if base_path already exists, and if not create one
374  boost::filesystem::path path(base_path);
375  if (boost::filesystem::exists(path)) {
376  if (!boost::filesystem::is_directory(path)) {
377  throw std::runtime_error{
378  "cache path \"" + base_path +
379  "\" is not a directory. Please specify a valid directory "
380  "with --disk_cache_path=<path>, or use the default location."};
381  }
382  } else { // data directory does not exist
383  if (!boost::filesystem::create_directory(path)) {
384  throw std::runtime_error{
385  "could not create directory at cache path \"" + base_path +
386  "\". Please specify a valid directory location "
387  "with --disk_cache_path=<path> or use the default location."};
388  }
389  }
390 }
391 
// getCacheDirectoryForTablePrefix: returns the on-disk base path of the
// FileMgr backing the table identified by `table_prefix` ({db_id, table_id}).
// (Signature line, original line 392, is missing from this listing.)
393  const ChunkKey& table_prefix) const {
394  CHECK(table_prefix.size() >= 2);
// The cache only ever stores File_Namespace::FileMgr instances; the CHECK
// below guards against a different AbstractBufferMgr subtype.
395  auto fileMgr = dynamic_cast<File_Namespace::FileMgr*>(
396  getGlobalFileMgr()->getFileMgr(table_prefix));
397  CHECK(fileMgr);
398  return fileMgr->getFileMgrBasePath();
399 }
400 
// cacheMetadataWithFragIdGreaterOrEqualTo: filters `metadata_vec` down to
// entries whose fragment id is >= `frag_id`, then delegates to
// cacheMetadataVec for the surviving entries.
// (Signature line, original line 401, is missing from this listing.)
402  const ChunkMetadataVector& metadata_vec,
403  const int frag_id) {
404  // Only re-cache last fragment and above
405  ChunkMetadataVector new_metadata_vec;
406  for (const auto& chunk_metadata : metadata_vec) {
407  if (chunk_metadata.first[CHUNK_KEY_FRAGMENT_IDX] >= frag_id) {
408  new_metadata_vec.push_back(chunk_metadata);
409  }
410  }
411  cacheMetadataVec(new_metadata_vec);
412 }
413 
// getChunkBufferForPrecaching: returns the buffer to write pre-cached data
// into — an existing on-device buffer when is_new_buffer is false, a freshly
// created one when true. The CHECKs assert the flag matches reality.
// (Signature line, original line 414, is missing from this listing.)
415  const ChunkKey& chunk_key,
416  bool is_new_buffer) {
417  if (!is_new_buffer) {
418  CHECK(getGlobalFileMgr()->isBufferOnDevice(chunk_key));
419  return getGlobalFileMgr()->getBuffer(chunk_key);
420  } else {
421  CHECK(!getGlobalFileMgr()->isBufferOnDevice(chunk_key));
422  return getGlobalFileMgr()->createBuffer(chunk_key);
423  }
424 }
425 
426 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:205
void validatePath(const std::string &) const
std::vector< ChunkKey > getCachedChunksForKeyPrefix(const ChunkKey &) const
File_Namespace::GlobalFileMgr * getGlobalFileMgr() const
std::set< ChunkKey >::iterator evictChunkByIterator(const std::set< ChunkKey >::iterator &chunk_it)
AbstractBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: GlobalFileMgr.h:63
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
bool is_table_key(const ChunkKey &key)
Definition: types.h:44
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
void cacheTableChunks(const std::vector< ChunkKey > &chunk_keys)
Represents/provides access to contiguous data stored in the file system.
Definition: FileBuffer.h:56
void initEncoder(const SQLTypeInfo &tmp_sql_type)
void setNumElems(const size_t num_elems)
Definition: Encoder.h:229
ChunkStats chunkStats
Definition: ChunkMetadata.h:35
virtual bool resetChunkStats(const ChunkStats &)
: Reset chunk level stats (min, max, nulls) using new values from the argument.
Definition: Encoder.h:223
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:227
ForeignStorageCache(const DiskCacheConfig &config)
AbstractBuffer * getCachedChunkIfExists(const ChunkKey &)
std::string getCacheDirectoryForTablePrefix(const ChunkKey &) const
std::map< ChunkKey, AbstractBuffer * > getChunkBuffersForCaching(const std::vector< ChunkKey > &chunk_keys) const
bool is_varlen_key(const ChunkKey &key)
Definition: types.h:61
void iterate_over_matching_prefix(Func func, T &chunk_collection, const ChunkKey &chunk_prefix)
An AbstractBuffer is a unit of data management for a data manager.
void cacheMetadataWithFragIdGreaterOrEqualTo(const ChunkMetadataVector &metadata_vec, const int frag_id)
bool recoverCacheForTable(ChunkMetadataVector &, const ChunkKey &)
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:52
void cacheMetadataVec(const ChunkMetadataVector &)
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::set< ChunkKey >::iterator eraseChunk(const std::set< ChunkKey >::iterator &)
bool isMetadataCached(const ChunkKey &) const
void evictThenEraseChunkUnlocked(const ChunkKey &)
void deleteBufferIfExists(const ChunkKey &chunk_key)
std::string show_chunk(const ChunkKey &key)
Definition: types.h:73
void set_metadata_for_buffer(AbstractBuffer *buffer, ChunkMetadata *meta)
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns the a pointer to the chunk with the specified key.
Definition: GlobalFileMgr.h:85
std::unique_ptr< File_Namespace::GlobalFileMgr > global_file_mgr_
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
AbstractBuffer * getChunkBufferForPrecaching(const ChunkKey &chunk_key, bool is_new_buffer)
void setSize(const size_t size)
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_lock< mapd_shared_mutex > read_lock
bool is_varlen_data_key(const ChunkKey &key)
Definition: types.h:65
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::vector< int > ChunkKey
Definition: types.h:37
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata > >> ChunkMetadataVector
mapd_unique_lock< mapd_shared_mutex > write_lock
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
SQLTypeInfo sqlType
Definition: ChunkMetadata.h:32
A selection of helper methods for File I/O.
void cacheChunk(const ChunkKey &, AbstractBuffer *)
size_t numElements
Definition: ChunkMetadata.h:34