OmniSciDB  91042dcc5b
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ResultSetReductionJIT.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ResultSetReductionJIT.h"
20 
21 #include "CodeGenerator.h"
22 #include "DynamicWatchdog.h"
23 #include "Execute.h"
24 #include "IRCodegenUtils.h"
26 #include "Shared/likely.h"
27 #include "Shared/quantile.h"
28 
29 #include <llvm/Bitcode/BitcodeReader.h>
30 #include <llvm/IR/Function.h>
31 #include <llvm/IR/IRBuilder.h>
32 #include <llvm/IR/Verifier.h>
33 #include <llvm/Support/SourceMgr.h>
34 #include <llvm/Support/raw_os_ostream.h>
35 
36 namespace {
37 
38 // Error code to be returned when the watchdog timer triggers during the reduction.
39 const int32_t WATCHDOG_ERROR{-1};
40 // Error code to be returned when the interrupt is triggered during the reduction.
41 const int32_t INTERRUPT_ERROR{10};
42 // Use the interpreter, not the JIT, for a number of entries lower than the threshold.
43 const size_t INTERP_THRESHOLD{25};
44 
45 // Load the value stored at 'ptr' interpreted as 'ptr_type'.
46 Value* emit_load(Value* ptr, Type ptr_type, Function* function) {
47  return function->add<Load>(
48  function->add<Cast>(Cast::CastOp::BitCast, ptr, ptr_type, ""),
49  ptr->label() + "_loaded");
50 }
51 
52 // Load the value stored at 'ptr' as a 32-bit signed integer.
53 Value* emit_load_i32(Value* ptr, Function* function) {
54  return emit_load(ptr, Type::Int32Ptr, function);
55 }
56 
57 // Load the value stored at 'ptr' as a 64-bit signed integer.
58 Value* emit_load_i64(Value* ptr, Function* function) {
59  return emit_load(ptr, Type::Int64Ptr, function);
60 }
61 
62 // Read a 32- or 64-bit integer stored at 'ptr' and sign extend to 64-bit.
63 Value* emit_read_int_from_buff(Value* ptr, const int8_t compact_sz, Function* function) {
64  switch (compact_sz) {
65  case 8: {
66  return emit_load_i64(ptr, function);
67  }
68  case 4: {
69  const auto loaded_val = emit_load_i32(ptr, function);
70  return function->add<Cast>(Cast::CastOp::SExt, loaded_val, Type::Int64, "");
71  }
72  default: {
73  LOG(FATAL) << "Invalid byte width: " << compact_sz;
74  return nullptr;
75  }
76  }
77 }
78 
79 // Emit a runtime call to accumulate into the 'val_ptr' byte address the 'other_ptr'
80 // value when the type is specified as not null.
81 void emit_aggregate_one_value(const std::string& agg_kind,
82  Value* val_ptr,
83  Value* other_ptr,
84  const size_t chosen_bytes,
85  const TargetInfo& agg_info,
86  Function* ir_reduce_one_entry) {
87  const auto sql_type = get_compact_type(agg_info);
88  const auto dest_name = agg_kind + "_dest";
89  if (sql_type.is_fp()) {
90  if (chosen_bytes == sizeof(float)) {
91  const auto agg = ir_reduce_one_entry->add<Cast>(
92  Cast::CastOp::BitCast, val_ptr, Type::Int32Ptr, dest_name);
93  const auto val = emit_load(other_ptr, Type::FloatPtr, ir_reduce_one_entry);
94  ir_reduce_one_entry->add<Call>(
95  "agg_" + agg_kind + "_float", std::vector<const Value*>{agg, val}, "");
96  } else {
97  CHECK_EQ(chosen_bytes, sizeof(double));
98  const auto agg = ir_reduce_one_entry->add<Cast>(
99  Cast::CastOp::BitCast, val_ptr, Type::Int64Ptr, dest_name);
100  const auto val = emit_load(other_ptr, Type::DoublePtr, ir_reduce_one_entry);
101  ir_reduce_one_entry->add<Call>(
102  "agg_" + agg_kind + "_double", std::vector<const Value*>{agg, val}, "");
103  }
104  } else {
105  if (chosen_bytes == sizeof(int32_t)) {
106  const auto agg = ir_reduce_one_entry->add<Cast>(
107  Cast::CastOp::BitCast, val_ptr, Type::Int32Ptr, dest_name);
108  const auto val = emit_load(other_ptr, Type::Int32Ptr, ir_reduce_one_entry);
109  ir_reduce_one_entry->add<Call>(
110  "agg_" + agg_kind + "_int32", std::vector<const Value*>{agg, val}, "");
111  } else {
112  CHECK_EQ(chosen_bytes, sizeof(int64_t));
113  const auto agg = ir_reduce_one_entry->add<Cast>(
114  Cast::CastOp::BitCast, val_ptr, Type::Int64Ptr, dest_name);
115  const auto val = emit_load(other_ptr, Type::Int64Ptr, ir_reduce_one_entry);
116  ir_reduce_one_entry->add<Call>(
117  "agg_" + agg_kind, std::vector<const Value*>{agg, val}, "");
118  }
119  }
120 }
121 
122 // Same as above, but support nullable types as well.
123 void emit_aggregate_one_nullable_value(const std::string& agg_kind,
124  Value* val_ptr,
125  Value* other_ptr,
126  const int64_t init_val,
127  const size_t chosen_bytes,
128  const TargetInfo& agg_info,
129  Function* ir_reduce_one_entry) {
130  const auto dest_name = agg_kind + "_dest";
131  if (agg_info.skip_null_val) {
132  const auto sql_type = get_compact_type(agg_info);
133  if (sql_type.is_fp()) {
134  if (chosen_bytes == sizeof(float)) {
135  const auto agg = ir_reduce_one_entry->add<Cast>(
136  Cast::CastOp::BitCast, val_ptr, Type::Int32Ptr, dest_name);
137  const auto val = emit_load(other_ptr, Type::FloatPtr, ir_reduce_one_entry);
138  const auto init_val_lv = ir_reduce_one_entry->addConstant<ConstantFP>(
139  *reinterpret_cast<const float*>(may_alias_ptr(&init_val)), Type::Float);
140  ir_reduce_one_entry->add<Call>("agg_" + agg_kind + "_float_skip_val",
141  std::vector<const Value*>{agg, val, init_val_lv},
142  "");
143  } else {
144  CHECK_EQ(chosen_bytes, sizeof(double));
145  const auto agg = ir_reduce_one_entry->add<Cast>(
146  Cast::CastOp::BitCast, val_ptr, Type::Int64Ptr, dest_name);
147  const auto val = emit_load(other_ptr, Type::DoublePtr, ir_reduce_one_entry);
148  const auto init_val_lv = ir_reduce_one_entry->addConstant<ConstantFP>(
149  *reinterpret_cast<const double*>(may_alias_ptr(&init_val)), Type::Double);
150  ir_reduce_one_entry->add<Call>("agg_" + agg_kind + "_double_skip_val",
151  std::vector<const Value*>{agg, val, init_val_lv},
152  "");
153  }
154  } else {
155  if (chosen_bytes == sizeof(int32_t)) {
156  const auto agg = ir_reduce_one_entry->add<Cast>(
157  Cast::CastOp::BitCast, val_ptr, Type::Int32Ptr, dest_name);
158  const auto val = emit_load(other_ptr, Type::Int32Ptr, ir_reduce_one_entry);
159  const auto init_val_lv =
160  ir_reduce_one_entry->addConstant<ConstantInt>(init_val, Type::Int32);
161  ir_reduce_one_entry->add<Call>("agg_" + agg_kind + "_int32_skip_val",
162  std::vector<const Value*>{agg, val, init_val_lv},
163  "");
164  } else {
165  CHECK_EQ(chosen_bytes, sizeof(int64_t));
166  const auto agg = ir_reduce_one_entry->add<Cast>(
167  Cast::CastOp::BitCast, val_ptr, Type::Int64Ptr, dest_name);
168  const auto val = emit_load(other_ptr, Type::Int64Ptr, ir_reduce_one_entry);
169  const auto init_val_lv =
170  ir_reduce_one_entry->addConstant<ConstantInt>(init_val, Type::Int64);
171  ir_reduce_one_entry->add<Call>("agg_" + agg_kind + "_skip_val",
172  std::vector<const Value*>{agg, val, init_val_lv},
173  "");
174  }
175  }
176  } else {
178  agg_kind, val_ptr, other_ptr, chosen_bytes, agg_info, ir_reduce_one_entry);
179  }
180 }
181 
182 // Emit code to accumulate the 'other_ptr' count into the 'val_ptr' destination.
184  Value* other_ptr,
185  const size_t chosen_bytes,
186  Function* ir_reduce_one_entry) {
187  const auto dest_name = "count_dest";
188  if (chosen_bytes == sizeof(int32_t)) {
189  const auto agg = ir_reduce_one_entry->add<Cast>(
190  Cast::CastOp::BitCast, val_ptr, Type::Int32Ptr, dest_name);
191  const auto val = emit_load(other_ptr, Type::Int32Ptr, ir_reduce_one_entry);
192  ir_reduce_one_entry->add<Call>(
193  "agg_sum_int32", std::vector<const Value*>{agg, val}, "");
194  } else {
195  CHECK_EQ(chosen_bytes, sizeof(int64_t));
196  const auto agg = ir_reduce_one_entry->add<Cast>(
197  Cast::CastOp::BitCast, val_ptr, Type::Int64Ptr, dest_name);
198  const auto val = emit_load(other_ptr, Type::Int64Ptr, ir_reduce_one_entry);
199  ir_reduce_one_entry->add<Call>("agg_sum", std::vector<const Value*>{agg, val}, "");
200  }
201 }
202 
203 // Emit code to load the value stored at the 'other_pi8' as an integer of the given width
204 // 'chosen_bytes' and write it to the 'slot_pi8' destination only if necessary (the
205 // existing value at destination is the initialization value).
207  Value* other_pi8,
208  const int64_t init_val,
209  const size_t chosen_bytes,
210  Function* ir_reduce_one_entry) {
211  const auto func_name = "write_projection_int" + std::to_string(chosen_bytes * 8);
212  if (chosen_bytes == sizeof(int32_t)) {
213  const auto proj_val = emit_load_i32(other_pi8, ir_reduce_one_entry);
214  ir_reduce_one_entry->add<Call>(
215  func_name,
216  std::vector<const Value*>{
217  slot_pi8,
218  proj_val,
219  ir_reduce_one_entry->addConstant<ConstantInt>(init_val, Type::Int64)},
220  "");
221  } else {
222  CHECK_EQ(chosen_bytes, sizeof(int64_t));
223  const auto proj_val = emit_load_i64(other_pi8, ir_reduce_one_entry);
224  ir_reduce_one_entry->add<Call>(
225  func_name,
226  std::vector<const Value*>{
227  slot_pi8,
228  proj_val,
229  ir_reduce_one_entry->addConstant<ConstantInt>(init_val, Type::Int64)},
230  "");
231  }
232 }
233 
234 // Emit code to load the value stored at the 'other_pi8' as an integer of the given width
235 // 'chosen_bytes' and write it to the 'slot_pi8' destination only if necessary (the
236 // existing value at destination is the initialization value).
238  Value* other_pi8,
239  const int64_t init_val,
240  const size_t chosen_bytes,
241  Function* ir_reduce_one_entry) {
242  if (chosen_bytes == sizeof(int32_t)) {
243  const auto func_name = "checked_single_agg_id_int32";
244  const auto proj_val = emit_load_i32(other_pi8, ir_reduce_one_entry);
245  const auto slot_pi32 = ir_reduce_one_entry->add<Cast>(
246  Cast::CastOp::BitCast, slot_pi8, Type::Int32Ptr, "");
247  return ir_reduce_one_entry->add<Call>(
248  func_name,
249  Type::Int32,
250  std::vector<const Value*>{
251  slot_pi32,
252  proj_val,
253  ir_reduce_one_entry->addConstant<ConstantInt>(init_val, Type::Int32)},
254  "");
255  } else {
256  const auto func_name = "checked_single_agg_id";
257  CHECK_EQ(chosen_bytes, sizeof(int64_t));
258  const auto proj_val = emit_load_i64(other_pi8, ir_reduce_one_entry);
259  const auto slot_pi64 = ir_reduce_one_entry->add<Cast>(
260  Cast::CastOp::BitCast, slot_pi8, Type::Int64Ptr, "");
261 
262  return ir_reduce_one_entry->add<Call>(
263  func_name,
264  Type::Int32,
265  std::vector<const Value*>{
266  slot_pi64,
267  proj_val,
268  ir_reduce_one_entry->addConstant<ConstantInt>(init_val, Type::Int64)},
269  "");
270  }
271 }
272 
273 std::unique_ptr<Function> create_function(
274  const std::string name,
275  const std::vector<Function::NamedArg>& arg_types,
276  const Type ret_type,
277  const bool always_inline) {
278  return std::make_unique<Function>(name, arg_types, ret_type, always_inline);
279 }
280 
281 // Create the declaration for the 'is_empty_entry' function. Use private linkage since
282 // it's a helper only called from the generated code and mark it as always inline.
283 std::unique_ptr<Function> setup_is_empty_entry(ReductionCode* reduction_code) {
284  return create_function(
285  "is_empty_entry", {{"row_ptr", Type::Int8Ptr}}, Type::Int1, /*always_inline=*/true);
286 }
287 
288 // Create the declaration for the 'reduce_one_entry' helper.
289 std::unique_ptr<Function> setup_reduce_one_entry(ReductionCode* reduction_code,
290  const QueryDescriptionType hash_type) {
291  std::string this_ptr_name;
292  std::string that_ptr_name;
293  switch (hash_type) {
295  this_ptr_name = "this_targets_ptr";
296  that_ptr_name = "that_targets_ptr";
297  break;
298  }
301  this_ptr_name = "this_row_ptr";
302  that_ptr_name = "that_row_ptr";
303  break;
304  }
305  default: {
306  LOG(FATAL) << "Unexpected query description type";
307  }
308  }
309  return create_function("reduce_one_entry",
310  {{this_ptr_name, Type::Int8Ptr},
311  {that_ptr_name, Type::Int8Ptr},
312  {"this_qmd", Type::VoidPtr},
313  {"that_qmd", Type::VoidPtr},
314  {"serialized_varlen_buffer_arg", Type::VoidPtr}},
315  Type::Int32,
316  /*always_inline=*/true);
317 }
318 
319 // Create the declaration for the 'reduce_one_entry_idx' helper.
320 std::unique_ptr<Function> setup_reduce_one_entry_idx(ReductionCode* reduction_code) {
321  return create_function("reduce_one_entry_idx",
322  {{"this_buff", Type::Int8Ptr},
323  {"that_buff", Type::Int8Ptr},
324  {"that_entry_idx", Type::Int32},
325  {"that_entry_count", Type::Int32},
326  {"this_qmd_handle", Type::VoidPtr},
327  {"that_qmd_handle", Type::VoidPtr},
328  {"serialized_varlen_buffer", Type::VoidPtr}},
329  Type::Int32,
330  /*always_inline=*/true);
331 }
332 
333 // Create the declaration for the 'reduce_loop' entry point. Use external linkage, this is
334 // the public API of the generated code directly used from result set reduction.
335 std::unique_ptr<Function> setup_reduce_loop(ReductionCode* reduction_code) {
336  return create_function("reduce_loop",
337  {{"this_buff", Type::Int8Ptr},
338  {"that_buff", Type::Int8Ptr},
339  {"start_index", Type::Int32},
340  {"end_index", Type::Int32},
341  {"that_entry_count", Type::Int32},
342  {"this_qmd_handle", Type::VoidPtr},
343  {"that_qmd_handle", Type::VoidPtr},
344  {"serialized_varlen_buffer", Type::VoidPtr}},
345  Type::Int32,
346  /*always_inline=*/false);
347 }
348 
349 llvm::Function* create_llvm_function(const Function* function, CgenState* cgen_state) {
350  AUTOMATIC_IR_METADATA(cgen_state);
351  auto& ctx = cgen_state->context_;
352  std::vector<llvm::Type*> parameter_types;
353  const auto& arg_types = function->arg_types();
354  for (const auto& named_arg : arg_types) {
355  CHECK(named_arg.type != Type::Void);
356  parameter_types.push_back(llvm_type(named_arg.type, ctx));
357  }
358  const auto func_type = llvm::FunctionType::get(
359  llvm_type(function->ret_type(), ctx), parameter_types, false);
360  const auto linkage = function->always_inline() ? llvm::Function::PrivateLinkage
361  : llvm::Function::ExternalLinkage;
362  auto func =
363  llvm::Function::Create(func_type, linkage, function->name(), cgen_state->module_);
364  const auto arg_it = func->arg_begin();
365  for (size_t i = 0; i < arg_types.size(); ++i) {
366  const auto arg = &*(arg_it + i);
367  arg->setName(arg_types[i].name);
368  }
369  if (function->always_inline()) {
371  }
372  return func;
373 }
374 
375 // Setup the reduction function and helpers declarations, create a module and a code
376 // generation state object.
378  ReductionCode reduction_code{};
379  reduction_code.ir_is_empty = setup_is_empty_entry(&reduction_code);
380  reduction_code.ir_reduce_one_entry = setup_reduce_one_entry(&reduction_code, hash_type);
381  reduction_code.ir_reduce_one_entry_idx = setup_reduce_one_entry_idx(&reduction_code);
382  reduction_code.ir_reduce_loop = setup_reduce_loop(&reduction_code);
383  return reduction_code;
384 }
385 
387  return hash_type == QueryDescriptionType::GroupByBaselineHash ||
390 }
391 
392 // Variable length sample fast path (no serialized variable length buffer).
393 void varlen_buffer_sample(int8_t* this_ptr1,
394  int8_t* this_ptr2,
395  const int8_t* that_ptr1,
396  const int8_t* that_ptr2,
397  const int64_t init_val) {
398  const auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
399  if (rhs_proj_col != init_val) {
400  *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
401  }
402  CHECK(this_ptr2 && that_ptr2);
403  *reinterpret_cast<int64_t*>(this_ptr2) = *reinterpret_cast<const int64_t*>(that_ptr2);
404 }
405 
406 } // namespace
407 
409  const void* serialized_varlen_buffer_handle,
410  int8_t* this_ptr1,
411  int8_t* this_ptr2,
412  const int8_t* that_ptr1,
413  const int8_t* that_ptr2,
414  const int64_t init_val,
415  const int64_t length_to_elems) {
416  if (!serialized_varlen_buffer_handle) {
417  varlen_buffer_sample(this_ptr1, this_ptr2, that_ptr1, that_ptr2, init_val);
418  return;
419  }
420  const auto& serialized_varlen_buffer =
421  *reinterpret_cast<const std::vector<std::string>*>(serialized_varlen_buffer_handle);
422  if (!serialized_varlen_buffer.empty()) {
423  const auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
424  CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
425  const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
426  const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
427  *reinterpret_cast<int64_t*>(this_ptr1) = reinterpret_cast<const int64_t>(str_ptr);
428  *reinterpret_cast<int64_t*>(this_ptr2) =
429  static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
430  } else {
431  varlen_buffer_sample(this_ptr1, this_ptr2, that_ptr1, that_ptr2, init_val);
432  }
433 }
434 
435 // Wrappers to be called from the generated code, sharing implementation with the rest of
436 // the system.
437 
439  const int64_t new_set_handle,
440  const int64_t old_set_handle,
441  const void* that_qmd_handle,
442  const void* this_qmd_handle,
443  const int64_t target_logical_idx) {
444  const auto that_qmd = reinterpret_cast<const QueryMemoryDescriptor*>(that_qmd_handle);
445  const auto this_qmd = reinterpret_cast<const QueryMemoryDescriptor*>(this_qmd_handle);
446  const auto& new_count_distinct_desc =
447  that_qmd->getCountDistinctDescriptor(target_logical_idx);
448  const auto& old_count_distinct_desc =
449  this_qmd->getCountDistinctDescriptor(target_logical_idx);
450  CHECK(old_count_distinct_desc.impl_type_ != CountDistinctImplType::Invalid);
451  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
453  new_set_handle, old_set_handle, new_count_distinct_desc, old_count_distinct_desc);
454 }
455 
456 extern "C" RUNTIME_EXPORT void approx_quantile_jit_rt(const int64_t new_set_handle,
457  const int64_t old_set_handle,
458  const void* that_qmd_handle,
459  const void* this_qmd_handle,
460  const int64_t target_logical_idx) {
461  auto* incoming = reinterpret_cast<quantile::TDigest*>(new_set_handle);
462  if (incoming->centroids().capacity()) {
463  auto* accumulator = reinterpret_cast<quantile::TDigest*>(old_set_handle);
464  accumulator->allocate();
465  accumulator->mergeTDigest(*incoming);
466  }
467 }
468 
470  int8_t* groups_buffer,
471  const int8_t* key,
472  const uint32_t key_count,
473  const void* this_qmd_handle,
474  const int8_t* that_buff,
475  const uint32_t that_entry_idx,
476  const uint32_t that_entry_count,
477  const uint32_t row_size_bytes,
478  int64_t** buff_out,
479  uint8_t* empty) {
480  const auto& this_qmd = *reinterpret_cast<const QueryMemoryDescriptor*>(this_qmd_handle);
481  const auto gvi =
482  result_set::get_group_value_reduction(reinterpret_cast<int64_t*>(groups_buffer),
483  this_qmd.getEntryCount(),
484  reinterpret_cast<const int64_t*>(key),
485  key_count,
486  this_qmd.getEffectiveKeyWidth(),
487  this_qmd,
488  reinterpret_cast<const int64_t*>(that_buff),
489  that_entry_idx,
490  that_entry_count,
491  row_size_bytes >> 3);
492  *buff_out = gvi.first;
493  *empty = gvi.second;
494 }
495 
496 extern "C" RUNTIME_EXPORT uint8_t check_watchdog_rt(const size_t sample_seed) {
497  if (UNLIKELY(g_enable_dynamic_watchdog && (sample_seed & 0x3F) == 0 &&
498  dynamic_watchdog())) {
499  return true;
500  }
501  return false;
502 }
503 
504 extern "C" uint8_t check_interrupt_rt(const size_t sample_seed) {
505  // this func is called iff we enable runtime query interrupt
506  if (UNLIKELY((sample_seed & 0xFFFF) == 0 && check_interrupt())) {
507  return true;
508  }
509  return false;
510 }
511 
513  const std::vector<TargetInfo>& targets,
514  const std::vector<int64_t>& target_init_vals,
515  const size_t executor_id)
516  : executor_id_(executor_id)
517  , query_mem_desc_(query_mem_desc)
518  , targets_(targets)
519  , target_init_vals_(target_init_vals) {}
520 
521 // The code generated for a reduction between two result set buffers is structured in
522 // several functions and their IR is stored in the 'ReductionCode' structure. At a high
523 // level, the pseudocode is:
524 //
525 // func is_empty_func(row_ptr):
526 // ...
527 //
528 // func reduce_func_baseline(this_ptr, that_ptr):
529 // if is_empty_func(that_ptr):
530 // return
531 // for each target in the row:
532 // reduce target from that_ptr into this_ptr
533 //
534 // func reduce_func_perfect_hash(this_ptr, that_ptr):
535 // if is_empty_func(that_ptr):
536 // return
537 // for each target in the row:
538 // reduce target from that_ptr into this_ptr
539 //
540 // func reduce_func_idx(this_buff, that_buff, that_entry_index):
541 // that_ptr = that_result_set[that_entry_index]
542 // # Retrieval of 'this_ptr' is different between perfect hash and baseline.
543 // this_ptr = this_result_set[that_entry_index]
544 // or
545 // get_row(key(that_row_ptr), this_result_setBuffer)
546 // reduce_func_[baseline|perfect_hash](this_ptr, that_ptr)
547 //
548 // func reduce_loop(this_buff, that_buff, start_entry_index, end_entry_index):
549 // for that_entry_index in [start_entry_index, end_entry_index):
550 // reduce_func_idx(this_buff, that_buff, that_entry_index)
551 
553  const auto hash_type = query_mem_desc_.getQueryDescriptionType();
555  return {};
556  }
557  auto reduction_code = setup_functions_ir(hash_type);
558  isEmpty(reduction_code);
562  reduceOneEntryNoCollisions(reduction_code);
563  reduceOneEntryNoCollisionsIdx(reduction_code);
564  break;
565  }
567  reduceOneEntryBaseline(reduction_code);
568  reduceOneEntryBaselineIdx(reduction_code);
569  break;
570  }
571  default: {
572  LOG(FATAL) << "Unexpected query description type";
573  }
574  }
575  reduceLoop(reduction_code);
576  // For small result sets, avoid native code generation and use the interpreter instead.
579  return reduction_code;
580  }
581  auto executor = Executor::getExecutor(executor_id_);
582  CodeCacheKey key{cacheKey()};
583  std::lock_guard<std::mutex> compilation_lock(executor->compilation_mutex_);
584  auto& s_code_cache = executor->s_code_cache;
585  const auto compilation_context = s_code_cache.get(key);
586  if (compilation_context) {
587  reduction_code.func_ptr =
588  reinterpret_cast<ReductionCode::FuncPtr>(compilation_context->first->func());
589  return reduction_code;
590  }
591  auto cgen_state_ = std::unique_ptr<CgenState>(new CgenState({}, false, executor.get()));
592  auto cgen_state = reduction_code.cgen_state = cgen_state_.get();
593  cgen_state->set_module_shallow_copy(executor->get_rt_module());
594  reduction_code.module = cgen_state->module_;
595 
596  AUTOMATIC_IR_METADATA(cgen_state);
597  auto ir_is_empty = create_llvm_function(reduction_code.ir_is_empty.get(), cgen_state);
598  auto ir_reduce_one_entry =
599  create_llvm_function(reduction_code.ir_reduce_one_entry.get(), cgen_state);
600  auto ir_reduce_one_entry_idx =
601  create_llvm_function(reduction_code.ir_reduce_one_entry_idx.get(), cgen_state);
602  auto ir_reduce_loop =
603  create_llvm_function(reduction_code.ir_reduce_loop.get(), cgen_state);
604  std::unordered_map<const Function*, llvm::Function*> f;
605  f.emplace(reduction_code.ir_is_empty.get(), ir_is_empty);
606  f.emplace(reduction_code.ir_reduce_one_entry.get(), ir_reduce_one_entry);
607  f.emplace(reduction_code.ir_reduce_one_entry_idx.get(), ir_reduce_one_entry_idx);
608  f.emplace(reduction_code.ir_reduce_loop.get(), ir_reduce_loop);
609  translate_function(reduction_code.ir_is_empty.get(), ir_is_empty, reduction_code, f);
611  reduction_code.ir_reduce_one_entry.get(), ir_reduce_one_entry, reduction_code, f);
612  translate_function(reduction_code.ir_reduce_one_entry_idx.get(),
613  ir_reduce_one_entry_idx,
614  reduction_code,
615  f);
617  reduction_code.ir_reduce_loop.get(), ir_reduce_loop, reduction_code, f);
618  reduction_code.llvm_reduce_loop = ir_reduce_loop;
620  reduction_code.cgen_state = nullptr;
622  reduction_code, ir_is_empty, ir_reduce_one_entry, ir_reduce_one_entry_idx, key);
623  return reduction_code;
624 }
625 
626 void ResultSetReductionJIT::isEmpty(const ReductionCode& reduction_code) const {
627  auto ir_is_empty = reduction_code.ir_is_empty.get();
630  Value* key{nullptr};
631  Value* empty_key_val{nullptr};
632  const auto keys_ptr = ir_is_empty->arg(0);
637  CHECK_LT(static_cast<size_t>(query_mem_desc_.getTargetIdxForKey()),
638  target_init_vals_.size());
639  const int64_t target_slot_off = result_set::get_byteoff_of_slot(
641  const auto slot_ptr = ir_is_empty->add<GetElementPtr>(
642  keys_ptr,
643  ir_is_empty->addConstant<ConstantInt>(target_slot_off, Type::Int32),
644  "is_empty_slot_ptr");
645  const auto compact_sz =
647  key = emit_read_int_from_buff(slot_ptr, compact_sz, ir_is_empty);
648  empty_key_val = ir_is_empty->addConstant<ConstantInt>(
650  } else {
652  case 4: {
655  key = emit_load_i32(keys_ptr, ir_is_empty);
656  empty_key_val = ir_is_empty->addConstant<ConstantInt>(EMPTY_KEY_32, Type::Int32);
657  break;
658  }
659  case 8: {
660  key = emit_load_i64(keys_ptr, ir_is_empty);
661  empty_key_val = ir_is_empty->addConstant<ConstantInt>(EMPTY_KEY_64, Type::Int64);
662  break;
663  }
664  default:
665  LOG(FATAL) << "Invalid key width";
666  }
667  }
668  const auto ret =
669  ir_is_empty->add<ICmp>(ICmp::Predicate::EQ, key, empty_key_val, "is_key_empty");
670  ir_is_empty->add<Ret>(ret);
671 }
672 
674  const ReductionCode& reduction_code) const {
675  auto ir_reduce_one_entry = reduction_code.ir_reduce_one_entry.get();
676  const auto this_row_ptr = ir_reduce_one_entry->arg(0);
677  const auto that_row_ptr = ir_reduce_one_entry->arg(1);
678  const auto that_is_empty =
679  ir_reduce_one_entry->add<Call>(reduction_code.ir_is_empty.get(),
680  std::vector<const Value*>{that_row_ptr},
681  "that_is_empty");
682  ir_reduce_one_entry->add<ReturnEarly>(
683  that_is_empty, ir_reduce_one_entry->addConstant<ConstantInt>(0, Type::Int32), "");
684 
685  const auto key_bytes = get_key_bytes_rowwise(query_mem_desc_);
686  if (key_bytes) { // copy the key from right hand side
687  ir_reduce_one_entry->add<MemCpy>(
688  this_row_ptr,
689  that_row_ptr,
690  ir_reduce_one_entry->addConstant<ConstantInt>(key_bytes, Type::Int32));
691  }
692 
693  const auto key_bytes_with_padding = align_to_int64(key_bytes);
694  const auto key_bytes_lv =
695  ir_reduce_one_entry->addConstant<ConstantInt>(key_bytes_with_padding, Type::Int32);
696  const auto this_targets_start_ptr = ir_reduce_one_entry->add<GetElementPtr>(
697  this_row_ptr, key_bytes_lv, "this_targets_start");
698  const auto that_targets_start_ptr = ir_reduce_one_entry->add<GetElementPtr>(
699  that_row_ptr, key_bytes_lv, "that_targets_start");
700 
702  ir_reduce_one_entry, this_targets_start_ptr, that_targets_start_ptr);
703 }
704 
706  Function* ir_reduce_one_entry,
707  Value* this_targets_start_ptr,
708  Value* that_targets_start_ptr) const {
709  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
710  Value* this_targets_ptr = this_targets_start_ptr;
711  Value* that_targets_ptr = that_targets_start_ptr;
712  size_t init_agg_val_idx = 0;
713  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
714  ++target_logical_idx) {
715  const auto& target_info = targets_[target_logical_idx];
716  const auto& slots_for_col = col_slot_context.getSlotsForCol(target_logical_idx);
717  Value* this_ptr2{nullptr};
718  Value* that_ptr2{nullptr};
719 
720  bool two_slot_target{false};
721  if (target_info.is_agg &&
722  (target_info.agg_kind == kAVG ||
723  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
724  // Note that this assumes if one of the slot pairs in a given target is an array,
725  // all slot pairs are arrays. Currently this is true for all geo targets, but we
726  // should better codify and store this information in the future
727  two_slot_target = true;
728  }
729 
730  for (size_t target_slot_idx = slots_for_col.front();
731  target_slot_idx < slots_for_col.back() + 1;
732  target_slot_idx += 2) {
733  const auto slot_off_val = query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
734  const auto slot_off =
735  ir_reduce_one_entry->addConstant<ConstantInt>(slot_off_val, Type::Int32);
736  if (UNLIKELY(two_slot_target)) {
737  const auto desc = "target_" + std::to_string(target_logical_idx) + "_second_slot";
738  this_ptr2 = ir_reduce_one_entry->add<GetElementPtr>(
739  this_targets_ptr, slot_off, "this_" + desc);
740  that_ptr2 = ir_reduce_one_entry->add<GetElementPtr>(
741  that_targets_ptr, slot_off, "that_" + desc);
742  }
743  reduceOneSlot(this_targets_ptr,
744  this_ptr2,
745  that_targets_ptr,
746  that_ptr2,
747  target_info,
748  target_logical_idx,
749  target_slot_idx,
750  init_agg_val_idx,
751  slots_for_col.front(),
752  ir_reduce_one_entry);
753  auto increment_agg_val_idx_maybe =
754  [&init_agg_val_idx, &target_logical_idx, this](const int slot_count) {
756  query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
757  init_agg_val_idx += slot_count;
758  }
759  };
760  if (target_logical_idx + 1 == targets_.size() &&
761  target_slot_idx + 1 >= slots_for_col.back()) {
762  break;
763  }
764  const auto next_desc =
765  "target_" + std::to_string(target_logical_idx + 1) + "_first_slot";
766  if (UNLIKELY(two_slot_target)) {
767  increment_agg_val_idx_maybe(2);
768  const auto two_slot_off = ir_reduce_one_entry->addConstant<ConstantInt>(
769  slot_off_val + query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1),
770  Type::Int32);
771  this_targets_ptr = ir_reduce_one_entry->add<GetElementPtr>(
772  this_targets_ptr, two_slot_off, "this_" + next_desc);
773  that_targets_ptr = ir_reduce_one_entry->add<GetElementPtr>(
774  that_targets_ptr, two_slot_off, "that_" + next_desc);
775  } else {
776  increment_agg_val_idx_maybe(1);
777  this_targets_ptr = ir_reduce_one_entry->add<GetElementPtr>(
778  this_targets_ptr, slot_off, "this_" + next_desc);
779  that_targets_ptr = ir_reduce_one_entry->add<GetElementPtr>(
780  that_targets_ptr, slot_off, "that_" + next_desc);
781  }
782  }
783  }
784  ir_reduce_one_entry->add<Ret>(
785  ir_reduce_one_entry->addConstant<ConstantInt>(0, Type::Int32));
786 }
787 
// Emits the body of the per-entry reduction function for a baseline-hash
// layout: walks every logical target, reducing its slot(s) from the 'that'
// targets buffer (arg 1) into the 'this' targets buffer (arg 0).
// NOTE(review): the extraction dropped listing line 788 (the function
// signature); from the member index this appears to be
// ResultSetReductionJIT::reduceOneEntryBaseline — confirm against full source.
 789  const ReductionCode& reduction_code) const {
 790  auto ir_reduce_one_entry = reduction_code.ir_reduce_one_entry.get();
 791  const auto this_targets_ptr_arg = ir_reduce_one_entry->arg(0);
 792  const auto that_targets_ptr_arg = ir_reduce_one_entry->arg(1);
// ptr1 points at the target's first slot; ptr2 (below) at an optional second slot.
 793  Value* this_ptr1 = this_targets_ptr_arg;
 794  Value* that_ptr1 = that_targets_ptr_arg;
// j: physical slot index; init_agg_val_idx: index into target_init_vals_.
 795  size_t j = 0;
 796  size_t init_agg_val_idx = 0;
 797  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
 798  ++target_logical_idx) {
 799  const auto& target_info = targets_[target_logical_idx];
 800  Value* this_ptr2{nullptr};
 801  Value* that_ptr2{nullptr};
// AVG and varlen SAMPLE targets occupy two 8-byte slots (e.g. sum+count or
// ptr+length); materialize pointers to the second slot, one int64 past the first.
 802  if (target_info.is_agg &&
 803  (target_info.agg_kind == kAVG ||
 804  (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
 805  const auto desc = "target_" + std::to_string(target_logical_idx) + "_second_slot";
 806  const auto second_slot_rel_off =
 807  ir_reduce_one_entry->addConstant<ConstantInt>(sizeof(int64_t), Type::Int32);
 808  this_ptr2 = ir_reduce_one_entry->add<GetElementPtr>(
 809  this_ptr1, second_slot_rel_off, "this_" + desc);
 810  that_ptr2 = ir_reduce_one_entry->add<GetElementPtr>(
 811  that_ptr1, second_slot_rel_off, "that_" + desc);
 812  }
// Emit the reduction IR for this single target (dispatches per agg kind).
 813  reduceOneSlot(this_ptr1,
 814  this_ptr2,
 815  that_ptr1,
 816  that_ptr2,
 817  target_info,
 818  target_logical_idx,
 819  j,
 820  init_agg_val_idx,
 821  j,
 822  ir_reduce_one_entry);
 823  if (target_logical_idx + 1 == targets_.size()) {
 824  break;
 825  }
// NOTE(review): listing line 826 (the `if (...) {` opening this branch,
// presumably a targetGroupbyIndicesSize()-style check) was dropped by the
// extraction — verify against the full source before relying on this logic.
 827  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
 828  } else {
// Targets backed by a group-by key column do not consume an init value slot.
 829  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
 830  init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
 831  }
 832  }
 833  j = advance_slot(j, target_info, false);
// Recompute ptr1 for the next target from the buffer start, at byte offset
// init_agg_val_idx * sizeof(int64_t).
 834  const auto next_desc =
 835  "target_" + std::to_string(target_logical_idx + 1) + "_first_slot";
 836  auto next_slot_rel_off = ir_reduce_one_entry->addConstant<ConstantInt>(
 837  init_agg_val_idx * sizeof(int64_t), Type::Int32);
 838  this_ptr1 = ir_reduce_one_entry->add<GetElementPtr>(
 839  this_targets_ptr_arg, next_slot_rel_off, next_desc);
 840  that_ptr1 = ir_reduce_one_entry->add<GetElementPtr>(
 841  that_targets_ptr_arg, next_slot_rel_off, next_desc);
 842  }
// All targets reduced without an early return: report success (0).
 843  ir_reduce_one_entry->add<Ret>(
 844  ir_reduce_one_entry->addConstant<ConstantInt>(0, Type::Int32));
 845 }
846 
// Emits the indexed per-entry reducer for the perfect-hash (no collisions),
// row-wise layout: the same entry index addresses the matching row in both
// buffers, so the row offset is simply entry_idx * row_bytes.
// NOTE(review): listing lines 847 (signature, per the member index
// ResultSetReductionJIT::reduceOneEntryNoCollisionsIdx) and 850-853
// (presumably CHECKs on the layout) were dropped by the extraction.
 848  const ReductionCode& reduction_code) const {
 849  auto ir_reduce_one_entry_idx = reduction_code.ir_reduce_one_entry_idx.get();
 854  const auto this_buff = ir_reduce_one_entry_idx->arg(0);
 855  const auto that_buff = ir_reduce_one_entry_idx->arg(1);
 856  const auto entry_idx = ir_reduce_one_entry_idx->arg(2);
// arg(3) (the entry count) is unused here: no probing is needed without collisions.
 857  const auto this_qmd_handle = ir_reduce_one_entry_idx->arg(4);
 858  const auto that_qmd_handle = ir_reduce_one_entry_idx->arg(5);
 859  const auto serialized_varlen_buffer_arg = ir_reduce_one_entry_idx->arg(6);
// NOTE(review): line 861, the argument of this addConstant (presumably
// get_row_bytes(query_mem_desc_)), was dropped by the extraction.
 860  const auto row_bytes = ir_reduce_one_entry_idx->addConstant<ConstantInt>(
// Widen the 32-bit index before the byte-offset multiply to avoid overflow.
 862  const auto entry_idx_64 = ir_reduce_one_entry_idx->add<Cast>(
 863  Cast::CastOp::SExt, entry_idx, Type::Int64, "entry_idx_64");
 864  const auto row_off_in_bytes = ir_reduce_one_entry_idx->add<BinaryOperator>(
 865  BinaryOperator::BinaryOp::Mul, entry_idx_64, row_bytes, "row_off_in_bytes");
 866  const auto this_row_ptr = ir_reduce_one_entry_idx->add<GetElementPtr>(
 867  this_buff, row_off_in_bytes, "this_row_ptr");
 868  const auto that_row_ptr = ir_reduce_one_entry_idx->add<GetElementPtr>(
 869  that_buff, row_off_in_bytes, "that_row_ptr");
// Delegate the actual slot-by-slot work to the one-entry reduction function
// and propagate its return code.
 870  const auto reduce_rc = ir_reduce_one_entry_idx->add<Call>(
 871  reduction_code.ir_reduce_one_entry.get(),
 872  std::vector<const Value*>{this_row_ptr,
 873  that_row_ptr,
 874  this_qmd_handle,
 875  that_qmd_handle,
 876  serialized_varlen_buffer_arg},
 877  "");
 878  ir_reduce_one_entry_idx->add<Ret>(reduce_rc);
 879 }
880 
// Emits the indexed per-entry reducer for the baseline-hash layout: the same
// key may live at different positions in the two buffers, so the matching
// 'this' entry is located (or created) via the get_group_value_reduction_rt
// runtime helper before slots are reduced.
// NOTE(review): listing lines 881 (signature — per the member index
// ResultSetReductionJIT::reduceOneEntryBaselineIdx), 884-887, and 896 (the
// row-bytes constant argument) were dropped by the extraction.
 882  const ReductionCode& reduction_code) const {
 883  auto ir_reduce_one_entry_idx = reduction_code.ir_reduce_one_entry_idx.get();
 888  const auto this_buff = ir_reduce_one_entry_idx->arg(0);
 889  const auto that_buff = ir_reduce_one_entry_idx->arg(1);
 890  const auto that_entry_idx = ir_reduce_one_entry_idx->arg(2);
 891  const auto that_entry_count = ir_reduce_one_entry_idx->arg(3);
 892  const auto this_qmd_handle = ir_reduce_one_entry_idx->arg(4);
 893  const auto that_qmd_handle = ir_reduce_one_entry_idx->arg(5);
 894  const auto serialized_varlen_buffer_arg = ir_reduce_one_entry_idx->arg(6);
 895  const auto row_bytes = ir_reduce_one_entry_idx->addConstant<ConstantInt>(
 897  const auto that_entry_idx_64 = ir_reduce_one_entry_idx->add<Cast>(
 898  Cast::CastOp::SExt, that_entry_idx, Type::Int64, "that_entry_idx_64");
 899  const auto that_row_off_in_bytes =
 900  ir_reduce_one_entry_idx->add<BinaryOperator>(BinaryOperator::BinaryOp::Mul,
 901  that_entry_idx_64,
 902  row_bytes,
 903  "that_row_off_in_bytes");
 904  const auto that_row_ptr = ir_reduce_one_entry_idx->add<GetElementPtr>(
 905  that_buff, that_row_off_in_bytes, "that_row_ptr");
// Nothing to merge for an empty source entry: return success immediately.
 906  const auto that_is_empty =
 907  ir_reduce_one_entry_idx->add<Call>(reduction_code.ir_is_empty.get(),
 908  std::vector<const Value*>{that_row_ptr},
 909  "that_is_empty");
 910  ir_reduce_one_entry_idx->add<ReturnEarly>(
 911  that_is_empty,
 912  ir_reduce_one_entry_idx->addConstant<ConstantInt>(0, Type::Int32),
 913  "");
 914  const auto key_count = query_mem_desc_.getGroupbyColCount();
// Stack slots used as out-parameters of the runtime group lookup below.
 915  const auto one_element =
 916  ir_reduce_one_entry_idx->addConstant<ConstantInt>(1, Type::Int32);
 917  const auto this_targets_ptr_i64_ptr = ir_reduce_one_entry_idx->add<Alloca>(
 918  Type::Int64Ptr, one_element, "this_targets_ptr_out");
 919  const auto this_is_empty_ptr =
 920  ir_reduce_one_entry_idx->add<Alloca>(Type::Int8, one_element, "this_is_empty_out");
// Locate (or insert) the entry in 'this' buffer whose key matches that_row_ptr;
// writes the target pointer and an is-empty flag into the allocas above.
 921  ir_reduce_one_entry_idx->add<ExternalCall>(
 922  "get_group_value_reduction_rt",
 923  Type::Void,
 924  std::vector<const Value*>{
 925  this_buff,
 926  that_row_ptr,
 927  ir_reduce_one_entry_idx->addConstant<ConstantInt>(key_count, Type::Int32),
 928  this_qmd_handle,
 929  that_buff,
 930  that_entry_idx,
 931  that_entry_count,
 932  row_bytes,
 933  this_targets_ptr_i64_ptr,
 934  this_is_empty_ptr},
 935  "");
 936  const auto this_targets_ptr_i64 = ir_reduce_one_entry_idx->add<Load>(
 937  this_targets_ptr_i64_ptr, "this_targets_ptr_i64");
 938  auto this_is_empty =
 939  ir_reduce_one_entry_idx->add<Load>(this_is_empty_ptr, "this_is_empty");
 940  this_is_empty = ir_reduce_one_entry_idx->add<Cast>(
 941  Cast::CastOp::Trunc, this_is_empty, Type::Int1, "this_is_empty_bool");
// If the runtime reported the destination entry as empty, skip slot reduction
// and return success (presumably the runtime handled initialization — confirm
// against get_group_value_reduction_rt's contract).
 942  ir_reduce_one_entry_idx->add<ReturnEarly>(
 943  this_is_empty,
 944  ir_reduce_one_entry_idx->addConstant<ConstantInt>(0, Type::Int32),
 945  "");
// Source targets start right after the key section of the row.
 946  const auto key_qw_count = get_slot_off_quad(query_mem_desc_);
 947  const auto this_targets_ptr = ir_reduce_one_entry_idx->add<Cast>(
 948  Cast::CastOp::BitCast, this_targets_ptr_i64, Type::Int8Ptr, "this_targets_ptr");
 949  const auto key_byte_count = key_qw_count * sizeof(int64_t);
 950  const auto key_byte_count_lv =
 951  ir_reduce_one_entry_idx->addConstant<ConstantInt>(key_byte_count, Type::Int32);
 952  const auto that_targets_ptr = ir_reduce_one_entry_idx->add<GetElementPtr>(
 953  that_row_ptr, key_byte_count_lv, "that_targets_ptr");
// Reduce the target slots and propagate the per-entry return code.
 954  const auto reduce_rc = ir_reduce_one_entry_idx->add<Call>(
 955  reduction_code.ir_reduce_one_entry.get(),
 956  std::vector<const Value*>{this_targets_ptr,
 957  that_targets_ptr,
 958  this_qmd_handle,
 959  that_qmd_handle,
 960  serialized_varlen_buffer_arg},
 961  "");
 962  ir_reduce_one_entry_idx->add<Ret>(reduce_rc);
 963 }
964 
 965 namespace {
 966 
// Fills in the body of the reduction driver loop: for each iteration, compute
// the absolute source entry index, optionally poll the watchdog/interrupt
// runtime check, then invoke the indexed one-entry reducer and bail out early
// on a non-zero return code.
 967 void generate_loop_body(For* for_loop,
 968  Function* ir_reduce_loop,
 969  Function* ir_reduce_one_entry_idx,
 970  Value* this_buff,
 971  Value* that_buff,
 972  Value* start_index,
 973  Value* that_entry_count,
 974  Value* this_qmd_handle,
 975  Value* that_qmd_handle,
 976  Value* serialized_varlen_buffer) {
// Absolute entry index = loop induction variable + start offset.
 977  const auto that_entry_idx = for_loop->add<BinaryOperator>(
 978  BinaryOperator::BinaryOp::Add, for_loop->iter(), start_index, "that_entry_idx");
 979  const auto sample_seed =
 980  for_loop->add<Cast>(Cast::CastOp::SExt, that_entry_idx, Type::Int64, "");
// NOTE(review): listing line 981 (the `if (...) {` that this `}` at line 996
// closes — presumably gated on g_enable_dynamic_watchdog /
// g_enable_non_kernel_time_query_interrupt) was dropped by the extraction.
 982  const auto checker_rt_name =
 983  g_enable_dynamic_watchdog ? "check_watchdog_rt" : "check_interrupt_rt";
 984  const auto error_code = g_enable_dynamic_watchdog ? WATCHDOG_ERROR : INTERRUPT_ERROR;
// Poll the runtime checker (sampled by entry index); a non-zero result means
// the watchdog fired or the query was interrupted.
 985  const auto checker_triggered = for_loop->add<ExternalCall>(
 986  checker_rt_name, Type::Int8, std::vector<const Value*>{sample_seed}, "");
 987  const auto interrupt_triggered_bool =
 988  for_loop->add<ICmp>(ICmp::Predicate::NE,
 989  checker_triggered,
 990  ir_reduce_loop->addConstant<ConstantInt>(0, Type::Int8),
 991  "");
 992  for_loop->add<ReturnEarly>(
 993  interrupt_triggered_bool,
 994  ir_reduce_loop->addConstant<ConstantInt>(error_code, Type::Int32),
 995  "");
 996  }
// Reduce one entry; any non-zero return code aborts the loop and is
// propagated to the caller unchanged.
 997  const auto reduce_rc =
 998  for_loop->add<Call>(ir_reduce_one_entry_idx,
 999  std::vector<const Value*>{this_buff,
1000  that_buff,
1001  that_entry_idx,
1002  that_entry_count,
1003  this_qmd_handle,
1004  that_qmd_handle,
1005  serialized_varlen_buffer},
1006  "");
1007 
1008  auto reduce_rc_bool =
1009  for_loop->add<ICmp>(ICmp::Predicate::NE,
1010  reduce_rc,
1011  ir_reduce_loop->addConstant<ConstantInt>(0, Type::Int32),
1012  "");
1013  for_loop->add<ReturnEarly>(reduce_rc_bool, reduce_rc, "");
1014 }
1015 
1016 } // namespace
1017 
1018 void ResultSetReductionJIT::reduceLoop(const ReductionCode& reduction_code) const {
1019  auto ir_reduce_loop = reduction_code.ir_reduce_loop.get();
1020  const auto this_buff_arg = ir_reduce_loop->arg(0);
1021  const auto that_buff_arg = ir_reduce_loop->arg(1);
1022  const auto start_index_arg = ir_reduce_loop->arg(2);
1023  const auto end_index_arg = ir_reduce_loop->arg(3);
1024  const auto that_entry_count_arg = ir_reduce_loop->arg(4);
1025  const auto this_qmd_handle_arg = ir_reduce_loop->arg(5);
1026  const auto that_qmd_handle_arg = ir_reduce_loop->arg(6);
1027  const auto serialized_varlen_buffer_arg = ir_reduce_loop->arg(7);
1028  For* for_loop =
1029  static_cast<For*>(ir_reduce_loop->add<For>(start_index_arg, end_index_arg, ""));
1030  generate_loop_body(for_loop,
1031  ir_reduce_loop,
1032  reduction_code.ir_reduce_one_entry_idx.get(),
1033  this_buff_arg,
1034  that_buff_arg,
1035  start_index_arg,
1036  that_entry_count_arg,
1037  this_qmd_handle_arg,
1038  that_qmd_handle_arg,
1039  serialized_varlen_buffer_arg);
1040  ir_reduce_loop->add<Ret>(ir_reduce_loop->addConstant<ConstantInt>(0, Type::Int32));
1041 }
1042 
// Emits the reduction IR for a single target: dispatches to the aggregate
// reducer, the checked SINGLE_VALUE projection, or a plain projection write
// (with extra varlen-SAMPLE handling).
// NOTE(review): listing line 1043 (the signature head,
// ResultSetReductionJIT::reduceOneSlot per the member index) was dropped by
// the extraction.
1044  Value* this_ptr2,
1045  Value* that_ptr1,
1046  Value* that_ptr2,
1047  const TargetInfo& target_info,
1048  const size_t target_logical_idx,
1049  const size_t target_slot_idx,
1050  const size_t init_agg_val_idx,
1051  const size_t first_slot_idx_for_target,
1052  Function* ir_reduce_one_entry) const {
// NOTE(review): listing line 1053 (the `if (...) {` closed at line 1057 —
// presumably a targetGroupbyIndicesSize() guard) was dropped by the
// extraction. Targets materialized from a group-by key need no reduction.
1054  if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
1055  return;
1056  }
1057  }
1058  const bool float_argument_input = takes_float_argument(target_info);
1059  const auto chosen_bytes = result_set::get_width_for_slot(
1060  target_slot_idx, float_argument_input, query_mem_desc_);
1061  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
1062  auto init_val = target_init_vals_[init_agg_val_idx];
// True aggregates (everything but SINGLE_VALUE / SAMPLE) go through the
// per-aggregate-kind reducer.
1063  if (target_info.is_agg &&
1064  (target_info.agg_kind != kSINGLE_VALUE && target_info.agg_kind != kSAMPLE)) {
1065  reduceOneAggregateSlot(this_ptr1,
1066  this_ptr2,
1067  that_ptr1,
1068  that_ptr2,
1069  target_info,
1070  target_logical_idx,
1071  target_slot_idx,
1072  init_val,
1073  chosen_bytes,
1074  ir_reduce_one_entry);
// SINGLE_VALUE: the write is checked; a conflicting non-null value yields a
// non-zero rc which is returned early to the caller.
1075  } else if (target_info.agg_kind == kSINGLE_VALUE) {
1076  const auto checked_rc = emit_checked_write_projection(
1077  this_ptr1, that_ptr1, init_val, chosen_bytes, ir_reduce_one_entry);
1078 
// NOTE(review): listing line 1080 (the ICmp predicate argument, presumably
// ICmp::Predicate::NE) was dropped by the extraction.
1079  auto checked_rc_bool = ir_reduce_one_entry->add<ICmp>(
1081  checked_rc,
1082  ir_reduce_one_entry->addConstant<ConstantInt>(0, Type::Int32),
1083  "");
1084 
1085  ir_reduce_one_entry->add<ReturnEarly>(checked_rc_bool, checked_rc, "");
1086 
1087  } else {
// NOTE(review): listing line 1088 (the call head, presumably
// emit_write_projection( ) was dropped by the extraction.
1089  this_ptr1, that_ptr1, init_val, chosen_bytes, ir_reduce_one_entry);
// Varlen SAMPLE additionally resolves the payload through the serialized
// varlen buffer via a runtime helper; length_to_elems converts the stored
// length to an element count.
1090  if (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) {
1091  CHECK(this_ptr2 && that_ptr2);
1092  size_t length_to_elems{0};
1093  if (target_info.sql_type.is_geometry()) {
1094  // TODO: Assumes hard-coded sizes for geometry targets
1095  length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
1096  } else {
1097  const auto& elem_ti = target_info.sql_type.get_elem_type();
1098  length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
1099  }
1100  const auto serialized_varlen_buffer_arg = ir_reduce_one_entry->arg(4);
1101  ir_reduce_one_entry->add<ExternalCall>(
1102  "serialized_varlen_buffer_sample",
1103  Type::Void,
1104  std::vector<const Value*>{
1105  serialized_varlen_buffer_arg,
1106  this_ptr1,
1107  this_ptr2,
1108  that_ptr1,
1109  that_ptr2,
1110  ir_reduce_one_entry->addConstant<ConstantInt>(init_val, Type::Int64),
1111  ir_reduce_one_entry->addConstant<ConstantInt>(length_to_elems,
1112  Type::Int64)},
1113  "");
1114  }
1115  }
1116 }
1117 
// Emits reduction IR for one true-aggregate slot, dispatching on the
// aggregate kind (COUNT / APPROX_COUNT_DISTINCT / APPROX_QUANTILE / AVG /
// SUM / MIN / MAX).
// NOTE(review): listing line 1118 (the signature head,
// ResultSetReductionJIT::reduceOneAggregateSlot per the member index) was
// dropped by the extraction, as were the call-head lines 1133, 1143, 1155,
// 1165 and 1175 referenced below.
1119  Value* this_ptr2,
1120  Value* that_ptr1,
1121  Value* that_ptr2,
1122  const TargetInfo& target_info,
1123  const size_t target_logical_idx,
1124  const size_t target_slot_idx,
1125  const int64_t init_val,
1126  const int8_t chosen_bytes,
1127  Function* ir_reduce_one_entry) const {
1128  switch (target_info.agg_kind) {
1129  case kCOUNT:
1130  case kAPPROX_COUNT_DISTINCT: {
// Distinct counts are kept as a set handle; merging is a set union done by
// the runtime (dropped line 1133 presumably calls reduceOneCountDistinctSlot).
1131  if (is_distinct_target(target_info)) {
1132  CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
1134  this_ptr1, that_ptr1, target_logical_idx, ir_reduce_one_entry);
1135  break;
1136  }
// Plain COUNT: init value must be zero so the counts simply add.
1137  CHECK_EQ(int64_t(0), init_val);
1138  emit_aggregate_one_count(this_ptr1, that_ptr1, chosen_bytes, ir_reduce_one_entry);
1139  break;
1140  }
// t-digest merge via runtime helper (dropped line 1143 presumably calls
// reduceOneApproxQuantileSlot).
1141  case kAPPROX_QUANTILE:
1142  CHECK_EQ(chosen_bytes, static_cast<int8_t>(sizeof(int64_t)));
1144  this_ptr1, that_ptr1, target_logical_idx, ir_reduce_one_entry);
1145  break;
// AVG: reduce the count component (second slot) here, then fall through to
// kSUM to reduce the sum component (first slot).
1146  case kAVG: {
1147  // Ignore float argument compaction for count component for fear of its overflow
1148  emit_aggregate_one_count(this_ptr2,
1149  that_ptr2,
1150  query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx),
1151  ir_reduce_one_entry);
1152  }
1153  // fall thru
// Dropped lines 1155/1165/1175 are presumably
// emit_aggregate_one_nullable_value("sum"/"min"/"max", ...) call heads.
1154  case kSUM: {
1156  this_ptr1,
1157  that_ptr1,
1158  init_val,
1159  chosen_bytes,
1160  target_info,
1161  ir_reduce_one_entry);
1162  break;
1163  }
1164  case kMIN: {
1166  this_ptr1,
1167  that_ptr1,
1168  init_val,
1169  chosen_bytes,
1170  target_info,
1171  ir_reduce_one_entry);
1172  break;
1173  }
1174  case kMAX: {
1176  this_ptr1,
1177  that_ptr1,
1178  init_val,
1179  chosen_bytes,
1180  target_info,
1181  ir_reduce_one_entry);
1182  break;
1183  }
1184  default:
1185  LOG(FATAL) << "Invalid aggregate type";
1186  }
1187 }
1188 
// Emits a runtime call that unions the 'that' distinct-count set into the
// 'this' set. The slots hold opaque 64-bit set handles; the query-memory
// descriptor handles (function args 2/3) let the runtime look up the
// count-distinct descriptors.
// NOTE(review): listing lines 1189 (signature head,
// ResultSetReductionJIT::reduceOneCountDistinctSlot per the member index) and
// 1194 (presumably a CHECK_LT on target_logical_idx) were dropped by the
// extraction.
1190  Value* this_ptr1,
1191  Value* that_ptr1,
1192  const size_t target_logical_idx,
1193  Function* ir_reduce_one_entry) const {
1195  const auto old_set_handle = emit_load_i64(this_ptr1, ir_reduce_one_entry);
1196  const auto new_set_handle = emit_load_i64(that_ptr1, ir_reduce_one_entry);
1197  const auto this_qmd_arg = ir_reduce_one_entry->arg(2);
1198  const auto that_qmd_arg = ir_reduce_one_entry->arg(3);
// Note the argument order: (new, old, that_qmd, this_qmd) — the source set is
// merged into the destination set identified by old_set_handle.
1199  ir_reduce_one_entry->add<ExternalCall>(
1200  "count_distinct_set_union_jit_rt",
1201  Type::Void,
1202  std::vector<const Value*>{
1203  new_set_handle,
1204  old_set_handle,
1205  that_qmd_arg,
1206  this_qmd_arg,
1207  ir_reduce_one_entry->addConstant<ConstantInt>(target_logical_idx, Type::Int64)},
1208  "");
1209 }
1210 
// Emits a runtime call merging the 'that' APPROX_QUANTILE digest into the
// 'this' digest; mirrors reduceOneCountDistinctSlot but targets the
// approx_quantile_jit_rt helper.
// NOTE(review): listing lines 1211 (signature head,
// ResultSetReductionJIT::reduceOneApproxQuantileSlot per the member index)
// and 1216 (presumably a CHECK_LT on target_logical_idx) were dropped by the
// extraction.
1212  Value* this_ptr1,
1213  Value* that_ptr1,
1214  const size_t target_logical_idx,
1215  Function* ir_reduce_one_entry) const {
1217  const auto old_set_handle = emit_load_i64(this_ptr1, ir_reduce_one_entry);
1218  const auto new_set_handle = emit_load_i64(that_ptr1, ir_reduce_one_entry);
1219  const auto this_qmd_arg = ir_reduce_one_entry->arg(2);
1220  const auto that_qmd_arg = ir_reduce_one_entry->arg(3);
// Argument order matches the count-distinct helper: (new, old, that_qmd,
// this_qmd, target index).
1221  ir_reduce_one_entry->add<ExternalCall>(
1222  "approx_quantile_jit_rt",
1223  Type::Void,
1224  std::vector<const Value*>{
1225  new_set_handle,
1226  old_set_handle,
1227  that_qmd_arg,
1228  this_qmd_arg,
1229  ir_reduce_one_entry->addConstant<ConstantInt>(target_logical_idx, Type::Int64)},
1230  "");
1231 }
1232 
// Compiles the generated LLVM reduction functions to native CPU code, stores
// the resulting function pointer on reduction_code, and inserts the
// compilation context into the executor's code cache under 'key'.
// NOTE(review): listing lines 1233 (signature head,
// ResultSetReductionJIT::finalizeReductionCode per the member index), 1240
// (the CompilationOptions initializer contents), 1254 (presumably the
// `auto ee = CodeGenerator::generateNativeCPUCode(` head) and 1261
// (presumably the `Executor::addCodeToCache(` head) were dropped by the
// extraction.
1234  ReductionCode& reduction_code,
1235  const llvm::Function* ir_is_empty,
1236  const llvm::Function* ir_reduce_one_entry,
1237  const llvm::Function* ir_reduce_one_entry_idx,
1238  const CodeCacheKey& key) const {
1239  CompilationOptions co{
1241  auto executor = Executor::getExecutor(executor_id_);
1242  auto& s_code_cache = executor->s_code_cache;
// Release (NDEBUG) builds log each generated function separately; debug
// builds dump the whole module instead.
1243 #ifdef NDEBUG
1244  LOG(IR) << "Reduction Loop:\n"
1245  << serialize_llvm_object(reduction_code.llvm_reduce_loop);
1246  LOG(IR) << "Reduction Is Empty Func:\n" << serialize_llvm_object(ir_is_empty);
1247  LOG(IR) << "Reduction One Entry Func:\n" << serialize_llvm_object(ir_reduce_one_entry);
1248  LOG(IR) << "Reduction One Entry Idx Func:\n"
1249  << serialize_llvm_object(ir_reduce_one_entry_idx);
1250 #else
1251  LOG(IR) << serialize_llvm_object(reduction_code.module);
1252 #endif
1253 
1255  reduction_code.llvm_reduce_loop, {reduction_code.llvm_reduce_loop}, co);
// Wrap the execution engine and expose the entry point as a raw FuncPtr.
1256  auto cpu_compilation_context = std::make_shared<CpuCompilationContext>(std::move(ee));
1257  cpu_compilation_context->setFunctionPointer(reduction_code.llvm_reduce_loop);
1258  reduction_code.func_ptr =
1259  reinterpret_cast<ReductionCode::FuncPtr>(cpu_compilation_context->func());
1260  CHECK(reduction_code.llvm_reduce_loop->getParent() == reduction_code.module);
1262  key, cpu_compilation_context, reduction_code.module, s_code_cache);
1263 }
1264 
1265 namespace {
1266 
1267 std::string target_info_key(const TargetInfo& target_info) {
1268  return std::to_string(target_info.is_agg) + "\n" +
1269  std::to_string(target_info.agg_kind) + "\n" +
1270  target_info.sql_type.get_type_name() + "\n" +
1271  std::to_string(target_info.sql_type.get_notnull()) + "\n" +
1272  target_info.agg_arg_type.get_type_name() + "\n" +
1273  std::to_string(target_info.agg_arg_type.get_notnull()) + "\n" +
1274  std::to_string(target_info.skip_null_val) + "\n" +
1275  std::to_string(target_info.is_distinct);
1276 }
1277 
1278 } // namespace
1279 
// Builds the code-cache key for this reduction: the query-memory descriptor's
// reduction key, the comma-joined target init values, and the comma-joined
// per-target keys (see target_info_key), separated by newlines.
// NOTE(review): listing lines 1282 and 1289 (the
// `std::transform(target_init_vals_.begin(),` / `std::transform(` call heads)
// were dropped by the extraction.
1280 std::string ResultSetReductionJIT::cacheKey() const {
1281  std::vector<std::string> target_init_vals_strings;
1283  target_init_vals_.end(),
1284  std::back_inserter(target_init_vals_strings),
1285  [](const int64_t v) { return std::to_string(v); });
1286  const auto target_init_vals_key =
1287  boost::algorithm::join(target_init_vals_strings, ", ");
1288  std::vector<std::string> targets_strings;
1290  targets_.begin(),
1291  targets_.end(),
1292  std::back_inserter(targets_strings),
1293  [](const TargetInfo& target_info) { return target_info_key(target_info); });
1294  const auto targets_key = boost::algorithm::join(targets_strings, ", ");
1295  return query_mem_desc_.reductionKey() + "\n" + target_init_vals_key + "\n" +
1296  targets_key;
1297 }
1298 
// Generates the complete reduction code: builds the abstract reduction IR
// functions (is-empty, one-entry, one-entry-idx, loop), lowers each to an
// LLVM function on a shallow copy of the executor's runtime module, and
// returns the populated ReductionCode.
// NOTE(review): listing line 1299 (the `codegen()` signature head — this
// variant only emits the no-collisions path, so confirm which overload it is
// against the full source), line 1302 (unknown, possibly an interpreter/cache
// shortcut), and the `translate_function(` heads at lines 1328 and 1334 were
// dropped by the extraction.
1300  const auto hash_type = query_mem_desc_.getQueryDescriptionType();
1301  auto reduction_code = setup_functions_ir(hash_type);
// Populate each abstract IR function's body.
1303  isEmpty(reduction_code);
1304  reduceOneEntryNoCollisions(reduction_code);
1305  reduceOneEntryNoCollisionsIdx(reduction_code);
1306  reduceLoop(reduction_code);
1307  auto executor = Executor::getExecutor(executor_id_);
1308  auto cgen_state_ = std::unique_ptr<CgenState>(new CgenState({}, false, executor.get()));
1309  auto cgen_state = reduction_code.cgen_state = cgen_state_.get();
1310  // CHECK(executor->thread_id_ == logger::thread_id()); // do we need compilation mutex?
// Lower onto a shallow copy of the executor's runtime module so the runtime
// helper declarations are available.
1311  cgen_state->set_module_shallow_copy(executor->get_rt_module());
1312  reduction_code.module = cgen_state->module_;
1313 
1314  AUTOMATIC_IR_METADATA(cgen_state);
// Create the LLVM shells for every abstract function, then map abstract
// function -> llvm::Function so calls can be resolved during translation.
1315  auto ir_is_empty = create_llvm_function(reduction_code.ir_is_empty.get(), cgen_state);
1316  auto ir_reduce_one_entry =
1317  create_llvm_function(reduction_code.ir_reduce_one_entry.get(), cgen_state);
1318  auto ir_reduce_one_entry_idx =
1319  create_llvm_function(reduction_code.ir_reduce_one_entry_idx.get(), cgen_state);
1320  auto ir_reduce_loop =
1321  create_llvm_function(reduction_code.ir_reduce_loop.get(), cgen_state);
1322  std::unordered_map<const Function*, llvm::Function*> f;
1323  f.emplace(reduction_code.ir_is_empty.get(), ir_is_empty);
1324  f.emplace(reduction_code.ir_reduce_one_entry.get(), ir_reduce_one_entry);
1325  f.emplace(reduction_code.ir_reduce_one_entry_idx.get(), ir_reduce_one_entry_idx);
1326  f.emplace(reduction_code.ir_reduce_loop.get(), ir_reduce_loop);
1327  translate_function(reduction_code.ir_is_empty.get(), ir_is_empty, reduction_code, f);
1329  reduction_code.ir_reduce_one_entry.get(), ir_reduce_one_entry, reduction_code, f);
1330  translate_function(reduction_code.ir_reduce_one_entry_idx.get(),
1331  ir_reduce_one_entry_idx,
1332  reduction_code,
1333  f);
1335  reduction_code.ir_reduce_loop.get(), ir_reduce_loop, reduction_code, f);
1336  reduction_code.llvm_reduce_loop = ir_reduce_loop;
// The CgenState is owned by the local unique_ptr; clear the borrowed pointer
// before returning.
1337  reduction_code.cgen_state = nullptr;
1338  return reduction_code;
1339 }
GroupValueInfo get_group_value_reduction(int64_t *groups_buffer, const uint32_t groups_buffer_entry_count, const int64_t *key, const uint32_t key_count, const size_t key_width, const QueryMemoryDescriptor &query_mem_desc, const int64_t *that_buff_i64, const size_t that_entry_idx, const size_t that_entry_count, const uint32_t row_size_quad)
void emit_aggregate_one_nullable_value(const std::string &agg_kind, Value *val_ptr, Value *other_ptr, const int64_t init_val, const size_t chosen_bytes, const TargetInfo &agg_info, Function *ir_reduce_one_entry)
#define CHECK_EQ(x, y)
Definition: Logger.h:219
CgenState * cgen_state
void reduceOneSlot(Value *this_ptr1, Value *this_ptr2, Value *that_ptr1, Value *that_ptr2, const TargetInfo &target_info, const size_t target_logical_idx, const size_t target_slot_idx, const size_t init_agg_val_idx, const size_t first_slot_idx_for_target, Function *ir_reduce_one_entry) const
bool is_aggregate_query(const QueryDescriptionType hash_type)
void count_distinct_set_union(const int64_t new_set_handle, const int64_t old_set_handle, const CountDistinctDescriptor &new_count_distinct_desc, const CountDistinctDescriptor &old_count_distinct_desc)
__device__ bool dynamic_watchdog()
#define EMPTY_KEY_64
const std::string & label() const
RUNTIME_EXPORT uint8_t check_watchdog_rt(const size_t sample_seed)
void reduceOneEntryNoCollisions(const ReductionCode &reduction_code) const
int64_t getTargetGroupbyIndex(const size_t target_idx) const
void varlen_buffer_sample(int8_t *this_ptr1, int8_t *this_ptr2, const int8_t *that_ptr1, const int8_t *that_ptr2, const int64_t init_val)
std::unique_ptr< Function > ir_reduce_loop
Value * emit_read_int_from_buff(Value *ptr, const int8_t compact_sz, Function *function)
void reduceOneEntryBaselineIdx(const ReductionCode &reduction_code) const
SQLTypeInfo sql_type
Definition: TargetInfo.h:51
#define LOG(tag)
Definition: Logger.h:205
void mark_function_always_inline(llvm::Function *func)
void reduceLoop(const ReductionCode &reduction_code) const
bool is_varlen() const
Definition: sqltypes.h:545
string name
Definition: setup.in.py:72
std::string join(T const &container, std::string const &delim)
llvm::Function * llvm_reduce_loop
void reduceOneEntryNoCollisionsIdx(const ReductionCode &reduction_code) const
#define CHECK_GE(x, y)
Definition: Logger.h:224
std::vector< std::string > CodeCacheKey
Definition: CodeCache.h:25
size_t get_slot_off_quad(const QueryMemoryDescriptor &query_mem_desc)
std::string cacheKey() const
size_t getEffectiveKeyWidth() const
std::unique_ptr< Function > ir_reduce_one_entry
bool g_enable_dynamic_watchdog
Definition: Execute.cpp:78
static ExecutionEngineWrapper generateNativeCPUCode(llvm::Function *func, const std::unordered_set< llvm::Function * > &live_funcs, const CompilationOptions &co)
const std::vector< int64_t > target_init_vals_
void reduceOneAggregateSlot(Value *this_ptr1, Value *this_ptr2, Value *that_ptr1, Value *that_ptr2, const TargetInfo &target_info, const size_t target_logical_idx, const size_t target_slot_idx, const int64_t init_val, const int8_t chosen_bytes, Function *ir_reduce_one_entry) const
bool takes_float_argument(const TargetInfo &target_info)
Definition: TargetInfo.h:157
bool g_enable_non_kernel_time_query_interrupt
Definition: Execute.cpp:121
Value * add(Args &&...args)
bool skip_null_val
Definition: TargetInfo.h:53
const Value * emit_checked_write_projection(Value *slot_pi8, Value *other_pi8, const int64_t init_val, const size_t chosen_bytes, Function *ir_reduce_one_entry)
std::unique_ptr< Function > setup_reduce_one_entry_idx(ReductionCode *reduction_code)
int32_t(*)(int8_t *this_buff, const int8_t *that_buff, const int32_t start_entry_index, const int32_t end_entry_index, const int32_t that_entry_count, const void *this_qmd, const void *that_qmd, const void *serialized_varlen_buffer) FuncPtr
const QueryMemoryDescriptor query_mem_desc_
std::unique_ptr< Function > ir_is_empty
std::string to_string(char const *&&v)
SQLTypeInfo agg_arg_type
Definition: TargetInfo.h:52
void translate_function(const Function *function, llvm::Function *llvm_function, const ReductionCode &reduction_code, const std::unordered_map< const Function *, llvm::Function * > &f)
void emit_aggregate_one_value(const std::string &agg_kind, Value *val_ptr, Value *other_ptr, const size_t chosen_bytes, const TargetInfo &agg_info, Function *ir_reduce_one_entry)
RUNTIME_EXPORT void serialized_varlen_buffer_sample(const void *serialized_varlen_buffer_handle, int8_t *this_ptr1, int8_t *this_ptr2, const int8_t *that_ptr1, const int8_t *that_ptr2, const int64_t init_val, const int64_t length_to_elems)
Value * emit_load_i32(Value *ptr, Function *function)
Definition: sqldefs.h:73
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:388
const SQLTypeInfo get_compact_type(const TargetInfo &target)
__device__ bool check_interrupt()
int8_t get_width_for_slot(const size_t target_slot_idx, const bool float_argument_input, const QueryMemoryDescriptor &query_mem_desc)
llvm::Module * module_
Definition: CgenState.h:343
llvm::LLVMContext & context_
Definition: CgenState.h:352
size_t get_byteoff_of_slot(const size_t slot_idx, const QueryMemoryDescriptor &query_mem_desc)
bool is_agg
Definition: TargetInfo.h:49
size_t advance_slot(const size_t j, const TargetInfo &target_info, const bool separate_varlen_storage)
void reduceOneCountDistinctSlot(Value *this_ptr1, Value *that_ptr1, const size_t target_logical_idx, Function *ir_reduce_one_entry) const
s_code_cache(code_cache_size)
uint8_t check_interrupt_rt(const size_t sample_seed)
size_t getGroupbyColCount() const
size_t targetGroupbyIndicesSize() const
void generate_loop_body(For *for_loop, Function *ir_reduce_loop, Function *ir_reduce_one_entry_idx, Value *this_buff, Value *that_buff, Value *start_index, Value *that_entry_count, Value *this_qmd_handle, Value *that_qmd_handle, Value *serialized_varlen_buffer)
void emit_write_projection(Value *slot_pi8, Value *other_pi8, const int64_t init_val, const size_t chosen_bytes, Function *ir_reduce_one_entry)
Definition: sqldefs.h:75
ReductionCode codegen() const override
bool is_distinct_target(const TargetInfo &target_info)
Definition: TargetInfo.h:153
std::string target_info_key(const TargetInfo &target_info)
OUTPUT transform(INPUT const &input, FUNC const &func)
Definition: misc.h:290
std::unique_ptr< Function > ir_reduce_one_entry_idx
const int8_t getPaddedSlotWidthBytes(const size_t slot_idx) const
ReductionCode setup_functions_ir(const QueryDescriptionType hash_type)
DEVICE void allocate()
Definition: quantile.h:590
#define AUTOMATIC_IR_METADATA(CGENSTATE)
std::unique_ptr< Function > setup_is_empty_entry(ReductionCode *reduction_code)
SQLAgg agg_kind
Definition: TargetInfo.h:50
size_t getCountDistinctDescriptorsSize() const
void reduceOneEntryTargetsNoCollisions(Function *ir_reduce_one_entry, Value *this_targets_start_ptr, Value *that_targets_start_ptr) const
QueryDescriptionType getQueryDescriptionType() const
#define AUTOMATIC_IR_METADATA_DONE()
#define UNLIKELY(x)
Definition: likely.h:25
void set_module_shallow_copy(const std::unique_ptr< llvm::Module > &module, bool always_clone=false)
Definition: CgenState.cpp:368
#define RUNTIME_EXPORT
llvm::Type * llvm_type(const Type type, llvm::LLVMContext &ctx)
virtual ReductionCode codegen() const
const CountDistinctDescriptor & getCountDistinctDescriptor(const size_t idx) const
llvm::Module * module
#define CHECK_LT(x, y)
Definition: Logger.h:221
std::string serialize_llvm_object(const T *llvm_obj)
size_t get_row_bytes(const QueryMemoryDescriptor &query_mem_desc)
RUNTIME_EXPORT void count_distinct_set_union_jit_rt(const int64_t new_set_handle, const int64_t old_set_handle, const void *that_qmd_handle, const void *this_qmd_handle, const int64_t target_logical_idx)
static void addCodeToCache(const CodeCacheKey &, std::shared_ptr< CC >, llvm::Module *, CodeCache< CC > &)
void finalizeReductionCode(ReductionCode &reduction_code, const llvm::Function *ir_is_empty, const llvm::Function *ir_reduce_one_entry, const llvm::Function *ir_reduce_one_entry_idx, const CodeCacheKey &key) const
Definition: sqldefs.h:76
std::unique_ptr< Function > create_function(const std::string name, const std::vector< Function::NamedArg > &arg_types, const Type ret_type, const bool always_inline)
std::string get_type_name() const
Definition: sqltypes.h:442
const Value * iter() const
RUNTIME_EXPORT void approx_quantile_jit_rt(const int64_t new_set_handle, const int64_t old_set_handle, const void *that_qmd_handle, const void *this_qmd_handle, const int64_t target_logical_idx)
RUNTIME_EXPORT void get_group_value_reduction_rt(int8_t *groups_buffer, const int8_t *key, const uint32_t key_count, const void *this_qmd_handle, const int8_t *that_buff, const uint32_t that_entry_idx, const uint32_t that_entry_count, const uint32_t row_size_bytes, int64_t **buff_out, uint8_t *empty)
std::unique_ptr< Function > setup_reduce_one_entry(ReductionCode *reduction_code, const QueryDescriptionType hash_type)
void isEmpty(const ReductionCode &reduction_code) const
Value * emit_load_i64(Value *ptr, Function *function)
const ColSlotContext & getColSlotContext() const
#define CHECK(condition)
Definition: Logger.h:211
bool is_geometry() const
Definition: sqltypes.h:531
#define EMPTY_KEY_32
void reduceOneEntryBaseline(const ReductionCode &reduction_code) const
QueryDescriptionType
Definition: Types.h:26
char * f
Value * emit_load(Value *ptr, Type ptr_type, Function *function)
bool is_string() const
Definition: sqltypes.h:519
llvm::Function * create_llvm_function(const Function *function, CgenState *cgen_state)
ResultSetReductionJIT(const QueryMemoryDescriptor &query_mem_desc, const std::vector< TargetInfo > &targets, const std::vector< int64_t > &target_init_vals, const size_t executor_id)
bool is_distinct
Definition: TargetInfo.h:54
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
void emit_aggregate_one_count(Value *val_ptr, Value *other_ptr, const size_t chosen_bytes, Function *ir_reduce_one_entry)
Definition: sqldefs.h:74
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:861
Definition: sqldefs.h:72
void reduceOneApproxQuantileSlot(Value *this_ptr1, Value *that_ptr1, const size_t target_logical_idx, Function *ir_reduce_one_entry) const
std::unique_ptr< Function > setup_reduce_loop(ReductionCode *reduction_code)
size_t get_key_bytes_rowwise(const QueryMemoryDescriptor &query_mem_desc)
FORCE_INLINE HOST DEVICE T align_to_int64(T addr)
std::string reductionKey() const
const Executor * getExecutor() const
int32_t getTargetIdxForKey() const
const std::vector< TargetInfo > targets_