OmniSciDB  3a86f6ec37
RelAlgExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef QUERYENGINE_RELALGEXECUTOR_H
18 #define QUERYENGINE_RELALGEXECUTOR_H
19 
22 #include "QueryEngine/Execute.h"
29 #include "Shared/scope.h"
31 
32 #include <ctime>
33 #include <sstream>
34 
35 #include "StorageIOFacility.h"
36 
// Global flag declared here and defined in another translation unit.
// NOTE(review): judging by the name it turns off an intermediate row-count step;
// confirm at the definition site.
37 extern bool g_skip_intermediate_count;
38 
// Tag with two alternatives, Union and Reduce.
// NOTE(review): presumably selects how partial results are combined; the code
// that interprets it is not part of this header.
39 enum class MergeType { Union, Reduce };
40 
// NOTE(review): the listing jumps from line 40 to line 44, so the opening of this
// struct and its earlier members are not visible here; only the tail survives.
44  const unsigned node_id;
46 };
47 
49  public:
// Alias for a vector of TargetInfo.
50  using TargetInfoList = std::vector<TargetInfo>;
51 
52  RelAlgExecutor(Executor* executor,
54  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
55  : StorageIOFacility(executor, cat)
56  , executor_(executor)
57  , cat_(cat)
58  , query_state_(std::move(query_state))
59  , now_(0)
60  , queue_time_ms_(0) {}
61 
62  RelAlgExecutor(Executor* executor,
64  const std::string& query_ra,
65  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
66  : StorageIOFacility(executor, cat)
67  , executor_(executor)
68  , cat_(cat)
69  , query_dag_(std::make_unique<RelAlgDagBuilder>(query_ra, cat_, nullptr))
70  , query_state_(std::move(query_state))
71  , now_(0)
72  , queue_time_ms_(0) {}
73 
74  RelAlgExecutor(Executor* executor,
76  std::unique_ptr<RelAlgDagBuilder> query_dag,
77  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
78  : StorageIOFacility(executor, cat)
79  , executor_(executor)
80  , cat_(cat)
81  , query_dag_(std::move(query_dag))
82  , query_state_(std::move(query_state))
83  , now_(0)
84  , queue_time_ms_(0) {}
85 
// Returns a fragment count computed under the given compilation/execution options.
// NOTE(review): presumably the fragment count of the outermost input; confirm in the .cpp.
86  size_t getOuterFragmentCount(const CompilationOptions& co, const ExecutionOptions& eo);
87 
// Query entry point taking compilation options, execution options, an
// explain-plan flag and a RenderInfo pointer; returns an ExecutionResult.
// NOTE(review): `just_explain_plan` presumably requests the plan instead of
// running the query, and `render_info` may be nullable; confirm in the .cpp.
88  ExecutionResult executeRelAlgQuery(const CompilationOptions& co,
89  const ExecutionOptions& eo,
90  const bool just_explain_plan,
91  RenderInfo* render_info);
92 
// Variant that takes an explicit execution sequence and a queue time in
// milliseconds. NOTE(review): filter push-down behavior is implemented in the
// .cpp and is not visible from this declaration.
93  ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence& seq,
94  const CompilationOptions& co,
95  const ExecutionOptions& eo,
96  RenderInfo* render_info,
97  const int64_t queue_time_ms);
98 
// Takes an aggregated column range, string dictionary generations and table
// generations. NOTE(review): presumably installs them before leaf-side
// execution; confirm in the .cpp.
99  void prepareLeafExecution(
100  const AggregatedColRange& agg_col_range,
101  const StringDictionaryGenerations& string_dictionary_generations,
102  const TableGenerations& table_generations);
103 
// Executes over a whole RaExecutionSequence.
// `with_existing_temp_tables` defaults to false. NOTE(review): presumably keeps
// previously registered temporary tables when true; confirm in the .cpp.
104  ExecutionResult executeRelAlgSeq(const RaExecutionSequence& seq,
105  const CompilationOptions& co,
106  const ExecutionOptions& eo,
107  RenderInfo* render_info,
108  const int64_t queue_time_ms,
109  const bool with_existing_temp_tables = false);
110 
// Executes over the part of `seq` selected by `interval`, a pair of indices.
// NOTE(review): whether the interval is half-open is not visible here.
111  ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence& seq,
112  const std::pair<size_t, size_t> interval,
113  const CompilationOptions& co,
114  const ExecutionOptions& eo,
115  RenderInfo* render_info,
116  const int64_t queue_time_ms);
117 
// Executes the single step `step_idx` of `seq` and returns a
// QueryStepExecutionResult rather than a plain ExecutionResult.
118  QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence& seq,
119  const size_t step_idx,
120  const CompilationOptions& co,
121  const ExecutionOptions& eo,
122  RenderInfo* render_info);
123 
// Stores `result` in leaf_results_ under key `id`.
// The emplace must actually insert: the CHECK fires if an entry for `id`
// already exists.
124  void addLeafResult(const unsigned id, const AggregatedResult& result) {
125  const auto it_ok = leaf_results_.emplace(id, result);
126  CHECK(it_ok.second);
127  }
128 
// Returns the root node reported by the query DAG.
// Requires a DAG to be present: CHECK(query_dag_) runs before the access.
129  const RelAlgNode& getRootRelAlgNode() const {
130  CHECK(query_dag_);
131  return query_dag_->getRootNode();
132  }
133  const std::vector<std::shared_ptr<RexSubQuery>>& getSubqueries() const noexcept {
134  CHECK(query_dag_);
135  return query_dag_->getSubqueries();
136  };
138  CHECK(query_dag_);
139  return query_dag_->getQueryHints();
140  }
141 
// Takes an already-analyzed insert query and returns an ExecutionResult.
142  ExecutionResult executeSimpleInsert(const Analyzer::Query& insert_query);
143 
// The return types of these three match the parameter types of
// prepareLeafExecution(). NOTE(review): presumably the values computed here are
// the ones later handed to that method; confirm at the call sites.
144  AggregatedColRange computeColRangesCache();
145  StringDictionaryGenerations computeStringDictionaryGenerations();
146  TableGenerations computeTableGenerations();
147 
// Accessor returning an Executor pointer. NOTE(review): presumably `executor_`.
148  Executor* getExecutor() const;
149 
150  void cleanupPostExecution();
151 
// Maps a numeric error code to a message string; static, so no instance is needed.
152  static std::string getErrorMessageFromCode(const int32_t error_code);
153 
// NOTE(review): presumably invokes post_execution_callback_ when one is set; confirm.
154  void executePostExecutionCallback();
155 
156  private:
// Same parameter list as the public executeRelAlgQuery().
// NOTE(review): presumably the single-attempt body that the public entry point
// wraps with retry handling; confirm in the .cpp.
157  ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions& co,
158  const ExecutionOptions& eo,
159  const bool just_explain_plan,
160  RenderInfo* render_info);
161 
// Runs step `step_idx` of `seq`. Returns void, so any result must be recorded
// through a side channel (e.g. on the sequence's descriptor) — TODO confirm.
162  void executeRelAlgStep(const RaExecutionSequence& seq,
163  const size_t step_idx,
164  const CompilationOptions&,
165  const ExecutionOptions&,
166  RenderInfo*,
167  const int64_t queue_time_ms);
168 
// DML helpers: both take the node to execute, the options and the queue time,
// and return nothing.
169  void executeUpdate(const RelAlgNode* node,
170  const CompilationOptions& co,
171  const ExecutionOptions& eo,
172  const int64_t queue_time_ms);
173 
174  void executeDelete(const RelAlgNode* node,
175  const CompilationOptions& co,
176  const ExecutionOptions& eo_in,
177  const int64_t queue_time_ms);
178 
// Per-node-type executors: each takes the concrete node type, the options, and
// the queue time in milliseconds, and returns an ExecutionResult.
179  ExecutionResult executeCompound(const RelCompound*,
180  const CompilationOptions&,
181  const ExecutionOptions&,
182  RenderInfo*,
183  const int64_t queue_time_ms);
184 
185  ExecutionResult executeAggregate(const RelAggregate* aggregate,
186  const CompilationOptions& co,
187  const ExecutionOptions& eo,
188  RenderInfo* render_info,
189  const int64_t queue_time_ms);
190 
// `previous_count` is optional. NOTE(review): presumably a row count known from
// an earlier step; confirm in the .cpp.
191  ExecutionResult executeProject(const RelProject*,
192  const CompilationOptions&,
193  const ExecutionOptions&,
194  RenderInfo*,
195  const int64_t queue_time_ms,
196  const std::optional<size_t> previous_count);
197 
// Unlike its siblings above, this one takes no RenderInfo pointer.
198  ExecutionResult executeTableFunction(const RelTableFunction*,
199  const CompilationOptions&,
200  const ExecutionOptions&,
201  const int64_t queue_time_ms);
202 
203  // Computes the window function results to be used by the query.
// `column_cache_map` is taken by non-const reference, so it may be modified.
204  void computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
205  const CompilationOptions& co,
206  const ExecutionOptions& eo,
207  ColumnCacheMap& column_cache_map,
208  const int64_t queue_time_ms);
209 
210  // Creates the window context for the given window function.
// Returns ownership of the new context to the caller (unique_ptr).
// `row_set_mem_owner` is passed by value, i.e. the callee receives its own
// shared_ptr copy.
211  std::unique_ptr<WindowFunctionContext> createWindowFunctionContext(
212  const Analyzer::WindowFunction* window_func,
213  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
214  const RelAlgExecutionUnit& ra_exe_unit,
215  const std::vector<InputTableInfo>& query_infos,
216  const CompilationOptions& co,
217  ColumnCacheMap& column_cache_map,
218  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner);
219 
// Per-node-type executors for filter and sort nodes; same parameter shape as
// the other execute* helpers (node, options, RenderInfo, queue time in ms).
220  ExecutionResult executeFilter(const RelFilter*,
221  const CompilationOptions&,
222  const ExecutionOptions&,
223  RenderInfo*,
224  const int64_t queue_time_ms);
225 
226  ExecutionResult executeSort(const RelSort*,
227  const CompilationOptions&,
228  const ExecutionOptions&,
229  RenderInfo*,
230  const int64_t queue_time_ms);
231 
// These two take only execution options: no compilation options, RenderInfo or
// queue time.
232  ExecutionResult executeLogicalValues(const RelLogicalValues*, const ExecutionOptions&);
233 
234  ExecutionResult executeModify(const RelModify* modify, const ExecutionOptions& eo);
235 
// The union executor additionally receives the whole execution sequence.
236  ExecutionResult executeUnion(const RelLogicalUnion*,
237  const RaExecutionSequence&,
238  const CompilationOptions&,
239  const ExecutionOptions&,
240  RenderInfo*,
241  const int64_t queue_time_ms);
242 
243  // TODO(alex): just move max_groups_buffer_entry_guess to RelAlgExecutionUnit once
244  // we deprecate the plan-based executor paths and remove WorkUnit
// Bundle passed to the work-unit helpers below (executeWorkUnit, getNDVEstimation, ...).
// NOTE(review): listing lines 246 and 248 are absent, so this struct has more
// members than are shown here; the TODO above names one of them,
// max_groups_buffer_entry_guess.
245  struct WorkUnit {
247  const RelAlgNode* body;
249  std::unique_ptr<QueryRewriter> query_rewriter;
250  const std::vector<size_t> input_permutation;
251  const std::vector<size_t> left_deep_join_input_sizes;
252  };
253 
// NOTE(review): the listing jumps from line 253 to line 256, so the opening of
// this struct and any earlier members are not visible here; only the tail survives.
256  const RelAlgNode* body;
257  };
258 
// Builds a WorkUnit from a sort node.
// NOTE(review): presumably for the sort's input rather than the sort itself; confirm.
259  WorkUnit createSortInputWorkUnit(const RelSort*, const ExecutionOptions& eo);
260 
// Executes a prepared WorkUnit.
//   targets_meta    metadata for the output targets
//   is_agg          aggregate flag for the unit
//   previous_count  optional; defaults to std::nullopt
261  ExecutionResult executeWorkUnit(
262  const WorkUnit& work_unit,
263  const std::vector<TargetMetaInfo>& targets_meta,
264  const bool is_agg,
265  const CompilationOptions& co_in,
266  const ExecutionOptions& eo,
267  RenderInfo*,
268  const int64_t queue_time_ms,
269  const std::optional<size_t> previous_count = std::nullopt);
270 
// Returns a size_t estimate for the work unit.
// NOTE(review): "NDV" presumably means number of distinct values; the meaning
// of `range` is not visible here.
271  size_t getNDVEstimation(const WorkUnit& work_unit,
272  const int64_t range,
273  const bool is_agg,
274  const CompilationOptions& co,
275  const ExecutionOptions& eo);
276 
// Returns an optional count; an empty optional is a possible outcome callers
// must handle. NOTE(review): the conditions producing it are in the .cpp.
277  std::optional<size_t> getFilteredCountAll(const WorkUnit& work_unit,
278  const bool is_agg,
279  const CompilationOptions& co,
280  const ExecutionOptions& eo);
281 
// Computes a FilterSelectivity for the given filter expressions.
282  FilterSelectivity getFilterSelectivity(
283  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
284  const CompilationOptions& co,
285  const ExecutionOptions& eo);
286 
// Returns the filters of `work_unit` chosen for push-down, as PushedDownFilterInfo entries.
287  std::vector<PushedDownFilterInfo> selectFiltersToBePushedDown(
288  const RelAlgExecutor::WorkUnit& work_unit,
289  const CompilationOptions& co,
290  const ExecutionOptions& eo);
291 
// Predicate over a work unit. NOTE(review): presumably true for a point lookup by rowid.
292  bool isRowidLookup(const WorkUnit& work_unit);
293 
// Same inputs as executeWorkUnit() plus `was_multifrag_kernel_launch`.
// NOTE(review): presumably re-runs the unit after an out-of-memory failure; confirm.
294  ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit& work_unit,
295  const std::vector<TargetMetaInfo>& targets_meta,
296  const bool is_agg,
297  const CompilationOptions& co,
298  const ExecutionOptions& eo,
299  RenderInfo* render_info,
300  const bool was_multifrag_kernel_launch,
301  const int64_t queue_time_ms);
302 
303  // Allows an out of memory error through if CPU retry is enabled. Otherwise, throws an
304  // appropriate exception corresponding to the query error code.
305  static void handlePersistentError(const int32_t error_code);
306 
// WorkUnit factories, one per relational algebra node type, plus a generic one
// taking a RelAlgNode. NOTE(review): the generic overload presumably dispatches
// on the node's dynamic type; confirm in the .cpp.
// Some overloads take full ExecutionOptions while others take only a
// `just_explain` flag.
307  WorkUnit createWorkUnit(const RelAlgNode*, const SortInfo&, const ExecutionOptions& eo);
308 
309  WorkUnit createCompoundWorkUnit(const RelCompound*,
310  const SortInfo&,
311  const ExecutionOptions& eo);
312 
313  WorkUnit createAggregateWorkUnit(const RelAggregate*,
314  const SortInfo&,
315  const bool just_explain);
316 
317  WorkUnit createProjectWorkUnit(const RelProject*,
318  const SortInfo&,
319  const ExecutionOptions& eo);
320 
321  WorkUnit createFilterWorkUnit(const RelFilter*,
322  const SortInfo&,
323  const bool just_explain);
324 
325  WorkUnit createJoinWorkUnit(const RelJoin*, const SortInfo&, const bool just_explain);
326 
327  WorkUnit createUnionWorkUnit(const RelLogicalUnion*,
328  const SortInfo&,
329  const ExecutionOptions& eo);
330 
// Table functions use their own unit type and take no SortInfo.
331  TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction* table_func,
332  const bool just_explain,
333  const bool is_gpu);
334 
335  void addTemporaryTable(const int table_id, const ResultSetPtr& result) {
336  CHECK_LT(size_t(0), result->colCount());
337  CHECK_LT(table_id, 0);
338  const auto it_ok = temporary_tables_.emplace(table_id, result);
339  CHECK(it_ok.second);
340  }
341 
// Removes the entry for `table_id` from temporary_tables_; no check is made
// that the entry existed.
342  void eraseFromTemporaryTables(const int table_id) { temporary_tables_.erase(table_id); }
343 
// Takes the execution descriptor by non-const reference, so it may be updated in place.
// NOTE(review): presumably handles a no-op step by forwarding an earlier result; confirm.
344  void handleNop(RaExecutionDesc& ed);
345 
// Produces per-nesting-level join qualifiers for a left-deep join.
//   input_descs          input descriptors of the join
//   input_to_nest_level  maps each input node to its nesting level
346  JoinQualsPerNestingLevel translateLeftDeepJoinFilter(
347  const RelLeftDeepInnerJoin* join,
348  const std::vector<InputDescriptor>& input_descs,
349  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
350  const bool just_explain);
351 
352  // Transform the provided `join_condition` to conjunctive form, find composite
353  // key opportunities and finally translate it to an Analyzer expression.
// Const member: does not modify the executor's own state.
354  std::list<std::shared_ptr<Analyzer::Expr>> makeJoinQuals(
355  const RexScalar* join_condition,
356  const std::vector<JoinType>& join_types,
357  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
358  const bool just_explain) const;
359 
// NOTE(review): listing lines 361, 364 and 369 are absent, so some members are
// not shown in this run.
// Executor handed to the constructor; held as a raw pointer.
360  Executor* executor_;
// Query DAG; set by the constructors that take a RA string or a prebuilt DAG,
// empty otherwise.
362  std::unique_ptr<RelAlgDagBuilder> query_dag_;
// Shared query state handed to the constructor; nullptr by default.
363  std::shared_ptr<const query_state::QueryState> query_state_;
// Initialized to 0 by every constructor.
365  time_t now_;
366  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_; // TODO(alex): remove
// Per-leaf results, keyed by the id passed to addLeafResult().
367  std::unordered_map<unsigned, AggregatedResult> leaf_results_;
// Initialized to 0 by every constructor.
368  int64_t queue_time_ms_;
// Class-level constant, 16384. NOTE(review): presumably the fallback for a work
// unit's max_groups_buffer_entry_guess; confirm at the use sites.
370  static const size_t max_groups_buffer_entry_default_guess{16384};
371 
372  std::unique_ptr<TransactionParameters> dml_transaction_parameters_;
// Optional callback. NOTE(review): presumably run by executePostExecutionCallback(); confirm.
373  std::optional<std::function<void()>> post_execution_callback_;
374 
// Grants PendingExecutionClosure access to private members.
375  friend class PendingExecutionClosure;
376 };
377 
378 #endif // QUERYENGINE_RELALGEXECUTOR_H
bool is_agg(const Analyzer::Expr *expr)
int64_t queue_time_ms_
std::optional< std::function< void()> > post_execution_callback_
const std::vector< size_t > left_deep_join_input_sizes
void addLeafResult(const unsigned id, const AggregatedResult &result)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:101
const RelAlgNode * body
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
Streaming Top N algorithm.
static SpeculativeTopNBlacklist speculative_topn_blacklist_
RelAlgExecutionUnit exe_unit
TemporaryTables temporary_tables_
std::string join(T const &container, std::string const &delim)
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
std::unique_ptr< QueryRewriter > query_rewriter
std::unordered_map< unsigned, AggregatedResult > leaf_results_
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
bool g_skip_intermediate_count
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDagBuilder > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::string cat(Ts &&... args)
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
const std::vector< size_t > input_permutation
QueryHint getParsedQueryHints()
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::shared_ptr< const query_state::QueryState > query_state_
Speculative top N algorithm.
const MergeType merge_type
void eraseFromTemporaryTables(const int table_id)
const RelAlgNode & getRootRelAlgNode() const
TableFunctionExecutionUnit exe_unit
const size_t max_groups_buffer_entry_guess
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
#define CHECK(condition)
Definition: Logger.h:197
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
ExecutionResult result
Executor * executor_