OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <boost/graph/topological_sort.hpp>
20 
23 
// Default constructor: empty result, no target metadata, push-down disabled.
// NOTE(review): the signature line (original line 24) was dropped by this
// extraction -- verify against the repository source.
25  : result_(), targets_meta_(), filter_push_down_enabled_(false) {
// NOTE(review): target_infos is never used in the visible body -- candidate
// for removal upstream.
26  std::vector<TargetInfo> target_infos;
// result_ was already value-initialized (null) in the initializer list above;
// this reset is redundant but harmless.
27  result_ = nullptr;
28 }
29 
30 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
31  const std::vector<TargetMetaInfo>& targets_meta)
32  : result_(rows), targets_meta_(targets_meta), filter_push_down_enabled_(false) {}
33 
// Constructor taking ownership of a ResultSetPtr (moved into result_).
// NOTE(review): the first signature line (original line 34) was dropped by
// this extraction -- verify against the repository source.
35  const std::vector<TargetMetaInfo>& targets_meta)
36  : targets_meta_(targets_meta), filter_push_down_enabled_(false) {
37  result_ = std::move(result);
38 }
39 
// Copy constructor. NOTE(review): the signature line (original line 40) and
// the continuation of the condition below (original line 45) were dropped by
// this extraction; presumably the condition mirrors operator=, also testing
// filter_push_down_enabled_ -- verify against the repository source.
41  : targets_meta_(that.targets_meta_)
42  , pushed_down_filter_info_(that.pushed_down_filter_info_)
43  , filter_push_down_enabled_(that.filter_push_down_enabled_) {
44  if (!pushed_down_filter_info_.empty() ||
46  return;
47  }
// Only copy the ResultSet handle when no filter push-down info is pending.
48  result_ = that.result_;
49 }
50 
// Move constructor. NOTE(review): the signature line (original line 51) and
// the continuation of the condition below (original line 56) were dropped by
// this extraction -- verify against the repository source.
52  : targets_meta_(std::move(that.targets_meta_))
53  , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
54  , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_)) {
55  if (!pushed_down_filter_info_.empty() ||
57  return;
58  }
// Only move the ResultSet handle when no filter push-down info is pending.
59  result_ = std::move(that.result_);
60 }
61 
// Constructor used to report pushed-down filters instead of a result set;
// result_ is left null. NOTE(review): the first signature line (original
// line 62) was dropped by this extraction -- verify against the repository.
63  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
64  bool filter_push_down_enabled)
65  : pushed_down_filter_info_(pushed_down_filter_info)
66  , filter_push_down_enabled_(filter_push_down_enabled) {}
67 
// Copy assignment. NOTE(review): the signature line (original line 68) and
// lines 71-72 (presumably copying pushed_down_filter_info_ and
// filter_push_down_enabled_ before the early return) were dropped by this
// extraction -- verify against the repository source.
// The visible condition simplifies to: non-empty filter info OR push-down
// enabled; in that case the ResultSet handle is NOT copied.
69  if (!that.pushed_down_filter_info_.empty() ||
70  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
73  return *this;
74  }
75  result_ = that.result_;
77  return *this;
78 }
79 
80 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
81  const {
83 }
84 
85 void RaExecutionDesc::setResult(const ExecutionResult& result) {
86  result_ = result;
87  body_->setContextData(this);
88 }
89 
// Accessor for the RA node this descriptor executes. NOTE(review): the
// signature line (original line 90) was dropped by this extraction.
91  return body_;
92 }
93 
94 namespace {
95 
96 std::vector<Vertex> merge_sort_with_input(const std::vector<Vertex>& vertices,
97  const DAG& graph) {
98  DAG::in_edge_iterator ie_iter, ie_end;
99  std::unordered_set<Vertex> inputs;
100  for (const auto vert : vertices) {
101  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
102  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
103  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
104  const auto in_vert = boost::source(*ie_iter, graph);
105  const auto input = graph[in_vert];
106  if (dynamic_cast<const RelScan*>(input)) {
107  throw std::runtime_error("Standalone sort not supported yet");
108  }
109  if (boost::out_degree(in_vert, graph) > 1) {
110  throw std::runtime_error("Sort's input node used by others not supported yet");
111  }
112  inputs.insert(in_vert);
113  }
114  }
115 
116  std::vector<Vertex> new_vertices;
117  for (const auto vert : vertices) {
118  if (inputs.count(vert)) {
119  continue;
120  }
121  new_vertices.push_back(vert);
122  }
123  return new_vertices;
124 }
125 
126 DAG build_dag(const RelAlgNode* sink) {
127  DAG graph(1);
128  graph[0] = sink;
129  std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
130  std::make_pair(sink, 0)};
131  std::vector<const RelAlgNode*> stack(1, sink);
132  while (!stack.empty()) {
133  const auto node = stack.back();
134  stack.pop_back();
135  if (dynamic_cast<const RelScan*>(node)) {
136  continue;
137  }
138 
139  const auto input_num = node->inputCount();
140  switch (input_num) {
141  case 0:
142  CHECK(dynamic_cast<const RelLogicalValues*>(node));
143  case 1:
144  break;
145  case 2:
146  CHECK(dynamic_cast<const RelJoin*>(node) ||
147  dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
148  dynamic_cast<const RelLogicalUnion*>(node) ||
149  dynamic_cast<const RelTableFunction*>(node));
150  break;
151  default:
152  CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
153  dynamic_cast<const RelLogicalUnion*>(node) ||
154  dynamic_cast<const RelTableFunction*>(node));
155  }
156  for (size_t i = 0; i < input_num; ++i) {
157  const auto input = node->getInput(i);
158  CHECK(input);
159  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
160  if (!visited) {
161  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
162  }
163  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
164  if (!visited) {
165  graph[node_ptr_to_vert_idx[input]] = input;
166  stack.push_back(input);
167  }
168  }
169  }
170  return graph;
171 }
172 
173 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
174  const DAG& graph) {
175  std::unordered_set<Vertex> joins;
176  for (const auto vert : vertices) {
177  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
178  joins.insert(vert);
179  continue;
180  }
181  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
182  continue;
183  }
184  if (boost::out_degree(vert, graph) > 1) {
185  throw std::runtime_error("Join used more than once not supported yet");
186  }
187  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
188  CHECK(std::next(oe_iter) == oe_end);
189  const auto out_vert = boost::target(*oe_iter, graph);
190  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
191  joins.insert(vert);
192  }
193  }
194  return joins;
195 }
196 
197 } // namespace
198 
// Build the execution sequence for the RA tree rooted at `sink`.
// NOTE(review): the first signature line (original line 199,
// `RaExecutionSequence::RaExecutionSequence(const RelAlgNode* sink,` per the
// member list) was dropped by this extraction.
200  const bool build_sequence) {
201  CHECK(sink);
// A bare scan or join as the query root is not executable.
202  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
203  throw std::runtime_error("Query not supported yet");
204  }
205 
206  graph_ = build_dag(sink);
207 
// boost::topological_sort emits vertices in reverse topological order,
// hence the std::reverse to get execution order (inputs before consumers).
208  boost::topological_sort(graph_, std::back_inserter(ordering_));
209  std::reverse(ordering_.begin(), ordering_.end());
210 
// Fold sort inputs into their sorts and record the join vertices so next()
// can skip them.
211  ordering_ = merge_sort_with_input(ordering_, graph_);
212  joins_ = get_join_vertices(ordering_, graph_);
213 
// Optionally materialize every execution descriptor eagerly.
214  if (build_sequence) {
215  while (next()) {
216  // noop
217  }
218  }
219 }
220 
221 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
222  descs_.emplace_back(std::move(exec_desc));
223 }
224 
// Advance to the next executable step. NOTE(review): the signature line
// (original line 225, presumably `RaExecutionDesc* RaExecutionSequence::next() {`)
// was dropped by this extraction.
226  while (current_vertex_ < ordering_.size()) {
227  auto vert = ordering_[current_vertex_++];
// Join vertices are folded into their consumer's step; skip them here.
228  if (joins_.count(vert)) {
229  continue;
230  }
231  auto& node = graph_[vert];
232  CHECK(node);
// Scans are not executed as separate steps; they are only counted.
233  if (dynamic_cast<const RelScan*>(node)) {
234  scan_count_++;
235  continue;
236  }
// Materialize a descriptor for this node and hand it to the caller.
237  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
238  return descs_.back().get();
239  }
// Ordering exhausted: no further steps.
240  return nullptr;
241 }
242 
243 std::optional<size_t> RaExecutionSequence::nextStepId(const bool after_broadcast) const {
244  if (after_broadcast) {
245  if (current_vertex_ == ordering_.size()) {
246  return std::nullopt;
247  }
248  return descs_.size() + stepsToNextBroadcast();
249  } else if (current_vertex_ == ordering_.size()) {
250  return std::nullopt;
251  } else {
252  return descs_.size();
253  }
254 }
255 
// Report whether the sequence is (effectively) finished. NOTE(review): the
// signature line (original line 256) was dropped by this extraction --
// presumably `bool RaExecutionSequence::executionFinished() const {`.
257  if (current_vertex_ == ordering_.size()) {
258  // All descriptors visited, execution finished
259  return true;
260  } else {
261  const auto next_step_id = nextStepId(true);
262  if (!next_step_id || (*next_step_id == totalDescriptorsCount())) {
263  // One step remains (the current vertex), or all remaining steps can be executed
264  // without another broadcast (i.e. on the aggregator)
265  return true;
266  }
267  }
268  return false;
269 }
270 
// Count the descriptors the full ordering will produce: every vertex except
// joins (folded into their consumers) and scans (never executed as steps).
// Mirrors the skip logic in next(). NOTE(review): the signature line
// (original line 271) was dropped by this extraction.
272  size_t num_descriptors = 0;
273  size_t crt_vertex = 0;
274  while (crt_vertex < ordering_.size()) {
275  auto vert = ordering_[crt_vertex++];
276  if (joins_.count(vert)) {
277  continue;
278  }
279  auto& node = graph_[vert];
280  CHECK(node);
281  if (dynamic_cast<const RelScan*>(node)) {
282  continue;
283  }
284  ++num_descriptors;
285  }
286  return num_descriptors;
287 }
288 
// Count how many steps, starting from current_vertex_, can execute before the
// next broadcast is required (a scan input forces one). Certain node kinds
// (join parents, RelModify, window-function projections) are forced onto the
// aggregator and extend the run. NOTE(review): the signature line (original
// line 289) was dropped by this extraction.
290  size_t steps_to_next_broadcast = 0;
291  auto crt_vertex = current_vertex_;
292  while (crt_vertex < ordering_.size()) {
293  auto vert = ordering_[crt_vertex++];
294  auto node = graph_[vert];
295  CHECK(node);
296  if (joins_.count(vert)) {
297  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
298  CHECK(join_node);
299  if (crt_vertex < ordering_.size() - 1) {
300  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
301  // Note that crt_vertex has already been incremented once above for the join node
302  // -- increment it again to account for the parent node of the join
303  ++steps_to_next_broadcast;
304  ++crt_vertex;
305  continue;
306  } else {
307  CHECK_EQ(crt_vertex, ordering_.size() - 1);
308  // If the join node parent is the last node in the tree, run all remaining steps
309  // on the aggregator
310  return ++steps_to_next_broadcast;
311  }
312  }
// A sort is merged with its input (see merge_sort_with_input); examine the
// input node instead when deciding whether a broadcast is needed.
313  if (auto sort = dynamic_cast<const RelSort*>(node)) {
314  CHECK_EQ(sort->inputCount(), size_t(1));
315  node = sort->getInput(0);
316  }
// A scan ends the run: the next step reads leaf data, requiring a broadcast.
317  if (dynamic_cast<const RelScan*>(node)) {
318  return steps_to_next_broadcast;
319  }
320  if (dynamic_cast<const RelModify*>(node)) {
321  // Modify runs on the leaf automatically, run the same node as a noop on the
322  // aggregator
323  ++steps_to_next_broadcast;
324  continue;
325  }
// Window functions execute on the aggregator.
326  if (auto project = dynamic_cast<const RelProject*>(node)) {
327  if (project->hasWindowFunctionExpr()) {
328  ++steps_to_next_broadcast;
329  continue;
330  }
331  }
// Any node reading directly from a scan also ends the run.
332  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
333  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
334  return steps_to_next_broadcast;
335  }
336  }
337  ++steps_to_next_broadcast;
338  }
339  return steps_to_next_broadcast;
340 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
ExecutionResult & operator=(const ExecutionResult &that)
const RelAlgNode * body_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
RaExecutionSequence(const RelAlgNode *, const bool build_sequence=true)
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:197
const RelAlgNode * getBody() const
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
void setContextData(const void *context_data) const
std::optional< size_t > nextStepId(const bool after_broadcast) const