OmniSciDB  a575cb28ea
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageInterface.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "Shared/StringTransform.h"
19 
21  const ChunkKey& chunk_key,
22  PersistentForeignStorageInterface* persistent_foreign_storage)
23  : Data_Namespace::AbstractBuffer(0)
24  , chunk_key_(chunk_key)
25  , persistent_foreign_storage_(persistent_foreign_storage) {}
26 
// Read entry point of ForeignStorageBuffer; `dst` is the destination pointer and
// `numBytes` the requested byte count.
// Only whole-buffer CPU-side reads are accepted: `offset` must be 0 and
// `dstDeviceId` must be -1 (both enforced with CHECK_EQ below).
// `dstBufferType` is not referenced in the visible part of the body.
// NOTE(review): listing line 34 (between the checks and the closing brace) is
// missing from this extract; presumably it forwards the read to
// persistent_foreign_storage_ -- confirm against the real file.
27 void ForeignStorageBuffer::read(int8_t* const dst,
28  const size_t numBytes,
29  const size_t offset,
30  const Data_Namespace::MemoryLevel dstBufferType,
31  const int dstDeviceId) {
32  CHECK_EQ(size_t(0), offset);  // partial reads from a non-zero offset are rejected
33  CHECK_EQ(-1, dstDeviceId);  // -1 is the only accepted device id
35 }
36 
37 int8_t* ForeignStorageBuffer::tryZeroCopy(const size_t numBytes) {
39 }
40 
42  const size_t numBytes,
43  const Data_Namespace::MemoryLevel srcBufferType,
44  const int deviceId) {
    // Appends `numBytes` bytes starting at `src` to the in-memory vector buff_ and
    // grows size_ by the same amount. `srcBufferType` and `deviceId` are not used.
    // NOTE(review): the first line of this signature (carrying `src`) is missing
    // from this listing.
45  setAppended();
46  buff_.insert(buff_.end(), src, src + numBytes);  // copy [src, src + numBytes) to the tail
47  size_ += numBytes;  // keep the tracked size in step with the bytes just added
48 }
49 
51  const int db_id,
52  const int table_id,
53  PersistentForeignStorageInterface* persistent_foreign_storage)
54  : AbstractBufferMgr(0), persistent_foreign_storage_(persistent_foreign_storage) {}
55 
    // NOTE(review): the signature line of this function is missing from this listing.
    // Builds one ForeignStorageColumnBuffer per entry of chunk_index_ (chunk key,
    // SQL type, and the buffer obtained from moveBuffer()) and hands the whole
    // batch to persistent_foreign_storage_->append().
    // NOTE(review): chunk_index_ is iterated here without taking
    // chunk_index_mutex_ in the visible body -- confirm callers serialize access.
57  // TODO(alex)
58  std::vector<ForeignStorageColumnBuffer> column_buffers;
59  for (auto& kv : chunk_index_) {
    // NOTE(review): `buffer` is a const local, so it cannot be moved into the
    // aggregate below; if moveBuffer() returns by value this costs a copy -- confirm.
60  const auto buffer = kv.second->moveBuffer();
61  column_buffers.emplace_back(
62  ForeignStorageColumnBuffer{kv.first, kv.second->getSqlType(), buffer});
63  }
64  persistent_foreign_storage_->append(column_buffers);
65 }
66 
68  const ChunkKey& key,
69  const size_t pageSize,
70  const size_t initialSize) {
    // Registers a ForeignStorageBuffer for `key` in chunk_index_ and returns a
    // non-owning pointer to the buffer stored under that key.
    // `pageSize` and `initialSize` are not used.
    // NOTE(review): the first line of this signature is missing from this listing.
71  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunk_index_mutex_);
    // emplace() does not overwrite: when `key` is already present the existing
    // entry is kept and it_ok.second is false.
72  const auto it_ok = chunk_index_.emplace(
73  key, std::make_unique<ForeignStorageBuffer>(key, persistent_foreign_storage_));
74  // this check fails if we create table, drop it and create again
75  // CHECK(it_ok.second);
    // With the check disabled, a repeated key returns the previously stored buffer.
76  return it_ok.first->second.get();
77 }
78 
80  const ChunkKey& key,
81  const size_t numBytes) {
    // Looks up the buffer stored for `key` in chunk_index_ and returns a non-owning
    // pointer to it. The key must already exist (enforced with CHECK).
    // `numBytes` is not used.
    // NOTE(review): the first line of this signature is missing from this listing.
    // Despite its name, chunk_index_write_lock is a shared (reader) lock here.
82  mapd_shared_lock<mapd_shared_mutex> chunk_index_write_lock(chunk_index_mutex_);
83  const auto it = chunk_index_.find(key);
84  CHECK(it != chunk_index_.end());  // an unknown key is treated as a fatal error
85  return it->second.get();
86 }
87 
90  const size_t numBytes) {
    // Fills `destBuffer` with `numBytes` bytes of the chunk identified by `key`,
    // then sets its size and syncs its encoder from the source buffer.
    // NOTE(review): the signature lines carrying `key` and `destBuffer` are missing
    // from this listing.
    // NOTE(review): the declaration defaults numBytes to 0, but 0 is rejected here.
91  CHECK(numBytes);
92  auto file_buffer = dynamic_cast<ForeignStorageBuffer*>(getBuffer(key, numBytes));
93  CHECK(file_buffer);
94 
95  // TODO: check if GPU is used
    // Prefer zero-copy: a non-null pointer from tryZeroCopy() is installed directly
    // as destBuffer's memory pointer; otherwise space is reserved and the data is
    // read into destBuffer's own memory.
96  auto buf = file_buffer->tryZeroCopy(numBytes);
97  if (buf) {
98  destBuffer->setMemoryPtr(buf);
99  } else {
100  destBuffer->reserve(numBytes);
101  file_buffer->read(destBuffer->getMemoryPtr(), numBytes);
102  }
103  destBuffer->setSize(numBytes);
104  destBuffer->syncEncoder(file_buffer);
105 }
106 
108  ChunkMetadataVector& chunkMetadataVec,
109  const ChunkKey& keyPrefix) {
    // Appends (chunk key, ChunkMetadata) pairs to `chunkMetadataVec` for entries of
    // chunk_index_ whose key starts with `keyPrefix`. For five-component keys only
    // those whose last component is 1 produce an entry.
    // NOTE(review): the first line of this signature is missing from this listing.
110  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(
111  chunk_index_mutex_); // is this guarding the right structure? it looks like we
112  // only read here for chunk
    // chunk_index_ is an ordered map, so lower_bound() lands on the first key that
    // is not less than keyPrefix; matching keys follow contiguously from there.
113  auto chunk_it = chunk_index_.lower_bound(keyPrefix);
114  if (chunk_it == chunk_index_.end()) {
115  CHECK(false); // throw?
116  }
117 
    // std::search over exactly the first keyPrefix.size() elements returns that
    // range's begin when they equal keyPrefix and its end otherwise, so the loop
    // runs while the current key still starts with keyPrefix.
    // NOTE(review): `begin() + keyPrefix.size()` assumes every visited key has at
    // least keyPrefix.size() elements; an empty keyPrefix makes the loop body never
    // run.
118  while (chunk_it != chunk_index_.end() &&
119  std::search(chunk_it->first.begin(),
120  chunk_it->first.begin() + keyPrefix.size(),
121  keyPrefix.begin(),
122  keyPrefix.end()) != chunk_it->first.begin() + keyPrefix.size()) {
123  const auto& chunk_key = chunk_it->first;
124  if (chunk_key.size() == 5) {
    // Five-component key: metadata is emitted only for the entry whose last
    // component is 1; the element count comes from the sibling entry whose last
    // component is 2. Presumably 1 is the data chunk and 2 the offsets chunk of a
    // variable-length column -- TODO confirm.
125  if (chunk_key[4] == 1) {
126  const auto& buffer = *chunk_it->second;
127  auto type = buffer.getSqlType();
128  auto size = buffer.size();
129  auto subkey = chunk_key;
130  subkey[4] = 2;
    // NOTE(review): the find() result is dereferenced without comparing it to
    // end(); a missing sibling entry is undefined behavior.
131  auto& index_buf = *(chunk_index_.find(subkey)->second);
    // element count = sibling buffer's byte size / its SQL type's element size
132  auto bs = index_buf.size() / index_buf.getSqlType().get_size();
133  auto chunk_metadata =
134  std::make_shared<ChunkMetadata>(type, size, bs, ChunkStats{});
135  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
136  }
137  } else {
    // Any other key length: take the SQL type from the buffer and let the
    // buffer's encoder fill in the rest of the metadata.
138  const auto& buffer = *chunk_it->second;
139  auto chunk_metadata = std::make_shared<ChunkMetadata>();
140  chunk_metadata->sqlType = buffer.getSqlType();
141  buffer.getEncoder()->getMetadata(chunk_metadata);
142  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
143  }
144  chunk_it++;
145  }
146 }
147 
// Returns the buffer manager for (db_id, table_id), creating and caching it in
// managers_map_ on first use. Returns nullptr when the table has no entry in
// table_persistent_storage_interface_map_. The returned pointer is non-owning;
// managers_map_ keeps ownership.
148 Data_Namespace::AbstractBufferMgr* ForeignStorageInterface::lookupBufferManager(
149  const int db_id,
150  const int table_id) {
151  auto key = std::make_pair(db_id, table_id);
    // NOTE(review): this fast path reads managers_map_ before the lock below is
    // taken, while the emplace further down happens under the lock -- confirm that
    // concurrent callers cannot race here.
152  if (managers_map_.count(key)) {
153  return managers_map_[key].get();
154  }
155 
    // NOTE(review): listing line 157 (the mutex argument of this lock_guard) is
    // missing from this extract.
156  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
158  const auto it = table_persistent_storage_interface_map_.find(key);
159  if (it == table_persistent_storage_interface_map_.end()) {
160  return nullptr;  // table was never registered with a persistent storage interface
161  }
162  const auto it_ok = managers_map_.emplace(
163  key, std::make_unique<ForeignStorageBufferMgr>(db_id, table_id, it->second));
164  CHECK(it_ok.second);  // the key was absent above, so the insertion is expected to succeed
165  return it_ok.first->second.get();
166 }
167 
169  std::unique_ptr<PersistentForeignStorageInterface> persistent_foreign_storage) {
    // Takes ownership of `persistent_foreign_storage` and stores it in
    // persistent_storage_interfaces_ under the string returned by its getType().
    // Registering a second interface with the same type trips the CHECK below.
    // NOTE(review): the first line of this signature and listing line 171 (the
    // mutex argument of the lock_guard) are missing from this extract.
170  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
    // std::move() is only a cast; emplace() receives both arguments by reference,
    // so getType() is evaluated before the pointer is actually moved from.
172  const auto it_ok = persistent_storage_interfaces_.emplace(
173  persistent_foreign_storage->getType(), std::move(persistent_foreign_storage));
174  CHECK(it_ok.second);
175 }
176 
177 std::pair<std::string, std::string> parseStorageType(const std::string& type) {
178  size_t sep = type.find_first_of(':'), sep2 = sep != std::string::npos ? sep + 1 : sep;
179  auto res = std::make_pair(to_upper(type.substr(0, sep)), type.substr(sep2));
180  return res;
181 }
182 
184  TableDescriptor& td,
185  std::list<ColumnDescriptor>& cols) {
    // Resolves the persistent storage interface named by td.storageType and
    // forwards to its prepareTable() with the parameter part of the storage type.
    // Throws std::runtime_error when no interface is registered for that type.
    // NOTE(review): the first line of this signature (carrying `db_id`) and listing
    // line 188 (the mutex argument of the unique_lock) are missing from this extract.
186  auto type = parseStorageType(td.storageType);  // first = type name, second = parameters
187  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
189  const auto it = persistent_storage_interfaces_.find(type.first);
190  if (it == persistent_storage_interfaces_.end()) {
191  throw std::runtime_error("storage type " + type.first + " not supported");
192  }
193  auto& p = it->second;
    // The lock is released before the interface is called, so the call below runs
    // without holding it; `p` still refers to the map's element.
194  persistent_storage_interfaces_lock.unlock();
195  p->prepareTable(db_id, type.second, td, cols);
196 }
197 
199  const TableDescriptor& td,
200  const std::list<ColumnDescriptor>& cols) {
    // Associates (current db id, td.tableId) with the persistent storage interface
    // named by td.storageType, then calls that interface's registerTable() with the
    // table's buffer manager.
    // Throws std::runtime_error when no interface is registered for that type.
    // NOTE(review): the first line of this signature (carrying `catalog`) and
    // listing line 205 (the mutex argument of the unique_lock) are missing from
    // this extract.
201  const int table_id = td.tableId;
202  auto type = parseStorageType(td.storageType);  // first = type name, second = parameters
203 
204  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
206  const auto it = persistent_storage_interfaces_.find(type.first);
207  if (it == persistent_storage_interfaces_.end()) {
208  throw std::runtime_error("storage type " + type.first + " not supported");
209  }
210 
211  auto db_id = catalog->getCurrentDB().dbId;
    // emplace() keeps an existing mapping for this (db_id, table_id) untouched; in
    // that case it_ok.first still points at the previously stored interface.
212  const auto it_ok = table_persistent_storage_interface_map_.emplace(
213  std::make_pair(db_id, table_id), it->second.get());
214  // this check fails if we create table, drop it and create again
215  // CHECK(it_ok.second);
    // Release the lock before calling lookupBufferManager(), which takes a
    // lock_guard of its own (presumably on the same non-recursive mutex -- confirm).
216  persistent_storage_interfaces_lock.unlock();
217  it_ok.first->second->registerTable(catalog,
218  it_ok.first->first,
219  type.second,
220  td,
221  cols,
222  lookupBufferManager(db_id, table_id));
223 }
224 
227  managers_map_.clear();
228 }
229 
230 std::unordered_map<std::string, std::unique_ptr<PersistentForeignStorageInterface>>
232 std::map<std::pair<int, int>, PersistentForeignStorageInterface*>
234 std::map<std::pair<int, int>, std::unique_ptr<ForeignStorageBufferMgr>>
PersistentForeignStorageInterface * persistent_foreign_storage_
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
void syncEncoder(const AbstractBuffer *src_buffer)
static Data_Namespace::AbstractBufferMgr * lookupBufferManager(const int db_id, const int table_id)
void append(int8_t *src, const size_t numBytes, const Data_Namespace::MemoryLevel srcBufferType=Data_Namespace::CPU_LEVEL, const int deviceId=-1) override
std::string storageType
virtual int8_t * getMemoryPtr()=0
void fetchBuffer(const ChunkKey &key, Data_Namespace::AbstractBuffer *destBuffer, const size_t numBytes=0) override
static void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
ForeignStorageBufferMgr(const int db_id, const int table_id, PersistentForeignStorageInterface *persistent_foreign_storage)
mapd_shared_mutex chunk_index_mutex_
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers)=0
ForeignStorageBuffer(const ChunkKey &chunk_key, PersistentForeignStorageInterface *persistent_foreign_storage)
An AbstractBuffer is a unit of data management for a data manager.
static std::map< std::pair< int, int >, std::unique_ptr< ForeignStorageBufferMgr > > managers_map_
static void registerTable(Catalog_Namespace::Catalog *catalog, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols)
ids are created
static std::mutex persistent_storage_interfaces_mutex_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_index_
std::vector< int8_t > buff_
std::string to_upper(const std::string &str)
virtual void setMemoryPtr(int8_t *new_ptr)
Data_Namespace::AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
virtual int8_t * tryZeroCopy(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, const size_t num_bytes)
void setSize(const size_t size)
static std::map< std::pair< int, int >, PersistentForeignStorageInterface * > table_persistent_storage_interface_map_
#define CHECK(condition)
Definition: Logger.h:197
static std::unordered_map< std::string, std::unique_ptr< PersistentForeignStorageInterface > > persistent_storage_interfaces_
PersistentForeignStorageInterface * persistent_foreign_storage_
void read(int8_t *const dst, const size_t numBytes, const size_t offset=0, const Data_Namespace::MemoryLevel dstBufferType=Data_Namespace::CPU_LEVEL, const int dstDeviceId=-1) override
virtual void reserve(size_t num_bytes)=0
std::pair< std::string, std::string > parseStorageType(const std::string &type)
virtual void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t num_bytes)=0
int8_t * tryZeroCopy(const size_t numBytes)
static void prepareTable(const int db_id, TableDescriptor &td, std::list< ColumnDescriptor > &cols)
prepare table options and modify columns
Data_Namespace::AbstractBuffer * createBuffer(const ChunkKey &key, const size_t pageSize=0, const size_t initialSize=0) override