OmniSciDB c1a53651b2
ParquetStringEncoder.h
/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "LazyParquetChunkLoader.h"
#include "ParquetInPlaceEncoder.h"
#include "StringDictionary/StringDictionary.h"

#include <parquet/schema.h>
#include <parquet/types.h>
namespace foreign_storage {

template <typename V>
class ParquetStringEncoder : public TypedParquetInPlaceEncoder<V, V> {
 public:
  ParquetStringEncoder(Data_Namespace::AbstractBuffer* buffer,
                       StringDictionary* string_dictionary,
                       ChunkMetadata* chunk_metadata)
      : TypedParquetInPlaceEncoder<V, V>(buffer, sizeof(V), sizeof(V))
      , string_dictionary_(string_dictionary)
      , chunk_metadata_(chunk_metadata)
      , encode_buffer_(LazyParquetChunkLoader::batch_reader_num_elements * sizeof(V))
      , min_(std::numeric_limits<V>::max())
      , max_(std::numeric_limits<V>::lowest())
      , current_batch_offset_(0)
      , invalid_indices_(nullptr) {}

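  // Validation-only variant of `appendDataTrackErrors` below: over-length
  // strings are reported through the caller-supplied `invalid_indices` rather
  // than through the encoder's own error-tracking state.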
  // TODO: this member largely mirrors `appendDataTrackErrors` but is only used
  // by the parquet-specific import FSI cut-over, and will be removed in the
  // future
  void validateAndAppendData(const int16_t* def_levels,
                             const int16_t* rep_levels,
                             const int64_t values_read,
                             const int64_t levels_read,
                             int8_t* values,
                             const SQLTypeInfo& column_type, /* may not be used */
                             InvalidRowGroupIndices& invalid_indices) override {
    auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
    for (int64_t i = 0, j = 0; i < levels_read; ++i) {
      if (def_levels[i]) {
        CHECK_LT(j, values_read);
        auto& byte_array = parquet_data_ptr[j++];
        if (byte_array.len > StringDictionary::MAX_STRLEN) {
          invalid_indices.insert(current_batch_offset_ + i);
        }
      }
    }
    current_batch_offset_ += levels_read;
    appendData(def_levels, rep_levels, values_read, levels_read, values);
  }

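  // Appends a batch while tracking unimportable rows: strings longer than
  // StringDictionary::MAX_STRLEN, and nulls arriving for a NOT NULL column,
  // are recorded by chunk-level offset in the base encoder's invalid_indices_.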
  void appendDataTrackErrors(const int16_t* def_levels,
                             const int16_t* rep_levels,
                             const int64_t values_read,
                             const int64_t levels_read,
                             int8_t* values) override {
    CHECK(ParquetEncoder::is_error_tracking_enabled_);
    auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
    for (int64_t i = 0, j = 0; i < levels_read; ++i) {
      if (def_levels[i]) {
        CHECK_LT(j, values_read);
        auto& byte_array = parquet_data_ptr[j++];
        if (byte_array.len > StringDictionary::MAX_STRLEN) {
          ParquetEncoder::invalid_indices_.insert(ParquetEncoder::current_chunk_offset_ +
                                                  i);
        }
      } else if (ParquetEncoder::column_type_
                     .get_notnull()) {  // item is null for NOT NULL column
        ParquetEncoder::invalid_indices_.insert(ParquetEncoder::current_chunk_offset_ +
                                                i);
      }
    }
    ParquetEncoder::current_chunk_offset_ += levels_read;
    appendData(def_levels, rep_levels, values_read, levels_read, values);
  }

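  // Dictionary-encodes the batch into encode_buffer_, then delegates the
  // actual append to the in-place encoder. A batch holding fewer values than
  // levels must contain nulls, which is how has_nulls is derived below.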
  void appendData(const int16_t* def_levels,
                  const int16_t* rep_levels,
                  const int64_t values_read,
                  const int64_t levels_read,
                  int8_t* values) override {
    encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
    TypedParquetInPlaceEncoder<V, V>::appendData(
        def_levels, rep_levels, values_read, levels_read, encode_buffer_.data());
    chunk_metadata_->chunkStats.has_nulls =
        chunk_metadata_->chunkStats.has_nulls || (values_read < levels_read);
  }

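  // Translates a contiguous run of Parquet byte arrays into dictionary IDs
  // with a single bulk dictionary call. Over-length strings become empty
  // views here; the append paths above are responsible for flagging them.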
  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
                               int8_t* omnisci_data_bytes,
                               const size_t num_elements) override {
    CHECK(string_dictionary_);
    auto parquet_data_ptr =
        reinterpret_cast<const parquet::ByteArray*>(parquet_data_bytes);
    auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
    std::vector<std::string_view> string_views;
    string_views.reserve(num_elements);
    for (size_t i = 0; i < num_elements; ++i) {
      auto& byte_array = parquet_data_ptr[i];
      if (byte_array.len <= StringDictionary::MAX_STRLEN) {
        string_views.emplace_back(reinterpret_cast<const char*>(byte_array.ptr),
                                  byte_array.len);
      } else {
        string_views.emplace_back(nullptr, 0);
      }
    }
    string_dictionary_->getOrAddBulk(string_views, omnisci_data_ptr);
    updateMetadataStats(num_elements, omnisci_data_bytes);
  }

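  // Source and destination types are identical here (see
  // encodingIsIdentityForSameTypes below), so per-element encoding is a copy.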
  void encodeAndCopy(const int8_t* parquet_data_bytes,
                     int8_t* omnisci_data_bytes) override {
    TypedParquetInPlaceEncoder<V, V>::copy(parquet_data_bytes, omnisci_data_bytes);
  }

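  // Row group metadata for dictionary-encoded strings is only a placeholder:
  // numBytes is sized from the fixed-width dictionary ID type V, and, per the
  // note below, placeholder stats are defined with has_nulls = false.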
  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
      const parquet::RowGroupMetaData* group_metadata,
      const int parquet_column_index,
      const SQLTypeInfo& column_type) override {
    auto metadata = ParquetEncoder::getRowGroupMetadata(
        group_metadata, parquet_column_index, column_type);
    auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
    metadata->numBytes = sizeof(V) * column_metadata->num_values();

    // Placeholder metadata is defined with has_nulls = false.
    metadata->chunkStats.has_nulls = false;
    return metadata;
  }

 protected:
  bool encodingIsIdentityForSameTypes() const override { return true; }

 private:
  void updateMetadataStats(int64_t values_read, int8_t* values) {
    if (!chunk_metadata_) {
      return;
    }
    V* data_ptr = reinterpret_cast<V*>(values);
    for (int64_t i = 0; i < values_read; ++i) {
      min_ = std::min<V>(data_ptr[i], min_);
      max_ = std::max<V>(data_ptr[i], max_);
    }
    chunk_metadata_->fillChunkStats(min_, max_, chunk_metadata_->chunkStats.has_nulls);
  }

  StringDictionary* string_dictionary_;
  ChunkMetadata* chunk_metadata_;
  std::vector<int8_t> encode_buffer_;

  V min_, max_;

  int64_t current_batch_offset_;
  InvalidRowGroupIndices* invalid_indices_;
};

}  // namespace foreign_storage
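The sketch below is not part of the HeavyDB sources; it is a minimal, self-contained illustration of the two ideas the encoder relies on, with a hypothetical ToyDictionary standing in for StringDictionary. Parquet definition levels mark which rows are null while values stay densely packed, so values_read < levels_read implies nulls (the inference appendData makes), and a batch of byte arrays is translated to fixed-width dictionary IDs in one bulk call, mirroring encodeAndCopyContiguous.

#include <cstdint>
#include <iostream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

// Mirrors parquet::ByteArray: a (length, pointer) pair into variable-length data.
struct ByteArray {
  uint32_t len;
  const uint8_t* ptr;
};

// Hypothetical stand-in for StringDictionary: maps each distinct string to a
// stable 32-bit ID, the fixed-width value the real encoder writes out as V.
struct ToyDictionary {
  std::unordered_map<std::string, int32_t> ids;
  void getOrAddBulk(const std::vector<std::string_view>& views, int32_t* out) {
    for (size_t i = 0; i < views.size(); ++i) {
      auto [it, inserted] =
          ids.try_emplace(std::string(views[i]), static_cast<int32_t>(ids.size()));
      out[i] = it->second;
    }
  }
};

int main() {
  auto ba = [](const char* s, uint32_t n) {
    return ByteArray{n, reinterpret_cast<const uint8_t*>(s)};
  };

  // Three levels, two values: def_level 0 marks a null, and non-null values
  // stay densely packed, so values_read (2) < levels_read (3) implies nulls.
  std::vector<int16_t> def_levels = {1, 0, 1};
  std::vector<ByteArray> values = {ba("hi", 2), ba("ho", 2)};

  // Mirror encodeAndCopyContiguous: gather views, then bulk-encode to IDs.
  std::vector<std::string_view> views;
  views.reserve(values.size());
  for (const auto& v : values) {
    views.emplace_back(reinterpret_cast<const char*>(v.ptr), v.len);
  }
  std::vector<int32_t> encoded(values.size());
  ToyDictionary dict;
  dict.getOrAddBulk(views, encoded.data());

  for (size_t i = 0; i < encoded.size(); ++i) {
    std::cout << views[i] << " -> " << encoded[i] << '\n';  // hi -> 0, ho -> 1
  }
}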