OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetGeospatialImportEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/schema.h>
20 #include <parquet/types.h>
21 #include "GeospatialEncoder.h"
22 #include "ParquetEncoder.h"
24 
25 namespace foreign_storage {
26 
28  public GeospatialEncoder,
29  public ParquetImportEncoder {
30  public:
// Constructor taking only the geometry-validation flag.
// Passes nullptr to the ParquetEncoder base, forwards geo_validate_geometry to
// the GeospatialEncoder base, and initializes invalid_indices_ to nullptr.
// NOTE(review): listing line 34 (one entry of the initializer list) is absent
// from this extract — confirm against the original header.
31  ParquetGeospatialImportEncoder(const bool geo_validate_geometry)
32  : ParquetEncoder(nullptr)
33  , GeospatialEncoder(geo_validate_geometry)
35  , invalid_indices_(nullptr) {}
36 
// Constructor taking the list of chunks that back the geo column.
// Passes nullptr to the ParquetEncoder base, forwards chunks and
// geo_validate_geometry to the GeospatialEncoder base, nulls every buffer
// pointer that is visible in the initializer list, and then looks up the
// per-column buffers via chunks.begin()->getBuffer() and getBuffer(...).
// NOTE(review): listing lines 41, 46, 48, 52, 54, 57, 59, 63, 65, 69-71, 73,
// 78 and 80 are absent from this extract, so the left-hand sides of the buffer
// assignments (and anything done with the results) are not visible here —
// confirm against the original header before relying on this description.
37  ParquetGeospatialImportEncoder(std::list<Chunk_NS::Chunk>& chunks,
38  const bool geo_validate_geometry)
39  : ParquetEncoder(nullptr)
40  , GeospatialEncoder(chunks, geo_validate_geometry)
42  , invalid_indices_(nullptr)
43  , base_column_buffer_(nullptr)
44  , coords_column_buffer_(nullptr)
45  , bounds_column_buffer_(nullptr)
47  , poly_rings_column_buffer_(nullptr) {
49 
// SQL type of the geo column; passed to every getBuffer() call below.
50  const auto geo_column_type = geo_column_descriptor_->columnType.get_type();
51 
// The first chunk in the list supplies the buffer fetched here (presumably the
// base column buffer — the assignment target is on a missing line).
53  chunks.begin()->getBuffer());
55 
56  // initialize coords column
58  getBuffer(chunks, geo_column_type, COORDS));
60 
61  // initialize bounds column
62  if (hasBoundsColumn()) {
64  getBuffer(chunks, geo_column_type, BOUNDS));
66  }
67 
68  // initialize ring sizes column & render group column
// NOTE(review): only a RING_OR_LINE_SIZES buffer lookup is visible in this
// branch; no render group buffer appears in the visible code — confirm
// whether the "render group column" part of the comment above is stale.
72  getBuffer(chunks, geo_column_type, RING_OR_LINE_SIZES));
74  }
75 
76  // initialize poly rings column
77  if (hasPolyRingsColumn()) {
79  getBuffer(chunks, geo_column_type, POLY_RINGS));
81  }
82  }
83 
// Stores the address of the caller's invalid-index set in invalid_indices_
// and then delegates to appendData() with the remaining arguments unchanged.
//
// def_levels, rep_levels, values_read, levels_read, values: forwarded as-is
//   to appendData().
// column_type: not read in this body.
// invalid_indices: only its address is kept; this function does not reset the
//   pointer afterwards, so the referenced set presumably has to outlive any
//   later use of invalid_indices_ — verify against callers.
84  void validateAndAppendData(const int16_t* def_levels,
85  const int16_t* rep_levels,
86  const int64_t values_read,
87  const int64_t levels_read,
88  int8_t* values,
89  const SQLTypeInfo& column_type, /* may not be used */
90  InvalidRowGroupIndices& invalid_indices) override {
91  invalid_indices_ = &invalid_indices; // used in assembly algorithm
92  appendData(def_levels, rep_levels, values_read, levels_read, values);
93  }
94 
// NOTE(review): listing line 95 (the start of this signature) is absent from
// this extract; the symbol index at the end of the page lists this override as
// eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices&).
//
// Returns immediately when invalid_indices is empty. Otherwise calls
// eraseInvalidData(invalid_indices) on the base and coords column buffers, and
// on the bounds column buffer when hasBoundsColumn() is true.
// NOTE(review): the bodies of the hasRingOrLineSizesColumn() and
// hasPolyRingsColumn() branches (listing lines 106 and 109) are absent from
// this extract — confirm against the original header.
96  const InvalidRowGroupIndices& invalid_indices) override {
97  if (invalid_indices.empty()) {
98  return;
99  }
100  base_column_buffer_->eraseInvalidData(invalid_indices);
101  coords_column_buffer_->eraseInvalidData(invalid_indices);
102  if (hasBoundsColumn()) {
103  bounds_column_buffer_->eraseInvalidData(invalid_indices);
104  }
105  if (hasRingOrLineSizesColumn()) {
107  }
108  if (hasPolyRingsColumn()) {
110  }
111  }
112 
// Walks levels_read definition levels and feeds each non-null value to
// processGeoElement(), then calls appendBaseData(levels_read) and advances
// current_batch_offset_ by levels_read.
//
// def_levels: one entry per level; an entry equal to 0 takes the first branch
//   (presumably a null row — the branch body is on a missing listing line).
// rep_levels: not read in the lines visible here.
// values_read: upper bound on how many values may be consumed (CHECKed).
// levels_read: number of loop iterations and the count passed to appendBaseData().
// values: reinterpreted as an array of parquet::ByteArray.
//
// NOTE(review): listing lines 120, 123, 125, 134-138 and 143 are absent from
// this extract, so the null-row handling, the body of the catch handler and
// the statements around the loop are not visible — confirm against the
// original header.
113  void appendData(const int16_t* def_levels,
114  const int16_t* rep_levels,
115  const int64_t values_read,
116  const int64_t levels_read,
117  int8_t* values) override {
118  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
119 
121 
// i indexes definition levels; j indexes values and only advances in the else branch.
122  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
124  if (def_levels[i] == 0) {
126  } else {
127  CHECK(j < values_read);
128  auto& byte_array = parquet_data_ptr[j++];
// View the ByteArray's bytes as a string without copying them.
129  auto geo_string_view = std::string_view{
130  reinterpret_cast<const char*>(byte_array.ptr), byte_array.len};
131  try {
132  processGeoElement(geo_string_view);
133  } catch (const std::runtime_error& error) {
139  }
140  }
141  }
142 
144 
145  appendBaseData(levels_read);
146 
147  current_batch_offset_ += levels_read;
148  }
149 
// Override that is not supported by this encoder: the body consists solely of
// UNREACHABLE() with an explanatory message, and none of the parameters are read.
150  void appendDataTrackErrors(const int16_t* def_levels,
151  const int16_t* rep_levels,
152  const int64_t values_read,
153  const int64_t levels_read,
154  int8_t* values) override {
155  UNREACHABLE() << "unexpected call to appendDataTrackErrors from unsupported encoder";
156  }
157 
158  private:
// NOTE(review): listing line 159 (the start of this signature) is absent from
// this extract; the symbol index at the end of the page lists it as
// void appendArrayDatumsIfApplicable(TypedParquetStorageBuffer<ArrayDatum>*
// column_buffer, const std::vector<ArrayDatum>& datum_buffer).
//
// When column_buffer is non-null, appends every element of datum_buffer to it
// in order. When column_buffer is null, CHECKs that datum_buffer is empty.
160  const std::vector<ArrayDatum>& datum_buffer) {
161  if (column_buffer) {
162  for (const auto& datum : datum_buffer) {
163  column_buffer->appendElement(datum);
164  }
165  } else {
166  CHECK(datum_buffer.empty());
167  }
168  }
169 
176  }
177 
// Runs a loop of row_count iterations.
// NOTE(review): the loop body (listing line 180) is absent from this extract,
// so what is appended per row is not visible here — confirm against the
// original header.
178  void appendBaseData(const int64_t row_count) {
179  for (int64_t i = 0; i < row_count; ++i) {
181  }
182  }
183 
// Returns the buffer of the chunk that getIteratorForGeoColumnType() selects
// for the given SQL type and geo column kind.
//
// chunks: list searched by getIteratorForGeoColumnType().
// sql_type: SQL type of the geo column.
// geo_column_type: which geo sub-column to look up (the visible callers pass
//   COORDS, BOUNDS, RING_OR_LINE_SIZES and POLY_RINGS).
// The returned iterator is dereferenced without a check in this body;
// presumably the helper guarantees a valid iterator — verify.
184  AbstractBuffer* getBuffer(std::list<Chunk_NS::Chunk>& chunks,
185  const SQLTypes sql_type,
186  GeoColumnType geo_column_type) {
187  auto chunk = getIteratorForGeoColumnType(chunks, sql_type, geo_column_type);
188  auto buffer = chunk->getBuffer();
189  return buffer;
190  }
191 
199 };
200 
201 } // namespace foreign_storage
AbstractBuffer * getBuffer(std::list< Chunk_NS::Chunk > &chunks, const SQLTypes sql_type, GeoColumnType geo_column_type)
void eraseInvalidData(const FindContainer &invalid_indices)
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
SQLTypes
Definition: sqltypes.h:65
std::vector< ArrayDatum > coords_datum_buffer_
void appendDataTrackErrors(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
std::vector< ArrayDatum > ring_or_line_sizes_datum_buffer_
TypedParquetStorageBuffer< ArrayDatum > * coords_column_buffer_
#define UNREACHABLE()
Definition: Logger.h:338
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
TypedParquetStorageBuffer< std::string > * base_column_buffer_
std::vector< ArrayDatum > bounds_datum_buffer_
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
ParquetGeospatialImportEncoder(std::list< Chunk_NS::Chunk > &chunks, const bool geo_validate_geometry)
void appendArrayDatumsIfApplicable(TypedParquetStorageBuffer< ArrayDatum > *column_buffer, const std::vector< ArrayDatum > &datum_buffer)
std::set< int64_t > InvalidRowGroupIndices
An AbstractBuffer is a unit of data management for a data manager.
void processGeoElement(std::string_view geo_string_view)
ParquetGeospatialImportEncoder(const bool geo_validate_geometry)
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:595
TypedParquetStorageBuffer< ArrayDatum > * ring_or_line_sizes_column_buffer_
const ColumnDescriptor * geo_column_descriptor_
SQLTypeInfo columnType
TypedParquetStorageBuffer< ArrayDatum > * bounds_column_buffer_
std::vector< ArrayDatum > poly_rings_datum_buffer_
TypedParquetStorageBuffer< ArrayDatum > * poly_rings_column_buffer_
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
std::list< T >::iterator getIteratorForGeoColumnType(std::list< T > &list, const SQLTypes column_type, const GeoColumnType geo_column)