OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ParquetInPlaceEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "ParquetEncoder.h"
21 #include "ParquetShared.h"
22 
#include <cstring>

23 #include <parquet/schema.h>
24 #include <parquet/types.h>
25 
27 #include "ForeignStorageBuffer.h"
28 
29 namespace foreign_storage {
30 
32  public:
34  const size_t omnisci_data_type_byte_size,
35  const size_t parquet_data_type_byte_size)
36  : ParquetScalarEncoder(buffer)
37  , omnisci_data_type_byte_size_(omnisci_data_type_byte_size)
38  , parquet_data_type_byte_size_(parquet_data_type_byte_size) {}
39 
55  void appendData(const int16_t* def_levels,
56  const int16_t* rep_levels,
57  const int64_t values_read,
58  const int64_t levels_read,
59  const bool is_last_batch,
60  int8_t* values) override {
62  for (int64_t i = 0; i < values_read; ++i) {
64  values + i * omnisci_data_type_byte_size_);
65  }
66  }
67 
68  if (values_read < levels_read) { // nulls exist
70  values,
71  def_levels,
72  values_read,
73  levels_read,
76  for (int64_t i = levels_read - 1; i >= 0; --i) {
78  values + i * omnisci_data_type_byte_size_);
79  }
80  }
81 
82  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
83  }
84 
85  protected:
87 
88  private:
89  void decodeNullsAndEncodeData(int8_t* data_ptr,
90  const int16_t* def_levels,
91  const int64_t values_read,
92  const int64_t levels_read,
93  const bool do_encoding) {
94  for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
95  if (def_levels[i]) { // not null
96  CHECK(j >= 0);
97  if (do_encoding) {
99  data_ptr + i * omnisci_data_type_byte_size_);
100  } else {
101  copy(data_ptr + (j--) * omnisci_data_type_byte_size_,
102  data_ptr + i * omnisci_data_type_byte_size_);
103  }
104  } else { // null
105  setNull(data_ptr + i * omnisci_data_type_byte_size_);
106  }
107  }
108  }
109 
111 };
112 
113 template <typename V, typename T>
115  public:
117  const ColumnDescriptor* column_desciptor,
118  const parquet::ColumnDescriptor* parquet_column_descriptor)
120  buffer,
121  column_desciptor->columnType.get_size(),
122  parquet::GetTypeByteSize(parquet_column_descriptor->physical_type())) {}
123 
125  const size_t omnisci_data_type_byte_size,
126  const size_t parquet_data_type_byte_size)
127  : ParquetInPlaceEncoder(buffer,
128  omnisci_data_type_byte_size,
129  parquet_data_type_byte_size) {}
130 
137  void appendData(const int16_t* def_levels,
138  const int16_t* rep_levels,
139  const int64_t values_read,
140  const int64_t levels_read,
141  const bool is_last_batch,
142  int8_t* values) override {
143  if (std::is_same<V, T>::value && values_read == levels_read) {
145  for (int64_t i = 0; i < levels_read; ++i) {
147  values + i * omnisci_data_type_byte_size_);
148  }
149  }
150  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
151  } else {
153  def_levels, rep_levels, values_read, levels_read, is_last_batch, values);
154  }
155  }
156 
157  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
158  int8_t* omnisci_data_bytes,
159  const size_t num_elements) override {
160  auto parquet_data_ptr = reinterpret_cast<const T*>(parquet_data_bytes);
161  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
162  for (size_t i = 0; i < num_elements; ++i) {
163  encodeAndCopy(reinterpret_cast<const int8_t*>(&parquet_data_ptr[i]),
164  reinterpret_cast<int8_t*>(&omnisci_data_ptr[i]));
165  }
166  }
167 
168  void setNull(int8_t* omnisci_data_bytes) override {
169  auto& omnisci_data_value = reinterpret_cast<V*>(omnisci_data_bytes)[0];
170  omnisci_data_value = get_null_value<V>();
171  }
172 
173  void copy(const int8_t* omnisci_data_bytes_source,
174  int8_t* omnisci_data_bytes_destination) override {
175  const auto& omnisci_data_value_source =
176  reinterpret_cast<const V*>(omnisci_data_bytes_source)[0];
177  auto& omnisci_data_value_destination =
178  reinterpret_cast<V*>(omnisci_data_bytes_destination)[0];
179  omnisci_data_value_destination = omnisci_data_value_source;
180  }
181 
// Builds ChunkMetadata for one Parquet row group / column pair: creates a
// blank metadata object for `column_type`, folds in the row group's min/max
// statistics (re-encoded into the OmniSci value domain), then records null
// presence and byte/element counts.
182  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
183  const parquet::RowGroupMetaData* group_metadata,
184  const int parquet_column_index,
185  const SQLTypeInfo& column_type) override {
186  auto metadata = ParquetEncoder::createMetadata(column_type);
187  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
188 
189  // update statistics
190  auto parquet_column_descriptor =
191  group_metadata->schema()->Column(parquet_column_index);
192  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
// Only use min/max when the Parquet file actually recorded them.
193  if (stats->HasMinMax()) {
194  // validate statistics if validation applicable as part of encoding
// Validation is opt-in by inheritance: only encoders that also derive from
// ParquetMetadataValidator get their statistics checked here.
195  if (auto parquet_scalar_validator = dynamic_cast<ParquetMetadataValidator*>(this)) {
196  try {
197  parquet_scalar_validator->validate(
// For array columns validate against the element type, not the array type.
198  stats, column_type.is_array() ? column_type.get_elem_type() : column_type);
199  } catch (const std::exception& e) {
// Re-throw with the offending column's name prepended for diagnosability.
200  std::stringstream error_message;
201  error_message << e.what() << " Error validating statistics of Parquet column '"
202  << group_metadata->schema()->Column(parquet_column_index)->name()
203  << "'";
204  throw std::runtime_error(error_message.str());
205  }
206  }
207 
// Convert the raw Parquet stats bytes to OmniSci values, then run them
// through the column's encoder so min/max reflect the stored encoding.
208  auto [stats_min, stats_max] = getEncodedStats(parquet_column_descriptor, stats);
209  auto updated_chunk_stats = getUpdatedStats(stats_min, stats_max, column_type);
// NOTE: has_nulls passed here is the pre-existing value from createMetadata;
// the real null flag is assigned below after fillChunkStats.
210  metadata->fillChunkStats(updated_chunk_stats.min,
211  updated_chunk_stats.max,
212  metadata->chunkStats.has_nulls);
213  }
// NOTE(review): null_count() is read unconditionally, even when HasMinMax()
// is false — presumably statistics always carry a null count here; confirm.
214  auto null_count = stats->null_count();
215  validateNullCount(group_metadata->schema()->Column(parquet_column_index)->name(),
216  null_count,
217  column_type);
218  metadata->chunkStats.has_nulls = null_count > 0;
219 
220  // update sizing
// numBytes counts only non-null stored values times the OmniSci element size;
// numElements counts all rows in the row group (including nulls).
221  metadata->numBytes = omnisci_data_type_byte_size_ * column_metadata->num_values();
222  metadata->numElements = group_metadata->num_rows();
223 
224  return metadata;
225  }
226 
227  protected:
228  virtual bool encodingIsIdentityForSameTypes() const { return false; }
229 
230  std::pair<T, T> getUnencodedStats(std::shared_ptr<parquet::Statistics> stats) const {
231  T stats_min = reinterpret_cast<T*>(stats->EncodeMin().data())[0];
232  T stats_max = reinterpret_cast<T*>(stats->EncodeMax().data())[0];
233  return {stats_min, stats_max};
234  }
235 
236  private:
// Runs the min/max pair through a scratch ForeignStorageBuffer's encoder so
// the returned ChunkStats reflect the column's declared encoding (e.g. any
// narrowing the encoder applies), rather than the raw V values.
//
// @param stats_min/stats_max  encoded stats, mutable refs so their bytes can
//                             be handed to the encoder without copying
// @param column_type          determines which encoder the scratch buffer gets
237  static ChunkStats getUpdatedStats(V& stats_min,
238  V& stats_max,
239  const SQLTypeInfo& column_type) {
240  ForeignStorageBuffer buffer;
241  buffer.initEncoder(column_type);
242  auto encoder = buffer.getEncoder();
243 
244  if (column_type.is_array()) {
// Wrap each stat as a non-owning single-element array datum
// (DoNothingDeleter: the datum must not free the stack-resident value).
245  ArrayDatum min_datum(
246  sizeof(V), reinterpret_cast<int8_t*>(&stats_min), false, DoNothingDeleter());
247  ArrayDatum max_datum(
248  sizeof(V), reinterpret_cast<int8_t*>(&stats_max), false, DoNothingDeleter());
249  std::vector<ArrayDatum> min_max_datums{min_datum, max_datum};
250  encoder->updateStats(&min_max_datums, 0, 1);
251  } else {
// Scalar path: feed min and max to the encoder as two one-element updates.
252  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_min), 1);
253  encoder->updateStats(reinterpret_cast<int8_t*>(&stats_max), 1);
254  }
// Read back the stats the encoder accumulated.
255  auto updated_chunk_stats_metadata = std::make_shared<ChunkMetadata>();
256  encoder->getMetadata(updated_chunk_stats_metadata);
257  return updated_chunk_stats_metadata->chunkStats;
258  }
259 
// Converts the row-group min/max statistics from their Parquet byte
// representation into OmniSci values of type V, dispatching on the column's
// physical type because encodeAndCopy expects different input layouts:
// a parquet::FixedLenByteArray / parquet::ByteArray struct for the byte-array
// types, or the raw value bytes otherwise.
260  std::pair<V, V> getEncodedStats(
261  const parquet::ColumnDescriptor* parquet_column_descriptor,
262  std::shared_ptr<parquet::Statistics> stats) {
263  V stats_min, stats_max;
// Keep the encoded strings alive in locals for the duration of the decode.
264  auto min_string = stats->EncodeMin();
265  auto max_string = stats->EncodeMax();
266  if (parquet_column_descriptor->physical_type() ==
267  parquet::Type::FIXED_LEN_BYTE_ARRAY) {
// FLBA carries only a data pointer; its length is implied by the column
// descriptor's declared type length.
268  parquet::FixedLenByteArray min_byte_array, max_byte_array;
269  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
270  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
271  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
272  reinterpret_cast<int8_t*>(&stats_min));
273  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
274  reinterpret_cast<int8_t*>(&stats_max));
275  } else if (parquet_column_descriptor->physical_type() == parquet::Type::BYTE_ARRAY) {
// Variable-length BYTE_ARRAY needs both pointer and explicit length.
276  parquet::ByteArray min_byte_array, max_byte_array;
277  min_byte_array.ptr = reinterpret_cast<const uint8_t*>(min_string.data());
278  min_byte_array.len = min_string.length();
279  max_byte_array.ptr = reinterpret_cast<const uint8_t*>(max_string.data());
280  max_byte_array.len = max_string.length();
281  encodeAndCopy(reinterpret_cast<int8_t*>(&min_byte_array),
282  reinterpret_cast<int8_t*>(&stats_min));
283  encodeAndCopy(reinterpret_cast<int8_t*>(&max_byte_array),
284  reinterpret_cast<int8_t*>(&stats_max));
285  } else {
// Fixed-width physical types: the encoded string is the raw value bytes.
286  encodeAndCopy(reinterpret_cast<int8_t*>(min_string.data()),
287  reinterpret_cast<int8_t*>(&stats_min));
288  encodeAndCopy(reinterpret_cast<int8_t*>(max_string.data()),
289  reinterpret_cast<int8_t*>(&stats_max));
290  }
291  return {stats_min, stats_max};
292  }
293 };
294 
295 } // namespace foreign_storage
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
static ChunkStats getUpdatedStats(V &stats_min, V &stats_max, const SQLTypeInfo &column_type)
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
std::pair< V, V > getEncodedStats(const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
virtual void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes)=0
void initEncoder(const SQLTypeInfo &tmp_sql_type)
void setNull(int8_t *omnisci_data_bytes) override
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:202
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
static void validateNullCount(const std::string &parquet_column_name, int64_t null_count, const SQLTypeInfo &column_type)
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const ColumnDescriptor *column_desciptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
virtual void setNull(int8_t *omnisci_data_bytes)=0
An AbstractBuffer is a unit of data management for a data manager.
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override
specifies the content in-memory of a row in the column metadata table
static std::shared_ptr< ChunkMetadata > createMetadata(const SQLTypeInfo &column_type)
void decodeNullsAndEncodeData(int8_t *data_ptr, const int16_t *def_levels, const int64_t values_read, const int64_t levels_read, const bool do_encoding)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
#define CHECK(condition)
Definition: Logger.h:203
std::pair< T, T > getUnencodedStats(std::shared_ptr< parquet::Statistics > stats) const
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:713
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
bool is_array() const
Definition: sqltypes.h:497
virtual void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination)=0