19 #include <arrow/api.h>
20 #include <arrow/io/api.h>
21 #include <parquet/arrow/reader.h>
22 #include <parquet/column_scanner.h>
23 #include <parquet/exception.h>
24 #include <parquet/platform.h>
25 #include <parquet/statistics.h>
26 #include <parquet/types.h>
51 namespace foreign_storage {
56 return value >= lower_bound && value <=
upper_bound;
60 return (parquet_column->logical_type()->is_none() &&
61 parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
62 parquet_column->logical_type()->is_string();
// Excerpt of a schema check that starts at the column's own schema node and
// climbs one ancestor at a time through parent(). Only the conditions are
// visible here; what each failing branch does is on lines outside this excerpt.
// NOTE(review): the names and flags tested look like the three-level Parquet
// LIST layout - confirm against the full function.
102 const parquet::schema::Node* node = parquet_column->schema_node().get();
// Level 1 (leaf): the branch is taken when the node is named neither "element"
// nor "item", or when it is neither required nor optional.
103 if ((node->name() !=
"element" && node->name() !=
"item") ||
104 !(node->is_required() ||
105 node->is_optional())) {
112 node = node->parent();
// Level 2: the branch is taken when the node is not named "list" or is not
// repeated (the condition continues on a line not visible here).
116 if (node->name() !=
"list" || !node->is_repeated() ||
122 node = node->parent();
// Level 3: the branch is taken when the node lacks the LIST logical type, or is
// neither optional nor required.
126 if (!node->logical_type()->is_list() ||
127 !(node->is_optional() ||
128 node->is_required())) {
139 node = node->parent();
146 template <
typename V,
typename NullType>
149 const parquet::ColumnDescriptor* parquet_column_descriptor,
151 switch (parquet_column_descriptor->physical_type()) {
152 case parquet::Type::INT32:
153 return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
154 buffer, column_descriptor, parquet_column_descriptor);
155 case parquet::Type::INT64:
156 return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
157 buffer, column_descriptor, parquet_column_descriptor);
158 case parquet::Type::FIXED_LEN_BYTE_ARRAY:
159 return std::make_shared<
161 buffer, column_descriptor, parquet_column_descriptor);
162 case parquet::Type::BYTE_ARRAY:
163 return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
164 buffer, column_descriptor, parquet_column_descriptor);
173 const parquet::ColumnDescriptor* parquet_column,
175 const bool is_metadata_scan_or_for_import) {
176 if (parquet_column->logical_type()->is_decimal()) {
178 return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
179 omnisci_column, parquet_column, buffer);
182 if (is_metadata_scan_or_for_import) {
185 return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
186 omnisci_column, parquet_column, buffer);
188 return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
189 omnisci_column, parquet_column, buffer);
196 return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
197 omnisci_column, parquet_column, buffer);
199 return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
200 omnisci_column, parquet_column, buffer);
223 template <
typename V,
typename T,
typename U,
typename NullType>
224 std::shared_ptr<ParquetEncoder>
227 const size_t omnisci_data_type_byte_size,
228 const size_t parquet_data_type_byte_size,
229 const bool is_signed) {
230 CHECK(
sizeof(NullType) == omnisci_data_type_byte_size);
232 return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
233 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
235 return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
236 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
259 template <
typename V,
typename NullType>
262 const size_t omnisci_data_type_byte_size,
263 const size_t parquet_data_type_byte_size,
265 const bool is_signed) {
272 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
278 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
284 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
290 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
299 const parquet::ColumnDescriptor* parquet_column,
301 const bool is_metadata_scan_or_for_import) {
302 auto column_type = omnisci_column->
columnType;
303 auto physical_type = parquet_column->physical_type();
306 int is_signed =
false;
308 if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
309 if (physical_type == parquet::Type::INT32) {
311 }
else if (physical_type == parquet::Type::INT64) {
319 if (
auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
320 parquet_column->logical_type().get())) {
321 bit_width = int_logical_column->bit_width();
322 is_signed = int_logical_column->is_signed();
325 if (bit_width == -1) {
329 const size_t omnisci_data_type_byte_size = column_type.get_size();
330 const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);
332 switch (omnisci_data_type_byte_size) {
335 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
337 omnisci_data_type_byte_size,
338 parquet_data_type_byte_size,
342 if (is_metadata_scan_or_for_import && column_type.get_type() ==
kBIGINT) {
343 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
345 omnisci_data_type_byte_size,
346 parquet_data_type_byte_size,
350 return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
352 omnisci_data_type_byte_size,
353 parquet_data_type_byte_size,
357 if (is_metadata_scan_or_for_import) {
358 switch (column_type.get_type()) {
360 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
362 omnisci_data_type_byte_size,
363 parquet_data_type_byte_size,
367 return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
369 omnisci_data_type_byte_size,
370 parquet_data_type_byte_size,
379 return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
381 omnisci_data_type_byte_size,
382 parquet_data_type_byte_size,
386 if (is_metadata_scan_or_for_import) {
387 switch (column_type.get_type()) {
389 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
391 omnisci_data_type_byte_size,
392 parquet_data_type_byte_size,
396 return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
398 omnisci_data_type_byte_size,
399 parquet_data_type_byte_size,
403 return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
405 omnisci_data_type_byte_size,
406 parquet_data_type_byte_size,
415 return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
417 omnisci_data_type_byte_size,
418 parquet_data_type_byte_size,
429 const parquet::ColumnDescriptor* parquet_column,
431 auto column_type = omnisci_column->
columnType;
432 if (!column_type.is_fp()) {
436 switch (column_type.get_type()) {
438 switch (parquet_column->physical_type()) {
439 case parquet::Type::FLOAT:
440 return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
441 buffer, omnisci_column, parquet_column);
442 case parquet::Type::DOUBLE:
443 return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
444 buffer, omnisci_column, parquet_column);
449 CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
450 return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
451 buffer, omnisci_column, parquet_column);
460 const parquet::ColumnDescriptor* parquet_column,
462 auto column_type = omnisci_column->
columnType;
463 if (parquet_column->logical_type()->is_none() &&
466 switch (column_type.get_type()) {
468 return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
469 buffer, omnisci_column, parquet_column);
480 template <
typename V,
typename T,
typename NullType>
483 const parquet::ColumnDescriptor* parquet_column,
485 if (
auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
486 parquet_column->logical_type().get())) {
487 switch (timestamp_logical_type->time_unit()) {
488 case parquet::LogicalType::TimeUnit::MILLIS:
489 return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
490 buffer, omnisci_column, parquet_column);
491 case parquet::LogicalType::TimeUnit::MICROS:
492 return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
493 buffer, omnisci_column, parquet_column);
494 case parquet::LogicalType::TimeUnit::NANOS:
495 return std::make_shared<
497 buffer, omnisci_column, parquet_column);
507 template <
typename V,
typename T,
typename NullType>
510 const parquet::ColumnDescriptor* parquet_column,
512 const bool is_metadata_scan_or_for_import) {
513 if (
auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
514 parquet_column->logical_type().get())) {
515 switch (timestamp_logical_type->time_unit()) {
516 case parquet::LogicalType::TimeUnit::MILLIS:
517 if (is_metadata_scan_or_for_import) {
518 return std::make_shared<
520 buffer, omnisci_column, parquet_column);
522 return std::make_shared<
524 buffer, omnisci_column, parquet_column);
525 case parquet::LogicalType::TimeUnit::MICROS:
526 if (is_metadata_scan_or_for_import) {
527 return std::make_shared<
529 buffer, omnisci_column, parquet_column);
531 return std::make_shared<
533 buffer, omnisci_column, parquet_column);
534 case parquet::LogicalType::TimeUnit::NANOS:
535 if (is_metadata_scan_or_for_import) {
536 return std::make_shared<
539 1000L * 1000L * 1000L,
541 buffer, omnisci_column, parquet_column);
543 return std::make_shared<
545 buffer, omnisci_column, parquet_column);
557 const parquet::ColumnDescriptor* parquet_column,
559 const bool is_metadata_scan_or_for_import) {
560 auto column_type = omnisci_column->
columnType;
562 if (parquet_column->logical_type()->is_timestamp()) {
564 if (precision == 0) {
565 return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
566 omnisci_column, parquet_column, buffer);
568 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
569 buffer, omnisci_column, parquet_column);
572 CHECK(column_type.get_comp_param() == 32);
573 if (is_metadata_scan_or_for_import) {
574 return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
575 omnisci_column, parquet_column, buffer);
577 return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
578 omnisci_column, parquet_column, buffer);
581 }
else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
582 if (parquet_column->physical_type() == parquet::Type::INT32) {
584 column_type.get_comp_param() == 32);
585 if (is_metadata_scan_or_for_import) {
586 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
587 buffer, omnisci_column, parquet_column);
589 return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
590 buffer, omnisci_column, parquet_column);
592 }
else if (parquet_column->physical_type() == parquet::Type::INT64) {
594 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
595 buffer, omnisci_column, parquet_column);
597 CHECK(column_type.get_comp_param() == 32);
598 if (is_metadata_scan_or_for_import) {
599 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
600 buffer, omnisci_column, parquet_column);
602 return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
603 buffer, omnisci_column, parquet_column);
613 template <
typename V,
typename T,
typename NullType>
616 const parquet::ColumnDescriptor* parquet_column,
618 if (
auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
619 parquet_column->logical_type().get())) {
620 switch (time_logical_type->time_unit()) {
621 case parquet::LogicalType::TimeUnit::MILLIS:
622 return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
623 buffer, omnisci_column, parquet_column);
624 case parquet::LogicalType::TimeUnit::MICROS:
625 return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
626 buffer, omnisci_column, parquet_column);
627 case parquet::LogicalType::TimeUnit::NANOS:
628 return std::make_shared<
630 buffer, omnisci_column, parquet_column);
642 const parquet::ColumnDescriptor* parquet_column,
644 const bool is_metadata_scan_or_for_import) {
645 auto column_type = omnisci_column->
columnType;
646 if (
auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
647 parquet_column->logical_type().get())) {
649 if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
650 return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
651 omnisci_column, parquet_column, buffer);
653 return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
654 omnisci_column, parquet_column, buffer);
657 if (is_metadata_scan_or_for_import) {
658 if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
659 CHECK(parquet_column->physical_type() == parquet::Type::INT32);
660 return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
661 omnisci_column, parquet_column, buffer);
663 CHECK(time_logical_column->time_unit() ==
664 parquet::LogicalType::TimeUnit::MICROS ||
665 time_logical_column->time_unit() ==
666 parquet::LogicalType::TimeUnit::NANOS);
667 CHECK(parquet_column->physical_type() == parquet::Type::INT64);
668 return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
669 omnisci_column, parquet_column, buffer);
672 if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
673 CHECK(parquet_column->physical_type() == parquet::Type::INT32);
674 return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
675 omnisci_column, parquet_column, buffer);
677 CHECK(time_logical_column->time_unit() ==
678 parquet::LogicalType::TimeUnit::MICROS ||
679 time_logical_column->time_unit() ==
680 parquet::LogicalType::TimeUnit::NANOS);
681 CHECK(parquet_column->physical_type() == parquet::Type::INT64);
682 return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
683 omnisci_column, parquet_column, buffer);
695 const parquet::ColumnDescriptor* parquet_column,
697 const bool is_metadata_scan_or_for_import) {
698 auto column_type = omnisci_column->
columnType;
699 if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
701 if (is_metadata_scan_or_for_import) {
702 if (column_type.get_comp_param() ==
707 omnisci_column, parquet_column, buffer,
true);
708 }
else if (column_type.get_comp_param() == 16) {
712 omnisci_column, parquet_column, buffer,
true);
717 if (column_type.get_comp_param() ==
722 omnisci_column, parquet_column, buffer,
false);
723 }
else if (column_type.get_comp_param() == 16) {
727 omnisci_column, parquet_column, buffer,
false);
738 const parquet::ColumnDescriptor* parquet_column,
740 const bool is_metadata_scan_or_for_import) {
741 auto column_type = omnisci_column->
columnType;
742 if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
744 if (is_metadata_scan_or_for_import) {
745 if (column_type.get_comp_param() ==
749 }
else if (column_type.get_comp_param() == 16) {
756 if (column_type.get_comp_param() ==
758 return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
759 buffer, omnisci_column, parquet_column);
760 }
else if (column_type.get_comp_param() == 16) {
761 return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
762 buffer, omnisci_column, parquet_column);
769 buffer, omnisci_column, parquet_column);
779 const parquet::ColumnDescriptor* parquet_column,
782 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
784 const bool is_for_detect) {
785 auto column_type = omnisci_column->
columnType;
792 return std::make_shared<ParquetStringImportEncoder>(chunk.
getBuffer());
794 return std::make_shared<ParquetStringNoneEncoder>(chunk.
getBuffer(),
798 if (!is_for_detect) {
799 chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
800 std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
801 logical_chunk_metadata->sqlType = omnisci_column->
columnType;
802 switch (column_type.get_size()) {
804 return std::make_shared<ParquetStringEncoder<uint8_t>>(
807 is_for_import ?
nullptr : logical_chunk_metadata.get());
809 return std::make_shared<ParquetStringEncoder<uint16_t>>(
812 is_for_import ?
nullptr : logical_chunk_metadata.get());
814 return std::make_shared<ParquetStringEncoder<int32_t>>(
817 is_for_import ?
nullptr : logical_chunk_metadata.get());
822 return std::make_shared<ParquetDetectStringEncoder>(chunk.
getBuffer());
832 const parquet::ColumnDescriptor* parquet_column,
833 std::list<Chunk_NS::Chunk>& chunks,
834 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
835 const bool is_metadata_scan,
836 const bool is_for_import,
837 const bool geo_validate_geometry) {
838 auto column_type = omnisci_column->
columnType;
843 return std::make_shared<ParquetGeospatialImportEncoder>(chunks,
844 geo_validate_geometry);
846 if (is_metadata_scan) {
847 return std::make_shared<ParquetGeospatialEncoder>(geo_validate_geometry);
849 for (
auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
850 chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
851 auto& chunk_metadata_ptr = chunk_metadata.back();
852 chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
854 return std::make_shared<ParquetGeospatialEncoder>(
855 parquet_column, chunks, chunk_metadata, geo_validate_geometry);
863 const parquet::ColumnDescriptor* parquet_column,
864 std::list<Chunk_NS::Chunk>& chunks,
866 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
867 const bool is_metadata_scan,
868 const bool is_for_import,
869 const bool is_for_detect,
870 const bool geo_validate_geometry);
906 const parquet::ColumnDescriptor* parquet_column,
907 std::list<Chunk_NS::Chunk>& chunks,
909 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
910 const bool is_metadata_scan,
911 const bool is_for_import,
912 const bool is_for_detect,
913 const bool geo_validate_geometry) {
914 CHECK(!(is_metadata_scan && is_for_import));
915 auto buffer = chunks.empty() ?
nullptr : chunks.begin()->getBuffer();
922 geo_validate_geometry)) {
933 geo_validate_geometry)) {
937 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
941 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
949 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
957 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
961 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
965 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
986 std::list<Chunk_NS::Chunk>& chunks,
988 const parquet::ColumnDescriptor* parquet_column,
990 const bool geo_validate_geometry) {
991 std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1000 geo_validate_geometry);
1009 const parquet::ColumnDescriptor* parquet_column,
1010 const bool geo_validate_geometry) {
1011 std::list<Chunk_NS::Chunk> chunks;
1012 std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1021 geo_validate_geometry);
1026 const parquet::ColumnDescriptor* parquet_column,
1027 std::list<Chunk_NS::Chunk>& chunks,
1029 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
1030 const bool is_metadata_scan,
1031 const bool is_for_import,
1032 const bool is_for_detect,
1033 const bool geo_validate_geometry) {
1038 std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
1048 geo_validate_geometry);
1049 CHECK(encoder.get());
1051 CHECK(scalar_encoder);
1052 if (!is_for_import) {
1053 if (!is_for_detect) {
1055 encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
1056 is_metadata_scan ?
nullptr : chunks.begin()->getBuffer(),
1060 encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
1061 is_metadata_scan ?
nullptr : chunks.begin()->getBuffer(),
1062 is_metadata_scan ?
nullptr : chunks.begin()->getIndexBuf(),
1067 encoder = std::make_shared<ParquetArrayDetectEncoder>(
1068 chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
1071 encoder = std::make_shared<ParquetArrayImportEncoder>(
1072 chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
// Parameter list and body excerpt of a statistics check for one column chunk
// of one row group (the function header is not visible in this excerpt).
// - reader: source of the file metadata; RowGroup(row_group_index) is read
//   from it.
// - row_group_index / column_index: select the row group and the column chunk.
// - def_levels / num_levels: the range [def_levels, def_levels + num_levels)
//   is scanned below.
// - parquet_column_descriptor: its use is not visible in this excerpt.
// Throws std::runtime_error when the statistics carry no min/max although a
// definition level of 3 is present.
1078 const parquet::ParquetFileReader* reader,
1079 const int row_group_index,
1080 const int column_index,
1081 const int16_t* def_levels,
1082 const int64_t num_levels,
1083 const parquet::ColumnDescriptor* parquet_column_descriptor) {
1085 if (!is_valid_parquet_list) {
1088 std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1089 reader->metadata()->RowGroup(row_group_index);
1090 auto column_metadata = group_metadata->ColumnChunk(column_index);
1092 if (group_metadata->num_rows() == 0) {
// `stats` is obtained on a line not visible here.
// Without min/max statistics, look for any definition level equal to 3; the
// error message below treats such a level as a non-null, non-empty array/value.
1096 if (!
stats->HasMinMax()) {
1097 auto find_it = std::find_if(def_levels,
1098 def_levels + num_levels,
1099 [](
const int16_t def_level) {
return def_level == 3; });
1100 if (find_it != def_levels + num_levels) {
1101 throw std::runtime_error(
1102 "No minimum and maximum statistic set in list column but non-null & non-empty "
1103 "array/value detected.");
// Parameter list and body excerpt of a helper that rewrites definition levels
// for a column whose max definition level is 0 (the opening of that condition
// is on a line not visible in this excerpt).
// - parquet_column_descriptor: queried for max_definition_level(), the schema
//   node's is_required() flag and the dotted column path.
// - def_levels: modified in place.
// Throws std::runtime_error when such a column is not required.
1114 const parquet::ColumnDescriptor* parquet_column_descriptor,
1115 std::vector<int16_t>& def_levels) {
1117 parquet_column_descriptor->max_definition_level() == 0) {
// A max definition level of 0 is only accepted for required columns; any other
// column is reported as unsupported.
1118 if (!parquet_column_descriptor->schema_node()->is_required()) {
1119 throw std::runtime_error(
1120 "Unsupported parquet column detected. Column '" +
1121 parquet_column_descriptor->path()->ToDotString() +
1122 "' detected to have max definition level of 0 but is optional.");
// Overwrite every existing entry with 1; the vector keeps its current size.
// NOTE(review): presumably still inside the max-definition-level-0 branch -
// the closing braces are not visible here; confirm.
1124 def_levels.assign(def_levels.size(), 1);
// Body excerpt of a check on a column's max repetition and definition levels
// (the function header and some conditions are not visible in this excerpt).
// Every failure is reported by throwing std::runtime_error.
1130 const parquet::ColumnDescriptor* parquet_column_descriptor) {
// Per the message: the Parquet column is a list but the mapped HeavyDB column
// is not an array. The guarding condition is on lines not visible here.
1133 throw std::runtime_error(
1134 "Unsupported mapping detected. Column '" +
1135 parquet_column_descriptor->path()->ToDotString() +
1136 "' detected to be a parquet list but HeavyDB mapped column '" +
1137 omnisci_column_descriptor->
columnName +
"' is not an array.");
// List columns must have exactly max repetition level 1 and max definition
// level 3.
1139 if (is_valid_parquet_list) {
1140 if (parquet_column_descriptor->max_repetition_level() != 1 ||
1141 parquet_column_descriptor->max_definition_level() != 3) {
1142 throw std::runtime_error(
1143 "Incorrect schema max repetition level detected in column '" +
1144 parquet_column_descriptor->path()->ToDotString() +
1145 "'. Expected a max repetition level of 1 and max definition level of 3 for "
1146 "list column but column has a max "
1147 "repetition level of " +
1148 std::to_string(parquet_column_descriptor->max_repetition_level()) +
1149 " and a max definition level of " +
1150 std::to_string(parquet_column_descriptor->max_definition_level()) +
".");
// Flat columns (per the message) must have max repetition level 0 and a max
// definition level of 0 or 1.
// NOTE(review): presumably the else-branch of the list check above - the
// connecting line is not visible here; confirm.
1153 if (parquet_column_descriptor->max_repetition_level() != 0 ||
1154 !(parquet_column_descriptor->max_definition_level() == 1 ||
1155 parquet_column_descriptor->max_definition_level() == 0)) {
1156 throw std::runtime_error(
1157 "Incorrect schema max repetition level detected in column '" +
1158 parquet_column_descriptor->path()->ToDotString() +
1159 "'. Expected a max repetition level of 0 and max definition level of 1 or 0 "
1161 "flat column but column has a max "
1162 "repetition level of " +
1163 std::to_string(parquet_column_descriptor->max_repetition_level()) +
1164 " and a max definition level of " +
1165 std::to_string(parquet_column_descriptor->max_definition_level()) +
".");
1171 const parquet::ColumnDescriptor* parquet_column,
1172 std::vector<int8_t>& values) {
1173 auto max_type_byte_size =
1175 parquet::GetTypeByteSize(parquet_column->physical_type()));
1176 size_t values_size =
1178 values.resize(values_size);
1182 const parquet::ColumnDescriptor* parquet_column) {
1183 if (
auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
1184 parquet_column->logical_type().get())) {
1186 decimal_logical_column->precision() &&
1196 if (
auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
1197 parquet_column->logical_type().get())) {
1198 auto parquet_precision = decimal_logical_column->precision();
1199 auto parquet_scale = decimal_logical_column->
scale();
1202 "Parquet column \"" + parquet_column->ToString() +
1203 "\" has decimal precision of " +
std::to_string(parquet_precision) +
1204 " which is too high to import, maximum precision supported is " +
1216 <<
" a Parquet column's decimal logical type failed to be read appropriately";
1221 const parquet::ColumnDescriptor* parquet_column) {
1229 return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
1230 (parquet_column->physical_type() == parquet::Type::FLOAT &&
1237 const parquet::ColumnDescriptor* parquet_column) {
1239 if (parquet_column->physical_type() == parquet::Type::FLOAT) {
1241 }
else if (parquet_column->physical_type() == parquet::Type::DOUBLE) {
1252 const parquet::ColumnDescriptor* parquet_column) {
1256 if (
auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1257 parquet_column->logical_type().get())) {
1260 const int bits_per_byte = 8;
1263 const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
1265 int_logical_column->bit_width() * bit_widening_factor;
1271 return (parquet_column->physical_type() == parquet::Type::INT64) ||
1272 (parquet_column->physical_type() == parquet::Type::INT32 &&
1281 if (
auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1282 parquet_column->logical_type().get())) {
1283 auto bit_width = int_logical_column->bit_width();
1284 if (!int_logical_column->is_signed()) {
1287 "Unsigned integer column \"" + parquet_column->path()->ToDotString() +
1288 "\" in Parquet file with 64 bit-width has no supported type for ingestion "
1289 "that will not result in data loss");
1312 CHECK(parquet_column->logical_type()->is_none());
1313 if (parquet_column->physical_type() == parquet::Type::INT32) {
1316 CHECK(parquet_column->physical_type() == parquet::Type::INT64);
1328 const parquet::TimestampLogicalType* timestamp_logical_column) {
1329 return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
1337 const parquet::TimestampLogicalType* timestamp_logical_column) {
1338 return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
1346 const parquet::TimestampLogicalType* timestamp_logical_column) {
1347 return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
1351 const parquet::ColumnDescriptor* parquet_column) {
1352 bool is_none_encoded_mapping =
1354 (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
1356 return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
1360 const parquet::ColumnDescriptor* parquet_column) {
1369 const parquet::ColumnDescriptor* parquet_column) {
1377 if (
auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1378 parquet_column->logical_type().get())) {
1393 if (parquet_column->logical_type()->is_none() &&
1394 ((parquet_column->physical_type() == parquet::Type::INT32 &&
1397 parquet_column->physical_type() == parquet::Type::INT64)) {
1404 if (
auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1405 parquet_column->logical_type().get())) {
1424 const parquet::ColumnDescriptor* parquet_column) {
1431 if (parquet_column->logical_type()->is_time()) {
1438 CHECK(parquet_column->logical_type()->is_time());
1440 type.set_type(
kTIME);
1442 type.set_fixed_size();
1447 const parquet::ColumnDescriptor* parquet_column) {
1458 return parquet_column->logical_type()->is_date() ||
1459 parquet_column->logical_type()
1464 CHECK(parquet_column->logical_type()->is_date());
1466 type.set_type(
kDATE);
1468 type.set_fixed_size();
1473 const parquet::ColumnDescriptor* parquet_column) {
1491 const parquet::ColumnDescriptor* parquet_column) {
// Parameter list and body excerpt of a schema comparison between a reference
// Parquet file and a newly read one (the function header and the first
// parameter are not visible in this excerpt).
// - new_file_reader: reader for the file being compared.
// - reference_file_path / new_file_path: used in the error text.
// Throws std::runtime_error when the two files report different column counts.
1497 const parquet::arrow::FileReader* new_file_reader,
1498 const std::string& reference_file_path,
1499 const std::string& new_file_path) {
1500 const auto reference_num_columns =
1501 reference_file_reader->parquet_reader()->metadata()->num_columns();
1502 const auto new_num_columns =
1503 new_file_reader->parquet_reader()->metadata()->num_columns();
// The column counts must match before any per-column comparison is attempted.
1504 if (reference_num_columns != new_num_columns) {
1505 throw std::runtime_error{
"Parquet file \"" + new_file_path +
1506 "\" has a different schema. Please ensure that all Parquet "
1507 "files use the same schema. Reference Parquet file: \"" +
1508 reference_file_path +
"\" has " +
1510 " columns. New Parquet file \"" + new_file_path +
"\" has " +
// Iterate over every column index of the reference file; the per-column call
// made inside the loop is only partly visible in this excerpt.
1514 for (
int i = 0; i < reference_num_columns; i++) {
1517 reference_file_path,
1525 bool allowed_type =
false;
1528 auto omnisci_column_sub_type_column =
1531 omnisci_column_sub_type_column.get(), parquet_column);
1537 if (!allowed_type) {
1538 auto logical_type = parquet_column->logical_type();
1539 if (logical_type->is_timestamp()) {
1540 auto timestamp_type =
1541 dynamic_cast<const parquet::TimestampLogicalType*
>(logical_type.get());
1542 CHECK(timestamp_type);
1544 if (!timestamp_type->is_adjusted_to_utc()) {
1545 LOG(
WARNING) <<
"Non-UTC timezone specified in Parquet file for column \""
1547 <<
"\". Only UTC timezone is currently supported.";
1550 std::string parquet_type;
1552 if (parquet_column->logical_type()->is_none()) {
1553 parquet_type = parquet::TypeToString(physical_type);
1555 parquet_type = logical_type->ToString();
1558 throw std::runtime_error{
"Conversion from Parquet type \"" + parquet_type +
1559 "\" to HeavyDB type \"" + omnisci_type +
1560 "\" is not allowed. Please use an appropriate column type."};
1566 if (parquet_column->logical_type()->is_decimal()) {
1570 if (parquet_column->logical_type()->is_none() &&
1571 (parquet_column->physical_type() == parquet::Type::FLOAT ||
1572 parquet_column->physical_type() == parquet::Type::DOUBLE)) {
1576 if ((parquet_column->logical_type()->is_none() &&
1577 (parquet_column->physical_type() == parquet::Type::INT32 ||
1578 parquet_column->physical_type() == parquet::Type::INT64)) ||
1579 parquet_column->logical_type()->is_int()) {
1583 if (parquet_column->logical_type()->is_none() &&
1584 parquet_column->physical_type() == parquet::Type::BOOLEAN) {
1588 if (parquet_column->logical_type()->is_timestamp()) {
1592 if (parquet_column->logical_type()->is_time()) {
1596 if (parquet_column->logical_type()->is_date()) {
1605 parquet_column->ToString());
1609 const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1610 const std::string& file_path,
1619 const int column_index,
1620 const std::string& file_path) {
1621 throw std::runtime_error{
1622 "Statistics metadata is required for all row groups. Metadata is missing for "
1623 "row group index: " +
1625 ", column index: " +
std::to_string(column_index) +
", file path: " + file_path};
1636 const int fragment_size) {
1638 "Parquet file has a row group size that is larger than the fragment size. "
1639 "Please set the table fragment size to a number that is larger than the "
1640 "row group size. Row group index: " +
1644 ", file path: " + max_row_group_stats.
file_path};
1645 metadata_scan_exception.min_feasible_fragment_size_ =
1647 throw metadata_scan_exception;
// Parameter list and body excerpt of a pass over all columns and row groups of
// one Parquet file (the function header and one parameter are not visible in
// this excerpt).
// - file_metadata: supplies the column count, schema, and row groups.
// - file_path: used in error text and stored in max_row_group_stats.
// - do_metadata_stats_validation: enables the per-chunk statistics check.
// Returns max_row_group_stats, updated in the row-group loop below.
1651 const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1652 const std::string& file_path,
1654 const bool do_metadata_stats_validation) {
// Visit Parquet columns by index while advancing column_it in lockstep; the
// error text below labels *column_it as the HeavyDB column.
1657 for (
int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1658 const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
// A std::runtime_error from the try body (not visible here) is rethrown with
// the Parquet column path, HeavyDB column name and file path appended.
1661 }
catch (std::runtime_error& e) {
1662 std::stringstream error_message;
1663 error_message << e.what() <<
" Parquet column: " << descr->path()->ToDotString()
1664 <<
", HeavyDB column: " << (*column_it)->columnName
1665 <<
", Parquet file: " << file_path <<
".";
1666 throw std::runtime_error(error_message.str());
// Scan every row group for the current column index i.
1669 for (
int r = 0; r < file_metadata->num_row_groups(); ++r) {
1670 auto group_metadata = file_metadata->RowGroup(r);
1671 auto num_rows = group_metadata->num_rows();
// Empty row groups take their own branch (body not visible here). Otherwise,
// when a row group exceeds the recorded max_row_group_size, its size, index
// and file path are recorded.
1672 if (num_rows == 0) {
1674 }
else if (num_rows > max_row_group_stats.max_row_group_size) {
1675 max_row_group_stats.max_row_group_size = num_rows;
1676 max_row_group_stats.max_row_group_index = r;
1677 max_row_group_stats.file_path = file_path;
// Optional statistics validation for this column chunk.
1680 if (do_metadata_stats_validation) {
1681 auto column_chunk = group_metadata->ColumnChunk(i);
1682 bool contains_metadata = column_chunk->is_stats_set();
1683 if (contains_metadata) {
1684 auto stats = column_chunk->statistics();
// The chunk counts as all nulls when the null count equals the number of
// values in the chunk.
1685 bool is_all_nulls =
stats->null_count() == column_chunk->num_values();
// Statistics are treated as missing unless min/max are present, the chunk is
// all nulls, or is_list is set (is_list is defined on a line not visible here).
1691 if (!(
stats->HasMinMax() || is_all_nulls || is_list)) {
1692 contains_metadata =
false;
1696 if (!contains_metadata) {
1702 return max_row_group_stats;
1706 const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1707 const std::string& file_path,
1709 const bool do_metadata_stats_validation) {
1712 file_metadata, file_path, schema, do_metadata_stats_validation);
1716 const std::map<
int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1720 std::list<RowGroupMetadata> row_group_metadata;
1721 auto column_interval =
1725 auto file_metadata = reader->parquet_reader()->metadata();
1726 for (
int row_group = row_group_interval.
start_index;
1727 row_group <= row_group_interval.
end_index;
1729 auto& row_group_metadata_item = row_group_metadata.emplace_back();
1730 row_group_metadata_item.row_group_index = row_group;
1731 row_group_metadata_item.file_path = row_group_interval.
file_path;
1733 std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1734 file_metadata->RowGroup(row_group);
1736 for (
int column_id = column_interval.start; column_id <= column_interval.end;
1740 auto encoder_map_iter =
1742 CHECK(encoder_map_iter != encoder_map.end());
1744 auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1745 group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1746 row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1747 }
catch (
const std::exception& e) {
1748 std::stringstream error_message;
1749 error_message << e.what() <<
" in row group " << row_group <<
" of Parquet file '"
1750 << row_group_interval.
file_path <<
"'.";
1751 throw std::runtime_error(error_message.str());
1755 return row_group_metadata;
1759 const std::map<int, Chunk_NS::Chunk> chunks,
1762 const std::map<int, StringDictionary*> column_dictionaries,
1763 const int64_t num_rows,
1764 const bool geo_validate_geometry) {
1765 std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1766 auto file_metadata = reader->parquet_reader()->metadata();
1767 for (
auto& [column_id, chunk] : chunks) {
1769 if (column_descriptor->isGeoPhyCol) {
1772 auto parquet_column_descriptor =
1774 auto find_it = column_dictionaries.find(column_id);
1776 (find_it == column_dictionaries.end() ?
nullptr : find_it->second);
1777 std::list<Chunk_NS::Chunk> chunks_for_import;
1778 chunks_for_import.push_back(chunk);
1779 if (column_descriptor->columnType.is_geometry()) {
1780 for (
int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1781 chunks_for_import.push_back(chunks.at(column_id + i + 1));
1786 parquet_column_descriptor,
1788 geo_validate_geometry);
1793 if (
auto inplace_encoder = dynamic_cast<ParquetInPlaceEncoder*>(encoder.get())) {
1794 inplace_encoder->reserve(num_rows);
1804 const bool do_metadata_stats_validation,
1805 const bool geo_validate_geometry) {
1806 std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1807 auto file_metadata = reader->parquet_reader()->metadata();
1808 for (
int column_id = column_interval.
start; column_id <= column_interval.
end;
1811 auto parquet_column_descriptor =
1814 column_descriptor, parquet_column_descriptor, geo_validate_geometry);
1815 if (!do_metadata_stats_validation) {
1818 column_id += column_descriptor->columnType.get_physical_cols();
1825 const std::vector<RowGroupInterval>& row_group_intervals,
1826 const int parquet_column_index,
1828 std::list<Chunk_NS::Chunk>& chunks,
1831 const bool is_for_detect,
1832 const std::optional<int64_t> max_levels_read) {
1834 std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1840 std::vector<int8_t> values;
1842 CHECK(!row_group_intervals.empty());
1843 const auto& first_file_path = row_group_intervals.front().file_path;
1846 auto first_parquet_column_descriptor =
1850 const bool geo_validate_geometry =
1853 first_parquet_column_descriptor,
1860 geo_validate_geometry);
1861 CHECK(encoder.get());
1863 if (rejected_row_indices) {
1864 encoder->initializeErrorTracking();
1866 encoder->initializeColumnType(column_descriptor->
columnType);
1868 bool early_exit =
false;
1869 int64_t total_levels_read = 0;
1870 for (
const auto& row_group_interval : row_group_intervals) {
1871 const auto& file_path = row_group_interval.file_path;
1875 CHECK(row_group_interval.start_index >= 0 &&
1876 row_group_interval.end_index < num_row_groups);
1877 CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1879 parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1880 auto parquet_column_descriptor =
1883 parquet_column_descriptor,
1888 parquet_column_descriptor);
1892 int64_t values_read = 0;
1893 for (
int row_group_index = row_group_interval.start_index;
1894 row_group_index <= row_group_interval.end_index;
1895 ++row_group_index) {
1896 auto group_reader = parquet_reader->RowGroup(row_group_index);
1897 std::shared_ptr<parquet::ColumnReader> col_reader =
1898 group_reader->Column(parquet_column_index);
1901 while (col_reader->HasNext()) {
1902 int64_t levels_read =
1906 reinterpret_cast<uint8_t*
>(values.data()),
1910 if (rejected_row_indices) {
1911 encoder->appendDataTrackErrors(def_levels.data(),
1920 parquet_column_index,
1923 parquet_column_descriptor);
1925 encoder->appendData(def_levels.data(),
1932 if (max_levels_read.has_value()) {
1933 total_levels_read += levels_read;
1934 if (total_levels_read >= max_levels_read.value()) {
1940 if (
auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1941 array_encoder->finalizeRowGroup();
1943 }
catch (
const std::exception& error) {
1946 if (boost::regex_search(error.what(),
1947 boost::regex{
"Deserializing page header failed."})) {
1949 "Unable to read from foreign data source, possible cause is an unexpected "
1950 "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
1953 "if data source has been updated. Foreign table: " +
1958 std::string(error.what()) +
" Row group: " +
std::to_string(row_group_index) +
1959 ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1960 "', Parquet file: '" + file_path +
"'");
1962 if (max_levels_read.has_value() && early_exit) {
1966 if (max_levels_read.has_value() && early_exit) {
1971 if (rejected_row_indices) {
1972 *rejected_row_indices = encoder->getRejectedRowIndices();
1974 return chunk_metadata;
1978 const parquet::ColumnDescriptor* parquet_column) {
1983 return type.get_array_type();
1991 const parquet::ColumnDescriptor* parquet_column) {
1993 <<
"isColumnMappingSupported should not be called on arrays";
2025 std::shared_ptr<arrow::fs::FileSystem> file_system,
2028 : file_system_(file_system)
2029 , file_reader_cache_(file_map)
2030 , foreign_table_(foreign_table) {
2035 const std::vector<RowGroupInterval>& row_group_intervals,
2036 const int parquet_column_index,
2037 std::list<Chunk_NS::Chunk>& chunks,
2040 CHECK(!chunks.empty());
2041 auto const& chunk = *chunks.begin();
2042 auto column_descriptor = chunk.getColumnDesc();
2043 auto buffer = chunk.getBuffer();
2048 parquet_column_index,
2052 rejected_row_indices);
2054 }
catch (
const std::exception& error) {
2076 const parquet::ColumnDescriptor* parquet_column_descriptor,
2079 const int row_group_index,
2080 const int parquet_column_index,
2081 const parquet::ParquetFileReader* parquet_reader)
2103 reinterpret_cast<uint8_t*
>(batch_data.
values.data()),
2119 batch_data.
values.data(),
2123 if (
auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(
encoder_)) {
2124 array_encoder->finalizeRowGroup();
2146 const std::map<int, Chunk_NS::Chunk>& chunks,
2148 const std::map<int, StringDictionary*>& column_dictionaries,
2149 const int num_threads) {
2152 const auto& file_path = row_group_interval.
file_path;
2156 auto file_reader = file_reader_owner.get();
2157 auto file_metadata = file_reader->parquet_reader()->metadata();
2165 auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2168 }
catch (std::runtime_error& e) {
2169 std::stringstream error_message;
2170 error_message << e.what()
2171 <<
" Parquet column: " << parquet_column->path()->ToDotString()
2172 <<
", HeavyDB column: " << column_descriptor->columnName
2173 <<
", Parquet file: " << file_path <<
".";
2174 throw std::runtime_error(error_message.str());
2179 auto row_group_index = row_group_interval.
start_index;
2180 std::map<int, ParquetRowGroupReader> row_group_reader_map;
2182 parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2183 auto group_reader = parquet_reader->RowGroup(row_group_index);
2185 std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2187 const bool geo_validate_geometry =
2192 column_dictionaries,
2193 group_reader->metadata()->num_rows(),
2194 geo_validate_geometry);
2196 std::vector<std::set<int>> partitions(num_threads);
2197 std::map<int, int> column_id_to_thread;
2198 for (
auto& [column_id, encoder] : encoder_map) {
2199 auto thread_id = column_id % num_threads;
2200 column_id_to_thread[column_id] =
thread_id;
2201 partitions[
thread_id].insert(column_id);
2204 for (
auto& [column_id, encoder] : encoder_map) {
2207 auto parquet_column_descriptor =
2208 file_metadata->schema()->Column(parquet_column_index);
2213 row_group_interval.
end_index < num_row_groups);
2214 CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2216 parquet_column_descriptor);
2218 std::shared_ptr<parquet::ColumnReader> col_reader =
2219 group_reader->Column(parquet_column_index);
2221 row_group_reader_map.insert(
2225 parquet_column_descriptor,
2228 column_id_to_thread, column_id)],
2230 parquet_column_index,
2234 std::vector<std::future<void>> futures;
2235 for (
int ithread = 0; ithread < num_threads; ++ithread) {
2236 auto column_ids_for_thread = partitions[ithread];
2237 futures.emplace_back(
2239 for (
const auto column_id : column_ids_for_thread) {
2241 .readAndValidateRowGroup();
2247 for (
auto& future : futures) {
2251 for (
auto& future : futures) {
2257 for (
auto& thread_invalid_indices : invalid_indices_per_thread) {
2258 invalid_indices.merge(thread_invalid_indices);
2261 for (
auto& [_, reader] : row_group_reader_map) {
2262 reader.eraseInvalidRowGroupData(
2268 auto column_id = column_descriptor->columnId;
2270 CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2271 invalid_indices.size());
2272 size_t updated_num_elems = db_encoder->getNumElems() +
2273 group_reader->metadata()->num_rows() -
2274 invalid_indices.size();
2275 db_encoder->setNumElems(updated_num_elems);
2276 if (column_descriptor->columnType.is_geometry()) {
2277 for (
int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2280 db_encoder->setNumElems(updated_num_elems);
2285 return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2286 invalid_indices.size()};
2297 const size_t max_num_rows,
2299 CHECK(!files.empty());
2301 auto first_file = *files.begin();
2304 for (
auto current_file_it = ++files.begin(); current_file_it != files.end();
2305 ++current_file_it) {
2310 auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2311 auto num_columns = first_file_metadata->num_columns();
2316 auto current_file_it = files.begin();
2317 while (data_preview.
sample_rows.size() < max_num_rows &&
2318 current_file_it != files.end()) {
2319 size_t total_num_rows = data_preview.
sample_rows.size();
2320 size_t max_num_rows_to_append = max_num_rows - data_preview.
sample_rows.size();
2323 std::vector<RowGroupInterval> row_group_intervals;
2324 for (; current_file_it != files.end(); ++current_file_it) {
2325 const auto& file_path = *current_file_it;
2327 auto file_metadata = file_reader->parquet_reader()->metadata();
2328 auto num_row_groups = file_metadata->num_row_groups();
2329 int end_row_group = 0;
2330 for (
int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2331 const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2332 total_num_rows += next_num_rows;
2335 row_group_intervals.push_back(
RowGroupInterval{file_path, 0, end_row_group});
2339 for (
int i = 0; i < num_columns; ++i) {
2340 auto col = first_file_metadata->schema()->Column(i);
2345 sql_type.is_array() ? col->path()->ToDotVector()[0] +
"_array" : col->name();
2353 std::make_unique<TypedParquetDetectBuffer>());
2355 std::make_unique<RejectedRowIndices>());
2357 auto& chunk = preview_context.
column_chunks.emplace_back(&cd);
2358 chunk.setPinnable(
false);
2359 chunk.setBuffer(detect_buffer.get());
2362 std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2363 [&](
const std::vector<int>& column_indices) {
2364 for (
const auto& column_index : column_indices) {
2366 auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2367 auto& rejected_row_indices =
2371 chunk.getColumnDesc(),
2374 rejected_row_indices.get(),
2376 max_num_rows_to_append);
2382 std::vector<int> columns(num_columns);
2383 std::iota(columns.begin(), columns.end(), 0);
2386 for (
auto& future : futures) {
2389 for (
auto& future : futures) {
2394 auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2395 for (
int i = 0; i < num_columns; ++i) {
2396 rejected_row_indices->insert(
2401 size_t num_rows = 0;
2403 for (
int i = 0; i < num_columns; ++i, ++buffers_it) {
2405 auto& strings = buffers_it->get()->getStrings();
2407 num_rows = strings.size();
2409 CHECK_EQ(num_rows, strings.size());
2413 size_t num_rejected_rows = rejected_row_indices->size();
2415 CHECK_GE(num_rows, num_rejected_rows);
2416 auto row_count = num_rows - num_rejected_rows;
2418 auto offset_row = data_preview.
sample_rows.size();
2419 data_preview.
sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2421 for (
size_t irow = 0, rows_appended = 0;
2422 irow < num_rows && offset_row + rows_appended < max_num_rows;
2424 if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2427 auto& row_data = data_preview.
sample_rows[offset_row + rows_appended];
2428 row_data.resize(num_columns);
2430 for (
int i = 0; i < num_columns; ++i, ++buffers_it) {
2432 auto& strings = buffers_it->get()->getStrings();
2433 row_data[i] = strings[irow];
2440 for (
int i = 0; i < num_columns; ++i) {
2442 if (type_info.is_string()) {
2443 auto tentative_geo_type =
2445 if (tentative_geo_type.has_value()) {
2446 data_preview.
column_types[i].set_type(tentative_geo_type.value());
2452 return data_preview;
2456 const std::vector<std::string>& file_paths,
2458 const bool do_metadata_stats_validation) {
2460 auto column_interval =
2463 CHECK(!file_paths.empty());
2467 const auto& first_path = *file_paths.begin();
2469 auto max_row_group_stats =
2473 do_metadata_stats_validation);
2475 const bool geo_validate_geometry =
2480 do_metadata_stats_validation,
2481 geo_validate_geometry);
2484 encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2492 std::vector<std::string> cache_subset;
2493 for (
auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2495 cache_subset.emplace_back(*path_it);
2499 auto table_ptr = schema.getForeignTable();
2503 std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2505 for (
const auto& path_group : paths_per_thread) {
2508 [&](
const auto& paths,
const auto& file_reader_cache)
2509 -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2510 std::list<RowGroupMetadata> reduced_metadata;
2511 MaxRowGroupSizeStats max_row_group_stats{0, 0};
2512 for (
const auto& path : paths.get()) {
2513 auto reader = file_reader_cache.get().getOrInsert(path,
file_system_);
2515 auto local_max_row_group_stats =
2519 do_metadata_stats_validation);
2520 if (local_max_row_group_stats.max_row_group_size >
2521 max_row_group_stats.max_row_group_size) {
2522 max_row_group_stats = local_max_row_group_stats;
2526 reduced_metadata.splice(
2527 reduced_metadata.end(),
2530 return {reduced_metadata, max_row_group_stats};
2532 std::ref(path_group),
2537 for (
auto& future : futures) {
2538 auto [metadata, local_max_row_group_stats] = future.get();
2539 row_group_metadata.splice(row_group_metadata.end(), metadata);
2540 if (local_max_row_group_stats.max_row_group_size >
2541 max_row_group_stats.max_row_group_size) {
2542 max_row_group_stats = local_max_row_group_stats;
2546 if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2548 max_row_group_stats, schema.getForeignTable()->maxFragRows);
2551 return row_group_metadata;
DEVICE auto upper_bound(ARGS &&...args)
std::shared_ptr< parquet::ColumnReader > col_reader_
std::list< ColumnDescriptor > column_descriptors
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
AbstractBuffer * getIndexBuf() const
std::shared_ptr< ParquetEncoder > create_parquet_signed_or_unsigned_integral_encoder_with_types(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const bool is_signed)
Create a signed or unsigned integral parquet encoder using types.
HOST DEVICE int get_size() const
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::optional< SQLTypes > detect_geo_type(const SampleRows &sample_rows, size_t column_index)
std::vector< Chunk_NS::Chunk > column_chunks
static constexpr int32_t kMaxNumericPrecision
std::vector< std::string > column_names
std::vector< SQLTypeInfo > column_types
const parquet::ParquetFileReader * parquet_reader_
static bool isColumnMappingSupported(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
HOST DEVICE int get_scale() const
size_t get_num_threads(const ForeignTable &table)
std::shared_ptr< ParquetEncoder > create_parquet_geospatial_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool geo_validate_geometry)
std::unique_ptr< ColumnDescriptor > get_sub_type_column_descriptor(const ColumnDescriptor *column)
const parquet::ColumnDescriptor * parquet_column_descriptor_
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool is_nanosecond_precision(const ColumnDescriptor *omnisci_column)
const int row_group_index_
std::shared_ptr< ParquetEncoder > create_parquet_none_type_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor *parquet_column)
MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema, const bool do_metadata_stats_validation)
InvalidRowGroupIndices & invalid_indices_
void validate_equal_schema(const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor *parquet_column)
Detect a valid list parquet column.
ParquetEncoder * encoder_
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
HOST DEVICE SQLTypes get_type() const
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect, const bool geo_validate_geometry)
Create a Parquet-specific encoder for a Parquet to OmniSci mapping.
void validate_list_column_metadata_statistics(const parquet::ParquetFileReader *reader, const int row_group_index, const int column_index, const int16_t *def_levels, const int64_t num_levels, const parquet::ColumnDescriptor *parquet_column_descriptor)
int getParquetColumnIndex(const int column_id) const
std::shared_ptr< ParquetEncoder > create_parquet_array_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect, const bool geo_validate_geometry)
UniqueReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_date_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
int64_t max_row_group_size
void throw_missing_metadata_error(const int row_group_index, const int column_index, const std::string &file_path)
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
void set_definition_levels_for_zero_max_definition_level_case(const parquet::ColumnDescriptor *parquet_column_descriptor, std::vector< int16_t > &def_levels)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
ParquetImportEncoder * import_encoder
SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
future< Result > async(Fn &&fn, Args &&...args)
bool is_fixlen_array() const
std::set< int64_t > InvalidRowGroupIndices
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder_with_omnisci_type(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const int bit_width, const bool is_signed)
Create an integral parquet encoder using types.
void validate_allowed_mapping(const parquet::ColumnDescriptor *parquet_column, const ColumnDescriptor *omnisci_column)
void validate_number_of_columns(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor *parquet_column)
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
void readAndValidateRowGroup()
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
SQLTypeInfo suggest_floating_point_mapping(const parquet::ColumnDescriptor *parquet_column)
An AbstractBuffer is a unit of data management for a data manager.
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
Specifies the in-memory content of a row in the column metadata table.
std::pair< size_t, size_t > loadRowGroups(const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
Load row groups of data into given chunks.
parquet::arrow::FileReader * ReaderPtr
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_string_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const Chunk_NS::Chunk &chunk, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, bool is_for_import, const bool is_for_detect)
int get_precision() const
int numLogicalColumns() const
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder_with_omnisci_type(const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, AbstractBuffer *buffer)
void set_comp_param(int p)
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan for the paths specified.
const ForeignTable * foreign_table_
int64_t max_row_group_index
SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor *parquet_column)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
std::vector< int8_t > values
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
void eraseInvalidRowGroupData(const InvalidRowGroupIndices &invalid_indices)
DEVICE auto lower_bound(ARGS &&...args)
HOST DEVICE EncodingType get_compression() const
AbstractBuffer * getBuffer() const
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_import(std::list< Chunk_NS::Chunk > &chunks, const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, StringDictionary *string_dictionary, const bool geo_validate_geometry)
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
virtual void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices)=0
DEVICE void iota(ARGS &&...args)
static const int batch_reader_num_elements
std::list< RowGroupMetadata > metadata_scan_rowgroup_interval(const std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const RowGroupInterval &row_group_interval, const ReaderPtr &reader, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
HOST DEVICE int get_dimension() const
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_import(const std::map< int, Chunk_NS::Chunk > chunks, const ForeignTableSchema &schema, const ReaderPtr &reader, const std::map< int, StringDictionary * > column_dictionaries, const int64_t num_rows, const bool geo_validate_geometry)
std::string get_type_name() const
void initializeIfEmpty(const std::string &path)
virtual void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices)=0
std::set< int64_t > RejectedRowIndices
DataPreview previewFiles(const std::vector< std::string > &files, const size_t max_num_rows, const ForeignTable &table)
Preview rows of data and column types in a set of files.
std::vector< std::unique_ptr< TypedParquetDetectBuffer > > detect_buffers
std::vector< std::unique_ptr< RejectedRowIndices > > rejected_row_indices_per_column
std::shared_ptr< ParquetEncoder > create_parquet_floating_point_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
HOST DEVICE int get_comp_param() const
bool is_millisecond_precision(const ColumnDescriptor *omnisci_column)
ParquetRowGroupReader(std::shared_ptr< parquet::ColumnReader > col_reader, const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, ParquetEncoder *encoder, InvalidRowGroupIndices &invalid_indices, const int row_group_index, const int parquet_column_index, const parquet::ParquetFileReader *parquet_reader)
FileReaderMap * file_reader_cache_
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
#define DEBUG_TIMER(name)
bool is_valid_parquet_string(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_metadata_scan(const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const ReaderPtr &reader, const bool do_metadata_stats_validation, const bool geo_validate_geometry)
LazyParquetChunkLoader(std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const ForeignTable *foreign_table)
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
const int parquet_column_index_
SQLTypeInfo suggest_boolean_type_mapping(const parquet::ColumnDescriptor *parquet_column)
bool is_microsecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const ColumnDescriptor * getLogicalColumn(const int column_id) const
std::vector< int16_t > def_levels
MaxRowGroupSizeStats validate_parquet_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema, const bool do_metadata_stats_validation)
void throw_row_group_larger_than_fragment_size_error(const MaxRowGroupSizeStats max_row_group_stats, const int fragment_size)
std::vector< int16_t > rep_levels
SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool getOptionAsBool(const std::string_view &key) const
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const ColumnDescriptor * column_descriptor_
const ColumnDescriptor * getColumnDescriptor(const int column_id) const
void set_precision(int d)
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_metadata_scan(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const bool geo_validate_geometry)
static SQLTypeInfo suggestColumnMapping(const parquet::ColumnDescriptor *parquet_column)
bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value)
HOST DEVICE void set_type(SQLTypes t)