21 #include <thrust/device_ptr.h>
22 #include <thrust/scan.h>
// Fatal-checks a CUDA runtime API status: any result other than cudaSuccess
// trips CHECK_EQ (glog-style fatal assertion — TODO confirm; macro defined
// outside this view). NOTE(review): this discards cudaGetErrorString() detail;
// the failure message will only show the numeric cudaError_t values.
24 #define checkCudaErrors(err) CHECK_EQ(err, cudaSuccess)
26 template <
typename F,
typename... ARGS>
30 checkCudaErrors(cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, func));
32 func<<<grid_size, block_size, 0, qe_cuda_stream>>>(std::forward<ARGS>(
args)...);
37 const int32_t invalid_slot_val,
38 const bool for_semi_join,
43 buff, invalid_slot_val, for_semi_join, join_column, type_info, NULL, NULL, -1, -1);
44 atomicCAS(err, 0, partial_err);
49 const int32_t invalid_slot_val,
50 const bool for_semi_join,
54 const int64_t bucket_normalization) {
64 bucket_normalization);
65 atomicCAS(err, 0, partial_err);
69 const int32_t invalid_slot_val,
70 const bool for_semi_join,
74 const int64_t bucket_normalization) {
82 bucket_normalization);
86 const int32_t invalid_slot_val,
87 const bool for_semi_join,
102 const int32_t invalid_slot_val,
103 const bool for_semi_join,
108 const int64_t bucket_normalization) {
119 bucket_normalization);
120 atomicCAS(err, 0, partial_err);
124 const int32_t invalid_slot_val,
125 const bool for_semi_join,
140 atomicCAS(err, 0, partial_err);
145 const int32_t invalid_slot_val,
146 const bool for_semi_join,
151 const int64_t bucket_normalization) {
160 bucket_normalization);
164 const int32_t invalid_slot_val,
165 const bool for_semi_join,
181 const int64_t hash_entry_count,
182 const int32_t invalid_slot_val) {
187 const int64_t hash_entry_count,
188 const int32_t invalid_slot_val) {
// Sentinel written into pos_buff entries to mark hash slots that have at least
// one match before the exclusive scan converts counts into start offsets
// (see set_valid_pos_flag / set_valid_pos kernels below).
193 #define VALID_POS_FLAG 0
196 const int32_t* count_buff,
197 const int64_t entry_count) {
198 const int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
199 const int32_t step = blockDim.x * gridDim.x;
200 for (int64_t i = start; i < entry_count; i += step) {
209 const int64_t entry_count) {
210 const int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
211 const int32_t step = blockDim.x * gridDim.x;
212 for (int64_t i = start; i < entry_count; i += step) {
214 pos_buff[i] = !i ? 0 : count_buff[i - 1];
219 template <
typename COUNT_MATCHES_FUNCTOR,
typename FILL_ROW_IDS_FUNCTOR>
221 const int64_t hash_entry_count,
224 COUNT_MATCHES_FUNCTOR count_matches_func,
225 FILL_ROW_IDS_FUNCTOR fill_row_ids_func) {
226 int32_t* pos_buff = buff;
227 int32_t* count_buff = buff + hash_entry_count;
230 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
232 count_matches_func();
236 auto count_buff_dev_ptr = thrust::device_pointer_cast(count_buff);
238 count_buff_dev_ptr, count_buff_dev_ptr + hash_entry_count, count_buff_dev_ptr);
242 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
251 const bool for_window_framing) {
253 auto count_matches_func = [count_buff = buff + hash_entry_count,
259 auto fill_row_ids_func =
260 [buff, hash_entry_count, join_column, type_info, for_window_framing] {
283 auto count_matches_func = [count_buff = buff + hash_entry_count,
286 bucket_normalization =
292 bucket_normalization);
295 auto fill_row_ids_func = [buff,
306 bucket_normalization);
324 int32_t* pos_buff = buff;
325 int32_t* count_buff = buff + hash_entry_count;
328 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
335 auto count_buff_dev_ptr = thrust::device_pointer_cast(count_buff);
337 count_buff_dev_ptr, count_buff_dev_ptr + hash_entry_count, count_buff_dev_ptr);
340 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
350 template <
typename T,
typename KEY_HANDLER>
352 const T* composite_key_dict,
353 const int64_t hash_entry_count,
354 const KEY_HANDLER* key_handler,
355 const size_t num_elems,
356 const bool for_window_framing) {
357 auto pos_buff = buff;
358 auto count_buff = buff + hash_entry_count;
361 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
372 auto count_buff_dev_ptr = thrust::device_pointer_cast(count_buff);
374 count_buff_dev_ptr, count_buff_dev_ptr + hash_entry_count, count_buff_dev_ptr);
377 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
389 template <
typename T>
391 const int64_t entry_count,
392 const size_t key_component_count,
393 const bool with_val_slot,
394 const int32_t invalid_slot_val) {
405 const int64_t entry_count,
406 const size_t key_component_count,
407 const bool with_val_slot,
408 const int32_t invalid_slot_val) {
418 const int64_t entry_count,
419 const size_t key_component_count,
420 const bool with_val_slot,
421 const int32_t invalid_slot_val) {
430 template <
typename T,
typename KEY_HANDLER>
432 const int64_t entry_count,
433 const int32_t invalid_slot_val,
434 const bool for_semi_join,
435 const size_t key_component_count,
436 const bool with_val_slot,
438 const KEY_HANDLER* key_handler,
439 const int64_t num_elems) {
450 atomicCAS(err, 0, partial_err);
454 const int64_t entry_count,
455 const int32_t invalid_slot_val,
456 const bool for_semi_join,
457 const size_t key_component_count,
458 const bool with_val_slot,
461 const int64_t num_elems) {
463 fill_baseline_hash_join_buff_wrapper<int32_t, GenericKeyHandler>,
476 const int64_t entry_count,
477 const int32_t invalid_slot_val,
478 const bool for_semi_join,
479 const size_t key_component_count,
480 const bool with_val_slot,
483 const int64_t num_elems) {
485 fill_baseline_hash_join_buff_wrapper<unsigned long long, GenericKeyHandler>,
499 const int64_t entry_count,
500 const int32_t invalid_slot_val,
501 const size_t key_component_count,
502 const bool with_val_slot,
505 const int64_t num_elems) {
507 fill_baseline_hash_join_buff_wrapper<unsigned long long, OverlapsKeyHandler>,
520 const int64_t entry_count,
521 const int32_t invalid_slot_val,
522 const size_t key_component_count,
523 const bool with_val_slot,
526 const size_t num_elems) {
528 fill_baseline_hash_join_buff_wrapper<unsigned long long, RangeKeyHandler>,
542 const int32_t* composite_key_dict,
543 const int64_t hash_entry_count,
544 const size_t key_component_count,
546 const int64_t num_elems,
547 const bool for_window_framing) {
548 fill_one_to_many_baseline_hash_table_on_device<int32_t>(buff,
558 const int64_t* composite_key_dict,
559 const int64_t hash_entry_count,
561 const int64_t num_elems,
562 const bool for_window_framing) {
563 fill_one_to_many_baseline_hash_table_on_device<int64_t>(buff,
573 const int64_t* composite_key_dict,
574 const int64_t hash_entry_count,
576 const int64_t num_elems) {
577 fill_one_to_many_baseline_hash_table_on_device<int64_t>(
578 buff, composite_key_dict, hash_entry_count, key_handler, num_elems,
false);
583 const int64_t* composite_key_dict,
584 const size_t hash_entry_count,
586 const size_t num_elems) {
587 fill_one_to_many_baseline_hash_table_on_device<int64_t>(
588 buff, composite_key_dict, hash_entry_count, key_handler, num_elems,
false);
593 int32_t* row_counts_buffer,
595 const int64_t num_elems) {
603 auto row_counts_buffer_ptr = thrust::device_pointer_cast(row_counts_buffer);
605 row_counts_buffer_ptr, row_counts_buffer_ptr + num_elems, row_counts_buffer_ptr);
610 int32_t* row_counts_buffer,
612 const size_t num_elems,
613 const size_t block_size_x,
614 const size_t grid_size_x) {
616 approximate_distinct_tuples_impl_gpu<<<grid_size_x, block_size_x, 0, qe_cuda_stream>>>(
617 hll_buffer, row_counts_buffer, b, num_elems, key_handler);
620 auto row_counts_buffer_ptr = thrust::device_pointer_cast(row_counts_buffer);
622 row_counts_buffer_ptr, row_counts_buffer_ptr + num_elems, row_counts_buffer_ptr);
628 const int64_t num_elems) {
640 const double* bucket_sz_threshold) {
645 bucket_sz_threshold);
__global__ void fill_baseline_hash_join_buff_wrapper(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *err, const KEY_HANDLER *key_handler, const int64_t num_elems)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
GLOBAL void SUFFIX() count_matches_sharded(int32_t *count_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
void fill_one_to_many_baseline_hash_table_on_device(int32_t *buff, const SIZE *composite_key_dict, const size_t hash_entry_count, const size_t key_component_count, const KEY_HANDLER *key_handler, const size_t num_elems, const bool for_window_framing)
void init_baseline_hash_join_buff_on_device_64(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
void fill_baseline_hash_join_buff_on_device_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
void fill_hash_join_buff_on_device_sharded(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info)
__global__ void fill_hash_join_buff_bucketized_wrapper(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, int *err, const int64_t bucket_normalization)
void range_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const RangeKeyHandler *key_handler, const size_t num_elems)
__global__ void fill_hash_join_buff_wrapper(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, int *err)
void inclusive_scan(InputIterator first, InputIterator last, OutputIterator out, const size_t thread_count)
DEVICE void SUFFIX() init_baseline_hash_join_buff(int8_t *hash_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
void overlaps_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
void compute_bucket_sizes_on_device(double *bucket_sizes_buffer, const JoinColumn *join_column, const JoinColumnTypeInfo *type_info, const double *bucket_size_thresholds)
void init_baseline_hash_join_buff_on_device_32(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
void approximate_distinct_tuples_on_device_range(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const RangeKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
__global__ void fill_hash_join_buff_wrapper_sharded(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, int *err)
__global__ void fill_hash_join_buff_wrapper_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, int *err, const int64_t bucket_normalization)
int64_t bucket_normalization
void fill_one_to_many_hash_table_on_device_impl(int32_t *buff, const int64_t hash_entry_count, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, COUNT_MATCHES_FUNCTOR count_matches_func, FILL_ROW_IDS_FUNCTOR fill_row_ids_func)
CUstream getQueryEngineCudaStream()
void fill_one_to_many_baseline_hash_table_on_device_32(int32_t *buff, const int32_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const GenericKeyHandler *key_handler, const int64_t num_elems, const bool for_window_framing)
GLOBAL void SUFFIX() fill_row_ids_sharded(int32_t *buff, const int64_t hash_entry_count, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
GLOBAL void SUFFIX() count_matches(int32_t *count_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const GenericKeyHandler *key_handler, const int64_t num_elems, const bool for_window_framing)
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
void fill_one_to_many_hash_table_on_device(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const bool for_window_framing)
DEVICE int SUFFIX() fill_hash_join_buff(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
int fill_baseline_hash_join_buff(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const KEY_HANDLER *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void range_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const RangeKeyHandler *key_handler, const size_t num_elems)
GLOBAL void SUFFIX() fill_row_ids_bucketized(int32_t *buff, const int64_t hash_entry_count, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
__global__ void init_baseline_hash_join_buff_wrapper(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
size_t getNormalizedHashEntryCount() const
__global__ void set_valid_pos_flag(int32_t *pos_buff, const int32_t *count_buff, const int64_t entry_count)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void cuda_kernel_launch_wrapper(F func, ARGS &&...args)
void approximate_distinct_tuples_on_device(uint8_t *hll_buffer, const uint32_t b, const GenericKeyHandler *key_handler, const int64_t num_elems)
GLOBAL void SUFFIX() fill_row_ids(int32_t *buff, const int64_t hash_entry_count, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const bool for_window_framing, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
#define checkCudaErrors(err)
void fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
GLOBAL void SUFFIX() count_matches_bucketized(int32_t *count_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void overlaps_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
DEVICE int SUFFIX() fill_hash_join_buff_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info)
DEVICE int SUFFIX() fill_hash_join_buff_sharded(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void approximate_distinct_tuples_on_device_overlaps(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
__global__ void init_hash_join_buff_wrapper(int32_t *buff, const int64_t hash_entry_count, const int32_t invalid_slot_val)
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)
size_t bucketized_hash_entry_count
__global__ void set_valid_pos(int32_t *pos_buff, int32_t *count_buff, const int64_t entry_count)