OmniSciDB  2b310ab3b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
GlobalFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <algorithm>
28 #include <boost/filesystem.hpp>
29 #include <boost/lexical_cast.hpp>
30 #include <string>
31 #include <thread>
32 #include <utility>
33 #include <vector>
34 
36 #include "Shared/File.h"
37 
38 using namespace std;
39 
40 namespace File_Namespace {
41 
// Constructs the global file manager.
//   deviceId            forwarded to the AbstractBufferMgr base.
//   basePath            stored in basePath_ (init() later appends a trailing '/').
//   num_reader_threads  stored in num_reader_threads_.
//   defaultPageSize     stored in defaultPageSize_.
// epoch_ starts at -1, dbConvert_ is cleared, and init() is called last.
// NOTE(review): listing line 53 is absent from this view, so the statement ending in
// "1;" below has lost its left-hand side — presumably an assignment to
// mapd_db_version_; confirm against the original file.
42 GlobalFileMgr::GlobalFileMgr(const int deviceId,
43  std::string basePath,
44  const size_t num_reader_threads,
45  const size_t defaultPageSize)
46  : AbstractBufferMgr(deviceId)
47  , basePath_(basePath)
48  , num_reader_threads_(num_reader_threads)
49  , epoch_(-1)
50  , // set the default epoch for all tables corresponding to the time of
51  // last checkpoint
52  defaultPageSize_(defaultPageSize) {
54  1; // DS changes triggered by individual FileMgr per table project (release 2.1.0)
55  dbConvert_ = false;
56  init();
57 }
58 
// NOTE(review): the signature line of this definition (listing line 59) is missing from
// this view; the body is presumably the init() that the constructor calls — confirm.
// Normalises basePath_ to end with '/' and makes sure the data directory exists.
// An existing path that is not a directory, or a failed create_directory, is logged
// with LOG(FATAL).
60  // check if basePath_ already exists, and if not create one
// `path` is built from basePath_ before the trailing '/' is appended below.
61  boost::filesystem::path path(basePath_);
62  if (basePath_.size() > 0 && basePath_[basePath_.size() - 1] != '/') {
63  basePath_.push_back('/');
64  }
65  if (boost::filesystem::exists(path)) {
66  if (!boost::filesystem::is_directory(path)) {
67  LOG(FATAL) << "Specified path is not a directory.";
68  }
69  } else { // data directory does not exist
// create_directory creates only the final path component.
70  if (!boost::filesystem::create_directory(path)) {
71  LOG(FATAL) << "Could not create data directory";
72  }
73  }
74 }
75 
// NOTE(review): the signature line of this definition (listing line 76) is missing from
// this view; presumably the no-argument checkpoint() override — confirm.
// Holds the exclusive lock on fileMgrs_mutex_ and calls checkpoint() on every manager
// registered in allFileMgrs_.
77  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
78  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
79  ++fileMgrsIt) {
80  fileMgrsIt->second->checkpoint();
81  }
82 }
83 
84 void GlobalFileMgr::checkpoint(const int db_id, const int tb_id) {
85  getFileMgr(db_id, tb_id)->checkpoint();
86 }
87 
// NOTE(review): the signature line of this definition (listing line 88) is missing from
// this view; presumably getNumChunks() — confirm.
// Under a shared lock on fileMgrs_mutex_, sums getNumChunks() over every manager in
// allFileMgrs_ and returns the total.
89  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
90  size_t num_chunks = 0;
91  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
92  ++fileMgrsIt) {
93  num_chunks += fileMgrsIt->second->getNumChunks();
94  }
95 
96  return num_chunks;
97 }
98 
99 void GlobalFileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
100  /* keyPrefix[0] can be -1 only for gpu or cpu buffers but not for FileMgr.
101  * There is no assert here, as GlobalFileMgr is being called with -1 value as well in
102  * the same loop with other buffers. So the case of -1 will just be ignored, as nothing
103  * needs to be done.
104  */
105  if (keyPrefix[0] != -1) {
106  return getFileMgr(keyPrefix)->deleteBuffersWithPrefix(keyPrefix, purge);
107  }
108 }
109 
// NOTE(review): the signature line of this definition (listing line 110) is missing
// from this view; presumably getChunkMetadataVec(ChunkMetadataVector& chunkMetadataVec)
// — confirm.
// Under a shared lock on fileMgrs_mutex_, asks every manager in allFileMgrs_ for its
// chunk metadata and appends the results to chunkMetadataVec.
111  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
// Scratch vector handed to each manager; it is drained back to empty after every call.
112  ChunkMetadataVector chunkMetadataVecForFileMgr;
113  for (auto fileMgrsIt = allFileMgrs_.begin(); fileMgrsIt != allFileMgrs_.end();
114  ++fileMgrsIt) {
115  fileMgrsIt->second->getChunkMetadataVec(chunkMetadataVecForFileMgr);
// Entries are moved over back-to-front, so each manager's entries land in reverse order.
116  while (!chunkMetadataVecForFileMgr.empty()) {
117  // norair - order of elements is reversed, consider optimising this later if needed
118  chunkMetadataVec.push_back(chunkMetadataVecForFileMgr.back());
119  chunkMetadataVecForFileMgr.pop_back();
120  }
121  }
122 }
123 
124 AbstractBufferMgr* GlobalFileMgr::findFileMgr(const int db_id, const int tb_id) {
125  // NOTE: only call this private function after locking is already in place
126  AbstractBufferMgr* fm = nullptr;
127  const auto file_mgr_key = std::make_pair(db_id, tb_id);
128  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
129  fm = it->second;
130  }
131  return fm;
132 }
133 
134 void GlobalFileMgr::deleteFileMgr(const int db_id, const int tb_id) {
135  // NOTE: only call this private function after locking is already in place
136  const auto file_mgr_key = std::make_pair(db_id, tb_id);
137  if (auto it = ownedFileMgrs_.find(file_mgr_key); it != ownedFileMgrs_.end()) {
138  ownedFileMgrs_.erase(it);
139  }
140  if (auto it = allFileMgrs_.find(file_mgr_key); it != allFileMgrs_.end()) {
141  allFileMgrs_.erase(it);
142  }
143 }
144 
// Returns the buffer manager registered for (db_id, tb_id), creating and registering
// one when none exists yet.
// The lookup is first done under a shared lock; on a miss the exclusive lock is taken
// and the lookup is repeated before anything is inserted.
145 AbstractBufferMgr* GlobalFileMgr::getFileMgr(const int db_id, const int tb_id) {
146  { // check if FileMgr already exists for (db_id, tb_id)
147  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
148  AbstractBufferMgr* fm = findFileMgr(db_id, tb_id);
149  if (fm) {
150  return fm;
151  }
152  }
153 
154  { // create new FileMgr for (db_id, tb_id)
155  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
156  AbstractBufferMgr* fm = findFileMgr(db_id, tb_id);
157  if (fm) {
158  return fm; // mgr was added between the read lock and the write lock
159  }
160  const auto file_mgr_key = std::make_pair(db_id, tb_id);
161  const auto foreign_buffer_manager =
// NOTE(review): listing line 162 (the initializer expression) is missing from this
// view — presumably a lookupBufferManager(db_id, tb_id) call; confirm against the
// original file.
163  if (foreign_buffer_manager) {
// A foreign manager is registered in allFileMgrs_ only; it is not added to
// ownedFileMgrs_.
164  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, foreign_buffer_manager))
165  .second);
166  return foreign_buffer_manager;
167  } else {
// Otherwise a FileMgr is created here: the shared_ptr goes into ownedFileMgrs_ and the
// raw pointer into allFileMgrs_. Both inserts are CHECKed to have added a new entry.
168  auto s = std::make_shared<FileMgr>(
169  0, this, file_mgr_key, num_reader_threads_, epoch_, defaultPageSize_);
170  CHECK(ownedFileMgrs_.insert(std::make_pair(file_mgr_key, s)).second);
171  CHECK(allFileMgrs_.insert(std::make_pair(file_mgr_key, s.get())).second);
172  return s.get();
173  }
174  }
175 }
176 
// NOTE(review): the first signature line of this definition (listing line 177) is
// missing from this view; presumably writeFileMgrData — confirm.
// Under a shared lock on fileMgrs_mutex_, walks allFileMgrs_ and, for each manager that
// matches fileMgr (or for all of them when fileMgr is 0), calls write() on every entry
// of that manager's chunkIndex_.
178  FileMgr* fileMgr) { // this function is not used, keep it for now for future needs
179  mapd_shared_lock<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
180  for (auto fileMgrIt = allFileMgrs_.begin(); fileMgrIt != allFileMgrs_.end();
181  fileMgrIt++) {
// Every registered manager is required to be a FileMgr here; CHECK fires otherwise.
182  FileMgr* fm = dynamic_cast<FileMgr*>(fileMgrIt->second);
183  CHECK(fm);
184  if ((fileMgr != 0) && (fileMgr != fm)) {
185  continue;
186  }
187  for (auto chunkIt = fm->chunkIndex_.begin(); chunkIt != fm->chunkIndex_.end();
188  chunkIt++) {
// NOTE(review): the source pointer handed to write() is the chunkIndex_ value itself
// cast to int8_t*, i.e. the same object write() is invoked on — confirm this is the
// intended source.
189  chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0);
190  // chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0,
191  // CPU_LEVEL, -1);
192  }
193  }
194 }
195 
196 void GlobalFileMgr::removeTableRelatedDS(const int db_id, const int tb_id) {
197  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
198  auto fm = dynamic_cast<File_Namespace::FileMgr*>(findFileMgr(db_id, tb_id));
199  if (fm) {
200  fm->closeRemovePhysical();
201  } else {
202  // fileMgr has not been initialized so there is no need to
203  // spend the time initializing
204  // initialize just enough to have to rename
205  const auto file_mgr_key = std::make_pair(db_id, tb_id);
206  auto u = std::make_unique<FileMgr>(0, this, file_mgr_key, true);
207  u->closeRemovePhysical();
208  }
209  // remove table related in-memory DS only if directory was removed successfully
210 
211  deleteFileMgr(db_id, tb_id);
212 }
213 
214 void GlobalFileMgr::setTableEpoch(const int db_id,
215  const int tb_id,
216  const int start_epoch) {
217  mapd_unique_lock<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
218  const auto file_mgr_key = std::make_pair(db_id, tb_id);
219  // this is where the real rollback of any data ahead of the currently set epoch is
220  // performed
221  auto u = std::make_unique<FileMgr>(
222  0, this, file_mgr_key, num_reader_threads_, start_epoch, defaultPageSize_);
223  u->setEpoch(start_epoch - 1);
224  // remove the dummy one we built
225  u.reset();
226 
227  // see if one exists currently, and remove it
228  deleteFileMgr(db_id, tb_id);
229 }
230 
231 size_t GlobalFileMgr::getTableEpoch(const int db_id, const int tb_id) {
232  auto fm = dynamic_cast<FileMgr*>(getFileMgr(db_id, tb_id));
233  CHECK(fm);
234  return fm->epoch_;
235 }
236 
237 } // namespace File_Namespace
void writeFileMgrData(FileMgr *fileMgr=0)
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
std::vector< int > ChunkKey
Definition: types.h:37
static Data_Namespace::AbstractBufferMgr * lookupBufferManager(const int db_id, const int table_id)
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
int epoch_
number of threads used when loading data
#define LOG(tag)
Definition: Logger.h:188
size_t getTableEpoch(const int db_id, const int tb_id)
size_t getNumChunks() override
AbstractBufferMgr * findFileMgr(const int db_id, const int tb_id)
void setTableEpoch(const int db_id, const int tb_id, const int start_epoch)
std::map< std::pair< int, int >, std::shared_ptr< FileMgr > > ownedFileMgrs_
CHECK(cgen_state)
int mapd_db_version_
default page size, used to set FileMgr defaultPageSize_
std::map< std::pair< int, int >, AbstractBufferMgr * > allFileMgrs_
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:223
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
AbstractBufferMgr * getFileMgr(const int db_id, const int tb_id)
void removeTableRelatedDS(const int db_id, const int tb_id) override
void getChunkMetadataVec(ChunkMetadataVector &chunkMetadataVec) override
mapd_shared_lock< mapd_shared_mutex > read_lock
size_t num_reader_threads_
The OS file system path containing the files.
mapd_shared_mutex fileMgrs_mutex_
void deleteFileMgr(const int db_id, const int tb_id)
mapd_unique_lock< mapd_shared_mutex > write_lock
A selection of helper methods for File I/O.