OmniSciDB  d2f719934e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowResultSetConverter Class Reference

#include <ArrowResultSet.h>

Classes

struct  ColumnBuilder
 
struct  SerializedArrowOutput
 

Public Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method)
 
ArrowResult getArrowResult () const
 
 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
 
std::shared_ptr
< arrow::RecordBatch > 
convertToArrow () const
 

Private Member Functions

std::shared_ptr
< arrow::RecordBatch > 
getArrowBatch (const std::shared_ptr< arrow::Schema > &schema) const
 
std::shared_ptr< arrow::Field > makeField (const std::string name, const SQLTypeInfo &target_type) const
 
SerializedArrowOutput getSerializedArrowOutput (arrow::ipc::DictionaryFieldMapper *mapper) const
 
void initializeColumnBuilder (ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
 
void append (ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
 
std::shared_ptr< arrow::Array > finishColumnBuilder (ColumnBuilder &column_builder) const
 

Private Attributes

std::shared_ptr< ResultSet > results_
 
std::shared_ptr
< Data_Namespace::DataMgr > 
data_mgr_ = nullptr
 
ExecutorDeviceType device_type_ = ExecutorDeviceType::GPU
 
int32_t device_id_ = 0
 
std::vector< std::string > col_names_
 
int32_t top_n_
 
ArrowTransport transport_method_
 

Friends

class ArrowResultSet
 

Detailed Description

Definition at line 182 of file ArrowResultSet.h.

Constructor & Destructor Documentation

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::shared_ptr< Data_Namespace::DataMgr >  data_mgr,
const ExecutorDeviceType  device_type,
const int32_t  device_id,
const std::vector< std::string > &  col_names,
const int32_t  first_n,
const ArrowTransport  transport_method 
)
inline

Definition at line 184 of file ArrowResultSet.h.

191  : results_(results)
192  , data_mgr_(data_mgr)
193  , device_type_(device_type)
194  , device_id_(device_id)
195  , col_names_(col_names)
196  , top_n_(first_n)
197  , transport_method_(transport_method) {}
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
ArrowTransport transport_method_
std::vector< std::string > col_names_
ExecutorDeviceType device_type_
std::shared_ptr< ResultSet > results_
ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::vector< std::string > &  col_names,
const int32_t  first_n 
)
inline

Definition at line 214 of file ArrowResultSet.h.

217  : results_(results), col_names_(col_names), top_n_(first_n) {}
std::vector< std::string > col_names_
std::shared_ptr< ResultSet > results_

Member Function Documentation

void ArrowResultSetConverter::append ( ColumnBuilder &  column_builder,
const ValueArray &  values,
const std::shared_ptr< std::vector< bool >> &  is_valid 
) const
private

Definition at line 1207 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, ArrowResultSetConverter::ColumnBuilder::col_type, device_type_, GPU, SQLTypeInfo::is_dict_encoded_string(), kBIGINT, kBOOLEAN, kCHAR, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and ArrowResultSetConverter::ColumnBuilder::physical_type.

Referenced by getArrowBatch().

1210  {
1211  if (column_builder.col_type.is_dict_encoded_string()) {
1212  CHECK_EQ(column_builder.physical_type,
1213  kINT); // assume all dicts use none-encoded type for now
1214  appendToColumnBuilder<arrow::StringDictionary32Builder, int32_t>(
1215  column_builder, values, is_valid);
1216  return;
1217  }
1218  switch (column_builder.physical_type) {
1219  case kBOOLEAN:
1220  appendToColumnBuilder<arrow::BooleanBuilder, bool>(
1221  column_builder, values, is_valid);
1222  break;
1223  case kTINYINT:
1224  appendToColumnBuilder<arrow::Int8Builder, int8_t>(column_builder, values, is_valid);
1225  break;
1226  case kSMALLINT:
1227  appendToColumnBuilder<arrow::Int16Builder, int16_t>(
1228  column_builder, values, is_valid);
1229  break;
1230  case kINT:
1231  appendToColumnBuilder<arrow::Int32Builder, int32_t>(
1232  column_builder, values, is_valid);
1233  break;
1234  case kBIGINT:
1235  appendToColumnBuilder<arrow::Int64Builder, int64_t>(
1236  column_builder, values, is_valid);
1237  break;
1238  case kDECIMAL:
1239  appendToColumnBuilder<arrow::Decimal128Builder, int64_t>(
1240  column_builder, values, is_valid);
1241  break;
1242  case kFLOAT:
1243  appendToColumnBuilder<arrow::FloatBuilder, float>(column_builder, values, is_valid);
1244  break;
1245  case kDOUBLE:
1246  appendToColumnBuilder<arrow::DoubleBuilder, double>(
1247  column_builder, values, is_valid);
1248  break;
1249  case kTIME:
1250  appendToColumnBuilder<arrow::Time32Builder, int32_t>(
1251  column_builder, values, is_valid);
1252  break;
1253  case kTIMESTAMP:
1254  appendToColumnBuilder<arrow::TimestampBuilder, int64_t>(
1255  column_builder, values, is_valid);
1256  break;
1257  case kDATE:
1258  device_type_ == ExecutorDeviceType::GPU
1259  ? appendToColumnBuilder<arrow::Date64Builder, int64_t>(
1260  column_builder, values, is_valid)
1261  : appendToColumnBuilder<arrow::Date32Builder, int32_t>(
1262  column_builder, values, is_valid);
1263  break;
1264  case kCHAR:
1265  case kVARCHAR:
1266  case kTEXT:
1267  default:
1268  // TODO(miyu): support more scalar types.
1269  throw std::runtime_error(column_builder.col_type.get_type_name() +
1270  " is not supported in Arrow result sets.");
1271  }
1272 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
Definition: sqltypes.h:49
ExecutorDeviceType device_type_
Definition: sqltypes.h:52
Definition: sqltypes.h:53
Definition: sqltypes.h:41
Definition: sqltypes.h:45

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::convertToArrow ( ) const

Definition at line 612 of file ArrowResultSetConverter.cpp.

References CHECK, col_names_, DEBUG_TIMER, f, getArrowBatch(), i, makeField(), results_, and VLOG.

Referenced by getArrowResult(), and getSerializedArrowOutput().

612  {
613  auto timer = DEBUG_TIMER(__func__);
614  const auto col_count = results_->colCount();
615  std::vector<std::shared_ptr<arrow::Field>> fields;
616  CHECK(col_names_.empty() || col_names_.size() == col_count);
617  for (size_t i = 0; i < col_count; ++i) {
618  const auto ti = results_->getColType(i);
619  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
620  }
621 #if ARROW_CONVERTER_DEBUG
622  VLOG(1) << "Arrow fields: ";
623  for (const auto& f : fields) {
624  VLOG(1) << "\t" << f->ToString(true);
625  }
626 #endif
627  return getArrowBatch(arrow::schema(fields));
628 }
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
std::vector< std::string > col_names_
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
char * f
#define VLOG(n)
Definition: Logger.h:305

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::shared_ptr< arrow::Array > ArrowResultSetConverter::finishColumnBuilder ( ColumnBuilder &  column_builder) const
inline private

Definition at line 1103 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, and ArrowResultSetConverter::ColumnBuilder::builder.

Referenced by getArrowBatch().

1104  {
1105  std::shared_ptr<arrow::Array> values;
1106  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
1107  return values;
1108 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36

+ Here is the caller graph for this function:

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::getArrowBatch ( const std::shared_ptr< arrow::Schema > &  schema) const
private

Definition at line 630 of file ArrowResultSetConverter.cpp.

References append(), ARROW_RECORDBATCH_MAKE, threading_serial::async(), CHECK, CHECK_EQ, cpu_threads(), DEBUG_TIMER, device_type_, DICTIONARY, field(), finishColumnBuilder(), GPU, i, initializeColumnBuilder(), kBIGINT, kBOOLEAN, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTIME, kTIMESTAMP, kTINYINT, Projection, run_benchmark_import::result, results_, and top_n_.

Referenced by convertToArrow().

631  {
632  std::vector<std::shared_ptr<arrow::Array>> result_columns;
633 
634  // First, check if the result set is empty.
635  // If so, we return an arrow result set that only
636  // contains the schema (no record batch will be serialized).
637  if (results_->isEmpty()) {
638  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
639  }
640 
641  const size_t entry_count = top_n_ < 0
642  ? results_->entryCount()
643  : std::min(size_t(top_n_), results_->entryCount());
644 
645  const auto col_count = results_->colCount();
646  size_t row_count = 0;
647 
648  result_columns.resize(col_count);
649  std::vector<ColumnBuilder> builders(col_count);
650 
651  // Create array builders
652  for (size_t i = 0; i < col_count; ++i) {
653  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
654  }
655 
656  // TODO(miyu): speed up for columnar buffers
657  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
658  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
659  const std::vector<bool>& non_lazy_cols,
660  const size_t start_entry,
661  const size_t end_entry) -> size_t {
662  CHECK_EQ(value_seg.size(), col_count);
663  CHECK_EQ(null_bitmap_seg.size(), col_count);
664  const auto entry_count = end_entry - start_entry;
665  size_t seg_row_count = 0;
666  for (size_t i = start_entry; i < end_entry; ++i) {
667  auto row = results_->getRowAtNoTranslations(i, non_lazy_cols);
668  if (row.empty()) {
669  continue;
670  }
671  ++seg_row_count;
672  for (size_t j = 0; j < col_count; ++j) {
673  if (!non_lazy_cols.empty() && non_lazy_cols[j]) {
674  continue;
675  }
676 
677  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
678  // TODO(miyu): support more types other than scalar.
679  CHECK(scalar_value);
680  const auto& column = builders[j];
681  switch (column.physical_type) {
682  case kBOOLEAN:
683  create_or_append_value<bool, int64_t>(
684  *scalar_value, value_seg[j], entry_count);
685  create_or_append_validity<int64_t>(
686  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
687  break;
688  case kTINYINT:
689  create_or_append_value<int8_t, int64_t>(
690  *scalar_value, value_seg[j], entry_count);
691  create_or_append_validity<int64_t>(
692  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
693  break;
694  case kSMALLINT:
695  create_or_append_value<int16_t, int64_t>(
696  *scalar_value, value_seg[j], entry_count);
697  create_or_append_validity<int64_t>(
698  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
699  break;
700  case kINT:
701  create_or_append_value<int32_t, int64_t>(
702  *scalar_value, value_seg[j], entry_count);
703  create_or_append_validity<int64_t>(
704  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
705  break;
706  case kBIGINT:
707  create_or_append_value<int64_t, int64_t>(
708  *scalar_value, value_seg[j], entry_count);
709  create_or_append_validity<int64_t>(
710  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
711  break;
712  case kDECIMAL:
713  create_or_append_value<int64_t, int64_t>(
714  *scalar_value, value_seg[j], entry_count);
715  create_or_append_validity<int64_t>(
716  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
717  break;
718  case kFLOAT:
719  create_or_append_value<float, float>(
720  *scalar_value, value_seg[j], entry_count);
721  create_or_append_validity<float>(
722  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
723  break;
724  case kDOUBLE:
725  create_or_append_value<double, double>(
726  *scalar_value, value_seg[j], entry_count);
727  create_or_append_validity<double>(
728  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
729  break;
730  case kTIME:
731  create_or_append_value<int32_t, int64_t>(
732  *scalar_value, value_seg[j], entry_count);
733  create_or_append_validity<int64_t>(
734  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
735  break;
736  case kDATE:
737  device_type_ == ExecutorDeviceType::GPU
738  ? create_or_append_value<int64_t, int64_t>(
739  *scalar_value, value_seg[j], entry_count)
740  : create_or_append_value<int32_t, int64_t>(
741  *scalar_value, value_seg[j], entry_count);
742  create_or_append_validity<int64_t>(
743  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
744  break;
745  case kTIMESTAMP:
746  create_or_append_value<int64_t, int64_t>(
747  *scalar_value, value_seg[j], entry_count);
748  create_or_append_validity<int64_t>(
749  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
750  break;
751  default:
752  // TODO(miyu): support more scalar types.
753  throw std::runtime_error(column.col_type.get_type_name() +
754  " is not supported in Arrow result sets.");
755  }
756  }
757  }
758  return seg_row_count;
759  };
760 
761  auto convert_columns = [&](std::vector<std::shared_ptr<arrow::Array>>& result,
762  const std::vector<bool>& non_lazy_cols,
763  const size_t start_col,
764  const size_t end_col) {
765  for (size_t col = start_col; col < end_col; ++col) {
766  if (!non_lazy_cols.empty() && !non_lazy_cols[col]) {
767  continue;
768  }
769 
770  const auto& column = builders[col];
771  switch (column.physical_type) {
772  case kTINYINT:
773  convert_column<int8_t>(results_, col, entry_count, result[col]);
774  break;
775  case kSMALLINT:
776  convert_column<int16_t>(results_, col, entry_count, result[col]);
777  break;
778  case kINT:
779  convert_column<int32_t>(results_, col, entry_count, result[col]);
780  break;
781  case kBIGINT:
782  convert_column<int64_t>(results_, col, entry_count, result[col]);
783  break;
784  case kFLOAT:
785  convert_column<float>(results_, col, entry_count, result[col]);
786  break;
787  case kDOUBLE:
788  convert_column<double>(results_, col, entry_count, result[col]);
789  break;
790  default:
791  throw std::runtime_error(column.col_type.get_type_name() +
792  " is not supported in Arrow column converter.");
793  }
794  }
795  };
796 
797  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
798  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
799  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
800  bool use_columnar_converter = results_->isDirectColumnarConversionPossible() &&
801  results_->getQueryMemDesc().getQueryDescriptionType() ==
802  QueryDescriptionType::Projection &&
803  entry_count == results_->entryCount();
804  std::vector<bool> non_lazy_cols;
805  if (use_columnar_converter) {
806  auto timer = DEBUG_TIMER("columnar converter");
807  std::vector<size_t> non_lazy_col_pos;
808  size_t non_lazy_col_count = 0;
809  const auto& lazy_fetch_info = results_->getLazyFetchInfo();
810 
811  non_lazy_cols.reserve(col_count);
812  non_lazy_col_pos.reserve(col_count);
813  for (size_t i = 0; i < col_count; ++i) {
814  bool is_lazy =
815  lazy_fetch_info.empty() ? false : lazy_fetch_info[i].is_lazily_fetched;
816  // Currently column converter cannot handle some data types.
817  // Treat them as lazy.
818  switch (builders[i].physical_type) {
819  case kBOOLEAN:
820  case kTIME:
821  case kDATE:
822  case kTIMESTAMP:
823  is_lazy = true;
824  break;
825  default:
826  break;
827  }
828  if (builders[i].field->type()->id() == arrow::Type::DICTIONARY) {
829  is_lazy = true;
830  }
831  non_lazy_cols.emplace_back(!is_lazy);
832  if (!is_lazy) {
833  ++non_lazy_col_count;
834  non_lazy_col_pos.emplace_back(i);
835  }
836  }
837 
838  if (non_lazy_col_count == col_count) {
839  non_lazy_cols.clear();
840  non_lazy_col_pos.clear();
841  } else {
842  non_lazy_col_pos.emplace_back(col_count);
843  }
844 
845  std::vector<std::future<void>> child_threads;
846  size_t num_threads =
847  std::min(multithreaded ? (size_t)cpu_threads() : (size_t)1, non_lazy_col_count);
848 
849  size_t start_col = 0;
850  size_t end_col = 0;
851  for (size_t i = 0; i < num_threads; ++i) {
852  start_col = end_col;
853  end_col = (i + 1) * non_lazy_col_count / num_threads;
854  size_t phys_start_col =
855  non_lazy_col_pos.empty() ? start_col : non_lazy_col_pos[start_col];
856  size_t phys_end_col =
857  non_lazy_col_pos.empty() ? end_col : non_lazy_col_pos[end_col];
858  child_threads.push_back(std::async(std::launch::async,
859  convert_columns,
860  std::ref(result_columns),
861  non_lazy_cols,
862  phys_start_col,
863  phys_end_col));
864  }
865  for (auto& child : child_threads) {
866  child.get();
867  }
868  row_count = entry_count;
869  }
870  if (!use_columnar_converter || !non_lazy_cols.empty()) {
871  auto timer = DEBUG_TIMER("row converter");
872  row_count = 0;
873  if (multithreaded) {
874  const size_t cpu_count = cpu_threads();
875  std::vector<std::future<size_t>> child_threads;
876  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
877  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
878  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
879  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
880  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
881  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
882  ++i, start_entry += stride) {
883  const auto end_entry = std::min(entry_count, start_entry + stride);
884  child_threads.push_back(std::async(std::launch::async,
885  fetch,
886  std::ref(column_value_segs[i]),
887  std::ref(null_bitmap_segs[i]),
888  non_lazy_cols,
889  start_entry,
890  end_entry));
891  }
892  for (auto& child : child_threads) {
893  row_count += child.get();
894  }
895  {
896  auto timer = DEBUG_TIMER("append rows to arrow");
897  for (int i = 0; i < schema->num_fields(); ++i) {
898  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
899  continue;
900  }
901 
902  for (size_t j = 0; j < cpu_count; ++j) {
903  if (!column_value_segs[j][i]) {
904  continue;
905  }
906  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
907  }
908  }
909  }
910  } else {
911  row_count =
912  fetch(column_values, null_bitmaps, non_lazy_cols, size_t(0), entry_count);
913  {
914  auto timer = DEBUG_TIMER("append rows to arrow single thread");
915  for (int i = 0; i < schema->num_fields(); ++i) {
916  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
917  continue;
918  }
919 
920  append(builders[i], *column_values[i], null_bitmaps[i]);
921  }
922  }
923  }
924 
925  {
926  auto timer = DEBUG_TIMER("finish builders");
927  for (size_t i = 0; i < col_count; ++i) {
928  if (!non_lazy_cols.empty() && non_lazy_cols[i]) {
929  continue;
930  }
931 
932  result_columns[i] = finishColumnBuilder(builders[i]);
933  }
934  }
935  }
936 
937  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
938 }
#define CHECK_EQ(x, y)
Definition: Logger.h:219
Definition: sqltypes.h:49
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
#define DICTIONARY
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
future< Result > async(Fn &&fn, Args &&...args)
#define ARROW_RECORDBATCH_MAKE
ExecutorDeviceType device_type_
Definition: sqltypes.h:53
std::shared_ptr< ResultSet > results_
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
Definition: sqltypes.h:45
int cpu_threads()
Definition: thread_count.h:24

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ArrowResult ArrowResultSetConverter::getArrowResult ( ) const

Serialize an Arrow result to IPC memory. Users are responsible for freeing all CPU IPC buffers using deallocateArrowResultBuffer. GPU buffers will become owned by the caller upon deserialization, and will be automatically freed when they go out of scope.

Definition at line 343 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_LOG, ARROW_THROW_NOT_OK, CHECK, CHECK_GE, convertToArrow(), CPU, DEBUG_TIMER, device_id_, device_type_, arrow::get_and_copy_to_shm(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_shm_buffer(), GPU, SHARED_MEMORY, transport_method_, UNREACHABLE, and WIRE.

343  {
344  auto timer = DEBUG_TIMER(__func__);
345  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
346 
347  struct BuildResultParams {
348  int64_t schemaSize() const {
349  return serialized_schema ? serialized_schema->size() : 0;
350  };
351  int64_t dictSize() const { return serialized_dict ? serialized_dict->size() : 0; };
352  int64_t totalSize() const { return schemaSize() + records_size + dictSize(); }
353  bool hasRecordBatch() const { return records_size > 0; }
354  bool hasDict() const { return dictSize() > 0; }
355 
356  int64_t records_size{0};
357  std::shared_ptr<arrow::Buffer> serialized_schema{nullptr};
358  std::shared_ptr<arrow::Buffer> serialized_dict{nullptr};
359  } result_params;
360 
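361  if (device_type_ == ExecutorDeviceType::CPU ||
362  transport_method_ == ArrowTransport::WIRE) {
363  const auto getWireResult = [&]() -> ArrowResult {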
364  auto timer = DEBUG_TIMER("serialize batch to wire");
365  const auto total_size = result_params.totalSize();
366  std::vector<char> record_handle_data(total_size);
367  auto serialized_records =
368  arrow::MutableBuffer::Wrap(record_handle_data.data(), total_size);
369 
370  ARROW_ASSIGN_OR_THROW(auto writer, arrow::Buffer::GetWriter(serialized_records));
371 
372  ARROW_THROW_NOT_OK(writer->Write(
373  reinterpret_cast<const uint8_t*>(result_params.serialized_schema->data()),
374  result_params.schemaSize()));
375 
376  if (result_params.hasDict()) {
377  ARROW_THROW_NOT_OK(writer->Write(
378  reinterpret_cast<const uint8_t*>(result_params.serialized_dict->data()),
379  result_params.dictSize()));
380  }
381 
382  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
383  serialized_records, result_params.schemaSize() + result_params.dictSize()));
384 
385  if (result_params.hasRecordBatch()) {
386  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
387  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
388  }
389 
390  return {std::vector<char>(0),
391  0,
392  std::vector<char>(0),
393  serialized_records->size(),
394  std::string{""},
395  std::move(record_handle_data)};
396  };
397 
398  const auto getShmResult = [&]() -> ArrowResult {
399  auto timer = DEBUG_TIMER("serialize batch to shared memory");
400  std::shared_ptr<arrow::Buffer> serialized_records;
401  std::vector<char> schema_handle_buffer;
402  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
403  key_t records_shm_key = IPC_PRIVATE;
404  const int64_t total_size = result_params.totalSize();
405 
406  std::tie(records_shm_key, serialized_records) = get_shm_buffer(total_size);
407 
408  memcpy(serialized_records->mutable_data(),
409  result_params.serialized_schema->data(),
410  (size_t)result_params.schemaSize());
411 
412  if (result_params.hasDict()) {
413  memcpy(serialized_records->mutable_data() + result_params.schemaSize(),
414  result_params.serialized_dict->data(),
415  (size_t)result_params.dictSize());
416  }
417 
418  arrow::io::FixedSizeBufferWriter stream(SliceMutableBuffer(
419  serialized_records, result_params.schemaSize() + result_params.dictSize()));
420 
421  if (result_params.hasRecordBatch()) {
422  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
423  *record_batch, arrow::ipc::IpcWriteOptions::Defaults(), &stream));
424  }
425 
426  memcpy(&record_handle_buffer[0],
427  reinterpret_cast<const unsigned char*>(&records_shm_key),
428  sizeof(key_t));
429 
430  return {schema_handle_buffer,
431  0,
432  record_handle_buffer,
433  serialized_records->size(),
434  std::string{""}};
435  };
436 
437  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
438  auto options = arrow::ipc::IpcWriteOptions::Defaults();
439  auto dict_stream = arrow::io::BufferOutputStream::Create(1024).ValueOrDie();
440 
441  // If our record batch is going to be empty, we omit it entirely,
442  // only serializing the schema.
443  if (!record_batch->num_rows()) {
444  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
445  arrow::ipc::SerializeSchema(*record_batch->schema(),
446  arrow::default_memory_pool()));
447 
448  switch (transport_method_) {
449  case ArrowTransport::WIRE:
450  return getWireResult();
451  case ArrowTransport::SHARED_MEMORY:
452  return getShmResult();
453  default:
454  UNREACHABLE();
455  }
456  }
457 
458  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
459 
460  ARROW_LOG("CPU") << "found " << dictionaries.size() << " dictionaries";
461 
462  for (auto& pair : dictionaries) {
463  arrow::ipc::IpcPayload payload;
464  int64_t dictionary_id = pair.first;
465  const auto& dictionary = pair.second;
466 
467  ARROW_THROW_NOT_OK(
468  GetDictionaryPayload(dictionary_id, dictionary, options, &payload));
469  int32_t metadata_length = 0;
470  ARROW_THROW_NOT_OK(
471  WriteIpcPayload(payload, options, dict_stream.get(), &metadata_length));
472  }
473  result_params.serialized_dict = dict_stream->Finish().ValueOrDie();
474 
475  ARROW_ASSIGN_OR_THROW(result_params.serialized_schema,
476  arrow::ipc::SerializeSchema(*record_batch->schema(),
477  arrow::default_memory_pool()));
478 
479  ARROW_THROW_NOT_OK(
480  arrow::ipc::GetRecordBatchSize(*record_batch, &result_params.records_size));
481 
482  switch (transport_method_) {
483  case ArrowTransport::WIRE:
484  return getWireResult();
485  case ArrowTransport::SHARED_MEMORY:
486  return getShmResult();
487  default:
488  UNREACHABLE();
489  }
490  }
491 #ifdef HAVE_CUDA
492  CHECK(device_type_ == ExecutorDeviceType::GPU);
493 
494  // Copy the schema to the schema handle
495  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
496  ARROW_THROW_NOT_OK(out_stream_result.status());
497  auto out_stream = std::move(out_stream_result).ValueOrDie();
498 
499  arrow::ipc::DictionaryFieldMapper mapper(*record_batch->schema());
500  arrow::ipc::DictionaryMemo current_memo;
501  arrow::ipc::DictionaryMemo serialized_memo;
502 
503  arrow::ipc::IpcPayload schema_payload;
504  ARROW_THROW_NOT_OK(arrow::ipc::GetSchemaPayload(*record_batch->schema(),
505  arrow::ipc::IpcWriteOptions::Defaults(),
506  mapper,
507  &schema_payload));
508  int32_t schema_payload_length = 0;
509  ARROW_THROW_NOT_OK(arrow::ipc::WriteIpcPayload(schema_payload,
510  arrow::ipc::IpcWriteOptions::Defaults(),
511  out_stream.get(),
512  &schema_payload_length));
513  ARROW_ASSIGN_OR_THROW(auto dictionaries, CollectDictionaries(*record_batch, mapper));
514  ARROW_LOG("GPU") << "Dictionary "
515  << "found dicts: " << dictionaries.size();
516 
517  ARROW_THROW_NOT_OK(
518  arrow::ipc::internal::CollectDictionaries(*record_batch, &current_memo));
519 
520  // now try a dictionary
521  std::shared_ptr<arrow::Schema> dummy_schema;
522  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
523 
524  for (const auto& pair : dictionaries) {
525  arrow::ipc::IpcPayload payload;
526  const auto& dict_id = pair.first;
527  CHECK_GE(dict_id, 0);
528  ARROW_LOG("GPU") << "Dictionary "
529  << "dict_id: " << dict_id;
530  const auto& dict = pair.second;
531  CHECK(dict);
532 
533  if (!dummy_schema) {
534  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
535  dummy_schema = std::make_shared<arrow::Schema>(
536  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
537  }
538  dict_batches.emplace_back(
539  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
540  }
541 
542  if (!dict_batches.empty()) {
543  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
544  dict_batches, arrow::ipc::IpcWriteOptions::Defaults(), out_stream.get()));
545  }
546 
547  auto complete_ipc_stream = out_stream->Finish();
548  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
549  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
550 
551  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
552  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
553  memcpy(&schema_record_key_buffer[0],
554  reinterpret_cast<const unsigned char*>(&record_key),
555  sizeof(key_t));
556 
557  arrow::cuda::CudaDeviceManager* manager;
558  ARROW_ASSIGN_OR_THROW(manager, arrow::cuda::CudaDeviceManager::Instance());
559  std::shared_ptr<arrow::cuda::CudaContext> context;
560  ARROW_ASSIGN_OR_THROW(context, manager->GetContext(device_id_));
561 
562  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
563  ARROW_ASSIGN_OR_THROW(device_serialized,
564  SerializeRecordBatch(*record_batch, context.get()));
565 
566  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
567  ARROW_ASSIGN_OR_THROW(cuda_handle, device_serialized->ExportForIpc());
568 
569  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
570  ARROW_ASSIGN_OR_THROW(serialized_cuda_handle,
571  cuda_handle->Serialize(arrow::default_memory_pool()));
572 
573  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
574  memcpy(&record_handle_buffer[0],
575  serialized_cuda_handle->data(),
576  serialized_cuda_handle->size());
577 
578  return {schema_record_key_buffer,
579  serialized_records->size(),
580  record_handle_buffer,
581  serialized_cuda_handle->size(),
582  serialized_cuda_handle->ToString()};
583 #else
584  UNREACHABLE();
585  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
586 #endif
587 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define ARROW_LOG(category)
#define UNREACHABLE()
Definition: Logger.h:255
#define CHECK_GE(x, y)
Definition: Logger.h:224
std::pair< key_t, std::shared_ptr< arrow::Buffer > > get_shm_buffer(size_t size)
ArrowTransport transport_method_
ExecutorDeviceType device_type_
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
#define CHECK(condition)
Definition: Logger.h:211
#define DEBUG_TIMER(name)
Definition: Logger.h:358
std::shared_ptr< arrow::RecordBatch > convertToArrow() const

+ Here is the call graph for this function:

ArrowResultSetConverter::SerializedArrowOutput ArrowResultSetConverter::getSerializedArrowOutput ( arrow::ipc::DictionaryFieldMapper *  mapper) const
private

Definition at line 590 of file ArrowResultSetConverter.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, convertToArrow(), and DEBUG_TIMER.

591  {
592  auto timer = DEBUG_TIMER(__func__);
593  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
594  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
595 
596  ARROW_ASSIGN_OR_THROW(
597  serialized_schema,
598  arrow::ipc::SerializeSchema(*arrow_copy->schema(), arrow::default_memory_pool()));
599 
600  if (arrow_copy->num_rows()) {
601  auto timer = DEBUG_TIMER("serialize records");
602  ARROW_THROW_NOT_OK(arrow_copy->Validate());
603  ARROW_ASSIGN_OR_THROW(serialized_records,
604  arrow::ipc::SerializeRecordBatch(
605  *arrow_copy, arrow::ipc::IpcWriteOptions::Defaults()));
606  } else {
607  ARROW_ASSIGN_OR_THROW(serialized_records, arrow::AllocateBuffer(0));
608  }
609  return {serialized_schema, serialized_records};
610 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
#define DEBUG_TIMER(name)
Definition: Logger.h:358
std::shared_ptr< arrow::RecordBatch > convertToArrow() const

+ Here is the call graph for this function:

void ArrowResultSetConverter::initializeColumnBuilder ( ColumnBuilder &  column_builder,
const SQLTypeInfo &  col_type,
const std::shared_ptr< arrow::Field > &  field 
) const
private

Definition at line 1056 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, ArrowResultSetConverter::ColumnBuilder::builder, CHECK, ArrowResultSetConverter::ColumnBuilder::col_type, field(), ArrowResultSetConverter::ColumnBuilder::field, SQLTypeInfo::get_comp_param(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_dict_index_type(), foreign_storage::get_physical_type(), SQLTypeInfo::is_dict_encoded_string(), ArrowResultSetConverter::ColumnBuilder::physical_type, results_, and ArrowResultSetConverter::ColumnBuilder::transient_string_remapping.

Referenced by getArrowBatch().

1059  {
1060  column_builder.field = field;
1061  column_builder.col_type = col_type;
1062  column_builder.physical_type = col_type.is_dict_encoded_string()
1063  ? get_dict_index_type(col_type)
1064  : get_physical_type(col_type);
1065 
1066  auto value_type = field->type();
1067  if (col_type.is_dict_encoded_string()) {
1068  column_builder.builder.reset(new arrow::StringDictionary32Builder());
1069  // add values to the builder
1070  const int dict_id = col_type.get_comp_param();
1071  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
1072 
1073  arrow::StringBuilder str_array_builder;
1074  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(str_list));
1075 
1076  // add transients
1077  auto sdp = results_->getStringDictionaryProxy(dict_id);
1078  CHECK(sdp);
1079 
1080  const auto& transient_map = sdp->getTransientMapping();
1081  int32_t crt_transient_id = static_cast<int32_t>(str_list.size());
1082  for (auto transient_pair : transient_map) {
1083  ARROW_THROW_NOT_OK(str_array_builder.Append(transient_pair.second));
1084  CHECK(column_builder.transient_string_remapping
1085  .insert(std::make_pair(transient_pair.first, crt_transient_id++))
1086  .second);
1087  }
1088 
1089  std::shared_ptr<arrow::StringArray> string_array;
1090  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
1091 
1092  auto dict_builder =
1093  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
1094  CHECK(dict_builder);
1095 
1096  ARROW_THROW_NOT_OK(dict_builder->InsertMemoValues(*string_array));
1097  } else {
1098  ARROW_THROW_NOT_OK(arrow::MakeBuilder(
1099  arrow::default_memory_pool(), value_type, &column_builder.builder));
1100  }
1101 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
parquet::Type::type get_physical_type(ReaderPtr &reader, const int logical_column_index)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< ResultSet > results_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:211
bool is_dict_encoded_string() const
Definition: sqltypes.h:557

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::shared_ptr< arrow::Field > ArrowResultSetConverter::makeField ( const std::string  name,
const SQLTypeInfo &  target_type 
) const
private

Definition at line 1009 of file ArrowResultSetConverter.cpp.

References device_type_, field(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_arrow_type(), and SQLTypeInfo::get_notnull().

Referenced by convertToArrow().

1011  {
1012  return arrow::field(
1013  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
1014 }
string name
Definition: setup.in.py:72
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
ExecutorDeviceType device_type_
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &sql_type, const ExecutorDeviceType device_type)
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Friends And Related Function Documentation

friend class ArrowResultSet
friend

Definition at line 254 of file ArrowResultSet.h.

Member Data Documentation

std::vector<std::string> ArrowResultSetConverter::col_names_
private

Definition at line 250 of file ArrowResultSet.h.

Referenced by convertToArrow().

std::shared_ptr<Data_Namespace::DataMgr> ArrowResultSetConverter::data_mgr_ = nullptr
private

Definition at line 247 of file ArrowResultSet.h.

int32_t ArrowResultSetConverter::device_id_ = 0
private

Definition at line 249 of file ArrowResultSet.h.

Referenced by getArrowResult().

ExecutorDeviceType ArrowResultSetConverter::device_type_ = ExecutorDeviceType::GPU
private

Definition at line 248 of file ArrowResultSet.h.

Referenced by append(), getArrowBatch(), getArrowResult(), and makeField().

std::shared_ptr<ResultSet> ArrowResultSetConverter::results_
private

Definition at line 246 of file ArrowResultSet.h.

Referenced by convertToArrow(), getArrowBatch(), and initializeColumnBuilder().

int32_t ArrowResultSetConverter::top_n_
private

Definition at line 251 of file ArrowResultSet.h.

Referenced by getArrowBatch().

ArrowTransport ArrowResultSetConverter::transport_method_
private

Definition at line 252 of file ArrowResultSet.h.

Referenced by getArrowResult().


The documentation for this class was generated from the following files: