OmniSciDB  2e3a973ef4
ParquetInPlaceEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "ParquetEncoder.h"
20 
21 #include <parquet/schema.h>
22 #include <parquet/types.h>
23 
25 
26 namespace foreign_storage {
27 
29  const parquet::LogicalType::TimeUnit::unit time_unit) {
// NOTE(review): the line that opens this definition (return type and function name) is
// not present in this listing; only the parameter tail and the body are visible.
//
// Maps a Parquet time unit to a power-of-ten factor:
//   MILLIS -> 1'000, MICROS -> 1'000'000, NANOS -> 1'000'000'000.
// Presumably this is the divisor used to turn a value in `time_unit` into seconds —
// confirm against the callers.
30  int64_t conversion_denominator = 0;
31  switch (time_unit) {
32  case parquet::LogicalType::TimeUnit::MILLIS:
33  conversion_denominator = 1000L;
34  break;
35  case parquet::LogicalType::TimeUnit::MICROS:
36  conversion_denominator = 1000L * 1000L;
37  break;
38  case parquet::LogicalType::TimeUnit::NANOS:
39  conversion_denominator = 1000L * 1000L * 1000L;
40  break;
41  default:
// Any other unit is treated as a programming error via UNREACHABLE().
42  UNREACHABLE();
43  }
44  return conversion_denominator;
45 }
46 
47 template <typename V, std::enable_if_t<std::is_integral<V>::value, int> = 0>
48 inline V get_null_value() {
49  return inline_int_null_value<V>();
50 }
51 
52 template <typename V, std::enable_if_t<std::is_floating_point<V>::value, int> = 0>
53 inline V get_null_value() {
54  return inline_fp_null_value<V>();
55 }
56 
58  public:
// Constructor tail: forwards `buffer` to ParquetScalarEncoder and records the per-element
// byte widths of the OmniSci and the Parquet representation.
// NOTE(review): the line carrying the constructor name and its first parameter is not
// present in this listing.
60  const size_t omnisci_data_type_byte_size,
61  const size_t parquet_data_type_byte_size)
62  : ParquetScalarEncoder(buffer)
63  , omnisci_data_type_byte_size_(omnisci_data_type_byte_size)
64  , parquet_data_type_byte_size_(parquet_data_type_byte_size) {}
65 
// Rewrites the batch held in `values` in place and then appends
// `levels_read * omnisci_data_type_byte_size_` bytes of it to buffer_.
// `rep_levels` and `is_last_batch` are not referenced in the visible lines.
// NOTE(review): several lines of this method (call heads and some branch conditions)
// are missing from this listing; the comments below describe only what is visible.
81  void appendData(const int16_t* def_levels,
82  const int16_t* rep_levels,
83  const int64_t values_read,
84  const int64_t levels_read,
85  const bool is_last_batch,
86  int8_t* values) override {
// Front-to-back pass over the `values_read` entries; the visible argument addresses
// slot i at the OmniSci element width. The guarding condition and the call head are
// not visible here.
88  for (int64_t i = 0; i < values_read; ++i) {
90  values + i * omnisci_data_type_byte_size_);
91  }
92  }
93 
// Fewer values than levels: some entries are null. The visible arguments
// (values, def_levels, values_read, levels_read, ...) match the leading parameters of
// decodeNullsAndEncodeData — presumably that is the callee; the call head and the
// last argument are not visible.
94  if (values_read < levels_read) { // nulls exist
96  values,
97  def_levels,
98  values_read,
99  levels_read,
// Back-to-front pass over all `levels_read` slots (the branch condition and the call
// head are not visible).
102  for (int64_t i = levels_read - 1; i >= 0; --i) {
104  values + i * omnisci_data_type_byte_size_);
105  }
106  }
107 
// After the in-place rewrite there is one OmniSci-width element per level.
108  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
109  }
110 
111  protected:
113 
114  private:
115  void decodeNullsAndEncodeData(int8_t* data_ptr,
116  const int16_t* def_levels,
117  const int64_t values_read,
118  const int64_t levels_read,
119  const bool do_encoding) {
120  for (int64_t i = levels_read - 1, j = values_read - 1; i >= 0; --i) {
121  if (def_levels[i]) { // not null
122  CHECK(j >= 0);
123  if (do_encoding) {
124  encodeAndCopy(data_ptr + (j--) * parquet_data_type_byte_size_,
125  data_ptr + i * omnisci_data_type_byte_size_);
126  } else {
127  copy(data_ptr + (j--) * omnisci_data_type_byte_size_,
128  data_ptr + i * omnisci_data_type_byte_size_);
129  }
130  } else { // null
131  setNull(data_ptr + i * omnisci_data_type_byte_size_);
132  }
133  }
134  }
135 
137 };
138 
139 template <typename V, typename T>
141  public:
// Descriptor-based constructor tail: passes on `buffer`, the size reported by
// column_desciptor->columnType.get_size(), and the byte size that
// parquet::GetTypeByteSize() gives for the Parquet column's physical type.
// NOTE(review): the constructor head and the name of the delegation target are not
// present in this listing; the argument order suggests (buffer, OmniSci width,
// Parquet width) — confirm.
143  const ColumnDescriptor* column_desciptor,
144  const parquet::ColumnDescriptor* parquet_column_descriptor)
146  buffer,
147  column_desciptor->columnType.get_size(),
148  parquet::GetTypeByteSize(parquet_column_descriptor->physical_type())) {}
149 
// Explicit-width constructor tail: hands `buffer` and both element byte widths straight
// to ParquetInPlaceEncoder.
// NOTE(review): the line carrying the constructor name and its first parameter is not
// present in this listing.
151  const size_t omnisci_data_type_byte_size,
152  const size_t parquet_data_type_byte_size)
153  : ParquetInPlaceEncoder(buffer,
154  omnisci_data_type_byte_size,
155  parquet_data_type_byte_size) {}
156 
// Typed override of appendData with a shortcut for the case where the OmniSci type V and
// the Parquet type T are the same and the batch has no nulls.
// NOTE(review): two call heads inside this method are missing from this listing; the
// comments below describe only what is visible.
163  void appendData(const int16_t* def_levels,
164  const int16_t* rep_levels,
165  const int64_t values_read,
166  const int64_t levels_read,
167  const bool is_last_batch,
168  int8_t* values) override {
// Shortcut: identical element types and one value per level (no nulls).
169  if (std::is_same<V, T>::value && values_read == levels_read) {
// Per-element pass is skipped only when the subclass reports that encoding is an
// identity for same types; the call made for each element is not visible here.
170  if (!encodingIsIdentityForSameTypes()) {
171  for (int64_t i = 0; i < levels_read; ++i) {
173  values + i * omnisci_data_type_byte_size_);
174  }
175  }
176  buffer_->append(values, levels_read * omnisci_data_type_byte_size_);
177  } else {
// General case: all six arguments are forwarded unchanged — presumably to the base
// class appendData; the callee name is not visible.
179  def_levels, rep_levels, values_read, levels_read, is_last_batch, values);
180  }
181  }
182 
183  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
184  int8_t* omnisci_data_bytes,
185  const size_t num_elements) override {
186  auto parquet_data_ptr = reinterpret_cast<const T*>(parquet_data_bytes);
187  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
188  for (size_t i = 0; i < num_elements; ++i) {
189  encodeAndCopy(reinterpret_cast<const int8_t*>(&parquet_data_ptr[i]),
190  reinterpret_cast<int8_t*>(&omnisci_data_ptr[i]));
191  }
192  }
193 
194  void setNull(int8_t* omnisci_data_bytes) override {
195  auto& omnisci_data_value = reinterpret_cast<V*>(omnisci_data_bytes)[0];
196  omnisci_data_value = get_null_value<V>();
197  }
198 
199  void copy(const int8_t* omnisci_data_bytes_source,
200  int8_t* omnisci_data_bytes_destination) override {
201  const auto& omnisci_data_value_source =
202  reinterpret_cast<const V*>(omnisci_data_bytes_source)[0];
203  auto& omnisci_data_value_destination =
204  reinterpret_cast<V*>(omnisci_data_bytes_destination)[0];
205  omnisci_data_value_destination = omnisci_data_value_source;
206  }
207 
208  protected:
209  virtual bool encodingIsIdentityForSameTypes() const { return false; }
210 };
211 
212 } // namespace foreign_storage
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
ParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size)
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
#define UNREACHABLE()
Definition: Logger.h:241
virtual void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes)=0
void setNull(int8_t *omnisci_data_bytes) override
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
TypedParquetInPlaceEncoder(Data_Namespace::AbstractBuffer *buffer, const ColumnDescriptor *column_desciptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
virtual void setNull(int8_t *omnisci_data_bytes)=0
An AbstractBuffer is a unit of data management for a data manager.
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override
specifies the content in-memory of a row in the column metadata table
int64_t get_time_conversion_denominator(const parquet::LogicalType::TimeUnit::unit time_unit)
void decodeNullsAndEncodeData(int8_t *data_ptr, const int16_t *def_levels, const int64_t values_read, const int64_t levels_read, const bool do_encoding)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
#define CHECK(condition)
Definition: Logger.h:197
Data_Namespace::AbstractBuffer * buffer_
virtual void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination)=0