22 #include <boost/graph/topological_sort.hpp>
27 : filter_push_down_enabled_(
false)
29 , execution_time_ms_(0)
30 , type_(QueryResult) {}
33 const std::vector<TargetMetaInfo>& targets_meta)
35 , targets_meta_(targets_meta)
36 , filter_push_down_enabled_(
false)
38 , execution_time_ms_(0)
39 , type_(QueryResult) {}
42 const std::vector<TargetMetaInfo>& targets_meta)
43 : result_(std::move(
result))
44 , targets_meta_(targets_meta)
45 , filter_push_down_enabled_(
false)
47 , execution_time_ms_(0)
48 , type_(QueryResult) {}
51 : targets_meta_(that.targets_meta_)
52 , pushed_down_filter_info_(that.pushed_down_filter_info_)
53 , filter_push_down_enabled_(that.filter_push_down_enabled_)
55 , execution_time_ms_(0)
56 , type_(QueryResult) {
65 : targets_meta_(std::move(that.targets_meta_))
66 , pushed_down_filter_info_(std::move(that.pushed_down_filter_info_))
67 , filter_push_down_enabled_(std::move(that.filter_push_down_enabled_))
69 , execution_time_ms_(0)
70 , type_(QueryResult) {
75 result_ = std::move(that.result_);
79 const std::vector<PushedDownFilterInfo>& pushed_down_filter_info,
80 bool filter_push_down_enabled)
81 : pushed_down_filter_info_(pushed_down_filter_info)
82 , filter_push_down_enabled_(filter_push_down_enabled)
84 , execution_time_ms_(0)
85 , type_(QueryResult) {}
114 result_ = std::make_shared<ResultSet>(query);
119 return getRows()->getExplanation();
136 std::unordered_map<const RelAlgNode*, int>& node_ptr_to_vert_idx) {
139 node_ptr_to_vert_idx.emplace(std::make_pair(sink, 0));
140 std::vector<const RelAlgNode*> stack(1, sink);
141 while (!stack.empty()) {
142 const auto node = stack.back();
144 if (dynamic_cast<const RelScan*>(node)) {
148 const auto input_num = node->inputCount();
151 CHECK(dynamic_cast<const RelLogicalValues*>(node) ||
152 dynamic_cast<const RelTableFunction*>(node));
156 CHECK(dynamic_cast<const RelJoin*>(node) ||
157 dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
158 dynamic_cast<const RelLogicalUnion*>(node) ||
159 dynamic_cast<const RelTableFunction*>(node));
162 CHECK(dynamic_cast<const RelLeftDeepInnerJoin*>(node) ||
163 dynamic_cast<const RelLogicalUnion*>(node) ||
164 dynamic_cast<const RelTableFunction*>(node));
166 for (
size_t i = 0; i < input_num; ++i) {
167 const auto input = node->getInput(i);
169 const bool visited = node_ptr_to_vert_idx.count(input) > 0;
171 node_ptr_to_vert_idx.insert(std::make_pair(input, node_ptr_to_vert_idx.size()));
173 boost::add_edge(node_ptr_to_vert_idx[input], node_ptr_to_vert_idx[node], graph);
175 graph[node_ptr_to_vert_idx[input]] = input;
176 stack.push_back(input);
185 std::unordered_set<Vertex> joins;
186 for (
const auto vert : vertices) {
187 if (dynamic_cast<const RelLeftDeepInnerJoin*>(graph[vert])) {
191 if (!dynamic_cast<const RelJoin*>(graph[vert])) {
194 if (boost::out_degree(vert, graph) > 1) {
195 throw std::runtime_error(
"Join used more than once not supported yet");
197 auto [oe_iter, oe_end] = boost::out_edges(vert, graph);
198 CHECK(std::next(oe_iter) == oe_end);
199 const auto out_vert = boost::target(*oe_iter, graph);
200 if (!dynamic_cast<const RelJoin*>(graph[out_vert])) {
211 const bool build_sequence) {
214 if (dynamic_cast<const RelScan*>(sink) || dynamic_cast<const RelJoin*>(sink)) {
215 throw std::runtime_error(
"Query not supported yet");
226 if (build_sequence) {
239 descs_.emplace_back(std::move(exec_desc));
243 auto checkQueryStepSkippable =
246 if (
executor_->getRecultSetRecyclerHolder().hasCachedQueryResultSet(key)) {
249 const auto output_meta_info =
250 executor_->getRecultSetRecyclerHolder().getOutputMetaInfo(key);
251 CHECK(output_meta_info);
260 auto& node =
graph_[vert];
262 if (dynamic_cast<const RelScan*>(node)) {
266 descs_.emplace_back(std::make_unique<RaExecutionDesc>(node));
271 auto extracted_query_plan_dag =
274 !boost::iequals(extracted_query_plan_dag.extracted_dag,
EMPTY_QUERY_PLAN)) {
277 return descs_.back().get();
283 const std::vector<Vertex>& vertices,
285 DAG::in_edge_iterator ie_iter, ie_end;
286 std::unordered_set<Vertex> inputs;
287 for (
const auto vert : vertices) {
288 if (
const auto sort = dynamic_cast<const RelSort*>(graph[vert])) {
289 boost::tie(ie_iter, ie_end) = boost::in_edges(vert, graph);
290 CHECK(
size_t(1) ==
sort->inputCount() && boost::next(ie_iter) == ie_end);
291 if (
sort->isLimitDelivered()) {
294 const auto in_vert = boost::source(*ie_iter, graph);
295 const auto input = graph[in_vert];
296 if (dynamic_cast<const RelScan*>(input)) {
297 throw std::runtime_error(
"Standalone sort not supported yet");
299 if (boost::out_degree(in_vert, graph) > 1) {
300 throw std::runtime_error(
"Sort's input node used by others not supported yet");
302 inputs.insert(in_vert);
306 std::vector<Vertex> new_vertices;
307 for (
const auto vert : vertices) {
308 if (inputs.count(vert)) {
311 new_vertices.push_back(vert);
324 auto itr =
descs_.rbegin();
325 return (++itr)->get();
329 const auto pushChildNodes = [&](
auto& stack,
const auto node_id) {
330 auto [in_edge_iter, in_edge_end] = boost::in_edges(
node_id,
graph_);
331 while (in_edge_iter != in_edge_end) {
332 const auto child_node_id = boost::source(*in_edge_iter,
graph_);
333 stack.push(
graph_[child_node_id]);
334 std::advance(in_edge_iter, 1);
337 auto top_node_id =
descs_.size() - 1;
338 auto top_res =
skippable_steps_.emplace(top_node_id, std::unordered_set<int>{});
339 CHECK(top_res.second);
340 for (
auto it =
descs_.begin(); it != std::prev(
descs_.end()); ++it) {
341 const auto step = it->get();
342 const auto body = step->getBody();
343 const auto step_id =
static_cast<int>(std::distance(
descs_.begin(), it));
347 std::stack<const RelAlgNode*> stack;
349 while (!stack.empty()) {
350 auto child_node = stack.top();
354 auto is_desc_body = std::find_if(
355 descs_.begin(), it, [&child_node](std::unique_ptr<RaExecutionDesc>& ptr) {
356 return ptr->getBody() == child_node;
358 if (is_desc_body != it) {
361 const auto child_step_id =
362 static_cast<int>(std::distance(
descs_.begin(), is_desc_body));
377 std::unordered_set<int> skippable_query_steps;
381 const auto& child_steps = it->second;
383 child_steps.begin(), child_steps.end(), [&](
const auto& skippable_step_id) {
384 if (skippable_step_id != cached_step_id) {
385 skippable_query_steps.insert(skippable_step_id);
391 if (!skippable_query_steps.empty()) {
392 std::vector<std::unique_ptr<RaExecutionDesc>> new_descs;
393 for (
size_t step_id = 0; step_id <
descs_.size(); ++step_id) {
394 const auto body =
descs_[step_id]->getBody();
395 if (!skippable_query_steps.count(step_id)) {
396 new_descs.push_back(std::make_unique<RaExecutionDesc>(body));
399 const auto prev_num_steps =
descs_.size();
400 if (!new_descs.empty()) {
403 VLOG(1) <<
"Skip " << prev_num_steps -
descs_.size() <<
" query steps from "
404 << prev_num_steps <<
" steps";
407 for (
const auto& desc :
descs_) {
410 auto body = desc->getBody();
419 if (after_broadcast) {
449 bool operator()(std::unique_ptr<RaExecutionDesc>
const& desc)
const {
450 return desc->getBody()->getId() == body_id_;
457 unsigned const body_id,
458 size_t const start_idx)
const {
460 auto const from_end =
descs_.size() - (start_idx + 1);
461 MatchBody
const match_body{body_id};
462 auto const itr = std::find_if(
descs_.rbegin() + from_end,
descs_.rend(), match_body);
463 return itr ==
descs_.rend() ?
nullptr : itr->get();
467 size_t num_descriptors = 0;
468 size_t crt_vertex = 0;
474 auto& node =
graph_[vert];
476 if (dynamic_cast<const RelScan*>(node)) {
481 return num_descriptors;
485 size_t steps_to_next_broadcast = 0;
494 for (
size_t i = 0; i < join_node->inputCount(); i++) {
495 const auto input = join_node->getInput(i);
496 if (dynamic_cast<const RelScan*>(input)) {
497 return steps_to_next_broadcast;
504 ++steps_to_next_broadcast;
511 return ++steps_to_next_broadcast;
514 if (
auto sort = dynamic_cast<const RelSort*>(node)) {
516 node =
sort->getInput(0);
518 if (dynamic_cast<const RelScan*>(node)) {
519 return steps_to_next_broadcast;
521 if (dynamic_cast<const RelModify*>(node)) {
524 ++steps_to_next_broadcast;
527 if (
auto project = dynamic_cast<const RelProject*>(node)) {
528 if (project->hasWindowFunctionExpr()) {
529 ++steps_to_next_broadcast;
533 for (
size_t input_idx = 0; input_idx < node->inputCount(); input_idx++) {
534 if (dynamic_cast<const RelScan*>(node->getInput(input_idx))) {
535 return steps_to_next_broadcast;
538 ++steps_to_next_broadcast;
540 return steps_to_next_broadcast;
void setOutputMetainfo(std::vector< TargetMetaInfo > targets_metainfo) const
std::string getExplanation()
uint64_t execution_time_ms_
bool g_use_query_resultset_cache
std::vector< Vertex > mergeSortWithInput(const std::vector< Vertex > &vertices, const DAG &graph)
bool g_allow_query_step_skipping
ExecutionResult & operator=(const ExecutionResult &that)
DEVICE void sort(ARGS &&...args)
std::shared_ptr< ResultSet > ResultSetPtr
std::unordered_set< Vertex > joins_
bool g_enable_data_recycler
bool operator()(std::unique_ptr< RaExecutionDesc > const &desc) const
size_t getQueryPlanDagHash() const
DAG build_dag(const RelAlgNode *sink, std::unordered_map< const RelAlgNode *, int > &node_ptr_to_vert_idx)
std::unordered_set< int > cached_query_steps_
std::vector< std::unique_ptr< RaExecutionDesc > > descs_
const std::shared_ptr< ResultSet > & getRows() const
bool filter_push_down_enabled_
bool executionFinished() const
void updateResultSet(const std::string &query_ra, RType type, bool success=true)
void setContextData(const RaExecutionDesc *context_data) const
void extractQueryStepSkippingInfo()
bool g_enable_smem_group_by true
std::vector< PushedDownFilterInfo > pushed_down_filter_info_
size_t stepsToNextBroadcast() const
std::unordered_map< const RelAlgNode *, int > node_ptr_to_vert_idx_
RaExecutionSequence(const RelAlgNode *, Executor *, const bool build_sequence=true)
const std::vector< PushedDownFilterInfo > & getPushedDownFilterInfo() const
std::unordered_set< Vertex > get_join_vertices(const std::vector< Vertex > &vertices, const DAG &graph)
std::vector< Vertex > ordering_
bool g_enable_watchdog false
constexpr char const * EMPTY_QUERY_PLAN
const RelAlgNode * getBody() const
boost::adjacency_list< boost::setS, boost::vecS, boost::bidirectionalS, const RelAlgNode * > DAG
size_t totalDescriptorsCount() const
unsigned node_id(const rapidjson::Value &ra_node) noexcept
std::vector< TargetMetaInfo > targets_meta_
DEVICE void reverse(ARGS &&...args)
RaExecutionDesc * getDescriptorByBodyId(unsigned const body_id, size_t const start_idx) const
DEVICE void swap(ARGS &&...args)
std::unordered_map< int, std::unordered_set< int > > skippable_steps_
std::unordered_map< int, QueryPlanHash > cached_resultset_keys_
std::optional< size_t > nextStepId(const bool after_broadcast) const