OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ParquetArrayEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/types.h>
20 #include "ParquetEncoder.h"
21 
22 namespace foreign_storage {
24  public:
26  std::shared_ptr<ParquetScalarEncoder> scalar_encoder,
27  const ColumnDescriptor* column_desciptor)
28  : ParquetEncoder(data_buffer)
30  column_desciptor->columnType.get_elem_type().get_size())
31  , scalar_encoder_(scalar_encoder)
35 
// Consumes one batch of Parquet definition/repetition levels together with
// the values read for that batch.
//
// def_levels / rep_levels: per-level arrays, indexed 0..levels_read-1 below.
// values_read:             number of values handed to encodeAllValues().
// levels_read:             number of levels to walk; must be > 0 (CHECK).
// is_last_batch:           guards a final step whose body is not visible here.
// values:                  raw values, batch-encoded into `encode_buffer_`.
//
// NOTE(review): listing lines 51-52 and 58 are missing from this extract, so
// the work done when a new array starts and on the last batch cannot be read
// from here; presumably they finalize the array assembled so far -- confirm
// against the original header.
36  void appendData(const int16_t* def_levels,
37  const int16_t* rep_levels,
38  const int64_t values_read,
39  const int64_t levels_read,
40  const bool is_last_batch,
41  int8_t* values) override {
42  CHECK(levels_read > 0);
43 
44  // encode all values in the temporary in-memory `encode_buffer_`, doing
45  // this encoding as a batch rather than element-wise exposes opportunities
46  // for performance optimization for certain scalar types
47  encodeAllValues(values, values_read);
48 
// `i` indexes levels; `j` indexes encoded values. `j` is passed by reference
// and is only advanced by processArrayItem() for non-null items, so it can
// lag behind `i` when null items or null arrays are present.
49  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
// A repetition level of 0 after assembly has started marks the start of a
// new array (see isNewArray()).
50  if (isNewArray(rep_levels[i])) {
53  }
54  processArrayItem(def_levels[i], j);
55  }
56 
57  if (is_last_batch) {
59  }
60  }
61 
62  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
63  const parquet::RowGroupMetaData* group_metadata,
64  const int parquet_column_index,
65  const SQLTypeInfo& column_type) override {
66  return scalar_encoder_->getRowGroupMetadata(
67  group_metadata, parquet_column_index, column_type);
68  }
69 
70  protected:
// Pure virtual hook that every concrete array encoder must implement.
// NOTE(review): its call sites are on listing lines missing from this
// extract; presumably it is invoked once an array has been fully assembled --
// confirm against the original header.
71  virtual void processLastArray() = 0;
72 
// Overridable step that ends by emptying the staged array bytes held in
// `data_buffer_bytes_`.
// NOTE(review): listing line 74 is missing from this extract; presumably it
// appends `data_buffer_bytes_` to the underlying buffer before the clear --
// confirm against the original header.
73  virtual void appendArraysToBuffer() {
75  data_buffer_bytes_.clear();
76  }
77 
78  bool isLastArrayNull() const { return is_null_array_; }
79 
80  size_t sizeOfLastArray() const { return num_elements_in_array_; }
81 
82  int8_t* resizeArrayDataBytes(const size_t additional_num_elements) {
83  auto current_data_byte_size = data_buffer_bytes_.size();
84  data_buffer_bytes_.resize(current_data_byte_size +
85  additional_num_elements * omnisci_data_type_byte_size_);
86  return data_buffer_bytes_.data() + current_data_byte_size;
87  }
88 
// Encoder for the array's element type; used in this class for batch
// encoding (encodeAllValues), per-element copies (appendArrayItem) and
// row-group metadata (getRowGroupMetadata).
90  std::shared_ptr<ParquetScalarEncoder> scalar_encoder_;
// Staging bytes for array elements; grown by resizeArrayDataBytes() and
// cleared at the end of appendArraysToBuffer().
91  std::vector<int8_t> data_buffer_bytes_;
92 
93  // constants used during Dremel encoding assembly
// These are the definition levels processArrayItem() dispatches on; any other
// value reaches UNREACHABLE() there.
// Level 3: a non-null element; the next encoded value is appended.
94  const static int16_t non_null_def_level = 3;
// Level 2: a null element inside an array.
95  const static int16_t item_null_def_level = 2;
// Level 0: the whole array is null (markArrayAsNull() is called).
96  const static int16_t list_null_def_level = 0;
97 
98  private:
103  has_assembly_started_ = false;
104  }
105 
107  is_null_array_ = false;
109  }
110 
111  bool isNewArray(const int16_t rep_level) const {
112  return rep_level == 0 && has_assembly_started_;
113  }
114 
// Handles one level entry according to its definition level and records that
// assembly has started.
//
// def_level:     definition level of the entry; must be one of the three
//                *_def_level constants, otherwise UNREACHABLE() is hit.
// encoded_index: in/out index of the next unread value in `encode_buffer_`;
//                advanced only when a non-null element is consumed.
115  void processArrayItem(const int16_t def_level, int64_t& encoded_index) {
116  has_assembly_started_ = true;
117  if (def_level == non_null_def_level) {
118  // push back a scalar element to in-memory data buffer
119  appendArrayItem(encoded_index++);
120  } else if (def_level == item_null_def_level) {
121  // push back a scalar null to in-memory data buffer
// NOTE(review): listing line 122 (the statement for this branch) is missing
// from this extract -- confirm against the original header.
123  } else if (def_level == list_null_def_level) {
124  markArrayAsNull();
125  } else {
126  UNREACHABLE();
127  }
128  }
129 
130  void encodeAllValues(const int8_t* values, const int64_t values_read) {
131  encode_buffer_.resize(values_read * omnisci_data_type_byte_size_);
132  scalar_encoder_->encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
133  }
134 
135  void markArrayAsNull() { is_null_array_ = true; }
136 
// Appends one already-encoded element to `data_buffer_bytes_`.
//
// encoded_index: index (in elements, not bytes) of the value to copy out of
//                `encode_buffer_`.
137  void appendArrayItem(const int64_t encoded_index) {
// Reserve room for exactly one more element and get a pointer to that slot.
138  auto omnisci_data_ptr = resizeArrayDataBytes(1);
139  scalar_encoder_->copy(
// Source address: element index scaled by the per-element byte size.
140  encode_buffer_.data() + (encoded_index)*omnisci_data_type_byte_size_,
141  omnisci_data_ptr);
// NOTE(review): listing line 142 is missing from this extract; presumably it
// updates the per-array element count -- confirm against the original header.
143  }
144 
148  }
149 
// Scratch buffer holding the batch-encoded values of the current appendData()
// call; resized and filled by encodeAllValues(), read by appendArrayItem().
150  std::vector<int8_t> encode_buffer_;
154 };
155 } // namespace foreign_storage
void processArrayItem(const int16_t def_level, int64_t &encoded_index)
#define UNREACHABLE()
Definition: Logger.h:247
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
void appendArrayItem(const int64_t encoded_index)
ParquetArrayEncoder(Data_Namespace::AbstractBuffer *data_buffer, std::shared_ptr< ParquetScalarEncoder > scalar_encoder, const ColumnDescriptor *column_desciptor)
int8_t * resizeArrayDataBytes(const size_t additional_num_elements)
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
void encodeAllValues(const int8_t *values, const int64_t values_read)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:203
bool isNewArray(const int16_t rep_level) const
Data_Namespace::AbstractBuffer * buffer_
std::shared_ptr< ParquetScalarEncoder > scalar_encoder_
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override