OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParquetGeospatialImportEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <parquet/schema.h>
20 #include <parquet/types.h>
21 #include "GeospatialEncoder.h"
22 #include "ParquetEncoder.h"
24 
25 namespace foreign_storage {
26 
28  public GeospatialEncoder,
29  public ParquetImportEncoder {
30  public:
// Member-initializer list of a constructor whose declaration line is not
// present in this listing (the embedded numbering skips 31, 33 and 34).
// The visible initializers pass nullptr to the ParquetEncoder base and set
// invalid_indices_ to nullptr; the constructor body is empty.
32  : ParquetEncoder(nullptr)
35  , invalid_indices_(nullptr) {}
36 
// Constructs the encoder over a caller-supplied list of chunks.
//
// chunks: forwarded to the GeospatialEncoder base, and used below to look up
//         the buffer behind each geo sub-column. The list is taken by
//         non-const reference.
//
// All *_column_buffer_ members and invalid_indices_ start out as nullptr in
// the initializer list. The bounds, ring sizes, render group and poly rings
// lookups each sit behind the matching has*Column() check, so those members
// keep their nullptr value when the check is false.
//
// NOTE(review): several lines of this constructor are absent from this
// listing (the embedded numbering skips 40, 48, 52, 54, 57, 59, 63, 65, 70,
// 72, 76, 81 and 83). The left-hand sides of most assignments are therefore
// not visible; only the render group assignment can be read in full.
37  ParquetGeospatialImportEncoder(std::list<Chunk_NS::Chunk>& chunks)
38  : ParquetEncoder(nullptr)
39  , GeospatialEncoder(chunks)
41  , invalid_indices_(nullptr)
42  , base_column_buffer_(nullptr)
43  , coords_column_buffer_(nullptr)
44  , bounds_column_buffer_(nullptr)
45  , ring_sizes_column_buffer_(nullptr)
46  , poly_rings_column_buffer_(nullptr)
47  , render_group_column_buffer_(nullptr) {
49 
// SQL type of the geo column; passed to every getBuffer() lookup below.
50  const auto geo_column_type = geo_column_descriptor_->columnType.get_type();
51 
// Tail of a statement that reads the buffer of the first chunk in the list.
// Presumably this initializes base_column_buffer_ -- the start of the
// statement is not visible here, TODO confirm.
53  chunks.begin()->getBuffer());
55 
56  // initialize coords column
58  getBuffer(chunks, geo_column_type, COORDS));
60 
61  // initialize bounds column
62  if (hasBoundsColumn()) {
64  getBuffer(chunks, geo_column_type, BOUNDS));
66  }
67 
68  // initialize ring sizes column & render group column
69  if (hasRingSizesColumn()) {
71  getBuffer(chunks, geo_column_type, RING_SIZES));
73  }
74  if (hasRenderGroupColumn()) {
// Unlike the other lookups, the getBuffer() result is stored directly.
75  render_group_column_buffer_ = getBuffer(chunks, geo_column_type, RENDER_GROUP);
77  }
78 
79  // initialize poly rings column
80  if (hasPolyRingsColumn()) {
82  getBuffer(chunks, geo_column_type, POLY_RINGS));
84  }
85  }
86 
// Appends one batch of Parquet values while recording which rows are invalid.
//
// def_levels, rep_levels, values_read, levels_read, values:
//     forwarded unchanged to appendData().
// column_type:     not read by this body.
// invalid_indices: caller-owned set; its address is stored in
//                  invalid_indices_ before appendData() runs.
//
// The stored pointer refers to the caller's object and is not reset here, so
// it is only meaningful while that object is alive.
87  void validateAndAppendData(const int16_t* def_levels,
88  const int16_t* rep_levels,
89  const int64_t values_read,
90  const int64_t levels_read,
91  int8_t* values,
92  const SQLTypeInfo& column_type, /* may not be used */
93  InvalidRowGroupIndices& invalid_indices) override {
94  invalid_indices_ = &invalid_indices; // used in assembly algorithm
95  appendData(def_levels, rep_levels, values_read, levels_read, values);
96  }
97 
// Removes the rows listed in invalid_indices from the column buffers.
// The first line of this signature (embedded line 98) is not present in this
// listing; the cross-reference index at the end of the page names an override
// with this parameter list eraseInvalidIndicesInBuffer.
//
// invalid_indices: set of row indices to drop. An empty set returns at once
//                  without touching any buffer.
//
// The base and coords buffers are always processed; the bounds buffer only
// when hasBoundsColumn() is true.
99  const InvalidRowGroupIndices& invalid_indices) override {
100  if (invalid_indices.empty()) {
101  return;
102  }
103  base_column_buffer_->eraseInvalidData(invalid_indices);
104  coords_column_buffer_->eraseInvalidData(invalid_indices);
105  if (hasBoundsColumn()) {
106  bounds_column_buffer_->eraseInvalidData(invalid_indices);
107  }
// The bodies of the next two branches (embedded lines 109 and 112) are not
// present in this listing.
108  if (hasRingSizesColumn()) {
110  }
111  if (hasPolyRingsColumn()) {
113  }
114  if (hasRenderGroupColumn()) {
// Only the argument of this call is visible; the call itself (embedded line
// 115) is missing from the listing.
// NOTE(review): elsewhere in this class the render group buffer is appended
// to with byte counts (sizeof(int32_t) * row_count). If size() also reports
// bytes, this expression subtracts an element count from a byte count and
// then scales by sizeof(int32_t) -- confirm the units of size().
116  sizeof(int32_t) *
117  (render_group_column_buffer_->size() - invalid_indices.size()));
118  }
119  }
120 
// Processes one batch of Parquet byte-array values as geo elements.
//
// def_levels:  one definition level per row; read for indices
//              [0, levels_read).
// rep_levels:  not read by the lines visible here.
// values_read: number of entries available in values; checked with CHECK
//              before each value is consumed.
// levels_read: number of rows in the batch; drives the loop, is passed to
//              appendBaseAndRenderGroupData(), and is added to
//              current_batch_offset_ at the end.
// values:      reinterpreted as an array of parquet::ByteArray.
//
// NOTE(review): embedded lines 128, 131, 133, 141-146 and 151 are absent from
// this listing, so the handling of level-0 rows and of caught errors, and any
// work done just before and after the loop, cannot be described from here.
121  void appendData(const int16_t* def_levels,
122  const int16_t* rep_levels,
123  const int64_t values_read,
124  const int64_t levels_read,
125  int8_t* values) override {
126  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
127 
129 
// i walks the rows; j walks the values and only advances in the else branch.
130  for (int64_t i = 0, j = 0; i < levels_read; ++i) {
// A definition level of 0 consumes no value. Presumably this is the null-row
// case -- the branch body is not visible, TODO confirm.
132  if (def_levels[i] == 0) {
134  } else {
135  CHECK(j < values_read);
136  auto& byte_array = parquet_data_ptr[j++];
// View over the value's bytes (pointer plus length); no copy is made here.
137  auto geo_string_view = std::string_view{
138  reinterpret_cast<const char*>(byte_array.ptr), byte_array.len};
139  try {
140  processGeoElement(geo_string_view);
141  } catch (const std::runtime_error& error) {
147  }
148  }
149  }
150 
152 
153  appendBaseAndRenderGroupData(levels_read);
154 
155  current_batch_offset_ += levels_read;
156  }
157 
158  private:
// Appends every datum in datum_buffer to column_buffer, when there is one.
// The first line of this signature (embedded line 159) is not present in this
// listing; the cross-reference index at the end of the page gives the name
// appendArrayDatumsIfApplicable and a leading column_buffer pointer parameter.
//
// column_buffer: destination; may be null.
// datum_buffer:  elements to append, in order, via appendElement().
//
// With a null column_buffer nothing is appended, and CHECK asserts that
// datum_buffer is empty.
160  const std::vector<ArrayDatum>& datum_buffer) {
161  if (column_buffer) {
162  for (const auto& datum : datum_buffer) {
163  column_buffer->appendElement(datum);
164  }
165  } else {
166  CHECK(datum_buffer.empty());
167  }
168  }
169 
175  }
176 
// Appends row_count rows' worth of base-column and render-group data.
//
// row_count: number of rows in the batch just processed.
//
// NOTE(review): the loop body (embedded line 179) and the line that opens the
// block closed at embedded line 185 (embedded line 181) are absent from this
// listing. Presumably the latter is a guard on the render group buffer --
// TODO confirm.
177  void appendBaseAndRenderGroupData(const int64_t row_count) {
178  for (int64_t i = 0; i < row_count; ++i) {
180  }
// Size the scratch vector to row_count (new elements are 0), then append its
// raw bytes: row_count values of sizeof(int32_t) bytes each.
182  render_group_values_.resize(row_count, 0);
183  auto data_ptr = reinterpret_cast<int8_t*>(render_group_values_.data());
184  render_group_column_buffer_->append(data_ptr, sizeof(int32_t) * row_count);
185  }
186  }
187 
// Returns the buffer of the chunk that holds the requested geo sub-column.
//
// chunks:          list searched by getIteratorForGeoColumnType().
// sql_type:        SQL type of the geo column, forwarded to the lookup.
// geo_column_type: which sub-column to locate (the callers in this class
//                  pass COORDS, BOUNDS, RING_SIZES, RENDER_GROUP and
//                  POLY_RINGS).
//
// Returns whatever getBuffer() on the located chunk returns.
//
// NOTE(review): the iterator is dereferenced without comparing it to
// chunks.end() -- confirm the lookup cannot return end() for the
// combinations used here.
188  AbstractBuffer* getBuffer(std::list<Chunk_NS::Chunk>& chunks,
189  const SQLTypes sql_type,
190  GeoColumnType geo_column_type) {
191  auto chunk = getIteratorForGeoColumnType(chunks, sql_type, geo_column_type);
192  auto buffer = chunk->getBuffer();
193  return buffer;
194  }
195 
204 };
205 
206 } // namespace foreign_storage
AbstractBuffer * getBuffer(std::list< Chunk_NS::Chunk > &chunks, const SQLTypes sql_type, GeoColumnType geo_column_type)
void eraseInvalidData(const FindContainer &invalid_indices)
void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices) override
SQLTypes
Definition: sqltypes.h:38
std::vector< ArrayDatum > coords_datum_buffer_
TypedParquetStorageBuffer< ArrayDatum > * coords_column_buffer_
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values) override
TypedParquetStorageBuffer< std::string > * base_column_buffer_
std::vector< ArrayDatum > bounds_datum_buffer_
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
void appendArrayDatumsIfApplicable(TypedParquetStorageBuffer< ArrayDatum > *column_buffer, const std::vector< ArrayDatum > &datum_buffer)
std::set< int64_t > InvalidRowGroupIndices
An AbstractBuffer is a unit of data management for a data manager.
void processGeoElement(std::string_view geo_string_view)
std::vector< ArrayDatum > ring_sizes_datum_buffer_
ParquetGeospatialImportEncoder(std::list< Chunk_NS::Chunk > &chunks)
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
void setSize(const size_t size)
#define CHECK(condition)
Definition: Logger.h:209
bool is_geometry() const
Definition: sqltypes.h:521
const ColumnDescriptor * geo_column_descriptor_
SQLTypeInfo columnType
TypedParquetStorageBuffer< ArrayDatum > * bounds_column_buffer_
std::vector< ArrayDatum > poly_rings_datum_buffer_
TypedParquetStorageBuffer< ArrayDatum > * poly_rings_column_buffer_
void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices) override
std::list< T >::iterator getIteratorForGeoColumnType(std::list< T > &list, const SQLTypes column_type, const GeoColumnType geo_column)
std::vector< int32_t > render_group_values_
TypedParquetStorageBuffer< ArrayDatum > * ring_sizes_column_buffer_