OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <string>
25 #include <unordered_map>
26 #include <vector>
27 
28 #include "DataMgr/AbstractBuffer.h"
30 #include "DataMgr/DataMgr.h"
31 #include "Logger/Logger.h"
33 #include "Shared/quantile.h"
35 
// Forward declarations. Below, Catalog is only used through a const pointer
// and ResultSet only as a friend, so their full definitions are not needed.
namespace Catalog_Namespace {
class Catalog;
}  // namespace Catalog_Namespace

class ResultSet;
41 
46 class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
47  public:
48  RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads = 0)
49  : arena_block_size_(arena_block_size) {
50  for (size_t i = 0; i < num_kernel_threads + 1; i++) {
51  allocators_.emplace_back(std::make_unique<DramArena>(arena_block_size));
52  }
53  CHECK(!allocators_.empty());
54  }
55 
56  int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) override {
57  CHECK_LT(thread_idx, allocators_.size());
58  auto allocator = allocators_[thread_idx].get();
59  std::lock_guard<std::mutex> lock(state_mutex_);
60  return reinterpret_cast<int8_t*>(allocator->allocate(num_bytes));
61  }
62 
63  int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
64  const size_t thread_idx = 0) {
65  int8_t* buffer = allocate(num_bytes, thread_idx);
66  std::memset(buffer, 0, num_bytes);
67  addCountDistinctBuffer(buffer, num_bytes, /*physical_buffer=*/true);
68  return buffer;
69  }
70 
71  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
72  const size_t bytes,
73  const bool physical_buffer) {
74  std::lock_guard<std::mutex> lock(state_mutex_);
75  count_distinct_bitmaps_.emplace_back(
76  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
77  }
78 
79  void addCountDistinctSet(std::set<int64_t>* count_distinct_set) {
80  std::lock_guard<std::mutex> lock(state_mutex_);
81  count_distinct_sets_.push_back(count_distinct_set);
82  }
83 
84  void addGroupByBuffer(int64_t* group_by_buffer) {
85  std::lock_guard<std::mutex> lock(state_mutex_);
86  group_by_buffers_.push_back(group_by_buffer);
87  }
88 
89  void addVarlenBuffer(void* varlen_buffer) {
90  std::lock_guard<std::mutex> lock(state_mutex_);
91  varlen_buffers_.push_back(varlen_buffer);
92  }
93 
100  std::lock_guard<std::mutex> lock(state_mutex_);
102  varlen_input_buffers_.push_back(buffer);
103  }
104 
105  std::string* addString(const std::string& str) {
106  std::lock_guard<std::mutex> lock(state_mutex_);
107  strings_.emplace_back(str);
108  return &strings_.back();
109  }
110 
111  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
112  std::lock_guard<std::mutex> lock(state_mutex_);
113  arrays_.emplace_back(arr);
114  return &arrays_.back();
115  }
116 
117  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
118  const int dict_id,
119  const int64_t generation) {
120  std::lock_guard<std::mutex> lock(state_mutex_);
121  auto it = str_dict_proxy_owned_.find(dict_id);
122  if (it != str_dict_proxy_owned_.end()) {
123  CHECK_EQ(it->second->getDictionary(), str_dict.get());
124  it->second->updateGeneration(generation);
125  return it->second.get();
126  }
128  .emplace(
129  dict_id,
130  std::make_shared<StringDictionaryProxy>(str_dict, dict_id, generation))
131  .first;
132  return it->second.get();
133  }
134 
135  StringDictionaryProxy* getStringDictProxy(const int dict_id) const {
136  std::lock_guard<std::mutex> lock(state_mutex_);
137  auto it = str_dict_proxy_owned_.find(dict_id);
138  CHECK(it != str_dict_proxy_owned_.end());
139  return it->second.get();
140  }
141 
143  const int dict_id_in,
144  const bool with_generation,
145  const Catalog_Namespace::Catalog* catalog);
146 
148  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
149  std::lock_guard<std::mutex> lock(state_mutex_);
150  lit_str_dict_proxy_ = lit_str_dict_proxy;
151  }
152 
154  std::lock_guard<std::mutex> lock(state_mutex_);
155  return lit_str_dict_proxy_.get();
156  }
157 
158  void addColBuffer(const void* col_buffer) {
159  std::lock_guard<std::mutex> lock(state_mutex_);
160  col_buffers_.push_back(const_cast<void*>(col_buffer));
161  }
162 
164  for (auto count_distinct_set : count_distinct_sets_) {
165  delete count_distinct_set;
166  }
167  for (auto group_by_buffer : group_by_buffers_) {
168  free(group_by_buffer);
169  }
170  for (auto varlen_buffer : varlen_buffers_) {
171  free(varlen_buffer);
172  }
173  for (auto varlen_input_buffer : varlen_input_buffers_) {
174  CHECK(varlen_input_buffer);
175  varlen_input_buffer->unPin();
176  }
177  for (auto col_buffer : col_buffers_) {
178  free(col_buffer);
179  }
180  }
181 
182  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
183  auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_, /*num_kernels=*/1);
184  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
185  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
186  return rtn;
187  }
188 
190  string_dictionary_generations_ = generations;
191  }
192 
195  }
196 
197  quantile::TDigest* nullTDigest(double const q);
198 
199  private:
201  int8_t* ptr;
202  const size_t size;
203  const bool physical_buffer;
204  };
205 
206  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
207  std::vector<std::set<int64_t>*> count_distinct_sets_;
208  std::vector<int64_t*> group_by_buffers_;
209  std::vector<void*> varlen_buffers_;
210  std::list<std::string> strings_;
211  std::list<std::vector<int64_t>> arrays_;
212  std::unordered_map<int, std::shared_ptr<StringDictionaryProxy>> str_dict_proxy_owned_;
213  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
215  std::vector<void*> col_buffers_;
216  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
217  std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
218 
219  size_t arena_block_size_; // for cloning
220  std::vector<std::unique_ptr<Arena>> allocators_;
221 
222  mutable std::mutex state_mutex_;
223 
224  friend class ResultSet;
225  friend class QueryExecutionContext;
226 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::vector< std::unique_ptr< Arena > > allocators_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes, const size_t thread_idx=0)
std::list< std::vector< int64_t > > arrays_
Class for a per-database catalog. Also includes metadata for the current database and the current user.
Definition: Catalog.h:111
std::unordered_map< int, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::list< std::string > strings_
void addCountDistinctSet(std::set< int64_t > *count_distinct_set)
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
int8_t * allocate(const size_t num_bytes, const size_t thread_idx=0) override
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const Catalog_Namespace::Catalog *catalog, const unsigned block_size, const unsigned grid_size)
Definition: ResultSet.cpp:57
quantile::TDigest * nullTDigest(double const q)
Definition: Execute.cpp:265
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getLiteralStringDictProxy() const
std::vector< std::set< int64_t > * > count_distinct_sets_
void setDictionaryGenerations(StringDictionaryGenerations generations)
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:241
std::vector< void * > varlen_buffers_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads=0)
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryGenerations string_dictionary_generations_
#define CHECK_LT(x, y)
Definition: Logger.h:219
void addVarlenBuffer(void *varlen_buffer)
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
StringDictionaryProxy * getStringDictProxy(const int dict_id) const
void addGroupByBuffer(int64_t *group_by_buffer)
#define CHECK(condition)
Definition: Logger.h:209
std::string * addString(const std::string &str)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_
friend class ResultSet