OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
20 #include "DataMgr/AbstractBuffer.h"
21 #include "ForeignStorageBuffer.h"
22 #include "ParquetShared.h"
23 
24 #include <parquet/metadata.h>
25 
26 namespace foreign_storage {
27 
28 using RejectedRowIndices = std::set<int64_t>;
29 
31  public:
33  : buffer_(buffer)
37  virtual ~ParquetEncoder() = default;
38 
// Pure virtual: every concrete encoder must provide an implementation.
// Takes the same arguments as appendData().
// NOTE(review): judging by the name, this variant records rows that fail to
// encode instead of aborting - confirm against a concrete encoder.
// NOTE(review): def_levels / rep_levels are presumably Parquet definition and
// repetition levels, and values_read / levels_read the number of entries in
// `values` and in the level arrays - confirm.
39  virtual void appendDataTrackErrors(const int16_t* def_levels,
40  const int16_t* rep_levels,
41  const int64_t values_read,
42  const int64_t levels_read,
43  int8_t* values) = 0;
44 
// Pure virtual: every concrete encoder must provide an implementation.
// NOTE(review): presumably encodes the Parquet values in `values` and appends
// them to the encoder's destination buffer - confirm against a concrete encoder.
// NOTE(review): def_levels / rep_levels are presumably Parquet definition and
// repetition levels, and values_read / levels_read the number of entries in
// `values` and in the level arrays - confirm.
45  virtual void appendData(const int16_t* def_levels,
46  const int16_t* rep_levels,
47  const int64_t values_read,
48  const int64_t levels_read,
49  int8_t* values) = 0;
50 
51  virtual std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
52  const parquet::RowGroupMetaData* group_metadata,
53  const int parquet_column_index,
54  const SQLTypeInfo& column_type) {
55  int64_t null_count{0};
56  auto metadata = createMetadata(column_type);
57 
58  if (validate_metadata_stats_ && group_metadata->num_rows() > 0) {
59  // update statistics
60  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
61  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
62  null_count = stats->null_count();
63  validateNullCount(group_metadata->schema()->Column(parquet_column_index)->name(),
64  null_count,
65  column_type);
66  }
67  metadata->chunkStats.has_nulls = null_count > 0;
68 
69  // update sizing
70  metadata->numElements = group_metadata->num_rows();
71  return metadata;
72  }
73 
75 
77 
79 
80  virtual void initializeColumnType(const SQLTypeInfo& column_type) {
81  column_type_ = column_type;
82  }
83 
84  protected:
86 
87  // Members utilized for error tracking
92 
93  // flag to disable validation of stats
95 
96  static std::shared_ptr<ChunkMetadata> createMetadata(const SQLTypeInfo& column_type) {
97  auto metadata = std::make_shared<ChunkMetadata>();
98  ForeignStorageBuffer buffer;
99  buffer.initEncoder(column_type.is_array() ? column_type.get_elem_type()
100  : column_type);
101  auto encoder = buffer.getEncoder();
102  encoder->getMetadata(metadata);
103  metadata->sqlType = column_type;
104  return metadata;
105  }
106 
/**
 * Raises the error reported when a null is found in a Parquet column whose
 * destination HeavyDB column is declared not null.
 *
 * @param parquet_column_name name of the offending Parquet column; embedded
 *        verbatim in the error message
 * @throws std::runtime_error always; this function never returns
 */
[[noreturn]] static void throwNotNullViolation(const std::string& parquet_column_name) {
  // Plain concatenation yields the same message without needing a
  // std::stringstream (and therefore <sstream>) for a fixed three-part string.
  throw std::runtime_error("A null value was detected in Parquet column '" +
                           parquet_column_name +
                           "' but HeavyDB column is set to not null");
}
113 
114  static void validateNullCount(const std::string& parquet_column_name,
115  int64_t null_count,
116  const SQLTypeInfo& column_type) {
117  bool has_nulls = null_count > 0;
118  if (has_nulls && column_type.get_notnull()) {
119  throwNotNullViolation(parquet_column_name);
120  }
121  }
122 };
123 
124 using InvalidRowGroupIndices = std::set<int64_t>;
125 
127  public:
// Pure virtual: implemented by concrete import encoders.
// NOTE(review): presumably removes from the encoder's buffer the entries whose
// positions are listed in `invalid_indices` - confirm against an implementation.
128  virtual void eraseInvalidIndicesInBuffer(
129  const InvalidRowGroupIndices& invalid_indices) = 0;
130 
// Pure virtual: implemented by concrete import encoders.
// `invalid_indices` is a non-const reference, so implementations can add to it.
// NOTE(review): presumably validates each value before appending and records
// the indices of values that fail - confirm against an implementation.
// NOTE(review): def_levels / rep_levels are presumably Parquet definition and
// repetition levels - confirm.
131  virtual void validateAndAppendData(const int16_t* def_levels,
132  const int16_t* rep_levels,
133  const int64_t values_read,
134  const int64_t levels_read,
135  int8_t* values,
136  const SQLTypeInfo& column_type, /* may not be used */
137  InvalidRowGroupIndices& invalid_indices) = 0;
138 };
139 
141  public:
143 
// Pure virtual: implemented by concrete scalar encoders.
// NOTE(review): presumably writes the encoder's null representation at
// `omnisci_data_bytes` - confirm against an implementation.
144  virtual void setNull(int8_t* omnisci_data_bytes) = 0;
// Pure virtual: implemented by concrete scalar encoders.
// NOTE(review): presumably copies one already-encoded element from the source
// pointer to the destination pointer - confirm against an implementation.
145  virtual void copy(const int8_t* omnisci_data_bytes_source,
146  int8_t* omnisci_data_bytes_destination) = 0;
// Pure virtual: implemented by concrete scalar encoders.
// NOTE(review): presumably converts one Parquet value at `parquet_data_bytes`
// to the destination encoding and writes it to `omnisci_data_bytes` - confirm.
147  virtual void encodeAndCopy(const int8_t* parquet_data_bytes,
148  int8_t* omnisci_data_bytes) = 0;
149 
// Pure virtual: implemented by concrete scalar encoders.
// NOTE(review): presumably the bulk form of encodeAndCopy(), handling
// `num_elements` consecutive values in one call - confirm.
150  virtual void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
151  int8_t* omnisci_data_bytes,
152  const size_t num_elements) = 0;
153 
// Pure virtual and const: implemented by concrete scalar encoders.
// NOTE(review): presumably checks the value at index `j` of `parquet_data`
// against the caller-supplied `column_type` - confirm, including how a failed
// check is reported.
154  virtual void validate(const int8_t* parquet_data,
155  const int64_t j,
156  const SQLTypeInfo& column_type) const = 0;
157 
// Pure virtual and const: implemented by concrete scalar encoders.
// NOTE(review): presumably the same check as validate(), but using the column
// type already stored on the encoder instead of a caller-supplied one - confirm.
158  virtual void validateUsingEncodersColumnType(const int8_t* parquet_data,
159  const int64_t j) const = 0;
160 
// Pure virtual and const: implemented by concrete scalar encoders.
// NOTE(review): presumably renders the encoded value at `bytes` as text - confirm.
161  virtual std::string encodedDataToString(const int8_t* bytes) const = 0;
162 };
163 
164 } // namespace foreign_storage
ParquetEncoder(Data_Namespace::AbstractBuffer *buffer)
virtual void initializeErrorTracking()
RejectedRowIndices invalid_indices_
virtual ~ParquetEncoder()=default
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
virtual std::string encodedDataToString(const int8_t *bytes) const =0
static void throwNotNullViolation(const std::string &parquet_column_name)
virtual void disableMetadataStatsValidation()
virtual void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values)=0
RejectedRowIndices getRejectedRowIndices() const
virtual void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes)=0
dictionary stats
Definition: report.py:116
virtual void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values)=0
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::set< int64_t > InvalidRowGroupIndices
static void validateNullCount(const std::string &parquet_column_name, int64_t null_count, const SQLTypeInfo &column_type)
virtual void setNull(int8_t *omnisci_data_bytes)=0
An AbstractBuffer is a unit of data management for a data manager.
bool g_enable_smem_group_by true
virtual void validate(const int8_t *parquet_data, const int64_t j, const SQLTypeInfo &column_type) const =0
static std::shared_ptr< ChunkMetadata > createMetadata(const SQLTypeInfo &column_type)
virtual std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type)
virtual void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements)=0
virtual void validateUsingEncodersColumnType(const int8_t *parquet_data, const int64_t j) const =0
virtual void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices)=0
virtual void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices)=0
std::set< int64_t > RejectedRowIndices
virtual void initializeColumnType(const SQLTypeInfo &column_type)
bool g_enable_watchdog false
Definition: Execute.cpp:80
ParquetScalarEncoder(Data_Namespace::AbstractBuffer *buffer)
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:975
bool is_array() const
Definition: sqltypes.h:583
virtual void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination)=0