    const Executor* executor,
    const int64_t num_rows,
    const std::vector<std::vector<const int8_t*>>& col_buffers,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const bool output_columnar,
    const size_t thread_idx,
    : query_mem_desc_(query_mem_desc)
    , device_type_(device_type)
    , dispatch_mode_(dispatch_mode)
    , row_set_mem_owner_(row_set_mem_owner)
    , output_columnar_(output_columnar) {
  auto data_mgr = executor->getDataMgr();
  auto render_allocator_map = render_info && render_info->isInSitu()
  query_buffers_ = std::make_unique<QueryMemoryInitializer>(ra_exe_unit,
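  // query_buffers_ (a QueryMemoryInitializer) allocates and initializes the
  // output buffers this context uses on the chosen device: the group-by
  // buffers plus, when present, the count-distinct bitmaps and varlen output
  // buffer that are read back later in launchGpuCode.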
    const size_t i) const {
  auto deinterleaved_query_mem_desc =
  deinterleaved_query_mem_desc.setHasInterleavedBinsOnGpu(false);
  deinterleaved_query_mem_desc.useConsistentSlotWidthSize(8);

  auto deinterleaved_result_set =
      std::make_shared<ResultSet>(result_set->getTargetInfos(),
                                  std::vector<ColumnLazyFetchInfo>{},
                                  std::vector<std::vector<const int8_t*>>{},
                                  std::vector<std::vector<int64_t>>{},
                                  std::vector<int64_t>{},
                                  deinterleaved_query_mem_desc,
  auto deinterleaved_storage =
      deinterleaved_result_set->allocateStorage(executor_->plan_state_->init_agg_vals_);
  auto deinterleaved_buffer =
      reinterpret_cast<int64_t*>(deinterleaved_storage->getUnderlyingBuffer());
  const auto rows_ptr = result_set->getStorage()->getUnderlyingBuffer();
  size_t deinterleaved_buffer_idx = 0;
  auto do_work = [&](const size_t bin_base_off) {
    std::vector<int64_t> agg_vals(agg_col_count, 0);
           &executor_->plan_state_->init_agg_vals_[0],
           agg_col_count * sizeof(agg_vals[0]));
                                      result_set->getTargetInfos(),
    for (size_t agg_idx = 0; agg_idx < agg_col_count;
         ++agg_idx, ++deinterleaved_buffer_idx) {
      deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];
         bin_idx < result_set->entryCount();
      if (UNLIKELY((bin_idx & 0xFFFF) == 0 &&
                   executor_->checkNonKernelTimeInterrupted())) {
        throw std::runtime_error(
            "Query execution was interrupted during result set reduction");
      do_work(bin_base_off);
         bin_idx < result_set->entryCount();
      do_work(bin_base_off);
  return deinterleaved_result_set;
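  // The loops above walk every bin of the interleaved (per-warp) GPU buffer,
  // reduce it into a single row of the deinterleaved result set via
  // reduceSingleRow, and check for a user interrupt every 0x10000 bins when
  // non-kernel-time query interrupts are enabled.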
  std::vector<std::pair<ResultSetPtr, std::vector<size_t>>> results_per_sm;
  const auto group_by_buffers_size = query_buffers_->getNumBuffers();
    const size_t expected_num_buffers = query_mem_desc.hasVarlenOutput() ? 2 : 1;
    CHECK_EQ(expected_num_buffers, group_by_buffers_size);
  const size_t group_by_output_buffers_size =
  for (size_t i = 0; i < group_by_output_buffers_size; i += step) {
  return executor_->reduceMultiDeviceResults(
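  // Each group-by output buffer contributes one partial ResultSet; the pairs
  // collected in results_per_sm are then merged by
  // Executor::reduceMultiDeviceResults (an extra buffer is expected only when
  // varlen output is enabled, hence the CHECK_EQ above).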
  for (const auto err : error_codes) {
  for (const auto err : error_codes) {
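// The two loops above belong to aggregate_error_codes, which collapses the
// per-GPU-thread error codes into a single value. A minimal sketch of that
// two-pass scan, assuming (as the callers below suggest) that positive codes
// are hard errors and take precedence over negative, retryable ones:
int32_t aggregate_error_codes_sketch(const std::vector<int32_t>& error_codes) {
  for (const auto err : error_codes) {
    if (err > 0) {
      return err;  // a positive code aborts the query
    }
  }
  for (const auto err : error_codes) {
    if (err) {
      return err;  // otherwise surface any remaining (negative) code
    }
  }
  return 0;  // no thread reported an error
}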
    const bool hoist_literals,
    const std::vector<int8_t>& literal_buff,
    std::vector<std::vector<const int8_t*>> col_buffers,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int32_t scan_limit,
    const unsigned block_size_x,
    const unsigned grid_size_x,
    const size_t shared_memory_size,
    const uint32_t num_tables,
    const bool allow_runtime_interrupt,
    const std::vector<int8_t*>& join_hash_tables,
    bool optimize_cuda_block_and_grid_sizes) {
  CHECK(compilation_context);
  if (render_allocator_map) {
  std::vector<int64_t*> out_vec;
  uint32_t num_fragments = col_buffers.size();
  std::vector<int32_t> error_codes(grid_size_x * block_size_x);

  auto prepareClock = kernel->make_clock();
  auto launchClock = kernel->make_clock();
  auto finishClock = kernel->make_clock();

  prepareClock->start();
    kernel->initializeDynamicWatchdog(
  if (allow_runtime_interrupt && !render_allocator) {
    kernel->initializeRuntimeInterrupter(device_id);
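  // Before the launch, the kernel wrapper arms the device-side dynamic
  // watchdog and, unless the query renders in situ, the runtime interrupter
  // that allows a running kernel to be cancelled.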
  const unsigned block_size_y = 1;
  const unsigned block_size_z = 1;
  const unsigned grid_size_y = 1;
  const unsigned grid_size_z = 1;
  const auto total_thread_count = block_size_x * grid_size_x;
  const auto err_desc = kernel_params[ERROR_CODE];

    auto gpu_group_by_buffers =
    const auto max_matched = static_cast<int32_t>(gpu_group_by_buffers.entry_count);
                                 kernel_params[MAX_MATCHED], &max_matched, sizeof(max_matched));
    kernel_params[GROUPBY_BUF] = gpu_group_by_buffers.ptrs;
    std::vector<void*> param_ptrs;
    for (auto& param : kernel_params) {
      param_ptrs.push_back(&param);
    auto prepareTime = prepareClock->stop();
            << ": launchGpuCode: group-by prepare: " << std::to_string(prepareTime)
    launchClock->start();
    if (hoist_literals) {
      kernel->launch(grid_size_x,
                     optimize_cuda_block_and_grid_sizes);
      param_ptrs.erase(param_ptrs.begin() + LITERALS);
      kernel->launch(grid_size_x,
                     optimize_cuda_block_and_grid_sizes);
    auto launchTime = launchClock->stop();
            << ": launchGpuCode: group-by cuLaunchKernel: "
    finishClock->start();
    gpu_allocator_->copyFromDevice(reinterpret_cast<int8_t*>(error_codes.data()),
                                   reinterpret_cast<int8_t*>(err_desc),
                                   error_codes.size() * sizeof(error_codes[0]));
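    // The per-thread error codes written by the kernel are copied back from
    // err_desc and reduced to a single code (see aggregate_error_codes above);
    // a positive code aborts the query, a negative one is handled below.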
    if (*error_code > 0) {
    if (!render_allocator) {
                                        gpu_group_by_buffers,
                                      gpu_group_by_buffers,
    } catch (const std::bad_alloc&) {
                                      gpu_group_by_buffers,
      size_t num_allocated_rows{0};
      if (*error_code < 0) {
                                          gpu_group_by_buffers,
      if (num_allocated_rows) {
        CHECK(ra_exe_unit.use_bump_allocator);
                                        gpu_group_by_buffers,

    std::vector<int8_t*> out_vec_dev_buffers;
    const size_t agg_col_count{ra_exe_unit.estimator ? size_t(1) : init_agg_vals.size()};
    const auto num_results_per_agg_col =
        shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
    const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
    for (size_t i = 0; i < agg_col_count; ++i) {
      int8_t* out_vec_dev_buffer =
          num_fragments ? gpu_allocator_->alloc(output_buffer_size_per_agg) : nullptr;
      out_vec_dev_buffers.push_back(out_vec_dev_buffer);
      if (shared_memory_size) {
        CHECK_EQ(output_buffer_size_per_agg, size_t(8));
        gpu_allocator_->copyToDevice(reinterpret_cast<int8_t*>(out_vec_dev_buffer),
                                     reinterpret_cast<const int8_t*>(&init_agg_vals[i]),
                                     output_buffer_size_per_agg);
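        // With GPU shared-memory aggregation each aggregate column reduces to
        // a single 64-bit slot, seeded here with its init value before the
        // kernel is launched.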
    auto out_vec_dev_ptr = gpu_allocator_->alloc(agg_col_count * sizeof(int8_t*));
                                 reinterpret_cast<int8_t*>(out_vec_dev_buffers.data()),
                                 agg_col_count * sizeof(int8_t*));
    std::vector<void*> param_ptrs;
    for (auto& param : kernel_params) {
      param_ptrs.push_back(&param);
    auto prepareTime = prepareClock->stop();
            << ": launchGpuCode: prepare: " << std::to_string(prepareTime) << " ms";
    launchClock->start();
    if (hoist_literals) {
      kernel->launch(grid_size_x,
                     optimize_cuda_block_and_grid_sizes);
      param_ptrs.erase(param_ptrs.begin() + LITERALS);
      kernel->launch(grid_size_x,
                     optimize_cuda_block_and_grid_sizes);
    auto launchTime = launchClock->stop();
            << ": launchGpuCode: cuLaunchKernel: " << std::to_string(launchTime)
    finishClock->start();
        &error_codes[0], err_desc, error_codes.size() * sizeof(error_codes[0]));
    if (*error_code > 0) {
    for (size_t i = 0; i < agg_col_count; ++i) {
      int64_t* host_out_vec = new int64_t[output_buffer_size_per_agg];
          host_out_vec, out_vec_dev_buffers[i], output_buffer_size_per_agg);
      out_vec.push_back(host_out_vec);
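      // Each aggregate column's partial results (one slot per GPU thread per
      // fragment, or a single slot in the shared-memory case) are copied back
      // into a host buffer and returned to the caller via out_vec.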
  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
  if (count_distinct_bitmap_mem) {
                                   reinterpret_cast<void*>(count_distinct_bitmap_mem),

  const auto varlen_output_gpu_buf = query_buffers_->getVarlenOutputPtr();
  if (varlen_output_gpu_buf) {
    const size_t varlen_output_buf_bytes =
                                   reinterpret_cast<void*>(varlen_output_gpu_buf),
                                   varlen_output_buf_bytes);

  if (allow_runtime_interrupt) {
    kernel->resetRuntimeInterrupter(device_id);
  auto finishTime = finishClock->stop();
          << ": launchGpuCode: finish: " << std::to_string(finishTime) << " ms";
    const bool hoist_literals,
    const std::vector<int8_t>& literal_buff,
    std::vector<std::vector<const int8_t*>> col_buffers,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int32_t scan_limit,
    const uint32_t num_tables,
    const std::vector<int8_t*>& join_hash_tables,
  std::vector<const int8_t**> multifrag_col_buffers;
  for (auto& col_buffer : col_buffers) {
    multifrag_col_buffers.push_back(col_buffer.empty() ? nullptr : col_buffer.data());
  const int8_t*** multifrag_cols_ptr{
      multifrag_col_buffers.empty() ? nullptr : &multifrag_col_buffers[0]};
  const uint64_t num_fragments =
      multifrag_cols_ptr ? static_cast<uint64_t>(col_buffers.size()) : uint64_t(0);
  const auto num_out_frags = multifrag_cols_ptr ? num_fragments : uint64_t(0);

  std::vector<int64_t*> out_vec;
  for (size_t i = 0; i < init_agg_vals.size(); ++i) {
    auto buff = new int64_t[num_out_frags];
    out_vec.push_back(static_cast<int64_t*>(buff));

  CHECK_EQ(num_rows.size(), col_buffers.size());
  std::vector<int64_t> flatened_num_rows;
  for (auto& nums : num_rows) {
    flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
  std::vector<uint64_t> flatened_frag_offsets;
  for (auto& offsets : frag_offsets) {
    flatened_frag_offsets.insert(
        flatened_frag_offsets.end(), offsets.begin(), offsets.end());
  int64_t rowid_lookup_num_rows{*error_code ? *error_code + 1 : 0};
  int64_t* num_rows_ptr;
  if (num_rows_to_process > 0) {
    num_rows_ptr = flatened_num_rows.data();
        rowid_lookup_num_rows ? &rowid_lookup_num_rows : flatened_num_rows.data();
  int32_t total_matched_init{0};
  std::vector<int64_t> cmpt_val_buff;
  int8_t* row_func_mgr_ptr = reinterpret_cast<int8_t*>(&mgr);

  const int64_t* join_hash_tables_ptr =
      join_hash_tables.size() == 1
          ? reinterpret_cast<const int64_t*>(join_hash_tables[0])
          : (join_hash_tables.size() > 1
                 ? reinterpret_cast<const int64_t*>(&join_hash_tables[0])
  if (hoist_literals) {
    using agg_query = void (*)(const int8_t***,
    reinterpret_cast<agg_query>(native_code->func())(
        flatened_frag_offsets.data(),
        cmpt_val_buff.data(),
        join_hash_tables_ptr,
    reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
        flatened_frag_offsets.data(),
        init_agg_vals.data(),
        join_hash_tables_ptr,
    using agg_query = void (*)(const int8_t***,
    reinterpret_cast<agg_query>(native_code->func())(
        flatened_frag_offsets.data(),
        cmpt_val_buff.data(),
        join_hash_tables_ptr,
    reinterpret_cast<agg_query>(native_code->func())(multifrag_cols_ptr,
        flatened_frag_offsets.data(),
        init_agg_vals.data(),
        join_hash_tables_ptr,
  if (rowid_lookup_num_rows && *error_code < 0) {
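  // Note the dual use of *error_code in this path: a non-zero value on entry
  // seeds rowid_lookup_num_rows (which then replaces the per-fragment row
  // counts passed to the generated code), and a negative value on exit is
  // re-examined together with rowid_lookup_num_rows in the branch above.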
    const std::vector<std::vector<const int8_t*>>& col_buffers,
    const std::vector<int8_t>& literal_buff,
    const std::vector<std::vector<int64_t>>& num_rows,
    const std::vector<std::vector<uint64_t>>& frag_offsets,
    const int32_t scan_limit,
    const std::vector<int64_t>& init_agg_vals,
    const std::vector<int32_t>& error_codes,
    const uint32_t num_tables,
    const std::vector<int8_t*>& join_hash_tables,
    const bool hoist_literals,
    const bool is_group_by) const {
  const uint64_t num_fragments = static_cast<uint64_t>(col_buffers.size());
  const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0};
    std::vector<int8_t*> multifrag_col_dev_buffers;
    for (auto frag_col_buffers : col_buffers) {
      std::vector<const int8_t*> col_dev_buffers;
      for (auto col_buffer : frag_col_buffers) {
        col_dev_buffers.push_back((int8_t*)col_buffer);
      auto col_buffers_dev_ptr = gpu_allocator_->alloc(col_count * sizeof(int8_t*));
          col_buffers_dev_ptr, &col_dev_buffers[0], col_count * sizeof(int8_t*));
      multifrag_col_dev_buffers.push_back(col_buffers_dev_ptr);
                                 &multifrag_col_dev_buffers[0],
                                 num_fragments * sizeof(int8_t*));

  int8_t* literals_and_addr_mapping =
  CHECK_EQ(0, (int64_t)literals_and_addr_mapping % 8);
  std::vector<int64_t> additional_literal_bytes;
  const auto count_distinct_bitmap_mem = query_buffers_->getCountDistinctBitmapPtr();
  if (count_distinct_bitmap_mem) {
    const auto count_distinct_bitmap_host_mem = query_buffers_->getCountDistinctHostPtr();
    CHECK(count_distinct_bitmap_host_mem);
    additional_literal_bytes.push_back(
        reinterpret_cast<int64_t>(count_distinct_bitmap_host_mem));
    additional_literal_bytes.push_back(static_cast<int64_t>(count_distinct_bitmap_mem));
        literals_and_addr_mapping,
        &additional_literal_bytes[0],
        additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]));
  params[LITERALS] = literals_and_addr_mapping +
                     additional_literal_bytes.size() * sizeof(additional_literal_bytes[0]);
  if (!literal_buff.empty()) {
    CHECK(hoist_literals);

  CHECK_EQ(num_rows.size(), col_buffers.size());
  std::vector<int64_t> flatened_num_rows;
  for (auto& nums : num_rows) {
    flatened_num_rows.insert(flatened_num_rows.end(), nums.begin(), nums.end());
                               &flatened_num_rows[0],
                               sizeof(int64_t) * flatened_num_rows.size());

  CHECK_EQ(frag_offsets.size(), col_buffers.size());
  std::vector<int64_t> flatened_frag_offsets;
  for (auto& offsets : frag_offsets) {
    CHECK_EQ(offsets.size(), num_tables);
    flatened_frag_offsets.insert(
        flatened_frag_offsets.end(), offsets.begin(), offsets.end());
      gpu_allocator_->alloc(sizeof(int64_t) * flatened_frag_offsets.size());
                               &flatened_frag_offsets[0],
                               sizeof(int64_t) * flatened_num_rows.size());

  int32_t max_matched{scan_limit};
  int32_t total_matched{0};
                               params[TOTAL_MATCHED], &total_matched, sizeof(total_matched));

                                 params[INIT_AGG_VALS], &cmpt_val_buff[0], cmpt_sz * sizeof(int64_t));
                                 params[INIT_AGG_VALS], &init_agg_vals[0], init_agg_vals.size() * sizeof(int64_t));

                               params[ERROR_CODE], &error_codes[0], error_codes.size() * sizeof(error_codes[0]));

  const auto hash_table_count = join_hash_tables.size();
  switch (hash_table_count) {
                                   &join_hash_tables[0],
                                   hash_table_count * sizeof(int64_t));
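  // The params vector indexed by the enumerators seen above (LITERALS,
  // TOTAL_MATCHED, MAX_MATCHED, INIT_AGG_VALS, ERROR_CODE, GROUPBY_BUF, plus
  // the join-hash-table slot filled by this switch) is the kernel argument
  // array consumed by launchGpuCode.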