OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2019 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <boost/graph/topological_sort.hpp>
20 
23 
25  : filter_push_down_enabled_(false)
26  , success_(true)
27  , execution_time_ms_(0)
28  , type_(QueryResult) {}
29 
30 ExecutionResult::ExecutionResult(const std::shared_ptr<ResultSet>& rows,
31  const std::vector<TargetMetaInfo>& targets_meta)
32  : result_(rows)
33  , targets_meta_(targets_meta)
34  , filter_push_down_enabled_(false)
35  , success_(true)
36  , execution_time_ms_(0)
37  , type_(QueryResult) {}
38 
40  const std::vector<TargetMetaInfo>& targets_meta)
41  : result_(std::move(result))
42  , targets_meta_(targets_meta)
43  , filter_push_down_enabled_(false)
44  , success_(true)
45  , execution_time_ms_(0)
46  , type_(QueryResult) {}
47 
49  : targets_meta_(that.targets_meta_)
50  , pushed_down_filter_info_(that.pushed_down_filter_info_)
51  , filter_push_down_enabled_(that.filter_push_down_enabled_)
52  , success_(true)
53  , execution_time_ms_(0)
54  , type_(QueryResult) {
55  if (!pushed_down_filter_info_.empty() ||
57  return;
58  }
59  result_ = that.result_;
60 }
61 
63  : targets_meta_(std::move(that.targets_meta_))
64  , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
65  , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_))
66  , success_(true)
67  , execution_time_ms_(0)
68  , type_(QueryResult) {
69  if (!pushed_down_filter_info_.empty() ||
71  return;
72  }
73  result_ = std::move(that.result_);
74 }
75 
77  const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
78  bool filter_push_down_enabled)
79  : pushed_down_filter_info_(pushed_down_filter_info)
80  , filter_push_down_enabled_(filter_push_down_enabled)
81  , success_(true)
82  , execution_time_ms_(0)
83  , type_(QueryResult) {}
84 
86  if (!that.pushed_down_filter_info_.empty() ||
87  (that.filter_push_down_enabled_ && that.pushed_down_filter_info_.empty())) {
90  return *this;
91  }
92  result_ = that.result_;
94  success_ = that.success_;
96  type_ = that.type_;
97  return *this;
98 }
99 
100 const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInfo()
101  const {
103 }
104 
105 void ExecutionResult::updateResultSet(const std::string& query,
106  RType type,
107  bool success) {
108  targets_meta_.clear();
109  pushed_down_filter_info_.clear();
110  success_ = success;
111  type_ = type;
112  result_ = std::make_shared<ResultSet>(query);
113 }
114 
116  if (!empty()) {
117  return getRows()->getExplanation();
118  }
119  return {};
120 }
121 
122 void RaExecutionDesc::setResult(const ExecutionResult& result) {
123  result_ = result;
124  body_->setContextData(this);
125 }
126 
128  return body_;
129 }
130 
131 namespace {
132 
133 std::vector<Vertex> merge_sort_with_input(const std::vector<Vertex>& vertices,
134  const DAG& graph) {
135  DAG::in_edge_iterator ie_iter, ie_end;
136  std::unordered_set<Vertex> inputs;
137  for (const auto vert : vertices) {
138  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
139  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
140  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
141  const auto in_vert = boost::source(*ie_iter, graph);
142  const auto input = graph[in_vert];
143  if (dynamic_cast<const RelScan*>(input)) {
144  throw std::runtime_error("Standalone sort not supported yet");
145  }
146  if (boost::out_degree(in_vert, graph) > 1) {
147  throw std::runtime_error("Sort's input node used by others not supported yet");
148  }
149  inputs.insert(in_vert);
150  }
151  }
152 
153  std::vector<Vertex> new_vertices;
154  for (const auto vert : vertices) {
155  if (inputs.count(vert)) {
156  continue;
157  }
158  new_vertices.push_back(vert);
159  }
160  return new_vertices;
161 }
162 
163 DAG build_dag(const RelAlgNode* sink) {
164  DAG graph(1);
165  graph[0] = sink;
166  std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
167  std::make_pair(sink, 0)};
168  std::vector<const RelAlgNode*> stack(1, sink);
169  while (!stack.empty()) {
170  const auto node = stack.back();
171  stack.pop_back();
172  if (dynamic_cast<const RelScan*>(node)) {
173  continue;
174  }
175 
176  const auto input_num = node->inputCount();
177  switch (input_num) {
178  case 0:
179  CHECK(dynamic_cast<const RelLogicalValues*>(node) ||
180  dynamic_cast<const RelTableFunction*>(node));
181  case 1:
182  break;
183  case 2:
184  CHECK(dynamic_cast<const RelJoin*>(node) ||
185  dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
186  dynamic_cast<const RelLogicalUnion*>(node) ||
187  dynamic_cast<const RelTableFunction*>(node));
188  break;
189  default:
190  CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
191  dynamic_cast<const RelLogicalUnion*>(node) ||
192  dynamic_cast<const RelTableFunction*>(node));
193  }
194  for (size_t i = 0; i < input_num; ++i) {
195  const auto input = node->getInput(i);
196  CHECK(input);
197  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
198  if (!visited) {
199  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
200  }
201  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
202  if (!visited) {
203  graph[node_ptr_to_vert_idx[input]] = input;
204  stack.push_back(input);
205  }
206  }
207  }
208  return graph;
209 }
210 
211 std::unordered_set<Vertex> get_join_vertices(const std::vector<Vertex>& vertices,
212  const DAG& graph) {
213  std::unordered_set<Vertex> joins;
214  for (const auto vert : vertices) {
215  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
216  joins.insert(vert);
217  continue;
218  }
219  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
220  continue;
221  }
222  if (boost::out_degree(vert, graph) > 1) {
223  throw std::runtime_error("Join used more than once not supported yet");
224  }
225  auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
226  CHECK(std::next(oe_iter) == oe_end);
227  const auto out_vert = boost::target(*oe_iter, graph);
228  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
229  joins.insert(vert);
230  }
231  }
232  return joins;
233 }
234 
235 } // namespace
236 
238  const bool build_sequence) {
239  CHECK(sink);
240  if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
241  throw std::runtime_error("Query not supported yet");
242  }
243 
244  graph_ = build_dag(sink);
245 
246  boost::topological_sort(graph_, std::back_inserter(ordering_));
247  std::reverse(ordering_.begin(), ordering_.end());
248 
249  ordering_ = merge_sort_with_input(ordering_, graph_);
250  joins_ = get_join_vertices(ordering_, graph_);
251 
252  if (build_sequence) {
253  while (next()) {
254  // noop
255  }
256  }
257 }
258 
259 RaExecutionSequence::RaExecutionSequence(std::unique_ptr<RaExecutionDesc> exec_desc) {
260  descs_.emplace_back(std::move(exec_desc));
261 }
262 
264  while (current_vertex_ < ordering_.size()) {
265  auto vert = ordering_[current_vertex_++];
266  if (joins_.count(vert)) {
267  continue;
268  }
269  auto& node = graph_[vert];
270  CHECK(node);
271  if (dynamic_cast<const RelScan*>(node)) {
272  scan_count_++;
273  continue;
274  }
275  descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
276  return descs_.back().get();
277  }
278  return nullptr;
279 }
280 
282  if (descs_.empty()) {
283  return nullptr;
284  }
285  if (descs_.size() == 1) {
286  return nullptr;
287  }
288  CHECK_GE(descs_.size(), size_t(2));
289  auto itr = descs_.rbegin();
290  return (++itr)->get();
291 }
292 
293 std::optional<size_t> RaExecutionSequence::nextStepId(const bool after_broadcast) const {
294  if (after_broadcast) {
295  if (current_vertex_ == ordering_.size()) {
296  return std::nullopt;
297  }
298  return descs_.size() + stepsToNextBroadcast();
299  } else if (current_vertex_ == ordering_.size()) {
300  return std::nullopt;
301  } else {
302  return descs_.size();
303  }
304 }
305 
307  if (current_vertex_ == ordering_.size()) {
308  // All descriptors visited, execution finished
309  return true;
310  } else {
311  const auto next_step_id = nextStepId(true);
312  if (!next_step_id || (*next_step_id == totalDescriptorsCount())) {
313  // One step remains (the current vertex), or all remaining steps can be executed
314  // without another broadcast (i.e. on the aggregator)
315  return true;
316  }
317  }
318  return false;
319 }
320 
322  size_t num_descriptors = 0;
323  size_t crt_vertex = 0;
324  while (crt_vertex < ordering_.size()) {
325  auto vert = ordering_[crt_vertex++];
326  if (joins_.count(vert)) {
327  continue;
328  }
329  auto& node = graph_[vert];
330  CHECK(node);
331  if (dynamic_cast<const RelScan*>(node)) {
332  continue;
333  }
334  ++num_descriptors;
335  }
336  return num_descriptors;
337 }
338 
340  size_t steps_to_next_broadcast = 0;
341  auto crt_vertex = current_vertex_;
342  while (crt_vertex < ordering_.size()) {
343  auto vert = ordering_[crt_vertex++];
344  auto node = graph_[vert];
345  CHECK(node);
346  if (joins_.count(vert)) {
347  auto join_node = dynamic_cast<const RelLeftDeepInnerJoin*>(node);
348  CHECK(join_node);
349  for (size_t i = 0; i < join_node->inputCount(); i++) {
350  const auto input = join_node->getInput(i);
351  if (dynamic_cast<const RelScan*>(input)) {
352  return steps_to_next_broadcast;
353  }
354  }
355  if (crt_vertex < ordering_.size() - 1) {
356  // Force the parent node of the RelLeftDeepInnerJoin to run on the aggregator.
357  // Note that crt_vertex has already been incremented once above for the join node
358  // -- increment it again to account for the parent node of the join
359  ++steps_to_next_broadcast;
360  ++crt_vertex;
361  continue;
362  } else {
363  CHECK_EQ(crt_vertex, ordering_.size() - 1);
364  // If the join node parent is the last node in the tree, run all remaining steps
365  // on the aggregator
366  return ++steps_to_next_broadcast;
367  }
368  }
369  if (auto sort = dynamic_cast<const RelSort*>(node)) {
370  CHECK_EQ(sort->inputCount(), size_t(1));
371  node = sort->getInput(0);
372  }
373  if (dynamic_cast<const RelScan*>(node)) {
374  return steps_to_next_broadcast;
375  }
376  if (dynamic_cast<const RelModify*>(node)) {
377  // Modify runs on the leaf automatically, run the same node as a noop on the
378  // aggregator
379  ++steps_to_next_broadcast;
380  continue;
381  }
382  if (auto project = dynamic_cast<const RelProject*>(node)) {
383  if (project->hasWindowFunctionExpr()) {
384  ++steps_to_next_broadcast;
385  continue;
386  }
387  }
388  for (size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
389  if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
390  return steps_to_next_broadcast;
391  }
392  }
393  ++steps_to_next_broadcast;
394  }
395  return steps_to_next_broadcast;
396 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
ExecutionResult & operator=(const ExecutionResult &that)
const RelAlgNode * body_
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:222
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
RaExecutionSequence(const RelAlgNode *, const bool build_sequence=true)
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
const std::shared_ptr< ResultSet > & getRows() const
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
bool g_enable_smem_group_by true
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:209
const RelAlgNode * getBody() const
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
Definition: gpu_enabled.h:96
void setContextData(const void *context_data) const
std::optional< size_t > nextStepId(const bool after_broadcast) const