OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
QueryPlanDagCache.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/graph/adjacency_list.hpp>
20 #include <boost/graph/graph_utility.hpp>
21 #include <boost/graph/graphviz.hpp>
22 #include <boost/graph/labeled_graph.hpp>
23 
24 #include <iostream>
25 #include <memory>
26 #include <vector>
27 
28 #include "RelAlgDag.h"
29 #include "RelAlgExecutionUnit.h"
30 #include "ScalarExprVisitor.h"
32 
// Default upper bound for the node cache (~1GB threshold). Spelled as an integer
// literal so no floating-point-to-integer conversion is involved.
constexpr size_t MAX_NODE_CACHE_SIZE = 1'000'000'000;
34 
35 // we manage the uniqueness of each node ID via the explained contents of its rel node
36 using RelNodeMap = std::unordered_map<RelNodeExplainedHash, RelNodeId>;
37 // we also maintain a labeled graph to manage the extracted query plan DAG;
38 // this can be used in the future to support advanced features such as partial resultset
39 // reuse and compiled kernel reuse by exploiting graph-centric computation like subgraph
40 // matching and graph isomorphism
41 using QueryPlanDag = boost::labeled_graph<AdjacentList, RelNodeId, boost::hash_mapS>;
42 
44  : public ScalarExprVisitor<std::vector<const Analyzer::ColumnVar*>> {
45  protected:
46  std::vector<const Analyzer::ColumnVar*> visitColumnVar(
47  const Analyzer::ColumnVar* column) const override {
48  return {column};
49  }
50 
51  std::vector<const Analyzer::ColumnVar*> visitColumnVarTuple(
52  const Analyzer::ExpressionTuple* expr_tuple) const override {
53  ColumnVarsVisitor visitor;
54  std::vector<const Analyzer::ColumnVar*> result;
55  for (size_t i = 0; i < expr_tuple->getTuple().size(); ++i) {
56  const auto col_vars = visitor.visit(expr_tuple->getTuple()[i].get());
57  for (const auto col_var : col_vars) {
58  result.push_back(col_var);
59  }
60  }
61  return result;
62  }
63 
64  std::vector<const Analyzer::ColumnVar*> aggregateResult(
65  const std::vector<const Analyzer::ColumnVar*>& aggregate,
66  const std::vector<const Analyzer::ColumnVar*>& next_result) const override {
67  auto result = aggregate;
68  for (const auto col_var : next_result) {
69  result.push_back(col_var);
70  }
71  return result;
72  }
73 };
74 
76  public:
78 
79  static std::unordered_set<size_t> getScanNodeTableKey(RelAlgNode const* rel_alg_node) {
80  ScanNodeTableKeyCollector scan_node_table_key_collector;
81  scan_node_table_key_collector.visit(rel_alg_node);
82  return std::move(scan_node_table_key_collector.table_keys_);
83  }
84 
85  private:
86  void visit(RelScan const* scan_node) override {
87  CHECK(scan_node->getTableDescriptor());
88  CHECK(scan_node->getTableDescriptor()->fragmenter);
89  auto hashed_chunk_key = boost::hash_value(scan_node->getTableDescriptor()
90  ->fragmenter->getFragmentsForQuery()
91  .chunkKeyPrefix);
92  table_keys_.insert(hashed_chunk_key);
93  RelRexDagVisitor::visit(scan_node);
94  }
95 
96  std::unordered_set<size_t> table_keys_;
97 };
98 
99 // This is one of the main data structures for data recycling; it manages a query plan
100 // shape as a DAG representation.
101 // A query plan DAG is a sequence of unique node IDs: we assign the same node ID to a
102 // node iff we already saw that node in a different query plan that we extracted.
103 // To retrieve a query plan DAG, we visit each rel node of an input query plan starting
104 // from the root to the bottom (left-to-right child visiting), and check whether it is
105 // valid for DAG extraction and its usage (we do not allow DAG extraction if a query plan
106 // has a rel node that is not supported for data recycling, such as logical values).
107 // If the visited node is valid, we check its uniqueness against the DAG cache and assign
108 // a new unique ID if it is unique (otherwise we reuse the existing node ID). After
109 // visiting a query plan, we have a sequence of node IDs and return it as the extracted query plan DAG.
111  public:
112  QueryPlanDagCache(size_t max_node_cache_size = MAX_NODE_CACHE_SIZE)
113  : max_node_map_size_(max_node_cache_size) {}
114 
115  QueryPlanDagCache(QueryPlanDagCache&& other) = delete;
116  QueryPlanDagCache& operator=(QueryPlanDagCache&& other) = delete;
117  QueryPlanDagCache(const QueryPlanDagCache&) = delete;
119 
120  std::optional<RelNodeId> addNodeIfAbsent(const RelAlgNode*);
121 
122  void connectNodes(const RelNodeId parent_id, const RelNodeId child_id);
123 
124  std::vector<const Analyzer::ColumnVar*> collectColVars(const Analyzer::Expr* target);
125 
126  size_t getCurrentNodeMapSize() const;
127 
128  void setNodeMapMaxSize(const size_t map_size);
129 
130  size_t getJoinColumnsInfoHash(const Analyzer::Expr* join_expr,
131  JoinColumnSide target_side,
132  bool extract_only_col_id);
133 
134  size_t translateColVarsToInfoHash(std::vector<const Analyzer::ColumnVar*>& col_vars,
135  bool col_id_only) const;
136 
137  void clearQueryPlanCache();
138 
139  void printDag();
140 
141  private:
142  size_t getCurrentNodeMapSizeUnlocked() const;
143  size_t getCurrentNodeMapCardinality() const;
144 
145  // a map between a rel node and its unique node id
147  // a graph structure that represents relationships among extracted query plan DAGs
149  // a limitation of the maximum size of DAG cache (to prevent unlimited usage of memory
150  // for DAG maintenance)
152 
153  // a lock to prevent contention while accessing the internal data structures of the DAG cache
154  mutable std::mutex cache_lock_;
156 };
virtual void visit(RelAlgNode const *)
std::optional< RelNodeId > addNodeIfAbsent(const RelAlgNode *)
void connectNodes(const RelNodeId parent_id, const RelNodeId child_id)
JoinColumnSide
T visit(const Analyzer::Expr *expr) const
size_t getCurrentNodeMapSizeUnlocked() const
size_t getJoinColumnsInfoHash(const Analyzer::Expr *join_expr, JoinColumnSide target_side, bool extract_only_col_id)
static std::unordered_set< size_t > getScanNodeTableKey(RelAlgNode const *rel_alg_node)
ColumnVarsVisitor col_var_visitor_
void visit(RelScan const *scan_node) override
QueryPlanDag cached_query_plan_dag_
const std::vector< std::shared_ptr< Analyzer::Expr > > & getTuple() const
Definition: Analyzer.h:253
size_t translateColVarsToInfoHash(std::vector< const Analyzer::ColumnVar * > &col_vars, bool col_id_only) const
std::vector< const Analyzer::ColumnVar * > visitColumnVarTuple(const Analyzer::ExpressionTuple *expr_tuple) const override
size_t getCurrentNodeMapCardinality() const
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
std::unordered_map< RelNodeExplainedHash, RelNodeId > RelNodeMap
std::vector< const Analyzer::ColumnVar * > aggregateResult(const std::vector< const Analyzer::ColumnVar * > &aggregate, const std::vector< const Analyzer::ColumnVar * > &next_result) const override
QueryPlanDagCache(size_t max_node_cache_size=MAX_NODE_CACHE_SIZE)
boost::labeled_graph< AdjacentList, RelNodeId, boost::hash_mapS > QueryPlanDag
constexpr size_t MAX_NODE_CACHE_SIZE
void setNodeMapMaxSize(const size_t map_size)
std::vector< const Analyzer::ColumnVar * > visitColumnVar(const Analyzer::ColumnVar *column) const override
std::size_t hash_value(RexAbstractInput const &rex_ab_input)
Definition: RelAlgDag.cpp:3525
#define CHECK(condition)
Definition: Logger.h:291
size_t getCurrentNodeMapSize() const
std::unordered_set< size_t > table_keys_
Execution unit for relational algebra. It's a low-level description of any relational algebra operati...
const TableDescriptor * getTableDescriptor() const
Definition: RelAlgDag.h:1117
std::vector< const Analyzer::ColumnVar * > collectColVars(const Analyzer::Expr *target)
size_t RelNodeId
QueryPlanDagCache & operator=(QueryPlanDagCache &&other)=delete