OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
GlobalFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <boost/filesystem.hpp>
28 #include <boost/lexical_cast.hpp>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
36 #include "Shared/File.h"
37 
38 using namespace std;
39 
40 namespace File_Namespace {
41 
/**
 * Constructs the global file manager.
 *
 * Forwards deviceId to the AbstractBufferMgr base, stores copies of the foreign
 * storage interface, the base path, the reader-thread count and the default page
 * size, starts epoch_ at -1, clears dbConvert_, and finally calls init().
 *
 * NOTE(review): this listing skips one source line directly before the
 * "DS changes" comment below; that comment presumably annotated a statement that
 * is not visible here (possibly the omnisci_db_version_ assignment) -- confirm
 * against the original file before relying on this block.
 */
42 GlobalFileMgr::GlobalFileMgr(const int32_t deviceId,
43  std::shared_ptr<ForeignStorageInterface> fsi,
44  std::string basePath,
45  const size_t num_reader_threads,
46  const size_t defaultPageSize)
47  : AbstractBufferMgr(deviceId)
48  , fsi_(fsi)
49  , basePath_(basePath)
50  , num_reader_threads_(num_reader_threads)
51  , epoch_(-1)
52  , // set the default epoch for all tables corresponding to the time of
53  // last checkpoint
54  defaultPageSize_(defaultPageSize) {
56  // DS changes also triggered by individual FileMgr per table project (release 2.1.0)
57  dbConvert_ = false;
// init() is invoked last, after every member above has been set.
58  init();
59 }
60 
62  // check if basePath_ already exists, and if not create one
63  boost::filesystem::path path(basePath_);
64  if (basePath_.size() > 0 && basePath_[basePath_.size() - 1] != '/') {
65  basePath_.push_back('/');
66  }
67  if (boost::filesystem::exists(path)) {
68  if (!boost::filesystem::is_directory(path)) {
69  LOG(FATAL) << "Specified path is not a directory.";
70  }
71  } else { // data directory does not exist
72  if (!boost::filesystem::create_directory(path)) {
73  LOG(FATAL) << "Could not create data directory";
74  }
75  }
76 }
77 
79  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
80  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
81  ++fileMgrsIt) {
82  fileMgrsIt->second->checkpoint();
83  }
84 }
85 
86 void GlobalFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
87  getFileMgr(db_id, tb_id)->checkpoint();
88 }
89 
91  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
92  size_t num_chunks = 0;
93  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
94  ++fileMgrsIt) {
95  num_chunks += fileMgrsIt->second->getNumChunks();
96  }
97 
98  return num_chunks;
99 }
100 
101 void GlobalFileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
102  /* keyPrefix[0] can be -1 only for gpu or cpu buffers but not for FileMgr.
103  * There is no assert here, as GlobalFileMgr is being called with -1 value as well in
104  * the same loop with other buffers. So the case of -1 will just be ignored, as nothing
105  * needs to be done.
106  */
107  if (keyPrefix[0] != -1) {
108  return getFileMgr(keyPrefix)->deleteBuffersWithPrefix(keyPrefix, purge);
109  }
110 }
111 
112 AbstractBufferMgr* GlobalFileMgr::findFileMgrUnlocked(const int32_t db_id,
113  const int32_t tb_id) {
114  // NOTE: only call this private function after locking is already in place
115  AbstractBufferMgr* fm = nullptr;
116  const auto file_mgr_key = std::make_pair(db_id, tb_id);
117  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
118  fm = it->second;
119  }
120  return fm;
121 }
122 
123 void GlobalFileMgr::deleteFileMgr(const int32_t db_id, const int32_t tb_id) {
124  // NOTE: only call this private function after locking is already in place
125  const auto file_mgr_key = std::make_pair(db_id, tb_id);
126  if (auto it = ownedFileMgrs_.find(file_mgr_key); it != ownedFileMgrs_.end()) {
127  ownedFileMgrs_.erase(it);
128  }
129  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
130  allFileMgrs_.erase(it);
131  }
132 }
133 
134 void GlobalFileMgr::closeFileMgr(const int32_t db_id, const int32_t tb_id) {
135  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
136  deleteFileMgr(db_id, tb_id);
137 }
138 
140  FileMgr* file_mgr,
141  const FileMgrParams& file_mgr_params) const {
142  if (file_mgr_params.epoch != -1 &&
143  file_mgr_params.epoch != file_mgr->lastCheckpointedEpoch()) {
144  return true;
145  }
146  if (file_mgr_params.max_rollback_epochs != -1 &&
147  file_mgr_params.max_rollback_epochs != file_mgr->maxRollbackEpochs()) {
148  return true;
149  }
150  return false;
151 }
152 
153 void GlobalFileMgr::setFileMgrParams(const int32_t db_id,
154  const int32_t tb_id,
155  const FileMgrParams& file_mgr_params) {
156  auto fm = dynamic_cast<File_Namespace::FileMgr*>(findFileMgr(db_id, tb_id));
157  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
158  if (fm) {
159  deleteFileMgr(db_id, tb_id);
160  }
161  const auto file_mgr_key = std::make_pair(db_id, tb_id);
162  auto max_rollback_epochs =
163  (file_mgr_params.max_rollback_epochs >= 0 ? file_mgr_params.max_rollback_epochs
164  : -1);
165  auto s = std::make_shared<FileMgr>(
166  0,
167  this,
168  file_mgr_key,
169  max_rollback_epochs,
171  file_mgr_params.epoch != -1 ? file_mgr_params.epoch : epoch_,
173  CHECK(ownedFileMgrs_.insert(std::make_pair(file_mgr_key, s)).second);
174  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, s.get())).second);
175  max_rollback_epochs_per_table_[{db_id, tb_id}] = max_rollback_epochs;
176  return;
177 }
178 
179 AbstractBufferMgr* GlobalFileMgr::getFileMgr(const int32_t db_id, const int32_t tb_id) {
180  { // check if FileMgr already exists for (db_id, tb_id)
181  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
182  AbstractBufferMgr* fm = findFileMgrUnlocked(db_id, tb_id);
183  if (fm) {
184  return fm;
185  }
186  }
187 
188  { // create new FileMgr for (db_id, tb_id)
189  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
190  AbstractBufferMgr* fm = findFileMgrUnlocked(db_id, tb_id);
191  if (fm) {
192  return fm; // mgr was added between the read lock and the write lock
193  }
194  const auto file_mgr_key = std::make_pair(db_id, tb_id);
195  const auto foreign_buffer_manager = fsi_->lookupBufferManager(db_id, tb_id);
196  if (foreign_buffer_manager) {
197  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, foreign_buffer_manager))
198  .second);
199  return foreign_buffer_manager;
200  } else {
201  int32_t max_rollback_epochs{-1};
202  if (max_rollback_epochs_per_table_.find(file_mgr_key) !=
204  max_rollback_epochs = max_rollback_epochs_per_table_[file_mgr_key];
205  }
206  auto s = std::make_shared<FileMgr>(0,
207  this,
208  file_mgr_key,
209  max_rollback_epochs,
211  epoch_,
213  CHECK(ownedFileMgrs_.insert(std::make_pair(file_mgr_key, s)).second);
214  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, s.get())).second);
215  return s.get();
216  }
217  }
218 }
219 
220 // For testing purposes only
221 std::shared_ptr<FileMgr> GlobalFileMgr::getSharedFileMgr(const int db_id,
222  const int table_id) {
223  const auto table_key = std::make_pair(db_id, table_id);
224  if (ownedFileMgrs_.find(table_key) == ownedFileMgrs_.end()) {
225  return nullptr;
226  }
227  return ownedFileMgrs_[table_key];
228 }
229 
230 // For testing purposes only
231 void GlobalFileMgr::setFileMgr(const int db_id,
232  const int table_id,
233  std::shared_ptr<FileMgr> file_mgr) {
234  allFileMgrs_[{db_id, table_id}] = file_mgr.get();
235  ownedFileMgrs_[{db_id, table_id}] = file_mgr;
236 }
237 
239  FileMgr* fileMgr) { // this function is not used, keep it for now for future needs
240  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
241  for (auto fileMgrIt = allFileMgrs_.begin(); fileMgrIt != allFileMgrs_.end();
242  fileMgrIt++) {
243  FileMgr* fm = dynamic_cast<FileMgr*>(fileMgrIt->second);
244  CHECK(fm);
245  if ((fileMgr != 0) && (fileMgr != fm)) {
246  continue;
247  }
248  for (auto chunkIt = fm->chunkIndex_.begin(); chunkIt != fm->chunkIndex_.end();
249  chunkIt++) {
250  chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0);
251  }
252  }
253 }
254 
255 void GlobalFileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) {
256  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
257  auto fm = dynamic_cast<File_Namespace::FileMgr*>(findFileMgrUnlocked(db_id, tb_id));
258  if (fm) {
259  fm->closeRemovePhysical();
260  } else {
261  // fileMgr has not been initialized so there is no need to
262  // spend the time initializing
263  // initialize just enough to have to rename
264  const auto file_mgr_key = std::make_pair(db_id, tb_id);
265  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, defaultPageSize_, true);
266  u->closeRemovePhysical();
267  }
268  // remove table related in-memory DS only if directory was removed successfully
269 
270  deleteFileMgr(db_id, tb_id);
271  max_rollback_epochs_per_table_.erase({db_id, tb_id});
272 }
273 
274 void GlobalFileMgr::setTableEpoch(const int32_t db_id,
275  const int32_t tb_id,
276  const int32_t start_epoch) {
277  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
278  if (opened_fm) {
279  // Delete this FileMgr to ensure epoch change occurs in constructor with other
280  // reads/writes locked out
281  deleteFileMgr(db_id, tb_id);
282  }
283  const auto file_mgr_key = std::make_pair(db_id, tb_id);
284  // this is where the real rollback of any data ahead of the currently set epoch is
285  // performed
286  // Will call set_epoch with start_epoch internally
287  auto u = std::make_unique<FileMgr>(
288  0, this, file_mgr_key, -1, num_reader_threads_, start_epoch, defaultPageSize_);
289  // remove the dummy one we built
290  u.reset();
291 }
292 
293 size_t GlobalFileMgr::getTableEpoch(const int32_t db_id, const int32_t tb_id) {
294  // UX change was made to this function Oct 2020 to return checkpointed epoch. In turn,
295  // setTableEpoch was changed to set the epoch at the user's input, instead of input - 1
296  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
297  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
298  if (opened_fm) {
299  return dynamic_cast<FileMgr*>(opened_fm)->lastCheckpointedEpoch();
300  }
301  // Do not do full init of table just to get table epoch, just check file instead
302  const auto file_mgr_key = std::make_pair(db_id, tb_id);
303  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, defaultPageSize_, true);
304  const auto epoch = u->lastCheckpointedEpoch();
305  u.reset();
306  return epoch;
307 }
308 
309 StorageStats GlobalFileMgr::getStorageStats(const int32_t db_id, const int32_t tb_id) {
310  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
311  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
312  if (opened_fm) {
313  return dynamic_cast<FileMgr*>(opened_fm)->getStorageStats();
314  }
315  // Do not do full init of table just to get storage stats, just check file instead
316  const auto file_mgr_key = std::make_pair(db_id, tb_id);
317  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, defaultPageSize_, true);
318  const auto basic_storage_stats = u->getStorageStats();
319  u.reset();
320  return basic_storage_stats;
321 }
322 
323 void GlobalFileMgr::compactDataFiles(const int32_t db_id, const int32_t tb_id) {
324  auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(findFileMgr(db_id, tb_id));
325  {
326  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
327  if (file_mgr) {
328  file_mgr->compactFiles();
329  deleteFileMgr(db_id, tb_id);
330  }
331  }
332 
333  // Re-initialize file manager
334  getFileMgr(db_id, tb_id);
335 }
336 } // namespace File_Namespace
void writeFileMgrData(FileMgr *fileMgr=0)
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
std::vector< int > ChunkKey
Definition: types.h:37
void deleteFileMgr(const int32_t db_id, const int32_t tb_id)
int32_t epoch_
number of threads used when loading data
std::shared_ptr< ForeignStorageInterface > fsi_
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
std::map< TablePair, std::shared_ptr< FileMgr > > ownedFileMgrs_
#define LOG(tag)
Definition: Logger.h:203
size_t getNumChunks() override
std::map< TablePair, AbstractBufferMgr * > allFileMgrs_
void setTableEpoch(const int32_t db_id, const int32_t tb_id, const int32_t start_epoch)
StorageStats getStorageStats(const int32_t db_id, const int32_t tb_id)
AbstractBufferMgr * findFileMgrUnlocked(const int32_t db_id, const int32_t tb_id)
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:318
std::shared_ptr< FileMgr > getSharedFileMgr(const int db_id, const int table_id)
int32_t omnisci_db_version_
default page size, used to set FileMgr defaultPageSize_
bool existsDiffBetweenFileMgrParamsAndFileMgr(FileMgr *file_mgr, const FileMgrParams &file_mgr_params) const
void compactDataFiles(const int32_t db_id, const int32_t tb_id)
virtual void closeRemovePhysical()
Definition: FileMgr.cpp:550
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t num_reader_threads_
The OS file system path containing the files.
std::map< TablePair, int32_t > max_rollback_epochs_per_table_
void setFileMgr(const int db_id, const int table_id, std::shared_ptr< FileMgr > file_mgr)
void closeFileMgr(const int32_t db_id, const int32_t tb_id)
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
#define CHECK(condition)
Definition: Logger.h:209
AbstractBufferMgr * findFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_mutex fileMgrs_mutex_
mapd_unique_lock< mapd_shared_mutex > write_lock
int32_t maxRollbackEpochs()
Returns value max_rollback_epochs.
Definition: FileMgr.h:298
void removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) override
A selection of helper methods for File I/O.
int32_t lastCheckpointedEpoch()
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:291
size_t getTableEpoch(const int32_t db_id, const int32_t tb_id)