OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "DataMgr/AbstractBuffer.h"
29 #include "DataMgr/DataMgr.h"
30 #include "Logger/Logger.h"
33 #include "Shared/quantile.h"
35 #include "StringOps/StringOps.h"
36 
37 namespace Catalog_Namespace {
38 class Catalog;
39 }
40 
41 class ResultSet;
42 
47 class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
48  public:
49  RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads = 0)
50  : arena_block_size_(arena_block_size) {
51  for (size_t i = 0; i < num_kernel_threads + 1; i++) {
52  allocators_.emplace_back(std::make_unique<DramArena>(arena_block_size));
53  }
54  CHECK(!allocators_.empty());
55  }
56 
58 
59  int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) override {
60  CHECK_LT(thread_idx, allocators_.size());
61  auto allocator = allocators_[thread_idx].get();
62  std::lock_guard<std::mutex> lock(state_mutex_);
63  return reinterpret_cast<int8_t*>(allocator->allocate(num_bytes));
64  }
65 
66  int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
67  const size_t thread_idx = 0) {
68  int8_t* buffer = allocate(num_bytes, thread_idx);
69  std::memset(buffer, 0, num_bytes);
70  addCountDistinctBuffer(buffer, num_bytes, /*physical_buffer=*/true);
71  return buffer;
72  }
73 
74  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
75  const size_t bytes,
76  const bool physical_buffer) {
77  std::lock_guard<std::mutex> lock(state_mutex_);
78  count_distinct_bitmaps_.emplace_back(
79  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
80  }
81 
82  void addCountDistinctSet(CountDistinctSet* count_distinct_set) {
83  std::lock_guard<std::mutex> lock(state_mutex_);
84  count_distinct_sets_.push_back(count_distinct_set);
85  }
86 
87  void addGroupByBuffer(int64_t* group_by_buffer) {
88  std::lock_guard<std::mutex> lock(state_mutex_);
89  group_by_buffers_.push_back(group_by_buffer);
90  }
91 
92  void addVarlenBuffer(void* varlen_buffer) {
93  std::lock_guard<std::mutex> lock(state_mutex_);
94  if (std::find(varlen_buffers_.begin(), varlen_buffers_.end(), varlen_buffer) ==
95  varlen_buffers_.end()) {
96  varlen_buffers_.push_back(varlen_buffer);
97  }
98  }
99 
106  std::lock_guard<std::mutex> lock(state_mutex_);
108  varlen_input_buffers_.push_back(buffer);
109  }
110 
111  std::string* addString(const std::string& str) {
112  std::lock_guard<std::mutex> lock(state_mutex_);
113  strings_.emplace_back(str);
114  return &strings_.back();
115  }
116 
117  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
118  std::lock_guard<std::mutex> lock(state_mutex_);
119  arrays_.emplace_back(arr);
120  return &arrays_.back();
121  }
122 
123  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
124  const int dict_id,
125  const int64_t generation) {
126  std::lock_guard<std::mutex> lock(state_mutex_);
127  auto it = str_dict_proxy_owned_.find(dict_id);
128  if (it != str_dict_proxy_owned_.end()) {
129  CHECK_EQ(it->second->getDictionary(), str_dict.get());
130  it->second->updateGeneration(generation);
131  return it->second.get();
132  }
134  .emplace(
135  dict_id,
136  std::make_shared<StringDictionaryProxy>(str_dict, dict_id, generation))
137  .first;
138  return it->second.get();
139  }
140 
142  const int32_t source_proxy_id,
143  const int32_t dest_proxy_id,
144  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
145  std::ostringstream oss;
146  oss << "{source_dict_id: " << source_proxy_id << ", dest_dict_id: " << dest_proxy_id
147  << " StringOps: " << string_op_infos << "}";
148  // for (const auto& string_op_info : string_op_infos) {
149  // oss << "{" << string_op_info.toString() << "}";
150  //}
151  // oss << "]";
152  return oss.str();
153  }
154 
156  const StringDictionaryProxy* source_proxy,
157  const StringDictionaryProxy* dest_proxy,
158  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
159  std::lock_guard<std::mutex> lock(state_mutex_);
160  const auto map_key =
162  dest_proxy->getDictionary()->getDictId(),
163  string_op_infos);
164  auto it = str_proxy_intersection_translation_maps_owned_.find(map_key);
167  .emplace(map_key,
169  dest_proxy, string_op_infos))
170  .first;
171  }
172  return &it->second;
173  }
174 
176  const StringDictionaryProxy* source_proxy,
177  StringDictionaryProxy* dest_proxy,
178  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
179  std::lock_guard<std::mutex> lock(state_mutex_);
180  const auto map_key =
182  dest_proxy->getDictionary()->getDictId(),
183  string_op_infos);
184  auto it = str_proxy_union_translation_maps_owned_.find(map_key);
185  if (it == str_proxy_union_translation_maps_owned_.end()) {
187  .emplace(map_key,
189  dest_proxy, string_op_infos))
190  .first;
191  }
192  return &it->second;
193  }
194 
195  const StringOps_Namespace::StringOps* getStringOps(
196  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos) {
197  std::lock_guard<std::mutex> lock(state_mutex_);
198  const auto map_key = generate_translation_map_key(0, 0, string_op_infos);
199  auto it = string_ops_owned_.find(map_key);
200  if (it == string_ops_owned_.end()) {
201  it = string_ops_owned_
202  .emplace(map_key,
203  std::make_shared<StringOps_Namespace::StringOps>(string_op_infos))
204  .first;
205  }
206  return it->second.get();
207  }
208 
209  StringDictionaryProxy* getStringDictProxy(const int dict_id) const {
210  std::lock_guard<std::mutex> lock(state_mutex_);
211  auto it = str_dict_proxy_owned_.find(dict_id);
212  CHECK(it != str_dict_proxy_owned_.end());
213  return it->second.get();
214  }
215 
217  const int dict_id_in,
218  const bool with_generation,
219  const Catalog_Namespace::Catalog* catalog);
220 
222  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
223  std::lock_guard<std::mutex> lock(state_mutex_);
224  lit_str_dict_proxy_ = lit_str_dict_proxy;
225  }
226 
228  std::lock_guard<std::mutex> lock(state_mutex_);
229  return lit_str_dict_proxy_.get();
230  }
231 
233  const int source_dict_id_in,
234  const int dest_dict_id_in,
235  const bool with_generation,
236  const StringTranslationType translation_map_type,
237  const std::vector<StringOps_Namespace::StringOpInfo>& string_op_infos,
238  const Catalog_Namespace::Catalog* catalog);
239 
240  void addColBuffer(const void* col_buffer) {
241  std::lock_guard<std::mutex> lock(state_mutex_);
242  col_buffers_.push_back(const_cast<void*>(col_buffer));
243  }
244 
246  for (auto count_distinct_set : count_distinct_sets_) {
247  delete count_distinct_set;
248  }
249  for (auto group_by_buffer : group_by_buffers_) {
250  free(group_by_buffer);
251  }
252  for (auto varlen_buffer : varlen_buffers_) {
253  free(varlen_buffer);
254  }
255  for (auto varlen_input_buffer : varlen_input_buffers_) {
256  CHECK(varlen_input_buffer);
257  varlen_input_buffer->unPin();
258  }
259  for (auto col_buffer : col_buffers_) {
260  free(col_buffer);
261  }
262  }
263 
264  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
265  auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_, /*num_kernels=*/1);
266  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
267  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
268  return rtn;
269  }
270 
272  string_dictionary_generations_ = generations;
273  }
274 
277  }
278 
279  quantile::TDigest* nullTDigest(double const q);
280 
281  private:
283  int8_t* ptr;
284  const size_t size;
285  const bool physical_buffer;
286  };
287 
288  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
289  std::vector<CountDistinctSet*> count_distinct_sets_;
290  std::vector<int64_t*> group_by_buffers_;
291  std::vector<void*> varlen_buffers_;
292  std::list<std::string> strings_;
293  std::list<std::vector<int64_t>> arrays_;
294  std::unordered_map<int, std::shared_ptr<StringDictionaryProxy>> str_dict_proxy_owned_;
295  std::map<std::string, StringDictionaryProxy::IdMap>
297  std::map<std::string, StringDictionaryProxy::IdMap>
299  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
301  std::vector<void*> col_buffers_;
302  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
303  std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
304  std::map<std::string, std::shared_ptr<StringOps_Namespace::StringOps>>
306 
307  size_t arena_block_size_; // for cloning
308  std::vector<std::unique_ptr<Arena>> allocators_;
309 
310  mutable std::mutex state_mutex_;
311 
312  friend class ResultSet;
313  friend class QueryExecutionContext;
314 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:231
robin_hood::unordered_set< int64_t > CountDistinctSet
Definition: CountDistinct.h:37
std::vector< std::unique_ptr< Arena > > allocators_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes, const size_t thread_idx=0)
std::list< std::vector< int64_t > > arrays_
const StringDictionaryProxy::IdMap * getOrAddStringProxyTranslationMap(const int source_dict_id_in, const int dest_dict_id_in, const bool with_generation, const StringTranslationType translation_map_type, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:610
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:114
std::unordered_map< int, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::string generate_translation_map_key(const int32_t source_proxy_id, const int32_t dest_proxy_id, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
std::list< std::string > strings_
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
int8_t * allocate(const size_t num_bytes, const size_t thread_idx=0) override
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const Catalog_Namespace::Catalog *catalog, const unsigned block_size, const unsigned grid_size)
Definition: ResultSet.cpp:62
std::vector< CountDistinctSet * > count_distinct_sets_
StringDictionary * getDictionary() const noexcept
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:628
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getLiteralStringDictProxy() const
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_intersection_translation_maps_owned_
void setDictionaryGenerations(StringDictionaryGenerations generations)
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
std::map< std::string, StringDictionaryProxy::IdMap > str_proxy_union_translation_maps_owned_
std::map< std::string, std::shared_ptr< StringOps_Namespace::StringOps > > string_ops_owned_
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:548
std::vector< void * > varlen_buffers_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads=0)
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryGenerations string_dictionary_generations_
IdMap buildUnionTranslationMapToOtherProxy(StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_types) const
const StringOps_Namespace::StringOps * getStringOps(const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
#define CHECK_LT(x, y)
Definition: Logger.h:233
int32_t getDictId() const noexcept
void addVarlenBuffer(void *varlen_buffer)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
StringDictionaryProxy * getStringDictProxy(const int dict_id) const
Functions used to work with (approximate) count distinct sets.
void addCountDistinctSet(CountDistinctSet *count_distinct_set)
void addGroupByBuffer(int64_t *group_by_buffer)
const StringDictionaryProxy::IdMap * addStringProxyUnionTranslationMap(const StringDictionaryProxy *source_proxy, StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
IdMap buildIntersectionTranslationMapToOtherProxy(const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos) const
Builds a vectorized string_id translation map from this proxy to dest_proxy.
#define CHECK(condition)
Definition: Logger.h:223
std::string * addString(const std::string &str)
const StringDictionaryProxy::IdMap * addStringProxyIntersectionTranslationMap(const StringDictionaryProxy *source_proxy, const StringDictionaryProxy *dest_proxy, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_
friend class ResultSet