OmniSciDB  72c90bc290
ResultSetReduction.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DynamicWatchdog.h"
24 #include "Execute.h"
25 #include "ResultSet.h"
27 #include "ResultSetReductionJIT.h"
28 #include "RuntimeFunctions.h"
29 #include "Shared/SqlTypesLayout.h"
30 #include "Shared/likely.h"
31 #include "Shared/thread_count.h"
32 
33 #include <llvm/ExecutionEngine/GenericValue.h>
34 
35 #include <algorithm>
36 #include <future>
37 #include <numeric>
38 
39 extern bool g_enable_dynamic_watchdog;
40 
41 namespace {
42 
43 bool use_multithreaded_reduction(const size_t entry_count) {
44  return entry_count > 100000;
45 }
46 
47 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
48  const auto row_bytes = get_row_bytes(query_mem_desc);
49  CHECK_EQ(size_t(0), row_bytes % 8);
50  return row_bytes / 8;
51 }
52 
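// In the columnar layout the i-th component of a group key lives at buff[i * entry_count],
// so make_key gathers one entry's key by striding entry_count slots per component.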
53 std::vector<int64_t> make_key(const int64_t* buff,
54  const size_t entry_count,
55  const size_t key_count) {
56  std::vector<int64_t> key;
57  size_t off = 0;
58  for (size_t i = 0; i < key_count; ++i) {
59  key.push_back(buff[off]);
60  off += entry_count;
61  }
62  return key;
63 }
64 
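// fill_slots copies every aggregate slot of one source entry into a freshly claimed
// destination entry. Columnar buffers are strided by the destination entry count;
// row-wise buffers are copied contiguously after the key prefix (slot_off_quad).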
65 void fill_slots(int64_t* dst_entry,
66  const size_t dst_entry_count,
67  const int64_t* src_buff,
68  const size_t src_entry_idx,
69  const size_t src_entry_count,
70  const QueryMemoryDescriptor& query_mem_desc) {
71  const auto slot_count = query_mem_desc.getBufferColSlotCount();
72  const auto key_count = query_mem_desc.getGroupbyColCount();
73  if (query_mem_desc.didOutputColumnar()) {
74  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
75  ++i, dst_slot_off += dst_entry_count) {
76  dst_entry[dst_slot_off] =
77  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
78  }
79  } else {
80  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
81  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
82  for (size_t i = 0; i < slot_count; ++i) {
83  dst_entry[i] = row_ptr[slot_off_quad + i];
84  }
85  }
86 }
87 
89 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
90  for (size_t i = 0; i < key_count; ++i) {
91  key_ptr_i32[i] = EMPTY_KEY_32;
92  }
93 }
94 
96 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
97  for (size_t i = 0; i < key_count; ++i) {
98  key_ptr_i64[i] = EMPTY_KEY_64;
99  }
100 }
101 
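// Reads a single slot of 1, 2, 4 or 8 bytes from a group-by buffer and sign-extends it
// to int64_t; used by reduceSingleRow below to extract per-warp partial values.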
102 inline int64_t get_component(const int8_t* group_by_buffer,
103  const size_t comp_sz,
104  const size_t index = 0) {
105  int64_t ret = std::numeric_limits<int64_t>::min();
106  switch (comp_sz) {
107  case 1: {
108  ret = group_by_buffer[index];
109  break;
110  }
111  case 2: {
112  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
113  ret = buffer_ptr[index];
114  break;
115  }
116  case 4: {
117  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
118  ret = buffer_ptr[index];
119  break;
120  }
121  case 8: {
122  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
123  ret = buffer_ptr[index];
124  break;
125  }
126  default:
127  CHECK(false);
128  }
129  return ret;
130 }
131 
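// Dispatches one reduction pass over [start_entry_index, end_entry_index): prefer the
// JIT-compiled function pointer when ResultSetReductionJIT produced one, otherwise fall
// back to interpreting the IR reduction loop. A non-zero return code is mapped to the
// exceptions below.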
132 void run_reduction_code(const size_t executor_id,
133  const ReductionCode& reduction_code,
134  int8_t* this_buff,
135  const int8_t* that_buff,
136  const int32_t start_entry_index,
137  const int32_t end_entry_index,
138  const int32_t that_entry_count,
139  const void* this_qmd,
140  const void* that_qmd,
141  const void* serialized_varlen_buffer) {
142  int err = 0;
143  if (reduction_code.func_ptr) {
144  err = reduction_code.func_ptr(this_buff,
145  that_buff,
146  start_entry_index,
147  end_entry_index,
148  that_entry_count,
149  this_qmd,
150  that_qmd,
151  serialized_varlen_buffer);
152  } else {
153  auto ret = ReductionInterpreter::run(
154  executor_id,
155  reduction_code.ir_reduce_loop.get(),
156  {ReductionInterpreter::MakeEvalValue(this_buff),
157  ReductionInterpreter::MakeEvalValue(that_buff),
158  ReductionInterpreter::MakeEvalValue(start_entry_index),
159  ReductionInterpreter::MakeEvalValue(end_entry_index),
160  ReductionInterpreter::MakeEvalValue(that_entry_count),
161  ReductionInterpreter::MakeEvalValue(this_qmd),
162  ReductionInterpreter::MakeEvalValue(that_qmd),
163  ReductionInterpreter::MakeEvalValue(serialized_varlen_buffer)});
164  err = ret.int_val;
165  }
166  if (err) {
167  if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
168  throw std::runtime_error("Multiple distinct values encountered");
169  }
170  if (err == Executor::ERR_INTERRUPTED) {
171  throw std::runtime_error(
172  "Query execution has interrupted during result set reduction");
173  }
174  throw std::runtime_error(
175  "Query execution has exceeded the time limit or was interrupted during result "
176  "set reduction");
177  }
178 }
179 
180 } // namespace
181 
182 void result_set::fill_empty_key(void* key_ptr,
183  const size_t key_count,
184  const size_t key_width) {
185  switch (key_width) {
186  case 4: {
187  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
188  fill_empty_key_32(key_ptr_i32, key_count);
189  break;
190  }
191  case 8: {
192  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
193  fill_empty_key_64(key_ptr_i64, key_count);
194  break;
195  }
196  default:
197  CHECK(false);
198  }
199 }
200 
201 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
202 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
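// Baseline-hash inputs may be smaller than the output buffer (CHECK_GE below); all other
// layouts require matching entry counts. Large inputs are split across cpu_threads()
// workers, each reducing a contiguous range of `that`'s entries.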
203 void ResultSetStorage::reduce(const ResultSetStorage& that,
204  const std::vector<std::string>& serialized_varlen_buffer,
205  const ReductionCode& reduction_code,
206  const size_t executor_id) const {
207  auto entry_count = query_mem_desc_.getEntryCount();
208  CHECK_GT(entry_count, size_t(0));
216  }
217  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
218  switch (query_mem_desc_.getQueryDescriptionType()) {
219  case QueryDescriptionType::GroupByBaselineHash:
220  CHECK_GE(entry_count, that_entry_count);
221  break;
222  default:
223  CHECK_EQ(entry_count, that_entry_count);
224  }
225  auto this_buff = buff_;
226  CHECK(this_buff);
227  auto that_buff = that.buff_;
228  CHECK(that_buff);
229  if (query_mem_desc_.getQueryDescriptionType() ==
230  QueryDescriptionType::GroupByBaselineHash) {
231  if (!serialized_varlen_buffer.empty()) {
232  throw std::runtime_error(
233  "Projection of variable length targets with baseline hash group by is not yet "
234  "supported in Distributed mode");
235  }
236  if (use_multithreaded_reduction(that_entry_count)) {
237  const size_t thread_count = cpu_threads();
238  std::vector<std::future<void>> reduction_threads;
239  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
240  const auto thread_entry_count =
241  (that_entry_count + thread_count - 1) / thread_count;
242  const auto start_index = thread_idx * thread_entry_count;
243  const auto end_index =
244  std::min(start_index + thread_entry_count, that_entry_count);
245  reduction_threads.emplace_back(std::async(
246  std::launch::async,
247  [this,
248  this_buff,
249  that_buff,
250  start_index,
251  end_index,
252  that_entry_count,
253  executor_id,
254  &reduction_code,
255  &that] {
256  if (reduction_code.ir_reduce_loop) {
257  run_reduction_code(executor_id,
258  reduction_code,
259  this_buff,
260  that_buff,
261  start_index,
262  end_index,
263  that_entry_count,
264  &query_mem_desc_,
265  &that.query_mem_desc_,
266  nullptr);
267  } else {
268  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
269  reduceOneEntryBaseline(
270  this_buff, that_buff, entry_idx, that_entry_count, that);
271  }
272  }
273  }));
274  }
275  for (auto& reduction_thread : reduction_threads) {
276  reduction_thread.wait();
277  }
278  for (auto& reduction_thread : reduction_threads) {
279  reduction_thread.get();
280  }
281  } else {
282  if (reduction_code.ir_reduce_loop) {
283  run_reduction_code(executor_id,
284  reduction_code,
285  this_buff,
286  that_buff,
287  0,
288  that_entry_count,
289  that_entry_count,
290  &query_mem_desc_,
291  &that.query_mem_desc_,
292  nullptr);
293  } else {
294  for (size_t i = 0; i < that_entry_count; ++i) {
295  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
296  }
297  }
298  }
299  return;
300  }
301  if (use_multithreaded_reduction(entry_count)) {
302  const size_t thread_count = cpu_threads();
303  std::vector<std::future<void>> reduction_threads;
304  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
305  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
306  const auto start_index = thread_idx * thread_entry_count;
307  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
308  if (query_mem_desc_.didOutputColumnar()) {
309  reduction_threads.emplace_back(std::async(std::launch::async,
310  [this,
311  this_buff,
312  that_buff,
313  start_index,
314  end_index,
315  &that,
316  &serialized_varlen_buffer,
317  &executor_id] {
318  reduceEntriesNoCollisionsColWise(
319  this_buff,
320  that_buff,
321  that,
322  start_index,
323  end_index,
324  serialized_varlen_buffer,
325  executor_id);
326  }));
327  } else {
328  reduction_threads.emplace_back(std::async(std::launch::async,
329  [this,
330  this_buff,
331  that_buff,
332  start_index,
333  end_index,
334  that_entry_count,
335  executor_id,
336  &reduction_code,
337  &that,
338  &serialized_varlen_buffer] {
339  CHECK(reduction_code.ir_reduce_loop);
340  run_reduction_code(
341  executor_id,
342  reduction_code,
343  this_buff,
344  that_buff,
345  start_index,
346  end_index,
347  that_entry_count,
348  &query_mem_desc_,
349  &that.query_mem_desc_,
350  &serialized_varlen_buffer);
351  }));
352  }
353  }
354  for (auto& reduction_thread : reduction_threads) {
355  reduction_thread.wait();
356  }
357  for (auto& reduction_thread : reduction_threads) {
358  reduction_thread.get();
359  }
360  } else {
361  if (query_mem_desc_.didOutputColumnar()) {
362  reduceEntriesNoCollisionsColWise(this_buff,
363  that_buff,
364  that,
365  0,
366  query_mem_desc_.getEntryCount(),
367  serialized_varlen_buffer,
368  executor_id);
369  } else {
370  CHECK(reduction_code.ir_reduce_loop);
371  run_reduction_code(executor_id,
372  reduction_code,
373  this_buff,
374  that_buff,
375  0,
376  entry_count,
377  that_entry_count,
378  &query_mem_desc_,
379  &that.query_mem_desc_,
380  &serialized_varlen_buffer);
381  }
382  }
383 }
384 
385 namespace {
386 
387 ALWAYS_INLINE void check_watchdog() {
388  if (UNLIKELY(dynamic_watchdog())) {
389  // TODO(alex): distinguish between the deadline and interrupt
390  throw std::runtime_error(
391  "Query execution has exceeded the time limit or was interrupted during result "
392  "set reduction");
393  }
394 }
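// Sampled variant: only polls the watchdog on every 64th entry (sample_seed & 0x3F) so
// the per-entry reduction cost stays low.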
395 
396 ALWAYS_INLINE void check_watchdog_with_seed(const size_t sample_seed) {
397  if (UNLIKELY((sample_seed & 0x3F) == 0 && dynamic_watchdog())) {
398  // TODO(alex): distinguish between the deadline and interrupt
399  throw std::runtime_error(
400  "Query execution has exceeded the time limit or was interrupted during result "
401  "set reduction");
402  }
403 }
404 
405 } // namespace
406 
407 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
408  int8_t* this_buff,
409  const int8_t* that_buff,
410  const ResultSetStorage& that,
411  const size_t start_index,
412  const size_t end_index,
413  const std::vector<std::string>& serialized_varlen_buffer,
414  const size_t executor_id) const {
415  // TODO(adb / saman): Support column wise output when serializing distributed agg
416  // functions
417  CHECK(serialized_varlen_buffer.empty());
418 
419  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
420 
421  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
422  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
423  auto executor = Executor::getExecutor(executor_id);
424  CHECK(executor);
425  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
426  const auto& agg_info = targets_[target_idx];
427  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
428 
429  bool two_slot_target{false};
430  if (agg_info.is_agg &&
431  (agg_info.agg_kind == kAVG ||
432  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
433  // Note that this assumes if one of the slot pairs in a given target is an array,
434  // all slot pairs are arrays. Currently this is true for all geo targets, but we
435  // should better codify and store this information in the future
436  two_slot_target = true;
437  }
438  if (UNLIKELY(g_enable_non_kernel_time_query_interrupt &&
439  executor->checkNonKernelTimeInterrupted())) {
440  throw std::runtime_error(
441  "Query execution was interrupted during result set reduction");
442  }
443  if (g_enable_dynamic_watchdog) {
444  check_watchdog();
445  }
446  for (size_t target_slot_idx = slots_for_col.front();
447  target_slot_idx < slots_for_col.back() + 1;
448  target_slot_idx += 2) {
449  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
450  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
451  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
452  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
453  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
454  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
455  continue;
456  }
457  if (target_idx == 0) {
458  // copy the key from right hand side
459  copyKeyColWise(entry_idx, this_buff, that_buff);
460  }
461  auto this_ptr1 =
462  this_crt_col_ptr +
463  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
464  auto that_ptr1 =
465  that_crt_col_ptr +
466  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
467  int8_t* this_ptr2{nullptr};
468  const int8_t* that_ptr2{nullptr};
469  if (UNLIKELY(two_slot_target)) {
470  this_ptr2 =
471  this_next_col_ptr +
472  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
473  that_ptr2 =
474  that_next_col_ptr +
475  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
476  }
477  reduceOneSlot(this_ptr1,
478  this_ptr2,
479  that_ptr1,
480  that_ptr2,
481  agg_info,
482  target_idx,
483  target_slot_idx,
484  target_slot_idx,
485  that,
486  slots_for_col.front(),
487  serialized_varlen_buffer);
488  }
489 
490  this_crt_col_ptr = this_next_col_ptr;
491  that_crt_col_ptr = that_next_col_ptr;
492  if (UNLIKELY(two_slot_target)) {
493  this_crt_col_ptr = advance_to_next_columnar_target_buff(
494  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
495  that_crt_col_ptr = advance_to_next_columnar_target_buff(
496  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
497  }
498  }
499  }
500 }
501 
502 /*
503  * copy all keys from the columnar prepended group buffer of "that_buff" into
504  * "this_buff"
505  */
506 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
507  int8_t* this_buff,
508  const int8_t* that_buff) const {
510  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
511  group_idx++) {
512  // if the column corresponds to a group key
513  const auto column_offset_bytes =
514  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
515  auto lhs_key_ptr = this_buff + column_offset_bytes;
516  auto rhs_key_ptr = that_buff + column_offset_bytes;
517  switch (query_mem_desc_.groupColWidth(group_idx)) {
518  case 8:
519  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
520  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
521  break;
522  case 4:
523  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
524  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
525  break;
526  case 2:
527  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
528  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
529  break;
530  case 1:
531  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
532  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
533  break;
534  default:
535  CHECK(false);
536  break;
537  }
538  }
539 }
540 
541 // Rewrites the entries of this ResultSetStorage object to point directly into the
542 // serialized_varlen_buffer rather than using offsets.
543 void ResultSetStorage::rewriteAggregateBufferOffsets(
544  const std::vector<std::string>& serialized_varlen_buffer) const {
545  if (serialized_varlen_buffer.empty()) {
546  return;
547  }
548 
550  auto entry_count = query_mem_desc_.getEntryCount();
551  CHECK_GT(entry_count, size_t(0));
552  CHECK(buff_);
553 
554  // Row-wise iteration, consider moving to separate function
555  for (size_t i = 0; i < entry_count; ++i) {
556  if (isEmptyEntry(i, buff_)) {
557  continue;
558  }
559  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
560  const auto key_bytes_with_padding = align_to_int64(key_bytes);
561  auto rowwise_targets_ptr =
562  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
563  size_t target_slot_idx = 0;
564  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
565  ++target_logical_idx) {
566  const auto& target_info = targets_[target_logical_idx];
567  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
568  CHECK(target_info.agg_kind == kSAMPLE);
569  auto ptr1 = rowwise_targets_ptr;
570  auto slot_idx = target_slot_idx;
571  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
572  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
573 
574  const auto& elem_ti = target_info.sql_type.get_elem_type();
575  size_t length_to_elems =
576  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
577  ? 1
578  : elem_ti.get_size();
579  if (target_info.sql_type.is_geometry()) {
580  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
581  if (j > 0) {
582  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
583  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
584  slot_idx += 2;
585  length_to_elems = 4;
586  }
587  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
588  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
589  const auto str_ptr =
590  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
591  CHECK(ptr1);
592  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
593  CHECK(ptr2);
594  *reinterpret_cast<int64_t*>(ptr2) =
595  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
596  }
597  } else {
598  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
599  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
600  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
601  CHECK(ptr1);
602  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
603  CHECK(ptr2);
604  *reinterpret_cast<int64_t*>(ptr2) =
605  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
606  }
607  }
608 
609  rowwise_targets_ptr = advance_target_ptr_row_wise(
610  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
611  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
612  }
613  }
614 
615  return;
616 }
617 
618 namespace {
619 
620 #ifdef _MSC_VER
621 #define mapd_cas(address, compare, val) \
622  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
623  static_cast<long>(val), \
624  static_cast<long>(compare))
625 #else
626 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
627 #endif
628 
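// Columnar baseline-hash probe: a CAS on the first key slot against EMPTY_KEY_64 claims
// an empty bucket, after which the remaining key components are written one entry_count
// stride apart. Returns {slots, true} for a freshly claimed bucket, {slots, false} for a
// matching existing key, and {nullptr, true} on a collision.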
629 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
630  const uint32_t h,
631  const int64_t* key,
632  const uint32_t key_qw_count,
633  const size_t entry_count) {
634  auto off = h;
635  const auto old_key = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
636  if (old_key == EMPTY_KEY_64) {
637  for (size_t i = 0; i < key_qw_count; ++i) {
638  groups_buffer[off] = key[i];
639  off += entry_count;
640  }
641  return {&groups_buffer[off], true};
642  }
643  off = h;
644  for (size_t i = 0; i < key_qw_count; ++i) {
645  if (groups_buffer[off] != key[i]) {
646  return {nullptr, true};
647  }
648  off += entry_count;
649  }
650  return {&groups_buffer[off], false};
651 }
652 
653 #undef mapd_cas
654 
655 // TODO(alex): fix synchronization when we enable it
656 GroupValueInfo get_group_value_columnar_reduction(
657  int64_t* groups_buffer,
658  const uint32_t groups_buffer_entry_count,
659  const int64_t* key,
660  const uint32_t key_qw_count) {
661  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
662  auto matching_gvi = get_matching_group_value_columnar_reduction(
663  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
664  if (matching_gvi.first) {
665  return matching_gvi;
666  }
667  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
668  while (h_probe != h) {
669  matching_gvi = get_matching_group_value_columnar_reduction(
670  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
671  if (matching_gvi.first) {
672  return matching_gvi;
673  }
674  h_probe = (h_probe + 1) % groups_buffer_entry_count;
675  }
676  return {nullptr, true};
677 }
678 
679 #ifdef _MSC_VER
680 #define cas_cst(ptr, expected, desired) \
681  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
682  reinterpret_cast<void*>(&desired), \
683  expected) == expected)
684 #define store_cst(ptr, val) \
685  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
686  reinterpret_cast<void*>(val))
687 #define load_cst(ptr) \
688  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
689 #else
690 #define cas_cst(ptr, expected, desired) \
691  __atomic_compare_exchange_n( \
692  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
693 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
694 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
695 #endif
696 
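// Row-wise variant used by the baseline reduction: the winning thread CAS-publishes a
// write_pending sentinel (empty_key - 1) into the first key slot, fills the entry's value
// slots and trailing key components, then stores the real key; losing threads spin until
// the sentinel is replaced before comparing keys.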
697 template <typename T = int64_t>
698 GroupValueInfo get_matching_group_value_reduction(
699  int64_t* groups_buffer,
700  const uint32_t h,
701  const T* key,
702  const uint32_t key_count,
703  const QueryMemoryDescriptor& query_mem_desc,
704  const int64_t* that_buff_i64,
705  const size_t that_entry_idx,
706  const size_t that_entry_count,
707  const uint32_t row_size_quad) {
708  auto off = h * row_size_quad;
709  T empty_key = get_empty_key<T>();
710  T write_pending = get_empty_key<T>() - 1;
711  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
712  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
713  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
714  if (success) {
715  fill_slots(groups_buffer + off + slot_off_quad,
716  query_mem_desc.getEntryCount(),
717  that_buff_i64,
718  that_entry_idx,
719  that_entry_count,
720  query_mem_desc);
721  if (key_count > 1) {
722  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
723  }
724  store_cst(row_ptr, *key);
725  return {groups_buffer + off + slot_off_quad, true};
726  }
727  while (load_cst(row_ptr) == write_pending) {
728  // spin until the winning thread has finished writing the entire key and the init
729  // value
730  }
731  for (size_t i = 0; i < key_count; ++i) {
732  if (load_cst(row_ptr + i) != key[i]) {
733  return {nullptr, true};
734  }
735  }
736  return {groups_buffer + off + slot_off_quad, false};
737 }
738 
739 #undef load_cst
740 #undef store_cst
741 #undef cas_cst
742 
743 GroupValueInfo get_matching_group_value_reduction(
744  int64_t* groups_buffer,
745  const uint32_t h,
746  const int64_t* key,
747  const uint32_t key_count,
748  const size_t key_width,
749  const QueryMemoryDescriptor& query_mem_desc,
750  const int64_t* that_buff_i64,
751  const size_t that_entry_idx,
752  const size_t that_entry_count,
753  const uint32_t row_size_quad) {
754  switch (key_width) {
755  case 4:
756  return get_matching_group_value_reduction(groups_buffer,
757  h,
758  reinterpret_cast<const int32_t*>(key),
759  key_count,
760  query_mem_desc,
761  that_buff_i64,
762  that_entry_idx,
763  that_entry_count,
764  row_size_quad);
765  case 8:
766  return get_matching_group_value_reduction(groups_buffer,
767  h,
768  key,
769  key_count,
770  query_mem_desc,
771  that_buff_i64,
772  that_entry_idx,
773  that_entry_count,
774  row_size_quad);
775  default:
776  CHECK(false);
777  return {nullptr, true};
778  }
779 }
780 
781 } // namespace
782 
784  int64_t* groups_buffer,
785  const uint32_t groups_buffer_entry_count,
786  const int64_t* key,
787  const uint32_t key_count,
788  const size_t key_width,
789  const QueryMemoryDescriptor& query_mem_desc,
790  const int64_t* that_buff_i64,
791  const size_t that_entry_idx,
792  const size_t that_entry_count,
793  const uint32_t row_size_quad) {
794  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
795  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
796  h,
797  key,
798  key_count,
799  key_width,
800  query_mem_desc,
801  that_buff_i64,
802  that_entry_idx,
803  that_entry_count,
804  row_size_quad);
805  if (matching_gvi.first) {
806  return matching_gvi;
807  }
808  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
809  while (h_probe != h) {
810  matching_gvi = get_matching_group_value_reduction(groups_buffer,
811  h_probe,
812  key,
813  key_count,
814  key_width,
815  query_mem_desc,
816  that_buff_i64,
817  that_entry_idx,
818  that_entry_count,
819  row_size_quad);
820  if (matching_gvi.first) {
821  return matching_gvi;
822  }
823  h_probe = (h_probe + 1) % groups_buffer_entry_count;
824  }
825  return {nullptr, true};
826 }
827 
828 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
829 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
830 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
831  const int8_t* that_buff,
832  const size_t that_entry_idx,
833  const size_t that_entry_count,
834  const ResultSetStorage& that) const {
835  if (g_enable_dynamic_watchdog) {
836  check_watchdog_with_seed(that_entry_idx);
837  }
838  const auto key_count = query_mem_desc_.getGroupbyColCount();
843  const auto key_off =
844  key_offset_colwise(that_entry_idx, 0, that_entry_count);
845  if (isEmptyEntry(that_entry_idx, that_buff)) {
846  return;
847  }
848  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
849  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
850  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
851  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
852  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
853  CHECK(this_entry_slots);
854  if (empty_entry) {
855  fill_slots(this_entry_slots,
856  query_mem_desc_.getEntryCount(),
857  that_buff_i64,
858  that_entry_idx,
859  that_entry_count,
860  query_mem_desc_);
861  return;
862  }
863  reduceOneEntrySlotsBaseline(
864  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
865 }
866 
867 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
868  const int64_t* that_buff,
869  const size_t that_entry_idx,
870  const size_t that_entry_count,
871  const ResultSetStorage& that) const {
873  const auto key_count = query_mem_desc_.getGroupbyColCount();
874  size_t j = 0;
875  size_t init_agg_val_idx = 0;
876  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
877  ++target_logical_idx) {
878  const auto& target_info = targets_[target_logical_idx];
879  const auto that_slot_off = slot_offset_colwise(
880  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
881  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
882  reduceOneSlotBaseline(this_entry_slots,
883  this_slot_off,
884  that_buff,
885  that_entry_count,
886  that_slot_off,
887  target_info,
888  target_logical_idx,
889  j,
890  init_agg_val_idx,
891  that);
892  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
893  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
894  } else {
895  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
896  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
897  }
898  }
899  j = advance_slot(j, target_info, false);
900  }
901 }
902 
903 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
904  const size_t this_slot,
905  const int64_t* that_buff,
906  const size_t that_entry_count,
907  const size_t that_slot,
908  const TargetInfo& target_info,
909  const size_t target_logical_idx,
910  const size_t target_slot_idx,
911  const size_t init_agg_val_idx,
912  const ResultSetStorage& that) const {
914  int8_t* this_ptr2{nullptr};
915  const int8_t* that_ptr2{nullptr};
916  if (target_info.is_agg &&
917  (target_info.agg_kind == kAVG ||
918  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
919  const auto this_count_off = query_mem_desc_.getEntryCount();
920  const auto that_count_off = that_entry_count;
921  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
922  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
923  }
924  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
925  this_ptr2,
926  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
927  that_ptr2,
928  target_info,
929  target_logical_idx,
930  target_slot_idx,
931  init_agg_val_idx,
932  that,
933  target_slot_idx, // dummy, for now
934  {});
935 }
936 
937 // During the reduction of two result sets using the baseline strategy, we first create a
938 // big enough buffer to hold the entries for both and we move the entries from the first
939 // into it before doing the reduction as usual (into the first buffer).
940 template <class KeyType>
941 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
942  const size_t new_entry_count) const {
944  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
945  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
946  const auto key_count = query_mem_desc_.getGroupbyColCount();
949  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
950  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
951  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
952 
953  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
954  const size_t thread_count = cpu_threads();
955  std::vector<std::future<void>> move_threads;
956 
957  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
958  const auto thread_entry_count =
959  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
960  const auto start_index = thread_idx * thread_entry_count;
961  const auto end_index =
962  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
963  move_threads.emplace_back(std::async(
964  std::launch::async,
965  [this,
966  src_buff,
967  new_buff_i64,
968  new_entry_count,
969  start_index,
970  end_index,
971  key_count,
972  row_qw_count,
973  key_byte_width] {
974  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
975  moveOneEntryToBuffer<KeyType>(entry_idx,
976  new_buff_i64,
977  new_entry_count,
978  key_count,
979  row_qw_count,
980  src_buff,
981  key_byte_width);
982  }
983  }));
984  }
985  for (auto& move_thread : move_threads) {
986  move_thread.wait();
987  }
988  for (auto& move_thread : move_threads) {
989  move_thread.get();
990  }
991  } else {
992  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
993  moveOneEntryToBuffer<KeyType>(entry_idx,
994  new_buff_i64,
995  new_entry_count,
996  key_count,
997  row_qw_count,
998  src_buff,
999  key_byte_width);
1000  }
1001  }
1002 }
1003 
1004 template <class KeyType>
1005 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
1006  int64_t* new_buff_i64,
1007  const size_t new_entry_count,
1008  const size_t key_count,
1009  const size_t row_qw_count,
1010  const int64_t* src_buff,
1011  const size_t key_byte_width) const {
1012  const auto key_off =
1013  query_mem_desc_.didOutputColumnar()
1014  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
1015  : row_qw_count * entry_index;
1016  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
1017  if (*key_ptr == get_empty_key<KeyType>()) {
1018  return;
1019  }
1020  int64_t* new_entries_ptr{nullptr};
1021  if (query_mem_desc_.didOutputColumnar()) {
1022  const auto key =
1023  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
1024  new_entries_ptr =
1025  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
1026  } else {
1027  new_entries_ptr = get_group_value(new_buff_i64,
1028  new_entry_count,
1029  &src_buff[key_off],
1030  key_count,
1031  key_byte_width,
1032  row_qw_count);
1033  }
1034  CHECK(new_entries_ptr);
1035  fill_slots(new_entries_ptr,
1036  new_entry_count,
1037  src_buff,
1038  entry_index,
1039  query_mem_desc_.getEntryCount(),
1040  query_mem_desc_);
1041 }
1042 
1043 void ResultSet::initializeStorage() const {
1044  if (query_mem_desc_.didOutputColumnar()) {
1045  storage_->initializeColWise();
1046  } else {
1047  storage_->initializeRowWise();
1048  }
1049 }
1050 
1051 // Driver for reductions. Needed because the result of a reduction on the baseline
1052 // layout, which can have collisions, cannot be done in place and something needs
1053 // to take the ownership of the new result set with the bigger underlying buffer.
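// For the baseline (collision-prone) layout a fresh ResultSet sized to the sum of all
// input entry counts is allocated, the first input is rehashed into it with
// moveEntriesToBuffer, and the remaining inputs are then reduced into that buffer.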
1054 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets,
1055  const size_t executor_id) {
1056  CHECK(!result_sets.empty());
1057  auto result_rs = result_sets.front();
1058  CHECK(result_rs->storage_);
1059  auto& first_result = *result_rs->storage_;
1060  auto result = &first_result;
1061  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1062  for (const auto result_set : result_sets) {
1063  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1064  }
1065  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1066  QueryDescriptionType::GroupByBaselineHash) {
1067  const auto total_entry_count =
1068  std::accumulate(result_sets.begin(),
1069  result_sets.end(),
1070  size_t(0),
1071  [](const size_t init, const ResultSet* rs) {
1072  return init + rs->query_mem_desc_.getEntryCount();
1073  });
1074  CHECK(total_entry_count);
1075  auto query_mem_desc = first_result.query_mem_desc_;
1076  query_mem_desc.setEntryCount(total_entry_count);
1077  rs_.reset(new ResultSet(first_result.targets_,
1078  ExecutorDeviceType::CPU,
1079  query_mem_desc,
1080  row_set_mem_owner,
1081  0,
1082  0));
1083  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1084  rs_->initializeStorage();
1085  switch (query_mem_desc.getEffectiveKeyWidth()) {
1086  case 4:
1087  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1088  query_mem_desc.getEntryCount());
1089  break;
1090  case 8:
1091  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1092  query_mem_desc.getEntryCount());
1093  break;
1094  default:
1095  CHECK(false);
1096  }
1097  result = rs_->storage_.get();
1098  result_rs = rs_.get();
1099  }
1100 
1101  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1102  if (!serialized_varlen_buffer.empty()) {
1103  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1104  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1105  ++result_it) {
1106  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1107  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1108  serialized_varlen_buffer.emplace_back(
1109  std::move(result_serialized_varlen_buffer.front()));
1110  }
1111  }
1112 
1113  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1114  result_rs->getTargetInfos(),
1115  result_rs->getTargetInitVals(),
1116  executor_id);
1117  auto reduction_code = reduction_jit.codegen();
1118  size_t ctr = 1;
1119  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1120  ++result_it) {
1121  if (!serialized_varlen_buffer.empty()) {
1122  result->reduce(*((*result_it)->storage_),
1123  serialized_varlen_buffer[ctr++],
1124  reduction_code,
1125  executor_id);
1126  } else {
1127  result->reduce(*((*result_it)->storage_), {}, reduction_code, executor_id);
1128  }
1129  }
1130  return result_rs;
1131 }
1132 
1133 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1134  return rs_;
1135 }
1136 
1137 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1138  auto& result_storage = result_rs->storage_;
1139  result_storage->rewriteAggregateBufferOffsets(
1140  result_rs->serialized_varlen_buffer_.front());
1141 }
1142 
1143 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1144  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1145  const auto key_count = query_mem_desc_.getGroupbyColCount();
1146  CHECK_EQ(slot_count + key_count, entry.size());
1147  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1149  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1150  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1151  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1152  for (size_t i = 0; i < key_count; ++i) {
1153  this_buff[key_off + i] = entry[i];
1154  }
1155  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1156  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1157  this_buff[first_slot_off + i] = entry[key_count + i];
1158  }
1159 }
1160 
1161 void ResultSetStorage::initializeRowWise() const {
1162  const auto key_count = query_mem_desc_.getGroupbyColCount();
1163  const auto row_size = get_row_bytes(query_mem_desc_);
1164  CHECK_EQ(row_size % 8, 0u);
1165  const auto key_bytes_with_padding =
1166  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1168  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1169  case 4: {
1170  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1171  auto row_ptr = buff_ + i * row_size;
1172  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1173  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1174  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1175  slot_ptr[j] = target_init_vals_[j];
1176  }
1177  }
1178  break;
1179  }
1180  case 8: {
1181  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1182  auto row_ptr = buff_ + i * row_size;
1183  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1184  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1185  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1186  slot_ptr[j] = target_init_vals_[j];
1187  }
1188  }
1189  break;
1190  }
1191  default:
1192  CHECK(false);
1193  }
1194 }
1195 
1196 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1198  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1199  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1200  const auto key_count = query_mem_desc_.getGroupbyColCount();
1201  CHECK_EQ(slot_count + key_count, entry.size());
1202  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1203 
1204  for (size_t i = 0; i < key_count; i++) {
1205  const auto key_offset = key_offset_colwise(0, i, 1);
1206  this_buff[key_offset] = entry[i];
1207  }
1208 
1209  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1210  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1211  this_buff[slot_offset] = entry[key_count + i];
1212  }
1213 }
1214 
1215 void ResultSetStorage::initializeColWise() const {
1216  const auto key_count = query_mem_desc_.getGroupbyColCount();
1217  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1219  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1220  const auto first_key_off =
1221  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1222  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1223  this_buff[first_key_off + i] = EMPTY_KEY_64;
1224  }
1225  }
1226  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1227  const auto first_val_off =
1228  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1229  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1230  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1231  }
1232  }
1233 }
1234 
1235 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1236  CHECK(entry_slots);
1237  if (query_mem_desc_.didOutputColumnar()) {
1238  size_t slot_off = 0;
1239  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1240  entry_slots[slot_off] = target_init_vals_[j];
1241  slot_off += query_mem_desc_.getEntryCount();
1242  }
1243  } else {
1244  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1245  entry_slots[j] = target_init_vals_[j];
1246  }
1247  }
1248 }
1249 
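// The AGGREGATE_ONE_* macros below dispatch on the compact type (fp vs. integer) and the
// slot width to the matching agg_* runtime function from RuntimeFunctions.h; the
// *_NULLABLE_* variants route through the *_skip_val flavors so the init/null value is
// never folded into the aggregate.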
1250 #define AGGREGATE_ONE_VALUE( \
1251  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1252  do { \
1253  const auto sql_type = get_compact_type(agg_info__); \
1254  if (sql_type.is_fp()) { \
1255  if (chosen_bytes__ == sizeof(float)) { \
1256  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1257  *reinterpret_cast<const float*>(other_ptr__)); \
1258  } else { \
1259  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1260  *reinterpret_cast<const double*>(other_ptr__)); \
1261  } \
1262  } else { \
1263  if (chosen_bytes__ == sizeof(int32_t)) { \
1264  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1265  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1266  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1267  } else { \
1268  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1269  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1270  agg_##agg_kind__(val_ptr, *other_ptr); \
1271  } \
1272  } \
1273  } while (0)
1274 
1275 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1276  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1277  do { \
1278  if (agg_info__.skip_null_val) { \
1279  const auto sql_type = get_compact_type(agg_info__); \
1280  if (sql_type.is_fp()) { \
1281  if (chosen_bytes__ == sizeof(float)) { \
1282  agg_##agg_kind__##_float_skip_val( \
1283  reinterpret_cast<int32_t*>(val_ptr__), \
1284  *reinterpret_cast<const float*>(other_ptr__), \
1285  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1286  } else { \
1287  agg_##agg_kind__##_double_skip_val( \
1288  reinterpret_cast<int64_t*>(val_ptr__), \
1289  *reinterpret_cast<const double*>(other_ptr__), \
1290  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1291  } \
1292  } else { \
1293  if (chosen_bytes__ == sizeof(int32_t)) { \
1294  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1295  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1296  const auto null_val = static_cast<int32_t>(init_val__); \
1297  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1298  } else { \
1299  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1300  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1301  const auto null_val = static_cast<int64_t>(init_val__); \
1302  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1303  } \
1304  } \
1305  } else { \
1306  AGGREGATE_ONE_VALUE( \
1307  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1308  } \
1309  } while (0)
1310 
1311 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1312  do { \
1313  if (chosen_bytes__ == sizeof(int32_t)) { \
1314  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1315  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1316  agg_sum_int32(val_ptr, *other_ptr); \
1317  } else { \
1318  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1319  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1320  agg_sum(val_ptr, *other_ptr); \
1321  } \
1322  } while (0)
1323 
1324 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1325  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1326  { \
1327  if (agg_info__.skip_null_val) { \
1328  const auto sql_type = get_compact_type(agg_info__); \
1329  if (sql_type.is_fp()) { \
1330  if (chosen_bytes__ == sizeof(float)) { \
1331  agg_sum_float_skip_val( \
1332  reinterpret_cast<int32_t*>(val_ptr__), \
1333  *reinterpret_cast<const float*>(other_ptr__), \
1334  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1335  } else { \
1336  agg_sum_double_skip_val( \
1337  reinterpret_cast<int64_t*>(val_ptr__), \
1338  *reinterpret_cast<const double*>(other_ptr__), \
1339  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1340  } \
1341  } else { \
1342  if (chosen_bytes__ == sizeof(int32_t)) { \
1343  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1344  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1345  const auto null_val = static_cast<int32_t>(init_val__); \
1346  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1347  } else { \
1348  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1349  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1350  const auto null_val = static_cast<int64_t>(init_val__); \
1351  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1352  } \
1353  } \
1354  } else { \
1355  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1356  } \
1357  }
1358 
1359 // to be used for 8/16-bit kMIN and kMAX only
1360 #define AGGREGATE_ONE_VALUE_SMALL( \
1361  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1362  do { \
1363  if (chosen_bytes__ == sizeof(int16_t)) { \
1364  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1365  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1366  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1367  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1368  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1369  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1370  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1371  } else { \
1372  UNREACHABLE(); \
1373  } \
1374  } while (0)
1375 
1376 // to be used for 8/16-bit kMIN and kMAX only
1377 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1378  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1379  do { \
1380  if (agg_info__.skip_null_val) { \
1381  if (chosen_bytes__ == sizeof(int16_t)) { \
1382  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1383  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1384  const auto null_val = static_cast<int16_t>(init_val__); \
1385  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1386  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1387  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1388  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1389  const auto null_val = static_cast<int8_t>(init_val__); \
1390  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1391  } \
1392  } else { \
1393  AGGREGATE_ONE_VALUE_SMALL( \
1394  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1395  } \
1396  } while (0)
1397 
1398 int8_t result_set::get_width_for_slot(const size_t target_slot_idx,
1399  const bool float_argument_input,
1400  const QueryMemoryDescriptor& query_mem_desc) {
1401  if (float_argument_input) {
1402  return sizeof(float);
1403  }
1404  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1405 }
1406 
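// SINGLE_VALUE targets must agree across fragments: a side still holding the init value
// is ignored, otherwise the value is copied over, and two differing non-init values raise
// "Multiple distinct values encountered".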
1407 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1408  const TargetInfo& target_info,
1409  const size_t target_slot_idx,
1410  const size_t init_agg_val_idx,
1411  const int8_t* that_ptr1) const {
1412  const bool float_argument_input = takes_float_argument(target_info);
1413  const auto chosen_bytes = result_set::get_width_for_slot(
1414  target_slot_idx, float_argument_input, query_mem_desc_);
1415  auto init_val = target_init_vals_[init_agg_val_idx];
1416 
1417  auto reduce = [&](auto const& size_tag) {
1418  using CastTarget = std::decay_t<decltype(size_tag)>;
1419  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1420  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1421  if (rhs_proj_col == init_val) {
1422  // ignore
1423  } else if (lhs_proj_col == init_val) {
1424  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1425  } else if (lhs_proj_col != rhs_proj_col) {
1426  throw std::runtime_error("Multiple distinct values encountered");
1427  }
1428  };
1429 
1430  switch (chosen_bytes) {
1431  case 1: {
1433  reduce(int8_t());
1434  break;
1435  }
1436  case 2: {
1438  reduce(int16_t());
1439  break;
1440  }
1441  case 4: {
1442  reduce(int32_t());
1443  break;
1444  }
1445  case 8: {
1446  CHECK(!target_info.sql_type.is_varlen());
1447  reduce(int64_t());
1448  break;
1449  }
1450  default:
1451  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1452  }
1453 }
1454 
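// Central per-slot reduction: aggregates (COUNT/AVG/SUM/MIN/MAX, approximate quantiles,
// count-distinct) are merged via the macros and helpers above, while non-aggregate and
// SAMPLE slots simply adopt the right-hand value, resolving varlen SAMPLE payloads
// through serialized_varlen_buffer when provided.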
1455 void ResultSetStorage::reduceOneSlot(
1456  int8_t* this_ptr1,
1457  int8_t* this_ptr2,
1458  const int8_t* that_ptr1,
1459  const int8_t* that_ptr2,
1460  const TargetInfo& target_info,
1461  const size_t target_logical_idx,
1462  const size_t target_slot_idx,
1463  const size_t init_agg_val_idx,
1464  const ResultSetStorage& that,
1465  const size_t first_slot_idx_for_target,
1466  const std::vector<std::string>& serialized_varlen_buffer) const {
1467  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1468  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1469  return;
1470  }
1471  }
1472  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1473  const bool float_argument_input = takes_float_argument(target_info);
1474  const auto chosen_bytes = result_set::get_width_for_slot(
1475  target_slot_idx, float_argument_input, query_mem_desc_);
1476  int64_t init_val = target_init_vals_[init_agg_val_idx]; // skip_val for nullable types
1477 
1478  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1479  reduceOneSlotSingleValue(
1480  this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1481  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1482  switch (target_info.agg_kind) {
1483  case kCOUNT:
1484  case kCOUNT_IF:
1485  case kAPPROX_COUNT_DISTINCT: {
1486  if (is_distinct_target(target_info)) {
1487  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1488  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1489  break;
1490  }
1491  CHECK_EQ(int64_t(0), init_val);
1492  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1493  break;
1494  }
1495  case kAVG: {
1496  // Ignore float argument compaction for count component for fear of its overflow
1497  AGGREGATE_ONE_COUNT(this_ptr2,
1498  that_ptr2,
1499  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1500  }
1501  // fall thru
1502  case kSUM:
1503  case kSUM_IF: {
1504  AGGREGATE_ONE_NULLABLE_VALUE(
1505  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1506  break;
1507  }
1508  case kMIN: {
1509  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1510  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1511  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1512  } else {
1513  AGGREGATE_ONE_NULLABLE_VALUE(
1514  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1515  }
1516  break;
1517  }
1518  case kMAX: {
1519  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1520  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1521  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1522  } else {
1523  AGGREGATE_ONE_NULLABLE_VALUE(
1524  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1525  }
1526  break;
1527  }
1528  case kAPPROX_QUANTILE:
1529  CHECK_EQ(static_cast<int8_t>(sizeof(int64_t)), chosen_bytes);
1530  reduceOneApproxQuantileSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1531  break;
1532  default:
1533  UNREACHABLE() << toString(target_info.agg_kind);
1534  }
1535  } else {
1536  switch (chosen_bytes) {
1537  case 1: {
1539  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1540  if (rhs_proj_col != init_val) {
1541  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1542  }
1543  break;
1544  }
1545  case 2: {
1547  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1548  if (rhs_proj_col != init_val) {
1549  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1550  }
1551  break;
1552  }
1553  case 4: {
1554  CHECK(target_info.agg_kind != kSAMPLE ||
1555  query_mem_desc_.isLogicalSizedColumnsAllowed());
1556  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1557  if (rhs_proj_col != init_val) {
1558  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1559  }
1560  break;
1561  }
1562  case 8: {
1563  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1564  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1565  !serialized_varlen_buffer.empty()) {
1566  size_t length_to_elems{0};
1567  if (target_info.sql_type.is_geometry()) {
1568  // TODO: Assumes hard-coded sizes for geometry targets
1569  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1570  } else {
1571  const auto& elem_ti = target_info.sql_type.get_elem_type();
1572  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1573  }
1574 
1575  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1576  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1577  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1578  *reinterpret_cast<int64_t*>(this_ptr1) =
1579  reinterpret_cast<const int64_t>(str_ptr);
1580  *reinterpret_cast<int64_t*>(this_ptr2) =
1581  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1582  } else {
1583  if (rhs_proj_col != init_val) {
1584  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1585  }
1586  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1587  CHECK(this_ptr2 && that_ptr2);
1588  *reinterpret_cast<int64_t*>(this_ptr2) =
1589  *reinterpret_cast<const int64_t*>(that_ptr2);
1590  }
1591  }
1592 
1593  break;
1594  }
1595  default:
1596  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1597  }
1598  }
1599 }
1600 
1602  const int8_t* that_ptr1,
1603  const size_t target_logical_idx,
1604  const ResultSetStorage& that) const {
1606  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1607  auto* incoming = *reinterpret_cast<quantile::TDigest* const*>(that_ptr1);
1608  CHECK(incoming) << "this_ptr1=" << (void*)this_ptr1
1609  << ", that_ptr1=" << (void const*)that_ptr1
1610  << ", target_logical_idx=" << target_logical_idx;
1611  if (incoming->centroids().capacity()) {
1612  auto* accumulator = *reinterpret_cast<quantile::TDigest**>(this_ptr1);
1613  CHECK(accumulator) << "this_ptr1=" << (void*)this_ptr1
1614  << ", that_ptr1=" << (void const*)that_ptr1
1615  << ", target_logical_idx=" << target_logical_idx;
1616  accumulator->allocate();
1617  accumulator->mergeTDigest(*incoming);
1618  }
1619 }
1620 
1621 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1622  const int8_t* that_ptr1,
1623  const size_t target_logical_idx,
1624  const ResultSetStorage& that) const {
1626  const auto& old_count_distinct_desc =
1627  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1628  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1629  const auto& new_count_distinct_desc =
1630  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1631  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1632  CHECK(this_ptr1 && that_ptr1);
1633  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1634  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1635  count_distinct_set_union(
1636  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1637 }
1638 
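// reduceSingleRow collapses the interleaved per-warp copies of a keyless-hash row into
// one set of aggregate values, skipping warps whose key slot still holds the init value,
// and tracks via discard_row whether every warp's partial result was empty.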
1639 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1640  const int8_t warp_count,
1641  const bool is_columnar,
1642  const bool replace_bitmap_ptr_with_bitmap_sz,
1643  std::vector<int64_t>& agg_vals,
1644  const QueryMemoryDescriptor& query_mem_desc,
1645  const std::vector<TargetInfo>& targets,
1646  const std::vector<int64_t>& agg_init_vals) {
1647  const size_t agg_col_count{agg_vals.size()};
1648  const auto row_size = query_mem_desc.getRowSize();
1649  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1650  CHECK_GE(agg_col_count, targets.size());
1651  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1652  CHECK(query_mem_desc.hasKeylessHash());
1653  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1654  bool discard_row = true;
1655  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1656  bool discard_partial_result = true;
1657  for (size_t target_idx = 0, agg_col_idx = 0;
1658  target_idx < targets.size() && agg_col_idx < agg_col_count;
1659  ++target_idx, ++agg_col_idx) {
1660  const auto& agg_info = targets[target_idx];
1661  const bool float_argument_input = takes_float_argument(agg_info);
1662  const auto chosen_bytes = float_argument_input
1663  ? sizeof(float)
1664  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1665  auto partial_bin_val = get_component(
1666  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1667  partial_agg_vals[agg_col_idx] = partial_bin_val;
1668  if (is_distinct_target(agg_info)) {
1669  CHECK_EQ(int8_t(1), warp_count);
1670  CHECK(agg_info.is_agg &&
1671  (agg_info.agg_kind == kCOUNT || agg_info.agg_kind == kCOUNT_IF ||
1672  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1673  partial_bin_val = count_distinct_set_size(
1674  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1675  if (replace_bitmap_ptr_with_bitmap_sz) {
1676  partial_agg_vals[agg_col_idx] = partial_bin_val;
1677  }
1678  }
1679  if (kAVG == agg_info.agg_kind) {
1680  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1681  ++agg_col_idx;
1682  partial_bin_val = partial_agg_vals[agg_col_idx] =
1683  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1684  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1685  }
1686  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1687  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1688  CHECK(agg_info.is_agg);
1689  discard_partial_result = false;
1690  }
1691  }
1692  row_ptr += row_size;
1693  if (discard_partial_result) {
1694  continue;
1695  }
1696  discard_row = false;
1697  for (size_t target_idx = 0, agg_col_idx = 0;
1698  target_idx < targets.size() && agg_col_idx < agg_col_count;
1699  ++target_idx, ++agg_col_idx) {
1700  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1701  const auto& agg_info = targets[target_idx];
1702  const bool float_argument_input = takes_float_argument(agg_info);
1703  const auto chosen_bytes = float_argument_input
1704  ? sizeof(float)
1705  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1706  const auto& chosen_type = get_compact_type(agg_info);
1707  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1708  try {
1709  switch (agg_info.agg_kind) {
1710  case kCOUNT:
1711  case kCOUNT_IF:
1712  case kAPPROX_COUNT_DISTINCT:
1713  AGGREGATE_ONE_NULLABLE_COUNT(
1714  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1715  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1716  agg_init_vals[agg_col_idx],
1717  chosen_bytes,
1718  agg_info);
1719  break;
1720  case kAVG:
1721  // Ignore float argument compaction for count component for fear of its
1722  // overflow
1723  AGGREGATE_ONE_COUNT(
1724  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1725  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1726  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1727  // fall thru
1728  case kSUM:
1729  case kSUM_IF:
1730  AGGREGATE_ONE_NULLABLE_VALUE(
1731  sum,
1732  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1733  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1734  agg_init_vals[agg_col_idx],
1735  chosen_bytes,
1736  agg_info);
1737  break;
1738  case kMIN:
1739  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1740  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1741  min,
1742  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1743  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1744  agg_init_vals[agg_col_idx],
1745  chosen_bytes,
1746  agg_info);
1747  } else {
1748  AGGREGATE_ONE_NULLABLE_VALUE(
1749  min,
1750  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1751  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1752  agg_init_vals[agg_col_idx],
1753  chosen_bytes,
1754  agg_info);
1755  }
1756  break;
1757  case kMAX:
1758  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1759  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1760  max,
1761  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1762  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1763  agg_init_vals[agg_col_idx],
1764  chosen_bytes,
1765  agg_info);
1766  } else {
1767  AGGREGATE_ONE_NULLABLE_VALUE(
1768  max,
1769  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1770  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1771  agg_init_vals[agg_col_idx],
1772  chosen_bytes,
1773  agg_info);
1774  }
1775  break;
1776  default:
1777  CHECK(false);
1778  break;
1779  }
1780  } catch (std::runtime_error& e) {
1781  // TODO(miyu): handle the case where chosen_bytes < 8
1782  LOG(ERROR) << e.what();
1783  }
1784  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1785  switch (chosen_bytes) {
1786  case 8:
1787  break;
1788  case 4: {
1789  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1790  if (!(shared::is_any<kCOUNT, kCOUNT_IF>(agg_info.agg_kind) &&
1791  ret != agg_init_vals[agg_col_idx])) {
1792  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1793  }
1794  break;
1795  }
1796  default:
1797  CHECK(false);
1798  }
1799  }
1800  if (kAVG == agg_info.agg_kind) {
1801  ++agg_col_idx;
1802  }
1803  } else {
1804  if (agg_info.agg_kind == kSAMPLE) {
1805  CHECK(!agg_info.sql_type.is_varlen())
1806  << "Interleaved bins reduction not supported for variable length "
1807  "arguments "
1808  "to SAMPLE";
1809  }
1810  if (agg_vals[agg_col_idx]) {
1811  if (agg_info.agg_kind == kSAMPLE) {
1812  continue;
1813  }
1814  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1815  } else {
1816  agg_vals[agg_col_idx] = partial_bin_val;
1817  }
1818  }
1819  }
1820  }
1821  return discard_row;
1822 }
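
The AGGREGATE_ONE_NULLABLE_* macros used above merge a partial slot into the running value while treating the slot's init value as the null sentinel: a null partial leaves the accumulator untouched, and a null accumulator adopts the partial outright. A minimal sketch of that rule for a MIN slot, with merge_nullable_min as a hypothetical helper rather than engine code:

#include <algorithm>
#include <cstdint>

// Merge one warp-partial MIN value into the accumulated slot. `init_val` doubles as
// the null sentinel, so it is never compared as a real data value.
inline void merge_nullable_min(int64_t& acc, const int64_t partial, const int64_t init_val) {
  if (partial == init_val) {
    return;                    // this partial bin never saw a row for the slot
  }
  if (acc == init_val) {
    acc = partial;             // first non-null contribution wins outright
  } else {
    acc = std::min(acc, partial);
  }
}

SUM and MAX follow the same shape with the combining operation swapped; the *_SMALL variant above exists only because sub-32-bit slots need narrower loads and stores.
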