OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "DataMgr/AbstractBuffer.h"
29 #include "DataMgr/DataMgr.h"
30 #include "Logger/Logger.h"
31 #include "QueryEngine/AggMode.h"
35 #include "Shared/DbObjectKeys.h"
36 #include "Shared/quantile.h"
38 #include "StringOps/StringOps.h"
39 
// Forward declarations; neither type needs to be complete in this header.
namespace Catalog_Namespace {
class Catalog;
}  // namespace Catalog_Namespace

class ResultSet;
45 
50 class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
51  public:
52  RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads = 0)
53  : arena_block_size_(arena_block_size) {
54  for (size_t i = 0; i < num_kernel_threads + 1; i++) {
55  allocators_.emplace_back(std::make_unique<DramArena>(arena_block_size));
56  }
57  CHECK(!allocators_.empty());
58  }
59 
61 
62  int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) override {
63  CHECK_LT(thread_idx, allocators_.size());
64  auto allocator = allocators_[thread_idx].get();
65  std::lock_guard<std::mutex> lock(state_mutex_);
66  return reinterpret_cast<int8_t*>(allocator->allocate(num_bytes));
67  }
68 
69  int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
70  const size_t thread_idx = 0) {
71  int8_t* buffer = allocate(num_bytes, thread_idx);
72  std::memset(buffer, 0, num_bytes);
73  addCountDistinctBuffer(buffer, num_bytes, /*physical_buffer=*/true);
74  return buffer;
75  }
76 
77  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
78  const size_t bytes,
79  const bool physical_buffer) {
80  std::lock_guard<std::mutex> lock(state_mutex_);
81  count_distinct_bitmaps_.emplace_back(
82  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
83  }
84 
85  void addCountDistinctSet(CountDistinctSet* count_distinct_set) {
86  std::lock_guard<std::mutex> lock(state_mutex_);
87  count_distinct_sets_.push_back(count_distinct_set);
88  }
89 
90  void addGroupByBuffer(int64_t* group_by_buffer) {
91  std::lock_guard<std::mutex> lock(state_mutex_);
92  group_by_buffers_.push_back(group_by_buffer);
93  }
94 
95  void addVarlenBuffer(void* varlen_buffer) {
96  std::lock_guard<std::mutex> lock(state_mutex_);
97  if (std::find(varlen_buffers_.begin(), varlen_buffers_.end(), varlen_buffer) ==
98  varlen_buffers_.end()) {
99  varlen_buffers_.push_back(varlen_buffer);
100  }
101  }
102 
// Body of addVarlenInputBuffer(Data_Namespace::AbstractBuffer* buffer) per the
// member index. Its signature, some preceding lines, and one statement after the
// lock are missing from this listing. It registers `buffer` under state_mutex_.
// Teardown calls unPin() on each registered buffer; it does not free() them.
109  std::lock_guard<std::mutex> lock(state_mutex_);
111  varlen_input_buffers_.push_back(buffer);
112  }
113 
114  std::string* addString(const std::string& str) {
115  std::lock_guard<std::mutex> lock(state_mutex_);
116  strings_.emplace_back(str);
117  return &strings_.back();
118  }
119 
120  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
121  std::lock_guard<std::mutex> lock(state_mutex_);
122  arrays_.emplace_back(arr);
123  return &arrays_.back();
124  }
125 
126  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
127  const shared::StringDictKey& dict_key,
128  const int64_t generation) {
129  std::lock_guard<std::mutex> lock(state_mutex_);
130  auto it = str_dict_proxy_owned_.find(dict_key);
131  if (it != str_dict_proxy_owned_.end()) {
132  CHECK_EQ(it->second->getDictionary(), str_dict.get());
133  it->second->updateGeneration(generation);
134  return it->second.get();
135  }
137  .emplace(
138  dict_key,
139  std::make_shared<StringDictionaryProxy>(str_dict, dict_key, generation))
140  .first;
141  return it->second.get();
142  }
143 
145  const shared::StringDictKey& source_proxy_dict_key,
146  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
147  std::ostringstream oss;
148  oss << "{source_dict_key: " << source_proxy_dict_key
149  << " StringOps: " << string_op_infos << "}";
150  return oss.str();
151  }
152 
154  const shared::StringDictKey& source_proxy_dict_key,
155  const shared::StringDictKey& dest_proxy_dict_key,
156  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
157  std::ostringstream oss;
158  oss << "{source_dict_key: " << source_proxy_dict_key
159  << ", dest_dict_key: " << dest_proxy_dict_key << " StringOps: " << string_op_infos
160  << "}";
161  return oss.str();
162  }
163 
165  const StringDictionaryProxy* source_proxy,
166  const StringDictionaryProxy* dest_proxy,
167  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
168  std::lock_guard<std::mutex> lock(state_mutex_);
169  const auto map_key =
171  dest_proxy->getDictionary()->getDictKey(),
172  string_op_infos);
173  auto it = str_proxy_intersection_translation_maps_owned_.find(map_key);
176  .emplace(map_key,
178  dest_proxy, string_op_infos))
179  .first;
180  }
181  return &it->second;
182  }
183 
185  const StringDictionaryProxy* source_proxy,
186  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
187  const auto map_key = generate_translation_map_key(
188  source_proxy->getDictionary()->getDictKey(), string_op_infos);
189  auto it = str_proxy_numeric_translation_maps_owned_.lower_bound(map_key);
190  if (it->first != map_key) {
192  it, map_key, source_proxy->buildNumericTranslationMap(string_op_infos));
193  }
194  return &it->second;
195  }
196 
198  const StringDictionaryProxy* source_proxy,
199  StringDictionaryProxy* dest_proxy,
200  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
201  std::lock_guard<std::mutex> lock(state_mutex_);
202  const auto map_key =
204  dest_proxy->getDictionary()->getDictKey(),
205  string_op_infos);
206  auto it = str_proxy_union_translation_maps_owned_.find(map_key);
207  if (it == str_proxy_union_translation_maps_owned_.end()) {
209  .emplace(map_key,
211  dest_proxy, string_op_infos))
212  .first;
213  }
214  return &it->second;
215  }
216 
217  const StringOps_Namespace::StringOps* getStringOps(
218  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
219  std::lock_guard<std::mutex> lock(state_mutex_);
220  const auto map_key = generate_translation_map_key({}, {}, string_op_infos);
221  auto it = string_ops_owned_.find(map_key);
222  if (it == string_ops_owned_.end()) {
223  it = string_ops_owned_
224  .emplace(map_key,
225  std::make_shared<StringOps_Namespace::StringOps>(string_op_infos))
226  .first;
227  }
228  return it->second.get();
229  }
230 
232  std::lock_guard<std::mutex> lock(state_mutex_);
233  auto it = str_dict_proxy_owned_.find(dict_key);
234  CHECK(it != str_dict_proxy_owned_.end());
235  return it->second.get();
236  }
237 
239  const bool with_generation);
240 
242  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
243  std::lock_guard<std::mutex> lock(state_mutex_);
244  lit_str_dict_proxy_ = lit_str_dict_proxy;
245  }
246 
248  std::lock_guard<std::mutex> lock(state_mutex_);
249  return lit_str_dict_proxy_.get();
250  }
251 
253  const shared::StringDictKey& source_dict_id_in,
254  const shared::StringDictKey& dest_dict_id_in,
255  const bool with_generation,
256  const StringTranslationType translation_map_type,
257  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos);
258 
261  const shared::StringDictKey& source_dict_id_in,
262  const bool with_generation,
263  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos);
264 
265  void addColBuffer(const void* col_buffer) {
266  std::lock_guard<std::mutex> lock(state_mutex_);
267  col_buffers_.push_back(const_cast<void*>(col_buffer));
268  }
269 
// Teardown body (its opening line is missing from this listing; presumably the
// destructor). It releases every resource handed to this owner. Entries in
// count_distinct_bitmaps_ are not released here.
// Count-distinct sets were registered as raw owning pointers: delete them.
271  for (auto count_distinct_set : count_distinct_sets_) {
272  delete count_distinct_set;
273  }
// Group-by, varlen and column buffers are released with free(), so callers must
// have obtained them from malloc-family allocation.
274  for (auto group_by_buffer : group_by_buffers_) {
275  free(group_by_buffer);
276  }
277  for (auto varlen_buffer : varlen_buffers_) {
278  free(varlen_buffer);
279  }
// Varlen input buffers are not freed; they are only unpinned.
280  for (auto varlen_input_buffer : varlen_input_buffers_) {
281  CHECK(varlen_input_buffer);
282  varlen_input_buffer->unPin();
283  }
284  for (auto col_buffer : col_buffers_) {
285  free(col_buffer);
286  }
287  }
288 
289  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
290  auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_, /*num_kernels=*/1);
291  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
292  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
293  return rtn;
294  }
295 
297  string_dictionary_generations_ = generations;
298  }
299 
// Closing brace of getStringDictionaryGenerations() (member index signature:
// `StringDictionaryGenerations& getStringDictionaryGenerations()`). Its signature
// and body lines are missing from this listing.
302  }
303 
// Defined out of line (Execute.cpp per the generated member index).
// NOTE(review): presumably returns an owned TDigest for quantile `q` (cf. the
// t_digests_ member) — confirm against the definition.
304  quantile::TDigest* nullTDigest(double const q);
305 
306  //
307  // key/value store for table function intercommunication
308  //
309 
310  void setTableFunctionMetadata(const char* key,
311  const uint8_t* raw_data,
312  const size_t num_bytes,
313  const TableFunctionMetadataType value_type) {
314  MetadataValue metadata_value(num_bytes, value_type);
315  std::memcpy(metadata_value.first.data(), raw_data, num_bytes);
316  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
317  table_function_metadata_store_[key] = std::move(metadata_value);
318  }
319 
320  void getTableFunctionMetadata(const char* key,
321  const uint8_t*& raw_data,
322  size_t& num_bytes,
323  TableFunctionMetadataType& value_type) const {
324  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
325  auto const itr = table_function_metadata_store_.find(key);
326  if (itr == table_function_metadata_store_.end()) {
327  throw std::runtime_error("Failed to find Table Function Metadata with key '" +
328  std::string(key) + "'");
329  }
330  raw_data = itr->second.first.data();
331  num_bytes = itr->second.first.size();
332  value_type = itr->second.second;
333  }
334 
336  std::lock_guard<std::mutex> lock(state_mutex_);
337  return &mode_maps_.emplace_back();
338  }
339 
340  private:
342  int8_t* ptr;
343  const size_t size;
344  const bool physical_buffer;
345  };
346 
347  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
348  std::vector<CountDistinctSet*> count_distinct_sets_;
349  std::vector<int64_t*> group_by_buffers_;
350  std::vector<void*> varlen_buffers_;
351  std::list<std::string> strings_;
352  std::list<std::vector<int64_t>> arrays_;
353  std::unordered_map<shared::StringDictKey, std::shared_ptr<StringDictionaryProxy>>
355  std::map<std::string, StringDictionaryProxy::IdMap>
357  std::map<std::string, StringDictionaryProxy::IdMap>
359  std::map<std::string, StringDictionaryProxy::TranslationMap<Datum>>
361  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
363  std::vector<void*> col_buffers_;
364  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
365  std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
366  std::map<std::string, std::shared_ptr<StringOps_Namespace::StringOps>>
368  std::list<AggMode> mode_maps_;
369 
370  size_t arena_block_size_; // for cloning
371  std::vector<std::unique_ptr<Arena>> allocators_;
372 
373  mutable std::mutex state_mutex_;
374 
375  using MetadataValue = std::pair<std::vector<uint8_t>, TableFunctionMetadataType>;
376  std::map<std::string, MetadataValue> table_function_metadata_store_;
378 
379  friend class ResultSet;
380  friend class QueryExecutionContext;
381 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:301
robin_hood::unordered_set< int64_t > CountDistinctSet
Definition: CountDistinct.h:35
std::vector< std::unique_ptr< Arena > > allocators_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes, const size_t thread_idx=0)
std::list< std::vector< int64_t > > arrays_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
const shared::StringDictKey & getDictKey() const noexcept
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const shared::StringDictKey &source_dict_id_in, const shared::StringDictKey &dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
Definition: Execute.cpp:613
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::map< std::string, StringDictionaryProxy::TranslationMap< Datum > > str_proxy_numeric_translation_maps_owned_
const StringDictionaryProxy::TranslationMap< Datum > * addStringProxyNumericTranslationMap(const StringDictionaryProxy *source_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::list< std::string > strings_
Calculate approximate median and general quantiles, based on "Computing Extremely Accurate Quantiles ...
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
int8_t * allocate(const size_t num_bytes, const size_t thread_idx=0) override
std::vector< CountDistinctSet * > count_distinct_sets_
StringDictionary * getDictionary() const noexcept
std::pair< std::vector< uint8_t >, TableFunctionMetadataType > MetadataValue
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:638
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getStringDictProxy(const shared::StringDictKey &dict_key) const
StringDictionaryProxy * getLiteralStringDictProxy() const
std::mutex table_function_metadata_store_mutex_
TranslationMap< Datum > buildNumericTranslationMap(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
std::map< std::string, MetadataValue > table_function_metadata_store_
std::list< AggMode > mode_maps_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const shared::StringDictKey &dict_key, const int64_t generation)
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_intersection_translation_maps_owned_
void setDictionaryGenerations(StringDictionaryGenerations generations)
const StringDictionaryProxy::TranslationMap< Datum > * getOrAddStringProxyNumericTranslationMap(const shared::StringDictKey &source_dict_id_in, const bool with_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
Definition: Execute.cpp:630
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_union_translation_maps_owned_
std::map< std::string, std::shared_ptr< StringOps_Namespace::StringOps > > string_ops_owned_
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
Calculate statistical mode as an aggregate function.
std::vector< void * > varlen_buffers_
std::string generate_translation_map_key(const shared::StringDictKey &source_proxy_dict_key, const shared::StringDictKey &dest_proxy_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::unordered_map< shared::StringDictKey, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads=0)
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryGenerations string_dictionary_generations_
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
AggMode * allocateMode()
TableFunctionMetadataType
const StringOps_Namespace::StringOps * getStringOps(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
#define CHECK_LT(x, y)
Definition: Logger.h:303
void setTableFunctionMetadata(const char *key, const uint8_t *raw_data, const size_t num_bytes, const TableFunctionMetadataType value_type)
void addVarlenBuffer(void *varlen_buffer)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
Functions used to work with (approximate) count distinct sets.
void addCountDistinctSet(CountDistinctSet *count_distinct_set)
void getTableFunctionMetadata(const char *key, const uint8_t *&raw_data, size_t &num_bytes, TableFunctionMetadataType &value_type) const
void addGroupByBuffer(int64_t *group_by_buffer)
const StringDictionaryProxy::IdMap * addStringProxyUnionTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
#define CHECK(condition)
Definition: Logger.h:291
std::string * addString(const std::string &str)
const StringDictionaryProxy::IdMap * addStringProxyIntersectionTranslationMap(const StringDictionaryProxy *source_proxy, const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_
std::string generate_translation_map_key(const shared::StringDictKey &source_proxy_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const unsigned block_size, const unsigned grid_size)
Definition: ResultSet.cpp:64
friend class ResultSet
StringDictionaryProxy * getOrAddStringDictProxy(const shared::StringDictKey &dict_key, const bool with_generation)
Definition: Execute.cpp:537