OmniSciDB  c1a53651b2
ResultSet.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "ResultSet.h"
26 #include "Execute.h"
27 #include "GpuMemUtils.h"
28 #include "InPlaceSort.h"
31 #include "RelAlgExecutionUnit.h"
32 #include "RuntimeFunctions.h"
33 #include "Shared/Intervals.h"
34 #include "Shared/SqlTypesLayout.h"
35 #include "Shared/checked_alloc.h"
36 #include "Shared/likely.h"
37 #include "Shared/thread_count.h"
38 #include "Shared/threading.h"
39 
40 #include <tbb/parallel_for.h>
41 
42 #include <algorithm>
43 #include <atomic>
44 #include <bitset>
45 #include <functional>
46 #include <future>
47 #include <numeric>
48 
49 size_t g_parallel_top_min = 100e3;
50 size_t g_parallel_top_max = 20e6; // In effect only with g_enable_watchdog.
51 size_t g_streaming_topn_max = 100e3;
52 constexpr int64_t uninitialized_cached_row_count{-1};
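// Rough guide (illustrative summary, not additional configuration) to how
// ResultSet::sort() below consults the thresholds above for a top-n (LIMIT) query:
//   entryCount() <= g_parallel_top_min (100K)            -> single-threaded topPermutation()
//   entryCount() >  g_parallel_top_min                   -> parallelTop()
//   entryCount() >  g_parallel_top_max (20M), watchdog on -> WatchdogException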
53 
54 void ResultSet::keepFirstN(const size_t n) {
55  invalidateCachedRowCount();
56  keep_first_ = n;
57 }
58 
59 void ResultSet::dropFirstN(const size_t n) {
60  invalidateCachedRowCount();
61  drop_first_ = n;
62 }
63 
64 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
65  const ExecutorDeviceType device_type,
 66  const QueryMemoryDescriptor& query_mem_desc,
 67  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
68  const unsigned block_size,
69  const unsigned grid_size)
70  : targets_(targets)
71  , device_type_(device_type)
72  , device_id_(-1)
73  , query_mem_desc_(query_mem_desc)
74  , crt_row_buff_idx_(0)
75  , fetched_so_far_(0)
76  , drop_first_(0)
77  , keep_first_(0)
78  , row_set_mem_owner_(row_set_mem_owner)
79  , block_size_(block_size)
80  , grid_size_(grid_size)
81  , data_mgr_(nullptr)
82  , separate_varlen_storage_valid_(false)
83  , just_explain_(false)
84  , for_validation_only_(false)
85  , cached_row_count_(uninitialized_cached_row_count)
86  , geo_return_type_(GeoReturnType::WktString)
87  , cached_(false)
88  , query_exec_time_(0)
89  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
90  , can_use_speculative_top_n_sort(std::nullopt) {}
91 
92 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
93  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
94  const std::vector<std::vector<const int8_t*>>& col_buffers,
95  const std::vector<std::vector<int64_t>>& frag_offsets,
96  const std::vector<int64_t>& consistent_frag_sizes,
97  const ExecutorDeviceType device_type,
98  const int device_id,
 99  const QueryMemoryDescriptor& query_mem_desc,
 100  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
101  const unsigned block_size,
102  const unsigned grid_size)
103  : targets_(targets)
104  , device_type_(device_type)
105  , device_id_(device_id)
106  , query_mem_desc_(query_mem_desc)
107  , crt_row_buff_idx_(0)
108  , fetched_so_far_(0)
109  , drop_first_(0)
110  , keep_first_(0)
111  , row_set_mem_owner_(row_set_mem_owner)
112  , block_size_(block_size)
113  , grid_size_(grid_size)
114  , lazy_fetch_info_(lazy_fetch_info)
115  , col_buffers_{col_buffers}
116  , frag_offsets_{frag_offsets}
117  , consistent_frag_sizes_{consistent_frag_sizes}
118  , data_mgr_(nullptr)
119  , separate_varlen_storage_valid_(false)
120  , just_explain_(false)
121  , for_validation_only_(false)
122  , cached_row_count_(uninitialized_cached_row_count)
123  , geo_return_type_(GeoReturnType::WktString)
124  , cached_(false)
125  , query_exec_time_(0)
126  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
127  , can_use_speculative_top_n_sort(std::nullopt) {}
128 
129 ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
130  const ExecutorDeviceType device_type,
131  const int device_id,
132  Data_Namespace::DataMgr* data_mgr)
133  : device_type_(device_type)
134  , device_id_(device_id)
135  , query_mem_desc_{}
136  , crt_row_buff_idx_(0)
137  , estimator_(estimator)
138  , data_mgr_(data_mgr)
139  , separate_varlen_storage_valid_(false)
140  , just_explain_(false)
141  , for_validation_only_(false)
142  , cached_row_count_(uninitialized_cached_row_count)
143  , geo_return_type_(GeoReturnType::WktString)
144  , cached_(false)
145  , query_exec_time_(0)
146  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
147  , can_use_speculative_top_n_sort(std::nullopt) {
148  if (device_type == ExecutorDeviceType::GPU) {
149  device_estimator_buffer_ = CudaAllocator::allocGpuAbstractBuffer(
150  data_mgr_, estimator_->getBufferSize(), device_id_);
151  data_mgr->getCudaMgr()->zeroDeviceMem(device_estimator_buffer_->getMemoryPtr(),
152  estimator_->getBufferSize(),
153  device_id_,
 154  getQueryEngineCudaStreamForDevice(device_id_));
 155  } else {
156  host_estimator_buffer_ =
157  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
158  }
159 }
160 
161 ResultSet::ResultSet(const std::string& explanation)
162  : device_type_(ExecutorDeviceType::CPU)
163  , device_id_(-1)
164  , fetched_so_far_(0)
165  , separate_varlen_storage_valid_(false)
166  , explanation_(explanation)
167  , just_explain_(true)
168  , for_validation_only_(false)
169  , cached_row_count_(uninitialized_cached_row_count)
170  , geo_return_type_(GeoReturnType::WktString)
171  , cached_(false)
172  , query_exec_time_(0)
173  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
174  , can_use_speculative_top_n_sort(std::nullopt) {}
175 
176 ResultSet::ResultSet(int64_t queue_time_ms,
177  int64_t render_time_ms,
178  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
179  : device_type_(ExecutorDeviceType::CPU)
180  , device_id_(-1)
181  , fetched_so_far_(0)
182  , row_set_mem_owner_(row_set_mem_owner)
183  , timings_(QueryExecutionTimings{queue_time_ms, render_time_ms, 0, 0})
184  , separate_varlen_storage_valid_(false)
185  , just_explain_(true)
186  , for_validation_only_(false)
187  , cached_row_count_(uninitialized_cached_row_count)
188  , geo_return_type_(GeoReturnType::WktString)
189  , cached_(false)
190  , query_exec_time_(0)
191  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
192  , can_use_speculative_top_n_sort(std::nullopt) {}
193 
 194 ResultSet::~ResultSet() {
 195  if (storage_) {
196  if (!storage_->buff_is_provided_) {
197  CHECK(storage_->getUnderlyingBuffer());
198  free(storage_->getUnderlyingBuffer());
199  }
200  }
201  for (auto& storage : appended_storage_) {
202  if (storage && !storage->buff_is_provided_) {
203  free(storage->getUnderlyingBuffer());
204  }
205  }
206  if (host_estimator_buffer_) {
207  CHECK(device_type_ == ExecutorDeviceType::CPU || device_estimator_buffer_);
208  free(host_estimator_buffer_);
209  }
210  if (device_estimator_buffer_) {
211  CHECK(data_mgr_);
212  data_mgr_->free(device_estimator_buffer_);
213  }
214 }
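// Ownership sketch (an inference from the checks above): buffers handed in with
// buff_is_provided_ == true are owned elsewhere (e.g. by row_set_mem_owner_) and are
// not freed here; only buffers the ResultSetStorage allocated itself are released,
// and the device estimator buffer is returned through data_mgr_.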
215 
216 std::string ResultSet::summaryToString() const {
217  std::ostringstream oss;
218  oss << "Result Set Info" << std::endl;
219  oss << "\tLayout: " << query_mem_desc_.queryDescTypeToString() << std::endl;
220  oss << "\tColumns: " << colCount() << std::endl;
221  oss << "\tRows: " << rowCount() << std::endl;
222  oss << "\tEntry count: " << entryCount() << std::endl;
223  const std::string is_empty = isEmpty() ? "True" : "False";
224  oss << "\tIs empty: " << is_empty << std::endl;
 225  const std::string did_output_columnar = didOutputColumnar() ? "True" : "False";
226  oss << "\tColumnar: " << did_output_columnar << std::endl;
227  oss << "\tLazy-fetched columns: " << getNumColumnsLazyFetched() << std::endl;
228  const std::string is_direct_columnar_conversion_possible =
229  isDirectColumnarConversionPossible() ? "True" : "False";
230  oss << "\tDirect columnar conversion possible: "
231  << is_direct_columnar_conversion_possible << std::endl;
232 
233  size_t num_columns_zero_copy_columnarizable{0};
234  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
235  if (isZeroCopyColumnarConversionPossible(target_idx)) {
236  num_columns_zero_copy_columnarizable++;
237  }
238  }
239  oss << "\tZero-copy columnar conversion columns: "
240  << num_columns_zero_copy_columnarizable << std::endl;
241 
242  oss << "\tPermutation size: " << permutation_.size() << std::endl;
243  oss << "\tLimit: " << keep_first_ << std::endl;
244  oss << "\tOffset: " << drop_first_ << std::endl;
245  return oss.str();
246 }
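// Example of the summary produced above (illustrative values only):
//   Result Set Info
//       Layout: Projection
//       Columns: 3
//       Rows: 1000
//       Entry count: 2048
//       Is empty: False
//       ...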
247 
 248 ExecutorDeviceType ResultSet::getDeviceType() const {
 249  return device_type_;
250 }
251 
 252 const ResultSetStorage* ResultSet::allocateStorage() const {
 253  CHECK(!storage_);
254  CHECK(row_set_mem_owner_);
255  auto buff = row_set_mem_owner_->allocate(
256  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
257  storage_.reset(
258  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
259  return storage_.get();
260 }
261 
 262 const ResultSetStorage* ResultSet::allocateStorage(
 263  int8_t* buff,
264  const std::vector<int64_t>& target_init_vals,
265  std::shared_ptr<VarlenOutputInfo> varlen_output_info) const {
266  CHECK(buff);
267  CHECK(!storage_);
268  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, true));
269  // TODO: add both to the constructor
270  storage_->target_init_vals_ = target_init_vals;
271  if (varlen_output_info) {
272  storage_->varlen_output_info_ = varlen_output_info;
273  }
274  return storage_.get();
275 }
276 
 277 const ResultSetStorage* ResultSet::allocateStorage(
 278  const std::vector<int64_t>& target_init_vals) const {
279  CHECK(!storage_);
280  CHECK(row_set_mem_owner_);
281  auto buff = row_set_mem_owner_->allocate(
282  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
283  storage_.reset(
284  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
285  storage_->target_init_vals_ = target_init_vals;
286  return storage_.get();
287 }
288 
 289 size_t ResultSet::getCurrentRowBufferIndex() const {
 290  if (crt_row_buff_idx_ == 0) {
291  throw std::runtime_error("current row buffer iteration index is undefined");
292  }
293  return crt_row_buff_idx_ - 1;
294 }
295 
296 // Note: that.appended_storage_ does not get appended to this.
297 void ResultSet::append(ResultSet& that) {
298  invalidateCachedRowCount();
299  if (!that.storage_) {
300  return;
301  }
302  appended_storage_.push_back(std::move(that.storage_));
303  query_mem_desc_.setEntryCount(
304  query_mem_desc_.getEntryCount() +
305  appended_storage_.back()->query_mem_desc_.getEntryCount());
306  chunks_.insert(chunks_.end(), that.chunks_.begin(), that.chunks_.end());
307  col_buffers_.insert(
308  col_buffers_.end(), that.col_buffers_.begin(), that.col_buffers_.end());
309  frag_offsets_.insert(
310  frag_offsets_.end(), that.frag_offsets_.begin(), that.frag_offsets_.end());
311  consistent_frag_sizes_.insert(consistent_frag_sizes_.end(),
312  that.consistent_frag_sizes_.begin(),
313  that.consistent_frag_sizes_.end());
314  chunk_iters_.insert(
315  chunk_iters_.end(), that.chunk_iters_.begin(), that.chunk_iters_.end());
316  if (separate_varlen_storage_valid_) {
317  CHECK(that.separate_varlen_storage_valid_);
318  serialized_varlen_buffer_.insert(serialized_varlen_buffer_.end(),
319  that.serialized_varlen_buffer_.begin(),
320  that.serialized_varlen_buffer_.end());
321  }
322  for (auto& buff : that.literal_buffers_) {
323  literal_buffers_.push_back(std::move(buff));
324  }
325 }
326 
 327 ResultSetPtr ResultSet::copy() {
 328  auto timer = DEBUG_TIMER(__func__);
329  if (!storage_) {
330  return nullptr;
331  }
332 
333  auto executor = getExecutor();
334  CHECK(executor);
335  ResultSetPtr copied_rs = std::make_shared<ResultSet>(targets_,
336  device_type_,
337  query_mem_desc_,
338  row_set_mem_owner_,
339  executor->blockSize(),
340  executor->gridSize());
341 
342  auto allocate_and_copy_storage =
343  [&](const ResultSetStorage* prev_storage) -> std::unique_ptr<ResultSetStorage> {
344  const auto& prev_qmd = prev_storage->query_mem_desc_;
345  const auto storage_size = prev_qmd.getBufferSizeBytes(device_type_);
346  auto buff = row_set_mem_owner_->allocate(storage_size, /*thread_idx=*/0);
347  std::unique_ptr<ResultSetStorage> new_storage;
348  new_storage.reset(new ResultSetStorage(
349  prev_storage->targets_, prev_qmd, buff, /*buff_is_provided=*/true));
350  new_storage->target_init_vals_ = prev_storage->target_init_vals_;
351  if (prev_storage->varlen_output_info_) {
352  new_storage->varlen_output_info_ = prev_storage->varlen_output_info_;
353  }
354  memcpy(new_storage->buff_, prev_storage->buff_, storage_size);
355  new_storage->query_mem_desc_ = prev_qmd;
356  return new_storage;
357  };
358 
359  copied_rs->storage_ = allocate_and_copy_storage(storage_.get());
360  if (!appended_storage_.empty()) {
361  for (const auto& storage : appended_storage_) {
362  copied_rs->appended_storage_.push_back(allocate_and_copy_storage(storage.get()));
363  }
364  }
365  std::copy(chunks_.begin(), chunks_.end(), std::back_inserter(copied_rs->chunks_));
366  std::copy(chunk_iters_.begin(),
367  chunk_iters_.end(),
368  std::back_inserter(copied_rs->chunk_iters_));
369  std::copy(col_buffers_.begin(),
370  col_buffers_.end(),
371  std::back_inserter(copied_rs->col_buffers_));
372  std::copy(frag_offsets_.begin(),
373  frag_offsets_.end(),
374  std::back_inserter(copied_rs->frag_offsets_));
375  std::copy(consistent_frag_sizes_.begin(),
376  consistent_frag_sizes_.end(),
377  std::back_inserter(copied_rs->consistent_frag_sizes_));
378  if (separate_varlen_storage_valid_) {
379  std::copy(serialized_varlen_buffer_.begin(),
380  serialized_varlen_buffer_.end(),
381  std::back_inserter(copied_rs->serialized_varlen_buffer_));
382  }
383  std::copy(literal_buffers_.begin(),
384  literal_buffers_.end(),
385  std::back_inserter(copied_rs->literal_buffers_));
386  std::copy(lazy_fetch_info_.begin(),
387  lazy_fetch_info_.end(),
388  std::back_inserter(copied_rs->lazy_fetch_info_));
389 
390  copied_rs->permutation_ = permutation_;
391  copied_rs->drop_first_ = drop_first_;
392  copied_rs->keep_first_ = keep_first_;
393  copied_rs->separate_varlen_storage_valid_ = separate_varlen_storage_valid_;
394  copied_rs->query_exec_time_ = query_exec_time_;
395  copied_rs->input_table_keys_ = input_table_keys_;
396  copied_rs->target_meta_info_ = target_meta_info_;
397  copied_rs->geo_return_type_ = geo_return_type_;
398  copied_rs->query_plan_ = query_plan_;
399  if (can_use_speculative_top_n_sort) {
400  copied_rs->can_use_speculative_top_n_sort = can_use_speculative_top_n_sort;
401  }
402 
403  return copied_rs;
404 }
405 
 406 const ResultSetStorage* ResultSet::getStorage() const {
 407  return storage_.get();
408 }
409 
410 size_t ResultSet::colCount() const {
411  return just_explain_ ? 1 : targets_.size();
412 }
413 
414 SQLTypeInfo ResultSet::getColType(const size_t col_idx) const {
415  if (just_explain_) {
416  return SQLTypeInfo(kTEXT, false);
417  }
418  CHECK_LT(col_idx, targets_.size());
419  return targets_[col_idx].agg_kind == kAVG ? SQLTypeInfo(kDOUBLE, false)
420  : targets_[col_idx].sql_type;
421 }
422 
 423 StringDictionaryProxy* ResultSet::getStringDictionaryProxy(
 424  const shared::StringDictKey& dict_key) const {
425  constexpr bool with_generation = true;
426  return (dict_key.db_id > 0 || dict_key.dict_id == DictRef::literalsDictId)
427  ? row_set_mem_owner_->getOrAddStringDictProxy(dict_key, with_generation)
428  : row_set_mem_owner_->getStringDictProxy(dict_key);
429 }
430 
 431 class ResultSet::CellCallback {
 432  StringDictionaryProxy::IdMap id_map_;
 433  int64_t const null_int_;
434 
435  public:
436  CellCallback(StringDictionaryProxy::IdMap&& id_map, int64_t const null_int)
437  : id_map_(std::move(id_map)), null_int_(null_int) {}
438  void operator()(int8_t const* const cell_ptr) const {
439  using StringId = int32_t;
440  StringId* const string_id_ptr =
441  const_cast<StringId*>(reinterpret_cast<StringId const*>(cell_ptr));
442  if (*string_id_ptr != null_int_) {
443  *string_id_ptr = id_map_[*string_id_ptr];
444  }
445  }
446 };
447 
448 // Update any dictionary-encoded targets within storage_ with the corresponding
449 // dictionary in the given targets parameter, if their comp_param (dictionary) differs.
450 // This may modify both the storage_ values and storage_ targets.
451 // Does not iterate through appended_storage_.
452 // Iterate over targets starting at index target_idx.
453 void ResultSet::translateDictEncodedColumns(std::vector<TargetInfo> const& targets,
454  size_t const start_idx) {
455  if (storage_) {
456  CHECK_EQ(targets.size(), storage_->targets_.size());
457  RowIterationState state;
458  for (size_t target_idx = start_idx; target_idx < targets.size(); ++target_idx) {
459  auto const& type_lhs = targets[target_idx].sql_type;
460  if (type_lhs.is_dict_encoded_string()) {
461  auto& type_rhs =
462  const_cast<SQLTypeInfo&>(storage_->targets_[target_idx].sql_type);
463  CHECK(type_rhs.is_dict_encoded_string());
464  if (type_lhs.getStringDictKey() != type_rhs.getStringDictKey()) {
465  auto* const sdp_lhs = getStringDictionaryProxy(type_lhs.getStringDictKey());
466  CHECK(sdp_lhs);
467  auto const* const sdp_rhs =
468  getStringDictionaryProxy(type_rhs.getStringDictKey());
469  CHECK(sdp_rhs);
470  state.cur_target_idx_ = target_idx;
471  CellCallback const translate_string_ids(sdp_lhs->transientUnion(*sdp_rhs),
472  inline_int_null_val(type_rhs));
473  eachCellInColumn(state, translate_string_ids);
474  type_rhs.set_comp_param(type_lhs.get_comp_param());
475  type_rhs.setStringDictKey(type_lhs.getStringDictKey());
476  }
477  }
478  }
479  }
480 }
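// In short (descriptive note): for each dictionary-encoded string target whose stored
// dictionary differs from the one requested in `targets`, the loop above builds a
// string-id mapping via sdp_lhs->transientUnion(*sdp_rhs), rewrites every non-null id
// in place through eachCellInColumn(), and re-tags the storage target with the new
// string dictionary key.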
481 
482 // For each cell in column target_idx, callback func with pointer to datum.
483 // This currently assumes the column type is a dictionary-encoded string, but this logic
484 // can be generalized to other types.
485 void ResultSet::eachCellInColumn(RowIterationState& state, CellCallback const& func) {
486  size_t const target_idx = state.cur_target_idx_;
487  QueryMemoryDescriptor& storage_qmd = storage_->query_mem_desc_;
488  CHECK_LT(target_idx, lazy_fetch_info_.size());
489  auto& col_lazy_fetch = lazy_fetch_info_[target_idx];
490  CHECK(col_lazy_fetch.is_lazily_fetched);
491  int const target_size = storage_->targets_[target_idx].sql_type.get_size();
492  CHECK_LT(0, target_size) << storage_->targets_[target_idx].toString();
493  size_t const nrows = storage_->binSearchRowCount();
494  if (storage_qmd.didOutputColumnar()) {
495  // Logic based on ResultSet::ColumnWiseTargetAccessor::initializeOffsetsForStorage()
496  if (state.buf_ptr_ == nullptr) {
497  state.buf_ptr_ = get_cols_ptr(storage_->buff_, storage_qmd);
498  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
499  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
500  : query_mem_desc_.getEffectiveKeyWidth();
501  }
502  for (size_t j = state.prev_target_idx_; j < state.cur_target_idx_; ++j) {
503  size_t const next_target_idx = j + 1; // Set state to reflect next target_idx j+1
 504  state.buf_ptr_ = advance_to_next_columnar_target_buff(
 505  state.buf_ptr_, storage_qmd, state.agg_idx_);
506  auto const& next_agg_info = storage_->targets_[next_target_idx];
507  state.agg_idx_ =
508  advance_slot(state.agg_idx_, next_agg_info, separate_varlen_storage_valid_);
509  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
510  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
511  : query_mem_desc_.getEffectiveKeyWidth();
512  }
513  for (size_t i = 0; i < nrows; ++i) {
514  int8_t const* const pos_ptr = state.buf_ptr_ + i * state.compact_sz1_;
515  int64_t pos = read_int_from_buff(pos_ptr, target_size);
516  CHECK_GE(pos, 0);
517  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
518  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
519  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
520  func(col_frag + pos * target_size);
521  }
522  } else {
523  size_t const key_bytes_with_padding =
 524  align_to_int64(get_key_bytes_rowwise(storage_qmd));
 525  for (size_t i = 0; i < nrows; ++i) {
526  int8_t const* const keys_ptr = row_ptr_rowwise(storage_->buff_, storage_qmd, i);
527  int8_t const* const rowwise_target_ptr = keys_ptr + key_bytes_with_padding;
528  int64_t pos = *reinterpret_cast<int64_t const*>(rowwise_target_ptr);
529  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
530  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
531  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
532  func(col_frag + pos * target_size);
533  }
534  }
535 }
536 
537 namespace {
538 
539 size_t get_truncated_row_count(size_t total_row_count, size_t limit, size_t offset) {
540  if (total_row_count < offset) {
541  return 0;
542  }
543 
544  size_t total_truncated_row_count = total_row_count - offset;
545 
546  if (limit) {
547  return std::min(total_truncated_row_count, limit);
548  }
549 
550  return total_truncated_row_count;
551 }
552 
553 } // namespace
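// Worked example for get_truncated_row_count() (illustrative numbers):
//   total_row_count = 100, offset = 30, limit = 50 -> min(100 - 30, 50) = 50
//   total_row_count = 100, offset = 30, limit = 0  -> 70 (limit == 0 means no limit)
//   total_row_count = 20,  offset = 30, limit = 50 -> 0  (offset past the end)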
554 
555 size_t ResultSet::rowCountImpl(const bool force_parallel) const {
556  if (just_explain_) {
557  return 1;
558  }
559  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::TableFunction) {
560  return entryCount();
561  }
562  if (!permutation_.empty()) {
563  // keep_first_ corresponds to SQL LIMIT
564  // drop_first_ corresponds to SQL OFFSET
565  return get_truncated_row_count(permutation_.size(), keep_first_, drop_first_);
566  }
567  if (!storage_) {
568  return 0;
569  }
570  CHECK(permutation_.empty());
571  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
572  return binSearchRowCount();
573  }
574 
575  constexpr size_t auto_parallel_row_count_threshold{20000UL};
576  if (force_parallel || entryCount() >= auto_parallel_row_count_threshold) {
577  return parallelRowCount();
578  }
579  std::lock_guard<std::mutex> lock(row_iteration_mutex_);
580  moveToBegin();
581  size_t row_count{0};
582  while (true) {
583  auto crt_row = getNextRowUnlocked(false, false);
584  if (crt_row.empty()) {
585  break;
586  }
587  ++row_count;
588  }
589  moveToBegin();
590  return row_count;
591 }
592 
593 size_t ResultSet::rowCount(const bool force_parallel) const {
594  // cached_row_count_ is atomic, so fetch it into a local variable first
595  // to avoid repeat fetches
596  const int64_t cached_row_count = cached_row_count_;
597  if (cached_row_count != uninitialized_cached_row_count) {
598  CHECK_GE(cached_row_count, 0);
599  return cached_row_count;
600  }
601  setCachedRowCount(rowCountImpl(force_parallel));
602  return cached_row_count_;
603 }
604 
 605 void ResultSet::invalidateCachedRowCount() const {
 606  cached_row_count_ = uninitialized_cached_row_count;
607 }
608 
609 void ResultSet::setCachedRowCount(const size_t row_count) const {
610  const int64_t signed_row_count = static_cast<int64_t>(row_count);
611  const int64_t old_cached_row_count = cached_row_count_.exchange(signed_row_count);
612  CHECK(old_cached_row_count == uninitialized_cached_row_count ||
613  old_cached_row_count == signed_row_count);
614 }
615 
 616 size_t ResultSet::binSearchRowCount() const {
 617  if (!storage_) {
618  return 0;
619  }
620 
621  size_t row_count = storage_->binSearchRowCount();
622  for (auto& s : appended_storage_) {
623  row_count += s->binSearchRowCount();
624  }
625 
626  return get_truncated_row_count(row_count, getLimit(), drop_first_);
627 }
628 
 629 size_t ResultSet::parallelRowCount() const {
 630  using namespace threading;
631  auto execute_parallel_row_count =
632  [this, parent_thread_local_ids = logger::thread_local_ids()](
633  const blocked_range<size_t>& r, size_t row_count) {
634  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
635  for (size_t i = r.begin(); i < r.end(); ++i) {
636  if (!isRowAtEmpty(i)) {
637  ++row_count;
638  }
639  }
640  return row_count;
641  };
642  const auto row_count = parallel_reduce(blocked_range<size_t>(0, entryCount()),
643  size_t(0),
644  execute_parallel_row_count,
645  std::plus<int>());
646  return get_truncated_row_count(row_count, getLimit(), drop_first_);
647 }
648 
649 bool ResultSet::isEmpty() const {
650  // To simplify this function and de-dup logic with ResultSet::rowCount()
651  // (mismatches between the two were causing bugs), we modified this function
652  // to simply fetch rowCount(). The potential downside of this approach is that
653  // in some cases more work will need to be done, as we can't just stop at the first row.
654  // Mitigating that for most cases is the following:
655  // 1) rowCount() is cached, so the logic for actually computing row counts will run only
656  // once
657  // per result set.
658  // 2) If the cache is empty (cached_row_count_ == -1), rowCount() will use parallel
659  // methods if deemed appropriate, which in many cases could be faster for a sparse
 660  // large result set than single-threaded iteration from the beginning.
661  // 3) Often where isEmpty() is needed, rowCount() is also needed. Since the first call
662  // to rowCount()
663  // will be cached, there is no extra overhead in these cases
664 
665  return rowCount() == size_t(0);
666 }
667 
 668 bool ResultSet::definitelyHasNoRows() const {
 669  return (!storage_ && !estimator_ && !just_explain_) || cached_row_count_ == 0;
670 }
671 
 672 const QueryMemoryDescriptor& ResultSet::getQueryMemDesc() const {
 673  CHECK(storage_);
674  return storage_->query_mem_desc_;
675 }
676 
677 const std::vector<TargetInfo>& ResultSet::getTargetInfos() const {
678  return targets_;
679 }
680 
681 const std::vector<int64_t>& ResultSet::getTargetInitVals() const {
682  CHECK(storage_);
683  return storage_->target_init_vals_;
684 }
685 
 686 int8_t* ResultSet::getDeviceEstimatorBuffer() const {
 687  CHECK(device_type_ == ExecutorDeviceType::GPU);
688  CHECK(device_estimator_buffer_);
689  return device_estimator_buffer_->getMemoryPtr();
690 }
691 
 692 int8_t* ResultSet::getHostEstimatorBuffer() const {
 693  return host_estimator_buffer_;
694 }
695 
 696 void ResultSet::syncEstimatorBuffer() const {
 697  CHECK(device_type_ == ExecutorDeviceType::GPU);
698  CHECK(!host_estimator_buffer_);
699  CHECK_EQ(size_t(0), estimator_->getBufferSize() % sizeof(int64_t));
700  host_estimator_buffer_ =
701  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
702  CHECK(device_estimator_buffer_);
703  auto device_buffer_ptr = device_estimator_buffer_->getMemoryPtr();
704  auto allocator = std::make_unique<CudaAllocator>(
705  data_mgr_, device_id_, getQueryEngineCudaStreamForDevice(device_id_));
706  allocator->copyFromDevice(
707  host_estimator_buffer_, device_buffer_ptr, estimator_->getBufferSize());
708 }
709 
710 void ResultSet::setQueueTime(const int64_t queue_time) {
711  timings_.executor_queue_time = queue_time;
712 }
713 
714 void ResultSet::setKernelQueueTime(const int64_t kernel_queue_time) {
715  timings_.kernel_queue_time = kernel_queue_time;
716 }
717 
718 void ResultSet::addCompilationQueueTime(const int64_t compilation_queue_time) {
719  timings_.compilation_queue_time += compilation_queue_time;
720 }
721 
722 int64_t ResultSet::getQueueTime() const {
723  return timings_.executor_queue_time + timings_.kernel_queue_time +
724  timings_.compilation_queue_time;
725 }
726 
727 int64_t ResultSet::getRenderTime() const {
728  return timings_.render_time;
729 }
730 
 731 void ResultSet::moveToBegin() const {
 732  crt_row_buff_idx_ = 0;
733  fetched_so_far_ = 0;
734 }
735 
 736 bool ResultSet::isTruncated() const {
 737  return keep_first_ + drop_first_;
738 }
739 
740 bool ResultSet::isExplain() const {
741  return just_explain_;
742 }
743 
 744 void ResultSet::setValidationOnlyRes() {
 745  for_validation_only_ = true;
746 }
747 
 748 bool ResultSet::isValidationOnlyRes() const {
 749  return for_validation_only_;
750 }
751 
 752 int ResultSet::getDeviceId() const {
 753  return device_id_;
754 }
755 
 756 QueryMemoryDescriptor ResultSet::fixupQueryMemoryDescriptor(
 757  const QueryMemoryDescriptor& query_mem_desc) {
 758  auto query_mem_desc_copy = query_mem_desc;
759  query_mem_desc_copy.resetGroupColWidths(
760  std::vector<int8_t>(query_mem_desc_copy.getGroupbyColCount(), 8));
761  if (query_mem_desc.didOutputColumnar()) {
762  return query_mem_desc_copy;
763  }
764  query_mem_desc_copy.alignPaddedSlots();
765  return query_mem_desc_copy;
766 }
767 
768 void ResultSet::sort(const std::list<Analyzer::OrderEntry>& order_entries,
769  size_t top_n,
770  const Executor* executor) {
771  auto timer = DEBUG_TIMER(__func__);
772 
773  if (!storage_) {
774  return;
775  }
776  invalidateCachedRowCount();
777  CHECK(!targets_.empty());
778 #ifdef HAVE_CUDA
779  if (canUseFastBaselineSort(order_entries, top_n)) {
780  baselineSort(order_entries, top_n, executor);
781  return;
782  }
783 #endif // HAVE_CUDA
784  if (query_mem_desc_.sortOnGpu()) {
785  try {
786  radixSortOnGpu(order_entries);
787  } catch (const OutOfMemory&) {
788  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
789  radixSortOnCpu(order_entries);
790  } catch (const std::bad_alloc&) {
791  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
792  radixSortOnCpu(order_entries);
793  }
794  return;
795  }
796  // This check isn't strictly required, but allows the index buffer to be 32-bit.
797  if (query_mem_desc_.getEntryCount() > std::numeric_limits<uint32_t>::max()) {
798  throw RowSortException("Sorting more than 4B elements not supported");
799  }
800 
801  CHECK(permutation_.empty());
802 
803  if (top_n && g_parallel_top_min < entryCount()) {
804  if (g_enable_watchdog && g_parallel_top_max < entryCount()) {
805  throw WatchdogException("Sorting the result would be too slow");
806  }
807  parallelTop(order_entries, top_n, executor);
808  } else {
809  if (g_enable_watchdog && Executor::baseline_threshold < entryCount()) {
810  throw WatchdogException("Sorting the result would be too slow");
811  }
812  permutation_.resize(query_mem_desc_.getEntryCount());
813  // PermutationView is used to share common API with parallelTop().
814  PermutationView pv(permutation_.data(), 0, permutation_.size());
815  pv = initPermutationBuffer(pv, 0, permutation_.size());
816  if (top_n == 0) {
817  top_n = pv.size(); // top_n == 0 implies a full sort
818  }
819  pv = topPermutation(pv, top_n, createComparator(order_entries, pv, executor, false));
820  if (pv.size() < permutation_.size()) {
821  permutation_.resize(pv.size());
822  permutation_.shrink_to_fit();
823  }
824  }
825 }
826 
827 #ifdef HAVE_CUDA
828 void ResultSet::baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
829  const size_t top_n,
830  const Executor* executor) {
831  auto timer = DEBUG_TIMER(__func__);
 832  // If we only have one GPU, it's usually faster to do a multi-threaded radix sort on CPU
833  if (getGpuCount() > 1) {
834  try {
835  doBaselineSort(ExecutorDeviceType::GPU, order_entries, top_n, executor);
836  } catch (...) {
837  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
838  }
839  } else {
840  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
841  }
842 }
843 #endif // HAVE_CUDA
844 
845 // Append non-empty indexes i in [begin,end) from findStorage(i) to permutation.
 846 PermutationView ResultSet::initPermutationBuffer(PermutationView permutation,
 847  PermutationIdx const begin,
848  PermutationIdx const end) const {
849  auto timer = DEBUG_TIMER(__func__);
850  for (PermutationIdx i = begin; i < end; ++i) {
851  const auto storage_lookup_result = findStorage(i);
852  const auto lhs_storage = storage_lookup_result.storage_ptr;
853  const auto off = storage_lookup_result.fixedup_entry_idx;
854  CHECK(lhs_storage);
855  if (!lhs_storage->isEmptyEntry(off)) {
856  permutation.push_back(i);
857  }
858  }
859  return permutation;
860 }
861 
 862 const Permutation& ResultSet::getPermutationBuffer() const {
 863  return permutation_;
864 }
865 
866 void ResultSet::parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
867  const size_t top_n,
868  const Executor* executor) {
869  auto timer = DEBUG_TIMER(__func__);
870  const size_t nthreads = cpu_threads();
871 
872  // Split permutation_ into nthreads subranges and top-sort in-place.
873  permutation_.resize(query_mem_desc_.getEntryCount());
874  std::vector<PermutationView> permutation_views(nthreads);
875  threading::task_group top_sort_threads;
876  for (auto interval : makeIntervals<PermutationIdx>(0, permutation_.size(), nthreads)) {
877  top_sort_threads.run([this,
878  &order_entries,
879  &permutation_views,
880  top_n,
881  executor,
882  parent_thread_local_ids = logger::thread_local_ids(),
883  interval] {
884  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
885  PermutationView pv(permutation_.data() + interval.begin, 0, interval.size());
886  pv = initPermutationBuffer(pv, interval.begin, interval.end);
887  const auto compare = createComparator(order_entries, pv, executor, true);
888  permutation_views[interval.index] = topPermutation(pv, top_n, compare);
889  });
890  }
891  top_sort_threads.wait();
892 
893  // In case you are considering implementing a parallel reduction, note that the
894  // ResultSetComparator constructor is O(N) in order to materialize some of the aggregate
895  // columns as necessary to perform a comparison. This cost is why reduction is chosen to
896  // be serial instead; only one more Comparator is needed below.
897 
898  // Left-copy disjoint top-sorted subranges into one contiguous range.
899  // ++++....+++.....+++++... -> ++++++++++++............
900  auto end = permutation_.begin() + permutation_views.front().size();
901  for (size_t i = 1; i < nthreads; ++i) {
902  std::copy(permutation_views[i].begin(), permutation_views[i].end(), end);
903  end += permutation_views[i].size();
904  }
905 
906  // Top sort final range.
907  PermutationView pv(permutation_.data(), end - permutation_.begin());
908  const auto compare = createComparator(order_entries, pv, executor, false);
909  pv = topPermutation(pv, top_n, compare);
910  permutation_.resize(pv.size());
911  permutation_.shrink_to_fit();
912 }
913 
914 std::pair<size_t, size_t> ResultSet::getStorageIndex(const size_t entry_idx) const {
915  size_t fixedup_entry_idx = entry_idx;
916  auto entry_count = storage_->query_mem_desc_.getEntryCount();
917  const bool is_rowwise_layout = !storage_->query_mem_desc_.didOutputColumnar();
918  if (fixedup_entry_idx < entry_count) {
919  return {0, fixedup_entry_idx};
920  }
921  fixedup_entry_idx -= entry_count;
922  for (size_t i = 0; i < appended_storage_.size(); ++i) {
923  const auto& desc = appended_storage_[i]->query_mem_desc_;
924  CHECK_NE(is_rowwise_layout, desc.didOutputColumnar());
925  entry_count = desc.getEntryCount();
926  if (fixedup_entry_idx < entry_count) {
927  return {i + 1, fixedup_entry_idx};
928  }
929  fixedup_entry_idx -= entry_count;
930  }
931  UNREACHABLE() << "entry_idx = " << entry_idx << ", query_mem_desc_.getEntryCount() = "
932  << query_mem_desc_.getEntryCount();
933  return {};
934 }
935 
938 
 939 ResultSet::StorageLookupResult ResultSet::findStorage(const size_t entry_idx) const {
 940  auto [stg_idx, fixedup_entry_idx] = getStorageIndex(entry_idx);
941  return {stg_idx ? appended_storage_[stg_idx - 1].get() : storage_.get(),
942  fixedup_entry_idx,
943  stg_idx};
944 }
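// Illustrative example for getStorageIndex()/findStorage(): if storage_ holds 100
// entries and a single appended storage holds 50, then entry_idx 99 resolves to
// {0, 99} (head storage) and entry_idx 120 resolves to {1, 20}, i.e.
// appended_storage_[0] at local offset 20.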
945 
946 template <typename BUFFER_ITERATOR_TYPE>
 947 void ResultSet::ResultSetComparator<
 948  BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumns() {
949  for (const auto& order_entry : order_entries_) {
950  if (is_distinct_target(result_set_->targets_[order_entry.tle_no - 1])) {
951  count_distinct_materialized_buffers_.emplace_back(
952  materializeCountDistinctColumn(order_entry));
953  }
954  }
955 }
956 
957 namespace {
958 struct IsAggKind {
959  std::vector<TargetInfo> const& targets_;
 960  SQLAgg const agg_kind_;
 961  IsAggKind(std::vector<TargetInfo> const& targets, SQLAgg const agg_kind)
962  : targets_(targets), agg_kind_(agg_kind) {}
963  bool operator()(Analyzer::OrderEntry const& order_entry) const {
964  return targets_[order_entry.tle_no - 1].agg_kind == agg_kind_;
965  }
966 };
967 } // namespace
968 
969 template <typename BUFFER_ITERATOR_TYPE>
 970 ResultSet::ApproxQuantileBuffers ResultSet::ResultSetComparator<
 971  BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumns() const {
972  ResultSet::ApproxQuantileBuffers approx_quantile_materialized_buffers;
973  for (const auto& order_entry : order_entries_) {
974  if (result_set_->targets_[order_entry.tle_no - 1].agg_kind == kAPPROX_QUANTILE) {
975  approx_quantile_materialized_buffers.emplace_back(
976  materializeApproxQuantileColumn(order_entry));
977  }
978  }
979  return approx_quantile_materialized_buffers;
980 }
981 
982 template <typename BUFFER_ITERATOR_TYPE>
 983 ResultSet::ModeBuffers ResultSet::ResultSetComparator<
 984  BUFFER_ITERATOR_TYPE>::materializeModeColumns() const {
 985  ResultSet::ModeBuffers mode_buffers;
986  IsAggKind const is_mode(result_set_->targets_, kMODE);
987  mode_buffers.reserve(
988  std::count_if(order_entries_.begin(), order_entries_.end(), is_mode));
989  for (auto const& order_entry : order_entries_) {
990  if (is_mode(order_entry)) {
991  mode_buffers.emplace_back(materializeModeColumn(order_entry));
992  }
993  }
994  return mode_buffers;
995 }
996 
997 template <typename BUFFER_ITERATOR_TYPE>
998 std::vector<int64_t>
 999 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumn(
 1000  const Analyzer::OrderEntry& order_entry) const {
1001  const size_t num_storage_entries = result_set_->query_mem_desc_.getEntryCount();
1002  std::vector<int64_t> count_distinct_materialized_buffer(num_storage_entries);
1003  const CountDistinctDescriptor count_distinct_descriptor =
1004  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no - 1);
1005  const size_t num_non_empty_entries = permutation_.size();
1006 
1007  const auto work = [&, parent_thread_local_ids = logger::thread_local_ids()](
1008  const size_t start, const size_t end) {
1009  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1010  for (size_t i = start; i < end; ++i) {
1011  const PermutationIdx permuted_idx = permutation_[i];
1012  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1013  const auto storage = storage_lookup_result.storage_ptr;
1014  const auto off = storage_lookup_result.fixedup_entry_idx;
1015  const auto value = buffer_itr_.getColumnInternal(
1016  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1017  count_distinct_materialized_buffer[permuted_idx] =
1018  count_distinct_set_size(value.i1, count_distinct_descriptor);
1019  }
1020  };
1021  // TODO(tlm): Allow use of tbb after we determine how to easily encapsulate the choice
1022  // between thread pool types
1023  if (single_threaded_) {
1024  work(0, num_non_empty_entries);
1025  } else {
1026  threading::task_group thread_pool;
1027  for (auto interval : makeIntervals<size_t>(0, num_non_empty_entries, cpu_threads())) {
1028  thread_pool.run([=] { work(interval.begin, interval.end); });
1029  }
1030  thread_pool.wait();
1031  }
1032  return count_distinct_materialized_buffer;
1033 }
1034 
 1035 double ResultSet::calculateQuantile(quantile::TDigest* const t_digest) {
 1036  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1037  CHECK(t_digest);
1038  t_digest->mergeBufferFinal();
1039  double const quantile = t_digest->quantile();
1040  return boost::math::isnan(quantile) ? NULL_DOUBLE : quantile;
1041 }
1042 
1043 template <typename BUFFER_ITERATOR_TYPE>
1044 ResultSet::ApproxQuantileBuffers::value_type
 1045 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumn(
 1046  const Analyzer::OrderEntry& order_entry) const {
1047  ResultSet::ApproxQuantileBuffers::value_type materialized_buffer(
1048  result_set_->query_mem_desc_.getEntryCount());
1049  const size_t size = permutation_.size();
1050  const auto work = [&, parent_thread_local_ids = logger::thread_local_ids()](
1051  const size_t start, const size_t end) {
1052  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1053  for (size_t i = start; i < end; ++i) {
1054  const PermutationIdx permuted_idx = permutation_[i];
1055  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1056  const auto storage = storage_lookup_result.storage_ptr;
1057  const auto off = storage_lookup_result.fixedup_entry_idx;
1058  const auto value = buffer_itr_.getColumnInternal(
1059  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1060  materialized_buffer[permuted_idx] =
1061  value.i1 ? calculateQuantile(reinterpret_cast<quantile::TDigest*>(value.i1))
1062  : NULL_DOUBLE;
1063  }
1064  };
1065  if (single_threaded_) {
1066  work(0, size);
1067  } else {
1068  threading::task_group thread_pool;
1069  for (auto interval : makeIntervals<size_t>(0, size, cpu_threads())) {
1070  thread_pool.run([=] { work(interval.begin, interval.end); });
1071  }
1072  thread_pool.wait();
1073  }
1074  return materialized_buffer;
1075 }
1076 
1077 namespace {
1078 // i1 is from InternalTargetValue
1079 int64_t materializeMode(int64_t const i1) {
1080  if (auto const* const agg_mode = reinterpret_cast<AggMode const*>(i1)) {
1081  if (std::optional<int64_t> const mode = agg_mode->mode()) {
1082  return *mode;
1083  }
1084  }
1085  return NULL_BIGINT;
1086 }
1087 
1088 using ModeBlockedRange = tbb::blocked_range<size_t>;
1089 } // namespace
1090 
1091 template <typename BUFFER_ITERATOR_TYPE>
1092 struct ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::ModeScatter {
 1093  logger::ThreadLocalIds const parent_thread_local_ids_;
 1094  ResultSetComparator const* const rsc_;
 1095  Analyzer::OrderEntry const& order_entry_;
 1096  ResultSet::ModeBuffers::value_type& materialized_buffer_;
1097 
1098  void operator()(ModeBlockedRange const& r) const {
1099  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids_.setNewThreadId();
1100  for (size_t i = r.begin(); i != r.end(); ++i) {
1101  PermutationIdx const permuted_idx = rsc_->permutation_[i];
1102  auto const storage_lookup_result = rsc_->result_set_->findStorage(permuted_idx);
1103  auto const storage = storage_lookup_result.storage_ptr;
1104  auto const off = storage_lookup_result.fixedup_entry_idx;
1105  auto const value = rsc_->buffer_itr_.getColumnInternal(
1106  storage->buff_, off, order_entry_.tle_no - 1, storage_lookup_result);
1107  materialized_buffer_[permuted_idx] = materializeMode(value.i1);
1108  }
1109  }
1110 };
1111 
1112 template <typename BUFFER_ITERATOR_TYPE>
1113 ResultSet::ModeBuffers::value_type
 1114 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeModeColumn(
 1115  const Analyzer::OrderEntry& order_entry) const {
1116  ResultSet::ModeBuffers::value_type materialized_buffer(
1117  result_set_->query_mem_desc_.getEntryCount());
1118  ModeScatter mode_scatter{
1119  logger::thread_local_ids(), this, order_entry, materialized_buffer};
1120  if (single_threaded_) {
1121  mode_scatter(ModeBlockedRange(0, permutation_.size())); // Still has new thread_id.
1122  } else {
1123  tbb::parallel_for(ModeBlockedRange(0, permutation_.size()), mode_scatter);
1124  }
1125  return materialized_buffer;
1126 }
1127 
1128 template <typename BUFFER_ITERATOR_TYPE>
 1129 bool ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::operator()(
 1130  const PermutationIdx lhs,
1131  const PermutationIdx rhs) const {
1132  // NB: The compare function must define a strict weak ordering, otherwise
1133  // std::sort will trigger a segmentation fault (or corrupt memory).
1134  const auto lhs_storage_lookup_result = result_set_->findStorage(lhs);
1135  const auto rhs_storage_lookup_result = result_set_->findStorage(rhs);
1136  const auto lhs_storage = lhs_storage_lookup_result.storage_ptr;
1137  const auto rhs_storage = rhs_storage_lookup_result.storage_ptr;
1138  const auto fixedup_lhs = lhs_storage_lookup_result.fixedup_entry_idx;
1139  const auto fixedup_rhs = rhs_storage_lookup_result.fixedup_entry_idx;
1140  size_t materialized_count_distinct_buffer_idx{0};
1141  size_t materialized_approx_quantile_buffer_idx{0};
1142  size_t materialized_mode_buffer_idx{0};
1143 
1144  for (const auto& order_entry : order_entries_) {
1145  CHECK_GE(order_entry.tle_no, 1);
1146  // lhs_entry_ti and rhs_entry_ti can differ on comp_param w/ UNION of string dicts.
1147  const auto& lhs_agg_info = lhs_storage->targets_[order_entry.tle_no - 1];
1148  const auto& rhs_agg_info = rhs_storage->targets_[order_entry.tle_no - 1];
1149  const auto lhs_entry_ti = get_compact_type(lhs_agg_info);
1150  const auto rhs_entry_ti = get_compact_type(rhs_agg_info);
1151  // When lhs vs rhs doesn't matter, the lhs is used. For example:
1152  bool float_argument_input = takes_float_argument(lhs_agg_info);
 1153  // Need to determine if the float value has been stored as a float
 1154  // or if it has been compacted to a different (often larger, 8-byte) width;
 1155  // in the distributed case the floats are actually 4 bytes.
 1156  // TODO: takes_float_argument() is widely used; check whether this problem
 1157  // exists elsewhere.
1158  if (lhs_entry_ti.get_type() == kFLOAT) {
1159  const auto is_col_lazy =
1160  !result_set_->lazy_fetch_info_.empty() &&
1161  result_set_->lazy_fetch_info_[order_entry.tle_no - 1].is_lazily_fetched;
1162  if (result_set_->query_mem_desc_.getPaddedSlotWidthBytes(order_entry.tle_no - 1) ==
1163  sizeof(float)) {
1164  float_argument_input =
1165  result_set_->query_mem_desc_.didOutputColumnar() ? !is_col_lazy : true;
1166  }
1167  }
1168 
1169  if (UNLIKELY(is_distinct_target(lhs_agg_info))) {
1170  CHECK_LT(materialized_count_distinct_buffer_idx,
1171  count_distinct_materialized_buffers_.size());
1172 
1173  const auto& count_distinct_materialized_buffer =
1174  count_distinct_materialized_buffers_[materialized_count_distinct_buffer_idx];
1175  const auto lhs_sz = count_distinct_materialized_buffer[lhs];
1176  const auto rhs_sz = count_distinct_materialized_buffer[rhs];
1177  ++materialized_count_distinct_buffer_idx;
1178  if (lhs_sz == rhs_sz) {
1179  continue;
1180  }
1181  return (lhs_sz < rhs_sz) != order_entry.is_desc;
1182  } else if (UNLIKELY(lhs_agg_info.agg_kind == kAPPROX_QUANTILE)) {
1183  CHECK_LT(materialized_approx_quantile_buffer_idx,
1184  approx_quantile_materialized_buffers_.size());
1185  const auto& approx_quantile_materialized_buffer =
1186  approx_quantile_materialized_buffers_[materialized_approx_quantile_buffer_idx];
1187  const auto lhs_value = approx_quantile_materialized_buffer[lhs];
1188  const auto rhs_value = approx_quantile_materialized_buffer[rhs];
1189  ++materialized_approx_quantile_buffer_idx;
1190  if (lhs_value == rhs_value) {
1191  continue;
1192  } else if (!lhs_entry_ti.get_notnull()) {
1193  if (lhs_value == NULL_DOUBLE) {
1194  return order_entry.nulls_first;
1195  } else if (rhs_value == NULL_DOUBLE) {
1196  return !order_entry.nulls_first;
1197  }
1198  }
1199  return (lhs_value < rhs_value) != order_entry.is_desc;
1200  } else if (UNLIKELY(lhs_agg_info.agg_kind == kMODE)) {
1201  CHECK_LT(materialized_mode_buffer_idx, mode_buffers_.size());
1202  auto const& mode_buffer = mode_buffers_[materialized_mode_buffer_idx++];
1203  int64_t const lhs_value = mode_buffer[lhs];
1204  int64_t const rhs_value = mode_buffer[rhs];
1205  if (lhs_value == rhs_value) {
1206  continue;
1207  // MODE(x) can only be NULL when the group is empty, since it skips null values.
1208  } else if (lhs_value == NULL_BIGINT) { // NULL_BIGINT from materializeMode()
1209  return order_entry.nulls_first;
1210  } else if (rhs_value == NULL_BIGINT) {
1211  return !order_entry.nulls_first;
1212  } else {
1213  return result_set_->isLessThan(lhs_entry_ti, lhs_value, rhs_value) !=
1214  order_entry.is_desc;
1215  }
1216  }
1217 
1218  const auto lhs_v = buffer_itr_.getColumnInternal(lhs_storage->buff_,
1219  fixedup_lhs,
1220  order_entry.tle_no - 1,
1221  lhs_storage_lookup_result);
1222  const auto rhs_v = buffer_itr_.getColumnInternal(rhs_storage->buff_,
1223  fixedup_rhs,
1224  order_entry.tle_no - 1,
1225  rhs_storage_lookup_result);
1226 
1227  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1228  isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1229  continue;
1230  }
1231  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1232  !isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1233  return order_entry.nulls_first;
1234  }
1235  if (UNLIKELY(isNull(rhs_entry_ti, rhs_v, float_argument_input) &&
1236  !isNull(lhs_entry_ti, lhs_v, float_argument_input))) {
1237  return !order_entry.nulls_first;
1238  }
1239 
1240  if (LIKELY(lhs_v.isInt())) {
1241  CHECK(rhs_v.isInt());
1242  if (UNLIKELY(lhs_entry_ti.is_string() &&
1243  lhs_entry_ti.get_compression() == kENCODING_DICT)) {
1244  CHECK_EQ(4, lhs_entry_ti.get_logical_size());
1245  CHECK(executor_);
1246  const auto lhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1247  lhs_entry_ti.getStringDictKey(), result_set_->row_set_mem_owner_, false);
1248  const auto rhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1249  rhs_entry_ti.getStringDictKey(), result_set_->row_set_mem_owner_, false);
1250  const auto lhs_str = lhs_string_dict_proxy->getString(lhs_v.i1);
1251  const auto rhs_str = rhs_string_dict_proxy->getString(rhs_v.i1);
1252  if (lhs_str == rhs_str) {
1253  continue;
1254  }
1255  return (lhs_str < rhs_str) != order_entry.is_desc;
1256  }
1257 
1258  if (lhs_v.i1 == rhs_v.i1) {
1259  continue;
1260  }
1261  if (lhs_entry_ti.is_fp()) {
1262  if (float_argument_input) {
1263  const auto lhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&lhs_v.i1));
1264  const auto rhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&rhs_v.i1));
1265  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1266  } else {
1267  const auto lhs_dval =
1268  *reinterpret_cast<const double*>(may_alias_ptr(&lhs_v.i1));
1269  const auto rhs_dval =
1270  *reinterpret_cast<const double*>(may_alias_ptr(&rhs_v.i1));
1271  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1272  }
1273  }
1274  return (lhs_v.i1 < rhs_v.i1) != order_entry.is_desc;
1275  } else {
1276  if (lhs_v.isPair()) {
1277  CHECK(rhs_v.isPair());
1278  const auto lhs =
1279  pair_to_double({lhs_v.i1, lhs_v.i2}, lhs_entry_ti, float_argument_input);
1280  const auto rhs =
1281  pair_to_double({rhs_v.i1, rhs_v.i2}, rhs_entry_ti, float_argument_input);
1282  if (lhs == rhs) {
1283  continue;
1284  }
1285  return (lhs < rhs) != order_entry.is_desc;
1286  } else {
1287  CHECK(lhs_v.isStr() && rhs_v.isStr());
1288  const auto lhs = lhs_v.strVal();
1289  const auto rhs = rhs_v.strVal();
1290  if (lhs == rhs) {
1291  continue;
1292  }
1293  return (lhs < rhs) != order_entry.is_desc;
1294  }
1295  }
1296  }
1297  return false;
1298 }
1299 
1300 // Partial sort permutation into top(least by compare) n elements.
1301 // If permutation.size() <= n then sort entire permutation by compare.
1302 // Return PermutationView with new size() = min(n, permutation.size()).
 1303 PermutationView ResultSet::topPermutation(PermutationView permutation,
 1304  const size_t n,
1305  const Comparator& compare) {
1306  auto timer = DEBUG_TIMER(__func__);
1307  if (n < permutation.size()) {
1308  std::partial_sort(
1309  permutation.begin(), permutation.begin() + n, permutation.end(), compare);
1310  permutation.resize(n);
1311  } else {
1312  std::sort(permutation.begin(), permutation.end(), compare);
1313  }
1314  return permutation;
1315 }
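// Note (illustrative): for ORDER BY ... LIMIT 10 over a million candidate entries,
// std::partial_sort above only fully orders the 10 least elements under `compare`,
// roughly O(N log n) with n = 10 instead of a full O(N log N) sort, and the
// permutation is then shrunk to those 10 indices.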
1316 
 1317 void ResultSet::radixSortOnGpu(
 1318  const std::list<Analyzer::OrderEntry>& order_entries) const {
1319  auto timer = DEBUG_TIMER(__func__);
1321  const int device_id{0};
1322  auto allocator = std::make_unique<CudaAllocator>(
1323  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1324  CHECK_GT(block_size_, 0);
1325  CHECK_GT(grid_size_, 0);
1326  std::vector<int64_t*> group_by_buffers(block_size_);
1327  group_by_buffers[0] = reinterpret_cast<int64_t*>(storage_->getUnderlyingBuffer());
1328  auto dev_group_by_buffers =
1329  create_dev_group_by_buffers(allocator.get(),
1330  group_by_buffers,
1331  query_mem_desc_,
1332  block_size_,
1333  grid_size_,
1334  device_id,
 1335  ExecutorDispatchMode::KernelPerFragment,
 1336  /*num_input_rows=*/-1,
1337  /*prepend_index_buffer=*/true,
1338  /*always_init_group_by_on_host=*/true,
1339  /*use_bump_allocator=*/false,
1340  /*has_varlen_output=*/false,
1341  /*insitu_allocator*=*/nullptr);
 1342  inplace_sort_gpu(
 1343  order_entries, query_mem_desc_, dev_group_by_buffers, data_mgr, device_id);
 1344  copy_group_by_buffers_from_gpu(
 1345  *allocator,
1346  group_by_buffers,
1347  query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
1348  dev_group_by_buffers.data,
1349  query_mem_desc_,
1350  block_size_,
1351  grid_size_,
1352  device_id,
1353  /*use_bump_allocator=*/false,
1354  /*has_varlen_output=*/false);
1355 }
1356 
 1357 void ResultSet::radixSortOnCpu(
 1358  const std::list<Analyzer::OrderEntry>& order_entries) const {
1359  auto timer = DEBUG_TIMER(__func__);
1360  CHECK(!query_mem_desc_.hasKeylessHash());
1361  std::vector<int64_t> tmp_buff(query_mem_desc_.getEntryCount());
1362  std::vector<int32_t> idx_buff(query_mem_desc_.getEntryCount());
1363  CHECK_EQ(size_t(1), order_entries.size());
1364  auto buffer_ptr = storage_->getUnderlyingBuffer();
1365  for (const auto& order_entry : order_entries) {
1366  const auto target_idx = order_entry.tle_no - 1;
1367  const auto sortkey_val_buff = reinterpret_cast<int64_t*>(
1368  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1369  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1370  sort_groups_cpu(sortkey_val_buff,
1371  &idx_buff[0],
1372  query_mem_desc_.getEntryCount(),
1373  order_entry.is_desc,
1374  chosen_bytes);
1375  apply_permutation_cpu(reinterpret_cast<int64_t*>(buffer_ptr),
1376  &idx_buff[0],
1377  query_mem_desc_.getEntryCount(),
1378  &tmp_buff[0],
1379  sizeof(int64_t));
1380  for (size_t target_idx = 0; target_idx < query_mem_desc_.getSlotCount();
1381  ++target_idx) {
1382  if (static_cast<int>(target_idx) == order_entry.tle_no - 1) {
1383  continue;
1384  }
1385  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1386  const auto satellite_val_buff = reinterpret_cast<int64_t*>(
1387  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1388  apply_permutation_cpu(satellite_val_buff,
1389  &idx_buff[0],
1390  query_mem_desc_.getEntryCount(),
1391  &tmp_buff[0],
1392  chosen_bytes);
1393  }
1394  }
1395 }
1396 
1397 size_t ResultSet::getLimit() const {
1398  return keep_first_;
1399 }
1400 
1401 const std::vector<std::string> ResultSet::getStringDictionaryPayloadCopy(
1402  const shared::StringDictKey& dict_key) const {
1403  const auto sdp =
1404  row_set_mem_owner_->getOrAddStringDictProxy(dict_key, /*with_generation=*/true);
1405  CHECK(sdp);
1406  return sdp->getDictionary()->copyStrings();
1407 }
1408 
1409 const std::pair<std::vector<int32_t>, std::vector<std::string>>
 1410 ResultSet::getUniqueStringsForDictEncodedTargetCol(const size_t col_idx) const {
 1411  const auto col_type_info = getColType(col_idx);
1412  std::unordered_set<int32_t> unique_string_ids_set;
1413  const size_t num_entries = entryCount();
1414  std::vector<bool> targets_to_skip(colCount(), true);
1415  targets_to_skip[col_idx] = false;
1416  CHECK(col_type_info.is_dict_encoded_type()); // Array<Text> or Text
1417  const int64_t null_val = inline_fixed_encoding_null_val(
1418  col_type_info.is_array() ? col_type_info.get_elem_type() : col_type_info);
1419 
1420  for (size_t row_idx = 0; row_idx < num_entries; ++row_idx) {
1421  const auto result_row = getRowAtNoTranslations(row_idx, targets_to_skip);
1422  if (!result_row.empty()) {
1423  if (const auto scalar_col_val =
1424  boost::get<ScalarTargetValue>(&result_row[col_idx])) {
1425  const int32_t string_id =
1426  static_cast<int32_t>(boost::get<int64_t>(*scalar_col_val));
1427  if (string_id != null_val) {
1428  unique_string_ids_set.emplace(string_id);
1429  }
1430  } else if (const auto array_col_val =
1431  boost::get<ArrayTargetValue>(&result_row[col_idx])) {
1432  if (*array_col_val) {
1433  for (const ScalarTargetValue& scalar : array_col_val->value()) {
1434  const int32_t string_id = static_cast<int32_t>(boost::get<int64_t>(scalar));
1435  if (string_id != null_val) {
1436  unique_string_ids_set.emplace(string_id);
1437  }
1438  }
1439  }
1440  }
1441  }
1442  }
1443 
1444  const size_t num_unique_strings = unique_string_ids_set.size();
1445  std::vector<int32_t> unique_string_ids(num_unique_strings);
1446  size_t string_idx{0};
1447  for (const auto unique_string_id : unique_string_ids_set) {
1448  unique_string_ids[string_idx++] = unique_string_id;
1449  }
1450 
1451  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
1452  col_type_info.getStringDictKey(), /*with_generation=*/true);
1453  CHECK(sdp);
1454 
1455  return std::make_pair(unique_string_ids, sdp->getStrings(unique_string_ids));
1456 }
1457 
 1465 bool ResultSet::isDirectColumnarConversionPossible() const {
 1466  if (!g_enable_direct_columnarization) {
 1467  return false;
 1468  } else if (query_mem_desc_.didOutputColumnar()) {
 1469  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
 1470  QueryDescriptionType::Projection ||
 1471  query_mem_desc_.getQueryDescriptionType() ==
 1472  QueryDescriptionType::TableFunction ||
 1473  query_mem_desc_.getQueryDescriptionType() ==
 1474  QueryDescriptionType::GroupByPerfectHash ||
 1475  query_mem_desc_.getQueryDescriptionType() ==
 1476  QueryDescriptionType::GroupByBaselineHash);
 1477  } else {
 1478  CHECK(!(query_mem_desc_.getQueryDescriptionType() ==
 1479  QueryDescriptionType::TableFunction));
 1480  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
 1481  QueryDescriptionType::GroupByPerfectHash ||
 1482  query_mem_desc_.getQueryDescriptionType() ==
 1483  QueryDescriptionType::GroupByBaselineHash);
 1484  }
 1485 }
1486 
 1487 bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
 1488  return query_mem_desc_.didOutputColumnar() &&
 1489  (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection ||
 1490  query_mem_desc_.getQueryDescriptionType() ==
 1491  QueryDescriptionType::TableFunction) &&
 1492  appended_storage_.empty() && storage_ &&
1493  (lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
1494 }
1495 
1496 const int8_t* ResultSet::getColumnarBuffer(size_t column_idx) const {
1497  CHECK(isZeroCopyColumnarConversionPossible(column_idx));
1498  return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(column_idx);
1499 }
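// Usage sketch (descriptive): callers should verify isZeroCopyColumnarConversionPossible()
// for the column first; the pointer returned above aliases this ResultSet's own storage
// at the column's byte offset, so no copy is made and it remains valid only as long as
// the ResultSet (and its storage_) is alive.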
1500 
1501 // returns a bitmap (and total number) of all single slot targets
1502 std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
1503  std::vector<bool> target_bitmap(targets_.size(), true);
1504  size_t num_single_slot_targets = 0;
1505  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1506  const auto& sql_type = targets_[target_idx].sql_type;
1507  if (targets_[target_idx].is_agg && targets_[target_idx].agg_kind == kAVG) {
1508  target_bitmap[target_idx] = false;
1509  } else if (sql_type.is_varlen()) {
1510  target_bitmap[target_idx] = false;
1511  } else {
1512  num_single_slot_targets++;
1513  }
1514  }
1515  return std::make_tuple(std::move(target_bitmap), num_single_slot_targets);
1516 }
1517 
1526 std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
1527  const {
1528  CHECK(isDirectColumnarConversionPossible());
1529  auto [single_slot_targets, num_single_slot_targets] = getSingleSlotTargetBitmap();
1530 
1531  for (size_t target_idx = 0; target_idx < single_slot_targets.size(); target_idx++) {
1532  const auto& target = targets_[target_idx];
1533  if (single_slot_targets[target_idx] &&
1534  (is_distinct_target(target) ||
1535  shared::is_any<kAPPROX_QUANTILE, kMODE>(target.agg_kind) ||
1536  (target.is_agg && target.agg_kind == kSAMPLE && target.sql_type == kFLOAT))) {
1537  single_slot_targets[target_idx] = false;
1538  num_single_slot_targets--;
1539  }
1540  }
1541  CHECK_GE(num_single_slot_targets, size_t(0));
1542  return std::make_tuple(std::move(single_slot_targets), num_single_slot_targets);
1543 }
1544 
1545 // returns the starting slot index for all targets in the result set
1546 std::vector<size_t> ResultSet::getSlotIndicesForTargetIndices() const {
1547  std::vector<size_t> slot_indices(targets_.size(), 0);
1548  size_t slot_index = 0;
1549  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1550  slot_indices[target_idx] = slot_index;
1551  slot_index = advance_slot(slot_index, targets_[target_idx], false);
1552  }
1553  return slot_indices;
1554 }
1555 
1556 // namespace result_set
1557 
1558 bool result_set::can_use_parallel_algorithms(const ResultSet& rows) {
1559  return !rows.isTruncated();
1560 }
1561 
1562 namespace {
1563 struct IsDictEncodedStr {
1564  bool operator()(TargetInfo const& target_info) const {
1565  return target_info.sql_type.is_dict_encoded_string();
1566  }
1567 };
1568 } // namespace
1569 
1570 std::optional<size_t> result_set::first_dict_encoded_idx(
1571  std::vector<TargetInfo> const& targets) {
1572  auto const itr = std::find_if(targets.begin(), targets.end(), IsDictEncodedStr{});
1573  return itr == targets.end() ? std::nullopt
1574  : std::make_optional<size_t>(itr - targets.begin());
1575 }
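
// Editorial sketch (not part of the original ResultSet.cpp): the optional index above
// can be used to fetch the dictionary proxy of the first string-dictionary-encoded
// target, if any. The helper name is hypothetical.
namespace {
[[maybe_unused]] StringDictionaryProxy* example_first_dict_proxy(const ResultSet& rows) {
  const auto& targets = rows.getTargetInfos();
  if (const auto idx = result_set::first_dict_encoded_idx(targets)) {
    return rows.getStringDictionaryProxy(targets[*idx].sql_type.getStringDictKey());
  }
  return nullptr;
}
}  // namespace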
1576 
1577 bool result_set::use_parallel_algorithms(const ResultSet& rows) {
1578  return result_set::can_use_parallel_algorithms(rows) && rows.entryCount() >= 20000;
1579 }
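
// Editorial sketch (not part of the original ResultSet.cpp): a typical caller-side use
// of the predicate above, forcing the parallel row-count path once the 20,000-entry
// threshold is crossed and the result set is not truncated. The function name is
// hypothetical; rowCount() is the public member declared in ResultSet.h.
namespace {
[[maybe_unused]] size_t example_count_rows(const ResultSet& rows) {
  const bool force_parallel = result_set::use_parallel_algorithms(rows);
  return rows.rowCount(force_parallel);
}
}  // namespace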