19 #include <boost/graph/topological_sort.hpp>
25 : result_(), targets_meta_(), filter_push_down_enabled_(
false) {
26 std::vector<TargetInfo> target_infos;
31 const std::vector<TargetMetaInfo>& targets_meta)
32 : result_(rows), targets_meta_(targets_meta), filter_push_down_enabled_(
false) {}
35 const std::vector<TargetMetaInfo>& targets_meta)
36 : targets_meta_(targets_meta), filter_push_down_enabled_(
false) {
41 : targets_meta_(that.targets_meta_)
42 , pushed_down_filter_info_(that.pushed_down_filter_info_)
43 , filter_push_down_enabled_(that.filter_push_down_enabled_) {
52 : targets_meta_(std::move(that.targets_meta_))
53 , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
54 , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_)) {
59 result_ = std::move(that.result_);
63 const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
64 bool filter_push_down_enabled)
65 : pushed_down_filter_info_(pushed_down_filter_info)
66 , filter_push_down_enabled_(filter_push_down_enabled) {}
98 DAG::in_edge_iterator ie_iter, ie_end;
99 std::unordered_set<Vertex> inputs;
100 for (
const auto vert : vertices) {
101 if (
const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
102 boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
103 CHECK(
size_t(1) ==
sort->inputCount() && boost::next(ie_iter) == ie_end);
104 const auto in_vert = boost::source(*ie_iter, graph);
105 const auto input = graph[in_vert];
106 if (dynamic_cast<const RelScan*>(input)) {
107 throw std::runtime_error(
"Standalone sort not supported yet");
109 if (boost::out_degree(in_vert, graph) > 1) {
110 throw std::runtime_error(
"Sort's input node used by others not supported yet");
112 inputs.insert(in_vert);
116 std::vector<Vertex> new_vertices;
117 for (
const auto vert : vertices) {
118 if (inputs.count(vert)) {
121 new_vertices.push_back(vert);
129 std::unordered_map<const RelAlgNode*, int> node_ptr_to_vert_idx{
130 std::make_pair(sink, 0)};
131 std::vector<const RelAlgNode*> stack(1, sink);
132 while (!stack.empty()) {
133 const auto node = stack.back();
135 if (dynamic_cast<const RelScan*>(node)) {
139 const auto input_num = node->inputCount();
142 CHECK(dynamic_cast<const RelLogicalValues*>(node));
146 CHECK(dynamic_cast<const RelJoin*>(node) ||
147 dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
148 dynamic_cast<const RelLogicalUnion*>(node) ||
149 dynamic_cast<const RelTableFunction*>(node));
152 CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
153 dynamic_cast<const RelLogicalUnion*>(node) ||
154 dynamic_cast<const RelTableFunction*>(node));
156 for (
size_t i = 0; i < input_num; ++i) {
157 const auto input = node->getInput(i);
159 const bool visited = node_ptr_to_vert_idx.count(input) > 0;
161 node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
163 boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
165 graph[node_ptr_to_vert_idx[input]] = input;
166 stack.push_back(input);
175 std::unordered_set<Vertex> joins;
176 for (
const auto vert : vertices) {
177 if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
181 if (!dynamic_cast<const RelJoin*>(graph[vert])) {
184 if (boost::out_degree(vert, graph) > 1) {
185 throw std::runtime_error(
"Join used more than once not supported yet");
187 auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
188 CHECK(std::next(oe_iter) == oe_end);
189 const auto out_vert = boost::target(*oe_iter, graph);
190 if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
200 const bool build_sequence) {
202 if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
203 throw std::runtime_error(
"Query not supported yet");
214 if (build_sequence) {
222 descs_.emplace_back(std::move(exec_desc));
231 auto& node =
graph_[vert];
233 if (dynamic_cast<const RelScan*>(node)) {
237 descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
238 return descs_.back().get();
244 if (after_broadcast) {
272 size_t num_descriptors = 0;
273 size_t crt_vertex = 0;
279 auto& node =
graph_[vert];
281 if (dynamic_cast<const RelScan*>(node)) {
286 return num_descriptors;
290 size_t steps_to_next_broadcast = 0;
303 ++steps_to_next_broadcast;
310 return ++steps_to_next_broadcast;
313 if (
auto sort = dynamic_cast<const RelSort*>(node)) {
315 node =
sort->getInput(0);
317 if (dynamic_cast<const RelScan*>(node)) {
318 return steps_to_next_broadcast;
320 if (dynamic_cast<const RelModify*>(node)) {
323 ++steps_to_next_broadcast;
326 if (
auto project = dynamic_cast<const RelProject*>(node)) {
327 if (project->hasWindowFunctionExpr()) {
328 ++steps_to_next_broadcast;
332 for (
size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
333 if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
334 return steps_to_next_broadcast;
337 ++steps_to_next_broadcast;
339 return steps_to_next_broadcast;
DAG build_dag(const RelAlgNode *sink)
ExecutionResult & operator=(const ExecutionResult &that)
DEVICE void sort(ARGS &&...args)
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
RaExecutionSequence(const RelAlgNode *, const bool build_sequence=true)
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
bool filter_push_down_enabled_
bool executionFinished() const
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
size_t stepsToNextBroadcast() const
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
const RelAlgNode * getBody() const
std::vector< Vertex > merge_sort_with_input(const std::vector< Vertex > &vertices, const DAG &graph)
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
size_t totalDescriptorsCount() const
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
void setContextData(const void *context_data) const
std::optional< size_t > nextStepId(const bool after_broadcast) const