OmniSciDB  16c4e035a1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetInPlaceEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include "ParquetEncoder.h"
#include "ParquetShared.h"

#include <cstring>

#include <parquet/schema.h>
#include <parquet/types.h>

#include "ForeignStorageBuffer.h"
28 
29 namespace foreign_storage {
30 
32  public:
// NOTE(review): extraction artifact -- the class declaration line and the
// constructor's name line are missing from this listing. Per the doxygen
// index this is:
//   ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer* buffer,
//                         const size_t omnisci_data_type_byte_size,
//                         const size_t parquet_data_type_byte_size)
// Consult the full header before editing.
34  const size_t omnisci_data_type_byte_size,
35  const size_t parquet_data_type_byte_size)
// Forward the destination buffer to the ParquetScalarEncoder base and record
// the byte widths of the in-memory (omnisci) and on-disk (parquet)
// representations, used for all slot-offset arithmetic below.
36  : ParquetScalarEncoder(buffer)
37  , omnisci_data_type_byte_size_(omnisci_data_type_byte_size)
38  , parquet_data_type_byte_size_(parquet_data_type_byte_size) {}
39 
// Reserve capacity for `num_elements` elements; implemented by the typed
// subclass, which knows sizeof(V).
40  virtual void reserve(const size_t num_elements) = 0;
41 
// NOTE(review): this listing is missing several interior lines (original
// lines 61, 63, 69, 74-75, 77) -- presumably the byte-size comparison that
// selects the branch, the encodeAndCopy() calls, and the
// decodeNullsAndEncodeData() invocation. Do not edit from this copy without
// consulting the full header.
//
// Append a batch of parquet-decoded values to buffer_, encoding each value
// in place inside `values` first, then appending all `levels_read` slots
// (values plus null sentinels) in one call.
56  void appendData(const int16_t* def_levels,
57  const int16_t* rep_levels,
58  const int64_t values_read,
59  const int64_t levels_read,
60  int8_t* values) override {
// Dense case: encode each of the `values_read` values at its own slot.
62  for (int64_t i = 0; i < values_read; ++i) {
64  values + i * omnisci_data_type_byte_size_);
65  }
66  }
67 
// Nulls present: non-null values are packed at the front of `values` and
// must be scattered to their definition-level positions, with null
// sentinels written into the remaining slots (see decodeNullsAndEncodeData).
68  if (values_read < levels_read) { // nulls exist
70  values,
71  def_levels,
72  values_read,
73  levels_read,
// Back-to-front walk so each packed value is read before its destination
// slot could be overwritten.
76  for (int64_t i = levels_read - 1; i >= 0; --i) {
78  values + i * omnisci_data_type_byte_size_);
79  }
80  }
81 
// All `levels_read` slots are now valid in-memory values; append them.
82  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
83  }
84 
85  protected:
// NOTE(review): the protected member declarations (original lines 86-87;
// omnisci_data_type_byte_size_ and parquet_data_type_byte_size_, per the
// constructor above) are missing from this extracted listing.
88 
89  private:
// Scatter the front-packed non-null values in `data_ptr` out to their
// definition-level slots and write null sentinels into the gaps.
// Walks back to front: since j counts only non-null entries, j <= i always
// holds, so the packed source slot is read before the destination slot could
// overwrite it. When `do_encoding` is true the value is also encoded while
// being moved (the encodeAndCopy call line, original line 99, is truncated
// in this listing); otherwise it is copied verbatim via copy().
90  void decodeNullsAndEncodeData(int8_t* data_ptr,
91  const int16_t* def_levels,
92  const int64_t values_read,
93  const int64_t levels_read,
94  const bool do_encoding) {
95  for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
96  if (def_levels[i]) { // not null
97  CHECK(j >= 0);
98  if (do_encoding) {
100  data_ptr + i * omnisci_data_type_byte_size_);
101  } else {
102  copy(data_ptr + (j--) * omnisci_data_type_byte_size_,
103  data_ptr + i * omnisci_data_type_byte_size_);
104  }
105  } else { // null
106  setNull(data_ptr + i * omnisci_data_type_byte_size_);
107  }
108  }
109  }
110 };
111 
// V: in-memory (omnisci) value type; T: parquet physical type;
// NullType: type whose sentinel encodes NULL (defaults to V).
112 template <typename V, typename T, typename NullType = V>
// NOTE(review): the class declaration line is missing from this listing;
// per the doxygen index this is
//   class TypedParquetInPlaceEncoder : public ParquetInPlaceEncoder
// and both constructor name lines below are likewise truncated.
114  public:
// Construct from a parquet column descriptor: the parquet byte width is
// derived from the column's physical type; the omnisci width is sizeof(V).
116  const ColumnDescriptor* column_desciptor,
117  const parquet::ColumnDescriptor* parquet_column_descriptor)
119  buffer,
120  sizeof(V),
121  parquet::GetTypeByteSize(parquet_column_descriptor->physical_type()))
122  , current_batch_offset_(0) {}
123 
// Construct with an explicit parquet byte width (the omnisci width argument
// is ignored in favor of sizeof(V)).
125  const size_t omnisci_data_type_byte_size,
126  const size_t parquet_data_type_byte_size)
127  : ParquetInPlaceEncoder(buffer, sizeof(V), parquet_data_type_byte_size)
128  , current_batch_offset_(0) {}
129 
129 
// Validate the j-th parquet value against the omnisci column type.
// The base implementation accepts everything; subclasses with range or
// precision constraints override this and signal failure by throwing
// std::runtime_error (which validateAndAppendData catches to record the
// offending row index).
130  void validate(const int8_t* parquet_data,
131  const int64_t j,
132  const SQLTypeInfo& column_type) const override {
133  // no-op by default
134  }
135 
136  void reserve(const size_t num_elements) override {
137  buffer_->reserve(num_elements * sizeof(V));
138  }
139 
140  void validateAndAppendData(const int16_t* def_levels,
141  const int16_t* rep_levels,
142  const int64_t values_read,
143  const int64_t levels_read,
144  int8_t* values,
145  const SQLTypeInfo& column_type, /* may not be used */
146  InvalidRowGroupIndices& invalid_indices) override {
147  int64_t i, j;
148  for (i = 0, j = 0; i < levels_read; ++i) {
149  if (def_levels[i]) {
150  try {
151  CHECK(j < values_read);
152  validate(values, j++, column_type);
153  } catch (const std::runtime_error& error) {
154  invalid_indices.insert(current_batch_offset_ + i);
155  }
156  }
157  }
158  current_batch_offset_ += levels_read;
159  appendData(def_levels, rep_levels, values_read, levels_read, values);
160  }
161 
// NOTE(review): extraction artifact -- the signature line (original line
// 162, `void eraseInvalidIndicesInBuffer(`) and original line 168 are
// missing from this listing.
//
// Compact the buffer in place by removing the elements whose indices appear
// in `invalid_indices`, then shrink the buffer's logical size accordingly.
163  const InvalidRowGroupIndices& invalid_indices) override {
164  if (invalid_indices.empty()) {
165  return;
166  }
167  auto omnisci_data_values = reinterpret_cast<V*>(buffer_->getMemoryPtr());
169  size_t num_elements = buffer_->size() / omnisci_data_type_byte_size_;
// NOTE(review): the predicate derives the element's index from its address
// (&value - start). std::remove_if only guarantees the predicate is applied
// to each element in order before any shifting of that element, which this
// relies on; it is fragile under implementations that copy values before
// testing -- confirm against the project's supported toolchains.
170  std::remove_if(
171  omnisci_data_values, omnisci_data_values + num_elements, [&](const V& value) {
172  const V* start = omnisci_data_values;
173  auto index = std::distance(start, &value);
174  return invalid_indices.find(index) != invalid_indices.end();
175  });
// The count removed is exactly invalid_indices.size(); truncate the buffer
// by that many elements (remove_if's returned iterator is intentionally
// unused).
176  size_t num_bytes_erased = invalid_indices.size() * omnisci_data_type_byte_size_;
177  CHECK(num_bytes_erased <= buffer_->size());
178  buffer_->setSize(buffer_->size() - num_bytes_erased);
179  }
180 
// NOTE(review): this listing is missing original lines 193, 195, and 201 --
// by the surrounding structure these presumably contain a runtime check
// (likely encodingIsIdentityForSameTypes(), declared below) guarding the
// fast path, its encodeAndCopy call, and the fallback call to
// ParquetInPlaceEncoder::appendData. Consult the full header before editing.
//
// Typed fast path for appending: when V and T are the same type and the
// batch contains no nulls (values_read == levels_read), values can be
// appended with at most a per-element encode, skipping the generic
// null-scattering path in the base class.
187  void appendData(const int16_t* def_levels,
188  const int16_t* rep_levels,
189  const int64_t values_read,
190  const int64_t levels_read,
191  int8_t* values) override {
192  if (std::is_same<V, T>::value && values_read == levels_read) {
194  for (int64_t i = 0; i < levels_read; ++i) {
196  values + i * omnisci_data_type_byte_size_);
197  }
198  }
199  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
200  } else {
// Fall back to the generic in-place encode/null-scatter implementation.
202  def_levels, rep_levels, values_read, levels_read, values);
203  }
204  }
205 
206  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
207  int8_t* omnisci_data_bytes,
208  const size_t num_elements) override {
209  auto parquet_data_ptr = reinterpret_cast<const T*>(parquet_data_bytes);
210  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
211  for (size_t i = 0; i < num_elements; ++i) {
212  encodeAndCopy(reinterpret_cast<const int8_t*>(&parquet_data_ptr[i]),
213  reinterpret_cast<int8_t*>(&omnisci_data_ptr[i]));
214  }
215  }
216 
217  void setNull(int8_t* omnisci_data_bytes) override {
218  auto& omnisci_data_value = reinterpret_cast<V*>(omnisci_data_bytes)[0];
219  omnisci_data_value = get_null_value<NullType>();
220  }
221 
222  void copy(const int8_t* omnisci_data_bytes_source,
223  int8_t* omnisci_data_bytes_destination) override {
224  const auto& omnisci_data_value_source =
225  reinterpret_cast<const V*>(omnisci_data_bytes_source)[0];
226  auto& omnisci_data_value_destination =
227  reinterpret_cast<V*>(omnisci_data_bytes_destination)[0];
228  omnisci_data_value_destination = omnisci_data_value_source;
229  }
230 
// Build ChunkMetadata for one parquet row group: encoded min/max statistics
// (when the parquet file carries them), null presence, and byte/element
// counts.
231  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
232  const parquet::RowGroupMetaData* group_metadata,
233  const int parquet_column_index,
234  const SQLTypeInfo& column_type) override {
235  auto metadata = ParquetEncoder::createMetadata(column_type);
236  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
237 
238  // update statistics
239  auto parquet_column_descriptor =
240  group_metadata->schema()->Column(parquet_column_index);
241  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
242  if (stats->HasMinMax()) {
243  // validate statistics if validation applicable as part of encoding
// Only encoders that also implement ParquetMetadataValidator can validate;
// detect that dynamically since `this` may or may not be one.
244  if (auto parquet_scalar_validator = dynamic_cast<ParquetMetadataValidator*>(this)) {
245  try {
// For array columns, statistics describe elements, so validate against the
// element type rather than the array type.
246  parquet_scalar_validator->validate(
247  stats, column_type.is_array() ? column_type.get_elem_type() : column_type);
248  } catch (const std::exception& e) {
// Re-throw with the column name attached for a diagnosable error message.
249  std::stringstream error_message;
250  error_message << e.what() << " Error validating statistics of Parquet column '"
251  << group_metadata->schema()->Column(parquet_column_index)->name()
252  << "'";
253  throw std::runtime_error(error_message.str());
254  }
255  }
256 
// Translate parquet-side min/max into encoded omnisci values, then run them
// through the column's own encoder to normalize the chunk stats.
257  auto [stats_min, stats_max] = getEncodedStats(parquet_column_descriptor, stats);
258  auto updated_chunk_stats = getUpdatedStats(stats_min, stats_max, column_type);
// fillChunkStats is passed the current has_nulls (still the createMetadata
// default here); has_nulls is then overwritten from the parquet null count
// below -- the ordering appears intentional, preserving min/max while
// deferring null detection.
259  metadata->fillChunkStats(updated_chunk_stats.min,
260  updated_chunk_stats.max,
261  metadata->chunkStats.has_nulls);
262  }
263  auto null_count = stats->null_count();
264  validateNullCount(group_metadata->schema()->Column(parquet_column_index)->name(),
265  null_count,
266  column_type);
267  metadata->chunkStats.has_nulls = null_count > 0;
268 
269  // update sizing
270  metadata->numBytes =
271  sizeof(NullType) // use NullType byte size since it is guaranteed to
272  // be the byte size of stored data
273  * column_metadata->num_values();
274  metadata->numElements = group_metadata->num_rows();
275 
276  return metadata;
277  }
278 
279  protected:
// Hook for subclasses: return true when the V -> T transform is the identity
// for identical types, which permits appending parquet bytes without
// per-element encoding. NOTE(review): the call site is truncated from this
// listing (presumably the missing guard line in appendData above) -- verify
// against the full header.
280  virtual bool encodingIsIdentityForSameTypes() const { return false; }
281 
282  std::pair<T, T> getUnencodedStats(std::shared_ptr<parquet::Statistics> stats) const {
283  T stats_min = reinterpret_cast<T*>(stats->EncodeMin().data())[0];
284  T stats_max = reinterpret_cast<T*>(stats->EncodeMax().data())[0];
285  return {stats_min, stats_max};
286  }
287 
288  private:
289  static ChunkStats getUpdatedStats(V& stats_min,
290  V& stats_max,
291  const SQLTypeInfo& column_type) {
292  ForeignStorageBuffer buffer;
293  buffer.initEncoder(column_type);
294  auto encoder = buffer.getEncoder();
295 
296  if (column_type.is_array()) {
297  ArrayDatum min_datum(
298  sizeof(V), reinterpret_cast<int8_t*>(&stats_min), false, DoNothingDeleter());
299  ArrayDatum max_datum(
300  sizeof(V), reinterpret_cast<int8_t*>(&stats_max), false, DoNothingDeleter());
301  std::vector<ArrayDatum> min_max_datums{min_datum, max_datum};
302  encoder->updateStats(&min_max_datums, 0, 1);
303  } else {
304  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_min), 1);
305  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_max), 1);
306  }
307  auto updated_chunk_stats_metadata = std::make_shared<ChunkMetadata>();
308  encoder->getMetadata(updated_chunk_stats_metadata);
309  return updated_chunk_stats_metadata->chunkStats;
310  }
311 
312  std::pair<V, V> getEncodedStats(
313  const parquet::ColumnDescriptor* parquet_column_descriptor,
314  std::shared_ptr<parquet::Statistics> stats) {
315  V stats_min, stats_max;
316  auto min_string = stats->EncodeMin();
317  auto max_string = stats->EncodeMax();
318  if constexpr (std::is_same<T, parquet::FixedLenByteArray>::value) {
319  CHECK_EQ(parquet_column_descriptor->physical_type(),
320  parquet::Type::FIXED_LEN_BYTE_ARRAY);
321  parquet::FixedLenByteArray min_byte_array, max_byte_array;
322  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
323  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
324  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
325  reinterpret_cast<int8_t*>(&stats_min));
326  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
327  reinterpret_cast<int8_t*>(&stats_max));
328  } else if constexpr (std::is_same<T, parquet::ByteArray>::value) {
329  CHECK_EQ(parquet_column_descriptor->physical_type(), parquet::Type::BYTE_ARRAY);
330  parquet::ByteArray min_byte_array, max_byte_array;
331  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
332  min_byte_array.len = min_string.length();
333  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
334  max_byte_array.len = max_string.length();
335  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
336  reinterpret_cast<int8_t*>(&stats_min));
337  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
338  reinterpret_cast<int8_t*>(&stats_max));
339  } else {
340  encodeAndCopy(reinterpret_cast<int8_t*>(min_string.data()),
341  reinterpret_cast<int8_t*>(&stats_min));
342  encodeAndCopy(reinterpret_cast<int8_t*>(max_string.data()),
343  reinterpret_cast<int8_t*>(&stats_max));
344  }
345  return {stats_min, stats_max};
346  }
347 
349 };
350 
351 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:219
ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
virtual int8_t * getMemoryPtr()=0
void validate(const int8_t *parquet_data, const int64_t j, const SQLTypeInfo &column_type) const override
void setNull(int8_t *omnisci_data_bytes) override
virtual void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes)=0
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:208
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const ColumnDescriptor *column_desciptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::set< int64_t > InvalidRowGroupIndices
static void validateNullCount(const std::string &parquet_column_name, int64_t null_count, const SQLTypeInfo &column_type)
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
virtual void setNull(int8_t *omnisci_data_bytes)=0
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
void reserve(const size_t num_elements) override
std::pair< T, T > getUnencodedStats(std::shared_ptr< parquet::Statistics > stats) const
static std::shared_ptr< ChunkMetadata > createMetadata(const SQLTypeInfo &column_type)
virtual void reserve(const size_t num_elements)=0
void decodeNullsAndEncodeData(int8_t *data_ptr, const int16_t *def_levels, const int64_t values_read, const int64_t levels_read, const bool do_encoding)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void setSize(const size_t size)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
std::pair< V, V > getEncodedStats(const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
static ChunkStats getUpdatedStats(V &stats_min, V &stats_max, const SQLTypeInfo &column_type)
#define CHECK(condition)
Definition: Logger.h:211
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:861
virtual void reserve(size_t num_bytes)=0
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
bool is_array() const
Definition: sqltypes.h:527
virtual void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination)=0