19 #include <arrow/api.h>
20 #include <arrow/io/api.h>
21 #include <parquet/arrow/reader.h>
22 #include <parquet/column_scanner.h>
23 #include <parquet/exception.h>
24 #include <parquet/platform.h>
25 #include <parquet/statistics.h>
26 #include <parquet/types.h>
51 namespace foreign_storage {
56 return value >= lower_bound && value <=
upper_bound;
60 return (parquet_column->logical_type()->is_none() &&
61 parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
62 parquet_column->logical_type()->is_string();
102 const parquet::schema::Node* node = parquet_column->schema_node().get();
103 if ((node->name() !=
"element" && node->name() !=
"item") ||
104 !(node->is_required() ||
105 node->is_optional())) {
112 node = node->parent();
116 if (node->name() !=
"list" || !node->is_repeated() ||
122 node = node->parent();
126 if (!node->logical_type()->is_list() ||
127 !(node->is_optional() ||
128 node->is_required())) {
139 node = node->parent();
146 template <
typename V,
typename NullType>
149 const parquet::ColumnDescriptor* parquet_column_descriptor,
151 switch (parquet_column_descriptor->physical_type()) {
152 case parquet::Type::INT32:
153 return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
154 buffer, column_descriptor, parquet_column_descriptor);
155 case parquet::Type::INT64:
156 return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
157 buffer, column_descriptor, parquet_column_descriptor);
158 case parquet::Type::FIXED_LEN_BYTE_ARRAY:
159 return std::make_shared<
161 buffer, column_descriptor, parquet_column_descriptor);
162 case parquet::Type::BYTE_ARRAY:
163 return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
164 buffer, column_descriptor, parquet_column_descriptor);
173 const parquet::ColumnDescriptor* parquet_column,
175 const bool is_metadata_scan_or_for_import) {
176 if (parquet_column->logical_type()->is_decimal()) {
178 return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
179 omnisci_column, parquet_column, buffer);
182 if (is_metadata_scan_or_for_import) {
185 return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
186 omnisci_column, parquet_column, buffer);
188 return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
189 omnisci_column, parquet_column, buffer);
196 return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
197 omnisci_column, parquet_column, buffer);
199 return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
200 omnisci_column, parquet_column, buffer);
223 template <
typename V,
typename T,
typename U,
typename NullType>
224 std::shared_ptr<ParquetEncoder>
227 const size_t omnisci_data_type_byte_size,
228 const size_t parquet_data_type_byte_size,
229 const bool is_signed) {
230 CHECK(
sizeof(NullType) == omnisci_data_type_byte_size);
232 return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
233 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
235 return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
236 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
259 template <
typename V,
typename NullType>
262 const size_t omnisci_data_type_byte_size,
263 const size_t parquet_data_type_byte_size,
265 const bool is_signed) {
272 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
278 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
284 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
290 buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
299 const parquet::ColumnDescriptor* parquet_column,
301 const bool is_metadata_scan_or_for_import) {
302 auto column_type = omnisci_column->
columnType;
303 auto physical_type = parquet_column->physical_type();
306 int is_signed =
false;
308 if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
309 if (physical_type == parquet::Type::INT32) {
311 }
else if (physical_type == parquet::Type::INT64) {
319 if (
auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
320 parquet_column->logical_type().get())) {
321 bit_width = int_logical_column->bit_width();
322 is_signed = int_logical_column->is_signed();
325 if (bit_width == -1) {
329 const size_t omnisci_data_type_byte_size = column_type.get_size();
330 const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);
332 switch (omnisci_data_type_byte_size) {
335 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
337 omnisci_data_type_byte_size,
338 parquet_data_type_byte_size,
342 if (is_metadata_scan_or_for_import && column_type.get_type() ==
kBIGINT) {
343 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
345 omnisci_data_type_byte_size,
346 parquet_data_type_byte_size,
350 return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
352 omnisci_data_type_byte_size,
353 parquet_data_type_byte_size,
357 if (is_metadata_scan_or_for_import) {
358 switch (column_type.get_type()) {
360 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
362 omnisci_data_type_byte_size,
363 parquet_data_type_byte_size,
367 return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
369 omnisci_data_type_byte_size,
370 parquet_data_type_byte_size,
379 return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
381 omnisci_data_type_byte_size,
382 parquet_data_type_byte_size,
386 if (is_metadata_scan_or_for_import) {
387 switch (column_type.get_type()) {
389 return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
391 omnisci_data_type_byte_size,
392 parquet_data_type_byte_size,
396 return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
398 omnisci_data_type_byte_size,
399 parquet_data_type_byte_size,
403 return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
405 omnisci_data_type_byte_size,
406 parquet_data_type_byte_size,
415 return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
417 omnisci_data_type_byte_size,
418 parquet_data_type_byte_size,
429 const parquet::ColumnDescriptor* parquet_column,
431 auto column_type = omnisci_column->
columnType;
432 if (!column_type.is_fp()) {
436 switch (column_type.get_type()) {
438 switch (parquet_column->physical_type()) {
439 case parquet::Type::FLOAT:
440 return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
441 buffer, omnisci_column, parquet_column);
442 case parquet::Type::DOUBLE:
443 return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
444 buffer, omnisci_column, parquet_column);
449 CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
450 return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
451 buffer, omnisci_column, parquet_column);
460 const parquet::ColumnDescriptor* parquet_column,
462 auto column_type = omnisci_column->
columnType;
463 if (parquet_column->logical_type()->is_none() &&
466 switch (column_type.get_type()) {
468 return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
469 buffer, omnisci_column, parquet_column);
480 template <
typename V,
typename T,
typename NullType>
483 const parquet::ColumnDescriptor* parquet_column,
485 if (
auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
486 parquet_column->logical_type().get())) {
487 switch (timestamp_logical_type->time_unit()) {
488 case parquet::LogicalType::TimeUnit::MILLIS:
489 return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
490 buffer, omnisci_column, parquet_column);
491 case parquet::LogicalType::TimeUnit::MICROS:
492 return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
493 buffer, omnisci_column, parquet_column);
494 case parquet::LogicalType::TimeUnit::NANOS:
495 return std::make_shared<
497 buffer, omnisci_column, parquet_column);
507 template <
typename V,
typename T,
typename NullType>
510 const parquet::ColumnDescriptor* parquet_column,
512 const bool is_metadata_scan_or_for_import) {
513 if (
auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
514 parquet_column->logical_type().get())) {
515 switch (timestamp_logical_type->time_unit()) {
516 case parquet::LogicalType::TimeUnit::MILLIS:
517 if (is_metadata_scan_or_for_import) {
518 return std::make_shared<
520 buffer, omnisci_column, parquet_column);
522 return std::make_shared<
524 buffer, omnisci_column, parquet_column);
525 case parquet::LogicalType::TimeUnit::MICROS:
526 if (is_metadata_scan_or_for_import) {
527 return std::make_shared<
529 buffer, omnisci_column, parquet_column);
531 return std::make_shared<
533 buffer, omnisci_column, parquet_column);
534 case parquet::LogicalType::TimeUnit::NANOS:
535 if (is_metadata_scan_or_for_import) {
536 return std::make_shared<
539 1000L * 1000L * 1000L,
541 buffer, omnisci_column, parquet_column);
543 return std::make_shared<
545 buffer, omnisci_column, parquet_column);
557 const parquet::ColumnDescriptor* parquet_column,
559 const bool is_metadata_scan_or_for_import) {
560 auto column_type = omnisci_column->
columnType;
562 if (parquet_column->logical_type()->is_timestamp()) {
564 if (precision == 0) {
565 return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
566 omnisci_column, parquet_column, buffer);
568 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
569 buffer, omnisci_column, parquet_column);
572 CHECK(column_type.get_comp_param() == 32);
573 if (is_metadata_scan_or_for_import) {
574 return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
575 omnisci_column, parquet_column, buffer);
577 return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
578 omnisci_column, parquet_column, buffer);
581 }
else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
582 if (parquet_column->physical_type() == parquet::Type::INT32) {
584 column_type.get_comp_param() == 32);
585 if (is_metadata_scan_or_for_import) {
586 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
587 buffer, omnisci_column, parquet_column);
589 return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
590 buffer, omnisci_column, parquet_column);
592 }
else if (parquet_column->physical_type() == parquet::Type::INT64) {
594 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
595 buffer, omnisci_column, parquet_column);
597 CHECK(column_type.get_comp_param() == 32);
598 if (is_metadata_scan_or_for_import) {
599 return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
600 buffer, omnisci_column, parquet_column);
602 return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
603 buffer, omnisci_column, parquet_column);
613 template <
typename V,
typename T,
typename NullType>
616 const parquet::ColumnDescriptor* parquet_column,
618 if (
auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
619 parquet_column->logical_type().get())) {
620 switch (time_logical_type->time_unit()) {
621 case parquet::LogicalType::TimeUnit::MILLIS:
622 return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
623 buffer, omnisci_column, parquet_column);
624 case parquet::LogicalType::TimeUnit::MICROS:
625 return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
626 buffer, omnisci_column, parquet_column);
627 case parquet::LogicalType::TimeUnit::NANOS:
628 return std::make_shared<
630 buffer, omnisci_column, parquet_column);
642 const parquet::ColumnDescriptor* parquet_column,
644 const bool is_metadata_scan_or_for_import) {
645 auto column_type = omnisci_column->
columnType;
646 if (
auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
647 parquet_column->logical_type().get())) {
649 if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
650 return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
651 omnisci_column, parquet_column, buffer);
653 return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
654 omnisci_column, parquet_column, buffer);
657 if (is_metadata_scan_or_for_import) {
658 if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
659 CHECK(parquet_column->physical_type() == parquet::Type::INT32);
660 return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
661 omnisci_column, parquet_column, buffer);
663 CHECK(time_logical_column->time_unit() ==
664 parquet::LogicalType::TimeUnit::MICROS ||
665 time_logical_column->time_unit() ==
666 parquet::LogicalType::TimeUnit::NANOS);
667 CHECK(parquet_column->physical_type() == parquet::Type::INT64);
668 return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
669 omnisci_column, parquet_column, buffer);
672 if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
673 CHECK(parquet_column->physical_type() == parquet::Type::INT32);
674 return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
675 omnisci_column, parquet_column, buffer);
677 CHECK(time_logical_column->time_unit() ==
678 parquet::LogicalType::TimeUnit::MICROS ||
679 time_logical_column->time_unit() ==
680 parquet::LogicalType::TimeUnit::NANOS);
681 CHECK(parquet_column->physical_type() == parquet::Type::INT64);
682 return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
683 omnisci_column, parquet_column, buffer);
695 const parquet::ColumnDescriptor* parquet_column,
697 const bool is_metadata_scan_or_for_import) {
698 auto column_type = omnisci_column->
columnType;
699 if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
701 if (is_metadata_scan_or_for_import) {
702 if (column_type.get_comp_param() ==
707 omnisci_column, parquet_column, buffer,
true);
708 }
else if (column_type.get_comp_param() == 16) {
712 omnisci_column, parquet_column, buffer,
true);
717 if (column_type.get_comp_param() ==
722 omnisci_column, parquet_column, buffer,
false);
723 }
else if (column_type.get_comp_param() == 16) {
727 omnisci_column, parquet_column, buffer,
false);
738 const parquet::ColumnDescriptor* parquet_column,
740 const bool is_metadata_scan_or_for_import) {
741 auto column_type = omnisci_column->
columnType;
742 if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
744 if (is_metadata_scan_or_for_import) {
745 if (column_type.get_comp_param() ==
749 }
else if (column_type.get_comp_param() == 16) {
756 if (column_type.get_comp_param() ==
758 return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
759 buffer, omnisci_column, parquet_column);
760 }
else if (column_type.get_comp_param() == 16) {
761 return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
762 buffer, omnisci_column, parquet_column);
769 buffer, omnisci_column, parquet_column);
779 const parquet::ColumnDescriptor* parquet_column,
782 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
784 const bool is_for_detect) {
785 auto column_type = omnisci_column->
columnType;
792 return std::make_shared<ParquetStringImportEncoder>(chunk.
getBuffer());
794 return std::make_shared<ParquetStringNoneEncoder>(chunk.
getBuffer(),
798 if (!is_for_detect) {
799 chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
800 std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
801 logical_chunk_metadata->sqlType = omnisci_column->
columnType;
802 switch (column_type.get_size()) {
804 return std::make_shared<ParquetStringEncoder<uint8_t>>(
807 is_for_import ?
nullptr : logical_chunk_metadata.get());
809 return std::make_shared<ParquetStringEncoder<uint16_t>>(
812 is_for_import ?
nullptr : logical_chunk_metadata.get());
814 return std::make_shared<ParquetStringEncoder<int32_t>>(
817 is_for_import ?
nullptr : logical_chunk_metadata.get());
822 return std::make_shared<ParquetDetectStringEncoder>(chunk.
getBuffer());
832 const parquet::ColumnDescriptor* parquet_column,
833 std::list<Chunk_NS::Chunk>& chunks,
834 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
836 const bool is_metadata_scan,
837 const bool is_for_import) {
838 auto column_type = omnisci_column->
columnType;
843 return std::make_shared<ParquetGeospatialImportEncoder>(chunks);
845 if (is_metadata_scan) {
846 return std::make_shared<ParquetGeospatialEncoder>(render_group_analyzer_map);
848 for (
auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
849 chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
850 auto& chunk_metadata_ptr = chunk_metadata.back();
851 chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
853 return std::make_shared<ParquetGeospatialEncoder>(
854 parquet_column, chunks, chunk_metadata, render_group_analyzer_map);
862 const parquet::ColumnDescriptor* parquet_column,
863 std::list<Chunk_NS::Chunk>& chunks,
865 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
866 const bool is_metadata_scan,
867 const bool is_for_import,
868 const bool is_for_detect);
904 const parquet::ColumnDescriptor* parquet_column,
905 std::list<Chunk_NS::Chunk>& chunks,
907 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
909 const bool is_metadata_scan =
false,
910 const bool is_for_import =
false,
911 const bool is_for_detect =
false) {
912 CHECK(!(is_metadata_scan && is_for_import));
913 auto buffer = chunks.empty() ?
nullptr : chunks.begin()->getBuffer();
918 render_group_analyzer_map,
934 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
938 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
946 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
954 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
958 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
962 omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
983 std::list<Chunk_NS::Chunk>& chunks,
985 const parquet::ColumnDescriptor* parquet_column,
988 std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
994 render_group_analyzer_map,
1005 const parquet::ColumnDescriptor* parquet_column,
1007 std::list<Chunk_NS::Chunk> chunks;
1008 std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1014 render_group_analyzer_map,
1020 const parquet::ColumnDescriptor* parquet_column,
1021 std::list<Chunk_NS::Chunk>& chunks,
1023 std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
1024 const bool is_metadata_scan,
1025 const bool is_for_import,
1026 const bool is_for_detect) {
1031 std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
1042 CHECK(encoder.get());
1044 CHECK(scalar_encoder);
1045 if (!is_for_import) {
1046 if (!is_for_detect) {
1048 encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
1049 is_metadata_scan ?
nullptr : chunks.begin()->getBuffer(),
1053 encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
1054 is_metadata_scan ?
nullptr : chunks.begin()->getBuffer(),
1055 is_metadata_scan ?
nullptr : chunks.begin()->getIndexBuf(),
1060 encoder = std::make_shared<ParquetArrayDetectEncoder>(
1061 chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
1064 encoder = std::make_shared<ParquetArrayImportEncoder>(
1065 chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
1071 const parquet::ParquetFileReader* reader,
1072 const int row_group_index,
1073 const int column_index,
1074 const int16_t* def_levels,
1075 const int64_t num_levels,
1076 const parquet::ColumnDescriptor* parquet_column_descriptor) {
1078 if (!is_valid_parquet_list) {
1081 std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1082 reader->metadata()->RowGroup(row_group_index);
1083 auto column_metadata = group_metadata->ColumnChunk(column_index);
1085 if (!stats->HasMinMax()) {
1086 auto find_it = std::find_if(def_levels,
1087 def_levels + num_levels,
1088 [](
const int16_t def_level) {
return def_level == 3; });
1089 if (find_it != def_levels + num_levels) {
1090 throw std::runtime_error(
1091 "No minimum and maximum statistic set in list column but non-null & non-empty "
1092 "array/value detected.");
1099 const parquet::ColumnDescriptor* parquet_column_descriptor) {
1102 throw std::runtime_error(
1103 "Unsupported mapping detected. Column '" + parquet_column_descriptor->name() +
1104 "' detected to be a parquet list but HeavyDB mapped column '" +
1105 omnisci_column_descriptor->
columnName +
"' is not an array.");
1107 if (is_valid_parquet_list) {
1108 if (parquet_column_descriptor->max_repetition_level() != 1 ||
1109 parquet_column_descriptor->max_definition_level() != 3) {
1110 throw std::runtime_error(
1111 "Incorrect schema max repetition level detected in column '" +
1112 parquet_column_descriptor->name() +
1113 "'. Expected a max repetition level of 1 and max definition level of 3 for "
1114 "list column but column has a max "
1115 "repetition level of " +
1116 std::to_string(parquet_column_descriptor->max_repetition_level()) +
1117 " and a max definition level of " +
1118 std::to_string(parquet_column_descriptor->max_definition_level()) +
".");
1121 if (parquet_column_descriptor->max_repetition_level() != 0 ||
1122 parquet_column_descriptor->max_definition_level() != 1) {
1123 throw std::runtime_error(
1124 "Incorrect schema max repetition level detected in column '" +
1125 parquet_column_descriptor->name() +
1126 "'. Expected a max repetition level of 0 and max definition level of 1 for "
1127 "flat column but column has a max "
1128 "repetition level of " +
1129 std::to_string(parquet_column_descriptor->max_repetition_level()) +
1130 " and a max definition level of " +
1131 std::to_string(parquet_column_descriptor->max_definition_level()) +
".");
1137 const parquet::ColumnDescriptor* parquet_column,
1138 std::vector<int8_t>& values) {
1139 auto max_type_byte_size =
1141 parquet::GetTypeByteSize(parquet_column->physical_type()));
1142 size_t values_size =
1144 values.resize(values_size);
1148 const parquet::ColumnDescriptor* parquet_column) {
1149 if (
auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
1150 parquet_column->logical_type().get())) {
1152 decimal_logical_column->precision() &&
1162 if (
auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
1163 parquet_column->logical_type().get())) {
1164 auto parquet_precision = decimal_logical_column->precision();
1165 auto parquet_scale = decimal_logical_column->
scale();
1168 "Parquet column \"" + parquet_column->ToString() +
1169 "\" has decimal precision of " +
std::to_string(parquet_precision) +
1170 " which is too high to import, maximum precision supported is " +
1182 <<
" a Parquet column's decimal logical type failed to be read appropriately";
1187 const parquet::ColumnDescriptor* parquet_column) {
1195 return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
1196 (parquet_column->physical_type() == parquet::Type::FLOAT &&
1203 const parquet::ColumnDescriptor* parquet_column) {
1205 if (parquet_column->physical_type() == parquet::Type::FLOAT) {
1207 }
else if (parquet_column->physical_type() == parquet::Type::DOUBLE) {
1218 const parquet::ColumnDescriptor* parquet_column) {
1222 if (
auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1223 parquet_column->logical_type().get())) {
1226 const int bits_per_byte = 8;
1229 const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
1231 int_logical_column->bit_width() * bit_widening_factor;
1237 return (parquet_column->physical_type() == parquet::Type::INT64) ||
1238 (parquet_column->physical_type() == parquet::Type::INT32 &&
1247 if (
auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1248 parquet_column->logical_type().get())) {
1249 auto bit_width = int_logical_column->bit_width();
1250 if (!int_logical_column->is_signed()) {
1253 "Unsigned integer column \"" + parquet_column->name() +
1254 "\" in Parquet file with 64 bit-width has no supported type for ingestion "
1255 "that will not result in data loss");
1278 CHECK(parquet_column->logical_type()->is_none());
1279 if (parquet_column->physical_type() == parquet::Type::INT32) {
1282 CHECK(parquet_column->physical_type() == parquet::Type::INT64);
1294 const parquet::TimestampLogicalType* timestamp_logical_column) {
1295 return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
1303 const parquet::TimestampLogicalType* timestamp_logical_column) {
1304 return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
1312 const parquet::TimestampLogicalType* timestamp_logical_column) {
1313 return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
1317 const parquet::ColumnDescriptor* parquet_column) {
1318 bool is_none_encoded_mapping =
1320 (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
1322 return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
1326 const parquet::ColumnDescriptor* parquet_column) {
1335 const parquet::ColumnDescriptor* parquet_column) {
1343 if (
auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1344 parquet_column->logical_type().get())) {
1359 if (parquet_column->logical_type()->is_none() &&
1360 ((parquet_column->physical_type() == parquet::Type::INT32 &&
1363 parquet_column->physical_type() == parquet::Type::INT64)) {
1370 if (
auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1371 parquet_column->logical_type().get())) {
1390 const parquet::ColumnDescriptor* parquet_column) {
1397 if (parquet_column->logical_type()->is_time()) {
1404 CHECK(parquet_column->logical_type()->is_time());
1406 type.set_type(
kTIME);
1408 type.set_fixed_size();
1413 const parquet::ColumnDescriptor* parquet_column) {
1424 return parquet_column->logical_type()->is_date() ||
1425 parquet_column->logical_type()
1430 CHECK(parquet_column->logical_type()->is_date());
1432 type.set_type(
kDATE);
1434 type.set_fixed_size();
1439 const parquet::ColumnDescriptor* parquet_column) {
1457 const parquet::ColumnDescriptor* parquet_column) {
1462 omnisci_column_sub_type_column.get(), parquet_column);
1468 const parquet::ColumnDescriptor* parquet_column) {
1474 const parquet::arrow::FileReader* new_file_reader,
1475 const std::string& reference_file_path,
1476 const std::string& new_file_path) {
1477 const auto reference_num_columns =
1478 reference_file_reader->parquet_reader()->metadata()->num_columns();
1479 const auto new_num_columns =
1480 new_file_reader->parquet_reader()->metadata()->num_columns();
1481 if (reference_num_columns != new_num_columns) {
1482 throw std::runtime_error{
"Parquet file \"" + new_file_path +
1483 "\" has a different schema. Please ensure that all Parquet "
1484 "files use the same schema. Reference Parquet file: \"" +
1485 reference_file_path +
"\" has " +
1487 " columns. New Parquet file \"" + new_file_path +
"\" has " +
1491 for (
int i = 0; i < reference_num_columns; i++) {
1494 reference_file_path,
1502 auto logical_type = parquet_column->logical_type();
1505 if (!allowed_type) {
1506 if (logical_type->is_timestamp()) {
1507 auto timestamp_type =
1508 dynamic_cast<const parquet::TimestampLogicalType*
>(logical_type.get());
1509 CHECK(timestamp_type);
1511 if (!timestamp_type->is_adjusted_to_utc()) {
1512 LOG(
WARNING) <<
"Non-UTC timezone specified in Parquet file for column \""
1514 <<
"\". Only UTC timezone is currently supported.";
1517 std::string parquet_type;
1518 if (parquet_column->logical_type()->is_none()) {
1519 parquet_type = parquet::TypeToString(physical_type);
1521 parquet_type = logical_type->ToString();
1524 throw std::runtime_error{
"Conversion from Parquet type \"" + parquet_type +
1525 "\" to HeavyDB type \"" + omnisci_type +
1526 "\" is not allowed. Please use an appropriate column type."};
1532 if (parquet_column->logical_type()->is_decimal()) {
1536 if (parquet_column->logical_type()->is_none() &&
1537 (parquet_column->physical_type() == parquet::Type::FLOAT ||
1538 parquet_column->physical_type() == parquet::Type::DOUBLE)) {
1542 if ((parquet_column->logical_type()->is_none() &&
1543 (parquet_column->physical_type() == parquet::Type::INT32 ||
1544 parquet_column->physical_type() == parquet::Type::INT64)) ||
1545 parquet_column->logical_type()->is_int()) {
1549 if (parquet_column->logical_type()->is_none() &&
1550 parquet_column->physical_type() == parquet::Type::BOOLEAN) {
1554 if (parquet_column->logical_type()->is_timestamp()) {
1558 if (parquet_column->logical_type()->is_time()) {
1562 if (parquet_column->logical_type()->is_date()) {
1571 parquet_column->ToString());
1575 const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1576 const std::string& file_path,
1585 const int column_index,
1586 const std::string& file_path) {
1587 throw std::runtime_error{
1588 "Statistics metadata is required for all row groups. Metadata is missing for "
1589 "row group index: " +
1591 ", column index: " +
std::to_string(column_index) +
", file path: " + file_path};
1602 const int fragment_size) {
1604 "Parquet file has a row group size that is larger than the fragment size. "
1605 "Please set the table fragment size to a number that is larger than the "
1606 "row group size. Row group index: " +
1610 ", file path: " + max_row_group_stats.
file_path};
1611 metadata_scan_exception.min_feasible_fragment_size_ =
1613 throw metadata_scan_exception;
1617 const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1618 const std::string& file_path,
1622 for (
int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1623 const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
1626 }
catch (std::runtime_error& e) {
1627 std::stringstream error_message;
1628 error_message << e.what() <<
" Parquet column: " << descr->name()
1629 <<
", HeavyDB column: " << (*column_it)->columnName
1630 <<
", Parquet file: " << file_path <<
".";
1631 throw std::runtime_error(error_message.str());
1634 for (
int r = 0; r < file_metadata->num_row_groups(); ++r) {
1635 auto group_metadata = file_metadata->RowGroup(r);
1636 auto num_rows = group_metadata->num_rows();
1637 if (num_rows == 0) {
1639 }
else if (num_rows > max_row_group_stats.max_row_group_size) {
1640 max_row_group_stats.max_row_group_size = num_rows;
1641 max_row_group_stats.max_row_group_index = r;
1642 max_row_group_stats.file_path = file_path;
1645 auto column_chunk = group_metadata->ColumnChunk(i);
1646 bool contains_metadata = column_chunk->is_stats_set();
1647 if (contains_metadata) {
1648 auto stats = column_chunk->statistics();
1649 bool is_all_nulls = stats->null_count() == column_chunk->num_values();
1655 if (!(stats->HasMinMax() || is_all_nulls || is_list)) {
1656 contains_metadata =
false;
1660 if (!contains_metadata) {
1665 return max_row_group_stats;
1669 const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1670 const std::string& file_path,
1677 const std::map<
int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1681 std::list<RowGroupMetadata> row_group_metadata;
1682 auto column_interval =
1686 auto file_metadata = reader->parquet_reader()->metadata();
1687 for (
int row_group = row_group_interval.
start_index;
1688 row_group <= row_group_interval.
end_index;
1690 auto& row_group_metadata_item = row_group_metadata.emplace_back();
1691 row_group_metadata_item.row_group_index = row_group;
1692 row_group_metadata_item.file_path = row_group_interval.
file_path;
1694 std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1695 file_metadata->RowGroup(row_group);
1697 for (
int column_id = column_interval.start; column_id <= column_interval.end;
1701 auto encoder_map_iter =
1703 CHECK(encoder_map_iter != encoder_map.end());
1705 auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1706 group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1707 row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1708 }
catch (
const std::exception& e) {
1709 std::stringstream error_message;
1710 error_message << e.what() <<
" in row group " << row_group <<
" of Parquet file '"
1711 << row_group_interval.
file_path <<
"'.";
1712 throw std::runtime_error(error_message.str());
1716 return row_group_metadata;
1720 const std::map<int, Chunk_NS::Chunk> chunks,
1723 const std::map<int, StringDictionary*> column_dictionaries,
1724 const int64_t num_rows,
1726 std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1727 auto file_metadata = reader->parquet_reader()->metadata();
1728 for (
auto& [column_id, chunk] : chunks) {
1730 if (column_descriptor->isGeoPhyCol) {
1733 auto parquet_column_descriptor =
1735 auto find_it = column_dictionaries.find(column_id);
1737 (find_it == column_dictionaries.end() ?
nullptr : find_it->second);
1738 std::list<Chunk_NS::Chunk> chunks_for_import;
1739 chunks_for_import.push_back(chunk);
1740 if (column_descriptor->columnType.is_geometry()) {
1741 for (
int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1742 chunks_for_import.push_back(chunks.at(column_id + i + 1));
1747 parquet_column_descriptor,
1749 render_group_analyzer_map);
1754 if (
auto inplace_encoder = dynamic_cast<ParquetInPlaceEncoder*>(encoder.get())) {
1755 inplace_encoder->reserve(num_rows);
1766 const bool do_metadata_stats_validation) {
1767 std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1768 auto file_metadata = reader->parquet_reader()->metadata();
1769 for (
int column_id = column_interval.
start; column_id <= column_interval.
end;
1772 auto parquet_column_descriptor =
1775 column_descriptor, parquet_column_descriptor, render_group_analyzer_map);
1776 if (!do_metadata_stats_validation) {
1779 column_id += column_descriptor->columnType.get_physical_cols();
1786 const std::vector<RowGroupInterval>& row_group_intervals,
1787 const int parquet_column_index,
1789 std::list<Chunk_NS::Chunk>& chunks,
1792 const bool is_for_detect,
1793 const std::optional<int64_t> max_levels_read) {
1795 std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1801 std::vector<int8_t> values;
1803 CHECK(!row_group_intervals.empty());
1804 const auto& first_file_path = row_group_intervals.front().file_path;
1807 auto first_parquet_column_descriptor =
1811 first_parquet_column_descriptor,
1819 CHECK(encoder.get());
1821 if (rejected_row_indices) {
1822 encoder->initializeErrorTracking(column_descriptor->
columnType);
1825 bool early_exit =
false;
1826 int64_t total_levels_read = 0;
1827 for (
const auto& row_group_interval : row_group_intervals) {
1828 const auto& file_path = row_group_interval.file_path;
1832 CHECK(row_group_interval.start_index >= 0 &&
1833 row_group_interval.end_index < num_row_groups);
1834 CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1836 parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1837 auto parquet_column_descriptor =
1840 parquet_column_descriptor,
1845 parquet_column_descriptor);
1846 int64_t values_read = 0;
1847 for (
int row_group_index = row_group_interval.start_index;
1848 row_group_index <= row_group_interval.end_index;
1849 ++row_group_index) {
1850 auto group_reader = parquet_reader->RowGroup(row_group_index);
1851 std::shared_ptr<parquet::ColumnReader> col_reader =
1852 group_reader->Column(parquet_column_index);
1855 while (col_reader->HasNext()) {
1856 int64_t levels_read =
1860 reinterpret_cast<uint8_t*
>(values.data()),
1866 parquet_column_index,
1869 parquet_column_descriptor);
1871 if (rejected_row_indices) {
1872 encoder->appendDataTrackErrors(def_levels.data(),
1878 encoder->appendData(def_levels.data(),
1885 if (max_levels_read.has_value()) {
1886 total_levels_read += levels_read;
1887 if (total_levels_read >= max_levels_read.value()) {
1893 if (
auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1894 array_encoder->finalizeRowGroup();
1896 }
catch (
const std::exception& error) {
1899 if (boost::regex_search(error.what(),
1900 boost::regex{
"Deserializing page header failed."})) {
1902 "Unable to read from foreign data source, possible cause is an unexpected "
1903 "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
1906 "if data source has been updated. Foreign table: " +
1911 std::string(error.what()) +
" Row group: " +
std::to_string(row_group_index) +
1912 ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1913 "', Parquet file: '" + file_path +
"'");
1915 if (max_levels_read.has_value() && early_exit) {
1919 if (max_levels_read.has_value() && early_exit) {
1924 if (rejected_row_indices) {
1925 *rejected_row_indices = encoder->getRejectedRowIndices();
1927 return chunk_metadata;
1931 const parquet::ColumnDescriptor* parquet_column) {
1936 return type.get_array_type();
1944 const parquet::ColumnDescriptor* parquet_column) {
1979 std::shared_ptr<arrow::fs::FileSystem> file_system,
1982 const std::string& foreign_table_name)
1983 : file_system_(file_system)
1984 , file_reader_cache_(file_map)
1985 , render_group_analyzer_map_{render_group_analyzer_map}
1986 , foreign_table_name_(foreign_table_name) {}
1989 const std::vector<RowGroupInterval>& row_group_intervals,
1990 const int parquet_column_index,
1991 std::list<Chunk_NS::Chunk>& chunks,
1994 CHECK(!chunks.empty());
1995 auto const& chunk = *chunks.begin();
1996 auto column_descriptor = chunk.getColumnDesc();
1997 auto buffer = chunk.getBuffer();
2002 parquet_column_index,
2006 rejected_row_indices);
2008 }
catch (
const std::exception& error) {
2030 const parquet::ColumnDescriptor* parquet_column_descriptor,
2033 const int row_group_index,
2034 const int parquet_column_index,
2035 const parquet::ParquetFileReader* parquet_reader)
2057 reinterpret_cast<uint8_t*
>(batch_data.
values.data()),
2073 batch_data.
values.data(),
2077 if (
auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(
encoder_)) {
2078 array_encoder->finalizeRowGroup();
2100 const std::map<int, Chunk_NS::Chunk>& chunks,
2102 const std::map<int, StringDictionary*>& column_dictionaries,
2103 const int num_threads) {
2106 const auto& file_path = row_group_interval.
file_path;
2110 auto file_reader = file_reader_owner.get();
2111 auto file_metadata = file_reader->parquet_reader()->metadata();
2119 auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2122 }
catch (std::runtime_error& e) {
2123 std::stringstream error_message;
2124 error_message << e.what() <<
" Parquet column: " << parquet_column->name()
2125 <<
", HeavyDB column: " << column_descriptor->columnName
2126 <<
", Parquet file: " << file_path <<
".";
2127 throw std::runtime_error(error_message.str());
2132 auto row_group_index = row_group_interval.
start_index;
2133 std::map<int, ParquetRowGroupReader> row_group_reader_map;
2135 parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2136 auto group_reader = parquet_reader->RowGroup(row_group_index);
2138 std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2143 column_dictionaries,
2144 group_reader->metadata()->num_rows(),
2147 std::vector<std::set<int>> partitions(num_threads);
2148 std::map<int, int> column_id_to_thread;
2149 for (
auto& [column_id, encoder] : encoder_map) {
2150 auto thread_id = column_id % num_threads;
2151 column_id_to_thread[column_id] =
thread_id;
2152 partitions[
thread_id].insert(column_id);
2155 for (
auto& [column_id, encoder] : encoder_map) {
2158 auto parquet_column_descriptor =
2159 file_metadata->schema()->Column(parquet_column_index);
2164 row_group_interval.
end_index < num_row_groups);
2165 CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2167 parquet_column_descriptor);
2169 std::shared_ptr<parquet::ColumnReader> col_reader =
2170 group_reader->Column(parquet_column_index);
2172 row_group_reader_map.insert(
2176 parquet_column_descriptor,
2179 column_id_to_thread, column_id)],
2181 parquet_column_index,
2185 std::vector<std::future<void>> futures;
2186 for (
int ithread = 0; ithread < num_threads; ++ithread) {
2187 auto column_ids_for_thread = partitions[ithread];
2188 futures.emplace_back(
2190 for (
const auto column_id : column_ids_for_thread) {
2192 .readAndValidateRowGroup();
2198 for (
auto& future : futures) {
2202 for (
auto& future : futures) {
2208 for (
auto& thread_invalid_indices : invalid_indices_per_thread) {
2209 invalid_indices.merge(thread_invalid_indices);
2212 for (
auto& [_, reader] : row_group_reader_map) {
2213 reader.eraseInvalidRowGroupData(
2219 auto column_id = column_descriptor->columnId;
2221 CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2222 invalid_indices.size());
2223 size_t updated_num_elems = db_encoder->getNumElems() +
2224 group_reader->metadata()->num_rows() -
2225 invalid_indices.size();
2226 db_encoder->setNumElems(updated_num_elems);
2227 if (column_descriptor->columnType.is_geometry()) {
2228 for (
int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2231 db_encoder->setNumElems(updated_num_elems);
2236 return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2237 invalid_indices.size()};
2248 const size_t max_num_rows,
2250 CHECK(!files.empty());
2252 auto first_file = *files.begin();
2255 for (
auto current_file_it = ++files.begin(); current_file_it != files.end();
2256 ++current_file_it) {
2261 auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2262 auto num_columns = first_file_metadata->num_columns();
2267 auto current_file_it = files.begin();
2268 while (data_preview.
sample_rows.size() < max_num_rows &&
2269 current_file_it != files.end()) {
2270 size_t total_num_rows = data_preview.
sample_rows.size();
2271 size_t max_num_rows_to_append = max_num_rows - data_preview.
sample_rows.size();
2274 std::vector<RowGroupInterval> row_group_intervals;
2275 for (; current_file_it != files.end(); ++current_file_it) {
2276 const auto& file_path = *current_file_it;
2278 auto file_metadata = file_reader->parquet_reader()->metadata();
2279 auto num_row_groups = file_metadata->num_row_groups();
2280 int end_row_group = 0;
2281 for (
int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2282 const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2283 total_num_rows += next_num_rows;
2286 row_group_intervals.push_back(
RowGroupInterval{file_path, 0, end_row_group});
2290 for (
int i = 0; i < num_columns; ++i) {
2291 auto col = first_file_metadata->schema()->Column(i);
2296 sql_type.is_array() ? col->path()->ToDotVector()[0] +
"_array" : col->name();
2304 std::make_unique<TypedParquetDetectBuffer>());
2306 std::make_unique<RejectedRowIndices>());
2308 auto& chunk = preview_context.
column_chunks.emplace_back(&cd);
2309 chunk.setPinnable(
false);
2310 chunk.setBuffer(detect_buffer.get());
2313 std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2314 [&](
const std::vector<int>& column_indices) {
2315 for (
const auto& column_index : column_indices) {
2317 auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2318 auto& rejected_row_indices =
2322 chunk.getColumnDesc(),
2325 rejected_row_indices.get(),
2327 max_num_rows_to_append);
2333 std::vector<int> columns(num_columns);
2334 std::iota(columns.begin(), columns.end(), 0);
2337 for (
auto& future : futures) {
2340 for (
auto& future : futures) {
2345 auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2346 for (
int i = 0; i < num_columns; ++i) {
2347 rejected_row_indices->insert(
2352 size_t num_rows = 0;
2354 for (
int i = 0; i < num_columns; ++i, ++buffers_it) {
2356 auto& strings = buffers_it->get()->getStrings();
2358 num_rows = strings.size();
2360 CHECK_EQ(num_rows, strings.size());
2364 size_t num_rejected_rows = rejected_row_indices->size();
2366 CHECK_GE(num_rows, num_rejected_rows);
2367 auto row_count = num_rows - num_rejected_rows;
2369 auto offset_row = data_preview.
sample_rows.size();
2370 data_preview.
sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2372 for (
size_t irow = 0, rows_appended = 0;
2373 irow < num_rows && offset_row + rows_appended < max_num_rows;
2375 if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2378 auto& row_data = data_preview.
sample_rows[offset_row + rows_appended];
2379 row_data.resize(num_columns);
2381 for (
int i = 0; i < num_columns; ++i, ++buffers_it) {
2383 auto& strings = buffers_it->get()->getStrings();
2384 row_data[i] = strings[irow];
2391 for (
int i = 0; i < num_columns; ++i) {
2393 if (type_info.is_string()) {
2394 auto tentative_geo_type =
2396 if (tentative_geo_type.has_value()) {
2397 data_preview.
column_types[i].set_type(tentative_geo_type.value());
2403 return data_preview;
2407 const std::vector<std::string>& file_paths,
2409 const bool do_metadata_stats_validation) {
2411 auto column_interval =
2414 CHECK(!file_paths.empty());
2418 const auto& first_path = *file_paths.begin();
2421 first_reader->parquet_reader()->metadata(), first_path, schema);
2426 do_metadata_stats_validation);
2429 encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2437 std::vector<std::string> cache_subset;
2438 for (
auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2440 cache_subset.emplace_back(*path_it);
2448 std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2450 for (
const auto& path_group : paths_per_thread) {
2453 [&](
const auto& paths,
const auto& file_reader_cache)
2454 -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2455 std::list<RowGroupMetadata> reduced_metadata;
2456 MaxRowGroupSizeStats max_row_group_stats{0, 0};
2457 for (
const auto& path : paths.get()) {
2458 auto reader = file_reader_cache.get().getOrInsert(path,
file_system_);
2461 reader->parquet_reader()->metadata(), path, schema);
2462 if (local_max_row_group_stats.max_row_group_size >
2463 max_row_group_stats.max_row_group_size) {
2464 max_row_group_stats = local_max_row_group_stats;
2468 reduced_metadata.splice(
2469 reduced_metadata.end(),
2472 return {reduced_metadata, max_row_group_stats};
2474 std::ref(path_group),
2479 for (
auto& future : futures) {
2480 auto [metadata, local_max_row_group_stats] = future.get();
2481 row_group_metadata.splice(row_group_metadata.end(), metadata);
2482 if (local_max_row_group_stats.max_row_group_size >
2483 max_row_group_stats.max_row_group_size) {
2484 max_row_group_stats = local_max_row_group_stats;
2493 return row_group_metadata;
DEVICE auto upper_bound(ARGS &&...args)
std::shared_ptr< parquet::ColumnReader > col_reader_
std::list< ColumnDescriptor > column_descriptors
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
AbstractBuffer * getIndexBuf() const
std::shared_ptr< ParquetEncoder > create_parquet_signed_or_unsigned_integral_encoder_with_types(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const bool is_signed)
Create a signed or unsigned integral parquet encoder using types.
HOST DEVICE int get_size() const
bool validate_array_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::optional< SQLTypes > detect_geo_type(const SampleRows &sample_rows, size_t column_index)
std::vector< Chunk_NS::Chunk > column_chunks
static constexpr int32_t kMaxNumericPrecision
std::vector< std::string > column_names
std::shared_ptr< ParquetEncoder > create_parquet_geospatial_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const RenderGroupAnalyzerMap *render_group_analyzer_map, const bool is_metadata_scan, const bool is_for_import)
std::vector< SQLTypeInfo > column_types
const parquet::ParquetFileReader * parquet_reader_
static bool isColumnMappingSupported(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
std::shared_ptr< ParquetEncoder > create_parquet_array_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect)
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
HOST DEVICE int get_scale() const
size_t get_num_threads(const ForeignTable &table)
std::unique_ptr< ColumnDescriptor > get_sub_type_column_descriptor(const ColumnDescriptor *column)
const parquet::ColumnDescriptor * parquet_column_descriptor_
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool is_nanosecond_precision(const ColumnDescriptor *omnisci_column)
const int row_group_index_
std::shared_ptr< ParquetEncoder > create_parquet_none_type_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor *parquet_column)
InvalidRowGroupIndices & invalid_indices_
void validate_equal_schema(const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor *parquet_column)
Detect a valid list parquet column.
ParquetEncoder * encoder_
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
HOST DEVICE SQLTypes get_type() const
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor *parquet_column)
const RenderGroupAnalyzerMap * render_group_analyzer_map_
std::string foreign_table_name_
int getParquetColumnIndex(const int column_id) const
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_import(std::list< Chunk_NS::Chunk > &chunks, const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, StringDictionary *string_dictionary, const RenderGroupAnalyzerMap *render_group_analyzer_map)
UniqueReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
MaxRowGroupSizeStats validate_parquet_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
std::shared_ptr< ParquetEncoder > create_parquet_date_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
int64_t max_row_group_size
void throw_missing_metadata_error(const int row_group_index, const int column_index, const std::string &file_path)
const ForeignTable * getForeignTable() const
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
ParquetImportEncoder * import_encoder
SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
future< Result > async(Fn &&fn, Args &&...args)
bool is_fixlen_array() const
std::set< int64_t > InvalidRowGroupIndices
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder_with_omnisci_type(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const int bit_width, const bool is_signed)
Create a integral parquet encoder using types.
void validate_allowed_mapping(const parquet::ColumnDescriptor *parquet_column, const ColumnDescriptor *omnisci_column)
void validate_number_of_columns(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor *parquet_column)
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
LazyParquetChunkLoader(std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const RenderGroupAnalyzerMap *render_group_analyzer_map, const std::string &foreign_table_name)
void readAndValidateRowGroup()
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
SQLTypeInfo suggest_floating_point_mapping(const parquet::ColumnDescriptor *parquet_column)
An AbstractBuffer is a unit of data management for a data manager.
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
specifies the content in-memory of a row in the column metadata table
std::pair< size_t, size_t > loadRowGroups(const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
Load row groups of data into given chunks.
parquet::arrow::FileReader * ReaderPtr
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_string_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const Chunk_NS::Chunk &chunk, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, bool is_for_import, const bool is_for_detect)
int get_precision() const
int numLogicalColumns() const
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder_with_omnisci_type(const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, AbstractBuffer *buffer)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_import(const std::map< int, Chunk_NS::Chunk > chunks, const ForeignTableSchema &schema, const ReaderPtr &reader, const std::map< int, StringDictionary * > column_dictionaries, const int64_t num_rows, const RenderGroupAnalyzerMap *render_group_analyzer_map)
void set_comp_param(int p)
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan for the paths specified.
int64_t max_row_group_index
SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor *parquet_column)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
std::vector< int8_t > values
void eraseInvalidRowGroupData(const InvalidRowGroupIndices &invalid_indices)
DEVICE auto lower_bound(ARGS &&...args)
HOST DEVICE EncodingType get_compression() const
AbstractBuffer * getBuffer() const
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
virtual void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices)=0
DEVICE void iota(ARGS &&...args)
static const int batch_reader_num_elements
std::list< RowGroupMetadata > metadata_scan_rowgroup_interval(const std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const RowGroupInterval &row_group_interval, const ReaderPtr &reader, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_metadata_scan(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const RenderGroupAnalyzerMap *render_group_analyzer_map)
HOST DEVICE int get_dimension() const
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::string get_type_name() const
void initializeIfEmpty(const std::string &path)
virtual void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices)=0
std::set< int64_t > RejectedRowIndices
DataPreview previewFiles(const std::vector< std::string > &files, const size_t max_num_rows, const ForeignTable &table)
Preview rows of data and column types in a set of files.
std::vector< std::unique_ptr< TypedParquetDetectBuffer > > detect_buffers
std::vector< std::unique_ptr< RejectedRowIndices > > rejected_row_indices_per_column
std::shared_ptr< ParquetEncoder > create_parquet_floating_point_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
HOST DEVICE int get_comp_param() const
bool is_millisecond_precision(const ColumnDescriptor *omnisci_column)
ParquetRowGroupReader(std::shared_ptr< parquet::ColumnReader > col_reader, const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, ParquetEncoder *encoder, InvalidRowGroupIndices &invalid_indices, const int row_group_index, const int parquet_column_index, const parquet::ParquetFileReader *parquet_reader)
FileReaderMap * file_reader_cache_
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
#define DEBUG_TIMER(name)
bool is_valid_parquet_string(const parquet::ColumnDescriptor *parquet_column)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_metadata_scan(const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const ReaderPtr &reader, const RenderGroupAnalyzerMap *render_group_analyzer_map, const bool do_metadata_stats_validation)
std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
const int parquet_column_index_
SQLTypeInfo suggest_boolean_type_mapping(const parquet::ColumnDescriptor *parquet_column)
bool is_microsecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const ColumnDescriptor * getLogicalColumn(const int column_id) const
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
std::vector< int16_t > def_levels
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const RenderGroupAnalyzerMap *render_group_analyzer_map, const bool is_metadata_scan=false, const bool is_for_import=false, const bool is_for_detect=false)
Create a Parquet specific encoder for a Parquet to OmniSci mapping.
void throw_row_group_larger_than_fragment_size_error(const MaxRowGroupSizeStats max_row_group_stats, const int fragment_size)
std::vector< int16_t > rep_levels
SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const ColumnDescriptor * column_descriptor_
void validate_definition_levels(const parquet::ParquetFileReader *reader, const int row_group_index, const int column_index, const int16_t *def_levels, const int64_t num_levels, const parquet::ColumnDescriptor *parquet_column_descriptor)
const ColumnDescriptor * getColumnDescriptor(const int column_id) const
void set_precision(int d)
static SQLTypeInfo suggestColumnMapping(const parquet::ColumnDescriptor *parquet_column)
bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value)
HOST DEVICE void set_type(SQLTypes t)