OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetStringEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#pragma once

#include "LazyParquetChunkLoader.h"
#include "ParquetInPlaceEncoder.h"
// NOTE(review): one project include was lost in this scrape (original line 21)
// — restore it from the upstream repository.

#include <parquet/schema.h>
#include <parquet/types.h>

#include <algorithm>
#include <limits>
#include <memory>
#include <string_view>
#include <vector>
26 namespace foreign_storage {
27 
28 template <typename V>
30  public:
32  StringDictionary* string_dictionary,
33  ChunkMetadata* chunk_metadata)
34  : TypedParquetInPlaceEncoder<V, V>(buffer, sizeof(V), sizeof(V))
35  , string_dictionary_(string_dictionary)
36  , chunk_metadata_(chunk_metadata)
37  , encode_buffer_(LazyParquetChunkLoader::batch_reader_num_elements * sizeof(V))
38  , min_(std::numeric_limits<V>::max())
39  , max_(std::numeric_limits<V>::lowest())
41  , invalid_indices_(nullptr) {
42  if (chunk_metadata_) {
44  }
45  }
46 
47  // TODO: this member largely mirrors `appendDataTrackErrors` but is only used
48  // by the parquet-secific import FSI cut-over, and will be removed in the
49  // future
50  void validateAndAppendData(const int16_t* def_levels,
51  const int16_t* rep_levels,
52  const int64_t values_read,
53  const int64_t levels_read,
54  int8_t* values,
55  const SQLTypeInfo& column_type, /* may not be used */
56  InvalidRowGroupIndices& invalid_indices) override {
57  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
58  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
59  if (def_levels[i]) {
60  CHECK_LT(j, values_read);
61  auto& byte_array = parquet_data_ptr[j++];
62  if (byte_array.len > StringDictionary::MAX_STRLEN) {
63  invalid_indices.insert(current_batch_offset_ + i);
64  }
65  }
66  }
67  current_batch_offset_ += levels_read;
68  appendData(def_levels, rep_levels, values_read, levels_read, values);
69  }
70 
71  void appendDataTrackErrors(const int16_t* def_levels,
72  const int16_t* rep_levels,
73  const int64_t values_read,
74  const int64_t levels_read,
75  int8_t* values) override {
77  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
78  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
79  if (def_levels[i]) {
80  CHECK_LT(j, values_read);
81  auto& byte_array = parquet_data_ptr[j++];
82  if (byte_array.len > StringDictionary::MAX_STRLEN) {
84  i);
85  }
87  .get_notnull()) { // item is null for NOT NULL column
89  i);
90  }
91  }
93  appendData(def_levels, rep_levels, values_read, levels_read, values);
94  }
95 
96  void appendData(const int16_t* def_levels,
97  const int16_t* rep_levels,
98  const int64_t values_read,
99  const int64_t levels_read,
100  int8_t* values) override {
101  encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
103  def_levels, rep_levels, values_read, levels_read, encode_buffer_.data());
105  chunk_metadata_->chunkStats.has_nulls || (values_read < levels_read);
106  }
107 
108  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
109  int8_t* omnisci_data_bytes,
110  const size_t num_elements) override {
112  auto parquet_data_ptr =
113  reinterpret_cast<const parquet::ByteArray*>(parquet_data_bytes);
114  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
115  std::vector<std::string_view> string_views;
116  string_views.reserve(num_elements);
117  for (size_t i = 0; i < num_elements; ++i) {
118  auto& byte_array = parquet_data_ptr[i];
119  if (byte_array.len <= StringDictionary::MAX_STRLEN) {
120  string_views.emplace_back(reinterpret_cast<const char*>(byte_array.ptr),
121  byte_array.len);
122  } else {
123  string_views.emplace_back(nullptr, 0);
124  }
125  }
126  string_dictionary_->getOrAddBulk(string_views, omnisci_data_ptr);
127  updateMetadataStats(num_elements, omnisci_data_bytes);
128  }
129 
130  void encodeAndCopy(const int8_t* parquet_data_bytes,
131  int8_t* omnisci_data_bytes) override {
132  TypedParquetInPlaceEncoder<V, V>::copy(parquet_data_bytes, omnisci_data_bytes);
133  }
134 
135  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
136  const parquet::RowGroupMetaData* group_metadata,
137  const int parquet_column_index,
138  const SQLTypeInfo& column_type) override {
139  auto metadata = ParquetEncoder::getRowGroupMetadata(
140  group_metadata, parquet_column_index, column_type);
141  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
143  column_metadata->num_values();
144 
145  // Placeholder metadata is defined with has_nulls = false.
146  metadata->chunkStats.has_nulls = false;
147  return metadata;
148  }
149 
150  protected:
151  bool encodingIsIdentityForSameTypes() const override { return true; }
152 
153  private:
154  void updateMetadataStats(int64_t values_read, int8_t* values) {
155  if (!chunk_metadata_) {
156  return;
157  }
158  V* data_ptr = reinterpret_cast<V*>(values);
159  for (int64_t i = 0; i < values_read; ++i) {
160  min_ = std::min<V>(data_ptr[i], min_);
161  max_ = std::max<V>(data_ptr[i], max_);
162  }
164  }
165 
168  std::vector<int8_t> encode_buffer_;
169 
170  V min_, max_;
171 
174 };
175 
176 } // namespace foreign_storage
bool encodingIsIdentityForSameTypes() const override
void updateMetadataStats(int64_t values_read, int8_t *values)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
RejectedRowIndices invalid_indices_
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
void fillChunkStats(const T min, const T max, const bool has_nulls)
Definition: ChunkMetadata.h:51
bool has_nulls
Definition: ChunkMetadata.h:30
ChunkStats chunkStats
Definition: ChunkMetadata.h:37
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
std::set< int64_t > InvalidRowGroupIndices
An AbstractBuffer is a unit of data management for a data manager.
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
#define CHECK_LT(x, y)
Definition: Logger.h:303
virtual std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type)
ParquetStringEncoder(Data_Namespace::AbstractBuffer *buffer, StringDictionary *string_dictionary, ChunkMetadata *chunk_metadata)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
#define CHECK(condition)
Definition: Logger.h:291
void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes) override
static constexpr size_t MAX_STRLEN