OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetArrayEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/types.h>
20 #include "ParquetEncoder.h"
21 
22 namespace foreign_storage {
24  public:
26  std::shared_ptr<ParquetScalarEncoder> scalar_encoder,
27  const ColumnDescriptor* column_desciptor)
28  : ParquetEncoder(data_buffer)
30  column_desciptor->columnType.get_elem_type().get_size())
31  , scalar_encoder_(scalar_encoder)
38 
39  void appendDataTrackErrors(const int16_t* def_levels,
40  const int16_t* rep_levels,
41  const int64_t values_read,
42  const int64_t levels_read,
43  int8_t* values) override {
45  // validate all elements
46  is_valid_item_.assign(values_read, true);
47  for (int64_t j = 0; j < values_read; ++j) {
48  try {
49  scalar_encoder_->validateUsingEncodersColumnType(values, j);
50  } catch (const std::runtime_error& error) {
51  is_valid_item_[j] = false;
52  }
53  }
54  appendData(def_levels, rep_levels, values_read, levels_read, values);
55  }
56 
57  void appendData(const int16_t* def_levels,
58  const int16_t* rep_levels,
59  const int64_t values_read,
60  const int64_t levels_read,
61  int8_t* values) override {
62  CHECK(levels_read > 0);
63 
64  // encode all values in the temporary in-memory `encode_buffer_`, doing
65  // this encoding as a batch rather than element-wise exposes opportunities
66  // for performance optimization for certain scalar types
67  encodeAllValues(values, values_read);
68 
69  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
70  if (isNewArray(rep_levels[i])) {
73  }
74  processArrayItem(def_levels[i], j);
75  }
76  }
77 
82  has_assembly_started_ = false;
83  }
84 
85  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
86  const parquet::RowGroupMetaData* group_metadata,
87  const int parquet_column_index,
88  const SQLTypeInfo& column_type) override {
89  auto metadata = scalar_encoder_->getRowGroupMetadata(
90  group_metadata, parquet_column_index, column_type);
91  metadata->numBytes = 0; // number of bytes is not known
92  return metadata;
93  }
94 
95  virtual void disableMetadataStatsValidation() override {
97  scalar_encoder_->disableMetadataStatsValidation();
98  }
99 
100  virtual void initializeErrorTracking() override {
102  scalar_encoder_->initializeErrorTracking();
103  }
104 
105  virtual void initializeColumnType(const SQLTypeInfo& column_type) override {
107  scalar_encoder_->initializeColumnType(column_type.get_elem_type());
108  }
109 
110  protected:
111  virtual void processLastArray() {
115  }
117  }
118 
119  virtual void appendArraysToBuffer() {
121  data_buffer_bytes_.clear();
122  }
123 
124  bool isLastArrayNull() const { return is_null_array_; }
125 
126  bool isLastArrayEmpty() const { return is_empty_array_; }
127 
128  size_t sizeOfLastArray() const { return num_elements_in_array_; }
129 
130  int8_t* resizeArrayDataBytes(const size_t additional_num_elements) {
131  auto current_data_byte_size = data_buffer_bytes_.size();
132  data_buffer_bytes_.resize(current_data_byte_size +
133  additional_num_elements * omnisci_data_type_byte_size_);
134  return data_buffer_bytes_.data() + current_data_byte_size;
135  }
136 
138  std::shared_ptr<ParquetScalarEncoder> scalar_encoder_;
139  std::vector<int8_t> data_buffer_bytes_;
140 
141  // constants used during Dremel encoding assembly
142  const static int16_t non_null_def_level = 3;
143  const static int16_t item_null_def_level = 2;
144  const static int16_t empty_list_def_level = 1;
145  const static int16_t list_null_def_level = 0;
146 
147  virtual void resetLastArrayMetadata() {
148  is_empty_array_ = false;
149  is_null_array_ = false;
152  is_invalid_array_ = false;
153  }
154  }
155 
156  bool isNewArray(const int16_t rep_level) const {
157  return rep_level == 0 && has_assembly_started_;
158  }
159 
160  int8_t* encodedDataAtIndex(const size_t index) {
161  return encode_buffer_.data() + (index)*omnisci_data_type_byte_size_;
162  }
163 
164  void updateMetadataForAppendedArrayItem(const int64_t encoded_index) {
166  if (is_error_tracking_enabled_ && !is_valid_item_[encoded_index]) {
167  is_invalid_array_ = true;
168  }
169  }
170 
171  virtual void appendArrayItem(const int64_t encoded_index) {
172  auto omnisci_data_ptr = resizeArrayDataBytes(1);
173  scalar_encoder_->copy(encodedDataAtIndex(encoded_index), omnisci_data_ptr);
174  updateMetadataForAppendedArrayItem(encoded_index);
175  }
176 
177  virtual void encodeAllValues(const int8_t* values, const int64_t values_read) {
178  encode_buffer_.resize(values_read * omnisci_data_type_byte_size_);
179  scalar_encoder_->encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
180  }
181 
182  private:
183  void processArrayItem(const int16_t def_level, int64_t& encoded_index) {
184  has_assembly_started_ = true;
185  if (def_level == non_null_def_level) {
186  // push back a scalar element to in-memory data buffer
187  appendArrayItem(encoded_index++);
188  } else if (def_level == item_null_def_level) {
189  // push back a scalar null to in-memory data buffer
191  } else if (def_level == list_null_def_level) {
192  markArrayAsNull();
193  } else if (def_level == empty_list_def_level) {
195  } else {
196  UNREACHABLE();
197  }
198  }
199 
200  void markArrayAsNull() { is_null_array_ = true; }
201 
203 
207  }
208 
209  std::vector<int8_t> encode_buffer_;
214 
215  // error tracking related members
218  std::vector<bool> is_valid_item_;
219 };
220 } // namespace foreign_storage
virtual void appendArrayItem(const int64_t encoded_index)
virtual void initializeErrorTracking() override
virtual void initializeErrorTracking()
RejectedRowIndices invalid_indices_
virtual void disableMetadataStatsValidation()
void processArrayItem(const int16_t def_level, int64_t &encoded_index)
#define UNREACHABLE()
Definition: Logger.h:338
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
ParquetArrayEncoder(Data_Namespace::AbstractBuffer *data_buffer, std::shared_ptr< ParquetScalarEncoder > scalar_encoder, const ColumnDescriptor *column_desciptor)
virtual void encodeAllValues(const int8_t *values, const int64_t values_read)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
int8_t * resizeArrayDataBytes(const size_t additional_num_elements)
int8_t * encodedDataAtIndex(const size_t index)
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
virtual void initializeColumnType(const SQLTypeInfo &column_type) override
void updateMetadataForAppendedArrayItem(const int64_t encoded_index)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
virtual void disableMetadataStatsValidation() override
virtual void initializeColumnType(const SQLTypeInfo &column_type)
bool g_enable_watchdog false
Definition: Execute.cpp:80
#define CHECK(condition)
Definition: Logger.h:291
bool isNewArray(const int16_t rep_level) const
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
Data_Namespace::AbstractBuffer * buffer_
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:975
std::shared_ptr< ParquetScalarEncoder > scalar_encoder_