OmniSciDB 72c90bc290
WindowContext.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryEngine/WindowContext.h"
18 
19 #include <numeric>
20 
22 #include "QueryEngine/Execute.h"
28 #include "Shared/Intervals.h"
29 #include "Shared/checked_alloc.h"
30 #include "Shared/funcannotations.h"
31 #include "Shared/sqltypes.h"
32 #include "Shared/threading.h"
33 
34 #ifdef HAVE_TBB
35 //#include <tbb/parallel_for.h>
36 #include <tbb/parallel_sort.h>
37 #else
38 #include <thrust/sort.h>
39 #endif
40 
43 
46 
48 
49 // Non-partitioned version (no hash table provided)
50 WindowFunctionContext::WindowFunctionContext(
51  const Analyzer::WindowFunction* window_func,
52  const size_t elem_count,
53  const ExecutorDeviceType device_type,
54  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
55  : window_func_(window_func)
56  , partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
57  , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
58  , partitions_(nullptr)
59  , elem_count_(elem_count)
60  , output_(nullptr)
61  , sorted_partition_buf_(nullptr)
62  , aggregate_trees_fan_out_(g_window_function_aggregation_tree_fanout)
63  , aggregate_trees_depth_(nullptr)
64  , ordered_partition_null_start_pos_(nullptr)
65  , ordered_partition_null_end_pos_(nullptr)
66  , partition_start_offset_(nullptr)
67  , partition_start_(nullptr)
68  , partition_end_(nullptr)
69  , device_type_(device_type)
70  , row_set_mem_owner_(row_set_mem_owner)
71  , dummy_count_(elem_count)
72  , dummy_offset_(0)
73  , dummy_payload_(nullptr) {
74  CHECK_LE(elem_count_, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
75  dummy_payload_ =
76  reinterpret_cast<int32_t*>(checked_malloc(elem_count_ * sizeof(int32_t)));
77  std::iota(dummy_payload_, dummy_payload_ + elem_count_, int32_t(0));
78  if (window_func_->hasFraming()) {
80  // in this case, we consider all rows of the table belong to the same and only
81  // existing partition
82  partition_start_offset_ =
83  reinterpret_cast<int64_t*>(checked_calloc(2, sizeof(int64_t)));
84  partition_start_offset_[1] = elem_count_;
85  aggregate_trees_depth_ = reinterpret_cast<size_t*>(checked_calloc(1, sizeof(size_t)));
86  ordered_partition_null_start_pos_ =
87  reinterpret_cast<int64_t*>(checked_calloc(1, sizeof(int64_t)));
88  ordered_partition_null_end_pos_ =
89  reinterpret_cast<int64_t*>(checked_calloc(1, sizeof(int64_t)));
90  }
91 }
92 
93 // Partitioned version
94 WindowFunctionContext::WindowFunctionContext(
95  const Analyzer::WindowFunction* window_func,
96  QueryPlanHash partition_cache_key,
97  const std::shared_ptr<HashJoin>& partitions,
98  const size_t elem_count,
99  const ExecutorDeviceType device_type,
100  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
101  size_t aggregation_tree_fan_out)
102  : window_func_(window_func)
103  , partition_cache_key_(partition_cache_key)
104  , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
105  , partitions_(partitions)
106  , elem_count_(elem_count)
107  , output_(nullptr)
108  , sorted_partition_buf_(nullptr)
109  , aggregate_trees_fan_out_(aggregation_tree_fan_out)
110  , aggregate_trees_depth_(nullptr)
111  , ordered_partition_null_start_pos_(nullptr)
112  , ordered_partition_null_end_pos_(nullptr)
113  , partition_start_offset_(nullptr)
114  , partition_start_(nullptr)
115  , partition_end_(nullptr)
116  , device_type_(device_type)
117  , row_set_mem_owner_(row_set_mem_owner)
118  , dummy_count_(elem_count)
119  , dummy_offset_(0)
120  , dummy_payload_(nullptr) {
121  CHECK(partitions_); // This version should have hash table
122  size_t partition_count = partitionCount();
123  partition_start_offset_ =
124  reinterpret_cast<int64_t*>(checked_calloc(partition_count + 1, sizeof(int64_t)));
125  if (window_func_->hasFraming()) {
126  aggregate_trees_depth_ =
127  reinterpret_cast<size_t*>(checked_calloc(partition_count, sizeof(size_t)));
128  ordered_partition_null_start_pos_ =
129  reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
130  ordered_partition_null_end_pos_ =
131  reinterpret_cast<int64_t*>(checked_calloc(partition_count, sizeof(int64_t)));
132  }
133  // the first partition starts at zero position
134  std::partial_sum(counts(), counts() + partition_count, partition_start_offset_ + 1);
135 }
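// Worked example (illustrative, not from the source): for partition counts
// {3, 2, 4}, the calloc'd leading zero plus the partial sum above yield
// partition_start_offset_ = {0, 3, 5, 9}; entry i is the first row index of
// partition i and the final entry equals the total row count.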
136 
137 WindowFunctionContext::~WindowFunctionContext() {
138  free(partition_start_);
139  free(partition_end_);
140  if (dummy_payload_) {
141  free(dummy_payload_);
142  }
143  if (partition_start_offset_) {
144  free(partition_start_offset_);
145  }
146  if (aggregate_trees_depth_) {
147  free(aggregate_trees_depth_);
148  }
149  if (ordered_partition_null_start_pos_) {
150  free(ordered_partition_null_start_pos_);
151  }
152  if (ordered_partition_null_end_pos_) {
153  free(ordered_partition_null_end_pos_);
154  }
155 }
156 
157 void WindowFunctionContext::addOrderColumn(
158  const int8_t* column,
159  const SQLTypeInfo& ti,
160  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
161  order_columns_owner_.push_back(chunks_owner);
162  order_columns_.push_back(column);
163  order_columns_ti_.push_back(ti);
164 }
165 
166 void WindowFunctionContext::addColumnBufferForWindowFunctionExpression(
167  const int8_t* column,
168  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
169  window_func_expr_columns_owner_.push_back(chunks_owner);
170  window_func_expr_columns_.push_back(column);
171 };
172 
173 const std::vector<const int8_t*>&
174 WindowFunctionContext::getColumnBufferForWindowFunctionExpressions() const {
175  return window_func_expr_columns_;
176 }
177 
178 const std::vector<const int8_t*>& WindowFunctionContext::getOrderKeyColumnBuffers()
179  const {
180  return order_columns_;
181 }
182 
183 const std::vector<SQLTypeInfo>& WindowFunctionContext::getOrderKeyColumnBufferTypes()
184  const {
185  return order_columns_ti_;
186 }
187 
188 void WindowFunctionContext::setSortedPartitionCacheKey(QueryPlanHash cache_key) {
189  sorted_partition_cache_key_ = cache_key;
190 }
191 
192 namespace {
193 
194 // Converts the sorted indices to a mapping from row position to row number.
195 std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
196  std::vector<int64_t> row_numbers(index_size);
197  for (size_t i = 0; i < index_size; ++i) {
198  row_numbers[index[i]] = i + 1;
199  }
200  return row_numbers;
201 }
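// Worked example (illustrative): if the sort produced index = {2, 0, 1}
// (original row 2 sorts first), the loop yields row_numbers = {2, 3, 1},
// i.e. the output is addressed by original row position, not sorted position.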
202 
203 // Returns true iff the current element is greater than the previous, according to the
204 // comparator. This is needed because peer rows have to have the same rank.
205 bool advance_current_rank(
206  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
207  const int64_t* index,
208  const size_t i) {
209  if (i == 0) {
210  return false;
211  }
212  return comparator(index[i - 1], index[i]);
213 }
214 
215 // Computes the mapping from row position to rank.
216 std::vector<int64_t> index_to_rank(
217  const int64_t* index,
218  const size_t index_size,
219  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
220  std::vector<int64_t> rank(index_size);
221  size_t crt_rank = 1;
222  for (size_t i = 0; i < index_size; ++i) {
223  if (advance_current_rank(comparator, index, i)) {
224  crt_rank = i + 1;
225  }
226  rank[index[i]] = crt_rank;
227  }
228  return rank;
229 }
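// Worked example (illustrative): for sorted values 10, 20, 20, 30 the
// comparator advances at i = 1 and i = 3 but not at i = 2 (a peer row), so
// crt_rank takes the values 1, 2, 2, 4 -- the classic RANK() gap after a tie.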
230 
231 // Computes the mapping from row position to dense rank.
232 std::vector<int64_t> index_to_dense_rank(
233  const int64_t* index,
234  const size_t index_size,
235  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
236  std::vector<int64_t> dense_rank(index_size);
237  size_t crt_rank = 1;
238  for (size_t i = 0; i < index_size; ++i) {
239  if (advance_current_rank(comparator, index, i)) {
240  ++crt_rank;
241  }
242  dense_rank[index[i]] = crt_rank;
243  }
244  return dense_rank;
245 }
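// Worked example (illustrative): the same sorted values 10, 20, 20, 30 yield
// dense ranks 1, 2, 2, 3; unlike RANK(), no gap is left after the tie.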
246 
247 // Computes the mapping from row position to percent rank.
248 std::vector<double> index_to_percent_rank(
249  const int64_t* index,
250  const size_t index_size,
251  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
252  std::vector<double> percent_rank(index_size);
253  size_t crt_rank = 1;
254  for (size_t i = 0; i < index_size; ++i) {
255  if (advance_current_rank(comparator, index, i)) {
256  crt_rank = i + 1;
257  }
258  percent_rank[index[i]] =
259  index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
260  }
261  return percent_rank;
262 }
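// Worked example (illustrative): PERCENT_RANK is (rank - 1) / (N - 1), so for
// N = 4 and ranks 1, 2, 2, 4 the outputs are 0, 1/3, 1/3, 1; a single-row
// partition is special-cased to 0 to avoid dividing by zero.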
263 
264 // Computes the mapping from row position to cumulative distribution.
265 std::vector<double> index_to_cume_dist(
266  const int64_t* index,
267  const size_t index_size,
268  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
269  std::vector<double> cume_dist(index_size);
270  size_t start_peer_group = 0;
271  while (start_peer_group < index_size) {
272  size_t end_peer_group = start_peer_group + 1;
273  while (end_peer_group < index_size &&
274  !advance_current_rank(comparator, index, end_peer_group)) {
275  ++end_peer_group;
276  }
277  for (size_t i = start_peer_group; i < end_peer_group; ++i) {
278  cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
279  }
280  start_peer_group = end_peer_group;
281  }
282  return cume_dist;
283 }
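// Worked example (illustrative): for sorted values 10, 20, 20, 30 the peer
// groups end at positions 1, 3, and 4, giving CUME_DIST values 0.25, 0.75,
// 0.75, 1.0 -- every member of a peer group shares the group-end fraction.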
284 
285 // Computes the mapping from row position to the n-tile statistic.
286 std::vector<int64_t> index_to_ntile(const int64_t* index,
287  const size_t index_size,
288  const size_t n) {
289  std::vector<int64_t> row_numbers(index_size);
290  if (!n) {
291  throw std::runtime_error("NTILE argument cannot be zero");
292  }
293  const size_t tile_size = (index_size + n - 1) / n;
294  for (size_t i = 0; i < index_size; ++i) {
295  row_numbers[index[i]] = i / tile_size + 1;
296  }
297  return row_numbers;
298 }
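// Worked example (illustrative): for 10 rows and n = 4, tile_size =
// ceil(10 / 4) = 3, so the tiles get 3, 3, 3, and 1 rows. Note this
// ceiling-based split differs from engines that spread the remainder across
// the leading tiles (3, 3, 2, 2).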
299 
300 // The element size in the result buffer for the given window function kind. Currently
301 // it's always 8.
302 size_t window_function_buffer_element_size(const SqlWindowFunctionKind /*kind*/) {
303  return 8;
304 }
305 
306 // Extracts the integer constant from a constant expression.
307 int64_t get_int_constant_from_expr(const Analyzer::Expr* expr) {
308  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
309  if (!lag_constant) {
310  throw std::runtime_error("LAG with non-constant lag argument not supported yet");
311  }
312  const auto& lag_ti = lag_constant->get_type_info();
313  switch (lag_ti.get_type()) {
314  case kSMALLINT: {
315  return lag_constant->get_constval().smallintval;
316  }
317  case kINT: {
318  return lag_constant->get_constval().intval;
319  }
320  case kBIGINT: {
321  return lag_constant->get_constval().bigintval;
322  }
323  default: {
324  LOG(FATAL) << "Invalid type for the lag argument";
325  }
326  }
327  return 0;
328 }
329 
330 // Gets the lag or lead argument canonicalized as lag (lag = -lead).
331 int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction* window_func) {
332  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
333  window_func->getKind() == SqlWindowFunctionKind::LEAD);
334  const auto& args = window_func->getArgs();
335  if (args.size() == 3) {
336  throw std::runtime_error("LAG with default not supported yet");
337  }
338  if (args.size() == 2) {
339  const int64_t lag_or_lead =
340  static_cast<int64_t>(get_int_constant_from_expr(args[1].get()));
341  return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
342  : -lag_or_lead;
343  }
344  CHECK_EQ(args.size(), size_t(1));
345  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
346 }
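// Worked example (illustrative): LAG(x, 2) yields +2 and LEAD(x, 2) yields
// -2, so downstream code only ever deals with a signed lag; the one-argument
// forms default to +1 (LAG) and -1 (LEAD).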
347 
348 size_t get_target_idx_for_first_or_last_value_func(
349  const Analyzer::WindowFunction* window_func,
350  const size_t partition_size) {
351  CHECK(window_func->getKind() == SqlWindowFunctionKind::FIRST_VALUE ||
352  window_func->getKind() == SqlWindowFunctionKind::LAST_VALUE);
353  return window_func->getKind() == SqlWindowFunctionKind::FIRST_VALUE
354  ? 0
355  : partition_size - 1;
356 }
357 
358 // Redistributes the original_indices according to the permutation given by
359 // output_for_partition_buff, reusing it as an output buffer.
360 void apply_permutation_to_partition(int64_t* output_for_partition_buff,
361  const int32_t* original_indices,
362  const size_t partition_size) {
363  std::vector<int64_t> new_output_for_partition_buff(partition_size);
364  for (size_t i = 0; i < partition_size; ++i) {
365  new_output_for_partition_buff[i] = original_indices[output_for_partition_buff[i]];
366  }
367  std::copy(new_output_for_partition_buff.begin(),
368  new_output_for_partition_buff.end(),
369  output_for_partition_buff);
370 }
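// Worked example (illustrative): with output_for_partition_buff = {2, 0, 1}
// (sorted order as partition-relative positions) and original_indices =
// {7, 8, 9} (global row ids), the gather produces {9, 7, 8}: slot i now holds
// the global row id of the i-th row in sorted order.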
371 
372 // Applies a lag to the given sorted_indices, reusing it as an output buffer.
373 void apply_lag_to_partition(const int64_t lag,
374  const int32_t* original_indices,
375  int64_t* sorted_indices,
376  const size_t partition_size) {
377  std::vector<int64_t> lag_sorted_indices(partition_size, -1);
378  for (int64_t idx = 0; idx < static_cast<int64_t>(partition_size); ++idx) {
379  int64_t lag_idx = idx - lag;
380  if (lag_idx < 0 || lag_idx >= static_cast<int64_t>(partition_size)) {
381  continue;
382  }
383  lag_sorted_indices[idx] = sorted_indices[lag_idx];
384  }
385  std::vector<int64_t> lag_original_indices(partition_size);
386  for (size_t k = 0; k < partition_size; ++k) {
387  const auto lag_index = lag_sorted_indices[k];
388  lag_original_indices[sorted_indices[k]] =
389  lag_index != -1 ? original_indices[lag_index] : -1;
390  }
391  std::copy(lag_original_indices.begin(), lag_original_indices.end(), sorted_indices);
392 }
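// Worked example (illustrative): for sorted_indices = {2, 0, 1},
// original_indices = {7, 8, 9} and lag = 1, the buffer becomes {9, 7, -1}:
// the row at partition position 0 sorts second, so its lag row is the one
// sorting first (partition position 2, global row 9); the row sorting first
// has no predecessor and gets the -1 sentinel.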
393 
394 void apply_nth_value_to_partition(const int32_t* original_indices,
395  int64_t* output_for_partition_buff,
396  const size_t partition_size,
397  const size_t target_pos) {
398  CHECK_LT(target_pos, partition_size);
399  const auto target_idx = original_indices[output_for_partition_buff[target_pos]];
400  std::fill(
401  output_for_partition_buff, output_for_partition_buff + partition_size, target_idx);
402 }
403 
404 void apply_original_index_to_partition(const int32_t* original_indices,
405  int64_t* output_for_partition_buff,
406  const size_t partition_size) {
407  for (size_t i = 0; i < partition_size; i++) {
408  const auto target_idx = original_indices[output_for_partition_buff[i]];
409  output_for_partition_buff[i] = target_idx;
410  }
411 }
412 
413 void index_to_partition_end(
414  const int8_t* partition_end,
415  const size_t off,
416  const int64_t* index,
417  const size_t index_size,
418  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
419  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
420  for (size_t i = 0; i < index_size; ++i) {
421  if (advance_current_rank(comparator, index, i)) {
422  agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0, 0);
423  }
424  }
425  CHECK(index_size);
426  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0, 0);
427 }
428 
429 bool pos_is_set(const int64_t bitset, const int64_t pos) {
430  return (reinterpret_cast<const int8_t*>(bitset))[pos >> 3] & (1 << (pos & 7));
431 }
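// Worked example (illustrative): pos = 11 tests byte 11 >> 3 = 1 and bit
// 11 & 7 = 3, i.e. the fourth bit of the second byte of the bitset.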
432 
433 // Write value to pending integer outputs collected for all the peer rows. The end of
434 // groups is represented by the bitset.
435 template <class T>
436 void apply_window_pending_outputs_int(const int64_t handle,
437  const int64_t value,
438  const int64_t bitset,
439  const int64_t pos) {
440  if (!pos_is_set(bitset, pos)) {
441  return;
442  }
443  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
444  for (auto pending_output_slot : pending_output_slots) {
445  *reinterpret_cast<T*>(pending_output_slot) = value;
446  }
447  pending_output_slots.clear();
448 }
449 
450 } // namespace
451 
452 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle,
453  const int64_t value,
454  const int64_t bitset,
455  const int64_t pos) {
456  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
457 }
458 
459 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle,
460  const int64_t value,
461  const int64_t bitset,
462  const int64_t pos) {
463  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
464 }
465 
466 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle,
467  const int64_t value,
468  const int64_t bitset,
469  const int64_t pos) {
470  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
471 }
472 
473 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle,
474  const int64_t value,
475  const int64_t bitset,
476  const int64_t pos) {
477  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
478 }
479 
480 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle,
481  const double value,
482  const int64_t bitset,
483  const int64_t pos) {
484  if (!pos_is_set(bitset, pos)) {
485  return;
486  }
487  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
488  for (auto pending_output_slot : pending_output_slots) {
489  *reinterpret_cast<double*>(pending_output_slot) = value;
490  }
491  pending_output_slots.clear();
492 }
493 
494 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle,
495  const float value,
496  const int64_t bitset,
497  const int64_t pos) {
498  if (!pos_is_set(bitset, pos)) {
499  return;
500  }
501  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
502  for (auto pending_output_slot : pending_output_slots) {
503  *reinterpret_cast<double*>(pending_output_slot) = value;
504  }
505  pending_output_slots.clear();
506 }
507 
508 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(
509  const int64_t handle,
510  const float value,
511  const int64_t bitset,
512  const int64_t pos) {
513  if (!pos_is_set(bitset, pos)) {
514  return;
515  }
516  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
517  for (auto pending_output_slot : pending_output_slots) {
518  *reinterpret_cast<float*>(pending_output_slot) = value;
519  }
520  pending_output_slots.clear();
521 }
522 
523 // Add a pending output slot to be written back at the end of a peer row group.
524 extern "C" RUNTIME_EXPORT void add_window_pending_output(void* pending_output,
525  const int64_t handle) {
526  reinterpret_cast<std::vector<void*>*>(handle)->push_back(pending_output);
527 }
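// Usage sketch (illustrative): generated code queues one output slot per peer
// row via add_window_pending_output(); when the partition-end bitset marks the
// end of the peer group, a single apply_window_pending_outputs_* call writes
// the shared aggregate value into every queued slot and clears the queue, so
// all peers observe the same value.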
528 
529 // Returns true iff the aggregate window function requires special multiplicity handling
530 // to ensure that peer rows have the same value for the window function.
531 bool window_function_requires_peer_handling(const Analyzer::WindowFunction* window_func) {
532  if (!window_function_is_aggregate(window_func->getKind())) {
533  return false;
534  }
535  if (window_func->getOrderKeys().empty()) {
536  return true;
537  }
538  switch (window_func->getKind()) {
539  case SqlWindowFunctionKind::MIN:
540  case SqlWindowFunctionKind::MAX: {
541  return false;
542  }
543  default: {
544  return true;
545  }
546  }
547 }
548 
549 void WindowFunctionContext::compute(
550  std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
551  std::unordered_map<QueryPlanHash, std::shared_ptr<std::vector<int64_t>>>&
552  sorted_partition_cache,
553  std::unordered_map<size_t, AggregateTreeForWindowFraming>& aggregate_tree_map) {
554  auto timer = DEBUG_TIMER(__func__);
555  CHECK(!output_);
556  if (elem_count_ == 0) {
557  return;
558  }
559  size_t output_buf_sz =
560  elem_count_ * window_function_buffer_element_size(window_func_->getKind());
561  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(output_buf_sz,
562  /*thread_idx=*/0));
563  bool const is_agg_func = window_function_is_aggregate(window_func_->getKind());
564  bool const need_window_partition_buf =
565  window_func_->hasFraming();
566  if (is_agg_func || need_window_partition_buf) {
567  fillPartitionStart();
568  if (window_function_requires_peer_handling(window_func_) ||
569  need_window_partition_buf) {
570  fillPartitionEnd();
571  }
572  }
573  std::unique_ptr<int64_t[]> scratchpad;
574  int64_t* intermediate_output_buffer;
575  if (is_agg_func || need_window_partition_buf) {
576  intermediate_output_buffer = reinterpret_cast<int64_t*>(output_);
577  } else {
578  output_buf_sz = sizeof(int64_t) * elem_count_;
579  scratchpad.reset(new int64_t[elem_count_]);
580  intermediate_output_buffer = scratchpad.get();
581  }
582  const bool should_parallelize{g_enable_parallel_window_partition_compute &&
583  elem_count_ >=
584  g_parallel_window_partition_compute_threshold};
585 
586  auto cached_sorted_partition_it =
587  sorted_partition_cache.find(sorted_partition_cache_key_);
588  if (cached_sorted_partition_it != sorted_partition_cache.end()) {
589  auto& sorted_partition = cached_sorted_partition_it->second;
590  VLOG(1) << "Reuse cached sorted partition to compute window function context (key: "
591  << sorted_partition_cache_key_
592  << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
593  << ")";
594  DEBUG_TIMER("Window Function Cached Sorted Partition Copy");
595  std::memcpy(intermediate_output_buffer, sorted_partition->data(), output_buf_sz);
596  if (need_window_partition_buf) {
597  sorted_partition_buf_ = sorted_partition;
598  }
599  } else {
600  // ordering partitions if necessary
601  const auto sort_partitions = [&](const size_t start, const size_t end) {
602  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
603  sortPartition(partition_idx,
604  intermediate_output_buffer + offsets()[partition_idx],
605  should_parallelize);
606  }
607  };
608 
609  if (should_parallelize) {
610  auto sorted_partition_copy_timer =
611  DEBUG_TIMER("Window Function Partition Sorting Parallelized");
612  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
613  [&, parent_thread_local_ids = logger::thread_local_ids()](
614  const tbb::blocked_range<int64_t>& r) {
615  logger::LocalIdsScopeGuard lisg =
616  parent_thread_local_ids.setNewThreadId();
617  sort_partitions(r.begin(), r.end());
618  });
619  } else {
620  auto sorted_partition_copy_timer =
621  DEBUG_TIMER("Window Function Partition Sorting Non-Parallelized");
622  sort_partitions(0, partitionCount());
623  }
624  auto sorted_partition_ref_cnt_it =
625  sorted_partition_key_ref_count_map.find(sorted_partition_cache_key_);
626  bool can_access_sorted_partition =
627  sorted_partition_ref_cnt_it != sorted_partition_key_ref_count_map.end() &&
628  sorted_partition_ref_cnt_it->second > 1;
629  if (can_access_sorted_partition || need_window_partition_buf) {
630  // keep the sorted partition only if it will be reused from other window function
631  // context of this query
632  sorted_partition_buf_ = std::make_shared<std::vector<int64_t>>(elem_count_);
633  DEBUG_TIMER("Window Function Sorted Partition Copy For Caching");
634  std::memcpy(
635  sorted_partition_buf_->data(), intermediate_output_buffer, output_buf_sz);
636  auto it = sorted_partition_cache.emplace(sorted_partition_cache_key_,
637  sorted_partition_buf_);
638  if (it.second) {
639  VLOG(1) << "Put sorted partition to cache (key: " << sorted_partition_cache_key_
640  << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
641  << ")";
642  }
643  }
644  }
645 
646  if (need_window_partition_buf) {
647  const auto compute_ordered_partition_null_range = [=](const size_t start,
648  const size_t end) {
649  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
650  computeNullRangeOfSortedPartition(
651  window_func_->getOrderKeys().front()->get_type_info(),
652  partition_idx,
653  payload() + offsets()[partition_idx],
654  intermediate_output_buffer + offsets()[partition_idx]);
655  }
656  };
657  auto partition_count = partitionCount();
658 
659  if (should_parallelize) {
660  auto partition_compuation_timer =
661  DEBUG_TIMER("Window Function Ordered-Partition Null-Range Compute");
662  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
663  [&, parent_thread_local_ids = logger::thread_local_ids()](
664  const tbb::blocked_range<int64_t>& r) {
665  logger::LocalIdsScopeGuard lisg =
666  parent_thread_local_ids.setNewThreadId();
667  compute_ordered_partition_null_range(r.begin(), r.end());
668  });
669  } else {
670  auto partition_compuation_timer = DEBUG_TIMER(
671  "Window Function Non-Parallelized Ordered-Partition Null-Range Compute");
672  compute_ordered_partition_null_range(0, partitionCount());
673  }
674  auto const cache_key = computeAggregateTreeCacheKey();
675  auto const c_it = aggregate_tree_map.find(cache_key);
676  if (c_it != aggregate_tree_map.cend()) {
677  VLOG(1) << "Reuse aggregate tree for window function framing";
679  aggregate_trees_ = c_it->second;
680  memcpy(aggregate_trees_depth_,
681  c_it->second.aggregate_trees_depth_,
682  sizeof(size_t) * partition_count);
683  } else {
685  const auto build_aggregation_tree_for_partitions = [=](const size_t start,
686  const size_t end) {
687  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
688  // build a segment tree for the partition
689  // todo (yoonmin) : support generic window function expression
690  // i.e., when window_func_expr_columns_.size() > 1
691  SQLTypeInfo const input_col_ti =
692  window_func_->getArgs().front()->get_type_info();
693  const auto partition_size = counts()[partition_idx];
694  buildAggregationTreeForPartition(window_func_->getKind(),
695  partition_idx,
696  partition_size,
697  payload() + offsets()[partition_idx],
698  intermediate_output_buffer,
699  input_col_ti);
700  }
701  };
703  if (should_parallelize) {
704  auto partition_compuation_timer = DEBUG_TIMER(
705  "Window Function Parallelized Segment Tree Construction for Partitions");
706  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
707  [=, parent_thread_local_ids = logger::thread_local_ids()](
708  const tbb::blocked_range<int64_t>& r) {
709  logger::LocalIdsScopeGuard lisg =
710  parent_thread_local_ids.setNewThreadId();
711  build_aggregation_tree_for_partitions(r.begin(), r.end());
712  });
713  } else {
714  auto partition_compuation_timer = DEBUG_TIMER(
715  "Window Function Non-Parallelized Segment Tree Construction for "
716  "Partitions");
717  build_aggregation_tree_for_partitions(0, partition_count);
718  }
719  }
720  CHECK(aggregate_tree_map.emplace(cache_key, aggregate_trees_).second);
721  VLOG(2) << "Put aggregate tree for the window framing";
722  }
723  }
724 
725  const auto compute_partitions = [=](const size_t start, const size_t end) {
726  for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
727  computePartitionBuffer(partition_idx,
728  intermediate_output_buffer + offsets()[partition_idx],
729  window_func_);
730  }
731  };
732 
733  if (should_parallelize) {
734  auto partition_compuation_timer = DEBUG_TIMER("Window Function Partition Compute");
735  tbb::parallel_for(tbb::blocked_range<int64_t>(0, partitionCount()),
736  [&, parent_thread_local_ids = logger::thread_local_ids()](
737  const tbb::blocked_range<int64_t>& r) {
738  logger::LocalIdsScopeGuard lisg =
739  parent_thread_local_ids.setNewThreadId();
740  compute_partitions(r.begin(), r.end());
741  });
742  } else {
743  auto partition_compuation_timer =
744  DEBUG_TIMER("Window Function Non-Parallelized Partition Compute");
745  compute_partitions(0, partitionCount());
746  }
747 
748  if (is_agg_func || need_window_partition_buf) {
749  // If window function is aggregate we were able to write to the final output buffer
750  // directly in computePartition and we are done.
751  return;
752  }
753 
754  auto output_i64 = reinterpret_cast<int64_t*>(output_);
755  const auto payload_copy = [=](const size_t start, const size_t end) {
756  for (size_t i = start; i < end; ++i) {
757  output_i64[payload()[i]] = intermediate_output_buffer[i];
758  }
759  };
760  if (should_parallelize) {
761  auto payload_copy_timer =
762  DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Parallelized");
763  tbb::parallel_for(tbb::blocked_range<int64_t>(0, elem_count_),
764  [&, parent_thread_local_ids = logger::thread_local_ids()](
765  const tbb::blocked_range<int64_t>& r) {
766  logger::LocalIdsScopeGuard lisg =
767  parent_thread_local_ids.setNewThreadId();
768  payload_copy(r.begin(), r.end());
769  });
770  } else {
771  auto payload_copy_timer =
772  DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Non-Parallelized");
773  payload_copy(0, elem_count_);
774  }
775 }
776 
777 namespace {
778 struct FindNullRange {
779  int32_t const* original_col_idx_buf;
780  int64_t const* ordered_col_idx_buf;
781  int32_t const partition_size;
782  int64_t null_bit_pattern = -1;
783 
784  template <typename T>
785  IndexPair find_null_range_int(int8_t const* order_col_buf) const {
786  IndexPair null_range{std::numeric_limits<int64_t>::max(),
787  std::numeric_limits<int64_t>::min()};
788  auto const null_val = inline_int_null_value<T>();
789  auto const casted_order_col_buf = reinterpret_cast<T const*>(order_col_buf);
790  if (casted_order_col_buf[original_col_idx_buf[ordered_col_idx_buf[0]]] == null_val) {
791  int64_t null_range_max = 1;
792  while (null_range_max < partition_size &&
793  casted_order_col_buf
794  [original_col_idx_buf[ordered_col_idx_buf[null_range_max]]] ==
795  null_val) {
796  null_range_max++;
797  }
798  null_range.first = 0;
799  null_range.second = null_range_max - 1;
800  } else if (casted_order_col_buf
801  [original_col_idx_buf[ordered_col_idx_buf[partition_size - 1]]] ==
802  null_val) {
803  int64_t null_range_min = partition_size - 2;
804  while (null_range_min >= 0 &&
805  casted_order_col_buf
806  [original_col_idx_buf[ordered_col_idx_buf[null_range_min]]] ==
807  null_val) {
808  null_range_min--;
809  }
810  null_range.first = null_range_min + 1;
811  null_range.second = partition_size - 1;
812  }
813  return null_range;
814  }
815 
816  template <typename COL_TYPE,
817  typename NULL_TYPE =
818  std::conditional_t<sizeof(COL_TYPE) == sizeof(int32_t), int32_t, int64_t>>
819  IndexPair find_null_range_fp(int8_t const* order_col_buf) const {
820  IndexPair null_range{std::numeric_limits<int64_t>::max(),
821  std::numeric_limits<int64_t>::min()};
822  auto const casted_order_col_buf = reinterpret_cast<COL_TYPE const*>(order_col_buf);
823  auto check_null_val = [&casted_order_col_buf, this](size_t idx) {
824  return *reinterpret_cast<NULL_TYPE const*>(
825  may_alias_ptr(&casted_order_col_buf
826  [original_col_idx_buf[ordered_col_idx_buf[idx]]])) ==
827  null_bit_pattern;
828  };
829  if (check_null_val(0)) {
830  int64_t null_range_max = 1;
831  while (null_range_max < partition_size && check_null_val(null_range_max)) {
832  null_range_max++;
833  }
834  null_range.first = 0;
835  null_range.second = null_range_max - 1;
836  } else if (check_null_val(partition_size - 1)) {
837  int64_t null_range_min = partition_size - 2;
838  while (null_range_min >= 0 && check_null_val(null_range_min)) {
839  null_range_min--;
840  }
841  null_range.first = null_range_min + 1;
842  null_range.second = partition_size - 1;
843  }
844  return null_range;
845  }
846 };
847 } // namespace
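// Worked example (illustrative): after an ascending NULLS FIRST sort of 8 rows
// whose first 3 order-key values are NULL, find_null_range_* returns {0, 2};
// with NULLS LAST and 2 trailing NULLs it returns {6, 7}. A partition without
// NULLs keeps the {INT64_MAX, INT64_MIN} sentinel pair.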
848 
849 void WindowFunctionContext::computeNullRangeOfSortedPartition(
850  const SQLTypeInfo& order_col_ti,
851  size_t partition_idx,
852  const int32_t* original_col_idx_buf,
853  const int64_t* ordered_col_idx_buf) {
854  IndexPair null_range;
855  const auto partition_size = counts()[partition_idx];
856  if (partition_size > 0) {
857  if (order_col_ti.is_integer() || order_col_ti.is_decimal() ||
858  order_col_ti.is_time_or_date() || order_col_ti.is_boolean()) {
859  FindNullRange const null_range_info{
860  original_col_idx_buf, ordered_col_idx_buf, partition_size};
861  switch (order_col_ti.get_size()) {
862  case 8:
863  null_range =
864  null_range_info.find_null_range_int<int64_t>(order_columns_.front());
865  break;
866  case 4:
867  null_range =
868  null_range_info.find_null_range_int<int32_t>(order_columns_.front());
869  break;
870  case 2:
871  null_range =
872  null_range_info.find_null_range_int<int16_t>(order_columns_.front());
873  break;
874  case 1:
875  null_range =
876  null_range_info.find_null_range_int<int8_t>(order_columns_.front());
877  break;
878  default:
879  LOG(FATAL) << "Invalid type size: " << order_col_ti.get_size();
880  }
881  } else if (order_col_ti.is_fp()) {
882  const auto null_bit_pattern =
883  null_val_bit_pattern(order_col_ti, order_col_ti.get_type() == kFLOAT);
884  FindNullRange const null_range_info{
885  original_col_idx_buf, ordered_col_idx_buf, partition_size, null_bit_pattern};
886  switch (order_col_ti.get_type()) {
887  case kFLOAT:
888  null_range = null_range_info.find_null_range_fp<float>(order_columns_.front());
889  break;
890  case kDOUBLE:
891  null_range = null_range_info.find_null_range_fp<double>(order_columns_.front());
892  break;
893  default:
894  LOG(FATAL) << "Invalid float type";
895  }
896  } else {
897  LOG(FATAL) << "Invalid column type for window aggregation over the frame";
898  }
899  }
900  ordered_partition_null_start_pos_[partition_idx] = null_range.first;
901  ordered_partition_null_end_pos_[partition_idx] = null_range.second + 1;
902 }
903 
904 std::vector<WindowFunctionContext::Comparator> WindowFunctionContext::createComparator(
905  size_t partition_idx) {
906  // create tuple comparator
907  std::vector<WindowFunctionContext::Comparator> partition_comparator;
908  const auto& order_keys = window_func_->getOrderKeys();
909  const auto& collation = window_func_->getCollation();
910  CHECK_EQ(order_keys.size(), collation.size());
911  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
912  ++order_column_idx) {
913  auto order_column_buffer = order_columns_[order_column_idx];
914  const auto order_col =
915  dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
916  CHECK(order_col);
917  const auto& order_col_collation = collation[order_column_idx];
918  auto comparator = makeComparator(order_col,
919  order_column_buffer,
920  payload() + offsets()[partition_idx],
921  !order_col_collation.is_desc,
922  order_col_collation.nulls_first);
923  if (order_col_collation.is_desc) {
924  comparator = [comparator](const int64_t lhs, const int64_t rhs) {
925  return comparator(rhs, lhs);
926  };
927  }
928  partition_comparator.push_back(comparator);
929  }
930  return partition_comparator;
931 }
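// Note: the per-key comparators built above are chained lexicographically by
// the callers (sortPartition / computePartitionBuffer): a later ORDER BY key
// is consulted only when every earlier key compares equal, and a descending
// key is handled by wrapping the comparator with swapped arguments.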
932 
933 void WindowFunctionContext::sortPartition(const size_t partition_idx,
934  int64_t* output_for_partition_buff,
935  bool should_parallelize) {
936  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
937  if (partition_size == 0) {
938  return;
939  }
940  std::iota(
941  output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
942  auto partition_comparator = createComparator(partition_idx);
943  if (!partition_comparator.empty()) {
944  const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
945  const int64_t rhs) {
946  for (const auto& comparator : partition_comparator) {
947  const auto comparator_result = comparator(lhs, rhs);
948  switch (comparator_result) {
949  case WindowFunctionContext::WindowComparatorResult::LT:
950  return true;
951  case WindowFunctionContext::WindowComparatorResult::GT:
952  return false;
953  default:
954  // WindowComparatorResult::EQ: continue to next comparator
955  continue;
956  }
957  }
958  // If we reach here, the comparators returned WindowComparatorResult::EQ for
959  // all keys; return false, as the sort algorithm must enforce weak ordering
960  return false;
961  };
962  if (should_parallelize) {
963 #ifdef HAVE_TBB
964  tbb::parallel_sort(output_for_partition_buff,
965  output_for_partition_buff + partition_size,
966  col_tuple_comparator);
967 #else
968  thrust::sort(output_for_partition_buff,
969  output_for_partition_buff + partition_size,
970  col_tuple_comparator);
971 #endif
972  } else {
973  std::sort(output_for_partition_buff,
974  output_for_partition_buff + partition_size,
975  col_tuple_comparator);
976  }
977  }
978 }
979 
980 const Analyzer::WindowFunction* WindowFunctionContext::getWindowFunction() const {
981  return window_func_;
982 }
983 
984 const int8_t* WindowFunctionContext::output() const {
985  return output_;
986 }
987 
988 const int64_t* WindowFunctionContext::sortedPartition() const {
989  CHECK(sorted_partition_buf_);
990  return sorted_partition_buf_->data();
991 }
992 
993 const int64_t* WindowFunctionContext::aggregateState() const {
994  CHECK(window_function_is_aggregate(window_func_->getKind()));
995  return &aggregate_state_.val;
996 }
997 
998 const int64_t* WindowFunctionContext::aggregateStateCount() const {
999  CHECK(window_function_is_aggregate(window_func_->getKind()));
1000  return &aggregate_state_.count;
1001 }
1002 
1003 const int64_t* WindowFunctionContext::partitionStartOffset() const {
1004  CHECK(partition_start_offset_);
1005  return partition_start_offset_;
1006 }
1007 
1008 const int64_t* WindowFunctionContext::partitionNumCountBuf() const {
1009  CHECK(partition_start_offset_);
1010  return partition_start_offset_ + 1;
1011 }
1012 
1013 int64_t WindowFunctionContext::aggregateStatePendingOutputs() const {
1014  CHECK(window_function_is_aggregate(window_func_->getKind()));
1015  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
1016 }
1017 
1018 const int8_t* WindowFunctionContext::partitionStart() const {
1019  return partition_start_;
1020 }
1021 
1022 const int8_t* WindowFunctionContext::partitionEnd() const {
1023  return partition_end_;
1024 }
1025 
1026 size_t WindowFunctionContext::elementCount() const {
1027  return elem_count_;
1028 }
1029 
1030 namespace {
1031 
1032 template <class T>
1033 WindowFunctionContext::WindowComparatorResult integer_comparator_asc(
1034  const int8_t* order_column_buffer,
1035  const SQLTypeInfo& ti,
1036  const int32_t* partition_indices,
1037  const int64_t lhs,
1038  const int64_t rhs,
1039  const bool asc_ordering,
1040  const bool nulls_first) {
1041  const auto values = reinterpret_cast<const T*>(order_column_buffer);
1042  const auto lhs_val = values[partition_indices[lhs]];
1043  const auto rhs_val = values[partition_indices[rhs]];
1044  const auto null_val = inline_fixed_encoding_null_val(ti);
1045  if (lhs_val == null_val && rhs_val == null_val) {
1046  return WindowFunctionContext::WindowComparatorResult::EQ;
1047  }
1048  if (lhs_val == null_val && rhs_val != null_val) {
1049  return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
1050  : WindowFunctionContext::WindowComparatorResult::GT;
1051  }
1052  if (rhs_val == null_val && lhs_val != null_val) {
1053  return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
1054  : WindowFunctionContext::WindowComparatorResult::LT;
1055  }
1056  if (lhs_val < rhs_val) {
1057  return WindowFunctionContext::WindowComparatorResult::LT;
1058  }
1059  if (lhs_val > rhs_val) {
1060  return WindowFunctionContext::WindowComparatorResult::GT;
1061  }
1062  return WindowFunctionContext::WindowComparatorResult::EQ;
1063 }
1064 
1065 template <class T>
1066 WindowFunctionContext::WindowComparatorResult integer_comparator_desc(
1067  const int8_t* order_column_buffer,
1068  const SQLTypeInfo& ti,
1069  const int32_t* partition_indices,
1070  const int64_t lhs,
1071  const int64_t rhs,
1072  const bool asc_ordering,
1073  const bool nulls_first) {
1074  const auto values = reinterpret_cast<const T*>(order_column_buffer);
1075  const auto lhs_val = values[partition_indices[lhs]];
1076  const auto rhs_val = values[partition_indices[rhs]];
1077  const auto null_val = inline_fixed_encoding_null_val(ti);
1078  if (lhs_val == null_val && rhs_val == null_val) {
1079  return WindowFunctionContext::WindowComparatorResult::EQ;
1080  }
1081  if (lhs_val == null_val && rhs_val != null_val) {
1082  return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
1083  : WindowFunctionContext::WindowComparatorResult::LT;
1084  }
1085  if (rhs_val == null_val && lhs_val != null_val) {
1086  return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
1087  : WindowFunctionContext::WindowComparatorResult::GT;
1088  }
1089  if (lhs_val < rhs_val) {
1090  return WindowFunctionContext::WindowComparatorResult::LT;
1091  }
1092  if (lhs_val > rhs_val) {
1093  return WindowFunctionContext::WindowComparatorResult::GT;
1094  }
1095  return WindowFunctionContext::WindowComparatorResult::EQ;
1096 }
1097 
1098 template <class T, class NullPatternType>
1099 WindowFunctionContext::WindowComparatorResult fp_comparator_asc(
1100  const int8_t* order_column_buffer,
1101  const SQLTypeInfo& ti,
1102  const int32_t* partition_indices,
1103  const int64_t lhs,
1104  const int64_t rhs,
1105  const bool asc_ordering,
1106  const bool nulls_first) {
1107  const auto values = reinterpret_cast<const T*>(order_column_buffer);
1108  const auto lhs_val = values[partition_indices[lhs]];
1109  const auto rhs_val = values[partition_indices[rhs]];
1110  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
1111  const auto lhs_bit_pattern =
1112  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
1113  const auto rhs_bit_pattern =
1114  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
1115  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
1116  return WindowFunctionContext::WindowComparatorResult::EQ;
1117  }
1118  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
1119  return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
1120  : WindowFunctionContext::WindowComparatorResult::GT;
1121  }
1122  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
1123  return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
1124  : WindowFunctionContext::WindowComparatorResult::LT;
1125  }
1126  if (lhs_val < rhs_val) {
1127  return WindowFunctionContext::WindowComparatorResult::LT;
1128  }
1129  if (lhs_val > rhs_val) {
1130  return WindowFunctionContext::WindowComparatorResult::GT;
1131  }
1132  return WindowFunctionContext::WindowComparatorResult::EQ;
1133 }
1134 
1135 template <class T, class NullPatternType>
1136 WindowFunctionContext::WindowComparatorResult fp_comparator_desc(
1137  const int8_t* order_column_buffer,
1138  const SQLTypeInfo& ti,
1139  const int32_t* partition_indices,
1140  const int64_t lhs,
1141  const int64_t rhs,
1142  const bool asc_ordering,
1143  const bool nulls_first) {
1144  const auto values = reinterpret_cast<const T*>(order_column_buffer);
1145  const auto lhs_val = values[partition_indices[lhs]];
1146  const auto rhs_val = values[partition_indices[rhs]];
1147  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
1148  const auto lhs_bit_pattern =
1149  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
1150  const auto rhs_bit_pattern =
1151  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
1152  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
1153  return WindowFunctionContext::WindowComparatorResult::EQ;
1154  }
1155  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
1156  return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
1157  : WindowFunctionContext::WindowComparatorResult::LT;
1158  }
1159  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
1160  return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
1161  : WindowFunctionContext::WindowComparatorResult::GT;
1162  }
1163  if (lhs_val < rhs_val) {
1164  return WindowFunctionContext::WindowComparatorResult::LT;
1165  }
1166  if (lhs_val > rhs_val) {
1167  return WindowFunctionContext::WindowComparatorResult::GT;
1168  }
1169  return WindowFunctionContext::WindowComparatorResult::EQ;
1170 }
1171 
1172 } // namespace
1173 
1174 WindowFunctionContext::Comparator WindowFunctionContext::makeComparator(
1175  const Analyzer::ColumnVar* col_var,
1176  const int8_t* order_column_buffer,
1177  const int32_t* partition_indices,
1178  const bool asc_ordering,
1179  const bool nulls_first) {
1180  const auto& ti = col_var->get_type_info();
1181  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
1182  switch (ti.get_size()) {
1183  case 8: {
1184  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1185  const int64_t lhs, const int64_t rhs) {
1186  return asc_ordering ? integer_comparator_asc<int64_t>(order_column_buffer,
1187  ti,
1188  partition_indices,
1189  lhs,
1190  rhs,
1191  asc_ordering,
1192  nulls_first)
1193  : integer_comparator_desc<int64_t>(order_column_buffer,
1194  ti,
1195  partition_indices,
1196  lhs,
1197  rhs,
1198  asc_ordering,
1199  nulls_first);
1200  };
1201  }
1202  case 4: {
1203  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1204  const int64_t lhs, const int64_t rhs) {
1205  return asc_ordering ? integer_comparator_asc<int32_t>(order_column_buffer,
1206  ti,
1207  partition_indices,
1208  lhs,
1209  rhs,
1210  asc_ordering,
1211  nulls_first)
1212  : integer_comparator_desc<int32_t>(order_column_buffer,
1213  ti,
1214  partition_indices,
1215  lhs,
1216  rhs,
1217  asc_ordering,
1218  nulls_first);
1219  };
1220  }
1221  case 2: {
1222  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1223  const int64_t lhs, const int64_t rhs) {
1224  return asc_ordering ? integer_comparator_asc<int16_t>(order_column_buffer,
1225  ti,
1226  partition_indices,
1227  lhs,
1228  rhs,
1229  asc_ordering,
1230  nulls_first)
1231  : integer_comparator_desc<int16_t>(order_column_buffer,
1232  ti,
1233  partition_indices,
1234  lhs,
1235  rhs,
1236  asc_ordering,
1237  nulls_first);
1238  };
1239  }
1240  case 1: {
1241  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1242  const int64_t lhs, const int64_t rhs) {
1243  return asc_ordering ? integer_comparator_asc<int8_t>(order_column_buffer,
1244  ti,
1245  partition_indices,
1246  lhs,
1247  rhs,
1248  asc_ordering,
1249  nulls_first)
1250  : integer_comparator_desc<int8_t>(order_column_buffer,
1251  ti,
1252  partition_indices,
1253  lhs,
1254  rhs,
1255  asc_ordering,
1256  nulls_first);
1257  };
1258  }
1259  default: {
1260  LOG(FATAL) << "Invalid type size: " << ti.get_size();
1261  }
1262  }
1263  }
1264  if (ti.is_fp()) {
1265  switch (ti.get_type()) {
1266  case kFLOAT: {
1267  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1268  const int64_t lhs, const int64_t rhs) {
1269  return asc_ordering ? fp_comparator_asc<float, int32_t>(order_column_buffer,
1270  ti,
1271  partition_indices,
1272  lhs,
1273  rhs,
1274  asc_ordering,
1275  nulls_first)
1276  : fp_comparator_desc<float, int32_t>(order_column_buffer,
1277  ti,
1278  partition_indices,
1279  lhs,
1280  rhs,
1281  asc_ordering,
1282  nulls_first);
1283  };
1284  }
1285  case kDOUBLE: {
1286  return [order_column_buffer, nulls_first, partition_indices, asc_ordering, &ti](
1287  const int64_t lhs, const int64_t rhs) {
1288  return asc_ordering ? fp_comparator_asc<double, int64_t>(order_column_buffer,
1289  ti,
1290  partition_indices,
1291  lhs,
1292  rhs,
1293  asc_ordering,
1294  nulls_first)
1295  : fp_comparator_desc<double, int64_t>(order_column_buffer,
1296  ti,
1297  partition_indices,
1298  lhs,
1299  rhs,
1300  asc_ordering,
1301  nulls_first);
1302  };
1303  }
1304  default: {
1305  LOG(FATAL) << "Invalid float type";
1306  }
1307  }
1308  }
1309  throw std::runtime_error("Type not supported yet");
1310 }
1311 
1312 void WindowFunctionContext::computePartitionBuffer(
1313  const size_t partition_idx,
1314  int64_t* output_for_partition_buff,
1315  const Analyzer::WindowFunction* window_func) {
1316  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
1317  if (partition_size == 0) {
1318  return;
1319  }
1320  const auto offset = offsets()[partition_idx];
1321  auto partition_comparator = createComparator(partition_idx);
1322  const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
1323  const int64_t rhs) {
1324  for (const auto& comparator : partition_comparator) {
1325  const auto comparator_result = comparator(lhs, rhs);
1326  switch (comparator_result) {
1327  case WindowFunctionContext::WindowComparatorResult::LT:
1328  return true;
1329  case WindowFunctionContext::WindowComparatorResult::GT:
1330  return false;
1331  default:
1332  // WindowComparatorResult::EQ: continue to next comparator
1333  continue;
1334  }
1335  }
1336  // If we reach here, the comparators returned WindowComparatorResult::EQ for
1337  // all keys; return false, as the sort algorithm must enforce weak ordering
1338  return false;
1339  };
1340  switch (window_func->getKind()) {
1341  case SqlWindowFunctionKind::ROW_NUMBER: {
1342  const auto row_numbers =
1343  index_to_row_number(output_for_partition_buff, partition_size);
1344  std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
1345  break;
1346  }
1347  case SqlWindowFunctionKind::RANK: {
1348  const auto rank =
1349  index_to_rank(output_for_partition_buff, partition_size, col_tuple_comparator);
1350  std::copy(rank.begin(), rank.end(), output_for_partition_buff);
1351  break;
1352  }
1353  case SqlWindowFunctionKind::DENSE_RANK: {
1354  const auto dense_rank = index_to_dense_rank(
1355  output_for_partition_buff, partition_size, col_tuple_comparator);
1356  std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
1357  break;
1358  }
1359  case SqlWindowFunctionKind::PERCENT_RANK: {
1360  const auto percent_rank = index_to_percent_rank(
1361  output_for_partition_buff, partition_size, col_tuple_comparator);
1362  std::copy(percent_rank.begin(),
1363  percent_rank.end(),
1364  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
1365  break;
1366  }
1367  case SqlWindowFunctionKind::CUME_DIST: {
1368  const auto cume_dist = index_to_cume_dist(
1369  output_for_partition_buff, partition_size, col_tuple_comparator);
1370  std::copy(cume_dist.begin(),
1371  cume_dist.end(),
1372  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
1373  break;
1374  }
1375  case SqlWindowFunctionKind::NTILE: {
1376  const auto& args = window_func->getArgs();
1377  CHECK_EQ(args.size(), size_t(1));
1378  const auto n = get_int_constant_from_expr(args.front().get());
1379  const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
1380  std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
1381  break;
1382  }
1383  case SqlWindowFunctionKind::LAG:
1384  case SqlWindowFunctionKind::LEAD: {
1385  const auto lag_or_lead = get_lag_or_lead_argument(window_func);
1386  const auto partition_row_offsets = payload() + offset;
1387  apply_lag_to_partition(
1388  lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
1389  break;
1390  }
1391  case SqlWindowFunctionKind::FIRST_VALUE:
1392  case SqlWindowFunctionKind::LAST_VALUE: {
1393  const auto target_idx =
1394  get_target_idx_for_first_or_last_value_func(window_func, partition_size);
1395  const auto partition_row_offsets = payload() + offset;
1396  apply_nth_value_to_partition(
1397  partition_row_offsets, output_for_partition_buff, partition_size, target_idx);
1398  break;
1399  }
1400  case SqlWindowFunctionKind::NTH_VALUE: {
1401  auto const n_value_ptr =
1402  dynamic_cast<Analyzer::Constant*>(window_func_->getArgs()[1].get());
1403  CHECK(n_value_ptr);
1404  auto const n_value = static_cast<size_t>(n_value_ptr->get_constval().intval);
1405  const auto partition_row_offsets = payload() + offset;
1406  if (n_value < partition_size) {
1407  apply_nth_value_to_partition(
1408  partition_row_offsets, output_for_partition_buff, partition_size, n_value);
1409  } else {
1410  // when NTH_VALUE of the current row is NULL, we keep the NULL value in the
1411  // current row's output storage in the query output buffer, so we assign the
1412  // original index of the current row to the corresponding slot in
1413  // `output_for_partition_buff`
1414  apply_original_index_to_partition(
1415  partition_row_offsets, output_for_partition_buff, partition_size);
1416  }
1417  break;
1418  }
1419  case SqlWindowFunctionKind::AVG:
1420  case SqlWindowFunctionKind::MIN:
1421  case SqlWindowFunctionKind::MAX:
1422  case SqlWindowFunctionKind::SUM:
1423  case SqlWindowFunctionKind::COUNT: {
1434  const auto partition_row_offsets = payload() + offset;
1435  if (window_function_requires_peer_handling(window_func)) {
1436  index_to_partition_end(partitionEnd(),
1437  offset,
1438  output_for_partition_buff,
1439  partition_size,
1440  col_tuple_comparator);
1441  }
1442  apply_permutation_to_partition(
1443  output_for_partition_buff, partition_row_offsets, partition_size);
1444  break;
1445  }
1446  default: {
1447  throw std::runtime_error("Window function not supported yet: " +
1448  ::toString(window_func->getKind()));
1449  }
1450  }
1451 }
1452 
1453 void WindowFunctionContext::resizeStorageForWindowFraming(bool const for_reuse) {
1454  auto const partition_count = partitionCount();
1456  if (!for_reuse) {
1457  segment_trees_owned_.resize(partition_count);
1458  }
1459 }
1460 
1461 namespace {
1463  switch (kind) {
1465  return true;
1466  default:
1467  return false;
1468  }
1469 }
1470 } // namespace
1471 
1472 void WindowFunctionContext::buildAggregationTreeForPartition(
1473  SqlWindowFunctionKind agg_type,
1474  size_t partition_idx,
1475  size_t partition_size,
1476  const int32_t* original_rowid_buf,
1477  const int64_t* ordered_rowid_buf,
1478  const SQLTypeInfo& input_col_ti) {
1479  if (!(input_col_ti.is_number() || input_col_ti.is_boolean() ||
1480  input_col_ti.is_time_or_date())) {
1481  throw QueryNotSupported("Window aggregate function over frame on a column type " +
1482  ::toString(input_col_ti.get_type()) + " is not supported.");
1483  }
1484  if (input_col_ti.is_time_or_date() &&
1486  !(agg_type == SqlWindowFunctionKind::MIN ||
1487  agg_type == SqlWindowFunctionKind::MAX ||
1488  agg_type == SqlWindowFunctionKind::COUNT)) {
1489  throw QueryNotSupported(
1490  "Aggregation over a window frame for a column type " +
1491  ::toString(input_col_ti.get_type()) +
1492  " must use one of the following window aggregate function: MIN / MAX / COUNT");
1493  }
1494  const auto type = input_col_ti.is_decimal() ? decimal_to_int_type(input_col_ti)
1495  : input_col_ti.is_time_or_date()
1496  ? get_int_type_by_size(input_col_ti.get_size())
1497  : input_col_ti.get_type();
1498  if (partition_size > 0) {
1499  IndexPair order_col_null_range{ordered_partition_null_start_pos_[partition_idx],
1500  ordered_partition_null_end_pos_[partition_idx]};
1501  const int64_t* ordered_rowid_buf_for_partition =
1502  ordered_rowid_buf + offsets()[partition_idx];
1503  switch (type) {
1504  case kBOOLEAN:
1505  case kTINYINT: {
1506  const auto segment_tree = std::make_shared<SegmentTree<int8_t, int64_t>>(
1507  window_func_expr_columns_,
1508  input_col_ti,
1509  original_rowid_buf,
1510  ordered_rowid_buf_for_partition,
1511  partition_size,
1512  agg_type,
1513  aggregate_trees_fan_out_);
1514  aggregate_trees_depth_[partition_idx] =
1515  segment_tree ? segment_tree->getLeafDepth() : 0;
1516  if (agg_type == SqlWindowFunctionKind::AVG) {
1517  aggregate_trees_.derived_aggregate_tree_for_integer_type_[partition_idx] =
1518  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1519  } else {
1520  aggregate_trees_.aggregate_tree_for_integer_type_[partition_idx] =
1521  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1522  }
1523  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1524  break;
1525  }
1526  case kSMALLINT: {
1527  const auto segment_tree = std::make_shared<SegmentTree<int16_t, int64_t>>(
1528  window_func_expr_columns_,
1529  input_col_ti,
1530  original_rowid_buf,
1531  ordered_rowid_buf_for_partition,
1532  partition_size,
1533  agg_type,
1534  aggregate_trees_fan_out_);
1535  aggregate_trees_depth_[partition_idx] =
1536  segment_tree ? segment_tree->getLeafDepth() : 0;
1537  if (agg_type == SqlWindowFunctionKind::AVG) {
1538  aggregate_trees_.derived_aggregate_tree_for_integer_type_[partition_idx] =
1539  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1540  } else {
1541  aggregate_trees_.aggregate_tree_for_integer_type_[partition_idx] =
1542  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1543  }
1544  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1545  break;
1546  }
1547  case kINT: {
1548  const auto segment_tree = std::make_shared<SegmentTree<int32_t, int64_t>>(
1549  window_func_expr_columns_,
1550  input_col_ti,
1551  original_rowid_buf,
1552  ordered_rowid_buf_for_partition,
1553  partition_size,
1554  agg_type,
1555  aggregate_trees_fan_out_);
1556  aggregate_trees_depth_[partition_idx] =
1557  segment_tree ? segment_tree->getLeafDepth() : 0;
1558  if (agg_type == SqlWindowFunctionKind::AVG) {
1559  aggregate_trees_.derived_aggregate_tree_for_integer_type_[partition_idx] =
1560  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1561  } else {
1562  aggregate_trees_.aggregate_tree_for_integer_type_[partition_idx] =
1563  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1564  }
1565  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1566  break;
1567  }
1568  case kDECIMAL:
1569  case kNUMERIC:
1570  case kBIGINT: {
1571  const auto segment_tree = std::make_shared<SegmentTree<int64_t, int64_t>>(
1572  window_func_expr_columns_,
1573  input_col_ti,
1574  original_rowid_buf,
1575  ordered_rowid_buf_for_partition,
1576  partition_size,
1577  agg_type,
1578  aggregate_trees_fan_out_);
1579  aggregate_trees_depth_[partition_idx] =
1580  segment_tree ? segment_tree->getLeafDepth() : 0;
1581  if (agg_type == SqlWindowFunctionKind::AVG) {
1582  aggregate_trees_.derived_aggregate_tree_for_integer_type_[partition_idx] =
1583  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1584  } else {
1585  aggregate_trees_.aggregate_tree_for_integer_type_[partition_idx] =
1586  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1587  }
1588  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1589  break;
1590  }
1591  case kFLOAT: {
1592  const auto segment_tree =
1593  std::make_shared<SegmentTree<float, double>>(window_func_expr_columns_,
1594  input_col_ti,
1595  original_rowid_buf,
1596  ordered_rowid_buf_for_partition,
1597  partition_size,
1598  agg_type,
1599  aggregate_trees_fan_out_);
1600  aggregate_trees_depth_[partition_idx] =
1601  segment_tree ? segment_tree->getLeafDepth() : 0;
1602  if (agg_type == SqlWindowFunctionKind::AVG) {
1603  aggregate_trees_.derived_aggregate_tree_for_double_type_[partition_idx] =
1604  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1605  } else {
1606  aggregate_trees_.aggregate_tree_for_double_type_[partition_idx] =
1607  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1608  }
1609  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1610  break;
1611  }
1612  case kDOUBLE: {
1613  const auto segment_tree =
1614  std::make_shared<SegmentTree<double, double>>(window_func_expr_columns_,
1615  input_col_ti,
1616  original_rowid_buf,
1617  ordered_rowid_buf_for_partition,
1618  partition_size,
1619  agg_type,
1620  aggregate_trees_fan_out_);
1621  aggregate_trees_depth_[partition_idx] =
1622  segment_tree ? segment_tree->getLeafDepth() : 0;
1623  if (agg_type == SqlWindowFunctionKind::AVG) {
1624  aggregate_trees_.derived_aggregate_tree_for_double_type_[partition_idx] =
1625  segment_tree ? segment_tree->getDerivedAggregatedValues() : nullptr;
1626  } else {
1627  aggregate_trees_.aggregate_tree_for_double_type_[partition_idx] =
1628  segment_tree ? segment_tree->getAggregatedValues() : nullptr;
1629  }
1630  segment_trees_owned_[partition_idx] = std::move(segment_tree);
1631  break;
1632  }
1633  default:
1634  UNREACHABLE();
1635  }
1636  } else {
1637  // handling a case of an empty partition
1638  aggregate_trees_depth_[partition_idx] = 0;
1639  if (input_col_ti.is_integer() || input_col_ti.is_decimal() ||
1640  input_col_ti.is_boolean() || input_col_ti.is_time_or_date()) {
1641  if (agg_type == SqlWindowFunctionKind::AVG) {
1642  aggregate_trees_.derived_aggregate_tree_for_integer_type_[partition_idx] =
1643  nullptr;
1644  } else {
1645  aggregate_trees_.aggregate_tree_for_integer_type_[partition_idx] = nullptr;
1646  }
1647  } else {
1648  CHECK(input_col_ti.is_fp());
1649  if (agg_type == SqlWindowFunctionKind::AVG) {
1650  aggregate_trees_.derived_aggregate_tree_for_double_type_[partition_idx] = nullptr;
1651  } else {
1652  aggregate_trees_.aggregate_tree_for_double_type_[partition_idx] = nullptr;
1653  }
1654  }
1655  }
1657 }
1658 
1659 int64_t** WindowFunctionContext::getAggregationTreesForIntegerTypeWindowExpr() const {
1660  return const_cast<int64_t**>(aggregate_trees_.aggregate_tree_for_integer_type_.data());
1661 }
1662 
1663 double** WindowFunctionContext::getAggregationTreesForDoubleTypeWindowExpr() const {
1664  return const_cast<double**>(aggregate_trees_.aggregate_tree_for_double_type_.data());
1665 }
1666 
1667 SumAndCountPair<int64_t>**
1668 WindowFunctionContext::getDerivedAggregationTreesForIntegerTypeWindowExpr() const {
1669  return const_cast<SumAndCountPair<int64_t>**>(
1670  aggregate_trees_.derived_aggregate_tree_for_integer_type_.data());
1671 }
1672 
1673 SumAndCountPair<double>**
1674 WindowFunctionContext::getDerivedAggregationTreesForDoubleTypeWindowExpr() const {
1675  return const_cast<SumAndCountPair<double>**>(
1676  aggregate_trees_.derived_aggregate_tree_for_double_type_.data());
1677 }
1678 
1679 size_t* WindowFunctionContext::getAggregateTreeDepth() const {
1680  return aggregate_trees_depth_;
1681 }
1682 
1683 size_t WindowFunctionContext::getAggregateTreeFanout() const {
1684  return aggregate_trees_fan_out_;
1685 }
1686 
1687 int64_t* WindowFunctionContext::getNullValueStartPos() const {
1688  return ordered_partition_null_start_pos_;
1689 }
1690 
1691 int64_t* WindowFunctionContext::getNullValueEndPos() const {
1692  return ordered_partition_null_end_pos_;
1693 }
1694 
1695 void WindowFunctionContext::fillPartitionStart() {
1696  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
1697  0,
1698  0,
1699  static_cast<int64_t>(elem_count_),
1700  false,
1701  ExecutorDeviceType::CPU,
1702  1};
1703  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
1704  if (partitions_) {
1705  bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
1706  }
1707  partition_start_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
1708  int64_t partition_count = partitionCount();
1709  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
1710  agg_count_distinct_bitmap(&partition_start_handle, 0, 0, 0);
1711  if (partition_start_offset_) {
1712  // if we have `partition_start_offset_`, we can reuse it for this logic
1713  // but note that it has partition_count + 1 elements where the first element is zero
1714  // which means the first partition's start offset is zero
1715  // and rest of them can represent values required for this logic
1716  for (int64_t i = 0; i < partition_count - 1; ++i) {
1717  agg_count_distinct_bitmap(
1718  &partition_start_handle, partition_start_offset_[i + 1], 0, 0);
1719  }
1720  } else {
1721  std::vector<size_t> partition_offsets(partition_count);
1722  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
1723  for (int64_t i = 0; i < partition_count - 1; ++i) {
1724  agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0, 0);
1725  }
1726  }
1727 }
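// Worked example (illustrative): with partition counts {3, 2, 4} over 9 rows,
// the start offsets after the first are 3 and 5, so bits 0, 3, and 5 are set:
// one bit for the first row of each partition.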
1728 
1729 void WindowFunctionContext::fillPartitionEnd() {
1730  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
1731  0,
1732  0,
1733  static_cast<int64_t>(elem_count_),
1734  false,
1735  ExecutorDeviceType::CPU,
1736  1};
1737  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
1738  if (partitions_) {
1739  bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
1740  }
1741  partition_end_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
1742  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
1743  int64_t partition_count = partitionCount();
1744  if (partition_start_offset_) {
1745  // if we have `partition_start_offset_`, we can reuse it for this logic
1746  // but note that it has partition_count + 1 elements where the first element is zero
1747  // which means the first partition's start offset is zero
1748  // and rest of them can represent values required for this logic
1749  for (int64_t i = 0; i < partition_count - 1; ++i) {
1750  if (partition_start_offset_[i + 1] == 0) {
1751  continue;
1752  }
1753  agg_count_distinct_bitmap(
1754  &partition_end_handle, partition_start_offset_[i + 1] - 1, 0, 0);
1755  }
1756  if (elem_count_) {
1757  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0, 0);
1758  }
1759  } else {
1760  std::vector<size_t> partition_offsets(partition_count);
1761  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
1762  for (int64_t i = 0; i < partition_count - 1; ++i) {
1763  if (partition_offsets[i] == 0) {
1764  continue;
1765  }
1766  agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0, 0);
1767  }
1768  if (elem_count_) {
1769  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0, 0);
1770  }
1771  }
1772 }
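// Worked example (illustrative): for the same counts {3, 2, 4} over 9 rows,
// bits 2, 4, and 8 are set -- the last row of each partition -- with the final
// bit coming from the elem_count_ - 1 case above.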
1773 
1774 const int32_t* WindowFunctionContext::payload() const {
1775  if (partitions_) {
1776  return reinterpret_cast<const int32_t*>(
1777  partitions_->getJoinHashBuffer(device_type_, 0) +
1778  partitions_->payloadBufferOff());
1779  }
1780  return dummy_payload_; // non-partitioned window function
1781 }
1782 
1783 const int32_t* WindowFunctionContext::offsets() const {
1784  if (partitions_) {
1785  return reinterpret_cast<const int32_t*>(
1786  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
1787  }
1788  return &dummy_offset_;
1789 }
1790 
1791 const int32_t* WindowFunctionContext::counts() const {
1792  if (partitions_) {
1793  return reinterpret_cast<const int32_t*>(
1794  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
1795  }
1796  return &dummy_count_;
1797 }
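// payload(), offsets(), and counts() expose a CSR-like layout taken from the
// one-to-many hash join buffer: offsets()[p] is the index of partition p's first
// row within payload(), counts()[p] is its row count, and payload() holds the
// original row ids grouped by partition. In the non-partitioned case the dummy
// members describe a single partition covering all elem_count_ rows.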
1798 
1799 size_t WindowFunctionContext::partitionCount() const {
1800  if (partitions_) {
1801  const auto partition_count = counts() - offsets();
1802  CHECK_GE(partition_count, 0);
1803  return partition_count;
1804  }
1805  return 1; // non-partitioned window function
1806 }
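// partitionCount() leans on the buffer layout above: the counts buffer starts
// immediately after the offsets buffer, one int32 slot per partition, so the
// pointer difference counts() - offsets() is exactly the number of partitions.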
1807 
1808 const bool WindowFunctionContext::needsToBuildAggregateTree() const {
1809  return window_func_->hasFraming() &&
1810  window_func_->hasAggregateTreeRequiredWindowFunc() && elem_count_ > 0;
1811 }
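// An aggregate tree is built only when all three conditions hold: the window
// function declares a frame, its kind requires a tree (framed aggregates such as
// SUM, AVG, MIN, MAX, or COUNT), and the input is non-empty.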
1812 
1813 const QueryPlanHash WindowFunctionContext::computeAggregateTreeCacheKey() const {
1814  // An aggregate tree is keyed by window aggregate function kind, input
1815  // expression, partition key(s), and ordering key. Two window definitions that
1816  // agree on all of these but differ only in their frame-bound declarations can
1817  // therefore share the same aggregate tree.
1819  auto cache_key = boost::hash_value(::toString(window_func_->getKind()));
1820  boost::hash_combine(cache_key, ::toString(window_func_->getArgs()));
1821  boost::hash_combine(cache_key, ::toString(window_func_->getPartitionKeys()));
1822  boost::hash_combine(cache_key, ::toString(window_func_->getOrderKeys()));
1823  for (auto& order_entry : window_func_->getCollation()) {
1824  boost::hash_combine(cache_key, order_entry.toString());
1825  }
1826  return cache_key;
1827 }
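// Example: SUM(x) OVER (PARTITION BY k ORDER BY t ROWS BETWEEN 1 PRECEDING AND
// CURRENT ROW) and SUM(x) OVER (PARTITION BY k ORDER BY t ROWS BETWEEN 2
// PRECEDING AND CURRENT ROW) hash to the same key, since only the frame bounds
// differ, and can share one cached aggregate tree.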
1828 
1829 void WindowProjectNodeContext::addWindowFunctionContext(
1830  std::unique_ptr<WindowFunctionContext> window_function_context,
1831  const size_t target_index) {
1832  const auto it_ok = window_contexts_.emplace(
1833  std::make_pair(target_index, std::move(window_function_context)));
1834  CHECK(it_ok.second);
1835 }
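// One WindowFunctionContext is registered per window-function target expression;
// the CHECK(it_ok.second) above guards against registering a target index twice.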
1836 
1837 const WindowFunctionContext* WindowProjectNodeContext::activateWindowFunctionContext(
1838  Executor* executor,
1839  const size_t target_index) const {
1840  const auto it = window_contexts_.find(target_index);
1841  CHECK(it != window_contexts_.end());
1842  executor->active_window_function_ = it->second.get();
1843  return executor->active_window_function_;
1844 }
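// Activation publishes the selected context through the executor so later stages
// (e.g., window function code generation) can fetch it via
// getActiveWindowFunctionContext() instead of threading it through every call.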
1845 
1846 void WindowProjectNodeContext::resetWindowFunctionContext(Executor* executor) {
1847  executor->active_window_function_ = nullptr;
1848 }
1849 
1850 WindowFunctionContext* WindowProjectNodeContext::getActiveWindowFunctionContext(
1851  Executor* executor) {
1852  return executor->active_window_function_;
1853 }
1854 
1855 WindowProjectNodeContext* WindowProjectNodeContext::create(Executor* executor) {
1856  executor->window_project_node_context_owned_ =
1857  std::make_unique<WindowProjectNodeContext>();
1858  return executor->window_project_node_context_owned_.get();
1859 }
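// create() replaces any previously owned context; the unique_ptr assignment
// releases the old WindowProjectNodeContext together with every
// WindowFunctionContext it owns.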
1860 
1861 const WindowProjectNodeContext* WindowProjectNodeContext::get(Executor* executor) {
1862  return executor->window_project_node_context_owned_.get();
1863 }
1864 
1865 void WindowProjectNodeContext::reset(Executor* executor) {
1866  executor->window_project_node_context_owned_ = nullptr;
1867  executor->active_window_function_ = nullptr;
1868 }
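// Illustrative lifecycle, assuming hypothetical call sites (not part of this
// file):
//
//   auto* ctx = WindowProjectNodeContext::create(executor); // fresh owned context
//   ctx->addWindowFunctionContext(std::move(wfc), /*target_index=*/0);
//   WindowProjectNodeContext::get(executor)
//       ->activateWindowFunctionContext(executor, /*target_index=*/0);
//   // ... evaluate the window function via the active context ...
//   WindowProjectNodeContext::reset(executor); // drop context and active function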