OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "DataMgr/AbstractBuffer.h"
29 #include "DataMgr/DataMgr.h"
30 #include "Logger/Logger.h"
31 #include "QueryEngine/AggMode.h"
35 #include "Shared/DbObjectKeys.h"
36 #include "Shared/quantile.h"
38 #include "StringOps/StringOps.h"
39 
40 namespace Catalog_Namespace {
41 class Catalog;
42 }
43 
44 class ResultSet;
45 
50 class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
51  public:
52  RowSetMemoryOwner(const size_t arena_block_size,
53  const size_t executor_id,
54  const size_t num_kernel_threads = 0)
55  : non_owned_group_by_buffers_(num_kernel_threads + 1, nullptr)
56  , arena_block_size_(arena_block_size)
57  , executor_id_(executor_id) {
58  VLOG(2) << "Prepare " << num_kernel_threads + 1
59  << " allocators from RowSetMemoryOwner attached to Executor-" << executor_id_;
60  allocators_.reserve(num_kernel_threads + 1);
61  for (size_t i = 0; i < num_kernel_threads + 1; i++) {
62  allocators_.emplace_back(std::make_unique<DramArena>(arena_block_size));
63  }
64  CHECK(!allocators_.empty());
65  }
66 
68 
69  int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) override {
70  CHECK_LT(thread_idx, allocators_.size());
71  auto allocator = allocators_[thread_idx].get();
72  std::lock_guard<std::mutex> lock(state_mutex_);
73  return reinterpret_cast<int8_t*>(allocator->allocate(num_bytes));
74  }
75 
76  std::pair<int64_t*, bool> allocateCachedGroupByBuffer(const size_t num_bytes,
77  const size_t thread_idx) {
78  std::lock_guard<std::mutex> lock(state_mutex_);
79  CHECK_LT(thread_idx, non_owned_group_by_buffers_.size());
80  // First try cache
81  if (non_owned_group_by_buffers_[thread_idx]) { // not nullptr
82  return std::make_pair(non_owned_group_by_buffers_[thread_idx], true);
83  }
84  // Was not in cache so must allocate
85  auto allocator = allocators_[thread_idx].get();
86  int64_t* group_by_buffer = reinterpret_cast<int64_t*>(allocator->allocate(num_bytes));
87  CHECK(group_by_buffer);
88  // Put in cache
89  non_owned_group_by_buffers_[thread_idx] = group_by_buffer;
90  return std::make_pair(group_by_buffer, false);
91  }
92 
93  int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
94  const size_t thread_idx = 0) {
95  int8_t* buffer = allocate(num_bytes, thread_idx);
96  std::memset(buffer, 0, num_bytes);
97  addCountDistinctBuffer(buffer, num_bytes, /*physical_buffer=*/true);
98  return buffer;
99  }
100 
101  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
102  const size_t bytes,
103  const bool physical_buffer) {
104  std::lock_guard<std::mutex> lock(state_mutex_);
105  count_distinct_bitmaps_.emplace_back(
106  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
107  }
108 
109  void addCountDistinctSet(CountDistinctSet* count_distinct_set) {
110  std::lock_guard<std::mutex> lock(state_mutex_);
111  count_distinct_sets_.push_back(count_distinct_set);
112  }
113 
116  }
117 
118  void addVarlenBuffer(void* varlen_buffer) {
119  std::lock_guard<std::mutex> lock(state_mutex_);
120  if (std::find(varlen_buffers_.begin(), varlen_buffers_.end(), varlen_buffer) ==
121  varlen_buffers_.end()) {
122  varlen_buffers_.push_back(varlen_buffer);
123  }
124  }
125 
132  std::lock_guard<std::mutex> lock(state_mutex_);
134  varlen_input_buffers_.push_back(buffer);
135  }
136 
137  std::string* addString(const std::string& str) {
138  std::lock_guard<std::mutex> lock(state_mutex_);
139  strings_.emplace_back(str);
140  return &strings_.back();
141  }
142 
143  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
144  std::lock_guard<std::mutex> lock(state_mutex_);
145  arrays_.emplace_back(arr);
146  return &arrays_.back();
147  }
148 
// Returns the StringDictionaryProxy registered under dict_key in
// str_dict_proxy_owned_. The whole lookup runs under state_mutex_.
// The returned raw pointer refers to an object held by a shared_ptr in that map.
149  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
150  const shared::StringDictKey& dict_key,
151  const int64_t generation) {
152  std::lock_guard<std::mutex> lock(state_mutex_);
153  auto it = str_dict_proxy_owned_.find(dict_key);
154  if (it != str_dict_proxy_owned_.end()) {
// Already registered: the existing proxy must wrap the same dictionary object;
// only its generation is updated before it is handed back.
155  CHECK_EQ(it->second->getDictionary(), str_dict.get());
156  it->second->updateGeneration(generation);
157  return it->second.get();
158  }
// NOTE(review): listing line 159 is absent from this copy. The `.emplace(...).first`
// chain below presumably continues `it = str_dict_proxy_owned_` - confirm against
// the original header.
160  .emplace(
161  dict_key,
162  std::make_shared<StringDictionaryProxy>(str_dict, dict_key, generation))
163  .first;
164  return it->second.get();
165  }
166 
// NOTE(review): the signature line of this function (listing line 167) is missing
// from this copy; judging by the parameters it is presumably the two-argument
// generate_translation_map_key overload - confirm.
// Formats the source dictionary key and the string-op list into one string via
// operator<< on an ostringstream and returns it.
168  const shared::StringDictKey& source_proxy_dict_key,
169  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
170  std::ostringstream oss;
171  oss << "{source_dict_key: " << source_proxy_dict_key
172  << " StringOps: " << string_op_infos << "}";
173  return oss.str();
174  }
175 
// NOTE(review): the signature line of this function (listing line 176) is missing
// from this copy; judging by the parameters it is presumably the three-argument
// generate_translation_map_key overload that getStringOps() calls - confirm.
// Formats the source key, the destination key and the string-op list into one
// string via operator<< on an ostringstream and returns it.
177  const shared::StringDictKey& source_proxy_dict_key,
178  const shared::StringDictKey& dest_proxy_dict_key,
179  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
180  std::ostringstream oss;
181  oss << "{source_dict_key: " << source_proxy_dict_key
182  << ", dest_dict_key: " << dest_proxy_dict_key << " StringOps: " << string_op_infos
183  << "}";
184  return oss.str();
185  }
186 
// NOTE(review): this copy is missing listing lines 187 (signature), 193, 197-198
// and 200, so only part of the body is visible. Presumably this is
// addStringProxyIntersectionTranslationMap - confirm against the original header.
// Visible behavior: under state_mutex_, a key is built from the dictionaries and
// string ops, looked up in str_proxy_intersection_translation_maps_owned_, and the
// address of the mapped value is returned.
188  const StringDictionaryProxy* source_proxy,
189  const StringDictionaryProxy* dest_proxy,
190  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
191  std::lock_guard<std::mutex> lock(state_mutex_);
192  const auto map_key =
194  dest_proxy->getDictionary()->getDictKey(),
195  string_op_infos);
196  auto it = str_proxy_intersection_translation_maps_owned_.find(map_key);
// NOTE(review): the `if (it == ...end()) {` guard and the start of the emplace
// expression are presumably on the missing lines; the closing brace at listing
// line 203 matches such a guard - confirm.
199  .emplace(map_key,
201  dest_proxy, string_op_infos))
202  .first;
203  }
204  return &it->second;
205  }
206 
// NOTE(review): the signature line (listing line 207) is missing from this copy;
// presumably this is addStringProxyNumericTranslationMap - confirm.
// Visible behavior: builds a key from the source dictionary key and the string ops,
// probes str_proxy_numeric_translation_maps_owned_ with lower_bound(), and returns
// the address of the mapped value.
208  const StringDictionaryProxy* source_proxy,
209  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
// NOTE(review): no std::lock_guard on state_mutex_ is taken in the visible lines,
// unlike the other map-mutating methods in this file - confirm that callers
// serialize access.
210  const auto map_key = generate_translation_map_key(
211  source_proxy->getDictionary()->getDictKey(), string_op_infos);
212  auto it = str_proxy_numeric_translation_maps_owned_.lower_bound(map_key);
// NOTE(review): lower_bound() returns end() when the map is empty or every stored
// key orders before map_key; `it->first` is read here without an end() check.
// Looks like the condition needs `it == ...end() ||` in front - confirm.
213  if (it->first != map_key) {
// NOTE(review): listing line 214 is missing; the (it, map_key, value) argument list
// suggests a hinted emplace whose result is assigned back to `it` - confirm.
215  it, map_key, source_proxy->buildNumericTranslationMap(string_op_infos));
216  }
217  return &it->second;
218  }
219 
// NOTE(review): this copy is missing listing lines 220 (signature), 226, 231 and
// 233. Presumably this is addStringProxyUnionTranslationMap - confirm against the
// original header.
// Visible behavior: under state_mutex_, a key is built from the dictionaries and
// string ops; if str_proxy_union_translation_maps_owned_ has no entry for it, one is
// emplaced; the address of the mapped value is returned.
// dest_proxy is taken as non-const here.
221  const StringDictionaryProxy* source_proxy,
222  StringDictionaryProxy* dest_proxy,
223  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
224  std::lock_guard<std::mutex> lock(state_mutex_);
225  const auto map_key =
227  dest_proxy->getDictionary()->getDictKey(),
228  string_op_infos);
229  auto it = str_proxy_union_translation_maps_owned_.find(map_key);
230  if (it == str_proxy_union_translation_maps_owned_.end()) {
// Miss: insert a newly built map and keep the iterator to it (`.first` of the
// emplace result). The start of this expression is on a missing line.
232  .emplace(map_key,
234  dest_proxy, string_op_infos))
235  .first;
236  }
237  return &it->second;
238  }
239 
240  const StringOps_Namespace::StringOps* getStringOps(
241  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
242  std::lock_guard<std::mutex> lock(state_mutex_);
243  const auto map_key = generate_translation_map_key({}, {}, string_op_infos);
244  auto it = string_ops_owned_.find(map_key);
245  if (it == string_ops_owned_.end()) {
246  it = string_ops_owned_
247  .emplace(map_key,
248  std::make_shared<StringOps_Namespace::StringOps>(string_op_infos))
249  .first;
250  }
251  return it->second.get();
252  }
253 
255  std::lock_guard<std::mutex> lock(state_mutex_);
256  auto it = str_dict_proxy_owned_.find(dict_key);
257  CHECK(it != str_dict_proxy_owned_.end());
258  return it->second.get();
259  }
260 
262  const bool with_generation);
263 
265  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
266  std::lock_guard<std::mutex> lock(state_mutex_);
267  lit_str_dict_proxy_ = lit_str_dict_proxy;
268  }
269 
271  std::lock_guard<std::mutex> lock(state_mutex_);
272  return lit_str_dict_proxy_.get();
273  }
274 
276  const shared::StringDictKey& source_dict_id_in,
277  const shared::StringDictKey& dest_dict_id_in,
278  const bool with_generation,
279  const StringTranslationType translation_map_type,
280  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos);
281 
284  const shared::StringDictKey& source_dict_id_in,
285  const bool with_generation,
286  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos);
287 
288  void addColBuffer(const void* col_buffer) {
289  std::lock_guard<std::mutex> lock(state_mutex_);
290  col_buffers_.push_back(const_cast<void*>(col_buffer));
291  }
292 
// NOTE(review): the signature line (listing line 293) is missing from this copy;
// given the "Destruct RowSetMemoryOwner" log text this is presumably the body of
// the destructor - confirm.
// Logs per-allocator usage, then releases the resources tracked in the raw-pointer
// containers.
294  std::ostringstream oss;
295  oss << "Destruct RowSetMemoryOwner attached to Executor-" << executor_id_ << "{\t";
296  int allocator_id = 0;
297  for (auto const& allocator : allocators_) {
298  auto const usedBytes = allocator->bytesUsed();
// Only allocators that actually handed out memory are listed in the log line.
299  if (usedBytes > 0) {
300  oss << "allocator-" << allocator_id << ", byteUsed: " << usedBytes << "/"
301  << allocator->totalBytes() << "\t";
302  }
303  ++allocator_id;
304  }
305  oss << "}";
306  VLOG(2) << oss.str();
// Count-distinct sets are stored as raw pointers and released with delete.
307  for (auto count_distinct_set : count_distinct_sets_) {
308  delete count_distinct_set;
309  }
// Varlen buffers are released with free(); presumably they were obtained from a
// malloc-family call - confirm at the addVarlenBuffer() call sites.
310  for (auto varlen_buffer : varlen_buffers_) {
311  free(varlen_buffer);
312  }
// Varlen input buffers are only unpinned here, not freed.
313  for (auto varlen_input_buffer : varlen_input_buffers_) {
314  CHECK(varlen_input_buffer);
315  varlen_input_buffer->unPin();
316  }
// Column buffers registered through addColBuffer() are released with free().
317  for (auto col_buffer : col_buffers_) {
318  free(col_buffer);
319  }
320  }
321 
322  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
323  auto rtn = std::make_shared<RowSetMemoryOwner>(
324  arena_block_size_, executor_id_, /*num_kernels=*/1);
325  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
326  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
327  return rtn;
328  }
329 
331  string_dictionary_generations_ = generations;
332  }
333 
336  }
337 
// Declaration only; the body is not defined in this header.
// Takes a quantile value q and returns a pointer to a quantile::TDigest.
338  quantile::TDigest* nullTDigest(double const q);
339 
340  //
341  // key/value store for table function intercommunication
342  //
343 
344  void setTableFunctionMetadata(const char* key,
345  const uint8_t* raw_data,
346  const size_t num_bytes,
347  const TableFunctionMetadataType value_type) {
348  MetadataValue metadata_value(num_bytes, value_type);
349  std::memcpy(metadata_value.first.data(), raw_data, num_bytes);
350  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
351  table_function_metadata_store_[key] = std::move(metadata_value);
352  }
353 
354  void getTableFunctionMetadata(const char* key,
355  const uint8_t*& raw_data,
356  size_t& num_bytes,
357  TableFunctionMetadataType& value_type) const {
358  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
359  auto const itr = table_function_metadata_store_.find(key);
360  if (itr == table_function_metadata_store_.end()) {
361  throw std::runtime_error("Failed to find Table Function Metadata with key '" +
362  std::string(key) + "'");
363  }
364  raw_data = itr->second.first.data();
365  num_bytes = itr->second.first.size();
366  value_type = itr->second.second;
367  }
368 
370  std::lock_guard<std::mutex> lock(state_mutex_);
371  return &mode_maps_.emplace_back();
372  }
373 
374  private:
376  int8_t* ptr;
377  const size_t size;
378  const bool physical_buffer;
379  };
380 
// Data members. NOTE(review): several declarator lines (listing lines 388, 390,
// 392, 394, 396, 401 and 412) are missing from this copy, so some declarations
// below show only their type.
381  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
// Raw pointers; each one is deleted in the destructor body visible in this file.
382  std::vector<CountDistinctSet*> count_distinct_sets_;
// One slot per allocator index, filled lazily by allocateCachedGroupByBuffer().
383  std::vector<int64_t*> non_owned_group_by_buffers_;
// Released with free() in the destructor body visible in this file.
384  std::vector<void*> varlen_buffers_;
// std::list is used for strings_ and arrays_; addString()/addArray() return
// pointers to the stored elements.
385  std::list<std::string> strings_;
386  std::list<std::vector<int64_t>> arrays_;
// NOTE(review): the member names for the next four declarations are on missing
// lines. Presumably, in order: str_dict_proxy_owned_,
// str_proxy_intersection_translation_maps_owned_,
// str_proxy_union_translation_maps_owned_ and
// str_proxy_numeric_translation_maps_owned_ - confirm against the original header.
387  std::unordered_map<shared::StringDictKey, std::shared_ptr<StringDictionaryProxy>>
389  std::map<std::string, StringDictionaryProxy::IdMap>
391  std::map<std::string, StringDictionaryProxy::IdMap>
393  std::map<std::string, StringDictionaryProxy::TranslationMap<Datum>>
395  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
// Released with free() in the destructor body visible in this file.
397  std::vector<void*> col_buffers_;
// Only unPin()'d (not freed) in the destructor body visible in this file.
398  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
399  std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
// NOTE(review): member name is on a missing line; presumably string_ops_owned_,
// the map used by getStringOps() - confirm.
400  std::map<std::string, std::shared_ptr<StringOps_Namespace::StringOps>>
402  std::list<AggMode> mode_maps_;
403 
404  size_t arena_block_size_; // for cloning
// One arena per allocator index; populated by the constructor.
405  std::vector<std::unique_ptr<Arena>> allocators_;
406  size_t executor_id_;
407 
// Locked by the add*/get*/allocate* methods in this class that touch the
// containers above.
408  mutable std::mutex state_mutex_;
409 
// Key/value store used by setTableFunctionMetadata()/getTableFunctionMetadata():
// raw payload bytes plus a type tag.
410  using MetadataValue = std::pair<std::vector<uint8_t>, TableFunctionMetadataType>;
411  std::map<std::string, MetadataValue> table_function_metadata_store_;
// NOTE(review): listing line 412 is missing; presumably it declares
// table_function_metadata_store_mutex_, which the two metadata accessors lock -
// confirm.
413 
413 
414  friend class ResultSet;
415  friend class QueryExecutionContext;
416 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:301
robin_hood::unordered_set< int64_t > CountDistinctSet
Definition: CountDistinct.h:35
std::vector< std::unique_ptr< Arena > > allocators_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes, const size_t thread_idx=0)
std::list< std::vector< int64_t > > arrays_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
const shared::StringDictKey & getDictKey() const noexcept
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const shared::StringDictKey &source_dict_id_in, const shared::StringDictKey &dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
Definition: Execute.cpp:648
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
RowSetMemoryOwner(const size_t arena_block_size, const size_t executor_id, const size_t num_kernel_threads=0)
std::map< std::string, StringDictionaryProxy::TranslationMap< Datum > > str_proxy_numeric_translation_maps_owned_
const StringDictionaryProxy::TranslationMap< Datum > * addStringProxyNumericTranslationMap(const StringDictionaryProxy *source_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::list< std::string > strings_
Calculate approximate median and general quantiles, based on "Computing Extremely Accurate Quantiles ...
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
int8_t * allocate(const size_t num_bytes, const size_t thread_idx=0) override
std::vector< CountDistinctSet * > count_distinct_sets_
StringDictionary * getDictionary() const noexcept
std::pair< std::vector< uint8_t >, TableFunctionMetadataType > MetadataValue
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:673
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getStringDictProxy(const shared::StringDictKey &dict_key) const
StringDictionaryProxy * getLiteralStringDictProxy() const
std::mutex table_function_metadata_store_mutex_
std::pair< int64_t *, bool > allocateCachedGroupByBuffer(const size_t num_bytes, const size_t thread_idx)
TranslationMap< Datum > buildNumericTranslationMap(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
std::map< std::string, MetadataValue > table_function_metadata_store_
std::list< AggMode > mode_maps_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const shared::StringDictKey &dict_key, const int64_t generation)
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_intersection_translation_maps_owned_
void setDictionaryGenerations(StringDictionaryGenerations generations)
const StringDictionaryProxy::TranslationMap< Datum > * getOrAddStringProxyNumericTranslationMap(const shared::StringDictKey &source_dict_id_in, const bool with_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
Definition: Execute.cpp:665
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_union_translation_maps_owned_
std::map< std::string, std::shared_ptr< StringOps_Namespace::StringOps > > string_ops_owned_
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
Calculate statistical mode as an aggregate function.
std::vector< void * > varlen_buffers_
std::string generate_translation_map_key(const shared::StringDictKey &source_proxy_dict_key, const shared::StringDictKey &dest_proxy_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::unordered_map< shared::StringDictKey, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryGenerations string_dictionary_generations_
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
AggMode * allocateMode()
TableFunctionMetadataType
const StringOps_Namespace::StringOps * getStringOps(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
#define CHECK_LT(x, y)
Definition: Logger.h:303
void setTableFunctionMetadata(const char *key, const uint8_t *raw_data, const size_t num_bytes, const TableFunctionMetadataType value_type)
void addVarlenBuffer(void *varlen_buffer)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
Functions used to work with (approximate) count distinct sets.
void addCountDistinctSet(CountDistinctSet *count_distinct_set)
void clearNonOwnedGroupByBuffers()
void getTableFunctionMetadata(const char *key, const uint8_t *&raw_data, size_t &num_bytes, TableFunctionMetadataType &value_type) const
std::vector< int64_t * > non_owned_group_by_buffers_
const StringDictionaryProxy::IdMap * addStringProxyUnionTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
#define CHECK(condition)
Definition: Logger.h:291
std::string * addString(const std::string &str)
const StringDictionaryProxy::IdMap * addStringProxyIntersectionTranslationMap(const StringDictionaryProxy *source_proxy, const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
void addColBuffer(const void *col_buffer)
std::string generate_translation_map_key(const shared::StringDictKey &source_proxy_dict_key, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const unsigned block_size, const unsigned grid_size)
Definition: ResultSet.cpp:64
#define VLOG(n)
Definition: Logger.h:388
friend class ResultSet
StringDictionaryProxy * getOrAddStringDictProxy(const shared::StringDictKey &dict_key, const bool with_generation)
Definition: Execute.cpp:572