OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ResultSetStorage.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #ifndef QUERYENGINE_RESULTSETSTORAGE_H
24 #define QUERYENGINE_RESULTSETSTORAGE_H
25 
26 #include "CardinalityEstimator.h"
27 #include "DataMgr/Chunk/Chunk.h"
29 #include "TargetValue.h"
30 
31 #include <atomic>
32 #include <functional>
33 #include <list>
34 
35 /*
36  * Stores the underlying buffer and the meta-data for a result set. The buffer
37  * format reflects the main requirements for result sets. Not all queries
38  * specify a GROUP BY clause, but since it's the most important and challenging
39  * case we'll focus on it. Note that the meta-data is stored separately from
40  * the buffer and it's not transferred to GPU.
41  *
42  * 1. It has to be efficient for reduction of partial GROUP BY query results
43  * from multiple devices / cores; the cardinalities can be high. Reduction
44  * currently happens on the host.
45  * 2. No conversions should be needed when buffers are transferred from GPU to
46  * host for reduction. This implies the buffer needs to be "flat", with no
47  * pointers to chase since they have no meaning in a different address space.
48  * 3. Must be size-efficient.
49  *
50  * There are several variations of the format of a result set buffer, but the
51  * most common is a sequence of entries which represent a row in the result or
52  * an empty slot. One entry looks as follows:
53  *
54  * +-+-+-+-+-+-+-+-+-+-+-+--?--+-+-+-+-+-+-+-+-+-+-+-+-+
55  * |key_0| ... |key_N-1| padding |value_0|...|value_N-1|
56  * +-+-+-+-+-+-+-+-+-+-+-+--?--+-+-+-+-+-+-+-+-+-+-+-+-+
57  *
58  * (key_0 ... key_N-1) is a multiple component key, unique within the buffer.
59  * It stores the tuple specified by the GROUP BY clause. All components have
60  * the same width, 4 or 8 bytes. For the 4-byte components, 4-byte padding is
61  * added if the number of components is odd. Not all entries in the buffer are
62  * valid; an empty entry contains EMPTY_KEY_{64, 32} for 8-byte / 4-byte width,
63  * respectively. An empty entry is ignored by subsequent operations on the
64  * result set (reduction, iteration, sort etc).
65  *
66  * value_0 through value_N-1 are 8-byte fields which hold the columns of the
67  * result, like aggregates and projected expressions. They're reduced between
68  * multiple partial results for identical (key_0 ... key_N-1) tuples.
69  *
70  * The order of entries is decided by the type of hash used, which depends on
71  * the range of the keys. For small enough ranges, a perfect hash is used. When
72  * a perfect hash isn't feasible, open addressing (using MurmurHash) with linear
73  * probing is used instead, with a 50% fill rate.
74  */
75 
// Forward declaration only; ReductionCode is taken by const reference in
// ResultSetStorage::reduce() below, so the full definition is not needed here.
76 struct ReductionCode;
77 
79  const bool is_lazily_fetched;
80  const int local_col_id;
82 };
83 
85  const int64_t value;
86  const bool valid;
87 };
88 
91  int8_t* cpu_buffer_ptr;
92 
93  int8_t* computeCpuOffset(const int64_t gpu_offset_address) const;
94 };
95 
97  private:
98  ResultSetStorage(const std::vector<TargetInfo>& targets,
100  int8_t* buff,
101  const bool buff_is_provided);
102 
103  public:
104  void reduce(const ResultSetStorage& that,
105  const std::vector<std::string>& serialized_varlen_buffer,
106  const ReductionCode& reduction_code,
107  const size_t executor_id) const;
108 
110  const std::vector<std::string>& serialized_varlen_buffer) const;
111 
112  int8_t* getUnderlyingBuffer() const;
113 
114  size_t getEntryCount() const { return query_mem_desc_.getEntryCount(); }
115 
116  template <class KeyType>
117  void moveEntriesToBuffer(int8_t* new_buff, const size_t new_entry_count) const;
118 
119  template <class KeyType>
120  void moveOneEntryToBuffer(const size_t entry_index,
121  int64_t* new_buff_i64,
122  const size_t new_entry_count,
123  const size_t key_count,
124  const size_t row_qw_count,
125  const int64_t* src_buff,
126  const size_t key_byte_width) const;
127 
128  void updateEntryCount(const size_t new_entry_count) {
129  query_mem_desc_.setEntryCount(new_entry_count);
130  }
131 
132  void reduceOneApproxQuantileSlot(int8_t* this_ptr1,
133  const int8_t* that_ptr1,
134  const size_t target_logical_idx,
135  const ResultSetStorage& that) const;
136 
137  // Reduces results for a single row when using interleaved bin layouts
138  static bool reduceSingleRow(const int8_t* row_ptr,
139  const int8_t warp_count,
140  const bool is_columnar,
141  const bool replace_bitmap_ptr_with_bitmap_sz,
142  std::vector<int64_t>& agg_vals,
143  const QueryMemoryDescriptor& query_mem_desc,
144  const std::vector<TargetInfo>& targets,
145  const std::vector<int64_t>& agg_init_vals);
146 
147  private:
149  int8_t* this_buff,
150  const int8_t* that_buff,
151  const ResultSetStorage& that,
152  const size_t start_index,
153  const size_t end_index,
154  const std::vector<std::string>& serialized_varlen_buffer,
155  const size_t executor_id) const;
156 
157  void copyKeyColWise(const size_t entry_idx,
158  int8_t* this_buff,
159  const int8_t* that_buff) const;
160 
161  bool isEmptyEntry(const size_t entry_idx, const int8_t* buff) const;
162  bool isEmptyEntry(const size_t entry_idx) const;
163  bool isEmptyEntryColumnar(const size_t entry_idx, const int8_t* buff) const;
164 
165  void reduceOneEntryBaseline(int8_t* this_buff,
166  const int8_t* that_buff,
167  const size_t i,
168  const size_t that_entry_count,
169  const ResultSetStorage& that) const;
170 
171  void reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
172  const int64_t* that_buff,
173  const size_t that_entry_idx,
174  const size_t that_entry_count,
175  const ResultSetStorage& that) const;
176 
177  void initializeBaselineValueSlots(int64_t* this_entry_slots) const;
178 
179  void reduceOneSlotBaseline(int64_t* this_buff,
180  const size_t this_slot,
181  const int64_t* that_buff,
182  const size_t that_entry_count,
183  const size_t that_slot,
184  const TargetInfo& target_info,
185  const size_t target_logical_idx,
186  const size_t target_slot_idx,
187  const size_t init_agg_val_idx,
188  const ResultSetStorage& that) const;
189 
191  void reduceOneSlotSingleValue(int8_t* this_ptr1,
192  const TargetInfo& target_info,
193  const size_t target_slot_idx,
194  const size_t init_agg_val_idx,
195  const int8_t* that_ptr1) const;
196 
198  void reduceOneSlot(int8_t* this_ptr1,
199  int8_t* this_ptr2,
200  const int8_t* that_ptr1,
201  const int8_t* that_ptr2,
202  const TargetInfo& target_info,
203  const size_t target_logical_idx,
204  const size_t target_slot_idx,
205  const size_t init_agg_val_idx,
206  const ResultSetStorage& that,
207  const size_t first_slot_idx_for_target,
208  const std::vector<std::string>& serialized_varlen_buffer) const;
209 
210  void reduceOneCountDistinctSlot(int8_t* this_ptr1,
211  const int8_t* that_ptr1,
212  const size_t target_logical_idx,
213  const ResultSetStorage& that) const;
214 
215  void fillOneEntryRowWise(const std::vector<int64_t>& entry);
216 
217  void fillOneEntryColWise(const std::vector<int64_t>& entry);
218 
219  void initializeRowWise() const;
220 
221  void initializeColWise() const;
222 
224  return varlen_output_info_.get();
225  }
226 
227  // TODO(alex): remove the following two methods, see comment about
228  // count_distinct_sets_mapping_.
229  void addCountDistinctSetPointerMapping(const int64_t remote_ptr, const int64_t ptr);
230 
231  int64_t mappedPtr(const int64_t) const;
232 
233  size_t binSearchRowCount() const;
234 
235  const std::vector<TargetInfo> targets_;
237  int8_t* buff_;
238  const bool buff_is_provided_;
239  std::vector<int64_t> target_init_vals_;
240  // Provisional field used for multi-node until we improve the count distinct
241  // and flatten the main group by buffer and the distinct buffers in a single,
242  // contiguous buffer which we'll be able to serialize as a no-op. Used to
243  // re-route the pointers in the result set received over the wire to this
244  // machine address-space. Not efficient at all, just a placeholder!
245  std::unordered_map<int64_t, int64_t> count_distinct_sets_mapping_;
246 
247  // ptr to host varlen buffer and gpu address computation info
248  std::shared_ptr<VarlenOutputInfo> varlen_output_info_;
249 
250  friend class ResultSet;
251  friend class ResultSetManager;
252 };
253 
// Pair of (int64_t pointer, bool flag); this is the return type of
// result_set::get_group_value_reduction(). The meaning of the two members is
// not documented in this header -- NOTE(review): confirm against the definition.
254 using GroupValueInfo = std::pair<int64_t*, bool>;
255 
// Free-function prototypes for working with result set buffers. Only
// declarations appear in this header; the definitions are not visible here.
256 namespace result_set {
257 
// Takes the lazy-fetch descriptor of a column, a raw byte stream and a
// position, and returns a 64-bit integer.
// NOTE(review): presumably decodes the value stored at `pos` -- confirm
// against the definition.
258 int64_t lazy_decode(const ColumnLazyFetchInfo& col_lazy_fetch,
259  const int8_t* byte_stream,
260  const int64_t pos);
261 
// Operates on the key at `key_ptr`, described by a component count and a
// component width.
// NOTE(review): presumably writes the empty-key marker (EMPTY_KEY_64 / EMPTY_KEY_32,
// see the file header comment) into every component -- confirm.
262 void fill_empty_key(void* key_ptr, const size_t key_count, const size_t key_width);
263 
// Returns an int8_t width for the given target slot.
// NOTE(review): listing line 266 (the final parameter and the closing
// parenthesis) is missing from this listing; the member index elsewhere in this
// listing shows the last parameter as
// `const QueryMemoryDescriptor& query_mem_desc`.
264 int8_t get_width_for_slot(const size_t target_slot_idx,
265  const bool float_argument_input,
267 
// Returns a size_t for slot `slot_idx` under the given memory descriptor
// (by its name, a byte offset -- NOTE(review): confirm the reference point).
268 size_t get_byteoff_of_slot(const size_t slot_idx,
269  const QueryMemoryDescriptor& query_mem_desc);
270 
// Takes a destination groups buffer with its entry count, a key described by
// component count and width, and the source ("that") buffer with an entry
// index and entry count. Returns a GroupValueInfo, i.e. (int64_t*, bool).
// NOTE(review): the semantics of the returned pointer and flag are not shown
// in this header.
271 GroupValueInfo get_group_value_reduction(int64_t* groups_buffer,
272  const uint32_t groups_buffer_entry_count,
273  const int64_t* key,
274  const uint32_t key_count,
275  const size_t key_width,
276  const QueryMemoryDescriptor& query_mem_desc,
277  const int64_t* that_buff_i64,
278  const size_t that_entry_idx,
279  const size_t that_entry_count,
280  const uint32_t row_size_quad);
281 
// Builds and returns a vector of int64_t values from the target descriptors.
// NOTE(review): presumably the per-target initial aggregate values later held
// in ResultSetStorage::target_init_vals_ -- confirm.
282 std::vector<int64_t> initialize_target_values_for_storage(
283  const std::vector<TargetInfo>& targets);
284 
285 } // namespace result_set
286 
287 #endif // QUERYENGINE_RESULTSETSTORAGE_H
const SQLTypeInfo type
GroupValueInfo get_group_value_reduction(int64_t *groups_buffer, const uint32_t groups_buffer_entry_count, const int64_t *key, const uint32_t key_count, const size_t key_width, const QueryMemoryDescriptor &query_mem_desc, const int64_t *that_buff_i64, const size_t that_entry_idx, const size_t that_entry_count, const uint32_t row_size_quad)
void moveOneEntryToBuffer(const size_t entry_index, int64_t *new_buff_i64, const size_t new_entry_count, const size_t key_count, const size_t row_qw_count, const int64_t *src_buff, const size_t key_byte_width) const
bool isEmptyEntry(const size_t entry_idx, const int8_t *buff) const
const std::vector< TargetInfo > targets_
ALWAYS_INLINE void reduceOneSlot(int8_t *this_ptr1, int8_t *this_ptr2, const int8_t *that_ptr1, const int8_t *that_ptr2, const TargetInfo &target_info, const size_t target_logical_idx, const size_t target_slot_idx, const size_t init_agg_val_idx, const ResultSetStorage &that, const size_t first_slot_idx_for_target, const std::vector< std::string > &serialized_varlen_buffer) const
void setEntryCount(const size_t val)
std::vector< int64_t > target_init_vals_
void initializeColWise() const
void reduceOneEntryBaseline(int8_t *this_buff, const int8_t *that_buff, const size_t i, const size_t that_entry_count, const ResultSetStorage &that) const
bool isEmptyEntryColumnar(const size_t entry_idx, const int8_t *buff) const
Utility functions for easy access to the result set buffers.
void addCountDistinctSetPointerMapping(const int64_t remote_ptr, const int64_t ptr)
ResultSetStorage(const std::vector< TargetInfo > &targets, const QueryMemoryDescriptor &query_mem_desc, int8_t *buff, const bool buff_is_provided)
void initializeRowWise() const
void reduceOneApproxQuantileSlot(int8_t *this_ptr1, const int8_t *that_ptr1, const size_t target_logical_idx, const ResultSetStorage &that) const
void initializeBaselineValueSlots(int64_t *this_entry_slots) const
High-level representation of SQL values.
void reduceOneSlotBaseline(int64_t *this_buff, const size_t this_slot, const int64_t *that_buff, const size_t that_entry_count, const size_t that_slot, const TargetInfo &target_info, const size_t target_logical_idx, const size_t target_slot_idx, const size_t init_agg_val_idx, const ResultSetStorage &that) const
std::shared_ptr< VarlenOutputInfo > varlen_output_info_
friend class ResultSet
std::unordered_map< int64_t, int64_t > count_distinct_sets_mapping_
int8_t * getUnderlyingBuffer() const
const VarlenOutputInfo * getVarlenOutputInfo() const
const bool buff_is_provided_
int8_t get_width_for_slot(const size_t target_slot_idx, const bool float_argument_input, const QueryMemoryDescriptor &query_mem_desc)
void reduceOneEntrySlotsBaseline(int64_t *this_entry_slots, const int64_t *that_buff, const size_t that_entry_idx, const size_t that_entry_count, const ResultSetStorage &that) const
size_t get_byteoff_of_slot(const size_t slot_idx, const QueryMemoryDescriptor &query_mem_desc)
void copyKeyColWise(const size_t entry_idx, int8_t *this_buff, const int8_t *that_buff) const
void reduceOneCountDistinctSlot(int8_t *this_ptr1, const int8_t *that_ptr1, const size_t target_logical_idx, const ResultSetStorage &that) const
void moveEntriesToBuffer(int8_t *new_buff, const size_t new_entry_count) const
void fill_empty_key(void *key_ptr, const size_t key_count, const size_t key_width)
void updateEntryCount(const size_t new_entry_count)
size_t binSearchRowCount() const
std::pair< int64_t *, bool > GroupValueInfo
int64_t lazy_decode(const ColumnLazyFetchInfo &col_lazy_fetch, const int8_t *byte_stream, const int64_t pos)
void fillOneEntryColWise(const std::vector< int64_t > &entry)
void reduce(const ResultSetStorage &that, const std::vector< std::string > &serialized_varlen_buffer, const ReductionCode &reduction_code, const size_t executor_id) const
size_t getEntryCount() const
std::vector< int64_t > initialize_target_values_for_storage(const std::vector< TargetInfo > &targets)
void reduceEntriesNoCollisionsColWise(int8_t *this_buff, const int8_t *that_buff, const ResultSetStorage &that, const size_t start_index, const size_t end_index, const std::vector< std::string > &serialized_varlen_buffer, const size_t executor_id) const
ALWAYS_INLINE void reduceOneSlotSingleValue(int8_t *this_ptr1, const TargetInfo &target_info, const size_t target_slot_idx, const size_t init_agg_val_idx, const int8_t *that_ptr1) const
int64_t mappedPtr(const int64_t) const
int8_t * computeCpuOffset(const int64_t gpu_offset_address) const
const bool is_lazily_fetched
Estimators to be used when precise cardinality isn't useful.
void fillOneEntryRowWise(const std::vector< int64_t > &entry)
void rewriteAggregateBufferOffsets(const std::vector< std::string > &serialized_varlen_buffer) const
#define ALWAYS_INLINE
static bool reduceSingleRow(const int8_t *row_ptr, const int8_t warp_count, const bool is_columnar, const bool replace_bitmap_ptr_with_bitmap_sz, std::vector< int64_t > &agg_vals, const QueryMemoryDescriptor &query_mem_desc, const std::vector< TargetInfo > &targets, const std::vector< int64_t > &agg_init_vals)
QueryMemoryDescriptor query_mem_desc_