OmniSciDB  8a228a1076
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <boost/graph/topological_sort.hpp>
20 
23 
24 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
25  const std::vector<TargetMetaInfo>& targets_meta)
26  : result_(rows), targets_meta_(targets_meta), filter_push_down_enabled_(false) {}
27 
29  const std::vector<TargetMetaInfo>& targets_meta)
30  : targets_meta_(targets_meta), filter_push_down_enabled_(false) {
31  result_ = std::move(result);
32 }
33 
38  if (!pushed_down_filter_info_.empty() ||
40  return;
41  }
42  result_ = that.result_;
43 }
44 
46  : targets_meta_(std::move(that.targets_meta_))
49  if (!pushed_down_filter_info_.empty() ||
51  return;
52  }
53  result_ = std::move(that.result_);
54 }
55 
57  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
58  bool filter_push_down_enabled)
59  : pushed_down_filter_info_(pushed_down_filter_info)
60  , filter_push_down_enabled_(filter_push_down_enabled) {}
61 
63  if (!that.pushed_down_filter_info_.empty() ||
64  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
67  return *this;
68  }
69  result_ = that.result_;
71  return *this;
72 }
73 
74 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
75  const {
77 }
78 
80  result_ = result;
81  body_->setContextData(this);
82 }
83 
85  return body_;
86 }
87 
88 namespace {
89 
90 std::vector<Vertex> merge_sort_with_input(const std::vector<Vertex>& vertices,
91  const DAG& graph) {
92  DAG::in_edge_iterator ie_iter, ie_end;
93  std::unordered_set<Vertex> inputs;
94  for (const auto vert : vertices) {
95  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
96  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
97  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
98  const auto in_vert = boost::source(*ie_iter, graph);
99  const auto input = graph[in_vert];
100  if (dynamic_cast<const RelScan*>(input)) {
101  throw std::runtime_error("Standalone sort not supported yet");
102  }
103  if (boost::out_degree(in_vert, graph) > 1) {
104  throw std::runtime_error("Sort's input node used by others not supported yet");
105  }
106  inputs.insert(in_vert);
107  }
108  }
109 
110  std::vector<Vertex> new_vertices;
111  for (const auto vert : vertices) {
112  if (inputs.count(vert)) {
113  continue;
114  }
115  new_vertices.push_back(vert);
116  }
117  return new_vertices;
118 }
119 
120 DAG build_dag(const RelAlgNode* sink) {
121  DAG graph(1);
122  graph[0] = sink;
123  std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
124  std::make_pair(sink, 0)};
125  std::vector<const RelAlgNode*> stack(1, sink);
126  while (!stack.empty()) {
127  const auto node = stack.back();
128  stack.pop_back();
129  if (dynamic_cast<const RelScan*>(node)) {
130  continue;
131  }
132 
133  const auto input_num = node->inputCount();
134  switch (input_num) {
135  case 0:
136  CHECK(dynamic_cast<const RelLogicalValues*>(node));
137  case 1:
138  break;
139  case 2:
140  CHECK(dynamic_cast<const RelJoin*>(node) ||
141  dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
142  dynamic_cast<const RelLogicalUnion*>(node) ||
143  dynamic_cast<const RelTableFunction*>(node));
144  break;
145  default:
146  CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
147  dynamic_cast<const RelLogicalUnion*>(node) ||
148  dynamic_cast<const RelTableFunction*>(node));
149  }
150  for (size_t i = 0; i < input_num; ++i) {
151  const auto input = node->getInput(i);
152  CHECK(input);
153  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
154  if (!visited) {
155  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
156  }
157  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
158  if (!visited) {
159  graph[node_ptr_to_vert_idx[input]] = input;
160  stack.push_back(input);
161  }
162  }
163  }
164  return graph;
165 }
166 
167 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
168  const DAG& graph) {
169  std::unordered_set<Vertex> joins;
170  for (const auto vert : vertices) {
171  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
172  joins.insert(vert);
173  continue;
174  }
175  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
176  continue;
177  }
178  if (boost::out_degree(vert, graph) > 1) {
179  throw std::runtime_error("Join used more than once not supported yet");
180  }
181  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
182  CHECK(std::next(oe_iter) == oe_end);
183  const auto out_vert = boost::target(*oe_iter, graph);
184  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
185  joins.insert(vert);
186  }
187  }
188  return joins;
189 }
190 
191 } // namespace
192 
194  const bool build_sequence) {
195  CHECK(sink);
196  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
197  throw std::runtime_error("Query not supported yet");
198  }
199 
200  graph_ = build_dag(sink);
201 
202  boost::topological_sort(graph_, std::back_inserter(ordering_));
203  std::reverse(ordering_.begin(), ordering_.end());
204 
205  ordering_ = merge_sort_with_input(ordering_, graph_);
206  joins_ = get_join_vertices(ordering_, graph_);
207 
208  if (build_sequence) {
209  while (next()) {
210  // noop
211  }
212  }
213 }
214 
215 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
216  descs_.emplace_back(std::move(exec_desc));
217 }
218 
220  while (current_vertex_ < ordering_.size()) {
221  auto vert = ordering_[current_vertex_++];
222  if (joins_.count(vert)) {
223  continue;
224  }
225  auto& node = graph_[vert];
226  CHECK(node);
227  if (dynamic_cast<const RelScan*>(node)) {
228  scan_count_++;
229  continue;
230  }
231  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
232  return descs_.back().get();
233  }
234  return nullptr;
235 }
236 
237 ssize_t RaExecutionSequence::nextStepId(const bool after_broadcast) const {
238  if (after_broadcast) {
239  if (current_vertex_ == ordering_.size()) {
240  return -1;
241  }
242  const auto steps_to_next_broadcast = static_cast<ssize_t>(stepsToNextBroadcast());
243  return static_cast<ssize_t>(descs_.size()) + steps_to_next_broadcast;
244  } else {
245  return current_vertex_ == ordering_.size() ? -1 : descs_.size();
246  }
247 }
248 
250  if (current_vertex_ == ordering_.size()) {
251  // All descriptors visited, execution finished
252  return true;
253  } else {
254  const auto next_step_id = nextStepId(true);
255  if (next_step_id < 0 ||
256  (static_cast<size_t>(next_step_id) == totalDescriptorsCount())) {
257  // One step remains (the current vertex), or all remaining steps can be executed
258  // without another broadcast (i.e. on the aggregator)
259  return true;
260  }
261  }
262  return false;
263 }
264 
266  size_t num_descriptors = 0;
267  size_t crt_vertex = 0;
268  while (crt_vertex < ordering_.size()) {
269  auto vert = ordering_[crt_vertex++];
270  if (joins_.count(vert)) {
271  continue;
272  }
273  auto& node = graph_[vert];
274  CHECK(node);
275  if (dynamic_cast<const RelScan*>(node)) {
276  continue;
277  }
278  ++num_descriptors;
279  }
280  return num_descriptors;
281 }
282 
284  size_t steps_to_next_broadcast = 0;
285  auto crt_vertex = current_vertex_;
286  while (crt_vertex < ordering_.size()) {
287  auto vert = ordering_[crt_vertex++];
288  auto node = graph_[vert];
289  CHECK(node);
290  if (joins_.count(vert)) {
291  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
292  CHECK(join_node);
293  if (crt_vertex < ordering_.size() - 1) {
294  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
295  // Note that crt_vertex has already been incremented once above for the join node
296  // -- increment it again to account for the parent node of the join
297  ++steps_to_next_broadcast;
298  ++crt_vertex;
299  continue;
300  } else {
301  CHECK_EQ(crt_vertex, ordering_.size() - 1);
302  // If the join node parent is the last node in the tree, run all remaining steps
303  // on the aggregator
304  return ++steps_to_next_broadcast;
305  }
306  }
307  if (auto sort = dynamic_cast<const RelSort*>(node)) {
308  CHECK_EQ(sort->inputCount(), size_t(1));
309  node = sort->getInput(0);
310  }
311  if (dynamic_cast<const RelScan*>(node)) {
312  return steps_to_next_broadcast;
313  }
314  if (dynamic_cast<const RelModify*>(node)) {
315  // Modify runs on the leaf automatically, run the same node as a noop on the
316  // aggregator
317  ++steps_to_next_broadcast;
318  continue;
319  }
320  if (auto project = dynamic_cast<const RelProject*>(node)) {
321  if (project->hasWindowFunctionExpr()) {
322  ++steps_to_next_broadcast;
323  continue;
324  }
325  }
326  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
327  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
328  return steps_to_next_broadcast;
329  }
330  }
331  ++steps_to_next_broadcast;
332  }
333  return steps_to_next_broadcast;
334 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
ssize_t nextStepId(const bool after_broadcast) const
ExecutionResult & operator=(const ExecutionResult &that)
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::shared_ptr< ResultSet > ResultSetPtr
const RelAlgNode * getBody() const
RaExecutionSequence(const RelAlgNode *, const bool build_sequence=true)
ExecutionResult(const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta)
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
std::vector< TargetMetaInfo > targets_meta_
void setResult(const ExecutionResult &result)