OmniSciDB  7bf56492aa
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowResultSetConverter Class Reference

#include <ArrowResultSet.h>

Classes

struct  ColumnBuilder
 
struct  SerializedArrowOutput
 

Public Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n)
 
ArrowResult getArrowResult () const
 

Private Member Functions

 ArrowResultSetConverter (const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
 
std::shared_ptr< arrow::RecordBatch > convertToArrow () const
 
std::shared_ptr< arrow::RecordBatch > getArrowBatch (const std::shared_ptr< arrow::Schema > &schema) const
 
std::shared_ptr< arrow::Field > makeField (const std::string name, const SQLTypeInfo &target_type) const
 
SerializedArrowOutput getSerializedArrowOutput (arrow::ipc::DictionaryMemo *memo) const
 
void initializeColumnBuilder (ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
 
void append (ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
 
std::shared_ptr< arrow::Array > finishColumnBuilder (ColumnBuilder &column_builder) const
 

Private Attributes

std::shared_ptr< ResultSet > results_
 
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_ = nullptr
 
ExecutorDeviceType device_type_ = ExecutorDeviceType::GPU
 
int32_t device_id_ = 0
 
std::vector< std::string > col_names_
 
int32_t top_n_
 

Friends

class ArrowResultSet
 

Detailed Description

Definition at line 172 of file ArrowResultSet.h.

Constructor & Destructor Documentation

ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::shared_ptr< Data_Namespace::DataMgr >  data_mgr,
const ExecutorDeviceType  device_type,
const int32_t  device_id,
const std::vector< std::string > &  col_names,
const int32_t  first_n 
)
inline

Definition at line 174 of file ArrowResultSet.h.

180  : results_(results)
181  , data_mgr_(data_mgr)
182  , device_type_(device_type)
183  , device_id_(device_id)
184  , col_names_(col_names)
185  , top_n_(first_n) {}
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
std::vector< std::string > col_names_
ExecutorDeviceType device_type_
std::shared_ptr< ResultSet > results_
ArrowResultSetConverter::ArrowResultSetConverter ( const std::shared_ptr< ResultSet > &  results,
const std::vector< std::string > &  col_names,
const int32_t  first_n 
)
inline private

Definition at line 199 of file ArrowResultSet.h.

202  : results_(results), col_names_(col_names), top_n_(first_n) {}
std::vector< std::string > col_names_
std::shared_ptr< ResultSet > results_

Member Function Documentation

void ArrowResultSetConverter::append ( ColumnBuilder &  column_builder,
const ValueArray &  values,
const std::shared_ptr< std::vector< bool >> &  is_valid 
) const
private

Definition at line 727 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, ArrowResultSetConverter::ColumnBuilder::col_type, GPU, SQLTypeInfo::is_dict_encoded_string(), kBIGINT, kBOOLEAN, kCHAR, kDATE, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, kVARCHAR, and ArrowResultSetConverter::ColumnBuilder::physical_type.

// Appends one fetched segment of values (plus its validity vector, `is_valid`)
// to `column_builder`, choosing the concrete Arrow builder type from the
// column's physical type.
// Dictionary-encoded strings are appended as int32 values through a
// StringDictionary32Builder; their physical type must be kINT (checked below).
// Any physical type without a dedicated case below (CHAR, VARCHAR and TEXT that
// are not dictionary-encoded, among others) throws std::runtime_error.
730  {
731  if (column_builder.col_type.is_dict_encoded_string()) {
732  CHECK_EQ(column_builder.physical_type,
733  kINT); // assume all dicts use none-encoded type for now
734  appendToColumnBuilder<StringDictionary32Builder, int32_t>(
735  column_builder, values, is_valid);
736  return;
737  }
738  switch (column_builder.physical_type) {
739  case kBOOLEAN:
740  appendToColumnBuilder<BooleanBuilder, bool>(column_builder, values, is_valid);
741  break;
742  case kTINYINT:
743  appendToColumnBuilder<Int8Builder, int8_t>(column_builder, values, is_valid);
744  break;
745  case kSMALLINT:
746  appendToColumnBuilder<Int16Builder, int16_t>(column_builder, values, is_valid);
747  break;
748  case kINT:
749  appendToColumnBuilder<Int32Builder, int32_t>(column_builder, values, is_valid);
750  break;
751  case kBIGINT:
752  appendToColumnBuilder<Int64Builder, int64_t>(column_builder, values, is_valid);
753  break;
754  case kFLOAT:
755  appendToColumnBuilder<FloatBuilder, float>(column_builder, values, is_valid);
756  break;
757  case kDOUBLE:
758  appendToColumnBuilder<DoubleBuilder, double>(column_builder, values, is_valid);
759  break;
760  case kTIME:
761  appendToColumnBuilder<Time32Builder, int32_t>(column_builder, values, is_valid);
762  break;
763  case kTIMESTAMP:
764  appendToColumnBuilder<TimestampBuilder, int64_t>(column_builder, values, is_valid);
765  break;
766  case kDATE:
// NOTE(review): listing line 767 (the condition of the ternary below) was lost
// in this rendering. It presumably reads `device_type_ == ExecutorDeviceType::GPU`
// (this function's References list GPU and device_type_): that branch appends
// via Date64Builder/int64_t, the other via Date32Builder/int32_t. Confirm
// against ArrowResultSetConverter.cpp.
768  ? appendToColumnBuilder<Date64Builder, int64_t>(
769  column_builder, values, is_valid)
770  : appendToColumnBuilder<Date32Builder, int32_t>(
771  column_builder, values, is_valid);
772  break;
773  case kCHAR:
774  case kVARCHAR:
775  case kTEXT:
776  default:
777  // TODO(miyu): support more scalar types.
778  throw std::runtime_error(column_builder.col_type.get_type_name() +
779  " is not supported in Arrow result sets.");
780  }
781 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
Definition: sqltypes.h:50
ExecutorDeviceType device_type_
Definition: sqltypes.h:53
Definition: sqltypes.h:54
Definition: sqltypes.h:42
Definition: sqltypes.h:46

+ Here is the call graph for this function:

std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::convertToArrow ( ) const
private

Definition at line 346 of file ArrowResultSetConverter.cpp.

References CHECK().

346  {
347  const auto col_count = results_->colCount();
348  std::vector<std::shared_ptr<arrow::Field>> fields;
349  CHECK(col_names_.empty() || col_names_.size() == col_count);
350  for (size_t i = 0; i < col_count; ++i) {
351  const auto ti = results_->getColType(i);
352  fields.push_back(makeField(col_names_.empty() ? "" : col_names_[i], ti));
353  }
354  return getArrowBatch(arrow::schema(fields));
355 }
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
std::vector< std::string > col_names_
CHECK(cgen_state)
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
std::shared_ptr< ResultSet > results_

+ Here is the call graph for this function:

std::shared_ptr< arrow::Array > ArrowResultSetConverter::finishColumnBuilder ( ColumnBuilder &  column_builder) const
inline private

Definition at line 658 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, and ArrowResultSetConverter::ColumnBuilder::builder.

659  {
660  std::shared_ptr<Array> values;
661  ARROW_THROW_NOT_OK(column_builder.builder->Finish(&values));
662  return values;
663 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:37
std::shared_ptr< arrow::RecordBatch > ArrowResultSetConverter::getArrowBatch ( const std::shared_ptr< arrow::Schema > &  schema) const
private

Definition at line 357 of file ArrowResultSetConverter.cpp.

References File_Namespace::append(), ARROW_RECORDBATCH_MAKE, CHECK(), CHECK_EQ, cpu_threads(), GPU, kBIGINT, kBOOLEAN, kDATE, kDOUBLE, kFLOAT, kINT, kSMALLINT, kTIME, kTIMESTAMP, and kTINYINT.

358  {
359  std::vector<std::shared_ptr<arrow::Array>> result_columns;
360 
361  const size_t entry_count = top_n_ < 0
362  ? results_->entryCount()
363  : std::min(size_t(top_n_), results_->entryCount());
364  if (!entry_count) {
365  return ARROW_RECORDBATCH_MAKE(schema, 0, result_columns);
366  }
367  const auto col_count = results_->colCount();
368  size_t row_count = 0;
369 
370  std::vector<ColumnBuilder> builders(col_count);
371 
372  // Create array builders
373  for (size_t i = 0; i < col_count; ++i) {
374  initializeColumnBuilder(builders[i], results_->getColType(i), schema->field(i));
375  }
376 
377  // TODO(miyu): speed up for columnar buffers
378  auto fetch = [&](std::vector<std::shared_ptr<ValueArray>>& value_seg,
379  std::vector<std::shared_ptr<std::vector<bool>>>& null_bitmap_seg,
380  const size_t start_entry,
381  const size_t end_entry) -> size_t {
382  CHECK_EQ(value_seg.size(), col_count);
383  CHECK_EQ(null_bitmap_seg.size(), col_count);
384  const auto entry_count = end_entry - start_entry;
385  size_t seg_row_count = 0;
386  for (size_t i = start_entry; i < end_entry; ++i) {
387  auto row = results_->getRowAtNoTranslations(i);
388  if (row.empty()) {
389  continue;
390  }
391  ++seg_row_count;
392  for (size_t j = 0; j < col_count; ++j) {
393  auto scalar_value = boost::get<ScalarTargetValue>(&row[j]);
394  // TODO(miyu): support more types other than scalar.
395  CHECK(scalar_value);
396  const auto& column = builders[j];
397  switch (column.physical_type) {
398  case kBOOLEAN:
399  create_or_append_value<bool, int64_t>(
400  *scalar_value, value_seg[j], entry_count);
401  create_or_append_validity<int64_t>(
402  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
403  break;
404  case kTINYINT:
405  create_or_append_value<int8_t, int64_t>(
406  *scalar_value, value_seg[j], entry_count);
407  create_or_append_validity<int64_t>(
408  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
409  break;
410  case kSMALLINT:
411  create_or_append_value<int16_t, int64_t>(
412  *scalar_value, value_seg[j], entry_count);
413  create_or_append_validity<int64_t>(
414  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
415  break;
416  case kINT:
417  create_or_append_value<int32_t, int64_t>(
418  *scalar_value, value_seg[j], entry_count);
419  create_or_append_validity<int64_t>(
420  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
421  break;
422  case kBIGINT:
423  create_or_append_value<int64_t, int64_t>(
424  *scalar_value, value_seg[j], entry_count);
425  create_or_append_validity<int64_t>(
426  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
427  break;
428  case kFLOAT:
429  create_or_append_value<float, float>(
430  *scalar_value, value_seg[j], entry_count);
431  create_or_append_validity<float>(
432  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
433  break;
434  case kDOUBLE:
435  create_or_append_value<double, double>(
436  *scalar_value, value_seg[j], entry_count);
437  create_or_append_validity<double>(
438  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
439  break;
440  case kTIME:
441  create_or_append_value<int32_t, int64_t>(
442  *scalar_value, value_seg[j], entry_count);
443  create_or_append_validity<int64_t>(
444  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
445  break;
446  case kDATE:
448  ? create_or_append_value<int64_t, int64_t>(
449  *scalar_value, value_seg[j], entry_count)
450  : create_or_append_value<int32_t, int64_t>(
451  *scalar_value, value_seg[j], entry_count);
452  create_or_append_validity<int64_t>(
453  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
454  break;
455  case kTIMESTAMP:
456  create_or_append_value<int64_t, int64_t>(
457  *scalar_value, value_seg[j], entry_count);
458  create_or_append_validity<int64_t>(
459  *scalar_value, column.col_type, null_bitmap_seg[j], entry_count);
460  break;
461  default:
462  // TODO(miyu): support more scalar types.
463  throw std::runtime_error(column.col_type.get_type_name() +
464  " is not supported in Arrow result sets.");
465  }
466  }
467  }
468  return seg_row_count;
469  };
470 
471  std::vector<std::shared_ptr<ValueArray>> column_values(col_count, nullptr);
472  std::vector<std::shared_ptr<std::vector<bool>>> null_bitmaps(col_count, nullptr);
473  const bool multithreaded = entry_count > 10000 && !results_->isTruncated();
474  if (multithreaded) {
475  const size_t cpu_count = cpu_threads();
476  std::vector<std::future<size_t>> child_threads;
477  std::vector<std::vector<std::shared_ptr<ValueArray>>> column_value_segs(
478  cpu_count, std::vector<std::shared_ptr<ValueArray>>(col_count, nullptr));
479  std::vector<std::vector<std::shared_ptr<std::vector<bool>>>> null_bitmap_segs(
480  cpu_count, std::vector<std::shared_ptr<std::vector<bool>>>(col_count, nullptr));
481  const auto stride = (entry_count + cpu_count - 1) / cpu_count;
482  for (size_t i = 0, start_entry = 0; start_entry < entry_count;
483  ++i, start_entry += stride) {
484  const auto end_entry = std::min(entry_count, start_entry + stride);
485  child_threads.push_back(std::async(std::launch::async,
486  fetch,
487  std::ref(column_value_segs[i]),
488  std::ref(null_bitmap_segs[i]),
489  start_entry,
490  end_entry));
491  }
492  for (auto& child : child_threads) {
493  row_count += child.get();
494  }
495  for (int i = 0; i < schema->num_fields(); ++i) {
496  for (size_t j = 0; j < cpu_count; ++j) {
497  if (!column_value_segs[j][i]) {
498  continue;
499  }
500  append(builders[i], *column_value_segs[j][i], null_bitmap_segs[j][i]);
501  }
502  }
503  } else {
504  row_count = fetch(column_values, null_bitmaps, size_t(0), entry_count);
505  for (int i = 0; i < schema->num_fields(); ++i) {
506  append(builders[i], *column_values[i], null_bitmaps[i]);
507  }
508  }
509 
510  for (size_t i = 0; i < col_count; ++i) {
511  result_columns.push_back(finishColumnBuilder(builders[i]));
512  }
513  return ARROW_RECORDBATCH_MAKE(schema, row_count, result_columns);
514 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
Definition: sqltypes.h:50
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
#define ARROW_RECORDBATCH_MAKE
CHECK(cgen_state)
ExecutorDeviceType device_type_
Definition: sqltypes.h:54
std::shared_ptr< ResultSet > results_
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
Definition: sqltypes.h:46
int cpu_threads()
Definition: thread_count.h:25

+ Here is the call graph for this function:

ArrowResult ArrowResultSetConverter::getArrowResult ( ) const

Serialize an Arrow result to IPC memory. Users are responsible for freeing all CPU IPC buffers using deallocateArrowResultBuffer. GPU buffers will become owned by the caller upon deserialization, and will be automatically freed when they go out of scope.

Definition at line 203 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, CHECK(), CHECK_GE, CPU, field(), arrow::get_and_copy_to_shm(), GPU, and UNREACHABLE.

// Converts the result set to an Arrow record batch and serializes it for a client.
// CPU path: the batch is written as an Arrow IPC stream, copied into shared
// memory via arrow::get_and_copy_to_shm, and the returned key_t is packed into
// the record handle; the schema handle is returned empty with size 0.
// GPU path (HAVE_CUDA builds only): the schema and the dictionary batches are
// written as one IPC stream that is copied to shared memory (its key becomes
// the schema handle). The record batch itself is serialized into a CudaBuffer
// on device_id_'s context and exported as a serialized CUDA IPC handle. Without
// HAVE_CUDA the GPU path hits UNREACHABLE().
// NOTE(review): this rendering drops source lines 206, 233, 244, 250, 299 and
// 306; see the notes at each gap.
203  {
204  std::shared_ptr<arrow::RecordBatch> record_batch = convertToArrow();
205 
// NOTE(review): line 206 is missing. It opens the block closed by `231  }`,
// presumably `if (device_type_ == ExecutorDeviceType::CPU) {` (CPU is in the
// References list) — confirm.
207  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
208  ARROW_THROW_NOT_OK(out_stream_result.status());
209  auto out_stream = std::move(out_stream_result).ValueOrDie();
210 
211  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
212  {record_batch}, ipc::IpcOptions::Defaults(), out_stream.get()));
213 
214  auto complete_ipc_stream = out_stream->Finish();
215  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
216  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
217 
// The CPU result carries no separate schema: the IPC stream above already
// embeds it, so the schema handle stays empty.
218  std::vector<char> schema_handle_buffer;
219 
// Only the shared-memory key (a key_t) is handed back, copied byte-wise into
// the record handle.
220  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
221  std::vector<char> record_handle_buffer(sizeof(key_t), 0);
222  memcpy(&record_handle_buffer[0],
223  reinterpret_cast<const unsigned char*>(&record_key),
224  sizeof(key_t));
225 
226  return {schema_handle_buffer,
227  0,
228  record_handle_buffer,
229  serialized_records->size(),
230  std::string{""}};
231  }
232 #ifdef HAVE_CUDA
// NOTE(review): line 233 is missing; presumably a CHECK() on the device type
// (CHECK() is in the References list) — confirm.
234 
235  // Copy the schema to the schema handle
236  auto out_stream_result = arrow::io::BufferOutputStream::Create(1024);
237  ARROW_THROW_NOT_OK(out_stream_result.status());
238  auto out_stream = std::move(out_stream_result).ValueOrDie();
239 
240  arrow::ipc::DictionaryMemo current_memo;
241  arrow::ipc::DictionaryMemo serialized_memo;
242 
243  arrow::ipc::internal::IpcPayload schema_payload;
// NOTE(review): line 244 is missing; the trailing `));` on line 248 implies it
// opened a wrapping call, presumably `ARROW_THROW_NOT_OK(`.
245  arrow::ipc::internal::GetSchemaPayload(*record_batch->schema(),
246  arrow::ipc::IpcOptions::Defaults(),
247  &serialized_memo,
248  &schema_payload));
// schema_payload_length is filled in by WriteIpcPayload but not otherwise used
// in the visible code.
249  int32_t schema_payload_length = 0;
// NOTE(review): line 250 is missing; presumably `ARROW_THROW_NOT_OK(` as above
// (line 254 ends with `));`).
251  arrow::ipc::internal::WriteIpcPayload(schema_payload,
252  arrow::ipc::IpcOptions::Defaults(),
253  out_stream.get(),
254  &schema_payload_length));
255 
256  ARROW_THROW_NOT_OK(CollectDictionaries(*record_batch, &current_memo));
257 
// Wrap each dictionary-typed field's dictionary in a one-column record batch so
// it can be written to the same stream after the schema.
// NOTE(review): dummy_schema is built from the first dictionary's value type and
// reused for every dictionary batch; that is only consistent if all dictionaries
// share one value type (e.g. all string dictionaries) — confirm.
258  // now try a dictionary
259  std::shared_ptr<arrow::Schema> dummy_schema;
260  std::vector<std::shared_ptr<arrow::RecordBatch>> dict_batches;
261  for (int i = 0; i < record_batch->schema()->num_fields(); i++) {
262  auto field = record_batch->schema()->field(i);
263  if (field->type()->id() == arrow::Type::DICTIONARY) {
264  int64_t dict_id = -1;
265  ARROW_THROW_NOT_OK(current_memo.GetId(*field, &dict_id));
266  CHECK_GE(dict_id, 0);
267  std::shared_ptr<Array> dict;
268  ARROW_THROW_NOT_OK(current_memo.GetDictionary(dict_id, &dict));
269  CHECK(dict);
270 
271  if (!dummy_schema) {
272  auto dummy_field = std::make_shared<arrow::Field>("", dict->type());
273  dummy_schema = std::make_shared<arrow::Schema>(
274  std::vector<std::shared_ptr<arrow::Field>>{dummy_field});
275  }
276  dict_batches.emplace_back(
277  arrow::RecordBatch::Make(dummy_schema, dict->length(), {dict}));
278  }
279  }
280  ARROW_THROW_NOT_OK(arrow::ipc::WriteRecordBatchStream(
281  dict_batches, ipc::IpcOptions::Defaults(), out_stream.get()));
282 
283  auto complete_ipc_stream = out_stream->Finish();
284  ARROW_THROW_NOT_OK(complete_ipc_stream.status());
285  auto serialized_records = std::move(complete_ipc_stream).ValueOrDie();
286 
// Schema + dictionaries go to shared memory; only the key is returned.
287  const auto record_key = arrow::get_and_copy_to_shm(serialized_records);
288  std::vector<char> schema_record_key_buffer(sizeof(key_t), 0);
289  memcpy(&schema_record_key_buffer[0],
290  reinterpret_cast<const unsigned char*>(&record_key),
291  sizeof(key_t));
292 
293  arrow::cuda::CudaDeviceManager* manager;
294  ARROW_THROW_NOT_OK(arrow::cuda::CudaDeviceManager::GetInstance(&manager));
295  std::shared_ptr<arrow::cuda::CudaContext> context;
296  ARROW_THROW_NOT_OK(manager->GetContext(device_id_, &context));
297 
298  std::shared_ptr<arrow::cuda::CudaBuffer> device_serialized;
// NOTE(review): line 299 is missing; presumably `ARROW_THROW_NOT_OK(` (line 300
// ends with `));`).
300  SerializeRecordBatch(*record_batch, context.get(), &device_serialized));
301 
302  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> cuda_handle;
303  ARROW_THROW_NOT_OK(device_serialized->ExportForIpc(&cuda_handle));
304 
305  std::shared_ptr<arrow::Buffer> serialized_cuda_handle;
// NOTE(review): line 306 is missing; presumably `ARROW_THROW_NOT_OK(` (line 307
// ends with `));`).
307  cuda_handle->Serialize(arrow::default_memory_pool(), &serialized_cuda_handle));
308 
309  std::vector<char> record_handle_buffer(serialized_cuda_handle->size(), 0);
310  memcpy(&record_handle_buffer[0],
311  serialized_cuda_handle->data(),
312  serialized_cuda_handle->size());
313 
314  return {schema_record_key_buffer,
315  serialized_records->size(),
316  record_handle_buffer,
317  serialized_cuda_handle->size(),
318  serialized_cuda_handle->ToString()};
319 #else
320  UNREACHABLE();
321  return {std::vector<char>{}, 0, std::vector<char>{}, 0, ""};
322 #endif
323 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:37
#define UNREACHABLE()
Definition: Logger.h:241
#define CHECK_GE(x, y)
Definition: Logger.h:210
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
CHECK(cgen_state)
ExecutorDeviceType device_type_
key_t get_and_copy_to_shm(const std::shared_ptr< Buffer > &data)
std::shared_ptr< arrow::RecordBatch > convertToArrow() const

+ Here is the call graph for this function:

ArrowResultSetConverter::SerializedArrowOutput ArrowResultSetConverter::getSerializedArrowOutput ( arrow::ipc::DictionaryMemo *  memo) const
private

Definition at line 326 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK.

327  {
328  std::shared_ptr<arrow::RecordBatch> arrow_copy = convertToArrow();
329  std::shared_ptr<arrow::Buffer> serialized_records, serialized_schema;
330 
331  ARROW_THROW_NOT_OK(arrow::ipc::SerializeSchema(
332  *arrow_copy->schema(), memo, arrow::default_memory_pool(), &serialized_schema));
333 
334  ARROW_THROW_NOT_OK(CollectDictionaries(*arrow_copy, memo));
335 
336  if (arrow_copy->num_rows()) {
337  ARROW_THROW_NOT_OK(arrow_copy->Validate());
338  ARROW_THROW_NOT_OK(arrow::ipc::SerializeRecordBatch(
339  *arrow_copy, arrow::default_memory_pool(), &serialized_records));
340  } else {
341  ARROW_THROW_NOT_OK(arrow::AllocateBuffer(0, &serialized_records));
342  }
343  return {serialized_schema, serialized_records};
344 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:37
std::shared_ptr< arrow::RecordBatch > convertToArrow() const
void ArrowResultSetConverter::initializeColumnBuilder ( ColumnBuilder &  column_builder,
const SQLTypeInfo &  col_type,
const std::shared_ptr< arrow::Field > &  field 
) const
private

Definition at line 625 of file ArrowResultSetConverter.cpp.

References ARROW_THROW_NOT_OK, ArrowResultSetConverter::ColumnBuilder::builder, CHECK(), ArrowResultSetConverter::ColumnBuilder::col_type, field(), ArrowResultSetConverter::ColumnBuilder::field, SQLTypeInfo::get_comp_param(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_dict_index_type(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_physical_type(), SQLTypeInfo::is_dict_encoded_string(), and ArrowResultSetConverter::ColumnBuilder::physical_type.

628  {
629  column_builder.field = field;
630  column_builder.col_type = col_type;
631  column_builder.physical_type = col_type.is_dict_encoded_string()
632  ? get_dict_index_type(col_type)
633  : get_physical_type(col_type);
634 
635  auto value_type = field->type();
636  if (col_type.is_dict_encoded_string()) {
637  column_builder.builder.reset(new StringDictionary32Builder());
638  // add values to the builder
639  const int dict_id = col_type.get_comp_param();
640  auto str_list = results_->getStringDictionaryPayloadCopy(dict_id);
641 
642  arrow::StringBuilder str_array_builder;
643  ARROW_THROW_NOT_OK(str_array_builder.AppendValues(*str_list));
644  std::shared_ptr<StringArray> string_array;
645  ARROW_THROW_NOT_OK(str_array_builder.Finish(&string_array));
646 
647  auto dict_builder =
648  dynamic_cast<arrow::StringDictionary32Builder*>(column_builder.builder.get());
649  CHECK(dict_builder);
650 
651  dict_builder->InsertMemoValues(*string_array);
652  } else {
654  arrow::MakeBuilder(default_memory_pool(), value_type, &column_builder.builder));
655  }
656 }
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:37
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
CHECK(cgen_state)
std::shared_ptr< ResultSet > results_
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:257
bool is_dict_encoded_string() const
Definition: sqltypes.h:425

+ Here is the call graph for this function:

std::shared_ptr< arrow::Field > ArrowResultSetConverter::makeField ( const std::string  name,
const SQLTypeInfo &  target_type 
) const
private

Definition at line 580 of file ArrowResultSetConverter.cpp.

References field(), anonymous_namespace{ArrowResultSetConverter.cpp}::get_arrow_type(), and SQLTypeInfo::get_notnull().

582  {
583  return arrow::field(
584  name, get_arrow_type(target_type, device_type_), !target_type.get_notnull());
585 }
std::shared_ptr< arrow::DataType > get_arrow_type(const SQLTypeInfo &mapd_type, const ExecutorDeviceType device_type)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
ExecutorDeviceType device_type_
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:255

+ Here is the call graph for this function:

Friends And Related Function Documentation

friend class ArrowResultSet
friend

Definition at line 236 of file ArrowResultSet.h.

Member Data Documentation

std::vector<std::string> ArrowResultSetConverter::col_names_
private

Definition at line 233 of file ArrowResultSet.h.

std::shared_ptr<Data_Namespace::DataMgr> ArrowResultSetConverter::data_mgr_ = nullptr
private

Definition at line 230 of file ArrowResultSet.h.

int32_t ArrowResultSetConverter::device_id_ = 0
private

Definition at line 232 of file ArrowResultSet.h.

ExecutorDeviceType ArrowResultSetConverter::device_type_ = ExecutorDeviceType::GPU
private

Definition at line 231 of file ArrowResultSet.h.

std::shared_ptr<ResultSet> ArrowResultSetConverter::results_
private

Definition at line 229 of file ArrowResultSet.h.

int32_t ArrowResultSetConverter::top_n_
private

Definition at line 234 of file ArrowResultSet.h.


The documentation for this class was generated from the following files: