OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
WindowContext.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Analyzer/Analyzer.h"
20 #include "DataMgr/Chunk/Chunk.h"
23 #include "Shared/sqltypes.h"
24 
25 #include <functional>
26 #include <unordered_map>
27 
28 // Returns true for value window functions, false otherwise.
30  switch (kind) {
36  return true;
37  default:
38  return false;
39  }
40 }
41 
42 // Returns true for aggregate window functions, false otherwise.
44  switch (kind) {
54  return true;
55  default:
56  return false;
57  }
58 }
59 
61  switch (kind) {
64  return true;
65  default:
66  return false;
67  }
68 }
69 
70 class Executor;
71 
73  std::vector<int64_t*> aggregate_tree_for_integer_type_;
74  std::vector<double*> aggregate_tree_for_double_type_;
75  std::vector<SumAndCountPair<int64_t>*> derived_aggregate_tree_for_integer_type_;
76  std::vector<SumAndCountPair<double>*> derived_aggregate_tree_for_double_type_;
78 
79  void resizeStorageForWindowFraming(size_t partition_count) {  // Resizes all four aggregate-tree pointer vectors to partition_count entries (presumably one slot per partition -- TODO confirm against callers).
80  aggregate_tree_for_integer_type_.resize(partition_count);  // std::vector::resize value-initializes any newly added slots, i.e. they become null pointers.
81  aggregate_tree_for_double_type_.resize(partition_count);  // Same sizing for the double-typed trees.
82  derived_aggregate_tree_for_integer_type_.resize(partition_count);  // Same sizing for the SumAndCountPair<int64_t> trees.
83  derived_aggregate_tree_for_double_type_.resize(partition_count);  // Same sizing for the SumAndCountPair<double> trees.
84  }
85 };
86 
90  llvm::Value* current_row_pos_lv;
91  llvm::Value* current_col_value_lv;
93  llvm::Value* int64_t_zero_val_lv;
94  llvm::Value* int64_t_one_val_lv;
96  llvm::Value* order_key_buf_ptr_lv;
97  std::string order_type_col_name;
100  llvm::Value* nulls_first_lv;
101  llvm::Value* null_start_pos_lv;
102  llvm::Value* null_end_pos_lv;
103 };
104 
110 };
111 
112 // Per-window function context which encapsulates the logic for computing the various
113 // window function kinds and keeps ownership of buffers which contain the results. For
114 // rank functions, the code generated for the projection simply reads the values and
115 // writes them to the result set. For value and aggregate functions, only the iteration
116 // order is written to the buffer; the rest is handled by generating code in a way
117 // similar to what we do for non-window queries.
119  public:
120  // we currently only use a single GPU to process the window function because
121  // a query step having a window function expression only has a single fragment input
122  // (i.e., push the window function expression down to the child projection node)
123  // todo (yoonmin) : support window function execution with multi-fragmented input
124  // todo (yoonmin) : support heterogeneous execution (i.e., CPU + GPU)
125  static const int NUM_EXECUTION_DEVICES = 1;
126 
127  // non-partitioned version
129  const size_t elem_count,
130  const ExecutorDeviceType device_type,
131  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner);
132 
133  // partitioned version
135  const Analyzer::WindowFunction* window_func,
136  QueryPlanHash cache_key,
137  const std::shared_ptr<HashJoin>& partitions,
138  const size_t elem_count,
139  const ExecutorDeviceType device_type,
140  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
141  size_t aggregation_tree_fan_out = g_window_function_aggregation_tree_fanout);
142 
144 
146 
148 
149  // Adds the order column buffer to the context and keeps ownership of it.
150  void addOrderColumn(const int8_t* column,
151  const SQLTypeInfo& ti,
152  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
153 
155 
157  const int8_t* column,
158  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
159 
160  enum class WindowComparatorResult { LT, EQ, GT };
161  using Comparator =
162  std::function<WindowFunctionContext::WindowComparatorResult(const int64_t lhs,
163  const int64_t rhs)>;
164 
165  std::vector<Comparator> createComparator(size_t partition_idx);
166 
167  // Computes the window function result to be used during the actual projection query.
168  void compute(
169  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
170  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>&
171  sorted_partition_cache,
172  std::unordered_map<QueryPlanHash, AggregateTreeForWindowFraming>&
173  aggregate_tree_map);
174 
175  // Returns a pointer to the window function associated with this context.
177 
178  // Returns a pointer to the output buffer of the window function result.
179  const int8_t* output() const;
180 
181  // Returns a pointer to the sorted row index buffer.
182  const int64_t* sortedPartition() const;
183 
184  // Returns a pointer to the value field of the aggregation state.
185  const int64_t* aggregateState() const;
186 
187  // Returns a pointer to the count field of the aggregation state.
188  const int64_t* aggregateStateCount() const;
189 
190  // Returns a handle to the pending outputs for the aggregate window function.
191  int64_t aggregateStatePendingOutputs() const;
192 
193  const int64_t* partitionStartOffset() const;
194 
195  const int64_t* partitionNumCountBuf() const;
196 
197  const std::vector<const int8_t*>& getColumnBufferForWindowFunctionExpressions() const;
198 
199  const std::vector<const int8_t*>& getOrderKeyColumnBuffers() const;
200 
201  const std::vector<SQLTypeInfo>& getOrderKeyColumnBufferTypes() const;
202 
204 
206 
208 
210 
211  size_t* getAggregateTreeDepth() const;
212 
213  size_t getAggregateTreeFanout() const;
214 
215  int64_t* getNullValueStartPos() const;
216 
217  int64_t* getNullValueEndPos() const;
218 
219  // Returns a pointer to the partition start bitmap.
220  const int8_t* partitionStart() const;
221 
222  // Returns a pointer to the partition end bitmap.
223  const int8_t* partitionEnd() const;
224 
225  // Returns the element count in the columns used by the window function.
226  size_t elementCount() const;
227 
228  const int32_t* payload() const;
229 
230  const int32_t* offsets() const;
231 
232  const int32_t* counts() const;
233 
234  size_t partitionCount() const;
235 
236  const bool needsToBuildAggregateTree() const;
237 
238  private:
239  // State for a window aggregate. The count field is only used for average.
240  struct AggregateState {
241  int64_t val;  // Value field of the aggregation state (presumably what aggregateState() points at -- TODO confirm).
242  int64_t count;  // Count field of the aggregation state; only used for average.
243  std::vector<void*> outputs;  // NOTE(review): presumably the pending outputs exposed via aggregateStatePendingOutputs() -- confirm in the implementation.
244  llvm::Value* row_number = nullptr;  // LLVM value handle, null until assigned; its producer/consumer is not visible in this header.
245  };
246 
247  static Comparator makeComparator(const Analyzer::ColumnVar* col_var,
248  const int8_t* partition_values,
249  const int32_t* partition_indices,
250  const bool asc_ordering,
251  const bool nulls_first);
252 
253  void computePartitionBuffer(const size_t partition_idx,
254  int64_t* output_for_partition_buff,
255  const Analyzer::WindowFunction* window_func);
256 
257  void sortPartition(const size_t partition_idx,
258  int64_t* output_for_partition_buff,
259  bool should_parallelize);
260 
261  void computeNullRangeOfSortedPartition(const SQLTypeInfo& order_col_ti,
262  size_t partition_idx,
263  const int32_t* original_col_idx_buf,
264  const int64_t* ordered_col_idx_buf);
265 
267  size_t partition_idx,
268  size_t partition_size,
269  const int32_t* original_rowid_buf,
270  const int64_t* ordered_rowid_buf,
271  const SQLTypeInfo& input_col_ti);
272 
273  void fillPartitionStart();
274 
275  void fillPartitionEnd();
276 
277  void resizeStorageForWindowFraming(bool const for_reuse = false);
278 
280 
284  // Keeps ownership of order column.
285  std::vector<std::vector<std::shared_ptr<Chunk_NS::Chunk>>> order_columns_owner_;
286  // Order column buffers.
287  std::vector<const int8_t*> order_columns_;
288  std::vector<SQLTypeInfo> order_columns_ti_;
289  // Hash table which contains the partitions specified by the window.
290  std::shared_ptr<HashJoin> partitions_;
291  // The number of elements in the table.
292  size_t elem_count_;
293  // The output of the window function.
294  int8_t* output_;
295  std::shared_ptr<std::vector<int64_t>> sorted_partition_buf_;
296  // Keeps ownership of column referenced in window function expression.
297  std::vector<std::vector<std::shared_ptr<Chunk_NS::Chunk>>>
299  // Column buffers used for window function expression
300  std::vector<const int8_t*> window_func_expr_columns_;
301  // we need to build a segment tree depending on the input column type
302  std::vector<std::shared_ptr<void>> segment_trees_owned_;
309  // Markers for partition start used to reinitialize state for aggregate window
310  // functions.
312  // Markers for partition end used to reinitialize state for aggregate window
313  // functions.
314  int8_t* partition_end_;
315  // State for aggregate function over a window.
318  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
319 
320  // For use when we do not have partitions_ hash table
321  const int32_t dummy_count_;
322  const int32_t dummy_offset_;
323  // dummy_payload_ is only initialized if there is no partitions_ hash table
324  // TODO(todd): There is no need for index buffer for non-partitioned
325  // window functions, as the row to index mapping is the identity function,
326  // so refactor makeComparator and ilk to allow for this
327  int32_t* dummy_payload_;
328 };
329 
330 // Keeps track of the multiple window functions in a window query.
332  public:
334  std::unique_ptr<WindowFunctionContext> window_function_context,
335  const size_t target_index);
336 
337  // Marks the window function at the given target index as active. This simplifies the
338  // code generation since it's now context sensitive. Each value window function can
339  // have its own iteration order, therefore fetching a column at a given position
340  // changes depending on which window function is active.
342  Executor* executor,
343  const size_t target_index) const;
344 
345  // Resets the active window function, which restores the regular (non-window) codegen
346  // behavior.
347  static void resetWindowFunctionContext(Executor* executor);
348 
349  // Gets the current active window function.
350  static WindowFunctionContext* getActiveWindowFunctionContext(Executor* executor);
351 
352  // Creates the context for a window function execution unit.
353  static WindowProjectNodeContext* create(Executor* executor);
354 
355  // Retrieves the context for the active window function execution unit.
356  static const WindowProjectNodeContext* get(Executor* executor);
357 
358  // Resets the active context.
359  static void reset(Executor* executor);
360 
361  private:
362  // A map from target index to the context associated with the window function at that
363  // target index.
364  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>> window_contexts_;
365 };
366 
368 
size_t getAggregateTreeFanout() const
Defines data structures for the semantic analysis phase of query processing.
std::vector< SumAndCountPair< double > * > derived_aggregate_tree_for_double_type_
Definition: WindowContext.h:76
void addOrderColumn(const int8_t *column, const SQLTypeInfo &ti, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
llvm::Value * num_elem_current_partition_lv
Definition: WindowContext.h:95
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
int64_t * ordered_partition_null_start_pos_
llvm::Value * current_col_value_lv
Definition: WindowContext.h:91
const int32_t dummy_count_
std::vector< double * > aggregate_tree_for_double_type_
Definition: WindowContext.h:74
int64_t * getNullValueEndPos() const
bool window_function_conditional_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:60
llvm::Value * current_partition_start_offset_lv
const int32_t dummy_offset_
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool asc_ordering, const bool nulls_first)
llvm::Value * current_row_pos_lv
Definition: WindowContext.h:90
llvm::Value * target_partition_rowid_ptr_lv
Definition: WindowContext.h:98
const int8_t * partitionStart() const
llvm::Value * frame_end_bound_expr_lv
Definition: WindowContext.h:89
llvm::Value * num_elem_current_partition_lv
llvm::Value * nulls_first_lv
const std::vector< SQLTypeInfo > & getOrderKeyColumnBufferTypes() const
void setSortedPartitionCacheKey(QueryPlanHash cache_key)
void computeNullRangeOfSortedPartition(const SQLTypeInfo &order_col_ti, size_t partition_idx, const int32_t *original_col_idx_buf, const int64_t *ordered_col_idx_buf)
static WindowProjectNodeContext * create(Executor *executor)
size_t elementCount() const
const int8_t * output() const
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
Constants for Builtin SQL Types supported by HEAVY.AI.
const int32_t * offsets() const
llvm::Value * target_partition_sorted_rowid_ptr_lv
llvm::Value * target_partition_rowid_ptr_lv
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
ExecutorDeviceType
const bool needsToBuildAggregateTree() const
size_t * getAggregateTreeDepth() const
int64_t ** getAggregationTreesForIntegerTypeWindowExpr() const
const std::vector< const int8_t * > & getColumnBufferForWindowFunctionExpressions() const
int64_t * getNullValueStartPos() const
SumAndCountPair< double > ** getDerivedAggregationTreesForDoubleTypeWindowExpr() const
const int64_t * partitionStartOffset() const
std::vector< std::shared_ptr< void > > segment_trees_owned_
std::shared_ptr< std::vector< int64_t > > sorted_partition_buf_
size_t partitionCount() const
AggregateState aggregate_state_
llvm::Value * null_start_pos_lv
bool window_function_is_value(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:29
size_t g_window_function_aggregation_tree_fanout
const int64_t * aggregateStateCount() const
std::vector< Comparator > createComparator(size_t partition_idx)
QueryPlanHash sorted_partition_cache_key_
const int8_t * partitionEnd() const
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > window_func_expr_columns_owner_
void addColumnBufferForWindowFunctionExpression(const int8_t *column, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
static void reset(Executor *executor)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
QueryPlanHash partition_cache_key_
int64_t aggregateStatePendingOutputs() const
void buildAggregationTreeForPartition(SqlWindowFunctionKind agg_type, size_t partition_idx, size_t partition_size, const int32_t *original_rowid_buf, const int64_t *ordered_rowid_buf, const SQLTypeInfo &input_col_ti)
const int64_t * aggregateState() const
std::function< bool(const PermutationIdx, const PermutationIdx)> Comparator
Definition: ResultSet.h:155
WindowFunctionContext & operator=(const WindowFunctionContext &)=delete
SumAndCountPair< int64_t > ** getDerivedAggregationTreesForIntegerTypeWindowExpr() const
static const int NUM_EXECUTION_DEVICES
Executor(const ExecutorId id, Data_Namespace::DataMgr *data_mgr, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:272
llvm::Value * current_partition_start_offset_lv
Definition: WindowContext.h:92
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:43
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
llvm::Value * target_partition_sorted_rowid_ptr_lv
Definition: WindowContext.h:99
std::vector< const int8_t * > window_func_expr_columns_
AggregateTreeForWindowFraming aggregate_trees_
void sortPartition(const size_t partition_idx, int64_t *output_for_partition_buff, bool should_parallelize)
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
const std::vector< const int8_t * > & getOrderKeyColumnBuffers() const
std::shared_ptr< HashJoin > partitions_
size_t QueryPlanHash
llvm::Value * null_end_pos_lv
void resizeStorageForWindowFraming(size_t partition_count)
Definition: WindowContext.h:79
const int64_t * partitionNumCountBuf() const
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
SqlWindowFunctionKind
Definition: sqldefs.h:122
void resizeStorageForWindowFraming(bool const for_reuse=false)
std::unordered_map< size_t, std::unique_ptr< WindowFunctionContext > > window_contexts_
void computePartitionBuffer(const size_t partition_idx, int64_t *output_for_partition_buff, const Analyzer::WindowFunction *window_func)
llvm::Value * order_key_buf_ptr_lv
Definition: WindowContext.h:96
int64_t * partition_start_offset_
llvm::Value * int64_t_zero_val_lv
Definition: WindowContext.h:93
std::vector< const int8_t * > order_columns_
const int64_t * sortedPartition() const
const QueryPlanHash computeAggregateTreeCacheKey() const
static void resetWindowFunctionContext(Executor *executor)
std::vector< SQLTypeInfo > order_columns_ti_
std::vector< SumAndCountPair< int64_t > * > derived_aggregate_tree_for_integer_type_
Definition: WindowContext.h:75
const Analyzer::WindowFunction * getWindowFunction() const
const int32_t * payload() const
std::function< WindowFunctionContext::WindowComparatorResult(const int64_t lhs, const int64_t rhs)> Comparator
size_t * aggregate_trees_depth_
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
llvm::Value * int64_t_one_val_lv
Definition: WindowContext.h:94
llvm::Value * frame_start_bound_expr_lv
Definition: WindowContext.h:88
std::vector< int64_t * > aggregate_tree_for_integer_type_
Definition: WindowContext.h:73
const ExecutorDeviceType device_type_
std::string order_type_col_name
Definition: WindowContext.h:97
void compute(std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, std::unordered_map< QueryPlanHash, std::shared_ptr< std::vector< int64_t >>> &sorted_partition_cache, std::unordered_map< QueryPlanHash, AggregateTreeForWindowFraming > &aggregate_tree_map)
double ** getAggregationTreesForDoubleTypeWindowExpr() const
int64_t * ordered_partition_null_end_pos_