37 const std::vector<Fragmenter_Namespace::FragmentInfo>& fragment_info,
40 std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
42 std::vector<std::shared_ptr<void>>& malloc_owner,
45 static std::mutex fragment_fetch_mutex;
46 std::lock_guard<std::mutex> fragment_fetch_lock(fragment_fetch_mutex);
51 effective_memory_level,
58 CHECK(dev_buff_owner);
76 const int device_id) {
81 for (
size_t i = 0; i < memsz; ++i) {
93 const int device_id)
const {
94 return toStringFlat<int64_t>(
this, device_type, device_id);
98 const int device_id)
const {
99 return toStringFlat<int32_t>(
this, device_type, device_id);
105 for (
auto k : e.
key) {
147 const std::vector<llvm::Value*>& hash_join_idx_args_in,
148 const bool is_sharded,
149 const bool col_is_nullable,
151 const int64_t sub_buff_size,
153 bool is_bucketized) {
157 std::string fname(is_bucketized ?
"bucketized_hash_join_idx"s :
"hash_join_idx"s);
165 if (!is_bw_eq && col_is_nullable) {
166 fname +=
"_nullable";
169 const auto slot_lv = executor->cgen_state_->emitCall(fname, hash_join_idx_args_in);
170 const auto slot_valid_lv = executor->cgen_state_->ir_builder_.CreateICmpSGE(
171 slot_lv, executor->cgen_state_->llInt(int64_t(0)));
173 auto pos_ptr = hash_join_idx_args_in[0];
176 auto count_ptr = executor->cgen_state_->ir_builder_.CreateAdd(
177 pos_ptr, executor->cgen_state_->llInt(sub_buff_size));
178 auto hash_join_idx_args = hash_join_idx_args_in;
179 hash_join_idx_args[0] = executor->cgen_state_->ir_builder_.CreatePtrToInt(
180 count_ptr, llvm::Type::getInt64Ty(executor->cgen_state_->context_));
182 const auto row_count_lv = executor->cgen_state_->ir_builder_.CreateSelect(
184 executor->cgen_state_->emitCall(fname, hash_join_idx_args),
185 executor->cgen_state_->llInt(int64_t(0)));
186 auto rowid_base_i32 = executor->cgen_state_->ir_builder_.CreateIntToPtr(
187 executor->cgen_state_->ir_builder_.CreateAdd(
188 pos_ptr, executor->cgen_state_->llInt(2 * sub_buff_size)),
189 llvm::Type::getInt32PtrTy(executor->cgen_state_->context_));
191 executor->cgen_state_->ir_builder_.CreateGEP(rowid_base_i32, slot_lv);
192 return {rowid_ptr_i32, row_count_lv, slot_lv};
197 llvm::Value* hash_ptr =
nullptr;
198 const auto total_table_count =
199 executor->plan_state_->join_info_.join_hash_tables_.size();
200 CHECK_LT(table_idx, total_table_count);
201 if (total_table_count > 1) {
202 auto hash_tables_ptr =
205 table_idx > 0 ? executor->cgen_state_->ir_builder_.CreateGEP(
207 executor->cgen_state_->llInt(static_cast<int64_t>(table_idx)))
209 hash_ptr = executor->cgen_state_->ir_builder_.CreateLoad(hash_pptr);
211 hash_ptr =
get_arg_by_name(executor->cgen_state_->row_func_,
"join_hash_tables");
219 const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
220 const std::vector<InputTableInfo>& query_infos,
223 const int device_count,
228 std::shared_ptr<HashJoin> join_hash_table;
231 throw std::runtime_error(
232 "Overlaps hash join disabled, attempting to fall back to loop join");
234 if (qual_bin_oper->is_overlaps_oper()) {
235 VLOG(1) <<
"Trying to build geo hash table:";
243 }
else if (dynamic_cast<const Analyzer::ExpressionTuple*>(
244 qual_bin_oper->get_left_operand())) {
245 VLOG(1) <<
"Trying to build keyed hash table:";
255 VLOG(1) <<
"Trying to build perfect hash table:";
265 CHECK_EQ(join_quals.size(), size_t(1));
266 const auto join_qual =
268 VLOG(1) <<
"Trying to build keyed hash table after perfect hash table:";
278 CHECK(join_hash_table);
281 for (
int device_id = 0; device_id < join_hash_table->getDeviceCount();
285 VLOG(2) <<
"Built GPU hash table: " 291 VLOG(2) <<
"Built CPU hash table: " 296 return join_hash_table;
300 const std::vector<InnerOuter>& inner_outer_pairs,
301 const Executor* executor) {
303 std::vector<const void*> sd_inner_proxy_per_key;
304 std::vector<const void*> sd_outer_proxy_per_key;
305 std::vector<ChunkKey> cache_key_chunks;
306 const auto db_id = executor->getCatalog()->getCurrentDB().dbId;
307 for (
const auto& inner_outer_pair : inner_outer_pairs) {
308 const auto inner_col = inner_outer_pair.first;
309 const auto outer_col = inner_outer_pair.second;
310 const auto& inner_ti = inner_col->get_type_info();
311 const auto& outer_ti = outer_col->get_type_info();
312 ChunkKey cache_key_chunks_for_column{
313 db_id, inner_col->get_table_id(), inner_col->get_column_id()};
314 if (inner_ti.is_string() &&
315 !(inner_ti.get_comp_param() == outer_ti.get_comp_param())) {
316 CHECK(outer_ti.is_string());
319 const auto sd_inner_proxy = executor->getStringDictionaryProxy(
320 inner_ti.get_comp_param(), executor->getRowSetMemoryOwner(),
true);
321 const auto sd_outer_proxy = executor->getStringDictionaryProxy(
322 outer_ti.get_comp_param(), executor->getRowSetMemoryOwner(),
true);
323 CHECK(sd_inner_proxy && sd_outer_proxy);
324 sd_inner_proxy_per_key.push_back(sd_inner_proxy);
325 sd_outer_proxy_per_key.push_back(sd_outer_proxy);
326 cache_key_chunks_for_column.push_back(sd_outer_proxy->getGeneration());
328 sd_inner_proxy_per_key.emplace_back();
329 sd_outer_proxy_per_key.emplace_back();
331 cache_key_chunks.push_back(cache_key_chunks_for_column);
333 return {sd_inner_proxy_per_key, sd_outer_proxy_per_key, cache_key_chunks};
337 std::string_view column,
339 Executor* executor) {
340 auto catalog = executor->getCatalog();
343 auto tmeta = catalog->getMetadataForTable(std::string(table));
346 auto cmeta = catalog->getMetadataForColumn(tmeta->tableId, std::string(column));
349 auto ti = cmeta->columnType;
351 if (ti.is_geometry() && ti.get_type() !=
kPOINT) {
353 switch (ti.get_type()) {
355 geoColumnId = cmeta->columnId + 2;
359 geoColumnId = cmeta->columnId + 3;
363 geoColumnId = cmeta->columnId + 4;
369 cmeta = catalog->getMetadataForColumn(tmeta->tableId, geoColumnId);
371 ti = cmeta->columnType;
375 std::make_shared<Analyzer::ColumnVar>(ti, tmeta->tableId, cmeta->columnId, rte_idx);
390 std::set<const Analyzer::ColumnVar*>
result;
391 for (
const auto& expr_component : expr_tuple->
getTuple()) {
392 const auto component_rte_set = visitor.
visit(expr_component.get());
393 result.insert(component_rte_set.begin(), component_rte_set.end());
399 const std::set<const Analyzer::ColumnVar*>& aggregate,
400 const std::set<const Analyzer::ColumnVar*>& next_result)
const override {
402 result.insert(next_result.begin(), next_result.end());
408 std::unordered_set<int> phys_table_ids;
409 for (
auto cv : cvs) {
410 phys_table_ids.insert(cv->get_table_id());
413 std::unordered_set<PhysicalInput> phys_inputs;
414 for (
auto cv : cvs) {
415 phys_inputs.emplace(
PhysicalInput{cv->get_column_id(), cv->get_table_id()});
418 executor->setupCaching(phys_inputs, phys_table_ids);
422 std::set<const Analyzer::ColumnVar*> cvs,
423 Executor* executor) {
424 auto catalog = executor->getCatalog();
427 std::unordered_set<int> phys_table_ids;
428 for (
auto cv : cvs) {
429 phys_table_ids.insert(cv->get_table_id());
435 std::vector<InputTableInfo> query_infos(phys_table_ids.size());
437 for (
auto id : phys_table_ids) {
438 auto tmeta = catalog->getMetadataForTable(
id);
439 query_infos[i].table_id = id;
440 query_infos[i].info = tmeta->fragmenter->getFragmentsForQuery();
449 std::string_view table1,
450 std::string_view column1,
451 std::string_view table2,
452 std::string_view column2,
455 const int device_count,
457 Executor* executor) {
461 auto qual_bin_oper = std::make_shared<Analyzer::BinOper>(
kBOOLEAN,
kEQ,
kONE, a1, a2);
463 std::set<const Analyzer::ColumnVar*> cvs =
482 const std::shared_ptr<Analyzer::BinOper> qual_bin_oper,
485 const int device_count,
487 Executor* executor) {
488 std::set<const Analyzer::ColumnVar*> cvs =
506 const size_t shard_count,
507 const Executor* executor) {
513 const auto inner_td = executor->getCatalog()->getMetadataForTable(table_id);
534 const Executor* executor) {
537 std::shared_ptr<Analyzer::BinOper> redirected_bin_oper;
539 std::tie(inner_col, outer_col) =
540 get_cols(join_condition, *executor->getCatalog(), executor->getTemporaryTables());
544 if (!inner_col || !outer_col) {
554 const bool is_overlaps_join) {
557 if (!is_overlaps_join) {
558 if (lhs_ti.get_type() != rhs_ti.get_type()) {
559 throw HashJoinFail(
"Equijoin types must be identical, found: " +
560 lhs_ti.get_type_name() +
", " + rhs_ti.get_type_name());
562 if (!lhs_ti.is_integer() && !lhs_ti.is_time() && !lhs_ti.is_string() &&
563 !lhs_ti.is_decimal()) {
564 throw HashJoinFail(
"Cannot apply hash join to inner column type " +
565 lhs_ti.get_type_name());
568 if (lhs_ti.is_decimal() && (lhs_ti.get_scale() != rhs_ti.get_scale() ||
569 lhs_ti.get_precision() != rhs_ti.get_precision())) {
570 throw HashJoinFail(
"Equijoin with different decimal types");
576 if (lhs_ti.is_string() && (
static_cast<bool>(lhs_cast) != static_cast<bool>(rhs_cast) ||
577 (lhs_cast && lhs_cast->get_optype() !=
kCAST) ||
578 (rhs_cast && rhs_cast->get_optype() !=
kCAST))) {
579 throw HashJoinFail(
"Cannot use hash join for given expression");
582 if (lhs_ti.is_decimal() && (lhs_cast || rhs_cast)) {
583 throw HashJoinFail(
"Cannot use hash join for given expression");
587 : dynamic_cast<const Analyzer::ColumnVar*>(lhs);
590 : dynamic_cast<const Analyzer::ColumnVar*>(rhs);
591 if (!lhs_col && !rhs_col) {
592 throw HashJoinFail(
"Cannot use hash join for given expression");
596 auto outer_ti = lhs_ti;
597 auto inner_ti = rhs_ti;
599 if ((!lhs_col || (rhs_col && lhs_col->get_rte_idx() < rhs_col->get_rte_idx())) &&
600 (!rhs_col || (!lhs_col || lhs_col->get_rte_idx() < rhs_col->get_rte_idx()))) {
604 if (lhs_col && lhs_col->get_rte_idx() == 0) {
605 throw HashJoinFail(
"Cannot use hash join for given expression");
613 throw HashJoinFail(
"Cannot use hash join for given expression");
617 int outer_rte_idx = rte_idx_visitor.
visit(outer_expr);
620 if (inner_col->get_rte_idx() <= outer_rte_idx) {
621 throw HashJoinFail(
"Cannot use hash join for given expression");
627 inner_col->get_column_id(), inner_col->get_table_id(),
cat);
628 const auto inner_col_real_ti =
get_column_type(inner_col->get_column_id(),
629 inner_col->get_table_id(),
632 const auto& outer_col_ti =
634 ? outer_col->get_type_info()
637 if ((inner_col_real_ti.is_decimal() || outer_col_ti.is_decimal()) &&
638 (lhs_cast || rhs_cast)) {
639 throw HashJoinFail(
"Cannot use hash join for given expression");
641 if (is_overlaps_join) {
642 if (!inner_col_real_ti.is_array()) {
644 "Overlaps join only supported for inner columns with array type");
646 auto is_bounds_array = [](
const auto ti) {
647 return ti.is_fixlen_array() && ti.get_size() == 32;
649 if (!is_bounds_array(inner_col_real_ti)) {
651 "Overlaps join only supported for 4-element double fixed length arrays");
653 if (!(outer_col_ti.get_type() ==
kPOINT || is_bounds_array(outer_col_ti))) {
655 "Overlaps join only supported for geometry outer columns of type point or " 656 "geometry columns with bounds");
659 if (!(inner_col_real_ti.is_integer() || inner_col_real_ti.is_time() ||
660 inner_col_real_ti.is_decimal() ||
661 (inner_col_real_ti.is_string() &&
664 "Can only apply hash join to integer-like types and dictionary encoded " 669 auto normalized_inner_col = inner_col;
670 auto normalized_outer_col = outer_col ? outer_col : outer_expr;
672 const auto& normalized_inner_ti = normalized_inner_col->get_type_info();
673 const auto& normalized_outer_ti = normalized_outer_col->get_type_info();
675 if (normalized_inner_ti.is_string() != normalized_outer_ti.is_string()) {
676 throw HashJoinFail(std::string(
"Could not build hash tables for incompatible types " +
677 normalized_inner_ti.get_type_name() +
" and " +
678 normalized_outer_ti.get_type_name()));
681 return {normalized_inner_col, normalized_outer_col};
687 std::vector<InnerOuter>
result;
688 const auto lhs_tuple_expr =
690 const auto rhs_tuple_expr =
693 CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
694 if (lhs_tuple_expr) {
695 const auto& lhs_tuple = lhs_tuple_expr->getTuple();
696 const auto& rhs_tuple = rhs_tuple_expr->getTuple();
697 CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
698 for (
size_t i = 0; i < lhs_tuple.size(); ++i) {
706 CHECK(!lhs_tuple_expr && !rhs_tuple_expr);
virtual HashJoinMatchingSet codegenMatchingSet(const CompilationOptions &, const size_t)=0
static llvm::Value * codegenHashTableLoad(const size_t table_idx, Executor *executor)
Class for a per-database catalog. Also includes metadata for the current database and the current use...
std::pair< const Analyzer::ColumnVar *, const Analyzer::Expr * > InnerOuter
std::string toStringFlat(const HashJoin *hash_table, const ExecutorDeviceType device_type, const int device_id)
static std::shared_ptr< OverlapsJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
std::list< std::shared_ptr< Analyzer::Expr > > coalesce_singleton_equi_join(const std::shared_ptr< Analyzer::BinOper > &join_qual)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
void setupSyntheticCaching(std::set< const Analyzer::ColumnVar *> cvs, Executor *executor)
JoinColumn fetchJoinColumn(const Analyzer::ColumnVar *hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragment_info, const Data_Namespace::MemoryLevel effective_memory_level, const int device_id, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, DeviceAllocator *dev_buff_owner, std::vector< std::shared_ptr< void >> &malloc_owner, Executor *executor, ColumnCacheMap *column_cache)
virtual void copyToDevice(int8_t *device_dst, const int8_t *host_src, const size_t num_bytes) const =0
const SQLTypeInfo get_column_type(const int col_id, const int table_id, const ColumnDescriptor *cd, const TemporaryTables *temporary_tables)
std::vector< InputTableInfo > getSyntheticInputTableInfo(std::set< const Analyzer::ColumnVar *> cvs, Executor *executor)
static std::shared_ptr< PerfectJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
virtual int8_t * alloc(const size_t num_bytes)=0
virtual std::string toStringFlat64(const ExecutorDeviceType device_type, const int device_id) const
static std::shared_ptr< BaselineJoinHashTable > getInstance(const std::shared_ptr< Analyzer::BinOper > condition, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from an in-flight SQL query's parse tree etc.
static JoinColumn makeJoinColumn(Executor *executor, const Analyzer::ColumnVar &hash_col, const std::vector< Fragmenter_Namespace::FragmentInfo > &fragments, const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator *device_allocator, std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner, std::vector< std::shared_ptr< void >> &malloc_owner, ColumnCacheMap &column_cache)
Creates a JoinColumn struct containing an array of JoinChunk structs.
static std::shared_ptr< HashJoin > getInstance(const std::shared_ptr< Analyzer::BinOper > qual_bin_oper, const std::vector< InputTableInfo > &query_infos, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor, const QueryHint &query_hint)
Make hash table from an in-flight SQL query's parse tree etc.
std::set< const Analyzer::ColumnVar * > aggregateResult(const std::set< const Analyzer::ColumnVar *> &aggregate, const std::set< const Analyzer::ColumnVar *> &next_result) const override
virtual std::string toStringFlat32(const ExecutorDeviceType device_type, const int device_id) const
size_t col_chunks_buff_sz
llvm::Value * get_arg_by_name(llvm::Function *func, const std::string &name)
const ColumnDescriptor * get_column_descriptor_maybe(const int col_id, const int table_id, const Catalog_Namespace::Catalog &cat)
bool is_overlaps_oper() const
static QueryHint defaults()
const std::vector< std::shared_ptr< Analyzer::Expr > > & getTuple() const
InnerOuter normalize_column_pair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join)
int64_t getJoinHashBuffer(const ExecutorDeviceType device_type, const int device_id) const
std::vector< InnerOuter > normalize_column_pairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
const int8_t * col_chunks_buff
std::ostream & operator<<(std::ostream &os, const DecodedJoinHashBufferEntry &e)
static void checkHashJoinReplicationConstraint(const int table_id, const size_t shard_count, const Executor *executor)
bool g_enable_overlaps_hashjoin
static CompositeKeyInfo getCompositeKeyInfo(const std::vector< InnerOuter > &inner_outer_pairs, const Executor *executor)
size_t getJoinHashBufferSize(const ExecutorDeviceType device_type)
bool table_is_replicated(const TableDescriptor *td)
std::set< DecodedJoinHashBufferEntry > DecodedJoinHashBufferSet
std::set< const Analyzer::ColumnVar * > visitColumnVarTuple(const Analyzer::ExpressionTuple *expr_tuple) const override
T visit(const Analyzer::Expr *expr) const
const SQLTypeInfo & get_type_info() const
std::set< const Analyzer::ColumnVar * > visitColumnVar(const Analyzer::ColumnVar *column) const override
std::set< int32_t > payload
#define DEBUG_TIMER(name)
std::vector< int > ChunkKey
DEVICE void swap(ARGS &&... args)
static std::shared_ptr< HashJoin > getSyntheticInstance(std::string_view table1, std::string_view column1, std::string_view table2, std::string_view column2, const Data_Namespace::MemoryLevel memory_level, const HashType preferred_hash_type, const int device_count, ColumnCacheMap &column_cache, Executor *executor)
Make hash table from named tables and columns (such as for testing).
std::vector< int64_t > key
size_t get_shard_count(const Analyzer::BinOper *join_condition, const Executor *executor)
std::shared_ptr< Analyzer::ColumnVar > getSyntheticColumnVar(std::string_view table, std::string_view column, int rte_idx, Executor *executor)
InnerOuter get_cols(const Analyzer::BinOper *qual_bin_oper, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
const Expr * get_right_operand() const
const Expr * get_left_operand() const