#include <tbb/parallel_reduce.h>
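// Computes the padded byte width of every target column. Falls back to the
// logical type size when the result set has no valid buffer or exposes fewer
// columns than there are targets.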
std::vector<size_t> get_padded_target_sizes(
    const ResultSet& rows,
    const std::vector<SQLTypeInfo>& target_types) {
  std::vector<size_t> padded_target_sizes;
  if (!rows.hasValidBuffer() ||
      rows.getQueryMemDesc().getColCount() < target_types.size()) {
    for (const auto& target_type : target_types) {
      padded_target_sizes.emplace_back(target_type.get_size());
    }
    return padded_target_sizes;
  }
  const auto col_context = rows.getQueryMemDesc().getColSlotContext();
  for (size_t col_idx = 0; col_idx < target_types.size(); col_idx++) {
    const auto idx = col_context.getSlotsForCol(col_idx).front();
    const size_t padded_slot_width =
        static_cast<size_t>(rows.getPaddedSlotWidthBytes(idx));
    padded_target_sizes.emplace_back(
        padded_slot_width == 0UL ? target_types[col_idx].get_size()
                                 : padded_slot_width);
  }
  return padded_target_sizes;
}
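// Serializes one TargetValue into `buf`, recursing element-wise for arrays,
// and returns the number of bytes written. (Partially reconstructed: the
// scalar switch case labels are inferred from the visible cast widths.)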
int64_t toBuffer(const TargetValue& col_val, const SQLTypeInfo& type_info, int8_t* buf) {
  if (type_info.is_array()) {
    const auto array_col_val = boost::get<ArrayTargetValue>(&col_val);
    CHECK(array_col_val);
    const auto& vec = array_col_val->get();
    int64_t offset = 0;
    const auto elem_type_info = type_info.get_elem_type();
    for (const auto& item : vec) {
      offset += toBuffer(item, elem_type_info, buf + offset);
    }
    return offset;
  } else if (type_info.is_fp()) {
    const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
    switch (type_info.get_type()) {
      case kFLOAT: {
        auto float_p = boost::get<float>(scalar_col_val);
        *((float*)buf) = static_cast<float>(*float_p);
        return 4;
      }
      case kDOUBLE: {
        auto double_p = boost::get<double>(scalar_col_val);
        *((double*)buf) = static_cast<double>(*double_p);
        return 8;
      }
      default:
        UNREACHABLE();
    }
  } else {
    const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
    CHECK(scalar_col_val);
    auto i64_p = boost::get<int64_t>(scalar_col_val);
    const auto val = fixed_encoding_nullable_val(*i64_p, type_info);
    switch (type_info.get_size()) {
      case 1:
        *buf = static_cast<int8_t>(val);
        return 1;
      case 2:
        *((int16_t*)buf) = static_cast<int16_t>(val);
        return 2;
      case 4:
        *((int32_t*)buf) = static_cast<int32_t>(val);
        return 4;
      case 8:
        *((int64_t*)buf) = static_cast<int64_t>(val);
        return 8;
      default:
        UNREACHABLE();
    }
  }
  return 0;
}
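// Counts the total number of array elements in a column across all rows with
// a TBB parallel reduction over row chunks.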
int64_t countNumberOfValues(const ResultSet& rows, const size_t column_idx) {
  return tbb::parallel_reduce(
      tbb::blocked_range<int64_t>(0, rows.rowCount()),
      static_cast<int64_t>(0),
      [&](tbb::blocked_range<int64_t> r, int64_t running_count) {
        for (int64_t i = r.begin(); i < r.end(); ++i) {
          const auto crt_row = rows.getRowAtNoTranslations(i);
          const auto arr_tv = boost::get<ArrayTargetValue>(&crt_row[column_idx]);
          CHECK(arr_tv);
          if (arr_tv->is_initialized()) {
            const auto& vec = arr_tv->get();
            running_count += vec.size();
          }
        }
        return running_count;
      },
      std::plus<int64_t>());
}
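// Main constructor: materializes a ResultSet into per-column buffers. The
// effective row count and the parallel/direct-conversion flags are derived
// from the input ResultSet up front.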
ColumnarResults::ColumnarResults(
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const ResultSet& rows,
    const size_t num_columns,
    const std::vector<SQLTypeInfo>& target_types,
    const size_t executor_id,
    const size_t thread_idx,
    const bool is_parallel_execution_enforced)
    : column_buffers_(num_columns)
    , num_rows_(use_parallel_algorithms(rows) ||
                        rows.isDirectColumnarConversionPossible()
                    ? rows.entryCount()
                    : rows.rowCount())
    , target_types_(target_types)
    , parallel_conversion_(is_parallel_execution_enforced
                               ? true
                               : use_parallel_algorithms(rows))
    , direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
    , thread_idx_(thread_idx) {
  executor_ = Executor::getExecutor(executor_id);
  // ...
  for (size_t i = 0; i < num_columns; ++i) {
    const auto ti = target_types[i];
    if (/* ... */ rows.isZeroCopyColumnarConversionPossible(i)) {
      const int8_t* col_buf = rows.getColumnarBuffer(i);
      // ...
      const int64_t flatbuffer_size = /* ... */;
      // ...
    }
    const bool is_varlen =
        (ti.is_string() && ti.get_compression() == kENCODING_NONE) || ti.is_geometry();
    // ...
    if (/* ... */ !rows.isZeroCopyColumnarConversionPossible(i)) {
      // ...
    }
  }
}
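// Single-column constructor: copies one pre-materialized buffer. Varlen
// types are presumably rejected before the copy; parallel and direct
// conversion do not apply here and are disabled.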
ColumnarResults::ColumnarResults(
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const int8_t* one_col_buffer,
    const size_t num_rows,
    const SQLTypeInfo& target_type,
    const size_t executor_id,
    const size_t thread_idx)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false)
    , thread_idx_(thread_idx) {
  // ...
  const bool is_varlen =
      target_type.is_array() ||
      (target_type.is_string() && target_type.get_compression() == kENCODING_NONE) ||
      target_type.is_geometry();
  // ... (varlen target types are rejected here)
  padded_target_sizes_.emplace_back(target_type.get_size());
  const auto buf_size = num_rows * target_type.get_size();
  column_buffers_[0] =
      reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}
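// Merges per-device sub-results into one ColumnarResults by concatenating
// each column's buffer; all inputs must agree on column count and per-column
// byte width.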
std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
    const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
  if (sub_results.empty()) {
    return nullptr;
  }
  const auto total_row_count = std::accumulate(
      sub_results.begin(),
      sub_results.end(),
      size_t(0),
      [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
        return init + result->size();
      });
  std::unique_ptr<ColumnarResults> merged_results(/* ... */);
  const auto col_count = sub_results[0]->column_buffers_.size();
  const auto nonempty_it = std::find_if(
      sub_results.begin(),
      sub_results.end(),
      [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size(); });
  if (nonempty_it == sub_results.end()) {
    return merged_results;
  }
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    const auto byte_width = merged_results->padded_target_sizes_[col_idx];
    auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
    merged_results->column_buffers_.push_back(write_ptr);
    for (auto& rs : sub_results) {
      CHECK_EQ(col_count, rs->column_buffers_.size());
      // ... (empty sub-results are skipped)
      CHECK_EQ(byte_width, rs->padded_target_sizes_[col_idx]);
      memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
      write_ptr += rs->size() * byte_width;
    }
  }
  return merged_results;
}
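// Row-by-row fallback materialization. In the parallel variant, workers
// claim output slots through an atomic row counter, so the input's relative
// row order is not preserved.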
void ColumnarResults::materializeAllColumnsThroughIteration(const ResultSet& rows,
                                                            const size_t num_columns) {
  std::atomic<size_t> row_idx{0};
  if (isParallelConversion()) {
    const size_t worker_count = cpu_threads();
    std::vector<std::future<void>> conversion_threads;
    std::mutex write_mutex;
    const auto do_work =
        [num_columns, &rows, &row_idx, &write_mutex, this](const size_t i) {
          const auto crt_row = rows.getRowAtNoTranslations(i);
          if (!crt_row.empty()) {
            auto cur_row_idx = row_idx.fetch_add(1);
            for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
              writeBackCell(crt_row[col_idx], cur_row_idx, col_idx, &write_mutex);
            }
          }
        };
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work, this](const size_t start, const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work(i);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work(i);
              }
            }
          },
          interval.begin,
          interval.end));
    }
    for (auto& child : conversion_threads) {
      child.wait();
    }
    // ...
    return;
  }
  bool done = false;
  const auto do_work = [num_columns, &row_idx, &rows, &done, this]() {
    const auto crt_row = rows.getNextRow(false, false);
    if (crt_row.empty()) {
      done = true;
      return;
    }
    for (size_t i = 0; i < num_columns; ++i) {
      writeBackCell(crt_row[i], row_idx, i);
    }
    ++row_idx;
  };
  if (g_enable_non_kernel_time_query_interrupt) {
    while (!done) {
      if (UNLIKELY((row_idx & 0xFFFF) == 0 &&
                   executor_->checkNonKernelTimeInterrupted())) {
        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
      }
      do_work();
    }
  } else {
    while (!done) {
      do_work();
    }
  }
}
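// Writes one TargetValue into its column buffer at row_idx. Array values go
// through the FlatBuffer layout, taking write_mutex (when provided) around
// buffer-level bookkeeping; fixed-width scalars are delegated to toBuffer.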
void ColumnarResults::writeBackCell(const TargetValue& col_val,
                                    const size_t row_idx,
                                    const size_t column_idx,
                                    std::mutex* write_mutex) {
  const auto& type_info = target_types_[column_idx];
  if (type_info.is_array()) {
    FlatBufferManager m{column_buffers_[column_idx]};
    const auto arr_tv = boost::get<ArrayTargetValue>(&col_val);
    CHECK(arr_tv);
    if (arr_tv->is_initialized()) {
      const auto& vec = arr_tv->get();
      auto array_item_size = type_info.get_elem_type().get_size();
      // ...
      int8_t* buf = nullptr;
      FlatBufferManager::Status status{};
      {
        auto lock_scope =
            (write_mutex == nullptr ? std::unique_lock<std::mutex>()
                                    : std::unique_lock<std::mutex>(*write_mutex));
        status = m.setEmptyItemNoValidation(row_idx, vec.size() * array_item_size, &buf);
      }
      CHECK_EQ(status, FlatBufferManager::Status::Success);
      // ... (array elements are then serialized into buf)
    } else {
      auto lock_scope =
          (write_mutex == nullptr ? std::unique_lock<std::mutex>()
                                  : std::unique_lock<std::mutex>(*write_mutex));
      m.setNullNoValidation(row_idx);
    }
  } else {
    int8_t* buf = column_buffers_[column_idx];
    toBuffer(col_val, type_info, buf + type_info.get_size() * row_idx);
  }
}
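// Direct per-cell write-back for fixed-width types: reads the value through
// a type-erased ReadFunction and stores it into the output buffer. The
// float/double specializations reinterpret the int bit pattern instead of
// remapping integer NULL sentinels.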
template <typename DATA_TYPE>
void ColumnarResults::writeBackCellDirect(const ResultSet& rows,
                                          const size_t input_buffer_entry_idx,
                                          const size_t output_buffer_entry_idx,
                                          const size_t target_idx,
                                          const size_t slot_idx,
                                          const ReadFunction& read_from_function) {
  const auto val = static_cast<DATA_TYPE>(fixed_encoding_nullable_val(
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx),
      target_types_[target_idx]));
  reinterpret_cast<DATA_TYPE*>(column_buffers_[target_idx])[output_buffer_entry_idx] =
      val;
}
template <>
void ColumnarResults::writeBackCellDirect<float>(const ResultSet& rows,
                                                 const size_t input_buffer_entry_idx,
                                                 const size_t output_buffer_entry_idx,
                                                 const size_t target_idx,
                                                 const size_t slot_idx,
                                                 const ReadFunction& read_from_function) {
  const int32_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const float fval = *reinterpret_cast<const float*>(may_alias_ptr(&ival));
  reinterpret_cast<float*>(column_buffers_[target_idx])[output_buffer_entry_idx] = fval;
}
template <>
void ColumnarResults::writeBackCellDirect<double>(const ResultSet& rows,
                                                  const size_t input_buffer_entry_idx,
                                                  const size_t output_buffer_entry_idx,
                                                  const size_t target_idx,
                                                  const size_t slot_idx,
                                                  const ReadFunction& read_from_function) {
  const int64_t ival =
      read_from_function(rows, input_buffer_entry_idx, target_idx, slot_idx);
  const double dval = *reinterpret_cast<const double*>(may_alias_ptr(&ival));
  reinterpret_cast<double*>(column_buffers_[target_idx])[output_buffer_entry_idx] = dval;
}
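// Entry point for direct (non-iterative) conversion; dispatches on the query
// description type.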
void ColumnarResults::materializeAllColumnsDirectly(const ResultSet& rows, const size_t num_columns) {
  switch (rows.getQueryDescriptionType()) {
    // ... (Projection, TableFunction, and GroupBy cases dispatch below)
    default:
      UNREACHABLE() << "Direct columnar conversion for this query type is not supported yet.";
  }
}
void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows, const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection));
  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
  materializeAllLazyColumns(lazy_fetch_info, rows, num_columns);
}
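// TableFunction path: like projection, but table function outputs are never
// lazily fetched, so only the direct copy is needed.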
void ColumnarResults::materializeAllColumnsTableFunction(const ResultSet& rows, const size_t num_columns) {
  CHECK(rows.query_mem_desc_.didOutputColumnar());
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::TableFunction));
  const auto& lazy_fetch_info = rows.getLazyFetchInfo();
  for (const auto& col_lazy_fetch_info : lazy_fetch_info) {
    CHECK(!col_lazy_fetch_info.is_lazily_fetched);
  }
  copyAllNonLazyColumns(lazy_fetch_info, rows, num_columns);
}
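// For every column that is neither zero-copy nor lazily fetched, copies the
// ResultSet's columnar storage into this object's buffer on a worker thread;
// zero-copy columns simply alias the ResultSet's buffer.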
void ColumnarResults::copyAllNonLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  const auto is_column_non_lazily_fetched = [&lazy_fetch_info](const size_t col_idx) {
    if (lazy_fetch_info.empty()) {
      return true;
    }
    return !lazy_fetch_info[col_idx].is_lazily_fetched;
  };
  std::vector<std::future<void>> direct_copy_threads;
  for (size_t col_idx = 0; col_idx < num_columns; col_idx++) {
    if (rows.isZeroCopyColumnarConversionPossible(col_idx)) {
      column_buffers_[col_idx] = const_cast<int8_t*>(rows.getColumnarBuffer(col_idx));
    } else if (is_column_non_lazily_fetched(col_idx)) {
      CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
              QueryDescriptionType::TableFunction));
      direct_copy_threads.push_back(std::async(
          std::launch::async,
          [&rows, this](const size_t column_index) {
            const size_t column_size = num_rows_ * padded_target_sizes_[column_index];
            rows.copyColumnIntoBuffer(
                column_index, column_buffers_[column_index], column_size);
          },
          col_idx));
    }
  }
  for (auto& child : direct_copy_threads) {
    child.wait();
  }
}
void ColumnarResults::materializeAllLazyColumns(
    const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info,
    const ResultSet& rows,
    const size_t num_columns) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK(!(rows.query_mem_desc_.getQueryDescriptionType() ==
          QueryDescriptionType::TableFunction));
  std::mutex write_mutex;
  const auto do_work_just_lazy_columns = [num_columns, &rows, &write_mutex, this](
                                             const size_t row_idx,
                                             const std::vector<bool>& targets_to_skip) {
    const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
    for (size_t i = 0; i < num_columns; ++i) {
      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
        writeBackCell(crt_row[i], row_idx, i, &write_mutex);
      }
    }
  };
  const auto contains_lazy_fetched_column =
      [](const std::vector<ColumnLazyFetchInfo>& lazy_fetch_info) {
        for (auto& col_info : lazy_fetch_info) {
          if (col_info.is_lazily_fetched) {
            return true;
          }
        }
        return false;
      };
  const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
  if (contains_lazy_fetched_column(lazy_fetch_info)) {
    const size_t worker_count =
        use_parallel_algorithms(rows) ? cpu_threads() : 1;
    std::vector<std::future<void>> conversion_threads;
    std::vector<bool> targets_to_skip;
    if (skip_non_lazy_columns) {
      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
      targets_to_skip.reserve(num_columns);
      for (size_t i = 0; i < num_columns; i++) {
        targets_to_skip.push_back(!lazy_fetch_info[i].is_lazily_fetched);
      }
    }
    for (auto interval : makeIntervals(size_t(0), rows.entryCount(), worker_count)) {
      conversion_threads.push_back(std::async(
          std::launch::async,
          [&do_work_just_lazy_columns, &targets_to_skip, this](const size_t start,
                                                               const size_t end) {
            if (g_enable_non_kernel_time_query_interrupt) {
              size_t local_idx = 0;
              for (size_t i = start; i < end; ++i, ++local_idx) {
                if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                             executor_->checkNonKernelTimeInterrupted())) {
                  throw QueryExecutionError(Executor::ERR_INTERRUPTED);
                }
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            } else {
              for (size_t i = start; i < end; ++i) {
                do_work_just_lazy_columns(i, targets_to_skip);
              }
            }
          },
          interval.begin,
          interval.end));
    }
    for (auto& child : conversion_threads) {
      child.wait();
    }
  }
}
void ColumnarResults::materializeAllColumnsGroupBy(const ResultSet& rows, const size_t num_columns) {
  const size_t num_threads = isParallelConversion() ? static_cast<size_t>(cpu_threads()) : 1;
  const size_t entry_count = rows.entryCount();
  const size_t size_per_thread = (entry_count + num_threads - 1) / num_threads;
  std::vector<size_t> non_empty_per_thread(num_threads, 0);
  ColumnBitmap bitmap(size_per_thread, num_threads);
  locateAndCountEntries(
      rows, bitmap, non_empty_per_thread, entry_count, num_threads, size_per_thread);
  compactAndCopyEntries(rows, bitmap, non_empty_per_thread, num_columns, entry_count,
                        num_threads, size_per_thread);
}
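// First pass: each worker marks non-empty entries in a shared bitmap and
// tallies its own non-empty count.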
void ColumnarResults::locateAndCountEntries(const ResultSet& rows,
                                            ColumnBitmap& bitmap,
                                            std::vector<size_t>& non_empty_per_thread,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) const {
  CHECK(isDirectColumnarConversionPossible());
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  auto do_work = [&rows, &bitmap](size_t& total_non_empty,
                                  const size_t local_idx,
                                  const size_t entry_idx,
                                  const size_t thread_idx) {
    if (!rows.isRowAtEmpty(entry_idx)) {
      total_non_empty++;
      bitmap.set(local_idx, thread_idx, true);
    }
  };
  auto locate_and_count_func =
      [&do_work, &non_empty_per_thread, this](
          size_t start_index, size_t end_index, size_t thread_idx) {
        size_t total_non_empty = 0;
        size_t local_idx = 0;
        if (g_enable_non_kernel_time_query_interrupt) {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                         executor_->checkNonKernelTimeInterrupted())) {
              throw QueryExecutionError(Executor::ERR_INTERRUPTED);
            }
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        } else {
          for (size_t entry_idx = start_index; entry_idx < end_index;
               entry_idx++, local_idx++) {
            do_work(total_non_empty, local_idx, entry_idx, thread_idx);
          }
        }
        non_empty_per_thread[thread_idx] = total_non_empty;
      };
  std::vector<std::future<void>> conversion_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    conversion_threads.push_back(std::async(
        std::launch::async, locate_and_count_func, start_entry, end_entry, thread_idx));
  }
  for (auto& child : conversion_threads) {
    child.wait();
  }
}
void ColumnarResults::compactAndCopyEntries(const ResultSet& rows,
                                            const ColumnBitmap& bitmap,
                                            const std::vector<size_t>& non_empty_per_thread,
                                            const size_t num_columns,
                                            const size_t entry_count,
                                            const size_t num_threads,
                                            const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  CHECK_EQ(num_threads, non_empty_per_thread.size());
  std::vector<size_t> global_offsets(num_threads + 1, 0);
  std::partial_sum(non_empty_per_thread.begin(),
                   non_empty_per_thread.end(),
                   std::next(global_offsets.begin()));
  const auto slot_idx_per_target_idx = rows.getSlotIndicesForTargetIndices();
  const auto [single_slot_targets_to_skip, num_single_slot_targets] =
      rows.getSupportedSingleSlotTargetBitmap();
  if (num_single_slot_targets < num_columns) {
    compactAndCopyEntriesWithTargetSkipping(
        rows, bitmap, non_empty_per_thread, global_offsets,
        single_slot_targets_to_skip, slot_idx_per_target_idx, num_columns,
        entry_count, num_threads, size_per_thread);
  } else {
    compactAndCopyEntriesWithoutTargetSkipping(
        rows, bitmap, non_empty_per_thread, global_offsets,
        slot_idx_per_target_idx, num_columns, entry_count, num_threads,
        size_per_thread);
  }
}
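// Compaction for when some targets need full row materialization: skipped
// targets go through getRowAtNoTranslations/writeBackCell, the rest through
// the direct read/write function pairs.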
void ColumnarResults::compactAndCopyEntriesWithTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<bool>& targets_to_skip,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx, targets_to_skip);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  std::mutex write_mutex;
  auto do_work = [this,
                  &rows,
                  &bitmap,
                  &slot_idx_per_target_idx,
                  &global_offsets,
                  &targets_to_skip,
                  &num_columns,
                  &write_mutex,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    size_t& entry_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries for this thread have been handled
      entry_idx = end_idx;
      return;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      const auto crt_row = rows.getRowAtNoTranslations(entry_idx, targets_to_skip);
      for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) {
        if (!targets_to_skip.empty() && !targets_to_skip[column_idx]) {
          writeBackCell(
              crt_row[column_idx], output_buffer_row_idx, column_idx, &write_mutex);
        } else {
          write_functions[column_idx](rows,
                                      entry_idx,
                                      output_buffer_row_idx,
                                      column_idx,
                                      slot_idx_per_target_idx[column_idx],
                                      read_functions[column_idx]);
        }
      }
      non_empty_idx++;
    }
  };
  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            non_empty_idx, total_non_empty, local_idx, entry_idx, thread_idx, end_index);
      }
    }
  };
  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }
  for (auto& child : compaction_threads) {
    child.wait();
  }
}
void ColumnarResults::compactAndCopyEntriesWithoutTargetSkipping(
    const ResultSet& rows,
    const ColumnBitmap& bitmap,
    const std::vector<size_t>& non_empty_per_thread,
    const std::vector<size_t>& global_offsets,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const size_t num_columns,
    const size_t entry_count,
    const size_t num_threads,
    const size_t size_per_thread) {
  CHECK(isDirectColumnarConversionPossible());
  const auto [write_functions, read_functions] =
      initAllConversionFunctions(rows, slot_idx_per_target_idx);
  CHECK_EQ(write_functions.size(), num_columns);
  CHECK_EQ(read_functions.size(), num_columns);
  auto do_work = [&rows,
                  &bitmap,
                  &global_offsets,
                  &num_columns,
                  &slot_idx_per_target_idx,
                  &write_functions = write_functions,
                  &read_functions = read_functions](size_t& entry_idx,
                                                    size_t& non_empty_idx,
                                                    const size_t total_non_empty,
                                                    const size_t local_idx,
                                                    const size_t thread_idx,
                                                    const size_t end_idx) {
    if (non_empty_idx >= total_non_empty) {
      // all non-empty entries for this thread have been handled
      entry_idx = end_idx;
      return;
    }
    const size_t output_buffer_row_idx = global_offsets[thread_idx] + non_empty_idx;
    if (bitmap.get(local_idx, thread_idx)) {
      for (size_t column_idx = 0; column_idx < num_columns; column_idx++) {
        write_functions[column_idx](rows,
                                    entry_idx,
                                    output_buffer_row_idx,
                                    column_idx,
                                    slot_idx_per_target_idx[column_idx],
                                    read_functions[column_idx]);
      }
      non_empty_idx++;
    }
  };
  auto compact_buffer_func = [&non_empty_per_thread, &do_work, this](
                                 const size_t start_index,
                                 const size_t end_index,
                                 const size_t thread_idx) {
    const size_t total_non_empty = non_empty_per_thread[thread_idx];
    size_t non_empty_idx = 0;
    size_t local_idx = 0;
    if (g_enable_non_kernel_time_query_interrupt) {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        if (UNLIKELY((local_idx & 0xFFFF) == 0 &&
                     executor_->checkNonKernelTimeInterrupted())) {
          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
        }
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    } else {
      for (size_t entry_idx = start_index; entry_idx < end_index;
           entry_idx++, local_idx++) {
        do_work(
            entry_idx, non_empty_idx, total_non_empty, local_idx, thread_idx, end_index);
      }
    }
  };
  std::vector<std::future<void>> compaction_threads;
  for (size_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
    const size_t start_entry = thread_idx * size_per_thread;
    const size_t end_entry = std::min(start_entry + size_per_thread, entry_count);
    compaction_threads.push_back(std::async(
        std::launch::async, compact_buffer_func, start_entry, end_entry, thread_idx));
  }
  for (auto& child : compaction_threads) {
    child.wait();
  }
}
std::vector<ColumnarResults::WriteFunction> ColumnarResults::initWriteFunctions(
    const ResultSet& rows,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible());
  std::vector<WriteFunction> result;
  result.reserve(target_types_.size());
  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      result.emplace_back([](const ResultSet& rows,
                             const size_t input_buffer_entry_idx,
                             const size_t output_buffer_entry_idx,
                             const size_t target_idx,
                             const size_t slot_idx,
                             const ReadFunction& read_function) {
        UNREACHABLE() << "Invalid write back function used.";
      });
      continue;
    }
    if (target_types_[target_idx].is_fp()) {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<double>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<float>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    } else {
      switch (target_types_[target_idx].get_size()) {
        case 8:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int64_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 4:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int32_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 2:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int16_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        case 1:
          result.emplace_back(std::bind(&ColumnarResults::writeBackCellDirect<int8_t>,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2,
                                        std::placeholders::_3,
                                        std::placeholders::_4,
                                        std::placeholders::_5,
                                        std::placeholders::_6));
          break;
        default:
          UNREACHABLE() << "Invalid target type encountered.";
          break;
      }
    }
  }
  return result;
}
int64_t invalid_read_func(const ResultSet& rows,
                          const size_t input_buffer_entry_idx,
                          const size_t target_idx,
                          const size_t slot_idx) {
  UNREACHABLE() << "Invalid read function used, target should have been skipped.";
  return static_cast<int64_t>(0);
}
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_key_baseline(const ResultSet& rows,
                                const size_t input_buffer_entry_idx,
                                const size_t target_idx,
                                const size_t slot_idx) {
  auto fval = static_cast<float>(rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx));
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int64_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int64_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int32_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int32_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int16_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  return rows.getEntryAt<int16_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_int8_func(const ResultSet& rows,
                       const size_t input_buffer_entry_idx,
                       const size_t target_idx,
                       const size_t slot_idx) {
  return rows.getEntryAt<int8_t, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
}
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_float_func(const ResultSet& rows,
                        const size_t input_buffer_entry_idx,
                        const size_t target_idx,
                        const size_t slot_idx) {
  auto fval = rows.getEntryAt<float, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int32_t*>(may_alias_ptr(&fval));
}
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
int64_t read_double_func(const ResultSet& rows,
                         const size_t input_buffer_entry_idx,
                         const size_t target_idx,
                         const size_t slot_idx) {
  auto dval = rows.getEntryAt<double, QUERY_TYPE, COLUMNAR_OUTPUT>(
      input_buffer_entry_idx, target_idx, slot_idx);
  return *reinterpret_cast<int64_t*>(may_alias_ptr(&dval));
}
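// Chooses a read function per target from the query type, key width, and
// padded slot size. A padded width of zero marks a baseline-hash group key
// that lives in the key area rather than in an aggregate slot.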
template <QueryDescriptionType QUERY_TYPE, bool COLUMNAR_OUTPUT>
std::vector<ColumnarResults::ReadFunction> ColumnarResults::initReadFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(COLUMNAR_OUTPUT == rows.didOutputColumnar());
  CHECK(QUERY_TYPE == rows.getQueryDescriptionType());
  std::vector<ReadFunction> read_functions;
  read_functions.reserve(target_types_.size());
  for (size_t target_idx = 0; target_idx < target_types_.size(); target_idx++) {
    if (!targets_to_skip.empty() && !targets_to_skip[target_idx]) {
      read_functions.emplace_back(invalid_read_func);
      continue;
    }
    if (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx]) == 0) {
      CHECK(rows.query_mem_desc_.getTargetGroupbyIndex(target_idx) >= 0);
      if (target_types_[target_idx].is_fp()) {
        CHECK_EQ(size_t(8), rows.query_mem_desc_.getEffectiveKeyWidth());
        switch (target_types_[target_idx].get_type()) {
          case kFLOAT:
            read_functions.emplace_back(
                read_float_key_baseline<QUERY_TYPE, COLUMNAR_OUTPUT>);
            break;
          case kDOUBLE:
            read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
            break;
          default:
            UNREACHABLE() << "Invalid data type encountered (BaselineHash, floating point key).";
        }
      } else {
        switch (rows.query_mem_desc_.getEffectiveKeyWidth()) {
          case 8:
            read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
            break;
          case 4:
            read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
            break;
          default:
            UNREACHABLE() << "Invalid data type encountered (BaselineHash, integer key).";
        }
      }
      continue;
    }
    if (target_types_[target_idx].is_fp()) {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_double_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_float_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (floating point agg column).";
      }
    } else {
      switch (rows.getPaddedSlotWidthBytes(slot_idx_per_target_idx[target_idx])) {
        case 8:
          read_functions.emplace_back(read_int64_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 4:
          read_functions.emplace_back(read_int32_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 2:
          read_functions.emplace_back(read_int16_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        case 1:
          read_functions.emplace_back(read_int8_func<QUERY_TYPE, COLUMNAR_OUTPUT>);
          break;
        default:
          UNREACHABLE() << "Invalid data type encountered (integer agg column).";
      }
    }
  }
  return read_functions;
}
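// Bundles the write and read function vectors for the active query type,
// instantiating the read side for the proper (query type, columnar output)
// template combination.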
std::tuple<std::vector<ColumnarResults::WriteFunction>,
           std::vector<ColumnarResults::ReadFunction>>
ColumnarResults::initAllConversionFunctions(
    const ResultSet& rows,
    const std::vector<size_t>& slot_idx_per_target_idx,
    const std::vector<bool>& targets_to_skip) {
  CHECK(isDirectColumnarConversionPossible() &&
        (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash ||
         rows.getQueryDescriptionType() == QueryDescriptionType::GroupByBaselineHash));
  auto write_functions = initWriteFunctions(rows, targets_to_skip);
  if (rows.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash) {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByPerfectHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  } else {
    if (rows.didOutputColumnar()) {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, true>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    } else {
      return std::make_tuple(
          std::move(write_functions),
          initReadFunctions<QueryDescriptionType::GroupByBaselineHash, false>(
              rows, slot_idx_per_target_idx, targets_to_skip));
    }
  }
}