OmniSciDB  5ade3759e0
JoinFilterPushDown.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "JoinFilterPushDown.h"
18 #include "DeepCopyVisitor.h"
19 #include "RelAlgExecutor.h"
20 
21 namespace {
22 
// Rebuilds a column reference that keeps the type info, table id and column id
// of `col_var`, but passes a fixed 0 as the last ColumnVar constructor argument.
// NOTE(review): that last argument is presumably the range-table (nesting level)
// index, which would rebind the column to the outermost input — confirm against
// Analyzer::ColumnVar.
24  std::shared_ptr<Analyzer::Expr> visitColumnVar(
25  const Analyzer::ColumnVar* col_var) const override {
26  return makeExpr<Analyzer::ColumnVar>(
27  col_var->get_type_info(), col_var->get_table_id(), col_var->get_column_id(), 0);
28  }
29 };
30 
32  : public ScalarExprVisitor<std::unordered_set<InputColDescriptor>> {
// Leaf case of the collector: a column reference yields a one-element set holding
// an InputColDescriptor built from its column id, its table id and a fixed 0.
// NOTE(review): the trailing 0 is presumably the nesting level of the input —
// confirm against the InputColDescriptor constructor.
33  std::unordered_set<InputColDescriptor> visitColumnVar(
34  const Analyzer::ColumnVar* col_var) const override {
35  return {InputColDescriptor(col_var->get_column_id(), col_var->get_table_id(), 0)};
36  }
37 
38  public:
// Combines two partial results into their set union.
// `aggregate` is copied and every element of `next_result` is inserted into the
// copy; neither argument is modified and duplicates collapse because the result
// is an unordered_set.
39  std::unordered_set<InputColDescriptor> aggregateResult(
40  const std::unordered_set<InputColDescriptor>& aggregate,
41  const std::unordered_set<InputColDescriptor>& next_result) const override {
42  auto result = aggregate;
43  result.insert(next_result.begin(), next_result.end());
44  return result;
45  }
46 };
47 
48 } // namespace
49 
58  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
59  const CompilationOptions& co,
60  const ExecutionOptions& eo) {
// Estimates how selective `filter_expressions` are by running a pre-flight
// COUNT query over the single input they reference and dividing the number of
// passing rows by an upper bound on the input's row count.
//
// Returns a three-field aggregate:
//   {true,  passing / total, total_rows_upper_bound} on success,
//   {false, 1.0, 0}                                   if executing the count threw.
// NOTE(review): the first field is presumably the `is_valid` flag that
// selectFiltersToBePushedDown reads — confirm against FilterSelectivity.
// NOTE(review): the start of this definition (return type and name) is elided
// from this listing.
61  CollectInputColumnsVisitor input_columns_visitor;
62  std::list<std::shared_ptr<Analyzer::Expr>> quals;
63  std::unordered_set<InputColDescriptor> input_column_descriptors;
64  BindFilterToOutermostVisitor bind_filter_to_outermost;
// For each filter: accumulate the columns it touches, and store a rewritten copy
// (produced by BindFilterToOutermostVisitor) as a qualifier of the count query.
65  for (const auto& filter_expr : filter_expressions) {
66  input_column_descriptors = input_columns_visitor.aggregateResult(
67  input_column_descriptors, input_columns_visitor.visit(filter_expr.get()));
68  quals.push_back(bind_filter_to_outermost.visit(filter_expr.get()));
69  }
70  std::vector<InputDescriptor> input_descs;
71  std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
// The first column seen defines the single scan descriptor; every other column
// is CHECKed to belong to that same scan, so the filters must all target one input.
72  for (const auto& input_col_desc : input_column_descriptors) {
73  if (input_descs.empty()) {
74  input_descs.push_back(input_col_desc.getScanDesc());
75  } else {
76  CHECK(input_col_desc.getScanDesc() == input_descs.front());
77  }
78  input_col_descs.push_back(std::make_shared<const InputColDescriptor>(input_col_desc));
79  }
// COUNT aggregate with no argument expression; the result type is BIGINT when
// g_bigint_count is set and INT otherwise.
80  const auto count_expr =
81  makeExpr<Analyzer::AggExpr>(SQLTypeInfo(g_bigint_count ? kBIGINT : kINT, false),
82  kCOUNT,
83  nullptr,
84  false,
85  nullptr);
// Execution unit whose only qualifiers are the rebound filters and whose only
// target is the count expression; the remaining members are left empty/default.
86  RelAlgExecutionUnit ra_exe_unit{input_descs,
87  input_col_descs,
88  {},
89  quals,
90  {},
91  {},
92  {count_expr.get()},
93  nullptr,
94  {{}, SortAlgorithm::Default, 0, 0},
95  0};
// NOTE(review): `one` is passed as the first argument of executeWorkUnit;
// presumably a result-size guess (a single count row) — confirm.
96  size_t one{1};
97  ResultSetPtr filtered_result;
98  const auto table_infos = get_table_infos(input_descs, executor_);
99  CHECK_EQ(size_t(1), table_infos.size());
100  const size_t total_rows_upper_bound = table_infos.front().info.getNumTuplesUpperBound();
101  try {
102  ColumnCacheMap column_cache;
103  filtered_result = executor_->executeWorkUnit(one,
104  true,
105  table_infos,
106  ra_exe_unit,
107  co,
108  eo,
109  cat_,
110  executor_->getRowSetMemoryOwner(),
111  nullptr,
112  false,
113  column_cache);
114  } catch (...) {
// Best effort: any failure of the pre-flight count is swallowed and reported
// to the caller through the `false` first field instead of propagating.
115  return {false, 1.0, 0};
116  }
// The result is expected to be one row with one scalar int64 column: the count.
117  const auto count_row = filtered_result->getNextRow(false, false);
118  CHECK_EQ(size_t(1), count_row.size());
119  const auto& count_tv = count_row.front();
120  const auto count_scalar_tv = boost::get<ScalarTargetValue>(&count_tv);
121  CHECK(count_scalar_tv);
122  const auto count_ptr = boost::get<int64_t>(count_scalar_tv);
123  CHECK(count_ptr);
124  const auto rows_passing = *count_ptr;
// Clamp the denominator to at least 1 so an empty input cannot divide by zero.
125  const auto rows_total = std::max(total_rows_upper_bound, size_t(1));
126  return {true, static_cast<float>(rows_passing) / rows_total, total_rows_upper_bound};
127 }
128 
// Returns the subset of push-down filter candidates of `work_unit` that are worth
// pushing down: a candidate is kept when getFilterSelectivity() reports a valid
// measurement and isFilterSelectiveEnough() is true for it.
// `co` and `eo` are forwarded unchanged to getFilterSelectivity().
// NOTE(review): this listing drops source lines 138 and 143. The first is the
// call that produces `all_push_down_candidates` (presumably
// find_push_down_filters on work_unit.exe_unit); the second opens the block
// closed by the extra `}` on listing line 150 (presumably a guard on
// to_gather_info_for_filter_selectivity(ti)) — confirm against the real file.
133 std::vector<PushedDownFilterInfo> RelAlgExecutor::selectFiltersToBePushedDown(
134  const RelAlgExecutor::WorkUnit& work_unit,
135  const CompilationOptions& co,
136  const ExecutionOptions& eo) {
137  const auto all_push_down_candidates =
139  work_unit.input_permutation,
140  work_unit.left_deep_join_input_sizes);
141  std::vector<PushedDownFilterInfo> selective_push_down_candidates;
142  const auto ti = get_table_infos(work_unit.exe_unit.input_descs, executor_);
// Each candidate triggers its own selectivity measurement.
144  for (const auto& candidate : all_push_down_candidates) {
145  const auto selectivity = getFilterSelectivity(candidate.filter_expressions, co, eo);
146  if (selectivity.is_valid && selectivity.isFilterSelectiveEnough()) {
147  selective_push_down_candidates.push_back(candidate);
148  }
149  }
150  }
151  return selective_push_down_candidates;
152 }
153 
155  std::vector<RaExecutionDesc>& ed_list,
156  const CompilationOptions& co,
157  const ExecutionOptions& eo,
158  RenderInfo* render_info,
159  const int64_t queue_time_ms) {
// Runs the query described by `ed_list`, executing all registered subqueries
// first and storing each result on its subquery node.
//
// Two paths:
//  * Multi-step queries or queries with subqueries: filter push-down is not
//    attempted. A calcite-explain request gets an ExecutionResult built from an
//    empty PushedDownFilterInfo list; otherwise execution proceeds with a copy of
//    `eo` in which find_push_down_candidates and just_calcite_explain are false.
//  * Otherwise: subqueries and the main sequence run with the caller's `eo`.
// NOTE(review): this listing drops the function header and source lines 166,
// 175, 176 and 179, so the full ExecutionResult / ExecutionOptions argument
// lists are not visible here.
160  // we currently do not fully support filter push down with
161  // multi-step execution and/or with subqueries
162  // TODO(Saman): add proper caching to enable filter push down for all cases
163  if (ed_list.size() > 1 || !subqueries_.empty()) {
164  if (eo.just_calcite_explain) {
165  return ExecutionResult(std::vector<PushedDownFilterInfo>{},
167  }
168  const ExecutionOptions eo_modified{eo.output_columnar_hint,
169  eo.allow_multifrag,
170  eo.just_explain,
171  eo.allow_loop_joins,
172  eo.with_watchdog,
173  eo.jit_debug,
174  eo.just_validate,
177  /*find_push_down_candidates=*/false,
178  /*just_calcite_explain=*/false,
180 
181  // Dispatch the subqueries first
182  for (auto subquery : subqueries_) {
183  // Execute the subquery and cache the result.
// A fresh RelAlgExecutor sharing executor_ and cat_ is used per subquery.
184  RelAlgExecutor ra_executor(executor_, cat_);
185  auto result = ra_executor.executeRelAlgSubQuery(subquery.get(), co, eo_modified);
186  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
187  }
188  return executeRelAlgSeq(ed_list, co, eo_modified, render_info, queue_time_ms);
189  } else {
190  // Dispatch the subqueries first
// NOTE(review): the first branch of this `if` requires subqueries_ to be
// non-empty or ed_list.size() > 1, so on this branch subqueries_ is empty and
// the loop below does not iterate.
191  for (auto subquery : subqueries_) {
192  // Execute the subquery and cache the result.
193  RelAlgExecutor ra_executor(executor_, cat_);
194  auto result = ra_executor.executeRelAlgSubQuery(subquery.get(), co, eo);
195  subquery->setExecutionResult(std::make_shared<ExecutionResult>(result));
196  }
197  return executeRelAlgSeq(ed_list, co, eo, render_info, queue_time_ms);
198  }
199 }
206  const std::vector<InputTableInfo>& table_infos) {
// Decides whether the inputs described by `table_infos` qualify for the
// selectivity check: returns false when there are fewer than two inputs or
// when any table id occurs more than once (a self-join), and true otherwise.
// NOTE(review): the line carrying the return type and function name is elided
// from this listing.
207  if (table_infos.size() < 2) {
208  return false;
209  }
210  // we currently do not support filter push down when there is a self-join involved:
211  // TODO(Saman): prevent Calcite from optimizing self-joins to remove this exclusion
212  std::unordered_set<int> table_ids;
// NOTE(review): `ti` is declared by value, so every InputTableInfo is copied per
// iteration although only table_id is read; `const auto&` would avoid the copy.
213  for (auto ti : table_infos) {
214  if (table_ids.find(ti.table_id) == table_ids.end()) {
215  table_ids.insert(ti.table_id);
216  } else {
217  // a self-join is involved
218  return false;
219  }
220  }
221  // TODO(Saman): add some extra heuristics to avoid preflight count and push down if it
222  // is not going to be helpful.
223  return true;
224 }
225 
// Collects join qualifiers that reference exactly one non-zero range-table index
// and groups them per input, returning one PushedDownFilterInfo per group.
//
// ra_exe_unit                 supplies input_descs and the per-level join_quals.
// input_permutation           if non-empty, a reordering of the inputs that is
//                             inverted to translate indices back to the original order.
// left_deep_join_input_sizes  per-input sizes; an empty vector yields an empty result.
//
// Each returned entry holds the grouped filters followed by three prefix-sum
// offsets of left_deep_join_input_sizes (input_prev, input_start, input_next).
// NOTE(review): `visitor` is declared on source line 260, which this listing drops.
232 std::vector<PushedDownFilterInfo> find_push_down_filters(
233  const RelAlgExecutionUnit& ra_exe_unit,
234  const std::vector<size_t>& input_permutation,
235  const std::vector<size_t>& left_deep_join_input_sizes) {
236  std::vector<PushedDownFilterInfo> result;
237  if (left_deep_join_input_sizes.empty()) {
238  return result;
239  }
// Running totals: element k is the sum of input sizes 0..k inclusive.
240  std::vector<size_t> input_size_prefix_sums(left_deep_join_input_sizes.size());
241  std::partial_sum(left_deep_join_input_sizes.begin(),
242  left_deep_join_input_sizes.end(),
243  input_size_prefix_sums.begin());
// Every slot starts at the sentinel value size(), which is not a valid index and
// therefore marks the slot as "not assigned yet".
244  std::vector<int> to_original_rte_idx(ra_exe_unit.input_descs.size(),
245  ra_exe_unit.input_descs.size());
246  if (!input_permutation.empty()) {
247  CHECK_EQ(to_original_rte_idx.size(), input_permutation.size());
// Invert the permutation; the CHECK_EQ against the sentinel asserts that no
// target slot is written twice.
248  for (size_t i = 0; i < input_permutation.size(); ++i) {
249  CHECK_LT(input_permutation[i], to_original_rte_idx.size());
250  CHECK_EQ(static_cast<size_t>(to_original_rte_idx[input_permutation[i]]),
251  to_original_rte_idx.size());
252  to_original_rte_idx[input_permutation[i]] = i;
253  }
254  } else {
// No permutation supplied: identity mapping.
255  std::iota(to_original_rte_idx.begin(), to_original_rte_idx.end(), 0);
256  }
257  std::unordered_map<int, std::vector<std::shared_ptr<Analyzer::Expr>>>
258  filters_per_nesting_level;
259  for (const auto& level_conditions : ra_exe_unit.join_quals) {
261  for (const auto& cond : level_conditions.quals) {
262  const auto rte_indices = visitor.visit(cond.get());
// Conditions that touch more than one range-table index are skipped.
263  if (rte_indices.size() > 1) {
264  continue;
265  }
// An empty index set is treated like index 0, and index 0 is skipped as well.
266  const int rte_idx = (!rte_indices.empty()) ? *rte_indices.cbegin() : 0;
267  if (!rte_idx) {
268  continue;
269  }
270  CHECK_GE(rte_idx, 0);
271  CHECK_LT(static_cast<size_t>(rte_idx), to_original_rte_idx.size());
272  filters_per_nesting_level[to_original_rte_idx[rte_idx]].push_back(cond);
273  }
274  }
275  for (const auto& kv : filters_per_nesting_level) {
276  CHECK_GE(kv.first, 0);
277  CHECK_LT(static_cast<size_t>(kv.first), input_size_prefix_sums.size());
// Prefix sums at positions first-2, first-1 and first; positions before the start
// of the vector are replaced by 0.
278  size_t input_prev = (kv.first > 1) ? input_size_prefix_sums[kv.first - 2] : 0;
279  size_t input_start = kv.first ? input_size_prefix_sums[kv.first - 1] : 0;
280  size_t input_next = input_size_prefix_sums[kv.first];
281  result.emplace_back(
282  PushedDownFilterInfo{kv.second, input_prev, input_start, input_next});
283  }
284  return result;
285 }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
const std::vector< size_t > left_deep_join_input_sizes
int get_column_id() const
Definition: Analyzer.h:194
RelAlgExecutionUnit exe_unit
FilterSelectivity getFilterSelectivity(const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
#define CHECK_GE(x, y)
Definition: Logger.h:200
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
std::shared_ptr< ResultSet > ResultSetPtr
const std::vector< InputDescriptor > input_descs
std::unordered_set< InputColDescriptor > aggregateResult(const std::unordered_set< InputColDescriptor > &aggregate, const std::unordered_set< InputColDescriptor > &next_result) const override
const bool allow_multifrag
const bool find_push_down_candidates
ExecutionResult executeRelAlgQueryWithFilterPushDown(std::vector< RaExecutionDesc > &ed_list, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const bool just_validate
bool to_gather_info_for_filter_selectivity(const std::vector< InputTableInfo > &table_infos)
const bool with_dynamic_watchdog
const JoinQualsPerNestingLevel join_quals
const double gpu_input_mem_limit_percent
std::vector< PushedDownFilterInfo > find_push_down_filters(const RelAlgExecutionUnit &ra_exe_unit, const std::vector< size_t > &input_permutation, const std::vector< size_t > &left_deep_join_input_sizes)
bool g_bigint_count
ExecutionResult executeRelAlgSubQuery(const RexSubQuery *subquery, const CompilationOptions &co, const ExecutionOptions &eo)
const bool output_columnar_hint
const std::vector< size_t > input_permutation
SQLTypeInfoCore< ArrayContextTypeSizer, ExecutorTypePackaging, DateTimeFacilities > SQLTypeInfo
Definition: sqltypes.h:823
#define CHECK_LT(x, y)
Definition: Logger.h:197
const bool just_calcite_explain
std::shared_ptr< Analyzer::Expr > visitColumnVar(const Analyzer::ColumnVar *col_var) const override
int get_table_id() const
Definition: Analyzer.h:193
Definition: sqldefs.h:71
T visit(const Analyzer::Expr *expr) const
const bool allow_loop_joins
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
std::unordered_set< InputColDescriptor > visitColumnVar(const Analyzer::ColumnVar *col_var) const override
#define CHECK(condition)
Definition: Logger.h:187
std::vector< InputTableInfo > get_table_infos(const std::vector< InputDescriptor > &input_descs, Executor *executor)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
Definition: sqltypes.h:47
const unsigned dynamic_watchdog_time_limit
const bool with_watchdog