OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowResultSet Class Reference

#include <ArrowResultSet.h>

Public Member Functions

 ArrowResultSet (const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta, const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
 
 ArrowResultSet (const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta, const ExecutorDeviceType device_type, const size_t min_result_size_for_bulk_dictionary_fetch, const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
 
 ArrowResultSet (const std::shared_ptr< ResultSet > &rows, const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
 
std::vector< TargetValue > getRowAt (const size_t index) const
 
std::vector< TargetValue > getNextRow (const bool translate_strings, const bool decimal_to_double) const
 
size_t colCount () const
 
SQLTypeInfo getColType (const size_t col_idx) const
 
bool definitelyHasNoRows () const
 
size_t rowCount () const
 
size_t entryCount () const
 
bool isEmpty () const
 

Static Public Member Functions

static void deallocateArrowResultBuffer (const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
 

Public Attributes

 device_type
 

Private Member Functions

void resultSetArrowLoopback (const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
 
void resultSetArrowLoopback (const ExecutorDeviceType device_type, const size_t min_result_size_for_bulk_dictionary_fetch, const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
 
template<typename Type , typename ArrayType >
void appendValue (std::vector< TargetValue > &row, const arrow::Array &column, const Type null_val, const size_t idx) const
 

Private Attributes

std::shared_ptr< ArrowResult > results_
 
std::shared_ptr< ResultSet > rows_
 
std::vector< TargetMetaInfo > targets_meta_
 
std::shared_ptr< arrow::RecordBatch > record_batch_
 
arrow::ipc::DictionaryMemo dictionary_memo_
 
std::vector< std::shared_ptr< arrow::Array > > columns_
 
size_t crt_row_idx_
 
std::vector< TargetMetaInfo > column_metainfo_
 

Detailed Description

Definition at line 108 of file ArrowResultSet.h.

Constructor & Destructor Documentation

ArrowResultSet::ArrowResultSet ( const std::shared_ptr< ResultSet > &  rows,
const std::vector< TargetMetaInfo > &  targets_meta,
const ExecutorDeviceType  device_type = ExecutorDeviceType::CPU 
)

Definition at line 74 of file ArrowResultSet.cpp.

References column_metainfo_, columns_, field(), record_batch_, resultSetArrowLoopback(), and anonymous_namespace{ArrowResultSet.cpp}::type_from_arrow_field().

77  : rows_(rows), targets_meta_(targets_meta), crt_row_idx_(0) {
79  auto schema = record_batch_->schema();
80  for (int i = 0; i < schema->num_fields(); ++i) {
81  std::shared_ptr<arrow::Field> field = schema->field(i);
82  SQLTypeInfo type_info = type_from_arrow_field(*schema->field(i));
83  column_metainfo_.emplace_back(field->name(), type_info);
84  columns_.emplace_back(record_batch_->column(i));
85  }
86 }
std::shared_ptr< ResultSet > rows_
SQLTypeInfo type_from_arrow_field(const arrow::Field &field)
tuple rows
Definition: report.py:114
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
std::vector< TargetMetaInfo > column_metainfo_
void resultSetArrowLoopback(const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
std::shared_ptr< arrow::RecordBatch > record_batch_
std::vector< TargetMetaInfo > targets_meta_
std::vector< std::shared_ptr< arrow::Array > > columns_

+ Here is the call graph for this function:

ArrowResultSet::ArrowResultSet ( const std::shared_ptr< ResultSet > &  rows,
const std::vector< TargetMetaInfo > &  targets_meta,
const ExecutorDeviceType  device_type,
const size_t  min_result_size_for_bulk_dictionary_fetch,
const double  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch 
)

Definition at line 88 of file ArrowResultSet.cpp.

References column_metainfo_, columns_, field(), record_batch_, resultSetArrowLoopback(), and anonymous_namespace{ArrowResultSet.cpp}::type_from_arrow_field().

94  : rows_(rows), targets_meta_(targets_meta), crt_row_idx_(0) {
96  min_result_size_for_bulk_dictionary_fetch,
97  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
98  auto schema = record_batch_->schema();
99  for (int i = 0; i < schema->num_fields(); ++i) {
100  std::shared_ptr<arrow::Field> field = schema->field(i);
101  SQLTypeInfo type_info = type_from_arrow_field(*schema->field(i));
102  column_metainfo_.emplace_back(field->name(), type_info);
103  columns_.emplace_back(record_batch_->column(i));
104  }
105 }
std::shared_ptr< ResultSet > rows_
SQLTypeInfo type_from_arrow_field(const arrow::Field &field)
tuple rows
Definition: report.py:114
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
std::vector< TargetMetaInfo > column_metainfo_
void resultSetArrowLoopback(const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
std::shared_ptr< arrow::RecordBatch > record_batch_
std::vector< TargetMetaInfo > targets_meta_
std::vector< std::shared_ptr< arrow::Array > > columns_

+ Here is the call graph for this function:

ArrowResultSet::ArrowResultSet ( const std::shared_ptr< ResultSet > &  rows,
const ExecutorDeviceType  device_type = ExecutorDeviceType::CPU 
)
inline

Definition at line 121 of file ArrowResultSet.h.

123  : ArrowResultSet(rows, {}, device_type) {}
ArrowResultSet(const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta, const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
tuple rows
Definition: report.py:114

Member Function Documentation

template<typename Type , typename ArrayType >
void ArrowResultSet::appendValue ( std::vector< TargetValue > &  row,
const arrow::Array &  column,
const Type  null_val,
const size_t  idx 
) const
private

Definition at line 108 of file ArrowResultSet.cpp.

111  {
112  const auto& col = static_cast<const ArrayType&>(column);
113  row.emplace_back(col.IsNull(idx) ? null_val : static_cast<Type>(col.Value(idx)));
114 }
size_t ArrowResultSet::colCount ( ) const

Definition at line 245 of file ArrowResultSet.cpp.

References column_metainfo_.

245  {
246  return column_metainfo_.size();
247 }
std::vector< TargetMetaInfo > column_metainfo_
void ArrowResultSet::deallocateArrowResultBuffer ( const ArrowResult &  result,
const ExecutorDeviceType  device_type,
const size_t  device_id,
std::shared_ptr< Data_Namespace::DataMgr > &  data_mgr 
)
static

Definition at line 1212 of file ArrowResultSetConverter.cpp.

References CHECK_EQ, CPU, ArrowResult::df_handle, ArrowResult::df_size, ArrowResult::sm_handle, ArrowResult::sm_size, and to_string().

Referenced by DBHandler::deallocate_df().

1216  {
1217 #ifndef _MSC_VER
1218  // CPU buffers skip the sm handle, serializing the entire RecordBatch to df.
1219  // Remove shared memory on sysmem
1220  if (!result.sm_handle.empty()) {
1221  CHECK_EQ(sizeof(key_t), result.sm_handle.size());
1222  const key_t& schema_key = *(key_t*)(&result.sm_handle[0]);
1223  auto shm_id = shmget(schema_key, result.sm_size, 0666);
1224  if (shm_id < 0) {
1225  throw std::runtime_error(
1226  "failed to get an valid shm ID w/ given shm key of the schema");
1227  }
1228  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1229  throw std::runtime_error("failed to deallocate Arrow schema on errorno(" +
1230  std::to_string(errno) + ")");
1231  }
1232  }
1233 
1235  CHECK_EQ(sizeof(key_t), result.df_handle.size());
1236  const key_t& df_key = *(key_t*)(&result.df_handle[0]);
1237  auto shm_id = shmget(df_key, result.df_size, 0666);
1238  if (shm_id < 0) {
1239  throw std::runtime_error(
1240  "failed to get an valid shm ID w/ given shm key of the data");
1241  }
1242  if (-1 == shmctl(shm_id, IPC_RMID, 0)) {
1243  throw std::runtime_error("failed to deallocate Arrow data frame");
1244  }
1245  }
1246  // CUDA buffers become owned by the caller, and will automatically be freed
1247  // TODO: What if the client never takes ownership of the result? we may want to
1248  // establish a check to see if the GPU buffer still exists, and then free it.
1249 #endif
1250 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< char > sm_handle
std::string to_string(char const *&&v)
std::vector< char > df_handle
int64_t sm_size
int64_t df_size

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ArrowResultSet::definitelyHasNoRows ( ) const

Definition at line 254 of file ArrowResultSet.cpp.

References rowCount().

254  {
255  return !rowCount();
256 }
size_t rowCount() const

+ Here is the call graph for this function:

size_t ArrowResultSet::entryCount ( ) const

Definition at line 264 of file ArrowResultSet.cpp.

References rowCount().

264  {
265  return rowCount();
266 }
size_t rowCount() const

+ Here is the call graph for this function:

SQLTypeInfo ArrowResultSet::getColType ( const size_t  col_idx) const

Definition at line 249 of file ArrowResultSet.cpp.

References CHECK_LT, and column_metainfo_.

Referenced by getRowAt().

249  {
250  CHECK_LT(col_idx, column_metainfo_.size());
251  return column_metainfo_[col_idx].get_type_info();
252 }
#define CHECK_LT(x, y)
Definition: Logger.h:303
std::vector< TargetMetaInfo > column_metainfo_

+ Here is the caller graph for this function:

std::vector< TargetValue > ArrowResultSet::getNextRow ( const bool  translate_strings,
const bool  decimal_to_double 
) const

Definition at line 234 of file ArrowResultSet.cpp.

References CHECK_LT, crt_row_idx_, getRowAt(), and rowCount().

235  {
236  if (crt_row_idx_ == rowCount()) {
237  return {};
238  }
240  auto row = getRowAt(crt_row_idx_);
241  ++crt_row_idx_;
242  return row;
243 }
size_t rowCount() const
#define CHECK_LT(x, y)
Definition: Logger.h:303
std::vector< TargetValue > getRowAt(const size_t index) const

+ Here is the call graph for this function:

std::vector< TargetValue > ArrowResultSet::getRowAt ( const size_t  index) const

Definition at line 142 of file ArrowResultSet.cpp.

References CHECK, CHECK_EQ, CHECK_LT, columns_, getColType(), inline_fp_null_value< double >(), inline_fp_null_value< float >(), inline_int_null_val(), kBIGINT, kDATE, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, record_batch_, and rowCount().

Referenced by getNextRow(), and ArrowResultSetRowIterator::operator*().

142  {
143  if (index >= rowCount()) {
144  return {};
145  }
146 
147  CHECK_LT(index, rowCount());
148  std::vector<TargetValue> row;
149  for (int i = 0; i < record_batch_->num_columns(); ++i) {
150  const auto& column = *columns_[i];
151  const auto& column_typeinfo = getColType(i);
152  switch (column_typeinfo.get_type()) {
153  case kTINYINT: {
154  CHECK_EQ(arrow::Type::INT8, column.type_id());
155  appendValue<int64_t, arrow::Int8Array>(
156  row, column, inline_int_null_val(column_typeinfo), index);
157  break;
158  }
159  case kSMALLINT: {
160  CHECK_EQ(arrow::Type::INT16, column.type_id());
161  appendValue<int64_t, arrow::Int16Array>(
162  row, column, inline_int_null_val(column_typeinfo), index);
163  break;
164  }
165  case kINT: {
166  CHECK_EQ(arrow::Type::INT32, column.type_id());
167  appendValue<int64_t, arrow::Int32Array>(
168  row, column, inline_int_null_val(column_typeinfo), index);
169  break;
170  }
171  case kBIGINT: {
172  CHECK_EQ(arrow::Type::INT64, column.type_id());
173  appendValue<int64_t, arrow::Int64Array>(
174  row, column, inline_int_null_val(column_typeinfo), index);
175  break;
176  }
177  case kFLOAT: {
178  CHECK_EQ(arrow::Type::FLOAT, column.type_id());
179  appendValue<float, arrow::FloatArray>(
180  row, column, inline_fp_null_value<float>(), index);
181  break;
182  }
183  case kDOUBLE: {
184  CHECK_EQ(arrow::Type::DOUBLE, column.type_id());
185  appendValue<double, arrow::DoubleArray>(
186  row, column, inline_fp_null_value<double>(), index);
187  break;
188  }
189  case kTEXT: {
190  CHECK_EQ(kENCODING_DICT, column_typeinfo.get_compression());
191  CHECK_EQ(arrow::Type::DICTIONARY, column.type_id());
192  const auto& dict_column = static_cast<const arrow::DictionaryArray&>(column);
193  if (dict_column.IsNull(index)) {
194  row.emplace_back(NullableString(nullptr));
195  } else {
196  const auto& indices =
197  static_cast<const arrow::Int32Array&>(*dict_column.indices());
198  const auto& dictionary =
199  static_cast<const arrow::StringArray&>(*dict_column.dictionary());
200  row.emplace_back(dictionary.GetString(indices.Value(index)));
201  }
202  break;
203  }
204  case kTIMESTAMP: {
205  CHECK_EQ(arrow::Type::TIMESTAMP, column.type_id());
206  appendValue<int64_t, arrow::TimestampArray>(
207  row, column, inline_int_null_val(column_typeinfo), index);
208  break;
209  }
210  case kDATE: {
211  // TODO(wamsi): constexpr?
212  CHECK(arrow::Type::DATE32 == column.type_id() ||
213  arrow::Type::DATE64 == column.type_id());
214  column_typeinfo.is_date_in_days()
215  ? appendValue<int64_t, arrow::Date32Array>(
216  row, column, inline_int_null_val(column_typeinfo), index)
217  : appendValue<int64_t, arrow::Date64Array>(
218  row, column, inline_int_null_val(column_typeinfo), index);
219  break;
220  }
221  case kTIME: {
222  CHECK_EQ(arrow::Type::TIME32, column.type_id());
223  appendValue<int64_t, arrow::Time32Array>(
224  row, column, inline_int_null_val(column_typeinfo), index);
225  break;
226  }
227  default:
228  CHECK(false);
229  }
230  }
231  return row;
232 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
Definition: sqltypes.h:76
size_t rowCount() const
SQLTypeInfo getColType(const size_t col_idx) const
#define CHECK_LT(x, y)
Definition: Logger.h:303
Definition: sqltypes.h:79
Definition: sqltypes.h:80
constexpr float inline_fp_null_value< float >()
constexpr double inline_fp_null_value< double >()
boost::variant< std::string, void * > NullableString
Definition: TargetValue.h:179
#define CHECK(condition)
Definition: Logger.h:291
int64_t inline_int_null_val(const SQL_TYPE_INFO &ti)
Definition: sqltypes.h:72
std::shared_ptr< arrow::RecordBatch > record_batch_
std::vector< std::shared_ptr< arrow::Array > > columns_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool ArrowResultSet::isEmpty ( ) const

Definition at line 270 of file ArrowResultSet.cpp.

References rowCount().

270  {
271  return rowCount() == static_cast<size_t>(0);
272 }
size_t rowCount() const

+ Here is the call graph for this function:

void ArrowResultSet::resultSetArrowLoopback ( const ExecutorDeviceType  device_type = ExecutorDeviceType::CPU)
private

Definition at line 274 of file ArrowResultSet.cpp.

References ArrowResultSetConverter::default_min_result_size_for_bulk_dictionary_fetch.

Referenced by ArrowResultSet().

274  {
276  device_type,
279  default_max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
280 }
static constexpr size_t default_min_result_size_for_bulk_dictionary_fetch
void resultSetArrowLoopback(const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)

+ Here is the caller graph for this function:

void ArrowResultSet::resultSetArrowLoopback ( const ExecutorDeviceType  device_type,
const size_t  min_result_size_for_bulk_dictionary_fetch,
const double  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch 
)
private

Definition at line 282 of file ArrowResultSet.cpp.

References ARROW_ASSIGN_OR_THROW, ARROW_THROW_NOT_OK, CHECK_EQ, device_type, dictionary_memo_, record_batch_, results_, rows_, targets_meta_, to_string(), and WIRE.

285  {
286  std::vector<std::string> col_names;
287 
288  if (!targets_meta_.empty()) {
289  for (auto& meta : targets_meta_) {
290  col_names.push_back(meta.get_resname());
291  }
292  } else {
293  for (unsigned int i = 0; i < rows_->colCount(); i++) {
294  col_names.push_back("col_" + std::to_string(i));
295  }
296  }
297 
298  // We convert the given rows to arrow, which gets serialized
299  // into a buffer by Arrow Wire.
300  auto converter = ArrowResultSetConverter(
301  rows_,
302  col_names,
303  -1,
304  min_result_size_for_bulk_dictionary_fetch,
305  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
306  converter.transport_method_ = ArrowTransport::WIRE;
307  converter.device_type_ = device_type;
308 
309  // Lifetime of the result buffer is that of ArrowResultSet
310  results_ = std::make_shared<ArrowResult>(converter.getArrowResult());
311 
312  // Create a reader for reading back serialized
313  arrow::io::BufferReader reader(
314  reinterpret_cast<const uint8_t*>(results_->df_buffer.data()), results_->df_size);
315 
316  ARROW_ASSIGN_OR_THROW(auto batch_reader,
317  arrow::ipc::RecordBatchStreamReader::Open(&reader));
318 
319  ARROW_THROW_NOT_OK(batch_reader->ReadNext(&record_batch_));
320 
321  // Collect dictionaries from the record batch into the dictionary memo.
323  arrow::ipc::internal::CollectDictionaries(*record_batch_, &dictionary_memo_));
324 
325  CHECK_EQ(record_batch_->schema()->num_fields(), record_batch_->num_columns());
326 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
#define ARROW_THROW_NOT_OK(s)
Definition: ArrowUtil.h:36
std::shared_ptr< ArrowResult > results_
std::shared_ptr< ResultSet > rows_
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr)
Definition: ArrowUtil.h:60
arrow::ipc::DictionaryMemo dictionary_memo_
std::string to_string(char const *&&v)
std::shared_ptr< arrow::RecordBatch > record_batch_
std::vector< TargetMetaInfo > targets_meta_

+ Here is the call graph for this function:

size_t ArrowResultSet::rowCount ( ) const

Definition at line 258 of file ArrowResultSet.cpp.

References record_batch_.

Referenced by definitelyHasNoRows(), entryCount(), getNextRow(), getRowAt(), and isEmpty().

258  {
259  return record_batch_->num_rows();
260 }
std::shared_ptr< arrow::RecordBatch > record_batch_

+ Here is the caller graph for this function:

Member Data Documentation

std::vector<TargetMetaInfo> ArrowResultSet::column_metainfo_
private

Definition at line 190 of file ArrowResultSet.h.

Referenced by ArrowResultSet(), colCount(), and getColType().

std::vector<std::shared_ptr<arrow::Array> > ArrowResultSet::columns_
private

Definition at line 188 of file ArrowResultSet.h.

Referenced by ArrowResultSet(), and getRowAt().

size_t ArrowResultSet::crt_row_idx_
mutable private

Definition at line 189 of file ArrowResultSet.h.

Referenced by getNextRow().

ArrowResultSet::device_type
Initial value:
{}
ArrowResultSetRowIterator rowIterator(size_t from_index,
bool translate_strings,
bool decimal_to_double) const {
for (size_t i = 0; i < from_index; i++) {
++iter;
}
return iter;
}
ArrowResultSetRowIterator rowIterator(bool translate_strings,
bool decimal_to_double) const {
return rowIterator(0, translate_strings, decimal_to_double);
}
std::vector<std::string> getDictionaryStrings(const size_t col_idx) const

Definition at line 123 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().

arrow::ipc::DictionaryMemo ArrowResultSet::dictionary_memo_
private

Definition at line 184 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().

std::shared_ptr<arrow::RecordBatch> ArrowResultSet::record_batch_
private

Definition at line 183 of file ArrowResultSet.h.

Referenced by ArrowResultSet(), getRowAt(), resultSetArrowLoopback(), and rowCount().

std::shared_ptr<ArrowResult> ArrowResultSet::results_
private

Definition at line 180 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().

std::shared_ptr<ResultSet> ArrowResultSet::rows_
private

Definition at line 181 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().

std::vector<TargetMetaInfo> ArrowResultSet::targets_meta_
private

Definition at line 182 of file ArrowResultSet.h.

Referenced by resultSetArrowLoopback().


The documentation for this class was generated from the following files: