OmniSciDB  8a228a1076
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <mutex>
22 #include <set>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "DataMgr/AbstractBuffer.h"
29 #include "DataMgr/DataMgr.h"
30 #include "Logger/Logger.h"
32 
33 class ResultSet;
34 
39 class RowSetMemoryOwner : boost::noncopyable {
40  public:
41  RowSetMemoryOwner(const size_t arena_block_size)
42  : arena_block_size_(arena_block_size)
43  , allocator_(std::make_unique<Arena>(arena_block_size)) {}
44 
45  int8_t* allocate(const size_t num_bytes) {
47  std::lock_guard<std::mutex> lock(state_mutex_);
48  return reinterpret_cast<int8_t*>(allocator_->allocate(num_bytes));
49  }
50 
51  int8_t* allocateCountDistinctBuffer(const size_t num_bytes) {
53  std::lock_guard<std::mutex> lock(state_mutex_);
54  auto ret = reinterpret_cast<int8_t*>(allocator_->allocateAndZero(num_bytes));
55  count_distinct_bitmaps_.emplace_back(
56  CountDistinctBitmapBuffer{ret, num_bytes, /*physical_buffer=*/true});
57  return ret;
58  }
59 
60  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
61  const size_t bytes,
62  const bool physical_buffer) {
63  std::lock_guard<std::mutex> lock(state_mutex_);
64  count_distinct_bitmaps_.emplace_back(
65  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, physical_buffer});
66  }
67 
68  void addCountDistinctSet(std::set<int64_t>* count_distinct_set) {
69  std::lock_guard<std::mutex> lock(state_mutex_);
70  count_distinct_sets_.push_back(count_distinct_set);
71  }
72 
73  void addGroupByBuffer(int64_t* group_by_buffer) {
74  std::lock_guard<std::mutex> lock(state_mutex_);
75  group_by_buffers_.push_back(group_by_buffer);
76  }
77 
78  void addVarlenBuffer(void* varlen_buffer) {
79  std::lock_guard<std::mutex> lock(state_mutex_);
80  varlen_buffers_.push_back(varlen_buffer);
81  }
82 
89  std::lock_guard<std::mutex> lock(state_mutex_);
91  varlen_input_buffers_.push_back(buffer);
92  }
93 
94  std::string* addString(const std::string& str) {
95  std::lock_guard<std::mutex> lock(state_mutex_);
96  strings_.emplace_back(str);
97  return &strings_.back();
98  }
99 
100  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
101  std::lock_guard<std::mutex> lock(state_mutex_);
102  arrays_.emplace_back(arr);
103  return &arrays_.back();
104  }
105 
106  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
107  const int dict_id,
108  const ssize_t generation) {
109  std::lock_guard<std::mutex> lock(state_mutex_);
110  auto it = str_dict_proxy_owned_.find(dict_id);
111  if (it != str_dict_proxy_owned_.end()) {
112  CHECK_EQ(it->second->getDictionary(), str_dict.get());
113  it->second->updateGeneration(generation);
114  return it->second.get();
115  }
117  .emplace(dict_id,
118  std::make_shared<StringDictionaryProxy>(str_dict, generation))
119  .first;
120  return it->second.get();
121  }
122 
123  StringDictionaryProxy* getStringDictProxy(const int dict_id) const {
124  std::lock_guard<std::mutex> lock(state_mutex_);
125  auto it = str_dict_proxy_owned_.find(dict_id);
126  CHECK(it != str_dict_proxy_owned_.end());
127  return it->second.get();
128  }
129 
131  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
132  std::lock_guard<std::mutex> lock(state_mutex_);
133  lit_str_dict_proxy_ = lit_str_dict_proxy;
134  }
135 
137  std::lock_guard<std::mutex> lock(state_mutex_);
138  return lit_str_dict_proxy_.get();
139  }
140 
141  void addColBuffer(const void* col_buffer) {
142  std::lock_guard<std::mutex> lock(state_mutex_);
143  col_buffers_.push_back(const_cast<void*>(col_buffer));
144  }
145 
147  for (auto count_distinct_set : count_distinct_sets_) {
148  delete count_distinct_set;
149  }
150  for (auto group_by_buffer : group_by_buffers_) {
151  free(group_by_buffer);
152  }
153  for (auto varlen_buffer : varlen_buffers_) {
154  free(varlen_buffer);
155  }
156  for (auto varlen_input_buffer : varlen_input_buffers_) {
157  CHECK(varlen_input_buffer);
158  varlen_input_buffer->unPin();
159  }
160  for (auto col_buffer : col_buffers_) {
161  free(col_buffer);
162  }
163  }
164 
165  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
166  auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_);
167  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
168  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
169  return rtn;
170  }
171 
172  private:
174  int8_t* ptr;
175  const size_t size;
176  const bool physical_buffer;
177  };
178 
179  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
180  std::vector<std::set<int64_t>*> count_distinct_sets_;
181  std::vector<int64_t*> group_by_buffers_;
182  std::vector<void*> varlen_buffers_;
183  std::list<std::string> strings_;
184  std::list<std::vector<int64_t>> arrays_;
185  std::unordered_map<int, std::shared_ptr<StringDictionaryProxy>> str_dict_proxy_owned_;
186  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
187  std::vector<void*> col_buffers_;
188  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
189 
190  size_t arena_block_size_; // for cloning
191  std::unique_ptr<Arena> allocator_;
192 
193  mutable std::mutex state_mutex_;
194 
195  friend class ResultSet;
196  friend class QueryExecutionContext;
197 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::list< std::vector< int64_t > > arrays_
std::unordered_map< int, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::list< std::string > strings_
void addCountDistinctSet(std::set< int64_t > *count_distinct_set)
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool physical_buffer)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
ResultSet(const std::vector< TargetInfo > &targets, const ExecutorDeviceType device_type, const QueryMemoryDescriptor &query_mem_desc, const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const Executor *executor)
Definition: ResultSet.cpp:104
std::vector< std::set< int64_t > * > count_distinct_sets_
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
int8_t * allocate(const size_t num_bytes)
std::vector< void * > varlen_buffers_
StringDictionaryProxy * getStringDictProxy(const int dict_id) const
int8_t * allocateCountDistinctBuffer(const size_t num_bytes)
std::unique_ptr< Arena > allocator_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const ssize_t generation)
StringDictionaryProxy * getLiteralStringDictProxy() const
void addVarlenBuffer(void *varlen_buffer)
void addGroupByBuffer(int64_t *group_by_buffer)
#define CHECK(condition)
Definition: Logger.h:197
std::string * addString(const std::string &str)
RowSetMemoryOwner(const size_t arena_block_size)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_
friend class ResultSet