OmniSciDB  94e8789169
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <string>
25 #include <unordered_map>
26 #include <vector>
27 
28 #include "Catalog/Catalog.h"
29 #include "DataMgr/AbstractBuffer.h"
31 #include "DataMgr/DataMgr.h"
32 #include "Logger/Logger.h"
34 #include "Shared/quantile.h"
36 
37 class ResultSet;
38 
43 class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
44  public:
45  RowSetMemoryOwner(const size_t arena_block_size)
46  : arena_block_size_(arena_block_size)
47  , allocator_(std::make_unique<Arena>(arena_block_size)) {}
48 
49  int8_t* allocate(const size_t num_bytes) override {
51  std::lock_guard<std::mutex> lock(state_mutex_);
52  return reinterpret_cast<int8_t*>(allocator_->allocate(num_bytes));
53  }
54 
55  int8_t* allocateCountDistinctBuffer(const size_t num_bytes) {
57  std::lock_guard<std::mutex> lock(state_mutex_);
58  auto ret = reinterpret_cast<int8_t*>(allocator_->allocateAndZero(num_bytes));
59  count_distinct_bitmaps_.emplace_back(
60  CountDistinctBitmapBuffer{ret, num_bytes, /*physical_buffer=*/true});
61  return ret;
62  }
63 
64  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
65  const size_t bytes,
66  const bool physical_buffer) {
67  std::lock_guard<std::mutex> lock(state_mutex_);
68  count_distinct_bitmaps_.emplace_back(
69  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
70  }
71 
72  void addCountDistinctSet(std::set<int64_t>* count_distinct_set) {
73  std::lock_guard<std::mutex> lock(state_mutex_);
74  count_distinct_sets_.push_back(count_distinct_set);
75  }
76 
77  void addGroupByBuffer(int64_t* group_by_buffer) {
78  std::lock_guard<std::mutex> lock(state_mutex_);
79  group_by_buffers_.push_back(group_by_buffer);
80  }
81 
82  void addVarlenBuffer(void* varlen_buffer) {
83  std::lock_guard<std::mutex> lock(state_mutex_);
84  varlen_buffers_.push_back(varlen_buffer);
85  }
86 
93  std::lock_guard<std::mutex> lock(state_mutex_);
95  varlen_input_buffers_.push_back(buffer);
96  }
97 
98  std::string* addString(const std::string& str) {
99  std::lock_guard<std::mutex> lock(state_mutex_);
100  strings_.emplace_back(str);
101  return &strings_.back();
102  }
103 
104  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
105  std::lock_guard<std::mutex> lock(state_mutex_);
106  arrays_.emplace_back(arr);
107  return &arrays_.back();
108  }
109 
110  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
111  const int dict_id,
112  const int64_t generation) {
113  std::lock_guard<std::mutex> lock(state_mutex_);
114  auto it = str_dict_proxy_owned_.find(dict_id);
115  if (it != str_dict_proxy_owned_.end()) {
116  CHECK_EQ(it->second->getDictionary(), str_dict.get());
117  it->second->updateGeneration(generation);
118  return it->second.get();
119  }
121  .emplace(dict_id,
122  std::make_shared<StringDictionaryProxy>(str_dict, generation))
123  .first;
124  return it->second.get();
125  }
126 
127  StringDictionaryProxy* getStringDictProxy(const int dict_id) const {
128  std::lock_guard<std::mutex> lock(state_mutex_);
129  auto it = str_dict_proxy_owned_.find(dict_id);
130  CHECK(it != str_dict_proxy_owned_.end());
131  return it->second.get();
132  }
133 
135  const int dict_id_in,
136  const bool with_generation,
137  const Catalog_Namespace::Catalog* catalog);
138 
140  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
141  std::lock_guard<std::mutex> lock(state_mutex_);
142  lit_str_dict_proxy_ = lit_str_dict_proxy;
143  }
144 
146  std::lock_guard<std::mutex> lock(state_mutex_);
147  return lit_str_dict_proxy_.get();
148  }
149 
150  void addColBuffer(const void* col_buffer) {
151  std::lock_guard<std::mutex> lock(state_mutex_);
152  col_buffers_.push_back(const_cast<void*>(col_buffer));
153  }
154 
156  for (auto count_distinct_set : count_distinct_sets_) {
157  delete count_distinct_set;
158  }
159  for (auto group_by_buffer : group_by_buffers_) {
160  free(group_by_buffer);
161  }
162  for (auto varlen_buffer : varlen_buffers_) {
163  free(varlen_buffer);
164  }
165  for (auto varlen_input_buffer : varlen_input_buffers_) {
166  CHECK(varlen_input_buffer);
167  varlen_input_buffer->unPin();
168  }
169  for (auto col_buffer : col_buffers_) {
170  free(col_buffer);
171  }
172  }
173 
174  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
175  auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_);
176  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
177  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
178  return rtn;
179  }
180 
182  string_dictionary_generations_ = generations;
183  }
184 
187  }
188 
190 
191  private:
193  int8_t* ptr;
194  const size_t size;
195  const bool physical_buffer;
196  };
197 
198  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
199  std::vector<std::set<int64_t>*> count_distinct_sets_;
200  std::vector<int64_t*> group_by_buffers_;
201  std::vector<void*> varlen_buffers_;
202  std::list<std::string> strings_;
203  std::list<std::vector<int64_t>> arrays_;
204  std::unordered_map<int, std::shared_ptr<StringDictionaryProxy>> str_dict_proxy_owned_;
205  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
207  std::vector<void*> col_buffers_;
208  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
209  std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
210 
211  size_t arena_block_size_; // for cloning
212  std::unique_ptr<Arena> allocator_;
213 
214  mutable std::mutex state_mutex_;
215 
216  friend class ResultSet;
217  friend class QueryExecutionContext;
218 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::list< std::vector< int64_t > > arrays_
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
std::unordered_map< int, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::list< std::string > strings_
void addCountDistinctSet(std::set< int64_t > *count_distinct_set)
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const Catalog_Namespace::Catalog *catalog, const unsigned block_size, const unsigned grid_size)
Definition: ResultSet.cpp:57
StringDictionaryGenerations & getStringDictionaryGenerations()
StringDictionaryProxy * getLiteralStringDictProxy() const
int8_t * allocate(const size_t num_bytes) override
std::vector< std::set< int64_t > * > count_distinct_sets_
This file contains the class specification and related data structures for Catalog.
void setDictionaryGenerations(StringDictionaryGenerations generations)
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const int64_t generation)
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
StringDictionaryProxy * getOrAddStringDictProxy(const int dict_id_in, const bool with_generation, const Catalog_Namespace::Catalog *catalog)
Definition: Execute.cpp:219
std::vector< void * > varlen_buffers_
int8_t * allocateCountDistinctBuffer(const size_t num_bytes)
std::unique_ptr< Arena > allocator_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryGenerations string_dictionary_generations_
void addVarlenBuffer(void *varlen_buffer)
quantile::TDigest * nullTDigest()
Definition: Execute.cpp:242
std::vector< std::unique_ptr< quantile::TDigest > > t_digests_
StringDictionaryProxy * getStringDictProxy(const int dict_id) const
void addGroupByBuffer(int64_t *group_by_buffer)
#define CHECK(condition)
Definition: Logger.h:197
std::string * addString(const std::string &str)
RowSetMemoryOwner(const size_t arena_block_size)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_
friend class ResultSet