OmniSciDB  baf940c279
ArrowResultSet Class Reference

#include <ArrowResultSet.h>

Public Member Functions

 ArrowResultSet (const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta)
 
 ArrowResultSet (const std::shared_ptr< ResultSet > &rows)
 
ArrowResultSetRowIterator rowIterator (size_t from_index, bool translate_strings, bool decimal_to_double) const
 
ArrowResultSetRowIterator rowIterator (bool translate_strings, bool decimal_to_double) const
 
std::vector< TargetValue > getRowAt (const size_t index) const
 
std::vector< TargetValue > getNextRow (const bool translate_strings, const bool decimal_to_double) const
 
size_t colCount () const
 
SQLTypeInfo getColType (const size_t col_idx) const
 
bool definitelyHasNoRows () const
 
size_t rowCount () const
 

Static Public Member Functions

static void deallocateArrowResultBuffer (const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
 

Private Member Functions

void resultSetArrowLoopback ()
 
template<typename Type , typename ArrayType >
void appendValue (std::vector< TargetValue > &row, const arrow::Array &column, const Type null_val, const size_t idx) const
 

Private Attributes

std::shared_ptr< ResultSet > rows_
 
std::vector< TargetMetaInfo > targets_meta_
 
std::shared_ptr< arrow::RecordBatch > record_batch_
 
arrow::ipc::DictionaryMemo dictionary_memo_
 
std::vector< std::shared_ptr< arrow::Array > > columns_
 
size_t crt_row_idx_
 
std::vector< TargetMetaInfo > column_metainfo_
 

Detailed Description

Definition at line 97 of file ArrowResultSet.h.

Constructor & Destructor Documentation

◆ ArrowResultSet() [1/2]

ArrowResultSet::ArrowResultSet ( const std::shared_ptr< ResultSet > &  rows,
const std::vector< TargetMetaInfo > &  targets_meta 
)

Definition at line 74 of file ArrowResultSet.cpp.

References column_metainfo_, columns_, field(), record_batch_, resultSetArrowLoopback(), and anonymous_namespace{ArrowResultSet.cpp}::type_from_arrow_field().

76  : rows_(rows), targets_meta_(targets_meta), crt_row_idx_(0) {
77  resultSetArrowLoopback();
78  auto schema = record_batch_->schema();
79  for (int i = 0; i < schema->num_fields(); ++i) {
80  std::shared_ptr<arrow::Field> field = schema->field(i);
81  SQLTypeInfo type_info = type_from_arrow_field(*schema->field(i));
82  column_metainfo_.emplace_back(field->name(), type_info);
83  columns_.emplace_back(record_batch_->column(i));
84  }
85 }
std::shared_ptr< ResultSet > rows_
SQLTypeInfo type_from_arrow_field(const arrow::Field &field)
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::vector< TargetMetaInfo > column_metainfo_
void resultSetArrowLoopback()
std::shared_ptr< arrow::RecordBatch > record_batch_
std::vector< TargetMetaInfo > targets_meta_
std::vector< std::shared_ptr< arrow::Array > > columns_
+ Here is the call graph for this function:

◆ ArrowResultSet() [2/2]

ArrowResultSet::ArrowResultSet ( const std::shared_ptr< ResultSet > &  rows)
inline

Definition at line 101 of file ArrowResultSet.h.

101 : ArrowResultSet(rows, {}) {}
ArrowResultSet(const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta)

Member Function Documentation

◆ appendValue()

template<typename Type , typename ArrayType >
void ArrowResultSet::appendValue ( std::vector< TargetValue > &  row,
const arrow::Array &  column,
const Type  null_val,
const size_t  idx 
) const
private

Definition at line 88 of file ArrowResultSet.cpp.

91  {
92  const auto& col = static_cast<const ArrayType&>(column);
93  row.emplace_back(col.IsNull(idx) ? null_val : static_cast<Type>(col.Value(idx)));
94 }

◆ colCount()

size_t ArrowResultSet::colCount ( ) const

Definition at line 199 of file ArrowResultSet.cpp.

References column_metainfo_.

199  {
200  return column_metainfo_.size();
201 }
std::vector< TargetMetaInfo > column_metainfo_

◆ deallocateArrowResultBuffer()

void ArrowResultSet::deallocateArrowResultBuffer ( const ArrowResult &  result,
const ExecutorDeviceType  device_type,
const size_t  device_id,
std::shared_ptr< Data_Namespace::DataMgr > &  data_mgr 
)
static

Definition at line 948 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, CPU, ArrowResult::df_handle, ArrowResult::df_size, ArrowResult::sm_handle, ArrowResult::sm_size, and to_string().

Referenced by DBHandler::deallocate_df().

952  {
953 #ifndef _MSC_VER
954  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
955  // Remove shared memory on sysmem
956  if (!result.sm_handle.empty()) {
957  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
958  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
959  auto shm_id = shmget(schema_key, result.sm_size, 0666);
960  if (shm_id < 0) {
961  throw std::runtime_error(
962  "failed to get an valid shm ID w/ given shm key of the schema");
963  }
964  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
965  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
966  std::to_string(errno) + ")");
967  }
968  }
969 
970  if (device_type == ExecutorDeviceType::CPU) {
971  CHECK_EQ(sizeof(key_t), result.df_handle.size());
972  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
973  auto shm_id = shmget(df_key, result.df_size, 0666);
974  if (shm_id < 0) {
975  throw std::runtime_error(
976  "failed to get an valid shm ID w/ given shm key of the data");
977  }
978  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
979  throw std::runtime_error("failed to deallocate Arrow data frame");
980  }
981  }
982  // CUDA buffers become owned by the caller, and will automatically be freed
983  // TODO: What if the client never takes ownership of the result? we may want to
984  // establish a check to see if the GPU buffer still exists, and then free it.
985 #endif
986 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
std::vector< char > sm_handle
std::string to_string(char const *&&v)
std::vector< char > df_handle
int64_t sm_size
int64_t df_size
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ definitelyHasNoRows()

bool ArrowResultSet::definitelyHasNoRows ( ) const

Definition at line 208 of file ArrowResultSet.cpp.

References rowCount().

208  {
209  return !rowCount();
210 }
size_t rowCount() const
+ Here is the call graph for this function:

◆ getColType()

SQLTypeInfo ArrowResultSet::getColType ( const size_t  col_idx) const

Definition at line 203 of file ArrowResultSet.cpp.

References CHECK_LT, and column_metainfo_.

Referenced by getRowAt().

203  {
204  CHECK_LT(col_idx, column_metainfo_.size());
205  return column_metainfo_[col_idx].get_type_info();
206 }
#define CHECK_LT(x, y)
Definition: Logger.h:207
std::vector< TargetMetaInfo > column_metainfo_
+ Here is the caller graph for this function:

◆ getNextRow()

std::vector< TargetValue > ArrowResultSet::getNextRow ( const bool  translate_strings,
const bool  decimal_to_double 
) const

Definition at line 188 of file ArrowResultSet.cpp.

References CHECK_LT, crt_row_idx_, getRowAt(), and rowCount().

189  {
190  if (crt_row_idx_ == rowCount()) {
191  return {};
192  }
193  CHECK_LT(crt_row_idx_, rowCount());
194  auto row = getRowAt(crt_row_idx_);
195  ++crt_row_idx_;
196  return row;
197 }
std::vector< TargetValue > getRowAt(const size_t index) const
#define CHECK_LT(x, y)
Definition: Logger.h:207
size_t rowCount() const
+ Here is the call graph for this function:

◆ getRowAt()

std::vector< TargetValue > ArrowResultSet::getRowAt ( const size_t  index) const

Definition at line 96 of file ArrowResultSet.cpp.

References CHECK, CHECK_EQ, CHECK_LT, columns_, getColType(), inline_fp_null_value< double >(), inline_fp_null_value< float >(), inline_int_null_val(), kBIGINT, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, record_batch_, and rowCount().

Referenced by getNextRow(), and ArrowResultSetRowIterator::operator*().

96  {
97  if (index >= rowCount()) {
98  return {};
99  }
100 
101  CHECK_LT(index, rowCount());
102  std::vector<TargetValue> row;
103  for (int i = 0; i < record_batch_->num_columns(); ++i) {
104  const auto& column = *columns_[i];
105  const auto& column_typeinfo = getColType(i);
106  switch (column_typeinfo.get_type()) {
107  case kTINYINT: {
108  CHECK_EQ(arrow::Type::INT8, column.type_id());
109  appendValue<int64_t, arrow::Int8Array>(
110  row, column, inline_int_null_val(column_typeinfo), index);
111  break;
112  }
113  case kSMALLINT: {
114  CHECK_EQ(arrow::Type::INT16, column.type_id());
115  appendValue<int64_t, arrow::Int16Array>(
116  row, column, inline_int_null_val(column_typeinfo), index);
117  break;
118  }
119  case kINT: {
120  CHECK_EQ(arrow::Type::INT32, column.type_id());
121  appendValue<int64_t, arrow::Int32Array>(
122  row, column, inline_int_null_val(column_typeinfo), index);
123  break;
124  }
125  case kBIGINT: {
126  CHECK_EQ(arrow::Type::INT64, column.type_id());
127  appendValue<int64_t, arrow::Int64Array>(
128  row, column, inline_int_null_val(column_typeinfo), index);
129  break;
130  }
131  case kFLOAT: {
132  CHECK_EQ(arrow::Type::FLOAT, column.type_id());
133  appendValue<float, arrow::FloatArray>(
134  row, column, inline_fp_null_value<float>(), index);
135  break;
136  }
137  case kDOUBLE: {
138  CHECK_EQ(arrow::Type::DOUBLE, column.type_id());
139  appendValue<double, arrow::DoubleArray>(
140  row, column, inline_fp_null_value<double>(), index);
141  break;
142  }
143  case kTEXT: {
144  CHECK_EQ(kENCODING_DICT, column_typeinfo.get_compression());
145  CHECK_EQ(arrow::Type::DICTIONARY, column.type_id());
146  const auto& dict_column = static_cast<const arrow::DictionaryArray&>(column);
147  if (dict_column.IsNull(index)) {
148  row.emplace_back(NullableString(nullptr));
149  } else {
150  const auto& indices =
151  static_cast<const arrow::Int32Array&>(*dict_column.indices());
152  const auto& dictionary =
153  static_cast<const arrow::StringArray&>(*dict_column.dictionary());
154  row.emplace_back(dictionary.GetString(indices.Value(index)));
155  }
156  break;
157  }
158  case kTIMESTAMP: {
159  CHECK_EQ(arrow::Type::TIMESTAMP, column.type_id());
160  appendValue<int64_t, arrow::TimestampArray>(
161  row, column, inline_int_null_val(column_typeinfo), index);
162  break;
163  }
164  case kDATE: {
165  // TODO(wamsi): constexpr?
166  CHECK(arrow::Type::DATE32 == column.type_id() ||
167  arrow::Type::DATE64 == column.type_id());
168  column_typeinfo.is_date_in_days()
169  ? appendValue<int64_t, arrow::Date32Array>(
170  row, column, inline_int_null_val(column_typeinfo), index)
171  : appendValue<int64_t, arrow::Date64Array>(
172  row, column, inline_int_null_val(column_typeinfo), index);
173  break;
174  }
175  case kTIME: {
176  CHECK_EQ(arrow::Type::TIME32, column.type_id());
177  appendValue<int64_t, arrow::Time32Array>(
178  row, column, inline_int_null_val(column_typeinfo), index);
179  break;
180  }
181  default:
182  CHECK(false);
183  }
184  }
185  return row;
186 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
Definition: sqltypes.h:51
SQLTypeInfo getColType(const size_t col_idx) const
#define CHECK_LT(x, y)
Definition: Logger.h:207
Definition: sqltypes.h:54
Definition: sqltypes.h:55
constexpr float inline_fp_null_value< float >()
constexpr double inline_fp_null_value< double >()
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:155
#define CHECK(condition)
Definition: Logger.h:197
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
Definition: sqltypes.h:47
std::shared_ptr< arrow::RecordBatch > record_batch_
size_t rowCount() const
std::vector< std::shared_ptr< arrow::Array > > columns_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ resultSetArrowLoopback()

void ArrowResultSet::resultSetArrowLoopback ( )
private

Definition at line 216 of file ArrowResultSet.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, CHECK, CHECK_EQ, dictionary_memo_, record_batch_, rows_, targets_meta_, and to_string().

Referenced by ArrowResultSet().

216  {
217  std::vector<std::string> col_names;
218 
219  if (!targets_meta_.empty()) {
220  for (auto& meta : targets_meta_) {
221  col_names.push_back(meta.get_resname());
222  }
223  } else {
224  for (unsigned int i = 0; i < rows_->colCount(); i++) {
225  col_names.push_back("col_" + std::to_string(i));
226  }
227  }
228  const auto converter = ArrowResultSetConverter(rows_, col_names, -1);
229 
230  arrow::ipc::DictionaryMemo schema_memo;
231  const auto serialized_arrow_output = converter.getSerializedArrowOutput(&schema_memo);
232 
233  arrow::io::BufferReader schema_reader(serialized_arrow_output.schema);
234 
235  ARROW_ASSIGN_OR_THROW(auto schema,
236  arrow::ipc::ReadSchema(&schema_reader, &dictionary_memo_));
237  CHECK_EQ(schema_memo.num_fields(), dictionary_memo_.num_fields());
238 
239  // add the dictionaries from the serialized output to the newly created memo
240  const auto& serialized_id_to_dict = schema_memo.dictionaries();
241  for (const auto& itr : serialized_id_to_dict) {
242  const auto& id = itr.first;
243  const auto& dict = itr.second;
244  CHECK(!dictionary_memo_.HasDictionary(id));
245  ARROW_THROW_NOT_OK(dictionary_memo_.AddDictionary(id, dict));
246  }
247 
248  arrow::io::BufferReader records_reader(serialized_arrow_output.records);
249 
250  ARROW_ASSIGN_OR_THROW(
251  record_batch_,
252  arrow::ipc::ReadRecordBatch(schema,
253  &dictionary_memo_,
254  arrow::ipc::IpcReadOptions::Defaults(),
255  &records_reader));
256 
257  CHECK_EQ(schema->num_fields(), record_batch_->num_columns());
258 }
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
std::shared_ptr< ResultSet > rows_
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
arrow::ipc::DictionaryMemo dictionary_memo_
std::string to_string(char const *&&v)
#define CHECK(condition)
Definition: Logger.h:197
std::shared_ptr< arrow::RecordBatch > record_batch_
std::vector< TargetMetaInfo > targets_meta_
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ rowCount()

size_t ArrowResultSet::rowCount ( ) const

Definition at line 212 of file ArrowResultSet.cpp.

References record_batch_.

Referenced by definitelyHasNoRows(), getNextRow(), and getRowAt().

212  {
213  return record_batch_->num_rows();
214 }
std::shared_ptr< arrow::RecordBatch > record_batch_
+ Here is the caller graph for this function:

◆ rowIterator() [1/2]

ArrowResultSetRowIterator ArrowResultSet::rowIterator ( size_t  from_index,
bool  translate_strings,
bool  decimal_to_double 
) const
inline

Definition at line 103 of file ArrowResultSet.h.

105  {
106  ArrowResultSetRowIterator iter(this);
107  for (size_t i = 0; i < from_index; i++) {
108  ++iter;
109  }
110 
111  return iter;
112  }

◆ rowIterator() [2/2]

ArrowResultSetRowIterator ArrowResultSet::rowIterator ( bool  translate_strings,
bool  decimal_to_double 
) const
inline

Definition at line 114 of file ArrowResultSet.h.

References anonymous_namespace{TypedDataAccessors.h}::decimal_to_double(), and run_benchmark_import::result.

115  {
116  return rowIterator(0, translate_strings, decimal_to_double);
117  }
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
ArrowResultSetRowIterator rowIterator(size_t from_index, bool translate_strings, bool decimal_to_double) const
+ Here is the call graph for this function:

Member Data Documentation

◆ column_metainfo_

std::vector<TargetMetaInfo> ArrowResultSet::column_metainfo_
private

Definition at line 155 of file ArrowResultSet.h.

Referenced by ArrowResultSet(), colCount(), and getColType().

◆ columns_

std::vector<std::shared_ptr<arrow::Array> > ArrowResultSet::columns_
private

Definition at line 153 of file ArrowResultSet.h.

Referenced by ArrowResultSet(), and getRowAt().

◆ crt_row_idx_

size_t ArrowResultSet::crt_row_idx_
mutable private

Definition at line 154 of file ArrowResultSet.h.

Referenced by getNextRow().

◆ dictionary_memo_

arrow::ipc::DictionaryMemo ArrowResultSet::dictionary_memo_
private

Definition at line 149 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().

◆ record_batch_

std::shared_ptr<arrow::RecordBatch> ArrowResultSet::record_batch_
private

Definition at line 148 of file ArrowResultSet.h.

Referenced by ArrowResultSet(), getRowAt(), resultSetArrowLoopback(), and rowCount().

◆ rows_

std::shared_ptr<ResultSet> ArrowResultSet::rows_
private

Definition at line 146 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().

◆ targets_meta_

std::vector<TargetMetaInfo> ArrowResultSet::targets_meta_
private

Definition at line 147 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().


The documentation for this class was generated from the following files: