LazyParquetChunkLoader.cpp

/*
 * Copyright 2022 HEAVY.AI, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "LazyParquetChunkLoader.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/column_scanner.h>
#include <parquet/exception.h>
#include <parquet/platform.h>
#include <parquet/statistics.h>
#include <parquet/types.h>

// Several project includes did not survive extraction; the encoder headers
// below are inferred from the encoder classes instantiated in this file
// (one header per encoder class, per this project's convention).
#include "ForeignStorageException.h"
#include "FsiChunkUtils.h"
#include "ParquetArrayDetectEncoder.h"
#include "ParquetArrayImportEncoder.h"
#include "ParquetDateInSecondsEncoder.h"
#include "ParquetDecimalEncoder.h"
#include "ParquetDetectStringEncoder.h"
#include "ParquetFixedLengthArrayEncoder.h"
#include "ParquetFixedLengthEncoder.h"
#include "ParquetGeospatialEncoder.h"
#include "ParquetGeospatialImportEncoder.h"
#include "ParquetStringEncoder.h"
#include "ParquetStringImportEncoder.h"
#include "ParquetStringNoneEncoder.h"
#include "ParquetTimeEncoder.h"
#include "ParquetTimestampEncoder.h"
#include "ParquetVariableLengthArrayEncoder.h"
#include "Shared/misc.h"

namespace foreign_storage {

namespace {

bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value) {
  return value >= lower_bound && value <= upper_bound;
}

bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
  return (parquet_column->logical_type()->is_none() &&
          parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
         parquet_column->logical_type()->is_string();
}

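/**
 * A valid Parquet list column is the three-level nested structure the checks
 * below walk from leaf to root. An illustrative sketch of the accepted schema
 * shape (names per the Parquet spec; pyarrow emits "item" instead of
 * "element"):
 *
 *   optional group my_list_column (LIST) {  // LIST annotation, optional or required
 *     repeated group list {                 // repeated group named "list"
 *       optional binary element (STRING);   // leaf named "element" (or "item")
 *     }
 *   }
 */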
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
  const parquet::schema::Node* node = parquet_column->schema_node().get();
  if ((node->name() != "element" && node->name() != "item") ||
      !(node->is_required() ||
        node->is_optional())) {  // ensure first innermost node is named "element",
                                 // which is required by the parquet specification;
                                 // however, testing shows that pyarrow generates this
                                 // column with the name of "item";
                                 // this field must be either required or optional
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  if (node->name() != "list" || !node->is_repeated() ||
      !node->is_group()) {  // ensure second innermost node is named "list" and is
                            // a repeated group; this is required by the parquet
                            // specification
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  if (!node->logical_type()->is_list() ||
      !(node->is_optional() ||
        node->is_required())) {  // ensure third outermost node has logical type LIST,
                                 // which is either optional or required; this is
                                 // required by the parquet specification
    return false;
  }
  node = node->parent();  // this must now be the root node of the schema, which is
                          // required by FSI (lists can not be embedded into a deeper
                          // nested structure)
  if (!node) {  // required nested structure
    return false;
  }
  node = node->parent();
  if (node) {  // implies the previous node was not the root node
    return false;
  }
  return true;
}

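/**
 * Creates a decimal encoder parameterized on the HeavyDB in-memory type `V`
 * and the null-sentinel storage type `NullType`, dispatching on the Parquet
 * physical type that backs the DECIMAL logical type (INT32, INT64,
 * FIXED_LEN_BYTE_ARRAY, or BYTE_ARRAY).
 */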
template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
    const ColumnDescriptor* column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    AbstractBuffer* buffer) {
  switch (parquet_column_descriptor->physical_type()) {
    case parquet::Type::INT32:
      return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::INT64:
      return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<
          ParquetDecimalEncoder<V, parquet::FixedLenByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::BYTE_ARRAY:
      return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (parquet_column->logical_type()->is_decimal()) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
          omnisci_column, parquet_column, buffer);
    }
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    if (is_metadata_scan_or_for_import) {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    } else {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    }
  }
  return {};
}

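/**
 * Create a signed or unsigned integral Parquet encoder. Template parameters
 * as used below: `V` is the HeavyDB storage type, `T` is the type Parquet
 * hands back for the physical column, `U` is the unsigned intermediate type
 * used to reinterpret unsigned values without sign extension, and `NullType`
 * is the type whose sentinel marks NULL. `is_signed` selects between the
 * plain and the unsigned fixed-length encoder. (The factory name used here
 * is inferred from this file's naming conventions; the original identifier
 * did not survive extraction.)
 */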
template <typename V, typename T, typename U, typename NullType>
std::shared_ptr<ParquetEncoder>
create_parquet_signed_or_unsigned_integral_encoder_with_types(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const bool is_signed) {
  CHECK(sizeof(NullType) == omnisci_data_type_byte_size);
  if (is_signed) {
    return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  } else {
    return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  }
}

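/**
 * Dispatches on the annotated Parquet bit width (8/16/32/64) to instantiate
 * the integral encoder with the matching Parquet read type and unsigned
 * intermediate type; `V`/`NullType` carry the HeavyDB-side types through.
 */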
template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder_with_omnisci_type(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const int bit_width,
    const bool is_signed) {
  switch (bit_width) {
    case 8:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint8_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 16:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint16_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 32:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint32_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 64:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int64_t,
                                                                           uint64_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto physical_type = parquet_column->physical_type();

  int bit_width = -1;
  bool is_signed = false;
  // handle the integral case with no Parquet annotation
  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
    if (physical_type == parquet::Type::INT32) {
      bit_width = 32;
    } else if (physical_type == parquet::Type::INT64) {
      bit_width = 64;
    } else {
      UNREACHABLE();
    }
    is_signed = true;
  }
  // handle the integral case with Parquet annotation
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    bit_width = int_logical_column->bit_width();
    is_signed = int_logical_column->is_signed();
  }

  if (bit_width == -1) {  // no valid logical type (with or without annotation) found
    return {};
  }

  const size_t omnisci_data_type_byte_size = column_type.get_size();
  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);

  switch (omnisci_data_type_byte_size) {
    case 8:
      CHECK(column_type.get_compression() == kENCODING_NONE);
      return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 4:
      if (is_metadata_scan_or_for_import && column_type.get_type() == kBIGINT) {
        return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
            buffer,
            omnisci_data_type_byte_size,
            parquet_data_type_byte_size,
            bit_width,
            is_signed);
      }
      return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 2:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 1:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kTINYINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (!column_type.is_fp()) {
    return {};
  }
  CHECK_EQ(column_type.get_compression(), kENCODING_NONE);
  switch (column_type.get_type()) {
    case kFLOAT:
      switch (parquet_column->physical_type()) {
        case parquet::Type::FLOAT:
          return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
              buffer, omnisci_column, parquet_column);
        case parquet::Type::DOUBLE:
          return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    case kDOUBLE:
      CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
      return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
          buffer, omnisci_column, parquet_column);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_none() &&
      !omnisci_column->columnType.is_string()) {  // boolean
    if (column_type.get_compression() == kENCODING_NONE) {
      switch (column_type.get_type()) {
        case kBOOLEAN:
          return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

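/**
 * Instantiates a timestamp encoder whose conversion denominator matches the
 * Parquet TIMESTAMP time unit, reducing the stored value to seconds for
 * second-precision HeavyDB columns: MILLIS divides by 1000L, MICROS by
 * 1000L * 1000L, and NANOS by 1000L * 1000L * 1000L.
 */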
template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

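/**
 * As above, but coerces a Parquet TIMESTAMP column into a HeavyDB DATE.
 * During a metadata scan or import the value is reduced to seconds
 * (date-in-seconds encoder); during a chunk load it is reduced to days
 * (date-in-days encoder). Note: the ParquetDateInSecondsFromTimestampEncoder
 * and ParquetDateInDaysFromTimestampEncoder template names below are inferred
 * from the surrounding call sites and may differ slightly from upstream.
 */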
template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V,
                                                       T,
                                                       1000L * 1000L * 1000L,
                                                       NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto precision = column_type.get_precision();
  if (parquet_column->logical_type()->is_timestamp()) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (precision == 0) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      CHECK(column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      }
    }
  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
    if (parquet_column->physical_type() == parquet::Type::INT32) {
      CHECK(column_type.get_compression() == kENCODING_FIXED &&
            column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (parquet_column->physical_type() == parquet::Type::INT64) {
      if (column_type.get_compression() == kENCODING_NONE) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      } else if (column_type.get_compression() == kENCODING_FIXED) {
        CHECK(column_type.get_comp_param() == 32);
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (time_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimeEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
        return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      if (is_metadata_scan_or_for_import) {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      } else {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
    CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
    if (is_metadata_scan_or_for_import) {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, true);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, true);
      } else {
        UNREACHABLE();
      }
    } else {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int32_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, false);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int16_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, false);
      } else {
        UNREACHABLE();
      }
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
    if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
      if (is_metadata_scan_or_for_import) {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int32_t>>(
              buffer);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int16_t>>(
              buffer);
        } else {
          UNREACHABLE();
        }
      } else {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          UNREACHABLE();
        }
      }
    } else if (column_type.get_compression() == kENCODING_NONE) {  // for array types
      return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int64_t>>(
          buffer, omnisci_column, parquet_column);
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

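/**
 * String columns arrive as BYTE_ARRAY (or STRING-annotated) Parquet columns.
 * NONE-encoded text uses the import or none encoder; DICT-encoded text picks
 * a string-index width from the column size, e.g. a TEXT ENCODING DICT(8)
 * column has size 1 and gets the uint8_t string-index encoder.
 */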
std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const Chunk_NS::Chunk& chunk,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    bool is_for_import,
    const bool is_for_detect) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) ||
      !omnisci_column->columnType.is_string()) {
    return {};
  }
  if (column_type.get_compression() == kENCODING_NONE) {
    if (is_for_import) {
      return std::make_shared<ParquetStringImportEncoder>(chunk.getBuffer());
    } else {
      return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
                                                        chunk.getIndexBuf());
    }
  } else if (column_type.get_compression() == kENCODING_DICT) {
    if (!is_for_detect) {  // non-detect use case
      chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
      std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
      logical_chunk_metadata->sqlType = omnisci_column->columnType;
      switch (column_type.get_size()) {
        case 1:
          return std::make_shared<ParquetStringEncoder<uint8_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 2:
          return std::make_shared<ParquetStringEncoder<uint16_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 4:
          return std::make_shared<ParquetStringEncoder<int32_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        default:
          UNREACHABLE();
      }
    } else {  // detect use case
      return std::make_shared<ParquetDetectStringEncoder>(chunk.getBuffer());
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool geo_validate_geometry) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
    return {};
  }
  if (is_for_import) {
    return std::make_shared<ParquetGeospatialImportEncoder>(chunks,
                                                            geo_validate_geometry);
  }
  if (is_metadata_scan) {
    return std::make_shared<ParquetGeospatialEncoder>(geo_validate_geometry);
  }
  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
    chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
    auto& chunk_metadata_ptr = chunk_metadata.back();
    chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
  }
  return std::make_shared<ParquetGeospatialEncoder>(
      parquet_column, chunks, chunk_metadata, geo_validate_geometry);
}

// forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
// `create_parquet_array_encoder` each make use of each other, so
// one of the two functions must have a forward declaration
std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry);

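/**
 * Create a Parquet encoder for a given HeavyDB/Parquet column pair.
 *
 * Tries each specialized factory in turn (geospatial, array, decimal,
 * integral, floating point, timestamp, none-type, time, date-from-timestamp,
 * date, string) and returns the first encoder that accepts the mapping; for a
 * validated column mapping exactly one factory succeeds, hence the trailing
 * UNREACHABLE(). `is_metadata_scan` and `is_for_import` are mutually
 * exclusive modes; both select the widened in-memory types seen above so that
 * statistics or import buffers can hold any chunk value.
 */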
std::shared_ptr<ParquetEncoder> create_parquet_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry) {
  CHECK(!(is_metadata_scan && is_for_import));
  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
  if (auto encoder = create_parquet_geospatial_encoder(omnisci_column,
                                                       parquet_column,
                                                       chunks,
                                                       chunk_metadata,
                                                       is_metadata_scan,
                                                       is_for_import,
                                                       geo_validate_geometry)) {
    return encoder;
  }
  if (auto encoder = create_parquet_array_encoder(omnisci_column,
                                                  parquet_column,
                                                  chunks,
                                                  string_dictionary,
                                                  chunk_metadata,
                                                  is_metadata_scan,
                                                  is_for_import,
                                                  is_for_detect,
                                                  geo_validate_geometry)) {
    return encoder;
  }
  if (auto encoder = create_parquet_decimal_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_integral_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_time_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_from_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_string_encoder(
          omnisci_column,
          parquet_column,
          chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
          string_dictionary,
          chunk_metadata,
          is_for_import,
          is_for_detect)) {
    return encoder;
  }
  UNREACHABLE();
  return {};
}

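/**
 * Convenience factory for the import use case: no chunk metadata is
 * collected, and the encoder writes through the provided import chunks.
 */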
std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_import(
    std::list<Chunk_NS::Chunk>& chunks,
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    StringDictionary* string_dictionary,
    const bool geo_validate_geometry) {
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                string_dictionary,
                                chunk_metadata,
                                /*is_metadata_scan=*/false,
                                /*is_for_import=*/true,
                                /*is_for_detect=*/false,
                                geo_validate_geometry);
}

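/**
 * Convenience factory for the metadata scan use case: operates without
 * chunks or a string dictionary, since only statistics are consumed.
 */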
std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_metadata_scan(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const bool geo_validate_geometry) {
  std::list<Chunk_NS::Chunk> chunks;
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                /*string_dictionary=*/nullptr,
                                chunk_metadata,
                                /*is_metadata_scan=*/true,
                                /*is_for_import=*/false,
                                /*is_for_detect=*/false,
                                geo_validate_geometry);
}

std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect,
    const bool geo_validate_geometry) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
    return {};
  }
  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
      get_sub_type_column_descriptor(omnisci_column);
  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
                                        parquet_column,
                                        chunks,
                                        string_dictionary,
                                        chunk_metadata,
                                        is_metadata_scan,
                                        is_for_import,
                                        is_for_detect,
                                        geo_validate_geometry);
  CHECK(encoder.get());
  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
  CHECK(scalar_encoder);
  if (!is_for_import) {
    if (!is_for_detect) {
      if (omnisci_column->columnType.is_fixlen_array()) {
        encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            scalar_encoder,
            omnisci_column);
      } else {
        encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
            scalar_encoder,
            omnisci_column);
      }
    } else {  // is_for_detect
      encoder = std::make_shared<ParquetArrayDetectEncoder>(
          chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
    }
  } else {  // is_for_import
    encoder = std::make_shared<ParquetArrayImportEncoder>(
        chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
  }
  return encoder;
}

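/**
 * For a list column whose row group lacks min/max statistics, scan the
 * definition levels: a definition level of 3 marks a non-null, non-empty
 * element, which makes the missing statistics an error rather than the benign
 * all-null/all-empty case. (The function name below is inferred from this
 * file's naming conventions; the original identifier did not survive
 * extraction.)
 */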
void validate_list_column_metadata_statistics(
    const parquet::ParquetFileReader* reader,
    const int row_group_index,
    const int column_index,
    const int16_t* def_levels,
    const int64_t num_levels,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (!is_valid_parquet_list) {
    return;
  }
  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
      reader->metadata()->RowGroup(row_group_index);
  auto column_metadata = group_metadata->ColumnChunk(column_index);
  // In case of an empty row group, do not validate.
  if (group_metadata->num_rows() == 0) {
    return;
  }
  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
  if (!stats->HasMinMax()) {
    auto find_it = std::find_if(def_levels,
                                def_levels + num_levels,
                                [](const int16_t def_level) { return def_level == 3; });
    if (find_it != def_levels + num_levels) {
      throw std::runtime_error(
          "No minimum and maximum statistic set in list column but non-null & non-empty "
          "array/value detected.");
    }
  }
}

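/**
 * A required scalar column may report a max definition level of 0, in which
 * case no definition levels are written and every value is defined, so the
 * levels are backfilled with 1. An optional column reporting a max definition
 * level of 0 is malformed and rejected. (Function name inferred from its
 * behavior; the original identifier did not survive extraction.)
 */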
void set_definition_levels_for_zero_max_definition_level_case(
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    std::vector<int16_t>& def_levels) {
  if (!is_valid_parquet_list_column(parquet_column_descriptor) &&
      parquet_column_descriptor->max_definition_level() == 0) {
    if (!parquet_column_descriptor->schema_node()->is_required()) {
      throw std::runtime_error(
          "Unsupported parquet column detected. Column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "' detected to have max definition level of 0 but is optional.");
    }
    def_levels.assign(def_levels.size(), 1);
  }
}

void validate_max_repetition_and_definition_level(
    const ColumnDescriptor* omnisci_column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
    throw std::runtime_error(
        "Unsupported mapping detected. Column '" +
        parquet_column_descriptor->path()->ToDotString() +
        "' detected to be a parquet list but HeavyDB mapped column '" +
        omnisci_column_descriptor->columnName + "' is not an array.");
  }
  if (is_valid_parquet_list) {
    if (parquet_column_descriptor->max_repetition_level() != 1 ||
        parquet_column_descriptor->max_definition_level() != 3) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "'. Expected a max repetition level of 1 and max definition level of 3 for "
          "list column but column has a max "
          "repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  } else {
    if (parquet_column_descriptor->max_repetition_level() != 0 ||
        !(parquet_column_descriptor->max_definition_level() == 1 ||
          parquet_column_descriptor->max_definition_level() == 0)) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->path()->ToDotString() +
          "'. Expected a max repetition level of 0 and max definition level of 1 or 0 "
          "for flat column but column has a max "
          "repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  }
}

void resize_values_buffer(const ColumnDescriptor* omnisci_column,
                          const parquet::ColumnDescriptor* parquet_column,
                          std::vector<int8_t>& values) {
  auto max_type_byte_size =
      std::max(omnisci_column->columnType.get_size(),
               parquet::GetTypeByteSize(parquet_column->physical_type()));
  size_t values_size =
      LazyParquetChunkLoader::batch_reader_num_elements * max_type_byte_size;
  values.resize(values_size);
}

bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
                              const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    return omnisci_column->columnType.get_precision() ==
               decimal_logical_column->precision() &&
           omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
           omnisci_column->columnType.is_decimal() &&
           (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
            omnisci_column->columnType.get_compression() == kENCODING_FIXED);
  }
  return false;
}

SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    auto parquet_precision = decimal_logical_column->precision();
    auto parquet_scale = decimal_logical_column->scale();
    if (parquet_precision > sql_constants::kMaxNumericPrecision) {
      throw ForeignStorageException(
          "Parquet column \"" + parquet_column->ToString() +
          "\" has decimal precision of " + std::to_string(parquet_precision) +
          " which is too high to import, maximum precision supported is " +
          std::to_string(sql_constants::kMaxNumericPrecision) + ".");
    }
    SQLTypeInfo type;
    type.set_type(kDECIMAL);
    type.set_compression(kENCODING_NONE);
    type.set_precision(parquet_precision);
    type.set_scale(parquet_scale);
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE()
      << " a Parquet column's decimal logical type failed to be read appropriately";
  return {};
}

bool validate_floating_point_mapping(const ColumnDescriptor* omnisci_column,
                                     const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_fp()) {
    return false;
  }
  // check if mapping is a valid coerced or non-coerced floating point mapping
  // with no annotation (floating point columns have no annotation in the
  // Parquet specification)
  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
    return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
           (parquet_column->physical_type() == parquet::Type::FLOAT &&
            omnisci_column->columnType.get_type() == kFLOAT);
  }
  return false;
}

SQLTypeInfo suggest_floating_point_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  if (parquet_column->physical_type() == parquet::Type::FLOAT) {
    type.set_type(kFLOAT);
  } else if (parquet_column->physical_type() == parquet::Type::DOUBLE) {
    type.set_type(kDOUBLE);
  } else {
    UNREACHABLE();
  }
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
                               const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_integer()) {
    return false;
  }
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    const int bits_per_byte = 8;
    // unsigned types are permitted to map to a wider integral type in order to avoid
    // precision loss
    const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
    return omnisci_column->columnType.get_size() * bits_per_byte <=
           int_logical_column->bit_width() * bit_widening_factor;
  }
  // check if mapping is a valid coerced or non-coerced integral mapping with no
  // annotation
  if ((omnisci_column->columnType.get_compression() == kENCODING_NONE ||
       omnisci_column->columnType.get_compression() == kENCODING_FIXED)) {
    return (parquet_column->physical_type() == parquet::Type::INT64) ||
           (parquet_column->physical_type() == parquet::Type::INT32 &&
            omnisci_column->columnType.get_size() <= 4);
  }
  return false;
}

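/**
 * Suggested HeavyDB types by annotated bit width (unsigned columns are
 * widened one step to preserve range):
 *
 *   bit width | signed   | unsigned
 *   ----------+----------+-----------------------
 *   1..8      | TINYINT  | SMALLINT
 *   9..16     | SMALLINT | INT
 *   17..32    | INT      | BIGINT
 *   33..64    | BIGINT   | (unsupported, throws)
 *
 * Unannotated INT32/INT64 columns map to INT/BIGINT respectively.
 */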
SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_compression(kENCODING_NONE);
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    auto bit_width = int_logical_column->bit_width();
    if (!int_logical_column->is_signed()) {
      if (within_range(33, 64, bit_width)) {
        throw ForeignStorageException(
            "Unsigned integer column \"" + parquet_column->path()->ToDotString() +
            "\" in Parquet file with 64 bit-width has no supported type for ingestion "
            "that will not result in data loss");
      } else if (within_range(17, 32, bit_width)) {
        type.set_type(kBIGINT);
      } else if (within_range(9, 16, bit_width)) {
        type.set_type(kINT);
      } else if (within_range(0, 8, bit_width)) {
        type.set_type(kSMALLINT);
      }
    } else {
      if (within_range(33, 64, bit_width)) {
        type.set_type(kBIGINT);
      } else if (within_range(17, 32, bit_width)) {
        type.set_type(kINT);
      } else if (within_range(9, 16, bit_width)) {
        type.set_type(kSMALLINT);
      } else if (within_range(0, 8, bit_width)) {
        type.set_type(kTINYINT);
      }
    }
    type.set_fixed_size();
    return type;
  }

  CHECK(parquet_column->logical_type()->is_none());
  if (parquet_column->physical_type() == parquet::Type::INT32) {
    type.set_type(kINT);
  } else {
    CHECK(parquet_column->physical_type() == parquet::Type::INT64);
    type.set_type(kBIGINT);
  }
  type.set_fixed_size();
  return type;
}

bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 9;
}

bool is_nanosecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
}

bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 6;
}

bool is_microsecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
}

bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 3;
}

bool is_millisecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
}

bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  bool is_none_encoded_mapping =
      omnisci_column->columnType.get_compression() == kENCODING_NONE &&
      (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
       omnisci_column->columnType.get_type() == kBOOLEAN);
  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
}

SQLTypeInfo suggest_boolean_type_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_compression(kENCODING_NONE);
  type.set_type(kBOOLEAN);
  type.set_fixed_size();
  return type;
}

bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
        ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  // check the annotated case
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return omnisci_column->columnType.get_dimension() == 0 ||
             ((is_nanosecond_precision(omnisci_column) &&
               is_nanosecond_precision(timestamp_logical_column)) ||
              (is_microsecond_precision(omnisci_column) &&
               is_microsecond_precision(timestamp_logical_column)) ||
              (is_millisecond_precision(omnisci_column) &&
               is_millisecond_precision(timestamp_logical_column)));
    }
    if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
      return omnisci_column->columnType.get_dimension() == 0;
    }
  }
  // check the unannotated case
  if (parquet_column->logical_type()->is_none() &&
      ((parquet_column->physical_type() == parquet::Type::INT32 &&
        omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
        omnisci_column->columnType.get_comp_param() == 32) ||
       parquet_column->physical_type() == parquet::Type::INT64)) {
    return true;
  }
  return false;
}

SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    SQLTypeInfo type;
    type.set_type(kTIMESTAMP);
    type.set_compression(kENCODING_NONE);
    if (is_nanosecond_precision(timestamp_logical_column)) {
      type.set_precision(9);
    } else if (is_microsecond_precision(timestamp_logical_column)) {
      type.set_precision(6);
    } else if (is_millisecond_precision(timestamp_logical_column)) {
      type.set_precision(3);
    }
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE();
  return {};
}

bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIME &&
        (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  return parquet_column->logical_type()->is_time();
}

SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_time());
  SQLTypeInfo type;
  type.set_type(kTIME);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kDATE &&
        ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
          (omnisci_column->columnType.get_comp_param() ==
               0  // DATE ENCODING DAYS (32) specifies comp_param of 0
           || omnisci_column->columnType.get_comp_param() == 16)) ||
         omnisci_column->columnType.get_compression() ==
             kENCODING_NONE  // for array types
         ))) {
    return false;
  }
  return parquet_column->logical_type()->is_date() ||
         parquet_column->logical_type()
             ->is_timestamp();  // to support TIMESTAMP -> DATE coercion
}

SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_date());
  SQLTypeInfo type;
  type.set_type(kDATE);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
                             const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_string() &&
         (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_DICT);
}

SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(is_valid_parquet_string(parquet_column));
  SQLTypeInfo type;
  type.set_type(kTEXT);
  type.set_compression(kENCODING_DICT);
  type.set_comp_param(32);
  type.set_fixed_size();
  return type;
}

bool validate_geospatial_mapping(const ColumnDescriptor* omnisci_column,
                                 const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_geometry();
}

void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
                           const parquet::arrow::FileReader* new_file_reader,
                           const std::string& reference_file_path,
                           const std::string& new_file_path) {
  const auto reference_num_columns =
      reference_file_reader->parquet_reader()->metadata()->num_columns();
  const auto new_num_columns =
      new_file_reader->parquet_reader()->metadata()->num_columns();
  if (reference_num_columns != new_num_columns) {
    throw std::runtime_error{"Parquet file \"" + new_file_path +
                             "\" has a different schema. Please ensure that all Parquet "
                             "files use the same schema. Reference Parquet file: \"" +
                             reference_file_path + "\" has " +
                             std::to_string(reference_num_columns) +
                             " columns. New Parquet file \"" + new_file_path +
                             "\" has " + std::to_string(new_num_columns) + " columns."};
  }

  for (int i = 0; i < reference_num_columns; i++) {
    validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
                                     get_column_descriptor(new_file_reader, i),
                                     reference_file_path,
                                     new_file_path);
  }
}

void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
                              const ColumnDescriptor* omnisci_column) {
  validate_max_repetition_and_definition_level(omnisci_column, parquet_column);
  bool allowed_type = false;
  if (omnisci_column->columnType.is_array()) {
    if (is_valid_parquet_list_column(parquet_column)) {
      auto omnisci_column_sub_type_column =
          get_sub_type_column_descriptor(omnisci_column);
      allowed_type = LazyParquetChunkLoader::isColumnMappingSupported(
          omnisci_column_sub_type_column.get(), parquet_column);
    }
  } else {
    allowed_type =
        LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
  }
  if (!allowed_type) {
    auto logical_type = parquet_column->logical_type();
    if (logical_type->is_timestamp()) {
      auto timestamp_type =
          dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
      CHECK(timestamp_type);

      if (!timestamp_type->is_adjusted_to_utc()) {
        LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
                     << omnisci_column->columnName
                     << "\". Only UTC timezone is currently supported.";
      }
    }
    std::string parquet_type;
    parquet::Type::type physical_type = parquet_column->physical_type();
    if (parquet_column->logical_type()->is_none()) {
      parquet_type = parquet::TypeToString(physical_type);
    } else {
      parquet_type = logical_type->ToString();
    }
    std::string omnisci_type = omnisci_column->columnType.get_type_name();
    throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
                             "\" to HeavyDB type \"" + omnisci_type +
                             "\" is not allowed. Please use an appropriate column type."};
  }
}

SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor* parquet_column) {
  // decimal case
  if (parquet_column->logical_type()->is_decimal()) {
    return suggest_decimal_mapping(parquet_column);
  }
  // float case
  if (parquet_column->logical_type()->is_none() &&
      (parquet_column->physical_type() == parquet::Type::FLOAT ||
       parquet_column->physical_type() == parquet::Type::DOUBLE)) {
    return suggest_floating_point_mapping(parquet_column);
  }
  // integral case
  if ((parquet_column->logical_type()->is_none() &&
       (parquet_column->physical_type() == parquet::Type::INT32 ||
        parquet_column->physical_type() == parquet::Type::INT64)) ||
      parquet_column->logical_type()->is_int()) {
    return suggest_integral_mapping(parquet_column);
  }
  // boolean case
  if (parquet_column->logical_type()->is_none() &&
      parquet_column->physical_type() == parquet::Type::BOOLEAN) {
    return suggest_boolean_type_mapping(parquet_column);
  }
  // timestamp case
  if (parquet_column->logical_type()->is_timestamp()) {
    return suggest_timestamp_mapping(parquet_column);
  }
  // time case
  if (parquet_column->logical_type()->is_time()) {
    return suggest_time_mapping(parquet_column);
  }
  // date case
  if (parquet_column->logical_type()->is_date()) {
    return suggest_date_mapping(parquet_column);
  }
  // string case
  if (is_valid_parquet_string(parquet_column)) {
    return suggest_string_mapping(parquet_column);
  }

  throw ForeignStorageException("Unsupported data type detected for column: " +
                                parquet_column->ToString());
}

void validate_number_of_columns(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema) {
  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
    throw_number_of_columns_mismatch_error(
        schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
  }
}

void throw_missing_metadata_error(const int row_group_index,
                                  const int column_index,
                                  const std::string& file_path) {
  throw std::runtime_error{
      "Statistics metadata is required for all row groups. Metadata is missing for "
      "row group index: " +
      std::to_string(row_group_index) +
      ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
}

struct MaxRowGroupSizeStats {
  int64_t max_row_group_size;
  int max_row_group_index;
  std::string file_path;
};

1635  const MaxRowGroupSizeStats max_row_group_stats,
1636  const int fragment_size) {
1637  auto metadata_scan_exception = MetadataScanInfeasibleFragmentSizeException{
1638  "Parquet file has a row group size that is larger than the fragment size. "
1639  "Please set the table fragment size to a number that is larger than the "
1640  "row group size. Row group index: " +
1641  std::to_string(max_row_group_stats.max_row_group_index) +
1642  ", row group size: " + std::to_string(max_row_group_stats.max_row_group_size) +
1643  ", fragment size: " + std::to_string(fragment_size) +
1644  ", file path: " + max_row_group_stats.file_path};
1645  metadata_scan_exception.min_feasible_fragment_size_ =
1646  max_row_group_stats.max_row_group_size;
1647  throw metadata_scan_exception;
1648 }
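
Because the exception above carries the offending row group's size, the remedy is mechanical: any fragment size at least as large as the biggest row group in the file set is feasible. A minimal sketch of that computation, using only the Apache Parquet metadata API (the helper name is ours):

// Compute the smallest feasible fragment size for a set of Parquet files:
// the largest row-group row count observed across all of them.
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>
#include <parquet/api/reader.h>

int64_t min_feasible_fragment_size(const std::vector<std::string>& paths) {
  int64_t max_row_group_size = 0;
  for (const auto& path : paths) {
    auto reader = parquet::ParquetFileReader::OpenFile(path);
    auto metadata = reader->metadata();
    for (int r = 0; r < metadata->num_row_groups(); ++r) {
      max_row_group_size =
          std::max(max_row_group_size, metadata->RowGroup(r)->num_rows());
    }
  }
  return max_row_group_size;  // any fragment size >= this value is feasible
}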
1649 
1650 MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(
1651  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1652  const std::string& file_path,
1653  const ForeignTableSchema& schema,
1654  const bool do_metadata_stats_validation) {
1655  auto column_it = schema.getLogicalColumns().begin();
1656  MaxRowGroupSizeStats max_row_group_stats{0, 0};
1657  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1658  const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
1659  try {
1660  validate_allowed_mapping(descr, *column_it);
1661  } catch (std::runtime_error& e) {
1662  std::stringstream error_message;
1663  error_message << e.what() << " Parquet column: " << descr->path()->ToDotString()
1664  << ", HeavyDB column: " << (*column_it)->columnName
1665  << ", Parquet file: " << file_path << ".";
1666  throw std::runtime_error(error_message.str());
1667  }
1668 
1669  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
1670  auto group_metadata = file_metadata->RowGroup(r);
1671  auto num_rows = group_metadata->num_rows();
1672  if (num_rows == 0) {
1673  continue;
1674  } else if (num_rows > max_row_group_stats.max_row_group_size) {
1675  max_row_group_stats.max_row_group_size = num_rows;
1676  max_row_group_stats.max_row_group_index = r;
1677  max_row_group_stats.file_path = file_path;
1678  }
1679 
1680  if (do_metadata_stats_validation) {
1681  auto column_chunk = group_metadata->ColumnChunk(i);
1682  bool contains_metadata = column_chunk->is_stats_set();
1683  if (contains_metadata) {
1684  auto stats = column_chunk->statistics();
1685  bool is_all_nulls = stats->null_count() == column_chunk->num_values();
1686  bool is_list = is_valid_parquet_list_column(file_metadata->schema()->Column(i));
1687  // Given a list, it is possible it has no min or max if it is composed
1688  // only of empty lists & nulls. This cannot be detected by comparing
1689  // the null count; therefore we afford list types the benefit of the
1690  // doubt in this situation.
1691  if (!(stats->HasMinMax() || is_all_nulls || is_list)) {
1692  contains_metadata = false;
1693  }
1694  }
1695 
1696  if (!contains_metadata) {
1697  throw_missing_metadata_error(r, i, file_path);
1698  }
1699  }
1700  }
1701  }
1702  return max_row_group_stats;
1703 }
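
The statistics requirement enforced above reduces to a small predicate: a column chunk's metadata is usable when statistics are set and either min/max are present, every value is null, or the column is a list (which may consist solely of empty lists and nulls). A condensed restatement for illustration, against the Apache Parquet metadata API (the helper name is ours):

#include <parquet/metadata.h>

bool has_usable_stats(const parquet::ColumnChunkMetaData& chunk_metadata,
                      const bool is_list_column) {
  if (!chunk_metadata.is_stats_set()) {
    return false;  // no statistics were written for this chunk
  }
  auto stats = chunk_metadata.statistics();
  const bool is_all_nulls = stats->null_count() == chunk_metadata.num_values();
  return stats->HasMinMax() || is_all_nulls || is_list_column;
}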
1704 
1705 MaxRowGroupSizeStats validate_parquet_metadata(
1706  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1707  const std::string& file_path,
1708  const ForeignTableSchema& schema,
1709  const bool do_metadata_stats_validation) {
1710  validate_number_of_columns(file_metadata, file_path, schema);
1711  return validate_column_mapping_and_row_group_metadata(
1712  file_metadata, file_path, schema, do_metadata_stats_validation);
1713 }
1714 
1715 std::list<RowGroupMetadata> metadata_scan_rowgroup_interval(
1716  const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1717  const RowGroupInterval& row_group_interval,
1718  const ReaderPtr& reader,
1719  const ForeignTableSchema& schema) {
1720  std::list<RowGroupMetadata> row_group_metadata;
1721  auto column_interval =
1722  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1723  schema.getLogicalAndPhysicalColumns().back()->columnId};
1724 
1725  auto file_metadata = reader->parquet_reader()->metadata();
1726  for (int row_group = row_group_interval.start_index;
1727  row_group <= row_group_interval.end_index;
1728  ++row_group) {
1729  auto& row_group_metadata_item = row_group_metadata.emplace_back();
1730  row_group_metadata_item.row_group_index = row_group;
1731  row_group_metadata_item.file_path = row_group_interval.file_path;
1732 
1733  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1734  file_metadata->RowGroup(row_group);
1735 
1736  for (int column_id = column_interval.start; column_id <= column_interval.end;
1737  column_id++) {
1738  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1739  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1740  auto encoder_map_iter =
1741  encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
1742  CHECK(encoder_map_iter != encoder_map.end());
1743  try {
1744  auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1745  group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1746  row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1747  } catch (const std::exception& e) {
1748  std::stringstream error_message;
1749  error_message << e.what() << " in row group " << row_group << " of Parquet file '"
1750  << row_group_interval.file_path << "'.";
1751  throw std::runtime_error(error_message.str());
1752  }
1753  }
1754  }
1755  return row_group_metadata;
1756 }
1757 
1758 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_import(
1759  const std::map<int, Chunk_NS::Chunk> chunks,
1760  const ForeignTableSchema& schema,
1761  const ReaderPtr& reader,
1762  const std::map<int, StringDictionary*> column_dictionaries,
1763  const int64_t num_rows,
1764  const bool geo_validate_geometry) {
1765  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1766  auto file_metadata = reader->parquet_reader()->metadata();
1767  for (auto& [column_id, chunk] : chunks) {
1768  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1769  if (column_descriptor->isGeoPhyCol) { // skip physical columns
1770  continue;
1771  }
1772  auto parquet_column_descriptor =
1773  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1774  auto find_it = column_dictionaries.find(column_id);
1775  StringDictionary* dictionary =
1776  (find_it == column_dictionaries.end() ? nullptr : find_it->second);
1777  std::list<Chunk_NS::Chunk> chunks_for_import;
1778  chunks_for_import.push_back(chunk);
1779  if (column_descriptor->columnType.is_geometry()) {
1780  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1781  chunks_for_import.push_back(chunks.at(column_id + i + 1));
1782  }
1783  }
1784  encoder_map[column_id] = create_parquet_encoder_for_import(chunks_for_import,
1785  column_descriptor,
1786  parquet_column_descriptor,
1787  dictionary,
1788  geo_validate_geometry);
1789 
1790  // reserve space in the buffer when the number of elements is known ahead
1791  // of time for types of known size (for example, dictionary-encoded strings)
1792  auto encoder = shared::get_from_map(encoder_map, column_id);
1793  if (auto inplace_encoder = dynamic_cast<ParquetInPlaceEncoder*>(encoder.get())) {
1794  inplace_encoder->reserve(num_rows);
1795  }
1796  }
1797  return encoder_map;
1798 }
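
The tail of the loop above uses a runtime capability check: only encoders that write fixed-width data in place can pre-size their buffers. A self-contained sketch of that pattern follows; the class names are illustrative, not the real encoder hierarchy.

#include <cstddef>
#include <memory>
#include <vector>

struct Encoder {
  virtual ~Encoder() = default;
};

struct InPlaceEncoder : Encoder {
  // 8-byte elements assumed purely for illustration
  void reserve(std::size_t num_elements) { buffer_.reserve(num_elements * 8); }
  std::vector<char> buffer_;
};

void maybe_reserve(const std::shared_ptr<Encoder>& encoder,
                   std::size_t num_rows) {
  // Only fixed-width, in-place encoders can size their buffers up front.
  if (auto* in_place = dynamic_cast<InPlaceEncoder*>(encoder.get())) {
    in_place->reserve(num_rows);
  }
}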
1799 
1800 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_metadata_scan(
1801  const Interval<ColumnType>& column_interval,
1802  const ForeignTableSchema& schema,
1803  const ReaderPtr& reader,
1804  const bool do_metadata_stats_validation,
1805  const bool geo_validate_geometry) {
1806  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1807  auto file_metadata = reader->parquet_reader()->metadata();
1808  for (int column_id = column_interval.start; column_id <= column_interval.end;
1809  column_id++) {
1810  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1811  auto parquet_column_descriptor =
1812  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1813  encoder_map[column_id] = create_parquet_encoder_for_metadata_scan(
1814  column_descriptor, parquet_column_descriptor, geo_validate_geometry);
1815  if (!do_metadata_stats_validation) {
1816  shared::get_from_map(encoder_map, column_id)->disableMetadataStatsValidation();
1817  }
1818  column_id += column_descriptor->columnType.get_physical_cols();
1819  }
1820  return encoder_map;
1821 }
1822 } // namespace
1823 
1824 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroups(
1825  const std::vector<RowGroupInterval>& row_group_intervals,
1826  const int parquet_column_index,
1827  const ColumnDescriptor* column_descriptor,
1828  std::list<Chunk_NS::Chunk>& chunks,
1829  StringDictionary* string_dictionary,
1830  RejectedRowIndices* rejected_row_indices,
1831  const bool is_for_detect,
1832  const std::optional<int64_t> max_levels_read) {
1833  auto timer = DEBUG_TIMER(__func__);
1834  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1835  // `def_levels` and `rep_levels` below are used to store the read definition
1836  // and repetition levels of the Dremel encoding implemented by the Parquet
1837  // format
1838  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1839  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1840  std::vector<int8_t> values;
1841 
1842  CHECK(!row_group_intervals.empty());
1843  const auto& first_file_path = row_group_intervals.front().file_path;
1844 
1845  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1846  auto first_parquet_column_descriptor =
1847  get_column_descriptor(first_file_reader, parquet_column_index);
1848  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1849 
1850  const bool geo_validate_geometry =
1851  foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
1852  auto encoder = create_parquet_encoder(column_descriptor,
1853  first_parquet_column_descriptor,
1854  chunks,
1855  string_dictionary,
1856  chunk_metadata,
1857  false,
1858  false,
1859  is_for_detect,
1860  geo_validate_geometry);
1861  CHECK(encoder.get());
1862 
1863  if (rejected_row_indices) { // error tracking is enabled
1864  encoder->initializeErrorTracking();
1865  }
1866  encoder->initializeColumnType(column_descriptor->columnType);
1867 
1868  bool early_exit = false;
1869  int64_t total_levels_read = 0;
1870  for (const auto& row_group_interval : row_group_intervals) {
1871  const auto& file_path = row_group_interval.file_path;
1872  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1873 
1874  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1875  CHECK(row_group_interval.start_index >= 0 &&
1876  row_group_interval.end_index < num_row_groups);
1877  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1878 
1879  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1880  auto parquet_column_descriptor =
1881  get_column_descriptor(file_reader, parquet_column_index);
1882  validate_equal_column_descriptor(first_parquet_column_descriptor,
1883  parquet_column_descriptor,
1884  first_file_path,
1885  file_path);
1886 
1887  validate_max_repetition_and_definition_level(column_descriptor,
1888  parquet_column_descriptor);
1889  set_definition_levels_for_zero_max_definition_level_case(parquet_column_descriptor,
1890  def_levels);
1891 
1892  int64_t values_read = 0;
1893  for (int row_group_index = row_group_interval.start_index;
1894  row_group_index <= row_group_interval.end_index;
1895  ++row_group_index) {
1896  auto group_reader = parquet_reader->RowGroup(row_group_index);
1897  std::shared_ptr<parquet::ColumnReader> col_reader =
1898  group_reader->Column(parquet_column_index);
1899 
1900  try {
1901  while (col_reader->HasNext()) {
1902  int64_t levels_read =
1903  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1904  def_levels.data(),
1905  rep_levels.data(),
1906  reinterpret_cast<uint8_t*>(values.data()),
1907  &values_read,
1908  col_reader.get());
1909 
1910  if (rejected_row_indices) { // error tracking is enabled
1911  encoder->appendDataTrackErrors(def_levels.data(),
1912  rep_levels.data(),
1913  values_read,
1914  levels_read,
1915  values.data());
1916  } else { // no error tracking enabled
1917  validate_list_column_metadata_statistics(
1918  parquet_reader, // this validation only in effect for foreign tables
1919  row_group_index,
1920  parquet_column_index,
1921  def_levels.data(),
1922  levels_read,
1923  parquet_column_descriptor);
1924 
1925  encoder->appendData(def_levels.data(),
1926  rep_levels.data(),
1927  values_read,
1928  levels_read,
1929  values.data());
1930  }
1931 
1932  if (max_levels_read.has_value()) {
1933  total_levels_read += levels_read;
1934  if (total_levels_read >= max_levels_read.value()) {
1935  early_exit = true;
1936  break;
1937  }
1938  }
1939  }
1940  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1941  array_encoder->finalizeRowGroup();
1942  }
1943  } catch (const std::exception& error) {
1944  // check for a specific error to detect a possible unexpected switch of data
1945  // source in order to respond with an informative error message
1946  if (boost::regex_search(error.what(),
1947  boost::regex{"Deserializing page header failed."})) {
1949  "Unable to read from foreign data source, possible cause is an unexpected "
1950  "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
1951  "the "
1952  "foreign table "
1953  "if data source has been updated. Foreign table: " +
1955  }
1956 
1957  throw ForeignStorageException(
1958  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1959  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1960  "', Parquet file: '" + file_path + "'");
1961  }
1962  if (max_levels_read.has_value() && early_exit) {
1963  break;
1964  }
1965  }
1966  if (max_levels_read.has_value() && early_exit) {
1967  break;
1968  }
1969  }
1970 
1971  if (rejected_row_indices) { // error tracking is enabled
1972  *rejected_row_indices = encoder->getRejectedRowIndices();
1973  }
1974  return chunk_metadata;
1975 }
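
For readers unfamiliar with the Dremel encoding consumed by this function, the worked example below (illustrative only) shows the levels produced for a list<int32> column under the standard Parquet three-level LIST layout, where the maximum definition level is 3 and the maximum repetition level is 1.

#include <cstdint>
#include <vector>

// Rows of a list<int32> column:  [7, 8]   NULL   []   [NULL, 9]
//
// def level: 0 = list is null, 1 = list is empty,
//            2 = element is null, 3 = element is present
// rep level: 0 = entry starts a new row, 1 = continuation of the same list
std::vector<int16_t> def_levels = {3, 3, 0, 1, 2, 3};
std::vector<int16_t> rep_levels = {0, 1, 0, 0, 0, 1};
std::vector<int32_t> values = {7, 8, 9};  // only non-null leaves are stored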
1976 
1977 SQLTypeInfo LazyParquetChunkLoader::suggestColumnMapping(
1978  const parquet::ColumnDescriptor* parquet_column) {
1979  auto type = suggest_column_scalar_type(parquet_column);
1980 
1981  // array case
1982  if (is_valid_parquet_list_column(parquet_column)) {
1983  return type.get_array_type();
1984  }
1985 
1986  return type;
1987 }
1988 
1989 bool LazyParquetChunkLoader::isColumnMappingSupported(
1990  const ColumnDescriptor* omnisci_column,
1991  const parquet::ColumnDescriptor* parquet_column) {
1992  CHECK(!omnisci_column->columnType.is_array())
1993  << "isColumnMappingSupported should not be called on arrays";
1994  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1995  return true;
1996  }
1997  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1998  return true;
1999  }
2000  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
2001  return true;
2002  }
2003  if (validate_integral_mapping(omnisci_column, parquet_column)) {
2004  return true;
2005  }
2006  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
2007  return true;
2008  }
2009  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
2010  return true;
2011  }
2012  if (validate_time_mapping(omnisci_column, parquet_column)) {
2013  return true;
2014  }
2015  if (validate_date_mapping(omnisci_column, parquet_column)) {
2016  return true;
2017  }
2018  if (validate_string_mapping(omnisci_column, parquet_column)) {
2019  return true;
2020  }
2021  return false;
2022 }
2023 
2024 LazyParquetChunkLoader::LazyParquetChunkLoader(
2025  std::shared_ptr<arrow::fs::FileSystem> file_system,
2026  FileReaderMap* file_map,
2027  const ForeignTable* foreign_table)
2028  : file_system_(file_system)
2029  , file_reader_cache_(file_map)
2030  , foreign_table_(foreign_table) {
2031  CHECK(foreign_table_) << "LazyParquetChunkLoader: null Foreign Table ptr";
2032 }
2033 
2034 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
2035  const std::vector<RowGroupInterval>& row_group_intervals,
2036  const int parquet_column_index,
2037  std::list<Chunk_NS::Chunk>& chunks,
2038  StringDictionary* string_dictionary,
2039  RejectedRowIndices* rejected_row_indices) {
2040  CHECK(!chunks.empty());
2041  auto const& chunk = *chunks.begin();
2042  auto column_descriptor = chunk.getColumnDesc();
2043  auto buffer = chunk.getBuffer();
2044  CHECK(buffer);
2045 
2046  try {
2047  auto metadata = appendRowGroups(row_group_intervals,
2048  parquet_column_index,
2049  column_descriptor,
2050  chunks,
2051  string_dictionary,
2052  rejected_row_indices);
2053  return metadata;
2054  } catch (const std::exception& error) {
2055  throw ForeignStorageException(error.what());
2056  }
2057 
2058  return {};
2059 }
2060 
2061 struct ParquetBatchData {
2062  ParquetBatchData()
2063  : def_levels(LazyParquetChunkLoader::batch_reader_num_elements)
2064  , rep_levels(LazyParquetChunkLoader::batch_reader_num_elements) {}
2065  std::vector<int16_t> def_levels;
2066  std::vector<int16_t> rep_levels;
2067  std::vector<int8_t> values;
2068  int64_t values_read;
2069  int64_t levels_read;
2070 };
2071 
2072 class ParquetRowGroupReader {
2073  public:
2074  ParquetRowGroupReader(std::shared_ptr<parquet::ColumnReader> col_reader,
2075  const ColumnDescriptor* column_descriptor,
2076  const parquet::ColumnDescriptor* parquet_column_descriptor,
2077  ParquetEncoder* encoder,
2078  InvalidRowGroupIndices& invalid_indices,
2079  const int row_group_index,
2080  const int parquet_column_index,
2081  const parquet::ParquetFileReader* parquet_reader)
2082  : col_reader_(col_reader)
2083  , column_descriptor_(column_descriptor)
2084  , parquet_column_descriptor_(parquet_column_descriptor)
2085  , encoder_(encoder)
2086  , invalid_indices_(invalid_indices)
2087  , row_group_index_(row_group_index)
2088  , parquet_column_index_(parquet_column_index)
2089  , parquet_reader_(parquet_reader) {
2090  import_encoder = dynamic_cast<ParquetImportEncoder*>(encoder);
2091  CHECK(import_encoder);
2092  }
2093 
2094  void readAndValidateRowGroup() {
2095  while (col_reader_->HasNext()) {
2096  ParquetBatchData batch_data;
2097  resize_values_buffer(
2098  column_descriptor_, parquet_column_descriptor_, batch_data.values);
2099  batch_data.levels_read =
2100  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
2101  batch_data.def_levels.data(),
2102  batch_data.rep_levels.data(),
2103  reinterpret_cast<uint8_t*>(batch_data.values.data()),
2104  &batch_data.values_read,
2105  col_reader_.get());
2106  SQLTypeInfo column_type = column_descriptor_->columnType.is_array()
2107  ? get_sub_type_column_descriptor(column_descriptor_)->columnType
2108  : column_descriptor_->columnType;
2109  validate_list_column_metadata_statistics(parquet_reader_,
2110  row_group_index_,
2111  parquet_column_index_,
2112  batch_data.def_levels.data(),
2113  batch_data.levels_read,
2114  parquet_column_descriptor_);
2115  import_encoder->validateAndAppendData(batch_data.def_levels.data(),
2116  batch_data.rep_levels.data(),
2117  batch_data.values_read,
2118  batch_data.levels_read,
2119  batch_data.values.data(),
2120  column_type,
2121  invalid_indices_);
2122  }
2123  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder_)) {
2124  array_encoder->finalizeRowGroup();
2125  }
2126  }
2127 
2128  void eraseInvalidRowGroupData(const InvalidRowGroupIndices& invalid_indices) {
2129  import_encoder->eraseInvalidIndicesInBuffer(invalid_indices);
2130  }
2131 
2132  private:
2133  std::shared_ptr<parquet::ColumnReader> col_reader_;
2134  const ColumnDescriptor* column_descriptor_;
2135  const parquet::ColumnDescriptor* parquet_column_descriptor_;
2136  ParquetEncoder* encoder_;
2137  ParquetImportEncoder* import_encoder;
2138  InvalidRowGroupIndices& invalid_indices_;
2139  const int row_group_index_;
2140  const int parquet_column_index_;
2141  const parquet::ParquetFileReader* parquet_reader_;
2142 };
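
eraseInvalidRowGroupData() delegates to the import encoder, which drops every row whose index was flagged invalid by any column so that all chunks stay aligned. An illustrative std-only model of that compaction (the real implementation operates on typed encoder buffers):

#include <cstdint>
#include <set>
#include <vector>

void erase_invalid_rows(std::vector<int64_t>& column_values,
                        const std::set<int64_t>& invalid_indices) {
  std::vector<int64_t> compacted;
  compacted.reserve(column_values.size());
  for (int64_t i = 0; i < static_cast<int64_t>(column_values.size()); ++i) {
    if (!invalid_indices.count(i)) {
      compacted.push_back(column_values[i]);  // keep only valid rows
    }
  }
  column_values = std::move(compacted);
}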
2143 
2144 std::pair<size_t, size_t> LazyParquetChunkLoader::loadRowGroups(
2145  const RowGroupInterval& row_group_interval,
2146  const std::map<int, Chunk_NS::Chunk>& chunks,
2147  const ForeignTableSchema& schema,
2148  const std::map<int, StringDictionary*>& column_dictionaries,
2149  const int num_threads) {
2150  auto timer = DEBUG_TIMER(__func__);
2151 
2152  const auto& file_path = row_group_interval.file_path;
2153 
2154  // do not use caching with file readers; open a new one for every request
2155  auto file_reader_owner = open_parquet_table(file_path, file_system_);
2156  auto file_reader = file_reader_owner.get();
2157  auto file_metadata = file_reader->parquet_reader()->metadata();
2158 
2159  validate_number_of_columns(file_metadata, file_path, schema);
2160 
2161  // check for fixed length encoded columns and indicate to the user
2162  // they should not be used
2163  for (const auto column_descriptor : schema.getLogicalColumns()) {
2164  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
2165  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2166  try {
2167  validate_allowed_mapping(parquet_column, column_descriptor);
2168  } catch (std::runtime_error& e) {
2169  std::stringstream error_message;
2170  error_message << e.what()
2171  << " Parquet column: " << parquet_column->path()->ToDotString()
2172  << ", HeavyDB column: " << column_descriptor->columnName
2173  << ", Parquet file: " << file_path << ".";
2174  throw std::runtime_error(error_message.str());
2175  }
2176  }
2177 
2178  CHECK(row_group_interval.start_index == row_group_interval.end_index);
2179  auto row_group_index = row_group_interval.start_index;
2180  std::map<int, ParquetRowGroupReader> row_group_reader_map;
2181 
2182  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2183  auto group_reader = parquet_reader->RowGroup(row_group_index);
2184 
2185  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2186 
2187  const bool geo_validate_geometry =
2188  foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
2189  auto encoder_map = populate_encoder_map_for_import(chunks,
2190  schema,
2191  file_reader,
2192  column_dictionaries,
2193  group_reader->metadata()->num_rows(),
2194  geo_validate_geometry);
2195 
2196  std::vector<std::set<int>> partitions(num_threads);
2197  std::map<int, int> column_id_to_thread;
2198  for (auto& [column_id, encoder] : encoder_map) {
2199  auto thread_id = column_id % num_threads;
2200  column_id_to_thread[column_id] = thread_id;
2201  partitions[thread_id].insert(column_id);
2202  }
2203 
2204  for (auto& [column_id, encoder] : encoder_map) {
2205  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
2206  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
2207  auto parquet_column_descriptor =
2208  file_metadata->schema()->Column(parquet_column_index);
2209 
2210  // validate
2211  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
2212  CHECK(row_group_interval.start_index >= 0 &&
2213  row_group_interval.end_index < num_row_groups);
2214  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2215  validate_max_repetition_and_definition_level(column_descriptor,
2216  parquet_column_descriptor);
2217 
2218  std::shared_ptr<parquet::ColumnReader> col_reader =
2219  group_reader->Column(parquet_column_index);
2220 
2221  row_group_reader_map.insert(
2222  {column_id,
2223  ParquetRowGroupReader(col_reader,
2224  column_descriptor,
2225  parquet_column_descriptor,
2226  shared::get_from_map(encoder_map, column_id).get(),
2227  invalid_indices_per_thread[shared::get_from_map(
2228  column_id_to_thread, column_id)],
2229  row_group_index,
2230  parquet_column_index,
2231  parquet_reader)});
2232  }
2233 
2234  std::vector<std::future<void>> futures;
2235  for (int ithread = 0; ithread < num_threads; ++ithread) {
2236  auto column_ids_for_thread = partitions[ithread];
2237  futures.emplace_back(
2238  std::async(std::launch::async, [&row_group_reader_map, column_ids_for_thread] {
2239  for (const auto column_id : column_ids_for_thread) {
2240  shared::get_from_map(row_group_reader_map, column_id)
2241  .readAndValidateRowGroup(); // reads and validates entire row group per
2242  // column
2243  }
2244  }));
2245  }
2246 
2247  for (auto& future : futures) {
2248  future.wait();
2249  }
2250 
2251  for (auto& future : futures) {
2252  future.get();
2253  }
2254 
2255  // merge/reduce invalid indices
2256  InvalidRowGroupIndices invalid_indices;
2257  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
2258  invalid_indices.merge(thread_invalid_indices);
2259  }
2260 
2261  for (auto& [_, reader] : row_group_reader_map) {
2262  reader.eraseInvalidRowGroupData(
2263  invalid_indices); // removes invalid encoded data in buffers
2264  }
2265 
2266  // update the element count for each encoder
2267  for (const auto column_descriptor : schema.getLogicalColumns()) {
2268  auto column_id = column_descriptor->columnId;
2269  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
2270  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2271  invalid_indices.size());
2272  size_t updated_num_elems = db_encoder->getNumElems() +
2273  group_reader->metadata()->num_rows() -
2274  invalid_indices.size();
2275  db_encoder->setNumElems(updated_num_elems);
2276  if (column_descriptor->columnType.is_geometry()) {
2277  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2278  auto db_encoder =
2279  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
2280  db_encoder->setNumElems(updated_num_elems);
2281  }
2282  }
2283  }
2284 
2285  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2286  invalid_indices.size()};
2287 }
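
The threading scheme above assigns columns to threads round-robin, gives each thread a private set of invalid row indices, and merges the sets only after every future has completed, so the scan itself needs no locking. A self-contained sketch with the same shape (names are ours; the worker body is a stub):

#include <cstdint>
#include <future>
#include <set>
#include <vector>

using InvalidRowIndices = std::set<int64_t>;

InvalidRowIndices process_columns(const std::vector<int>& column_ids,
                                  const int num_threads) {
  std::vector<std::set<int>> partitions(num_threads);
  for (const int column_id : column_ids) {
    partitions[column_id % num_threads].insert(column_id);  // round-robin
  }
  std::vector<InvalidRowIndices> per_thread(num_threads);
  std::vector<std::future<void>> futures;
  for (int t = 0; t < num_threads; ++t) {
    futures.emplace_back(std::async(std::launch::async, [&, t] {
      for (const int column_id : partitions[t]) {
        // a real worker would read and validate the column here; we record
        // a dummy invalid row index purely for illustration
        per_thread[t].insert(column_id);
      }
    }));
  }
  for (auto& future : futures) {
    future.get();  // propagates any worker exception
  }
  InvalidRowIndices merged;
  for (auto& indices : per_thread) {
    merged.merge(indices);  // C++17 std::set::merge
  }
  return merged;
}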
2288 
2289 struct PreviewContext {
2290  std::vector<std::unique_ptr<TypedParquetDetectBuffer>> detect_buffers;
2291  std::vector<Chunk_NS::Chunk> column_chunks;
2292  std::vector<std::unique_ptr<RejectedRowIndices>> rejected_row_indices_per_column;
2293  std::list<ColumnDescriptor> column_descriptors;
2294 };
2295 
2296 DataPreview LazyParquetChunkLoader::previewFiles(const std::vector<std::string>& files,
2297  const size_t max_num_rows,
2298  const ForeignTable& foreign_table) {
2299  CHECK(!files.empty());
2300 
2301  auto first_file = *files.begin();
2302  auto first_file_reader = file_reader_cache_->getOrInsert(*files.begin(), file_system_);
2303 
2304  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
2305  ++current_file_it) {
2306  auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
2307  validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
2308  }
2309 
2310  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2311  auto num_columns = first_file_metadata->num_columns();
2312 
2313  DataPreview data_preview;
2314  data_preview.num_rejected_rows = 0;
2315 
2316  auto current_file_it = files.begin();
2317  while (data_preview.sample_rows.size() < max_num_rows &&
2318  current_file_it != files.end()) {
2319  size_t total_num_rows = data_preview.sample_rows.size();
2320  size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();
2321 
2322  // gather enough rows in row groups to produce required samples
2323  std::vector<RowGroupInterval> row_group_intervals;
2324  for (; current_file_it != files.end(); ++current_file_it) {
2325  const auto& file_path = *current_file_it;
2326  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
2327  auto file_metadata = file_reader->parquet_reader()->metadata();
2328  auto num_row_groups = file_metadata->num_row_groups();
2329  int end_row_group = 0;
2330  for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2331  const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2332  total_num_rows += next_num_rows;
2333  end_row_group = i;
2334  }
2335  row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
2336  }
2337 
2338  PreviewContext preview_context;
2339  for (int i = 0; i < num_columns; ++i) {
2340  auto col = first_file_metadata->schema()->Column(i);
2341  ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
2342  auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
2343  cd.columnType = sql_type;
2344  cd.columnName =
2345  sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
2346  cd.isSystemCol = false;
2347  cd.isVirtualCol = false;
2348  cd.tableId = -1;
2349  cd.columnId = i + 1;
2350  data_preview.column_names.emplace_back(cd.columnName);
2351  data_preview.column_types.emplace_back(sql_type);
2352  preview_context.detect_buffers.push_back(
2353  std::make_unique<TypedParquetDetectBuffer>());
2354  preview_context.rejected_row_indices_per_column.push_back(
2355  std::make_unique<RejectedRowIndices>());
2356  auto& detect_buffer = preview_context.detect_buffers.back();
2357  auto& chunk = preview_context.column_chunks.emplace_back(&cd);
2358  chunk.setPinnable(false);
2359  chunk.setBuffer(detect_buffer.get());
2360  }
2361 
2362  std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2363  [&](const std::vector<int>& column_indices) {
2364  for (const auto& column_index : column_indices) {
2365  auto& chunk = preview_context.column_chunks[column_index];
2366  auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2367  auto& rejected_row_indices =
2368  preview_context.rejected_row_indices_per_column[column_index];
2369  appendRowGroups(row_group_intervals,
2370  column_index,
2371  chunk.getColumnDesc(),
2372  chunk_list,
2373  nullptr,
2374  rejected_row_indices.get(),
2375  true,
2376  max_num_rows_to_append);
2377  }
2378  };
2379 
2380  auto num_threads = foreign_storage::get_num_threads(foreign_table);
2381 
2382  std::vector<int> columns(num_columns);
2383  std::iota(columns.begin(), columns.end(), 0);
2384  auto futures =
2385  create_futures_for_workers(columns, num_threads, append_row_groups_for_column);
2386  for (auto& future : futures) {
2387  future.wait();
2388  }
2389  for (auto& future : futures) {
2390  future.get();
2391  }
2392 
2393  // merge all `rejected_row_indices_per_column`
2394  auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2395  for (int i = 0; i < num_columns; ++i) {
2396  rejected_row_indices->insert(
2397  preview_context.rejected_row_indices_per_column[i]->begin(),
2398  preview_context.rejected_row_indices_per_column[i]->end());
2399  }
2400 
2401  size_t num_rows = 0;
2402  auto buffers_it = preview_context.detect_buffers.begin();
2403  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2404  CHECK(buffers_it != preview_context.detect_buffers.end());
2405  auto& strings = buffers_it->get()->getStrings();
2406  if (i == 0) {
2407  num_rows = strings.size();
2408  } else {
2409  CHECK_EQ(num_rows, strings.size());
2410  }
2411  }
2412 
2413  size_t num_rejected_rows = rejected_row_indices->size();
2414  data_preview.num_rejected_rows += num_rejected_rows;
2415  CHECK_GE(num_rows, num_rejected_rows);
2416  auto row_count = num_rows - num_rejected_rows;
2417 
2418  auto offset_row = data_preview.sample_rows.size();
2419  data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2420 
2421  for (size_t irow = 0, rows_appended = 0;
2422  irow < num_rows && offset_row + rows_appended < max_num_rows;
2423  ++irow) {
2424  if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2425  continue;
2426  }
2427  auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
2428  row_data.resize(num_columns);
2429  auto buffers_it = preview_context.detect_buffers.begin();
2430  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2431  CHECK(buffers_it != preview_context.detect_buffers.end());
2432  auto& strings = buffers_it->get()->getStrings();
2433  row_data[i] = strings[irow];
2434  }
2435  ++rows_appended;
2436  }
2437  }
2438 
2439  // attempt to detect geo columns
2440  for (int i = 0; i < num_columns; ++i) {
2441  auto type_info = data_preview.column_types[i];
2442  if (type_info.is_string()) {
2443  auto tentative_geo_type =
2444  detect_geo_type(data_preview.sample_rows, i);
2445  if (tentative_geo_type.has_value()) {
2446  data_preview.column_types[i].set_type(tentative_geo_type.value());
2447  data_preview.column_types[i].set_compression(kENCODING_NONE);
2448  }
2449  }
2450  }
2451 
2452  return data_preview;
2453 }
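
The sampling loop above simply skips any row index present in the merged rejected set while filling at most max_num_rows samples. A reduced std-only model over a single column of pre-rendered strings (the helper name is ours):

#include <cstddef>
#include <set>
#include <string>
#include <vector>

std::vector<std::string> sample_rows(const std::vector<std::string>& rows,
                                     const std::set<std::size_t>& rejected,
                                     const std::size_t max_num_rows) {
  std::vector<std::string> samples;
  for (std::size_t irow = 0;
       irow < rows.size() && samples.size() < max_num_rows;
       ++irow) {
    if (rejected.count(irow)) {
      continue;  // row failed type detection in at least one column
    }
    samples.push_back(rows[irow]);
  }
  return samples;
}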
2454 
2455 std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
2456  const std::vector<std::string>& file_paths,
2457  const ForeignTableSchema& schema,
2458  const bool do_metadata_stats_validation) {
2459  auto timer = DEBUG_TIMER(__func__);
2460  auto column_interval =
2461  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
2462  schema.getLogicalAndPhysicalColumns().back()->columnId};
2463  CHECK(!file_paths.empty());
2464 
2465  // The encoder map needs to be populated before we can start scanning row groups,
2466  // so we peel the first file_path out of the async loop below to perform population.
2467  const auto& first_path = *file_paths.begin();
2468  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
2469  auto max_row_group_stats =
2470  validate_parquet_metadata(first_reader->parquet_reader()->metadata(),
2471  first_path,
2472  schema,
2473  do_metadata_stats_validation);
2474 
2475  const bool geo_validate_geometry =
2476  foreign_table_->getOptionAsBool(ForeignTable::GEO_VALIDATE_GEOMETRY_KEY);
2477  auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
2478  schema,
2479  first_reader,
2480  do_metadata_stats_validation,
2481  geo_validate_geometry);
2482  const auto num_row_groups = get_parquet_table_size(first_reader).first;
2483  auto row_group_metadata = metadata_scan_rowgroup_interval(
2484  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2485 
2486  // We want each (filepath->FileReader) pair in the cache to be initialized before we
2487  // multithread so that we are not adding keys in a concurrent environment; to that
2488  // end we add a cache entry for each path, initialized to an empty unique_ptr if the
2489  // file has not yet been opened.
2490  // Since we have already performed the first iteration, we skip it in the thread groups
2491  // so as not to process it twice.
2492  std::vector<std::string> cache_subset;
2493  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2494  file_reader_cache_->initializeIfEmpty(*path_it);
2495  cache_subset.emplace_back(*path_it);
2496  }
2497 
2498  // Iterate asynchronously over any paths beyond the first.
2499  auto table_ptr = schema.getForeignTable();
2500  CHECK(table_ptr);
2501  auto num_threads = foreign_storage::get_num_threads(*table_ptr);
2502  auto paths_per_thread = partition_for_threads(cache_subset, num_threads);
2503  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2504  futures;
2505  for (const auto& path_group : paths_per_thread) {
2506  futures.emplace_back(std::async(
2507  std::launch::async,
2508  [&](const auto& paths, const auto& file_reader_cache)
2509  -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2510  std::list<RowGroupMetadata> reduced_metadata;
2511  MaxRowGroupSizeStats max_row_group_stats{0, 0};
2512  for (const auto& path : paths.get()) {
2513  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
2514  validate_equal_schema(first_reader, reader, first_path, path);
2515  auto local_max_row_group_stats =
2516  validate_parquet_metadata(reader->parquet_reader()->metadata(),
2517  path,
2518  schema,
2519  do_metadata_stats_validation);
2520  if (local_max_row_group_stats.max_row_group_size >
2521  max_row_group_stats.max_row_group_size) {
2522  max_row_group_stats = local_max_row_group_stats;
2523  }
2524  const auto num_row_groups = get_parquet_table_size(reader).first;
2525  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
2526  reduced_metadata.splice(
2527  reduced_metadata.end(),
2528  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
2529  }
2530  return {reduced_metadata, max_row_group_stats};
2531  },
2532  std::ref(path_group),
2533  std::ref(*file_reader_cache_)));
2534  }
2535 
2536  // Reduce all the row_group results.
2537  for (auto& future : futures) {
2538  auto [metadata, local_max_row_group_stats] = future.get();
2539  row_group_metadata.splice(row_group_metadata.end(), metadata);
2540  if (local_max_row_group_stats.max_row_group_size >
2541  max_row_group_stats.max_row_group_size) {
2542  max_row_group_stats = local_max_row_group_stats;
2543  }
2544  }
2545 
2546  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2547  throw_row_group_larger_than_fragment_size_error(
2548  max_row_group_stats, schema.getForeignTable()->maxFragRows);
2549  }
2550 
2551  return row_group_metadata;
2552 }
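
Structurally, metadataScan() is a serial first pass that seeds the encoder map and reader cache, followed by a parallel map over the remaining files and an O(1) list-splice reduction. A stripped-down model with the same shape, assuming a non-empty path list (the scan body is a stub):

#include <cstddef>
#include <future>
#include <list>
#include <string>
#include <vector>

std::list<std::string> scan_files(const std::vector<std::string>& paths,
                                  const std::size_t num_threads) {
  std::list<std::string> results;
  results.emplace_back(paths.front());  // first file is processed serially
  std::vector<std::vector<std::string>> groups(num_threads);
  for (std::size_t i = 1; i < paths.size(); ++i) {
    groups[i % num_threads].push_back(paths[i]);  // partition remaining files
  }
  std::vector<std::future<std::list<std::string>>> futures;
  for (auto& group : groups) {
    futures.emplace_back(std::async(std::launch::async, [&group] {
      std::list<std::string> local;
      for (const auto& path : group) {
        local.emplace_back(path);  // a real scan would read metadata here
      }
      return local;
    }));
  }
  for (auto& future : futures) {
    auto local = future.get();
    results.splice(results.end(), local);  // reduce without copying nodes
  }
  return results;
}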
2553 
2554 } // namespace foreign_storage
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
std::shared_ptr< parquet::ColumnReader > col_reader_
std::list< ColumnDescriptor > column_descriptors
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:392
void set_compression(EncodingType c)
Definition: sqltypes.h:479
AbstractBuffer * getIndexBuf() const
Definition: Chunk.h:148
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::shared_ptr< ParquetEncoder > create_parquet_signed_or_unsigned_integral_encoder_with_types(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const bool is_signed)
Create a signed or unsigned integral parquet encoder using types.
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::optional< SQLTypes > detect_geo_type(const SampleRows &sample_rows, size_t column_index)
Definition: DataPreview.cpp:22
std::vector< Chunk_NS::Chunk > column_chunks
static constexpr int32_t kMaxNumericPrecision
Definition: sqltypes.h:58
Definition: sqltypes.h:76
std::vector< std::string > column_names
Definition: DataPreview.h:28
std::string tableName
std::vector< SQLTypeInfo > column_types
Definition: DataPreview.h:29
const parquet::ParquetFileReader * parquet_reader_
static bool isColumnMappingSupported(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
Definition: FsiChunkUtils.h:41
#define LOG(tag)
Definition: Logger.h:285
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
bool is_fp() const
Definition: sqltypes.h:571
HOST DEVICE int get_scale() const
Definition: sqltypes.h:396
size_t get_num_threads(const ForeignTable &table)
std::shared_ptr< ParquetEncoder > create_parquet_geospatial_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool geo_validate_geometry)
std::unique_ptr< ColumnDescriptor > get_sub_type_column_descriptor(const ColumnDescriptor *column)
const parquet::ColumnDescriptor * parquet_column_descriptor_
#define UNREACHABLE()
Definition: Logger.h:338
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
#define CHECK_GE(x, y)
Definition: Logger.h:306
bool is_nanosecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< ParquetEncoder > create_parquet_none_type_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor *parquet_column)
MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema, const bool do_metadata_stats_validation)
void validate_equal_schema(const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
dictionary stats
Definition: report.py:116
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor *parquet_column)
Detect a valid list parquet column.
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect, const bool geo_validate_geometry)
Create a Parquet specific encoder for a Parquet to OmniSci mapping.
void validate_list_column_metadata_statistics(const parquet::ParquetFileReader *reader, const int row_group_index, const int column_index, const int16_t *def_levels, const int64_t num_levels, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::string to_string(char const *&&v)
int getParquetColumnIndex(const int column_id) const
std::shared_ptr< ParquetEncoder > create_parquet_array_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect, const bool geo_validate_geometry)
UniqueReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_date_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
void throw_missing_metadata_error(const int row_group_index, const int column_index, const std::string &file_path)
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
void set_definition_levels_for_zero_max_definition_level_case(const parquet::ColumnDescriptor *parquet_column_descriptor, std::vector< int16_t > &def_levels)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
future< Result > async(Fn &&fn, Args &&...args)
bool is_fixlen_array() const
Definition: sqltypes.h:589
std::set< int64_t > InvalidRowGroupIndices
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder_with_omnisci_type(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const int bit_width, const bool is_signed)
Create a integral parquet encoder using types.
void validate_allowed_mapping(const parquet::ColumnDescriptor *parquet_column, const ColumnDescriptor *omnisci_column)
void validate_number_of_columns(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
void set_fixed_size()
Definition: sqltypes.h:477
SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor *parquet_column)
bool is_integer() const
Definition: sqltypes.h:565
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:70
void set_scale(int s)
Definition: sqltypes.h:473
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
SQLTypeInfo suggest_floating_point_mapping(const parquet::ColumnDescriptor *parquet_column)
An AbstractBuffer is a unit of data management for a data manager.
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
specifies the content in-memory of a row in the column metadata table
std::pair< size_t, size_t > loadRowGroups(const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
Load row groups of data into given chunks.
parquet::arrow::FileReader * ReaderPtr
Definition: ParquetShared.h:33
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_string_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const Chunk_NS::Chunk &chunk, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, bool is_for_import, const bool is_for_detect)
int get_precision() const
Definition: sqltypes.h:394
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder_with_omnisci_type(const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, AbstractBuffer *buffer)
void set_comp_param(int p)
Definition: sqltypes.h:480
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan for the paths specified.
Definition: sqltypes.h:79
Definition: sqltypes.h:80
SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor *parquet_column)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
static constexpr const char * GEO_VALIDATE_GEOMETRY_KEY
Definition: ForeignTable.h:49
void eraseInvalidRowGroupData(const InvalidRowGroupIndices &invalid_indices)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:399
AbstractBuffer * getBuffer() const
Definition: Chunk.h:146
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_import(std::list< Chunk_NS::Chunk > &chunks, const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, StringDictionary *string_dictionary, const bool geo_validate_geometry)
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
virtual void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices)=0
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
std::list< RowGroupMetadata > metadata_scan_rowgroup_interval(const std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const RowGroupInterval &row_group_interval, const ReaderPtr &reader, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:79
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:393
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_import(const std::map< int, Chunk_NS::Chunk > chunks, const ForeignTableSchema &schema, const ReaderPtr &reader, const std::map< int, StringDictionary * > column_dictionaries, const int64_t num_rows, const bool geo_validate_geometry)
std::string get_type_name() const
Definition: sqltypes.h:482
void initializeIfEmpty(const std::string &path)
Definition: ParquetShared.h:86
virtual void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices)=0
std::set< int64_t > RejectedRowIndices
DataPreview previewFiles(const std::vector< std::string > &files, const size_t max_num_rows, const ForeignTable &table)
Preview rows of data and column types in a set of files.
std::vector< std::unique_ptr< TypedParquetDetectBuffer > > detect_buffers
std::vector< std::unique_ptr< RejectedRowIndices > > rejected_row_indices_per_column
std::shared_ptr< ParquetEncoder > create_parquet_floating_point_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:402
ThreadId thread_id()
Definition: Logger.cpp:877
bool is_millisecond_precision(const ColumnDescriptor *omnisci_column)
ParquetRowGroupReader(std::shared_ptr< parquet::ColumnReader > col_reader, const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, ParquetEncoder *encoder, InvalidRowGroupIndices &invalid_indices, const int row_group_index, const int parquet_column_index, const parquet::ParquetFileReader *parquet_reader)
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:595
#define DEBUG_TIMER(name)
Definition: Logger.h:412
bool is_valid_parquet_string(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_metadata_scan(const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const ReaderPtr &reader, const bool do_metadata_stats_validation, const bool geo_validate_geometry)
LazyParquetChunkLoader(std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const ForeignTable *foreign_table)
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
SQLTypeInfo suggest_boolean_type_mapping(const parquet::ColumnDescriptor *parquet_column)
bool is_microsecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
Definition: sqltypes.h:72
SQLTypeInfo columnType
const ColumnDescriptor * getLogicalColumn(const int column_id) const
bool is_string() const
Definition: sqltypes.h:559
MaxRowGroupSizeStats validate_parquet_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema, const bool do_metadata_stats_validation)
void throw_row_group_larger_than_fragment_size_error(const MaxRowGroupSizeStats max_row_group_stats, const int fragment_size)
bool is_decimal() const
Definition: sqltypes.h:568
SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor *parquet_column)
std::string columnName
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool getOptionAsBool(const std::string_view &key) const
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool is_array() const
Definition: sqltypes.h:583
const ColumnDescriptor * getColumnDescriptor(const int column_id) const
void set_precision(int d)
Definition: sqltypes.h:471
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_metadata_scan(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const bool geo_validate_geometry)
static SQLTypeInfo suggestColumnMapping(const parquet::ColumnDescriptor *parquet_column)
bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value)
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:468