OmniSciDB  29e35f4d58
RelAlgExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef QUERYENGINE_RELALGEXECUTOR_H
18 #define QUERYENGINE_RELALGEXECUTOR_H
19 
20 #include "../Shared/scope.h"
23 #include "Execute.h"
24 #include "InputMetadata.h"
25 #include "JoinFilterPushDown.h"
27 #include "QueryRewrite.h"
28 #include "SpeculativeTopN.h"
29 #include "StreamingTopN.h"
31 
32 #include <ctime>
33 #include <sstream>
34 
35 #include "StorageIOFacility.h"
36 
// Global flag defined in another translation unit (only declared `extern` here).
// NOTE(review): judging by its name it presumably toggles skipping an intermediate
// row-count step during execution — confirm at its definition.
37 extern bool g_skip_intermediate_count;
38 
// Merge strategy tag carried alongside a step's execution result.
// Exactly two options exist: Union (value 0) and Reduce (value 1).
enum class MergeType : int {
  Union,
  Reduce,
};
40 
// NOTE(review): fragment — source lines 40-43 and 45 (including this struct's opening
// line) are missing from this rendering. Presumably this is the tail of
// FirstStepExecutionResult (the return type of executeRelAlgQuerySingleStep); the
// symbol index lists `ExecutionResult result` and `const MergeType merge_type` as
// related members — confirm against the real header.
44  const unsigned node_id;
46 };
47 
// NOTE(review): fragment — source lines 48 and 50-51 (including this struct's opening
// line) are missing from this rendering. Presumably this closes RelAlgExecutorTraits,
// the traits type RelAlgExecutor passes to StorageIOFacility — confirm.
49  using ExecutorType = Executor;
52 };
53 
// RelAlgExecutor: runs a relational-algebra query DAG (held as a RelAlgDagBuilder)
// on an Executor. It privately inherits StorageIOFacility<RelAlgExecutorTraits>,
// which every constructor initializes with (executor, cat) — presumably to provide
// the storage hooks used by the executeUpdateVia*/executeDeleteVia* paths; confirm.
54 class RelAlgExecutor : private StorageIOFacility<RelAlgExecutorTraits> {
55  public:
56  using TargetInfoList = std::vector<TargetInfo>;
57 
  // Constructs an executor WITHOUT a query DAG: query_dag_ stays null, so
  // getRootRelAlgNode() and getSubqueries() will fail their CHECK(query_dag_)
  // on an instance built this way. now_ and queue_time_ms_ start at 0.
58  RelAlgExecutor(Executor* executor,
59  const Catalog_Namespace::Catalog& cat,
60  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
61  : StorageIOFacility(executor, cat)
62  , executor_(executor)
63  , cat_(cat)
64  , query_state_(std::move(query_state))
65  , now_(0)
66  , queue_time_ms_(0) {}
67 
  // Builds the query DAG from `query_ra` (presumably the serialized
  // relational-algebra plan — confirm) via RelAlgDagBuilder. The builder is handed
  // the member `cat_`, which is only valid because cat_ is initialized before
  // query_dag_; that relies on cat_ being declared first (presumably at source line
  // 351, hidden in this rendering) — keep that member order.
68  RelAlgExecutor(Executor* executor,
69  const Catalog_Namespace::Catalog& cat,
70  const std::string& query_ra,
71  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
72  : StorageIOFacility(executor, cat)
73  , executor_(executor)
74  , cat_(cat)
75  , query_dag_(std::make_unique<RelAlgDagBuilder>(query_ra, cat_, nullptr))
76  , query_state_(std::move(query_state))
77  , now_(0)
78  , queue_time_ms_(0) {}
79 
  // Takes ownership of an already-built query DAG.
80  RelAlgExecutor(Executor* executor,
81  const Catalog_Namespace::Catalog& cat,
82  std::unique_ptr<RelAlgDagBuilder> query_dag,
83  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
84  : StorageIOFacility(executor, cat)
85  , executor_(executor)
86  , cat_(cat)
87  , query_dag_(std::move(query_dag))
88  , query_state_(std::move(query_state))
89  , now_(0)
90  , queue_time_ms_(0) {}
91 
92  ExecutionResult executeRelAlgQuery(const CompilationOptions& co,
93  const ExecutionOptions& eo,
94  RenderInfo* render_info);
95 
96  ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence& seq,
97  const CompilationOptions& co,
98  const ExecutionOptions& eo,
99  RenderInfo* render_info,
100  const int64_t queue_time_ms);
101 
102  void prepareLeafExecution(
103  const AggregatedColRange& agg_col_range,
104  const StringDictionaryGenerations& string_dictionary_generations,
105  const TableGenerations& table_generations);
106 
107  ExecutionResult executeRelAlgSeq(const RaExecutionSequence& seq,
108  const CompilationOptions& co,
109  const ExecutionOptions& eo,
110  RenderInfo* render_info,
111  const int64_t queue_time_ms,
112  const bool with_existing_temp_tables = false);
113 
114  ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence& seq,
115  const std::pair<size_t, size_t> interval,
116  const CompilationOptions& co,
117  const ExecutionOptions& eo,
118  RenderInfo* render_info,
119  const int64_t queue_time_ms);
120 
121  FirstStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence& seq,
122  const size_t step_idx,
123  const CompilationOptions& co,
124  const ExecutionOptions& eo,
125  RenderInfo* render_info);
126 
  // Records a result for relational-algebra node `id`. Each id may be registered
  // only once: a duplicate makes emplace() report no insertion and trips the CHECK.
127  void addLeafResult(const unsigned id, const AggregatedResult& result) {
128  const auto it_ok = leaf_results_.emplace(id, result);
129  CHECK(it_ok.second);
130  }
131 
  // Root node of the query DAG. CHECK-fails if this executor has no DAG.
132  const RelAlgNode& getRootRelAlgNode() const {
133  CHECK(query_dag_);
134  return query_dag_->getRootNode();
135  }
  // Subqueries collected by the query DAG. CHECK-fails if this executor has no DAG.
  // NOTE(review): declared noexcept while containing a CHECK; if a failed CHECK
  // throws rather than aborts, escaping the noexcept function calls std::terminate —
  // confirm Logger semantics. The `;` after the body is a harmless empty declaration.
136  const std::vector<std::shared_ptr<RexSubQuery>>& getSubqueries() const noexcept {
137  CHECK(query_dag_);
138  return query_dag_->getSubqueries();
139  };
140 
141  AggregatedColRange computeColRangesCache();
142  StringDictionaryGenerations computeStringDictionaryGenerations();
143  TableGenerations computeTableGenerations();
144 
145  Executor* getExecutor() const;
146 
147  void cleanupPostExecution();
148 
149  static std::string getErrorMessageFromCode(const int32_t error_code);
150 
151  private:
152  ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions& co,
153  const ExecutionOptions& eo,
154  RenderInfo* render_info);
155 
156  void executeRelAlgStep(const RaExecutionSequence& seq,
157  const size_t step_idx,
158  const CompilationOptions&,
159  const ExecutionOptions&,
160  RenderInfo*,
161  const int64_t queue_time_ms);
162 
163  void executeUpdateViaCompound(const RelCompound* compound,
164  const CompilationOptions& co,
165  const ExecutionOptions& eo,
166  RenderInfo* render_info,
167  const int64_t queue_time_ms);
168  void executeUpdateViaProject(const RelProject*,
169  const CompilationOptions&,
170  const ExecutionOptions&,
171  RenderInfo*,
172  const int64_t queue_time_ms);
173 
174  void executeDeleteViaCompound(const RelCompound* compound,
175  const CompilationOptions& co,
176  const ExecutionOptions& eo,
177  RenderInfo* render_info,
178  const int64_t queue_time_ms);
179  void executeDeleteViaProject(const RelProject*,
180  const CompilationOptions&,
181  const ExecutionOptions&,
182  RenderInfo*,
183  const int64_t queue_time_ms);
184 
185  ExecutionResult executeCompound(const RelCompound*,
186  const CompilationOptions&,
187  const ExecutionOptions&,
188  RenderInfo*,
189  const int64_t queue_time_ms);
190 
191  ExecutionResult executeAggregate(const RelAggregate* aggregate,
192  const CompilationOptions& co,
193  const ExecutionOptions& eo,
194  RenderInfo* render_info,
195  const int64_t queue_time_ms);
196 
197  ExecutionResult executeProject(const RelProject*,
198  const CompilationOptions&,
199  const ExecutionOptions&,
200  RenderInfo*,
201  const int64_t queue_time_ms,
202  const ssize_t previous_count);
203 
204  ExecutionResult executeTableFunction(const RelTableFunction*,
205  const CompilationOptions&,
206  const ExecutionOptions&,
207  const int64_t queue_time_ms);
208 
209  // Computes the window function results to be used by the query.
210  void computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
211  const CompilationOptions& co,
212  const ExecutionOptions& eo,
213  ColumnCacheMap& column_cache_map,
214  const int64_t queue_time_ms);
215 
216  // Creates the window context for the given window function.
217  std::unique_ptr<WindowFunctionContext> createWindowFunctionContext(
218  const Analyzer::WindowFunction* window_func,
219  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
220  const RelAlgExecutionUnit& ra_exe_unit,
221  const std::vector<InputTableInfo>& query_infos,
222  const CompilationOptions& co,
223  ColumnCacheMap& column_cache_map);
224 
225  ExecutionResult executeFilter(const RelFilter*,
226  const CompilationOptions&,
227  const ExecutionOptions&,
228  RenderInfo*,
229  const int64_t queue_time_ms);
230 
231  ExecutionResult executeSort(const RelSort*,
232  const CompilationOptions&,
233  const ExecutionOptions&,
234  RenderInfo*,
235  const int64_t queue_time_ms);
236 
237  ExecutionResult executeLogicalValues(const RelLogicalValues*, const ExecutionOptions&);
238  ExecutionResult executeModify(const RelModify* modify, const ExecutionOptions& eo);
239 
240  // TODO(alex): just move max_groups_buffer_entry_guess to RelAlgExecutionUnit once
241  // we deprecate the plan-based executor paths and remove WorkUnit
242  struct WorkUnit {
    // NOTE(review): this rendering omits source lines 243 and 245. The symbol index
    // lists `RelAlgExecutionUnit exe_unit` and `const size_t max_groups_buffer_entry_guess`,
    // presumably those two members — confirm against the real header; if WorkUnit is
    // aggregate-initialized elsewhere, its member order is part of its interface.
244  const RelAlgNode* body;
246  std::unique_ptr<QueryRewriter> query_rewriter;
247  const std::vector<size_t> input_permutation;
248  const std::vector<size_t> left_deep_join_input_sizes;
249  };
250 
  // NOTE(review): source lines 251-252 are missing from this rendering. Given the
  // createTableFunctionWorkUnit() return type and the symbol index entry
  // `TableFunctionExecutionUnit exe_unit`, they presumably open
  // `struct TableFunctionWorkUnit {` and declare that member — confirm.
253  const RelAlgNode* body;
254  };
255 
256  WorkUnit createSortInputWorkUnit(const RelSort*, const bool just_explain);
257 
258  ExecutionResult executeWorkUnit(const WorkUnit& work_unit,
259  const std::vector<TargetMetaInfo>& targets_meta,
260  const bool is_agg,
261  const CompilationOptions& co_in,
262  const ExecutionOptions& eo,
263  RenderInfo*,
264  const int64_t queue_time_ms,
265  const ssize_t previous_count = -1);
266 
267  size_t getNDVEstimation(const WorkUnit& work_unit,
268  const bool is_agg,
269  const CompilationOptions& co,
270  const ExecutionOptions& eo);
271 
272  ssize_t getFilteredCountAll(const WorkUnit& work_unit,
273  const bool is_agg,
274  const CompilationOptions& co,
275  const ExecutionOptions& eo);
276 
277  FilterSelectivity getFilterSelectivity(
278  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
279  const CompilationOptions& co,
280  const ExecutionOptions& eo);
281 
282  std::vector<PushedDownFilterInfo> selectFiltersToBePushedDown(
283  const RelAlgExecutor::WorkUnit& work_unit,
284  const CompilationOptions& co,
285  const ExecutionOptions& eo);
286 
287  bool isRowidLookup(const WorkUnit& work_unit);
288 
289  ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit& work_unit,
290  const std::vector<TargetMetaInfo>& targets_meta,
291  const bool is_agg,
292  const CompilationOptions& co,
293  const ExecutionOptions& eo,
294  RenderInfo* render_info,
295  const bool was_multifrag_kernel_launch,
296  const int64_t queue_time_ms);
297 
298  // Allows an out of memory error through if CPU retry is enabled. Otherwise, throws an
299  // appropriate exception corresponding to the query error code.
300  static void handlePersistentError(const int32_t error_code);
301 
302  WorkUnit createWorkUnit(const RelAlgNode*, const SortInfo&, const bool just_explain);
303 
304  WorkUnit createCompoundWorkUnit(const RelCompound*,
305  const SortInfo&,
306  const bool just_explain);
307 
308  WorkUnit createAggregateWorkUnit(const RelAggregate*,
309  const SortInfo&,
310  const bool just_explain);
311 
312  WorkUnit createProjectWorkUnit(const RelProject*,
313  const SortInfo&,
314  const bool just_explain);
315 
316  WorkUnit createFilterWorkUnit(const RelFilter*,
317  const SortInfo&,
318  const bool just_explain);
319 
320  WorkUnit createJoinWorkUnit(const RelJoin*, const SortInfo&, const bool just_explain);
321 
322  TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction* table_func,
323  const bool just_explain);
324 
  // Registers `result` as temporary table `table_id`. Each requirement is enforced by
  // a CHECK: the result has at least one column, the id is negative, and the id is
  // not already registered.
  // NOTE(review): the symbol index shows TemporaryTables as
  // std::unordered_map<int, const ResultSetPtr&>, i.e. the map stores a reference to
  // the caller's shared_ptr object rather than a copy. The caller must keep that
  // ResultSetPtr alive until eraseFromTemporaryTables(table_id) — confirm.
325  void addTemporaryTable(const int table_id, const ResultSetPtr& result) {
326  CHECK_LT(size_t(0), result->colCount());
327  CHECK_LT(table_id, 0);
328  const auto it_ok = temporary_tables_.emplace(table_id, result);
329  CHECK(it_ok.second);
330  }
331 
  // Removes temporary table `table_id`; erasing an id that is not present is a no-op.
332  void eraseFromTemporaryTables(const int table_id) { temporary_tables_.erase(table_id); }
333 
334  void handleNop(RaExecutionDesc& ed);
335 
336  JoinQualsPerNestingLevel translateLeftDeepJoinFilter(
337  const RelLeftDeepInnerJoin* join,
338  const std::vector<InputDescriptor>& input_descs,
339  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
340  const bool just_explain);
341 
342  // Transform the provided `join_condition` to conjunctive form, find composite
343  // key opportunities and finally translate it to an Analyzer expression.
344  std::list<std::shared_ptr<Analyzer::Expr>> makeJoinQuals(
345  const RexScalar* join_condition,
346  const std::vector<JoinType>& join_types,
347  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
348  const bool just_explain) const;
349 
  // NOTE(review): source lines 351, 354 and 359 are missing from this rendering. The
  // symbol index lists `cat_`, `temporary_tables_` and `speculative_topn_blacklist_`,
  // presumably declared on those lines — confirm against the real header.
350  Executor* executor_;
352  std::unique_ptr<RelAlgDagBuilder> query_dag_;
353  std::shared_ptr<const query_state::QueryState> query_state_;
355  time_t now_;
356  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_; // TODO(alex): remove
357  std::unordered_map<unsigned, AggregatedResult> leaf_results_;
358  int64_t queue_time_ms_;
  // Default group-by buffer entry guess (16384) — presumably the value placed in
  // WorkUnit::max_groups_buffer_entry_guess when no better estimate exists; confirm
  // in the implementation file.
360  static const size_t max_groups_buffer_entry_default_guess{16384};
361 
362  friend class PendingExecutionClosure;
363 };
364 
365 #endif // QUERYENGINE_RELALGEXECUTOR_H
bool is_agg(const Analyzer::Expr *expr)
int64_t queue_time_ms_
const std::vector< size_t > left_deep_join_input_sizes
void addLeafResult(const unsigned id, const AggregatedResult &result)
Class for a per-database catalog. Also includes metadata for the current database and the current use...
Definition: Catalog.h:81
const RelAlgNode * body
Streaming Top N algorithm.
static SpeculativeTopNBlacklist speculative_topn_blacklist_
RelAlgExecutionUnit exe_unit
TemporaryTables temporary_tables_
std::string join(T const &container, std::string const &delim)
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
std::unique_ptr< QueryRewriter > query_rewriter
std::unordered_map< unsigned, AggregatedResult > leaf_results_
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
bool g_skip_intermediate_count
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDagBuilder > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
const std::vector< size_t > input_permutation
#define CHECK_LT(x, y)
Definition: Logger.h:203
std::shared_ptr< const query_state::QueryState > query_state_
Speculative top N algorithm.
void eraseFromTemporaryTables(const int table_id)
const RelAlgNode & getRootRelAlgNode() const
TableFunctionExecutionUnit exe_unit
ExecutionResult result
const size_t max_groups_buffer_entry_guess
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
#define CHECK(condition)
Definition: Logger.h:193
const MergeType merge_type
std::unique_ptr< RelAlgDagBuilder > query_dag_
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
Specifies the in-memory content of a row in the table metadata table.
Executor * executor_