31 const std::shared_ptr<HashJoin>& partitions,
32 const size_t elem_count,
34 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
35 : window_func_(window_func)
36 , partitions_(partitions)
37 , elem_count_(elem_count)
39 , partition_start_(nullptr)
40 , partition_end_(nullptr)
41 , device_type_(device_type)
42 , row_set_mem_owner_(row_set_mem_owner) {}
52 const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
61 std::vector<int64_t> row_numbers(index_size);
62 for (
size_t i = 0;
i < index_size; ++
i) {
63 row_numbers[index[
i]] =
i + 1;
71 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator,
77 return comparator(index[i - 1], index[i]);
83 const size_t index_size,
84 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
85 std::vector<int64_t> rank(index_size);
87 for (
size_t i = 0;
i < index_size; ++
i) {
91 rank[index[
i]] = crt_rank;
99 const size_t index_size,
100 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
101 std::vector<int64_t> dense_rank(index_size);
103 for (
size_t i = 0;
i < index_size; ++
i) {
107 dense_rank[index[
i]] = crt_rank;
114 const int64_t* index,
115 const size_t index_size,
116 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
117 std::vector<double> percent_rank(index_size);
119 for (
size_t i = 0;
i < index_size; ++
i) {
123 percent_rank[index[
i]] =
124 index_size == 1 ? 0 :
static_cast<double>(crt_rank - 1) / (index_size - 1);
131 const int64_t* index,
132 const size_t index_size,
133 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
134 std::vector<double> cume_dist(index_size);
135 size_t start_peer_group = 0;
136 while (start_peer_group < index_size) {
137 size_t end_peer_group = start_peer_group + 1;
138 while (end_peer_group < index_size &&
142 for (
size_t i = start_peer_group;
i < end_peer_group; ++
i) {
143 cume_dist[index[
i]] =
static_cast<double>(end_peer_group) / index_size;
145 start_peer_group = end_peer_group;
152 const size_t index_size,
154 std::vector<int64_t> row_numbers(index_size);
156 throw std::runtime_error(
"NTILE argument cannot be zero");
158 const size_t tile_size = (index_size + n - 1) / n;
159 for (
size_t i = 0;
i < index_size; ++
i) {
160 row_numbers[index[
i]] =
i / tile_size + 1;
175 throw std::runtime_error(
"LAG with non-constant lag argument not supported yet");
177 const auto& lag_ti = lag_constant->get_type_info();
178 switch (lag_ti.get_type()) {
180 return lag_constant->get_constval().smallintval;
183 return lag_constant->get_constval().intval;
186 return lag_constant->get_constval().bigintval;
189 LOG(
FATAL) <<
"Invalid type for the lag argument";
200 if (args.size() == 3) {
201 throw std::runtime_error(
"LAG with default not supported yet");
203 if (args.size() == 2) {
204 const int64_t lag_or_lead =
216 const int32_t* original_indices,
217 const size_t partition_size) {
218 std::vector<int64_t> new_output_for_partition_buff(partition_size);
219 for (
size_t i = 0;
i < partition_size; ++
i) {
220 new_output_for_partition_buff[
i] = original_indices[output_for_partition_buff[
i]];
222 std::copy(new_output_for_partition_buff.begin(),
223 new_output_for_partition_buff.end(),
224 output_for_partition_buff);
229 const int32_t* original_indices,
230 int64_t* sorted_indices,
231 const size_t partition_size) {
232 std::vector<int64_t> lag_sorted_indices(partition_size, -1);
233 for (int64_t idx = 0; idx < static_cast<int64_t>(partition_size); ++idx) {
234 int64_t lag_idx = idx - lag;
235 if (lag_idx < 0 || lag_idx >= static_cast<int64_t>(partition_size)) {
238 lag_sorted_indices[idx] = sorted_indices[lag_idx];
240 std::vector<int64_t> lag_original_indices(partition_size);
241 for (
size_t k = 0;
k < partition_size; ++
k) {
242 const auto lag_index = lag_sorted_indices[
k];
243 lag_original_indices[sorted_indices[
k]] =
244 lag_index != -1 ? original_indices[lag_index] : -1;
246 std::copy(lag_original_indices.begin(), lag_original_indices.end(), sorted_indices);
252 int64_t* output_for_partition_buff,
253 const size_t partition_size) {
254 const auto first_value_idx = original_indices[output_for_partition_buff[0]];
256 output_for_partition_buff + partition_size,
263 int64_t* output_for_partition_buff,
264 const size_t partition_size) {
266 original_indices, original_indices + partition_size, output_for_partition_buff);
270 const int8_t* partition_end,
272 const int64_t* index,
273 const size_t index_size,
274 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
275 int64_t partition_end_handle =
reinterpret_cast<int64_t
>(partition_end);
276 for (
size_t i = 0;
i < index_size; ++
i) {
286 return (reinterpret_cast<const int8_t*>(bitset))[pos >> 3] & (1 << (pos & 7));
294 const int64_t bitset,
299 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
300 for (
auto pending_output_slot : pending_output_slots) {
301 *
reinterpret_cast<T*
>(pending_output_slot) = value;
303 pending_output_slots.clear();
310 const int64_t bitset,
312 apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
317 const int64_t bitset,
319 apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
324 const int64_t bitset,
326 apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
331 const int64_t bitset,
333 apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
338 const int64_t bitset,
343 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
344 for (
auto pending_output_slot : pending_output_slots) {
345 *
reinterpret_cast<double*
>(pending_output_slot) = value;
347 pending_output_slots.clear();
352 const int64_t bitset,
357 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
358 for (
auto pending_output_slot : pending_output_slots) {
359 *
reinterpret_cast<double*
>(pending_output_slot) = value;
361 pending_output_slots.clear();
366 const int64_t bitset,
371 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
372 for (
auto pending_output_slot : pending_output_slots) {
373 *
reinterpret_cast<float*
>(pending_output_slot) = value;
375 pending_output_slots.clear();
380 reinterpret_cast<std::vector<void*>*
>(handle)->push_back(pending_output);
392 switch (window_func->
getKind()) {
413 std::unique_ptr<int64_t[]> scratchpad(
new int64_t[
elem_count_]);
416 auto partition_size =
counts()[
i];
417 if (partition_size == 0) {
420 auto output_for_partition_buff = scratchpad.get() +
offsets()[
i];
422 output_for_partition_buff + partition_size,
424 std::vector<Comparator> comparators;
427 CHECK_EQ(order_keys.size(), collation.size());
428 for (
size_t order_column_idx = 0; order_column_idx <
order_columns_.size();
429 ++order_column_idx) {
431 const auto order_col =
434 const auto& order_col_collation = collation[order_column_idx];
438 order_col_collation.nulls_first);
439 auto comparator = asc_comparator;
440 if (order_col_collation.is_desc) {
441 comparator = [asc_comparator](
const int64_t lhs,
const int64_t rhs) {
442 return asc_comparator(rhs, lhs);
445 comparators.push_back(comparator);
447 const auto col_tuple_comparator = [&comparators](
const int64_t lhs,
449 for (
const auto& comparator : comparators) {
450 if (comparator(lhs, rhs)) {
457 output_for_partition_buff + partition_size,
458 col_tuple_comparator);
463 col_tuple_comparator);
466 off += partition_size;
471 CHECK_EQ(static_cast<size_t>(off), elem_count_);
473 auto output_i64 =
reinterpret_cast<int64_t*
>(
output_);
478 output_i64[
payload()[
i]] = scratchpad[
i];
531 const int32_t* partition_indices,
534 const bool nulls_first) {
535 const auto values =
reinterpret_cast<const T*
>(order_column_buffer);
536 const auto lhs_val = values[partition_indices[lhs]];
537 const auto rhs_val = values[partition_indices[rhs]];
539 if (lhs_val == null_val && rhs_val == null_val) {
542 if (lhs_val == null_val && rhs_val != null_val) {
545 if (rhs_val == null_val && lhs_val != null_val) {
548 return lhs_val < rhs_val;
551 template <
class T,
class NullPatternType>
554 const int32_t* partition_indices,
557 const bool nulls_first) {
558 const auto values =
reinterpret_cast<const T*
>(order_column_buffer);
559 const auto lhs_val = values[partition_indices[lhs]];
560 const auto rhs_val = values[partition_indices[rhs]];
562 const auto lhs_bit_pattern =
563 *
reinterpret_cast<const NullPatternType*
>(may_alias_ptr(&lhs_val));
564 const auto rhs_bit_pattern =
565 *
reinterpret_cast<const NullPatternType*
>(may_alias_ptr(&rhs_val));
566 if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
569 if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
572 if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
575 return lhs_val < rhs_val;
580 std::function<bool(const int64_t lhs, const int64_t rhs)>
582 const int8_t* order_column_buffer,
583 const int32_t* partition_indices,
584 const bool nulls_first) {
586 if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
587 switch (ti.get_size()) {
589 return [order_column_buffer, nulls_first, partition_indices, &ti](
590 const int64_t lhs,
const int64_t rhs) {
591 return integer_comparator<int64_t>(
592 order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
596 return [order_column_buffer, nulls_first, partition_indices, &ti](
597 const int64_t lhs,
const int64_t rhs) {
598 return integer_comparator<int32_t>(
599 order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
603 return [order_column_buffer, nulls_first, partition_indices, &ti](
604 const int64_t lhs,
const int64_t rhs) {
605 return integer_comparator<int16_t>(
606 order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
610 return [order_column_buffer, nulls_first, partition_indices, &ti](
611 const int64_t lhs,
const int64_t rhs) {
612 return integer_comparator<int8_t>(
613 order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
617 LOG(
FATAL) <<
"Invalid type size: " << ti.get_size();
622 switch (ti.get_type()) {
624 return [order_column_buffer, nulls_first, partition_indices, &ti](
625 const int64_t lhs,
const int64_t rhs) {
626 return fp_comparator<float, int32_t>(
627 order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
631 return [order_column_buffer, nulls_first, partition_indices, &ti](
632 const int64_t lhs,
const int64_t rhs) {
633 return fp_comparator<double, int64_t>(
634 order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
642 throw std::runtime_error(
"Type not supported yet");
646 int64_t* output_for_partition_buff,
647 const size_t partition_size,
650 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
651 switch (window_func->
getKind()) {
653 const auto row_numbers =
655 std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
660 index_to_rank(output_for_partition_buff, partition_size, comparator);
661 std::copy(rank.begin(), rank.end(), output_for_partition_buff);
665 const auto dense_rank =
667 std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
671 const auto percent_rank =
675 reinterpret_cast<double*
>(may_alias_ptr(output_for_partition_buff)));
679 const auto cume_dist =
683 reinterpret_cast<double*
>(may_alias_ptr(output_for_partition_buff)));
690 const auto ntile =
index_to_ntile(output_for_partition_buff, partition_size, n);
691 std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
697 const auto partition_row_offsets =
payload() + off;
699 lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
703 const auto partition_row_offsets =
payload() + off;
705 partition_row_offsets, output_for_partition_buff, partition_size);
709 const auto partition_row_offsets =
payload() + off;
711 partition_row_offsets, output_for_partition_buff, partition_size);
719 const auto partition_row_offsets =
payload() + off;
722 partitionEnd(), off, output_for_partition_buff, partition_size, comparator);
725 output_for_partition_buff, partition_row_offsets, partition_size);
729 throw std::runtime_error(
"Window function not supported yet: " +
743 checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
745 std::vector<size_t> partition_offsets(partition_count);
749 for (int64_t
i = 0;
i < partition_count - 1; ++
i) {
762 checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
764 std::vector<size_t> partition_offsets(partition_count);
766 auto partition_end_handle =
reinterpret_cast<int64_t
>(
partition_end_);
767 for (int64_t
i = 0;
i < partition_count - 1; ++
i) {
768 if (partition_offsets[
i] == 0) {
779 return reinterpret_cast<const int32_t*
>(
784 return reinterpret_cast<const int32_t*
>(
789 return reinterpret_cast<const int32_t*
>(
796 return partition_count;
800 std::unique_ptr<WindowFunctionContext> window_function_context,
801 const size_t target_index) {
802 const auto it_ok = window_contexts_.emplace(
803 std::make_pair(target_index, std::move(window_function_context)));
809 const size_t target_index)
const {
810 const auto it = window_contexts_.find(target_index);
811 CHECK(it != window_contexts_.end());
812 executor->active_window_function_ = it->second.get();
813 return executor->active_window_function_;
817 executor->active_window_function_ =
nullptr;
821 Executor* executor) {
822 return executor->active_window_function_;
826 executor->window_project_node_context_owned_ =
827 std::make_unique<WindowProjectNodeContext>();
828 return executor->window_project_node_context_owned_.
get();
832 return executor->window_project_node_context_owned_.
get();
836 executor->window_project_node_context_owned_ =
nullptr;
837 executor->active_window_function_ =
nullptr;
SqlWindowFunctionKind getKind() const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
void apply_window_pending_outputs_int8(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
Descriptor for the storage layout used for (approximate) count distinct operations.
std::string toString(const ExtArgumentType &sig_type)
bool advance_current_rank(const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator, const int64_t *index, const size_t i)
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
void apply_window_pending_outputs_double(const int64_t handle, const double value, const int64_t bitset, const int64_t pos)
Utility functions for easy access to the result set buffers.
const int8_t * partitionStart() const
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
DEVICE void sort(ARGS &&...args)
bool fp_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
static WindowProjectNodeContext * create(Executor *executor)
size_t elementCount() const
const int8_t * output() const
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
void apply_window_pending_outputs_float_columnar(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
const int32_t * offsets() const
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
HOST DEVICE SQLTypes get_type() const
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
void add_window_pending_output(void *pending_output, const int64_t handle)
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
const std::vector< OrderEntry > & getCollation() const
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
void computePartition(int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_window_pending_outputs_int64(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
size_t partitionCount() const
AggregateState aggregate_state_
void apply_window_pending_outputs_int(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
static const WindowProjectNodeContext * get(Executor *executor)
bool window_function_is_value(const SqlWindowFunctionKind kind)
DEVICE void fill(ARGS &&...args)
void apply_last_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const int64_t * aggregateStateCount() const
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
bool integer_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
const int8_t * partitionEnd() const
DEVICE auto copy(ARGS &&...args)
static void reset(Executor *executor)
void setRowNumber(llvm::Value *row_number)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
void apply_first_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
DEVICE void partial_sum(ARGS &&...args)
void * checked_calloc(const size_t nmemb, const size_t size)
int64_t aggregateStatePendingOutputs() const
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
const int64_t * aggregateState() const
const SQLTypeInfo & get_type_info() const
void fillPartitionStart()
void apply_window_pending_outputs_float(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
int8_t * partition_start_
DEVICE void iota(ARGS &&...args)
llvm::Value * getRowNumber() const
bool pos_is_set(const int64_t bitset, const int64_t pos)
std::shared_ptr< HashJoin > partitions_
void apply_lag_to_partition(const int64_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< HashJoin > &partitions, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< const int8_t * > order_columns_
static void resetWindowFunctionContext(Executor *executor)
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void apply_window_pending_outputs_int32(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
const Analyzer::WindowFunction * getWindowFunction() const
const int32_t * payload() const
void apply_window_pending_outputs_int16(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
void addOrderColumn(const int8_t *column, const Analyzer::ColumnVar *col_var, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
const ExecutorDeviceType device_type_
std::vector< void * > outputs