OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetStringNoneEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "LazyParquetChunkLoader.h"
20 #include "ParquetEncoder.h"
21 
22 #include <parquet/schema.h>
23 #include <parquet/types.h>
24 
25 namespace foreign_storage {
26 
28  public:
30  Data_Namespace::AbstractBuffer* index_buffer)
31  : ParquetEncoder(buffer)
32  , index_buffer_(index_buffer)
33  , encode_buffer_(LazyParquetChunkLoader::batch_reader_num_elements *
34  sizeof(StringOffsetT)) {}
35 
// Appends one batch of variable-length string values.
//
// Works in two passes over the `levels_read` entries of `def_levels`:
//   1. compute the running end offset of every entry and append those offsets
//      to index_buffer_;
//   2. copy the bytes of the accepted strings into encode_buffer_ and append
//      them to buffer_ in a single call.
//
// def_levels  - one entry per level; a non-zero entry consumes the next value
//               from `values`, a zero entry consumes none.
// rep_levels  - not referenced in the lines visible here.
// values_read - upper bound (CHECKed) on how many values may be consumed.
// levels_read - number of entries to process; must be > 0 (CHECKed).
// values      - reinterpreted as an array of parquet::ByteArray.
//
// NOTE(review): this listing is an extract; listing lines 42-43, 54, 72, 74,
// 81, 83 and 87-88 are missing, so parts of some conditions and statements
// below are not shown. Comments describe only what is visible.
36  void appendData(const int16_t* def_levels,
37  const int16_t* rep_levels,
38  const int64_t values_read,
39  const int64_t levels_read,
40  int8_t* values) override {
41  CHECK(levels_read > 0);
44 
    // encode_buffer_ is used first as scratch space for the offsets; last_offset
    // is the size of buffer_ before this batch is added.
45  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
46  auto offsets = reinterpret_cast<StringOffsetT*>(encode_buffer_.data());
47  auto last_offset = buffer_->size();
48 
    // Pass 1: accumulate string lengths and record, per entry, the offset at
    // which that entry's data ends. Entries that contribute no bytes repeat the
    // previous offset. The visible part of the skip condition rejects strings
    // longer than the column's get_max_strlen(); the first half of that
    // condition is on a missing listing line.
49  size_t total_len = 0;
50  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
51  if (def_levels[i]) {
52  CHECK(j < values_read);
53  auto& byte_array = parquet_data_ptr[j++];
55  byte_array.len > ParquetEncoder::column_type_.get_max_strlen()) {
56  // no-op, or effectively inserting a null: total_len += 0;
57  } else {
58  total_len += byte_array.len;
59  }
60  }
61  offsets[i] = last_offset + total_len;
62  }
    // One offset per entry is appended to the index buffer.
63  index_buffer_->append(encode_buffer_.data(), levels_read * sizeof(StringOffsetT));
64 
    // The offsets have been handed off, so encode_buffer_ is reused for the
    // string bytes. It only ever grows (std::max with its current size).
65  encode_buffer_.resize(std::max<size_t>(total_len, encode_buffer_.size()));
66  buffer_->reserve(buffer_->size() + total_len);
67  total_len = 0;
    // Pass 2: walk the entries again and pack the accepted strings back to back
    // into encode_buffer_. The visible skip test matches pass 1.
    // NOTE(review): the statements executed for rejected strings and for the
    // null-in-NOT-NULL-column case are on missing listing lines (74, 83); only
    // their trailing `i);` argument is visible - confirm against the full file.
68  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
69  if (def_levels[i]) {
70  CHECK(j < values_read);
71  auto& byte_array = parquet_data_ptr[j++];
73  byte_array.len > ParquetEncoder::column_type_.get_max_strlen()) {
75  i);
76  } else {
77  memcpy(encode_buffer_.data() + total_len, byte_array.ptr, byte_array.len);
78  total_len += byte_array.len;
79  }
80  } else if (is_error_tracking_enabled_ &&
82  .get_notnull()) { // item is null for NOT NULL column
84  i);
85  }
86  }
89  }
    // Append all packed string bytes for this batch in one call.
90  buffer_->append(encode_buffer_.data(), total_len);
91  }
92 
// Error-tracking entry point with the same parameter list as appendData.
// In the visible lines it forwards every argument unchanged to appendData.
// NOTE(review): listing line 98 is missing from this extract; presumably it
// sets up error tracking before the call - confirm against the full file.
93  void appendDataTrackErrors(const int16_t* def_levels,
94  const int16_t* rep_levels,
95  const int64_t values_read,
96  const int64_t levels_read,
97  int8_t* values) override {
99  appendData(def_levels, rep_levels, values_read, levels_read, values);
100  }
101 
102  private:
104  if (!index_buffer_->size()) {
105  // write the initial starting offset
106  StringOffsetT zero = 0;
107  index_buffer_->append(reinterpret_cast<int8_t*>(&zero), sizeof(StringOffsetT));
108  }
109  }
110 
112  std::vector<int8_t> encode_buffer_;
113 };
114 
115 } // namespace foreign_storage
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
Data_Namespace::AbstractBuffer * index_buffer_
RejectedRowIndices invalid_indices_
int32_t StringOffsetT
Definition: sqltypes.h:1493
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
An AbstractBuffer is a unit of data management for a data manager.
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
ParquetStringNoneEncoder(Data_Namespace::AbstractBuffer *buffer, Data_Namespace::AbstractBuffer *index_buffer)
#define CHECK(condition)
Definition: Logger.h:291
HOST DEVICE size_t get_max_strlen() const
Definition: sqltypes.h:405
Data_Namespace::AbstractBuffer * buffer_
virtual void reserve(size_t num_bytes)=0