OmniSciDB  c07336695a
ResultSet.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #ifndef QUERYENGINE_RESULTSET_H
26 #define QUERYENGINE_RESULTSET_H
27 
28 #include "../Chunk/Chunk.h"
29 #include "CardinalityEstimator.h"
31 #include "TargetValue.h"
32 
33 #include <atomic>
34 #include <functional>
35 #include <list>
36 
37 /*
38  * Stores the underlying buffer and the meta-data for a result set. The buffer
39  * format reflects the main requirements for result sets. Not all queries
40  * specify a GROUP BY clause, but since it's the most important and challenging
41  * case we'll focus on it. Note that the meta-data is stored separately from
42  * the buffer and it's not transferred to GPU.
43  *
44  * 1. It has to be efficient for reduction of partial GROUP BY query results
45  * from multiple devices / cores, the cardinalities can be high. Reduction
46  * currently happens on the host.
47  * 2. No conversions should be needed when buffers are transferred from GPU to
48  * host for reduction. This implies the buffer needs to be "flat", with no
49  * pointers to chase since they have no meaning in a different address space.
50  * 3. Must be size-efficient.
51  *
52  * There are several variations of the format of a result set buffer, but the
53  * most common is a sequence of entries which represent a row in the result or
54  * an empty slot. One entry looks as follows:
55  *
56  * +-+-+-+-+-+-+-+-+-+-+-+--?--+-+-+-+-+-+-+-+-+-+-+-+-+
 * |key_0| ... |key_N-1| padding |value_0|...|value_M-1|
 * +-+-+-+-+-+-+-+-+-+-+-+--?--+-+-+-+-+-+-+-+-+-+-+-+-+
 *
 * (key_0 ... key_N-1) is a multi-component key, unique within the buffer.
 * It stores the tuple specified by the GROUP BY clause. All components have
 * the same width, 4 or 8 bytes. For the 4-byte components, 4-byte padding is
 * added if the number of components is odd. Not all entries in the buffer are
 * valid; an empty entry contains EMPTY_KEY_{64, 32} for 8-byte / 4-byte width,
 * respectively. An empty entry is ignored by subsequent operations on the
 * result set (reduction, iteration, sort, etc.).
 *
 * value_0 through value_M-1 are 8-byte fields which hold the columns of the
 * result, like aggregates and projected expressions; the number of value slots
 * M is independent of the number of key components N. Values are reduced
 * between multiple partial results for identical (key_0 ... key_N-1) tuples.
71  *
72  * The order of entries is decided by the type of hash used, which depends on
73  * the range of the keys. For small enough ranges, a perfect hash is used. When
74  * a perfect hash isn't feasible, open addressing (using MurmurHash) with linear
75  * probing is used instead, with a 50% fill rate.
76  */
77 
78 struct ReductionCode;
79 
81  public:
82  ResultSetStorage(const std::vector<TargetInfo>& targets,
83  const QueryMemoryDescriptor& query_mem_desc,
84  int8_t* buff,
85  const bool buff_is_provided);
86 
87  void reduce(const ResultSetStorage& that,
88  const std::vector<std::string>& serialized_varlen_buffer,
89  const ReductionCode& reduction_code) const;
90 
92  const std::vector<std::string>& serialized_varlen_buffer) const;
93 
94  int8_t* getUnderlyingBuffer() const;
95 
96  template <class KeyType>
97  void moveEntriesToBuffer(int8_t* new_buff, const size_t new_entry_count) const;
98 
99  template <class KeyType>
100  void moveOneEntryToBuffer(const size_t entry_index,
101  int64_t* new_buff_i64,
102  const size_t new_entry_count,
103  const size_t key_count,
104  const size_t row_qw_count,
105  const int64_t* src_buff,
106  const size_t key_byte_width) const;
107 
108  void updateEntryCount(const size_t new_entry_count) {
109  query_mem_desc_.setEntryCount(new_entry_count);
110  }
111 
112  // Reduces results for a single row when using interleaved bin layouts
113  static bool reduceSingleRow(const int8_t* row_ptr,
114  const int8_t warp_count,
115  const bool is_columnar,
116  const bool replace_bitmap_ptr_with_bitmap_sz,
117  std::vector<int64_t>& agg_vals,
118  const QueryMemoryDescriptor& query_mem_desc,
119  const std::vector<TargetInfo>& targets,
120  const std::vector<int64_t>& agg_init_vals);
121 
122  private:
124  int8_t* this_buff,
125  const int8_t* that_buff,
126  const ResultSetStorage& that,
127  const size_t start_index,
128  const size_t end_index,
129  const std::vector<std::string>& serialized_varlen_buffer) const;
130 
131  void copyKeyColWise(const size_t entry_idx,
132  int8_t* this_buff,
133  const int8_t* that_buff) const;
134 
136  const size_t i,
137  int8_t* this_buff,
138  const int8_t* that_buff,
139  const ResultSetStorage& that,
140  const std::vector<std::string>& serialized_varlen_buffer) const;
141 
142  bool isEmptyEntry(const size_t entry_idx, const int8_t* buff) const;
143  bool isEmptyEntry(const size_t entry_idx) const;
144  bool isEmptyEntryColumnar(const size_t entry_idx, const int8_t* buff) const;
145 
146  void reduceOneEntryBaseline(int8_t* this_buff,
147  const int8_t* that_buff,
148  const size_t i,
149  const size_t that_entry_count,
150  const ResultSetStorage& that) const;
151 
152  void reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
153  const int64_t* that_buff,
154  const size_t that_entry_idx,
155  const size_t that_entry_count,
156  const ResultSetStorage& that) const;
157 
158  void initializeBaselineValueSlots(int64_t* this_entry_slots) const;
159 
160  void reduceOneSlotBaseline(int64_t* this_buff,
161  const size_t this_slot,
162  const int64_t* that_buff,
163  const size_t that_entry_count,
164  const size_t that_slot,
165  const TargetInfo& target_info,
166  const size_t target_logical_idx,
167  const size_t target_slot_idx,
168  const size_t init_agg_val_idx,
169  const ResultSetStorage& that) const;
170 
172  void reduceOneSlot(int8_t* this_ptr1,
173  int8_t* this_ptr2,
174  const int8_t* that_ptr1,
175  const int8_t* that_ptr2,
176  const TargetInfo& target_info,
177  const size_t target_logical_idx,
178  const size_t target_slot_idx,
179  const size_t init_agg_val_idx,
180  const ResultSetStorage& that,
181  const size_t first_slot_idx_for_target,
182  const std::vector<std::string>& serialized_varlen_buffer) const;
183 
184  void reduceOneCountDistinctSlot(int8_t* this_ptr1,
185  const int8_t* that_ptr1,
186  const size_t target_logical_idx,
187  const ResultSetStorage& that) const;
188 
189  void fillOneEntryRowWise(const std::vector<int64_t>& entry);
190 
191  void fillOneEntryColWise(const std::vector<int64_t>& entry);
192 
193  void initializeRowWise() const;
194 
195  void initializeColWise() const;
196 
197  // TODO(alex): remove the following two methods, see comment about
198  // count_distinct_sets_mapping_.
199  void addCountDistinctSetPointerMapping(const int64_t remote_ptr, const int64_t ptr);
200 
201  int64_t mappedPtr(const int64_t) const;
202 
203  const std::vector<TargetInfo> targets_;
205  int8_t* buff_;
206  const bool buff_is_provided_;
207  std::vector<int64_t> target_init_vals_;
208  // Provisional field used for multi-node until we improve the count distinct
209  // and flatten the main group by buffer and the distinct buffers in a single,
210  // contiguous buffer which we'll be able to serialize as a no-op. Used to
211  // re-route the pointers in the result set received over the wire to this
212  // machine address-space. Not efficient at all, just a placeholder!
213  std::unordered_map<int64_t, int64_t> count_distinct_sets_mapping_;
214 
215  friend class ResultSet;
216  friend class ResultSetManager;
217 };
218 
219 namespace Analyzer {
220 
221 class Expr;
222 class Estimator;
223 struct OrderEntry;
224 
225 } // namespace Analyzer
226 
227 class Executor;
228 
230  const bool is_lazily_fetched;
231  const int local_col_id;
233 };
234 
236  const int64_t value;
237  const bool valid;
238 };
239 
240 class ResultSet;
241 
243  public:
244  using value_type = std::vector<TargetValue>;
245  using difference_type = std::ptrdiff_t;
246  using pointer = std::vector<TargetValue>*;
247  using reference = std::vector<TargetValue>&;
248  using iterator_category = std::input_iterator_tag;
249 
250  bool operator==(const ResultSetRowIterator& other) const {
251  return result_set_ == other.result_set_ &&
252  crt_row_buff_idx_ == other.crt_row_buff_idx_;
253  }
254  bool operator!=(const ResultSetRowIterator& other) const { return !(*this == other); }
255 
256  inline value_type operator*() const;
257  inline ResultSetRowIterator& operator++(void);
259  ResultSetRowIterator iter(*this);
260  ++(*this);
261  return iter;
262  }
263 
264  size_t getCurrentRowBufferIndex() const {
265  if (crt_row_buff_idx_ == 0) {
266  throw std::runtime_error("current row buffer iteration index is undefined");
267  }
268  return crt_row_buff_idx_ - 1;
269  }
270 
271  private:
279 
281  bool translate_strings,
282  bool decimal_to_double)
283  : result_set_(rs)
284  , crt_row_buff_idx_(0)
285  , global_entry_idx_(0)
286  , global_entry_idx_valid_(false)
287  , fetched_so_far_(0)
288  , translate_strings_(translate_strings)
289  , decimal_to_double_(decimal_to_double){};
290 
291  ResultSetRowIterator(const ResultSet* rs) : ResultSetRowIterator(rs, false, false){};
292 
293  friend class ResultSet;
294 };
295 
296 class TSerializedRows;
297 
298 class ResultSet {
299  public:
300  ResultSet(const std::vector<TargetInfo>& targets,
301  const ExecutorDeviceType device_type,
302  const QueryMemoryDescriptor& query_mem_desc,
303  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
304  const Executor* executor);
305 
306  ResultSet(const std::vector<TargetInfo>& targets,
307  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
308  const std::vector<std::vector<const int8_t*>>& col_buffers,
309  const std::vector<std::vector<int64_t>>& frag_offsets,
310  const std::vector<int64_t>& consistent_frag_sizes,
311  const ExecutorDeviceType device_type,
312  const int device_id,
313  const QueryMemoryDescriptor& query_mem_desc,
314  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
315  const Executor* executor);
316 
317  ResultSet(const std::shared_ptr<const Analyzer::Estimator>,
318  const ExecutorDeviceType device_type,
319  const int device_id,
320  Data_Namespace::DataMgr* data_mgr);
321 
322  ResultSet(const std::string& explanation);
323 
324  ResultSet(int64_t queue_time_ms,
325  int64_t render_time_ms,
326  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner);
327 
328  ~ResultSet();
329 
330  inline ResultSetRowIterator rowIterator(size_t from_logical_index,
331  bool translate_strings,
332  bool decimal_to_double) const {
333  ResultSetRowIterator rowIterator(this, translate_strings, decimal_to_double);
334 
335  // move to first logical position
336  ++rowIterator;
337 
338  for (size_t index = 0; index < from_logical_index; index++) {
339  ++rowIterator;
340  }
341 
342  return rowIterator;
343  }
344 
345  inline ResultSetRowIterator rowIterator(bool translate_strings,
346  bool decimal_to_double) const {
347  return rowIterator(0, translate_strings, decimal_to_double);
348  }
349 
350  ExecutorDeviceType getDeviceType() const;
351 
352  const ResultSetStorage* allocateStorage() const;
353 
354  const ResultSetStorage* allocateStorage(int8_t*, const std::vector<int64_t>&) const;
355 
356  const ResultSetStorage* allocateStorage(const std::vector<int64_t>&) const;
357 
358  void updateStorageEntryCount(const size_t new_entry_count) {
360  query_mem_desc_.setEntryCount(new_entry_count);
361  CHECK(storage_);
362  storage_->updateEntryCount(new_entry_count);
363  }
364 
365  std::vector<TargetValue> getNextRow(const bool translate_strings,
366  const bool decimal_to_double) const;
367 
368  size_t getCurrentRowBufferIndex() const;
369 
370  std::vector<TargetValue> getRowAt(const size_t index) const;
371 
372  TargetValue getRowAt(const size_t row_idx,
373  const size_t col_idx,
374  const bool translate_strings,
375  const bool decimal_to_double = true) const;
376 
377  // Specialized random access getter for result sets with a single column to
378  // avoid the overhead of building a std::vector<TargetValue> result with only
379  // one element. Only used by RelAlgTranslator::getInIntegerSetExpr currently.
380  OneIntegerColumnRow getOneColRow(const size_t index) const;
381 
382  std::vector<TargetValue> getRowAtNoTranslations(
383  const size_t index,
384  const std::vector<bool>& targets_to_skip = {}) const;
385 
386  bool isRowAtEmpty(const size_t index) const;
387 
388  void sort(const std::list<Analyzer::OrderEntry>& order_entries, const size_t top_n);
389 
390  void keepFirstN(const size_t n);
391 
392  void dropFirstN(const size_t n);
393 
394  void append(ResultSet& that);
395 
396  const ResultSetStorage* getStorage() const;
397 
398  size_t colCount() const;
399 
400  SQLTypeInfo getColType(const size_t col_idx) const;
401 
402  size_t rowCount(const bool force_parallel = false) const;
403 
404  void setCachedRowCount(const size_t row_count) const;
405 
406  size_t entryCount() const;
407 
408  size_t getBufferSizeBytes(const ExecutorDeviceType device_type) const;
409 
410  bool definitelyHasNoRows() const;
411 
412  const QueryMemoryDescriptor& getQueryMemDesc() const;
413 
414  const std::vector<TargetInfo>& getTargetInfos() const;
415 
416  const std::vector<int64_t>& getTargetInitVals() const;
417 
418  int8_t* getDeviceEstimatorBuffer() const;
419 
420  int8_t* getHostEstimatorBuffer() const;
421 
422  void syncEstimatorBuffer() const;
423 
424  size_t getNDVEstimator() const;
425 
426  void setQueueTime(const int64_t queue_time);
427 
428  int64_t getQueueTime() const;
429 
430  int64_t getRenderTime() const;
431 
432  void moveToBegin() const;
433 
434  bool isTruncated() const;
435 
436  bool isExplain() const;
437 
438  bool isGeoColOnGpu(const size_t col_idx) const;
439  int getDeviceId() const;
440 
441  // Called from the executor because in the new ResultSet we assume the 'padded' field
442  // in SlotSize already contains the padding, whereas in the executor it's computed.
443  // Once the buffer initialization moves to ResultSet we can remove this method.
444  static QueryMemoryDescriptor fixupQueryMemoryDescriptor(const QueryMemoryDescriptor&);
445 
446  void fillOneEntry(const std::vector<int64_t>& entry) {
447  CHECK(storage_);
448  if (storage_->query_mem_desc_.didOutputColumnar()) {
449  storage_->fillOneEntryColWise(entry);
450  } else {
451  storage_->fillOneEntryRowWise(entry);
452  }
453  }
454 
455  void initializeStorage() const;
456 
457  void holdChunks(const std::list<std::shared_ptr<Chunk_NS::Chunk>>& chunks) {
458  chunks_ = chunks;
459  }
460  void holdChunkIterators(const std::shared_ptr<std::list<ChunkIter>> chunk_iters) {
461  chunk_iters_.push_back(chunk_iters);
462  }
463  void holdLiterals(std::vector<int8_t>& literal_buff) {
464  literal_buffers_.push_back(std::move(literal_buff));
465  }
466 
467  std::shared_ptr<RowSetMemoryOwner> getRowSetMemOwner() const {
468  return row_set_mem_owner_;
469  }
470 
471  const std::vector<uint32_t>& getPermutationBuffer() const;
472  const bool isPermutationBufferEmpty() const { return permutation_.empty(); };
473 
474  void serialize(TSerializedRows& serialized_rows) const;
475 
476  static std::unique_ptr<ResultSet> unserialize(const TSerializedRows& serialized_rows,
477  const Executor*);
478 
479  size_t getLimit();
480 
484  enum class GeoReturnType {
487  WktString,
490  GeoTargetValueGpuPtr
492  };
493  GeoReturnType getGeoReturnType() const { return geo_return_type_; }
494  void setGeoReturnType(const GeoReturnType val) { geo_return_type_ = val; }
495 
496  void copyColumnIntoBuffer(const size_t column_idx,
497  int8_t* output_buffer,
498  const size_t output_buffer_size) const;
499 
500  /*
501  * Determines if it is possible to directly form a ColumnarResults class from this
502  * result set, bypassing the default row-wise columnarization.
503  *
504  * NOTE: If there exists a permutation vector (i.e., ORDER BY), it becomes equivalent to
505  * the row-wise columnarization.
506  */
508  return query_mem_desc_.didOutputColumnar() && permutation_.empty() &&
510  }
511 
512  const std::vector<ColumnLazyFetchInfo>& getLazyFetchInfo() const {
513  return lazy_fetch_info_;
514  }
515 
516  void setSeparateVarlenStorageValid(const bool val) {
517  separate_varlen_storage_valid_ = val;
518  }
519 
520  std::shared_ptr<const std::vector<std::string>> getStringDictionaryPayloadCopy(
521  const int dict_id) const;
522 
523  private:
524  void advanceCursorToNextEntry(ResultSetRowIterator& iter) const;
525 
526  std::vector<TargetValue> getNextRowImpl(const bool translate_strings,
527  const bool decimal_to_double) const;
528 
529  std::vector<TargetValue> getNextRowUnlocked(const bool translate_strings,
530  const bool decimal_to_double) const;
531 
532  std::vector<TargetValue> getRowAt(const size_t index,
533  const bool translate_strings,
534  const bool decimal_to_double,
535  const bool fixup_count_distinct_pointers,
536  const std::vector<bool>& targets_to_skip = {}) const;
537 
538  size_t parallelRowCount() const;
539 
540  size_t advanceCursorToNextEntry() const;
541 
542  void radixSortOnGpu(const std::list<Analyzer::OrderEntry>& order_entries) const;
543 
544  void radixSortOnCpu(const std::list<Analyzer::OrderEntry>& order_entries) const;
545 
546  static bool isNull(const SQLTypeInfo& ti,
547  const InternalTargetValue& val,
548  const bool float_argument_input);
549 
550  TargetValue getTargetValueFromBufferRowwise(
551  int8_t* rowwise_target_ptr,
552  int8_t* keys_ptr,
553  const size_t entry_buff_idx,
554  const TargetInfo& target_info,
555  const size_t target_logical_idx,
556  const size_t slot_idx,
557  const bool translate_strings,
558  const bool decimal_to_double,
559  const bool fixup_count_distinct_pointers) const;
560 
561  TargetValue getTargetValueFromBufferColwise(const int8_t* col_ptr,
562  const int8_t* keys_ptr,
563  const QueryMemoryDescriptor& query_mem_desc,
564  const size_t local_entry_idx,
565  const size_t global_entry_idx,
566  const TargetInfo& target_info,
567  const size_t target_logical_idx,
568  const size_t slot_idx,
569  const bool translate_strings,
570  const bool decimal_to_double) const;
571 
572  TargetValue makeTargetValue(const int8_t* ptr,
573  const int8_t compact_sz,
574  const TargetInfo& target_info,
575  const size_t target_logical_idx,
576  const bool translate_strings,
577  const bool decimal_to_double,
578  const size_t entry_buff_idx) const;
579 
580  TargetValue makeVarlenTargetValue(const int8_t* ptr1,
581  const int8_t compact_sz1,
582  const int8_t* ptr2,
583  const int8_t compact_sz2,
584  const TargetInfo& target_info,
585  const size_t target_logical_idx,
586  const bool translate_strings,
587  const size_t entry_buff_idx) const;
588 
590  int8_t* ptr1;
591  int8_t compact_sz1;
592  int8_t* ptr2;
593  int8_t compact_sz2;
594 
596  : ptr1(nullptr), compact_sz1(0), ptr2(nullptr), compact_sz2(0) {}
597  };
598  TargetValue makeGeoTargetValue(const int8_t* geo_target_ptr,
599  const size_t slot_idx,
600  const TargetInfo& target_info,
601  const size_t target_logical_idx,
602  const size_t entry_buff_idx) const;
603 
606  const size_t fixedup_entry_idx;
607  const size_t storage_idx;
608  };
609 
610  InternalTargetValue getColumnInternal(
611  const int8_t* buff,
612  const size_t entry_idx,
613  const size_t target_logical_idx,
614  const StorageLookupResult& storage_lookup_result) const;
615 
616  InternalTargetValue getVarlenOrderEntry(const int64_t str_ptr,
617  const size_t str_len) const;
618 
619  int64_t lazyReadInt(const int64_t ival,
620  const size_t target_logical_idx,
621  const StorageLookupResult& storage_lookup_result) const;
622 
623  std::pair<ssize_t, size_t> getStorageIndex(const size_t entry_idx) const;
624 
625  const std::vector<const int8_t*>& getColumnFrag(const size_t storge_idx,
626  const size_t col_logical_idx,
627  int64_t& global_idx) const;
628 
629  StorageLookupResult findStorage(const size_t entry_idx) const;
630 
// Pointer/size pairs for one target: (ptr1, compact_sz1) and (ptr2,
// compact_sz2). NOTE(review): the second pair is presumably only meaningful
// for targets spanning two slots — confirm in initializeOffsetsForStorage().
struct TargetOffsets {
  const int8_t* ptr1;        // first slot
  const size_t compact_sz1;  // compact byte width of the first slot
  const int8_t* ptr2;        // second slot
  const size_t compact_sz2;  // compact byte width of the second slot
};
637 
639  RowWiseTargetAccessor(const ResultSet* result_set)
640  : result_set_(result_set)
641  , row_bytes_(get_row_bytes(result_set->query_mem_desc_))
642  , key_width_(result_set_->query_mem_desc_.getEffectiveKeyWidth())
643  , key_bytes_with_padding_(
645  initializeOffsetsForStorage();
646  }
647 
648  InternalTargetValue getColumnInternal(
649  const int8_t* buff,
650  const size_t entry_idx,
651  const size_t target_logical_idx,
652  const StorageLookupResult& storage_lookup_result) const;
653 
654  void initializeOffsetsForStorage();
655 
656  inline const int8_t* get_rowwise_ptr(const int8_t* buff,
657  const size_t entry_idx) const {
658  return buff + entry_idx * row_bytes_;
659  }
660 
661  std::vector<std::vector<TargetOffsets>> offsets_for_storage_;
662 
664 
665  // Row-wise iteration
666  const size_t row_bytes_;
667  const size_t key_width_;
669  };
670 
672  ColumnWiseTargetAccessor(const ResultSet* result_set) : result_set_(result_set) {
673  initializeOffsetsForStorage();
674  }
675 
676  void initializeOffsetsForStorage();
677 
678  InternalTargetValue getColumnInternal(
679  const int8_t* buff,
680  const size_t entry_idx,
681  const size_t target_logical_idx,
682  const StorageLookupResult& storage_lookup_result) const;
683 
684  std::vector<std::vector<TargetOffsets>> offsets_for_storage_;
685 
687  };
688 
689  template <typename BUFFER_ITERATOR_TYPE>
691  using BufferIteratorType = BUFFER_ITERATOR_TYPE;
692 
693  ResultSetComparator(const std::list<Analyzer::OrderEntry>& order_entries,
694  const bool use_heap,
695  const ResultSet* result_set)
696  : order_entries_(order_entries)
697  , use_heap_(use_heap)
698  , result_set_(result_set)
699  , buffer_itr_(result_set) {}
700 
701  bool operator()(const uint32_t lhs, const uint32_t rhs) const;
702 
703  // TODO(adb): make order_entries_ a pointer
704  const std::list<Analyzer::OrderEntry> order_entries_;
705  const bool use_heap_;
708  };
709 
710  std::function<bool(const uint32_t, const uint32_t)> createComparator(
711  const std::list<Analyzer::OrderEntry>& order_entries,
712  const bool use_heap) {
714  column_wise_comparator_ =
715  std::make_unique<ResultSetComparator<ColumnWiseTargetAccessor>>(
716  order_entries, use_heap, this);
717  return [this](const uint32_t lhs, const uint32_t rhs) -> bool {
718  return (*this->column_wise_comparator_)(lhs, rhs);
719  };
720  } else {
721  row_wise_comparator_ = std::make_unique<ResultSetComparator<RowWiseTargetAccessor>>(
722  order_entries, use_heap, this);
723  return [this](const uint32_t lhs, const uint32_t rhs) -> bool {
724  return (*this->row_wise_comparator_)(lhs, rhs);
725  };
726  }
727  }
728 
729  static void topPermutation(
730  std::vector<uint32_t>& to_sort,
731  const size_t n,
732  const std::function<bool(const uint32_t, const uint32_t)> compare);
733 
734  void sortPermutation(const std::function<bool(const uint32_t, const uint32_t)> compare);
735 
736  std::vector<uint32_t> initPermutationBuffer(const size_t start, const size_t step);
737 
738  void parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
739  const size_t top_n);
740 
741  void baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
742  const size_t top_n);
743 
744  void doBaselineSort(const ExecutorDeviceType device_type,
745  const std::list<Analyzer::OrderEntry>& order_entries,
746  const size_t top_n);
747 
748  bool canUseFastBaselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
749  const size_t top_n);
750 
751  Data_Namespace::DataMgr* getDataManager() const;
752 
753  int getGpuCount() const;
754 
755  void serializeProjection(TSerializedRows& serialized_rows) const;
756  void serializeVarlenAggColumn(int8_t* buf,
757  std::vector<std::string>& varlen_bufer) const;
758 
759  void serializeCountDistinctColumns(TSerializedRows&) const;
760 
761  void unserializeCountDistinctColumns(const TSerializedRows&);
762 
763  void fixupCountDistinctPointers();
764 
765  using BufferSet = std::set<int64_t>;
766  void create_active_buffer_set(BufferSet& count_distinct_active_buffer_set) const;
767 
768  int64_t getDistinctBufferRefFromBufferRowwise(int8_t* rowwise_target_ptr,
769  const TargetInfo& target_info) const;
770 
771  const std::vector<TargetInfo> targets_;
773  const int device_id_;
775  mutable std::unique_ptr<ResultSetStorage> storage_;
776  std::vector<std::unique_ptr<ResultSetStorage>> appended_storage_;
777  mutable size_t crt_row_buff_idx_;
778  mutable size_t fetched_so_far_;
779  size_t drop_first_;
780  size_t keep_first_;
781  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
782  std::vector<uint32_t> permutation_;
783  int64_t queue_time_ms_;
785  const Executor* executor_; // TODO(alex): remove
786 
787  std::list<std::shared_ptr<Chunk_NS::Chunk>> chunks_;
788  std::vector<std::shared_ptr<std::list<ChunkIter>>> chunk_iters_;
789  // TODO(miyu): refine by using one buffer and
790  // setting offset instead of ptr in group by buffer.
791  std::vector<std::vector<int8_t>> literal_buffers_;
792  const std::vector<ColumnLazyFetchInfo> lazy_fetch_info_;
793  std::vector<std::vector<std::vector<const int8_t*>>> col_buffers_;
794  std::vector<std::vector<std::vector<int64_t>>> frag_offsets_;
795  std::vector<std::vector<int64_t>> consistent_frag_sizes_;
796 
797  const std::shared_ptr<const Analyzer::Estimator> estimator_;
799  mutable int8_t* host_estimator_buffer_;
801 
802  // only used by serialization
803  using SerializedVarlenBufferStorage = std::vector<std::string>;
804 
805  std::vector<SerializedVarlenBufferStorage> serialized_varlen_buffer_;
807  std::string explanation_;
808  const bool just_explain_;
809  mutable std::atomic<ssize_t> cached_row_count_;
810  mutable std::mutex row_iteration_mutex_;
811 
812  // only used by geo
814 
815  // comparators used for sorting (note that the actual compare function is accessed using
816  // the createComparator method)
817  std::unique_ptr<ResultSetComparator<RowWiseTargetAccessor>> row_wise_comparator_;
818  std::unique_ptr<ResultSetComparator<ColumnWiseTargetAccessor>> column_wise_comparator_;
819 
820  friend class ResultSetManager;
821  friend class ResultSetRowIterator;
822 };
823 
826  return {};
827  }
828 
829  if (result_set_->just_explain_) {
830  return {result_set_->explanation_};
831  }
832 
833  return result_set_->getRowAt(
835 }
836 
838  if (!result_set_->storage_ && !result_set_->just_explain_) {
839  global_entry_idx_valid_ = false;
840  } else if (result_set_->just_explain_) {
842  fetched_so_far_ = 1;
843  } else {
844  result_set_->advanceCursorToNextEntry(*this);
845  }
846  return *this;
847 }
848 
850  public:
851  ResultSet* reduce(std::vector<ResultSet*>&);
852 
853  std::shared_ptr<ResultSet> getOwnResultSet();
854 
855  void rewriteVarlenAggregates(ResultSet*);
856 
857  private:
858  std::shared_ptr<ResultSet> rs_;
859 };
860 
// Exception type for row-sorting failures (judging by its name; throw sites
// live elsewhere). The message passed in is what what() reports. The
// constructor stays non-explicit, so existing implicit std::string
// conversions keep compiling.
class RowSortException : public std::runtime_error {
 public:
  RowSortException(const std::string& cause) : std::runtime_error{cause} {}
};
865 
866 int64_t lazy_decode(const ColumnLazyFetchInfo& col_lazy_fetch,
867  const int8_t* byte_stream,
868  const int64_t pos);
869 
870 void fill_empty_key(void* key_ptr, const size_t key_count, const size_t key_width);
871 
872 bool can_use_parallel_algorithms(const ResultSet& rows);
873 
874 bool use_parallel_algorithms(const ResultSet& rows);
875 
876 int8_t get_width_for_slot(const size_t target_slot_idx,
877  const bool float_argument_input,
878  const QueryMemoryDescriptor& query_mem_desc);
879 
880 size_t get_byteoff_of_slot(const size_t slot_idx,
881  const QueryMemoryDescriptor& query_mem_desc);
882 
883 using GroupValueInfo = std::pair<int64_t*, bool>;
884 
885 GroupValueInfo get_group_value_reduction(int64_t* groups_buffer,
886  const uint32_t groups_buffer_entry_count,
887  const int64_t* key,
888  const uint32_t key_count,
889  const size_t key_width,
890  const QueryMemoryDescriptor& query_mem_desc,
891  const int64_t* that_buff_i64,
892  const size_t that_entry_idx,
893  const size_t that_entry_count,
894  const uint32_t row_size_quad);
895 
896 #endif // QUERYENGINE_RESULTSET_H
std::pair< int64_t *, bool > GroupValueInfo
Definition: ResultSet.h:883
void setSeparateVarlenStorageValid(const bool val)
Definition: ResultSet.h:516
const std::list< Analyzer::OrderEntry > order_entries_
Definition: ResultSet.h:704
void setGeoReturnType(const GeoReturnType val)
Definition: ResultSet.h:494
const SQLTypeInfo type
Definition: ResultSet.h:232
void copyKeyColWise(const size_t entry_idx, int8_t *this_buff, const int8_t *that_buff) const
std::mutex row_iteration_mutex_
Definition: ResultSet.h:810
const int8_t * ptr1
Definition: ResultSet.h:632
const size_t compact_sz2
Definition: ResultSet.h:635
int64_t lazy_decode(const ColumnLazyFetchInfo &col_lazy_fetch, const int8_t *byte_stream, const int64_t pos)
void holdChunks(const std::list< std::shared_ptr< Chunk_NS::Chunk >> &chunks)
Definition: ResultSet.h:457
GeoReturnType getGeoReturnType() const
Definition: ResultSet.h:493
ResultSetRowIterator rowIterator(bool translate_strings, bool decimal_to_double) const
Definition: ResultSet.h:345
std::unique_ptr< ResultSetComparator< ColumnWiseTargetAccessor > > column_wise_comparator_
Definition: ResultSet.h:818
std::ptrdiff_t difference_type
Definition: ResultSet.h:245
int8_t * estimator_buffer_
Definition: ResultSet.h:798
const std::vector< TargetInfo > targets_
Definition: ResultSet.h:203
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
void setEntryCount(const size_t val)
std::vector< std::unique_ptr< ResultSetStorage > > appended_storage_
Definition: ResultSet.h:776
std::vector< int64_t > target_init_vals_
Definition: ResultSet.h:207
void reduceOneEntryBaseline(int8_t *this_buff, const int8_t *that_buff, const size_t i, const size_t that_entry_count, const ResultSetStorage &that) const
GeoReturnType geo_return_type_
Definition: ResultSet.h:813
ExecutorDeviceType
const BufferIteratorType buffer_itr_
Definition: ResultSet.h:707
void fill_empty_key(void *key_ptr, const size_t key_count, const size_t key_width)
bool operator==(const ResultSetRowIterator &other) const
Definition: ResultSet.h:250
void moveEntriesToBuffer(int8_t *new_buff, const size_t new_entry_count) const
Utility functions for easy access to the result set buffers.
void reduce(const ResultSetStorage &that, const std::vector< std::string > &serialized_varlen_buffer, const ReductionCode &reduction_code) const
std::shared_ptr< ResultSet > rs_
Definition: ResultSet.h:858
std::vector< std::string > SerializedVarlenBufferStorage
Definition: ResultSet.h:803
ResultSetRowIterator(const ResultSet *rs, bool translate_strings, bool decimal_to_double)
Definition: ResultSet.h:280
const Executor * executor_
Definition: ResultSet.h:785
size_t get_byteoff_of_slot(const size_t slot_idx, const QueryMemoryDescriptor &query_mem_desc)
QueryMemoryDescriptor query_mem_desc_
Definition: ResultSet.h:774
void addCountDistinctSetPointerMapping(const int64_t remote_ptr, const int64_t ptr)
Definition: ResultSet.cpp:841
ResultSetStorage(const std::vector< TargetInfo > &targets, const QueryMemoryDescriptor &query_mem_desc, int8_t *buff, const bool buff_is_provided)
Definition: ResultSet.cpp:44
std::unique_ptr< ResultSetStorage > storage_
Definition: ResultSet.h:775
boost::variant< GeoPointTargetValue, GeoLineStringTargetValue, GeoPolyTargetValue, GeoMultiPolyTargetValue > GeoTargetValue
Definition: TargetValue.h:161
ResultSetRowIterator(const ResultSet *rs)
Definition: ResultSet.h:291
size_t keep_first_
Definition: ResultSet.h:780
std::vector< std::shared_ptr< std::list< ChunkIter > > > chunk_iters_
Definition: ResultSet.h:788
const int64_t const uint32_t groups_buffer_entry_count
std::vector< SerializedVarlenBufferStorage > serialized_varlen_buffer_
Definition: ResultSet.h:805
const size_t compact_sz1
Definition: ResultSet.h:633
void initializeColWise() const
int8_t get_width_for_slot(const size_t target_slot_idx, const bool float_argument_input, const QueryMemoryDescriptor &query_mem_desc)
bool use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:873
friend class ResultSet
Definition: ResultSet.h:215
const bool just_explain_
Definition: ResultSet.h:808
void reduceOneEntryNoCollisionsRowWise(const size_t i, int8_t *this_buff, const int8_t *that_buff, const ResultSetStorage &that, const std::vector< std::string > &serialized_varlen_buffer) const
std::vector< uint32_t > permutation_
Definition: ResultSet.h:782
std::unordered_map< int64_t, int64_t > count_distinct_sets_mapping_
Definition: ResultSet.h:213
const int local_col_id
Definition: ResultSet.h:231
const size_t key_bytes_with_padding_
Definition: ResultSet.h:668
void moveOneEntryToBuffer(const size_t entry_index, int64_t *new_buff_i64, const size_t new_entry_count, const size_t key_count, const size_t row_qw_count, const int64_t *src_buff, const size_t key_byte_width) const
bool isEmptyEntry(const size_t entry_idx, const int8_t *buff) const
const ResultSet * result_set_
Definition: ResultSet.h:663
const ResultSet * result_set_
Definition: ResultSet.h:272
std::unique_ptr< ResultSetComparator< RowWiseTargetAccessor > > row_wise_comparator_
Definition: ResultSet.h:817
ColumnWiseTargetAccessor(const ResultSet *result_set)
Definition: ResultSet.h:672
std::input_iterator_tag iterator_category
Definition: ResultSet.h:248
size_t global_entry_idx_
Definition: ResultSet.h:274
const bool buff_is_provided_
Definition: ResultSet.h:206
const std::vector< TargetInfo > targets_
Definition: ResultSet.h:771
const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
Definition: ResultSet.h:781
bool operator!=(const ResultSetRowIterator &other) const
Definition: ResultSet.h:254
ResultSetRowIterator rowIterator(size_t from_logical_index, bool translate_strings, bool decimal_to_double) const
Definition: ResultSet.h:330
size_t drop_first_
Definition: ResultSet.h:779
void reduceOneEntrySlotsBaseline(int64_t *this_entry_slots, const int64_t *that_buff, const size_t that_entry_idx, const size_t that_entry_count, const ResultSetStorage &that) const
const int8_t * ptr2
Definition: ResultSet.h:634
std::list< std::shared_ptr< Chunk_NS::Chunk > > chunks_
Definition: ResultSet.h:787
const ResultSet * result_set_
Definition: ResultSet.h:706
std::vector< std::vector< int8_t > > literal_buffers_
Definition: ResultSet.h:791
std::vector< std::vector< TargetOffsets > > offsets_for_storage_
Definition: ResultSet.h:684
GroupValueInfo get_group_value_reduction(int64_t *groups_buffer, const uint32_t groups_buffer_entry_count, const int64_t *key, const uint32_t key_count, const size_t key_width, const QueryMemoryDescriptor &query_mem_desc, const int64_t *that_buff_i64, const size_t that_entry_idx, const size_t that_entry_count, const uint32_t row_size_quad)
void updateEntryCount(const size_t new_entry_count)
Definition: ResultSet.h:108
void rewriteAggregateBufferOffsets(const std::vector< std::string > &serialized_varlen_buffer) const
void reduceOneSlotBaseline(int64_t *this_buff, const size_t this_slot, const int64_t *that_buff, const size_t that_entry_count, const size_t that_slot, const TargetInfo &target_info, const size_t target_logical_idx, const size_t target_slot_idx, const size_t init_agg_val_idx, const ResultSetStorage &that) const
std::vector< TargetValue > & reference
Definition: ResultSet.h:247
ResultSetRowIterator & operator++(void)
Definition: ResultSet.h:837
void fillOneEntryColWise(const std::vector< int64_t > &entry)
size_t append(FILE *f, const size_t size, int8_t *buf)
Appends the specified number of bytes to the end of the file f from buf.
Definition: File.cpp:136
std::atomic< ssize_t > cached_row_count_
Definition: ResultSet.h:809
const std::vector< ColumnLazyFetchInfo > lazy_fetch_info_
Definition: ResultSet.h:792
RowWiseTargetAccessor(const ResultSet *result_set)
Definition: ResultSet.h:639
RowSortException(const std::string &cause)
Definition: ResultSet.h:863
Data_Namespace::DataMgr * data_mgr_
Definition: ResultSet.h:800
void fillOneEntry(const std::vector< int64_t > &entry)
Definition: ResultSet.h:446
const int64_t value
Definition: ResultSet.h:236
void initializeBaselineValueSlots(int64_t *this_entry_slots) const
void reduceOneCountDistinctSlot(int8_t *this_ptr1, const int8_t *that_ptr1, const size_t target_logical_idx, const ResultSetStorage &that) const
void updateStorageEntryCount(const size_t new_entry_count)
Definition: ResultSet.h:358
ResultSetRowIterator operator++(int)
Definition: ResultSet.h:258
const std::shared_ptr< const Analyzer::Estimator > estimator_
Definition: ResultSet.h:797
ALWAYS_INLINE void reduceOneSlot(int8_t *this_ptr1, int8_t *this_ptr2, const int8_t *that_ptr1, const int8_t *that_ptr2, const TargetInfo &target_info, const size_t target_logical_idx, const size_t target_slot_idx, const size_t init_agg_val_idx, const ResultSetStorage &that, const size_t first_slot_idx_for_target, const std::vector< std::string > &serialized_varlen_buffer) const
size_t getCurrentRowBufferIndex() const
Definition: ResultSet.h:264
void holdChunkIterators(const std::shared_ptr< std::list< ChunkIter >> chunk_iters)
Definition: ResultSet.h:460
size_t get_row_bytes(const QueryMemoryDescriptor &query_mem_desc)
int8_t * buff_
Definition: ResultSet.h:205
ResultSetComparator(const std::list< Analyzer::OrderEntry > &order_entries, const bool use_heap, const ResultSet *result_set)
Definition: ResultSet.h:693
std::vector< TargetValue > value_type
Definition: ResultSet.h:244
std::vector< std::vector< std::vector< const int8_t * > > > col_buffers_
Definition: ResultSet.h:793
const bool valid
Definition: ResultSet.h:237
int64_t queue_time_ms_
Definition: ResultSet.h:783
std::string explanation_
Definition: ResultSet.h:807
const bool is_lazily_fetched
Definition: ResultSet.h:230
std::vector< std::vector< int64_t > > consistent_frag_sizes_
Definition: ResultSet.h:795
int8_t * host_estimator_buffer_
Definition: ResultSet.h:799
bool can_use_parallel_algorithms(const ResultSet &rows)
Definition: ResultSet.cpp:869
const ExecutorDeviceType device_type_
Definition: ResultSet.h:772
void holdLiterals(std::vector< int8_t > &literal_buff)
Definition: ResultSet.h:463
Executor(const int db_id, const size_t block_size_x, const size_t grid_size_x, const std::string &debug_dir, const std::string &debug_file, ::QueryRenderer::QueryRenderManager *render_manager)
Definition: Execute.cpp:101
const int8_t * get_rowwise_ptr(const int8_t *buff, const size_t entry_idx) const
Definition: ResultSet.h:656
#define CHECK(condition)
Definition: Logger.h:187
size_t fetched_so_far_
Definition: ResultSet.h:778
bool isEmptyEntryColumnar(const size_t entry_idx, const int8_t *buff) const
size_t crt_row_buff_idx_
Definition: ResultSet.h:777
Estimators to be used when precise cardinality isn&#39;t useful.
void fillOneEntryRowWise(const std::vector< int64_t > &entry)
std::vector< std::vector< std::vector< int64_t > > > frag_offsets_
Definition: ResultSet.h:794
void reduceEntriesNoCollisionsColWise(int8_t *this_buff, const int8_t *that_buff, const ResultSetStorage &that, const size_t start_index, const size_t end_index, const std::vector< std::string > &serialized_varlen_buffer) const
bool separate_varlen_storage_valid_
Definition: ResultSet.h:806
const std::vector< ColumnLazyFetchInfo > & getLazyFetchInfo() const
Definition: ResultSet.h:512
boost::variant< ScalarTargetValue, ArrayTargetValue, GeoTargetValue, GeoTargetValuePtr > TargetValue
Definition: TargetValue.h:167
void initializeRowWise() const
std::vector< TargetValue > * pointer
Definition: ResultSet.h:246
int8_t * getUnderlyingBuffer() const
Definition: ResultSet.cpp:77
const bool isPermutationBufferEmpty() const
Definition: ResultSet.h:472
std::shared_ptr< RowSetMemoryOwner > getRowSetMemOwner() const
Definition: ResultSet.h:467
std::set< int64_t > BufferSet
Definition: ResultSet.h:765
std::function< bool(const uint32_t, const uint32_t)> createComparator(const std::list< Analyzer::OrderEntry > &order_entries, const bool use_heap)
Definition: ResultSet.h:710
BUFFER_ITERATOR_TYPE BufferIteratorType
Definition: ResultSet.h:691
QueryDescriptionType getQueryDescriptionType() const
#define ALWAYS_INLINE
size_t crt_row_buff_idx_
Definition: ResultSet.h:273
std::vector< std::vector< TargetOffsets > > offsets_for_storage_
Definition: ResultSet.h:661
static bool reduceSingleRow(const int8_t *row_ptr, const int8_t warp_count, const bool is_columnar, const bool replace_bitmap_ptr_with_bitmap_sz, std::vector< int64_t > &agg_vals, const QueryMemoryDescriptor &query_mem_desc, const std::vector< TargetInfo > &targets, const std::vector< int64_t > &agg_init_vals)
bool global_entry_idx_valid_
Definition: ResultSet.h:275
size_t get_key_bytes_rowwise(const QueryMemoryDescriptor &query_mem_desc)
int64_t mappedPtr(const int64_t) const
Definition: ResultSet.cpp:847
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
bool isFastColumnarConversionPossible() const
Definition: ResultSet.h:507
const ResultSetStorage * storage_ptr
Definition: ResultSet.h:605
value_type operator*() const
Definition: ResultSet.h:824
QueryMemoryDescriptor query_mem_desc_
Definition: ResultSet.h:204
int64_t render_time_ms_
Definition: ResultSet.h:784
boost::variant< GeoPointTargetValuePtr, GeoLineStringTargetValuePtr, GeoPolyTargetValuePtr, GeoMultiPolyTargetValuePtr > GeoTargetValuePtr
Definition: TargetValue.h:165
const int device_id_
Definition: ResultSet.h:773