38 std::make_unique<HashingSchemeRecycler>();
41 std::pair<InnerOuter, InnerOuterStringOpInfos>
get_cols(
52 bool const is_bw_eq) {
53 using EmptyRangeSize = boost::optional<size_t>;
55 bool const is_bw_eq) -> EmptyRangeSize {
64 return EmptyRangeSize{};
67 auto empty_range = empty_range_check(col_range, is_bw_eq);
69 return {size_t(*empty_range), 1};
72 int64_t bucket_normalization =
75 return {size_t(col_range.
getIntMax() - col_range.
getIntMin() + 1 + (is_bw_eq ? 1 : 0)),
76 bucket_normalization};
83 return is_bw_eq ? 1 : 0;
93 const Executor* executor) {
94 const auto inner_table_info = executor->getTableInfo(inner_table_id);
95 std::unordered_set<int> device_holding_fragments;
96 auto cuda_mgr = executor->getDataMgr()->getCudaMgr();
97 const int device_count = cuda_mgr ? cuda_mgr->getDeviceCount() : 1;
98 for (
const auto& fragment : inner_table_info.fragments) {
99 if (fragment.shard != -1) {
100 const auto it_ok = device_holding_fragments.emplace(fragment.shard % device_count);
112 std::pair<const Analyzer::ColumnVar*, const Analyzer::Expr*> equi_pair,
113 const Executor* executor) {
114 const auto inner_col = equi_pair.first;
116 if (!outer_col || inner_col->get_table_id() < 0 || outer_col->get_table_id() < 0) {
119 if (outer_col->get_rte_idx()) {
122 if (inner_col->get_type_info() != outer_col->get_type_info()) {
125 const auto catalog = executor->getCatalog();
126 const auto inner_td = catalog->getMetadataForTable(inner_col->get_table_id());
128 const auto outer_td = catalog->getMetadataForTable(outer_col->get_table_id());
130 if (inner_td->shardedColumnId == 0 || outer_td->shardedColumnId == 0 ||
131 inner_td->nShards != outer_td->nShards) {
138 return (inner_td->shardedColumnId == inner_col->get_column_id() &&
139 outer_td->shardedColumnId == outer_col->get_column_id()) ||
140 (outer_td->shardedColumnId == inner_col->get_column_id() &&
141 inner_td->shardedColumnId == inner_col->get_column_id())
148 const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
149 const std::vector<InputTableInfo>& query_infos,
153 const int device_count,
159 const auto cols_and_string_op_infos =
160 get_cols(qual_bin_oper.get(), *executor->getCatalog(), executor->temporary_tables_);
161 const auto& cols = cols_and_string_op_infos.first;
162 const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
163 const auto inner_col = cols.first;
165 const auto& ti = inner_col->get_type_info();
170 "Could not compute range for the expressions involved in the equijoin");
172 const auto rhs_source_col_range =
174 if (ti.is_string()) {
178 "Could not compute range for the expressions involved in the equijoin");
180 if (rhs_source_col_range.getIntMin() > rhs_source_col_range.getIntMax()) {
182 CHECK_EQ(rhs_source_col_range.getIntMin(), int64_t(0));
183 CHECK_EQ(rhs_source_col_range.getIntMax(), int64_t(-1));
184 col_range = rhs_source_col_range;
187 std::min(rhs_source_col_range.getIntMin(), col_range.getIntMin()),
188 std::max(rhs_source_col_range.getIntMax(), col_range.getIntMax()),
190 rhs_source_col_range.hasNulls());
195 const auto max_hash_entry_count =
197 ?
static_cast<size_t>(std::numeric_limits<int32_t>::max() /
sizeof(int32_t))
198 :
static_cast<size_t>(std::numeric_limits<int32_t>::max());
201 ti, col_range, qual_bin_oper->get_optype() ==
kBW_EQ);
204 if (bucketized_entry_count > max_hash_entry_count) {
208 if (qual_bin_oper->get_optype() ==
kBW_EQ &&
209 col_range.getIntMax() >= std::numeric_limits<int64_t>::max()) {
210 throw HashJoinFail(
"Cannot translate null value for kBW_EQ");
212 decltype(std::chrono::steady_clock::now()) ts1, ts2;
214 ts1 = std::chrono::steady_clock::now();
217 auto join_hash_table = std::shared_ptr<PerfectJoinHashTable>(
225 rhs_source_col_range,
229 hashtable_build_dag_map,
230 table_id_to_node_map,
231 inner_outer_string_op_infos));
233 join_hash_table->reify();
236 join_hash_table->freeHashBufferMemory();
237 throw std::runtime_error(e.what());
241 join_hash_table->freeHashBufferMemory();
242 throw HashJoinFail(std::string(
"Could not build a 1-to-1 correspondence for columns "
243 "involved in equijoin | ") +
246 throw HashJoinFail(std::string(
"Could not build hash tables for equijoin | ") +
250 std::string(
"Ran out of memory while building hash tables for equijoin | ") +
252 }
catch (
const std::exception& e) {
253 throw std::runtime_error(
254 std::string(
"Fatal error while attempting to build hash tables for join: ") +
258 ts2 = std::chrono::steady_clock::now();
259 VLOG(1) <<
"Built perfect hash table "
261 << std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count()
264 return join_hash_table;
270 const Executor* executor) {
271 if (inner_outer_string_op_infos.first.size() ||
272 inner_outer_string_op_infos.second.size()) {
275 auto inner_col = inner_outer_col_pair.first;
276 auto outer_col_expr = inner_outer_col_pair.second;
277 const auto catalog = executor->getCatalog();
280 inner_col->get_column_id(), inner_col->get_table_id(), *catalog);
282 inner_col->get_table_id(),
284 executor->getTemporaryTables());
286 if (!inner_ti.is_string()) {
292 outer_col->get_column_id(), outer_col->get_table_id(), *catalog);
294 if (!inner_cd || !outer_cd) {
298 outer_col->get_table_id(),
300 executor->getTemporaryTables());
301 CHECK_EQ(inner_ti.is_string(), outer_ti.is_string());
303 if (outer_ti.get_comp_param() != inner_ti.get_comp_param()) {
306 const auto inner_str_dict_proxy =
307 executor->getStringDictionaryProxy(inner_col->get_comp_param(),
true);
308 CHECK(inner_str_dict_proxy);
309 const auto outer_str_dict_proxy =
310 executor->getStringDictionaryProxy(inner_col->get_comp_param(),
true);
311 CHECK(outer_str_dict_proxy);
313 return *inner_str_dict_proxy != *outer_str_dict_proxy;
317 const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
319 const int device_count) {
320 std::vector<Fragmenter_Namespace::FragmentInfo> shards_for_device;
321 for (
const auto& fragment : fragments) {
323 if (fragment.shard % device_count == device_id) {
324 shards_for_device.push_back(fragment);
327 return shards_for_device;
331 const std::vector<ColumnsForDevice>& columns_per_device)
const {
334 const auto max_unique_hash_input_entries =
337 .getNormalizedHashEntryCount() +
339 for (
const auto& device_columns : columns_per_device) {
340 CHECK(!device_columns.join_columns.empty());
341 const auto rhs_join_col_num_entries = device_columns.join_columns.front().num_elems;
342 if (rhs_join_col_num_entries > max_unique_hash_input_entries) {
343 VLOG(1) <<
"Skipping attempt to build perfect hash one-to-one table as number of "
344 "rhs column entries ("
345 << rhs_join_col_num_entries <<
") exceeds range for rhs join column ("
346 << max_unique_hash_input_entries <<
").";
359 const auto inner_col = cols.first;
361 inner_col->get_table_id(),
365 if (query_info.fragments.empty()) {
368 if (query_info.getNumTuplesUpperBound() >
369 static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
372 std::vector<std::future<void>> init_threads;
383 std::vector<std::vector<Fragmenter_Namespace::FragmentInfo>> fragments_per_device;
384 std::vector<ColumnsForDevice> columns_per_device;
385 std::vector<std::unique_ptr<CudaAllocator>> dev_buff_owners;
402 std::vector<ChunkKey> chunk_key_per_device;
405 for (
int device_id = 0; device_id <
device_count_; ++device_id) {
406 fragments_per_device.emplace_back(
409 : query_info.fragments);
411 dev_buff_owners.emplace_back(std::make_unique<CudaAllocator>(
414 const auto chunk_key =
415 genChunkKey(fragments_per_device[device_id], outer_col, inner_col);
416 chunk_key_per_device.emplace_back(std::move(chunk_key));
420 auto hashtable_access_path_info =
428 fragments_per_device,
432 table_keys_ = hashtable_access_path_info.table_keys;
438 chunk_key_per_device,
439 executor_->getCatalog()->getDatabaseId(),
448 for (
int device_id = 0; device_id <
device_count_; ++device_id) {
450 fragments_per_device[device_id].begin(),
451 fragments_per_device[device_id].end(),
453 [](
size_t sum,
const auto& fragment) {
return sum + fragment.getNumTuples(); });
456 outer_col ? outer_col : inner_col,
458 chunk_key_per_device[device_id],
469 const bool invalid_cache_key =
471 if (!invalid_cache_key) {
476 std::for_each(hashtable_cache_key_.cbegin(),
477 hashtable_cache_key_.cend(),
524 auto allow_hashtable_recycling =
526 needs_dict_translation_,
528 inner_col->get_table_id());
529 bool has_invalid_cached_hash_table =
false;
532 allow_hashtable_recycling, invalid_cache_key,
join_type_)) {
535 for (
int device_id = 0; device_id <
device_count_; ++device_id) {
544 has_invalid_cached_hash_table =
true;
549 if (has_invalid_cached_hash_table) {
555 for (
int device_id = 0; device_id <
device_count_; ++device_id) {
570 for (
int device_id = 0; device_id <
device_count_; ++device_id) {
571 columns_per_device.emplace_back(
575 ? dev_buff_owners[device_id].
get()
581 for (
int device_id = 0; device_id <
device_count_; ++device_id) {
582 const auto chunk_key =
genChunkKey(fragments_per_device[device_id],
589 columns_per_device[device_id],
594 for (
auto& init_thread : init_threads) {
597 for (
auto& init_thread : init_threads) {
601 VLOG(1) <<
"RHS/Inner hash join values detected to not be unique, falling back to "
602 "One-to-Many hash layout.";
606 init_threads.clear();
608 CHECK_EQ(dev_buff_owners.size(), size_t(device_count_));
610 CHECK_EQ(columns_per_device.size(), size_t(device_count_));
611 for (
int device_id = 0; device_id <
device_count_; ++device_id) {
612 const auto chunk_key =
genChunkKey(fragments_per_device[device_id],
619 columns_per_device[device_id],
624 for (
auto& init_thread : init_threads) {
627 for (
auto& init_thread : init_threads) {
634 const std::vector<InnerOuter>& inner_outer_pairs)
const {
644 const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
648 std::vector<JoinColumn> join_columns;
649 std::vector<std::shared_ptr<Chunk_NS::Chunk>> chunks_owner;
650 std::vector<JoinColumnTypeInfo> join_column_types;
651 std::vector<JoinBucketInfo> join_bucket_info;
652 std::vector<std::shared_ptr<void>> malloc_owner;
653 const auto effective_memory_level =
656 const auto inner_col = inner_outer_pair.first;
658 inner_col->get_column_id(), inner_col->get_table_id(), catalog);
659 if (inner_cd && inner_cd->isVirtualCol) {
664 effective_memory_level,
671 const auto& ti = inner_col->get_type_info();
680 return {join_columns, join_column_types, chunks_owner, join_bucket_info, malloc_owner};
689 const auto effective_memory_level =
694 auto& join_column = columns_for_device.
join_columns.front();
701 effective_memory_level,
711 effective_memory_level,
714 throw std::runtime_error(
"Unexpected error building one to many hash table: " +
726 const int device_id) {
728 const auto inner_col = cols.first;
742 const int32_t hash_join_invalid_val{-1};
743 auto hashtable_layout = layout;
744 auto allow_hashtable_recycling =
748 inner_col->get_table_id());
749 if (allow_hashtable_recycling) {
755 if (cached_hashtable_layout_type) {
761 CHECK(!chunk_key.empty());
762 std::shared_ptr<PerfectHashTable> hash_table{
nullptr};
763 decltype(std::chrono::steady_clock::now()) ts1, ts2;
764 ts1 = std::chrono::steady_clock::now();
777 hash_join_invalid_val,
787 hash_join_invalid_val,
791 ts2 = std::chrono::steady_clock::now();
793 std::chrono::duration_cast<std::chrono::milliseconds>(ts2 - ts1).count();
794 hash_table->setHashEntryInfo(hash_entry_info);
795 hash_table->setColumnNumElems(join_column.
num_elems);
796 if (allow_hashtable_recycling && hash_table) {
816 const auto& ti = inner_col->get_type_info();
817 CHECK(ti.is_string());
839 builder.initHashTableOnGpu(chunk_key,
848 hash_join_invalid_val,
854 if (!err && allow_hashtable_recycling && hash_tables_for_device_[device_id]) {
858 hash_tables_for_device_[device_id]->getLayout(),
873 const std::vector<Fragmenter_Namespace::FragmentInfo>& fragments,
880 std::for_each(fragments.cbegin(), fragments.cend(), [&chunk_key](
const auto& fragment) {
882 chunk_key.push_back(fragment.fragmentId);
884 if (ti.is_string()) {
889 size_t outer_elem_count =
891 outer_query_info.fragments.end(),
893 [&chunk_key](
size_t sum,
const auto& fragment) {
894 chunk_key.push_back(fragment.fragmentId);
895 return sum + fragment.getNumTuples();
897 chunk_key.push_back(outer_elem_count);
909 VLOG(1) <<
"Checking CPU hash table cache.";
921 std::shared_ptr<PerfectHashTable> hashtable_ptr,
923 size_t hashtable_building_time) {
925 CHECK(hashtable_ptr && !hashtable_ptr->getGpuBuffer());
932 hashtable_building_time);
938 if (hash_ptr->getType()->isIntegerTy(64)) {
941 CHECK(hash_ptr->getType()->isPointerTy());
942 return executor_->cgen_state_->ir_builder_.CreatePtrToInt(
944 llvm::Type::getInt64Ty(
executor_->cgen_state_->context_));
948 llvm::Value* hash_ptr,
951 const int shard_count,
961 auto hash_entry_info =
964 std::vector<llvm::Value*> hash_join_idx_args{
966 executor_->cgen_state_->castToTypeIn(key_lv, 64),
970 const auto expected_hash_entry_count =
972 const auto entry_count_per_shard =
973 (expected_hash_entry_count + shard_count - 1) / shard_count;
974 hash_join_idx_args.push_back(
975 executor_->cgen_state_->llInt<uint32_t>(entry_count_per_shard));
976 hash_join_idx_args.push_back(
executor_->cgen_state_->llInt<uint32_t>(shard_count));
980 if (!key_col_logical_ti.get_notnull() ||
isBitwiseEq()) {
981 hash_join_idx_args.push_back(
executor_->cgen_state_->llInt(
984 auto special_date_bucketization_case = key_col_ti.
get_type() ==
kDATE;
986 if (special_date_bucketization_case) {
987 hash_join_idx_args.push_back(
executor_->cgen_state_->llInt(
990 hash_join_idx_args.push_back(
995 if (special_date_bucketization_case) {
996 hash_join_idx_args.emplace_back(
997 executor_->cgen_state_->llInt(hash_entry_info.bucket_normalization));
1000 return hash_join_idx_args;
1004 const size_t index) {
1010 auto key_col = cols.second;
1012 auto val_col = cols.first;
1019 if (key_col_var && val_col_var &&
1024 throw std::runtime_error(
1025 "Query execution fails because the query contains not supported self-join "
1026 "pattern. We suspect the query requires multiple left-deep join tree due to "
1028 "join condition of the self-join and is not supported for now. Please consider "
1029 "rewriting table order in "
1037 auto hash_join_idx_args =
getHashJoinArgs(pos_ptr, key_lv, key_col, shard_count, co);
1039 const auto& key_col_ti = key_col->get_type_info();
1041 auto bucketize = (key_col_ti.get_type() ==
kDATE);
1044 !key_col_ti.get_notnull(),
1069 return hash_table->getEntryCount() *
sizeof(int32_t);
1081 std::shared_ptr<PerfectHashTable>& cpu_hash_table,
1082 const int device_id,
1086 CHECK(cpu_hash_table);
1091 cpu_hash_table->getLayout(),
1092 cpu_hash_table->getHashEntryInfo(),
1098 std::shared_ptr<PerfectHashTable> gpu_hash_table = gpu_builder.
getHashTable();
1099 CHECK(gpu_hash_table);
1100 auto gpu_buffer_ptr = gpu_hash_table->getGpuBuffer();
1101 CHECK(gpu_buffer_ptr);
1106 auto device_allocator = std::make_unique<CudaAllocator>(
1108 device_allocator->copyToDevice(
1110 cpu_hash_table->getCpuBuffer(),
1117 const int device_id,
1123 std::unique_ptr<int8_t[]> buffer_copy;
1125 buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1127 auto data_mgr =
executor_->getDataMgr();
1128 auto device_allocator = std::make_unique<CudaAllocator>(
1130 device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1132 auto ptr1 = buffer_copy ? buffer_copy.get() :
reinterpret_cast<const int8_t*
>(buffer);
1134 auto ptr1 =
reinterpret_cast<const int8_t*
>(buffer);
1143 hash_table ? hash_table->getEntryCount() : 0,
1154 const int device_id)
const {
1159 std::unique_ptr<int8_t[]> buffer_copy;
1161 buffer_copy = std::make_unique<int8_t[]>(buffer_size);
1163 auto data_mgr =
executor_->getDataMgr();
1164 auto device_allocator = std::make_unique<CudaAllocator>(
1166 device_allocator->copyFromDevice(buffer_copy.get(), buffer, buffer_size);
1168 auto ptr1 = buffer_copy ? buffer_copy.get() :
reinterpret_cast<const int8_t*
>(buffer);
1170 auto ptr1 =
reinterpret_cast<const int8_t*
>(buffer);
1177 hash_table ? hash_table->getEntryCount() : 0,
1186 const size_t index) {
1188 using namespace std::string_literals;
1191 const auto cols_and_string_op_infos =
get_cols(
1193 const auto& cols = cols_and_string_op_infos.first;
1194 const auto& inner_outer_string_op_infos = cols_and_string_op_infos.second;
1195 auto key_col = cols.second;
1197 auto val_col = cols.first;
1202 if (key_col_var && val_col_var &&
1207 throw std::runtime_error(
1208 "Query execution failed because the query contains not supported self-join "
1209 "pattern. We suspect the query requires multiple left-deep join tree due to "
1210 "the join condition of the self-join and is not supported for now. Please "
1211 "consider chaning the table order in the FROM clause.");
1215 key_col, inner_outer_string_op_infos.second, code_generator, co);
1221 const auto hash_join_idx_args =
1225 std::string fname((key_col_ti.get_type() ==
kDATE) ?
"bucketized_hash_join_idx"s
1226 :
"hash_join_idx"s);
1229 fname +=
"_bitwise";
1232 fname +=
"_sharded";
1235 if (!
isBitwiseEq() && !key_col_ti.get_notnull()) {
1236 fname +=
"_nullable";
1238 return executor_->cgen_state_->emitCall(fname, hash_join_idx_args);
1247 const int inner_table_id,
1248 const std::vector<InputTableInfo>& query_infos) {
1249 std::optional<size_t> ti_idx;
1250 for (
size_t i = 0; i < query_infos.size(); ++i) {
1251 if (inner_table_id == query_infos[i].table_id) {
1257 return query_infos[*ti_idx];
1261 const size_t shard_count,
1262 const size_t device_count,
1264 const auto entries_per_shard =
1265 shard_count ? (total_entries + shard_count - 1) / shard_count : total_entries;
1266 size_t entries_per_device = entries_per_shard;
1268 const auto shards_per_device = (shard_count + device_count - 1) / device_count;
1270 entries_per_device = entries_per_shard * shards_per_device;
1272 return entries_per_device;
llvm::Value * codegenHashTableLoad(const size_t table_idx)
void reifyForDevice(const ChunkKey &hash_table_key, const ColumnsForDevice &columns_for_device, const HashType layout, const int device_id, const logger::ThreadId parent_thread_id)
int64_t getIntMin() const
std::vector< int > ChunkKey
size_t get_hash_entry_count(const ExpressionRange &col_range, const bool is_bw_eq)
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
Data_Namespace::MemoryLevel getEffectiveMemoryLevel(const std::vector< InnerOuter > &inner_outer_pairs) const
ExpressionRange rhs_source_col_range_
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Class for a per-database catalog. Also includes metadata for the current database and the current use...
bool self_join_not_covered_by_left_deep_tree(const Analyzer::ColumnVar *key_side, const Analyzer::ColumnVar *val_side, const int max_rte_covered)
static bool isInvalidHashTableCacheKey(const std::vector< QueryPlanHash > &cache_keys)
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
HashEntryInfo get_bucketized_hash_entry_info(SQLTypeInfo const &context_ti, ExpressionRange const &col_range, bool const is_bw_eq)
#define IS_EQUIVALENCE(X)
static bool canAccessHashTable(bool allow_hash_table_recycling, bool invalid_cache_key, JoinType join_type)
const Data_Namespace::MemoryLevel memory_level_
size_t getComponentBufferSize() const noexceptoverride
std::shared_ptr< Analyzer::BinOper > qual_bin_oper_
std::mutex str_proxy_translation_mutex_
ChunkKey genChunkKey(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Analyzer::Expr *outer_col, const Analyzer::ColumnVar *inner_col) const
const TableIdToNodeMap table_id_to_node_map_
const Expr * get_right_operand() const
size_t offsetBufferOff() const noexceptoverride
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
std::vector< std::shared_ptr< HashTable > > hash_tables_for_device_
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
HashTableBuildDagMap hashtable_build_dag_map_
Data_Namespace::MemoryLevel get_effective_memory_level(const Data_Namespace::MemoryLevel memory_level, const bool needs_dict_translation)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
const InputTableInfo & getInnerQueryInfo(const Analyzer::ColumnVar *inner_col) const
#define DEBUG_TIMER_NEW_THREAD(parent_thread_id)
size_t payloadBufferOff() const noexceptoverride
std::shared_ptr< PerfectHashTable > initHashTableOnCpuFromCache(QueryPlanHash key, CacheItemType item_type, DeviceIdentifier device_identifier)
HOST DEVICE SQLTypes get_type() const
bool needs_dictionary_translation(const std::vector< InnerOuter > &inner_outer_pairs, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs, const Executor *executor)
static llvm::Value * codegenColOrStringOper(const Analyzer::Expr *col_or_string_oper, const std::vector< StringOps_Namespace::StringOpInfo > &string_op_infos, CodeGenerator &code_generator, const CompilationOptions &co)
void freeHashBufferMemory()
const int get_max_rte_scan_table(std::unordered_map< int, llvm::Value * > &scan_idx_to_hash_pos)
HashType getHashType() const noexceptoverride
void copyCpuHashTableToGpu(std::shared_ptr< PerfectHashTable > &cpu_hash_table, const int device_id, Data_Namespace::DataMgr *data_mgr)
HashtableCacheMetaInfo hashtable_cache_meta_info_
static std::unique_ptr< HashtableRecycler > hash_table_cache_
ColumnsForDevice fetchColumnsForDevice(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, DeviceAllocator *dev_buff_owner, const Catalog_Namespace::Catalog &catalog)
future< Result > async(Fn &&fn, Args &&...args)
static std::pair< InnerOuter, InnerOuterStringOpInfos > normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
std::unordered_map< size_t, HashTableBuildDag > HashTableBuildDagMap
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
std::unordered_map< int, const RelAlgNode * > TableIdToNodeMap
std::vector< Fragmenter_Namespace::FragmentInfo > only_shards_for_device(const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const int device_id, const int device_count)
bool isOneToOneHashPossible(const std::vector< ColumnsForDevice > &columns_per_device) const
int8_t * getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
DEVICE auto accumulate(ARGS &&...args)
ExpressionRange getExpressionRange(const Analyzer::BinOper *expr, const std::vector< InputTableInfo > &query_infos, const Executor *, boost::optional< std::list< std::shared_ptr< Analyzer::Expr >>> simple_quals)
size_t shardCount() const
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
bool needs_dict_translation_
const SQLTypeInfo & get_type_info() const
std::vector< llvm::Value * > getHashJoinArgs(llvm::Value *hash_ptr, llvm::Value *key_lvs, const Analyzer::Expr *key_col, const int shard_count, const CompilationOptions &co)
void initOneToManyHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
std::vector< InnerOuter > inner_outer_pairs_
static ExpressionRange makeIntRange(const int64_t int_min, const int64_t int_max, const int64_t bucket, const bool has_nulls)
static const StringDictionaryProxy::IdMap * translateInnerToOuterStrDictProxies(const InnerOuter &cols, const InnerOuterStringOpInfos &inner_outer_string_op_infos, ExpressionRange &old_col_range, const Executor *executor)
PerfectJoinHashTable(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const Analyzer::ColumnVar *col_var, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const ExpressionRange &col_range, const ExpressionRange &rhs_source_col_range, ColumnCacheMap &column_cache, Executor *executor, const int device_count, const HashTableBuildDagMap &hashtable_build_dag_map, const TableIdToNodeMap &table_id_to_node_map, const InnerOuterStringOpInfos &inner_outer_string_op_infos={})
void initOneToOneHashTableOnCpu(const JoinColumn &join_column, const ExpressionRange &col_range, const bool is_bitwise_eq, const InnerOuter &cols, const StringDictionaryProxy::IdMap *str_proxy_translation_map, const JoinType join_type, const HashType hash_type, const HashEntryInfo hash_entry_info, const int32_t hash_join_invalid_val, const Executor *executor)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
static std::unique_ptr< HashingSchemeRecycler > hash_table_layout_cache_
std::unique_ptr< PerfectHashTable > getHashTable()
const InputTableInfo & get_inner_query_info(const int inner_table_id, const std::vector< InputTableInfo > &query_infos)
static QueryPlanHash getAlternativeCacheKey(AlternativeCacheKeyForPerfectHashJoin &info)
const std::vector< InputTableInfo > & query_infos_
ExpressionRange col_range_
static std::shared_ptr< PerfectJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const JoinType join_type, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const HashTableBuildDagMap &hashtable_build_dag_map, const TableIdToNodeMap &table_id_to_node_map)
Make hash table from an in-flight SQL query's parse tree etc.
void allocateDeviceMemory(const size_t num_column_elems, const HashType layout, HashEntryInfo hash_entry_info, const size_t shard_count, const int device_id, const int device_count, const Executor *executor)
int getInnerTableId() const noexceptoverride
void putHashTableOnCpuToCache(QueryPlanHash key, CacheItemType item_type, std::shared_ptr< PerfectHashTable > hashtable_ptr, DeviceIdentifier device_identifier, size_t hashtable_building_time)
static std::string getHashTypeString(HashType ht) noexcept
static std::unordered_set< size_t > getAlternativeTableKeys(const std::vector< ChunkKey > &chunk_keys, int db_id, int inner_table_id)
std::string toString(const ExecutorDeviceType device_type, const int device_id=0, bool raw=false) const override
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
static std::string toString(const std::string &type, const std::string &layout_type, size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size, bool raw=false)
Decode hash table into a human-readable string.
ColumnCacheMap & column_cache_
size_t get_entries_per_device(const size_t total_entries, const size_t shard_count, const size_t device_count, const Data_Namespace::MemoryLevel memory_level)
CUstream getQueryEngineCudaStreamForDevice(int device_num)
const InnerOuterStringOpInfos inner_outer_string_op_infos_
llvm::Value * codegenSlot(const CompilationOptions &, const size_t) override
bool shard_count_less_or_equal_device_count(const int inner_table_id, const Executor *executor)
size_t getNormalizedHashEntryCount() const
int64_t getIntMax() const
ColumnType get_join_column_type_kind(const SQLTypeInfo &ti)
int64_t getBucket() const
std::pair< std::vector< StringOps_Namespace::StringOpInfo >, std::vector< StringOps_Namespace::StringOpInfo >> InnerOuterStringOpInfos
#define DEBUG_TIMER(name)
std::mutex cpu_hash_table_buff_mutex_
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
size_t countBufferOff() const noexceptoverride
const Expr * get_left_operand() const
static DecodedJoinHashBufferSet toSet(size_t key_component_count, size_t key_component_width, size_t entry_count, const int8_t *ptr1, const int8_t *ptr2, const int8_t *ptr3, const int8_t *ptr4, size_t buffer_size)
Decode hash table into a std::set for easy inspection and validation.
int initHashTableForDevice(const ChunkKey &chunk_key, const JoinColumn &join_column, const InnerOuter &cols, const HashType layout, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id)
int get_column_id() const
std::vector< QueryPlanHash > hashtable_cache_key_
HashTable * getHashTableForDevice(const size_t device_id) const
static constexpr DeviceIdentifier CPU_DEVICE_IDENTIFIER
const JoinType join_type_
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
static HashtableAccessPathInfo getHashtableAccessPathInfo(const std::vector< InnerOuter > &inner_outer_pairs, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_infos_pairs, const SQLOps op_type, const JoinType join_type, const HashTableBuildDagMap &hashtable_build_dag_map, int device_count, int shard_count, const std::vector< std::vector< Fragmenter_Namespace::FragmentInfo >> &frags_for_device, Executor *executor)
std::set< DecodedJoinHashBufferEntry > toSet(const ExecutorDeviceType device_type, const int device_id) const override
InnerOuter get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
const StringDictionaryProxy::IdMap * str_proxy_translation_map_
const std::vector< JoinColumn > join_columns
std::unordered_set< size_t > table_keys_
HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t) override
static bool isSafeToCacheHashtable(const TableIdToNodeMap &table_id_to_node_map, bool need_dict_translation, const std::vector< InnerOuterStringOpInfos > &inner_outer_string_op_info_pairs, const int table_id)
bool isBitwiseEq() const override