36 #include <tbb/parallel_sort.h>
38 #include <thrust/sort.h>
52 const size_t elem_count,
54 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
55 : window_func_(window_func)
58 , partitions_(nullptr)
59 , elem_count_(elem_count)
61 , sorted_partition_buf_(nullptr)
63 , aggregate_trees_depth_(nullptr)
64 , ordered_partition_null_start_pos_(nullptr)
65 , ordered_partition_null_end_pos_(nullptr)
66 , partition_start_offset_(nullptr)
67 , partition_start_(nullptr)
68 , partition_end_(nullptr)
69 , device_type_(device_type)
70 , row_set_mem_owner_(row_set_mem_owner)
71 , dummy_count_(elem_count)
73 , dummy_payload_(nullptr) {
97 const std::shared_ptr<HashJoin>& partitions,
98 const size_t elem_count,
100 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
101 size_t aggregation_tree_fan_out)
102 : window_func_(window_func)
103 , partition_cache_key_(partition_cache_key)
105 , partitions_(partitions)
106 , elem_count_(elem_count)
108 , sorted_partition_buf_(nullptr)
109 , aggregate_trees_fan_out_(aggregation_tree_fan_out)
110 , aggregate_trees_depth_(nullptr)
111 , ordered_partition_null_start_pos_(nullptr)
112 , ordered_partition_null_end_pos_(nullptr)
113 , partition_start_offset_(nullptr)
114 , partition_start_(nullptr)
115 , partition_end_(nullptr)
116 , device_type_(device_type)
117 , row_set_mem_owner_(row_set_mem_owner)
118 , dummy_count_(elem_count)
120 , dummy_payload_(nullptr) {
124 reinterpret_cast<int64_t*
>(
checked_calloc(partition_count + 1,
sizeof(int64_t)));
127 reinterpret_cast<size_t*
>(
checked_calloc(partition_count,
sizeof(
size_t)));
129 reinterpret_cast<int64_t*
>(
checked_calloc(partition_count,
sizeof(int64_t)));
131 reinterpret_cast<int64_t*
>(
checked_calloc(partition_count,
sizeof(int64_t)));
158 const int8_t* column,
160 const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
167 const int8_t* column,
168 const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
173 const std::vector<const int8_t*>&
196 std::vector<int64_t> row_numbers(index_size);
197 for (
size_t i = 0; i < index_size; ++i) {
198 row_numbers[index[i]] = i + 1;
206 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator,
207 const int64_t* index,
212 return comparator(index[i - 1], index[i]);
217 const int64_t* index,
218 const size_t index_size,
219 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
220 std::vector<int64_t> rank(index_size);
222 for (
size_t i = 0; i < index_size; ++i) {
226 rank[index[i]] = crt_rank;
233 const int64_t* index,
234 const size_t index_size,
235 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
236 std::vector<int64_t> dense_rank(index_size);
238 for (
size_t i = 0; i < index_size; ++i) {
242 dense_rank[index[i]] = crt_rank;
249 const int64_t* index,
250 const size_t index_size,
251 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
252 std::vector<double> percent_rank(index_size);
254 for (
size_t i = 0; i < index_size; ++i) {
258 percent_rank[index[i]] =
259 index_size == 1 ? 0 :
static_cast<double>(crt_rank - 1) / (index_size - 1);
266 const int64_t* index,
267 const size_t index_size,
268 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
269 std::vector<double> cume_dist(index_size);
270 size_t start_peer_group = 0;
271 while (start_peer_group < index_size) {
272 size_t end_peer_group = start_peer_group + 1;
273 while (end_peer_group < index_size &&
277 for (
size_t i = start_peer_group; i < end_peer_group; ++i) {
278 cume_dist[index[i]] =
static_cast<double>(end_peer_group) / index_size;
280 start_peer_group = end_peer_group;
287 const size_t index_size,
289 std::vector<int64_t> row_numbers(index_size);
291 throw std::runtime_error(
"NTILE argument cannot be zero");
293 const size_t tile_size = (index_size + n - 1) / n;
294 for (
size_t i = 0; i < index_size; ++i) {
295 row_numbers[index[i]] = i / tile_size + 1;
310 throw std::runtime_error(
"LAG with non-constant lag argument not supported yet");
312 const auto& lag_ti = lag_constant->get_type_info();
313 switch (lag_ti.get_type()) {
315 return lag_constant->get_constval().smallintval;
318 return lag_constant->get_constval().intval;
321 return lag_constant->get_constval().bigintval;
324 LOG(
FATAL) <<
"Invalid type for the lag argument";
335 if (args.size() == 3) {
336 throw std::runtime_error(
"LAG with default not supported yet");
338 if (args.size() == 2) {
339 const int64_t lag_or_lead =
350 const size_t partition_size) {
355 : partition_size - 1;
361 const int32_t* original_indices,
362 const size_t partition_size) {
363 std::vector<int64_t> new_output_for_partition_buff(partition_size);
364 for (
size_t i = 0; i < partition_size; ++i) {
365 new_output_for_partition_buff[i] = original_indices[output_for_partition_buff[i]];
367 std::copy(new_output_for_partition_buff.begin(),
368 new_output_for_partition_buff.end(),
369 output_for_partition_buff);
374 const int32_t* original_indices,
375 int64_t* sorted_indices,
376 const size_t partition_size) {
377 std::vector<int64_t> lag_sorted_indices(partition_size, -1);
378 for (int64_t idx = 0; idx < static_cast<int64_t>(partition_size); ++idx) {
379 int64_t lag_idx = idx - lag;
380 if (lag_idx < 0 || lag_idx >= static_cast<int64_t>(partition_size)) {
383 lag_sorted_indices[idx] = sorted_indices[lag_idx];
385 std::vector<int64_t> lag_original_indices(partition_size);
386 for (
size_t k = 0; k < partition_size; ++k) {
387 const auto lag_index = lag_sorted_indices[k];
388 lag_original_indices[sorted_indices[k]] =
389 lag_index != -1 ? original_indices[lag_index] : -1;
391 std::copy(lag_original_indices.begin(), lag_original_indices.end(), sorted_indices);
395 int64_t* output_for_partition_buff,
396 const size_t partition_size,
397 const size_t target_pos) {
398 CHECK_LT(target_pos, partition_size);
399 const auto target_idx = original_indices[output_for_partition_buff[target_pos]];
401 output_for_partition_buff, output_for_partition_buff + partition_size, target_idx);
405 int64_t* output_for_partition_buff,
406 const size_t partition_size) {
407 for (
size_t i = 0; i < partition_size; i++) {
408 const auto target_idx = original_indices[output_for_partition_buff[i]];
409 output_for_partition_buff[i] = target_idx;
414 const int8_t* partition_end,
416 const int64_t* index,
417 const size_t index_size,
418 const std::function<
bool(
const int64_t lhs,
const int64_t rhs)>& comparator) {
419 int64_t partition_end_handle =
reinterpret_cast<int64_t
>(partition_end);
420 for (
size_t i = 0; i < index_size; ++i) {
430 return (reinterpret_cast<const int8_t*>(bitset))[pos >> 3] & (1 << (pos & 7));
438 const int64_t bitset,
443 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
444 for (
auto pending_output_slot : pending_output_slots) {
445 *
reinterpret_cast<T*
>(pending_output_slot) = value;
447 pending_output_slots.clear();
454 const int64_t bitset,
456 apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
461 const int64_t bitset,
463 apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
468 const int64_t bitset,
470 apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
475 const int64_t bitset,
477 apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
482 const int64_t bitset,
487 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
488 for (
auto pending_output_slot : pending_output_slots) {
489 *
reinterpret_cast<double*
>(pending_output_slot) = value;
491 pending_output_slots.clear();
496 const int64_t bitset,
501 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
502 for (
auto pending_output_slot : pending_output_slots) {
503 *
reinterpret_cast<double*
>(pending_output_slot) = value;
505 pending_output_slots.clear();
509 const int64_t handle,
511 const int64_t bitset,
516 auto& pending_output_slots = *
reinterpret_cast<std::vector<void*>*
>(handle);
517 for (
auto pending_output_slot : pending_output_slots) {
518 *
reinterpret_cast<float*
>(pending_output_slot) = value;
520 pending_output_slots.clear();
525 const int64_t handle) {
526 reinterpret_cast<std::vector<void*>*
>(handle)->push_back(pending_output);
538 switch (window_func->
getKind()) {
550 std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
551 std::unordered_map<
QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>&
552 sorted_partition_cache,
553 std::unordered_map<size_t, AggregateTreeForWindowFraming>& aggregate_tree_map) {
559 size_t output_buf_sz =
563 const bool is_window_function_aggregate_or_has_framing =
565 if (is_window_function_aggregate_or_has_framing) {
572 std::unique_ptr<int64_t[]> scratchpad;
573 int64_t* intermediate_output_buffer;
574 if (is_window_function_aggregate_or_has_framing) {
575 intermediate_output_buffer =
reinterpret_cast<int64_t*
>(
output_);
579 intermediate_output_buffer = scratchpad.get();
585 auto cached_sorted_partition_it =
587 if (cached_sorted_partition_it != sorted_partition_cache.end()) {
588 auto& sorted_partition = cached_sorted_partition_it->second;
589 VLOG(1) <<
"Reuse cached sorted partition to compute window function context (key: "
593 DEBUG_TIMER(
"Window Function Cached Sorted Partition Copy");
594 std::memcpy(intermediate_output_buffer, sorted_partition->data(), output_buf_sz);
600 const auto sort_partitions = [&](
const size_t start,
const size_t end) {
601 for (
size_t partition_idx = start; partition_idx < end; ++partition_idx) {
603 intermediate_output_buffer +
offsets()[partition_idx],
608 if (should_parallelize) {
609 auto sorted_partition_copy_timer =
610 DEBUG_TIMER(
"Window Function Partition Sorting Parallelized");
613 const tbb::blocked_range<int64_t>& r) {
615 parent_thread_local_ids.setNewThreadId();
616 sort_partitions(r.begin(), r.end());
619 auto sorted_partition_copy_timer =
620 DEBUG_TIMER(
"Window Function Partition Sorting Non-Parallelized");
623 auto sorted_partition_ref_cnt_it =
625 bool can_access_sorted_partition =
626 sorted_partition_ref_cnt_it != sorted_partition_key_ref_count_map.end() &&
627 sorted_partition_ref_cnt_it->second > 1;
632 DEBUG_TIMER(
"Window Function Sorted Partition Copy For Caching");
646 const auto compute_ordered_partition_null_range = [=](
const size_t start,
648 for (
size_t partition_idx = start; partition_idx < end; ++partition_idx) {
653 intermediate_output_buffer +
offsets()[partition_idx]);
658 if (should_parallelize) {
659 auto partition_compuation_timer =
660 DEBUG_TIMER(
"Window Function Ordered-Partition Null-Range Compute");
663 const tbb::blocked_range<int64_t>& r) {
665 parent_thread_local_ids.setNewThreadId();
666 compute_ordered_partition_null_range(r.begin(), r.end());
670 "Window Function Non-Parallelized Ordered-Partition Null-Range Compute");
674 auto const c_it = aggregate_tree_map.find(cache_key);
675 if (c_it != aggregate_tree_map.cend()) {
676 VLOG(1) <<
"Reuse aggregate tree for window function framing";
681 sizeof(
size_t) * partition_count);
684 const auto build_aggregation_tree_for_partitions = [=](
const size_t start,
686 for (
size_t partition_idx = start; partition_idx < end; ++partition_idx) {
692 const auto partition_size =
counts()[partition_idx];
697 intermediate_output_buffer,
702 if (should_parallelize) {
704 "Window Function Parallelized Segment Tree Construction for Partitions");
707 const tbb::blocked_range<int64_t>& r) {
709 parent_thread_local_ids.setNewThreadId();
710 build_aggregation_tree_for_partitions(r.begin(), r.end());
714 "Window Function Non-Parallelized Segment Tree Construction for "
716 build_aggregation_tree_for_partitions(0, partition_count);
720 VLOG(2) <<
"Put aggregate tree for the window framing";
724 const auto compute_partitions = [=](
const size_t start,
const size_t end) {
725 for (
size_t partition_idx = start; partition_idx < end; ++partition_idx) {
727 intermediate_output_buffer +
offsets()[partition_idx],
732 if (should_parallelize) {
733 auto partition_compuation_timer =
DEBUG_TIMER(
"Window Function Partition Compute");
736 const tbb::blocked_range<int64_t>& r) {
738 parent_thread_local_ids.setNewThreadId();
739 compute_partitions(r.begin(), r.end());
742 auto partition_compuation_timer =
743 DEBUG_TIMER(
"Window Function Non-Parallelized Partition Compute");
747 if (is_window_function_aggregate_or_has_framing) {
753 auto output_i64 =
reinterpret_cast<int64_t*
>(
output_);
754 const auto payload_copy = [=](
const size_t start,
const size_t end) {
755 for (
size_t i = start; i < end; ++i) {
756 output_i64[
payload()[i]] = intermediate_output_buffer[i];
759 if (should_parallelize) {
760 auto payload_copy_timer =
761 DEBUG_TIMER(
"Window Function Non-Aggregate Payload Copy Parallelized");
764 const tbb::blocked_range<int64_t>& r) {
766 parent_thread_local_ids.setNewThreadId();
767 payload_copy(r.begin(), r.end());
770 auto payload_copy_timer =
771 DEBUG_TIMER(
"Window Function Non-Aggregate Payload Copy Non-Parallelized");
781 int64_t null_bit_pattern = -1;
783 template <
typename T>
785 IndexPair null_range{std::numeric_limits<int64_t>::max(),
786 std::numeric_limits<int64_t>::min()};
787 auto const null_val = inline_int_null_value<T>();
788 auto const casted_order_col_buf =
reinterpret_cast<T const*
>(order_col_buf);
789 if (casted_order_col_buf[original_col_idx_buf[ordered_col_idx_buf[0]]] == null_val) {
790 int64_t null_range_max = 1;
791 while (null_range_max < partition_size &&
793 [original_col_idx_buf[ordered_col_idx_buf[null_range_max]]] ==
797 null_range.first = 0;
798 null_range.second = null_range_max - 1;
799 }
else if (casted_order_col_buf
800 [original_col_idx_buf[ordered_col_idx_buf[partition_size - 1]]] ==
802 int64_t null_range_min = partition_size - 2;
803 while (null_range_min >= 0 &&
805 [original_col_idx_buf[ordered_col_idx_buf[null_range_min]]] ==
809 null_range.first = null_range_min + 1;
810 null_range.second = partition_size - 1;
815 template <
typename COL_TYPE,
817 std::conditional_t<sizeof(COL_TYPE) == sizeof(int32_t), int32_t, int64_t>>
819 IndexPair null_range{std::numeric_limits<int64_t>::max(),
820 std::numeric_limits<int64_t>::min()};
821 auto const casted_order_col_buf =
reinterpret_cast<COL_TYPE const*
>(order_col_buf);
822 auto check_null_val = [&casted_order_col_buf,
this](
size_t idx) {
823 return *
reinterpret_cast<NULL_TYPE const*
>(
824 may_alias_ptr(&casted_order_col_buf
825 [original_col_idx_buf[ordered_col_idx_buf[idx]]])) ==
828 if (check_null_val(0)) {
829 int64_t null_range_max = 1;
830 while (null_range_max < partition_size && check_null_val(null_range_max)) {
833 null_range.first = 0;
834 null_range.second = null_range_max - 1;
835 }
else if (check_null_val(partition_size - 1)) {
836 int64_t null_range_min = partition_size - 2;
837 while (null_range_min >= 0 && check_null_val(null_range_min)) {
840 null_range.first = null_range_min + 1;
841 null_range.second = partition_size - 1;
850 size_t partition_idx,
851 const int32_t* original_col_idx_buf,
852 const int64_t* ordered_col_idx_buf) {
854 const auto partition_size =
counts()[partition_idx];
855 if (partition_size > 0) {
858 FindNullRange
const null_range_info{
859 original_col_idx_buf, ordered_col_idx_buf, partition_size};
863 null_range_info.find_null_range_int<int64_t>(
order_columns_.front());
867 null_range_info.find_null_range_int<int32_t>(
order_columns_.front());
871 null_range_info.find_null_range_int<int16_t>(
order_columns_.front());
875 null_range_info.find_null_range_int<int8_t>(
order_columns_.front());
880 }
else if (order_col_ti.
is_fp()) {
881 const auto null_bit_pattern =
883 FindNullRange
const null_range_info{
884 original_col_idx_buf, ordered_col_idx_buf, partition_size, null_bit_pattern};
887 null_range = null_range_info.find_null_range_fp<
float>(
order_columns_.front());
890 null_range = null_range_info.find_null_range_fp<
double>(
order_columns_.front());
896 LOG(
FATAL) <<
"Invalid column type for window aggregation over the frame";
904 size_t partition_idx) {
906 std::vector<WindowFunctionContext::Comparator> partition_comparator;
909 CHECK_EQ(order_keys.size(), collation.size());
910 for (
size_t order_column_idx = 0; order_column_idx <
order_columns_.size();
911 ++order_column_idx) {
913 const auto order_col =
916 const auto& order_col_collation = collation[order_column_idx];
920 !order_col_collation.is_desc,
921 order_col_collation.nulls_first);
922 if (order_col_collation.is_desc) {
923 comparator = [comparator](
const int64_t lhs,
const int64_t rhs) {
924 return comparator(rhs, lhs);
927 partition_comparator.push_back(comparator);
929 return partition_comparator;
933 int64_t* output_for_partition_buff,
934 bool should_parallelize) {
935 const size_t partition_size{
static_cast<size_t>(
counts()[partition_idx])};
936 if (partition_size == 0) {
940 output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
942 if (!partition_comparator.empty()) {
943 const auto col_tuple_comparator = [&partition_comparator](
const int64_t lhs,
945 for (
const auto& comparator : partition_comparator) {
946 const auto comparator_result = comparator(lhs, rhs);
947 switch (comparator_result) {
961 if (should_parallelize) {
963 tbb::parallel_sort(output_for_partition_buff,
964 output_for_partition_buff + partition_size,
965 col_tuple_comparator);
968 output_for_partition_buff + partition_size,
969 col_tuple_comparator);
973 output_for_partition_buff + partition_size,
974 col_tuple_comparator);
1033 const int8_t* order_column_buffer,
1035 const int32_t* partition_indices,
1038 const bool asc_ordering,
1039 const bool nulls_first) {
1040 const auto values =
reinterpret_cast<const T*
>(order_column_buffer);
1041 const auto lhs_val = values[partition_indices[lhs]];
1042 const auto rhs_val = values[partition_indices[rhs]];
1044 if (lhs_val == null_val && rhs_val == null_val) {
1047 if (lhs_val == null_val && rhs_val != null_val) {
1051 if (rhs_val == null_val && lhs_val != null_val) {
1055 if (lhs_val < rhs_val) {
1058 if (lhs_val > rhs_val) {
1066 const int8_t* order_column_buffer,
1068 const int32_t* partition_indices,
1071 const bool asc_ordering,
1072 const bool nulls_first) {
1073 const auto values =
reinterpret_cast<const T*
>(order_column_buffer);
1074 const auto lhs_val = values[partition_indices[lhs]];
1075 const auto rhs_val = values[partition_indices[rhs]];
1077 if (lhs_val == null_val && rhs_val == null_val) {
1080 if (lhs_val == null_val && rhs_val != null_val) {
1084 if (rhs_val == null_val && lhs_val != null_val) {
1088 if (lhs_val < rhs_val) {
1091 if (lhs_val > rhs_val) {
1097 template <
class T,
class NullPatternType>
1099 const int8_t* order_column_buffer,
1101 const int32_t* partition_indices,
1104 const bool asc_ordering,
1105 const bool nulls_first) {
1106 const auto values =
reinterpret_cast<const T*
>(order_column_buffer);
1107 const auto lhs_val = values[partition_indices[lhs]];
1108 const auto rhs_val = values[partition_indices[rhs]];
1110 const auto lhs_bit_pattern =
1111 *
reinterpret_cast<const NullPatternType*
>(may_alias_ptr(&lhs_val));
1112 const auto rhs_bit_pattern =
1113 *
reinterpret_cast<const NullPatternType*
>(may_alias_ptr(&rhs_val));
1114 if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
1117 if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
1121 if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
1125 if (lhs_val < rhs_val) {
1128 if (lhs_val > rhs_val) {
1134 template <
class T,
class NullPatternType>
1136 const int8_t* order_column_buffer,
1138 const int32_t* partition_indices,
1141 const bool asc_ordering,
1142 const bool nulls_first) {
1143 const auto values =
reinterpret_cast<const T*
>(order_column_buffer);
1144 const auto lhs_val = values[partition_indices[lhs]];
1145 const auto rhs_val = values[partition_indices[rhs]];
1147 const auto lhs_bit_pattern =
1148 *
reinterpret_cast<const NullPatternType*
>(may_alias_ptr(&lhs_val));
1149 const auto rhs_bit_pattern =
1150 *
reinterpret_cast<const NullPatternType*
>(may_alias_ptr(&rhs_val));
1151 if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
1154 if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
1158 if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
1162 if (lhs_val < rhs_val) {
1165 if (lhs_val > rhs_val) {
1175 const int8_t* order_column_buffer,
1176 const int32_t* partition_indices,
1177 const bool asc_ordering,
1178 const bool nulls_first) {
1180 if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1181 switch (ti.get_size()) {
1183 return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1184 const int64_t lhs,
const int64_t rhs) {
1185 return asc_ordering ? integer_comparator_asc<int64_t>(order_column_buffer,
1192 : integer_comparator_desc<int64_t>(order_column_buffer,
1202 return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1203 const int64_t lhs,
const int64_t rhs) {
1204 return asc_ordering ? integer_comparator_asc<int32_t>(order_column_buffer,
1211 : integer_comparator_desc<int32_t>(order_column_buffer,
1221 return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1222 const int64_t lhs,
const int64_t rhs) {
1223 return asc_ordering ? integer_comparator_asc<int16_t>(order_column_buffer,
1230 : integer_comparator_desc<int16_t>(order_column_buffer,
1240 return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1241 const int64_t lhs,
const int64_t rhs) {
1242 return asc_ordering ? integer_comparator_asc<int8_t>(order_column_buffer,
1249 : integer_comparator_desc<int8_t>(order_column_buffer,
1259 LOG(
FATAL) <<
"Invalid type size: " << ti.get_size();
1264 switch (ti.get_type()) {
1266 return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1267 const int64_t lhs,
const int64_t rhs) {
1268 return asc_ordering ? fp_comparator_asc<float, int32_t>(order_column_buffer,
1275 : fp_comparator_desc<float, int32_t>(order_column_buffer,
1285 return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1286 const int64_t lhs,
const int64_t rhs) {
1287 return asc_ordering ? fp_comparator_asc<double, int64_t>(order_column_buffer,
1294 : fp_comparator_desc<double, int64_t>(order_column_buffer,
1304 LOG(
FATAL) <<
"Invalid float type";
1308 throw std::runtime_error(
"Type not supported yet");
1312 const size_t partition_idx,
1313 int64_t* output_for_partition_buff,
1315 const size_t partition_size{
static_cast<size_t>(
counts()[partition_idx])};
1316 if (partition_size == 0) {
1319 const auto offset =
offsets()[partition_idx];
1321 const auto col_tuple_comparator = [&partition_comparator](
const int64_t lhs,
1322 const int64_t rhs) {
1323 for (
const auto& comparator : partition_comparator) {
1324 const auto comparator_result = comparator(lhs, rhs);
1325 switch (comparator_result) {
1339 switch (window_func->
getKind()) {
1341 const auto row_numbers =
1343 std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
1348 index_to_rank(output_for_partition_buff, partition_size, col_tuple_comparator);
1349 std::copy(rank.begin(), rank.end(), output_for_partition_buff);
1354 output_for_partition_buff, partition_size, col_tuple_comparator);
1355 std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
1360 output_for_partition_buff, partition_size, col_tuple_comparator);
1363 reinterpret_cast<double*
>(may_alias_ptr(output_for_partition_buff)));
1368 output_for_partition_buff, partition_size, col_tuple_comparator);
1371 reinterpret_cast<double*
>(may_alias_ptr(output_for_partition_buff)));
1378 const auto ntile =
index_to_ntile(output_for_partition_buff, partition_size, n);
1379 std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
1385 const auto partition_row_offsets =
payload() + offset;
1387 lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
1392 const auto target_idx =
1394 const auto partition_row_offsets =
payload() + offset;
1396 partition_row_offsets, output_for_partition_buff, partition_size, target_idx);
1400 auto const n_value_ptr =
1403 auto const n_value =
static_cast<size_t>(n_value_ptr->get_constval().intval);
1404 const auto partition_row_offsets =
payload() + offset;
1405 if (n_value < partition_size) {
1407 partition_row_offsets, output_for_partition_buff, partition_size, n_value);
1414 partition_row_offsets, output_for_partition_buff, partition_size);
1428 const auto partition_row_offsets =
payload() + offset;
1432 output_for_partition_buff,
1434 col_tuple_comparator);
1437 output_for_partition_buff, partition_row_offsets, partition_size);
1441 throw std::runtime_error(
"Window function not supported yet: " +
1457 size_t partition_idx,
1458 size_t partition_size,
1459 const int32_t* original_rowid_buf,
1460 const int64_t* ordered_rowid_buf,
1464 throw QueryNotSupported(
"Window aggregate function over frame on a column type " +
1471 "Aggregation over a window frame for a column type " +
1473 " must use one of the following window aggregate function: MIN / MAX / COUNT");
1480 if (partition_size > 0) {
1483 const int64_t* ordered_rowid_buf_for_partition =
1484 ordered_rowid_buf +
offsets()[partition_idx];
1488 const auto segment_tree = std::make_shared<SegmentTree<int8_t, int64_t>>(
1492 ordered_rowid_buf_for_partition,
1497 segment_tree ? segment_tree->getLeafDepth() : 0;
1500 segment_tree ? segment_tree->getDerivedAggregatedValues() :
nullptr;
1503 segment_tree ? segment_tree->getAggregatedValues() :
nullptr;
1509 const auto segment_tree = std::make_shared<SegmentTree<int16_t, int64_t>>(
1513 ordered_rowid_buf_for_partition,
1518 segment_tree ? segment_tree->getLeafDepth() : 0;
1521 segment_tree ? segment_tree->getDerivedAggregatedValues() :
nullptr;
1524 segment_tree ? segment_tree->getAggregatedValues() :
nullptr;
1530 const auto segment_tree = std::make_shared<SegmentTree<int32_t, int64_t>>(
1534 ordered_rowid_buf_for_partition,
1539 segment_tree ? segment_tree->getLeafDepth() : 0;
1542 segment_tree ? segment_tree->getDerivedAggregatedValues() :
nullptr;
1545 segment_tree ? segment_tree->getAggregatedValues() :
nullptr;
1553 const auto segment_tree = std::make_shared<SegmentTree<int64_t, int64_t>>(
1557 ordered_rowid_buf_for_partition,
1562 segment_tree ? segment_tree->getLeafDepth() : 0;
1565 segment_tree ? segment_tree->getDerivedAggregatedValues() :
nullptr;
1568 segment_tree ? segment_tree->getAggregatedValues() :
nullptr;
1574 const auto segment_tree =
1578 ordered_rowid_buf_for_partition,
1583 segment_tree ? segment_tree->getLeafDepth() : 0;
1586 segment_tree ? segment_tree->getDerivedAggregatedValues() :
nullptr;
1589 segment_tree ? segment_tree->getAggregatedValues() :
nullptr;
1595 const auto segment_tree =
1599 ordered_rowid_buf_for_partition,
1604 segment_tree ? segment_tree->getLeafDepth() : 0;
1607 segment_tree ? segment_tree->getDerivedAggregatedValues() :
nullptr;
1610 segment_tree ? segment_tree->getAggregatedValues() :
nullptr;
1684 auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
1697 for (int64_t i = 0; i < partition_count - 1; ++i) {
1702 std::vector<size_t> partition_offsets(partition_count);
1704 for (int64_t i = 0; i < partition_count - 1; ++i) {
1717 auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
1722 auto partition_end_handle =
reinterpret_cast<int64_t
>(
partition_end_);
1729 for (int64_t i = 0; i < partition_count - 1; ++i) {
1740 std::vector<size_t> partition_offsets(partition_count);
1742 for (int64_t i = 0; i < partition_count - 1; ++i) {
1743 if (partition_offsets[i] == 0) {
1756 return reinterpret_cast<const int32_t*
>(
1765 return reinterpret_cast<const int32_t*
>(
1773 return reinterpret_cast<const int32_t*
>(
1783 return partition_count;
1804 boost::hash_combine(cache_key, order_entry.toString());
1810 std::unique_ptr<WindowFunctionContext> window_function_context,
1811 const size_t target_index) {
1812 const auto it_ok = window_contexts_.emplace(
1813 std::make_pair(target_index, std::move(window_function_context)));
1814 CHECK(it_ok.second);
1819 const size_t target_index)
const {
1820 const auto it = window_contexts_.find(target_index);
1821 CHECK(it != window_contexts_.end());
1822 executor->active_window_function_ = it->second.get();
1823 return executor->active_window_function_;
1827 executor->active_window_function_ =
nullptr;
1831 Executor* executor) {
1832 return executor->active_window_function_;
1836 executor->window_project_node_context_owned_ =
1837 std::make_unique<WindowProjectNodeContext>();
1838 return executor->window_project_node_context_owned_.
get();
1842 return executor->window_project_node_context_owned_.
get();
1846 executor->window_project_node_context_owned_ =
nullptr;
1847 executor->active_window_function_ =
nullptr;
size_t getAggregateTreeFanout() const
int32_t const * original_col_idx_buf
bool g_enable_parallel_window_partition_sort
std::vector< SumAndCountPair< double > * > derived_aggregate_tree_for_double_type_
SqlWindowFunctionKind getKind() const
void addOrderColumn(const int8_t *column, const SQLTypeInfo &ti, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
HOST DEVICE int get_size() const
WindowFunctionContext::WindowComparatorResult fp_comparator_asc(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool asc_ordering, const bool nulls_first)
int64_t * ordered_partition_null_start_pos_
RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
Descriptor for the storage layout used for (approximate) count distinct operations.
const int32_t dummy_count_
std::vector< double * > aggregate_tree_for_double_type_
size_t get_target_idx_for_first_or_last_value_func(const Analyzer::WindowFunction *window_func, const size_t partition_size)
bool advance_current_rank(const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator, const int64_t *index, const size_t i)
RUNTIME_EXPORT void add_window_pending_output(void *pending_output, const int64_t handle)
bool hasAggregateTreeRequiredWindowFunc() const
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
int64_t * getNullValueEndPos() const
Utility functions for easy access to the result set buffers.
bool is_time_or_date() const
const int32_t dummy_offset_
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool asc_ordering, const bool nulls_first)
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
const int8_t * partitionStart() const
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const std::vector< SQLTypeInfo > & getOrderKeyColumnBufferTypes() const
void setSortedPartitionCacheKey(QueryPlanHash cache_key)
void computeNullRangeOfSortedPartition(const SQLTypeInfo &order_col_ti, size_t partition_idx, const int32_t *original_col_idx_buf, const int64_t *ordered_col_idx_buf)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
DEVICE void sort(ARGS &&...args)
static WindowProjectNodeContext * create(Executor *executor)
RUNTIME_EXPORT ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
size_t elementCount() const
const int8_t * output() const
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
Constants for Builtin SQL Types supported by HEAVY.AI.
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
const int32_t * offsets() const
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
size_t g_parallel_window_partition_compute_threshold
HOST DEVICE SQLTypes get_type() const
size_t g_parallel_window_partition_sort_threshold
RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle, const double value, const int64_t bitset, const int64_t pos)
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
const std::vector< OrderEntry > & getCollation() const
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
const bool needsToBuildAggregateTree() const
size_t * getAggregateTreeDepth() const
int64_t ** getAggregationTreesForIntegerTypeWindowExpr() const
const std::vector< const int8_t * > & getColumnBufferForWindowFunctionExpressions() const
int64_t * getNullValueStartPos() const
SumAndCountPair< double > ** getDerivedAggregationTreesForDoubleTypeWindowExpr() const
const int64_t * partitionStartOffset() const
std::vector< std::shared_ptr< void > > segment_trees_owned_
std::shared_ptr< std::vector< int64_t > > sorted_partition_buf_
size_t partitionCount() const
AggregateState aggregate_state_
void apply_window_pending_outputs_int(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
int32_t const partition_size
static const WindowProjectNodeContext * get(Executor *executor)
size_t g_window_function_aggregation_tree_fanout
DEVICE void fill(ARGS &&...args)
const int64_t * aggregateStateCount() const
std::vector< Comparator > createComparator(size_t partition_idx)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
QueryPlanHash sorted_partition_cache_key_
void apply_nth_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size, const size_t target_pos)
const int8_t * partitionEnd() const
void * checked_malloc(const size_t size)
DEVICE auto copy(ARGS &&...args)
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > window_func_expr_columns_owner_
void addColumnBufferForWindowFunctionExpression(const int8_t *column, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
static void reset(Executor *executor)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
size_t * aggregate_trees_depth_
DEVICE void partial_sum(ARGS &&...args)
void * checked_calloc(const size_t nmemb, const size_t size)
int64_t aggregateStatePendingOutputs() const
std::string toString(const ExecutorDeviceType &device_type)
void buildAggregationTreeForPartition(SqlWindowFunctionKind agg_type, size_t partition_idx, size_t partition_size, const int32_t *original_rowid_buf, const int64_t *ordered_rowid_buf, const SQLTypeInfo &input_col_ti)
RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
const int64_t * aggregateState() const
size_t aggregate_trees_fan_out_
RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
const SQLTypeInfo & get_type_info() const
std::pair< int64_t, int64_t > IndexPair
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti)
SumAndCountPair< int64_t > ** getDerivedAggregationTreesForIntegerTypeWindowExpr() const
void fillPartitionStart()
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
int8_t * partition_start_
std::vector< const int8_t * > window_func_expr_columns_
SQLTypes get_int_type_by_size(size_t const nbytes)
int64_t const * ordered_col_idx_buf
AggregateTreeForWindowFraming aggregate_trees_
void sortPartition(const size_t partition_idx, int64_t *output_for_partition_buff, bool should_parallelize)
DEVICE void iota(ARGS &&...args)
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
bool pos_is_set(const int64_t bitset, const int64_t pos)
const std::vector< const int8_t * > & getOrderKeyColumnBuffers() const
std::shared_ptr< HashJoin > partitions_
void resizeStorageForWindowFraming(size_t partition_count)
const int64_t * partitionNumCountBuf() const
RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
void apply_lag_to_partition(const int64_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
void apply_original_index_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
void parallel_for(const blocked_range< Int > &range, const Body &body, const Partitioner &p=Partitioner())
void resizeStorageForWindowFraming(bool const for_reuse=false)
WindowFunctionContext::WindowComparatorResult integer_comparator_desc(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool asc_ordering, const bool nulls_first)
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
bool g_enable_parallel_window_partition_compute
void computePartitionBuffer(const size_t partition_idx, int64_t *output_for_partition_buff, const Analyzer::WindowFunction *window_func)
int64_t * partition_start_offset_
#define DEBUG_TIMER(name)
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< const int8_t * > order_columns_
const int64_t * sortedPartition() const
const QueryPlanHash computeAggregateTreeCacheKey() const
static void resetWindowFunctionContext(Executor *executor)
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
std::vector< SQLTypeInfo > order_columns_ti_
std::vector< SumAndCountPair< int64_t > * > derived_aggregate_tree_for_integer_type_
const Analyzer::WindowFunction * getWindowFunction() const
const int32_t * payload() const
IndexPair find_null_range_int(int8_t const *order_col_buf) const
std::function< WindowFunctionContext::WindowComparatorResult(const int64_t lhs, const int64_t rhs)> Comparator
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
size_t * aggregate_trees_depth_
RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
WindowFunctionContext::WindowComparatorResult fp_comparator_desc(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool asc_ordering, const bool nulls_first)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getPartitionKeys() const
Divide up indexes (A, A+1, A+2, ..., B-2, B-1) among N workers as evenly as possible in a range-based...
std::vector< int64_t * > aggregate_tree_for_integer_type_
WindowFunctionContext::WindowComparatorResult integer_comparator_asc(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool asc_ordering, const bool nulls_first)
IndexPair find_null_range_fp(int8_t const *order_col_buf) const
ThreadLocalIds thread_local_ids()
const ExecutorDeviceType device_type_
void compute(std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, std::unordered_map< QueryPlanHash, std::shared_ptr< std::vector< int64_t >>> &sorted_partition_cache, std::unordered_map< QueryPlanHash, AggregateTreeForWindowFraming > &aggregate_tree_map)
double ** getAggregationTreesForDoubleTypeWindowExpr() const
int64_t * ordered_partition_null_end_pos_
std::vector< void * > outputs