OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetArrayEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/types.h>
20 #include "ParquetEncoder.h"
21 
22 namespace foreign_storage {
24  public:
26  std::shared_ptr<ParquetScalarEncoder> scalar_encoder,
27  const ColumnDescriptor* column_desciptor)
28  : ParquetEncoder(data_buffer)
30  column_desciptor->columnType.get_elem_type().get_size())
31  , scalar_encoder_(scalar_encoder)
36 
// Consumes one batch of column data together with its definition and
// repetition levels.
//
// All `values_read` scalar values are first batch-encoded into
// `encode_buffer_`. The `levels_read` level pairs are then walked in order:
// each definition level is passed to processArrayItem() along with `j`, a
// running index into the encoded values that processArrayItem() advances by
// reference.
//
// `levels_read` must be positive; this is enforced with CHECK.
//
// NOTE(review): the embedded listing numbers jump from 50 to 53, so the body
// of the `isNewArray(...)` branch is not visible in this listing. Presumably
// it finishes the previous array before the next one starts -- confirm
// against the full header.
37  void appendData(const int16_t* def_levels,
38  const int16_t* rep_levels,
39  const int64_t values_read,
40  const int64_t levels_read,
41  int8_t* values) override {
42  CHECK(levels_read > 0);
43 
44  // encode all values in the temporary in-memory `encode_buffer_`, doing
45  // this encoding as a batch rather than element-wise exposes opportunities
46  // for performance optimization for certain scalar types
47  encodeAllValues(values, values_read);
48 
49  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
50  if (isNewArray(rep_levels[i])) {
53  }
54  processArrayItem(def_levels[i], j);
55  }
56  }
57 
62  has_assembly_started_ = false;
63  }
64 
65  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
66  const parquet::RowGroupMetaData* group_metadata,
67  const int parquet_column_index,
68  const SQLTypeInfo& column_type) override {
69  auto metadata = scalar_encoder_->getRowGroupMetadata(
70  group_metadata, parquet_column_index, column_type);
71  metadata->numBytes = 0; // number of bytes is not known
72  return metadata;
73  }
74 
75  protected:
// Pure virtual hook that every concrete array encoder has to implement.
// NOTE(review): no call site is visible in this listing; judging by the name
// it presumably handles the array that has just been completed -- confirm
// against the full header.
76  virtual void processLastArray() = 0;
77 
// Overridable hook; the part visible here empties the in-memory staging
// vector `data_buffer_bytes_`.
// NOTE(review): listing line 79 is missing (the embedded numbers jump from 78
// to 80). Presumably it appends the staged bytes to the underlying buffer
// before they are cleared -- confirm against the full header.
78  virtual void appendArraysToBuffer() {
80  data_buffer_bytes_.clear();
81  }
82 
83  bool isLastArrayNull() const { return is_null_array_; }
84 
85  bool isLastArrayEmpty() const { return is_empty_array_; }
86 
87  size_t sizeOfLastArray() const { return num_elements_in_array_; }
88 
89  int8_t* resizeArrayDataBytes(const size_t additional_num_elements) {
90  auto current_data_byte_size = data_buffer_bytes_.size();
91  data_buffer_bytes_.resize(current_data_byte_size +
92  additional_num_elements * omnisci_data_type_byte_size_);
93  return data_buffer_bytes_.data() + current_data_byte_size;
94  }
95 
97  std::shared_ptr<ParquetScalarEncoder> scalar_encoder_;
98  std::vector<int8_t> data_buffer_bytes_;
99 
// Constants used during Dremel encoding assembly: the definition levels that
// processArrayItem() dispatches on. Declared `static constexpr` so they are
// guaranteed compile-time constants and, under C++17, implicitly inline (no
// out-of-class definition is needed if one of them is ever ODR-used).
static constexpr int16_t non_null_def_level = 3;    // a scalar element is appended
static constexpr int16_t item_null_def_level = 2;   // a scalar null is appended
static constexpr int16_t empty_list_def_level = 1;  // presumably an empty array -- confirm
static constexpr int16_t list_null_def_level = 0;   // the array is marked as null
105 
// Overridable hook that clears the per-array state flags `is_empty_array_`
// and `is_null_array_`.
// NOTE(review): listing line 109 is missing (the embedded numbers jump from
// 108 to 110). Presumably it also resets the element counter of the array --
// confirm against the full header.
106  virtual void resetLastArrayMetadata() {
107  is_empty_array_ = false;
108  is_null_array_ = false;
110  }
111 
112  bool isNewArray(const int16_t rep_level) const {
113  return rep_level == 0 && has_assembly_started_;
114  }
115 
// Overridable hook that appends one already-encoded scalar to the array data.
// It grows `data_buffer_bytes_` by one element via resizeArrayDataBytes(1)
// and has the scalar encoder copy the element found at position
// `encoded_index` of `encode_buffer_` (byte offset
// encoded_index * omnisci_data_type_byte_size_) into the new slot.
// NOTE(review): listing line 121 is missing (the embedded numbers jump from
// 120 to 122). Presumably it updates the element count of the current array
// -- confirm against the full header.
116  virtual void appendArrayItem(const int64_t encoded_index) {
117  auto omnisci_data_ptr = resizeArrayDataBytes(1);
118  scalar_encoder_->copy(
119  encode_buffer_.data() + (encoded_index)*omnisci_data_type_byte_size_,
120  omnisci_data_ptr);
122  }
123 
124  private:
// Handles a single definition level and records that assembly has started
// (`has_assembly_started_` is set on every call).
//
// Dispatch on `def_level`:
//   - non_null_def_level:   appends the encoded scalar at `encoded_index` and
//                           then advances `encoded_index` (passed by
//                           reference, so the caller's index moves forward).
//   - item_null_def_level:  see NOTE below.
//   - list_null_def_level:  marks the array as null.
//   - empty_list_def_level: see NOTE below.
//   - anything else:        UNREACHABLE().
//
// NOTE(review): listing lines 132 and 136 are missing (the embedded numbers
// jump 131 -> 133 and 135 -> 137), so the statements executed for a null item
// and for an empty list are not visible here -- confirm against the full
// header.
125  void processArrayItem(const int16_t def_level, int64_t& encoded_index) {
126  has_assembly_started_ = true;
127  if (def_level == non_null_def_level) {
128  // push back a scalar element to in-memory data buffer
129  appendArrayItem(encoded_index++);
130  } else if (def_level == item_null_def_level) {
131  // push back a scalar null to in-memory data buffer
133  } else if (def_level == list_null_def_level) {
134  markArrayAsNull();
135  } else if (def_level == empty_list_def_level) {
137  } else {
138  UNREACHABLE();
139  }
140  }
141 
142  void encodeAllValues(const int8_t* values, const int64_t values_read) {
143  encode_buffer_.resize(values_read * omnisci_data_type_byte_size_);
144  scalar_encoder_->encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
145  }
146 
147  void markArrayAsNull() { is_null_array_ = true; }
148 
150 
154  }
155 
156  std::vector<int8_t> encode_buffer_;
161 };
162 } // namespace foreign_storage
virtual void appendArrayItem(const int64_t encoded_index)
void processArrayItem(const int16_t def_level, int64_t &encoded_index)
#define UNREACHABLE()
Definition: Logger.h:253
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
ParquetArrayEncoder(Data_Namespace::AbstractBuffer *data_buffer, std::shared_ptr< ParquetScalarEncoder > scalar_encoder, const ColumnDescriptor *column_desciptor)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
int8_t * resizeArrayDataBytes(const size_t additional_num_elements)
An AbstractBuffer is a unit of data management for a data manager.
Specifies the in-memory content of a row in the column metadata table.
void encodeAllValues(const int8_t *values, const int64_t values_read)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:209
bool isNewArray(const int16_t rep_level) const
Data_Namespace::AbstractBuffer * buffer_
std::shared_ptr< ParquetScalarEncoder > scalar_encoder_