OmniSciDB  04ee39c94c
WindowContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "WindowContext.h"
18 #include <numeric>
19 #include "../Shared/checked_alloc.h"
20 #include "../Shared/sql_window_function_to_string.h"
24 #include "RuntimeFunctions.h"
25 #include "TypePunning.h"
26 
28  const Analyzer::WindowFunction* window_func,
29  const std::shared_ptr<JoinHashTableInterface>& partitions,
30  const size_t elem_count,
31  const ExecutorDeviceType device_type)
32  : window_func_(window_func)
33  , partitions_(partitions)
34  , elem_count_(elem_count)
35  , output_(nullptr)
36  , partition_start_(nullptr)
37  , partition_end_(nullptr)
38  , device_type_(device_type) {}
39 
41  free(output_);
42  free(partition_start_);
43  free(partition_end_);
44 }
45 
47  const int8_t* column,
48  const Analyzer::ColumnVar* col_var,
49  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
50  order_columns_owner_.push_back(chunks_owner);
51  order_columns_.push_back(column);
52 }
53 
54 namespace {
55 
// Inverts the sorted index: the row found at sorted position i receives the 1-based
// row number i + 1. The result is indexed by row position.
std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
  std::vector<int64_t> result(index_size);
  int64_t next_row_number = 1;
  for (const int64_t* it = index; it != index + index_size; ++it) {
    result[*it] = next_row_number++;
  }
  return result;
}
64 
65 // Returns true iff the current element is greater than the previous, according to the
66 // comparator. This is needed because peer rows have to have the same rank.
68  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
69  const int64_t* index,
70  const size_t i) {
71  if (i == 0) {
72  return false;
73  }
74  return comparator(index[i - 1], index[i]);
75 }
76 
77 // Computes the mapping from row position to rank.
78 std::vector<int64_t> index_to_rank(
79  const int64_t* index,
80  const size_t index_size,
81  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
82  std::vector<int64_t> rank(index_size);
83  size_t crt_rank = 1;
84  for (size_t i = 0; i < index_size; ++i) {
85  if (advance_current_rank(comparator, index, i)) {
86  crt_rank = i + 1;
87  }
88  rank[index[i]] = crt_rank;
89  }
90  return rank;
91 }
92 
93 // Computes the mapping from row position to dense rank.
94 std::vector<int64_t> index_to_dense_rank(
95  const int64_t* index,
96  const size_t index_size,
97  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
98  std::vector<int64_t> dense_rank(index_size);
99  size_t crt_rank = 1;
100  for (size_t i = 0; i < index_size; ++i) {
101  if (advance_current_rank(comparator, index, i)) {
102  ++crt_rank;
103  }
104  dense_rank[index[i]] = crt_rank;
105  }
106  return dense_rank;
107 }
108 
109 // Computes the mapping from row position to percent rank.
110 std::vector<double> index_to_percent_rank(
111  const int64_t* index,
112  const size_t index_size,
113  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
114  std::vector<double> percent_rank(index_size);
115  size_t crt_rank = 1;
116  for (size_t i = 0; i < index_size; ++i) {
117  if (advance_current_rank(comparator, index, i)) {
118  crt_rank = i + 1;
119  }
120  percent_rank[index[i]] =
121  index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
122  }
123  return percent_rank;
124 }
125 
126 // Computes the mapping from row position to cumulative distribution.
127 std::vector<double> index_to_cume_dist(
128  const int64_t* index,
129  const size_t index_size,
130  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
131  std::vector<double> cume_dist(index_size);
132  size_t start_peer_group = 0;
133  while (start_peer_group < index_size) {
134  size_t end_peer_group = start_peer_group + 1;
135  while (end_peer_group < index_size &&
136  !advance_current_rank(comparator, index, end_peer_group)) {
137  ++end_peer_group;
138  }
139  for (size_t i = start_peer_group; i < end_peer_group; ++i) {
140  cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
141  }
142  start_peer_group = end_peer_group;
143  }
144  return cume_dist;
145 }
146 
// Computes the mapping from row position to the n-tile statistic.
//
// The sorted rows are split into n tiles whose sizes differ by at most one: the first
// (index_size % n) tiles hold (index_size / n + 1) rows and the remaining tiles hold
// (index_size / n) rows. When there are fewer rows than tiles, each row gets its own tile
// and the trailing tiles stay empty.
//
// index:      sorted row positions for the partition.
// index_size: number of entries in index.
// n:          number of tiles requested; must be non-zero.
// Returns a vector indexed by row position holding the 1-based tile number.
// Throws std::runtime_error if n is zero.
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  // Validate before allocating anything.
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  std::vector<int64_t> row_numbers(index_size);
  const size_t small_tile_size = index_size / n;
  const size_t large_tile_count = index_size % n;
  const size_t large_tile_size = small_tile_size + 1;
  // Number of rows covered by the leading, larger tiles; never exceeds index_size.
  const size_t rows_in_large_tiles = large_tile_count * large_tile_size;
  for (size_t i = 0; i < index_size; ++i) {
    size_t tile = 0;
    if (i < rows_in_large_tiles) {
      tile = i / large_tile_size;
    } else {
      // Only reached when small_tile_size > 0: if it were zero, all rows would fall into
      // the larger tiles above.
      tile = large_tile_count + (i - rows_in_large_tiles) / small_tile_size;
    }
    row_numbers[index[i]] = tile + 1;
  }
  return row_numbers;
}
161 
162 // The element size in the result buffer for the given window function kind. Currently
163 // it's always 8.
165  return 8;
166 }
167 
168 // Extracts the integer constant from a constant expression.
170  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
171  if (!lag_constant) {
172  throw std::runtime_error("LAG with non-constant lag argument not supported yet");
173  }
174  const auto& lag_ti = lag_constant->get_type_info();
175  switch (lag_ti.get_type()) {
176  case kSMALLINT: {
177  return lag_constant->get_constval().smallintval;
178  }
179  case kINT: {
180  return lag_constant->get_constval().intval;
181  }
182  case kBIGINT: {
183  return lag_constant->get_constval().bigintval;
184  }
185  default: {
186  LOG(FATAL) << "Invalid type for the lag argument";
187  }
188  }
189  return 0;
190 }
191 
192 // Gets the lag or lead argument canonicalized as lag (lag = -lead).
194  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
195  window_func->getKind() == SqlWindowFunctionKind::LEAD);
196  const auto& args = window_func->getArgs();
197  if (args.size() == 3) {
198  throw std::runtime_error("LAG with default not supported yet");
199  }
200  if (args.size() == 2) {
201  const ssize_t lag_or_lead = get_int_constant_from_expr(args[1].get());
202  return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
203  : -lag_or_lead;
204  }
205  CHECK_EQ(args.size(), size_t(1));
206  return 1;
207 }
208 
// Replaces every entry p of output_for_partition_buff with original_indices[p]. The
// buffer is both the permutation being applied and the destination of the result.
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  int64_t* const buff_end = output_for_partition_buff + partition_size;
  // Each slot depends only on its own previous content, so it can be rewritten in place.
  for (int64_t* slot = output_for_partition_buff; slot != buff_end; ++slot) {
    *slot = original_indices[*slot];
  }
}
222 
// Applies a lag to sorted_indices and writes the result back into the same buffer.
// For the row at sorted position k, the output slot sorted_indices[k] receives
// original_indices[sorted_indices[k - lag]], or -1 when k - lag falls outside the
// partition. A negative lag therefore looks ahead instead of behind.
void apply_lag_to_partition(const ssize_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  const ssize_t row_count = static_cast<ssize_t>(partition_size);
  std::vector<int64_t> lagged(partition_size);
  for (ssize_t k = 0; k < row_count; ++k) {
    const ssize_t source_pos = k - lag;
    const bool in_partition = source_pos >= 0 && source_pos < row_count;
    // -1 marks "no lagged row available".
    const int64_t source_index = in_partition ? sorted_indices[source_pos] : -1;
    lagged[sorted_indices[k]] =
        source_index != -1 ? original_indices[source_index] : -1;
  }
  std::copy(lagged.begin(), lagged.end(), sorted_indices);
}
244 
// Overwrites the whole output_for_partition_buff with
// original_indices[output_for_partition_buff[0]], i.e. the original index of the row at
// the first sorted position. The buffer is read first and then reused as the output.
void apply_first_value_to_partition(const int32_t* original_indices,
                                    int64_t* output_for_partition_buff,
                                    const size_t partition_size) {
  const int32_t first_row = original_indices[output_for_partition_buff[0]];
  for (size_t slot = 0; slot != partition_size; ++slot) {
    output_for_partition_buff[slot] = first_row;
  }
}
255 
// Fills output_for_partition_buff with the first partition_size entries of
// original_indices, widened to int64_t. The previous buffer content is not read.
void apply_last_value_to_partition(const int32_t* original_indices,
                                   int64_t* output_for_partition_buff,
                                   const size_t partition_size) {
  for (size_t slot = 0; slot != partition_size; ++slot) {
    output_for_partition_buff[slot] = original_indices[slot];
  }
}
264 
266  const int8_t* partition_end,
267  const size_t off,
268  const int64_t* index,
269  const size_t index_size,
270  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
271  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
272  for (size_t i = 0; i < index_size; ++i) {
273  if (advance_current_rank(comparator, index, i)) {
274  agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0);
275  }
276  }
277  CHECK(index_size);
278  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0);
279 }
280 
// Tests bit `pos` of a bitmap. `bitset` is the address of the bitmap's first byte passed
// as an integer; bit (pos & 7) of byte (pos >> 3) is examined.
bool pos_is_set(const int64_t bitset, const int64_t pos) {
  const auto bytes = reinterpret_cast<const int8_t*>(bitset);
  const int8_t containing_byte = bytes[pos >> 3];
  const int bit_mask = 1 << (pos & 7);
  return (containing_byte & bit_mask) != 0;
}
284 
285 // Write value to pending integer outputs collected for all the peer rows. The end of
286 // groups is represented by the bitset.
287 template <class T>
289  const int64_t value,
290  const int64_t bitset,
291  const int64_t pos) {
292  if (!pos_is_set(bitset, pos)) {
293  return;
294  }
295  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
296  for (auto pending_output_slot : pending_output_slots) {
297  *reinterpret_cast<T*>(pending_output_slot) = value;
298  }
299  pending_output_slots.clear();
300 }
301 
302 } // namespace
303 
304 extern "C" void apply_window_pending_outputs_int64(const int64_t handle,
305  const int64_t value,
306  const int64_t bitset,
307  const int64_t pos) {
308  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
309 }
310 
311 extern "C" void apply_window_pending_outputs_int32(const int64_t handle,
312  const int64_t value,
313  const int64_t bitset,
314  const int64_t pos) {
315  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
316 }
317 
318 extern "C" void apply_window_pending_outputs_int16(const int64_t handle,
319  const int64_t value,
320  const int64_t bitset,
321  const int64_t pos) {
322  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
323 }
324 
325 extern "C" void apply_window_pending_outputs_int8(const int64_t handle,
326  const int64_t value,
327  const int64_t bitset,
328  const int64_t pos) {
329  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
330 }
331 
332 extern "C" void apply_window_pending_outputs_double(const int64_t handle,
333  const double value,
334  const int64_t bitset,
335  const int64_t pos) {
336  if (!pos_is_set(bitset, pos)) {
337  return;
338  }
339  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
340  for (auto pending_output_slot : pending_output_slots) {
341  *reinterpret_cast<double*>(pending_output_slot) = value;
342  }
343  pending_output_slots.clear();
344 }
345 
346 extern "C" void apply_window_pending_outputs_float(const int64_t handle,
347  const float value,
348  const int64_t bitset,
349  const int64_t pos) {
350  if (!pos_is_set(bitset, pos)) {
351  return;
352  }
353  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
354  for (auto pending_output_slot : pending_output_slots) {
355  *reinterpret_cast<double*>(pending_output_slot) = value;
356  }
357  pending_output_slots.clear();
358 }
359 
361  const float value,
362  const int64_t bitset,
363  const int64_t pos) {
364  if (!pos_is_set(bitset, pos)) {
365  return;
366  }
367  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
368  for (auto pending_output_slot : pending_output_slots) {
369  *reinterpret_cast<float*>(pending_output_slot) = value;
370  }
371  pending_output_slots.clear();
372 }
373 
// Appends `pending_output` to the list of pending output slots. `handle` is the address
// of the std::vector<void*> holding that list. The apply_window_pending_outputs_*
// functions later write to every slot in the list and clear it.
extern "C" void add_window_pending_output(void* pending_output, const int64_t handle) {
  auto* const pending_slots = reinterpret_cast<std::vector<void*>*>(handle);
  pending_slots->push_back(pending_output);
}
378 
379 // Returns true iff the aggregate window function requires special multiplicity handling
380 // to ensure that peer rows have the same value for the window function.
382  if (!window_function_is_aggregate(window_func->getKind())) {
383  return false;
384  }
385  if (window_func->getOrderKeys().empty()) {
386  return true;
387  }
388  switch (window_func->getKind()) {
391  return false;
392  }
393  default: {
394  return true;
395  }
396  }
397 }
398 
400  CHECK(!output_);
401  output_ = static_cast<int8_t*>(checked_malloc(
407  }
408  }
409  std::unique_ptr<int64_t[]> scratchpad(new int64_t[elem_count_]);
410  int64_t off = 0;
411  for (size_t i = 0; i < partitionCount(); ++i) {
412  auto partition_size = counts()[i];
413  if (partition_size == 0) {
414  continue;
415  }
416  auto output_for_partition_buff = scratchpad.get() + offsets()[i];
417  std::iota(output_for_partition_buff,
418  output_for_partition_buff + partition_size,
419  int64_t(0));
420  std::vector<Comparator> comparators;
421  const auto& order_keys = window_func_->getOrderKeys();
422  const auto& collation = window_func_->getCollation();
423  CHECK_EQ(order_keys.size(), collation.size());
424  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
425  ++order_column_idx) {
426  auto order_column_buffer = order_columns_[order_column_idx];
427  const auto order_col =
428  dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
429  CHECK(order_col);
430  const auto& order_col_collation = collation[order_column_idx];
431  const auto asc_comparator = makeComparator(order_col,
432  order_column_buffer,
433  payload() + offsets()[i],
434  order_col_collation.nulls_first);
435  auto comparator = asc_comparator;
436  if (order_col_collation.is_desc) {
437  comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
438  return asc_comparator(rhs, lhs);
439  };
440  }
441  comparators.push_back(comparator);
442  }
443  const auto col_tuple_comparator = [&comparators](const int64_t lhs,
444  const int64_t rhs) {
445  for (const auto& comparator : comparators) {
446  if (comparator(lhs, rhs)) {
447  return true;
448  }
449  }
450  return false;
451  };
452  std::sort(output_for_partition_buff,
453  output_for_partition_buff + partition_size,
454  col_tuple_comparator);
455  computePartition(output_for_partition_buff,
456  partition_size,
457  off,
458  window_func_,
459  col_tuple_comparator);
462  off += partition_size;
463  }
464  }
467  CHECK_EQ(static_cast<size_t>(off), elem_count_);
468  }
469  auto output_i64 = reinterpret_cast<int64_t*>(output_);
471  std::copy(scratchpad.get(), scratchpad.get() + elem_count_, output_i64);
472  } else {
473  for (size_t i = 0; i < elem_count_; ++i) {
474  output_i64[payload()[i]] = scratchpad[i];
475  }
476  }
477 }
478 
480  return window_func_;
481 }
482 
483 const int8_t* WindowFunctionContext::output() const {
484  return output_;
485 }
486 
489  return &aggregate_state_.val;
490 }
491 
494  return &aggregate_state_.count;
495 }
496 
499  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
500 }
501 
503  return partition_start_;
504 }
505 
506 const int8_t* WindowFunctionContext::partitionEnd() const {
507  return partition_end_;
508 }
509 
511  return elem_count_;
512 }
513 
514 void WindowFunctionContext::setRowNumber(llvm::Value* row_number) {
515  aggregate_state_.row_number = row_number;
516 }
517 
520 }
521 
522 namespace {
523 
524 template <class T>
525 bool integer_comparator(const int8_t* order_column_buffer,
526  const SQLTypeInfo& ti,
527  const int32_t* partition_indices,
528  const int64_t lhs,
529  const int64_t rhs,
530  const bool nulls_first) {
531  const auto values = reinterpret_cast<const T*>(order_column_buffer);
532  const auto lhs_val = values[partition_indices[lhs]];
533  const auto rhs_val = values[partition_indices[rhs]];
534  const auto null_val = inline_fixed_encoding_null_val(ti);
535  if (lhs_val == null_val && rhs_val == null_val) {
536  return false;
537  }
538  if (lhs_val == null_val && rhs_val != null_val) {
539  return nulls_first;
540  }
541  if (rhs_val == null_val && lhs_val != null_val) {
542  return !nulls_first;
543  }
544  return lhs_val < rhs_val;
545 }
546 
547 template <class T, class NullPatternType>
548 bool fp_comparator(const int8_t* order_column_buffer,
549  const SQLTypeInfo& ti,
550  const int32_t* partition_indices,
551  const int64_t lhs,
552  const int64_t rhs,
553  const bool nulls_first) {
554  const auto values = reinterpret_cast<const T*>(order_column_buffer);
555  const auto lhs_val = values[partition_indices[lhs]];
556  const auto rhs_val = values[partition_indices[rhs]];
557  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
558  const auto lhs_bit_pattern =
559  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
560  const auto rhs_bit_pattern =
561  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
562  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
563  return false;
564  }
565  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
566  return nulls_first;
567  }
568  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
569  return !nulls_first;
570  }
571  return lhs_val < rhs_val;
572 }
573 
574 } // namespace
575 
576 std::function<bool(const int64_t lhs, const int64_t rhs)>
578  const int8_t* order_column_buffer,
579  const int32_t* partition_indices,
580  const bool nulls_first) {
581  const auto& ti = col_var->get_type_info();
582  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
583  switch (ti.get_size()) {
584  case 8: {
585  return [order_column_buffer, nulls_first, partition_indices, &ti](
586  const int64_t lhs, const int64_t rhs) {
587  return integer_comparator<int64_t>(
588  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
589  };
590  }
591  case 4: {
592  return [order_column_buffer, nulls_first, partition_indices, &ti](
593  const int64_t lhs, const int64_t rhs) {
594  return integer_comparator<int32_t>(
595  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
596  };
597  }
598  case 2: {
599  return [order_column_buffer, nulls_first, partition_indices, &ti](
600  const int64_t lhs, const int64_t rhs) {
601  return integer_comparator<int16_t>(
602  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
603  };
604  }
605  case 1: {
606  return [order_column_buffer, nulls_first, partition_indices, &ti](
607  const int64_t lhs, const int64_t rhs) {
608  return integer_comparator<int8_t>(
609  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
610  };
611  }
612  default: {
613  LOG(FATAL) << "Invalid type size: " << ti.get_size();
614  }
615  }
616  }
617  if (ti.is_fp()) {
618  switch (ti.get_type()) {
619  case kFLOAT: {
620  return [order_column_buffer, nulls_first, partition_indices, &ti](
621  const int64_t lhs, const int64_t rhs) {
622  return fp_comparator<float, int32_t>(
623  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
624  };
625  }
626  case kDOUBLE: {
627  return [order_column_buffer, nulls_first, partition_indices, &ti](
628  const int64_t lhs, const int64_t rhs) {
629  return fp_comparator<double, int64_t>(
630  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
631  };
632  }
633  default: {
634  LOG(FATAL) << "Invalid float type";
635  }
636  }
637  }
638  throw std::runtime_error("Type not supported yet");
639 }
640 
642  int64_t* output_for_partition_buff,
643  const size_t partition_size,
644  const size_t off,
645  const Analyzer::WindowFunction* window_func,
646  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
647  switch (window_func->getKind()) {
649  const auto row_numbers =
650  index_to_row_number(output_for_partition_buff, partition_size);
651  std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
652  break;
653  }
655  const auto rank =
656  index_to_rank(output_for_partition_buff, partition_size, comparator);
657  std::copy(rank.begin(), rank.end(), output_for_partition_buff);
658  break;
659  }
661  const auto dense_rank =
662  index_to_dense_rank(output_for_partition_buff, partition_size, comparator);
663  std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
664  break;
665  }
667  const auto percent_rank =
668  index_to_percent_rank(output_for_partition_buff, partition_size, comparator);
669  std::copy(percent_rank.begin(),
670  percent_rank.end(),
671  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
672  break;
673  }
675  const auto cume_dist =
676  index_to_cume_dist(output_for_partition_buff, partition_size, comparator);
677  std::copy(cume_dist.begin(),
678  cume_dist.end(),
679  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
680  break;
681  }
683  const auto& args = window_func->getArgs();
684  CHECK_EQ(args.size(), size_t(1));
685  const auto n = get_int_constant_from_expr(args.front().get());
686  const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
687  std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
688  break;
689  }
692  const auto lag_or_lead = get_lag_or_lead_argument(window_func);
693  const auto partition_row_offsets = payload() + off;
695  lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
696  break;
697  }
699  const auto partition_row_offsets = payload() + off;
701  partition_row_offsets, output_for_partition_buff, partition_size);
702  break;
703  }
705  const auto partition_row_offsets = payload() + off;
707  partition_row_offsets, output_for_partition_buff, partition_size);
708  break;
709  }
715  const auto partition_row_offsets = payload() + off;
716  if (window_function_requires_peer_handling(window_func)) {
718  partitionEnd(), off, output_for_partition_buff, partition_size, comparator);
719  }
721  output_for_partition_buff, partition_row_offsets, partition_size);
722  break;
723  }
724  default: {
725  throw std::runtime_error("Window function not supported yet: " +
726  sql_window_function_to_str(window_func->getKind()));
727  }
728  }
729 }
730 
733  0,
734  static_cast<int64_t>(elem_count_),
735  false,
737  1};
738  partition_start_ = static_cast<int8_t*>(
739  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
740  ssize_t partition_count = partitionCount();
741  std::vector<size_t> partition_offsets(partition_count);
742  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
743  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
744  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
745  for (ssize_t i = 0; i < partition_count - 1; ++i) {
746  agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
747  }
748 }
749 
752  0,
753  static_cast<int64_t>(elem_count_),
754  false,
756  1};
757  partition_end_ = static_cast<int8_t*>(
758  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
759  ssize_t partition_count = partitionCount();
760  std::vector<size_t> partition_offsets(partition_count);
761  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
762  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
763  for (ssize_t i = 0; i < partition_count - 1; ++i) {
764  if (partition_offsets[i] == 0) {
765  continue;
766  }
767  agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
768  }
769  if (elem_count_) {
770  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
771  }
772 }
773 
774 const int32_t* WindowFunctionContext::payload() const {
775  return reinterpret_cast<const int32_t*>(
776  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->payloadBufferOff());
777 }
778 
779 const int32_t* WindowFunctionContext::offsets() const {
780  return reinterpret_cast<const int32_t*>(
781  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
782 }
783 
784 const int32_t* WindowFunctionContext::counts() const {
785  return reinterpret_cast<const int32_t*>(
786  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
787 }
788 
790  const auto partition_count = counts() - offsets();
791  CHECK_GE(partition_count, 0);
792  return partition_count;
793 }
794 
796  std::unique_ptr<WindowFunctionContext> window_function_context,
797  const size_t target_index) {
798  const auto it_ok = window_contexts_.emplace(
799  std::make_pair(target_index, std::move(window_function_context)));
800  CHECK(it_ok.second);
801 }
802 
804  const size_t target_index) const {
805  const auto it = window_contexts_.find(target_index);
806  CHECK(it != window_contexts_.end());
807  s_active_window_function_ = it->second.get();
808  return s_active_window_function_;
809 }
810 
812  s_active_window_function_ = nullptr;
813 }
814 
816  return s_active_window_function_;
817 }
818 
820  s_instance_ = std::make_unique<WindowProjectNodeContext>();
821  return s_instance_.get();
822 }
823 
825  return s_instance_.get();
826 }
827 
829  s_instance_ = nullptr;
830  s_active_window_function_ = nullptr;
831 }
832 
833 std::unique_ptr<WindowProjectNodeContext> WindowProjectNodeContext::s_instance_;
#define CHECK_EQ(x, y)
Definition: Logger.h:195
size_t elementCount() const
void apply_window_pending_outputs_int8(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
static WindowFunctionContext * s_active_window_function_
Descriptor for the storage layout used for (approximate) count distinct operations.
const int32_t * payload() const
static std::unique_ptr< WindowProjectNodeContext > s_instance_
bool advance_current_rank(const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator, const int64_t *index, const size_t i)
void apply_lag_to_partition(const ssize_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
ExecutorDeviceType
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
void apply_window_pending_outputs_double(const int64_t handle, const double value, const int64_t bitset, const int64_t pos)
Utility functions for easy access to the result set buffers.
ssize_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
#define LOG(tag)
Definition: Logger.h:182
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1347
const int32_t * counts() const
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:323
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
bool fp_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
#define CHECK_GE(x, y)
Definition: Logger.h:200
const Analyzer::WindowFunction * window_func_
void apply_window_pending_outputs_float_columnar(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
size_t partitionCount() const
void add_window_pending_output(void *pending_output, const int64_t handle)
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
void computePartition(int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_window_pending_outputs_int64(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
AggregateState aggregate_state_
void apply_window_pending_outputs_int(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
bool window_function_is_value(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:27
void apply_last_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const Analyzer::WindowFunction * getWindowFunction() const
bool integer_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
static const WindowProjectNodeContext * get()
void * checked_malloc(const size_t size)
Definition: checked_alloc.h:40
void setRowNumber(llvm::Value *row_number)
const int64_t * aggregateState() const
void apply_first_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:1351
std::string sql_window_function_to_str(const SqlWindowFunctionKind kind)
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:48
static WindowProjectNodeContext * create()
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
std::shared_ptr< JoinHashTableInterface > partitions_
llvm::Value * getRowNumber() const
void apply_window_pending_outputs_float(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
int64_t aggregateStatePendingOutputs() const
static void resetWindowFunctionContext()
const int8_t * partitionStart() const
const int32_t * offsets() const
bool pos_is_set(const int64_t bitset, const int64_t pos)
const int64_t * aggregateStateCount() const
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
SqlWindowFunctionKind
Definition: sqldefs.h:73
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:1341
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:77
const int8_t * output() const
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
const WindowFunctionContext * activateWindowFunctionContext(const size_t target_index) const
#define CHECK(condition)
Definition: Logger.h:187
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< const int8_t * > order_columns_
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void apply_window_pending_outputs_int32(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
Definition: sqltypes.h:47
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< JoinHashTableInterface > &partitions, const size_t elem_count, const ExecutorDeviceType device_type)
void apply_window_pending_outputs_int16(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
const int8_t * partitionEnd() const
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
void addOrderColumn(const int8_t *column, const Analyzer::ColumnVar *col_var, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1339
ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
const ExecutorDeviceType device_type_
static WindowFunctionContext * getActiveWindowFunctionContext()