OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
20 #include "QueryEngine/RelAlgDag.h"
21 
22 #include <boost/graph/topological_sort.hpp>
23 
24 #include <algorithm>
25 
// Member-initializer list of an ExecutionResult constructor whose signature line is
// missing from this extracted listing (presumably the default constructor -- confirm
// against the original file). It disables filter push-down, marks the result as
// successful, zeroes the execution time and tags the result as QueryResult.
27  : filter_push_down_enabled_(false)
28  , success_(true)
29  , execution_time_ms_(0)
30  , type_(QueryResult) {}
31 
32 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
33  const std::vector<TargetMetaInfo>& targets_meta)
34  : result_(rows)
35  , targets_meta_(targets_meta)
36  , filter_push_down_enabled_(false)
37  , success_(true)
38  , execution_time_ms_(0)
39  , type_(QueryResult) {}
40 
// Tail of an ExecutionResult constructor; the first line of its signature (which
// declares the `result` parameter) is missing from this extracted listing.
// `result` is moved into result_ and `targets_meta` is copied. The remaining members
// get the same defaults as the other constructors: push-down disabled, success,
// zero execution time, QueryResult type.
42  const std::vector<TargetMetaInfo>& targets_meta)
43  : result_(std::move(result))
44  , targets_meta_(targets_meta)
45  , filter_push_down_enabled_(false)
46  , success_(true)
47  , execution_time_ms_(0)
48  , type_(QueryResult) {}
49 
// Constructor that copies from another ExecutionResult `that` (presumably the copy
// constructor -- its signature line is missing from this extracted listing).
// Target metadata and the filter push-down state are copied from `that`.
// NOTE(review): success_, execution_time_ms_ and type_ are reset to their defaults
// here instead of being taken from `that` -- confirm this is intentional.
51  : targets_meta_(that.targets_meta_)
52  , pushed_down_filter_info_(that.pushed_down_filter_info_)
53  , filter_push_down_enabled_(that.filter_push_down_enabled_)
54  , success_(true)
55  , execution_time_ms_(0)
56  , type_(QueryResult) {
// Early return that leaves result_ unset when pushed-down filter info is present.
// The second half of this condition is on a line missing from the listing; it
// presumably mirrors the check used in operator= -- TODO confirm.
57  if (!pushed_down_filter_info_.empty() ||
59  return;
60  }
61  result_ = that.result_;
62 }
63 
// Constructor that moves from another ExecutionResult `that` (presumably the move
// constructor -- its signature line is missing from this extracted listing).
// Target metadata and the filter push-down state are moved out of `that`.
// NOTE(review): success_, execution_time_ms_ and type_ are reset to their defaults
// here instead of being taken from `that` -- confirm this is intentional.
65  : targets_meta_(std::move(that.targets_meta_))
66  , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
67  , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_))
68  , success_(true)
69  , execution_time_ms_(0)
70  , type_(QueryResult) {
// Early return that leaves result_ unset when pushed-down filter info is present.
// The second half of this condition is on a line missing from the listing; it
// presumably mirrors the check used in operator= -- TODO confirm.
71  if (!pushed_down_filter_info_.empty() ||
73  return;
74  }
75  result_ = std::move(that.result_);
76 }
77 
// Tail of the ExecutionResult constructor that carries only filter push-down
// information; the opening line of its signature is missing from this extracted
// listing. No result set or target metadata is initialized explicitly here.
79  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
80  bool filter_push_down_enabled)
81  : pushed_down_filter_info_(pushed_down_filter_info)
82  , filter_push_down_enabled_(filter_push_down_enabled)
83  , success_(true)
84  , execution_time_ms_(0)
85  , type_(QueryResult) {}
86 
// Body of ExecutionResult::operator=(const ExecutionResult& that). The signature
// line and several statements are missing from this extracted listing, so only the
// visible assignments are described.
// When `that` carries pushed-down filter info, or has filter push-down enabled with
// no filter info, the function returns early without copying the result set.
// (The statements executed inside that branch are among the missing lines.)
88  if (!that.pushed_down_filter_info_.empty() ||
89  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
92  return *this;
93  }
// Regular path: copy the result set, success flag and result type from `that`.
94  result_ = that.result_;
96  success_ = that.success_;
98  type_ = that.type_;
99  return *this;
100 }
101 
// Read-only accessor returning a reference to a vector of PushedDownFilterInfo.
// The return statement is missing from this extracted listing; it presumably
// returns pushed_down_filter_info_ -- TODO confirm against the original file.
102 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
103  const {
105 }
106 
107 void ExecutionResult::updateResultSet(const std::string& query,
108  RType type,
109  bool success) {
110  targets_meta_.clear();
111  pushed_down_filter_info_.clear();
112  success_ = success;
113  type_ = type;
114  result_ = std::make_shared<ResultSet>(query);
115 }
116 
// Body of a member function whose signature line is missing from this extracted
// listing (presumably an "explanation" getter on ExecutionResult -- TODO confirm).
// Returns getRows()->getExplanation() when empty() is false; otherwise returns a
// value-initialized object of the function's return type.
118  if (!empty()) {
119  return getRows()->getExplanation();
120  }
121  return {};
122 }
123 
124 void RaExecutionDesc::setResult(const ExecutionResult& result) {
125  result_ = result;
126  body_->setContextData(this);
127 }
128 
// Body of an accessor whose signature line is missing from this extracted listing
// (presumably `const RelAlgNode* RaExecutionDesc::getBody() const`, as listed in the
// member index). Returns the body_ member.
130  return body_;
131 }
132 
133 namespace {
134 
// build_dag: builds the boost graph of all nodes reachable from `sink` through
// their inputs. The first signature line is missing from this extracted listing;
// the member index gives it as
//   DAG build_dag(const RelAlgNode* sink,
//                 std::unordered_map<const RelAlgNode*, int>& node_ptr_to_vert_idx)
// `node_ptr_to_vert_idx` is filled with the node -> vertex index mapping; `sink`
// is always vertex 0. Edges point from an input node to the node consuming it.
136  std::unordered_map<const RelAlgNode*, int>& node_ptr_to_vert_idx) {
137  DAG graph(1);
138  graph[0] = sink;
139  node_ptr_to_vert_idx.emplace(std::make_pair(sink, 0));
// Iterative depth-first walk over the inputs, using an explicit stack.
140  std::vector<const RelAlgNode*> stack(1, sink);
141  while (!stack.empty()) {
142  const auto node = stack.back();
143  stack.pop_back();
// Scan nodes are not expanded any further.
144  if (dynamic_cast<const RelScan*>(node)) {
145  continue;
146  }
147 
// Sanity-check that the number of inputs is one the node type may have.
148  const auto input_num = node->inputCount();
149  switch (input_num) {
150  case 0:
151  CHECK(dynamic_cast<const RelLogicalValues*>(node) ||
152  dynamic_cast<const RelTableFunction*>(node));
// No break above: case 0 falls through to case 1, which only breaks.
153  case 1:
154  break;
155  case 2:
156  CHECK(dynamic_cast<const RelJoin*>(node) ||
157  dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
158  dynamic_cast<const RelLogicalUnion*>(node) ||
159  dynamic_cast<const RelTableFunction*>(node));
160  break;
161  default:
162  CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
163  dynamic_cast<const RelLogicalUnion*>(node) ||
164  dynamic_cast<const RelTableFunction*>(node));
165  }
166  for (size_t i = 0; i < input_num; ++i) {
167  const auto input = node->getInput(i);
168  CHECK(input);
169  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
// A node seen for the first time gets the next free vertex index (the map size).
170  if (!visited) {
171  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
172  }
// The graph starts with a single vertex, so this relies on add_edge enlarging the
// vertex list for a not-yet-existing index (BGL adjacency_list with vecS vertices
// -- confirm against the Boost documentation).
173  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
// Attach the node pointer to its new vertex and schedule it for expansion.
174  if (!visited) {
175  graph[node_ptr_to_vert_idx[input]] = input;
176  stack.push_back(input);
177  }
178  }
179  }
180  return graph;
181 }
182 
183 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
184  const DAG& graph) {
185  std::unordered_set<Vertex> joins;
186  for (const auto vert : vertices) {
187  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
188  joins.insert(vert);
189  continue;
190  }
191  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
192  continue;
193  }
194  if (boost::out_degree(vert, graph) > 1) {
195  throw std::runtime_error("Join used more than once not supported yet");
196  }
197  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
198  CHECK(std::next(oe_iter) == oe_end);
199  const auto out_vert = boost::target(*oe_iter, graph);
200  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
201  joins.insert(vert);
202  }
203  }
204  return joins;
205 }
206 
207 } // namespace
208 
// RaExecutionSequence constructor taking the sink node, the executor and the
// build_sequence flag. The first signature line is missing from this extracted
// listing (the member index shows
// `RaExecutionSequence(const RelAlgNode*, Executor*, const bool build_sequence=true)`),
// and so are a few statements inside the body.
// Throws std::runtime_error when `sink` is a RelScan or a RelJoin.
210  Executor* executor,
211  const bool build_sequence) {
212  CHECK(sink);
213  CHECK(executor);
214  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
215  throw std::runtime_error("Query not supported yet");
216  }
217  executor_ = executor;
// A line is missing from the listing here; presumably graph_ is built from `sink`
// (e.g. via build_dag) -- TODO confirm.
219 
// boost::topological_sort emits vertices in reverse topological order, hence the
// std::reverse below.
220  boost::topological_sort(graph_, std::back_inserter(ordering_));
221  std::reverse(ordering_.begin(), ordering_.end());
222 
// Drop the inputs of sort nodes from the ordering, then record the join vertices.
223  ordering_ = mergeSortWithInput(ordering_, graph_);
224  joins_ = get_join_vertices(ordering_, graph_);
225 
// Eagerly materialize every descriptor by calling next() until it returns null.
226  if (build_sequence) {
227  while (next()) {
228  // noop
229  }
// Lines are missing from the listing here; the extra closing brace below suggests
// that skipQuerySteps() is guarded by a condition that is not visible -- TODO confirm.
233  skipQuerySteps();
234  }
235  }
236 }
237 
238 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
239  descs_.emplace_back(std::move(exec_desc));
240 }
241 
// Body of the member function that produces the next execution descriptor
// (presumably RaExecutionSequence::next(); its signature line is missing from this
// extracted listing). Advances current_vertex_ through ordering_, skipping join
// vertices and scan nodes, appends a descriptor for the first remaining node and
// returns it. Returns nullptr once ordering_ is exhausted.
//
// Helper lambda: if the result-set recycler holds a cached result for `key`, record
// the step as cached and copy the cached output metadata onto the node.
243  auto checkQueryStepSkippable =
244  [&](RelAlgNode const* node, QueryPlanHash key, size_t step_id) {
// NOTE(review): step_id is a size_t, so this check can never fail.
245  CHECK_GE(step_id, 0);
246  if (executor_->getRecultSetRecyclerHolder().hasCachedQueryResultSet(key)) {
247  cached_query_steps_.insert(step_id);
// The cache key is the negated node id. getId() is listed as returning unsigned, so
// the negation wraps before conversion to the map's int key -- NOTE(review).
248  cached_resultset_keys_.emplace(-node->getId(), key);
249  const auto output_meta_info =
250  executor_->getRecultSetRecyclerHolder().getOutputMetaInfo(key);
251  CHECK(output_meta_info);
252  node->setOutputMetainfo(*output_meta_info);
253  }
254  };
255  while (current_vertex_ < ordering_.size()) {
256  auto vert = ordering_[current_vertex_++];
// Join vertices do not get a descriptor of their own.
257  if (joins_.count(vert)) {
258  continue;
259  }
260  auto& node = graph_[vert];
261  CHECK(node);
// Scans are only counted, not turned into steps.
262  if (dynamic_cast<const RelScan*>(node)) {
263  scan_count_++;
264  continue;
265  }
266  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
267  auto logical_union = dynamic_cast<const RelLogicalUnion*>(node);
268  if (logical_union) {
269  has_step_for_union_ = true;
270  }
// Two lines are missing from the listing here: the initializer of
// extracted_query_plan_dag and the start of the `if` condition. Only the final
// clause (the extracted DAG differs from EMPTY_QUERY_PLAN, case-insensitively)
// is visible.
271  auto extracted_query_plan_dag =
274  !boost::iequals(extracted_query_plan_dag.extracted_dag, EMPTY_QUERY_PLAN)) {
// The step id is the index of the descriptor that was just appended.
275  checkQueryStepSkippable(node, node->getQueryPlanDagHash(), descs_.size() - 1);
276  }
277  return descs_.back().get();
278  }
279  return nullptr;
280 }
281 
// mergeSortWithInput: returns a copy of `vertices` without the vertices that feed a
// RelSort node. The first signature line is missing from this extracted listing;
// the member index gives the return type as std::vector<Vertex>.
// Presumably each sort is executed together with its input, which is why the input
// gets no entry of its own -- TODO confirm.
// Side effect: sets has_limit_clause_ when any sort reports isLimitDelivered().
// Throws std::runtime_error when a sort's input is a RelScan, or when that input
// has more than one out-edge.
283  const std::vector<Vertex>& vertices,
284  const DAG& graph) {
285  DAG::in_edge_iterator ie_iter, ie_end;
286  std::unordered_set<Vertex> inputs;
// First pass: collect the input vertex of every sort node.
287  for (const auto vert : vertices) {
288  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
289  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
// A sort must have exactly one input, both logically and in the graph.
290  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
291  if (sort->isLimitDelivered()) {
292  has_limit_clause_ = true;
293  }
294  const auto in_vert = boost::source(*ie_iter, graph);
295  const auto input = graph[in_vert];
296  if (dynamic_cast<const RelScan*>(input)) {
297  throw std::runtime_error("Standalone sort not supported yet");
298  }
299  if (boost::out_degree(in_vert, graph) > 1) {
300  throw std::runtime_error("Sort's input node used by others not supported yet");
301  }
302  inputs.insert(in_vert);
303  }
304  }
305 
// Second pass: keep the original order, dropping the collected sort inputs.
306  std::vector<Vertex> new_vertices;
307  for (const auto vert : vertices) {
308  if (inputs.count(vert)) {
309  continue;
310  }
311  new_vertices.push_back(vert);
312  }
313  return new_vertices;
314 }
315 
// Body of a member function whose signature line is missing from this extracted
// listing (the name is not visible; it presumably returns the "previous"
// descriptor). Returns the second-to-last element of descs_, or nullptr when descs_
// holds fewer than two descriptors.
317  if (descs_.empty()) {
318  return nullptr;
319  }
320  if (descs_.size() == 1) {
321  return nullptr;
322  }
323  CHECK_GE(descs_.size(), size_t(2));
// Step back once from the last element to reach the second-to-last one.
324  auto itr = descs_.rbegin();
325  return (++itr)->get();
326 }
327 
// Body of a member function whose signature line is missing from this extracted
// listing (the name is not visible here). It fills skippable_steps_: for every
// step id, the set of earlier step ids found by following in-edges (inputs) from
// that step's body node.
// NOTE(review): descs_.size() - 1 and std::prev(descs_.end()) both assume descs_ is
// non-empty -- confirm that callers guarantee this.
//
// Helper: push the nodes at the source of every in-edge of `node_id` onto `stack`.
329  const auto pushChildNodes = [&](auto& stack, const auto node_id) {
330  auto [in_edge_iter, in_edge_end] = boost::in_edges(node_id, graph_);
331  while (in_edge_iter != in_edge_end) {
332  const auto child_node_id = boost::source(*in_edge_iter, graph_);
333  stack.push(graph_[child_node_id]);
334  std::advance(in_edge_iter, 1);
335  }
336  };
// The last descriptor is treated as the top step.
337  auto top_node_id = descs_.size() - 1;
338  auto top_res = skippable_steps_.emplace(top_node_id, std::unordered_set<int>{});
339  CHECK(top_res.second);
// Visit every descriptor except the last one.
340  for (auto it = descs_.begin(); it != std::prev(descs_.end()); ++it) {
341  const auto step = it->get();
342  const auto body = step->getBody();
343  const auto step_id = static_cast<int>(std::distance(descs_.begin(), it));
344  auto res = skippable_steps_.emplace(step_id, std::unordered_set<int>{});
345  CHECK(res.second);
346  skippable_steps_[top_node_id].insert(step_id); // top-desc can skip all child descs
347  std::stack<const RelAlgNode*> stack;
348  pushChildNodes(stack, node_ptr_to_vert_idx_[body]);
349  while (!stack.empty()) {
350  auto child_node = stack.top();
351  stack.pop();
352  // descs_ is based on the topologically sorted (flattened) DAG, so we can limit the
353  // search range for child descs
354  auto is_desc_body = std::find_if(
355  descs_.begin(), it, [&child_node](std::unique_ptr<RaExecutionDesc>& ptr) {
356  return ptr->getBody() == child_node;
357  });
358  if (is_desc_body != it) {
359  // due to the topological sorting of query plan DAG, we can avoid visiting "all"
360  // child nodes
// The child is itself a step: take over its id and everything it can already skip,
// and do not descend further along this path.
361  const auto child_step_id =
362  static_cast<int>(std::distance(descs_.begin(), is_desc_body));
363  skippable_steps_[step_id].insert(child_step_id);
364  skippable_steps_[step_id].insert(skippable_steps_[child_step_id].begin(),
365  skippable_steps_[child_step_id].end());
366  } else {
// The child has no descriptor of its own, so keep walking through its inputs.
367  pushChildNodes(stack, node_ptr_to_vert_idx_[child_node]);
368  }
369  }
370  }
371 }
372 
// Body of a member function whose signature line is missing from this extracted
// listing (presumably RaExecutionSequence::skipQuerySteps(), which the constructor
// calls). It rebuilds descs_ without the steps that feed a step whose result is
// already cached, then drops the cached-key entries of the steps that remain.
374  CHECK_LE(cached_query_steps_.size(), descs_.size());
375 
376  // extract skippable query steps
377  std::unordered_set<int> skippable_query_steps;
378  for (const auto cached_step_id : cached_query_steps_) {
379  auto it = skippable_steps_.find(cached_step_id);
380  CHECK(it != skippable_steps_.end());
381  const auto& child_steps = it->second;
// Everything below a cached step can be skipped, but not the cached step itself.
382  std::for_each(
383  child_steps.begin(), child_steps.end(), [&](const auto& skippable_step_id) {
384  if (skippable_step_id != cached_step_id) {
385  skippable_query_steps.insert(skippable_step_id);
386  }
387  });
388  }
389 
390  // modify query step sequence based on the skippable query steps
391  if (!skippable_query_steps.empty()) {
392  std::vector<std::unique_ptr<RaExecutionDesc>> new_descs;
393  for (size_t step_id = 0; step_id < descs_.size(); ++step_id) {
394  const auto body = descs_[step_id]->getBody();
// Surviving steps get a freshly constructed descriptor for the same body node.
395  if (!skippable_query_steps.count(step_id)) {
396  new_descs.push_back(std::make_unique<RaExecutionDesc>(body));
397  }
398  }
399  const auto prev_num_steps = descs_.size();
// descs_ is left untouched if every step turned out to be skippable.
400  if (!new_descs.empty()) {
401  std::swap(descs_, new_descs);
402  }
403  VLOG(1) << "Skip " << prev_num_steps - descs_.size() << " query steps from "
404  << prev_num_steps << " steps";
405  }
406 
407  for (const auto& desc : descs_) {
408  // remove cached resultset info for each desc since
409  // it is not skipped
410  auto body = desc->getBody();
// Keys are stored under the negated node id (see where the entries are emplaced).
411  auto it = cached_resultset_keys_.find(-body->getId());
412  if (it != cached_resultset_keys_.end()) {
413  cached_resultset_keys_.erase(it);
414  }
415  }
416 }
417 
418 std::optional<size_t> RaExecutionSequence::nextStepId(const bool after_broadcast) const {
419  if (after_broadcast) {
420  if (current_vertex_ == ordering_.size()) {
421  return std::nullopt;
422  }
423  return descs_.size() + stepsToNextBroadcast();
424  } else if (current_vertex_ == ordering_.size()) {
425  return std::nullopt;
426  } else {
427  return descs_.size();
428  }
429 }
430 
// Body of a member function whose signature line is missing from this extracted
// listing (the name is not visible; it reports whether execution is finished).
// Returns true when every vertex in ordering_ has been visited, when there is no
// next step after a broadcast, or when that next step id equals
// totalDescriptorsCount(). Returns false otherwise.
432  if (current_vertex_ == ordering_.size()) {
433  // All descriptors visited, execution finished
434  return true;
435  } else {
436  const auto next_step_id = nextStepId(true);
437  if (!next_step_id || (*next_step_id == totalDescriptorsCount())) {
438  // One step remains (the current vertex), or all remaining steps can be executed
439  // without another broadcast (i.e. on the aggregator)
440  return true;
441  }
442  }
443  return false;
444 }
445 
446 namespace {
447 struct MatchBody {
448  unsigned const body_id_;
449  bool operator()(std::unique_ptr<RaExecutionDesc> const& desc) const {
450  return desc->getBody()->getId() == body_id_;
451  }
452 };
453 } // namespace
454 
455 // Search for RaExecutionDesc* by body, starting at start_idx and decrementing to 0.
// The first signature line is missing from this extracted listing; the member index
// gives it as `RaExecutionDesc* getDescriptorByBodyId(unsigned const body_id,
// size_t const start_idx) const`.
// Returns the matching descriptor, or nullptr if none of descs_[0..start_idx] has a
// body with id `body_id`. `start_idx` must be a valid index into descs_ (checked).
457  unsigned const body_id,
458  size_t const start_idx) const {
459  CHECK_LT(start_idx, descs_.size());
// Offset from rbegin() at which the reverse iterator refers to descs_[start_idx].
460  auto const from_end = descs_.size() - (start_idx + 1);
461  MatchBody const match_body{body_id};
462  auto const itr = std::find_if(descs_.rbegin() + from_end, descs_.rend(), match_body);
463  return itr == descs_.rend() ? nullptr : itr->get();
464 }
465 
// Body of a member function whose signature line is missing from this extracted
// listing (presumably RaExecutionSequence::totalDescriptorsCount(), which is called
// elsewhere in this file). Counts the vertices of ordering_ that are neither join
// vertices nor RelScan nodes. It walks the whole ordering with a local cursor and
// does not touch current_vertex_ or descs_.
467  size_t num_descriptors = 0;
468  size_t crt_vertex = 0;
469  while (crt_vertex < ordering_.size()) {
470  auto vert = ordering_[crt_vertex++];
471  if (joins_.count(vert)) {
472  continue;
473  }
474  auto& node = graph_[vert];
475  CHECK(node);
476  if (dynamic_cast<const RelScan*>(node)) {
477  continue;
478  }
479  ++num_descriptors;
480  }
481  return num_descriptors;
482 }
483 
// Body of a member function whose signature line is missing from this extracted
// listing (presumably RaExecutionSequence::stepsToNextBroadcast(), which nextStepId
// calls). Starting at current_vertex_ it scans ordering_ with a local cursor and
// returns the count accumulated before the first node that reads directly from a
// RelScan, or the count for the whole remainder if there is no such node.
485  size_t steps_to_next_broadcast = 0;
486  auto crt_vertex = current_vertex_;
487  while (crt_vertex < ordering_.size()) {
488  auto vert = ordering_[crt_vertex++];
489  auto node = graph_[vert];
490  CHECK(node);
491  if (joins_.count(vert)) {
// Every vertex in joins_ is expected to be a RelLeftDeepInnerJoin at this point.
492  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
493  CHECK(join_node);
// A join that reads from a scan ends the count here.
494  for (size_t i = 0; i < join_node->inputCount(); i++) {
495  const auto input = join_node->getInput(i);
496  if (dynamic_cast<const RelScan*>(input)) {
497  return steps_to_next_broadcast;
498  }
499  }
500  if (crt_vertex < ordering_.size() - 1) {
501  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
502  // Note that crt_vertex has already been incremented once above for the join node
503  // -- increment it again to account for the parent node of the join
504  ++steps_to_next_broadcast;
505  ++crt_vertex;
506  continue;
507  } else {
508  CHECK_EQ(crt_vertex, ordering_.size() - 1);
509  // If the join node parent is the last node in the tree, run all remaining steps
510  // on the aggregator
511  return ++steps_to_next_broadcast;
512  }
513  }
// For a sort, inspect its single input instead of the sort node itself.
514  if (auto sort = dynamic_cast<const RelSort*>(node)) {
515  CHECK_EQ(sort->inputCount(), size_t(1));
516  node = sort->getInput(0);
517  }
518  if (dynamic_cast<const RelScan*>(node)) {
519  return steps_to_next_broadcast;
520  }
521  if (dynamic_cast<const RelModify*>(node)) {
522  // Modify runs on the leaf automatically, run the same node as a noop on the
523  // aggregator
524  ++steps_to_next_broadcast;
525  continue;
526  }
// A projection with window functions is counted as a step without looking at its
// inputs.
527  if (auto project = dynamic_cast<const RelProject*>(node)) {
528  if (project->hasWindowFunctionExpr()) {
529  ++steps_to_next_broadcast;
530  continue;
531  }
532  }
// Any other node that reads directly from a scan ends the count.
533  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
534  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
535  return steps_to_next_broadcast;
536  }
537  }
538  ++steps_to_next_broadcast;
539  }
540  return steps_to_next_broadcast;
541 }
#define CHECK_EQ(x, y)
Definition: Logger.h:230
static ExtractedQueryPlanDag extractQueryPlanDag(const RelAlgNode *top_node, Executor *executor)
bool g_use_query_resultset_cache
Definition: Execute.cpp:148
std::vector< Vertex > mergeSortWithInput(const std::vector< Vertex > &vertices, const DAG &graph)
bool g_allow_query_step_skipping
Definition: Execute.cpp:151
ExecutionResult & operator=(const ExecutionResult &that)
const RelAlgNode * body_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:235
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
bool g_enable_data_recycler
Definition: Execute.cpp:146
bool operator()(std::unique_ptr< RaExecutionDesc > const &desc) const
size_t getQueryPlanDagHash() const
Definition: RelAlgDag.h:808
DAG build_dag(const RelAlgNode *sink, std::unordered_map< const RelAlgNode *, int > &node_ptr_to_vert_idx)
std::unordered_set< int > cached_query_steps_
unsigned getId() const
Definition: RelAlgDag.h:814
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
const std::shared_ptr< ResultSet > & getRows() const
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
void setContextData(const RaExecutionDesc *context_data) const
Definition: RelAlgDag.h:790
bool g_enable_smem_group_by true
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
#define CHECK_LT(x, y)
Definition: Logger.h:232
#define CHECK_LE(x, y)
Definition: Logger.h:233
std::unordered_map< const RelAlgNode *, int > node_ptr_to_vert_idx_
RaExecutionSequence(const RelAlgNode *, Executor *, const bool build_sequence=true)
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
size_t QueryPlanHash
bool g_enable_watchdog false
Definition: Execute.cpp:79
constexpr char const * EMPTY_QUERY_PLAN
#define CHECK(condition)
Definition: Logger.h:222
void setOutputMetainfo(const std::vector< TargetMetaInfo > &targets_metainfo) const
Definition: RelAlgDag.h:795
const RelAlgNode * getBody() const
bool g_cluster
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
unsigned node_id(const rapidjson::Value &ra_node) noexcept
Definition: RelAlgDag.cpp:899
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
DEVICE void swap(ARGS &&...args)
Definition: gpu_enabled.h:114
std::unordered_map< int, std::unordered_set< int > > skippable_steps_
std::unordered_map< int, QueryPlanHash > cached_resultset_keys_
#define VLOG(n)
Definition: Logger.h:316
std::optional< size_t > nextStepId(const bool after_broadcast) const