OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetInPlaceEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <cstring>

#include <parquet/schema.h>
#include <parquet/types.h>

#include "ForeignStorageBuffer.h"
#include "ParquetEncoder.h"
#include "ParquetShared.h"
28 
29 namespace foreign_storage {
30 
32  public:
// NOTE(review): this listing is a Doxygen dump with gaps — the class header
// (original line 31) and the constructor's first line (33) are missing.
// What remains is the tail of a constructor taking the in-memory (omnisci)
// and on-disk (parquet) element byte widths; it forwards the buffer to
// ParquetScalarEncoder and stores both widths — confirm against the
// original ParquetInPlaceEncoder.h.
34  const size_t omnisci_data_type_byte_size,
35  const size_t parquet_data_type_byte_size)
36  : ParquetScalarEncoder(buffer)
37  , omnisci_data_type_byte_size_(omnisci_data_type_byte_size)
38  , parquet_data_type_byte_size_(parquet_data_type_byte_size) {}
39 
54  void appendData(const int16_t* def_levels,
55  const int16_t* rep_levels,
56  const int64_t values_read,
57  const int64_t levels_read,
58  int8_t* values) override {
// Appends one decoded batch to buffer_, expanding it in place so that
// exactly levels_read elements of omnisci_data_type_byte_size_ bytes each
// end up contiguous in `values` before the final append.
// NOTE(review): this Doxygen dump is missing original lines 59, 61, 66-67
// and 72-73, which held the byte-size guard conditions, the
// encodeAndCopy() call sites, and the decodeNullsAndEncodeData() call —
// confirm the surrounding logic against the original header.
60  for (int64_t i = 0; i < values_read; ++i) {
62  values + i * omnisci_data_type_byte_size_);
63  }
64  }
65
// Fewer values than levels means some definition levels mark nulls; the
// packed values must be spread out to levels_read slots (see
// decodeNullsAndEncodeData below).
66  if (values_read < levels_read) { // nulls exist
68  values,
69  def_levels,
70  values_read,
71  levels_read,
// (original lines 72-73 missing: the call's final argument and an else
// branch header)
74  for (int64_t i = levels_read - 1; i >= 0; --i) {
76  values + i * omnisci_data_type_byte_size_);
77  }
78  }
79
// At this point all levels_read elements (nulls included) are laid out
// contiguously in `values`.
80  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
81  }
82 
83  protected:
86 
87  private:
88  void decodeNullsAndEncodeData(int8_t* data_ptr,
89  const int16_t* def_levels,
90  const int64_t values_read,
91  const int64_t levels_read,
92  const bool do_encoding) {
// Spreads the values_read packed values across levels_read slots, walking
// back-to-front: a set definition level consumes the next packed value
// (encoding it when do_encoding is true, otherwise copying it as-is), a
// cleared level gets a null sentinel via setNull(). The reverse order lets
// the expansion run in place without overwriting unread packed values.
// NOTE(review): original line 97 (the encodeAndCopy() call and its source
// address) is missing from this Doxygen dump.
93  for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
94  if (def_levels[i]) { // not null
95  CHECK(j >= 0);
96  if (do_encoding) {
98  data_ptr + i * omnisci_data_type_byte_size_);
99  } else {
100  copy(data_ptr + (j--) * omnisci_data_type_byte_size_,
101  data_ptr + i * omnisci_data_type_byte_size_);
102  }
103  } else { // null
104  setNull(data_ptr + i * omnisci_data_type_byte_size_);
105  }
106  }
107  }
108 };
109 
110 template <typename V, typename T, typename NullType = V>
// TypedParquetInPlaceEncoder<V, T, NullType>:
//   V        - in-memory (omnisci) element type
//   T        - Parquet physical type decoded from disk
//   NullType - type whose sentinel value encodes NULL; defaults to V
// NOTE(review): the class header (original line 111) and each
// constructor's first line (113, 122) are missing from this Doxygen dump.
112  public:
114  const ColumnDescriptor* column_desciptor,
115  const parquet::ColumnDescriptor* parquet_column_descriptor)
// Delegates to ParquetInPlaceEncoder with sizeof(V) as the in-memory
// element width and the Parquet physical type's byte width on disk.
117  buffer,
118  sizeof(V),
119  parquet::GetTypeByteSize(parquet_column_descriptor->physical_type()))
120  , current_batch_offset_(0) {}
121
// Overload taking explicit byte widths; note the omnisci byte-size
// parameter is ignored in favor of sizeof(V).
123  const size_t omnisci_data_type_byte_size,
124  const size_t parquet_data_type_byte_size)
125  : ParquetInPlaceEncoder(buffer, sizeof(V), parquet_data_type_byte_size)
126  , current_batch_offset_(0) {}
127 
// Hook for subclasses to range/domain-check the j-th decoded value of
// `parquet_data`; the base implementation accepts everything.
128  void validate(const int8_t* parquet_data,
129  const int64_t j,
130  const SQLTypeInfo& column_type) const override {
131  // no-op by default
132  }
133 
134  void validateAndAppendData(const int16_t* def_levels,
135  const int16_t* rep_levels,
136  const int64_t values_read,
137  const int64_t levels_read,
138  int8_t* values,
139  const SQLTypeInfo& column_type, /* may not be used */
140  InvalidRowGroupIndices& invalid_indices) override {
141  int64_t i, j;
142  for (i = 0, j = 0; i < levels_read; ++i) {
143  if (def_levels[i]) {
144  try {
145  CHECK(j < values_read);
146  validate(values, j++, column_type);
147  } catch (const std::runtime_error& error) {
148  invalid_indices.insert(current_batch_offset_ + i);
149  }
150  }
151  }
152  current_batch_offset_ += levels_read;
153  appendData(def_levels, rep_levels, values_read, levels_read, values);
154  }
155 
// Compacts buffer_ by dropping every element whose absolute row index was
// recorded during validateAndAppendData(), then shrinks the buffer size.
// NOTE(review): the signature's first line (original 156) and line 162 are
// missing from this Doxygen dump.
157  const InvalidRowGroupIndices& invalid_indices) override {
158  if (invalid_indices.empty()) {
159  return;
160  }
161  auto omnisci_data_values = reinterpret_cast<V*>(buffer_->getMemoryPtr());
163  size_t num_elements = buffer_->size() / omnisci_data_type_byte_size_;
// NOTE(review): this relies on remove_if applying the predicate to each
// element at its original slot before any shift (true of typical
// implementations, where writes trail the read cursor) so that
// &value - start equals the element's original index — verify upstream.
164  std::remove_if(
165  omnisci_data_values, omnisci_data_values + num_elements, [&](const V& value) {
166  const V* start = omnisci_data_values;
167  auto index = std::distance(start, &value);
168  return invalid_indices.find(index) != invalid_indices.end();
169  });
// Assumes every recorded index refers to an element of this buffer; the
// CHECK guards the size arithmetic below.
170  size_t num_bytes_erased = invalid_indices.size() * omnisci_data_type_byte_size_;
171  CHECK(num_bytes_erased <= buffer_->size());
172  buffer_->setSize(buffer_->size() - num_bytes_erased);
173  }
174 
181  void appendData(const int16_t* def_levels,
182  const int16_t* rep_levels,
183  const int64_t values_read,
184  const int64_t levels_read,
185  int8_t* values) override {
// Fast path: identical in-memory/on-disk types and no nulls in the batch.
// NOTE(review): original lines 187, 189 and 195 are missing from this
// Doxygen dump — presumably the encodingIsIdentityForSameTypes() guard,
// the encodeAndCopy() call's first line, and the qualified delegation to
// ParquetInPlaceEncoder::appendData for the general case — confirm
// against the original header.
186  if (std::is_same<V, T>::value && values_read == levels_read) {
188  for (int64_t i = 0; i < levels_read; ++i) {
190  values + i * omnisci_data_type_byte_size_);
191  }
192  }
193  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
194  } else {
196  def_levels, rep_levels, values_read, levels_read, values);
197  }
198  }
199 
200  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
201  int8_t* omnisci_data_bytes,
202  const size_t num_elements) override {
203  auto parquet_data_ptr = reinterpret_cast<const T*>(parquet_data_bytes);
204  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
205  for (size_t i = 0; i < num_elements; ++i) {
206  encodeAndCopy(reinterpret_cast<const int8_t*>(&parquet_data_ptr[i]),
207  reinterpret_cast<int8_t*>(&omnisci_data_ptr[i]));
208  }
209  }
210 
211  void setNull(int8_t* omnisci_data_bytes) override {
212  auto& omnisci_data_value = reinterpret_cast<V*>(omnisci_data_bytes)[0];
213  omnisci_data_value = get_null_value<NullType>();
214  }
215 
216  void copy(const int8_t* omnisci_data_bytes_source,
217  int8_t* omnisci_data_bytes_destination) override {
218  const auto& omnisci_data_value_source =
219  reinterpret_cast<const V*>(omnisci_data_bytes_source)[0];
220  auto& omnisci_data_value_destination =
221  reinterpret_cast<V*>(omnisci_data_bytes_destination)[0];
222  omnisci_data_value_destination = omnisci_data_value_source;
223  }
224 
225  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
226  const parquet::RowGroupMetaData* group_metadata,
227  const int parquet_column_index,
228  const SQLTypeInfo& column_type) override {
229  auto metadata = ParquetEncoder::createMetadata(column_type);
230  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
231 
232  // update statistics
233  auto parquet_column_descriptor =
234  group_metadata->schema()->Column(parquet_column_index);
235  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
236  if (stats->HasMinMax()) {
237  // validate statistics if validation applicable as part of encoding
238  if (auto parquet_scalar_validator = dynamic_cast<ParquetMetadataValidator*>(this)) {
239  try {
240  parquet_scalar_validator->validate(
241  stats, column_type.is_array() ? column_type.get_elem_type() : column_type);
242  } catch (const std::exception& e) {
243  std::stringstream error_message;
244  error_message << e.what() << " Error validating statistics of Parquet column '"
245  << group_metadata->schema()->Column(parquet_column_index)->name()
246  << "'";
247  throw std::runtime_error(error_message.str());
248  }
249  }
250 
251  auto [stats_min, stats_max] = getEncodedStats(parquet_column_descriptor, stats);
252  auto updated_chunk_stats = getUpdatedStats(stats_min, stats_max, column_type);
253  metadata->fillChunkStats(updated_chunk_stats.min,
254  updated_chunk_stats.max,
255  metadata->chunkStats.has_nulls);
256  }
257  auto null_count = stats->null_count();
258  validateNullCount(group_metadata->schema()->Column(parquet_column_index)->name(),
259  null_count,
260  column_type);
261  metadata->chunkStats.has_nulls = null_count > 0;
262 
263  // update sizing
264  metadata->numBytes =
265  sizeof(NullType) // use NullType byte size since it is guaranteed to
266  // be the byte size of stored data
267  * column_metadata->num_values();
268  metadata->numElements = group_metadata->num_rows();
269 
270  return metadata;
271  }
272 
273  protected:
// Extension point: subclasses return true when V == T needs no per-element
// re-encoding; presumably gates the same-type fast path in appendData
// (whose guard line is missing from this dump) — confirm upstream.
274  virtual bool encodingIsIdentityForSameTypes() const { return false; }
275 
276  std::pair<T, T> getUnencodedStats(std::shared_ptr<parquet::Statistics> stats) const {
277  T stats_min = reinterpret_cast<T*>(stats->EncodeMin().data())[0];
278  T stats_max = reinterpret_cast<T*>(stats->EncodeMax().data())[0];
279  return {stats_min, stats_max};
280  }
281 
282  private:
283  static ChunkStats getUpdatedStats(V& stats_min,
284  V& stats_max,
285  const SQLTypeInfo& column_type) {
286  ForeignStorageBuffer buffer;
287  buffer.initEncoder(column_type);
288  auto encoder = buffer.getEncoder();
289 
290  if (column_type.is_array()) {
291  ArrayDatum min_datum(
292  sizeof(V), reinterpret_cast<int8_t*>(&stats_min), false, DoNothingDeleter());
293  ArrayDatum max_datum(
294  sizeof(V), reinterpret_cast<int8_t*>(&stats_max), false, DoNothingDeleter());
295  std::vector<ArrayDatum> min_max_datums{min_datum, max_datum};
296  encoder->updateStats(&min_max_datums, 0, 1);
297  } else {
298  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_min), 1);
299  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_max), 1);
300  }
301  auto updated_chunk_stats_metadata = std::make_shared<ChunkMetadata>();
302  encoder->getMetadata(updated_chunk_stats_metadata);
303  return updated_chunk_stats_metadata->chunkStats;
304  }
305 
306  std::pair<V, V> getEncodedStats(
307  const parquet::ColumnDescriptor* parquet_column_descriptor,
308  std::shared_ptr<parquet::Statistics> stats) {
309  V stats_min, stats_max;
310  auto min_string = stats->EncodeMin();
311  auto max_string = stats->EncodeMax();
312  if (parquet_column_descriptor->physical_type() ==
313  parquet::Type::FIXED_LEN_BYTE_ARRAY) {
314  parquet::FixedLenByteArray min_byte_array, max_byte_array;
315  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
316  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
317  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
318  reinterpret_cast<int8_t*>(&stats_min));
319  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
320  reinterpret_cast<int8_t*>(&stats_max));
321  } else if (parquet_column_descriptor->physical_type() == parquet::Type::BYTE_ARRAY) {
322  parquet::ByteArray min_byte_array, max_byte_array;
323  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
324  min_byte_array.len = min_string.length();
325  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
326  max_byte_array.len = max_string.length();
327  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
328  reinterpret_cast<int8_t*>(&stats_min));
329  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
330  reinterpret_cast<int8_t*>(&stats_max));
331  } else {
332  encodeAndCopy(reinterpret_cast<int8_t*>(min_string.data()),
333  reinterpret_cast<int8_t*>(&stats_min));
334  encodeAndCopy(reinterpret_cast<int8_t*>(max_string.data()),
335  reinterpret_cast<int8_t*>(&stats_max));
336  }
337  return {stats_min, stats_max};
338  }
339 
341 };
342 
343 } // namespace foreign_storage
ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
virtual int8_t * getMemoryPtr()=0
void validate(const int8_t *parquet_data, const int64_t j, const SQLTypeInfo &column_type) const override
void setNull(int8_t *omnisci_data_bytes) override
virtual void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes)=0
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:208
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const ColumnDescriptor *column_desciptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::set< int64_t > InvalidRowGroupIndices
static void validateNullCount(const std::string &parquet_column_name, int64_t null_count, const SQLTypeInfo &column_type)
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
virtual void setNull(int8_t *omnisci_data_bytes)=0
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
std::pair< T, T > getUnencodedStats(std::shared_ptr< parquet::Statistics > stats) const
static std::shared_ptr< ChunkMetadata > createMetadata(const SQLTypeInfo &column_type)
void decodeNullsAndEncodeData(int8_t *data_ptr, const int16_t *def_levels, const int64_t values_read, const int64_t levels_read, const bool do_encoding)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void setSize(const size_t size)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
std::pair< V, V > getEncodedStats(const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
static ChunkStats getUpdatedStats(V &stats_min, V &stats_max, const SQLTypeInfo &column_type)
#define CHECK(condition)
Definition: Logger.h:209
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:850
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
bool is_array() const
Definition: sqltypes.h:517
virtual void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination)=0