17 #include "../Shared/DateConverters.h"
24 #include <sys/types.h>
38 #include "arrow/api.h"
39 #include "arrow/io/memory.h"
40 #include "arrow/ipc/api.h"
45 #include <arrow/gpu/cuda_api.h>
49 #define ARROW_RECORDBATCH_MAKE arrow::RecordBatch::Make
51 using namespace arrow;
template <typename TYPE, typename VALUE_ARRAY_TYPE>
void create_or_append_value(const ScalarTargetValue& val_cty,
                            std::shared_ptr<ValueArray>& values,
                            const size_t max_size) {
  auto pval_cty = boost::get<VALUE_ARRAY_TYPE>(&val_cty);
  CHECK(pval_cty);
  auto val_ty = static_cast<TYPE>(*pval_cty);
  if (!values) {
    values = std::make_shared<ValueArray>(std::vector<TYPE>());
    boost::get<std::vector<TYPE>>(*values).reserve(max_size);
  }
  auto values_ty = boost::get<std::vector<TYPE>>(values.get());
  CHECK(values_ty);
  values_ty->push_back(val_ty);
}
template <typename TYPE>
void create_or_append_validity(const ScalarTargetValue& value,
                               const SQLTypeInfo& col_type,
                               std::shared_ptr<std::vector<bool>>& null_bitmap,
                               const size_t max_size) {
  if (col_type.get_notnull()) {
    CHECK(!null_bitmap);
    return;
  }
  auto pvalue = boost::get<TYPE>(&value);
  CHECK(pvalue);
  bool is_valid = false;
  if (col_type.is_boolean() || col_type.is_integer() || col_type.is_time() ||
      col_type.is_dict_encoded_string()) {
    // Compare against the inline integer null sentinel for this type.
    is_valid = inline_int_null_val(col_type) != static_cast<int64_t>(*pvalue);
  } else if (col_type.is_fp()) {
    is_valid = inline_fp_null_val(col_type) != static_cast<double>(*pvalue);
  }
  if (!null_bitmap) {
    null_bitmap = std::make_shared<std::vector<bool>>();
    null_bitmap->reserve(max_size);
  }
  null_bitmap->push_back(is_valid);
}
template <typename TYPE, typename enable = void>
struct null_type {};

template <typename TYPE>
struct null_type<TYPE, std::enable_if_t<std::is_integral<TYPE>::value>> {
  using type = typename std::make_signed<TYPE>::type;
  static constexpr type value = inline_int_null_value<type>();
};

template <typename TYPE>
struct null_type<TYPE, std::enable_if_t<std::is_floating_point<TYPE>::value>> {
  using type = TYPE;
  static constexpr type value = inline_fp_null_value<type>();
};

template <typename TYPE>
using null_type_t = typename null_type<TYPE>::type;
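// Aside: the traits above encode NULLs "in band" as sentinel values of the column's
// own storage type. Below is a minimal, self-contained sketch of the same idea using
// hypothetical names (example_sentinel_null) and std::numeric_limits in place of the
// project's inline_*_null_value() helpers; it is illustrative only and not used here.

#include <cstdint>
#include <limits>
#include <type_traits>

template <typename T, typename Enable = void>
struct example_sentinel_null {};

template <typename T>
struct example_sentinel_null<T, std::enable_if_t<std::is_integral<T>::value>> {
  using type = typename std::make_signed<T>::type;
  static constexpr type value = std::numeric_limits<type>::min();
};

template <typename T>
struct example_sentinel_null<T, std::enable_if_t<std::is_floating_point<T>::value>> {
  using type = T;
  static constexpr type value = std::numeric_limits<type>::max();
};

static_assert(example_sentinel_null<int32_t>::value ==
                  std::numeric_limits<int32_t>::min(),
              "integral sentinel is the minimum representable value");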
template <typename C_TYPE,
          typename ARROW_TYPE = typename CTypeTraits<C_TYPE>::ArrowType>
void convert_column(ResultSetPtr result,
                    size_t col,
                    std::unique_ptr<int8_t[]>& values,
                    std::unique_ptr<uint8_t[]>& is_valid,
                    size_t entry_count,
                    std::shared_ptr<Array>& out) {
  CHECK(sizeof(C_TYPE) == result->getColType(col).get_size());

  const int8_t* data_ptr;
  if (result->isZeroCopyColumnarConversionPossible(col)) {
    // Borrow the columnar buffer directly; no copy is needed.
    data_ptr = result->getColumnarBuffer(col);
  } else {
    values.reset(new int8_t[entry_count * sizeof(C_TYPE)]);
    result->copyColumnIntoBuffer(col, values.get(), entry_count * sizeof(C_TYPE));
    data_ptr = values.get();
  }
  int64_t null_count = 0;
  is_valid.reset(new uint8_t[(entry_count + 7) / 8]);

  // Pack one validity bit per entry (LSB-first within each byte), eight entries at a time.
  const null_type_t<C_TYPE>* vals =
      reinterpret_cast<const null_type_t<C_TYPE>*>(data_ptr);
  null_type_t<C_TYPE> null_val = null_type<C_TYPE>::value;

  size_t unroll_count = entry_count & 0xFFFFFFFFFFFFFFF8ULL;
  for (size_t i = 0; i < unroll_count; i += 8) {
    uint8_t valid_byte = 0;
    uint8_t valid;
    valid = vals[i + 0] != null_val;
    valid_byte |= valid << 0;
    null_count += !valid;
    valid = vals[i + 1] != null_val;
    valid_byte |= valid << 1;
    null_count += !valid;
    valid = vals[i + 2] != null_val;
    valid_byte |= valid << 2;
    null_count += !valid;
    valid = vals[i + 3] != null_val;
    valid_byte |= valid << 3;
    null_count += !valid;
    valid = vals[i + 4] != null_val;
    valid_byte |= valid << 4;
    null_count += !valid;
    valid = vals[i + 5] != null_val;
    valid_byte |= valid << 5;
    null_count += !valid;
    valid = vals[i + 6] != null_val;
    valid_byte |= valid << 6;
    null_count += !valid;
    valid = vals[i + 7] != null_val;
    valid_byte |= valid << 7;
    null_count += !valid;
    is_valid[i >> 3] = valid_byte;
  }
  if (unroll_count != entry_count) {
    uint8_t valid_byte = 0;
    for (size_t i = unroll_count; i < entry_count; ++i) {
      bool valid = vals[i] != null_val;
      valid_byte |= valid << (i & 7);
      null_count += !valid;
    }
    is_valid[unroll_count >> 3] = valid_byte;
  }
  // Wrap the raw column buffer (and the validity bitmap, if any nulls were found)
  // without copying, and hand both to Arrow.
  std::shared_ptr<Buffer> data(new Buffer(reinterpret_cast<const uint8_t*>(data_ptr),
                                          entry_count * sizeof(C_TYPE)));
  if (null_count) {
    std::shared_ptr<Buffer> null_bitmap(new Buffer(is_valid.get(), (entry_count + 7) / 8));
    out.reset(new NumericArray<ARROW_TYPE>(entry_count, data, null_bitmap, null_count));
  } else {
    out.reset(new NumericArray<ARROW_TYPE>(entry_count, data));
  }
}
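// Aside: Arrow validity bitmaps store one bit per value, LSB-first within each byte
// (bit i of byte i/8 corresponds to entry i). A compact, self-contained sketch of the
// packing done above, without the manual 8-way unrolling (hypothetical helper name):

#include <cstdint>
#include <vector>

template <typename T>
std::vector<uint8_t> example_pack_validity(const T* vals,
                                           size_t n,
                                           T null_sentinel,
                                           int64_t& null_count) {
  std::vector<uint8_t> bitmap((n + 7) / 8, 0);
  null_count = 0;
  for (size_t i = 0; i < n; ++i) {
    const bool valid = vals[i] != null_sentinel;
    bitmap[i >> 3] |= static_cast<uint8_t>(valid) << (i & 7);
    null_count += !valid;
  }
  return bitmap;
}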
std::pair<key_t, void*> get_shm(size_t shmsz) {
  if (!shmsz) {
    return std::make_pair(IPC_PRIVATE, nullptr);
  }
  // Pick a random key and retry until shmget() creates a fresh segment.
  auto key = static_cast<key_t>(rand());
  int shmid = -1;
  while ((shmid = shmget(key, shmsz, IPC_CREAT | IPC_EXCL | 0666)) < 0) {
    // Retry with a new key only for the expected "key already taken / unusable"
    // errors; anything else is fatal.
    if (errno != EEXIST && errno != EACCES && errno != EINVAL && errno != ENOENT) {
      throw std::runtime_error("failed to create a shared memory");
    }
    key = static_cast<key_t>(rand());
  }
  // Attach the segment to this process's address space.
  auto ipc_ptr = shmat(shmid, NULL, 0);
  if (reinterpret_cast<int64_t>(ipc_ptr) == -1) {
    throw std::runtime_error("failed to attach a shared memory");
  }
  return std::make_pair(key, ipc_ptr);
}
// Allocate a shared-memory segment of the given size and wrap it in an Arrow buffer.
std::pair<key_t, std::shared_ptr<Buffer>> get_shm_buffer(size_t size) {
#ifdef _MSC_VER
  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
  return std::make_pair(0, nullptr);
#else
  auto [key, ipc_ptr] = get_shm(size);
  std::shared_ptr<Buffer> buffer(new MutableBuffer(static_cast<uint8_t*>(ipc_ptr), size));
  return std::make_pair<key_t, std::shared_ptr<Buffer>>(std::move(key), std::move(buffer));
#endif
}

// Allocate a shared-memory segment and copy an existing Arrow buffer into it.
key_t get_and_copy_to_shm(const std::shared_ptr<Buffer>& data) {
#ifdef _MSC_VER
  throw std::runtime_error("Arrow IPC not yet supported on Windows.");
#else
  auto [key, ipc_ptr] = get_shm(data->size());
  // Copy the Arrow buffer into the shared memory segment.
  memcpy(ipc_ptr, data->data(), data->size());
  return key;
#endif
}
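// Aside: the consumer side of this System V shared-memory handoff attaches to the
// segment by key, reads the bytes, and detaches. A self-contained sketch (hypothetical
// function; error handling reduced to return codes):

#include <sys/ipc.h>
#include <sys/shm.h>
#include <cstring>
#include <vector>

inline bool example_read_from_shm(key_t key, size_t size, std::vector<char>& out) {
  const int shmid = shmget(key, size, 0666);  // look up an existing segment
  if (shmid < 0) {
    return false;
  }
  void* ptr = shmat(shmid, nullptr, SHM_RDONLY);  // map it read-only
  if (ptr == reinterpret_cast<void*>(-1)) {
    return false;
  }
  out.assign(static_cast<const char*>(ptr), static_cast<const char*>(ptr) + size);
  shmdt(ptr);  // detach; the producer (or deallocateArrowResultBuffer) removes it
  return true;
}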
ArrowResult ArrowResultSetConverter::getArrowResult() const {
  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();

  // Serialize schema, dictionaries, and records into a single contiguous buffer
  // that can be shipped over the wire.
  const auto getWireResult =
      [&](const int64_t schema_size,
          const int64_t dict_size,
          const int64_t records_size,
          const std::shared_ptr<Buffer>& serialized_schema,
          const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
    auto timer = DEBUG_TIMER("serialize batch to wire");
    std::vector<char> schema_handle_data;
    std::vector<char> record_handle_data;
    const int64_t total_size = schema_size + records_size + dict_size;
    record_handle_data.insert(record_handle_data.end(),
                              serialized_schema->data(),
                              serialized_schema->data() + schema_size);
    record_handle_data.insert(record_handle_data.end(),
                              serialized_dict->data(),
                              serialized_dict->data() + dict_size);
    record_handle_data.resize(total_size);
    auto serialized_records =
        arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);

    io::FixedSizeBufferWriter stream(
        SliceMutableBuffer(serialized_records, schema_size + dict_size));
    ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
        *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));

    return {std::vector<char>(0),
            // ... (field elided in this listing) ...
            std::vector<char>(0),
            serialized_records->size(),
            // ... (remaining ArrowResult fields elided) ...
    };
  };
  // Serialize schema, dictionaries, and records into a shared-memory segment and
  // return the segment key so another process can attach to it.
  const auto getShmResult =
      [&](const int64_t schema_size,
          const int64_t dict_size,
          const int64_t records_size,
          const std::shared_ptr<Buffer>& serialized_schema,
          const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
    auto timer = DEBUG_TIMER("serialize batch to shared memory");
    std::shared_ptr<Buffer> serialized_records;
    std::vector<char> schema_handle_buffer;
    std::vector<char> record_handle_buffer(sizeof(key_t), 0);
    key_t records_shm_key = IPC_PRIVATE;
    const int64_t total_size = schema_size + records_size + dict_size;

    std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);

    memcpy(serialized_records->mutable_data(),
           serialized_schema->data(),
           (size_t)schema_size);
    memcpy(serialized_records->mutable_data() + schema_size,
           serialized_dict->data(),
           (size_t)dict_size);

    io::FixedSizeBufferWriter stream(
        SliceMutableBuffer(serialized_records, schema_size + dict_size));
    ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
        *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
    memcpy(&record_handle_buffer[0],
           reinterpret_cast<const unsigned char*>(&records_shm_key),
           sizeof(key_t));

    return {schema_handle_buffer,
            // ... (field elided in this listing) ...
            record_handle_buffer,
            serialized_records->size(),
            // ... (remaining ArrowResult fields elided) ...
    };
  };
  std::shared_ptr<Buffer> serialized_schema;
  int64_t records_size = 0;
  int64_t schema_size = 0;
  ipc::DictionaryMemo memo;
  auto options = ipc::IpcWriteOptions::Defaults();
  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();

  // ... (dictionaries referenced by the record batch are collected into `memo` here) ...
  // Write each dictionary into its own IPC payload on the dictionary stream.
  for (auto& pair : memo.dictionaries()) {
    ipc::IpcPayload payload;
    int64_t dictionary_id = pair.first;
    const auto& dictionary = pair.second;

    ARROW_THROW_NOT_OK(
        GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
    int32_t metadata_length = 0;
    ARROW_THROW_NOT_OK(
        WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
  }
  auto serialized_dict = dict_stream->Finish().ValueOrDie();
  auto dict_size = serialized_dict->size();

  ARROW_ASSIGN_OR_THROW(serialized_schema,
                        ipc::SerializeSchema(*record_batch->schema(),
                                             nullptr,
                                             default_memory_pool()));
  schema_size = serialized_schema->size();

  switch (transport_method_) {
    case ArrowTransport::WIRE:
      return getWireResult(
          schema_size, dict_size, records_size, serialized_schema, serialized_dict);
    case ArrowTransport::SHARED_MEMORY:
      return getShmResult(
          schema_size, dict_size, records_size, serialized_schema, serialized_dict);
    default:
      UNREACHABLE();
  }
#ifdef HAVE_CUDA
  // GPU transport: the schema and any dictionary batches are written into a host-side
  // IPC stream (placed in shared memory), while the record batch itself is serialized
  // into device memory and exported through a CUDA IPC handle.
  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
  auto out_stream = std::move(out_stream_result).ValueOrDie();

  arrow::ipc::DictionaryMemo current_memo;
  arrow::ipc::DictionaryMemo serialized_memo;

  arrow::ipc::IpcPayload schema_payload;
  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
                                                  arrow::ipc::IpcWriteOptions::Defaults(),
                                                  &serialized_memo,
                                                  &schema_payload));
  int32_t schema_payload_length = 0;
  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
                                                 arrow::ipc::IpcWriteOptions::Defaults(),
                                                 out_stream.get(),
                                                 &schema_payload_length));

  // Gather dictionaries for dictionary-encoded columns into single-column record
  // batches so they can travel alongside the schema.
  std::shared_ptr<arrow::Schema> dummy_schema;
  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
  for (int i = 0; i < record_batch->schema()->num_fields(); i++) {
    auto field = record_batch->schema()->field(i);
    if (field->type()->id() == arrow::Type::DICTIONARY) {
      int64_t dict_id = -1;
      // ... (the dictionary id and values for this field are looked up here) ...
      std::shared_ptr<Array> dict;

      auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
      dummy_schema = std::make_shared<arrow::Schema>(
          std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
      dict_batches.emplace_back(
          arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
    }
  }
  if (!dict_batches.empty()) {
    ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
        dict_batches, ipc::IpcWriteOptions::Defaults(), out_stream.get()));
  }
  auto complete_ipc_stream = out_stream->Finish();
  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();

  const auto record_key = get_and_copy_to_shm(serialized_records);
  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
  memcpy(&schema_record_key_buffer[0],
         reinterpret_cast<const unsigned char*>(&record_key),
         sizeof(key_t));

  arrow::cuda::CudaDeviceManager* manager;
  std::shared_ptr<arrow::cuda::CudaContext> context;
  // ... (the CUDA device manager and a context for the target device are obtained here) ...

  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
  ARROW_ASSIGN_OR_THROW(device_serialized,
                        SerializeRecordBatch(*record_batch, context.get()));

  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
  // ... (a CUDA IPC handle is exported for the device buffer here) ...

  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
                        cuda_handle->Serialize(arrow::default_memory_pool()));

  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
  memcpy(&record_handle_buffer[0],
         serialized_cuda_handle->data(),
         serialized_cuda_handle->size());

  return {schema_record_key_buffer,
          serialized_records->size(),
          record_handle_buffer,
          serialized_cuda_handle->size(),
          serialized_cuda_handle->ToString()};
#endif
  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
}
SerializedArrowOutput ArrowResultSetConverter::getSerializedArrowOutput(
    arrow::ipc::DictionaryMemo* memo) const {
  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;

  ARROW_ASSIGN_OR_THROW(serialized_schema,
                        arrow::ipc::SerializeSchema(
                            *arrow_copy->schema(), memo, arrow::default_memory_pool()));

  if (arrow_copy->num_rows()) {
    ARROW_ASSIGN_OR_THROW(serialized_records,
                          arrow::ipc::SerializeRecordBatch(
                              *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
  }
  return {serialized_schema, serialized_records};
}
std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::convertToArrow() const {
  const auto col_count = results_->colCount();
  std::vector<std::shared_ptr<arrow::Field>> fields;
  CHECK(col_names_.empty() || col_names_.size() == col_count);
  for (size_t i = 0; i < col_count; ++i) {
    const auto ti = results_->getColType(i);
    fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
  }
  return getArrowBatch(arrow::schema(fields));
}
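// Aside: a small sketch of what makeField() produces for Arrow: a named field with a
// data type and a nullability flag, gathered into a schema (example values only; the
// helper name is local to this example):

inline std::shared_ptr<arrow::Schema> example_two_column_schema() {
  auto id_field = arrow::field("id", arrow::int64(), /*nullable=*/false);
  auto name_field =
      arrow::field("name", arrow::dictionary(arrow::int32(), arrow::utf8()));
  return arrow::schema({id_field, name_field});
}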
std::shared_ptr<arrow::RecordBatch> ArrowResultSetConverter::getArrowBatch(
    const std::shared_ptr<arrow::Schema>& schema) const {
  std::vector<std::shared_ptr<arrow::Array>> result_columns;

  const size_t entry_count = top_n_ < 0
                                 ? results_->entryCount()
                                 : std::min(size_t(top_n_), results_->entryCount());

  const auto col_count = results_->colCount();
  size_t row_count = 0;

  result_columns.resize(col_count);
  std::vector<ColumnBuilder> builders(col_count);

  // Create array builders
  for (size_t i = 0; i < col_count; ++i) {
    initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
  }
  // Fetch a segment of rows [start_entry, end_entry) and append each value to the
  // per-column accumulators and null bitmaps.
  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
                   std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
                   const std::vector<bool>& non_lazy_cols,
                   const size_t start_entry,
                   const size_t end_entry) -> size_t {
    CHECK_EQ(value_seg.size(), col_count);
    CHECK_EQ(null_bitmap_seg.size(), col_count);
    const auto entry_count = end_entry - start_entry;
    size_t seg_row_count = 0;
    for (size_t i = start_entry; i < end_entry; ++i) {
      auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
      if (row.empty()) {
        continue;
      }
      ++seg_row_count;
      for (size_t j = 0; j < col_count; ++j) {
        if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
          continue;
        }
        auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
        CHECK(scalar_value);
        const auto& column = builders[j];
        switch (column.physical_type) {
          case kBOOLEAN:
            create_or_append_value<bool, int64_t>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTINYINT:
            create_or_append_value<int8_t, int64_t>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kSMALLINT:
            create_or_append_value<int16_t, int64_t>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kINT:
            create_or_append_value<int32_t, int64_t>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kBIGINT:
            create_or_append_value<int64_t, int64_t>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kFLOAT:
            create_or_append_value<float, float>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<float>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDOUBLE:
            create_or_append_value<double, double>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<double>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTEXT:
            // Dictionary-encoded strings are appended as 32-bit dictionary indices.
            create_or_append_value<int32_t, int64_t>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kDATE:
            device_type_ == ExecutorDeviceType::GPU
                ? create_or_append_value<int64_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count)
                : create_or_append_value<int32_t, int64_t>(
                      *scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          case kTIME:
          case kTIMESTAMP:
            create_or_append_value<int64_t, int64_t>(*scalar_value, value_seg[j], entry_count);
            create_or_append_validity<int64_t>(
                *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
            break;
          default:
            throw std::runtime_error(column.col_type.get_type_name() +
                                     " is not supported in Arrow result sets.");
        }
      }
    }
    return seg_row_count;
  };
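  // Aside: row values arrive as boost::variant (TargetValue / ScalarTargetValue), and
  // boost::get<T>(&v) yields a pointer that is null when the variant holds a different
  // alternative. A minimal local sketch of that access pattern (illustrative only;
  // assumes boost/variant.hpp is already pulled in by this file's headers):
  [[maybe_unused]] auto example_variant_to_double =
      [](const boost::variant<int64_t, double, float>& v) -> double {
    if (auto pd = boost::get<double>(&v)) {
      return *pd;  // the variant currently holds a double
    }
    if (auto pf = boost::get<float>(&v)) {
      return static_cast<double>(*pf);
    }
    return static_cast<double>(boost::get<int64_t>(v));  // throws bad_get otherwise
  };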
  // Columnar (zero-copy or buffer-copy) conversion of a range of columns.
  auto convert_columns = [&](std::vector<std::unique_ptr<int8_t[]>>& values,
                             std::vector<std::unique_ptr<uint8_t[]>>& is_valid,
                             std::vector<std::shared_ptr<arrow::Array>>& result,
                             const std::vector<bool>& non_lazy_cols,
                             const size_t start_col,
                             const size_t end_col) {
    for (size_t col = start_col; col < end_col; ++col) {
      if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
        continue;
      }
      const auto& column = builders[col];
      switch (column.physical_type) {
        case kTINYINT:
          convert_column<int8_t>(
              results_, col, values[col], is_valid[col], entry_count, result[col]);
          break;
        case kSMALLINT:
          convert_column<int16_t>(
              results_, col, values[col], is_valid[col], entry_count, result[col]);
          break;
        case kINT:
          convert_column<int32_t>(
              results_, col, values[col], is_valid[col], entry_count, result[col]);
          break;
        case kBIGINT:
          convert_column<int64_t>(
              results_, col, values[col], is_valid[col], entry_count, result[col]);
          break;
        case kFLOAT:
          convert_column<float>(
              results_, col, values[col], is_valid[col], entry_count, result[col]);
          break;
        case kDOUBLE:
          convert_column<double>(
              results_, col, values[col], is_valid[col], entry_count, result[col]);
          break;
        default:
          throw std::runtime_error(column.col_type.get_type_name() +
                                   " is not supported in Arrow column converter.");
      }
    }
  };
  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
                                results_->getQueryMemDesc().getQueryDescriptionType() ==
                                    QueryDescriptionType::Projection &&
                                entry_count == results_->entryCount();
  std::vector<bool> non_lazy_cols;
  if (use_columnar_converter) {
    std::vector<size_t> non_lazy_col_pos;
    size_t non_lazy_col_count = 0;
    const auto& lazy_fetch_info = results_->getLazyFetchInfo();

    non_lazy_cols.reserve(col_count);
    non_lazy_col_pos.reserve(col_count);
    for (size_t i = 0; i < col_count; ++i) {
      bool is_lazy =
          lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
      // Types the columnar converter cannot handle yet are treated as lazy
      // (specific cases elided in this listing).
      switch (builders[i].physical_type) {
        // ...
      }
      if (builders[i].field->type()->id() == Type::DICTIONARY) {
        is_lazy = true;
      }
      non_lazy_cols.emplace_back(!is_lazy);
      if (!is_lazy) {
        ++non_lazy_col_count;
        non_lazy_col_pos.emplace_back(i);
      }
    }
    if (non_lazy_col_count == col_count) {
      non_lazy_cols.clear();
      non_lazy_col_pos.clear();
    } else {
      non_lazy_col_pos.emplace_back(col_count);
    }
    values_.resize(col_count);
    is_valid_.resize(col_count);
    std::vector<std::future<void>> child_threads;
    size_t num_threads =
        std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);

    size_t start_col = 0;
    size_t end_col = 0;
    for (size_t i = 0; i < num_threads; ++i) {
      start_col = i * non_lazy_col_count / num_threads;
      end_col = (i + 1) * non_lazy_col_count / num_threads;
      size_t phys_start_col =
          non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
      size_t phys_end_col =
          non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
      child_threads.push_back(std::async(std::launch::async,
                                         convert_columns,
                                         std::ref(values_),
                                         std::ref(is_valid_),
                                         std::ref(result_columns),
                                         std::ref(non_lazy_cols),
                                         phys_start_col,
                                         phys_end_col));
    }
    for (auto& child : child_threads) {
      child.wait();
    }
    row_count = entry_count;
  }
  if (!use_columnar_converter || !non_lazy_cols.empty()) {
    const size_t cpu_count = multithreaded ? cpu_threads() : 1;
    if (multithreaded) {
      std::vector<std::future<size_t>> child_threads;
      std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
          cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
      std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
          cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
      const auto stride = (entry_count + cpu_count - 1) / cpu_count;
      for (size_t i = 0, start_entry = 0; start_entry < entry_count;
           ++i, start_entry += stride) {
        const auto end_entry = std::min(entry_count, start_entry + stride);
        child_threads.push_back(std::async(std::launch::async,
                                           fetch,
                                           std::ref(column_value_segs[i]),
                                           std::ref(null_bitmap_segs[i]),
                                           std::ref(non_lazy_cols),
                                           start_entry,
                                           end_entry));
      }
      for (auto& child : child_threads) {
        row_count += child.get();
      }
      // Merge the per-thread segments into the Arrow builders.
      for (int i = 0; i < schema->num_fields(); ++i) {
        if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
          continue;
        }
        for (size_t j = 0; j < cpu_count; ++j) {
          if (!column_value_segs[j][i]) {
            continue;
          }
          append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
        }
      }
    } else {
      row_count =
          fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
      auto timer = DEBUG_TIMER("append rows to arrow single thread");
      for (int i = 0; i < schema->num_fields(); ++i) {
        if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
          continue;
        }
        append(builders[i], *column_values[i], null_bitmaps[i]);
      }
    }
  }

  for (size_t i = 0; i < col_count; ++i) {
    if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
      continue;
    }
    result_columns[i] = finishColumnBuilder(builders[i]);
  }
  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
}
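// Aside: the fetch/convert paths above fan work out with std::async and futures. A
// self-contained sketch of the same range-partitioning pattern over a generic worker
// (hypothetical helper; the real code partitions columns or result entries):

#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

template <typename WORKER>
size_t example_parallel_sum(size_t total, size_t num_threads, WORKER worker) {
  std::vector<std::future<size_t>> futures;
  const size_t stride = (total + num_threads - 1) / num_threads;
  for (size_t start = 0; start < total; start += stride) {
    const size_t end = std::min(total, start + stride);
    // Each task processes [start, end) and returns a partial count.
    futures.push_back(std::async(std::launch::async, worker, start, end));
  }
  size_t result = 0;
  for (auto& f : futures) {
    result += f.get();  // get() re-throws any exception thrown by the worker
  }
  return result;
}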
std::shared_ptr<arrow::DataType> get_arrow_type(const SQLTypeInfo& sql_type,
                                                const ExecutorDeviceType device_type) {
  switch (sql_type.get_type()) {
    case kBOOLEAN: return arrow::boolean();
    case kTINYINT: return arrow::int8();
    case kSMALLINT: return arrow::int16();
    case kINT: return arrow::int32();
    case kBIGINT: return arrow::int64();
    case kFLOAT: return arrow::float32();
    case kDOUBLE: return arrow::float64();
    case kTEXT: {
      // Dictionary-encoded strings are returned as int32-indexed dictionary columns.
      auto value_type = std::make_shared<StringType>();
      return dictionary(int32(), value_type, false);
    }
    case kTIME: return time32(TimeUnit::SECOND);
    case kTIMESTAMP:
      switch (sql_type.get_precision()) {
        case 0: return timestamp(TimeUnit::SECOND);
        case 3: return timestamp(TimeUnit::MILLI);
        case 6: return timestamp(TimeUnit::MICRO);
        case 9: return timestamp(TimeUnit::NANO);
        default:
          throw std::runtime_error(
              "Unsupported timestamp precision for Arrow result sets: " +
              std::to_string(sql_type.get_precision()));
      }
    // ... (additional cases, e.g. dates and decimals, elided in this listing) ...
    default:
      throw std::runtime_error(sql_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
}
std::shared_ptr<arrow::Field> ArrowResultSetConverter::makeField(
    const std::string name,
    const SQLTypeInfo& target_type) const {
  return arrow::field(
      name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
}

void ArrowResultSet::deallocateArrowResultBuffer(
    const ArrowResult& result,
    const ExecutorDeviceType device_type,
    const size_t device_id,
    std::shared_ptr<Data_Namespace::DataMgr>& data_mgr) {
  // Remove the shared-memory segment holding the serialized schema.
  if (!result.sm_handle.empty()) {
    const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
    auto shm_id = shmget(schema_key, result.sm_size, 0666);
    if (shm_id < 0) {
      throw std::runtime_error(
          "failed to get a valid shm ID w/ given shm key of the schema");
    }
    if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
      throw std::runtime_error("failed to deallocate Arrow schema on errno(" +
                               std::to_string(errno) + ")");
    }
  }
  // Remove the shared-memory segment holding the data frame (CPU results only).
  if (device_type == ExecutorDeviceType::CPU && !result.df_handle.empty()) {
    const key_t& df_key = *(key_t*)(&result.df_handle[0]);
    auto shm_id = shmget(df_key, result.df_size, 0666);
    if (shm_id < 0) {
      throw std::runtime_error(
          "failed to get a valid shm ID w/ given shm key of the data");
    }
    if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
      throw std::runtime_error("failed to deallocate Arrow data frame");
    }
  }
  // ... (GPU device buffers are released through data_mgr here; elided) ...
}
void ArrowResultSetConverter::initializeColumnBuilder(
    ColumnBuilder& column_builder,
    const SQLTypeInfo& col_type,
    const std::shared_ptr<arrow::Field>& field) const {
  column_builder.field = field;
  column_builder.col_type = col_type;
  // ... (physical_type is derived from col_type: the dictionary index type for
  //      dict-encoded strings, the physical SQL type otherwise) ...

  auto value_type = field->type();
  if (col_type.is_dict_encoded_string()) {
    column_builder.builder.reset(new StringDictionary32Builder());
    // Pre-populate the dictionary builder with the string dictionary payload.
    const int dict_id = col_type.get_comp_param();
    auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);

    arrow::StringBuilder str_array_builder;
    ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
    std::shared_ptr<StringArray> string_array;
    ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));

    auto dict_builder =
        dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
    CHECK(dict_builder);
    ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
  } else {
    ARROW_THROW_NOT_OK(
        arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
  }
}
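// Aside: a self-contained sketch of Arrow's dictionary builders, which the code above
// pre-populates from the string dictionary so that later appends only push int32
// indices. This example builds the dictionary from raw strings instead (hypothetical
// helper name):

inline std::shared_ptr<arrow::Array> example_build_dictionary_column() {
  arrow::StringDictionaryBuilder builder;
  ARROW_THROW_NOT_OK(builder.Append("red"));
  ARROW_THROW_NOT_OK(builder.Append("green"));
  ARROW_THROW_NOT_OK(builder.Append("red"));  // re-uses the existing dictionary entry
  std::shared_ptr<arrow::Array> out;
  ARROW_THROW_NOT_OK(builder.Finish(&out));   // dictionary array over utf8 values
  return out;
}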
std::shared_ptr<arrow::Array> ArrowResultSetConverter::finishColumnBuilder(
    ColumnBuilder& column_builder) const {
  std::shared_ptr<Array> values;
  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
  return values;
}
template <typename BUILDER_TYPE, typename VALUE_ARRAY_TYPE>
void appendToColumnBuilder(ArrowResultSetConverter::ColumnBuilder& column_builder,
                           const ValueArray& values,
                           const std::shared_ptr<std::vector<bool>>& is_valid) {
  static_assert(!std::is_same<BUILDER_TYPE, arrow::StringDictionary32Builder>::value,
                "Dictionary encoded string builder requires function specialization.");

  std::vector<VALUE_ARRAY_TYPE> vals = boost::get<std::vector<VALUE_ARRAY_TYPE>>(values);

  if (scale_epoch_values<BUILDER_TYPE>()) {
    auto scale_sec_to_millisec = [](auto seconds) { return seconds * kMilliSecsPerSec; };
    auto scale_values = [&](auto epoch) {
      return std::is_same<BUILDER_TYPE, Date32Builder>::value
                 ? get_epoch_days_from_seconds(epoch)
                 : scale_sec_to_millisec(epoch);
    };
    std::transform(vals.begin(), vals.end(), vals.begin(), scale_values);
  }

  auto typed_builder = dynamic_cast<BUILDER_TYPE*>(column_builder.builder.get());
  CHECK(typed_builder);
  if (column_builder.field->nullable()) {
    CHECK(is_valid.get());
    // Convert the bool bitmap into Arrow's byte-per-value validity vector.
    std::vector<uint8_t> transformed_bitmap;
    transformed_bitmap.reserve(is_valid->size());
    std::for_each(is_valid->begin(),
                  is_valid->end(),
                  [&transformed_bitmap](const bool is_valid) {
                    transformed_bitmap.push_back(is_valid ? 1 : 0);
                  });
    ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals, transformed_bitmap));
  } else {
    ARROW_THROW_NOT_OK(typed_builder->AppendValues(vals));
  }
}
template <>
void appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
    ArrowResultSetConverter::ColumnBuilder& column_builder,
    const ValueArray& values,
    const std::shared_ptr<std::vector<bool>>& is_valid) {
  auto typed_builder =
      dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
  CHECK(typed_builder);

  std::vector<int32_t> vals = boost::get<std::vector<int32_t>>(values);

  if (column_builder.field->nullable()) {
    CHECK(is_valid.get());
    std::vector<uint8_t> transformed_bitmap;
    transformed_bitmap.reserve(is_valid->size());
    std::for_each(
        is_valid->begin(), is_valid->end(), [&transformed_bitmap](const bool is_valid) {
          transformed_bitmap.push_back(is_valid ? 1 : 0);
        });

    ARROW_THROW_NOT_OK(typed_builder->AppendIndices(
        vals.data(), static_cast<int64_t>(vals.size()), transformed_bitmap.data()));
  } else {
    ARROW_THROW_NOT_OK(
        typed_builder->AppendIndices(vals.data(), static_cast<int64_t>(vals.size())));
  }
}
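// Aside: for the non-dictionary builders, appending a whole segment together with a
// validity vector looks like this (self-contained sketch; names local to the example):

inline std::shared_ptr<arrow::Array> example_append_with_validity() {
  arrow::Int64Builder builder;
  const std::vector<int64_t> vals = {1, 2, 0, 4};
  const std::vector<bool> valid = {true, true, false, true};  // third value is NULL
  ARROW_THROW_NOT_OK(builder.AppendValues(vals, valid));
  std::shared_ptr<arrow::Array> out;
  ARROW_THROW_NOT_OK(builder.Finish(&out));
  return out;
}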
void ArrowResultSetConverter::append(
    ColumnBuilder& column_builder,
    const ValueArray& values,
    const std::shared_ptr<std::vector<bool>>& is_valid) const {
  if (column_builder.col_type.is_dict_encoded_string()) {
    appendToColumnBuilder<StringDictionary32Builder, int32_t>(
        column_builder, values, is_valid);
    return;
  }
  switch (column_builder.physical_type) {
    case kBOOLEAN:
      return appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
    case kTINYINT:
      return appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
    case kSMALLINT:
      return appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
    case kINT:
      return appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
    case kBIGINT:
      return appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
    case kFLOAT:
      return appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
    case kDOUBLE:
      return appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
    case kTIME:
      return appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
    case kTIMESTAMP:
      return appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
    case kDATE:
      device_type_ == ExecutorDeviceType::GPU
          ? appendToColumnBuilder<Date64Builder, int64_t>(column_builder, values, is_valid)
          : appendToColumnBuilder<Date32Builder, int32_t>(column_builder, values, is_valid);
      return;
    default:
      throw std::runtime_error(column_builder.col_type.get_type_name() +
                               " is not supported in Arrow result sets.");
  }
}