OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CachingGlobalFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "CachingGlobalFileMgr.h"
18 
20 
21 namespace File_Namespace {
23  int32_t device_id,
24  std::shared_ptr<ForeignStorageInterface> fsi,
25  const std::string& base_path,
26  size_t num_reader_threads,
28  size_t default_page_size)
29  : GlobalFileMgr(device_id, fsi, base_path, num_reader_threads, default_page_size)
30  , disk_cache_(disk_cache) {
32 }
33 
35  const size_t page_size,
36  const size_t initial_size) {
37  auto buf = GlobalFileMgr::createBuffer(chunk_key, page_size, initial_size);
38  if (isChunkPrefixCacheable(chunk_key)) {
39  cached_chunk_keys_.emplace(chunk_key);
40  }
41  return buf;
42 }
43 
// Deletes the chunk identified by chunk_key. When the key is cacheable it is first
// removed from cached_chunk_keys_; the delete is then forwarded, together with the
// purge flag, to GlobalFileMgr::deleteBuffer.
// NOTE(review): this listing jumps from line 45 to line 47, so one statement inside the
// cacheable branch is not visible here and is not described above - confirm against
// the upstream file.
44 void CachingGlobalFileMgr::deleteBuffer(const ChunkKey& chunk_key, const bool purge) {
45  if (isChunkPrefixCacheable(chunk_key)) {
47  cached_chunk_keys_.erase(chunk_key);
48  }
49  GlobalFileMgr::deleteBuffer(chunk_key, purge);
50 }
51 
53  const bool purge) {
54  if (isChunkPrefixCacheable(chunk_key_prefix)) {
55  CHECK(has_table_prefix(chunk_key_prefix));
56  disk_cache_->clearForTablePrefix(get_table_key(chunk_key_prefix));
57 
58  ChunkKey upper_prefix(chunk_key_prefix);
59  upper_prefix.push_back(std::numeric_limits<int>::max());
60  auto end_it =
61  cached_chunk_keys_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
62  for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(chunk_key_prefix);
63  chunk_key_it != end_it;) {
64  chunk_key_it = cached_chunk_keys_.erase(chunk_key_it);
65  }
66  }
67  GlobalFileMgr::deleteBuffersWithPrefix(chunk_key_prefix, purge);
68 }
69 
71  ChunkMetadataVector& chunk_metadata,
72  const ChunkKey& key_prefix) {
73  CHECK(has_table_prefix(key_prefix));
74  if (isChunkPrefixCacheable(key_prefix)) {
75  // If the disk has any cached metadata for a prefix then it is guaranteed to have all
76  // metadata for that table, so we can return a complete set. If it has no metadata,
77  // then it may be that the table has no data, or that it's just not cached, so we need
78  // to go to storage to check.
79  if (disk_cache_->hasCachedMetadataForKeyPrefix(key_prefix)) {
80  disk_cache_->getCachedMetadataVecForKeyPrefix(chunk_metadata, key_prefix);
81  return;
82  }
83  }
84  GlobalFileMgr::getChunkMetadataVecForKeyPrefix(chunk_metadata, key_prefix);
85  if (isChunkPrefixCacheable(key_prefix)) {
86  disk_cache_->cacheMetadataVec(chunk_metadata);
87  }
88 }
89 
91  AbstractBuffer* destination_buffer,
92  const size_t num_bytes) {
93  if (isChunkPrefixCacheable(chunk_key)) {
94  // If we are recovering after a shutdown, it is possible for there to be cached data
95  // without the file_mgr being initialized, so we need to check if the file_mgr exists.
96  CHECK(has_table_prefix(chunk_key));
97  auto [db, table_id] = get_table_prefix(chunk_key);
98  auto file_mgr = GlobalFileMgr::findFileMgr(db, table_id);
99  if (file_mgr && file_mgr->getBuffer(chunk_key)->isDirty()) {
100  // It is possible for the fragmenter to write data to a FileBuffer and then attempt
101  // to fetch that buffer without checkpointing. In that case the cache will not have
102  // been updated and the cached buffer will be out of date, so we need to fetch the
103  // storage buffer.
104  GlobalFileMgr::fetchBuffer(chunk_key, destination_buffer, num_bytes);
105  } else {
106  AbstractBuffer* buffer = disk_cache_->getCachedChunkIfExists(chunk_key);
107  if (buffer) {
108  buffer->copyTo(destination_buffer, num_bytes);
109  } else {
110  GlobalFileMgr::fetchBuffer(chunk_key, destination_buffer, num_bytes);
111  disk_cache_->putBuffer(chunk_key, destination_buffer, num_bytes);
112  }
113  }
114  } else {
115  GlobalFileMgr::fetchBuffer(chunk_key, destination_buffer, num_bytes);
116  }
117 }
118 
120  AbstractBuffer* source_buffer,
121  const size_t num_bytes) {
122  auto buf = GlobalFileMgr::putBuffer(chunk_key, source_buffer, num_bytes);
123  if (isChunkPrefixCacheable(chunk_key)) {
124  disk_cache_->putBuffer(chunk_key, source_buffer, num_bytes);
125  }
126  return buf;
127 }
128 
130  std::set<File_Namespace::TablePair> tables_to_checkpoint;
131  for (auto& key : cached_chunk_keys_) {
132  if (isChunkPrefixCacheable(key) && GlobalFileMgr::getBuffer(key)->isDirty()) {
133  tables_to_checkpoint.emplace(get_table_prefix(key));
135  GlobalFileMgr::fetchBuffer(key, &temp_buf, 0);
136  disk_cache_->putBuffer(key, &temp_buf);
137  }
138  }
139  for (auto [db, tb] : tables_to_checkpoint) {
140  disk_cache_->checkpoint(db, tb);
141  }
143 }
144 
// Checkpoints a single table (db_id, tb_id).
// For a cacheable table, every tracked chunk key under the table prefix whose storage
// buffer reports isDirty() is fetched from GlobalFileMgr and written into the disk cache;
// the disk cache is checkpointed only if at least one such chunk was found.
// GlobalFileMgr::checkpoint is always called afterwards, cacheable or not.
// NOTE(review): this listing jumps from line 157 to line 159; temp_buf is used below
// without a visible declaration, so its type cannot be confirmed from here.
145 void CachingGlobalFileMgr::checkpoint(const int db_id, const int tb_id) {
146  if (isChunkPrefixCacheable({db_id, tb_id})) {
147  bool need_checkpoint{false};
148  ChunkKey chunk_prefix{db_id, tb_id};
// The scanned range runs from the bare {db_id, tb_id} prefix up to that prefix
// extended with the largest int value.
149  ChunkKey upper_prefix(chunk_prefix);
150  upper_prefix.push_back(std::numeric_limits<int>::max());
151  auto end_it =
152  cached_chunk_keys_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
153  for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(chunk_prefix);
154  chunk_key_it != end_it;
155  ++chunk_key_it) {
156  if (GlobalFileMgr::getBuffer(*chunk_key_it)->isDirty()) {
157  need_checkpoint = true;
// Copy the storage buffer into temp_buf, then hand that copy to the disk cache.
159  GlobalFileMgr::fetchBuffer(*chunk_key_it, &temp_buf, 0);
160  disk_cache_->putBuffer(*chunk_key_it, &temp_buf);
161  }
162  }
163  if (need_checkpoint) {
164  disk_cache_->checkpoint(db_id, tb_id);
165  }
166  }
167  GlobalFileMgr::checkpoint(db_id, tb_id);
168 }
169 
170 void CachingGlobalFileMgr::removeCachedData(const int db_id, const int table_id) {
171  if (isChunkPrefixCacheable({db_id, table_id})) {
172  const ChunkKey table_key{db_id, table_id};
173  disk_cache_->clearForTablePrefix(table_key);
174  ChunkKey upper_prefix(table_key);
175  upper_prefix.push_back(std::numeric_limits<int>::max());
176  auto end_it =
177  cached_chunk_keys_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
178  for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(table_key);
179  chunk_key_it != end_it;) {
180  chunk_key_it = cached_chunk_keys_.erase(chunk_key_it);
181  }
182  }
183 }
184 
185 void CachingGlobalFileMgr::removeTableRelatedDS(const int db_id, const int table_id) {
186  removeCachedData(db_id, table_id);
187  GlobalFileMgr::removeTableRelatedDS(db_id, table_id);
188 }
189 
191  CHECK(has_table_prefix(chunk_prefix));
192  // If this is an Arrow FSI table then we can't cache it.
193  if (fsi_->lookupBufferManager(chunk_prefix[CHUNK_KEY_DB_IDX],
194  chunk_prefix[CHUNK_KEY_TABLE_IDX])) {
195  return false;
196  }
197  return true;
198 }
199 } // namespace File_Namespace
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Puts the contents of d into the Chunk with the given key.
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
std::vector< int > ChunkKey
Definition: types.h:36
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunk_metadata, const ChunkKey &keyPrefix) override
std::shared_ptr< ForeignStorageInterface > fsi_
foreign_storage::ForeignStorageCache * disk_cache_
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
AbstractBuffer * createBuffer(const ChunkKey &key, size_t pageSize=0, const size_t numBytes=0) override
Creates a chunk with the specified key and page size.
Definition: GlobalFileMgr.h:66
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
CachingGlobalFileMgr(int32_t device_id, std::shared_ptr< ForeignStorageInterface > fsi, const std::string &base_path, size_t num_reader_threads, foreign_storage::ForeignStorageCache *disk_cache, size_t defaultPageSize=DEFAULT_PAGE_SIZE)
void deleteBuffer(const ChunkKey &chunk_key, const bool purge) override
ChunkKey get_table_key(const ChunkKey &key)
Definition: types.h:57
void getCachedMetadataVecForKeyPrefix(ChunkMetadataVector &, const ChunkKey &) const
AbstractBuffer * createBuffer(const ChunkKey &chunk_key, const size_t page_size, const size_t initial_size) override
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
void removeCachedData(const int db_id, const int table_id)
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
bool has_table_prefix(const ChunkKey &key)
Definition: types.h:48
An AbstractBuffer is a unit of data management for a data manager.
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
bool isChunkPrefixCacheable(const ChunkKey &chunk_prefix) const
void putBuffer(const ChunkKey &, AbstractBuffer *, const size_t numBytes=0)
void cacheMetadataVec(const ChunkMetadataVector &)
void deleteBuffersWithPrefix(const ChunkKey &chunk_key_prefix, const bool purge) override
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: GlobalFileMgr.h:80
void deleteBufferIfExists(const ChunkKey &chunk_key)
AbstractBuffer * putBuffer(const ChunkKey &chunk_key, AbstractBuffer *source_buffer, const size_t num_bytes) override
void removeTableRelatedDS(const int db_id, const int table_id) override
void fetchBuffer(const ChunkKey &chunk_key, AbstractBuffer *destination_buffer, const size_t num_bytes) override
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns a pointer to the chunk with the specified key.
Definition: GlobalFileMgr.h:88
void copyTo(AbstractBuffer *destination_buffer, const size_t num_bytes=0)
std::pair< int, int > get_table_prefix(const ChunkKey &key)
Definition: types.h:62
void checkpoint(const int32_t db_id, const int32_t tb_id)
#define CHECK(condition)
Definition: Logger.h:291
File_Namespace::FileBuffer * getCachedChunkIfExists(const ChunkKey &)
AbstractBufferMgr * findFileMgr(const int32_t db_id, const int32_t tb_id)
bool hasCachedMetadataForKeyPrefix(const ChunkKey &) const
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes) override
Definition: GlobalFileMgr.h:92
void removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) override