WindowContext.cpp (OmniSciDB 085a039ca4)
/*
 * Copyright 2018 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "QueryEngine/WindowContext.h"

#include <numeric>

#include "QueryEngine/Descriptors/CountDistinctDescriptor.h"
#include "QueryEngine/Execute.h"
#include "QueryEngine/OutputBufferInitialization.h"
#include "QueryEngine/ResultSetBufferAccessors.h"
#include "QueryEngine/RuntimeFunctions.h"
#include "QueryEngine/TypePunning.h"
#include "Shared/Intervals.h"
#include "Shared/checked_alloc.h"
#include "Shared/funcannotations.h"
#include "Shared/threading.h"

#ifdef HAVE_TBB
//#include <tbb/parallel_for.h>
#include <tbb/parallel_sort.h>
#else
#include <thrust/sort.h>
#endif

bool g_enable_parallel_window_partition_compute{true};
size_t g_parallel_window_partition_compute_threshold{1 << 12};  // 4096

bool g_enable_parallel_window_partition_sort{true};
size_t g_parallel_window_partition_sort_threshold{1 << 10};  // 1024

// Non-partitioned version (no hash table provided)
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
    : window_func_(window_func)
    , partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , partitions_(nullptr)
    , elem_count_(elem_count)
    , output_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
  CHECK_LE(elem_count_, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
  if (elem_count_ > 0) {
    dummy_payload_ =
        reinterpret_cast<int32_t*>(checked_malloc(elem_count_ * sizeof(int32_t)));
    std::iota(dummy_payload_, dummy_payload_ + elem_count_, int32_t(0));
  }
}

// Partitioned version
WindowFunctionContext::WindowFunctionContext(
    const Analyzer::WindowFunction* window_func,
    QueryPlanHash partition_cache_key,
    const std::shared_ptr<HashJoin>& partitions,
    const size_t elem_count,
    const ExecutorDeviceType device_type,
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner)
    : window_func_(window_func)
    , partition_cache_key_(partition_cache_key)
    , sorted_partition_cache_key_(EMPTY_HASHED_PLAN_DAG_KEY)
    , partitions_(partitions)
    , elem_count_(elem_count)
    , output_(nullptr)
    , partition_start_(nullptr)
    , partition_end_(nullptr)
    , device_type_(device_type)
    , row_set_mem_owner_(row_set_mem_owner)
    , dummy_count_(elem_count)
    , dummy_offset_(0)
    , dummy_payload_(nullptr) {
  CHECK(partitions_);  // This version should have a hash table
}

WindowFunctionContext::~WindowFunctionContext() {
  free(partition_start_);
  free(partition_end_);
  if (dummy_payload_) {
    free(dummy_payload_);
  }
}

void WindowFunctionContext::addOrderColumn(
    const int8_t* column,
    const Analyzer::ColumnVar* col_var,
    const std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner) {
  order_columns_owner_.push_back(chunks_owner);
  order_columns_.push_back(column);
}

void WindowFunctionContext::setSortedPartitionCacheKey(QueryPlanHash cache_key) {
  sorted_partition_cache_key_ = cache_key;
}

namespace {

// Converts the sorted indices to a mapping from row position to row number.
std::vector<int64_t> index_to_row_number(const int64_t* index, const size_t index_size) {
  std::vector<int64_t> row_numbers(index_size);
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i + 1;
  }
  return row_numbers;
}
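
// Worked example (hypothetical data): if a three-row partition sorts as
// index = {2, 0, 1} (row 2 first, then row 0, then row 1), the loop assigns
// row_numbers[2] = 1, row_numbers[0] = 2 and row_numbers[1] = 3, so the
// positional output is {2, 3, 1}.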

// Returns true iff the current element is greater than the previous, according to the
// comparator. This is needed because peer rows have to have the same rank.
bool advance_current_rank(
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator,
    const int64_t* index,
    const size_t i) {
  if (i == 0) {
    return false;
  }
  return comparator(index[i - 1], index[i]);
}

// Computes the mapping from row position to rank.
std::vector<int64_t> index_to_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    rank[index[i]] = crt_rank;
  }
  return rank;
}
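
// Worked example (hypothetical data): for sorted key values {10, 10, 20} the
// comparator does not advance between the two peer rows, so the ranks are
// {1, 1, 3} -- standard SQL RANK() semantics, where ties share a rank and the
// next distinct value skips ahead by the size of the peer group.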

// Computes the mapping from row position to dense rank.
std::vector<int64_t> index_to_dense_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<int64_t> dense_rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      ++crt_rank;
    }
    dense_rank[index[i]] = crt_rank;
  }
  return dense_rank;
}
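
// Worked example (hypothetical data): for the same sorted keys {10, 10, 20},
// the counter only increments once per distinct value, so DENSE_RANK()
// yields {1, 1, 2} with no gap after the tie.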

// Computes the mapping from row position to percent rank.
std::vector<double> index_to_percent_rank(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> percent_rank(index_size);
  size_t crt_rank = 1;
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      crt_rank = i + 1;
    }
    percent_rank[index[i]] =
        index_size == 1 ? 0 : static_cast<double>(crt_rank - 1) / (index_size - 1);
  }
  return percent_rank;
}
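
// Worked example (hypothetical data): PERCENT_RANK() is (rank - 1) / (N - 1).
// For sorted keys {10, 10, 20} (N = 3, ranks {1, 1, 3}) the result is
// {0.0, 0.0, 1.0}; a single-row partition is special-cased to 0 to avoid a
// division by zero.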

// Computes the mapping from row position to cumulative distribution.
std::vector<double> index_to_cume_dist(
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  std::vector<double> cume_dist(index_size);
  size_t start_peer_group = 0;
  while (start_peer_group < index_size) {
    size_t end_peer_group = start_peer_group + 1;
    while (end_peer_group < index_size &&
           !advance_current_rank(comparator, index, end_peer_group)) {
      ++end_peer_group;
    }
    for (size_t i = start_peer_group; i < end_peer_group; ++i) {
      cume_dist[index[i]] = static_cast<double>(end_peer_group) / index_size;
    }
    start_peer_group = end_peer_group;
  }
  return cume_dist;
}
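
// Worked example (hypothetical data): CUME_DIST() assigns every row in a peer
// group the fraction of rows at or before the end of that group. For sorted
// keys {10, 10, 20} the first peer group ends at position 2, so both tied
// rows get 2/3, and the last row gets 3/3 = 1.0.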

// Computes the mapping from row position to the n-tile statistic.
std::vector<int64_t> index_to_ntile(const int64_t* index,
                                    const size_t index_size,
                                    const size_t n) {
  std::vector<int64_t> row_numbers(index_size);
  if (!n) {
    throw std::runtime_error("NTILE argument cannot be zero");
  }
  const size_t tile_size = (index_size + n - 1) / n;
  for (size_t i = 0; i < index_size; ++i) {
    row_numbers[index[i]] = i / tile_size + 1;
  }
  return row_numbers;
}
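
// Worked example (hypothetical data): NTILE(2) over a five-row partition uses
// tile_size = ceil(5 / 2) = 3, so the rows in sorted order land in buckets
// {1, 1, 1, 2, 2}.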

// The element size in the result buffer for the given window function kind. Currently
// it's always 8.
size_t window_function_buffer_element_size(const SqlWindowFunctionKind /*kind*/) {
  return 8;
}

// Extracts the integer constant from a constant expression.
size_t get_int_constant_from_expr(const Analyzer::Expr* expr) {
  const auto lag_constant = dynamic_cast<const Analyzer::Constant*>(expr);
  if (!lag_constant) {
    throw std::runtime_error("LAG with non-constant lag argument not supported yet");
  }
  const auto& lag_ti = lag_constant->get_type_info();
  switch (lag_ti.get_type()) {
    case kSMALLINT: {
      return lag_constant->get_constval().smallintval;
    }
    case kINT: {
      return lag_constant->get_constval().intval;
    }
    case kBIGINT: {
      return lag_constant->get_constval().bigintval;
    }
    default: {
      LOG(FATAL) << "Invalid type for the lag argument";
    }
  }
  return 0;
}

// Gets the lag or lead argument canonicalized as lag (lag = -lead).
int64_t get_lag_or_lead_argument(const Analyzer::WindowFunction* window_func) {
  CHECK(window_func->getKind() == SqlWindowFunctionKind::LAG ||
        window_func->getKind() == SqlWindowFunctionKind::LEAD);
  const auto& args = window_func->getArgs();
  if (args.size() == 3) {
    throw std::runtime_error("LAG with default not supported yet");
  }
  if (args.size() == 2) {
    const int64_t lag_or_lead =
        static_cast<int64_t>(get_int_constant_from_expr(args[1].get()));
    return window_func->getKind() == SqlWindowFunctionKind::LAG ? lag_or_lead
                                                                : -lag_or_lead;
  }
  CHECK_EQ(args.size(), size_t(1));
  return window_func->getKind() == SqlWindowFunctionKind::LAG ? 1 : -1;
}
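
// Illustration: LAG(x, 2) canonicalizes to +2 and LEAD(x, 2) to -2, so the
// downstream code only ever deals with a signed lag; the one-argument forms
// LAG(x) and LEAD(x) default to +1 and -1 respectively.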

// Redistributes the original_indices according to the permutation given by
// output_for_partition_buff, reusing it as an output buffer.
void apply_permutation_to_partition(int64_t* output_for_partition_buff,
                                    const int32_t* original_indices,
                                    const size_t partition_size) {
  std::vector<int64_t> new_output_for_partition_buff(partition_size);
  for (size_t i = 0; i < partition_size; ++i) {
    new_output_for_partition_buff[i] = original_indices[output_for_partition_buff[i]];
  }
  std::copy(new_output_for_partition_buff.begin(),
            new_output_for_partition_buff.end(),
            output_for_partition_buff);
}

// Applies a lag to the given sorted_indices, reusing it as an output buffer.
void apply_lag_to_partition(const int64_t lag,
                            const int32_t* original_indices,
                            int64_t* sorted_indices,
                            const size_t partition_size) {
  std::vector<int64_t> lag_sorted_indices(partition_size, -1);
  for (int64_t idx = 0; idx < static_cast<int64_t>(partition_size); ++idx) {
    int64_t lag_idx = idx - lag;
    if (lag_idx < 0 || lag_idx >= static_cast<int64_t>(partition_size)) {
      continue;
    }
    lag_sorted_indices[idx] = sorted_indices[lag_idx];
  }
  std::vector<int64_t> lag_original_indices(partition_size);
  for (size_t k = 0; k < partition_size; ++k) {
    const auto lag_index = lag_sorted_indices[k];
    lag_original_indices[sorted_indices[k]] =
        lag_index != -1 ? original_indices[lag_index] : -1;
  }
  std::copy(lag_original_indices.begin(), lag_original_indices.end(), sorted_indices);
}
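
// Worked example (hypothetical data): with lag = 1 and sorted_indices =
// {2, 0, 1}, the first pass shifts the sort order to {-1, 2, 0}; the second
// pass scatters back to row positions, so each row ends up holding the
// original index of its predecessor in sort order, or -1 (NULL) for the
// first row of the partition.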

// Computes first value function for the given output_for_partition_buff, reusing it as
// an output buffer.
void apply_first_value_to_partition(const int32_t* original_indices,
                                    int64_t* output_for_partition_buff,
                                    const size_t partition_size) {
  const auto first_value_idx = original_indices[output_for_partition_buff[0]];
  std::fill(output_for_partition_buff,
            output_for_partition_buff + partition_size,
            first_value_idx);
}

// Computes last value function for the given output_for_partition_buff, reusing it as
// an output buffer.
void apply_last_value_to_partition(const int32_t* original_indices,
                                   int64_t* output_for_partition_buff,
                                   const size_t partition_size) {
  std::copy(
      original_indices, original_indices + partition_size, output_for_partition_buff);
}

void index_to_partition_end(
    const int8_t* partition_end,
    const size_t off,
    const int64_t* index,
    const size_t index_size,
    const std::function<bool(const int64_t lhs, const int64_t rhs)>& comparator) {
  int64_t partition_end_handle = reinterpret_cast<int64_t>(partition_end);
  for (size_t i = 0; i < index_size; ++i) {
    if (advance_current_rank(comparator, index, i)) {
      agg_count_distinct_bitmap(&partition_end_handle, off + i - 1, 0);
    }
  }
  CHECK(index_size);
  agg_count_distinct_bitmap(&partition_end_handle, off + index_size - 1, 0);
}

bool pos_is_set(const int64_t bitset, const int64_t pos) {
  return (reinterpret_cast<const int8_t*>(bitset))[pos >> 3] & (1 << (pos & 7));
}
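
// Illustration: pos >> 3 selects the byte and pos & 7 the bit within it, so
// pos = 11 tests bit 3 of byte 1 in the bitset.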

// Write value to pending integer outputs collected for all the peer rows. The end of
// groups is represented by the bitset.
template <class T>
void apply_window_pending_outputs_int(const int64_t handle,
                                      const int64_t value,
                                      const int64_t bitset,
                                      const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<T*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

}  // namespace

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int64(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int64_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int32(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int32_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int16(const int64_t handle,
                                                                  const int64_t value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  apply_window_pending_outputs_int<int16_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_int8(const int64_t handle,
                                                                 const int64_t value,
                                                                 const int64_t bitset,
                                                                 const int64_t pos) {
  apply_window_pending_outputs_int<int8_t>(handle, value, bitset, pos);
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_double(const int64_t handle,
                                                                   const double value,
                                                                   const int64_t bitset,
                                                                   const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float(const int64_t handle,
                                                                  const float value,
                                                                  const int64_t bitset,
                                                                  const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    // Note: the row-wise float output occupies a 64-bit slot, so the value is
    // widened to double here; the columnar variant below writes a true float.
    *reinterpret_cast<double*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

extern "C" RUNTIME_EXPORT void apply_window_pending_outputs_float_columnar(
    const int64_t handle,
    const float value,
    const int64_t bitset,
    const int64_t pos) {
  if (!pos_is_set(bitset, pos)) {
    return;
  }
  auto& pending_output_slots = *reinterpret_cast<std::vector<void*>*>(handle);
  for (auto pending_output_slot : pending_output_slots) {
    *reinterpret_cast<float*>(pending_output_slot) = value;
  }
  pending_output_slots.clear();
}

// Add a pending output slot to be written back at the end of a peer row group.
extern "C" RUNTIME_EXPORT void add_window_pending_output(void* pending_output,
                                                         const int64_t handle) {
  reinterpret_cast<std::vector<void*>*>(handle)->push_back(pending_output);
}

// Returns true iff the aggregate window function requires special multiplicity handling
// to ensure that peer rows have the same value for the window function.
bool window_function_requires_peer_handling(const Analyzer::WindowFunction* window_func) {
  if (!window_function_is_aggregate(window_func->getKind())) {
    return false;
  }
  if (window_func->getOrderKeys().empty()) {
    return true;
  }
  switch (window_func->getKind()) {
    case SqlWindowFunctionKind::MIN:
    case SqlWindowFunctionKind::MAX: {
      return false;
    }
    default: {
      return true;
    }
  }
}
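
// Example: a running COUNT(*) OVER (ORDER BY ts) must report the same value
// for all rows that tie on ts, so the outputs of a peer group are held back
// and written together once the group ends (see the pending-output helpers
// above); MIN and MAX are the exceptions carved out in the switch above.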

void WindowFunctionContext::compute(
    std::unordered_map<QueryPlanHash, size_t>& sorted_partition_key_ref_count_map,
    std::unordered_map<QueryPlanHash, std::unique_ptr<int64_t[]>>&
        sorted_partition_cache) {
  auto timer = DEBUG_TIMER(__func__);
  CHECK(!output_);
  size_t output_buf_sz =
      elem_count_ * window_function_buffer_element_size(window_func_->getKind());
  output_ = static_cast<int8_t*>(row_set_mem_owner_->allocate(output_buf_sz,
                                                              /*thread_idx=*/0));
  const bool is_window_function_aggregate =
      window_function_is_aggregate(window_func_->getKind());
  if (is_window_function_aggregate) {
    fillPartitionStart();
    if (window_function_requires_peer_handling(window_func_)) {
      fillPartitionEnd();
    }
  }
  std::unique_ptr<int64_t[]> scratchpad, copied_sorted_partition;
  int64_t* intermediate_output_buffer;
  if (is_window_function_aggregate) {
    intermediate_output_buffer = reinterpret_cast<int64_t*>(output_);
  } else {
    output_buf_sz = sizeof(int64_t) * elem_count_;
    scratchpad.reset(new int64_t[elem_count_]);
    intermediate_output_buffer = scratchpad.get();
  }
  const bool should_parallelize{g_enable_parallel_window_partition_compute &&
                                elem_count_ >=
                                    g_parallel_window_partition_compute_threshold};

  auto cached_sorted_partition_it =
      sorted_partition_cache.find(sorted_partition_cache_key_);
  if (cached_sorted_partition_it != sorted_partition_cache.end()) {
    int64_t* sorted_partition = cached_sorted_partition_it->second.get();
    VLOG(1) << "Reuse cached sorted partition to compute window function context (key: "
            << sorted_partition_cache_key_
            << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
            << ")";
    DEBUG_TIMER("Window Function Cached Sorted Partition Copy");
    std::memcpy(intermediate_output_buffer, sorted_partition, output_buf_sz);
  } else {
    // ordering partitions if necessary
    const auto sort_partitions = [&](const size_t start, const size_t end) {
      for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
        sortPartition(partition_idx,
                      intermediate_output_buffer + offsets()[partition_idx],
                      should_parallelize);
      }
    };

    if (should_parallelize) {
      auto sorted_partition_copy_timer =
          DEBUG_TIMER("Window Function Partition Sorting Parallelized");
      threading::task_group thread_pool;
      for (auto interval : makeIntervals<size_t>(0, partitionCount(), cpu_threads())) {
        thread_pool.run([=] { sort_partitions(interval.begin, interval.end); });
      }
      thread_pool.wait();
    } else {
      auto sorted_partition_copy_timer =
          DEBUG_TIMER("Window Function Partition Sorting Non-Parallelized");
      sort_partitions(0, partitionCount());
    }
    auto sorted_partition_ref_cnt_it =
        sorted_partition_key_ref_count_map.find(sorted_partition_cache_key_);
    if (sorted_partition_ref_cnt_it != sorted_partition_key_ref_count_map.end() &&
        sorted_partition_ref_cnt_it->second > 1) {
      // keep the sorted partition only if it will be reused by another window
      // function context of this query
      copied_sorted_partition.reset(new int64_t[elem_count_]);
      auto copied_sorted_partition_ptr = copied_sorted_partition.get();
      DEBUG_TIMER("Window Function Sorted Partition Copy For Caching");
      std::memcpy(copied_sorted_partition_ptr, intermediate_output_buffer, output_buf_sz);
      CHECK(sorted_partition_cache
                .emplace(sorted_partition_cache_key_, std::move(copied_sorted_partition))
                .second);
      VLOG(1) << "Put sorted partition to cache (key: " << sorted_partition_cache_key_
              << ", ordering condition: " << ::toString(window_func_->getOrderKeys())
              << ")";
    }
  }

  const auto compute_partitions = [&](const size_t start, const size_t end) {
    for (size_t partition_idx = start; partition_idx < end; ++partition_idx) {
      computePartitionBuffer(partition_idx,
                             intermediate_output_buffer + offsets()[partition_idx],
                             window_func_);
    }
  };

  if (should_parallelize) {
    auto partition_computation_timer = DEBUG_TIMER("Window Function Partition Compute");
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(0, partitionCount(), cpu_threads())) {
      thread_pool.run([=] { compute_partitions(interval.begin, interval.end); });
    }
    thread_pool.wait();
  } else {
    auto partition_computation_timer =
        DEBUG_TIMER("Window Function Non-Parallelized Partition Compute");
    compute_partitions(0, partitionCount());
  }

  if (is_window_function_aggregate) {
    // If the window function is an aggregate, we were able to write to the final
    // output buffer directly in computePartitionBuffer, and we are done.
    return;
  }

  auto output_i64 = reinterpret_cast<int64_t*>(output_);
  const auto payload_copy = [&](const size_t start, const size_t end) {
    for (size_t i = start; i < end; ++i) {
      output_i64[payload()[i]] = intermediate_output_buffer[i];
    }
  };
  if (should_parallelize) {
    auto payload_copy_timer =
        DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Parallelized");
    threading::task_group thread_pool;
    for (auto interval : makeIntervals<size_t>(
             0,
             elem_count_,
             std::min(static_cast<size_t>(cpu_threads()),
                      g_parallel_window_partition_compute_threshold))) {
      thread_pool.run([=] { payload_copy(interval.begin, interval.end); });
    }
    thread_pool.wait();
  } else {
    auto payload_copy_timer =
        DEBUG_TIMER("Window Function Non-Aggregate Payload Copy Non-Parallelized");
    payload_copy(0, elem_count_);
  }
}

std::vector<WindowFunctionContext::Comparator> WindowFunctionContext::createComparator(
    size_t partition_idx) {
  // create tuple comparator
  std::vector<WindowFunctionContext::Comparator> partition_comparator;
  const auto& order_keys = window_func_->getOrderKeys();
  const auto& collation = window_func_->getCollation();
  CHECK_EQ(order_keys.size(), collation.size());
  for (size_t order_column_idx = 0; order_column_idx < order_columns_.size();
       ++order_column_idx) {
    auto order_column_buffer = order_columns_[order_column_idx];
    const auto order_col =
        dynamic_cast<const Analyzer::ColumnVar*>(order_keys[order_column_idx].get());
    CHECK(order_col);
    const auto& order_col_collation = collation[order_column_idx];
    const auto asc_comparator = makeComparator(order_col,
                                               order_column_buffer,
                                               payload() + offsets()[partition_idx],
                                               order_col_collation.nulls_first);
    auto comparator = asc_comparator;
    if (order_col_collation.is_desc) {
      comparator = [asc_comparator](const int64_t lhs, const int64_t rhs) {
        return asc_comparator(rhs, lhs);
      };
    }
    partition_comparator.push_back(comparator);
  }
  return partition_comparator;
}
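
// Illustration: ORDER BY a, b DESC builds two comparators -- an ascending one
// for a and, for b, a wrapper that simply swaps its arguments around the
// ascending comparator; the tuple comparators below consult them in key order.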

void WindowFunctionContext::sortPartition(const size_t partition_idx,
                                          int64_t* output_for_partition_buff,
                                          bool should_parallelize) {
  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
  if (partition_size == 0) {
    return;
  }
  std::iota(
      output_for_partition_buff, output_for_partition_buff + partition_size, int64_t(0));
  auto partition_comparator = createComparator(partition_idx);
  const auto col_tuple_comparator = [&partition_idx, &partition_comparator](
                                        const int64_t lhs, const int64_t rhs) {
    for (const auto& comparator : partition_comparator) {
      const auto comparator_result = comparator(lhs, rhs);
      switch (comparator_result) {
        case WindowFunctionContext::WindowComparatorResult::LT:
          return true;
        case WindowFunctionContext::WindowComparatorResult::GT:
          return false;
        default:
          // WindowComparatorResult::EQ: continue to the next comparator
          continue;
      }
    }
    // If we reach here, the result is WindowComparatorResult::EQ for all keys;
    // return false, as the sort algorithm must enforce a strict weak ordering
    return false;
  };
  if (should_parallelize) {
#ifdef HAVE_TBB
    tbb::parallel_sort(output_for_partition_buff,
                       output_for_partition_buff + partition_size,
                       col_tuple_comparator);
#else
    thrust::sort(output_for_partition_buff,
                 output_for_partition_buff + partition_size,
                 col_tuple_comparator);
#endif
  } else {
    std::sort(output_for_partition_buff,
              output_for_partition_buff + partition_size,
              col_tuple_comparator);
  }
}

const Analyzer::WindowFunction* WindowFunctionContext::getWindowFunction() const {
  return window_func_;
}

const int8_t* WindowFunctionContext::output() const {
  return output_;
}

const int64_t* WindowFunctionContext::aggregateState() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return &aggregate_state_.val;
}

const int64_t* WindowFunctionContext::aggregateStateCount() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return &aggregate_state_.count;
}

int64_t WindowFunctionContext::aggregateStatePendingOutputs() const {
  CHECK(window_function_is_aggregate(window_func_->getKind()));
  return reinterpret_cast<int64_t>(&aggregate_state_.outputs);
}

const int8_t* WindowFunctionContext::partitionStart() const {
  return partition_start_;
}

const int8_t* WindowFunctionContext::partitionEnd() const {
  return partition_end_;
}

size_t WindowFunctionContext::elementCount() const {
  return elem_count_;
}

namespace {

template <class T>
WindowFunctionContext::WindowComparatorResult integer_comparator(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool nulls_first) {
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_val = inline_fixed_encoding_null_val(ti);
  if (lhs_val == null_val && rhs_val == null_val) {
    return WindowFunctionContext::WindowComparatorResult::EQ;
  }
  if (lhs_val == null_val && rhs_val != null_val) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
                       : WindowFunctionContext::WindowComparatorResult::GT;
  }
  if (rhs_val == null_val && lhs_val != null_val) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
                       : WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val < rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val > rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::GT;
  }
  return WindowFunctionContext::WindowComparatorResult::EQ;
}

template <class T, class NullPatternType>
WindowFunctionContext::WindowComparatorResult fp_comparator(
    const int8_t* order_column_buffer,
    const SQLTypeInfo& ti,
    const int32_t* partition_indices,
    const int64_t lhs,
    const int64_t rhs,
    const bool nulls_first) {
  const auto values = reinterpret_cast<const T*>(order_column_buffer);
  const auto lhs_val = values[partition_indices[lhs]];
  const auto rhs_val = values[partition_indices[rhs]];
  const auto null_bit_pattern = null_val_bit_pattern(ti, ti.get_type() == kFLOAT);
  const auto lhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&lhs_val));
  const auto rhs_bit_pattern =
      *reinterpret_cast<const NullPatternType*>(may_alias_ptr(&rhs_val));
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern == null_bit_pattern) {
    return WindowFunctionContext::WindowComparatorResult::EQ;
  }
  if (lhs_bit_pattern == null_bit_pattern && rhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::LT
                       : WindowFunctionContext::WindowComparatorResult::GT;
  }
  if (rhs_bit_pattern == null_bit_pattern && lhs_bit_pattern != null_bit_pattern) {
    return nulls_first ? WindowFunctionContext::WindowComparatorResult::GT
                       : WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val < rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::LT;
  }
  if (lhs_val > rhs_val) {
    return WindowFunctionContext::WindowComparatorResult::GT;
  }
  return WindowFunctionContext::WindowComparatorResult::EQ;
}

}  // namespace

WindowFunctionContext::Comparator WindowFunctionContext::makeComparator(
    const Analyzer::ColumnVar* col_var,
    const int8_t* order_column_buffer,
    const int32_t* partition_indices,
    const bool nulls_first) {
  const auto& ti = col_var->get_type_info();
  if (ti.is_integer() || ti.is_decimal() || ti.is_time() || ti.is_boolean()) {
    switch (ti.get_size()) {
      case 8: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int64_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 4: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int32_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 2: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int16_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case 1: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return integer_comparator<int8_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      default: {
        LOG(FATAL) << "Invalid type size: " << ti.get_size();
      }
    }
  }
  if (ti.is_fp()) {
    switch (ti.get_type()) {
      case kFLOAT: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return fp_comparator<float, int32_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      case kDOUBLE: {
        return [order_column_buffer, nulls_first, partition_indices, &ti](
                   const int64_t lhs, const int64_t rhs) {
          return fp_comparator<double, int64_t>(
              order_column_buffer, ti, partition_indices, lhs, rhs, nulls_first);
        };
      }
      default: {
        LOG(FATAL) << "Invalid float type";
      }
    }
  }
  throw std::runtime_error("Type not supported yet");
}

void WindowFunctionContext::computePartitionBuffer(
    const size_t partition_idx,
    int64_t* output_for_partition_buff,
    const Analyzer::WindowFunction* window_func) {
  const size_t partition_size{static_cast<size_t>(counts()[partition_idx])};
  if (partition_size == 0) {
    return;
  }
  const auto offset = offsets()[partition_idx];
  auto partition_comparator = createComparator(partition_idx);
  const auto col_tuple_comparator = [&partition_comparator](const int64_t lhs,
                                                            const int64_t rhs) {
    for (const auto& comparator : partition_comparator) {
      const auto comparator_result = comparator(lhs, rhs);
      switch (comparator_result) {
        case WindowFunctionContext::WindowComparatorResult::LT:
          return true;
        case WindowFunctionContext::WindowComparatorResult::GT:
          return false;
        default:
          // WindowComparatorResult::EQ: continue to the next comparator
          continue;
      }
    }
    // If we reach here, the result is WindowComparatorResult::EQ for all keys;
    // return false, as the sort algorithm must enforce a strict weak ordering
    return false;
  };
  switch (window_func->getKind()) {
    case SqlWindowFunctionKind::ROW_NUMBER: {
      const auto row_numbers =
          index_to_row_number(output_for_partition_buff, partition_size);
      std::copy(row_numbers.begin(), row_numbers.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::RANK: {
      const auto rank =
          index_to_rank(output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(rank.begin(), rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::DENSE_RANK: {
      const auto dense_rank = index_to_dense_rank(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(dense_rank.begin(), dense_rank.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::PERCENT_RANK: {
      const auto percent_rank = index_to_percent_rank(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(percent_rank.begin(),
                percent_rank.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::CUME_DIST: {
      const auto cume_dist = index_to_cume_dist(
          output_for_partition_buff, partition_size, col_tuple_comparator);
      std::copy(cume_dist.begin(),
                cume_dist.end(),
                reinterpret_cast<double*>(may_alias_ptr(output_for_partition_buff)));
      break;
    }
    case SqlWindowFunctionKind::NTILE: {
      const auto& args = window_func->getArgs();
      CHECK_EQ(args.size(), size_t(1));
      const auto n = get_int_constant_from_expr(args.front().get());
      const auto ntile = index_to_ntile(output_for_partition_buff, partition_size, n);
      std::copy(ntile.begin(), ntile.end(), output_for_partition_buff);
      break;
    }
    case SqlWindowFunctionKind::LAG:
    case SqlWindowFunctionKind::LEAD: {
      const auto lag_or_lead = get_lag_or_lead_argument(window_func);
      const auto partition_row_offsets = payload() + offset;
      apply_lag_to_partition(
          lag_or_lead, partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::FIRST_VALUE: {
      const auto partition_row_offsets = payload() + offset;
      apply_first_value_to_partition(
          partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::LAST_VALUE: {
      const auto partition_row_offsets = payload() + offset;
      apply_last_value_to_partition(
          partition_row_offsets, output_for_partition_buff, partition_size);
      break;
    }
    case SqlWindowFunctionKind::AVG:
    case SqlWindowFunctionKind::MIN:
    case SqlWindowFunctionKind::MAX:
    case SqlWindowFunctionKind::SUM:
    case SqlWindowFunctionKind::COUNT: {
      const auto partition_row_offsets = payload() + offset;
      if (window_function_requires_peer_handling(window_func)) {
        index_to_partition_end(partitionEnd(),
                               offset,
                               output_for_partition_buff,
                               partition_size,
                               col_tuple_comparator);
      }
      apply_permutation_to_partition(
          output_for_partition_buff, partition_row_offsets, partition_size);
      break;
    }
    default: {
      throw std::runtime_error("Window function not supported yet: " +
                               ::toString(window_func->getKind()));
    }
  }
}

void WindowFunctionContext::fillPartitionStart() {
  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
                                                 0,
                                                 static_cast<int64_t>(elem_count_),
                                                 false,
                                                 ExecutorDeviceType::CPU,
                                                 1};
  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
  if (partitions_) {
    bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
  }
  partition_start_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
  int64_t partition_count = partitionCount();
  std::vector<size_t> partition_offsets(partition_count);
  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
  auto partition_start_handle = reinterpret_cast<int64_t>(partition_start_);
  agg_count_distinct_bitmap(&partition_start_handle, 0, 0);
  for (int64_t i = 0; i < partition_count - 1; ++i) {
    agg_count_distinct_bitmap(&partition_start_handle, partition_offsets[i], 0);
  }
}

void WindowFunctionContext::fillPartitionEnd() {
  CountDistinctDescriptor partition_start_bitmap{CountDistinctImplType::Bitmap,
                                                 0,
                                                 static_cast<int64_t>(elem_count_),
                                                 false,
                                                 ExecutorDeviceType::CPU,
                                                 1};
  auto bitmap_sz = partition_start_bitmap.bitmapPaddedSizeBytes();
  if (partitions_) {
    bitmap_sz += partitions_->isBitwiseEq() ? 1 : 0;
  }
  partition_end_ = static_cast<int8_t*>(checked_calloc(bitmap_sz, 1));
  int64_t partition_count = partitionCount();
  std::vector<size_t> partition_offsets(partition_count);
  std::partial_sum(counts(), counts() + partition_count, partition_offsets.begin());
  auto partition_end_handle = reinterpret_cast<int64_t>(partition_end_);
  for (int64_t i = 0; i < partition_count - 1; ++i) {
    if (partition_offsets[i] == 0) {
      continue;
    }
    agg_count_distinct_bitmap(&partition_end_handle, partition_offsets[i] - 1, 0);
  }
  if (elem_count_) {
    agg_count_distinct_bitmap(&partition_end_handle, elem_count_ - 1, 0);
  }
}

const int32_t* WindowFunctionContext::payload() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) +
        partitions_->payloadBufferOff());
  }
  return dummy_payload_;  // non-partitioned window function
}

const int32_t* WindowFunctionContext::offsets() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->offsetBufferOff());
  }
  return &dummy_offset_;
}

const int32_t* WindowFunctionContext::counts() const {
  if (partitions_) {
    return reinterpret_cast<const int32_t*>(
        partitions_->getJoinHashBuffer(device_type_, 0) + partitions_->countBufferOff());
  }
  return &dummy_count_;
}

size_t WindowFunctionContext::partitionCount() const {
  if (partitions_) {
    const auto partition_count = counts() - offsets();
    CHECK_GE(partition_count, 0);
    return partition_count;
  }
  return 1;  // non-partitioned window function
}
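
// Note: counts() - offsets() works because the hash-join buffer lays the
// offset and count arrays out back to back with one int32_t entry per
// partition, so the pointer difference equals the number of partitions.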

void WindowProjectNodeContext::addWindowFunctionContext(
    std::unique_ptr<WindowFunctionContext> window_function_context,
    const size_t target_index) {
  const auto it_ok = window_contexts_.emplace(
      std::make_pair(target_index, std::move(window_function_context)));
  CHECK(it_ok.second);
}

const WindowFunctionContext* WindowProjectNodeContext::activateWindowFunctionContext(
    Executor* executor,
    const size_t target_index) const {
  const auto it = window_contexts_.find(target_index);
  CHECK(it != window_contexts_.end());
  executor->active_window_function_ = it->second.get();
  return executor->active_window_function_;
}

void WindowProjectNodeContext::resetWindowFunctionContext(Executor* executor) {
  executor->active_window_function_ = nullptr;
}

WindowFunctionContext* WindowProjectNodeContext::getActiveWindowFunctionContext(
    Executor* executor) {
  return executor->active_window_function_;
}

WindowProjectNodeContext* WindowProjectNodeContext::create(Executor* executor) {
  executor->window_project_node_context_owned_ =
      std::make_unique<WindowProjectNodeContext>();
  return executor->window_project_node_context_owned_.get();
}

const WindowProjectNodeContext* WindowProjectNodeContext::get(Executor* executor) {
  return executor->window_project_node_context_owned_.get();
}

void WindowProjectNodeContext::reset(Executor* executor) {
  executor->window_project_node_context_owned_ = nullptr;
  executor->active_window_function_ = nullptr;
}