#include <parquet/schema.h>
#include <parquet/types.h>

class ParquetInPlaceEncoder : public ParquetScalarEncoder {
 public:
  ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer* buffer,
                        const size_t omnisci_data_type_byte_size,
                        const size_t parquet_data_type_byte_size)
      : ParquetScalarEncoder(buffer),
        omnisci_data_type_byte_size_(omnisci_data_type_byte_size),
        parquet_data_type_byte_size_(parquet_data_type_byte_size) {}

  // Appends a batch of Parquet values to the buffer, re-encoding them in
  // place within `values` before the append.
  void appendData(const int16_t* def_levels,
                  const int16_t* rep_levels,
                  const int64_t values_read,
                  const int64_t levels_read,
                  const bool is_last_batch,
                  int8_t* values) override {
    if (omnisci_data_type_byte_size_ < parquet_data_type_byte_size_) {
      // Narrowing conversion: encode front-to-back, since each encoded value
      // takes no more space than the value it replaces.
      for (int64_t i = 0; i < values_read; ++i) {
        encodeAndCopy(values + i * parquet_data_type_byte_size_,
                      values + i * omnisci_data_type_byte_size_);
      }
    }

    if (values_read < levels_read) {  // nulls exist
      decodeNullsAndEncodeData(
          values,
          def_levels,
          values_read,
          levels_read,
          omnisci_data_type_byte_size_ >= parquet_data_type_byte_size_);
    } else if (omnisci_data_type_byte_size_ >= parquet_data_type_byte_size_) {
      // Widening (or same-size) conversion: encode back-to-front so that
      // not-yet-encoded values are never overwritten.
      for (int64_t i = levels_read - 1; i >= 0; --i) {
        encodeAndCopy(values + i * parquet_data_type_byte_size_,
                      values + i * omnisci_data_type_byte_size_);
      }
    }

    buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
  }
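  // Worked example (illustrative values, not from the original source):
  // widening a 2-byte Parquet type into a 4-byte OmniSci type with
  // levels_read == 3 and no nulls rewrites the buffer
  //   [v0|v1|v2|......]   (three 2-byte values in bytes 0-5)
  // into
  //   [ v0 | v1 | v2 ]    (three 4-byte values in bytes 0-11).
  // The back-to-front order is what makes this safe: v2 lands in bytes
  // [8, 12) before v1 is read from bytes [2, 4), whereas a front-to-back
  // pass would overwrite v1 while widening v0.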
 protected:
  const size_t omnisci_data_type_byte_size_;
  const size_t parquet_data_type_byte_size_;

 private:
  // Spreads the packed non-null values to their final slots, back-to-front,
  // writing null sentinels wherever the definition levels indicate a null,
  // and optionally encoding each value as it is moved.
  void decodeNullsAndEncodeData(int8_t* data_ptr,
                                const int16_t* def_levels,
                                const int64_t values_read,
                                const int64_t levels_read,
                                const bool do_encoding) {
    for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
      if (def_levels[i]) {  // value is not null
        if (do_encoding) {
          encodeAndCopy(data_ptr + (j--) * parquet_data_type_byte_size_,
                        data_ptr + i * omnisci_data_type_byte_size_);
        } else {
          copy(data_ptr + (j--) * omnisci_data_type_byte_size_,
               data_ptr + i * omnisci_data_type_byte_size_);
        }
      } else {  // value is null
        setNull(data_ptr + i * omnisci_data_type_byte_size_);
      }
    }
  }
};
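// A minimal standalone sketch (hypothetical, not part of the original header)
// of the null-spreading pass above, specialized to int32 elements for
// brevity; NULL_SENTINEL stands in for get_null_value<V>() and <limits> is
// assumed to be available.
inline void spread_nulls_sketch(int32_t* data,
                                const int16_t* def_levels,
                                const int64_t values_read,
                                const int64_t levels_read) {
  constexpr int32_t NULL_SENTINEL = std::numeric_limits<int32_t>::min();
  // Walk back-to-front so a packed value is never clobbered before it moves.
  for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
    data[i] = def_levels[i] ? data[j--] : NULL_SENTINEL;
  }
}
// Example: def_levels = {1, 0, 1, 0}, values_read = 2, levels_read = 4 turns
// [v0, v1, ?, ?] into [v0, NULL, v1, NULL].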
template <typename V, typename T>
class TypedParquetInPlaceEncoder : public ParquetInPlaceEncoder {
 public:
  TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer* buffer,
                             const ColumnDescriptor* column_descriptor,
                             const parquet::ColumnDescriptor* parquet_column_descriptor)
      : ParquetInPlaceEncoder(
            buffer,
            column_descriptor->columnType.get_size(),
            parquet::GetTypeByteSize(parquet_column_descriptor->physical_type())) {}

  TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer* buffer,
                             const size_t omnisci_data_type_byte_size,
                             const size_t parquet_data_type_byte_size)
      : ParquetInPlaceEncoder(buffer,
                              omnisci_data_type_byte_size,
                              parquet_data_type_byte_size) {}
  // Fast path: when the in-memory and Parquet types coincide and no nulls
  // were read, values are already in their final slots; otherwise fall back
  // to the general in-place algorithm of the base class.
  void appendData(const int16_t* def_levels,
                  const int16_t* rep_levels,
                  const int64_t values_read,
                  const int64_t levels_read,
                  const bool is_last_batch,
                  int8_t* values) override {
    if (std::is_same<V, T>::value && values_read == levels_read) {
      if (!encodingIsIdentityForSameTypes()) {
        for (int64_t i = 0; i < levels_read; ++i) {
          encodeAndCopy(values + i * omnisci_data_type_byte_size_,
                        values + i * omnisci_data_type_byte_size_);
        }
      }
      buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
    } else {
      ParquetInPlaceEncoder::appendData(
          def_levels, rep_levels, values_read, levels_read, is_last_batch, values);
    }
  }
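  // Note on the fast path above: when V and T are the same type the Parquet
  // and OmniSci strides are equal, so every value already sits in its final
  // slot; a forward in-place encode is then safe, and if the encoding is the
  // identity the batch can be appended with no pass at all.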
  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
                               int8_t* omnisci_data_bytes,
                               const size_t num_elements) override {
    auto parquet_data_ptr = reinterpret_cast<const T*>(parquet_data_bytes);
    auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
    for (size_t i = 0; i < num_elements; ++i) {
      encodeAndCopy(reinterpret_cast<const int8_t*>(&parquet_data_ptr[i]),
                    reinterpret_cast<int8_t*>(&omnisci_data_ptr[i]));
    }
  }
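  // Usage sketch (hypothetical, assuming a concrete subclass instance
  // `encoder` with T == V == int32_t):
  //   std::vector<int32_t> parquet_vals{1, 2, 3};
  //   std::vector<int32_t> omnisci_vals(parquet_vals.size());
  //   encoder.encodeAndCopyContiguous(
  //       reinterpret_cast<const int8_t*>(parquet_vals.data()),
  //       reinterpret_cast<int8_t*>(omnisci_vals.data()),
  //       parquet_vals.size());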
  void setNull(int8_t* omnisci_data_bytes) override {
    auto& omnisci_data_value = reinterpret_cast<V*>(omnisci_data_bytes)[0];
    omnisci_data_value = get_null_value<V>();
  }
  void copy(const int8_t* omnisci_data_bytes_source,
            int8_t* omnisci_data_bytes_destination) override {
    const auto& omnisci_data_value_source =
        reinterpret_cast<const V*>(omnisci_data_bytes_source)[0];
    auto& omnisci_data_value_destination =
        reinterpret_cast<V*>(omnisci_data_bytes_destination)[0];
    omnisci_data_value_destination = omnisci_data_value_source;
  }
  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
      const parquet::RowGroupMetaData* group_metadata,
      const int parquet_column_index,
      const SQLTypeInfo& column_type) override {
    auto metadata = createMetadata(column_type);
    auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);

    // update statistics
    auto parquet_column_descriptor =
        group_metadata->schema()->Column(parquet_column_index);
    auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
    if (stats->HasMinMax()) {
      // validate statistics, if applicable for the encoded type
      if (auto parquet_scalar_validator = dynamic_cast<ParquetMetadataValidator*>(this)) {
        try {
          parquet_scalar_validator->validate(stats, column_type);
        } catch (const std::exception& e) {
          std::stringstream error_message;
          error_message << e.what()
                        << " Error validating statistics of Parquet column '"
                        << group_metadata->schema()->Column(parquet_column_index)->name()
                        << "'";
          throw std::runtime_error(error_message.str());
        }
      }

      auto [stats_min, stats_max] = getEncodedStats(parquet_column_descriptor, stats);
      auto updated_chunk_stats = getUpdatedStats(stats_min, stats_max, column_type);
      metadata->fillChunkStats(updated_chunk_stats.min,
                               updated_chunk_stats.max,
                               metadata->chunkStats.has_nulls);
    }
    auto null_count = stats->null_count();
    validateNullCount(group_metadata->schema()->Column(parquet_column_index)->name(),
                      null_count,
                      column_type);
    metadata->chunkStats.has_nulls = null_count > 0;

    // update sizing
    metadata->numElements = group_metadata->num_rows();
    return metadata;
  }
 protected:
  // Subclasses for which encoding is the identity when V and T are the same
  // type can override this to enable the appendData fast path.
  virtual bool encodingIsIdentityForSameTypes() const { return false; }

  std::pair<T, T> getUnencodedStats(std::shared_ptr<parquet::Statistics> stats) const {
    T stats_min = reinterpret_cast<T*>(stats->EncodeMin().data())[0];
    T stats_max = reinterpret_cast<T*>(stats->EncodeMax().data())[0];
    return {stats_min, stats_max};
  }
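  // parquet::Statistics::EncodeMin()/EncodeMax() return the statistic as a
  // plain-encoded byte string, so for fixed-width physical types the value
  // can be recovered by reinterpreting the leading sizeof(T) bytes, as above.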
 private:
  static ChunkStats getUpdatedStats(V& stats_min,
                                    V& stats_max,
                                    const SQLTypeInfo& column_type) {
    ForeignStorageBuffer buffer;
    buffer.initEncoder(column_type.is_array() ? column_type.get_elem_type()
                                              : column_type);
    auto encoder = buffer.getEncoder();
    if (column_type.is_array()) {
      ArrayDatum min_datum(
          sizeof(V), reinterpret_cast<int8_t*>(&stats_min), false, DoNothingDeleter());
      ArrayDatum max_datum(
          sizeof(V), reinterpret_cast<int8_t*>(&stats_max), false, DoNothingDeleter());
      std::vector<ArrayDatum> min_max_datums{min_datum, max_datum};
      encoder->updateStats(&min_max_datums, 0, 1);
    } else {
      encoder->updateStats(reinterpret_cast<int8_t*>(&stats_min), 1);
      encoder->updateStats(reinterpret_cast<int8_t*>(&stats_max), 1);
    }
    auto updated_chunk_stats_metadata = std::make_shared<ChunkMetadata>();
    encoder->getMetadata(updated_chunk_stats_metadata);
    return updated_chunk_stats_metadata->chunkStats;
  }
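  // The min and max are pushed through a freshly initialized OmniSci encoder
  // rather than copied verbatim so that the returned ChunkStats match what
  // the encoder itself would report after ingesting those two values.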
  std::pair<V, V> getEncodedStats(
      const parquet::ColumnDescriptor* parquet_column_descriptor,
      std::shared_ptr<parquet::Statistics> stats) {
    V stats_min, stats_max;
    auto min_string = stats->EncodeMin();
    auto max_string = stats->EncodeMax();
    if (parquet_column_descriptor->physical_type() ==
        parquet::Type::FIXED_LEN_BYTE_ARRAY) {
      parquet::FixedLenByteArray min_byte_array, max_byte_array;
      min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
      max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
      encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
                    reinterpret_cast<int8_t*>(&stats_min));
      encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
                    reinterpret_cast<int8_t*>(&stats_max));
    } else if (parquet_column_descriptor->physical_type() == parquet::Type::BYTE_ARRAY) {
      parquet::ByteArray min_byte_array, max_byte_array;
      min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
      min_byte_array.len = min_string.length();
      max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
      max_byte_array.len = max_string.length();
      encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
                    reinterpret_cast<int8_t*>(&stats_min));
      encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
                    reinterpret_cast<int8_t*>(&stats_max));
    } else {
      encodeAndCopy(reinterpret_cast<int8_t*>(min_string.data()),
                    reinterpret_cast<int8_t*>(&stats_min));
      encodeAndCopy(reinterpret_cast<int8_t*>(max_string.data()),
                    reinterpret_cast<int8_t*>(&stats_max));
    }
    return {stats_min, stats_max};
  }
};
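// Illustrative subclass sketch (hypothetical, not part of the original
// header): an identity in-place encoder for a column whose Parquet physical
// type and OmniSci type are both int32_t, showing the minimal overrides a
// concrete encoder provides.
class Int32IdentityEncoderSketch : public TypedParquetInPlaceEncoder<int32_t, int32_t> {
 public:
  using TypedParquetInPlaceEncoder<int32_t, int32_t>::TypedParquetInPlaceEncoder;

  void encodeAndCopy(const int8_t* parquet_data_bytes,
                     int8_t* omnisci_data_bytes) override {
    reinterpret_cast<int32_t*>(omnisci_data_bytes)[0] =
        reinterpret_cast<const int32_t*>(parquet_data_bytes)[0];
  }

 protected:
  bool encodingIsIdentityForSameTypes() const override { return true; }
};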