17 #include "../Shared/DateConverters.h"
20 #include "arrow/ipc/dictionary.h"
21 #include "arrow/ipc/options.h"
26 #include <sys/types.h>
40 #include "arrow/api.h"
41 #include "arrow/io/memory.h"
42 #include "arrow/ipc/api.h"
47 #include <arrow/gpu/cuda_api.h>
51 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
53 #define ARROW_CONVERTER_DEBUG true
// Starts a verbosity-1 log statement prefixed with "[Arrow][<category>] ".
// `category` is streamed rather than pasted next to a string literal, so it may
// be any streamable expression (a literal, a std::string, ...) and not only a
// string literal. It is parenthesized so that expressions with lower precedence
// than `<<` (e.g. a conditional) are still logged as a single value.
#define ARROW_LOG(category) \
  VLOG(1) << "[Arrow]"      \
          << "[" << (category) << "] "
67 : arrow::Buffer(buf, size), _rs(rs) {}
109 template <
typename TYPE,
typename VALUE_ARRAY_TYPE>
111 std::shared_ptr<ValueArray>& values,
112 const size_t max_size) {
114 values = std::make_shared<ValueArray>(std::vector<TYPE>());
115 boost::get<std::vector<TYPE>>(*values).reserve(max_size);
118 auto values_ty = boost::get<std::vector<TYPE>>(values.get());
121 auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
123 if constexpr (std::is_same_v<VALUE_ARRAY_TYPE, NullableString>) {
124 if (
auto str = boost::get<std::string>(pval_cty)) {
125 values_ty->push_back(*str);
127 values_ty->push_back(
"");
130 auto val_ty =
static_cast<TYPE
>(*pval_cty);
131 values_ty->push_back(val_ty);
135 template <
typename TYPE,
typename VALUE_ARRAY_TYPE>
137 std::shared_ptr<ValueArray>& values,
138 const size_t max_size) {
140 values = std::make_shared<ValueArray>(
Vec2<TYPE>());
141 boost::get<Vec2<TYPE>>(*values).reserve(max_size);
145 Vec2<TYPE>* values_ty = boost::get<Vec2<TYPE>>(values.get());
148 values_ty->emplace_back(std::vector<TYPE>{});
151 for (
auto val_cty : val_ctys.value()) {
152 auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
154 auto val_ty =
static_cast<TYPE
>(*pval_cty);
155 values_ty->back().emplace_back(val_ty);
162 std::shared_ptr<std::vector<bool>>& null_bitmap,
163 const size_t max_size) {
170 null_bitmap = std::make_shared<std::vector<bool>>();
171 null_bitmap->reserve(max_size);
174 null_bitmap->push_back(value ?
true :
false);
177 template <
typename TYPE>
180 std::shared_ptr<std::vector<bool>>& null_bitmap,
181 const size_t max_size) {
186 auto pvalue = boost::get<TYPE>(&value);
188 bool is_valid =
false;
189 if constexpr (std::is_same_v<TYPE, NullableString>) {
190 is_valid = boost::get<std::string>(pvalue) !=
nullptr;
198 }
else if (col_type.
is_fp()) {
206 null_bitmap = std::make_shared<std::vector<bool>>();
207 null_bitmap->reserve(max_size);
210 null_bitmap->push_back(is_valid);
213 template <
typename TYPE,
typename enable =
void>
216 template <
typename TYPE>
217 struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
219 static constexpr
type value = inline_int_null_value<type>();
222 template <
typename TYPE>
223 struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
225 static constexpr
type value = inline_fp_null_value<type>();
228 template <
typename TYPE>
231 template <
typename C_TYPE,
232 typename ARROW_TYPE =
typename arrow::CTypeTraits<C_TYPE>::ArrowType>
236 std::shared_ptr<arrow::Array>& out) {
237 CHECK(
sizeof(C_TYPE) == result->getColType(col).get_size());
239 std::shared_ptr<arrow::Buffer> values;
240 std::shared_ptr<arrow::Buffer> is_valid;
241 const int64_t buf_size = entry_count *
sizeof(C_TYPE);
242 if (result->isZeroCopyColumnarConversionPossible(col)) {
244 reinterpret_cast<const uint8_t*>(result->getColumnarBuffer(col)),
248 auto res = arrow::AllocateBuffer(buf_size);
250 values = std::move(
res).ValueOrDie();
251 result->copyColumnIntoBuffer(
252 col, reinterpret_cast<int8_t*>(values->mutable_data()), buf_size);
255 int64_t null_count = 0;
256 auto res = arrow::AllocateBuffer((entry_count + 7) / 8);
258 is_valid = std::move(
res).ValueOrDie();
260 auto is_valid_data = is_valid->mutable_data();
266 size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
267 for (
size_t i = 0; i < unroll_count; i += 8) {
268 uint8_t valid_byte = 0;
270 valid = vals[i + 0] != null_val;
271 valid_byte |= valid << 0;
272 null_count += !valid;
273 valid = vals[i + 1] != null_val;
274 valid_byte |= valid << 1;
275 null_count += !valid;
276 valid = vals[i + 2] != null_val;
277 valid_byte |= valid << 2;
278 null_count += !valid;
279 valid = vals[i + 3] != null_val;
280 valid_byte |= valid << 3;
281 null_count += !valid;
282 valid = vals[i + 4] != null_val;
283 valid_byte |= valid << 4;
284 null_count += !valid;
285 valid = vals[i + 5] != null_val;
286 valid_byte |= valid << 5;
287 null_count += !valid;
288 valid = vals[i + 6] != null_val;
289 valid_byte |= valid << 6;
290 null_count += !valid;
291 valid = vals[i + 7] != null_val;
292 valid_byte |= valid << 7;
293 null_count += !valid;
294 is_valid_data[i >> 3] = valid_byte;
296 if (unroll_count != entry_count) {
297 uint8_t valid_byte = 0;
298 for (
size_t i = unroll_count; i < entry_count; ++i) {
299 bool valid = vals[i] != null_val;
300 valid_byte |= valid << (i & 7);
301 null_count += !valid;
303 is_valid_data[unroll_count >> 3] = valid_byte;
314 new arrow::NumericArray<ARROW_TYPE>(entry_count, values, is_valid, null_count));
316 out.reset(
new arrow::NumericArray<ARROW_TYPE>(entry_count, values));
321 std::pair<key_t, void*>
get_shm(
size_t shmsz) {
323 return std::make_pair(IPC_PRIVATE,
nullptr);
331 auto key =
static_cast<key_t
>(rand());
335 while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
344 if (!(errno & (EEXIST | EACCES | EINVAL | ENOENT))) {
345 throw std::runtime_error(
"failed to create a shared memory");
347 key =
static_cast<key_t
>(rand());
350 auto ipc_ptr = shmat(shmid, NULL, 0);
351 if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
352 throw std::runtime_error(
"failed to attach a shared memory");
355 return std::make_pair(key, ipc_ptr);
361 throw std::runtime_error(
"Arrow IPC not yet supported on Windows.");
362 return std::make_pair(0,
nullptr);
364 auto [key, ipc_ptr] =
get_shm(size);
365 std::shared_ptr<arrow::Buffer> buffer(
366 new arrow::MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
367 return std::make_pair<key_t, std::shared_ptr<arrow::Buffer>>(std::move(key),
378 throw std::runtime_error(
"Arrow IPC not yet supported on Windows.");
380 auto [key, ipc_ptr] =
get_shm(data->size());
384 memcpy(ipc_ptr, data->data(), data->size());
398 std::shared_ptr<arrow::RecordBatch> record_batch =
convertToArrow();
400 struct BuildResultParams {
401 int64_t schemaSize()
const {
402 return serialized_schema ? serialized_schema->size() : 0;
404 int64_t dictSize()
const {
return serialized_dict ? serialized_dict->size() : 0; };
405 int64_t totalSize()
const {
return schemaSize() + records_size + dictSize(); }
406 bool hasRecordBatch()
const {
return records_size > 0; }
407 bool hasDict()
const {
return dictSize() > 0; }
409 int64_t records_size{0};
410 std::shared_ptr<arrow::Buffer> serialized_schema{
nullptr};
411 std::shared_ptr<arrow::Buffer> serialized_dict{
nullptr};
417 auto timer =
DEBUG_TIMER(
"serialize batch to wire");
418 const auto total_size = result_params.totalSize();
419 std::vector<char> record_handle_data(total_size);
420 auto serialized_records =
421 arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
426 reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
427 result_params.schemaSize()));
429 if (result_params.hasDict()) {
431 reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
432 result_params.dictSize()));
435 arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
436 serialized_records, result_params.schemaSize() + result_params.dictSize()));
438 if (result_params.hasRecordBatch()) {
440 *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
443 return {std::vector<char>(0),
445 std::vector<char>(0),
446 serialized_records->size(),
448 std::move(record_handle_data)};
452 auto timer =
DEBUG_TIMER(
"serialize batch to shared memory");
453 std::shared_ptr<arrow::Buffer> serialized_records;
454 std::vector<char> schema_handle_buffer;
455 std::vector<char> record_handle_buffer(
sizeof(key_t), 0);
456 key_t records_shm_key = IPC_PRIVATE;
457 const int64_t total_size = result_params.totalSize();
459 std::tie(records_shm_key, serialized_records) =
get_shm_buffer(total_size);
461 memcpy(serialized_records->mutable_data(),
462 result_params.serialized_schema->data(),
463 (size_t)result_params.schemaSize());
465 if (result_params.hasDict()) {
466 memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
467 result_params.serialized_dict->data(),
468 (size_t)result_params.dictSize());
471 arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
472 serialized_records, result_params.schemaSize() + result_params.dictSize()));
474 if (result_params.hasRecordBatch()) {
476 *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
479 memcpy(&record_handle_buffer[0],
480 reinterpret_cast<const unsigned char*>(&records_shm_key),
483 return {schema_handle_buffer,
485 record_handle_buffer,
486 serialized_records->size(),
490 arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
491 auto options = arrow::ipc::IpcWriteOptions::Defaults();
492 auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
496 if (!record_batch->num_rows()) {
498 arrow::ipc::SerializeSchema(*record_batch->schema(),
499 arrow::default_memory_pool()));
503 return getWireResult();
505 return getShmResult();
513 ARROW_LOG(
"CPU") <<
"found " << dictionaries.size() <<
" dictionaries";
515 for (
auto& pair : dictionaries) {
516 arrow::ipc::IpcPayload payload;
517 int64_t dictionary_id = pair.first;
518 const auto& dictionary = pair.second;
521 GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
522 int32_t metadata_length = 0;
524 WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
526 result_params.serialized_dict = dict_stream->Finish().ValueOrDie();
529 arrow::ipc::SerializeSchema(*record_batch->schema(),
530 arrow::default_memory_pool()));
533 arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));
537 return getWireResult();
539 return getShmResult();
548 auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
550 auto out_stream = std::move(out_stream_result).ValueOrDie();
552 arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
553 arrow::ipc::DictionaryMemo current_memo;
554 arrow::ipc::DictionaryMemo serialized_memo;
556 arrow::ipc::IpcPayload schema_payload;
558 arrow::ipc::IpcWriteOptions::Defaults(),
561 int32_t schema_payload_length = 0;
563 arrow::ipc::IpcWriteOptions::Defaults(),
565 &schema_payload_length));
568 <<
"found dicts: " << dictionaries.size();
571 arrow::ipc::internal::CollectDictionaries(*record_batch, ¤t_memo));
574 std::shared_ptr<arrow::Schema> dummy_schema;
575 std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
577 for (
const auto& pair : dictionaries) {
578 arrow::ipc::IpcPayload payload;
579 const auto& dict_id = pair.first;
582 <<
"dict_id: " << dict_id;
583 const auto& dict = pair.second;
587 auto dummy_field = std::make_shared<arrow::Field>(
"", dict->type());
588 dummy_schema = std::make_shared<arrow::Schema>(
589 std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
591 dict_batches.emplace_back(
592 arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
595 if (!dict_batches.empty()) {
597 dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
600 auto complete_ipc_stream = out_stream->Finish();
602 auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
605 std::vector<char> schema_record_key_buffer(
sizeof(key_t), 0);
606 memcpy(&schema_record_key_buffer[0],
607 reinterpret_cast<const unsigned char*>(&record_key),
610 arrow::cuda::CudaDeviceManager* manager;
612 std::shared_ptr<arrow::cuda::CudaContext> context;
615 std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
617 SerializeRecordBatch(*record_batch, context.get()));
619 std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
622 std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
624 cuda_handle->Serialize(arrow::default_memory_pool()));
626 std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
627 memcpy(&record_handle_buffer[0],
628 serialized_cuda_handle->data(),
629 serialized_cuda_handle->size());
631 return {schema_record_key_buffer,
632 serialized_records->size(),
633 record_handle_buffer,
634 serialized_cuda_handle->size(),
635 serialized_cuda_handle->ToString()};
638 return {std::vector<char>{}, 0, std::vector<char>{}, 0,
""};
644 arrow::ipc::DictionaryFieldMapper* mapper)
const {
646 std::shared_ptr<arrow::RecordBatch> arrow_copy =
convertToArrow();
647 std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
651 arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
653 if (arrow_copy->num_rows()) {
657 arrow::ipc::SerializeRecordBatch(
658 *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
662 return {serialized_schema, serialized_records};
667 const auto col_count =
results_->colCount();
668 std::vector<std::shared_ptr<arrow::Field>> fields;
670 for (
size_t i = 0; i < col_count; ++i) {
671 const auto ti =
results_->getColType(i);
674 #if ARROW_CONVERTER_DEBUG
675 VLOG(1) <<
"Arrow fields: ";
676 for (
const auto&
f : fields) {
677 VLOG(1) <<
"\t" <<
f->ToString(
true);
684 const std::shared_ptr<arrow::Schema>& schema)
const {
685 std::vector<std::shared_ptr<arrow::Array>> result_columns;
694 const size_t entry_count =
top_n_ < 0
698 const auto col_count =
results_->colCount();
699 size_t row_count = 0;
701 result_columns.resize(col_count);
702 std::vector<ColumnBuilder> builders(col_count);
705 for (
size_t i = 0; i < col_count; ++i) {
710 auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
711 std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
712 const std::vector<bool>& non_lazy_cols,
713 const size_t start_entry,
714 const size_t end_entry) ->
size_t {
715 CHECK_EQ(value_seg.size(), col_count);
716 CHECK_EQ(null_bitmap_seg.size(), col_count);
717 const auto local_entry_count = end_entry - start_entry;
718 size_t seg_row_count = 0;
719 for (
size_t i = start_entry; i < end_entry; ++i) {
720 auto row =
results_->getRowAtNoTranslations(i, non_lazy_cols);
725 for (
size_t j = 0; j < col_count; ++j) {
726 if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
730 if (
auto scalar_value = boost::get<ScalarTargetValue>(&row[j])) {
733 const auto& column = builders[j];
734 switch (column.physical_type) {
736 create_or_append_value<bool, int64_t>(
737 *scalar_value, value_seg[j], local_entry_count);
738 create_or_append_validity<int64_t>(
739 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
742 create_or_append_value<int8_t, int64_t>(
743 *scalar_value, value_seg[j], local_entry_count);
744 create_or_append_validity<int64_t>(
745 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
748 create_or_append_value<int16_t, int64_t>(
749 *scalar_value, value_seg[j], local_entry_count);
750 create_or_append_validity<int64_t>(
751 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
754 create_or_append_value<int32_t, int64_t>(
755 *scalar_value, value_seg[j], local_entry_count);
756 create_or_append_validity<int64_t>(
757 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
760 create_or_append_value<int64_t, int64_t>(
761 *scalar_value, value_seg[j], local_entry_count);
762 create_or_append_validity<int64_t>(
763 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
766 create_or_append_value<int64_t, int64_t>(
767 *scalar_value, value_seg[j], local_entry_count);
768 create_or_append_validity<int64_t>(
769 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
772 create_or_append_value<float, float>(
773 *scalar_value, value_seg[j], local_entry_count);
774 create_or_append_validity<float>(
775 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
778 create_or_append_value<double, double>(
779 *scalar_value, value_seg[j], local_entry_count);
780 create_or_append_validity<double>(
781 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
784 create_or_append_value<int32_t, int64_t>(
785 *scalar_value, value_seg[j], local_entry_count);
786 create_or_append_validity<int64_t>(
787 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
791 ? create_or_append_value<int64_t, int64_t>(
792 *scalar_value, value_seg[j], local_entry_count)
793 : create_or_append_value<int32_t, int64_t>(
794 *scalar_value, value_seg[j], local_entry_count);
795 create_or_append_validity<int64_t>(
796 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
799 create_or_append_value<int64_t, int64_t>(
800 *scalar_value, value_seg[j], local_entry_count);
801 create_or_append_validity<int64_t>(
802 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
805 create_or_append_value<std::string, NullableString>(
806 *scalar_value, value_seg[j], local_entry_count);
807 create_or_append_validity<NullableString>(
808 *scalar_value, column.col_type, null_bitmap_seg[j], local_entry_count);
812 throw std::runtime_error(column.col_type.get_type_name() +
813 " is not supported in Arrow result sets.");
815 }
else if (
auto array = boost::get<ArrayTargetValue>(&row[j])) {
817 const auto& column = builders[j];
818 switch (column.col_type.get_subtype()) {
820 create_or_append_value<int8_t, int64_t>(
821 *array, value_seg[j], local_entry_count);
823 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
826 create_or_append_value<int8_t, int64_t>(
827 *array, value_seg[j], local_entry_count);
829 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
832 create_or_append_value<int16_t, int64_t>(
833 *array, value_seg[j], local_entry_count);
835 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
838 create_or_append_value<int32_t, int64_t>(
839 *array, value_seg[j], local_entry_count);
841 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
844 create_or_append_value<int64_t, int64_t>(
845 *array, value_seg[j], local_entry_count);
847 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
850 create_or_append_value<float, float>(
851 *array, value_seg[j], local_entry_count);
853 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
856 create_or_append_value<double, double>(
857 *array, value_seg[j], local_entry_count);
859 *array, column.col_type, null_bitmap_seg[j], local_entry_count);
862 throw std::runtime_error(column.col_type.get_type_name() +
863 " is not supported in Arrow result sets.");
868 return seg_row_count;
871 auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>&
result,
872 const std::vector<bool>& non_lazy_cols,
873 const size_t start_col,
874 const size_t end_col) {
875 for (
size_t col = start_col; col < end_col; ++col) {
876 if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
880 const auto& column = builders[col];
881 switch (column.physical_type) {
883 convert_column<int8_t>(
results_, col, entry_count, result[col]);
886 convert_column<int16_t>(
results_, col, entry_count, result[col]);
889 convert_column<int32_t>(
results_, col, entry_count, result[col]);
892 convert_column<int64_t>(
results_, col, entry_count, result[col]);
895 convert_column<float>(
results_, col, entry_count, result[col]);
898 convert_column<double>(
results_, col, entry_count, result[col]);
901 throw std::runtime_error(column.col_type.get_type_name() +
902 " is not supported in Arrow column converter.");
907 std::vector<std::shared_ptr<ValueArray>> column_values(col_count,
nullptr);
908 std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count,
nullptr);
909 const bool multithreaded = entry_count > 10000 && !
results_->isTruncated();
912 bool use_columnar_converter =
results_->isDirectColumnarConversionPossible() &&
913 (
results_->getQueryMemDesc().getQueryDescriptionType() ==
915 results_->getQueryMemDesc().getQueryDescriptionType() ==
917 entry_count ==
results_->entryCount();
918 std::vector<bool> non_lazy_cols;
919 if (use_columnar_converter) {
921 std::vector<size_t> non_lazy_col_pos;
922 size_t non_lazy_col_count = 0;
923 const auto& lazy_fetch_info =
results_->getLazyFetchInfo();
925 non_lazy_cols.reserve(col_count);
926 non_lazy_col_pos.reserve(col_count);
927 for (
size_t i = 0; i < col_count; ++i) {
929 lazy_fetch_info.empty() ?
false : lazy_fetch_info[i].is_lazily_fetched;
932 switch (builders[i].physical_type) {
942 if (builders[i].
field->type()->id() == arrow::Type::DICTIONARY) {
945 non_lazy_cols.emplace_back(!is_lazy);
947 ++non_lazy_col_count;
948 non_lazy_col_pos.emplace_back(i);
952 if (non_lazy_col_count == col_count) {
953 non_lazy_cols.clear();
954 non_lazy_col_pos.clear();
956 non_lazy_col_pos.emplace_back(col_count);
959 std::vector<std::future<void>> child_threads;
961 std::min(multithreaded ? (
size_t)
cpu_threads() : (
size_t)1, non_lazy_col_count);
963 size_t start_col = 0;
965 for (
size_t i = 0; i < num_threads; ++i) {
967 end_col = (i + 1) * non_lazy_col_count / num_threads;
968 size_t phys_start_col =
969 non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
970 size_t phys_end_col =
971 non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
974 std::ref(result_columns),
979 for (
auto& child : child_threads) {
982 row_count = entry_count;
984 if (!use_columnar_converter || !non_lazy_cols.empty()) {
989 std::vector<std::future<size_t>> child_threads;
990 std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
991 cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count,
nullptr));
992 std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
993 cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count,
nullptr));
994 const auto stride = (entry_count + cpu_count - 1) / cpu_count;
995 for (
size_t i = 0, start_entry = 0; start_entry < entry_count;
996 ++i, start_entry += stride) {
997 const auto end_entry = std::min(entry_count, start_entry + stride);
1000 std::ref(column_value_segs[i]),
1001 std::ref(null_bitmap_segs[i]),
1006 for (
auto& child : child_threads) {
1007 row_count += child.get();
1011 for (
int i = 0; i < schema->num_fields(); ++i) {
1012 if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1016 for (
size_t j = 0; j < cpu_count; ++j) {
1017 if (!column_value_segs[j][i]) {
1020 append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
1026 fetch(column_values, null_bitmaps, non_lazy_cols,
size_t(0), entry_count);
1028 auto timer =
DEBUG_TIMER(
"append rows to arrow single thread");
1029 for (
int i = 0; i < schema->num_fields(); ++i) {
1030 if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1034 append(builders[i], *column_values[i], null_bitmaps[i]);
1041 for (
size_t i = 0; i < col_count; ++i) {
1042 if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
1060 return arrow::boolean();
1062 return arrow::int8();
1064 return arrow::int16();
1066 return arrow::int32();
1068 return arrow::int64();
1070 return arrow::float32();
1072 return arrow::float64();
1077 auto value_type = std::make_shared<arrow::StringType>();
1078 return dictionary(arrow::int32(), value_type,
false);
1080 return arrow::utf8();
1085 return time32(arrow::TimeUnit::SECOND);
1091 return arrow::date64();
1093 return arrow::date32();
1099 return timestamp(arrow::TimeUnit::SECOND);
1101 return timestamp(arrow::TimeUnit::MILLI);
1103 return timestamp(arrow::TimeUnit::MICRO);
1105 return timestamp(arrow::TimeUnit::NANO);
1107 throw std::runtime_error(
1108 "Unsupported timestamp precision for Arrow result sets: " +
1114 return arrow::list(arrow::boolean());
1116 return arrow::list(arrow::int8());
1118 return arrow::list(arrow::int16());
1120 return arrow::list(arrow::int32());
1122 return arrow::list(arrow::int64());
1124 return arrow::list(arrow::float32());
1126 return arrow::list(arrow::float64());
1128 throw std::runtime_error(
"Unsupported array type for Arrow result sets: " +
1135 " is not supported in Arrow result sets.");
1143 const std::string
name,
1152 const size_t device_id,
1153 std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
1159 const key_t& schema_key = *(key_t*)(&result.
sm_handle[0]);
1160 auto shm_id = shmget(schema_key, result.
sm_size, 0666);
1162 throw std::runtime_error(
1163 "failed to get an valid shm ID w/ given shm key of the schema");
1165 if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1166 throw std::runtime_error(
"failed to deallocate Arrow schema on errorno(" +
1173 const key_t& df_key = *(key_t*)(&result.
df_handle[0]);
1174 auto shm_id = shmget(df_key, result.
df_size, 0666);
1176 throw std::runtime_error(
1177 "failed to get an valid shm ID w/ given shm key of the data");
1179 if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1180 throw std::runtime_error(
"failed to deallocate Arrow data frame");
1192 const size_t results_col_slot_idx,
1193 const std::shared_ptr<arrow::Field>&
field)
const {
1195 column_builder.
col_type = col_type;
1200 auto value_type = field->type();
1202 auto timer =
DEBUG_TIMER(
"Translate string dictionary to Arrow dictionary");
1203 column_builder.
builder.reset(
new arrow::StringDictionary32Builder());
1210 const size_t result_set_rows =
results_->rowCount();
1214 auto sdp =
results_->getStringDictionaryProxy(dict_id);
1215 const size_t dictionary_proxy_entries = sdp->entryCount();
1216 const double dictionary_to_result_size_ratio =
1217 static_cast<double>(dictionary_proxy_entries) / result_set_rows;
1233 const bool do_dictionary_bulk_fetch =
1235 dictionary_to_result_size_ratio <=
1238 arrow::StringBuilder str_array_builder;
1240 if (do_dictionary_bulk_fetch) {
1241 VLOG(1) <<
"Arrow dictionary creation: bulk copying all dictionary "
1242 <<
" entries for column at offset " << results_col_slot_idx <<
". "
1243 <<
"Column has " << dictionary_proxy_entries <<
" string entries"
1244 <<
" for a result set with " << result_set_rows <<
" rows.";
1247 auto str_list =
results_->getStringDictionaryPayloadCopy(dict_id);
1257 int32_t crt_transient_id =
static_cast<int32_t
>(str_list.size());
1258 auto const& transient_vecmap = sdp->getTransientVector();
1259 for (
unsigned index = 0; index < transient_vecmap.size(); ++index) {
1263 .insert(std::make_pair(old_id, crt_transient_id++))
1268 VLOG(1) <<
"Arrow dictionary creation: serializing unique result set dictionary "
1269 <<
" entries for column at offset " << results_col_slot_idx <<
". "
1270 <<
"Column has " << dictionary_proxy_entries <<
" string entries"
1271 <<
" for a result set with " << result_set_rows <<
" rows.";
1280 auto unique_ids_and_strings =
1281 results_->getUniqueStringsForDictEncodedTargetCol(results_col_slot_idx);
1282 const auto& unique_ids = unique_ids_and_strings.first;
1283 const auto& unique_strings = unique_ids_and_strings.second;
1285 const int32_t num_unique_strings = unique_strings.size();
1286 CHECK_EQ(num_unique_strings, unique_ids.size());
1290 for (int32_t unique_string_idx = 0; unique_string_idx < num_unique_strings;
1291 ++unique_string_idx) {
1294 .insert(std::make_pair(unique_ids[unique_string_idx], unique_string_idx))
1301 std::shared_ptr<arrow::StringArray> string_array;
1305 dynamic_cast<arrow::StringDictionary32Builder*
>(column_builder.
builder.get());
1306 CHECK(dict_builder);
1311 arrow::default_memory_pool(), value_type, &column_builder.
builder));
1317 std::shared_ptr<arrow::Array> values;
1324 template <
typename BUILDER_TYPE,
typename VALUE_ARRAY_TYPE>
1327 const std::shared_ptr<std::vector<bool>>& is_valid) {
1328 static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
1329 "Dictionary encoded string builder requires function specialization.");
1331 std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);
1333 if (scale_epoch_values<BUILDER_TYPE>()) {
1334 auto scale_sec_to_millisec = [](
auto seconds) {
return seconds *
kMilliSecsPerSec; };
1335 auto scale_values = [&](
auto epoch) {
1336 return std::is_same<BUILDER_TYPE, arrow::Date32Builder>::value
1338 : scale_sec_to_millisec(epoch);
1340 std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
1343 auto typed_builder =
dynamic_cast<BUILDER_TYPE*
>(column_builder.
builder.get());
1344 CHECK(typed_builder);
1345 if (column_builder.
field->nullable()) {
1346 CHECK(is_valid.get());
1354 void appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1357 const std::shared_ptr<std::vector<bool>>& is_valid) {
1358 std::vector<int64_t> vals = boost::get<std::vector<int64_t>>(values);
1359 auto typed_builder =
1360 dynamic_cast<arrow::Decimal128Builder*
>(column_builder.builder.get());
1361 CHECK(typed_builder);
1362 CHECK_EQ(is_valid->size(), vals.size());
1363 if (column_builder.field->nullable()) {
1364 CHECK(is_valid.get());
1365 for (
size_t i = 0; i < vals.size(); i++) {
1366 const auto v = vals[i];
1367 const auto valid = (*is_valid)[i];
1375 for (
const auto& v : vals) {
1382 void appendToColumnBuilder<arrow::StringBuilder, std::string>(
1385 const std::shared_ptr<std::vector<bool>>& is_valid) {
1386 std::vector<std::string> vals = boost::get<std::vector<std::string>>(values);
1387 auto typed_builder =
dynamic_cast<arrow::StringBuilder*
>(column_builder.builder.get());
1388 CHECK(typed_builder);
1389 CHECK_EQ(is_valid->size(), vals.size());
1391 if (column_builder.field->nullable()) {
1392 CHECK(is_valid.get());
1395 std::vector<uint8_t> transformed_bitmap;
1396 transformed_bitmap.reserve(is_valid->size());
1398 is_valid->begin(), is_valid->end(), [&transformed_bitmap](
const bool is_valid) {
1399 transformed_bitmap.push_back(is_valid ? 1 : 0);
1408 void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1411 const std::shared_ptr<std::vector<bool>>& is_valid) {
1412 auto typed_builder =
1413 dynamic_cast<arrow::StringDictionary32Builder*
>(column_builder.builder.get());
1414 CHECK(typed_builder);
1416 std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);
1420 for (
size_t i = 0; i < vals.size(); i++) {
1421 auto& val = vals[i];
1425 vals[i] = column_builder.string_remapping.at(val);
1429 if (column_builder.field->nullable()) {
1430 CHECK(is_valid.get());
1432 std::vector<uint8_t> transformed_bitmap;
1433 transformed_bitmap.reserve(is_valid->size());
1435 is_valid->begin(), is_valid->end(), [&transformed_bitmap](
const bool is_valid) {
1436 transformed_bitmap.push_back(is_valid ? 1 : 0);
1440 vals.data(),
static_cast<int64_t
>(vals.size()), transformed_bitmap.data()));
1443 typed_builder->AppendIndices(vals.data(),
static_cast<int64_t
>(vals.size())));
1447 template <
typename BUILDER_TYPE,
typename VALUE_TYPE>
1450 const std::shared_ptr<std::vector<bool>>& is_valid) {
1452 auto list_builder =
dynamic_cast<arrow::ListBuilder*
>(column_builder.
builder.get());
1453 CHECK(list_builder);
1455 auto value_builder =
static_cast<BUILDER_TYPE*
>(list_builder->value_builder());
1457 if (column_builder.
field->nullable()) {
1458 for (
size_t i = 0; i < vals.size(); i++) {
1459 if ((*is_valid)[i]) {
1460 const auto& val = vals[i];
1461 std::vector<uint8_t> bitmap(val.size());
1462 std::transform(val.begin(), val.end(), bitmap.begin(), [](VALUE_TYPE pvalue) {
1466 if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1467 std::vector<uint8_t> bval(val.size());
1468 std::copy(val.begin(), val.end(), bval.begin());
1470 value_builder->AppendValues(bval.data(), bval.size(), bitmap.data()));
1473 value_builder->AppendValues(val.data(), val.size(), bitmap.data()));
1480 for (
size_t i = 0; i < vals.size(); i++) {
1481 if ((*is_valid)[i]) {
1482 const auto& val = vals[i];
1484 if constexpr (std::is_same_v<BUILDER_TYPE, arrow::BooleanBuilder>) {
1485 std::vector<uint8_t> bval(val.size());
1486 std::copy(val.begin(), val.end(), bval.begin());
// Const member that forwards (column_builder, values, is_valid) to the
// appendToColumnBuilder / appendToListColumnBuilder instantiation matching the
// column's type.
//
// Throws std::runtime_error ("<type name> is not supported in Arrow result
// sets.") for types with no mapping.
//
// NOTE(review): the start of the signature and the switch/case labels are
// elided in this listing; which SQL type selects each call below is therefore
// not visible here and is inferred only from the builder names — confirm
// against the full source.
1503 const std::shared_ptr<std::vector<bool>>& is_valid)
const {
// Dictionary-encoded strings: 32-bit dictionary indices.
1507 appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1508 column_builder, values, is_valid);
// Scalar columns: one instantiation per Arrow builder / C++ value type pair.
1513 appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1514 column_builder, values, is_valid);
1517 appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1520 appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1521 column_builder, values, is_valid);
1524 appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1525 column_builder, values, is_valid);
1528 appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1529 column_builder, values, is_valid);
// Decimal128 builder is fed from int64_t values.
1532 appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1533 column_builder, values, is_valid);
1536 appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1539 appendToColumnBuilder<arrow::DoubleBuilder, double>(
1540 column_builder, values, is_valid);
1543 appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1544 column_builder, values, is_valid);
1547 appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1548 column_builder, values, is_valid);
// Dates go to either a 64-bit or a 32-bit date builder; the selecting
// condition is on an elided line.
1552 ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1553 column_builder, values, is_valid)
1554 : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1555 column_builder, values, is_valid);
// List columns: dispatch on the element subtype of the column type.
// Boolean lists are instantiated with int8_t as the value type.
1558 if (column_builder.col_type.get_subtype() ==
kBOOLEAN) {
1559 appendToListColumnBuilder<arrow::BooleanBuilder, int8_t>(
1560 column_builder, values, is_valid);
1562 }
else if (column_builder.col_type.get_subtype() ==
kTINYINT) {
1563 appendToListColumnBuilder<arrow::Int8Builder, int8_t>(
1564 column_builder, values, is_valid);
1566 }
else if (column_builder.col_type.get_subtype() ==
kSMALLINT) {
1567 appendToListColumnBuilder<arrow::Int16Builder, int16_t>(
1568 column_builder, values, is_valid);
1570 }
else if (column_builder.col_type.get_subtype() ==
kINT) {
1571 appendToListColumnBuilder<arrow::Int32Builder, int32_t>(
1572 column_builder, values, is_valid);
1574 }
else if (column_builder.col_type.get_subtype() ==
kBIGINT) {
1575 appendToListColumnBuilder<arrow::Int64Builder, int64_t>(
1576 column_builder, values, is_valid);
1578 }
else if (column_builder.col_type.get_subtype() ==
kFLOAT) {
1579 appendToListColumnBuilder<arrow::FloatBuilder, float>(
1580 column_builder, values, is_valid);
1582 }
else if (column_builder.col_type.get_subtype() ==
kDOUBLE) {
1583 appendToListColumnBuilder<arrow::DoubleBuilder, double>(
1584 column_builder, values, is_valid);
// No list mapping matched any subtype tested above: reject, naming the column type.
1587 throw std::runtime_error(column_builder.col_type.get_type_name() +
1588 " is not supported in Arrow result sets.");
// Plain (non-dictionary) string values.
1593 appendToColumnBuilder<arrow::StringBuilder, std::string>(
1594 column_builder, values, is_valid);
// Fallback for any column type without an Arrow mapping.
1598 throw std::runtime_error(column_builder.col_type.get_type_name() +
1599 " is not supported in Arrow result sets.");
void create_or_append_value(const ScalarTargetValue &val_cty, std::shared_ptr< ValueArray > &values, const size_t max_size)
HOST DEVICE SQLTypes get_subtype() const
ArrowStringRemapMode string_remap_mode
std::vector< std::vector< T >> Vec2
const size_t min_result_size_for_bulk_dictionary_fetch_
std::unique_ptr< arrow::ArrayBuilder > builder
HOST DEVICE int get_size() const
#define ARROW_THROW_NOT_OK(s)
SQLTypes get_dict_index_type(const SQLTypeInfo &ti)
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
void appendToListColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
void convert_column(ResultSetPtr result, size_t col, size_t entry_count, std::shared_ptr< arrow::Array > &out)
ArrowResult getArrowResult() const
std::vector< char > sm_handle
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const size_t result_col_idx, const std::shared_ptr< arrow::Field > &field) const
ResultSetBuffer(const uint8_t *buf, size_t size, ResultSetPtr rs)
const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid)
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
#define ARROW_LOG(category)
HOST DEVICE int get_scale() const
typename std::make_signed< TYPE >::type type
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
std::shared_ptr< arrow::Field > field
std::shared_ptr< ResultSet > ResultSetPtr
std::pair< key_t, std::shared_ptr< arrow::Buffer > > get_shm_buffer(size_t size)
HOST DEVICE SQLTypes get_type() const
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
std::pair< key_t, void * > get_shm(size_t shmsz)
ArrowTransport transport_method_
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
future< Result > async(Fn &&fn, Args &&...args)
#define ARROW_RECORDBATCH_MAKE
std::vector< std::string > col_names_
DEVICE auto copy(ARGS &&...args)
ExecutorDeviceType device_type_
boost::optional< std::vector< ScalarTargetValue >> ArrayTargetValue
std::vector< char > df_handle
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
static int32_t transientIndexToId(unsigned const index)
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryFieldMapper *mapper) const
OUTPUT transform(INPUT const &input, FUNC const &func)
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< arrow::Decimal128 >, std::vector< float >, std::vector< double >, std::vector< std::vector< int8_t >>, std::vector< std::vector< int16_t >>, std::vector< std::vector< int32_t >>, std::vector< std::vector< int64_t >>, std::vector< std::vector< float >>, std::vector< std::vector< double >>, std::vector< std::string >> ValueArray
int get_precision() const
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
std::unordered_map< StrId, ArrowStrId > string_remapping
std::shared_ptr< ResultSet > results_
std::string get_type_name() const
typename null_type< TYPE >::type null_type_t
void create_or_append_validity(const ArrayTargetValue &value, const SQLTypeInfo &col_type, std::shared_ptr< std::vector< bool >> &null_bitmap, const size_t max_size)
HOST DEVICE int get_comp_param() const
#define DEBUG_TIMER(name)
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
int64_t get_epoch_days_from_seconds(const int64_t seconds)
bool is_dict_encoded_string() const
HOST DEVICE bool get_notnull() const
boost::variant< int64_t, double, float, NullableString > ScalarTargetValue
std::shared_ptr< arrow::RecordBatch > convertToArrow() const