OmniSciDB  85c2d10cdc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <boost/graph/topological_sort.hpp>
20 
23 
25  : filter_push_down_enabled_(false)
26  , success_(true)
27  , execution_time_ms_(0)
28  , type_(QueryResult) {}
29 
30 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
31  const std::vector<TargetMetaInfo>& targets_meta)
32  : result_(rows)
33  , targets_meta_(targets_meta)
34  , filter_push_down_enabled_(false)
35  , success_(true)
36  , execution_time_ms_(0)
37  , type_(QueryResult) {}
38 
40  const std::vector<TargetMetaInfo>& targets_meta)
41  : result_(std::move(result))
42  , targets_meta_(targets_meta)
43  , filter_push_down_enabled_(false)
44  , success_(true)
45  , execution_time_ms_(0)
46  , type_(QueryResult) {}
47 
49  : targets_meta_(that.targets_meta_)
50  , pushed_down_filter_info_(that.pushed_down_filter_info_)
51  , filter_push_down_enabled_(that.filter_push_down_enabled_)
52  , success_(true)
53  , execution_time_ms_(0)
54  , type_(QueryResult) {
55  if (!pushed_down_filter_info_.empty() ||
57  return;
58  }
59  result_ = that.result_;
60 }
61 
63  : targets_meta_(std::move(that.targets_meta_))
64  , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
65  , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_))
66  , success_(true)
67  , execution_time_ms_(0)
68  , type_(QueryResult) {
69  if (!pushed_down_filter_info_.empty() ||
71  return;
72  }
73  result_ = std::move(that.result_);
74 }
75 
77  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
78  bool filter_push_down_enabled)
79  : pushed_down_filter_info_(pushed_down_filter_info)
80  , filter_push_down_enabled_(filter_push_down_enabled)
81  , success_(true)
82  , execution_time_ms_(0)
83  , type_(QueryResult) {}
84 
86  if (!that.pushed_down_filter_info_.empty() ||
87  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
90  return *this;
91  }
92  result_ = that.result_;
94  success_ = that.success_;
96  type_ = that.type_;
97  return *this;
98 }
99 
100 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
101  const {
103 }
104 
105 void ExecutionResult::updateResultSet(const std::string& query,
106  RType type,
107  bool success) {
108  targets_meta_.clear();
109  pushed_down_filter_info_.clear();
110  success_ = success;
111  type_ = type;
112  result_ = std::make_shared<ResultSet>(query);
113 }
114 
116  if (!empty()) {
117  return getRows()->getExplanation();
118  }
119  return {};
120 }
121 
122 void RaExecutionDesc::setResult(const ExecutionResult& result) {
123  result_ = result;
124  body_->setContextData(this);
125 }
126 
128  return body_;
129 }
130 
131 namespace {
132 
133 std::vector<Vertex> merge_sort_with_input(const std::vector<Vertex>& vertices,
134  const DAG& graph) {
135  DAG::in_edge_iterator ie_iter, ie_end;
136  std::unordered_set<Vertex> inputs;
137  for (const auto vert : vertices) {
138  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
139  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
140  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
141  const auto in_vert = boost::source(*ie_iter, graph);
142  const auto input = graph[in_vert];
143  if (dynamic_cast<const RelScan*>(input)) {
144  throw std::runtime_error("Standalone sort not supported yet");
145  }
146  if (boost::out_degree(in_vert, graph) > 1) {
147  throw std::runtime_error("Sort's input node used by others not supported yet");
148  }
149  inputs.insert(in_vert);
150  }
151  }
152 
153  std::vector<Vertex> new_vertices;
154  for (const auto vert : vertices) {
155  if (inputs.count(vert)) {
156  continue;
157  }
158  new_vertices.push_back(vert);
159  }
160  return new_vertices;
161 }
162 
163 DAG build_dag(const RelAlgNode* sink) {
164  DAG graph(1);
165  graph[0] = sink;
166  std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
167  std::make_pair(sink, 0)};
168  std::vector<const RelAlgNode*> stack(1, sink);
169  while (!stack.empty()) {
170  const auto node = stack.back();
171  stack.pop_back();
172  if (dynamic_cast<const RelScan*>(node)) {
173  continue;
174  }
175 
176  const auto input_num = node->inputCount();
177  switch (input_num) {
178  case 0:
179  CHECK(dynamic_cast<const RelLogicalValues*>(node));
180  case 1:
181  break;
182  case 2:
183  CHECK(dynamic_cast<const RelJoin*>(node) ||
184  dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
185  dynamic_cast<const RelLogicalUnion*>(node) ||
186  dynamic_cast<const RelTableFunction*>(node));
187  break;
188  default:
189  CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
190  dynamic_cast<const RelLogicalUnion*>(node) ||
191  dynamic_cast<const RelTableFunction*>(node));
192  }
193  for (size_t i = 0; i < input_num; ++i) {
194  const auto input = node->getInput(i);
195  CHECK(input);
196  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
197  if (!visited) {
198  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
199  }
200  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
201  if (!visited) {
202  graph[node_ptr_to_vert_idx[input]] = input;
203  stack.push_back(input);
204  }
205  }
206  }
207  return graph;
208 }
209 
210 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
211  const DAG& graph) {
212  std::unordered_set<Vertex> joins;
213  for (const auto vert : vertices) {
214  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
215  joins.insert(vert);
216  continue;
217  }
218  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
219  continue;
220  }
221  if (boost::out_degree(vert, graph) > 1) {
222  throw std::runtime_error("Join used more than once not supported yet");
223  }
224  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
225  CHECK(std::next(oe_iter) == oe_end);
226  const auto out_vert = boost::target(*oe_iter, graph);
227  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
228  joins.insert(vert);
229  }
230  }
231  return joins;
232 }
233 
234 } // namespace
235 
237  const bool build_sequence) {
238  CHECK(sink);
239  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
240  throw std::runtime_error("Query not supported yet");
241  }
242 
243  graph_ = build_dag(sink);
244 
245  boost::topological_sort(graph_, std::back_inserter(ordering_));
246  std::reverse(ordering_.begin(), ordering_.end());
247 
248  ordering_ = merge_sort_with_input(ordering_, graph_);
249  joins_ = get_join_vertices(ordering_, graph_);
250 
251  if (build_sequence) {
252  while (next()) {
253  // noop
254  }
255  }
256 }
257 
258 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
259  descs_.emplace_back(std::move(exec_desc));
260 }
261 
263  while (current_vertex_ < ordering_.size()) {
264  auto vert = ordering_[current_vertex_++];
265  if (joins_.count(vert)) {
266  continue;
267  }
268  auto& node = graph_[vert];
269  CHECK(node);
270  if (dynamic_cast<const RelScan*>(node)) {
271  scan_count_++;
272  continue;
273  }
274  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
275  return descs_.back().get();
276  }
277  return nullptr;
278 }
279 
281  if (descs_.empty()) {
282  return nullptr;
283  }
284  if (descs_.size() == 1) {
285  return nullptr;
286  }
287  CHECK_GE(descs_.size(), size_t(2));
288  auto itr = descs_.rbegin();
289  return (++itr)->get();
290 }
291 
292 std::optional<size_t> RaExecutionSequence::nextStepId(const bool after_broadcast) const {
293  if (after_broadcast) {
294  if (current_vertex_ == ordering_.size()) {
295  return std::nullopt;
296  }
297  return descs_.size() + stepsToNextBroadcast();
298  } else if (current_vertex_ == ordering_.size()) {
299  return std::nullopt;
300  } else {
301  return descs_.size();
302  }
303 }
304 
306  if (current_vertex_ == ordering_.size()) {
307  // All descriptors visited, execution finished
308  return true;
309  } else {
310  const auto next_step_id = nextStepId(true);
311  if (!next_step_id || (*next_step_id == totalDescriptorsCount())) {
312  // One step remains (the current vertex), or all remaining steps can be executed
313  // without another broadcast (i.e. on the aggregator)
314  return true;
315  }
316  }
317  return false;
318 }
319 
321  size_t num_descriptors = 0;
322  size_t crt_vertex = 0;
323  while (crt_vertex < ordering_.size()) {
324  auto vert = ordering_[crt_vertex++];
325  if (joins_.count(vert)) {
326  continue;
327  }
328  auto& node = graph_[vert];
329  CHECK(node);
330  if (dynamic_cast<const RelScan*>(node)) {
331  continue;
332  }
333  ++num_descriptors;
334  }
335  return num_descriptors;
336 }
337 
339  size_t steps_to_next_broadcast = 0;
340  auto crt_vertex = current_vertex_;
341  while (crt_vertex < ordering_.size()) {
342  auto vert = ordering_[crt_vertex++];
343  auto node = graph_[vert];
344  CHECK(node);
345  if (joins_.count(vert)) {
346  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
347  CHECK(join_node);
348  for (size_t i = 0; i < join_node->inputCount(); i++) {
349  const auto input = join_node->getInput(i);
350  if (dynamic_cast<const RelScan*>(input)) {
351  return steps_to_next_broadcast;
352  }
353  }
354  if (crt_vertex < ordering_.size() - 1) {
355  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
356  // Note that crt_vertex has already been incremented once above for the join node
357  // -- increment it again to account for the parent node of the join
358  ++steps_to_next_broadcast;
359  ++crt_vertex;
360  continue;
361  } else {
362  CHECK_EQ(crt_vertex, ordering_.size() - 1);
363  // If the join node parent is the last node in the tree, run all remaining steps
364  // on the aggregator
365  return ++steps_to_next_broadcast;
366  }
367  }
368  if (auto sort = dynamic_cast<const RelSort*>(node)) {
369  CHECK_EQ(sort->inputCount(), size_t(1));
370  node = sort->getInput(0);
371  }
372  if (dynamic_cast<const RelScan*>(node)) {
373  return steps_to_next_broadcast;
374  }
375  if (dynamic_cast<const RelModify*>(node)) {
376  // Modify runs on the leaf automatically, run the same node as a noop on the
377  // aggregator
378  ++steps_to_next_broadcast;
379  continue;
380  }
381  if (auto project = dynamic_cast<const RelProject*>(node)) {
382  if (project->hasWindowFunctionExpr()) {
383  ++steps_to_next_broadcast;
384  continue;
385  }
386  }
387  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
388  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
389  return steps_to_next_broadcast;
390  }
391  }
392  ++steps_to_next_broadcast;
393  }
394  return steps_to_next_broadcast;
395 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
ExecutionResult & operator=(const ExecutionResult &that)
const RelAlgNode * body_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
RaExecutionSequence(const RelAlgNode *, const bool build_sequence=true)
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
const std::shared_ptr< ResultSet > & getRows() const
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
bool g_enable_smem_group_by true
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:197
const RelAlgNode * getBody() const
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
void setContextData(const void *context_data) const
std::optional< size_t > nextStepId(const bool after_broadcast) const