WindowContext.cpp
/*
 * Copyright 2018 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/WindowContext.h"

#include <numeric>

#include "QueryEngine/Descriptors/CountDistinctDescriptor.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/OutputBufferInitialization.h"
#include "QueryEngine/ResultSetBufferAccessors.h"
#include "QueryEngine/RuntimeFunctions.h"
#include "QueryEngine/TypePunning.h"
#include "Shared/Intervals.h"
#include "Shared/checked_alloc.h"
#include "Shared/funcannotations.h"
#include "Shared/threading.h"

#ifdef HAVE_TBB
#include <tbb/parallel_sort.h>
#else
#include <thrust/sort.h>
#endif

bool g_enable_parallel_window_partition_compute{true};
size_t g_parallel_window_partition_compute_threshold{1 << 12};  // 4096

bool g_enable_parallel_window_partition_sort{true};
size_t g_parallel_window_partition_sort_threshold{1 << 10};  // 1024

// Non-partitioned version (no join table provided)
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
    : window_func_(window_func)
    , partitions_(nullptr)
    , elem_count_(elem_count)
    , output_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
  CHECK_LE(elem_count_, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
  if (elem_count_ > 0) {
    dummy_payload_ =
        reinterpret_cast<int32_t*>(checked_malloc(elem_count_ * sizeof(int32_t)));
    std::iota(dummy_payload_, dummy_payload_ + elem_count_, int32_t(0));
  }
}
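
// Without a partition hash table, dummy_count_, dummy_offset_ and dummy_payload_
// stand in for the count, offset and payload buffers of a single partition that
// spans every input row; dummy_payload_ is the identity row mapping filled above.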

// Partitioned version
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    const std::shared_ptr<HashJoin>& partitions,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
    : window_func_(window_func)
    , partitions_(partitions)
    , elem_count_(elem_count)
    , output_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
  CHECK(partitions_);  // this version requires a partition hash table
}

WindowFunctionContext::~WindowFunctionContext() {
  free(partition_start_);
  free(partition_end_);
  if (dummy_payload_) {
    free(dummy_payload_);
  }
}

void WindowFunctionContext::addOrderColumn(
    const int8_t* column,
    const Analyzer::ColumnVar* col_var,
    const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
  order_columns_owner_.push_back(chunks_owner);
  order_columns_.push_back(column);
}

namespace {

// Converts the sorted indices to a mapping from row position to row number.
std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
  std::vector<int64_t> row_numbers(index_size);
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i + 1;
  }
  return row_numbers;
}
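
// For example, index_to_row_number turns the sort order index = {2, 0, 1} (row 2
// sorts first) into row_numbers = {2, 3, 1}: row 0 is second, row 1 is third, and
// row 2 is first.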

// Returns true iff the current element is greater than the previous, according to the
// comparator. This is needed because peer rows have to have the same rank.
bool advance_current_rank(
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
    const int64_t* index,
    const size_t i) {
  if (i == 0) {
    return false;
  }
  return comparator(index[i - 1], index[i]);
}

// Computes the mapping from row position to rank.
std::vector<int64_t> index_to_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    rank[index[i]] = crt_rank;
  }
  return rank;
}
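
// Peer rows share a rank and a gap follows: for sorted values {10, 10, 20},
// index_to_rank yields {1, 1, 3}, the third row restarting at crt_rank = i + 1 = 3.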

// Computes the mapping from row position to dense rank.
std::vector<int64_t> index_to_dense_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> dense_rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      ++crt_rank;
    }
    dense_rank[index[i]] = crt_rank;
  }
  return dense_rank;
}
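
// Unlike index_to_rank, the counter advances by one per peer group: sorted values
// {10, 10, 20} produce dense ranks {1, 1, 2} with no gap.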

// Computes the mapping from row position to percent rank.
std::vector<double> index_to_percent_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> percent_rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    percent_rank[index[i]] =
        index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
  }
  return percent_rank;
}
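
// PERCENT_RANK is (rank - 1) / (rows - 1): sorted values {10, 10, 20} map to
// {0.0, 0.0, 1.0}, and a single-row partition is defined to be 0.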

// Computes the mapping from row position to cumulative distribution.
std::vector<double> index_to_cume_dist(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> cume_dist(index_size);
  size_t start_peer_group = 0;
  while (start_peer_group < index_size) {
    size_t end_peer_group = start_peer_group + 1;
    while (end_peer_group < index_size &&
           !advance_current_rank(comparator, index, end_peer_group)) {
      ++end_peer_group;
    }
    for (size_t i = start_peer_group; i < end_peer_group; ++i) {
      cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
    }
    start_peer_group = end_peer_group;
  }
  return cume_dist;
}
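
// CUME_DIST for a row is (end of its peer group) / (partition size): sorted values
// {10, 10, 20} map to {2/3, 2/3, 1.0}, the two peers sharing the group ending at 2.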

// Computes the mapping from row position to the n-tile statistic.
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  std::vector<int64_t> row_numbers(index_size);
  const size_t tile_size = (index_size + n - 1) / n;
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i / tile_size + 1;
  }
  return row_numbers;
}
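
// Tiles are ceil(index_size / n) rows wide, so NTILE(2) over five rows assigns
// buckets {1, 1, 1, 2, 2} in sorted order (tile_size = ceil(5 / 2) = 3).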

// The element size in the result buffer for the given window function kind. Currently
// it's always 8.
size_t window_function_buffer_element_size(const SqlWindowFunctionKind /*kind*/) {
  return 8;
}

// Extracts the integer constant from a constant expression.
size_t get_int_constant_from_expr(const Analyzer::Expr* expr) {
  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
  if (!lag_constant) {
    throw std::runtime_error("LAG with non-constant lag argument not supported yet");
  }
  const auto& lag_ti = lag_constant->get_type_info();
  switch (lag_ti.get_type()) {
    case kSMALLINT: {
      return lag_constant->get_constval().smallintval;
    }
    case kINT: {
      return lag_constant->get_constval().intval;
    }
    case kBIGINT: {
      return lag_constant->get_constval().bigintval;
    }
    default: {
      LOG(FATAL) << "Invalid type for the lag argument";
    }
  }
  return 0;
}

// Gets the lag or lead argument canonicalized as lag (lag = -lead).
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction* window_func) {
  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
        window_func->getKind() == SqlWindowFunctionKind::LEAD);
  const auto& args = window_func->getArgs();
  if (args.size() == 3) {
    throw std::runtime_error("LAG with default not supported yet");
  }
  if (args.size() == 2) {
    const int64_t lag_or_lead =
        static_cast<int64_t>(get_int_constant_from_expr(args[1].get()));
    return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
                                                                : -lag_or_lead;
  }
  CHECK_EQ(args.size(), size_t(1));
  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
}
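
// LAG(x, 2) canonicalizes to 2 and LEAD(x, 2) to -2; the single-argument forms
// LAG(x) and LEAD(x) default to 1 and -1 respectively.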

// Redistributes the original_indices according to the permutation given by
// output_for_partition_buff, reusing it as an output buffer.
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  std::vector<int64_t> new_output_for_partition_buff(partition_size);
  for (size_t i = 0; i < partition_size; ++i) {
    new_output_for_partition_buff[i] = original_indices[output_for_partition_buff[i]];
  }
  std::copy(new_output_for_partition_buff.begin(),
            new_output_for_partition_buff.end(),
            output_for_partition_buff);
}

// Applies a lag to the given sorted_indices, reusing it as an output buffer.
void apply_lag_to_partition(const int64_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  std::vector<int64_t> lag_sorted_indices(partition_size, -1);
  for (int64_t idx = 0; idx < static_cast<int64_t>(partition_size); ++idx) {
    int64_t lag_idx = idx - lag;
    if (lag_idx < 0 || lag_idx >= static_cast<int64_t>(partition_size)) {
      continue;
    }
    lag_sorted_indices[idx] = sorted_indices[lag_idx];
  }
  std::vector<int64_t> lag_original_indices(partition_size);
  for (size_t k = 0; k < partition_size; ++k) {
    const auto lag_index = lag_sorted_indices[k];
    lag_original_indices[sorted_indices[k]] =
        lag_index != -1 ? original_indices[lag_index] : -1;
  }
  std::copy(lag_original_indices.begin(), lag_original_indices.end(), sorted_indices);
}
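
// With lag = 1, sorted_indices = {0, 1, 2} and original_indices = {5, 6, 7}, the
// result is {-1, 5, 6}: each row gets the original index of the row one position
// earlier in sort order, and -1 marks rows with no lagged peer (a null output).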

// Computes the first value window function for the given output_for_partition_buff,
// reusing it as an output buffer.
void apply_first_value_to_partition(const int32_t* original_indices,
                                    int64_t* output_for_partition_buff,
                                    const size_t partition_size) {
  const auto first_value_idx = original_indices[output_for_partition_buff[0]];
  std::fill(output_for_partition_buff,
            output_for_partition_buff + partition_size,
            first_value_idx);
}

// Computes the last value window function for the given output_for_partition_buff,
// reusing it as an output buffer.
void apply_last_value_to_partition(const int32_t* original_indices,
                                   int64_t* output_for_partition_buff,
                                   const size_t partition_size) {
  std::copy(
      original_indices, original_indices + partition_size, output_for_partition_buff);
}

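// Flags the last row of every peer group in the partition_end bitmap, so pending
// aggregate outputs can be flushed once per peer group (see
// apply_window_pending_outputs_* below).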
void index_to_partition_end(
    const int8_t* partition_end,
    const size_t off,
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0);
    }
  }
  CHECK(index_size);
  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0);
}

bool pos_is_set(const int64_t bitset, const int64_t pos) {
  return (reinterpret_cast<const int8_t*>(bitset))[pos >> 3] & (1 << (pos & 7));
}
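
// A plain bitmap test: pos >> 3 selects the byte and pos & 7 the bit within it,
// e.g. pos = 11 checks bit 3 of byte 1.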

// Writes value to the pending integer outputs collected for all the peer rows. The
// end of each peer group is marked in the bitset.
template <class T>
void apply_window_pending_outputs_int(const int64_t handle,
                                      const int64_t value,
                                      const int64_t bitset,
                                      const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<T*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

}  // namespace

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle,
                                                                 const int64_t value,
                                                                 const int64_t bitset,
                                                                 const int64_t pos) {
  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle,
                                                                   const double value,
                                                                   const int64_t bitset,
                                                                   const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle,
                                                                  const float value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}
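
// Note that apply_window_pending_outputs_float above deliberately stores the float
// through a double pointer: row-wise output slots are always 8 bytes wide (see
// window_function_buffer_element_size). The columnar variant below writes a real
// 4-byte float.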

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(
    const int64_t handle,
    const float value,
    const int64_t bitset,
    const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<float*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

// Add a pending output slot to be written back at the end of a peer row group.
extern "C" RUNTIME_EXPORT void add_window_pending_output(void* pending_output,
                                                         const int64_t handle) {
  reinterpret_cast<std::vector<void*>*>(handle)->push_back(pending_output);
}

// Returns true iff the aggregate window function requires special multiplicity handling
// to ensure that peer rows have the same value for the window function.
bool window_function_requires_peer_handling(
    const Analyzer::WindowFunction* window_func) {
  if (!window_function_is_aggregate(window_func->getKind())) {
    return false;
  }
  if (window_func->getOrderKeys().empty()) {
    return true;
  }
  switch (window_func->getKind()) {
    case SqlWindowFunctionKind::MIN:
    case SqlWindowFunctionKind::MAX: {
      return false;
    }
    default: {
      return true;
    }
  }
}
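
// For example, SUM(x) OVER (PARTITION BY p ORDER BY t) needs peer handling so that
// rows tying on t all receive the same running sum; MIN and MAX are exempted above.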

void WindowFunctionContext::computePartition(const size_t partition_idx,
                                             int64_t* output_for_partition_buff) {
  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
  if (partition_size == 0) {
    return;
  }
  const auto offset = offsets()[partition_idx];
  std::iota(
      output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
  std::vector<Comparator> comparators;
  const auto& order_keys = window_func_->getOrderKeys();
  const auto& collation = window_func_->getCollation();
  CHECK_EQ(order_keys.size(), collation.size());
  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
       ++order_column_idx) {
    auto order_column_buffer = order_columns_[order_column_idx];
    const auto order_col =
        dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
    CHECK(order_col);
    const auto& order_col_collation = collation[order_column_idx];
    const auto asc_comparator = makeComparator(order_col,
                                               order_column_buffer,
                                               payload() + offset,
                                               order_col_collation.nulls_first);
    auto comparator = asc_comparator;
    if (order_col_collation.is_desc) {
      comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
        return asc_comparator(rhs, lhs);
      };
    }
    comparators.push_back(comparator);
  }
  // Lexicographic composition of the per-column comparators: fall through to the
  // next order column only when the current one compares equal in both directions.
  const auto col_tuple_comparator = [&comparators](const int64_t lhs, const int64_t rhs) {
    for (const auto& comparator : comparators) {
      if (comparator(lhs, rhs)) {
        return true;
      }
      if (comparator(rhs, lhs)) {
        return false;
      }
    }
    return false;
  };

  if (g_enable_parallel_window_partition_sort &&
      partition_size >= g_parallel_window_partition_sort_threshold) {
#ifdef HAVE_TBB
    tbb::parallel_sort(output_for_partition_buff,
                       output_for_partition_buff + partition_size,
                       col_tuple_comparator);
#else
    thrust::sort(output_for_partition_buff,
                 output_for_partition_buff + partition_size,
                 col_tuple_comparator);
#endif
  } else {
    std::sort(output_for_partition_buff,
              output_for_partition_buff + partition_size,
              col_tuple_comparator);
  }
  computePartitionBuffer(output_for_partition_buff,
                         partition_size,
                         offset,
                         window_func_,
                         col_tuple_comparator);
}

void WindowFunctionContext::compute() {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(!output_);
  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(
      elem_count_ * window_function_buffer_element_size(window_func_->getKind()),
      /*thread_idx=*/0));
  const bool is_window_function_aggregate =
      window_function_is_aggregate(window_func_->getKind());
  if (is_window_function_aggregate) {
    fillPartitionStart();
    if (window_function_requires_peer_handling(window_func_)) {
      fillPartitionEnd();
    }
  }

  std::unique_ptr<int64_t[]> scratchpad;
  int64_t* intermediate_output_buffer;
  if (is_window_function_aggregate) {
    intermediate_output_buffer = reinterpret_cast<int64_t*>(output_);
  } else {
    scratchpad.reset(new int64_t[elem_count_]);
    intermediate_output_buffer = scratchpad.get();
  }

  const size_t partition_count{partitionCount()};

  const auto compute_partitions = [&](const size_t start, const size_t end) {
    for (size_t partition_idx = start; partition_idx != end; ++partition_idx) {
      computePartition(partition_idx,
                       intermediate_output_buffer + offsets()[partition_idx]);
    }
  };

  const bool should_parallelize{g_enable_parallel_window_partition_compute &&
                                elem_count_ >=
                                    g_parallel_window_partition_compute_threshold};
  if (should_parallelize) {
    auto timer = DEBUG_TIMER("Window Function Partition Compute");
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(0, partition_count, cpu_threads())) {
      thread_pool.run([=] { compute_partitions(interval.begin, interval.end); });
    }
    thread_pool.wait();
  } else {
    auto timer = DEBUG_TIMER("Window Function Non-Parallelized Partition Compute");
    compute_partitions(0, partition_count);
  }

  if (is_window_function_aggregate) {
    // If the window function is an aggregate, computePartition wrote to the final
    // output buffer directly and we are done.
    return;
  }

  auto output_i64 = reinterpret_cast<int64_t*>(output_);

  // Otherwise scatter the per-partition results back to input row order.
  const auto payload_copy = [&](const size_t start, const size_t end) {
    for (size_t i = start; i < end; ++i) {
      output_i64[payload()[i]] = intermediate_output_buffer[i];
    }
  };

  if (should_parallelize) {
    auto timer = DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Parallelized");
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(0, elem_count_, cpu_threads())) {
      thread_pool.run([=] { payload_copy(interval.begin, interval.end); });
    }
    thread_pool.wait();
  } else {
    auto timer =
        DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Non-Parallelized");
    payload_copy(0, elem_count_);
  }
}

const Analyzer::WindowFunction* WindowFunctionContext::getWindowFunction() const {
  return window_func_;
}

const int8_t* WindowFunctionContext::output() const {
  return output_;
}

const int64_t* WindowFunctionContext::aggregateState() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return &aggregate_state_.val;
}

const int64_t* WindowFunctionContext::aggregateStateCount() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return &aggregate_state_.count;
}

int64_t WindowFunctionContext::aggregateStatePendingOutputs() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
}

const int8_t* WindowFunctionContext::partitionStart() const {
  return partition_start_;
}

const int8_t* WindowFunctionContext::partitionEnd() const {
  return partition_end_;
}

size_t WindowFunctionContext::elementCount() const {
  return elem_count_;
}

namespace {

template <class T>
bool integer_comparator(const int8_t* order_column_buffer,
                        const SQLTypeInfo& ti,
                        const int32_t* partition_indices,
                        const int64_t lhs,
                        const int64_t rhs,
                        const bool nulls_first) {
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_val = inline_fixed_encoding_null_val(ti);
  if (lhs_val == null_val && rhs_val == null_val) {
    return false;
  }
  if (lhs_val == null_val && rhs_val != null_val) {
    return nulls_first;
  }
  if (rhs_val == null_val && lhs_val != null_val) {
    return !nulls_first;
  }
  return lhs_val < rhs_val;
}

template <class T, class NullPatternType>
bool fp_comparator(const int8_t* order_column_buffer,
                   const SQLTypeInfo& ti,
                   const int32_t* partition_indices,
                   const int64_t lhs,
                   const int64_t rhs,
                   const bool nulls_first) {
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
  const auto lhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
  const auto rhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
    return false;
  }
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
    return nulls_first;
  }
  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
    return !nulls_first;
  }
  return lhs_val < rhs_val;
}

}  // namespace
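
// The floating-point NULL sentinel is matched on its exact bit pattern above rather
// than with operator==, which keeps the check well-defined even for NaN payloads.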

std::function<bool(const int64_t lhs, const int64_t rhs)>
WindowFunctionContext::makeComparator(const Analyzer::ColumnVar* col_var,
                                      const int8_t* order_column_buffer,
                                      const int32_t* partition_indices,
                                      const bool nulls_first) {
  const auto& ti = col_var->get_type_info();
  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
    switch (ti.get_size()) {
      case 8: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int64_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 4: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int32_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 2: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int16_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 1: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int8_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      default: {
        LOG(FATAL) << "Invalid type size: " << ti.get_size();
      }
    }
  }
  if (ti.is_fp()) {
    switch (ti.get_type()) {
      case kFLOAT: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return fp_comparator<float, int32_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case kDOUBLE: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return fp_comparator<double, int64_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      default: {
        LOG(FATAL) << "Invalid float type";
      }
    }
  }
  throw std::runtime_error("Type not supported yet");
}

void WindowFunctionContext::computePartitionBuffer(
    int64_t* output_for_partition_buff,
    const size_t partition_size,
    const size_t off,
    const Analyzer::WindowFunction* window_func,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  switch (window_func->getKind()) {
    case SqlWindowFunctionKind::ROW_NUMBER: {
      const auto row_numbers =
          index_to_row_number(output_for_partition_buff, partition_size);
      std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::RANK: {
      const auto rank =
          index_to_rank(output_for_partition_buff, partition_size, comparator);
      std::copy(rank.begin(), rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::DENSE_RANK: {
      const auto dense_rank =
          index_to_dense_rank(output_for_partition_buff, partition_size, comparator);
      std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::PERCENT_RANK: {
      const auto percent_rank =
          index_to_percent_rank(output_for_partition_buff, partition_size, comparator);
      std::copy(percent_rank.begin(),
                percent_rank.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::CUME_DIST: {
      const auto cume_dist =
          index_to_cume_dist(output_for_partition_buff, partition_size, comparator);
      std::copy(cume_dist.begin(),
                cume_dist.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::NTILE: {
      const auto& args = window_func->getArgs();
      CHECK_EQ(args.size(), size_t(1));
      const auto n = get_int_constant_from_expr(args.front().get());
      const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
      std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::LAG:
    case SqlWindowFunctionKind::LEAD: {
      const auto lag_or_lead = get_lag_or_lead_argument(window_func);
      const auto partition_row_offsets = payload() + off;
      apply_lag_to_partition(
          lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::FIRST_VALUE: {
      const auto partition_row_offsets = payload() + off;
      apply_first_value_to_partition(
          partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::LAST_VALUE: {
      const auto partition_row_offsets = payload() + off;
      apply_last_value_to_partition(
          partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::AVG:
    case SqlWindowFunctionKind::MIN:
    case SqlWindowFunctionKind::MAX:
    case SqlWindowFunctionKind::SUM:
    case SqlWindowFunctionKind::COUNT: {
      const auto partition_row_offsets = payload() + off;
      if (window_function_requires_peer_handling(window_func)) {
        index_to_partition_end(
            partitionEnd(), off, output_for_partition_buff, partition_size, comparator);
      }
      apply_permutation_to_partition(
          output_for_partition_buff, partition_row_offsets, partition_size);
      break;
    }
    default: {
      throw std::runtime_error("Window function not supported yet: " +
                               ::toString(window_func->getKind()));
    }
  }
}

void WindowFunctionContext::fillPartitionStart() {
  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
                                                 0,
                                                 static_cast<int64_t>(elem_count_),
                                                 false,
                                                 ExecutorDeviceType::CPU,
                                                 1};
  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
  if (partitions_) {
    bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
  }
  partition_start_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
  int64_t partition_count = partitionCount();
  std::vector<size_t> partition_offsets(partition_count);
  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
  for (int64_t i = 0; i < partition_count - 1; ++i) {
    agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
  }
}

void WindowFunctionContext::fillPartitionEnd() {
  CountDistinctDescriptor partition_end_bitmap{CountDistinctImplType::Bitmap,
                                               0,
                                               static_cast<int64_t>(elem_count_),
                                               false,
                                               ExecutorDeviceType::CPU,
                                               1};
  auto bitmap_sz = partition_end_bitmap.bitmapPaddedSizeBytes();
  if (partitions_) {
    bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
  }
  partition_end_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
  int64_t partition_count = partitionCount();
  std::vector<size_t> partition_offsets(partition_count);
  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
  for (int64_t i = 0; i < partition_count - 1; ++i) {
    if (partition_offsets[i] == 0) {
      continue;
    }
    agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
  }
  if (elem_count_) {
    agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
  }
}

const int32_t* WindowFunctionContext::payload() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) +
        partitions_->payloadBufferOff());
  }
  return dummy_payload_;  // non-partitioned window function
}

const int32_t* WindowFunctionContext::offsets() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
  }
  return &dummy_offset_;
}

const int32_t* WindowFunctionContext::counts() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
  }
  return &dummy_count_;
}

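// The partition hash table buffer is laid out as [offsets][counts][payload], so the
// distance between the counts() and offsets() pointers is the partition count.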
size_t WindowFunctionContext::partitionCount() const {
  if (partitions_) {
    const auto partition_count = counts() - offsets();
    CHECK_GE(partition_count, 0);
    return partition_count;
  }
  return 1;  // non-partitioned window function
}

void WindowProjectNodeContext::addWindowFunctionContext(
    std::unique_ptr<WindowFunctionContext> window_function_context,
    const size_t target_index) {
  const auto it_ok = window_contexts_.emplace(
      std::make_pair(target_index, std::move(window_function_context)));
  CHECK(it_ok.second);
}

const WindowFunctionContext* WindowProjectNodeContext::activateWindowFunctionContext(
    Executor* executor,
    const size_t target_index) const {
  const auto it = window_contexts_.find(target_index);
  CHECK(it != window_contexts_.end());
  executor->active_window_function_ = it->second.get();
  return executor->active_window_function_;
}

void WindowProjectNodeContext::resetWindowFunctionContext(Executor* executor) {
  executor->active_window_function_ = nullptr;
}

WindowFunctionContext* WindowProjectNodeContext::getActiveWindowFunctionContext(
    Executor* executor) {
  return executor->active_window_function_;
}

WindowProjectNodeContext* WindowProjectNodeContext::create(Executor* executor) {
  executor->window_project_node_context_owned_ =
      std::make_unique<WindowProjectNodeContext>();
  return executor->window_project_node_context_owned_.get();
}

const WindowProjectNodeContext* WindowProjectNodeContext::get(Executor* executor) {
  return executor->window_project_node_context_owned_.get();
}

void WindowProjectNodeContext::reset(Executor* executor) {
  executor->window_project_node_context_owned_ = nullptr;
  executor->active_window_function_ = nullptr;
}