std::unique_ptr<HashingSchemeRecycler> PerfectJoinHashTable::hash_table_layout_cache_ =
    std::make_unique<HashingSchemeRecycler>();
std::pair<InnerOuter, InnerOuterStringOpInfos> get_cols(
    const Analyzer::BinOper* qual_bin_oper,
    const TemporaryTables* temporary_tables) {
  const auto lhs = qual_bin_oper->get_left_operand();
  const auto rhs = qual_bin_oper->get_right_operand();
  return HashJoin::normalizeColumnPair(lhs, rhs, temporary_tables);
}
BucketizedHashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const& context_ti,
                                                       ExpressionRange const& col_range,
                                                       bool const is_bw_eq) {
  using EmptyRangeSize = boost::optional<size_t>;
  auto empty_range_check = [](ExpressionRange const& col_range,
                              bool const is_bw_eq) -> EmptyRangeSize {
    if (col_range.getIntMin() > col_range.getIntMax()) {
      // An empty range is encoded as [0, -1]; under bitwise equality it still
      // needs one slot (for NULL), otherwise none.
      return size_t(is_bw_eq ? 1 : 0);
    }
    return EmptyRangeSize{};
  };

  auto empty_range = empty_range_check(col_range, is_bw_eq);
  if (empty_range) {
    return {size_t(*empty_range), 1};
  }

  int64_t bucket_normalization =
      context_ti.get_type() == kDATE ? col_range.getBucket() : 1;
  auto const normalized_max = col_range.getIntMax() / bucket_normalization;
  auto const normalized_min = col_range.getIntMin() / bucket_normalization;
  return {size_t(normalized_max - normalized_min + 1 + (is_bw_eq ? 1 : 0)),
          bucket_normalization};
}

size_t get_hash_entry_count(const ExpressionRange& col_range, const bool is_bw_eq) {
  if (col_range.getIntMin() > col_range.getIntMax()) {
    // ... (empty range, as above)
    return is_bw_eq ? 1 : 0;
  }
  // ...
}
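// Worked example (editor's note, not from the original source): a DATE key
// stored as epoch seconds with range [0, 864000] and a one-day bucket of
// 86400 normalizes to [0, 10], so a plain equijoin needs 10 - 0 + 1 = 11
// slots; a kBW_EQ join adds one more slot for NULL, giving 12.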
bool shard_count_less_or_equal_device_count(const shared::TableKey& inner_table_key,
                                            const Executor* executor) {
  const auto inner_table_info = executor->getTableInfo(inner_table_key);
  std::unordered_set<int> device_holding_fragments;
  auto cuda_mgr = executor->getDataMgr()->getCudaMgr();
  const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
  for (const auto& fragment : inner_table_info.fragments) {
    if (fragment.shard != -1) {
      const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
      if (!it_ok.second) {
        return false;
      }
    }
  }
  return true;
}
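// Editor's note: the emplace fails exactly when two distinct shards map to
// the same device (shard % device_count collides), i.e. when there are more
// shards than devices, which is what the function name promises to detect.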
size_t get_shard_count(
    std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
    const Executor* executor) {
  const auto inner_col = equi_pair.first;
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(equi_pair.second);
  if (!outer_col || inner_col->getColumnKey().table_id < 0 ||
      outer_col->getColumnKey().table_id < 0) {
    return 0;
  }
  if (outer_col->get_rte_idx()) {
    return 0;
  }
  if (inner_col->get_type_info() != outer_col->get_type_info()) {
    return 0;
  }
  const auto inner_td =
      Catalog_Namespace::get_metadata_for_table(inner_col->getTableKey());
  const auto outer_td =
      Catalog_Namespace::get_metadata_for_table(outer_col->getTableKey());
  if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
      inner_td->nShards != outer_td->nShards) {
    return 0;
  }
  // ...
  // The join must be on the shard key columns themselves.
  return (inner_td->shardedColumnId == inner_col->getColumnKey().column_id &&
          outer_td->shardedColumnId == outer_col->getColumnKey().column_id) ||
                 (outer_td->shardedColumnId == inner_col->getColumnKey().column_id &&
                  inner_td->shardedColumnId == inner_col->getColumnKey().column_id)
             ? inner_td->nShards
             : 0;
}
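// Editor's note: a non-zero shard count comes back only when both tables are
// sharded the same way (equal nShards) and the equijoin is on the shard key
// columns; every other pattern falls through to 0 and is treated as unsharded.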
//! Make hash table from an in-flight SQL query's parse tree etc.
std::shared_ptr<PerfectJoinHashTable> PerfectJoinHashTable::getInstance(
    const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
    const std::vector<InputTableInfo>& query_infos,
    const Data_Namespace::MemoryLevel memory_level,
    const JoinType join_type,
    const HashType preferred_hash_type,
    const int device_count,
    ColumnCacheMap& column_cache,
    Executor* executor,
    const HashTableBuildDagMap& hashtable_build_dag_map,
    const RegisteredQueryHint& query_hints,
    const TableIdToNodeMap& table_id_to_node_map) {
  CHECK(IS_EQUIVALENCE(qual_bin_oper->get_optype()));
  const auto cols_and_string_op_infos =
      get_cols(qual_bin_oper.get(), executor->temporary_tables_);
  const auto& cols = cols_and_string_op_infos.first;
  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
  const auto inner_col = cols.first;
  CHECK(inner_col);
  const auto& ti = inner_col->get_type_info();
  auto col_range =
      getExpressionRange(ti.is_string() ? cols.second : inner_col, query_infos, executor);
  if (col_range.getType() == ExpressionRangeType::Invalid) {
    throw HashJoinFail(
        "Could not compute range for the expressions involved in the equijoin");
  }
  const auto rhs_source_col_range =
      ti.is_string() ? getExpressionRange(inner_col, query_infos, executor) : col_range;
  if (ti.is_string()) {
    // The nullability info must match the source column's.
    if (rhs_source_col_range.getType() == ExpressionRangeType::Invalid) {
      throw HashJoinFail(
          "Could not compute range for the expressions involved in the equijoin");
    }
    if (rhs_source_col_range.getIntMin() > rhs_source_col_range.getIntMax()) {
      // The inner column's range is empty ([0, -1]); use it as-is.
      CHECK_EQ(rhs_source_col_range.getIntMin(), int64_t(0));
      CHECK_EQ(rhs_source_col_range.getIntMax(), int64_t(-1));
      col_range = rhs_source_col_range;
    } else {
      col_range = ExpressionRange::makeIntRange(
          std::min(rhs_source_col_range.getIntMin(), col_range.getIntMin()),
          std::max(rhs_source_col_range.getIntMax(), col_range.getIntMax()),
          0,
          rhs_source_col_range.hasNulls());
    }
  }
  const auto max_hash_entry_count =
      memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL
          ? static_cast<size_t>(std::numeric_limits<int32_t>::max() / sizeof(int32_t))
          : static_cast<size_t>(std::numeric_limits<int32_t>::max());
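  // Editor's note: int32_t::max() slots is the CPU-side cap; the GPU cap is
  // further divided by sizeof(int32_t) because each slot is 4 bytes and a
  // single contiguous device allocation tops out near 2^31 bytes (~2 GB):
  // 2147483647 / 4 = 536870911 entries.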
  auto bucketized_entry_count_info = get_bucketized_hash_entry_info(
      ti, col_range, qual_bin_oper->get_optype() == kBW_EQ);
  auto bucketized_entry_count = bucketized_entry_count_info.getNormalizedHashEntryCount();
  if (bucketized_entry_count > max_hash_entry_count) {
    throw TooManyHashEntries();
  }

  if (qual_bin_oper->get_optype() == kBW_EQ &&
      col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
    throw HashJoinFail("Cannot translate null value for kBW_EQ");
  }
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  if (VLOGGING(1)) {
    ts1 = std::chrono::steady_clock::now();
  }
  auto join_hash_table = std::shared_ptr<PerfectJoinHashTable>(
      new PerfectJoinHashTable(qual_bin_oper,
                               inner_col,
                               query_infos,
                               memory_level,
                               join_type,
                               preferred_hash_type,
                               col_range,
                               rhs_source_col_range,
                               bucketized_entry_count_info,
                               column_cache,
                               executor,
                               device_count,
                               query_hints,
                               hashtable_build_dag_map,
                               table_id_to_node_map,
                               inner_outer_string_op_infos));
  try {
    join_hash_table->reify();
  } catch (const TableMustBeReplicated& e) {
    // Throw a runtime error to abort the query.
    join_hash_table->freeHashBufferMemory();
    throw std::runtime_error(e.what());
  } catch (const HashJoinFail& e) {
    join_hash_table->freeHashBufferMemory();
    throw HashJoinFail(std::string("Could not build a 1-to-1 correspondence for columns "
                                   "involved in equijoin | ") +
                       e.what());
  } catch (const ColumnarConversionNotSupported& e) {
    throw HashJoinFail(std::string("Could not build hash tables for equijoin | ") +
                       e.what());
  } catch (const OutOfMemory& e) {
    throw HashJoinFail(
        std::string("Ran out of memory while building hash tables for equijoin | ") +
        e.what());
  } catch (const std::exception& e) {
    throw std::runtime_error(
        std::string("Fatal error while attempting to build hash tables for join: ") +
        e.what());
  }
  if (VLOGGING(1)) {
    ts2 = std::chrono::steady_clock::now();
    VLOG(1) << "Built perfect hash table "
            << getHashTypeString(join_hash_table->getHashType()) << " in "
            << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
            << " ms";
  }
  return join_hash_table;
}
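// Editor's note: getInstance() is the factory entry point and builds eagerly;
// reify() runs inside it, so by the time the shared_ptr is returned the table
// is fully materialized (or an exception has propagated). HashJoinFail is the
// signal callers use to fall back to a different join strategy, while
// std::runtime_error aborts the query outright.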
bool needs_dictionary_translation(
    const InnerOuter& inner_outer_col_pair,
    const InnerOuterStringOpInfos& inner_outer_string_op_infos,
    const Executor* executor) {
  if (inner_outer_string_op_infos.first.size() ||
      inner_outer_string_op_infos.second.size()) {
    return true;
  }
  auto inner_col = inner_outer_col_pair.first;
  auto outer_col_expr = inner_outer_col_pair.second;
  const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
  const auto& inner_col_key = inner_col->getColumnKey();
  const auto& inner_ti = get_column_type(inner_col_key.column_id,
                                         inner_col_key.table_id,
                                         inner_cd,
                                         executor->getTemporaryTables());
  // Only strings can need dictionary translation.
  if (!inner_ti.is_string()) {
    return false;
  }
  const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
  CHECK(outer_col);
  const auto outer_cd = get_column_descriptor_maybe(outer_col->getColumnKey());
  // Don't want to deal with temporary tables for now, require translation.
  if (!inner_cd || !outer_cd) {
    return true;
  }
  const auto& outer_col_key = outer_col->getColumnKey();
  const auto& outer_ti = get_column_type(outer_col_key.column_id,
                                         outer_col_key.table_id,
                                         outer_cd,
                                         executor->getTemporaryTables());
  CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
  // If the columns don't share a dictionary, translation is needed.
  if (outer_ti.getStringDictKey() != inner_ti.getStringDictKey()) {
    return true;
  }
  // Even with the same dictionary key, the proxies may differ (e.g. due to
  // transient entries), so compare the proxies themselves.
  const auto inner_str_dict_proxy =
      executor->getStringDictionaryProxy(inner_ti.getStringDictKey(), true);
  CHECK(inner_str_dict_proxy);
  const auto outer_str_dict_proxy =
      executor->getStringDictionaryProxy(outer_ti.getStringDictKey(), true);
  CHECK(outer_str_dict_proxy);
  return *inner_str_dict_proxy != *outer_str_dict_proxy;
}
std::vector<Fragmenter_Namespace::FragmentInfo> only_shards_for_device(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    const int device_count) {
  std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
  for (const auto& fragment : fragments) {
    if (fragment.shard % device_count == device_id) {
      shards_for_device.push_back(fragment);
    }
  }
  return shards_for_device;
}
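// Editor's note: shards are assigned round-robin, shard s going to device
// s % device_count. With 4 shards on 2 devices, device 0 builds from shards
// {0, 2} and device 1 from shards {1, 3}.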
bool PerfectJoinHashTable::isOneToOneHashPossible(
    const std::vector<ColumnsForDevice>& columns_per_device) const {
  CHECK(!inner_outer_pairs_.empty());
  const auto max_unique_hash_input_entries =  // (reconstructed expression)
      get_bucketized_hash_entry_info(
          inner_outer_pairs_.front().first->get_type_info(), col_range_, isBitwiseEq())
          .getNormalizedHashEntryCount();
  for (const auto& device_columns : columns_per_device) {
    CHECK(!device_columns.join_columns.empty());
    const auto rhs_join_col_num_entries = device_columns.join_columns.front().num_elems;
    if (rhs_join_col_num_entries > max_unique_hash_input_entries) {
      VLOG(1) << "Skipping attempt to build perfect hash one-to-one table as number of "
                 "rhs column entries ("
              << rhs_join_col_num_entries << ") exceeds range for rhs join column ("
              << max_unique_hash_input_entries << ").";
      return false;
    }
  }
  return true;
}
void PerfectJoinHashTable::reify() {
  auto timer = DEBUG_TIMER(__func__);
  // ... (cols comes from get_cols on qual_bin_oper_)
  const auto inner_col = cols.first;
  checkHashJoinReplicationConstraint(inner_col->getTableKey(),
                                     get_shard_count(qual_bin_oper_.get(), executor_),
                                     executor_);
  const auto& query_info = getInnerQueryInfo(inner_col).info;
  if (query_info.fragments.empty()) {
    return;
  }
  if (query_info.getNumTuplesUpperBound() >
      static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
    throw TooManyHashEntries();
  }
  std::vector<std::future<void>> init_threads;
  // ...
  std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
  std::vector<ColumnsForDevice> columns_per_device;
  std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
  // ...
  std::vector<ChunkKey> chunk_key_per_device;
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    fragments_per_device.emplace_back(
        shard_count
            ? only_shards_for_device(query_info.fragments, device_id, device_count_)
            : query_info.fragments);
    if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
          data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id)));
    }
    const auto chunk_key =
        genChunkKey(fragments_per_device[device_id], outer_col, inner_col);
    chunk_key_per_device.emplace_back(std::move(chunk_key));
  }
  auto hashtable_access_path_info =
      HashtableRecycler::getHashtableAccessPathInfo(inner_outer_pairs_,
                                                    {inner_outer_string_op_infos_},
                                                    qual_bin_oper_->get_optype(),
                                                    join_type_,
                                                    hashtable_build_dag_map_,
                                                    device_count_,
                                                    shard_count,
                                                    fragments_per_device,
                                                    executor_);
  // ... (adopt the computed per-device cache keys)
  table_keys_ = hashtable_access_path_info.table_keys;
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    const auto num_tuples = std::accumulate(
        fragments_per_device[device_id].begin(),
        fragments_per_device[device_id].end(),
        size_t(0),
        [](size_t sum, const auto& fragment) { return sum + fragment.getNumTuples(); });
    // When the DAG-based cache key is unavailable, fall back to a key built
    // from the concrete inputs (field order reconstructed approximately):
    AlternativeCacheKeyForPerfectHashJoin cache_key{col_range_,
                                                    inner_col,
                                                    outer_col ? outer_col : inner_col,
                                                    chunk_key_per_device[device_id],
                                                    num_tuples,
                                                    qual_bin_oper_->get_optype(),
                                                    join_type_};
    hashtable_cache_key_[device_id] = getAlternativeCacheKey(cache_key);
  }
  // ...
  const bool invalid_cache_key =
      HashtableRecycler::isInvalidHashTableCacheKey(hashtable_cache_key_);
  if (!invalid_cache_key) {
    // ...
    std::for_each(hashtable_cache_key_.cbegin(),
                  hashtable_cache_key_.cend(),
                  [this](QueryPlanHash key) {
                    // ... (associate this key with the table keys it covers)
                  });
  }
  // ...
  auto copied_col_range = col_range_;
  // ... (string dictionary translation, when required, may tighten col_range_)
  if (!(col_range_ == copied_col_range)) {
    // ... (recompute the hash entry info / cache key with the updated range)
  }
  auto allow_hashtable_recycling =
      HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
                                                needs_dict_translation_,
                                                {inner_outer_string_op_infos_},
                                                inner_col->getTableKey());
  bool has_invalid_cached_hash_table = false;
  if (HashtableRecycler::canAccessHashTable(
          allow_hashtable_recycling, invalid_cache_key, join_type_)) {
    // Try to reuse a cached table for every device.
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      auto cached_hash_table =
          initHashTableOnCpuFromCache(hashtable_cache_key_[device_id],
                                      CacheItemType::PERFECT_HT,
                                      /* device_identifier = */ CPU_DEVICE_IDENTIFIER);
      if (cached_hash_table) {
        hash_tables_for_device_[device_id] = cached_hash_table;
      } else {
        has_invalid_cached_hash_table = true;
        break;
      }
    }
    if (has_invalid_cached_hash_table) {
      // A partial cache hit is useless; drop everything and rebuild below.
      hash_tables_for_device_.clear();
      hash_tables_for_device_.resize(device_count_);
    } else {
      // ...
      return;
    }
  }
  // ...
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    // ... (per-device bookkeeping elided)
  }
  for (int device_id = 0; device_id < device_count_; ++device_id) {
    columns_per_device.emplace_back(
        fetchColumnsForDevice(fragments_per_device[device_id],
                              device_id,
                              memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL
                                  ? dev_buff_owners[device_id].get()
                                  : nullptr));
  }
  try {
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto chunk_key = genChunkKey(fragments_per_device[device_id],
                                         inner_outer_pairs_.front().second,
                                         inner_outer_pairs_.front().first);
      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        chunk_key,
                                        columns_per_device[device_id],
                                        hash_type_,
                                        device_id,
                                        logger::thread_local_ids()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  } catch (const NeedsOneToManyHash& e) {
    VLOG(1) << "RHS/Inner hash join values detected to not be unique, falling back to "
               "One-to-Many hash layout.";
    // ...
    freeHashBufferMemory();
    init_threads.clear();
    if (memory_level_ == Data_Namespace::MemoryLevel::GPU_LEVEL) {
      CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
    }
    CHECK_EQ(columns_per_device.size(), size_t(device_count_));
    for (int device_id = 0; device_id < device_count_; ++device_id) {
      const auto chunk_key = genChunkKey(fragments_per_device[device_id],
                                         inner_outer_pairs_.front().second,
                                         inner_outer_pairs_.front().first);
      init_threads.push_back(std::async(std::launch::async,
                                        &PerfectJoinHashTable::reifyForDevice,
                                        this,
                                        chunk_key,
                                        columns_per_device[device_id],
                                        HashType::OneToMany,
                                        device_id,
                                        logger::thread_local_ids()));
    }
    for (auto& init_thread : init_threads) {
      init_thread.wait();
    }
    for (auto& init_thread : init_threads) {
      init_thread.get();
    }
  }
}
Data_Namespace::MemoryLevel PerfectJoinHashTable::getEffectiveMemoryLevel(
    const std::vector<InnerOuter>& inner_outer_pairs) const {
  // ...
}
ColumnsForDevice PerfectJoinHashTable::fetchColumnsForDevice(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const int device_id,
    DeviceAllocator* dev_buff_owner) {
  std::vector<JoinColumn> join_columns;
  std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
  std::vector<JoinColumnTypeInfo> join_column_types;
  std::vector<JoinBucketInfo> join_bucket_info;
  std::vector<std::shared_ptr<void>> malloc_owner;
  const auto effective_memory_level =
      get_effective_memory_level(memory_level_, needs_dict_translation_);
  for (const auto& inner_outer_pair : inner_outer_pairs_) {
    const auto inner_col = inner_outer_pair.first;
    const auto inner_cd = get_column_descriptor_maybe(inner_col->getColumnKey());
    if (inner_cd && inner_cd->isVirtualCol) {
      throw FailedToJoinOnVirtualColumn();
    }
    join_columns.emplace_back(fetchJoinColumn(inner_col,
                                              fragments,
                                              effective_memory_level,
                                              device_id,
                                              chunks_owner,
                                              dev_buff_owner,
                                              malloc_owner,
                                              executor_,
                                              &column_cache_));
    const auto& ti = inner_col->get_type_info();
    // ... (record the join column's type info, null sentinel, etc.)
  }
  return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
}
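// Editor's note: ColumnsForDevice carries the fetched join columns together
// with their owners (chunks_owner, malloc_owner), so the underlying buffers
// stay alive for as long as the per-device column bundle does.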
void PerfectJoinHashTable::reifyForDevice(const ChunkKey& chunk_key,
                                          const ColumnsForDevice& columns_for_device,
                                          const HashType layout,
                                          const int device_id,
                                          const logger::ThreadLocalIds parent_thread_local_ids) {
  // ...
  const auto effective_memory_level =
      get_effective_memory_level(memory_level_, needs_dict_translation_);
  // ...
  auto& join_column = columns_for_device.join_columns.front();
  if (layout == HashType::OneToOne) {
    const auto err = initHashTableForDevice(
        chunk_key, join_column, inner_outer_pairs_.front(), layout,
        effective_memory_level, device_id);
    if (err) {
      // A duplicate key was found; retry as one-to-many.
      throw NeedsOneToManyHash();
    }
  } else {
    const auto err = initHashTableForDevice(
        chunk_key, join_column, inner_outer_pairs_.front(), HashType::OneToMany,
        effective_memory_level, device_id);
    if (err) {
      throw std::runtime_error("Unexpected error building one to many hash table: " +
                               std::to_string(err));
    }
  }
}
int PerfectJoinHashTable::initHashTableForDevice(
    const ChunkKey& chunk_key,
    const JoinColumn& join_column,
    const InnerOuter& cols,
    const HashType layout,
    const Data_Namespace::MemoryLevel effective_memory_level,
    const int device_id) {
  auto timer = DEBUG_TIMER(__func__);
  const auto inner_col = cols.first;
  CHECK(inner_col);
  // ...
  if (!hash_entry_info_.bucketized_hash_entry_count) {
    VLOG(1) << "Stop building a hash table based on a column " << inner_col->toString()
            << ": it is from an empty table";
    return 0;
  }
  // ...
  const int32_t hash_join_invalid_val{-1};
  auto hashtable_layout = layout;
  auto allow_hashtable_recycling =
      HashtableRecycler::isSafeToCacheHashtable(table_id_to_node_map_,
                                                needs_dict_translation_,
                                                {inner_outer_string_op_infos_},
                                                inner_col->getTableKey());
  if (allow_hashtable_recycling) {
    auto cached_hashtable_layout_type = hash_table_layout_cache_->getItemFromCache(
        hashtable_cache_key_[device_id], /* ... */);
    if (cached_hashtable_layout_type) {
      // A previously cached layout for this key overrides the requested one.
      hashtable_layout = *cached_hashtable_layout_type;
    }
  }
  const auto entry_count = hash_entry_info_.getNormalizedHashEntryCount();
  // One-to-one needs one slot per entry; one-to-many adds offset and count
  // buffers (2 * entry_count) plus one payload slot per input element.
  const auto hash_table_entry_count =
      hashtable_layout == HashType::OneToOne
          ? entry_count
          : 2 * entry_count + join_column.num_elems;
  const auto hash_table_size = hash_table_entry_count * sizeof(int32_t);
  // ...
  CHECK(!chunk_key.empty());
  std::shared_ptr<PerfectHashTable> hash_table{nullptr};
  int err{0};
  decltype(std::chrono::steady_clock::now()) ts1, ts2;
  ts1 = std::chrono::steady_clock::now();
  // ... (CPU path: build a one-to-one or one-to-many table, passing
  //      hash_join_invalid_val as the empty-slot sentinel)
  ts2 = std::chrono::steady_clock::now();
  auto hashtable_build_time =
      std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
  // ...
  hash_table->setColumnNumElems(join_column.num_elems);
  if (allow_hashtable_recycling && hash_table) {
    // ... (publish the freshly built CPU table to the recycler cache)
  }
  // ...
  // GPU path (dict-encoded string keys must already be translated):
  const auto& ti = inner_col->get_type_info();
  CHECK(ti.is_string());
  // ...
  builder.initHashTableOnGpu(chunk_key, /* ... */ hash_join_invalid_val, /* ... */);
  // ...
  if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id]) {
    // Remember which layout this key ended up with for future builds.
    hash_table_layout_cache_->putItemToCache(
        hashtable_cache_key_[device_id],
        hash_tables_for_device_[device_id]->getLayout(),
        /* ... */);
  }
  // ...
}
ChunkKey PerfectJoinHashTable::genChunkKey(
    const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
    const Analyzer::Expr* outer_col_expr,
    const Analyzer::ColumnVar* inner_col) const {
  const auto& column_key = inner_col->getColumnKey();
  ChunkKey chunk_key{column_key.db_id, column_key.table_id, column_key.column_id};
  const auto& ti = inner_col->get_type_info();
  std::for_each(fragments.cbegin(), fragments.cend(), [&chunk_key](const auto& fragment) {
    // Collect every fragment id so the cache key tracks the table's contents.
    chunk_key.push_back(fragment.fragmentId);
  });
  if (ti.is_string()) {
    CHECK_EQ(kENCODING_DICT, ti.get_compression());
    const auto outer_col = dynamic_cast<const Analyzer::ColumnVar*>(outer_col_expr);
    CHECK(outer_col);
    const auto& outer_query_info = getInnerQueryInfo(outer_col).info;
    size_t outer_elem_count =
        std::accumulate(outer_query_info.fragments.begin(),
                        outer_query_info.fragments.end(),
                        size_t(0),
                        [&chunk_key](size_t sum, const auto& fragment) {
                          chunk_key.push_back(fragment.fragmentId);
                          return sum + fragment.getNumTuples();
                        });
    chunk_key.push_back(outer_elem_count);
  }
  return chunk_key;
}
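// Editor's note: for a non-string inner column {db=1, table=2, col=3} over
// fragments {0, 1} the key is {1, 2, 3, 0, 1}; for a dict-encoded string
// column the outer side's fragment ids and total element count are appended
// too, so the key changes whenever either side's data does.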
std::shared_ptr<PerfectHashTable> PerfectJoinHashTable::initHashTableOnCpuFromCache(
    QueryPlanHash key,
    CacheItemType item_type,
    DeviceIdentifier device_identifier) {
  CHECK(hash_table_cache_);
  auto timer = DEBUG_TIMER(__func__);
  VLOG(1) << "Checking CPU hash table cache.";
  // ...
}
void PerfectJoinHashTable::putHashTableOnCpuToCache(
    QueryPlanHash key,
    CacheItemType item_type,
    std::shared_ptr<PerfectHashTable> hashtable_ptr,
    DeviceIdentifier device_identifier,
    size_t hashtable_building_time) {
  CHECK(hash_table_cache_);
  CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
  hash_table_cache_->putItemToCache(key,
                                    hashtable_ptr,
                                    item_type,
                                    device_identifier,
                                    /* ... */
                                    hashtable_building_time);
}
llvm::Value* PerfectJoinHashTable::codegenHashTableLoad(const size_t table_idx) {
  const auto hash_ptr = HashJoin::codegenHashTableLoad(table_idx, executor_);
  if (hash_ptr->getType()->isIntegerTy(64)) {
    return hash_ptr;
  }
  CHECK(hash_ptr->getType()->isPointerTy());
  return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
      hash_ptr, llvm::Type::getInt64Ty(executor_->cgen_state_->context_));
}
std::vector<llvm::Value*> PerfectJoinHashTable::getHashJoinArgs(
    llvm::Value* hash_ptr,
    llvm::Value* key_lv,
    const Analyzer::Expr* key_col,
    const int shard_count,
    const CompilationOptions& co) {
  // ...
  std::vector<llvm::Value*> hash_join_idx_args{
      hash_ptr,
      executor_->cgen_state_->castToTypeIn(key_lv, 64),
      executor_->cgen_state_->llInt(col_range_.getIntMin()),
      executor_->cgen_state_->llInt(col_range_.getIntMax())};
  if (shard_count) {
    const auto expected_hash_entry_count =
        get_hash_entry_count(col_range_, isBitwiseEq());
    const auto entry_count_per_shard =
        (expected_hash_entry_count + shard_count - 1) / shard_count;
    hash_join_idx_args.push_back(
        executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt<uint32_t>(shard_count));
    // ...
  }
  auto key_col_logical_ti = get_logical_type_info(key_col->get_type_info());
  if (!key_col_logical_ti.get_notnull() || isBitwiseEq()) {
    hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
        inline_fixed_encoding_null_val(key_col_logical_ti)));
  }
  const auto& key_col_ti = key_col->get_type_info();
  auto special_date_bucketization_case = key_col_ti.get_type() == kDATE;
  if (isBitwiseEq()) {
    if (special_date_bucketization_case) {
      hash_join_idx_args.push_back(executor_->cgen_state_->llInt(
          col_range_.getIntMax() / hash_entry_info_.bucket_normalization + 1));
    } else {
      hash_join_idx_args.push_back(
          executor_->cgen_state_->llInt(col_range_.getIntMax() + 1));
    }
  }
  if (special_date_bucketization_case) {
    hash_join_idx_args.emplace_back(
        executor_->cgen_state_->llInt(hash_entry_info_.bucket_normalization));
  }
  return hash_join_idx_args;
}
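// Editor's note: the vector mirrors the parameter order of the runtime
// hash_join_idx* helpers: buffer pointer, widened key, range min, range max,
// then (conditionally) the per-shard entry count and shard count, the null
// sentinel, the bitwise-eq translated-null value, and the date bucket size.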
HashJoinMatchingSet PerfectJoinHashTable::codegenMatchingSet(const CompilationOptions& co,
                                                             const size_t index) {
  // ...
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  // ...
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(key_col_var, val_col_var,
                                              get_max_rte_scan_table(/* ... */))) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. We suspect the query requires multiple left-deep join trees due to "
        "the join condition of the self-join, which is not supported for now. Please "
        "consider rewriting the table order in the FROM clause.");
  }
  // ...
  auto hash_join_idx_args = getHashJoinArgs(pos_ptr, key_lv, key_col, shard_count, co);
  const auto& key_col_ti = key_col->get_type_info();
  auto bucketize = (key_col_ti.get_type() == kDATE);
  return HashJoin::codegenMatchingSet(hash_join_idx_args,
                                      shard_count,
                                      !key_col_ti.get_notnull(),
                                      isBitwiseEq(),
                                      // ...
                                      bucketize);
}
size_t PerfectJoinHashTable::getComponentBufferSize() const noexcept {
  // ... (only one-to-many tables have offset/count component buffers)
  auto hash_table = hash_tables_for_device_.front();
  return hash_table->getEntryCount() * sizeof(int32_t);
}
void PerfectJoinHashTable::copyCpuHashTableToGpu(
    std::shared_ptr<PerfectHashTable>& cpu_hash_table,
    const int device_id,
    Data_Namespace::DataMgr* data_mgr) {
  CHECK_EQ(memory_level_, Data_Namespace::MemoryLevel::GPU_LEVEL);
  // ...
  CHECK(cpu_hash_table);
  PerfectJoinHashTableBuilder gpu_builder;
  gpu_builder.allocateDeviceMemory(/* ... */,
                                   cpu_hash_table->getLayout(),
                                   cpu_hash_table->getHashEntryInfo(),
                                   /* ... */);
  std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.getHashTable();
  CHECK(gpu_hash_table);
  auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
  CHECK(gpu_buffer_ptr);
  // ...
  auto device_allocator = std::make_unique<CudaAllocator>(
      data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
  device_allocator->copyToDevice(gpu_buffer_ptr,
                                 cpu_hash_table->getCpuBuffer(),
                                 /* ... size of the CPU buffer ... */);
  // ...
}
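// Editor's note: this path appears to cover builds where the table must be
// assembled on the CPU (e.g. when dictionary translation forces a CPU-side
// build) but the query executes on GPU: allocate a device buffer of matching
// layout and entry count, then copy the finished CPU buffer up instead of
// rebuilding on the device.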
std::string PerfectJoinHashTable::toString(const ExecutorDeviceType device_type,
                                           const int device_id,
                                           bool raw) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);
    auto data_mgr = executor_->getDataMgr();
    auto device_allocator = std::make_unique<CudaAllocator>(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toString("perfect",
                             getHashTypeString(getHashType()),
                             0, 0,
                             hash_table ? hash_table->getEntryCount() : 0,
                             ptr1, ptr2, ptr3, ptr4, buffer_size, raw);
}
std::set<DecodedJoinHashBufferEntry> PerfectJoinHashTable::toSet(
    const ExecutorDeviceType device_type,
    const int device_id) const {
  auto buffer = getJoinHashBuffer(device_type, device_id);
  auto buffer_size = getJoinHashBufferSize(device_type, device_id);
  auto hash_table = getHashTableForDevice(device_id);
#ifdef HAVE_CUDA
  std::unique_ptr<int8_t[]> buffer_copy;
  if (device_type == ExecutorDeviceType::GPU) {
    buffer_copy = std::make_unique<int8_t[]>(buffer_size);
    auto data_mgr = executor_->getDataMgr();
    auto device_allocator = std::make_unique<CudaAllocator>(
        data_mgr, device_id, getQueryEngineCudaStreamForDevice(device_id));
    device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
  }
  auto ptr1 = buffer_copy ? buffer_copy.get() : reinterpret_cast<const int8_t*>(buffer);
#else
  auto ptr1 = reinterpret_cast<const int8_t*>(buffer);
#endif  // HAVE_CUDA
  auto ptr2 = ptr1 + offsetBufferOff();
  auto ptr3 = ptr1 + countBufferOff();
  auto ptr4 = ptr1 + payloadBufferOff();
  return HashTable::toSet(0, 0,
                          hash_table ? hash_table->getEntryCount() : 0,
                          ptr1, ptr2, ptr3, ptr4, buffer_size);
}
llvm::Value* PerfectJoinHashTable::codegenSlot(const CompilationOptions& co,
                                               const size_t index) {
  using namespace std::string_literals;
  CHECK(getHashType() == HashType::OneToOne);
  const auto cols_and_string_op_infos =
      get_cols(qual_bin_oper_.get(), executor_->temporary_tables_);
  const auto& cols = cols_and_string_op_infos.first;
  const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
  auto key_col = cols.second;
  CHECK(key_col);
  auto val_col = cols.first;
  CHECK(val_col);
  // ...
  if (key_col_var && val_col_var &&
      self_join_not_covered_by_left_deep_tree(key_col_var, val_col_var,
                                              get_max_rte_scan_table(/* ... */))) {
    throw std::runtime_error(
        "Query execution failed because the query contains an unsupported self-join "
        "pattern. We suspect the query requires multiple left-deep join trees due to "
        "the join condition of the self-join, which is not supported for now. Please "
        "consider changing the table order in the FROM clause.");
  }
  // ...
  const auto key_lvs = HashJoin::codegenColOrStringOper(
      key_col, inner_outer_string_op_infos.second, code_generator, co);
  // ...
  const auto hash_join_idx_args =
      getHashJoinArgs(hash_ptr, key_lvs.front(), key_col, shard_count, co);
  // ...
  const auto& key_col_ti = key_col->get_type_info();
  std::string fname((key_col_ti.get_type() == kDATE) ? "bucketized_hash_join_idx"s
                                                     : "hash_join_idx"s);
  if (isBitwiseEq()) {
    fname += "_bitwise";
  }
  if (shard_count) {
    fname += "_sharded";
  }
  if (!isBitwiseEq() && !key_col_ti.get_notnull()) {
    fname += "_nullable";
  }
  return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
}
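// Editor's note: the runtime helper name is assembled from feature suffixes.
// A nullable DATE key without sharding resolves to
// "bucketized_hash_join_idx_nullable"; a sharded bitwise-equality key
// resolves to "hash_join_idx_bitwise_sharded".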
const InputTableInfo& get_inner_query_info(
    const shared::TableKey& inner_table_key,
    const std::vector<InputTableInfo>& query_infos) {
  std::optional<size_t> ti_idx;
  for (size_t i = 0; i < query_infos.size(); ++i) {
    if (inner_table_key == query_infos[i].table_key) {
      ti_idx = i;
      break;
    }
  }
  CHECK(ti_idx);
  return query_infos[*ti_idx];
}
size_t get_entries_per_device(const size_t total_entries,
                              const size_t shard_count,
                              const size_t device_count,
                              const Data_Namespace::MemoryLevel memory_level) {
  const auto entries_per_shard =
      shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
  size_t entries_per_device = entries_per_shard;
  if (shard_count && memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL) {
    const auto shards_per_device = (shard_count + device_count - 1) / device_count;
    entries_per_device = entries_per_shard * shards_per_device;
  }
  return entries_per_device;
}
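// Editor's note: both divisions above round up. For total_entries = 1000,
// shard_count = 3, device_count = 2 on GPU: entries_per_shard =
// (1000 + 2) / 3 = 334, shards_per_device = (3 + 1) / 2 = 2, so each device
// reserves 334 * 2 = 668 entries.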