ResultSetReduction.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "DynamicWatchdog.h"
26 #include "ResultSet.h"
28 #include "ResultSetReductionJIT.h"
29 #include "RuntimeFunctions.h"
30 #include "Shared/SqlTypesLayout.h"
31 
32 #include "Shared/likely.h"
33 #include "Shared/thread_count.h"
34 
35 #include <llvm/ExecutionEngine/GenericValue.h>
36 
37 #include <algorithm>
38 #include <future>
39 #include <numeric>
40 
41 extern bool g_enable_dynamic_watchdog;
42 
43 namespace {
44 
45 bool use_multithreaded_reduction(const size_t entry_count) {
46  return entry_count > 100000;
47 }
48 
49 size_t get_row_qw_count(const QueryMemoryDescriptor& query_mem_desc) {
50  const auto row_bytes = get_row_bytes(query_mem_desc);
51  CHECK_EQ(size_t(0), row_bytes % 8);
52  return row_bytes / 8;
53 }
54 
55 std::vector<int64_t> make_key(const int64_t* buff,
56  const size_t entry_count,
57  const size_t key_count) {
58  std::vector<int64_t> key;
59  size_t off = 0;
60  for (size_t i = 0; i < key_count; ++i) {
61  key.push_back(buff[off]);
62  off += entry_count;
63  }
64  return key;
65 }
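In the columnar layouts handled below, the components of one entry's key are not adjacent: they sit one full column (entry_count values) apart. A small illustrative sketch, assuming entry_count = 4 and key_count = 2:

// buff points at the first key component of the entry being read:
//   column 0: k0[e0] k0[e1] k0[e2] k0[e3] | column 1: k1[e0] k1[e1] k1[e2] k1[e3]
// make_key(buff, 4, 2) returns {buff[0], buff[4]}, i.e. {k0, k1} for that entry.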
66 
67 void fill_slots(int64_t* dst_entry,
68  const size_t dst_entry_count,
69  const int64_t* src_buff,
70  const size_t src_entry_idx,
71  const size_t src_entry_count,
72  const QueryMemoryDescriptor& query_mem_desc) {
73  const auto slot_count = query_mem_desc.getBufferColSlotCount();
74  const auto key_count = query_mem_desc.getGroupbyColCount();
75  if (query_mem_desc.didOutputColumnar()) {
76  for (size_t i = 0, dst_slot_off = 0; i < slot_count;
77  ++i, dst_slot_off += dst_entry_count) {
78  dst_entry[dst_slot_off] =
79  src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
80  }
81  } else {
82  const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
83  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
84  for (size_t i = 0; i < slot_count; ++i) {
85  dst_entry[i] = row_ptr[slot_off_quad + i];
86  }
87  }
88 }
89 
91 void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
92  for (size_t i = 0; i < key_count; ++i) {
93  key_ptr_i32[i] = EMPTY_KEY_32;
94  }
95 }
96 
98 void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
99  for (size_t i = 0; i < key_count; ++i) {
100  key_ptr_i64[i] = EMPTY_KEY_64;
101  }
102 }
103 
104 inline int64_t get_component(const int8_t* group_by_buffer,
105  const size_t comp_sz,
106  const size_t index = 0) {
107  int64_t ret = std::numeric_limits<int64_t>::min();
108  switch (comp_sz) {
109  case 1: {
110  ret = group_by_buffer[index];
111  break;
112  }
113  case 2: {
114  const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
115  ret = buffer_ptr[index];
116  break;
117  }
118  case 4: {
119  const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
120  ret = buffer_ptr[index];
121  break;
122  }
123  case 8: {
124  const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
125  ret = buffer_ptr[index];
126  break;
127  }
128  default:
129  CHECK(false);
130  }
131  return ret;
132 }
133 
134 void run_reduction_code(const ReductionCode& reduction_code,
135  int8_t* this_buff,
136  const int8_t* that_buff,
137  const int32_t start_entry_index,
138  const int32_t end_entry_index,
139  const int32_t that_entry_count,
140  const void* this_qmd,
141  const void* that_qmd,
142  const void* serialized_varlen_buffer) {
143  int err = 0;
144  if (reduction_code.func_ptr) {
145  err = reduction_code.func_ptr(this_buff,
146  that_buff,
147  start_entry_index,
148  end_entry_index,
149  that_entry_count,
150  this_qmd,
151  that_qmd,
152  serialized_varlen_buffer);
153  } else {
154  auto ret = ReductionInterpreter::run(
155  reduction_code.ir_reduce_loop.get(),
156  {ReductionInterpreter::EvalValue{.ptr = this_buff},
157  ReductionInterpreter::EvalValue{.ptr = that_buff},
158  ReductionInterpreter::EvalValue{.int_val = start_entry_index},
159  ReductionInterpreter::EvalValue{.int_val = end_entry_index},
160  ReductionInterpreter::EvalValue{.int_val = that_entry_count},
161  ReductionInterpreter::EvalValue{.ptr = this_qmd},
162  ReductionInterpreter::EvalValue{.ptr = that_qmd},
163  ReductionInterpreter::EvalValue{.ptr = serialized_varlen_buffer}});
164  err = ret.int_val;
165  }
166  if (err) {
167  throw std::runtime_error(
168  "Query execution has exceeded the time limit or was interrupted during result "
169  "set reduction");
170  }
171 }
172 
173 } // namespace
174 
175 void fill_empty_key(void* key_ptr, const size_t key_count, const size_t key_width) {
176  switch (key_width) {
177  case 4: {
178  auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
179  fill_empty_key_32(key_ptr_i32, key_count);
180  break;
181  }
182  case 8: {
183  auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
184  fill_empty_key_64(key_ptr_i64, key_count);
185  break;
186  }
187  default:
188  CHECK(false);
189  }
190 }
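A minimal usage sketch for the helper above (illustrative only, using just names defined in this file): clearing a three-column, 8-byte-wide key so the entry reads as empty.

// int64_t key[3];
// fill_empty_key(key, /*key_count=*/3, /*key_width=*/8);  // key[0..2] == EMPTY_KEY_64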
191 
192 // Driver method for various buffer layouts, actual work is done by reduceOne* methods.
193 // Reduces the entries of `that` into the buffer of this ResultSetStorage object.
194 void ResultSetStorage::reduce(const ResultSetStorage& that,
195  const std::vector<std::string>& serialized_varlen_buffer,
196  const ReductionCode& reduction_code) const {
197  auto entry_count = query_mem_desc_.getEntryCount();
198  CHECK_GT(entry_count, size_t(0));
206  }
207  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
208  switch (query_mem_desc_.getQueryDescriptionType()) {
209  case QueryDescriptionType::GroupByBaselineHash:
210  CHECK_GE(entry_count, that_entry_count);
211  break;
212  default:
213  CHECK_EQ(entry_count, that_entry_count);
214  }
215  auto this_buff = buff_;
216  CHECK(this_buff);
217  auto that_buff = that.buff_;
218  CHECK(that_buff);
219  if (query_mem_desc_.getQueryDescriptionType() ==
220  QueryDescriptionType::GroupByBaselineHash) {
221  if (!serialized_varlen_buffer.empty()) {
222  throw std::runtime_error(
223  "Projection of variable length targets with baseline hash group by is not yet "
224  "supported in Distributed mode");
225  }
226  if (use_multithreaded_reduction(that_entry_count)) {
227  const size_t thread_count = cpu_threads();
228  std::vector<std::future<void>> reduction_threads;
229  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
230  const auto thread_entry_count =
231  (that_entry_count + thread_count - 1) / thread_count;
232  const auto start_index = thread_idx * thread_entry_count;
233  const auto end_index =
234  std::min(start_index + thread_entry_count, that_entry_count);
235  reduction_threads.emplace_back(std::async(
236  std::launch::async,
237  [this,
238  this_buff,
239  that_buff,
240  start_index,
241  end_index,
242  that_entry_count,
243  &reduction_code,
244  &that] {
245  if (reduction_code.ir_reduce_loop) {
246  run_reduction_code(reduction_code,
247  this_buff,
248  that_buff,
249  start_index,
250  end_index,
251  that_entry_count,
252  &query_mem_desc_,
253  &that.query_mem_desc_,
254  nullptr);
255  } else {
256  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
257  reduceOneEntryBaseline(
258  this_buff, that_buff, entry_idx, that_entry_count, that);
259  }
260  }
261  }));
262  }
263  for (auto& reduction_thread : reduction_threads) {
264  reduction_thread.wait();
265  }
266  for (auto& reduction_thread : reduction_threads) {
267  reduction_thread.get();
268  }
269  } else {
270  if (reduction_code.ir_reduce_loop) {
271  run_reduction_code(reduction_code,
272  this_buff,
273  that_buff,
274  0,
275  that_entry_count,
276  that_entry_count,
277  &query_mem_desc_,
278  &that.query_mem_desc_,
279  nullptr);
280  } else {
281  for (size_t i = 0; i < that_entry_count; ++i) {
282  reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
283  }
284  }
285  }
286  return;
287  }
288  if (use_multithreaded_reduction(entry_count)) {
289  const size_t thread_count = cpu_threads();
290  std::vector<std::future<void>> reduction_threads;
291  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
292  const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
293  const auto start_index = thread_idx * thread_entry_count;
294  const auto end_index = std::min(start_index + thread_entry_count, entry_count);
295  if (query_mem_desc_.didOutputColumnar()) {
296  reduction_threads.emplace_back(std::async(std::launch::async,
297  [this,
298  this_buff,
299  that_buff,
300  start_index,
301  end_index,
302  &that,
303  &serialized_varlen_buffer] {
304  reduceEntriesNoCollisionsColWise(
305  this_buff,
306  that_buff,
307  that,
308  start_index,
309  end_index,
310  serialized_varlen_buffer);
311  }));
312  } else {
313  reduction_threads.emplace_back(std::async(std::launch::async,
314  [this,
315  this_buff,
316  that_buff,
317  start_index,
318  end_index,
319  that_entry_count,
320  &reduction_code,
321  &that,
322  &serialized_varlen_buffer] {
323  CHECK(reduction_code.ir_reduce_loop);
324  run_reduction_code(
325  reduction_code,
326  this_buff,
327  that_buff,
328  start_index,
329  end_index,
330  that_entry_count,
331  &query_mem_desc_,
332  &that.query_mem_desc_,
333  &serialized_varlen_buffer);
334  }));
335  }
336  }
337  for (auto& reduction_thread : reduction_threads) {
338  reduction_thread.wait();
339  }
340  for (auto& reduction_thread : reduction_threads) {
341  reduction_thread.get();
342  }
343  } else {
344  if (query_mem_desc_.didOutputColumnar()) {
345  reduceEntriesNoCollisionsColWise(this_buff,
346  that_buff,
347  that,
348  0,
349  query_mem_desc_.getEntryCount(),
350  serialized_varlen_buffer);
351  } else {
352  CHECK(reduction_code.ir_reduce_loop);
353  run_reduction_code(reduction_code,
354  this_buff,
355  that_buff,
356  0,
357  entry_count,
358  that_entry_count,
359  &query_mem_desc_,
360  &that.query_mem_desc_,
361  &serialized_varlen_buffer);
362  }
363  }
364 }
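The multithreaded branches above size each worker's share by rounding up. As a worked example of the arithmetic: with 250,001 entries and cpu_threads() == 8, thread_entry_count is (250001 + 8 - 1) / 8 = 31,251, so thread 0 covers [0, 31251) while thread 7 starts at 218,757 and has its end_index clamped by std::min to 250,001.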
365 
366 namespace {
367 
368 ALWAYS_INLINE void check_watchdog(const size_t sample_seed) {
369  if (UNLIKELY(g_enable_dynamic_watchdog && (sample_seed & 0x3F) == 0 &&
370  dynamic_watchdog())) {
371  // TODO(alex): distinguish between the deadline and interrupt
372  throw std::runtime_error(
373  "Query execution has exceeded the time limit or was interrupted during result "
374  "set reduction");
375  }
376 }
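Because (sample_seed & 0x3F) == 0 holds for exactly one value in every 64, the reduction loops below pass the entry index as the seed, so dynamic_watchdog() is only consulted about once per 64 reduced entries instead of on every iteration.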
377 
378 } // namespace
379 
380 void ResultSetStorage::reduceEntriesNoCollisionsColWise(
381  int8_t* this_buff,
382  const int8_t* that_buff,
383  const ResultSetStorage& that,
384  const size_t start_index,
385  const size_t end_index,
386  const std::vector<std::string>& serialized_varlen_buffer) const {
387  // TODO(adb / saman): Support column wise output when serializing distributed agg
388  // functions
389  CHECK(serialized_varlen_buffer.empty());
390 
391  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
392 
393  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
394  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
395  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
396  const auto& agg_info = targets_[target_idx];
397  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);
398 
399  bool two_slot_target{false};
400  if (agg_info.is_agg &&
401  (agg_info.agg_kind == kAVG ||
402  (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
403  // Note that this assumes if one of the slot pairs in a given target is an array,
404  // all slot pairs are arrays. Currently this is true for all geo targets, but we
405  // should better codify and store this information in the future
406  two_slot_target = true;
407  }
408 
409  for (size_t target_slot_idx = slots_for_col.front();
410  target_slot_idx < slots_for_col.back() + 1;
411  target_slot_idx += 2) {
412  const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
413  this_crt_col_ptr, query_mem_desc_, target_slot_idx);
414  const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
415  that_crt_col_ptr, query_mem_desc_, target_slot_idx);
416 
417  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
418  check_watchdog(entry_idx);
419  if (isEmptyEntryColumnar(entry_idx, that_buff)) {
420  continue;
421  }
422  if (LIKELY(target_idx == 0 && target_slot_idx == slots_for_col.front())) {
423  // copy the key from right hand side
424  copyKeyColWise(entry_idx, this_buff, that_buff);
425  }
426  auto this_ptr1 =
427  this_crt_col_ptr +
428  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
429  auto that_ptr1 =
430  that_crt_col_ptr +
431  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
432  int8_t* this_ptr2{nullptr};
433  const int8_t* that_ptr2{nullptr};
434  if (UNLIKELY(two_slot_target)) {
435  this_ptr2 =
436  this_next_col_ptr +
437  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
438  that_ptr2 =
439  that_next_col_ptr +
440  entry_idx * query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
441  }
442  reduceOneSlot(this_ptr1,
443  this_ptr2,
444  that_ptr1,
445  that_ptr2,
446  agg_info,
447  target_idx,
448  target_slot_idx,
449  target_slot_idx,
450  that,
451  slots_for_col.front(),
452  serialized_varlen_buffer);
453  }
454 
455  this_crt_col_ptr = this_next_col_ptr;
456  that_crt_col_ptr = that_next_col_ptr;
457  if (UNLIKELY(two_slot_target)) {
458  this_crt_col_ptr = advance_to_next_columnar_target_buff(
459  this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
460  that_crt_col_ptr = advance_to_next_columnar_target_buff(
461  that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
462  }
463  }
464  }
465 }
466 
467 /*
468  * copy all keys from the columnar prepended group buffer of "that_buff" into
469  * "this_buff"
470  */
471 void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
472  int8_t* this_buff,
473  const int8_t* that_buff) const {
474  CHECK(query_mem_desc_.didOutputColumnar());
475  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
476  group_idx++) {
477  // if the column corresponds to a group key
478  const auto column_offset_bytes =
479  query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
480  auto lhs_key_ptr = this_buff + column_offset_bytes;
481  auto rhs_key_ptr = that_buff + column_offset_bytes;
482  switch (query_mem_desc_.groupColWidth(group_idx)) {
483  case 8:
484  *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
485  *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
486  break;
487  case 4:
488  *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
489  *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
490  break;
491  case 2:
492  *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
493  *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
494  break;
495  case 1:
496  *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
497  *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
498  break;
499  default:
500  CHECK(false);
501  break;
502  }
503  }
504 }
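A note on the addressing above: column_offset_bytes locates the start of one prepended group-key column within the buffer, and entry_idx is then applied through the typed pointer, so it advances in units of that column's width (8, 4, 2 or 1 bytes) rather than in raw bytes.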
505 
506 // Rewrites the entries of this ResultSetStorage object to point directly into the
507 // serialized_varlen_buffer rather than using offsets.
508 void ResultSetStorage::rewriteAggregateBufferOffsets(
509  const std::vector<std::string>& serialized_varlen_buffer) const {
510  if (serialized_varlen_buffer.empty()) {
511  return;
512  }
513 
515  auto entry_count = query_mem_desc_.getEntryCount();
516  CHECK_GT(entry_count, size_t(0));
517  CHECK(buff_);
518 
519  // Row-wise iteration, consider moving to separate function
520  for (size_t i = 0; i < entry_count; ++i) {
521  if (isEmptyEntry(i, buff_)) {
522  continue;
523  }
524  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
525  const auto key_bytes_with_padding = align_to_int64(key_bytes);
526  auto rowwise_targets_ptr =
527  row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
528  size_t target_slot_idx = 0;
529  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
530  ++target_logical_idx) {
531  const auto& target_info = targets_[target_logical_idx];
532  if (target_info.sql_type.is_varlen() && target_info.is_agg) {
533  CHECK(target_info.agg_kind == kSAMPLE);
534  auto ptr1 = rowwise_targets_ptr;
535  auto slot_idx = target_slot_idx;
536  auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
537  auto offset = *reinterpret_cast<const int64_t*>(ptr1);
538 
539  const auto& elem_ti = target_info.sql_type.get_elem_type();
540  size_t length_to_elems =
541  target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
542  ? 1
543  : elem_ti.get_size();
544  if (target_info.sql_type.is_geometry()) {
545  for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
546  if (j > 0) {
547  ptr1 = ptr2 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 1);
548  ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx + 2);
549  slot_idx += 2;
550  length_to_elems = 4;
551  }
552  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
553  const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
554  const auto str_ptr =
555  reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
556  CHECK(ptr1);
557  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
558  CHECK(ptr2);
559  *reinterpret_cast<int64_t*>(ptr2) =
560  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
561  }
562  } else {
563  CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
564  const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
565  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
566  CHECK(ptr1);
567  *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
568  CHECK(ptr2);
569  *reinterpret_cast<int64_t*>(ptr2) =
570  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
571  }
572  }
573 
574  rowwise_targets_ptr = advance_target_ptr_row_wise(
575  rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
576  target_slot_idx = advance_slot(target_slot_idx, target_info, false);
577  }
578  }
579 
580  return;
581 }
582 
583 namespace {
584 
585 GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
586  const uint32_t h,
587  const int64_t* key,
588  const uint32_t key_qw_count,
589  const size_t entry_count) {
590  auto off = h;
591  const auto old_key =
592  __sync_val_compare_and_swap(&groups_buffer[off], EMPTY_KEY_64, *key);
593  if (old_key == EMPTY_KEY_64) {
594  for (size_t i = 0; i < key_qw_count; ++i) {
595  groups_buffer[off] = key[i];
596  off += entry_count;
597  }
598  return {&groups_buffer[off], true};
599  }
600  off = h;
601  for (size_t i = 0; i < key_qw_count; ++i) {
602  if (groups_buffer[off] != key[i]) {
603  return {nullptr, true};
604  }
605  off += entry_count;
606  }
607  return {&groups_buffer[off], false};
608 }
609 
610 // TODO(alex): fix synchronization when we enable it
611 GroupValueInfo get_group_value_columnar_reduction(
612  int64_t* groups_buffer,
613  const uint32_t groups_buffer_entry_count,
614  const int64_t* key,
615  const uint32_t key_qw_count) {
616  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
617  auto matching_gvi = get_matching_group_value_columnar_reduction(
618  groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
619  if (matching_gvi.first) {
620  return matching_gvi;
621  }
622  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
623  while (h_probe != h) {
624  matching_gvi = get_matching_group_value_columnar_reduction(
625  groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
626  if (matching_gvi.first) {
627  return matching_gvi;
628  }
629  h_probe = (h_probe + 1) % groups_buffer_entry_count;
630  }
631  return {nullptr, true};
632 }
633 
634 #define cas_cst(ptr, expected, desired) \
635  __atomic_compare_exchange_n( \
636  ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
637 #define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
638 #define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
639 
640 template <typename T = int64_t>
641 GroupValueInfo get_matching_group_value_reduction(
642  int64_t* groups_buffer,
643  const uint32_t h,
644  const T* key,
645  const uint32_t key_count,
646  const QueryMemoryDescriptor& query_mem_desc,
647  const int64_t* that_buff_i64,
648  const size_t that_entry_idx,
649  const size_t that_entry_count,
650  const uint32_t row_size_quad) {
651  auto off = h * row_size_quad;
652  T empty_key = get_empty_key<T>();
653  T write_pending = get_empty_key<T>() - 1;
654  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
655  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
656  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
657  if (success) {
658  fill_slots(groups_buffer + off + slot_off_quad,
659  query_mem_desc.getEntryCount(),
660  that_buff_i64,
661  that_entry_idx,
662  that_entry_count,
663  query_mem_desc);
664  if (key_count > 1) {
665  memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
666  }
667  store_cst(row_ptr, *key);
668  return {groups_buffer + off + slot_off_quad, true};
669  }
670  while (load_cst(row_ptr) == write_pending) {
671  // spin until the winning thread has finished writing the entire key and the init
672  // value
673  }
674  for (size_t i = 0; i < key_count; ++i) {
675  if (load_cst(row_ptr + i) != key[i]) {
676  return {nullptr, true};
677  }
678  }
679  return {groups_buffer + off + slot_off_quad, false};
680 }
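A compact sketch of the claim protocol implemented above, keyed off the value observed in the first key word of the row:

//   empty_key     -> cas_cst succeeds: the slots are filled, key[1..] is copied, and
//                    store_cst(key[0]) finally publishes the finished entry
//   write_pending -> another thread is mid-publish: spin on load_cst until it settles
//   other value   -> compare the full key; a match yields the existing entry, a
//                    mismatch sends the caller on to the next bucket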
681 
682 #undef load_cst
683 #undef store_cst
684 #undef cas_cst
685 
686 GroupValueInfo get_matching_group_value_reduction(
687  int64_t* groups_buffer,
688  const uint32_t h,
689  const int64_t* key,
690  const uint32_t key_count,
691  const size_t key_width,
692  const QueryMemoryDescriptor& query_mem_desc,
693  const int64_t* that_buff_i64,
694  const size_t that_entry_idx,
695  const size_t that_entry_count,
696  const uint32_t row_size_quad) {
697  switch (key_width) {
698  case 4:
699  return get_matching_group_value_reduction(groups_buffer,
700  h,
701  reinterpret_cast<const int32_t*>(key),
702  key_count,
703  query_mem_desc,
704  that_buff_i64,
705  that_entry_idx,
706  that_entry_count,
707  row_size_quad);
708  case 8:
709  return get_matching_group_value_reduction(groups_buffer,
710  h,
711  key,
712  key_count,
713  query_mem_desc,
714  that_buff_i64,
715  that_entry_idx,
716  that_entry_count,
717  row_size_quad);
718  default:
719  CHECK(false);
720  return {nullptr, true};
721  }
722 }
723 
724 } // namespace
725 
726 GroupValueInfo get_group_value_reduction(int64_t* groups_buffer,
727  const uint32_t groups_buffer_entry_count,
728  const int64_t* key,
729  const uint32_t key_count,
730  const size_t key_width,
731  const QueryMemoryDescriptor& query_mem_desc,
732  const int64_t* that_buff_i64,
733  const size_t that_entry_idx,
734  const size_t that_entry_count,
735  const uint32_t row_size_quad) {
736  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
737  auto matching_gvi = get_matching_group_value_reduction(groups_buffer,
738  h,
739  key,
740  key_count,
741  key_width,
742  query_mem_desc,
743  that_buff_i64,
744  that_entry_idx,
745  that_entry_count,
746  row_size_quad);
747  if (matching_gvi.first) {
748  return matching_gvi;
749  }
750  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
751  while (h_probe != h) {
752  matching_gvi = get_matching_group_value_reduction(groups_buffer,
753  h_probe,
754  key,
755  key_count,
756  key_width,
757  query_mem_desc,
758  that_buff_i64,
759  that_entry_idx,
760  that_entry_count,
761  row_size_quad);
762  if (matching_gvi.first) {
763  return matching_gvi;
764  }
765  h_probe = (h_probe + 1) % groups_buffer_entry_count;
766  }
767  return {nullptr, true};
768 }
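A worked probing example for the loop above: with groups_buffer_entry_count = 8 and a key hashing to bucket 5, the buckets are visited in the order 5, 6, 7, 0, 1, 2, 3, 4, and {nullptr, true} is returned only if the wrap-around completes without finding either the same key or a claimable empty slot.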
769 
770 // Reduces entry at position that_entry_idx in that_buff into this_buff. This is
771 // the baseline layout, so the position in this_buff isn't known to be that_entry_idx.
772 void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
773  const int8_t* that_buff,
774  const size_t that_entry_idx,
775  const size_t that_entry_count,
776  const ResultSetStorage& that) const {
777  check_watchdog(that_entry_idx);
778  const auto key_count = query_mem_desc_.getGroupbyColCount();
783  const auto key_off =
785  if (isEmptyEntry(that_entry_idx, that_buff)) {
786  return;
787  }
788  int64_t* this_entry_slots{nullptr};
789  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
790  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
791  bool empty_entry = false;
792  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
793  std::tie(this_entry_slots, empty_entry) = get_group_value_columnar_reduction(
794  this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
795  CHECK(this_entry_slots);
796  if (empty_entry) {
797  fill_slots(this_entry_slots,
798  query_mem_desc_.getEntryCount(),
799  that_buff_i64,
800  that_entry_idx,
801  that_entry_count,
802  query_mem_desc_);
803  return;
804  }
805  reduceOneEntrySlotsBaseline(
806  this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
807 }
808 
809 void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
810  const int64_t* that_buff,
811  const size_t that_entry_idx,
812  const size_t that_entry_count,
813  const ResultSetStorage& that) const {
815  const auto key_count = query_mem_desc_.getGroupbyColCount();
816  size_t j = 0;
817  size_t init_agg_val_idx = 0;
818  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
819  ++target_logical_idx) {
820  const auto& target_info = targets_[target_logical_idx];
821  const auto that_slot_off = slot_offset_colwise(
822  that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
823  const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
824  reduceOneSlotBaseline(this_entry_slots,
825  this_slot_off,
826  that_buff,
827  that_entry_count,
828  that_slot_off,
829  target_info,
830  target_logical_idx,
831  j,
832  init_agg_val_idx,
833  that);
834  if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
835  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
836  } else {
837  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
838  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
839  }
840  }
841  j = advance_slot(j, target_info, false);
842  }
843 }
844 
845 void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
846  const size_t this_slot,
847  const int64_t* that_buff,
848  const size_t that_entry_count,
849  const size_t that_slot,
850  const TargetInfo& target_info,
851  const size_t target_logical_idx,
852  const size_t target_slot_idx,
853  const size_t init_agg_val_idx,
854  const ResultSetStorage& that) const {
856  int8_t* this_ptr2{nullptr};
857  const int8_t* that_ptr2{nullptr};
858  if (target_info.is_agg &&
859  (target_info.agg_kind == kAVG ||
860  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
861  const auto this_count_off = query_mem_desc_.getEntryCount();
862  const auto that_count_off = that_entry_count;
863  this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
864  that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
865  }
866  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
867  this_ptr2,
868  reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
869  that_ptr2,
870  target_info,
871  target_logical_idx,
872  target_slot_idx,
873  init_agg_val_idx,
874  that,
875  target_slot_idx, // dummy, for now
876  {});
877 }
878 
879 // During the reduction of two result sets using the baseline strategy, we first create a
880 // big enough buffer to hold the entries for both and we move the entries from the first
881 // into it before doing the reduction as usual (into the first buffer).
882 template <class KeyType>
883 void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
884  const size_t new_entry_count) const {
886  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
887  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
888  const auto key_count = query_mem_desc_.getGroupbyColCount();
891  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
892  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
893  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();
894 
895  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
896  const size_t thread_count = cpu_threads();
897  std::vector<std::future<void>> move_threads;
898 
899  for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
900  const auto thread_entry_count =
901  (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
902  const auto start_index = thread_idx * thread_entry_count;
903  const auto end_index =
904  std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
905  move_threads.emplace_back(std::async(
906  std::launch::async,
907  [this,
908  src_buff,
909  new_buff_i64,
910  new_entry_count,
911  start_index,
912  end_index,
913  key_count,
914  row_qw_count,
915  key_byte_width] {
916  for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
917  moveOneEntryToBuffer<KeyType>(entry_idx,
918  new_buff_i64,
919  new_entry_count,
920  key_count,
921  row_qw_count,
922  src_buff,
923  key_byte_width);
924  }
925  }));
926  }
927  for (auto& move_thread : move_threads) {
928  move_thread.wait();
929  }
930  for (auto& move_thread : move_threads) {
931  move_thread.get();
932  }
933  } else {
934  for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount(); ++entry_idx) {
935  moveOneEntryToBuffer<KeyType>(entry_idx,
936  new_buff_i64,
937  new_entry_count,
938  key_count,
939  row_qw_count,
940  src_buff,
941  key_byte_width);
942  }
943  }
944 }
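A worked example of the resizing this implements (sizes are illustrative): reducing four baseline-hash results with 10K, 20K, 30K and 40K entries, the caller below (ResultSetManager::reduce) picks new_entry_count = 100K, so the first storage's non-empty entries are re-hashed into the 100K-entry buffer before the pairwise reduce() calls run against it.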
945 
946 template <class KeyType>
947 void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
948  int64_t* new_buff_i64,
949  const size_t new_entry_count,
950  const size_t key_count,
951  const size_t row_qw_count,
952  const int64_t* src_buff,
953  const size_t key_byte_width) const {
954  const auto key_off =
955  query_mem_desc_.didOutputColumnar()
956  ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
957  : row_qw_count * entry_index;
958  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
959  if (*key_ptr == get_empty_key<KeyType>()) {
960  return;
961  }
962  int64_t* new_entries_ptr{nullptr};
963  if (query_mem_desc_.didOutputColumnar()) {
964  const auto key =
965  make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
966  new_entries_ptr =
967  get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
968  } else {
969  new_entries_ptr = get_group_value(new_buff_i64,
970  new_entry_count,
971  &src_buff[key_off],
972  key_count,
973  key_byte_width,
974  row_qw_count,
975  nullptr);
976  }
977  CHECK(new_entries_ptr);
978  fill_slots(new_entries_ptr,
979  new_entry_count,
980  src_buff,
981  entry_index,
982  query_mem_desc_.getEntryCount(),
983  query_mem_desc_);
984 }
985 
986 void ResultSet::initializeStorage() const {
987  if (query_mem_desc_.didOutputColumnar()) {
988  storage_->initializeColWise();
989  } else {
990  storage_->initializeRowWise();
991  }
992 }
993 
994 // Driver for reductions. Needed because the result of a reduction on the baseline
995 // layout, which can have collisions, cannot be done in place and something needs
996 // to take the ownership of the new result set with the bigger underlying buffer.
997 ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets) {
998  CHECK(!result_sets.empty());
999  auto result_rs = result_sets.front();
1000  CHECK(result_rs->storage_);
1001  auto& first_result = *result_rs->storage_;
1002  auto result = &first_result;
1003  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
1004  for (const auto result_set : result_sets) {
1005  CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
1006  }
1007  const auto executor = result_rs->executor_;
1008  for (const auto result_set : result_sets) {
1009  CHECK_EQ(executor, result_set->executor_);
1010  }
1011  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
1012  QueryDescriptionType::GroupByBaselineHash) {
1013  const auto total_entry_count =
1014  std::accumulate(result_sets.begin(),
1015  result_sets.end(),
1016  size_t(0),
1017  [](const size_t init, const ResultSet* rs) {
1018  return init + rs->query_mem_desc_.getEntryCount();
1019  });
1020  CHECK(total_entry_count);
1021  auto query_mem_desc = first_result.query_mem_desc_;
1022  query_mem_desc.setEntryCount(total_entry_count);
1023  rs_.reset(new ResultSet(first_result.targets_,
1024  ExecutorDeviceType::CPU,
1025  query_mem_desc,
1026  row_set_mem_owner,
1027  executor));
1028  auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
1029  rs_->initializeStorage();
1030  switch (query_mem_desc.getEffectiveKeyWidth()) {
1031  case 4:
1032  first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
1033  query_mem_desc.getEntryCount());
1034  break;
1035  case 8:
1036  first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
1037  query_mem_desc.getEntryCount());
1038  break;
1039  default:
1040  CHECK(false);
1041  }
1042  result = rs_->storage_.get();
1043  result_rs = rs_.get();
1044  }
1045 
1046  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
1047  if (!serialized_varlen_buffer.empty()) {
1048  result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
1049  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1050  ++result_it) {
1051  auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
1052  CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
1053  serialized_varlen_buffer.emplace_back(
1054  std::move(result_serialized_varlen_buffer.front()));
1055  }
1056  }
1057 
1058  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
1059  result_rs->getTargetInfos(),
1060  result_rs->getTargetInitVals());
1061  auto reduction_code = reduction_jit.codegen();
1062  size_t ctr = 1;
1063  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
1064  ++result_it) {
1065  if (!serialized_varlen_buffer.empty()) {
1066  result->reduce(
1067  *((*result_it)->storage_), serialized_varlen_buffer[ctr++], reduction_code);
1068  } else {
1069  result->reduce(*((*result_it)->storage_), {}, reduction_code);
1070  }
1071  }
1072  return result_rs;
1073 }
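A minimal driver sketch (the call site is illustrative, not part of this file; it assumes the partial ResultSet objects share a row set memory owner and executor, as the checks above require):

// ResultSetManager rs_manager;
// std::vector<ResultSet*> partial_results{rs0.get(), rs1.get(), rs2.get()};
// ResultSet* reduced = rs_manager.reduce(partial_results);
// // When a larger baseline-hash buffer had to be allocated, the manager owns it:
// std::shared_ptr<ResultSet> owned = rs_manager.getOwnResultSet();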
1074 
1075 std::shared_ptr<ResultSet> ResultSetManager::getOwnResultSet() {
1076  return rs_;
1077 }
1078 
1079 void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
1080  auto& result_storage = result_rs->storage_;
1081  result_storage->rewriteAggregateBufferOffsets(
1082  result_rs->serialized_varlen_buffer_.front());
1083 }
1084 
1085 void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
1086  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1087  const auto key_count = query_mem_desc_.getGroupbyColCount();
1088  CHECK_EQ(slot_count + key_count, entry.size());
1089  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1091  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1092  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
1093  CHECK_EQ(query_mem_desc_.getEffectiveKeyWidth(), sizeof(int64_t));
1094  for (size_t i = 0; i < key_count; ++i) {
1095  this_buff[key_off + i] = entry[i];
1096  }
1097  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
1098  for (size_t i = 0; i < target_init_vals_.size(); ++i) {
1099  this_buff[first_slot_off + i] = entry[key_count + i];
1100  }
1101 }
1102 
1103 void ResultSetStorage::initializeRowWise() const {
1104  const auto key_count = query_mem_desc_.getGroupbyColCount();
1105  const auto row_size = get_row_bytes(query_mem_desc_);
1106  CHECK_EQ(row_size % 8, 0u);
1107  const auto key_bytes_with_padding =
1108  align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1110  switch (query_mem_desc_.getEffectiveKeyWidth()) {
1111  case 4: {
1112  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1113  auto row_ptr = buff_ + i * row_size;
1114  fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
1115  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1116  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1117  slot_ptr[j] = target_init_vals_[j];
1118  }
1119  }
1120  break;
1121  }
1122  case 8: {
1123  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1124  auto row_ptr = buff_ + i * row_size;
1125  fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
1126  auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
1127  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1128  slot_ptr[j] = target_init_vals_[j];
1129  }
1130  }
1131  break;
1132  }
1133  default:
1134  CHECK(false);
1135  }
1136 }
1137 
1138 void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
1139  CHECK(query_mem_desc_.didOutputColumnar());
1140  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
1141  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
1142  const auto key_count = query_mem_desc_.getGroupbyColCount();
1143  CHECK_EQ(slot_count + key_count, entry.size());
1144  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1145 
1146  for (size_t i = 0; i < key_count; i++) {
1147  const auto key_offset = key_offset_colwise(0, i, 1);
1148  this_buff[key_offset] = entry[i];
1149  }
1150 
1151  for (size_t i = 0; i < target_init_vals_.size(); i++) {
1152  const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
1153  this_buff[slot_offset] = entry[key_count + i];
1154  }
1155 }
1156 
1157 void ResultSetStorage::initializeColWise() const {
1158  const auto key_count = query_mem_desc_.getGroupbyColCount();
1159  auto this_buff = reinterpret_cast<int64_t*>(buff_);
1161  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
1162  const auto first_key_off =
1163  key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
1164  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1165  this_buff[first_key_off + i] = EMPTY_KEY_64;
1166  }
1167  }
1168  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
1169  const auto first_val_off =
1170  slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
1171  for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
1172  this_buff[first_val_off + i] = target_init_vals_[target_idx];
1173  }
1174  }
1175 }
1176 
1177 void ResultSetStorage::initializeBaselineValueSlots(int64_t* entry_slots) const {
1178  CHECK(entry_slots);
1179  if (query_mem_desc_.didOutputColumnar()) {
1180  size_t slot_off = 0;
1181  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1182  entry_slots[slot_off] = target_init_vals_[j];
1183  slot_off += query_mem_desc_.getEntryCount();
1184  }
1185  } else {
1186  for (size_t j = 0; j < target_init_vals_.size(); ++j) {
1187  entry_slots[j] = target_init_vals_[j];
1188  }
1189  }
1190 }
1191 
1192 #define AGGREGATE_ONE_VALUE( \
1193  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1194  do { \
1195  const auto sql_type = get_compact_type(agg_info__); \
1196  if (sql_type.is_fp()) { \
1197  if (chosen_bytes__ == sizeof(float)) { \
1198  agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__), \
1199  *reinterpret_cast<const float*>(other_ptr__)); \
1200  } else { \
1201  agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__), \
1202  *reinterpret_cast<const double*>(other_ptr__)); \
1203  } \
1204  } else { \
1205  if (chosen_bytes__ == sizeof(int32_t)) { \
1206  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1207  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1208  agg_##agg_kind__##_int32(val_ptr, *other_ptr); \
1209  } else { \
1210  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1211  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1212  agg_##agg_kind__(val_ptr, *other_ptr); \
1213  } \
1214  } \
1215  } while (0)
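An example expansion of the macro above for a non-null 64-bit integer SUM slot (chosen_bytes == 8, non-floating-point type), showing the branch that is actually taken:

// AGGREGATE_ONE_VALUE(sum, this_ptr1, that_ptr1, 8, target_info) effectively runs:
//   auto val_ptr = reinterpret_cast<int64_t*>(this_ptr1);
//   auto other_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
//   agg_sum(val_ptr, *other_ptr);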
1216 
1217 #define AGGREGATE_ONE_NULLABLE_VALUE( \
1218  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1219  do { \
1220  if (agg_info__.skip_null_val) { \
1221  const auto sql_type = get_compact_type(agg_info__); \
1222  if (sql_type.is_fp()) { \
1223  if (chosen_bytes__ == sizeof(float)) { \
1224  agg_##agg_kind__##_float_skip_val( \
1225  reinterpret_cast<int32_t*>(val_ptr__), \
1226  *reinterpret_cast<const float*>(other_ptr__), \
1227  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1228  } else { \
1229  agg_##agg_kind__##_double_skip_val( \
1230  reinterpret_cast<int64_t*>(val_ptr__), \
1231  *reinterpret_cast<const double*>(other_ptr__), \
1232  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1233  } \
1234  } else { \
1235  if (chosen_bytes__ == sizeof(int32_t)) { \
1236  int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1237  const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1238  const auto null_val = static_cast<int32_t>(init_val__); \
1239  agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val); \
1240  } else { \
1241  int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1242  const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1243  const auto null_val = static_cast<int64_t>(init_val__); \
1244  agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val); \
1245  } \
1246  } \
1247  } else { \
1248  AGGREGATE_ONE_VALUE( \
1249  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1250  } \
1251  } while (0)
1252 
1253 #define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__) \
1254  do { \
1255  if (chosen_bytes__ == sizeof(int32_t)) { \
1256  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1257  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1258  agg_sum_int32(val_ptr, *other_ptr); \
1259  } else { \
1260  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1261  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1262  agg_sum(val_ptr, *other_ptr); \
1263  } \
1264  } while (0)
1265 
1266 #define AGGREGATE_ONE_NULLABLE_COUNT( \
1267  val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1268  { \
1269  if (agg_info__.skip_null_val) { \
1270  const auto sql_type = get_compact_type(agg_info__); \
1271  if (sql_type.is_fp()) { \
1272  if (chosen_bytes__ == sizeof(float)) { \
1273  agg_sum_float_skip_val( \
1274  reinterpret_cast<int32_t*>(val_ptr__), \
1275  *reinterpret_cast<const float*>(other_ptr__), \
1276  *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
1277  } else { \
1278  agg_sum_double_skip_val( \
1279  reinterpret_cast<int64_t*>(val_ptr__), \
1280  *reinterpret_cast<const double*>(other_ptr__), \
1281  *reinterpret_cast<const double*>(may_alias_ptr(&init_val__))); \
1282  } \
1283  } else { \
1284  if (chosen_bytes__ == sizeof(int32_t)) { \
1285  auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__); \
1286  auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
1287  const auto null_val = static_cast<int32_t>(init_val__); \
1288  agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val); \
1289  } else { \
1290  auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__); \
1291  auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
1292  const auto null_val = static_cast<int64_t>(init_val__); \
1293  agg_sum_skip_val(val_ptr, *other_ptr, null_val); \
1294  } \
1295  } \
1296  } else { \
1297  AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__); \
1298  } \
1299  }
1300 
1301 // to be used for 8/16-bit kMIN and kMAX only
1302 #define AGGREGATE_ONE_VALUE_SMALL( \
1303  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__) \
1304  do { \
1305  if (chosen_bytes__ == sizeof(int16_t)) { \
1306  auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1307  auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1308  agg_##agg_kind__##_int16(val_ptr, *other_ptr); \
1309  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1310  auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1311  auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1312  agg_##agg_kind__##_int8(val_ptr, *other_ptr); \
1313  } else { \
1314  UNREACHABLE(); \
1315  } \
1316  } while (0)
1317 
1318 // to be used for 8/16-bit kMIN and kMAX only
1319 #define AGGREGATE_ONE_NULLABLE_VALUE_SMALL( \
1320  agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__) \
1321  do { \
1322  if (agg_info__.skip_null_val) { \
1323  if (chosen_bytes__ == sizeof(int16_t)) { \
1324  int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__); \
1325  const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
1326  const auto null_val = static_cast<int16_t>(init_val__); \
1327  agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val); \
1328  } else if (chosen_bytes__ == sizeof(int8_t)) { \
1329  int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__); \
1330  const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__); \
1331  const auto null_val = static_cast<int8_t>(init_val__); \
1332  agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val); \
1333  } \
1334  } else { \
1335  AGGREGATE_ONE_VALUE_SMALL( \
1336  agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__); \
1337  } \
1338  } while (0)
1339 
1340 int8_t get_width_for_slot(const size_t target_slot_idx,
1341  const bool float_argument_input,
1342  const QueryMemoryDescriptor& query_mem_desc) {
1343  if (float_argument_input) {
1344  return sizeof(float);
1345  }
1346  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
1347 }
1348 
1349 ALWAYS_INLINE void ResultSetStorage::reduceOneSlot(
1350  int8_t* this_ptr1,
1351  int8_t* this_ptr2,
1352  const int8_t* that_ptr1,
1353  const int8_t* that_ptr2,
1354  const TargetInfo& target_info,
1355  const size_t target_logical_idx,
1356  const size_t target_slot_idx,
1357  const size_t init_agg_val_idx,
1358  const ResultSetStorage& that,
1359  const size_t first_slot_idx_for_target,
1360  const std::vector<std::string>& serialized_varlen_buffer) const {
1361  if (query_mem_desc_.targetGroupbyIndicesSize() > 0) {
1362  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1363  return;
1364  }
1365  }
1366  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1367  const bool float_argument_input = takes_float_argument(target_info);
1368  const auto chosen_bytes =
1369  get_width_for_slot(target_slot_idx, float_argument_input, query_mem_desc_);
1370  auto init_val = target_init_vals_[init_agg_val_idx];
1371  if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
1372  switch (target_info.agg_kind) {
1373  case kCOUNT:
1374  case kAPPROX_COUNT_DISTINCT: {
1375  if (is_distinct_target(target_info)) {
1376  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1377  reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
1378  break;
1379  }
1380  CHECK_EQ(int64_t(0), init_val);
1381  AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
1382  break;
1383  }
1384  case kAVG: {
1385  // Ignore float argument compaction for count component for fear of its overflow
1386  AGGREGATE_ONE_COUNT(this_ptr2,
1387  that_ptr2,
1388  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
1389  }
1390  // fall thru
1391  case kSUM: {
1392  AGGREGATE_ONE_NULLABLE_VALUE(
1393  sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1394  break;
1395  }
1396  case kMIN: {
1397  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1398  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1399  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1400  } else {
1401  AGGREGATE_ONE_NULLABLE_VALUE(
1402  min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1403  }
1404  break;
1405  }
1406  case kMAX: {
1407  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1408  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1409  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1410  } else {
1411  AGGREGATE_ONE_NULLABLE_VALUE(
1412  max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
1413  }
1414  break;
1415  }
1416  default:
1417  CHECK(false);
1418  }
1419  } else {
1420  switch (chosen_bytes) {
1421  case 1: {
1422  CHECK(query_mem_desc_.isLogicalSizedColumnsAllowed());
1423  const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
1424  if (rhs_proj_col != init_val) {
1425  *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
1426  }
1427  break;
1428  }
1429  case 2: {
1430  CHECK(query_mem_desc_.isLogicalSizedColumnsAllowed());
1431  const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
1432  if (rhs_proj_col != init_val) {
1433  *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
1434  }
1435  break;
1436  }
1437  case 4: {
1438  CHECK(target_info.agg_kind != kSAMPLE ||
1439  query_mem_desc_.isLogicalSizedColumnsAllowed());
1440  const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
1441  if (rhs_proj_col != init_val) {
1442  *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
1443  }
1444  break;
1445  }
1446  case 8: {
1447  auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
1448  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
1449  !serialized_varlen_buffer.empty()) {
1450  size_t length_to_elems{0};
1451  if (target_info.sql_type.is_geometry()) {
1452  // TODO: Assumes hard-coded sizes for geometry targets
1453  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1454  } else {
1455  const auto& elem_ti = target_info.sql_type.get_elem_type();
1456  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1457  }
1458 
1459  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
1460  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
1461  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
1462  *reinterpret_cast<int64_t*>(this_ptr1) =
1463  reinterpret_cast<const int64_t>(str_ptr);
1464  *reinterpret_cast<int64_t*>(this_ptr2) =
1465  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
1466  } else {
1467  if (rhs_proj_col != init_val) {
1468  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
1469  }
1470  if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen())) {
1471  CHECK(this_ptr2 && that_ptr2);
1472  *reinterpret_cast<int64_t*>(this_ptr2) =
1473  *reinterpret_cast<const int64_t*>(that_ptr2);
1474  }
1475  }
1476 
1477  break;
1478  }
1479  default:
1480  LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
1481  }
1482  }
1483 }
1484 
1485 void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
1486  const int8_t* that_ptr1,
1487  const size_t target_logical_idx,
1488  const ResultSetStorage& that) const {
1490  const auto& old_count_distinct_desc =
1491  query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1492  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
1493  const auto& new_count_distinct_desc =
1494  that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
1495  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
1496  CHECK(this_ptr1 && that_ptr1);
1497  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
1498  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
1499  count_distinct_set_union(
1500  *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
1501 }
1502 
1503 bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
1504  const int8_t warp_count,
1505  const bool is_columnar,
1506  const bool replace_bitmap_ptr_with_bitmap_sz,
1507  std::vector<int64_t>& agg_vals,
1508  const QueryMemoryDescriptor& query_mem_desc,
1509  const std::vector<TargetInfo>& targets,
1510  const std::vector<int64_t>& agg_init_vals) {
1511  const size_t agg_col_count{agg_vals.size()};
1512  const auto row_size = query_mem_desc.getRowSize();
1513  CHECK_EQ(agg_col_count, query_mem_desc.getSlotCount());
1514  CHECK_GE(agg_col_count, targets.size());
1515  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
1516  CHECK(query_mem_desc.hasKeylessHash());
1517  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
1518  bool discard_row = true;
1519  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
1520  bool discard_partial_result = true;
1521  for (size_t target_idx = 0, agg_col_idx = 0;
1522  target_idx < targets.size() && agg_col_idx < agg_col_count;
1523  ++target_idx, ++agg_col_idx) {
1524  const auto& agg_info = targets[target_idx];
1525  const bool float_argument_input = takes_float_argument(agg_info);
1526  const auto chosen_bytes = float_argument_input
1527  ? sizeof(float)
1528  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1529  auto partial_bin_val = get_component(
1530  row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
1531  partial_agg_vals[agg_col_idx] = partial_bin_val;
1532  if (is_distinct_target(agg_info)) {
1533  CHECK_EQ(int8_t(1), warp_count);
1534  CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
1535  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
1536  partial_bin_val = count_distinct_set_size(
1537  partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
1538  if (replace_bitmap_ptr_with_bitmap_sz) {
1539  partial_agg_vals[agg_col_idx] = partial_bin_val;
1540  }
1541  }
1542  if (kAVG == agg_info.agg_kind) {
1543  CHECK(agg_info.is_agg && !agg_info.is_distinct);
1544  ++agg_col_idx;
1545  partial_bin_val = partial_agg_vals[agg_col_idx] =
1546  get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
1547  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1548  }
1549  if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
1550  partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
1551  CHECK(agg_info.is_agg);
1552  discard_partial_result = false;
1553  }
1554  }
1555  row_ptr += row_size;
1556  if (discard_partial_result) {
1557  continue;
1558  }
1559  discard_row = false;
1560  for (size_t target_idx = 0, agg_col_idx = 0;
1561  target_idx < targets.size() && agg_col_idx < agg_col_count;
1562  ++target_idx, ++agg_col_idx) {
1563  auto partial_bin_val = partial_agg_vals[agg_col_idx];
1564  const auto& agg_info = targets[target_idx];
1565  const bool float_argument_input = takes_float_argument(agg_info);
1566  const auto chosen_bytes = float_argument_input
1567  ? sizeof(float)
1568  : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
1569  const auto& chosen_type = get_compact_type(agg_info);
1570  if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
1571  try {
1572  switch (agg_info.agg_kind) {
1573  case kCOUNT:
1574  case kAPPROX_COUNT_DISTINCT:
1575  AGGREGATE_ONE_NULLABLE_COUNT(
1576  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1577  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1578  agg_init_vals[agg_col_idx],
1579  chosen_bytes,
1580  agg_info);
1581  break;
1582  case kAVG:
1583  // Ignore float argument compaction for count component for fear of its
1584  // overflow
1585  AGGREGATE_ONE_COUNT(
1586  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
1587  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
1588  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
1589  // fall thru
1590  case kSUM:
1591  AGGREGATE_ONE_NULLABLE_VALUE(
1592  sum,
1593  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1594  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1595  agg_init_vals[agg_col_idx],
1596  chosen_bytes,
1597  agg_info);
1598  break;
1599  case kMIN:
1600  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1601  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1602  min,
1603  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1604  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1605  agg_init_vals[agg_col_idx],
1606  chosen_bytes,
1607  agg_info);
1608  } else {
1609  AGGREGATE_ONE_NULLABLE_VALUE(
1610  min,
1611  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1612  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1613  agg_init_vals[agg_col_idx],
1614  chosen_bytes,
1615  agg_info);
1616  }
1617  break;
1618  case kMAX:
1619  if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
1620  AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
1621  max,
1622  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1623  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1624  agg_init_vals[agg_col_idx],
1625  chosen_bytes,
1626  agg_info);
1627  } else {
1628  AGGREGATE_ONE_NULLABLE_VALUE(
1629  max,
1630  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
1631  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
1632  agg_init_vals[agg_col_idx],
1633  chosen_bytes,
1634  agg_info);
1635  }
1636  break;
1637  default:
1638  CHECK(false);
1639  break;
1640  }
1641  } catch (std::runtime_error& e) {
1642  // TODO(miyu): handle the case where chosen_bytes < 8
1643  LOG(ERROR) << e.what();
1644  }
1645  if (chosen_type.is_integer() || chosen_type.is_decimal()) {
1646  switch (chosen_bytes) {
1647  case 8:
1648  break;
1649  case 4: {
1650  int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
1651  if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
1652  agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
1653  }
1654  break;
1655  }
1656  default:
1657  CHECK(false);
1658  }
1659  }
1660  if (kAVG == agg_info.agg_kind) {
1661  ++agg_col_idx;
1662  }
1663  } else {
1664  if (agg_info.agg_kind == kSAMPLE) {
1665  CHECK(!agg_info.sql_type.is_varlen())
1666  << "Interleaved bins reduction not supported for variable length "
1667  "arguments "
1668  "to SAMPLE";
1669  }
1670  if (agg_vals[agg_col_idx]) {
1671  if (agg_info.agg_kind == kSAMPLE) {
1672  continue;
1673  }
1674  CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
1675  } else {
1676  agg_vals[agg_col_idx] = partial_bin_val;
1677  }
1678  }
1679  }
1680  }
1681  return discard_row;
1682 }
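A hedged usage sketch for the keyless-hash path served by reduceSingleRow (the surrounding values are illustrative; they come from the query's descriptor and targets):

// std::vector<int64_t> agg_vals(agg_init_vals);  // running values, seeded with the init values
// const bool discard = ResultSetStorage::reduceSingleRow(row_ptr,
//                                                        warp_count,
//                                                        /*is_columnar=*/false,
//                                                        /*replace_bitmap_ptr_with_bitmap_sz=*/false,
//                                                        agg_vals,
//                                                        query_mem_desc,
//                                                        targets,
//                                                        agg_init_vals);
// // discard == true means no warp produced a result for this bin, so the caller skips the row.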
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:72
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:120
const int64_t const uint32_t groups_buffer_entry_count
std::vector< int64_t > make_key(const int64_t *buff, const size_t entry_count, const size_t key_count)
T advance_target_ptr_row_wise(T target_ptr, const TargetInfo &target_info, const size_t slot_idx, const QueryMemoryDescriptor &query_mem_desc, const bool separate_varlen_storage)
int8_t get_width_for_slot(const size_t target_slot_idx, const bool float_argument_input, const QueryMemoryDescriptor &query_mem_desc)
const int64_t const uint32_t const uint32_t key_qw_count
#define CHECK_GT(x, y)
Definition: Logger.h:202
void rewriteVarlenAggregates(ResultSet *)
void reduce(const ResultSetStorage &that, const std::vector< std::string > &serialized_varlen_buffer, const ReductionCode &reduction_code) const
size_t getColOnlyOffInBytes(const size_t col_idx) const
Definition: sqldefs.h:71
const SQLTypeInfo get_compact_type(const TargetInfo &target)
int8_t groupColWidth(const size_t key_idx) const
void reduceOneEntrySlotsBaseline(int64_t *this_entry_slots, const int64_t *that_buff, const size_t that_entry_idx, const size_t that_entry_count, const ResultSetStorage &that) const
static EvalValue run(const Function *function, const std::vector< EvalValue > &inputs)
bool is_agg
Definition: TargetInfo.h:40
size_t advance_slot(const size_t j, const TargetInfo &target_info, const bool separate_varlen_storage)
CHECK(cgen_state)
void copyKeyColWise(const size_t entry_idx, int8_t *this_buff, const int8_t *that_buff) const
int64_t count_distinct_set_size(const int64_t set_handle, const CountDistinctDescriptor &count_distinct_desc)
Definition: CountDistinct.h:75
void init(LogOptions const &log_opts)
Definition: Logger.cpp:265
void reduceOneCountDistinctSlot(int8_t *this_ptr1, const int8_t *that_ptr1, const size_t target_logical_idx, const ResultSetStorage &that) const
size_t getGroupbyColCount() const
void moveEntriesToBuffer(int8_t *new_buff, const size_t new_entry_count) const
#define store_cst(ptr, val)
GroupValueInfo get_group_value_reduction(int64_t *groups_buffer, const uint32_t groups_buffer_entry_count, const int64_t *key, const uint32_t key_count, const size_t key_width, const QueryMemoryDescriptor &query_mem_desc, const int64_t *that_buff_i64, const size_t that_entry_idx, const size_t that_entry_count, const uint32_t row_size_quad)
size_t targetGroupbyIndicesSize() const
ResultSet * reduce(std::vector< ResultSet * > &)
Definition: sqldefs.h:71
ALWAYS_INLINE void check_watchdog(const size_t sample_seed)
#define LIKELY(x)
Definition: likely.h:19
std::shared_ptr< ResultSet > getOwnResultSet()
void fillOneEntryColWise(const std::vector< int64_t > &entry)
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:116
GroupValueInfo get_matching_group_value_columnar_reduction(int64_t *groups_buffer, const uint32_t h, const int64_t *key, const uint32_t key_qw_count, const size_t entry_count)
NEVER_INLINE DEVICE int64_t * get_group_value(int64_t *groups_buffer, const uint32_t groups_buffer_entry_count, const int64_t *key, const uint32_t key_count, const uint32_t key_width, const uint32_t row_size_quad, const int64_t *init_vals)
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
T row_ptr_rowwise(T buff, const QueryMemoryDescriptor &query_mem_desc, const size_t entry_idx)
SQLAgg agg_kind
Definition: TargetInfo.h:41
bool is_geometry() const
Definition: sqltypes.h:489
size_t getCountDistinctDescriptorsSize() const
ssize_t getTargetGroupbyIndex(const size_t target_idx) const
QueryDescriptionType getQueryDescriptionType() const
#define cas_cst(ptr, expected, desired)
#define UNLIKELY(x)
Definition: likely.h:20
size_t key_offset_colwise(const size_t entry_idx, const size_t key_idx, const size_t entry_count)
ReductionCode codegen() const
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
#define load_cst(ptr)
#define AGGREGATE_ONE_NULLABLE_VALUE(agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__)
#define CHECK_LT(x, y)
Definition: Logger.h:200
size_t get_row_bytes(const QueryMemoryDescriptor &query_mem_desc)
int8_t * buff_
Definition: ResultSet.h:198
#define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__)
Definition: sqldefs.h:71
#define AGGREGATE_ONE_NULLABLE_COUNT(val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__)
size_t key_offset_rowwise(const size_t entry_idx, const size_t key_count, const size_t slot_count)
bool use_multithreaded_reduction(const size_t entry_count)
bool is_string() const
Definition: sqltypes.h:477
const ColSlotContext & getColSlotContext() const
#define EMPTY_KEY_32
void fillOneEntryRowWise(const std::vector< int64_t > &entry)
Basic constructors and methods of the row set interface.
size_t get_row_qw_count(const QueryMemoryDescriptor &query_mem_desc)
#define AGGREGATE_ONE_NULLABLE_VALUE_SMALL(agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__)
void rewriteAggregateBufferOffsets(const std::vector< std::string > &serialized_varlen_buffer) const
ALWAYS_INLINE void fill_empty_key_64(int64_t *key_ptr_i64, const size_t key_count)
SQLTypeInfoCore get_elem_type() const
Definition: sqltypes.h:659
Definition: sqldefs.h:71
#define ALWAYS_INLINE
int cpu_threads()
Definition: thread_count.h:25
size_t getBufferColSlotCount() const
T get_cols_ptr(T buff, const QueryMemoryDescriptor &query_mem_desc)
Definition: sqldefs.h:71
static bool reduceSingleRow(const int8_t *row_ptr, const int8_t warp_count, const bool is_columnar, const bool replace_bitmap_ptr_with_bitmap_sz, std::vector< int64_t > &agg_vals, const QueryMemoryDescriptor &query_mem_desc, const std::vector< TargetInfo > &targets, const std::vector< int64_t > &agg_init_vals)
size_t get_key_bytes_rowwise(const QueryMemoryDescriptor &query_mem_desc)
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
QueryMemoryDescriptor query_mem_desc_
Definition: ResultSet.h:197
int32_t getTargetIdxForKey() const
size_t getPrependedGroupColOffInBytes(const size_t group_idx) const