OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
GlobalFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <algorithm>
27 #include <boost/filesystem.hpp>
28 #include <boost/lexical_cast.hpp>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
35 #include "Shared/File.h"
36 
37 using namespace std;
38 
39 namespace File_Namespace {
40 
/**
 * Constructs the global (per-device) file manager rooted at `basePath`.
 *
 * @param deviceId           forwarded to AbstractBufferMgr.
 * @param basePath           directory holding the per-table data; stored in basePath_.
 * @param num_reader_threads stored in num_reader_threads_ and passed to the FileMgr
 *                           that setTableEpoch() constructs.
 * @param defaultPageSize    stored in defaultPageSize_ and passed to the temporary
 *                           FileMgr instances this class constructs.
 */
GlobalFileMgr::GlobalFileMgr(const int32_t deviceId,
                             std::string basePath,
                             const size_t num_reader_threads,
                             const size_t defaultPageSize)
    : AbstractBufferMgr(deviceId)
    , basePath_(basePath)
    , num_reader_threads_(num_reader_threads)
    // set the default epoch for all tables corresponding to the time of last
    // checkpoint; epoch_ is handed to the FileMgrs created by getFileMgr() and, when
    // no explicit epoch is requested, by setFileMgrParams().
    // NOTE(review): -1 presumably means "no override" -- confirm in FileMgr.
    , epoch_(-1)
    , defaultPageSize_(defaultPageSize) {
  // DS changes also triggered by individual FileMgr per table project (release 2.1.0)
  dbConvert_ = false;
  // NOTE(review): init() is expected to validate/create basePath_ -- confirm.
  init();
}
57 
  // check if basePath_ already exists, and if not create one.
  // `path` is built from basePath_ before the trailing '/' is appended below; the
  // slash only normalizes the stored string.
  boost::filesystem::path path(basePath_);
  if (basePath_.size() > 0 && basePath_[basePath_.size() - 1] != '/') {
    basePath_.push_back('/');
  }
  if (boost::filesystem::exists(path)) {
    // Something exists at the path: it must be a directory, otherwise abort.
    if (!boost::filesystem::is_directory(path)) {
      LOG(FATAL) << "Specified path is not a directory.";
    }
  } else {  // data directory does not exist
    // create_directory() creates only the last path component. This (throwing)
    // overload reports errors by exception; `false` means no new directory was made.
    if (!boost::filesystem::create_directory(path)) {
      LOG(FATAL) << "Could not create data directory";
    }
  }
}
74 
  // Checkpoints every registered buffer manager. fileMgrs_mutex_ is held
  // exclusively for the whole loop, so shared-lock readers such as getFileMgr()
  // wait until every checkpoint has finished.
  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
       ++fileMgrsIt) {
    fileMgrsIt->second->checkpoint();
  }
}
82 
83 void GlobalFileMgr::checkpoint(const int32_t db_id, const int32_t tb_id) {
84  getFileMgr(db_id, tb_id)->checkpoint();
85 }
86 
  // Returns the total chunk count summed over every registered buffer manager.
  // The map is iterated while holding fileMgrs_mutex_ in shared mode.
  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
  size_t num_chunks = 0;
  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
       ++fileMgrsIt) {
    num_chunks += fileMgrsIt->second->getNumChunks();
  }

  return num_chunks;
}
97 
98 void GlobalFileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
99  /* keyPrefix[0] can be -1 only for gpu or cpu buffers but not for FileMgr.
100  * There is no assert here, as GlobalFileMgr is being called with -1 value as well in
101  * the same loop with other buffers. So the case of -1 will just be ignored, as nothing
102  * needs to be done.
103  */
104  if (keyPrefix[0] != -1) {
105  return getFileMgr(keyPrefix)->deleteBuffersWithPrefix(keyPrefix, purge);
106  }
107 }
108 
109 AbstractBufferMgr* GlobalFileMgr::findFileMgrUnlocked(const int32_t db_id,
110  const int32_t tb_id) {
111  // NOTE: only call this private function after locking is already in place
112  AbstractBufferMgr* fm = nullptr;
113  const auto file_mgr_key = std::make_pair(db_id, tb_id);
114  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
115  fm = it->second;
116  }
117  return fm;
118 }
119 
120 void GlobalFileMgr::deleteFileMgr(const int32_t db_id, const int32_t tb_id) {
121  // NOTE: only call this private function after locking is already in place
122  const auto file_mgr_key = std::make_pair(db_id, tb_id);
123  if (auto it = ownedFileMgrs_.find(file_mgr_key); it != ownedFileMgrs_.end()) {
124  ownedFileMgrs_.erase(it);
125  }
126  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
127  allFileMgrs_.erase(it);
128  }
129 }
130 
131 void GlobalFileMgr::closeFileMgr(const int32_t db_id, const int32_t tb_id) {
132  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
133  deleteFileMgr(db_id, tb_id);
134 }
135 
    FileMgr* file_mgr,
    const FileMgrParams& file_mgr_params) const {
  // Returns true when `file_mgr_params` requests a value that differs from the
  // current state of `file_mgr`. A field equal to -1 means "not specified" and never
  // counts as a difference.

  // Requested epoch vs. the FileMgr's last checkpointed epoch.
  if (file_mgr_params.epoch != -1 &&
      file_mgr_params.epoch != file_mgr->lastCheckpointedEpoch()) {
    return true;
  }
  // Requested max rollback epochs vs. the FileMgr's current setting.
  if (file_mgr_params.max_rollback_epochs != -1 &&
      file_mgr_params.max_rollback_epochs != file_mgr->maxRollbackEpochs()) {
    return true;
  }
  return false;
}
149 
/**
 * Replaces the FileMgr registered for (db_id, tb_id) with a new, owned FileMgr
 * configured from `file_mgr_params`.
 *
 * A negative max_rollback_epochs is normalized to -1. An epoch of -1 falls back to
 * epoch_. The new FileMgr is registered in both ownedFileMgrs_ and allFileMgrs_.
 *
 * NOTE(review): findFileMgr() runs before the exclusive lock is taken, so another
 * thread can register a manager for the same key in between. The insert CHECKs
 * below would then fail. Locking first and using findFileMgrUnlocked() would close
 * that window.
 * NOTE(review): if the registered manager is not a FileMgr (dynamic_cast yields
 * nullptr), it is not removed, and the allFileMgrs_ insert CHECK fails.
 */
void GlobalFileMgr::setFileMgrParams(const int32_t db_id,
                                     const int32_t tb_id,
                                     const FileMgrParams& file_mgr_params) {
  auto fm = dynamic_cast<File_Namespace::FileMgr*>(findFileMgr(db_id, tb_id));
  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
  if (fm) {
    // Drop the existing FileMgr so its replacement can be inserted below.
    deleteFileMgr(db_id, tb_id);
  }
  const auto file_mgr_key = std::make_pair(db_id, tb_id);
  // Build the replacement; unspecified (-1) parameters fall back to defaults.
  auto s = std::make_shared<FileMgr>(
      0,
      this,
      file_mgr_key,
      file_mgr_params.max_rollback_epochs >= 0 ? file_mgr_params.max_rollback_epochs : -1,
      file_mgr_params.epoch != -1 ? file_mgr_params.epoch : epoch_,
  CHECK(ownedFileMgrs_.insert(std::make_pair(file_mgr_key, s)).second);
  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, s.get())).second);
  return;
}
171 
/**
 * Returns the buffer manager for (db_id, tb_id), creating and registering one on
 * first use.
 *
 * The fast path looks the key up under the shared lock. On a miss, the exclusive
 * lock is taken and the lookup is repeated, because another thread may have
 * registered a manager in between. A non-null `foreign_buffer_manager` is
 * registered in allFileMgrs_ only and is not owned here. Otherwise a new FileMgr
 * (constructed with epoch_) is owned by ownedFileMgrs_ and registered in
 * allFileMgrs_.
 */
AbstractBufferMgr* GlobalFileMgr::getFileMgr(const int32_t db_id, const int32_t tb_id) {
  {  // check if FileMgr already exists for (db_id, tb_id)
    mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
    AbstractBufferMgr* fm = findFileMgrUnlocked(db_id, tb_id);
    if (fm) {
      return fm;
    }
  }

  {  // create new FileMgr for (db_id, tb_id)
    mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
    AbstractBufferMgr* fm = findFileMgrUnlocked(db_id, tb_id);
    if (fm) {
      return fm;  // mgr was added between the read lock and the write lock
    }
    const auto file_mgr_key = std::make_pair(db_id, tb_id);
    // NOTE(review): presumably asks the foreign-storage layer for a manager serving
    // this table (nullptr for regular tables) -- confirm.
    const auto foreign_buffer_manager =
    if (foreign_buffer_manager) {
      // Non-owning registration: only the raw pointer is stored.
      CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, foreign_buffer_manager))
                .second);
      return foreign_buffer_manager;
    } else {
      // Owned FileMgr: ownedFileMgrs_ keeps it alive, allFileMgrs_ indexes it.
      auto s = std::make_shared<FileMgr>(0,
                                         this,
                                         file_mgr_key,
                                         -1 /*DEFAULT_MAX_ROLLBACK_EPOCHS*/,
                                         epoch_,
      CHECK(ownedFileMgrs_.insert(std::make_pair(file_mgr_key, s)).second);
      CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, s.get())).second);
      return s.get();
    }
  }
}
208 
209 // For testing purposes only
210 std::shared_ptr<FileMgr> GlobalFileMgr::getSharedFileMgr(const int db_id,
211  const int table_id) {
212  const auto table_key = std::make_pair(db_id, table_id);
213  if (ownedFileMgrs_.find(table_key) == ownedFileMgrs_.end()) {
214  return nullptr;
215  }
216  return ownedFileMgrs_[table_key];
217 }
218 
219 // For testing purposes only
220 void GlobalFileMgr::setFileMgr(const int db_id,
221  const int table_id,
222  std::shared_ptr<FileMgr> file_mgr) {
223  allFileMgrs_[{db_id, table_id}] = file_mgr.get();
224  ownedFileMgrs_[{db_id, table_id}] = file_mgr;
225 }
226 
    FileMgr* fileMgr) {  // this function is not used, keep it for now for future needs
  // Calls write() on every chunk buffer of the selected FileMgr(s): all registered
  // managers when `fileMgr` is null, otherwise only the one equal to `fileMgr`.
  //
  // NOTE(review): the source pointer handed to write() is the buffer object pointer
  // itself cast to int8_t*, not the buffer's contents. This appears to write the
  // object's in-memory bytes rather than chunk data; confirm before reviving this.
  // NOTE(review): CHECK(fm) aborts if any registered manager is not a FileMgr
  // (e.g. a foreign buffer manager registered by getFileMgr()).
  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
  for (auto fileMgrIt = allFileMgrs_.begin(); fileMgrIt != allFileMgrs_.end();
       fileMgrIt++) {
    FileMgr* fm = dynamic_cast<FileMgr*>(fileMgrIt->second);
    CHECK(fm);
    // Skip managers other than the requested one (when one was requested).
    if ((fileMgr != 0) && (fileMgr != fm)) {
      continue;
    }
    for (auto chunkIt = fm->chunkIndex_.begin(); chunkIt != fm->chunkIndex_.end();
         chunkIt++) {
      chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0);
    }
  }
}
243 
244 void GlobalFileMgr::removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) {
245  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
246  auto fm = dynamic_cast<File_Namespace::FileMgr*>(findFileMgrUnlocked(db_id, tb_id));
247  if (fm) {
248  fm->closeRemovePhysical();
249  } else {
250  // fileMgr has not been initialized so there is no need to
251  // spend the time initializing
252  // initialize just enough to have to rename
253  const auto file_mgr_key = std::make_pair(db_id, tb_id);
254  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, defaultPageSize_, true);
255  u->closeRemovePhysical();
256  }
257  // remove table related in-memory DS only if directory was removed successfully
258 
259  deleteFileMgr(db_id, tb_id);
260 }
261 
262 void GlobalFileMgr::setTableEpoch(const int32_t db_id,
263  const int32_t tb_id,
264  const int32_t start_epoch) {
265  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
266  if (opened_fm) {
267  // Delete this FileMgr to ensure epoch change occurs in constructor with other
268  // reads/writes locked out
269  deleteFileMgr(db_id, tb_id);
270  }
271  const auto file_mgr_key = std::make_pair(db_id, tb_id);
272  // this is where the real rollback of any data ahead of the currently set epoch is
273  // performed
274  // Will call set_epoch with start_epoch internally
275  auto u = std::make_unique<FileMgr>(
276  0, this, file_mgr_key, -1, num_reader_threads_, start_epoch, defaultPageSize_);
277  // remove the dummy one we built
278  u.reset();
279 }
280 
281 size_t GlobalFileMgr::getTableEpoch(const int32_t db_id, const int32_t tb_id) {
282  // UX change was made to this function Oct 2020 to return checkpointed epoch. In turn,
283  // setTableEpoch was changed to set the epoch at the user's input, instead of input - 1
284  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
285  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
286  if (opened_fm) {
287  return dynamic_cast<FileMgr*>(opened_fm)->lastCheckpointedEpoch();
288  }
289  // Do not do full init of table just to get table epoch, just check file instead
290  const auto file_mgr_key = std::make_pair(db_id, tb_id);
291  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, defaultPageSize_, true);
292  const auto epoch = u->lastCheckpointedEpoch();
293  u.reset();
294  return epoch;
295 }
296 
297 StorageStats GlobalFileMgr::getStorageStats(const int32_t db_id, const int32_t tb_id) {
298  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
299  AbstractBufferMgr* opened_fm = findFileMgr(db_id, tb_id);
300  if (opened_fm) {
301  return dynamic_cast<FileMgr*>(opened_fm)->getStorageStats();
302  }
303  // Do not do full init of table just to get storage stats, just check file instead
304  const auto file_mgr_key = std::make_pair(db_id, tb_id);
305  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, defaultPageSize_, true);
306  const auto basic_storage_stats = u->getStorageStats();
307  u.reset();
308  return basic_storage_stats;
309 }
310 
311 } // namespace File_Namespace
void writeFileMgrData(FileMgr *fileMgr=0)
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
std::vector< int > ChunkKey
Definition: types.h:37
void deleteFileMgr(const int32_t db_id, const int32_t tb_id)
std::map< std::pair< int32_t, int32_t >, AbstractBufferMgr * > allFileMgrs_
int32_t epoch_
number of threads used when loading data
static Data_Namespace::AbstractBufferMgr * lookupBufferManager(const int db_id, const int table_id)
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
#define LOG(tag)
Definition: Logger.h:188
size_t getNumChunks() override
std::map< std::pair< int32_t, int32_t >, std::shared_ptr< FileMgr > > ownedFileMgrs_
void setTableEpoch(const int32_t db_id, const int32_t tb_id, const int32_t start_epoch)
StorageStats getStorageStats(const int32_t db_id, const int32_t tb_id)
AbstractBufferMgr * findFileMgrUnlocked(const int32_t db_id, const int32_t tb_id)
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:297
std::shared_ptr< FileMgr > getSharedFileMgr(const int db_id, const int table_id)
int32_t omnisci_db_version_
default page size, used to set FileMgr defaultPageSize_
bool existsDiffBetweenFileMgrParamsAndFileMgr(FileMgr *file_mgr, const FileMgrParams &file_mgr_params) const
AbstractBufferMgr * getFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t num_reader_threads_
The OS file system path containing the files.
void setFileMgr(const int db_id, const int table_id, std::shared_ptr< FileMgr > file_mgr)
void closeFileMgr(const int32_t db_id, const int32_t tb_id)
void setFileMgrParams(const int32_t db_id, const int32_t tb_id, const FileMgrParams &file_mgr_params)
#define CHECK(condition)
Definition: Logger.h:197
AbstractBufferMgr * findFileMgr(const int32_t db_id, const int32_t tb_id)
mapd_shared_mutex fileMgrs_mutex_
mapd_unique_lock< mapd_shared_mutex > write_lock
int32_t maxRollbackEpochs()
Returns value max_rollback_epochs.
Definition: FileMgr.h:272
void removeTableRelatedDS(const int32_t db_id, const int32_t tb_id) override
A selection of helper methods for File I/O.
int32_t lastCheckpointedEpoch()
Returns value of epoch at last checkpoint.
Definition: FileMgr.h:265
size_t getTableEpoch(const int32_t db_id, const int32_t tb_id)