OmniSciDB  c0231cc57d
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
LazyParquetChunkLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LazyParquetChunkLoader.h"
18 
19 #include <arrow/api.h>
20 #include <arrow/io/api.h>
21 #include <parquet/arrow/reader.h>
22 #include <parquet/column_scanner.h>
23 #include <parquet/exception.h>
24 #include <parquet/platform.h>
25 #include <parquet/statistics.h>
26 #include <parquet/types.h>
27 
30 #include "FsiChunkUtils.h"
35 #include "ParquetDecimalEncoder.h"
41 #include "ParquetStringEncoder.h"
44 #include "ParquetTimeEncoder.h"
47 #include "Shared/misc.h"
50 
51 namespace foreign_storage {
52 
53 namespace {
54 
55 bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value) {
56  return value >= lower_bound && value <= upper_bound;
57 }
58 
59 bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
60  return (parquet_column->logical_type()->is_none() &&
61  parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
62  parquet_column->logical_type()->is_string();
63 }
64 
101 bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
102  const parquet::schema::Node* node = parquet_column->schema_node().get();
103  if ((node->name() != "element" && node->name() != "item") ||
104  !(node->is_required() ||
105  node->is_optional())) { // ensure first innermost node is named "element"
106  // which is required by the parquet specification;
107  // however testing shows that pyarrow generates this
108  // column with the name of "item"
109  // this field must be either required or optional
110  return false;
111  }
112  node = node->parent();
113  if (!node) { // required nested structure
114  return false;
115  }
116  if (node->name() != "list" || !node->is_repeated() ||
117  !node->is_group()) { // ensure second innermost node is named "list" which is
118  // a repeated group; this is
119  // required by the parquet specification
120  return false;
121  }
122  node = node->parent();
123  if (!node) { // required nested structure
124  return false;
125  }
126  if (!node->logical_type()->is_list() ||
127  !(node->is_optional() ||
128  node->is_required())) { // ensure third outermost node has logical type LIST
129  // which is either optional or required; this is required
130  // by the parquet specification
131  return false;
132  }
133  node =
134  node->parent(); // this must now be the root node of schema which is required by
135  // FSI (lists can not be embedded into a deeper nested structure)
136  if (!node) { // required nested structure
137  return false;
138  }
139  node = node->parent();
140  if (node) { // implies the previous node was not the root node
141  return false;
142  }
143  return true;
144 }
145 
146 template <typename V, typename NullType>
147 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
148  const ColumnDescriptor* column_descriptor,
149  const parquet::ColumnDescriptor* parquet_column_descriptor,
150  AbstractBuffer* buffer) {
151  switch (parquet_column_descriptor->physical_type()) {
152  case parquet::Type::INT32:
153  return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
154  buffer, column_descriptor, parquet_column_descriptor);
155  case parquet::Type::INT64:
156  return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
157  buffer, column_descriptor, parquet_column_descriptor);
158  case parquet::Type::FIXED_LEN_BYTE_ARRAY:
159  return std::make_shared<
161  buffer, column_descriptor, parquet_column_descriptor);
162  case parquet::Type::BYTE_ARRAY:
163  return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
164  buffer, column_descriptor, parquet_column_descriptor);
165  default:
166  UNREACHABLE();
167  }
168  return {};
169 }
170 
171 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
172  const ColumnDescriptor* omnisci_column,
173  const parquet::ColumnDescriptor* parquet_column,
174  AbstractBuffer* buffer,
175  const bool is_metadata_scan_or_for_import) {
176  if (parquet_column->logical_type()->is_decimal()) {
177  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
178  return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
179  omnisci_column, parquet_column, buffer);
180  }
181  CHECK(omnisci_column->columnType.get_compression() == kENCODING_FIXED);
182  if (is_metadata_scan_or_for_import) {
183  switch (omnisci_column->columnType.get_comp_param()) {
184  case 16:
185  return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
186  omnisci_column, parquet_column, buffer);
187  case 32:
188  return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
189  omnisci_column, parquet_column, buffer);
190  default:
191  UNREACHABLE();
192  }
193  } else {
194  switch (omnisci_column->columnType.get_comp_param()) {
195  case 16:
196  return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
197  omnisci_column, parquet_column, buffer);
198  case 32:
199  return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
200  omnisci_column, parquet_column, buffer);
201  default:
202  UNREACHABLE();
203  }
204  }
205  }
206  return {};
207 }
208 
223 template <typename V, typename T, typename U, typename NullType>
224 std::shared_ptr<ParquetEncoder>
226  AbstractBuffer* buffer,
227  const size_t omnisci_data_type_byte_size,
228  const size_t parquet_data_type_byte_size,
229  const bool is_signed) {
230  CHECK(sizeof(NullType) == omnisci_data_type_byte_size);
231  if (is_signed) {
232  return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
233  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
234  } else {
235  return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
236  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
237  }
238 }
239 
259 template <typename V, typename NullType>
261  AbstractBuffer* buffer,
262  const size_t omnisci_data_type_byte_size,
263  const size_t parquet_data_type_byte_size,
264  const int bit_width,
265  const bool is_signed) {
266  switch (bit_width) {
267  case 8:
269  int32_t,
270  uint8_t,
271  NullType>(
272  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
273  case 16:
275  int32_t,
276  uint16_t,
277  NullType>(
278  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
279  case 32:
281  int32_t,
282  uint32_t,
283  NullType>(
284  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
285  case 64:
287  int64_t,
288  uint64_t,
289  NullType>(
290  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
291  default:
292  UNREACHABLE();
293  }
294  return {};
295 }
296 
297 std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
298  const ColumnDescriptor* omnisci_column,
299  const parquet::ColumnDescriptor* parquet_column,
300  AbstractBuffer* buffer,
301  const bool is_metadata_scan_or_for_import) {
302  auto column_type = omnisci_column->columnType;
303  auto physical_type = parquet_column->physical_type();
304 
305  int bit_width = -1;
306  int is_signed = false;
307  // handle the integral case with no Parquet annotation
308  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
309  if (physical_type == parquet::Type::INT32) {
310  bit_width = 32;
311  } else if (physical_type == parquet::Type::INT64) {
312  bit_width = 64;
313  } else {
314  UNREACHABLE();
315  }
316  is_signed = true;
317  }
318  // handle the integral case with Parquet annotation
319  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
320  parquet_column->logical_type().get())) {
321  bit_width = int_logical_column->bit_width();
322  is_signed = int_logical_column->is_signed();
323  }
324 
325  if (bit_width == -1) { // no valid logical type (with or without annotation) found
326  return {};
327  }
328 
329  const size_t omnisci_data_type_byte_size = column_type.get_size();
330  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);
331 
332  switch (omnisci_data_type_byte_size) {
333  case 8:
334  CHECK(column_type.get_compression() == kENCODING_NONE);
335  return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
336  buffer,
337  omnisci_data_type_byte_size,
338  parquet_data_type_byte_size,
339  bit_width,
340  is_signed);
341  case 4:
342  if (is_metadata_scan_or_for_import && column_type.get_type() == kBIGINT) {
343  return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
344  buffer,
345  omnisci_data_type_byte_size,
346  parquet_data_type_byte_size,
347  bit_width,
348  is_signed);
349  }
350  return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
351  buffer,
352  omnisci_data_type_byte_size,
353  parquet_data_type_byte_size,
354  bit_width,
355  is_signed);
356  case 2:
357  if (is_metadata_scan_or_for_import) {
358  switch (column_type.get_type()) {
359  case kBIGINT:
360  return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
361  buffer,
362  omnisci_data_type_byte_size,
363  parquet_data_type_byte_size,
364  bit_width,
365  is_signed);
366  case kINT:
367  return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
368  buffer,
369  omnisci_data_type_byte_size,
370  parquet_data_type_byte_size,
371  bit_width,
372  is_signed);
373  case kSMALLINT:
374  break;
375  default:
376  UNREACHABLE();
377  }
378  }
379  return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
380  buffer,
381  omnisci_data_type_byte_size,
382  parquet_data_type_byte_size,
383  bit_width,
384  is_signed);
385  case 1:
386  if (is_metadata_scan_or_for_import) {
387  switch (column_type.get_type()) {
388  case kBIGINT:
389  return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
390  buffer,
391  omnisci_data_type_byte_size,
392  parquet_data_type_byte_size,
393  bit_width,
394  is_signed);
395  case kINT:
396  return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
397  buffer,
398  omnisci_data_type_byte_size,
399  parquet_data_type_byte_size,
400  bit_width,
401  is_signed);
402  case kSMALLINT:
403  return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
404  buffer,
405  omnisci_data_type_byte_size,
406  parquet_data_type_byte_size,
407  bit_width,
408  is_signed);
409  case kTINYINT:
410  break;
411  default:
412  UNREACHABLE();
413  }
414  }
415  return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
416  buffer,
417  omnisci_data_type_byte_size,
418  parquet_data_type_byte_size,
419  bit_width,
420  is_signed);
421  default:
422  UNREACHABLE();
423  }
424  return {};
425 }
426 
427 std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
428  const ColumnDescriptor* omnisci_column,
429  const parquet::ColumnDescriptor* parquet_column,
430  AbstractBuffer* buffer) {
431  auto column_type = omnisci_column->columnType;
432  if (!column_type.is_fp()) {
433  return {};
434  }
435  CHECK_EQ(column_type.get_compression(), kENCODING_NONE);
436  switch (column_type.get_type()) {
437  case kFLOAT:
438  switch (parquet_column->physical_type()) {
439  case parquet::Type::FLOAT:
440  return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
441  buffer, omnisci_column, parquet_column);
442  case parquet::Type::DOUBLE:
443  return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
444  buffer, omnisci_column, parquet_column);
445  default:
446  UNREACHABLE();
447  }
448  case kDOUBLE:
449  CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
450  return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
451  buffer, omnisci_column, parquet_column);
452  default:
453  UNREACHABLE();
454  }
455  return {};
456 }
457 
458 std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
459  const ColumnDescriptor* omnisci_column,
460  const parquet::ColumnDescriptor* parquet_column,
461  AbstractBuffer* buffer) {
462  auto column_type = omnisci_column->columnType;
463  if (parquet_column->logical_type()->is_none() &&
464  !omnisci_column->columnType.is_string()) { // boolean
465  if (column_type.get_compression() == kENCODING_NONE) {
466  switch (column_type.get_type()) {
467  case kBOOLEAN:
468  return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
469  buffer, omnisci_column, parquet_column);
470  default:
471  UNREACHABLE();
472  }
473  } else {
474  UNREACHABLE();
475  }
476  }
477  return {};
478 }
479 
480 template <typename V, typename T, typename NullType>
481 std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
482  const ColumnDescriptor* omnisci_column,
483  const parquet::ColumnDescriptor* parquet_column,
484  AbstractBuffer* buffer) {
485  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
486  parquet_column->logical_type().get())) {
487  switch (timestamp_logical_type->time_unit()) {
488  case parquet::LogicalType::TimeUnit::MILLIS:
489  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
490  buffer, omnisci_column, parquet_column);
491  case parquet::LogicalType::TimeUnit::MICROS:
492  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
493  buffer, omnisci_column, parquet_column);
494  case parquet::LogicalType::TimeUnit::NANOS:
495  return std::make_shared<
497  buffer, omnisci_column, parquet_column);
498  default:
499  UNREACHABLE();
500  }
501  } else {
502  UNREACHABLE();
503  }
504  return {};
505 }
506 
507 template <typename V, typename T, typename NullType>
509  const ColumnDescriptor* omnisci_column,
510  const parquet::ColumnDescriptor* parquet_column,
511  AbstractBuffer* buffer,
512  const bool is_metadata_scan_or_for_import) {
513  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
514  parquet_column->logical_type().get())) {
515  switch (timestamp_logical_type->time_unit()) {
516  case parquet::LogicalType::TimeUnit::MILLIS:
517  if (is_metadata_scan_or_for_import) {
518  return std::make_shared<
520  buffer, omnisci_column, parquet_column);
521  }
522  return std::make_shared<
524  buffer, omnisci_column, parquet_column);
525  case parquet::LogicalType::TimeUnit::MICROS:
526  if (is_metadata_scan_or_for_import) {
527  return std::make_shared<
529  buffer, omnisci_column, parquet_column);
530  }
531  return std::make_shared<
533  buffer, omnisci_column, parquet_column);
534  case parquet::LogicalType::TimeUnit::NANOS:
535  if (is_metadata_scan_or_for_import) {
536  return std::make_shared<
538  T,
539  1000L * 1000L * 1000L,
540  NullType>>(
541  buffer, omnisci_column, parquet_column);
542  }
543  return std::make_shared<
545  buffer, omnisci_column, parquet_column);
546  default:
547  UNREACHABLE();
548  }
549  } else {
550  UNREACHABLE();
551  }
552  return {};
553 }
554 
555 std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
556  const ColumnDescriptor* omnisci_column,
557  const parquet::ColumnDescriptor* parquet_column,
558  AbstractBuffer* buffer,
559  const bool is_metadata_scan_or_for_import) {
560  auto column_type = omnisci_column->columnType;
561  auto precision = column_type.get_precision();
562  if (parquet_column->logical_type()->is_timestamp()) {
563  if (column_type.get_compression() == kENCODING_NONE) {
564  if (precision == 0) {
565  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
566  omnisci_column, parquet_column, buffer);
567  } else {
568  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
569  buffer, omnisci_column, parquet_column);
570  }
571  } else if (column_type.get_compression() == kENCODING_FIXED) {
572  CHECK(column_type.get_comp_param() == 32);
573  if (is_metadata_scan_or_for_import) {
574  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
575  omnisci_column, parquet_column, buffer);
576  } else {
577  return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
578  omnisci_column, parquet_column, buffer);
579  }
580  }
581  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
582  if (parquet_column->physical_type() == parquet::Type::INT32) {
583  CHECK(column_type.get_compression() == kENCODING_FIXED &&
584  column_type.get_comp_param() == 32);
585  if (is_metadata_scan_or_for_import) {
586  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
587  buffer, omnisci_column, parquet_column);
588  } else {
589  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
590  buffer, omnisci_column, parquet_column);
591  }
592  } else if (parquet_column->physical_type() == parquet::Type::INT64) {
593  if (column_type.get_compression() == kENCODING_NONE) {
594  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
595  buffer, omnisci_column, parquet_column);
596  } else if (column_type.get_compression() == kENCODING_FIXED) {
597  CHECK(column_type.get_comp_param() == 32);
598  if (is_metadata_scan_or_for_import) {
599  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
600  buffer, omnisci_column, parquet_column);
601  } else {
602  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
603  buffer, omnisci_column, parquet_column);
604  }
605  }
606  } else {
607  UNREACHABLE();
608  }
609  }
610  return {};
611 }
612 
613 template <typename V, typename T, typename NullType>
614 std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
615  const ColumnDescriptor* omnisci_column,
616  const parquet::ColumnDescriptor* parquet_column,
617  AbstractBuffer* buffer) {
618  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
619  parquet_column->logical_type().get())) {
620  switch (time_logical_type->time_unit()) {
621  case parquet::LogicalType::TimeUnit::MILLIS:
622  return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
623  buffer, omnisci_column, parquet_column);
624  case parquet::LogicalType::TimeUnit::MICROS:
625  return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
626  buffer, omnisci_column, parquet_column);
627  case parquet::LogicalType::TimeUnit::NANOS:
628  return std::make_shared<
630  buffer, omnisci_column, parquet_column);
631  default:
632  UNREACHABLE();
633  }
634  } else {
635  UNREACHABLE();
636  }
637  return {};
638 }
639 
640 std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
641  const ColumnDescriptor* omnisci_column,
642  const parquet::ColumnDescriptor* parquet_column,
643  AbstractBuffer* buffer,
644  const bool is_metadata_scan_or_for_import) {
645  auto column_type = omnisci_column->columnType;
646  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
647  parquet_column->logical_type().get())) {
648  if (column_type.get_compression() == kENCODING_NONE) {
649  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
650  return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
651  omnisci_column, parquet_column, buffer);
652  } else {
653  return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
654  omnisci_column, parquet_column, buffer);
655  }
656  } else if (column_type.get_compression() == kENCODING_FIXED) {
657  if (is_metadata_scan_or_for_import) {
658  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
659  CHECK(parquet_column->physical_type() == parquet::Type::INT32);
660  return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
661  omnisci_column, parquet_column, buffer);
662  } else {
663  CHECK(time_logical_column->time_unit() ==
664  parquet::LogicalType::TimeUnit::MICROS ||
665  time_logical_column->time_unit() ==
666  parquet::LogicalType::TimeUnit::NANOS);
667  CHECK(parquet_column->physical_type() == parquet::Type::INT64);
668  return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
669  omnisci_column, parquet_column, buffer);
670  }
671  } else {
672  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
673  CHECK(parquet_column->physical_type() == parquet::Type::INT32);
674  return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
675  omnisci_column, parquet_column, buffer);
676  } else {
677  CHECK(time_logical_column->time_unit() ==
678  parquet::LogicalType::TimeUnit::MICROS ||
679  time_logical_column->time_unit() ==
680  parquet::LogicalType::TimeUnit::NANOS);
681  CHECK(parquet_column->physical_type() == parquet::Type::INT64);
682  return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
683  omnisci_column, parquet_column, buffer);
684  }
685  }
686  } else {
687  UNREACHABLE();
688  }
689  }
690  return {};
691 }
692 
693 std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
694  const ColumnDescriptor* omnisci_column,
695  const parquet::ColumnDescriptor* parquet_column,
696  AbstractBuffer* buffer,
697  const bool is_metadata_scan_or_for_import) {
698  auto column_type = omnisci_column->columnType;
699  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
700  CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
701  if (is_metadata_scan_or_for_import) {
702  if (column_type.get_comp_param() ==
703  0) { // DATE ENCODING FIXED (32) uses comp param 0
705  int64_t,
706  int32_t>(
707  omnisci_column, parquet_column, buffer, true);
708  } else if (column_type.get_comp_param() == 16) {
710  int64_t,
711  int16_t>(
712  omnisci_column, parquet_column, buffer, true);
713  } else {
714  UNREACHABLE();
715  }
716  } else {
717  if (column_type.get_comp_param() ==
718  0) { // DATE ENCODING FIXED (32) uses comp param 0
720  int64_t,
721  int32_t>(
722  omnisci_column, parquet_column, buffer, false);
723  } else if (column_type.get_comp_param() == 16) {
725  int64_t,
726  int16_t>(
727  omnisci_column, parquet_column, buffer, false);
728  } else {
729  UNREACHABLE();
730  }
731  }
732  }
733  return {};
734 }
735 
736 std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
737  const ColumnDescriptor* omnisci_column,
738  const parquet::ColumnDescriptor* parquet_column,
739  AbstractBuffer* buffer,
740  const bool is_metadata_scan_or_for_import) {
741  auto column_type = omnisci_column->columnType;
742  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
743  if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
744  if (is_metadata_scan_or_for_import) {
745  if (column_type.get_comp_param() ==
746  0) { // DATE ENCODING FIXED (32) uses comp param 0
747  return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int32_t>>(
748  buffer);
749  } else if (column_type.get_comp_param() == 16) {
750  return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int16_t>>(
751  buffer);
752  } else {
753  UNREACHABLE();
754  }
755  } else {
756  if (column_type.get_comp_param() ==
757  0) { // DATE ENCODING FIXED (32) uses comp param 0
758  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
759  buffer, omnisci_column, parquet_column);
760  } else if (column_type.get_comp_param() == 16) {
761  return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
762  buffer, omnisci_column, parquet_column);
763  } else {
764  UNREACHABLE();
765  }
766  }
767  } else if (column_type.get_compression() == kENCODING_NONE) { // for array types
768  return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int64_t>>(
769  buffer, omnisci_column, parquet_column);
770  } else {
771  UNREACHABLE();
772  }
773  }
774  return {};
775 }
776 
777 std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
778  const ColumnDescriptor* omnisci_column,
779  const parquet::ColumnDescriptor* parquet_column,
780  const Chunk_NS::Chunk& chunk,
781  StringDictionary* string_dictionary,
782  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
783  bool is_for_import,
784  const bool is_for_detect) {
785  auto column_type = omnisci_column->columnType;
786  if (!is_valid_parquet_string(parquet_column) ||
787  !omnisci_column->columnType.is_string()) {
788  return {};
789  }
790  if (column_type.get_compression() == kENCODING_NONE) {
791  if (is_for_import) {
792  return std::make_shared<ParquetStringImportEncoder>(chunk.getBuffer());
793  } else {
794  return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
795  chunk.getIndexBuf());
796  }
797  } else if (column_type.get_compression() == kENCODING_DICT) {
798  if (!is_for_detect) { // non-detect use case
799  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
800  std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
801  logical_chunk_metadata->sqlType = omnisci_column->columnType;
802  switch (column_type.get_size()) {
803  case 1:
804  return std::make_shared<ParquetStringEncoder<uint8_t>>(
805  chunk.getBuffer(),
806  string_dictionary,
807  is_for_import ? nullptr : logical_chunk_metadata.get());
808  case 2:
809  return std::make_shared<ParquetStringEncoder<uint16_t>>(
810  chunk.getBuffer(),
811  string_dictionary,
812  is_for_import ? nullptr : logical_chunk_metadata.get());
813  case 4:
814  return std::make_shared<ParquetStringEncoder<int32_t>>(
815  chunk.getBuffer(),
816  string_dictionary,
817  is_for_import ? nullptr : logical_chunk_metadata.get());
818  default:
819  UNREACHABLE();
820  }
821  } else { // detect use-case
822  return std::make_shared<ParquetDetectStringEncoder>(chunk.getBuffer());
823  }
824  } else {
825  UNREACHABLE();
826  }
827  return {};
828 }
829 
830 std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
831  const ColumnDescriptor* omnisci_column,
832  const parquet::ColumnDescriptor* parquet_column,
833  std::list<Chunk_NS::Chunk>& chunks,
834  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
835  const RenderGroupAnalyzerMap* render_group_analyzer_map,
836  const bool is_metadata_scan,
837  const bool is_for_import) {
838  auto column_type = omnisci_column->columnType;
839  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
840  return {};
841  }
842  if (is_for_import) {
843  return std::make_shared<ParquetGeospatialImportEncoder>(chunks); // no RGAMap
844  }
845  if (is_metadata_scan) {
846  return std::make_shared<ParquetGeospatialEncoder>(render_group_analyzer_map);
847  }
848  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
849  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
850  auto& chunk_metadata_ptr = chunk_metadata.back();
851  chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
852  }
853  return std::make_shared<ParquetGeospatialEncoder>(
854  parquet_column, chunks, chunk_metadata, render_group_analyzer_map);
855 }
856 
857 // forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
858 // `create_parquet_array_encoder` each make use of each other, so
859 // one of the two functions must have a forward declaration
860 std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
861  const ColumnDescriptor* omnisci_column,
862  const parquet::ColumnDescriptor* parquet_column,
863  std::list<Chunk_NS::Chunk>& chunks,
864  StringDictionary* string_dictionary,
865  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
866  const bool is_metadata_scan,
867  const bool is_for_import,
868  const bool is_for_detect);
869 
902 std::shared_ptr<ParquetEncoder> create_parquet_encoder(
903  const ColumnDescriptor* omnisci_column,
904  const parquet::ColumnDescriptor* parquet_column,
905  std::list<Chunk_NS::Chunk>& chunks,
906  StringDictionary* string_dictionary,
907  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
908  const RenderGroupAnalyzerMap* render_group_analyzer_map,
909  const bool is_metadata_scan = false,
910  const bool is_for_import = false,
911  const bool is_for_detect = false) {
912  CHECK(!(is_metadata_scan && is_for_import));
913  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
914  if (auto encoder = create_parquet_geospatial_encoder(omnisci_column,
915  parquet_column,
916  chunks,
917  chunk_metadata,
918  render_group_analyzer_map,
919  is_metadata_scan,
920  is_for_import)) {
921  return encoder;
922  }
923  if (auto encoder = create_parquet_array_encoder(omnisci_column,
924  parquet_column,
925  chunks,
926  string_dictionary,
927  chunk_metadata,
928  is_metadata_scan,
929  is_for_import,
930  is_for_detect)) {
931  return encoder;
932  }
933  if (auto encoder = create_parquet_decimal_encoder(
934  omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
935  return encoder;
936  }
937  if (auto encoder = create_parquet_integral_encoder(
938  omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
939  return encoder;
940  }
941  if (auto encoder =
942  create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
943  return encoder;
944  }
945  if (auto encoder = create_parquet_timestamp_encoder(
946  omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
947  return encoder;
948  }
949  if (auto encoder =
950  create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
951  return encoder;
952  }
953  if (auto encoder = create_parquet_time_encoder(
954  omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
955  return encoder;
956  }
958  omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
959  return encoder;
960  }
961  if (auto encoder = create_parquet_date_encoder(
962  omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
963  return encoder;
964  }
965  if (auto encoder = create_parquet_string_encoder(
966  omnisci_column,
967  parquet_column,
968  chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
969  string_dictionary,
970  chunk_metadata,
971  is_for_import,
972  is_for_detect)) {
973  return encoder;
974  }
975  UNREACHABLE();
976  return {};
977 }
978 
982 std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_import(
983  std::list<Chunk_NS::Chunk>& chunks,
984  const ColumnDescriptor* omnisci_column,
985  const parquet::ColumnDescriptor* parquet_column,
986  StringDictionary* string_dictionary,
987  const RenderGroupAnalyzerMap* render_group_analyzer_map) {
988  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
989  return create_parquet_encoder(omnisci_column,
990  parquet_column,
991  chunks,
992  string_dictionary,
993  chunk_metadata,
994  render_group_analyzer_map,
995  false,
996  true);
997 }
998 
1003 std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_metadata_scan(
1004  const ColumnDescriptor* omnisci_column,
1005  const parquet::ColumnDescriptor* parquet_column,
1006  const RenderGroupAnalyzerMap* render_group_analyzer_map) {
1007  std::list<Chunk_NS::Chunk> chunks;
1008  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1009  return create_parquet_encoder(omnisci_column,
1010  parquet_column,
1011  chunks,
1012  nullptr,
1013  chunk_metadata,
1014  render_group_analyzer_map,
1015  true);
1016 }
1017 
1018 std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
1019  const ColumnDescriptor* omnisci_column,
1020  const parquet::ColumnDescriptor* parquet_column,
1021  std::list<Chunk_NS::Chunk>& chunks,
1022  StringDictionary* string_dictionary,
1023  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
1024  const bool is_metadata_scan,
1025  const bool is_for_import,
1026  const bool is_for_detect) {
1027  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
1028  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
1029  return {};
1030  }
1031  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
1032  get_sub_type_column_descriptor(omnisci_column);
1033  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
1034  parquet_column,
1035  chunks,
1036  string_dictionary,
1037  chunk_metadata,
1038  nullptr,
1039  is_metadata_scan,
1040  is_for_import,
1041  is_for_detect);
1042  CHECK(encoder.get());
1043  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
1044  CHECK(scalar_encoder);
1045  if (!is_for_import) {
1046  if (!is_for_detect) {
1047  if (omnisci_column->columnType.is_fixlen_array()) {
1048  encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
1049  is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
1050  scalar_encoder,
1051  omnisci_column);
1052  } else {
1053  encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
1054  is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
1055  is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
1056  scalar_encoder,
1057  omnisci_column);
1058  }
1059  } else { // is_for_detect
1060  encoder = std::make_shared<ParquetArrayDetectEncoder>(
1061  chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
1062  }
1063  } else { // is_for_import
1064  encoder = std::make_shared<ParquetArrayImportEncoder>(
1065  chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
1066  }
1067  return encoder;
1068 }
1069 
1071  const parquet::ParquetFileReader* reader,
1072  const int row_group_index,
1073  const int column_index,
1074  const int16_t* def_levels,
1075  const int64_t num_levels,
1076  const parquet::ColumnDescriptor* parquet_column_descriptor) {
1077  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
1078  if (!is_valid_parquet_list) {
1079  return;
1080  }
1081  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1082  reader->metadata()->RowGroup(row_group_index);
1083  auto column_metadata = group_metadata->ColumnChunk(column_index);
1084  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
1085  if (!stats->HasMinMax()) {
1086  auto find_it = std::find_if(def_levels,
1087  def_levels + num_levels,
1088  [](const int16_t def_level) { return def_level == 3; });
1089  if (find_it != def_levels + num_levels) {
1090  throw std::runtime_error(
1091  "No minimum and maximum statistic set in list column but non-null & non-empty "
1092  "array/value detected.");
1093  }
1094  }
1095 }
1096 
1098  const ColumnDescriptor* omnisci_column_descriptor,
1099  const parquet::ColumnDescriptor* parquet_column_descriptor) {
1100  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
1101  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
1102  throw std::runtime_error(
1103  "Unsupported mapping detected. Column '" + parquet_column_descriptor->name() +
1104  "' detected to be a parquet list but HeavyDB mapped column '" +
1105  omnisci_column_descriptor->columnName + "' is not an array.");
1106  }
1107  if (is_valid_parquet_list) {
1108  if (parquet_column_descriptor->max_repetition_level() != 1 ||
1109  parquet_column_descriptor->max_definition_level() != 3) {
1110  throw std::runtime_error(
1111  "Incorrect schema max repetition level detected in column '" +
1112  parquet_column_descriptor->name() +
1113  "'. Expected a max repetition level of 1 and max definition level of 3 for "
1114  "list column but column has a max "
1115  "repetition level of " +
1116  std::to_string(parquet_column_descriptor->max_repetition_level()) +
1117  " and a max definition level of " +
1118  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
1119  }
1120  } else {
1121  if (parquet_column_descriptor->max_repetition_level() != 0 ||
1122  parquet_column_descriptor->max_definition_level() != 1) {
1123  throw std::runtime_error(
1124  "Incorrect schema max repetition level detected in column '" +
1125  parquet_column_descriptor->name() +
1126  "'. Expected a max repetition level of 0 and max definition level of 1 for "
1127  "flat column but column has a max "
1128  "repetition level of " +
1129  std::to_string(parquet_column_descriptor->max_repetition_level()) +
1130  " and a max definition level of " +
1131  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
1132  }
1133  }
1134 }
1135 
1136 void resize_values_buffer(const ColumnDescriptor* omnisci_column,
1137  const parquet::ColumnDescriptor* parquet_column,
1138  std::vector<int8_t>& values) {
1139  auto max_type_byte_size =
1140  std::max(omnisci_column->columnType.get_size(),
1141  parquet::GetTypeByteSize(parquet_column->physical_type()));
1142  size_t values_size =
1144  values.resize(values_size);
1145 }
1146 
1147 bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
1148  const parquet::ColumnDescriptor* parquet_column) {
1149  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
1150  parquet_column->logical_type().get())) {
1151  return omnisci_column->columnType.get_precision() ==
1152  decimal_logical_column->precision() &&
1153  omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
1154  omnisci_column->columnType.is_decimal() &&
1155  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1156  omnisci_column->columnType.get_compression() == kENCODING_FIXED);
1157  }
1158  return false;
1159 }
1160 
1161 SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor* parquet_column) {
1162  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
1163  parquet_column->logical_type().get())) {
1164  auto parquet_precision = decimal_logical_column->precision();
1165  auto parquet_scale = decimal_logical_column->scale();
1166  if (parquet_precision > sql_constants::kMaxNumericPrecision) {
1168  "Parquet column \"" + parquet_column->ToString() +
1169  "\" has decimal precision of " + std::to_string(parquet_precision) +
1170  " which is too high to import, maximum precision supported is " +
1172  }
1173  SQLTypeInfo type;
1174  type.set_type(kDECIMAL);
1176  type.set_precision(parquet_precision);
1177  type.set_scale(parquet_scale);
1178  type.set_fixed_size();
1179  return type;
1180  }
1181  UNREACHABLE()
1182  << " a Parquet column's decimal logical type failed to be read appropriately";
1183  return {};
1184 }
1185 
1187  const parquet::ColumnDescriptor* parquet_column) {
1188  if (!omnisci_column->columnType.is_fp()) {
1189  return false;
1190  }
1191  // check if mapping is a valid coerced or non-coerced floating point mapping
1192  // with no annotation (floating point columns have no annotation in the
1193  // Parquet specification)
1194  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
1195  return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
1196  (parquet_column->physical_type() == parquet::Type::FLOAT &&
1197  omnisci_column->columnType.get_type() == kFLOAT);
1198  }
1199  return false;
1200 }
1201 
1203  const parquet::ColumnDescriptor* parquet_column) {
1204  SQLTypeInfo type;
1205  if (parquet_column->physical_type() == parquet::Type::FLOAT) {
1206  type.set_type(kFLOAT);
1207  } else if (parquet_column->physical_type() == parquet::Type::DOUBLE) {
1208  type.set_type(kDOUBLE);
1209  } else {
1210  UNREACHABLE();
1211  }
1213  type.set_fixed_size();
1214  return type;
1215 }
1216 
1217 bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
1218  const parquet::ColumnDescriptor* parquet_column) {
1219  if (!omnisci_column->columnType.is_integer()) {
1220  return false;
1221  }
1222  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1223  parquet_column->logical_type().get())) {
1224  CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1225  omnisci_column->columnType.get_compression() == kENCODING_FIXED);
1226  const int bits_per_byte = 8;
1227  // unsigned types are permitted to map to a wider integral type in order to avoid
1228  // precision loss
1229  const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
1230  return omnisci_column->columnType.get_size() * bits_per_byte <=
1231  int_logical_column->bit_width() * bit_widening_factor;
1232  }
1233  // check if mapping is a valid coerced or non-coerced integral mapping with no
1234  // annotation
1235  if ((omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1236  omnisci_column->columnType.get_compression() == kENCODING_FIXED)) {
1237  return (parquet_column->physical_type() == parquet::Type::INT64) ||
1238  (parquet_column->physical_type() == parquet::Type::INT32 &&
1239  omnisci_column->columnType.get_size() <= 4);
1240  }
1241  return false;
1242 }
1243 
1244 SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor* parquet_column) {
1245  SQLTypeInfo type;
1247  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1248  parquet_column->logical_type().get())) {
1249  auto bit_width = int_logical_column->bit_width();
1250  if (!int_logical_column->is_signed()) {
1251  if (within_range(33, 64, bit_width)) {
1253  "Unsigned integer column \"" + parquet_column->name() +
1254  "\" in Parquet file with 64 bit-width has no supported type for ingestion "
1255  "that will not result in data loss");
1256  } else if (within_range(17, 32, bit_width)) {
1257  type.set_type(kBIGINT);
1258  } else if (within_range(9, 16, bit_width)) {
1259  type.set_type(kINT);
1260  } else if (within_range(0, 8, bit_width)) {
1261  type.set_type(kSMALLINT);
1262  }
1263  } else {
1264  if (within_range(33, 64, bit_width)) {
1265  type.set_type(kBIGINT);
1266  } else if (within_range(17, 32, bit_width)) {
1267  type.set_type(kINT);
1268  } else if (within_range(9, 16, bit_width)) {
1269  type.set_type(kSMALLINT);
1270  } else if (within_range(0, 8, bit_width)) {
1271  type.set_type(kTINYINT);
1272  }
1273  }
1274  type.set_fixed_size();
1275  return type;
1276  }
1277 
1278  CHECK(parquet_column->logical_type()->is_none());
1279  if (parquet_column->physical_type() == parquet::Type::INT32) {
1280  type.set_type(kINT);
1281  } else {
1282  CHECK(parquet_column->physical_type() == parquet::Type::INT64);
1283  type.set_type(kBIGINT);
1284  }
1285  type.set_fixed_size();
1286  return type;
1287 }
1288 
1289 bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
1290  return omnisci_column->columnType.get_dimension() == 9;
1291 }
1292 
1294  const parquet::TimestampLogicalType* timestamp_logical_column) {
1295  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
1296 }
1297 
1298 bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
1299  return omnisci_column->columnType.get_dimension() == 6;
1300 }
1301 
1303  const parquet::TimestampLogicalType* timestamp_logical_column) {
1304  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
1305 }
1306 
1307 bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
1308  return omnisci_column->columnType.get_dimension() == 3;
1309 }
1310 
1312  const parquet::TimestampLogicalType* timestamp_logical_column) {
1313  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
1314 }
1315 
1317  const parquet::ColumnDescriptor* parquet_column) {
1318  bool is_none_encoded_mapping =
1319  omnisci_column->columnType.get_compression() == kENCODING_NONE &&
1320  (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
1321  omnisci_column->columnType.get_type() == kBOOLEAN);
1322  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
1323 }
1324 
1326  const parquet::ColumnDescriptor* parquet_column) {
1327  SQLTypeInfo type;
1329  type.set_type(kBOOLEAN);
1330  type.set_fixed_size();
1331  return type;
1332 }
1333 
1335  const parquet::ColumnDescriptor* parquet_column) {
1336  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
1337  ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
1338  (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1339  omnisci_column->columnType.get_comp_param() == 32)))) {
1340  return false;
1341  }
1342  // check the annotated case
1343  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1344  parquet_column->logical_type().get())) {
1345  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
1346  return omnisci_column->columnType.get_dimension() == 0 ||
1347  ((is_nanosecond_precision(omnisci_column) &&
1348  is_nanosecond_precision(timestamp_logical_column)) ||
1349  (is_microsecond_precision(omnisci_column) &&
1350  is_microsecond_precision(timestamp_logical_column)) ||
1351  (is_millisecond_precision(omnisci_column) &&
1352  is_millisecond_precision(timestamp_logical_column)));
1353  }
1354  if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
1355  return omnisci_column->columnType.get_dimension() == 0;
1356  }
1357  }
1358  // check the unannotated case
1359  if (parquet_column->logical_type()->is_none() &&
1360  ((parquet_column->physical_type() == parquet::Type::INT32 &&
1361  omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1362  omnisci_column->columnType.get_comp_param() == 32) ||
1363  parquet_column->physical_type() == parquet::Type::INT64)) {
1364  return true;
1365  }
1366  return false;
1367 }
1368 
1369 SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor* parquet_column) {
1370  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1371  parquet_column->logical_type().get())) {
1372  SQLTypeInfo type;
1373  type.set_type(kTIMESTAMP);
1375  if (is_nanosecond_precision(timestamp_logical_column)) {
1376  type.set_precision(9);
1377  } else if (is_microsecond_precision(timestamp_logical_column)) {
1378  type.set_precision(6);
1379  } else if (is_millisecond_precision(timestamp_logical_column)) {
1380  type.set_precision(3);
1381  }
1382  type.set_fixed_size();
1383  return type;
1384  }
1385  UNREACHABLE();
1386  return {};
1387 }
1388 
1389 bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
1390  const parquet::ColumnDescriptor* parquet_column) {
1391  if (!(omnisci_column->columnType.get_type() == kTIME &&
1392  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1393  (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1394  omnisci_column->columnType.get_comp_param() == 32)))) {
1395  return false;
1396  }
1397  if (parquet_column->logical_type()->is_time()) {
1398  return true;
1399  }
1400  return false;
1401 }
1402 
1403 SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor* parquet_column) {
1404  CHECK(parquet_column->logical_type()->is_time());
1405  SQLTypeInfo type;
1406  type.set_type(kTIME);
1407  type.set_compression(kENCODING_NONE);
1408  type.set_fixed_size();
1409  return type;
1410 }
1411 
1412 bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
1413  const parquet::ColumnDescriptor* parquet_column) {
1414  if (!(omnisci_column->columnType.get_type() == kDATE &&
1415  ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
1416  (omnisci_column->columnType.get_comp_param() ==
1417  0 // DATE ENCODING DAYS (32) specifies comp_param of 0
1418  || omnisci_column->columnType.get_comp_param() == 16)) ||
1419  omnisci_column->columnType.get_compression() ==
1420  kENCODING_NONE // for array types
1421  ))) {
1422  return false;
1423  }
1424  return parquet_column->logical_type()->is_date() ||
1425  parquet_column->logical_type()
1426  ->is_timestamp(); // to support TIMESTAMP -> DATE coercion
1427 }
1428 
1429 SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor* parquet_column) {
1430  CHECK(parquet_column->logical_type()->is_date());
1431  SQLTypeInfo type;
1432  type.set_type(kDATE);
1433  type.set_compression(kENCODING_NONE);
1434  type.set_fixed_size();
1435  return type;
1436 }
1437 
1438 bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
1439  const parquet::ColumnDescriptor* parquet_column) {
1440  return is_valid_parquet_string(parquet_column) &&
1441  omnisci_column->columnType.is_string() &&
1442  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1443  omnisci_column->columnType.get_compression() == kENCODING_DICT);
1444 }
1445 
1446 SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor* parquet_column) {
1447  CHECK(is_valid_parquet_string(parquet_column));
1448  SQLTypeInfo type;
1449  type.set_type(kTEXT);
1451  type.set_comp_param(32);
1452  type.set_fixed_size();
1453  return type;
1454 }
1455 
1456 bool validate_array_mapping(const ColumnDescriptor* omnisci_column,
1457  const parquet::ColumnDescriptor* parquet_column) {
1458  if (is_valid_parquet_list_column(parquet_column) &&
1459  omnisci_column->columnType.is_array()) {
1460  auto omnisci_column_sub_type_column = get_sub_type_column_descriptor(omnisci_column);
1462  omnisci_column_sub_type_column.get(), parquet_column);
1463  }
1464  return false;
1465 }
1466 
1468  const parquet::ColumnDescriptor* parquet_column) {
1469  return is_valid_parquet_string(parquet_column) &&
1470  omnisci_column->columnType.is_geometry();
1471 }
1472 
1473 void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
1474  const parquet::arrow::FileReader* new_file_reader,
1475  const std::string& reference_file_path,
1476  const std::string& new_file_path) {
1477  const auto reference_num_columns =
1478  reference_file_reader->parquet_reader()->metadata()->num_columns();
1479  const auto new_num_columns =
1480  new_file_reader->parquet_reader()->metadata()->num_columns();
1481  if (reference_num_columns != new_num_columns) {
1482  throw std::runtime_error{"Parquet file \"" + new_file_path +
1483  "\" has a different schema. Please ensure that all Parquet "
1484  "files use the same schema. Reference Parquet file: \"" +
1485  reference_file_path + "\" has " +
1486  std::to_string(reference_num_columns) +
1487  " columns. New Parquet file \"" + new_file_path + "\" has " +
1488  std::to_string(new_num_columns) + " columns."};
1489  }
1490 
1491  for (int i = 0; i < reference_num_columns; i++) {
1492  validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
1493  get_column_descriptor(new_file_reader, i),
1494  reference_file_path,
1495  new_file_path);
1496  }
1497 }
1498 
1499 void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
1500  const ColumnDescriptor* omnisci_column) {
1501  parquet::Type::type physical_type = parquet_column->physical_type();
1502  auto logical_type = parquet_column->logical_type();
1503  bool allowed_type =
1504  LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
1505  if (!allowed_type) {
1506  if (logical_type->is_timestamp()) {
1507  auto timestamp_type =
1508  dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
1509  CHECK(timestamp_type);
1510 
1511  if (!timestamp_type->is_adjusted_to_utc()) {
1512  LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
1513  << omnisci_column->columnName
1514  << "\". Only UTC timezone is currently supported.";
1515  }
1516  }
1517  std::string parquet_type;
1518  if (parquet_column->logical_type()->is_none()) {
1519  parquet_type = parquet::TypeToString(physical_type);
1520  } else {
1521  parquet_type = logical_type->ToString();
1522  }
1523  std::string omnisci_type = omnisci_column->columnType.get_type_name();
1524  throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
1525  "\" to HeavyDB type \"" + omnisci_type +
1526  "\" is not allowed. Please use an appropriate column type."};
1527  }
1528 }
1529 
1530 SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor* parquet_column) {
1531  // decimal case
1532  if (parquet_column->logical_type()->is_decimal()) {
1533  return suggest_decimal_mapping(parquet_column);
1534  }
1535  // float case
1536  if (parquet_column->logical_type()->is_none() &&
1537  (parquet_column->physical_type() == parquet::Type::FLOAT ||
1538  parquet_column->physical_type() == parquet::Type::DOUBLE)) {
1539  return suggest_floating_point_mapping(parquet_column);
1540  }
1541  // integral case
1542  if ((parquet_column->logical_type()->is_none() &&
1543  (parquet_column->physical_type() == parquet::Type::INT32 ||
1544  parquet_column->physical_type() == parquet::Type::INT64)) ||
1545  parquet_column->logical_type()->is_int()) {
1546  return suggest_integral_mapping(parquet_column);
1547  }
1548  // boolean case
1549  if (parquet_column->logical_type()->is_none() &&
1550  parquet_column->physical_type() == parquet::Type::BOOLEAN) {
1551  return suggest_boolean_type_mapping(parquet_column);
1552  }
1553  // timestamp case
1554  if (parquet_column->logical_type()->is_timestamp()) {
1555  return suggest_timestamp_mapping(parquet_column);
1556  }
1557  // time case
1558  if (parquet_column->logical_type()->is_time()) {
1559  return suggest_time_mapping(parquet_column);
1560  }
1561  // date case
1562  if (parquet_column->logical_type()->is_date()) {
1563  return suggest_date_mapping(parquet_column);
1564  }
1565  // string case
1566  if (is_valid_parquet_string(parquet_column)) {
1567  return suggest_string_mapping(parquet_column);
1568  }
1569 
1570  throw ForeignStorageException("Unsupported data type detected for column: " +
1571  parquet_column->ToString());
1572 }
1573 
1575  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1576  const std::string& file_path,
1577  const ForeignTableSchema& schema) {
1578  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
1580  schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
1581  }
1582 }
1583 
1584 void throw_missing_metadata_error(const int row_group_index,
1585  const int column_index,
1586  const std::string& file_path) {
1587  throw std::runtime_error{
1588  "Statistics metadata is required for all row groups. Metadata is missing for "
1589  "row group index: " +
1590  std::to_string(row_group_index) +
1591  ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
1592 }
1593 
1597  std::string file_path;
1598 };
1599 
1601  const MaxRowGroupSizeStats max_row_group_stats,
1602  const int fragment_size) {
1603  auto metadata_scan_exception = MetadataScanInfeasibleFragmentSizeException{
1604  "Parquet file has a row group size that is larger than the fragment size. "
1605  "Please set the table fragment size to a number that is larger than the "
1606  "row group size. Row group index: " +
1607  std::to_string(max_row_group_stats.max_row_group_index) +
1608  ", row group size: " + std::to_string(max_row_group_stats.max_row_group_size) +
1609  ", fragment size: " + std::to_string(fragment_size) +
1610  ", file path: " + max_row_group_stats.file_path};
1611  metadata_scan_exception.min_feasible_fragment_size_ =
1612  max_row_group_stats.max_row_group_size;
1613  throw metadata_scan_exception;
1614 }
1615 
1617  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1618  const std::string& file_path,
1619  const ForeignTableSchema& schema) {
1620  auto column_it = schema.getLogicalColumns().begin();
1621  MaxRowGroupSizeStats max_row_group_stats{0, 0};
1622  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1623  const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
1624  try {
1625  validate_allowed_mapping(descr, *column_it);
1626  } catch (std::runtime_error& e) {
1627  std::stringstream error_message;
1628  error_message << e.what() << " Parquet column: " << descr->name()
1629  << ", HeavyDB column: " << (*column_it)->columnName
1630  << ", Parquet file: " << file_path << ".";
1631  throw std::runtime_error(error_message.str());
1632  }
1633 
1634  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
1635  auto group_metadata = file_metadata->RowGroup(r);
1636  auto num_rows = group_metadata->num_rows();
1637  if (num_rows == 0) {
1638  continue;
1639  } else if (num_rows > max_row_group_stats.max_row_group_size) {
1640  max_row_group_stats.max_row_group_size = num_rows;
1641  max_row_group_stats.max_row_group_index = r;
1642  max_row_group_stats.file_path = file_path;
1643  }
1644 
1645  auto column_chunk = group_metadata->ColumnChunk(i);
1646  bool contains_metadata = column_chunk->is_stats_set();
1647  if (contains_metadata) {
1648  auto stats = column_chunk->statistics();
1649  bool is_all_nulls = stats->null_count() == column_chunk->num_values();
1650  bool is_list = is_valid_parquet_list_column(file_metadata->schema()->Column(i));
1651  // Given a list, it is possible it has no min or max if it is comprised
1652  // only of empty lists & nulls. This can not be detected by comparing
1653  // the null count; therefore we afford list types the benefit of the
1654  // doubt in this situation.
1655  if (!(stats->HasMinMax() || is_all_nulls || is_list)) {
1656  contains_metadata = false;
1657  }
1658  }
1659 
1660  if (!contains_metadata) {
1661  throw_missing_metadata_error(r, i, file_path);
1662  }
1663  }
1664  }
1665  return max_row_group_stats;
1666 }
1667 
1669  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1670  const std::string& file_path,
1671  const ForeignTableSchema& schema) {
1672  validate_number_of_columns(file_metadata, file_path, schema);
1673  return validate_column_mapping_and_row_group_metadata(file_metadata, file_path, schema);
1674 }
1675 
1676 std::list<RowGroupMetadata> metadata_scan_rowgroup_interval(
1677  const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1678  const RowGroupInterval& row_group_interval,
1679  const ReaderPtr& reader,
1680  const ForeignTableSchema& schema) {
1681  std::list<RowGroupMetadata> row_group_metadata;
1682  auto column_interval =
1683  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1684  schema.getLogicalAndPhysicalColumns().back()->columnId};
1685 
1686  auto file_metadata = reader->parquet_reader()->metadata();
1687  for (int row_group = row_group_interval.start_index;
1688  row_group <= row_group_interval.end_index;
1689  ++row_group) {
1690  auto& row_group_metadata_item = row_group_metadata.emplace_back();
1691  row_group_metadata_item.row_group_index = row_group;
1692  row_group_metadata_item.file_path = row_group_interval.file_path;
1693 
1694  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1695  file_metadata->RowGroup(row_group);
1696 
1697  for (int column_id = column_interval.start; column_id <= column_interval.end;
1698  column_id++) {
1699  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1700  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1701  auto encoder_map_iter =
1702  encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
1703  CHECK(encoder_map_iter != encoder_map.end());
1704  try {
1705  auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1706  group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1707  row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1708  } catch (const std::exception& e) {
1709  std::stringstream error_message;
1710  error_message << e.what() << " in row group " << row_group << " of Parquet file '"
1711  << row_group_interval.file_path << "'.";
1712  throw std::runtime_error(error_message.str());
1713  }
1714  }
1715  }
1716  return row_group_metadata;
1717 }
1718 
1719 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_import(
1720  const std::map<int, Chunk_NS::Chunk> chunks,
1721  const ForeignTableSchema& schema,
1722  const ReaderPtr& reader,
1723  const std::map<int, StringDictionary*> column_dictionaries,
1724  const int64_t num_rows,
1725  const RenderGroupAnalyzerMap* render_group_analyzer_map) {
1726  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1727  auto file_metadata = reader->parquet_reader()->metadata();
1728  for (auto& [column_id, chunk] : chunks) {
1729  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1730  if (column_descriptor->isGeoPhyCol) { // skip physical columns
1731  continue;
1732  }
1733  auto parquet_column_descriptor =
1734  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1735  auto find_it = column_dictionaries.find(column_id);
1736  StringDictionary* dictionary =
1737  (find_it == column_dictionaries.end() ? nullptr : find_it->second);
1738  std::list<Chunk_NS::Chunk> chunks_for_import;
1739  chunks_for_import.push_back(chunk);
1740  if (column_descriptor->columnType.is_geometry()) {
1741  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1742  chunks_for_import.push_back(chunks.at(column_id + i + 1));
1743  }
1744  }
1745  encoder_map[column_id] = create_parquet_encoder_for_import(chunks_for_import,
1746  column_descriptor,
1747  parquet_column_descriptor,
1748  dictionary,
1749  render_group_analyzer_map);
1750 
1751  // reserve space in buffer when num-elements known ahead of time for types
1752  // of known size (for example dictionary encoded strings)
1753  auto encoder = shared::get_from_map(encoder_map, column_id);
1754  if (auto inplace_encoder = dynamic_cast<ParquetInPlaceEncoder*>(encoder.get())) {
1755  inplace_encoder->reserve(num_rows);
1756  }
1757  }
1758  return encoder_map;
1759 }
1760 
1761 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_metadata_scan(
1762  const Interval<ColumnType>& column_interval,
1763  const ForeignTableSchema& schema,
1764  const ReaderPtr& reader,
1765  const RenderGroupAnalyzerMap* render_group_analyzer_map,
1766  const bool do_metadata_stats_validation) {
1767  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1768  auto file_metadata = reader->parquet_reader()->metadata();
1769  for (int column_id = column_interval.start; column_id <= column_interval.end;
1770  column_id++) {
1771  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1772  auto parquet_column_descriptor =
1773  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1774  encoder_map[column_id] = create_parquet_encoder_for_metadata_scan(
1775  column_descriptor, parquet_column_descriptor, render_group_analyzer_map);
1776  if (!do_metadata_stats_validation) {
1777  shared::get_from_map(encoder_map, column_id)->disableMetadataStatsValidation();
1778  }
1779  column_id += column_descriptor->columnType.get_physical_cols();
1780  }
1781  return encoder_map;
1782 }
1783 } // namespace
1784 
1785 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroups(
1786  const std::vector<RowGroupInterval>& row_group_intervals,
1787  const int parquet_column_index,
1788  const ColumnDescriptor* column_descriptor,
1789  std::list<Chunk_NS::Chunk>& chunks,
1790  StringDictionary* string_dictionary,
1791  RejectedRowIndices* rejected_row_indices,
1792  const bool is_for_detect,
1793  const std::optional<int64_t> max_levels_read) {
1794  auto timer = DEBUG_TIMER(__func__);
1795  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1796  // `def_levels` and `rep_levels` below are used to store the read definition
1797  // and repetition levels of the Dremel encoding implemented by the Parquet
1798  // format
1799  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1800  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1801  std::vector<int8_t> values;
1802 
1803  CHECK(!row_group_intervals.empty());
1804  const auto& first_file_path = row_group_intervals.front().file_path;
1805 
1806  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1807  auto first_parquet_column_descriptor =
1808  get_column_descriptor(first_file_reader, parquet_column_index);
1809  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1810  auto encoder = create_parquet_encoder(column_descriptor,
1811  first_parquet_column_descriptor,
1812  chunks,
1813  string_dictionary,
1814  chunk_metadata,
1816  false,
1817  false,
1818  is_for_detect);
1819  CHECK(encoder.get());
1820 
1821  if (rejected_row_indices) { // error tracking is enabled
1822  encoder->initializeErrorTracking(column_descriptor->columnType);
1823  }
1824 
1825  bool early_exit = false;
1826  int64_t total_levels_read = 0;
1827  for (const auto& row_group_interval : row_group_intervals) {
1828  const auto& file_path = row_group_interval.file_path;
1829  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1830 
1831  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1832  CHECK(row_group_interval.start_index >= 0 &&
1833  row_group_interval.end_index < num_row_groups);
1834  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1835 
1836  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1837  auto parquet_column_descriptor =
1838  get_column_descriptor(file_reader, parquet_column_index);
1839  validate_equal_column_descriptor(first_parquet_column_descriptor,
1840  parquet_column_descriptor,
1841  first_file_path,
1842  file_path);
1843 
1845  parquet_column_descriptor);
1846  int64_t values_read = 0;
1847  for (int row_group_index = row_group_interval.start_index;
1848  row_group_index <= row_group_interval.end_index;
1849  ++row_group_index) {
1850  auto group_reader = parquet_reader->RowGroup(row_group_index);
1851  std::shared_ptr<parquet::ColumnReader> col_reader =
1852  group_reader->Column(parquet_column_index);
1853 
1854  try {
1855  while (col_reader->HasNext()) {
1856  int64_t levels_read =
1858  def_levels.data(),
1859  rep_levels.data(),
1860  reinterpret_cast<uint8_t*>(values.data()),
1861  &values_read,
1862  col_reader.get());
1863 
1864  validate_definition_levels(parquet_reader,
1865  row_group_index,
1866  parquet_column_index,
1867  def_levels.data(),
1868  levels_read,
1869  parquet_column_descriptor);
1870 
1871  if (rejected_row_indices) { // error tracking is enabled
1872  encoder->appendDataTrackErrors(def_levels.data(),
1873  rep_levels.data(),
1874  values_read,
1875  levels_read,
1876  values.data());
1877  } else { // no error tracking enabled
1878  encoder->appendData(def_levels.data(),
1879  rep_levels.data(),
1880  values_read,
1881  levels_read,
1882  values.data());
1883  }
1884 
1885  if (max_levels_read.has_value()) {
1886  total_levels_read += levels_read;
1887  if (total_levels_read >= max_levels_read.value()) {
1888  early_exit = true;
1889  break;
1890  }
1891  }
1892  }
1893  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1894  array_encoder->finalizeRowGroup();
1895  }
1896  } catch (const std::exception& error) {
1897  // check for a specific error to detect a possible unexpected switch of data
1898  // source in order to respond with informative error message
1899  if (boost::regex_search(error.what(),
1900  boost::regex{"Deserializing page header failed."})) {
1902  "Unable to read from foreign data source, possible cause is an unexpected "
1903  "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
1904  "the "
1905  "foreign table "
1906  "if data source has been updated. Foreign table: " +
1908  }
1909 
1911  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1912  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1913  "', Parquet file: '" + file_path + "'");
1914  }
1915  if (max_levels_read.has_value() && early_exit) {
1916  break;
1917  }
1918  }
1919  if (max_levels_read.has_value() && early_exit) {
1920  break;
1921  }
1922  }
1923 
1924  if (rejected_row_indices) { // error tracking is enabled
1925  *rejected_row_indices = encoder->getRejectedRowIndices();
1926  }
1927  return chunk_metadata;
1928 }
1929 
1931  const parquet::ColumnDescriptor* parquet_column) {
1932  auto type = suggest_column_scalar_type(parquet_column);
1933 
1934  // array case
1935  if (is_valid_parquet_list_column(parquet_column)) {
1936  return type.get_array_type();
1937  }
1938 
1939  return type;
1940 }
1941 
1943  const ColumnDescriptor* omnisci_column,
1944  const parquet::ColumnDescriptor* parquet_column) {
1945  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1946  return true;
1947  }
1948  if (validate_array_mapping(omnisci_column, parquet_column)) {
1949  return true;
1950  }
1951  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1952  return true;
1953  }
1954  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1955  return true;
1956  }
1957  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1958  return true;
1959  }
1960  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1961  return true;
1962  }
1963  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1964  return true;
1965  }
1966  if (validate_time_mapping(omnisci_column, parquet_column)) {
1967  return true;
1968  }
1969  if (validate_date_mapping(omnisci_column, parquet_column)) {
1970  return true;
1971  }
1972  if (validate_string_mapping(omnisci_column, parquet_column)) {
1973  return true;
1974  }
1975  return false;
1976 }
1977 
1979  std::shared_ptr<arrow::fs::FileSystem> file_system,
1980  FileReaderMap* file_map,
1981  const RenderGroupAnalyzerMap* render_group_analyzer_map,
1982  const std::string& foreign_table_name)
1983  : file_system_(file_system)
1984  , file_reader_cache_(file_map)
1985  , render_group_analyzer_map_{render_group_analyzer_map}
1986  , foreign_table_name_(foreign_table_name) {}
1987 
1988 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
1989  const std::vector<RowGroupInterval>& row_group_intervals,
1990  const int parquet_column_index,
1991  std::list<Chunk_NS::Chunk>& chunks,
1992  StringDictionary* string_dictionary,
1993  RejectedRowIndices* rejected_row_indices) {
1994  CHECK(!chunks.empty());
1995  auto const& chunk = *chunks.begin();
1996  auto column_descriptor = chunk.getColumnDesc();
1997  auto buffer = chunk.getBuffer();
1998  CHECK(buffer);
1999 
2000  try {
2001  auto metadata = appendRowGroups(row_group_intervals,
2002  parquet_column_index,
2003  column_descriptor,
2004  chunks,
2005  string_dictionary,
2006  rejected_row_indices);
2007  return metadata;
2008  } catch (const std::exception& error) {
2009  throw ForeignStorageException(error.what());
2010  }
2011 
2012  return {};
2013 }
2014 
2017  : def_levels(LazyParquetChunkLoader::batch_reader_num_elements)
2018  , rep_levels(LazyParquetChunkLoader::batch_reader_num_elements) {}
2019  std::vector<int16_t> def_levels;
2020  std::vector<int16_t> rep_levels;
2021  std::vector<int8_t> values;
2022  int64_t values_read;
2023  int64_t levels_read;
2024 };
2025 
2027  public:
2028  ParquetRowGroupReader(std::shared_ptr<parquet::ColumnReader> col_reader,
2029  const ColumnDescriptor* column_descriptor,
2030  const parquet::ColumnDescriptor* parquet_column_descriptor,
2031  ParquetEncoder* encoder,
2032  InvalidRowGroupIndices& invalid_indices,
2033  const int row_group_index,
2034  const int parquet_column_index,
2035  const parquet::ParquetFileReader* parquet_reader)
2036  : col_reader_(col_reader)
2037  , column_descriptor_(column_descriptor)
2038  , parquet_column_descriptor_(parquet_column_descriptor)
2039  , encoder_(encoder)
2040  , invalid_indices_(invalid_indices)
2041  , row_group_index_(row_group_index)
2042  , parquet_column_index_(parquet_column_index)
2043  , parquet_reader_(parquet_reader) {
2044  import_encoder = dynamic_cast<ParquetImportEncoder*>(encoder);
2046  }
2047 
2049  while (col_reader_->HasNext()) {
2050  ParquetBatchData batch_data;
2053  batch_data.levels_read =
2055  batch_data.def_levels.data(),
2056  batch_data.rep_levels.data(),
2057  reinterpret_cast<uint8_t*>(batch_data.values.data()),
2058  &batch_data.values_read,
2059  col_reader_.get());
2066  batch_data.def_levels.data(),
2067  batch_data.levels_read,
2069  import_encoder->validateAndAppendData(batch_data.def_levels.data(),
2070  batch_data.rep_levels.data(),
2071  batch_data.values_read,
2072  batch_data.levels_read,
2073  batch_data.values.data(),
2074  column_type,
2076  }
2077  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder_)) {
2078  array_encoder->finalizeRowGroup();
2079  }
2080  }
2081 
2082  void eraseInvalidRowGroupData(const InvalidRowGroupIndices& invalid_indices) {
2083  import_encoder->eraseInvalidIndicesInBuffer(invalid_indices);
2084  }
2085 
2086  private:
2087  std::shared_ptr<parquet::ColumnReader> col_reader_;
2089  const parquet::ColumnDescriptor* parquet_column_descriptor_;
2093  const int row_group_index_;
2095  const parquet::ParquetFileReader* parquet_reader_;
2096 };
2097 
2098 std::pair<size_t, size_t> LazyParquetChunkLoader::loadRowGroups(
2099  const RowGroupInterval& row_group_interval,
2100  const std::map<int, Chunk_NS::Chunk>& chunks,
2101  const ForeignTableSchema& schema,
2102  const std::map<int, StringDictionary*>& column_dictionaries,
2103  const int num_threads) {
2104  auto timer = DEBUG_TIMER(__func__);
2105 
2106  const auto& file_path = row_group_interval.file_path;
2107 
2108  // do not use caching with file-readers, open a new one for every request
2109  auto file_reader_owner = open_parquet_table(file_path, file_system_);
2110  auto file_reader = file_reader_owner.get();
2111  auto file_metadata = file_reader->parquet_reader()->metadata();
2112 
2113  validate_number_of_columns(file_metadata, file_path, schema);
2114 
2115  // check for fixed length encoded columns and indicate to the user
2116  // they should not be used
2117  for (const auto column_descriptor : schema.getLogicalColumns()) {
2118  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
2119  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2120  try {
2121  validate_allowed_mapping(parquet_column, column_descriptor);
2122  } catch (std::runtime_error& e) {
2123  std::stringstream error_message;
2124  error_message << e.what() << " Parquet column: " << parquet_column->name()
2125  << ", HeavyDB column: " << column_descriptor->columnName
2126  << ", Parquet file: " << file_path << ".";
2127  throw std::runtime_error(error_message.str());
2128  }
2129  }
2130 
2131  CHECK(row_group_interval.start_index == row_group_interval.end_index);
2132  auto row_group_index = row_group_interval.start_index;
2133  std::map<int, ParquetRowGroupReader> row_group_reader_map;
2134 
2135  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2136  auto group_reader = parquet_reader->RowGroup(row_group_index);
2137 
2138  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2139 
2140  auto encoder_map = populate_encoder_map_for_import(chunks,
2141  schema,
2142  file_reader,
2143  column_dictionaries,
2144  group_reader->metadata()->num_rows(),
2146 
2147  std::vector<std::set<int>> partitions(num_threads);
2148  std::map<int, int> column_id_to_thread;
2149  for (auto& [column_id, encoder] : encoder_map) {
2150  auto thread_id = column_id % num_threads;
2151  column_id_to_thread[column_id] = thread_id;
2152  partitions[thread_id].insert(column_id);
2153  }
2154 
2155  for (auto& [column_id, encoder] : encoder_map) {
2156  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
2157  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
2158  auto parquet_column_descriptor =
2159  file_metadata->schema()->Column(parquet_column_index);
2160 
2161  // validate
2162  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
2163  CHECK(row_group_interval.start_index >= 0 &&
2164  row_group_interval.end_index < num_row_groups);
2165  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2167  parquet_column_descriptor);
2168 
2169  std::shared_ptr<parquet::ColumnReader> col_reader =
2170  group_reader->Column(parquet_column_index);
2171 
2172  row_group_reader_map.insert(
2173  {column_id,
2174  ParquetRowGroupReader(col_reader,
2175  column_descriptor,
2176  parquet_column_descriptor,
2177  shared::get_from_map(encoder_map, column_id).get(),
2178  invalid_indices_per_thread[shared::get_from_map(
2179  column_id_to_thread, column_id)],
2180  row_group_index,
2181  parquet_column_index,
2182  parquet_reader)});
2183  }
2184 
2185  std::vector<std::future<void>> futures;
2186  for (int ithread = 0; ithread < num_threads; ++ithread) {
2187  auto column_ids_for_thread = partitions[ithread];
2188  futures.emplace_back(
2189  std::async(std::launch::async, [&row_group_reader_map, column_ids_for_thread] {
2190  for (const auto column_id : column_ids_for_thread) {
2191  shared::get_from_map(row_group_reader_map, column_id)
2192  .readAndValidateRowGroup(); // reads and validate entire row group per
2193  // column
2194  }
2195  }));
2196  }
2197 
2198  for (auto& future : futures) {
2199  future.wait();
2200  }
2201 
2202  for (auto& future : futures) {
2203  future.get();
2204  }
2205 
2206  // merge/reduce invalid indices
2207  InvalidRowGroupIndices invalid_indices;
2208  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
2209  invalid_indices.merge(thread_invalid_indices);
2210  }
2211 
2212  for (auto& [_, reader] : row_group_reader_map) {
2213  reader.eraseInvalidRowGroupData(
2214  invalid_indices); // removes invalid encoded data in buffers
2215  }
2216 
2217  // update the element count for each encoder
2218  for (const auto column_descriptor : schema.getLogicalColumns()) {
2219  auto column_id = column_descriptor->columnId;
2220  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
2221  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2222  invalid_indices.size());
2223  size_t updated_num_elems = db_encoder->getNumElems() +
2224  group_reader->metadata()->num_rows() -
2225  invalid_indices.size();
2226  db_encoder->setNumElems(updated_num_elems);
2227  if (column_descriptor->columnType.is_geometry()) {
2228  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2229  auto db_encoder =
2230  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
2231  db_encoder->setNumElems(updated_num_elems);
2232  }
2233  }
2234  }
2235 
2236  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2237  invalid_indices.size()};
2238 }
2239 
2241  std::vector<std::unique_ptr<TypedParquetDetectBuffer>> detect_buffers;
2242  std::vector<Chunk_NS::Chunk> column_chunks;
2243  std::vector<std::unique_ptr<RejectedRowIndices>> rejected_row_indices_per_column;
2244  std::list<ColumnDescriptor> column_descriptors;
2245 };
2246 
2247 DataPreview LazyParquetChunkLoader::previewFiles(const std::vector<std::string>& files,
2248  const size_t max_num_rows,
2249  const ForeignTable& foreign_table) {
2250  CHECK(!files.empty());
2251 
2252  auto first_file = *files.begin();
2253  auto first_file_reader = file_reader_cache_->getOrInsert(*files.begin(), file_system_);
2254 
2255  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
2256  ++current_file_it) {
2257  auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
2258  validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
2259  }
2260 
2261  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2262  auto num_columns = first_file_metadata->num_columns();
2263 
2264  DataPreview data_preview;
2265  data_preview.num_rejected_rows = 0;
2266 
2267  auto current_file_it = files.begin();
2268  while (data_preview.sample_rows.size() < max_num_rows &&
2269  current_file_it != files.end()) {
2270  size_t total_num_rows = data_preview.sample_rows.size();
2271  size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();
2272 
2273  // gather enough rows in row groups to produce required samples
2274  std::vector<RowGroupInterval> row_group_intervals;
2275  for (; current_file_it != files.end(); ++current_file_it) {
2276  const auto& file_path = *current_file_it;
2277  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
2278  auto file_metadata = file_reader->parquet_reader()->metadata();
2279  auto num_row_groups = file_metadata->num_row_groups();
2280  int end_row_group = 0;
2281  for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2282  const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2283  total_num_rows += next_num_rows;
2284  end_row_group = i;
2285  }
2286  row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
2287  }
2288 
2289  PreviewContext preview_context;
2290  for (int i = 0; i < num_columns; ++i) {
2291  auto col = first_file_metadata->schema()->Column(i);
2292  ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
2293  auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
2294  cd.columnType = sql_type;
2295  cd.columnName =
2296  sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
2297  cd.isSystemCol = false;
2298  cd.isVirtualCol = false;
2299  cd.tableId = -1;
2300  cd.columnId = i + 1;
2301  data_preview.column_names.emplace_back(cd.columnName);
2302  data_preview.column_types.emplace_back(sql_type);
2303  preview_context.detect_buffers.push_back(
2304  std::make_unique<TypedParquetDetectBuffer>());
2305  preview_context.rejected_row_indices_per_column.push_back(
2306  std::make_unique<RejectedRowIndices>());
2307  auto& detect_buffer = preview_context.detect_buffers.back();
2308  auto& chunk = preview_context.column_chunks.emplace_back(&cd);
2309  chunk.setPinnable(false);
2310  chunk.setBuffer(detect_buffer.get());
2311  }
2312 
2313  std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2314  [&](const std::vector<int>& column_indices) {
2315  for (const auto& column_index : column_indices) {
2316  auto& chunk = preview_context.column_chunks[column_index];
2317  auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2318  auto& rejected_row_indices =
2319  preview_context.rejected_row_indices_per_column[column_index];
2320  appendRowGroups(row_group_intervals,
2321  column_index,
2322  chunk.getColumnDesc(),
2323  chunk_list,
2324  nullptr,
2325  rejected_row_indices.get(),
2326  true,
2327  max_num_rows_to_append);
2328  }
2329  };
2330 
2331  auto num_threads = foreign_storage::get_num_threads(foreign_table);
2332 
2333  std::vector<int> columns(num_columns);
2334  std::iota(columns.begin(), columns.end(), 0);
2335  auto futures =
2336  create_futures_for_workers(columns, num_threads, append_row_groups_for_column);
2337  for (auto& future : futures) {
2338  future.wait();
2339  }
2340  for (auto& future : futures) {
2341  future.get();
2342  }
2343 
2344  // merge all `rejected_row_indices_per_column`
2345  auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2346  for (int i = 0; i < num_columns; ++i) {
2347  rejected_row_indices->insert(
2348  preview_context.rejected_row_indices_per_column[i]->begin(),
2349  preview_context.rejected_row_indices_per_column[i]->end());
2350  }
2351 
2352  size_t num_rows = 0;
2353  auto buffers_it = preview_context.detect_buffers.begin();
2354  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2355  CHECK(buffers_it != preview_context.detect_buffers.end());
2356  auto& strings = buffers_it->get()->getStrings();
2357  if (i == 0) {
2358  num_rows = strings.size();
2359  } else {
2360  CHECK_EQ(num_rows, strings.size());
2361  }
2362  }
2363 
2364  size_t num_rejected_rows = rejected_row_indices->size();
2365  data_preview.num_rejected_rows += num_rejected_rows;
2366  CHECK_GE(num_rows, num_rejected_rows);
2367  auto row_count = num_rows - num_rejected_rows;
2368 
2369  auto offset_row = data_preview.sample_rows.size();
2370  data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2371 
2372  for (size_t irow = 0, rows_appended = 0;
2373  irow < num_rows && offset_row + rows_appended < max_num_rows;
2374  ++irow) {
2375  if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2376  continue;
2377  }
2378  auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
2379  row_data.resize(num_columns);
2380  auto buffers_it = preview_context.detect_buffers.begin();
2381  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2382  CHECK(buffers_it != preview_context.detect_buffers.end());
2383  auto& strings = buffers_it->get()->getStrings();
2384  row_data[i] = strings[irow];
2385  }
2386  ++rows_appended;
2387  }
2388  }
2389 
2390  // attempt to detect geo columns
2391  for (int i = 0; i < num_columns; ++i) {
2392  auto type_info = data_preview.column_types[i];
2393  if (type_info.is_string()) {
2394  auto tentative_geo_type =
2396  if (tentative_geo_type.has_value()) {
2397  data_preview.column_types[i].set_type(tentative_geo_type.value());
2398  data_preview.column_types[i].set_compression(kENCODING_NONE);
2399  }
2400  }
2401  }
2402 
2403  return data_preview;
2404 }
2405 
2406 std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
2407  const std::vector<std::string>& file_paths,
2408  const ForeignTableSchema& schema,
2409  const bool do_metadata_stats_validation) {
2410  auto timer = DEBUG_TIMER(__func__);
2411  auto column_interval =
2412  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
2413  schema.getLogicalAndPhysicalColumns().back()->columnId};
2414  CHECK(!file_paths.empty());
2415 
2416  // The encoder map needs to be populated before we can start scanning rowgroups, so we
2417  // peel the first file_path out of the async loop below to perform population.
2418  const auto& first_path = *file_paths.begin();
2419  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
2420  auto max_row_group_stats = validate_parquet_metadata(
2421  first_reader->parquet_reader()->metadata(), first_path, schema);
2422  auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
2423  schema,
2424  first_reader,
2426  do_metadata_stats_validation);
2427  const auto num_row_groups = get_parquet_table_size(first_reader).first;
2428  auto row_group_metadata = metadata_scan_rowgroup_interval(
2429  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2430 
2431  // We want each (filepath->FileReader) pair in the cache to be initialized before we
2432  // multithread so that we are not adding keys in a concurrent environment, so we add
2433  // cache entries for each path and initialize to an empty unique_ptr if the file has not
2434  // yet been opened.
2435  // Since we have already performed the first iteration, we skip it in the thread groups
2436  // so as not to process it twice.
2437  std::vector<std::string> cache_subset;
2438  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2440  cache_subset.emplace_back(*path_it);
2441  }
2442 
2443  // Iterate asyncronously over any paths beyond the first.
2444  auto table_ptr = schema.getForeignTable();
2445  CHECK(table_ptr);
2446  auto num_threads = foreign_storage::get_num_threads(*table_ptr);
2447  auto paths_per_thread = partition_for_threads(cache_subset, num_threads);
2448  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2449  futures;
2450  for (const auto& path_group : paths_per_thread) {
2451  futures.emplace_back(std::async(
2453  [&](const auto& paths, const auto& file_reader_cache)
2454  -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2455  std::list<RowGroupMetadata> reduced_metadata;
2456  MaxRowGroupSizeStats max_row_group_stats{0, 0};
2457  for (const auto& path : paths.get()) {
2458  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
2459  validate_equal_schema(first_reader, reader, first_path, path);
2460  auto local_max_row_group_stats = validate_parquet_metadata(
2461  reader->parquet_reader()->metadata(), path, schema);
2462  if (local_max_row_group_stats.max_row_group_size >
2463  max_row_group_stats.max_row_group_size) {
2464  max_row_group_stats = local_max_row_group_stats;
2465  }
2466  const auto num_row_groups = get_parquet_table_size(reader).first;
2467  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
2468  reduced_metadata.splice(
2469  reduced_metadata.end(),
2470  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
2471  }
2472  return {reduced_metadata, max_row_group_stats};
2473  },
2474  std::ref(path_group),
2475  std::ref(*file_reader_cache_)));
2476  }
2477 
2478  // Reduce all the row_group results.
2479  for (auto& future : futures) {
2480  auto [metadata, local_max_row_group_stats] = future.get();
2481  row_group_metadata.splice(row_group_metadata.end(), metadata);
2482  if (local_max_row_group_stats.max_row_group_size >
2483  max_row_group_stats.max_row_group_size) {
2484  max_row_group_stats = local_max_row_group_stats;
2485  }
2486  }
2487 
2488  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2490  max_row_group_stats, schema.getForeignTable()->maxFragRows);
2491  }
2492 
2493  return row_group_metadata;
2494 }
2495 
2496 } // namespace foreign_storage
DEVICE auto upper_bound(ARGS &&...args)
Definition: gpu_enabled.h:123
std::shared_ptr< parquet::ColumnReader > col_reader_
std::list< ColumnDescriptor > column_descriptors
HOST DEVICE SQLTypes get_subtype() const
Definition: sqltypes.h:405
void set_compression(EncodingType c)
Definition: sqltypes.h:525
AbstractBuffer * getIndexBuf() const
Definition: Chunk.h:148
#define CHECK_EQ(x, y)
Definition: Logger.h:230
std::shared_ptr< ParquetEncoder > create_parquet_signed_or_unsigned_integral_encoder_with_types(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const bool is_signed)
Create a signed or unsigned integral parquet encoder using types.
HOST DEVICE int get_size() const
Definition: sqltypes.h:414
bool validate_array_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::optional< SQLTypes > detect_geo_type(const SampleRows &sample_rows, size_t column_index)
Definition: DataPreview.cpp:22
std::vector< Chunk_NS::Chunk > column_chunks
static constexpr int32_t kMaxNumericPrecision
Definition: sqltypes.h:45
Definition: sqltypes.h:63
std::vector< std::string > column_names
Definition: DataPreview.h:28
std::shared_ptr< ParquetEncoder > create_parquet_geospatial_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const RenderGroupAnalyzerMap *render_group_analyzer_map, const bool is_metadata_scan, const bool is_for_import)
std::vector< SQLTypeInfo > column_types
Definition: DataPreview.h:29
const parquet::ParquetFileReader * parquet_reader_
static bool isColumnMappingSupported(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
Definition: FsiChunkUtils.h:41
std::shared_ptr< ParquetEncoder > create_parquet_array_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan, const bool is_for_import, const bool is_for_detect)
#define LOG(tag)
Definition: Logger.h:216
std::shared_ptr< parquet::Statistics > validate_and_get_column_metadata_statistics(const parquet::ColumnChunkMetaData *column_metadata)
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
bool is_fp() const
Definition: sqltypes.h:604
HOST DEVICE int get_scale() const
Definition: sqltypes.h:409
size_t get_num_threads(const ForeignTable &table)
std::unique_ptr< ColumnDescriptor > get_sub_type_column_descriptor(const ColumnDescriptor *column)
const parquet::ColumnDescriptor * parquet_column_descriptor_
#define UNREACHABLE()
Definition: Logger.h:266
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
#define CHECK_GE(x, y)
Definition: Logger.h:235
bool is_nanosecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< ParquetEncoder > create_parquet_none_type_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
MaxRowGroupSizeStats validate_column_mapping_and_row_group_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor *parquet_column)
void validate_equal_schema(const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor *parquet_column)
Detect a valid list parquet column.
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:404
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor *parquet_column)
const RenderGroupAnalyzerMap * render_group_analyzer_map_
std::string to_string(char const *&&v)
int getParquetColumnIndex(const int column_id) const
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_import(std::list< Chunk_NS::Chunk > &chunks, const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, StringDictionary *string_dictionary, const RenderGroupAnalyzerMap *render_group_analyzer_map)
UniqueReaderPtr open_parquet_table(const std::string &file_path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
MaxRowGroupSizeStats validate_parquet_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
std::shared_ptr< ParquetEncoder > create_parquet_date_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
void throw_missing_metadata_error(const int row_group_index, const int column_index, const std::string &file_path)
const ForeignTable * getForeignTable() const
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
future< Result > async(Fn &&fn, Args &&...args)
bool is_fixlen_array() const
Definition: sqltypes.h:610
std::set< int64_t > InvalidRowGroupIndices
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder_with_omnisci_type(AbstractBuffer *buffer, const size_t omnisci_data_type_byte_size, const size_t parquet_data_type_byte_size, const int bit_width, const bool is_signed)
Create a integral parquet encoder using types.
void validate_allowed_mapping(const parquet::ColumnDescriptor *parquet_column, const ColumnDescriptor *omnisci_column)
void validate_number_of_columns(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
void set_fixed_size()
Definition: sqltypes.h:523
SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor *parquet_column)
bool is_integer() const
Definition: sqltypes.h:602
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:70
LazyParquetChunkLoader(std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const RenderGroupAnalyzerMap *render_group_analyzer_map, const std::string &foreign_table_name)
void set_scale(int s)
Definition: sqltypes.h:519
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
SQLTypeInfo suggest_floating_point_mapping(const parquet::ColumnDescriptor *parquet_column)
An AbstractBuffer is a unit of data management for a data manager.
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
specifies the content in-memory of a row in the column metadata table
std::pair< size_t, size_t > loadRowGroups(const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
Load row groups of data into given chunks.
parquet::arrow::FileReader * ReaderPtr
Definition: ParquetShared.h:33
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const
std::shared_ptr< ParquetEncoder > create_parquet_string_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const Chunk_NS::Chunk &chunk, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, bool is_for_import, const bool is_for_detect)
int get_precision() const
Definition: sqltypes.h:407
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder_with_omnisci_type(const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, AbstractBuffer *buffer)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_import(const std::map< int, Chunk_NS::Chunk > chunks, const ForeignTableSchema &schema, const ReaderPtr &reader, const std::map< int, StringDictionary * > column_dictionaries, const int64_t num_rows, const RenderGroupAnalyzerMap *render_group_analyzer_map)
void set_comp_param(int p)
Definition: sqltypes.h:526
std::list< RowGroupMetadata > metadataScan(const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
Perform a metadata scan for the paths specified.
Definition: sqltypes.h:66
Definition: sqltypes.h:67
SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor *parquet_column)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
void eraseInvalidRowGroupData(const InvalidRowGroupIndices &invalid_indices)
DEVICE auto lower_bound(ARGS &&...args)
Definition: gpu_enabled.h:78
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:412
AbstractBuffer * getBuffer() const
Definition: Chunk.h:146
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
virtual void validateAndAppendData(const int16_t *def_levels, const int16_t *rep_levels, const int64_t values_read, const int64_t levels_read, int8_t *values, const SQLTypeInfo &column_type, InvalidRowGroupIndices &invalid_indices)=0
DEVICE void iota(ARGS &&...args)
Definition: gpu_enabled.h:69
std::list< RowGroupMetadata > metadata_scan_rowgroup_interval(const std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const RowGroupInterval &row_group_interval, const ReaderPtr &reader, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:79
std::shared_ptr< ParquetEncoder > create_parquet_encoder_for_metadata_scan(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, const RenderGroupAnalyzerMap *render_group_analyzer_map)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:406
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::string get_type_name() const
Definition: sqltypes.h:528
void initializeIfEmpty(const std::string &path)
Definition: ParquetShared.h:86
virtual void eraseInvalidIndicesInBuffer(const InvalidRowGroupIndices &invalid_indices)=0
std::set< int64_t > RejectedRowIndices
DataPreview previewFiles(const std::vector< std::string > &files, const size_t max_num_rows, const ForeignTable &table)
Preview rows of data and column types in a set of files.
std::vector< std::unique_ptr< TypedParquetDetectBuffer > > detect_buffers
std::vector< std::unique_ptr< RejectedRowIndices > > rejected_row_indices_per_column
std::shared_ptr< ParquetEncoder > create_parquet_floating_point_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:413
ThreadId thread_id()
Definition: Logger.cpp:820
bool is_millisecond_precision(const ColumnDescriptor *omnisci_column)
ParquetRowGroupReader(std::shared_ptr< parquet::ColumnReader > col_reader, const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, ParquetEncoder *encoder, InvalidRowGroupIndices &invalid_indices, const int row_group_index, const int parquet_column_index, const parquet::ParquetFileReader *parquet_reader)
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
#define CHECK(condition)
Definition: Logger.h:222
bool is_geometry() const
Definition: sqltypes.h:612
#define DEBUG_TIMER(name)
Definition: Logger.h:371
bool is_valid_parquet_string(const parquet::ColumnDescriptor *parquet_column)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_metadata_scan(const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const ReaderPtr &reader, const RenderGroupAnalyzerMap *render_group_analyzer_map, const bool do_metadata_stats_validation)
std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan_or_for_import)
SQLTypeInfo suggest_boolean_type_mapping(const parquet::ColumnDescriptor *parquet_column)
bool is_microsecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
Definition: sqltypes.h:59
SQLTypeInfo columnType
const ColumnDescriptor * getLogicalColumn(const int column_id) const
std::map< int, std::unique_ptr< import_export::RenderGroupAnalyzer >> RenderGroupAnalyzerMap
bool is_string() const
Definition: sqltypes.h:600
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const RenderGroupAnalyzerMap *render_group_analyzer_map, const bool is_metadata_scan=false, const bool is_for_import=false, const bool is_for_detect=false)
Create a Parquet specific encoder for a Parquet to OmniSci mapping.
void throw_row_group_larger_than_fragment_size_error(const MaxRowGroupSizeStats max_row_group_stats, const int fragment_size)
bool is_decimal() const
Definition: sqltypes.h:603
SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor *parquet_column)
std::string columnName
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool is_array() const
Definition: sqltypes.h:608
void validate_definition_levels(const parquet::ParquetFileReader *reader, const int row_group_index, const int column_index, const int16_t *def_levels, const int64_t num_levels, const parquet::ColumnDescriptor *parquet_column_descriptor)
const ColumnDescriptor * getColumnDescriptor(const int column_id) const
void set_precision(int d)
Definition: sqltypes.h:517
static SQLTypeInfo suggestColumnMapping(const parquet::ColumnDescriptor *parquet_column)
bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value)
HOST DEVICE void set_type(SQLTypes t)
Definition: sqltypes.h:514