OmniSciDB  04ee39c94c
RelAlgExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef QUERYENGINE_RELALGEXECUTOR_H
18 #define QUERYENGINE_RELALGEXECUTOR_H
19 
20 #include "../Shared/scope.h"
23 #include "Execute.h"
24 #include "InputMetadata.h"
25 #include "JoinFilterPushDown.h"
26 #include "QueryRewrite.h"
27 #include "SpeculativeTopN.h"
28 #include "StreamingTopN.h"
29 
#include <ctime>
#include <sstream>
#include <utility>
32 
33 #include "StorageIOFacility.h"
34 
// Global flag declared here and defined in another translation unit.
// NOTE(review): from the name, presumably controls whether intermediate
// row counts are skipped — confirm at the definition.
extern bool g_skip_intermediate_count;

// Two-valued tag naming how results are merged.
// NOTE(review): the enumerator names suggest concatenation (Union) versus
// combination (Reduce) of partial results — confirm at the use sites.
enum class MergeType {
  Union,
  Reduce,
};
38 
42  const unsigned node_id;
44 };
45 
47  using ExecutorType = Executor;
50 };
51 
52 class RelAlgExecutor : private StorageIOFacility<RelAlgExecutorTraits> {
53  public:
54  using TargetInfoList = std::vector<TargetInfo>;
55 
56  RelAlgExecutor(Executor* executor, const Catalog_Namespace::Catalog& cat)
57  : StorageIOFacility(executor, cat)
58  , executor_(executor)
59  , cat_(cat)
60  , now_(0)
61  , queue_time_ms_(0) {}
62 
63  ExecutionResult executeRelAlgQuery(const std::string& query_ra,
64  const CompilationOptions& co,
65  const ExecutionOptions& eo,
66  RenderInfo* render_info);
67 
68  ExecutionResult executeRelAlgQueryWithFilterPushDown(
69  std::vector<RaExecutionDesc>& ed_list,
70  const CompilationOptions& co,
71  const ExecutionOptions& eo,
72  RenderInfo* render_info,
73  const int64_t queue_time_ms);
74 
75  FirstStepExecutionResult executeRelAlgQuerySingleStep(const RaExecutionDesc& exec_desc,
76  const CompilationOptions& co,
77  const ExecutionOptions& eo,
78  RenderInfo* render_info);
79 
80  void prepareLeafExecution(
81  const AggregatedColRange& agg_col_range,
82  const StringDictionaryGenerations& string_dictionary_generations,
83  const TableGenerations& table_generations);
84 
85  ExecutionResult executeRelAlgSubQuery(const RexSubQuery* subquery,
86  const CompilationOptions& co,
87  const ExecutionOptions& eo);
88 
89  ExecutionResult executeRelAlgSeq(std::vector<RaExecutionDesc>& ed_list,
90  const CompilationOptions& co,
91  const ExecutionOptions& eo,
92  RenderInfo* render_info,
93  const int64_t queue_time_ms,
94  const bool with_existing_temp_tables = false);
95 
96  ExecutionResult executeRelAlgSubSeq(std::vector<RaExecutionDesc>::iterator start_desc,
97  std::vector<RaExecutionDesc>::iterator end_desc,
98  const CompilationOptions& co,
99  const ExecutionOptions& eo,
100  RenderInfo* render_info,
101  const int64_t queue_time_ms);
102 
103  void addLeafResult(const unsigned id, const AggregatedResult& result) {
104  const auto it_ok = leaf_results_.emplace(id, result);
105  CHECK(it_ok.second);
106  }
107 
108  void registerSubquery(std::shared_ptr<RexSubQuery> subquery) noexcept {
109  subqueries_.push_back(subquery);
110  }
111 
112  const std::vector<std::shared_ptr<RexSubQuery>>& getSubqueries() const noexcept {
113  return subqueries_;
114  };
115 
116  AggregatedColRange computeColRangesCache(const RelAlgNode* ra);
117 
118  StringDictionaryGenerations computeStringDictionaryGenerations(const RelAlgNode* ra);
119 
120  TableGenerations computeTableGenerations(const RelAlgNode* ra);
121 
122  Executor* getExecutor() const;
123 
124  void cleanupPostExecution();
125 
126  static std::string getErrorMessageFromCode(const int32_t error_code);
127 
128  private:
129  ExecutionResult executeRelAlgQueryNoRetry(const std::string& query_ra,
130  const CompilationOptions& co,
131  const ExecutionOptions& eo,
132  RenderInfo* render_info);
133 
134  void executeRelAlgStep(const size_t step_idx,
135  std::vector<RaExecutionDesc>::iterator exec_desc_itr,
136  const CompilationOptions&,
137  const ExecutionOptions&,
138  RenderInfo*,
139  const int64_t queue_time_ms);
140 
141  void executeUpdateViaCompound(const RelCompound* compound,
142  const CompilationOptions& co,
143  const ExecutionOptions& eo,
144  RenderInfo* render_info,
145  const int64_t queue_time_ms);
146  void executeUpdateViaProject(const RelProject*,
147  const CompilationOptions&,
148  const ExecutionOptions&,
149  RenderInfo*,
150  const int64_t queue_time_ms);
151 
152  void executeDeleteViaCompound(const RelCompound* compound,
153  const CompilationOptions& co,
154  const ExecutionOptions& eo,
155  RenderInfo* render_info,
156  const int64_t queue_time_ms);
157  void executeDeleteViaProject(const RelProject*,
158  const CompilationOptions&,
159  const ExecutionOptions&,
160  RenderInfo*,
161  const int64_t queue_time_ms);
162 
163  ExecutionResult executeCompound(const RelCompound*,
164  const CompilationOptions&,
165  const ExecutionOptions&,
166  RenderInfo*,
167  const int64_t queue_time_ms);
168 
169  ExecutionResult executeAggregate(const RelAggregate* aggregate,
170  const CompilationOptions& co,
171  const ExecutionOptions& eo,
172  RenderInfo* render_info,
173  const int64_t queue_time_ms);
174 
175  ExecutionResult executeProject(const RelProject*,
176  const CompilationOptions&,
177  const ExecutionOptions&,
178  RenderInfo*,
179  const int64_t queue_time_ms,
180  const ssize_t previous_count);
181 
182  // Computes the window function results to be used by the query.
183  void computeWindow(const RelAlgExecutionUnit& ra_exe_unit,
184  const CompilationOptions& co,
185  const ExecutionOptions& eo,
186  ColumnCacheMap& column_cache_map,
187  const int64_t queue_time_ms);
188 
189  // Creates the window context for the given window function.
190  std::unique_ptr<WindowFunctionContext> createWindowFunctionContext(
191  const Analyzer::WindowFunction* window_func,
192  const std::shared_ptr<Analyzer::BinOper>& partition_key_cond,
193  const RelAlgExecutionUnit& ra_exe_unit,
194  const std::vector<InputTableInfo>& query_infos,
195  const CompilationOptions& co,
196  ColumnCacheMap& column_cache_map);
197 
198  ExecutionResult executeFilter(const RelFilter*,
199  const CompilationOptions&,
200  const ExecutionOptions&,
201  RenderInfo*,
202  const int64_t queue_time_ms);
203 
204  ExecutionResult executeSort(const RelSort*,
205  const CompilationOptions&,
206  const ExecutionOptions&,
207  RenderInfo*,
208  const int64_t queue_time_ms);
209 
210  ExecutionResult executeLogicalValues(const RelLogicalValues*, const ExecutionOptions&);
211  ExecutionResult executeModify(const RelModify* modify, const ExecutionOptions& eo);
212 
213  // TODO(alex): just move max_groups_buffer_entry_guess to RelAlgExecutionUnit once
214  // we deprecate the plan-based executor paths and remove WorkUnit
215  struct WorkUnit {
217  const RelAlgNode* body;
219  std::unique_ptr<QueryRewriter> query_rewriter;
220  const std::vector<size_t> input_permutation;
221  const std::vector<size_t> left_deep_join_input_sizes;
222  };
223 
224  WorkUnit createSortInputWorkUnit(const RelSort*, const bool just_explain);
225 
226  ExecutionResult executeWorkUnit(const WorkUnit& work_unit,
227  const std::vector<TargetMetaInfo>& targets_meta,
228  const bool is_agg,
229  const CompilationOptions& co_in,
230  const ExecutionOptions& eo,
231  RenderInfo*,
232  const int64_t queue_time_ms,
233  const ssize_t previous_count = -1);
234 
235  size_t getNDVEstimation(const WorkUnit& work_unit,
236  const bool is_agg,
237  const CompilationOptions& co,
238  const ExecutionOptions& eo);
239 
240  ssize_t getFilteredCountAll(const WorkUnit& work_unit,
241  const bool is_agg,
242  const CompilationOptions& co,
243  const ExecutionOptions& eo);
244 
245  FilterSelectivity getFilterSelectivity(
246  const std::vector<std::shared_ptr<Analyzer::Expr>>& filter_expressions,
247  const CompilationOptions& co,
248  const ExecutionOptions& eo);
249 
250  std::vector<PushedDownFilterInfo> selectFiltersToBePushedDown(
251  const RelAlgExecutor::WorkUnit& work_unit,
252  const CompilationOptions& co,
253  const ExecutionOptions& eo);
254 
255  bool isRowidLookup(const WorkUnit& work_unit);
256 
257  ExecutionResult handleOutOfMemoryRetry(const RelAlgExecutor::WorkUnit& work_unit,
258  const std::vector<TargetMetaInfo>& targets_meta,
259  const bool is_agg,
260  const CompilationOptions& co,
261  const ExecutionOptions& eo,
262  RenderInfo* render_info,
263  const bool was_multifrag_kernel_launch,
264  const int64_t queue_time_ms);
265 
266  // Allows an out of memory error through if CPU retry is enabled. Otherwise, throws an
267  // appropriate exception corresponding to the query error code.
268  static void handlePersistentError(const int32_t error_code);
269 
270  WorkUnit createWorkUnit(const RelAlgNode*, const SortInfo&, const bool just_explain);
271 
272  WorkUnit createModifyCompoundWorkUnit(const RelCompound* compound,
273  const SortInfo& sort_info,
274  const bool just_explain);
275  WorkUnit createCompoundWorkUnit(const RelCompound*,
276  const SortInfo&,
277  const bool just_explain);
278 
279  WorkUnit createAggregateWorkUnit(const RelAggregate*,
280  const SortInfo&,
281  const bool just_explain);
282 
283  WorkUnit createModifyProjectWorkUnit(const RelProject* project,
284  const SortInfo& sort_info,
285  const bool just_explain);
286 
287  WorkUnit createProjectWorkUnit(const RelProject*,
288  const SortInfo&,
289  const bool just_explain);
290 
291  WorkUnit createFilterWorkUnit(const RelFilter*,
292  const SortInfo&,
293  const bool just_explain);
294 
295  WorkUnit createJoinWorkUnit(const RelJoin*, const SortInfo&, const bool just_explain);
296 
297  void addTemporaryTable(const int table_id, const ResultSetPtr& result) {
298  CHECK_LT(size_t(0), result->colCount());
299  CHECK_LT(table_id, 0);
300  const auto it_ok = temporary_tables_.emplace(table_id, result);
301  CHECK(it_ok.second);
302  }
303 
304  void eraseFromTemporaryTables(const int table_id) { temporary_tables_.erase(table_id); }
305 
306  void handleNop(RaExecutionDesc& ed);
307 
308  JoinQualsPerNestingLevel translateLeftDeepJoinFilter(
309  const RelLeftDeepInnerJoin* join,
310  const std::vector<InputDescriptor>& input_descs,
311  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
312  const bool just_explain);
313 
314  // Transform the provided `join_condition` to conjunctive form, find composite
315  // key opportunities and finally translate it to an Analyzer expression.
316  std::list<std::shared_ptr<Analyzer::Expr>> makeJoinQuals(
317  const RexScalar* join_condition,
318  const std::vector<JoinType>& join_types,
319  const std::unordered_map<const RelAlgNode*, int>& input_to_nest_level,
320  const bool just_explain) const;
321 
322  Executor* executor_;
325  time_t now_;
326  std::vector<std::shared_ptr<Analyzer::Expr>> target_exprs_owned_; // TODO(alex): remove
327  std::vector<std::shared_ptr<RexSubQuery>> subqueries_;
328  std::unordered_map<unsigned, AggregatedResult> leaf_results_;
329  int64_t queue_time_ms_;
331  static const size_t max_groups_buffer_entry_default_guess{16384};
332 
333  friend class PendingExecutionClosure;
334 };
335 
336 #endif // QUERYENGINE_RELALGEXECUTOR_H
RelAlgExecutor(Executor *executor, const Catalog_Namespace::Catalog &cat)
void registerSubquery(std::shared_ptr< RexSubQuery > subquery) noexcept
int64_t queue_time_ms_
const std::vector< size_t > left_deep_join_input_sizes
void addLeafResult(const unsigned id, const AggregatedResult &result)
Class for a per-database catalog. Also includes metadata for the current database and the current user.
Definition: Catalog.h:81
std::vector< std::shared_ptr< RexSubQuery > > subqueries_
const RelAlgNode * body
Streaming Top N algorithm.
static SpeculativeTopNBlacklist speculative_topn_blacklist_
RelAlgExecutionUnit exe_unit
TemporaryTables temporary_tables_
std::string join(T const &container, std::string const &delim)
void addTemporaryTable(const int table_id, const ResultSetPtr &result)
std::unique_ptr< QueryRewriter > query_rewriter
std::unordered_map< unsigned, AggregatedResult > leaf_results_
std::vector< TargetInfo > TargetInfoList
std::vector< JoinCondition > JoinQualsPerNestingLevel
std::shared_ptr< ResultSet > ResultSetPtr
const std::vector< std::shared_ptr< RexSubQuery > > & getSubqueries() const noexcept
bool g_skip_intermediate_count
std::unordered_map< int, const ResultSetPtr & > TemporaryTables
Definition: InputMetadata.h:31
std::vector< std::shared_ptr< Analyzer::Expr > > target_exprs_owned_
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t uint32_t const int64_t int32_t * error_code
MergeType
const Catalog_Namespace::Catalog & cat_
const std::vector< size_t > input_permutation
#define CHECK_LT(x, y)
Definition: Logger.h:197
Speculative top N algorithm.
void eraseFromTemporaryTables(const int table_id)
ExecutionResult result
const size_t max_groups_buffer_entry_guess
#define CHECK(condition)
Definition: Logger.h:187
const MergeType merge_type
std::unordered_map< int, std::unordered_map< int, std::shared_ptr< const ColumnarResults > > > ColumnCacheMap
Specifies the in-memory content of a row in the table metadata table.
Executor * executor_