OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <boost/graph/topological_sort.hpp>
20 
23 
24 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
25  const std::vector<TargetMetaInfo>& targets_meta)
26  : result_(rows), targets_meta_(targets_meta), filter_push_down_enabled_(false) {}
27 
29  const std::vector<TargetMetaInfo>& targets_meta)
30  : targets_meta_(targets_meta), filter_push_down_enabled_(false) {
31  result_ = std::move(result);
32 }
33 
35  : targets_meta_(that.targets_meta_)
36  , pushed_down_filter_info_(that.pushed_down_filter_info_)
37  , filter_push_down_enabled_(that.filter_push_down_enabled_) {
38  if (!pushed_down_filter_info_.empty() ||
40  return;
41  }
42  result_ = that.result_;
43 }
44 
46  : targets_meta_(std::move(that.targets_meta_))
47  , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
48  , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_)) {
49  if (!pushed_down_filter_info_.empty() ||
51  return;
52  }
53  result_ = std::move(that.result_);
54 }
55 
57  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
58  bool filter_push_down_enabled)
59  : pushed_down_filter_info_(pushed_down_filter_info)
60  , filter_push_down_enabled_(filter_push_down_enabled) {}
61 
63  if (!that.pushed_down_filter_info_.empty() ||
64  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
67  return *this;
68  }
69  result_ = that.result_;
71  return *this;
72 }
73 
74 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
75  const {
77 }
78 
79 void RaExecutionDesc::setResult(const ExecutionResult& result) {
80  result_ = result;
81  body_->setContextData(this);
82 }
83 
85  return body_;
86 }
87 
88 namespace {
89 
90 std::vector<Vertex> merge_sort_with_input(const std::vector<Vertex>& vertices,
91  const DAG& graph) {
92  DAG::in_edge_iterator ie_iter, ie_end;
93  std::unordered_set<Vertex> inputs;
94  for (const auto vert : vertices) {
95  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
96  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
97  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
98  const auto in_vert = boost::source(*ie_iter, graph);
99  const auto input = graph[in_vert];
100  if (dynamic_cast<const RelScan*>(input)) {
101  throw std::runtime_error("Standalone sort not supported yet");
102  }
103  if (boost::out_degree(in_vert, graph) > 1) {
104  throw std::runtime_error("Sort's input node used by others not supported yet");
105  }
106  inputs.insert(in_vert);
107  }
108  }
109 
110  std::vector<Vertex> new_vertices;
111  for (const auto vert : vertices) {
112  if (inputs.count(vert)) {
113  continue;
114  }
115  new_vertices.push_back(vert);
116  }
117  return new_vertices;
118 }
119 
120 DAG build_dag(const RelAlgNode* sink) {
121  DAG graph(1);
122  graph[0] = sink;
123  std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
124  std::make_pair(sink, 0)};
125  std::vector<const RelAlgNode*> stack(1, sink);
126  while (!stack.empty()) {
127  const auto node = stack.back();
128  stack.pop_back();
129  if (dynamic_cast<const RelScan*>(node)) {
130  continue;
131  }
132 
133  const auto input_num = node->inputCount();
134  CHECK(input_num == 1 ||
135  (dynamic_cast<const RelLogicalValues*>(node) && input_num == 0) ||
136  (dynamic_cast<const RelModify*>(node) && input_num == 1) ||
137  (input_num == 2 && (dynamic_cast<const RelJoin*>(node) ||
138  dynamic_cast<const RelLeftDeepInnerJoin*>(node))) ||
139  (input_num > 2 && (dynamic_cast<const RelLeftDeepInnerJoin*>(node))));
140  for (size_t i = 0; i < input_num; ++i) {
141  const auto input = node->getInput(i);
142  CHECK(input);
143  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
144  if (!visited) {
145  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
146  }
147  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
148  if (!visited) {
149  graph[node_ptr_to_vert_idx[input]] = input;
150  stack.push_back(input);
151  }
152  }
153  }
154  return graph;
155 }
156 
157 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
158  const DAG& graph) {
159  std::unordered_set<Vertex> joins;
160  for (const auto vert : vertices) {
161  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
162  joins.insert(vert);
163  continue;
164  }
165  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
166  continue;
167  }
168  if (boost::out_degree(vert, graph) > 1) {
169  throw std::runtime_error("Join used more than once not supported yet");
170  }
171  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
172  CHECK(std::next(oe_iter) == oe_end);
173  const auto out_vert = boost::target(*oe_iter, graph);
174  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
175  joins.insert(vert);
176  }
177  }
178  return joins;
179 }
180 
181 } // namespace
182 
184  const bool build_sequence) {
185  CHECK(sink);
186  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
187  throw std::runtime_error("Query not supported yet");
188  }
189 
190  graph_ = build_dag(sink);
191 
192  boost::topological_sort(graph_, std::back_inserter(ordering_));
193  std::reverse(ordering_.begin(), ordering_.end());
194 
195  ordering_ = merge_sort_with_input(ordering_, graph_);
196  joins_ = get_join_vertices(ordering_, graph_);
197 
198  if (build_sequence) {
199  while (next()) {
200  // noop
201  }
202  }
203 }
204 
205 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
206  descs_.emplace_back(std::move(exec_desc));
207 }
208 
210  while (current_vertex_ < ordering_.size()) {
211  auto vert = ordering_[current_vertex_++];
212  if (joins_.count(vert)) {
213  continue;
214  }
215  auto& node = graph_[vert];
216  CHECK(node);
217  if (dynamic_cast<const RelScan*>(node)) {
218  scan_count_++;
219  continue;
220  }
221  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
222  return descs_.back().get();
223  }
224  return nullptr;
225 }
226 
227 ssize_t RaExecutionSequence::nextStepId(const bool after_reduction) const {
228  if (after_reduction) {
229  if (current_vertex_ == ordering_.size()) {
230  return -1;
231  }
232  const auto steps_to_next_reduction = static_cast<ssize_t>(stepsToNextReduction());
233  return static_cast<ssize_t>(descs_.size()) + steps_to_next_reduction;
234  } else {
235  return current_vertex_ == ordering_.size() ? -1 : descs_.size();
236  }
237 }
238 
240  if (current_vertex_ == ordering_.size()) {
241  // All descriptors visited, execution finished
242  return true;
243  } else {
244  const auto next_step_id = nextStepId(true);
245  if (next_step_id < 0 ||
246  (static_cast<size_t>(next_step_id) == totalDescriptorsCount())) {
247  // One step remains (the current vertex), or all remaining steps can be executed
248  // without another reduction
249  return true;
250  }
251  }
252  return false;
253 }
254 
256  size_t num_descriptors = 0;
257  size_t crt_vertex = 0;
258  while (crt_vertex < ordering_.size()) {
259  auto vert = ordering_[crt_vertex++];
260  if (joins_.count(vert)) {
261  continue;
262  }
263  auto& node = graph_[vert];
264  CHECK(node);
265  if (dynamic_cast<const RelScan*>(node)) {
266  continue;
267  }
268  ++num_descriptors;
269  }
270  return num_descriptors;
271 }
272 
274  size_t steps_to_next_reduction = 0;
275  auto crt_vertex = current_vertex_;
276  while (crt_vertex < ordering_.size()) {
277  auto vert = ordering_[crt_vertex++];
278  auto node = graph_[vert];
279  CHECK(node);
280  if (joins_.count(vert)) {
281  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
282  CHECK(join_node);
283  if (crt_vertex < ordering_.size() - 1) {
284  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
285  // Note that crt_vertex has already been incremented once above for the join node
286  // -- increment it again to account for the parent node of the join
287  ++steps_to_next_reduction;
288  ++crt_vertex;
289  continue;
290  } else {
291  CHECK_EQ(crt_vertex, ordering_.size() - 1);
292  // If the join node parent is the last node in the tree, run all remaining steps
293  // on the aggregator
294  return ++steps_to_next_reduction;
295  }
296  }
297  if (auto sort = dynamic_cast<const RelSort*>(node)) {
298  CHECK_EQ(sort->inputCount(), size_t(1));
299  node = sort->getInput(0);
300  }
301  if (dynamic_cast<const RelScan*>(node)) {
302  return steps_to_next_reduction;
303  }
304  if (dynamic_cast<const RelModify*>(node)) {
305  // Modify runs on the leaf automatically, run the same node as a noop on the
306  // aggregator
307  ++steps_to_next_reduction;
308  continue;
309  }
310  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
311  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
312  return steps_to_next_reduction;
313  }
314  }
315  ++steps_to_next_reduction;
316  }
317  return steps_to_next_reduction;
318 }
#define CHECK_EQ(x, y)
Definition: Logger.h:198
ExecutionResult & operator=(const ExecutionResult &that)
const RelAlgNode * body_
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
ssize_t nextStepId(const bool after_reduction) const
CHECK(cgen_state)
RaExecutionSequence(const RelAlgNode *, const bool build_sequence=true)
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
ExecutionResult(const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta)
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
Definition: Execute.cpp:71
const RelAlgNode * getBody() const
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
std::vector< TargetMetaInfo > targets_meta_
void setContextData(const void *context_data) const