OmniSciDB  d2f719934e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef QUERYENGINE_RELALGEXECUTOR_H
18 #define QUERYENGINE_RELALGEXECUTOR_H
19 
22 #include "QueryEngine/Execute.h"
29 #include "Shared/scope.h"
31 
32 #include <ctime>
33 #include <sstream>
34 
35 #include "StorageIOFacility.h"
36 
37 extern bool g_skip_intermediate_count;
38 
39 enum class MergeType { Union, Reduce };
40 
44  const unsigned node_id;
46 };
47 
49  public:
50  using TargetInfoList = std::vector<TargetInfo>;
51 
52  RelAlgExecutor(Executor* executor,
54  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
55  : StorageIOFacility(executor, cat)
56  , executor_(executor)
57  , cat_(cat)
58  , query_state_(std::move(query_state))
59  , now_(0)
60  , queue_time_ms_(0) {}
61 
62  RelAlgExecutor(Executor* executor,
64  const std::string& query_ra,
65  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
66  : StorageIOFacility(executor, cat)
67  , executor_(executor)
68  , cat_(cat)
69  , query_dag_(std::make_unique<RelAlgDagBuilder>(query_ra, cat_, nullptr))
70  , query_state_(std::move(query_state))
71  , now_(0)
72  , queue_time_ms_(0) {}
73 
74  RelAlgExecutor(Executor* executor,
76  std::unique_ptr<RelAlgDagBuilder> query_dag,
77  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
78  : StorageIOFacility(executor, cat)
79  , executor_(executor)
80  , cat_(cat)
81  , query_dag_(std::move(query_dag))
82  , query_state_(std::move(query_state))
83  , now_(0)
84  , queue_time_ms_(0) {}
85 
86  size_t getOuterFragmentCount(const CompilationOptions& co, const ExecutionOptions& eo);
87 
89  const ExecutionOptions& eo,
90  const bool just_explain_plan,
91  RenderInfo* render_info);
92 
94  const CompilationOptions& co,
95  const ExecutionOptions& eo,
96  RenderInfo* render_info,
97  const int64_t queue_time_ms);
98 
100  const AggregatedColRange& agg_col_range,
101  const StringDictionaryGenerations& string_dictionary_generations,
102  const TableGenerations& table_generations);
103 
105  const CompilationOptions& co,
106  const ExecutionOptions& eo,
107  RenderInfo* render_info,
108  const int64_t queue_time_ms,
109  const bool with_existing_temp_tables = false);
110 
112  const std::pair<size_t, size_t> interval,
113  const CompilationOptions& co,
114  const ExecutionOptions& eo,
115  RenderInfo* render_info,
116  const int64_t queue_time_ms);
117 
119  const size_t step_idx,
120  const CompilationOptions& co,
121  const ExecutionOptions& eo,
122  RenderInfo* render_info);
123 
// Stores a copy of `result` in leaf_results_ under the key `id`.
// Each id may be registered only once: the CHECK below fails if an entry for
// `id` already exists (emplace does not overwrite an existing key).
124  void addLeafResult(const unsigned id, const AggregatedResult& result) {
125  const auto it_ok = leaf_results_.emplace(id, result);
126  CHECK(it_ok.second);  // .second is false when `id` was already present
127  }
128 
// Returns a reference to the root node reported by query_dag_.
// query_dag_ must be set: the constructor overload that takes neither a query
// string nor a DAG leaves it default-initialized, and the CHECK guards the
// dereference below against that case.
// NOTE(review): the reference presumably stays valid only while query_dag_
// keeps the node alive -- confirm against RelAlgDagBuilder.
129  const RelAlgNode& getRootRelAlgNode() const {
130  CHECK(query_dag_);
131  return query_dag_->getRootNode();
132  }
133 
// Returns the root node of query_dag_ as a shared_ptr to const, as produced by
// RelAlgDagBuilder::getRootNodeShPtr().
// query_dag_ must be set; the CHECK guards the dereference below.
134  std::shared_ptr<const RelAlgNode> getRootRelAlgNodeShPtr() const {
135  CHECK(query_dag_);
136  return query_dag_->getRootNodeShPtr();
137  }
138 
139  std::pair<std::vector<unsigned>, std::unordered_map<unsigned, JoinQualsPerNestingLevel>>
140  getJoinInfo(const RelAlgNode* root_node);
141 
142  std::shared_ptr<RelAlgTranslator> getRelAlgTranslator(const RelAlgNode* root_node);
143 
// Returns a const reference to the subquery list exposed by query_dag_.
// query_dag_ must be set; the CHECK guards the dereference below.
// NOTE(review): the method is declared noexcept, so this relies on a failed
// CHECK and RelAlgDagBuilder::getSubqueries() not throwing -- confirm.
144  const std::vector<std::shared_ptr<RexSubQuery>>& getSubqueries() const noexcept {
145  CHECK(query_dag_);
146  return query_dag_->getSubqueries();
147  }
148 
// Looks up the query hint for `node` via query_dag_->getQueryHint(node).
// Returns std::nullopt when query_dag_ is not set; otherwise returns whatever
// getQueryHint yields for `node`.
149  std::optional<RegisteredQueryHint> getParsedQueryHint(const RelAlgNode* node) {
150  return query_dag_ ? query_dag_->getQueryHint(node) : std::nullopt;
151  }
152 
154  std::unordered_map<size_t, std::unordered_map<unsigned, RegisteredQueryHint>>>
156  return query_dag_ ? std::make_optional(query_dag_->getQueryHints()) : std::nullopt;
157  }
158 
// Returns the value of query_dag_->getGlobalHints() wrapped in an optional, or
// std::nullopt when query_dag_ is not set.
159  std::optional<RegisteredQueryHint> getGlobalQueryHint() {
160  return query_dag_ ? std::make_optional(query_dag_->getGlobalHints()) : std::nullopt;
161  }
162 
164 
168 
169  Executor* getExecutor() const;
170 
171  void cleanupPostExecution();
172 
173  static std::string getErrorMessageFromCode(const int32_t error_code);
174 
176 
177  private:
179  const ExecutionOptions& eo,
180  const bool just_explain_plan,
181  RenderInfo* render_info);
182 
183  void executeRelAlgStep(const RaExecutionSequence& seq,
184  const size_t step_idx,
185  const CompilationOptions&,
186  const ExecutionOptions&,
187  RenderInfo*,
188  const int64_t queue_time_ms);
189 
190  void executeUpdate(const RelAlgNode* node,
191  const CompilationOptions& co,
192  const ExecutionOptions& eo,
193  const int64_t queue_time_ms);
194 
195  void executeDelete(const RelAlgNode* node,
196  const CompilationOptions& co,
197  const ExecutionOptions& eo_in,
198  const int64_t queue_time_ms);
199 
201  const CompilationOptions&,
202  const ExecutionOptions&,
203  RenderInfo*,
204  const int64_t queue_time_ms);
205 
207  const CompilationOptions& co,
208  const ExecutionOptions& eo,
209  RenderInfo* render_info,
210  const int64_t queue_time_ms);
211 
213  const CompilationOptions&,
214  const ExecutionOptions&,
215  RenderInfo*,
216  const int64_t queue_time_ms,
217  const std::optional<size_t> previous_count);
218 
220  const CompilationOptions&,
221  const ExecutionOptions&,
222  const int64_t queue_time_ms);
223 
224  // Computes the window function results to be used by the query.
225  void computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
226  const CompilationOptions& co,
227  const ExecutionOptions& eo,
228  ColumnCacheMap& column_cache_map,
229  const int64_t queue_time_ms);
230 
231  // Creates the window context for the given window function.
232  std::unique_ptr<WindowFunctionContext> createWindowFunctionContext(
233  const Analyzer::WindowFunction* window_func,
234  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
235  const RelAlgExecutionUnit& ra_exe_unit,
236  const std::vector<InputTableInfo>& query_infos,
237  const CompilationOptions& co,
238  ColumnCacheMap& column_cache_map,
239  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner);
240 
242  const CompilationOptions&,
243  const ExecutionOptions&,
244  RenderInfo*,
245  const int64_t queue_time_ms);
246 
248  const CompilationOptions&,
249  const ExecutionOptions&,
250  RenderInfo*,
251  const int64_t queue_time_ms);
252 
254 
255  ExecutionResult executeModify(const RelModify* modify, const ExecutionOptions& eo);
256 
258  const RaExecutionSequence&,
259  const CompilationOptions&,
260  const ExecutionOptions&,
261  RenderInfo*,
262  const int64_t queue_time_ms);
263 
264  // TODO(alex): just move max_groups_buffer_entry_guess to RelAlgExecutionUnit once
265  // we deprecate the plan-based executor paths and remove WorkUnit
266  struct WorkUnit {
268  const RelAlgNode* body;
270  std::unique_ptr<QueryRewriter> query_rewriter;
271  const std::vector<size_t> input_permutation;
272  const std::vector<size_t> left_deep_join_input_sizes;
273  };
274 
277  const RelAlgNode* body;
278  };
279 
281 
283  const WorkUnit& work_unit,
284  const std::vector<TargetMetaInfo>& targets_meta,
285  const bool is_agg,
286  const CompilationOptions& co_in,
287  const ExecutionOptions& eo_in,
288  RenderInfo*,
289  const int64_t queue_time_ms,
290  const std::optional<size_t> previous_count = std::nullopt);
291 
292  size_t getNDVEstimation(const WorkUnit& work_unit,
293  const int64_t range,
294  const bool is_agg,
295  const CompilationOptions& co,
296  const ExecutionOptions& eo);
297 
298  std::optional<size_t> getFilteredCountAll(const WorkUnit& work_unit,
299  const bool is_agg,
300  const CompilationOptions& co,
301  const ExecutionOptions& eo);
302 
304  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
305  const CompilationOptions& co,
306  const ExecutionOptions& eo);
307 
308  std::vector<PushedDownFilterInfo> selectFiltersToBePushedDown(
309  const RelAlgExecutor::WorkUnit& work_unit,
310  const CompilationOptions& co,
311  const ExecutionOptions& eo);
312 
313  bool isRowidLookup(const WorkUnit& work_unit);
314 
316  const std::vector<TargetMetaInfo>& targets_meta,
317  const bool is_agg,
318  const CompilationOptions& co,
319  const ExecutionOptions& eo,
320  RenderInfo* render_info,
321  const bool was_multifrag_kernel_launch,
322  const int64_t queue_time_ms);
323 
324  // Allows an out-of-memory error through if CPU retry is enabled. Otherwise, throws an
325  // appropriate exception corresponding to the query error code.
326  static void handlePersistentError(const int32_t error_code);
327 
328  WorkUnit createWorkUnit(const RelAlgNode*, const SortInfo&, const ExecutionOptions& eo);
329 
331  const SortInfo&,
332  const ExecutionOptions& eo);
333 
335  const SortInfo&,
336  const bool just_explain);
337 
339  const SortInfo&,
340  const ExecutionOptions& eo);
341 
343  const SortInfo&,
344  const bool just_explain);
345 
346  WorkUnit createJoinWorkUnit(const RelJoin*, const SortInfo&, const bool just_explain);
347 
349  const SortInfo&,
350  const ExecutionOptions& eo);
351 
353  const bool just_explain,
354  const bool is_gpu);
355 
// Registers `result` in temporary_tables_ under `table_id`.
// Preconditions enforced with CHECKs:
//   - `result` reports at least one column,
//   - `table_id` is negative,
//   - no entry for `table_id` exists yet (emplace does not overwrite).
// NOTE(review): `result` is dereferenced without a null check, so callers must
// pass a non-null pointer.
// NOTE(review): if TemporaryTables maps to `const ResultSetPtr&`, as the
// symbol index of this listing suggests, the map holds a reference and the
// caller's `result` must outlive the entry -- confirm.
356  void addTemporaryTable(const int table_id, const ResultSetPtr& result) {
357  CHECK_LT(size_t(0), result->colCount());  // i.e. colCount() > 0
358  CHECK_LT(table_id, 0);
359  const auto it_ok = temporary_tables_.emplace(table_id, result);
360  CHECK(it_ok.second);  // .second is false when `table_id` was already present
361  }
362 
// Removes the entry keyed by `table_id` from temporary_tables_.
363  void eraseFromTemporaryTables(const int table_id) { temporary_tables_.erase(table_id); }
364 
365  void handleNop(RaExecutionDesc& ed);
366 
// Returns a non-const reference to left_deep_join_info_, so callers can read
// and modify the map in place.
367  std::unordered_map<unsigned, JoinQualsPerNestingLevel>& getLeftDeepJoinTreesInfo() {
368  return left_deep_join_info_;
369  }
370 
372  const RelLeftDeepInnerJoin* join,
373  const std::vector<InputDescriptor>& input_descs,
374  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
375  const bool just_explain);
376 
377  // Transform the provided `join_condition` to conjunctive form, find composite
378  // key opportunities and finally translate it to an Analyzer expression.
379  std::list<std::shared_ptr<Analyzer::Expr>> makeJoinQuals(
380  const RexScalar* join_condition,
381  const std::vector<JoinType>& join_types,
382  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
383  const bool just_explain) const;
384 
385  Executor* executor_;
387  std::unique_ptr<RelAlgDagBuilder> query_dag_;
388  std::shared_ptr<const query_state::QueryState> query_state_;
390  time_t now_;
391  std::unordered_map<unsigned, JoinQualsPerNestingLevel> left_deep_join_info_;
392  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_; // TODO(alex): remove
393  std::unordered_map<unsigned, AggregatedResult> leaf_results_;
394  int64_t queue_time_ms_;
396 
397  std::unique_ptr<TransactionParameters> dml_transaction_parameters_;
398  std::optional<std::function<void()>> post_execution_callback_;
399 
401 };
402 
403 #endif // QUERYENGINE_RELALGEXECUTOR_H
bool is_agg(const Analyzer::Expr *expr)
int64_t queue_time_ms_
std::optional< std::function< void()> > post_execution_callback_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const std::vector< size_t > left_deep_join_input_sizes
std::string cat(Ts &&...args)
void addLeafResult(const unsigned id, const AggregatedResult &result)
AggregatedColRange computeColRangesCache()
Class for a per-database catalog. Also includes metadata for the current database and the current use...
Definition: Catalog.h:114
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain, const bool is_gpu)
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
const RelAlgNode * body
bool g_skip_intermediate_count
#define const
Streaming Top N algorithm.
TableGenerations computeTableGenerations()
static SpeculativeTopNBlacklist speculative_topn_blacklist_
RelAlgExecutionUnit exe_unit
TemporaryTables temporary_tables_
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
FilterSelectivity getFilterSelectivity(const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
std::string join(T const &container, std::string const &delim)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > & getLeftDeepJoinTreesInfo()
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
std::optional< std::unordered_map< size_t, std::unordered_map< unsigned, RegisteredQueryHint > > > getParsedQueryHints()
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
std::unique_ptr< QueryRewriter > query_rewriter
std::unordered_map< unsigned, AggregatedResult > leaf_results_
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
std::optional< RegisteredQueryHint > getParsedQueryHint(const RelAlgNode *node)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
QueryStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
void handleNop(RaExecutionDesc &ed)
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDagBuilder > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::shared_ptr< const RelAlgNode > getRootRelAlgNodeShPtr() const
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const ExecutionOptions &eo)
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const ExecutionOptions &eo)
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
size_t getOuterFragmentCount(const CompilationOptions &co, const ExecutionOptions &eo)
MergeType
void executeUpdate(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo, const int64_t queue_time_ms)
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
friend class PendingExecutionClosure
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
void executePostExecutionCallback()
size_t getNDVEstimation(const WorkUnit &work_unit, const int64_t range, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const std::vector< size_t > input_permutation
WorkUnit createUnionWorkUnit(const RelLogicalUnion *, const SortInfo &, const ExecutionOptions &eo)
ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::optional< size_t > getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
Executor * getExecutor() const
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults >>> ColumnCacheMap
std::optional< RegisteredQueryHint > getGlobalQueryHint()
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
std::unordered_map< unsigned, JoinQualsPerNestingLevel > left_deep_join_info_
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo_in, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count=std::nullopt)
#define CHECK_LT(x, y)
Definition: Logger.h:221
ExecutionResult executeSimpleInsert(const Analyzer::Query &insert_query)
WorkUnit createJoinWorkUnit(const RelJoin *, const SortInfo &, const bool just_explain)
ExecutionResult executeUnion(const RelLogicalUnion *, const RaExecutionSequence &, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::shared_ptr< const query_state::QueryState > query_state_
Speculative top N algorithm.
const MergeType merge_type
const RelAlgNode & getRootRelAlgNode() const
void eraseFromTemporaryTables(const int table_id)
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool isRowidLookup(const WorkUnit &work_unit)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const std::optional< size_t > previous_count)
TableFunctionExecutionUnit exe_unit
static void handlePersistentError(const int32_t error_code)
const size_t max_groups_buffer_entry_guess
StringDictionaryGenerations computeStringDictionaryGenerations()
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
ExecutionResult executeRelAlgQuery(const CompilationOptions &co, const ExecutionOptions &eo, const bool just_explain_plan, RenderInfo *render_info)
#define CHECK(condition)
Definition: Logger.h:211
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::unique_ptr< RelAlgDagBuilder > query_dag_
static std::string getErrorMessageFromCode(const int32_t error_code)
void cleanupPostExecution()
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals(const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
std::unique_ptr< TransactionParameters > dml_transaction_parameters_
std::pair< std::vector< unsigned >, std::unordered_map< unsigned, JoinQualsPerNestingLevel > > getJoinInfo(const RelAlgNode *root_node)
void executeDelete(const RelAlgNode *node, const CompilationOptions &co, const ExecutionOptions &eo_in, const int64_t queue_time_ms)
ExecutionResult result
std::shared_ptr< RelAlgTranslator > getRelAlgTranslator(const RelAlgNode *root_node)
Executor * executor_
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
WorkUnit createSortInputWorkUnit(const RelSort *, const ExecutionOptions &eo)
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
void prepareLeafExecution(const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)