OmniSciDB  8a228a1076
ResultSetReduction.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "DynamicWatchdog.h"
26 #include "Execute.h"
27 #include "ResultSet.h"
28 #include "ResultSetReductionInterpreter.h"
29 #include "ResultSetReductionJIT.h"
30 #include "RuntimeFunctions.h"
31 #include "Shared/SqlTypesLayout.h"
32 
33 #include "Shared/likely.h"
34 #include "Shared/thread_count.h"
35 
36 #include <llvm/ExecutionEngine/GenericValue.h>
37 
38 #include <algorithm>
39 #include <future>
40 #include <numeric>
41 
42 extern bool g_enable_dynamic_watchdog;
43 
44 namespace {
45 
46 bool use_multithreaded_reduction(const size_t entry_count) {
47  return entry_count > 100000;
48 }
49 
50 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
51  const auto row_bytes = get_row_bytes(query_mem_desc);
52  CHECK_EQ(size_t(0), row_bytes % 8);
53  return row_bytes / 8;
54 }
55 
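// Note: in the columnar (baseline) layout each group-by key component is stored in
// its own column, so consecutive components of one entry's key sit entry_count slots
// apart; make_key() below gathers such a strided key into a contiguous vector.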
56 std::vector<int64_t> make_key(const int64_t* buff,
57  const size_t entry_count,
58  const size_t key_count) {
59  std::vector<int64_t> key;
60  size_t off = 0;
61  for (size_t i = 0; i < key_count; ++i) {
62  key.push_back(buff[off]);
63  off += entry_count;
64  }
65  return key;
66 }
67 
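// fill_slots() copies every aggregate slot of one source entry into a destination
// entry, translating between the columnar layout (slots strided by the entry count)
// and the row-wise layout (slots laid out contiguously after the key).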
68 void fill_slots(int64_t* dst_entry,
69  const size_t dst_entry_count,
70  const int64_t* src_buff,
71  const size_t src_entry_idx,
72  const size_t src_entry_count,
73  const QueryMemoryDescriptor& query_mem_desc) {
74  const auto slot_count = query_mem_desc.getBufferColSlotCount();
75  const auto key_count = query_mem_desc.getGroupbyColCount();
76  if (query_mem_desc.didOutputColumnar()) {
77  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
78  ++i, dst_slot_off += dst_entry_count) {
79  dst_entry[dst_slot_off] =
80  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
81  }
82  } else {
83  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
84  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
85  for (size_t i = 0; i < slot_count; ++i) {
86  dst_entry[i] = row_ptr[slot_off_quad + i];
87  }
88  }
89 }
90 
92 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
93  for (size_t i = 0; i < key_count; ++i) {
94  key_ptr_i32[i] = EMPTY_KEY_32;
95  }
96 }
97 
99 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
100  for (size_t i = 0; i < key_count; ++i) {
101  key_ptr_i64[i] = EMPTY_KEY_64;
102  }
103 }
104 
105 inline int64_t get_component(const int8_t* group_by_buffer,
106  const size_t comp_sz,
107  const size_t index = 0) {
108  int64_t ret = std::numeric_limits<int64_t>::min();
109  switch (comp_sz) {
110  case 1: {
111  ret = group_by_buffer[index];
112  break;
113  }
114  case 2: {
115  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
116  ret = buffer_ptr[index];
117  break;
118  }
119  case 4: {
120  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
121  ret = buffer_ptr[index];
122  break;
123  }
124  case 8: {
125  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
126  ret = buffer_ptr[index];
127  break;
128  }
129  default:
130  CHECK(false);
131  }
132  return ret;
133 }
134 
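// Runs the reduction over entries [start_entry_index, end_entry_index) of `that_buff`
// into `this_buff`, either through the JIT-compiled function pointer or, when no
// native code is available, through the slower ReductionInterpreter fallback.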
135 void run_reduction_code(const ReductionCode& reduction_code,
136  int8_t* this_buff,
137  const int8_t* that_buff,
138  const int32_t start_entry_index,
139  const int32_t end_entry_index,
140  const int32_t that_entry_count,
141  const void* this_qmd,
142  const void* that_qmd,
143  const void* serialized_varlen_buffer) {
144  int err = 0;
145  if (reduction_code.func_ptr) {
146  err = reduction_code.func_ptr(this_buff,
147  that_buff,
148  start_entry_index,
149  end_entry_index,
150  that_entry_count,
151  this_qmd,
152  that_qmd,
153  serialized_varlen_buffer);
154  } else {
155  // Calls LLVM methods that are not thread safe, ensure nothing else compiles while we
156  // run this reduction
157  std::lock_guard<std::mutex> compilation_lock(Executor::compilation_mutex_);
158  auto ret = ReductionInterpreter::run(
159  reduction_code.ir_reduce_loop.get(),
160  {ReductionInterpreter::EvalValue{.ptr = this_buff},
161  ReductionInterpreter::EvalValue{.ptr = that_buff},
162  ReductionInterpreter::EvalValue{.int_val = start_entry_index},
163  ReductionInterpreter::EvalValue{.int_val = end_entry_index},
164  ReductionInterpreter::EvalValue{.int_val = that_entry_count},
165  ReductionInterpreter::EvalValue{.ptr = this_qmd},
166  ReductionInterpreter::EvalValue{.ptr = that_qmd},
167  ReductionInterpreter::EvalValue{.ptr = serialized_varlen_buffer}});
168  err = ret.int_val;
169  }
170  if (err) {
171  if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
172  throw std::runtime_error("Multiple distinct values encountered");
173  }
174 
175  throw std::runtime_error(
176  "Query execution has exceeded the time limit or was interrupted during result "
177  "set reduction");
178  }
179 }
180 
181 } // namespace
182 
183 void fill_empty_key(void* key_ptr, const size_t key_count, const size_t key_width) {
184  switch (key_width) {
185  case 4: {
186  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
187  fill_empty_key_32(key_ptr_i32, key_count);
188  break;
189  }
190  case 8: {
191  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
192  fill_empty_key_64(key_ptr_i64, key_count);
193  break;
194  }
195  default:
196  CHECK(false);
197  }
198 }
199 
200 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
201 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
202 void ResultSetStorage::reduce(const ResultSetStorage& that,
203  const std::vector<std::string>& serialized_varlen_buffer,
204  const ReductionCode& reduction_code) const {
205  auto entry_count = query_mem_desc_.getEntryCount();
206  CHECK_GT(entry_count, size_t(0));
207  if (query_mem_desc_.didOutputColumnar()) {
208  CHECK(query_mem_desc_.getQueryDescriptionType() ==
209  QueryDescriptionType::GroupByBaselineHash ||
210  query_mem_desc_.getQueryDescriptionType() ==
211  QueryDescriptionType::GroupByPerfectHash ||
212  query_mem_desc_.getQueryDescriptionType() ==
213  QueryDescriptionType::Projection);
214  }
215  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
216  switch (query_mem_desc_.getQueryDescriptionType()) {
217  case QueryDescriptionType::GroupByBaselineHash:
218  CHECK_GE(entry_count, that_entry_count);
219  break;
220  default:
221  CHECK_EQ(entry_count, that_entry_count);
222  }
223  auto this_buff = buff_;
224  CHECK(this_buff);
225  auto that_buff = that.buff_;
226  CHECK(that_buff);
227  if (query_mem_desc_.getQueryDescriptionType() ==
228  QueryDescriptionType::GroupByBaselineHash) {
229  if (!serialized_varlen_buffer.empty()) {
230  throw std::runtime_error(
231  "Projection of variable length targets with baseline hash group by is not yet "
232  "supported in Distributed mode");
233  }
234  if (use_multithreaded_reduction(that_entry_count)) {
235  const size_t thread_count = cpu_threads();
236  std::vector<std::future<void>> reduction_threads;
237  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
238  const auto thread_entry_count =
239  (that_entry_count + thread_count - 1) / thread_count;
240  const auto start_index = thread_idx * thread_entry_count;
241  const auto end_index =
242  std::min(start_index + thread_entry_count, that_entry_count);
243  reduction_threads.emplace_back(std::async(
244  std::launch::async,
245  [this,
246  this_buff,
247  that_buff,
248  start_index,
249  end_index,
250  that_entry_count,
251  &reduction_code,
252  &that] {
253  if (reduction_code.ir_reduce_loop) {
254  run_reduction_code(reduction_code,
255  this_buff,
256  that_buff,
257  start_index,
258  end_index,
259  that_entry_count,
260  &query_mem_desc_,
261  &that.query_mem_desc_,
262  nullptr);
263  } else {
264  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
265  reduceOneEntryBaseline(
266  this_buff, that_buff, entry_idx, that_entry_count, that);
267  }
268  }
269  }));
270  }
271  for (auto& reduction_thread : reduction_threads) {
272  reduction_thread.wait();
273  }
274  for (auto& reduction_thread : reduction_threads) {
275  reduction_thread.get();
276  }
277  } else {
278  if (reduction_code.ir_reduce_loop) {
279  run_reduction_code(reduction_code,
280  this_buff,
281  that_buff,
282  0,
283  that_entry_count,
284  that_entry_count,
285  &query_mem_desc_,
286  &that.query_mem_desc_,
287  nullptr);
288  } else {
289  for (size_t i = 0; i < that_entry_count; ++i) {
290  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
291  }
292  }
293  }
294  return;
295  }
296  if (use_multithreaded_reduction(entry_count)) {
297  const size_t thread_count = cpu_threads();
298  std::vector<std::future<void>> reduction_threads;
299  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
300  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
301  const auto start_index = thread_idx * thread_entry_count;
302  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
303  if (query_mem_desc_.didOutputColumnar()) {
304  reduction_threads.emplace_back(std::async(std::launch::async,
305  [this,
306  this_buff,
307  that_buff,
308  start_index,
309  end_index,
310  &that,
311  &serialized_varlen_buffer] {
312  reduceEntriesNoCollisionsColWise(
313  this_buff,
314  that_buff,
315  that,
316  start_index,
317  end_index,
318  serialized_varlen_buffer);
319  }));
320  } else {
321  reduction_threads.emplace_back(std::async(std::launch::async,
322  [this,
323  this_buff,
324  that_buff,
325  start_index,
326  end_index,
327  that_entry_count,
328  &reduction_code,
329  &that,
330  &serialized_varlen_buffer] {
331  CHECK(reduction_code.ir_reduce_loop);
332  run_reduction_code(
333  reduction_code,
334  this_buff,
335  that_buff,
336  start_index,
337  end_index,
338  that_entry_count,
339  &query_mem_desc_,
340  &that.query_mem_desc_,
341  &serialized_varlen_buffer);
342  }));
343  }
344  }
345  for (auto& reduction_thread : reduction_threads) {
346  reduction_thread.wait();
347  }
348  for (auto& reduction_thread : reduction_threads) {
349  reduction_thread.get();
350  }
351  } else {
352  if (query_mem_desc_.didOutputColumnar()) {
353  reduceEntriesNoCollisionsColWise(this_buff,
354  that_buff,
355  that,
356  0,
357  query_mem_desc_.getEntryCount(),
358  serialized_varlen_buffer);
359  } else {
360  CHECK(reduction_code.ir_reduce_loop);
361  run_reduction_code(reduction_code,
362  this_buff,
363  that_buff,
364  0,
365  entry_count,
366  that_entry_count,
367  &query_mem_desc_,
368  &that.query_mem_desc_,
369  &serialized_varlen_buffer);
370  }
371  }
372 }
373 
374 namespace {
375 
376 ALWAYS_INLINE void check_watchdog(const size_t sample_seed) {
377  if (UNLIKELY(g_enable_dynamic_watchdog && (sample_seed & 0x3F) == 0 &&
378  dynamic_watchdog())) {
379  // TODO(alex): distinguish between the deadline and interrupt
380  throw std::runtime_error(
381  "Query execution has exceeded the time limit or was interrupted during result "
382  "set reduction");
383  }
384 }
385 
386 } // namespace
387 
388 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
389  int8_t* this_buff,
390  const int8_t* that_buff,
391  const ResultSetStorage& that,
392  const size_t start_index,
393  const size_t end_index,
394  const std::vector<std::string>& serialized_varlen_buffer) const {
395  // TODO(adb / saman): Support column wise output when serializing distributed agg
396  // functions
397  CHECK(serialized_varlen_buffer.empty());
398 
399  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
400 
401  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
402  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
403  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
404  const auto& agg_info = targets_[target_idx];
405  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
406 
407  bool two_slot_target{false};
408  if (agg_info.is_agg &&
409  (agg_info.agg_kind == kAVG ||
410  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
411  // Note that this assumes if one of the slot pairs in a given target is an array,
412  // all slot pairs are arrays. Currently this is true for all geo targets, but we
413  // should better codify and store this information in the future
414  two_slot_target = true;
415  }
416 
417  for (size_t target_slot_idx = slots_for_col.front();
418  target_slot_idx < slots_for_col.back() + 1;
419  target_slot_idx += 2) {
420  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
421  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
422  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
423  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
424 
425  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
426  check_watchdog(entry_idx);
427  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
428  continue;
429  }
430  if (LIKELY(target_idx == 0 && target_slot_idx == 0)) {
431  // copy the key from right hand side
432  copyKeyColWise(entry_idx, this_buff, that_buff);
433  }
434  auto this_ptr1 =
435  this_crt_col_ptr +
436  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
437  auto that_ptr1 =
438  that_crt_col_ptr +
439  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
440  int8_t* this_ptr2{nullptr};
441  const int8_t* that_ptr2{nullptr};
442  if (UNLIKELY(two_slot_target)) {
443  this_ptr2 =
444  this_next_col_ptr +
445  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
446  that_ptr2 =
447  that_next_col_ptr +
448  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
449  }
450  reduceOneSlot(this_ptr1,
451  this_ptr2,
452  that_ptr1,
453  that_ptr2,
454  agg_info,
455  target_idx,
456  target_slot_idx,
457  target_slot_idx,
458  that,
459  slots_for_col.front(),
460  serialized_varlen_buffer);
461  }
462 
463  this_crt_col_ptr = this_next_col_ptr;
464  that_crt_col_ptr = that_next_col_ptr;
465  if (UNLIKELY(two_slot_target)) {
466  this_crt_col_ptr = advance_to_next_columnar_target_buff(
467  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
468  that_crt_col_ptr = advance_to_next_columnar_target_buff(
469  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
470  }
471  }
472  }
473 }
474 
475 /*
476  * copy all keys from the columnar prepended group buffer of "that_buff" into
477  * "this_buff"
478  */
479 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
480  int8_t* this_buff,
481  const int8_t* that_buff) const {
483  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
484  group_idx++) {
485  // if the column corresponds to a group key
486  const auto column_offset_bytes =
487  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
488  auto lhs_key_ptr = this_buff + column_offset_bytes;
489  auto rhs_key_ptr = that_buff + column_offset_bytes;
490  switch (query_mem_desc_.groupColWidth(group_idx)) {
491  case 8:
492  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
493  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
494  break;
495  case 4:
496  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
497  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
498  break;
499  case 2:
500  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
501  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
502  break;
503  case 1:
504  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
505  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
506  break;
507  default:
508  CHECK(false);
509  break;
510  }
511  }
512 }
513 
514 // Rewrites the entries of this ResultSetStorage object to point directly into the
515 // serialized_varlen_buffer rather than using offsets.
516 void ResultSetStorage::rewriteAggregateBufferOffsets(
517  const std::vector<std::string>& serialized_varlen_buffer) const {
518  if (serialized_varlen_buffer.empty()) {
519  return;
520  }
521 
523  auto entry_count = query_mem_desc_.getEntryCount();
524  CHECK_GT(entry_count, size_t(0));
525  CHECK(buff_);
526 
527  // Row-wise iteration, consider moving to separate function
528  for (size_t i = 0; i < entry_count; ++i) {
529  if (isEmptyEntry(i, buff_)) {
530  continue;
531  }
532  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
533  const auto key_bytes_with_padding = align_to_int64(key_bytes);
534  auto rowwise_targets_ptr =
535  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
536  size_t target_slot_idx = 0;
537  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
538  ++target_logical_idx) {
539  const auto& target_info = targets_[target_logical_idx];
540  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
541  CHECK(target_info.agg_kind == kSAMPLE);
542  auto ptr1 = rowwise_targets_ptr;
543  auto slot_idx = target_slot_idx;
544  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
545  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
546 
547  const auto& elem_ti = target_info.sql_type.get_elem_type();
548  size_t length_to_elems =
549  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
550  ? 1
551  : elem_ti.get_size();
552  if (target_info.sql_type.is_geometry()) {
553  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
554  if (j > 0) {
555  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
556  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
557  slot_idx += 2;
558  length_to_elems = 4;
559  }
560  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
561  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
562  const auto str_ptr =
563  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
564  CHECK(ptr1);
565  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
566  CHECK(ptr2);
567  *reinterpret_cast<int64_t*>(ptr2) =
568  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
569  }
570  } else {
571  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
572  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
573  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
574  CHECK(ptr1);
575  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
576  CHECK(ptr2);
577  *reinterpret_cast<int64_t*>(ptr2) =
578  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
579  }
580  }
581 
582  rowwise_targets_ptr = advance_target_ptr_row_wise(
583  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
584  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
585  }
586  }
587 
588  return;
589 }
590 
591 namespace {
592 
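// Columnar baseline-hash probe for one bucket: the first key component is claimed
// with a compare-and-swap against EMPTY_KEY_64; the winner then writes the remaining
// key components (strided by entry_count). The returned flag is true when the entry
// was newly inserted or (with a null pointer) the bucket holds a different key, and
// false for an existing match.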
593 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
594  const uint32_t h,
595  const int64_t* key,
596  const uint32_t key_qw_count,
597  const size_t entry_count) {
598  auto off = h;
599  const auto old_key =
600  __sync_val_compare_and_swap(&groups_buffer[off], EMPTY_KEY_64, *key);
601  if (old_key == EMPTY_KEY_64) {
602  for (size_t i = 0; i < key_qw_count; ++i) {
603  groups_buffer[off] = key[i];
604  off += entry_count;
605  }
606  return {&groups_buffer[off], true};
607  }
608  off = h;
609  for (size_t i = 0; i < key_qw_count; ++i) {
610  if (groups_buffer[off] != key[i]) {
611  return {nullptr, true};
612  }
613  off += entry_count;
614  }
615  return {&groups_buffer[off], false};
616 }
617 
618 // TODO(alex): fix synchronization when we enable it
619 GroupValueInfo get_group_value_columnar_reduction(
620  int64_t* groups_buffer,
621  const uint32_t groups_buffer_entry_count,
622  const int64_t* key,
623  const uint32_t key_qw_count) {
624  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
625  auto matching_gvi = get_matching_group_value_columnar_reduction(
626  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
627  if (matching_gvi.first) {
628  return matching_gvi;
629  }
630  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
631  while (h_probe != h) {
632  matching_gvi = get_matching_group_value_columnar_reduction(
633  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
634  if (matching_gvi.first) {
635  return matching_gvi;
636  }
637  h_probe = (h_probe + 1) % groups_buffer_entry_count;
638  }
639  return {nullptr, true};
640 }
641 
642 #define cas_cst(ptr, expected, desired) \
643  __atomic_compare_exchange_n( \
644  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
645 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
646 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
647 
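// Row-wise variant: the first key component is first set to a "write pending"
// sentinel (empty key - 1) via a sequentially consistent CAS, so concurrent readers
// spin until the full key and the initial slot values are visible before comparing.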
648 template <typename T = int64_t>
649 GroupValueInfo get_matching_group_value_reduction(
650  int64_t* groups_buffer,
651  const uint32_t h,
652  const T* key,
653  const uint32_t key_count,
654  const QueryMemoryDescriptor& query_mem_desc,
655  const int64_t* that_buff_i64,
656  const size_t that_entry_idx,
657  const size_t that_entry_count,
658  const uint32_t row_size_quad) {
659  auto off = h * row_size_quad;
660  T empty_key = get_empty_key<T>();
661  T write_pending = get_empty_key<T>() - 1;
662  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
663  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
664  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
665  if (success) {
666  fill_slots(groups_buffer + off + slot_off_quad,
667  query_mem_desc.getEntryCount(),
668  that_buff_i64,
669  that_entry_idx,
670  that_entry_count,
671  query_mem_desc);
672  if (key_count > 1) {
673  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
674  }
675  store_cst(row_ptr, *key);
676  return {groups_buffer + off + slot_off_quad, true};
677  }
678  while (load_cst(row_ptr) == write_pending) {
679  // spin until the winning thread has finished writing the entire key and the init
680  // value
681  }
682  for (size_t i = 0; i < key_count; ++i) {
683  if (load_cst(row_ptr + i) != key[i]) {
684  return {nullptr, true};
685  }
686  }
687  return {groups_buffer + off + slot_off_quad, false};
688 }
689 
690 #undef load_cst
691 #undef store_cst
692 #undef cas_cst
693 
694 GroupValueInfo get_matching_group_value_reduction(
695  int64_t* groups_buffer,
696  const uint32_t h,
697  const int64_t* key,
698  const uint32_t key_count,
699  const size_t key_width,
700  const QueryMemoryDescriptor& query_mem_desc,
701  const int64_t* that_buff_i64,
702  const size_t that_entry_idx,
703  const size_t that_entry_count,
704  const uint32_t row_size_quad) {
705  switch (key_width) {
706  case 4:
707  return get_matching_group_value_reduction(groups_buffer,
708  h,
709  reinterpret_cast<const int32_t*>(key),
710  key_count,
711  query_mem_desc,
712  that_buff_i64,
713  that_entry_idx,
714  that_entry_count,
715  row_size_quad);
716  case 8:
717  return get_matching_group_value_reduction(groups_buffer,
718  h,
719  key,
720  key_count,
721  query_mem_desc,
722  that_buff_i64,
723  that_entry_idx,
724  that_entry_count,
725  row_size_quad);
726  default:
727  CHECK(false);
728  return {nullptr, true};
729  }
730 }
731 
732 } // namespace
733 
734 GroupValueInfo get_group_value_reduction(int64_t* groups_buffer,
735  const uint32_t groups_buffer_entry_count,
736  const int64_t* key,
737  const uint32_t key_count,
738  const size_t key_width,
739  const QueryMemoryDescriptor& query_mem_desc,
740  const int64_t* that_buff_i64,
741  const size_t that_entry_idx,
742  const size_t that_entry_count,
743  const uint32_t row_size_quad) {
744  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
745  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
746  h,
747  key,
748  key_count,
749  key_width,
750  query_mem_desc,
751  that_buff_i64,
752  that_entry_idx,
753  that_entry_count,
754  row_size_quad);
755  if (matching_gvi.first) {
756  return matching_gvi;
757  }
758  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
759  while (h_probe != h) {
760  matching_gvi = get_matching_group_value_reduction(groups_buffer,
761  h_probe,
762  key,
763  key_count,
764  key_width,
765  query_mem_desc,
766  that_buff_i64,
767  that_entry_idx,
768  that_entry_count,
769  row_size_quad);
770  if (matching_gvi.first) {
771  return matching_gvi;
772  }
773  h_probe = (h_probe + 1) % groups_buffer_entry_count;
774  }
775  return {nullptr, true};
776 }
777 
778 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
779 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
780 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
781  const int8_t* that_buff,
782  const size_t that_entry_idx,
783  const size_t that_entry_count,
784  const ResultSetStorage& that) const {
785  check_watchdog(that_entry_idx);
786  const auto key_count = query_mem_desc_.getGroupbyColCount();
791  const auto key_off =
792  key_offset_colwise(that_entry_idx, 0, that_entry_count);
793  if (isEmptyEntry(that_entry_idx, that_buff)) {
794  return;
795  }
796  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
797  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
798  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
799  auto [this_entry_slots, empty_entry] = get_group_value_columnar_reduction(
800  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
801  CHECK(this_entry_slots);
802  if (empty_entry) {
803  fill_slots(this_entry_slots,
804  query_mem_desc_.getEntryCount(),
805  that_buff_i64,
806  that_entry_idx,
807  that_entry_count,
808  query_mem_desc_);
809  return;
810  }
811  reduceOneEntrySlotsBaseline(
812  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
813 }
814 
815 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
816  const int64_t* that_buff,
817  const size_t that_entry_idx,
818  const size_t that_entry_count,
819  const ResultSetStorage& that) const {
821  const auto key_count = query_mem_desc_.getGroupbyColCount();
822  size_t j = 0;
823  size_t init_agg_val_idx = 0;
824  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
825  ++target_logical_idx) {
826  const auto& target_info = targets_[target_logical_idx];
827  const auto that_slot_off = slot_offset_colwise(
828  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
829  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
830  reduceOneSlotBaseline(this_entry_slots,
831  this_slot_off,
832  that_buff,
833  that_entry_count,
834  that_slot_off,
835  target_info,
836  target_logical_idx,
837  j,
838  init_agg_val_idx,
839  that);
840  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
841  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
842  } else {
843  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
844  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
845  }
846  }
847  j = advance_slot(j, target_info, false);
848  }
849 }
850 
851 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
852  const size_t this_slot,
853  const int64_t* that_buff,
854  const size_t that_entry_count,
855  const size_t that_slot,
856  const TargetInfo& target_info,
857  const size_t target_logical_idx,
858  const size_t target_slot_idx,
859  const size_t init_agg_val_idx,
860  const ResultSetStorage& that) const {
862  int8_t* this_ptr2{nullptr};
863  const int8_t* that_ptr2{nullptr};
864  if (target_info.is_agg &&
865  (target_info.agg_kind == kAVG ||
866  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
867  const auto this_count_off = query_mem_desc_.getEntryCount();
868  const auto that_count_off = that_entry_count;
869  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
870  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
871  }
872  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
873  this_ptr2,
874  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
875  that_ptr2,
876  target_info,
877  target_logical_idx,
878  target_slot_idx,
879  init_agg_val_idx,
880  that,
881  target_slot_idx, // dummy, for now
882  {});
883 }
884 
885 // During the reduction of two result sets using the baseline strategy, we first create a
886 // big enough buffer to hold the entries for both and we move the entries from the first
887 // into it before doing the reduction as usual (into the first buffer).
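// Each existing entry is re-hashed into the larger buffer with
// get_group_value{_columnar} and its slots are copied over with fill_slots.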
888 template <class KeyType>
889 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
890  const size_t new_entry_count) const {
892  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
893  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
894  const auto key_count = query_mem_desc_.getGroupbyColCount();
897  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
898  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
899  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
900 
901  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
902  const size_t thread_count = cpu_threads();
903  std::vector<std::future<void>> move_threads;
904 
905  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
906  const auto thread_entry_count =
907  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
908  const auto start_index = thread_idx * thread_entry_count;
909  const auto end_index =
910  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
911  move_threads.emplace_back(std::async(
912  std::launch::async,
913  [this,
914  src_buff,
915  new_buff_i64,
916  new_entry_count,
917  start_index,
918  end_index,
919  key_count,
920  row_qw_count,
921  key_byte_width] {
922  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
923  moveOneEntryToBuffer<KeyType>(entry_idx,
924  new_buff_i64,
925  new_entry_count,
926  key_count,
927  row_qw_count,
928  src_buff,
929  key_byte_width);
930  }
931  }));
932  }
933  for (auto& move_thread : move_threads) {
934  move_thread.wait();
935  }
936  for (auto& move_thread : move_threads) {
937  move_thread.get();
938  }
939  } else {
940  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
941  moveOneEntryToBuffer<KeyType>(entry_idx,
942  new_buff_i64,
943  new_entry_count,
944  key_count,
945  row_qw_count,
946  src_buff,
947  key_byte_width);
948  }
949  }
950 }
951 
952 template <class KeyType>
953 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
954  int64_t* new_buff_i64,
955  const size_t new_entry_count,
956  const size_t key_count,
957  const size_t row_qw_count,
958  const int64_t* src_buff,
959  const size_t key_byte_width) const {
960  const auto key_off =
961  query_mem_desc_.didOutputColumnar()
962  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
963  : row_qw_count * entry_index;
964  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
965  if (*key_ptr == get_empty_key<KeyType>()) {
966  return;
967  }
968  int64_t* new_entries_ptr{nullptr};
969  if (query_mem_desc_.didOutputColumnar()) {
970  const auto key =
971  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
972  new_entries_ptr =
973  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
974  } else {
975  new_entries_ptr = get_group_value(new_buff_i64,
976  new_entry_count,
977  &src_buff[key_off],
978  key_count,
979  key_byte_width,
980  row_qw_count,
981  nullptr);
982  }
983  CHECK(new_entries_ptr);
984  fill_slots(new_entries_ptr,
985  new_entry_count,
986  src_buff,
987  entry_index,
988  query_mem_desc_.getEntryCount(),
989  query_mem_desc_);
990 }
991 
992 void ResultSet::initializeStorage() const {
993  if (query_mem_desc_.didOutputColumnar()) {
994  storage_->initializeColWise();
995  } else {
996  storage_->initializeRowWise();
997  }
998 }
999 
1000 // Driver for reductions. Needed because the result of a reduction on the baseline
1001 // layout, which can have collisions, cannot be done in place and something needs
1002 // to take the ownership of the new result set with the bigger underlying buffer.
1003 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets) {
1004  CHECK(!result_sets.empty());
1005  auto result_rs = result_sets.front();
1006  CHECK(result_rs->storage_);
1007  auto& first_result = *result_rs->storage_;
1008  auto result = &first_result;
1009  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1010  for (const auto result_set : result_sets) {
1011  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1012  }
1013  const auto executor = result_rs->executor_;
1014  for (const auto result_set : result_sets) {
1015  CHECK_EQ(executor, result_set->executor_);
1016  }
1017  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1018  QueryDescriptionType::GroupByBaselineHash) {
1019  const auto total_entry_count =
1020  std::accumulate(result_sets.begin(),
1021  result_sets.end(),
1022  size_t(0),
1023  [](const size_t init, const ResultSet* rs) {
1024  return init + rs->query_mem_desc_.getEntryCount();
1025  });
1026  CHECK(total_entry_count);
1027  auto query_mem_desc = first_result.query_mem_desc_;
1028  query_mem_desc.setEntryCount(total_entry_count);
1029  rs_.reset(new ResultSet(first_result.targets_,
1030  ExecutorDeviceType::CPU,
1031  query_mem_desc,
1032  row_set_mem_owner,
1033  executor));
1034  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1035  rs_->initializeStorage();
1036  switch (query_mem_desc.getEffectiveKeyWidth()) {
1037  case 4:
1038  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1039  query_mem_desc.getEntryCount());
1040  break;
1041  case 8:
1042  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1043  query_mem_desc.getEntryCount());
1044  break;
1045  default:
1046  CHECK(false);
1047  }
1048  result = rs_->storage_.get();
1049  result_rs = rs_.get();
1050  }
1051 
1052  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1053  if (!serialized_varlen_buffer.empty()) {
1054  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1055  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1056  ++result_it) {
1057  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1058  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1059  serialized_varlen_buffer.emplace_back(
1060  std::move(result_serialized_varlen_buffer.front()));
1061  }
1062  }
1063 
1064  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1065  result_rs->getTargetInfos(),
1066  result_rs->getTargetInitVals());
1067  auto reduction_code = reduction_jit.codegen();
1068  size_t ctr = 1;
1069  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1070  ++result_it) {
1071  if (!serialized_varlen_buffer.empty()) {
1072  result->reduce(
1073  *((*result_it)->storage_), serialized_varlen_buffer[ctr++], reduction_code);
1074  } else {
1075  result->reduce(*((*result_it)->storage_), {}, reduction_code);
1076  }
1077  }
1078  return result_rs;
1079 }
1080 
1081 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1082  return rs_;
1083 }
1084 
1085 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1086  auto& result_storage = result_rs->storage_;
1087  result_storage->rewriteAggregateBufferOffsets(
1088  result_rs->serialized_varlen_buffer_.front());
1089 }
1090 
1091 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1092  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1093  const auto key_count = query_mem_desc_.getGroupbyColCount();
1094  CHECK_EQ(slot_count + key_count, entry.size());
1095  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1097  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1098  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1099  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1100  for (size_t i = 0; i < key_count; ++i) {
1101  this_buff[key_off + i] = entry[i];
1102  }
1103  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1104  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1105  this_buff[first_slot_off + i] = entry[key_count + i];
1106  }
1107 }
1108 
1109 void ResultSetStorage::initializeRowWise() const {
1110  const auto key_count = query_mem_desc_.getGroupbyColCount();
1111  const auto row_size = get_row_bytes(query_mem_desc_);
1112  CHECK_EQ(row_size % 8, 0u);
1113  const auto key_bytes_with_padding =
1114  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1115  CHECK(!query_mem_desc_.hasKeylessHash());
1116  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1117  case 4: {
1118  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1119  auto row_ptr = buff_ + i * row_size;
1120  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1121  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1122  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1123  slot_ptr[j] = target_init_vals_[j];
1124  }
1125  }
1126  break;
1127  }
1128  case 8: {
1129  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1130  auto row_ptr = buff_ + i * row_size;
1131  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1132  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1133  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1134  slot_ptr[j] = target_init_vals_[j];
1135  }
1136  }
1137  break;
1138  }
1139  default:
1140  CHECK(false);
1141  }
1142 }
1143 
1144 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1146  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1147  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1148  const auto key_count = query_mem_desc_.getGroupbyColCount();
1149  CHECK_EQ(slot_count + key_count, entry.size());
1150  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1151 
1152  for (size_t i = 0; i < key_count; i++) {
1153  const auto key_offset = key_offset_colwise(0, i, 1);
1154  this_buff[key_offset] = entry[i];
1155  }
1156 
1157  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1158  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1159  this_buff[slot_offset] = entry[key_count + i];
1160  }
1161 }
1162 
1163 void ResultSetStorage::initializeColWise() const {
1164  const auto key_count = query_mem_desc_.getGroupbyColCount();
1165  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1167  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1168  const auto first_key_off =
1169  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1170  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1171  this_buff[first_key_off + i] = EMPTY_KEY_64;
1172  }
1173  }
1174  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1175  const auto first_val_off =
1176  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1177  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1178  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1179  }
1180  }
1181 }
1182 
1183 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1184  CHECK(entry_slots);
1185  if (query_mem_desc_.didOutputColumnar()) {
1186  size_t slot_off = 0;
1187  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1188  entry_slots[slot_off] = target_init_vals_[j];
1189  slot_off += query_mem_desc_.getEntryCount();
1190  }
1191  } else {
1192  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1193  entry_slots[j] = target_init_vals_[j];
1194  }
1195  }
1196 }
1197 
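// The AGGREGATE_ONE_* macros below dispatch a single-slot reduction on the compact
// type (float vs. double, 32- vs. 64-bit integer); the *_NULLABLE_* variants skip
// values equal to the initialization (null sentinel) value via the *_skip_val
// runtime functions.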
1198 #define AGGREGATE_ONE_VALUE( \
1199  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1200  do { \
1201  const auto sql_type = get_compact_type(agg_info__); \
1202  if (sql_type.is_fp()) { \
1203  if (chosen_bytes__ == sizeof(float)) { \
1204  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1205  *reinterpret_cast<const float*>(other_ptr__)); \
1206  } else { \
1207  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1208  *reinterpret_cast<const double*>(other_ptr__)); \
1209  } \
1210  } else { \
1211  if (chosen_bytes__ == sizeof(int32_t)) { \
1212  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1213  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1214  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1215  } else { \
1216  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1217  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1218  agg_##agg_kind__(val_ptr, *other_ptr); \
1219  } \
1220  } \
1221  } while (0)
1222 
1223 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1224  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1225  do { \
1226  if (agg_info__.skip_null_val) { \
1227  const auto sql_type = get_compact_type(agg_info__); \
1228  if (sql_type.is_fp()) { \
1229  if (chosen_bytes__ == sizeof(float)) { \
1230  agg_##agg_kind__##_float_skip_val( \
1231  reinterpret_cast<int32_t*>(val_ptr__), \
1232  *reinterpret_cast<const float*>(other_ptr__), \
1233  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1234  } else { \
1235  agg_##agg_kind__##_double_skip_val( \
1236  reinterpret_cast<int64_t*>(val_ptr__), \
1237  *reinterpret_cast<const double*>(other_ptr__), \
1238  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1239  } \
1240  } else { \
1241  if (chosen_bytes__ == sizeof(int32_t)) { \
1242  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1243  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1244  const auto null_val = static_cast<int32_t>(init_val__); \
1245  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1246  } else { \
1247  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1248  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1249  const auto null_val = static_cast<int64_t>(init_val__); \
1250  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1251  } \
1252  } \
1253  } else { \
1254  AGGREGATE_ONE_VALUE( \
1255  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1256  } \
1257  } while (0)
1258 
1259 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1260  do { \
1261  if (chosen_bytes__ == sizeof(int32_t)) { \
1262  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1263  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1264  agg_sum_int32(val_ptr, *other_ptr); \
1265  } else { \
1266  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1267  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1268  agg_sum(val_ptr, *other_ptr); \
1269  } \
1270  } while (0)
1271 
1272 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1273  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1274  { \
1275  if (agg_info__.skip_null_val) { \
1276  const auto sql_type = get_compact_type(agg_info__); \
1277  if (sql_type.is_fp()) { \
1278  if (chosen_bytes__ == sizeof(float)) { \
1279  agg_sum_float_skip_val( \
1280  reinterpret_cast<int32_t*>(val_ptr__), \
1281  *reinterpret_cast<const float*>(other_ptr__), \
1282  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1283  } else { \
1284  agg_sum_double_skip_val( \
1285  reinterpret_cast<int64_t*>(val_ptr__), \
1286  *reinterpret_cast<const double*>(other_ptr__), \
1287  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1288  } \
1289  } else { \
1290  if (chosen_bytes__ == sizeof(int32_t)) { \
1291  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1292  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1293  const auto null_val = static_cast<int32_t>(init_val__); \
1294  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1295  } else { \
1296  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1297  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1298  const auto null_val = static_cast<int64_t>(init_val__); \
1299  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1300  } \
1301  } \
1302  } else { \
1303  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1304  } \
1305  }
1306 
1307 // to be used for 8/16-bit kMIN and kMAX only
1308 #define AGGREGATE_ONE_VALUE_SMALL( \
1309  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1310  do { \
1311  if (chosen_bytes__ == sizeof(int16_t)) { \
1312  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1313  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1314  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1315  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1316  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1317  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1318  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1319  } else { \
1320  UNREACHABLE(); \
1321  } \
1322  } while (0)
1323 
1324 // to be used for 8/16-bit kMIN and kMAX only
1325 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1326  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1327  do { \
1328  if (agg_info__.skip_null_val) { \
1329  if (chosen_bytes__ == sizeof(int16_t)) { \
1330  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1331  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1332  const auto null_val = static_cast<int16_t>(init_val__); \
1333  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1334  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1335  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1336  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1337  const auto null_val = static_cast<int8_t>(init_val__); \
1338  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1339  } \
1340  } else { \
1341  AGGREGATE_ONE_VALUE_SMALL( \
1342  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1343  } \
1344  } while (0)
1345 
1346 int8_t get_width_for_slot(const size_t target_slot_idx,
1347  const bool float_argument_input,
1348  const QueryMemoryDescriptor& query_mem_desc) {
1349  if (float_argument_input) {
1350  return sizeof(float);
1351  }
1352  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1353 }
1354 
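// SINGLE_VALUE targets must agree across buffers: a slot still holding the init
// value adopts the other side's value, and two conflicting non-init values raise
// "Multiple distinct values encountered".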
1355 void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
1356  const TargetInfo& target_info,
1357  const size_t target_slot_idx,
1358  const size_t init_agg_val_idx,
1359  const int8_t* that_ptr1) const {
1360  const bool float_argument_input = takes_float_argument(target_info);
1361  const auto chosen_bytes =
1362  get_width_for_slot(target_slot_idx, float_argument_input, query_mem_desc_);
1363  auto init_val = target_init_vals_[init_agg_val_idx];
1364 
1365  auto reduce = [&](auto const& size_tag) {
1366  using CastTarget = std::decay_t<decltype(size_tag)>;
1367  const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
1368  const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
1369  if (rhs_proj_col == init_val) {
1370  // ignore
1371  } else if (lhs_proj_col == init_val) {
1372  *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
1373  } else if (lhs_proj_col != rhs_proj_col) {
1374  throw std::runtime_error("Multiple distinct values encountered");
1375  }
1376  };
1377 
1378  switch (chosen_bytes) {
1379  case 1: {
1381  reduce(int8_t());
1382  break;
1383  }
1384  case 2: {
1386  reduce(int16_t());
1387  break;
1388  }
1389  case 4: {
1390  reduce(int32_t());
1391  break;
1392  }
1393  case 8: {
1394  CHECK(!target_info.sql_type.is_varlen());
1395  reduce(int64_t());
1396  break;
1397  }
1398  default:
1399  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1400  }
1401 }
1402 
1403 void ResultSetStorage::reduceOneSlot(
1404  int8_t* this_ptr1,
1405  int8_t* this_ptr2,
1406  const int8_t* that_ptr1,
1407  const int8_t* that_ptr2,
1408  const TargetInfo& target_info,
1409  const size_t target_logical_idx,
1410  const size_t target_slot_idx,
1411  const size_t init_agg_val_idx,
1412  const ResultSetStorage& that,
1413  const size_t first_slot_idx_for_target,
1414  const std::vector<std::string>& serialized_varlen_buffer) const {
1415  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1416  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1417  return;
1418  }
1419  }
1420  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1421  const bool float_argument_input = takes_float_argument(target_info);
1422  const auto chosen_bytes =
1423  get_width_for_slot(target_slot_idx, float_argument_input, query_mem_desc_);
1424  auto init_val = target_init_vals_[init_agg_val_idx];
1425 
1426  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
1427  reduceOneSlotSingleValue(
1428  this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
1429  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1430  switch (target_info.agg_kind) {
1431  case kCOUNT:
1432  case kAPPROX_COUNT_DISTINCT: {
1433  if (is_distinct_target(target_info)) {
1434  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1435  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1436  break;
1437  }
1438  CHECK_EQ(int64_t(0), init_val);
1439  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1440  break;
1441  }
1442  case kAVG: {
1443  // Ignore float argument compaction for count component for fear of its overflow
1444  AGGREGATE_ONE_COUNT(this_ptr2,
1445  that_ptr2,
1446  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1447  }
1448  // fall thru
1449  case kSUM: {
1450  AGGREGATE_ONE_NULLABLE_VALUE(
1451  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1452  break;
1453  }
1454  case kMIN: {
1455  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1456  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1457  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1458  } else {
1459  AGGREGATE_ONE_NULLABLE_VALUE(
1460  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1461  }
1462  break;
1463  }
1464  case kMAX: {
1465  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1466  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1467  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1468  } else {
1469  AGGREGATE_ONE_NULLABLE_VALUE(
1470  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1471  }
1472  break;
1473  }
1474  default:
1475  CHECK(false);
1476  }
1477  } else {
1478  switch (chosen_bytes) {
1479  case 1: {
1481  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1482  if (rhs_proj_col != init_val) {
1483  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1484  }
1485  break;
1486  }
1487  case 2: {
1489  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1490  if (rhs_proj_col != init_val) {
1491  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1492  }
1493  break;
1494  }
1495  case 4: {
1496  CHECK(target_info.agg_kind != kSAMPLE ||
1497  query_mem_desc_.isLogicalSizedColumnsAllowed());
1498  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1499  if (rhs_proj_col != init_val) {
1500  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1501  }
1502  break;
1503  }
1504  case 8: {
1505  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1506  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1507  !serialized_varlen_buffer.empty()) {
1508  size_t length_to_elems{0};
1509  if (target_info.sql_type.is_geometry()) {
1510  // TODO: Assumes hard-coded sizes for geometry targets
1511  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1512  } else {
1513  const auto& elem_ti = target_info.sql_type.get_elem_type();
1514  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1515  }
1516 
1517  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1518  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1519  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1520  *reinterpret_cast<int64_t*>(this_ptr1) =
1521  reinterpret_cast<const int64_t>(str_ptr);
1522  *reinterpret_cast<int64_t*>(this_ptr2) =
1523  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1524  } else {
1525  if (rhs_proj_col != init_val) {
1526  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1527  }
1528  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1529  CHECK(this_ptr2 && that_ptr2);
1530  *reinterpret_cast<int64_t*>(this_ptr2) =
1531  *reinterpret_cast<const int64_t*>(that_ptr2);
1532  }
1533  }
1534 
1535  break;
1536  }
1537  default:
1538  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1539  }
1540  }
1541 }
1542 
1543 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1544  const int8_t* that_ptr1,
1545  const size_t target_logical_idx,
1546  const ResultSetStorage& that) const {
1548  const auto& old_count_distinct_desc =
1549  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1550  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1551  const auto& new_count_distinct_desc =
1552  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1553  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1554  CHECK(this_ptr1 && that_ptr1);
1555  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1556  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1557  count_distinct_set_union(
1558  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1559 }
1560 
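// reduceSingleRow() folds the warp-interleaved bins of one keyless-hash row into
// agg_vals and returns true when every warp's partial result is empty, i.e. the row
// should be discarded.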
1561 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1562  const int8_t warp_count,
1563  const bool is_columnar,
1564  const bool replace_bitmap_ptr_with_bitmap_sz,
1565  std::vector<int64_t>& agg_vals,
1566  const QueryMemoryDescriptor& query_mem_desc,
1567  const std::vector<TargetInfo>& targets,
1568  const std::vector<int64_t>& agg_init_vals) {
1569  const size_t agg_col_count{agg_vals.size()};
1570  const auto row_size = query_mem_desc.getRowSize();
1571  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1572  CHECK_GE(agg_col_count, targets.size());
1573  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1574  CHECK(query_mem_desc.hasKeylessHash());
1575  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1576  bool discard_row = true;
1577  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1578  bool discard_partial_result = true;
1579  for (size_t target_idx = 0, agg_col_idx = 0;
1580  target_idx < targets.size() && agg_col_idx < agg_col_count;
1581  ++target_idx, ++agg_col_idx) {
1582  const auto& agg_info = targets[target_idx];
1583  const bool float_argument_input = takes_float_argument(agg_info);
1584  const auto chosen_bytes = float_argument_input
1585  ? sizeof(float)
1586  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1587  auto partial_bin_val = get_component(
1588  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1589  partial_agg_vals[agg_col_idx] = partial_bin_val;
1590  if (is_distinct_target(agg_info)) {
1591  CHECK_EQ(int8_t(1), warp_count);
1592  CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
1593  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1594  partial_bin_val = count_distinct_set_size(
1595  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1596  if (replace_bitmap_ptr_with_bitmap_sz) {
1597  partial_agg_vals[agg_col_idx] = partial_bin_val;
1598  }
1599  }
1600  if (kAVG == agg_info.agg_kind) {
1601  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1602  ++agg_col_idx;
1603  partial_bin_val = partial_agg_vals[agg_col_idx] =
1604  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1605  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1606  }
1607  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1608  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1609  CHECK(agg_info.is_agg);
1610  discard_partial_result = false;
1611  }
1612  }
1613  row_ptr += row_size;
1614  if (discard_partial_result) {
1615  continue;
1616  }
1617  discard_row = false;
1618  for (size_t target_idx = 0, agg_col_idx = 0;
1619  target_idx < targets.size() && agg_col_idx < agg_col_count;
1620  ++target_idx, ++agg_col_idx) {
1621  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1622  const auto& agg_info = targets[target_idx];
1623  const bool float_argument_input = takes_float_argument(agg_info);
1624  const auto chosen_bytes = float_argument_input
1625  ? sizeof(float)
1626  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1627  const auto& chosen_type = get_compact_type(agg_info);
1628  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1629  try {
1630  switch (agg_info.agg_kind) {
1631  case kCOUNT:
1632  case kAPPROX_COUNT_DISTINCT:
1633  AGGREGATE_ONE_NULLABLE_COUNT(
1634  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1635  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1636  agg_init_vals[agg_col_idx],
1637  chosen_bytes,
1638  agg_info);
1639  break;
1640  case kAVG:
1641  // Ignore float argument compaction for count component for fear of its
1642  // overflow
1643  AGGREGATE_ONE_COUNT(
1644  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1645  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1646  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1647  // fall thru
1648  case kSUM:
1649  AGGREGATE_ONE_NULLABLE_VALUE(
1650  sum,
1651  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1652  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1653  agg_init_vals[agg_col_idx],
1654  chosen_bytes,
1655  agg_info);
1656  break;
1657  case kMIN:
1658  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1659  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1660  min,
1661  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1662  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1663  agg_init_vals[agg_col_idx],
1664  chosen_bytes,
1665  agg_info);
1666  } else {
1667  AGGREGATE_ONE_NULLABLE_VALUE(
1668  min,
1669  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1670  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1671  agg_init_vals[agg_col_idx],
1672  chosen_bytes,
1673  agg_info);
1674  }
1675  break;
1676  case kMAX:
1677  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1678  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1679  max,
1680  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1681  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1682  agg_init_vals[agg_col_idx],
1683  chosen_bytes,
1684  agg_info);
1685  } else {
1686  AGGREGATE_ONE_NULLABLE_VALUE(
1687  max,
1688  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1689  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1690  agg_init_vals[agg_col_idx],
1691  chosen_bytes,
1692  agg_info);
1693  }
1694  break;
1695  default:
1696  CHECK(false);
1697  break;
1698  }
1699  } catch (std::runtime_error& e) {
1700  // TODO(miyu): handle the case where chosen_bytes < 8
1701  LOG(ERROR) << e.what();
1702  }
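// Widen compacted 32-bit integer/decimal results back into the 64-bit output
// slot, except for a kCOUNT value that has already advanced past its init
// value, which is left as-is.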
1703  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1704  switch (chosen_bytes) {
1705  case 8:
1706  break;
1707  case 4: {
1708  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1709  if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
1710  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1711  }
1712  break;
1713  }
1714  default:
1715  CHECK(false);
1716  }
1717  }
1718  if (kAVG == agg_info.agg_kind) {
1719  ++agg_col_idx;
1720  }
1721  } else {
1722  if (agg_info.agg_kind == kSAMPLE) {
1723  CHECK(!agg_info.sql_type.is_varlen())
1724  << "Interleaved bins reduction not supported for variable length "
1725  "arguments "
1726  "to SAMPLE";
1727  }
1728  if (agg_vals[agg_col_idx]) {
1729  if (agg_info.agg_kind == kSAMPLE) {
1730  continue;
1731  }
1732  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1733  } else {
1734  agg_vals[agg_col_idx] = partial_bin_val;
1735  }
1736  }
1737  }
1738  }
1739  return discard_row;
1740 }
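The AGGREGATE_ONE_NULLABLE_VALUE / AGGREGATE_ONE_NULLABLE_VALUE_SMALL macros used above combine a partial slot value with the running value while treating the slot's initialization value as the null sentinel. The following is only a loose sketch of that behavior for 64-bit slots, not the actual macro expansion: the names SketchAggKind and reduce_one_nullable_value_sketch are invented for illustration, and the real macros additionally dispatch on chosen_bytes, handle float arguments, and (judging from the surrounding try/catch) can throw, e.g. on overflow.

#include <algorithm>
#include <cstdint>

// Illustrative stand-in for the SQLAgg kinds seen in the listing (kMIN/kMAX/kSUM).
enum class SketchAggKind { Min, Max, Sum };

// One nullable reduction step for a 64-bit slot; `init_val` doubles as the
// null sentinel, so an untouched side is skipped rather than combined.
inline void reduce_one_nullable_value_sketch(int64_t& val,
                                             const int64_t other,
                                             const int64_t init_val,
                                             const SketchAggKind kind) {
  if (other == init_val) {
    return;  // the incoming side never produced a value for this slot
  }
  if (val == init_val) {
    val = other;  // first real value simply replaces the sentinel
    return;
  }
  switch (kind) {
    case SketchAggKind::Min:
      val = std::min(val, other);
      break;
    case SketchAggKind::Max:
      val = std::max(val, other);
      break;
    case SketchAggKind::Sum:
      val += other;
      break;
  }
}

The _SMALL variant, selected above when chosen_bytes <= sizeof(int16_t), applies the same idea to 8- and 16-bit compacted slots.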