27 template <
typename SIZE,
31 const size_t entry_count,
32 const int32_t invalid_slot_val,
33 const bool for_semi_join,
34 const size_t key_component_count,
35 const bool with_val_slot,
36 const KEY_HANDLER* key_handler,
37 const size_t num_elems,
38 const int32_t cpu_thread_idx,
39 const int32_t cpu_thread_count) {
41 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
52 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
63 static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
64 "Only Generic, Overlaps, and Range Key Handlers are supported.");
77 template <
typename SIZE,
81 const size_t entry_count,
82 const int32_t invalid_slot_val,
83 const bool for_semi_join,
84 const size_t key_component_count,
85 const bool with_val_slot,
86 const KEY_HANDLER* key_handler,
87 const size_t num_elems,
88 const int32_t cpu_thread_idx,
89 const int32_t cpu_thread_count) {
91 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
102 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
113 static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
114 "Only Generic, Overlaps, and Range Key Handlers are supported.");
127 template <
typename SIZE,
131 const size_t entry_count,
132 const int32_t invalid_slot_val,
133 const bool for_semi_join,
134 const size_t key_component_count,
135 const bool with_val_slot,
137 const KEY_HANDLER* key_handler,
138 const size_t num_elems) {
140 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
150 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
153 static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
154 "Only Generic, Overlaps, and Range Key Handlers are supported.");
155 LOG(
FATAL) <<
"32-bit keys not yet supported for overlaps join.";
159 template <
typename SIZE,
163 const size_t entry_count,
164 const int32_t invalid_slot_val,
165 const bool for_semi_join,
166 const size_t key_component_count,
167 const bool with_val_slot,
169 const KEY_HANDLER* key_handler,
170 const size_t num_elems) {
172 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
182 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
192 static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
193 "Only Generic, Overlaps, and Range Key Handlers are supported.");
205 template <
typename SIZE,
209 const SIZE* composite_key_dict,
210 const size_t hash_entry_count,
211 const size_t key_component_count,
212 const KEY_HANDLER* key_handler,
213 const size_t num_elems) {
215 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
223 static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value ||
224 std::is_same<KEY_HANDLER, RangeKeyHandler>::value,
225 "Only Generic, Overlaps, and Range Key Handlers are supported.");
226 LOG(
FATAL) <<
"32-bit keys not yet supported for overlaps join.";
230 template <
typename SIZE,
234 const SIZE* composite_key_dict,
235 const size_t hash_entry_count,
236 const size_t key_component_count,
237 const KEY_HANDLER* key_handler,
238 const size_t num_elems) {
240 if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
242 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
243 }
else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
245 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
247 static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
248 "Only Generic, Overlaps, and Range Key Handlers are supported.");
250 buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
257 template <
class KEY_HANDLER>
260 const std::vector<JoinColumn>& join_columns,
261 const std::vector<JoinColumnTypeInfo>& join_column_types,
262 const std::vector<JoinBucketInfo>& join_bucket_info,
264 str_proxy_translation_maps_ptrs_and_offsets,
265 const size_t keyspace_entry_count,
266 const size_t keys_for_all_rows,
269 const size_t key_component_width,
270 const size_t key_component_count) {
272 const auto entry_size =
275 const size_t one_to_many_hash_entries =
277 ? 2 * keyspace_entry_count + keys_for_all_rows
279 const size_t hash_table_size =
280 entry_size * keyspace_entry_count + one_to_many_hash_entries *
sizeof(int32_t);
283 if (hash_table_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
285 "Hash tables for GPU requiring larger than 2GB contigious memory not supported "
288 const bool for_semi_join =
292 VLOG(1) <<
"Initializing CPU Join Hash Table with " << keyspace_entry_count
293 <<
" hash entries and " << one_to_many_hash_entries
294 <<
" entries in the one to many buffer";
295 VLOG(1) <<
"Total hash table size: " << hash_table_size <<
" Bytes";
298 layout, keyspace_entry_count, keys_for_all_rows, hash_table_size);
299 auto cpu_hash_table_ptr =
hash_table_->getCpuBuffer();
301 std::vector<std::future<void>> init_cpu_buff_threads;
304 auto timer_init =
DEBUG_TIMER(
"CPU Baseline-Hash: init_baseline_hash_join_buff_32");
306 switch (key_component_width) {
308 init_baseline_hash_join_buff_tbb_32(cpu_hash_table_ptr,
309 keyspace_entry_count,
315 init_baseline_hash_join_buff_tbb_64(cpu_hash_table_ptr,
316 keyspace_entry_count,
324 #else // #ifdef HAVE_TBB
325 for (
int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
326 init_cpu_buff_threads.emplace_back(
328 [keyspace_entry_count,
335 switch (key_component_width) {
338 keyspace_entry_count,
347 keyspace_entry_count,
359 for (
auto& child : init_cpu_buff_threads) {
364 std::vector<std::future<int>> fill_cpu_buff_threads;
365 for (
int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
366 fill_cpu_buff_threads.emplace_back(
std::async(
369 keyspace_entry_count,
378 switch (key_component_width) {
380 return fill_baseline_hash_join_buff<int32_t>(cpu_hash_table_ptr,
381 keyspace_entry_count,
387 join_columns[0].num_elems,
393 return fill_baseline_hash_join_buff<int64_t>(cpu_hash_table_ptr,
394 keyspace_entry_count,
400 join_columns[0].num_elems,
412 for (
auto& child : fill_cpu_buff_threads) {
413 int partial_err = child.get();
422 auto one_to_many_buff =
reinterpret_cast<int32_t*
>(
423 cpu_hash_table_ptr + keyspace_entry_count * entry_size);
425 auto timer_init_additional_buffers =
426 DEBUG_TIMER(
"CPU Baseline-Hash: Additional Buffers init_hash_join_buff");
429 bool is_geo_compressed =
false;
430 if constexpr (std::is_same_v<KEY_HANDLER, RangeKeyHandler>) {
431 if (
const auto range_handler =
432 reinterpret_cast<const RangeKeyHandler*>(key_handler)) {
433 is_geo_compressed = range_handler->is_compressed_;
438 switch (key_component_width) {
440 const auto composite_key_dict =
reinterpret_cast<int32_t*
>(cpu_hash_table_ptr);
444 keyspace_entry_count,
449 str_proxy_translation_maps_ptrs_and_offsets.first,
450 str_proxy_translation_maps_ptrs_and_offsets.second,
452 std::is_same_v<KEY_HANDLER, RangeKeyHandler>,
457 const auto composite_key_dict =
reinterpret_cast<int64_t*
>(cpu_hash_table_ptr);
461 keyspace_entry_count,
466 str_proxy_translation_maps_ptrs_and_offsets.first,
467 str_proxy_translation_maps_ptrs_and_offsets.second,
469 std::is_same_v<KEY_HANDLER, RangeKeyHandler>,
481 const size_t key_component_width,
482 const size_t key_component_count,
483 const size_t keyspace_entry_count,
484 const size_t emitted_keys_count,
486 const Executor* executor) {
488 const auto entry_size =
491 const size_t one_to_many_hash_entries =
493 ? 2 * keyspace_entry_count + emitted_keys_count
495 const size_t hash_table_size =
496 entry_size * keyspace_entry_count + one_to_many_hash_entries *
sizeof(int32_t);
499 if (hash_table_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
501 "Hash tables for GPU requiring larger than 2GB contigious memory not supported "
505 VLOG(1) <<
"Initializing GPU Hash Table for device " << device_id <<
" with "
506 << keyspace_entry_count <<
" hash entries and " << one_to_many_hash_entries
508 VLOG(1) <<
"Total hash table size: " << hash_table_size <<
" Bytes";
510 hash_table_ = std::make_unique<BaselineHashTable>(executor->getDataMgr(),
512 keyspace_entry_count,
521 template <
class KEY_HANDLER>
523 const std::vector<JoinColumn>& join_columns,
526 const size_t key_component_width,
527 const size_t key_component_count,
528 const size_t keyspace_entry_count,
529 const size_t emitted_keys_count,
531 const Executor* executor) {
538 keyspace_entry_count,
542 if (!keyspace_entry_count) {
544 CHECK(!emitted_keys_count);
547 auto data_mgr = executor->getDataMgr();
548 auto allocator = std::make_unique<CudaAllocator>(
550 auto dev_err_buff = allocator->alloc(
sizeof(
int));
552 allocator->copyToDevice(dev_err_buff, &err,
sizeof(err));
553 auto gpu_hash_table_buff =
hash_table_->getGpuBuffer();
554 CHECK(gpu_hash_table_buff);
555 const bool for_semi_join =
560 switch (key_component_width) {
563 keyspace_entry_count,
570 keyspace_entry_count,
578 switch (key_component_width) {
580 fill_baseline_hash_join_buff_on_device<int32_t>(
582 keyspace_entry_count,
587 reinterpret_cast<int*
>(dev_err_buff),
589 join_columns.front().num_elems);
590 allocator->copyFromDevice(&err, dev_err_buff,
sizeof(err));
594 fill_baseline_hash_join_buff_on_device<int64_t>(
596 keyspace_entry_count,
601 reinterpret_cast<int*
>(dev_err_buff),
603 join_columns.front().num_elems);
604 allocator->copyFromDevice(&err, dev_err_buff,
sizeof(err));
614 const auto entry_size = key_component_count * key_component_width;
615 auto one_to_many_buff =
reinterpret_cast<int32_t*
>(
616 gpu_hash_table_buff + keyspace_entry_count * entry_size);
619 switch (key_component_width) {
621 const auto composite_key_dict =
reinterpret_cast<int32_t*
>(gpu_hash_table_buff);
622 fill_one_to_many_baseline_hash_table_on_device<int32_t>(
625 keyspace_entry_count,
628 join_columns.front().num_elems);
633 const auto composite_key_dict =
reinterpret_cast<int64_t*
>(gpu_hash_table_buff);
634 fill_one_to_many_baseline_hash_table_on_device<int64_t>(
637 keyspace_entry_count,
640 join_columns.front().num_elems);
void fill_baseline_hash_join_buff_on_device(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const KEY_HANDLER *key_handler, const size_t num_elems)
void fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const GenericKeyHandler *key_handler, const int64_t num_elems)
void init_baseline_hash_join_buff_32(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void fill_one_to_many_baseline_hash_table_on_device(int32_t *buff, const SIZE *composite_key_dict, const size_t hash_entry_count, const size_t key_component_count, const KEY_HANDLER *key_handler, const size_t num_elems)
int initHashTableOnCpu(KEY_HANDLER *key_handler, const CompositeKeyInfo &composite_key_info, const std::vector< JoinColumn > &join_columns, const std::vector< JoinColumnTypeInfo > &join_column_types, const std::vector< JoinBucketInfo > &join_bucket_info, const StrProxyTranslationMapsPtrsAndOffsets &str_proxy_translation_maps_ptrs_and_offsets, const size_t keyspace_entry_count, const size_t keys_for_all_rows, const HashType layout, const JoinType join_type, const size_t key_component_width, const size_t key_component_count)
T * transfer_flat_object_to_gpu(const T &object, DeviceAllocator &allocator)
void init_baseline_hash_join_buff_on_device_64(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
void fill_baseline_hash_join_buff_on_device_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
void range_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const RangeKeyHandler *key_handler, const size_t num_elems)
BaselineJoinHashTableBuilder()=default
void init_baseline_hash_join_buff_64(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
std::unique_ptr< BaselineHashTable > hash_table_
void overlaps_fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
void fill_one_to_many_baseline_hash_table_on_device_32(int32_t *buff, const int32_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const GenericKeyHandler *key_handler, const int64_t num_elems)
void setHashLayout(HashType layout)
void init_baseline_hash_join_buff_on_device_32(int8_t *hash_join_buff, const int64_t entry_count, const size_t key_component_count, const bool with_val_slot, const int32_t invalid_slot_val)
future< Result > async(Fn &&fn, Args &&...args)
void fill_one_to_many_baseline_hash_table_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const int32_t * > &sd_inner_to_outer_translation_maps, const std::vector< int32_t > &sd_min_inner_elems, const int32_t cpu_thread_count, const bool is_range_join, const bool is_geo_compressed)
int initHashTableOnGpu(KEY_HANDLER *key_handler, const std::vector< JoinColumn > &join_columns, const HashType layout, const JoinType join_type, const size_t key_component_width, const size_t key_component_count, const size_t keyspace_entry_count, const size_t emitted_keys_count, const int device_id, const Executor *executor)
int fill_baseline_hash_join_buff_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const GenericKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
int fill_baseline_hash_join_buff(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const KEY_HANDLER *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void range_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const size_t hash_entry_count, const RangeKeyHandler *key_handler, const size_t num_elems)
int range_fill_baseline_hash_join_buff_64(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const RangeKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
HashType getHashLayout() const
std::pair< std::vector< const int32_t * >, std::vector< int32_t >> StrProxyTranslationMapsPtrsAndOffsets
std::unique_ptr< BaselineHashTable > getHashTable()
static std::string getHashTypeString(HashType ht) noexcept
int overlaps_fill_baseline_hash_join_buff_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
int range_fill_baseline_hash_join_buff_32(int8_t *hash_buff, const size_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const RangeKeyHandler *key_handler, const size_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
int overlaps_fill_baseline_hash_join_buff_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const size_t key_component_count, const bool with_val_slot, const OverlapsKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
void allocateDeviceMemory(const HashType layout, const size_t key_component_width, const size_t key_component_count, const size_t keyspace_entry_count, const size_t emitted_keys_count, const int device_id, const Executor *executor)
#define DEBUG_TIMER(name)
void fill_baseline_hash_join_buff_on_device_64(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, int *dev_err_buff, const GenericKeyHandler *key_handler, const int64_t num_elems)
void overlaps_fill_one_to_many_baseline_hash_table_on_device_64(int32_t *buff, const int64_t *composite_key_dict, const int64_t hash_entry_count, const OverlapsKeyHandler *key_handler, const int64_t num_elems)
void fill_one_to_many_baseline_hash_table_32(int32_t *buff, const int32_t *composite_key_dict, const int64_t hash_entry_count, const size_t key_component_count, const std::vector< JoinColumn > &join_column_per_key, const std::vector< JoinColumnTypeInfo > &type_info_per_key, const std::vector< JoinBucketInfo > &join_bucket_info, const std::vector< const int32_t * > &sd_inner_to_outer_translation_maps, const std::vector< int32_t > &sd_min_inner_elems, const int32_t cpu_thread_count, const bool is_range_join, const bool is_geo_compressed)
Allocate GPU memory using GpuBuffers via DataMgr.
int fill_baseline_hash_join_buff_32(int8_t *hash_buff, const int64_t entry_count, const int32_t invalid_slot_val, const bool for_semi_join, const size_t key_component_count, const bool with_val_slot, const GenericKeyHandler *key_handler, const int64_t num_elems, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
static bool layoutRequiresAdditionalBuffers(HashType layout) noexcept