18 #include "../Analyzer/Analyzer.h"
40 const Executor* executor) {
43 std::vector<SQLTypes> geo_types_for_func;
44 for (
size_t i = 0; i < func_oper->getArity(); i++) {
45 const auto arg_expr = func_oper->
getArg(i);
48 geo_types_for_func.push_back(ti.get_type());
51 std::regex geo_func_regex(
"ST_[\\w]*");
52 std::smatch geo_func_match;
53 const auto& func_name = func_oper->getName();
54 if (geo_types_for_func.size() == 2 &&
55 std::regex_match(func_name, geo_func_match, geo_func_regex)) {
70 bin_oper, *executor->getCatalog(), executor->getTemporaryTables());
71 const auto& inner_outer = normalized_bin_oper.first;
73 auto lhs = bin_oper->get_left_operand();
74 if (
auto lhs_tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(
75 bin_oper->get_left_operand())) {
76 lhs = lhs_tuple->getTuple().front().get();
79 if (lhs == inner_outer.front().first) {
81 }
else if (lhs == inner_outer.front().second) {
87 return {200, 200, inner_qual_decision};
90 return {100, 100, inner_qual_decision};
96 const std::vector<InputTableInfo>& table_infos,
97 const Executor* executor,
98 std::vector<std::map<node_t, InnerQualDecision>>& qual_detection_res) {
99 CHECK_EQ(left_deep_join_quals.size() + 1, table_infos.size());
100 std::vector<std::map<node_t, cost_t>> join_cost_graph(table_infos.size());
104 for (
const auto& current_level_join_conditions : left_deep_join_quals) {
105 for (
const auto& qual : current_level_join_conditions.quals) {
106 std::set<int> qual_nest_levels = visitor.
visit(qual.get());
107 if (qual_nest_levels.size() != 2) {
110 int lhs_nest_level = *qual_nest_levels.begin();
112 qual_nest_levels.erase(qual_nest_levels.begin());
113 int rhs_nest_level = *qual_nest_levels.begin();
118 qual_detection_res[lhs_nest_level][rhs_nest_level] = std::get<2>(qual_costing);
119 qual_detection_res[rhs_nest_level][lhs_nest_level] = std::get<2>(qual_costing);
120 const auto edge_it = join_cost_graph[lhs_nest_level].find(rhs_nest_level);
121 auto rhs_cost = std::get<1>(qual_costing);
122 if (edge_it == join_cost_graph[lhs_nest_level].end() ||
123 edge_it->second > rhs_cost) {
124 auto lhs_cost = std::get<0>(qual_costing);
125 join_cost_graph[lhs_nest_level][rhs_nest_level] = rhs_cost;
126 join_cost_graph[rhs_nest_level][lhs_nest_level] = lhs_cost;
130 return join_cost_graph;
143 for (
auto& inbound_for_node : inbound_) {
144 inbound_for_node.erase(from);
150 std::unordered_set<node_t> roots;
151 for (
node_t candidate = 0; candidate < inbound_.size(); ++candidate) {
152 if (inbound_[candidate].empty()) {
153 roots.insert(candidate);
172 const std::vector<std::map<node_t, cost_t>>& join_cost_graph) {
178 for (
size_t level_idx = 0; level_idx < left_deep_join_quals.size(); ++level_idx) {
180 dependency_tracking.
addEdge(level_idx, level_idx + 1);
183 return dependency_tracking;
190 const std::vector<std::map<node_t, cost_t>>& join_cost_graph,
191 const std::vector<InputTableInfo>& table_infos,
192 const std::function<
bool(
const node_t lhs_nest_level,
const node_t rhs_nest_level)>&
196 std::vector<std::map<node_t, InnerQualDecision>>& qual_normalization_res) {
197 std::vector<node_t> all_nest_levels(table_infos.size());
198 std::iota(all_nest_levels.begin(), all_nest_levels.end(), 0);
199 std::vector<node_t> input_permutation;
200 std::unordered_set<node_t> visited;
201 auto dependency_tracking =
203 auto schedulable_node = [&dependency_tracking, &visited](
const node_t node) {
204 const auto nodes_ready = dependency_tracking.getRoots();
205 return nodes_ready.find(node) != nodes_ready.end() &&
206 visited.find(node) == visited.end();
208 while (visited.size() < table_infos.size()) {
210 std::vector<node_t> remaining_nest_levels;
211 std::copy_if(all_nest_levels.begin(),
212 all_nest_levels.end(),
213 std::back_inserter(remaining_nest_levels),
215 CHECK(!remaining_nest_levels.empty());
217 const auto start_it = std::max_element(
218 remaining_nest_levels.begin(), remaining_nest_levels.end(), compare_node);
219 CHECK(start_it != remaining_nest_levels.end());
220 std::priority_queue<TraversalEdge, std::vector<TraversalEdge>, decltype(compare_edge)>
221 worklist(compare_edge);
232 if (remaining_nest_levels.size() == 2 && qual_normalization_res[start].size() == 1) {
233 auto inner_qual_decision = qual_normalization_res[start].begin()->second;
234 auto join_qual = left_deep_join_quals.begin()->quals;
237 bool (*)(
const Analyzer::ColumnVar*,
const Analyzer::ColumnVar*)>;
239 auto set_new_rte_idx = [](ColvarSet& cv_set,
int new_rte) {
241 cv_set.begin(), cv_set.end(), [new_rte](
const Analyzer::ColumnVar* cv) {
242 const_cast<Analyzer::ColumnVar*
>(cv)->set_rte_idx(new_rte);
252 auto analyze_join_qual = [&start,
253 &remaining_nest_levels,
254 &inner_qual_decision,
256 compare_node](
const std::shared_ptr<Analyzer::Expr>& lhs,
257 ColvarSet& lhs_colvar_set,
258 const std::shared_ptr<Analyzer::Expr>& rhs,
259 ColvarSet& rhs_colvar_set) {
260 if (!lhs || !rhs || lhs_colvar_set.empty() || rhs_colvar_set.empty()) {
261 return std::make_pair(Decision::IGNORE, start);
264 auto alternative_it = std::find_if(
265 remaining_nest_levels.begin(),
266 remaining_nest_levels.end(),
267 [start](
const size_t nest_level) {
return start != nest_level; });
268 CHECK(alternative_it != remaining_nest_levels.end());
269 auto alternative_rte = *alternative_it;
271 Decision decision = Decision::IGNORE;
275 bool is_outer_col_valid =
false;
276 auto check_expr_is_valid_col = [&is_outer_col_valid](
const Analyzer::Expr* expr) {
277 if (
auto expr_tuple = dynamic_cast<const Analyzer::ExpressionTuple*>(expr)) {
278 for (
auto& inner_expr : expr_tuple->getTuple()) {
280 HashJoin::getHashJoinColumn<Analyzer::ColumnVar>(inner_expr.get());
282 is_outer_col_valid =
false;
287 auto cv_from_expr = HashJoin::getHashJoinColumn<Analyzer::ColumnVar>(expr);
289 is_outer_col_valid =
false;
293 is_outer_col_valid =
true;
296 inner_rte = (*lhs_colvar_set.begin())->get_rte_idx();
297 outer_rte = (*rhs_colvar_set.begin())->get_rte_idx();
298 check_expr_is_valid_col(rhs.get());
300 inner_rte = (*rhs_colvar_set.begin())->get_rte_idx();
301 outer_rte = (*lhs_colvar_set.begin())->get_rte_idx();
302 check_expr_is_valid_col(lhs.get());
304 if (inner_rte >= 0 && outer_rte >= 0) {
305 const auto inner_cardinality =
306 table_infos[inner_rte].info.getNumTuplesUpperBound();
307 const auto outer_cardinality =
308 table_infos[outer_rte].info.getNumTuplesUpperBound();
310 if (inner_rte == static_cast<int>(start)) {
315 decision = is_outer_col_valid && inner_cardinality > outer_cardinality
319 CHECK_EQ(inner_rte, static_cast<int>(alternative_rte));
321 if (compare_node(inner_rte, start)) {
324 decision = Decision::IGNORE;
328 decision = Decision::KEEP;
334 if (decision == Decision::KEEP) {
335 return std::make_pair(decision, start);
336 }
else if (decision == Decision::SWAP) {
337 return std::make_pair(decision, alternative_rte);
339 return std::make_pair(Decision::IGNORE, start);
342 auto collect_colvars = [](
const std::shared_ptr<Analyzer::Expr> expr,
344 expr->collect_column_var(cv_set,
false);
347 auto adjust_reordering_logic = [&start, &start_edge, &start_it, set_new_rte_idx](
350 ColvarSet& lhs_colvar_set,
351 ColvarSet& rhs_colvar_set) {
352 CHECK(decision == Decision::SWAP);
353 start = alternative_rte;
354 set_new_rte_idx(lhs_colvar_set, alternative_rte);
355 set_new_rte_idx(rhs_colvar_set, *start_it);
356 start_edge.join_cost = 0;
357 start_edge.nest_level = start;
363 auto rhs = bin_op->get_own_right_operand();
364 if (
auto lhs_exp = dynamic_cast<Analyzer::ExpressionTuple*>(lhs.get())) {
369 auto& lhs_exprs = lhs_exp->getTuple();
370 auto& rhs_exprs = rhs_exp->getTuple();
371 CHECK_EQ(lhs_exprs.size(), rhs_exprs.size());
372 for (
size_t i = 0; i < lhs_exprs.size(); ++i) {
373 Decision decision{Decision::IGNORE};
374 int alternative_rte_idx = -1;
377 collect_colvars(lhs_exprs.at(i), lhs_colvar_set);
378 collect_colvars(rhs_exprs.at(i), rhs_colvar_set);
380 auto investigation_res =
381 analyze_join_qual(lhs, lhs_colvar_set, rhs, rhs_colvar_set);
382 decision = investigation_res.first;
383 if (decision == Decision::KEEP) {
384 return remaining_nest_levels;
386 alternative_rte_idx = investigation_res.second;
388 if (decision == Decision::SWAP) {
389 adjust_reordering_logic(
390 decision, alternative_rte_idx, lhs_colvar_set, rhs_colvar_set);
396 collect_colvars(lhs, lhs_colvar_set);
397 collect_colvars(rhs, rhs_colvar_set);
398 auto investigation_res =
399 analyze_join_qual(lhs, lhs_colvar_set, rhs, rhs_colvar_set);
400 if (investigation_res.first == Decision::KEEP) {
401 return remaining_nest_levels;
402 }
else if (investigation_res.first == Decision::SWAP) {
403 adjust_reordering_logic(investigation_res.first,
404 investigation_res.second,
412 VLOG(2) <<
"Table reordering starting with nest level " << start;
413 for (
const auto& graph_edge : join_cost_graph[*start_it]) {
414 const node_t succ = graph_edge.first;
415 if (!schedulable_node(succ)) {
419 for (
const auto& successor_edge : join_cost_graph[succ]) {
420 if (successor_edge.first == start) {
421 start_edge.
join_cost = successor_edge.second;
424 if (compare_edge(start_edge, succ_edge)) {
425 VLOG(2) <<
"Table reordering changing start nest level from " << start
428 start_edge = succ_edge;
433 VLOG(2) <<
"Table reordering picked start nest level " << start <<
" with cost "
434 << start_edge.join_cost;
435 CHECK_EQ(start, start_edge.nest_level);
436 worklist.push(start_edge);
437 const auto it_ok = visited.insert(start);
439 while (!worklist.empty()) {
443 VLOG(1) <<
"Insert input permutation, idx: " << input_permutation.size()
446 dependency_tracking.removeNode(crt.
nest_level);
448 for (
const auto& graph_edge : join_cost_graph[crt.
nest_level]) {
449 const node_t succ = graph_edge.first;
450 if (!schedulable_node(succ)) {
454 const auto it_ok = visited.insert(succ);
459 return input_permutation;
466 const std::vector<InputTableInfo>& table_infos,
467 const Executor* executor) {
468 std::vector<std::map<node_t, InnerQualDecision>> qual_normalization_res(
471 left_deep_join_quals, table_infos, executor, qual_normalization_res);
473 const auto compare_node = [&table_infos](
const node_t lhs_nest_level,
474 const node_t rhs_nest_level) {
475 return table_infos[lhs_nest_level].info.getNumTuplesUpperBound() <
476 table_infos[rhs_nest_level].info.getNumTuplesUpperBound();
478 const auto compare_edge = [&compare_node](
const TraversalEdge& lhs_edge,
479 const TraversalEdge& rhs_edge) {
481 if (lhs_edge.join_cost == rhs_edge.join_cost) {
482 return compare_node(lhs_edge.nest_level, rhs_edge.nest_level);
484 return lhs_edge.join_cost > rhs_edge.join_cost;
490 left_deep_join_quals,
491 qual_normalization_res);
std::unordered_set< node_t > getRoots() const
static bool colvar_comp(const ColumnVar *l, const ColumnVar *r)
#define IS_EQUIVALENCE(X)
SchedulingDependencyTracking build_dependency_tracking(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< std::map< node_t, cost_t >> &join_cost_graph)
static std::pair< std::vector< InnerOuter >, std::vector< InnerOuterStringOpInfos > > normalizeColumnPairs(const Analyzer::BinOper *condition, const Catalog_Namespace::Catalog &cat, const TemporaryTables *temporary_tables)
bool is_constructed_point(const Analyzer::Expr *expr)
static std::unordered_map< SQLTypes, cost_t > std::tuple< cost_t, cost_t, InnerQualDecision > get_join_qual_cost(const Analyzer::Expr *qual, const Executor *executor)
std::vector< JoinCondition > JoinQualsPerNestingLevel
T visit(const Analyzer::Expr *expr) const
unsigned g_trivial_loop_join_threshold
std::vector< node_t > get_node_input_permutation(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor)
static std::unordered_map< SQLTypes, cost_t > GEO_TYPE_COSTS
void removeNode(const node_t from)
std::vector< std::map< node_t, cost_t > > build_join_cost_graph(const JoinQualsPerNestingLevel &left_deep_join_quals, const std::vector< InputTableInfo > &table_infos, const Executor *executor, std::vector< std::map< node_t, InnerQualDecision >> &qual_detection_res)
const SQLTypeInfo & get_type_info() const
SchedulingDependencyTracking(const size_t node_count)
const Analyzer::Expr * getArg(const size_t i) const
DEVICE void iota(ARGS &&...args)
void addEdge(const node_t from, const node_t to)
std::vector< std::unordered_set< node_t > > inbound_
std::vector< node_t > traverse_join_cost_graph(const std::vector< std::map< node_t, cost_t >> &join_cost_graph, const std::vector< InputTableInfo > &table_infos, const std::function< bool(const node_t lhs_nest_level, const node_t rhs_nest_level)> &compare_node, const std::function< bool(const TraversalEdge &, const TraversalEdge &)> &compare_edge, const JoinQualsPerNestingLevel &left_deep_join_quals, std::vector< std::map< node_t, InnerQualDecision >> &qual_normalization_res)
AccessManager::Decision Decision
InnerQualDecision inner_qual_decision
const std::shared_ptr< Analyzer::Expr > get_own_left_operand() const