33 #include <parquet/schema.h>
34 #include <parquet/types.h>
36 namespace foreign_storage {
40 const size_t num_bytes = data.size() *
sizeof(
T);
41 std::shared_ptr<int8_t> buffer(
new int8_t[num_bytes], std::default_delete<int8_t[]>());
42 memcpy(buffer.get(), data.data(), num_bytes);
51 std::list<Chunk_NS::Chunk>& chunks,
52 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata)
76 std::shared_ptr<ParquetScalarEncoder> null_scalar_encoder;
79 auto base_chunk = chunks.begin();
80 base_chunk->initEncoder();
89 chunks, chunk_metadata, geo_column_type,
COORDS);
97 chunks, chunk_metadata, geo_column_type,
BOUNDS);
106 chunks, chunk_metadata, geo_column_type,
RING_SIZES);
120 chunks, chunk_metadata, geo_column_type,
POLY_RINGS);
125 const int16_t* rep_levels,
126 const int64_t values_read,
127 const int64_t levels_read,
128 const bool is_last_batch,
129 int8_t* values)
override {
130 auto parquet_data_ptr =
reinterpret_cast<const parquet::ByteArray*
>(values);
136 if (def_levels[
i] == 0) {
140 auto& byte_array = parquet_data_ptr[
j++];
141 auto geo_string_view = std::string_view{
142 reinterpret_cast<const char*
>(byte_array.ptr), byte_array.len};
165 std::list<Chunk_NS::Chunk>& chunks,
166 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata)
const {
168 if (geo_column_type ==
kPOINT) {
169 CHECK(chunk_metadata.size() == 2);
170 CHECK(chunks.size() == 2);
172 CHECK(chunk_metadata.size() == 3);
173 CHECK(chunks.size() == 3);
174 }
else if (geo_column_type ==
kPOLYGON) {
175 CHECK(chunk_metadata.size() == 5);
176 CHECK(chunks.size() == 5);
178 CHECK(chunk_metadata.size() == 6);
179 CHECK(chunks.size() == 6);
197 const std::vector<ArrayDatum>& datum_parse_buffer,
201 CHECK(!chunk_metadata);
204 if (
auto fixed_len_array_encoder =
205 dynamic_cast<FixedLengthArrayNoneEncoder*>(encoder)) {
206 auto new_chunk_metadata = fixed_len_array_encoder->appendData(
207 &datum_parse_buffer, 0, datum_parse_buffer.size());
208 *chunk_metadata = *new_chunk_metadata;
209 }
else if (
auto array_encoder = dynamic_cast<ArrayNoneEncoder*>(encoder)) {
210 auto new_chunk_metadata = array_encoder->appendData(
211 &datum_parse_buffer, 0, datum_parse_buffer.size(),
false);
212 *chunk_metadata = *new_chunk_metadata;
312 template <
typename T>
317 auto list_iter = list.begin();
319 switch (column_type) {
321 if (geo_column ==
COORDS) {
327 if (geo_column ==
COORDS) {
331 if (geo_column ==
BOUNDS) {
337 if (geo_column ==
COORDS) {
345 if (geo_column ==
BOUNDS) {
355 if (geo_column ==
COORDS) {
367 if (geo_column ==
BOUNDS) {
382 std::tuple<Encoder*, ChunkMetadata*, const ColumnDescriptor*>
384 std::list<Chunk_NS::Chunk>& chunks,
385 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
389 chunk->initEncoder();
390 auto encoder = chunk->getBuffer()->getEncoder();
393 auto column_descriptor = chunk->getColumnDesc();
394 return {encoder, metadata, column_descriptor};
398 const std::string& omnisci_column_name) {
399 std::string error_message =
"Failed to extract valid geometry in row " +
401 omnisci_column_name +
"'.";
408 " doesn't match the geospatial type of OmniSci column '" +
409 omnisci_column_name +
"'.");
static void throwMismatchedGeoElement(const std::string &omnisci_column_name)
const ColumnDescriptor * poly_rings_column_descriptor_
Encoder * bounds_column_encoder_
void processNullGeoElement()
std::vector< uint8_t > compress_coords(std::vector< double > &coords, const SQLTypeInfo &ti)
std::vector< int > poly_rings_parse_buffer_
void appendBaseAndRenderGroupDataAndUpdateMetadata(const int64_t levels_read)
static ArrayDatum composeNullArray(const SQLTypeInfo &ti)
void validateChunksAndMetadataSizing(std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata) const
void appendToArrayEncoderAndUpdateMetadata(const std::vector< ArrayDatum > &datum_parse_buffer, Encoder *encoder, ChunkMetadata *chunk_metadata) const
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1) override
ChunkMetadata * coords_column_metadata_
ChunkMetadata * ring_sizes_column_metadata_
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
HOST DEVICE SQLTypes get_type() const
std::vector< int > ring_sizes_parse_buffer_
Encoder * render_group_column_encoder_
ChunkMetadata * render_group_column_metadata_
std::conditional_t< is_cuda_compiler(), DeviceArrayDatum, HostArrayDatum > ArrayDatum
std::tuple< Encoder *, ChunkMetadata *, const ColumnDescriptor * > initEncoderAndGetEncoderAndMetadata(std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const SQLTypes sql_type, GeoColumnType geo_column_type)
ChunkMetadata * bounds_column_metadata_
static void throwMalformedGeoElement(const size_t row_count, const std::string &omnisci_column_name)
Specifies the in-memory content of a row in the column metadata table.
std::vector< double > coords_parse_buffer_
Encoder * coords_column_encoder_
Encoder * poly_rings_column_encoder_
void processGeoElement(std::string_view geo_string_view)
std::vector< ArrayDatum > coords_datum_buffer_
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
std::vector< ArrayDatum > poly_rings_datum_buffer_
const ColumnDescriptor * coords_column_descriptor_
std::string parquet_column_name_
void appendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, const bool is_last_batch, int8_t *values) override
ChunkMetadata * poly_rings_column_metadata_
std::vector< ArrayDatum > ring_sizes_datum_buffer_
const ColumnDescriptor * ring_sizes_column_descriptor_
Unencoded fixed-length array encoder.
ArrayDatum encode_as_array_datum(const std::vector< T > &data)
const ColumnDescriptor * geo_column_descriptor_
void appendArrayDatumsToBufferAndUpdateMetadata()
const ColumnDescriptor * bounds_column_descriptor_
static constexpr bool PROMOTE_POLYGON_TO_MULTIPOLYGON
Encoder * ring_sizes_column_encoder_
std::vector< int32_t > render_group_values_
StringNoneEncoder * base_column_encoder_
std::vector< std::string > base_values_
ChunkMetadata * base_column_metadata_
ParquetGeospatialEncoder(const parquet::ColumnDescriptor *parquet_column_descriptor, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata)
std::vector< double > bounds_parse_buffer_
std::vector< ArrayDatum > bounds_datum_buffer_
const ColumnDescriptor * render_group_column_descriptor_
std::list< T >::iterator getIteratorForGeoColumnType(std::list< T > &list, const SQLTypes column_type, const GeoColumnType geo_column)
ParquetGeospatialEncoder()
virtual std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &ti, const bool replicating=false, const int64_t offset=-1)=0