OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "DataMgr/AbstractBuffer.h"
29 #include "DataMgr/DataMgr.h"
30 #include "Logger/Logger.h"
31 #include "QueryEngine/AggMode.h"
35 #include "Shared/quantile.h"
37 #include "StringOps/StringOps.h"
38 
39 namespace Catalog_Namespace {
40 class Catalog;
41 }
42 
43 class ResultSet;
44 
49 class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
50  public:
51  RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads = 0)
52  : arena_block_size_(arena_block_size) {
53  for (size_t i = 0; i < num_kernel_threads + 1; i++) {
54  allocators_.emplace_back(std::make_unique<DramArena>(arena_block_size));
55  }
56  CHECK(!allocators_.empty());
57  }
58 
60 
61  int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) override {
62  CHECK_LT(thread_idx, allocators_.size());
63  auto allocator = allocators_[thread_idx].get();
64  std::lock_guard<std::mutex> lock(state_mutex_);
65  return reinterpret_cast<int8_t*>(allocator->allocate(num_bytes));
66  }
67 
68  int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
69  const size_t thread_idx = 0) {
70  int8_t* buffer = allocate(num_bytes, thread_idx);
71  std::memset(buffer, 0, num_bytes);
72  addCountDistinctBuffer(buffer, num_bytes, /*physical_buffer=*/true);
73  return buffer;
74  }
75 
76  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
77  const size_t bytes,
78  const bool physical_buffer) {
79  std::lock_guard<std::mutex> lock(state_mutex_);
80  count_distinct_bitmaps_.emplace_back(
81  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
82  }
83 
84  void addCountDistinctSet(CountDistinctSet* count_distinct_set) {
85  std::lock_guard<std::mutex> lock(state_mutex_);
86  count_distinct_sets_.push_back(count_distinct_set);
87  }
88 
89  void addGroupByBuffer(int64_t* group_by_buffer) {
90  std::lock_guard<std::mutex> lock(state_mutex_);
91  group_by_buffers_.push_back(group_by_buffer);
92  }
93 
94  void addVarlenBuffer(void* varlen_buffer) {
95  std::lock_guard<std::mutex> lock(state_mutex_);
96  if (std::find(varlen_buffers_.begin(), varlen_buffers_.end(), varlen_buffer) ==
97  varlen_buffers_.end()) {
98  varlen_buffers_.push_back(varlen_buffer);
99  }
100  }
101 
108  std::lock_guard<std::mutex> lock(state_mutex_);
110  varlen_input_buffers_.push_back(buffer);
111  }
112 
113  std::string* addString(const std::string& str) {
114  std::lock_guard<std::mutex> lock(state_mutex_);
115  strings_.emplace_back(str);
116  return &strings_.back();
117  }
118 
119  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
120  std::lock_guard<std::mutex> lock(state_mutex_);
121  arrays_.emplace_back(arr);
122  return &arrays_.back();
123  }
124 
125  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
126  const int dict_id,
127  const int64_t generation) {
128  std::lock_guard<std::mutex> lock(state_mutex_);
129  auto it = str_dict_proxy_owned_.find(dict_id);
130  if (it != str_dict_proxy_owned_.end()) {
131  CHECK_EQ(it->second->getDictionary(), str_dict.get());
132  it->second->updateGeneration(generation);
133  return it->second.get();
134  }
136  .emplace(
137  dict_id,
138  std::make_shared<StringDictionaryProxy>(str_dict, dict_id, generation))
139  .first;
140  return it->second.get();
141  }
142 
144  const int32_t source_proxy_id,
145  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
146  std::ostringstream oss;
147  oss << "{source_dict_id: " << source_proxy_id << " StringOps: " << string_op_infos
148  << "}";
149  return oss.str();
150  }
151 
153  const int32_t source_proxy_id,
154  const int32_t dest_proxy_id,
155  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
156  std::ostringstream oss;
157  oss << "{source_dict_id: " << source_proxy_id << ", dest_dict_id: " << dest_proxy_id
158  << " StringOps: " << string_op_infos << "}";
159  return oss.str();
160  }
161 
163  const StringDictionaryProxy* source_proxy,
164  const StringDictionaryProxy* dest_proxy,
165  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
166  std::lock_guard<std::mutex> lock(state_mutex_);
167  const auto map_key =
169  dest_proxy->getDictionary()->getDictId(),
170  string_op_infos);
171  auto it = str_proxy_intersection_translation_maps_owned_.find(map_key);
174  .emplace(map_key,
176  dest_proxy, string_op_infos))
177  .first;
178  }
179  return &it->second;
180  }
181 
183  const StringDictionaryProxy* source_proxy,
184  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
185  const auto map_key = generate_translation_map_key(
186  source_proxy->getDictionary()->getDictId(), string_op_infos);
187  auto it = str_proxy_numeric_translation_maps_owned_.lower_bound(map_key);
188  if (it->first != map_key) {
190  it, map_key, source_proxy->buildNumericTranslationMap(string_op_infos));
191  }
192  return &it->second;
193  }
194 
196  const StringDictionaryProxy* source_proxy,
197  StringDictionaryProxy* dest_proxy,
198  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
199  std::lock_guard<std::mutex> lock(state_mutex_);
200  const auto map_key =
202  dest_proxy->getDictionary()->getDictId(),
203  string_op_infos);
204  auto it = str_proxy_union_translation_maps_owned_.find(map_key);
205  if (it == str_proxy_union_translation_maps_owned_.end()) {
207  .emplace(map_key,
209  dest_proxy, string_op_infos))
210  .first;
211  }
212  return &it->second;
213  }
214 
215  const StringOps_Namespace::StringOps* getStringOps(
216  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
217  std::lock_guard<std::mutex> lock(state_mutex_);
218  const auto map_key = generate_translation_map_key(0, 0, string_op_infos);
219  auto it = string_ops_owned_.find(map_key);
220  if (it == string_ops_owned_.end()) {
221  it = string_ops_owned_
222  .emplace(map_key,
223  std::make_shared<StringOps_Namespace::StringOps>(string_op_infos))
224  .first;
225  }
226  return it->second.get();
227  }
228 
229  StringDictionaryProxy* getStringDictProxy(const int dict_id) const {
230  std::lock_guard<std::mutex> lock(state_mutex_);
231  auto it = str_dict_proxy_owned_.find(dict_id);
232  CHECK(it != str_dict_proxy_owned_.end());
233  return it->second.get();
234  }
235 
237  const int dict_id_in,
238  const bool with_generation,
239  const Catalog_Namespace::Catalog* catalog);
240 
242  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
243  std::lock_guard<std::mutex> lock(state_mutex_);
244  lit_str_dict_proxy_ = lit_str_dict_proxy;
245  }
246 
248  std::lock_guard<std::mutex> lock(state_mutex_);
249  return lit_str_dict_proxy_.get();
250  }
251 
253  const int source_dict_id_in,
254  const int dest_dict_id_in,
255  const bool with_generation,
256  const StringTranslationType translation_map_type,
257  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
258  const Catalog_Namespace::Catalog* catalog);
259 
262  const int source_dict_id_in,
263  const bool with_generation,
264  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
265  const Catalog_Namespace::Catalog* catalog);
266 
267  void addColBuffer(const void* col_buffer) {
268  std::lock_guard<std::mutex> lock(state_mutex_);
269  col_buffers_.push_back(const_cast<void*>(col_buffer));
270  }
271 
273  for (auto count_distinct_set : count_distinct_sets_) {
274  delete count_distinct_set;
275  }
276  for (auto group_by_buffer : group_by_buffers_) {
277  free(group_by_buffer);
278  }
279  for (auto varlen_buffer : varlen_buffers_) {
280  free(varlen_buffer);
281  }
282  for (auto varlen_input_buffer : varlen_input_buffers_) {
283  CHECK(varlen_input_buffer);
284  varlen_input_buffer->unPin();
285  }
286  for (auto col_buffer : col_buffers_) {
287  free(col_buffer);
288  }
289  }
290 
291  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
292  auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_, /*num_kernels=*/1);
293  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
294  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
295  return rtn;
296  }
297 
299  string_dictionary_generations_ = generations;
300  }
301 
304  }
305 
306  quantile::TDigest* nullTDigest(double const q);
307 
308  //
309  // key/value store for table function intercommunication
310  //
311 
312  void setTableFunctionMetadata(const char* key,
313  const uint8_t* raw_data,
314  const size_t num_bytes,
315  const TableFunctionMetadataType value_type) {
316  MetadataValue metadata_value(num_bytes, value_type);
317  std::memcpy(metadata_value.first.data(), raw_data, num_bytes);
318  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
319  table_function_metadata_store_[key] = std::move(metadata_value);
320  }
321 
322  void getTableFunctionMetadata(const char* key,
323  const uint8_t*& raw_data,
324  size_t& num_bytes,
325  TableFunctionMetadataType& value_type) const {
326  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
327  auto const itr = table_function_metadata_store_.find(key);
328  if (itr == table_function_metadata_store_.end()) {
329  throw std::runtime_error("Failed to find Table Function Metadata with key '" +
330  std::string(key) + "'");
331  }
332  raw_data = itr->second.first.data();
333  num_bytes = itr->second.first.size();
334  value_type = itr->second.second;
335  }
336 
338  std::lock_guard<std::mutex> lock(state_mutex_);
339  return &mode_maps_.emplace_back();
340  }
341 
342  private:
344  int8_t* ptr;
345  const size_t size;
346  const bool physical_buffer;
347  };
348 
349  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
350  std::vector<CountDistinctSet*> count_distinct_sets_;
351  std::vector<int64_t*> group_by_buffers_;
352  std::vector<void*> varlen_buffers_;
353  std::list<std::string> strings_;
354  std::list<std::vector<int64_t>> arrays_;
355  std::unordered_map<int, std::shared_ptr<StringDictionaryProxy>> str_dict_proxy_owned_;
356  std::map<std::string, StringDictionaryProxy::IdMap>
358  std::map<std::string, StringDictionaryProxy::IdMap>
360  std::map<std::string, StringDictionaryProxy::TranslationMap<Datum>>
362  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
364  std::vector<void*> col_buffers_;
365  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
366  std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
367  std::map<std::string, std::shared_ptr<StringOps_Namespace::StringOps>>
369  std::list<AggMode> mode_maps_;
370 
371  size_t arena_block_size_; // for cloning
372  std::vector<std::unique_ptr<Arena>> allocators_;
373 
374  mutable std::mutex state_mutex_;
375 
376  using MetadataValue = std::pair<std::vector<uint8_t>, TableFunctionMetadataType>;
377  std::map<std::string, MetadataValue> table_function_metadata_store_;
379 
380  friend class ResultSet;
381  friend class QueryExecutionContext;
382 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:297
robin_hood::unordered_set< int64_t > CountDistinctSet
Definition: CountDistinct.h:35
std::vector< std::unique_ptr< Arena > > allocators_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes, const size_t thread_idx=0)
std::list< std::vector< int64_t > > arrays_
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const int source_dict_id_in, const int dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:615
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::unordered_map< int, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::map< std::string, StringDictionaryProxy::TranslationMap< Datum > > str_proxy_numeric_translation_maps_owned_
std::string generate_translation_map_key(const int32_t source_proxy_id, const int32_t dest_proxy_id, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
const StringDictionaryProxy::TranslationMap< Datum > * addStringProxyNumericTranslationMap(const StringDictionaryProxy *source_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::list< std::string > strings_
Calculate approximate median and general quantiles, based on &quot;Computing Extremely Accurate Quantiles ...
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
int8_t * allocate(const size_t num_bytes, const size_t thread_idx=0) override
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const Catalog_Namespace::Catalog *catalog, const unsigned block_size, const unsigned grid_size)
Definition: ResultSet.cpp:64
std::vector< CountDistinctSet * > count_distinct_sets_
StringDictionary * getDictionary() const noexcept
std::pair< std::vector< uint8_t >, TableFunctionMetadataType > MetadataValue
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:644
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getLiteralStringDictProxy() const
std::mutex table_function_metadata_store_mutex_
TranslationMap< Datum > buildNumericTranslationMap(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
std::map< std::string, MetadataValue > table_function_metadata_store_
std::list< AggMode > mode_maps_
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_intersection_translation_maps_owned_
void setDictionaryGenerations(StringDictionaryGenerations generations)
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_union_translation_maps_owned_
std::map< std::string, std::shared_ptr< StringOps_Namespace::StringOps > > string_ops_owned_
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
Calculate statistical mode as an aggregate function.
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:540
std::vector< void * > varlen_buffers_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads=0)
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryGenerations string_dictionary_generations_
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
AggMode * allocateMode()
TableFunctionMetadataType
const StringOps_Namespace::StringOps * getStringOps(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
#define CHECK_LT(x, y)
Definition: Logger.h:299
void setTableFunctionMetadata(const char *key, const uint8_t *raw_data, const size_t num_bytes, const TableFunctionMetadataType value_type)
std::string generate_translation_map_key(const int32_t source_proxy_id, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
int32_t getDictId() const noexcept
void addVarlenBuffer(void *varlen_buffer)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
StringDictionaryProxy * getStringDictProxy(const int dict_id) const
Functions used to work with (approximate) count distinct sets.
void addCountDistinctSet(CountDistinctSet *count_distinct_set)
void getTableFunctionMetadata(const char *key, const uint8_t *&raw_data, size_t &num_bytes, TableFunctionMetadataType &value_type) const
void addGroupByBuffer(int64_t *group_by_buffer)
const StringDictionaryProxy::IdMap * addStringProxyUnionTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
const StringDictionaryProxy::TranslationMap< Datum > * getOrAddStringProxyNumericTranslationMap(const int source_dict_id_in, const bool with_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:634
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
#define CHECK(condition)
Definition: Logger.h:289
std::string * addString(const std::string &str)
const StringDictionaryProxy::IdMap * addStringProxyIntersectionTranslationMap(const StringDictionaryProxy *source_proxy, const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_
friend class ResultSet