OmniSciDB  8a228a1076
ForeignStorageInterface.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include "Shared/StringTransform.h"
19 
21  const ChunkKey& chunk_key,
22  PersistentForeignStorageInterface* persistent_foreign_storage)
24  , chunk_key_(chunk_key)
25  , persistent_foreign_storage_(persistent_foreign_storage) {}
26 
27 void ForeignStorageBuffer::read(int8_t* const dst,
28  const size_t numBytes,
29  const size_t offset,
30  const Data_Namespace::MemoryLevel dstBufferType,
31  const int dstDeviceId) {
32  CHECK_EQ(size_t(0), offset);
33  CHECK_EQ(-1, dstDeviceId);
35 }
36 
38  const size_t numBytes,
39  const Data_Namespace::MemoryLevel srcBufferType,
40  const int deviceId) {
41  is_dirty_ = true;
42  is_appended_ = true;
43  buff_.insert(buff_.end(), src, src + numBytes);
44  size_ += numBytes;
45 }
46 
48  const int db_id,
49  const int table_id,
50  PersistentForeignStorageInterface* persistent_foreign_storage)
51  : AbstractBufferMgr(0), persistent_foreign_storage_(persistent_foreign_storage) {}
52 
54  // TODO(alex)
55  std::vector<ForeignStorageColumnBuffer> column_buffers;
56  for (auto& kv : chunk_index_) {
57  const auto buffer = kv.second->moveBuffer();
58  column_buffers.emplace_back(
59  ForeignStorageColumnBuffer{kv.first, kv.second->sql_type, buffer});
60  }
61  persistent_foreign_storage_->append(column_buffers);
62 }
63 
65  const ChunkKey& key,
66  const size_t pageSize,
67  const size_t initialSize) {
68  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(chunk_index_mutex_);
69  const auto it_ok = chunk_index_.emplace(
70  key, std::make_unique<ForeignStorageBuffer>(key, persistent_foreign_storage_));
71  // this check fails if we create table, drop it and create again
72  // CHECK(it_ok.second);
73  return it_ok.first->second.get();
74 }
75 
77  const ChunkKey& key,
78  const size_t numBytes) {
79  mapd_shared_lock<mapd_shared_mutex> chunk_index_write_lock(chunk_index_mutex_);
80  const auto it = chunk_index_.find(key);
81  CHECK(it != chunk_index_.end());
82  return it->second.get();
83 }
84 
87  const size_t numBytes) {
88  CHECK(numBytes);
89  destBuffer->reserve(numBytes);
90  auto file_buffer = getBuffer(key, numBytes);
91  file_buffer->read(destBuffer->getMemoryPtr(), numBytes);
92  destBuffer->setSize(numBytes);
93  destBuffer->syncEncoder(file_buffer);
94 }
95 
97  ChunkMetadataVector& chunkMetadataVec,
98  const ChunkKey& keyPrefix) {
99  mapd_unique_lock<mapd_shared_mutex> chunk_index_write_lock(
100  chunk_index_mutex_); // is this guarding the right structure? it looks like we
101  // only read here for chunk
102  auto chunk_it = chunk_index_.lower_bound(keyPrefix);
103  if (chunk_it == chunk_index_.end()) {
104  CHECK(false); // throw?
105  }
106 
107  while (chunk_it != chunk_index_.end() &&
108  std::search(chunk_it->first.begin(),
109  chunk_it->first.begin() + keyPrefix.size(),
110  keyPrefix.begin(),
111  keyPrefix.end()) != chunk_it->first.begin() + keyPrefix.size()) {
112  const auto& chunk_key = chunk_it->first;
113  if (chunk_key.size() == 5) {
114  if (chunk_key[4] == 1) {
115  const auto& buffer = *chunk_it->second;
116  auto type = buffer.sql_type;
117  auto size = buffer.size();
118  auto subkey = chunk_key;
119  subkey[4] = 2;
120  auto& index_buf = *(chunk_index_.find(subkey)->second);
121  auto bs = index_buf.size() / index_buf.sql_type.get_size();
122  auto chunk_metadata =
123  std::make_shared<ChunkMetadata>(type, size, bs, ChunkStats{});
124  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
125  }
126  } else {
127  const auto& buffer = *chunk_it->second;
128  auto chunk_metadata = std::make_shared<ChunkMetadata>();
129  chunk_metadata->sqlType = buffer.sql_type;
130  buffer.encoder->getMetadata(chunk_metadata);
131  chunkMetadataVec.emplace_back(chunk_key, chunk_metadata);
132  }
133  chunk_it++;
134  }
135 }
136 
137 Data_Namespace::AbstractBufferMgr* ForeignStorageInterface::lookupBufferManager(
138  const int db_id,
139  const int table_id) {
140  auto key = std::make_pair(db_id, table_id);
141  if (managers_map_.count(key)) {
142  return managers_map_[key].get();
143  }
144 
145  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
146  persistent_storage_interfaces_mutex_);
147  const auto it = table_persistent_storage_interface_map_.find(key);
148  if (it == table_persistent_storage_interface_map_.end()) {
149  return nullptr;
150  }
151  const auto it_ok = managers_map_.emplace(
152  key, std::make_unique<ForeignStorageBufferMgr>(db_id, table_id, it->second));
153  CHECK(it_ok.second);
154  return it_ok.first->second.get();
155 }
156 
158  std::unique_ptr<PersistentForeignStorageInterface> persistent_foreign_storage) {
159  std::lock_guard<std::mutex> persistent_storage_interfaces_lock(
160  persistent_storage_interfaces_mutex_);
161  const auto it_ok = persistent_storage_interfaces_.emplace(
162  persistent_foreign_storage->getType(), std::move(persistent_foreign_storage));
163  CHECK(it_ok.second);
164 }
165 
166 std::pair<std::string, std::string> parseStorageType(const std::string& type) {
167  size_t sep = type.find_first_of(':'), sep2 = sep != std::string::npos ? sep + 1 : sep;
168  auto res = std::make_pair(to_upper(type.substr(0, sep)), type.substr(sep2));
169  return res;
170 }
171 
173  TableDescriptor& td,
174  std::list<ColumnDescriptor>& cols) {
175  auto type = parseStorageType(td.storageType);
176  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
177  persistent_storage_interfaces_mutex_);
178  const auto it = persistent_storage_interfaces_.find(type.first);
179  if (it == persistent_storage_interfaces_.end()) {
180  throw std::runtime_error("storage type " + type.first + " not supported");
181  }
182  auto& p = it->second;
183  persistent_storage_interfaces_lock.unlock();
184  p->prepareTable(db_id, type.second, td, cols);
185 }
186 
188  const TableDescriptor& td,
189  const std::list<ColumnDescriptor>& cols) {
190  const int table_id = td.tableId;
191  auto type = parseStorageType(td.storageType);
192 
193  std::unique_lock<std::mutex> persistent_storage_interfaces_lock(
194  persistent_storage_interfaces_mutex_);
195  const auto it = persistent_storage_interfaces_.find(type.first);
196  if (it == persistent_storage_interfaces_.end()) {
197  throw std::runtime_error("storage type " + type.first + " not supported");
198  }
199 
200  auto db_id = catalog->getCurrentDB().dbId;
201  const auto it_ok = table_persistent_storage_interface_map_.emplace(
202  std::make_pair(db_id, table_id), it->second.get());
203  // this check fails if we create table, drop it and create again
204  // CHECK(it_ok.second);
205  persistent_storage_interfaces_lock.unlock();
206  it_ok.first->second->registerTable(catalog,
207  it_ok.first->first,
208  type.second,
209  td,
210  cols,
211  lookupBufferManager(db_id, table_id));
212 }
213 
215  persistent_storage_interfaces_.clear();
216  managers_map_.clear();
217 }
218 
219 std::unordered_map<std::string, std::unique_ptr<PersistentForeignStorageInterface>>
221 std::map<std::pair<int, int>, PersistentForeignStorageInterface*>
223 std::map<std::pair<int, int>, std::unique_ptr<ForeignStorageBufferMgr>>
PersistentForeignStorageInterface * persistent_foreign_storage_
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector &chunkMetadataVec, const ChunkKey &keyPrefix) override
#define CHECK_EQ(x, y)
Definition: Logger.h:205
int64_t * src
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:86
void syncEncoder(const AbstractBuffer *src_buffer)
static Data_Namespace::AbstractBufferMgr * lookupBufferManager(const int db_id, const int table_id)
void append(int8_t *src, const size_t numBytes, const Data_Namespace::MemoryLevel srcBufferType=Data_Namespace::CPU_LEVEL, const int deviceId=-1) override
std::string storageType
virtual int8_t * getMemoryPtr()=0
void fetchBuffer(const ChunkKey &key, Data_Namespace::AbstractBuffer *destBuffer, const size_t numBytes=0) override
static void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
ForeignStorageBufferMgr(const int db_id, const int table_id, PersistentForeignStorageInterface *persistent_foreign_storage)
mapd_shared_mutex chunk_index_mutex_
virtual void setSize(const size_t size)
virtual void append(const std::vector< ForeignStorageColumnBuffer > &column_buffers)=0
ForeignStorageBuffer(const ChunkKey &chunk_key, PersistentForeignStorageInterface *persistent_foreign_storage)
An AbstractBuffer is a unit of data management for a data manager.
static std::map< std::pair< int, int >, std::unique_ptr< ForeignStorageBufferMgr > > managers_map_
static void registerTable(Catalog_Namespace::Catalog *catalog, const TableDescriptor &td, const std::list< ColumnDescriptor > &cols)
ids are created
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:195
static std::mutex persistent_storage_interfaces_mutex_
std::map< ChunkKey, std::unique_ptr< ForeignStorageBuffer > > chunk_index_
std::vector< int8_t > buff_
std::string to_upper(const std::string &str)
Data_Namespace::AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
static std::map< std::pair< int, int >, PersistentForeignStorageInterface * > table_persistent_storage_interface_map_
#define CHECK(condition)
Definition: Logger.h:197
std::vector< int > ChunkKey
Definition: types.h:35
static std::unordered_map< std::string, std::unique_ptr< PersistentForeignStorageInterface > > persistent_storage_interfaces_
std::vector< std::pair< ChunkKey, std::shared_ptr< ChunkMetadata > >> ChunkMetadataVector
specifies the content in-memory of a row in the table metadata table
PersistentForeignStorageInterface * persistent_foreign_storage_
void read(int8_t *const dst, const size_t numBytes, const size_t offset=0, const Data_Namespace::MemoryLevel dstBufferType=Data_Namespace::CPU_LEVEL, const int dstDeviceId=-1) override
virtual void reserve(size_t num_bytes)=0
std::pair< std::string, std::string > parseStorageType(const std::string &type)
virtual void read(const ChunkKey &chunk_key, const SQLTypeInfo &sql_type, int8_t *dest, const size_t num_bytes)=0
static void prepareTable(const int db_id, TableDescriptor &td, std::list< ColumnDescriptor > &cols)
prepare table options and modify columns
Data_Namespace::AbstractBuffer * createBuffer(const ChunkKey &key, const size_t pageSize=0, const size_t initialSize=0) override