OmniSciDB  d2f719934e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
WindowContext.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Analyzer/Analyzer.h"
20 #include "DataMgr/Chunk/Chunk.h"
22 
23 #include <functional>
24 #include <unordered_map>
25 
26 // Returns true for value window functions, false otherwise.
28  switch (kind) {
33  return true;
34  }
35  default: {
36  return false;
37  }
38  }
39 }
40 
41 // Returns true for aggregate window functions, false otherwise.
43  switch (kind) {
50  return true;
51  }
52  default: {
53  return false;
54  }
55  }
56 }
57 
58 class Executor;
59 
60 // Per-window function context which encapsulates the logic for computing the various
61 // window function kinds and keeps ownership of buffers which contain the results. For
62 // rank functions, the code generated for the projection simply reads the values and
63 // writes them to the result set. For value and aggregate functions, only the iteration
64 // order is written to the buffer; the rest is handled by generating code in a way similar
65 // to what we do for non-window queries.
67  public:
68  // non-partitioned version
70  const size_t elem_count,
71  const ExecutorDeviceType device_type,
72  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner);
73 
74  // partitioned version
76  const std::shared_ptr<HashJoin>& partitions,
77  const size_t elem_count,
78  const ExecutorDeviceType device_type,
79  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner);
80 
82 
84 
86 
87  // Adds the order column buffer to the context and keeps ownership of it.
88  void addOrderColumn(const int8_t* column,
89  const Analyzer::ColumnVar* col_var,
90  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner);
91 
92  // Computes the window function result to be used during the actual projection query.
93  void compute();
94 
95  // Returns a pointer to the window function associated with this context.
97 
98  // Returns a pointer to the output buffer of the window function result.
99  const int8_t* output() const;
100 
101  // Returns a pointer to the value field of the aggregation state.
102  const int64_t* aggregateState() const;
103 
104  // Returns a pointer to the count field of the aggregation state.
105  const int64_t* aggregateStateCount() const;
106 
107  // Returns a handle to the pending outputs for the aggregate window function.
108  int64_t aggregateStatePendingOutputs() const;
109 
110  // Returns a pointer to the partition start bitmap.
111  const int8_t* partitionStart() const;
112 
113  // Returns a pointer to the partition end bitmap.
114  const int8_t* partitionEnd() const;
115 
116  // Returns the element count in the columns used by the window function.
117  size_t elementCount() const;
118 
119  using Comparator = std::function<bool(const int64_t lhs, const int64_t rhs)>;
120 
121  private:
122  // State for a window aggregate. The count field is only used for average.
123  struct AggregateState {
124  int64_t val;
125  int64_t count;
126  std::vector<void*> outputs;
127  llvm::Value* row_number = nullptr;
128  };
129 
130  static Comparator makeComparator(const Analyzer::ColumnVar* col_var,
131  const int8_t* partition_values,
132  const int32_t* partition_indices,
133  const bool nulls_first);
134 
135  void computePartition(const size_t partition_idx, int64_t* output_for_partition_buff);
136 
138  int64_t* output_for_partition_buff,
139  const size_t partition_size,
140  const size_t off,
141  const Analyzer::WindowFunction* window_func,
142  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator);
143 
144  void fillPartitionStart();
145 
146  void fillPartitionEnd();
147 
148  const int32_t* payload() const;
149 
150  const int32_t* offsets() const;
151 
152  const int32_t* counts() const;
153 
154  size_t partitionCount() const;
155 
157  // Keeps ownership of order column.
158  std::vector<std::vector<std::shared_ptr<Chunk_NS::Chunk>>> order_columns_owner_;
159  // Order column buffers.
160  std::vector<const int8_t*> order_columns_;
161  // Hash table which contains the partitions specified by the window.
162  std::shared_ptr<HashJoin> partitions_;
163  // The number of elements in the table.
164  size_t elem_count_;
165  // The output of the window function.
166  int8_t* output_;
167  // Markers for partition start used to reinitialize state for aggregate window
168  // functions.
170  // Markers for partition end used to reinitialize state for aggregate window
171  // functions.
172  int8_t* partition_end_;
173  // State for aggregate function over a window.
176  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner_;
177 
178  // For use when we do not have partitions_ hash table
179  const int32_t dummy_count_;
180  const int32_t dummy_offset_;
181  // dummy_payload_ is only initialized if there is no partitions_ hash table
182  // TODO(todd): There is no need for index buffer for non-partitioned
183  // window functions, as the row to index mapping is the identity function,
184  // so refactor makeComparator and ilk to allow for this
185  int32_t* dummy_payload_;
186 };
187 
188 // Keeps track of the multiple window functions in a window query.
190  public:
192  std::unique_ptr<WindowFunctionContext> window_function_context,
193  const size_t target_index);
194 
195  // Marks the window function at the given target index as active. This simplifies the
196  // code generation since it's now context sensitive. Each value window function can have
197  // its own iteration order, therefore fetching a column at a given position changes
198  // depending on which window function is active.
200  Executor* executor,
201  const size_t target_index) const;
202 
203  // Resets the active window function, which restores the regular (non-window) codegen
204  // behavior.
205  static void resetWindowFunctionContext(Executor* executor);
206 
207  // Gets the current active window function.
208  static WindowFunctionContext* getActiveWindowFunctionContext(Executor* executor);
209 
210  // Creates the context for a window function execution unit.
211  static WindowProjectNodeContext* create(Executor* executor);
212 
213  // Retrieves the context for the active window function execution unit.
214  static const WindowProjectNodeContext* get(Executor* executor);
215 
216  // Resets the active context.
217  static void reset(Executor* executor);
218 
219  private:
220  // A map from target index to the context associated with the window function at that
221  // target index.
222  std::unordered_map<size_t, std::unique_ptr<WindowFunctionContext>> window_contexts_;
223 };
224 
226 
Defines data structures for the semantic analysis phase of query processing.
void computePartitionBuffer(int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
const int32_t dummy_count_
ExecutorDeviceType
const int32_t dummy_offset_
const int8_t * partitionStart() const
void computePartition(const size_t partition_idx, int64_t *output_for_partition_buff)
std::function< bool(const int64_t lhs, const int64_t rhs)> Comparator
static WindowProjectNodeContext * create(Executor *executor)
size_t elementCount() const
const int8_t * output() const
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
const int32_t * offsets() const
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
size_t partitionCount() const
AggregateState aggregate_state_
bool window_function_is_value(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:27
const int64_t * aggregateStateCount() const
const int8_t * partitionEnd() const
static void reset(Executor *executor)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
int64_t aggregateStatePendingOutputs() const
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
const int64_t * aggregateState() const
WindowFunctionContext & operator=(const WindowFunctionContext &)=delete
Executor(const ExecutorId id, Data_Namespace::DataMgr *data_mgr, const size_t block_size_x, const size_t grid_size_x, const size_t max_gpu_slab_size, const std::string &debug_dir, const std::string &debug_file)
Definition: Execute.cpp:156
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
std::shared_ptr< HashJoin > partitions_
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
SqlWindowFunctionKind
Definition: sqldefs.h:83
std::unordered_map< size_t, std::unique_ptr< WindowFunctionContext > > window_contexts_
std::vector< const int8_t * > order_columns_
static void resetWindowFunctionContext(Executor *executor)
const Analyzer::WindowFunction * getWindowFunction() const
const int32_t * payload() const
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
void addOrderColumn(const int8_t *column, const Analyzer::ColumnVar *col_var, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
const ExecutorDeviceType device_type_