OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
GlobalFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <boost/filesystem.hpp>
28 #include <boost/lexical_cast.hpp>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
36 #include "Shared/File.h"
37 
38 using namespace std;
39 
40 namespace File_Namespace {
41 
42 GlobalFileMgr::GlobalFileMgr(const int32_t device_id,
43  std::shared_ptr<ForeignStorageInterface> fsi,
44  std::string base_path,
45  const size_t num_reader_threads,
46  const size_t page_size,
47  const size_t metadata_page_size)
48  : AbstractBufferMgr(device_id)
49  , fsi_(fsi)
50  , basePath_(base_path)
51  , num_reader_threads_(num_reader_threads)
52  , epoch_(-1) // set the default epoch for all tables corresponding to the time of
53  // last checkpoint
54  , page_size_(page_size)
55  , metadata_page_size_(metadata_page_size) {
57  // DS changes also triggered by individual FileMgr per table project (release 2.1.0)
58  dbConvert_ = false;
59  init();
60 }
61 
63  // check if basePath_ already exists, and if not create one
64  boost::filesystem::path path(basePath_);
65  if (basePath_.size() > 0 && basePath_[basePath_.size() - 1] != '/') {
66  basePath_.push_back('/');
67  }
68  if (boost::filesystem::exists(path)) {
69  if (!boost::filesystem::is_directory(path)) {
70  LOG(FATAL) << "Specified path is not a directory.";
71  }
72  } else { // data directory does not exist
73  if (!boost::filesystem::create_directory(path)) {
74  LOG(FATAL) << "Could not create data directory";
75  }
76  }
77 }
78 
81  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
82  ++fileMgrsIt) {
83  fileMgrsIt->second->checkpoint();
84  }
85 }
86 
87 void GlobalFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
88  getFileMgr(db_id, tb_id)->checkpoint();
89 }
90 
93  size_t num_chunks = 0;
94  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
95  ++fileMgrsIt) {
96  num_chunks += fileMgrsIt->second->getNumChunks();
97  }
98 
99  return num_chunks;
100 }
101 
102 void GlobalFileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
103  /* keyPrefix[0] can be -1 only for gpu or cpu buffers but not for FileMgr.
104  * There is no assert here, as GlobalFileMgr is being called with -1 value as well in
105  * the same loop with other buffers. So the case of -1 will just be ignored, as nothing
106  * needs to be done.
107  */
108  if (keyPrefix[0] != -1) {
109  return getFileMgr(keyPrefix)->deleteBuffersWithPrefix(keyPrefix, purge);
110  }
111 }
112 
113 AbstractBufferMgr* GlobalFileMgr::findFileMgrUnlocked(const int32_t db_id,
114  const int32_t tb_id) {
115  // NOTE: only call this private function after locking is already in place
116  AbstractBufferMgr* fm = nullptr;
117  const auto file_mgr_key = std::make_pair(db_id, tb_id);
118  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
119  fm = it->second;
120  }
121  return fm;
122 }
123 
124 void GlobalFileMgr::deleteFileMgr(const int32_t db_id, const int32_t tb_id) {
125  // NOTE: only call this private function after locking is already in place
126  const auto file_mgr_key = std::make_pair(db_id, tb_id);
127  if (auto it = ownedFileMgrs_.find(file_mgr_key); it != ownedFileMgrs_.end()) {
128  ownedFileMgrs_.erase(it);
129  }
130  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
131  allFileMgrs_.erase(it);
132  }
133 }
134 
135 void GlobalFileMgr::closeFileMgr(const int32_t db_id, const int32_t tb_id) {
137  deleteFileMgr(db_id, tb_id);
138 }
139 
141  FileMgr* file_mgr,
142  const FileMgrParams& file_mgr_params) const {
143  if (file_mgr_params.epoch != -1 &&
144  file_mgr_params.epoch != file_mgr->lastCheckpointedEpoch()) {
145  return true;
146  }
147  if (file_mgr_params.max_rollback_epochs != -1 &&
148  file_mgr_params.max_rollback_epochs != file_mgr->maxRollbackEpochs()) {
149  return true;
150  }
151  return false;
152 }
153 
154 void GlobalFileMgr::setFileMgrParams(const int32_t db_id,
155  const int32_t tb_id,
156  const FileMgrParams& file_mgr_params) {
157  auto fm = dynamic_cast<File_Namespace::FileMgr*>(findFileMgr(db_id, tb_id));
159  if (fm) {
160  deleteFileMgr(db_id, tb_id);
161  }
162  const auto file_mgr_key = std::make_pair(db_id, tb_id);
163  auto max_rollback_epochs =
164  (file_mgr_params.max_rollback_epochs >= 0 ? file_mgr_params.max_rollback_epochs
165  : -1);
166  auto s = std::make_shared<FileMgr>(
167  0,
168  this,
169  file_mgr_key,
170  max_rollback_epochs,
172  file_mgr_params.epoch != -1 ? file_mgr_params.epoch : epoch_);
173  CHECK(ownedFileMgrs_.insert(std::make_pair(file_mgr_key, s)).second);
174  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, s.get())).second);
175  max_rollback_epochs_per_table_[file_mgr_key] = max_rollback_epochs;
176  lazy_initialized_stats_.erase(file_mgr_key);
177  return;
178 }
179 
180 AbstractBufferMgr* GlobalFileMgr::getFileMgr(const int32_t db_id, const int32_t tb_id) {
181  { // check if FileMgr already exists for (db_id, tb_id)
183  AbstractBufferMgr* fm = findFileMgrUnlocked(db_id, tb_id);
184  if (fm) {
185  return fm;
186  }
187  }
188 
189  { // create new FileMgr for (db_id, tb_id)
191  AbstractBufferMgr* fm = findFileMgrUnlocked(db_id, tb_id);
192  if (fm) {
193  return fm; // mgr was added between the read lock and the write lock
194  }
195  const auto file_mgr_key = std::make_pair(db_id, tb_id);
196  const auto foreign_buffer_manager = fsi_->lookupBufferManager(db_id, tb_id);
197  if (foreign_buffer_manager) {
198  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, foreign_buffer_manager))
199  .second);
200  return foreign_buffer_manager;
201  } else {
202  int32_t max_rollback_epochs{-1};
203  if (max_rollback_epochs_per_table_.find(file_mgr_key) !=
205  max_rollback_epochs = max_rollback_epochs_per_table_[file_mgr_key];
206  }
207  auto s = std::make_shared<FileMgr>(
208  0, this, file_mgr_key, max_rollback_epochs, num_reader_threads_, epoch_);
209  CHECK(ownedFileMgrs_.insert(std::make_pair(file_mgr_key, s)).second);
210  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, s.get())).second);
211  lazy_initialized_stats_.erase(file_mgr_key);
212  return s.get();
213  }
214  }
215 }
216 
217 // For testing purposes only
218 std::shared_ptr<FileMgr> GlobalFileMgr::getSharedFileMgr(const int db_id,
219  const int table_id) {
220  const auto table_key = std::make_pair(db_id, table_id);
221  if (ownedFileMgrs_.find(table_key) == ownedFileMgrs_.end()) {
222  return nullptr;
223  }
224  return ownedFileMgrs_[table_key];
225 }
226 
227 // For testing purposes only
228 void GlobalFileMgr::setFileMgr(const int db_id,
229  const int table_id,
230  std::shared_ptr<FileMgr> file_mgr) {
231  TablePair file_mgr_key{db_id, table_id};
232  allFileMgrs_[file_mgr_key] = file_mgr.get();
233  ownedFileMgrs_[file_mgr_key] = file_mgr;
234  lazy_initialized_stats_.erase(file_mgr_key);
235 }
236 
238  FileMgr* fileMgr) { // this function is not used, keep it for now for future needs
240  for (auto fileMgrIt = allFileMgrs_.begin(); fileMgrIt != allFileMgrs_.end();
241  fileMgrIt++) {
242  FileMgr* fm = dynamic_cast<FileMgr*>(fileMgrIt->second);
243  CHECK(fm);
244  if ((fileMgr != 0) && (fileMgr != fm)) {
245  continue;
246  }
247  for (auto chunkIt = fm->chunkIndex_.begin(); chunkIt != fm->chunkIndex_.end();
248  chunkIt++) {
249  chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0);
250  }
251  }
252 }
253 
254 void GlobalFileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) {
256  auto abm = findFileMgrUnlocked(db_id, tb_id);
257  if (auto fm = dynamic_cast<File_Namespace::FileMgr*>(abm)) {
258  fm->closeRemovePhysical();
259  } else if (dynamic_cast<ForeignStorageBufferMgr*>(abm)) {
260  abm->removeTableRelatedDS(db_id, tb_id);
261  fsi_->dropBufferManager(db_id, tb_id);
262  } else {
263  // fileMgr has not been initialized so there is no need to
264  // spend the time initializing
265  // initialize just enough to have to rename
266  const auto file_mgr_key = std::make_pair(db_id, tb_id);
267  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, true);
268  u->closeRemovePhysical();
269  }
270  // remove table related in-memory DS only if directory was removed successfully
271 
272  deleteFileMgr(db_id, tb_id);
273  max_rollback_epochs_per_table_.erase({db_id, tb_id});
274 }
275 
276 void GlobalFileMgr::setTableEpoch(const int32_t db_id,
277  const int32_t tb_id,
278  const int32_t start_epoch) {
279  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
280  if (opened_fm) {
281  // Delete this FileMgr to ensure epoch change occurs in constructor with other
282  // reads/writes locked out
283  deleteFileMgr(db_id, tb_id);
284  }
285  const auto file_mgr_key = std::make_pair(db_id, tb_id);
286  // this is where the real rollback of any data ahead of the currently set epoch is
287  // performed
288  // Will call set_epoch with start_epoch internally
289  auto u = std::make_unique<FileMgr>(
290  0, this, file_mgr_key, -1, num_reader_threads_, start_epoch);
291  // remove the dummy one we built
292  u.reset();
293 }
294 
295 size_t GlobalFileMgr::getTableEpoch(const int32_t db_id, const int32_t tb_id) {
296  // UX change was made to this function Oct 2020 to return checkpointed epoch. In turn,
297  // setTableEpoch was changed to set the epoch at the user's input, instead of input - 1
299  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
300  if (opened_fm) {
301  return dynamic_cast<FileMgr*>(opened_fm)->lastCheckpointedEpoch();
302  }
303  // Do not do full init of table just to get table epoch, just check file instead
304  const auto file_mgr_key = std::make_pair(db_id, tb_id);
305  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, true);
306  const auto epoch = u->lastCheckpointedEpoch();
307  u.reset();
308  return epoch;
309 }
310 
311 void GlobalFileMgr::resetTableEpochFloor(const int32_t db_id, const int32_t tb_id) {
312  AbstractBufferMgr* fm = getFileMgr(db_id, tb_id);
313  CHECK(fm);
314  dynamic_cast<FileMgr*>(fm)->resetEpochFloor();
315 }
316 
317 StorageStats GlobalFileMgr::getStorageStats(const int32_t db_id, const int32_t tb_id) {
319  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
320  if (opened_fm) {
321  return dynamic_cast<FileMgr*>(opened_fm)->getStorageStats();
322  }
323  TablePair file_mgr_key{db_id, tb_id};
324  auto it = lazy_initialized_stats_.find(file_mgr_key);
325  if (it != lazy_initialized_stats_.end()) {
326  return it->second;
327  } else {
328  // Do not do full init of table just to get storage stats, just check file instead
329  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, true);
330  lazy_initialized_stats_[file_mgr_key] = u->getStorageStats();
331  u.reset();
332  return lazy_initialized_stats_[file_mgr_key];
333  }
334 }
335 
336 void GlobalFileMgr::compactDataFiles(const int32_t db_id, const int32_t tb_id) {
337  auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(findFileMgr(db_id, tb_id));
338  {
340  if (file_mgr) {
341  file_mgr->compactFiles();
342  deleteFileMgr(db_id, tb_id);
343  }
344  }
345 
346  // Re-initialize file manager
347  getFileMgr(db_id, tb_id);
348 }
349 } // namespace File_Namespace
void writeFileMgrData(FileMgr *fileMgr=0)
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
std::vector< int > ChunkKey
Definition: types.h:36
void deleteFileMgr(const int32_t db_id, const int32_t tb_id)
int32_t epoch_
number of threads used when loading data
std::shared_ptr< ForeignStorageInterface > fsi_
heavyai::shared_lock< heavyai::shared_mutex > read_lock
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
std::map< TablePair, std::shared_ptr< FileMgr > > ownedFileMgrs_
#define LOG(tag)
Definition: Logger.h:285
heavyai::unique_lock< heavyai::shared_mutex > write_lock
size_t getNumChunks() override
int32_t lastCheckpointedEpoch() const
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:297
std::shared_lock< T > shared_lock
std::map< TablePair, AbstractBufferMgr * > allFileMgrs_
void resetTableEpochFloor(const int32_t db_id, const int32_t tb_id)
void setTableEpoch(const int32_t db_id, const int32_t tb_id, const int32_t start_epoch)
StorageStats getStorageStats(const int32_t db_id, const int32_t tb_id)
AbstractBufferMgr * findFileMgrUnlocked(const int32_t db_id, const int32_t tb_id)
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:326
std::unique_lock< T > unique_lock
std::shared_ptr< FileMgr > getSharedFileMgr(const int db_id, const int table_id)
int32_t omnisci_db_version_
used to set FileMgr metadta_page_size_
bool existsDiffBetweenFileMgrParamsAndFileMgr(FileMgr *file_mgr, const FileMgrParams &file_mgr_params) const
void compactDataFiles(const int32_t db_id, const int32_t tb_id)
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
size_t num_reader_threads_
The OS file system path containing the files.
std::map< TablePair, int32_t > max_rollback_epochs_per_table_
void setFileMgr(const int db_id, const int table_id, std::shared_ptr< FileMgr > file_mgr)
void closeFileMgr(const int32_t db_id, const int32_t tb_id)
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
#define CHECK(condition)
Definition: Logger.h:291
AbstractBufferMgr * findFileMgr(const int32_t db_id, const int32_t tb_id)
std::map< TablePair, StorageStats > lazy_initialized_stats_
int32_t maxRollbackEpochs()
Returns value max_rollback_epochs.
Definition: FileMgr.h:306
std::pair< const int32_t, const int32_t > TablePair
Definition: FileMgr.h:91
void removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) override
heavyai::shared_mutex fileMgrs_mutex_
A selection of helper methods for File I/O.
size_t getTableEpoch(const int32_t db_id, const int32_t tb_id)