#include <llvm/ExecutionEngine/GenericValue.h>
bool use_multithreaded_reduction(const size_t entry_count) {
  return entry_count > 100000;
}
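// Reduction over very small buffers is not worth the thread start-up cost, so the
// multi-threaded path is only taken above the 100000-entry threshold checked above.
// A minimal sketch of how a caller might pick the thread count (hypothetical snippet,
// not part of this file):
//
//   const size_t thread_count =
//       use_multithreaded_reduction(query_mem_desc_.getEntryCount()) ? cpu_threads() : 1;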
std::vector<int64_t> make_key(const int64_t* buff,
                              const size_t entry_count,
                              const size_t key_count) {
  std::vector<int64_t> key;
  size_t off = 0;
  for (size_t i = 0; i < key_count; ++i) {
    key.push_back(buff[off]);
    off += entry_count;
  }
  return key;
}
void fill_slots(int64_t* dst_entry,
                const size_t dst_entry_count,
                const int64_t* src_buff,
                const size_t src_entry_idx,
                const size_t src_entry_count,
                const QueryMemoryDescriptor& query_mem_desc) {
  const auto slot_count = query_mem_desc.getBufferColSlotCount();
  const auto key_count = query_mem_desc.getGroupbyColCount();
  if (query_mem_desc.didOutputColumnar()) {
    for (size_t i = 0, dst_slot_off = 0; i < slot_count;
         ++i, dst_slot_off += dst_entry_count) {
      dst_entry[dst_slot_off] =
          src_buff[slot_offset_colwise(src_entry_idx, i, key_count, src_entry_count)];
    }
  } else {
    const auto row_ptr = src_buff + get_row_qw_count(query_mem_desc) * src_entry_idx;
    const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
    for (size_t i = 0; i < slot_count; ++i) {
      dst_entry[i] = row_ptr[slot_off_quad + i];
    }
  }
}
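// The helpers used above (slot_offset_colwise, get_row_qw_count, get_slot_off_quad) are
// the buffer-accessor utilities shared by the result set code. A sketch of the intended
// offset arithmetic, assuming the usual key-first layouts:
//
//   columnar:  (key_count + slot_idx) * entry_count + entry_idx
//   row-wise:  entry_idx * row_qw_count + slot_off_quad + slot_idx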
ALWAYS_INLINE void fill_empty_key_32(int32_t* key_ptr_i32, const size_t key_count) {
  for (size_t i = 0; i < key_count; ++i) {
    key_ptr_i32[i] = EMPTY_KEY_32;
  }
}

ALWAYS_INLINE void fill_empty_key_64(int64_t* key_ptr_i64, const size_t key_count) {
  for (size_t i = 0; i < key_count; ++i) {
    key_ptr_i64[i] = EMPTY_KEY_64;
  }
}

inline int64_t get_component(const int8_t* group_by_buffer,
                             const size_t comp_sz,
                             const size_t index = 0) {
  int64_t ret = std::numeric_limits<int64_t>::min();
  switch (comp_sz) {
    case 1: {
      ret = group_by_buffer[index];
      break;
    }
    case 2: {
      const int16_t* buffer_ptr = reinterpret_cast<const int16_t*>(group_by_buffer);
      ret = buffer_ptr[index];
      break;
    }
    case 4: {
      const int32_t* buffer_ptr = reinterpret_cast<const int32_t*>(group_by_buffer);
      ret = buffer_ptr[index];
      break;
    }
    case 8: {
      const int64_t* buffer_ptr = reinterpret_cast<const int64_t*>(group_by_buffer);
      ret = buffer_ptr[index];
      break;
    }
    default:
      CHECK(false);
  }
  return ret;
}
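// Example: reading the i-th component of a packed group-by buffer whose components are
// 4 bytes wide (a sketch; `buffer` and `i` are placeholders):
//
//   const int64_t comp = get_component(buffer, /*comp_sz=*/sizeof(int32_t), /*index=*/i);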
void run_reduction_code(const size_t executor_id,
                        const ReductionCode& reduction_code,
                        int8_t* this_buff,
                        const int8_t* that_buff,
                        const int32_t start_entry_index,
                        const int32_t end_entry_index,
                        const int32_t that_entry_count,
                        const void* this_qmd,
                        const void* that_qmd,
                        const void* serialized_varlen_buffer) {
  int err = 0;
  if (reduction_code.func_ptr) {
    err = reduction_code.func_ptr(this_buff, that_buff, start_entry_index,
                                  end_entry_index, that_entry_count, this_qmd, that_qmd,
                                  serialized_varlen_buffer);
  }
  if (err) {
    if (err == Executor::ERR_SINGLE_VALUE_FOUND_MULTIPLE_VALUES) {
      throw std::runtime_error("Multiple distinct values encountered");
    }
    if (err == Executor::ERR_INTERRUPTED) {
      throw std::runtime_error(
          "Query execution has been interrupted during result set reduction");
    }
    throw std::runtime_error(
        "Query execution has exceeded the time limit or was interrupted during result "
        "set reduction");
  }
}
void fill_empty_key(void* key_ptr, const size_t key_count, const size_t key_width) {
  switch (key_width) {
    case 4: {
      auto key_ptr_i32 = reinterpret_cast<int32_t*>(key_ptr);
      fill_empty_key_32(key_ptr_i32, key_count);
      break;
    }
    case 8: {
      auto key_ptr_i64 = reinterpret_cast<int64_t*>(key_ptr);
      fill_empty_key_64(key_ptr_i64, key_count);
      break;
    }
    default:
      CHECK(false);
  }
}
void ResultSetStorage::reduce(const ResultSetStorage& that,
                              const std::vector<std::string>& serialized_varlen_buffer,
                              const ReductionCode& reduction_code,
                              const size_t executor_id) const {
  const auto entry_count = query_mem_desc_.getEntryCount();
  const auto that_entry_count = that.query_mem_desc_.getEntryCount();
  switch (query_mem_desc_.getQueryDescriptionType()) {
    case QueryDescriptionType::GroupByBaselineHash:
      CHECK_GE(entry_count, that_entry_count);
      break;
    default:
      CHECK_EQ(entry_count, that_entry_count);
  }
  auto this_buff = buff_;
  CHECK(this_buff);
  auto that_buff = that.buff_;
  CHECK(that_buff);
  if (!serialized_varlen_buffer.empty()) {
    throw std::runtime_error(
        "Projection of variable length targets with baseline hash group by is not yet "
        "supported in Distributed mode");
  }
  if (query_mem_desc_.getQueryDescriptionType() ==
      QueryDescriptionType::GroupByBaselineHash) {
    if (use_multithreaded_reduction(that_entry_count)) {
      const size_t thread_count = cpu_threads();
      std::vector<std::future<void>> reduction_threads;
      for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
        const auto thread_entry_count =
            (that_entry_count + thread_count - 1) / thread_count;
        const auto start_index = thread_idx * thread_entry_count;
        const auto end_index =
            std::min(start_index + thread_entry_count, that_entry_count);
        reduction_threads.emplace_back(
            std::async(std::launch::async, [&, start_index, end_index] {
              for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
                reduceOneEntryBaseline(
                    this_buff, that_buff, entry_idx, that_entry_count, that);
              }
            }));
      }
      for (auto& reduction_thread : reduction_threads) {
        reduction_thread.wait();
      }
      for (auto& reduction_thread : reduction_threads) {
        reduction_thread.get();
      }
    } else {
      for (size_t i = 0; i < that_entry_count; ++i) {
        reduceOneEntryBaseline(this_buff, that_buff, i, that_entry_count, that);
      }
    }
    return;
  }
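  // The per-thread range is the ceiling of that_entry_count / thread_count, so every
  // entry is covered exactly once. For example, 10 entries across 4 threads gives
  // thread_entry_count = (10 + 4 - 1) / 4 = 3 and the ranges [0,3), [3,6), [6,9),
  // [9,10), the last one clamped by std::min.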
  if (use_multithreaded_reduction(entry_count)) {
    const size_t thread_count = cpu_threads();
    std::vector<std::future<void>> reduction_threads;
    for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
      const auto thread_entry_count = (entry_count + thread_count - 1) / thread_count;
      const auto start_index = thread_idx * thread_entry_count;
      const auto end_index = std::min(start_index + thread_entry_count, entry_count);
      if (query_mem_desc_.didOutputColumnar()) {
        reduction_threads.emplace_back(std::async(
            std::launch::async,
            [this, this_buff, that_buff, start_index, end_index, executor_id, &that,
             &serialized_varlen_buffer] {
              reduceEntriesNoCollisionsColWise(this_buff, that_buff, that, start_index,
                                               end_index, serialized_varlen_buffer,
                                               executor_id);
            }));
      } else {
        reduction_threads.emplace_back(std::async(
            std::launch::async,
            [this, this_buff, that_buff, start_index, end_index, that_entry_count,
             executor_id, &that, &reduction_code, &serialized_varlen_buffer] {
              CHECK(reduction_code.ir_reduce_loop);
              run_reduction_code(executor_id, reduction_code, this_buff, that_buff,
                                 start_index, end_index, that_entry_count,
                                 &query_mem_desc_, &that.query_mem_desc_,
                                 &serialized_varlen_buffer);
            }));
      }
    }
    for (auto& reduction_thread : reduction_threads) {
      reduction_thread.wait();
    }
    for (auto& reduction_thread : reduction_threads) {
      reduction_thread.get();
    }
  } else if (query_mem_desc_.didOutputColumnar()) {
    reduceEntriesNoCollisionsColWise(this_buff,
                                     that_buff,
                                     that,
                                     0,
                                     query_mem_desc_.getEntryCount(),
                                     serialized_varlen_buffer,
                                     executor_id);
  } else {
    CHECK(reduction_code.ir_reduce_loop);
    run_reduction_code(executor_id, reduction_code, this_buff, that_buff, 0, entry_count,
                       that_entry_count, &query_mem_desc_, &that.query_mem_desc_,
                       &serialized_varlen_buffer);
  }
}

ALWAYS_INLINE void check_watchdog() {
  if (UNLIKELY(g_enable_dynamic_watchdog && dynamic_watchdog())) {
    throw std::runtime_error(
        "Query execution has exceeded the time limit or was interrupted during result "
        "set reduction");
  }
}

ALWAYS_INLINE void check_watchdog_with_seed(const size_t sample_seed) {
  if (UNLIKELY(g_enable_dynamic_watchdog && (sample_seed & 0x3F) == 0 &&
               dynamic_watchdog())) {
    throw std::runtime_error(
        "Query execution has exceeded the time limit or was interrupted during result "
        "set reduction");
  }
}
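// Reduction strategy recap: baseline-hash results are merged entry by entry through the
// hash table (reduceOneEntryBaseline), perfect-hash columnar results are merged column
// by column with no collision handling (reduceEntriesNoCollisionsColWise below), and
// everything else goes through the JIT-generated ir_reduce_loop.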
void ResultSetStorage::reduceEntriesNoCollisionsColWise(
    int8_t* this_buff,
    const int8_t* that_buff,
    const ResultSetStorage& that,
    const size_t start_index,
    const size_t end_index,
    const std::vector<std::string>& serialized_varlen_buffer,
    const size_t executor_id) const {
  // Serialized varlen buffers are only produced for the distributed projection path,
  // which never takes this columnar no-collision route.
  CHECK(serialized_varlen_buffer.empty());

  const auto& col_slot_context = query_mem_desc_.getColSlotContext();
  auto this_crt_col_ptr = get_cols_ptr(this_buff, query_mem_desc_);
  auto that_crt_col_ptr = get_cols_ptr(that_buff, query_mem_desc_);
  const auto executor = Executor::getExecutor(executor_id);
  CHECK(executor);
  for (size_t target_idx = 0; target_idx < targets_.size(); ++target_idx) {
    const auto& agg_info = targets_[target_idx];
    const auto& slots_for_col = col_slot_context.getSlotsForCol(target_idx);

    bool two_slot_target{false};
    if (agg_info.is_agg &&
        (agg_info.agg_kind == kAVG ||
         (agg_info.agg_kind == kSAMPLE && agg_info.sql_type.is_varlen()))) {
      // AVG uses two slots (sum and count); SAMPLE on a varlen type uses a
      // pointer/length slot pair, so slot pairs are consumed together below.
      two_slot_target = true;
    }
    if (UNLIKELY(g_enable_non_kernel_time_query_interrupt &&
                 executor->checkNonKernelTimeInterrupted())) {
      throw std::runtime_error(
          "Query execution was interrupted during result set reduction");
    }
    for (size_t target_slot_idx = slots_for_col.front();
         target_slot_idx < slots_for_col.back() + 1;
         target_slot_idx += 2) {
      const auto this_next_col_ptr = advance_to_next_columnar_target_buff(
          this_crt_col_ptr, query_mem_desc_, target_slot_idx);
      const auto that_next_col_ptr = advance_to_next_columnar_target_buff(
          that_crt_col_ptr, query_mem_desc_, target_slot_idx);
      const auto slot_width = query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx);
      for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
        if (isEmptyEntryColumnar(entry_idx, that_buff)) {
          continue;
        }
        auto this_ptr1 = this_crt_col_ptr + entry_idx * slot_width;
        auto that_ptr1 = that_crt_col_ptr + entry_idx * slot_width;
        int8_t* this_ptr2{nullptr};
        const int8_t* that_ptr2{nullptr};
        if (UNLIKELY(two_slot_target)) {
          const auto second_slot_width =
              query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx + 1);
          this_ptr2 = this_next_col_ptr + entry_idx * second_slot_width;
          that_ptr2 = that_next_col_ptr + entry_idx * second_slot_width;
        }
        reduceOneSlot(this_ptr1,
                      this_ptr2,
                      that_ptr1,
                      that_ptr2,
                      agg_info,
                      target_idx,
                      target_slot_idx,
                      target_slot_idx,
                      that,
                      slots_for_col.front(),
                      serialized_varlen_buffer);
      }
      this_crt_col_ptr = this_next_col_ptr;
      that_crt_col_ptr = that_next_col_ptr;
      if (UNLIKELY(two_slot_target)) {
        this_crt_col_ptr = advance_to_next_columnar_target_buff(
            this_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
        that_crt_col_ptr = advance_to_next_columnar_target_buff(
            that_crt_col_ptr, query_mem_desc_, target_slot_idx + 1);
      }
    }
  }
}
void ResultSetStorage::copyKeyColWise(const size_t entry_idx,
                                      int8_t* this_buff,
                                      const int8_t* that_buff) const {
  CHECK(query_mem_desc_.didOutputColumnar());
  for (size_t group_idx = 0; group_idx < query_mem_desc_.getGroupbyColCount();
       ++group_idx) {
    const auto column_offset_bytes =
        query_mem_desc_.getPrependedGroupColOffInBytes(group_idx);
    auto lhs_key_ptr = this_buff + column_offset_bytes;
    auto rhs_key_ptr = that_buff + column_offset_bytes;
    switch (query_mem_desc_.groupColWidth(group_idx)) {
      case 8:
        *(reinterpret_cast<int64_t*>(lhs_key_ptr) + entry_idx) =
            *(reinterpret_cast<const int64_t*>(rhs_key_ptr) + entry_idx);
        break;
      case 4:
        *(reinterpret_cast<int32_t*>(lhs_key_ptr) + entry_idx) =
            *(reinterpret_cast<const int32_t*>(rhs_key_ptr) + entry_idx);
        break;
      case 2:
        *(reinterpret_cast<int16_t*>(lhs_key_ptr) + entry_idx) =
            *(reinterpret_cast<const int16_t*>(rhs_key_ptr) + entry_idx);
        break;
      case 1:
        *(reinterpret_cast<int8_t*>(lhs_key_ptr) + entry_idx) =
            *(reinterpret_cast<const int8_t*>(rhs_key_ptr) + entry_idx);
        break;
      default:
        CHECK(false);
    }
  }
}
void ResultSetStorage::rewriteAggregateBufferOffsets(
    const std::vector<std::string>& serialized_varlen_buffer) const {
  if (serialized_varlen_buffer.empty()) {
    return;
  }

  CHECK(!query_mem_desc_.didOutputColumnar());
  const auto entry_count = query_mem_desc_.getEntryCount();
  CHECK_GT(entry_count, size_t(0));
  CHECK(buff_);

  // Row-wise iteration over all entries, rewriting the serialized offsets stored for
  // varlen SAMPLE targets into real pointer/length pairs.
  const auto key_bytes_with_padding =
      align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
  for (size_t i = 0; i < entry_count; ++i) {
    if (isEmptyEntry(i, buff_)) {
      continue;
    }
    auto rowwise_targets_ptr =
        row_ptr_rowwise(buff_, query_mem_desc_, i) + key_bytes_with_padding;
    size_t target_slot_idx = 0;
    for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
         ++target_logical_idx) {
      const auto& target_info = targets_[target_logical_idx];
      if (target_info.sql_type.is_varlen() && target_info.is_agg) {
        CHECK(target_info.agg_kind == kSAMPLE);
        auto ptr1 = rowwise_targets_ptr;
        auto slot_idx = target_slot_idx;
        auto ptr2 = ptr1 + query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
        auto offset = *reinterpret_cast<const int64_t*>(ptr1);

        const auto& elem_ti = target_info.sql_type.get_elem_type();
        size_t length_to_elems =
            target_info.sql_type.is_string() || target_info.sql_type.is_geometry()
                ? 1
                : elem_ti.get_size();
        if (target_info.sql_type.is_geometry()) {
          for (int j = 0; j < target_info.sql_type.get_physical_coord_cols(); j++) {
            CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
            const auto& varlen_bytes_str = serialized_varlen_buffer[offset++];
            const auto str_ptr =
                reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
            *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
            *reinterpret_cast<int64_t*>(ptr2) =
                static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
          }
        } else {
          CHECK_LT(static_cast<size_t>(offset), serialized_varlen_buffer.size());
          const auto& varlen_bytes_str = serialized_varlen_buffer[offset];
          const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
          *reinterpret_cast<int64_t*>(ptr1) = reinterpret_cast<const int64_t>(str_ptr);
          *reinterpret_cast<int64_t*>(ptr2) =
              static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
        }
      }

      rowwise_targets_ptr = advance_target_ptr_row_wise(
          rowwise_targets_ptr, target_info, target_slot_idx, query_mem_desc_, false);
      target_slot_idx = advance_slot(target_slot_idx, target_info, false);
    }
  }
}
#ifdef _WIN32
#define mapd_cas(address, compare, val)                                 \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(address), \
                             static_cast<long>(val),                    \
                             static_cast<long>(compare))
#else
#define mapd_cas(address, compare, val) __sync_val_compare_and_swap(address, compare, val)
#endif
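// Both definitions have the same compare-and-swap shape: atomically replace *address
// with val if it still equals compare, and return the previous value. A sketch of the
// claim-an-empty-bucket idiom this enables (EMPTY_KEY_64 is the sentinel for an unused
// key slot):
//
//   const auto old = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
//   const bool claimed_by_us = (old == EMPTY_KEY_64);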
GroupValueInfo get_matching_group_value_columnar_reduction(int64_t* groups_buffer,
                                                           const uint32_t h,
                                                           const int64_t* key,
                                                           const uint32_t key_qw_count,
                                                           const size_t entry_count) {
  auto off = h;
  const auto old_key = mapd_cas(&groups_buffer[off], EMPTY_KEY_64, *key);
  if (old_key == EMPTY_KEY_64) {
    for (size_t i = 0; i < key_qw_count; ++i) {
      groups_buffer[off] = key[i];
      off += entry_count;
    }
    return {&groups_buffer[off], true};
  }
  off = h;
  for (size_t i = 0; i < key_qw_count; ++i) {
    if (groups_buffer[off] != key[i]) {
      return {nullptr, true};
    }
    off += entry_count;
  }
  return {&groups_buffer[off], false};
}
GroupValueInfo get_group_value_columnar_reduction(int64_t* groups_buffer,
                                                  const uint32_t groups_buffer_entry_count,
                                                  const int64_t* key,
                                                  const uint32_t key_qw_count) {
  uint32_t h = key_hash(key, key_qw_count, sizeof(int64_t)) % groups_buffer_entry_count;
  auto matching_gvi = get_matching_group_value_columnar_reduction(
      groups_buffer, h, key, key_qw_count, groups_buffer_entry_count);
  if (matching_gvi.first) {
    return matching_gvi;
  }
  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
  while (h_probe != h) {
    matching_gvi = get_matching_group_value_columnar_reduction(
        groups_buffer, h_probe, key, key_qw_count, groups_buffer_entry_count);
    if (matching_gvi.first) {
      return matching_gvi;
    }
    h_probe = (h_probe + 1) % groups_buffer_entry_count;
  }
  return {nullptr, true};
}
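// The reduction-side hash lookup above is plain linear probing: start at
// key_hash(key) % entry_count and walk forward one bucket at a time (wrapping at the
// end) until the key is found, an empty bucket is claimed, or the probe returns to the
// starting bucket, in which case the table is full and {nullptr, true} is returned.
// Worked example, assuming an 8-bucket table and a key hashing to bucket 6:
//
//   probe order: 6, 7, 0, 1, 2, 3, 4, 5   (stops early on a match or an empty bucket)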
#ifdef _WIN32
#define cas_cst(ptr, expected, desired)                                      \
  (InterlockedCompareExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                                     reinterpret_cast<void*>(&desired),      \
                                     expected) == expected)
#define store_cst(ptr, val)                                          \
  InterlockedExchangePointer(reinterpret_cast<void* volatile*>(ptr), \
                             reinterpret_cast<void*>(val))
#define load_cst(ptr) \
  InterlockedCompareExchange(reinterpret_cast<volatile long*>(ptr), 0, 0)
#else
#define cas_cst(ptr, expected, desired) \
  __atomic_compare_exchange_n(          \
      ptr, expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define store_cst(ptr, val) __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
#define load_cst(ptr) __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
#endif
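// These wrappers give the reduction code sequentially-consistent CAS/store/load on both
// MSVC and GCC/Clang. A sketch of the pattern they support below (claiming a row whose
// first key slot is still the empty-key sentinel):
//
//   T empty_key = get_empty_key<T>();
//   if (cas_cst(row_ptr, &empty_key, write_pending)) {
//     // we own the row: fill the slots, then publish the real key with store_cst
//   }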
template <typename T = int64_t>
GroupValueInfo get_matching_group_value_reduction(
    int64_t* groups_buffer,
    const uint32_t h,
    const T* key,
    const uint32_t key_count,
    const QueryMemoryDescriptor& query_mem_desc,
    const int64_t* that_buff_i64,
    const size_t that_entry_idx,
    const size_t that_entry_count,
    const uint32_t row_size_quad) {
  auto off = h * row_size_quad;
  T empty_key = get_empty_key<T>();
  T write_pending = get_empty_key<T>() - 1;
  auto row_ptr = reinterpret_cast<T*>(groups_buffer + off);
  const auto slot_off_quad = get_slot_off_quad(query_mem_desc);
  const bool success = cas_cst(row_ptr, &empty_key, write_pending);
  if (success) {
    fill_slots(groups_buffer + off + slot_off_quad,
               query_mem_desc.getEntryCount(),
               that_buff_i64,
               that_entry_idx,
               that_entry_count,
               query_mem_desc);
    if (key_count > 1) {
      memcpy(row_ptr + 1, key + 1, (key_count - 1) * sizeof(T));
    }
    store_cst(row_ptr, *key);
    return {groups_buffer + off + slot_off_quad, true};
  }
  while (load_cst(row_ptr) == write_pending) {
    // Spin until the winning thread has finished writing the entry.
  }
  for (size_t i = 0; i < key_count; ++i) {
    if (load_cst(row_ptr + i) != key[i]) {
      return {nullptr, true};
    }
  }
  return {groups_buffer + off + slot_off_quad, false};
}
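// Claim protocol for a row-wise entry: the first key slot doubles as the lock word.
// get_empty_key<T>() marks a free row and get_empty_key<T>() - 1 marks "write pending".
// The winning thread CASes empty -> pending, copies the payload slots and the rest of
// the key, then publishes the real first key component with store_cst; losing threads
// spin on load_cst until the pending marker disappears and then compare the full key.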
inline GroupValueInfo get_matching_group_value_reduction(
    int64_t* groups_buffer,
    const uint32_t h,
    const int64_t* key,
    const uint32_t key_count,
    const size_t key_width,
    const QueryMemoryDescriptor& query_mem_desc,
    const int64_t* that_buff_i64,
    const size_t that_entry_idx,
    const size_t that_entry_count,
    const uint32_t row_size_quad) {
  switch (key_width) {
    case 4:
      return get_matching_group_value_reduction(
          groups_buffer, h, reinterpret_cast<const int32_t*>(key), key_count,
          query_mem_desc, that_buff_i64, that_entry_idx, that_entry_count, row_size_quad);
    case 8:
      return get_matching_group_value_reduction(
          groups_buffer, h, key, key_count, query_mem_desc, that_buff_i64,
          that_entry_idx, that_entry_count, row_size_quad);
    default:
      CHECK(false);
      return {nullptr, true};
  }
}
GroupValueInfo get_group_value_reduction(int64_t* groups_buffer,
                                         const uint32_t groups_buffer_entry_count,
                                         const int64_t* key,
                                         const uint32_t key_count,
                                         const size_t key_width,
                                         const QueryMemoryDescriptor& query_mem_desc,
                                         const int64_t* that_buff_i64,
                                         const size_t that_entry_idx,
                                         const size_t that_entry_count,
                                         const uint32_t row_size_quad) {
  uint32_t h = key_hash(key, key_count, key_width) % groups_buffer_entry_count;
  auto matching_gvi = get_matching_group_value_reduction(
      groups_buffer, h, key, key_count, key_width, query_mem_desc, that_buff_i64,
      that_entry_idx, that_entry_count, row_size_quad);
  if (matching_gvi.first) {
    return matching_gvi;
  }
  uint32_t h_probe = (h + 1) % groups_buffer_entry_count;
  while (h_probe != h) {
    matching_gvi = get_matching_group_value_reduction(
        groups_buffer, h_probe, key, key_count, key_width, query_mem_desc,
        that_buff_i64, that_entry_idx, that_entry_count, row_size_quad);
    if (matching_gvi.first) {
      return matching_gvi;
    }
    h_probe = (h_probe + 1) % groups_buffer_entry_count;
  }
  return {nullptr, true};
}
void ResultSetStorage::reduceOneEntryBaseline(int8_t* this_buff,
                                              const int8_t* that_buff,
                                              const size_t that_entry_idx,
                                              const size_t that_entry_count,
                                              const ResultSetStorage& that) const {
  check_watchdog_with_seed(that_entry_idx);
  const auto key_count = query_mem_desc_.getGroupbyColCount();
  const auto key_off = key_offset_colwise(that_entry_idx, 0, that_entry_count);
  if (isEmptyEntryColumnar(that_entry_idx, that_buff)) {
    return;
  }
  auto this_buff_i64 = reinterpret_cast<int64_t*>(this_buff);
  auto that_buff_i64 = reinterpret_cast<const int64_t*>(that_buff);
  const auto key = make_key(&that_buff_i64[key_off], that_entry_count, key_count);
  int64_t* this_entry_slots{nullptr};
  bool empty_entry = false;
  std::tie(this_entry_slots, empty_entry) = get_group_value_columnar_reduction(
      this_buff_i64, query_mem_desc_.getEntryCount(), &key[0], key_count);
  CHECK(this_entry_slots);
  if (empty_entry) {
    fill_slots(this_entry_slots, query_mem_desc_.getEntryCount(), that_buff_i64,
               that_entry_idx, that_entry_count, query_mem_desc_);
    return;
  }
  reduceOneEntrySlotsBaseline(
      this_entry_slots, that_buff_i64, that_entry_idx, that_entry_count, that);
}
void ResultSetStorage::reduceOneEntrySlotsBaseline(int64_t* this_entry_slots,
                                                   const int64_t* that_buff,
                                                   const size_t that_entry_idx,
                                                   const size_t that_entry_count,
                                                   const ResultSetStorage& that) const {
  CHECK(query_mem_desc_.didOutputColumnar());
  const auto key_count = query_mem_desc_.getGroupbyColCount();
  size_t j = 0;
  size_t init_agg_val_idx = 0;
  for (size_t target_logical_idx = 0; target_logical_idx < targets_.size();
       ++target_logical_idx) {
    const auto& target_info = targets_[target_logical_idx];
    const auto that_slot_off = slot_offset_colwise(
        that_entry_idx, init_agg_val_idx, key_count, that_entry_count);
    const auto this_slot_off = init_agg_val_idx * query_mem_desc_.getEntryCount();
    reduceOneSlotBaseline(this_entry_slots, this_slot_off, that_buff, that_entry_count,
                          that_slot_off, target_info, target_logical_idx, j,
                          init_agg_val_idx, that);
    if (query_mem_desc_.targetGroupbyIndicesSize() == 0) {
      init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
    } else if (query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) < 0) {
      init_agg_val_idx = advance_slot(init_agg_val_idx, target_info, false);
    }
    j = advance_slot(j, target_info, false);
  }
}
void ResultSetStorage::reduceOneSlotBaseline(int64_t* this_buff,
                                             const size_t this_slot,
                                             const int64_t* that_buff,
                                             const size_t that_entry_count,
                                             const size_t that_slot,
                                             const TargetInfo& target_info,
                                             const size_t target_logical_idx,
                                             const size_t target_slot_idx,
                                             const size_t init_agg_val_idx,
                                             const ResultSetStorage& that) const {
  int8_t* this_ptr2{nullptr};
  const int8_t* that_ptr2{nullptr};
  if (target_info.is_agg &&
      (target_info.agg_kind == kAVG ||
       (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()))) {
    const auto this_count_off = query_mem_desc_.getEntryCount();
    const auto that_count_off = that_entry_count;
    this_ptr2 = reinterpret_cast<int8_t*>(&this_buff[this_slot + this_count_off]);
    that_ptr2 = reinterpret_cast<const int8_t*>(&that_buff[that_slot + that_count_off]);
  }
  reduceOneSlot(reinterpret_cast<int8_t*>(&this_buff[this_slot]),
                this_ptr2,
                reinterpret_cast<const int8_t*>(&that_buff[that_slot]),
                that_ptr2,
                target_info,
                target_logical_idx,
                target_slot_idx,
                init_agg_val_idx,
                that,
                target_slot_idx,
                {});
}
template <class KeyType>
void ResultSetStorage::moveEntriesToBuffer(int8_t* new_buff,
                                           const size_t new_entry_count) const {
  CHECK(!query_mem_desc_.didOutputColumnar());
  CHECK_GT(new_entry_count, query_mem_desc_.getEntryCount());
  auto new_buff_i64 = reinterpret_cast<int64_t*>(new_buff);
  const auto key_count = query_mem_desc_.getGroupbyColCount();
  const auto src_buff = reinterpret_cast<const int64_t*>(buff_);
  const auto row_qw_count = get_row_qw_count(query_mem_desc_);
  const auto key_byte_width = query_mem_desc_.getEffectiveKeyWidth();

  if (use_multithreaded_reduction(query_mem_desc_.getEntryCount())) {
    const size_t thread_count = cpu_threads();
    std::vector<std::future<void>> move_threads;

    for (size_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
      const auto thread_entry_count =
          (query_mem_desc_.getEntryCount() + thread_count - 1) / thread_count;
      const auto start_index = thread_idx * thread_entry_count;
      const auto end_index =
          std::min(start_index + thread_entry_count, query_mem_desc_.getEntryCount());
      move_threads.emplace_back(std::async(
          std::launch::async,
          [this, src_buff, new_buff_i64, new_entry_count, start_index, end_index,
           key_count, row_qw_count, key_byte_width] {
            for (size_t entry_idx = start_index; entry_idx < end_index; ++entry_idx) {
              moveOneEntryToBuffer<KeyType>(entry_idx, new_buff_i64, new_entry_count,
                                            key_count, row_qw_count, src_buff,
                                            key_byte_width);
            }
          }));
    }
    for (auto& move_thread : move_threads) {
      move_thread.wait();
    }
    for (auto& move_thread : move_threads) {
      move_thread.get();
    }
  } else {
    for (size_t entry_idx = 0; entry_idx < query_mem_desc_.getEntryCount();
         ++entry_idx) {
      moveOneEntryToBuffer<KeyType>(entry_idx, new_buff_i64, new_entry_count, key_count,
                                    row_qw_count, src_buff, key_byte_width);
    }
  }
}
template <class KeyType>
void ResultSetStorage::moveOneEntryToBuffer(const size_t entry_index,
                                            int64_t* new_buff_i64,
                                            const size_t new_entry_count,
                                            const size_t key_count,
                                            const size_t row_qw_count,
                                            const int64_t* src_buff,
                                            const size_t key_byte_width) const {
  const auto key_off =
      query_mem_desc_.didOutputColumnar()
          ? key_offset_colwise(entry_index, 0, query_mem_desc_.getEntryCount())
          : row_qw_count * entry_index;
  const auto key_ptr = reinterpret_cast<const KeyType*>(&src_buff[key_off]);
  if (*key_ptr == get_empty_key<KeyType>()) {
    return;
  }
  int64_t* new_entries_ptr{nullptr};
  if (query_mem_desc_.didOutputColumnar()) {
    const auto key =
        make_key(&src_buff[key_off], query_mem_desc_.getEntryCount(), key_count);
    new_entries_ptr =
        get_group_value_columnar(new_buff_i64, new_entry_count, &key[0], key_count);
  } else {
    new_entries_ptr = get_group_value(new_buff_i64, new_entry_count, &src_buff[key_off],
                                      key_count, key_byte_width, row_qw_count);
  }
  CHECK(new_entries_ptr);
  fill_slots(new_entries_ptr, new_entry_count, src_buff, entry_index,
             query_mem_desc_.getEntryCount(), query_mem_desc_);
}
void ResultSet::initializeStorage() const {
  if (query_mem_desc_.didOutputColumnar()) {
    storage_->initializeColWise();
  } else {
    storage_->initializeRowWise();
  }
}
ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets,
                                    const size_t executor_id) {
  CHECK(!result_sets.empty());
  auto result_rs = result_sets.front();
  CHECK(result_rs->storage_);
  auto& first_result = *result_rs->storage_;
  auto result = &first_result;
  const auto row_set_mem_owner = result_rs->row_set_mem_owner_;
  for (const auto result_set : result_sets) {
    CHECK_EQ(row_set_mem_owner, result_set->row_set_mem_owner_);
  }
  const auto catalog = result_rs->catalog_;
  for (const auto result_set : result_sets) {
    CHECK_EQ(catalog, result_set->catalog_);
  }
  if (first_result.query_mem_desc_.getQueryDescriptionType() ==
      QueryDescriptionType::GroupByBaselineHash) {
    const auto total_entry_count =
        std::accumulate(result_sets.begin(),
                        result_sets.end(),
                        size_t(0),
                        [](const size_t init, const ResultSet* rs) {
                          return init + rs->query_mem_desc_.getEntryCount();
                        });
    CHECK(total_entry_count);
    auto query_mem_desc = first_result.query_mem_desc_;
    query_mem_desc.setEntryCount(total_entry_count);
    rs_.reset(new ResultSet(first_result.targets_,
                            ExecutorDeviceType::CPU,
                            query_mem_desc,
                            row_set_mem_owner,
                            catalog,
                            0,
                            0));
    auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
    rs_->initializeStorage();
    switch (query_mem_desc.getEffectiveKeyWidth()) {
      case 4:
        first_result.moveEntriesToBuffer<int32_t>(result_storage->getUnderlyingBuffer(),
                                                  query_mem_desc.getEntryCount());
        break;
      case 8:
        first_result.moveEntriesToBuffer<int64_t>(result_storage->getUnderlyingBuffer(),
                                                  query_mem_desc.getEntryCount());
        break;
      default:
        CHECK(false);
    }
    result = rs_->storage_.get();
    result_rs = rs_.get();
  }

  auto& serialized_varlen_buffer = result_sets.front()->serialized_varlen_buffer_;
  if (!serialized_varlen_buffer.empty()) {
    result->rewriteAggregateBufferOffsets(serialized_varlen_buffer.front());
    for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
         ++result_it) {
      auto& result_serialized_varlen_buffer = (*result_it)->serialized_varlen_buffer_;
      CHECK_EQ(result_serialized_varlen_buffer.size(), size_t(1));
      serialized_varlen_buffer.emplace_back(
          std::move(result_serialized_varlen_buffer.front()));
    }
  }

  ResultSetReductionJIT reduction_jit(result_rs->getQueryMemDesc(),
                                      result_rs->getTargetInfos(),
                                      result_rs->getTargetInitVals(),
                                      executor_id);
  const auto reduction_code = reduction_jit.codegen();
  size_t ctr = 1;
  for (auto result_it = result_sets.begin() + 1; result_it != result_sets.end();
       ++result_it) {
    if (!serialized_varlen_buffer.empty()) {
      result->reduce(*((*result_it)->storage_),
                     serialized_varlen_buffer[ctr++],
                     reduction_code,
                     executor_id);
    } else {
      result->reduce(*((*result_it)->storage_), {}, reduction_code, executor_id);
    }
  }
  return result_rs;
}

void ResultSetManager::rewriteVarlenAggregates(ResultSet* result_rs) {
  auto& result_storage = result_rs->storage_;
  result_storage->rewriteAggregateBufferOffsets(
      result_rs->serialized_varlen_buffer_.front());
}
void ResultSetStorage::fillOneEntryRowWise(const std::vector<int64_t>& entry) {
  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
  const auto key_count = query_mem_desc_.getGroupbyColCount();
  CHECK_EQ(slot_count + key_count, entry.size());
  auto this_buff = reinterpret_cast<int64_t*>(buff_);
  CHECK(!query_mem_desc_.didOutputColumnar());
  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
  const auto key_off = key_offset_rowwise(0, key_count, slot_count);
  for (size_t i = 0; i < key_count; ++i) {
    this_buff[key_off + i] = entry[i];
  }
  const auto first_slot_off = slot_offset_rowwise(0, 0, key_count, slot_count);
  for (size_t i = 0; i < slot_count; ++i) {
    this_buff[first_slot_off + i] = entry[key_count + i];
  }
}
void ResultSetStorage::initializeRowWise() const {
  const auto key_count = query_mem_desc_.getGroupbyColCount();
  const auto row_size = get_row_bytes(query_mem_desc_);
  const auto key_bytes_with_padding =
      align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
  switch (query_mem_desc_.getEffectiveKeyWidth()) {
    case 4:
      for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
        auto row_ptr = buff_ + i * row_size;
        fill_empty_key_32(reinterpret_cast<int32_t*>(row_ptr), key_count);
        auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
        for (size_t j = 0; j < target_init_vals_.size(); ++j) {
          slot_ptr[j] = target_init_vals_[j];
        }
      }
      break;
    case 8:
      for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
        auto row_ptr = buff_ + i * row_size;
        fill_empty_key_64(reinterpret_cast<int64_t*>(row_ptr), key_count);
        auto slot_ptr = reinterpret_cast<int64_t*>(row_ptr + key_bytes_with_padding);
        for (size_t j = 0; j < target_init_vals_.size(); ++j) {
          slot_ptr[j] = target_init_vals_[j];
        }
      }
      break;
    default:
      CHECK(false);
  }
}
void ResultSetStorage::fillOneEntryColWise(const std::vector<int64_t>& entry) {
  CHECK(query_mem_desc_.didOutputColumnar());
  CHECK_EQ(size_t(1), query_mem_desc_.getEntryCount());
  const auto slot_count = query_mem_desc_.getBufferColSlotCount();
  const auto key_count = query_mem_desc_.getGroupbyColCount();
  CHECK_EQ(slot_count + key_count, entry.size());
  auto this_buff = reinterpret_cast<int64_t*>(buff_);
  for (size_t i = 0; i < key_count; i++) {
    const auto key_offset = key_offset_colwise(0, i, 1);
    this_buff[key_offset] = entry[i];
  }
  for (size_t i = 0; i < slot_count; i++) {
    const auto slot_offset = slot_offset_colwise(0, i, key_count, 1);
    this_buff[slot_offset] = entry[key_count + i];
  }
}
void ResultSetStorage::initializeColWise() const {
  const auto key_count = query_mem_desc_.getGroupbyColCount();
  auto this_buff = reinterpret_cast<int64_t*>(buff_);
  CHECK(!query_mem_desc_.hasKeylessHash());
  for (size_t key_idx = 0; key_idx < key_count; ++key_idx) {
    const auto first_key_off =
        key_offset_colwise(0, key_idx, query_mem_desc_.getEntryCount());
    for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
      this_buff[first_key_off + i] = EMPTY_KEY_64;
    }
  }
  for (size_t target_idx = 0; target_idx < target_init_vals_.size(); ++target_idx) {
    const auto first_val_off =
        slot_offset_colwise(0, target_idx, key_count, query_mem_desc_.getEntryCount());
    for (size_t i = 0; i < query_mem_desc_.getEntryCount(); ++i) {
      this_buff[first_val_off + i] = target_init_vals_[target_idx];
    }
  }
}
  size_t slot_off = 0;
#define AGGREGATE_ONE_VALUE(                                                      \
    agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__)               \
  do {                                                                            \
    const auto sql_type = get_compact_type(agg_info__);                           \
    if (sql_type.is_fp()) {                                                       \
      if (chosen_bytes__ == sizeof(float)) {                                      \
        agg_##agg_kind__##_float(reinterpret_cast<int32_t*>(val_ptr__),           \
                                 *reinterpret_cast<const float*>(other_ptr__));   \
      } else {                                                                    \
        agg_##agg_kind__##_double(reinterpret_cast<int64_t*>(val_ptr__),          \
                                  *reinterpret_cast<const double*>(other_ptr__)); \
      }                                                                           \
    } else {                                                                      \
      if (chosen_bytes__ == sizeof(int32_t)) {                                    \
        auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__);                     \
        auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__);           \
        agg_##agg_kind__##_int32(val_ptr, *other_ptr);                            \
      } else {                                                                    \
        auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__);                     \
        auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__);           \
        agg_##agg_kind__(val_ptr, *other_ptr);                                    \
      }                                                                           \
    }                                                                             \
  } while (0)
#define AGGREGATE_ONE_NULLABLE_VALUE(                                               \
    agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__)     \
  do {                                                                              \
    if (agg_info__.skip_null_val) {                                                 \
      const auto sql_type = get_compact_type(agg_info__);                           \
      if (sql_type.is_fp()) {                                                       \
        if (chosen_bytes__ == sizeof(float)) {                                      \
          agg_##agg_kind__##_float_skip_val(                                        \
              reinterpret_cast<int32_t*>(val_ptr__),                                \
              *reinterpret_cast<const float*>(other_ptr__),                         \
              *reinterpret_cast<const float*>(may_alias_ptr(&init_val__)));         \
        } else {                                                                    \
          agg_##agg_kind__##_double_skip_val(                                       \
              reinterpret_cast<int64_t*>(val_ptr__),                                \
              *reinterpret_cast<const double*>(other_ptr__),                        \
              *reinterpret_cast<const double*>(may_alias_ptr(&init_val__)));        \
        }                                                                           \
      } else {                                                                      \
        if (chosen_bytes__ == sizeof(int32_t)) {                                    \
          int32_t* val_ptr = reinterpret_cast<int32_t*>(val_ptr__);                 \
          const int32_t* other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
          const auto null_val = static_cast<int32_t>(init_val__);                   \
          agg_##agg_kind__##_int32_skip_val(val_ptr, *other_ptr, null_val);         \
        } else {                                                                    \
          int64_t* val_ptr = reinterpret_cast<int64_t*>(val_ptr__);                 \
          const int64_t* other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
          const auto null_val = static_cast<int64_t>(init_val__);                   \
          agg_##agg_kind__##_skip_val(val_ptr, *other_ptr, null_val);               \
        }                                                                           \
      }                                                                             \
    } else {                                                                        \
      AGGREGATE_ONE_VALUE(                                                          \
          agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__);          \
    }                                                                               \
  } while (0)
#define AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__)   \
  do {                                                                \
    if (chosen_bytes__ == sizeof(int32_t)) {                          \
      auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__);           \
      auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__); \
      agg_sum_int32(val_ptr, *other_ptr);                             \
    } else {                                                          \
      auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__);           \
      auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__); \
      agg_sum(val_ptr, *other_ptr);                                   \
    }                                                                 \
  } while (0)
#define AGGREGATE_ONE_NULLABLE_COUNT(                                       \
    val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__)         \
  do {                                                                      \
    if (agg_info__.skip_null_val) {                                         \
      const auto sql_type = get_compact_type(agg_info__);                   \
      if (sql_type.is_fp()) {                                               \
        if (chosen_bytes__ == sizeof(float)) {                              \
          agg_sum_float_skip_val(                                           \
              reinterpret_cast<int32_t*>(val_ptr__),                        \
              *reinterpret_cast<const float*>(other_ptr__),                 \
              *reinterpret_cast<const float*>(may_alias_ptr(&init_val__))); \
        } else {                                                            \
          agg_sum_double_skip_val(                                          \
              reinterpret_cast<int64_t*>(val_ptr__),                        \
              *reinterpret_cast<const double*>(other_ptr__),                \
              *reinterpret_cast<const double*>(may_alias_ptr(&init_val__)));\
        }                                                                   \
      } else {                                                              \
        if (chosen_bytes__ == sizeof(int32_t)) {                            \
          auto val_ptr = reinterpret_cast<int32_t*>(val_ptr__);             \
          auto other_ptr = reinterpret_cast<const int32_t*>(other_ptr__);   \
          const auto null_val = static_cast<int32_t>(init_val__);           \
          agg_sum_int32_skip_val(val_ptr, *other_ptr, null_val);            \
        } else {                                                            \
          auto val_ptr = reinterpret_cast<int64_t*>(val_ptr__);             \
          auto other_ptr = reinterpret_cast<const int64_t*>(other_ptr__);   \
          const auto null_val = static_cast<int64_t>(init_val__);           \
          agg_sum_skip_val(val_ptr, *other_ptr, null_val);                  \
        }                                                                   \
      }                                                                     \
    } else {                                                                \
      AGGREGATE_ONE_COUNT(val_ptr__, other_ptr__, chosen_bytes__);          \
    }                                                                       \
  } while (0)
#define AGGREGATE_ONE_VALUE_SMALL(                                    \
    agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__)   \
  do {                                                                \
    if (chosen_bytes__ == sizeof(int16_t)) {                          \
      auto val_ptr = reinterpret_cast<int16_t*>(val_ptr__);           \
      auto other_ptr = reinterpret_cast<const int16_t*>(other_ptr__); \
      agg_##agg_kind__##_int16(val_ptr, *other_ptr);                  \
    } else if (chosen_bytes__ == sizeof(int8_t)) {                    \
      auto val_ptr = reinterpret_cast<int8_t*>(val_ptr__);            \
      auto other_ptr = reinterpret_cast<const int8_t*>(other_ptr__);  \
      agg_##agg_kind__##_int8(val_ptr, *other_ptr);                   \
    } else {                                                          \
      UNREACHABLE();                                                  \
    }                                                                 \
  } while (0)
#define AGGREGATE_ONE_NULLABLE_VALUE_SMALL(                                         \
    agg_kind__, val_ptr__, other_ptr__, init_val__, chosen_bytes__, agg_info__)     \
  do {                                                                              \
    if (agg_info__.skip_null_val) {                                                 \
      if (chosen_bytes__ == sizeof(int16_t)) {                                      \
        int16_t* val_ptr = reinterpret_cast<int16_t*>(val_ptr__);                   \
        const int16_t* other_ptr = reinterpret_cast<const int16_t*>(other_ptr__);   \
        const auto null_val = static_cast<int16_t>(init_val__);                     \
        agg_##agg_kind__##_int16_skip_val(val_ptr, *other_ptr, null_val);           \
      } else if (chosen_bytes__ == sizeof(int8_t)) {                                \
        int8_t* val_ptr = reinterpret_cast<int8_t*>(val_ptr__);                     \
        const int8_t* other_ptr = reinterpret_cast<const int8_t*>(other_ptr__);     \
        const auto null_val = static_cast<int8_t>(init_val__);                      \
        agg_##agg_kind__##_int8_skip_val(val_ptr, *other_ptr, null_val);            \
      }                                                                             \
    } else {                                                                        \
      AGGREGATE_ONE_VALUE_SMALL(                                                    \
          agg_kind__, val_ptr__, other_ptr__, chosen_bytes__, agg_info__);          \
    }                                                                               \
  } while (0)
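// Example of how these macros are used in reduceOneSlot below: reducing a nullable
// 4-byte integer SUM slot expands (roughly) to
// agg_sum_int32_skip_val(this, *that, null_val). A sketch of a call site, with
// placeholder pointers:
//
//   AGGREGATE_ONE_NULLABLE_VALUE(
//       sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);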
int8_t get_width_for_slot(const size_t target_slot_idx,
                          const bool float_argument_input,
                          const QueryMemoryDescriptor& query_mem_desc) {
  if (float_argument_input) {
    return sizeof(float);
  }
  return query_mem_desc.getPaddedSlotWidthBytes(target_slot_idx);
}
void ResultSetStorage::reduceOneSlotSingleValue(int8_t* this_ptr1,
                                                const TargetInfo& target_info,
                                                const size_t target_slot_idx,
                                                const size_t init_agg_val_idx,
                                                const int8_t* that_ptr1) const {
  const bool float_argument_input = takes_float_argument(target_info);
  const auto chosen_bytes =
      get_width_for_slot(target_slot_idx, float_argument_input, query_mem_desc_);
  const auto init_val = target_init_vals_[init_agg_val_idx];

  auto reduce = [&](auto const& size_tag) {
    using CastTarget = std::decay_t<decltype(size_tag)>;
    const auto lhs_proj_col = *reinterpret_cast<const CastTarget*>(this_ptr1);
    const auto rhs_proj_col = *reinterpret_cast<const CastTarget*>(that_ptr1);
    if (rhs_proj_col == init_val) {
      // ignore: the right-hand side still holds the initial (empty) value
    } else if (lhs_proj_col == init_val) {
      *reinterpret_cast<CastTarget*>(this_ptr1) = rhs_proj_col;
    } else if (lhs_proj_col != rhs_proj_col) {
      throw std::runtime_error("Multiple distinct values encountered");
    }
  };

  switch (chosen_bytes) {
    case 1: {
      CHECK(query_mem_desc_.isLogicalSizedColumnsAllowed());
      reduce(int8_t());
      break;
    }
    case 2: {
      CHECK(query_mem_desc_.isLogicalSizedColumnsAllowed());
      reduce(int16_t());
      break;
    }
    case 4: {
      reduce(int32_t());
      break;
    }
    case 8: {
      reduce(int64_t());
      break;
    }
    default:
      LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
  }
}
void ResultSetStorage::reduceOneSlot(
    int8_t* this_ptr1,
    int8_t* this_ptr2,
    const int8_t* that_ptr1,
    const int8_t* that_ptr2,
    const TargetInfo& target_info,
    const size_t target_logical_idx,
    const size_t target_slot_idx,
    const size_t init_agg_val_idx,
    const ResultSetStorage& that,
    const size_t first_slot_idx_for_target,
    const std::vector<std::string>& serialized_varlen_buffer) const {
  if (query_mem_desc_.targetGroupbyIndicesSize() > 0 &&
      query_mem_desc_.getTargetGroupbyIndex(target_logical_idx) >= 0) {
    return;
  }
  CHECK_LT(init_agg_val_idx, target_init_vals_.size());
  const bool float_argument_input = takes_float_argument(target_info);
  const auto chosen_bytes =
      get_width_for_slot(target_slot_idx, float_argument_input, query_mem_desc_);
  int64_t init_val = target_init_vals_[init_agg_val_idx];  // skip_val for nullable types

  if (target_info.is_agg && target_info.agg_kind == kSINGLE_VALUE) {
    reduceOneSlotSingleValue(
        this_ptr1, target_info, target_logical_idx, init_agg_val_idx, that_ptr1);
  } else if (target_info.is_agg && target_info.agg_kind != kSAMPLE) {
    switch (target_info.agg_kind) {
      case kCOUNT:
      case kAPPROX_COUNT_DISTINCT: {
        if (is_distinct_target(target_info)) {
          CHECK_EQ(static_cast<size_t>(chosen_bytes), sizeof(int64_t));
          reduceOneCountDistinctSlot(this_ptr1, that_ptr1, target_logical_idx, that);
          break;
        }
        CHECK_EQ(int64_t(0), init_val);
        AGGREGATE_ONE_COUNT(this_ptr1, that_ptr1, chosen_bytes);
        break;
      }
      case kAVG: {
        // The count component of AVG lives in the second slot of the pair.
        AGGREGATE_ONE_COUNT(this_ptr2,
                            that_ptr2,
                            query_mem_desc_.getPaddedSlotWidthBytes(target_slot_idx));
      }
      // fall through: the sum component is reduced like a plain kSUM
      case kSUM: {
        AGGREGATE_ONE_NULLABLE_VALUE(
            sum, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
        break;
      }
      case kMIN: {
        if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
          AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
              min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
        } else {
          AGGREGATE_ONE_NULLABLE_VALUE(
              min, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
        }
        break;
      }
      case kMAX: {
        if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
          AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
              max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
        } else {
          AGGREGATE_ONE_NULLABLE_VALUE(
              max, this_ptr1, that_ptr1, init_val, chosen_bytes, target_info);
        }
        break;
      }
      case kAPPROX_QUANTILE: {
        CHECK_EQ(static_cast<int8_t>(sizeof(int64_t)), chosen_bytes);
        reduceOneApproxQuantileSlot(this_ptr1, that_ptr1, target_logical_idx, that);
        break;
      }
      default:
        CHECK(false);
    }
  } else {
    switch (chosen_bytes) {
      case 1: {
        CHECK(query_mem_desc_.isLogicalSizedColumnsAllowed());
        const auto rhs_proj_col = *reinterpret_cast<const int8_t*>(that_ptr1);
        if (rhs_proj_col != init_val) {
          *reinterpret_cast<int8_t*>(this_ptr1) = rhs_proj_col;
        }
        break;
      }
      case 2: {
        CHECK(query_mem_desc_.isLogicalSizedColumnsAllowed());
        const auto rhs_proj_col = *reinterpret_cast<const int16_t*>(that_ptr1);
        if (rhs_proj_col != init_val) {
          *reinterpret_cast<int16_t*>(this_ptr1) = rhs_proj_col;
        }
        break;
      }
      case 4: {
        const auto rhs_proj_col = *reinterpret_cast<const int32_t*>(that_ptr1);
        if (rhs_proj_col != init_val) {
          *reinterpret_cast<int32_t*>(this_ptr1) = rhs_proj_col;
        }
        break;
      }
      case 8: {
        auto rhs_proj_col = *reinterpret_cast<const int64_t*>(that_ptr1);
        if ((target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) &&
            !serialized_varlen_buffer.empty()) {
          size_t length_to_elems{0};
          if (target_info.sql_type.is_geometry()) {
            // Assumes fixed slot pairing for geometry targets.
            length_to_elems = target_slot_idx == first_slot_idx_for_target ? 1 : 4;
          } else {
            const auto& elem_ti = target_info.sql_type.get_elem_type();
            length_to_elems = target_info.sql_type.is_string() ? 1 : elem_ti.get_size();
          }
          CHECK_LT(static_cast<size_t>(rhs_proj_col), serialized_varlen_buffer.size());
          const auto& varlen_bytes_str = serialized_varlen_buffer[rhs_proj_col];
          const auto str_ptr = reinterpret_cast<const int8_t*>(varlen_bytes_str.c_str());
          *reinterpret_cast<int64_t*>(this_ptr1) =
              reinterpret_cast<const int64_t>(str_ptr);
          *reinterpret_cast<int64_t*>(this_ptr2) =
              static_cast<int64_t>(varlen_bytes_str.size() / length_to_elems);
        } else {
          if (rhs_proj_col != init_val) {
            *reinterpret_cast<int64_t*>(this_ptr1) = rhs_proj_col;
          }
          if (target_info.agg_kind == kSAMPLE && target_info.sql_type.is_varlen()) {
            CHECK(this_ptr2 && that_ptr2);
            *reinterpret_cast<int64_t*>(this_ptr2) =
                *reinterpret_cast<const int64_t*>(that_ptr2);
          }
        }
        break;
      }
      default:
        LOG(FATAL) << "Invalid slot width: " << chosen_bytes;
    }
  }
}
void ResultSetStorage::reduceOneApproxQuantileSlot(int8_t* this_ptr1,
                                                   const int8_t* that_ptr1,
                                                   const size_t target_logical_idx,
                                                   const ResultSetStorage& that) const {
  CHECK_LT(target_logical_idx, targets_.size());
  const auto& target_info = targets_[target_logical_idx];
  CHECK(target_info.agg_kind == kAPPROX_QUANTILE);
  const auto incoming = *reinterpret_cast<quantile::TDigest* const*>(that_ptr1);
  CHECK(incoming) << "this_ptr1=" << (void*)this_ptr1
                  << ", that_ptr1=" << (void const*)that_ptr1
                  << ", target_logical_idx=" << target_logical_idx;
  if (incoming->centroids().capacity()) {
    auto accumulator = *reinterpret_cast<quantile::TDigest**>(this_ptr1);
    CHECK(accumulator) << "this_ptr1=" << (void*)this_ptr1
                       << ", that_ptr1=" << (void const*)that_ptr1
                       << ", target_logical_idx=" << target_logical_idx;
    accumulator->allocate();
    accumulator->mergeTDigest(*incoming);
  }
}
void ResultSetStorage::reduceOneCountDistinctSlot(int8_t* this_ptr1,
                                                  const int8_t* that_ptr1,
                                                  const size_t target_logical_idx,
                                                  const ResultSetStorage& that) const {
  CHECK_LT(target_logical_idx, query_mem_desc_.getCountDistinctDescriptorsSize());
  const auto& old_count_distinct_desc =
      query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
  const auto& new_count_distinct_desc =
      that.query_mem_desc_.getCountDistinctDescriptor(target_logical_idx);
  CHECK(old_count_distinct_desc.impl_type_ == new_count_distinct_desc.impl_type_);
  CHECK(this_ptr1 && that_ptr1);
  auto old_set_ptr = reinterpret_cast<const int64_t*>(this_ptr1);
  auto new_set_ptr = reinterpret_cast<const int64_t*>(that_ptr1);
  count_distinct_set_union(
      *new_set_ptr, *old_set_ptr, new_count_distinct_desc, old_count_distinct_desc);
}
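// Reducing a COUNT(DISTINCT ...) slot merges the two underlying distinct sets (bitmap
// or std::set, depending on the descriptor) in place via count_distinct_set_union; the
// slot itself only stores a handle to the set. A sketch of reading the merged
// cardinality afterwards, using the helper declared elsewhere in the engine:
//
//   const auto distinct_count =
//       count_distinct_set_size(*old_set_ptr, old_count_distinct_desc);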
bool ResultSetStorage::reduceSingleRow(const int8_t* row_ptr,
                                       const int8_t warp_count,
                                       const bool is_columnar,
                                       const bool replace_bitmap_ptr_with_bitmap_sz,
                                       std::vector<int64_t>& agg_vals,
                                       const QueryMemoryDescriptor& query_mem_desc,
                                       const std::vector<TargetInfo>& targets,
                                       const std::vector<int64_t>& agg_init_vals) {
  const size_t agg_col_count{agg_vals.size()};
  const auto row_size = query_mem_desc.getRowSize();
  CHECK_GE(agg_col_count, targets.size());
  CHECK_EQ(is_columnar, query_mem_desc.didOutputColumnar());
  CHECK(query_mem_desc.hasKeylessHash());
  std::vector<int64_t> partial_agg_vals(agg_col_count, 0);
  bool discard_row = true;
  for (int8_t warp_idx = 0; warp_idx < warp_count; ++warp_idx) {
    bool discard_partial_result = true;
    for (size_t target_idx = 0, agg_col_idx = 0;
         target_idx < targets.size() && agg_col_idx < agg_col_count;
         ++target_idx, ++agg_col_idx) {
      const auto& agg_info = targets[target_idx];
      const bool float_argument_input = takes_float_argument(agg_info);
      const auto chosen_bytes = float_argument_input
                                    ? sizeof(float)
                                    : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
      auto partial_bin_val = get_component(
          row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx), chosen_bytes);
      partial_agg_vals[agg_col_idx] = partial_bin_val;
      if (is_distinct_target(agg_info)) {
        CHECK_EQ(int8_t(1), warp_count);
        CHECK(agg_info.is_agg && (agg_info.agg_kind == kCOUNT ||
                                  agg_info.agg_kind == kAPPROX_COUNT_DISTINCT));
        partial_bin_val = count_distinct_set_size(
            partial_bin_val, query_mem_desc.getCountDistinctDescriptor(target_idx));
        if (replace_bitmap_ptr_with_bitmap_sz) {
          partial_agg_vals[agg_col_idx] = partial_bin_val;
        }
      }
      if (kAVG == agg_info.agg_kind) {
        CHECK(agg_info.is_agg && !agg_info.is_distinct);
        ++agg_col_idx;
        partial_bin_val = partial_agg_vals[agg_col_idx] =
            get_component(row_ptr + query_mem_desc.getColOnlyOffInBytes(agg_col_idx),
                          query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
      }
      if (agg_col_idx == static_cast<size_t>(query_mem_desc.getTargetIdxForKey()) &&
          partial_bin_val != agg_init_vals[query_mem_desc.getTargetIdxForKey()]) {
        CHECK(agg_info.is_agg);
        discard_partial_result = false;
      }
    }
    row_ptr += row_size;
    if (discard_partial_result) {
      continue;
    }
    discard_row = false;
    for (size_t target_idx = 0, agg_col_idx = 0;
         target_idx < targets.size() && agg_col_idx < agg_col_count;
         ++target_idx, ++agg_col_idx) {
      auto partial_bin_val = partial_agg_vals[agg_col_idx];
      const auto& agg_info = targets[target_idx];
      const bool float_argument_input = takes_float_argument(agg_info);
      const auto chosen_bytes = float_argument_input
                                    ? sizeof(float)
                                    : query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx);
      const auto& chosen_type = get_compact_type(agg_info);
      if (agg_info.is_agg && agg_info.agg_kind != kSAMPLE) {
        try {
          switch (agg_info.agg_kind) {
            case kCOUNT:
            case kAPPROX_COUNT_DISTINCT:
              AGGREGATE_ONE_NULLABLE_COUNT(
                  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
                  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
                  agg_init_vals[agg_col_idx],
                  chosen_bytes,
                  agg_info);
              break;
            case kAVG:
              // The count component of AVG lives in the next slot.
              AGGREGATE_ONE_COUNT(
                  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx + 1]),
                  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx + 1]),
                  query_mem_desc.getPaddedSlotWidthBytes(agg_col_idx));
            // fall through: the sum component is reduced like a plain kSUM
            case kSUM:
              AGGREGATE_ONE_NULLABLE_VALUE(
                  sum,
                  reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
                  reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
                  agg_init_vals[agg_col_idx],
                  chosen_bytes,
                  agg_info);
              break;
            case kMIN:
              if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
                AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
                    min,
                    reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
                    reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
                    agg_init_vals[agg_col_idx],
                    chosen_bytes,
                    agg_info);
              } else {
                AGGREGATE_ONE_NULLABLE_VALUE(
                    min,
                    reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
                    reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
                    agg_init_vals[agg_col_idx],
                    chosen_bytes,
                    agg_info);
              }
              break;
            case kMAX:
              if (static_cast<size_t>(chosen_bytes) <= sizeof(int16_t)) {
                AGGREGATE_ONE_NULLABLE_VALUE_SMALL(
                    max,
                    reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
                    reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
                    agg_init_vals[agg_col_idx],
                    chosen_bytes,
                    agg_info);
              } else {
                AGGREGATE_ONE_NULLABLE_VALUE(
                    max,
                    reinterpret_cast<int8_t*>(&agg_vals[agg_col_idx]),
                    reinterpret_cast<int8_t*>(&partial_agg_vals[agg_col_idx]),
                    agg_init_vals[agg_col_idx],
                    chosen_bytes,
                    agg_info);
              }
              break;
            default:
              CHECK(false);
          }
        } catch (std::runtime_error& e) {
          LOG(ERROR) << e.what();
        }
        if (chosen_type.is_integer() || chosen_type.is_decimal()) {
          switch (chosen_bytes) {
            case 8:
              break;
            case 4: {
              int32_t ret = *reinterpret_cast<const int32_t*>(&agg_vals[agg_col_idx]);
              if (!(agg_info.agg_kind == kCOUNT && ret != agg_init_vals[agg_col_idx])) {
                agg_vals[agg_col_idx] = static_cast<int64_t>(ret);
              }
              break;
            }
            default:
              CHECK(false);
          }
        }
        if (kAVG == agg_info.agg_kind) {
          ++agg_col_idx;
        }
      } else {
        if (agg_info.agg_kind == kSAMPLE) {
          CHECK(!agg_info.sql_type.is_varlen())
              << "Interleaved bins reduction not supported for variable length "
                 "arguments to SAMPLE";
        }
        if (agg_vals[agg_col_idx]) {
          if (agg_info.agg_kind == kSAMPLE) {
            continue;
          }
          CHECK_EQ(agg_vals[agg_col_idx], partial_bin_val);
        } else {
          agg_vals[agg_col_idx] = partial_bin_val;
        }
      }
    }
  }
  return discard_row;
}