OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RelAlgExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef QUERYENGINE_RELALGEXECUTOR_H
18 #define QUERYENGINE_RELALGEXECUTOR_H
19 
20 #include "../Shared/scope.h"
23 #include "Execute.h"
24 #include "InputMetadata.h"
25 #include "JoinFilterPushDown.h"
26 #include "QueryRewrite.h"
27 #include "SpeculativeTopN.h"
28 #include "StreamingTopN.h"
29 
30 #include <ctime>
31 #include <sstream>
32 
33 #include "StorageIOFacility.h"
34 
35 extern bool g_skip_intermediate_count;
36 
// Tag naming a merge strategy.
// NOTE(review): presumably selects how partial results are combined; the
// semantics of each mode are not visible in this header — confirm at the use sites.
enum class MergeType {
  Union,
  Reduce,
};
38 
42  const unsigned node_id;
44 };
45 
47  using ExecutorType = Executor;
50 };
51 
52 class RelAlgExecutor : private StorageIOFacility<RelAlgExecutorTraits> {
53  public:
54  using TargetInfoList = std::vector<TargetInfo>;
55 
56  RelAlgExecutor(Executor* executor, const Catalog_Namespace::Catalog& cat)
57  : StorageIOFacility(executor, cat)
58  , executor_(executor)
59  , cat_(cat)
60  , now_(0)
61  , queue_time_ms_(0) {}
62 
63  ExecutionResult executeRelAlgQuery(const std::string& query_ra,
64  const CompilationOptions& co,
65  const ExecutionOptions& eo,
66  RenderInfo* render_info);
67 
69  const CompilationOptions& co,
70  const ExecutionOptions& eo,
71  RenderInfo* render_info,
72  const int64_t queue_time_ms);
73 
75  const AggregatedColRange& agg_col_range,
76  const StringDictionaryGenerations& string_dictionary_generations,
77  const TableGenerations& table_generations);
78 
80  const CompilationOptions& co,
81  const ExecutionOptions& eo,
82  RenderInfo* render_info,
83  const int64_t queue_time_ms,
84  const bool with_existing_temp_tables = false);
85 
87  const std::pair<size_t, size_t> interval,
88  const CompilationOptions& co,
89  const ExecutionOptions& eo,
90  RenderInfo* render_info,
91  const int64_t queue_time_ms);
92 
94  const size_t step_idx,
95  const CompilationOptions& co,
96  const ExecutionOptions& eo,
97  RenderInfo* render_info);
98 
// Stores a copy of `result` in leaf_results_ under key `id`.
// Each id may be added only once: emplace reports an already-present key by
// returning `second == false`, which makes the CHECK below fail.
99  void addLeafResult(const unsigned id, const AggregatedResult& result) {
100  const auto it_ok = leaf_results_.emplace(id, result);
101  CHECK(it_ok.second);
102  }
103 
104  void registerSubquery(std::shared_ptr<RexSubQuery> subquery) noexcept {
105  subqueries_.push_back(subquery);
106  }
107 
108  const std::vector<std::shared_ptr<RexSubQuery>>& getSubqueries() const noexcept {
109  return subqueries_;
110  };
111 
113 
115 
117 
118  Executor* getExecutor() const;
119 
120  void cleanupPostExecution();
121 
122  static std::string getErrorMessageFromCode(const int32_t error_code);
123 
124  private:
125  ExecutionResult executeRelAlgQueryNoRetry(const std::string& query_ra,
126  const CompilationOptions& co,
127  const ExecutionOptions& eo,
128  RenderInfo* render_info);
129 
130  void executeRelAlgStep(const RaExecutionSequence& seq,
131  const size_t step_idx,
132  const CompilationOptions&,
133  const ExecutionOptions&,
134  RenderInfo*,
135  const int64_t queue_time_ms);
136 
137  void executeUpdateViaCompound(const RelCompound* compound,
138  const CompilationOptions& co,
139  const ExecutionOptions& eo,
140  RenderInfo* render_info,
141  const int64_t queue_time_ms);
143  const CompilationOptions&,
144  const ExecutionOptions&,
145  RenderInfo*,
146  const int64_t queue_time_ms);
147 
148  void executeDeleteViaCompound(const RelCompound* compound,
149  const CompilationOptions& co,
150  const ExecutionOptions& eo,
151  RenderInfo* render_info,
152  const int64_t queue_time_ms);
154  const CompilationOptions&,
155  const ExecutionOptions&,
156  RenderInfo*,
157  const int64_t queue_time_ms);
158 
160  const CompilationOptions&,
161  const ExecutionOptions&,
162  RenderInfo*,
163  const int64_t queue_time_ms);
164 
166  const CompilationOptions& co,
167  const ExecutionOptions& eo,
168  RenderInfo* render_info,
169  const int64_t queue_time_ms);
170 
172  const CompilationOptions&,
173  const ExecutionOptions&,
174  RenderInfo*,
175  const int64_t queue_time_ms,
176  const ssize_t previous_count);
177 
179  const CompilationOptions&,
180  const ExecutionOptions&,
181  const int64_t queue_time_ms);
182 
183  // Computes the window function results to be used by the query.
184  void computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
185  const CompilationOptions& co,
186  const ExecutionOptions& eo,
187  ColumnCacheMap& column_cache_map,
188  const int64_t queue_time_ms);
189 
190  // Creates the window context for the given window function.
191  std::unique_ptr<WindowFunctionContext> createWindowFunctionContext(
192  const Analyzer::WindowFunction* window_func,
193  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
194  const RelAlgExecutionUnit& ra_exe_unit,
195  const std::vector<InputTableInfo>& query_infos,
196  const CompilationOptions& co,
197  ColumnCacheMap& column_cache_map);
198 
200  const CompilationOptions&,
201  const ExecutionOptions&,
202  RenderInfo*,
203  const int64_t queue_time_ms);
204 
206  const CompilationOptions&,
207  const ExecutionOptions&,
208  RenderInfo*,
209  const int64_t queue_time_ms);
210 
212  ExecutionResult executeModify(const RelModify* modify, const ExecutionOptions& eo);
213 
214  // TODO(alex): just move max_groups_buffer_entry_guess to RelAlgExecutionUnit once
215  // we deprecate the plan-based executor paths and remove WorkUnit
216  struct WorkUnit {
218  const RelAlgNode* body;
220  std::unique_ptr<QueryRewriter> query_rewriter;
221  const std::vector<size_t> input_permutation;
222  const std::vector<size_t> left_deep_join_input_sizes;
223  };
224 
227  const RelAlgNode* body;
228  };
229 
230  WorkUnit createSortInputWorkUnit(const RelSort*, const bool just_explain);
231 
232  ExecutionResult executeWorkUnit(const WorkUnit& work_unit,
233  const std::vector<TargetMetaInfo>& targets_meta,
234  const bool is_agg,
235  const CompilationOptions& co_in,
236  const ExecutionOptions& eo,
237  RenderInfo*,
238  const int64_t queue_time_ms,
239  const ssize_t previous_count = -1);
240 
241  size_t getNDVEstimation(const WorkUnit& work_unit,
242  const bool is_agg,
243  const CompilationOptions& co,
244  const ExecutionOptions& eo);
245 
246  ssize_t getFilteredCountAll(const WorkUnit& work_unit,
247  const bool is_agg,
248  const CompilationOptions& co,
249  const ExecutionOptions& eo);
250 
252  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
253  const CompilationOptions& co,
254  const ExecutionOptions& eo);
255 
256  std::vector<PushedDownFilterInfo> selectFiltersToBePushedDown(
257  const RelAlgExecutor::WorkUnit& work_unit,
258  const CompilationOptions& co,
259  const ExecutionOptions& eo);
260 
261  bool isRowidLookup(const WorkUnit& work_unit);
262 
264  const std::vector<TargetMetaInfo>& targets_meta,
265  const bool is_agg,
266  const CompilationOptions& co,
267  const ExecutionOptions& eo,
268  RenderInfo* render_info,
269  const bool was_multifrag_kernel_launch,
270  const int64_t queue_time_ms);
271 
272  // Allows an out of memory error through if CPU retry is enabled. Otherwise, throws an
273  // appropriate exception corresponding to the query error code.
274  static void handlePersistentError(const int32_t error_code);
275 
276  WorkUnit createWorkUnit(const RelAlgNode*, const SortInfo&, const bool just_explain);
277 
279  const SortInfo& sort_info,
280  const bool just_explain);
282  const SortInfo&,
283  const bool just_explain);
284 
286  const SortInfo&,
287  const bool just_explain);
288 
290  const SortInfo& sort_info,
291  const bool just_explain);
292 
294  const SortInfo&,
295  const bool just_explain);
296 
298  const SortInfo&,
299  const bool just_explain);
300 
301  WorkUnit createJoinWorkUnit(const RelJoin*, const SortInfo&, const bool just_explain);
302 
304  const bool just_explain);
305 
306  void addTemporaryTable(const int table_id, const ResultSetPtr& result) {
307  CHECK_LT(size_t(0), result->colCount());
308  CHECK_LT(table_id, 0);
309  const auto it_ok = temporary_tables_.emplace(table_id, result);
310  CHECK(it_ok.second);
311  }
312 
313  void eraseFromTemporaryTables(const int table_id) { temporary_tables_.erase(table_id); }
314 
315  void handleNop(RaExecutionDesc& ed);
316 
318  const RelLeftDeepInnerJoin* join,
319  const std::vector<InputDescriptor>& input_descs,
320  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
321  const bool just_explain);
322 
323  // Transform the provided `join_condition` to conjunctive form, find composite
324  // key opportunities and finally translate it to an Analyzer expression.
325  std::list<std::shared_ptr<Analyzer::Expr>> makeJoinQuals(
326  const RexScalar* join_condition,
327  const std::vector<JoinType>& join_types,
328  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
329  const bool just_explain) const;
330 
331  Executor* executor_;
334  time_t now_;
335  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_; // TODO(alex): remove
336  std::vector<std::shared_ptr<RexSubQuery>> subqueries_;
337  std::unordered_map<unsigned, AggregatedResult> leaf_results_;
338  int64_t queue_time_ms_;
340  static const size_t max_groups_buffer_entry_default_guess{16384};
341 
343 };
344 
345 #endif // QUERYENGINE_RELALGEXECUTOR_H
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat)
void registerSubquery(std::shared_ptr< RexSubQuery > subquery) noexcept
bool is_agg(const Analyzer::Expr *expr)
ExecutionResult executeRelAlgQueryNoRetry(const std::string &query_ra, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
int64_t queue_time_ms_
ExecutionResult executeAggregate(const RelAggregate *aggregate, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
ExecutionResult executeProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms, const ssize_t previous_count)
const std::vector< size_t > left_deep_join_input_sizes
TableFunctionWorkUnit createTableFunctionWorkUnit(const RelTableFunction *table_func, const bool just_explain)
void addLeafResult(const unsigned id, const AggregatedResult &result)
Class for a per-database catalog. Also includes metadata for the current database and the current user.
Definition: Catalog.h:81
WorkUnit createProjectWorkUnit(const RelProject *, const SortInfo &, const bool just_explain)
AggregatedColRange computeColRangesCache(const RelAlgNode *ra)
ExecutionResult executeRelAlgSeq(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms, const bool with_existing_temp_tables=false)
std::vector< std::shared_ptr< RexSubQuery > > subqueries_
WorkUnit createAggregateWorkUnit(const RelAggregate *, const SortInfo &, const bool just_explain)
const RelAlgNode * body
WorkUnit createWorkUnit(const RelAlgNode *, const SortInfo &, const bool just_explain)
Streaming Top N algorithm.
static SpeculativeTopNBlacklist speculative_topn_blacklist_
RelAlgExecutionUnit exe_unit
TemporaryTables temporary_tables_
static const size_t max_groups_buffer_entry_default_guess
TableGenerations computeTableGenerations(const RelAlgNode *ra)
ExecutionResult executeRelAlgQueryWithFilterPushDown(const RaExecutionSequence &seq, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
FilterSelectivity getFilterSelectivity(const std::vector< std::shared_ptr< Analyzer::Expr >> &filter_expressions, const CompilationOptions &co, const ExecutionOptions &eo)
std::string join(T const &container, std::string const &delim)
std::vector< PushedDownFilterInfo > selectFiltersToBePushedDown(const RelAlgExecutor::WorkUnit &work_unit, const CompilationOptions &co, const ExecutionOptions &eo)
ExecutionResult executeModify(const RelModify *modify, const ExecutionOptions &eo)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
std::unique_ptr< QueryRewriter > query_rewriter
std::unordered_map< unsigned, AggregatedResult > leaf_results_
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
WorkUnit createCompoundWorkUnit(const RelCompound *, const SortInfo &, const bool just_explain)
ExecutionResult executeLogicalValues(const RelLogicalValues *, const ExecutionOptions &)
ExecutionResult executeFilter(const RelFilter *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
size_t getNDVEstimation(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
void handleNop(RaExecutionDesc &ed)
void computeWindow(const RelAlgExecutionUnit &ra_exe_unit, const CompilationOptions &co, const ExecutionOptions &eo, ColumnCacheMap &column_cache_map, const int64_t queue_time_ms)
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
WorkUnit createModifyProjectWorkUnit(const RelProject *project, const SortInfo &sort_info, const bool just_explain)
CHECK(cgen_state)
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
MergeType
A container for relational algebra descriptors defining the execution order for a relational algebra query.
const Catalog_Namespace::Catalog & cat_
friend class PendingExecutionClosure
ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const bool was_multifrag_kernel_launch, const int64_t queue_time_ms)
ExecutionResult executeRelAlgSubSeq(const RaExecutionSequence &seq, const std::pair< size_t, size_t > interval, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
WorkUnit createModifyCompoundWorkUnit(const RelCompound *compound, const SortInfo &sort_info, const bool just_explain)
const std::vector< size_t > input_permutation
void executeRelAlgStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
Executor * getExecutor() const
ExecutionResult executeTableFunction(const RelTableFunction *, const CompilationOptions &, const ExecutionOptions &, const int64_t queue_time_ms)
#define CHECK_LT(x, y)
Definition: Logger.h:200
ssize_t getFilteredCountAll(const WorkUnit &work_unit, const bool is_agg, const CompilationOptions &co, const ExecutionOptions &eo)
WorkUnit createJoinWorkUnit(const RelJoin *, const SortInfo &, const bool just_explain)
Speculative top N algorithm.
void executeDeleteViaCompound(const RelCompound *compound, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
void executeUpdateViaProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
void eraseFromTemporaryTables(const int table_id)
ExecutionResult executeSort(const RelSort *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
bool isRowidLookup(const WorkUnit &work_unit)
TableFunctionExecutionUnit exe_unit
ExecutionResult result
static void handlePersistentError(const int32_t error_code)
void executeDeleteViaProject(const RelProject *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
const size_t max_groups_buffer_entry_guess
bool g_skip_intermediate_count
ExecutionResult executeWorkUnit(const WorkUnit &work_unit, const std::vector< TargetMetaInfo > &targets_meta, const bool is_agg, const CompilationOptions &co_in, const ExecutionOptions &eo, RenderInfo *, const int64_t queue_time_ms, const ssize_t previous_count=-1)
void executeUpdateViaCompound(const RelCompound *compound, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info, const int64_t queue_time_ms)
const MergeType merge_type
ExecutionResult executeCompound(const RelCompound *, const CompilationOptions &, const ExecutionOptions &, RenderInfo *, const int64_t queue_time_ms)
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
static std::string getErrorMessageFromCode(const int32_t error_code)
WorkUnit createSortInputWorkUnit(const RelSort *, const bool just_explain)
void cleanupPostExecution()
std::unique_ptr< WindowFunctionContext > createWindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< Analyzer::BinOper > &partition_key_cond, const RelAlgExecutionUnit &ra_exe_unit, const std::vector< InputTableInfo > &query_infos, const CompilationOptions &co, ColumnCacheMap &column_cache_map)
ExecutionResult executeRelAlgQuery(const std::string &query_ra, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
std::list< std::shared_ptr< Analyzer::Expr > > makeJoinQuals(const RexScalar *join_condition, const std::vector< JoinType > &join_types, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain) const
specifies the content in-memory of a row in the table metadata table
FirstStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionSequence &seq, const size_t step_idx, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
Executor * executor_
WorkUnit createFilterWorkUnit(const RelFilter *, const SortInfo &, const bool just_explain)
JoinQualsPerNestingLevel translateLeftDeepJoinFilter(const RelLeftDeepInnerJoin *join, const std::vector< InputDescriptor > &input_descs, const std::unordered_map< const RelAlgNode *, int > &input_to_nest_level, const bool just_explain)
void prepareLeafExecution(const AggregatedColRange &agg_col_range, const StringDictionaryGenerations &string_dictionary_generations, const TableGenerations &table_generations)
StringDictionaryGenerations computeStringDictionaryGenerations(const RelAlgNode *ra)