OmniSciDB  72c90bc290
ResultSet.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "ResultSet.h"
26 #include "Execute.h"
27 #include "GpuMemUtils.h"
28 #include "InPlaceSort.h"
31 #include "RelAlgExecutionUnit.h"
32 #include "RuntimeFunctions.h"
33 #include "Shared/Intervals.h"
34 #include "Shared/SqlTypesLayout.h"
35 #include "Shared/checked_alloc.h"
36 #include "Shared/likely.h"
37 #include "Shared/thread_count.h"
38 #include "Shared/threading.h"
39 
40 #include <tbb/parallel_for.h>
41 
42 #include <algorithm>
43 #include <atomic>
44 #include <bitset>
45 #include <functional>
46 #include <future>
47 #include <numeric>
48 
49 size_t g_parallel_top_min = 100e3;
50 size_t g_parallel_top_max = 20e6; // In effect only with g_enable_watchdog.
51 size_t g_streaming_topn_max = 100e3;
52 constexpr int64_t uninitialized_cached_row_count{-1};
53 
54 void ResultSet::keepFirstN(const size_t n) {
55  invalidateCachedRowCount();
56  keep_first_ = n;
57 }
58 
59 void ResultSet::dropFirstN(const size_t n) {
60  invalidateCachedRowCount();
61  drop_first_ = n;
62 }
63 
64 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
65  const ExecutorDeviceType device_type,
 66  const QueryMemoryDescriptor& query_mem_desc,
67  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
68  const unsigned block_size,
69  const unsigned grid_size)
70  : targets_(targets)
71  , device_type_(device_type)
72  , device_id_(-1)
73  , thread_idx_(-1)
74  , query_mem_desc_(query_mem_desc)
75  , crt_row_buff_idx_(0)
76  , fetched_so_far_(0)
77  , drop_first_(0)
78  , keep_first_(0)
79  , row_set_mem_owner_(row_set_mem_owner)
80  , block_size_(block_size)
81  , grid_size_(grid_size)
82  , data_mgr_(nullptr)
83  , separate_varlen_storage_valid_(false)
84  , just_explain_(false)
85  , for_validation_only_(false)
86  , cached_row_count_(uninitialized_cached_row_count)
87  , geo_return_type_(GeoReturnType::WktString)
88  , cached_(false)
89  , query_exec_time_(0)
90  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
91  , can_use_speculative_top_n_sort(std::nullopt) {}
92 
93 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
94  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
95  const std::vector<std::vector<const int8_t*>>& col_buffers,
96  const std::vector<std::vector<int64_t>>& frag_offsets,
97  const std::vector<int64_t>& consistent_frag_sizes,
98  const ExecutorDeviceType device_type,
99  const int device_id,
100  const int thread_idx,
 101  const QueryMemoryDescriptor& query_mem_desc,
102  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
103  const unsigned block_size,
104  const unsigned grid_size)
105  : targets_(targets)
106  , device_type_(device_type)
107  , device_id_(device_id)
108  , thread_idx_(thread_idx)
109  , query_mem_desc_(query_mem_desc)
110  , crt_row_buff_idx_(0)
111  , fetched_so_far_(0)
112  , drop_first_(0)
113  , keep_first_(0)
114  , row_set_mem_owner_(row_set_mem_owner)
115  , block_size_(block_size)
116  , grid_size_(grid_size)
117  , lazy_fetch_info_(lazy_fetch_info)
118  , col_buffers_{col_buffers}
119  , frag_offsets_{frag_offsets}
120  , consistent_frag_sizes_{consistent_frag_sizes}
121  , data_mgr_(nullptr)
122  , separate_varlen_storage_valid_(false)
123  , just_explain_(false)
124  , for_validation_only_(false)
125  , cached_row_count_(uninitialized_cached_row_count)
126  , geo_return_type_(GeoReturnType::WktString)
127  , cached_(false)
128  , query_exec_time_(0)
129  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
130  , can_use_speculative_top_n_sort(std::nullopt) {}
131 
132 ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
133  const ExecutorDeviceType device_type,
134  const int device_id,
135  Data_Namespace::DataMgr* data_mgr)
136  : device_type_(device_type)
137  , device_id_(device_id)
138  , thread_idx_(-1)
139  , query_mem_desc_{}
140  , crt_row_buff_idx_(0)
141  , estimator_(estimator)
142  , data_mgr_(data_mgr)
143  , separate_varlen_storage_valid_(false)
144  , just_explain_(false)
145  , for_validation_only_(false)
146  , cached_row_count_(uninitialized_cached_row_count)
147  , geo_return_type_(GeoReturnType::WktString)
148  , cached_(false)
149  , query_exec_time_(0)
150  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
151  , can_use_speculative_top_n_sort(std::nullopt) {
152  if (device_type == ExecutorDeviceType::GPU) {
153  device_estimator_buffer_ = CudaAllocator::allocGpuAbstractBuffer(
154  data_mgr_, estimator_->getBufferSize(), device_id_);
155  data_mgr->getCudaMgr()->zeroDeviceMem(device_estimator_buffer_->getMemoryPtr(),
156  estimator_->getBufferSize(),
157  device_id_,
 158  getQueryEngineCudaStreamForDevice(device_id_));
159  } else {
160  host_estimator_buffer_ =
161  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
162  }
163 }
164 
165 ResultSet::ResultSet(const std::string& explanation)
166  : device_type_(ExecutorDeviceType::CPU)
167  , device_id_(-1)
168  , thread_idx_(-1)
169  , fetched_so_far_(0)
170  , separate_varlen_storage_valid_(false)
171  , explanation_(explanation)
172  , just_explain_(true)
173  , for_validation_only_(false)
174  , cached_row_count_(uninitialized_cached_row_count)
175  , geo_return_type_(GeoReturnType::WktString)
176  , cached_(false)
177  , query_exec_time_(0)
178  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
179  , can_use_speculative_top_n_sort(std::nullopt) {}
180 
181 ResultSet::ResultSet(int64_t queue_time_ms,
182  int64_t render_time_ms,
183  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
184  : device_type_(ExecutorDeviceType::CPU)
185  , device_id_(-1)
186  , thread_idx_(-1)
187  , fetched_so_far_(0)
188  , row_set_mem_owner_(row_set_mem_owner)
189  , timings_(QueryExecutionTimings{queue_time_ms, render_time_ms, 0, 0})
190  , separate_varlen_storage_valid_(false)
191  , just_explain_(true)
192  , for_validation_only_(false)
193  , cached_row_count_(uninitialized_cached_row_count)
194  , geo_return_type_(GeoReturnType::WktString)
195  , cached_(false)
196  , query_exec_time_(0)
197  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
198  , can_use_speculative_top_n_sort(std::nullopt) {}
199 
 200 ResultSet::~ResultSet() {
201  if (storage_) {
202  if (!storage_->buff_is_provided_) {
203  CHECK(storage_->getUnderlyingBuffer());
204  free(storage_->getUnderlyingBuffer());
205  }
206  }
207  for (auto& storage : appended_storage_) {
208  if (storage && !storage->buff_is_provided_) {
209  free(storage->getUnderlyingBuffer());
210  }
211  }
212  if (host_estimator_buffer_) {
213  CHECK(device_type_ == ExecutorDeviceType::CPU || device_estimator_buffer_);
214  free(host_estimator_buffer_);
215  }
216  if (device_estimator_buffer_) {
217  CHECK(data_mgr_);
218  data_mgr_->free(device_estimator_buffer_);
219  }
220 }
221 
222 std::string ResultSet::summaryToString() const {
223  std::ostringstream oss;
224  oss << "Result Set Info" << std::endl;
225  oss << "\tLayout: " << query_mem_desc_.queryDescTypeToString() << std::endl;
226  oss << "\tColumns: " << colCount() << std::endl;
227  oss << "\tRows: " << rowCount() << std::endl;
228  oss << "\tEntry count: " << entryCount() << std::endl;
229  const std::string is_empty = isEmpty() ? "True" : "False";
230  oss << "\tIs empty: " << is_empty << std::endl;
 231  const std::string did_output_columnar = didOutputColumnar() ? "True" : "False";
232  oss << "\tColumnar: " << did_output_columnar << std::endl;
233  oss << "\tLazy-fetched columns: " << getNumColumnsLazyFetched() << std::endl;
234  const std::string is_direct_columnar_conversion_possible =
235  isDirectColumnarConversionPossible() ? "True" : "False";
236  oss << "\tDirect columnar conversion possible: "
237  << is_direct_columnar_conversion_possible << std::endl;
238 
239  size_t num_columns_zero_copy_columnarizable{0};
240  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
241  if (isZeroCopyColumnarConversionPossible(target_idx)) {
242  num_columns_zero_copy_columnarizable++;
243  }
244  }
245  oss << "\tZero-copy columnar conversion columns: "
246  << num_columns_zero_copy_columnarizable << std::endl;
247 
248  oss << "\tPermutation size: " << permutation_.size() << std::endl;
249  oss << "\tLimit: " << keep_first_ << std::endl;
250  oss << "\tOffset: " << drop_first_ << std::endl;
251  return oss.str();
252 }
253 
 254 ExecutorDeviceType ResultSet::getDeviceType() const {
255  return device_type_;
256 }
257 
 258 const ResultSetStorage* ResultSet::allocateStorage() const {
259  CHECK(!storage_);
260  CHECK(row_set_mem_owner_);
261  auto buff = row_set_mem_owner_->allocate(
262  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
263  storage_.reset(
264  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
265  return storage_.get();
266 }
267 
 268 const ResultSetStorage* ResultSet::allocateStorage(
269  int8_t* buff,
270  const std::vector<int64_t>& target_init_vals,
271  std::shared_ptr<VarlenOutputInfo> varlen_output_info) const {
272  CHECK(buff);
273  CHECK(!storage_);
274  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, true));
275  // TODO: add both to the constructor
276  storage_->target_init_vals_ = target_init_vals;
277  if (varlen_output_info) {
278  storage_->varlen_output_info_ = varlen_output_info;
279  }
280  return storage_.get();
281 }
282 
 283 const ResultSetStorage* ResultSet::allocateStorage(
284  const std::vector<int64_t>& target_init_vals) const {
285  CHECK(!storage_);
286  CHECK(row_set_mem_owner_);
287  auto buff = row_set_mem_owner_->allocate(
288  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
289  storage_.reset(
290  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
291  storage_->target_init_vals_ = target_init_vals;
292  return storage_.get();
293 }
294 
 295 size_t ResultSet::getCurrentRowBufferIndex() const {
296  if (crt_row_buff_idx_ == 0) {
297  throw std::runtime_error("current row buffer iteration index is undefined");
298  }
299  return crt_row_buff_idx_ - 1;
300 }
301 
302 // Note: that.appended_storage_ does not get appended to this.
303 void ResultSet::append(ResultSet& that) {
304  invalidateCachedRowCount();
305  if (!that.storage_) {
306  return;
307  }
308  appended_storage_.push_back(std::move(that.storage_));
309  query_mem_desc_.setEntryCount(
310  query_mem_desc_.getEntryCount() +
311  appended_storage_.back()->query_mem_desc_.getEntryCount());
312  chunks_.insert(chunks_.end(), that.chunks_.begin(), that.chunks_.end());
313  col_buffers_.insert(
314  col_buffers_.end(), that.col_buffers_.begin(), that.col_buffers_.end());
315  frag_offsets_.insert(
316  frag_offsets_.end(), that.frag_offsets_.begin(), that.frag_offsets_.end());
317  consistent_frag_sizes_.insert(consistent_frag_sizes_.end(),
318  that.consistent_frag_sizes_.begin(),
319  that.consistent_frag_sizes_.end());
320  chunk_iters_.insert(
321  chunk_iters_.end(), that.chunk_iters_.begin(), that.chunk_iters_.end());
322  if (separate_varlen_storage_valid_) {
323  CHECK(that.separate_varlen_storage_valid_);
324  serialized_varlen_buffer_.insert(serialized_varlen_buffer_.end(),
325  that.serialized_varlen_buffer_.begin(),
326  that.serialized_varlen_buffer_.end());
327  }
328  for (auto& buff : that.literal_buffers_) {
329  literal_buffers_.push_back(std::move(buff));
330  }
331 }
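// Illustrative bookkeeping example (not part of the original file): appending a
// ResultSet whose storage holds 1,000 entries to one holding 4,000 entries leaves
// this->query_mem_desc_ reporting 5,000 entries, while the moved-in storage keeps
// its own 1,000-entry descriptor and that.storage_ is left empty.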
332 
 333 ResultSetPtr ResultSet::copy() {
334  auto timer = DEBUG_TIMER(__func__);
335  if (!storage_) {
336  return nullptr;
337  }
338 
339  auto executor = getExecutor();
340  CHECK(executor);
341  ResultSetPtr copied_rs = std::make_shared<ResultSet>(targets_,
342  device_type_,
343  query_mem_desc_,
344  row_set_mem_owner_,
345  executor->blockSize(),
346  executor->gridSize());
347 
348  auto allocate_and_copy_storage =
349  [&](const ResultSetStorage* prev_storage) -> std::unique_ptr<ResultSetStorage> {
350  const auto& prev_qmd = prev_storage->query_mem_desc_;
351  const auto storage_size = prev_qmd.getBufferSizeBytes(device_type_);
352  auto buff = row_set_mem_owner_->allocate(storage_size, /*thread_idx=*/0);
353  std::unique_ptr<ResultSetStorage> new_storage;
354  new_storage.reset(new ResultSetStorage(
355  prev_storage->targets_, prev_qmd, buff, /*buff_is_provided=*/true));
356  new_storage->target_init_vals_ = prev_storage->target_init_vals_;
357  if (prev_storage->varlen_output_info_) {
358  new_storage->varlen_output_info_ = prev_storage->varlen_output_info_;
359  }
360  memcpy(new_storage->buff_, prev_storage->buff_, storage_size);
361  new_storage->query_mem_desc_ = prev_qmd;
362  return new_storage;
363  };
364 
365  copied_rs->storage_ = allocate_and_copy_storage(storage_.get());
366  if (!appended_storage_.empty()) {
367  for (const auto& storage : appended_storage_) {
368  copied_rs->appended_storage_.push_back(allocate_and_copy_storage(storage.get()));
369  }
370  }
371  std::copy(chunks_.begin(), chunks_.end(), std::back_inserter(copied_rs->chunks_));
372  std::copy(chunk_iters_.begin(),
373  chunk_iters_.end(),
374  std::back_inserter(copied_rs->chunk_iters_));
375  std::copy(col_buffers_.begin(),
376  col_buffers_.end(),
377  std::back_inserter(copied_rs->col_buffers_));
378  std::copy(frag_offsets_.begin(),
379  frag_offsets_.end(),
380  std::back_inserter(copied_rs->frag_offsets_));
381  std::copy(consistent_frag_sizes_.begin(),
382  consistent_frag_sizes_.end(),
383  std::back_inserter(copied_rs->consistent_frag_sizes_));
384  if (separate_varlen_storage_valid_) {
385  std::copy(serialized_varlen_buffer_.begin(),
386  serialized_varlen_buffer_.end(),
387  std::back_inserter(copied_rs->serialized_varlen_buffer_));
388  }
389  std::copy(literal_buffers_.begin(),
390  literal_buffers_.end(),
391  std::back_inserter(copied_rs->literal_buffers_));
392  std::copy(lazy_fetch_info_.begin(),
393  lazy_fetch_info_.end(),
394  std::back_inserter(copied_rs->lazy_fetch_info_));
395 
396  copied_rs->permutation_ = permutation_;
397  copied_rs->drop_first_ = drop_first_;
398  copied_rs->keep_first_ = keep_first_;
399  copied_rs->separate_varlen_storage_valid_ = separate_varlen_storage_valid_;
400  copied_rs->query_exec_time_ = query_exec_time_;
401  copied_rs->input_table_keys_ = input_table_keys_;
402  copied_rs->target_meta_info_ = target_meta_info_;
403  copied_rs->geo_return_type_ = geo_return_type_;
404  copied_rs->query_plan_ = query_plan_;
405  if (can_use_speculative_top_n_sort) {
406  copied_rs->can_use_speculative_top_n_sort = can_use_speculative_top_n_sort;
407  }
408 
409  return copied_rs;
410 }
411 
 412 const ResultSetStorage* ResultSet::getStorage() const {
413  return storage_.get();
414 }
415 
416 size_t ResultSet::colCount() const {
417  return just_explain_ ? 1 : targets_.size();
418 }
419 
420 SQLTypeInfo ResultSet::getColType(const size_t col_idx) const {
421  if (just_explain_) {
422  return SQLTypeInfo(kTEXT, false);
423  }
424  CHECK_LT(col_idx, targets_.size());
425  return targets_[col_idx].agg_kind == kAVG ? SQLTypeInfo(kDOUBLE, false)
426  : targets_[col_idx].sql_type;
427 }
428 
 429 StringDictionaryProxy* ResultSet::getStringDictionaryProxy(
430  const shared::StringDictKey& dict_key) const {
431  constexpr bool with_generation = true;
432  return (dict_key.db_id > 0 || dict_key.dict_id == DictRef::literalsDictId)
433  ? row_set_mem_owner_->getOrAddStringDictProxy(dict_key, with_generation)
434  : row_set_mem_owner_->getStringDictProxy(dict_key);
435 }
436 
 437 class ResultSet::CellCallback {
 438  StringDictionaryProxy::IdMap id_map_;
439  int64_t const null_int_;
440 
441  public:
442  CellCallback(StringDictionaryProxy::IdMap&& id_map, int64_t const null_int)
443  : id_map_(std::move(id_map)), null_int_(null_int) {}
444  void operator()(int8_t const* const cell_ptr) const {
445  using StringId = int32_t;
446  StringId* const string_id_ptr =
447  const_cast<StringId*>(reinterpret_cast<StringId const*>(cell_ptr));
448  if (*string_id_ptr != null_int_) {
449  *string_id_ptr = id_map_[*string_id_ptr];
450  }
451  }
452 };
453 
454 // Update any dictionary-encoded targets within storage_ with the corresponding
455 // dictionary in the given targets parameter, if their comp_param (dictionary) differs.
456 // This may modify both the storage_ values and storage_ targets.
457 // Does not iterate through appended_storage_.
 458 // Iterates over targets starting at index start_idx; see the illustrative sketch after this function.
459 void ResultSet::translateDictEncodedColumns(std::vector<TargetInfo> const& targets,
460  size_t const start_idx) {
461  if (storage_) {
462  CHECK_EQ(targets.size(), storage_->targets_.size());
463  RowIterationState state;
464  for (size_t target_idx = start_idx; target_idx < targets.size(); ++target_idx) {
465  auto const& type_lhs = targets[target_idx].sql_type;
466  if (type_lhs.is_dict_encoded_string()) {
467  auto& type_rhs =
468  const_cast<SQLTypeInfo&>(storage_->targets_[target_idx].sql_type);
469  CHECK(type_rhs.is_dict_encoded_string());
470  if (type_lhs.getStringDictKey() != type_rhs.getStringDictKey()) {
471  auto* const sdp_lhs = getStringDictionaryProxy(type_lhs.getStringDictKey());
472  CHECK(sdp_lhs);
473  auto const* const sdp_rhs =
474  getStringDictionaryProxy(type_rhs.getStringDictKey());
475  CHECK(sdp_rhs);
476  state.cur_target_idx_ = target_idx;
477  CellCallback const translate_string_ids(sdp_lhs->transientUnion(*sdp_rhs),
478  inline_int_null_val(type_rhs));
479  eachCellInColumn(state, translate_string_ids);
480  type_rhs.set_comp_param(type_lhs.get_comp_param());
481  type_rhs.setStringDictKey(type_lhs.getStringDictKey());
482  }
483  }
484  }
485  }
486 }
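// A minimal standalone sketch of the per-cell translation that CellCallback performs
// above: 32-bit string ids encoded against the old ("rhs") dictionary are rewritten
// through an id map into the new ("lhs") dictionary, and the null sentinel is left
// untouched. The plain std::vector stands in for StringDictionaryProxy::IdMap and the
// function name is hypothetical.
#include <cstdint>
#include <vector>

inline void translate_string_ids_sketch(std::vector<int32_t>& column,
                                        const std::vector<int32_t>& id_map,
                                        const int32_t null_sentinel) {
  for (auto& string_id : column) {
    if (string_id != null_sentinel) {
      string_id = id_map[string_id];  // same remap as CellCallback::operator()
    }
  }
}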
487 
488 // For each cell in column target_idx, callback func with pointer to datum.
489 // This currently assumes the column type is a dictionary-encoded string, but this logic
490 // can be generalized to other types.
491 void ResultSet::eachCellInColumn(RowIterationState& state, CellCallback const& func) {
492  size_t const target_idx = state.cur_target_idx_;
493  QueryMemoryDescriptor& storage_qmd = storage_->query_mem_desc_;
494  CHECK_LT(target_idx, lazy_fetch_info_.size());
495  auto& col_lazy_fetch = lazy_fetch_info_[target_idx];
496  CHECK(col_lazy_fetch.is_lazily_fetched);
497  int const target_size = storage_->targets_[target_idx].sql_type.get_size();
498  CHECK_LT(0, target_size) << storage_->targets_[target_idx].toString();
499  size_t const nrows = storage_->binSearchRowCount();
500  if (storage_qmd.didOutputColumnar()) {
501  // Logic based on ResultSet::ColumnWiseTargetAccessor::initializeOffsetsForStorage()
502  if (state.buf_ptr_ == nullptr) {
503  state.buf_ptr_ = get_cols_ptr(storage_->buff_, storage_qmd);
504  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
505  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
506  : query_mem_desc_.getEffectiveKeyWidth();
507  }
508  for (size_t j = state.prev_target_idx_; j < state.cur_target_idx_; ++j) {
509  size_t const next_target_idx = j + 1; // Set state to reflect next target_idx j+1
 510  state.buf_ptr_ = advance_to_next_columnar_target_buff(
511  state.buf_ptr_, storage_qmd, state.agg_idx_);
512  auto const& next_agg_info = storage_->targets_[next_target_idx];
513  state.agg_idx_ =
514  advance_slot(state.agg_idx_, next_agg_info, separate_varlen_storage_valid_);
515  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
516  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
517  : query_mem_desc_.getEffectiveKeyWidth();
518  }
519  for (size_t i = 0; i < nrows; ++i) {
520  int8_t const* const pos_ptr = state.buf_ptr_ + i * state.compact_sz1_;
521  int64_t pos = read_int_from_buff(pos_ptr, target_size);
522  CHECK_GE(pos, 0);
523  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
524  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
525  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
526  func(col_frag + pos * target_size);
527  }
528  } else {
529  size_t const key_bytes_with_padding =
 530  align_to_int64(get_key_bytes_rowwise(storage_qmd));
531  for (size_t i = 0; i < nrows; ++i) {
532  int8_t const* const keys_ptr = row_ptr_rowwise(storage_->buff_, storage_qmd, i);
533  int8_t const* const rowwise_target_ptr = keys_ptr + key_bytes_with_padding;
534  int64_t pos = *reinterpret_cast<int64_t const*>(rowwise_target_ptr);
535  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
536  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
537  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
538  func(col_frag + pos * target_size);
539  }
540  }
541 }
542 
543 namespace {
544 
545 size_t get_truncated_row_count(size_t total_row_count, size_t limit, size_t offset) {
546  if (total_row_count < offset) {
547  return 0;
548  }
549 
550  size_t total_truncated_row_count = total_row_count - offset;
551 
552  if (limit) {
553  return std::min(total_truncated_row_count, limit);
554  }
555 
556  return total_truncated_row_count;
557 }
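// Worked examples of the rule above (illustrative only):
//   get_truncated_row_count(100, /*limit=*/50, /*offset=*/30) == 50  // min(100 - 30, 50)
//   get_truncated_row_count(100, /*limit=*/0,  /*offset=*/30) == 70  // limit 0 means "no limit"
//   get_truncated_row_count( 20, /*limit=*/50, /*offset=*/30) ==  0  // offset past the end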
558 
559 } // namespace
560 
561 size_t ResultSet::rowCountImpl(const bool force_parallel) const {
562  if (just_explain_) {
563  return 1;
564  }
565  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::TableFunction) {
566  return entryCount();
567  }
568  if (!permutation_.empty()) {
569  // keep_first_ corresponds to SQL LIMIT
570  // drop_first_ corresponds to SQL OFFSET
571  return get_truncated_row_count(permutation_.size(), keep_first_, drop_first_);
572  }
573  if (!storage_) {
574  return 0;
575  }
576  CHECK(permutation_.empty());
577  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
578  return binSearchRowCount();
579  }
580 
581  constexpr size_t auto_parallel_row_count_threshold{20000UL};
582  if (force_parallel || entryCount() >= auto_parallel_row_count_threshold) {
583  return parallelRowCount();
584  }
585  std::lock_guard<std::mutex> lock(row_iteration_mutex_);
586  moveToBegin();
587  size_t row_count{0};
588  while (true) {
589  auto crt_row = getNextRowUnlocked(false, false);
590  if (crt_row.empty()) {
591  break;
592  }
593  ++row_count;
594  }
595  moveToBegin();
596  return row_count;
597 }
598 
599 size_t ResultSet::rowCount(const bool force_parallel) const {
600  // cached_row_count_ is atomic, so fetch it into a local variable first
601  // to avoid repeat fetches
602  const int64_t cached_row_count = cached_row_count_;
603  if (cached_row_count != uninitialized_cached_row_count) {
604  CHECK_GE(cached_row_count, 0);
605  return cached_row_count;
606  }
607  setCachedRowCount(rowCountImpl(force_parallel));
608  return cached_row_count_;
609 }
610 
 611 void ResultSet::invalidateCachedRowCount() const {
612  cached_row_count_ = uninitialized_cached_row_count;
613 }
614 
615 void ResultSet::setCachedRowCount(const size_t row_count) const {
616  const int64_t signed_row_count = static_cast<int64_t>(row_count);
617  const int64_t old_cached_row_count = cached_row_count_.exchange(signed_row_count);
618  CHECK(old_cached_row_count == uninitialized_cached_row_count ||
619  old_cached_row_count == signed_row_count);
620 }
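// A standalone sketch (hypothetical names) of the cache-once pattern used by
// rowCount() and setCachedRowCount(): read the atomic a single time, compute only on a
// miss, and require that concurrent writers publish the same value.
#include <atomic>
#include <cassert>
#include <cstdint>

struct CachedRowCountSketch {
  static constexpr int64_t kUninitialized{-1};
  mutable std::atomic<int64_t> cached{kUninitialized};

  template <typename ComputeFn>
  int64_t get(ComputeFn&& compute) const {
    const int64_t local = cached;  // single fetch, as in ResultSet::rowCount()
    if (local != kUninitialized) {
      return local;
    }
    const int64_t fresh = static_cast<int64_t>(compute());
    [[maybe_unused]] const int64_t old = cached.exchange(fresh);
    assert(old == kUninitialized || old == fresh);  // mirrors the CHECK above
    return fresh;
  }
};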
621 
 622 size_t ResultSet::binSearchRowCount() const {
623  if (!storage_) {
624  return 0;
625  }
626 
627  size_t row_count = storage_->binSearchRowCount();
628  for (auto& s : appended_storage_) {
629  row_count += s->binSearchRowCount();
630  }
631 
632  return get_truncated_row_count(row_count, getLimit(), drop_first_);
633 }
634 
 635 size_t ResultSet::parallelRowCount() const {
636  using namespace threading;
637  auto execute_parallel_row_count =
638  [this, parent_thread_local_ids = logger::thread_local_ids()](
639  const blocked_range<size_t>& r, size_t row_count) {
640  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
641  for (size_t i = r.begin(); i < r.end(); ++i) {
642  if (!isRowAtEmpty(i)) {
643  ++row_count;
644  }
645  }
646  return row_count;
647  };
648  const auto row_count = parallel_reduce(blocked_range<size_t>(0, entryCount()),
649  size_t(0),
650  execute_parallel_row_count,
651  std::plus<int>());
652  return get_truncated_row_count(row_count, getLimit(), drop_first_);
653 }
654 
655 bool ResultSet::isEmpty() const {
656  // To simplify this function and de-dup logic with ResultSet::rowCount()
657  // (mismatches between the two were causing bugs), we modified this function
658  // to simply fetch rowCount(). The potential downside of this approach is that
659  // in some cases more work will need to be done, as we can't just stop at the first row.
660  // Mitigating that for most cases is the following:
661  // 1) rowCount() is cached, so the logic for actually computing row counts will run only
662  // once
663  // per result set.
664  // 2) If the cache is empty (cached_row_count_ == -1), rowCount() will use parallel
665  // methods if deemed appropriate, which in many cases could be faster for a sparse
 666  // large result set than single-threaded iteration from the beginning
667  // 3) Often where isEmpty() is needed, rowCount() is also needed. Since the first call
668  // to rowCount()
669  // will be cached, there is no extra overhead in these cases
670 
671  return rowCount() == size_t(0);
672 }
673 
 674 bool ResultSet::definitelyHasNoRows() const {
675  return (!storage_ && !estimator_ && !just_explain_) || cached_row_count_ == 0;
676 }
677 
 678 const QueryMemoryDescriptor& ResultSet::getQueryMemDesc() const {
679  CHECK(storage_);
680  return storage_->query_mem_desc_;
681 }
682 
683 const std::vector<TargetInfo>& ResultSet::getTargetInfos() const {
684  return targets_;
685 }
686 
687 const std::vector<int64_t>& ResultSet::getTargetInitVals() const {
688  CHECK(storage_);
689  return storage_->target_init_vals_;
690 }
691 
 692 int8_t* ResultSet::getDeviceEstimatorBuffer() const {
693  CHECK(device_type_ == ExecutorDeviceType::GPU);
694  CHECK(device_estimator_buffer_);
695  return device_estimator_buffer_->getMemoryPtr();
696 }
697 
 698 int8_t* ResultSet::getHostEstimatorBuffer() const {
699  return host_estimator_buffer_;
700 }
701 
 702 void ResultSet::syncEstimatorBuffer() const {
703  CHECK(device_type_ == ExecutorDeviceType::GPU);
704  CHECK(!host_estimator_buffer_);
705  CHECK_EQ(size_t(0), estimator_->getBufferSize() % sizeof(int64_t));
706  host_estimator_buffer_ =
707  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
708  CHECK(device_estimator_buffer_);
709  auto device_buffer_ptr = device_estimator_buffer_->getMemoryPtr();
710  auto allocator = std::make_unique<CudaAllocator>(
711  data_mgr_, device_id_, getQueryEngineCudaStreamForDevice(device_id_));
712  allocator->copyFromDevice(
713  host_estimator_buffer_, device_buffer_ptr, estimator_->getBufferSize());
714 }
715 
716 void ResultSet::setQueueTime(const int64_t queue_time) {
717  timings_.executor_queue_time = queue_time;
718 }
719 
720 void ResultSet::setKernelQueueTime(const int64_t kernel_queue_time) {
721  timings_.kernel_queue_time = kernel_queue_time;
722 }
723 
724 void ResultSet::addCompilationQueueTime(const int64_t compilation_queue_time) {
725  timings_.compilation_queue_time += compilation_queue_time;
726 }
727 
728 int64_t ResultSet::getQueueTime() const {
729  return timings_.executor_queue_time + timings_.kernel_queue_time +
730  timings_.compilation_queue_time;
731 }
732 
733 int64_t ResultSet::getRenderTime() const {
734  return timings_.render_time;
735 }
736 
 737 void ResultSet::moveToBegin() const {
738  crt_row_buff_idx_ = 0;
739  fetched_so_far_ = 0;
740 }
741 
 742 bool ResultSet::isTruncated() const {
743  return keep_first_ + drop_first_;
744 }
745 
746 bool ResultSet::isExplain() const {
747  return just_explain_;
748 }
749 
 750 void ResultSet::setValidationOnlyRes() {
751  for_validation_only_ = true;
752 }
753 
 754 bool ResultSet::isValidationOnlyRes() const {
755  return for_validation_only_;
756 }
757 
 758 int ResultSet::getDeviceId() const {
759  return device_id_;
760 }
761 
 762 int ResultSet::getThreadIdx() const {
763  return thread_idx_;
764 }
765 
 766 QueryMemoryDescriptor ResultSet::fixupQueryMemoryDescriptor(
 767  const QueryMemoryDescriptor& query_mem_desc) {
768  auto query_mem_desc_copy = query_mem_desc;
769  query_mem_desc_copy.resetGroupColWidths(
770  std::vector<int8_t>(query_mem_desc_copy.getGroupbyColCount(), 8));
771  if (query_mem_desc.didOutputColumnar()) {
772  return query_mem_desc_copy;
773  }
774  query_mem_desc_copy.alignPaddedSlots();
775  return query_mem_desc_copy;
776 }
777 
778 void ResultSet::sort(const std::list<Analyzer::OrderEntry>& order_entries,
779  size_t top_n,
780  ExecutorDeviceType device_type,
781  const Executor* executor) {
782  auto timer = DEBUG_TIMER(__func__);
783 
784  if (!storage_) {
785  return;
786  }
787  invalidateCachedRowCount();
788  CHECK(!targets_.empty());
789 #ifdef HAVE_CUDA
790  if (canUseFastBaselineSort(order_entries, top_n)) {
791  baselineSort(order_entries, top_n, device_type, executor);
792  return;
793  }
794 #endif // HAVE_CUDA
795  if (query_mem_desc_.sortOnGpu()) {
796  try {
797  radixSortOnGpu(order_entries);
798  } catch (const OutOfMemory&) {
799  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
800  radixSortOnCpu(order_entries);
801  } catch (const std::bad_alloc&) {
802  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
803  radixSortOnCpu(order_entries);
804  }
805  return;
806  }
807  // This check isn't strictly required, but allows the index buffer to be 32-bit.
808  if (query_mem_desc_.getEntryCount() > std::numeric_limits<uint32_t>::max()) {
809  throw RowSortException("Sorting more than 4B elements not supported");
810  }
811 
812  CHECK(permutation_.empty());
813 
814  if (top_n && g_parallel_top_min < entryCount()) {
815  if (g_enable_watchdog && g_parallel_top_max < entryCount()) {
816  throw WatchdogException("Sorting the result would be too slow");
817  }
818  parallelTop(order_entries, top_n, executor);
819  } else {
820  if (g_enable_watchdog && Executor::baseline_threshold < entryCount()) {
821  throw WatchdogException("Sorting the result would be too slow");
822  }
823  permutation_.resize(query_mem_desc_.getEntryCount());
824  // PermutationView is used to share common API with parallelTop().
825  PermutationView pv(permutation_.data(), 0, permutation_.size());
826  pv = initPermutationBuffer(pv, 0, permutation_.size());
827  if (top_n == 0) {
828  top_n = pv.size(); // top_n == 0 implies a full sort
829  }
830  pv = topPermutation(pv, top_n, createComparator(order_entries, pv, executor, false));
831  if (pv.size() < permutation_.size()) {
832  permutation_.resize(pv.size());
833  permutation_.shrink_to_fit();
834  }
835  }
836 }
837 
838 #ifdef HAVE_CUDA
839 void ResultSet::baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
840  const size_t top_n,
841  const ExecutorDeviceType device_type,
842  const Executor* executor) {
843  auto timer = DEBUG_TIMER(__func__);
 844  // If we only have one GPU, it's usually faster to do multi-threaded radix sort on CPU
845  if (device_type == ExecutorDeviceType::GPU && getGpuCount() > 1) {
846  try {
847  doBaselineSort(ExecutorDeviceType::GPU, order_entries, top_n, executor);
848  } catch (...) {
849  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
850  }
851  } else {
852  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
853  }
854 }
855 #endif // HAVE_CUDA
856 
857 // Append non-empty indexes i in [begin,end) from findStorage(i) to permutation.
 858 PermutationView ResultSet::initPermutationBuffer(PermutationView permutation,
859  PermutationIdx const begin,
860  PermutationIdx const end) const {
861  auto timer = DEBUG_TIMER(__func__);
862  for (PermutationIdx i = begin; i < end; ++i) {
863  const auto storage_lookup_result = findStorage(i);
864  const auto lhs_storage = storage_lookup_result.storage_ptr;
865  const auto off = storage_lookup_result.fixedup_entry_idx;
866  CHECK(lhs_storage);
867  if (!lhs_storage->isEmptyEntry(off)) {
868  permutation.push_back(i);
869  }
870  }
871  return permutation;
872 }
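// Illustrative example of the filtering above: for entries [0, 8) where entries 2 and 5
// are empty, the permutation buffer comes back as {0, 1, 3, 4, 6, 7}; empty entries are
// skipped here and never reach the sort.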
873 
 874 const Permutation& ResultSet::getPermutationBuffer() const {
875  return permutation_;
876 }
877 
878 void ResultSet::parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
879  const size_t top_n,
880  const Executor* executor) {
881  auto timer = DEBUG_TIMER(__func__);
882  const size_t nthreads = cpu_threads();
883 
884  // Split permutation_ into nthreads subranges and top-sort in-place.
885  permutation_.resize(query_mem_desc_.getEntryCount());
886  std::vector<PermutationView> permutation_views(nthreads);
887  threading::task_group top_sort_threads;
888  for (auto interval : makeIntervals<PermutationIdx>(0, permutation_.size(), nthreads)) {
889  top_sort_threads.run([this,
890  &order_entries,
891  &permutation_views,
892  top_n,
893  executor,
894  parent_thread_local_ids = logger::thread_local_ids(),
895  interval] {
896  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
897  PermutationView pv(permutation_.data() + interval.begin, 0, interval.size());
898  pv = initPermutationBuffer(pv, interval.begin, interval.end);
899  const auto compare = createComparator(order_entries, pv, executor, true);
900  permutation_views[interval.index] = topPermutation(pv, top_n, compare);
901  });
902  }
903  top_sort_threads.wait();
904 
905  // In case you are considering implementing a parallel reduction, note that the
906  // ResultSetComparator constructor is O(N) in order to materialize some of the aggregate
907  // columns as necessary to perform a comparison. This cost is why reduction is chosen to
908  // be serial instead; only one more Comparator is needed below.
909 
910  // Left-copy disjoint top-sorted subranges into one contiguous range.
911  // ++++....+++.....+++++... -> ++++++++++++............
912  auto end = permutation_.begin() + permutation_views.front().size();
913  for (size_t i = 1; i < nthreads; ++i) {
914  std::copy(permutation_views[i].begin(), permutation_views[i].end(), end);
915  end += permutation_views[i].size();
916  }
917 
918  // Top sort final range.
919  PermutationView pv(permutation_.data(), end - permutation_.begin());
920  const auto compare = createComparator(order_entries, pv, executor, false);
921  pv = topPermutation(pv, top_n, compare);
922  permutation_.resize(pv.size());
923  permutation_.shrink_to_fit();
924 }
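// A single-threaded, self-contained sketch of the strategy used by parallelTop() above:
// top-sort each subrange in place, pack the surviving prefixes into one contiguous
// range, then top-sort that range once more. std::partial_sort stands in for
// topPermutation(), plain ints stand in for permutation indices, and all names are
// hypothetical; the real code runs the first phase on a thread pool.
#include <algorithm>
#include <cstddef>
#include <vector>

inline std::vector<int> top_n_sketch(std::vector<int> values, const size_t n) {
  const size_t nchunks = 4;  // stands in for cpu_threads()
  const size_t chunk_size = (values.size() + nchunks - 1) / nchunks;
  std::vector<int> packed;
  for (size_t begin = 0; begin < values.size(); begin += chunk_size) {
    const size_t end = std::min(begin + chunk_size, values.size());
    const size_t keep = std::min(n, end - begin);
    // Phase 1: top-sort this subrange in place (parallel in the real code).
    std::partial_sort(values.begin() + begin,
                      values.begin() + begin + keep,
                      values.begin() + end);
    // Left-copy the survivors into a contiguous prefix.
    packed.insert(packed.end(), values.begin() + begin, values.begin() + begin + keep);
  }
  // Phase 2: top-sort the packed survivors and shrink to the final size.
  const size_t keep = std::min(n, packed.size());
  std::partial_sort(packed.begin(), packed.begin() + keep, packed.end());
  packed.resize(keep);
  return packed;
}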
925 
926 std::pair<size_t, size_t> ResultSet::getStorageIndex(const size_t entry_idx) const {
927  size_t fixedup_entry_idx = entry_idx;
928  auto entry_count = storage_->query_mem_desc_.getEntryCount();
929  const bool is_rowwise_layout = !storage_->query_mem_desc_.didOutputColumnar();
930  if (fixedup_entry_idx < entry_count) {
931  return {0, fixedup_entry_idx};
932  }
933  fixedup_entry_idx -= entry_count;
934  for (size_t i = 0; i < appended_storage_.size(); ++i) {
935  const auto& desc = appended_storage_[i]->query_mem_desc_;
936  CHECK_NE(is_rowwise_layout, desc.didOutputColumnar());
937  entry_count = desc.getEntryCount();
938  if (fixedup_entry_idx < entry_count) {
939  return {i + 1, fixedup_entry_idx};
940  }
941  fixedup_entry_idx -= entry_count;
942  }
943  UNREACHABLE() << "entry_idx = " << entry_idx << ", query_mem_desc_.getEntryCount() = "
944  << query_mem_desc_.getEntryCount();
945  return {};
946 }
947 
950 
 951 ResultSet::StorageLookupResult ResultSet::findStorage(const size_t entry_idx) const {
952  auto [stg_idx, fixedup_entry_idx] = getStorageIndex(entry_idx);
953  return {stg_idx ? appended_storage_[stg_idx - 1].get() : storage_.get(),
954  fixedup_entry_idx,
955  stg_idx};
956 }
957 
958 template <typename BUFFER_ITERATOR_TYPE>
 959 void ResultSet::ResultSetComparator<
960  BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumns() {
961  for (const auto& order_entry : order_entries_) {
962  if (is_distinct_target(result_set_->targets_[order_entry.tle_no - 1])) {
963  count_distinct_materialized_buffers_.emplace_back(
964  materializeCountDistinctColumn(order_entry));
965  }
966  }
967 }
968 
969 namespace {
970 struct IsAggKind {
971  std::vector<TargetInfo> const& targets_;
 972  SQLAgg const agg_kind_;
973  IsAggKind(std::vector<TargetInfo> const& targets, SQLAgg const agg_kind)
974  : targets_(targets), agg_kind_(agg_kind) {}
975  bool operator()(Analyzer::OrderEntry const& order_entry) const {
976  return targets_[order_entry.tle_no - 1].agg_kind == agg_kind_;
977  }
978 };
979 } // namespace
980 
981 template <typename BUFFER_ITERATOR_TYPE>
 982 ResultSet::ApproxQuantileBuffers ResultSet::ResultSetComparator<
983  BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumns() const {
984  ResultSet::ApproxQuantileBuffers approx_quantile_materialized_buffers;
985  for (const auto& order_entry : order_entries_) {
986  if (result_set_->targets_[order_entry.tle_no - 1].agg_kind == kAPPROX_QUANTILE) {
987  approx_quantile_materialized_buffers.emplace_back(
988  materializeApproxQuantileColumn(order_entry));
989  }
990  }
991  return approx_quantile_materialized_buffers;
992 }
993 
994 template <typename BUFFER_ITERATOR_TYPE>
 995 ResultSet::ModeBuffers ResultSet::ResultSetComparator<
 996  BUFFER_ITERATOR_TYPE>::materializeModeColumns() const {
997  ResultSet::ModeBuffers mode_buffers;
998  IsAggKind const is_mode(result_set_->targets_, kMODE);
999  mode_buffers.reserve(
1000  std::count_if(order_entries_.begin(), order_entries_.end(), is_mode));
1001  for (auto const& order_entry : order_entries_) {
1002  if (is_mode(order_entry)) {
1003  mode_buffers.emplace_back(materializeModeColumn(order_entry));
1004  }
1005  }
1006  return mode_buffers;
1007 }
1008 
1009 template <typename BUFFER_ITERATOR_TYPE>
1010 std::vector<int64_t>
 1011 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumn(
1012  const Analyzer::OrderEntry& order_entry) const {
1013  const size_t num_storage_entries = result_set_->query_mem_desc_.getEntryCount();
1014  std::vector<int64_t> count_distinct_materialized_buffer(num_storage_entries);
1015  const CountDistinctDescriptor count_distinct_descriptor =
1016  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no - 1);
1017  const size_t num_non_empty_entries = permutation_.size();
1018 
1019  const auto work = [&, parent_thread_local_ids = logger::thread_local_ids()](
1020  const size_t start, const size_t end) {
1021  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1022  for (size_t i = start; i < end; ++i) {
1023  const PermutationIdx permuted_idx = permutation_[i];
1024  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1025  const auto storage = storage_lookup_result.storage_ptr;
1026  const auto off = storage_lookup_result.fixedup_entry_idx;
1027  const auto value = buffer_itr_.getColumnInternal(
1028  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1029  count_distinct_materialized_buffer[permuted_idx] =
1030  count_distinct_set_size(value.i1, count_distinct_descriptor);
1031  }
1032  };
1033  // TODO(tlm): Allow use of tbb after we determine how to easily encapsulate the choice
1034  // between thread pool types
1035  if (single_threaded_) {
1036  work(0, num_non_empty_entries);
1037  } else {
1038  threading::task_group thread_pool;
1039  for (auto interval : makeIntervals<size_t>(0, num_non_empty_entries, cpu_threads())) {
1040  thread_pool.run([=] { work(interval.begin, interval.end); });
1041  }
1042  thread_pool.wait();
1043  }
1044  return count_distinct_materialized_buffer;
1045 }
1046 
 1047 double ResultSet::calculateQuantile(quantile::TDigest* const t_digest) {
1048  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1049  CHECK(t_digest);
1050  t_digest->mergeBufferFinal();
1051  double const quantile = t_digest->quantile();
1052  return boost::math::isnan(quantile) ? NULL_DOUBLE : quantile;
1053 }
1054 
1055 template <typename BUFFER_ITERATOR_TYPE>
1056 ResultSet::ApproxQuantileBuffers::value_type
 1057 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumn(
1058  const Analyzer::OrderEntry& order_entry) const {
1059  ResultSet::ApproxQuantileBuffers::value_type materialized_buffer(
1060  result_set_->query_mem_desc_.getEntryCount());
1061  const size_t size = permutation_.size();
1062  const auto work = [&, parent_thread_local_ids = logger::thread_local_ids()](
1063  const size_t start, const size_t end) {
1064  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1065  for (size_t i = start; i < end; ++i) {
1066  const PermutationIdx permuted_idx = permutation_[i];
1067  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1068  const auto storage = storage_lookup_result.storage_ptr;
1069  const auto off = storage_lookup_result.fixedup_entry_idx;
1070  const auto value = buffer_itr_.getColumnInternal(
1071  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1072  materialized_buffer[permuted_idx] =
1073  value.i1 ? calculateQuantile(reinterpret_cast<quantile::TDigest*>(value.i1))
1074  : NULL_DOUBLE;
1075  }
1076  };
1077  if (single_threaded_) {
1078  work(0, size);
1079  } else {
1080  threading::task_group thread_pool;
1081  for (auto interval : makeIntervals<size_t>(0, size, cpu_threads())) {
1082  thread_pool.run([=] { work(interval.begin, interval.end); });
1083  }
1084  thread_pool.wait();
1085  }
1086  return materialized_buffer;
1087 }
1088 
1089 namespace {
1090 // i1 is from InternalTargetValue
1091 int64_t materializeMode(int64_t const i1) {
1092  if (auto const* const agg_mode = reinterpret_cast<AggMode const*>(i1)) {
1093  if (std::optional<int64_t> const mode = agg_mode->mode()) {
1094  return *mode;
1095  }
1096  }
1097  return NULL_BIGINT;
1098 }
1099 
1100 using ModeBlockedRange = tbb::blocked_range<size_t>;
1101 } // namespace
1102 
1103 template <typename BUFFER_ITERATOR_TYPE>
1104 struct ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::ModeScatter {
 1105  logger::ThreadLocalIds const parent_thread_local_ids_;
 1106  ResultSetComparator const* const rsc_;
 1107  Analyzer::OrderEntry const& order_entry_;
1108  ResultSet::ModeBuffers::value_type& materialized_buffer_;
1109 
1110  void operator()(ModeBlockedRange const& r) const {
1111  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids_.setNewThreadId();
1112  for (size_t i = r.begin(); i != r.end(); ++i) {
1113  PermutationIdx const permuted_idx = rsc_->permutation_[i];
1114  auto const storage_lookup_result = rsc_->result_set_->findStorage(permuted_idx);
1115  auto const storage = storage_lookup_result.storage_ptr;
1116  auto const off = storage_lookup_result.fixedup_entry_idx;
1117  auto const value = rsc_->buffer_itr_.getColumnInternal(
1118  storage->buff_, off, order_entry_.tle_no - 1, storage_lookup_result);
1119  materialized_buffer_[permuted_idx] = materializeMode(value.i1);
1120  }
1121  }
1122 };
1123 
1124 template <typename BUFFER_ITERATOR_TYPE>
1125 ResultSet::ModeBuffers::value_type
 1126 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeModeColumn(
1127  const Analyzer::OrderEntry& order_entry) const {
1128  ResultSet::ModeBuffers::value_type materialized_buffer(
1129  result_set_->query_mem_desc_.getEntryCount());
1130  ModeScatter mode_scatter{
1131  logger::thread_local_ids(), this, order_entry, materialized_buffer};
1132  if (single_threaded_) {
1133  mode_scatter(ModeBlockedRange(0, permutation_.size())); // Still has new thread_id.
1134  } else {
1135  tbb::parallel_for(ModeBlockedRange(0, permutation_.size()), mode_scatter);
1136  }
1137  return materialized_buffer;
1138 }
1139 
1140 template <typename BUFFER_ITERATOR_TYPE>
 1141 bool ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::operator()(
1142  const PermutationIdx lhs,
1143  const PermutationIdx rhs) const {
1144  // NB: The compare function must define a strict weak ordering, otherwise
1145  // std::sort will trigger a segmentation fault (or corrupt memory).
1146  const auto lhs_storage_lookup_result = result_set_->findStorage(lhs);
1147  const auto rhs_storage_lookup_result = result_set_->findStorage(rhs);
1148  const auto lhs_storage = lhs_storage_lookup_result.storage_ptr;
1149  const auto rhs_storage = rhs_storage_lookup_result.storage_ptr;
1150  const auto fixedup_lhs = lhs_storage_lookup_result.fixedup_entry_idx;
1151  const auto fixedup_rhs = rhs_storage_lookup_result.fixedup_entry_idx;
1152  size_t materialized_count_distinct_buffer_idx{0};
1153  size_t materialized_approx_quantile_buffer_idx{0};
1154  size_t materialized_mode_buffer_idx{0};
1155 
1156  for (const auto& order_entry : order_entries_) {
1157  CHECK_GE(order_entry.tle_no, 1);
1158  // lhs_entry_ti and rhs_entry_ti can differ on comp_param w/ UNION of string dicts.
1159  const auto& lhs_agg_info = lhs_storage->targets_[order_entry.tle_no - 1];
1160  const auto& rhs_agg_info = rhs_storage->targets_[order_entry.tle_no - 1];
1161  const auto lhs_entry_ti = get_compact_type(lhs_agg_info);
1162  const auto rhs_entry_ti = get_compact_type(rhs_agg_info);
1163  // When lhs vs rhs doesn't matter, the lhs is used. For example:
1164  bool float_argument_input = takes_float_argument(lhs_agg_info);
 1165  // Need to determine if the float value has been stored as a 4-byte float
 1166  // or compacted into a different (often larger, 8-byte) slot; in the
 1167  // distributed case the floats are actually 4 bytes.
 1168  // TODO: takes_float_argument() above is widely used; check whether this
 1169  // problem exists elsewhere.
1170  if (lhs_entry_ti.get_type() == kFLOAT) {
1171  const auto is_col_lazy =
1172  !result_set_->lazy_fetch_info_.empty() &&
1173  result_set_->lazy_fetch_info_[order_entry.tle_no - 1].is_lazily_fetched;
1174  if (result_set_->query_mem_desc_.getPaddedSlotWidthBytes(order_entry.tle_no - 1) ==
1175  sizeof(float)) {
1176  float_argument_input =
1177  result_set_->query_mem_desc_.didOutputColumnar() ? !is_col_lazy : true;
1178  }
1179  }
1180 
1181  if (UNLIKELY(is_distinct_target(lhs_agg_info))) {
1182  CHECK_LT(materialized_count_distinct_buffer_idx,
1183  count_distinct_materialized_buffers_.size());
1184 
1185  const auto& count_distinct_materialized_buffer =
1186  count_distinct_materialized_buffers_[materialized_count_distinct_buffer_idx];
1187  const auto lhs_sz = count_distinct_materialized_buffer[lhs];
1188  const auto rhs_sz = count_distinct_materialized_buffer[rhs];
1189  ++materialized_count_distinct_buffer_idx;
1190  if (lhs_sz == rhs_sz) {
1191  continue;
1192  }
1193  return (lhs_sz < rhs_sz) != order_entry.is_desc;
1194  } else if (UNLIKELY(lhs_agg_info.agg_kind == kAPPROX_QUANTILE)) {
1195  CHECK_LT(materialized_approx_quantile_buffer_idx,
1196  approx_quantile_materialized_buffers_.size());
1197  const auto& approx_quantile_materialized_buffer =
1198  approx_quantile_materialized_buffers_[materialized_approx_quantile_buffer_idx];
1199  const auto lhs_value = approx_quantile_materialized_buffer[lhs];
1200  const auto rhs_value = approx_quantile_materialized_buffer[rhs];
1201  ++materialized_approx_quantile_buffer_idx;
1202  if (lhs_value == rhs_value) {
1203  continue;
1204  } else if (!lhs_entry_ti.get_notnull()) {
1205  if (lhs_value == NULL_DOUBLE) {
1206  return order_entry.nulls_first;
1207  } else if (rhs_value == NULL_DOUBLE) {
1208  return !order_entry.nulls_first;
1209  }
1210  }
1211  return (lhs_value < rhs_value) != order_entry.is_desc;
1212  } else if (UNLIKELY(lhs_agg_info.agg_kind == kMODE)) {
1213  CHECK_LT(materialized_mode_buffer_idx, mode_buffers_.size());
1214  auto const& mode_buffer = mode_buffers_[materialized_mode_buffer_idx++];
1215  int64_t const lhs_value = mode_buffer[lhs];
1216  int64_t const rhs_value = mode_buffer[rhs];
1217  if (lhs_value == rhs_value) {
1218  continue;
1219  // MODE(x) can only be NULL when the group is empty, since it skips null values.
1220  } else if (lhs_value == NULL_BIGINT) { // NULL_BIGINT from materializeMode()
1221  return order_entry.nulls_first;
1222  } else if (rhs_value == NULL_BIGINT) {
1223  return !order_entry.nulls_first;
1224  } else {
1225  return result_set_->isLessThan(lhs_entry_ti, lhs_value, rhs_value) !=
1226  order_entry.is_desc;
1227  }
1228  }
1229 
1230  const auto lhs_v = buffer_itr_.getColumnInternal(lhs_storage->buff_,
1231  fixedup_lhs,
1232  order_entry.tle_no - 1,
1233  lhs_storage_lookup_result);
1234  const auto rhs_v = buffer_itr_.getColumnInternal(rhs_storage->buff_,
1235  fixedup_rhs,
1236  order_entry.tle_no - 1,
1237  rhs_storage_lookup_result);
1238 
1239  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1240  isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1241  continue;
1242  }
1243  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1244  !isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1245  return order_entry.nulls_first;
1246  }
1247  if (UNLIKELY(isNull(rhs_entry_ti, rhs_v, float_argument_input) &&
1248  !isNull(lhs_entry_ti, lhs_v, float_argument_input))) {
1249  return !order_entry.nulls_first;
1250  }
1251 
1252  if (LIKELY(lhs_v.isInt())) {
1253  CHECK(rhs_v.isInt());
1254  if (UNLIKELY(lhs_entry_ti.is_string() &&
1255  lhs_entry_ti.get_compression() == kENCODING_DICT)) {
1256  CHECK_EQ(4, lhs_entry_ti.get_logical_size());
1257  CHECK(executor_);
1258  const auto lhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1259  lhs_entry_ti.getStringDictKey(), result_set_->row_set_mem_owner_, false);
1260  const auto rhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1261  rhs_entry_ti.getStringDictKey(), result_set_->row_set_mem_owner_, false);
1262  const auto lhs_str = lhs_string_dict_proxy->getString(lhs_v.i1);
1263  const auto rhs_str = rhs_string_dict_proxy->getString(rhs_v.i1);
1264  if (lhs_str == rhs_str) {
1265  continue;
1266  }
1267  return (lhs_str < rhs_str) != order_entry.is_desc;
1268  }
1269 
1270  if (lhs_v.i1 == rhs_v.i1) {
1271  continue;
1272  }
1273  if (lhs_entry_ti.is_fp()) {
1274  if (float_argument_input) {
1275  const auto lhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&lhs_v.i1));
1276  const auto rhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&rhs_v.i1));
1277  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1278  } else {
1279  const auto lhs_dval =
1280  *reinterpret_cast<const double*>(may_alias_ptr(&lhs_v.i1));
1281  const auto rhs_dval =
1282  *reinterpret_cast<const double*>(may_alias_ptr(&rhs_v.i1));
1283  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1284  }
1285  }
1286  return (lhs_v.i1 < rhs_v.i1) != order_entry.is_desc;
1287  } else {
1288  if (lhs_v.isPair()) {
1289  CHECK(rhs_v.isPair());
1290  const auto lhs =
1291  pair_to_double({lhs_v.i1, lhs_v.i2}, lhs_entry_ti, float_argument_input);
1292  const auto rhs =
1293  pair_to_double({rhs_v.i1, rhs_v.i2}, rhs_entry_ti, float_argument_input);
1294  if (lhs == rhs) {
1295  continue;
1296  }
1297  return (lhs < rhs) != order_entry.is_desc;
1298  } else {
1299  CHECK(lhs_v.isStr() && rhs_v.isStr());
1300  const auto lhs = lhs_v.strVal();
1301  const auto rhs = rhs_v.strVal();
1302  if (lhs == rhs) {
1303  continue;
1304  }
1305  return (lhs < rhs) != order_entry.is_desc;
1306  }
1307  }
1308  }
1309  return false;
1310 }
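// A standalone illustration of the strict-weak-ordering requirement noted at the top of
// this comparator (names are hypothetical): ties must compare as "not less" in both
// directions, so the pattern above returns false (or falls through to the next order
// entry) for equal keys and only applies "(a < b) != desc" to distinct values. A
// predicate that returns true for ties, e.g. "a >= b", is not irreflexive and makes
// std::sort undefined.
#include <algorithm>
#include <vector>

inline void sort_with_direction_sketch(std::vector<int>& vals, const bool desc) {
  std::sort(vals.begin(), vals.end(), [desc](int a, int b) {
    if (a == b) {
      return false;          // equivalent keys are never "less"
    }
    return (a < b) != desc;  // ascending when !desc, descending when desc
  });
}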
1311 
1312 // Partial sort permutation into top(least by compare) n elements.
1313 // If permutation.size() <= n then sort entire permutation by compare.
1314 // Return PermutationView with new size() = min(n, permutation.size()).
 1315 PermutationView ResultSet::topPermutation(PermutationView permutation,
1316  const size_t n,
1317  const Comparator& compare) {
1318  auto timer = DEBUG_TIMER(__func__);
1319  if (n < permutation.size()) {
1320  std::partial_sort(
1321  permutation.begin(), permutation.begin() + n, permutation.end(), compare);
1322  permutation.resize(n);
1323  } else {
1324  std::sort(permutation.begin(), permutation.end(), compare);
1325  }
1326  return permutation;
1327 }
1328 
 1329 void ResultSet::radixSortOnGpu(
1330  const std::list<Analyzer::OrderEntry>& order_entries) const {
1331  auto timer = DEBUG_TIMER(__func__);
1333  const int device_id{0};
1334  auto allocator = std::make_unique<CudaAllocator>(
1335  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1336  CHECK_GT(block_size_, 0);
1337  CHECK_GT(grid_size_, 0);
1338  std::vector<int64_t*> group_by_buffers(block_size_);
1339  group_by_buffers[0] = reinterpret_cast<int64_t*>(storage_->getUnderlyingBuffer());
1340  auto dev_group_by_buffers =
1341  create_dev_group_by_buffers(allocator.get(),
1342  group_by_buffers,
1343  query_mem_desc_,
1344  block_size_,
1345  grid_size_,
1346  device_id,
 1347  ExecutorDispatchMode::KernelPerFragment,
1348  /*num_input_rows=*/-1,
1349  /*prepend_index_buffer=*/true,
1350  /*always_init_group_by_on_host=*/true,
1351  /*use_bump_allocator=*/false,
1352  /*has_varlen_output=*/false,
1353  /*insitu_allocator*=*/nullptr);
 1354  inplace_sort_gpu(
1355  order_entries, query_mem_desc_, dev_group_by_buffers, data_mgr, device_id);
 1356  copy_group_by_buffers_from_gpu(
1357  *allocator,
1358  group_by_buffers,
1359  query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
1360  dev_group_by_buffers.data,
1361  query_mem_desc_,
1362  block_size_,
1363  grid_size_,
1364  device_id,
1365  /*use_bump_allocator=*/false,
1366  /*has_varlen_output=*/false);
1367 }
1368 
 1369 void ResultSet::radixSortOnCpu(
1370  const std::list<Analyzer::OrderEntry>& order_entries) const {
1371  auto timer = DEBUG_TIMER(__func__);
1372  CHECK(!query_mem_desc_.hasKeylessHash());
1373  std::vector<int64_t> tmp_buff(query_mem_desc_.getEntryCount());
1374  std::vector<int32_t> idx_buff(query_mem_desc_.getEntryCount());
1375  CHECK_EQ(size_t(1), order_entries.size());
1376  auto buffer_ptr = storage_->getUnderlyingBuffer();
1377  for (const auto& order_entry : order_entries) {
1378  const auto target_idx = order_entry.tle_no - 1;
1379  const auto sortkey_val_buff = reinterpret_cast<int64_t*>(
1380  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1381  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1382  sort_groups_cpu(sortkey_val_buff,
1383  &idx_buff[0],
1384  query_mem_desc_.getEntryCount(),
1385  order_entry.is_desc,
1386  chosen_bytes);
1387  apply_permutation_cpu(reinterpret_cast<int64_t*>(buffer_ptr),
1388  &idx_buff[0],
1389  query_mem_desc_.getEntryCount(),
1390  &tmp_buff[0],
1391  sizeof(int64_t));
1392  for (size_t target_idx = 0; target_idx < query_mem_desc_.getSlotCount();
1393  ++target_idx) {
1394  if (static_cast<int>(target_idx) == order_entry.tle_no - 1) {
1395  continue;
1396  }
1397  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1398  const auto satellite_val_buff = reinterpret_cast<int64_t*>(
1399  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1400  apply_permutation_cpu(satellite_val_buff,
1401  &idx_buff[0],
1402  query_mem_desc_.getEntryCount(),
1403  &tmp_buff[0],
1404  chosen_bytes);
1405  }
1406  }
1407 }
1408 
1409 size_t ResultSet::getLimit() const {
1410  return keep_first_;
1411 }
1412 
1413 const std::vector<std::string> ResultSet::getStringDictionaryPayloadCopy(
1414  const shared::StringDictKey& dict_key) const {
1415  const auto sdp =
1416  row_set_mem_owner_->getOrAddStringDictProxy(dict_key, /*with_generation=*/true);
1417  CHECK(sdp);
1418  return sdp->getDictionary()->copyStrings();
1419 }
1420 
1421 const std::pair<std::vector<int32_t>, std::vector<std::string>>
 1422 ResultSet::getUniqueStringsForDictEncodedTargetCol(const size_t col_idx) const {
1423  const auto col_type_info = getColType(col_idx);
1424  std::unordered_set<int32_t> unique_string_ids_set;
1425  const size_t num_entries = entryCount();
1426  std::vector<bool> targets_to_skip(colCount(), true);
1427  targets_to_skip[col_idx] = false;
1428  CHECK(col_type_info.is_dict_encoded_type()); // Array<Text> or Text
1429  const int64_t null_val = inline_fixed_encoding_null_val(
1430  col_type_info.is_array() ? col_type_info.get_elem_type() : col_type_info);
1431 
1432  for (size_t row_idx = 0; row_idx < num_entries; ++row_idx) {
1433  const auto result_row = getRowAtNoTranslations(row_idx, targets_to_skip);
1434  if (!result_row.empty()) {
1435  if (const auto scalar_col_val =
1436  boost::get<ScalarTargetValue>(&result_row[col_idx])) {
1437  const int32_t string_id =
1438  static_cast<int32_t>(boost::get<int64_t>(*scalar_col_val));
1439  if (string_id != null_val) {
1440  unique_string_ids_set.emplace(string_id);
1441  }
1442  } else if (const auto array_col_val =
1443  boost::get<ArrayTargetValue>(&result_row[col_idx])) {
1444  if (*array_col_val) {
1445  for (const ScalarTargetValue& scalar : array_col_val->value()) {
1446  const int32_t string_id = static_cast<int32_t>(boost::get<int64_t>(scalar));
1447  if (string_id != null_val) {
1448  unique_string_ids_set.emplace(string_id);
1449  }
1450  }
1451  }
1452  }
1453  }
1454  }
1455 
1456  const size_t num_unique_strings = unique_string_ids_set.size();
1457  std::vector<int32_t> unique_string_ids(num_unique_strings);
1458  size_t string_idx{0};
1459  for (const auto unique_string_id : unique_string_ids_set) {
1460  unique_string_ids[string_idx++] = unique_string_id;
1461  }
1462 
1463  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
1464  col_type_info.getStringDictKey(), /*with_generation=*/true);
1465  CHECK(sdp);
1466 
1467  return std::make_pair(unique_string_ids, sdp->getStrings(unique_string_ids));
1468 }
1469 
 1477 bool ResultSet::isDirectColumnarConversionPossible() const {
 1478  if (!g_enable_direct_columnarization) {
 1479  return false;
 1480  } else if (query_mem_desc_.didOutputColumnar()) {
 1481  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
 1482  QueryDescriptionType::Projection ||
 1483  query_mem_desc_.getQueryDescriptionType() ==
 1484  QueryDescriptionType::TableFunction ||
 1485  query_mem_desc_.getQueryDescriptionType() ==
 1486  QueryDescriptionType::GroupByPerfectHash ||
 1487  query_mem_desc_.getQueryDescriptionType() ==
 1488  QueryDescriptionType::GroupByBaselineHash);
 1489  } else {
 1490  CHECK(!(query_mem_desc_.getQueryDescriptionType() ==
 1491  QueryDescriptionType::TableFunction));
 1492  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
 1493  QueryDescriptionType::GroupByPerfectHash ||
 1494  query_mem_desc_.getQueryDescriptionType() ==
 1495  QueryDescriptionType::GroupByBaselineHash);
 1496  }
 1497 }
1498 
 1499 bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
1500  return query_mem_desc_.didOutputColumnar() &&
1501  (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection ||
1502  query_mem_desc_.getQueryDescriptionType() ==
 1503  QueryDescriptionType::TableFunction) &&
1504  appended_storage_.empty() && storage_ &&
1505  (lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
1506 }
1507 
1508 const int8_t* ResultSet::getColumnarBuffer(size_t column_idx) const {
1509  CHECK(isZeroCopyColumnarConversionPossible(column_idx));
1510  return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(column_idx);
1511 }
1512 
1513 const size_t ResultSet::getColumnarBufferSize(size_t column_idx) const {
1514   const auto col_context = query_mem_desc_.getColSlotContext();
1515   const auto idx = col_context.getSlotsForCol(column_idx).front();
1516   if (checkSlotUsesFlatBufferFormat(idx)) {
1517     return query_mem_desc_.getFlatBufferSize(idx);
1518   }
1519   return query_mem_desc_.getPaddedSlotBufferSize(idx);
1520 }
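
isZeroCopyColumnarConversionPossible(), getColumnarBuffer(), and getColumnarBufferSize() together form the zero-copy read path: when the check passes, a consumer can address a column directly inside the result set's storage instead of materializing it row by row. The following is an illustrative sketch, not part of ResultSet.cpp, assuming an existing ResultSet `results` that holds a fixed-width 64-bit column at `col_idx`; the helper name is hypothetical.

#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helper (illustration only): obtains a direct pointer into the
// result set's storage and copies the values into a vector purely for clarity;
// a real consumer could keep working off the raw pointer.
std::vector<int64_t> read_int64_column(const ResultSet& results, const size_t col_idx) {
  std::vector<int64_t> out;
  if (!results.isZeroCopyColumnarConversionPossible(col_idx)) {
    return out;  // a real caller would fall back to row-wise iteration here
  }
  const int8_t* buf = results.getColumnarBuffer(col_idx);
  const size_t buf_size = results.getColumnarBufferSize(col_idx);
  out.resize(buf_size / sizeof(int64_t));
  std::memcpy(out.data(), buf, out.size() * sizeof(int64_t));
  return out;
}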
1523 
1524 // returns a bitmap (and total number) of all single slot targets
1525 std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
1526  std::vector<bool> target_bitmap(targets_.size(), true);
1527  size_t num_single_slot_targets = 0;
1528  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1529  const auto& sql_type = targets_[target_idx].sql_type;
1530  if (targets_[target_idx].is_agg && targets_[target_idx].agg_kind == kAVG) {
1531  target_bitmap[target_idx] = false;
1532  } else if (sql_type.is_varlen()) {
1533  target_bitmap[target_idx] = false;
1534  } else {
1535  num_single_slot_targets++;
1536  }
1537  }
1538  return std::make_tuple(std::move(target_bitmap), num_single_slot_targets);
1539 }
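
The bitmap above is what the columnar conversion code consults to decide which targets can be copied slot-for-slot: AVG targets (which occupy a sum slot and a count slot) and variable-length targets are excluded. A small sketch, not part of ResultSet.cpp, assuming a populated ResultSet `results`; the helper name is hypothetical.

#include <iostream>

// Hypothetical helper (illustration only): reports which targets qualify for
// single-slot handling according to getSingleSlotTargetBitmap().
void report_single_slot_targets(const ResultSet& results) {
  const auto [bitmap, num_single_slot] = results.getSingleSlotTargetBitmap();
  std::cout << num_single_slot << " of " << bitmap.size()
            << " targets use a single slot\n";
  for (size_t i = 0; i < bitmap.size(); ++i) {
    std::cout << "  target " << i << ": "
              << (bitmap[i] ? "single slot" : "multi-slot (AVG) or varlen") << '\n';
  }
}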
1540 
1549 std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
1550  const {
1551  CHECK(isDirectColumnarConversionPossible());
1552  auto [single_slot_targets, num_single_slot_targets] = getSingleSlotTargetBitmap();
1553 
1554  for (size_t target_idx = 0; target_idx < single_slot_targets.size(); target_idx++) {
1555  const auto& target = targets_[target_idx];
1556  if (single_slot_targets[target_idx] &&
1557  (is_distinct_target(target) ||
1558  shared::is_any<kAPPROX_QUANTILE, kMODE>(target.agg_kind) ||
1559  (target.is_agg && target.agg_kind == kSAMPLE && target.sql_type == kFLOAT))) {
1560  single_slot_targets[target_idx] = false;
1561  num_single_slot_targets--;
1562  }
1563  }
1564  CHECK_GE(num_single_slot_targets, size_t(0));
1565  return std::make_tuple(std::move(single_slot_targets), num_single_slot_targets);
1566 }
1567 
1568 // returns the starting slot index for all targets in the result set
1569 std::vector<size_t> ResultSet::getSlotIndicesForTargetIndices() const {
1570  std::vector<size_t> slot_indices(targets_.size(), 0);
1571  size_t slot_index = 0;
1572  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1573  slot_indices[target_idx] = slot_index;
1574  slot_index = advance_slot(slot_index, targets_[target_idx], false);
1575  }
1576  return slot_indices;
1577 }
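
Because a target can span more than one slot (AVG keeps a running sum and a count, for example), target indices and slot indices diverge, and advance_slot() accounts for that widening. Below is an illustrative sketch, not part of ResultSet.cpp, assuming a ResultSet `results` whose targets are {COUNT(*), AVG(x), MAX(y)}; in that case it would print start slots 0, 1, and 3. The helper name is hypothetical.

#include <iostream>

// Hypothetical helper (illustration only): prints the starting slot of each target.
void print_slot_mapping(const ResultSet& results) {
  const auto slot_indices = results.getSlotIndicesForTargetIndices();
  for (size_t target_idx = 0; target_idx < slot_indices.size(); ++target_idx) {
    std::cout << "target " << target_idx << " starts at slot "
              << slot_indices[target_idx] << '\n';
  }
}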
1578 
1579 // namespace result_set
1580 
1581 bool result_set::can_use_parallel_algorithms(const ResultSet& rows) {
1582   return !rows.isTruncated();
1583 }
1584 
1585 namespace {
1586 struct IsDictEncodedStr {
1587   bool operator()(TargetInfo const& target_info) const {
1588  return target_info.sql_type.is_dict_encoded_string();
1589  }
1590 };
1591 } // namespace
1592 
1593 std::optional<size_t> result_set::first_dict_encoded_idx(
1594     std::vector<TargetInfo> const& targets) {
1595  auto const itr = std::find_if(targets.begin(), targets.end(), IsDictEncodedStr{});
1596  return itr == targets.end() ? std::nullopt
1597  : std::make_optional<size_t>(itr - targets.begin());
1598 }
1599 
1600 bool use_parallel_algorithms(const ResultSet& rows) {
1601   return result_set::can_use_parallel_algorithms(rows) && rows.entryCount() >= 20000;
1602 }
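
use_parallel_algorithms() is the coarse heuristic consumers check before fanning work out across threads: the result set must not be truncated (LIMIT/OFFSET force ordered, sequential consumption) and must hold at least 20,000 entries so the threading overhead pays off. The following is an illustrative sketch, not part of ResultSet.cpp, of a consumer honoring that heuristic; `process_entry` is a hypothetical, thread-safe per-entry callback and the helper name is likewise hypothetical.

#include <algorithm>
#include <thread>
#include <vector>

// Hypothetical helper (illustration only): strides the entry range across worker
// threads when the heuristic allows it, and falls back to a plain loop otherwise.
template <typename F>
void for_each_entry(const ResultSet& rows, F process_entry) {
  const size_t entry_count = rows.entryCount();
  if (!use_parallel_algorithms(rows)) {
    for (size_t i = 0; i < entry_count; ++i) {
      process_entry(i);
    }
    return;
  }
  const size_t worker_count = static_cast<size_t>(std::max(1, cpu_threads()));
  std::vector<std::thread> workers;
  for (size_t w = 0; w < worker_count; ++w) {
    workers.emplace_back([&, w] {
      for (size_t i = w; i < entry_count; i += worker_count) {
        process_entry(i);  // must be safe to call concurrently
      }
    });
  }
  for (auto& worker : workers) {
    worker.join();
  }
}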