OmniSciDB  a987f07e93
ResultSet.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "ResultSet.h"
26 #include "Execute.h"
27 #include "GpuMemUtils.h"
28 #include "InPlaceSort.h"
31 #include "RelAlgExecutionUnit.h"
32 #include "RuntimeFunctions.h"
33 #include "Shared/Intervals.h"
34 #include "Shared/SqlTypesLayout.h"
35 #include "Shared/checked_alloc.h"
36 #include "Shared/likely.h"
37 #include "Shared/thread_count.h"
38 #include "Shared/threading.h"
39 
40 #include <tbb/parallel_for.h>
41 
42 #include <algorithm>
43 #include <atomic>
44 #include <bitset>
45 #include <functional>
46 #include <future>
47 #include <numeric>
48 
49 size_t g_parallel_top_min = 100e3;
50 size_t g_parallel_top_max = 20e6; // In effect only with g_enable_watchdog.
51 size_t g_streaming_topn_max = 100e3;
52 constexpr int64_t uninitialized_cached_row_count{-1};
53 
54 void ResultSet::keepFirstN(const size_t n) {
55  invalidateCachedRowCount();
56  keep_first_ = n;
57 }
58 
59 void ResultSet::dropFirstN(const size_t n) {
60  invalidateCachedRowCount();
61  drop_first_ = n;
62 }
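// Illustrative usage sketch (hypothetical caller, not part of this file): keep_first_
// and drop_first_ correspond to SQL LIMIT and OFFSET, e.g. for "LIMIT 10 OFFSET 5":
//
//   rows->dropFirstN(5);                 // OFFSET 5
//   rows->keepFirstN(10);                // LIMIT 10
//   const size_t n = rows->rowCount();   // at most 10 rows, after skipping the first 5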
63 
64 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
65  const ExecutorDeviceType device_type,
67  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
68  const Catalog_Namespace::Catalog* catalog,
69  const unsigned block_size,
70  const unsigned grid_size)
71  : targets_(targets)
72  , device_type_(device_type)
73  , device_id_(-1)
74  , query_mem_desc_(query_mem_desc)
75  , crt_row_buff_idx_(0)
76  , fetched_so_far_(0)
77  , drop_first_(0)
78  , keep_first_(0)
79  , row_set_mem_owner_(row_set_mem_owner)
80  , catalog_(catalog)
81  , block_size_(block_size)
82  , grid_size_(grid_size)
83  , data_mgr_(nullptr)
84  , separate_varlen_storage_valid_(false)
85  , just_explain_(false)
86  , for_validation_only_(false)
87  , cached_row_count_(uninitialized_cached_row_count)
88  , geo_return_type_(GeoReturnType::WktString)
89  , cached_(false)
90  , query_exec_time_(0)
91  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
92  , can_use_speculative_top_n_sort(std::nullopt) {}
93 
94 ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
95  const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
96  const std::vector<std::vector<const int8_t*>>& col_buffers,
97  const std::vector<std::vector<int64_t>>& frag_offsets,
98  const std::vector<int64_t>& consistent_frag_sizes,
99  const ExecutorDeviceType device_type,
100  const int device_id,
102  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
103  const Catalog_Namespace::Catalog* catalog,
104  const unsigned block_size,
105  const unsigned grid_size)
106  : targets_(targets)
107  , device_type_(device_type)
108  , device_id_(device_id)
109  , query_mem_desc_(query_mem_desc)
110  , crt_row_buff_idx_(0)
111  , fetched_so_far_(0)
112  , drop_first_(0)
113  , keep_first_(0)
114  , row_set_mem_owner_(row_set_mem_owner)
115  , catalog_(catalog)
116  , block_size_(block_size)
117  , grid_size_(grid_size)
118  , lazy_fetch_info_(lazy_fetch_info)
119  , col_buffers_{col_buffers}
120  , frag_offsets_{frag_offsets}
121  , consistent_frag_sizes_{consistent_frag_sizes}
122  , data_mgr_(nullptr)
123  , separate_varlen_storage_valid_(false)
124  , just_explain_(false)
125  , for_validation_only_(false)
126  , cached_row_count_(uninitialized_cached_row_count)
127  , geo_return_type_(GeoReturnType::WktString)
128  , cached_(false)
129  , query_exec_time_(0)
130  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
131  , can_use_speculative_top_n_sort(std::nullopt) {}
132 
133 ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
134  const ExecutorDeviceType device_type,
135  const int device_id,
136  Data_Namespace::DataMgr* data_mgr)
137  : device_type_(device_type)
138  , device_id_(device_id)
139  , query_mem_desc_{}
140  , crt_row_buff_idx_(0)
141  , estimator_(estimator)
142  , data_mgr_(data_mgr)
143  , separate_varlen_storage_valid_(false)
144  , just_explain_(false)
145  , for_validation_only_(false)
146  , cached_row_count_(uninitialized_cached_row_count)
147  , geo_return_type_(GeoReturnType::WktString)
148  , cached_(false)
149  , query_exec_time_(0)
150  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
151  , can_use_speculative_top_n_sort(std::nullopt) {
152  if (device_type == ExecutorDeviceType::GPU) {
153  device_estimator_buffer_ = CudaAllocator::allocGpuAbstractBuffer(
154  data_mgr_, estimator_->getBufferSize(), device_id_);
155  data_mgr->getCudaMgr()->zeroDeviceMem(device_estimator_buffer_->getMemoryPtr(),
156  estimator_->getBufferSize(),
157  device_id_,
158  getQueryEngineCudaStreamForDevice(device_id_));
159  } else {
160  host_estimator_buffer_ =
161  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
162  }
163 }
164 
165 ResultSet::ResultSet(const std::string& explanation)
166  : device_type_(ExecutorDeviceType::CPU)
167  , device_id_(-1)
168  , fetched_so_far_(0)
169  , separate_varlen_storage_valid_(false)
170  , explanation_(explanation)
171  , just_explain_(true)
172  , for_validation_only_(false)
173  , cached_row_count_(uninitialized_cached_row_count)
174  , geo_return_type_(GeoReturnType::WktString)
175  , cached_(false)
176  , query_exec_time_(0)
177  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
178  , can_use_speculative_top_n_sort(std::nullopt) {}
179 
180 ResultSet::ResultSet(int64_t queue_time_ms,
181  int64_t render_time_ms,
182  const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
183  : device_type_(ExecutorDeviceType::CPU)
184  , device_id_(-1)
185  , fetched_so_far_(0)
186  , row_set_mem_owner_(row_set_mem_owner)
187  , timings_(QueryExecutionTimings{queue_time_ms, render_time_ms, 0, 0})
188  , separate_varlen_storage_valid_(false)
189  , just_explain_(true)
190  , for_validation_only_(false)
191  , cached_row_count_(uninitialized_cached_row_count)
192  , geo_return_type_(GeoReturnType::WktString)
193  , cached_(false)
194  , query_exec_time_(0)
195  , query_plan_(EMPTY_HASHED_PLAN_DAG_KEY)
196  , can_use_speculative_top_n_sort(std::nullopt) {}
197 
198 ResultSet::~ResultSet() {
199  if (storage_) {
200  if (!storage_->buff_is_provided_) {
201  CHECK(storage_->getUnderlyingBuffer());
202  free(storage_->getUnderlyingBuffer());
203  }
204  }
205  for (auto& storage : appended_storage_) {
206  if (storage && !storage->buff_is_provided_) {
207  free(storage->getUnderlyingBuffer());
208  }
209  }
210  if (host_estimator_buffer_) {
211  CHECK(device_type_ == ExecutorDeviceType::CPU || device_estimator_buffer_);
212  free(host_estimator_buffer_);
213  }
214  if (device_estimator_buffer_) {
215  CHECK(data_mgr_);
216  data_mgr_->free(device_estimator_buffer_);
217  }
218 }
219 
220 std::string ResultSet::summaryToString() const {
221  std::ostringstream oss;
222  oss << "Result Set Info" << std::endl;
223  oss << "\tLayout: " << query_mem_desc_.queryDescTypeToString() << std::endl;
224  oss << "\tColumns: " << colCount() << std::endl;
225  oss << "\tRows: " << rowCount() << std::endl;
226  oss << "\tEntry count: " << entryCount() << std::endl;
227  const std::string is_empty = isEmpty() ? "True" : "False";
228  oss << "\tIs empty: " << is_empty << std::endl;
229  const std::string did_output_columnar = didOutputColumnar() ? "True" : "False";
230  oss << "\tColumnar: " << did_output_columnar << std::endl;
231  oss << "\tLazy-fetched columns: " << getNumColumnsLazyFetched() << std::endl;
232  const std::string is_direct_columnar_conversion_possible =
233  isDirectColumnarConversionPossible() ? "True" : "False";
234  oss << "\tDirect columnar conversion possible: "
235  << is_direct_columnar_conversion_possible << std::endl;
236 
237  size_t num_columns_zero_copy_columnarizable{0};
238  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
239  if (isZeroCopyColumnarConversionPossible(target_idx)) {
240  num_columns_zero_copy_columnarizable++;
241  }
242  }
243  oss << "\tZero-copy columnar conversion columns: "
244  << num_columns_zero_copy_columnarizable << std::endl;
245 
246  oss << "\tPermutation size: " << permutation_.size() << std::endl;
247  oss << "\tLimit: " << keep_first_ << std::endl;
248  oss << "\tOffset: " << drop_first_ << std::endl;
249  return oss.str();
250 }
251 
252 ExecutorDeviceType ResultSet::getDeviceType() const {
253  return device_type_;
254 }
255 
256 const ResultSetStorage* ResultSet::allocateStorage() const {
257  CHECK(!storage_);
258  CHECK(row_set_mem_owner_);
259  auto buff = row_set_mem_owner_->allocate(
260  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
261  storage_.reset(
262  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
263  return storage_.get();
264 }
265 
266 const ResultSetStorage* ResultSet::allocateStorage(
267  int8_t* buff,
268  const std::vector<int64_t>& target_init_vals,
269  std::shared_ptr<VarlenOutputInfo> varlen_output_info) const {
270  CHECK(buff);
271  CHECK(!storage_);
272  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, true));
273  // TODO: add both to the constructor
274  storage_->target_init_vals_ = target_init_vals;
275  if (varlen_output_info) {
276  storage_->varlen_output_info_ = varlen_output_info;
277  }
278  return storage_.get();
279 }
280 
281 const ResultSetStorage* ResultSet::allocateStorage(
282  const std::vector<int64_t>& target_init_vals) const {
283  CHECK(!storage_);
284  CHECK(row_set_mem_owner_);
285  auto buff = row_set_mem_owner_->allocate(
286  query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
287  storage_.reset(
288  new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
289  storage_->target_init_vals_ = target_init_vals;
290  return storage_.get();
291 }
292 
293 size_t ResultSet::getCurrentRowBufferIndex() const {
294  if (crt_row_buff_idx_ == 0) {
295  throw std::runtime_error("current row buffer iteration index is undefined");
296  }
297  return crt_row_buff_idx_ - 1;
298 }
299 
300 // Note: that.appended_storage_ does not get appended to this.
301 void ResultSet::append(ResultSet& that) {
302  invalidateCachedRowCount();
303  if (!that.storage_) {
304  return;
305  }
306  appended_storage_.push_back(std::move(that.storage_));
307  query_mem_desc_.setEntryCount(
308  query_mem_desc_.getEntryCount() +
309  appended_storage_.back()->query_mem_desc_.getEntryCount());
310  chunks_.insert(chunks_.end(), that.chunks_.begin(), that.chunks_.end());
311  col_buffers_.insert(
312  col_buffers_.end(), that.col_buffers_.begin(), that.col_buffers_.end());
313  frag_offsets_.insert(
314  frag_offsets_.end(), that.frag_offsets_.begin(), that.frag_offsets_.end());
315  consistent_frag_sizes_.insert(consistent_frag_sizes_.end(),
316  that.consistent_frag_sizes_.begin(),
317  that.consistent_frag_sizes_.end());
318  chunk_iters_.insert(
319  chunk_iters_.end(), that.chunk_iters_.begin(), that.chunk_iters_.end());
320  if (separate_varlen_storage_valid_) {
321  CHECK(that.separate_varlen_storage_valid_);
322  serialized_varlen_buffer_.insert(serialized_varlen_buffer_.end(),
323  that.serialized_varlen_buffer_.begin(),
324  that.serialized_varlen_buffer_.end());
325  }
326  for (auto& buff : that.literal_buffers_) {
327  literal_buffers_.push_back(std::move(buff));
328  }
329 }
330 
331 ResultSetPtr ResultSet::copy() {
332  auto timer = DEBUG_TIMER(__func__);
333  if (!storage_) {
334  return nullptr;
335  }
336 
337  auto executor = getExecutor();
338  CHECK(executor);
339  ResultSetPtr copied_rs = std::make_shared<ResultSet>(targets_,
340  device_type_,
341  query_mem_desc_,
342  row_set_mem_owner_,
343  executor->getCatalog(),
344  executor->blockSize(),
345  executor->gridSize());
346 
347  auto allocate_and_copy_storage =
348  [&](const ResultSetStorage* prev_storage) -> std::unique_ptr<ResultSetStorage> {
349  const auto& prev_qmd = prev_storage->query_mem_desc_;
350  const auto storage_size = prev_qmd.getBufferSizeBytes(device_type_);
351  auto buff = row_set_mem_owner_->allocate(storage_size, /*thread_idx=*/0);
352  std::unique_ptr<ResultSetStorage> new_storage;
353  new_storage.reset(new ResultSetStorage(
354  prev_storage->targets_, prev_qmd, buff, /*buff_is_provided=*/true));
355  new_storage->target_init_vals_ = prev_storage->target_init_vals_;
356  if (prev_storage->varlen_output_info_) {
357  new_storage->varlen_output_info_ = prev_storage->varlen_output_info_;
358  }
359  memcpy(new_storage->buff_, prev_storage->buff_, storage_size);
360  new_storage->query_mem_desc_ = prev_qmd;
361  return new_storage;
362  };
363 
364  copied_rs->storage_ = allocate_and_copy_storage(storage_.get());
365  if (!appended_storage_.empty()) {
366  for (const auto& storage : appended_storage_) {
367  copied_rs->appended_storage_.push_back(allocate_and_copy_storage(storage.get()));
368  }
369  }
370  std::copy(chunks_.begin(), chunks_.end(), std::back_inserter(copied_rs->chunks_));
371  std::copy(chunk_iters_.begin(),
372  chunk_iters_.end(),
373  std::back_inserter(copied_rs->chunk_iters_));
374  std::copy(col_buffers_.begin(),
375  col_buffers_.end(),
376  std::back_inserter(copied_rs->col_buffers_));
377  std::copy(frag_offsets_.begin(),
378  frag_offsets_.end(),
379  std::back_inserter(copied_rs->frag_offsets_));
380  std::copy(consistent_frag_sizes_.begin(),
381  consistent_frag_sizes_.end(),
382  std::back_inserter(copied_rs->consistent_frag_sizes_));
383  if (separate_varlen_storage_valid_) {
384  std::copy(serialized_varlen_buffer_.begin(),
385  serialized_varlen_buffer_.end(),
386  std::back_inserter(copied_rs->serialized_varlen_buffer_));
387  }
388  std::copy(literal_buffers_.begin(),
389  literal_buffers_.end(),
390  std::back_inserter(copied_rs->literal_buffers_));
391  std::copy(lazy_fetch_info_.begin(),
392  lazy_fetch_info_.end(),
393  std::back_inserter(copied_rs->lazy_fetch_info_));
394 
395  copied_rs->permutation_ = permutation_;
396  copied_rs->drop_first_ = drop_first_;
397  copied_rs->keep_first_ = keep_first_;
398  copied_rs->separate_varlen_storage_valid_ = separate_varlen_storage_valid_;
399  copied_rs->query_exec_time_ = query_exec_time_;
400  copied_rs->input_table_keys_ = input_table_keys_;
401  copied_rs->target_meta_info_ = target_meta_info_;
402  copied_rs->geo_return_type_ = geo_return_type_;
403  copied_rs->query_plan_ = query_plan_;
404  if (can_use_speculative_top_n_sort) {
405  copied_rs->can_use_speculative_top_n_sort = can_use_speculative_top_n_sort;
406  }
407 
408  return copied_rs;
409 }
410 
411 const ResultSetStorage* ResultSet::getStorage() const {
412  return storage_.get();
413 }
414 
415 size_t ResultSet::colCount() const {
416  return just_explain_ ? 1 : targets_.size();
417 }
418 
419 SQLTypeInfo ResultSet::getColType(const size_t col_idx) const {
420  if (just_explain_) {
421  return SQLTypeInfo(kTEXT, false);
422  }
423  CHECK_LT(col_idx, targets_.size());
424  return targets_[col_idx].agg_kind == kAVG ? SQLTypeInfo(kDOUBLE, false)
425  : targets_[col_idx].sql_type;
426 }
427 
428 StringDictionaryProxy* ResultSet::getStringDictionaryProxy(const int dict_id) const {
429  constexpr bool with_generation = true;
430  return catalog_ ? row_set_mem_owner_->getOrAddStringDictProxy(
431  dict_id, with_generation, catalog_)
432  : row_set_mem_owner_->getStringDictProxy(dict_id);
433 }
434 
435 class CellCallback {
436  StringDictionaryProxy::IdMap id_map_;
437  int64_t const null_int_;
438 
439  public:
440  CellCallback(StringDictionaryProxy::IdMap&& id_map, int64_t const null_int)
441  : id_map_(std::move(id_map)), null_int_(null_int) {}
442  void operator()(int8_t const* const cell_ptr) const {
443  using StringId = int32_t;
444  StringId* const string_id_ptr =
445  const_cast<StringId*>(reinterpret_cast<StringId const*>(cell_ptr));
446  if (*string_id_ptr != null_int_) {
447  *string_id_ptr = id_map_[*string_id_ptr];
448  }
449  }
450 };
451 
452 // Update any dictionary-encoded targets within storage_ with the corresponding
453 // dictionary in the given targets parameter, if their comp_param (dictionary) differs.
454 // This may modify both the storage_ values and storage_ targets.
455 // Does not iterate through appended_storage_.
456 // Iterate over targets starting at index target_idx.
457 void ResultSet::translateDictEncodedColumns(std::vector<TargetInfo> const& targets,
458  size_t const start_idx) {
459  if (storage_) {
460  CHECK_EQ(targets.size(), storage_->targets_.size());
461  RowIterationState state;
462  for (size_t target_idx = start_idx; target_idx < targets.size(); ++target_idx) {
463  auto const& type_lhs = targets[target_idx].sql_type;
464  if (type_lhs.is_dict_encoded_string()) {
465  auto& type_rhs =
466  const_cast<SQLTypeInfo&>(storage_->targets_[target_idx].sql_type);
467  CHECK(type_rhs.is_dict_encoded_string());
468  if (type_lhs.get_comp_param() != type_rhs.get_comp_param()) {
469  auto* const sdp_lhs = getStringDictionaryProxy(type_lhs.get_comp_param());
470  CHECK(sdp_lhs);
471  auto const* const sdp_rhs = getStringDictionaryProxy(type_rhs.get_comp_param());
472  CHECK(sdp_rhs);
473  state.cur_target_idx_ = target_idx;
474  CellCallback const translate_string_ids(sdp_lhs->transientUnion(*sdp_rhs),
475  inline_int_null_val(type_rhs));
476  eachCellInColumn(state, translate_string_ids);
477  type_rhs.set_comp_param(type_lhs.get_comp_param());
478  }
479  }
480  }
481  }
482 }
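// Sketch of the translation above (hypothetical dictionary ids, simplified from the
// real call sites): suppose storage_'s target is TEXT ENCODING DICT with comp_param
// 200 while the incoming targets[] entry uses comp_param 100. transientUnion() builds
// an IdMap from dictionary-200 string ids to (possibly transient) dictionary-100 ids,
// eachCellInColumn() applies CellCallback to rewrite every non-null id of that column
// in place, and the stored type's comp_param is then set to 100 so later readers
// consult the correct dictionary.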
483 
484 // For each cell in column target_idx, callback func with pointer to datum.
485 // This currently assumes the column type is a dictionary-encoded string, but this logic
486 // can be generalized to other types.
487 void ResultSet::eachCellInColumn(RowIterationState& state, CellCallback const& func) {
488  size_t const target_idx = state.cur_target_idx_;
489  QueryMemoryDescriptor& storage_qmd = storage_->query_mem_desc_;
490  CHECK_LT(target_idx, lazy_fetch_info_.size());
491  auto& col_lazy_fetch = lazy_fetch_info_[target_idx];
492  CHECK(col_lazy_fetch.is_lazily_fetched);
493  int const target_size = storage_->targets_[target_idx].sql_type.get_size();
494  CHECK_LT(0, target_size) << storage_->targets_[target_idx].toString();
495  size_t const nrows = storage_->binSearchRowCount();
496  if (storage_qmd.didOutputColumnar()) {
497  // Logic based on ResultSet::ColumnWiseTargetAccessor::initializeOffsetsForStorage()
498  if (state.buf_ptr_ == nullptr) {
499  state.buf_ptr_ = get_cols_ptr(storage_->buff_, storage_qmd);
500  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
501  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
502  : query_mem_desc_.getEffectiveKeyWidth();
503  }
504  for (size_t j = state.prev_target_idx_; j < state.cur_target_idx_; ++j) {
505  size_t const next_target_idx = j + 1; // Set state to reflect next target_idx j+1
506  state.buf_ptr_ = advance_to_next_columnar_target_buff(
507  state.buf_ptr_, storage_qmd, state.agg_idx_);
508  auto const& next_agg_info = storage_->targets_[next_target_idx];
509  state.agg_idx_ =
510  advance_slot(state.agg_idx_, next_agg_info, separate_varlen_storage_valid_);
511  state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
512  ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
513  : query_mem_desc_.getEffectiveKeyWidth();
514  }
515  for (size_t i = 0; i < nrows; ++i) {
516  int8_t const* const pos_ptr = state.buf_ptr_ + i * state.compact_sz1_;
517  int64_t pos = read_int_from_buff(pos_ptr, target_size);
518  CHECK_GE(pos, 0);
519  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
520  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
521  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
522  func(col_frag + pos * target_size);
523  }
524  } else {
525  size_t const key_bytes_with_padding =
526  align_to_int64(get_key_bytes_rowwise(storage_qmd));
527  for (size_t i = 0; i < nrows; ++i) {
528  int8_t const* const keys_ptr = row_ptr_rowwise(storage_->buff_, storage_qmd, i);
529  int8_t const* const rowwise_target_ptr = keys_ptr + key_bytes_with_padding;
530  int64_t pos = *reinterpret_cast<int64_t const*>(rowwise_target_ptr);
531  auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
532  CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
533  int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
534  func(col_frag + pos * target_size);
535  }
536  }
537 }
538 
539 namespace {
540 
541 size_t get_truncated_row_count(size_t total_row_count, size_t limit, size_t offset) {
542  if (total_row_count < offset) {
543  return 0;
544  }
545 
546  size_t total_truncated_row_count = total_row_count - offset;
547 
548  if (limit) {
549  return std::min(total_truncated_row_count, limit);
550  }
551 
552  return total_truncated_row_count;
553 }
554 
555 } // namespace
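// Worked examples for get_truncated_row_count() (illustrative values only):
//   total_row_count = 100, limit = 50, offset = 30  ->  min(100 - 30, 50) = 50
//   total_row_count = 100, limit = 0,  offset = 30  ->  100 - 30 = 70  (limit == 0 means no LIMIT)
//   total_row_count = 20,  limit = 50, offset = 30  ->  0              (OFFSET past the end)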
556 
557 size_t ResultSet::rowCountImpl(const bool force_parallel) const {
558  if (just_explain_) {
559  return 1;
560  }
561  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::TableFunction) {
562  return entryCount();
563  }
564  if (!permutation_.empty()) {
565  // keep_first_ corresponds to SQL LIMIT
566  // drop_first_ corresponds to SQL OFFSET
567  return get_truncated_row_count(permutation_.size(), keep_first_, drop_first_);
568  }
569  if (!storage_) {
570  return 0;
571  }
572  CHECK(permutation_.empty());
573  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
574  return binSearchRowCount();
575  }
576 
577  constexpr size_t auto_parallel_row_count_threshold{20000UL};
578  if (force_parallel || entryCount() >= auto_parallel_row_count_threshold) {
579  return parallelRowCount();
580  }
581  std::lock_guard<std::mutex> lock(row_iteration_mutex_);
582  moveToBegin();
583  size_t row_count{0};
584  while (true) {
585  auto crt_row = getNextRowUnlocked(false, false);
586  if (crt_row.empty()) {
587  break;
588  }
589  ++row_count;
590  }
591  moveToBegin();
592  return row_count;
593 }
594 
595 size_t ResultSet::rowCount(const bool force_parallel) const {
596  // cached_row_count_ is atomic, so fetch it into a local variable first
597  // to avoid repeat fetches
598  const int64_t cached_row_count = cached_row_count_;
599  if (cached_row_count != uninitialized_cached_row_count) {
600  CHECK_GE(cached_row_count, 0);
601  return cached_row_count;
602  }
603  setCachedRowCount(rowCountImpl(force_parallel));
604  return cached_row_count_;
605 }
606 
607 void ResultSet::invalidateCachedRowCount() const {
608  cached_row_count_ = uninitialized_cached_row_count;
609 }
610 
611 void ResultSet::setCachedRowCount(const size_t row_count) const {
612  const int64_t signed_row_count = static_cast<int64_t>(row_count);
613  const int64_t old_cached_row_count = cached_row_count_.exchange(signed_row_count);
614  CHECK(old_cached_row_count == uninitialized_cached_row_count ||
615  old_cached_row_count == signed_row_count);
616 }
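// Note on the caching protocol: cached_row_count_ is an atomic int64_t initialized to
// uninitialized_cached_row_count (-1). rowCount() publishes the computed value through
// setCachedRowCount(), and the exchange() plus CHECK above make caching idempotent:
// caching a *different* value afterwards is flagged. Minimal sketch (hypothetical caller):
//
//   rs->invalidateCachedRowCount();    // done internally by append() and sort()
//   const size_t n1 = rs->rowCount();  // computes and caches
//   const size_t n2 = rs->rowCount();  // served from cached_row_count_; n1 == n2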
617 
618 size_t ResultSet::binSearchRowCount() const {
619  if (!storage_) {
620  return 0;
621  }
622 
623  size_t row_count = storage_->binSearchRowCount();
624  for (auto& s : appended_storage_) {
625  row_count += s->binSearchRowCount();
626  }
627 
628  return get_truncated_row_count(row_count, getLimit(), drop_first_);
629 }
630 
631 size_t ResultSet::parallelRowCount() const {
632  using namespace threading;
633  auto execute_parallel_row_count =
634  [this, parent_thread_local_ids = logger::thread_local_ids()](
635  const blocked_range<size_t>& r, size_t row_count) {
636  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
637  for (size_t i = r.begin(); i < r.end(); ++i) {
638  if (!isRowAtEmpty(i)) {
639  ++row_count;
640  }
641  }
642  return row_count;
643  };
644  const auto row_count = parallel_reduce(blocked_range<size_t>(0, entryCount()),
645  size_t(0),
646  execute_parallel_row_count,
647  std::plus<int>());
648  return get_truncated_row_count(row_count, getLimit(), drop_first_);
649 }
650 
651 bool ResultSet::isEmpty() const {
652  // To simplify this function and de-dup logic with ResultSet::rowCount()
653  // (mismatches between the two were causing bugs), we modified this function
654  // to simply fetch rowCount(). The potential downside of this approach is that
655  // in some cases more work will need to be done, as we can't just stop at the first row.
656  // Mitigating that for most cases is the following:
657  // 1) rowCount() is cached, so the logic for actually computing row counts will run only
658  // once
659  // per result set.
660  // 2) If the cache is empty (cached_row_count_ == -1), rowCount() will use parallel
661  // methods if deemed appropriate, which in many cases could be faster for a sparse
662  // large result set than single-threaded iteration from the beginning
663  // 3) Often where isEmpty() is needed, rowCount() is also needed. Since the first call
664  // to rowCount()
665  // will be cached, there is no extra overhead in these cases
666 
667  return rowCount() == size_t(0);
668 }
669 
670 bool ResultSet::definitelyHasNoRows() const {
671  return (!storage_ && !estimator_ && !just_explain_) || cached_row_count_ == 0;
672 }
673 
674 const QueryMemoryDescriptor& ResultSet::getQueryMemDesc() const {
675  CHECK(storage_);
676  return storage_->query_mem_desc_;
677 }
678 
679 const std::vector<TargetInfo>& ResultSet::getTargetInfos() const {
680  return targets_;
681 }
682 
683 const std::vector<int64_t>& ResultSet::getTargetInitVals() const {
684  CHECK(storage_);
685  return storage_->target_init_vals_;
686 }
687 
688 int8_t* ResultSet::getDeviceEstimatorBuffer() const {
689  CHECK(device_type_ == ExecutorDeviceType::GPU);
690  CHECK(device_estimator_buffer_);
691  return device_estimator_buffer_->getMemoryPtr();
692 }
693 
694 int8_t* ResultSet::getHostEstimatorBuffer() const {
695  return host_estimator_buffer_;
696 }
697 
698 void ResultSet::syncEstimatorBuffer() const {
699  CHECK(device_type_ == ExecutorDeviceType::GPU);
700  CHECK(!host_estimator_buffer_);
701  CHECK_EQ(size_t(0), estimator_->getBufferSize() % sizeof(int64_t));
702  host_estimator_buffer_ =
703  static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
704  CHECK(device_estimator_buffer_);
705  auto device_buffer_ptr = device_estimator_buffer_->getMemoryPtr();
706  auto allocator = std::make_unique<CudaAllocator>(
707  data_mgr_, device_id_, getQueryEngineCudaStreamForDevice(device_id_));
708  allocator->copyFromDevice(
709  host_estimator_buffer_, device_buffer_ptr, estimator_->getBufferSize());
710 }
711 
712 void ResultSet::setQueueTime(const int64_t queue_time) {
713  timings_.executor_queue_time = queue_time;
714 }
715 
716 void ResultSet::setKernelQueueTime(const int64_t kernel_queue_time) {
717  timings_.kernel_queue_time = kernel_queue_time;
718 }
719 
720 void ResultSet::addCompilationQueueTime(const int64_t compilation_queue_time) {
721  timings_.compilation_queue_time += compilation_queue_time;
722 }
723 
724 int64_t ResultSet::getQueueTime() const {
725  return timings_.executor_queue_time + timings_.kernel_queue_time +
726  timings_.compilation_queue_time;
727 }
728 
729 int64_t ResultSet::getRenderTime() const {
730  return timings_.render_time;
731 }
732 
733 void ResultSet::moveToBegin() const {
734  crt_row_buff_idx_ = 0;
735  fetched_so_far_ = 0;
736 }
737 
738 bool ResultSet::isTruncated() const {
739  return keep_first_ + drop_first_;
740 }
741 
742 bool ResultSet::isExplain() const {
743  return just_explain_;
744 }
745 
746 void ResultSet::setValidationOnlyRes() {
747  for_validation_only_ = true;
748 }
749 
750 bool ResultSet::isValidationOnlyRes() const {
751  return for_validation_only_;
752 }
753 
754 int ResultSet::getDeviceId() const {
755  return device_id_;
756 }
757 
758 QueryMemoryDescriptor ResultSet::fixupQueryMemoryDescriptor(
759  const QueryMemoryDescriptor& query_mem_desc) {
760  auto query_mem_desc_copy = query_mem_desc;
761  query_mem_desc_copy.resetGroupColWidths(
762  std::vector<int8_t>(query_mem_desc_copy.getGroupbyColCount(), 8));
763  if (query_mem_desc.didOutputColumnar()) {
764  return query_mem_desc_copy;
765  }
766  query_mem_desc_copy.alignPaddedSlots();
767  return query_mem_desc_copy;
768 }
769 
770 void ResultSet::sort(const std::list<Analyzer::OrderEntry>& order_entries,
771  size_t top_n,
772  const Executor* executor) {
773  auto timer = DEBUG_TIMER(__func__);
774 
775  if (!storage_) {
776  return;
777  }
778  invalidateCachedRowCount();
779  CHECK(!targets_.empty());
780 #ifdef HAVE_CUDA
781  if (canUseFastBaselineSort(order_entries, top_n)) {
782  baselineSort(order_entries, top_n, executor);
783  return;
784  }
785 #endif // HAVE_CUDA
786  if (query_mem_desc_.sortOnGpu()) {
787  try {
788  radixSortOnGpu(order_entries);
789  } catch (const OutOfMemory&) {
790  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
791  radixSortOnCpu(order_entries);
792  } catch (const std::bad_alloc&) {
793  LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
794  radixSortOnCpu(order_entries);
795  }
796  return;
797  }
798  // This check isn't strictly required, but allows the index buffer to be 32-bit.
799  if (query_mem_desc_.getEntryCount() > std::numeric_limits<uint32_t>::max()) {
800  throw RowSortException("Sorting more than 4B elements not supported");
801  }
802 
803  CHECK(permutation_.empty());
804 
805  if (top_n && g_parallel_top_min < entryCount()) {
806  if (g_enable_watchdog && g_parallel_top_max < entryCount()) {
807  throw WatchdogException("Sorting the result would be too slow");
808  }
809  parallelTop(order_entries, top_n, executor);
810  } else {
811  if (g_enable_watchdog && Executor::baseline_threshold < entryCount()) {
812  throw WatchdogException("Sorting the result would be too slow");
813  }
814  permutation_.resize(query_mem_desc_.getEntryCount());
815  // PermutationView is used to share common API with parallelTop().
816  PermutationView pv(permutation_.data(), 0, permutation_.size());
817  pv = initPermutationBuffer(pv, 0, permutation_.size());
818  if (top_n == 0) {
819  top_n = pv.size(); // top_n == 0 implies a full sort
820  }
821  pv = topPermutation(pv, top_n, createComparator(order_entries, pv, executor, false));
822  if (pv.size() < permutation_.size()) {
823  permutation_.resize(pv.size());
824  permutation_.shrink_to_fit();
825  }
826  }
827 }
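// Minimal sketch of how sort() is typically driven (hypothetical values; the real
// callers live in the executor): ORDER BY columns arrive as Analyzer::OrderEntry
// items (1-based tle_no) and the LIMIT, if any, as top_n:
//
//   std::list<Analyzer::OrderEntry> order_entries;
//   order_entries.emplace_back(/*tle_no=*/1, /*is_desc=*/true, /*nulls_first=*/false);
//   rows->sort(order_entries, /*top_n=*/100, executor);  // top_n == 0 requests a full sort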
828 
829 #ifdef HAVE_CUDA
830 void ResultSet::baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
831  const size_t top_n,
832  const Executor* executor) {
833  auto timer = DEBUG_TIMER(__func__);
834  // If we only have one GPU, it's usually faster to do a multi-threaded radix sort on CPU
835  if (getGpuCount() > 1) {
836  try {
837  doBaselineSort(ExecutorDeviceType::GPU, order_entries, top_n, executor);
838  } catch (...) {
839  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
840  }
841  } else {
842  doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
843  }
844 }
845 #endif // HAVE_CUDA
846 
847 // Append non-empty indexes i in [begin,end) from findStorage(i) to permutation.
848 PermutationView ResultSet::initPermutationBuffer(PermutationView permutation,
849  PermutationIdx const begin,
850  PermutationIdx const end) const {
851  auto timer = DEBUG_TIMER(__func__);
852  for (PermutationIdx i = begin; i < end; ++i) {
853  const auto storage_lookup_result = findStorage(i);
854  const auto lhs_storage = storage_lookup_result.storage_ptr;
855  const auto off = storage_lookup_result.fixedup_entry_idx;
856  CHECK(lhs_storage);
857  if (!lhs_storage->isEmptyEntry(off)) {
858  permutation.push_back(i);
859  }
860  }
861  return permutation;
862 }
863 
864 const Permutation& ResultSet::getPermutationBuffer() const {
865  return permutation_;
866 }
867 
868 void ResultSet::parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
869  const size_t top_n,
870  const Executor* executor) {
871  auto timer = DEBUG_TIMER(__func__);
872  const size_t nthreads = cpu_threads();
873 
874  // Split permutation_ into nthreads subranges and top-sort in-place.
875  permutation_.resize(query_mem_desc_.getEntryCount());
876  std::vector<PermutationView> permutation_views(nthreads);
877  threading::task_group top_sort_threads;
878  for (auto interval : makeIntervals<PermutationIdx>(0, permutation_.size(), nthreads)) {
879  top_sort_threads.run([this,
880  &order_entries,
881  &permutation_views,
882  top_n,
883  executor,
884  parent_thread_local_ids = logger::thread_local_ids(),
885  interval] {
886  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
887  PermutationView pv(permutation_.data() + interval.begin, 0, interval.size());
888  pv = initPermutationBuffer(pv, interval.begin, interval.end);
889  const auto compare = createComparator(order_entries, pv, executor, true);
890  permutation_views[interval.index] = topPermutation(pv, top_n, compare);
891  });
892  }
893  top_sort_threads.wait();
894 
895  // In case you are considering implementing a parallel reduction, note that the
896  // ResultSetComparator constructor is O(N) in order to materialize some of the aggregate
897  // columns as necessary to perform a comparison. This cost is why reduction is chosen to
898  // be serial instead; only one more Comparator is needed below.
899 
900  // Left-copy disjoint top-sorted subranges into one contiguous range.
901  // ++++....+++.....+++++... -> ++++++++++++............
902  auto end = permutation_.begin() + permutation_views.front().size();
903  for (size_t i = 1; i < nthreads; ++i) {
904  std::copy(permutation_views[i].begin(), permutation_views[i].end(), end);
905  end += permutation_views[i].size();
906  }
907 
908  // Top sort final range.
909  PermutationView pv(permutation_.data(), end - permutation_.begin());
910  const auto compare = createComparator(order_entries, pv, executor, false);
911  pv = topPermutation(pv, top_n, compare);
912  permutation_.resize(pv.size());
913  permutation_.shrink_to_fit();
914 }
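// Worked example of the parallelTop() strategy above (illustrative numbers): with
// entryCount() == 1M, top_n == 100 and nthreads == 8, each thread top-sorts its own
// ~125K-entry subrange down to at most 100 candidates, the surviving (at most 800)
// indices are copied to the front of permutation_, and a final single-threaded
// topPermutation() selects the global top 100 from those survivors.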
915 
916 std::pair<size_t, size_t> ResultSet::getStorageIndex(const size_t entry_idx) const {
917  size_t fixedup_entry_idx = entry_idx;
918  auto entry_count = storage_->query_mem_desc_.getEntryCount();
919  const bool is_rowwise_layout = !storage_->query_mem_desc_.didOutputColumnar();
920  if (fixedup_entry_idx < entry_count) {
921  return {0, fixedup_entry_idx};
922  }
923  fixedup_entry_idx -= entry_count;
924  for (size_t i = 0; i < appended_storage_.size(); ++i) {
925  const auto& desc = appended_storage_[i]->query_mem_desc_;
926  CHECK_NE(is_rowwise_layout, desc.didOutputColumnar());
927  entry_count = desc.getEntryCount();
928  if (fixedup_entry_idx < entry_count) {
929  return {i + 1, fixedup_entry_idx};
930  }
931  fixedup_entry_idx -= entry_count;
932  }
933  UNREACHABLE() << "entry_idx = " << entry_idx << ", query_mem_desc_.getEntryCount() = "
934  << query_mem_desc_.getEntryCount();
935  return {};
936 }
937 
940 
941 ResultSet::StorageLookupResult ResultSet::findStorage(const size_t entry_idx) const {
942  auto [stg_idx, fixedup_entry_idx] = getStorageIndex(entry_idx);
943  return {stg_idx ? appended_storage_[stg_idx - 1].get() : storage_.get(),
944  fixedup_entry_idx,
945  stg_idx};
946 }
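// Worked example for getStorageIndex() / findStorage() (illustrative entry counts):
// with storage_ holding 1000 entries and two appended storages of 1000 entries each,
//   entry_idx =  250  ->  {stg_idx = 0, fixedup_entry_idx = 250}  (storage_)
//   entry_idx = 1500  ->  {stg_idx = 1, fixedup_entry_idx = 500}  (appended_storage_[0])
//   entry_idx = 2999  ->  {stg_idx = 2, fixedup_entry_idx = 999}  (appended_storage_[1])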
947 
948 template <typename BUFFER_ITERATOR_TYPE>
949 void ResultSet::ResultSetComparator<
950  BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumns() {
951  for (const auto& order_entry : order_entries_) {
952  if (is_distinct_target(result_set_->targets_[order_entry.tle_no - 1])) {
953  count_distinct_materialized_buffers_.emplace_back(
954  materializeCountDistinctColumn(order_entry));
955  }
956  }
957 }
958 
959 namespace {
960 struct IsAggKind {
961  std::vector<TargetInfo> const& targets_;
962  SQLAgg const agg_kind_;
963  IsAggKind(std::vector<TargetInfo> const& targets, SQLAgg const agg_kind)
964  : targets_(targets), agg_kind_(agg_kind) {}
965  bool operator()(Analyzer::OrderEntry const& order_entry) const {
966  return targets_[order_entry.tle_no - 1].agg_kind == agg_kind_;
967  }
968 };
969 } // namespace
970 
971 template <typename BUFFER_ITERATOR_TYPE>
972 ResultSet::ApproxQuantileBuffers ResultSet::ResultSetComparator<
973  BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumns() const {
974  ResultSet::ApproxQuantileBuffers approx_quantile_materialized_buffers;
975  for (const auto& order_entry : order_entries_) {
976  if (result_set_->targets_[order_entry.tle_no - 1].agg_kind == kAPPROX_QUANTILE) {
977  approx_quantile_materialized_buffers.emplace_back(
978  materializeApproxQuantileColumn(order_entry));
979  }
980  }
981  return approx_quantile_materialized_buffers;
982 }
983 
984 template <typename BUFFER_ITERATOR_TYPE>
985 ResultSet::ModeBuffers ResultSet::ResultSetComparator<
986  BUFFER_ITERATOR_TYPE>::materializeModeColumns() const {
987  ResultSet::ModeBuffers mode_buffers;
988  IsAggKind const is_mode(result_set_->targets_, kMODE);
989  mode_buffers.reserve(
990  std::count_if(order_entries_.begin(), order_entries_.end(), is_mode));
991  for (auto const& order_entry : order_entries_) {
992  if (is_mode(order_entry)) {
993  mode_buffers.emplace_back(materializeModeColumn(order_entry));
994  }
995  }
996  return mode_buffers;
997 }
998 
999 template <typename BUFFER_ITERATOR_TYPE>
1000 std::vector<int64_t>
1001 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumn(
1002  const Analyzer::OrderEntry& order_entry) const {
1003  const size_t num_storage_entries = result_set_->query_mem_desc_.getEntryCount();
1004  std::vector<int64_t> count_distinct_materialized_buffer(num_storage_entries);
1005  const CountDistinctDescriptor count_distinct_descriptor =
1006  result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no - 1);
1007  const size_t num_non_empty_entries = permutation_.size();
1008 
1009  const auto work = [&, parent_thread_local_ids = logger::thread_local_ids()](
1010  const size_t start, const size_t end) {
1011  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1012  for (size_t i = start; i < end; ++i) {
1013  const PermutationIdx permuted_idx = permutation_[i];
1014  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1015  const auto storage = storage_lookup_result.storage_ptr;
1016  const auto off = storage_lookup_result.fixedup_entry_idx;
1017  const auto value = buffer_itr_.getColumnInternal(
1018  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1019  count_distinct_materialized_buffer[permuted_idx] =
1020  count_distinct_set_size(value.i1, count_distinct_descriptor);
1021  }
1022  };
1023  // TODO(tlm): Allow use of tbb after we determine how to easily encapsulate the choice
1024  // between thread pool types
1025  if (single_threaded_) {
1026  work(0, num_non_empty_entries);
1027  } else {
1028  threading::task_group thread_pool;
1029  for (auto interval : makeIntervals<size_t>(0, num_non_empty_entries, cpu_threads())) {
1030  thread_pool.run([=] { work(interval.begin, interval.end); });
1031  }
1032  thread_pool.wait();
1033  }
1034  return count_distinct_materialized_buffer;
1035 }
1036 
1037 double ResultSet::calculateQuantile(quantile::TDigest* const t_digest) {
1038  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1039  CHECK(t_digest);
1040  t_digest->mergeBufferFinal();
1041  double const quantile = t_digest->quantile();
1042  return boost::math::isnan(quantile) ? NULL_DOUBLE : quantile;
1043 }
1044 
1045 template <typename BUFFER_ITERATOR_TYPE>
1046 ResultSet::ApproxQuantileBuffers::value_type
1047 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumn(
1048  const Analyzer::OrderEntry& order_entry) const {
1049  ResultSet::ApproxQuantileBuffers::value_type materialized_buffer(
1050  result_set_->query_mem_desc_.getEntryCount());
1051  const size_t size = permutation_.size();
1052  const auto work = [&, parent_thread_local_ids = logger::thread_local_ids()](
1053  const size_t start, const size_t end) {
1054  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1055  for (size_t i = start; i < end; ++i) {
1056  const PermutationIdx permuted_idx = permutation_[i];
1057  const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
1058  const auto storage = storage_lookup_result.storage_ptr;
1059  const auto off = storage_lookup_result.fixedup_entry_idx;
1060  const auto value = buffer_itr_.getColumnInternal(
1061  storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
1062  materialized_buffer[permuted_idx] =
1063  value.i1 ? calculateQuantile(reinterpret_cast<quantile::TDigest*>(value.i1))
1064  : NULL_DOUBLE;
1065  }
1066  };
1067  if (single_threaded_) {
1068  work(0, size);
1069  } else {
1070  threading::task_group thread_pool;
1071  for (auto interval : makeIntervals<size_t>(0, size, cpu_threads())) {
1072  thread_pool.run([=] { work(interval.begin, interval.end); });
1073  }
1074  thread_pool.wait();
1075  }
1076  return materialized_buffer;
1077 }
1078 
1079 namespace {
1080 // i1 is from InternalTargetValue
1081 int64_t materializeMode(int64_t const i1) {
1082  if (auto const* const agg_mode = reinterpret_cast<AggMode const*>(i1)) {
1083  if (std::optional<int64_t> const mode = agg_mode->mode()) {
1084  return *mode;
1085  }
1086  }
1087  return NULL_BIGINT;
1088 }
1089 
1090 using ModeBlockedRange = tbb::blocked_range<size_t>;
1091 } // namespace
1092 
1093 template <typename BUFFER_ITERATOR_TYPE>
1094 struct ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::ModeScatter {
1095  logger::ThreadLocalIds const parent_thread_local_ids_;
1096  ResultSetComparator const* const rsc_;
1097  Analyzer::OrderEntry const& order_entry_;
1098  ResultSet::ModeBuffers::value_type& materialized_buffer_;
1099 
1100  void operator()(ModeBlockedRange const& r) const {
1101  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids_.setNewThreadId();
1102  for (size_t i = r.begin(); i != r.end(); ++i) {
1103  PermutationIdx const permuted_idx = rsc_->permutation_[i];
1104  auto const storage_lookup_result = rsc_->result_set_->findStorage(permuted_idx);
1105  auto const storage = storage_lookup_result.storage_ptr;
1106  auto const off = storage_lookup_result.fixedup_entry_idx;
1107  auto const value = rsc_->buffer_itr_.getColumnInternal(
1108  storage->buff_, off, order_entry_.tle_no - 1, storage_lookup_result);
1109  materialized_buffer_[permuted_idx] = materializeMode(value.i1);
1110  }
1111  }
1112 };
1113 
1114 template <typename BUFFER_ITERATOR_TYPE>
1115 ResultSet::ModeBuffers::value_type
1116 ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeModeColumn(
1117  const Analyzer::OrderEntry& order_entry) const {
1118  ResultSet::ModeBuffers::value_type materialized_buffer(
1119  result_set_->query_mem_desc_.getEntryCount());
1120  ModeScatter mode_scatter{
1121  logger::thread_local_ids(), this, order_entry, materialized_buffer};
1122  if (single_threaded_) {
1123  mode_scatter(ModeBlockedRange(0, permutation_.size())); // Still has new thread_id.
1124  } else {
1125  tbb::parallel_for(ModeBlockedRange(0, permutation_.size()), mode_scatter);
1126  }
1127  return materialized_buffer;
1128 }
1129 
1130 template <typename BUFFER_ITERATOR_TYPE>
1131 bool ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::operator()(
1132  const PermutationIdx lhs,
1133  const PermutationIdx rhs) const {
1134  // NB: The compare function must define a strict weak ordering, otherwise
1135  // std::sort will trigger a segmentation fault (or corrupt memory).
1136  const auto lhs_storage_lookup_result = result_set_->findStorage(lhs);
1137  const auto rhs_storage_lookup_result = result_set_->findStorage(rhs);
1138  const auto lhs_storage = lhs_storage_lookup_result.storage_ptr;
1139  const auto rhs_storage = rhs_storage_lookup_result.storage_ptr;
1140  const auto fixedup_lhs = lhs_storage_lookup_result.fixedup_entry_idx;
1141  const auto fixedup_rhs = rhs_storage_lookup_result.fixedup_entry_idx;
1142  size_t materialized_count_distinct_buffer_idx{0};
1143  size_t materialized_approx_quantile_buffer_idx{0};
1144  size_t materialized_mode_buffer_idx{0};
1145 
1146  for (const auto& order_entry : order_entries_) {
1147  CHECK_GE(order_entry.tle_no, 1);
1148  // lhs_entry_ti and rhs_entry_ti can differ on comp_param w/ UNION of string dicts.
1149  const auto& lhs_agg_info = lhs_storage->targets_[order_entry.tle_no - 1];
1150  const auto& rhs_agg_info = rhs_storage->targets_[order_entry.tle_no - 1];
1151  const auto lhs_entry_ti = get_compact_type(lhs_agg_info);
1152  const auto rhs_entry_ti = get_compact_type(rhs_agg_info);
1153  // When lhs vs rhs doesn't matter, the lhs is used. For example:
1154  bool float_argument_input = takes_float_argument(lhs_agg_info);
1155  // Need to determine if the float value has been stored as a float
1156  // or if it has been compacted to a different (often larger, 8-byte) width;
1157  // in the distributed case the floats are actually 4 bytes.
1158  // TODO: takes_float_argument() is widely used; check whether this problem
1159  // exists elsewhere.
1160  if (lhs_entry_ti.get_type() == kFLOAT) {
1161  const auto is_col_lazy =
1162  !result_set_->lazy_fetch_info_.empty() &&
1163  result_set_->lazy_fetch_info_[order_entry.tle_no - 1].is_lazily_fetched;
1164  if (result_set_->query_mem_desc_.getPaddedSlotWidthBytes(order_entry.tle_no - 1) ==
1165  sizeof(float)) {
1166  float_argument_input =
1167  result_set_->query_mem_desc_.didOutputColumnar() ? !is_col_lazy : true;
1168  }
1169  }
1170 
1171  if (UNLIKELY(is_distinct_target(lhs_agg_info))) {
1172  CHECK_LT(materialized_count_distinct_buffer_idx,
1173  count_distinct_materialized_buffers_.size());
1174 
1175  const auto& count_distinct_materialized_buffer =
1176  count_distinct_materialized_buffers_[materialized_count_distinct_buffer_idx];
1177  const auto lhs_sz = count_distinct_materialized_buffer[lhs];
1178  const auto rhs_sz = count_distinct_materialized_buffer[rhs];
1179  ++materialized_count_distinct_buffer_idx;
1180  if (lhs_sz == rhs_sz) {
1181  continue;
1182  }
1183  return (lhs_sz < rhs_sz) != order_entry.is_desc;
1184  } else if (UNLIKELY(lhs_agg_info.agg_kind == kAPPROX_QUANTILE)) {
1185  CHECK_LT(materialized_approx_quantile_buffer_idx,
1186  approx_quantile_materialized_buffers_.size());
1187  const auto& approx_quantile_materialized_buffer =
1188  approx_quantile_materialized_buffers_[materialized_approx_quantile_buffer_idx];
1189  const auto lhs_value = approx_quantile_materialized_buffer[lhs];
1190  const auto rhs_value = approx_quantile_materialized_buffer[rhs];
1191  ++materialized_approx_quantile_buffer_idx;
1192  if (lhs_value == rhs_value) {
1193  continue;
1194  } else if (!lhs_entry_ti.get_notnull()) {
1195  if (lhs_value == NULL_DOUBLE) {
1196  return order_entry.nulls_first;
1197  } else if (rhs_value == NULL_DOUBLE) {
1198  return !order_entry.nulls_first;
1199  }
1200  }
1201  return (lhs_value < rhs_value) != order_entry.is_desc;
1202  } else if (UNLIKELY(lhs_agg_info.agg_kind == kMODE)) {
1203  CHECK_LT(materialized_mode_buffer_idx, mode_buffers_.size());
1204  auto const& mode_buffer = mode_buffers_[materialized_mode_buffer_idx++];
1205  int64_t const lhs_value = mode_buffer[lhs];
1206  int64_t const rhs_value = mode_buffer[rhs];
1207  if (lhs_value == rhs_value) {
1208  continue;
1209  // MODE(x) can only be NULL when the group is empty, since it skips null values.
1210  } else if (lhs_value == NULL_BIGINT) { // NULL_BIGINT from materializeMode()
1211  return order_entry.nulls_first;
1212  } else if (rhs_value == NULL_BIGINT) {
1213  return !order_entry.nulls_first;
1214  } else {
1215  return result_set_->isLessThan(lhs_entry_ti, lhs_value, rhs_value) !=
1216  order_entry.is_desc;
1217  }
1218  }
1219 
1220  const auto lhs_v = buffer_itr_.getColumnInternal(lhs_storage->buff_,
1221  fixedup_lhs,
1222  order_entry.tle_no - 1,
1223  lhs_storage_lookup_result);
1224  const auto rhs_v = buffer_itr_.getColumnInternal(rhs_storage->buff_,
1225  fixedup_rhs,
1226  order_entry.tle_no - 1,
1227  rhs_storage_lookup_result);
1228 
1229  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1230  isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1231  continue;
1232  }
1233  if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
1234  !isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
1235  return order_entry.nulls_first;
1236  }
1237  if (UNLIKELY(isNull(rhs_entry_ti, rhs_v, float_argument_input) &&
1238  !isNull(lhs_entry_ti, lhs_v, float_argument_input))) {
1239  return !order_entry.nulls_first;
1240  }
1241 
1242  if (LIKELY(lhs_v.isInt())) {
1243  CHECK(rhs_v.isInt());
1244  if (UNLIKELY(lhs_entry_ti.is_string() &&
1245  lhs_entry_ti.get_compression() == kENCODING_DICT)) {
1246  CHECK_EQ(4, lhs_entry_ti.get_logical_size());
1247  CHECK(executor_);
1248  const auto lhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1249  lhs_entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
1250  const auto rhs_string_dict_proxy = executor_->getStringDictionaryProxy(
1251  rhs_entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
1252  const auto lhs_str = lhs_string_dict_proxy->getString(lhs_v.i1);
1253  const auto rhs_str = rhs_string_dict_proxy->getString(rhs_v.i1);
1254  if (lhs_str == rhs_str) {
1255  continue;
1256  }
1257  return (lhs_str < rhs_str) != order_entry.is_desc;
1258  }
1259 
1260  if (lhs_v.i1 == rhs_v.i1) {
1261  continue;
1262  }
1263  if (lhs_entry_ti.is_fp()) {
1264  if (float_argument_input) {
1265  const auto lhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&lhs_v.i1));
1266  const auto rhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&rhs_v.i1));
1267  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1268  } else {
1269  const auto lhs_dval =
1270  *reinterpret_cast<const double*>(may_alias_ptr(&lhs_v.i1));
1271  const auto rhs_dval =
1272  *reinterpret_cast<const double*>(may_alias_ptr(&rhs_v.i1));
1273  return (lhs_dval < rhs_dval) != order_entry.is_desc;
1274  }
1275  }
1276  return (lhs_v.i1 < rhs_v.i1) != order_entry.is_desc;
1277  } else {
1278  if (lhs_v.isPair()) {
1279  CHECK(rhs_v.isPair());
1280  const auto lhs =
1281  pair_to_double({lhs_v.i1, lhs_v.i2}, lhs_entry_ti, float_argument_input);
1282  const auto rhs =
1283  pair_to_double({rhs_v.i1, rhs_v.i2}, rhs_entry_ti, float_argument_input);
1284  if (lhs == rhs) {
1285  continue;
1286  }
1287  return (lhs < rhs) != order_entry.is_desc;
1288  } else {
1289  CHECK(lhs_v.isStr() && rhs_v.isStr());
1290  const auto lhs = lhs_v.strVal();
1291  const auto rhs = rhs_v.strVal();
1292  if (lhs == rhs) {
1293  continue;
1294  }
1295  return (lhs < rhs) != order_entry.is_desc;
1296  }
1297  }
1298  }
1299  return false;
1300 }
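// Note on the comparison idiom used throughout operator() above: for an ascending
// ORDER BY, "lhs sorts before rhs" is simply (lhs < rhs); for a descending one the
// result is flipped, which is what (lhs_value < rhs_value) != order_entry.is_desc
// expresses without branching. Equal keys fall through (continue) to the next ORDER BY
// column, and only when every column compares equal does the function return false,
// which preserves the strict weak ordering std::sort and std::partial_sort require.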
1301 
1302 // Partial sort permutation into top (least by compare) n elements.
1303 // If permutation.size() <= n then sort entire permutation by compare.
1304 // Return PermutationView with new size() = min(n, permutation.size()).
1305 PermutationView ResultSet::topPermutation(PermutationView permutation,
1306  const size_t n,
1307  const Comparator& compare) {
1308  auto timer = DEBUG_TIMER(__func__);
1309  if (n < permutation.size()) {
1310  std::partial_sort(
1311  permutation.begin(), permutation.begin() + n, permutation.end(), compare);
1312  permutation.resize(n);
1313  } else {
1314  std::sort(permutation.begin(), permutation.end(), compare);
1315  }
1316  return permutation;
1317 }
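// Worked example for topPermutation() (illustrative): given a view over {7, 3, 9, 1, 5}
// with compare = std::less<PermutationIdx>{} and n = 3, the view is partially sorted
// and truncated to {1, 3, 5}; with n >= 5 the entire view is sorted to {1, 3, 5, 7, 9}.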
1318 
1319 void ResultSet::radixSortOnGpu(
1320  const std::list<Analyzer::OrderEntry>& order_entries) const {
1321  auto timer = DEBUG_TIMER(__func__);
1322  auto data_mgr = &catalog_->getDataMgr();
1323  const int device_id{0};
1324  auto allocator = std::make_unique<CudaAllocator>(
1325  data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
1326  CHECK_GT(block_size_, 0);
1327  CHECK_GT(grid_size_, 0);
1328  std::vector<int64_t*> group_by_buffers(block_size_);
1329  group_by_buffers[0] = reinterpret_cast<int64_t*>(storage_->getUnderlyingBuffer());
1330  auto dev_group_by_buffers =
1331  create_dev_group_by_buffers(allocator.get(),
1332  group_by_buffers,
1333  query_mem_desc_,
1334  block_size_,
1335  grid_size_,
1336  device_id,
1338  /*num_input_rows=*/-1,
1339  /*prepend_index_buffer=*/true,
1340  /*always_init_group_by_on_host=*/true,
1341  /*use_bump_allocator=*/false,
1342  /*has_varlen_output=*/false,
1343  /*insitu_allocator*=*/nullptr);
1344  inplace_sort_gpu(
1345  order_entries, query_mem_desc_, dev_group_by_buffers, data_mgr, device_id);
1346  copy_group_by_buffers_from_gpu(
1347  *allocator,
1348  group_by_buffers,
1349  query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
1350  dev_group_by_buffers.data,
1351  query_mem_desc_,
1352  block_size_,
1353  grid_size_,
1354  device_id,
1355  /*use_bump_allocator=*/false,
1356  /*has_varlen_output=*/false);
1357 }
1358 
1359 void ResultSet::radixSortOnCpu(
1360  const std::list<Analyzer::OrderEntry>& order_entries) const {
1361  auto timer = DEBUG_TIMER(__func__);
1362  CHECK(!query_mem_desc_.hasKeylessHash());
1363  std::vector<int64_t> tmp_buff(query_mem_desc_.getEntryCount());
1364  std::vector<int32_t> idx_buff(query_mem_desc_.getEntryCount());
1365  CHECK_EQ(size_t(1), order_entries.size());
1366  auto buffer_ptr = storage_->getUnderlyingBuffer();
1367  for (const auto& order_entry : order_entries) {
1368  const auto target_idx = order_entry.tle_no - 1;
1369  const auto sortkey_val_buff = reinterpret_cast<int64_t*>(
1370  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1371  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1372  sort_groups_cpu(sortkey_val_buff,
1373  &idx_buff[0],
1374  query_mem_desc_.getEntryCount(),
1375  order_entry.is_desc,
1376  chosen_bytes);
1377  apply_permutation_cpu(reinterpret_cast<int64_t*>(buffer_ptr),
1378  &idx_buff[0],
1379  query_mem_desc_.getEntryCount(),
1380  &tmp_buff[0],
1381  sizeof(int64_t));
1382  for (size_t target_idx = 0; target_idx < query_mem_desc_.getSlotCount();
1383  ++target_idx) {
1384  if (static_cast<int>(target_idx) == order_entry.tle_no - 1) {
1385  continue;
1386  }
1387  const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
1388  const auto satellite_val_buff = reinterpret_cast<int64_t*>(
1389  buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
1390  apply_permutation_cpu(satellite_val_buff,
1391  &idx_buff[0],
1392  query_mem_desc_.getEntryCount(),
1393  &tmp_buff[0],
1394  chosen_bytes);
1395  }
1396  }
1397 }
1398 
1399 size_t ResultSet::getLimit() const {
1400  return keep_first_;
1401 }
1402 
1403 const std::vector<std::string> ResultSet::getStringDictionaryPayloadCopy(
1404  const int dict_id) const {
1405  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
1406  dict_id, /*with_generation=*/true, catalog_);
1407  CHECK(sdp);
1408  return sdp->getDictionary()->copyStrings();
1409 }
1410 
1411 const std::pair<std::vector<int32_t>, std::vector<std::string>>
1412 ResultSet::getUniqueStringsForDictEncodedTargetCol(const size_t col_idx) const {
1413  const auto col_type_info = getColType(col_idx);
1414  std::unordered_set<int32_t> unique_string_ids_set;
1415  const size_t num_entries = entryCount();
1416  std::vector<bool> targets_to_skip(colCount(), true);
1417  targets_to_skip[col_idx] = false;
1418  CHECK(col_type_info.is_dict_encoded_type()); // Array<Text> or Text
1419  const int64_t null_val = inline_fixed_encoding_null_val(
1420  col_type_info.is_array() ? col_type_info.get_elem_type() : col_type_info);
1421 
1422  for (size_t row_idx = 0; row_idx < num_entries; ++row_idx) {
1423  const auto result_row = getRowAtNoTranslations(row_idx, targets_to_skip);
1424  if (!result_row.empty()) {
1425  if (const auto scalar_col_val =
1426  boost::get<ScalarTargetValue>(&result_row[col_idx])) {
1427  const int32_t string_id =
1428  static_cast<int32_t>(boost::get<int64_t>(*scalar_col_val));
1429  if (string_id != null_val) {
1430  unique_string_ids_set.emplace(string_id);
1431  }
1432  } else if (const auto array_col_val =
1433  boost::get<ArrayTargetValue>(&result_row[col_idx])) {
1434  if (*array_col_val) {
1435  for (const ScalarTargetValue& scalar : array_col_val->value()) {
1436  const int32_t string_id = static_cast<int32_t>(boost::get<int64_t>(scalar));
1437  if (string_id != null_val) {
1438  unique_string_ids_set.emplace(string_id);
1439  }
1440  }
1441  }
1442  }
1443  }
1444  }
1445 
1446  const size_t num_unique_strings = unique_string_ids_set.size();
1447  std::vector<int32_t> unique_string_ids(num_unique_strings);
1448  size_t string_idx{0};
1449  for (const auto unique_string_id : unique_string_ids_set) {
1450  unique_string_ids[string_idx++] = unique_string_id;
1451  }
1452 
1453  const int32_t dict_id = col_type_info.get_comp_param();
1454  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
1455  dict_id, /*with_generation=*/true, catalog_);
1456  CHECK(sdp);
1457 
1458  return std::make_pair(unique_string_ids, sdp->getStrings(unique_string_ids));
1459 }
1460 
1468 bool ResultSet::isDirectColumnarConversionPossible() const {
1469  if (!g_enable_direct_columnarization) {
1470  return false;
1471  } else if (query_mem_desc_.didOutputColumnar()) {
1472  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
1473  QueryDescriptionType::Projection ||
1474  query_mem_desc_.getQueryDescriptionType() ==
1475  QueryDescriptionType::TableFunction ||
1476  query_mem_desc_.getQueryDescriptionType() ==
1477  QueryDescriptionType::GroupByPerfectHash ||
1478  query_mem_desc_.getQueryDescriptionType() ==
1479  QueryDescriptionType::GroupByBaselineHash);
1480  } else {
1481  CHECK(!(query_mem_desc_.getQueryDescriptionType() ==
1482  QueryDescriptionType::TableFunction));
1483  return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
1484  QueryDescriptionType::GroupByPerfectHash ||
1485  query_mem_desc_.getQueryDescriptionType() ==
1486  QueryDescriptionType::GroupByBaselineHash);
1487  }
1488 }
1489 
1490 bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
1491  return query_mem_desc_.didOutputColumnar() &&
1492  (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection ||
1493  query_mem_desc_.getQueryDescriptionType() ==
1494  QueryDescriptionType::TableFunction) &&
1495  appended_storage_.empty() && storage_ &&
1496  (lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
1497 }
1498 
1499 const int8_t* ResultSet::getColumnarBuffer(size_t column_idx) const {
1500  CHECK(isZeroCopyColumnarConversionPossible(column_idx));
1501  return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(column_idx);
1502 }
1503 
1504 // returns a bitmap (and total number) of all single slot targets
1505 std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
1506  std::vector<bool> target_bitmap(targets_.size(), true);
1507  size_t num_single_slot_targets = 0;
1508  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1509  const auto& sql_type = targets_[target_idx].sql_type;
1510  if (targets_[target_idx].is_agg && targets_[target_idx].agg_kind == kAVG) {
1511  target_bitmap[target_idx] = false;
1512  } else if (sql_type.is_varlen()) {
1513  target_bitmap[target_idx] = false;
1514  } else {
1515  num_single_slot_targets++;
1516  }
1517  }
1518  return std::make_tuple(std::move(target_bitmap), num_single_slot_targets);
1519 }
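// Editorial example, not part of the upstream file: for a hypothetical target list
//   { COUNT(*), AVG(x), str_col (none-encoded TEXT, i.e. variable length), SUM(y) }
// the bitmap produced above would be { true, false, false, true } with a count of 2,
// since AVG spans two slots (sum and count) and variable-length targets are excluded.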
1520 
1529 std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
1530  const {
1531  CHECK(isDirectColumnarConversionPossible());
1532  auto [single_slot_targets, num_single_slot_targets] = getSingleSlotTargetBitmap();
1533 
1534  for (size_t target_idx = 0; target_idx < single_slot_targets.size(); target_idx++) {
1535  const auto& target = targets_[target_idx];
1536  if (single_slot_targets[target_idx] &&
1537  (is_distinct_target(target) ||
1538  shared::is_any<kAPPROX_QUANTILE, kMODE>(target.agg_kind) ||
1539  (target.is_agg && target.agg_kind == kSAMPLE && target.sql_type == kFLOAT))) {
1540  single_slot_targets[target_idx] = false;
1541  num_single_slot_targets--;
1542  }
1543  }
1544  CHECK_GE(num_single_slot_targets, size_t(0));
1545  return std::make_tuple(std::move(single_slot_targets), num_single_slot_targets);
1546 }
1547 
1548 // returns the starting slot index for all targets in the result set
1549 std::vector<size_t> ResultSet::getSlotIndicesForTargetIndices() const {
1550  std::vector<size_t> slot_indices(targets_.size(), 0);
1551  size_t slot_index = 0;
1552  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
1553  slot_indices[target_idx] = slot_index;
1554  slot_index = advance_slot(slot_index, targets_[target_idx], false);
1555  }
1556  return slot_indices;
1557 }
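// Editorial example, not part of the upstream file: continuing the hypothetical target
// list { COUNT(*), AVG(x), SUM(y) }, advance_slot() moves by one slot for COUNT and SUM
// but by two for AVG, so the starting-slot indices returned here would be { 0, 1, 3 }.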
1558 
1559 // namespace result_set
1560 
1561 bool result_set::can_use_parallel_algorithms(const ResultSet& rows) {
1562  return !rows.isTruncated();
1563 }
1564 
1565 namespace {
1566 struct IsDictEncodedStr {
1567  bool operator()(TargetInfo const& target_info) const {
1568  return target_info.sql_type.is_dict_encoded_string();
1569  }
1570 };
1571 } // namespace
1572 
1573 std::optional<size_t> result_set::first_dict_encoded_idx(
1574  std::vector<TargetInfo> const& targets) {
1575  auto const itr = std::find_if(targets.begin(), targets.end(), IsDictEncodedStr{});
1576  return itr == targets.end() ? std::nullopt
1577  : std::make_optional<size_t>(itr - targets.begin());
1578 }
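// Editorial sketch, not part of the upstream file: a typical use of the lookup above is
// to decide whether any dictionary translation work is needed at all before touching
// string dictionary proxies. The helper name is hypothetical.
namespace {
[[maybe_unused]] bool has_dict_encoded_target(std::vector<TargetInfo> const& targets) {
  return result_set::first_dict_encoded_idx(targets).has_value();
}
}  // namespace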
1579 
1580 bool result_set::use_parallel_algorithms(const ResultSet& rows) {
1581  return result_set::can_use_parallel_algorithms(rows) && rows.entryCount() >= 20000;
1582 }
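// Editorial note, not part of the upstream file: the two predicates above are layered.
// can_use_parallel_algorithms() is the correctness gate (a truncated result set must be
// walked sequentially), while use_parallel_algorithms() adds the >= 20,000-entry
// heuristic so that small result sets do not pay the threading overhead.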