OmniSciDB  085a039ca4
ResultSetReduction.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "DynamicWatchdog.h"
26 #include "Execute.h"
27 #include "ResultSet.h"
28 #include "ResultSetReductionInterpreter.h"
29 #include "ResultSetReductionJIT.h"
30 #include "RuntimeFunctions.h"
31 #include "Shared/SqlTypesLayout.h"
32 #include "Shared/likely.h"
33 #include "Shared/thread_count.h"
34 
35 #include <llvm/ExecutionEngine/GenericValue.h>
36 
37 #include <algorithm>
38 #include <future>
39 #include <numeric>
40 
41 extern bool g_enable_dynamic_watchdog;
42 
43 namespace {
44 
45 bool use_multithreaded_reduction(const size_t entry_count) {
46  return entry_count > 100000;
47 }
48 
49 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
50  const auto row_bytes = get_row_bytes(query_mem_desc);
51  CHECK_EQ(size_t(0), row_bytes % 8);
52  return row_bytes / 8;
53 }
54 
55 std::vector<int64_t> make_key(const int64_t* buff,
56  const size_t entry_count,
57  const size_t key_count) {
58  std::vector<int64_t> key;
59  size_t off = 0;
60  for (size_t i = 0; i < key_count; ++i) {
61  key.push_back(buff[off]);
62  off += entry_count;
63  }
64  return key;
65 }
66 
67 void fill_slots(int64_t* dst_entry,
68  const size_t dst_entry_count,
69  const int64_t* src_buff,
70  const size_t src_entry_idx,
71  const size_t src_entry_count,
72  const QueryMemoryDescriptor& query_mem_desc) {
73  const auto slot_count = query_mem_desc.getBufferColSlotCount();
74  const auto key_count = query_mem_desc.getGroupbyColCount();
75  if (query_mem_desc.didOutputColumnar()) {
76  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
77  ++i, dst_slot_off += dst_entry_count) {
78  dst_entry[dst_slot_off] =
79  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
80  }
81  } else {
82  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
83  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
84  for (size_t i = 0; i < slot_count; ++i) {
85  dst_entry[i] = row_ptr[slot_off_quad + i];
86  }
87  }
88 }
89 
91 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
92  for (size_t i = 0; i < key_count; ++i) {
93  key_ptr_i32[i] = EMPTY_KEY_32;
94  }
95 }
96 
98 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
99  for (size_t i = 0; i < key_count; ++i) {
100  key_ptr_i64[i] = EMPTY_KEY_64;
101  }
102 }
103 
104 inline int64_t get_component(const int8_t* group_by_buffer,
105  const size_t comp_sz,
106  const size_t index = 0) {
107  int64_t ret = std::numeric_limits<int64_t>::min();
108  switch (comp_sz) {
109  case 1: {
110  ret = group_by_buffer[index];
111  break;
112  }
113  case 2: {
114  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
115  ret = buffer_ptr[index];
116  break;
117  }
118  case 4: {
119  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
120  ret = buffer_ptr[index];
121  break;
122  }
123  case 8: {
124  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
125  ret = buffer_ptr[index];
126  break;
127  }
128  default:
129  CHECK(false);
130  }
131  return ret;
132 }
133 
134 void run_reduction_code(const size_t executor_id,
135  const ReductionCode& reduction_code,
136  int8_t* this_buff,
137  const int8_t* that_buff,
138  const int32_t start_entry_index,
139  const int32_t end_entry_index,
140  const int32_t that_entry_count,
141  const void* this_qmd,
142  const void* that_qmd,
143  const void* serialized_varlen_buffer) {
144  int err = 0;
145  if (reduction_code.func_ptr) {
146  err = reduction_code.func_ptr(this_buff,
147  that_buff,
148  start_entry_index,
149  end_entry_index,
150  that_entry_count,
151  this_qmd,
152  that_qmd,
153  serialized_varlen_buffer);
154  } else {
155  auto ret = ReductionInterpreter::run(
156  executor_id,
157  reduction_code.ir_reduce_loop.get(),
158  {ReductionInterpreter::MakeEvalValue(this_buff),
159  ReductionInterpreter::MakeEvalValue(that_buff),
160  ReductionInterpreter::MakeEvalValue(start_entry_index),
161  ReductionInterpreter::MakeEvalValue(end_entry_index),
162  ReductionInterpreter::MakeEvalValue(that_entry_count),
163  ReductionInterpreter::MakeEvalValue(this_qmd),
164  ReductionInterpreter::MakeEvalValue(that_qmd),
165  ReductionInterpreter::MakeEvalValue(serialized_varlen_buffer)});
166  err = ret.int_val;
167  }
168  if (err) {
169  if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
170  throw std::runtime_error("Multiple distinct values encountered");
171  }
172  if (err == Executor::ERR_INTERRUPTED) {
173  throw std::runtime_error(
174  "Query execution has interrupted during result set reduction");
175  }
176  throw std::runtime_error(
177  "Query execution has exceeded the time limit or was interrupted during result "
178  "set reduction");
179  }
180 }
181 
182 } // namespace
183 
184 void result_set::fill_empty_key(void* key_ptr,
185  const size_t key_count,
186  const size_t key_width) {
187  switch (key_width) {
188  case 4: {
189  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
190  fill_empty_key_32(key_ptr_i32, key_count);
191  break;
192  }
193  case 8: {
194  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
195  fill_empty_key_64(key_ptr_i64, key_count);
196  break;
197  }
198  default:
199  CHECK(false);
200  }
201 }
202 
203 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
204 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
205 void ResultSetStorage::reduce(const ResultSetStorage& that,
206  const std::vector<std::string>& serialized_varlen_buffer,
207  const ReductionCode& reduction_code,
208  const size_t executor_id) const {
209  auto entry_count = query_mem_desc_.getEntryCount();
210  CHECK_GT(entry_count, size_t(0));
211  if (query_mem_desc_.didOutputColumnar()) {
212  CHECK(query_mem_desc_.getQueryDescriptionType() ==
213  QueryDescriptionType::GroupByPerfectHash ||
214  query_mem_desc_.getQueryDescriptionType() ==
215  QueryDescriptionType::GroupByBaselineHash ||
216  query_mem_desc_.getQueryDescriptionType() ==
217  QueryDescriptionType::Projection);
218  }
219  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
220  switch (query_mem_desc_.getQueryDescriptionType()) {
221  case QueryDescriptionType::GroupByBaselineHash:
222  CHECK_GE(entry_count, that_entry_count);
223  break;
224  default:
225  CHECK_EQ(entry_count, that_entry_count);
226  }
227  auto this_buff = buff_;
228  CHECK(this_buff);
229  auto that_buff = that.buff_;
230  CHECK(that_buff);
231  if (query_mem_desc_.getQueryDescriptionType() ==
232  QueryDescriptionType::GroupByBaselineHash) {
233  if (!serialized_varlen_buffer.empty()) {
234  throw std::runtime_error(
235  "Projection of variable length targets with baseline hash group by is not yet "
236  "supported in Distributed mode");
237  }
238  if (use_multithreaded_reduction(that_entry_count)) {
239  const size_t thread_count = cpu_threads();
240  std::vector<std::future<void>> reduction_threads;
241  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
242  const auto thread_entry_count =
243  (that_entry_count + thread_count - 1) / thread_count;
244  const auto start_index = thread_idx * thread_entry_count;
245  const auto end_index =
246  std::min(start_index + thread_entry_count, that_entry_count);
247  reduction_threads.emplace_back(std::async(
248  std::launch::async,
249  [this,
250  this_buff,
251  that_buff,
252  start_index,
253  end_index,
254  that_entry_count,
255  executor_id,
256  &reduction_code,
257  &that] {
258  if (reduction_code.ir_reduce_loop) {
259  run_reduction_code(executor_id,
260  reduction_code,
261  this_buff,
262  that_buff,
263  start_index,
264  end_index,
265  that_entry_count,
267  &that.query_mem_desc_,
268  nullptr);
269  } else {
270  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
271  reduceOneEntryBaseline(
272  this_buff, that_buff, entry_idx, that_entry_count, that);
273  }
274  }
275  }));
276  }
277  for (auto& reduction_thread : reduction_threads) {
278  reduction_thread.wait();
279  }
280  for (auto& reduction_thread : reduction_threads) {
281  reduction_thread.get();
282  }
283  } else {
284  if (reduction_code.ir_reduce_loop) {
285  run_reduction_code(executor_id,
286  reduction_code,
287  this_buff,
288  that_buff,
289  0,
290  that_entry_count,
291  that_entry_count,
293  &that.query_mem_desc_,
294  nullptr);
295  } else {
296  for (size_t i = 0; i < that_entry_count; ++i) {
297  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
298  }
299  }
300  }
301  return;
302  }
303  if (use_multithreaded_reduction(entry_count)) {
304  const size_t thread_count = cpu_threads();
305  std::vector<std::future<void>> reduction_threads;
306  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
307  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
308  const auto start_index = thread_idx * thread_entry_count;
309  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
310  if (query_mem_desc_.didOutputColumnar()) {
311  reduction_threads.emplace_back(std::async(std::launch::async,
312  [this,
313  this_buff,
314  that_buff,
315  start_index,
316  end_index,
317  &that,
318  &serialized_varlen_buffer,
319  &executor_id] {
320  reduceEntriesNoCollisionsColWise(
321  this_buff,
322  that_buff,
323  that,
324  start_index,
325  end_index,
326  serialized_varlen_buffer,
327  executor_id);
328  }));
329  } else {
330  reduction_threads.emplace_back(std::async(std::launch::async,
331  [this,
332  this_buff,
333  that_buff,
334  start_index,
335  end_index,
336  that_entry_count,
337  executor_id,
338  &reduction_code,
339  &that,
340  &serialized_varlen_buffer] {
341  CHECK(reduction_code.ir_reduce_loop);
342  run_reduction_code(
343  executor_id,
344  reduction_code,
345  this_buff,
346  that_buff,
347  start_index,
348  end_index,
349  that_entry_count,
350  &query_mem_desc_,
351  &that.query_mem_desc_,
352  &serialized_varlen_buffer);
353  }));
354  }
355  }
356  for (auto& reduction_thread : reduction_threads) {
357  reduction_thread.wait();
358  }
359  for (auto& reduction_thread : reduction_threads) {
360  reduction_thread.get();
361  }
362  } else {
363  if (query_mem_desc_.didOutputColumnar()) {
364  reduceEntriesNoCollisionsColWise(this_buff,
365  that_buff,
366  that,
367  0,
368  query_mem_desc_.getEntryCount(),
369  serialized_varlen_buffer,
370  executor_id);
371  } else {
372  CHECK(reduction_code.ir_reduce_loop);
373  run_reduction_code(executor_id,
374  reduction_code,
375  this_buff,
376  that_buff,
377  0,
378  entry_count,
379  that_entry_count,
380  &query_mem_desc_,
381  &that.query_mem_desc_,
382  &serialized_varlen_buffer);
383  }
384  }
385 }
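
// [Editorial sketch, not part of ResultSetReduction.cpp] The reduce() driver above splits
// the entry range across cpu_threads() workers with ceiling division, launches one
// std::async task per chunk, waits on every future, then calls get() so any exception
// thrown inside a worker is re-thrown on the calling thread. A minimal, self-contained
// version of that pattern; process_entry and the sizes are illustrative stand-ins.
#include <algorithm>
#include <cstddef>
#include <functional>
#include <future>
#include <vector>

void parallel_reduce_sketch(const size_t entry_count,
                            const size_t thread_count,
                            const std::function<void(size_t)>& process_entry) {
  std::vector<std::future<void>> workers;
  const size_t chunk_size = (entry_count + thread_count - 1) / thread_count;  // ceil division
  for (size_t t = 0; t < thread_count; ++t) {
    const size_t start_index = t * chunk_size;
    const size_t end_index = std::min(start_index + chunk_size, entry_count);
    workers.emplace_back(
        std::async(std::launch::async, [start_index, end_index, &process_entry] {
          for (size_t i = start_index; i < end_index; ++i) {
            process_entry(i);  // stand-in for reduceOneEntryBaseline / the colwise reduction
          }
        }));
  }
  for (auto& worker : workers) {
    worker.wait();  // join every task first ...
  }
  for (auto& worker : workers) {
    worker.get();  // ... then surface the first stored exception, if any
  }
}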
386 
387 namespace {
388 
389 ALWAYS_INLINE void check_watchdog() {
390  if (UNLIKELY(dynamic_watchdog())) {
391  // TODO(alex): distinguish between the deadline and interrupt
392  throw std::runtime_error(
393  "Query execution has exceeded the time limit or was interrupted during result "
394  "set reduction");
395  }
396 }
397 
398 ALWAYS_INLINE void check_watchdog_with_seed(const size_t sample_seed) {
399  if (UNLIKELY((sample_seed & 0x3F) == 0 && dynamic_watchdog())) {
400  // TODO(alex): distinguish between the deadline and interrupt
401  throw std::runtime_error(
402  "Query execution has exceeded the time limit or was interrupted during result "
403  "set reduction");
404  }
405 }
406 
407 } // namespace
408 
409 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
410  int8_t* this_buff,
411  const int8_t* that_buff,
412  const ResultSetStorage& that,
413  const size_t start_index,
414  const size_t end_index,
415  const std::vector<std::string>& serialized_varlen_buffer,
416  const size_t executor_id) const {
417  // TODO(adb / saman): Support column wise output when serializing distributed agg
418  // functions
419  CHECK(serialized_varlen_buffer.empty());
420 
421  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
422 
423  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
424  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
425  auto executor = Executor::getExecutor(executor_id);
426  CHECK(executor);
427  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
428  const auto& agg_info = targets_[target_idx];
429  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
430 
431  bool two_slot_target{false};
432  if (agg_info.is_agg &&
433  (agg_info.agg_kind == kAVG ||
434  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
435  // Note that this assumes if one of the slot pairs in a given target is an array,
436  // all slot pairs are arrays. Currently this is true for all geo targets, but we
437  // should better codify and store this information in the future
438  two_slot_target = true;
439  }
440  if (UNLIKELY(g_enable_non_kernel_time_query_interrupt &&
441  executor->checkNonKernelTimeInterrupted())) {
442  throw std::runtime_error(
443  "Query execution was interrupted during result set reduction");
444  }
445  if (g_enable_dynamic_watchdog) {
446  check_watchdog();
447  }
448  for (size_t target_slot_idx = slots_for_col.front();
449  target_slot_idx < slots_for_col.back() + 1;
450  target_slot_idx += 2) {
451  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
452  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
453  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
454  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
455  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
456  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
457  continue;
458  }
459  if (target_idx == 0 && target_slot_idx == slots_for_col.front()) {
460  // copy the key from right hand side
461  copyKeyColWise(entry_idx, this_buff, that_buff);
462  }
463  auto this_ptr1 =
464  this_crt_col_ptr +
465  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
466  auto that_ptr1 =
467  that_crt_col_ptr +
468  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
469  int8_t* this_ptr2{nullptr};
470  const int8_t* that_ptr2{nullptr};
471  if (UNLIKELY(two_slot_target)) {
472  this_ptr2 =
473  this_next_col_ptr +
474  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
475  that_ptr2 =
476  that_next_col_ptr +
477  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
478  }
479  reduceOneSlot(this_ptr1,
480  this_ptr2,
481  that_ptr1,
482  that_ptr2,
483  agg_info,
484  target_idx,
485  target_slot_idx,
486  target_slot_idx,
487  that,
488  slots_for_col.front(),
489  serialized_varlen_buffer);
490  }
491 
492  this_crt_col_ptr = this_next_col_ptr;
493  that_crt_col_ptr = that_next_col_ptr;
494  if (UNLIKELY(two_slot_target)) {
495  this_crt_col_ptr = advance_to_next_columnar_target_buff(
496  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
497  that_crt_col_ptr = advance_to_next_columnar_target_buff(
498  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
499  }
500  }
501  }
502 }
503 
504 /*
505  * copy all keys from the columnar prepended group buffer of "that_buff" into
506  * "this_buff"
507  */
508 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
509  int8_t* this_buff,
510  const int8_t* that_buff) const {
511  CHECK(query_mem_desc_.didOutputColumnar());
512  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
513  group_idx++) {
514  // if the column corresponds to a group key
515  const auto column_offset_bytes =
516  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
517  auto lhs_key_ptr = this_buff + column_offset_bytes;
518  auto rhs_key_ptr = that_buff + column_offset_bytes;
519  switch (query_mem_desc_.groupColWidth(group_idx)) {
520  case 8:
521  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
522  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
523  break;
524  case 4:
525  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
526  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
527  break;
528  case 2:
529  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
530  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
531  break;
532  case 1:
533  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
534  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
535  break;
536  default:
537  CHECK(false);
538  break;
539  }
540  }
541 }
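
// [Editorial sketch, not part of ResultSetReduction.cpp] copyKeyColWise relies on the
// columnar (struct-of-arrays) key layout: each group-by key column occupies a contiguous
// run of entry_count values, so entry `entry_idx` of group column `g` lives at index
// g * entry_count + entry_idx. Simplified below to uniformly 8-byte keys; the real code
// also handles 1/2/4-byte key columns via groupColWidth().
#include <cstddef>
#include <cstdint>
#include <vector>

void copy_key_colwise_sketch(std::vector<int64_t>& this_keys,
                             const std::vector<int64_t>& that_keys,
                             const size_t entry_idx,
                             const size_t entry_count,
                             const size_t group_col_count) {
  for (size_t g = 0; g < group_col_count; ++g) {
    // copy one entry's key value for every group-by column, right-hand side into left
    this_keys[g * entry_count + entry_idx] = that_keys[g * entry_count + entry_idx];
  }
}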
542 
543 // Rewrites the entries of this ResultSetStorage object to point directly into the
544 // serialized_varlen_buffer rather than using offsets.
545 void ResultSetStorage::rewriteAggregateBufferOffsets(
546  const std::vector<std::string>& serialized_varlen_buffer) const {
547  if (serialized_varlen_buffer.empty()) {
548  return;
549  }
550 
551  CHECK(!query_mem_desc_.didOutputColumnar());
552  auto entry_count = query_mem_desc_.getEntryCount();
553  CHECK_GT(entry_count, size_t(0));
554  CHECK(buff_);
555 
556  // Row-wise iteration, consider moving to separate function
557  for (size_t i = 0; i < entry_count; ++i) {
558  if (isEmptyEntry(i, buff_)) {
559  continue;
560  }
561  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
562  const auto key_bytes_with_padding = align_to_int64(key_bytes);
563  auto rowwise_targets_ptr =
564  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
565  size_t target_slot_idx = 0;
566  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
567  ++target_logical_idx) {
568  const auto& target_info = targets_[target_logical_idx];
569  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
570  CHECK(target_info.agg_kind == kSAMPLE);
571  auto ptr1 = rowwise_targets_ptr;
572  auto slot_idx = target_slot_idx;
573  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
574  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
575 
576  const auto& elem_ti = target_info.sql_type.get_elem_type();
577  size_t length_to_elems =
578  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
579  ? 1
580  : elem_ti.get_size();
581  if (target_info.sql_type.is_geometry()) {
582  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
583  if (j > 0) {
584  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
585  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
586  slot_idx += 2;
587  length_to_elems = 4;
588  }
589  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
590  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
591  const auto str_ptr =
592  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
593  CHECK(ptr1);
594  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
595  CHECK(ptr2);
596  *reinterpret_cast<int64_t*>(ptr2) =
597  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
598  }
599  } else {
600  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
601  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
602  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
603  CHECK(ptr1);
604  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
605  CHECK(ptr2);
606  *reinterpret_cast<int64_t*>(ptr2) =
607  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
608  }
609  }
610 
611  rowwise_targets_ptr = advance_target_ptr_row_wise(
612  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
613  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
614  }
615  }
616 
617  return;
618 }
619 
620 namespace {
621 
622 #ifdef _MSC_VER
623 #define mapd_cas(address, compare, val) \
624  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
625  static_cast<long>(val), \
626  static_cast<long>(compare))
627 #else
628 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
629 #endif
630 
631 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
632  const uint32_t h,
633  const int64_t* key,
634  const uint32_t key_qw_count,
635  const size_t entry_count) {
636  auto off = h;
637  const auto old_key = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
638  if (old_key == EMPTY_KEY_64) {
639  for (size_t i = 0; i < key_qw_count; ++i) {
640  groups_buffer[off] = key[i];
641  off += entry_count;
642  }
643  return {&groups_buffer[off], true};
644  }
645  off = h;
646  for (size_t i = 0; i < key_qw_count; ++i) {
647  if (groups_buffer[off] != key[i]) {
648  return {nullptr, true};
649  }
650  off += entry_count;
651  }
652  return {&groups_buffer[off], false};
653 }
654 
655 #undef mapd_cas
656 
657 // TODO(alex): fix synchronization when we enable it
658 GroupValueInfo get_group_value_columnar_reduction(
659  int64_t* groups_buffer,
660  const uint32_t groups_buffer_entry_count,
661  const int64_t* key,
662  const uint32_t key_qw_count) {
663  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
664  auto matching_gvi = get_matching_group_value_columnar_reduction(
665  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
666  if (matching_gvi.first) {
667  return matching_gvi;
668  }
669  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
670  while (h_probe != h) {
671  matching_gvi = get_matching_group_value_columnar_reduction(
672  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
673  if (matching_gvi.first) {
674  return matching_gvi;
675  }
676  h_probe = (h_probe + 1) % groups_buffer_entry_count;
677  }
678  return {nullptr, true};
679 }
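
// [Editorial sketch, not part of ResultSetReduction.cpp] get_group_value_columnar_reduction
// above uses open addressing with linear probing: start at the hashed slot, scan forward
// with wrap-around until the key's slot (or an empty slot) is found, and give up after one
// full pass. A minimal stand-alone version over a key-only table; kEmptySlot is a stand-in
// for EMPTY_KEY_64 and std::hash replaces key_hash.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <vector>

constexpr int64_t kEmptySlot = std::numeric_limits<int64_t>::max();

int64_t* probe_sketch(std::vector<int64_t>& table, const int64_t key) {
  const size_t entry_count = table.size();
  const size_t h = std::hash<int64_t>{}(key) % entry_count;
  size_t slot = h;
  do {
    if (table[slot] == key || table[slot] == kEmptySlot) {
      return &table[slot];  // matching or claimable slot
    }
    slot = (slot + 1) % entry_count;  // linear probe with wrap-around
  } while (slot != h);
  return nullptr;  // table is full and the key is absent
}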
680 
681 #ifdef _MSC_VER
682 #define cas_cst(ptr, expected, desired) \
683  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
684  reinterpret_cast<void*>(&desired), \
685  expected) == expected)
686 #define store_cst(ptr, val) \
687  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
688  reinterpret_cast<void*>(val))
689 #define load_cst(ptr) \
690  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
691 #else
692 #define cas_cst(ptr, expected, desired) \
693  __atomic_compare_exchange_n( \
694  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
695 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
696 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
697 #endif
698 
699 template <typename T = int64_t>
700 GroupValueInfo get_matching_group_value_reduction(
701  int64_t* groups_buffer,
702  const uint32_t h,
703  const T* key,
704  const uint32_t key_count,
705  const QueryMemoryDescriptor& query_mem_desc,
706  const int64_t* that_buff_i64,
707  const size_t that_entry_idx,
708  const size_t that_entry_count,
709  const uint32_t row_size_quad) {
710  auto off = h * row_size_quad;
711  T empty_key = get_empty_key<T>();
712  T write_pending = get_empty_key<T>() - 1;
713  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
714  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
715  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
716  if (success) {
717  fill_slots(groups_buffer + off + slot_off_quad,
718  query_mem_desc.getEntryCount(),
719  that_buff_i64,
720  that_entry_idx,
721  that_entry_count,
722  query_mem_desc);
723  if (key_count > 1) {
724  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
725  }
726  store_cst(row_ptr, *key);
727  return {groups_buffer + off + slot_off_quad, true};
728  }
729  while (load_cst(row_ptr) == write_pending) {
730  // spin until the winning thread has finished writing the entire key and the init
731  // value
732  }
733  for (size_t i = 0; i < key_count; ++i) {
734  if (load_cst(row_ptr + i) != key[i]) {
735  return {nullptr, true};
736  }
737  }
738  return {groups_buffer + off + slot_off_quad, false};
739 }
740 
741 #undef load_cst
742 #undef store_cst
743 #undef cas_cst
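
// [Editorial sketch, not part of ResultSetReduction.cpp] The templated
// get_matching_group_value_reduction above claims an empty row by CAS-ing a
// "write pending" sentinel into the first key slot, fills in the rest of the entry,
// and only then publishes the real key; losing threads spin while the sentinel is
// visible, then compare keys. The same protocol with std::atomic instead of the
// cas_cst/store_cst/load_cst macros; the sentinel constants are stand-ins for EMPTY_KEY_64.
#include <atomic>
#include <cstdint>
#include <limits>

constexpr int64_t kEmptyKeySentinel = std::numeric_limits<int64_t>::max();
constexpr int64_t kWritePendingSentinel = kEmptyKeySentinel - 1;

// Returns true if this thread won the slot and published `key`, false if another thread
// already owns it (its key may or may not match).
bool claim_and_publish_sketch(std::atomic<int64_t>& key_slot,
                              std::atomic<int64_t>& value_slot,
                              const int64_t key,
                              const int64_t init_value) {
  int64_t expected = kEmptyKeySentinel;
  if (key_slot.compare_exchange_strong(expected, kWritePendingSentinel)) {
    value_slot.store(init_value);  // initialize the payload before the key is visible
    key_slot.store(key);           // publish: readers may now trust the entry
    return true;
  }
  while (key_slot.load() == kWritePendingSentinel) {
    // spin until the winning thread finishes publishing its key
  }
  return false;
}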
744 
745 GroupValueInfo get_matching_group_value_reduction(
746  int64_t* groups_buffer,
747  const uint32_t h,
748  const int64_t* key,
749  const uint32_t key_count,
750  const size_t key_width,
751  const QueryMemoryDescriptor& query_mem_desc,
752  const int64_t* that_buff_i64,
753  const size_t that_entry_idx,
754  const size_t that_entry_count,
755  const uint32_t row_size_quad) {
756  switch (key_width) {
757  case 4:
758  return get_matching_group_value_reduction(groups_buffer,
759  h,
760  reinterpret_cast<const int32_t*>(key),
761  key_count,
762  query_mem_desc,
763  that_buff_i64,
764  that_entry_idx,
765  that_entry_count,
766  row_size_quad);
767  case 8:
768  return get_matching_group_value_reduction(groups_buffer,
769  h,
770  key,
771  key_count,
772  query_mem_desc,
773  that_buff_i64,
774  that_entry_idx,
775  that_entry_count,
776  row_size_quad);
777  default:
778  CHECK(false);
779  return {nullptr, true};
780  }
781 }
782 
783 } // namespace
784 
785 GroupValueInfo get_group_value_reduction(
786  int64_t* groups_buffer,
787  const uint32_t groups_buffer_entry_count,
788  const int64_t* key,
789  const uint32_t key_count,
790  const size_t key_width,
791  const QueryMemoryDescriptor& query_mem_desc,
792  const int64_t* that_buff_i64,
793  const size_t that_entry_idx,
794  const size_t that_entry_count,
795  const uint32_t row_size_quad) {
796  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
797  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
798  h,
799  key,
800  key_count,
801  key_width,
802  query_mem_desc,
803  that_buff_i64,
804  that_entry_idx,
805  that_entry_count,
806  row_size_quad);
807  if (matching_gvi.first) {
808  return matching_gvi;
809  }
810  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
811  while (h_probe != h) {
812  matching_gvi = get_matching_group_value_reduction(groups_buffer,
813  h_probe,
814  key,
815  key_count,
816  key_width,
817  query_mem_desc,
818  that_buff_i64,
819  that_entry_idx,
820  that_entry_count,
821  row_size_quad);
822  if (matching_gvi.first) {
823  return matching_gvi;
824  }
825  h_probe = (h_probe + 1) % groups_buffer_entry_count;
826  }
827  return {nullptr, true};
828 }
829 
830 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
831 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
832 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
833  const int8_t* that_buff,
834  const size_t that_entry_idx,
835  const size_t that_entry_count,
836  const ResultSetStorage& that) const {
837  if (g_enable_dynamic_watchdog) {
838  check_watchdog_with_seed(that_entry_idx);
839  }
840  const auto key_count = query_mem_desc_.getGroupbyColCount();
841  CHECK(query_mem_desc_.getQueryDescriptionType() ==
842  QueryDescriptionType::GroupByBaselineHash);
843  CHECK(!query_mem_desc_.hasKeylessHash());
844  CHECK(query_mem_desc_.didOutputColumnar());
845  const auto key_off =
846  key_offset_colwise(that_entry_idx, 0, 1);
847  if (isEmptyEntry(that_entry_idx, that_buff)) {
848  return;
849  }
850  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
851  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
852  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
853  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
854  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
855  CHECK(this_entry_slots);
856  if (empty_entry) {
857  fill_slots(this_entry_slots,
858  query_mem_desc_.getEntryCount(),
859  that_buff_i64,
860  that_entry_idx,
861  that_entry_count,
862  query_mem_desc_);
863  return;
864  }
865  reduceOneEntrySlotsBaseline(
866  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
867 }
868 
869 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
870  const int64_t* that_buff,
871  const size_t that_entry_idx,
872  const size_t that_entry_count,
873  const ResultSetStorage& that) const {
874  CHECK(query_mem_desc_.didOutputColumnar());
875  const auto key_count = query_mem_desc_.getGroupbyColCount();
876  size_t j = 0;
877  size_t init_agg_val_idx = 0;
878  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
879  ++target_logical_idx) {
880  const auto& target_info = targets_[target_logical_idx];
881  const auto that_slot_off = slot_offset_colwise(
882  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
883  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
884  reduceOneSlotBaseline(this_entry_slots,
885  this_slot_off,
886  that_buff,
887  that_entry_count,
888  that_slot_off,
889  target_info,
890  target_logical_idx,
891  j,
892  init_agg_val_idx,
893  that);
894  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
895  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
896  } else {
897  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
898  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
899  }
900  }
901  j = advance_slot(j, target_info, false);
902  }
903 }
904 
905 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
906  const size_t this_slot,
907  const int64_t* that_buff,
908  const size_t that_entry_count,
909  const size_t that_slot,
910  const TargetInfo& target_info,
911  const size_t target_logical_idx,
912  const size_t target_slot_idx,
913  const size_t init_agg_val_idx,
914  const ResultSetStorage& that) const {
915  CHECK(query_mem_desc_.didOutputColumnar());
916  int8_t* this_ptr2{nullptr};
917  const int8_t* that_ptr2{nullptr};
918  if (target_info.is_agg &&
919  (target_info.agg_kind == kAVG ||
920  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
921  const auto this_count_off = query_mem_desc_.getEntryCount();
922  const auto that_count_off = that_entry_count;
923  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
924  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
925  }
926  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
927  this_ptr2,
928  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
929  that_ptr2,
930  target_info,
931  target_logical_idx,
932  target_slot_idx,
933  init_agg_val_idx,
934  that,
935  target_slot_idx, // dummy, for now
936  {});
937 }
938 
939 // During the reduction of two result sets using the baseline strategy, we first create a
940 // big enough buffer to hold the entries for both and we move the entries from the first
941 // into it before doing the reduction as usual (into the first buffer).
942 template <class KeyType>
943 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
944  const size_t new_entry_count) const {
946  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
947  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
948  const auto key_count = query_mem_desc_.getGroupbyColCount();
949  CHECK(QueryDescriptionType::GroupByBaselineHash ==
950  query_mem_desc_.getQueryDescriptionType());
951  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
952  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
953  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
954 
955  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
956  const size_t thread_count = cpu_threads();
957  std::vector<std::future<void>> move_threads;
958 
959  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
960  const auto thread_entry_count =
961  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
962  const auto start_index = thread_idx * thread_entry_count;
963  const auto end_index =
964  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
965  move_threads.emplace_back(std::async(
966  std::launch::async,
967  [this,
968  src_buff,
969  new_buff_i64,
970  new_entry_count,
971  start_index,
972  end_index,
973  key_count,
974  row_qw_count,
975  key_byte_width] {
976  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
977  moveOneEntryToBuffer<KeyType>(entry_idx,
978  new_buff_i64,
979  new_entry_count,
980  key_count,
981  row_qw_count,
982  src_buff,
983  key_byte_width);
984  }
985  }));
986  }
987  for (auto& move_thread : move_threads) {
988  move_thread.wait();
989  }
990  for (auto& move_thread : move_threads) {
991  move_thread.get();
992  }
993  } else {
994  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
995  moveOneEntryToBuffer<KeyType>(entry_idx,
996  new_buff_i64,
997  new_entry_count,
998  key_count,
999  row_qw_count,
1000  src_buff,
1001  key_byte_width);
1002  }
1003  }
1004 }
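
// [Editorial sketch, not part of ResultSetReduction.cpp] What the move above accomplishes
// for the baseline layout: every non-empty entry of the old, smaller open-addressed table
// is re-probed into the new, larger buffer, because slot positions depend on the table
// size. Simplified here to a key-only row-wise table with linear probing; kEmptyKey is a
// stand-in for EMPTY_KEY_64.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <vector>

constexpr int64_t kEmptyKey = std::numeric_limits<int64_t>::max();

void move_entries_sketch(const std::vector<int64_t>& old_table,
                         std::vector<int64_t>& new_table) {
  for (const int64_t key : old_table) {
    if (key == kEmptyKey) {
      continue;  // skip never-written slots
    }
    size_t slot = std::hash<int64_t>{}(key) % new_table.size();
    while (new_table[slot] != kEmptyKey) {
      slot = (slot + 1) % new_table.size();  // linear probe in the larger table
    }
    new_table[slot] = key;
  }
}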
1005 
1006 template <class KeyType>
1007 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
1008  int64_t* new_buff_i64,
1009  const size_t new_entry_count,
1010  const size_t key_count,
1011  const size_t row_qw_count,
1012  const int64_t* src_buff,
1013  const size_t key_byte_width) const {
1014  const auto key_off =
1015  query_mem_desc_.didOutputColumnar()
1016  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
1017  : row_qw_count * entry_index;
1018  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
1019  if (*key_ptr == get_empty_key<KeyType>()) {
1020  return;
1021  }
1022  int64_t* new_entries_ptr{nullptr};
1023  if (query_mem_desc_.didOutputColumnar()) {
1024  const auto key =
1025  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
1026  new_entries_ptr =
1027  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
1028  } else {
1029  new_entries_ptr = get_group_value(new_buff_i64,
1030  new_entry_count,
1031  &src_buff[key_off],
1032  key_count,
1033  key_byte_width,
1034  row_qw_count);
1035  }
1036  CHECK(new_entries_ptr);
1037  fill_slots(new_entries_ptr,
1038  new_entry_count,
1039  src_buff,
1040  entry_index,
1041  query_mem_desc_.getEntryCount(),
1042  query_mem_desc_);
1043 }
1044 
1045 void ResultSet::initializeStorage() const {
1046  if (query_mem_desc_.didOutputColumnar()) {
1047  storage_->initializeColWise();
1048  } else {
1049  storage_->initializeRowWise();
1050  }
1051 }
1052 
1053 // Driver for reductions. Needed because the result of a reduction on the baseline
1054 // layout, which can have collisions, cannot be done in place and something needs
1055 // to take the ownership of the new result set with the bigger underlying buffer.
1056 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets,
1057  const size_t executor_id) {
1058  CHECK(!result_sets.empty());
1059  auto result_rs = result_sets.front();
1060  CHECK(result_rs->storage_);
1061  auto& first_result = *result_rs->storage_;
1062  auto result = &first_result;
1063  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1064  for (const auto result_set : result_sets) {
1065  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1066  }
1067  const auto catalog = result_rs->catalog_;
1068  for (const auto result_set : result_sets) {
1069  CHECK_EQ(catalog, result_set->catalog_);
1070  }
1071  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1072  QueryDescriptionType::GroupByBaselineHash) {
1073  const auto total_entry_count =
1074  std::accumulate(result_sets.begin(),
1075  result_sets.end(),
1076  size_t(0),
1077  [](const size_t init, const ResultSet* rs) {
1078  return init + rs->query_mem_desc_.getEntryCount();
1079  });
1080  CHECK(total_entry_count);
1081  auto query_mem_desc = first_result.query_mem_desc_;
1082  query_mem_desc.setEntryCount(total_entry_count);
1083  rs_.reset(new ResultSet(first_result.targets_,
1084  ExecutorDeviceType::CPU,
1085  query_mem_desc,
1086  row_set_mem_owner,
1087  catalog,
1088  0,
1089  0));
1090  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1091  rs_->initializeStorage();
1092  switch (query_mem_desc.getEffectiveKeyWidth()) {
1093  case 4:
1094  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1095  query_mem_desc.getEntryCount());
1096  break;
1097  case 8:
1098  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1099  query_mem_desc.getEntryCount());
1100  break;
1101  default:
1102  CHECK(false);
1103  }
1104  result = rs_->storage_.get();
1105  result_rs = rs_.get();
1106  }
1107 
1108  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1109  if (!serialized_varlen_buffer.empty()) {
1110  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1111  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1112  ++result_it) {
1113  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1114  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1115  serialized_varlen_buffer.emplace_back(
1116  std::move(result_serialized_varlen_buffer.front()));
1117  }
1118  }
1119 
1120  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1121  result_rs->getTargetInfos(),
1122  result_rs->getTargetInitVals(),
1123  executor_id);
1124  auto reduction_code = reduction_jit.codegen();
1125  size_t ctr = 1;
1126  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1127  ++result_it) {
1128  if (!serialized_varlen_buffer.empty()) {
1129  result->reduce(*((*result_it)->storage_),
1130  serialized_varlen_buffer[ctr++],
1131  reduction_code,
1132  executor_id);
1133  } else {
1134  result->reduce(*((*result_it)->storage_), {}, reduction_code, executor_id);
1135  }
1136  }
1137  return result_rs;
1138 }
1139 
1140 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1141  return rs_;
1142 }
1143 
1144 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1145  auto& result_storage = result_rs->storage_;
1146  result_storage->rewriteAggregateBufferOffsets(
1147  result_rs->serialized_varlen_buffer_.front());
1148 }
1149 
1150 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1151  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1152  const auto key_count = query_mem_desc_.getGroupbyColCount();
1153  CHECK_EQ(slot_count + key_count, entry.size());
1154  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1155  CHECK(!query_mem_desc_.didOutputColumnar());
1156  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1157  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1158  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1159  for (size_t i = 0; i < key_count; ++i) {
1160  this_buff[key_off + i] = entry[i];
1161  }
1162  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1163  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1164  this_buff[first_slot_off + i] = entry[key_count + i];
1165  }
1166 }
1167 
1168 void ResultSetStorage::initializeRowWise() const {
1169  const auto key_count = query_mem_desc_.getGroupbyColCount();
1170  const auto row_size = get_row_bytes(query_mem_desc_);
1171  CHECK_EQ(row_size % 8, 0u);
1172  const auto key_bytes_with_padding =
1173  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1174  CHECK(!query_mem_desc_.hasKeylessHash());
1175  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1176  case 4: {
1177  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1178  auto row_ptr = buff_ + i * row_size;
1179  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1180  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1181  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1182  slot_ptr[j] = target_init_vals_[j];
1183  }
1184  }
1185  break;
1186  }
1187  case 8: {
1188  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1189  auto row_ptr = buff_ + i * row_size;
1190  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1191  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1192  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1193  slot_ptr[j] = target_init_vals_[j];
1194  }
1195  }
1196  break;
1197  }
1198  default:
1199  CHECK(false);
1200  }
1201 }
1202 
1203 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1204  CHECK(query_mem_desc_.didOutputColumnar());
1205  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1206  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1207  const auto key_count = query_mem_desc_.getGroupbyColCount();
1208  CHECK_EQ(slot_count + key_count, entry.size());
1209  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1210 
1211  for (size_t i = 0; i < key_count; i++) {
1212  const auto key_offset = key_offset_colwise(0, i, 1);
1213  this_buff[key_offset] = entry[i];
1214  }
1215 
1216  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1217  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1218  this_buff[slot_offset] = entry[key_count + i];
1219  }
1220 }
1221 
1222 void ResultSetStorage::initializeColWise() const {
1223  const auto key_count = query_mem_desc_.getGroupbyColCount();
1224  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1225  CHECK(query_mem_desc_.didOutputColumnar());
1226  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1227  const auto first_key_off =
1228  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1229  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1230  this_buff[first_key_off + i] = EMPTY_KEY_64;
1231  }
1232  }
1233  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1234  const auto first_val_off =
1235  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1236  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1237  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1238  }
1239  }
1240 }
1241 
1242 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1243  CHECK(entry_slots);
1244  if (query_mem_desc_.didOutputColumnar()) {
1245  size_t slot_off = 0;
1246  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1247  entry_slots[slot_off] = target_init_vals_[j];
1248  slot_off += query_mem_desc_.getEntryCount();
1249  }
1250  } else {
1251  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1252  entry_slots[j] = target_init_vals_[j];
1253  }
1254  }
1255 }
1256 
1257 #define AGGREGATE_ONE_VALUE( \
1258  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1259  do { \
1260  const auto sql_type = get_compact_type(agg_info__); \
1261  if (sql_type.is_fp()) { \
1262  if (chosen_bytes__ == sizeof(float)) { \
1263  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1264  *reinterpret_cast<const float*>(other_ptr__)); \
1265  } else { \
1266  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1267  *reinterpret_cast<const double*>(other_ptr__)); \
1268  } \
1269  } else { \
1270  if (chosen_bytes__ == sizeof(int32_t)) { \
1271  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1272  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1273  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1274  } else { \
1275  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1276  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1277  agg_##agg_kind__(val_ptr, *other_ptr); \
1278  } \
1279  } \
1280  } while (0)
1281 
1282 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1283  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1284  do { \
1285  if (agg_info__.skip_null_val) { \
1286  const auto sql_type = get_compact_type(agg_info__); \
1287  if (sql_type.is_fp()) { \
1288  if (chosen_bytes__ == sizeof(float)) { \
1289  agg_##agg_kind__##_float_skip_val( \
1290  reinterpret_cast<int32_t*>(val_ptr__), \
1291  *reinterpret_cast<const float*>(other_ptr__), \
1292  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1293  } else { \
1294  agg_##agg_kind__##_double_skip_val( \
1295  reinterpret_cast<int64_t*>(val_ptr__), \
1296  *reinterpret_cast<const double*>(other_ptr__), \
1297  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1298  } \
1299  } else { \
1300  if (chosen_bytes__ == sizeof(int32_t)) { \
1301  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1302  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1303  const auto null_val = static_cast<int32_t>(init_val__); \
1304  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1305  } else { \
1306  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1307  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1308  const auto null_val = static_cast<int64_t>(init_val__); \
1309  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1310  } \
1311  } \
1312  } else { \
1313  AGGREGATE_ONE_VALUE( \
1314  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1315  } \
1316  } while (0)
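
// [Editorial sketch, not part of ResultSetReduction.cpp] The *_NULLABLE_* macros above
// forward to agg_*_skip_val runtime functions whose contract is: the sentinel null value
// never contributes to the aggregate, and a null accumulator is overwritten by the first
// non-null input. A minimal stand-alone illustration of that contract for a 32-bit sum
// (the function name and exact semantics here are an assumption for illustration only).
#include <cstdint>

void agg_sum_int32_skip_val_sketch(int32_t* agg, const int32_t val, const int32_t null_val) {
  if (val == null_val) {
    return;  // null input: leave the accumulator untouched
  }
  *agg = (*agg == null_val) ? val : *agg + val;  // first non-null value replaces the sentinel
}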
1317 
1318 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1319  do { \
1320  if (chosen_bytes__ == sizeof(int32_t)) { \
1321  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1322  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1323  agg_sum_int32(val_ptr, *other_ptr); \
1324  } else { \
1325  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1326  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1327  agg_sum(val_ptr, *other_ptr); \
1328  } \
1329  } while (0)
1330 
1331 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1332  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1333  { \
1334  if (agg_info__.skip_null_val) { \
1335  const auto sql_type = get_compact_type(agg_info__); \
1336  if (sql_type.is_fp()) { \
1337  if (chosen_bytes__ == sizeof(float)) { \
1338  agg_sum_float_skip_val( \
1339  reinterpret_cast<int32_t*>(val_ptr__), \
1340  *reinterpret_cast<const float*>(other_ptr__), \
1341  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1342  } else { \
1343  agg_sum_double_skip_val( \
1344  reinterpret_cast<int64_t*>(val_ptr__), \
1345  *reinterpret_cast<const double*>(other_ptr__), \
1346  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1347  } \
1348  } else { \
1349  if (chosen_bytes__ == sizeof(int32_t)) { \
1350  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1351  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1352  const auto null_val = static_cast<int32_t>(init_val__); \
1353  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1354  } else { \
1355  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1356  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1357  const auto null_val = static_cast<int64_t>(init_val__); \
1358  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1359  } \
1360  } \
1361  } else { \
1362  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1363  } \
1364  }
1365 
1366 // to be used for 8/16-bit kMIN and kMAX only
1367 #define AGGREGATE_ONE_VALUE_SMALL( \
1368  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1369  do { \
1370  if (chosen_bytes__ == sizeof(int16_t)) { \
1371  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1372  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1373  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1374  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1375  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1376  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1377  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1378  } else { \
1379  UNREACHABLE(); \
1380  } \
1381  } while (0)
1382 
1383 // to be used for 8/16-bit kMIN and kMAX only
1384 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1385  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1386  do { \
1387  if (agg_info__.skip_null_val) { \
1388  if (chosen_bytes__ == sizeof(int16_t)) { \
1389  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1390  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1391  const auto null_val = static_cast<int16_t>(init_val__); \
1392  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1393  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1394  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1395  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1396  const auto null_val = static_cast<int8_t>(init_val__); \
1397  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1398  } \
1399  } else { \
1400  AGGREGATE_ONE_VALUE_SMALL( \
1401  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1402  } \
1403  } while (0)
1404 
1405 int8_t result_set::get_width_for_slot(const size_t target_slot_idx,
1406  const bool float_argument_input,
1407  const QueryMemoryDescriptor& query_mem_desc) {
1408  if (float_argument_input) {
1409  return sizeof(float);
1410  }
1411  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1412 }
1413 
1414 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1415  const TargetInfo& target_info,
1416  const size_t target_slot_idx,
1417  const size_t init_agg_val_idx,
1418  const int8_t* that_ptr1) const {
1419  const bool float_argument_input = takes_float_argument(target_info);
1420  const auto chosen_bytes = result_set::get_width_for_slot(
1421  target_slot_idx, float_argument_input, query_mem_desc_);
1422  auto init_val = target_init_vals_[init_agg_val_idx];
1423 
1424  auto reduce = [&](auto const& size_tag) {
1425  using CastTarget = std::decay_t<decltype(size_tag)>;
1426  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1427  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1428  if (rhs_proj_col == init_val) {
1429  // ignore
1430  } else if (lhs_proj_col == init_val) {
1431  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1432  } else if (lhs_proj_col != rhs_proj_col) {
1433  throw std::runtime_error("Multiple distinct values encountered");
1434  }
1435  };
1436 
1437  switch (chosen_bytes) {
1438  case 1: {
1440  reduce(int8_t());
1441  break;
1442  }
1443  case 2: {
1445  reduce(int16_t());
1446  break;
1447  }
1448  case 4: {
1449  reduce(int32_t());
1450  break;
1451  }
1452  case 8: {
1453  CHECK(!target_info.sql_type.is_varlen());
1454  reduce(int64_t());
1455  break;
1456  }
1457  default:
1458  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1459  }
1460 }
1461 
1462 void ResultSetStorage::reduceOneSlot(
1463  int8_t* this_ptr1,
1464  int8_t* this_ptr2,
1465  const int8_t* that_ptr1,
1466  const int8_t* that_ptr2,
1467  const TargetInfo& target_info,
1468  const size_t target_logical_idx,
1469  const size_t target_slot_idx,
1470  const size_t init_agg_val_idx,
1471  const ResultSetStorage& that,
1472  const size_t first_slot_idx_for_target,
1473  const std::vector<std::string>& serialized_varlen_buffer) const {
1474  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1475  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1476  return;
1477  }
1478  }
1479  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1480  const bool float_argument_input = takes_float_argument(target_info);
1481  const auto chosen_bytes = result_set::get_width_for_slot(
1482  target_slot_idx, float_argument_input, query_mem_desc_);
1483  int64_t init_val = target_init_vals_[init_agg_val_idx]; // skip_val for nullable types
1484 
1485  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1486  reduceOneSlotSingleValue(
1487  this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1488  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1489  switch (target_info.agg_kind) {
1490  case kCOUNT:
1491  case kAPPROX_COUNT_DISTINCT: {
1492  if (is_distinct_target(target_info)) {
1493  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1494  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1495  break;
1496  }
1497  CHECK_EQ(int64_t(0), init_val);
1498  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1499  break;
1500  }
1501  case kAVG: {
1502  // Ignore float argument compaction for count component for fear of its overflow
1503  AGGREGATE_ONE_COUNT(this_ptr2,
1504  that_ptr2,
1505  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1506  }
1507  // fall thru
1508  case kSUM: {
1509  AGGREGATE_ONE_NULLABLE_VALUE(
1510  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1511  break;
1512  }
1513  case kMIN: {
1514  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1515  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1516  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1517  } else {
1518  AGGREGATE_ONE_NULLABLE_VALUE(
1519  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1520  }
1521  break;
1522  }
1523  case kMAX: {
1524  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1525  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1526  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1527  } else {
1528  AGGREGATE_ONE_NULLABLE_VALUE(
1529  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1530  }
1531  break;
1532  }
1533  case kAPPROX_QUANTILE:
1534  CHECK_EQ(static_cast<int8_t>(sizeof(int64_t)), chosen_bytes);
1535  reduceOneApproxQuantileSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1536  break;
1537  default:
1538  UNREACHABLE() << toString(target_info.agg_kind);
1539  }
1540  } else {
1541  switch (chosen_bytes) {
1542  case 1: {
1544  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1545  if (rhs_proj_col != init_val) {
1546  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1547  }
1548  break;
1549  }
1550  case 2: {
1552  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1553  if (rhs_proj_col != init_val) {
1554  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1555  }
1556  break;
1557  }
1558  case 4: {
1559  CHECK(target_info.agg_kind != kSAMPLE ||
1560  query_mem_desc_.isLogicalSizedColumnsAllowed());
1561  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1562  if (rhs_proj_col != init_val) {
1563  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1564  }
1565  break;
1566  }
1567  case 8: {
1568  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1569  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1570  !serialized_varlen_buffer.empty()) {
1571  size_t length_to_elems{0};
1572  if (target_info.sql_type.is_geometry()) {
1573  // TODO: Assumes hard-coded sizes for geometry targets
1574  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1575  } else {
1576  const auto& elem_ti = target_info.sql_type.get_elem_type();
1577  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1578  }
1579 
1580  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1581  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1582  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1583  *reinterpret_cast<int64_t*>(this_ptr1) =
1584  reinterpret_cast<const int64_t>(str_ptr);
1585  *reinterpret_cast<int64_t*>(this_ptr2) =
1586  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1587  } else {
1588  if (rhs_proj_col != init_val) {
1589  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1590  }
1591  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1592  CHECK(this_ptr2 && that_ptr2);
1593  *reinterpret_cast<int64_t*>(this_ptr2) =
1594  *reinterpret_cast<const int64_t*>(that_ptr2);
1595  }
1596  }
1597 
1598  break;
1599  }
1600  default:
1601  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1602  }
1603  }
1604 }
1605 
1606 void ResultSetStorage::reduceOneApproxQuantileSlot(int8_t* this_ptr1,
1607  const int8_t* that_ptr1,
1608  const size_t target_logical_idx,
1609  const ResultSetStorage& that) const {
1611  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1612  auto* incoming = *reinterpret_cast<quantile::TDigest* const*>(that_ptr1);
1613  CHECK(incoming) << "this_ptr1=" << (void*)this_ptr1
1614  << ", that_ptr1=" << (void const*)that_ptr1
1615  << ", target_logical_idx=" << target_logical_idx;
1616  if (incoming->centroids().capacity()) {
1617  auto* accumulator = *reinterpret_cast<quantile::TDigest**>(this_ptr1);
1618  CHECK(accumulator) << "this_ptr1=" << (void*)this_ptr1
1619  << ", that_ptr1=" << (void const*)that_ptr1
1620  << ", target_logical_idx=" << target_logical_idx;
1621  accumulator->allocate();
1622  accumulator->mergeTDigest(*incoming);
1623  }
1624 }
1625 
1626 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1627  const int8_t* that_ptr1,
1628  const size_t target_logical_idx,
1629  const ResultSetStorage& that) const {
1631  const auto& old_count_distinct_desc =
1632  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1633  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1634  const auto& new_count_distinct_desc =
1635  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1636  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1637  CHECK(this_ptr1 && that_ptr1);
1638  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1639  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1640  count_distinct_set_union(
1641  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1642 }
1643 
1644 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1645  const int8_t warp_count,
1646  const bool is_columnar,
1647  const bool replace_bitmap_ptr_with_bitmap_sz,
1648  std::vector<int64_t>& agg_vals,
1649  const QueryMemoryDescriptor& query_mem_desc,
1650  const std::vector<TargetInfo>& targets,
1651  const std::vector<int64_t>& agg_init_vals) {
1652  const size_t agg_col_count{agg_vals.size()};
1653  const auto row_size = query_mem_desc.getRowSize();
1654  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1655  CHECK_GE(agg_col_count, targets.size());
1656  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1657  CHECK(query_mem_desc.hasKeylessHash());
1658  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1659  bool discard_row = true;
1660  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1661  bool discard_partial_result = true;
1662  for (size_t target_idx = 0, agg_col_idx = 0;
1663  target_idx < targets.size() && agg_col_idx < agg_col_count;
1664  ++target_idx, ++agg_col_idx) {
1665  const auto& agg_info = targets[target_idx];
1666  const bool float_argument_input = takes_float_argument(agg_info);
1667  const auto chosen_bytes = float_argument_input
1668  ? sizeof(float)
1669  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1670  auto partial_bin_val = get_component(
1671  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1672  partial_agg_vals[agg_col_idx] = partial_bin_val;
1673  if (is_distinct_target(agg_info)) {
1674  CHECK_EQ(int8_t(1), warp_count);
1675  CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
1676  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1677  partial_bin_val = count_distinct_set_size(
1678  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1679  if (replace_bitmap_ptr_with_bitmap_sz) {
1680  partial_agg_vals[agg_col_idx] = partial_bin_val;
1681  }
1682  }
1683  if (kAVG == agg_info.agg_kind) {
1684  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1685  ++agg_col_idx;
1686  partial_bin_val = partial_agg_vals[agg_col_idx] =
1687  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1688  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1689  }
1690  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1691  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1692  CHECK(agg_info.is_agg);
1693  discard_partial_result = false;
1694  }
1695  }
1696  row_ptr += row_size;
1697  if (discard_partial_result) {
1698  continue;
1699  }
1700  discard_row = false;
1701  for (size_t target_idx = 0, agg_col_idx = 0;
1702  target_idx < targets.size() && agg_col_idx < agg_col_count;
1703  ++target_idx, ++agg_col_idx) {
1704  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1705  const auto& agg_info = targets[target_idx];
1706  const bool float_argument_input = takes_float_argument(agg_info);
1707  const auto chosen_bytes = float_argument_input
1708  ? sizeof(float)
1709  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1710  const auto& chosen_type = get_compact_type(agg_info);
1711  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1712  try {
1713  switch (agg_info.agg_kind) {
1714  case kCOUNT:
1715  case kAPPROX_COUNT_DISTINCT:
1716  AGGREGATE_ONE_NULLABLE_COUNT(
1717  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1718  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1719  agg_init_vals[agg_col_idx],
1720  chosen_bytes,
1721  agg_info);
1722  break;
1723  case kAVG:
1724  // Ignore float argument compaction for count component for fear of its
1725  // overflow
1726  AGGREGATE_ONE_COUNT(
1727  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1728  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1729  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1730  // fall thru
1731  case kSUM:
1732  AGGREGATE_ONE_NULLABLE_VALUE(
1733  sum,
1734  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1735  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1736  agg_init_vals[agg_col_idx],
1737  chosen_bytes,
1738  agg_info);
1739  break;
1740  case kMIN:
1741  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1742  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1743  min,
1744  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1745  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1746  agg_init_vals[agg_col_idx],
1747  chosen_bytes,
1748  agg_info);
1749  } else {
1750  AGGREGATE_ONE_NULLABLE_VALUE(
1751  min,
1752  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1753  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1754  agg_init_vals[agg_col_idx],
1755  chosen_bytes,
1756  agg_info);
1757  }
1758  break;
1759  case kMAX:
1760  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1761  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1762  max,
1763  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1764  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1765  agg_init_vals[agg_col_idx],
1766  chosen_bytes,
1767  agg_info);
1768  } else {
1769  AGGREGATE_ONE_NULLABLE_VALUE(
1770  max,
1771  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1772  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1773  agg_init_vals[agg_col_idx],
1774  chosen_bytes,
1775  agg_info);
1776  }
1777  break;
1778  default:
1779  CHECK(false);
1780  break;
1781  }
1782  } catch (std::runtime_error& e) {
1783  // TODO(miyu): handle the case where chosen_bytes < 8
1784  LOG(ERROR) << e.what();
1785  }
1786  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1787  switch (chosen_bytes) {
1788  case 8:
1789  break;
1790  case 4: {
1791  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1792  if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
1793  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1794  }
1795  break;
1796  }
1797  default:
1798  CHECK(false);
1799  }
1800  }
1801  if (kAVG == agg_info.agg_kind) {
1802  ++agg_col_idx;
1803  }
1804  } else {
1805  if (agg_info.agg_kind == kSAMPLE) {
1806  CHECK(!agg_info.sql_type.is_varlen())
1807  << "Interleaved bins reduction not supported for variable length "
1808  "arguments "
1809  "to SAMPLE";
1810  }
1811  if (agg_vals[agg_col_idx]) {
1812  if (agg_info.agg_kind == kSAMPLE) {
1813  continue;
1814  }
1815  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1816  } else {
1817  agg_vals[agg_col_idx] = partial_bin_val;
1818  }
1819  }
1820  }
1821  }
1822  return discard_row;
1823 }
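reduceSingleRow collapses the warp_count interleaved partial results stored in one keyless-hash output row into a single vector of aggregate values, and reports via its return value whether every warp's partial result was empty (so the row can be discarded). The sketch below models only the simplest case, a row-wise SUM over all warps, under stated assumptions (no null sentinels, no COUNT DISTINCT, AVG, or SAMPLE targets); reduce_single_row_sketch and its flat layout are hypothetical.

#include <cassert>
#include <cstdint>
#include <vector>

// Simplified model: each warp contributes `slot_count` partial values laid out
// row-wise, and reduction sums the per-warp partials slot by slot.
std::vector<int64_t> reduce_single_row_sketch(const std::vector<int64_t>& row,
                                              const int warp_count,
                                              const int slot_count) {
  std::vector<int64_t> agg_vals(slot_count, 0);
  for (int warp = 0; warp < warp_count; ++warp) {
    for (int slot = 0; slot < slot_count; ++slot) {
      agg_vals[slot] += row[warp * slot_count + slot];
    }
  }
  return agg_vals;
}

int main() {
  // Two warps, two slots: warp 0 produced {1, 10}, warp 1 produced {2, 20}.
  const std::vector<int64_t> row{1, 10, 2, 20};
  const auto agg_vals = reduce_single_row_sketch(row, /*warp_count=*/2, /*slot_count=*/2);
  assert(agg_vals[0] == 3 && agg_vals[1] == 30);
  return 0;
}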