45 const ResultSet& rows,
46 const std::vector<SQLTypeInfo>& target_types) {
47 std::vector<size_t> padded_target_sizes;
52 if (!rows.hasValidBuffer() ||
53 rows.getQueryMemDesc().getColCount() < target_types.size()) {
54 for (
const auto& target_type : target_types) {
55 padded_target_sizes.emplace_back(target_type.get_size());
57 return padded_target_sizes;
61 const auto col_context = rows.getQueryMemDesc().getColSlotContext();
62 for (
size_t col_idx = 0; col_idx < target_types.size(); col_idx++) {
65 const auto idx = col_context.getSlotsForCol(col_idx).front();
66 const size_t padded_slot_width =
67 static_cast<size_t>(rows.getPaddedSlotWidthBytes(idx));
68 padded_target_sizes.emplace_back(
69 padded_slot_width == 0UL ? target_types[col_idx].get_size() : padded_slot_width);
71 return padded_target_sizes;
77 const ResultSet& rows,
78 const size_t num_columns,
79 const std::vector<SQLTypeInfo>& target_types,
80 const size_t executor_id,
81 const size_t thread_idx,
82 const bool is_parallel_execution_enforced)
83 : column_buffers_(num_columns)
85 rows.isDirectColumnarConversionPossible()
88 , target_types_(target_types)
89 , parallel_conversion_(is_parallel_execution_enforced
92 , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
99 for (
size_t i = 0; i < num_columns; ++i) {
100 const bool is_varlen = target_types[i].is_array() ||
101 (target_types[i].is_string() &&
103 target_types[i].is_geometry();
109 !rows.isZeroCopyColumnarConversionPossible(i)) {
123 const int8_t* one_col_buffer,
124 const size_t num_rows,
126 const size_t executor_id,
127 const size_t thread_idx)
129 , num_rows_(num_rows)
130 , target_types_{target_type}
131 , parallel_conversion_(
false)
132 , direct_columnar_conversion_(
false)
135 const bool is_varlen =
136 target_type.is_array() ||
137 (target_type.is_string() && target_type.get_compression() ==
kENCODING_NONE) ||
138 target_type.is_geometry();
143 padded_target_sizes_.emplace_back(target_type.get_size());
145 const auto buf_size = num_rows * target_type.get_size();
147 reinterpret_cast<int8_t*
>(row_set_mem_owner->allocate(buf_size,
thread_idx_));
148 memcpy(((
void*)column_buffers_[0]), one_col_buffer, buf_size);
152 std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
153 const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
154 if (sub_results.empty()) {
161 [](
const size_t init,
const std::unique_ptr<ColumnarResults>&
result) {
162 return init +
result->size();
164 std::unique_ptr<ColumnarResults> merged_results(
168 const auto col_count = sub_results[0]->column_buffers_.size();
169 const auto nonempty_it = std::find_if(
172 [](
const std::unique_ptr<ColumnarResults>& needle) {
return needle->size(); });
173 if (nonempty_it == sub_results.end()) {
176 for (
size_t col_idx = 0; col_idx < col_count; ++col_idx) {
177 const auto byte_width = merged_results->padded_target_sizes_[col_idx];
178 auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
179 merged_results->column_buffers_.push_back(write_ptr);
180 for (
auto& rs : sub_results) {
181 CHECK_EQ(col_count, rs->column_buffers_.size());
185 CHECK_EQ(byte_width, rs->padded_target_sizes_[col_idx]);
186 memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
187 write_ptr += rs->size() * byte_width;
190 return merged_results;
198 const size_t num_columns) {
199 std::atomic<size_t> row_idx{0};
202 std::vector<std::future<void>> conversion_threads;
203 const auto do_work = [num_columns, &rows, &row_idx,
this](
const size_t i) {
204 const auto crt_row = rows.getRowAtNoTranslations(i);
205 if (!crt_row.empty()) {
206 auto cur_row_idx = row_idx.fetch_add(1);
207 for (
size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
212 for (
auto interval :
makeIntervals(
size_t(0), rows.entryCount(), worker_count)) {
215 [&do_work,
this](
const size_t start,
const size_t end) {
217 size_t local_idx = 0;
218 for (
size_t i = start; i < end; ++i, ++local_idx) {
219 if (
UNLIKELY((local_idx & 0xFFFF) == 0 &&
220 executor_->checkNonKernelTimeInterrupted())) {
226 for (
size_t i = start; i < end; ++i) {
236 for (
auto& child : conversion_threads) {
253 const auto do_work = [num_columns, &row_idx, &rows, &done,
this]() {
254 const auto crt_row = rows.getNextRow(
false,
false);
255 if (crt_row.empty()) {
259 for (
size_t i = 0; i < num_columns; ++i) {
266 if (
UNLIKELY((row_idx & 0xFFFF) == 0 &&
267 executor_->checkNonKernelTimeInterrupted())) {
290 const size_t row_idx,
291 const size_t column_idx) {
292 const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
293 CHECK(scalar_col_val);
294 auto i64_p = boost::get<int64_t>(scalar_col_val);
300 ((int8_t*)
column_buffers_[column_idx])[row_idx] =
static_cast<int8_t
>(val);
303 ((int16_t*)
column_buffers_[column_idx])[row_idx] =
static_cast<int16_t
>(val);
306 ((int32_t*)
column_buffers_[column_idx])[row_idx] =
static_cast<int32_t
>(val);
318 auto float_p = boost::get<float>(scalar_col_val);
319 ((
float*)
column_buffers_[column_idx])[row_idx] =
static_cast<float>(*float_p);
323 auto double_p = boost::get<double>(scalar_col_val);
324 ((
double*)
column_buffers_[column_idx])[row_idx] =
static_cast<double>(*double_p);
338 template <
typename DATA_TYPE>
340 const size_t input_buffer_entry_idx,
341 const size_t output_buffer_entry_idx,
342 const size_t target_idx,
343 const size_t slot_idx,
346 read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
348 reinterpret_cast<DATA_TYPE*
>(
column_buffers_[target_idx])[output_buffer_entry_idx] =
353 void ColumnarResults::writeBackCellDirect<float>(
const ResultSet& rows,
354 const size_t input_buffer_entry_idx,
355 const size_t output_buffer_entry_idx,
356 const size_t target_idx,
357 const size_t slot_idx,
360 read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
361 const float fval = *
reinterpret_cast<const float*
>(may_alias_ptr(&ival));
362 reinterpret_cast<float*
>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
366 void ColumnarResults::writeBackCellDirect<double>(
367 const ResultSet& rows,
368 const size_t input_buffer_entry_idx,
369 const size_t output_buffer_entry_idx,
370 const size_t target_idx,
371 const size_t slot_idx,
374 read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
375 const double dval = *
reinterpret_cast<const double*
>(may_alias_ptr(&ival));
376 reinterpret_cast<double*
>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
389 const size_t num_columns) {
391 switch (rows.getQueryDescriptionType()) {
407 <<
"Direct columnar conversion for this query type is not supported yet.";
419 const size_t num_columns) {
420 CHECK(rows.query_mem_desc_.didOutputColumnar());
422 (rows.query_mem_desc_.getQueryDescriptionType() ==
425 const auto& lazy_fetch_info = rows.getLazyFetchInfo();
435 const size_t num_columns) {
436 CHECK(rows.query_mem_desc_.didOutputColumnar());
438 (rows.query_mem_desc_.getQueryDescriptionType() ==
441 const auto& lazy_fetch_info = rows.getLazyFetchInfo();
443 for (
const auto& col_lazy_fetch_info : lazy_fetch_info) {
444 CHECK(!col_lazy_fetch_info.is_lazily_fetched);
457 const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
458 const ResultSet& rows,
459 const size_t num_columns) {
461 const auto is_column_non_lazily_fetched = [&lazy_fetch_info](
const size_t col_idx) {
463 if (lazy_fetch_info.empty()) {
466 return !lazy_fetch_info[col_idx].is_lazily_fetched;
471 std::vector<std::future<void>> direct_copy_threads;
472 for (
size_t col_idx = 0; col_idx < num_columns; col_idx++) {
473 if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
475 column_buffers_[col_idx] =
const_cast<int8_t*
>(rows.getColumnarBuffer(col_idx));
476 }
else if (is_column_non_lazily_fetched(col_idx)) {
477 CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
481 [&rows,
this](
const size_t column_index) {
483 rows.copyColumnIntoBuffer(
490 for (
auto& child : direct_copy_threads) {
505 const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
506 const ResultSet& rows,
507 const size_t num_columns) {
509 CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
511 const auto do_work_just_lazy_columns = [num_columns, &rows,
this](
512 const size_t row_idx,
513 const std::vector<bool>& targets_to_skip) {
514 const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
515 for (
size_t i = 0; i < num_columns; ++i) {
516 if (!targets_to_skip.empty() && !targets_to_skip[i]) {
522 const auto contains_lazy_fetched_column =
523 [](
const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
524 for (
auto& col_info : lazy_fetch_info) {
525 if (col_info.is_lazily_fetched) {
533 const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
534 if (contains_lazy_fetched_column(lazy_fetch_info)) {
535 const size_t worker_count =
537 std::vector<std::future<void>> conversion_threads;
538 std::vector<bool> targets_to_skip;
539 if (skip_non_lazy_columns) {
540 CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
541 targets_to_skip.reserve(num_columns);
542 for (
size_t i = 0; i < num_columns; i++) {
544 targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
547 for (
auto interval :
makeIntervals(
size_t(0), rows.entryCount(), worker_count)) {
550 [&do_work_just_lazy_columns, &targets_to_skip,
this](
const size_t start,
553 size_t local_idx = 0;
554 for (
size_t i = start; i < end; ++i, ++local_idx) {
555 if (
UNLIKELY((local_idx & 0xFFFF) == 0 &&
556 executor_->checkNonKernelTimeInterrupted())) {
559 do_work_just_lazy_columns(i, targets_to_skip);
562 for (
size_t i = start; i < end; ++i) {
563 do_work_just_lazy_columns(i, targets_to_skip);
572 for (
auto& child : conversion_threads) {
593 const size_t num_columns) {
599 const size_t entry_count = rows.entryCount();
600 const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
603 std::vector<size_t> non_empty_per_thread(num_threads,
609 rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
615 non_empty_per_thread,
629 std::vector<size_t>& non_empty_per_thread,
630 const size_t entry_count,
631 const size_t num_threads,
632 const size_t size_per_thread)
const {
636 CHECK_EQ(num_threads, non_empty_per_thread.size());
637 auto do_work = [&rows, &bitmap](
size_t& total_non_empty,
638 const size_t local_idx,
639 const size_t entry_idx,
640 const size_t thread_idx) {
641 if (!rows.isRowAtEmpty(entry_idx)) {
643 bitmap.
set(local_idx, thread_idx,
true);
646 auto locate_and_count_func =
647 [&do_work, &non_empty_per_thread,
this](
648 size_t start_index,
size_t end_index,
size_t thread_idx) {
649 size_t total_non_empty = 0;
650 size_t local_idx = 0;
652 for (
size_t entry_idx = start_index; entry_idx < end_index;
653 entry_idx++, local_idx++) {
654 if (
UNLIKELY((local_idx & 0xFFFF) == 0 &&
655 executor_->checkNonKernelTimeInterrupted())) {
658 do_work(total_non_empty, local_idx, entry_idx, thread_idx);
661 for (
size_t entry_idx = start_index; entry_idx < end_index;
662 entry_idx++, local_idx++) {
663 do_work(total_non_empty, local_idx, entry_idx, thread_idx);
666 non_empty_per_thread[thread_idx] = total_non_empty;
669 std::vector<std::future<void>> conversion_threads;
670 for (
size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
671 const size_t start_entry = thread_idx * size_per_thread;
672 const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
678 for (
auto& child : conversion_threads) {
701 const ResultSet& rows,
703 const std::vector<size_t>& non_empty_per_thread,
704 const size_t num_columns,
705 const size_t entry_count,
706 const size_t num_threads,
707 const size_t size_per_thread) {
711 CHECK_EQ(num_threads, non_empty_per_thread.size());
714 std::vector<size_t> global_offsets(num_threads + 1, 0);
716 non_empty_per_thread.end(),
717 std::next(global_offsets.begin()));
719 const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
720 const auto [single_slot_targets_to_skip, num_single_slot_targets] =
721 rows.getSupportedSingleSlotTargetBitmap();
725 if (num_single_slot_targets < num_columns) {
728 non_empty_per_thread,
730 single_slot_targets_to_skip,
731 slot_idx_per_target_idx,
739 non_empty_per_thread,
741 slot_idx_per_target_idx,
756 const ResultSet& rows,
758 const std::vector<size_t>& non_empty_per_thread,
759 const std::vector<size_t>& global_offsets,
760 const std::vector<bool>& targets_to_skip,
761 const std::vector<size_t>& slot_idx_per_target_idx,
762 const size_t num_columns,
763 const size_t entry_count,
764 const size_t num_threads,
765 const size_t size_per_thread) {
770 const auto [write_functions, read_functions] =
772 CHECK_EQ(write_functions.size(), num_columns);
773 CHECK_EQ(read_functions.size(), num_columns);
774 auto do_work = [
this,
777 &slot_idx_per_target_idx,
781 &write_functions = write_functions,
782 &read_functions = read_functions](
size_t& non_empty_idx,
783 const size_t total_non_empty,
784 const size_t local_idx,
786 const size_t thread_idx,
787 const size_t end_idx) {
788 if (non_empty_idx >= total_non_empty) {
792 const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
793 if (bitmap.
get(local_idx, thread_idx)) {
795 const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
796 for (
size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
797 if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
798 writeBackCell(crt_row[column_idx], output_buffer_row_idx, column_idx);
803 for (
size_t column_idx = 0; column_idx < num_columns; column_idx++) {
804 if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
807 write_functions[column_idx](rows,
809 output_buffer_row_idx,
811 slot_idx_per_target_idx[column_idx],
812 read_functions[column_idx]);
818 auto compact_buffer_func = [&non_empty_per_thread, &do_work,
this](
819 const size_t start_index,
820 const size_t end_index,
821 const size_t thread_idx) {
822 const size_t total_non_empty = non_empty_per_thread[thread_idx];
823 size_t non_empty_idx = 0;
824 size_t local_idx = 0;
826 for (
size_t entry_idx = start_index; entry_idx < end_index;
827 entry_idx++, local_idx++) {
828 if (
UNLIKELY((local_idx & 0xFFFF) == 0 &&
829 executor_->checkNonKernelTimeInterrupted())) {
833 non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
836 for (
size_t entry_idx = start_index; entry_idx < end_index;
837 entry_idx++, local_idx++) {
839 non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
844 std::vector<std::future<void>> compaction_threads;
845 for (
size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
846 const size_t start_entry = thread_idx * size_per_thread;
847 const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
853 for (
auto& child : compaction_threads) {
873 const ResultSet& rows,
875 const std::vector<size_t>& non_empty_per_thread,
876 const std::vector<size_t>& global_offsets,
877 const std::vector<size_t>& slot_idx_per_target_idx,
878 const size_t num_columns,
879 const size_t entry_count,
880 const size_t num_threads,
881 const size_t size_per_thread) {
886 const auto [write_functions, read_functions] =
888 CHECK_EQ(write_functions.size(), num_columns);
889 CHECK_EQ(read_functions.size(), num_columns);
890 auto do_work = [&rows,
894 &slot_idx_per_target_idx,
895 &write_functions = write_functions,
896 &read_functions = read_functions](
size_t& entry_idx,
897 size_t& non_empty_idx,
898 const size_t total_non_empty,
899 const size_t local_idx,
900 const size_t thread_idx,
901 const size_t end_idx) {
902 if (non_empty_idx >= total_non_empty) {
907 const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
908 if (bitmap.get(local_idx, thread_idx)) {
909 for (
size_t column_idx = 0; column_idx < num_columns; column_idx++) {
910 write_functions[column_idx](rows,
912 output_buffer_row_idx,
914 slot_idx_per_target_idx[column_idx],
915 read_functions[column_idx]);
920 auto compact_buffer_func = [&non_empty_per_thread, &do_work,
this](
921 const size_t start_index,
922 const size_t end_index,
923 const size_t thread_idx) {
924 const size_t total_non_empty = non_empty_per_thread[thread_idx];
925 size_t non_empty_idx = 0;
926 size_t local_idx = 0;
928 for (
size_t entry_idx = start_index; entry_idx < end_index;
929 entry_idx++, local_idx++) {
930 if (
UNLIKELY((local_idx & 0xFFFF) == 0 &&
931 executor_->checkNonKernelTimeInterrupted())) {
935 entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
938 for (
size_t entry_idx = start_index; entry_idx < end_index;
939 entry_idx++, local_idx++) {
941 entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
946 std::vector<std::future<void>> compaction_threads;
947 for (
size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
948 const size_t start_entry = thread_idx * size_per_thread;
949 const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
955 for (
auto& child : compaction_threads) {
974 const ResultSet& rows,
975 const std::vector<bool>& targets_to_skip) {
980 std::vector<WriteFunction>
result;
983 for (
size_t target_idx = 0; target_idx <
target_types_.size(); target_idx++) {
984 if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
985 result.emplace_back([](
const ResultSet& rows,
986 const size_t input_buffer_entry_idx,
987 const size_t output_buffer_entry_idx,
988 const size_t target_idx,
989 const size_t slot_idx,
991 UNREACHABLE() <<
"Invalid write back function used.";
999 result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
1001 std::placeholders::_1,
1002 std::placeholders::_2,
1003 std::placeholders::_3,
1004 std::placeholders::_4,
1005 std::placeholders::_5,
1006 std::placeholders::_6));
1009 result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
1011 std::placeholders::_1,
1012 std::placeholders::_2,
1013 std::placeholders::_3,
1014 std::placeholders::_4,
1015 std::placeholders::_5,
1016 std::placeholders::_6));
1019 UNREACHABLE() <<
"Invalid target type encountered.";
1025 result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
1027 std::placeholders::_1,
1028 std::placeholders::_2,
1029 std::placeholders::_3,
1030 std::placeholders::_4,
1031 std::placeholders::_5,
1032 std::placeholders::_6));
1035 result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
1037 std::placeholders::_1,
1038 std::placeholders::_2,
1039 std::placeholders::_3,
1040 std::placeholders::_4,
1041 std::placeholders::_5,
1042 std::placeholders::_6));
1045 result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
1047 std::placeholders::_1,
1048 std::placeholders::_2,
1049 std::placeholders::_3,
1050 std::placeholders::_4,
1051 std::placeholders::_5,
1052 std::placeholders::_6));
1055 result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
1057 std::placeholders::_1,
1058 std::placeholders::_2,
1059 std::placeholders::_3,
1060 std::placeholders::_4,
1061 std::placeholders::_5,
1062 std::placeholders::_6));
1065 UNREACHABLE() <<
"Invalid target type encountered.";
1076 const size_t input_buffer_entry_idx,
1077 const size_t target_idx,
1078 const size_t slot_idx) {
1079 UNREACHABLE() <<
"Invalid read function used, target should have been skipped.";
1080 return static_cast<int64_t
>(0);
1083 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1085 const size_t input_buffer_entry_idx,
1086 const size_t target_idx,
1087 const size_t slot_idx) {
1091 auto fval =
static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
1092 input_buffer_entry_idx, target_idx, slot_idx));
1093 return *
reinterpret_cast<int32_t*
>(may_alias_ptr(&fval));
1096 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1098 const size_t input_buffer_entry_idx,
1099 const size_t target_idx,
1100 const size_t slot_idx) {
1101 return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1102 input_buffer_entry_idx, target_idx, slot_idx);
1105 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1107 const size_t input_buffer_entry_idx,
1108 const size_t target_idx,
1109 const size_t slot_idx) {
1110 return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1111 input_buffer_entry_idx, target_idx, slot_idx);
1114 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1116 const size_t input_buffer_entry_idx,
1117 const size_t target_idx,
1118 const size_t slot_idx) {
1119 return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1120 input_buffer_entry_idx, target_idx, slot_idx);
1123 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1125 const size_t input_buffer_entry_idx,
1126 const size_t target_idx,
1127 const size_t slot_idx) {
1128 return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
1129 input_buffer_entry_idx, target_idx, slot_idx);
1132 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1134 const size_t input_buffer_entry_idx,
1135 const size_t target_idx,
1136 const size_t slot_idx) {
1137 auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
1138 input_buffer_entry_idx, target_idx, slot_idx);
1139 return *
reinterpret_cast<int32_t*
>(may_alias_ptr(&fval));
1142 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1144 const size_t input_buffer_entry_idx,
1145 const size_t target_idx,
1146 const size_t slot_idx) {
1147 auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
1148 input_buffer_entry_idx, target_idx, slot_idx);
1149 return *
reinterpret_cast<int64_t*
>(may_alias_ptr(&dval));
1160 template <QueryDescriptionType QUERY_TYPE,
bool COLUMNAR_OUTPUT>
1162 const ResultSet& rows,
1163 const std::vector<size_t>& slot_idx_per_target_idx,
1164 const std::vector<bool>& targets_to_skip) {
1166 CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
1167 CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
1169 std::vector<ReadFunction> read_functions;
1172 for (
size_t target_idx = 0; target_idx <
target_types_.size(); target_idx++) {
1173 if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
1181 if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
1183 CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
1185 CHECK_EQ(
size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
1188 read_functions.emplace_back(
1189 read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
1192 read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1196 <<
"Invalid data type encountered (BaselineHash, floating point key).";
1200 switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
1202 read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1205 read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1209 <<
"Invalid data type encountered (BaselineHash, integer key).";
1216 switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1218 read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1221 read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1224 UNREACHABLE() <<
"Invalid data type encountered (floating point agg column).";
1228 switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
1230 read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1233 read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1236 read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1239 read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
1242 UNREACHABLE() <<
"Invalid data type encountered (integer agg column).";
1247 return read_functions;
1257 std::tuple<std::vector<ColumnarResults::WriteFunction>,
1258 std::vector<ColumnarResults::ReadFunction>>
1260 const ResultSet& rows,
1261 const std::vector<size_t>& slot_idx_per_target_idx,
1262 const std::vector<bool>& targets_to_skip) {
1269 if (rows.didOutputColumnar()) {
1270 return std::make_tuple(
1271 std::move(write_functions),
1272 initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
1273 rows, slot_idx_per_target_idx, targets_to_skip));
1275 return std::make_tuple(
1276 std::move(write_functions),
1277 initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
1278 rows, slot_idx_per_target_idx, targets_to_skip));
1281 if (rows.didOutputColumnar()) {
1282 return std::make_tuple(
1283 std::move(write_functions),
1284 initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
1285 rows, slot_idx_per_target_idx, targets_to_skip));
1287 return std::make_tuple(
1288 std::move(write_functions),
1289 initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
1290 rows, slot_idx_per_target_idx, targets_to_skip));
bool isParallelConversion() const
int64_t read_int16_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
std::vector< int8_t * > column_buffers_
int32_t getErrorCode() const
static const int32_t ERR_INTERRUPTED
void materializeAllColumnsTableFunction(const ResultSet &rows, const size_t num_columns)
static std::unique_ptr< ColumnarResults > mergeResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const std::vector< std::unique_ptr< ColumnarResults >> &sub_results)
std::vector< ReadFunction > initReadFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
void locateAndCountEntries(const ResultSet &rows, ColumnBitmap &bitmap, std::vector< size_t > &non_empty_per_thread, const size_t entry_count, const size_t num_threads, const size_t size_per_thread) const
std::vector< size_t > get_padded_target_sizes(const ResultSet &rows, const std::vector< SQLTypeInfo > &target_types)
int64_t read_double_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
SQLTypeInfo get_logical_type_info(const SQLTypeInfo &type_info)
void set(const size_t index, const size_t bank_index, const bool val)
Intervals< T > makeIntervals(T begin, T end, std::size_t n_workers)
bool g_enable_non_kernel_time_query_interrupt
void compactAndCopyEntries(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
bool use_parallel_algorithms(const ResultSet &rows)
future< Result > async(Fn &&fn, Args &&...args)
std::function< int64_t(const ResultSet &, const size_t, const size_t, const size_t)> ReadFunction
void materializeAllColumnsThroughIteration(const ResultSet &rows, const size_t num_columns)
void init(LogOptions const &log_opts)
int64_t fixed_encoding_nullable_val(const int64_t val, const SQLTypeInfo &type_info)
std::vector< WriteFunction > initWriteFunctions(const ResultSet &rows, const std::vector< bool > &targets_to_skip={})
DEVICE void partial_sum(ARGS &&...args)
void materializeAllColumnsGroupBy(const ResultSet &rows, const size_t num_columns)
DEVICE auto accumulate(ARGS &&...args)
int64_t read_int32_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
std::tuple< std::vector< WriteFunction >, std::vector< ReadFunction > > initAllConversionFunctions(const ResultSet &rows, const std::vector< size_t > &slot_idx_per_target_idx, const std::vector< bool > &targets_to_skip={})
bool g_enable_smem_group_by = true
bool isDirectColumnarConversionPossible() const
int64_t read_int8_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
void materializeAllColumnsDirectly(const ResultSet &rows, const size_t num_columns)
HOST DEVICE EncodingType get_compression() const
void writeBackCellDirect(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t output_buffer_entry_idx, const size_t target_idx, const size_t slot_idx, const ReadFunction &read_function)
bool get(const size_t index, const size_t bank_index) const
std::shared_ptr< Executor > executor_
void writeBackCell(const TargetValue &col_val, const size_t row_idx, const size_t column_idx)
std::vector< size_t > padded_target_sizes_
void copyAllNonLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
#define DEBUG_TIMER(name)
int64_t read_float_key_baseline(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
void materializeAllColumnsProjection(const ResultSet &rows, const size_t num_columns)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
int64_t read_float_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
boost::variant< ScalarTargetValue, ArrayTargetValue, GeoTargetValue, GeoTargetValuePtr > TargetValue
int64_t invalid_read_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
int64_t read_int64_func(const ResultSet &rows, const size_t input_buffer_entry_idx, const size_t target_idx, const size_t slot_idx)
void compactAndCopyEntriesWithTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< bool > &targets_to_skip, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
void compactAndCopyEntriesWithoutTargetSkipping(const ResultSet &rows, const ColumnBitmap &bitmap, const std::vector< size_t > &non_empty_per_thread, const std::vector< size_t > &global_offsets, const std::vector< size_t > &slot_idx_per_target_idx, const size_t num_columns, const size_t entry_count, const size_t num_threads, const size_t size_per_thread)
const std::vector< SQLTypeInfo > target_types_
Divide up indexes (A, A+1, A+2, ..., B-2, B-1) among N workers as evenly as possible in a range-based for loop.
void materializeAllLazyColumns(const std::vector< ColumnLazyFetchInfo > &lazy_fetch_info, const ResultSet &rows, const size_t num_columns)
ColumnarResults(const std::shared_ptr< RowSetMemoryOwner > row_set_mem_owner, const ResultSet &rows, const size_t num_columns, const std::vector< SQLTypeInfo > &target_types, const size_t executor_id, const size_t thread_idx, const bool is_parallel_execution_enforced=false)