WindowContext.cpp
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "QueryEngine/WindowContext.h"

#include <numeric>

#include "QueryEngine/Descriptors/CountDistinctDescriptor.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/OutputBufferInitialization.h"
#include "QueryEngine/ResultSetBufferAccessors.h"
#include "QueryEngine/RuntimeFunctions.h"
#include "QueryEngine/TypePunning.h"
#include "QueryEngine/Utils/SegmentTree.h"
#include "Shared/Intervals.h"
#include "Shared/checked_alloc.h"
#include "Shared/funcannotations.h"
#include "Shared/sqltypes.h"
#include "Shared/threading.h"

#ifdef HAVE_TBB
//#include <tbb/parallel_for.h>
#include <tbb/parallel_sort.h>
#else
#include <thrust/sort.h>
#endif

bool g_enable_parallel_window_partition_compute{true};
size_t g_parallel_window_partition_compute_threshold{1 << 12};  // 4096

bool g_enable_parallel_window_partition_sort{true};
size_t g_parallel_window_partition_sort_threshold{1 << 10};  // 1024

size_t g_window_function_aggregation_tree_fanout{8};

// Non-partitioned version (no hash table provided)
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
    : window_func_(window_func)
    , partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , partitions_(nullptr)
    , elem_count_(elem_count)
    , output_(nullptr)
    , sorted_partition_buf_(nullptr)
    , aggregate_trees_fan_out_(g_window_function_aggregation_tree_fanout)
    , aggregate_trees_depth_(nullptr)
    , ordered_partition_null_start_pos_(nullptr)
    , ordered_partition_null_end_pos_(nullptr)
    , partition_start_offset_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
  CHECK_LE(elem_count_, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
  dummy_payload_ =
      reinterpret_cast<int32_t*>(checked_malloc(elem_count_ * sizeof(int32_t)));
  std::iota(dummy_payload_, dummy_payload_ + elem_count_, int32_t(0));
  if (window_func_->hasFraming()) {
    // in this case, we consider all rows to belong to the single (and only)
    // existing partition
    partition_start_offset_ =
        reinterpret_cast<int64_t*>(checked_calloc(2, sizeof(int64_t)));
    partition_start_offset_[1] = elem_count_;
    aggregate_trees_depth_ = reinterpret_cast<size_t*>(checked_calloc(1, sizeof(size_t)));
    ordered_partition_null_start_pos_ =
        reinterpret_cast<int64_t*>(checked_calloc(1, sizeof(int64_t)));
    ordered_partition_null_end_pos_ =
        reinterpret_cast<int64_t*>(checked_calloc(1, sizeof(int64_t)));
  }
}
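
// Note: the non-partitioned constructor above fills `dummy_payload_` with the
// identity permutation 0, 1, ..., elem_count_ - 1, so the entire input behaves
// as one partition whose payload order is simply row order.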

// Partitioned version
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    QueryPlanHash partition_cache_key,
    const std::shared_ptr<HashJoin>& partitions,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    size_t aggregation_tree_fan_out)
    : window_func_(window_func)
    , partition_cache_key_(partition_cache_key)
    , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , partitions_(partitions)
    , elem_count_(elem_count)
    , output_(nullptr)
    , sorted_partition_buf_(nullptr)
    , aggregate_trees_fan_out_(aggregation_tree_fan_out)
    , aggregate_trees_depth_(nullptr)
    , ordered_partition_null_start_pos_(nullptr)
    , ordered_partition_null_end_pos_(nullptr)
    , partition_start_offset_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
  CHECK(partitions_);  // this version should have a hash table
  size_t partition_count = partitionCount();
  partition_start_offset_ =
      reinterpret_cast<int64_t*>(checked_calloc(partition_count + 1, sizeof(int64_t)));
  if (window_func_->hasFraming()) {
    aggregate_trees_depth_ =
        reinterpret_cast<size_t*>(checked_calloc(partition_count, sizeof(size_t)));
    ordered_partition_null_start_pos_ =
        reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
    ordered_partition_null_end_pos_ =
        reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
  }
  // the first partition starts at zero position
  std::partial_sum(counts(), counts() + partition_count, partition_start_offset_ + 1);
}
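
// Example: for three partitions with counts() = {3, 2, 4}, the partial sum
// above produces partition_start_offset_ = {0, 3, 5, 9}; entry i is the index
// of the first row of partition i, and the last entry equals elem_count_.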

WindowFunctionContext::~WindowFunctionContext() {
  free(partition_start_);
  free(partition_end_);
  if (dummy_payload_) {
    free(dummy_payload_);
  }
  if (partition_start_offset_) {
    free(partition_start_offset_);
  }
  if (aggregate_trees_depth_) {
    free(aggregate_trees_depth_);
  }
  if (ordered_partition_null_start_pos_) {
    free(ordered_partition_null_start_pos_);
  }
  if (ordered_partition_null_end_pos_) {
    free(ordered_partition_null_end_pos_);
  }
}

void WindowFunctionContext::addOrderColumn(
    const int8_t* column,
    const SQLTypeInfo& ti,
    const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
  order_columns_owner_.push_back(chunks_owner);
  order_columns_.push_back(column);
  order_columns_ti_.push_back(ti);
}

void WindowFunctionContext::addColumnBufferForWindowFunctionExpression(
    const int8_t* column,
    const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
  window_func_expr_columns_owner_.push_back(chunks_owner);
  window_func_expr_columns_.push_back(column);
}

const std::vector<const int8_t*>&
WindowFunctionContext::getColumnBufferForWindowFunctionExpressions() const {
  return window_func_expr_columns_;
}

const std::vector<const int8_t*>& WindowFunctionContext::getOrderKeyColumnBuffers()
    const {
  return order_columns_;
}

const std::vector<SQLTypeInfo>& WindowFunctionContext::getOrderKeyColumnBufferTypes()
    const {
  return order_columns_ti_;
}

void WindowFunctionContext::setSortedPartitionCacheKey(QueryPlanHash cache_key) {
  sorted_partition_cache_key_ = cache_key;
}

namespace {

// Converts the sorted indices to a mapping from row position to row number.
std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
  std::vector<int64_t> row_numbers(index_size);
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i + 1;
  }
  return row_numbers;
}
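
// Example: for index = {2, 0, 3, 1} (the payload position of the 1st, 2nd, ...
// row in sort order), the loop above yields row_numbers = {2, 4, 1, 3}: the
// row stored at payload position 2 sorts first, so it gets row number 1.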

// Returns true iff the current element is greater than the previous, according
// to the comparator. This is needed because peer rows have to have the same rank.
bool advance_current_rank(
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
    const int64_t* index,
    const size_t i) {
  if (i == 0) {
    return false;
  }
  return comparator(index[i - 1], index[i]);
}

// Computes the mapping from row position to rank.
std::vector<int64_t> index_to_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    rank[index[i]] = crt_rank;
  }
  return rank;
}
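
// Example: for order key values {10, 20, 20, 30} this produces the SQL RANK()
// sequence 1, 2, 2, 4 (in sort order): the rank only advances when the
// comparator says the current row is strictly greater than its predecessor,
// and it jumps to i + 1, skipping the ranks consumed by peers.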

// Computes the mapping from row position to dense rank.
std::vector<int64_t> index_to_dense_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> dense_rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      ++crt_rank;
    }
    dense_rank[index[i]] = crt_rank;
  }
  return dense_rank;
}
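
// Example: the same order key values {10, 20, 20, 30} yield the DENSE_RANK()
// sequence 1, 2, 2, 3 (in sort order): the rank advances by exactly one per
// peer group, so no rank values are skipped after ties.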

// Computes the mapping from row position to percent rank.
std::vector<double> index_to_percent_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> percent_rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    percent_rank[index[i]] =
        index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
  }
  return percent_rank;
}
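
// Example: PERCENT_RANK() is (rank - 1) / (N - 1). For order key values
// {10, 20, 20, 30}, N = 4 and the ranks are 1, 2, 2, 4, so the results are
// 0, 1/3, 1/3, 1 (in sort order); a single-row partition is defined as 0.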

// Computes the mapping from row position to cumulative distribution.
std::vector<double> index_to_cume_dist(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> cume_dist(index_size);
  size_t start_peer_group = 0;
  while (start_peer_group < index_size) {
    size_t end_peer_group = start_peer_group + 1;
    while (end_peer_group < index_size &&
           !advance_current_rank(comparator, index, end_peer_group)) {
      ++end_peer_group;
    }
    for (size_t i = start_peer_group; i < end_peer_group; ++i) {
      cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
    }
    start_peer_group = end_peer_group;
  }
  return cume_dist;
}
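
// Example: CUME_DIST() assigns each row (#rows sorting at or before its peer
// group) / N. For order key values {10, 20, 20, 30} the peer groups end at
// positions 1, 3, and 4, so the results are 0.25, 0.75, 0.75, 1.0.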

// Computes the mapping from row position to the n-tile statistic.
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  std::vector<int64_t> row_numbers(index_size);
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  const size_t tile_size = (index_size + n - 1) / n;
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i / tile_size + 1;
  }
  return row_numbers;
}
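
// Example: NTILE(3) over 10 rows gives tile_size = ceil(10 / 3) = 4, so the
// rows receive buckets 1, 1, 1, 1, 2, 2, 2, 2, 3, 3 in sort order. Note that
// with this rounding the last bucket may be smaller than the others.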

// The element size in the result buffer for the given window function kind.
// Currently it's always 8.
size_t window_function_buffer_element_size(const SqlWindowFunctionKind /*kind*/) {
  return 8;
}

// Extracts the integer constant from a constant expression.
size_t get_int_constant_from_expr(const Analyzer::Expr* expr) {
  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
  if (!lag_constant) {
    throw std::runtime_error("LAG with non-constant lag argument not supported yet");
  }
  const auto& lag_ti = lag_constant->get_type_info();
  switch (lag_ti.get_type()) {
    case kSMALLINT: {
      return lag_constant->get_constval().smallintval;
    }
    case kINT: {
      return lag_constant->get_constval().intval;
    }
    case kBIGINT: {
      return lag_constant->get_constval().bigintval;
    }
    default: {
      LOG(FATAL) << "Invalid type for the lag argument";
    }
  }
  return 0;
}

// Gets the lag or lead argument canonicalized as lag (lag = -lead).
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction* window_func) {
  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
        window_func->getKind() == SqlWindowFunctionKind::LEAD);
  const auto& args = window_func->getArgs();
  if (args.size() == 3) {
    throw std::runtime_error("LAG with default not supported yet");
  }
  if (args.size() == 2) {
    const int64_t lag_or_lead =
        static_cast<int64_t>(get_int_constant_from_expr(args[1].get()));
    return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
                                                                : -lag_or_lead;
  }
  CHECK_EQ(args.size(), size_t(1));
  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
}
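
// Example: LAG(x, 2) canonicalizes to +2 and LEAD(x, 2) to -2; with a single
// argument, LAG(x) becomes +1 and LEAD(x) becomes -1. A negative lag simply
// means "look ahead" in apply_lag_to_partition below.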

// Redistributes the original_indices according to the permutation given by
// output_for_partition_buff, reusing it as an output buffer.
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  std::vector<int64_t> new_output_for_partition_buff(partition_size);
  for (size_t i = 0; i < partition_size; ++i) {
    new_output_for_partition_buff[i] = original_indices[output_for_partition_buff[i]];
  }
  std::copy(new_output_for_partition_buff.begin(),
            new_output_for_partition_buff.end(),
            output_for_partition_buff);
}
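
// Example: with original_indices = {7, 4, 9} (the global row ids of the
// partition) and output_for_partition_buff = {1, 2, 0} (partition-local sort
// order), the buffer is rewritten in place to {4, 9, 7}, i.e. the global row
// ids in sort order.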

// Applies a lag to the given sorted_indices, reusing it as an output buffer.
void apply_lag_to_partition(const int64_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  std::vector<int64_t> lag_sorted_indices(partition_size, -1);
  for (int64_t idx = 0; idx < static_cast<int64_t>(partition_size); ++idx) {
    int64_t lag_idx = idx - lag;
    if (lag_idx < 0 || lag_idx >= static_cast<int64_t>(partition_size)) {
      continue;
    }
    lag_sorted_indices[idx] = sorted_indices[lag_idx];
  }
  std::vector<int64_t> lag_original_indices(partition_size);
  for (size_t k = 0; k < partition_size; ++k) {
    const auto lag_index = lag_sorted_indices[k];
    lag_original_indices[sorted_indices[k]] =
        lag_index != -1 ? original_indices[lag_index] : -1;
  }
  std::copy(lag_original_indices.begin(), lag_original_indices.end(), sorted_indices);
}
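
// Example: with lag = 1, sorted_indices = {2, 0, 1} and original_indices =
// {7, 4, 9}, each row is mapped to the global row id of its predecessor in
// sort order: the first row in sort order has none (-1), the second gets 9
// (the global id of the row at payload position 2), and the third gets 7.
// A lead works the same way with a negative lag.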

// Computes the first value function for the given output_for_partition_buff,
// reusing it as an output buffer.
void apply_first_value_to_partition(const int32_t* original_indices,
                                    int64_t* output_for_partition_buff,
                                    const size_t partition_size) {
  const auto first_value_idx = original_indices[output_for_partition_buff[0]];
  std::fill(output_for_partition_buff,
            output_for_partition_buff + partition_size,
            first_value_idx);
}

// Computes the last value function for the given output_for_partition_buff,
// reusing it as an output buffer.
void apply_last_value_to_partition(const int32_t* original_indices,
                                   int64_t* output_for_partition_buff,
                                   const size_t partition_size) {
  std::copy(
      original_indices, original_indices + partition_size, output_for_partition_buff);
}

void index_to_partition_end(
    const int8_t* partition_end,
    const size_t off,
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0);
    }
  }
  CHECK(index_size);
  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0);
}

bool pos_is_set(const int64_t bitset, const int64_t pos) {
  return (reinterpret_cast<const int8_t*>(bitset))[pos >> 3] & (1 << (pos & 7));
}
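
// Example: the bitset is a plain byte array, so pos >> 3 selects the byte and
// pos & 7 the bit within it; pos = 11 tests bit 3 of byte 1. This matches the
// bit layout written by agg_count_distinct_bitmap above.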

// Write value to pending integer outputs collected for all the peer rows. The
// end of peer groups is represented by the bitset.
template <class T>
void apply_window_pending_outputs_int(const int64_t handle,
                                      const int64_t value,
                                      const int64_t bitset,
                                      const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<T*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

}  // namespace

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle,
                                                                 const int64_t value,
                                                                 const int64_t bitset,
                                                                 const int64_t pos) {
  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle,
                                                                   const double value,
                                                                   const int64_t bitset,
                                                                   const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle,
                                                                  const float value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    // note: the float value is widened into a double output slot here
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(
    const int64_t handle,
    const float value,
    const int64_t bitset,
    const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<float*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

// Add a pending output slot to be written back at the end of a peer row group.
extern "C" RUNTIME_EXPORT void add_window_pending_output(void* pending_output,
                                                         const int64_t handle) {
  reinterpret_cast<std::vector<void*>*>(handle)->push_back(pending_output);
}

// Returns true iff the aggregate window function requires special multiplicity
// handling to ensure that peer rows have the same value for the window function.
bool window_function_requires_peer_handling(const Analyzer::WindowFunction* window_func) {
  if (!window_function_is_aggregate(window_func->getKind())) {
    return false;
  }
  if (window_func->getOrderKeys().empty()) {
    return true;
  }
  switch (window_func->getKind()) {
    case SqlWindowFunctionKind::MIN:
    case SqlWindowFunctionKind::MAX: {
      return false;
    }
    default: {
      return true;
    }
  }
}

void WindowFunctionContext::compute(
    std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
    std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>&
        sorted_partition_cache) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(!output_);
  if (elem_count_ == 0) {
    return;
  }
  size_t output_buf_sz =
      elem_count_ * window_function_buffer_element_size(window_func_->getKind());
  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(output_buf_sz,
                                                              /*thread_idx=*/0));
  const bool is_window_function_aggregate_or_has_framing =
      window_function_is_aggregate(window_func_->getKind()) ||
      window_func_->hasFraming();
  if (is_window_function_aggregate_or_has_framing) {
    fillPartitionStart();
    if (window_function_requires_peer_handling(window_func_)) {
      fillPartitionEnd();
    }
  }
  std::unique_ptr<int64_t[]> scratchpad;
  int64_t* intermediate_output_buffer;
  if (is_window_function_aggregate_or_has_framing) {
    intermediate_output_buffer = reinterpret_cast<int64_t*>(output_);
  } else {
    output_buf_sz = sizeof(int64_t) * elem_count_;
    scratchpad.reset(new int64_t[elem_count_]);
    intermediate_output_buffer = scratchpad.get();
  }
  const bool should_parallelize{g_enable_parallel_window_partition_compute &&
                                elem_count_ >=
                                    g_parallel_window_partition_compute_threshold};

  auto cached_sorted_partition_it =
      sorted_partition_cache.find(sorted_partition_cache_key_);
  if (cached_sorted_partition_it != sorted_partition_cache.end()) {
    auto& sorted_partition = cached_sorted_partition_it->second;
    VLOG(1) << "Reuse cached sorted partition to compute window function context (key: "
            << sorted_partition_cache_key_
            << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
            << ")";
    DEBUG_TIMER("Window Function Cached Sorted Partition Copy");
    std::memcpy(intermediate_output_buffer, sorted_partition->data(), output_buf_sz);
    if (window_func_->hasFraming()) {
      sorted_partition_buf_ = sorted_partition;
    }
  } else {
    // ordering partitions if necessary
    const auto sort_partitions = [&](const size_t start, const size_t end) {
      for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
        sortPartition(partition_idx,
                      intermediate_output_buffer + offsets()[partition_idx],
                      should_parallelize);
      }
    };

    if (should_parallelize) {
      auto sorted_partition_copy_timer =
          DEBUG_TIMER("Window Function Partition Sorting Parallelized");
      threading::task_group thread_pool;
      for (auto interval : makeIntervals<size_t>(0, partitionCount(), cpu_threads())) {
        thread_pool.run([=] { sort_partitions(interval.begin, interval.end); });
      }
      thread_pool.wait();
    } else {
      auto sorted_partition_copy_timer =
          DEBUG_TIMER("Window Function Partition Sorting Non-Parallelized");
      sort_partitions(0, partitionCount());
    }
    auto sorted_partition_ref_cnt_it =
        sorted_partition_key_ref_count_map.find(sorted_partition_cache_key_);
    bool can_access_sorted_partition =
        sorted_partition_ref_cnt_it != sorted_partition_key_ref_count_map.end() &&
        sorted_partition_ref_cnt_it->second > 1;
    if (can_access_sorted_partition || window_func_->hasFraming()) {
      // keep the sorted partition only if it will be reused from other window
      // function contexts of this query
      sorted_partition_buf_ = std::make_shared<std::vector<int64_t>>(elem_count_);
      DEBUG_TIMER("Window Function Sorted Partition Copy For Caching");
      std::memcpy(
          sorted_partition_buf_->data(), intermediate_output_buffer, output_buf_sz);
      auto it = sorted_partition_cache.emplace(sorted_partition_cache_key_,
                                               sorted_partition_buf_);
      if (it.second) {
        VLOG(1) << "Put sorted partition to cache (key: " << sorted_partition_cache_key_
                << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
                << ")";
      }
    }
  }

  if (window_func_->hasFraming()) {
    const auto compute_ordered_partition_null_range = [=](const size_t start,
                                                          const size_t end) {
      for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
        computeNullRangeOfSortedPartition(
            window_func_->getOrderKeys().front()->get_type_info(),
            partition_idx,
            payload() + offsets()[partition_idx],
            intermediate_output_buffer + offsets()[partition_idx]);
      }
    };
    auto partition_count = partitionCount();
    if (should_parallelize) {
      auto partition_computation_timer =
          DEBUG_TIMER("Window Function Ordered-Partition Null-Range Compute");
      threading::task_group thread_pool;
      for (auto interval : makeIntervals<size_t>(0, partitionCount(), cpu_threads())) {
        thread_pool.run(
            [=] { compute_ordered_partition_null_range(interval.begin, interval.end); });
      }
      thread_pool.wait();
    } else {
      auto partition_computation_timer = DEBUG_TIMER(
          "Window Function Non-Parallelized Ordered-Partition Null-Range Compute");
      compute_ordered_partition_null_range(0, partitionCount());
    }

    if (needsToBuildAggregateTree()) {
      const auto build_aggregation_tree_for_partitions = [=](const size_t start,
                                                             const size_t end) {
        for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
          // build a segment tree for the partition
          // todo (yoonmin) : support generic window function expression
          // i.e., when window_func_expr_columns_.size() > 1
          const auto partition_size = counts()[partition_idx];
          buildAggregationTreeForPartition(
              window_func_->getKind(),
              partition_idx,
              partition_size,
              window_func_expr_columns_.front(),
              payload() + offsets()[partition_idx],
              intermediate_output_buffer,
              window_func_->getArgs().front()->get_type_info());
        }
      };
      if (should_parallelize) {
        auto partition_computation_timer =
            DEBUG_TIMER("Window Function Build Segment Tree for Partitions");
        threading::task_group thread_pool;
        for (auto interval : makeIntervals<size_t>(0, partition_count, cpu_threads())) {
          thread_pool.run([=] {
            build_aggregation_tree_for_partitions(interval.begin, interval.end);
          });
        }
        thread_pool.wait();
      } else {
        auto partition_computation_timer =
            DEBUG_TIMER("Window Function Build Segment Tree for Partitions");
        build_aggregation_tree_for_partitions(0, partition_count);
      }
    }
  }

  const auto compute_partitions = [=](const size_t start, const size_t end) {
    for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
      computePartitionBuffer(partition_idx,
                             intermediate_output_buffer + offsets()[partition_idx],
                             window_func_);
    }
  };

  if (should_parallelize) {
    auto partition_computation_timer = DEBUG_TIMER("Window Function Partition Compute");
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(0, partitionCount(), cpu_threads())) {
      thread_pool.run([=] { compute_partitions(interval.begin, interval.end); });
    }
    thread_pool.wait();
  } else {
    auto partition_computation_timer =
        DEBUG_TIMER("Window Function Non-Parallelized Partition Compute");
    compute_partitions(0, partitionCount());
  }

  if (is_window_function_aggregate_or_has_framing) {
    // If the window function is an aggregate we were able to write to the final
    // output buffer directly in computePartitionBuffer and we are done.
    return;
  }

  auto output_i64 = reinterpret_cast<int64_t*>(output_);
  const auto payload_copy = [=](const size_t start, const size_t end) {
    for (size_t i = start; i < end; ++i) {
      output_i64[payload()[i]] = intermediate_output_buffer[i];
    }
  };
  if (should_parallelize) {
    auto payload_copy_timer =
        DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Parallelized");
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(
             0,
             elem_count_,
             std::min(static_cast<size_t>(cpu_threads()),
                      g_parallel_window_partition_compute_threshold))) {
      thread_pool.run([=] { payload_copy(interval.begin, interval.end); });
    }
    thread_pool.wait();
  } else {
    auto payload_copy_timer =
        DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Non-Parallelized");
    payload_copy(0, elem_count_);
  }
}

void WindowFunctionContext::computeNullRangeOfSortedPartition(
    const SQLTypeInfo& order_col_ti,
    size_t partition_idx,
    const int32_t* original_col_idx_buf,
    const int64_t* ordered_col_idx_buf) {
  IndexPair null_range{std::numeric_limits<int64_t>::max(),
                       std::numeric_limits<int64_t>::min()};
  const auto& collation = window_func_->getCollation().front();
  const auto partition_size = counts()[partition_idx];
  if (partition_size > 0 && (order_col_ti.is_integer() || order_col_ti.is_decimal() ||
                             order_col_ti.is_time() || order_col_ti.is_boolean())) {
    auto find_null_range_int = [&null_range,
                                &collation,
                                &original_col_idx_buf,
                                &ordered_col_idx_buf,
                                &partition_size](const auto order_col_buf,
                                                 const auto null_val) {
      if (collation.nulls_first &&
          order_col_buf[original_col_idx_buf[ordered_col_idx_buf[0]]] == null_val) {
        int64_t null_range_max = 1;
        while (null_range_max < partition_size &&
               order_col_buf[original_col_idx_buf[ordered_col_idx_buf[null_range_max]]] ==
                   null_val) {
          null_range_max++;
        }
        null_range.first = 0;
        null_range.second = null_range_max - 1;
      } else if (!collation.nulls_first &&
                 order_col_buf[original_col_idx_buf[ordered_col_idx_buf[partition_size -
                                                                        1]]] ==
                     null_val) {
        int64_t null_range_min = partition_size - 2;
        while (null_range_min >= 0 &&
               order_col_buf[original_col_idx_buf[ordered_col_idx_buf[null_range_min]]] ==
                   null_val) {
          null_range_min--;
        }
        null_range.first = null_range_min + 1;
        null_range.second = partition_size - 1;
      }
    };
    switch (order_col_ti.get_size()) {
      case 8: {
        const auto order_col_buf =
            reinterpret_cast<const int64_t*>(order_columns_.front());
        find_null_range_int(order_col_buf, inline_int_null_value<int64_t>());
        break;
      }
      case 4: {
        const auto order_col_buf =
            reinterpret_cast<const int32_t*>(order_columns_.front());
        find_null_range_int(order_col_buf, inline_int_null_value<int32_t>());
        break;
      }
      case 2: {
        const auto order_col_buf =
            reinterpret_cast<const int16_t*>(order_columns_.front());
        find_null_range_int(order_col_buf, inline_int_null_value<int16_t>());
        break;
      }
      case 1: {
        const auto order_col_buf =
            reinterpret_cast<const int8_t*>(order_columns_.front());
        find_null_range_int(order_col_buf, inline_int_null_value<int8_t>());
        break;
      }
      default: {
        LOG(FATAL) << "Invalid type size: " << order_col_ti.get_size();
      }
    }
  }
  if (partition_size > 0 && order_col_ti.is_fp()) {
    const auto null_bit_pattern =
        null_val_bit_pattern(order_col_ti, order_col_ti.get_type() == kFLOAT);
    switch (order_col_ti.get_type()) {
      case kFLOAT: {
        const auto order_col_buf = reinterpret_cast<const float*>(order_columns_.front());
        auto check_null_val = [&null_bit_pattern,
                               &order_col_buf,
                               &original_col_idx_buf,
                               &ordered_col_idx_buf](size_t idx) {
          return *reinterpret_cast<const int32_t*>(may_alias_ptr(
                     &order_col_buf[original_col_idx_buf[ordered_col_idx_buf[idx]]])) ==
                 null_bit_pattern;
        };
        if (collation.nulls_first && check_null_val(0)) {
          int64_t null_range_max = 1;
          while (null_range_max < partition_size && check_null_val(null_range_max)) {
            null_range_max++;
          }
          null_range.first = 0;
          null_range.second = null_range_max - 1;
        } else if (!collation.nulls_first && check_null_val(partition_size - 1)) {
          int64_t null_range_min = partition_size - 2;
          while (null_range_min >= 0 && check_null_val(null_range_min)) {
            null_range_min--;
          }
          null_range.first = null_range_min + 1;
          null_range.second = partition_size - 1;
        }
        break;
      }
      case kDOUBLE: {
        const auto order_col_buf =
            reinterpret_cast<const double*>(order_columns_.front());
        auto check_null_val = [&null_bit_pattern,
                               &order_col_buf,
                               &original_col_idx_buf,
                               &ordered_col_idx_buf](size_t idx) {
          return *reinterpret_cast<const int64_t*>(may_alias_ptr(
                     &order_col_buf[original_col_idx_buf[ordered_col_idx_buf[idx]]])) ==
                 null_bit_pattern;
        };
        if (collation.nulls_first && check_null_val(0)) {
          int64_t null_range_max = 1;
          while (null_range_max < partition_size && check_null_val(null_range_max)) {
            null_range_max++;
          }
          null_range.first = 0;
          null_range.second = null_range_max - 1;
        } else if (!collation.nulls_first && check_null_val(partition_size - 1)) {
          int64_t null_range_min = partition_size - 2;
          while (null_range_min >= 0 && check_null_val(null_range_min)) {
            null_range_min--;
          }
          null_range.first = null_range_min + 1;
          null_range.second = partition_size - 1;
        }
        break;
      }
      default: {
        LOG(FATAL) << "Invalid float type";
      }
    }
  }
  ordered_partition_null_start_pos_[partition_idx] = null_range.first;
  ordered_partition_null_end_pos_[partition_idx] = null_range.second + 1;
}

std::vector<WindowFunctionContext::Comparator> WindowFunctionContext::createComparator(
    size_t partition_idx) {
  // create tuple comparator
  std::vector<WindowFunctionContext::Comparator> partition_comparator;
  const auto& order_keys = window_func_->getOrderKeys();
  const auto& collation = window_func_->getCollation();
  CHECK_EQ(order_keys.size(), collation.size());
  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
       ++order_column_idx) {
    auto order_column_buffer = order_columns_[order_column_idx];
    const auto order_col =
        dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
    CHECK(order_col);
    const auto& order_col_collation = collation[order_column_idx];
    const auto asc_comparator = makeComparator(order_col,
                                               order_column_buffer,
                                               payload() + offsets()[partition_idx],
                                               order_col_collation.nulls_first);
    auto comparator = asc_comparator;
    if (order_col_collation.is_desc) {
      comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
        return asc_comparator(rhs, lhs);
      };
    }
    partition_comparator.push_back(comparator);
  }
  return partition_comparator;
}

void WindowFunctionContext::sortPartition(const size_t partition_idx,
                                          int64_t* output_for_partition_buff,
                                          bool should_parallelize) {
  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
  if (partition_size == 0) {
    return;
  }
  std::iota(
      output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
  auto partition_comparator = createComparator(partition_idx);
  if (!partition_comparator.empty()) {
    const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
                                                              const int64_t rhs) {
      for (const auto& comparator : partition_comparator) {
        const auto comparator_result = comparator(lhs, rhs);
        switch (comparator_result) {
          case WindowFunctionContext::WindowComparatorResult::LT:
            return true;
          case WindowFunctionContext::WindowComparatorResult::GT:
            return false;
          default:
            // WindowComparatorResult::EQ: continue to next comparator
            continue;
        }
      }
      // if we reach here, WindowComparatorResult::EQ holds for all keys; return
      // false as the sort algorithm must enforce a strict weak ordering
      return false;
    };
    if (should_parallelize) {
#ifdef HAVE_TBB
      tbb::parallel_sort(output_for_partition_buff,
                         output_for_partition_buff + partition_size,
                         col_tuple_comparator);
#else
      thrust::sort(output_for_partition_buff,
                   output_for_partition_buff + partition_size,
                   col_tuple_comparator);
#endif
    } else {
      std::sort(output_for_partition_buff,
                output_for_partition_buff + partition_size,
                col_tuple_comparator);
    }
  }
}

const Analyzer::WindowFunction* WindowFunctionContext::getWindowFunction() const {
  return window_func_;
}

const int8_t* WindowFunctionContext::output() const {
  return output_;
}

const int64_t* WindowFunctionContext::sortedPartition() const {
  CHECK(sorted_partition_buf_);
  return sorted_partition_buf_->data();
}

const int64_t* WindowFunctionContext::aggregateState() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return &aggregate_state_.val;
}

const int64_t* WindowFunctionContext::aggregateStateCount() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return &aggregate_state_.count;
}

const int64_t* WindowFunctionContext::partitionStartOffset() const {
  CHECK(partition_start_offset_);
  return partition_start_offset_;
}

const int64_t* WindowFunctionContext::partitionNumCountBuf() const {
  CHECK(partition_start_offset_);
  return partition_start_offset_ + 1;
}

int64_t WindowFunctionContext::aggregateStatePendingOutputs() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
}

const int8_t* WindowFunctionContext::partitionStart() const {
  return partition_start_;
}

const int8_t* WindowFunctionContext::partitionEnd() const {
  return partition_end_;
}

size_t WindowFunctionContext::elementCount() const {
  return elem_count_;
}

namespace {

template <class T>
WindowFunctionContext::WindowComparatorResult integer_comparator(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool nulls_first) {
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_val = inline_fixed_encoding_null_val(ti);
  if (lhs_val == null_val && rhs_val == null_val) {
    return WindowFunctionContext::WindowComparatorResult::EQ;
  }
  if (lhs_val == null_val && rhs_val != null_val) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
                       : WindowFunctionContext::WindowComparatorResult::GT;
  }
  if (rhs_val == null_val && lhs_val != null_val) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
                       : WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val < rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val > rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::GT;
  }
  return WindowFunctionContext::WindowComparatorResult::EQ;
}

template <class T, class NullPatternType>
WindowFunctionContext::WindowComparatorResult fp_comparator(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool nulls_first) {
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
  const auto lhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
  const auto rhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
    return WindowFunctionContext::WindowComparatorResult::EQ;
  }
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
                       : WindowFunctionContext::WindowComparatorResult::GT;
  }
  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
                       : WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val < rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val > rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::GT;
  }
  return WindowFunctionContext::WindowComparatorResult::EQ;
}

}  // namespace

WindowFunctionContext::Comparator WindowFunctionContext::makeComparator(
    const Analyzer::ColumnVar* col_var,
    const int8_t* order_column_buffer,
    const int32_t* partition_indices,
    const bool nulls_first) {
  const auto& ti = col_var->get_type_info();
  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
    switch (ti.get_size()) {
      case 8: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int64_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 4: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int32_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 2: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int16_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 1: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int8_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      default: {
        LOG(FATAL) << "Invalid type size: " << ti.get_size();
      }
    }
  }
  if (ti.is_fp()) {
    switch (ti.get_type()) {
      case kFLOAT: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return fp_comparator<float, int32_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case kDOUBLE: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return fp_comparator<double, int64_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      default: {
        LOG(FATAL) << "Invalid float type";
      }
    }
  }
  throw std::runtime_error("Type not supported yet");
}

void WindowFunctionContext::computePartitionBuffer(
    const size_t partition_idx,
    int64_t* output_for_partition_buff,
    const Analyzer::WindowFunction* window_func) {
  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
  if (partition_size == 0) {
    return;
  }
  const auto offset = offsets()[partition_idx];
  auto partition_comparator = createComparator(partition_idx);
  const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
                                                            const int64_t rhs) {
    for (const auto& comparator : partition_comparator) {
      const auto comparator_result = comparator(lhs, rhs);
      switch (comparator_result) {
        case WindowFunctionContext::WindowComparatorResult::LT:
          return true;
        case WindowFunctionContext::WindowComparatorResult::GT:
          return false;
        default:
          // WindowComparatorResult::EQ: continue to next comparator
          continue;
      }
    }
    // if we reach here, WindowComparatorResult::EQ holds for all keys; return
    // false as the sort algorithm must enforce a strict weak ordering
    return false;
  };
  switch (window_func->getKind()) {
    case SqlWindowFunctionKind::ROW_NUMBER: {
      const auto row_numbers =
          index_to_row_number(output_for_partition_buff, partition_size);
      std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::RANK: {
      const auto rank =
          index_to_rank(output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(rank.begin(), rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::DENSE_RANK: {
      const auto dense_rank = index_to_dense_rank(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::PERCENT_RANK: {
      const auto percent_rank = index_to_percent_rank(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(percent_rank.begin(),
                percent_rank.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::CUME_DIST: {
      const auto cume_dist = index_to_cume_dist(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(cume_dist.begin(),
                cume_dist.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::NTILE: {
      const auto& args = window_func->getArgs();
      CHECK_EQ(args.size(), size_t(1));
      const auto n = get_int_constant_from_expr(args.front().get());
      const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
      std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::LAG:
    case SqlWindowFunctionKind::LEAD: {
      const auto lag_or_lead = get_lag_or_lead_argument(window_func);
      const auto partition_row_offsets = payload() + offset;
      apply_lag_to_partition(
          lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::FIRST_VALUE: {
      const auto partition_row_offsets = payload() + offset;
      apply_first_value_to_partition(
          partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::LAST_VALUE: {
      const auto partition_row_offsets = payload() + offset;
      apply_last_value_to_partition(
          partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::AVG:
    case SqlWindowFunctionKind::MIN:
    case SqlWindowFunctionKind::MAX:
    case SqlWindowFunctionKind::SUM:
    case SqlWindowFunctionKind::COUNT: {
      const auto partition_row_offsets = payload() + offset;
      if (window_function_requires_peer_handling(window_func)) {
        index_to_partition_end(partitionEnd(),
                               offset,
                               output_for_partition_buff,
                               partition_size,
                               col_tuple_comparator);
      }
      apply_permutation_to_partition(
          output_for_partition_buff, partition_row_offsets, partition_size);
      break;
    }
    default: {
      throw std::runtime_error("Window function not supported yet: " +
                               ::toString(window_func->getKind()));
    }
  }
}

void WindowFunctionContext::buildAggregationTreeForPartition(
    SqlWindowFunctionKind agg_type,
    size_t partition_idx,
    size_t partition_size,
    const int8_t* col_buf,
    const int32_t* original_rowid_buf,
    const int64_t* ordered_rowid_buf,
    const SQLTypeInfo& input_col_ti) {
  CHECK(col_buf);
  if (!input_col_ti.is_number()) {
    throw QueryNotSupported("Window aggregate function over frame on a column type " +
                            ::toString(input_col_ti.get_type()) + " is not supported.");
  }
  const auto type = input_col_ti.is_decimal() ? decimal_to_int_type(input_col_ti)
                                              : input_col_ti.get_type();
  if (partition_size > 0) {
    IndexPair order_col_null_range{ordered_partition_null_start_pos_[partition_idx],
                                   ordered_partition_null_end_pos_[partition_idx]};
    const int64_t* ordered_rowid_buf_for_partition =
        ordered_rowid_buf + offsets()[partition_idx];
    VLOG(2) << "Build Aggregation Tree For Partition-" << ::toString(partition_idx)
            << " (# elems: " << ::toString(partition_size)
            << ", null_range: " << order_col_null_range.first << " ~ "
            << order_col_null_range.second << ")";
    switch (type) {
      case kTINYINT: {
        const auto segment_tree = std::make_shared<SegmentTree<int8_t, int64_t>>(
            col_buf,
            input_col_ti,
            original_rowid_buf,
            ordered_rowid_buf_for_partition,
            order_col_null_range,
            partition_size,
            agg_type,
            aggregate_trees_fan_out_);
        aggregate_trees_depth_[partition_idx] =
            segment_tree ? segment_tree->getLeafDepth() : 0;
        if (agg_type == SqlWindowFunctionKind::AVG) {
          aggregate_trees_.derived_aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr);
        } else {
          aggregate_trees_.aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getAggregatedValues() : nullptr);
        }
        segment_trees_owned_.emplace_back(std::move(segment_tree));
        break;
      }
      case kSMALLINT: {
        const auto segment_tree = std::make_shared<SegmentTree<int16_t, int64_t>>(
            col_buf,
            input_col_ti,
            original_rowid_buf,
            ordered_rowid_buf_for_partition,
            order_col_null_range,
            partition_size,
            agg_type,
            aggregate_trees_fan_out_);
        aggregate_trees_depth_[partition_idx] =
            segment_tree ? segment_tree->getLeafDepth() : 0;
        if (agg_type == SqlWindowFunctionKind::AVG) {
          aggregate_trees_.derived_aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr);
        } else {
          aggregate_trees_.aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getAggregatedValues() : nullptr);
        }
        segment_trees_owned_.emplace_back(std::move(segment_tree));
        break;
      }
      case kINT: {
        const auto segment_tree = std::make_shared<SegmentTree<int32_t, int64_t>>(
            col_buf,
            input_col_ti,
            original_rowid_buf,
            ordered_rowid_buf_for_partition,
            order_col_null_range,
            partition_size,
            agg_type,
            aggregate_trees_fan_out_);
        aggregate_trees_depth_[partition_idx] =
            segment_tree ? segment_tree->getLeafDepth() : 0;
        if (agg_type == SqlWindowFunctionKind::AVG) {
          aggregate_trees_.derived_aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr);
        } else {
          aggregate_trees_.aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getAggregatedValues() : nullptr);
        }
        segment_trees_owned_.emplace_back(std::move(segment_tree));
        break;
      }
      case kDECIMAL:
      case kNUMERIC:
      case kBIGINT: {
        const auto segment_tree = std::make_shared<SegmentTree<int64_t, int64_t>>(
            col_buf,
            input_col_ti,
            original_rowid_buf,
            ordered_rowid_buf_for_partition,
            order_col_null_range,
            partition_size,
            agg_type,
            aggregate_trees_fan_out_);
        aggregate_trees_depth_[partition_idx] =
            segment_tree ? segment_tree->getLeafDepth() : 0;
        if (agg_type == SqlWindowFunctionKind::AVG) {
          aggregate_trees_.derived_aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr);
        } else {
          aggregate_trees_.aggregate_tree_for_integer_type_.push_back(
              segment_tree ? segment_tree->getAggregatedValues() : nullptr);
        }
        segment_trees_owned_.emplace_back(std::move(segment_tree));
        break;
      }
      case kFLOAT: {
        const auto segment_tree =
            std::make_shared<SegmentTree<float, double>>(col_buf,
                                                         input_col_ti,
                                                         original_rowid_buf,
                                                         ordered_rowid_buf_for_partition,
                                                         order_col_null_range,
                                                         partition_size,
                                                         agg_type,
                                                         aggregate_trees_fan_out_);
        aggregate_trees_depth_[partition_idx] =
            segment_tree ? segment_tree->getLeafDepth() : 0;
        if (agg_type == SqlWindowFunctionKind::AVG) {
          aggregate_trees_.derived_aggregate_tree_for_double_type_.push_back(
              segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr);
        } else {
          aggregate_trees_.aggregate_tree_for_double_type_.push_back(
              segment_tree ? segment_tree->getAggregatedValues() : nullptr);
        }
        segment_trees_owned_.emplace_back(std::move(segment_tree));
        break;
      }
      case kDOUBLE: {
        const auto segment_tree =
            std::make_shared<SegmentTree<double, double>>(col_buf,
                                                          input_col_ti,
                                                          original_rowid_buf,
                                                          ordered_rowid_buf_for_partition,
                                                          order_col_null_range,
                                                          partition_size,
                                                          agg_type,
                                                          aggregate_trees_fan_out_);
        aggregate_trees_depth_[partition_idx] =
            segment_tree ? segment_tree->getLeafDepth() : 0;
        if (agg_type == SqlWindowFunctionKind::AVG) {
          aggregate_trees_.derived_aggregate_tree_for_double_type_.push_back(
              segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr);
        } else {
          aggregate_trees_.aggregate_tree_for_double_type_.push_back(
              segment_tree ? segment_tree->getAggregatedValues() : nullptr);
        }
        segment_trees_owned_.emplace_back(std::move(segment_tree));
        break;
      }
      default:
        UNREACHABLE();
    }
  } else {
    // handle the case of an empty partition
    aggregate_trees_depth_[partition_idx] = 0;
    if (input_col_ti.is_integer() || input_col_ti.is_decimal()) {
      if (agg_type == SqlWindowFunctionKind::AVG) {
        aggregate_trees_.derived_aggregate_tree_for_integer_type_.push_back(nullptr);
      } else {
        aggregate_trees_.aggregate_tree_for_integer_type_.push_back(nullptr);
      }
    } else {
      CHECK(input_col_ti.is_fp());
      if (agg_type == SqlWindowFunctionKind::AVG) {
        aggregate_trees_.derived_aggregate_tree_for_double_type_.push_back(nullptr);
      } else {
        aggregate_trees_.aggregate_tree_for_double_type_.push_back(nullptr);
      }
    }
  }
}

int64_t** WindowFunctionContext::getAggregationTreesForIntegerTypeWindowExpr() const {
  return const_cast<int64_t**>(aggregate_trees_.aggregate_tree_for_integer_type_.data());
}

double** WindowFunctionContext::getAggregationTreesForDoubleTypeWindowExpr() const {
  return const_cast<double**>(aggregate_trees_.aggregate_tree_for_double_type_.data());
}

SumAndCountPair<int64_t>**
WindowFunctionContext::getDerivedAggregationTreesForIntegerTypeWindowExpr() const {
  return const_cast<SumAndCountPair<int64_t>**>(
      aggregate_trees_.derived_aggregate_tree_for_integer_type_.data());
}

SumAndCountPair<double>**
WindowFunctionContext::getDerivedAggregationTreesForDoubleTypeWindowExpr() const {
  return const_cast<SumAndCountPair<double>**>(
      aggregate_trees_.derived_aggregate_tree_for_double_type_.data());
}

size_t* WindowFunctionContext::getAggregateTreeDepth() const {
  return aggregate_trees_depth_;
}

size_t WindowFunctionContext::getAggregateTreeFanout() const {
  return aggregate_trees_fan_out_;
}

int64_t* WindowFunctionContext::getNullValueStartPos() const {
  return ordered_partition_null_start_pos_;
}

int64_t* WindowFunctionContext::getNullValueEndPos() const {
  return ordered_partition_null_end_pos_;
}

void WindowFunctionContext::fillPartitionStart() {
  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
                                                 0,
                                                 static_cast<int64_t>(elem_count_),
                                                 false,
                                                 ExecutorDeviceType::CPU,
                                                 1};
  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
  if (partitions_) {
    bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
  }
  partition_start_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
  int64_t partition_count = partitionCount();
  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
  if (partition_start_offset_) {
    // if we have `partition_start_offset_`, we can reuse it for this logic.
    // Note that it has partition_count + 1 elements, where the first element is
    // zero (the first partition's start offset) and the rest are the values
    // required here.
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      agg_count_distinct_bitmap(
          &partition_start_handle, partition_start_offset_[i + 1], 0);
    }
  } else {
    std::vector<size_t> partition_offsets(partition_count);
    std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
    }
  }
}
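
// Example: for partitions with counts() = {3, 2, 4}, fillPartitionStart()
// sets bits 0, 3, and 5 (the first row of each partition), and
// fillPartitionEnd() below sets bits 2, 4, and 8 (the last row of each).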

void WindowFunctionContext::fillPartitionEnd() {
  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
                                                 0,
                                                 static_cast<int64_t>(elem_count_),
                                                 false,
                                                 ExecutorDeviceType::CPU,
                                                 1};
  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
  if (partitions_) {
    bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
  }
  partition_end_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
  int64_t partition_count = partitionCount();
  if (partition_start_offset_) {
    // if we have `partition_start_offset_`, we can reuse it for this logic.
    // Note that it has partition_count + 1 elements, where the first element is
    // zero (the first partition's start offset) and the rest are the values
    // required here.
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      if (partition_start_offset_[i + 1] == 0) {
        continue;
      }
      agg_count_distinct_bitmap(
          &partition_end_handle, partition_start_offset_[i + 1] - 1, 0);
    }
    if (elem_count_) {
      agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
    }
  } else {
    std::vector<size_t> partition_offsets(partition_count);
    std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
    for (int64_t i = 0; i < partition_count - 1; ++i) {
      if (partition_offsets[i] == 0) {
        continue;
      }
      agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
    }
    if (elem_count_) {
      agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
    }
  }
}

const int32_t* WindowFunctionContext::payload() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) +
        partitions_->payloadBufferOff());
  }
  return dummy_payload_;  // non-partitioned window function
}

const int32_t* WindowFunctionContext::offsets() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
  }
  return &dummy_offset_;
}

const int32_t* WindowFunctionContext::counts() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
  }
  return &dummy_count_;
}

size_t WindowFunctionContext::partitionCount() const {
  if (partitions_) {
    const auto partition_count = counts() - offsets();
    CHECK_GE(partition_count, 0);
    return partition_count;
  }
  return 1;  // non-partitioned window function
}
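
// Note: counts() and offsets() point into the same hash table buffer, where
// the count sub-buffer begins right after the offset sub-buffer and both hold
// one int32_t per partition, so their pointer difference is the partition
// count.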

const bool WindowFunctionContext::needsToBuildAggregateTree() const {
  return window_func_->hasFraming() &&
         window_func_->hasAggregateTreeRequiredWindowFunc();
}

void WindowProjectNodeContext::addWindowFunctionContext(
    std::unique_ptr<WindowFunctionContext> window_function_context,
    const size_t target_index) {
  const auto it_ok = window_contexts_.emplace(
      std::make_pair(target_index, std::move(window_function_context)));
  CHECK(it_ok.second);
}

const WindowFunctionContext* WindowProjectNodeContext::activateWindowFunctionContext(
    Executor* executor,
    const size_t target_index) const {
  const auto it = window_contexts_.find(target_index);
  CHECK(it != window_contexts_.end());
  executor->active_window_function_ = it->second.get();
  return executor->active_window_function_;
}

void WindowProjectNodeContext::resetWindowFunctionContext(Executor* executor) {
  executor->active_window_function_ = nullptr;
}

WindowFunctionContext* WindowProjectNodeContext::getActiveWindowFunctionContext(
    Executor* executor) {
  return executor->active_window_function_;
}

WindowProjectNodeContext* WindowProjectNodeContext::create(Executor* executor) {
  executor->window_project_node_context_owned_ =
      std::make_unique<WindowProjectNodeContext>();
  return executor->window_project_node_context_owned_.get();
}

const WindowProjectNodeContext* WindowProjectNodeContext::get(Executor* executor) {
  return executor->window_project_node_context_owned_.get();
}

void WindowProjectNodeContext::reset(Executor* executor) {
  executor->window_project_node_context_owned_ = nullptr;
  executor->active_window_function_ = nullptr;
}
size_t getAggregateTreeFanout() const
bool g_enable_parallel_window_partition_sort
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::vector< SumAndCountPair< double > * > derived_aggregate_tree_for_double_type_
Definition: WindowContext.h:66
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:2406
void addOrderColumn(const int8_t *column, const SQLTypeInfo &ti, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
WindowFunctionContext::WindowComparatorResult integer_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
HOST DEVICE int get_size() const
Definition: sqltypes.h:414
int64_t * ordered_partition_null_start_pos_
RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
Descriptor for the storage layout use for (approximate) count distinct operations.
const int32_t dummy_count_
std::vector< double * > aggregate_tree_for_double_type_
Definition: WindowContext.h:64
bool advance_current_rank(const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator, const int64_t *index, const size_t i)
RUNTIME_EXPORT void add_window_pending_output(void *pending_output, const int64_t handle)
ExecutorDeviceType
bool hasAggregateTreeRequiredWindowFunc() const
Definition: Analyzer.h:2454
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
int64_t * getNullValueEndPos() const
Utility functions for easy access to the result set buffers.
const int32_t dummy_offset_
#define LOG(tag)
Definition: Logger.h:216
bool is_fp() const
Definition: sqltypes.h:604
constexpr QueryPlanHash EMPTY_HASHED_PLAN_DAG_KEY
const int8_t * partitionStart() const
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const std::vector< SQLTypeInfo > & getOrderKeyColumnBufferTypes() const
void setSortedPartitionCacheKey(QueryPlanHash cache_key)
#define UNREACHABLE()
Definition: Logger.h:266
void computeNullRangeOfSortedPartition(const SQLTypeInfo &order_col_ti, size_t partition_idx, const int32_t *original_col_idx_buf, const int64_t *ordered_col_idx_buf)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
#define CHECK_GE(x, y)
Definition: Logger.h:235
static WindowProjectNodeContext * create(Executor *executor)
void compute(std::unordered_map< QueryPlanHash, size_t > &sorted_partition_key_ref_count_map, std::unordered_map< QueryPlanHash, std::shared_ptr< std::vector< int64_t >>> &sorted_partition_cache)
RUNTIME_EXPORT ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
size_t elementCount() const
const int8_t * output() const
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
Constants for Builtin SQL Types supported by HEAVY.AI.
std::string toString(const QueryDescriptionType &type)
Definition: Types.h:64
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
const int32_t * offsets() const
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
size_t g_parallel_window_partition_compute_threshold
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
size_t g_parallel_window_partition_sort_threshold
RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle, const double value, const int64_t bitset, const int64_t pos)
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
bool is_number() const (Definition: sqltypes.h:605)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const (Definition: Analyzer.h:2414)
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
const std::vector< OrderEntry > & getCollation() const (Definition: Analyzer.h:2432)
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
bool is_time() const (Definition: sqltypes.h:606)
const bool needsToBuildAggregateTree() const
size_t * getAggregateTreeDepth() const
int64_t ** getAggregationTreesForIntegerTypeWindowExpr() const
const std::vector< const int8_t * > & getColumnBufferForWindowFunctionExpressions() const
int64_t * getNullValueStartPos() const
SumAndCountPair< double > ** getDerivedAggregationTreesForDoubleTypeWindowExpr() const
const int64_t * partitionStartOffset() const
std::vector< std::shared_ptr< void > > segment_trees_owned_
std::shared_ptr< std::vector< int64_t > > sorted_partition_buf_
size_t partitionCount() const
AggregateState aggregate_state_
void apply_window_pending_outputs_int(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
static const WindowProjectNodeContext * get(Executor *executor)
size_t g_window_function_aggregation_tree_fanout
DEVICE void fill(ARGS &&...args) (Definition: gpu_enabled.h:60)
void apply_last_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const int64_t * aggregateStateCount() const
std::vector< Comparator > createComparator(size_t partition_idx)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const (Definition: Analyzer.h:2408)
QueryPlanHash sorted_partition_cache_key_
bool is_integer() const (Definition: sqltypes.h:602)
const int8_t * partitionEnd() const
void * checked_malloc(const size_t size) (Definition: checked_alloc.h:45)
DEVICE auto copy(ARGS &&...args) (Definition: gpu_enabled.h:51)
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > window_func_expr_columns_owner_
void addColumnBufferForWindowFunctionExpression(const int8_t *column, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
static void reset(Executor *executor)
void buildAggregationTreeForPartition(SqlWindowFunctionKind agg_type, size_t partition_idx, size_t partition_size, const int8_t *col_buf, const int32_t *original_rowid_buf, const int64_t *ordered_rowid_buf, const SQLTypeInfo &input_col_ti)
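buildAggregationTreeForPartition, together with the aggregate_trees_fan_out_ member initialized in the constructor, indicates one fan-out-N aggregation tree per partition, so a framed aggregate (e.g. SUM over a sliding frame) can be answered in logarithmic time per row instead of rescanning the frame. A much-simplified binary-fan-out sketch for SUM (the engine's trees are N-ary and type-specialized, with SumAndCountPair variants for derived aggregates like AVG; everything named here is illustrative):

#include <cstdint>
#include <vector>

// Hypothetical binary segment tree over a partition's column values:
// build once per partition, then answer sum(lo, hi) for any frame.
struct SumSegmentTree {
  explicit SumSegmentTree(const std::vector<int64_t>& leaves)
      : n_(leaves.size()), tree_(2 * leaves.size(), 0) {
    if (n_ == 0) {
      return;  // empty partition: nothing to build
    }
    for (size_t i = 0; i < n_; ++i) {
      tree_[n_ + i] = leaves[i];  // leaves occupy the upper half
    }
    for (size_t i = n_ - 1; i > 0; --i) {
      tree_[i] = tree_[2 * i] + tree_[2 * i + 1];  // parent = sum of children
    }
  }
  // Sum over the half-open frame [lo, hi).
  int64_t sum(size_t lo, size_t hi) const {
    int64_t acc = 0;
    for (lo += n_, hi += n_; lo < hi; lo >>= 1, hi >>= 1) {
      if (lo & 1) acc += tree_[lo++];
      if (hi & 1) acc += tree_[--hi];
    }
    return acc;
  }
  size_t n_;
  std::vector<int64_t> tree_;
};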
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
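makeComparator dispatches on the order key's type (see the integer_comparator and fp_comparator free functions) and on null placement. The stand-in below sketches a null-aware three-way compare with a nulls_first flag; the Ordering enum and all names here are my own, not the engine's WindowComparatorResult:

#include <cstdint>

enum class Ordering { kLess, kEqual, kGreater };  // stand-in result type

// Hypothetical null-aware three-way compare: nulls sort together, before or
// after all non-null values depending on nulls_first.
inline Ordering compare_with_nulls(const int64_t lhs,
                                   const int64_t rhs,
                                   const int64_t null_val,
                                   const bool nulls_first) {
  const bool l_null = (lhs == null_val);
  const bool r_null = (rhs == null_val);
  if (l_null && r_null) {
    return Ordering::kEqual;
  }
  if (l_null) {
    return nulls_first ? Ordering::kLess : Ordering::kGreater;
  }
  if (r_null) {
    return nulls_first ? Ordering::kGreater : Ordering::kLess;
  }
  if (lhs < rhs) {
    return Ordering::kLess;
  }
  return lhs == rhs ? Ordering::kEqual : Ordering::kGreater;
}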
void apply_first_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
DEVICE void partial_sum(ARGS &&...args) (Definition: gpu_enabled.h:87)
void * checked_calloc(const size_t nmemb, const size_t size) (Definition: checked_alloc.h:53)
int64_t aggregateStatePendingOutputs() const
RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
bool is_boolean() const (Definition: sqltypes.h:607)
const int64_t * aggregateState() const
RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
const SQLTypeInfo & get_type_info() const (Definition: Analyzer.h:82)
std::pair< int64_t, int64_t > IndexPair
SQLTypes decimal_to_int_type(const SQLTypeInfo &ti) (Definition: Datum.cpp:499)
SumAndCountPair< int64_t > ** getDerivedAggregationTreesForIntegerTypeWindowExpr() const
#define RUNTIME_EXPORT
bool window_function_is_aggregate(const SqlWindowFunctionKind kind) (Definition: WindowContext.h:44)
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
#define CHECK_LE(x, y) (Definition: Logger.h:233)
RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
std::vector< const int8_t * > window_func_expr_columns_
AggregateTreeForWindowFraming aggregate_trees_
void sortPartition(const size_t partition_idx, int64_t *output_for_partition_buff, bool should_parallelize)
DEVICE void iota(ARGS &&...args) (Definition: gpu_enabled.h:69)
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
bool pos_is_set(const int64_t bitset, const int64_t pos)
const std::vector< const int8_t * > & getOrderKeyColumnBuffers() const
std::shared_ptr< HashJoin > partitions_
size_t QueryPlanHash
const int64_t * partitionNumCountBuf() const
RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
void apply_lag_to_partition(const int64_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
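apply_lag_to_partition reads as the index shift behind LAG: row i in sorted order sources its value from sorted position i - lag, and positions falling outside the partition become NULL downstream. A hypothetical sketch of just the shift, with -1 as a stand-in sentinel for out-of-range rows:

#include <cstdint>
#include <vector>

// Hypothetical LAG(expr, lag) source mapping: row i in sorted order reads
// from sorted position i - lag; out-of-range positions get the sentinel -1,
// which the caller later maps to the NULL bit pattern.
std::vector<int64_t> lag_source_positions(const int64_t lag,
                                          const size_t partition_size) {
  std::vector<int64_t> src(partition_size);
  for (size_t i = 0; i < partition_size; ++i) {
    const int64_t j = static_cast<int64_t>(i) - lag;
    src[i] = (j < 0 || j >= static_cast<int64_t>(partition_size)) ? -1 : j;
  }
  return src;
}

Conventionally LEAD is the same shift with a negated offset, which is consistent with a single get_lag_or_lead_argument helper serving both.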
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
SqlWindowFunctionKind (Definition: sqldefs.h:113)
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
bool g_enable_parallel_window_partition_compute
void computePartitionBuffer(const size_t partition_idx, int64_t *output_for_partition_buff, const Analyzer::WindowFunction *window_func)
int64_t * partition_start_offset_
#define CHECK(condition) (Definition: Logger.h:222)
#define DEBUG_TIMER(name) (Definition: Logger.h:371)
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
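CUME_DIST is conventionally (rows preceding or peer with the current row) / partition_size, so unlike PERCENT_RANK it scans forward through the current peer group, and every peer shares the group's final position. A hedged sketch with an is_peer predicate standing in for the engine's comparator:

#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical CUME_DIST over rows sorted by the ORDER BY key: all rows in
// a peer group share (group end position) / (row count).
std::vector<double> cume_dist_sketch(
    const size_t index_size,
    const std::function<bool(size_t, size_t)>& is_peer) {
  std::vector<double> result(index_size);
  size_t i = 0;
  while (i < index_size) {
    size_t group_end = i + 1;  // scan to the end of the current peer group
    while (group_end < index_size && is_peer(i, group_end)) {
      ++group_end;
    }
    const double dist = static_cast<double>(group_end) / index_size;
    for (size_t j = i; j < group_end; ++j) {
      result[j] = dist;  // every peer shares the group-final position
    }
    i = group_end;
  }
  return result;
}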
std::vector< const int8_t * > order_columns_
const int64_t * sortedPartition() const
static void resetWindowFunctionContext(Executor *executor)
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
std::vector< SQLTypeInfo > order_columns_ti_
std::vector< SumAndCountPair< int64_t > * > derived_aggregate_tree_for_integer_type_ (Definition: WindowContext.h:65)
const Analyzer::WindowFunction * getWindowFunction() const (Definition: sqltypes.h:59)
const int32_t * payload() const
std::function< WindowFunctionContext::WindowComparatorResult(const int64_t lhs, const int64_t rhs)> Comparator
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
size_t * aggregate_trees_depth_
RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
int cpu_threads() (Definition: thread_count.h:25)
bool is_decimal() const (Definition: sqltypes.h:603)
Divide up indexes (A, A+1, A+2, ..., B-2, B-1) among N workers as evenly as possible in a range-based...
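The Intervals helper described above is what lets the parallel window paths (gated by g_parallel_window_partition_compute_threshold and its sort counterpart) split an index range across cpu_threads() workers. A hypothetical sketch of such an even split, with chunk sizes differing by at most one and the remainder going to the earliest chunks:

#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical even split of [begin, end) into up to `workers` chunks whose
// sizes differ by at most one; the first (total % workers) chunks get the
// extra element each.
std::vector<std::pair<size_t, size_t>> make_intervals(const size_t begin,
                                                      const size_t end,
                                                      const size_t workers) {
  std::vector<std::pair<size_t, size_t>> chunks;
  if (workers == 0 || end <= begin) {
    return chunks;  // nothing to split
  }
  const size_t total = end - begin;
  const size_t base = total / workers;
  const size_t extra = total % workers;
  size_t lo = begin;
  for (size_t w = 0; w < workers && lo < end; ++w) {
    const size_t hi = lo + base + (w < extra ? 1 : 0);
    chunks.emplace_back(lo, hi);
    lo = hi;
  }
  return chunks;
}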
bool hasFraming() const (Definition: Analyzer.h:2442)
std::vector< int64_t * > aggregate_tree_for_integer_type_ (Definition: WindowContext.h:63)
const ExecutorDeviceType device_type_
#define VLOG(n) (Definition: Logger.h:316)
WindowFunctionContext::WindowComparatorResult fp_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
double ** getAggregationTreesForDoubleTypeWindowExpr() const
int64_t * ordered_partition_null_end_pos_