OmniSciDB  95562058bd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ParquetArrayEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/types.h>
20 #include "ParquetEncoder.h"
21 
22 namespace foreign_storage {
24  public:
26  std::shared_ptr<ParquetScalarEncoder> scalar_encoder,
27  const ColumnDescriptor* column_desciptor)
28  : ParquetEncoder(data_buffer)
30  column_desciptor->columnType.get_elem_type().get_size())
31  , scalar_encoder_(scalar_encoder)
35 
36  void appendData(const int16_t* def_levels,
37  const int16_t* rep_levels,
38  const int64_t values_read,
39  const int64_t levels_read,
40  const bool is_last_batch,
41  int8_t* values) override {
42  CHECK(levels_read > 0);
43 
44  // encode all values in the temporary in-memory `encode_buffer_`, doing
45  // this encoding as a batch rather than element-wise exposes opportunities
46  // for performance optimization for certain scalar types
47  encodeAllValues(values, values_read);
48 
49  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
50  if (isNewArray(rep_levels[i])) {
53  }
54  processArrayItem(def_levels[i], j);
55  }
56 
57  if (is_last_batch) {
59  }
60  }
61 
62  protected:
63  virtual void processLastArray() = 0;
64 
65  virtual void appendArraysToBuffer() {
67  data_buffer_bytes_.clear();
68  }
69 
70  bool isLastArrayNull() const { return is_null_array_; }
71 
72  size_t sizeOfLastArray() const { return num_elements_in_array_; }
73 
74  int8_t* resizeArrayDataBytes(const size_t additional_num_elements) {
75  auto current_data_byte_size = data_buffer_bytes_.size();
76  data_buffer_bytes_.resize(current_data_byte_size +
77  additional_num_elements * omnisci_data_type_byte_size_);
78  return data_buffer_bytes_.data() + current_data_byte_size;
79  }
80 
82  std::shared_ptr<ParquetScalarEncoder> scalar_encoder_;
83  std::vector<int8_t> data_buffer_bytes_;
84 
85  // constants used during Dremel encoding assembly
86  const static int16_t non_null_def_level = 3;
87  const static int16_t item_null_def_level = 2;
88  const static int16_t list_null_def_level = 0;
89 
90  private:
95  has_assembly_started_ = false;
96  }
97 
99  is_null_array_ = false;
101  }
102 
103  bool isNewArray(const int16_t rep_level) const {
104  return rep_level == 0 && has_assembly_started_;
105  }
106 
107  void processArrayItem(const int16_t def_level, int64_t& encoded_index) {
108  has_assembly_started_ = true;
109  if (def_level == non_null_def_level) {
110  // push back a scalar element to in-memory data buffer
111  appendArrayItem(encoded_index++);
112  } else if (def_level == item_null_def_level) {
113  // push back a scalar null to in-memory data buffer
115  } else if (def_level == list_null_def_level) {
116  markArrayAsNull();
117  } else {
118  UNREACHABLE();
119  }
120  }
121 
122  void encodeAllValues(const int8_t* values, const int64_t values_read) {
123  encode_buffer_.resize(values_read * omnisci_data_type_byte_size_);
124  scalar_encoder_->encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
125  }
126 
127  void markArrayAsNull() { is_null_array_ = true; }
128 
129  void appendArrayItem(const int64_t encoded_index) {
130  auto omnisci_data_ptr = resizeArrayDataBytes(1);
131  scalar_encoder_->copy(
132  encode_buffer_.data() + (encoded_index)*omnisci_data_type_byte_size_,
133  omnisci_data_ptr);
135  }
136 
140  }
141 
142  std::vector<int8_t> encode_buffer_;
146 };
147 } // namespace foreign_storage
void processArrayItem(const int16_t def_level, int64_t &encoded_index)
#define UNREACHABLE()
Definition: Logger.h:241
void appendArrayItem(const int64_t encoded_index)
ParquetArrayEncoder(Data_Namespace::AbstractBuffer *data_buffer, std::shared_ptr< ParquetScalarEncoder > scalar_encoder, const ColumnDescriptor *column_desciptor)
int8_t * resizeArrayDataBytes(const size_t additional_num_elements)
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
void encodeAllValues(const int8_t *values, const int64_t values_read)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
bool g_enable_watchdog false
Definition: Execute.cpp:74
#define CHECK(condition)
Definition: Logger.h:197
bool isNewArray(const int16_t rep_level) const
Data_Namespace::AbstractBuffer * buffer_
std::shared_ptr< ParquetScalarEncoder > scalar_encoder_
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override