OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetStringEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "LazyParquetChunkLoader.h"
20 #include "ParquetInPlaceEncoder.h"
22 
23 #include <parquet/schema.h>
24 #include <parquet/types.h>
25 
26 namespace foreign_storage {
27 
28 template <typename V>
30  public:
32  StringDictionary* string_dictionary,
33  ChunkMetadata* chunk_metadata)
34  : TypedParquetInPlaceEncoder<V, V>(buffer, sizeof(V), sizeof(V))
35  , string_dictionary_(string_dictionary)
36  , chunk_metadata_(chunk_metadata)
37  , encode_buffer_(LazyParquetChunkLoader::batch_reader_num_elements * sizeof(V))
38  , min_(std::numeric_limits<V>::max())
39  , max_(std::numeric_limits<V>::lowest())
41  , invalid_indices_(nullptr) {}
42 
43  // TODO: this member largely mirrors `appendDataTrackErrors` but is only used
44  // by the parquet-secific import FSI cut-over, and will be removed in the
45  // future
46  void validateAndAppendData(const int16_t* def_levels,
47  const int16_t* rep_levels,
48  const int64_t values_read,
49  const int64_t levels_read,
50  int8_t* values,
51  const SQLTypeInfo& column_type, /* may not be used */
52  InvalidRowGroupIndices& invalid_indices) override {
53  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
54  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
55  if (def_levels[i]) {
56  CHECK_LT(j, values_read);
57  auto& byte_array = parquet_data_ptr[j++];
58  if (byte_array.len > StringDictionary::MAX_STRLEN) {
59  invalid_indices.insert(current_batch_offset_ + i);
60  }
61  }
62  }
63  current_batch_offset_ += levels_read;
64  appendData(def_levels, rep_levels, values_read, levels_read, values);
65  }
66 
67  void appendDataTrackErrors(const int16_t* def_levels,
68  const int16_t* rep_levels,
69  const int64_t values_read,
70  const int64_t levels_read,
71  int8_t* values) override {
73  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
74  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
75  if (def_levels[i]) {
76  CHECK_LT(j, values_read);
77  auto& byte_array = parquet_data_ptr[j++];
78  if (byte_array.len > StringDictionary::MAX_STRLEN) {
80  i);
81  }
83  .get_notnull()) { // item is null for NOT NULL column
85  i);
86  }
87  }
89  appendData(def_levels, rep_levels, values_read, levels_read, values);
90  }
91 
92  void appendData(const int16_t* def_levels,
93  const int16_t* rep_levels,
94  const int64_t values_read,
95  const int64_t levels_read,
96  int8_t* values) override {
97  encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
99  def_levels, rep_levels, values_read, levels_read, encode_buffer_.data());
100  }
101 
102  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
103  int8_t* omnisci_data_bytes,
104  const size_t num_elements) override {
106  auto parquet_data_ptr =
107  reinterpret_cast<const parquet::ByteArray*>(parquet_data_bytes);
108  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
109  std::vector<std::string_view> string_views;
110  string_views.reserve(num_elements);
111  for (size_t i = 0; i < num_elements; ++i) {
112  auto& byte_array = parquet_data_ptr[i];
113  if (byte_array.len <= StringDictionary::MAX_STRLEN) {
114  string_views.emplace_back(reinterpret_cast<const char*>(byte_array.ptr),
115  byte_array.len);
116  } else {
117  string_views.emplace_back(nullptr, 0);
118  }
119  }
120  string_dictionary_->getOrAddBulk(string_views, omnisci_data_ptr);
121  updateMetadataStats(num_elements, omnisci_data_bytes);
122  }
123 
124  void encodeAndCopy(const int8_t* parquet_data_bytes,
125  int8_t* omnisci_data_bytes) override {
126  TypedParquetInPlaceEncoder<V, V>::copy(parquet_data_bytes, omnisci_data_bytes);
127  }
128 
129  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
130  const parquet::RowGroupMetaData* group_metadata,
131  const int parquet_column_index,
132  const SQLTypeInfo& column_type) override {
133  auto metadata = ParquetEncoder::getRowGroupMetadata(
134  group_metadata, parquet_column_index, column_type);
135  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
137  column_metadata->num_values();
138  return metadata;
139  }
140 
141  protected:
142  bool encodingIsIdentityForSameTypes() const override { return true; }
143 
144  private:
145  void updateMetadataStats(int64_t values_read, int8_t* values) {
146  if (!chunk_metadata_) {
147  return;
148  }
149  V* data_ptr = reinterpret_cast<V*>(values);
150  for (int64_t i = 0; i < values_read; ++i) {
151  min_ = std::min<V>(data_ptr[i], min_);
152  max_ = std::max<V>(data_ptr[i], max_);
153  }
155  }
156 
159  std::vector<int8_t> encode_buffer_;
160 
161  V min_, max_;
162 
165 };
166 
167 } // namespace foreign_storage
bool encodingIsIdentityForSameTypes() const override
void updateMetadataStats(int64_t values_read, int8_t *values)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
RejectedRowIndices invalid_indices_
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
void fillChunkStats(const T min, const T max, const bool has_nulls)
Definition: ChunkMetadata.h:74
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
std::set< int64_t > InvalidRowGroupIndices
An AbstractBuffer is a unit of data management for a data manager.
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
#define CHECK_LT(x, y)
Definition: Logger.h:233
virtual std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type)
ParquetStringEncoder(Data_Namespace::AbstractBuffer *buffer, StringDictionary *string_dictionary, ChunkMetadata *chunk_metadata)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
#define CHECK(condition)
Definition: Logger.h:223
void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes) override
static constexpr size_t MAX_STRLEN