OmniSciDB  5ade3759e0
RelAlgExecutionDescriptor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include <boost/graph/adjacency_list.hpp>
19 #include <boost/graph/topological_sort.hpp>
20 
21 namespace {
22 
23 using DAG = boost::
24  adjacency_list<boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode*>;
25 using Vertex = DAG::vertex_descriptor;
26 
27 std::vector<Vertex> merge_sort_with_input(const std::vector<Vertex>& vertices,
28  const DAG& graph) {
29  DAG::in_edge_iterator ie_iter, ie_end;
30  std::unordered_set<Vertex> inputs;
31  for (const auto vert : vertices) {
32  if (const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
33  boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
34  CHECK(size_t(1) == sort->inputCount() && boost::next(ie_iter) == ie_end);
35  const auto in_vert = boost::source(*ie_iter, graph);
36  const auto input = graph[in_vert];
37  if (dynamic_cast<const RelScan*>(input)) {
38  throw std::runtime_error("Standalone sort not supported yet");
39  }
40  if (boost::out_degree(in_vert, graph) > 1) {
41  throw std::runtime_error("Sort's input node used by others not supported yet");
42  }
43  inputs.insert(in_vert);
44  }
45  }
46 
47  std::vector<Vertex> new_vertices;
48  for (const auto vert : vertices) {
49  if (inputs.count(vert)) {
50  continue;
51  }
52  new_vertices.push_back(vert);
53  }
54  return new_vertices;
55 }
56 
57 std::vector<Vertex> merge_join_with_non_join(const std::vector<Vertex>& vertices,
58  const DAG& graph) {
59  DAG::out_edge_iterator oe_iter, oe_end;
60  std::unordered_set<Vertex> joins;
61  for (const auto vert : vertices) {
62  if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
63  joins.insert(vert);
64  continue;
65  }
66  if (!dynamic_cast<const RelJoin*>(graph[vert])) {
67  continue;
68  }
69  if (boost::out_degree(vert, graph) > 1) {
70  throw std::runtime_error("Join used more than once not supported yet");
71  }
72  boost::tie(oe_iter, oe_end) = boost::out_edges(vert, graph);
73  CHECK(boost::next(oe_iter) == oe_end);
74  const auto out_vert = boost::target(*oe_iter, graph);
75  if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
76  joins.insert(vert);
77  }
78  }
79 
80  std::vector<Vertex> new_vertices;
81  for (const auto vert : vertices) {
82  if (joins.count(vert)) {
83  continue;
84  }
85  new_vertices.push_back(vert);
86  }
87  return new_vertices;
88 }
89 
90 DAG build_dag(const RelAlgNode* sink) {
91  DAG graph(1);
92  graph[0] = sink;
93  std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
94  std::make_pair(sink, 0)};
95  std::vector<const RelAlgNode*> stack(1, sink);
96  while (!stack.empty()) {
97  const auto node = stack.back();
98  stack.pop_back();
99  if (dynamic_cast<const RelScan*>(node)) {
100  continue;
101  }
102 
103  const auto input_num = node->inputCount();
104  CHECK(input_num == 1 ||
105  (dynamic_cast<const RelLogicalValues*>(node) && input_num == 0) ||
106  (dynamic_cast<const RelModify*>(node) && input_num == 1) ||
107  (input_num == 2 && (dynamic_cast<const RelJoin*>(node) ||
108  dynamic_cast<const RelLeftDeepInnerJoin*>(node))) ||
109  (input_num > 2 && (dynamic_cast<const RelLeftDeepInnerJoin*>(node))));
110  for (size_t i = 0; i < input_num; ++i) {
111  const auto input = node->getInput(i);
112  CHECK(input);
113  const bool visited = node_ptr_to_vert_idx.count(input) > 0;
114  if (!visited) {
115  node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
116  }
117  boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
118  if (!visited) {
119  graph[node_ptr_to_vert_idx[input]] = input;
120  stack.push_back(input);
121  }
122  }
123  }
124  return graph;
125 }
126 
127 std::vector<const RelAlgNode*> schedule_ra_dag(const RelAlgNode* sink) {
128  CHECK(sink);
129  auto graph = build_dag(sink);
130  std::vector<Vertex> ordering;
131  boost::topological_sort(graph, std::back_inserter(ordering));
132  std::reverse(ordering.begin(), ordering.end());
133 
134  std::vector<const RelAlgNode*> nodes;
135  ordering = merge_sort_with_input(ordering, graph);
136  for (auto vert : merge_join_with_non_join(ordering, graph)) {
137  nodes.push_back(graph[vert]);
138  }
139 
140  return nodes;
141 }
142 
143 } // namespace
144 
145 std::vector<RaExecutionDesc> get_execution_descriptors(const RelAlgNode* ra_node) {
146  CHECK(ra_node);
147  if (dynamic_cast<const RelScan*>(ra_node) || dynamic_cast<const RelJoin*>(ra_node)) {
148  throw std::runtime_error("Query not supported yet");
149  }
150 
151  std::vector<RaExecutionDesc> descs;
152  for (const auto node : schedule_ra_dag(ra_node)) {
153  if (dynamic_cast<const RelScan*>(node)) {
154  continue;
155  }
156  descs.emplace_back(node);
157  }
158 
159  return descs;
160 }
161 
162 std::vector<RaExecutionDesc> get_execution_descriptors(
163  const std::vector<const RelAlgNode*>& ra_nodes) {
164  CHECK(!ra_nodes.empty());
165 
166  std::vector<RaExecutionDesc> descs;
167  for (const auto node : ra_nodes) {
168  if (dynamic_cast<const RelScan*>(node)) {
169  continue;
170  }
171  CHECK_GT(node->inputCount(), size_t(0));
172  descs.emplace_back(node);
173  }
174 
175  return descs;
176 }
std::vector< const RelAlgNode * > schedule_ra_dag(const RelAlgNode *sink)
#define CHECK_GT(x, y)
Definition: Logger.h:199
std::vector< Vertex > merge_join_with_non_join(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
std::vector< RaExecutionDesc > get_execution_descriptors(const RelAlgNode *ra_node)
#define CHECK(condition)
Definition: Logger.h:187
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)