16 #ifndef ARROW_IMPORTER_H
17 #define ARROW_IMPORTER_H
24 #include <arrow/api.h>
25 #include <arrow/io/api.h>
26 #include <boost/algorithm/string.hpp>
27 #include <boost/variant.hpp>
37 using std::runtime_error::runtime_error;
40 template <
typename T = ArrowImporterException>
44 static std::mutex mtx;
45 std::unique_lock<std::mutex> lock(mtx);
51 #ifdef ENABLE_IMPORT_PARQUET
52 #include <parquet/api/reader.h>
53 #include <parquet/api/writer.h>
54 #include <parquet/arrow/reader.h>
55 #include <parquet/exception.h>
56 #endif // ENABLE_IMPORT_PARQUET
58 #include "arrow/util/decimal.h"
63 boost::variant<bool, float, double, int64_t, std::string, void*, arrow::Decimal128>;
69 typename std::enable_if_t<std::is_integral<T>::value && !std::is_same<T, bool>::value,
// Yields the type of `expression` with references and top-level cv-qualifiers
// removed (std::decay_t applied to decltype of the expression).
#define exprtype(expression) std::decay_t<decltype(expression)>
78 return bad_rows_tracker ?
"File " + bad_rows_tracker->
file_name +
", row-group " +
80 (cd ?
", column " + cd->
columnName +
": " :
"")
84 template <
typename SrcType,
typename DstType>
86 using ArrayType =
typename arrow::TypeTraits<SrcType>::ArrayType;
87 return (DstType)
static_cast<const ArrayType&
>(array).
Value(idx);
90 template <
typename SrcType>
92 using ArrayType =
typename arrow::TypeTraits<SrcType>::ArrayType;
93 return static_cast<const ArrayType&
>(array).GetString(idx);
// Expands to a single `case` for a switch over arrow::Type ids: when the id is
// arrow::Type::<arrow_id>, the enclosing function returns the
// get_numeric_value instantiation that reads <arrow_src_type> array elements
// and converts them to <variant_type>.
#define NUMERIC_CASE(arrow_id, arrow_src_type, variant_type) \
  case arrow::Type::arrow_id:                                \
    return get_numeric_value<arrow_src_type, variant_type>;
// Expands to a single `case` for a switch over arrow::Type ids: when the id is
// arrow::Type::<arrow_id>, the enclosing function returns the
// get_string_value instantiation for <arrow_src_type> arrays.
#define STRING_CASE(arrow_id, arrow_src_type) \
  case arrow::Type::arrow_id:                 \
    return get_string_value<arrow_src_type>;
106 switch (array.type_id()) {
123 NUMERIC_CASE(DECIMAL, arrow::Decimal128Type, arrow::Decimal128)
129 array.type()->name() +
" is not supported");
139 "Invalid type conversion from parquet " + pt +
" type to " +
143 template <
typename DATA_TYPE,
typename VALUE_TYPE>
149 "Invalid data conversion from parquet value " +
std::to_string(v) +
158 "Invalid data conversion from parquet string '" + v +
"' to " +
174 const arrow::Array& array,
178 , bad_rows_tracker(bad_rows_tracker)
179 , arrow_type(*array.
type())
180 , arrow_decimal_scale(
181 arrow_type.id() == arrow::
Type::DECIMAL
182 ? static_cast<const arrow::Decimal128Type&>(arrow_type).scale()
184 , old_type(cd->columnType.get_type(),
185 cd->columnType.get_dimension(),
188 , new_type(cd->columnType.get_type(),
189 cd->columnType.get_dimension(),
190 cd->columnType.get_scale(),
194 template <
typename DATA_TYPE>
198 const arrow::Array& array,
199 std::vector<DATA_TYPE>& buffer,
209 static const std::map<std::pair<int32_t, arrow::TimeUnit::type>,
210 std::pair<SQLOps, int64_t>>
212 {{0, arrow::TimeUnit::MILLI}, {
kDIVIDE, kMillisecondsInSecond}},
213 {{0, arrow::TimeUnit::MICRO}, {
kDIVIDE, kMicrosecondsInSecond}},
214 {{0, arrow::TimeUnit::NANO}, {
kDIVIDE, kNanosecondsinSecond}},
215 {{3, arrow::TimeUnit::SECOND}, {
kMULTIPLY, kMicrosecondsInSecond}},
216 {{3, arrow::TimeUnit::MICRO}, {
kDIVIDE, kMillisecondsInSecond}},
217 {{3, arrow::TimeUnit::NANO}, {
kDIVIDE, kMicrosecondsInSecond}},
218 {{6, arrow::TimeUnit::SECOND}, {
kMULTIPLY, kMicrosecondsInSecond}},
219 {{6, arrow::TimeUnit::MILLI}, {
kMULTIPLY, kMillisecondsInSecond}},
220 {{6, arrow::TimeUnit::NANO}, {
kDIVIDE, kMillisecondsInSecond}},
221 {{9, arrow::TimeUnit::SECOND}, {
kMULTIPLY, kNanosecondsinSecond}},
222 {{9, arrow::TimeUnit::MILLI}, {
kMULTIPLY, kMicrosecondsInSecond}},
223 {{9, arrow::TimeUnit::MICRO}, {
kMULTIPLY, kMillisecondsInSecond}}};
226 template <
typename VALUE_TYPE>
234 , dimension(data.cd->columnType.is_high_precision_timestamp()
235 ? data.cd->columnType.get_dimension()
237 template <bool enabled = std::is_integral<VALUE_TYPE>::value>
238 int64_t
resolve_time(
const VALUE_TYPE& v, std::enable_if_t<enabled>* = 0)
const {
239 const auto& type_id = data.arrow_type.id();
240 if (type_id == arrow::Type::DATE32 || type_id == arrow::Type::DATE64) {
241 auto& date_type =
static_cast<const arrow::DateType&
>(data.arrow_type);
242 switch (date_type.unit()) {
243 case arrow::DateUnit::DAY:
245 case arrow::DateUnit::MILLI:
248 }
else if (type_id == arrow::Type::TIME32 || type_id == arrow::Type::TIME64 ||
249 type_id == arrow::Type::TIMESTAMP) {
250 auto& time_type =
static_cast<const arrow::TimeType&
>(data.arrow_type);
254 const auto scale =
result->second;
255 return scale.first ==
kMULTIPLY ? v * scale.second : v / scale.second;
260 UNREACHABLE() << data.arrow_type <<
" is not a valid Arrow time or date type";
263 template <bool enabled = std::is_integral<VALUE_TYPE>::value>
264 int64_t
resolve_time(
const VALUE_TYPE& v, std::enable_if_t<!enabled>* = 0)
const {
265 static_assert(enabled,
"unreachable");
270 template <
typename VALUE_TYPE>
278 template <
typename DATA_TYPE,
typename = enable_if_
integral<DATA_TYPE>>
279 explicit operator const DATA_TYPE()
const {
282 template <
typename DATA_TYPE,
typename = enable_if_
floating<DATA_TYPE>>
283 explicit operator DATA_TYPE()
const {
286 explicit operator const std::string()
const {
return std::string(); }
294 template <
typename DATA_TYPE,
typename = enable_if_
integral<DATA_TYPE>>
295 explicit operator const DATA_TYPE()
const {
296 if (!(data.cd->columnType.is_number() || data.cd->columnType.is_boolean())) {
301 template <
typename DATA_TYPE,
typename = enable_if_
floating<DATA_TYPE>>
302 explicit operator DATA_TYPE()
const {
305 explicit operator const std::string()
const {
return v ?
"T" :
"F"; }
313 template <
typename DATA_TYPE,
typename = enable_if_
integral<DATA_TYPE>>
314 explicit operator const DATA_TYPE()
const {
315 const auto ti = data.cd->columnType;
316 DATA_TYPE v = ti.is_decimal() ? this->v * pow(10, ti.get_scale()) : this->v;
317 if (!(std::numeric_limits<DATA_TYPE>::lowest() < v &&
318 v <= std::numeric_limits<DATA_TYPE>::max())) {
319 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
323 template <
typename DATA_TYPE,
typename = enable_if_
floating<DATA_TYPE>>
324 explicit operator DATA_TYPE()
const {
335 template <
typename DATA_TYPE,
typename = enable_if_
integral<DATA_TYPE>>
336 explicit operator const DATA_TYPE()
const {
337 const auto ti = data.cd->columnType;
338 DATA_TYPE v = ti.is_decimal() ? this->v * pow(10, ti.get_scale()) : this->v;
339 if (!(std::numeric_limits<DATA_TYPE>::lowest() < v &&
340 v <= std::numeric_limits<DATA_TYPE>::max())) {
341 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
345 template <
typename DATA_TYPE,
typename = enable_if_
floating<DATA_TYPE>>
346 explicit operator DATA_TYPE()
const {
347 if (std::is_same<DATA_TYPE, float>::value) {
348 if (!(std::numeric_limits<float>::lowest() < v &&
349 v <= std::numeric_limits<float>::max())) {
350 data_conversion_error<float>(v, data.cd, data.bad_rows_tracker);
363 template <
typename DATA_TYPE,
typename = enable_if_
integral<DATA_TYPE>>
364 explicit operator const DATA_TYPE()
const {
366 if (std::is_same<int64_t, DATA_TYPE>::value) {
367 }
else if (std::numeric_limits<DATA_TYPE>::lowest() < v &&
368 v <= std::numeric_limits<DATA_TYPE>::max()) {
370 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
372 if (data.cd->columnType.is_time()) {
373 v = this->resolve_time(v);
377 template <
typename DATA_TYPE,
typename = enable_if_
floating<DATA_TYPE>>
378 explicit operator DATA_TYPE()
const {
381 explicit operator const std::string()
const {
382 const auto& type_id = data.arrow_type.id();
383 if (type_id == arrow::Type::DATE32 || type_id == arrow::Type::DATE64) {
384 auto& date_type =
static_cast<const arrow::DateType&
>(data.arrow_type);
386 Datum datum{.
bigintval = date_type.unit() == arrow::DateUnit::MILLI
387 ? v / kMicrosecondsInSecond
390 }
else if (type_id == arrow::Type::TIME32 || type_id == arrow::Type::TIME64 ||
391 type_id == arrow::Type::TIMESTAMP) {
392 auto& time_type =
static_cast<const arrow::TimeType&
>(data.arrow_type);
397 divisor =
result->second.second;
402 if (divisor != 1 && v % divisor) {
416 explicit operator const bool()
const {
418 return inline_int_null_value<int8_t>();
423 return datum.boolval;
429 template <
typename DATA_TYPE,
typename = enable_if_
integral_not_
bool<DATA_TYPE>>
430 explicit operator const DATA_TYPE()
const {
435 auto ti = data.cd->columnType;
437 return datum.bigintval;
443 template <
typename DATA_TYPE,
typename = enable_if_
floating<DATA_TYPE>>
444 explicit operator DATA_TYPE()
const {
445 return atof(v.data());
447 explicit operator const std::string()
const {
return v; }
458 "Truncation error on Arrow Decimal128 value");
460 template <
typename DATA_TYPE,
typename = enable_if_
integral<DATA_TYPE>>
461 explicit operator const DATA_TYPE()
const {
462 int64_t v =
static_cast<int64_t
>(this->v);
463 if (data.cd->columnType.is_decimal()) {
466 if (data.arrow_decimal_scale) {
467 v = std::llround(v / pow(10, data.arrow_decimal_scale));
469 if (std::is_same<int64_t, DATA_TYPE>::value) {
470 }
else if (std::numeric_limits<DATA_TYPE>::lowest() < v &&
471 v <= std::numeric_limits<DATA_TYPE>::max()) {
473 data_conversion_error<DATA_TYPE>(v, data.cd, data.bad_rows_tracker);
477 template <
typename DATA_TYPE,
typename = enable_if_
floating<DATA_TYPE>>
478 explicit operator DATA_TYPE()
const {
479 int64_t v =
static_cast<int64_t
>(this->v);
480 return data.arrow_decimal_scale ? v / pow(10, data.arrow_decimal_scale) : v;
482 explicit operator const std::string()
const {
483 return v.ToString(data.arrow_decimal_scale);
488 template <
typename DATA_TYPE>
489 inline auto& operator<<(DataBuffer<DATA_TYPE>& data,
const VarValue& var) {
490 boost::apply_visitor(
491 [&data](
const auto& v) {
499 #endif // ARROW_IMPORTER_H
constexpr int64_t kMillisecondsInSecond
void data_conversion_error(const VALUE_TYPE v, const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
constexpr int64_t kMicrosecondsInSecond
boost::variant< bool, float, double, int64_t, std::string, void *, arrow::Decimal128 > VarValue
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
const DataBufferBase & data
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
const arrow::Array & array
DataBufferBase(const ColumnDescriptor *cd, const arrow::Array &array, import_export::BadRowsTracker *const bad_rows_tracker)
auto value_getter(const arrow::Array &array, const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
Constants for Builtin SQL Types supported by OmniSci.
const int arrow_decimal_scale
#define NUMERIC_CASE(tid, src_type, var_type)
double inline_fp_null_val(const SQL_TYPE_INFO &ti)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
static const std::map< std::pair< int32_t, arrow::TimeUnit::type >, std::pair< SQLOps, int64_t > > _precision_scale_lookup
const SQLTypeInfo new_type
typename std::enable_if_t< std::is_floating_point< T >::value, T > enable_if_floating
void type_conversion_error(const std::string pt, const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
import_export::BadRowsTracker *const bad_rows_tracker
DataBuffer(const ColumnDescriptor *cd, const arrow::Array &array, std::vector< DATA_TYPE > &buffer, import_export::BadRowsTracker *const bad_rows_tracker)
Datum StringToDatum(std::string_view s, SQLTypeInfo &ti)
specifies the content in-memory of a row in the column metadata table
std::vector< DATA_TYPE > & buffer
bool g_enable_smem_group_by true
VarValue get_string_value(const arrow::Array &array, const int64_t idx)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
ArrowValue(const DataBufferBase &data, const VALUE_TYPE &v)
constexpr int32_t kSecondsInDay
int64_t convert_decimal_value_to_scale(const int64_t decimal_value, const SQLTypeInfo &type_info, const SQLTypeInfo &new_type_info)
int64_t resolve_time(const VALUE_TYPE &v, std::enable_if_t< enabled > *=0) const
std::string get_type_name() const
typename std::enable_if_t< std::is_integral< T >::value, T > enable_if_integral
#define STRING_CASE(tid, src_type)
int64_t resolve_time(const VALUE_TYPE &v, std::enable_if_t<!enabled > *=0) const
const arrow::DataType & arrow_type
VarValue get_numeric_value(const arrow::Array &array, const int64_t idx)
int64_t inline_fixed_encoding_null_val(const SQL_TYPE_INFO &ti)
typename std::enable_if_t< std::is_integral< T >::value &&!std::is_same< T, bool >::value, T > enable_if_integral_not_bool
const SQLTypeInfo old_type
std::string error_context(const ColumnDescriptor *cd, import_export::BadRowsTracker *const bad_rows_tracker)
constexpr int64_t kNanosecondsinSecond
const ColumnDescriptor * cd
ArrowValueBase(const DataBufferBase &data, const VALUE_TYPE &v)
void arrow_throw_if(const bool cond, const std::string &message)
arrow::Decimal128 VALUE_TYPE