OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
WindowContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <numeric>
20 
22 #include "QueryEngine/Execute.h"
27 #include "Shared/checked_alloc.h"
28 
30  const Analyzer::WindowFunction* window_func,
31  const std::shared_ptr<JoinHashTableInterface>& partitions,
32  const size_t elem_count,
33  const ExecutorDeviceType device_type,
34  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
35  : window_func_(window_func)
36  , partitions_(partitions)
37  , elem_count_(elem_count)
38  , output_(nullptr)
39  , partition_start_(nullptr)
40  , partition_end_(nullptr)
41  , device_type_(device_type)
42  , row_set_mem_owner_(row_set_mem_owner) {}
43 
45  free(partition_start_);
46  free(partition_end_);
47 }
48 
50  const int8_t* column,
51  const Analyzer::ColumnVar* col_var,
52  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
53  order_columns_owner_.push_back(chunks_owner);
54  order_columns_.push_back(column);
55 }
56 
57 namespace {
58 
// Maps each row position to its 1-based ROW_NUMBER by inverting the sorted
// index: index[pos] is the row stored at sorted position pos, so that row's
// number is pos + 1.
std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
  std::vector<int64_t> row_numbers(index_size);
  for (size_t pos = 0; pos < index_size; ++pos) {
    row_numbers[index[pos]] = pos + 1;
  }
  return row_numbers;
}
67 
68 // Returns true iff the current element is greater than the previous, according to the
69 // comparator. This is needed because peer rows have to have the same rank.
71  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
72  const int64_t* index,
73  const size_t i) {
74  if (i == 0) {
75  return false;
76  }
77  return comparator(index[i - 1], index[i]);
78 }
79 
// Maps each row position to its RANK. Peer rows (where the comparator does not
// report the previous sorted element as strictly less) share the rank of the
// first row in their peer group; ranks therefore have gaps after ties.
std::vector<int64_t> index_to_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> rank(index_size);
  size_t crt_rank = 1;
  for (size_t pos = 0; pos < index_size; ++pos) {
    // A new peer group starts when the previous element compares strictly less.
    if (pos > 0 && comparator(index[pos - 1], index[pos])) {
      crt_rank = pos + 1;
    }
    rank[index[pos]] = crt_rank;
  }
  return rank;
}
95 
// Maps each row position to its DENSE_RANK: like RANK, but the rank counter
// advances by exactly one per peer group, so there are no gaps after ties.
std::vector<int64_t> index_to_dense_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> dense_rank(index_size);
  size_t crt_rank = 1;
  for (size_t pos = 0; pos < index_size; ++pos) {
    // A new peer group starts when the previous element compares strictly less.
    if (pos > 0 && comparator(index[pos - 1], index[pos])) {
      ++crt_rank;
    }
    dense_rank[index[pos]] = crt_rank;
  }
  return dense_rank;
}
111 
// Maps each row position to PERCENT_RANK: (rank - 1) / (rows - 1), with 0 for a
// single-row partition. Peer rows share a rank, exactly as in index_to_rank.
std::vector<double> index_to_percent_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> percent_rank(index_size);
  size_t crt_rank = 1;
  for (size_t pos = 0; pos < index_size; ++pos) {
    // A new peer group starts when the previous element compares strictly less.
    if (pos > 0 && comparator(index[pos - 1], index[pos])) {
      crt_rank = pos + 1;
    }
    percent_rank[index[pos]] =
        index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
  }
  return percent_rank;
}
128 
// Maps each row position to CUME_DIST: the fraction of rows ranked at or below
// the row's peer group, i.e. (last position of the group + 1) / total rows.
std::vector<double> index_to_cume_dist(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> cume_dist(index_size);
  size_t group_begin = 0;
  while (group_begin < index_size) {
    // Extend the peer group while the comparator keeps reporting ties.
    size_t group_end = group_begin + 1;
    while (group_end < index_size &&
           !comparator(index[group_end - 1], index[group_end])) {
      ++group_end;
    }
    const double dist = static_cast<double>(group_end) / index_size;
    for (size_t pos = group_begin; pos < group_end; ++pos) {
      cume_dist[index[pos]] = dist;
    }
    group_begin = group_end;
  }
  return cume_dist;
}
149 
// Maps each row position to its NTILE(n) bucket (1-based). Buckets hold
// ceil(index_size / n) consecutive sorted rows each, so the last bucket(s) may
// be smaller. Throws when n is zero.
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  // Validate before allocating the output buffer (the original allocated first).
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  std::vector<int64_t> row_numbers(index_size);
  const size_t tile_size = (index_size + n - 1) / n;  // ceiling division
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i / tile_size + 1;
  }
  return row_numbers;
}
164 
165 // The element size in the result buffer for the given window function kind. Currently
166 // it's always 8.
168  return 8;
169 }
170 
171 // Extracts the integer constant from a constant expression.
// Throws when the expression is not a literal constant; LOG(FATAL) aborts on a
// non-integer constant type, so the trailing `return 0` is unreachable.
// NOTE(review): the signature line was lost in extraction; per the symbol index
// it is `size_t get_int_constant_from_expr(const Analyzer::Expr* expr)`.
 173  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
 174  if (!lag_constant) {
 175  throw std::runtime_error("LAG with non-constant lag argument not supported yet");
 176  }
 177  const auto& lag_ti = lag_constant->get_type_info();
 178  switch (lag_ti.get_type()) {
 179  case kSMALLINT: {
 180  return lag_constant->get_constval().smallintval;
 181  }
 182  case kINT: {
 183  return lag_constant->get_constval().intval;
 184  }
 185  case kBIGINT: {
 186  return lag_constant->get_constval().bigintval;
 187  }
 188  default: {
 189  LOG(FATAL) << "Invalid type for the lag argument";
 190  }
 191  }
 // Unreachable: LOG(FATAL) does not return; keeps the compiler happy.
 192  return 0;
 193 }
194 
195 // Gets the lag or lead argument canonicalized as lag (lag = -lead).
// Args layout: [value expr, optional offset, optional default]; the 3-argument
// (default value) form is rejected, a missing offset defaults to 1.
// NOTE(review): the signature line was lost in extraction; per the symbol index
// it is `int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction*)`.
 197  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
 198  window_func->getKind() == SqlWindowFunctionKind::LEAD);
 199  const auto& args = window_func->getArgs();
 200  if (args.size() == 3) {
 201  throw std::runtime_error("LAG with default not supported yet");
 202  }
 203  if (args.size() == 2) {
 204  const int64_t lag_or_lead =
 205  static_cast<int64_t>(get_int_constant_from_expr(args[1].get()));
 // LEAD(k) is treated as LAG(-k) so downstream code handles one direction only.
 206  return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
 207  : -lag_or_lead;
 208  }
 209  CHECK_EQ(args.size(), size_t(1));
 210  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
 211 }
212 
// Rewrites output_for_partition_buff in place: each slot of the permutation is
// replaced by the original (global) row index it refers to.
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  std::vector<int64_t> remapped;
  remapped.reserve(partition_size);
  for (size_t pos = 0; pos < partition_size; ++pos) {
    remapped.push_back(original_indices[output_for_partition_buff[pos]]);
  }
  std::copy(remapped.begin(), remapped.end(), output_for_partition_buff);
}
226 
// Replaces sorted_indices with, for each original row, the original index of
// the row `lag` positions earlier in the sorted order, or -1 when that lagged
// position falls outside the partition.
void apply_lag_to_partition(const int64_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  const auto size = static_cast<int64_t>(partition_size);
  // Step 1: for every sorted position, the sorted index `lag` steps back.
  std::vector<int64_t> lagged(partition_size, -1);
  for (int64_t pos = 0; pos < size; ++pos) {
    const int64_t source = pos - lag;
    if (source >= 0 && source < size) {
      lagged[pos] = sorted_indices[source];
    }
  }
  // Step 2: scatter the lagged references back into original-row order.
  std::vector<int64_t> result(partition_size);
  for (size_t pos = 0; pos < partition_size; ++pos) {
    const auto lag_index = lagged[pos];
    result[sorted_indices[pos]] =
        lag_index == -1 ? -1 : original_indices[lag_index];
  }
  std::copy(result.begin(), result.end(), sorted_indices);
}
248 
// FIRST_VALUE: every row of the partition points at the original index of the
// first row in sorted order. Reuses output_for_partition_buff as the output;
// callers skip empty partitions, so element [0] is always valid here.
void apply_first_value_to_partition(const int32_t* original_indices,
                                    int64_t* output_for_partition_buff,
                                    const size_t partition_size) {
  const int64_t first_value_idx = original_indices[output_for_partition_buff[0]];
  for (size_t pos = 0; pos < partition_size; ++pos) {
    output_for_partition_buff[pos] = first_value_idx;
  }
}
259 
// LAST_VALUE: copies each row's own original index into the int64 output
// buffer (widening from int32), reusing output_for_partition_buff as output.
void apply_last_value_to_partition(const int32_t* original_indices,
                                   int64_t* output_for_partition_buff,
                                   const size_t partition_size) {
  for (size_t pos = 0; pos < partition_size; ++pos) {
    output_for_partition_buff[pos] = original_indices[pos];
  }
}
268 
// Marks peer-group boundaries in the `partition_end` bitset: a bit is set at
// the global position of the last row of every peer group, and always at the
// partition's final row. `off` is the partition's starting offset.
// NOTE(review): the signature line was lost in extraction; per the symbol index
// it is `void index_to_partition_end(const int8_t*, size_t, const int64_t*,
// size_t, const std::function<bool(int64_t, int64_t)>&)`.
 270  const int8_t* partition_end,
 271  const size_t off,
 272  const int64_t* index,
 273  const size_t index_size,
 274  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
 275  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
 276  for (size_t i = 0; i < index_size; ++i) {
 // When a new peer group starts at i, the previous row (off + i - 1) ended one.
 277  if (advance_current_rank(comparator, index, i)) {
 278  agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0);
 279  }
 280  }
 281  CHECK(index_size);
 // The last row of the partition always terminates a peer group.
 282  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0);
 283 }
284 
// Tests bit `pos` of the bitset whose base address is encoded in `bitset`.
bool pos_is_set(const int64_t bitset, const int64_t pos) {
  const auto bytes = reinterpret_cast<const int8_t*>(bitset);
  return (bytes[pos >> 3] >> (pos & 7)) & 1;
}
288 
// Writes `value` into every pending integer output slot registered for the
// current peer-group, then clears the pending list. `handle` encodes a
// std::vector<void*> of output slots; `bitset` encodes the end-of-group bitmap,
// so the flush only happens when the bit at `pos` is set.
template <class T>
void apply_window_pending_outputs_int(const int64_t handle,
                                      const int64_t value,
                                      const int64_t bitset,
                                      const int64_t pos) {
  const auto bitset_bytes = reinterpret_cast<const int8_t*>(bitset);
  if (!(bitset_bytes[pos >> 3] & (1 << (pos & 7)))) {
    return;  // not at the end of a peer group yet
  }
  auto& pending = *reinterpret_cast<std::vector<void*>*>(handle);
  for (void* slot : pending) {
    *reinterpret_cast<T*>(slot) = value;
  }
  pending.clear();
}
305 
306 } // namespace
307 
// C ABI entry point used by generated code: flushes pending 64-bit integer
// window outputs for the peer group ending at `pos`.
308 extern "C" void apply_window_pending_outputs_int64(const int64_t handle,
309  const int64_t value,
310  const int64_t bitset,
311  const int64_t pos) {
312  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
313 }
314 
// C ABI entry point used by generated code: flushes pending 32-bit integer
// window outputs for the peer group ending at `pos`.
315 extern "C" void apply_window_pending_outputs_int32(const int64_t handle,
316  const int64_t value,
317  const int64_t bitset,
318  const int64_t pos) {
319  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
320 }
321 
// C ABI entry point used by generated code: flushes pending 16-bit integer
// window outputs for the peer group ending at `pos`.
322 extern "C" void apply_window_pending_outputs_int16(const int64_t handle,
323  const int64_t value,
324  const int64_t bitset,
325  const int64_t pos) {
326  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
327 }
328 
// C ABI entry point used by generated code: flushes pending 8-bit integer
// window outputs for the peer group ending at `pos`.
329 extern "C" void apply_window_pending_outputs_int8(const int64_t handle,
330  const int64_t value,
331  const int64_t bitset,
332  const int64_t pos) {
333  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
334 }
335 
// C ABI entry point: writes a double into every pending output slot when the
// peer group ends at `pos` (set bit in the end-of-group bitmap), then clears
// the pending list.
extern "C" void apply_window_pending_outputs_double(const int64_t handle,
                                                    const double value,
                                                    const int64_t bitset,
                                                    const int64_t pos) {
  const auto bitset_bytes = reinterpret_cast<const int8_t*>(bitset);
  if (!(bitset_bytes[pos >> 3] & (1 << (pos & 7)))) {
    return;  // not at the end of a peer group yet
  }
  auto& pending = *reinterpret_cast<std::vector<void*>*>(handle);
  for (void* slot : pending) {
    *reinterpret_cast<double*>(slot) = value;
  }
  pending.clear();
}
349 
// C ABI entry point: flushes a float window output for the peer group ending
// at `pos`, then clears the pending list.
extern "C" void apply_window_pending_outputs_float(const int64_t handle,
                                                   const float value,
                                                   const int64_t bitset,
                                                   const int64_t pos) {
  const auto bitset_bytes = reinterpret_cast<const int8_t*>(bitset);
  if (!(bitset_bytes[pos >> 3] & (1 << (pos & 7)))) {
    return;  // not at the end of a peer group yet
  }
  auto& pending = *reinterpret_cast<std::vector<void*>*>(handle);
  for (void* slot : pending) {
    // NOTE(review): deliberately widens float -> double; row-wise output slots
    // appear to be 8 bytes wide (the _columnar variant writes through float*).
    // Confirm against the output buffer layout before changing.
    *reinterpret_cast<double*>(slot) = value;
  }
  pending.clear();
}
363 
// C ABI entry point for columnar layouts: writes a float (4-byte slots) into
// every pending output slot when the peer group ends at `pos`, then clears the
// pending list.
extern "C" void apply_window_pending_outputs_float_columnar(const int64_t handle,
                                                            const float value,
                                                            const int64_t bitset,
                                                            const int64_t pos) {
  const auto bitset_bytes = reinterpret_cast<const int8_t*>(bitset);
  if (!(bitset_bytes[pos >> 3] & (1 << (pos & 7)))) {
    return;  // not at the end of a peer group yet
  }
  auto& pending = *reinterpret_cast<std::vector<void*>*>(handle);
  for (void* slot : pending) {
    *reinterpret_cast<float*>(slot) = value;
  }
  pending.clear();
}
377 
// Registers an output slot to be written back when the current peer-row group
// ends; `handle` encodes the std::vector<void*> of pending slots.
extern "C" void add_window_pending_output(void* pending_output, const int64_t handle) {
  auto& pending = *reinterpret_cast<std::vector<void*>*>(handle);
  pending.push_back(pending_output);
}
382 
383 // Returns true iff the aggregate window function requires special multiplicity handling
384 // to ensure that peer rows have the same value for the window function.
386  if (!window_function_is_aggregate(window_func->getKind())) {
387  return false;
388  }
389  if (window_func->getOrderKeys().empty()) {
390  return true;
391  }
392  switch (window_func->getKind()) {
395  return false;
396  }
397  default: {
398  return true;
399  }
400  }
401 }
402 
404  CHECK(!output_);
405  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(
411  }
412  }
413  std::unique_ptr<int64_t[]> scratchpad(new int64_t[elem_count_]);
414  int64_t off = 0;
415  for (size_t i = 0; i < partitionCount(); ++i) {
416  auto partition_size = counts()[i];
417  if (partition_size == 0) {
418  continue;
419  }
420  auto output_for_partition_buff = scratchpad.get() + offsets()[i];
421  std::iota(output_for_partition_buff,
422  output_for_partition_buff + partition_size,
423  int64_t(0));
424  std::vector<Comparator> comparators;
425  const auto& order_keys = window_func_->getOrderKeys();
426  const auto& collation = window_func_->getCollation();
427  CHECK_EQ(order_keys.size(), collation.size());
428  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
429  ++order_column_idx) {
430  auto order_column_buffer = order_columns_[order_column_idx];
431  const auto order_col =
432  dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
433  CHECK(order_col);
434  const auto& order_col_collation = collation[order_column_idx];
435  const auto asc_comparator = makeComparator(order_col,
436  order_column_buffer,
437  payload() + offsets()[i],
438  order_col_collation.nulls_first);
439  auto comparator = asc_comparator;
440  if (order_col_collation.is_desc) {
441  comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
442  return asc_comparator(rhs, lhs);
443  };
444  }
445  comparators.push_back(comparator);
446  }
447  const auto col_tuple_comparator = [&comparators](const int64_t lhs,
448  const int64_t rhs) {
449  for (const auto& comparator : comparators) {
450  if (comparator(lhs, rhs)) {
451  return true;
452  }
453  }
454  return false;
455  };
456  std::sort(output_for_partition_buff,
457  output_for_partition_buff + partition_size,
458  col_tuple_comparator);
459  computePartition(output_for_partition_buff,
460  partition_size,
461  off,
462  window_func_,
463  col_tuple_comparator);
466  off += partition_size;
467  }
468  }
471  CHECK_EQ(static_cast<size_t>(off), elem_count_);
472  }
473  auto output_i64 = reinterpret_cast<int64_t*>(output_);
475  std::copy(scratchpad.get(), scratchpad.get() + elem_count_, output_i64);
476  } else {
477  for (size_t i = 0; i < elem_count_; ++i) {
478  output_i64[payload()[i]] = scratchpad[i];
479  }
480  }
481 }
482 
484  return window_func_;
485 }
486 
// Read-only pointer to the window function output buffer (filled by compute()).
487 const int8_t* WindowFunctionContext::output() const {
488  return output_;
489 }
490 
493  return &aggregate_state_.val;
494 }
495 
498  return &aggregate_state_.count;
499 }
500 
503  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
504 }
505 
507  return partition_start_;
508 }
509 
// Bitset marking the last row of each partition; owned by this context and
// released in the destructor.
510 const int8_t* WindowFunctionContext::partitionEnd() const {
511  return partition_end_;
512 }
513 
515  return elem_count_;
516 }
517 
// Stores the codegen-time LLVM value for the current row number so later
// codegen stages can read it back.
518 void WindowFunctionContext::setRowNumber(llvm::Value* row_number) {
519  aggregate_state_.row_number = row_number;
520 }
521 
524 }
525 
526 namespace {
527 
528 template <class T>
529 bool integer_comparator(const int8_t* order_column_buffer,
530  const SQLTypeInfo& ti,
531  const int32_t* partition_indices,
532  const int64_t lhs,
533  const int64_t rhs,
534  const bool nulls_first) {
535  const auto values = reinterpret_cast<const T*>(order_column_buffer);
536  const auto lhs_val = values[partition_indices[lhs]];
537  const auto rhs_val = values[partition_indices[rhs]];
538  const auto null_val = inline_fixed_encoding_null_val(ti);
539  if (lhs_val == null_val && rhs_val == null_val) {
540  return false;
541  }
542  if (lhs_val == null_val && rhs_val != null_val) {
543  return nulls_first;
544  }
545  if (rhs_val == null_val && lhs_val != null_val) {
546  return !nulls_first;
547  }
548  return lhs_val < rhs_val;
549 }
550 
551 template <class T, class NullPatternType>
552 bool fp_comparator(const int8_t* order_column_buffer,
553  const SQLTypeInfo& ti,
554  const int32_t* partition_indices,
555  const int64_t lhs,
556  const int64_t rhs,
557  const bool nulls_first) {
558  const auto values = reinterpret_cast<const T*>(order_column_buffer);
559  const auto lhs_val = values[partition_indices[lhs]];
560  const auto rhs_val = values[partition_indices[rhs]];
561  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
562  const auto lhs_bit_pattern =
563  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
564  const auto rhs_bit_pattern =
565  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
566  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
567  return false;
568  }
569  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
570  return nulls_first;
571  }
572  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
573  return !nulls_first;
574  }
575  return lhs_val < rhs_val;
576 }
577 
578 } // namespace
579 
580 std::function<bool(const int64_t lhs, const int64_t rhs)>
582  const int8_t* order_column_buffer,
583  const int32_t* partition_indices,
584  const bool nulls_first) {
585  const auto& ti = col_var->get_type_info();
586  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
587  switch (ti.get_size()) {
588  case 8: {
589  return [order_column_buffer, nulls_first, partition_indices, &ti](
590  const int64_t lhs, const int64_t rhs) {
591  return integer_comparator<int64_t>(
592  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
593  };
594  }
595  case 4: {
596  return [order_column_buffer, nulls_first, partition_indices, &ti](
597  const int64_t lhs, const int64_t rhs) {
598  return integer_comparator<int32_t>(
599  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
600  };
601  }
602  case 2: {
603  return [order_column_buffer, nulls_first, partition_indices, &ti](
604  const int64_t lhs, const int64_t rhs) {
605  return integer_comparator<int16_t>(
606  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
607  };
608  }
609  case 1: {
610  return [order_column_buffer, nulls_first, partition_indices, &ti](
611  const int64_t lhs, const int64_t rhs) {
612  return integer_comparator<int8_t>(
613  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
614  };
615  }
616  default: {
617  LOG(FATAL) << "Invalid type size: " << ti.get_size();
618  }
619  }
620  }
621  if (ti.is_fp()) {
622  switch (ti.get_type()) {
623  case kFLOAT: {
624  return [order_column_buffer, nulls_first, partition_indices, &ti](
625  const int64_t lhs, const int64_t rhs) {
626  return fp_comparator<float, int32_t>(
627  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
628  };
629  }
630  case kDOUBLE: {
631  return [order_column_buffer, nulls_first, partition_indices, &ti](
632  const int64_t lhs, const int64_t rhs) {
633  return fp_comparator<double, int64_t>(
634  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
635  };
636  }
637  default: {
638  LOG(FATAL) << "Invalid float type";
639  }
640  }
641  }
642  throw std::runtime_error("Type not supported yet");
643 }
644 
646  int64_t* output_for_partition_buff,
647  const size_t partition_size,
648  const size_t off,
649  const Analyzer::WindowFunction* window_func,
650  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
651  switch (window_func->getKind()) {
653  const auto row_numbers =
654  index_to_row_number(output_for_partition_buff, partition_size);
655  std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
656  break;
657  }
659  const auto rank =
660  index_to_rank(output_for_partition_buff, partition_size, comparator);
661  std::copy(rank.begin(), rank.end(), output_for_partition_buff);
662  break;
663  }
665  const auto dense_rank =
666  index_to_dense_rank(output_for_partition_buff, partition_size, comparator);
667  std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
668  break;
669  }
671  const auto percent_rank =
672  index_to_percent_rank(output_for_partition_buff, partition_size, comparator);
673  std::copy(percent_rank.begin(),
674  percent_rank.end(),
675  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
676  break;
677  }
679  const auto cume_dist =
680  index_to_cume_dist(output_for_partition_buff, partition_size, comparator);
681  std::copy(cume_dist.begin(),
682  cume_dist.end(),
683  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
684  break;
685  }
687  const auto& args = window_func->getArgs();
688  CHECK_EQ(args.size(), size_t(1));
689  const auto n = get_int_constant_from_expr(args.front().get());
690  const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
691  std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
692  break;
693  }
696  const auto lag_or_lead = get_lag_or_lead_argument(window_func);
697  const auto partition_row_offsets = payload() + off;
699  lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
700  break;
701  }
703  const auto partition_row_offsets = payload() + off;
705  partition_row_offsets, output_for_partition_buff, partition_size);
706  break;
707  }
709  const auto partition_row_offsets = payload() + off;
711  partition_row_offsets, output_for_partition_buff, partition_size);
712  break;
713  }
719  const auto partition_row_offsets = payload() + off;
720  if (window_function_requires_peer_handling(window_func)) {
722  partitionEnd(), off, output_for_partition_buff, partition_size, comparator);
723  }
725  output_for_partition_buff, partition_row_offsets, partition_size);
726  break;
727  }
728  default: {
729  throw std::runtime_error("Window function not supported yet: " +
730  ::toString(window_func->getKind()));
731  }
732  }
733 }
734 
737  0,
738  static_cast<int64_t>(elem_count_),
739  false,
741  1};
742  partition_start_ = static_cast<int8_t*>(
743  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
744  int64_t partition_count = partitionCount();
745  std::vector<size_t> partition_offsets(partition_count);
746  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
747  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
748  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
749  for (int64_t i = 0; i < partition_count - 1; ++i) {
750  agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
751  }
752 }
753 
756  0,
757  static_cast<int64_t>(elem_count_),
758  false,
760  1};
761  partition_end_ = static_cast<int8_t*>(
762  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
763  int64_t partition_count = partitionCount();
764  std::vector<size_t> partition_offsets(partition_count);
765  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
766  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
767  for (int64_t i = 0; i < partition_count - 1; ++i) {
768  if (partition_offsets[i] == 0) {
769  continue;
770  }
771  agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
772  }
773  if (elem_count_) {
774  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
775  }
776 }
777 
778 const int32_t* WindowFunctionContext::payload() const {
779  return reinterpret_cast<const int32_t*>(
780  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->payloadBufferOff());
781 }
782 
783 const int32_t* WindowFunctionContext::offsets() const {
784  return reinterpret_cast<const int32_t*>(
785  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
786 }
787 
788 const int32_t* WindowFunctionContext::counts() const {
789  return reinterpret_cast<const int32_t*>(
790  partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
791 }
792 
794  const auto partition_count = counts() - offsets();
795  CHECK_GE(partition_count, 0);
796  return partition_count;
797 }
798 
800  std::unique_ptr<WindowFunctionContext> window_function_context,
801  const size_t target_index) {
802  const auto it_ok = window_contexts_.emplace(
803  std::make_pair(target_index, std::move(window_function_context)));
804  CHECK(it_ok.second);
805 }
806 
808  Executor* executor,
809  const size_t target_index) const {
810  const auto it = window_contexts_.find(target_index);
811  CHECK(it != window_contexts_.end());
812  executor->active_window_function_ = it->second.get();
813  return executor->active_window_function_;
814 }
815 
817  executor->active_window_function_ = nullptr;
818 }
819 
821  Executor* executor) {
822  return executor->active_window_function_;
823 }
824 
826  executor->window_project_node_context_owned_ =
827  std::make_unique<WindowProjectNodeContext>();
828  return executor->window_project_node_context_owned_.get();
829 }
830 
832  return executor->window_project_node_context_owned_.get();
833 }
834 
// Tears down per-query window state. Order matters: releasing the owned
// project-node context destroys the window function contexts first, then the
// (non-owning) active-window-function pointer into them is cleared.
835 void WindowProjectNodeContext::reset(Executor* executor) {
836  executor->window_project_node_context_owned_ = nullptr;
837  executor->active_window_function_ = nullptr;
838 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1447
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
void apply_window_pending_outputs_int8(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
Descriptor for the storage layout use for (approximate) count distinct operations.
std::string toString(const ExtArgumentType &sig_type)
bool advance_current_rank(const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator, const int64_t *index, const size_t i)
ExecutorDeviceType
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
void apply_window_pending_outputs_double(const int64_t handle, const double value, const int64_t bitset, const int64_t pos)
Utility functions for easy access to the result set buffers.
#define LOG(tag)
Definition: Logger.h:188
const int8_t * partitionStart() const
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
bool fp_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
#define CHECK_GE(x, y)
Definition: Logger.h:210
static WindowProjectNodeContext * create(Executor *executor)
size_t elementCount() const
const int8_t * output() const
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
void apply_window_pending_outputs_float_columnar(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
const int32_t * offsets() const
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:330
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1455
void add_window_pending_output(void *pending_output, const int64_t handle)
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:1459
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
void computePartition(int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_window_pending_outputs_int64(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
size_t partitionCount() const
AggregateState aggregate_state_
void apply_window_pending_outputs_int(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
static const WindowProjectNodeContext * get(Executor *executor)
bool window_function_is_value(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:27
void apply_last_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const int64_t * aggregateStateCount() const
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:1449
bool integer_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
const int8_t * partitionEnd() const
static void reset(Executor *executor)
void setRowNumber(llvm::Value *row_number)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
void apply_first_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< JoinHashTableInterface > &partitions, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:52
int64_t aggregateStatePendingOutputs() const
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
const int64_t * aggregateState() const
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
std::shared_ptr< JoinHashTableInterface > partitions_
void apply_window_pending_outputs_float(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
llvm::Value * getRowNumber() const
bool pos_is_set(const int64_t bitset, const int64_t pos)
void apply_lag_to_partition(const int64_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
SqlWindowFunctionKind
Definition: sqldefs.h:82
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
#define CHECK(condition)
Definition: Logger.h:197
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< const int8_t * > order_columns_
static void resetWindowFunctionContext(Executor *executor)
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
void apply_window_pending_outputs_int32(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
const Analyzer::WindowFunction * getWindowFunction() const
Definition: sqltypes.h:47
const int32_t * payload() const
void apply_window_pending_outputs_int16(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
void addOrderColumn(const int8_t *column, const Analyzer::ColumnVar *col_var, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
const ExecutorDeviceType device_type_