21 #include <thrust/device_ptr.h>
22 #include <thrust/scan.h>
// Fail-fast wrapper for CUDA runtime calls: asserts the call returned
// cudaSuccess (CHECK_EQ is presumably the glog-style aborting check declared
// elsewhere in this project — confirm).
// NOTE: `err` is parenthesized so that argument expressions containing
// lower-precedence operators (e.g. a comma expression) expand correctly.
24 #define checkCudaErrors(err) CHECK_EQ((err), cudaSuccess)
26 template <
typename F,
typename... ARGS>
30 checkCudaErrors(cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, func));
32 func<<<grid_size, block_size, 0, qe_cuda_stream>>>(std::forward<ARGS>(
args)...);
37 const int32_t invalid_slot_val,
38 const bool for_semi_join,
43 buff, invalid_slot_val, for_semi_join, join_column, type_info, NULL, NULL, -1, -1);
44 atomicCAS(err, 0, partial_err);
49 const int32_t invalid_slot_val,
50 const bool for_semi_join,
54 const int64_t bucket_normalization) {
64 bucket_normalization);
65 atomicCAS(err, 0, partial_err);
69 const int32_t invalid_slot_val,
70 const bool for_semi_join,
74 const int64_t bucket_normalization) {
82 bucket_normalization);
86 const int32_t invalid_slot_val,
87 const bool for_semi_join,
102 const int32_t invalid_slot_val,
103 const bool for_semi_join,
108 const int64_t bucket_normalization) {
119 bucket_normalization);
120 atomicCAS(err, 0, partial_err);
124 const int32_t invalid_slot_val,
125 const bool for_semi_join,
140 atomicCAS(err, 0, partial_err);
145 const int32_t invalid_slot_val,
146 const bool for_semi_join,
151 const int64_t bucket_normalization) {
160 bucket_normalization);
164 const int32_t invalid_slot_val,
165 const bool for_semi_join,
181 const int64_t hash_entry_count,
182 const int32_t invalid_slot_val) {
187 const int64_t hash_entry_count,
188 const int32_t invalid_slot_val) {
// Sentinel value for entries of pos_buff; from the visible fragments it is
// presumably written by set_valid_pos_flag to mark hash-table slots that have
// at least one match before the prefix-scan pass — TODO confirm against the
// full kernel bodies, which are not visible in this chunk.
193 #define VALID_POS_FLAG 0
196 const int32_t* count_buff,
197 const int64_t entry_count) {
198 const int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
199 const int32_t step = blockDim.x * gridDim.x;
200 for (int64_t i = start; i < entry_count; i += step) {
209 const int64_t entry_count) {
210 const int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
211 const int32_t step = blockDim.x * gridDim.x;
212 for (int64_t i = start; i < entry_count; i += step) {
214 pos_buff[i] = !i ? 0 : count_buff[i - 1];
219 template <
typename COUNT_MATCHES_FUNCTOR,
typename FILL_ROW_IDS_FUNCTOR>
221 const int64_t hash_entry_count,
224 COUNT_MATCHES_FUNCTOR count_matches_func,
225 FILL_ROW_IDS_FUNCTOR fill_row_ids_func) {
226 int32_t* pos_buff = buff;
227 int32_t* count_buff = buff + hash_entry_count;
230 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
232 count_matches_func();
236 auto count_buff_dev_ptr = thrust::device_pointer_cast(count_buff);
238 count_buff_dev_ptr, count_buff_dev_ptr + hash_entry_count, count_buff_dev_ptr);
242 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
252 auto count_matches_func = [count_buff = buff + hash_entry_count,
258 auto fill_row_ids_func = [buff, hash_entry_count, join_column, type_info] {
277 auto count_matches_func = [count_buff = buff + hash_entry_count,
280 bucket_normalization =
286 bucket_normalization);
289 auto fill_row_ids_func = [buff,
300 bucket_normalization);
317 int32_t* pos_buff = buff;
318 int32_t* count_buff = buff + hash_entry_count;
321 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
328 auto count_buff_dev_ptr = thrust::device_pointer_cast(count_buff);
330 count_buff_dev_ptr, count_buff_dev_ptr + hash_entry_count, count_buff_dev_ptr);
333 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
343 template <
typename T,
typename KEY_HANDLER>
345 const T* composite_key_dict,
346 const int64_t hash_entry_count,
347 const KEY_HANDLER* key_handler,
348 const size_t num_elems) {
349 auto pos_buff = buff;
350 auto count_buff = buff + hash_entry_count;
353 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
364 auto count_buff_dev_ptr = thrust::device_pointer_cast(count_buff);
366 count_buff_dev_ptr, count_buff_dev_ptr + hash_entry_count, count_buff_dev_ptr);
369 cudaMemsetAsync(count_buff, 0, hash_entry_count *
sizeof(int32_t), qe_cuda_stream));
380 template <
typename T>
382 const int64_t entry_count,
383 const size_t key_component_count,
384 const bool with_val_slot,
385 const int32_t invalid_slot_val) {
396 const int64_t entry_count,
397 const size_t key_component_count,
398 const bool with_val_slot,
399 const int32_t invalid_slot_val) {
409 const int64_t entry_count,
410 const size_t key_component_count,
411 const bool with_val_slot,
412 const int32_t invalid_slot_val) {
421 template <
typename T,
typename KEY_HANDLER>
423 const int64_t entry_count,
424 const int32_t invalid_slot_val,
425 const bool for_semi_join,
426 const size_t key_component_count,
427 const bool with_val_slot,
429 const KEY_HANDLER* key_handler,
430 const int64_t num_elems) {
441 atomicCAS(err, 0, partial_err);
445 const int64_t entry_count,
446 const int32_t invalid_slot_val,
447 const bool for_semi_join,
448 const size_t key_component_count,
449 const bool with_val_slot,
452 const int64_t num_elems) {
454 fill_baseline_hash_join_buff_wrapper<int32_t, GenericKeyHandler>,
467 const int64_t entry_count,
468 const int32_t invalid_slot_val,
469 const bool for_semi_join,
470 const size_t key_component_count,
471 const bool with_val_slot,
474 const int64_t num_elems) {
476 fill_baseline_hash_join_buff_wrapper<unsigned long long, GenericKeyHandler>,
490 const int64_t entry_count,
491 const int32_t invalid_slot_val,
492 const size_t key_component_count,
493 const bool with_val_slot,
496 const int64_t num_elems) {
498 fill_baseline_hash_join_buff_wrapper<unsigned long long, OverlapsKeyHandler>,
511 const int64_t entry_count,
512 const int32_t invalid_slot_val,
513 const size_t key_component_count,
514 const bool with_val_slot,
517 const size_t num_elems) {
519 fill_baseline_hash_join_buff_wrapper<unsigned long long, RangeKeyHandler>,
533 const int32_t* composite_key_dict,
534 const int64_t hash_entry_count,
535 const size_t key_component_count,
537 const int64_t num_elems) {
538 fill_one_to_many_baseline_hash_table_on_device<int32_t>(
539 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
544 const int64_t* composite_key_dict,
545 const int64_t hash_entry_count,
547 const int64_t num_elems) {
548 fill_one_to_many_baseline_hash_table_on_device<int64_t>(
549 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
554 const int64_t* composite_key_dict,
555 const int64_t hash_entry_count,
557 const int64_t num_elems) {
558 fill_one_to_many_baseline_hash_table_on_device<int64_t>(
559 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
564 const int64_t* composite_key_dict,
565 const size_t hash_entry_count,
567 const size_t num_elems) {
568 fill_one_to_many_baseline_hash_table_on_device<int64_t>(
569 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
574 int32_t* row_counts_buffer,
576 const int64_t num_elems) {
584 auto row_counts_buffer_ptr = thrust::device_pointer_cast(row_counts_buffer);
586 row_counts_buffer_ptr, row_counts_buffer_ptr + num_elems, row_counts_buffer_ptr);
591 int32_t* row_counts_buffer,
593 const size_t num_elems,
594 const size_t block_size_x,
595 const size_t grid_size_x) {
597 approximate_distinct_tuples_impl_gpu<<<grid_size_x, block_size_x, 0, qe_cuda_stream>>>(
598 hll_buffer, row_counts_buffer, b, num_elems, key_handler);
601 auto row_counts_buffer_ptr = thrust::device_pointer_cast(row_counts_buffer);
603 row_counts_buffer_ptr, row_counts_buffer_ptr + num_elems, row_counts_buffer_ptr);
609 const int64_t num_elems) {
621 const double* bucket_sz_threshold) {
626 bucket_sz_threshold);
__global__ void fill_baseline_hash_join_buff_wrapper(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *err, const KEY_HANDLER *key_handler, const int64_t num_elems)
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
GLOBAL void SUFFIX() count_matches_sharded(int32_t *count_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const GenericKeyHandler *key_handler, const int64_t num_elems)
void fill_one_to_many_hash_table_on_device(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
void fill_one_to_many_baseline_hash_table_on_device(int32_t *buff, const SIZE *composite_key_dict, const size_t hash_entry_count, const size_t key_component_count, const KEY_HANDLER *key_handler, const size_t num_elems)
void init_baseline_hash_join_buff_on_device_64(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
void fill_baseline_hash_join_buff_on_device_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
void fill_hash_join_buff_on_device_sharded(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info)
__global__ void fill_hash_join_buff_bucketized_wrapper(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, int *err, const int64_t bucket_normalization)
void range_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const RangeKeyHandler *key_handler, const size_t num_elems)
__global__ void fill_hash_join_buff_wrapper(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, int *err)
void inclusive_scan(InputIterator first, InputIterator last, OutputIterator out, const size_t thread_count)
DEVICE void SUFFIX() init_baseline_hash_join_buff(int8_t *hash_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
GLOBAL void SUFFIX() fill_row_ids(int32_t *buff, const int64_t hash_entry_count, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void overlaps_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
void fill_one_to_many_baseline_hash_table_on_device_32(int32_t *buff, const int32_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const GenericKeyHandler *key_handler, const int64_t num_elems)
void compute_bucket_sizes_on_device(double *bucket_sizes_buffer, const JoinColumn *join_column, const JoinColumnTypeInfo *type_info, const double *bucket_size_thresholds)
void init_baseline_hash_join_buff_on_device_32(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
void approximate_distinct_tuples_on_device_range(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const RangeKeyHandler *key_handler, const size_t num_elems, const size_t block_size_x, const size_t grid_size_x)
__global__ void fill_hash_join_buff_wrapper_sharded(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, int *err)
__global__ void fill_hash_join_buff_wrapper_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, int *err, const int64_t bucket_normalization)
void fill_one_to_many_hash_table_on_device_impl(int32_t *buff, const int64_t hash_entry_count, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, COUNT_MATCHES_FUNCTOR count_matches_func, FILL_ROW_IDS_FUNCTOR fill_row_ids_func)
CUstream getQueryEngineCudaStream()
GLOBAL void SUFFIX() fill_row_ids_sharded(int32_t *buff, const int64_t hash_entry_count, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
int64_t bucket_normalization
GLOBAL void SUFFIX() count_matches(int32_t *count_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const HashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
DEVICE int SUFFIX() fill_hash_join_buff(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
int fill_baseline_hash_join_buff(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const KEY_HANDLER *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void range_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const RangeKeyHandler *key_handler, const size_t num_elems)
GLOBAL void SUFFIX() fill_row_ids_bucketized(int32_t *buff, const int64_t hash_entry_count, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
__global__ void init_baseline_hash_join_buff_wrapper(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
__global__ void set_valid_pos_flag(int32_t *pos_buff, const int32_t *count_buff, const int64_t entry_count)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void cuda_kernel_launch_wrapper(F func, ARGS &&...args)
size_t getNormalizedHashEntryCount() const
void approximate_distinct_tuples_on_device(uint8_t *hll_buffer, const uint32_t b, const GenericKeyHandler *key_handler, const int64_t num_elems)
#define checkCudaErrors(err)
void fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
GLOBAL void SUFFIX() count_matches_bucketized(int32_t *count_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void overlaps_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
DEVICE int SUFFIX() fill_hash_join_buff_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void fill_hash_join_buff_on_device(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info)
DEVICE int SUFFIX() fill_hash_join_buff_sharded(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void approximate_distinct_tuples_on_device_overlaps(uint8_t *hll_buffer, const uint32_t b, int32_t *row_counts_buffer, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
__global__ void init_hash_join_buff_wrapper(int32_t *buff, const int64_t hash_entry_count, const int32_t invalid_slot_val)
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)
__global__ void set_valid_pos(int32_t *pos_buff, int32_t *count_buff, const int64_t entry_count)