OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
WindowContext.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <numeric>
20 
22 #include "QueryEngine/Execute.h"
27 #include "Shared/checked_alloc.h"
28 #include "Shared/funcannotations.h"
29 
31  const Analyzer::WindowFunction* window_func,
32  const std::shared_ptr<HashJoin>& partitions,
33  const size_t elem_count,
34  const ExecutorDeviceType device_type,
35  std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
36  : window_func_(window_func)
37  , partitions_(partitions)
38  , elem_count_(elem_count)
39  , output_(nullptr)
40  , partition_start_(nullptr)
41  , partition_end_(nullptr)
42  , device_type_(device_type)
43  , row_set_mem_owner_(row_set_mem_owner) {}
44 
46  free(partition_start_);
47  free(partition_end_);
48 }
49 
51  const int8_t* column,
52  const Analyzer::ColumnVar* col_var,
53  const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
54  order_columns_owner_.push_back(chunks_owner);
55  order_columns_.push_back(column);
56 }
57 
58 namespace {
59 
60 // Converts the sorted indices to a mapping from row position to row number.
std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
  std::vector<int64_t> row_numbers(index_size);
  // Invert the sorted permutation: the row stored at index[pos] occupies the
  // 1-based sorted position pos + 1.
  size_t sorted_pos = 0;
  while (sorted_pos < index_size) {
    row_numbers[index[sorted_pos]] = sorted_pos + 1;
    ++sorted_pos;
  }
  return row_numbers;
}
68 
// Returns true iff the current element is greater than the previous, according to the
// comparator. This is needed because peer rows have to have the same rank.
// NOTE(review): the signature line was lost in extraction; it is reconstructed
// verbatim from the declaration listed in this file's index.
bool advance_current_rank(
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
    const int64_t* index,
    const size_t i) {
  // The first element never advances the rank: there is no previous element.
  if (i == 0) {
    return false;
  }
  // Strictly-less on the previous element means the ordering advanced here.
  return comparator(index[i - 1], index[i]);
}
80 
81 // Computes the mapping from row position to rank.
82 std::vector<int64_t> index_to_rank(
83  const int64_t* index,
84  const size_t index_size,
85  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
86  std::vector<int64_t> rank(index_size);
87  size_t crt_rank = 1;
88  for (size_t i = 0; i < index_size; ++i) {
89  if (advance_current_rank(comparator, index, i)) {
90  crt_rank = i + 1;
91  }
92  rank[index[i]] = crt_rank;
93  }
94  return rank;
95 }
96 
97 // Computes the mapping from row position to dense rank.
98 std::vector<int64_t> index_to_dense_rank(
99  const int64_t* index,
100  const size_t index_size,
101  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
102  std::vector<int64_t> dense_rank(index_size);
103  size_t crt_rank = 1;
104  for (size_t i = 0; i < index_size; ++i) {
105  if (advance_current_rank(comparator, index, i)) {
106  ++crt_rank;
107  }
108  dense_rank[index[i]] = crt_rank;
109  }
110  return dense_rank;
111 }
112 
113 // Computes the mapping from row position to percent rank.
114 std::vector<double> index_to_percent_rank(
115  const int64_t* index,
116  const size_t index_size,
117  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
118  std::vector<double> percent_rank(index_size);
119  size_t crt_rank = 1;
120  for (size_t i = 0; i < index_size; ++i) {
121  if (advance_current_rank(comparator, index, i)) {
122  crt_rank = i + 1;
123  }
124  percent_rank[index[i]] =
125  index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
126  }
127  return percent_rank;
128 }
129 
130 // Computes the mapping from row position to cumulative distribution.
131 std::vector<double> index_to_cume_dist(
132  const int64_t* index,
133  const size_t index_size,
134  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
135  std::vector<double> cume_dist(index_size);
136  size_t start_peer_group = 0;
137  while (start_peer_group < index_size) {
138  size_t end_peer_group = start_peer_group + 1;
139  while (end_peer_group < index_size &&
140  !advance_current_rank(comparator, index, end_peer_group)) {
141  ++end_peer_group;
142  }
143  for (size_t i = start_peer_group; i < end_peer_group; ++i) {
144  cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
145  }
146  start_peer_group = end_peer_group;
147  }
148  return cume_dist;
149 }
150 
151 // Computes the mapping from row position to the n-tile statistic.
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  // Validate before allocating the output buffer (the original allocated
  // first, doing wasted work on the error path).
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  std::vector<int64_t> row_numbers(index_size);
  // Ceiling division: each tile holds tile_size rows; the last may be short.
  const size_t tile_size = (index_size + n - 1) / n;
  for (size_t i = 0; i < index_size; ++i) {
    // Tiles are numbered starting from 1.
    row_numbers[index[i]] = i / tile_size + 1;
  }
  return row_numbers;
}
165 
166 // The element size in the result buffer for the given window function kind. Currently
167 // it's always 8.
169  return 8;
170 }
171 
172 // Extracts the integer constant from a constant expression.
174  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
175  if (!lag_constant) {
176  throw std::runtime_error("LAG with non-constant lag argument not supported yet");
177  }
178  const auto& lag_ti = lag_constant->get_type_info();
179  switch (lag_ti.get_type()) {
180  case kSMALLINT: {
181  return lag_constant->get_constval().smallintval;
182  }
183  case kINT: {
184  return lag_constant->get_constval().intval;
185  }
186  case kBIGINT: {
187  return lag_constant->get_constval().bigintval;
188  }
189  default: {
190  LOG(FATAL) << "Invalid type for the lag argument";
191  }
192  }
193  return 0;
194 }
195 
196 // Gets the lag or lead argument canonicalized as lag (lag = -lead).
198  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
199  window_func->getKind() == SqlWindowFunctionKind::LEAD);
200  const auto& args = window_func->getArgs();
201  if (args.size() == 3) {
202  throw std::runtime_error("LAG with default not supported yet");
203  }
204  if (args.size() == 2) {
205  const int64_t lag_or_lead =
206  static_cast<int64_t>(get_int_constant_from_expr(args[1].get()));
207  return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
208  : -lag_or_lead;
209  }
210  CHECK_EQ(args.size(), size_t(1));
211  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
212 }
213 
214 // Redistributes the original_indices according to the permutation given by
215 // output_for_partition_buff, reusing it as an output buffer.
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  // Materialize the permuted original indices in a temporary, then overwrite
  // the permutation buffer with them.
  std::vector<int64_t> permuted(partition_size);
  for (size_t pos = 0; pos < partition_size; ++pos) {
    permuted[pos] = original_indices[output_for_partition_buff[pos]];
  }
  std::copy(permuted.begin(), permuted.end(), output_for_partition_buff);
}
227 
228 // Applies a lag to the given sorted_indices, reusing it as an output buffer.
void apply_lag_to_partition(const int64_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  const auto size = static_cast<int64_t>(partition_size);
  // For each sorted position, record the sorted index of the row `lag`
  // positions earlier; -1 when the lagged position falls off the partition.
  std::vector<int64_t> lagged_source(partition_size, -1);
  for (int64_t pos = 0; pos < size; ++pos) {
    const int64_t source_pos = pos - lag;
    if (source_pos >= 0 && source_pos < size) {
      lagged_source[pos] = sorted_indices[source_pos];
    }
  }
  // Scatter to row order: row sorted_indices[pos] receives the original index
  // of its lagged peer, or -1 when there is none.
  std::vector<int64_t> by_row(partition_size);
  for (size_t pos = 0; pos < partition_size; ++pos) {
    const auto source = lagged_source[pos];
    by_row[sorted_indices[pos]] = source != -1 ? original_indices[source] : -1;
  }
  std::copy(by_row.begin(), by_row.end(), sorted_indices);
}
249 
250 // Computes first value function for the given output_for_partition_buff, reusing it as an
251 // output buffer.
void apply_first_value_to_partition(const int32_t* original_indices,
                                    int64_t* output_for_partition_buff,
                                    const size_t partition_size) {
  // Every row in the partition maps to the original index of the first row in
  // sorted order.
  const int64_t first_value_idx = original_indices[output_for_partition_buff[0]];
  for (size_t pos = 0; pos < partition_size; ++pos) {
    output_for_partition_buff[pos] = first_value_idx;
  }
}
260 
261 // Computes last value function for the given output_for_partition_buff, reusing it as an
262 // output buffer.
void apply_last_value_to_partition(const int32_t* original_indices,
                                   int64_t* output_for_partition_buff,
                                   const size_t partition_size) {
  // Each output slot receives the corresponding original row index, widened
  // from int32 to int64.
  for (size_t pos = 0; pos < partition_size; ++pos) {
    output_for_partition_buff[pos] = original_indices[pos];
  }
}
269 
271  const int8_t* partition_end,
272  const size_t off,
273  const int64_t* index,
274  const size_t index_size,
275  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
276  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
277  for (size_t i = 0; i < index_size; ++i) {
278  if (advance_current_rank(comparator, index, i)) {
279  agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0);
280  }
281  }
282  CHECK(index_size);
283  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0);
284 }
285 
// Tests bit `pos` of the byte-addressed bitmap whose base address is carried
// in `bitset` (a type-erased pointer).
bool pos_is_set(const int64_t bitset, const int64_t pos) {
  const auto bytes = reinterpret_cast<const int8_t*>(bitset);
  return ((bytes[pos >> 3] >> (pos & 7)) & 1) != 0;
}
289 
290 // Write value to pending integer outputs collected for all the peer rows. The end of
291 // groups is represented by the bitset.
292 template <class T>
293 void apply_window_pending_outputs_int(const int64_t handle,
294  const int64_t value,
295  const int64_t bitset,
296  const int64_t pos) {
297  if (!pos_is_set(bitset, pos)) {
298  return;
299  }
300  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
301  for (auto pending_output_slot : pending_output_slots) {
302  *reinterpret_cast<T*>(pending_output_slot) = value;
303  }
304  pending_output_slots.clear();
305 }
306 
307 } // namespace
308 
// Runtime entry point: flush pending 64-bit integer window outputs for the
// peer group ending at pos.
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
}
315 
// Runtime entry point: flush pending 32-bit integer window outputs for the
// peer group ending at pos.
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
}
322 
// Runtime entry point: flush pending 16-bit integer window outputs for the
// peer group ending at pos.
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
}
329 
// Runtime entry point: flush pending 8-bit integer window outputs for the
// peer group ending at pos.
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle,
                                                                 const int64_t value,
                                                                 const int64_t bitset,
                                                                 const int64_t pos) {
  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
}
336 
337 extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle,
338  const double value,
339  const int64_t bitset,
340  const int64_t pos) {
341  if (!pos_is_set(bitset, pos)) {
342  return;
343  }
344  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
345  for (auto pending_output_slot : pending_output_slots) {
346  *reinterpret_cast<double*>(pending_output_slot) = value;
347  }
348  pending_output_slots.clear();
349 }
350 
// Flush pending float window outputs for the peer group ending at pos.
extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle,
                                                                  const float value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    // NOTE(review): the float value is deliberately stored through a double
    // pointer here, unlike the *_float_columnar variant which writes a float.
    // Presumably the row-wise output layout uses 8-byte slots — confirm before
    // "fixing" this cast.
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}
364 
366  const int64_t handle,
367  const float value,
368  const int64_t bitset,
369  const int64_t pos) {
370  if (!pos_is_set(bitset, pos)) {
371  return;
372  }
373  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
374  for (auto pending_output_slot : pending_output_slots) {
375  *reinterpret_cast<float*>(pending_output_slot) = value;
376  }
377  pending_output_slots.clear();
378 }
379 
380 // Add a pending output slot to be written back at the end of a peer row group.
// Add a pending output slot to be written back at the end of a peer row group.
extern "C" RUNTIME_EXPORT void add_window_pending_output(void* pending_output,
                                                         const int64_t handle) {
  // `handle` is a type-erased pointer to the std::vector<void*> of queued
  // output slots (cf. aggregateStatePendingOutputs()).
  reinterpret_cast<std::vector<void*>*>(handle)->push_back(pending_output);
}
385 
386 // Returns true iff the aggregate window function requires special multiplicity handling
387 // to ensure that peer rows have the same value for the window function.
389  if (!window_function_is_aggregate(window_func->getKind())) {
390  return false;
391  }
392  if (window_func->getOrderKeys().empty()) {
393  return true;
394  }
395  switch (window_func->getKind()) {
398  return false;
399  }
400  default: {
401  return true;
402  }
403  }
404 }
405 
407  CHECK(!output_);
408  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(
410  /*thread_idx=*/0));
415  }
416  }
417  std::unique_ptr<int64_t[]> scratchpad(new int64_t[elem_count_]);
418  int64_t off = 0;
419  for (size_t i = 0; i < partitionCount(); ++i) {
420  auto partition_size = counts()[i];
421  if (partition_size == 0) {
422  continue;
423  }
424  auto output_for_partition_buff = scratchpad.get() + offsets()[i];
425  std::iota(output_for_partition_buff,
426  output_for_partition_buff + partition_size,
427  int64_t(0));
428  std::vector<Comparator> comparators;
429  const auto& order_keys = window_func_->getOrderKeys();
430  const auto& collation = window_func_->getCollation();
431  CHECK_EQ(order_keys.size(), collation.size());
432  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
433  ++order_column_idx) {
434  auto order_column_buffer = order_columns_[order_column_idx];
435  const auto order_col =
436  dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
437  CHECK(order_col);
438  const auto& order_col_collation = collation[order_column_idx];
439  const auto asc_comparator = makeComparator(order_col,
440  order_column_buffer,
441  payload() + offsets()[i],
442  order_col_collation.nulls_first);
443  auto comparator = asc_comparator;
444  if (order_col_collation.is_desc) {
445  comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
446  return asc_comparator(rhs, lhs);
447  };
448  }
449  comparators.push_back(comparator);
450  }
451  const auto col_tuple_comparator = [&comparators](const int64_t lhs,
452  const int64_t rhs) {
453  for (const auto& comparator : comparators) {
454  if (comparator(lhs, rhs)) {
455  return true;
456  }
457  }
458  return false;
459  };
460  std::sort(output_for_partition_buff,
461  output_for_partition_buff + partition_size,
462  col_tuple_comparator);
463  computePartition(output_for_partition_buff,
464  partition_size,
465  off,
466  window_func_,
467  col_tuple_comparator);
470  off += partition_size;
471  }
472  }
475  CHECK_EQ(static_cast<size_t>(off), elem_count_);
476  }
477  auto output_i64 = reinterpret_cast<int64_t*>(output_);
479  std::copy(scratchpad.get(), scratchpad.get() + elem_count_, output_i64);
480  } else {
481  for (size_t i = 0; i < elem_count_; ++i) {
482  output_i64[payload()[i]] = scratchpad[i];
483  }
484  }
485 }
486 
488  return window_func_;
489 }
490 
// Read-only access to the computed window function output buffer, filled by
// compute(); nullptr until compute() has run.
const int8_t* WindowFunctionContext::output() const {
  return output_;
}
494 
497  return &aggregate_state_.val;
498 }
499 
502  return &aggregate_state_.count;
503 }
504 
507  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
508 }
509 
511  return partition_start_;
512 }
513 
// Bitmap with one set bit at the index of the last row of each partition
// (built via agg_count_distinct_bitmap; nullptr until populated).
const int8_t* WindowFunctionContext::partitionEnd() const {
  return partition_end_;
}
517 
519  return elem_count_;
520 }
521 
522 namespace {
523 
524 template <class T>
525 bool integer_comparator(const int8_t* order_column_buffer,
526  const SQLTypeInfo& ti,
527  const int32_t* partition_indices,
528  const int64_t lhs,
529  const int64_t rhs,
530  const bool nulls_first) {
531  const auto values = reinterpret_cast<const T*>(order_column_buffer);
532  const auto lhs_val = values[partition_indices[lhs]];
533  const auto rhs_val = values[partition_indices[rhs]];
534  const auto null_val = inline_fixed_encoding_null_val(ti);
535  if (lhs_val == null_val && rhs_val == null_val) {
536  return false;
537  }
538  if (lhs_val == null_val && rhs_val != null_val) {
539  return nulls_first;
540  }
541  if (rhs_val == null_val && lhs_val != null_val) {
542  return !nulls_first;
543  }
544  return lhs_val < rhs_val;
545 }
546 
547 template <class T, class NullPatternType>
548 bool fp_comparator(const int8_t* order_column_buffer,
549  const SQLTypeInfo& ti,
550  const int32_t* partition_indices,
551  const int64_t lhs,
552  const int64_t rhs,
553  const bool nulls_first) {
554  const auto values = reinterpret_cast<const T*>(order_column_buffer);
555  const auto lhs_val = values[partition_indices[lhs]];
556  const auto rhs_val = values[partition_indices[rhs]];
557  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
558  const auto lhs_bit_pattern =
559  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
560  const auto rhs_bit_pattern =
561  *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
562  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
563  return false;
564  }
565  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
566  return nulls_first;
567  }
568  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
569  return !nulls_first;
570  }
571  return lhs_val < rhs_val;
572 }
573 
574 } // namespace
575 
576 std::function<bool(const int64_t lhs, const int64_t rhs)>
578  const int8_t* order_column_buffer,
579  const int32_t* partition_indices,
580  const bool nulls_first) {
581  const auto& ti = col_var->get_type_info();
582  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
583  switch (ti.get_size()) {
584  case 8: {
585  return [order_column_buffer, nulls_first, partition_indices, &ti](
586  const int64_t lhs, const int64_t rhs) {
587  return integer_comparator<int64_t>(
588  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
589  };
590  }
591  case 4: {
592  return [order_column_buffer, nulls_first, partition_indices, &ti](
593  const int64_t lhs, const int64_t rhs) {
594  return integer_comparator<int32_t>(
595  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
596  };
597  }
598  case 2: {
599  return [order_column_buffer, nulls_first, partition_indices, &ti](
600  const int64_t lhs, const int64_t rhs) {
601  return integer_comparator<int16_t>(
602  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
603  };
604  }
605  case 1: {
606  return [order_column_buffer, nulls_first, partition_indices, &ti](
607  const int64_t lhs, const int64_t rhs) {
608  return integer_comparator<int8_t>(
609  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
610  };
611  }
612  default: {
613  LOG(FATAL) << "Invalid type size: " << ti.get_size();
614  }
615  }
616  }
617  if (ti.is_fp()) {
618  switch (ti.get_type()) {
619  case kFLOAT: {
620  return [order_column_buffer, nulls_first, partition_indices, &ti](
621  const int64_t lhs, const int64_t rhs) {
622  return fp_comparator<float, int32_t>(
623  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
624  };
625  }
626  case kDOUBLE: {
627  return [order_column_buffer, nulls_first, partition_indices, &ti](
628  const int64_t lhs, const int64_t rhs) {
629  return fp_comparator<double, int64_t>(
630  order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
631  };
632  }
633  default: {
634  LOG(FATAL) << "Invalid float type";
635  }
636  }
637  }
638  throw std::runtime_error("Type not supported yet");
639 }
640 
642  int64_t* output_for_partition_buff,
643  const size_t partition_size,
644  const size_t off,
645  const Analyzer::WindowFunction* window_func,
646  const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
647  switch (window_func->getKind()) {
649  const auto row_numbers =
650  index_to_row_number(output_for_partition_buff, partition_size);
651  std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
652  break;
653  }
655  const auto rank =
656  index_to_rank(output_for_partition_buff, partition_size, comparator);
657  std::copy(rank.begin(), rank.end(), output_for_partition_buff);
658  break;
659  }
661  const auto dense_rank =
662  index_to_dense_rank(output_for_partition_buff, partition_size, comparator);
663  std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
664  break;
665  }
667  const auto percent_rank =
668  index_to_percent_rank(output_for_partition_buff, partition_size, comparator);
669  std::copy(percent_rank.begin(),
670  percent_rank.end(),
671  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
672  break;
673  }
675  const auto cume_dist =
676  index_to_cume_dist(output_for_partition_buff, partition_size, comparator);
677  std::copy(cume_dist.begin(),
678  cume_dist.end(),
679  reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
680  break;
681  }
683  const auto& args = window_func->getArgs();
684  CHECK_EQ(args.size(), size_t(1));
685  const auto n = get_int_constant_from_expr(args.front().get());
686  const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
687  std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
688  break;
689  }
692  const auto lag_or_lead = get_lag_or_lead_argument(window_func);
693  const auto partition_row_offsets = payload() + off;
695  lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
696  break;
697  }
699  const auto partition_row_offsets = payload() + off;
701  partition_row_offsets, output_for_partition_buff, partition_size);
702  break;
703  }
705  const auto partition_row_offsets = payload() + off;
707  partition_row_offsets, output_for_partition_buff, partition_size);
708  break;
709  }
715  const auto partition_row_offsets = payload() + off;
716  if (window_function_requires_peer_handling(window_func)) {
718  partitionEnd(), off, output_for_partition_buff, partition_size, comparator);
719  }
721  output_for_partition_buff, partition_row_offsets, partition_size);
722  break;
723  }
724  default: {
725  throw std::runtime_error("Window function not supported yet: " +
726  ::toString(window_func->getKind()));
727  }
728  }
729 }
730 
733  0,
734  static_cast<int64_t>(elem_count_),
735  false,
737  1};
738  partition_start_ = static_cast<int8_t*>(
739  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
740  int64_t partition_count = partitionCount();
741  std::vector<size_t> partition_offsets(partition_count);
742  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
743  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
744  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
745  for (int64_t i = 0; i < partition_count - 1; ++i) {
746  agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
747  }
748 }
749 
752  0,
753  static_cast<int64_t>(elem_count_),
754  false,
756  1};
757  partition_end_ = static_cast<int8_t*>(
758  checked_calloc(partition_start_bitmap.bitmapPaddedSizeBytes(), 1));
759  int64_t partition_count = partitionCount();
760  std::vector<size_t> partition_offsets(partition_count);
761  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
762  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
763  for (int64_t i = 0; i < partition_count - 1; ++i) {
764  if (partition_offsets[i] == 0) {
765  continue;
766  }
767  agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
768  }
769  if (elem_count_) {
770  agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
771  }
772 }
773 
// Row indices grouped by partition, as laid out by the partitioning hash
// table; partition i spans [payload() + offsets()[i], ... + counts()[i]).
const int32_t* WindowFunctionContext::payload() const {
  return reinterpret_cast<const int32_t*>(
      partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->payloadBufferOff());
}
778 
// Start offset of each partition within payload(); parallel to counts().
const int32_t* WindowFunctionContext::offsets() const {
  return reinterpret_cast<const int32_t*>(
      partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
}
783 
// Number of rows in each partition; parallel to offsets().
const int32_t* WindowFunctionContext::counts() const {
  return reinterpret_cast<const int32_t*>(
      partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
}
788 
790  const auto partition_count = counts() - offsets();
791  CHECK_GE(partition_count, 0);
792  return partition_count;
793 }
794 
796  std::unique_ptr<WindowFunctionContext> window_function_context,
797  const size_t target_index) {
798  const auto it_ok = window_contexts_.emplace(
799  std::make_pair(target_index, std::move(window_function_context)));
800  CHECK(it_ok.second);
801 }
802 
804  Executor* executor,
805  const size_t target_index) const {
806  const auto it = window_contexts_.find(target_index);
807  CHECK(it != window_contexts_.end());
808  executor->active_window_function_ = it->second.get();
809  return executor->active_window_function_;
810 }
811 
813  executor->active_window_function_ = nullptr;
814 }
815 
817  Executor* executor) {
818  return executor->active_window_function_;
819 }
820 
822  executor->window_project_node_context_owned_ =
823  std::make_unique<WindowProjectNodeContext>();
824  return executor->window_project_node_context_owned_.get();
825 }
826 
828  return executor->window_project_node_context_owned_.get();
829 }
830 
// Clears all window state on the executor: both the owned project-node
// context and the raw active-window-function pointer, so neither can dangle
// across executions.
void WindowProjectNodeContext::reset(Executor* executor) {
  executor->window_project_node_context_owned_ = nullptr;
  executor->active_window_function_ = nullptr;
}
#define CHECK_EQ(x, y)
Definition: Logger.h:211
SqlWindowFunctionKind getKind() const
Definition: Analyzer.h:1447
std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner_
RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
Descriptor for the storage layout use for (approximate) count distinct operations.
std::string toString(const ExtArgumentType &sig_type)
bool advance_current_rank(const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator, const int64_t *index, const size_t i)
RUNTIME_EXPORT void add_window_pending_output(void *pending_output, const int64_t handle)
ExecutorDeviceType
std::vector< int64_t > index_to_ntile(const int64_t *index, const size_t index_size, const size_t n)
Utility functions for easy access to the result set buffers.
#define LOG(tag)
Definition: Logger.h:194
const int8_t * partitionStart() const
std::vector< double > index_to_percent_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
void apply_permutation_to_partition(int64_t *output_for_partition_buff, const int32_t *original_indices, const size_t partition_size)
DEVICE void sort(ARGS &&...args)
Definition: gpu_enabled.h:105
bool fp_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
#define CHECK_GE(x, y)
Definition: Logger.h:216
static WindowProjectNodeContext * create(Executor *executor)
size_t elementCount() const
const int8_t * output() const
const Analyzer::WindowFunction * window_func_
const int32_t * counts() const
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction *window_func)
const int32_t * offsets() const
void index_to_partition_end(const int8_t *partition_end, const size_t off, const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle, const double value, const int64_t bitset, const int64_t pos)
std::vector< int64_t > index_to_row_number(const int64_t *index, const size_t index_size)
std::vector< int64_t > index_to_dense_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
const std::vector< std::shared_ptr< Analyzer::Expr > > & getOrderKeys() const
Definition: Analyzer.h:1455
int64_t null_val_bit_pattern(const SQLTypeInfo &ti, const bool float_argument_input)
const std::vector< OrderEntry > & getCollation() const
Definition: Analyzer.h:1459
static WindowFunctionContext * getActiveWindowFunctionContext(Executor *executor)
void computePartition(int64_t *output_for_partition_buff, const size_t partition_size, const size_t off, const Analyzer::WindowFunction *window_func, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
size_t partitionCount() const
AggregateState aggregate_state_
void apply_window_pending_outputs_int(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
static const WindowProjectNodeContext * get(Executor *executor)
bool window_function_is_value(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:27
DEVICE void fill(ARGS &&...args)
Definition: gpu_enabled.h:60
void apply_last_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
const int64_t * aggregateStateCount() const
const std::vector< std::shared_ptr< Analyzer::Expr > > & getArgs() const
Definition: Analyzer.h:1449
bool integer_comparator(const int8_t *order_column_buffer, const SQLTypeInfo &ti, const int32_t *partition_indices, const int64_t lhs, const int64_t rhs, const bool nulls_first)
const int8_t * partitionEnd() const
DEVICE auto copy(ARGS &&...args)
Definition: gpu_enabled.h:51
static void reset(Executor *executor)
const WindowFunctionContext * activateWindowFunctionContext(Executor *executor, const size_t target_index) const
void apply_first_value_to_partition(const int32_t *original_indices, int64_t *output_for_partition_buff, const size_t partition_size)
DEVICE void partial_sum(ARGS &&...args)
Definition: gpu_enabled.h:87
void * checked_calloc(const size_t nmemb, const size_t size)
Definition: checked_alloc.h:53
int64_t aggregateStatePendingOutputs() const
static Comparator makeComparator(const Analyzer::ColumnVar *col_var, const int8_t *partition_values, const int32_t *partition_indices, const bool nulls_first)
RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
const int64_t * aggregateState() const
RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
const SQLTypeInfo & get_type_info() const
Definition: Analyzer.h:78
#define RUNTIME_EXPORT
bool window_function_is_aggregate(const SqlWindowFunctionKind kind)
Definition: WindowContext.h:42
void addWindowFunctionContext(std::unique_ptr< WindowFunctionContext > window_function_context, const size_t target_index)
RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
bool pos_is_set(const int64_t bitset, const int64_t pos)
std::shared_ptr< HashJoin > partitions_
RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(const int64_t handle, const float value, const int64_t bitset, const int64_t pos)
void apply_lag_to_partition(const int64_t lag, const int32_t *original_indices, int64_t *sorted_indices, const size_t partition_size)
bool window_function_requires_peer_handling(const Analyzer::WindowFunction *window_func)
SqlWindowFunctionKind
Definition: sqldefs.h:83
WindowFunctionContext(const Analyzer::WindowFunction *window_func, const std::shared_ptr< HashJoin > &partitions, const size_t elem_count, const ExecutorDeviceType device_type, std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner)
size_t window_function_buffer_element_size(const SqlWindowFunctionKind)
#define CHECK(condition)
Definition: Logger.h:203
std::vector< double > index_to_cume_dist(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
std::vector< const int8_t * > order_columns_
static void resetWindowFunctionContext(Executor *executor)
std::vector< int64_t > index_to_rank(const int64_t *index, const size_t index_size, const std::function< bool(const int64_t lhs, const int64_t rhs)> &comparator)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
const Analyzer::WindowFunction * getWindowFunction() const
Definition: sqltypes.h:44
const int32_t * payload() const
size_t get_int_constant_from_expr(const Analyzer::Expr *expr)
RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle, const int64_t value, const int64_t bitset, const int64_t pos)
std::vector< std::vector< std::shared_ptr< Chunk_NS::Chunk > > > order_columns_owner_
void addOrderColumn(const int8_t *column, const Analyzer::ColumnVar *col_var, const std::vector< std::shared_ptr< Chunk_NS::Chunk >> &chunks_owner)
ALWAYS_INLINE void agg_count_distinct_bitmap(int64_t *agg, const int64_t val, const int64_t min_val)
const ExecutorDeviceType device_type_