OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetStringEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "LazyParquetChunkLoader.h"
20 #include "ParquetInPlaceEncoder.h"
22 
23 #include <parquet/schema.h>
24 #include <parquet/types.h>
25 
26 namespace foreign_storage {
27 
28 template <typename V>
30  public:
32  StringDictionary* string_dictionary,
33  ChunkMetadata* chunk_metadata)
34  : TypedParquetInPlaceEncoder<V, V>(buffer, sizeof(V), sizeof(V))
35  , string_dictionary_(string_dictionary)
36  , chunk_metadata_(chunk_metadata)
37  , encode_buffer_(LazyParquetChunkLoader::batch_reader_num_elements * sizeof(V))
38  , min_(std::numeric_limits<V>::max())
39  , max_(std::numeric_limits<V>::lowest())
41  , invalid_indices_(nullptr) {}
42 
43  void validateAndAppendData(const int16_t* def_levels,
44  const int16_t* rep_levels,
45  const int64_t values_read,
46  const int64_t levels_read,
47  int8_t* values,
48  const SQLTypeInfo& column_type, /* may not be used */
49  InvalidRowGroupIndices& invalid_indices) override {
50  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
51  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
52  if (def_levels[i]) {
53  CHECK(j < values_read);
54  auto& byte_array = parquet_data_ptr[j++];
55  if (byte_array.len > StringDictionary::MAX_STRLEN) {
56  invalid_indices.insert(current_batch_offset_ + i);
57  }
58  }
59  }
60  current_batch_offset_ += levels_read;
61  encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
62  appendData(def_levels, rep_levels, values_read, levels_read, values);
63  }
64 
// Appends one batch: first encodes `values_read` entries of `values` into
// encode_buffer_ via encodeAndCopyContiguous(), then forwards the levels together
// with encode_buffer_.data() (instead of the raw `values`) to another call.
// NOTE(review): the line naming that callee is not present in this listing;
// presumably it is the base-class appendData() -- confirm against the real header.
65  void appendData(const int16_t* def_levels,
66  const int16_t* rep_levels,
67  const int64_t values_read,
68  const int64_t levels_read,
69  int8_t* values) override {
70  encodeAndCopyContiguous(values, encode_buffer_.data(), values_read);
72  def_levels, rep_levels, values_read, levels_read, encode_buffer_.data());
73  }
74 
// Encodes `num_elements` Parquet byte arrays through the string dictionary.
//   parquet_data_bytes: input, reinterpreted as parquet::ByteArray[num_elements]
//   omnisci_data_bytes: output, reinterpreted as V[num_elements]
// After encoding, the output is passed to updateMetadataStats().
// NOTE(review): one line at the top of the body is missing from this listing.
75  void encodeAndCopyContiguous(const int8_t* parquet_data_bytes,
76  int8_t* omnisci_data_bytes,
77  const size_t num_elements) override {
79  auto parquet_data_ptr =
80  reinterpret_cast<const parquet::ByteArray*>(parquet_data_bytes);
81  auto omnisci_data_ptr = reinterpret_cast<V*>(omnisci_data_bytes);
// One view per input element; the views point into the Parquet data, nothing is copied here.
82  std::vector<std::string_view> string_views;
83  string_views.reserve(num_elements);
84  for (size_t i = 0; i < num_elements; ++i) {
85  auto& byte_array = parquet_data_ptr[i];
86  if (byte_array.len <= StringDictionary::MAX_STRLEN) {
87  string_views.emplace_back(reinterpret_cast<const char*>(byte_array.ptr),
88  byte_array.len);
89  } else {
// Strings longer than MAX_STRLEN are replaced by an empty view so that the
// element count (and therefore the output positions) stays unchanged.
90  string_views.emplace_back(nullptr, 0);
91  }
92  }
// Bulk lookup/insert; the encoded values are written to omnisci_data_ptr.
93  string_dictionary_->getOrAddBulk(string_views, omnisci_data_ptr);
94  updateMetadataStats(num_elements, omnisci_data_bytes);
95  }
96 
97  void encodeAndCopy(const int8_t* parquet_data_bytes,
98  int8_t* omnisci_data_bytes) override {
99  TypedParquetInPlaceEncoder<V, V>::copy(parquet_data_bytes, omnisci_data_bytes);
100  }
101 
// Returns the metadata object produced by ParquetEncoder::getRowGroupMetadata()
// for the given row group / column, after one further statement that involves
// the column chunk's num_values().
// NOTE(review): the line holding the left-hand side of that statement is missing
// from this listing; presumably it assigns a byte count on `metadata` -- confirm.
102  std::shared_ptr<ChunkMetadata> getRowGroupMetadata(
103  const parquet::RowGroupMetaData* group_metadata,
104  const int parquet_column_index,
105  const SQLTypeInfo& column_type) override {
106  auto metadata = ParquetEncoder::getRowGroupMetadata(
107  group_metadata, parquet_column_index, column_type);
// Per-column-chunk metadata for the requested Parquet column of this row group.
108  auto column_metadata = group_metadata->ColumnChunk(parquet_column_index);
110  column_metadata->num_values();
111  return metadata;
112  }
113 
114  protected:
115  bool encodingIsIdentityForSameTypes() const override { return true; }
116 
117  private:
// Folds `values_read` encoded entries (read from `values` as V[]) into the running
// min_ / max_ members. Does nothing when no chunk_metadata_ was supplied.
// NOTE(review): one line after the loop is missing from this listing; presumably
// it publishes min_/max_ to chunk_metadata_ -- confirm against the real header.
118  void updateMetadataStats(int64_t values_read, int8_t* values) {
119  if (!chunk_metadata_) {
120  return;
121  }
122  V* data_ptr = reinterpret_cast<V*>(values);
123  for (int64_t i = 0; i < values_read; ++i) {
124  min_ = std::min<V>(data_ptr[i], min_);
125  max_ = std::max<V>(data_ptr[i], max_);
126  }
128  }
129 
// Scratch buffer that receives the encoded batch; the constructor sizes it to
// LazyParquetChunkLoader::batch_reader_num_elements * sizeof(V) bytes.
132  std::vector<int8_t> encode_buffer_;
133 
// Running minimum / maximum of the encoded values seen by updateMetadataStats().
// The constructor seeds min_ with numeric_limits<V>::max() and max_ with
// numeric_limits<V>::lowest(), so the first observed value replaces both.
134  V min_, max_;
135 
138 };
139 
140 } // namespace foreign_storage
bool encodingIsIdentityForSameTypes() const override
void updateMetadataStats(int64_t values_read, int8_t *values)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
void copy(const int8_t *omnisci_data_bytes_source, int8_t *omnisci_data_bytes_destination) override
void fillChunkStats(const T min, const T max, const bool has_nulls)
Definition: ChunkMetadata.h:73
std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type) override
std::set< int64_t > InvalidRowGroupIndices
An AbstractBuffer is a unit of data management for a data manager.
void getOrAddBulk(const std::vector< String > &string_vec, T *encoded_vec)
virtual std::shared_ptr< ChunkMetadata > getRowGroupMetadata(const parquet::RowGroupMetaData *group_metadata, const int parquet_column_index, const SQLTypeInfo &column_type)
ParquetStringEncoder(Data_Namespace::AbstractBuffer *buffer, StringDictionary *string_dictionary, ChunkMetadata *chunk_metadata)
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
void encodeAndCopyContiguous(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes, const size_t num_elements) override
#define CHECK(condition)
Definition: Logger.h:209
void encodeAndCopy(const int8_t *parquet_data_bytes, int8_t *omnisci_data_bytes) override
static constexpr size_t MAX_STRLEN