OmniSciDB 94e8789169
ResultSetReduction.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "DynamicWatchdog.h"
26 #include "Execute.h"
27 #include "ResultSet.h"
29 #include "ResultSetReductionJIT.h"
30 #include "RuntimeFunctions.h"
31 #include "Shared/SqlTypesLayout.h"
32 #include "Shared/likely.h"
33 #include "Shared/thread_count.h"
34 
35 #include <llvm/ExecutionEngine/GenericValue.h>
36 
37 #include <algorithm>
38 #include <future>
39 #include <numeric>
40 
41 extern bool g_enable_dynamic_watchdog;
42 
43 namespace {
44 
45 bool use_multithreaded_reduction(const size_t entry_count) {
46  return entry_count > 100000;
47 }
48 
49 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
50   const auto row_bytes = get_row_bytes(query_mem_desc);
51  CHECK_EQ(size_t(0), row_bytes % 8);
52  return row_bytes / 8;
53 }
54 
55 std::vector<int64_t> make_key(const int64_t* buff,
56  const size_t entry_count,
57  const size_t key_count) {
58  std::vector<int64_t> key;
59  size_t off = 0;
60  for (size_t i = 0; i < key_count; ++i) {
61  key.push_back(buff[off]);
62  off += entry_count;
63  }
64  return key;
65 }
66 
67 void fill_slots(int64_t* dst_entry,
68  const size_t dst_entry_count,
69  const int64_t* src_buff,
70  const size_t src_entry_idx,
71  const size_t src_entry_count,
72                 const QueryMemoryDescriptor& query_mem_desc) {
73   const auto slot_count = query_mem_desc.getBufferColSlotCount();
74  const auto key_count = query_mem_desc.getGroupbyColCount();
75  if (query_mem_desc.didOutputColumnar()) {
76  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
77  ++i, dst_slot_off += dst_entry_count) {
78  dst_entry[dst_slot_off] =
79  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
80  }
81  } else {
82  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
83  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
84  for (size_t i = 0; i < slot_count; ++i) {
85  dst_entry[i] = row_ptr[slot_off_quad + i];
86  }
87  }
88 }
89 
91 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
92  for (size_t i = 0; i < key_count; ++i) {
93  key_ptr_i32[i] = EMPTY_KEY_32;
94  }
95 }
96 
98 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
99  for (size_t i = 0; i < key_count; ++i) {
100  key_ptr_i64[i] = EMPTY_KEY_64;
101  }
102 }
103 
104 inline int64_t get_component(const int8_t* group_by_buffer,
105  const size_t comp_sz,
106  const size_t index = 0) {
107  int64_t ret = std::numeric_limits<int64_t>::min();
108  switch (comp_sz) {
109  case 1: {
110  ret = group_by_buffer[index];
111  break;
112  }
113  case 2: {
114  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
115  ret = buffer_ptr[index];
116  break;
117  }
118  case 4: {
119  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
120  ret = buffer_ptr[index];
121  break;
122  }
123  case 8: {
124  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
125  ret = buffer_ptr[index];
126  break;
127  }
128  default:
129  CHECK(false);
130  }
131  return ret;
132 }
133 
134 void run_reduction_code(const ReductionCode& reduction_code,
135  int8_t* this_buff,
136  const int8_t* that_buff,
137  const int32_t start_entry_index,
138  const int32_t end_entry_index,
139  const int32_t that_entry_count,
140  const void* this_qmd,
141  const void* that_qmd,
142  const void* serialized_varlen_buffer) {
143  int err = 0;
144  if (reduction_code.func_ptr) {
145  err = reduction_code.func_ptr(this_buff,
146  that_buff,
147  start_entry_index,
148  end_entry_index,
149  that_entry_count,
150  this_qmd,
151  that_qmd,
152  serialized_varlen_buffer);
153  } else {
154  // Calls LLVM methods that are not thread safe, ensure nothing else compiles while we
155  // run this reduction
156  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
157  auto ret = ReductionInterpreter::run(
158  reduction_code.ir_reduce_loop.get(),
159  {ReductionInterpreter::EvalValue{.ptr = this_buff},
160  ReductionInterpreter::EvalValue{.ptr = that_buff},
161  ReductionInterpreter::EvalValue{.int_val = start_entry_index},
162  ReductionInterpreter::EvalValue{.int_val = end_entry_index},
163  ReductionInterpreter::EvalValue{.int_val = that_entry_count},
164          ReductionInterpreter::EvalValue{.ptr = this_qmd},
165          ReductionInterpreter::EvalValue{.ptr = that_qmd},
166          ReductionInterpreter::EvalValue{.ptr = serialized_varlen_buffer}});
167  err = ret.int_val;
168  }
169   if (err) {
170     if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
171       throw std::runtime_error("Multiple distinct values encountered");
172     }
173 
174     throw std::runtime_error(
175         "Query execution has exceeded the time limit or was interrupted during result "
176         "set reduction");
177   }
178 }
179 
180 } // namespace
181 
182 void result_set::fill_empty_key(void* key_ptr,
183  const size_t key_count,
184  const size_t key_width) {
185  switch (key_width) {
186  case 4: {
187  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
188  fill_empty_key_32(key_ptr_i32, key_count);
189  break;
190  }
191  case 8: {
192  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
193  fill_empty_key_64(key_ptr_i64, key_count);
194  break;
195  }
196  default:
197  CHECK(false);
198  }
199 }
200 
201 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
202 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
203 void ResultSetStorage::reduce(const ResultSetStorage& that,
204                               const std::vector<std::string>& serialized_varlen_buffer,
205                               const ReductionCode& reduction_code) const {
206   auto entry_count = query_mem_desc_.getEntryCount();
207   CHECK_GT(entry_count, size_t(0));
208   if (query_mem_desc_.didOutputColumnar()) {
209     CHECK(query_mem_desc_.getQueryDescriptionType() ==
210               QueryDescriptionType::GroupByPerfectHash ||
211           query_mem_desc_.getQueryDescriptionType() ==
212               QueryDescriptionType::GroupByBaselineHash ||
213           query_mem_desc_.getQueryDescriptionType() ==
214               QueryDescriptionType::Projection);
215   }
216   const auto that_entry_count = that.query_mem_desc_.getEntryCount();
217   switch (query_mem_desc_.getQueryDescriptionType()) {
218     case QueryDescriptionType::GroupByBaselineHash:
219       CHECK_GE(entry_count, that_entry_count);
220       break;
221     default:
222       CHECK_EQ(entry_count, that_entry_count);
223   }
224  auto this_buff = buff_;
225  CHECK(this_buff);
226  auto that_buff = that.buff_;
227  CHECK(that_buff);
228   if (query_mem_desc_.getQueryDescriptionType() ==
229       QueryDescriptionType::GroupByBaselineHash) {
230     if (!serialized_varlen_buffer.empty()) {
231  throw std::runtime_error(
232  "Projection of variable length targets with baseline hash group by is not yet "
233  "supported in Distributed mode");
234  }
235  if (use_multithreaded_reduction(that_entry_count)) {
236  const size_t thread_count = cpu_threads();
237  std::vector<std::future<void>> reduction_threads;
238  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
239  const auto thread_entry_count =
240  (that_entry_count + thread_count - 1) / thread_count;
241  const auto start_index = thread_idx * thread_entry_count;
242  const auto end_index =
243  std::min(start_index + thread_entry_count, that_entry_count);
244  reduction_threads.emplace_back(std::async(
245  std::launch::async,
246  [this,
247  this_buff,
248  that_buff,
249  start_index,
250  end_index,
251  that_entry_count,
252  &reduction_code,
253  &that] {
254  if (reduction_code.ir_reduce_loop) {
255  run_reduction_code(reduction_code,
256  this_buff,
257  that_buff,
258  start_index,
259  end_index,
260  that_entry_count,
261                        &query_mem_desc_,
262                        &that.query_mem_desc_,
263  nullptr);
264  } else {
265  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
266               reduceOneEntryBaseline(
267                   this_buff, that_buff, entry_idx, that_entry_count, that);
268  }
269  }
270  }));
271  }
272  for (auto& reduction_thread : reduction_threads) {
273  reduction_thread.wait();
274  }
275  for (auto& reduction_thread : reduction_threads) {
276  reduction_thread.get();
277  }
278  } else {
279  if (reduction_code.ir_reduce_loop) {
280  run_reduction_code(reduction_code,
281  this_buff,
282  that_buff,
283  0,
284  that_entry_count,
285  that_entry_count,
286                          &query_mem_desc_,
287                          &that.query_mem_desc_,
288  nullptr);
289  } else {
290  for (size_t i = 0; i < that_entry_count; ++i) {
291  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
292  }
293  }
294  }
295  return;
296  }
297  if (use_multithreaded_reduction(entry_count)) {
298  const size_t thread_count = cpu_threads();
299  std::vector<std::future<void>> reduction_threads;
300  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
301  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
302  const auto start_index = thread_idx * thread_entry_count;
303  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
304  if (query_mem_desc_.didOutputColumnar()) {
305  reduction_threads.emplace_back(std::async(std::launch::async,
306  [this,
307  this_buff,
308  that_buff,
309  start_index,
310  end_index,
311  &that,
312  &serialized_varlen_buffer] {
313  reduceEntriesNoCollisionsColWise(
314  this_buff,
315  that_buff,
316  that,
317  start_index,
318  end_index,
319  serialized_varlen_buffer);
320  }));
321  } else {
322  reduction_threads.emplace_back(std::async(std::launch::async,
323  [this,
324  this_buff,
325  that_buff,
326  start_index,
327  end_index,
328  that_entry_count,
329  &reduction_code,
330  &that,
331  &serialized_varlen_buffer] {
332  CHECK(reduction_code.ir_reduce_loop);
333                                       run_reduction_code(
334                                           reduction_code,
335  this_buff,
336  that_buff,
337  start_index,
338  end_index,
339  that_entry_count,
340  &query_mem_desc_,
341  &that.query_mem_desc_,
342  &serialized_varlen_buffer);
343  }));
344  }
345  }
346  for (auto& reduction_thread : reduction_threads) {
347  reduction_thread.wait();
348  }
349  for (auto& reduction_thread : reduction_threads) {
350  reduction_thread.get();
351  }
352  } else {
353  if (query_mem_desc_.didOutputColumnar()) {
354  reduceEntriesNoCollisionsColWise(this_buff,
355  that_buff,
356  that,
357  0,
358  query_mem_desc_.getEntryCount(),
359  serialized_varlen_buffer);
360  } else {
361  CHECK(reduction_code.ir_reduce_loop);
362  run_reduction_code(reduction_code,
363  this_buff,
364  that_buff,
365  0,
366  entry_count,
367  that_entry_count,
368  &query_mem_desc_,
369  &that.query_mem_desc_,
370  &serialized_varlen_buffer);
371  }
372  }
373 }
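
// A minimal standalone sketch of the per-thread partitioning used by the driver
// above: the entry range is split into ceil(entry_count / thread_count) sized
// chunks, one std::async worker per chunk, and get() rethrows worker exceptions.
// Names here are illustrative only; <algorithm> and <future> are already included.
namespace {
[[maybe_unused]] void parallel_reduce_partition_sketch(const size_t entry_count,
                                                       const size_t thread_count) {
  std::vector<std::future<void>> workers;
  const size_t per_thread = (entry_count + thread_count - 1) / thread_count;
  for (size_t t = 0; t < thread_count; ++t) {
    const size_t start = t * per_thread;
    const size_t end = std::min(start + per_thread, entry_count);
    workers.emplace_back(std::async(std::launch::async, [start, end] {
      for (size_t i = start; i < end; ++i) {
        // reduce entry i here (the real code calls reduceOneEntryBaseline or
        // run_reduction_code on the [start, end) range)
      }
    }));
  }
  for (auto& w : workers) {
    w.get();  // rethrows any exception raised inside a worker
  }
}
}  // namespace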
374 
375 namespace {
376 
377 ALWAYS_INLINE void check_watchdog(const size_t sample_seed) {
378  if (UNLIKELY(g_enable_dynamic_watchdog && (sample_seed & 0x3F) == 0 &&
379  dynamic_watchdog())) {
380  // TODO(alex): distinguish between the deadline and interrupt
381  throw std::runtime_error(
382  "Query execution has exceeded the time limit or was interrupted during result "
383  "set reduction");
384  }
385 }
386 
387 } // namespace
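
// The 0x3F mask above means dynamic_watchdog() is only consulted on every 64th
// entry rather than per row; a compile-time illustration of that sampling rate:
static_assert((0 & 0x3F) == 0 && (64 & 0x3F) == 0 && (63 & 0x3F) != 0,
              "check_watchdog polls the watchdog once per 64 sample_seed values");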
388 
389 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
390     int8_t* this_buff,
391  const int8_t* that_buff,
392  const ResultSetStorage& that,
393  const size_t start_index,
394  const size_t end_index,
395  const std::vector<std::string>& serialized_varlen_buffer) const {
396  // TODO(adb / saman): Support column wise output when serializing distributed agg
397  // functions
398  CHECK(serialized_varlen_buffer.empty());
399 
400  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
401 
402  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
403  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
404  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
405  const auto& agg_info = targets_[target_idx];
406  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
407 
408  bool two_slot_target{false};
409  if (agg_info.is_agg &&
410  (agg_info.agg_kind == kAVG ||
411  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
412  // Note that this assumes if one of the slot pairs in a given target is an array,
413  // all slot pairs are arrays. Currently this is true for all geo targets, but we
414  // should better codify and store this information in the future
415  two_slot_target = true;
416  }
417 
418  for (size_t target_slot_idx = slots_for_col.front();
419  target_slot_idx < slots_for_col.back() + 1;
420  target_slot_idx += 2) {
421  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
422  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
423  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
424  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
425 
426  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
427  check_watchdog(entry_idx);
428  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
429  continue;
430  }
431       if (LIKELY(target_idx == 0 && target_slot_idx == slots_for_col.front())) {
432         // copy the key from right hand side
433         copyKeyColWise(entry_idx, this_buff, that_buff);
434       }
435  auto this_ptr1 =
436  this_crt_col_ptr +
437  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
438  auto that_ptr1 =
439  that_crt_col_ptr +
440  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
441  int8_t* this_ptr2{nullptr};
442  const int8_t* that_ptr2{nullptr};
443  if (UNLIKELY(two_slot_target)) {
444  this_ptr2 =
445  this_next_col_ptr +
446  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
447  that_ptr2 =
448  that_next_col_ptr +
449  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
450  }
451  reduceOneSlot(this_ptr1,
452  this_ptr2,
453  that_ptr1,
454  that_ptr2,
455  agg_info,
456  target_idx,
457  target_slot_idx,
458  target_slot_idx,
459  that,
460  slots_for_col.front(),
461  serialized_varlen_buffer);
462  }
463 
464  this_crt_col_ptr = this_next_col_ptr;
465  that_crt_col_ptr = that_next_col_ptr;
466  if (UNLIKELY(two_slot_target)) {
467  this_crt_col_ptr = advance_to_next_columnar_target_buff(
468  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
469  that_crt_col_ptr = advance_to_next_columnar_target_buff(
470  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
471  }
472  }
473  }
474 }
475 
476 /*
477  * copy all keys from the columnar prepended group buffer of "that_buff" into
478  * "this_buff"
479  */
480 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
481  int8_t* this_buff,
482  const int8_t* that_buff) const {
484  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
485  group_idx++) {
486  // if the column corresponds to a group key
487  const auto column_offset_bytes =
488         query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
489     auto lhs_key_ptr = this_buff + column_offset_bytes;
490  auto rhs_key_ptr = that_buff + column_offset_bytes;
491  switch (query_mem_desc_.groupColWidth(group_idx)) {
492  case 8:
493  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
494  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
495  break;
496  case 4:
497  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
498  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
499  break;
500  case 2:
501  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
502  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
503  break;
504  case 1:
505  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
506  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
507  break;
508  default:
509  CHECK(false);
510  break;
511  }
512  }
513 }
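
// A minimal standalone sketch of the columnar key layout handled above, assuming
// every group column is 8 bytes wide: the prepended key columns are stored
// column-major, so group column g occupies entry_count contiguous values and the
// key for entry entry_idx sits at g * entry_count + entry_idx. The real code uses
// per-column widths and byte offsets from QueryMemoryDescriptor instead.
namespace {
[[maybe_unused]] void copy_key_col_wise_sketch(const size_t entry_idx,
                                               int64_t* this_keys,
                                               const int64_t* that_keys,
                                               const size_t entry_count,
                                               const size_t group_col_count) {
  for (size_t g = 0; g < group_col_count; ++g) {
    this_keys[g * entry_count + entry_idx] = that_keys[g * entry_count + entry_idx];
  }
}
}  // namespace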
514 
515 // Rewrites the entries of this ResultSetStorage object to point directly into the
516 // serialized_varlen_buffer rather than using offsets.
517 void ResultSetStorage::rewriteAggregateBufferOffsets(
518     const std::vector<std::string>& serialized_varlen_buffer) const {
519  if (serialized_varlen_buffer.empty()) {
520  return;
521  }
522 
524  auto entry_count = query_mem_desc_.getEntryCount();
525  CHECK_GT(entry_count, size_t(0));
526  CHECK(buff_);
527 
528  // Row-wise iteration, consider moving to separate function
529  for (size_t i = 0; i < entry_count; ++i) {
530  if (isEmptyEntry(i, buff_)) {
531  continue;
532  }
533  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
534  const auto key_bytes_with_padding = align_to_int64(key_bytes);
535  auto rowwise_targets_ptr =
536  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
537  size_t target_slot_idx = 0;
538  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
539  ++target_logical_idx) {
540  const auto& target_info = targets_[target_logical_idx];
541  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
542  CHECK(target_info.agg_kind == kSAMPLE);
543  auto ptr1 = rowwise_targets_ptr;
544  auto slot_idx = target_slot_idx;
545  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
546  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
547 
548  const auto& elem_ti = target_info.sql_type.get_elem_type();
549  size_t length_to_elems =
550  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
551  ? 1
552  : elem_ti.get_size();
553  if (target_info.sql_type.is_geometry()) {
554  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
555  if (j > 0) {
556  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
557  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
558  slot_idx += 2;
559  length_to_elems = 4;
560  }
561  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
562  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
563  const auto str_ptr =
564  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
565  CHECK(ptr1);
566  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
567  CHECK(ptr2);
568  *reinterpret_cast<int64_t*>(ptr2) =
569  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
570  }
571  } else {
572  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
573  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
574  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
575  CHECK(ptr1);
576  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
577  CHECK(ptr2);
578  *reinterpret_cast<int64_t*>(ptr2) =
579  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
580  }
581  }
582 
583  rowwise_targets_ptr = advance_target_ptr_row_wise(
584  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
585  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
586  }
587  }
588 
589  return;
590 }
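
// A standalone sketch of the rewrite above for a single varlen SAMPLE target,
// assuming a hypothetical pair of 8-byte slots: slot one initially holds an index
// into serialized_varlen_buffer and is overwritten with a pointer to the bytes,
// while slot two receives the element count. The real code walks each row with
// QueryMemoryDescriptor offsets and handles multi-slot geometry targets.
namespace {
[[maybe_unused]] void rewrite_varlen_slot_sketch(
    int64_t* ptr_slot,
    int64_t* count_slot,
    const std::vector<std::string>& serialized_varlen_buffer,
    const size_t bytes_per_elem) {
  const auto offset = static_cast<size_t>(*ptr_slot);
  const std::string& varlen_bytes = serialized_varlen_buffer.at(offset);
  *ptr_slot = reinterpret_cast<int64_t>(varlen_bytes.data());
  *count_slot = static_cast<int64_t>(varlen_bytes.size() / bytes_per_elem);
}
}  // namespace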
591 
592 namespace {
593 
594 #ifdef _MSC_VER
595 #define mapd_cas(address, compare, val) \
596  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
597  static_cast<long>(val), \
598  static_cast<long>(compare))
599 #else
600 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
601 #endif
602 
603 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
604                                                            const uint32_t h,
605  const int64_t* key,
606  const uint32_t key_qw_count,
607  const size_t entry_count) {
608  auto off = h;
609  const auto old_key = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
610  if (old_key == EMPTY_KEY_64) {
611  for (size_t i = 0; i < key_qw_count; ++i) {
612  groups_buffer[off] = key[i];
613  off += entry_count;
614  }
615  return {&groups_buffer[off], true};
616  }
617  off = h;
618  for (size_t i = 0; i < key_qw_count; ++i) {
619  if (groups_buffer[off] != key[i]) {
620  return {nullptr, true};
621  }
622  off += entry_count;
623  }
624  return {&groups_buffer[off], false};
625 }
626 
627 #undef mapd_cas
628 
629 // TODO(alex): fix synchronization when we enable it
630 GroupValueInfo get_group_value_columnar_reduction(
631     int64_t* groups_buffer,
632  const uint32_t groups_buffer_entry_count,
633  const int64_t* key,
634  const uint32_t key_qw_count) {
635  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
636   auto matching_gvi = get_matching_group_value_columnar_reduction(
637       groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
638  if (matching_gvi.first) {
639  return matching_gvi;
640  }
641  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
642  while (h_probe != h) {
643     matching_gvi = get_matching_group_value_columnar_reduction(
644         groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
645  if (matching_gvi.first) {
646  return matching_gvi;
647  }
648  h_probe = (h_probe + 1) % groups_buffer_entry_count;
649  }
650  return {nullptr, true};
651 }
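
// A standalone sketch of the probe order used above: start at the key's hash
// bucket and advance one slot at a time, wrapping at the table size, until the
// whole table has been scanned once. (Toy table of single bare keys; the real
// code also claims empty buckets via CAS and strides by entry_count per key
// component.)
[[maybe_unused]] inline int64_t linear_probe_sketch(const int64_t* keys,
                                                    const size_t entry_count,
                                                    const int64_t key,
                                                    const size_t start_bucket) {
  size_t bucket = start_bucket;
  do {
    if (keys[bucket] == key) {
      return static_cast<int64_t>(bucket);
    }
    bucket = (bucket + 1) % entry_count;
  } while (bucket != start_bucket);
  return -1;  // scanned every bucket without finding the key
}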
652 
653 #ifdef _MSC_VER
654 #define cas_cst(ptr, expected, desired) \
655  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
656  reinterpret_cast<void*>(&desired), \
657  expected) == expected)
658 #define store_cst(ptr, val) \
659  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
660  reinterpret_cast<void*>(val))
661 #define load_cst(ptr) \
662  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
663 #else
664 #define cas_cst(ptr, expected, desired) \
665  __atomic_compare_exchange_n( \
666  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
667 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
668 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
669 #endif
670 
671 template <typename T = int64_t>
672 GroupValueInfo get_matching_group_value_reduction(
673     int64_t* groups_buffer,
674  const uint32_t h,
675  const T* key,
676  const uint32_t key_count,
677     const QueryMemoryDescriptor& query_mem_desc,
678     const int64_t* that_buff_i64,
679  const size_t that_entry_idx,
680  const size_t that_entry_count,
681  const uint32_t row_size_quad) {
682  auto off = h * row_size_quad;
683  T empty_key = get_empty_key<T>();
684  T write_pending = get_empty_key<T>() - 1;
685  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
686  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
687  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
688  if (success) {
689  fill_slots(groups_buffer + off + slot_off_quad,
690  query_mem_desc.getEntryCount(),
691  that_buff_i64,
692  that_entry_idx,
693  that_entry_count,
694                query_mem_desc);
695     if (key_count > 1) {
696  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
697  }
698  store_cst(row_ptr, *key);
699  return {groups_buffer + off + slot_off_quad, true};
700  }
701  while (load_cst(row_ptr) == write_pending) {
702  // spin until the winning thread has finished writing the entire key and the init
703  // value
704  }
705  for (size_t i = 0; i < key_count; ++i) {
706  if (load_cst(row_ptr + i) != key[i]) {
707  return {nullptr, true};
708  }
709  }
710  return {groups_buffer + off + slot_off_quad, false};
711 }
712 
713 #undef load_cst
714 #undef store_cst
715 #undef cas_cst
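
#ifndef _MSC_VER
// A single-key sketch of the claim/publish protocol implemented above, written
// with the same __atomic builtins the non-MSVC macros expand to: a thread claims
// an empty slot by swinging its first key word from `empty` to a `pending`
// sentinel, fills in the payload, then publishes the real key; losing threads
// spin until the sentinel disappears and then compare keys. Illustrative only.
[[maybe_unused]] inline bool claim_or_match_slot_sketch(int64_t* slot_key,
                                                        const int64_t key,
                                                        const int64_t empty,
                                                        const int64_t pending) {
  int64_t expected = empty;
  if (__atomic_compare_exchange_n(
          slot_key, &expected, pending, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    // Winner: initialize the slot's payload here, then publish the real key.
    __atomic_store_n(slot_key, key, __ATOMIC_SEQ_CST);
    return true;
  }
  while (__atomic_load_n(slot_key, __ATOMIC_SEQ_CST) == pending) {
    // Spin until the winning thread finishes writing the key.
  }
  return __atomic_load_n(slot_key, __ATOMIC_SEQ_CST) == key;
}
#endif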
716 
717 GroupValueInfo get_matching_group_value_reduction(
718     int64_t* groups_buffer,
719  const uint32_t h,
720  const int64_t* key,
721  const uint32_t key_count,
722  const size_t key_width,
723     const QueryMemoryDescriptor& query_mem_desc,
724     const int64_t* that_buff_i64,
725  const size_t that_entry_idx,
726  const size_t that_entry_count,
727  const uint32_t row_size_quad) {
728  switch (key_width) {
729  case 4:
730  return get_matching_group_value_reduction(groups_buffer,
731  h,
732  reinterpret_cast<const int32_t*>(key),
733  key_count,
734  query_mem_desc,
735  that_buff_i64,
736  that_entry_idx,
737  that_entry_count,
738  row_size_quad);
739  case 8:
740  return get_matching_group_value_reduction(groups_buffer,
741  h,
742  key,
743  key_count,
744  query_mem_desc,
745  that_buff_i64,
746  that_entry_idx,
747  that_entry_count,
748  row_size_quad);
749  default:
750  CHECK(false);
751  return {nullptr, true};
752  }
753 }
754 
755 } // namespace
756 
757 GroupValueInfo get_group_value_reduction(
758     int64_t* groups_buffer,
759  const uint32_t groups_buffer_entry_count,
760  const int64_t* key,
761  const uint32_t key_count,
762  const size_t key_width,
763     const QueryMemoryDescriptor& query_mem_desc,
764     const int64_t* that_buff_i64,
765  const size_t that_entry_idx,
766  const size_t that_entry_count,
767  const uint32_t row_size_quad) {
768  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
769  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
770  h,
771  key,
772  key_count,
773  key_width,
774  query_mem_desc,
775  that_buff_i64,
776  that_entry_idx,
777  that_entry_count,
778  row_size_quad);
779  if (matching_gvi.first) {
780  return matching_gvi;
781  }
782  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
783  while (h_probe != h) {
784  matching_gvi = get_matching_group_value_reduction(groups_buffer,
785  h_probe,
786  key,
787  key_count,
788  key_width,
789  query_mem_desc,
790  that_buff_i64,
791  that_entry_idx,
792  that_entry_count,
793  row_size_quad);
794  if (matching_gvi.first) {
795  return matching_gvi;
796  }
797  h_probe = (h_probe + 1) % groups_buffer_entry_count;
798  }
799  return {nullptr, true};
800 }
801 
802 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
803 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
804 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
805                                               const int8_t* that_buff,
806  const size_t that_entry_idx,
807  const size_t that_entry_count,
808  const ResultSetStorage& that) const {
809  check_watchdog(that_entry_idx);
810  const auto key_count = query_mem_desc_.getGroupbyColCount();
815   const auto key_off =
816       key_offset_colwise(that_entry_idx, 0, that_entry_count);
817   if (isEmptyEntry(that_entry_idx, that_buff)) {
818  return;
819  }
820  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
821  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
822  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
823  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
824  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
825  CHECK(this_entry_slots);
826  if (empty_entry) {
827  fill_slots(this_entry_slots,
828                query_mem_desc_.getEntryCount(),
829                that_buff_i64,
830                that_entry_idx,
831                that_entry_count,
832                query_mem_desc_);
833     return;
834   }
835   reduceOneEntrySlotsBaseline(
836       this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
837 }
838 
839 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
840  const int64_t* that_buff,
841  const size_t that_entry_idx,
842  const size_t that_entry_count,
843  const ResultSetStorage& that) const {
845  const auto key_count = query_mem_desc_.getGroupbyColCount();
846  size_t j = 0;
847  size_t init_agg_val_idx = 0;
848  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
849  ++target_logical_idx) {
850  const auto& target_info = targets_[target_logical_idx];
851  const auto that_slot_off = slot_offset_colwise(
852  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
853  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
854  reduceOneSlotBaseline(this_entry_slots,
855  this_slot_off,
856  that_buff,
857  that_entry_count,
858  that_slot_off,
859  target_info,
860  target_logical_idx,
861  j,
862  init_agg_val_idx,
863  that);
864     if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
865       init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
866  } else {
867  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
868  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
869  }
870  }
871  j = advance_slot(j, target_info, false);
872  }
873 }
874 
875 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
876                                              const size_t this_slot,
877  const int64_t* that_buff,
878  const size_t that_entry_count,
879  const size_t that_slot,
880  const TargetInfo& target_info,
881  const size_t target_logical_idx,
882  const size_t target_slot_idx,
883  const size_t init_agg_val_idx,
884  const ResultSetStorage& that) const {
886  int8_t* this_ptr2{nullptr};
887  const int8_t* that_ptr2{nullptr};
888  if (target_info.is_agg &&
889  (target_info.agg_kind == kAVG ||
890  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
891  const auto this_count_off = query_mem_desc_.getEntryCount();
892  const auto that_count_off = that_entry_count;
893  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
894  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
895  }
896  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
897  this_ptr2,
898  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
899  that_ptr2,
900  target_info,
901  target_logical_idx,
902  target_slot_idx,
903  init_agg_val_idx,
904  that,
905  target_slot_idx, // dummy, for now
906  {});
907 }
908 
909 // During the reduction of two result sets using the baseline strategy, we first create a
910 // big enough buffer to hold the entries for both and we move the entries from the first
911 // into it before doing the reduction as usual (into the first buffer).
912 template <class KeyType>
913 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
914                                            const size_t new_entry_count) const {
916  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
917  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
918  const auto key_count = query_mem_desc_.getGroupbyColCount();
921  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
922  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
923  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
924 
925   if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
926     const size_t thread_count = cpu_threads();
927  std::vector<std::future<void>> move_threads;
928 
929  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
930  const auto thread_entry_count =
931  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
932  const auto start_index = thread_idx * thread_entry_count;
933  const auto end_index =
934  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
935  move_threads.emplace_back(std::async(
936  std::launch::async,
937  [this,
938  src_buff,
939  new_buff_i64,
940  new_entry_count,
941  start_index,
942  end_index,
943  key_count,
944  row_qw_count,
945  key_byte_width] {
946  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
947  moveOneEntryToBuffer<KeyType>(entry_idx,
948  new_buff_i64,
949  new_entry_count,
950  key_count,
951  row_qw_count,
952  src_buff,
953  key_byte_width);
954  }
955  }));
956  }
957  for (auto& move_thread : move_threads) {
958  move_thread.wait();
959  }
960  for (auto& move_thread : move_threads) {
961  move_thread.get();
962  }
963  } else {
964  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
965  moveOneEntryToBuffer<KeyType>(entry_idx,
966  new_buff_i64,
967  new_entry_count,
968  key_count,
969  row_qw_count,
970  src_buff,
971  key_byte_width);
972  }
973  }
974 }
975 
976 template <class KeyType>
977 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
978  int64_t* new_buff_i64,
979  const size_t new_entry_count,
980  const size_t key_count,
981  const size_t row_qw_count,
982  const int64_t* src_buff,
983  const size_t key_byte_width) const {
984   const auto key_off =
985       query_mem_desc_.didOutputColumnar()
986           ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
987           : row_qw_count * entry_index;
988  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
989  if (*key_ptr == get_empty_key<KeyType>()) {
990  return;
991  }
992  int64_t* new_entries_ptr{nullptr};
993   if (query_mem_desc_.didOutputColumnar()) {
994     const auto key =
995  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
996  new_entries_ptr =
997  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
998  } else {
999  new_entries_ptr = get_group_value(new_buff_i64,
1000  new_entry_count,
1001  &src_buff[key_off],
1002  key_count,
1003  key_byte_width,
1004  row_qw_count);
1005  }
1006  CHECK(new_entries_ptr);
1007  fill_slots(new_entries_ptr,
1008  new_entry_count,
1009  src_buff,
1010  entry_index,
1011              query_mem_desc_.getEntryCount(),
1012              query_mem_desc_);
1013 }
1014 
1015 void ResultSet::initializeStorage() const {
1016   if (query_mem_desc_.didOutputColumnar()) {
1017  storage_->initializeColWise();
1018  } else {
1019  storage_->initializeRowWise();
1020  }
1021 }
1022 
1023 // Driver for reductions. Needed because the result of a reduction on the baseline
1024 // layout, which can have collisions, cannot be done in place and something needs
1025 // to take the ownership of the new result set with the bigger underlying buffer.
1026 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets) {
1027  CHECK(!result_sets.empty());
1028  auto result_rs = result_sets.front();
1029  CHECK(result_rs->storage_);
1030  auto& first_result = *result_rs->storage_;
1031  auto result = &first_result;
1032  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1033  for (const auto result_set : result_sets) {
1034  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1035  }
1036  const auto catalog = result_rs->catalog_;
1037  for (const auto result_set : result_sets) {
1038  CHECK_EQ(catalog, result_set->catalog_);
1039  }
1040  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1041       QueryDescriptionType::GroupByBaselineHash) {
1042     const auto total_entry_count =
1043  std::accumulate(result_sets.begin(),
1044  result_sets.end(),
1045  size_t(0),
1046  [](const size_t init, const ResultSet* rs) {
1047  return init + rs->query_mem_desc_.getEntryCount();
1048  });
1049  CHECK(total_entry_count);
1050  auto query_mem_desc = first_result.query_mem_desc_;
1051  query_mem_desc.setEntryCount(total_entry_count);
1052  rs_.reset(new ResultSet(first_result.targets_,
1053                             ExecutorDeviceType::CPU,
1054                             query_mem_desc,
1055                             row_set_mem_owner,
1056  catalog,
1057  0,
1058  0));
1059  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1060  rs_->initializeStorage();
1061  switch (query_mem_desc.getEffectiveKeyWidth()) {
1062  case 4:
1063  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1064  query_mem_desc.getEntryCount());
1065  break;
1066  case 8:
1067  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1068  query_mem_desc.getEntryCount());
1069  break;
1070  default:
1071  CHECK(false);
1072  }
1073  result = rs_->storage_.get();
1074  result_rs = rs_.get();
1075  }
1076 
1077  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1078  if (!serialized_varlen_buffer.empty()) {
1079  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1080  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1081  ++result_it) {
1082  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1083  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1084  serialized_varlen_buffer.emplace_back(
1085  std::move(result_serialized_varlen_buffer.front()));
1086  }
1087  }
1088 
1089  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1090  result_rs->getTargetInfos(),
1091  result_rs->getTargetInitVals());
1092  auto reduction_code = reduction_jit.codegen();
1093  size_t ctr = 1;
1094  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1095  ++result_it) {
1096  if (!serialized_varlen_buffer.empty()) {
1097  result->reduce(
1098  *((*result_it)->storage_), serialized_varlen_buffer[ctr++], reduction_code);
1099  } else {
1100  result->reduce(*((*result_it)->storage_), {}, reduction_code);
1101  }
1102  }
1103  return result_rs;
1104 }
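
// A minimal sketch of the sizing step above: when the inputs use the baseline
// hash layout, the destination result set is sized for the sum of all partial
// entry counts before the pairwise reduction. PartialSketch is a placeholder,
// not the real ResultSet type; <numeric> is already included.
namespace {
struct PartialSketch {
  size_t entry_count;
};
[[maybe_unused]] inline size_t combined_entry_count_sketch(
    const std::vector<PartialSketch>& partials) {
  return std::accumulate(partials.begin(),
                         partials.end(),
                         size_t(0),
                         [](const size_t sum, const PartialSketch& p) {
                           return sum + p.entry_count;
                         });
}
}  // namespace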
1105 
1106 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1107  return rs_;
1108 }
1109 
1110 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1111  auto& result_storage = result_rs->storage_;
1112  result_storage->rewriteAggregateBufferOffsets(
1113  result_rs->serialized_varlen_buffer_.front());
1114 }
1115 
1116 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1117  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1118  const auto key_count = query_mem_desc_.getGroupbyColCount();
1119  CHECK_EQ(slot_count + key_count, entry.size());
1120  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1122  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1123  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1124  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1125  for (size_t i = 0; i < key_count; ++i) {
1126  this_buff[key_off + i] = entry[i];
1127  }
1128  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1129  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1130  this_buff[first_slot_off + i] = entry[key_count + i];
1131  }
1132 }
1133 
1134 void ResultSetStorage::initializeRowWise() const {
1135   const auto key_count = query_mem_desc_.getGroupbyColCount();
1136  const auto row_size = get_row_bytes(query_mem_desc_);
1137  CHECK_EQ(row_size % 8, 0u);
1138   const auto key_bytes_with_padding =
1139       align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1141   switch (query_mem_desc_.getEffectiveKeyWidth()) {
1142     case 4: {
1143  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1144  auto row_ptr = buff_ + i * row_size;
1145  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1146  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1147  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1148  slot_ptr[j] = target_init_vals_[j];
1149  }
1150  }
1151  break;
1152  }
1153  case 8: {
1154  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1155  auto row_ptr = buff_ + i * row_size;
1156  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1157  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1158  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1159  slot_ptr[j] = target_init_vals_[j];
1160  }
1161  }
1162  break;
1163  }
1164  default:
1165  CHECK(false);
1166  }
1167 }
1168 
1169 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1171  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1172  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1173  const auto key_count = query_mem_desc_.getGroupbyColCount();
1174  CHECK_EQ(slot_count + key_count, entry.size());
1175  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1176 
1177  for (size_t i = 0; i < key_count; i++) {
1178  const auto key_offset = key_offset_colwise(0, i, 1);
1179  this_buff[key_offset] = entry[i];
1180  }
1181 
1182  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1183  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1184  this_buff[slot_offset] = entry[key_count + i];
1185  }
1186 }
1187 
1188 void ResultSetStorage::initializeColWise() const {
1189   const auto key_count = query_mem_desc_.getGroupbyColCount();
1190  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1192  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1193     const auto first_key_off =
1194         key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1195  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1196  this_buff[first_key_off + i] = EMPTY_KEY_64;
1197  }
1198  }
1199  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1200  const auto first_val_off =
1201  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1202  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1203  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1204  }
1205  }
1206 }
1207 
1208 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1209  CHECK(entry_slots);
1210   if (query_mem_desc_.didOutputColumnar()) {
1211     size_t slot_off = 0;
1212  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1213  entry_slots[slot_off] = target_init_vals_[j];
1214  slot_off += query_mem_desc_.getEntryCount();
1215  }
1216  } else {
1217  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1218  entry_slots[j] = target_init_vals_[j];
1219  }
1220  }
1221 }
1222 
1223 #define AGGREGATE_ONE_VALUE( \
1224  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1225  do { \
1226  const auto sql_type = get_compact_type(agg_info__); \
1227  if (sql_type.is_fp()) { \
1228  if (chosen_bytes__ == sizeof(float)) { \
1229  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1230  *reinterpret_cast<const float*>(other_ptr__)); \
1231  } else { \
1232  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1233  *reinterpret_cast<const double*>(other_ptr__)); \
1234  } \
1235  } else { \
1236  if (chosen_bytes__ == sizeof(int32_t)) { \
1237  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1238  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1239  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1240  } else { \
1241  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1242  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1243  agg_##agg_kind__(val_ptr, *other_ptr); \
1244  } \
1245  } \
1246  } while (0)
1247 
1248 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1249  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1250  do { \
1251  if (agg_info__.skip_null_val) { \
1252  const auto sql_type = get_compact_type(agg_info__); \
1253  if (sql_type.is_fp()) { \
1254  if (chosen_bytes__ == sizeof(float)) { \
1255  agg_##agg_kind__##_float_skip_val( \
1256  reinterpret_cast<int32_t*>(val_ptr__), \
1257  *reinterpret_cast<const float*>(other_ptr__), \
1258  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1259  } else { \
1260  agg_##agg_kind__##_double_skip_val( \
1261  reinterpret_cast<int64_t*>(val_ptr__), \
1262  *reinterpret_cast<const double*>(other_ptr__), \
1263  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1264  } \
1265  } else { \
1266  if (chosen_bytes__ == sizeof(int32_t)) { \
1267  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1268  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1269  const auto null_val = static_cast<int32_t>(init_val__); \
1270  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1271  } else { \
1272  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1273  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1274  const auto null_val = static_cast<int64_t>(init_val__); \
1275  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1276  } \
1277  } \
1278  } else { \
1279  AGGREGATE_ONE_VALUE( \
1280  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1281  } \
1282  } while (0)
1283 
1284 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1285  do { \
1286  if (chosen_bytes__ == sizeof(int32_t)) { \
1287  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1288  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1289  agg_sum_int32(val_ptr, *other_ptr); \
1290  } else { \
1291  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1292  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1293  agg_sum(val_ptr, *other_ptr); \
1294  } \
1295  } while (0)
1296 
1297 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1298  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1299  { \
1300  if (agg_info__.skip_null_val) { \
1301  const auto sql_type = get_compact_type(agg_info__); \
1302  if (sql_type.is_fp()) { \
1303  if (chosen_bytes__ == sizeof(float)) { \
1304  agg_sum_float_skip_val( \
1305  reinterpret_cast<int32_t*>(val_ptr__), \
1306  *reinterpret_cast<const float*>(other_ptr__), \
1307  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1308  } else { \
1309  agg_sum_double_skip_val( \
1310  reinterpret_cast<int64_t*>(val_ptr__), \
1311  *reinterpret_cast<const double*>(other_ptr__), \
1312  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1313  } \
1314  } else { \
1315  if (chosen_bytes__ == sizeof(int32_t)) { \
1316  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1317  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1318  const auto null_val = static_cast<int32_t>(init_val__); \
1319  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1320  } else { \
1321  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1322  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1323  const auto null_val = static_cast<int64_t>(init_val__); \
1324  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1325  } \
1326  } \
1327  } else { \
1328  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1329  } \
1330  }
1331 
1332 // to be used for 8/16-bit kMIN and kMAX only
1333 #define AGGREGATE_ONE_VALUE_SMALL( \
1334  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1335  do { \
1336  if (chosen_bytes__ == sizeof(int16_t)) { \
1337  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1338  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1339  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1340  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1341  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1342  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1343  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1344  } else { \
1345  UNREACHABLE(); \
1346  } \
1347  } while (0)
1348 
1349 // to be used for 8/16-bit kMIN and kMAX only
1350 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1351  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1352  do { \
1353  if (agg_info__.skip_null_val) { \
1354  if (chosen_bytes__ == sizeof(int16_t)) { \
1355  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1356  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1357  const auto null_val = static_cast<int16_t>(init_val__); \
1358  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1359     } else if (chosen_bytes__ == sizeof(int8_t)) {                            \
1360  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1361  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1362  const auto null_val = static_cast<int8_t>(init_val__); \
1363  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1364  } \
1365  } else { \
1366  AGGREGATE_ONE_VALUE_SMALL( \
1367  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1368  } \
1369  } while (0)
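
// A rough standalone equivalent of what AGGREGATE_ONE_NULLABLE_VALUE does for a
// 32-bit integer MAX with skip_null_val set: the null sentinel (the slot's init
// value) is ignored on the right-hand side and preserved on the left until a
// real value arrives. The agg_*_skip_val runtime functions apply the same rule
// in place for each width and type.
namespace {
[[maybe_unused]] inline void nullable_max_int32_sketch(int32_t* val_ptr,
                                                       const int32_t other,
                                                       const int32_t null_val) {
  if (other == null_val) {
    return;  // nothing to fold in
  }
  *val_ptr = (*val_ptr == null_val) ? other : std::max(*val_ptr, other);
}
}  // namespace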
1370 
1371 int8_t result_set::get_width_for_slot(const size_t target_slot_idx,
1372  const bool float_argument_input,
1373                                       const QueryMemoryDescriptor& query_mem_desc) {
1374   if (float_argument_input) {
1375  return sizeof(float);
1376  }
1377  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1378 }
1379 
1380 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1381                                                 const TargetInfo& target_info,
1382  const size_t target_slot_idx,
1383  const size_t init_agg_val_idx,
1384  const int8_t* that_ptr1) const {
1385  const bool float_argument_input = takes_float_argument(target_info);
1386  const auto chosen_bytes = result_set::get_width_for_slot(
1387  target_slot_idx, float_argument_input, query_mem_desc_);
1388  auto init_val = target_init_vals_[init_agg_val_idx];
1389 
1390  auto reduce = [&](auto const& size_tag) {
1391  using CastTarget = std::decay_t<decltype(size_tag)>;
1392  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1393  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1394  if (rhs_proj_col == init_val) {
1395  // ignore
1396  } else if (lhs_proj_col == init_val) {
1397  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1398  } else if (lhs_proj_col != rhs_proj_col) {
1399  throw std::runtime_error("Multiple distinct values encountered");
1400  }
1401  };
1402 
1403  switch (chosen_bytes) {
1404  case 1: {
1406  reduce(int8_t());
1407  break;
1408  }
1409  case 2: {
1411  reduce(int16_t());
1412  break;
1413  }
1414  case 4: {
1415  reduce(int32_t());
1416  break;
1417  }
1418  case 8: {
1419  CHECK(!target_info.sql_type.is_varlen());
1420  reduce(int64_t());
1421  break;
1422  }
1423  default:
1424  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1425  }
1426 }
1427 
1428 void ResultSetStorage::reduceOneSlot(
1429     int8_t* this_ptr1,
1430  int8_t* this_ptr2,
1431  const int8_t* that_ptr1,
1432  const int8_t* that_ptr2,
1433  const TargetInfo& target_info,
1434  const size_t target_logical_idx,
1435  const size_t target_slot_idx,
1436  const size_t init_agg_val_idx,
1437  const ResultSetStorage& that,
1438  const size_t first_slot_idx_for_target,
1439  const std::vector<std::string>& serialized_varlen_buffer) const {
1440   if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1441     if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1442  return;
1443  }
1444  }
1445  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1446  const bool float_argument_input = takes_float_argument(target_info);
1447  const auto chosen_bytes = result_set::get_width_for_slot(
1448  target_slot_idx, float_argument_input, query_mem_desc_);
1449  int64_t init_val = target_init_vals_[init_agg_val_idx]; // skip_val for nullable types
1450 
1451  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1452     reduceOneSlotSingleValue(
1453         this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1454  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1455  switch (target_info.agg_kind) {
1456  case kCOUNT:
1457  case kAPPROX_COUNT_DISTINCT: {
1458  if (is_distinct_target(target_info)) {
1459  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1460  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1461  break;
1462  }
1463  CHECK_EQ(int64_t(0), init_val);
1464  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1465  break;
1466  }
1467  case kAVG: {
1468  // Ignore float argument compaction for count component for fear of its overflow
1469  AGGREGATE_ONE_COUNT(this_ptr2,
1470  that_ptr2,
1471  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1472  }
1473  // fall thru
1474  case kSUM: {
1475         AGGREGATE_ONE_NULLABLE_VALUE(
1476             sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1477  break;
1478  }
1479  case kMIN: {
1480  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1481           AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1482               min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1483  } else {
1484           AGGREGATE_ONE_NULLABLE_VALUE(
1485               min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1486  }
1487  break;
1488  }
1489  case kMAX: {
1490  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1491           AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1492               max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1493  } else {
1494           AGGREGATE_ONE_NULLABLE_VALUE(
1495               max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1496  }
1497  break;
1498  }
1499  case kAPPROX_MEDIAN:
1500  CHECK_EQ(static_cast<int8_t>(sizeof(int64_t)), chosen_bytes);
1501  reduceOneApproxMedianSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1502  break;
1503  default:
1504  UNREACHABLE() << toString(target_info.agg_kind);
1505  }
1506  } else {
1507  switch (chosen_bytes) {
1508  case 1: {
1510  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1511  if (rhs_proj_col != init_val) {
1512  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1513  }
1514  break;
1515  }
1516  case 2: {
1518  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1519  if (rhs_proj_col != init_val) {
1520  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1521  }
1522  break;
1523  }
1524  case 4: {
1525  CHECK(target_info.agg_kind != kSAMPLE ||
1526               query_mem_desc_.isLogicalSizedColumnsAllowed());
1527         const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1528  if (rhs_proj_col != init_val) {
1529  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1530  }
1531  break;
1532  }
1533  case 8: {
1534  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1535  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1536  !serialized_varlen_buffer.empty()) {
1537  size_t length_to_elems{0};
1538  if (target_info.sql_type.is_geometry()) {
1539  // TODO: Assumes hard-coded sizes for geometry targets
1540  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1541  } else {
1542  const auto& elem_ti = target_info.sql_type.get_elem_type();
1543  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1544  }
1545 
1546  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1547  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1548  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1549  *reinterpret_cast<int64_t*>(this_ptr1) =
1550  reinterpret_cast<const int64_t>(str_ptr);
1551  *reinterpret_cast<int64_t*>(this_ptr2) =
1552  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1553  } else {
1554  if (rhs_proj_col != init_val) {
1555  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1556  }
1557  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1558  CHECK(this_ptr2 && that_ptr2);
1559  *reinterpret_cast<int64_t*>(this_ptr2) =
1560  *reinterpret_cast<const int64_t*>(that_ptr2);
1561  }
1562  }
1563 
1564  break;
1565  }
1566  default:
1567  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1568  }
1569  }
1570 }
1571 
1573  const int8_t* that_ptr1,
1574  const size_t target_logical_idx,
1575  const ResultSetStorage& that) const {
1577  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1578  auto* incoming = *reinterpret_cast<quantile::TDigest* const*>(that_ptr1);
1579  CHECK(incoming) << "this_ptr1=" << (void*)this_ptr1
1580  << ", that_ptr1=" << (void const*)that_ptr1
1581  << ", target_logical_idx=" << target_logical_idx;
1582  if (incoming->centroids().capacity()) {
1583  auto* accumulator = *reinterpret_cast<quantile::TDigest**>(this_ptr1);
1584  CHECK(accumulator) << "this_ptr1=" << (void*)this_ptr1
1585  << ", that_ptr1=" << (void const*)that_ptr1
1586  << ", target_logical_idx=" << target_logical_idx;
1587  accumulator->allocate();
1588  accumulator->mergeTDigest(*incoming);
1589  }
1590 }
1591 
1592 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1593                                                   const int8_t* that_ptr1,
1594  const size_t target_logical_idx,
1595  const ResultSetStorage& that) const {
1597  const auto& old_count_distinct_desc =
1598  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1599  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1600  const auto& new_count_distinct_desc =
1601  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1602  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1603  CHECK(this_ptr1 && that_ptr1);
1604  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1605  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1606   count_distinct_set_union(
1607       *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1608 }
1609 
1610 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1611  const int8_t warp_count,
1612  const bool is_columnar,
1613  const bool replace_bitmap_ptr_with_bitmap_sz,
1614  std::vector<int64_t>& agg_vals,
1615                                        const QueryMemoryDescriptor& query_mem_desc,
1616                                        const std::vector<TargetInfo>& targets,
1617  const std::vector<int64_t>& agg_init_vals) {
1618  const size_t agg_col_count{agg_vals.size()};
1619  const auto row_size = query_mem_desc.getRowSize();
1620  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1621  CHECK_GE(agg_col_count, targets.size());
1622  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1623  CHECK(query_mem_desc.hasKeylessHash());
1624  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1625  bool discard_row = true;
1626  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1627  bool discard_partial_result = true;
1628  for (size_t target_idx = 0, agg_col_idx = 0;
1629  target_idx < targets.size() && agg_col_idx < agg_col_count;
1630  ++target_idx, ++agg_col_idx) {
1631  const auto& agg_info = targets[target_idx];
1632  const bool float_argument_input = takes_float_argument(agg_info);
1633  const auto chosen_bytes = float_argument_input
1634  ? sizeof(float)
1635  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1636  auto partial_bin_val = get_component(
1637  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1638  partial_agg_vals[agg_col_idx] = partial_bin_val;
1639  if (is_distinct_target(agg_info)) {
1640  CHECK_EQ(int8_t(1), warp_count);
1641  CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
1642  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1643  partial_bin_val = count_distinct_set_size(
1644  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1645  if (replace_bitmap_ptr_with_bitmap_sz) {
1646  partial_agg_vals[agg_col_idx] = partial_bin_val;
1647  }
1648  }
1649  if (kAVG == agg_info.agg_kind) {
1650  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1651  ++agg_col_idx;
1652  partial_bin_val = partial_agg_vals[agg_col_idx] =
1653  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1654  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1655  }
1656  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1657  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1658  CHECK(agg_info.is_agg);
1659  discard_partial_result = false;
1660  }
1661  }
1662  row_ptr += row_size;
1663  if (discard_partial_result) {
1664  continue;
1665  }
1666  discard_row = false;
1667  for (size_t target_idx = 0, agg_col_idx = 0;
1668  target_idx < targets.size() && agg_col_idx < agg_col_count;
1669  ++target_idx, ++agg_col_idx) {
1670  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1671  const auto& agg_info = targets[target_idx];
1672  const bool float_argument_input = takes_float_argument(agg_info);
1673  const auto chosen_bytes = float_argument_input
1674  ? sizeof(float)
1675  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1676  const auto& chosen_type = get_compact_type(agg_info);
1677  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1678  try {
1679  switch (agg_info.agg_kind) {
1680  case kCOUNT:
1681  case kAPPROX_COUNT_DISTINCT:
1682  AGGREGATE_ONE_NULLABLE_COUNT(
1683  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1684  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1685  agg_init_vals[agg_col_idx],
1686  chosen_bytes,
1687  agg_info);
1688  break;
1689  case kAVG:
1690  // Ignore float argument compaction for count component for fear of its
1691  // overflow
1692  AGGREGATE_ONE_COUNT(
1693  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1694  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1695  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1696  // fall thru
1697  case kSUM:
1698  AGGREGATE_ONE_NULLABLE_VALUE(
1699  sum,
1700  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1701  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1702  agg_init_vals[agg_col_idx],
1703  chosen_bytes,
1704  agg_info);
1705  break;
1706  case kMIN:
1707  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1708  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1709  min,
1710  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1711  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1712  agg_init_vals[agg_col_idx],
1713  chosen_bytes,
1714  agg_info);
1715  } else {
1716  AGGREGATE_ONE_NULLABLE_VALUE(
1717  min,
1718  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1719  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1720  agg_init_vals[agg_col_idx],
1721  chosen_bytes,
1722  agg_info);
1723  }
1724  break;
1725  case kMAX:
1726  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1727  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1728  max,
1729  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1730  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1731  agg_init_vals[agg_col_idx],
1732  chosen_bytes,
1733  agg_info);
1734  } else {
1735  AGGREGATE_ONE_NULLABLE_VALUE(
1736  max,
1737  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1738  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1739  agg_init_vals[agg_col_idx],
1740  chosen_bytes,
1741  agg_info);
1742  }
1743  break;
1744  default:
1745  CHECK(false);
1746  break;
1747  }
1748  } catch (std::runtime_error& e) {
1749  // TODO(miyu): handle the case where chosen_bytes < 8
1750  LOG(ERROR) << e.what();
1751  }
1752  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1753  switch (chosen_bytes) {
1754  case 8:
1755  break;
1756  case 4: {
1757  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1758  if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
1759  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1760  }
1761  break;
1762  }
1763  default:
1764  CHECK(false);
1765  }
1766  }
1767  if (kAVG == agg_info.agg_kind) {
1768  ++agg_col_idx;
1769  }
1770  } else {
1771  if (agg_info.agg_kind == kSAMPLE) {
1772  CHECK(!agg_info.sql_type.is_varlen())
1773  << "Interleaved bins reduction not supported for variable length "
1774  "arguments "
1775  "to SAMPLE";
1776  }
1777  if (agg_vals[agg_col_idx]) {
1778  if (agg_info.agg_kind == kSAMPLE) {
1779  continue;
1780  }
1781  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1782  } else {
1783  agg_vals[agg_col_idx] = partial_bin_val;
1784  }
1785  }
1786  }
1787  }
1788  return discard_row;
1789 }
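// Illustrative usage sketch, not part of the original file: reduceSingleRow
// collapses the warp-interleaved copies of one keyless-hash entry into a single
// vector of aggregates and returns true when every warp's partial result was
// empty, i.e. the entry can be discarded. The wrapper below is hypothetical;
// the buffer layout (warp_count consecutive rows per logical entry, matching
// the row_ptr += row_size advance above) and the seeding of agg_vals from the
// per-slot initialization values are assumptions made for illustration only.
std::vector<int64_t> collapse_interleaved_entries_sketch(
    const int8_t* buff,
    const size_t entry_count,
    const int8_t warp_count,
    const QueryMemoryDescriptor& query_mem_desc,
    const std::vector<TargetInfo>& targets,
    const std::vector<int64_t>& init_vals) {
  std::vector<int64_t> agg_vals(init_vals);
  const size_t row_size = query_mem_desc.getRowSize();
  for (size_t i = 0; i < entry_count; ++i) {
    const int8_t* row_ptr = buff + i * static_cast<size_t>(warp_count) * row_size;
    // Row-wise, keyless layout; count-distinct handles are left as pointers.
    ResultSetStorage::reduceSingleRow(row_ptr,
                                      warp_count,
                                      /*is_columnar=*/false,
                                      /*replace_bitmap_ptr_with_bitmap_sz=*/false,
                                      agg_vals,
                                      query_mem_desc,
                                      targets,
                                      init_vals);
  }
  return agg_vals;
}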