OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetArrayEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/types.h>
20 #include "ParquetEncoder.h"
21 
22 namespace foreign_storage {
24  public:
26  std::shared_ptr<ParquetScalarEncoder> scalar_encoder,
27  const ColumnDescriptor* column_desciptor)
28  : ParquetEncoder(data_buffer)
30  column_desciptor->columnType.get_elem_type().get_size())
31  , scalar_encoder_(scalar_encoder)
38 
39  void appendDataTrackErrors(const int16_t* def_levels,
40  const int16_t* rep_levels,
41  const int64_t values_read,
42  const int64_t levels_read,
43  int8_t* values) override {
45  // validate all elements
46  is_valid_item_.assign(values_read, true);
47  for (int64_t j = 0; j < values_read; ++j) {
48  try {
49  scalar_encoder_->validateUsingEncodersColumnType(values, j);
50  } catch (const std::runtime_error& error) {
51  is_valid_item_[j] = false;
52  }
53  }
54  appendData(def_levels, rep_levels, values_read, levels_read, values);
55  }
56 
57  void appendData(const int16_t* def_levels,
58  const int16_t* rep_levels,
59  const int64_t values_read,
60  const int64_t levels_read,
61  int8_t* values) override {
62  CHECK(levels_read > 0);
63 
64  // encode all values in the temporary in-memory `encode_buffer_`, doing
65  // this encoding as a batch rather than element-wise exposes opportunities
66  // for performance optimization for certain scalar types
67  encodeAllValues(values, values_read);
68 
69  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
70  if (isNewArray(rep_levels[i])) {
73  }
74  processArrayItem(def_levels[i], j);
75  }
76  }
77 
82  has_assembly_started_ = false;
83  }
84 
85  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
86  const parquet::RowGroupMetaData* group_metadata,
87  const int parquet_column_index,
88  const SQLTypeInfo& column_type) override {
89  auto metadata = scalar_encoder_->getRowGroupMetadata(
90  group_metadata, parquet_column_index, column_type);
91  metadata->numBytes = 0; // number of bytes is not known
92  return metadata;
93  }
94 
95  virtual void disableMetadataStatsValidation() override {
97  scalar_encoder_->disableMetadataStatsValidation();
98  }
99 
100  virtual void initializeErrorTracking(const SQLTypeInfo& column_type) override {
102  scalar_encoder_->initializeErrorTracking(column_type.get_elem_type());
103  }
104 
105  protected:
106  virtual void processLastArray() {
110  }
112  }
113 
114  virtual void appendArraysToBuffer() {
116  data_buffer_bytes_.clear();
117  }
118 
119  bool isLastArrayNull() const { return is_null_array_; }
120 
121  bool isLastArrayEmpty() const { return is_empty_array_; }
122 
123  size_t sizeOfLastArray() const { return num_elements_in_array_; }
124 
125  int8_t* resizeArrayDataBytes(const size_t additional_num_elements) {
126  auto current_data_byte_size = data_buffer_bytes_.size();
127  data_buffer_bytes_.resize(current_data_byte_size +
128  additional_num_elements * omnisci_data_type_byte_size_);
129  return data_buffer_bytes_.data() + current_data_byte_size;
130  }
131 
133  std::shared_ptr<ParquetScalarEncoder> scalar_encoder_;
134  std::vector<int8_t> data_buffer_bytes_;
135 
136  // constants used during Dremel encoding assembly
137  const static int16_t non_null_def_level = 3;
138  const static int16_t item_null_def_level = 2;
139  const static int16_t empty_list_def_level = 1;
140  const static int16_t list_null_def_level = 0;
141 
142  virtual void resetLastArrayMetadata() {
143  is_empty_array_ = false;
144  is_null_array_ = false;
147  is_invalid_array_ = false;
148  }
149  }
150 
151  bool isNewArray(const int16_t rep_level) const {
152  return rep_level == 0 && has_assembly_started_;
153  }
154 
155  int8_t* encodedDataAtIndex(const size_t index) {
156  return encode_buffer_.data() + (index)*omnisci_data_type_byte_size_;
157  }
158 
159  void updateMetadataForAppendedArrayItem(const int64_t encoded_index) {
161  if (is_error_tracking_enabled_ && !is_valid_item_[encoded_index]) {
162  is_invalid_array_ = true;
163  }
164  }
165 
166  virtual void appendArrayItem(const int64_t encoded_index) {
167  auto omnisci_data_ptr = resizeArrayDataBytes(1);
168  scalar_encoder_->copy(encodedDataAtIndex(encoded_index), omnisci_data_ptr);
169  updateMetadataForAppendedArrayItem(encoded_index);
170  }
171 
172  virtual void encodeAllValues(const int8_t* values, const int64_t values_read) {
173  encode_buffer_.resize(values_read * omnisci_data_type_byte_size_);
174  scalar_encoder_->encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
175  }
176 
177  private:
178  void processArrayItem(const int16_t def_level, int64_t& encoded_index) {
179  has_assembly_started_ = true;
180  if (def_level == non_null_def_level) {
181  // push back a scalar element to in-memory data buffer
182  appendArrayItem(encoded_index++);
183  } else if (def_level == item_null_def_level) {
184  // push back a scalar null to in-memory data buffer
186  } else if (def_level == list_null_def_level) {
187  markArrayAsNull();
188  } else if (def_level == empty_list_def_level) {
190  } else {
191  UNREACHABLE();
192  }
193  }
194 
195  void markArrayAsNull() { is_null_array_ = true; }
196 
198 
202  }
203 
204  std::vector<int8_t> encode_buffer_;
209 
210  // error tracking related members
213  std::vector<bool> is_valid_item_;
214 };
215 } // namespace foreign_storage
virtual void appendArrayItem(const int64_t encoded_index)
RejectedRowIndices invalid_indices_
virtual void disableMetadataStatsValidation()
void processArrayItem(const int16_t def_level, int64_t &encoded_index)
#define UNREACHABLE()
Definition: Logger.h:337
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
ParquetArrayEncoder(Data_Namespace::AbstractBuffer *data_buffer, std::shared_ptr< ParquetScalarEncoder > scalar_encoder, const ColumnDescriptor *column_desciptor)
virtual void encodeAllValues(const int8_t *values, const int64_t values_read)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
virtual void initializeErrorTracking(const SQLTypeInfo &column_type)
int8_t * resizeArrayDataBytes(const size_t additional_num_elements)
int8_t * encodedDataAtIndex(const size_t index)
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
void updateMetadataForAppendedArrayItem(const int64_t encoded_index)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
virtual void disableMetadataStatsValidation() override
bool g_enable_watchdog false
Definition: Execute.cpp:79
#define CHECK(condition)
Definition: Logger.h:291
bool isNewArray(const int16_t rep_level) const
virtual void initializeErrorTracking(const SQLTypeInfo &column_type) override
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:388
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:963
std::shared_ptr< ParquetScalarEncoder > scalar_encoder_