38 #include <tbb/parallel_for.h>
45 #include <thrust/scan.h>
56 const int64_t min_inner_elem,
57 const int64_t min_outer_elem,
58 const int64_t max_outer_elem,
59 const int32_t* inner_to_outer_translation_map) {
60 const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem];
61 if (outer_id > max_outer_elem || outer_id < min_outer_elem) {
71 const int64_t hash_entry_count,
72 const int32_t invalid_slot_val,
73 const int32_t cpu_thread_idx,
74 const int32_t cpu_thread_count) {
76 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
77 int32_t step = blockDim.x * gridDim.x;
79 int32_t start = cpu_thread_idx;
80 int32_t step = cpu_thread_count;
82 for (int64_t i = start; i < hash_entry_count; i += step) {
83 groups_buffer[i] = invalid_slot_val;
90 void SUFFIX(init_hash_join_buff_tbb)(int32_t* groups_buffer,
91 const int64_t hash_entry_count,
92 const int32_t invalid_slot_val) {
94 [=](const tbb::blocked_range<int64_t>& r) {
95 const auto start_idx = r.begin();
96 const auto end_idx = r.end();
97 for (auto entry_idx = start_idx; entry_idx != end_idx;
99 groups_buffer[entry_idx] = invalid_slot_val;
104 #endif // #ifdef HAVE_TBB
105 #endif // #ifndef __CUDACC__
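// Note: mapd_cas below resolves to a platform-appropriate compare-and-swap --
// CUDA's atomicCAS in device code, the Win32 Interlocked API under MSVC, and the
// GCC/Clang __sync builtin otherwise.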
108 #define mapd_cas(address, compare, val) atomicCAS(address, compare, val)
109 #elif defined(_MSC_VER)
110 #define mapd_cas(address, compare, val) \
111 InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
112 static_cast<long>(val), \
113 static_cast<long>(compare))
115 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
118 template <typename HASHTABLE_FILLING_FUNC>
122 const int32_t* sd_inner_to_outer_translation_map,
123 const int32_t min_inner_elem,
124 const int32_t cpu_thread_idx,
125 const int32_t cpu_thread_count,
126 HASHTABLE_FILLING_FUNC filling_func) {
128 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
129 int32_t step = blockDim.x * gridDim.x;
131 int32_t start = cpu_thread_idx;
132 int32_t step = cpu_thread_count;
135 for (auto item : col.slice(start, step)) {
136 const size_t index = item.index;
137 int64_t elem = item.element;
146 if (sd_inner_to_outer_translation_map &&
152 sd_inner_to_outer_translation_map);
159 if (filling_func(elem, index)) {
168 const int32_t invalid_slot_val,
169 const bool for_semi_join,
172 const int32_t* sd_inner_to_outer_translation_map,
173 const int32_t min_inner_elem,
174 const int32_t cpu_thread_idx,
175 const int32_t cpu_thread_count,
176 const int64_t bucket_normalization) {
179 auto hashtable_filling_func = [&](auto elem, size_t index) {
183 type_info.min_val / bucket_normalization,
184 type_info.translated_null_val,
185 bucket_normalization);
186 return filling_func(index, entry_ptr, invalid_slot_val);
192 sd_inner_to_outer_translation_map,
196 hashtable_filling_func);
200 const int32_t invalid_slot_val,
201 const bool for_semi_join,
204 const int32_t* sd_inner_to_outer_translation_map,
205 const int32_t min_inner_elem,
206 const int32_t cpu_thread_idx,
207 const int32_t cpu_thread_count) {
210 auto hashtable_filling_func = [&](auto elem, size_t index) {
212 return filling_func(index, entry_ptr, invalid_slot_val);
218 sd_inner_to_outer_translation_map,
222 hashtable_filling_func);
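// Note: the sharded variant that follows uses the same slicing over the join column,
// but additionally skips elements whose computed shard does not match shard_info.shard
// before invoking the filling functor.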
225 template <typename HASHTABLE_FILLING_FUNC>
231 const int32_t* sd_inner_to_outer_translation_map,
232 const int32_t min_inner_elem,
233 const int32_t cpu_thread_idx,
234 const int32_t cpu_thread_count,
235 HASHTABLE_FILLING_FUNC filling_func) {
237 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
238 int32_t step = blockDim.x * gridDim.x;
240 int32_t start = cpu_thread_idx;
241 int32_t step = cpu_thread_count;
244 for (auto item : col.slice(start, step)) {
245 const size_t index = item.index;
246 int64_t elem = item.element;
248 if (shard != shard_info.shard) {
259 if (sd_inner_to_outer_translation_map &&
265 sd_inner_to_outer_translation_map);
272 if (filling_func(elem, shard, index)) {
281 const int32_t invalid_slot_val,
282 const bool for_semi_join,
286 const int32_t* sd_inner_to_outer_translation_map,
287 const int32_t min_inner_elem,
288 const int32_t cpu_thread_idx,
289 const int32_t cpu_thread_count,
290 const int64_t bucket_normalization) {
293 auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
297 type_info.min_val / bucket_normalization,
298 type_info.translated_null_val,
299 shard_info.entry_count_per_shard,
301 shard_info.num_shards,
302 shard_info.device_count,
303 bucket_normalization);
304 return filling_func(index, entry_ptr, invalid_slot_val);
311 sd_inner_to_outer_translation_map,
315 hashtable_filling_func);
320 const int32_t invalid_slot_val,
321 const bool for_semi_join,
325 const int32_t* sd_inner_to_outer_translation_map,
326 const int32_t min_inner_elem,
327 const int32_t cpu_thread_idx,
328 const int32_t cpu_thread_count) {
331 auto hashtable_filling_func = [&](auto elem, auto shard, size_t index) {
335 shard_info.entry_count_per_shard,
337 shard_info.num_shards,
338 shard_info.device_count);
339 return filling_func(index, entry_ptr, invalid_slot_val);
346 sd_inner_to_outer_translation_map,
350 hashtable_filling_func);
353 template <typename T>
355 const int64_t entry_count,
356 const size_t key_component_count,
357 const bool with_val_slot,
358 const int32_t invalid_slot_val,
359 const int32_t cpu_thread_idx,
360 const int32_t cpu_thread_count) {
362 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
363 int32_t step = blockDim.x * gridDim.x;
365 int32_t start = cpu_thread_idx;
366 int32_t step = cpu_thread_count;
368 auto hash_entry_size = (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
370 for (int64_t h = start; h < entry_count; h += step) {
371 int64_t off = h * hash_entry_size;
372 auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
373 for (size_t i = 0; i < key_component_count; ++i) {
374 row_ptr[i] = empty_key;
377 row_ptr[key_component_count] = invalid_slot_val;
385 template <typename T>
386 DEVICE void SUFFIX(init_baseline_hash_join_buff_tbb)(int8_t* hash_buff,
387 const int64_t entry_count,
388 const size_t key_component_count,
389 const bool with_val_slot,
390 const int32_t invalid_slot_val) {
391 const auto hash_entry_size =
392 (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
395 [=](const tbb::blocked_range<int64_t>& r) {
396 const auto start_idx = r.begin();
397 const auto end_idx = r.end();
398 for (int64_t entry_idx = start_idx; entry_idx < end_idx;
400 const int64_t offset = entry_idx * hash_entry_size;
401 auto row_ptr = reinterpret_cast<T*>(hash_buff + offset);
402 for (size_t k = 0; k < key_component_count; ++k) {
403 row_ptr[k] = empty_key;
406 row_ptr[key_component_count] = invalid_slot_val;
412 #endif // #ifdef HAVE_TBB
413 #endif // #ifndef __CUDACC__
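// Note: the GPU flavor of get_matching_baseline_hash_slot_at below claims a slot by
// CAS'ing its first key component from empty_key to the probe key and writing the
// remaining components; other threads spin until the last component becomes visible
// before comparing the stored key against the probe key.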
416 template <typename T>
420 const size_t key_component_count,
421 const int64_t hash_entry_size) {
422 uint32_t off = h * hash_entry_size;
423 auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
426 const T old = atomicCAS(row_ptr, empty_key, *key);
427 if (empty_key == old && key_component_count > 1) {
428 for (int64_t i = 1; i <= key_component_count - 1; ++i) {
429 atomicExch(row_ptr + i, key[i]);
433 if (key_component_count > 1) {
434 while (atomicAdd(row_ptr + key_component_count - 1, 0) == empty_key) {
440 for (uint32_t i = 0; i < key_component_count; ++i) {
441 if (row_ptr[i] != key[i]) {
448 return reinterpret_cast<T*>(row_ptr + key_component_count);
455 #define cas_cst(ptr, expected, desired) \
456 (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
457 reinterpret_cast<void*>(&desired), \
458 expected) == expected)
459 #define store_cst(ptr, val) \
460 InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
461 reinterpret_cast<void*>(val))
462 #define load_cst(ptr) \
463 InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
465 #define cas_cst(ptr, expected, desired) \
466 __atomic_compare_exchange_n( \
467 ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
468 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
469 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
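// Note: the CPU flavor below expresses the same slot-claim protocol with the
// sequentially consistent cas_cst/store_cst/load_cst macros above, using a
// write_pending sentinel to mark a slot whose key components are still being copied in.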
472 template <typename T>
476 const size_t key_component_count,
477 const int64_t hash_entry_size) {
478 uint32_t off = h * hash_entry_size;
479 auto row_ptr = reinterpret_cast<T*>(hash_buff + off);
482 if (UNLIKELY(*key == write_pending)) {
487 const bool success = cas_cst(row_ptr, &empty_key, write_pending);
489 if (key_component_count > 1) {
490 memcpy(row_ptr + 1, key + 1, (key_component_count - 1) * sizeof(T));
493 return reinterpret_cast<T*>(row_ptr + key_component_count);
495 while (load_cst(row_ptr) == write_pending) {
498 for (size_t i = 0; i < key_component_count; ++i) {
499 if (load_cst(row_ptr + i) != key[i]) {
503 return reinterpret_cast<T*>(row_ptr + key_component_count);
512 template <typename T>
515 const int64_t entry_count,
517 const size_t key_component_count,
518 const bool with_val_slot,
519 const int32_t invalid_slot_val,
520 const size_t key_size_in_bytes,
521 const size_t hash_entry_size) {
522 const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
524 hash_buff, h, key, key_component_count, hash_entry_size);
525 if (!matching_group) {
526 uint32_t h_probe = (h + 1) % entry_count;
527 while (h_probe != h) {
529 hash_buff, h_probe, key, key_component_count, hash_entry_size);
530 if (matching_group) {
533 h_probe = (h_probe + 1) % entry_count;
536 if (!matching_group) {
539 if (!with_val_slot) {
542 if (mapd_cas(matching_group, invalid_slot_val, val) != invalid_slot_val) {
548 template <typename T>
551 const int64_t entry_count,
553 const size_t key_component_count,
554 const bool with_val_slot,
555 const int32_t invalid_slot_val,
556 const size_t key_size_in_bytes,
557 const size_t hash_entry_size) {
558 const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
560 hash_buff, h, key, key_component_count, hash_entry_size);
561 if (!matching_group) {
562 uint32_t h_probe = (h + 1) % entry_count;
563 while (h_probe != h) {
565 hash_buff, h_probe, key, key_component_count, hash_entry_size);
566 if (matching_group) {
569 h_probe = (h_probe + 1) % entry_count;
572 if (!matching_group) {
575 if (!with_val_slot) {
578 mapd_cas(matching_group, invalid_slot_val, val);
582 template <typename T, typename FILL_HANDLER>
584 const int64_t entry_count,
585 const int32_t invalid_slot_val,
586 const bool for_semi_join,
587 const size_t key_component_count,
588 const bool with_val_slot,
589 const FILL_HANDLER* f,
590 const int64_t num_elems,
591 const int32_t cpu_thread_idx,
592 const int32_t cpu_thread_count) {
594 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
595 int32_t step = blockDim.x * gridDim.x;
597 int32_t start = cpu_thread_idx;
598 int32_t step = cpu_thread_count;
602 const size_t key_size_in_bytes = key_component_count * sizeof(T);
603 const size_t hash_entry_size =
604 (key_component_count + (with_val_slot ? 1 : 0)) * sizeof(T);
605 auto key_buff_handler = [hash_buff,
611 &for_semi_join](const int64_t entry_idx,
612 const T* key_scratch_buffer,
613 const size_t key_component_count) {
615 return write_baseline_hash_slot_for_semi_join<T>(entry_idx,
625 return write_baseline_hash_slot<T>(entry_idx,
638 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
639 for (auto& it : cols.slice(start, step)) {
640 const auto err = (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
651 #define mapd_add(address, val) atomicAdd(address, val)
652 #elif defined(_MSC_VER)
653 #define mapd_add(address, val) \
654 InterlockedExchangeAdd(reinterpret_cast<volatile long*>(address), \
655 static_cast<long>(val))
657 #define mapd_add(address, val) __sync_fetch_and_add(address, val)
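// Note: mapd_add mirrors mapd_cas -- an atomic fetch-and-add that maps to atomicAdd on
// CUDA, InterlockedExchangeAdd under MSVC, and __sync_fetch_and_add otherwise.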
660 template <typename SLOT_SELECTOR>
666 const int32_t* sd_inner_to_outer_translation_map,
667 const int32_t min_inner_elem,
668 const int32_t cpu_thread_idx,
669 const int32_t cpu_thread_count
672 SLOT_SELECTOR slot_selector) {
674 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
675 int32_t step = blockDim.x * gridDim.x;
677 int32_t start = cpu_thread_idx;
678 int32_t step = cpu_thread_count;
681 for (auto item : col.slice(start, step)) {
682 int64_t elem = item.element;
683 if (elem == type_info.null_val) {
684 if (type_info.uses_bw_eq) {
685 elem = type_info.translated_null_val;
691 if (sd_inner_to_outer_translation_map &&
692 (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
697 sd_inner_to_outer_translation_map);
704 auto* entry_ptr = slot_selector(count_buff, elem);
714 const int32_t* sd_inner_to_outer_translation_map,
715 const int32_t min_inner_elem,
716 const int32_t cpu_thread_idx,
717 const int32_t cpu_thread_count
720 auto slot_sel = [&type_info](auto count_buff, auto elem) {
728 sd_inner_to_outer_translation_map,
743 const int32_t* sd_inner_to_outer_translation_map,
744 const int32_t min_inner_elem,
745 const int32_t cpu_thread_idx,
746 const int32_t cpu_thread_count
749 const int64_t bucket_normalization) {
750 auto slot_sel = [bucket_normalization, &type_info](auto count_buff, auto elem) {
753 type_info.min_val / bucket_normalization,
754 type_info.translated_null_val,
755 bucket_normalization);
762 sd_inner_to_outer_translation_map,
778 const int32_t* sd_inner_to_outer_translation_map,
779 const int32_t min_inner_elem,
780 const int32_t cpu_thread_idx,
781 const int32_t cpu_thread_count
785 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
786 int32_t step = blockDim.x * gridDim.x;
788 int32_t start = cpu_thread_idx;
789 int32_t step = cpu_thread_count;
792 for (auto item : col.slice(start, step)) {
793 int64_t elem = item.element;
794 if (elem == type_info.null_val) {
795 if (type_info.uses_bw_eq) {
796 elem = type_info.translated_null_val;
802 if (sd_inner_to_outer_translation_map &&
803 (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
808 sd_inner_to_outer_translation_map);
818 shard_info.entry_count_per_shard,
819 shard_info.num_shards,
820 shard_info.device_count);
825 template <typename T>
828 const size_t key_component_count,
829 const T* composite_key_dict,
830 const int64_t entry_count,
831 const size_t key_size_in_bytes) {
832 const uint32_t h = MurmurHash1Impl(key, key_size_in_bytes, 0) % entry_count;
833 uint32_t off = h * key_component_count;
834 if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
835 return &composite_key_dict[off];
837 uint32_t h_probe = (h + 1) % entry_count;
838 while (h_probe != h) {
839 off = h_probe * key_component_count;
840 if (keys_are_equal(&composite_key_dict[off], key, key_component_count)) {
841 return &composite_key_dict[off];
843 h_probe = (h_probe + 1) % entry_count;
853 template <typename T, typename KEY_HANDLER>
855 const T* composite_key_dict,
856 const int64_t entry_count,
857 const KEY_HANDLER* f,
858 const int64_t num_elems
861 const int32_t cpu_thread_idx,
862 const int32_t cpu_thread_count
866 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
867 int32_t step = blockDim.x * gridDim.x;
869 int32_t start = cpu_thread_idx;
870 int32_t step = cpu_thread_count;
873 assert(composite_key_dict);
876 const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
877 auto key_buff_handler = [composite_key_dict,
880 key_size_in_bytes](const int64_t row_entry_idx,
881 const T* key_scratch_buff,
882 const size_t key_component_count) {
883 const auto matching_group =
889 const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
890 mapd_add(&count_buff[entry_idx], int32_t(1));
895 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
896 for (auto& it : cols.slice(start, step)) {
897 (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
901 template <typename SLOT_SELECTOR>
903 const int64_t hash_entry_count,
906 const bool for_window_framing
909 const int32_t* sd_inner_to_outer_translation_map,
910 const int32_t min_inner_elem,
911 const int32_t cpu_thread_idx,
912 const int32_t cpu_thread_count
915 SLOT_SELECTOR slot_selector) {
916 int32_t* pos_buff = buff;
917 int32_t* count_buff = buff + hash_entry_count;
918 int32_t* id_buff = count_buff + hash_entry_count;
919 int32_t* reversed_id_buff =
920 for_window_framing ? id_buff + join_column.num_elems : nullptr;
923 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
924 int32_t step = blockDim.x * gridDim.x;
926 int32_t start = cpu_thread_idx;
927 int32_t step = cpu_thread_count;
930 for (auto item : col.slice(start, step)) {
931 const size_t index = item.index;
932 int64_t elem = item.element;
941 if (sd_inner_to_outer_translation_map &&
947 sd_inner_to_outer_translation_map);
954 auto pos_ptr = slot_selector(pos_buff, elem);
955 const auto bin_idx = pos_ptr - pos_buff;
956 const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
957 id_buff[id_buff_idx] = static_cast<int32_t>(index);
958 if (for_window_framing) {
959 reversed_id_buff[index] = id_buff_idx;
965 const int64_t hash_entry_count,
968 const bool for_window_framing
971 const int32_t* sd_inner_to_outer_translation_map,
972 const int32_t min_inner_elem,
973 const int32_t cpu_thread_idx,
974 const int32_t cpu_thread_count
977 auto slot_sel = [&type_info](auto pos_buff, auto elem) {
988 sd_inner_to_outer_translation_map,
999 const int64_t hash_entry_count,
1004 const int32_t* sd_inner_to_outer_translation_map,
1005 const int32_t min_inner_elem,
1006 const int32_t cpu_thread_idx,
1007 const int32_t cpu_thread_count
1010 const int64_t bucket_normalization) {
1011 auto slot_sel = [&type_info, bucket_normalization](auto pos_buff, auto elem) {
1014 type_info.min_val / bucket_normalization,
1015 type_info.translated_null_val,
1016 bucket_normalization);
1025 sd_inner_to_outer_translation_map,
1034 template <typename SLOT_SELECTOR>
1036 const int64_t hash_entry_count,
1042 const int32_t* sd_inner_to_outer_translation_map,
1043 const int32_t min_inner_elem,
1044 const int32_t cpu_thread_idx,
1045 const int32_t cpu_thread_count
1048 SLOT_SELECTOR slot_selector) {
1050 int32_t* pos_buff = buff;
1051 int32_t* count_buff = buff + hash_entry_count;
1052 int32_t* id_buff = count_buff + hash_entry_count;
1055 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1056 int32_t step = blockDim.x * gridDim.x;
1058 int32_t start = cpu_thread_idx;
1059 int32_t step = cpu_thread_count;
1062 for (auto item : col.slice(start, step)) {
1063 const size_t index = item.index;
1064 int64_t elem = item.element;
1073 if (sd_inner_to_outer_translation_map &&
1079 sd_inner_to_outer_translation_map);
1086 auto* pos_ptr = slot_selector(pos_buff, elem);
1087 const auto bin_idx = pos_ptr - pos_buff;
1088 const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1089 id_buff[id_buff_idx] = static_cast<int32_t>(index);
1094 const int64_t hash_entry_count,
1100 const int32_t* sd_inner_to_outer_translation_map,
1101 const int32_t min_inner_elem,
1102 const int32_t cpu_thread_idx,
1103 const int32_t cpu_thread_count
1106 auto slot_sel = [&type_info, &shard_info](auto pos_buff, auto elem) {
1110 shard_info.entry_count_per_shard,
1111 shard_info.num_shards,
1112 shard_info.device_count);
1122 sd_inner_to_outer_translation_map,
1133 const int64_t hash_entry_count,
1139 const int32_t* sd_inner_to_outer_translation_map,
1140 const int32_t min_inner_elem,
1141 const int32_t cpu_thread_idx,
1142 const int32_t cpu_thread_count
1145 const int64_t bucket_normalization) {
1146 auto slot_sel = [&shard_info, &type_info, bucket_normalization](auto pos_buff,
1151 type_info.min_val / bucket_normalization,
1152 type_info.translated_null_val,
1153 shard_info.entry_count_per_shard,
1154 shard_info.num_shards,
1155 shard_info.device_count,
1156 bucket_normalization);
1166 sd_inner_to_outer_translation_map,
1175 template <typename T, typename KEY_HANDLER>
1177 const T* composite_key_dict,
1178 const int64_t hash_entry_count,
1179 const KEY_HANDLER* f,
1180 const int64_t num_elems,
1181 const bool for_window_framing
1184 const int32_t cpu_thread_idx,
1185 const int32_t cpu_thread_count
1188 int32_t* pos_buff = buff;
1189 int32_t* count_buff = buff + hash_entry_count;
1190 int32_t* id_buff = count_buff + hash_entry_count;
1191 int32_t* reversed_id_buff = for_window_framing ? id_buff + num_elems : nullptr;
1193 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1194 int32_t step = blockDim.x * gridDim.x;
1196 int32_t start = cpu_thread_idx;
1197 int32_t step = cpu_thread_count;
1202 assert(composite_key_dict);
1204 const size_t key_size_in_bytes = f->get_key_component_count() * sizeof(T);
1205 auto key_buff_handler = [composite_key_dict,
1212 for_window_framing](const int64_t row_index,
1213 const T* key_scratch_buff,
1214 const size_t key_component_count) {
1215 const T* matching_group =
1217 key_component_count,
1221 const auto entry_idx = (matching_group - composite_key_dict) / key_component_count;
1222 int32_t* pos_ptr = pos_buff + entry_idx;
1223 const auto bin_idx = pos_ptr - pos_buff;
1224 const auto id_buff_idx = mapd_add(count_buff + bin_idx, 1) + *pos_ptr;
1225 id_buff[id_buff_idx] = static_cast<int32_t>(row_index);
1226 if (for_window_framing) {
1227 reversed_id_buff[row_index] = id_buff_idx;
1233 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1234 for (auto& it : cols.slice(start, step)) {
1235 (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
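// Note: approximate_distinct_tuples_impl below updates one HyperLogLog register per key:
// the top b bits of the 64-bit key hash select the register, get_rank() of the remaining
// bits is the candidate value, and the register keeps the maximum (atomicMax on the GPU
// path, std::max on the CPU path). When row_count_buffer is provided, it also accumulates
// a per-entry row count.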
1242 template <typename KEY_HANDLER>
1244 int32_t* row_count_buffer,
1246 const int64_t num_elems,
1247 const KEY_HANDLER* f
1250 const int32_t cpu_thread_idx,
1251 const int32_t cpu_thread_count
1255 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1256 int32_t step = blockDim.x * gridDim.x;
1258 int32_t start = cpu_thread_idx;
1259 int32_t step = cpu_thread_count;
1262 auto key_buff_handler = [b, hll_buffer, row_count_buffer](
1263 const int64_t entry_idx,
1264 const int64_t* key_scratch_buff,
1265 const size_t key_component_count) {
1266 if (row_count_buffer) {
1267 row_count_buffer[entry_idx] += 1;
1270 const uint64_t hash =
1272 const uint32_t index = hash >> (64 - b);
1273 const auto rank = get_rank(hash << b, 64 - b);
1275 atomicMax(reinterpret_cast<int32_t*>(hll_buffer) + index, rank);
1277 hll_buffer[index] = std::max(hll_buffer[index], rank);
1286 f->get_number_of_columns(), f->get_join_columns(), f->get_join_column_type_infos());
1287 for (auto& it : cols.slice(start, step)) {
1288 (*f)(it.join_column_iterators, key_scratch_buff, key_buff_handler);
1296 __device__ double atomicMin(double* address, double val) {
1297 unsigned long long int* address_as_ull = (unsigned long long int*)address;
1298 unsigned long long int old = *address_as_ull, assumed;
1302 old = atomicCAS(address_as_ull,
1304 __double_as_longlong(min(val, __longlong_as_double(assumed))));
1305 } while (assumed != old);
1307 return __longlong_as_double(old);
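// Note: the double-precision atomicMin above follows the standard CAS-loop idiom --
// reinterpret the address as a 64-bit word and retry atomicCAS with min(val, current)
// until the word is unchanged, then return the previous value.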
1316 const double* bucket_size_thresholds
1319 const int32_t cpu_thread_idx,
1320 const int32_t cpu_thread_count
1324 int32_t start = threadIdx.x + blockDim.x * blockIdx.x;
1325 int32_t step = blockDim.x * gridDim.x;
1327 int32_t start = cpu_thread_idx;
1328 int32_t step = cpu_thread_count;
1333 double bounds[2 * N];
1334 for (size_t j = 0; j < 2 * N; j++) {
1338 for (size_t j = 0; j < N; j++) {
1339 const auto diff = bounds[j + N] - bounds[j];
1341 if (diff > bucket_size_thresholds[j]) {
1342 atomicMin(&bucket_sizes_for_thread[j], diff);
1345 if (diff < bucket_size_thresholds[j] && diff > bucket_sizes_for_thread[j]) {
1346 bucket_sizes_for_thread[j] = diff;
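// Note: inclusive_scan below is a thread-pool prefix sum. Small inputs (or
// thread_count <= 1) are scanned serially; otherwise the input is split into roughly
// equal chunks, each thread scans its chunk into the output while recording the chunk
// total, and a second pass adds the accumulated totals of the preceding chunks to each
// later chunk's outputs.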
1355 template <typename InputIterator, typename OutputIterator>
1359 const size_t thread_count) {
1360 using ElementType = typename InputIterator::value_type;
1361 using OffsetType = typename InputIterator::difference_type;
1362 const OffsetType elem_count = last - first;
1363 if (elem_count < 10000 || thread_count <= 1) {
1364 ElementType sum = 0;
1365 for (auto iter = first; iter != last; ++iter, ++out) {
1366 *out = sum += *iter;
1371 const OffsetType step = (elem_count + thread_count - 1) / thread_count;
1372 OffsetType start_off = 0;
1373 OffsetType end_off = std::min(step, elem_count);
1374 std::vector<ElementType> partial_sums(thread_count);
1375 std::vector<std::future<void>> counter_threads;
1376 for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx,
1377 start_off = std::min(start_off + step, elem_count),
1378 end_off = std::min(start_off + step, elem_count)) {
1382 ElementType& partial_sum, const OffsetType start, const OffsetType end) {
1383 ElementType sum = 0;
1384 for (auto in_iter = first + start, out_iter = out + start;
1385 in_iter != (first + end);
1386 ++in_iter, ++out_iter) {
1387 *out_iter = sum += *in_iter;
1391 std::ref(partial_sums[thread_idx]),
1395 for (auto& child : counter_threads) {
1399 ElementType sum = 0;
1400 for (auto& s : partial_sums) {
1405 counter_threads.clear();
1406 start_off = std::min(step, elem_count);
1407 end_off = std::min(start_off + step, elem_count);
1408 for (size_t thread_idx = 0; thread_idx < thread_count - 1; ++thread_idx,
1409 start_off = std::min(start_off + step, elem_count),
1410 end_off = std::min(start_off + step, elem_count)) {
1413 [out](const ElementType prev_sum, const OffsetType start, const OffsetType end) {
1414 for (auto iter = out + start; iter != (out + end); ++iter) {
1418 partial_sums[thread_idx],
1422 for (auto& child : counter_threads) {
1427 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1429 const int64_t hash_entry_count,
1432 const int32_t* sd_inner_to_outer_translation_map,
1433 const int32_t min_inner_elem,
1434 const unsigned cpu_thread_count,
1435 const bool for_window_framing,
1436 COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_func,
1437 FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_func) {
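// Note: the one-to-many fill works in three passes over the hash entries -- count matches
// per entry into count_buff, convert the counts into starting offsets for pos_buff (an
// exclusive scan done as inclusive_scan over a shifted copy), then zero the counts again
// and fill id_buff with the matching row ids.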
1439 int32_t* pos_buff = buff;
1440 int32_t* count_buff = buff + hash_entry_count;
1441 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1442 std::vector<std::future<void>> counter_threads;
1443 for (unsigned cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1448 for (auto& child : counter_threads) {
1452 std::vector<int32_t> count_copy(hash_entry_count, 0);
1453 CHECK_GT(hash_entry_count, int64_t(0));
1454 memcpy(count_copy.data() + 1, count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1459 count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1461 std::vector<std::future<void>> pos_threads;
1462 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1465 [&](size_t thread_idx) {
1466 for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1467 if (count_buff[i]) {
1468 pos_buff[i] = count_copy[i];
1474 for (auto& child : pos_threads) {
1478 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1479 std::vector<std::future<void>> rowid_threads;
1480 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1485 for (auto& child : rowid_threads) {
1494 const int32_t* sd_inner_to_outer_translation_map,
1495 const int32_t min_inner_elem,
1496 const unsigned cpu_thread_count,
1497 const bool for_window_framing) {
1499 auto launch_count_matches =
1503 sd_inner_to_outer_translation_map,
1504 min_inner_elem](auto cpu_thread_idx, auto cpu_thread_count) {
1509 sd_inner_to_outer_translation_map,
1514 auto launch_fill_row_ids =
1519 sd_inner_to_outer_translation_map,
1521 for_window_framing](auto cpu_thread_idx, auto cpu_thread_count) {
1528 sd_inner_to_outer_translation_map,
1538 sd_inner_to_outer_translation_map,
1542 launch_count_matches,
1543 launch_fill_row_ids);
1551 const int32_t* sd_inner_to_outer_translation_map,
1552 const int32_t min_inner_elem,
1553 const unsigned cpu_thread_count) {
1557 auto launch_count_matches = [bucket_normalization,
1558 count_buff = buff + hash_entry_count,
1561 sd_inner_to_outer_translation_map,
1562 min_inner_elem](auto cpu_thread_idx,
1563 auto cpu_thread_count) {
1568 sd_inner_to_outer_translation_map,
1572 bucket_normalization);
1574 auto launch_fill_row_ids = [bucket_normalization,
1579 sd_inner_to_outer_translation_map,
1580 min_inner_elem](auto cpu_thread_idx,
1581 auto cpu_thread_count) {
1587 sd_inner_to_outer_translation_map,
1591 bucket_normalization);
1598 sd_inner_to_outer_translation_map,
1602 launch_count_matches,
1603 launch_fill_row_ids);
1606 template <typename COUNT_MATCHES_LAUNCH_FUNCTOR, typename FILL_ROW_IDS_LAUNCH_FUNCTOR>
1609 const int64_t hash_entry_count,
1613 const int32_t* sd_inner_to_outer_translation_map,
1614 const int32_t min_inner_elem,
1615 const unsigned cpu_thread_count,
1616 COUNT_MATCHES_LAUNCH_FUNCTOR count_matches_launcher,
1617 FILL_ROW_IDS_LAUNCH_FUNCTOR fill_row_ids_launcher) {
1619 int32_t* pos_buff = buff;
1620 int32_t* count_buff = buff + hash_entry_count;
1621 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1622 std::vector<std::future<void>> counter_threads;
1623 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1628 for (auto& child : counter_threads) {
1632 std::vector<int32_t> count_copy(hash_entry_count, 0);
1633 CHECK_GT(hash_entry_count, int64_t(0));
1634 memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
1636 count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
1637 std::vector<std::future<void>> pos_threads;
1638 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1641 [&](const unsigned thread_idx) {
1642 for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
1643 if (count_buff[i]) {
1644 pos_buff[i] = count_copy[i];
1650 for (auto& child : pos_threads) {
1654 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
1655 std::vector<std::future<void>> rowid_threads;
1656 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1661 for (auto& child : rowid_threads) {
1667 const int64_t hash_entry_count,
1671 const int32_t* sd_inner_to_outer_translation_map,
1672 const int32_t min_inner_elem,
1673 const unsigned cpu_thread_count) {
1674 auto launch_count_matches = [count_buff = buff + hash_entry_count,
1680 sd_inner_to_outer_translation_map,
1683 ](auto cpu_thread_idx, auto cpu_thread_count) {
1690 sd_inner_to_outer_translation_map,
1698 auto launch_fill_row_ids = [buff,
1705 sd_inner_to_outer_translation_map,
1708 ](auto cpu_thread_idx, auto cpu_thread_count) {
1716 sd_inner_to_outer_translation_map,
1730 sd_inner_to_outer_translation_map,
1735 launch_count_matches,
1736 launch_fill_row_ids);
1740 const int64_t entry_count,
1741 const size_t key_component_count,
1742 const bool with_val_slot,
1743 const int32_t invalid_slot_val,
1744 const int32_t cpu_thread_idx,
1745 const int32_t cpu_thread_count) {
1746 init_baseline_hash_join_buff<int32_t>(hash_join_buff,
1748 key_component_count,
1756 const int64_t entry_count,
1757 const size_t key_component_count,
1758 const bool with_val_slot,
1759 const int32_t invalid_slot_val,
1760 const int32_t cpu_thread_idx,
1761 const int32_t cpu_thread_count) {
1762 init_baseline_hash_join_buff<int64_t>(hash_join_buff,
1764 key_component_count,
1774 void init_baseline_hash_join_buff_tbb_32(int8_t* hash_join_buff,
1775 const int64_t entry_count,
1776 const size_t key_component_count,
1777 const bool with_val_slot,
1778 const int32_t invalid_slot_val) {
1779 init_baseline_hash_join_buff_tbb<int32_t>(
1780 hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1783 void init_baseline_hash_join_buff_tbb_64(int8_t* hash_join_buff,
1784 const int64_t entry_count,
1785 const size_t key_component_count,
1786 const bool with_val_slot,
1787 const int32_t invalid_slot_val) {
1788 init_baseline_hash_join_buff_tbb<int64_t>(
1789 hash_join_buff, entry_count, key_component_count, with_val_slot, invalid_slot_val);
1792 #endif // #ifdef HAVE_TBB
1793 #endif // #ifndef __CUDACC__
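// Note: the functions below are the exported, non-template 32-/64-bit entry points; each
// forwards to the corresponding init/fill template instantiated with int32_t or int64_t
// key components.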
1796 const int64_t entry_count,
1797 const int32_t invalid_slot_val,
1798 const bool for_semi_join,
1799 const size_t key_component_count,
1800 const bool with_val_slot,
1802 const int64_t num_elems,
1803 const int32_t cpu_thread_idx,
1804 const int32_t cpu_thread_count) {
1805 return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1809 key_component_count,
1818 const int64_t entry_count,
1819 const int32_t invalid_slot_val,
1820 const size_t key_component_count,
1821 const bool with_val_slot,
1823 const int64_t num_elems,
1824 const int32_t cpu_thread_idx,
1825 const int32_t cpu_thread_count) {
1826 return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1830 key_component_count,
1839 const size_t entry_count,
1840 const int32_t invalid_slot_val,
1841 const size_t key_component_count,
1842 const bool with_val_slot,
1844 const size_t num_elems,
1845 const int32_t cpu_thread_idx,
1846 const int32_t cpu_thread_count) {
1847 return fill_baseline_hash_join_buff<int32_t>(hash_buff,
1851 key_component_count,
1860 const int64_t entry_count,
1861 const int32_t invalid_slot_val,
1862 const bool for_semi_join,
1863 const size_t key_component_count,
1864 const bool with_val_slot,
1866 const int64_t num_elems,
1867 const int32_t cpu_thread_idx,
1868 const int32_t cpu_thread_count) {
1869 return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1873 key_component_count,
1882 const int64_t entry_count,
1883 const int32_t invalid_slot_val,
1884 const size_t key_component_count,
1885 const bool with_val_slot,
1887 const int64_t num_elems,
1888 const int32_t cpu_thread_idx,
1889 const int32_t cpu_thread_count) {
1890 return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1894 key_component_count,
1903 const size_t entry_count,
1904 const int32_t invalid_slot_val,
1905 const size_t key_component_count,
1906 const bool with_val_slot,
1908 const size_t num_elems,
1909 const int32_t cpu_thread_idx,
1910 const int32_t cpu_thread_count) {
1911 return fill_baseline_hash_join_buff<int64_t>(hash_buff,
1915 key_component_count,
1923 template <typename T>
1926 const T* composite_key_dict,
1927 const int64_t hash_entry_count,
1928 const size_t key_component_count,
1929 const std::vector<JoinColumn>& join_column_per_key,
1930 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
1931 const std::vector<JoinBucketInfo>& join_buckets_per_key,
1932 const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
1933 const std::vector<int32_t>& sd_min_inner_elems,
1934 const size_t cpu_thread_count,
1935 const bool is_range_join,
1936 const bool is_geo_compressed,
1937 const bool for_window_framing) {
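// Note: this follows the same count / scan / fill-row-ids structure as the one-to-many
// fill above, but each pass first selects a key handler (range join, bucketed join, or
// generic keys) before launching the per-thread workers.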
1938 int32_t* pos_buff = buff;
1939 int32_t* count_buff = buff + hash_entry_count;
1940 memset(count_buff, 0, hash_entry_count *
sizeof(int32_t));
1941 std::vector<std::future<void>> counter_threads;
1942 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
1943 if (is_range_join) {
1949 &join_buckets_per_key,
1950 &join_column_per_key,
1956 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
1957 &join_column_per_key[0],
1958 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
1963 join_column_per_key[0].num_elems,
1967 } else if (join_buckets_per_key.size() > 0) {
1973 &join_buckets_per_key,
1974 &join_column_per_key,
1978 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
1979 &join_column_per_key[0],
1980 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
1985 join_column_per_key[0].num_elems,
1990 counter_threads.push_back(
1994 &key_component_count,
1996 &join_column_per_key,
1998 &sd_inner_to_outer_translation_maps,
1999 &sd_min_inner_elems,
2002 const auto key_handler =
2005 &join_column_per_key[0],
2006 &type_info_per_key[0],
2007 &sd_inner_to_outer_translation_maps[0],
2008 &sd_min_inner_elems[0]);
2013 join_column_per_key[0].num_elems,
2020 for (auto& child : counter_threads) {
2024 std::vector<int32_t> count_copy(hash_entry_count, 0);
2025 CHECK_GT(hash_entry_count, int64_t(0));
2026 memcpy(&count_copy[1], count_buff, (hash_entry_count - 1) * sizeof(int32_t));
2028 count_copy.begin(), count_copy.end(), count_copy.begin(), cpu_thread_count);
2029 std::vector<std::future<void>> pos_threads;
2030 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
2033 [&](const int thread_idx) {
2034 for (int64_t i = thread_idx; i < hash_entry_count; i += cpu_thread_count) {
2035 if (count_buff[i]) {
2036 pos_buff[i] = count_copy[i];
2042 for (auto& child : pos_threads) {
2046 memset(count_buff, 0, hash_entry_count * sizeof(int32_t));
2047 std::vector<std::future<void>> rowid_threads;
2048 for (size_t cpu_thread_idx = 0; cpu_thread_idx < cpu_thread_count; ++cpu_thread_idx) {
2049 if (is_range_join) {
2055 &join_column_per_key,
2056 &join_buckets_per_key,
2062 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2063 &join_column_per_key[0],
2064 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2070 join_column_per_key[0].num_elems,
2075 } else if (join_buckets_per_key.size() > 0) {
2081 &join_column_per_key,
2082 &join_buckets_per_key,
2087 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2088 &join_column_per_key[0],
2089 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2095 join_column_per_key[0].num_elems,
2105 key_component_count,
2106 &join_column_per_key,
2108 &sd_inner_to_outer_translation_maps,
2109 &sd_min_inner_elems,
2114 key_component_count,
2116 &join_column_per_key[0],
2117 &type_info_per_key[0],
2118 &sd_inner_to_outer_translation_maps[0],
2119 &sd_min_inner_elems[0]);
2125 join_column_per_key[0].num_elems,
2133 for (auto& child : rowid_threads) {
2140 const int32_t* composite_key_dict,
2141 const int64_t hash_entry_count,
2142 const size_t key_component_count,
2143 const std::vector<JoinColumn>& join_column_per_key,
2144 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2145 const std::vector<JoinBucketInfo>& join_bucket_info,
2146 const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2147 const std::vector<int32_t>& sd_min_inner_elems,
2148 const int32_t cpu_thread_count,
2149 const bool is_range_join,
2150 const bool is_geo_compressed,
2151 const bool for_window_framing) {
2152 fill_one_to_many_baseline_hash_table<int32_t>(buff,
2155 key_component_count,
2156 join_column_per_key,
2159 sd_inner_to_outer_translation_maps,
2164 for_window_framing);
2169 const int64_t* composite_key_dict,
2170 const int64_t hash_entry_count,
2171 const size_t key_component_count,
2172 const std::vector<JoinColumn>& join_column_per_key,
2173 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2174 const std::vector<JoinBucketInfo>& join_bucket_info,
2175 const std::vector<const int32_t*>& sd_inner_to_outer_translation_maps,
2176 const std::vector<int32_t>& sd_min_inner_elems,
2177 const int32_t cpu_thread_count,
2178 const bool is_range_join,
2179 const bool is_geo_compressed,
2180 const bool for_window_framing) {
2181 fill_one_to_many_baseline_hash_table<int64_t>(buff,
2184 key_component_count,
2185 join_column_per_key,
2188 sd_inner_to_outer_translation_maps,
2193 for_window_framing);
2198 const size_t padded_size_bytes,
2199 const std::vector<JoinColumn>& join_column_per_key,
2200 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2201 const int thread_count) {
2202 CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2203 CHECK(!join_column_per_key.empty());
2205 std::vector<std::future<void>> approx_distinct_threads;
2206 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2207 approx_distinct_threads.push_back(std::async(
2209 [&join_column_per_key,
2212 hll_buffer_all_cpus,
2216 auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2220 &join_column_per_key[0],
2221 &type_info_per_key[0],
2227 join_column_per_key[0].num_elems,
2233 for (auto& child : approx_distinct_threads) {
2239 uint8_t* hll_buffer_all_cpus,
2240 std::vector<int32_t>& row_counts,
2242 const size_t padded_size_bytes,
2243 const std::vector<JoinColumn>& join_column_per_key,
2244 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2245 const std::vector<JoinBucketInfo>& join_buckets_per_key,
2246 const int thread_count) {
2247 CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2248 CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2249 CHECK(!join_column_per_key.empty());
2251 std::vector<std::future<void>> approx_distinct_threads;
2252 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2253 approx_distinct_threads.push_back(std::async(
2255 [&join_column_per_key,
2256 &join_buckets_per_key,
2259 hll_buffer_all_cpus,
2263 auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2266 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2267 &join_column_per_key[0],
2268 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2272 join_column_per_key[0].num_elems,
2278 for (auto& child : approx_distinct_threads) {
2283 row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2287 uint8_t* hll_buffer_all_cpus,
2288 std::vector<int32_t>& row_counts,
2290 const size_t padded_size_bytes,
2291 const std::vector<JoinColumn>& join_column_per_key,
2292 const std::vector<JoinColumnTypeInfo>& type_info_per_key,
2293 const std::vector<JoinBucketInfo>& join_buckets_per_key,
2294 const bool is_compressed,
2295 const int thread_count) {
2296 CHECK_EQ(join_column_per_key.size(), join_buckets_per_key.size());
2297 CHECK_EQ(join_column_per_key.size(), type_info_per_key.size());
2298 CHECK(!join_column_per_key.empty());
2300 std::vector<std::future<void>> approx_distinct_threads;
2301 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2302 approx_distinct_threads.push_back(std::async(
2304 [&join_column_per_key,
2305 &join_buckets_per_key,
2308 hll_buffer_all_cpus,
2313 auto hll_buffer = hll_buffer_all_cpus + thread_idx * padded_size_bytes;
2317 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.size(),
2318 &join_column_per_key[0],
2319 join_buckets_per_key[0].inverse_bucket_sizes_for_dimension.data());
2323 join_column_per_key[0].num_elems,
2329 for (auto& child : approx_distinct_threads) {
2334 row_counts.begin(), row_counts.end(), row_counts.begin(), thread_count);
2340 const std::vector<double>& bucket_size_thresholds,
2341 const int thread_count) {
2342 std::vector<std::vector<double>> bucket_sizes_for_threads;
2343 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2344 bucket_sizes_for_threads.emplace_back(bucket_sizes_for_dimension.size(), 0.0);
2346 std::vector<std::future<void>> threads;
2347 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2349 compute_bucket_sizes_impl<2>,
2350 bucket_sizes_for_threads[thread_idx].data(),
2353 bucket_size_thresholds.data(),
2357 for (auto& child : threads) {
2361 for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
2362 for (size_t i = 0; i < bucket_sizes_for_dimension.size(); i++) {
2363 if (bucket_sizes_for_threads[thread_idx][i] > bucket_sizes_for_dimension[i]) {
2364 bucket_sizes_for_dimension[i] = bucket_sizes_for_threads[thread_idx][i];
2370 #endif // ifndef __CUDACC__