template <typename SIZE,
          class KEY_HANDLER,
          typename std::enable_if<sizeof(SIZE) == 4, SIZE>::type* = nullptr>
int fill_baseline_hash_join_buff(int8_t* hash_buff,
                                 const size_t entry_count,
                                 const int32_t invalid_slot_val,
                                 const bool for_semi_join,
                                 const size_t key_component_count,
                                 const bool with_val_slot,
                                 const KEY_HANDLER* key_handler,
                                 const size_t num_elems,
                                 const int32_t cpu_thread_idx,
                                 const int32_t cpu_thread_count) {
  if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
    return fill_baseline_hash_join_buff_32(hash_buff, entry_count, invalid_slot_val,
                                           for_semi_join, key_component_count,
                                           with_val_slot, key_handler, num_elems,
                                           cpu_thread_idx, cpu_thread_count);
  } else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
    return range_fill_baseline_hash_join_buff_32(hash_buff, entry_count,
                                                 invalid_slot_val, key_component_count,
                                                 with_val_slot, key_handler, num_elems,
                                                 cpu_thread_idx, cpu_thread_count);
  } else {
    static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
                  "Only Generic, Overlaps, and Range Key Handlers are supported.");
    return overlaps_fill_baseline_hash_join_buff_32(hash_buff, entry_count,
                                                    invalid_slot_val,
                                                    key_component_count, with_val_slot,
                                                    key_handler, num_elems,
                                                    cpu_thread_idx, cpu_thread_count);
  }
}
template <typename SIZE,
          class KEY_HANDLER,
          typename std::enable_if<sizeof(SIZE) == 8, SIZE>::type* = nullptr>
int fill_baseline_hash_join_buff(int8_t* hash_buff,
                                 const size_t entry_count,
                                 const int32_t invalid_slot_val,
                                 const bool for_semi_join,
                                 const size_t key_component_count,
                                 const bool with_val_slot,
                                 const KEY_HANDLER* key_handler,
                                 const size_t num_elems,
                                 const int32_t cpu_thread_idx,
                                 const int32_t cpu_thread_count) {
  if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
    return fill_baseline_hash_join_buff_64(hash_buff, entry_count, invalid_slot_val,
                                           for_semi_join, key_component_count,
                                           with_val_slot, key_handler, num_elems,
                                           cpu_thread_idx, cpu_thread_count);
  } else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
    return range_fill_baseline_hash_join_buff_64(hash_buff, entry_count,
                                                 invalid_slot_val, key_component_count,
                                                 with_val_slot, key_handler, num_elems,
                                                 cpu_thread_idx, cpu_thread_count);
  } else {
    static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
                  "Only Generic, Overlaps, and Range Key Handlers are supported.");
    return overlaps_fill_baseline_hash_join_buff_64(hash_buff, entry_count,
                                                    invalid_slot_val,
                                                    key_component_count, with_val_slot,
                                                    key_handler, num_elems,
                                                    cpu_thread_idx, cpu_thread_count);
  }
}
template <typename SIZE,
          class KEY_HANDLER,
          typename std::enable_if<sizeof(SIZE) == 4, SIZE>::type* = nullptr>
void fill_baseline_hash_join_buff_on_device(int8_t* hash_buff,
                                            const size_t entry_count,
                                            const int32_t invalid_slot_val,
                                            const bool for_semi_join,
                                            const size_t key_component_count,
                                            const bool with_val_slot,
                                            int* dev_err_buff,
                                            const KEY_HANDLER* key_handler,
                                            const size_t num_elems) {
  if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
    fill_baseline_hash_join_buff_on_device_32(hash_buff, entry_count, invalid_slot_val,
                                              for_semi_join, key_component_count,
                                              with_val_slot, dev_err_buff, key_handler,
                                              num_elems);
  } else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
    LOG(FATAL) << "32-bit keys not yet supported for range join.";
  } else {
    static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
                  "Only Generic, Overlaps, and Range Key Handlers are supported.");
    LOG(FATAL) << "32-bit keys not yet supported for overlaps join.";
  }
}
template <typename SIZE,
          class KEY_HANDLER,
          typename std::enable_if<sizeof(SIZE) == 8, SIZE>::type* = nullptr>
void fill_baseline_hash_join_buff_on_device(int8_t* hash_buff,
                                            const size_t entry_count,
                                            const int32_t invalid_slot_val,
                                            const bool for_semi_join,
                                            const size_t key_component_count,
                                            const bool with_val_slot,
                                            int* dev_err_buff,
                                            const KEY_HANDLER* key_handler,
                                            const size_t num_elems) {
  if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
    fill_baseline_hash_join_buff_on_device_64(hash_buff, entry_count, invalid_slot_val,
        for_semi_join, key_component_count, with_val_slot, dev_err_buff, key_handler,
        num_elems);
  } else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
    range_fill_baseline_hash_join_buff_on_device_64(hash_buff, entry_count,
        invalid_slot_val, key_component_count, with_val_slot, dev_err_buff, key_handler,
        num_elems);
  } else {
    static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
                  "Only Generic, Overlaps, and Range Key Handlers are supported.");
    overlaps_fill_baseline_hash_join_buff_on_device_64(hash_buff, entry_count,
        invalid_slot_val, key_component_count, with_val_slot, dev_err_buff, key_handler,
        num_elems);
  }
}
template <typename SIZE,
          class KEY_HANDLER,
          typename std::enable_if<sizeof(SIZE) == 4, SIZE>::type* = nullptr>
void fill_one_to_many_baseline_hash_table_on_device(int32_t* buff,
                                                    const SIZE* composite_key_dict,
                                                    const size_t hash_entry_count,
                                                    const size_t key_component_count,
                                                    const KEY_HANDLER* key_handler,
                                                    const size_t num_elems,
                                                    const bool for_window_framing) {
  if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
    fill_one_to_many_baseline_hash_table_on_device_32(buff, composite_key_dict,
                                                      hash_entry_count,
                                                      key_component_count, key_handler,
                                                      num_elems, for_window_framing);
  } else {
    static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value ||
                      std::is_same<KEY_HANDLER, RangeKeyHandler>::value,
                  "Only Generic, Overlaps, and Range Key Handlers are supported.");
    LOG(FATAL) << "32-bit keys not yet supported for overlaps join.";
  }
}
template <typename SIZE,
          class KEY_HANDLER,
          typename std::enable_if<sizeof(SIZE) == 8, SIZE>::type* = nullptr>
void fill_one_to_many_baseline_hash_table_on_device(int32_t* buff,
                                                    const SIZE* composite_key_dict,
                                                    const size_t hash_entry_count,
                                                    const size_t key_component_count,
                                                    const KEY_HANDLER* key_handler,
                                                    const size_t num_elems,
                                                    const bool for_window_framing) {
  if constexpr (std::is_same<KEY_HANDLER, GenericKeyHandler>::value) {
    fill_one_to_many_baseline_hash_table_on_device_64(
        buff, composite_key_dict, hash_entry_count, key_handler, num_elems,
        for_window_framing);
  } else if constexpr (std::is_same<KEY_HANDLER, RangeKeyHandler>::value) {
    range_fill_one_to_many_baseline_hash_table_on_device_64(
        buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
  } else {
    static_assert(std::is_same<KEY_HANDLER, OverlapsKeyHandler>::value,
                  "Only Generic, Overlaps, and Range Key Handlers are supported.");
    overlaps_fill_one_to_many_baseline_hash_table_on_device_64(
        buff, composite_key_dict, hash_entry_count, key_handler, num_elems);
  }
}
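// Illustrative note, not part of the original file: all of the dispatch
// helpers above operate on a single flat buffer. For layouts that require
// additional buffers (one-to-many), the sizing arithmetic in
// initHashTableOnCpu and allocateDeviceMemory below implies the layout
//
//   [ composite key dictionary : keyspace_entry_count * entry_size bytes    ]
//   [ one-to-many buffers      : (2 * keyspace_entry_count + emitted keys)
//                                * sizeof(int32_t) bytes                    ]
//
// which also matches the reinterpret_casts there that locate one_to_many_buff
// at <table base> + keyspace_entry_count * entry_size.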
template <class KEY_HANDLER>
int initHashTableOnCpu(KEY_HANDLER* key_handler,
                       const CompositeKeyInfo& composite_key_info,
                       const std::vector<JoinColumn>& join_columns,
                       const std::vector<JoinColumnTypeInfo>& join_column_types,
                       const std::vector<JoinBucketInfo>& join_bucket_info,
                       const StrProxyTranslationMapsPtrsAndOffsets&
                           str_proxy_translation_maps_ptrs_and_offsets,
                       const size_t keyspace_entry_count,
                       const size_t keys_for_all_rows,
                       const HashType layout,
                       const JoinType join_type,
                       const size_t key_component_width,
                       const size_t key_component_count,
                       const RegisteredQueryHint& query_hint) {
  auto timer = DEBUG_TIMER(__func__);
  auto const entry_cnt = (key_component_count + (layout == HashType::OneToOne ? 1 : 0));
  auto const entry_size = entry_cnt * key_component_width;
  size_t const one_to_many_hash_entries =
      HashJoin::layoutRequiresAdditionalBuffers(layout)
          ? 2 * keyspace_entry_count + keys_for_all_rows
          : 0;
  size_t const hash_table_size =
      entry_size * keyspace_entry_count + one_to_many_hash_entries * sizeof(int32_t);
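  // Worked example with illustrative numbers (not from the original source):
  // a two-component key with 8-byte components, OneToOne layout, and
  // 1,000,000 keyspace entries gives
  //   entry_cnt                = 2 + 1 = 3 slots (key components + value slot)
  //   entry_size               = 3 * 8 = 24 bytes
  //   one_to_many_hash_entries = 0     (OneToOne needs no payload buffer)
  //   hash_table_size          = 24 * 1,000,000 = 24 MB.
  // A OneToMany layout over the same keyspace with 5,000,000 stored rows
  // would instead add (2 * 1,000,000 + 5,000,000) * 4 = 28 MB of payload.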
  if (query_hint.isHintRegistered(QueryHint::kMaxJoinHashTableSize) &&
      hash_table_size > query_hint.max_join_hash_table_size) {
    throw JoinHashTableTooBig(hash_table_size, query_hint.max_join_hash_table_size);
  }
  if (hash_table_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries(
        "Hash tables for GPU requiring larger than 2GB contiguous memory not supported "
        "yet");
  }
  const bool for_semi_join =
      (join_type == JoinType::SEMI || join_type == JoinType::ANTI) &&
      layout == HashType::OneToOne;
  hash_table_ = std::make_unique<BaselineHashTable>(
      layout, keyspace_entry_count, keys_for_all_rows, hash_table_size);
  VLOG(1) << "Initialize a CPU baseline hash table for join type "
          << ::toString(join_type) << ", hash table size: " << hash_table_size
          << " Bytes" << ", # hash entries: " << entry_cnt
          << ", entry_size: " << entry_size
          << ", # entries in the payload buffer: " << one_to_many_hash_entries
          << " (# non-null hash entries: " << keyspace_entry_count
          << ", # entries stored in the payload buffer: " << keys_for_all_rows << ")";
  auto cpu_hash_table_ptr = hash_table_->getCpuBuffer();
  std::vector<std::future<void>> init_cpu_buff_threads;
  const int thread_count = cpu_threads();
  {
    auto timer_init =
        DEBUG_TIMER("CPU Baseline-Hash: init_baseline_hash_join_buff_32");
#ifdef HAVE_TBB
    switch (key_component_width) {
      case 4:
        init_baseline_hash_join_buff_tbb_32(cpu_hash_table_ptr, keyspace_entry_count,
                                            key_component_count,
                                            layout == HashType::OneToOne, -1);
        break;
      case 8:
        init_baseline_hash_join_buff_tbb_64(cpu_hash_table_ptr, keyspace_entry_count,
                                            key_component_count,
                                            layout == HashType::OneToOne, -1);
        break;
      default:
        CHECK(false);
    }
#else   // #ifdef HAVE_TBB
    for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
      init_cpu_buff_threads.emplace_back(std::async(
          std::launch::async,
          [keyspace_entry_count, key_component_count, key_component_width, thread_idx,
           thread_count, cpu_hash_table_ptr, layout,
           parent_thread_local_ids = logger::thread_local_ids()] {
            logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
            DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
            switch (key_component_width) {
              case 4:
                init_baseline_hash_join_buff_32(cpu_hash_table_ptr,
                                                keyspace_entry_count,
                                                key_component_count,
                                                layout == HashType::OneToOne, -1,
                                                thread_idx, thread_count);
                break;
              case 8:
                init_baseline_hash_join_buff_64(cpu_hash_table_ptr,
                                                keyspace_entry_count,
                                                key_component_count,
                                                layout == HashType::OneToOne, -1,
                                                thread_idx, thread_count);
                break;
              default:
                CHECK(false);
            }
          }));
    }
    for (auto& child : init_cpu_buff_threads) {
      child.get();
    }
#endif  // !HAVE_TBB
  }
  std::vector<std::future<int>> fill_cpu_buff_threads;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    fill_cpu_buff_threads.emplace_back(std::async(
        std::launch::async,
        [key_handler, keyspace_entry_count, &join_columns, key_component_count,
         key_component_width, layout, thread_idx, cpu_hash_table_ptr, thread_count,
         for_semi_join, parent_thread_local_ids = logger::thread_local_ids()] {
          logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
          DEBUG_TIMER_NEW_THREAD(parent_thread_local_ids.thread_id_);
          switch (key_component_width) {
            case 4:
              return fill_baseline_hash_join_buff<int32_t>(
                  cpu_hash_table_ptr, keyspace_entry_count, -1, for_semi_join,
                  key_component_count, layout == HashType::OneToOne, key_handler,
                  join_columns[0].num_elems, thread_idx, thread_count);
            case 8:
              return fill_baseline_hash_join_buff<int64_t>(
                  cpu_hash_table_ptr, keyspace_entry_count, -1, for_semi_join,
                  key_component_count, layout == HashType::OneToOne, key_handler,
                  join_columns[0].num_elems, thread_idx, thread_count);
            default:
              CHECK(false);
          }
          return -1;
        }));
  }
  int err = 0;
  for (auto& child : fill_cpu_buff_threads) {
    int partial_err = child.get();
    if (partial_err) {
      err = partial_err;
    }
  }
  if (err) {
    return err;
  }
  if (HashJoin::layoutRequiresAdditionalBuffers(layout)) {
    auto one_to_many_buff = reinterpret_cast<int32_t*>(
        cpu_hash_table_ptr + keyspace_entry_count * entry_size);
    {
      auto timer_init_additional_buffers =
          DEBUG_TIMER("CPU Baseline-Hash: Additional Buffers init_hash_join_buff");
      init_hash_join_buff(one_to_many_buff, keyspace_entry_count, -1, 0, 1);
    }
    bool is_geo_compressed = false;
    if constexpr (std::is_same_v<KEY_HANDLER, RangeKeyHandler>) {
      if (const auto range_handler =
              reinterpret_cast<const RangeKeyHandler*>(key_handler)) {
        is_geo_compressed = range_handler->is_compressed_;
      }
    }
    switch (key_component_width) {
      case 4: {
        const auto composite_key_dict = reinterpret_cast<int32_t*>(cpu_hash_table_ptr);
        fill_one_to_many_baseline_hash_table_32(
            one_to_many_buff, composite_key_dict, keyspace_entry_count,
            key_component_count, join_columns, join_column_types, join_bucket_info,
            str_proxy_translation_maps_ptrs_and_offsets.first,
            str_proxy_translation_maps_ptrs_and_offsets.second, thread_count,
            std::is_same_v<KEY_HANDLER, RangeKeyHandler>, is_geo_compressed,
            join_type == JoinType::WINDOW_FUNCTION_FRAMING);
        break;
      }
      case 8: {
        const auto composite_key_dict = reinterpret_cast<int64_t*>(cpu_hash_table_ptr);
        fill_one_to_many_baseline_hash_table_64(
            one_to_many_buff, composite_key_dict, keyspace_entry_count,
            key_component_count, join_columns, join_column_types, join_bucket_info,
            str_proxy_translation_maps_ptrs_and_offsets.first,
            str_proxy_translation_maps_ptrs_and_offsets.second, thread_count,
            std::is_same_v<KEY_HANDLER, RangeKeyHandler>, is_geo_compressed,
            join_type == JoinType::WINDOW_FUNCTION_FRAMING);
        break;
      }
      default:
        CHECK(false);
    }
  }
  return err;
}
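// Illustrative sketch, not part of the original file: the fill phase above
// fans work out across threads with std::async and folds the per-thread
// return codes into a single error. The same pattern in isolation, with a
// hypothetical example_fill_stripe() standing in for
// fill_baseline_hash_join_buff:
static int example_fill_stripe(const int thread_idx, const int thread_count) {
  // Each worker fills every thread_count-th entry starting at thread_idx;
  // 0 signals success.
  return 0;
}
static int example_fill_all(const int thread_count) {
  std::vector<std::future<int>> workers;
  for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
    workers.emplace_back(std::async(
        std::launch::async, example_fill_stripe, thread_idx, thread_count));
  }
  int err = 0;
  for (auto& worker : workers) {
    if (const int partial_err = worker.get()) {
      err = partial_err;  // remember the failure, but drain every future
    }
  }
  return err;
}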
//! Allocate GPU memory using GpuBuffers via DataMgr.
void allocateDeviceMemory(const HashType layout,
                          const size_t key_component_width,
                          const size_t key_component_count,
                          const size_t keyspace_entry_count,
                          const size_t emitted_keys_count,
                          const int device_id,
                          const Executor* executor,
                          const RegisteredQueryHint& query_hint) {
#ifdef HAVE_CUDA
  const auto num_hash_entries =
      (key_component_count + (layout == HashType::OneToOne ? 1 : 0));
  const auto entry_size = num_hash_entries * key_component_width;
  const size_t one_to_many_hash_entries =
      HashJoin::layoutRequiresAdditionalBuffers(layout)
          ? 2 * keyspace_entry_count + emitted_keys_count
          : 0;
  const size_t hash_table_size =
      entry_size * keyspace_entry_count + one_to_many_hash_entries * sizeof(int32_t);
  if (query_hint.isHintRegistered(QueryHint::kMaxJoinHashTableSize) &&
      hash_table_size > query_hint.max_join_hash_table_size) {
    throw JoinHashTableTooBig(hash_table_size, query_hint.max_join_hash_table_size);
  }
  if (hash_table_size > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries(
        "Hash tables for GPU requiring larger than 2GB contiguous memory not supported "
        "yet");
  }
  VLOG(1) << "Initialize a GPU baseline hash table for device " << device_id
          << ", hash table size: " << hash_table_size << " Bytes"
          << ", # hash entries: " << num_hash_entries << ", entry_size: " << entry_size
          << ", # entries in the payload buffer: " << one_to_many_hash_entries
          << " (# non-null hash entries: " << keyspace_entry_count
          << ", # entries stored in the payload buffer: " << emitted_keys_count << ")";
  hash_table_ = std::make_unique<BaselineHashTable>(executor->getDataMgr(),
                                                    layout,
                                                    keyspace_entry_count,
                                                    emitted_keys_count,
                                                    hash_table_size,
                                                    device_id);
#else
  UNREACHABLE();
#endif
}
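// Illustrative note, not part of the original file: both sizing paths reject
// tables larger than std::numeric_limits<int32_t>::max() bytes, which the
// error message attributes to the 2GB contiguous-allocation limit on GPU.
// A hypothetical helper for the largest keyspace that fits a given entry
// size:
static constexpr size_t example_max_keyspace_entries(const size_t entry_size) {
  return static_cast<size_t>(std::numeric_limits<int32_t>::max()) / entry_size;
}
// e.g. example_max_keyspace_entries(24) is roughly 89.5M entries for the
// three-slot, 8-byte-component example above (ignoring any payload buffer).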
template <class KEY_HANDLER>
int initHashTableOnGpu(KEY_HANDLER* key_handler,
                       const std::vector<JoinColumn>& join_columns,
                       const HashType layout,
                       const JoinType join_type,
                       const size_t key_component_width,
                       const size_t key_component_count,
                       const size_t keyspace_entry_count,
                       const size_t emitted_keys_count,
                       const int device_id,
                       const Executor* executor,
                       const RegisteredQueryHint& query_hint) {
  auto timer = DEBUG_TIMER(__func__);
  int err = 0;
#ifdef HAVE_CUDA
  allocateDeviceMemory(layout,
                       key_component_width,
                       key_component_count,
                       keyspace_entry_count,
                       emitted_keys_count,
                       device_id,
                       executor,
                       query_hint);
  if (!keyspace_entry_count) {
    // Trivial hash table: no keys, so no payload either.
    CHECK(!emitted_keys_count);
    return 0;
  }
  auto data_mgr = executor->getDataMgr();
  auto allocator = std::make_unique<CudaAllocator>(
      data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
  auto dev_err_buff = allocator->alloc(sizeof(int));
  allocator->copyToDevice(dev_err_buff, &err, sizeof(err));
  auto gpu_hash_table_buff = hash_table_->getGpuBuffer();
  CHECK(gpu_hash_table_buff);
  const bool for_semi_join =
      (join_type == JoinType::SEMI || join_type == JoinType::ANTI) &&
      layout == HashType::OneToOne;
  const auto key_handler_gpu = transfer_flat_object_to_gpu(*key_handler, *allocator);
  switch (key_component_width) {
    case 4:
      init_baseline_hash_join_buff_on_device_32(gpu_hash_table_buff,
                                                keyspace_entry_count,
                                                key_component_count,
                                                layout == HashType::OneToOne, -1);
      break;
    case 8:
      init_baseline_hash_join_buff_on_device_64(gpu_hash_table_buff,
                                                keyspace_entry_count,
                                                key_component_count,
                                                layout == HashType::OneToOne, -1);
      break;
    default:
      UNREACHABLE();
  }
  switch (key_component_width) {
    case 4:
      fill_baseline_hash_join_buff_on_device<int32_t>(
          gpu_hash_table_buff, keyspace_entry_count, -1, for_semi_join,
          key_component_count, layout == HashType::OneToOne,
          reinterpret_cast<int*>(dev_err_buff), key_handler_gpu,
          join_columns.front().num_elems);
      allocator->copyFromDevice(&err, dev_err_buff, sizeof(err));
      break;
    case 8:
      fill_baseline_hash_join_buff_on_device<int64_t>(
          gpu_hash_table_buff, keyspace_entry_count, -1, for_semi_join,
          key_component_count, layout == HashType::OneToOne,
          reinterpret_cast<int*>(dev_err_buff), key_handler_gpu,
          join_columns.front().num_elems);
      allocator->copyFromDevice(&err, dev_err_buff, sizeof(err));
      break;
    default:
      UNREACHABLE();
  }
  if (err) {
    return err;
  }
  if (HashJoin::layoutRequiresAdditionalBuffers(layout)) {
    const auto entry_size = key_component_count * key_component_width;
    auto one_to_many_buff = reinterpret_cast<int32_t*>(
        gpu_hash_table_buff + keyspace_entry_count * entry_size);
    init_hash_join_buff_on_device(one_to_many_buff, keyspace_entry_count, -1);
    switch (key_component_width) {
      case 4: {
        const auto composite_key_dict = reinterpret_cast<int32_t*>(gpu_hash_table_buff);
        fill_one_to_many_baseline_hash_table_on_device<int32_t>(
            one_to_many_buff, composite_key_dict, keyspace_entry_count,
            key_component_count, key_handler_gpu, join_columns.front().num_elems,
            join_type == JoinType::WINDOW_FUNCTION_FRAMING);
        break;
      }
      case 8: {
        const auto composite_key_dict = reinterpret_cast<int64_t*>(gpu_hash_table_buff);
        fill_one_to_many_baseline_hash_table_on_device<int64_t>(
            one_to_many_buff, composite_key_dict, keyspace_entry_count,
            key_component_count, key_handler_gpu, join_columns.front().num_elems,
            join_type == JoinType::WINDOW_FUNCTION_FRAMING);
        break;
      }
      default:
        UNREACHABLE();
    }
  }
#else
  UNREACHABLE();
#endif
  return err;
}
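// Illustrative sketch, not part of the original file: the host/device error
// round trip used by initHashTableOnGpu above. The host zeroes a device-side
// int, the fill kernels record failures into it, and the value is copied back
// before being checked. Allocator is any type exposing the alloc /
// copyToDevice / copyFromDevice members used above (e.g. CudaAllocator);
// launch is a hypothetical callable wrapping the kernel invocation.
template <typename Allocator, typename KernelLauncher>
static int example_run_with_device_error_flag(Allocator& allocator,
                                              KernelLauncher launch) {
  int err = 0;
  auto dev_err_buff = allocator.alloc(sizeof(int));
  allocator.copyToDevice(dev_err_buff, &err, sizeof(err));    // clear the flag
  launch(reinterpret_cast<int*>(dev_err_buff));               // kernel may set it
  allocator.copyFromDevice(&err, dev_err_buff, sizeof(err));  // read it back
  return err;
}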