OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowResultSet.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "CompilationOptions.h"
20 #include "DataMgr/DataMgr.h"
22 #include "ResultSet.h"
23 #include "TargetMetaInfo.h"
24 #include "TargetValue.h"
25 
26 #include <type_traits>
27 
28 #include "arrow/api.h"
29 #include "arrow/ipc/api.h"
30 
31 // TODO(wamsi): ValueArray is not optimal. Remove it and inherit from base vector class.
32 using ValueArray = boost::variant<std::vector<bool>,
33  std::vector<int8_t>,
34  std::vector<int16_t>,
35  std::vector<int32_t>,
36  std::vector<int64_t>,
37  std::vector<float>,
38  std::vector<double>,
39  std::vector<std::string>>;
40 
41 class ArrowResultSet;
42 
44  public:
45  using value_type = std::vector<TargetValue>;
46  using difference_type = std::ptrdiff_t;
47  using pointer = std::vector<TargetValue>*;
48  using reference = std::vector<TargetValue>&;
49  using iterator_category = std::input_iterator_tag;
50 
51  bool operator==(const ArrowResultSetRowIterator& other) const {
52  return result_set_ == other.result_set_ && crt_row_idx_ == other.crt_row_idx_;
53  }
54  bool operator!=(const ArrowResultSetRowIterator& other) const {
55  return !(*this == other);
56  }
57 
58  inline value_type operator*() const;
60  crt_row_idx_++;
61  return *this;
62  }
64  ArrowResultSetRowIterator iter(*this);
65  ++(*this);
66  return iter;
67  }
68 
69  private:
71  size_t crt_row_idx_;
72 
75 
76  friend class ArrowResultSet;
77 };
78 
79 struct ArrowResult {
80  std::vector<char> sm_handle;
81  int64_t sm_size;
82  std::vector<char> df_handle;
83  int64_t df_size;
84  int8_t* df_dev_ptr; // Only for device memory deallocation
85 };
86 
87 // Expose Arrow buffers as a subset of the ResultSet interface
88 // to make it work within the existing execution test framework.
90  public:
91  ArrowResultSet(const std::shared_ptr<ResultSet>& rows,
92  const std::vector<TargetMetaInfo>& targets_meta);
93  ArrowResultSet(const std::shared_ptr<ResultSet>& rows) : ArrowResultSet(rows, {}) {}
94 
95  ArrowResultSetRowIterator rowIterator(size_t from_index,
96  bool translate_strings,
97  bool decimal_to_double) const {
98  ArrowResultSetRowIterator iter(this);
99  for (size_t i = 0; i < from_index; i++) {
100  ++iter;
101  }
102 
103  return iter;
104  }
105 
106  ArrowResultSetRowIterator rowIterator(bool translate_strings,
107  bool decimal_to_double) const {
108  return rowIterator(0, translate_strings, decimal_to_double);
109  }
110 
111  std::vector<TargetValue> getRowAt(const size_t index) const;
112 
113  std::vector<TargetValue> getNextRow(const bool translate_strings,
114  const bool decimal_to_double) const;
115 
116  size_t colCount() const;
117 
118  SQLTypeInfo getColType(const size_t col_idx) const;
119 
120  bool definitelyHasNoRows() const;
121 
122  size_t rowCount() const;
123 
124  static void deallocateArrowResultBuffer(
125  const ArrowResult& result,
126  const ExecutorDeviceType device_type,
127  const size_t device_id,
128  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr);
129 
130  private:
131  void resultSetArrowLoopback();
132  template <typename Type, typename ArrayType>
133  void appendValue(std::vector<TargetValue>& row,
134  const arrow::Array& column,
135  const Type null_val,
136  const size_t idx) const;
137 
138  std::shared_ptr<ResultSet> rows_;
139  std::vector<TargetMetaInfo> targets_meta_;
140  std::shared_ptr<arrow::RecordBatch> record_batch_;
141 
142  // Boxed arrays from the record batch. The result of RecordBatch::column is
143  // temporary, so we cache these for better performance
144  std::vector<std::shared_ptr<arrow::Array>> columns_;
145  mutable size_t crt_row_idx_;
146  std::vector<TargetMetaInfo> column_metainfo_;
147 };
148 
150  return result_set_->getRowAt(crt_row_idx_);
151 }
152 
153 class ExecutionResult;
154 
155 // Takes results from the executor, serializes them to Arrow and then deserializes
156 // them to ArrowResultSet, which can then be used by the existing test framework.
157 std::unique_ptr<ArrowResultSet> result_set_arrow_loopback(const ExecutionResult& results);
158 
159 // Takes results from the executor, serializes them to Arrow and then
160 // deserializes them to ArrowResultSet, which can then be used by the existing test
161 // framework.
162 std::unique_ptr<ArrowResultSet> result_set_arrow_loopback(
163  const ExecutionResult* results,
164  const std::shared_ptr<ResultSet>& rows);
165 
167  public:
168  ArrowResultSetConverter(const std::shared_ptr<ResultSet>& results,
169  const std::shared_ptr<Data_Namespace::DataMgr> data_mgr,
170  const ExecutorDeviceType device_type,
171  const int32_t device_id,
172  const std::vector<std::string>& col_names,
173  const int32_t first_n)
174  : results_(results)
175  , data_mgr_(data_mgr)
176  , device_type_(device_type)
177  , device_id_(device_id)
178  , col_names_(col_names)
179  , top_n_(first_n) {}
180 
182 
183  private:
184  ArrowResultSetConverter(const std::shared_ptr<ResultSet>& results,
185  const std::vector<std::string>& col_names,
186  const int32_t first_n)
187  : results_(results), col_names_(col_names), top_n_(first_n) {}
188  std::shared_ptr<arrow::RecordBatch> convertToArrow(
189  arrow::ipc::DictionaryMemo& memo) const;
190  std::shared_ptr<arrow::RecordBatch> getArrowBatch(
191  const std::shared_ptr<arrow::Schema>& schema) const;
193  std::shared_ptr<arrow::Field> makeField(
194  const std::string name,
195  const SQLTypeInfo& target_type,
196  const std::shared_ptr<arrow::Array>& dictionary) const;
197  std::shared_ptr<arrow::DataType> getArrowType(
198  const SQLTypeInfo& mapd_type,
199  const std::shared_ptr<arrow::Array>& dict_values) const;
200 
202  std::shared_ptr<arrow::Buffer> schema;
203  std::shared_ptr<arrow::Buffer> records;
204  };
206 
207  struct ColumnBuilder {
208  std::shared_ptr<arrow::Field> field;
209  std::unique_ptr<arrow::ArrayBuilder> builder;
212  };
213  void initializeColumnBuilder(ColumnBuilder& column_builder,
214  const SQLTypeInfo& col_type,
215  const std::shared_ptr<arrow::Field>& field) const;
216  inline void reserveColumnBuilderSize(ColumnBuilder& column_builder,
217  const size_t row_count) const;
218  void append(ColumnBuilder& column_builder,
219  const ValueArray& values,
220  const std::shared_ptr<std::vector<bool>>& is_valid) const;
221 
222  template <typename BuilderType, typename C_TYPE>
223  inline void appendToColumnBuilder(
224  ColumnBuilder& column_builder,
225  const ValueArray& values,
226  const std::shared_ptr<std::vector<bool>>& is_valid) const;
227 
228  inline std::shared_ptr<arrow::Array> finishColumnBuilder(
229  ColumnBuilder& column_builder) const;
230 
231  std::shared_ptr<ResultSet> results_;
232  std::shared_ptr<Data_Namespace::DataMgr> data_mgr_ = nullptr;
234  int32_t device_id_ = 0;
235  std::vector<std::string> col_names_;
236  int32_t top_n_;
237 
238  friend class ArrowResultSet;
239 };
240 
241 template <typename T>
242 constexpr auto scale_epoch_values() {
243  return std::is_same<T, arrow::Date32Builder>::value ||
244  std::is_same<T, arrow::Date64Builder>::value;
245 }
std::unique_ptr< arrow::ArrayBuilder > builder
ArrowResultSetRowIterator & operator++(void)
SQLTypes
Definition: sqltypes.h:41
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
ArrowResult getArrowResult() const
std::vector< char > sm_handle
ExecutorDeviceType
std::shared_ptr< ResultSet > rows_
size_t rowCount() const
SQLTypeInfo getColType(const size_t col_idx) const
ArrowResultSet(const std::shared_ptr< ResultSet > &rows)
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type, const std::shared_ptr< arrow::Array > &dictionary) const
std::shared_ptr< arrow::Field > field
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< float >, std::vector< double >, std::vector< std::string >> ValueArray
void reserveColumnBuilderSize(ColumnBuilder &column_builder, const size_t row_count) const
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
std::vector< std::string > col_names_
std::shared_ptr< arrow::Buffer > records
std::vector< TargetValue > & reference
int8_t * df_dev_ptr
bool operator==(const ArrowResultSetRowIterator &other) const
ExecutorDeviceType device_type_
std::shared_ptr< arrow::DataType > getArrowType(const SQLTypeInfo &mapd_type, const std::shared_ptr< arrow::Array > &dict_values) const
std::vector< char > df_handle
std::vector< TargetValue > getNextRow(const bool translate_strings, const bool decimal_to_double) const
void appendToColumnBuilder(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
std::vector< TargetValue > * pointer
std::ptrdiff_t difference_type
ArrowResultSetConverter(const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::input_iterator_tag iterator_category
size_t colCount() const
std::shared_ptr< ResultSet > results_
std::unique_ptr< ArrowResultSet > result_set_arrow_loopback(const ExecutionResult &results)
int64_t sm_size
ArrowResultSetRowIterator(const ArrowResultSet *rs)
std::vector< TargetValue > value_type
value_type operator*() const
std::vector< TargetMetaInfo > column_metainfo_
int64_t df_size
bool operator!=(const ArrowResultSetRowIterator &other) const
bool definitelyHasNoRows() const
ArrowResultSetRowIterator operator++(int)
void appendValue(std::vector< TargetValue > &row, const arrow::Array &column, const Type null_val, const size_t idx) const
ArrowResult getArrowResultImpl() const
void resultSetArrowLoopback()
Basic constructors and methods of the row set interface.
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const std::shared_ptr< arrow::Field > &field) const
SerializedArrowOutput getSerializedArrowOutput() const
std::shared_ptr< arrow::RecordBatch > record_batch_
const ArrowResultSet * result_set_
ArrowResultSet(const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta)
std::shared_ptr< arrow::Buffer > schema
std::shared_ptr< arrow::RecordBatch > convertToArrow(arrow::ipc::DictionaryMemo &memo) const
std::vector< TargetMetaInfo > targets_meta_
constexpr auto scale_epoch_values()
std::vector< std::shared_ptr< arrow::Array > > columns_
ArrowResultSetConverter(const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n)