OmniSciDB  2e3a973ef4
ParquetGeospatialEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "DataMgr/Chunk/Chunk.h"
20 #include "Geospatial/Compression.h"
21 #include "Geospatial/Types.h"
22 
23 #include "ParquetEncoder.h"
24 
25 #include "ImportExport/Importer.h"
26 #include "LazyParquetChunkLoader.h"
27 
31 
32 #include <parquet/schema.h>
33 #include <parquet/types.h>
34 
35 namespace foreign_storage {
36 
37 template <typename T>
38 inline ArrayDatum encode_as_array_datum(const std::vector<T>& data) {
39  const size_t num_bytes = data.size() * sizeof(T);
40  std::shared_ptr<int8_t> buffer(new int8_t[num_bytes], std::default_delete<int8_t[]>());
41  memcpy(buffer.get(), data.data(), num_bytes);
42  return ArrayDatum(num_bytes, buffer, false);
43 }
44 
46  public:
/**
 * Sets up encoding of one parquet geometry column into the physical chunks
 * that back an OmniSci geo column.
 *
 * @param parquet_column_descriptor source parquet column; only its name is
 *        kept (parquet_column_name_), for use in error messages.
 * @param chunks chunks of the geo column with the base (string) column first;
 *        the first chunk's column descriptor is taken as the logical geo
 *        column (geo_column_descriptor_).
 * @param chunk_metadata metadata entries positioned in parallel to `chunks`.
 *
 * The sizes of `chunks` / `chunk_metadata` are validated against the geo type
 * first. The coords sub-column is always initialized. Bounds are initialized
 * for LINESTRING, POLYGON and MULTIPOLYGON. Ring sizes and render group are
 * initialized for POLYGON and MULTIPOLYGON, and poly rings for MULTIPOLYGON
 * only. The visible member initializers start encoders and metadata pointers
 * at nullptr.
 *
 * NOTE(review): the base ParquetEncoder receives nullptr, presumably because
 * output is spread over several chunk buffers rather than one — confirm.
 * `null_scalar_encoder` is declared but not referenced in the visible lines.
 */
47  ParquetGeospatialEncoder(const parquet::ColumnDescriptor* parquet_column_descriptor,
48  std::list<Chunk_NS::Chunk>& chunks,
49  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata)
50  : ParquetEncoder(nullptr)
51  , geo_column_descriptor_(chunks.begin()->getColumnDesc())
52  , base_column_encoder_(nullptr)
53  , coords_column_encoder_(nullptr)
54  , bounds_column_encoder_(nullptr)
58  , base_column_metadata_(nullptr)
59  , coords_column_metadata_(nullptr)
60  , bounds_column_metadata_(nullptr)
64  , render_group_values_(LazyParquetChunkLoader::batch_reader_num_elements, 0)
65  , base_values_(LazyParquetChunkLoader::batch_reader_num_elements)
66  , row_count_(0)
67  , parquet_column_name_(parquet_column_descriptor->name()) {
69 
70  validateChunksAndMetadataSizing(chunks, chunk_metadata);
71 
72  const auto geo_column_type = geo_column_descriptor_->columnType.get_type();
73  std::shared_ptr<ParquetScalarEncoder> null_scalar_encoder;
74 
75  // initialize base column encoder
76  auto base_chunk = chunks.begin();
77  base_chunk->initEncoder();
79  dynamic_cast<StringNoneEncoder*>(base_chunk->getBuffer()->getEncoder());
80  base_column_metadata_ = chunk_metadata.begin()->get();
82 
83  // initialize coords column
86  chunks, chunk_metadata, geo_column_type, COORDS);
87 
88  // initialize bounds column
89  if (geo_column_type == kLINESTRING || geo_column_type == kPOLYGON ||
90  geo_column_type == kMULTIPOLYGON) {
91  std::tie(
94  chunks, chunk_metadata, geo_column_type, BOUNDS);
95  }
96 
97  // initialize ring sizes column & render group column
98  if (geo_column_type == kPOLYGON || geo_column_type == kMULTIPOLYGON) {
103  chunks, chunk_metadata, geo_column_type, RING_SIZES);
108  chunks, chunk_metadata, geo_column_type, RENDER_GROUP);
109  }
110 
111  // initialize poly rings column
112  if (geo_column_type == kMULTIPOLYGON) {
117  chunks, chunk_metadata, geo_column_type, POLY_RINGS);
118  }
119  }
120 
/**
 * Encodes one batch of parquet values for this geo column.
 *
 * The loop walks `levels_read` definition levels and advances row_count_ once
 * per level. A definition level of 0 marks a null row. Any other level
 * consumes the next parquet::ByteArray from `values`, which is viewed as a
 * string and passed to processGeoElement. A CHECK fails if more non-null
 * levels are seen than `values_read`.
 *
 * NOTE(review): the null-row branch body, and the work done before and after
 * the loop (presumably clearing and flushing buffered sub-column data), are on
 * lines not present in this listing. `rep_levels` and `is_last_batch` are not
 * referenced in the visible code.
 */
121  void appendData(const int16_t* def_levels,
122  const int16_t* rep_levels,
123  const int64_t values_read,
124  const int64_t levels_read,
125  const bool is_last_batch,
126  int8_t* values) override {
127  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
128 
130 
131  for (int64_t i = 0, j = 0; i < levels_read; ++i, ++row_count_) {
133  if (def_levels[i] == 0) {
135  } else {
136  CHECK(j < values_read);
137  auto& byte_array = parquet_data_ptr[j++];
138  auto geo_string_view = std::string_view{
139  reinterpret_cast<const char*>(byte_array.ptr), byte_array.len};
140  processGeoElement(geo_string_view);
141  }
142  }
143 
145 
147  }
148 
149  private:
/**
 * Appends `levels_read` entries from base_values_ to the base column and, when
 * a render group column exists, `levels_read` entries from
 * render_group_values_ to it.
 *
 * Per the inline comment, these act as nulls for the base column and zeros for
 * the render group. render_group_values_ is zero-filled at construction.
 *
 * NOTE(review): the metadata returned by the base-column append is
 * dereferenced; the assignment target (presumably base_column_metadata_) and
 * the render-group guard are on lines missing from this listing. Both source
 * vectors are sized LazyParquetChunkLoader::batch_reader_num_elements, so
 * `levels_read` is presumably never larger than that — confirm.
 */
150  void appendBaseAndRenderGroupDataAndUpdateMetadata(const int64_t levels_read) {
151  // add nulls to base column & zeros to render group (if applicable)
153  *base_column_encoder_->appendData(&base_values_, 0, levels_read);
155  auto data_ptr = reinterpret_cast<int8_t*>(render_group_values_.data());
157  data_ptr, levels_read, render_group_column_descriptor_->columnType);
158  }
159  }
160 
162  std::list<Chunk_NS::Chunk>& chunks,
163  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) const {
164  const auto geo_column_type = geo_column_descriptor_->columnType.get_type();
165  if (geo_column_type == kPOINT) {
166  CHECK(chunk_metadata.size() == 2);
167  CHECK(chunks.size() == 2);
168  } else if (geo_column_type == kLINESTRING) {
169  CHECK(chunk_metadata.size() == 3);
170  CHECK(chunks.size() == 3);
171  } else if (geo_column_type == kPOLYGON) {
172  CHECK(chunk_metadata.size() == 5);
173  CHECK(chunks.size() == 5);
174  } else if (geo_column_type == kMULTIPOLYGON) {
175  CHECK(chunk_metadata.size() == 6);
176  CHECK(chunks.size() == 6);
177  }
178  }
179 
191  }
192 
194  const std::vector<ArrayDatum>& datum_parse_buffer,
195  Encoder* encoder,
196  ChunkMetadata* chunk_metadata) const {
197  if (!encoder) {
198  CHECK(!chunk_metadata);
199  return;
200  }
201  if (auto fixed_len_array_encoder =
202  dynamic_cast<FixedLengthArrayNoneEncoder*>(encoder)) {
203  auto new_chunk_metadata = fixed_len_array_encoder->appendData(
204  &datum_parse_buffer, 0, datum_parse_buffer.size());
205  *chunk_metadata = *new_chunk_metadata;
206  } else if (auto array_encoder = dynamic_cast<ArrayNoneEncoder*>(encoder)) {
207  auto new_chunk_metadata = array_encoder->appendData(
208  &datum_parse_buffer, 0, datum_parse_buffer.size(), false);
209  *chunk_metadata = *new_chunk_metadata;
210  } else {
211  UNREACHABLE();
212  }
213  }
214 
/**
 * Parses one geometry string and buffers its physical sub-column values for
 * the current batch.
 *
 * The string is passed to Geospatial::GeoTypesFactory::getGeoColumns, which
 * per its signature accepts WKT or WKB hex. std::runtime_error is thrown if no
 * valid geometry can be extracted; the message includes row_count_, the
 * OmniSci column name and the parquet column name.
 *
 * std::runtime_error is also thrown when the parsed geometry type differs from
 * the OmniSci column type. The partially visible condition exempts a parsed
 * POLYGON — presumably when the column is MULTIPOLYGON; the rest of the
 * condition is missing from this listing, so confirm.
 *
 * On success, coords are compressed with Geospatial::compress_coords and
 * appended to coords_datum_buffer_. Bounds, ring sizes and poly rings are then
 * buffered; NOTE(review): their guards are on missing lines, presumably keyed
 * on which sub-columns the geo type has.
 */
215  void processGeoElement(std::string_view geo_string_view) {
217  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string_view),
218  import_ti,
224  std::string error_message =
225  "Failed to extract valid geometry from row " + std::to_string(row_count_) +
226  " for OmniSci column " + geo_column_descriptor_->columnName +
227  " importing from parquet column " + parquet_column_name_;
228  throw std::runtime_error(error_message);
229  }
230 
231  // validate types
232  if (geo_column_descriptor_->columnType.get_type() != import_ti.get_type()) {
234  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
236  throw std::runtime_error("Imported geometry from parquet column " +
238  " doesn't match the geospatial type of OmniSci column " +
240  }
241  }
242 
243  // append coords
244  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(
246  coords_datum_buffer_.emplace_back(encode_as_array_datum(compressed_coords));
247 
248  // append bounds
251  }
252 
253  // append ring sizes
255  ring_sizes_datum_buffer_.emplace_back(
257  }
258 
259  // append poly rings
261  poly_rings_datum_buffer_.emplace_back(
263  }
264  }
265 
274  // POINT columns are represented using fixed length arrays and need
275  // special treatment of nulls
277  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(
279  coords_datum_buffer_.emplace_back(encode_as_array_datum(compressed_coords));
280  } else {
283  }
287  }
289  ring_sizes_datum_buffer_.emplace_back(
292  }
294  poly_rings_datum_buffer_.emplace_back(
297  }
298  }
299 
301  coords_parse_buffer_.clear();
302  bounds_parse_buffer_.clear();
303  ring_sizes_parse_buffer_.clear();
304  poly_rings_parse_buffer_.clear();
305  }
306 
308  coords_datum_buffer_.clear();
309  bounds_datum_buffer_.clear();
310  ring_sizes_datum_buffer_.clear();
311  poly_rings_datum_buffer_.clear();
312  }
313 
315 
316  template <typename T>
317  typename std::list<T>::iterator getIteratorForGeoColumnType(
318  std::list<T>& list,
319  const SQLTypes column_type,
320  const GeoColumnType geo_column) {
321  auto list_iter = list.begin();
322  list_iter++; // skip base column
323  switch (column_type) {
324  case kPOINT: {
325  if (geo_column == COORDS) {
326  return list_iter;
327  }
328  UNREACHABLE();
329  }
330  case kLINESTRING: {
331  if (geo_column == COORDS) {
332  return list_iter;
333  }
334  list_iter++;
335  if (geo_column == BOUNDS) {
336  return list_iter;
337  }
338  UNREACHABLE();
339  }
340  case kPOLYGON: {
341  if (geo_column == COORDS) {
342  return list_iter;
343  }
344  list_iter++;
345  if (geo_column == RING_SIZES) {
346  return list_iter;
347  }
348  list_iter++;
349  if (geo_column == BOUNDS) {
350  return list_iter;
351  }
352  list_iter++;
353  if (geo_column == RENDER_GROUP) {
354  return list_iter;
355  }
356  UNREACHABLE();
357  }
358  case kMULTIPOLYGON: {
359  if (geo_column == COORDS) {
360  return list_iter;
361  }
362  list_iter++;
363  if (geo_column == RING_SIZES) {
364  return list_iter;
365  }
366  list_iter++;
367  if (geo_column == POLY_RINGS) {
368  return list_iter;
369  }
370  list_iter++;
371  if (geo_column == BOUNDS) {
372  return list_iter;
373  }
374  list_iter++;
375  if (geo_column == RENDER_GROUP) {
376  return list_iter;
377  }
378  UNREACHABLE();
379  }
380  default:
381  UNREACHABLE();
382  }
383  return {};
384  }
385 
386  std::tuple<Encoder*, ChunkMetadata*, const ColumnDescriptor*>
388  std::list<Chunk_NS::Chunk>& chunks,
389  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
390  const SQLTypes sql_type,
391  GeoColumnType geo_column_type) {
392  auto chunk = getIteratorForGeoColumnType(chunks, sql_type, geo_column_type);
393  chunk->initEncoder();
394  auto encoder = chunk->getBuffer()->getEncoder();
395  auto metadata =
396  getIteratorForGeoColumnType(chunk_metadata, sql_type, geo_column_type)->get();
397  auto column_descriptor = chunk->getColumnDesc();
398  return {encoder, metadata, column_descriptor};
399  }
400 
402 
403  constexpr static bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
404 
411 
418 
424 
// Zero-filled at construction (batch_reader_num_elements entries); source of
// the per-batch values appended to the render group column.
425  std::vector<int32_t> render_group_values_;
// batch_reader_num_elements default-constructed strings appended to the base
// column each batch (described as nulls where they are appended).
426  std::vector<std::string> base_values_;
// Number of rows (definition levels) processed so far; advanced once per
// level in appendData and reported in geometry parse error messages.
427  size_t row_count_;
// Name of the source parquet column, used in error messages.
428  std::string parquet_column_name_;
429 
430  // Used repeatedly in parsing geo types, declared as members to prevent
431  // deallocation/reallocation costs
432  std::vector<double> coords_parse_buffer_;
433  std::vector<double> bounds_parse_buffer_;
434  std::vector<int> ring_sizes_parse_buffer_;
435  std::vector<int> poly_rings_parse_buffer_;
436 
437  // Used to buffer array appends in memory for a batch
// (each element is one row's ArrayDatum for the corresponding sub-column)
438  std::vector<ArrayDatum> coords_datum_buffer_;
439  std::vector<ArrayDatum> bounds_datum_buffer_;
440  std::vector<ArrayDatum> ring_sizes_datum_buffer_;
441  std::vector<ArrayDatum> poly_rings_datum_buffer_;
442 };
443 
444 } // namespace foreign_storage
SQLTypes
Definition: sqltypes.h:40
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
void appendBaseAndRenderGroupDataAndUpdateMetadata(const int64_t levels_read)
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:395
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
#define UNREACHABLE()
Definition: Logger.h:241
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:908
std::string to_string(char const *&&v)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:131
std::tuple< Encoder *, ChunkMetadata *, const ColumnDescriptor * > initEncoderAndGetEncoderAndMetadata(std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const SQLTypes sql_type, GeoColumnType geo_column_type)
specifies the content in-memory of a row in the column metadata table
void processGeoElement(std::string_view geo_string_view)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:701
bool is_geometry() const
Definition: sqltypes.h:429
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override
unencoded fixed length array encoder
void validateChunksAndMetadataSizing(std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata) const
ArrayDatum encode_as_array_datum(const std::vector< T > &data)
#define CHECK(condition)
Definition: Logger.h:197
void appendToArrayEncoderAndUpdateMetadata(const std::vector< ArrayDatum > &datum_parse_buffer, Encoder *encoder, ChunkMetadata *chunk_metadata) const
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
For unencoded strings.
SQLTypeInfo columnType
unencoded array encoder
std::string columnName
ParquetGeospatialEncoder(const parquet::ColumnDescriptor *parquet_column_descriptor, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata)
std::list< T >::iterator getIteratorForGeoColumnType(std::list< T > &list, const SQLTypes column_type, const GeoColumnType geo_column)
virtual std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1)=0