void ResultSet::keepFirstN(const size_t n) {
  invalidateCachedRowCount();
  keep_first_ = n;
}

void ResultSet::dropFirstN(const size_t n) {
  invalidateCachedRowCount();
  drop_first_ = n;
}

ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
                     const ExecutorDeviceType device_type,
                     const QueryMemoryDescriptor& query_mem_desc,
                     const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                     const Catalog_Namespace::Catalog* catalog,
                     const unsigned block_size,
                     const unsigned grid_size)
    : targets_(targets)
    , device_type_(device_type)
    , query_mem_desc_(query_mem_desc)
    , crt_row_buff_idx_(0)
    , row_set_mem_owner_(row_set_mem_owner)
    , catalog_(catalog)
    , block_size_(block_size)
    , grid_size_(grid_size)
    , separate_varlen_storage_valid_(false)
    , just_explain_(false)
    , for_validation_only_(false)
    , can_use_speculative_top_n_sort(std::nullopt) {}
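// The constructors in this file differ only in how much execution state they
// carry (lazy-fetch metadata, estimator buffers, or just an explanation
// string). A minimal usage sketch; the executor, descriptor and owner objects
// are assumed to come from the surrounding query-runner plumbing and are not
// defined here:
//
//   auto rs = std::make_shared<ResultSet>(targets,
//                                         ExecutorDeviceType::CPU,
//                                         query_mem_desc,
//                                         row_set_mem_owner,
//                                         executor->getCatalog(),
//                                         executor->blockSize(),
//                                         executor->gridSize());
//   rs->allocateStorage();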
ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
                     const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
                     const std::vector<std::vector<const int8_t*>>& col_buffers,
                     const std::vector<std::vector<int64_t>>& frag_offsets,
                     const std::vector<int64_t>& consistent_frag_sizes,
                     const ExecutorDeviceType device_type,
                     const int device_id,
                     const QueryMemoryDescriptor& query_mem_desc,
                     const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                     const Catalog_Namespace::Catalog* catalog,
                     const unsigned block_size,
                     const unsigned grid_size)
    : targets_(targets)
    , device_type_(device_type)
    , device_id_(device_id)
    , query_mem_desc_(query_mem_desc)
    , crt_row_buff_idx_(0)
    , row_set_mem_owner_(row_set_mem_owner)
    , catalog_(catalog)
    , block_size_(block_size)
    , grid_size_(grid_size)
    , lazy_fetch_info_(lazy_fetch_info)
    , col_buffers_{col_buffers}
    , frag_offsets_{frag_offsets}
    , consistent_frag_sizes_{consistent_frag_sizes}
    , separate_varlen_storage_valid_(false)
    , just_explain_(false)
    , for_validation_only_(false)
    , geo_return_type_(GeoReturnType::WktString)
    , query_exec_time_(0)
    , can_use_speculative_top_n_sort(std::nullopt) {}
ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
                     const ExecutorDeviceType device_type,
                     const int device_id,
                     Data_Namespace::DataMgr* data_mgr)
    : device_type_(device_type)
    , device_id_(device_id)
    , crt_row_buff_idx_(0)
    , estimator_(estimator)
    , data_mgr_(data_mgr)
    , separate_varlen_storage_valid_(false)
    , just_explain_(false)
    , for_validation_only_(false)
    , geo_return_type_(GeoReturnType::WktString)
    , query_exec_time_(0)
    , can_use_speculative_top_n_sort(std::nullopt) {
  if (device_type == ExecutorDeviceType::GPU) {
    device_estimator_buffer_ = CudaAllocator::allocGpuAbstractBuffer(
        data_mgr_, estimator_->getBufferSize(), device_id_);
    data_mgr->getCudaMgr()->zeroDeviceMem(device_estimator_buffer_->getMemoryPtr(),
                                          estimator_->getBufferSize(),
                                          device_id_,
                                          getQueryEngineCudaStreamForDevice(device_id_));
  } else {
    host_estimator_buffer_ =
        static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
  }
}
ResultSet::ResultSet(const std::string& explanation)
    : device_type_(ExecutorDeviceType::CPU)
    , separate_varlen_storage_valid_(false)
    , explanation_(explanation)
    , just_explain_(true)
    , for_validation_only_(false)
    , query_exec_time_(0)
    , can_use_speculative_top_n_sort(std::nullopt) {}
ResultSet::ResultSet(int64_t queue_time_ms,
                     int64_t render_time_ms,
                     const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
    : device_type_(ExecutorDeviceType::CPU)
    , row_set_mem_owner_(row_set_mem_owner)
    , timings_(QueryExecutionTimings{queue_time_ms, render_time_ms, 0, 0})
    , separate_varlen_storage_valid_(false)
    , just_explain_(true)
    , for_validation_only_(false)
    , geo_return_type_(GeoReturnType::WktString)
    , query_exec_time_(0)
    , can_use_speculative_top_n_sort(std::nullopt) {}
ResultSet::~ResultSet() {
  if (storage_) {
    if (!storage_->buff_is_provided_) {
      CHECK(storage_->getUnderlyingBuffer());
      free(storage_->getUnderlyingBuffer());
    }
  }
  for (auto& storage : appended_storage_) {
    if (storage && !storage->buff_is_provided_) {
      free(storage->getUnderlyingBuffer());
    }
  }
  if (host_estimator_buffer_) {
    free(host_estimator_buffer_);
  }
  if (device_estimator_buffer_) {
    CHECK(data_mgr_);
    data_mgr_->free(device_estimator_buffer_);
  }
}
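// Ownership note for the destructor above: storage buffers handed out by the
// RowSetMemoryOwner are marked as "provided" and released by that owner, so
// only buffers the ResultSet malloc'd itself (plus the host/device estimator
// buffers) are freed here.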
std::string ResultSet::summaryToString() const {
  std::ostringstream oss;
  oss << "Result Set Info" << std::endl;
  oss << "\tLayout: " << query_mem_desc_.queryDescTypeToString() << std::endl;
  oss << "\tColumns: " << colCount() << std::endl;
  oss << "\tRows: " << rowCount() << std::endl;
  oss << "\tEntry count: " << entryCount() << std::endl;
  const std::string is_empty = isEmpty() ? "True" : "False";
  oss << "\tIs empty: " << is_empty << std::endl;
  const std::string did_output_columnar = didOutputColumnar() ? "True" : "False";
  oss << "\tColumnar: " << did_output_columnar << std::endl;
  oss << "\tLazy-fetched columns: " << getNumColumnsLazyFetched() << std::endl;
  const std::string is_direct_columnar_conversion_possible =
      isDirectColumnarConversionPossible() ? "True" : "False";
  oss << "\tDirect columnar conversion possible: "
      << is_direct_columnar_conversion_possible << std::endl;

  size_t num_columns_zero_copy_columnarizable{0};
  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
    if (isZeroCopyColumnarConversionPossible(target_idx)) {
      num_columns_zero_copy_columnarizable++;
    }
  }
  oss << "\tZero-copy columnar conversion columns: "
      << num_columns_zero_copy_columnarizable << std::endl;

  oss << "\tPermutation size: " << permutation_.size() << std::endl;
  oss << "\tLimit: " << keep_first_ << std::endl;
  oss << "\tOffset: " << drop_first_ << std::endl;
  return oss.str();
}
const ResultSetStorage* ResultSet::allocateStorage() const {
  CHECK(!storage_);
  CHECK(row_set_mem_owner_);
  auto buff = row_set_mem_owner_->allocate(
      query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
  storage_.reset(
      new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
  return storage_.get();
}

const ResultSetStorage* ResultSet::allocateStorage(
    int8_t* buff,
    const std::vector<int64_t>& target_init_vals,
    std::shared_ptr<VarlenOutputInfo> varlen_output_info) const {
  CHECK(buff);
  storage_.reset(new ResultSetStorage(targets_, query_mem_desc_, buff, true));
  storage_->target_init_vals_ = target_init_vals;
  if (varlen_output_info) {
    storage_->varlen_output_info_ = varlen_output_info;
  }
  return storage_.get();
}
const ResultSetStorage* ResultSet::allocateStorage(
    const std::vector<int64_t>& target_init_vals) const {
  CHECK(!storage_);
  CHECK(row_set_mem_owner_);
  auto buff = row_set_mem_owner_->allocate(
      query_mem_desc_.getBufferSizeBytes(device_type_), /*thread_idx=*/0);
  storage_.reset(
      new ResultSetStorage(targets_, query_mem_desc_, buff, /*buff_is_provided=*/true));
  storage_->target_init_vals_ = target_init_vals;
  return storage_.get();
}
size_t ResultSet::getCurrentRowBufferIndex() const {
  if (crt_row_buff_idx_ == 0) {
    throw std::runtime_error("current row buffer iteration index is undefined");
  }
  return crt_row_buff_idx_ - 1;
}
void ResultSet::append(ResultSet& that) {
  invalidateCachedRowCount();
  if (!that.storage_) {
    return;
  }
  appended_storage_.push_back(std::move(that.storage_));
  query_mem_desc_.setEntryCount(
      query_mem_desc_.getEntryCount() +
      appended_storage_.back()->query_mem_desc_.getEntryCount());
  chunks_.insert(chunks_.end(), that.chunks_.begin(), that.chunks_.end());
  col_buffers_.insert(
      col_buffers_.end(), that.col_buffers_.begin(), that.col_buffers_.end());
  frag_offsets_.insert(
      frag_offsets_.end(), that.frag_offsets_.begin(), that.frag_offsets_.end());
  consistent_frag_sizes_.insert(consistent_frag_sizes_.end(),
                                that.consistent_frag_sizes_.begin(),
                                that.consistent_frag_sizes_.end());
  chunk_iters_.insert(
      chunk_iters_.end(), that.chunk_iters_.begin(), that.chunk_iters_.end());
  if (separate_varlen_storage_valid_) {
    CHECK(that.separate_varlen_storage_valid_);
    serialized_varlen_buffer_.insert(serialized_varlen_buffer_.end(),
                                     that.serialized_varlen_buffer_.begin(),
                                     that.serialized_varlen_buffer_.end());
  }
  for (auto& buff : that.literal_buffers_) {
    literal_buffers_.push_back(std::move(buff));
  }
}
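// append() merges another ResultSet's storage into this one: the entry count
// grows by the appended storage's entry count and the chunk / fragment
// bookkeeping is concatenated. A minimal sketch of the intended use (the
// per_device_results container is illustrative, not from this file):
//
//   ResultSetPtr merged = per_device_results.front();
//   for (size_t i = 1; i < per_device_results.size(); ++i) {
//     merged->append(*per_device_results[i]);
//   }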
ResultSetPtr ResultSet::copy() {
  auto timer = DEBUG_TIMER(__func__);
  if (!storage_) {
    return nullptr;
  }
  auto executor = getExecutor();
  CHECK(executor);
  ResultSetPtr copied_rs = std::make_shared<ResultSet>(targets_,
                                                       device_type_,
                                                       query_mem_desc_,
                                                       row_set_mem_owner_,
                                                       executor->getCatalog(),
                                                       executor->blockSize(),
                                                       executor->gridSize());

  auto allocate_and_copy_storage =
      [&](const ResultSetStorage* prev_storage) -> std::unique_ptr<ResultSetStorage> {
    const auto& prev_qmd = prev_storage->query_mem_desc_;
    const auto storage_size = prev_qmd.getBufferSizeBytes(device_type_);
    auto buff = row_set_mem_owner_->allocate(storage_size, /*thread_idx=*/0);
    std::unique_ptr<ResultSetStorage> new_storage;
    new_storage.reset(new ResultSetStorage(
        prev_storage->targets_, prev_qmd, buff, /*buff_is_provided=*/true));
    new_storage->target_init_vals_ = prev_storage->target_init_vals_;
    if (prev_storage->varlen_output_info_) {
      new_storage->varlen_output_info_ = prev_storage->varlen_output_info_;
    }
    memcpy(new_storage->buff_, prev_storage->buff_, storage_size);
    new_storage->query_mem_desc_ = prev_qmd;
    return new_storage;
  };

  copied_rs->storage_ = allocate_and_copy_storage(storage_.get());
  if (!appended_storage_.empty()) {
    for (const auto& storage : appended_storage_) {
      copied_rs->appended_storage_.push_back(allocate_and_copy_storage(storage.get()));
    }
  }
  std::copy(chunks_.begin(), chunks_.end(), std::back_inserter(copied_rs->chunks_));
  std::copy(chunk_iters_.begin(),
            chunk_iters_.end(),
            std::back_inserter(copied_rs->chunk_iters_));
  std::copy(col_buffers_.begin(),
            col_buffers_.end(),
            std::back_inserter(copied_rs->col_buffers_));
  std::copy(frag_offsets_.begin(),
            frag_offsets_.end(),
            std::back_inserter(copied_rs->frag_offsets_));
  std::copy(consistent_frag_sizes_.begin(),
            consistent_frag_sizes_.end(),
            std::back_inserter(copied_rs->consistent_frag_sizes_));
  if (separate_varlen_storage_valid_) {
    std::copy(serialized_varlen_buffer_.begin(),
              serialized_varlen_buffer_.end(),
              std::back_inserter(copied_rs->serialized_varlen_buffer_));
  }
  std::copy(literal_buffers_.begin(),
            literal_buffers_.end(),
            std::back_inserter(copied_rs->literal_buffers_));
  std::copy(lazy_fetch_info_.begin(),
            lazy_fetch_info_.end(),
            std::back_inserter(copied_rs->lazy_fetch_info_));

  copied_rs->permutation_ = permutation_;
  copied_rs->drop_first_ = drop_first_;
  copied_rs->keep_first_ = keep_first_;
  copied_rs->separate_varlen_storage_valid_ = separate_varlen_storage_valid_;
  copied_rs->query_exec_time_ = query_exec_time_;
  copied_rs->input_table_keys_ = input_table_keys_;
  copied_rs->target_meta_info_ = target_meta_info_;
  copied_rs->geo_return_type_ = geo_return_type_;
  copied_rs->query_plan_ = query_plan_;
  if (can_use_speculative_top_n_sort) {
    copied_rs->can_use_speculative_top_n_sort = can_use_speculative_top_n_sort;
  }

  return copied_rs;
}
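// copy() produces a deep copy: each storage buffer is re-allocated through the
// RowSetMemoryOwner and memcpy'd, while chunk handles, fragment offsets and
// metadata are copied by value, so the clone can outlive mutations of (or a
// sort applied to) the original result set.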
const ResultSetStorage* ResultSet::getStorage() const {
  return storage_.get();
}

size_t ResultSet::colCount() const {
  return just_explain_ ? 1 : targets_.size();
}
SQLTypeInfo ResultSet::getColType(const size_t col_idx) const {
  CHECK_LT(col_idx, targets_.size());
  return targets_[col_idx].agg_kind == kAVG ? SQLTypeInfo(kDOUBLE, false)
                                            : targets_[col_idx].sql_type;
}
StringDictionaryProxy* ResultSet::getStringDictionaryProxy(int const dict_id) const {
  constexpr bool with_generation = true;
  return catalog_ ? row_set_mem_owner_->getOrAddStringDictProxy(
                        dict_id, with_generation, catalog_)
                  : row_set_mem_owner_->getStringDictProxy(dict_id);
}
// ResultSet::CellCallback: rewrites a dictionary-encoded string id in place
// through the proxy id map (used by eachCellInColumn below).
void operator()(int8_t const* const cell_ptr) const {
  using StringId = int32_t;
  StringId* const string_id_ptr =
      const_cast<StringId*>(reinterpret_cast<StringId const*>(cell_ptr));
  if (*string_id_ptr != null_int_) {
    *string_id_ptr = id_map_[*string_id_ptr];
  }
}
void ResultSet::translateDictEncodedColumns(std::vector<TargetInfo> const& targets,
                                            size_t const start_idx) {
  if (storage_) {
    CHECK_EQ(targets.size(), storage_->targets_.size());
    RowIterationState state;
    for (size_t target_idx = start_idx; target_idx < targets.size(); ++target_idx) {
      auto const& type_lhs = targets[target_idx].sql_type;
      if (type_lhs.is_dict_encoded_string()) {
        auto& type_rhs =
            const_cast<SQLTypeInfo&>(storage_->targets_[target_idx].sql_type);
        CHECK(type_rhs.is_dict_encoded_string());
        if (type_lhs.get_comp_param() != type_rhs.get_comp_param()) {
          auto* const sdp_lhs = getStringDictionaryProxy(type_lhs.get_comp_param());
          CHECK(sdp_lhs);
          auto const* const sdp_rhs = getStringDictionaryProxy(type_rhs.get_comp_param());
          CHECK(sdp_rhs);
          state.cur_target_idx_ = target_idx;
          CellCallback const translate_string_ids(sdp_lhs->transientUnion(*sdp_rhs),
                                                  inline_int_null_val(type_rhs));
          eachCellInColumn(state, translate_string_ids);
          type_rhs.set_comp_param(type_lhs.get_comp_param());
        }
      }
    }
  }
}
void ResultSet::eachCellInColumn(RowIterationState& state, CellCallback const& func) {
  size_t const target_idx = state.cur_target_idx_;
  QueryMemoryDescriptor& storage_qmd = storage_->query_mem_desc_;
  CHECK_LT(target_idx, lazy_fetch_info_.size());
  auto& col_lazy_fetch = lazy_fetch_info_[target_idx];
  CHECK(col_lazy_fetch.is_lazily_fetched);
  int const target_size = storage_->targets_[target_idx].sql_type.get_size();
  CHECK_LT(0, target_size) << storage_->targets_[target_idx].toString();
  size_t const nrows = storage_->binSearchRowCount();
  if (storage_qmd.didOutputColumnar()) {
    // Advance the iteration state one target slot at a time until it points at
    // target_idx, then visit each of the nrows cells of that column.
    if (state.buf_ptr_ == nullptr) {
      state.buf_ptr_ = get_cols_ptr(storage_->buff_, storage_qmd);
      state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
                               ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
                               : query_mem_desc_.getEffectiveKeyWidth();
    }
    for (size_t j = state.prev_target_idx_; j < target_idx; ++j) {
      size_t const next_target_idx = j + 1;
      state.buf_ptr_ = advance_to_next_columnar_target_buff(
          state.buf_ptr_, storage_qmd, state.agg_idx_);
      auto const& next_agg_info = storage_->targets_[next_target_idx];
      state.agg_idx_ =
          advance_slot(state.agg_idx_, next_agg_info, separate_varlen_storage_valid_);
      state.compact_sz1_ = storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
                               ? storage_qmd.getPaddedSlotWidthBytes(state.agg_idx_)
                               : query_mem_desc_.getEffectiveKeyWidth();
    }
    for (size_t i = 0; i < nrows; ++i) {
      int64_t const pos = read_int_from_buff(state.buf_ptr_, state.compact_sz1_);
      CHECK_GE(pos, 0);
      auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
      CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
      int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
      func(col_frag + pos * target_size);
      state.buf_ptr_ += state.compact_sz1_;
    }
  } else {
    size_t const key_bytes_with_padding =
        align_to_int64(get_key_bytes_rowwise(storage_qmd));
    for (size_t i = 0; i < nrows; ++i) {
      int8_t const* const keys_ptr = row_ptr_rowwise(storage_->buff_, storage_qmd, i);
      int8_t const* const rowwise_target_ptr = keys_ptr + key_bytes_with_padding;
      int64_t pos = *reinterpret_cast<int64_t const*>(rowwise_target_ptr);
      auto& frag_col_buffers = getColumnFrag(0, target_idx, pos);
      CHECK_LT(size_t(col_lazy_fetch.local_col_id), frag_col_buffers.size());
      int8_t const* const col_frag = frag_col_buffers[col_lazy_fetch.local_col_id];
      func(col_frag + pos * target_size);
    }
  }
}
size_t get_truncated_row_count(size_t total_row_count, size_t limit, size_t offset) {
  if (total_row_count < offset) {
    return 0;
  }
  size_t total_truncated_row_count = total_row_count - offset;
  if (limit) {
    return std::min(total_truncated_row_count, limit);
  }
  return total_truncated_row_count;
}
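// Worked example of the truncation arithmetic above: with total_row_count =
// 100, offset = 30 and limit = 50, the offset leaves 70 rows and the limit
// caps the result at 50. With limit = 0 (no LIMIT clause) all 70 remaining
// rows are kept, and an offset past the end yields 0.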
size_t ResultSet::rowCountImpl(const bool force_parallel) const {
  if (!permutation_.empty()) {
    // keep_first_ / drop_first_ correspond to SQL LIMIT / OFFSET.
    return get_truncated_row_count(permutation_.size(), keep_first_, drop_first_);
  }
  if (!storage_) {
    return 0;
  }
  CHECK(permutation_.empty());
  if (query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection) {
    return binSearchRowCount();
  }
  constexpr size_t auto_parallel_row_count_threshold{20000UL};
  if (force_parallel || entryCount() >= auto_parallel_row_count_threshold) {
    return parallelRowCount();
  }
  std::lock_guard<std::mutex> lock(row_iteration_mutex_);
  moveToBegin();
  size_t row_count{0};
  while (true) {
    auto crt_row = getNextRowUnlocked(false, false);
    if (crt_row.empty()) {
      break;
    }
    ++row_count;
  }
  moveToBegin();
  return row_count;
}

size_t ResultSet::rowCount(const bool force_parallel) const {
  const int64_t cached_row_count = cached_row_count_;
  if (cached_row_count != uninitialized_cached_row_count) {
    return cached_row_count;
  }
  setCachedRowCount(rowCountImpl(force_parallel));
  return cached_row_count_;
}
void ResultSet::setCachedRowCount(const size_t row_count) const {
  const int64_t signed_row_count = static_cast<int64_t>(row_count);
  const int64_t old_cached_row_count = cached_row_count_.exchange(signed_row_count);
  CHECK(old_cached_row_count == uninitialized_cached_row_count ||
        old_cached_row_count == signed_row_count);
}
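// cached_row_count_ is an atomic initialized to the sentinel
// uninitialized_cached_row_count; rowCount() publishes the first computed
// value through setCachedRowCount() and any later store must agree with it.
// Illustrative call sequence (assumed, not from this file):
//
//   rs->invalidateCachedRowCount();   // e.g. after append()
//   const size_t n = rs->rowCount();  // recomputed and re-cached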
size_t ResultSet::binSearchRowCount() const {
  if (!storage_) {
    return 0;
  }
  size_t row_count = storage_->binSearchRowCount();
  for (auto& s : appended_storage_) {
    row_count += s->binSearchRowCount();
  }
  return row_count;
}

size_t ResultSet::parallelRowCount() const {
  using namespace threading;
  auto execute_parallel_row_count = [this](const blocked_range<size_t>& r,
                                           size_t row_count) {
    for (size_t i = r.begin(); i < r.end(); ++i) {
      if (!isRowAtEmpty(i)) {
        ++row_count;
      }
    }
    return row_count;
  };
  const auto row_count = parallel_reduce(blocked_range<size_t>(0, entryCount()),
                                         size_t(0),
                                         execute_parallel_row_count,
                                         std::plus<size_t>());
  return get_truncated_row_count(row_count, getLimit(), drop_first_);
}

bool ResultSet::isEmpty() const {
  // True iff the result set contains no valid (non-empty) entries.
  return rowCount() == size_t(0);
}
bool ResultSet::definitelyHasNoRows() const {
  return (!storage_ && !estimator_ && !just_explain_) || cached_row_count_ == 0;
}

const QueryMemoryDescriptor& ResultSet::getQueryMemDesc() const {
  CHECK(storage_);
  return storage_->query_mem_desc_;
}

const std::vector<int64_t>& ResultSet::getTargetInitVals() const {
  CHECK(storage_);
  return storage_->target_init_vals_;
}
int8_t* ResultSet::getDeviceEstimatorBuffer() const {
  CHECK(device_estimator_buffer_);
  return device_estimator_buffer_->getMemoryPtr();
}

int8_t* ResultSet::getHostEstimatorBuffer() const {
  return host_estimator_buffer_;
}
void ResultSet::syncEstimatorBuffer() const {
  CHECK(!host_estimator_buffer_);
  CHECK_EQ(size_t(0), estimator_->getBufferSize() % sizeof(int64_t));
  host_estimator_buffer_ =
      static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
  CHECK(device_estimator_buffer_);
  auto device_buffer_ptr = device_estimator_buffer_->getMemoryPtr();
  auto allocator = std::make_unique<CudaAllocator>(
      data_mgr_, device_id_, getQueryEngineCudaStreamForDevice(device_id_));
  allocator->copyFromDevice(
      host_estimator_buffer_, device_buffer_ptr, estimator_->getBufferSize());
}
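// syncEstimatorBuffer() is the GPU-side path for reading back an estimator
// result: it callocs a host mirror of the device buffer and copies the device
// contents into it. Hedged usage sketch (the estimator ResultSet is assumed to
// come from an estimator query launched on GPU):
//
//   rs->syncEstimatorBuffer();
//   const int8_t* estimator_bitmap = rs->getHostEstimatorBuffer();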
void ResultSet::setQueueTime(const int64_t queue_time) {
  timings_.executor_queue_time = queue_time;
}

void ResultSet::setKernelQueueTime(const int64_t kernel_queue_time) {
  timings_.kernel_queue_time = kernel_queue_time;
}

void ResultSet::addCompilationQueueTime(const int64_t compilation_queue_time) {
  timings_.compilation_queue_time += compilation_queue_time;
}

int64_t ResultSet::getQueueTime() const {
  return timings_.executor_queue_time + timings_.kernel_queue_time +
         timings_.compilation_queue_time;
}

int64_t ResultSet::getRenderTime() const {
  return timings_.render_time;
}

void ResultSet::moveToBegin() const {
  crt_row_buff_idx_ = 0;
  fetched_so_far_ = 0;
}

bool ResultSet::isTruncated() const {
  return keep_first_ + drop_first_;
}

bool ResultSet::isExplain() const {
  return just_explain_;
}

void ResultSet::setValidationOnlyRes() {
  for_validation_only_ = true;
}

bool ResultSet::isValidationOnlyRes() const {
  return for_validation_only_;
}
QueryMemoryDescriptor ResultSet::fixupQueryMemoryDescriptor(
    const QueryMemoryDescriptor& query_mem_desc) {
  auto query_mem_desc_copy = query_mem_desc;
  query_mem_desc_copy.resetGroupColWidths(
      std::vector<int8_t>(query_mem_desc_copy.getGroupbyColCount(), 8));
  if (query_mem_desc.didOutputColumnar()) {
    return query_mem_desc_copy;
  }
  query_mem_desc_copy.alignPaddedSlots();
  return query_mem_desc_copy;
}
void ResultSet::sort(const std::list<Analyzer::OrderEntry>& order_entries,
                     size_t top_n,
                     const Executor* executor) {
  auto timer = DEBUG_TIMER(__func__);
  if (!storage_) {
    return;
  }
  invalidateCachedRowCount();
  CHECK(!targets_.empty());
  if (canUseFastBaselineSort(order_entries, top_n)) {
    baselineSort(order_entries, top_n, executor);
    return;
  }
  if (query_mem_desc_.sortOnGpu()) {
    try {
      radixSortOnGpu(order_entries);
    } catch (const OutOfMemory&) {
      LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
      radixSortOnCpu(order_entries);
    } catch (const std::bad_alloc&) {
      LOG(WARNING) << "Out of GPU memory during sort, finish on CPU";
      radixSortOnCpu(order_entries);
    }
    return;
  }
  // Not strictly required, but allows the index buffer to stay 32-bit.
  if (query_mem_desc_.getEntryCount() > std::numeric_limits<uint32_t>::max()) {
    throw std::runtime_error("Sorting more than 4B elements not supported");
  }

  CHECK(permutation_.empty());

  if (top_n && g_parallel_top_min < entryCount()) {
    if (g_enable_watchdog && g_parallel_top_max < entryCount()) {
      throw WatchdogException("Sorting the result would be too slow");
    }
    parallelTop(order_entries, top_n, executor);
  } else {
    if (g_enable_watchdog && Executor::baseline_threshold < entryCount()) {
      throw WatchdogException("Sorting the result would be too slow");
    }
    permutation_.resize(query_mem_desc_.getEntryCount());
    // PermutationView shares a common API with parallelTop().
    PermutationView pv(permutation_.data(), 0, permutation_.size());
    pv = initPermutationBuffer(pv, 0, permutation_.size());
    if (top_n == 0) {
      top_n = pv.size();  // top_n == 0 means a full sort
    }
    pv = topPermutation(pv, top_n, createComparator(order_entries, pv, executor, false));
    if (pv.size() < permutation_.size()) {
      permutation_.resize(pv.size());
      permutation_.shrink_to_fit();
    }
  }
}
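// Sorting never reorders the underlying result buffers; it fills permutation_
// with the indices of non-empty entries and orders that index vector. A
// compact sketch of the strategy selection above (illustrative, simplified):
//
//   GPU-sortable layout            -> radixSortOnGpu (CPU fallback on OOM)
//   large entry count with top_n   -> parallelTop (per-thread top-n + merge)
//   otherwise                      -> initPermutationBuffer + topPermutation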
void ResultSet::baselineSort(const std::list<Analyzer::OrderEntry>& order_entries,
                             const size_t top_n,
                             const Executor* executor) {
  // With a single GPU, a multi-threaded CPU sort is usually faster.
  if (getGpuCount() > 1) {
    try {
      doBaselineSort(ExecutorDeviceType::GPU, order_entries, top_n, executor);
    } catch (...) {
      doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
    }
  } else {
    doBaselineSort(ExecutorDeviceType::CPU, order_entries, top_n, executor);
  }
}
// Append the indices of non-empty entries in [begin, end) to the permutation.
PermutationView ResultSet::initPermutationBuffer(PermutationView permutation,
                                                 PermutationIdx const begin,
                                                 PermutationIdx const end) const {
  for (PermutationIdx i = begin; i < end; ++i) {
    const auto storage_lookup_result = findStorage(i);
    const auto lhs_storage = storage_lookup_result.storage_ptr;
    const auto off = storage_lookup_result.fixedup_entry_idx;
    CHECK(lhs_storage);
    if (!lhs_storage->isEmptyEntry(off)) {
      permutation.push_back(i);
    }
  }
  return permutation;
}
void ResultSet::parallelTop(const std::list<Analyzer::OrderEntry>& order_entries,
                            const size_t top_n,
                            const Executor* executor) {
  auto timer = DEBUG_TIMER(__func__);
  const size_t nthreads = cpu_threads();

  // Split permutation_ into nthreads subranges and top-sort each in parallel.
  permutation_.resize(query_mem_desc_.getEntryCount());
  std::vector<PermutationView> permutation_views(nthreads);
  threading::task_group top_sort_threads;
  for (auto interval : makeIntervals<PermutationIdx>(0, permutation_.size(), nthreads)) {
    top_sort_threads.run(
        [this, &order_entries, &permutation_views, top_n, executor, interval] {
          PermutationView pv(permutation_.data() + interval.begin, 0, interval.size());
          pv = initPermutationBuffer(pv, interval.begin, interval.end);
          const auto compare = createComparator(order_entries, pv, executor, true);
          permutation_views[interval.index] = topPermutation(pv, top_n, compare);
        });
  }
  top_sort_threads.wait();

  // The subranges may have gaps (empty entries were skipped), so pack the
  // per-thread results into one contiguous prefix of permutation_.
  auto end = permutation_.begin() + permutation_views.front().size();
  for (size_t i = 1; i < nthreads; ++i) {
    std::copy(permutation_views[i].begin(), permutation_views[i].end(), end);
    end += permutation_views[i].size();
  }

  // Top-sort the merged prefix as a whole.
  PermutationView pv(permutation_.data(), end - permutation_.begin());
  const auto compare = createComparator(order_entries, pv, executor, false);
  pv = topPermutation(pv, top_n, compare);
  permutation_.resize(pv.size());
  permutation_.shrink_to_fit();
}
std::pair<size_t, size_t> ResultSet::getStorageIndex(const size_t entry_idx) const {
  size_t fixedup_entry_idx = entry_idx;
  auto entry_count = storage_->query_mem_desc_.getEntryCount();
  const bool is_rowwise_layout = !storage_->query_mem_desc_.didOutputColumnar();
  if (fixedup_entry_idx < entry_count) {
    return {0, fixedup_entry_idx};
  }
  fixedup_entry_idx -= entry_count;
  for (size_t i = 0; i < appended_storage_.size(); ++i) {
    const auto& desc = appended_storage_[i]->query_mem_desc_;
    CHECK_NE(is_rowwise_layout, desc.didOutputColumnar());
    entry_count = desc.getEntryCount();
    if (fixedup_entry_idx < entry_count) {
      return {i + 1, fixedup_entry_idx};
    }
    fixedup_entry_idx -= entry_count;
  }
  UNREACHABLE() << "entry_idx = " << entry_idx << ", query_mem_desc_.getEntryCount() = "
                << query_mem_desc_.getEntryCount();
  return {};
}
ResultSet::StorageLookupResult ResultSet::findStorage(const size_t entry_idx) const {
  auto [stg_idx, fixedup_entry_idx] = getStorageIndex(entry_idx);
  return {stg_idx ? appended_storage_[stg_idx - 1].get() : storage_.get(),
          fixedup_entry_idx,
          stg_idx};
}
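// Entry indices are global across the head storage and all appended storages.
// Illustrative example (entry counts assumed): with storages of 1000, 1000 and
// 500 entries, entry_idx 2200 resolves to storage index 2 (the second appended
// storage) and a fixed-up local index of 200, which findStorage() returns
// together with the matching storage pointer.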
template <typename BUFFER_ITERATOR_TYPE>
void ResultSet::ResultSetComparator<
    BUFFER_ITERATOR_TYPE>::materializeCountDistinctColumns() {
  for (const auto& order_entry : order_entries_) {
    if (is_distinct_target(result_set_->targets_[order_entry.tle_no - 1])) {
      count_distinct_materialized_buffers_.emplace_back(
          materializeCountDistinctColumn(order_entry));
    }
  }
}
template <typename BUFFER_ITERATOR_TYPE>
ResultSet::ApproxQuantileBuffers ResultSet::ResultSetComparator<
    BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumns() const {
  ResultSet::ApproxQuantileBuffers approx_quantile_materialized_buffers;
  for (const auto& order_entry : order_entries_) {
    if (result_set_->targets_[order_entry.tle_no - 1].agg_kind == kAPPROX_QUANTILE) {
      approx_quantile_materialized_buffers.emplace_back(
          materializeApproxQuantileColumn(order_entry));
    }
  }
  return approx_quantile_materialized_buffers;
}
template <typename BUFFER_ITERATOR_TYPE>
std::vector<int64_t> ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::
    materializeCountDistinctColumn(const Analyzer::OrderEntry& order_entry) const {
  const size_t num_storage_entries = result_set_->query_mem_desc_.getEntryCount();
  std::vector<int64_t> count_distinct_materialized_buffer(num_storage_entries);
  const CountDistinctDescriptor count_distinct_descriptor =
      result_set_->query_mem_desc_.getCountDistinctDescriptor(order_entry.tle_no - 1);
  const size_t num_non_empty_entries = permutation_.size();

  const auto work = [&](const size_t start, const size_t end) {
    for (size_t i = start; i < end; ++i) {
      const PermutationIdx permuted_idx = permutation_[i];
      const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
      const auto storage = storage_lookup_result.storage_ptr;
      const auto off = storage_lookup_result.fixedup_entry_idx;
      const auto value = buffer_itr_.getColumnInternal(
          storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
      count_distinct_materialized_buffer[permuted_idx] =
          count_distinct_set_size(value.i1, count_distinct_descriptor);
    }
  };
  if (single_threaded_) {
    work(0, num_non_empty_entries);
  } else {
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(0, num_non_empty_entries, cpu_threads())) {
      thread_pool.run([=] { work(interval.begin, interval.end); });
    }
    thread_pool.wait();
  }
  return count_distinct_materialized_buffer;
}
double ResultSet::calculateQuantile(quantile::TDigest* const t_digest) {
  CHECK(t_digest);
  t_digest->mergeBufferFinal();
  double const quantile = t_digest->quantile();
  return boost::math::isnan(quantile) ? NULL_DOUBLE : quantile;
}
template <typename BUFFER_ITERATOR_TYPE>
ResultSet::ApproxQuantileBuffers::value_type
ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::materializeApproxQuantileColumn(
    const Analyzer::OrderEntry& order_entry) const {
  ResultSet::ApproxQuantileBuffers::value_type materialized_buffer(
      result_set_->query_mem_desc_.getEntryCount());
  const size_t size = permutation_.size();
  const auto work = [&](const size_t start, const size_t end) {
    for (size_t i = start; i < end; ++i) {
      const PermutationIdx permuted_idx = permutation_[i];
      const auto storage_lookup_result = result_set_->findStorage(permuted_idx);
      const auto storage = storage_lookup_result.storage_ptr;
      const auto off = storage_lookup_result.fixedup_entry_idx;
      const auto value = buffer_itr_.getColumnInternal(
          storage->buff_, off, order_entry.tle_no - 1, storage_lookup_result);
      materialized_buffer[permuted_idx] =
          value.i1 ? calculateQuantile(reinterpret_cast<quantile::TDigest*>(value.i1))
                   : NULL_DOUBLE;
    }
  };
  if (single_threaded_) {
    work(0, size);
  } else {
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(0, size, cpu_threads())) {
      thread_pool.run([=] { work(interval.begin, interval.end); });
    }
    thread_pool.wait();
  }
  return materialized_buffer;
}
template <typename BUFFER_ITERATOR_TYPE>
bool ResultSet::ResultSetComparator<BUFFER_ITERATOR_TYPE>::operator()(
    const PermutationIdx lhs,
    const PermutationIdx rhs) const {
  // The comparison must define a strict weak ordering, otherwise std::sort can
  // read out of bounds or corrupt memory.
  const auto lhs_storage_lookup_result = result_set_->findStorage(lhs);
  const auto rhs_storage_lookup_result = result_set_->findStorage(rhs);
  const auto lhs_storage = lhs_storage_lookup_result.storage_ptr;
  const auto rhs_storage = rhs_storage_lookup_result.storage_ptr;
  const auto fixedup_lhs = lhs_storage_lookup_result.fixedup_entry_idx;
  const auto fixedup_rhs = rhs_storage_lookup_result.fixedup_entry_idx;
  size_t materialized_count_distinct_buffer_idx{0};
  size_t materialized_approx_quantile_buffer_idx{0};

  for (const auto& order_entry : order_entries_) {
    CHECK_GE(order_entry.tle_no, 1);
    const auto& lhs_agg_info = lhs_storage->targets_[order_entry.tle_no - 1];
    const auto& rhs_agg_info = rhs_storage->targets_[order_entry.tle_no - 1];
    const auto lhs_entry_ti = get_compact_type(lhs_agg_info);
    const auto rhs_entry_ti = get_compact_type(rhs_agg_info);
    // Determine whether a float value was stored as a 4-byte float or widened
    // to the (usually 8-byte) compact slot width.
    bool float_argument_input = takes_float_argument(lhs_agg_info);
    if (lhs_entry_ti.get_type() == kFLOAT) {
      const auto is_col_lazy =
          !result_set_->lazy_fetch_info_.empty() &&
          result_set_->lazy_fetch_info_[order_entry.tle_no - 1].is_lazily_fetched;
      if (result_set_->query_mem_desc_.getPaddedSlotWidthBytes(order_entry.tle_no - 1) ==
          sizeof(float)) {
        float_argument_input =
            result_set_->query_mem_desc_.didOutputColumnar() ? !is_col_lazy : true;
      }
    }

    if (UNLIKELY(is_distinct_target(lhs_agg_info))) {
      CHECK_LT(materialized_count_distinct_buffer_idx,
               count_distinct_materialized_buffers_.size());
      const auto& count_distinct_materialized_buffer =
          count_distinct_materialized_buffers_[materialized_count_distinct_buffer_idx];
      const auto lhs_sz = count_distinct_materialized_buffer[lhs];
      const auto rhs_sz = count_distinct_materialized_buffer[rhs];
      ++materialized_count_distinct_buffer_idx;
      if (lhs_sz == rhs_sz) {
        continue;
      }
      return (lhs_sz < rhs_sz) != order_entry.is_desc;
    } else if (UNLIKELY(lhs_agg_info.agg_kind == kAPPROX_QUANTILE)) {
      CHECK_LT(materialized_approx_quantile_buffer_idx,
               approx_quantile_materialized_buffers_.size());
      const auto& approx_quantile_materialized_buffer =
          approx_quantile_materialized_buffers_[materialized_approx_quantile_buffer_idx];
      const auto lhs_value = approx_quantile_materialized_buffer[lhs];
      const auto rhs_value = approx_quantile_materialized_buffer[rhs];
      ++materialized_approx_quantile_buffer_idx;
      if (lhs_value == rhs_value) {
        continue;
      } else if (!lhs_entry_ti.get_notnull()) {
        if (lhs_value == NULL_DOUBLE) {
          return order_entry.nulls_first;
        } else if (rhs_value == NULL_DOUBLE) {
          return !order_entry.nulls_first;
        }
      }
      return (lhs_value < rhs_value) != order_entry.is_desc;
    }

    const auto lhs_v = buffer_itr_.getColumnInternal(lhs_storage->buff_,
                                                     fixedup_lhs,
                                                     order_entry.tle_no - 1,
                                                     lhs_storage_lookup_result);
    const auto rhs_v = buffer_itr_.getColumnInternal(rhs_storage->buff_,
                                                     fixedup_rhs,
                                                     order_entry.tle_no - 1,
                                                     rhs_storage_lookup_result);

    if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
                 isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
      continue;
    }
    if (UNLIKELY(isNull(lhs_entry_ti, lhs_v, float_argument_input) &&
                 !isNull(rhs_entry_ti, rhs_v, float_argument_input))) {
      return order_entry.nulls_first;
    }
    if (UNLIKELY(isNull(rhs_entry_ti, rhs_v, float_argument_input) &&
                 !isNull(lhs_entry_ti, lhs_v, float_argument_input))) {
      return !order_entry.nulls_first;
    }

    if (LIKELY(lhs_v.isInt())) {
      CHECK(rhs_v.isInt());
      if (UNLIKELY(lhs_entry_ti.is_string() &&
                   lhs_entry_ti.get_compression() == kENCODING_DICT)) {
        CHECK_EQ(4, lhs_entry_ti.get_logical_size());
        const auto lhs_string_dict_proxy = executor_->getStringDictionaryProxy(
            lhs_entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
        const auto rhs_string_dict_proxy = executor_->getStringDictionaryProxy(
            rhs_entry_ti.get_comp_param(), result_set_->row_set_mem_owner_, false);
        const auto lhs_str = lhs_string_dict_proxy->getString(lhs_v.i1);
        const auto rhs_str = rhs_string_dict_proxy->getString(rhs_v.i1);
        if (lhs_str == rhs_str) {
          continue;
        }
        return (lhs_str < rhs_str) != order_entry.is_desc;
      }
      if (lhs_v.i1 == rhs_v.i1) {
        continue;
      }
      if (lhs_entry_ti.is_fp()) {
        if (float_argument_input) {
          const auto lhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&lhs_v.i1));
          const auto rhs_dval = *reinterpret_cast<const float*>(may_alias_ptr(&rhs_v.i1));
          return (lhs_dval < rhs_dval) != order_entry.is_desc;
        } else {
          const auto lhs_dval =
              *reinterpret_cast<const double*>(may_alias_ptr(&lhs_v.i1));
          const auto rhs_dval =
              *reinterpret_cast<const double*>(may_alias_ptr(&rhs_v.i1));
          return (lhs_dval < rhs_dval) != order_entry.is_desc;
        }
      }
      return (lhs_v.i1 < rhs_v.i1) != order_entry.is_desc;
    } else {
      if (lhs_v.isPair()) {
        CHECK(rhs_v.isPair());
        const auto lhs =
            pair_to_double({lhs_v.i1, lhs_v.i2}, lhs_entry_ti, float_argument_input);
        const auto rhs =
            pair_to_double({rhs_v.i1, rhs_v.i2}, rhs_entry_ti, float_argument_input);
        if (lhs == rhs) {
          continue;
        }
        return (lhs < rhs) != order_entry.is_desc;
      } else {
        CHECK(lhs_v.isStr() && rhs_v.isStr());
        const auto lhs = lhs_v.strVal();
        const auto rhs = rhs_v.strVal();
        if (lhs == rhs) {
          continue;
        }
        return (lhs < rhs) != order_entry.is_desc;
      }
    }
  }
  return false;
}
// Partial-sort permutation into the top (least, by compare) n elements. If
// permutation.size() <= n, sort the whole view. Returns a PermutationView
// with size() = min(n, permutation.size()).
PermutationView ResultSet::topPermutation(PermutationView permutation,
                                          const size_t n,
                                          const Comparator& compare) {
  if (n < permutation.size()) {
    std::partial_sort(
        permutation.begin(), permutation.begin() + n, permutation.end(), compare);
    permutation.resize(n);
  } else {
    std::sort(permutation.begin(), permutation.end(), compare);
  }
  return permutation;
}
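// std::partial_sort only orders the n smallest entries under compare, so a
// top-n query avoids a full sort of every non-empty entry: for k candidate
// entries and n << k this costs roughly O(k log n) comparisons rather than
// O(k log k).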
void ResultSet::radixSortOnGpu(
    const std::list<Analyzer::OrderEntry>& order_entries) const {
  auto data_mgr = &catalog_->getDataMgr();
  const int device_id{0};
  auto allocator = std::make_unique<CudaAllocator>(
      data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
  std::vector<int64_t*> group_by_buffers(block_size_);
  group_by_buffers[0] = reinterpret_cast<int64_t*>(storage_->getUnderlyingBuffer());
  auto dev_group_by_buffers =
      create_dev_group_by_buffers(allocator.get(),
                                  group_by_buffers,
                                  query_mem_desc_,
                                  block_size_,
                                  grid_size_,
                                  device_id,
                                  ExecutorDispatchMode::KernelPerFragment,
                                  /*num_input_rows=*/-1,
                                  /*prepend_index_buffer=*/true,
                                  /*always_init_group_by_on_host=*/true,
                                  /*use_bump_allocator=*/false,
                                  /*has_varlen_output=*/false,
                                  /*insitu_allocator=*/nullptr);
  inplace_sort_gpu(
      order_entries, query_mem_desc_, dev_group_by_buffers, data_mgr, device_id);
  copy_group_by_buffers_from_gpu(
      *allocator,
      group_by_buffers,
      query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
      dev_group_by_buffers.data,
      query_mem_desc_,
      block_size_,
      grid_size_,
      device_id,
      /*prepend_index_buffer=*/false,
      /*has_varlen_output=*/false);
}
void ResultSet::radixSortOnCpu(
    const std::list<Analyzer::OrderEntry>& order_entries) const {
  CHECK(!query_mem_desc_.hasKeylessHash());
  std::vector<int64_t> tmp_buff(query_mem_desc_.getEntryCount());
  std::vector<int32_t> idx_buff(query_mem_desc_.getEntryCount());
  CHECK_EQ(size_t(1), order_entries.size());
  auto buffer_ptr = storage_->getUnderlyingBuffer();
  for (const auto& order_entry : order_entries) {
    const auto target_idx = order_entry.tle_no - 1;
    const auto sortkey_val_buff = reinterpret_cast<int64_t*>(
        buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
    const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
    sort_groups_cpu(sortkey_val_buff,
                    &idx_buff[0],
                    query_mem_desc_.getEntryCount(),
                    order_entry.is_desc,
                    chosen_bytes);
    apply_permutation_cpu(reinterpret_cast<int64_t*>(buffer_ptr),
                          &idx_buff[0],
                          query_mem_desc_.getEntryCount(),
                          &tmp_buff[0],
                          sizeof(int64_t));
    for (size_t target_idx = 0; target_idx < query_mem_desc_.getSlotCount();
         ++target_idx) {
      if (static_cast<int>(target_idx) == order_entry.tle_no - 1) {
        continue;
      }
      const auto chosen_bytes = query_mem_desc_.getPaddedSlotWidthBytes(target_idx);
      const auto satellite_val_buff = reinterpret_cast<int64_t*>(
          buffer_ptr + query_mem_desc_.getColOffInBytes(target_idx));
      apply_permutation_cpu(satellite_val_buff,
                            &idx_buff[0],
                            query_mem_desc_.getEntryCount(),
                            &tmp_buff[0],
                            chosen_bytes);
    }
  }
}
const std::vector<std::string> ResultSet::getStringDictionaryPayloadCopy(
    const int dict_id) const {
  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
      dict_id, /*with_generation=*/true, catalog_);
  CHECK(sdp);
  return sdp->getDictionary()->copyStrings();
}
const std::pair<std::vector<int32_t>, std::vector<std::string>>
ResultSet::getUniqueStringsForDictEncodedTargetCol(const size_t col_idx) const {
  const auto col_type_info = getColType(col_idx);
  CHECK(col_type_info.is_dict_encoded_string());
  std::unordered_set<int32_t> unique_string_ids_set;
  const size_t num_entries = entryCount();
  std::vector<bool> targets_to_skip(colCount(), true);
  targets_to_skip[col_idx] = false;
  const auto null_val = inline_fixed_encoding_null_val(col_type_info);

  for (size_t row_idx = 0; row_idx < num_entries; ++row_idx) {
    const auto result_row = getRowAtNoTranslations(row_idx, targets_to_skip);
    if (!result_row.empty()) {
      const auto scalar_col_val = boost::get<ScalarTargetValue>(result_row[col_idx]);
      const int32_t string_id =
          static_cast<int32_t>(boost::get<int64_t>(scalar_col_val));
      if (string_id != null_val) {
        unique_string_ids_set.emplace(string_id);
      }
    }
  }

  const size_t num_unique_strings = unique_string_ids_set.size();
  std::vector<int32_t> unique_string_ids(num_unique_strings);
  size_t string_idx{0};
  for (const auto unique_string_id : unique_string_ids_set) {
    unique_string_ids[string_idx++] = unique_string_id;
  }

  const int32_t dict_id = col_type_info.get_comp_param();
  const auto sdp = row_set_mem_owner_->getOrAddStringDictProxy(
      dict_id, /*with_generation=*/true, catalog_);
  CHECK(sdp);

  return std::make_pair(unique_string_ids, sdp->getStrings(unique_string_ids));
}
bool ResultSet::isDirectColumnarConversionPossible() const {
  if (!g_enable_direct_columnarization) {
    return false;
  } else if (query_mem_desc_.didOutputColumnar()) {
    return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
                                        QueryDescriptionType::Projection ||
                                    query_mem_desc_.getQueryDescriptionType() ==
                                        QueryDescriptionType::TableFunction ||
                                    query_mem_desc_.getQueryDescriptionType() ==
                                        QueryDescriptionType::GroupByPerfectHash ||
                                    query_mem_desc_.getQueryDescriptionType() ==
                                        QueryDescriptionType::GroupByBaselineHash);
  } else {
    CHECK(!(query_mem_desc_.getQueryDescriptionType() ==
            QueryDescriptionType::TableFunction));
    return permutation_.empty() && (query_mem_desc_.getQueryDescriptionType() ==
                                        QueryDescriptionType::GroupByPerfectHash ||
                                    query_mem_desc_.getQueryDescriptionType() ==
                                        QueryDescriptionType::GroupByBaselineHash);
  }
}
bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
  return query_mem_desc_.didOutputColumnar() &&
         (query_mem_desc_.getQueryDescriptionType() ==
              QueryDescriptionType::Projection ||
          query_mem_desc_.getQueryDescriptionType() ==
              QueryDescriptionType::TableFunction) &&
         appended_storage_.empty() && storage_ &&
         (lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
}

const int8_t* ResultSet::getColumnarBuffer(size_t column_idx) const {
  CHECK(isZeroCopyColumnarConversionPossible(column_idx));
  return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(column_idx);
}
std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
  std::vector<bool> target_bitmap(targets_.size(), true);
  size_t num_single_slot_targets = 0;
  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
    const auto& sql_type = targets_[target_idx].sql_type;
    if (targets_[target_idx].is_agg && targets_[target_idx].agg_kind == kAVG) {
      target_bitmap[target_idx] = false;
    } else if (sql_type.is_varlen()) {
      target_bitmap[target_idx] = false;
    } else {
      num_single_slot_targets++;
    }
  }
  return std::make_tuple(std::move(target_bitmap), num_single_slot_targets);
}
std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
    const {
  CHECK(isDirectColumnarConversionPossible());
  auto [single_slot_targets, num_single_slot_targets] = getSingleSlotTargetBitmap();

  for (size_t target_idx = 0; target_idx < single_slot_targets.size(); target_idx++) {
    const auto& target = targets_[target_idx];
    if (single_slot_targets[target_idx] &&
        (is_distinct_target(target) || target.agg_kind == kAPPROX_QUANTILE ||
         (target.is_agg && target.agg_kind == kSAMPLE && target.sql_type == kFLOAT))) {
      single_slot_targets[target_idx] = false;
      num_single_slot_targets--;
    }
  }
  CHECK_GE(num_single_slot_targets, size_t(0));
  return std::make_tuple(std::move(single_slot_targets), num_single_slot_targets);
}
std::vector<size_t> ResultSet::getSlotIndicesForTargetIndices() const {
  std::vector<size_t> slot_indices(targets_.size(), 0);
  size_t slot_index = 0;
  for (size_t target_idx = 0; target_idx < targets_.size(); target_idx++) {
    slot_indices[target_idx] = slot_index;
    slot_index = advance_slot(slot_index, targets_[target_idx], false);
  }
  return slot_indices;
}
bool can_use_parallel_algorithms(const ResultSet& rows) {
  return !rows.isTruncated();
}
std::optional<size_t> first_dict_encoded_idx(std::vector<TargetInfo> const& targets) {
  auto const itr = std::find_if(targets.begin(), targets.end(), IsDictEncodedStr{});
  return itr == targets.end() ? std::nullopt
                              : std::make_optional<size_t>(itr - targets.begin());
}
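// Example: for targets describing "SELECT city, COUNT(*) ..." where only the
// `city` column is a dictionary-encoded TEXT target, first_dict_encoded_idx()
// returns std::optional<size_t>{0}; with no dictionary-encoded targets it
// returns std::nullopt.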