31 const size_t shard_count,
33 const int device_count,
37 const auto shards_per_device = (shard_count + device_count - 1) / device_count;
39 const size_t entries_per_shard =
43 const size_t total_count =
49 std::make_unique<PerfectHashTable>(executor->
getDataMgr(),
54 hash_table_->allocateGpuMemory(total_count, device_id);
63 const size_t shard_count,
65 const int device_count,
77 void initHashTableOnGpu(
const ChunkKey& chunk_key,
80 const bool is_bitwise_eq,
85 const size_t shard_count,
86 const int32_t hash_join_invalid_val,
88 const int device_count,
94 ScopeGuard cleanup_error_buff = [&data_mgr, gpu_hash_table_err_buff]() {
95 data_mgr->free(gpu_hash_table_err_buff);
97 CHECK(gpu_hash_table_err_buff);
98 auto dev_err_buff = gpu_hash_table_err_buff->
getMemoryPtr();
101 auto allocator = std::make_unique<CudaAllocator>(
103 allocator->copyToDevice(dev_err_buff, &err,
sizeof(err));
106 auto gpu_hash_table_buff =
hash_table_->getGpuBuffer();
110 hash_join_invalid_val);
111 if (chunk_key.empty()) {
116 const auto inner_col = cols.first;
118 const auto& ti = inner_col->get_type_info();
119 auto translated_null_val = col_range.
getIntMax() + 1;
121 translated_null_val = col_range.
getIntMin() - 1;
130 auto use_bucketization = inner_col->get_type_info().get_type() ==
kDATE;
132 const size_t entries_per_shard =
135 for (
size_t shard = device_id; shard < shard_count; shard += device_count) {
136 ShardInfo shard_info{shard, entries_per_shard, shard_count, device_count};
139 reinterpret_cast<int32_t*>(gpu_hash_table_buff),
140 hash_join_invalid_val,
142 reinterpret_cast<int*>(dev_err_buff),
149 reinterpret_cast<int32_t*>(gpu_hash_table_buff),
159 reinterpret_cast<int32_t*>(gpu_hash_table_buff),
160 hash_join_invalid_val,
162 reinterpret_cast<int*>(dev_err_buff),
167 if (use_bucketization) {
169 reinterpret_cast<int32_t*>(gpu_hash_table_buff),
175 reinterpret_cast<int32_t*>(gpu_hash_table_buff),
183 allocator->copyFromDevice(&err, dev_err_buff,
sizeof(err));
188 throw std::runtime_error(
"Unexpected error when building perfect hash table: " +
198 const bool is_bitwise_eq,
204 const int32_t hash_join_invalid_val,
207 const auto inner_col = cols.first;
209 const auto& ti = inner_col->get_type_info();
213 std::make_unique<PerfectHashTable>(executor->
getDataMgr(),
219 auto cpu_hash_table_buff =
reinterpret_cast<int32_t*
>(
hash_table_->getCpuBuffer());
221 std::vector<std::thread> init_cpu_buff_threads;
224 auto timer_init =
DEBUG_TIMER(
"CPU One-To-One Perfect-Hash: init_hash_join_buff");
226 init_hash_join_buff_tbb(cpu_hash_table_buff,
228 hash_join_invalid_val);
229 #else // #ifdef HAVE_TBB
230 for (
int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
231 init_cpu_buff_threads.emplace_back([hash_entry_info,
232 hash_join_invalid_val,
235 cpu_hash_table_buff] {
238 hash_join_invalid_val,
243 for (
auto& t : init_cpu_buff_threads) {
246 init_cpu_buff_threads.clear();
250 std::atomic<int> err{0};
253 DEBUG_TIMER(
"CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
254 for (
int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
255 init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
257 str_proxy_translation_map,
267 auto translated_null_val = col_range.
getIntMax() + 1;
269 translated_null_val = col_range.
getIntMin() - 1;
273 hash_join_invalid_val,
276 {
static_cast<size_t>(ti.get_size()),
283 str_proxy_translation_map ? str_proxy_translation_map->
data() :
nullptr,
284 str_proxy_translation_map ? str_proxy_translation_map->
domainStart()
290 err.compare_exchange_strong(zero, partial_err);
293 for (
auto& t : init_cpu_buff_threads) {
307 const bool is_bitwise_eq,
308 const std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*>& cols,
312 const int32_t hash_join_invalid_val,
315 const auto inner_col = cols.first;
317 const auto& ti = inner_col->get_type_info();
326 auto cpu_hash_table_buff =
reinterpret_cast<int32_t*
>(
hash_table_->getCpuBuffer());
331 DEBUG_TIMER(
"CPU One-To-Many Perfect Hash Table Builder: init_hash_join_buff");
333 init_hash_join_buff_tbb(cpu_hash_table_buff,
335 hash_join_invalid_val);
336 #else // #ifdef HAVE_TBB
337 std::vector<std::future<void> > init_threads;
338 for (
int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
339 init_threads.emplace_back(
344 hash_join_invalid_val,
348 for (
auto& child : init_threads) {
351 for (
auto& child : init_threads) {
358 "CPU One-To-Many Perfect Hash Table Builder: fill_hash_join_buff_bucketized");
359 auto translated_null_val = col_range.
getIntMax() + 1;
361 translated_null_val = col_range.
getIntMin() - 1;
363 if (ti.get_type() ==
kDATE) {
368 {
static_cast<size_t>(ti.get_size()),
375 str_proxy_translation_map ? str_proxy_translation_map->
data() :
nullptr,
376 str_proxy_translation_map ? str_proxy_translation_map->
domainStart()
384 {
static_cast<size_t>(ti.get_size()),
391 str_proxy_translation_map ? str_proxy_translation_map->
data() :
nullptr,
392 str_proxy_translation_map ? str_proxy_translation_map->
domainStart()
403 const size_t shard_count) {
405 return (total_entry_count + shard_count - 1) / shard_count;
int64_t getIntMin() const
std::vector< int > ChunkKey
void fill_hash_join_buff_on_device_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int64_t bucket_normalization)
DEVICE int SUFFIX() fill_hash_join_buff_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const int32_t cpu_thread_idx, const int32_t cpu_thread_count, const int64_t bucket_normalization)
void fill_one_to_many_hash_table(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count, const bool for_window_framing)
void fill_one_to_many_hash_table_on_device_sharded(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const ShardInfo &shard_info)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
virtual int8_t * getMemoryPtr()=0
int32_t domainStart() const
const bool for_semi_anti_join(const JoinType join_type)
static size_t get_entries_per_shard(const size_t total_entry_count, const size_t shard_count)
PerfectJoinHashTableBuilder()
void fill_one_to_many_hash_table_on_device_bucketized(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info)
void fill_one_to_many_hash_table_bucketized(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const int32_t *sd_inner_to_outer_translation_map, const int32_t min_inner_elem, const unsigned cpu_thread_count)
future< Result > async(Fn &&fn, Args &&...args)
void allocateDeviceMemory(const size_t num_column_elems, const HashType layout, BucketizedHashEntryInfo hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
int64_t bucket_normalization
void allocateDeviceMemory(const JoinColumn &join_column, const HashType layout, BucketizedHashEntryInfo &hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
An AbstractBuffer is a unit of data management for a data manager.
void init_hash_join_buff_on_device(int32_t *buff, const int64_t entry_count, const int32_t invalid_slot_val)
void fill_one_to_many_hash_table_on_device(int32_t *buff, const BucketizedHashEntryInfo hash_entry_info, const JoinColumn &join_column, const JoinColumnTypeInfo &type_info, const bool for_window_framing)
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const BucketizedHashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
std::unique_ptr< PerfectHashTable > getHashTable()
size_t getNormalizedHashEntryCount() const
DEVICE void SUFFIX() init_hash_join_buff(int32_t *groups_buffer, const int64_t hash_entry_count, const int32_t invalid_slot_val, const int32_t cpu_thread_idx, const int32_t cpu_thread_count)
Data_Namespace::DataMgr * getDataMgr() const
static Data_Namespace::AbstractBuffer * allocGpuAbstractBuffer(Data_Namespace::DataMgr *data_mgr, const size_t num_bytes, const int device_id)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
#define DEBUG_TIMER(name)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const HashType hash_type, const BucketizedHashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
std::unique_ptr< PerfectHashTable > hash_table_
void fill_hash_join_buff_on_device_sharded_bucketized(int32_t *buff, const int32_t invalid_slot_val, const bool for_semi_join, int *dev_err_buff, const JoinColumn join_column, const JoinColumnTypeInfo type_info, const ShardInfo shard_info, const int64_t bucket_normalization)
size_t bucketized_hash_entry_count