OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetGeospatialImportEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/schema.h>
20 #include <parquet/types.h>
21 #include "GeospatialEncoder.h"
22 #include "ParquetEncoder.h"
24 
25 namespace foreign_storage {
26 
28  public GeospatialEncoder,
29  public ParquetImportEncoder {
30  public:
// Initializer list of a constructor whose declarator (listing line 31) is absent
// from this extract. The visible initializers pass nullptr to both base-class
// constructors and start invalid_indices_ as null.
32  : ParquetEncoder(nullptr)
33  , GeospatialEncoder(nullptr)
// NOTE(review): listing line 34 is absent from this extract (presumably one more
// member initializer) -- confirm against the real header.
35  , invalid_indices_(nullptr) {}
36 
// Chunk-backed constructor. Starts the visible buffer pointers as null, then
// looks up one buffer per geo sub-column in `chunks` through getBuffer(),
// guarded by the has*Column() predicates where a sub-column is optional.
//
// NOTE(review): this listing is an HTML extract in which hyperlinked lines were
// dropped (the embedded numbering skips 40, 45, 48, 52, 54, 57, 59, 63, 65,
// 69-71, 73, 77, 82 and 84). The assignment targets of most getBuffer() calls
// are therefore not visible here; comments below hedge accordingly.
37  ParquetGeospatialImportEncoder(std::list<Chunk_NS::Chunk>& chunks)
38  : ParquetEncoder(nullptr)
39  , GeospatialEncoder(chunks, nullptr)
41  , invalid_indices_(nullptr)
42  , base_column_buffer_(nullptr)
43  , coords_column_buffer_(nullptr)
44  , bounds_column_buffer_(nullptr)
46  , poly_rings_column_buffer_(nullptr)
47  , render_group_column_buffer_(nullptr) {
49 
// SQL type of the geo column; handed to every getBuffer() lookup below.
50  const auto geo_column_type = geo_column_descriptor_->columnType.get_type();
51 
// The first chunk in the list supplies this buffer. Presumably it is stored in
// base_column_buffer_ (target is on absent line 52) -- confirm.
53  chunks.begin()->getBuffer());
55 
56  // initialize coords column
// The extra closing parenthesis shows the lookup result is wrapped by an
// expression that opens on the absent preceding line.
58  getBuffer(chunks, geo_column_type, COORDS));
60 
61  // initialize bounds column
62  if (hasBoundsColumn()) {
64  getBuffer(chunks, geo_column_type, BOUNDS));
66  }
67 
68  // initialize ring sizes column & render group column
// The guarding condition is on absent lines 69-71; presumably
// hasRingOrLineSizesColumn(), as used in eraseInvalidIndicesInBuffer -- confirm.
72  getBuffer(chunks, geo_column_type, RING_OR_LINE_SIZES));
74  }
75  if (hasRenderGroupColumn()) {
// Unlike the lookups above, this result is assigned directly with no wrapper.
76  render_group_column_buffer_ = getBuffer(chunks, geo_column_type, RENDER_GROUP);
78  }
79 
80  // initialize poly rings column
81  if (hasPolyRingsColumn()) {
83  getBuffer(chunks, geo_column_type, POLY_RINGS));
85  }
86  }
87 
88  void validateAndAppendData(const int16_t* def_levels,
89  const int16_t* rep_levels,
90  const int64_t values_read,
91  const int64_t levels_read,
92  int8_t* values,
93  const SQLTypeInfo& column_type, /* may not be used */
94  InvalidRowGroupIndices& invalid_indices) override {
95  invalid_indices_ = &invalid_indices; // used in assembly algorithm
96  appendData(def_levels, rep_levels, values_read, levels_read, values);
97  }
98 
100  const InvalidRowGroupIndices& invalid_indices) override {
// Drops the rows named in `invalid_indices` from this encoder's column buffers.
// The first signature line (listing line 99) is absent from this extract; the
// symbol index at the end of the listing gives the full declaration as
// `void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices&) override`.
// Nothing to erase: leave every buffer untouched.
101  if (invalid_indices.empty()) {
102  return;
103  }
// Base and coords buffers are erased unconditionally; the remaining sub-columns
// only when the matching has*Column() predicate holds.
104  base_column_buffer_->eraseInvalidData(invalid_indices);
105  coords_column_buffer_->eraseInvalidData(invalid_indices);
106  if (hasBoundsColumn()) {
107  bounds_column_buffer_->eraseInvalidData(invalid_indices);
108  }
// NOTE(review): the bodies of the next two branches (listing lines 110 and 113)
// are absent from this extract; presumably the same eraseInvalidData() call on
// the ring/line-sizes and poly-rings buffers -- confirm.
109  if (hasRingOrLineSizesColumn()) {
111  }
112  if (hasPolyRingsColumn()) {
114  }
115  if (hasRenderGroupColumn()) {
// The call that receives this argument is on absent line 116 (presumably
// setSize() on render_group_column_buffer_ -- confirm).
// NOTE(review): the argument is sizeof(int32_t) * (size() - number of invalid
// rows). That is only a byte count if size() reports elements; if size()
// reports bytes, the expression over-scales. Confirm the units of size().
117  sizeof(int32_t) *
118  (render_group_column_buffer_->size() - invalid_indices.size()));
119  }
120  }
121 
// Appends one batch of geo values to the column buffers.
//
// `values` is reinterpreted as an array of parquet::ByteArray. The loop walks
// `levels_read` definition levels; each level with a non-zero definition level
// consumes the next ByteArray and hands its bytes to processGeoElement() as a
// string_view. `rep_levels` is not read in the lines visible here.
//
// NOTE(review): this listing is an HTML extract with hyperlinked lines dropped
// (129, 132, 134, 143-147 and 152 are absent), so the per-batch and per-row
// setup calls, the null-element handling and the catch body are not shown.
122  void appendData(const int16_t* def_levels,
123  const int16_t* rep_levels,
124  const int64_t values_read,
125  const int64_t levels_read,
126  int8_t* values) override {
127  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
128 
130 
// i indexes definition levels, j indexes values; j advances only for levels
// that carry a value, so the two can diverge.
131  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
// Definition level 0 takes the branch that consumes no value; its body is on
// absent line 134 (presumably null-element handling -- confirm).
133  if (def_levels[i] == 0) {
135  } else {
// Guard against reading past the values actually supplied.
136  CHECK(j < values_read);
137  auto& byte_array = parquet_data_ptr[j++];
// Non-owning view over the ByteArray payload; no copy is made here.
138  auto geo_string_view = std::string_view{
139  reinterpret_cast<const char*>(byte_array.ptr), byte_array.len};
140  try {
141  processGeoElement(geo_string_view);
142  } catch (const std::runtime_error& error) {
// Handler body (listing lines 143-147) is absent from this extract.
// Presumably it records the row in invalid_indices_ -- confirm.
148  }
149  }
150  }
151 
153 
// Base/render-group data is appended once per batch, one entry per level.
154  appendBaseAndRenderGroupData(levels_read);
155 
// Advance the running row offset by the number of levels in this batch.
156  current_batch_offset_ += levels_read;
157  }
158 
159  void appendDataTrackErrors(const int16_t* def_levels,
160  const int16_t* rep_levels,
161  const int64_t values_read,
162  const int64_t levels_read,
163  int8_t* values) override {
164  UNREACHABLE() << "unexpected call to appendDataTrackErrors from unsupported encoder";
165  }
166 
167  private:
169  const std::vector<ArrayDatum>& datum_buffer) {
// Appends every datum in `datum_buffer` to `column_buffer`, in order, when the
// buffer exists. The first signature line (listing line 168) is absent from this
// extract; the symbol index at the end of the listing declares `column_buffer`
// as TypedParquetStorageBuffer<ArrayDatum>*.
170  if (column_buffer) {
171  for (const auto& datum : datum_buffer) {
172  column_buffer->appendElement(datum);
173  }
174  } else {
// No destination buffer: nothing may have been staged for this sub-column.
175  CHECK(datum_buffer.empty());
176  }
177  }
178 
185  }
186 
// Appends `row_count` rows' worth of base-column and render-group data.
//
// NOTE(review): listing lines 189 (loop body) and 191 (the statement that opens
// the scope closed on line 194, presumably `if (hasRenderGroupColumn()) {`) are
// absent from this extract -- confirm against the real header.
187  void appendBaseAndRenderGroupData(const int64_t row_count) {
188  for (int64_t i = 0; i < row_count; ++i) {
190  }
// render_group_value_buffer_ is a std::vector<int32_t> (per the symbol index at
// the end of the listing); its storage is appended as raw bytes,
// sizeof(int32_t) per row.
// NOTE(review): nothing visible here guarantees the vector holds at least
// `row_count` elements -- confirm where it is sized.
192  auto data_ptr = reinterpret_cast<int8_t*>(render_group_value_buffer_.data());
193  render_group_column_buffer_->append(data_ptr, sizeof(int32_t) * row_count);
194  }
195  }
196 
197  AbstractBuffer* getBuffer(std::list<Chunk_NS::Chunk>& chunks,
198  const SQLTypes sql_type,
199  GeoColumnType geo_column_type) {
200  auto chunk = getIteratorForGeoColumnType(chunks, sql_type, geo_column_type);
201  auto buffer = chunk->getBuffer();
202  return buffer;
203  }
204 
213 };
214 
215 } // namespace foreign_storage
AbstractBuffer * getBuffer(std::list< Chunk_NS::Chunk > &chunks, const SQLTypes sql_type, GeoColumnType geo_column_type)
void eraseInvalidData(const FindContainer &invalid_indices)
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
SQLTypes
Definition: sqltypes.h:55
std::vector< ArrayDatum > coords_datum_buffer_
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
std::vector< ArrayDatum > ring_or_line_sizes_datum_buffer_
TypedParquetStorageBuffer< ArrayDatum > * coords_column_buffer_
#define UNREACHABLE()
Definition: Logger.h:337
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
TypedParquetStorageBuffer< std::string > * base_column_buffer_
std::vector< ArrayDatum > bounds_datum_buffer_
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:381
void appendArrayDatumsIfApplicable(TypedParquetStorageBuffer< ArrayDatum > *column_buffer, const std::vector< ArrayDatum > &datum_buffer)
std::vector< int32_t > render_group_value_buffer_
std::set< int64_t > InvalidRowGroupIndices
An AbstractBuffer is a unit of data management for a data manager.
void processGeoElement(std::string_view geo_string_view)
ParquetGeospatialImportEncoder(std::list< Chunk_NS::Chunk > &chunks)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void setSize(const size_t size)
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:592
TypedParquetStorageBuffer< ArrayDatum > * ring_or_line_sizes_column_buffer_
const ColumnDescriptor * geo_column_descriptor_
SQLTypeInfo columnType
TypedParquetStorageBuffer< ArrayDatum > * bounds_column_buffer_
std::vector< ArrayDatum > poly_rings_datum_buffer_
TypedParquetStorageBuffer< ArrayDatum > * poly_rings_column_buffer_
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
std::list< T >::iterator getIteratorForGeoColumnType(std::list< T > &list, const SQLTypes column_type, const GeoColumnType geo_column)