OmniSciDB  eee9fa949c
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RowSetMemoryOwner.h
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/noncopyable.hpp>
20 #include <list>
21 #include <mutex>
22 #include <set>
23 #include <string>
24 #include <unordered_map>
25 #include <vector>
26 
27 #include "DataMgr/AbstractBuffer.h"
28 #include "Shared/Logger.h"
30 
31 class ResultSet;
32 
33 class RowSetMemoryOwner : boost::noncopyable {
34  public:
35  void addCountDistinctBuffer(int8_t* count_distinct_buffer,
36  const size_t bytes,
37  const bool system_allocated) {
38  std::lock_guard<std::mutex> lock(state_mutex_);
39  count_distinct_bitmaps_.emplace_back(
40  CountDistinctBitmapBuffer{count_distinct_buffer, bytes, system_allocated});
41  }
42 
43  void addCountDistinctSet(std::set<int64_t>* count_distinct_set) {
44  std::lock_guard<std::mutex> lock(state_mutex_);
45  count_distinct_sets_.push_back(count_distinct_set);
46  }
47 
48  void addGroupByBuffer(int64_t* group_by_buffer) {
49  std::lock_guard<std::mutex> lock(state_mutex_);
50  group_by_buffers_.push_back(group_by_buffer);
51  }
52 
53  void addVarlenBuffer(void* varlen_buffer) {
54  std::lock_guard<std::mutex> lock(state_mutex_);
55  varlen_buffers_.push_back(varlen_buffer);
56  }
57 
64  std::lock_guard<std::mutex> lock(state_mutex_);
66  varlen_input_buffers_.push_back(buffer);
67  }
68 
69  std::string* addString(const std::string& str) {
70  std::lock_guard<std::mutex> lock(state_mutex_);
71  strings_.emplace_back(str);
72  return &strings_.back();
73  }
74 
75  std::vector<int64_t>* addArray(const std::vector<int64_t>& arr) {
76  std::lock_guard<std::mutex> lock(state_mutex_);
77  arrays_.emplace_back(arr);
78  return &arrays_.back();
79  }
80 
81  StringDictionaryProxy* addStringDict(std::shared_ptr<StringDictionary> str_dict,
82  const int dict_id,
83  const ssize_t generation) {
84  std::lock_guard<std::mutex> lock(state_mutex_);
85  auto it = str_dict_proxy_owned_.find(dict_id);
86  if (it != str_dict_proxy_owned_.end()) {
87  CHECK_EQ(it->second->getDictionary(), str_dict.get());
88  it->second->updateGeneration(generation);
89  return it->second.get();
90  }
92  .emplace(dict_id,
93  std::make_shared<StringDictionaryProxy>(str_dict, generation))
94  .first;
95  return it->second.get();
96  }
97 
98  StringDictionaryProxy* getStringDictProxy(const int dict_id) const {
99  std::lock_guard<std::mutex> lock(state_mutex_);
100  auto it = str_dict_proxy_owned_.find(dict_id);
101  CHECK(it != str_dict_proxy_owned_.end());
102  return it->second.get();
103  }
104 
106  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy) {
107  std::lock_guard<std::mutex> lock(state_mutex_);
108  lit_str_dict_proxy_ = lit_str_dict_proxy;
109  }
110 
112  std::lock_guard<std::mutex> lock(state_mutex_);
113  return lit_str_dict_proxy_.get();
114  }
115 
116  void addColBuffer(const void* col_buffer) {
117  std::lock_guard<std::mutex> lock(state_mutex_);
118  col_buffers_.push_back(const_cast<void*>(col_buffer));
119  }
120 
122  for (const auto& count_distinct_buffer : count_distinct_bitmaps_) {
123  if (count_distinct_buffer.system_allocated) {
124  free(count_distinct_buffer.ptr);
125  }
126  }
127  for (auto count_distinct_set : count_distinct_sets_) {
128  delete count_distinct_set;
129  }
130  for (auto group_by_buffer : group_by_buffers_) {
131  free(group_by_buffer);
132  }
133  for (auto varlen_buffer : varlen_buffers_) {
134  free(varlen_buffer);
135  }
136  for (auto varlen_input_buffer : varlen_input_buffers_) {
137  CHECK(varlen_input_buffer);
138  varlen_input_buffer->unPin();
139  }
140  for (auto col_buffer : col_buffers_) {
141  free(col_buffer);
142  }
143  }
144 
145  std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
146  auto rtn = std::make_shared<RowSetMemoryOwner>();
147  rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
148  rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
149  return rtn;
150  }
151 
152  private:
154  int8_t* ptr;
155  const size_t size;
156  const bool system_allocated;
157  };
158 
159  std::vector<CountDistinctBitmapBuffer> count_distinct_bitmaps_;
160  std::vector<std::set<int64_t>*> count_distinct_sets_;
161  std::vector<int64_t*> group_by_buffers_;
162  std::vector<void*> varlen_buffers_;
163  std::list<std::string> strings_;
164  std::list<std::vector<int64_t>> arrays_;
165  std::unordered_map<int, std::shared_ptr<StringDictionaryProxy>> str_dict_proxy_owned_;
166  std::shared_ptr<StringDictionaryProxy> lit_str_dict_proxy_;
167  std::vector<void*> col_buffers_;
168  std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
169  mutable std::mutex state_mutex_;
170 
171  friend class ResultSet;
172 };
std::shared_ptr< RowSetMemoryOwner > cloneStrDictDataOnly()
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::list< std::vector< int64_t > > arrays_
std::unordered_map< int, std::shared_ptr< StringDictionaryProxy > > str_dict_proxy_owned_
void addVarlenInputBuffer(Data_Namespace::AbstractBuffer *buffer)
void addLiteralStringDictProxy(std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy)
std::list< std::string > strings_
void addCountDistinctSet(std::set< int64_t > *count_distinct_set)
virtual MemoryLevel getType() const =0
std::vector< int64_t > * addArray(const std::vector< int64_t > &arr)
StringDictionaryProxy * getLiteralStringDictProxy() const
void addCountDistinctBuffer(int8_t *count_distinct_buffer, const size_t bytes, const bool system_allocated)
std::vector< std::set< int64_t > * > count_distinct_sets_
std::vector< CountDistinctBitmapBuffer > count_distinct_bitmaps_
std::vector< void * > col_buffers_
CHECK(cgen_state)
std::shared_ptr< StringDictionaryProxy > lit_str_dict_proxy_
std::vector< void * > varlen_buffers_
std::vector< Data_Namespace::AbstractBuffer * > varlen_input_buffers_
An AbstractBuffer is a unit of data management for a data manager.
StringDictionaryProxy * addStringDict(std::shared_ptr< StringDictionary > str_dict, const int dict_id, const ssize_t generation)
void addVarlenBuffer(void *varlen_buffer)
StringDictionaryProxy * getStringDictProxy(const int dict_id) const
void addGroupByBuffer(int64_t *group_by_buffer)
std::string * addString(const std::string &str)
void addColBuffer(const void *col_buffer)
std::vector< int64_t * > group_by_buffers_