ResultSet.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "ResultSet.h"
26 #include "Execute.h"
27 #include "GpuMemUtils.h"
28 #include "InPlaceSort.h"
31 #include "RelAlgExecutionUnit.h"
32 #include "RuntimeFunctions.h"
33 #include "Shared/Intervals.h"
34 #include "Shared/SqlTypesLayout.h"
35 #include "Shared/checked_alloc.h"
36 #include "Shared/likely.h"
37 #include "Shared/thread_count.h"
38 #include "Shared/threading.h"
39 
40 #include <tbb/parallel_for.h>
41 
42 #include <algorithm>
43 #include <atomic>
44 #include <bitset>
45 #include <functional>
46 #include <future>
47 #include <numeric>
48 
49 size_t g_parallel_top_min = 100e3;
50 size_t g_parallel_top_max = 20e6; // In effect only with g_enable_watchdog.
51 size_t g_streaming_topn_max = 100e3;
52 constexpr int64_t uninitialized_cached_row_count{-1};
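// Note (annotation): these thresholds are consulted in ResultSet::sort() below. A top-n
// (LIMIT) sort switches to parallelTop() once the entry count exceeds g_parallel_top_min,
// and, when the watchdog is enabled, is rejected once it exceeds g_parallel_top_max.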
53 
54 void ResultSet::keepFirstN(const size_t n) {
55  invalidateCachedRowCount();
56  keep_first_ = n;
57 }
58 
59 void ResultSet::dropFirstN(const size_t n) {
60  invalidateCachedRowCount();
61  drop_first_ = n;
62 }
63 
64 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
65  const ExecutorDeviceType device_type,
66  const QueryMemoryDescriptor& query_mem_desc,
67  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
68  const Catalog_Namespace::Catalog* catalog,
69  const unsigned block_size,
70  const unsigned grid_size)
71  : targets_(targets)
72  , device_type_(device_type)
73  , device_id_(-1)
74  , query_mem_desc_(query_mem_desc)
75  , crt_row_buff_idx_(0)
76  , fetched_so_far_(0)
77  , drop_first_(0)
78  , keep_first_(0)
79  , row_set_mem_owner_(row_set_mem_owner)
80  , catalog_(catalog)
81  , block_size_(block_size)
82  , grid_size_(grid_size)
83  , data_mgr_(nullptr)
84  , separate_varlen_storage_valid_(false)
85  , just_explain_(false)
86  , for_validation_only_(false)
87  , cached_row_count_(uninitialized_cached_row_count)
88  , geo_return_type_(GeoReturnType::WktString)
89  , cached_(false)
90  , query_exec_time_(0)
91  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
92  , can_use_speculative_top_n_sort(std::nullopt) {}
93 
94 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
95  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
96  const std::vector<std::vector<const int8_t*>>& col_buffers,
97  const std::vector<std::vector<int64_t>>& frag_offsets,
98  const std::vector<int64_t>& consistent_frag_sizes,
99  const ExecutorDeviceType device_type,
100  const int device_id,
101  const QueryMemoryDescriptor& query_mem_desc,
102  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
103  const Catalog_Namespace::Catalog* catalog,
104  const unsigned block_size,
105  const unsigned grid_size)
106  : targets_(targets)
107  , device_type_(device_type)
108  , device_id_(device_id)
109  , query_mem_desc_(query_mem_desc)
110  , crt_row_buff_idx_(0)
111  , fetched_so_far_(0)
112  , drop_first_(0)
113  , keep_first_(0)
114  , row_set_mem_owner_(row_set_mem_owner)
115  , catalog_(catalog)
116  , block_size_(block_size)
117  , grid_size_(grid_size)
118  , lazy_fetch_info_(lazy_fetch_info)
119  , col_buffers_{col_buffers}
120  , frag_offsets_{frag_offsets}
121  , consistent_frag_sizes_{consistent_frag_sizes}
122  , data_mgr_(nullptr)
123  , separate_varlen_storage_valid_(false)
124  , just_explain_(false)
125  , for_validation_only_(false)
126  , cached_row_count_(uninitialized_cached_row_count)
127  , geo_return_type_(GeoReturnType::WktString)
128  , cached_(false)
129  , query_exec_time_(0)
130  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
131  , can_use_speculative_top_n_sort(std::nullopt) {}
132 
133 ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
134  const ExecutorDeviceType device_type,
135  const int device_id,
136  Data_Namespace::DataMgr* data_mgr)
137  : device_type_(device_type)
138  , device_id_(device_id)
139  , query_mem_desc_{}
140  , crt_row_buff_idx_(0)
141  , estimator_(estimator)
142  , data_mgr_(data_mgr)
143  , separate_varlen_storage_valid_(false)
144  , just_explain_(false)
145  , for_validation_only_(false)
146  , cached_row_count_(uninitialized_cached_row_count)
147  , geo_return_type_(GeoReturnType::WktString)
148  , cached_(false)
149  , query_exec_time_(0)
150  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
151  , can_use_speculative_top_n_sort(std::nullopt) {
152  if (device_type == ExecutorDeviceType::GPU) {
153  device_estimator_buffer_ = CudaAllocator::allocGpuAbstractBuffer(
154  data_mgr_, estimator_->getBufferSize(), device_id_);
155  data_mgr->getCudaMgr()->zeroDeviceMem(device_estimator_buffer_->getMemoryPtr(),
156  estimator_->getBufferSize(),
157  device_id_,
158  getQueryEngineCudaStreamForDevice(device_id_));
159  } else {
160  host_estimator_buffer_ =
161  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
162  }
163 }
164 
165 ResultSet::ResultSet(const std::string& explanation)
166  : device_type_(ExecutorDeviceType::CPU)
167  , device_id_(-1)
168  , fetched_so_far_(0)
169  , separate_varlen_storage_valid_(false)
170  , explanation_(explanation)
171  , just_explain_(true)
172  , for_validation_only_(false)
173  , cached_row_count_(uninitialized_cached_row_count)
174  , geo_return_type_(GeoReturnType::WktString)
175  , cached_(false)
176  , query_exec_time_(0)
177  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
178  , can_use_speculative_top_n_sort(std::nullopt) {}
179 
180 ResultSet::ResultSet(int64_t queue_time_ms,
181  int64_t render_time_ms,
182  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
183  : device_type_(ExecutorDeviceType::CPU)
184  , device_id_(-1)
185  , fetched_so_far_(0)
186  , row_set_mem_owner_(row_set_mem_owner)
187  , timings_(QueryExecutionTimings{queue_time_ms, render_time_ms, 0, 0})
188  , separate_varlen_storage_valid_(false)
189  , just_explain_(true)
190  , for_validation_only_(false)
191  , cached_row_count_(uninitialized_cached_row_count)
192  , geo_return_type_(GeoReturnType::WktString)
193  , cached_(false)
194  , query_exec_time_(0)
195  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
196  , can_use_speculative_top_n_sort(std::nullopt) {}
197 
198 ResultSet::~ResultSet() {
199  if (storage_) {
200  if (!storage_->buff_is_provided_) {
201  CHECK(storage_->getUnderlyingBuffer());
202  free(storage_->getUnderlyingBuffer());
203  }
204  }
205  for (auto& storage : appended_storage_) {
206  if (storage && !storage->buff_is_provided_) {
207  free(storage->getUnderlyingBuffer());
208  }
209  }
210  if (host_estimator_buffer_) {
211  CHECK(device_type_ == ExecutorDeviceType::CPU || device_estimator_buffer_);
212  free(host_estimator_buffer_);
213  }
214  if (device_estimator_buffer_) {
215  CHECK(data_mgr_);
216  data_mgr_->free(device_estimator_buffer_);
217  }
218 }
219 
220 std::string ResultSet::summaryToString() const {
221  std::ostringstream oss;
222  oss << "Result Set Info" << std::endl;
223  oss << "\tLayout: " << query_mem_desc_.queryDescTypeToString() << std::endl;
224  oss << "\tColumns: " << colCount() << std::endl;
225  oss << "\tRows: " << rowCount() << std::endl;
226  oss << "\tEntry count: " << entryCount() << std::endl;
227  const std::string is_empty = isEmpty() ? "True" : "False";
228  oss << "\tIs empty: " << is_empty << std::endl;
229  const std::string did_output_columnar = didOutputColumnar() ? "True" : "False";
230  oss << "\tColumnar: " << did_output_columnar << std::endl;
231  oss << "\tLazy-fetched columns: " << getNumColumnsLazyFetched() << std::endl;
232  const std::string is_direct_columnar_conversion_possible =
233  isDirectColumnarConversionPossible() ? "True" : "False";
234  oss << "\tDirect columnar conversion possible: "
235  << is_direct_columnar_conversion_possible << std::endl;
236 
237  size_t num_columns_zero_copy_columnarizable{0};
238  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
239  if (isZeroCopyColumnarConversionPossible(target_idx)) {
240  num_columns_zero_copy_columnarizable++;
241  }
242  }
243  oss << "\tZero-copy columnar conversion columns: "
244  << num_columns_zero_copy_columnarizable << std::endl;
245 
246  oss << "\tPermutation size: " << permutation_.size() << std::endl;
247  oss << "\tLimit: " << keep_first_ << std::endl;
248  oss << "\tOffset: " << drop_first_ << std::endl;
249  return oss.str();
250 }
251 
252 ExecutorDeviceType ResultSet::getDeviceType() const {
253  return device_type_;
254 }
255 
256 const ResultSetStorage* ResultSet::allocateStorage() const {
257  CHECK(!storage_);
258  CHECK(row_set_mem_owner_);
259  auto buff = row_set_mem_owner_->allocate(
260  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
261  storage_.reset(
262  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
263  return storage_.get();
264 }
265 
266 const ResultSetStorage* ResultSet::allocateStorage(
267  int8_t* buff,
268  const std::vector<int64_t>& target_init_vals,
269  std::shared_ptr<VarlenOutputInfo> varlen_output_info) const {
270  CHECK(buff);
271  CHECK(!storage_);
272  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, true));
273  // TODO: add both to the constructor
274  storage_->target_init_vals_ = target_init_vals;
275  if (varlen_output_info) {
276  storage_->varlen_output_info_ = varlen_output_info;
277  }
278  return storage_.get();
279 }
280 
281 const ResultSetStorage* ResultSet::allocateStorage(
282  const std::vector<int64_t>& target_init_vals) const {
283  CHECK(!storage_);
284  CHECK(row_set_mem_owner_);
285  auto buff = row_set_mem_owner_->allocate(
286  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
287  storage_.reset(
288  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
289  storage_->target_init_vals_ = target_init_vals;
290  return storage_.get();
291 }
292 
293 size_t ResultSet::getCurrentRowBufferIndex() const {
294  if (crt_row_buff_idx_ == 0) {
295  throw std::runtime_error("current row buffer iteration index is undefined");
296  }
297  return crt_row_buff_idx_ - 1;
298 }
299 
300 // Note: that.appended_storage_ does not get appended to this.
301 void ResultSet::append(ResultSet& that) {
302  invalidateCachedRowCount();
303  if (!that.storage_) {
304  return;
305  }
306  appended_storage_.push_back(std::move(that.storage_));
307  query_mem_desc_.setEntryCount(
308  query_mem_desc_.getEntryCount() +
309  appended_storage_.back()->query_mem_desc_.getEntryCount());
310  chunks_.insert(chunks_.end(), that.chunks_.begin(), that.chunks_.end());
311  col_buffers_.insert(
312  col_buffers_.end(), that.col_buffers_.begin(), that.col_buffers_.end());
313  frag_offsets_.insert(
314  frag_offsets_.end(), that.frag_offsets_.begin(), that.frag_offsets_.end());
315  consistent_frag_sizes_.insert(consistent_frag_sizes_.end(),
316  that.consistent_frag_sizes_.begin(),
317  that.consistent_frag_sizes_.end());
318  chunk_iters_.insert(
319  chunk_iters_.end(), that.chunk_iters_.begin(), that.chunk_iters_.end());
320  if (separate_varlen_storage_valid_) {
321  CHECK(that.separate_varlen_storage_valid_);
322  serialized_varlen_buffer_.insert(serialized_varlen_buffer_.end(),
323  that.serialized_varlen_buffer_.begin(),
324  that.serialized_varlen_buffer_.end());
325  }
326  for (auto& buff : that.literal_buffers_) {
327  literal_buffers_.push_back(std::move(buff));
328  }
329 }
330 
331 ResultSetPtr ResultSet::copy() {
332  auto timer = DEBUG_TIMER(__func__);
333  if (!storage_) {
334  return nullptr;
335  }
336 
337  auto executor = getExecutor();
338  CHECK(executor);
339  ResultSetPtr copied_rs = std::make_shared<ResultSet>(targets_,
340  device_type_,
341  query_mem_desc_,
342  row_set_mem_owner_,
343  executor->getCatalog(),
344  executor->blockSize(),
345  executor->gridSize());
346 
347  auto allocate_and_copy_storage =
348  [&](const ResultSetStorage* prev_storage) -> std::unique_ptr<ResultSetStorage> {
349  const auto& prev_qmd = prev_storage->query_mem_desc_;
350  const auto storage_size = prev_qmd.getBufferSizeBytes(device_type_);
351  auto buff = row_set_mem_owner_->allocate(storage_size, /*thread_idx=*/0);
352  std::unique_ptr<ResultSetStorage> new_storage;
353  new_storage.reset(new ResultSetStorage(
354  prev_storage->targets_, prev_qmd, buff, /*buff_is_provided=*/true));
355  new_storage->target_init_vals_ = prev_storage->target_init_vals_;
356  if (prev_storage->varlen_output_info_) {
357  new_storage->varlen_output_info_ = prev_storage->varlen_output_info_;
358  }
359  memcpy(new_storage->buff_, prev_storage->buff_, storage_size);
360  new_storage->query_mem_desc_ = prev_qmd;
361  return new_storage;
362  };
363 
364  copied_rs->storage_ = allocate_and_copy_storage(storage_.get());
365  if (!appended_storage_.empty()) {
366  for (const auto& storage : appended_storage_) {
367  copied_rs->appended_storage_.push_back(allocate_and_copy_storage(storage.get()));
368  }
369  }
370  std::copy(chunks_.begin(), chunks_.end(), std::back_inserter(copied_rs->chunks_));
371  std::copy(chunk_iters_.begin(),
372  chunk_iters_.end(),
373  std::back_inserter(copied_rs->chunk_iters_));
374  std::copy(col_buffers_.begin(),
375  col_buffers_.end(),
376  std::back_inserter(copied_rs->col_buffers_));
377  std::copy(frag_offsets_.begin(),
378  frag_offsets_.end(),
379  std::back_inserter(copied_rs->frag_offsets_));
380  std::copy(consistent_frag_sizes_.begin(),
381  consistent_frag_sizes_.end(),
382  std::back_inserter(copied_rs->consistent_frag_sizes_));
383  if (separate_varlen_storage_valid_) {
384  std::copy(serialized_varlen_buffer_.begin(),
385  serialized_varlen_buffer_.end(),
386  std::back_inserter(copied_rs->serialized_varlen_buffer_));
387  }
388  std::copy(literal_buffers_.begin(),
389  literal_buffers_.end(),
390  std::back_inserter(copied_rs->literal_buffers_));
391  std::copy(lazy_fetch_info_.begin(),
392  lazy_fetch_info_.end(),
393  std::back_inserter(copied_rs->lazy_fetch_info_));
394 
395  copied_rs->permutation_ = permutation_;
396  copied_rs->drop_first_ = drop_first_;
397  copied_rs->keep_first_ = keep_first_;
398  copied_rs->separate_varlen_storage_valid_ = separate_varlen_storage_valid_;
399  copied_rs->query_exec_time_ = query_exec_time_;
400  copied_rs->input_table_keys_ = input_table_keys_;
401  copied_rs->target_meta_info_ = target_meta_info_;
402  copied_rs->geo_return_type_ = geo_return_type_;
403  copied_rs->query_plan_ = query_plan_;
404  if (can_use_speculative_top_n_sort) {
405  copied_rs->can_use_speculative_top_n_sort = can_use_speculative_top_n_sort;
406  }
407 
408  return copied_rs;
409 }
410 
411 const ResultSetStorage* ResultSet::getStorage() const {
412  return storage_.get();
413 }
414 
415 size_t ResultSet::colCount() const {
416  return just_explain_ ? 1 : targets_.size();
417 }
418 
419 SQLTypeInfo ResultSet::getColType(const size_t col_idx) const {
420  if (just_explain_) {
421  return SQLTypeInfo(kTEXT, false);
422  }
423  CHECK_LT(col_idx, targets_.size());
424  return targets_[col_idx].agg_kind == kAVG ? SQLTypeInfo(kDOUBLE, false)
425  : targets_[col_idx].sql_type;
426 }
427 
428 StringDictionaryProxy* ResultSet::getStringDictionaryProxy(const int dict_id) const {
429  constexpr bool with_generation = true;
430  return catalog_ ? row_set_mem_owner_->getOrAddStringDictProxy(
431  dict_id, with_generation, catalog_)
432  : row_set_mem_owner_->getStringDictProxy(dict_id);
433 }
434 
435 class ResultSet::CellCallback {
436  StringDictionaryProxy::IdMap id_map_;
437  int64_t const null_int_;
438 
439  public:
440  CellCallback(StringDictionaryProxy::IdMap&& id_map, int64_t const null_int)
441  : id_map_(std::move(id_map)), null_int_(null_int) {}
442  void operator()(int8_t const* const cell_ptr) const {
443  using StringId = int32_t;
444  StringId* const string_id_ptr =
445  const_cast<StringId*>(reinterpret_cast<StringId const*>(cell_ptr));
446  if (*string_id_ptr != null_int_) {
447  *string_id_ptr = id_map_[*string_id_ptr];
448  }
449  }
450 };
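// Annotation: CellCallback above rewrites a dictionary-encoded string id in place through
// the IdMap produced by StringDictionaryProxy::transientUnion(), leaving NULL ids untouched.
// It is applied cell-by-cell by translateDictEncodedColumns() / eachCellInColumn() below.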
451 
452 // Update any dictionary-encoded targets within storage_ with the corresponding
453 // dictionary in the given targets parameter, if their comp_param (dictionary) differs.
454 // This may modify both the storage_ values and storage_ targets.
455 // Does not iterate through appended_storage_.
456 // Iterate over targets starting at index target_idx.
457 void ResultSet::translateDictEncodedColumns(std::vector<TargetInfo> const& targets,
458  size_t const start_idx) {
459  if (storage_) {
460  CHECK_EQ(targets.size(), storage_->targets_.size());
461  RowIterationState state;
462  for (size_t target_idx = start_idx; target_idx < targets.size(); ++target_idx) {
463  auto const& type_lhs = targets[target_idx].sql_type;
464  if (type_lhs.is_dict_encoded_string()) {
465  auto& type_rhs =
466  const_cast<SQLTypeInfo&>(storage_->targets_[target_idx].sql_type);
467  CHECK(type_rhs.is_dict_encoded_string());
468  if (type_lhs.get_comp_param() != type_rhs.get_comp_param()) {
469  auto* const sdp_lhs = getStringDictionaryProxy(type_lhs.get_comp_param());
470  CHECK(sdp_lhs);
471  auto const* const sdp_rhs = getStringDictionaryProxy(type_rhs.get_comp_param());
472  CHECK(sdp_rhs);
473  state.cur_target_idx_ = target_idx;
474  CellCallback const translate_string_ids(sdp_lhs->transientUnion(*sdp_rhs),
475  inline_int_null_val(type_rhs));
476  eachCellInColumn(state, translate_string_ids);
477  type_rhs.set_comp_param(type_lhs.get_comp_param());
478  }
479  }
480  }
481  }
482 }
483 
484 // For each cell in column target_idx, callback func with pointer to datum.
485 // This currently assumes the column type is a dictionary-encoded string, but this logic
486 // can be generalized to other types.
487 void ResultSet::eachCellInColumn(RowIterationState& state, CellCallback const& func) {
488  size_t const target_idx = state.cur_target_idx_;
489  QueryMemoryDescriptor& storage_qmd = storage_->query_mem_desc_;
490  CHECK_LT(target_idx, lazy_fetch_info_.size());
491  auto& col_lazy_fetch = lazy_fetch_info_[target_idx];
492  CHECK(col_lazy_fetch.is_lazily_fetched);
493  int const target_size = storage_->targets_[target_idx].sql_type.get_size();
494  CHECK_LT(0, target_size) << storage_->targets_[target_idx].toString();
495  size_t const nrows = storage_->binSearchRowCount();
496  if (storage_qmd.didOutputColumnar()) {
497  // Logic based on ResultSet::ColumnWiseTargetAccessor::initializeOffsetsForStorage()
498  if (state.buf_ptr_ == nullptr) {
499  state.buf_ptr_ = get_cols_ptr(storage_->buff_, storage_qmd);
500  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
501  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
502  : query_mem_desc_.getEffectiveKeyWidth();
503  }
504  for (size_t j = state.prev_target_idx_; j < state.cur_target_idx_; ++j) {
505  size_t const next_target_idx = j + 1; // Set state to reflect next target_idx j+1
506  state.buf_ptr_ = advance_to_next_columnar_target_buff(
507  state.buf_ptr_, storage_qmd, state.agg_idx_);
508  auto const& next_agg_info = storage_->targets_[next_target_idx];
509  state.agg_idx_ =
510  advance_slot(state.agg_idx_, next_agg_info, separate_varlen_storage_valid_);
511  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
512  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
513  : query_mem_desc_.getEffectiveKeyWidth();
514  }
515  for (size_t i = 0; i < nrows; ++i) {
516  int8_t const* const pos_ptr = state.buf_ptr_ + i * state.compact_sz1_;
517  int64_t pos = read_int_from_buff(pos_ptr, target_size);
518  CHECK_GE(pos, 0);
519  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
520  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
521  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
522  func(col_frag + pos * target_size);
523  }
524  } else {
525  size_t const key_bytes_with_padding =
526  align_to_int64(get_key_bytes_rowwise(storage_qmd));
527  for (size_t i = 0; i < nrows; ++i) {
528  int8_t const* const keys_ptr = row_ptr_rowwise(storage_->buff_, storage_qmd, i);
529  int8_t const* const rowwise_target_ptr = keys_ptr + key_bytes_with_padding;
530  int64_t pos = *reinterpret_cast<int64_t const*>(rowwise_target_ptr);
531  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
532  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
533  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
534  func(col_frag + pos * target_size);
535  }
536  }
537 }
538 
539 namespace {
540 
541 size_t get_truncated_row_count(size_t total_row_count, size_t limit, size_t offset) {
542  if (total_row_count < offset) {
543  return 0;
544  }
545 
546  size_t total_truncated_row_count = total_row_count - offset;
547 
548  if (limit) {
549  return std::min(total_truncated_row_count, limit);
550  }
551 
552  return total_truncated_row_count;
553 }
554 
555 } // namespace
556 
557 size_t ResultSet::rowCountImpl(const bool force_parallel) const {
558  if (just_explain_) {
559  return 1;
560  }
561  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::TableFunction) {
562  return entryCount();
563  }
564  if (!permutation_.empty()) {
565  // keep_first_ corresponds to SQL LIMIT
566  // drop_first_ corresponds to SQL OFFSET
567  return get_truncated_row_count(permutation_.size(), keep_first_, drop_first_);
568  }
569  if (!storage_) {
570  return 0;
571  }
572  CHECK(permutation_.empty());
573  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
574  return binSearchRowCount();
575  }
576 
577  constexpr size_t auto_parallel_row_count_threshold{20000UL};
578  if (force_parallel || entryCount() >= auto_parallel_row_count_threshold) {
579  return parallelRowCount();
580  }
581  std::lock_guard<std::mutex> lock(row_iteration_mutex_);
582  moveToBegin();
583  size_t row_count{0};
584  while (true) {
585  auto crt_row = getNextRowUnlocked(false, false);
586  if (crt_row.empty()) {
587  break;
588  }
589  ++row_count;
590  }
591  moveToBegin();
592  return row_count;
593 }
594 
595 size_t ResultSet::rowCount(const bool force_parallel) const {
596  // cached_row_count_ is atomic, so fetch it into a local variable first
597  // to avoid repeat fetches
598  const int64_t cached_row_count = cached_row_count_;
599  if (cached_row_count != uninitialized_cached_row_count) {
600  CHECK_GE(cached_row_count, 0);
601  return cached_row_count;
602  }
603  setCachedRowCount(rowCountImpl(force_parallel));
604  return cached_row_count_;
605 }
606 
607 void ResultSet::invalidateCachedRowCount() const {
608  cached_row_count_ = uninitialized_cached_row_count;
609 }
610 
611 void ResultSet::setCachedRowCount(const size_t row_count) const {
612  const int64_t signed_row_count = static_cast<int64_t>(row_count);
613  const int64_t old_cached_row_count = cached_row_count_.exchange(signed_row_count);
614  CHECK(old_cached_row_count == uninitialized_cached_row_count ||
615  old_cached_row_count == signed_row_count);
616 }
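// Annotation: the CHECK above only allows a cached count to be overwritten with the same
// value; any operation that changes the visible row set (keepFirstN, dropFirstN, append,
// sort) must therefore call invalidateCachedRowCount() first to reset the cache to -1.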
617 
618 size_t ResultSet::binSearchRowCount() const {
619  if (!storage_) {
620  return 0;
621  }
622 
623  size_t row_count = storage_->binSearchRowCount();
624  for (auto& s : appended_storage_) {
625  row_count += s->binSearchRowCount();
626  }
627 
628  return get_truncated_row_count(row_count, getLimit(), drop_first_);
629 }
630 
631 size_t ResultSet::parallelRowCount() const {
632  using namespace threading;
633  auto execute_parallel_row_count = [this, query_id = logger::query_id()](
634  const blocked_range<size_t>& r,
635  size_t row_count) {
636  auto qid_scope_guard = logger::set_thread_local_query_id(query_id);
637  for (size_t i = r.begin(); i < r.end(); ++i) {
638  if (!isRowAtEmpty(i)) {
639  ++row_count;
640  }
641  }
642  return row_count;
643  };
644  const auto row_count = parallel_reduce(blocked_range<size_t>(0, entryCount()),
645  size_t(0),
646  execute_parallel_row_count,
647  std::plus<int>());
648  return get_truncated_row_count(row_count, getLimit(), drop_first_);
649 }
650 
651 bool ResultSet::isEmpty() const {
652  // To simplify this function and de-dup logic with ResultSet::rowCount()
653  // (mismatches between the two were causing bugs), we modified this function
654  // to simply fetch rowCount(). The potential downside of this approach is that
655  // in some cases more work will need to be done, as we can't just stop at the first row.
656  // Mitigating that for most cases is the following:
657  // 1) rowCount() is cached, so the logic for actually computing row counts will run only
658  // once
659  // per result set.
660  // 2) If the cache is empty (cached_row_count_ == -1), rowCount() will use parallel
661  // methods if deemed appropriate, which in many cases could be faster for a sparse
662  // large result set than single-threaded iteration from the beginning
663  // 3) Often where isEmpty() is needed, rowCount() is also needed. Since the first call
664  // to rowCount()
665  // will be cached, there is no extra overhead in these cases
666 
667  return rowCount() == size_t(0);
668 }
669 
670 bool ResultSet::definitelyHasNoRows() const {
671  return (!storage_ && !estimator_ && !just_explain_) || cached_row_count_ == 0;
672 }
673 
674 const QueryMemoryDescriptor& ResultSet::getQueryMemDesc() const {
675  CHECK(storage_);
676  return storage_->query_mem_desc_;
677 }
678 
679 const std::vector<TargetInfo>& ResultSet::getTargetInfos() const {
680  return targets_;
681 }
682 
683 const std::vector<int64_t>& ResultSet::getTargetInitVals() const {
684  CHECK(storage_);
685  return storage_->target_init_vals_;
686 }
687 
688 int8_t* ResultSet::getDeviceEstimatorBuffer() const {
689  CHECK(device_type_ == ExecutorDeviceType::GPU);
690  CHECK(device_estimator_buffer_);
691  return device_estimator_buffer_->getMemoryPtr();
692 }
693 
694 int8_t* ResultSet::getHostEstimatorBuffer() const {
695  return host_estimator_buffer_;
696 }
697 
698 void ResultSet::syncEstimatorBuffer() const {
699  CHECK(device_type_ == ExecutorDeviceType::GPU);
700  CHECK(!host_estimator_buffer_);
701  CHECK_EQ(size_t(0), estimator_->getBufferSize() % sizeof(int64_t));
702  host_estimator_buffer_ =
703  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
704  CHECK(device_estimator_buffer_);
705  auto device_buffer_ptr = device_estimator_buffer_->getMemoryPtr();
706  auto allocator = std::make_unique<CudaAllocator>(
707  data_mgr_, device_id_, getQueryEngineCudaStreamForDevice(device_id_));
708  allocator->copyFromDevice(
709  host_estimator_buffer_, device_buffer_ptr, estimator_->getBufferSize());
710 }
711 
712 void ResultSet::setQueueTime(const int64_t queue_time) {
713  timings_.executor_queue_time = queue_time;
714 }
715 
716 void ResultSet::setKernelQueueTime(const int64_t kernel_queue_time) {
717  timings_.kernel_queue_time = kernel_queue_time;
718 }
719 
720 void ResultSet::addCompilationQueueTime(const int64_t compilation_queue_time) {
721  timings_.compilation_queue_time += compilation_queue_time;
722 }
723 
724 int64_t ResultSet::getQueueTime() const {
725  return timings_.executor_queue_time + timings_.kernel_queue_time +
726  timings_.compilation_queue_time;
727 }
728 
729 int64_t ResultSet::getRenderTime() const {
730  return timings_.render_time;
731 }
732 
733 void ResultSet::moveToBegin() const {
734  crt_row_buff_idx_ = 0;
735  fetched_so_far_ = 0;
736 }
737 
738 bool ResultSet::isTruncated() const {
739  return keep_first_ + drop_first_;
740 }
741 
742 bool ResultSet::isExplain() const {
743  return just_explain_;
744 }
745 
746 void ResultSet::setValidationOnlyRes() {
747  for_validation_only_ = true;
748 }
749 
750 bool ResultSet::isValidationOnlyRes() const {
751  return for_validation_only_;
752 }
753 
754 int ResultSet::getDeviceId() const {
755  return device_id_;
756 }
757 
758 QueryMemoryDescriptor ResultSet::fixupQueryMemoryDescriptor(
759  const QueryMemoryDescriptor& query_mem_desc) {
760  auto query_mem_desc_copy = query_mem_desc;
761  query_mem_desc_copy.resetGroupColWidths(
762  std::vector<int8_t>(query_mem_desc_copy.getGroupbyColCount(), 8));
763  if (query_mem_desc.didOutputColumnar()) {
764  return query_mem_desc_copy;
765  }
766  query_mem_desc_copy.alignPaddedSlots();
767  return query_mem_desc_copy;
768 }
769 
770 void ResultSet::sort(const std::list<Analyzer::OrderEntry>& order_entries,
771  size_t top_n,
772  const Executor* executor) {
773  auto timer = DEBUG_TIMER(__func__);
774 
775  if (!storage_) {
776  return;
777  }
778  invalidateCachedRowCount();
779  CHECK(!targets_.empty());
780 #ifdef HAVE_CUDA
781  if (canUseFastBaselineSort(order_entries, top_n)) {
782  baselineSort(order_entries, top_n, executor);
783  return;
784  }
785 #endif // HAVE_CUDA
786  if (query_mem_desc_.sortOnGpu()) {
787  try {
788  radixSortOnGpu(order_entries);
789  } catch (const OutOfMemory&) {
790  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
791  radixSortOnCpu(order_entries);
792  } catch (const std::bad_alloc&) {
793  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
794  radixSortOnCpu(order_entries);
795  }
796  return;
797  }
798  // This check isn't strictly required, but allows the index buffer to be 32-bit.
799  if (query_mem_desc_.getEntryCount() > std::numeric_limits<uint32_t>::max()) {
800  throw RowSortException("Sorting more than 4B elements not supported");
801  }
802 
803  CHECK(permutation_.empty());
804 
805  if (top_n && g_parallel_top_min < entryCount()) {
806  if (g_enable_watchdog && g_parallel_top_max < entryCount()) {
807  throw WatchdogException("Sorting the result would be too slow");
808  }
809  parallelTop(order_entries, top_n, executor);
810  } else {
811  if (g_enable_watchdog && Executor::baseline_threshold < entryCount()) {
812  throw WatchdogException("Sorting the result would be too slow");
813  }
814  permutation_.resize(query_mem_desc_.getEntryCount());
815  // PermutationView is used to share common API with parallelTop().
816  PermutationView pv(permutation_.data(), 0, permutation_.size());
817  pv = initPermutationBuffer(pv, 0, permutation_.size());
818  if (top_n == 0) {
819  top_n = pv.size(); // top_n == 0 implies a full sort
820  }
821  pv = topPermutation(pv, top_n, createComparator(order_entries, pv, executor, false));
822  if (pv.size() < permutation_.size()) {
823  permutation_.resize(pv.size());
824  permutation_.shrink_to_fit();
825  }
826  }
827 }
828 
829 #ifdef HAVE_CUDA
830 void ResultSet::baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
831  const size_t top_n,
832  const Executor* executor) {
833  auto timer = DEBUG_TIMER(__func__);
834  // If we only have one GPU, it's usually faster to do multi-threaded radix sort on CPU
835  if (getGpuCount() > 1) {
836  try {
837  doBaselineSort(ExecutorDeviceType::GPU, order_entries, top_n, executor);
838  } catch (...) {
839  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
840  }
841  } else {
842  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
843  }
844 }
845 #endif // HAVE_CUDA
846 
847 // Append non-empty indexes i in [begin,end) from findStorage(i) to permutation.
848 PermutationView ResultSet::initPermutationBuffer(PermutationView permutation,
849  PermutationIdx const begin,
850  PermutationIdx const end) const {
851  auto timer = DEBUG_TIMER(__func__);
852  for (PermutationIdx i = begin; i < end; ++i) {
853  const auto storage_lookup_result = findStorage(i);
854  const auto lhs_storage = storage_lookup_result.storage_ptr;
855  const auto off = storage_lookup_result.fixedup_entry_idx;
856  CHECK(lhs_storage);
857  if (!lhs_storage->isEmptyEntry(off)) {
858  permutation.push_back(i);
859  }
860  }
861  return permutation;
862 }
863 
864 const Permutation& ResultSet::getPermutationBuffer() const {
865  return permutation_;
866 }
867 
868 void ResultSet::parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
869  const size_t top_n,
870  const Executor* executor) {
871  auto timer = DEBUG_TIMER(__func__);
872  const size_t nthreads = cpu_threads();
873 
874  // Split permutation_ into nthreads subranges and top-sort in-place.
875  permutation_.resize(query_mem_desc_.getEntryCount());
876  std::vector<PermutationView> permutation_views(nthreads);
877  threading::task_group top_sort_threads;
878  for (auto interval : makeIntervals<PermutationIdx>(0, permutation_.size(), nthreads)) {
879  top_sort_threads.run([this,
880  &order_entries,
881  &permutation_views,
882  top_n,
883  executor,
884  query_id = logger::query_id(),
885  interval] {
886  auto qid_scope_guard = logger::set_thread_local_query_id(query_id);
887  PermutationView pv(permutation_.data() + interval.begin, 0, interval.size());
888  pv = initPermutationBuffer(pv, interval.begin, interval.end);
889  const auto compare = createComparator(order_entries, pv, executor, true);
890  permutation_views[interval.index] = topPermutation(pv, top_n, compare);
891  });
892  }
893  top_sort_threads.wait();
894 
895  // In case you are considering implementing a parallel reduction, note that the
896  // ResultSetComparator constructor is O(N) in order to materialize some of the aggregate
897  // columns as necessary to perform a comparison. This cost is why reduction is chosen to
898  // be serial instead; only one more Comparator is needed below.
899 
900  // Left-copy disjoint top-sorted subranges into one contiguous range.
901  // ++++....+++.....+++++... -> ++++++++++++............
902  auto end = permutation_.begin() + permutation_views.front().size();
903  for (size_t i = 1; i < nthreads; ++i) {
904  std::copy(permutation_views[i].begin(), permutation_views[i].end(), end);
905  end += permutation_views[i].size();
906  }
907 
908  // Top sort final range.
909  PermutationView pv(permutation_.data(), end - permutation_.begin());
910  const auto compare = createComparator(order_entries, pv, executor, false);
911  pv = topPermutation(pv, top_n, compare);
912  permutation_.resize(pv.size());
913  permutation_.shrink_to_fit();
914 }
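// Annotation: parallelTop() thus reduces a full sort to: top-sort nthreads disjoint
// subranges in parallel, concatenate the survivors (at most nthreads * top_n entries),
// and run one final serial topPermutation() over that much smaller candidate set.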
915 
916 std::pair<size_t, size_t> ResultSet::getStorageIndex(const size_t entry_idx) const {
917  size_t fixedup_entry_idx = entry_idx;
918  auto entry_count = storage_->query_mem_desc_.getEntryCount();
919  const bool is_rowwise_layout = !storage_->query_mem_desc_.didOutputColumnar();
920  if (fixedup_entry_idx < entry_count) {
921  return {0, fixedup_entry_idx};
922  }
923  fixedup_entry_idx -= entry_count;
924  for (size_t i = 0; i < appended_storage_.size(); ++i) {
925  const auto& desc = appended_storage_[i]->query_mem_desc_;
926  CHECK_NE(is_rowwise_layout, desc.didOutputColumnar());
927  entry_count = desc.getEntryCount();
928  if (fixedup_entry_idx < entry_count) {
929  return {i + 1, fixedup_entry_idx};
930  }
931  fixedup_entry_idx -= entry_count;
932  }
933  UNREACHABLE() << "entry_idx = " << entry_idx << ", query_mem_desc_.getEntryCount() = "
934  << query_mem_desc_.getEntryCount();
935  return {};
936 }
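// Hypothetical example (annotation): with storage_ holding 100 entries and two appended
// storages of 50 entries each, entry_idx 120 falls past the first buffer (120 - 100 = 20)
// and maps to {1, 20}, i.e. appended_storage_[0] at local offset 20.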
937 
940 
941 ResultSet::StorageLookupResult ResultSet::findStorage(const size_t entry_idx) const {
942  auto [stg_idx, fixedup_entry_idx] = getStorageIndex(entry_idx);
943  return {stg_idx ? appended_storage_[stg_idx - 1].get() : storage_.get(),
944  fixedup_entry_idx,
945  stg_idx};
946 }
947 
948 template <typename BUFFER_ITERATOR_TYPE>
949 void ResultSet::ResultSetComparator<
950  BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumns() {
951  for (const auto& order_entry : order_entries_) {
952  if (is_distinct_target(result_set_->targets_[order_entry.tle_no - 1])) {
953  count_distinct_materialized_buffers_.emplace_back(
954  materializeCountDistinctColumn(order_entry));
955  }
956  }
957 }
958 
959 namespace {
960 struct IsAggKind {
961  std::vector<TargetInfo> const& targets_;
962  SQLAgg const agg_kind_;
963  IsAggKind(std::vector<TargetInfo> const& targets, SQLAgg const agg_kind)
964  : targets_(targets), agg_kind_(agg_kind) {}
965  bool operator()(Analyzer::OrderEntry const& order_entry) const {
966  return targets_[order_entry.tle_no - 1].agg_kind == agg_kind_;
967  }
968 };
969 } // namespace
970 
971 template <typename BUFFER_ITERATOR_TYPE>
972 ResultSet::ApproxQuantileBuffers ResultSet::ResultSetComparator<
973  BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumns() const {
974  ResultSet::ApproxQuantileBuffers approx_quantile_materialized_buffers;
975  for (const auto& order_entry : order_entries_) {
976  if (result_set_->targets_[order_entry.tle_no - 1].agg_kind == kAPPROX_QUANTILE) {
977  approx_quantile_materialized_buffers.emplace_back(
978  materializeApproxQuantileColumn(order_entry));
979  }
980  }
981  return approx_quantile_materialized_buffers;
982 }
983 
984 template <typename BUFFER_ITERATOR_TYPE>
985 ResultSet::ModeBuffers
986 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeModeColumns() const {
987  ResultSet::ModeBuffers mode_buffers;
988  IsAggKind const is_mode(result_set_->targets_, kMODE);
989  mode_buffers.reserve(
990  std::count_if(order_entries_.begin(), order_entries_.end(), is_mode));
991  for (auto const& order_entry : order_entries_) {
992  if (is_mode(order_entry)) {
993  mode_buffers.emplace_back(materializeModeColumn(order_entry));
994  }
995  }
996  return mode_buffers;
997 }
998 
999 template <typename BUFFER_ITERATOR_TYPE>
1000 std::vector<int64_t>
1001 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumn(
1002  const Analyzer::OrderEntry& order_entry) const {
1003  const size_t num_storage_entries = result_set_->query_mem_desc_.getEntryCount();
1004  std::vector<int64_t> count_distinct_materialized_buffer(num_storage_entries);
1005  const CountDistinctDescriptor count_distinct_descriptor =
1006  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no - 1);
1007  const size_t num_non_empty_entries = permutation_.size();
1008 
1009  const auto work = [&, query_id = logger::query_id()](const size_t start,
1010  const size_t end) {
1011  auto qid_scope_guard = logger::set_thread_local_query_id(query_id);
1012  for (size_t i = start; i < end; ++i) {
1013  const PermutationIdx permuted_idx = permutation_[i];
1014  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1015  const auto storage = storage_lookup_result.storage_ptr;
1016  const auto off = storage_lookup_result.fixedup_entry_idx;
1017  const auto value = buffer_itr_.getColumnInternal(
1018  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1019  count_distinct_materialized_buffer[permuted_idx] =
1020  count_distinct_set_size(value.i1, count_distinct_descriptor);
1021  }
1022  };
1023  // TODO(tlm): Allow use of tbb after we determine how to easily encapsulate the choice
1024  // between thread pool types
1025  if (single_threaded_) {
1026  work(0, num_non_empty_entries);
1027  } else {
1028  threading::task_group thread_pool;
1029  for (auto interval : makeIntervals<size_t>(0, num_non_empty_entries, cpu_threads())) {
1030  thread_pool.run([=] { work(interval.begin, interval.end); });
1031  }
1032  thread_pool.wait();
1033  }
1034  return count_distinct_materialized_buffer;
1035 }
1036 
1037 double ResultSet::calculateQuantile(quantile::TDigest* const t_digest) {
1038  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1039  CHECK(t_digest);
1040  t_digest->mergeBufferFinal();
1041  double const quantile = t_digest->quantile();
1042  return boost::math::isnan(quantile) ? NULL_DOUBLE : quantile;
1043 }
1044 
1045 template <typename BUFFER_ITERATOR_TYPE>
1046 ResultSet::ApproxQuantileBuffers::value_type
1047 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumn(
1048  const Analyzer::OrderEntry& order_entry) const {
1049  ResultSet::ApproxQuantileBuffers::value_type materialized_buffer(
1050  result_set_->query_mem_desc_.getEntryCount());
1051  const size_t size = permutation_.size();
1052  const auto work = [&, query_id = logger::query_id()](const size_t start,
1053  const size_t end) {
1054  auto qid_scope_guard = logger::set_thread_local_query_id(query_id);
1055  for (size_t i = start; i < end; ++i) {
1056  const PermutationIdx permuted_idx = permutation_[i];
1057  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1058  const auto storage = storage_lookup_result.storage_ptr;
1059  const auto off = storage_lookup_result.fixedup_entry_idx;
1060  const auto value = buffer_itr_.getColumnInternal(
1061  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1062  materialized_buffer[permuted_idx] =
1063  value.i1 ? calculateQuantile(reinterpret_cast<quantile::TDigest*>(value.i1))
1064  : NULL_DOUBLE;
1065  }
1066  };
1067  if (single_threaded_) {
1068  work(0, size);
1069  } else {
1070  threading::task_group thread_pool;
1071  for (auto interval : makeIntervals<size_t>(0, size, cpu_threads())) {
1072  thread_pool.run([=] { work(interval.begin, interval.end); });
1073  }
1074  thread_pool.wait();
1075  }
1076  return materialized_buffer;
1077 }
1078 
1079 namespace {
1080 // i1 is from InternalTargetValue
1081 int64_t materializeMode(int64_t const i1) {
1082  if (auto const* const agg_mode = reinterpret_cast<AggMode const*>(i1)) {
1083  if (std::optional<int64_t> const mode = agg_mode->mode()) {
1084  return *mode;
1085  }
1086  }
1087  return NULL_BIGINT;
1088 }
1089 
1090 using ModeBlockedRange = tbb::blocked_range<size_t>;
1091 } // namespace
1092 
1093 template <typename BUFFER_ITERATOR_TYPE>
1094 struct ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::ModeScatter {
1095  logger::QueryId const query_id_;
1096  ResultSetComparator const* const rsc_;
1097  Analyzer::OrderEntry const& order_entry_;
1098  ResultSet::ModeBuffers::value_type& materialized_buffer_;
1099 
1100  void operator()(ModeBlockedRange const& r) const {
1101  auto qid_scope_guard = logger::set_thread_local_query_id(query_id_);
1102  for (size_t i = r.begin(); i != r.end(); ++i) {
1103  PermutationIdx const permuted_idx = rsc_->permutation_[i];
1104  auto const storage_lookup_result = rsc_->result_set_->findStorage(permuted_idx);
1105  auto const storage = storage_lookup_result.storage_ptr;
1106  auto const off = storage_lookup_result.fixedup_entry_idx;
1107  auto const value = rsc_->buffer_itr_.getColumnInternal(
1108  storage->buff_, off, order_entry_.tle_no - 1, storage_lookup_result);
1109  materialized_buffer_[permuted_idx] = materializeMode(value.i1);
1110  }
1111  }
1112 };
1113 
1114 template <typename BUFFER_ITERATOR_TYPE>
1115 ResultSet::ModeBuffers::value_type
1116 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeModeColumn(
1117  const Analyzer::OrderEntry& order_entry) const {
1118  ResultSet::ModeBuffers::value_type materialized_buffer(
1119  result_set_->query_mem_desc_.getEntryCount());
1120  ModeScatter mode_scatter{logger::query_id(), this, order_entry, materialized_buffer};
1121  if (single_threaded_) {
1122  mode_scatter(ModeBlockedRange(0, permutation_.size()));
1123  } else {
1124  tbb::parallel_for(ModeBlockedRange(0, permutation_.size()), mode_scatter);
1125  }
1126  return materialized_buffer;
1127 }
1128 
1129 template <typename BUFFER_ITERATOR_TYPE>
1130 bool ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::operator()(
1131  const PermutationIdx lhs,
1132  const PermutationIdx rhs) const {
1133  // NB: The compare function must define a strict weak ordering, otherwise
1134  // std::sort will trigger a segmentation fault (or corrupt memory).
1135  const auto lhs_storage_lookup_result = result_set_->findStorage(lhs);
1136  const auto rhs_storage_lookup_result = result_set_->findStorage(rhs);
1137  const auto lhs_storage = lhs_storage_lookup_result.storage_ptr;
1138  const auto rhs_storage = rhs_storage_lookup_result.storage_ptr;
1139  const auto fixedup_lhs = lhs_storage_lookup_result.fixedup_entry_idx;
1140  const auto fixedup_rhs = rhs_storage_lookup_result.fixedup_entry_idx;
1141  size_t materialized_count_distinct_buffer_idx{0};
1142  size_t materialized_approx_quantile_buffer_idx{0};
1143  size_t materialized_mode_buffer_idx{0};
1144 
1145  for (const auto& order_entry : order_entries_) {
1146  CHECK_GE(order_entry.tle_no, 1);
1147  // lhs_entry_ti and rhs_entry_ti can differ on comp_param w/ UNION of string dicts.
1148  const auto& lhs_agg_info = lhs_storage->targets_[order_entry.tle_no - 1];
1149  const auto& rhs_agg_info = rhs_storage->targets_[order_entry.tle_no - 1];
1150  const auto lhs_entry_ti = get_compact_type(lhs_agg_info);
1151  const auto rhs_entry_ti = get_compact_type(rhs_agg_info);
1152  // When lhs vs rhs doesn't matter, the lhs is used. For example:
1153  bool float_argument_input = takes_float_argument(lhs_agg_info);
1154  // Need to determine if the float value has been stored as a float
1155  // or compacted into a different (often larger, 8-byte) slot width;
1156  // in the distributed case the floats are actually 4 bytes.
1157  // TODO: takes_float_argument() is widely used; check whether this problem
1158  // exists elsewhere.
1159  if (lhs_entry_ti.get_type() == kFLOAT) {
1160  const auto is_col_lazy =
1161  !result_set_->lazy_fetch_info_.empty() &&
1162  result_set_->lazy_fetch_info_[order_entry.tle_no - 1].is_lazily_fetched;
1163  if (result_set_->query_mem_desc_.getPaddedSlotWidthBytes(order_entry.tle_no - 1) ==
1164  sizeof(float)) {
1165  float_argument_input =
1166  result_set_->query_mem_desc_.didOutputColumnar() ? !is_col_lazy : true;
1167  }
1168  }
1169 
1170  if (UNLIKELY(is_distinct_target(lhs_agg_info))) {
1171  CHECK_LT(materialized_count_distinct_buffer_idx,
1172  count_distinct_materialized_buffers_.size());
1173 
1174  const auto& count_distinct_materialized_buffer =
1175  count_distinct_materialized_buffers_[materialized_count_distinct_buffer_idx];
1176  const auto lhs_sz = count_distinct_materialized_buffer[lhs];
1177  const auto rhs_sz = count_distinct_materialized_buffer[rhs];
1178  ++materialized_count_distinct_buffer_idx;
1179  if (lhs_sz == rhs_sz) {
1180  continue;
1181  }
1182  return (lhs_sz < rhs_sz) != order_entry.is_desc;
1183  } else if (UNLIKELY(lhs_agg_info.agg_kind == kAPPROX_QUANTILE)) {
1184  CHECK_LT(materialized_approx_quantile_buffer_idx,
1185  approx_quantile_materialized_buffers_.size());
1186  const auto& approx_quantile_materialized_buffer =
1187  approx_quantile_materialized_buffers_[materialized_approx_quantile_buffer_idx];
1188  const auto lhs_value = approx_quantile_materialized_buffer[lhs];
1189  const auto rhs_value = approx_quantile_materialized_buffer[rhs];
1190  ++materialized_approx_quantile_buffer_idx;
1191  if (lhs_value == rhs_value) {
1192  continue;
1193  } else if (!lhs_entry_ti.get_notnull()) {
1194  if (lhs_value == NULL_DOUBLE) {
1195  return order_entry.nulls_first;
1196  } else if (rhs_value == NULL_DOUBLE) {
1197  return !order_entry.nulls_first;
1198  }
1199  }
1200  return (lhs_value < rhs_value) != order_entry.is_desc;
1201  } else if (UNLIKELY(lhs_agg_info.agg_kind == kMODE)) {
1202  CHECK_LT(materialized_mode_buffer_idx, mode_buffers_.size());
1203  auto const& mode_buffer = mode_buffers_[materialized_mode_buffer_idx++];
1204  int64_t const lhs_value = mode_buffer[lhs];
1205  int64_t const rhs_value = mode_buffer[rhs];
1206  if (lhs_value == rhs_value) {
1207  continue;
1208  // MODE(x) can only be NULL when the group is empty, since it skips null values.
1209  } else if (lhs_value == NULL_BIGINT) { // NULL_BIGINT from materializeMode()
1210  return order_entry.nulls_first;
1211  } else if (rhs_value == NULL_BIGINT) {
1212  return !order_entry.nulls_first;
1213  } else {
1214  return result_set_->isLessThan(lhs_entry_ti, lhs_value, rhs_value) !=
1215  order_entry.is_desc;
1216  }
1217  }
1218 
1219  const auto lhs_v = buffer_itr_.getColumnInternal(lhs_storage->buff_,
1220  fixedup_lhs,
1221  order_entry.tle_no - 1,
1222  lhs_storage_lookup_result);
1223  const auto rhs_v = buffer_itr_.getColumnInternal(rhs_storage->buff_,
1224  fixedup_rhs,
1225  order_entry.tle_no - 1,
1226  rhs_storage_lookup_result);
1227 
1228  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1229  isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1230  continue;
1231  }
1232  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1233  !isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1234  return order_entry.nulls_first;
1235  }
1236  if (UNLIKELY(isNull(rhs_entry_ti, rhs_v, float_argument_input) &&
1237  !isNull(lhs_entry_ti, lhs_v, float_argument_input))) {
1238  return !order_entry.nulls_first;
1239  }
1240 
1241  if (LIKELY(lhs_v.isInt())) {
1242  CHECK(rhs_v.isInt());
1243  if (UNLIKELY(lhs_entry_ti.is_string() &&
1244  lhs_entry_ti.get_compression() == kENCODING_DICT)) {
1245  CHECK_EQ(4, lhs_entry_ti.get_logical_size());
1246  CHECK(executor_);
1247  const auto lhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1248  lhs_entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
1249  const auto rhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1250  rhs_entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
1251  const auto lhs_str = lhs_string_dict_proxy->getString(lhs_v.i1);
1252  const auto rhs_str = rhs_string_dict_proxy->getString(rhs_v.i1);
1253  if (lhs_str == rhs_str) {
1254  continue;
1255  }
1256  return (lhs_str < rhs_str) != order_entry.is_desc;
1257  }
1258 
1259  if (lhs_v.i1 == rhs_v.i1) {
1260  continue;
1261  }
1262  if (lhs_entry_ti.is_fp()) {
1263  if (float_argument_input) {
1264  const auto lhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&lhs_v.i1));
1265  const auto rhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&rhs_v.i1));
1266  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1267  } else {
1268  const auto lhs_dval =
1269  *reinterpret_cast<const double*>(may_alias_ptr(&lhs_v.i1));
1270  const auto rhs_dval =
1271  *reinterpret_cast<const double*>(may_alias_ptr(&rhs_v.i1));
1272  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1273  }
1274  }
1275  return (lhs_v.i1 < rhs_v.i1) != order_entry.is_desc;
1276  } else {
1277  if (lhs_v.isPair()) {
1278  CHECK(rhs_v.isPair());
1279  const auto lhs =
1280  pair_to_double({lhs_v.i1, lhs_v.i2}, lhs_entry_ti, float_argument_input);
1281  const auto rhs =
1282  pair_to_double({rhs_v.i1, rhs_v.i2}, rhs_entry_ti, float_argument_input);
1283  if (lhs == rhs) {
1284  continue;
1285  }
1286  return (lhs < rhs) != order_entry.is_desc;
1287  } else {
1288  CHECK(lhs_v.isStr() && rhs_v.isStr());
1289  const auto lhs = lhs_v.strVal();
1290  const auto rhs = rhs_v.strVal();
1291  if (lhs == rhs) {
1292  continue;
1293  }
1294  return (lhs < rhs) != order_entry.is_desc;
1295  }
1296  }
1297  }
1298  return false;
1299 }
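// Annotation: each comparison above returns `(lhs < rhs) != order_entry.is_desc`, so a
// single ascending comparison is flipped in place for descending order entries.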
1300 
1301 // Partial sort permutation into top(least by compare) n elements.
1302 // If permutation.size() <= n then sort entire permutation by compare.
1303 // Return PermutationView with new size() = min(n, permutation.size()).
1304 PermutationView ResultSet::topPermutation(PermutationView permutation,
1305  const size_t n,
1306  const Comparator& compare) {
1307  auto timer = DEBUG_TIMER(__func__);
1308  if (n < permutation.size()) {
1309  std::partial_sort(
1310  permutation.begin(), permutation.begin() + n, permutation.end(), compare);
1311  permutation.resize(n);
1312  } else {
1313  std::sort(permutation.begin(), permutation.end(), compare);
1314  }
1315  return permutation;
1316 }
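// Annotation: std::partial_sort keeps the cost at roughly O(N log n) for a top-n request
// instead of the O(N log N) of the full std::sort taken on the else branch.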
1317 
1318 void ResultSet::radixSortOnGpu(
1319  const std::list<Analyzer::OrderEntry>& order_entries) const {
1320  auto timer = DEBUG_TIMER(__func__);
1321  auto data_mgr = &catalog_->getDataMgr();
1322  const int device_id{0};
1323  auto allocator = std::make_unique<CudaAllocator>(
1324  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1325  CHECK_GT(block_size_, 0);
1326  CHECK_GT(grid_size_, 0);
1327  std::vector<int64_t*> group_by_buffers(block_size_);
1328  group_by_buffers[0] = reinterpret_cast<int64_t*>(storage_->getUnderlyingBuffer());
1329  auto dev_group_by_buffers =
1330  create_dev_group_by_buffers(allocator.get(),
1331  group_by_buffers,
1332  query_mem_desc_,
1333  block_size_,
1334  grid_size_,
1335  device_id,
1336  ExecutorDispatchMode::KernelPerFragment,
1337  /*num_input_rows=*/-1,
1338  /*prepend_index_buffer=*/true,
1339  /*always_init_group_by_on_host=*/true,
1340  /*use_bump_allocator=*/false,
1341  /*has_varlen_output=*/false,
1342  /*insitu_allocator*=*/nullptr);
1343  inplace_sort_gpu(
1344  order_entries, query_mem_desc_, dev_group_by_buffers, data_mgr, device_id);
1345  copy_group_by_buffers_from_gpu(
1346  *allocator,
1347  group_by_buffers,
1348  query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
1349  dev_group_by_buffers.data,
1350  query_mem_desc_,
1351  block_size_,
1352  grid_size_,
1353  device_id,
1354  /*use_bump_allocator=*/false,
1355  /*has_varlen_output=*/false);
1356 }
1357 
1358 void ResultSet::radixSortOnCpu(
1359  const std::list<Analyzer::OrderEntry>& order_entries) const {
1360  auto timer = DEBUG_TIMER(__func__);
1361  CHECK(!query_mem_desc_.hasKeylessHash());
1362  std::vector<int64_t> tmp_buff(query_mem_desc_.getEntryCount());
1363  std::vector<int32_t> idx_buff(query_mem_desc_.getEntryCount());
1364  CHECK_EQ(size_t(1), order_entries.size());
1365  auto buffer_ptr = storage_->getUnderlyingBuffer();
1366  for (const auto& order_entry : order_entries) {
1367  const auto target_idx = order_entry.tle_no - 1;
1368  const auto sortkey_val_buff = reinterpret_cast<int64_t*>(
1369  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1370  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1371  sort_groups_cpu(sortkey_val_buff,
1372  &idx_buff[0],
1373  query_mem_desc_.getEntryCount(),
1374  order_entry.is_desc,
1375  chosen_bytes);
1376  apply_permutation_cpu(reinterpret_cast<int64_t*>(buffer_ptr),
1377  &idx_buff[0],
1378  query_mem_desc_.getEntryCount(),
1379  &tmp_buff[0],
1380  sizeof(int64_t));
1381  for (size_t target_idx = 0; target_idx < query_mem_desc_.getSlotCount();
1382  ++target_idx) {
1383  if (static_cast<int>(target_idx) == order_entry.tle_no - 1) {
1384  continue;
1385  }
1386  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1387  const auto satellite_val_buff = reinterpret_cast<int64_t*>(
1388  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1389  apply_permutation_cpu(satellite_val_buff,
1390  &idx_buff[0],
1391  query_mem_desc_.getEntryCount(),
1392  &tmp_buff[0],
1393  chosen_bytes);
1394  }
1395  }
1396 }
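// Annotation: for the (single) order entry, sort_groups_cpu() above builds the sorted
// index into idx_buff from the order-by column, and apply_permutation_cpu() then reorders
// the key buffer and every remaining slot column with that same permutation.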
1397 
1398 size_t ResultSet::getLimit() const {
1399  return keep_first_;
1400 }
1401 
1402 const std::vector<std::string> ResultSet::getStringDictionaryPayloadCopy(
1403  const int dict_id) const {
1404  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
1405  dict_id, /*with_generation=*/true, catalog_);
1406  CHECK(sdp);
1407  return sdp->getDictionary()->copyStrings();
1408 }
1409 
1410 const std::pair<std::vector<int32_t>, std::vector<std::string>>
1411 ResultSet::getUniqueStringsForDictEncodedTargetCol(const size_t col_idx) const {
1412  const auto col_type_info = getColType(col_idx);
1413  std::unordered_set<int32_t> unique_string_ids_set;
1414  const size_t num_entries = entryCount();
1415  std::vector<bool> targets_to_skip(colCount(), true);
1416  targets_to_skip[col_idx] = false;
1417  CHECK(col_type_info.is_dict_encoded_type()); // Array<Text> or Text
1418  const int64_t null_val = inline_fixed_encoding_null_val(
1419  col_type_info.is_array() ? col_type_info.get_elem_type() : col_type_info);
1420 
1421  for (size_t row_idx = 0; row_idx < num_entries; ++row_idx) {
1422  const auto result_row = getRowAtNoTranslations(row_idx, targets_to_skip);
1423  if (!result_row.empty()) {
1424  if (const auto scalar_col_val =
1425  boost::get<ScalarTargetValue>(&result_row[col_idx])) {
1426  const int32_t string_id =
1427  static_cast<int32_t>(boost::get<int64_t>(*scalar_col_val));
1428  if (string_id != null_val) {
1429  unique_string_ids_set.emplace(string_id);
1430  }
1431  } else if (const auto array_col_val =
1432  boost::get<ArrayTargetValue>(&result_row[col_idx])) {
1433  if (*array_col_val) {
1434  for (const ScalarTargetValue& scalar : array_col_val->value()) {
1435  const int32_t string_id = static_cast<int32_t>(boost::get<int64_t>(scalar));
1436  if (string_id != null_val) {
1437  unique_string_ids_set.emplace(string_id);
1438  }
1439  }
1440  }
1441  }
1442  }
1443  }
1444 
1445  const size_t num_unique_strings = unique_string_ids_set.size();
1446  std::vector<int32_t> unique_string_ids(num_unique_strings);
1447  size_t string_idx{0};
1448  for (const auto unique_string_id : unique_string_ids_set) {
1449  unique_string_ids[string_idx++] = unique_string_id;
1450  }
1451 
1452  const int32_t dict_id = col_type_info.get_comp_param();
1453  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
1454  dict_id, /*with_generation=*/true, catalog_);
1455  CHECK(sdp);
1456 
1457  return std::make_pair(unique_string_ids, sdp->getStrings(unique_string_ids));
1458 }
1459 
1467 bool ResultSet::isDirectColumnarConversionPossible() const {
1468  if (!g_enable_direct_columnarization) {
1469  return false;
1470  } else if (query_mem_desc_.didOutputColumnar()) {
1471  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
1472  QueryDescriptionType::Projection ||
1473  query_mem_desc_.getQueryDescriptionType() ==
1474  QueryDescriptionType::TableFunction ||
1475  query_mem_desc_.getQueryDescriptionType() ==
1476  QueryDescriptionType::GroupByPerfectHash ||
1477  query_mem_desc_.getQueryDescriptionType() ==
1478  QueryDescriptionType::GroupByBaselineHash);
1479  } else {
1480  CHECK(!(query_mem_desc_.getQueryDescriptionType() ==
1481  QueryDescriptionType::NonGroupedAggregate));
1482  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
1483  QueryDescriptionType::GroupByPerfectHash ||
1484  query_mem_desc_.getQueryDescriptionType() ==
1485  QueryDescriptionType::GroupByBaselineHash);
1486  }
1487 }
1488 
1489 bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
1490  return query_mem_desc_.didOutputColumnar() &&
1491  (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection ||
1492  query_mem_desc_.getQueryDescriptionType() ==
1493  QueryDescriptionType::TableFunction) &&
1494  appended_storage_.empty() && storage_ &&
1495  (lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
1496 }
1497 
1498 const int8_t* ResultSet::getColumnarBuffer(size_t column_idx) const {
1499  CHECK(isZeroCopyColumnarConversionPossible(column_idx));
1500  return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(column_idx);
1501 }
1502 
1503 // returns a bitmap (and total number) of all single slot targets
1504 std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
1505  std::vector<bool> target_bitmap(targets_.size(), true);
1506  size_t num_single_slot_targets = 0;
1507  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1508  const auto& sql_type = targets_[target_idx].sql_type;
1509  if (targets_[target_idx].is_agg && targets_[target_idx].agg_kind == kAVG) {
1510  target_bitmap[target_idx] = false;
1511  } else if (sql_type.is_varlen()) {
1512  target_bitmap[target_idx] = false;
1513  } else {
1514  num_single_slot_targets++;
1515  }
1516  }
1517  return std::make_tuple(std::move(target_bitmap), num_single_slot_targets);
1518 }
1519 
1528 std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
1529  const {
1530  CHECK(isDirectColumnarConversionPossible());
1531  auto [single_slot_targets, num_single_slot_targets] = getSingleSlotTargetBitmap();
1532 
1533  for (size_t target_idx = 0; target_idx < single_slot_targets.size(); target_idx++) {
1534  const auto& target = targets_[target_idx];
1535  if (single_slot_targets[target_idx] &&
1536  (is_distinct_target(target) ||
1537  shared::is_any<kAPPROX_QUANTILE, kMODE>(target.agg_kind) ||
1538  (target.is_agg && target.agg_kind == kSAMPLE && target.sql_type == kFLOAT))) {
1539  single_slot_targets[target_idx] = false;
1540  num_single_slot_targets--;
1541  }
1542  }
1543  CHECK_GE(num_single_slot_targets, size_t(0));
1544  return std::make_tuple(std::move(single_slot_targets), num_single_slot_targets);
1545 }
1546 
1547 // returns the starting slot index for all targets in the result set
1548 std::vector<size_t> ResultSet::getSlotIndicesForTargetIndices() const {
1549  std::vector<size_t> slot_indices(targets_.size(), 0);
1550  size_t slot_index = 0;
1551  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1552  slot_indices[target_idx] = slot_index;
1553  slot_index = advance_slot(slot_index, targets_[target_idx], false);
1554  }
1555  return slot_indices;
1556 }
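The two helpers above are natural companions during direct columnarization: the bitmap says which targets can be copied straight from a single slot, and the slot indices say where each target's first slot lives. A hedged driver sketch (copy_slot_to_column is a hypothetical helper, not part of the listing):

    if (rows.isDirectColumnarConversionPossible()) {
      const auto [supported, num_supported] = rows.getSupportedSingleSlotTargetBitmap();
      const auto slot_indices = rows.getSlotIndicesForTargetIndices();
      for (size_t target_idx = 0; target_idx < supported.size(); ++target_idx) {
        if (supported[target_idx]) {
          // Single-slot target: its value starts at slot_indices[target_idx].
          copy_slot_to_column(rows, target_idx, slot_indices[target_idx]);
        }
      }
    }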
1557 
1558 // namespace result_set
1559 
1560 bool result_set::can_use_parallel_algorithms(const ResultSet& rows) {
1561  return !rows.isTruncated();
1562 }
1563 
1564 namespace {
1565 struct IsDictEncodedStr {
1566  bool operator()(TargetInfo const& target_info) const {
1567  return target_info.sql_type.is_dict_encoded_string();
1568  }
1569 };
1570 } // namespace
1571 
1572 std::optional<size_t> result_set::first_dict_encoded_idx(
1573  std::vector<TargetInfo> const& targets) {
1574  auto const itr = std::find_if(targets.begin(), targets.end(), IsDictEncodedStr{});
1575  return itr == targets.end() ? std::nullopt
1576  : std::make_optional<size_t>(itr - targets.begin());
1577 }
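first_dict_encoded_idx lets callers skip string translation entirely when no target is a dictionary-encoded string. A hedged usage sketch, assuming a ResultSet rows; the pairing with translateDictEncodedColumns reflects typical use but is an assumption here:

    const auto& targets = rows.getTargetInfos();
    if (const auto first_dict_idx = result_set::first_dict_encoded_idx(targets)) {
      // At least one dictionary-encoded string target exists; translate ids
      // from that target onward.
      rows.translateDictEncodedColumns(targets, *first_dict_idx);
    }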
1578 
1579 bool result_set::use_parallel_algorithms(const ResultSet& rows) {
1580  return result_set::can_use_parallel_algorithms(rows) && rows.entryCount() >= 20000;
1581 }
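These two predicates gate parallel row processing: can_use_parallel_algorithms() rules out truncated result sets, and use_parallel_algorithms() additionally requires at least 20,000 entries before threading pays off. A hedged sketch of how a consumer might branch on them (the helpers are hypothetical):

    if (result_set::use_parallel_algorithms(rows)) {
      process_rows_parallel(rows);  // large, untruncated result set: fan out across threads
    } else {
      process_rows_serial(rows);    // small or truncated result set: stay single-threaded
    }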