OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ParquetGeospatialEncoder.h
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "DataMgr/Chunk/Chunk.h"
20 #include "Geospatial/Compression.h"
21 #include "Geospatial/Types.h"
22 
23 #include "ParquetEncoder.h"
24 
25 #include "ImportExport/Importer.h"
26 #include "LazyParquetChunkLoader.h"
27 
32 
33 #include <parquet/schema.h>
34 #include <parquet/types.h>
35 
36 namespace foreign_storage {
37 
38 template <typename T>
39 inline ArrayDatum encode_as_array_datum(const std::vector<T>& data) {
40  const size_t num_bytes = data.size() * sizeof(T);
41  std::shared_ptr<int8_t> buffer(new int8_t[num_bytes], std::default_delete<int8_t[]>());
42  memcpy(buffer.get(), data.data(), num_bytes);
43  return ArrayDatum(num_bytes, buffer, false);
44 }
45 
47  public:
49 
// Constructs the encoder for one geo column.
// - The geo column descriptor is taken from the first chunk in `chunks`, and
//   the parquet column name from `parquet_column_descriptor`.
// - The chunk and metadata list sizes are checked, then the encoder of the
//   first chunk (the base column) is initialized and the first metadata entry
//   is recorded as the base column metadata.
// - The remaining sections set up the coords column and, depending on the geo
//   type, the bounds, ring sizes, render group and poly rings columns.
// NOTE(review): several statements in this listing are cut off (the embedded
// source numbering skips lines), so the assignment targets of the per-column
// initialization calls are not visible here.
50  ParquetGeospatialEncoder(const parquet::ColumnDescriptor* parquet_column_descriptor,
51  std::list<Chunk_NS::Chunk>& chunks,
52  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata)
53  : ParquetEncoder(nullptr)
54  , geo_column_descriptor_(chunks.begin()->getColumnDesc())
55  , base_column_encoder_(nullptr)
56  , coords_column_encoder_(nullptr)
57  , bounds_column_encoder_(nullptr)
61  , base_column_metadata_(nullptr)
62  , coords_column_metadata_(nullptr)
63  , bounds_column_metadata_(nullptr)
// Both per-batch buffers are sized to the batch reader's element count; the
// render group values are zero-filled.
67  , render_group_values_(LazyParquetChunkLoader::batch_reader_num_elements, 0)
68  , base_values_(LazyParquetChunkLoader::batch_reader_num_elements)
69  , row_count_(0)
70  , parquet_column_name_(parquet_column_descriptor->name()) {
72 
73  validateChunksAndMetadataSizing(chunks, chunk_metadata);
74 
75  const auto geo_column_type = geo_column_descriptor_->columnType.get_type();
76  std::shared_ptr<ParquetScalarEncoder> null_scalar_encoder;
77 
78  // initialize base column encoder
79  auto base_chunk = chunks.begin();
80  base_chunk->initEncoder();
// The base chunk's encoder is downcast to StringNoneEncoder; dynamic_cast
// yields nullptr if the encoder has a different type.
82  dynamic_cast<StringNoneEncoder*>(base_chunk->getBuffer()->getEncoder());
83  base_column_metadata_ = chunk_metadata.begin()->get();
85 
86  // initialize coords column
89  chunks, chunk_metadata, geo_column_type, COORDS);
90 
91  // initialize bounds column
92  if (geo_column_type == kLINESTRING || geo_column_type == kPOLYGON ||
93  geo_column_type == kMULTIPOLYGON) {
94  std::tie(
97  chunks, chunk_metadata, geo_column_type, BOUNDS);
98  }
99 
100  // initialize ring sizes column & render group column
101  if (geo_column_type == kPOLYGON || geo_column_type == kMULTIPOLYGON) {
106  chunks, chunk_metadata, geo_column_type, RING_SIZES);
111  chunks, chunk_metadata, geo_column_type, RENDER_GROUP);
112  }
113 
114  // initialize poly rings column
115  if (geo_column_type == kMULTIPOLYGON) {
120  chunks, chunk_metadata, geo_column_type, POLY_RINGS);
121  }
122  }
123 
// Consumes one batch of parquet values for the geo column.
// `values` is reinterpreted as an array of parquet::ByteArray. The loop walks
// `levels_read` definition levels and advances row_count_ once per level.
// `rep_levels` and `is_last_batch` are not referenced in the visible lines.
// NOTE(review): statements before and after the loop, and the body of the
// zero-definition-level branch, are missing from this listing.
124  void appendData(const int16_t* def_levels,
125  const int16_t* rep_levels,
126  const int64_t values_read,
127  const int64_t levels_read,
128  const bool is_last_batch,
129  int8_t* values) override {
130  auto parquet_data_ptr = reinterpret_cast<const parquet::ByteArray*>(values);
131 
133 
// i indexes definition levels, j indexes the values actually present.
134  for (int64_t i = 0, j = 0; i < levels_read; ++i, ++row_count_) {
// A definition level of 0 consumes no value (presumably a null row -- the
// handling statement is not visible here).
136  if (def_levels[i] == 0) {
138  } else {
139  CHECK(j < values_read);
140  auto& byte_array = parquet_data_ptr[j++];
// View the byte array's bytes as text without copying them.
141  auto geo_string_view = std::string_view{
142  reinterpret_cast<const char*>(byte_array.ptr), byte_array.len};
143  processGeoElement(geo_string_view);
144  }
145  }
146 
148 
150  }
151 
152  private:
// Appends `levels_read` entries from base_values_ to the base column encoder,
// then hands the render_group_values_ buffer (viewed as int8_t*) together with
// `levels_read` and the render group column type to a further append call.
// NOTE(review): the guarding condition, the assignment target of the base
// append and the callee of the render group append are missing from this
// listing.
153  void appendBaseAndRenderGroupDataAndUpdateMetadata(const int64_t levels_read) {
154  // add nulls to base column & zeros to render group (if applicable)
156  *base_column_encoder_->appendData(&base_values_, 0, levels_read);
158  auto data_ptr = reinterpret_cast<int8_t*>(render_group_values_.data());
160  data_ptr, levels_read, render_group_column_descriptor_->columnType);
161  }
162  }
163 
165  std::list<Chunk_NS::Chunk>& chunks,
166  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) const {
// Checks that `chunks` and `chunk_metadata` each hold the number of entries
// expected for the geo type of this column: 2 for POINT, 3 for LINESTRING,
// 5 for POLYGON and 6 for MULTIPOLYGON. Any other type passes without a check.
167  const auto geo_column_type = geo_column_descriptor_->columnType.get_type();
168  if (geo_column_type == kPOINT) {
169  CHECK(chunk_metadata.size() == 2);
170  CHECK(chunks.size() == 2);
171  } else if (geo_column_type == kLINESTRING) {
172  CHECK(chunk_metadata.size() == 3);
173  CHECK(chunks.size() == 3);
174  } else if (geo_column_type == kPOLYGON) {
175  CHECK(chunk_metadata.size() == 5);
176  CHECK(chunks.size() == 5);
177  } else if (geo_column_type == kMULTIPOLYGON) {
178  CHECK(chunk_metadata.size() == 6);
179  CHECK(chunks.size() == 6);
180  }
181  }
182 
194  }
195 
197  const std::vector<ArrayDatum>& datum_parse_buffer,
198  Encoder* encoder,
199  ChunkMetadata* chunk_metadata) const {
// Appends every datum in `datum_parse_buffer` to `encoder` and copies the
// metadata returned by that append into `*chunk_metadata`.
// A null encoder means there is nothing to append; in that case
// `chunk_metadata` must be null as well.
200  if (!encoder) {
201  CHECK(!chunk_metadata);
202  return;
203  }
// Only the two array encoder types below are supported; the encoder's dynamic
// type selects which appendData overload is called.
204  if (auto fixed_len_array_encoder =
205  dynamic_cast<FixedLengthArrayNoneEncoder*>(encoder)) {
206  auto new_chunk_metadata = fixed_len_array_encoder->appendData(
207  &datum_parse_buffer, 0, datum_parse_buffer.size());
208  *chunk_metadata = *new_chunk_metadata;
209  } else if (auto array_encoder = dynamic_cast<ArrayNoneEncoder*>(encoder)) {
210  auto new_chunk_metadata = array_encoder->appendData(
211  &datum_parse_buffer, 0, datum_parse_buffer.size(), false);
212  *chunk_metadata = *new_chunk_metadata;
213  } else {
214  UNREACHABLE();
215  }
216  }
217 
// Parses one geo value and buffers the data for its physical columns.
// The view is copied into a std::string and passed, together with import_ti,
// to Geospatial::GeoTypesFactory::getGeoColumns. The parsed type is then
// compared with the type of this geo column. Coords are compressed with
// Geospatial::compress_coords and buffered as an ArrayDatum in
// coords_datum_buffer_; ring sizes and poly rings are buffered in their own
// datum buffers.
// NOTE(review): most statements of this function are cut off in this listing
// (declaration of import_ti, remaining getGeoColumns arguments, the error
// paths and the conditions around the bounds / ring sizes / poly rings
// appends), so their exact behavior cannot be confirmed from here.
218  void processGeoElement(std::string_view geo_string_view) {
220  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string_view),
221  import_ti,
228  }
229 
230  // validate types
231  if (geo_column_descriptor_->columnType.get_type() != import_ti.get_type()) {
233  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
236  }
237  }
238 
239  // append coords
240  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(
242  coords_datum_buffer_.emplace_back(encode_as_array_datum(compressed_coords));
243 
244  // append bounds
247  }
248 
249  // append ring sizes
251  ring_sizes_datum_buffer_.emplace_back(
253  }
254 
255  // append poly rings
257  poly_rings_datum_buffer_.emplace_back(
259  }
260  }
261 
270  // POINT columns are represented using fixed length arrays and need
271  // special treatment of nulls
273  std::vector<uint8_t> compressed_coords = Geospatial::compress_coords(
275  coords_datum_buffer_.emplace_back(encode_as_array_datum(compressed_coords));
276  } else {
279  }
283  }
285  ring_sizes_datum_buffer_.emplace_back(
288  }
290  poly_rings_datum_buffer_.emplace_back(
293  }
294  }
295 
297  coords_parse_buffer_.clear();
298  bounds_parse_buffer_.clear();
299  ring_sizes_parse_buffer_.clear();
300  poly_rings_parse_buffer_.clear();
301  }
302 
304  coords_datum_buffer_.clear();
305  bounds_datum_buffer_.clear();
306  ring_sizes_datum_buffer_.clear();
307  poly_rings_datum_buffer_.clear();
308  }
309 
311 
312  template <typename T>
313  typename std::list<T>::iterator getIteratorForGeoColumnType(
314  std::list<T>& list,
315  const SQLTypes column_type,
316  const GeoColumnType geo_column) {
317  auto list_iter = list.begin();
318  list_iter++; // skip base column
319  switch (column_type) {
320  case kPOINT: {
321  if (geo_column == COORDS) {
322  return list_iter;
323  }
324  UNREACHABLE();
325  }
326  case kLINESTRING: {
327  if (geo_column == COORDS) {
328  return list_iter;
329  }
330  list_iter++;
331  if (geo_column == BOUNDS) {
332  return list_iter;
333  }
334  UNREACHABLE();
335  }
336  case kPOLYGON: {
337  if (geo_column == COORDS) {
338  return list_iter;
339  }
340  list_iter++;
341  if (geo_column == RING_SIZES) {
342  return list_iter;
343  }
344  list_iter++;
345  if (geo_column == BOUNDS) {
346  return list_iter;
347  }
348  list_iter++;
349  if (geo_column == RENDER_GROUP) {
350  return list_iter;
351  }
352  UNREACHABLE();
353  }
354  case kMULTIPOLYGON: {
355  if (geo_column == COORDS) {
356  return list_iter;
357  }
358  list_iter++;
359  if (geo_column == RING_SIZES) {
360  return list_iter;
361  }
362  list_iter++;
363  if (geo_column == POLY_RINGS) {
364  return list_iter;
365  }
366  list_iter++;
367  if (geo_column == BOUNDS) {
368  return list_iter;
369  }
370  list_iter++;
371  if (geo_column == RENDER_GROUP) {
372  return list_iter;
373  }
374  UNREACHABLE();
375  }
376  default:
377  UNREACHABLE();
378  }
379  return {};
380  }
381 
// Locates the chunk for the physical column (`sql_type`, `geo_column_type`)
// via getIteratorForGeoColumnType, initializes that chunk's encoder, and
// returns three non-owning pointers: the encoder of the chunk's buffer, the
// metadata entry at the same position in `chunk_metadata`, and the chunk's
// column descriptor.
382  std::tuple<Encoder*, ChunkMetadata*, const ColumnDescriptor*>
384  std::list<Chunk_NS::Chunk>& chunks,
385  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
386  const SQLTypes sql_type,
387  GeoColumnType geo_column_type) {
388  auto chunk = getIteratorForGeoColumnType(chunks, sql_type, geo_column_type);
389  chunk->initEncoder();
390  auto encoder = chunk->getBuffer()->getEncoder();
// .get() hands out the raw pointer; ownership stays with the unique_ptr in
// the list.
391  auto metadata =
392  getIteratorForGeoColumnType(chunk_metadata, sql_type, geo_column_type)->get();
393  auto column_descriptor = chunk->getColumnDesc();
394  return {encoder, metadata, column_descriptor};
395  }
396 
397  static void throwMalformedGeoElement(const size_t row_count,
398  const std::string& omnisci_column_name) {
399  std::string error_message = "Failed to extract valid geometry in row " +
400  std::to_string(row_count) + " of OmniSci column '" +
401  omnisci_column_name + "'.";
402  throw foreign_storage::ForeignStorageException(error_message);
403  }
404 
// Builds the message reporting that an imported geometry does not match the
// geospatial type of `omnisci_column_name`.
// NOTE(review): the statement that consumes the message (presumably a throw of
// ForeignStorageException, as in throwMalformedGeoElement) is missing from
// this listing -- confirm against the original header.
405  static void throwMismatchedGeoElement(const std::string& omnisci_column_name) {
407  "Imported geometry"
408  " doesn't match the geospatial type of OmniSci column '" +
409  omnisci_column_name + "'.");
410  }
411 
413 
414  constexpr static bool PROMOTE_POLYGON_TO_MULTIPOLYGON = true;
415 
422 
429 
435 
436  std::vector<int32_t> render_group_values_;
437  std::vector<std::string> base_values_;
438  size_t row_count_;
439  std::string parquet_column_name_;
440 
441  // Used repeatedly in parsing geo types, declared as members to prevent
442  // deallocation/reallocation costs
443  std::vector<double> coords_parse_buffer_;
444  std::vector<double> bounds_parse_buffer_;
445  std::vector<int> ring_sizes_parse_buffer_;
446  std::vector<int> poly_rings_parse_buffer_;
447 
448  // Used to buffer array appends in memory for a batch
449  std::vector<ArrayDatum> coords_datum_buffer_;
450  std::vector<ArrayDatum> bounds_datum_buffer_;
451  std::vector<ArrayDatum> ring_sizes_datum_buffer_;
452  std::vector<ArrayDatum> poly_rings_datum_buffer_;
453 };
454 
455 } // namespace foreign_storage
static void throwMismatchedGeoElement(const std::string &omnisci_column_name)
SQLTypes
Definition: sqltypes.h:37
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
Definition: Compression.cpp:52
void appendBaseAndRenderGroupDataAndUpdateMetadata(const int64_t levels_read)
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
Definition: Importer.cpp:399
void validateChunksAndMetadataSizing(std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata) const
void appendToArrayEncoderAndUpdateMetadata(const std::vector< ArrayDatum > &datum_parse_buffer, Encoder *encoder, ChunkMetadata *chunk_metadata) const
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
#define UNREACHABLE()
Definition: Logger.h:241
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1114
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:311
std::string to_string(char const *&&v)
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
Definition: sqltypes.h:199
std::tuple< Encoder *, ChunkMetadata *, const ColumnDescriptor * > initEncoderAndGetEncoderAndMetadata(std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const SQLTypes sql_type, GeoColumnType geo_column_type)
static void throwMalformedGeoElement(const size_t row_count, const std::string &omnisci_column_name)
specifies the content in-memory of a row in the column metadata table
void processGeoElement(std::string_view geo_string_view)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:907
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override
unencoded fixed length array encoder
ArrayDatum encode_as_array_datum(const std::vector< T > &data)
#define CHECK(condition)
Definition: Logger.h:197
bool is_geometry() const
Definition: sqltypes.h:490
For unencoded strings.
SQLTypeInfo columnType
string name
Definition: setup.py:35
unencoded array encoder
std::string columnName
ParquetGeospatialEncoder(const parquet::ColumnDescriptor *parquet_column_descriptor, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata)
std::list< T >::iterator getIteratorForGeoColumnType(std::list< T > &list, const SQLTypes column_type, const GeoColumnType geo_column)
virtual std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1)=0