21 #include <boost/algorithm/cxx11/any_of.hpp>
27 bool operator()(std::shared_ptr<Analyzer::Expr>
const& qual) {
28 if (
auto oper = std::dynamic_pointer_cast<const Analyzer::BinOper>(qual)) {
38 std::vector<InnerOuterOrLoopQual>
result;
39 const auto lhs_tuple_expr =
41 const auto rhs_tuple_expr =
44 CHECK_EQ(static_cast<bool>(lhs_tuple_expr), static_cast<bool>(rhs_tuple_expr));
45 auto do_normalize_inner_outer_pair = [&
result, &condition,
this](
54 condition->is_overlaps_oper())
57 std::make_pair(inner_outer_pair.first, inner_outer_pair.second),
false};
58 result.push_back(valid_qual);
61 result.push_back(invalid_qual);
65 const auto& lhs_tuple = lhs_tuple_expr->getTuple();
66 const auto& rhs_tuple = rhs_tuple_expr->getTuple();
67 CHECK_EQ(lhs_tuple.size(), rhs_tuple.size());
68 for (
size_t i = 0; i < lhs_tuple.size(); ++i) {
69 do_normalize_inner_outer_pair(
70 lhs_tuple[i].
get(), rhs_tuple[i].
get(),
executor_->getTemporaryTables());
73 do_normalize_inner_outer_pair(condition->get_left_operand(),
74 condition->get_right_operand(),
89 if (dag_checker_res.first) {
90 VLOG(1) <<
"Stop DAG extraction (" << dag_checker_res.second <<
")";
94 auto& cached_dag = executor->getQueryPlanDagCache();
97 auto extracted_query_plan_dag = dag_extractor.getExtractedQueryPlanDagStr();
99 if (
auto sort_node = dynamic_cast<RelSort const*>(top_node)) {
103 auto child_dag = dag_extractor.getExtractedQueryPlanDagStr(1);
104 sort_node->getInput(0)->setQueryPlanDag(child_dag);
106 return {extracted_query_plan_dag, dag_extractor.isDagExtractionAvailable()};
111 std::optional<unsigned> left_deep_tree_id,
112 std::unordered_map<unsigned, JoinQualsPerNestingLevel> left_deep_tree_infos,
113 Executor* executor) {
119 auto& cached_dag = executor->getQueryPlanDagCache();
131 VLOG(1) <<
"Stop DAG extraction (Query plan dag cache reaches the maximum capacity)";
140 if (
auto table_func_node = dynamic_cast<const RelTableFunction*>(top_node)) {
141 for (
size_t i = 0; i < table_func_node->inputCount(); ++i) {
142 dag_extractor.
visit(table_func_node, table_func_node->getInput(i));
146 switch (num_child_node) {
151 if (
auto trans_join_node = dynamic_cast<const RelTranslatedJoin*>(top_node)) {
152 dag_extractor.
visit(trans_join_node, trans_join_node->getLHS());
153 dag_extractor.
visit(trans_join_node, trans_join_node->getRHS());
156 VLOG(1) <<
"Visit an invalid rel node while extracting query plan DAG: "
175 dag_extractor.
executor_->registerExtractedQueryPlanDag(
182 std::ostringstream oss;
188 if (cnt >= start_pos) {
189 oss << dag_node_id <<
"|";
197 std::optional<RelNodeId> retrieved_node_id) {
198 if (!retrieved_node_id) {
199 VLOG(1) <<
"Stop DAG extraction (Detect an invalid dag id)";
203 CHECK(retrieved_node_id.has_value());
211 std::optional<RelNodeId> retrieved_node_id) {
214 CHECK(retrieved_node_id.has_value());
228 for (
size_t i = 0; i < child_node->
inputCount(); i++) {
243 bool child_visited =
false;
245 if (
auto left_deep_joins = dynamic_cast<const RelLeftDeepInnerJoin*>(child_node)) {
248 VLOG(1) <<
"Stop DAG extraction (Detect non-supported join pattern)";
252 auto true_parent_node = parent_node;
253 std::shared_ptr<RelFilter> dummy_filter_node{
nullptr};
254 const auto inner_cond = left_deep_joins->getInnerCondition();
261 if (
auto cond = dynamic_cast<const RexOperator*>(inner_cond)) {
263 auto copied_inner_cond = copier.
visit(cond);
264 dummy_filter_node = std::make_shared<RelFilter>(copied_inner_cond);
266 true_parent_node = dummy_filter_node.get();
269 child_visited =
true;
270 }
else if (
auto translated_join_node =
271 dynamic_cast<const RelTranslatedJoin*>(child_node)) {
273 child_visited =
true;
276 if (!child_visited) {
288 CHECK(rel_trans_join);
307 auto fill_node_ids_to_dag_vec = [&](
const std::string& node_ids) {
308 auto node_ids_vec =
split(node_ids,
"|");
310 std::for_each(node_ids_vec.begin(),
311 std::prev(node_ids_vec.end()),
314 QueryPlanDAG current_plan_dag, after_rhs_visited, after_lhs_visited;
316 auto rhs_node = rel_trans_join->
getRHS();
317 std::unordered_set<size_t> rhs_input_keys, lhs_input_keys;
320 visit(rel_trans_join, rhs_node);
322 fill_node_ids_to_dag_vec(rhs_node->getQueryPlanDag());
328 auto lhs_node = rel_trans_join->
getLHS();
329 if (rel_trans_join->
getLHS()) {
331 visit(rel_trans_join, lhs_node);
333 fill_node_ids_to_dag_vec(lhs_node->getQueryPlanDag());
340 VLOG(1) <<
"Stop DAG extraction (Detect invalid query plan dag of join col(s))";
346 auto outer_table_identifier =
split(after_rhs_visited, current_plan_dag)[1];
347 auto hash_table_identfier =
split(after_lhs_visited, after_rhs_visited)[1];
350 auto inner_join_cols = rel_trans_join->
getJoinCols(
true);
351 auto inner_join_col_info =
353 boost::hash_combine(join_qual_info, inner_join_col_info);
354 auto outer_join_cols = rel_trans_join->
getJoinCols(
false);
355 auto outer_join_col_info =
357 boost::hash_combine(join_qual_info, outer_join_col_info);
359 std::unordered_set<size_t> collected_table_keys;
360 collected_table_keys.insert(lhs_input_keys.begin(), lhs_input_keys.end());
361 if (!inner_join_cols.empty() &&
362 inner_join_cols[0]->get_type_info().is_dict_encoded_type()) {
363 collected_table_keys.insert(rhs_input_keys.begin(), rhs_input_keys.end());
368 VLOG(2) <<
"Add hashtable access path"
369 <<
", inner join col info: " << inner_join_col_info
370 <<
" (access path: " << hash_table_identfier <<
")"
371 <<
", outer join col info: " << outer_join_col_info
372 <<
" (access path: " << outer_table_identifier <<
")";
377 boost::hash_value(hash_table_identfier),
378 boost::hash_value(outer_table_identifier),
379 std::move(collected_table_keys)));
382 VLOG(2) <<
"Add loop join access path, for LHS: " << outer_table_identifier
383 <<
", for RHS: " << hash_table_identfier <<
"\n";
397 for (
size_t input_idx = 0; input_idx < rel_left_deep_join->
inputCount(); ++input_idx) {
398 auto const input_node = rel_left_deep_join->
getInput(input_idx);
399 auto const scan_node =
dynamic_cast<const RelScan*
>(input_node);
401 : -1 * input_node->getId();
402 if (target_table_id == tbl_id) {
412 if (
auto col_var = dynamic_cast<const Analyzer::ColumnVar*>(col_info)) {
427 CHECK(rel_left_deep_join);
434 auto left_deep_tree_id = rel_left_deep_join->
getId();
436 if (!left_deep_join_info) {
438 VLOG(1) <<
"Stop DAG extraction (Detect Non-supported join pattern)";
453 for (
size_t level_idx = 0; level_idx < left_deep_join_info->size(); ++level_idx) {
454 const auto& current_level_join_conditions = left_deep_join_info->at(level_idx);
455 std::vector<const Analyzer::ColumnVar*> inner_join_cols;
456 std::vector<const Analyzer::ColumnVar*> outer_join_cols;
457 std::vector<std::shared_ptr<const Analyzer::Expr>> filter_ops;
458 int inner_input_idx{-1};
459 int outer_input_idx{-1};
460 OpInfo op_info{
"UNDEFINED",
"UNDEFINED",
"UNDEFINED"};
461 std::unordered_set<std::string> visited_filter_ops;
464 const bool found_eq_join_qual =
466 boost::algorithm::any_of(current_level_join_conditions.quals, IsEquivBinOp{});
467 const bool nested_loop = !found_eq_join_qual;
468 const bool is_left_join = current_level_join_conditions.type ==
JoinType::LEFT;
476 bool is_geo_join{
false};
477 for (
const auto& join_qual : current_level_join_conditions.quals) {
478 auto qual_bin_oper = std::dynamic_pointer_cast<
const Analyzer::BinOper>(join_qual);
481 is_geo_join = qual_bin_oper->is_overlaps_oper();
482 if (join_qual == current_level_join_conditions.quals.front()) {
484 op_info = OpInfo{
::toString(qual_bin_oper->get_optype()),
485 ::
toString(qual_bin_oper->get_qualifier()),
486 qual_bin_oper->get_type_info().to_string()};
493 if (!found_eq_join_qual && (is_left_join || col_pair_info.loop_join_qual)) {
499 if (visited_filter_ops.insert(join_qual_str).second) {
500 filter_ops.push_back(join_qual);
505 bool found_valid_col_vars =
false;
506 std::vector<const Analyzer::ColumnVar*> lhs_cvs, rhs_cvs;
507 if (col_pair_info.inner_outer.first && col_pair_info.inner_outer.second) {
509 if (
auto range_oper = dynamic_cast<const Analyzer::RangeOper*>(
510 col_pair_info.inner_outer.second)) {
511 lhs_cvs =
getColVar(range_oper->get_left_operand());
512 rhs_cvs =
getColVar(col_pair_info.inner_outer.first);
516 lhs_cvs =
getColVar(col_pair_info.inner_outer.first);
517 rhs_cvs =
getColVar(col_pair_info.inner_outer.second);
519 if (!lhs_cvs.empty() && !rhs_cvs.empty()) {
520 found_valid_col_vars =
true;
521 if (inner_input_idx == -1) {
523 get_input_idx(rel_left_deep_join, lhs_cvs.front()->get_table_id());
525 if (outer_input_idx == -1) {
527 get_input_idx(rel_left_deep_join, rhs_cvs.front()->get_table_id());
530 lhs_cvs.begin(), lhs_cvs.end(), std::back_inserter(inner_join_cols));
532 rhs_cvs.begin(), rhs_cvs.end(), std::back_inserter(outer_join_cols));
535 if (!found_valid_col_vars &&
536 visited_filter_ops.insert(join_qual_str).second) {
537 filter_ops.push_back(join_qual);
542 if (visited_filter_ops.insert(join_qual_str).second) {
543 filter_ops.push_back(join_qual);
547 if (!is_geo_join && (inner_join_cols.size() != outer_join_cols.size())) {
548 VLOG(1) <<
"Stop DAG extraction (Detect inner/outer col mismatch)";
568 if (inner_input_idx != -1 && outer_input_idx != -1) {
569 lhs = rel_left_deep_join->
getInput(inner_input_idx);
570 rhs = rel_left_deep_join->
getInput(outer_input_idx);
572 if (level_idx == 0) {
573 lhs = rel_left_deep_join->
getInput(0);
574 rhs = rel_left_deep_join->
getInput(1);
577 rhs = rel_left_deep_join->
getInput(level_idx + 1);
582 auto cur_translated_join_node =
583 std::make_shared<RelTranslatedJoin>(lhs,
590 current_level_join_conditions.type,
594 CHECK(cur_translated_join_node);
bool isNestedLoopQual() const
std::optional< RelNodeId > addNodeIfAbsent(const RelAlgNode *)
#define IS_EQUIVALENCE(X)
void connectNodes(const RelNodeId parent_id, const RelNodeId child_id)
const RexScalar * getOuterCondition(const size_t nesting_level) const
void setQueryPlanDag(const std::string &extracted_query_plan_dag) const
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
const Expr * get_right_operand() const
void setRelNodeDagId(const size_t id) const
static std::pair< bool, std::string > hasNonSupportedNodeInDag(const RelAlgNode *rel_alg_node)
std::vector< const Analyzer::ColumnVar * > getJoinCols(bool lhs) const
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
size_t getQueryPlanDagHash() const
virtual T visit(const RexScalar *rex_scalar) const
const RelAlgNode * getRHS() const
static std::pair< InnerOuter, InnerOuterStringOpInfos > normalizeColumnPair(const Analyzer::Expr *lhs, const Analyzer::Expr *rhs, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables, const bool is_overlaps_join=false)
size_t translateColVarsToInfoHash(std::vector< const Analyzer::ColumnVar * > &col_vars, bool col_id_only) const
DEVICE auto copy(ARGS &&...args)
std::unique_lock< T > unique_lock
const RelAlgNode * getInput(const size_t idx) const
std::string toString(const Executor::ExtModuleKinds &kind)
const RelAlgNode * getLHS() const
constexpr char const * EMPTY_QUERY_PLAN
const Expr * get_left_operand() const
const size_t inputCount() const
size_t getRelNodeDagId() const
unsigned node_id(const rapidjson::Value &ra_node) noexcept
const TableDescriptor * getTableDescriptor() const
std::vector< const Analyzer::ColumnVar * > collectColVars(const Analyzer::Expr *target)
int get_input_idx(RelAlgExecutionUnit const &ra_exe_unit, int const outer_table_id)