OmniSciDB  baf940c279
ArrowResultSetConverter Class Reference

#include <ArrowResultSet.h>

Classes

struct  ColumnBuilder
 
struct  SerializedArrowOutput
 

Public Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method)
 
ArrowResult getArrowResult () const
 

Private Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
 
std::shared_ptr< arrow::RecordBatch > convertToArrow () const
 
std::shared_ptr< arrow::RecordBatch > getArrowBatch (const std::shared_ptr< arrow::Schema > &schema) const
 
std::shared_ptr< arrow::Field > makeField (const std::string name, const SQLTypeInfo &target_type) const
 
SerializedArrowOutput getSerializedArrowOutput (arrow::ipc::DictionaryMemo *memo) const
 
void initializeColumnBuilder (ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
 
void append (ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
 
std::shared_ptr< arrow::Array > finishColumnBuilder (ColumnBuilder &column_builder) const
 

Private Attributes

std::shared_ptr< ResultSet > results_
 
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_ = nullptr
 
ExecutorDeviceType device_type_ = ExecutorDeviceType::GPU
 
int32_t device_id_ = 0
 
std::vector< std::string > col_names_
 
int32_t top_n_
 
ArrowTransport transport_method_
 
std::vector< std::unique_ptr< int8_t[]> > values_
 
std::vector< std::unique_ptr< uint8_t[]> > is_valid_
 

Friends

class ArrowResultSet
 

Detailed Description

Definition at line 175 of file ArrowResultSet.h.

Constructor & Destructor Documentation

◆ ArrowResultSetConverter() [1/2]

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::shared_ptr< Data_Namespace::DataMgr >  data_mgr,
const ExecutorDeviceType  device_type,
const int32_t  device_id,
const std::vector< std::string > &  col_names,
const int32_t  first_n,
const ArrowTransport  transport_method 
)
inline

Definition at line 177 of file ArrowResultSet.h.

184  : results_(results)
185  , data_mgr_(data_mgr)
186  , device_type_(device_type)
187  , device_id_(device_id)
188  , col_names_(col_names)
189  , top_n_(first_n)
190  , transport_method_(transport_method) {}
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
ArrowTransport transport_method_
std::vector< std::string > col_names_
ExecutorDeviceType device_type_
std::shared_ptr< ResultSet > results_

◆ ArrowResultSetConverter() [2/2]

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::vector< std::string > &  col_names,
const int32_t  first_n 
)
inline private

Definition at line 204 of file ArrowResultSet.h.

References setup::name.

207  : results_(results), col_names_(col_names), top_n_(first_n) {}
std::vector< std::string > col_names_
std::shared_ptr< ResultSet > results_

Member Function Documentation

◆ append()

void ArrowResultSetConverter::append ( ColumnBuilder &  column_builder,
const ValueArray &  values,
const std::shared_ptr< std::vector< bool >> &  is_valid 
) const
private

Definition at line 1090 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, ArrowResultSetConverter::ColumnBuilder::col_type, GPU, SQLTypeInfo::is_dict_encoded_string(), kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and ArrowResultSetConverter::ColumnBuilder::physical_type.

1093  {
1094  if (column_builder.col_type.is_dict_encoded_string()) {
1095  CHECK_EQ(column_builder.physical_type,
1096  kINT); // assume all dicts use none-encoded type for now
1097  appendToColumnBuilder<StringDictionary32Builder, int32_t>(
1098  column_builder, values, is_valid);
1099  return;
1100  }
1101  switch (column_builder.physical_type) {
1102  case kBOOLEAN:
1103  appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
1104  break;
1105  case kTINYINT:
1106  appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
1107  break;
1108  case kSMALLINT:
1109  appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
1110  break;
1111  case kINT:
1112  appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
1113  break;
1114  case kBIGINT:
1115  appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
1116  break;
1117  case kFLOAT:
1118  appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
1119  break;
1120  case kDOUBLE:
1121  appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
1122  break;
1123  case kTIME:
1124  appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
1125  break;
1126  case kTIMESTAMP:
1127  appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
1128  break;
1129  case kDATE:
1131  ? appendToColumnBuilder<Date64Builder, int64_t>(
1132  column_builder, values, is_valid)
1133  : appendToColumnBuilder<Date32Builder, int32_t>(
1134  column_builder, values, is_valid);
1135  break;
1136  case kCHAR:
1137  case kVARCHAR:
1138  case kTEXT:
1139  default:
1140  // TODO(miyu): support more scalar types.
1141  throw std::runtime_error(column_builder.col_type.get_type_name() +
1142  " is not supported in Arrow result sets.");
1143  }
1144 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
Definition: sqltypes.h:51
ExecutorDeviceType device_type_
Definition: sqltypes.h:54
Definition: sqltypes.h:55
Definition: sqltypes.h:43
Definition: sqltypes.h:47
+ Here is the call graph for this function:

◆ convertToArrow()

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::convertToArrow ( ) const
private

Definition at line 554 of file ArrowResultSetConverter.cpp.

References CHECK, and DEBUG_TIMER.

554  {
555  auto timer = DEBUG_TIMER(__func__);
556  const auto col_count = results_->colCount();
557  std::vector<std::shared_ptr<arrow::Field>> fields;
558  CHECK(col_names_.empty() || col_names_.size() == col_count);
559  for (size_t i = 0; i < col_count; ++i) {
560  const auto ti = results_->getColType(i);
561  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
562  }
563  return getArrowBatch(arrow::schema(fields));
564 }
std::vector< std::string > col_names_
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const

◆ finishColumnBuilder()

std::shared_ptr< arrow::Array > ArrowResultSetConverter::finishColumnBuilder ( ColumnBuilder &  column_builder) const
inline private

Definition at line 1021 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, and ArrowResultSetConverter::ColumnBuilder::builder.

1022  {
1023  std::shared_ptr<Array> values;
1024  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1025  return values;
1026 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36

◆ getArrowBatch()

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::getArrowBatch ( const std::shared_ptr< arrow::Schema > &  schema) const
private

Definition at line 566 of file ArrowResultSetConverter.cpp.

References File_Namespace::append(), ARROW_RECORDBATCH_MAKE, CHECK, CHECK_EQ, cpu_threads(), DEBUG_TIMER, field(), GPU, kBIGINT, kBOOLEAN, kDATE, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTIME, kTIMESTAMP, kTINYINT, Projection, and run_benchmark_import::result.

567  {
568  std::vector<std::shared_ptr<arrow::Array>> result_columns;
569 
570  const size_t entry_count = top_n_ < 0
571  ? results_->entryCount()
572  : std::min(size_t(top_n_), results_->entryCount());
573  if (!entry_count) {
574  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
575  }
576  const auto col_count = results_->colCount();
577  size_t row_count = 0;
578 
579  result_columns.resize(col_count);
580  std::vector<ColumnBuilder> builders(col_count);
581 
582  // Create array builders
583  for (size_t i = 0; i < col_count; ++i) {
584  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
585  }
586 
587  // TODO(miyu): speed up for columnar buffers
588  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
589  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
590  const std::vector<bool>& non_lazy_cols,
591  const size_t start_entry,
592  const size_t end_entry) -> size_t {
593  CHECK_EQ(value_seg.size(), col_count);
594  CHECK_EQ(null_bitmap_seg.size(), col_count);
595  const auto entry_count = end_entry - start_entry;
596  size_t seg_row_count = 0;
597  for (size_t i = start_entry; i < end_entry; ++i) {
598  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
599  if (row.empty()) {
600  continue;
601  }
602  ++seg_row_count;
603  for (size_t j = 0; j < col_count; ++j) {
604  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
605  continue;
606  }
607 
608  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
609  // TODO(miyu): support more types other than scalar.
610  CHECK(scalar_value);
611  const auto& column = builders[j];
612  switch (column.physical_type) {
613  case kBOOLEAN:
614  create_or_append_value<bool, int64_t>(
615  *scalar_value, value_seg[j], entry_count);
616  create_or_append_validity<int64_t>(
617  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
618  break;
619  case kTINYINT:
620  create_or_append_value<int8_t, int64_t>(
621  *scalar_value, value_seg[j], entry_count);
622  create_or_append_validity<int64_t>(
623  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
624  break;
625  case kSMALLINT:
626  create_or_append_value<int16_t, int64_t>(
627  *scalar_value, value_seg[j], entry_count);
628  create_or_append_validity<int64_t>(
629  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
630  break;
631  case kINT:
632  create_or_append_value<int32_t, int64_t>(
633  *scalar_value, value_seg[j], entry_count);
634  create_or_append_validity<int64_t>(
635  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
636  break;
637  case kBIGINT:
638  create_or_append_value<int64_t, int64_t>(
639  *scalar_value, value_seg[j], entry_count);
640  create_or_append_validity<int64_t>(
641  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
642  break;
643  case kFLOAT:
644  create_or_append_value<float, float>(
645  *scalar_value, value_seg[j], entry_count);
646  create_or_append_validity<float>(
647  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
648  break;
649  case kDOUBLE:
650  create_or_append_value<double, double>(
651  *scalar_value, value_seg[j], entry_count);
652  create_or_append_validity<double>(
653  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
654  break;
655  case kTIME:
656  create_or_append_value<int32_t, int64_t>(
657  *scalar_value, value_seg[j], entry_count);
658  create_or_append_validity<int64_t>(
659  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
660  break;
661  case kDATE:
663  ? create_or_append_value<int64_t, int64_t>(
664  *scalar_value, value_seg[j], entry_count)
665  : create_or_append_value<int32_t, int64_t>(
666  *scalar_value, value_seg[j], entry_count);
667  create_or_append_validity<int64_t>(
668  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
669  break;
670  case kTIMESTAMP:
671  create_or_append_value<int64_t, int64_t>(
672  *scalar_value, value_seg[j], entry_count);
673  create_or_append_validity<int64_t>(
674  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
675  break;
676  default:
677  // TODO(miyu): support more scalar types.
678  throw std::runtime_error(column.col_type.get_type_name() +
679  " is not supported in Arrow result sets.");
680  }
681  }
682  }
683  return seg_row_count;
684  };
685 
686  auto convert_columns = [&](std::vector<std::unique_ptr<int8_t[]>>& values,
687  std::vector<std::unique_ptr<uint8_t[]>>& is_valid,
688  std::vector<std::shared_ptr<arrow::Array>>& result,
689  const std::vector<bool>& non_lazy_cols,
690  const size_t start_col,
691  const size_t end_col) {
692  for (size_t col = start_col; col < end_col; ++col) {
693  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
694  continue;
695  }
696 
697  const auto& column = builders[col];
698  switch (column.physical_type) {
699  case kTINYINT:
700  convert_column<int8_t>(
701  results_, col, values[col], is_valid[col], entry_count, result[col]);
702  break;
703  case kSMALLINT:
704  convert_column<int16_t>(
705  results_, col, values[col], is_valid[col], entry_count, result[col]);
706  break;
707  case kINT:
708  convert_column<int32_t>(
709  results_, col, values[col], is_valid[col], entry_count, result[col]);
710  break;
711  case kBIGINT:
712  convert_column<int64_t>(
713  results_, col, values[col], is_valid[col], entry_count, result[col]);
714  break;
715  case kFLOAT:
716  convert_column<float>(
717  results_, col, values[col], is_valid[col], entry_count, result[col]);
718  break;
719  case kDOUBLE:
720  convert_column<double>(
721  results_, col, values[col], is_valid[col], entry_count, result[col]);
722  break;
723  default:
724  throw std::runtime_error(column.col_type.get_type_name() +
725  " is not supported in Arrow column converter.");
726  }
727  }
728  };
729 
730  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
731  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
732  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
733  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
734  results_->getQueryMemDesc().getQueryDescriptionType() ==
736  entry_count == results_->entryCount();
737  std::vector<bool> non_lazy_cols;
738  if (use_columnar_converter) {
739  auto timer = DEBUG_TIMER("columnar converter");
740  std::vector<size_t> non_lazy_col_pos;
741  size_t non_lazy_col_count = 0;
742  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
743 
744  non_lazy_cols.reserve(col_count);
745  non_lazy_col_pos.reserve(col_count);
746  for (size_t i = 0; i < col_count; ++i) {
747  bool is_lazy =
748  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
749  // Currently column converter cannot handle some data types.
750  // Treat them as lazy.
751  switch (builders[i].physical_type) {
752  case kBOOLEAN:
753  case kTIME:
754  case kDATE:
755  case kTIMESTAMP:
756  is_lazy = true;
757  break;
758  default:
759  break;
760  }
761  if (builders[i].field->type()->id() == Type::DICTIONARY) {
762  is_lazy = true;
763  }
764  non_lazy_cols.emplace_back(!is_lazy);
765  if (!is_lazy) {
766  ++non_lazy_col_count;
767  non_lazy_col_pos.emplace_back(i);
768  }
769  }
770 
771  if (non_lazy_col_count == col_count) {
772  non_lazy_cols.clear();
773  non_lazy_col_pos.clear();
774  } else {
775  non_lazy_col_pos.emplace_back(col_count);
776  }
777 
778  values_.resize(col_count);
779  is_valid_.resize(col_count);
780  std::vector<std::future<void>> child_threads;
781  size_t num_threads =
782  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
783 
784  size_t start_col = 0;
785  size_t end_col = 0;
786  for (size_t i = 0; i < num_threads; ++i) {
787  start_col = end_col;
788  end_col = (i + 1) * non_lazy_col_count / num_threads;
789  size_t phys_start_col =
790  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
791  size_t phys_end_col =
792  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
793  child_threads.push_back(std::async(std::launch::async,
794  convert_columns,
795  std::ref(values_),
796  std::ref(is_valid_),
797  std::ref(result_columns),
798  non_lazy_cols,
799  phys_start_col,
800  phys_end_col));
801  }
802  for (auto& child : child_threads) {
803  child.get();
804  }
805  row_count = entry_count;
806  }
807  if (!use_columnar_converter || !non_lazy_cols.empty()) {
808  auto timer = DEBUG_TIMER("row converter");
809  row_count = 0;
810  if (multithreaded) {
811  const size_t cpu_count = cpu_threads();
812  std::vector<std::future<size_t>> child_threads;
813  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
814  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
815  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
816  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
817  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
818  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
819  ++i, start_entry += stride) {
820  const auto end_entry = std::min(entry_count, start_entry + stride);
821  child_threads.push_back(std::async(std::launch::async,
822  fetch,
823  std::ref(column_value_segs[i]),
824  std::ref(null_bitmap_segs[i]),
825  non_lazy_cols,
826  start_entry,
827  end_entry));
828  }
829  for (auto& child : child_threads) {
830  row_count += child.get();
831  }
832  {
833  auto timer = DEBUG_TIMER("append rows to arrow");
834  for (int i = 0; i < schema->num_fields(); ++i) {
835  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
836  continue;
837  }
838 
839  for (size_t j = 0; j < cpu_count; ++j) {
840  if (!column_value_segs[j][i]) {
841  continue;
842  }
843  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
844  }
845  }
846  }
847  } else {
848  row_count =
849  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
850  {
851  auto timer = DEBUG_TIMER("append rows to arrow single thread");
852  for (int i = 0; i < schema->num_fields(); ++i) {
853  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
854  continue;
855  }
856 
857  append(builders[i], *column_values[i], null_bitmaps[i]);
858  }
859  }
860  }
861 
862  {
863  auto timer = DEBUG_TIMER("finish builders");
864  for (size_t i = 0; i < col_count; ++i) {
865  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
866  continue;
867  }
868 
869  result_columns[i] = finishColumnBuilder(builders[i]);
870  }
871  }
872  }
873 
874  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
875 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
Definition: sqltypes.h:51
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
#define ARROW_RECORDBATCH_MAKE
std::vector< std::unique_ptr< int8_t[]> > values_
ExecutorDeviceType device_type_
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
Definition: sqltypes.h:55
std::shared_ptr< ResultSet > results_
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::vector< std::unique_ptr< uint8_t[]> > is_valid_
Definition: sqltypes.h:47
int cpu_threads()
Definition: thread_count.h:24
+ Here is the call graph for this function:

◆ getArrowResult()

ArrowResult ArrowResultSetConverter::getArrowResult ( ) const

Serialize an Arrow result to IPC memory. Users are responsible for freeing all CPU IPC buffers using deallocateArrowResultBuffer. GPU buffers will become owned by the caller upon deserialization, and will be automatically freed when they go out of scope.

Definition at line 317 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, CHECK, CHECK_GE, CPU, DEBUG_TIMER, field(), arrow::get_and_copy_to_shm(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_shm_buffer(), GPU, SHARED_MEMORY, UNREACHABLE, and WIRE.

317  {
318  auto timer = DEBUG_TIMER(__func__);
319  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
320 
323  const auto getWireResult =
324  [&](const int64_t schema_size,
325  const int64_t dict_size,
326  const int64_t records_size,
327  const std::shared_ptr<Buffer>& serialized_schema,
328  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
329  auto timer = DEBUG_TIMER("serialize batch to wire");
330  std::vector<char> schema_handle_data;
331  std::vector<char> record_handle_data;
332  const int64_t total_size = schema_size + records_size + dict_size;
333  record_handle_data.insert(record_handle_data.end(),
334  serialized_schema->data(),
335  serialized_schema->data() + schema_size);
336 
337  record_handle_data.insert(record_handle_data.end(),
338  serialized_dict->data(),
339  serialized_dict->data() + dict_size);
340 
341  record_handle_data.resize(total_size);
342  auto serialized_records =
343  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
344 
345  io::FixedSizeBufferWriter stream(
346  SliceMutableBuffer(serialized_records, schema_size + dict_size));
347  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
348  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
349 
350  return {std::vector<char>(0),
351  0,
352  std::vector<char>(0),
353  serialized_records->size(),
354  std::string{""},
355  record_handle_data};
356  };
357 
358  const auto getShmResult =
359  [&](const int64_t schema_size,
360  const int64_t dict_size,
361  const int64_t records_size,
362  const std::shared_ptr<Buffer>& serialized_schema,
363  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
364  auto timer = DEBUG_TIMER("serialize batch to shared memory");
365  std::shared_ptr<Buffer> serialized_records;
366  std::vector<char> schema_handle_buffer;
367  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
368  key_t records_shm_key = IPC_PRIVATE;
369  const int64_t total_size = schema_size + records_size + dict_size;
370 
371  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
372 
373  memcpy(serialized_records->mutable_data(),
374  serialized_schema->data(),
375  (size_t)schema_size);
376  memcpy(serialized_records->mutable_data() + schema_size,
377  serialized_dict->data(),
378  (size_t)dict_size);
379 
380  io::FixedSizeBufferWriter stream(
381  SliceMutableBuffer(serialized_records, schema_size + dict_size));
382  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
383  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
384  memcpy(&record_handle_buffer[0],
385  reinterpret_cast<const unsigned char*>(&records_shm_key),
386  sizeof(key_t));
387 
388  return {schema_handle_buffer,
389  0,
390  record_handle_buffer,
391  serialized_records->size(),
392  std::string{""}};
393  };
394 
395  std::shared_ptr<Buffer> serialized_schema;
396  int64_t records_size = 0;
397  int64_t schema_size = 0;
398  ipc::DictionaryMemo memo;
399  auto options = ipc::IpcWriteOptions::Defaults();
400  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
401 
402  ARROW_THROW_NOT_OK(CollectDictionaries(*record_batch, &memo));
403  for (auto& pair : memo.dictionaries()) {
404  ipc::IpcPayload payload;
405  int64_t dictionary_id = pair.first;
406  const auto& dictionary = pair.second;
407 
409  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
410  int32_t metadata_length = 0;
412  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
413  }
414  auto serialized_dict = dict_stream->Finish().ValueOrDie();
415  auto dict_size = serialized_dict->size();
416 
418  serialized_schema,
419  ipc::SerializeSchema(*record_batch->schema(), nullptr, default_memory_pool()));
420  schema_size = serialized_schema->size();
421 
422  ARROW_THROW_NOT_OK(ipc::GetRecordBatchSize(*record_batch, &records_size));
423 
424  switch (transport_method_) {
426  return getWireResult(
427  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
429  return getShmResult(
430  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
431  default:
432  UNREACHABLE();
433  }
434  }
435 #ifdef HAVE_CUDA
437 
438  // Copy the schema to the schema handle
439  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
440  ARROW_THROW_NOT_OK(out_stream_result.status());
441  auto out_stream = std::move(out_stream_result).ValueOrDie();
442 
443  arrow::ipc::DictionaryMemo current_memo;
444  arrow::ipc::DictionaryMemo serialized_memo;
445 
446  arrow::ipc::IpcPayload schema_payload;
447  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
448  arrow::ipc::IpcWriteOptions::Defaults(),
449  &serialized_memo,
450  &schema_payload));
451  int32_t schema_payload_length = 0;
452  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
453  arrow::ipc::IpcWriteOptions::Defaults(),
454  out_stream.get(),
455  &schema_payload_length));
456 
457  ARROW_THROW_NOT_OK(CollectDictionaries(*record_batch, &current_memo));
458 
459  // now try a dictionary
460  std::shared_ptr<arrow::Schema> dummy_schema;
461  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
462  for (int i = 0; i < record_batch->schema()->num_fields(); i++) {
463  auto field = record_batch->schema()->field(i);
464  if (field->type()->id() == arrow::Type::DICTIONARY) {
465  int64_t dict_id = -1;
466  ARROW_THROW_NOT_OK(current_memo.GetId(field.get(), &dict_id));
467  CHECK_GE(dict_id, 0);
468  std::shared_ptr<Array> dict;
469  ARROW_THROW_NOT_OK(current_memo.GetDictionary(dict_id, &dict));
470  CHECK(dict);
471 
472  if (!dummy_schema) {
473  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
474  dummy_schema = std::make_shared<arrow::Schema>(
475  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
476  }
477  dict_batches.emplace_back(
478  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
479  }
480  }
481 
482  if (!dict_batches.empty()) {
483  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
484  dict_batches, ipc::IpcWriteOptions::Defaults(), out_stream.get()));
485  }
486 
487  auto complete_ipc_stream = out_stream->Finish();
488  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
489  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
490 
491  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
492  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
493  memcpy(&schema_record_key_buffer[0],
494  reinterpret_cast<const unsigned char*>(&record_key),
495  sizeof(key_t));
496 
497  arrow::cuda::CudaDeviceManager* manager;
498  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
499  std::shared_ptr<arrow::cuda::CudaContext> context;
500  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
501 
502  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
503  ARROW_ASSIGN_OR_THROW(device_serialized,
504  SerializeRecordBatch(*record_batch, context.get()));
505 
506  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
507  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
508 
509  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
510  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
511  cuda_handle->Serialize(arrow::default_memory_pool()));
512 
513  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
514  memcpy(&record_handle_buffer[0],
515  serialized_cuda_handle->data(),
516  serialized_cuda_handle->size());
517 
518  return {schema_record_key_buffer,
519  serialized_records->size(),
520  record_handle_buffer,
521  serialized_cuda_handle->size(),
522  serialized_cuda_handle->ToString()};
523 #else
524  UNREACHABLE();
525  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
526 #endif
527 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define UNREACHABLE()
Definition: Logger.h:241
#define CHECK_GE(x, y)
Definition: Logger.h:210
std::shared_ptr< arrow::RecordBatch > convertToArrow() const
ArrowTransport transport_method_
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
ExecutorDeviceType device_type_
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
std::pair< key_t, std::shared_ptr< Buffer > > get_shm_buffer(size_t size)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
+ Here is the call graph for this function:

◆ getSerializedArrowOutput()

ArrowResultSetConverter::SerializedArrowOutput ArrowResultSetConverter::getSerializedArrowOutput ( arrow::ipc::DictionaryMemo *  memo) const
private

Definition at line 530 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, and DEBUG_TIMER.

531  {
532  auto timer = DEBUG_TIMER(__func__);
533  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
534  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
535 
536  ARROW_ASSIGN_OR_THROW(serialized_schema,
537  arrow::ipc::SerializeSchema(
538  *arrow_copy->schema(), memo, arrow::default_memory_pool()));
539 
540  ARROW_THROW_NOT_OK(CollectDictionaries(*arrow_copy, memo));
541 
542  if (arrow_copy->num_rows()) {
543  auto timer = DEBUG_TIMER("serialize records");
544  ARROW_THROW_NOT_OK(arrow_copy->Validate());
545  ARROW_ASSIGN_OR_THROW(serialized_records,
546  arrow::ipc::SerializeRecordBatch(
547  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
548  } else {
549  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
550  }
551  return {serialized_schema, serialized_records};
552 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
std::shared_ptr< arrow::RecordBatch > convertToArrow() const
#define DEBUG_TIMER(name)
Definition: Logger.h:313

◆ initializeColumnBuilder()

void ArrowResultSetConverter::initializeColumnBuilder ( ColumnBuilder &  column_builder,
const SQLTypeInfo &  col_type,
const std::shared_ptr< arrow::Field > &  field 
) const
private

Definition at line 988 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, ArrowResultSetConverter::ColumnBuilder::builder, CHECK, ArrowResultSetConverter::ColumnBuilder::col_type, field(), ArrowResultSetConverter::ColumnBuilder::field, SQLTypeInfo::get_comp_param(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_dict_index_type(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_physical_type(), SQLTypeInfo::is_dict_encoded_string(), and ArrowResultSetConverter::ColumnBuilder::physical_type.

991  {
992  column_builder.field = field;
993  column_builder.col_type = col_type;
994  column_builder.physical_type = col_type.is_dict_encoded_string()
995  ? get_dict_index_type(col_type)
996  : get_physical_type(col_type);
997 
998  auto value_type = field->type();
999  if (col_type.is_dict_encoded_string()) {
1000  column_builder.builder.reset(new StringDictionary32Builder());
1001  // add values to the builder
1002  const int dict_id = col_type.get_comp_param();
1003  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1004 
1005  arrow::StringBuilder str_array_builder;
1006  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
1007  std::shared_ptr<StringArray> string_array;
1008  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1009 
1010  auto dict_builder =
1011  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1012  CHECK(dict_builder);
1013 
1014  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1015  } else {
1017  arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
1018  }
1019 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:268
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool is_dict_encoded_string() const
Definition: sqltypes.h:444
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:

◆ makeField()

std::shared_ptr< arrow::Field > ArrowResultSetConverter::makeField ( const std::string  name,
const SQLTypeInfo &  target_type 
) const
private

Definition at line 941 of file ArrowResultSetConverter.cpp.

References field(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_arrow_type(), and SQLTypeInfo::get_notnull().

943  {
944  return arrow::field(
945  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
946 }
name
Definition: setup.py:35
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:266
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
ExecutorDeviceType device_type_
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
+ Here is the call graph for this function:

Friends And Related Function Documentation

◆ ArrowResultSet

friend class ArrowResultSet
friend

Definition at line 247 of file ArrowResultSet.h.

Member Data Documentation

◆ col_names_

std::vector<std::string> ArrowResultSetConverter::col_names_
private

Definition at line 238 of file ArrowResultSet.h.

◆ data_mgr_

std::shared_ptr<Data_Namespace::DataMgr> ArrowResultSetConverter::data_mgr_ = nullptr
private

Definition at line 235 of file ArrowResultSet.h.

◆ device_id_

int32_t ArrowResultSetConverter::device_id_ = 0
private

Definition at line 237 of file ArrowResultSet.h.

◆ device_type_

ExecutorDeviceType ArrowResultSetConverter::device_type_ = ExecutorDeviceType::GPU
private

Definition at line 236 of file ArrowResultSet.h.

◆ is_valid_

std::vector<std::unique_ptr<uint8_t[]> > ArrowResultSetConverter::is_valid_
mutable private

Definition at line 245 of file ArrowResultSet.h.

◆ results_

std::shared_ptr<ResultSet> ArrowResultSetConverter::results_
private

Definition at line 234 of file ArrowResultSet.h.

◆ top_n_

int32_t ArrowResultSetConverter::top_n_
private

Definition at line 239 of file ArrowResultSet.h.

◆ transport_method_

ArrowTransport ArrowResultSetConverter::transport_method_
private

Definition at line 240 of file ArrowResultSet.h.

◆ values_

std::vector<std::unique_ptr<int8_t[]> > ArrowResultSetConverter::values_
mutable private

Definition at line 244 of file ArrowResultSet.h.


The documentation for this class was generated from the following files: