OmniSciDB  ab4938a6a3
WindowContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "WindowContext.h"
18 #include <numeric>
19 #include "../Shared/checked_alloc.h"
20 #include "../Shared/sql_window_function_to_string.h"
24 #include "RuntimeFunctions.h"
25 #include "TypePunning.h"
26 
28  const Analyzer::WindowFunction* window_func,
29  const std::shared_ptr<JoinHashTableInterface>& partitions,
30  const size_t elem_count,
31  const ExecutorDeviceType device_type,
32  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
33  : window_func_(window_func)
34  , partitions_(partitions)
35  , elem_count_(elem_count)
36  , output_(nullptr)
37  , partition_start_(nullptr)
38  , partition_end_(nullptr)
39  , device_type_(device_type)
40  , row_set_mem_owner_(row_set_mem_owner) {}
41 
43  free(partition_start_);
44  free(partition_end_);
45 }
46 
48  const int8_t* column,
49  const Analyzer::ColumnVar* col_var,
50  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
51  order_columns_owner_.push_back(chunks_owner);
52  order_columns_.push_back(column);
53 }
54 
55 namespace {
56 
// Converts the sorted indices to a mapping from row position to row number.
// `index` is a permutation of [0, index_size): the row originally at position
// index[i] is the (i + 1)-th row in sorted order.
std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
  std::vector<int64_t> row_numbers(index_size);
  int64_t crt_row_number = 1;
  for (size_t pos = 0; pos < index_size; ++pos) {
    row_numbers[index[pos]] = crt_row_number++;
  }
  return row_numbers;
}
65 
66 // Returns true iff the current element is greater than the previous, according to the
67 // comparator. This is needed because peer rows have to have the same rank.
69  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
70  const int64_t* index,
71  const size_t i) {
72  if (i == 0) {
73  return false;
74  }
75  return comparator(index[i - 1], index[i]);
76 }
77 
78 // Computes the mapping from row position to rank.
79 std::vector<int64_t> index_to_rank(
80  const int64_t* index,
81  const size_t index_size,
82  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
83  std::vector<int64_t> rank(index_size);
84  size_t crt_rank = 1;
85  for (size_t i = 0; i < index_size; ++i) {
86  if (advance_current_rank(comparator, index, i)) {
87  crt_rank = i + 1;
88  }
89  rank[index[i]] = crt_rank;
90  }
91  return rank;
92 }
93 
94 // Computes the mapping from row position to dense rank.
95 std::vector<int64_t> index_to_dense_rank(
96  const int64_t* index,
97  const size_t index_size,
98  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
99  std::vector<int64_t> dense_rank(index_size);
100  size_t crt_rank = 1;
101  for (size_t i = 0; i < index_size; ++i) {
102  if (advance_current_rank(comparator, index, i)) {
103  ++crt_rank;
104  }
105  dense_rank[index[i]] = crt_rank;
106  }
107  return dense_rank;
108 }
109 
110 // Computes the mapping from row position to percent rank.
111 std::vector<double> index_to_percent_rank(
112  const int64_t* index,
113  const size_t index_size,
114  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
115  std::vector<double> percent_rank(index_size);
116  size_t crt_rank = 1;
117  for (size_t i = 0; i < index_size; ++i) {
118  if (advance_current_rank(comparator, index, i)) {
119  crt_rank = i + 1;
120  }
121  percent_rank[index[i]] =
122  index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
123  }
124  return percent_rank;
125 }
126 
// Computes the mapping from row position to cumulative distribution: each row
// gets (number of rows sorted at or before the end of its peer group) divided
// by the total row count, so all members of a peer group share one value.
std::vector<double> index_to_cume_dist(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> cume_dist(index_size);
  size_t start_peer_group = 0;
  while (start_peer_group < index_size) {
    // Advance until the comparator orders a row strictly after the current
    // one; [start_peer_group, end_peer_group) is then one peer group.
    size_t end_peer_group = start_peer_group + 1;
    while (end_peer_group < index_size &&
           !advance_current_rank(comparator, index, end_peer_group)) {
      ++end_peer_group;
    }
    // Every member of the group gets the fraction of rows up to and including
    // the group's last row.
    for (size_t i = start_peer_group; i < end_peer_group; ++i) {
      cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
    }
    start_peer_group = end_peer_group;
  }
  return cume_dist;
}
147 
// Computes the mapping from row position to the n-tile statistic: rows in
// sorted order are spread over n buckets numbered 1..n, earlier buckets
// filled first. Throws if n is zero.
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  std::vector<int64_t> row_numbers(index_size);
  const size_t tile_size = (index_size + n - 1) / n;  // ceiling division
  for (size_t pos = 0; pos < index_size; ++pos) {
    row_numbers[index[pos]] = pos / tile_size + 1;
  }
  return row_numbers;
}
162 
163 // The element size in the result buffer for the given window function kind. Currently
164 // it's always 8.
166  return 8;
167 }
168 
169 // Extracts the integer constant from a constant expression.
171  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
172  if (!lag_constant) {
173  throw std::runtime_error("LAG with non-constant lag argument not supported yet");
174  }
175  const auto& lag_ti = lag_constant->get_type_info();
176  switch (lag_ti.get_type()) {
177  case kSMALLINT: {
178  return lag_constant->get_constval().smallintval;
179  }
180  case kINT: {
181  return lag_constant->get_constval().intval;
182  }
183  case kBIGINT: {
184  return lag_constant->get_constval().bigintval;
185  }
186  default: {
187  LOG(FATAL) << "Invalid type for the lag argument";
188  }
189  }
190  return 0;
191 }
192 
193 // Gets the lag or lead argument canonicalized as lag (lag = -lead).
195  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
196  window_func->getKind() == SqlWindowFunctionKind::LEAD);
197  const auto& args = window_func->getArgs();
198  if (args.size() == 3) {
199  throw std::runtime_error("LAG with default not supported yet");
200  }
201  if (args.size() == 2) {
202  const ssize_t lag_or_lead = get_int_constant_from_expr(args[1].get());
203  return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
204  : -lag_or_lead;
205  }
206  CHECK_EQ(args.size(), size_t(1));
207  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
208 }
209 
// Redistributes the original_indices according to the permutation given by
// output_for_partition_buff, reusing it as an output buffer.
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  // Materialize the permuted values in a scratch vector first, since the
  // permutation is read from the very buffer being overwritten.
  std::vector<int64_t> permuted;
  permuted.reserve(partition_size);
  for (size_t i = 0; i < partition_size; ++i) {
    permuted.push_back(original_indices[output_for_partition_buff[i]]);
  }
  std::copy(permuted.begin(), permuted.end(), output_for_partition_buff);
}
223 
// Applies a lag to the given sorted_indices, reusing it as an output buffer.
// A positive lag references the row `lag` positions earlier in sorted order
// (lead is expressed as a negative lag); rows with no source row get -1.
void apply_lag_to_partition(const ssize_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  const ssize_t signed_size = static_cast<ssize_t>(partition_size);
  // Step 1: for each sorted position, record the sorted index `lag` rows
  // back, or -1 when that position falls outside the partition.
  std::vector<int64_t> lagged(partition_size, -1);
  for (ssize_t pos = 0; pos < signed_size; ++pos) {
    const ssize_t source_pos = pos - lag;
    if (source_pos >= 0 && source_pos < signed_size) {
      lagged[pos] = sorted_indices[source_pos];
    }
  }
  // Step 2: scatter the lagged references back to original row positions,
  // translating partition-local indices through original_indices.
  std::vector<int64_t> scattered(partition_size);
  for (size_t pos = 0; pos < partition_size; ++pos) {
    const auto source = lagged[pos];
    scattered[sorted_indices[pos]] = source == -1 ? -1 : original_indices[source];
  }
  std::copy(scattered.begin(), scattered.end(), sorted_indices);
}
245 
// Computes first value function for the given output_for_partition_buff,
// reusing it as an output buffer: every row in the partition gets the
// original position of the first row in sorted order.
void apply_first_value_to_partition(const int32_t* original_indices,
                                    int64_t* output_for_partition_buff,
                                    const size_t partition_size) {
  const int64_t first_value_idx = original_indices[output_for_partition_buff[0]];
  for (size_t i = 0; i < partition_size; ++i) {
    output_for_partition_buff[i] = first_value_idx;
  }
}
256 
// Computes last value function for the given output_for_partition_buff,
// reusing it as an output buffer: each output position takes its own
// original index, widened element-wise to 64 bits.
void apply_last_value_to_partition(const int32_t* original_indices,
                                   int64_t* output_for_partition_buff,
                                   const size_t partition_size) {
  for (size_t i = 0; i < partition_size; ++i) {
    output_for_partition_buff[i] = original_indices[i];
  }
}
265 
267  const int8_t* partition_end,
268  const size_t off,
269  const int64_t* index,
270  const size_t index_size,
271  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
272  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
273  for (size_t i = 0; i < index_size; ++i) {
274  if (advance_current_rank(comparator, index, i)) {
275  agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0);
276  }
277  }
278  CHECK(index_size);
279  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0);
280 }
281 
// Returns true iff bit `pos` is set in the byte-addressable bitset whose base
// address is stored, as an integer, in `bitset`.
bool pos_is_set(const int64_t bitset, const int64_t pos) {
  const auto bytes = reinterpret_cast<const int8_t*>(bitset);
  return (bytes[pos >> 3] & (1 << (pos & 7))) != 0;
}
285 
286 // Write value to pending integer outputs collected for all the peer rows. The end of
287 // groups is represented by the bitset.
288 template <class T>
289 void apply_window_pending_outputs_int(const int64_t handle,
290  const int64_t value,
291  const int64_t bitset,
292  const int64_t pos) {
293  if (!pos_is_set(bitset, pos)) {
294  return;
295  }
296  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
297  for (auto pending_output_slot : pending_output_slots) {
298  *reinterpret_cast<T*>(pending_output_slot) = value;
299  }
300  pending_output_slots.clear();
301 }
302 
303 } // namespace
304 
// 64-bit integer instantiation of apply_window_pending_outputs_int, exported
// with C linkage.
extern "C" void apply_window_pending_outputs_int64(const int64_t handle,
                                                   const int64_t value,
                                                   const int64_t bitset,
                                                   const int64_t pos) {
  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
}
311 
// 32-bit integer instantiation of apply_window_pending_outputs_int, exported
// with C linkage.
extern "C" void apply_window_pending_outputs_int32(const int64_t handle,
                                                   const int64_t value,
                                                   const int64_t bitset,
                                                   const int64_t pos) {
  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
}
318 
// 16-bit integer instantiation of apply_window_pending_outputs_int, exported
// with C linkage.
extern "C" void apply_window_pending_outputs_int16(const int64_t handle,
                                                   const int64_t value,
                                                   const int64_t bitset,
                                                   const int64_t pos) {
  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
}
325 
// 8-bit integer instantiation of apply_window_pending_outputs_int, exported
// with C linkage.
extern "C" void apply_window_pending_outputs_int8(const int64_t handle,
                                                  const int64_t value,
                                                  const int64_t bitset,
                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
}
332 
333 extern "C" void apply_window_pending_outputs_double(const int64_t handle,
334  const double value,
335  const int64_t bitset,
336  const int64_t pos) {
337  if (!pos_is_set(bitset, pos)) {
338  return;
339  }
340  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
341  for (auto pending_output_slot : pending_output_slots) {
342  *reinterpret_cast<double*>(pending_output_slot) = value;
343  }
344  pending_output_slots.clear();
345 }
346 
// Write `value` to every pending float output slot once `pos` closes a peer
// group (its bit is set in `bitset`), then forget the flushed slots.
extern "C" void apply_window_pending_outputs_float(const int64_t handle,
                                                   const float value,
                                                   const int64_t bitset,
                                                   const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    // NOTE(review): the float is widened and stored through a double*, unlike
    // the columnar variant below which stores a float — presumably row-wise
    // output slots are 8 bytes wide; confirm against the output buffer layout
    // before "fixing" this.
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}
360 
361 extern "C" void apply_window_pending_outputs_float_columnar(const int64_t handle,
362  const float value,
363  const int64_t bitset,
364  const int64_t pos) {
365  if (!pos_is_set(bitset, pos)) {
366  return;
367  }
368  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
369  for (auto pending_output_slot : pending_output_slots) {
370  *reinterpret_cast<float*>(pending_output_slot) = value;
371  }
372  pending_output_slots.clear();
373 }
374 
// Add a pending output slot to be written back at the end of a peer row group.
// `handle` is an opaque pointer to a std::vector<void*> of pending slots.
extern "C" void add_window_pending_output(void* pending_output, const int64_t handle) {
  reinterpret_cast<std::vector<void*>*>(handle)->push_back(pending_output);
}
379 
380 // Returns true iff the aggregate window function requires special multiplicity handling
381 // to ensure that peer rows have the same value for the window function.
383  if (!window_function_is_aggregate(window_func->getKind())) {
384  return false;
385  }
386  if (window_func->getOrderKeys().empty()) {
387  return true;
388  }
389  switch (window_func->getKind()) {
392  return false;
393  }
394  default: {
395  return true;
396  }
397  }
398 }
399 
401  CHECK(!output_);
402  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(
408  }
409  }
410  std::unique_ptr<int64_t[]> scratchpad(new int64_t[elem_count_]);
411  int64_t off = 0;
412  for (size_t i = 0; i < partitionCount(); ++i) {
413  auto partition_size = counts()[i];
414  if (partition_size == 0) {
415  continue;
416  }
417  auto output_for_partition_buff = scratchpad.get() + offsets()[i];
418  std::iota(output_for_partition_buff,
419  output_for_partition_buff + partition_size,
420  int64_t(0));
421  std::vector<Comparator> comparators;
422  const auto& order_keys = window_func_->getOrderKeys();
423  const auto& collation = window_func_->getCollation();
424  CHECK_EQ(order_keys.size(), collation.size());
425  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
426  ++order_column_idx) {
427  auto order_column_buffer = order_columns_[order_column_idx];
428  const auto order_col =
429  dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
430  CHECK(order_col);
431  const auto& order_col_collation = collation[order_column_idx];
432  const auto asc_comparator = makeComparator(order_col,
433  order_column_buffer,
434  payload() + offsets()[i],
435  order_col_collation.nulls_first);
436  auto comparator = asc_comparator;
437  if (order_col_collation.is_desc) {
438  comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
439  return asc_comparator(rhs, lhs);
440  };
441  }
442  comparators.push_back(comparator);
443  }
444  const auto col_tuple_comparator = [&comparators](const int64_t lhs,
445  const int64_t rhs) {
446  for (const auto& comparator : comparators) {
447  if (comparator(lhs, rhs)) {
448  return true;
449  }
450  }
451  return false;
452  };
453  std::sort(output_for_partition_buff,
454  output_for_partition_buff + partition_size,
455  col_tuple_comparator);
456  computePartition(output_for_partition_buff,
457  partition_size,
458  off,
459  window_func_,
460  col_tuple_comparator);
463  off += partition_size;
464  }
465  }
468  CHECK_EQ(static_cast<size_t>(off), elem_count_);
469  }
470  auto output_i64 = reinterpret_cast<int64_t*>(output_);
472  std::copy(scratchpad.get(), scratchpad.get() + elem_count_, output_i64);
473  } else {
474  for (size_t i = 0; i < elem_count_; ++i) {
475  output_i64[payload()[i]] = scratchpad[i];
476  }
477  }
478 }
479 
481  return window_func_;
482 }
483 
// Returns the computed output buffer for this window function.
const int8_t* WindowFunctionContext::output() const {
  return output_;
}
487 
490  return &aggregate_state_.val;
491 }
492 
495  return &aggregate_state_.count;
496 }
497 
500  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
501 }
502 
504  return partition_start_;
505 }
506 
// Returns the bitset with one bit per row, set at the last row of each
// partition.
const int8_t* WindowFunctionContext::partitionEnd() const {
  return partition_end_;
}
510 
512  return elem_count_;
513 }
514 
// Stores the LLVM IR value holding the row number into the aggregate state.
void WindowFunctionContext::setRowNumber(llvm::Value* row_number) {
  aggregate_state_.row_number = row_number;
}
518 
521 }
522 
523 namespace {
524 
525 template <class T>
526 bool integer_comparator(const int8_t* order_column_buffer,
527  const SQLTypeInfo& ti,
528  const int32_t* partition_indices,
529  const int64_t lhs,
530  const int64_t rhs,
531  const bool nulls_first) {
532  const auto values = reinterpret_cast<const T*>(order_column_buffer);
533  const auto lhs_val = values[partition_indices[lhs]];
534  const auto rhs_val = values[partition_indices[rhs]];
535  const auto null_val = inline_fixed_encoding_null_val(ti);
536  if (lhs_val == null_val && rhs_val == null_val) {
537  return false;
538  }
539  if (lhs_val == null_val && rhs_val != null_val) {
540  return nulls_first;
541  }
542  if (rhs_val == null_val && lhs_val != null_val) {
543  return !nulls_first;
544  }
545  return lhs_val < rhs_val;
546 }
547 
548 template <class T, class NullPatternType>
549 bool fp_comparator(const int8_t* order_column_buffer,
550  const SQLTypeInfo& ti,
551  const int32_t* partition_indices,
552  const int64_t lhs,
553  const int64_t rhs,
554  const bool nulls_first) {
555  const auto values = reinterpret_cast<const T*>(order_column_buffer);
556  const auto lhs_val = values[partition_indices[lhs]];
557  const auto rhs_val = values[partition_indices[rhs]];
558  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
559  const auto lhs_bit_pattern =
560  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
561  const auto rhs_bit_pattern =
562  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
563  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
564  return false;
565  }
566  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
567  return nulls_first;
568  }
569  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
570  return !nulls_first;
571  }
572  return lhs_val < rhs_val;
573 }
574 
575 } // namespace
576 
577 std::function<bool(const int64_t lhs, const int64_t rhs)>
579  const int8_t* order_column_buffer,
580  const int32_t* partition_indices,
581  const bool nulls_first) {
582  const auto& ti = col_var->get_type_info();
583  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
584  switch (ti.get_size()) {
585  case 8: {
586  return [order_column_buffer, nulls_first, partition_indices, &ti](
587  const int64_t lhs, const int64_t rhs) {
588  return integer_comparator<int64_t>(
589  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
590  };
591  }
592  case 4: {
593  return [order_column_buffer, nulls_first, partition_indices, &ti](
594  const int64_t lhs, const int64_t rhs) {
595  return integer_comparator<int32_t>(
596  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
597  };
598  }
599  case 2: {
600  return [order_column_buffer, nulls_first, partition_indices, &ti](
601  const int64_t lhs, const int64_t rhs) {
602  return integer_comparator<int16_t>(
603  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
604  };
605  }
606  case 1: {
607  return [order_column_buffer, nulls_first, partition_indices, &ti](
608  const int64_t lhs, const int64_t rhs) {
609  return integer_comparator<int8_t>(
610  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
611  };
612  }
613  default: {
614  LOG(FATAL) << "Invalid type size: " << ti.get_size();
615  }
616  }
617  }
618  if (ti.is_fp()) {
619  switch (ti.get_type()) {
620  case kFLOAT: {
621  return [order_column_buffer, nulls_first, partition_indices, &ti](
622  const int64_t lhs, const int64_t rhs) {
623  return fp_comparator<float, int32_t>(
624  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
625  };
626  }
627  case kDOUBLE: {
628  return [order_column_buffer, nulls_first, partition_indices, &ti](
629  const int64_t lhs, const int64_t rhs) {
630  return fp_comparator<double, int64_t>(
631  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
632  };
633  }
634  default: {
635  LOG(FATAL) << "Invalid float type";
636  }
637  }
638  }
639  throw std::runtime_error("Type not supported yet");
640 }
641 
643  int64_t* output_for_partition_buff,
644  const size_t partition_size,
645  const size_t off,
646  const Analyzer::WindowFunction* window_func,
647  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
648  switch (window_func->getKind()) {
650  const auto row_numbers =
651  index_to_row_number(output_for_partition_buff, partition_size);
652  std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
653  break;
654  }
656  const auto rank =
657  index_to_rank(output_for_partition_buff, partition_size, comparator);
658  std::copy(rank.begin(), rank.end(), output_for_partition_buff);
659  break;
660  }
662  const auto dense_rank =
663  index_to_dense_rank(output_for_partition_buff, partition_size, comparator);
664  std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
665  break;
666  }
668  const auto percent_rank =
669  index_to_percent_rank(output_for_partition_buff, partition_size, comparator);
670  std::copy(percent_rank.begin(),
671  percent_rank.end(),
672  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
673  break;
674  }
676  const auto cume_dist =
677  index_to_cume_dist(output_for_partition_buff, partition_size, comparator);
678  std::copy(cume_dist.begin(),
679  cume_dist.end(),
680  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
681  break;
682  }
684  const auto& args = window_func->getArgs();
685  CHECK_EQ(args.size(), size_t(1));
686  const auto n = get_int_constant_from_expr(args.front().get());
687  const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
688  std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
689  break;
690  }
693  const auto lag_or_lead = get_lag_or_lead_argument(window_func);
694  const auto partition_row_offsets = payload() + off;
696  lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
697  break;
698  }
700  const auto partition_row_offsets = payload() + off;
702  partition_row_offsets, output_for_partition_buff, partition_size);
703  break;
704  }
706  const auto partition_row_offsets = payload() + off;
708  partition_row_offsets, output_for_partition_buff, partition_size);
709  break;
710  }
716  const auto partition_row_offsets = payload() + off;
717  if (window_function_requires_peer_handling(window_func)) {
719  partitionEnd(), off, output_for_partition_buff, partition_size, comparator);
720  }
722  output_for_partition_buff, partition_row_offsets, partition_size);
723  break;
724  }
725  default: {
726  throw std::runtime_error("Window function not supported yet: " +
727  sql_window_function_to_str(window_func->getKind()));
728  }
729  }
730 }
731 
734  0,
735  static_cast<int64_t>(elem_count_),
736  false,
738  1};
739  partition_start_ = static_cast<int8_t*>(
740  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
741  ssize_t partition_count = partitionCount();
742  std::vector<size_t> partition_offsets(partition_count);
743  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
744  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
745  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
746  for (ssize_t i = 0; i < partition_count - 1; ++i) {
747  agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
748  }
749 }
750 
753  0,
754  static_cast<int64_t>(elem_count_),
755  false,
757  1};
758  partition_end_ = static_cast<int8_t*>(
759  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
760  ssize_t partition_count = partitionCount();
761  std::vector<size_t> partition_offsets(partition_count);
762  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
763  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
764  for (ssize_t i = 0; i < partition_count - 1; ++i) {
765  if (partition_offsets[i] == 0) {
766  continue;
767  }
768  agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
769  }
770  if (elem_count_) {
771  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
772  }
773 }
774 
// Returns the payload section of the partition hash table buffer: the
// original row indices, grouped by partition (used as `original_indices` by
// the partition kernels above).
const int32_t* WindowFunctionContext::payload() const {
  return reinterpret_cast<const int32_t*>(
      partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->payloadBufferOff());
}
779 
// Returns the per-partition start offsets section of the partition hash table
// buffer (offset of each partition's rows within payload()).
const int32_t* WindowFunctionContext::offsets() const {
  return reinterpret_cast<const int32_t*>(
      partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
}
784 
// Returns the per-partition row counts section of the partition hash table
// buffer.
const int32_t* WindowFunctionContext::counts() const {
  return reinterpret_cast<const int32_t*>(
      partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
}
789 
791  const auto partition_count = counts() - offsets();
792  CHECK_GE(partition_count, 0);
793  return partition_count;
794 }
795 
797  std::unique_ptr<WindowFunctionContext> window_function_context,
798  const size_t target_index) {
799  const auto it_ok = window_contexts_.emplace(
800  std::make_pair(target_index, std::move(window_function_context)));
801  CHECK(it_ok.second);
802 }
803 
805  const size_t target_index) const {
806  const auto it = window_contexts_.find(target_index);
807  CHECK(it != window_contexts_.end());
808  s_active_window_function_ = it->second.get();
809  return s_active_window_function_;
810 }
811 
813  s_active_window_function_ = nullptr;
814 }
815 
817  return s_active_window_function_;
818 }
819 
821  s_instance_ = std::make_unique<WindowProjectNodeContext>();
822  return s_instance_.get();
823 }
824 
826  return s_instance_.get();
827 }
828 
830  s_instance_ = nullptr;
831  s_active_window_function_ = nullptr;
832 }
833 
834 std::unique_ptr<WindowProjectNodeContext> WindowProjectNodeContext::s_instance_;
#define CHECK_EQ(x, y)
Definition: Logger.h:205
size_t elementCount() const
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
void apply_window_pending_outputs_int8(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
static WindowFunctionContext * s_active_window_function_
Descriptor for the storage layout use for (approximate) count distinct operations.
const int32_t * payload() const
static std::unique_ptr< WindowProjectNodeContext > s_instance_
bool advance_current_rank(const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator, const int64_t *index, const size_t i)
void apply_lag_to_partition(const ssize_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
ExecutorDeviceType
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
void apply_window_pending_outputs_double(const int64_t handle, const double value, const int64_t bitset, const int64_t pos)
Utility functions for easy access to the result set buffers.
ssize_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
#define LOG(tag)
Definition: Logger.h:188
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1404
const int32_t * counts() const
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
bool fp_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
#define CHECK_GE(x, y)
Definition: Logger.h:210
const Analyzer::WindowFunction * window_func_
void apply_window_pending_outputs_float_columnar(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
size_t partitionCount() const
void add_window_pending_output(void *pending_output, const int64_t handle)
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
void computePartition(int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_window_pending_outputs_int64(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
AggregateState aggregate_state_
void apply_window_pending_outputs_int(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
bool window_function_is_value(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:27
void apply_last_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const Analyzer::WindowFunction * getWindowFunction() const
bool integer_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
static const WindowProjectNodeContext * get()
void setRowNumber(llvm::Value *row_number)
const int64_t * aggregateState() const
void apply_first_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:1408
std::string sql_window_function_to_str(const SqlWindowFunctionKind kind)
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< JoinHashTableInterface > &partitions, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:52
static WindowProjectNodeContext * create()
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
std::shared_ptr< JoinHashTableInterface > partitions_
llvm::Value * getRowNumber() const
void apply_window_pending_outputs_float(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
int64_t aggregateStatePendingOutputs() const
static void resetWindowFunctionContext()
const int8_t * partitionStart() const
const int32_t * offsets() const
bool pos_is_set(const int64_t bitset, const int64_t pos)
const int64_t * aggregateStateCount() const
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
SqlWindowFunctionKind
Definition: sqldefs.h:82
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:1398
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
const int8_t * output() const
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
const WindowFunctionContext * activateWindowFunctionContext(const size_t target_index) const
#define CHECK(condition)
Definition: Logger.h:197
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< const int8_t * > order_columns_
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:258
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void apply_window_pending_outputs_int32(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
Definition: sqltypes.h:46
void apply_window_pending_outputs_int16(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
const int8_t * partitionEnd() const
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
void addOrderColumn(const int8_t *column, const Analyzer::ColumnVar *col_var, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1396
ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
const ExecutorDeviceType device_type_
static WindowFunctionContext * getActiveWindowFunctionContext()