OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetInPlaceEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "ParquetEncoder.h"
21 #include "ParquetShared.h"
23 
24 #include <parquet/schema.h>
25 #include <parquet/types.h>
26 
28 #include "ForeignStorageBuffer.h"
29 
30 namespace foreign_storage {
31 
33  public:
35  const size_t omnisci_data_type_byte_size,
36  const size_t parquet_data_type_byte_size)
37  : ParquetScalarEncoder(buffer)
38  , omnisci_data_type_byte_size_(omnisci_data_type_byte_size)
39  , parquet_data_type_byte_size_(parquet_data_type_byte_size) {}
40 
41  virtual void reserve(const size_t num_elements) = 0;
42 
57  void appendData(const int16_t* def_levels,
58  const int16_t* rep_levels,
59  const int64_t values_read,
60  const int64_t levels_read,
61  int8_t* values) override {
63  for (int64_t i = 0; i < values_read; ++i) {
65  values + i * omnisci_data_type_byte_size_);
66  }
67  }
68 
69  if (values_read < levels_read) { // nulls exist
71  values,
72  def_levels,
73  values_read,
74  levels_read,
77  for (int64_t i = levels_read - 1; i >= 0; --i) {
79  values + i * omnisci_data_type_byte_size_);
80  }
81  }
82 
83  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
84  }
85 
86  protected:
89 
90  private:
91  void decodeNullsAndEncodeData(int8_t* data_ptr,
92  const int16_t* def_levels,
93  const int64_t values_read,
94  const int64_t levels_read,
95  const bool do_encoding) {
96  for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
97  if (def_levels[i]) { // not null
98  CHECK(j >= 0);
99  if (do_encoding) {
100  encodeAndCopy(data_ptr + (j--) * parquet_data_type_byte_size_,
101  data_ptr + i * omnisci_data_type_byte_size_);
102  } else {
103  copy(data_ptr + (j--) * omnisci_data_type_byte_size_,
104  data_ptr + i * omnisci_data_type_byte_size_);
105  }
106  } else { // null
107  setNull(data_ptr + i * omnisci_data_type_byte_size_);
108  }
109  }
110  }
111 };
112 
113 template <typename V, typename T, typename NullType = V>
115  public:
117  const ColumnDescriptor* column_desciptor,
118  const parquet::ColumnDescriptor* parquet_column_descriptor)
120  buffer,
121  sizeof(V),
122  parquet::GetTypeByteSize(parquet_column_descriptor->physical_type()))
123  , current_batch_offset_(0) {
124  if (auto detect_buffer = dynamic_cast<TypedParquetDetectBuffer*>(buffer_)) {
126  }
127  }
128 
130  const size_t omnisci_data_type_byte_size,
131  const size_t parquet_data_type_byte_size)
132  : ParquetInPlaceEncoder(buffer, sizeof(V), parquet_data_type_byte_size)
133  , current_batch_offset_(0) {
134  if (auto detect_buffer = dynamic_cast<TypedParquetDetectBuffer*>(buffer_)) {
136  }
137  }
138 
139  void validate(const int8_t* parquet_data,
140  const int64_t j,
141  const SQLTypeInfo& column_type) const override {
142  // no-op by default
143  }
144 
145  std::string integralTypeToString(const V& element) const {
146  Datum d;
147  d.bigintval = element;
149  }
150 
151  bool isIntegralType(const SQLTypeInfo& type) const {
152  return type.is_timestamp() || type.is_time() || type.is_date() || type.is_boolean() ||
153  type.is_decimal() || type.is_integer();
154  }
155 
156  std::string elementToString(const V& element) const {
157  // handle specialized cases that require specific formating when converting to string
158  auto null_value = get_null_value<NullType>();
159  if (element == null_value) {
160  return "NULL";
161  }
163  return integralTypeToString(element);
164  }
165  return std::to_string(element);
166  }
167 
168  std::string encodedDataToString(const int8_t* bytes) const override {
169  const auto& element = reinterpret_cast<const V*>(bytes)[0];
170  return elementToString(element);
171  }
172 
174  auto detect_buffer = dynamic_cast<TypedParquetDetectBuffer*>(buffer_);
175  CHECK(detect_buffer);
176  std::function<std::string(const V&)> element_to_string = [this](const V& element) {
177  return this->elementToString(element);
178  };
179  detect_buffer->setConverterType<V>(element_to_string);
180  }
181 
182  void validateUsingEncodersColumnType(const int8_t* parquet_data,
183  const int64_t j) const override {
184  validate(parquet_data, j, column_type_);
185  }
186 
187  void reserve(const size_t num_append_elements) override {
188  buffer_->reserve(buffer_->size() + (num_append_elements * sizeof(V)));
189  }
190 
191  void appendDataTrackErrors(const int16_t* def_levels,
192  const int16_t* rep_levels,
193  const int64_t values_read,
194  const int64_t levels_read,
195  int8_t* values) override {
197  int64_t i, j;
198  for (i = 0, j = 0; i < levels_read; ++i) {
199  if (def_levels[i]) {
200  try {
201  CHECK(j < values_read);
202  validateUsingEncodersColumnType(values, j++);
203  } catch (const std::runtime_error& error) {
205  }
206  } else if (column_type_.get_notnull()) { // item is null for NOT NULL column
208  }
209  }
210  current_chunk_offset_ += levels_read;
211  appendData(def_levels, rep_levels, values_read, levels_read, values);
212  }
213 
214  // TODO: this member largely mirrors `appendDataTrackErrors` but is only used
215  // by the parquet-secific import FSI cut-over, and will be removed in the
216  // future
217  void validateAndAppendData(const int16_t* def_levels,
218  const int16_t* rep_levels,
219  const int64_t values_read,
220  const int64_t levels_read,
221  int8_t* values,
222  const SQLTypeInfo& column_type, /* may not be used */
223  InvalidRowGroupIndices& invalid_indices) override {
224  int64_t i, j;
225  for (i = 0, j = 0; i < levels_read; ++i) {
226  if (def_levels[i]) {
227  try {
228  CHECK(j < values_read);
229  validate(values, j++, column_type);
230  } catch (const std::runtime_error& error) {
231  invalid_indices.insert(current_batch_offset_ + i);
232  }
233  }
234  }
235  current_batch_offset_ += levels_read;
236  appendData(def_levels, rep_levels, values_read, levels_read, values);
237  }
238 
240  const InvalidRowGroupIndices& invalid_indices) override {
241  if (invalid_indices.empty()) {
242  return;
243  }
244  auto omnisci_data_values = reinterpret_cast<V*>(buffer_->getMemoryPtr());
246  size_t num_elements = buffer_->size() / omnisci_data_type_byte_size_;
247  std::remove_if(
248  omnisci_data_values, omnisci_data_values + num_elements, [&](const V& value) {
249  const V* start = omnisci_data_values;
250  auto index = std::distance(start, &value);
251  return invalid_indices.find(index) != invalid_indices.end();
252  });
253  size_t num_bytes_erased = invalid_indices.size() * omnisci_data_type_byte_size_;
254  CHECK(num_bytes_erased <= buffer_->size());
255  buffer_->setSize(buffer_->size() - num_bytes_erased);
256  }
257 
264  void appendData(const int16_t* def_levels,
265  const int16_t* rep_levels,
266  const int64_t values_read,
267  const int64_t levels_read,
268  int8_t* values) override {
269  if (std::is_same<V, T>::value && values_read == levels_read) {
271  for (int64_t i = 0; i < levels_read; ++i) {
273  values + i * omnisci_data_type_byte_size_);
274  }
275  }
276  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
277  } else {
279  def_levels, rep_levels, values_read, levels_read, values);
280  }
281  }
282 
283  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
284  int8_t* omnisci_data_bytes,
285  const size_t num_elements) override {
286  auto parquet_data_ptr = reinterpret_cast<const T*>(parquet_data_bytes);
287  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
288  for (size_t i = 0; i < num_elements; ++i) {
289  encodeAndCopy(reinterpret_cast<const int8_t*>(&parquet_data_ptr[i]),
290  reinterpret_cast<int8_t*>(&omnisci_data_ptr[i]));
291  }
292  }
293 
294  void setNull(int8_t* omnisci_data_bytes) override {
295  auto& omnisci_data_value = reinterpret_cast<V*>(omnisci_data_bytes)[0];
296  omnisci_data_value = get_null_value<NullType>();
297  }
298 
299  void copy(const int8_t* omnisci_data_bytes_source,
300  int8_t* omnisci_data_bytes_destination) override {
301  const auto& omnisci_data_value_source =
302  reinterpret_cast<const V*>(omnisci_data_bytes_source)[0];
303  auto& omnisci_data_value_destination =
304  reinterpret_cast<V*>(omnisci_data_bytes_destination)[0];
305  omnisci_data_value_destination = omnisci_data_value_source;
306  }
307 
308  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
309  const parquet::RowGroupMetaData* group_metadata,
310  const int parquet_column_index,
311  const SQLTypeInfo& column_type) override {
312  auto metadata = ParquetEncoder::createMetadata(column_type);
313  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
314 
315  // update statistics
316  auto parquet_column_descriptor =
317  group_metadata->schema()->Column(parquet_column_index);
318 
319  if (ParquetEncoder::validate_metadata_stats_ && group_metadata->num_rows() > 0) {
320  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
321  if (stats->HasMinMax()) {
322  // validate statistics if validation applicable as part of encoding
323  if (auto parquet_scalar_validator =
324  dynamic_cast<ParquetMetadataValidator*>(this)) {
325  try {
326  parquet_scalar_validator->validate(
327  stats,
328  column_type.is_array() ? column_type.get_elem_type() : column_type);
329  } catch (const std::exception& e) {
330  std::stringstream error_message;
331  error_message
332  << e.what() << " Error validating statistics of Parquet column '"
333  << group_metadata->schema()->Column(parquet_column_index)->name() << "'";
334  throw std::runtime_error(error_message.str());
335  }
336  }
337 
338  auto [stats_min, stats_max] = getEncodedStats(parquet_column_descriptor, stats);
339  auto updated_chunk_stats = getUpdatedStats(stats_min, stats_max, column_type);
340  metadata->fillChunkStats(updated_chunk_stats.min,
341  updated_chunk_stats.max,
342  metadata->chunkStats.has_nulls);
343  }
344  auto null_count = stats->null_count();
345  validateNullCount(group_metadata->schema()->Column(parquet_column_index)->name(),
346  null_count,
347  column_type);
348  metadata->chunkStats.has_nulls = null_count > 0;
349  }
350 
351  // update sizing
352  metadata->numBytes =
353  sizeof(NullType) // use NullType byte size since it is guaranteed to
354  // be the byte size of stored data
355  * column_metadata->num_values();
356  metadata->numElements = group_metadata->num_rows();
357 
358  return metadata;
359  }
360 
361  protected:
362  virtual bool encodingIsIdentityForSameTypes() const { return false; }
363 
364  std::pair<T, T> getUnencodedStats(std::shared_ptr<parquet::Statistics> stats) const {
365  T stats_min = reinterpret_cast<T*>(stats->EncodeMin().data())[0];
366  T stats_max = reinterpret_cast<T*>(stats->EncodeMax().data())[0];
367  return {stats_min, stats_max};
368  }
369 
370  private:
371  static ChunkStats getUpdatedStats(V& stats_min,
372  V& stats_max,
373  const SQLTypeInfo& column_type) {
374  ForeignStorageBuffer buffer;
375  buffer.initEncoder(column_type);
376  auto encoder = buffer.getEncoder();
377 
378  if (column_type.is_array()) {
379  ArrayDatum min_datum(
380  sizeof(V), reinterpret_cast<int8_t*>(&stats_min), false, DoNothingDeleter());
381  ArrayDatum max_datum(
382  sizeof(V), reinterpret_cast<int8_t*>(&stats_max), false, DoNothingDeleter());
383  std::vector<ArrayDatum> min_max_datums{min_datum, max_datum};
384  encoder->updateStats(&min_max_datums, 0, 1);
385  } else {
386  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_min), 1);
387  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_max), 1);
388  }
389  auto updated_chunk_stats_metadata = std::make_shared<ChunkMetadata>();
390  encoder->getMetadata(updated_chunk_stats_metadata);
391  return updated_chunk_stats_metadata->chunkStats;
392  }
393 
394  std::pair<V, V> getEncodedStats(
395  const parquet::ColumnDescriptor* parquet_column_descriptor,
396  std::shared_ptr<parquet::Statistics> stats) {
397  V stats_min, stats_max;
398  auto min_string = stats->EncodeMin();
399  auto max_string = stats->EncodeMax();
400  if constexpr (std::is_same<T, parquet::FixedLenByteArray>::value) {
401  CHECK_EQ(parquet_column_descriptor->physical_type(),
402  parquet::Type::FIXED_LEN_BYTE_ARRAY);
403  parquet::FixedLenByteArray min_byte_array, max_byte_array;
404  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
405  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
406  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
407  reinterpret_cast<int8_t*>(&stats_min));
408  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
409  reinterpret_cast<int8_t*>(&stats_max));
410  } else if constexpr (std::is_same<T, parquet::ByteArray>::value) {
411  CHECK_EQ(parquet_column_descriptor->physical_type(), parquet::Type::BYTE_ARRAY);
412  parquet::ByteArray min_byte_array, max_byte_array;
413  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
414  min_byte_array.len = min_string.length();
415  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
416  max_byte_array.len = max_string.length();
417  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
418  reinterpret_cast<int8_t*>(&stats_min));
419  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
420  reinterpret_cast<int8_t*>(&stats_max));
421  } else {
422  encodeAndCopy(reinterpret_cast<int8_t*>(min_string.data()),
423  reinterpret_cast<int8_t*>(&stats_min));
424  encodeAndCopy(reinterpret_cast<int8_t*>(max_string.data()),
425  reinterpret_cast<int8_t*>(&stats_max));
426  }
427  return {stats_min, stats_max};
428  }
429 
431 };
432 
433 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:301
ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
std::string DatumToString(Datum d, const SQLTypeInfo &ti)
Definition: Datum.cpp:460
bool is_timestamp() const
Definition: sqltypes.h:1044
RejectedRowIndices invalid_indices_
bool isIntegralType(const SQLTypeInfo &type) const
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
void reserve(const size_t num_append_elements) override
std::string elementToString(const V &element) const
virtual int8_t * getMemoryPtr()=0
void validate(const int8_t *parquet_data, const int64_t j, const SQLTypeInfo &column_type) const override
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
void setNull(int8_t *omnisci_data_bytes) override
virtual void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes)=0
dictionary stats
Definition: report.py:116
void initEncoder(const SQLTypeInfo &tmp_sql_type)
bool is_time() const
Definition: sqltypes.h:577
std::string to_string(char const *&&v)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:229
std::string encodedDataToString(const int8_t *bytes) const override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const ColumnDescriptor *column_desciptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::set< int64_t > InvalidRowGroupIndices
static void validateNullCount(const std::string &parquet_column_name, int64_t null_count, const SQLTypeInfo &column_type)
bool is_integer() const
Definition: sqltypes.h:565
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
int64_t bigintval
Definition: Datum.h:74
virtual void setNull(int8_t *omnisci_data_bytes)=0
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
bool is_boolean() const
Definition: sqltypes.h:580
std::pair< T, T > getUnencodedStats(std::shared_ptr< parquet::Statistics > stats) const
static std::shared_ptr< ChunkMetadata > createMetadata(const SQLTypeInfo &column_type)
virtual void reserve(const size_t num_elements)=0
void decodeNullsAndEncodeData(int8_t *data_ptr, const int16_t *def_levels, const int64_t values_read, const int64_t levels_read, const bool do_encoding)
void validateUsingEncodersColumnType(const int8_t *parquet_data, const int64_t j) const override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void setSize(const size_t size)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
std::pair< V, V > getEncodedStats(const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
static ChunkStats getUpdatedStats(V &stats_min, V &stats_max, const SQLTypeInfo &column_type)
#define CHECK(condition)
Definition: Logger.h:291
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
Definition: Datum.h:69
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:975
bool is_decimal() const
Definition: sqltypes.h:568
virtual void reserve(size_t num_bytes)=0
std::string integralTypeToString(const V &element) const
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
bool is_date() const
Definition: sqltypes.h:1026
bool is_array() const
Definition: sqltypes.h:583
virtual void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination)=0