OmniSciDB  d2f719934e
ResultSetReduction.cpp
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "DynamicWatchdog.h"
26 #include "Execute.h"
27 #include "ResultSet.h"
28 #include "ResultSetReductionInterpreter.h"
29 #include "ResultSetReductionJIT.h"
30 #include "RuntimeFunctions.h"
31 #include "Shared/SqlTypesLayout.h"
32 #include "Shared/likely.h"
33 #include "Shared/thread_count.h"
34 
35 #include <llvm/ExecutionEngine/GenericValue.h>
36 
37 #include <algorithm>
38 #include <future>
39 #include <numeric>
40 
41 extern bool g_enable_dynamic_watchdog;
42 
43 namespace {
44 
45 bool use_multithreaded_reduction(const size_t entry_count) {
46  return entry_count > 100000;
47 }
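// use_multithreaded_reduction(): buffers with more than 100,000 entries are split
// across cpu_threads() workers by the callers below; smaller buffers are reduced on
// the calling thread, presumably to avoid paying thread startup costs for little work.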
48 
49 size_t get_slot_off_quad(const QueryMemoryDescriptor& query_mem_desc) {
50  const auto row_bytes = get_row_bytes(query_mem_desc);
51  CHECK_EQ(size_t(0), row_bytes % 8);
52  return row_bytes / 8;
53 }
54 
55 std::vector<int64_t> make_key(const int64_t* buff,
56  const size_t entry_count,
57  const size_t key_count) {
58  std::vector<int64_t> key;
59  size_t off = 0;
60  for (size_t i = 0; i < key_count; ++i) {
61  key.push_back(buff[off]);
62  off += entry_count;
63  }
64  return key;
65 }
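// make_key() assumes the columnar key layout of the baseline hash buffers: the i-th
// component of an entry's key lives at buff[i * entry_count], i.e. key components sit
// one column (entry_count slots) apart rather than adjacent in memory.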
66 
67 void fill_slots(int64_t* dst_entry,
68  const size_t dst_entry_count,
69  const int64_t* src_buff,
70  const size_t src_entry_idx,
71  const size_t src_entry_count,
72  const QueryMemoryDescriptor& query_mem_desc) {
73  const auto slot_count = query_mem_desc.getBufferColSlotCount();
74  const auto key_count = query_mem_desc.getGroupbyColCount();
75  if (query_mem_desc.didOutputColumnar()) {
76  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
77  ++i, dst_slot_off += dst_entry_count) {
78  dst_entry[dst_slot_off] =
79  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
80  }
81  } else {
82  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
83  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
84  for (size_t i = 0; i < slot_count; ++i) {
85  dst_entry[i] = row_ptr[slot_off_quad + i];
86  }
87  }
88 }
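// fill_slots() copies one entry's aggregate slots from a source buffer into a
// destination entry, handling both layouts: columnar buffers stride by the destination
// entry count per slot, while row-wise buffers copy the contiguous slot region that
// starts slot_off_quad quadwords past the row start.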
89 
91 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
92  for (size_t i = 0; i < key_count; ++i) {
93  key_ptr_i32[i] = EMPTY_KEY_32;
94  }
95 }
96 
98 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
99  for (size_t i = 0; i < key_count; ++i) {
100  key_ptr_i64[i] = EMPTY_KEY_64;
101  }
102 }
103 
104 inline int64_t get_component(const int8_t* group_by_buffer,
105  const size_t comp_sz,
106  const size_t index = 0) {
107  int64_t ret = std::numeric_limits<int64_t>::min();
108  switch (comp_sz) {
109  case 1: {
110  ret = group_by_buffer[index];
111  break;
112  }
113  case 2: {
114  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
115  ret = buffer_ptr[index];
116  break;
117  }
118  case 4: {
119  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
120  ret = buffer_ptr[index];
121  break;
122  }
123  case 8: {
124  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
125  ret = buffer_ptr[index];
126  break;
127  }
128  default:
129  CHECK(false);
130  }
131  return ret;
132 }
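// get_component() widens a 1-, 2-, 4- or 8-byte slot into a signed int64_t; any other
// width trips the CHECK(false).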
133 
134 void run_reduction_code(const ReductionCode& reduction_code,
135  int8_t* this_buff,
136  const int8_t* that_buff,
137  const int32_t start_entry_index,
138  const int32_t end_entry_index,
139  const int32_t that_entry_count,
140  const void* this_qmd,
141  const void* that_qmd,
142  const void* serialized_varlen_buffer) {
143  int err = 0;
144  if (reduction_code.func_ptr) {
145  err = reduction_code.func_ptr(this_buff,
146  that_buff,
147  start_entry_index,
148  end_entry_index,
149  that_entry_count,
150  this_qmd,
151  that_qmd,
152  serialized_varlen_buffer);
153  } else {
154  // Calls LLVM methods that are not thread safe; ensure nothing else compiles
155  // while we run this reduction.
156  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
157  auto ret = ReductionInterpreter::run(
158  reduction_code.ir_reduce_loop.get(),
159  {ReductionInterpreter::MakeEvalValue(this_buff),
160  ReductionInterpreter::MakeEvalValue(that_buff),
161  ReductionInterpreter::MakeEvalValue(start_entry_index),
162  ReductionInterpreter::MakeEvalValue(end_entry_index),
163  ReductionInterpreter::MakeEvalValue(that_entry_count),
164  ReductionInterpreter::MakeEvalValue(this_qmd),
165  ReductionInterpreter::MakeEvalValue(that_qmd),
166  ReductionInterpreter::MakeEvalValue(serialized_varlen_buffer)});
167  err = ret.int_val;
168  }
169  if (err) {
170  if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
171  throw std::runtime_error("Multiple distinct values encountered");
172  }
173  if (err == Executor::ERR_INTERRUPTED) {
174  throw std::runtime_error(
175  "Query execution has interrupted during result set reduction");
176  }
177  throw std::runtime_error(
178  "Query execution has exceeded the time limit or was interrupted during result "
179  "set reduction");
180  }
181 }
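// run_reduction_code() prefers the JIT-compiled entry point (func_ptr) and only falls
// back to interpreting the reduction IR, serialized behind Executor::compilation_mutex_,
// when no native code is available; non-zero return codes become the exceptions above.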
182 
183 } // namespace
184 
185 void result_set::fill_empty_key(void* key_ptr,
186  const size_t key_count,
187  const size_t key_width) {
188  switch (key_width) {
189  case 4: {
190  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
191  fill_empty_key_32(key_ptr_i32, key_count);
192  break;
193  }
194  case 8: {
195  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
196  fill_empty_key_64(key_ptr_i64, key_count);
197  break;
198  }
199  default:
200  CHECK(false);
201  }
202 }
203 
204 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
205 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
206 void ResultSetStorage::reduce(const ResultSetStorage& that,
207  const std::vector<std::string>& serialized_varlen_buffer,
208  const ReductionCode& reduction_code) const {
209  auto entry_count = query_mem_desc_.getEntryCount();
210  CHECK_GT(entry_count, size_t(0));
211  if (query_mem_desc_.didOutputColumnar()) {
212  CHECK(query_mem_desc_.getQueryDescriptionType() ==
213  QueryDescriptionType::GroupByPerfectHash ||
214  query_mem_desc_.getQueryDescriptionType() ==
215  QueryDescriptionType::GroupByBaselineHash ||
216  query_mem_desc_.getQueryDescriptionType() ==
217  QueryDescriptionType::Projection);
218  }
219  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
220  switch (query_mem_desc_.getQueryDescriptionType()) {
221  case QueryDescriptionType::GroupByBaselineHash:
222  CHECK_GE(entry_count, that_entry_count);
223  break;
224  default:
225  CHECK_EQ(entry_count, that_entry_count);
226  }
227  auto this_buff = buff_;
228  CHECK(this_buff);
229  auto that_buff = that.buff_;
230  CHECK(that_buff);
231  if (query_mem_desc_.getQueryDescriptionType() ==
232  QueryDescriptionType::GroupByBaselineHash) {
233  if (!serialized_varlen_buffer.empty()) {
234  throw std::runtime_error(
235  "Projection of variable length targets with baseline hash group by is not yet "
236  "supported in Distributed mode");
237  }
238  if (use_multithreaded_reduction(that_entry_count)) {
239  const size_t thread_count = cpu_threads();
240  std::vector<std::future<void>> reduction_threads;
241  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
242  const auto thread_entry_count =
243  (that_entry_count + thread_count - 1) / thread_count;
244  const auto start_index = thread_idx * thread_entry_count;
245  const auto end_index =
246  std::min(start_index + thread_entry_count, that_entry_count);
247  reduction_threads.emplace_back(std::async(
248  std::launch::async,
249  [this,
250  this_buff,
251  that_buff,
252  start_index,
253  end_index,
254  that_entry_count,
255  &reduction_code,
256  &that] {
257  if (reduction_code.ir_reduce_loop) {
258  run_reduction_code(reduction_code,
259  this_buff,
260  that_buff,
261  start_index,
262  end_index,
263  that_entry_count,
264  &query_mem_desc_,
265  &that.query_mem_desc_,
266  nullptr);
267  } else {
268  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
269  reduceOneEntryBaseline(
270  this_buff, that_buff, entry_idx, that_entry_count, that);
271  }
272  }
273  }));
274  }
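// Two passes over the worker futures: wait() first so every thread has finished
// touching the buffers, then get() so an exception thrown inside any worker is
// rethrown here.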
275  for (auto& reduction_thread : reduction_threads) {
276  reduction_thread.wait();
277  }
278  for (auto& reduction_thread : reduction_threads) {
279  reduction_thread.get();
280  }
281  } else {
282  if (reduction_code.ir_reduce_loop) {
283  run_reduction_code(reduction_code,
284  this_buff,
285  that_buff,
286  0,
287  that_entry_count,
288  that_entry_count,
289  &query_mem_desc_,
290  &that.query_mem_desc_,
291  nullptr);
292  } else {
293  for (size_t i = 0; i < that_entry_count; ++i) {
294  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
295  }
296  }
297  }
298  return;
299  }
300  auto executor = query_mem_desc_.getExecutor();
301  if (!executor) {
303  }
304  auto executor_id = executor->getExecutorId();
305  if (use_multithreaded_reduction(entry_count)) {
306  const size_t thread_count = cpu_threads();
307  std::vector<std::future<void>> reduction_threads;
308  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
309  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
310  const auto start_index = thread_idx * thread_entry_count;
311  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
312  if (query_mem_desc_.didOutputColumnar()) {
313  reduction_threads.emplace_back(std::async(std::launch::async,
314  [this,
315  this_buff,
316  that_buff,
317  start_index,
318  end_index,
319  &that,
320  &serialized_varlen_buffer,
321  &executor_id] {
322  reduceEntriesNoCollisionsColWise(
323  this_buff,
324  that_buff,
325  that,
326  start_index,
327  end_index,
328  serialized_varlen_buffer,
329  executor_id);
330  }));
331  } else {
332  reduction_threads.emplace_back(std::async(std::launch::async,
333  [this,
334  this_buff,
335  that_buff,
336  start_index,
337  end_index,
338  that_entry_count,
339  &reduction_code,
340  &that,
341  &serialized_varlen_buffer] {
342  CHECK(reduction_code.ir_reduce_loop);
343  run_reduction_code(
344  reduction_code,
345  this_buff,
346  that_buff,
347  start_index,
348  end_index,
349  that_entry_count,
350  &query_mem_desc_,
351  &that.query_mem_desc_,
352  &serialized_varlen_buffer);
353  }));
354  }
355  }
356  for (auto& reduction_thread : reduction_threads) {
357  reduction_thread.wait();
358  }
359  for (auto& reduction_thread : reduction_threads) {
360  reduction_thread.get();
361  }
362  } else {
363  if (query_mem_desc_.didOutputColumnar()) {
364  reduceEntriesNoCollisionsColWise(this_buff,
365  that_buff,
366  that,
367  0,
368  query_mem_desc_.getEntryCount(),
369  serialized_varlen_buffer,
370  executor_id);
371  } else {
372  CHECK(reduction_code.ir_reduce_loop);
373  run_reduction_code(reduction_code,
374  this_buff,
375  that_buff,
376  0,
377  entry_count,
378  that_entry_count,
379  &query_mem_desc_,
380  &that.query_mem_desc_,
381  &serialized_varlen_buffer);
382  }
383  }
384 }
385 
386 namespace {
387 
388 ALWAYS_INLINE void check_watchdog() {
389  if (UNLIKELY(dynamic_watchdog())) {
390  // TODO(alex): distinguish between the deadline and interrupt
391  throw std::runtime_error(
392  "Query execution has exceeded the time limit or was interrupted during result "
393  "set reduction");
394  }
395 }
396 
397 ALWAYS_INLINE void check_watchdog_with_seed(const size_t sample_seed) {
398  if (UNLIKELY((sample_seed & 0x3F) == 0 && dynamic_watchdog())) {
399  // TODO(alex): distinguish between the deadline and interrupt
400  throw std::runtime_error(
401  "Query execution has exceeded the time limit or was interrupted during result "
402  "set reduction");
403  }
404 }
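// check_watchdog_with_seed() only consults the dynamic watchdog once every 64 entries:
// the (sample_seed & 0x3F) == 0 test passes for one seed value out of 64, keeping the
// per-entry overhead low.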
405 
406 } // namespace
407 
408 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
409  int8_t* this_buff,
410  const int8_t* that_buff,
411  const ResultSetStorage& that,
412  const size_t start_index,
413  const size_t end_index,
414  const std::vector<std::string>& serialized_varlen_buffer,
415  const size_t executor_id) const {
416  // TODO(adb / saman): Support column wise output when serializing distributed agg
417  // functions
418  CHECK(serialized_varlen_buffer.empty());
419 
420  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
421 
422  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
423  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
424  auto executor = Executor::getExecutor(executor_id);
425  CHECK(executor);
426  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
427  const auto& agg_info = targets_[target_idx];
428  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
429 
430  bool two_slot_target{false};
431  if (agg_info.is_agg &&
432  (agg_info.agg_kind == kAVG ||
433  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
434  // Note that this assumes that if one of the slot pairs in a given target is an
435  // array, all slot pairs are arrays. Currently this is true for all geo targets,
436  // but we should better codify and store this information in the future.
437  two_slot_target = true;
438  }
439  if (UNLIKELY(g_enable_non_kernel_time_query_interrupt &&
440  executor->checkNonKernelTimeInterrupted())) {
441  throw std::runtime_error(
442  "Query execution was interrupted during result set reduction");
443  }
444  if (g_enable_dynamic_watchdog) {
445  check_watchdog();
446  }
447  for (size_t target_slot_idx = slots_for_col.front();
448  target_slot_idx < slots_for_col.back() + 1;
449  target_slot_idx += 2) {
450  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
451  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
452  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
453  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
454  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
455  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
456  continue;
457  }
459  // copy the key from right hand side
460  copyKeyColWise(entry_idx, this_buff, that_buff);
461  }
462  auto this_ptr1 =
463  this_crt_col_ptr +
464  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
465  auto that_ptr1 =
466  that_crt_col_ptr +
467  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
468  int8_t* this_ptr2{nullptr};
469  const int8_t* that_ptr2{nullptr};
470  if (UNLIKELY(two_slot_target)) {
471  this_ptr2 =
472  this_next_col_ptr +
473  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
474  that_ptr2 =
475  that_next_col_ptr +
476  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
477  }
478  reduceOneSlot(this_ptr1,
479  this_ptr2,
480  that_ptr1,
481  that_ptr2,
482  agg_info,
483  target_idx,
484  target_slot_idx,
485  target_slot_idx,
486  that,
487  slots_for_col.front(),
488  serialized_varlen_buffer);
489  }
490 
491  this_crt_col_ptr = this_next_col_ptr;
492  that_crt_col_ptr = that_next_col_ptr;
493  if (UNLIKELY(two_slot_target)) {
494  this_crt_col_ptr = advance_to_next_columnar_target_buff(
495  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
496  that_crt_col_ptr = advance_to_next_columnar_target_buff(
497  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
498  }
499  }
500  }
501 }
502 
503 /*
504  * copy all keys from the columnar prepended group buffer of "that_buff" into
505  * "this_buff"
506  */
507 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
508  int8_t* this_buff,
509  const int8_t* that_buff) const {
511  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
512  group_idx++) {
513  // if the column corresponds to a group key
514  const auto column_offset_bytes =
515  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
516  auto lhs_key_ptr = this_buff + column_offset_bytes;
517  auto rhs_key_ptr = that_buff + column_offset_bytes;
518  switch (query_mem_desc_.groupColWidth(group_idx)) {
519  case 8:
520  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
521  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
522  break;
523  case 4:
524  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
525  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
526  break;
527  case 2:
528  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
529  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
530  break;
531  case 1:
532  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
533  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
534  break;
535  default:
536  CHECK(false);
537  break;
538  }
539  }
540 }
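// In the columnar layout each group-by key component is its own contiguous column,
// prepended to the aggregate columns; copying the key for one entry therefore means
// copying element entry_idx from each key column at that component's width.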
541 
542 // Rewrites the entries of this ResultSetStorage object to point directly into the
543 // serialized_varlen_buffer rather than using offsets.
544 void ResultSetStorage::rewriteAggregateBufferOffsets(
545  const std::vector<std::string>& serialized_varlen_buffer) const {
546  if (serialized_varlen_buffer.empty()) {
547  return;
548  }
549 
551  auto entry_count = query_mem_desc_.getEntryCount();
552  CHECK_GT(entry_count, size_t(0));
553  CHECK(buff_);
554 
555  // Row-wise iteration, consider moving to separate function
556  for (size_t i = 0; i < entry_count; ++i) {
557  if (isEmptyEntry(i, buff_)) {
558  continue;
559  }
560  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
561  const auto key_bytes_with_padding = align_to_int64(key_bytes);
562  auto rowwise_targets_ptr =
563  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
564  size_t target_slot_idx = 0;
565  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
566  ++target_logical_idx) {
567  const auto& target_info = targets_[target_logical_idx];
568  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
569  CHECK(target_info.agg_kind == kSAMPLE);
570  auto ptr1 = rowwise_targets_ptr;
571  auto slot_idx = target_slot_idx;
572  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
573  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
574 
575  const auto& elem_ti = target_info.sql_type.get_elem_type();
576  size_t length_to_elems =
577  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
578  ? 1
579  : elem_ti.get_size();
580  if (target_info.sql_type.is_geometry()) {
581  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
582  if (j > 0) {
583  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
584  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
585  slot_idx += 2;
586  length_to_elems = 4;
587  }
588  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
589  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
590  const auto str_ptr =
591  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
592  CHECK(ptr1);
593  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
594  CHECK(ptr2);
595  *reinterpret_cast<int64_t*>(ptr2) =
596  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
597  }
598  } else {
599  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
600  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
601  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
602  CHECK(ptr1);
603  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
604  CHECK(ptr2);
605  *reinterpret_cast<int64_t*>(ptr2) =
606  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
607  }
608  }
609 
610  rowwise_targets_ptr = advance_target_ptr_row_wise(
611  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
612  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
613  }
614  }
615 
616  return;
617 }
618 
619 namespace {
620 
621 #ifdef _MSC_VER
622 #define mapd_cas(address, compare, val) \
623  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
624  static_cast<long>(val), \
625  static_cast<long>(compare))
626 #else
627 #define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
628 #endif
629 
630 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
631  const uint32_t h,
632  const int64_t* key,
633  const uint32_t key_qw_count,
634  const size_t entry_count) {
635  auto off = h;
636  const auto old_key = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
637  if (old_key == EMPTY_KEY_64) {
638  for (size_t i = 0; i < key_qw_count; ++i) {
639  groups_buffer[off] = key[i];
640  off += entry_count;
641  }
642  return {&groups_buffer[off], true};
643  }
644  off = h;
645  for (size_t i = 0; i < key_qw_count; ++i) {
646  if (groups_buffer[off] != key[i]) {
647  return {nullptr, true};
648  }
649  off += entry_count;
650  }
651  return {&groups_buffer[off], false};
652 }
653 
654 #undef mapd_cas
655 
656 // TODO(alex): fix synchronization when we enable it
657 GroupValueInfo get_group_value_columnar_reduction(
658  int64_t* groups_buffer,
659  const uint32_t groups_buffer_entry_count,
660  const int64_t* key,
661  const uint32_t key_qw_count) {
662  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
663  auto matching_gvi = get_matching_group_value_columnar_reduction(
664  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
665  if (matching_gvi.first) {
666  return matching_gvi;
667  }
668  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
669  while (h_probe != h) {
670  matching_gvi = get_matching_group_value_columnar_reduction(
671  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
672  if (matching_gvi.first) {
673  return matching_gvi;
674  }
675  h_probe = (h_probe + 1) % groups_buffer_entry_count;
676  }
677  return {nullptr, true};
678 }
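// Baseline-hash reduction uses open addressing with linear probing: hash the key, try
// that slot, then walk the table (wrapping around) until a matching or empty slot is
// found; a {nullptr, true} result means the probe wrapped without finding space.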
679 
680 #ifdef _MSC_VER
681 #define cas_cst(ptr, expected, desired) \
682  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
683  reinterpret_cast<void*>(&desired), \
684  expected) == expected)
685 #define store_cst(ptr, val) \
686  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
687  reinterpret_cast<void*>(val))
688 #define load_cst(ptr) \
689  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
690 #else
691 #define cas_cst(ptr, expected, desired) \
692  __atomic_compare_exchange_n( \
693  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
694 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
695 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
696 #endif
697 
698 template <typename T = int64_t>
699 GroupValueInfo get_matching_group_value_reduction(
700  int64_t* groups_buffer,
701  const uint32_t h,
702  const T* key,
703  const uint32_t key_count,
704  const QueryMemoryDescriptor& query_mem_desc,
705  const int64_t* that_buff_i64,
706  const size_t that_entry_idx,
707  const size_t that_entry_count,
708  const uint32_t row_size_quad) {
709  auto off = h * row_size_quad;
710  T empty_key = get_empty_key<T>();
711  T write_pending = get_empty_key<T>() - 1;
712  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
713  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
714  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
715  if (success) {
716  fill_slots(groups_buffer + off + slot_off_quad,
717  query_mem_desc.getEntryCount(),
718  that_buff_i64,
719  that_entry_idx,
720  that_entry_count,
721  query_mem_desc);
722  if (key_count > 1) {
723  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
724  }
725  store_cst(row_ptr, *key);
726  return {groups_buffer + off + slot_off_quad, true};
727  }
728  while (load_cst(row_ptr) == write_pending) {
729  // spin until the winning thread has finished writing the entire key and the init
730  // value
731  }
732  for (size_t i = 0; i < key_count; ++i) {
733  if (load_cst(row_ptr + i) != key[i]) {
734  return {nullptr, true};
735  }
736  }
737  return {groups_buffer + off + slot_off_quad, false};
738 }
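// The row-wise variant above claims an entry with a compare-and-swap on the first key
// component, using empty_key - 1 as a "write pending" sentinel: the winner fills the
// slots and the rest of the key before publishing the real key with a seq_cst store,
// while losing threads spin until the sentinel clears and then compare the full key.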
739 
740 #undef load_cst
741 #undef store_cst
742 #undef cas_cst
743 
744 GroupValueInfo get_matching_group_value_reduction(
745  int64_t* groups_buffer,
746  const uint32_t h,
747  const int64_t* key,
748  const uint32_t key_count,
749  const size_t key_width,
750  const QueryMemoryDescriptor& query_mem_desc,
751  const int64_t* that_buff_i64,
752  const size_t that_entry_idx,
753  const size_t that_entry_count,
754  const uint32_t row_size_quad) {
755  switch (key_width) {
756  case 4:
757  return get_matching_group_value_reduction(groups_buffer,
758  h,
759  reinterpret_cast<const int32_t*>(key),
760  key_count,
761  query_mem_desc,
762  that_buff_i64,
763  that_entry_idx,
764  that_entry_count,
765  row_size_quad);
766  case 8:
767  return get_matching_group_value_reduction(groups_buffer,
768  h,
769  key,
770  key_count,
771  query_mem_desc,
772  that_buff_i64,
773  that_entry_idx,
774  that_entry_count,
775  row_size_quad);
776  default:
777  CHECK(false);
778  return {nullptr, true};
779  }
780 }
781 
782 } // namespace
783 
784 GroupValueInfo get_group_value_reduction(
785  int64_t* groups_buffer,
786  const uint32_t groups_buffer_entry_count,
787  const int64_t* key,
788  const uint32_t key_count,
789  const size_t key_width,
790  const QueryMemoryDescriptor& query_mem_desc,
791  const int64_t* that_buff_i64,
792  const size_t that_entry_idx,
793  const size_t that_entry_count,
794  const uint32_t row_size_quad) {
795  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
796  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
797  h,
798  key,
799  key_count,
800  key_width,
801  query_mem_desc,
802  that_buff_i64,
803  that_entry_idx,
804  that_entry_count,
805  row_size_quad);
806  if (matching_gvi.first) {
807  return matching_gvi;
808  }
809  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
810  while (h_probe != h) {
811  matching_gvi = get_matching_group_value_reduction(groups_buffer,
812  h_probe,
813  key,
814  key_count,
815  key_width,
816  query_mem_desc,
817  that_buff_i64,
818  that_entry_idx,
819  that_entry_count,
820  row_size_quad);
821  if (matching_gvi.first) {
822  return matching_gvi;
823  }
824  h_probe = (h_probe + 1) % groups_buffer_entry_count;
825  }
826  return {nullptr, true};
827 }
828 
829 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
830 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
831 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
832  const int8_t* that_buff,
833  const size_t that_entry_idx,
834  const size_t that_entry_count,
835  const ResultSetStorage& that) const {
836  if (g_enable_dynamic_watchdog) {
837  check_watchdog_with_seed(that_entry_idx);
838  }
839  const auto key_count = query_mem_desc_.getGroupbyColCount();
840  CHECK(query_mem_desc_.getQueryDescriptionType() ==
841  QueryDescriptionType::GroupByBaselineHash);
842  CHECK(!query_mem_desc_.hasKeylessHash());
843  CHECK(query_mem_desc_.didOutputColumnar());
844  const auto key_off =
845  key_offset_colwise(that_entry_idx, 0, that_entry_count);
846  if (isEmptyEntry(that_entry_idx, that_buff)) {
847  return;
848  }
849  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
850  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
851  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
852  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
853  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
854  CHECK(this_entry_slots);
855  if (empty_entry) {
856  fill_slots(this_entry_slots,
857  query_mem_desc_.getEntryCount(),
858  that_buff_i64,
859  that_entry_idx,
860  that_entry_count,
861  query_mem_desc_);
862  return;
863  }
864  reduceOneEntrySlotsBaseline(
865  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
866 }
867 
868 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
869  const int64_t* that_buff,
870  const size_t that_entry_idx,
871  const size_t that_entry_count,
872  const ResultSetStorage& that) const {
874  const auto key_count = query_mem_desc_.getGroupbyColCount();
875  size_t j = 0;
876  size_t init_agg_val_idx = 0;
877  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
878  ++target_logical_idx) {
879  const auto& target_info = targets_[target_logical_idx];
880  const auto that_slot_off = slot_offset_colwise(
881  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
882  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
883  reduceOneSlotBaseline(this_entry_slots,
884  this_slot_off,
885  that_buff,
886  that_entry_count,
887  that_slot_off,
888  target_info,
889  target_logical_idx,
890  j,
891  init_agg_val_idx,
892  that);
893  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
894  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
895  } else {
896  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
897  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
898  }
899  }
900  j = advance_slot(j, target_info, false);
901  }
902 }
903 
904 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
905  const size_t this_slot,
906  const int64_t* that_buff,
907  const size_t that_entry_count,
908  const size_t that_slot,
909  const TargetInfo& target_info,
910  const size_t target_logical_idx,
911  const size_t target_slot_idx,
912  const size_t init_agg_val_idx,
913  const ResultSetStorage& that) const {
915  int8_t* this_ptr2{nullptr};
916  const int8_t* that_ptr2{nullptr};
917  if (target_info.is_agg &&
918  (target_info.agg_kind == kAVG ||
919  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
920  const auto this_count_off = query_mem_desc_.getEntryCount();
921  const auto that_count_off = that_entry_count;
922  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
923  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
924  }
925  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
926  this_ptr2,
927  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
928  that_ptr2,
929  target_info,
930  target_logical_idx,
931  target_slot_idx,
932  init_agg_val_idx,
933  that,
934  target_slot_idx, // dummy, for now
935  {});
936 }
937 
938 // During the reduction of two result sets using the baseline strategy, we first create a
939 // big enough buffer to hold the entries for both and we move the entries from the first
940 // into it before doing the reduction as usual (into the first buffer).
941 template <class KeyType>
942 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
943  const size_t new_entry_count) const {
945  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
946  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
947  const auto key_count = query_mem_desc_.getGroupbyColCount();
950  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
951  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
952  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
953 
954  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
955  const size_t thread_count = cpu_threads();
956  std::vector<std::future<void>> move_threads;
957 
958  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
959  const auto thread_entry_count =
960  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
961  const auto start_index = thread_idx * thread_entry_count;
962  const auto end_index =
963  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
964  move_threads.emplace_back(std::async(
965  std::launch::async,
966  [this,
967  src_buff,
968  new_buff_i64,
969  new_entry_count,
970  start_index,
971  end_index,
972  key_count,
973  row_qw_count,
974  key_byte_width] {
975  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
976  moveOneEntryToBuffer<KeyType>(entry_idx,
977  new_buff_i64,
978  new_entry_count,
979  key_count,
980  row_qw_count,
981  src_buff,
982  key_byte_width);
983  }
984  }));
985  }
986  for (auto& move_thread : move_threads) {
987  move_thread.wait();
988  }
989  for (auto& move_thread : move_threads) {
990  move_thread.get();
991  }
992  } else {
993  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
994  moveOneEntryToBuffer<KeyType>(entry_idx,
995  new_buff_i64,
996  new_entry_count,
997  key_count,
998  row_qw_count,
999  src_buff,
1000  key_byte_width);
1001  }
1002  }
1003 }
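// moveOneEntryToBuffer() (below) re-hashes every non-empty entry into the larger
// destination buffer, so moveEntriesToBuffer() is effectively a rebuild of the baseline
// hash table at the new entry count, parallelized across cpu_threads() workers for
// large buffers.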
1004 
1005 template <class KeyType>
1006 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
1007  int64_t* new_buff_i64,
1008  const size_t new_entry_count,
1009  const size_t key_count,
1010  const size_t row_qw_count,
1011  const int64_t* src_buff,
1012  const size_t key_byte_width) const {
1013  const auto key_off =
1014  query_mem_desc_.didOutputColumnar()
1015  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
1016  : row_qw_count * entry_index;
1017  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
1018  if (*key_ptr == get_empty_key<KeyType>()) {
1019  return;
1020  }
1021  int64_t* new_entries_ptr{nullptr};
1022  if (query_mem_desc_.didOutputColumnar()) {
1023  const auto key =
1024  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
1025  new_entries_ptr =
1026  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
1027  } else {
1028  new_entries_ptr = get_group_value(new_buff_i64,
1029  new_entry_count,
1030  &src_buff[key_off],
1031  key_count,
1032  key_byte_width,
1033  row_qw_count);
1034  }
1035  CHECK(new_entries_ptr);
1036  fill_slots(new_entries_ptr,
1037  new_entry_count,
1038  src_buff,
1039  entry_index,
1040  query_mem_desc_.getEntryCount(),
1041  query_mem_desc_);
1042 }
1043 
1044 void ResultSet::initializeStorage() const {
1045  if (query_mem_desc_.didOutputColumnar()) {
1046  storage_->initializeColWise();
1047  } else {
1048  storage_->initializeRowWise();
1049  }
1050 }
1051 
1052 // Driver for reductions. Needed because the result of a reduction on the baseline
1053 // layout, which can have collisions, cannot be done in place and something needs
1054 // to take ownership of the new result set with the bigger underlying buffer.
1055 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets) {
1056  CHECK(!result_sets.empty());
1057  auto result_rs = result_sets.front();
1058  CHECK(result_rs->storage_);
1059  auto& first_result = *result_rs->storage_;
1060  auto result = &first_result;
1061  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1062  for (const auto result_set : result_sets) {
1063  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1064  }
1065  const auto catalog = result_rs->catalog_;
1066  for (const auto result_set : result_sets) {
1067  CHECK_EQ(catalog, result_set->catalog_);
1068  }
1069  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1070  QueryDescriptionType::GroupByBaselineHash) {
1071  const auto total_entry_count =
1072  std::accumulate(result_sets.begin(),
1073  result_sets.end(),
1074  size_t(0),
1075  [](const size_t init, const ResultSet* rs) {
1076  return init + rs->query_mem_desc_.getEntryCount();
1077  });
1078  CHECK(total_entry_count);
1079  auto query_mem_desc = first_result.query_mem_desc_;
1080  query_mem_desc.setEntryCount(total_entry_count);
1081  rs_.reset(new ResultSet(first_result.targets_,
1082  ExecutorDeviceType::CPU,
1083  query_mem_desc,
1084  row_set_mem_owner,
1085  catalog,
1086  0,
1087  0));
1088  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1089  rs_->initializeStorage();
1090  switch (query_mem_desc.getEffectiveKeyWidth()) {
1091  case 4:
1092  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1093  query_mem_desc.getEntryCount());
1094  break;
1095  case 8:
1096  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1097  query_mem_desc.getEntryCount());
1098  break;
1099  default:
1100  CHECK(false);
1101  }
1102  result = rs_->storage_.get();
1103  result_rs = rs_.get();
1104  }
1105 
1106  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1107  if (!serialized_varlen_buffer.empty()) {
1108  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1109  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1110  ++result_it) {
1111  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1112  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1113  serialized_varlen_buffer.emplace_back(
1114  std::move(result_serialized_varlen_buffer.front()));
1115  }
1116  }
1117 
1118  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1119  result_rs->getTargetInfos(),
1120  result_rs->getTargetInitVals());
1121  auto reduction_code = reduction_jit.codegen();
1122  size_t ctr = 1;
1123  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1124  ++result_it) {
1125  if (!serialized_varlen_buffer.empty()) {
1126  result->reduce(
1127  *((*result_it)->storage_), serialized_varlen_buffer[ctr++], reduction_code);
1128  } else {
1129  result->reduce(*((*result_it)->storage_), {}, reduction_code);
1130  }
1131  }
1132  return result_rs;
1133 }
1134 
1135 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1136  return rs_;
1137 }
1138 
1139 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1140  auto& result_storage = result_rs->storage_;
1141  result_storage->rewriteAggregateBufferOffsets(
1142  result_rs->serialized_varlen_buffer_.front());
1143 }
1144 
1145 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1146  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1147  const auto key_count = query_mem_desc_.getGroupbyColCount();
1148  CHECK_EQ(slot_count + key_count, entry.size());
1149  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1151  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1152  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1153  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1154  for (size_t i = 0; i < key_count; ++i) {
1155  this_buff[key_off + i] = entry[i];
1156  }
1157  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1158  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1159  this_buff[first_slot_off + i] = entry[key_count + i];
1160  }
1161 }
1162 
1163 void ResultSetStorage::initializeRowWise() const {
1164  const auto key_count = query_mem_desc_.getGroupbyColCount();
1165  const auto row_size = get_row_bytes(query_mem_desc_);
1166  CHECK_EQ(row_size % 8, 0u);
1167  const auto key_bytes_with_padding =
1168  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1170  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1171  case 4: {
1172  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1173  auto row_ptr = buff_ + i * row_size;
1174  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1175  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1176  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1177  slot_ptr[j] = target_init_vals_[j];
1178  }
1179  }
1180  break;
1181  }
1182  case 8: {
1183  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1184  auto row_ptr = buff_ + i * row_size;
1185  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1186  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1187  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1188  slot_ptr[j] = target_init_vals_[j];
1189  }
1190  }
1191  break;
1192  }
1193  default:
1194  CHECK(false);
1195  }
1196 }
1197 
1198 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1200  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1201  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1202  const auto key_count = query_mem_desc_.getGroupbyColCount();
1203  CHECK_EQ(slot_count + key_count, entry.size());
1204  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1205 
1206  for (size_t i = 0; i < key_count; i++) {
1207  const auto key_offset = key_offset_colwise(0, i, 1);
1208  this_buff[key_offset] = entry[i];
1209  }
1210 
1211  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1212  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1213  this_buff[slot_offset] = entry[key_count + i];
1214  }
1215 }
1216 
1217 void ResultSetStorage::initializeColWise() const {
1218  const auto key_count = query_mem_desc_.getGroupbyColCount();
1219  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1221  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1222  const auto first_key_off =
1223  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1224  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1225  this_buff[first_key_off + i] = EMPTY_KEY_64;
1226  }
1227  }
1228  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1229  const auto first_val_off =
1230  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1231  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1232  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1233  }
1234  }
1235 }
1236 
1237 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1238  CHECK(entry_slots);
1239  if (query_mem_desc_.didOutputColumnar()) {
1240  size_t slot_off = 0;
1241  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1242  entry_slots[slot_off] = target_init_vals_[j];
1243  slot_off += query_mem_desc_.getEntryCount();
1244  }
1245  } else {
1246  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1247  entry_slots[j] = target_init_vals_[j];
1248  }
1249  }
1250 }
1251 
1252 #define AGGREGATE_ONE_VALUE( \
1253  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1254  do { \
1255  const auto sql_type = get_compact_type(agg_info__); \
1256  if (sql_type.is_fp()) { \
1257  if (chosen_bytes__ == sizeof(float)) { \
1258  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1259  *reinterpret_cast<const float*>(other_ptr__)); \
1260  } else { \
1261  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1262  *reinterpret_cast<const double*>(other_ptr__)); \
1263  } \
1264  } else { \
1265  if (chosen_bytes__ == sizeof(int32_t)) { \
1266  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1267  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1268  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1269  } else { \
1270  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1271  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1272  agg_##agg_kind__(val_ptr, *other_ptr); \
1273  } \
1274  } \
1275  } while (0)
1276 
1277 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1278  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1279  do { \
1280  if (agg_info__.skip_null_val) { \
1281  const auto sql_type = get_compact_type(agg_info__); \
1282  if (sql_type.is_fp()) { \
1283  if (chosen_bytes__ == sizeof(float)) { \
1284  agg_##agg_kind__##_float_skip_val( \
1285  reinterpret_cast<int32_t*>(val_ptr__), \
1286  *reinterpret_cast<const float*>(other_ptr__), \
1287  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1288  } else { \
1289  agg_##agg_kind__##_double_skip_val( \
1290  reinterpret_cast<int64_t*>(val_ptr__), \
1291  *reinterpret_cast<const double*>(other_ptr__), \
1292  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1293  } \
1294  } else { \
1295  if (chosen_bytes__ == sizeof(int32_t)) { \
1296  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1297  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1298  const auto null_val = static_cast<int32_t>(init_val__); \
1299  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1300  } else { \
1301  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1302  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1303  const auto null_val = static_cast<int64_t>(init_val__); \
1304  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1305  } \
1306  } \
1307  } else { \
1308  AGGREGATE_ONE_VALUE( \
1309  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1310  } \
1311  } while (0)
1312 
1313 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1314  do { \
1315  if (chosen_bytes__ == sizeof(int32_t)) { \
1316  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1317  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1318  agg_sum_int32(val_ptr, *other_ptr); \
1319  } else { \
1320  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1321  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1322  agg_sum(val_ptr, *other_ptr); \
1323  } \
1324  } while (0)
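// COUNT partials are combined by addition, which is why AGGREGATE_ONE_COUNT dispatches
// to agg_sum / agg_sum_int32 rather than to a dedicated count routine.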
1325 
1326 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1327  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1328  { \
1329  if (agg_info__.skip_null_val) { \
1330  const auto sql_type = get_compact_type(agg_info__); \
1331  if (sql_type.is_fp()) { \
1332  if (chosen_bytes__ == sizeof(float)) { \
1333  agg_sum_float_skip_val( \
1334  reinterpret_cast<int32_t*>(val_ptr__), \
1335  *reinterpret_cast<const float*>(other_ptr__), \
1336  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1337  } else { \
1338  agg_sum_double_skip_val( \
1339  reinterpret_cast<int64_t*>(val_ptr__), \
1340  *reinterpret_cast<const double*>(other_ptr__), \
1341  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1342  } \
1343  } else { \
1344  if (chosen_bytes__ == sizeof(int32_t)) { \
1345  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1346  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1347  const auto null_val = static_cast<int32_t>(init_val__); \
1348  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1349  } else { \
1350  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1351  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1352  const auto null_val = static_cast<int64_t>(init_val__); \
1353  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1354  } \
1355  } \
1356  } else { \
1357  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1358  } \
1359  }
1360 
1361 // to be used for 8/16-bit kMIN and kMAX only
1362 #define AGGREGATE_ONE_VALUE_SMALL( \
1363  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1364  do { \
1365  if (chosen_bytes__ == sizeof(int16_t)) { \
1366  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1367  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1368  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1369  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1370  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1371  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1372  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1373  } else { \
1374  UNREACHABLE(); \
1375  } \
1376  } while (0)
1377 
1378 // to be used for 8/16-bit kMIN and kMAX only
1379 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1380  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1381  do { \
1382  if (agg_info__.skip_null_val) { \
1383  if (chosen_bytes__ == sizeof(int16_t)) { \
1384  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1385  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1386  const auto null_val = static_cast<int16_t>(init_val__); \
1387  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1388  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1389  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1390  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1391  const auto null_val = static_cast<int8_t>(init_val__); \
1392  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1393  } \
1394  } else { \
1395  AGGREGATE_ONE_VALUE_SMALL( \
1396  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1397  } \
1398  } while (0)
1399 
1400 int8_t result_set::get_width_for_slot(const size_t target_slot_idx,
1401  const bool float_argument_input,
1402  const QueryMemoryDescriptor& query_mem_desc) {
1403  if (float_argument_input) {
1404  return sizeof(float);
1405  }
1406  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1407 }
1408 
1409 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1410  const TargetInfo& target_info,
1411  const size_t target_slot_idx,
1412  const size_t init_agg_val_idx,
1413  const int8_t* that_ptr1) const {
1414  const bool float_argument_input = takes_float_argument(target_info);
1415  const auto chosen_bytes = result_set::get_width_for_slot(
1416  target_slot_idx, float_argument_input, query_mem_desc_);
1417  auto init_val = target_init_vals_[init_agg_val_idx];
1418 
1419  auto reduce = [&](auto const& size_tag) {
1420  using CastTarget = std::decay_t<decltype(size_tag)>;
1421  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1422  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1423  if (rhs_proj_col == init_val) {
1424  // ignore
1425  } else if (lhs_proj_col == init_val) {
1426  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1427  } else if (lhs_proj_col != rhs_proj_col) {
1428  throw std::runtime_error("Multiple distinct values encountered");
1429  }
1430  };
1431 
1432  switch (chosen_bytes) {
1433  case 1: {
1435  reduce(int8_t());
1436  break;
1437  }
1438  case 2: {
1440  reduce(int16_t());
1441  break;
1442  }
1443  case 4: {
1444  reduce(int32_t());
1445  break;
1446  }
1447  case 8: {
1448  CHECK(!target_info.sql_type.is_varlen());
1449  reduce(int64_t());
1450  break;
1451  }
1452  default:
1453  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1454  }
1455 }
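// For SINGLE_VALUE targets the two sides must agree: a side still holding the init
// value adopts the other side's value, and two different non-init values raise
// "Multiple distinct values encountered".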
1456 
1457 void ResultSetStorage::reduceOneSlot(
1458  int8_t* this_ptr1,
1459  int8_t* this_ptr2,
1460  const int8_t* that_ptr1,
1461  const int8_t* that_ptr2,
1462  const TargetInfo& target_info,
1463  const size_t target_logical_idx,
1464  const size_t target_slot_idx,
1465  const size_t init_agg_val_idx,
1466  const ResultSetStorage& that,
1467  const size_t first_slot_idx_for_target,
1468  const std::vector<std::string>& serialized_varlen_buffer) const {
1469  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1470  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1471  return;
1472  }
1473  }
1474  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1475  const bool float_argument_input = takes_float_argument(target_info);
1476  const auto chosen_bytes = result_set::get_width_for_slot(
1477  target_slot_idx, float_argument_input, query_mem_desc_);
1478  int64_t init_val = target_init_vals_[init_agg_val_idx]; // skip_val for nullable types
1479 
1480  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1481  reduceOneSlotSingleValue(
1482  this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1483  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1484  switch (target_info.agg_kind) {
1485  case kCOUNT:
1486  case kAPPROX_COUNT_DISTINCT: {
1487  if (is_distinct_target(target_info)) {
1488  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1489  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1490  break;
1491  }
1492  CHECK_EQ(int64_t(0), init_val);
1493  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1494  break;
1495  }
1496  case kAVG: {
1497  // Ignore float argument compaction for count component for fear of its overflow
1498  AGGREGATE_ONE_COUNT(this_ptr2,
1499  that_ptr2,
1500  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1501  }
1502  // fall thru
1503  case kSUM: {
1504  AGGREGATE_ONE_NULLABLE_VALUE(
1505  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1506  break;
1507  }
1508  case kMIN: {
1509  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1510  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1511  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1512  } else {
1513  AGGREGATE_ONE_NULLABLE_VALUE(
1514  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1515  }
1516  break;
1517  }
1518  case kMAX: {
1519  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1520  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1521  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1522  } else {
1523  AGGREGATE_ONE_NULLABLE_VALUE(
1524  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1525  }
1526  break;
1527  }
1528  case kAPPROX_QUANTILE:
1529  CHECK_EQ(static_cast<int8_t>(sizeof(int64_t)), chosen_bytes);
1530  reduceOneApproxQuantileSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1531  break;
1532  default:
1533  UNREACHABLE() << toString(target_info.agg_kind);
1534  }
1535  } else {
1536  switch (chosen_bytes) {
1537  case 1: {
1539  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1540  if (rhs_proj_col != init_val) {
1541  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1542  }
1543  break;
1544  }
1545  case 2: {
1547  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1548  if (rhs_proj_col != init_val) {
1549  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1550  }
1551  break;
1552  }
1553  case 4: {
1554  CHECK(target_info.agg_kind != kSAMPLE ||
1555  query_mem_desc_.isLogicalSizedColumnsAllowed());
1556  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1557  if (rhs_proj_col != init_val) {
1558  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1559  }
1560  break;
1561  }
1562  case 8: {
1563  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1564  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1565  !serialized_varlen_buffer.empty()) {
1566  size_t length_to_elems{0};
1567  if (target_info.sql_type.is_geometry()) {
1568  // TODO: Assumes hard-coded sizes for geometry targets
1569  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1570  } else {
1571  const auto& elem_ti = target_info.sql_type.get_elem_type();
1572  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1573  }
1574 
1575  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1576  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1577  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1578  *reinterpret_cast<int64_t*>(this_ptr1) =
1579  reinterpret_cast<const int64_t>(str_ptr);
1580  *reinterpret_cast<int64_t*>(this_ptr2) =
1581  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1582  } else {
1583  if (rhs_proj_col != init_val) {
1584  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1585  }
1586  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1587  CHECK(this_ptr2 && that_ptr2);
1588  *reinterpret_cast<int64_t*>(this_ptr2) =
1589  *reinterpret_cast<const int64_t*>(that_ptr2);
1590  }
1591  }
1592 
1593  break;
1594  }
1595  default:
1596  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1597  }
1598  }
1599 }
1600 
1601 void ResultSetStorage::reduceOneApproxQuantileSlot(int8_t* this_ptr1,
1602  const int8_t* that_ptr1,
1603  const size_t target_logical_idx,
1604  const ResultSetStorage& that) const {
1606  static_assert(sizeof(int64_t) == sizeof(quantile::TDigest*));
1607  auto* incoming = *reinterpret_cast<quantile::TDigest* const*>(that_ptr1);
1608  CHECK(incoming) << "this_ptr1=" << (void*)this_ptr1
1609  << ", that_ptr1=" << (void const*)that_ptr1
1610  << ", target_logical_idx=" << target_logical_idx;
1611  if (incoming->centroids().capacity()) {
1612  auto* accumulator = *reinterpret_cast<quantile::TDigest**>(this_ptr1);
1613  CHECK(accumulator) << "this_ptr1=" << (void*)this_ptr1
1614  << ", that_ptr1=" << (void const*)that_ptr1
1615  << ", target_logical_idx=" << target_logical_idx;
1616  accumulator->allocate();
1617  accumulator->mergeTDigest(*incoming);
1618  }
1619 }
1620 
1621 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1622  const int8_t* that_ptr1,
1623  const size_t target_logical_idx,
1624  const ResultSetStorage& that) const {
1626  const auto& old_count_distinct_desc =
1627  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1628  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1629  const auto& new_count_distinct_desc =
1630  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1631  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1632  CHECK(this_ptr1 && that_ptr1);
1633  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1634  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1635  count_distinct_set_union(
1636  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1637 }
1638 
1639 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1640  const int8_t warp_count,
1641  const bool is_columnar,
1642  const bool replace_bitmap_ptr_with_bitmap_sz,
1643  std::vector<int64_t>& agg_vals,
1644  const QueryMemoryDescriptor& query_mem_desc,
1645  const std::vector<TargetInfo>& targets,
1646  const std::vector<int64_t>& agg_init_vals) {
1647  const size_t agg_col_count{agg_vals.size()};
1648  const auto row_size = query_mem_desc.getRowSize();
1649  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1650  CHECK_GE(agg_col_count, targets.size());
1651  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1652  CHECK(query_mem_desc.hasKeylessHash());
1653  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1654  bool discard_row = true;
1655  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1656  bool discard_partial_result = true;
1657  for (size_t target_idx = 0, agg_col_idx = 0;
1658  target_idx < targets.size() && agg_col_idx < agg_col_count;
1659  ++target_idx, ++agg_col_idx) {
1660  const auto& agg_info = targets[target_idx];
1661  const bool float_argument_input = takes_float_argument(agg_info);
1662  const auto chosen_bytes = float_argument_input
1663  ? sizeof(float)
1664  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1665  auto partial_bin_val = get_component(
1666  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1667  partial_agg_vals[agg_col_idx] = partial_bin_val;
1668  if (is_distinct_target(agg_info)) {
1669  CHECK_EQ(int8_t(1), warp_count);
1670  CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
1671  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1672  partial_bin_val = count_distinct_set_size(
1673  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1674  if (replace_bitmap_ptr_with_bitmap_sz) {
1675  partial_agg_vals[agg_col_idx] = partial_bin_val;
1676  }
1677  }
1678  if (kAVG == agg_info.agg_kind) {
1679  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1680  ++agg_col_idx;
1681  partial_bin_val = partial_agg_vals[agg_col_idx] =
1682  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1683  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1684  }
1685  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1686  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1687  CHECK(agg_info.is_agg);
1688  discard_partial_result = false;
1689  }
1690  }
1691  row_ptr += row_size;
1692  if (discard_partial_result) {
1693  continue;
1694  }
1695  discard_row = false;
1696  for (size_t target_idx = 0, agg_col_idx = 0;
1697  target_idx < targets.size() && agg_col_idx < agg_col_count;
1698  ++target_idx, ++agg_col_idx) {
1699  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1700  const auto& agg_info = targets[target_idx];
1701  const bool float_argument_input = takes_float_argument(agg_info);
1702  const auto chosen_bytes = float_argument_input
1703  ? sizeof(float)
1704  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1705  const auto& chosen_type = get_compact_type(agg_info);
1706  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1707  try {
1708  switch (agg_info.agg_kind) {
1709  case kCOUNT:
1710  case kAPPROX_COUNT_DISTINCT:
1711  AGGREGATE_ONE_NULLABLE_COUNT(
1712  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1713  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1714  agg_init_vals[agg_col_idx],
1715  chosen_bytes,
1716  agg_info);
1717  break;
1718  case kAVG:
1719  // Ignore float argument compaction for count component for fear of its
1720  // overflow
1721  AGGREGATE_ONE_COUNT(
1722  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1723  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1724  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1725  // fall thru
1726  case kSUM:
1727  AGGREGATE_ONE_NULLABLE_VALUE(
1728  sum,
1729  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1730  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1731  agg_init_vals[agg_col_idx],
1732  chosen_bytes,
1733  agg_info);
1734  break;
1735  case kMIN:
1736  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1737  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1738  min,
1739  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1740  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1741  agg_init_vals[agg_col_idx],
1742  chosen_bytes,
1743  agg_info);
1744  } else {
1745  AGGREGATE_ONE_NULLABLE_VALUE(
1746  min,
1747  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1748  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1749  agg_init_vals[agg_col_idx],
1750  chosen_bytes,
1751  agg_info);
1752  }
1753  break;
1754  case kMAX:
1755  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1756  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1757  max,
1758  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1759  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1760  agg_init_vals[agg_col_idx],
1761  chosen_bytes,
1762  agg_info);
1763  } else {
1764  AGGREGATE_ONE_NULLABLE_VALUE(
1765  max,
1766  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1767  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1768  agg_init_vals[agg_col_idx],
1769  chosen_bytes,
1770  agg_info);
1771  }
1772  break;
1773  default:
1774  CHECK(false);
1775  break;
1776  }
1777  } catch (std::runtime_error& e) {
1778  // TODO(miyu): handle the case where chosen_bytes < 8
1779  LOG(ERROR) << e.what();
1780  }
1781  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1782  switch (chosen_bytes) {
1783  case 8:
1784  break;
1785  case 4: {
1786  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1787  if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
1788  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1789  }
1790  break;
1791  }
1792  default:
1793  CHECK(false);
1794  }
1795  }
1796  if (kAVG == agg_info.agg_kind) {
1797  ++agg_col_idx;
1798  }
1799  } else {
1800  if (agg_info.agg_kind == kSAMPLE) {
1801  CHECK(!agg_info.sql_type.is_varlen())
1802  << "Interleaved bins reduction not supported for variable length "
1803  "arguments "
1804  "to SAMPLE";
1805  }
1806  if (agg_vals[agg_col_idx]) {
1807  if (agg_info.agg_kind == kSAMPLE) {
1808  continue;
1809  }
1810  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1811  } else {
1812  agg_vals[agg_col_idx] = partial_bin_val;
1813  }
1814  }
1815  }
1816  }
1817  return discard_row;
1818 }
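The AGGREGATE_ONE_NULLABLE_VALUE, AGGREGATE_ONE_NULLABLE_VALUE_SMALL and AGGREGATE_ONE_NULLABLE_COUNT macros used by reduceSingleRow dispatch on chosen_bytes and treat the slot's init value as the null sentinel, so an operand still holding it contributes nothing to the reduction. A simplified sketch of the null-aware min reduction for a single 8-byte slot (hypothetical helper, not the macro definitions themselves):

#include <algorithm>
#include <cstdint>

// Null-aware min over one 8-byte slot: init_val doubles as the null sentinel.
inline void reduce_min_nullable(int64_t& dst, const int64_t src, const int64_t init_val) {
  if (src == init_val) {
    return;  // incoming slot never saw a value
  }
  if (dst == init_val) {
    dst = src;  // first real value replaces the sentinel
  } else {
    dst = std::min(dst, src);
  }
}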