OmniSciDB  a667adc9c8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowResultSetConverter Class Reference

#include <ArrowResultSet.h>

Classes

struct  ColumnBuilder
 
struct  SerializedArrowOutput
 

Public Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method)
 
ArrowResult getArrowResult () const
 
 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
 
std::shared_ptr
< arrow::RecordBatch > 
convertToArrow () const
 

Private Member Functions

std::shared_ptr
< arrow::RecordBatch > 
getArrowBatch (const std::shared_ptr< arrow::Schema > &schema) const
 
std::shared_ptr< arrow::Field > makeField (const std::string name, const SQLTypeInfo &target_type) const
 
SerializedArrowOutput getSerializedArrowOutput (arrow::ipc::DictionaryFieldMapper *mapper) const
 
void initializeColumnBuilder (ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
 
void append (ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
 
std::shared_ptr< arrow::Array > finishColumnBuilder (ColumnBuilder &column_builder) const
 

Private Attributes

std::shared_ptr< ResultSet > results_
 
std::shared_ptr
< Data_Namespace::DataMgr > 
data_mgr_ = nullptr
 
ExecutorDeviceType device_type_ = ExecutorDeviceType::GPU
 
int32_t device_id_ = 0
 
std::vector< std::string > col_names_
 
int32_t top_n_
 
ArrowTransport transport_method_
 

Friends

class ArrowResultSet
 

Detailed Description

Definition at line 181 of file ArrowResultSet.h.

Constructor & Destructor Documentation

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::shared_ptr< Data_Namespace::DataMgr >  data_mgr,
const ExecutorDeviceType  device_type,
const int32_t  device_id,
const std::vector< std::string > &  col_names,
const int32_t  first_n,
const ArrowTransport  transport_method 
)
inline

Definition at line 183 of file ArrowResultSet.h.

190  : results_(results)
191  , data_mgr_(data_mgr)
192  , device_type_(device_type)
193  , device_id_(device_id)
194  , col_names_(col_names)
195  , top_n_(first_n)
196  , transport_method_(transport_method) {}
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
ArrowTransport transport_method_
std::vector< std::string > col_names_
ExecutorDeviceType device_type_
std::shared_ptr< ResultSet > results_
ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::vector< std::string > &  col_names,
const int32_t  first_n 
)
inline

Definition at line 209 of file ArrowResultSet.h.

212  : results_(results), col_names_(col_names), top_n_(first_n) {}
std::vector< std::string > col_names_
std::shared_ptr< ResultSet > results_

Member Function Documentation

void ArrowResultSetConverter::append ( ColumnBuilder &  column_builder,
const ValueArray &  values,
const std::shared_ptr< std::vector< bool >> &  is_valid 
) const
private

Definition at line 1116 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, ArrowResultSetConverter::ColumnBuilder::col_type, GPU, SQLTypeInfo::is_dict_encoded_string(), kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and ArrowResultSetConverter::ColumnBuilder::physical_type.

1119  {
1120  if (column_builder.col_type.is_dict_encoded_string()) {
1121  CHECK_EQ(column_builder.physical_type,
1122  kINT); // assume all dicts use none-encoded type for now
1123  appendToColumnBuilder<StringDictionary32Builder, int32_t>(
1124  column_builder, values, is_valid);
1125  return;
1126  }
1127  switch (column_builder.physical_type) {
1128  case kBOOLEAN:
1129  appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
1130  break;
1131  case kTINYINT:
1132  appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
1133  break;
1134  case kSMALLINT:
1135  appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
1136  break;
1137  case kINT:
1138  appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
1139  break;
1140  case kBIGINT:
1141  appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
1142  break;
1143  case kFLOAT:
1144  appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
1145  break;
1146  case kDOUBLE:
1147  appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
1148  break;
1149  case kTIME:
1150  appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
1151  break;
1152  case kTIMESTAMP:
1153  appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
1154  break;
1155  case kDATE:
1156  device_type_ == ExecutorDeviceType::GPU
1157  ? appendToColumnBuilder<Date64Builder, int64_t>(
1158  column_builder, values, is_valid)
1159  : appendToColumnBuilder<Date32Builder, int32_t>(
1160  column_builder, values, is_valid);
1161  break;
1162  case kCHAR:
1163  case kVARCHAR:
1164  case kTEXT:
1165  default:
1166  // TODO(miyu): support more scalar types.
1167  throw std::runtime_error(column_builder.col_type.get_type_name() +
1168  " is not supported in Arrow result sets.");
1169  }
1170 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
Definition: sqltypes.h:48
ExecutorDeviceType device_type_
Definition: sqltypes.h:51
Definition: sqltypes.h:52
Definition: sqltypes.h:40
Definition: sqltypes.h:44

+ Here is the call graph for this function:

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::convertToArrow ( ) const

Definition at line 581 of file ArrowResultSetConverter.cpp.

References CHECK, DEBUG_TIMER, f, i, and VLOG.

581  {
582  auto timer = DEBUG_TIMER(__func__);
583  const auto col_count = results_->colCount();
584  std::vector<std::shared_ptr<arrow::Field>> fields;
585  CHECK(col_names_.empty() || col_names_.size() == col_count);
586  for (size_t i = 0; i < col_count; ++i) {
587  const auto ti = results_->getColType(i);
588  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
589  }
590 #if ARROW_CONVERTER_DEBUG
591  VLOG(1) << "Arrow fields: ";
592  for (const auto& f : fields) {
593  VLOG(1) << "\t" << f->ToString(true);
594  }
595 #endif
596  return getArrowBatch(arrow::schema(fields));
597 }
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
std::vector< std::string > col_names_
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
char * f
#define VLOG(n)
Definition: Logger.h:291
std::shared_ptr< arrow::Array > ArrowResultSetConverter::finishColumnBuilder ( ColumnBuilder &  column_builder) const
inline private

Definition at line 1047 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, and ArrowResultSetConverter::ColumnBuilder::builder.

1048  {
1049  std::shared_ptr<Array> values;
1050  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1051  return values;
1052 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::getArrowBatch ( const std::shared_ptr< arrow::Schema > &  schema) const
private

Definition at line 599 of file ArrowResultSetConverter.cpp.

References File_Namespace::append(), ARROW_RECORDBATCH_MAKE, CHECK, CHECK_EQ, cpu_threads(), DEBUG_TIMER, DICTIONARY, field(), GPU, i, generate_TableFunctionsFactory_init::j, kBIGINT, kBOOLEAN, kDATE, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTIME, kTIMESTAMP, kTINYINT, Projection, and run_benchmark_import::result.

600  {
601  std::vector<std::shared_ptr<arrow::Array>> result_columns;
602 
603  const size_t entry_count = top_n_ < 0
604  ? results_->entryCount()
605  : std::min(size_t(top_n_), results_->entryCount());
606  if (!entry_count) {
607  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
608  }
609  const auto col_count = results_->colCount();
610  size_t row_count = 0;
611 
612  result_columns.resize(col_count);
613  std::vector<ColumnBuilder> builders(col_count);
614 
615  // Create array builders
616  for (size_t i = 0; i < col_count; ++i) {
617  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
618  }
619 
620  // TODO(miyu): speed up for columnar buffers
621  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
622  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
623  const std::vector<bool>& non_lazy_cols,
624  const size_t start_entry,
625  const size_t end_entry) -> size_t {
626  CHECK_EQ(value_seg.size(), col_count);
627  CHECK_EQ(null_bitmap_seg.size(), col_count);
628  const auto entry_count = end_entry - start_entry;
629  size_t seg_row_count = 0;
630  for (size_t i = start_entry; i < end_entry; ++i) {
631  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
632  if (row.empty()) {
633  continue;
634  }
635  ++seg_row_count;
636  for (size_t j = 0; j < col_count; ++j) {
637  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
638  continue;
639  }
640 
641  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
642  // TODO(miyu): support more types other than scalar.
643  CHECK(scalar_value);
644  const auto& column = builders[j];
645  switch (column.physical_type) {
646  case kBOOLEAN:
647  create_or_append_value<bool, int64_t>(
648  *scalar_value, value_seg[j], entry_count);
649  create_or_append_validity<int64_t>(
650  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
651  break;
652  case kTINYINT:
653  create_or_append_value<int8_t, int64_t>(
654  *scalar_value, value_seg[j], entry_count);
655  create_or_append_validity<int64_t>(
656  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
657  break;
658  case kSMALLINT:
659  create_or_append_value<int16_t, int64_t>(
660  *scalar_value, value_seg[j], entry_count);
661  create_or_append_validity<int64_t>(
662  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
663  break;
664  case kINT:
665  create_or_append_value<int32_t, int64_t>(
666  *scalar_value, value_seg[j], entry_count);
667  create_or_append_validity<int64_t>(
668  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
669  break;
670  case kBIGINT:
671  create_or_append_value<int64_t, int64_t>(
672  *scalar_value, value_seg[j], entry_count);
673  create_or_append_validity<int64_t>(
674  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
675  break;
676  case kFLOAT:
677  create_or_append_value<float, float>(
678  *scalar_value, value_seg[j], entry_count);
679  create_or_append_validity<float>(
680  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
681  break;
682  case kDOUBLE:
683  create_or_append_value<double, double>(
684  *scalar_value, value_seg[j], entry_count);
685  create_or_append_validity<double>(
686  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
687  break;
688  case kTIME:
689  create_or_append_value<int32_t, int64_t>(
690  *scalar_value, value_seg[j], entry_count);
691  create_or_append_validity<int64_t>(
692  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
693  break;
694  case kDATE:
695  device_type_ == ExecutorDeviceType::GPU
696  ? create_or_append_value<int64_t, int64_t>(
697  *scalar_value, value_seg[j], entry_count)
698  : create_or_append_value<int32_t, int64_t>(
699  *scalar_value, value_seg[j], entry_count);
700  create_or_append_validity<int64_t>(
701  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
702  break;
703  case kTIMESTAMP:
704  create_or_append_value<int64_t, int64_t>(
705  *scalar_value, value_seg[j], entry_count);
706  create_or_append_validity<int64_t>(
707  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
708  break;
709  default:
710  // TODO(miyu): support more scalar types.
711  throw std::runtime_error(column.col_type.get_type_name() +
712  " is not supported in Arrow result sets.");
713  }
714  }
715  }
716  return seg_row_count;
717  };
718 
719  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
720  const std::vector<bool>& non_lazy_cols,
721  const size_t start_col,
722  const size_t end_col) {
723  for (size_t col = start_col; col < end_col; ++col) {
724  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
725  continue;
726  }
727 
728  const auto& column = builders[col];
729  switch (column.physical_type) {
730  case kTINYINT:
731  convert_column<int8_t>(results_, col, entry_count, result[col]);
732  break;
733  case kSMALLINT:
734  convert_column<int16_t>(results_, col, entry_count, result[col]);
735  break;
736  case kINT:
737  convert_column<int32_t>(results_, col, entry_count, result[col]);
738  break;
739  case kBIGINT:
740  convert_column<int64_t>(results_, col, entry_count, result[col]);
741  break;
742  case kFLOAT:
743  convert_column<float>(results_, col, entry_count, result[col]);
744  break;
745  case kDOUBLE:
746  convert_column<double>(results_, col, entry_count, result[col]);
747  break;
748  default:
749  throw std::runtime_error(column.col_type.get_type_name() +
750  " is not supported in Arrow column converter.");
751  }
752  }
753  };
754 
755  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
756  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
757  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
758  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
759  results_->getQueryMemDesc().getQueryDescriptionType() ==
760  QueryDescriptionType::Projection &&
761  entry_count == results_->entryCount();
762  std::vector<bool> non_lazy_cols;
763  if (use_columnar_converter) {
764  auto timer = DEBUG_TIMER("columnar converter");
765  std::vector<size_t> non_lazy_col_pos;
766  size_t non_lazy_col_count = 0;
767  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
768 
769  non_lazy_cols.reserve(col_count);
770  non_lazy_col_pos.reserve(col_count);
771  for (size_t i = 0; i < col_count; ++i) {
772  bool is_lazy =
773  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
774  // Currently column converter cannot handle some data types.
775  // Treat them as lazy.
776  switch (builders[i].physical_type) {
777  case kBOOLEAN:
778  case kTIME:
779  case kDATE:
780  case kTIMESTAMP:
781  is_lazy = true;
782  break;
783  default:
784  break;
785  }
786  if (builders[i].field->type()->id() == Type::DICTIONARY) {
787  is_lazy = true;
788  }
789  non_lazy_cols.emplace_back(!is_lazy);
790  if (!is_lazy) {
791  ++non_lazy_col_count;
792  non_lazy_col_pos.emplace_back(i);
793  }
794  }
795 
796  if (non_lazy_col_count == col_count) {
797  non_lazy_cols.clear();
798  non_lazy_col_pos.clear();
799  } else {
800  non_lazy_col_pos.emplace_back(col_count);
801  }
802 
803  std::vector<std::future<void>> child_threads;
804  size_t num_threads =
805  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
806 
807  size_t start_col = 0;
808  size_t end_col = 0;
809  for (size_t i = 0; i < num_threads; ++i) {
810  start_col = end_col;
811  end_col = (i + 1) * non_lazy_col_count / num_threads;
812  size_t phys_start_col =
813  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
814  size_t phys_end_col =
815  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
816  child_threads.push_back(std::async(std::launch::async,
817  convert_columns,
818  std::ref(result_columns),
819  non_lazy_cols,
820  phys_start_col,
821  phys_end_col));
822  }
823  for (auto& child : child_threads) {
824  child.get();
825  }
826  row_count = entry_count;
827  }
828  if (!use_columnar_converter || !non_lazy_cols.empty()) {
829  auto timer = DEBUG_TIMER("row converter");
830  row_count = 0;
831  if (multithreaded) {
832  const size_t cpu_count = cpu_threads();
833  std::vector<std::future<size_t>> child_threads;
834  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
835  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
836  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
837  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
838  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
839  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
840  ++i, start_entry += stride) {
841  const auto end_entry = std::min(entry_count, start_entry + stride);
842  child_threads.push_back(std::async(std::launch::async,
843  fetch,
844  std::ref(column_value_segs[i]),
845  std::ref(null_bitmap_segs[i]),
846  non_lazy_cols,
847  start_entry,
848  end_entry));
849  }
850  for (auto& child : child_threads) {
851  row_count += child.get();
852  }
853  {
854  auto timer = DEBUG_TIMER("append rows to arrow");
855  for (int i = 0; i < schema->num_fields(); ++i) {
856  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
857  continue;
858  }
859 
860  for (size_t j = 0; j < cpu_count; ++j) {
861  if (!column_value_segs[j][i]) {
862  continue;
863  }
864  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
865  }
866  }
867  }
868  } else {
869  row_count =
870  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
871  {
872  auto timer = DEBUG_TIMER("append rows to arrow single thread");
873  for (int i = 0; i < schema->num_fields(); ++i) {
874  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
875  continue;
876  }
877 
878  append(builders[i], *column_values[i], null_bitmaps[i]);
879  }
880  }
881  }
882 
883  {
884  auto timer = DEBUG_TIMER("finish builders");
885  for (size_t i = 0; i < col_count; ++i) {
886  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
887  continue;
888  }
889 
890  result_columns[i] = finishColumnBuilder(builders[i]);
891  }
892  }
893  }
894 
895  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
896 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
Definition: sqltypes.h:48
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
#define DICTIONARY
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
#define ARROW_RECORDBATCH_MAKE
ExecutorDeviceType device_type_
Definition: sqltypes.h:52
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
Definition: sqltypes.h:44
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

ArrowResult ArrowResultSetConverter::getArrowResult ( ) const

Serialize an Arrow result to IPC memory. Users are responsible for freeing all CPU IPC buffers using deallocateArrowResultBuffer. GPU buffers will become owned by the caller upon deserialization, and will be automatically freed when they go out of scope.

Definition at line 340 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_LOG, ARROW_THROW_NOT_OK, CHECK, CHECK_GE, CPU, DEBUG_TIMER, arrow::get_and_copy_to_shm(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_shm_buffer(), GPU, SHARED_MEMORY, UNREACHABLE, and WIRE.

340  {
341  auto timer = DEBUG_TIMER(__func__);
342  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
343 
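344  if (device_type_ == ExecutorDeviceType::CPU ||
345  transport_method_ == ArrowTransport::WIRE) {
346  const auto getWireResult =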
347  [&](const int64_t schema_size,
348  const int64_t dict_size,
349  const int64_t records_size,
350  const std::shared_ptr<Buffer>& serialized_schema,
351  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
352  auto timer = DEBUG_TIMER("serialize batch to wire");
353  const int64_t total_size = schema_size + records_size + dict_size;
354  std::vector<char> record_handle_data(total_size);
355  auto serialized_records =
356  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
357 
358  ARROW_ASSIGN_OR_THROW(auto writer, Buffer::GetWriter(serialized_records));
359 
360  ARROW_THROW_NOT_OK(writer->Write(
361  reinterpret_cast<const uint8_t*>(serialized_schema->data()), schema_size));
362 
363  ARROW_THROW_NOT_OK(writer->Write(
364  reinterpret_cast<const uint8_t*>(serialized_dict->data()), dict_size));
365 
366  io::FixedSizeBufferWriter stream(
367  SliceMutableBuffer(serialized_records, schema_size + dict_size));
368 
369  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
370  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
371 
372  return {std::vector<char>(0),
373  0,
374  std::vector<char>(0),
375  serialized_records->size(),
376  std::string{""},
377  std::move(record_handle_data)};
378  };
379 
380  const auto getShmResult =
381  [&](const int64_t schema_size,
382  const int64_t dict_size,
383  const int64_t records_size,
384  const std::shared_ptr<Buffer>& serialized_schema,
385  const std::shared_ptr<Buffer>& serialized_dict) -> ArrowResult {
386  auto timer = DEBUG_TIMER("serialize batch to shared memory");
387  std::shared_ptr<Buffer> serialized_records;
388  std::vector<char> schema_handle_buffer;
389  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
390  key_t records_shm_key = IPC_PRIVATE;
391  const int64_t total_size = schema_size + records_size + dict_size;
392 
393  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
394 
395  memcpy(serialized_records->mutable_data(),
396  serialized_schema->data(),
397  (size_t)schema_size);
398  memcpy(serialized_records->mutable_data() + schema_size,
399  serialized_dict->data(),
400  (size_t)dict_size);
401 
402  io::FixedSizeBufferWriter stream(
403  SliceMutableBuffer(serialized_records, schema_size + dict_size));
404  ARROW_THROW_NOT_OK(ipc::SerializeRecordBatch(
405  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
406  memcpy(&record_handle_buffer[0],
407  reinterpret_cast<const unsigned char*>(&records_shm_key),
408  sizeof(key_t));
409 
410  return {schema_handle_buffer,
411  0,
412  record_handle_buffer,
413  serialized_records->size(),
414  std::string{""}};
415  };
416 
417  std::shared_ptr<Buffer> serialized_schema;
418  int64_t records_size = 0;
419  int64_t schema_size = 0;
420  ipc::DictionaryFieldMapper mapper(*record_batch->schema());
421  auto options = ipc::IpcWriteOptions::Defaults();
422  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
423 
424  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
425 
426  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
427 
428  for (auto& pair : dictionaries) {
429  ipc::IpcPayload payload;
430  int64_t dictionary_id = pair.first;
431  const auto& dictionary = pair.second;
432 
433  ARROW_THROW_NOT_OK(
434  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
435  int32_t metadata_length = 0;
436  ARROW_THROW_NOT_OK(
437  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
438  }
439  auto serialized_dict = dict_stream->Finish().ValueOrDie();
440  auto dict_size = serialized_dict->size();
441 
442  ARROW_ASSIGN_OR_THROW(
443  serialized_schema,
444  ipc::SerializeSchema(*record_batch->schema(), default_memory_pool()));
445  schema_size = serialized_schema->size();
446 
447  ARROW_THROW_NOT_OK(ipc::GetRecordBatchSize(*record_batch, &records_size));
448 
449  switch (transport_method_) {
450  case ArrowTransport::WIRE:
451  return getWireResult(
452  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
453  case ArrowTransport::SHARED_MEMORY:
454  return getShmResult(
455  schema_size, dict_size, records_size, serialized_schema, serialized_dict);
456  default:
457  UNREACHABLE();
458  }
459  }
460 #ifdef HAVE_CUDA
461  CHECK(device_type_ == ExecutorDeviceType::GPU);
462 
463  // Copy the schema to the schema handle
464  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
465  ARROW_THROW_NOT_OK(out_stream_result.status());
466  auto out_stream = std::move(out_stream_result).ValueOrDie();
467 
468  ipc::DictionaryFieldMapper mapper(*record_batch->schema());
469  arrow::ipc::DictionaryMemo current_memo;
470  arrow::ipc::DictionaryMemo serialized_memo;
471 
472  arrow::ipc::IpcPayload schema_payload;
473  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
474  arrow::ipc::IpcWriteOptions::Defaults(),
475  mapper,
476  &schema_payload));
477  int32_t schema_payload_length = 0;
478  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
479  arrow::ipc::IpcWriteOptions::Defaults(),
480  out_stream.get(),
481  &schema_payload_length));
482  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
483  ARROW_LOG("GPU") << "Dictionary "
484  << "found dicts: " << dictionaries.size();
485 
486  ARROW_THROW_NOT_OK(
487  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
488 
489  // now try a dictionary
490  std::shared_ptr<arrow::Schema> dummy_schema;
491  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
492 
493  for (const auto& pair : dictionaries) {
494  ipc::IpcPayload payload;
495  const auto& dict_id = pair.first;
496  CHECK_GE(dict_id, 0);
497  ARROW_LOG("GPU") << "Dictionary "
498  << "dict_id: " << dict_id;
499  const auto& dict = pair.second;
500  CHECK(dict);
501 
502  if (!dummy_schema) {
503  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
504  dummy_schema = std::make_shared<arrow::Schema>(
505  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
506  }
507  dict_batches.emplace_back(
508  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
509  }
510 
511  if (!dict_batches.empty()) {
512  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
513  dict_batches, ipc::IpcWriteOptions::Defaults(), out_stream.get()));
514  }
515 
516  auto complete_ipc_stream = out_stream->Finish();
517  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
518  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
519 
520  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
521  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
522  memcpy(&schema_record_key_buffer[0],
523  reinterpret_cast<const unsigned char*>(&record_key),
524  sizeof(key_t));
525 
526  arrow::cuda::CudaDeviceManager* manager;
527  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
528  std::shared_ptr<arrow::cuda::CudaContext> context;
529  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
530 
531  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
532  ARROW_ASSIGN_OR_THROW(device_serialized,
533  SerializeRecordBatch(*record_batch, context.get()));
534 
535  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
536  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
537 
538  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
539  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
540  cuda_handle->Serialize(arrow::default_memory_pool()));
541 
542  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
543  memcpy(&record_handle_buffer[0],
544  serialized_cuda_handle->data(),
545  serialized_cuda_handle->size());
546 
547  return {schema_record_key_buffer,
548  serialized_records->size(),
549  record_handle_buffer,
550  serialized_cuda_handle->size(),
551  serialized_cuda_handle->ToString()};
552 #else
553  UNREACHABLE();
554  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
555 #endif
556 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define ARROW_LOG(category)
#define UNREACHABLE()
Definition: Logger.h:241
#define CHECK_GE(x, y)
Definition: Logger.h:210
ArrowTransport transport_method_
ExecutorDeviceType device_type_
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
std::pair< key_t, std::shared_ptr< Buffer > > get_shm_buffer(size_t size)
#define CHECK(condition)
Definition: Logger.h:197
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::shared_ptr< arrow::RecordBatch > convertToArrow() const

+ Here is the call graph for this function:

ArrowResultSetConverter::SerializedArrowOutput ArrowResultSetConverter::getSerializedArrowOutput ( arrow::ipc::DictionaryFieldMapper *  mapper) const
private

Definition at line 559 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, and DEBUG_TIMER.

560  {
561  auto timer = DEBUG_TIMER(__func__);
562  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
563  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
564 
565  ARROW_ASSIGN_OR_THROW(
566  serialized_schema,
567  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
568 
569  if (arrow_copy->num_rows()) {
570  auto timer = DEBUG_TIMER("serialize records");
571  ARROW_THROW_NOT_OK(arrow_copy->Validate());
572  ARROW_ASSIGN_OR_THROW(serialized_records,
573  arrow::ipc::SerializeRecordBatch(
574  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
575  } else {
576  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
577  }
578  return {serialized_schema, serialized_records};
579 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define DEBUG_TIMER(name)
Definition: Logger.h:313
std::shared_ptr< arrow::RecordBatch > convertToArrow() const
void ArrowResultSetConverter::initializeColumnBuilder ( ColumnBuilder &  column_builder,
const SQLTypeInfo &  col_type,
const std::shared_ptr< arrow::Field > &  field 
) const
private

Definition at line 1014 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, ArrowResultSetConverter::ColumnBuilder::builder, CHECK, ArrowResultSetConverter::ColumnBuilder::col_type, field(), ArrowResultSetConverter::ColumnBuilder::field, SQLTypeInfo::get_comp_param(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_dict_index_type(), foreign_storage::get_physical_type(), SQLTypeInfo::is_dict_encoded_string(), and ArrowResultSetConverter::ColumnBuilder::physical_type.

1017  {
1018  column_builder.field = field;
1019  column_builder.col_type = col_type;
1020  column_builder.physical_type = col_type.is_dict_encoded_string()
1021  ? get_dict_index_type(col_type)
1022  : get_physical_type(col_type);
1023 
1024  auto value_type = field->type();
1025  if (col_type.is_dict_encoded_string()) {
1026  column_builder.builder.reset(new StringDictionary32Builder());
1027  // add values to the builder
1028  const int dict_id = col_type.get_comp_param();
1029  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1030 
1031  arrow::StringBuilder str_array_builder;
1032  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
1033  std::shared_ptr<StringArray> string_array;
1034  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1035 
1036  auto dict_builder =
1037  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1038  CHECK(dict_builder);
1039 
1040  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1041  } else {
1042  ARROW_THROW_NOT_OK(
1043  arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
1044  }
1045 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< ResultSet > results_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:323
#define CHECK(condition)
Definition: Logger.h:197
bool is_dict_encoded_string() const
Definition: sqltypes.h:525

+ Here is the call graph for this function:

std::shared_ptr< arrow::Field > ArrowResultSetConverter::makeField ( const std::string  name,
const SQLTypeInfo &  target_type 
) const
private

Definition at line 967 of file ArrowResultSetConverter.cpp.

References field(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_arrow_type(), and SQLTypeInfo::get_notnull().

969  {
970  return arrow::field(
971  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
972 }
string name
Definition: setup.in.py:62
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
ExecutorDeviceType device_type_
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:321

+ Here is the call graph for this function:

Friends And Related Function Documentation

friend class ArrowResultSet
friend

Definition at line 249 of file ArrowResultSet.h.

Member Data Documentation

std::vector<std::string> ArrowResultSetConverter::col_names_
private

Definition at line 245 of file ArrowResultSet.h.

std::shared_ptr<Data_Namespace::DataMgr> ArrowResultSetConverter::data_mgr_ = nullptr
private

Definition at line 242 of file ArrowResultSet.h.

int32_t ArrowResultSetConverter::device_id_ = 0
private

Definition at line 244 of file ArrowResultSet.h.

ExecutorDeviceType ArrowResultSetConverter::device_type_ = ExecutorDeviceType::GPU
private

Definition at line 243 of file ArrowResultSet.h.

std::shared_ptr<ResultSet> ArrowResultSetConverter::results_
private

Definition at line 241 of file ArrowResultSet.h.

int32_t ArrowResultSetConverter::top_n_
private

Definition at line 246 of file ArrowResultSet.h.

ArrowTransport ArrowResultSetConverter::transport_method_
private

Definition at line 247 of file ArrowResultSet.h.


The documentation for this class was generated from the following files: