OmniSciDB  2e3a973ef4
RelAlgExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef QUERYENGINE_RELALGEXECUTOR_H
18 #define QUERYENGINE_RELALGEXECUTOR_H
19 
22 #include "QueryEngine/Execute.h"
29 #include "Shared/scope.h"
31 
32 #include <ctime>
33 #include <sstream>
34 
35 #include "StorageIOFacility.h"
36 
37 extern bool g_skip_intermediate_count;
38 
// Merge strategy tag. It is stored in the step-result struct's `merge_type`
// member; presumably it selects how per-step results are combined (confirm at
// the use sites). Declaration order fixes Union == 0 and Reduce == 1.
enum class MergeType {
  Union,
  Reduce,
};
40 
44  const unsigned node_id;
46 };
47 
49  using ExecutorType = Executor;
52 };
53 
54 class RelAlgExecutor : private StorageIOFacility<RelAlgExecutorTraits> {
55  public:
56  using TargetInfoList = std::vector<TargetInfo>;
57 
58  RelAlgExecutor(Executor* executor,
60  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
61  : StorageIOFacility(executor, cat)
62  , executor_(executor)
63  , cat_(cat)
64  , query_state_(std::move(query_state))
65  , now_(0)
66  , queue_time_ms_(0) {}
67 
68  RelAlgExecutor(Executor* executor,
70  const std::string& query_ra,
71  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
72  : StorageIOFacility(executor, cat)
73  , executor_(executor)
74  , cat_(cat)
75  , query_dag_(std::make_unique<RelAlgDagBuilder>(query_ra, cat_, nullptr))
76  , query_state_(std::move(query_state))
77  , now_(0)
78  , queue_time_ms_(0) {}
79 
80  RelAlgExecutor(Executor* executor,
82  std::unique_ptr<RelAlgDagBuilder> query_dag,
83  std::shared_ptr<const query_state::QueryState> query_state = nullptr)
84  : StorageIOFacility(executor, cat)
85  , executor_(executor)
86  , cat_(cat)
87  , query_dag_(std::move(query_dag))
88  , query_state_(std::move(query_state))
89  , now_(0)
90  , queue_time_ms_(0) {}
91 
92  size_t getOuterFragmentCount(const CompilationOptions& co, const ExecutionOptions& eo);
93 
94  ExecutionResult executeRelAlgQuery(const CompilationOptions& co,
95  const ExecutionOptions& eo,
96  const bool just_explain_plan,
97  RenderInfo* render_info);
98 
99  ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence& seq,
100  const CompilationOptions& co,
101  const ExecutionOptions& eo,
102  RenderInfo* render_info,
103  const int64_t queue_time_ms);
104 
105  void prepareLeafExecution(
106  const AggregatedColRange& agg_col_range,
107  const StringDictionaryGenerations& string_dictionary_generations,
108  const TableGenerations& table_generations);
109 
110  ExecutionResult executeRelAlgSeq(const RaExecutionSequence& seq,
111  const CompilationOptions& co,
112  const ExecutionOptions& eo,
113  RenderInfo* render_info,
114  const int64_t queue_time_ms,
115  const bool with_existing_temp_tables = false);
116 
117  ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence& seq,
118  const std::pair<size_t, size_t> interval,
119  const CompilationOptions& co,
120  const ExecutionOptions& eo,
121  RenderInfo* render_info,
122  const int64_t queue_time_ms);
123 
124  FirstStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence& seq,
125  const size_t step_idx,
126  const CompilationOptions& co,
127  const ExecutionOptions& eo,
128  RenderInfo* render_info);
129 
130  void addLeafResult(const unsigned id, const AggregatedResult& result) {
131  const auto it_ok = leaf_results_.emplace(id, result);
132  CHECK(it_ok.second);
133  }
134 
135  const RelAlgNode& getRootRelAlgNode() const {
136  CHECK(query_dag_);
137  return query_dag_->getRootNode();
138  }
139  const std::vector<std::shared_ptr<RexSubQuery>>& getSubqueries() const noexcept {
140  CHECK(query_dag_);
141  return query_dag_->getSubqueries();
142  };
144  CHECK(query_dag_);
145  return query_dag_->getQueryHints();
146  }
147 
148  ExecutionResult executeSimpleInsert(const Analyzer::Query& insert_query);
149 
150  AggregatedColRange computeColRangesCache();
151  StringDictionaryGenerations computeStringDictionaryGenerations();
152  TableGenerations computeTableGenerations();
153 
154  Executor* getExecutor() const;
155 
156  void cleanupPostExecution();
157 
158  static std::string getErrorMessageFromCode(const int32_t error_code);
159 
160  private:
161  ExecutionResult executeRelAlgQueryNoRetry(const CompilationOptions& co,
162  const ExecutionOptions& eo,
163  const bool just_explain_plan,
164  RenderInfo* render_info);
165 
166  void executeRelAlgStep(const RaExecutionSequence& seq,
167  const size_t step_idx,
168  const CompilationOptions&,
169  const ExecutionOptions&,
170  RenderInfo*,
171  const int64_t queue_time_ms);
172 
173  void executeUpdate(const RelAlgNode* node,
174  const CompilationOptions& co,
175  const ExecutionOptions& eo,
176  const int64_t queue_time_ms);
177 
178  void executeDelete(const RelAlgNode* node,
179  const CompilationOptions& co,
180  const ExecutionOptions& eo_in,
181  const int64_t queue_time_ms);
182 
183  ExecutionResult executeCompound(const RelCompound*,
184  const CompilationOptions&,
185  const ExecutionOptions&,
186  RenderInfo*,
187  const int64_t queue_time_ms);
188 
189  ExecutionResult executeAggregate(const RelAggregate* aggregate,
190  const CompilationOptions& co,
191  const ExecutionOptions& eo,
192  RenderInfo* render_info,
193  const int64_t queue_time_ms);
194 
195  ExecutionResult executeProject(const RelProject*,
196  const CompilationOptions&,
197  const ExecutionOptions&,
198  RenderInfo*,
199  const int64_t queue_time_ms,
200  const std::optional<size_t> previous_count);
201 
202  ExecutionResult executeTableFunction(const RelTableFunction*,
203  const CompilationOptions&,
204  const ExecutionOptions&,
205  const int64_t queue_time_ms);
206 
207  // Computes the window function results to be used by the query.
208  void computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
209  const CompilationOptions& co,
210  const ExecutionOptions& eo,
211  ColumnCacheMap& column_cache_map,
212  const int64_t queue_time_ms);
213 
214  // Creates the window context for the given window function.
215  std::unique_ptr<WindowFunctionContext> createWindowFunctionContext(
216  const Analyzer::WindowFunction* window_func,
217  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
218  const RelAlgExecutionUnit& ra_exe_unit,
219  const std::vector<InputTableInfo>& query_infos,
220  const CompilationOptions& co,
221  ColumnCacheMap& column_cache_map,
222  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner);
223 
224  ExecutionResult executeFilter(const RelFilter*,
225  const CompilationOptions&,
226  const ExecutionOptions&,
227  RenderInfo*,
228  const int64_t queue_time_ms);
229 
230  ExecutionResult executeSort(const RelSort*,
231  const CompilationOptions&,
232  const ExecutionOptions&,
233  RenderInfo*,
234  const int64_t queue_time_ms);
235 
236  ExecutionResult executeLogicalValues(const RelLogicalValues*, const ExecutionOptions&);
237 
238  ExecutionResult executeModify(const RelModify* modify, const ExecutionOptions& eo);
239 
240  ExecutionResult executeUnion(const RelLogicalUnion*,
241  const RaExecutionSequence&,
242  const CompilationOptions&,
243  const ExecutionOptions&,
244  RenderInfo*,
245  const int64_t queue_time_ms);
246 
247  // TODO(alex): just move max_groups_buffer_entry_guess to RelAlgExecutionUnit once
248  // we deprecate the plan-based executor paths and remove WorkUnit
249  struct WorkUnit {
251  const RelAlgNode* body;
253  std::unique_ptr<QueryRewriter> query_rewriter;
254  const std::vector<size_t> input_permutation;
255  const std::vector<size_t> left_deep_join_input_sizes;
256  };
257 
260  const RelAlgNode* body;
261  };
262 
263  WorkUnit createSortInputWorkUnit(const RelSort*, const ExecutionOptions& eo);
264 
265  ExecutionResult executeWorkUnit(
266  const WorkUnit& work_unit,
267  const std::vector<TargetMetaInfo>& targets_meta,
268  const bool is_agg,
269  const CompilationOptions& co_in,
270  const ExecutionOptions& eo,
271  RenderInfo*,
272  const int64_t queue_time_ms,
273  const std::optional<size_t> previous_count = std::nullopt);
274 
275  size_t getNDVEstimation(const WorkUnit& work_unit,
276  const int64_t range,
277  const bool is_agg,
278  const CompilationOptions& co,
279  const ExecutionOptions& eo);
280 
281  std::optional<size_t> getFilteredCountAll(const WorkUnit& work_unit,
282  const bool is_agg,
283  const CompilationOptions& co,
284  const ExecutionOptions& eo);
285 
286  FilterSelectivity getFilterSelectivity(
287  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
288  const CompilationOptions& co,
289  const ExecutionOptions& eo);
290 
291  std::vector<PushedDownFilterInfo> selectFiltersToBePushedDown(
292  const RelAlgExecutor::WorkUnit& work_unit,
293  const CompilationOptions& co,
294  const ExecutionOptions& eo);
295 
296  bool isRowidLookup(const WorkUnit& work_unit);
297 
298  ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit& work_unit,
299  const std::vector<TargetMetaInfo>& targets_meta,
300  const bool is_agg,
301  const CompilationOptions& co,
302  const ExecutionOptions& eo,
303  RenderInfo* render_info,
304  const bool was_multifrag_kernel_launch,
305  const int64_t queue_time_ms);
306 
307  // Allows an out of memory error through if CPU retry is enabled. Otherwise, throws an
308  // appropriate exception corresponding to the query error code.
309  static void handlePersistentError(const int32_t error_code);
310 
311  WorkUnit createWorkUnit(const RelAlgNode*, const SortInfo&, const ExecutionOptions& eo);
312 
313  WorkUnit createCompoundWorkUnit(const RelCompound*,
314  const SortInfo&,
315  const ExecutionOptions& eo);
316 
317  WorkUnit createAggregateWorkUnit(const RelAggregate*,
318  const SortInfo&,
319  const bool just_explain);
320 
321  WorkUnit createProjectWorkUnit(const RelProject*,
322  const SortInfo&,
323  const ExecutionOptions& eo);
324 
325  WorkUnit createFilterWorkUnit(const RelFilter*,
326  const SortInfo&,
327  const bool just_explain);
328 
329  WorkUnit createJoinWorkUnit(const RelJoin*, const SortInfo&, const bool just_explain);
330 
331  WorkUnit createUnionWorkUnit(const RelLogicalUnion*,
332  const SortInfo&,
333  const ExecutionOptions& eo);
334 
335  TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction* table_func,
336  const bool just_explain);
337 
338  void addTemporaryTable(const int table_id, const ResultSetPtr& result) {
339  CHECK_LT(size_t(0), result->colCount());
340  CHECK_LT(table_id, 0);
341  const auto it_ok = temporary_tables_.emplace(table_id, result);
342  CHECK(it_ok.second);
343  }
344 
345  void eraseFromTemporaryTables(const int table_id) { temporary_tables_.erase(table_id); }
346 
347  void handleNop(RaExecutionDesc& ed);
348 
349  JoinQualsPerNestingLevel translateLeftDeepJoinFilter(
350  const RelLeftDeepInnerJoin* join,
351  const std::vector<InputDescriptor>& input_descs,
352  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
353  const bool just_explain);
354 
355  // Transform the provided `join_condition` to conjunctive form, find composite
356  // key opportunities and finally translate it to an Analyzer expression.
357  std::list<std::shared_ptr<Analyzer::Expr>> makeJoinQuals(
358  const RexScalar* join_condition,
359  const std::vector<JoinType>& join_types,
360  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
361  const bool just_explain) const;
362 
363  Executor* executor_;
365  std::unique_ptr<RelAlgDagBuilder> query_dag_;
366  std::shared_ptr<const query_state::QueryState> query_state_;
368  time_t now_;
369  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_; // TODO(alex): remove
370  std::unordered_map<unsigned, AggregatedResult> leaf_results_;
371  int64_t queue_time_ms_;
373  static const size_t max_groups_buffer_entry_default_guess{16384};
374 
375  friend class PendingExecutionClosure;
376 };
377 
378 #endif // QUERYENGINE_RELALGEXECUTOR_H
bool is_agg(const Analyzer::Expr *expr)
int64_t queue_time_ms_
const std::vector< size_t > left_deep_join_input_sizes
void addLeafResult(const unsigned id, const AggregatedResult &result)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:97
const RelAlgNode * body
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > >> ColumnCacheMap
Streaming Top N algorithm.
static SpeculativeTopNBlacklist speculative_topn_blacklist_
RelAlgExecutionUnit exe_unit
TemporaryTables temporary_tables_
std::string join(T const &container, std::string const &delim)
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, const std::string &query_ra, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
std::unique_ptr< QueryRewriter > query_rewriter
std::unordered_map< unsigned, AggregatedResult > leaf_results_
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
bool g_skip_intermediate_count
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::unique_ptr< RelAlgDagBuilder > query_dag, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
std::string cat(Ts &&... args)
const QueryHint getParsedQueryHints() const
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra ...
const Catalog_Namespace::Catalog & cat_
const std::vector< size_t > input_permutation
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::shared_ptr< const query_state::QueryState > query_state_
Speculative top N algorithm.
void eraseFromTemporaryTables(const int table_id)
const RelAlgNode & getRootRelAlgNode() const
TableFunctionExecutionUnit exe_unit
ExecutionResult result
const size_t max_groups_buffer_entry_guess
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat, std::shared_ptr< const query_state::QueryState > query_state=nullptr)
#define CHECK(condition)
Definition: Logger.h:197
const MergeType merge_type
std::unique_ptr< RelAlgDagBuilder > query_dag_
specifies the content in-memory of a row in the table metadata table
Executor * executor_