OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "DataMgr/AbstractBuffer.h"
29 #include "DataMgr/DataMgr.h"
30 #include "Logger/Logger.h"
34 #include "Shared/quantile.h"
36 #include "StringOps/StringOps.h"
37 
38 namespace Catalog_Namespace {
39 class Catalog;
40 }
41 
42 class ResultSet;
43 
48 class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
49  public:
50  RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads = 0)
51  : arena_block_size_(arena_block_size) {
52  for (size_t i = 0; i < num_kernel_threads + 1; i++) {
53  allocators_.emplace_back(std::make_unique<DramArena>(arena_block_size));
54  }
55  CHECK(!allocators_.empty());
56  }
57 
59 
60  int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) override {
61  CHECK_LT(thread_idx, allocators_.size());
62  auto allocator = allocators_[thread_idx].get();
63  std::lock_guard<std::mutex> lock(state_mutex_);
64  return reinterpret_cast<int8_t*>(allocator->allocate(num_bytes));
65  }
66 
67  int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
68  const size_t thread_idx = 0) {
69  int8_t* buffer = allocate(num_bytes, thread_idx);
70  std::memset(buffer, 0, num_bytes);
71  addCountDistinctBuffer(buffer, num_bytes, /*physical_buffer=*/true);
72  return buffer;
73  }
74 
75  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
76  const size_t bytes,
77  const bool physical_buffer) {
78  std::lock_guard<std::mutex> lock(state_mutex_);
79  count_distinct_bitmaps_.emplace_back(
80  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
81  }
82 
83  void addCountDistinctSet(CountDistinctSet* count_distinct_set) {
84  std::lock_guard<std::mutex> lock(state_mutex_);
85  count_distinct_sets_.push_back(count_distinct_set);
86  }
87 
88  void addGroupByBuffer(int64_t* group_by_buffer) {
89  std::lock_guard<std::mutex> lock(state_mutex_);
90  group_by_buffers_.push_back(group_by_buffer);
91  }
92 
93  void addVarlenBuffer(void* varlen_buffer) {
94  std::lock_guard<std::mutex> lock(state_mutex_);
95  if (std::find(varlen_buffers_.begin(), varlen_buffers_.end(), varlen_buffer) ==
96  varlen_buffers_.end()) {
97  varlen_buffers_.push_back(varlen_buffer);
98  }
99  }
100 
107  std::lock_guard<std::mutex> lock(state_mutex_);
109  varlen_input_buffers_.push_back(buffer);
110  }
111 
112  std::string* addString(const std::string& str) {
113  std::lock_guard<std::mutex> lock(state_mutex_);
114  strings_.emplace_back(str);
115  return &strings_.back();
116  }
117 
118  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
119  std::lock_guard<std::mutex> lock(state_mutex_);
120  arrays_.emplace_back(arr);
121  return &arrays_.back();
122  }
123 
124  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
125  const int dict_id,
126  const int64_t generation) {
127  std::lock_guard<std::mutex> lock(state_mutex_);
128  auto it = str_dict_proxy_owned_.find(dict_id);
129  if (it != str_dict_proxy_owned_.end()) {
130  CHECK_EQ(it->second->getDictionary(), str_dict.get());
131  it->second->updateGeneration(generation);
132  return it->second.get();
133  }
135  .emplace(
136  dict_id,
137  std::make_shared<StringDictionaryProxy>(str_dict, dict_id, generation))
138  .first;
139  return it->second.get();
140  }
141 
143  const int32_t source_proxy_id,
144  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
145  std::ostringstream oss;
146  oss << "{source_dict_id: " << source_proxy_id << " StringOps: " << string_op_infos
147  << "}";
148  return oss.str();
149  }
150 
152  const int32_t source_proxy_id,
153  const int32_t dest_proxy_id,
154  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
155  std::ostringstream oss;
156  oss << "{source_dict_id: " << source_proxy_id << ", dest_dict_id: " << dest_proxy_id
157  << " StringOps: " << string_op_infos << "}";
158  return oss.str();
159  }
160 
162  const StringDictionaryProxy* source_proxy,
163  const StringDictionaryProxy* dest_proxy,
164  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
165  std::lock_guard<std::mutex> lock(state_mutex_);
166  const auto map_key =
168  dest_proxy->getDictionary()->getDictId(),
169  string_op_infos);
170  auto it = str_proxy_intersection_translation_maps_owned_.find(map_key);
173  .emplace(map_key,
175  dest_proxy, string_op_infos))
176  .first;
177  }
178  return &it->second;
179  }
180 
182  const StringDictionaryProxy* source_proxy,
183  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
184  const auto map_key = generate_translation_map_key(
185  source_proxy->getDictionary()->getDictId(), string_op_infos);
186  auto it = str_proxy_numeric_translation_maps_owned_.lower_bound(map_key);
187  if (it->first != map_key) {
189  it, map_key, source_proxy->buildNumericTranslationMap(string_op_infos));
190  }
191  return &it->second;
192  }
193 
195  const StringDictionaryProxy* source_proxy,
196  StringDictionaryProxy* dest_proxy,
197  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
198  std::lock_guard<std::mutex> lock(state_mutex_);
199  const auto map_key =
201  dest_proxy->getDictionary()->getDictId(),
202  string_op_infos);
203  auto it = str_proxy_union_translation_maps_owned_.find(map_key);
204  if (it == str_proxy_union_translation_maps_owned_.end()) {
206  .emplace(map_key,
208  dest_proxy, string_op_infos))
209  .first;
210  }
211  return &it->second;
212  }
213 
214  const StringOps_Namespace::StringOps* getStringOps(
215  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
216  std::lock_guard<std::mutex> lock(state_mutex_);
217  const auto map_key = generate_translation_map_key(0, 0, string_op_infos);
218  auto it = string_ops_owned_.find(map_key);
219  if (it == string_ops_owned_.end()) {
220  it = string_ops_owned_
221  .emplace(map_key,
222  std::make_shared<StringOps_Namespace::StringOps>(string_op_infos))
223  .first;
224  }
225  return it->second.get();
226  }
227 
228  StringDictionaryProxy* getStringDictProxy(const int dict_id) const {
229  std::lock_guard<std::mutex> lock(state_mutex_);
230  auto it = str_dict_proxy_owned_.find(dict_id);
231  CHECK(it != str_dict_proxy_owned_.end());
232  return it->second.get();
233  }
234 
236  const int dict_id_in,
237  const bool with_generation,
238  const Catalog_Namespace::Catalog* catalog);
239 
241  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
242  std::lock_guard<std::mutex> lock(state_mutex_);
243  lit_str_dict_proxy_ = lit_str_dict_proxy;
244  }
245 
247  std::lock_guard<std::mutex> lock(state_mutex_);
248  return lit_str_dict_proxy_.get();
249  }
250 
252  const int source_dict_id_in,
253  const int dest_dict_id_in,
254  const bool with_generation,
255  const StringTranslationType translation_map_type,
256  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
257  const Catalog_Namespace::Catalog* catalog);
258 
261  const int source_dict_id_in,
262  const bool with_generation,
263  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
264  const Catalog_Namespace::Catalog* catalog);
265 
266  void addColBuffer(const void* col_buffer) {
267  std::lock_guard<std::mutex> lock(state_mutex_);
268  col_buffers_.push_back(const_cast<void*>(col_buffer));
269  }
270 
272  for (auto count_distinct_set : count_distinct_sets_) {
273  delete count_distinct_set;
274  }
275  for (auto group_by_buffer : group_by_buffers_) {
276  free(group_by_buffer);
277  }
278  for (auto varlen_buffer : varlen_buffers_) {
279  free(varlen_buffer);
280  }
281  for (auto varlen_input_buffer : varlen_input_buffers_) {
282  CHECK(varlen_input_buffer);
283  varlen_input_buffer->unPin();
284  }
285  for (auto col_buffer : col_buffers_) {
286  free(col_buffer);
287  }
288  }
289 
290  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
291  auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_, /*num_kernels=*/1);
292  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
293  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
294  return rtn;
295  }
296 
298  string_dictionary_generations_ = generations;
299  }
300 
303  }
304 
305  quantile::TDigest* nullTDigest(double const q);
306 
307  //
308  // key/value store for table function intercommunication
309  //
310 
311  void setTableFunctionMetadata(const char* key,
312  const uint8_t* raw_data,
313  const size_t num_bytes,
314  const TableFunctionMetadataType value_type) {
315  MetadataValue metadata_value(num_bytes, value_type);
316  std::memcpy(metadata_value.first.data(), raw_data, num_bytes);
317  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
318  table_function_metadata_store_[key] = std::move(metadata_value);
319  }
320 
321  void getTableFunctionMetadata(const char* key,
322  const uint8_t*& raw_data,
323  size_t& num_bytes,
324  TableFunctionMetadataType& value_type) const {
325  std::lock_guard<std::mutex> lock(table_function_metadata_store_mutex_);
326  auto const itr = table_function_metadata_store_.find(key);
327  if (itr == table_function_metadata_store_.end()) {
328  throw std::runtime_error("Failed to find Table Function Metadata with key '" +
329  std::string(key) + "'");
330  }
331  raw_data = itr->second.first.data();
332  num_bytes = itr->second.first.size();
333  value_type = itr->second.second;
334  }
335 
336  private:
338  int8_t* ptr;
339  const size_t size;
340  const bool physical_buffer;
341  };
342 
343  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
344  std::vector<CountDistinctSet*> count_distinct_sets_;
345  std::vector<int64_t*> group_by_buffers_;
346  std::vector<void*> varlen_buffers_;
347  std::list<std::string> strings_;
348  std::list<std::vector<int64_t>> arrays_;
349  std::unordered_map<int, std::shared_ptr<StringDictionaryProxy>> str_dict_proxy_owned_;
350  std::map<std::string, StringDictionaryProxy::IdMap>
352  std::map<std::string, StringDictionaryProxy::IdMap>
354  std::map<std::string, StringDictionaryProxy::TranslationMap<Datum>>
356  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
358  std::vector<void*> col_buffers_;
359  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
360  std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
361  std::map<std::string, std::shared_ptr<StringOps_Namespace::StringOps>>
363 
364  size_t arena_block_size_; // for cloning
365  std::vector<std::unique_ptr<Arena>> allocators_;
366 
367  mutable std::mutex state_mutex_;
368 
369  using MetadataValue = std::pair<std::vector<uint8_t>, TableFunctionMetadataType>;
370  std::map<std::string, MetadataValue> table_function_metadata_store_;
372 
373  friend class ResultSet;
374  friend class QueryExecutionContext;
375 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:230
robin_hood::unordered_set< int64_t > CountDistinctSet
Definition: CountDistinct.h:35
std::vector< std::unique_ptr< Arena > > allocators_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes, const size_t thread_idx=0)
std::list< std::vector< int64_t > > arrays_
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const int source_dict_id_in, const int dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:617
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:132
std::unordered_map< int, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::map< std::string, StringDictionaryProxy::TranslationMap< Datum > > str_proxy_numeric_translation_maps_owned_
std::string generate_translation_map_key(const int32_t source_proxy_id, const int32_t dest_proxy_id, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
const StringDictionaryProxy::TranslationMap< Datum > * addStringProxyNumericTranslationMap(const StringDictionaryProxy *source_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::list< std::string > strings_
Calculate approximate median and general quantiles, based on "Computing Extremely Accurate Quantiles ...
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
int8_t * allocate(const size_t num_bytes, const size_t thread_idx=0) override
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const Catalog_Namespace::Catalog *catalog, const unsigned block_size, const unsigned grid_size)
Definition: ResultSet.cpp:62
std::vector< CountDistinctSet * > count_distinct_sets_
StringDictionary * getDictionary() const noexcept
std::pair< std::vector< uint8_t >, TableFunctionMetadataType > MetadataValue
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:646
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getLiteralStringDictProxy() const
std::mutex table_function_metadata_store_mutex_
TranslationMap< Datum > buildNumericTranslationMap(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
std::map< std::string, MetadataValue > table_function_metadata_store_
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_intersection_translation_maps_owned_
void setDictionaryGenerations(StringDictionaryGenerations generations)
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_union_translation_maps_owned_
std::map< std::string, std::shared_ptr< StringOps_Namespace::StringOps > > string_ops_owned_
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:542
std::vector< void * > varlen_buffers_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads=0)
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryGenerations string_dictionary_generations_
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
TableFunctionMetadataType
const StringOps_Namespace::StringOps * getStringOps(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
#define CHECK_LT(x, y)
Definition: Logger.h:232
void setTableFunctionMetadata(const char *key, const uint8_t *raw_data, const size_t num_bytes, const TableFunctionMetadataType value_type)
std::string generate_translation_map_key(const int32_t source_proxy_id, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
int32_t getDictId() const noexcept
void addVarlenBuffer(void *varlen_buffer)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
StringDictionaryProxy * getStringDictProxy(const int dict_id) const
Functions used to work with (approximate) count distinct sets.
void addCountDistinctSet(CountDistinctSet *count_distinct_set)
void getTableFunctionMetadata(const char *key, const uint8_t *&raw_data, size_t &num_bytes, TableFunctionMetadataType &value_type) const
void addGroupByBuffer(int64_t *group_by_buffer)
const StringDictionaryProxy::IdMap * addStringProxyUnionTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
const StringDictionaryProxy::TranslationMap< Datum > * getOrAddStringProxyNumericTranslationMap(const int source_dict_id_in, const bool with_generation, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:636
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
#define CHECK(condition)
Definition: Logger.h:222
std::string * addString(const std::string &str)
const StringDictionaryProxy::IdMap * addStringProxyIntersectionTranslationMap(const StringDictionaryProxy *source_proxy, const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_
friend class ResultSet