OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowResultSet.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "CompilationOptions.h"
20 #include "DataMgr/DataMgr.h"
22 #include "ResultSet.h"
23 #include "TargetMetaInfo.h"
24 #include "TargetValue.h"
25 
26 #include <type_traits>
27 
28 #include "arrow/api.h"
29 #include "arrow/ipc/api.h"
30 #ifdef HAVE_CUDA
31 #include <arrow/gpu/cuda_api.h>
32 #endif // HAVE_CUDA
33 
34 static_assert(ARROW_VERSION >= 16000, "Apache Arrow v0.16.0 or above is required.");
35 
36 // TODO(wamsi): ValueArray is not optimal. Remove it and inherit from a base vector class.
37 using ValueArray = boost::variant<std::vector<bool>,
38  std::vector<int8_t>,
39  std::vector<int16_t>,
40  std::vector<int32_t>,
41  std::vector<int64_t>,
42  std::vector<arrow::Decimal128>,
43  std::vector<float>,
44  std::vector<double>,
45  std::vector<std::string>,
46  std::vector<std::vector<int8_t>>,
47  std::vector<std::vector<int16_t>>,
48  std::vector<std::vector<int32_t>>,
49  std::vector<std::vector<int64_t>>,
50  std::vector<std::vector<float>>,
51  std::vector<std::vector<double>>,
52  std::vector<std::vector<std::string>>>;
53 
54 template <typename T>
55 using Vec2 = std::vector<std::vector<T>>;
56 
57 class ArrowResultSet;
58 
60  public:
61  using value_type = std::vector<TargetValue>;
62  using difference_type = std::ptrdiff_t;
63  using pointer = std::vector<TargetValue>*;
64  using reference = std::vector<TargetValue>&;
65  using iterator_category = std::input_iterator_tag;
66 
67  bool operator==(const ArrowResultSetRowIterator& other) const {
68  return result_set_ == other.result_set_ && crt_row_idx_ == other.crt_row_idx_;
69  }
70  bool operator!=(const ArrowResultSetRowIterator& other) const {
71  return !(*this == other);
72  }
73 
74  inline value_type operator*() const;
76  crt_row_idx_++;
77  return *this;
78  }
80  ArrowResultSetRowIterator iter(*this);
81  ++(*this);
82  return iter;
83  }
84 
85  private:
87  size_t crt_row_idx_;
88 
91 
92  friend class ArrowResultSet;
93 };
94 
95 enum class ArrowTransport { SHARED_MEMORY = 0, WIRE = 1 };
96 
/**
 * Plain aggregate bundling the handles, sizes and (optionally) the raw buffer
 * that describe one Arrow-serialized result.
 *
 * The two size fields carry default member initializers so that a
 * default-constructed ArrowResult never exposes indeterminate values; the
 * container and string members are already empty by default. The struct is
 * still an aggregate, so positional brace-initialization keeps working.
 *
 * NOTE(review): the exact meaning of the `sm_` / `df_` prefixes is not visible
 * in this header -- confirm against the code that fills these fields.
 */
struct ArrowResult {
  std::vector<char> sm_handle;
  int64_t sm_size{0};
  std::vector<char> df_handle;
  int64_t df_size{0};
  std::string serialized_cuda_handle;  // Only for GPU memory deallocation
  std::vector<char> df_buffer;         // Only present when transport is WIRE
};
105 
106 // Expose Arrow buffers as a subset of the ResultSet interface
107 // to make it work within the existing execution test framework.
109  public:
110  ArrowResultSet(const std::shared_ptr<ResultSet>& rows,
111  const std::vector<TargetMetaInfo>& targets_meta,
113 
115  const std::shared_ptr<ResultSet>& rows,
116  const std::vector<TargetMetaInfo>& targets_meta,
118  const size_t min_result_size_for_bulk_dictionary_fetch,
119  const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
120 
121  ArrowResultSet(const std::shared_ptr<ResultSet>& rows,
122  const ExecutorDeviceType device_type = ExecutorDeviceType::CPU)
123  : ArrowResultSet(rows, {}, device_type) {}
124 
125  ArrowResultSetRowIterator rowIterator(size_t from_index,
126  bool translate_strings,
127  bool decimal_to_double) const {
128  ArrowResultSetRowIterator iter(this);
129  for (size_t i = 0; i < from_index; i++) {
130  ++iter;
131  }
132 
133  return iter;
134  }
135 
136  ArrowResultSetRowIterator rowIterator(bool translate_strings,
137  bool decimal_to_double) const {
138  return rowIterator(0, translate_strings, decimal_to_double);
139  }
140 
141  std::vector<std::string> getDictionaryStrings(const size_t col_idx) const;
142 
143  std::vector<TargetValue> getRowAt(const size_t index) const;
144 
145  std::vector<TargetValue> getNextRow(const bool translate_strings,
146  const bool decimal_to_double) const;
147 
148  size_t colCount() const;
149 
150  SQLTypeInfo getColType(const size_t col_idx) const;
151 
152  bool definitelyHasNoRows() const;
153 
154  size_t rowCount() const;
155  size_t entryCount() const;
156 
157  bool isEmpty() const;
158 
159  static void deallocateArrowResultBuffer(
160  const ArrowResult& result,
161  const ExecutorDeviceType device_type,
162  const size_t device_id,
163  std::shared_ptr<Data_Namespace::DataMgr>& data_mgr);
164 
165  private:
167  const ExecutorDeviceType device_type = ExecutorDeviceType::CPU);
168 
170  const ExecutorDeviceType device_type,
171  const size_t min_result_size_for_bulk_dictionary_fetch,
172  const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
173 
174  template <typename Type, typename ArrayType>
175  void appendValue(std::vector<TargetValue>& row,
176  const arrow::Array& column,
177  const Type null_val,
178  const size_t idx) const;
179 
180  std::shared_ptr<ArrowResult> results_;
181  std::shared_ptr<ResultSet> rows_;
182  std::vector<TargetMetaInfo> targets_meta_;
183  std::shared_ptr<arrow::RecordBatch> record_batch_;
184  arrow::ipc::DictionaryMemo dictionary_memo_;
185 
186  // Boxed arrays from the record batch. The result of RecordBatch::column is
187  // temporary, so we cache these for better performance
188  std::vector<std::shared_ptr<arrow::Array>> columns_;
189  mutable size_t crt_row_idx_;
190  std::vector<TargetMetaInfo> column_metainfo_;
191 };
192 
195 }
196 
197 class ExecutionResult;
198 
199 // The following result_set_arrow_loopback methods are used by our test
200 // framework (ExecuteTest specifically) to take results from the executor,
201 // serialize them to Arrow and then deserialize them to an ArrowResultSet,
202 // which can then be used by the test framework.
203 
204 std::unique_ptr<ArrowResultSet> result_set_arrow_loopback(const ExecutionResult& results);
205 
206 std::unique_ptr<ArrowResultSet> result_set_arrow_loopback(
207  const ExecutionResult* results,
208  const std::shared_ptr<ResultSet>& rows,
209  const ExecutorDeviceType device_type = ExecutorDeviceType::CPU);
210 
211 // This version of result_set_arrow_loopback allows setting the parameters that
212 // drive the choice between dense and sparse dictionary conversion, used for
213 // Select.ArrowDictionaries tests in ExecuteTest
214 
215 std::unique_ptr<ArrowResultSet> result_set_arrow_loopback(
216  const ExecutionResult* results,
217  const std::shared_ptr<ResultSet>& rows,
218  const ExecutorDeviceType device_type,
219  const size_t min_result_size_for_bulk_dictionary_fetch,
220  const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch);
221 
225  INVALID
226 };
227 
229  public:
230  static constexpr size_t default_min_result_size_for_bulk_dictionary_fetch{10000UL};
231  static constexpr double
233 
234  ArrowResultSetConverter(const std::shared_ptr<ResultSet>& results,
235  const std::shared_ptr<Data_Namespace::DataMgr> data_mgr,
236  const ExecutorDeviceType device_type,
237  const int32_t device_id,
238  const std::vector<std::string>& col_names,
239  const int32_t first_n,
240  const ArrowTransport transport_method)
241  : results_(results)
242  , data_mgr_(data_mgr)
243  , device_type_(device_type)
244  , device_id_(device_id)
245  , col_names_(col_names)
246  , top_n_(first_n)
247  , transport_method_(transport_method)
253 
255  const std::shared_ptr<ResultSet>& results,
256  const std::shared_ptr<Data_Namespace::DataMgr> data_mgr,
257  const ExecutorDeviceType device_type,
258  const int32_t device_id,
259  const std::vector<std::string>& col_names,
260  const int32_t first_n,
261  const ArrowTransport transport_method,
262  const size_t min_result_size_for_bulk_dictionary_fetch,
263  const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
264  : results_(results)
265  , data_mgr_(data_mgr)
266  , device_type_(device_type)
267  , device_id_(device_id)
268  , col_names_(col_names)
269  , top_n_(first_n)
270  , transport_method_(transport_method)
272  min_result_size_for_bulk_dictionary_fetch)
274  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch) {}
275 
276  ArrowResult getArrowResult() const;
277 
278  // TODO(adb): Proper namespacing for this set of functionality. For now, make this
279  // public and leverage the converter class as a namespace.
280  struct ColumnBuilder {
281  using StrId = int32_t;
282  using ArrowStrId = int32_t;
283 
284  std::shared_ptr<arrow::Field> field;
285  std::unique_ptr<arrow::ArrayBuilder> builder;
286  std::shared_ptr<arrow::StringArray> string_array;
290  std::unordered_map<StrId, ArrowStrId> string_remapping;
291  };
292 
293  ArrowResultSetConverter(const std::shared_ptr<ResultSet>& results,
294  const std::vector<std::string>& col_names,
295  const int32_t first_n)
296  : results_(results)
297  , col_names_(col_names)
298  , top_n_(first_n)
304 
306  const std::shared_ptr<ResultSet>& results,
307  const std::vector<std::string>& col_names,
308  const int32_t first_n,
309  const size_t min_result_size_for_bulk_dictionary_fetch,
310  const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
311  : results_(results)
312  , col_names_(col_names)
313  , top_n_(first_n)
315  min_result_size_for_bulk_dictionary_fetch)
317  max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch) {}
318 
319  std::shared_ptr<arrow::RecordBatch> convertToArrow() const;
320 
321  private:
322  std::shared_ptr<arrow::RecordBatch> getArrowBatch(
323  const std::shared_ptr<arrow::Schema>& schema) const;
324 
325  std::shared_ptr<arrow::Field> makeField(const std::string name,
326  const SQLTypeInfo& target_type) const;
327 
329  std::shared_ptr<arrow::Buffer> schema;
330  std::shared_ptr<arrow::Buffer> records;
331  };
333  arrow::ipc::DictionaryFieldMapper* mapper) const;
334 
335  void initializeColumnBuilder(ColumnBuilder& column_builder,
336  const SQLTypeInfo& col_type,
337  const size_t result_col_idx,
338  const std::shared_ptr<arrow::Field>& field) const;
339 
340  void append(ColumnBuilder& column_builder,
341  const ValueArray& values,
342  const std::shared_ptr<std::vector<bool>>& is_valid) const;
343 
344  inline std::shared_ptr<arrow::Array> finishColumnBuilder(
345  ColumnBuilder& column_builder) const;
346 
347  std::shared_ptr<ResultSet> results_;
348  std::shared_ptr<Data_Namespace::DataMgr> data_mgr_ = nullptr;
350  int32_t device_id_ = 0;
351  std::vector<std::string> col_names_;
352  int32_t top_n_;
356  friend class ArrowResultSet;
357 };
358 
359 template <typename T>
360 constexpr auto scale_epoch_values() {
361  return std::is_same<T, arrow::Date32Builder>::value ||
362  std::is_same<T, arrow::Date64Builder>::value;
363 }
std::vector< std::vector< T >> Vec2
const size_t min_result_size_for_bulk_dictionary_fetch_
std::unique_ptr< arrow::ArrayBuilder > builder
std::shared_ptr< arrow::StringArray > string_array
ArrowResultSetRowIterator & operator++(void)
SQLTypes
Definition: sqltypes.h:65
double decimal_to_double(const SQLTypeInfo &otype, int64_t oval)
void append(ColumnBuilder &column_builder, const ValueArray &values, const std::shared_ptr< std::vector< bool >> &is_valid) const
ArrowResult getArrowResult() const
std::vector< char > sm_handle
void initializeColumnBuilder(ColumnBuilder &column_builder, const SQLTypeInfo &col_type, const size_t result_col_idx, const std::shared_ptr< arrow::Field > &field) const
std::shared_ptr< ArrowResult > results_
std::shared_ptr< ResultSet > rows_
size_t rowCount() const
const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch_
SQLTypeInfo getColType(const size_t col_idx) const
ArrowTransport
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
ArrowStringRemapMode
std::shared_ptr< arrow::Array > finishColumnBuilder(ColumnBuilder &column_builder) const
boost::variant< std::vector< bool >, std::vector< int8_t >, std::vector< int16_t >, std::vector< int32_t >, std::vector< int64_t >, std::vector< arrow::Decimal128 >, std::vector< float >, std::vector< double >, std::vector< std::string >, std::vector< std::vector< int8_t >>, std::vector< std::vector< int16_t >>, std::vector< std::vector< int32_t >>, std::vector< std::vector< int64_t >>, std::vector< std::vector< float >>, std::vector< std::vector< double >>, std::vector< std::vector< std::string >>> ValueArray
std::shared_ptr< arrow::Field > field
arrow::ipc::DictionaryMemo dictionary_memo_
High-level representation of SQL values.
ExecutorDeviceType
ArrowResultSet(const std::shared_ptr< ResultSet > &rows, const std::vector< TargetMetaInfo > &targets_meta, const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
tuple rows
Definition: report.py:114
ArrowTransport transport_method_
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:33
std::shared_ptr< arrow::RecordBatch > getArrowBatch(const std::shared_ptr< arrow::Schema > &schema) const
std::vector< std::string > col_names_
std::shared_ptr< arrow::Buffer > records
std::vector< TargetValue > & reference
ArrowResultSetConverter(const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method, const size_t min_result_size_for_bulk_dictionary_fetch, const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
bool operator==(const ArrowResultSetRowIterator &other) const
ExecutorDeviceType device_type_
size_t entryCount() const
std::vector< char > df_handle
std::shared_ptr< arrow::Field > makeField(const std::string name, const SQLTypeInfo &target_type) const
ArrowResultSet(const std::shared_ptr< ResultSet > &rows, const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
std::vector< TargetValue > getNextRow(const bool translate_strings, const bool decimal_to_double) const
SerializedArrowOutput getSerializedArrowOutput(arrow::ipc::DictionaryFieldMapper *mapper) const
std::vector< TargetValue > * pointer
std::ptrdiff_t difference_type
ArrowResultSetConverter(const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n)
static void deallocateArrowResultBuffer(const ArrowResult &result, const ExecutorDeviceType device_type, const size_t device_id, std::shared_ptr< Data_Namespace::DataMgr > &data_mgr)
std::input_iterator_tag iterator_category
std::unordered_map< StrId, ArrowStrId > string_remapping
size_t colCount() const
std::shared_ptr< ResultSet > results_
std::unique_ptr< ArrowResultSet > result_set_arrow_loopback(const ExecutionResult &results)
int64_t sm_size
ArrowResultSetRowIterator(const ArrowResultSet *rs)
std::vector< TargetValue > value_type
value_type operator*() const
std::string serialized_cuda_handle
std::vector< TargetMetaInfo > column_metainfo_
int64_t df_size
bool operator!=(const ArrowResultSetRowIterator &other) const
bool definitelyHasNoRows() const
ArrowResultSetRowIterator operator++(int)
ArrowResultSetConverter(const std::shared_ptr< ResultSet > &results, const std::vector< std::string > &col_names, const int32_t first_n, const size_t min_result_size_for_bulk_dictionary_fetch, const double max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch)
static constexpr size_t default_min_result_size_for_bulk_dictionary_fetch
static constexpr double default_max_dictionary_to_result_size_ratio_for_bulk_dictionary_fetch
void appendValue(std::vector< TargetValue > &row, const arrow::Array &column, const Type null_val, const size_t idx) const
Basic constructors and methods of the row set interface.
ArrowResultSetConverter(const std::shared_ptr< ResultSet > &results, const std::shared_ptr< Data_Namespace::DataMgr > data_mgr, const ExecutorDeviceType device_type, const int32_t device_id, const std::vector< std::string > &col_names, const int32_t first_n, const ArrowTransport transport_method)
bool isEmpty() const
void resultSetArrowLoopback(const ExecutorDeviceType device_type=ExecutorDeviceType::CPU)
std::vector< TargetValue > getRowAt(const size_t index) const
std::shared_ptr< arrow::RecordBatch > record_batch_
string name
Definition: setup.in.py:72
const ArrowResultSet * result_set_
std::shared_ptr< arrow::Buffer > schema
std::vector< TargetMetaInfo > targets_meta_
std::vector< char > df_buffer
constexpr auto scale_epoch_values()
std::vector< std::shared_ptr< arrow::Array > > columns_
std::shared_ptr< arrow::RecordBatch > convertToArrow() const