OmniSciDB  16c4e035a1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
20 
21 #include <boost/graph/topological_sort.hpp>
22 
23 #include <algorithm>
24 
26  : filter_push_down_enabled_(false)
27  , success_(true)
28  , execution_time_ms_(0)
29  , type_(QueryResult) {}
30 
31 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
32  const std::vector<TargetMetaInfo>& targets_meta)
33  : result_(rows)
34  , targets_meta_(targets_meta)
35  , filter_push_down_enabled_(false)
36  , success_(true)
37  , execution_time_ms_(0)
38  , type_(QueryResult) {}
39 
41  const std::vector<TargetMetaInfo>& targets_meta)
42  : result_(std::move(result))
43  , targets_meta_(targets_meta)
44  , filter_push_down_enabled_(false)
45  , success_(true)
46  , execution_time_ms_(0)
47  , type_(QueryResult) {}
48 
50  : targets_meta_(that.targets_meta_)
51  , pushed_down_filter_info_(that.pushed_down_filter_info_)
52  , filter_push_down_enabled_(that.filter_push_down_enabled_)
53  , success_(true)
54  , execution_time_ms_(0)
55  , type_(QueryResult) {
56  if (!pushed_down_filter_info_.empty() ||
58  return;
59  }
60  result_ = that.result_;
61 }
62 
64  : targets_meta_(std::move(that.targets_meta_))
65  , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
66  , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_))
67  , success_(true)
68  , execution_time_ms_(0)
69  , type_(QueryResult) {
70  if (!pushed_down_filter_info_.empty() ||
72  return;
73  }
74  result_ = std::move(that.result_);
75 }
76 
78  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
79  bool filter_push_down_enabled)
80  : pushed_down_filter_info_(pushed_down_filter_info)
81  , filter_push_down_enabled_(filter_push_down_enabled)
82  , success_(true)
83  , execution_time_ms_(0)
84  , type_(QueryResult) {}
85 
87  if (!that.pushed_down_filter_info_.empty() ||
88  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
91  return *this;
92  }
93  result_ = that.result_;
95  success_ = that.success_;
97  type_ = that.type_;
98  return *this;
99 }
100 
101 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
102  const {
104 }
105 
106 void ExecutionResult::updateResultSet(const std::string& query,
107  RType type,
108  bool success) {
109  targets_meta_.clear();
110  pushed_down_filter_info_.clear();
111  success_ = success;
112  type_ = type;
113  result_ = std::make_shared<ResultSet>(query);
114 }
115 
117  if (!empty()) {
118  return getRows()->getExplanation();
119  }
120  return {};
121 }
122 
123 void RaExecutionDesc::setResult(const ExecutionResult& result) {
124  result_ = result;
125  body_->setContextData(this);
126 }
127 
129  return body_;
130 }
131 
132 namespace {
133 
134 std::vector<Vertex> merge_sort_with_input(const std::vector<Vertex>& vertices,
135  const DAG& graph) {
136  DAG::in_edge_iterator ie_iter, ie_end;
137  std::unordered_set<Vertex> inputs;
138  for (const auto vert : vertices) {
139  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
140  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
141  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
142  const auto in_vert = boost::source(*ie_iter, graph);
143  const auto input = graph[in_vert];
144  if (dynamic_cast<const RelScan*>(input)) {
145  throw std::runtime_error("Standalone sort not supported yet");
146  }
147  if (boost::out_degree(in_vert, graph) > 1) {
148  throw std::runtime_error("Sort's input node used by others not supported yet");
149  }
150  inputs.insert(in_vert);
151  }
152  }
153 
154  std::vector<Vertex> new_vertices;
155  for (const auto vert : vertices) {
156  if (inputs.count(vert)) {
157  continue;
158  }
159  new_vertices.push_back(vert);
160  }
161  return new_vertices;
162 }
163 
164 DAG build_dag(const RelAlgNode* sink) {
165  DAG graph(1);
166  graph[0] = sink;
167  std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
168  std::make_pair(sink, 0)};
169  std::vector<const RelAlgNode*> stack(1, sink);
170  while (!stack.empty()) {
171  const auto node = stack.back();
172  stack.pop_back();
173  if (dynamic_cast<const RelScan*>(node)) {
174  continue;
175  }
176 
177  const auto input_num = node->inputCount();
178  switch (input_num) {
179  case 0:
180  CHECK(dynamic_cast<const RelLogicalValues*>(node) ||
181  dynamic_cast<const RelTableFunction*>(node));
182  case 1:
183  break;
184  case 2:
185  CHECK(dynamic_cast<const RelJoin*>(node) ||
186  dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
187  dynamic_cast<const RelLogicalUnion*>(node) ||
188  dynamic_cast<const RelTableFunction*>(node));
189  break;
190  default:
191  CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
192  dynamic_cast<const RelLogicalUnion*>(node) ||
193  dynamic_cast<const RelTableFunction*>(node));
194  }
195  for (size_t i = 0; i < input_num; ++i) {
196  const auto input = node->getInput(i);
197  CHECK(input);
198  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
199  if (!visited) {
200  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
201  }
202  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
203  if (!visited) {
204  graph[node_ptr_to_vert_idx[input]] = input;
205  stack.push_back(input);
206  }
207  }
208  }
209  return graph;
210 }
211 
212 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
213  const DAG& graph) {
214  std::unordered_set<Vertex> joins;
215  for (const auto vert : vertices) {
216  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
217  joins.insert(vert);
218  continue;
219  }
220  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
221  continue;
222  }
223  if (boost::out_degree(vert, graph) > 1) {
224  throw std::runtime_error("Join used more than once not supported yet");
225  }
226  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
227  CHECK(std::next(oe_iter) == oe_end);
228  const auto out_vert = boost::target(*oe_iter, graph);
229  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
230  joins.insert(vert);
231  }
232  }
233  return joins;
234 }
235 
236 } // namespace
237 
239  const bool build_sequence) {
240  CHECK(sink);
241  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
242  throw std::runtime_error("Query not supported yet");
243  }
244 
245  graph_ = build_dag(sink);
246 
247  boost::topological_sort(graph_, std::back_inserter(ordering_));
248  std::reverse(ordering_.begin(), ordering_.end());
249 
250  ordering_ = merge_sort_with_input(ordering_, graph_);
251  joins_ = get_join_vertices(ordering_, graph_);
252 
253  if (build_sequence) {
254  while (next()) {
255  // noop
256  }
257  }
258 }
259 
260 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
261  descs_.emplace_back(std::move(exec_desc));
262 }
263 
265  while (current_vertex_ < ordering_.size()) {
266  auto vert = ordering_[current_vertex_++];
267  if (joins_.count(vert)) {
268  continue;
269  }
270  auto& node = graph_[vert];
271  CHECK(node);
272  if (dynamic_cast<const RelScan*>(node)) {
273  scan_count_++;
274  continue;
275  }
276  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
277  return descs_.back().get();
278  }
279  return nullptr;
280 }
281 
283  if (descs_.empty()) {
284  return nullptr;
285  }
286  if (descs_.size() == 1) {
287  return nullptr;
288  }
289  CHECK_GE(descs_.size(), size_t(2));
290  auto itr = descs_.rbegin();
291  return (++itr)->get();
292 }
293 
294 std::optional<size_t> RaExecutionSequence::nextStepId(const bool after_broadcast) const {
295  if (after_broadcast) {
296  if (current_vertex_ == ordering_.size()) {
297  return std::nullopt;
298  }
299  return descs_.size() + stepsToNextBroadcast();
300  } else if (current_vertex_ == ordering_.size()) {
301  return std::nullopt;
302  } else {
303  return descs_.size();
304  }
305 }
306 
308  if (current_vertex_ == ordering_.size()) {
309  // All descriptors visited, execution finished
310  return true;
311  } else {
312  const auto next_step_id = nextStepId(true);
313  if (!next_step_id || (*next_step_id == totalDescriptorsCount())) {
314  // One step remains (the current vertex), or all remaining steps can be executed
315  // without another broadcast (i.e. on the aggregator)
316  return true;
317  }
318  }
319  return false;
320 }
321 
322 namespace {
323 struct MatchBody {
324  unsigned const body_id_;
325  bool operator()(std::unique_ptr<RaExecutionDesc> const& desc) const {
326  return desc->getBody()->getId() == body_id_;
327  }
328 };
329 } // namespace
330 
331 // Search for RaExecutionDesc* by body, starting at start_idx and decrementing to 0.
333  unsigned const body_id,
334  size_t const start_idx) const {
335  CHECK_LT(start_idx, descs_.size());
336  auto const from_end = descs_.size() - (start_idx + 1);
337  MatchBody const match_body{body_id};
338  auto const itr = std::find_if(descs_.rbegin() + from_end, descs_.rend(), match_body);
339  return itr == descs_.rend() ? nullptr : itr->get();
340 }
341 
343  size_t num_descriptors = 0;
344  size_t crt_vertex = 0;
345  while (crt_vertex < ordering_.size()) {
346  auto vert = ordering_[crt_vertex++];
347  if (joins_.count(vert)) {
348  continue;
349  }
350  auto& node = graph_[vert];
351  CHECK(node);
352  if (dynamic_cast<const RelScan*>(node)) {
353  continue;
354  }
355  ++num_descriptors;
356  }
357  return num_descriptors;
358 }
359 
361  size_t steps_to_next_broadcast = 0;
362  auto crt_vertex = current_vertex_;
363  while (crt_vertex < ordering_.size()) {
364  auto vert = ordering_[crt_vertex++];
365  auto node = graph_[vert];
366  CHECK(node);
367  if (joins_.count(vert)) {
368  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
369  CHECK(join_node);
370  for (size_t i = 0; i < join_node->inputCount(); i++) {
371  const auto input = join_node->getInput(i);
372  if (dynamic_cast<const RelScan*>(input)) {
373  return steps_to_next_broadcast;
374  }
375  }
376  if (crt_vertex < ordering_.size() - 1) {
377  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
378  // Note that crt_vertex has already been incremented once above for the join node
379  // -- increment it again to account for the parent node of the join
380  ++steps_to_next_broadcast;
381  ++crt_vertex;
382  continue;
383  } else {
384  CHECK_EQ(crt_vertex, ordering_.size() - 1);
385  // If the join node parent is the last node in the tree, run all remaining steps
386  // on the aggregator
387  return ++steps_to_next_broadcast;
388  }
389  }
390  if (auto sort = dynamic_cast<const RelSort*>(node)) {
391  CHECK_EQ(sort->inputCount(), size_t(1));
392  node = sort->getInput(0);
393  }
394  if (dynamic_cast<const RelScan*>(node)) {
395  return steps_to_next_broadcast;
396  }
397  if (dynamic_cast<const RelModify*>(node)) {
398  // Modify runs on the leaf automatically, run the same node as a noop on the
399  // aggregator
400  ++steps_to_next_broadcast;
401  continue;
402  }
403  if (auto project = dynamic_cast<const RelProject*>(node)) {
404  if (project->hasWindowFunctionExpr()) {
405  ++steps_to_next_broadcast;
406  continue;
407  }
408  }
409  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
410  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
411  return steps_to_next_broadcast;
412  }
413  }
414  ++steps_to_next_broadcast;
415  }
416  return steps_to_next_broadcast;
417 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
ExecutionResult & operator=(const ExecutionResult &that)
const RelAlgNode * body_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:224
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
bool operator()(std::unique_ptr< RaExecutionDesc > const &desc) const
RaExecutionSequence(const RelAlgNode *, const bool build_sequence=true)
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
const std::shared_ptr< ResultSet > & getRows() const
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
void setContextData(const RaExecutionDesc *context_data) const
bool g_enable_smem_group_by true
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
#define CHECK_LT(x, y)
Definition: Logger.h:221
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:211
const RelAlgNode * getBody() const
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
std::optional< size_t > nextStepId(const bool after_broadcast) const