OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
GlobalFileMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "GlobalFileMgr.h"
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <string>
#include "../../Shared/File.h"

#include <fcntl.h>
#include <unistd.h>
#include <algorithm>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
35 
36 using namespace std;
37 
38 namespace File_Namespace {
39 
40 GlobalFileMgr::GlobalFileMgr(const int deviceId,
41  std::string basePath,
42  const size_t num_reader_threads,
43  const size_t defaultPageSize)
44  : AbstractBufferMgr(deviceId)
45  , basePath_(basePath)
46  , num_reader_threads_(num_reader_threads)
47  , epoch_(-1)
48  , // set the default epoch for all tables corresponding to the time of
49  // last checkpoint
50  defaultPageSize_(defaultPageSize) {
52  1; // DS changes triggered by individual FileMgr per table project (release 2.1.0)
53  dbConvert_ = false;
54  init();
55 }
56 
// Destructor body: takes the fileMgrs_ lock (the same mapd_lock_guard type getFileMgr
// uses as its write lock) and deletes every per-table FileMgr held in fileMgrs_.
// The map stores raw FileMgr* pointers that this object owns.
58  mapd_lock_guard<mapd_shared_mutex> fileMgrsMutex(fileMgrs_mutex_);
59  for (auto fileMgrsIt = fileMgrs_.begin(); fileMgrsIt != fileMgrs_.end(); ++fileMgrsIt) {
60  delete fileMgrsIt->second;
61  }
62 }
63 
// init() body: makes basePath_ end with '/' and ensures the data directory exists,
// creating it when absent. Problems are reported through LOG(FATAL).
65  // check if basePath_ already exists, and if not create one
66  boost::filesystem::path path(basePath_);
// `path` is built before the trailing '/' is appended below; only basePath_ gets the slash.
67  if (basePath_.size() > 0 && basePath_[basePath_.size() - 1] != '/') {
68  basePath_.push_back('/');
69  }
70  if (boost::filesystem::exists(path)) {
71  if (!boost::filesystem::is_directory(path)) {
72  LOG(FATAL) << "Specified path is not a directory.";
73  }
74  } else { // data directory does not exist
// NOTE(review): per the Boost.Filesystem docs, this create_directory overload throws
// filesystem_error on OS errors (e.g. missing parent directory). It returns false when
// the directory already exists, so the FATAL below would fire if another process
// created the directory after the exists() check. Confirm this is the intended handling.
75  if (!boost::filesystem::create_directory(path)) {
76  LOG(FATAL) << "Could not create data directory";
77  }
78  }
79 }
80 
// checkpoint() body: calls checkpoint() on every FileMgr currently in fileMgrs_, one
// after another, while holding the fileMgrs_ lock via mapd_lock_guard. That is the same
// guard type getFileMgr uses as its write lock, so the map cannot change during the pass.
82  mapd_lock_guard<mapd_shared_mutex> fileMgrsMutex(fileMgrs_mutex_);
83  for (auto fileMgrsIt = fileMgrs_.begin(); fileMgrsIt != fileMgrs_.end(); ++fileMgrsIt) {
84  fileMgrsIt->second->checkpoint();
85  }
86 }
87 
88 void GlobalFileMgr::checkpoint(const int db_id, const int tb_id) {
89  getFileMgr(db_id, tb_id)->checkpoint();
90 }
91 
// getNumChunks() body: returns the sum of getNumChunks() over every per-table FileMgr,
// read under a shared (mapd_shared_lock) lock on fileMgrs_.
93  {
94  mapd_shared_lock<mapd_shared_mutex> fileMgrsMutex(fileMgrs_mutex_);
95  size_t num_chunks = 0;
96  for (auto fileMgrsIt = fileMgrs_.begin(); fileMgrsIt != fileMgrs_.end();
97  ++fileMgrsIt) {
98  num_chunks += fileMgrsIt->second->getNumChunks();
99  }
100 
101  return num_chunks;
102  }
103 }
104 
105 void GlobalFileMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
106  /* keyPrefix[0] can be -1 only for gpu or cpu buffers but not for FileMgr.
107  * There is no assert here, as GlobalFileMgr is being called with -1 value as well in
108  * the same loop with other buffers. So the case of -1 will just be ignored, as nothing
109  * needs to be done.
110  */
111  if (keyPrefix[0] != -1) {
112  return getFileMgr(keyPrefix)->deleteBuffersWithPrefix(keyPrefix, purge);
113  }
114 }
115 
// getChunkMetadataVec() body: appends the chunk metadata of every per-table FileMgr to
// chunkMetadataVec, under a shared lock on fileMgrs_. Existing entries in
// chunkMetadataVec are kept; only push_back is used.
117  std::vector<std::pair<ChunkKey, ChunkMetadata>>& chunkMetadataVec) {
118  mapd_shared_lock<mapd_shared_mutex> fileMgrsMutex(fileMgrs_mutex_);
// Scratch vector reused across FileMgrs; the while loop below drains it each time.
119  std::vector<std::pair<ChunkKey, ChunkMetadata>> chunkMetadataVecForFileMgr;
120  for (auto fileMgrsIt = fileMgrs_.begin(); fileMgrsIt != fileMgrs_.end(); ++fileMgrsIt) {
121  fileMgrsIt->second->getChunkMetadataVec(chunkMetadataVecForFileMgr);
// Copy entries from the back and pop them. This reverses each FileMgr's order and
// leaves the scratch vector empty for the next iteration.
122  while (!chunkMetadataVecForFileMgr.empty()) {
123  // norair - order of elements is reversed, consider optimising this later if needed
124  chunkMetadataVec.push_back(chunkMetadataVecForFileMgr.back());
125  chunkMetadataVecForFileMgr.pop_back();
126  }
127  }
128 }
129 
// findFileMgr(): returns the FileMgr registered for (db_id, tb_id), or nullptr if none is
// registered. When removeFromMap is true, the entry is also erased from fileMgrs_. The
// callers in this file (removeTableRelatedDS, setTableEpoch) then delete the returned
// pointer themselves.
131  const int tb_id,
132  const bool removeFromMap) {
133  FileMgr* fm = nullptr;
134  const auto file_mgr_key = std::make_pair(db_id, tb_id);
135  {
// NOTE(review): despite its name, read_lock is a mapd_lock_guard, the same guard type
// getFileMgr uses as its write lock. It is needed when removeFromMap erases. A pure
// lookup could presumably use mapd_shared_lock instead; confirm before changing.
136  mapd_lock_guard<mapd_shared_mutex> read_lock(fileMgrs_mutex_);
137  auto it = fileMgrs_.find(file_mgr_key);
138  if (it != fileMgrs_.end()) {
139  fm = it->second;
140  if (removeFromMap) {
141  fileMgrs_.erase(it);
142  }
143  }
144  }
145  return fm;
146 }
147 
148 FileMgr* GlobalFileMgr::getFileMgr(const int db_id, const int tb_id) {
149  { /* check if FileMgr already exists for (db_id, tb_id) */
150  FileMgr* fm = findFileMgr(db_id, tb_id);
151  if (fm != nullptr) {
152  return fm;
153  }
154  }
155 
156  { /* create new FileMgr for (db_id, tb_id) */
157  const auto file_mgr_key = std::make_pair(db_id, tb_id);
158  mapd_lock_guard<mapd_shared_mutex> write_lock(fileMgrs_mutex_);
159  auto it = fileMgrs_.find(file_mgr_key);
160  if (it != fileMgrs_.end()) {
161  return it->second;
162  }
163  FileMgr* fm =
164  new FileMgr(0, this, file_mgr_key, num_reader_threads_, epoch_, defaultPageSize_);
165  auto it_ok = fileMgrs_.insert(std::make_pair(file_mgr_key, fm));
166  CHECK(it_ok.second);
167 
168  return fm;
169  }
170 }
171 
// writeFileMgrData(): for each FileMgr in fileMgrs_ (or only the given `fileMgr` when it
// is non-null), calls write() on every buffer in that FileMgr's chunkIndex_. The comment
// on the signature line says the function is currently unused.
// NOTE(review): it iterates fileMgrs_ without taking fileMgrs_mutex_, unlike the other
// traversals in this file.
173  FileMgr* fileMgr) { // this function is not used, keep it for now for future needs
174  for (auto fileMgrIt = fileMgrs_.begin(); fileMgrIt != fileMgrs_.end(); fileMgrIt++) {
175  FileMgr* fm = fileMgrIt->second;
176  if ((fileMgr != 0) && (fileMgr != fm)) {
177  continue;
178  }
179  for (auto chunkIt = fm->chunkIndex_.begin(); chunkIt != fm->chunkIndex_.end();
180  chunkIt++) {
// NOTE(review): the source passed to write() is the buffer object pointer itself,
// cast to int8_t*, not the buffer's data. This looks incorrect; revisit before using.
181  chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0);
182  // chunkIt->second->write((int8_t*)chunkIt->second, chunkIt->second->size(), 0,
183  // CPU_LEVEL, -1);
184  }
185  }
186 }
187 
188 void GlobalFileMgr::removeTableRelatedDS(const int db_id, const int tb_id) {
189  FileMgr* fm = findFileMgr(db_id, tb_id, true);
190  if (fm == nullptr) {
191  // fileMgr has not been initialized so there is no need to
192  // spend the time initializing
193  // inmitialize just enough to have to rename
194  const auto file_mgr_key = std::make_pair(db_id, tb_id);
195  fm = new FileMgr(0, this, file_mgr_key, true);
196  }
197  fm->closeRemovePhysical();
198  /* remove table related in-memory DS only if directory was removed successfully */
199 
200  delete fm;
201 }
202 
203 void GlobalFileMgr::setTableEpoch(const int db_id,
204  const int tb_id,
205  const int start_epoch) {
206  const auto file_mgr_key = std::make_pair(db_id, tb_id);
207  // this is where the real rollback of any data ahead of the currently set epoch is
208  // performed
209  FileMgr* fm = new FileMgr(
210  0, this, file_mgr_key, num_reader_threads_, start_epoch, defaultPageSize_);
211  fm->setEpoch(start_epoch - 1);
212  // remove the dummy one we built
213  delete fm;
214 
215  // see if one exists currently, and remove it
216  fm = findFileMgr(db_id, tb_id, true);
217 
218  if (fm != nullptr) {
219  LOG(INFO) << "found and removed fm";
220  delete fm;
221  }
222 }
223 
224 size_t GlobalFileMgr::getTableEpoch(const int db_id, const int tb_id) {
225  FileMgr* fm = getFileMgr(db_id, tb_id);
226 
227  return fm->epoch_;
228 }
229 
230 } // namespace File_Namespace
void writeFileMgrData(FileMgr *fileMgr=0)
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
std::vector< int > ChunkKey
Definition: types.h:35
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
int epoch_
number of threads used when loading data
#define LOG(tag)
Definition: Logger.h:185
FileMgr * getFileMgr(const int db_id, const int tb_id)
void getChunkMetadataVec(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec) override
size_t getTableEpoch(const int db_id, const int tb_id)
size_t getNumChunks() override
void checkpoint() override
Fsyncs data files, writes out epoch and fsyncs that.
Definition: FileMgr.cpp:577
FileMgr * findFileMgr(const int db_id, const int tb_id, const bool removeFromMap=false)
void setTableEpoch(const int db_id, const int tb_id, const int start_epoch)
CHECK(cgen_state)
int mapd_db_version_
default page size, used to set FileMgr defaultPageSize_
int epoch_
the index of the next file id
Definition: FileMgr.h:246
ChunkKeyToChunkMap chunkIndex_
Definition: FileMgr.h:225
void removeTableRelatedDS(const int db_id, const int tb_id)
~GlobalFileMgr() override
Destructor.
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: FileMgr.cpp:653
std::map< std::pair< int, int >, FileMgr * > fileMgrs_
size_t num_reader_threads_
The OS file system path containing the files.
mapd_shared_mutex fileMgrs_mutex_
void setEpoch(int epoch)
Definition: FileMgr.cpp:992