OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ForeignStorageInterface.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "Shared/StringTransform.h"
19 
21  const ChunkKey& chunk_key,
22  PersistentForeignStorageInterface* persistent_foreign_storage)
23  : Data_Namespace::AbstractBuffer(0)
24  , chunk_key_(chunk_key)
25  , persistent_foreign_storage_(persistent_foreign_storage) {}
26 
// Read entry point of ForeignStorageBuffer.
// The visible body only validates its arguments: `offset` is checked to be 0 and
// `dstDeviceId` is checked to be -1 via CHECK_EQ.
// NOTE(review): listing line 34 is absent from this extract, so the statement that
// follows the two checks (presumably the actual copy into `dst`) is not visible here.
27 void ForeignStorageBuffer::read(int8_t* const dst,
28  const size_t numBytes,
29  const size_t offset,
30  const Data_Namespace::MemoryLevel dstBufferType,
31  const int dstDeviceId) {
// Reads starting at a non-zero offset are rejected.
32  CHECK_EQ(size_t(0), offset);
// Only a destination device id of -1 is accepted.
33  CHECK_EQ(-1, dstDeviceId);
35 }
36 
38  const size_t numBytes,
39  const Data_Namespace::MemoryLevel srcBufferType,
40  const int deviceId) {
41  setAppended();
42  buff_.insert(buff_.end(), src, src + numBytes);
43  size_ += numBytes;
44 }
45 
47  const int db_id,
48  const int table_id,
49  PersistentForeignStorageInterface* persistent_foreign_storage)
50  : AbstractBufferMgr(0), persistent_foreign_storage_(persistent_foreign_storage) {}
51 
53  // TODO(alex)
54  std::vector<ForeignStorageColumnBuffer> column_buffers;
55  for (auto& kv : chunk_index_) {
56  const auto buffer = kv.second->moveBuffer();
57  column_buffers.emplace_back(
58  ForeignStorageColumnBuffer{kv.first, kv.second->getSqlType(), buffer});
59  }
60  persistent_foreign_storage_->append(column_buffers);
61 }
62 
64  const ChunkKey& key,
65  const size_t pageSize,
66  const size_t initialSize) {
67  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunk_index_mutex_);
68  const auto it_ok = chunk_index_.emplace(
69  key, std::make_unique<ForeignStorageBuffer>(key, persistent_foreign_storage_));
70  // this check fails if we create table, drop it and create again
71  // CHECK(it_ok.second);
72  return it_ok.first->second.get();
73 }
74 
76  const ChunkKey& key,
77  const size_t numBytes) {
78  mapd_shared_lock<mapd_shared_mutex> chunk_index_write_lock(chunk_index_mutex_);
79  const auto it = chunk_index_.find(key);
80  CHECK(it != chunk_index_.end());
81  return it->second.get();
82 }
83 
86  const size_t numBytes) {
87  CHECK(numBytes);
88  destBuffer->reserve(numBytes);
89  auto file_buffer = getBuffer(key, numBytes);
90  file_buffer->read(destBuffer->getMemoryPtr(), numBytes);
91  destBuffer->setSize(numBytes);
92  destBuffer->syncEncoder(file_buffer);
93 }
94 
96  ChunkMetadataVector& chunkMetadataVec,
97  const ChunkKey& keyPrefix) {
98  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(
99  chunk_index_mutex_); // is this guarding the right structure? it looks like we
100  // only read here for chunk
101  auto chunk_it = chunk_index_.lower_bound(keyPrefix);
102  if (chunk_it == chunk_index_.end()) {
103  CHECK(false); // throw?
104  }
105 
106  while (chunk_it != chunk_index_.end() &&
107  std::search(chunk_it->first.begin(),
108  chunk_it->first.begin() + keyPrefix.size(),
109  keyPrefix.begin(),
110  keyPrefix.end()) != chunk_it->first.begin() + keyPrefix.size()) {
111  const auto& chunk_key = chunk_it->first;
112  if (chunk_key.size() == 5) {
113  if (chunk_key[4] == 1) {
114  const auto& buffer = *chunk_it->second;
115  auto type = buffer.getSqlType();
116  auto size = buffer.size();
117  auto subkey = chunk_key;
118  subkey[4] = 2;
119  auto& index_buf = *(chunk_index_.find(subkey)->second);
120  auto bs = index_buf.size() / index_buf.getSqlType().get_size();
121  auto chunk_metadata =
122  std::make_shared<ChunkMetadata>(type, size, bs, ChunkStats{});
123  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
124  }
125  } else {
126  const auto& buffer = *chunk_it->second;
127  auto chunk_metadata = std::make_shared<ChunkMetadata>();
128  chunk_metadata->sqlType = buffer.getSqlType();
129  buffer.getEncoder()->getMetadata(chunk_metadata);
130  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
131  }
132  chunk_it++;
133  }
134 }
135 
// Returns the buffer manager associated with (db_id, table_id).
// If managers_map_ already holds an entry for the pair, that entry is returned.
// Otherwise the pair is looked up in table_persistent_storage_interface_map_:
//  - not found: returns nullptr;
//  - found: a new ForeignStorageBufferMgr is created for it, stored in
//    managers_map_, and returned.
136 Data_Namespace::AbstractBufferMgr* ForeignStorageInterface::lookupBufferManager(
137  const int db_id,
138  const int table_id) {
139  auto key = std::make_pair(db_id, table_id);
// Cached-manager path.
// NOTE(review): this lookup runs before the lock_guard declared below; confirm
// whether managers_map_ can be modified concurrently with it.
140  if (managers_map_.count(key)) {
141  return managers_map_[key].get();
142  }
143 
// NOTE(review): listing line 145 (the argument of this lock_guard) is absent from
// this extract, so the mutex being locked is not visible here.
144  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
146  const auto it = table_persistent_storage_interface_map_.find(key);
147  if (it == table_persistent_storage_interface_map_.end()) {
// No persistent storage interface is recorded for this table.
148  return nullptr;
149  }
150  const auto it_ok = managers_map_.emplace(
151  key, std::make_unique<ForeignStorageBufferMgr>(db_id, table_id, it->second));
// The emplace is expected to have inserted a new entry for this key.
152  CHECK(it_ok.second);
153  return it_ok.first->second.get();
154 }
155 
157  std::unique_ptr<PersistentForeignStorageInterface> persistent_foreign_storage) {
158  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
160  const auto it_ok = persistent_storage_interfaces_.emplace(
161  persistent_foreign_storage->getType(), std::move(persistent_foreign_storage));
162  CHECK(it_ok.second);
163 }
164 
165 std::pair<std::string, std::string> parseStorageType(const std::string& type) {
166  size_t sep = type.find_first_of(':'), sep2 = sep != std::string::npos ? sep + 1 : sep;
167  auto res = std::make_pair(to_upper(type.substr(0, sep)), type.substr(sep2));
168  return res;
169 }
170 
172  TableDescriptor& td,
173  std::list<ColumnDescriptor>& cols) {
174  auto type = parseStorageType(td.storageType);
175  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
177  const auto it = persistent_storage_interfaces_.find(type.first);
178  if (it == persistent_storage_interfaces_.end()) {
179  throw std::runtime_error("storage type " + type.first + " not supported");
180  }
181  auto& p = it->second;
182  persistent_storage_interfaces_lock.unlock();
183  p->prepareTable(db_id, type.second, td, cols);
184 }
185 
187  const TableDescriptor& td,
188  const std::list<ColumnDescriptor>& cols) {
189  const int table_id = td.tableId;
190  auto type = parseStorageType(td.storageType);
191 
192  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
194  const auto it = persistent_storage_interfaces_.find(type.first);
195  if (it == persistent_storage_interfaces_.end()) {
196  throw std::runtime_error("storage type " + type.first + " not supported");
197  }
198 
199  auto db_id = catalog->getCurrentDB().dbId;
200  const auto it_ok = table_persistent_storage_interface_map_.emplace(
201  std::make_pair(db_id, table_id), it->second.get());
202  // this check fails if we create table, drop it and create again
203  // CHECK(it_ok.second);
204  persistent_storage_interfaces_lock.unlock();
205  it_ok.first->second->registerTable(catalog,
206  it_ok.first->first,
207  type.second,
208  td,
209  cols,
210  lookupBufferManager(db_id, table_id));
211 }
212 
215  managers_map_.clear();
216 }
217 
218 std::unordered_map<std::string, std::unique_ptr<PersistentForeignStorageInterface>>
220 std::map<std::pair<int, int>, PersistentForeignStorageInterface*>
222 std::map<std::pair<int, int>, std::unique_ptr<ForeignStorageBufferMgr>>
PersistentForeignStorageInterface * persistent_foreign_storage_
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< int > ChunkKey
Definition: types.h:37
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:97
void syncEncoder(const AbstractBuffer *src_buffer)
static Data_Namespace::AbstractBufferMgr * lookupBufferManager(const int db_id, const int table_id)
void append(int8_t *src, const size_t numBytes, const Data_Namespace::MemoryLevel srcBufferType=Data_Namespace::CPU_LEVEL, const int deviceId=-1) override
std::string storageType
virtual int8_t * getMemoryPtr()=0
void fetchBuffer(const ChunkKey &key, Data_Namespace::AbstractBuffer *destBuffer, const size_t numBytes=0) override
static void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:210
ForeignStorageBufferMgr(const int db_id, const int table_id, PersistentForeignStorageInterface *persistent_foreign_storage)
mapd_shared_mutex chunk_index_mutex_
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata >>> ChunkMetadataVector
virtual void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers)=0
ForeignStorageBuffer(const ChunkKey &chunk_key, PersistentForeignStorageInterface *persistent_foreign_storage)
An AbstractBuffer is a unit of data management for a data manager.
static std::map< std::pair< int, int >, std::unique_ptr< ForeignStorageBufferMgr > > managers_map_
static void registerTable(Catalog_Namespace::Catalog *catalog, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols)
ids are created
static std::mutex persistent_storage_interfaces_mutex_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_index_
std::vector< int8_t > buff_
std::string to_upper(const std::string &str)
Data_Namespace::AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
void setSize(const size_t size)
static std::map< std::pair< int, int >, PersistentForeignStorageInterface * > table_persistent_storage_interface_map_
#define CHECK(condition)
Definition: Logger.h:197
static std::unordered_map< std::string, std::unique_ptr< PersistentForeignStorageInterface > > persistent_storage_interfaces_
specifies the content in-memory of a row in the table metadata table
PersistentForeignStorageInterface * persistent_foreign_storage_
void read(int8_t *const dst, const size_t numBytes, const size_t offset=0, const Data_Namespace::MemoryLevel dstBufferType=Data_Namespace::CPU_LEVEL, const int dstDeviceId=-1) override
virtual void reserve(size_t num_bytes)=0
std::pair< std::string, std::string > parseStorageType(const std::string &type)
virtual void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t num_bytes)=0
static void prepareTable(const int db_id, TableDescriptor &td, std::list< ColumnDescriptor > &cols)
prepare table options and modify columns
Data_Namespace::AbstractBuffer * createBuffer(const ChunkKey &key, const size_t pageSize=0, const size_t initialSize=0) override