OmniSciDB  ba1bac9284
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowResultSetConverter Class Reference

#include <ArrowResultSet.h>

Classes

struct  ColumnBuilder
 
struct  SerializedArrowOutput
 

Public Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method)
 
ArrowResult getArrowResult () const
 
 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
 
std::shared_ptr
< arrow::RecordBatch > 
convertToArrow () const
 

Private Member Functions

std::shared_ptr
< arrow::RecordBatch > 
getArrowBatch (const std::shared_ptr< arrow::Schema > &schema) const
 
std::shared_ptr< arrow::Field > makeField (const std::string name, const SQLTypeInfo &target_type) const
 
SerializedArrowOutput getSerializedArrowOutput (arrow::ipc::DictionaryFieldMapper *mapper) const
 
void initializeColumnBuilder (ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
 
void append (ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
 
std::shared_ptr< arrow::Array > finishColumnBuilder (ColumnBuilder &column_builder) const
 

Private Attributes

std::shared_ptr< ResultSet > results_
 
std::shared_ptr
< Data_Namespace::DataMgr > 
data_mgr_ = nullptr
 
ExecutorDeviceType device_type_ = ExecutorDeviceType::GPU
 
int32_t device_id_ = 0
 
std::vector< std::string > col_names_
 
int32_t top_n_
 
ArrowTransport transport_method_
 

Friends

class ArrowResultSet
 

Detailed Description

Definition at line 182 of file ArrowResultSet.h.

Constructor & Destructor Documentation

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::shared_ptr< Data_Namespace::DataMgr >  data_mgr,
const ExecutorDeviceType  device_type,
const int32_t  device_id,
const std::vector< std::string > &  col_names,
const int32_t  first_n,
const ArrowTransport  transport_method 
)
inline

Definition at line 184 of file ArrowResultSet.h.

191  : results_(results)
192  , data_mgr_(data_mgr)
193  , device_type_(device_type)
194  , device_id_(device_id)
195  , col_names_(col_names)
196  , top_n_(first_n)
197  , transport_method_(transport_method) {}
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
ArrowTransport transport_method_
std::vector< std::string > col_names_
ExecutorDeviceType device_type_
std::shared_ptr< ResultSet > results_
ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::vector< std::string > &  col_names,
const int32_t  first_n 
)
inline

Definition at line 210 of file ArrowResultSet.h.

213  : results_(results), col_names_(col_names), top_n_(first_n) {}
std::vector< std::string > col_names_
std::shared_ptr< ResultSet > results_

Member Function Documentation

void ArrowResultSetConverter::append ( ColumnBuilder &  column_builder,
const ValueArray &  values,
const std::shared_ptr< std::vector< bool >> &  is_valid 
) const
private

Definition at line 1153 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, ArrowResultSetConverter::ColumnBuilder::col_type, device_type_, GPU, SQLTypeInfo::is_dict_encoded_string(), kBIGINT, kBOOLEAN, kCHAR, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and ArrowResultSetConverter::ColumnBuilder::physical_type.

Referenced by getArrowBatch().

1156  {
1157  if (column_builder.col_type.is_dict_encoded_string()) {
1158  CHECK_EQ(column_builder.physical_type,
1159  kINT); // assume all dicts use none-encoded type for now
1160  appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1161  column_builder, values, is_valid);
1162  return;
1163  }
1164  switch (column_builder.physical_type) {
1165  case kBOOLEAN:
1166  appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1167  column_builder, values, is_valid);
1168  break;
1169  case kTINYINT:
1170  appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1171  break;
1172  case kSMALLINT:
1173  appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1174  column_builder, values, is_valid);
1175  break;
1176  case kINT:
1177  appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1178  column_builder, values, is_valid);
1179  break;
1180  case kBIGINT:
1181  appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1182  column_builder, values, is_valid);
1183  break;
1184  case kDECIMAL:
1185  appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1186  column_builder, values, is_valid);
1187  break;
1188  case kFLOAT:
1189  appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1190  break;
1191  case kDOUBLE:
1192  appendToColumnBuilder<arrow::DoubleBuilder, double>(
1193  column_builder, values, is_valid);
1194  break;
1195  case kTIME:
1196  appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1197  column_builder, values, is_valid);
1198  break;
1199  case kTIMESTAMP:
1200  appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1201  column_builder, values, is_valid);
1202  break;
1203  case kDATE:
1204  device_type_ == ExecutorDeviceType::GPU
1205  ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1206  column_builder, values, is_valid)
1207  : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1208  column_builder, values, is_valid);
1209  break;
1210  case kCHAR:
1211  case kVARCHAR:
1212  case kTEXT:
1213  default:
1214  // TODO(miyu): support more scalar types.
1215  throw std::runtime_error(column_builder.col_type.get_type_name() +
1216  " is not supported in Arrow result sets.");
1217  }
1218 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
Definition: sqltypes.h:48
ExecutorDeviceType device_type_
Definition: sqltypes.h:51
Definition: sqltypes.h:52
Definition: sqltypes.h:40
Definition: sqltypes.h:44

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::convertToArrow ( ) const

Definition at line 584 of file ArrowResultSetConverter.cpp.

References CHECK, col_names_, DEBUG_TIMER, f, getArrowBatch(), i, makeField(), results_, and VLOG.

Referenced by getArrowResult(), and getSerializedArrowOutput().

584  {
585  auto timer = DEBUG_TIMER(__func__);
586  const auto col_count = results_->colCount();
587  std::vector<std::shared_ptr<arrow::Field>> fields;
588  CHECK(col_names_.empty() || col_names_.size() == col_count);
589  for (size_t i = 0; i < col_count; ++i) {
590  const auto ti = results_->getColType(i);
591  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
592  }
593 #if ARROW_CONVERTER_DEBUG
594  VLOG(1) << "Arrow fields: ";
595  for (const auto& f : fields) {
596  VLOG(1) << "\t" << f->ToString(true);
597  }
598 #endif
599  return getArrowBatch(arrow::schema(fields));
600 }
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
std::vector< std::string > col_names_
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
char * f
#define VLOG(n)
Definition: Logger.h:300

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::shared_ptr< arrow::Array > ArrowResultSetConverter::finishColumnBuilder ( ColumnBuilder &  column_builder) const
inline private

Definition at line 1056 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, and ArrowResultSetConverter::ColumnBuilder::builder.

Referenced by getArrowBatch().

1057  {
1058  std::shared_ptr<arrow::Array> values;
1059  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1060  return values;
1061 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36

+ Here is the caller graph for this function:

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::getArrowBatch ( const std::shared_ptr< arrow::Schema > &  schema) const
private

Definition at line 602 of file ArrowResultSetConverter.cpp.

References append(), ARROW_RECORDBATCH_MAKE, CHECK, CHECK_EQ, cpu_threads(), DEBUG_TIMER, device_type_, DICTIONARY, field(), finishColumnBuilder(), GPU, i, initializeColumnBuilder(), generate_TableFunctionsFactory_init::j, kBIGINT, kBOOLEAN, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTIME, kTIMESTAMP, kTINYINT, Projection, run_benchmark_import::result, results_, and top_n_.

Referenced by convertToArrow().

603  {
604  std::vector<std::shared_ptr<arrow::Array>> result_columns;
605 
606  const size_t entry_count = top_n_ < 0
607  ? results_->entryCount()
608  : std::min(size_t(top_n_), results_->entryCount());
609  if (!entry_count) {
610  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
611  }
612  const auto col_count = results_->colCount();
613  size_t row_count = 0;
614 
615  result_columns.resize(col_count);
616  std::vector<ColumnBuilder> builders(col_count);
617 
618  // Create array builders
619  for (size_t i = 0; i < col_count; ++i) {
620  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
621  }
622 
623  // TODO(miyu): speed up for columnar buffers
624  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
625  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
626  const std::vector<bool>& non_lazy_cols,
627  const size_t start_entry,
628  const size_t end_entry) -> size_t {
629  CHECK_EQ(value_seg.size(), col_count);
630  CHECK_EQ(null_bitmap_seg.size(), col_count);
631  const auto entry_count = end_entry - start_entry;
632  size_t seg_row_count = 0;
633  for (size_t i = start_entry; i < end_entry; ++i) {
634  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
635  if (row.empty()) {
636  continue;
637  }
638  ++seg_row_count;
639  for (size_t j = 0; j < col_count; ++j) {
640  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
641  continue;
642  }
643 
644  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
645  // TODO(miyu): support more types other than scalar.
646  CHECK(scalar_value);
647  const auto& column = builders[j];
648  switch (column.physical_type) {
649  case kBOOLEAN:
650  create_or_append_value<bool, int64_t>(
651  *scalar_value, value_seg[j], entry_count);
652  create_or_append_validity<int64_t>(
653  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
654  break;
655  case kTINYINT:
656  create_or_append_value<int8_t, int64_t>(
657  *scalar_value, value_seg[j], entry_count);
658  create_or_append_validity<int64_t>(
659  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
660  break;
661  case kSMALLINT:
662  create_or_append_value<int16_t, int64_t>(
663  *scalar_value, value_seg[j], entry_count);
664  create_or_append_validity<int64_t>(
665  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
666  break;
667  case kINT:
668  create_or_append_value<int32_t, int64_t>(
669  *scalar_value, value_seg[j], entry_count);
670  create_or_append_validity<int64_t>(
671  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
672  break;
673  case kBIGINT:
674  create_or_append_value<int64_t, int64_t>(
675  *scalar_value, value_seg[j], entry_count);
676  create_or_append_validity<int64_t>(
677  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
678  break;
679  case kDECIMAL:
680  create_or_append_value<int64_t, int64_t>(
681  *scalar_value, value_seg[j], entry_count);
682  create_or_append_validity<int64_t>(
683  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
684  break;
685  case kFLOAT:
686  create_or_append_value<float, float>(
687  *scalar_value, value_seg[j], entry_count);
688  create_or_append_validity<float>(
689  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
690  break;
691  case kDOUBLE:
692  create_or_append_value<double, double>(
693  *scalar_value, value_seg[j], entry_count);
694  create_or_append_validity<double>(
695  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
696  break;
697  case kTIME:
698  create_or_append_value<int32_t, int64_t>(
699  *scalar_value, value_seg[j], entry_count);
700  create_or_append_validity<int64_t>(
701  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
702  break;
703  case kDATE:
704  device_type_ == ExecutorDeviceType::GPU
705  ? create_or_append_value<int64_t, int64_t>(
706  *scalar_value, value_seg[j], entry_count)
707  : create_or_append_value<int32_t, int64_t>(
708  *scalar_value, value_seg[j], entry_count);
709  create_or_append_validity<int64_t>(
710  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
711  break;
712  case kTIMESTAMP:
713  create_or_append_value<int64_t, int64_t>(
714  *scalar_value, value_seg[j], entry_count);
715  create_or_append_validity<int64_t>(
716  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
717  break;
718  default:
719  // TODO(miyu): support more scalar types.
720  throw std::runtime_error(column.col_type.get_type_name() +
721  " is not supported in Arrow result sets.");
722  }
723  }
724  }
725  return seg_row_count;
726  };
727 
728  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
729  const std::vector<bool>& non_lazy_cols,
730  const size_t start_col,
731  const size_t end_col) {
732  for (size_t col = start_col; col < end_col; ++col) {
733  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
734  continue;
735  }
736 
737  const auto& column = builders[col];
738  switch (column.physical_type) {
739  case kTINYINT:
740  convert_column<int8_t>(results_, col, entry_count, result[col]);
741  break;
742  case kSMALLINT:
743  convert_column<int16_t>(results_, col, entry_count, result[col]);
744  break;
745  case kINT:
746  convert_column<int32_t>(results_, col, entry_count, result[col]);
747  break;
748  case kBIGINT:
749  convert_column<int64_t>(results_, col, entry_count, result[col]);
750  break;
751  case kFLOAT:
752  convert_column<float>(results_, col, entry_count, result[col]);
753  break;
754  case kDOUBLE:
755  convert_column<double>(results_, col, entry_count, result[col]);
756  break;
757  default:
758  throw std::runtime_error(column.col_type.get_type_name() +
759  " is not supported in Arrow column converter.");
760  }
761  }
762  };
763 
764  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
765  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
766  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
767  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
768  results_->getQueryMemDesc().getQueryDescriptionType() ==
769  QueryDescriptionType::Projection &&
770  entry_count == results_->entryCount();
771  std::vector<bool> non_lazy_cols;
772  if (use_columnar_converter) {
773  auto timer = DEBUG_TIMER("columnar converter");
774  std::vector<size_t> non_lazy_col_pos;
775  size_t non_lazy_col_count = 0;
776  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
777 
778  non_lazy_cols.reserve(col_count);
779  non_lazy_col_pos.reserve(col_count);
780  for (size_t i = 0; i < col_count; ++i) {
781  bool is_lazy =
782  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
783  // Currently column converter cannot handle some data types.
784  // Treat them as lazy.
785  switch (builders[i].physical_type) {
786  case kBOOLEAN:
787  case kTIME:
788  case kDATE:
789  case kTIMESTAMP:
790  is_lazy = true;
791  break;
792  default:
793  break;
794  }
795  if (builders[i].field->type()->id() == arrow::Type::DICTIONARY) {
796  is_lazy = true;
797  }
798  non_lazy_cols.emplace_back(!is_lazy);
799  if (!is_lazy) {
800  ++non_lazy_col_count;
801  non_lazy_col_pos.emplace_back(i);
802  }
803  }
804 
805  if (non_lazy_col_count == col_count) {
806  non_lazy_cols.clear();
807  non_lazy_col_pos.clear();
808  } else {
809  non_lazy_col_pos.emplace_back(col_count);
810  }
811 
812  std::vector<std::future<void>> child_threads;
813  size_t num_threads =
814  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
815 
816  size_t start_col = 0;
817  size_t end_col = 0;
818  for (size_t i = 0; i < num_threads; ++i) {
819  start_col = end_col;
820  end_col = (i + 1) * non_lazy_col_count / num_threads;
821  size_t phys_start_col =
822  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
823  size_t phys_end_col =
824  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
825  child_threads.push_back(std::async(std::launch::async,
826  convert_columns,
827  std::ref(result_columns),
828  non_lazy_cols,
829  phys_start_col,
830  phys_end_col));
831  }
832  for (auto& child : child_threads) {
833  child.get();
834  }
835  row_count = entry_count;
836  }
837  if (!use_columnar_converter || !non_lazy_cols.empty()) {
838  auto timer = DEBUG_TIMER("row converter");
839  row_count = 0;
840  if (multithreaded) {
841  const size_t cpu_count = cpu_threads();
842  std::vector<std::future<size_t>> child_threads;
843  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
844  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
845  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
846  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
847  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
848  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
849  ++i, start_entry += stride) {
850  const auto end_entry = std::min(entry_count, start_entry + stride);
851  child_threads.push_back(std::async(std::launch::async,
852  fetch,
853  std::ref(column_value_segs[i]),
854  std::ref(null_bitmap_segs[i]),
855  non_lazy_cols,
856  start_entry,
857  end_entry));
858  }
859  for (auto& child : child_threads) {
860  row_count += child.get();
861  }
862  {
863  auto timer = DEBUG_TIMER("append rows to arrow");
864  for (int i = 0; i < schema->num_fields(); ++i) {
865  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
866  continue;
867  }
868 
869  for (size_t j = 0; j < cpu_count; ++j) {
870  if (!column_value_segs[j][i]) {
871  continue;
872  }
873  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
874  }
875  }
876  }
877  } else {
878  row_count =
879  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
880  {
881  auto timer = DEBUG_TIMER("append rows to arrow single thread");
882  for (int i = 0; i < schema->num_fields(); ++i) {
883  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
884  continue;
885  }
886 
887  append(builders[i], *column_values[i], null_bitmaps[i]);
888  }
889  }
890  }
891 
892  {
893  auto timer = DEBUG_TIMER("finish builders");
894  for (size_t i = 0; i < col_count; ++i) {
895  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
896  continue;
897  }
898 
899  result_columns[i] = finishColumnBuilder(builders[i]);
900  }
901  }
902  }
903 
904  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
905 }
#define CHECK_EQ(x, y)
Definition: Logger.h:214
Definition: sqltypes.h:48
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
#define DICTIONARY
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
#define ARROW_RECORDBATCH_MAKE
ExecutorDeviceType device_type_
Definition: sqltypes.h:52
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
Definition: sqltypes.h:44
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ArrowResult ArrowResultSetConverter::getArrowResult ( ) const

Serialize an Arrow result to IPC memory. Users are responsible for freeing all CPU IPC buffers using deallocateArrowResultBuffer. GPU buffers will become owned by the caller upon deserialization, and will be automatically freed when they go out of scope.

Definition at line 343 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_LOG, ARROW_THROW_NOT_OK, CHECK, CHECK_GE, convertToArrow(), CPU, DEBUG_TIMER, device_id_, device_type_, arrow::get_and_copy_to_shm(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_shm_buffer(), GPU, SHARED_MEMORY, transport_method_, UNREACHABLE, and WIRE.

343  {
344  auto timer = DEBUG_TIMER(__func__);
345  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
346 
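347  if (device_type_ == ExecutorDeviceType::CPU ||
348  transport_method_ == ArrowTransport::WIRE) {
349  const auto getWireResult =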
350  [&](const int64_t schema_size,
351  const int64_t dict_size,
352  const int64_t records_size,
353  const std::shared_ptr<arrow::Buffer>& serialized_schema,
354  const std::shared_ptr<arrow::Buffer>& serialized_dict) -> ArrowResult {
355  auto timer = DEBUG_TIMER("serialize batch to wire");
356  const int64_t total_size = schema_size + records_size + dict_size;
357  std::vector<char> record_handle_data(total_size);
358  auto serialized_records =
359  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
360 
361  ARROW_ASSIGN_OR_THROW(auto writer, arrow::Buffer::GetWriter(serialized_records));
362 
363  ARROW_THROW_NOT_OK(writer->Write(
364  reinterpret_cast<const uint8_t*>(serialized_schema->data()), schema_size));
365 
366  ARROW_THROW_NOT_OK(writer->Write(
367  reinterpret_cast<const uint8_t*>(serialized_dict->data()), dict_size));
368 
369  arrow::io::FixedSizeBufferWriter stream(
370  SliceMutableBuffer(serialized_records, schema_size + dict_size));
371 
372  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
373  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
374 
375  return {std::vector<char>(0),
376  0,
377  std::vector<char>(0),
378  serialized_records->size(),
379  std::string{""},
380  std::move(record_handle_data)};
381  };
382 
383  const auto getShmResult =
384  [&](const int64_t schema_size,
385  const int64_t dict_size,
386  const int64_t records_size,
387  const std::shared_ptr<arrow::Buffer>& serialized_schema,
388  const std::shared_ptr<arrow::Buffer>& serialized_dict) -> ArrowResult {
389  auto timer = DEBUG_TIMER("serialize batch to shared memory");
390  std::shared_ptr<arrow::Buffer> serialized_records;
391  std::vector<char> schema_handle_buffer;
392  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
393  key_t records_shm_key = IPC_PRIVATE;
394  const int64_t total_size = schema_size + records_size + dict_size;
395 
396  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
397 
398  memcpy(serialized_records->mutable_data(),
399  serialized_schema->data(),
400  (size_t)schema_size);
401  memcpy(serialized_records->mutable_data() + schema_size,
402  serialized_dict->data(),
403  (size_t)dict_size);
404 
405  arrow::io::FixedSizeBufferWriter stream(
406  SliceMutableBuffer(serialized_records, schema_size + dict_size));
407  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
408  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
409  memcpy(&record_handle_buffer[0],
410  reinterpret_cast<const unsigned char*>(&records_shm_key),
411  sizeof(key_t));
412 
413  return {schema_handle_buffer,
414  0,
415  record_handle_buffer,
416  serialized_records->size(),
417  std::string{""}};
418  };
419 
420  std::shared_ptr<arrow::Buffer> serialized_schema;
421  int64_t records_size = 0;
422  int64_t schema_size = 0;
423  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
424  auto options = arrow::ipc::IpcWriteOptions::Defaults();
425  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
426 
427  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
428 
429  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
430 
431  for (auto& pair : dictionaries) {
432  arrow::ipc::IpcPayload payload;
433  int64_t dictionary_id = pair.first;
434  const auto& dictionary = pair.second;
435 
436  ARROW_THROW_NOT_OK(
437  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
438  int32_t metadata_length = 0;
439  ARROW_THROW_NOT_OK(
440  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
441  }
442  auto serialized_dict = dict_stream->Finish().ValueOrDie();
443  auto dict_size = serialized_dict->size();
444 
445  ARROW_ASSIGN_OR_THROW(serialized_schema,
446  arrow::ipc::SerializeSchema(*record_batch->schema(),
447  arrow::default_memory_pool()));
448  schema_size = serialized_schema->size();
449 
450  ARROW_THROW_NOT_OK(arrow::ipc::GetRecordBatchSize(*record_batch, &records_size));
451 
452  switch (transport_method_) {
453  case ArrowTransport::WIRE:
454  return getWireResult(
455  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
456  case ArrowTransport::SHARED_MEMORY:
457  return getShmResult(
458  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
459  default:
460  UNREACHABLE();
461  }
462  }
463 #ifdef HAVE_CUDA
464  CHECK(device_type_ == ExecutorDeviceType::GPU);
465 
466  // Copy the schema to the schema handle
467  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
468  ARROW_THROW_NOT_OK(out_stream_result.status());
469  auto out_stream = std::move(out_stream_result).ValueOrDie();
470 
471  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
472  arrow::ipc::DictionaryMemo current_memo;
473  arrow::ipc::DictionaryMemo serialized_memo;
474 
475  arrow::ipc::IpcPayload schema_payload;
476  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
477  arrow::ipc::IpcWriteOptions::Defaults(),
478  mapper,
479  &schema_payload));
480  int32_t schema_payload_length = 0;
481  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
482  arrow::ipc::IpcWriteOptions::Defaults(),
483  out_stream.get(),
484  &schema_payload_length));
485  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
486  ARROW_LOG("GPU") << "Dictionary "
487  << "found dicts: " << dictionaries.size();
488 
489  ARROW_THROW_NOT_OK(
490  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
491 
492  // now try a dictionary
493  std::shared_ptr<arrow::Schema> dummy_schema;
494  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
495 
496  for (const auto& pair : dictionaries) {
497  arrow::ipc::IpcPayload payload;
498  const auto& dict_id = pair.first;
499  CHECK_GE(dict_id, 0);
500  ARROW_LOG("GPU") << "Dictionary "
501  << "dict_id: " << dict_id;
502  const auto& dict = pair.second;
503  CHECK(dict);
504 
505  if (!dummy_schema) {
506  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
507  dummy_schema = std::make_shared<arrow::Schema>(
508  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
509  }
510  dict_batches.emplace_back(
511  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
512  }
513 
514  if (!dict_batches.empty()) {
515  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
516  dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
517  }
518 
519  auto complete_ipc_stream = out_stream->Finish();
520  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
521  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
522 
523  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
524  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
525  memcpy(&schema_record_key_buffer[0],
526  reinterpret_cast<const unsigned char*>(&record_key),
527  sizeof(key_t));
528 
529  arrow::cuda::CudaDeviceManager* manager;
530  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
531  std::shared_ptr<arrow::cuda::CudaContext> context;
532  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
533 
534  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
535  ARROW_ASSIGN_OR_THROW(device_serialized,
536  SerializeRecordBatch(*record_batch, context.get()));
537 
538  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
539  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
540 
541  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
542  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
543  cuda_handle->Serialize(arrow::default_memory_pool()));
544 
545  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
546  memcpy(&record_handle_buffer[0],
547  serialized_cuda_handle->data(),
548  serialized_cuda_handle->size());
549 
550  return {schema_record_key_buffer,
551  serialized_records->size(),
552  record_handle_buffer,
553  serialized_cuda_handle->size(),
554  serialized_cuda_handle->ToString()};
555 #else
556  UNREACHABLE();
557  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
558 #endif
559 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define ARROW_LOG(category)
#define UNREACHABLE()
Definition: Logger.h:250
#define CHECK_GE(x, y)
Definition: Logger.h:219
std::pair< key_t, std::shared_ptr< arrow::Buffer > > get_shm_buffer(size_t size)
ArrowTransport transport_method_
ExecutorDeviceType device_type_
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
#define CHECK(condition)
Definition: Logger.h:206
#define DEBUG_TIMER(name)
Definition: Logger.h:322
std::shared_ptr< arrow::RecordBatch > convertToArrow() const

+ Here is the call graph for this function:

ArrowResultSetConverter::SerializedArrowOutput ArrowResultSetConverter::getSerializedArrowOutput ( arrow::ipc::DictionaryFieldMapper *  mapper) const
private

Definition at line 562 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, convertToArrow(), and DEBUG_TIMER.

563  {
564  auto timer = DEBUG_TIMER(__func__);
565  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
566  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
567 
568  ARROW_ASSIGN_OR_THROW(
569  serialized_schema,
570  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
571 
572  if (arrow_copy->num_rows()) {
573  auto timer = DEBUG_TIMER("serialize records");
574  ARROW_THROW_NOT_OK(arrow_copy->Validate());
575  ARROW_ASSIGN_OR_THROW(serialized_records,
576  arrow::ipc::SerializeRecordBatch(
577  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
578  } else {
579  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
580  }
581  return {serialized_schema, serialized_records};
582 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define DEBUG_TIMER(name)
Definition: Logger.h:322
std::shared_ptr< arrow::RecordBatch > convertToArrow() const

+ Here is the call graph for this function:

void ArrowResultSetConverter::initializeColumnBuilder ( ColumnBuilder &  column_builder,
const SQLTypeInfo &  col_type,
const std::shared_ptr< arrow::Field > &  field 
) const
private

Definition at line 1023 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, ArrowResultSetConverter::ColumnBuilder::builder, CHECK, ArrowResultSetConverter::ColumnBuilder::col_type, field(), ArrowResultSetConverter::ColumnBuilder::field, SQLTypeInfo::get_comp_param(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_dict_index_type(), foreign_storage::get_physical_type(), SQLTypeInfo::is_dict_encoded_string(), ArrowResultSetConverter::ColumnBuilder::physical_type, and results_.

Referenced by getArrowBatch().

1026  {
1027  column_builder.field = field;
1028  column_builder.col_type = col_type;
1029  column_builder.physical_type = col_type.is_dict_encoded_string()
1030  ? get_dict_index_type(col_type)
1031  : get_physical_type(col_type);
1032 
1033  auto value_type = field->type();
1034  if (col_type.is_dict_encoded_string()) {
1035  column_builder.builder.reset(new arrow::StringDictionary32Builder());
1036  // add values to the builder
1037  const int dict_id = col_type.get_comp_param();
1038  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1039 
1040  arrow::StringBuilder str_array_builder;
1041  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
1042  std::shared_ptr<arrow::StringArray> string_array;
1043  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1044 
1045  auto dict_builder =
1046  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1047  CHECK(dict_builder);
1048 
1049  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1050  } else {
1051  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1052  arrow::default_memory_pool(), value_type, &column_builder.builder));
1053  }
1054 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< ResultSet > results_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:323
#define CHECK(condition)
Definition: Logger.h:206
bool is_dict_encoded_string() const
Definition: sqltypes.h:526

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::shared_ptr< arrow::Field > ArrowResultSetConverter::makeField ( const std::string  name,
const SQLTypeInfo &  target_type 
) const
private

Definition at line 976 of file ArrowResultSetConverter.cpp.

References device_type_, field(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_arrow_type(), and SQLTypeInfo::get_notnull().

Referenced by convertToArrow().

978  {
979  return arrow::field(
980  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
981 }
string name
Definition: setup.in.py:72
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
ExecutorDeviceType device_type_
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:321

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Friends And Related Function Documentation

friend class ArrowResultSet
friend

Definition at line 250 of file ArrowResultSet.h.

Member Data Documentation

std::vector<std::string> ArrowResultSetConverter::col_names_
private

Definition at line 246 of file ArrowResultSet.h.

Referenced by convertToArrow().

std::shared_ptr<Data_Namespace::DataMgr> ArrowResultSetConverter::data_mgr_ = nullptr
private

Definition at line 243 of file ArrowResultSet.h.

int32_t ArrowResultSetConverter::device_id_ = 0
private

Definition at line 245 of file ArrowResultSet.h.

Referenced by getArrowResult().

ExecutorDeviceType ArrowResultSetConverter::device_type_ = ExecutorDeviceType::GPU
private

Definition at line 244 of file ArrowResultSet.h.

Referenced by append(), getArrowBatch(), getArrowResult(), and makeField().

std::shared_ptr<ResultSet> ArrowResultSetConverter::results_
private

Definition at line 242 of file ArrowResultSet.h.

Referenced by convertToArrow(), getArrowBatch(), and initializeColumnBuilder().

int32_t ArrowResultSetConverter::top_n_
private

Definition at line 247 of file ArrowResultSet.h.

Referenced by getArrowBatch().

ArrowTransport ArrowResultSetConverter::transport_method_
private

Definition at line 248 of file ArrowResultSet.h.

Referenced by getArrowResult().


The documentation for this class was generated from the following files: