OmniSciDB  c0231cc57d
ResultSetReduction.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DynamicWatchdog.h"
24 #include "Execute.h"
25 #include "ResultSet.h"
27 #include "ResultSetReductionJIT.h"
28 #include "RuntimeFunctions.h"
29 #include "Shared/SqlTypesLayout.h"
30 #include "Shared/likely.h"
31 #include "Shared/thread_count.h"
32 
33 #include <llvm/ExecutionEngine/GenericValue.h>
34 
35 #include <algorithm>
36 #include <future>
37 #include <numeric>
38 
39 extern bool g_enable_dynamic_watchdog;
40 
41 namespace {
42 
43 bool use_multithreaded_reduction(const size_t entry_count) {
44  return entry_count > 100000;
45 }
46 
47 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
48  const auto row_bytes = get_row_bytes(query_mem_desc);
49  CHECK_EQ(size_t(0), row_bytes % 8);
50  return row_bytes / 8;
51 }
52 
53 std::vector<int64_t> make_key(const int64_t* buff,
54  const size_t entry_count,
55  const size_t key_count) {
56  std::vector<int64_t> key;
57  size_t off = 0;
58  for (size_t i = 0; i < key_count; ++i) {
59  key.push_back(buff[off]);
60  off += entry_count;
61  }
62  return key;
63 }
64 
65 void fill_slots(int64_t* dst_entry,
66  const size_t dst_entry_count,
67  const int64_t* src_buff,
68  const size_t src_entry_idx,
69  const size_t src_entry_count,
70  const QueryMemoryDescriptor& query_mem_desc) {
71  const auto slot_count = query_mem_desc.getBufferColSlotCount();
72  const auto key_count = query_mem_desc.getGroupbyColCount();
73  if (query_mem_desc.didOutputColumnar()) {
74  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
75  ++i, dst_slot_off += dst_entry_count) {
76  dst_entry[dst_slot_off] =
77  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
78  }
79  } else {
80  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
81  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
82  for (size_t i = 0; i < slot_count; ++i) {
83  dst_entry[i] = row_ptr[slot_off_quad + i];
84  }
85  }
86 }
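// Illustrative note on the two layouts handled above (a sketch, not part of the
// original file; assumes slot_offset_colwise(entry, slot, key_count, entry_count)
// resolves to (key_count + slot) * entry_count + entry):
//
//   // Columnar: slots are stored column by column, so consecutive entries of the
//   // same slot are contiguous and the destination stride is dst_entry_count.
//   //   e.g. key_count = 1, src_entry_count = 4, slot 2 of entry 3
//   //        -> src_buff[(1 + 2) * 4 + 3] == src_buff[15]
//   // Row-wise: an entry is one contiguous row of get_row_qw_count() quadwords,
//   // with the keys first (slot_off_quad quadwords) and the slots right after.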
87 
89 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
90  for (size_t i = 0; i < key_count; ++i) {
91  key_ptr_i32[i] = EMPTY_KEY_32;
92  }
93 }
94 
96 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
97  for (size_t i = 0; i < key_count; ++i) {
98  key_ptr_i64[i] = EMPTY_KEY_64;
99  }
100 }
101 
102 inline int64_t get_component(const int8_t* group_by_buffer,
103  const size_t comp_sz,
104  const size_t index = 0) {
105  int64_t ret = std::numeric_limits<int64_t>::min();
106  switch (comp_sz) {
107  case 1: {
108  ret = group_by_buffer[index];
109  break;
110  }
111  case 2: {
112  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
113  ret = buffer_ptr[index];
114  break;
115  }
116  case 4: {
117  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
118  ret = buffer_ptr[index];
119  break;
120  }
121  case 8: {
122  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
123  ret = buffer_ptr[index];
124  break;
125  }
126  default:
127  CHECK(false);
128  }
129  return ret;
130 }
131 
132 void run_reduction_code(const size_t executor_id,
133  const ReductionCode& reduction_code,
134  int8_t* this_buff,
135  const int8_t* that_buff,
136  const int32_t start_entry_index,
137  const int32_t end_entry_index,
138  const int32_t that_entry_count,
139  const void* this_qmd,
140  const void* that_qmd,
141  const void* serialized_varlen_buffer) {
142  int err = 0;
143  if (reduction_code.func_ptr) {
144  err = reduction_code.func_ptr(this_buff,
145  that_buff,
146  start_entry_index,
147  end_entry_index,
148  that_entry_count,
149  this_qmd,
150  that_qmd,
151  serialized_varlen_buffer);
152  } else {
153  auto ret = ReductionInterpreter::run(
154  executor_id,
155  reduction_code.ir_reduce_loop.get(),
156  {ReductionInterpreter::MakeEvalValue(this_buff),
157  ReductionInterpreter::MakeEvalValue(that_buff),
158  ReductionInterpreter::MakeEvalValue(start_entry_index),
159  ReductionInterpreter::MakeEvalValue(end_entry_index),
160  ReductionInterpreter::MakeEvalValue(that_entry_count),
161  ReductionInterpreter::MakeEvalValue(this_qmd),
162  ReductionInterpreter::MakeEvalValue(that_qmd),
163  ReductionInterpreter::MakeEvalValue(serialized_varlen_buffer)});
164  err = ret.int_val;
165  }
166  if (err) {
167  if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
168  throw std::runtime_error("Multiple distinct values encountered");
169  }
170  if (err == Executor::ERR_INTERRUPTED) {
171  throw std::runtime_error(
172  "Query execution was interrupted during result set reduction");
173  }
174  throw std::runtime_error(
175  "Query execution has exceeded the time limit or was interrupted during result "
176  "set reduction");
177  }
178 }
179 
180 } // namespace
181 
182 void result_set::fill_empty_key(void* key_ptr,
183  const size_t key_count,
184  const size_t key_width) {
185  switch (key_width) {
186  case 4: {
187  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
188  fill_empty_key_32(key_ptr_i32, key_count);
189  break;
190  }
191  case 8: {
192  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
193  fill_empty_key_64(key_ptr_i64, key_count);
194  break;
195  }
196  default:
197  CHECK(false);
198  }
199 }
200 
201 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
202 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
203 void ResultSetStorage::reduce(const ResultSetStorage& that,
204  const std::vector<std::string>& serialized_varlen_buffer,
205  const ReductionCode& reduction_code,
206  const size_t executor_id) const {
207  auto entry_count = query_mem_desc_.getEntryCount();
208  CHECK_GT(entry_count, size_t(0));
216  }
217  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
218  switch (query_mem_desc_.getQueryDescriptionType()) {
219  case QueryDescriptionType::GroupByBaselineHash:
220  CHECK_GE(entry_count, that_entry_count);
221  break;
222  default:
223  CHECK_EQ(entry_count, that_entry_count);
224  }
225  auto this_buff = buff_;
226  CHECK(this_buff);
227  auto that_buff = that.buff_;
228  CHECK(that_buff);
229  if (query_mem_desc_.getQueryDescriptionType() ==
230  QueryDescriptionType::GroupByBaselineHash) {
231  if (!serialized_varlen_buffer.empty()) {
232  throw std::runtime_error(
233  "Projection of variable length targets with baseline hash group by is not yet "
234  "supported in Distributed mode");
235  }
236  if (use_multithreaded_reduction(that_entry_count)) {
237  const size_t thread_count = cpu_threads();
238  std::vector<std::future<void>> reduction_threads;
239  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
240  const auto thread_entry_count =
241  (that_entry_count + thread_count - 1) / thread_count;
242  const auto start_index = thread_idx * thread_entry_count;
243  const auto end_index =
244  std::min(start_index + thread_entry_count, that_entry_count);
245  reduction_threads.emplace_back(std::async(
246  std::launch::async,
247  [this,
248  this_buff,
249  that_buff,
250  start_index,
251  end_index,
252  that_entry_count,
253  executor_id,
254  &reduction_code,
255  &that] {
256  if (reduction_code.ir_reduce_loop) {
257  run_reduction_code(executor_id,
258  reduction_code,
259  this_buff,
260  that_buff,
261  start_index,
262  end_index,
263  that_entry_count,
264  &query_mem_desc_,
265  &that.query_mem_desc_,
266  nullptr);
267  } else {
268  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
269  reduceOneEntryBaseline(
270  this_buff, that_buff, entry_idx, that_entry_count, that);
271  }
272  }
273  }));
274  }
275  for (auto& reduction_thread : reduction_threads) {
276  reduction_thread.wait();
277  }
278  for (auto& reduction_thread : reduction_threads) {
279  reduction_thread.get();
280  }
281  } else {
282  if (reduction_code.ir_reduce_loop) {
283  run_reduction_code(executor_id,
284  reduction_code,
285  this_buff,
286  that_buff,
287  0,
288  that_entry_count,
289  that_entry_count,
290  &query_mem_desc_,
291  &that.query_mem_desc_,
292  nullptr);
293  } else {
294  for (size_t i = 0; i < that_entry_count; ++i) {
295  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
296  }
297  }
298  }
299  return;
300  }
301  if (use_multithreaded_reduction(entry_count)) {
302  const size_t thread_count = cpu_threads();
303  std::vector<std::future<void>> reduction_threads;
304  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
305  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
306  const auto start_index = thread_idx * thread_entry_count;
307  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
308  if (query_mem_desc_.didOutputColumnar()) {
309  reduction_threads.emplace_back(std::async(std::launch::async,
310  [this,
311  this_buff,
312  that_buff,
313  start_index,
314  end_index,
315  &that,
316  &serialized_varlen_buffer,
317  &executor_id] {
318  reduceEntriesNoCollisionsColWise(
319  this_buff,
320  that_buff,
321  that,
322  start_index,
323  end_index,
324  serialized_varlen_buffer,
325  executor_id);
326  }));
327  } else {
328  reduction_threads.emplace_back(std::async(std::launch::async,
329  [this,
330  this_buff,
331  that_buff,
332  start_index,
333  end_index,
334  that_entry_count,
335  executor_id,
336  &reduction_code,
337  &that,
338  &serialized_varlen_buffer] {
339  CHECK(reduction_code.ir_reduce_loop);
340  run_reduction_code(
341  executor_id,
342  reduction_code,
343  this_buff,
344  that_buff,
345  start_index,
346  end_index,
347  that_entry_count,
348  &query_mem_desc_,
349  &that.query_mem_desc_,
350  &serialized_varlen_buffer);
351  }));
352  }
353  }
354  for (auto& reduction_thread : reduction_threads) {
355  reduction_thread.wait();
356  }
357  for (auto& reduction_thread : reduction_threads) {
358  reduction_thread.get();
359  }
360  } else {
361  if (query_mem_desc_.didOutputColumnar()) {
362  reduceEntriesNoCollisionsColWise(this_buff,
363  that_buff,
364  that,
365  0,
366  query_mem_desc_.getEntryCount(),
367  serialized_varlen_buffer,
368  executor_id);
369  } else {
370  CHECK(reduction_code.ir_reduce_loop);
371  run_reduction_code(executor_id,
372  reduction_code,
373  this_buff,
374  that_buff,
375  0,
376  entry_count,
377  that_entry_count,
378  &query_mem_desc_,
379  &that.query_mem_desc_,
380  &serialized_varlen_buffer);
381  }
382  }
383 }
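// Work-partitioning sketch for the multithreaded paths above (illustrative only):
// entries are split into chunks of ceil(entry_count / thread_count), so with
// entry_count = 10 and thread_count = 4 the per-thread ranges are
// [0, 3), [3, 6), [6, 9) and [9, 10); std::min() clamps the last chunk.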
384 
385 namespace {
386 
387 ALWAYS_INLINE void check_watchdog() {
388  if (UNLIKELY(dynamic_watchdog())) {
389  // TODO(alex): distinguish between the deadline and interrupt
390  throw std::runtime_error(
391  "Query execution has exceeded the time limit or was interrupted during result "
392  "set reduction");
393  }
394 }
395 
396 ALWAYS_INLINE void check_watchdog_with_seed(const size_t sample_seed) {
397  if (UNLIKELY((sample_seed & 0x3F) == 0 && dynamic_watchdog())) {
398  // TODO(alex): distinguish between the deadline and interrupt
399  throw std::runtime_error(
400  "Query execution has exceeded the time limit or was interrupted during result "
401  "set reduction");
402  }
403 }
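// Note: with sample_seed equal to the entry index (as passed from
// reduceOneEntryBaseline below), the (sample_seed & 0x3F) == 0 test polls
// dynamic_watchdog() only once every 64 entries, keeping the per-entry overhead low.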
404 
405 } // namespace
406 
407 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
408  int8_t* this_buff,
409  const int8_t* that_buff,
410  const ResultSetStorage& that,
411  const size_t start_index,
412  const size_t end_index,
413  const std::vector<std::string>& serialized_varlen_buffer,
414  const size_t executor_id) const {
415  // TODO(adb / saman): Support column wise output when serializing distributed agg
416  // functions
417  CHECK(serialized_varlen_buffer.empty());
418 
419  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
420 
421  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
422  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
423  auto executor = Executor::getExecutor(executor_id);
424  CHECK(executor);
425  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
426  const auto& agg_info = targets_[target_idx];
427  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
428 
429  bool two_slot_target{false};
430  if (agg_info.is_agg &&
431  (agg_info.agg_kind == kAVG ||
432  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
433  // Note that this assumes that if one of the slot pairs in a given target is an
434  // array, all slot pairs are arrays. Currently this is true for all geo targets,
435  // but we should codify and store this information explicitly in the future.
436  two_slot_target = true;
437  }
438  if (UNLIKELY(g_enable_non_kernel_time_query_interrupt &&
439  executor->checkNonKernelTimeInterrupted())) {
440  throw std::runtime_error(
441  "Query execution was interrupted during result set reduction");
442  }
443  if (g_enable_dynamic_watchdog) {
444  check_watchdog();
445  }
446  for (size_t target_slot_idx = slots_for_col.front();
447  target_slot_idx < slots_for_col.back() + 1;
448  target_slot_idx += 2) {
449  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
450  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
451  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
452  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
453  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
454  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
455  continue;
456  }
457  if (target_idx == 0 && target_slot_idx == slots_for_col.front()) {
458  // copy the key from right hand side
459  copyKeyColWise(entry_idx, this_buff, that_buff);
460  }
461  auto this_ptr1 =
462  this_crt_col_ptr +
463  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
464  auto that_ptr1 =
465  that_crt_col_ptr +
466  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
467  int8_t* this_ptr2{nullptr};
468  const int8_t* that_ptr2{nullptr};
469  if (UNLIKELY(two_slot_target)) {
470  this_ptr2 =
471  this_next_col_ptr +
472  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
473  that_ptr2 =
474  that_next_col_ptr +
475  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
476  }
477  reduceOneSlot(this_ptr1,
478  this_ptr2,
479  that_ptr1,
480  that_ptr2,
481  agg_info,
482  target_idx,
483  target_slot_idx,
484  target_slot_idx,
485  that,
486  slots_for_col.front(),
487  serialized_varlen_buffer);
488  }
489 
490  this_crt_col_ptr = this_next_col_ptr;
491  that_crt_col_ptr = that_next_col_ptr;
492  if (UNLIKELY(two_slot_target)) {
493  this_crt_col_ptr = advance_to_next_columnar_target_buff(
494  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
495  that_crt_col_ptr = advance_to_next_columnar_target_buff(
496  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
497  }
498  }
499  }
500 }
501 
502 /*
503  * copy all keys from the columnar prepended group buffer of "that_buff" into
504  * "this_buff"
505  */
506 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
507  int8_t* this_buff,
508  const int8_t* that_buff) const {
510  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
511  group_idx++) {
512  // if the column corresponds to a group key
513  const auto column_offset_bytes =
514  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
515  auto lhs_key_ptr = this_buff + column_offset_bytes;
516  auto rhs_key_ptr = that_buff + column_offset_bytes;
517  switch (query_mem_desc_.groupColWidth(group_idx)) {
518  case 8:
519  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
520  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
521  break;
522  case 4:
523  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
524  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
525  break;
526  case 2:
527  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
528  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
529  break;
530  case 1:
531  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
532  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
533  break;
534  default:
535  CHECK(false);
536  break;
537  }
538  }
539 }
540 
541 // Rewrites the entries of this ResultSetStorage object to point directly into the
542 // serialized_varlen_buffer rather than using offsets.
543 void ResultSetStorage::rewriteAggregateBufferOffsets(
544  const std::vector<std::string>& serialized_varlen_buffer) const {
545  if (serialized_varlen_buffer.empty()) {
546  return;
547  }
548 
550  auto entry_count = query_mem_desc_.getEntryCount();
551  CHECK_GT(entry_count, size_t(0));
552  CHECK(buff_);
553 
554  // Row-wise iteration, consider moving to separate function
555  for (size_t i = 0; i < entry_count; ++i) {
556  if (isEmptyEntry(i, buff_)) {
557  continue;
558  }
559  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
560  const auto key_bytes_with_padding = align_to_int64(key_bytes);
561  auto rowwise_targets_ptr =
562  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
563  size_t target_slot_idx = 0;
564  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
565  ++target_logical_idx) {
566  const auto& target_info = targets_[target_logical_idx];
567  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
568  CHECK(target_info.agg_kind == kSAMPLE);
569  auto ptr1 = rowwise_targets_ptr;
570  auto slot_idx = target_slot_idx;
571  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
572  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
573 
574  const auto& elem_ti = target_info.sql_type.get_elem_type();
575  size_t length_to_elems =
576  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
577  ? 1
578  : elem_ti.get_size();
579  if (target_info.sql_type.is_geometry()) {
580  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
581  if (j > 0) {
582  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
583  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
584  slot_idx += 2;
585  length_to_elems = 4;
586  }
587  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
588  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
589  const auto str_ptr =
590  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
591  CHECK(ptr1);
592  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
593  CHECK(ptr2);
594  *reinterpret_cast<int64_t*>(ptr2) =
595  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
596  }
597  } else {
598  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
599  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
600  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
601  CHECK(ptr1);
602  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
603  CHECK(ptr2);
604  *reinterpret_cast<int64_t*>(ptr2) =
605  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
606  }
607  }
608 
609  rowwise_targets_ptr = advance_target_ptr_row_wise(
610  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
611  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
612  }
613  }
614 
615  return;
616 }
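// Illustrative example of the rewrite above (a sketch, not part of the original file):
// before the rewrite, a varlen SAMPLE target stores an index into
// serialized_varlen_buffer in its first slot; afterwards the first slot holds the
// address of the payload and the second slot holds its logical length, e.g.
//
//   //   ptr1: 3        -> ptr1: address of serialized_varlen_buffer[3] bytes
//   //   ptr2: <unused> -> ptr2: serialized_varlen_buffer[3].size() / length_to_elems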
617 
618 namespace {
619 
620 #ifdef _MSC_VER
621 #define mapd_cas(address, compare, val) \
622  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
623  static_cast<long>(val), \
624  static_cast<long>(compare))
625 #else
626 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
627 #endif
628 
629 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
630  const uint32_t h,
631  const int64_t* key,
632  const uint32_t key_qw_count,
633  const size_t entry_count) {
634  auto off = h;
635  const auto old_key = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
636  if (old_key == EMPTY_KEY_64) {
637  for (size_t i = 0; i < key_qw_count; ++i) {
638  groups_buffer[off] = key[i];
639  off += entry_count;
640  }
641  return {&groups_buffer[off], true};
642  }
643  off = h;
644  for (size_t i = 0; i < key_qw_count; ++i) {
645  if (groups_buffer[off] != key[i]) {
646  return {nullptr, true};
647  }
648  off += entry_count;
649  }
650  return {&groups_buffer[off], false};
651 }
652 
653 #undef mapd_cas
654 
655 // TODO(alex): fix synchronization when we enable it
656 GroupValueInfo get_group_value_columnar_reduction(
657  int64_t* groups_buffer,
658  const uint32_t groups_buffer_entry_count,
659  const int64_t* key,
660  const uint32_t key_qw_count) {
661  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
662  auto matching_gvi = get_matching_group_value_columnar_reduction(
663  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
664  if (matching_gvi.first) {
665  return matching_gvi;
666  }
667  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
668  while (h_probe != h) {
669  matching_gvi = get_matching_group_value_columnar_reduction(
670  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
671  if (matching_gvi.first) {
672  return matching_gvi;
673  }
674  h_probe = (h_probe + 1) % groups_buffer_entry_count;
675  }
676  return {nullptr, true};
677 }
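// Probing sketch for the columnar baseline hash above (illustrative only): the key is
// hashed to a starting bucket h, and buckets h, h+1, ... are scanned with wrap-around
// (linear probing) until an empty bucket is claimed via mapd_cas or a bucket holding
// the same key is found; {nullptr, true} is returned only if the table is full.
// Keys are stored column-wise, i.e. the i-th key component of bucket b lives at
// groups_buffer[i * entry_count + b].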
678 
679 #ifdef _MSC_VER
680 #define cas_cst(ptr, expected, desired) \
681  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
682  reinterpret_cast<void*>(&desired), \
683  expected) == expected)
684 #define store_cst(ptr, val) \
685  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
686  reinterpret_cast<void*>(val))
687 #define load_cst(ptr) \
688  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
689 #else
690 #define cas_cst(ptr, expected, desired) \
691  __atomic_compare_exchange_n( \
692  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
693 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
694 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
695 #endif
696 
697 template <typename T = int64_t>
698 GroupValueInfo get_matching_group_value_reduction(
699  int64_t* groups_buffer,
700  const uint32_t h,
701  const T* key,
702  const uint32_t key_count,
703  const QueryMemoryDescriptor& query_mem_desc,
704  const int64_t* that_buff_i64,
705  const size_t that_entry_idx,
706  const size_t that_entry_count,
707  const uint32_t row_size_quad) {
708  auto off = h * row_size_quad;
709  T empty_key = get_empty_key<T>();
710  T write_pending = get_empty_key<T>() - 1;
711  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
712  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
713  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
714  if (success) {
715  fill_slots(groups_buffer + off + slot_off_quad,
716  query_mem_desc.getEntryCount(),
717  that_buff_i64,
718  that_entry_idx,
719  that_entry_count,
720  query_mem_desc);
721  if (key_count > 1) {
722  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
723  }
724  store_cst(row_ptr, *key);
725  return {groups_buffer + off + slot_off_quad, true};
726  }
727  while (load_cst(row_ptr) == write_pending) {
728  // spin until the winning thread has finished writing the entire key and the init
729  // value
730  }
731  for (size_t i = 0; i < key_count; ++i) {
732  if (load_cst(row_ptr + i) != key[i]) {
733  return {nullptr, true};
734  }
735  }
736  return {groups_buffer + off + slot_off_quad, false};
737 }
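// Synchronization sketch for the row-wise variant above (illustrative only): the first
// key word doubles as the bucket lock. A thread claims an empty bucket by CAS-ing
// EMPTY_KEY -> (EMPTY_KEY - 1) ("write pending"), fills the slots and the remaining key
// words, and only then publishes the real first key word with a seq-cst store; readers
// that observe the pending sentinel spin until that final store becomes visible.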
738 
739 #undef load_cst
740 #undef store_cst
741 #undef cas_cst
742 
743 GroupValueInfo get_matching_group_value_reduction(
744  int64_t* groups_buffer,
745  const uint32_t h,
746  const int64_t* key,
747  const uint32_t key_count,
748  const size_t key_width,
749  const QueryMemoryDescriptor& query_mem_desc,
750  const int64_t* that_buff_i64,
751  const size_t that_entry_idx,
752  const size_t that_entry_count,
753  const uint32_t row_size_quad) {
754  switch (key_width) {
755  case 4:
756  return get_matching_group_value_reduction(groups_buffer,
757  h,
758  reinterpret_cast<const int32_t*>(key),
759  key_count,
760  query_mem_desc,
761  that_buff_i64,
762  that_entry_idx,
763  that_entry_count,
764  row_size_quad);
765  case 8:
766  return get_matching_group_value_reduction(groups_buffer,
767  h,
768  key,
769  key_count,
770  query_mem_desc,
771  that_buff_i64,
772  that_entry_idx,
773  that_entry_count,
774  row_size_quad);
775  default:
776  CHECK(false);
777  return {nullptr, true};
778  }
779 }
780 
781 } // namespace
782 
783 GroupValueInfo get_group_value_reduction(
784  int64_t* groups_buffer,
785  const uint32_t groups_buffer_entry_count,
786  const int64_t* key,
787  const uint32_t key_count,
788  const size_t key_width,
789  const QueryMemoryDescriptor& query_mem_desc,
790  const int64_t* that_buff_i64,
791  const size_t that_entry_idx,
792  const size_t that_entry_count,
793  const uint32_t row_size_quad) {
794  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
795  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
796  h,
797  key,
798  key_count,
799  key_width,
800  query_mem_desc,
801  that_buff_i64,
802  that_entry_idx,
803  that_entry_count,
804  row_size_quad);
805  if (matching_gvi.first) {
806  return matching_gvi;
807  }
808  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
809  while (h_probe != h) {
810  matching_gvi = get_matching_group_value_reduction(groups_buffer,
811  h_probe,
812  key,
813  key_count,
814  key_width,
815  query_mem_desc,
816  that_buff_i64,
817  that_entry_idx,
818  that_entry_count,
819  row_size_quad);
820  if (matching_gvi.first) {
821  return matching_gvi;
822  }
823  h_probe = (h_probe + 1) % groups_buffer_entry_count;
824  }
825  return {nullptr, true};
826 }
827 
828 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
829 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
830 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
831  const int8_t* that_buff,
832  const size_t that_entry_idx,
833  const size_t that_entry_count,
834  const ResultSetStorage& that) const {
835  if (g_enable_dynamic_watchdog) {
836  check_watchdog_with_seed(that_entry_idx);
837  }
838  const auto key_count = query_mem_desc_.getGroupbyColCount();
843  const auto key_off =
844  key_offset_colwise(that_entry_idx, 0, that_entry_count);
845  if (isEmptyEntry(that_entry_idx, that_buff)) {
846  return;
847  }
848  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
849  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
850  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
851  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
852  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
853  CHECK(this_entry_slots);
854  if (empty_entry) {
855  fill_slots(this_entry_slots,
856  query_mem_desc_.getEntryCount(),
857  that_buff_i64,
858  that_entry_idx,
859  that_entry_count,
860  query_mem_desc_);
861  return;
862  }
863  reduceOneEntrySlotsBaseline(
864  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
865 }
866 
867 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
868  const int64_t* that_buff,
869  const size_t that_entry_idx,
870  const size_t that_entry_count,
871  const ResultSetStorage& that) const {
873  const auto key_count = query_mem_desc_.getGroupbyColCount();
874  size_t j = 0;
875  size_t init_agg_val_idx = 0;
876  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
877  ++target_logical_idx) {
878  const auto& target_info = targets_[target_logical_idx];
879  const auto that_slot_off = slot_offset_colwise(
880  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
881  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
882  reduceOneSlotBaseline(this_entry_slots,
883  this_slot_off,
884  that_buff,
885  that_entry_count,
886  that_slot_off,
887  target_info,
888  target_logical_idx,
889  j,
890  init_agg_val_idx,
891  that);
892  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
893  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
894  } else {
895  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
896  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
897  }
898  }
899  j = advance_slot(j, target_info, false);
900  }
901 }
902 
903 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
904  const size_t this_slot,
905  const int64_t* that_buff,
906  const size_t that_entry_count,
907  const size_t that_slot,
908  const TargetInfo& target_info,
909  const size_t target_logical_idx,
910  const size_t target_slot_idx,
911  const size_t init_agg_val_idx,
912  const ResultSetStorage& that) const {
914  int8_t* this_ptr2{nullptr};
915  const int8_t* that_ptr2{nullptr};
916  if (target_info.is_agg &&
917  (target_info.agg_kind == kAVG ||
918  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
919  const auto this_count_off = query_mem_desc_.getEntryCount();
920  const auto that_count_off = that_entry_count;
921  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
922  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
923  }
924  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
925  this_ptr2,
926  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
927  that_ptr2,
928  target_info,
929  target_logical_idx,
930  target_slot_idx,
931  init_agg_val_idx,
932  that,
933  target_slot_idx, // dummy, for now
934  {});
935 }
936 
937 // During the reduction of two result sets using the baseline strategy, we first create a
938 // buffer big enough to hold the entries of both and move the entries of the first result
939 // set into it; the reduction then proceeds as usual, with the new buffer as the destination.
940 template <class KeyType>
941 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
942  const size_t new_entry_count) const {
944  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
945  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
946  const auto key_count = query_mem_desc_.getGroupbyColCount();
949  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
950  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
951  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
952 
953  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
954  const size_t thread_count = cpu_threads();
955  std::vector<std::future<void>> move_threads;
956 
957  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
958  const auto thread_entry_count =
959  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
960  const auto start_index = thread_idx * thread_entry_count;
961  const auto end_index =
962  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
963  move_threads.emplace_back(std::async(
964  std::launch::async,
965  [this,
966  src_buff,
967  new_buff_i64,
968  new_entry_count,
969  start_index,
970  end_index,
971  key_count,
972  row_qw_count,
973  key_byte_width] {
974  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
975  moveOneEntryToBuffer<KeyType>(entry_idx,
976  new_buff_i64,
977  new_entry_count,
978  key_count,
979  row_qw_count,
980  src_buff,
981  key_byte_width);
982  }
983  }));
984  }
985  for (auto& move_thread : move_threads) {
986  move_thread.wait();
987  }
988  for (auto& move_thread : move_threads) {
989  move_thread.get();
990  }
991  } else {
992  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
993  moveOneEntryToBuffer<KeyType>(entry_idx,
994  new_buff_i64,
995  new_entry_count,
996  key_count,
997  row_qw_count,
998  src_buff,
999  key_byte_width);
1000  }
1001  }
1002 }
1003 
1004 template <class KeyType>
1005 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
1006  int64_t* new_buff_i64,
1007  const size_t new_entry_count,
1008  const size_t key_count,
1009  const size_t row_qw_count,
1010  const int64_t* src_buff,
1011  const size_t key_byte_width) const {
1012  const auto key_off =
1013  query_mem_desc_.didOutputColumnar()
1014  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
1015  : row_qw_count * entry_index;
1016  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
1017  if (*key_ptr == get_empty_key<KeyType>()) {
1018  return;
1019  }
1020  int64_t* new_entries_ptr{nullptr};
1021  if (query_mem_desc_.didOutputColumnar()) {
1022  const auto key =
1023  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
1024  new_entries_ptr =
1025  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
1026  } else {
1027  new_entries_ptr = get_group_value(new_buff_i64,
1028  new_entry_count,
1029  &src_buff[key_off],
1030  key_count,
1031  key_byte_width,
1032  row_qw_count);
1033  }
1034  CHECK(new_entries_ptr);
1035  fill_slots(new_entries_ptr,
1036  new_entry_count,
1037  src_buff,
1038  entry_index,
1039  query_mem_desc_.getEntryCount(),
1040  query_mem_desc_);
1041 }
1042 
1043 void ResultSet::initializeStorage() const {
1044  if (query_mem_desc_.didOutputColumnar()) {
1045  storage_->initializeColWise();
1046  } else {
1047  storage_->initializeRowWise();
1048  }
1049 }
1050 
1051 // Driver for reductions. Needed because a reduction on the baseline layout, which can
1052 // have collisions, cannot be done in place: a new result set with a bigger underlying
1053 // buffer is created, and something needs to take ownership of it.
1054 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets,
1055  const size_t executor_id) {
1056  CHECK(!result_sets.empty());
1057  auto result_rs = result_sets.front();
1058  CHECK(result_rs->storage_);
1059  auto& first_result = *result_rs->storage_;
1060  auto result = &first_result;
1061  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1062  for (const auto result_set : result_sets) {
1063  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1064  }
1065  const auto catalog = result_rs->catalog_;
1066  for (const auto result_set : result_sets) {
1067  CHECK_EQ(catalog, result_set->catalog_);
1068  }
1069  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1070  QueryDescriptionType::GroupByBaselineHash) {
1071  const auto total_entry_count =
1072  std::accumulate(result_sets.begin(),
1073  result_sets.end(),
1074  size_t(0),
1075  [](const size_t init, const ResultSet* rs) {
1076  return init + rs->query_mem_desc_.getEntryCount();
1077  });
1078  CHECK(total_entry_count);
1079  auto query_mem_desc = first_result.query_mem_desc_;
1080  query_mem_desc.setEntryCount(total_entry_count);
1081  rs_.reset(new ResultSet(first_result.targets_,
1082  ExecutorDeviceType::CPU,
1083  query_mem_desc,
1084  row_set_mem_owner,
1085  catalog,
1086  0,
1087  0));
1088  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1089  rs_->initializeStorage();
1090  switch (query_mem_desc.getEffectiveKeyWidth()) {
1091  case 4:
1092  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1093  query_mem_desc.getEntryCount());
1094  break;
1095  case 8:
1096  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1097  query_mem_desc.getEntryCount());
1098  break;
1099  default:
1100  CHECK(false);
1101  }
1102  result = rs_->storage_.get();
1103  result_rs = rs_.get();
1104  }
1105 
1106  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1107  if (!serialized_varlen_buffer.empty()) {
1108  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1109  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1110  ++result_it) {
1111  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1112  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1113  serialized_varlen_buffer.emplace_back(
1114  std::move(result_serialized_varlen_buffer.front()));
1115  }
1116  }
1117 
1118  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1119  result_rs->getTargetInfos(),
1120  result_rs->getTargetInitVals(),
1121  executor_id);
1122  auto reduction_code = reduction_jit.codegen();
1123  size_t ctr = 1;
1124  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1125  ++result_it) {
1126  if (!serialized_varlen_buffer.empty()) {
1127  result->reduce(*((*result_it)->storage_),
1128  serialized_varlen_buffer[ctr++],
1129  reduction_code,
1130  executor_id);
1131  } else {
1132  result->reduce(*((*result_it)->storage_), {}, reduction_code, executor_id);
1133  }
1134  }
1135  return result_rs;
1136 }
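// Usage sketch (hypothetical caller, not part of this file): per-device result sets are
// reduced pairwise into the first one; for baseline-hash layouts the manager allocates
// a wider ResultSet first and keeps ownership of it.
//
//   ResultSetManager manager;
//   std::vector<ResultSet*> per_device_results = ...;  // gathered elsewhere
//   ResultSet* reduced = manager.reduce(per_device_results, executor_id);
//   std::shared_ptr<ResultSet> owned = manager.getOwnResultSet();  // non-null only when
//                                                                  // a new buffer was made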
1137 
1138 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1139  return rs_;
1140 }
1141 
1142 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1143  auto& result_storage = result_rs->storage_;
1144  result_storage->rewriteAggregateBufferOffsets(
1145  result_rs->serialized_varlen_buffer_.front());
1146 }
1147 
1148 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1149  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1150  const auto key_count = query_mem_desc_.getGroupbyColCount();
1151  CHECK_EQ(slot_count + key_count, entry.size());
1152  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1154  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1155  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1156  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1157  for (size_t i = 0; i < key_count; ++i) {
1158  this_buff[key_off + i] = entry[i];
1159  }
1160  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1161  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1162  this_buff[first_slot_off + i] = entry[key_count + i];
1163  }
1164 }
1165 
1166 void ResultSetStorage::initializeRowWise() const {
1167  const auto key_count = query_mem_desc_.getGroupbyColCount();
1168  const auto row_size = get_row_bytes(query_mem_desc_);
1169  CHECK_EQ(row_size % 8, 0u);
1170  const auto key_bytes_with_padding =
1171  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1173  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1174  case 4: {
1175  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1176  auto row_ptr = buff_ + i * row_size;
1177  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1178  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1179  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1180  slot_ptr[j] = target_init_vals_[j];
1181  }
1182  }
1183  break;
1184  }
1185  case 8: {
1186  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1187  auto row_ptr = buff_ + i * row_size;
1188  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1189  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1190  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1191  slot_ptr[j] = target_init_vals_[j];
1192  }
1193  }
1194  break;
1195  }
1196  default:
1197  CHECK(false);
1198  }
1199 }
1200 
1201 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1203  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1204  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1205  const auto key_count = query_mem_desc_.getGroupbyColCount();
1206  CHECK_EQ(slot_count + key_count, entry.size());
1207  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1208 
1209  for (size_t i = 0; i < key_count; i++) {
1210  const auto key_offset = key_offset_colwise(0, i, 1);
1211  this_buff[key_offset] = entry[i];
1212  }
1213 
1214  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1215  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1216  this_buff[slot_offset] = entry[key_count + i];
1217  }
1218 }
1219 
1220 void ResultSetStorage::initializeColWise() const {
1221  const auto key_count = query_mem_desc_.getGroupbyColCount();
1222  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1224  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1225  const auto first_key_off =
1226  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1227  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1228  this_buff[first_key_off + i] = EMPTY_KEY_64;
1229  }
1230  }
1231  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1232  const auto first_val_off =
1233  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1234  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1235  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1236  }
1237  }
1238 }
1239 
1240 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1241  CHECK(entry_slots);
1242  if (query_mem_desc_.didOutputColumnar()) {
1243  size_t slot_off = 0;
1244  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1245  entry_slots[slot_off] = target_init_vals_[j];
1246  slot_off += query_mem_desc_.getEntryCount();
1247  }
1248  } else {
1249  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1250  entry_slots[j] = target_init_vals_[j];
1251  }
1252  }
1253 }
1254 
1255 #define AGGREGATE_ONE_VALUE( \
1256  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1257  do { \
1258  const auto sql_type = get_compact_type(agg_info__); \
1259  if (sql_type.is_fp()) { \
1260  if (chosen_bytes__ == sizeof(float)) { \
1261  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1262  *reinterpret_cast<const float*>(other_ptr__)); \
1263  } else { \
1264  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1265  *reinterpret_cast<const double*>(other_ptr__)); \
1266  } \
1267  } else { \
1268  if (chosen_bytes__ == sizeof(int32_t)) { \
1269  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1270  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1271  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1272  } else { \
1273  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1274  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1275  agg_##agg_kind__(val_ptr, *other_ptr); \
1276  } \
1277  } \
1278  } while (0)
1279 
1280 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1281  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1282  do { \
1283  if (agg_info__.skip_null_val) { \
1284  const auto sql_type = get_compact_type(agg_info__); \
1285  if (sql_type.is_fp()) { \
1286  if (chosen_bytes__ == sizeof(float)) { \
1287  agg_##agg_kind__##_float_skip_val( \
1288  reinterpret_cast<int32_t*>(val_ptr__), \
1289  *reinterpret_cast<const float*>(other_ptr__), \
1290  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1291  } else { \
1292  agg_##agg_kind__##_double_skip_val( \
1293  reinterpret_cast<int64_t*>(val_ptr__), \
1294  *reinterpret_cast<const double*>(other_ptr__), \
1295  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1296  } \
1297  } else { \
1298  if (chosen_bytes__ == sizeof(int32_t)) { \
1299  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1300  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1301  const auto null_val = static_cast<int32_t>(init_val__); \
1302  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1303  } else { \
1304  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1305  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1306  const auto null_val = static_cast<int64_t>(init_val__); \
1307  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1308  } \
1309  } \
1310  } else { \
1311  AGGREGATE_ONE_VALUE( \
1312  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1313  } \
1314  } while (0)
1315 
1316 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1317  do { \
1318  if (chosen_bytes__ == sizeof(int32_t)) { \
1319  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1320  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1321  agg_sum_int32(val_ptr, *other_ptr); \
1322  } else { \
1323  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1324  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1325  agg_sum(val_ptr, *other_ptr); \
1326  } \
1327  } while (0)
1328 
1329 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1330  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1331  { \
1332  if (agg_info__.skip_null_val) { \
1333  const auto sql_type = get_compact_type(agg_info__); \
1334  if (sql_type.is_fp()) { \
1335  if (chosen_bytes__ == sizeof(float)) { \
1336  agg_sum_float_skip_val( \
1337  reinterpret_cast<int32_t*>(val_ptr__), \
1338  *reinterpret_cast<const float*>(other_ptr__), \
1339  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1340  } else { \
1341  agg_sum_double_skip_val( \
1342  reinterpret_cast<int64_t*>(val_ptr__), \
1343  *reinterpret_cast<const double*>(other_ptr__), \
1344  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1345  } \
1346  } else { \
1347  if (chosen_bytes__ == sizeof(int32_t)) { \
1348  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1349  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1350  const auto null_val = static_cast<int32_t>(init_val__); \
1351  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1352  } else { \
1353  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1354  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1355  const auto null_val = static_cast<int64_t>(init_val__); \
1356  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1357  } \
1358  } \
1359  } else { \
1360  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1361  } \
1362  }
1363 
1364 // to be used for 8/16-bit kMIN and kMAX only
1365 #define AGGREGATE_ONE_VALUE_SMALL( \
1366  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1367  do { \
1368  if (chosen_bytes__ == sizeof(int16_t)) { \
1369  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1370  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1371  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1372  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1373  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1374  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1375  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1376  } else { \
1377  UNREACHABLE(); \
1378  } \
1379  } while (0)
1380 
1381 // to be used for 8/16-bit kMIN and kMAX only
1382 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1383  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1384  do { \
1385  if (agg_info__.skip_null_val) { \
1386  if (chosen_bytes__ == sizeof(int16_t)) { \
1387  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1388  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1389  const auto null_val = static_cast<int16_t>(init_val__); \
1390  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1391  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1392  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1393  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1394  const auto null_val = static_cast<int8_t>(init_val__); \
1395  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1396  } \
1397  } else { \
1398  AGGREGATE_ONE_VALUE_SMALL( \
1399  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1400  } \
1401  } while (0)
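// Expansion example (illustrative only): for a nullable 4-byte integer SUM target,
// AGGREGATE_ONE_NULLABLE_VALUE(sum, this_ptr1, that_ptr1, init_val, 4, target_info)
// reduces to
//
//   agg_sum_int32_skip_val(reinterpret_cast<int32_t*>(this_ptr1),
//                          *reinterpret_cast<const int32_t*>(that_ptr1),
//                          static_cast<int32_t>(init_val));
//
// while the same target without skip_null_val goes through AGGREGATE_ONE_VALUE and
// calls agg_sum_int32 instead.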
1402 
1403 int8_t result_set::get_width_for_slot(const size_t target_slot_idx,
1404  const bool float_argument_input,
1405  const QueryMemoryDescriptor& query_mem_desc) {
1406  if (float_argument_input) {
1407  return sizeof(float);
1408  }
1409  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1410 }
1411 
1412 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1413  const TargetInfo& target_info,
1414  const size_t target_slot_idx,
1415  const size_t init_agg_val_idx,
1416  const int8_t* that_ptr1) const {
1417  const bool float_argument_input = takes_float_argument(target_info);
1418  const auto chosen_bytes = result_set::get_width_for_slot(
1419  target_slot_idx, float_argument_input, query_mem_desc_);
1420  auto init_val = target_init_vals_[init_agg_val_idx];
1421 
1422  auto reduce = [&](auto const& size_tag) {
1423  using CastTarget = std::decay_t<decltype(size_tag)>;
1424  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1425  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1426  if (rhs_proj_col == init_val) {
1427  // ignore
1428  } else if (lhs_proj_col == init_val) {
1429  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1430  } else if (lhs_proj_col != rhs_proj_col) {
1431  throw std::runtime_error("Multiple distinct values encountered");
1432  }
1433  };
1434 
1435  switch (chosen_bytes) {
1436  case 1: {
1438  reduce(int8_t());
1439  break;
1440  }
1441  case 2: {
1443  reduce(int16_t());
1444  break;
1445  }
1446  case 4: {
1447  reduce(int32_t());
1448  break;
1449  }
1450  case 8: {
1451  CHECK(!target_info.sql_type.is_varlen());
1452  reduce(int64_t());
1453  break;
1454  }
1455  default:
1456  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1457  }
1458 }
1459 
1460 void ResultSetStorage::reduceOneSlot(
1461  int8_t* this_ptr1,
1462  int8_t* this_ptr2,
1463  const int8_t* that_ptr1,
1464  const int8_t* that_ptr2,
1465  const TargetInfo& target_info,
1466  const size_t target_logical_idx,
1467  const size_t target_slot_idx,
1468  const size_t init_agg_val_idx,
1469  const ResultSetStorage& that,
1470  const size_t first_slot_idx_for_target,
1471  const std::vector<std::string>& serialized_varlen_buffer) const {
1472  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1473  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1474  return;
1475  }
1476  }
1477  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1478  const bool float_argument_input = takes_float_argument(target_info);
1479  const auto chosen_bytes = result_set::get_width_for_slot(
1480  target_slot_idx, float_argument_input, query_mem_desc_);
1481  int64_t init_val = target_init_vals_[init_agg_val_idx]; // skip_val for nullable types
1482 
1483  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1484  reduceOneSlotSingleValue(
1485  this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1486  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1487  switch (target_info.agg_kind) {
1488  case kCOUNT:
1489  case kAPPROX_COUNT_DISTINCT: {
1490  if (is_distinct_target(target_info)) {
1491  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1492  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1493  break;
1494  }
1495  CHECK_EQ(int64_t(0), init_val);
1496  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1497  break;
1498  }
1499  case kAVG: {
1500  // Ignore float argument compaction for count component for fear of its overflow
1501  AGGREGATE_ONE_COUNT(this_ptr2,
1502  that_ptr2,
1503  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1504  }
1505  // fall thru
1506  case kSUM: {
1507  AGGREGATE_ONE_NULLABLE_VALUE(
1508  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1509  break;
1510  }
1511  case kMIN: {
1512  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1513  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1514  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1515  } else {
1516  AGGREGATE_ONE_NULLABLE_VALUE(
1517  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1518  }
1519  break;
1520  }
1521  case kMAX: {
1522  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1523  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1524  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1525  } else {
1526  AGGREGATE_ONE_NULLABLE_VALUE(
1527  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1528  }
1529  break;
1530  }
1531  case kAPPROX_QUANTILE:
1532  CHECK_EQ(static_cast<int8_t>(sizeof(int64_t)), chosen_bytes);
1533  reduceOneApproxQuantileSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1534  break;
1535  default:
1536  UNREACHABLE() << toString(target_info.agg_kind);
1537  }
1538  } else {
1539  switch (chosen_bytes) {
1540  case 1: {
1542  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1543  if (rhs_proj_col != init_val) {
1544  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1545  }
1546  break;
1547  }
1548  case 2: {
1550  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1551  if (rhs_proj_col != init_val) {
1552  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1553  }
1554  break;
1555  }
1556  case 4: {
1557  CHECK(target_info.agg_kind != kSAMPLE ||
1558  !target_info.sql_type.is_varlen());
1559  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1560  if (rhs_proj_col != init_val) {
1561  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1562  }
1563  break;
1564  }
1565  case 8: {
1566  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1567  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1568  !serialized_varlen_buffer.empty()) {
1569  size_t length_to_elems{0};
1570  if (target_info.sql_type.is_geometry()) {
1571  // TODO: Assumes hard-coded sizes for geometry targets
1572  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1573  } else {
1574  const auto& elem_ti = target_info.sql_type.get_elem_type();
1575  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1576  }
1577 
1578  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1579  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1580  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1581  *reinterpret_cast<int64_t*>(this_ptr1) =
1582  reinterpret_cast<const int64_t>(str_ptr);
1583  *reinterpret_cast<int64_t*>(this_ptr2) =
1584  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1585  } else {
1586  if (rhs_proj_col != init_val) {
1587  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1588  }
1589  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1590  CHECK(this_ptr2 && that_ptr2);
1591  *reinterpret_cast<int64_t*>(this_ptr2) =
1592  *reinterpret_cast<const int64_t*>(that_ptr2);
1593  }
1594  }
1595 
1596  break;
1597  }
1598  default:
1599  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1600  }
1601  }
1602 }
1603 
1604 void ResultSetStorage::reduceOneApproxQuantileSlot(int8_t* this_ptr1,
1605  const int8_t* that_ptr1,
1606  const size_t target_logical_idx,
1607  const ResultSetStorage& that) const {
1609  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1610  auto* incoming = *reinterpret_cast<quantile::TDigest* const*>(that_ptr1);
1611  CHECK(incoming) << "this_ptr1=" << (void*)this_ptr1
1612  << ", that_ptr1=" << (void const*)that_ptr1
1613  << ", target_logical_idx=" << target_logical_idx;
1614  if (incoming->centroids().capacity()) {
1615  auto* accumulator = *reinterpret_cast<quantile::TDigest**>(this_ptr1);
1616  CHECK(accumulator) << "this_ptr1=" << (void*)this_ptr1
1617  << ", that_ptr1=" << (void const*)that_ptr1
1618  << ", target_logical_idx=" << target_logical_idx;
1619  accumulator->allocate();
1620  accumulator->mergeTDigest(*incoming);
1621  }
1622 }
1623 
1624 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1625  const int8_t* that_ptr1,
1626  const size_t target_logical_idx,
1627  const ResultSetStorage& that) const {
1629  const auto& old_count_distinct_desc =
1630  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1631  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1632  const auto& new_count_distinct_desc =
1633  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1634  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1635  CHECK(this_ptr1 && that_ptr1);
1636  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1637  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1638  count_distinct_set_union(
1639  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1640 }
1641 
1642 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1643  const int8_t warp_count,
1644  const bool is_columnar,
1645  const bool replace_bitmap_ptr_with_bitmap_sz,
1646  std::vector<int64_t>& agg_vals,
1647  const QueryMemoryDescriptor& query_mem_desc,
1648  const std::vector<TargetInfo>& targets,
1649  const std::vector<int64_t>& agg_init_vals) {
1650  const size_t agg_col_count{agg_vals.size()};
1651  const auto row_size = query_mem_desc.getRowSize();
1652  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1653  CHECK_GE(agg_col_count, targets.size());
1654  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1655  CHECK(query_mem_desc.hasKeylessHash());
1656  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1657  bool discard_row = true;
1658  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1659  bool discard_partial_result = true;
1660  for (size_t target_idx = 0, agg_col_idx = 0;
1661  target_idx < targets.size() && agg_col_idx < agg_col_count;
1662  ++target_idx, ++agg_col_idx) {
1663  const auto& agg_info = targets[target_idx];
1664  const bool float_argument_input = takes_float_argument(agg_info);
1665  const auto chosen_bytes = float_argument_input
1666  ? sizeof(float)
1667  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1668  auto partial_bin_val = get_component(
1669  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1670  partial_agg_vals[agg_col_idx] = partial_bin_val;
1671  if (is_distinct_target(agg_info)) {
1672  CHECK_EQ(int8_t(1), warp_count);
1673  CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
1674  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1675  partial_bin_val = count_distinct_set_size(
1676  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1677  if (replace_bitmap_ptr_with_bitmap_sz) {
1678  partial_agg_vals[agg_col_idx] = partial_bin_val;
1679  }
1680  }
1681  if (kAVG == agg_info.agg_kind) {
1682  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1683  ++agg_col_idx;
1684  partial_bin_val = partial_agg_vals[agg_col_idx] =
1685  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1686  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1687  }
1688  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1689  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1690  CHECK(agg_info.is_agg);
1691  discard_partial_result = false;
1692  }
1693  }
1694  row_ptr += row_size;
1695  if (discard_partial_result) {
1696  continue;
1697  }
1698  discard_row = false;
1699  for (size_t target_idx = 0, agg_col_idx = 0;
1700  target_idx < targets.size() && agg_col_idx < agg_col_count;
1701  ++target_idx, ++agg_col_idx) {
1702  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1703  const auto& agg_info = targets[target_idx];
1704  const bool float_argument_input = takes_float_argument(agg_info);
1705  const auto chosen_bytes = float_argument_input
1706  ? sizeof(float)
1707  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1708  const auto& chosen_type = get_compact_type(agg_info);
1709  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1710  try {
1711  switch (agg_info.agg_kind) {
1712  case kCOUNT:
1713  case kAPPROX_COUNT_DISTINCT:
1714  AGGREGATE_ONE_NULLABLE_COUNT(
1715  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1716  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1717  agg_init_vals[agg_col_idx],
1718  chosen_bytes,
1719  agg_info);
1720  break;
1721  case kAVG:
1722  // Ignore float argument compaction for count component for fear of its
1723  // overflow
1724  AGGREGATE_ONE_COUNT(
1725  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1726  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1727  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1728  // fall thru
1729  case kSUM:
1730  AGGREGATE_ONE_NULLABLE_VALUE(
1731  sum,
1732  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1733  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1734  agg_init_vals[agg_col_idx],
1735  chosen_bytes,
1736  agg_info);
1737  break;
1738  case kMIN:
1739  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1740  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1741  min,
1742  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1743  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1744  agg_init_vals[agg_col_idx],
1745  chosen_bytes,
1746  agg_info);
1747  } else {
1748  AGGREGATE_ONE_NULLABLE_VALUE(
1749  min,
1750  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1751  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1752  agg_init_vals[agg_col_idx],
1753  chosen_bytes,
1754  agg_info);
1755  }
1756  break;
1757  case kMAX:
1758  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1759  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1760  max,
1761  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1762  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1763  agg_init_vals[agg_col_idx],
1764  chosen_bytes,
1765  agg_info);
1766  } else {
1767  AGGREGATE_ONE_NULLABLE_VALUE(
1768  max,
1769  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1770  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1771  agg_init_vals[agg_col_idx],
1772  chosen_bytes,
1773  agg_info);
1774  }
1775  break;
1776  default:
1777  CHECK(false);
1778  break;
1779  }
1780  } catch (std::runtime_error& e) {
1781  // TODO(miyu): handle the case where chosen_bytes < 8
1782  LOG(ERROR) << e.what();
1783  }
1784  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1785  switch (chosen_bytes) {
1786  case 8:
1787  break;
1788  case 4: {
1789  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1790  if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
1791  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1792  }
1793  break;
1794  }
1795  default:
1796  CHECK(false);
1797  }
1798  }
1799  if (kAVG == agg_info.agg_kind) {
1800  ++agg_col_idx;
1801  }
1802  } else {
1803  if (agg_info.agg_kind == kSAMPLE) {
1804  CHECK(!agg_info.sql_type.is_varlen())
1805  << "Interleaved bins reduction not supported for variable length "
1806  "arguments "
1807  "to SAMPLE";
1808  }
1809  if (agg_vals[agg_col_idx]) {
1810  if (agg_info.agg_kind == kSAMPLE) {
1811  continue;
1812  }
1813  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1814  } else {
1815  agg_vals[agg_col_idx] = partial_bin_val;
1816  }
1817  }
1818  }
1819  }
1820  return discard_row;
1821 }
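reduceSingleRow walks the warp_count interleaved copies of one bin's aggregate slots, skips copies whose designated key slot still holds its initialization value, and folds the remaining partials into agg_vals, returning whether the whole row can be discarded. Below is a simplified sketch of that control flow under the assumption of a flat int64_t layout and SUM/COUNT-style folding only; the helper is hypothetical and does not reproduce the engine's buffer format, width handling, or MIN/MAX/AVG cases.

    // Sketch only: simplified interleaved-warp reduction for a keyless hash bin.
    #include <cstdint>
    #include <vector>

    // Returns true when every warp-level copy was empty (row can be discarded).
    bool reduce_single_row_sketch(const int64_t* row_ptr,  // warp_count * slot_count values
                                  int warp_count,
                                  size_t slot_count,
                                  size_t key_slot,          // slot used as the empty marker
                                  const std::vector<int64_t>& init_vals,
                                  std::vector<int64_t>& agg_vals) {
      bool discard_row = true;
      for (int warp = 0; warp < warp_count; ++warp) {
        const int64_t* partial = row_ptr + warp * slot_count;
        if (partial[key_slot] == init_vals[key_slot]) {
          continue;                     // this warp never touched the bin
        }
        discard_row = false;
        for (size_t slot = 0; slot < slot_count; ++slot) {
          agg_vals[slot] += partial[slot];  // SUM/COUNT-style fold; MIN/MAX differ
        }
      }
      return discard_row;
    }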