OmniSciDB  17c254d2f8
ResultSetReduction.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "DynamicWatchdog.h"
26 #include "Execute.h"
27 #include "ResultSet.h"
29 #include "ResultSetReductionJIT.h"
30 #include "RuntimeFunctions.h"
31 #include "Shared/SqlTypesLayout.h"
32 
33 #include "Shared/likely.h"
34 #include "Shared/thread_count.h"
35 
36 #include <llvm/ExecutionEngine/GenericValue.h>
37 
38 #include <algorithm>
39 #include <future>
40 #include <numeric>
41 
42 extern bool g_enable_dynamic_watchdog;
43 
44 namespace {
45 
46 bool use_multithreaded_reduction(const size_t entry_count) {
47  return entry_count > 100000;
48 }
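// Heuristic used throughout this file: buffers with more than 100,000 entries are
// reduced by several worker threads, smaller ones on the calling thread. The entry
// range is split into cpu_threads() contiguous chunks, e.g. (illustrative numbers
// only) 1,000,000 entries on 8 threads yields chunks of 125,000 entries each.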
49 
50 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
51  const auto row_bytes = get_row_bytes(query_mem_desc);
52  CHECK_EQ(size_t(0), row_bytes % 8);
53  return row_bytes / 8;
54 }
55 
56 std::vector<int64_t> make_key(const int64_t* buff,
57  const size_t entry_count,
58  const size_t key_count) {
59  std::vector<int64_t> key;
60  size_t off = 0;
61  for (size_t i = 0; i < key_count; ++i) {
62  key.push_back(buff[off]);
63  off += entry_count;
64  }
65  return key;
66 }
67 
68 void fill_slots(int64_t* dst_entry,
69  const size_t dst_entry_count,
70  const int64_t* src_buff,
71  const size_t src_entry_idx,
72  const size_t src_entry_count,
73  const QueryMemoryDescriptor& query_mem_desc) {
74  const auto slot_count = query_mem_desc.getBufferColSlotCount();
75  const auto key_count = query_mem_desc.getGroupbyColCount();
76  if (query_mem_desc.didOutputColumnar()) {
77  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
78  ++i, dst_slot_off += dst_entry_count) {
79  dst_entry[dst_slot_off] =
80  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
81  }
82  } else {
83  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
84  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
85  for (size_t i = 0; i < slot_count; ++i) {
86  dst_entry[i] = row_ptr[slot_off_quad + i];
87  }
88  }
89 }
90 
92 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
93  for (size_t i = 0; i < key_count; ++i) {
94  key_ptr_i32[i] = EMPTY_KEY_32;
95  }
96 }
97 
99 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
100  for (size_t i = 0; i < key_count; ++i) {
101  key_ptr_i64[i] = EMPTY_KEY_64;
102  }
103 }
104 
105 inline int64_t get_component(const int8_t* group_by_buffer,
106  const size_t comp_sz,
107  const size_t index = 0) {
108  int64_t ret = std::numeric_limits<int64_t>::min();
109  switch (comp_sz) {
110  case 1: {
111  ret = group_by_buffer[index];
112  break;
113  }
114  case 2: {
115  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
116  ret = buffer_ptr[index];
117  break;
118  }
119  case 4: {
120  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
121  ret = buffer_ptr[index];
122  break;
123  }
124  case 8: {
125  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
126  ret = buffer_ptr[index];
127  break;
128  }
129  default:
130  CHECK(false);
131  }
132  return ret;
133 }
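// get_component() reads a 1/2/4/8-byte slot and sign-extends it to int64_t, so e.g.
// a 2-byte slot holding 0xFFFF comes back as -1 rather than 65535.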
134 
135 void run_reduction_code(const ReductionCode& reduction_code,
136  int8_t* this_buff,
137  const int8_t* that_buff,
138  const int32_t start_entry_index,
139  const int32_t end_entry_index,
140  const int32_t that_entry_count,
141  const void* this_qmd,
142  const void* that_qmd,
143  const void* serialized_varlen_buffer) {
144  int err = 0;
145  if (reduction_code.func_ptr) {
146  err = reduction_code.func_ptr(this_buff,
147  that_buff,
148  start_entry_index,
149  end_entry_index,
150  that_entry_count,
151  this_qmd,
152  that_qmd,
153  serialized_varlen_buffer);
154  } else {
155  auto ret = ReductionInterpreter::run(
156  reduction_code.ir_reduce_loop.get(),
157  {ReductionInterpreter::EvalValue{.ptr = this_buff},
158  ReductionInterpreter::EvalValue{.ptr = that_buff},
159  ReductionInterpreter::EvalValue{.int_val = start_entry_index},
160  ReductionInterpreter::EvalValue{.int_val = end_entry_index},
161  ReductionInterpreter::EvalValue{.int_val = that_entry_count},
162  ReductionInterpreter::EvalValue{.ptr = this_qmd},
163  ReductionInterpreter::EvalValue{.ptr = that_qmd},
164  ReductionInterpreter::EvalValue{.ptr = serialized_varlen_buffer}});
165  err = ret.int_val;
166  }
167  if (err) {
168  if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
169  throw std::runtime_error("Multiple distinct values encountered");
170  }
171 
172  throw std::runtime_error(
173  "Query execution has exceeded the time limit or was interrupted during result "
174  "set reduction");
175  }
176 }
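// run_reduction_code() has two execution paths over the same reduction IR: the
// JIT-compiled entry point (reduction_code.func_ptr) when native code generation
// succeeded, and the ReductionInterpreter fallback otherwise. A non-zero return
// value signals either a watchdog timeout/interrupt or that a SINGLE_VALUE target
// saw more than one distinct value.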
177 
178 } // namespace
179 
180 void fill_empty_key(void* key_ptr, const size_t key_count, const size_t key_width) {
181  switch (key_width) {
182  case 4: {
183  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
184  fill_empty_key_32(key_ptr_i32, key_count);
185  break;
186  }
187  case 8: {
188  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
189  fill_empty_key_64(key_ptr_i64, key_count);
190  break;
191  }
192  default:
193  CHECK(false);
194  }
195 }
196 
197 // Driver method for the various buffer layouts; the actual work is done by the reduceOne* methods.
198 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
199 void ResultSetStorage::reduce(const ResultSetStorage& that,
200  const std::vector<std::string>& serialized_varlen_buffer,
201  const ReductionCode& reduction_code) const {
202  auto entry_count = query_mem_desc_.getEntryCount();
203  CHECK_GT(entry_count, size_t(0));
204  if (query_mem_desc_.didOutputColumnar()) {
205  CHECK(query_mem_desc_.getQueryDescriptionType() ==
206  QueryDescriptionType::GroupByPerfectHash ||
207  query_mem_desc_.getQueryDescriptionType() ==
208  QueryDescriptionType::GroupByBaselineHash ||
209  query_mem_desc_.getQueryDescriptionType() ==
210  QueryDescriptionType::Projection);
211  }
212  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
213  switch (query_mem_desc_.getQueryDescriptionType()) {
214  case QueryDescriptionType::GroupByBaselineHash:
215  CHECK_GE(entry_count, that_entry_count);
216  break;
217  default:
218  CHECK_EQ(entry_count, that_entry_count);
219  }
220  auto this_buff = buff_;
221  CHECK(this_buff);
222  auto that_buff = that.buff_;
223  CHECK(that_buff);
224  if (query_mem_desc_.getQueryDescriptionType() ==
225  QueryDescriptionType::GroupByBaselineHash) {
226  if (!serialized_varlen_buffer.empty()) {
227  throw std::runtime_error(
228  "Projection of variable length targets with baseline hash group by is not yet "
229  "supported in Distributed mode");
230  }
231  if (use_multithreaded_reduction(that_entry_count)) {
232  const size_t thread_count = cpu_threads();
233  std::vector<std::future<void>> reduction_threads;
234  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
235  const auto thread_entry_count =
236  (that_entry_count + thread_count - 1) / thread_count;
237  const auto start_index = thread_idx * thread_entry_count;
238  const auto end_index =
239  std::min(start_index + thread_entry_count, that_entry_count);
240  reduction_threads.emplace_back(std::async(
241  std::launch::async,
242  [this,
243  this_buff,
244  that_buff,
245  start_index,
246  end_index,
247  that_entry_count,
248  &reduction_code,
249  &that] {
250  if (reduction_code.ir_reduce_loop) {
251  run_reduction_code(reduction_code,
252  this_buff,
253  that_buff,
254  start_index,
255  end_index,
256  that_entry_count,
257  &query_mem_desc_,
258  &that.query_mem_desc_,
259  nullptr);
260  } else {
261  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
262  reduceOneEntryBaseline(
263  this_buff, that_buff, entry_idx, that_entry_count, that);
264  }
265  }
266  }));
267  }
268  for (auto& reduction_thread : reduction_threads) {
269  reduction_thread.wait();
270  }
271  for (auto& reduction_thread : reduction_threads) {
272  reduction_thread.get();
273  }
274  } else {
275  if (reduction_code.ir_reduce_loop) {
276  run_reduction_code(reduction_code,
277  this_buff,
278  that_buff,
279  0,
280  that_entry_count,
281  that_entry_count,
282  &query_mem_desc_,
283  &that.query_mem_desc_,
284  nullptr);
285  } else {
286  for (size_t i = 0; i < that_entry_count; ++i) {
287  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
288  }
289  }
290  }
291  return;
292  }
293  if (use_multithreaded_reduction(entry_count)) {
294  const size_t thread_count = cpu_threads();
295  std::vector<std::future<void>> reduction_threads;
296  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
297  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
298  const auto start_index = thread_idx * thread_entry_count;
299  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
300  if (query_mem_desc_.didOutputColumnar()) {
301  reduction_threads.emplace_back(std::async(std::launch::async,
302  [this,
303  this_buff,
304  that_buff,
305  start_index,
306  end_index,
307  &that,
308  &serialized_varlen_buffer] {
309  reduceEntriesNoCollisionsColWise(
310  this_buff,
311  that_buff,
312  that,
313  start_index,
314  end_index,
315  serialized_varlen_buffer);
316  }));
317  } else {
318  reduction_threads.emplace_back(std::async(std::launch::async,
319  [this,
320  this_buff,
321  that_buff,
322  start_index,
323  end_index,
324  that_entry_count,
325  &reduction_code,
326  &that,
327  &serialized_varlen_buffer] {
328  CHECK(reduction_code.ir_reduce_loop);
329  run_reduction_code(
330  reduction_code,
331  this_buff,
332  that_buff,
333  start_index,
334  end_index,
335  that_entry_count,
336  &query_mem_desc_,
337  &that.query_mem_desc_,
338  &serialized_varlen_buffer);
339  }));
340  }
341  }
342  for (auto& reduction_thread : reduction_threads) {
343  reduction_thread.wait();
344  }
345  for (auto& reduction_thread : reduction_threads) {
346  reduction_thread.get();
347  }
348  } else {
349  if (query_mem_desc_.didOutputColumnar()) {
350  reduceEntriesNoCollisionsColWise(this_buff,
351  that_buff,
352  that,
353  0,
354  query_mem_desc_.getEntryCount(),
355  serialized_varlen_buffer);
356  } else {
357  CHECK(reduction_code.ir_reduce_loop);
358  run_reduction_code(reduction_code,
359  this_buff,
360  that_buff,
361  0,
362  entry_count,
363  that_entry_count,
364  &query_mem_desc_,
365  &that.query_mem_desc_,
366  &serialized_varlen_buffer);
367  }
368  }
369 }
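// Work partitioning used by the multithreaded paths above, with illustrative
// numbers: for entry_count = 10 and thread_count = 4 the per-thread chunk is
// (10 + 4 - 1) / 4 = 3, giving the ranges [0, 3), [3, 6), [6, 9) and [9, 10);
// a thread whose start index falls past entry_count simply does no work.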
370 
371 namespace {
372 
373 ALWAYS_INLINE void check_watchdog(const size_t sample_seed) {
374  if (UNLIKELY(g_enable_dynamic_watchdog && (sample_seed & 0x3F) == 0 &&
375  dynamic_watchdog())) {
376  // TODO(alex): distinguish between the deadline and interrupt
377  throw std::runtime_error(
378  "Query execution has exceeded the time limit or was interrupted during result "
379  "set reduction");
380  }
381 }
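// The dynamic watchdog is only consulted on every 64th entry
// ((sample_seed & 0x3F) == 0) to keep its overhead off the hot reduction loop.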
382 
383 } // namespace
384 
385 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
386  int8_t* this_buff,
387  const int8_t* that_buff,
388  const ResultSetStorage& that,
389  const size_t start_index,
390  const size_t end_index,
391  const std::vector<std::string>& serialized_varlen_buffer) const {
392  // TODO(adb / saman): Support column wise output when serializing distributed agg
393  // functions
394  CHECK(serialized_varlen_buffer.empty());
395 
396  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
397 
398  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
399  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
400  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
401  const auto& agg_info = targets_[target_idx];
402  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
403 
404  bool two_slot_target{false};
405  if (agg_info.is_agg &&
406  (agg_info.agg_kind == kAVG ||
407  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
408  // Note that this assumes if one of the slot pairs in a given target is an array,
409  // all slot pairs are arrays. Currently this is true for all geo targets, but we
410  // should better codify and store this information in the future
411  two_slot_target = true;
412  }
413 
414  for (size_t target_slot_idx = slots_for_col.front();
415  target_slot_idx < slots_for_col.back() + 1;
416  target_slot_idx += 2) {
417  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
418  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
419  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
420  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
421 
422  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
423  check_watchdog(entry_idx);
424  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
425  continue;
426  }
427  if (LIKELY(target_idx == 0 && target_slot_idx == slots_for_col.front())) {
428  // copy the key from right hand side
429  copyKeyColWise(entry_idx, this_buff, that_buff);
430  }
431  auto this_ptr1 =
432  this_crt_col_ptr +
433  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
434  auto that_ptr1 =
435  that_crt_col_ptr +
436  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
437  int8_t* this_ptr2{nullptr};
438  const int8_t* that_ptr2{nullptr};
439  if (UNLIKELY(two_slot_target)) {
440  this_ptr2 =
441  this_next_col_ptr +
442  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
443  that_ptr2 =
444  that_next_col_ptr +
445  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
446  }
447  reduceOneSlot(this_ptr1,
448  this_ptr2,
449  that_ptr1,
450  that_ptr2,
451  agg_info,
452  target_idx,
453  target_slot_idx,
454  target_slot_idx,
455  that,
456  slots_for_col.front(),
457  serialized_varlen_buffer);
458  }
459 
460  this_crt_col_ptr = this_next_col_ptr;
461  that_crt_col_ptr = that_next_col_ptr;
462  if (UNLIKELY(two_slot_target)) {
463  this_crt_col_ptr = advance_to_next_columnar_target_buff(
464  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
465  that_crt_col_ptr = advance_to_next_columnar_target_buff(
466  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
467  }
468  }
469  }
470 }
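// Memory layout assumed by the columnar ("no collisions") reduction above, roughly:
//
//   [key col 0][key col 1]...[slot 0 col][slot 1 col]...
//
// i.e. the group-by key columns are prepended, followed by one contiguous column per
// target slot. entry_idx indexes each column independently, which is why keys and
// slots are copied with per-column strides rather than per-row offsets.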
471 
472 /*
473  * copy all keys from the columnar prepended group buffer of "that_buff" into
474  * "this_buff"
475  */
476 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
477  int8_t* this_buff,
478  const int8_t* that_buff) const {
480  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
481  group_idx++) {
482  // if the column corresponds to a group key
483  const auto column_offset_bytes =
484  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
485  auto lhs_key_ptr = this_buff + column_offset_bytes;
486  auto rhs_key_ptr = that_buff + column_offset_bytes;
487  switch (query_mem_desc_.groupColWidth(group_idx)) {
488  case 8:
489  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
490  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
491  break;
492  case 4:
493  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
494  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
495  break;
496  case 2:
497  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
498  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
499  break;
500  case 1:
501  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
502  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
503  break;
504  default:
505  CHECK(false);
506  break;
507  }
508  }
509 }
510 
511 // Rewrites the entries of this ResultSetStorage object to point directly into the
512 // serialized_varlen_buffer rather than using offsets.
513 void ResultSetStorage::rewriteAggregateBufferOffsets(
514  const std::vector<std::string>& serialized_varlen_buffer) const {
515  if (serialized_varlen_buffer.empty()) {
516  return;
517  }
518 
520  auto entry_count = query_mem_desc_.getEntryCount();
521  CHECK_GT(entry_count, size_t(0));
522  CHECK(buff_);
523 
524  // Row-wise iteration, consider moving to separate function
525  for (size_t i = 0; i < entry_count; ++i) {
526  if (isEmptyEntry(i, buff_)) {
527  continue;
528  }
529  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
530  const auto key_bytes_with_padding = align_to_int64(key_bytes);
531  auto rowwise_targets_ptr =
532  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
533  size_t target_slot_idx = 0;
534  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
535  ++target_logical_idx) {
536  const auto& target_info = targets_[target_logical_idx];
537  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
538  CHECK(target_info.agg_kind == kSAMPLE);
539  auto ptr1 = rowwise_targets_ptr;
540  auto slot_idx = target_slot_idx;
541  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
542  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
543 
544  const auto& elem_ti = target_info.sql_type.get_elem_type();
545  size_t length_to_elems =
546  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
547  ? 1
548  : elem_ti.get_size();
549  if (target_info.sql_type.is_geometry()) {
550  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
551  if (j > 0) {
552  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
553  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
554  slot_idx += 2;
555  length_to_elems = 4;
556  }
557  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
558  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
559  const auto str_ptr =
560  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
561  CHECK(ptr1);
562  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
563  CHECK(ptr2);
564  *reinterpret_cast<int64_t*>(ptr2) =
565  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
566  }
567  } else {
568  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
569  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
570  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
571  CHECK(ptr1);
572  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
573  CHECK(ptr2);
574  *reinterpret_cast<int64_t*>(ptr2) =
575  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
576  }
577  }
578 
579  rowwise_targets_ptr = advance_target_ptr_row_wise(
580  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
581  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
582  }
583  }
584 
585  return;
586 }
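// After rewriteAggregateBufferOffsets(), a varlen SAMPLE target that previously held
// an index into serialized_varlen_buffer holds a (pointer, element count) pair
// instead: the first slot points at the string's bytes and the second stores
// size / length_to_elems, so the serialized buffer has to outlive any reads of
// these slots.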
587 
588 namespace {
589 
590 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
591  const uint32_t h,
592  const int64_t* key,
593  const uint32_t key_qw_count,
594  const size_t entry_count) {
595  auto off = h;
596  const auto old_key =
597  __sync_val_compare_and_swap(&groups_buffer[off], EMPTY_KEY_64, *key);
598  if (old_key == EMPTY_KEY_64) {
599  for (size_t i = 0; i < key_qw_count; ++i) {
600  groups_buffer[off] = key[i];
601  off += entry_count;
602  }
603  return {&groups_buffer[off], true};
604  }
605  off = h;
606  for (size_t i = 0; i < key_qw_count; ++i) {
607  if (groups_buffer[off] != key[i]) {
608  return {nullptr, true};
609  }
610  off += entry_count;
611  }
612  return {&groups_buffer[off], false};
613 }
614 
615 // TODO(alex): fix synchronization when we enable it
616 GroupValueInfo get_group_value_columnar_reduction(
617  int64_t* groups_buffer,
618  const uint32_t groups_buffer_entry_count,
619  const int64_t* key,
620  const uint32_t key_qw_count) {
621  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
622  auto matching_gvi = get_matching_group_value_columnar_reduction(
623  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
624  if (matching_gvi.first) {
625  return matching_gvi;
626  }
627  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
628  while (h_probe != h) {
629  matching_gvi = get_matching_group_value_columnar_reduction(
630  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
631  if (matching_gvi.first) {
632  return matching_gvi;
633  }
634  h_probe = (h_probe + 1) % groups_buffer_entry_count;
635  }
636  return {nullptr, true};
637 }
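// Group lookup during reduction is plain open addressing: hash the key, then probe
// consecutive slots (wrapping around the buffer) until a matching or empty entry is
// found. {nullptr, true} is only returned after the whole buffer has been probed,
// i.e. when the output buffer is full.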
638 
639 #define cas_cst(ptr, expected, desired) \
640  __atomic_compare_exchange_n( \
641  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
642 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
643 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
644 
645 template <typename T = int64_t>
646 GroupValueInfo get_matching_group_value_reduction(
647  int64_t* groups_buffer,
648  const uint32_t h,
649  const T* key,
650  const uint32_t key_count,
651  const QueryMemoryDescriptor& query_mem_desc,
652  const int64_t* that_buff_i64,
653  const size_t that_entry_idx,
654  const size_t that_entry_count,
655  const uint32_t row_size_quad) {
656  auto off = h * row_size_quad;
657  T empty_key = get_empty_key<T>();
658  T write_pending = get_empty_key<T>() - 1;
659  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
660  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
661  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
662  if (success) {
663  fill_slots(groups_buffer + off + slot_off_quad,
664  query_mem_desc.getEntryCount(),
665  that_buff_i64,
666  that_entry_idx,
667  that_entry_count,
668  query_mem_desc);
669  if (key_count > 1) {
670  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
671  }
672  store_cst(row_ptr, *key);
673  return {groups_buffer + off + slot_off_quad, true};
674  }
675  while (load_cst(row_ptr) == write_pending) {
676  // spin until the winning thread has finished writing the entire key and the init
677  // value
678  }
679  for (size_t i = 0; i < key_count; ++i) {
680  if (load_cst(row_ptr + i) != key[i]) {
681  return {nullptr, true};
682  }
683  }
684  return {groups_buffer + off + slot_off_quad, false};
685 }
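// Concurrent slot claiming above: the first key component is CAS'd from empty_key to
// a write-pending sentinel (empty_key - 1); the winning thread fills the value slots
// and the remaining key components and only then publishes the real key with a
// sequentially consistent store. Losing threads spin until the sentinel disappears
// and then compare the full key to decide between "same group" and "keep probing".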
686 
687 #undef load_cst
688 #undef store_cst
689 #undef cas_cst
690 
691 GroupValueInfo get_matching_group_value_reduction(
692  int64_t* groups_buffer,
693  const uint32_t h,
694  const int64_t* key,
695  const uint32_t key_count,
696  const size_t key_width,
697  const QueryMemoryDescriptor& query_mem_desc,
698  const int64_t* that_buff_i64,
699  const size_t that_entry_idx,
700  const size_t that_entry_count,
701  const uint32_t row_size_quad) {
702  switch (key_width) {
703  case 4:
704  return get_matching_group_value_reduction(groups_buffer,
705  h,
706  reinterpret_cast<const int32_t*>(key),
707  key_count,
708  query_mem_desc,
709  that_buff_i64,
710  that_entry_idx,
711  that_entry_count,
712  row_size_quad);
713  case 8:
714  return get_matching_group_value_reduction(groups_buffer,
715  h,
716  key,
717  key_count,
718  query_mem_desc,
719  that_buff_i64,
720  that_entry_idx,
721  that_entry_count,
722  row_size_quad);
723  default:
724  CHECK(false);
725  return {nullptr, true};
726  }
727 }
728 
729 } // namespace
730 
731 GroupValueInfo get_group_value_reduction(int64_t* groups_buffer,
732  const uint32_t groups_buffer_entry_count,
733  const int64_t* key,
734  const uint32_t key_count,
735  const size_t key_width,
736  const QueryMemoryDescriptor& query_mem_desc,
737  const int64_t* that_buff_i64,
738  const size_t that_entry_idx,
739  const size_t that_entry_count,
740  const uint32_t row_size_quad) {
741  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
742  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
743  h,
744  key,
745  key_count,
746  key_width,
747  query_mem_desc,
748  that_buff_i64,
749  that_entry_idx,
750  that_entry_count,
751  row_size_quad);
752  if (matching_gvi.first) {
753  return matching_gvi;
754  }
755  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
756  while (h_probe != h) {
757  matching_gvi = get_matching_group_value_reduction(groups_buffer,
758  h_probe,
759  key,
760  key_count,
761  key_width,
762  query_mem_desc,
763  that_buff_i64,
764  that_entry_idx,
765  that_entry_count,
766  row_size_quad);
767  if (matching_gvi.first) {
768  return matching_gvi;
769  }
770  h_probe = (h_probe + 1) % groups_buffer_entry_count;
771  }
772  return {nullptr, true};
773 }
774 
775 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
776 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
777 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
778  const int8_t* that_buff,
779  const size_t that_entry_idx,
780  const size_t that_entry_count,
781  const ResultSetStorage& that) const {
782  check_watchdog(that_entry_idx);
783  const auto key_count = query_mem_desc_.getGroupbyColCount();
788  const auto key_off =
789  key_offset_colwise(that_entry_idx, 0, that_entry_count);
790  if (isEmptyEntry(that_entry_idx, that_buff)) {
791  return;
792  }
793  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
794  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
795  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
796  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
797  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
798  CHECK(this_entry_slots);
799  if (empty_entry) {
800  fill_slots(this_entry_slots,
801  query_mem_desc_.getEntryCount(),
802  that_buff_i64,
803  that_entry_idx,
804  that_entry_count,
805  query_mem_desc_);
806  return;
807  }
808  reduceOneEntrySlotsBaseline(
809  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
810 }
811 
812 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
813  const int64_t* that_buff,
814  const size_t that_entry_idx,
815  const size_t that_entry_count,
816  const ResultSetStorage& that) const {
818  const auto key_count = query_mem_desc_.getGroupbyColCount();
819  size_t j = 0;
820  size_t init_agg_val_idx = 0;
821  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
822  ++target_logical_idx) {
823  const auto& target_info = targets_[target_logical_idx];
824  const auto that_slot_off = slot_offset_colwise(
825  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
826  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
827  reduceOneSlotBaseline(this_entry_slots,
828  this_slot_off,
829  that_buff,
830  that_entry_count,
831  that_slot_off,
832  target_info,
833  target_logical_idx,
834  j,
835  init_agg_val_idx,
836  that);
837  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
838  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
839  } else {
840  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
841  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
842  }
843  }
844  j = advance_slot(j, target_info, false);
845  }
846 }
847 
848 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
849  const size_t this_slot,
850  const int64_t* that_buff,
851  const size_t that_entry_count,
852  const size_t that_slot,
853  const TargetInfo& target_info,
854  const size_t target_logical_idx,
855  const size_t target_slot_idx,
856  const size_t init_agg_val_idx,
857  const ResultSetStorage& that) const {
859  int8_t* this_ptr2{nullptr};
860  const int8_t* that_ptr2{nullptr};
861  if (target_info.is_agg &&
862  (target_info.agg_kind == kAVG ||
863  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
864  const auto this_count_off = query_mem_desc_.getEntryCount();
865  const auto that_count_off = that_entry_count;
866  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
867  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
868  }
869  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
870  this_ptr2,
871  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
872  that_ptr2,
873  target_info,
874  target_logical_idx,
875  target_slot_idx,
876  init_agg_val_idx,
877  that,
878  target_slot_idx, // dummy, for now
879  {});
880 }
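// AVG and varlen SAMPLE targets occupy two slots. In the columnar baseline layout
// the companion count/length slot lives exactly one entry-count stride further into
// the buffer, which is what the this_count_off / that_count_off offsets above
// compute.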
881 
882 // During the reduction of two result sets using the baseline strategy, we first create a
883 // buffer big enough to hold the entries of both and move the entries of the first result
884 // into it; the reduction then proceeds as usual, with the new buffer acting as the first buffer.
885 template <class KeyType>
886 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
887  const size_t new_entry_count) const {
889  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
890  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
891  const auto key_count = query_mem_desc_.getGroupbyColCount();
894  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
895  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
896  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
897 
898  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
899  const size_t thread_count = cpu_threads();
900  std::vector<std::future<void>> move_threads;
901 
902  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
903  const auto thread_entry_count =
904  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
905  const auto start_index = thread_idx * thread_entry_count;
906  const auto end_index =
907  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
908  move_threads.emplace_back(std::async(
909  std::launch::async,
910  [this,
911  src_buff,
912  new_buff_i64,
913  new_entry_count,
914  start_index,
915  end_index,
916  key_count,
917  row_qw_count,
918  key_byte_width] {
919  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
920  moveOneEntryToBuffer<KeyType>(entry_idx,
921  new_buff_i64,
922  new_entry_count,
923  key_count,
924  row_qw_count,
925  src_buff,
926  key_byte_width);
927  }
928  }));
929  }
930  for (auto& move_thread : move_threads) {
931  move_thread.wait();
932  }
933  for (auto& move_thread : move_threads) {
934  move_thread.get();
935  }
936  } else {
937  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
938  moveOneEntryToBuffer<KeyType>(entry_idx,
939  new_buff_i64,
940  new_entry_count,
941  key_count,
942  row_qw_count,
943  src_buff,
944  key_byte_width);
945  }
946  }
947 }
948 
949 template <class KeyType>
950 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
951  int64_t* new_buff_i64,
952  const size_t new_entry_count,
953  const size_t key_count,
954  const size_t row_qw_count,
955  const int64_t* src_buff,
956  const size_t key_byte_width) const {
957  const auto key_off =
958  query_mem_desc_.didOutputColumnar()
959  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
960  : row_qw_count * entry_index;
961  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
962  if (*key_ptr == get_empty_key<KeyType>()) {
963  return;
964  }
965  int64_t* new_entries_ptr{nullptr};
966  if (query_mem_desc_.didOutputColumnar()) {
967  const auto key =
968  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
969  new_entries_ptr =
970  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
971  } else {
972  new_entries_ptr = get_group_value(new_buff_i64,
973  new_entry_count,
974  &src_buff[key_off],
975  key_count,
976  key_byte_width,
977  row_qw_count,
978  nullptr);
979  }
980  CHECK(new_entries_ptr);
981  fill_slots(new_entries_ptr,
982  new_entry_count,
983  src_buff,
984  entry_index,
985  query_mem_desc_.getEntryCount(),
986  query_mem_desc_);
987 }
988 
989 void ResultSet::initializeStorage() const {
990  if (query_mem_desc_.didOutputColumnar()) {
991  storage_->initializeColWise();
992  } else {
993  storage_->initializeRowWise();
994  }
995 }
996 
997 // Driver for reductions. Needed because a reduction on the baseline layout, which can
998 // have collisions, cannot be done in place, and something needs to take ownership of
999 // the new result set with the bigger underlying buffer.
1000 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets) {
1001  CHECK(!result_sets.empty());
1002  auto result_rs = result_sets.front();
1003  CHECK(result_rs->storage_);
1004  auto& first_result = *result_rs->storage_;
1005  auto result = &first_result;
1006  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1007  for (const auto result_set : result_sets) {
1008  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1009  }
1010  const auto executor = result_rs->executor_;
1011  for (const auto result_set : result_sets) {
1012  CHECK_EQ(executor, result_set->executor_);
1013  }
1014  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1015  QueryDescriptionType::GroupByBaselineHash) {
1016  const auto total_entry_count =
1017  std::accumulate(result_sets.begin(),
1018  result_sets.end(),
1019  size_t(0),
1020  [](const size_t init, const ResultSet* rs) {
1021  return init + rs->query_mem_desc_.getEntryCount();
1022  });
1023  CHECK(total_entry_count);
1024  auto query_mem_desc = first_result.query_mem_desc_;
1025  query_mem_desc.setEntryCount(total_entry_count);
1026  rs_.reset(new ResultSet(first_result.targets_,
1027  ExecutorDeviceType::CPU,
1028  query_mem_desc,
1029  row_set_mem_owner,
1030  executor));
1031  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1032  rs_->initializeStorage();
1033  switch (query_mem_desc.getEffectiveKeyWidth()) {
1034  case 4:
1035  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1036  query_mem_desc.getEntryCount());
1037  break;
1038  case 8:
1039  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1040  query_mem_desc.getEntryCount());
1041  break;
1042  default:
1043  CHECK(false);
1044  }
1045  result = rs_->storage_.get();
1046  result_rs = rs_.get();
1047  }
1048 
1049  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1050  if (!serialized_varlen_buffer.empty()) {
1051  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1052  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1053  ++result_it) {
1054  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1055  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1056  serialized_varlen_buffer.emplace_back(
1057  std::move(result_serialized_varlen_buffer.front()));
1058  }
1059  }
1060 
1061  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1062  result_rs->getTargetInfos(),
1063  result_rs->getTargetInitVals());
1064  auto reduction_code = reduction_jit.codegen();
1065  size_t ctr = 1;
1066  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1067  ++result_it) {
1068  if (!serialized_varlen_buffer.empty()) {
1069  result->reduce(
1070  *((*result_it)->storage_), serialized_varlen_buffer[ctr++], reduction_code);
1071  } else {
1072  result->reduce(*((*result_it)->storage_), {}, reduction_code);
1073  }
1074  }
1075  return result_rs;
1076 }
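// Typical use of the reduction driver, as an illustrative sketch only (variable
// names are hypothetical, not part of this file):
//
//   std::vector<ResultSet*> per_device_results = gather_device_results();
//   ResultSetManager rs_manager;
//   ResultSet* reduced = rs_manager.reduce(per_device_results);
//   // `reduced` is either the first input, reduced in place, or (for baseline hash
//   // layouts) a new ResultSet owned by rs_manager and also available through
//   // rs_manager.getOwnResultSet().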
1077 
1078 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1079  return rs_;
1080 }
1081 
1082 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1083  auto& result_storage = result_rs->storage_;
1084  result_storage->rewriteAggregateBufferOffsets(
1085  result_rs->serialized_varlen_buffer_.front());
1086 }
1087 
1088 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1089  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1090  const auto key_count = query_mem_desc_.getGroupbyColCount();
1091  CHECK_EQ(slot_count + key_count, entry.size());
1092  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1094  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1095  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1096  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1097  for (size_t i = 0; i < key_count; ++i) {
1098  this_buff[key_off + i] = entry[i];
1099  }
1100  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1101  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1102  this_buff[first_slot_off + i] = entry[key_count + i];
1103  }
1104 }
1105 
1106 void ResultSetStorage::initializeRowWise() const {
1107  const auto key_count = query_mem_desc_.getGroupbyColCount();
1108  const auto row_size = get_row_bytes(query_mem_desc_);
1109  CHECK_EQ(row_size % 8, 0u);
1110  const auto key_bytes_with_padding =
1111  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1113  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1114  case 4: {
1115  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1116  auto row_ptr = buff_ + i * row_size;
1117  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1118  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1119  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1120  slot_ptr[j] = target_init_vals_[j];
1121  }
1122  }
1123  break;
1124  }
1125  case 8: {
1126  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1127  auto row_ptr = buff_ + i * row_size;
1128  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1129  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1130  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1131  slot_ptr[j] = target_init_vals_[j];
1132  }
1133  }
1134  break;
1135  }
1136  default:
1137  CHECK(false);
1138  }
1139 }
1140 
1141 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1143  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1144  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1145  const auto key_count = query_mem_desc_.getGroupbyColCount();
1146  CHECK_EQ(slot_count + key_count, entry.size());
1147  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1148 
1149  for (size_t i = 0; i < key_count; i++) {
1150  const auto key_offset = key_offset_colwise(0, i, 1);
1151  this_buff[key_offset] = entry[i];
1152  }
1153 
1154  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1155  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1156  this_buff[slot_offset] = entry[key_count + i];
1157  }
1158 }
1159 
1160 void ResultSetStorage::initializeColWise() const {
1161  const auto key_count = query_mem_desc_.getGroupbyColCount();
1162  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1164  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1165  const auto first_key_off =
1166  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1167  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1168  this_buff[first_key_off + i] = EMPTY_KEY_64;
1169  }
1170  }
1171  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1172  const auto first_val_off =
1173  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1174  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1175  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1176  }
1177  }
1178 }
1179 
1180 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1181  CHECK(entry_slots);
1182  if (query_mem_desc_.didOutputColumnar()) {
1183  size_t slot_off = 0;
1184  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1185  entry_slots[slot_off] = target_init_vals_[j];
1186  slot_off += query_mem_desc_.getEntryCount();
1187  }
1188  } else {
1189  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1190  entry_slots[j] = target_init_vals_[j];
1191  }
1192  }
1193 }
1194 
1195 #define AGGREGATE_ONE_VALUE( \
1196  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1197  do { \
1198  const auto sql_type = get_compact_type(agg_info__); \
1199  if (sql_type.is_fp()) { \
1200  if (chosen_bytes__ == sizeof(float)) { \
1201  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1202  *reinterpret_cast<const float*>(other_ptr__)); \
1203  } else { \
1204  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1205  *reinterpret_cast<const double*>(other_ptr__)); \
1206  } \
1207  } else { \
1208  if (chosen_bytes__ == sizeof(int32_t)) { \
1209  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1210  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1211  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1212  } else { \
1213  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1214  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1215  agg_##agg_kind__(val_ptr, *other_ptr); \
1216  } \
1217  } \
1218  } while (0)
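// Illustrative expansion only (not generated code): with a 4-byte integer slot,
//
//   AGGREGATE_ONE_VALUE(sum, this_ptr1, that_ptr1, chosen_bytes, target_info);
//
// resolves to agg_sum_int32(reinterpret_cast<int32_t*>(this_ptr1),
// *reinterpret_cast<const int32_t*>(that_ptr1)), while an 8-byte floating point slot
// resolves to agg_sum_double(...). The nullable variants below additionally take the
// slot's initialization value as a null sentinel and skip it during accumulation.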
1219 
1220 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1221  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1222  do { \
1223  if (agg_info__.skip_null_val) { \
1224  const auto sql_type = get_compact_type(agg_info__); \
1225  if (sql_type.is_fp()) { \
1226  if (chosen_bytes__ == sizeof(float)) { \
1227  agg_##agg_kind__##_float_skip_val( \
1228  reinterpret_cast<int32_t*>(val_ptr__), \
1229  *reinterpret_cast<const float*>(other_ptr__), \
1230  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1231  } else { \
1232  agg_##agg_kind__##_double_skip_val( \
1233  reinterpret_cast<int64_t*>(val_ptr__), \
1234  *reinterpret_cast<const double*>(other_ptr__), \
1235  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1236  } \
1237  } else { \
1238  if (chosen_bytes__ == sizeof(int32_t)) { \
1239  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1240  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1241  const auto null_val = static_cast<int32_t>(init_val__); \
1242  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1243  } else { \
1244  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1245  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1246  const auto null_val = static_cast<int64_t>(init_val__); \
1247  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1248  } \
1249  } \
1250  } else { \
1251  AGGREGATE_ONE_VALUE( \
1252  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1253  } \
1254  } while (0)
1255 
1256 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1257  do { \
1258  if (chosen_bytes__ == sizeof(int32_t)) { \
1259  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1260  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1261  agg_sum_int32(val_ptr, *other_ptr); \
1262  } else { \
1263  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1264  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1265  agg_sum(val_ptr, *other_ptr); \
1266  } \
1267  } while (0)
1268 
1269 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1270  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1271  { \
1272  if (agg_info__.skip_null_val) { \
1273  const auto sql_type = get_compact_type(agg_info__); \
1274  if (sql_type.is_fp()) { \
1275  if (chosen_bytes__ == sizeof(float)) { \
1276  agg_sum_float_skip_val( \
1277  reinterpret_cast<int32_t*>(val_ptr__), \
1278  *reinterpret_cast<const float*>(other_ptr__), \
1279  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1280  } else { \
1281  agg_sum_double_skip_val( \
1282  reinterpret_cast<int64_t*>(val_ptr__), \
1283  *reinterpret_cast<const double*>(other_ptr__), \
1284  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1285  } \
1286  } else { \
1287  if (chosen_bytes__ == sizeof(int32_t)) { \
1288  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1289  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1290  const auto null_val = static_cast<int32_t>(init_val__); \
1291  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1292  } else { \
1293  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1294  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1295  const auto null_val = static_cast<int64_t>(init_val__); \
1296  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1297  } \
1298  } \
1299  } else { \
1300  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1301  } \
1302  }
1303 
1304 // to be used for 8/16-bit kMIN and kMAX only
1305 #define AGGREGATE_ONE_VALUE_SMALL( \
1306  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1307  do { \
1308  if (chosen_bytes__ == sizeof(int16_t)) { \
1309  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1310  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1311  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1312  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1313  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1314  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1315  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1316  } else { \
1317  UNREACHABLE(); \
1318  } \
1319  } while (0)
1320 
1321 // to be used for 8/16-bit kMIN and kMAX only
1322 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1323  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1324  do { \
1325  if (agg_info__.skip_null_val) { \
1326  if (chosen_bytes__ == sizeof(int16_t)) { \
1327  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1328  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1329  const auto null_val = static_cast<int16_t>(init_val__); \
1330  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1331  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1332  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1333  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1334  const auto null_val = static_cast<int8_t>(init_val__); \
1335  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1336  } \
1337  } else { \
1338  AGGREGATE_ONE_VALUE_SMALL( \
1339  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1340  } \
1341  } while (0)
1342 
1343 int8_t get_width_for_slot(const size_t target_slot_idx,
1344  const bool float_argument_input,
1345  const QueryMemoryDescriptor& query_mem_desc) {
1346  if (float_argument_input) {
1347  return sizeof(float);
1348  }
1349  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1350 }
1351 
1352 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1353  const TargetInfo& target_info,
1354  const size_t target_slot_idx,
1355  const size_t init_agg_val_idx,
1356  const int8_t* that_ptr1) const {
1357  const bool float_argument_input = takes_float_argument(target_info);
1358  const auto chosen_bytes =
1359  get_width_for_slot(target_slot_idx, float_argument_input, query_mem_desc_);
1360  auto init_val = target_init_vals_[init_agg_val_idx];
1361 
1362  auto reduce = [&](auto const& size_tag) {
1363  using CastTarget = std::decay_t<decltype(size_tag)>;
1364  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1365  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1366  if (rhs_proj_col == init_val) {
1367  // ignore
1368  } else if (lhs_proj_col == init_val) {
1369  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1370  } else if (lhs_proj_col != rhs_proj_col) {
1371  throw std::runtime_error("Multiple distinct values encountered");
1372  }
1373  };
1374 
1375  switch (chosen_bytes) {
1376  case 1: {
1378  reduce(int8_t());
1379  break;
1380  }
1381  case 2: {
1383  reduce(int16_t());
1384  break;
1385  }
1386  case 4: {
1387  reduce(int32_t());
1388  break;
1389  }
1390  case 8: {
1391  CHECK(!target_info.sql_type.is_varlen());
1392  reduce(int64_t());
1393  break;
1394  }
1395  default:
1396  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1397  }
1398 }
1399 
1400 void ResultSetStorage::reduceOneSlot(
1401  int8_t* this_ptr1,
1402  int8_t* this_ptr2,
1403  const int8_t* that_ptr1,
1404  const int8_t* that_ptr2,
1405  const TargetInfo& target_info,
1406  const size_t target_logical_idx,
1407  const size_t target_slot_idx,
1408  const size_t init_agg_val_idx,
1409  const ResultSetStorage& that,
1410  const size_t first_slot_idx_for_target,
1411  const std::vector<std::string>& serialized_varlen_buffer) const {
1412  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1413  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1414  return;
1415  }
1416  }
1417  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1418  const bool float_argument_input = takes_float_argument(target_info);
1419  const auto chosen_bytes =
1420  get_width_for_slot(target_slot_idx, float_argument_input, query_mem_desc_);
1421  auto init_val = target_init_vals_[init_agg_val_idx];
1422 
1423  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1424  reduceOneSlotSingleValue(
1425  this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1426  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1427  switch (target_info.agg_kind) {
1428  case kCOUNT:
1429  case kAPPROX_COUNT_DISTINCT: {
1430  if (is_distinct_target(target_info)) {
1431  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1432  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1433  break;
1434  }
1435  CHECK_EQ(int64_t(0), init_val);
1436  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1437  break;
1438  }
1439  case kAVG: {
1440  // Ignore float argument compaction for count component for fear of its overflow
1441  AGGREGATE_ONE_COUNT(this_ptr2,
1442  that_ptr2,
1443  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1444  }
1445  // fall thru
1446  case kSUM: {
1447  AGGREGATE_ONE_NULLABLE_VALUE(
1448  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1449  break;
1450  }
1451  case kMIN: {
1452  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1453  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1454  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1455  } else {
1456  AGGREGATE_ONE_NULLABLE_VALUE(
1457  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1458  }
1459  break;
1460  }
1461  case kMAX: {
1462  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1463  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1464  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1465  } else {
1466  AGGREGATE_ONE_NULLABLE_VALUE(
1467  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1468  }
1469  break;
1470  }
1471  default:
1472  CHECK(false);
1473  }
1474  } else {
1475  switch (chosen_bytes) {
1476  case 1: {
1478  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1479  if (rhs_proj_col != init_val) {
1480  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1481  }
1482  break;
1483  }
1484  case 2: {
1486  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1487  if (rhs_proj_col != init_val) {
1488  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1489  }
1490  break;
1491  }
1492  case 4: {
1493  CHECK(target_info.agg_kind != kSAMPLE ||
1494  !target_info.sql_type.is_varlen());
1495  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1496  if (rhs_proj_col != init_val) {
1497  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1498  }
1499  break;
1500  }
1501  case 8: {
1502  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1503  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1504  !serialized_varlen_buffer.empty()) {
1505  size_t length_to_elems{0};
1506  if (target_info.sql_type.is_geometry()) {
1507  // TODO: Assumes hard-coded sizes for geometry targets
1508  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1509  } else {
1510  const auto& elem_ti = target_info.sql_type.get_elem_type();
1511  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1512  }
1513 
1514  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1515  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1516  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1517  *reinterpret_cast<int64_t*>(this_ptr1) =
1518  reinterpret_cast<const int64_t>(str_ptr);
1519  *reinterpret_cast<int64_t*>(this_ptr2) =
1520  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1521  } else {
1522  if (rhs_proj_col != init_val) {
1523  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1524  }
1525  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1526  CHECK(this_ptr2 && that_ptr2);
1527  *reinterpret_cast<int64_t*>(this_ptr2) =
1528  *reinterpret_cast<const int64_t*>(that_ptr2);
1529  }
1530  }
1531 
1532  break;
1533  }
1534  default:
1535  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1536  }
1537  }
1538 }
1539 
1540 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1541  const int8_t* that_ptr1,
1542  const size_t target_logical_idx,
1543  const ResultSetStorage& that) const {
1545  const auto& old_count_distinct_desc =
1546  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1547  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1548  const auto& new_count_distinct_desc =
1549  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1550  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1551  CHECK(this_ptr1 && that_ptr1);
1552  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1553  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1554  count_distinct_set_union(
1555  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1556 }
1557 
1558 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1559  const int8_t warp_count,
1560  const bool is_columnar,
1561  const bool replace_bitmap_ptr_with_bitmap_sz,
1562  std::vector<int64_t>& agg_vals,
1563  const QueryMemoryDescriptor& query_mem_desc,
1564  const std::vector<TargetInfo>& targets,
1565  const std::vector<int64_t>& agg_init_vals) {
1566  const size_t agg_col_count{agg_vals.size()};
1567  const auto row_size = query_mem_desc.getRowSize();
1568  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1569  CHECK_GE(agg_col_count, targets.size());
1570  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1571  CHECK(query_mem_desc.hasKeylessHash());
1572  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1573  bool discard_row = true;
1574  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1575  bool discard_partial_result = true;
1576  for (size_t target_idx = 0, agg_col_idx = 0;
1577  target_idx < targets.size() && agg_col_idx < agg_col_count;
1578  ++target_idx, ++agg_col_idx) {
1579  const auto& agg_info = targets[target_idx];
1580  const bool float_argument_input = takes_float_argument(agg_info);
1581  const auto chosen_bytes = float_argument_input
1582  ? sizeof(float)
1583  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1584  auto partial_bin_val = get_component(
1585  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1586  partial_agg_vals[agg_col_idx] = partial_bin_val;
1587  if (is_distinct_target(agg_info)) {
1588  CHECK_EQ(int8_t(1), warp_count);
1589  CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
1590  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1591  partial_bin_val = count_distinct_set_size(
1592  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1593  if (replace_bitmap_ptr_with_bitmap_sz) {
1594  partial_agg_vals[agg_col_idx] = partial_bin_val;
1595  }
1596  }
1597  if (kAVG == agg_info.agg_kind) {
1598  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1599  ++agg_col_idx;
1600  partial_bin_val = partial_agg_vals[agg_col_idx] =
1601  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1602  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1603  }
1604  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1605  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1606  CHECK(agg_info.is_agg);
1607  discard_partial_result = false;
1608  }
1609  }
1610  row_ptr += row_size;
1611  if (discard_partial_result) {
1612  continue;
1613  }
1614  discard_row = false;
1615  for (size_t target_idx = 0, agg_col_idx = 0;
1616  target_idx < targets.size() && agg_col_idx < agg_col_count;
1617  ++target_idx, ++agg_col_idx) {
1618  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1619  const auto& agg_info = targets[target_idx];
1620  const bool float_argument_input = takes_float_argument(agg_info);
1621  const auto chosen_bytes = float_argument_input
1622  ? sizeof(float)
1623  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1624  const auto& chosen_type = get_compact_type(agg_info);
1625  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1626  try {
1627  switch (agg_info.agg_kind) {
1628  case kCOUNT:
1629  case kAPPROX_COUNT_DISTINCT:
1630  AGGREGATE_ONE_NULLABLE_COUNT(
1631  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1632  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1633  agg_init_vals[agg_col_idx],
1634  chosen_bytes,
1635  agg_info);
1636  break;
1637  case kAVG:
1638  // Ignore float argument compaction for count component for fear of its
1639  // overflow
1640  AGGREGATE_ONE_COUNT(
1641  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1642  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1643  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1644  // fall thru
1645  case kSUM:
1646  AGGREGATE_ONE_NULLABLE_VALUE(
1647  sum,
1648  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1649  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1650  agg_init_vals[agg_col_idx],
1651  chosen_bytes,
1652  agg_info);
1653  break;
1654  case kMIN:
1655  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1656  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1657  min,
1658  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1659  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1660  agg_init_vals[agg_col_idx],
1661  chosen_bytes,
1662  agg_info);
1663  } else {
1664  AGGREGATE_ONE_NULLABLE_VALUE(
1665  min,
1666  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1667  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1668  agg_init_vals[agg_col_idx],
1669  chosen_bytes,
1670  agg_info);
1671  }
1672  break;
1673  case kMAX:
1674  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1675  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1676  max,
1677  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1678  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1679  agg_init_vals[agg_col_idx],
1680  chosen_bytes,
1681  agg_info);
1682  } else {
1683  AGGREGATE_ONE_NULLABLE_VALUE(
1684  max,
1685  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1686  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1687  agg_init_vals[agg_col_idx],
1688  chosen_bytes,
1689  agg_info);
1690  }
1691  break;
1692  default:
1693  CHECK(false);
1694  break;
1695  }
1696  } catch (std::runtime_error& e) {
1697  // TODO(miyu): handle the case where chosen_bytes < 8
1698  LOG(ERROR) << e.what();
1699  }
1700  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1701  switch (chosen_bytes) {
1702  case 8:
1703  break;
1704  case 4: {
1705  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1706  if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
1707  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1708  }
1709  break;
1710  }
1711  default:
1712  CHECK(false);
1713  }
1714  }
1715  if (kAVG == agg_info.agg_kind) {
1716  ++agg_col_idx;
1717  }
1718  } else {
1719  if (agg_info.agg_kind == kSAMPLE) {
1720  CHECK(!agg_info.sql_type.is_varlen())
1721  << "Interleaved bins reduction not supported for variable length "
1722  "arguments "
1723  "to SAMPLE";
1724  }
1725  if (agg_vals[agg_col_idx]) {
1726  if (agg_info.agg_kind == kSAMPLE) {
1727  continue;
1728  }
1729  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1730  } else {
1731  agg_vals[agg_col_idx] = partial_bin_val;
1732  }
1733  }
1734  }
1735  }
1736  return discard_row;
1737 }
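The aggregate switch above delegates the actual folding to the AGGREGATE_ONE_NULLABLE_COUNT, AGGREGATE_ONE_NULLABLE_VALUE and AGGREGATE_ONE_NULLABLE_VALUE_SMALL macros, which treat a slot's initialization value as the null sentinel and only combine partials that have diverged from it. The standalone sketch below illustrates that pattern for a single 64-bit slot; it is a simplified illustration under assumed semantics, not the macro implementation, and both the fold_nullable_slot helper and the AggKind enum are hypothetical names.

#include <algorithm>
#include <cstdint>

enum class AggKind { kCount, kSum, kMin, kMax };

// Fold one partial aggregate value into the running value for a slot,
// treating init_val as the "still empty / null" sentinel: partials equal to
// init_val are skipped, and a still-empty accumulator simply adopts the
// first non-null partial it sees.
inline void fold_nullable_slot(int64_t& agg_val,
                               const int64_t partial_val,
                               const int64_t init_val,
                               const AggKind kind) {
  if (partial_val == init_val) {
    return;  // the partial never moved past its init value; nothing to fold
  }
  if (agg_val == init_val && kind != AggKind::kCount) {
    agg_val = partial_val;  // first non-null contribution
    return;
  }
  switch (kind) {
    case AggKind::kCount:
    case AggKind::kSum:
      agg_val += partial_val;
      break;
    case AggKind::kMin:
      agg_val = std::min(agg_val, partial_val);
      break;
    case AggKind::kMax:
      agg_val = std::max(agg_val, partial_val);
      break;
  }
}

For kAVG the listing applies this folding twice: once for the sum component at agg_col_idx (via the kSUM fall-through) and once, via AGGREGATE_ONE_COUNT, for the count component stored in the following slot.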
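After the try block, the integer/decimal branch widens a 4-byte compact slot back to a full 64-bit entry in agg_vals (unless it is a COUNT that has already diverged from its init value). A minimal sketch of that sign-extension step, assuming the little-endian layout the reinterpret_cast in the listing also relies on; the helper name widen_compact_slot is hypothetical.

#include <cstddef>
#include <cstdint>
#include <cstring>

// Sign-extend a 4-byte compact aggregate, stored in the low bytes of a
// 64-bit slot, back to a full int64_t value. Assumes little-endian storage,
// matching the reinterpret_cast of &agg_vals[agg_col_idx] in the listing.
inline void widen_compact_slot(int64_t& slot, const std::size_t chosen_bytes) {
  if (chosen_bytes == sizeof(int32_t)) {
    int32_t narrow;
    std::memcpy(&narrow, &slot, sizeof(narrow));  // read the low 32 bits
    slot = static_cast<int64_t>(narrow);          // sign-extend to 64 bits
  }
  // chosen_bytes == 8 needs no widening; other widths are not expected here.
}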