LazyParquetChunkLoader.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "LazyParquetChunkLoader.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/column_scanner.h>
#include <parquet/exception.h>
#include <parquet/platform.h>
#include <parquet/statistics.h>
#include <parquet/types.h>

#include "ForeignStorageException.h"
#include "FsiChunkUtils.h"
#include "ParquetArrayDetectEncoder.h"
#include "ParquetArrayImportEncoder.h"
#include "ParquetDateInDaysFromTimestampEncoder.h"
#include "ParquetDateInSecondsEncoder.h"
#include "ParquetDecimalEncoder.h"
#include "ParquetDetectStringEncoder.h"
#include "ParquetFixedLengthArrayEncoder.h"
#include "ParquetFixedLengthEncoder.h"
#include "ParquetGeospatialEncoder.h"
#include "ParquetGeospatialImportEncoder.h"
#include "ParquetStringEncoder.h"
#include "ParquetStringImportEncoder.h"
#include "ParquetStringNoneEncoder.h"
#include "ParquetTimeEncoder.h"
#include "ParquetTimestampEncoder.h"
#include "ParquetVariableLengthArrayEncoder.h"
#include "Shared/misc.h"

namespace foreign_storage {

namespace {

bool within_range(int64_t lower_bound, int64_t upper_bound, int64_t value) {
  return value >= lower_bound && value <= upper_bound;
}

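/**
 * Detects whether a Parquet column can be treated as a string: either a
 * BYTE_ARRAY column with no logical annotation or a column with the STRING
 * logical annotation.
 */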
bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
  return (parquet_column->logical_type()->is_none() &&
          parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
         parquet_column->logical_type()->is_string();
}

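/**
 * Detects whether a Parquet column conforms to the three-level list schema
 * required for mapping onto a HeavyDB array column: an innermost "element"
 * node inside a repeated "list" group, inside a LIST-annotated node that
 * hangs directly off the schema root. Deeper nesting is not supported by FSI.
 */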
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
  const parquet::schema::Node* node = parquet_column->schema_node().get();
  // Ensure the first innermost node is named "element", as required by the
  // Parquet specification; however, testing shows that pyarrow generates this
  // column with the name "item". This field must be either required or
  // optional.
  if ((node->name() != "element" && node->name() != "item") ||
      !(node->is_required() || node->is_optional())) {
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  // Ensure the second innermost node is named "list" and is a repeated group;
  // this is required by the Parquet specification.
  if (node->name() != "list" || !node->is_repeated() || !node->is_group()) {
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  // Ensure the third outermost node has logical type LIST and is either
  // optional or required; this is required by the Parquet specification.
  if (!node->logical_type()->is_list() ||
      !(node->is_optional() || node->is_required())) {
    return false;
  }
  // This must now be the root node of the schema, as required by FSI (lists
  // can not be embedded into a deeper nested structure).
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  node = node->parent();
  if (node) {  // implies the previous node was not the root node
    return false;
  }
  return true;
}

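/**
 * Creates a decimal encoder parameterized on the HeavyDB storage type `V` and
 * null sentinel type `NullType`, dispatching on the Parquet physical type
 * (INT32, INT64, FIXED_LEN_BYTE_ARRAY, or BYTE_ARRAY) that backs the decimal
 * logical type.
 */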
template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
    const ColumnDescriptor* column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    AbstractBuffer* buffer) {
  switch (parquet_column_descriptor->physical_type()) {
    case parquet::Type::INT32:
      return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::INT64:
      return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<
          ParquetDecimalEncoder<V, parquet::FixedLenByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::BYTE_ARRAY:
      return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    default:
      UNREACHABLE();
  }
  return {};
}

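/**
 * Creates a decimal encoder matching the HeavyDB column's compression: 64-bit
 * storage for uncompressed columns, and 16- or 32-bit storage for fixed
 * encodings. Metadata scans and imports decode through int64_t while
 * preserving the narrower null sentinel type.
 */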
std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (parquet_column->logical_type()->is_decimal()) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
          omnisci_column, parquet_column, buffer);
    }
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    if (is_metadata_scan_or_for_import) {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    } else {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    }
  }
  return {};
}

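/**
 * Creates an integral encoder, choosing a signed fixed-length encoder when
 * the Parquet data is signed and an unsigned fixed-length encoder (which
 * widens through the intermediate unsigned type `U`) when it is not. `V` is
 * the HeavyDB storage type, `T` the Parquet physical type, and `NullType`
 * the null sentinel type, whose size must match the HeavyDB data size.
 */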
template <typename V, typename T, typename U, typename NullType>
std::shared_ptr<ParquetEncoder>
create_parquet_signed_or_unsigned_integral_encoder_with_types(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const bool is_signed) {
  CHECK(sizeof(NullType) == omnisci_data_type_byte_size);
  if (is_signed) {
    return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  } else {
    return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  }
}

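/**
 * Creates an integral encoder for the given bit width (8, 16, 32, or 64),
 * mapping each width to the matching Parquet physical type and unsigned
 * intermediate type before delegating to
 * `create_parquet_signed_or_unsigned_integral_encoder_with_types`.
 */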
template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder_with_omnisci_type(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const int bit_width,
    const bool is_signed) {
  switch (bit_width) {
    case 8:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint8_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 16:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint16_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 32:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint32_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 64:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int64_t,
                                                                           uint64_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}

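/**
 * Creates an integral encoder using the HeavyDB column size and the Parquet
 * column's bit width and signedness. Metadata scans and imports may decode a
 * narrow physical column into a wider HeavyDB type (for example, a 16-bit
 * fixed-encoded BIGINT is decoded into int64_t).
 */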
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto physical_type = parquet_column->physical_type();

  int bit_width = -1;
  bool is_signed = false;
  // handle the integral case with no Parquet annotation
  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
    if (physical_type == parquet::Type::INT32) {
      bit_width = 32;
    } else if (physical_type == parquet::Type::INT64) {
      bit_width = 64;
    } else {
      UNREACHABLE();
    }
    is_signed = true;
  }
  // handle the integral case with Parquet annotation
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    bit_width = int_logical_column->bit_width();
    is_signed = int_logical_column->is_signed();
  }

  if (bit_width == -1) {  // no valid logical type (with or without annotation) found
    return {};
  }

  const size_t omnisci_data_type_byte_size = column_type.get_size();
  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);

  switch (omnisci_data_type_byte_size) {
    case 8:
      CHECK(column_type.get_compression() == kENCODING_NONE);
      return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 4:
      if (is_metadata_scan_or_for_import && column_type.get_type() == kBIGINT) {
        return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
            buffer,
            omnisci_data_type_byte_size,
            parquet_data_type_byte_size,
            bit_width,
            is_signed);
      }
      return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 2:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 1:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kTINYINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (!column_type.is_fp()) {
    return {};
  }
  CHECK_EQ(column_type.get_compression(), kENCODING_NONE);
  switch (column_type.get_type()) {
    case kFLOAT:
      switch (parquet_column->physical_type()) {
        case parquet::Type::FLOAT:
          return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
              buffer, omnisci_column, parquet_column);
        case parquet::Type::DOUBLE:
          return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    case kDOUBLE:
      CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
      return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
          buffer, omnisci_column, parquet_column);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_none() &&
      !omnisci_column->columnType.is_string()) {  // boolean
    if (column_type.get_compression() == kENCODING_NONE) {
      switch (column_type.get_type()) {
        case kBOOLEAN:
          return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

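/**
 * Creates an encoder that decodes annotated Parquet timestamps into HeavyDB
 * dates, scaling by the timestamp's time unit. Metadata scans and imports
 * decode into a seconds representation; chunk loads decode into the
 * days-since-epoch representation used by DATE IN DAYS encoded columns.
 */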
template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateInSecondsFromTimestampEncoder<V,
                                                       T,
                                                       1000L * 1000L * 1000L,
                                                       NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto precision = column_type.get_precision();
  if (parquet_column->logical_type()->is_timestamp()) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (precision == 0) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      CHECK(column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      }
    }
  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
    if (parquet_column->physical_type() == parquet::Type::INT32) {
      CHECK(column_type.get_compression() == kENCODING_FIXED &&
            column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (parquet_column->physical_type() == parquet::Type::INT64) {
      if (column_type.get_compression() == kENCODING_NONE) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      } else if (column_type.get_compression() == kENCODING_FIXED) {
        CHECK(column_type.get_comp_param() == 32);
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (time_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimeEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
        return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      if (is_metadata_scan_or_for_import) {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      } else {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

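/**
 * Creates an encoder that coerces an annotated Parquet timestamp column onto
 * a HeavyDB DATE column with DATE IN DAYS encoding (comp param 0 for the
 * 32-bit encoding, 16 for the 16-bit encoding).
 */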
std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
    CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
    if (is_metadata_scan_or_for_import) {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, true);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, true);
      } else {
        UNREACHABLE();
      }
    } else {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int32_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, false);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int16_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, false);
      } else {
        UNREACHABLE();
      }
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
    if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
      if (is_metadata_scan_or_for_import) {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int32_t>>(
              buffer);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int16_t>>(
              buffer);
        } else {
          UNREACHABLE();
        }
      } else {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          UNREACHABLE();
        }
      }
    } else if (column_type.get_compression() == kENCODING_NONE) {  // for array types
      return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int64_t>>(
          buffer, omnisci_column, parquet_column);
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const Chunk_NS::Chunk& chunk,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    bool is_for_import,
    const bool is_for_detect) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) ||
      !omnisci_column->columnType.is_string()) {
    return {};
  }
  if (column_type.get_compression() == kENCODING_NONE) {
    if (is_for_import) {
      return std::make_shared<ParquetStringImportEncoder>(chunk.getBuffer());
    } else {
      return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
                                                        chunk.getIndexBuf());
    }
  } else if (column_type.get_compression() == kENCODING_DICT) {
    if (!is_for_detect) {  // non-detect use case
      chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
      std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
      logical_chunk_metadata->sqlType = omnisci_column->columnType;
      switch (column_type.get_size()) {
        case 1:
          return std::make_shared<ParquetStringEncoder<uint8_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 2:
          return std::make_shared<ParquetStringEncoder<uint16_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        case 4:
          return std::make_shared<ParquetStringEncoder<int32_t>>(
              chunk.getBuffer(),
              string_dictionary,
              is_for_import ? nullptr : logical_chunk_metadata.get());
        default:
          UNREACHABLE();
      }
    } else {  // detect use-case
      return std::make_shared<ParquetDetectStringEncoder>(chunk.getBuffer());
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const RenderGroupAnalyzerMap* render_group_analyzer_map,
    const bool is_metadata_scan,
    const bool is_for_import) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
    return {};
  }
  if (is_for_import) {
    return std::make_shared<ParquetGeospatialImportEncoder>(chunks);  // no RGAMap
  }
  if (is_metadata_scan) {
    return std::make_shared<ParquetGeospatialEncoder>(render_group_analyzer_map);
  }
  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
    chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
    auto& chunk_metadata_ptr = chunk_metadata.back();
    chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
  }
  return std::make_shared<ParquetGeospatialEncoder>(
      parquet_column, chunks, chunk_metadata, render_group_analyzer_map);
}

// Forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
// `create_parquet_array_encoder` each make use of the other, so one of the two
// functions must be forward declared.
std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect);

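/**
 * Creates a Parquet encoder for a HeavyDB/Parquet column pair by trying each
 * specialized factory in turn (geospatial, array, decimal, integral, floating
 * point, timestamp, none-type, time, date-from-timestamp, date, and string)
 * and returning the first match. Exactly one factory must succeed; otherwise
 * the column mapping is unsupported and the end of the function is
 * unreachable. The `is_metadata_scan` and `is_for_import` flags are mutually
 * exclusive and select encoders that decode into wider intermediate types.
 */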
std::shared_ptr<ParquetEncoder> create_parquet_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const RenderGroupAnalyzerMap* render_group_analyzer_map,
    const bool is_metadata_scan = false,
    const bool is_for_import = false,
    const bool is_for_detect = false) {
  CHECK(!(is_metadata_scan && is_for_import));
  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
  if (auto encoder = create_parquet_geospatial_encoder(omnisci_column,
                                                       parquet_column,
                                                       chunks,
                                                       chunk_metadata,
                                                       render_group_analyzer_map,
                                                       is_metadata_scan,
                                                       is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_array_encoder(omnisci_column,
                                                  parquet_column,
                                                  chunks,
                                                  string_dictionary,
                                                  chunk_metadata,
                                                  is_metadata_scan,
                                                  is_for_import,
                                                  is_for_detect)) {
    return encoder;
  }
  if (auto encoder = create_parquet_decimal_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_integral_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_time_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_from_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_string_encoder(
          omnisci_column,
          parquet_column,
          chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
          string_dictionary,
          chunk_metadata,
          is_for_import,
          is_for_detect)) {
    return encoder;
  }
  UNREACHABLE();
  return {};
}

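/**
 * Convenience factory for import: creates an encoder with `is_for_import`
 * set, discarding the chunk metadata produced as a side effect.
 */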
std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_import(
    std::list<Chunk_NS::Chunk>& chunks,
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    StringDictionary* string_dictionary,
    const RenderGroupAnalyzerMap* render_group_analyzer_map) {
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                string_dictionary,
                                chunk_metadata,
                                render_group_analyzer_map,
                                false,
                                true);
}

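/**
 * Convenience factory for metadata scans: creates an encoder with
 * `is_metadata_scan` set and no chunks or string dictionary, since only row
 * group metadata is read.
 */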
std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_metadata_scan(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const RenderGroupAnalyzerMap* render_group_analyzer_map) {
  std::list<Chunk_NS::Chunk> chunks;
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                nullptr,
                                chunk_metadata,
                                render_group_analyzer_map,
                                true);
}

std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import,
    const bool is_for_detect) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
    return {};
  }
  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
      get_sub_type_column_descriptor(omnisci_column);
  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
                                        parquet_column,
                                        chunks,
                                        string_dictionary,
                                        chunk_metadata,
                                        nullptr,
                                        is_metadata_scan,
                                        is_for_import,
                                        is_for_detect);
  CHECK(encoder.get());
  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
  CHECK(scalar_encoder);
  if (!is_for_import) {
    if (!is_for_detect) {
      if (omnisci_column->columnType.is_fixlen_array()) {
        encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            scalar_encoder,
            omnisci_column);
      } else {
        encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
            is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
            is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
            scalar_encoder,
            omnisci_column);
      }
    } else {  // is_for_detect
      encoder = std::make_shared<ParquetArrayDetectEncoder>(
          chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
    }
  } else {  // is_for_import
    encoder = std::make_shared<ParquetArrayImportEncoder>(
        chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
  }
  return encoder;
}

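/**
 * Validates that a list column with no min/max statistics contains no
 * non-null, non-empty values: a definition level of 3 marks a present list
 * element, so encountering one without statistics is an error.
 */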
void validate_definition_levels(
    const parquet::ParquetFileReader* reader,
    const int row_group_index,
    const int column_index,
    const int16_t* def_levels,
    const int64_t num_levels,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (!is_valid_parquet_list) {
    return;
  }
  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
      reader->metadata()->RowGroup(row_group_index);
  auto column_metadata = group_metadata->ColumnChunk(column_index);
  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
  if (!stats->HasMinMax()) {
    auto find_it = std::find_if(def_levels,
                                def_levels + num_levels,
                                [](const int16_t def_level) { return def_level == 3; });
    if (find_it != def_levels + num_levels) {
      throw std::runtime_error(
          "No minimum and maximum statistic set in list column but non-null & non-empty "
          "array/value detected.");
    }
  }
}

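/**
 * Validates that the Parquet column's repetition and definition levels match
 * what is expected for its mapping: lists require a max repetition level of 1
 * and a max definition level of 3, while flat columns require 0 and 1
 * respectively.
 */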
void validate_max_repetition_and_definition_level(
    const ColumnDescriptor* omnisci_column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
    throw std::runtime_error(
        "Unsupported mapping detected. Column '" + parquet_column_descriptor->name() +
        "' detected to be a parquet list but HeavyDB mapped column '" +
        omnisci_column_descriptor->columnName + "' is not an array.");
  }
  if (is_valid_parquet_list) {
    if (parquet_column_descriptor->max_repetition_level() != 1 ||
        parquet_column_descriptor->max_definition_level() != 3) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->name() +
          "'. Expected a max repetition level of 1 and max definition level of 3 for "
          "list column but column has a max repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  } else {
    if (parquet_column_descriptor->max_repetition_level() != 0 ||
        parquet_column_descriptor->max_definition_level() != 1) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->name() +
          "'. Expected a max repetition level of 0 and max definition level of 1 for "
          "flat column but column has a max repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  }
}

void resize_values_buffer(const ColumnDescriptor* omnisci_column,
                          const parquet::ColumnDescriptor* parquet_column,
                          std::vector<int8_t>& values) {
  auto max_type_byte_size =
      std::max(omnisci_column->columnType.get_size(),
               parquet::GetTypeByteSize(parquet_column->physical_type()));
  size_t values_size =
      LazyParquetChunkLoader::batch_reader_num_elements * max_type_byte_size;
  values.resize(values_size);
}

bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
                              const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    return omnisci_column->columnType.get_precision() ==
               decimal_logical_column->precision() &&
           omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
           omnisci_column->columnType.is_decimal() &&
           (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
            omnisci_column->columnType.get_compression() == kENCODING_FIXED);
  }
  return false;
}

SQLTypeInfo suggest_decimal_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    auto parquet_precision = decimal_logical_column->precision();
    auto parquet_scale = decimal_logical_column->scale();
    if (parquet_precision > 18) {
      throw ForeignStorageException(
          "Parquet column \"" + parquet_column->ToString() +
          "\" has decimal precision of " + std::to_string(parquet_precision) +
          " which is too high to import, maximum precision supported is 18.");
    }
    SQLTypeInfo type;
    type.set_type(kDECIMAL);
    type.set_compression(kENCODING_NONE);
    type.set_precision(parquet_precision);
    type.set_scale(parquet_scale);
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE()
      << " a Parquet column's decimal logical type failed to be read appropriately";
  return {};
}

bool validate_floating_point_mapping(const ColumnDescriptor* omnisci_column,
                                     const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_fp()) {
    return false;
  }
  // check if mapping is a valid coerced or non-coerced floating point mapping
  // with no annotation (floating point columns have no annotation in the
  // Parquet specification)
  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
    return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
           (parquet_column->physical_type() == parquet::Type::FLOAT &&
            omnisci_column->columnType.get_type() == kFLOAT);
  }
  return false;
}

SQLTypeInfo suggest_floating_point_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  if (parquet_column->physical_type() == parquet::Type::FLOAT) {
    type.set_type(kFLOAT);
  } else if (parquet_column->physical_type() == parquet::Type::DOUBLE) {
    type.set_type(kDOUBLE);
  } else {
    UNREACHABLE();
  }
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
                               const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_integer()) {
    return false;
  }
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    const int bits_per_byte = 8;
    // unsigned types are permitted to map to a wider integral type in order to avoid
    // precision loss
    const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
    return omnisci_column->columnType.get_size() * bits_per_byte <=
           int_logical_column->bit_width() * bit_widening_factor;
  }
  // check if mapping is a valid coerced or non-coerced integral mapping with no
  // annotation
  if ((omnisci_column->columnType.get_compression() == kENCODING_NONE ||
       omnisci_column->columnType.get_compression() == kENCODING_FIXED)) {
    return (parquet_column->physical_type() == parquet::Type::INT64) ||
           (parquet_column->physical_type() == parquet::Type::INT32 &&
            omnisci_column->columnType.get_size() <= 4);
  }
  return false;
}

SQLTypeInfo suggest_integral_mapping(const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_compression(kENCODING_NONE);
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    auto bit_width = int_logical_column->bit_width();
    if (!int_logical_column->is_signed()) {
      if (within_range(33, 64, bit_width)) {
        throw ForeignStorageException(
            "Unsigned integer column \"" + parquet_column->name() +
            "\" in Parquet file with 64 bit-width has no supported type for ingestion "
            "that will not result in data loss");
      } else if (within_range(17, 32, bit_width)) {
        type.set_type(kBIGINT);
      } else if (within_range(9, 16, bit_width)) {
        type.set_type(kINT);
      } else if (within_range(0, 8, bit_width)) {
        type.set_type(kSMALLINT);
      }
    } else {
      if (within_range(33, 64, bit_width)) {
        type.set_type(kBIGINT);
      } else if (within_range(17, 32, bit_width)) {
        type.set_type(kINT);
      } else if (within_range(9, 16, bit_width)) {
        type.set_type(kSMALLINT);
      } else if (within_range(0, 8, bit_width)) {
        type.set_type(kTINYINT);
      }
    }
    type.set_fixed_size();
    return type;
  }

  CHECK(parquet_column->logical_type()->is_none());
  if (parquet_column->physical_type() == parquet::Type::INT32) {
    type.set_type(kINT);
  } else {
    CHECK(parquet_column->physical_type() == parquet::Type::INT64);
    type.set_type(kBIGINT);
  }
  type.set_fixed_size();
  return type;
}

bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 9;
}

bool is_nanosecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
}

bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 6;
}

bool is_microsecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
}

bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 3;
}

bool is_millisecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
}

bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  bool is_none_encoded_mapping =
      omnisci_column->columnType.get_compression() == kENCODING_NONE &&
      (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
       omnisci_column->columnType.get_type() == kBOOLEAN);
  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
}

SQLTypeInfo suggest_boolean_type_mapping(
    const parquet::ColumnDescriptor* parquet_column) {
  SQLTypeInfo type;
  type.set_compression(kENCODING_NONE);
  type.set_type(kBOOLEAN);
  type.set_fixed_size();
  return type;
}

bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
        ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  // check the annotated case
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return omnisci_column->columnType.get_dimension() == 0 ||
             ((is_nanosecond_precision(omnisci_column) &&
               is_nanosecond_precision(timestamp_logical_column)) ||
              (is_microsecond_precision(omnisci_column) &&
               is_microsecond_precision(timestamp_logical_column)) ||
              (is_millisecond_precision(omnisci_column) &&
               is_millisecond_precision(timestamp_logical_column)));
    }
    if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
      return omnisci_column->columnType.get_dimension() == 0;
    }
  }
  // check the unannotated case
  if (parquet_column->logical_type()->is_none() &&
      ((parquet_column->physical_type() == parquet::Type::INT32 &&
        omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
        omnisci_column->columnType.get_comp_param() == 32) ||
       parquet_column->physical_type() == parquet::Type::INT64)) {
    return true;
  }
  return false;
}

SQLTypeInfo suggest_timestamp_mapping(const parquet::ColumnDescriptor* parquet_column) {
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    SQLTypeInfo type;
    type.set_type(kTIMESTAMP);
    type.set_compression(kENCODING_NONE);
    if (is_nanosecond_precision(timestamp_logical_column)) {
      type.set_precision(9);
    } else if (is_microsecond_precision(timestamp_logical_column)) {
      type.set_precision(6);
    } else if (is_millisecond_precision(timestamp_logical_column)) {
      type.set_precision(3);
    }
    type.set_fixed_size();
    return type;
  }
  UNREACHABLE();
  return {};
}

bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIME &&
        (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  return parquet_column->logical_type()->is_time();
}

SQLTypeInfo suggest_time_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_time());
  SQLTypeInfo type;
  type.set_type(kTIME);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kDATE &&
        ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
          (omnisci_column->columnType.get_comp_param() ==
               0  // DATE ENCODING DAYS (32) specifies comp_param of 0
           || omnisci_column->columnType.get_comp_param() == 16)) ||
         omnisci_column->columnType.get_compression() ==
             kENCODING_NONE  // for array types
         ))) {
    return false;
  }
  return parquet_column->logical_type()->is_date() ||
         parquet_column->logical_type()
             ->is_timestamp();  // to support TIMESTAMP -> DATE coercion
}

SQLTypeInfo suggest_date_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(parquet_column->logical_type()->is_date());
  SQLTypeInfo type;
  type.set_type(kDATE);
  type.set_compression(kENCODING_NONE);
  type.set_fixed_size();
  return type;
}

bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
                             const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_string() &&
         (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_DICT);
}

SQLTypeInfo suggest_string_mapping(const parquet::ColumnDescriptor* parquet_column) {
  CHECK(is_valid_parquet_string(parquet_column));
  SQLTypeInfo type;
  type.set_type(kTEXT);
  type.set_compression(kENCODING_DICT);
  type.set_comp_param(32);
  type.set_fixed_size();
  return type;
}

bool validate_array_mapping(const ColumnDescriptor* omnisci_column,
                            const parquet::ColumnDescriptor* parquet_column) {
  if (is_valid_parquet_list_column(parquet_column) &&
      omnisci_column->columnType.is_array()) {
    auto omnisci_column_sub_type_column = get_sub_type_column_descriptor(omnisci_column);
    return LazyParquetChunkLoader::isColumnMappingSupported(
        omnisci_column_sub_type_column.get(), parquet_column);
  }
  return false;
}

bool validate_geospatial_mapping(const ColumnDescriptor* omnisci_column,
                                 const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_geometry();
}

void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
                           const parquet::arrow::FileReader* new_file_reader,
                           const std::string& reference_file_path,
                           const std::string& new_file_path) {
  const auto reference_num_columns =
      reference_file_reader->parquet_reader()->metadata()->num_columns();
  const auto new_num_columns =
      new_file_reader->parquet_reader()->metadata()->num_columns();
  if (reference_num_columns != new_num_columns) {
    throw std::runtime_error{"Parquet file \"" + new_file_path +
                             "\" has a different schema. Please ensure that all Parquet "
                             "files use the same schema. Reference Parquet file: \"" +
                             reference_file_path + "\" has " +
                             std::to_string(reference_num_columns) +
                             " columns. New Parquet file \"" + new_file_path +
                             "\" has " + std::to_string(new_num_columns) + " columns."};
  }

  for (int i = 0; i < reference_num_columns; i++) {
    validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
                                     get_column_descriptor(new_file_reader, i),
                                     reference_file_path,
                                     new_file_path);
  }
}

void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
                              const ColumnDescriptor* omnisci_column) {
  parquet::Type::type physical_type = parquet_column->physical_type();
  auto logical_type = parquet_column->logical_type();
  bool allowed_type =
      LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
  if (!allowed_type) {
    if (logical_type->is_timestamp()) {
      auto timestamp_type =
          dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
      CHECK(timestamp_type);

      if (!timestamp_type->is_adjusted_to_utc()) {
        LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
                     << omnisci_column->columnName
                     << "\". Only UTC timezone is currently supported.";
      }
    }
    std::string parquet_type;
    if (parquet_column->logical_type()->is_none()) {
      parquet_type = parquet::TypeToString(physical_type);
    } else {
      parquet_type = logical_type->ToString();
    }
    std::string omnisci_type = omnisci_column->columnType.get_type_name();
    throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
                             "\" to HeavyDB type \"" + omnisci_type +
                             "\" is not allowed. Please use an appropriate column type."};
  }
}

SQLTypeInfo suggest_column_scalar_type(const parquet::ColumnDescriptor* parquet_column) {
  // decimal case
  if (parquet_column->logical_type()->is_decimal()) {
    return suggest_decimal_mapping(parquet_column);
  }
  // float case
  if (parquet_column->logical_type()->is_none() &&
      (parquet_column->physical_type() == parquet::Type::FLOAT ||
       parquet_column->physical_type() == parquet::Type::DOUBLE)) {
    return suggest_floating_point_mapping(parquet_column);
  }
  // integral case
  if ((parquet_column->logical_type()->is_none() &&
       (parquet_column->physical_type() == parquet::Type::INT32 ||
        parquet_column->physical_type() == parquet::Type::INT64)) ||
      parquet_column->logical_type()->is_int()) {
    return suggest_integral_mapping(parquet_column);
  }
  // boolean case
  if (parquet_column->logical_type()->is_none() &&
      parquet_column->physical_type() == parquet::Type::BOOLEAN) {
    return suggest_boolean_type_mapping(parquet_column);
  }
  // timestamp case
  if (parquet_column->logical_type()->is_timestamp()) {
    return suggest_timestamp_mapping(parquet_column);
  }
  // time case
  if (parquet_column->logical_type()->is_time()) {
    return suggest_time_mapping(parquet_column);
  }
  // date case
  if (parquet_column->logical_type()->is_date()) {
    return suggest_date_mapping(parquet_column);
  }
  // string case
  if (is_valid_parquet_string(parquet_column)) {
    return suggest_string_mapping(parquet_column);
  }

  throw ForeignStorageException("Unsupported data type detected for column: " +
                                parquet_column->ToString());
}

void validate_number_of_columns(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema) {
  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
    throw_number_of_columns_mismatch_error(
        schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
  }
}

void throw_missing_metadata_error(const int row_group_index,
                                  const int column_index,
                                  const std::string& file_path) {
  throw std::runtime_error{
      "Statistics metadata is required for all row groups. Metadata is missing for "
      "row group index: " +
      std::to_string(row_group_index) +
      ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
}

struct MaxRowGroupSizeStats {
  int64_t max_row_group_size;
  int max_row_group_index;
  std::string file_path;
};

void throw_row_group_larger_than_fragment_size_error(
    const MaxRowGroupSizeStats max_row_group_stats,
    const int fragment_size) {
  auto metadata_scan_exception = MetadataScanInfeasibleFragmentSizeException{
      "Parquet file has a row group size that is larger than the fragment size. "
      "Please set the table fragment size to a number that is larger than the "
      "row group size. Row group index: " +
      std::to_string(max_row_group_stats.max_row_group_index) +
      ", row group size: " + std::to_string(max_row_group_stats.max_row_group_size) +
      ", fragment size: " + std::to_string(fragment_size) +
      ", file path: " + max_row_group_stats.file_path};
  metadata_scan_exception.min_feasible_fragment_size_ =
      max_row_group_stats.max_row_group_size;
  throw metadata_scan_exception;
}

1615  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1616  const std::string& file_path,
1617  const ForeignTableSchema& schema) {
1618  auto column_it = schema.getLogicalColumns().begin();
1619  MaxRowGroupSizeStats max_row_group_stats{0, 0};
1620  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1621  const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
1622  try {
1623  validate_allowed_mapping(descr, *column_it);
1624  } catch (std::runtime_error& e) {
1625  std::stringstream error_message;
1626  error_message << e.what() << " Parquet column: " << descr->name()
1627  << ", HeavyDB column: " << (*column_it)->columnName
1628  << ", Parquet file: " << file_path << ".";
1629  throw std::runtime_error(error_message.str());
1630  }
1631 
1632  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
1633  auto group_metadata = file_metadata->RowGroup(r);
1634  auto num_rows = group_metadata->num_rows();
1635  if (num_rows == 0) {
1636  continue;
1637  } else if (num_rows > max_row_group_stats.max_row_group_size) {
1638  max_row_group_stats.max_row_group_size = num_rows;
1639  max_row_group_stats.max_row_group_index = r;
1640  max_row_group_stats.file_path = file_path;
1641  }
1642 
1643  auto column_chunk = group_metadata->ColumnChunk(i);
1644  bool contains_metadata = column_chunk->is_stats_set();
1645  if (contains_metadata) {
1646  auto stats = column_chunk->statistics();
1647  bool is_all_nulls = stats->null_count() == column_chunk->num_values();
1648  bool is_list = is_valid_parquet_list_column(file_metadata->schema()->Column(i));
1649  // Given a list, it is possible it has no min or max if it is composed
1650  // only of empty lists & nulls. This cannot be detected by comparing
1651  // the null count; therefore we afford list types the benefit of the
1652  // doubt in this situation.
1653  if (!(stats->HasMinMax() || is_all_nulls || is_list)) {
1654  contains_metadata = false;
1655  }
1656  }
1657 
1658  if (!contains_metadata) {
1659  throw_missing_metadata_error(r, i, file_path);
1660  }
1661  }
1662  }
1663  return max_row_group_stats;
1664 }
1665 
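// Validates that the file's column count matches the table schema, then checks
// every column mapping and every row group's statistics metadata, returning
// stats describing the largest row group encountered.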
1666 MaxRowGroupSizeStats validate_parquet_metadata(
1667  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1668  const std::string& file_path,
1669  const ForeignTableSchema& schema) {
1670  validate_number_of_columns(file_metadata, file_path, schema);
1671  return validate_column_mapping_and_row_group_metadata(file_metadata, file_path, schema);
1672 }
1673 
1674 std::list<RowGroupMetadata> metadata_scan_rowgroup_interval(
1675  const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1676  const RowGroupInterval& row_group_interval,
1677  const ReaderPtr& reader,
1678  const ForeignTableSchema& schema) {
1679  std::list<RowGroupMetadata> row_group_metadata;
1680  auto column_interval =
1681  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1682  schema.getLogicalAndPhysicalColumns().back()->columnId};
1683 
1684  auto file_metadata = reader->parquet_reader()->metadata();
1685  for (int row_group = row_group_interval.start_index;
1686  row_group <= row_group_interval.end_index;
1687  ++row_group) {
1688  auto& row_group_metadata_item = row_group_metadata.emplace_back();
1689  row_group_metadata_item.row_group_index = row_group;
1690  row_group_metadata_item.file_path = row_group_interval.file_path;
1691 
1692  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1693  file_metadata->RowGroup(row_group);
1694 
1695  for (int column_id = column_interval.start; column_id <= column_interval.end;
1696  column_id++) {
1697  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1698  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1699  auto encoder_map_iter =
1700  encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
1701  CHECK(encoder_map_iter != encoder_map.end());
1702  try {
1703  auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1704  group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1705  row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1706  } catch (const std::exception& e) {
1707  std::stringstream error_message;
1708  error_message << e.what() << " in row group " << row_group << " of Parquet file '"
1709  << row_group_interval.file_path << "'.";
1710  throw std::runtime_error(error_message.str());
1711  }
1712  }
1713  }
1714  return row_group_metadata;
1715 }
1716 
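// Builds one encoder per logical column for an import. Geometry columns pull
// their physical column chunks into the same encoder, and fixed-size in-place
// encoders reserve buffer space up front since the row count is known.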
1717 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_import(
1718  const std::map<int, Chunk_NS::Chunk> chunks,
1719  const ForeignTableSchema& schema,
1720  const ReaderPtr& reader,
1721  const std::map<int, StringDictionary*> column_dictionaries,
1722  const int64_t num_rows,
1723  const RenderGroupAnalyzerMap* render_group_analyzer_map) {
1724  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1725  auto file_metadata = reader->parquet_reader()->metadata();
1726  for (auto& [column_id, chunk] : chunks) {
1727  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1728  if (column_descriptor->isGeoPhyCol) { // skip physical columns
1729  continue;
1730  }
1731  auto parquet_column_descriptor =
1732  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1733  auto find_it = column_dictionaries.find(column_id);
1734  StringDictionary* dictionary =
1735  (find_it == column_dictionaries.end() ? nullptr : find_it->second);
1736  std::list<Chunk_NS::Chunk> chunks_for_import;
1737  chunks_for_import.push_back(chunk);
1738  if (column_descriptor->columnType.is_geometry()) {
1739  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1740  chunks_for_import.push_back(chunks.at(column_id + i + 1));
1741  }
1742  }
1743  encoder_map[column_id] = create_parquet_encoder_for_import(chunks_for_import,
1744  column_descriptor,
1745  parquet_column_descriptor,
1746  dictionary,
1747  render_group_analyzer_map);
1748 
1749  // reserve space in the buffer when the number of elements is known ahead
1750  // of time, for types of known size (for example, dictionary-encoded strings)
1751  auto encoder = shared::get_from_map(encoder_map, column_id);
1752  if (auto inplace_encoder = dynamic_cast<ParquetInPlaceEncoder*>(encoder.get())) {
1753  inplace_encoder->reserve(num_rows);
1754  }
1755  }
1756  return encoder_map;
1757 }
1758 
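// Builds encoders for a metadata scan only. Note that column_id is advanced
// past any physical (geo) columns, so exactly one encoder is created per
// logical column.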
1759 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_metadata_scan(
1760  const Interval<ColumnType>& column_interval,
1761  const ForeignTableSchema& schema,
1762  const ReaderPtr& reader,
1763  const RenderGroupAnalyzerMap* render_group_analyzer_map,
1764  const bool do_metadata_stats_validation) {
1765  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1766  auto file_metadata = reader->parquet_reader()->metadata();
1767  for (int column_id = column_interval.start; column_id <= column_interval.end;
1768  column_id++) {
1769  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1770  auto parquet_column_descriptor =
1771  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1772  encoder_map[column_id] = create_parquet_encoder_for_metadata_scan(
1773  column_descriptor, parquet_column_descriptor, render_group_analyzer_map);
1774  if (!do_metadata_stats_validation) {
1775  shared::get_from_map(encoder_map, column_id)->disableMetadataStatsValidation();
1776  }
1777  column_id += column_descriptor->columnType.get_physical_cols();
1778  }
1779  return encoder_map;
1780 }
1781 } // namespace
1782 
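// Reads the given row group intervals for a single parquet column, appending
// decoded data to `chunks` via a type-specific encoder. `max_levels_read`
// allows an early exit once enough levels have been consumed (used by detect).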
1783 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroups(
1784  const std::vector<RowGroupInterval>& row_group_intervals,
1785  const int parquet_column_index,
1786  const ColumnDescriptor* column_descriptor,
1787  std::list<Chunk_NS::Chunk>& chunks,
1788  StringDictionary* string_dictionary,
1789  RejectedRowIndices* rejected_row_indices,
1790  const bool is_for_detect,
1791  const std::optional<int64_t> max_levels_read) {
1792  auto timer = DEBUG_TIMER(__func__);
1793  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1794  // `def_levels` and `rep_levels` below are used to store the read definition
1795  // and repetition levels of the Dremel encoding implemented by the Parquet
1796  // format
1797  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1798  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1799  std::vector<int8_t> values;
1800 
1801  CHECK(!row_group_intervals.empty());
1802  const auto& first_file_path = row_group_intervals.front().file_path;
1803 
1804  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1805  auto first_parquet_column_descriptor =
1806  get_column_descriptor(first_file_reader, parquet_column_index);
1807  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1808  auto encoder = create_parquet_encoder(column_descriptor,
1809  first_parquet_column_descriptor,
1810  chunks,
1811  string_dictionary,
1812  chunk_metadata,
1813  render_group_analyzer_map_,
1814  false,
1815  false,
1816  is_for_detect);
1817  CHECK(encoder.get());
1818 
1819  if (rejected_row_indices) { // error tracking is enabled
1820  encoder->initializeErrorTracking(column_descriptor->columnType);
1821  }
1822 
1823  bool early_exit = false;
1824  int64_t total_levels_read = 0;
1825  for (const auto& row_group_interval : row_group_intervals) {
1826  const auto& file_path = row_group_interval.file_path;
1827  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1828 
1829  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1830  CHECK(row_group_interval.start_index >= 0 &&
1831  row_group_interval.end_index < num_row_groups);
1832  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1833 
1834  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1835  auto parquet_column_descriptor =
1836  get_column_descriptor(file_reader, parquet_column_index);
1837  validate_equal_column_descriptor(first_parquet_column_descriptor,
1838  parquet_column_descriptor,
1839  first_file_path,
1840  file_path);
1841 
1841 
1842  validate_max_repetition_and_definition_level(column_descriptor,
1843  parquet_column_descriptor);
1844  int64_t values_read = 0;
1845  for (int row_group_index = row_group_interval.start_index;
1846  row_group_index <= row_group_interval.end_index;
1847  ++row_group_index) {
1848  auto group_reader = parquet_reader->RowGroup(row_group_index);
1849  std::shared_ptr<parquet::ColumnReader> col_reader =
1850  group_reader->Column(parquet_column_index);
1851 
1852  try {
1853  while (col_reader->HasNext()) {
1854  int64_t levels_read =
1855  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1856  def_levels.data(),
1857  rep_levels.data(),
1858  reinterpret_cast<uint8_t*>(values.data()),
1859  &values_read,
1860  col_reader.get());
1861 
1862  validate_definition_levels(parquet_reader,
1863  row_group_index,
1864  parquet_column_index,
1865  def_levels.data(),
1866  levels_read,
1867  parquet_column_descriptor);
1868 
1869  if (rejected_row_indices) { // error tracking is enabled
1870  encoder->appendDataTrackErrors(def_levels.data(),
1871  rep_levels.data(),
1872  values_read,
1873  levels_read,
1874  values.data());
1875  } else { // no error tracking enabled
1876  encoder->appendData(def_levels.data(),
1877  rep_levels.data(),
1878  values_read,
1879  levels_read,
1880  values.data());
1881  }
1882 
1883  if (max_levels_read.has_value()) {
1884  total_levels_read += levels_read;
1885  if (total_levels_read >= max_levels_read.value()) {
1886  early_exit = true;
1887  break;
1888  }
1889  }
1890  }
1891  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1892  array_encoder->finalizeRowGroup();
1893  }
1894  } catch (const std::exception& error) {
1895  throw ForeignStorageException(
1896  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1897  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1898  "', Parquet file: '" + file_path + "'");
1899  }
1900  if (max_levels_read.has_value() && early_exit) {
1901  break;
1902  }
1903  }
1904  if (max_levels_read.has_value() && early_exit) {
1905  break;
1906  }
1907  }
1908 
1909  if (rejected_row_indices) { // error tracking is enabled
1910  *rejected_row_indices = encoder->getRejectedRowIndices();
1911  }
1912  return chunk_metadata;
1913 }
1914 
1915 SQLTypeInfo LazyParquetChunkLoader::suggestColumnMapping(
1916  const parquet::ColumnDescriptor* parquet_column) {
1917  auto type = suggest_column_scalar_type(parquet_column);
1918 
1919  // array case
1920  if (is_valid_parquet_list_column(parquet_column)) {
1921  return type.get_array_type();
1922  }
1923 
1924  return type;
1925 }
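// Usage sketch (illustrative): given a parquet::ColumnDescriptor* col from a
// file's schema, suggestColumnMapping(col) yields the SQLTypeInfo used by
// detect/preview; e.g. a valid Parquet list column maps to the array type of
// its suggested scalar element type.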
1926 
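// Returns true if any type-specific validator accepts the mapping; validators
// are tried in order: geo, array, decimal, floating point, integral, none-type,
// timestamp, time, date, string.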
1927 bool LazyParquetChunkLoader::isColumnMappingSupported(
1928  const ColumnDescriptor* omnisci_column,
1929  const parquet::ColumnDescriptor* parquet_column) {
1930  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1931  return true;
1932  }
1933  if (validate_array_mapping(omnisci_column, parquet_column)) {
1934  return true;
1935  }
1936  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1937  return true;
1938  }
1939  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1940  return true;
1941  }
1942  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1943  return true;
1944  }
1945  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1946  return true;
1947  }
1948  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1949  return true;
1950  }
1951  if (validate_time_mapping(omnisci_column, parquet_column)) {
1952  return true;
1953  }
1954  if (validate_date_mapping(omnisci_column, parquet_column)) {
1955  return true;
1956  }
1957  if (validate_string_mapping(omnisci_column, parquet_column)) {
1958  return true;
1959  }
1960  return false;
1961 }
1962 
1963 LazyParquetChunkLoader::LazyParquetChunkLoader(
1964  std::shared_ptr<arrow::fs::FileSystem> file_system,
1965  FileReaderMap* file_map,
1966  const RenderGroupAnalyzerMap* render_group_analyzer_map)
1967  : file_system_(file_system)
1968  , file_reader_cache_(file_map)
1969  , render_group_analyzer_map_{render_group_analyzer_map} {}
1970 
1971 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
1972  const std::vector<RowGroupInterval>& row_group_intervals,
1973  const int parquet_column_index,
1974  std::list<Chunk_NS::Chunk>& chunks,
1975  StringDictionary* string_dictionary,
1976  RejectedRowIndices* rejected_row_indices) {
1977  CHECK(!chunks.empty());
1978  auto const& chunk = *chunks.begin();
1979  auto column_descriptor = chunk.getColumnDesc();
1980  auto buffer = chunk.getBuffer();
1981  CHECK(buffer);
1982 
1983  try {
1984  auto metadata = appendRowGroups(row_group_intervals,
1985  parquet_column_index,
1986  column_descriptor,
1987  chunks,
1988  string_dictionary,
1989  rejected_row_indices);
1990  return metadata;
1991  } catch (const std::exception& error) {
1992  throw ForeignStorageException(error.what());
1993  }
1994 
1995  return {};
1996 }
1997 
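// Per-batch scratch buffers for a single ScanAllValues call; the def/rep level
// buffers are pre-sized to batch_reader_num_elements, while `values` is sized
// per column via resize_values_buffer.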
1998 struct ParquetBatchData {
1999  ParquetBatchData()
2000  : def_levels(LazyParquetChunkLoader::batch_reader_num_elements)
2001  , rep_levels(LazyParquetChunkLoader::batch_reader_num_elements) {}
2002  std::vector<int16_t> def_levels;
2003  std::vector<int16_t> rep_levels;
2004  std::vector<int8_t> values;
2005  int64_t values_read;
2006  int64_t levels_read;
2007 };
2008 
2009 class ParquetRowGroupReader {
2010  public:
2011  ParquetRowGroupReader(std::shared_ptr<parquet::ColumnReader> col_reader,
2012  const ColumnDescriptor* column_descriptor,
2013  const parquet::ColumnDescriptor* parquet_column_descriptor,
2014  ParquetEncoder* encoder,
2015  InvalidRowGroupIndices& invalid_indices,
2016  const int row_group_index,
2017  const int parquet_column_index,
2018  const parquet::ParquetFileReader* parquet_reader)
2019  : col_reader_(col_reader)
2020  , column_descriptor_(column_descriptor)
2021  , parquet_column_descriptor_(parquet_column_descriptor)
2022  , encoder_(encoder)
2023  , invalid_indices_(invalid_indices)
2024  , row_group_index_(row_group_index)
2025  , parquet_column_index_(parquet_column_index)
2026  , parquet_reader_(parquet_reader) {
2027  import_encoder = dynamic_cast<ParquetImportEncoder*>(encoder);
2028  CHECK(import_encoder);
2029  }
2030 
2031  void readAndValidateRowGroup() {
2032  while (col_reader_->HasNext()) {
2033  ParquetBatchData batch_data;
2034  resize_values_buffer(
2035  column_descriptor_, parquet_column_descriptor_, batch_data.values);
2036  batch_data.levels_read =
2037  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
2038  batch_data.def_levels.data(),
2039  batch_data.rep_levels.data(),
2040  reinterpret_cast<uint8_t*>(batch_data.values.data()),
2041  &batch_data.values_read,
2042  col_reader_.get());
2043 
2044  const SQLTypeInfo& column_type = column_descriptor_->columnType;
2045 
2046  validate_definition_levels(parquet_reader_,
2047  row_group_index_,
2048  parquet_column_index_,
2049  batch_data.def_levels.data(),
2050  batch_data.levels_read,
2051  parquet_column_descriptor_);
2052  import_encoder->validateAndAppendData(batch_data.def_levels.data(),
2053  batch_data.rep_levels.data(),
2054  batch_data.values_read,
2055  batch_data.levels_read,
2056  batch_data.values.data(),
2057  column_type,
2058  invalid_indices_);
2059  }
2060  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder_)) {
2061  array_encoder->finalizeRowGroup();
2062  }
2063  }
2064 
2065  void eraseInvalidRowGroupData(const InvalidRowGroupIndices& invalid_indices) {
2066  import_encoder->eraseInvalidIndicesInBuffer(invalid_indices);
2067  }
2068 
2069  private:
2070  std::shared_ptr<parquet::ColumnReader> col_reader_;
2071  const ColumnDescriptor* column_descriptor_;
2072  const parquet::ColumnDescriptor* parquet_column_descriptor_;
2073  ParquetEncoder* encoder_;
2074  ParquetImportEncoder* import_encoder;
2075  InvalidRowGroupIndices& invalid_indices_;
2076  const int row_group_index_;
2077  const int parquet_column_index_;
2078  const parquet::ParquetFileReader* parquet_reader_;
2079 };
2080 
2081 std::pair<size_t, size_t> LazyParquetChunkLoader::loadRowGroups(
2082  const RowGroupInterval& row_group_interval,
2083  const std::map<int, Chunk_NS::Chunk>& chunks,
2084  const ForeignTableSchema& schema,
2085  const std::map<int, StringDictionary*>& column_dictionaries,
2086  const int num_threads) {
2087  auto timer = DEBUG_TIMER(__func__);
2088 
2089  const auto& file_path = row_group_interval.file_path;
2090 
2091  // do not use caching with file readers; open a new one for every request
2092  auto file_reader_owner = open_parquet_table(file_path, file_system_);
2093  auto file_reader = file_reader_owner.get();
2094  auto file_metadata = file_reader->parquet_reader()->metadata();
2095 
2096  validate_number_of_columns(file_metadata, file_path, schema);
2097 
2098  // check for fixed-length-encoded columns and indicate to the user
2099  // that they should not be used
2100  for (const auto column_descriptor : schema.getLogicalColumns()) {
2101  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
2102  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2103  try {
2104  validate_allowed_mapping(parquet_column, column_descriptor);
2105  } catch (std::runtime_error& e) {
2106  std::stringstream error_message;
2107  error_message << e.what() << " Parquet column: " << parquet_column->name()
2108  << ", HeavyDB column: " << column_descriptor->columnName
2109  << ", Parquet file: " << file_path << ".";
2110  throw std::runtime_error(error_message.str());
2111  }
2112  }
2113 
2114  CHECK(row_group_interval.start_index == row_group_interval.end_index);
2115  auto row_group_index = row_group_interval.start_index;
2116  std::map<int, ParquetRowGroupReader> row_group_reader_map;
2117 
2118  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2119  auto group_reader = parquet_reader->RowGroup(row_group_index);
2120 
2121  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2122 
2123  auto encoder_map = populate_encoder_map_for_import(chunks,
2124  schema,
2125  file_reader,
2126  column_dictionaries,
2127  group_reader->metadata()->num_rows(),
2128  render_group_analyzer_map_);
2129 
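 // partition columns across threads round-robin by column id
 // (column_id % num_threads)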
2130  std::vector<std::set<int>> partitions(num_threads);
2131  std::map<int, int> column_id_to_thread;
2132  for (auto& [column_id, encoder] : encoder_map) {
2133  auto thread_id = column_id % num_threads;
2134  column_id_to_thread[column_id] = thread_id;
2135  partitions[thread_id].insert(column_id);
2136  }
2137 
2138  for (auto& [column_id, encoder] : encoder_map) {
2139  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
2140  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
2141  auto parquet_column_descriptor =
2142  file_metadata->schema()->Column(parquet_column_index);
2143 
2144  // validate
2145  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
2146  CHECK(row_group_interval.start_index >= 0 &&
2147  row_group_interval.end_index < num_row_groups);
2148  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2149  validate_max_repetition_and_definition_level(column_descriptor,
2150  parquet_column_descriptor);
2151 
2152  std::shared_ptr<parquet::ColumnReader> col_reader =
2153  group_reader->Column(parquet_column_index);
2154 
2155  row_group_reader_map.insert(
2156  {column_id,
2157  ParquetRowGroupReader(col_reader,
2158  column_descriptor,
2159  parquet_column_descriptor,
2160  shared::get_from_map(encoder_map, column_id).get(),
2161  invalid_indices_per_thread[shared::get_from_map(
2162  column_id_to_thread, column_id)],
2163  row_group_index,
2164  parquet_column_index,
2165  parquet_reader)});
2166  }
2167 
2168  std::vector<std::future<void>> futures;
2169  for (int ithread = 0; ithread < num_threads; ++ithread) {
2170  auto column_ids_for_thread = partitions[ithread];
2171  futures.emplace_back(
2172  std::async(std::launch::async, [&row_group_reader_map, column_ids_for_thread] {
2173  for (const auto column_id : column_ids_for_thread) {
2174  shared::get_from_map(row_group_reader_map, column_id)
2175  .readAndValidateRowGroup(); // reads and validates an entire row group
2176  // per column
2177  }
2178  }));
2179  }
2180 
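 // wait for all workers, then call get() so that any exception thrown on a
 // worker thread is rethrown here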
2181  for (auto& future : futures) {
2182  future.wait();
2183  }
2184 
2185  for (auto& future : futures) {
2186  future.get();
2187  }
2188 
2189  // merge/reduce invalid indices
2190  InvalidRowGroupIndices invalid_indices;
2191  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
2192  invalid_indices.merge(thread_invalid_indices);
2193  }
2194 
2195  for (auto& [_, reader] : row_group_reader_map) {
2196  reader.eraseInvalidRowGroupData(
2197  invalid_indices); // removes invalid encoded data in buffers
2198  }
2199 
2200  // update the element count for each encoder
2201  for (const auto column_descriptor : schema.getLogicalColumns()) {
2202  auto column_id = column_descriptor->columnId;
2203  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
2204  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2205  invalid_indices.size());
2206  size_t updated_num_elems = db_encoder->getNumElems() +
2207  group_reader->metadata()->num_rows() -
2208  invalid_indices.size();
2209  db_encoder->setNumElems(updated_num_elems);
2210  if (column_descriptor->columnType.is_geometry()) {
2211  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2212  auto db_encoder =
2213  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
2214  db_encoder->setNumElems(updated_num_elems);
2215  }
2216  }
2217  }
2218 
2219  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2220  invalid_indices.size()};
2221 }
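// Note: loadRowGroups is invoked one row group at a time (see the CHECK that
// start_index == end_index above); the returned pair is {rows imported, rows
// rejected} for that row group.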
2222 
2223 struct PreviewContext {
2224  std::vector<std::unique_ptr<TypedParquetDetectBuffer>> detect_buffers;
2225  std::vector<Chunk_NS::Chunk> column_chunks;
2226  std::vector<std::unique_ptr<RejectedRowIndices>> rejected_row_indices_per_column;
2227  std::list<ColumnDescriptor> column_descriptors;
2228 };
2229 
2230 DataPreview LazyParquetChunkLoader::previewFiles(const std::vector<std::string>& files,
2231  const size_t max_num_rows) {
2232  CHECK(!files.empty());
2233 
2234  auto first_file = *files.begin();
2235  auto first_file_reader = file_reader_cache_->getOrInsert(*files.begin(), file_system_);
2236 
2237  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
2238  ++current_file_it) {
2239  auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
2240  validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
2241  }
2242 
2243  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2244  auto num_columns = first_file_metadata->num_columns();
2245 
2246  DataPreview data_preview;
2247  data_preview.num_rejected_rows = 0;
2248 
2249  auto current_file_it = files.begin();
2250  while (data_preview.sample_rows.size() < max_num_rows &&
2251  current_file_it != files.end()) {
2252  size_t total_num_rows = data_preview.sample_rows.size();
2253  size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();
2254 
2255  // gather enough rows in row groups to produce required samples
2256  std::vector<RowGroupInterval> row_group_intervals;
2257  for (; current_file_it != files.end(); ++current_file_it) {
2258  const auto& file_path = *current_file_it;
2259  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
2260  auto file_metadata = file_reader->parquet_reader()->metadata();
2261  auto num_row_groups = file_metadata->num_row_groups();
2262  int end_row_group = 0;
2263  for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2264  const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2265  total_num_rows += next_num_rows;
2266  end_row_group = i;
2267  }
2268  row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
2269  }
2270 
2271  PreviewContext preview_context;
2272  for (int i = 0; i < num_columns; ++i) {
2273  auto col = first_file_metadata->schema()->Column(i);
2274  ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
2275  auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
2276  cd.columnType = sql_type;
2277  cd.columnName =
2278  sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
2279  cd.isSystemCol = false;
2280  cd.isVirtualCol = false;
2281  cd.tableId = -1;
2282  cd.columnId = i + 1;
2283  data_preview.column_names.emplace_back(cd.columnName);
2284  data_preview.column_types.emplace_back(sql_type);
2285  preview_context.detect_buffers.push_back(
2286  std::make_unique<TypedParquetDetectBuffer>());
2287  preview_context.rejected_row_indices_per_column.push_back(
2288  std::make_unique<RejectedRowIndices>());
2289  auto& detect_buffer = preview_context.detect_buffers.back();
2290  auto& chunk = preview_context.column_chunks.emplace_back(&cd);
2291  chunk.setPinnable(false);
2292  chunk.setBuffer(detect_buffer.get());
2293  }
2294 
2295  std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2296  [&](const std::vector<int>& column_indices) {
2297  for (const auto& column_index : column_indices) {
2298  auto& chunk = preview_context.column_chunks[column_index];
2299  auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2300  auto& rejected_row_indices =
2301  preview_context.rejected_row_indices_per_column[column_index];
2302  appendRowGroups(row_group_intervals,
2303  column_index,
2304  chunk.getColumnDesc(),
2305  chunk_list,
2306  nullptr,
2307  rejected_row_indices.get(),
2308  true,
2309  max_num_rows_to_append);
2310  }
2311  };
2312 
2313  std::vector<int> columns(num_columns);
2314  std::iota(columns.begin(), columns.end(), 0);
2315  auto futures = create_futures_for_workers(
2316  columns, g_max_import_threads, append_row_groups_for_column);
2317  for (auto& future : futures) {
2318  future.wait();
2319  }
2320  for (auto& future : futures) {
2321  future.get();
2322  }
2323 
2324  // merge all `rejected_row_indices_per_column`
2325  auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2326  for (int i = 0; i < num_columns; ++i) {
2327  rejected_row_indices->insert(
2328  preview_context.rejected_row_indices_per_column[i]->begin(),
2329  preview_context.rejected_row_indices_per_column[i]->end());
2330  }
2331 
2332  size_t num_rows = 0;
2333  auto buffers_it = preview_context.detect_buffers.begin();
2334  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2335  CHECK(buffers_it != preview_context.detect_buffers.end());
2336  auto& strings = buffers_it->get()->getStrings();
2337  if (i == 0) {
2338  num_rows = strings.size();
2339  } else {
2340  CHECK_EQ(num_rows, strings.size());
2341  }
2342  }
2343 
2344  size_t num_rejected_rows = rejected_row_indices->size();
2345  data_preview.num_rejected_rows += num_rejected_rows;
2346  CHECK_GE(num_rows, num_rejected_rows);
2347  auto row_count = num_rows - num_rejected_rows;
2348 
2349  auto offset_row = data_preview.sample_rows.size();
2350  data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2351 
2352  for (size_t irow = 0, rows_appended = 0;
2353  irow < num_rows && offset_row + rows_appended < max_num_rows;
2354  ++irow) {
2355  if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2356  continue;
2357  }
2358  auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
2359  row_data.resize(num_columns);
2360  auto buffers_it = preview_context.detect_buffers.begin();
2361  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2362  CHECK(buffers_it != preview_context.detect_buffers.end());
2363  auto& strings = buffers_it->get()->getStrings();
2364  row_data[i] = strings[irow];
2365  }
2366  ++rows_appended;
2367  }
2368  }
2369 
2370  // attempt to detect geo columns
2371  for (int i = 0; i < num_columns; ++i) {
2372  auto type_info = data_preview.column_types[i];
2373  if (type_info.is_string()) {
2374  auto tentative_geo_type =
2375  detect_geo_type(data_preview.sample_rows, i);
2376  if (tentative_geo_type.has_value()) {
2377  data_preview.column_types[i].set_type(tentative_geo_type.value());
2378  data_preview.column_types[i].set_compression(kENCODING_NONE);
2379  }
2380  }
2381  }
2382 
2383  return data_preview;
2384 }
2385 
2386 std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
2387  const std::vector<std::string>& file_paths,
2388  const ForeignTableSchema& schema,
2389  const bool do_metadata_stats_validation) {
2390  auto timer = DEBUG_TIMER(__func__);
2391  auto column_interval =
2392  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
2393  schema.getLogicalAndPhysicalColumns().back()->columnId};
2394  CHECK(!file_paths.empty());
2395 
2396  // The encoder map needs to be populated before we can start scanning row
2397  // groups, so we peel the first file_path out of the async loop below.
2398  const auto& first_path = *file_paths.begin();
2399  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
2400  auto max_row_group_stats = validate_parquet_metadata(
2401  first_reader->parquet_reader()->metadata(), first_path, schema);
2402  auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
2403  schema,
2404  first_reader,
2405  render_group_analyzer_map_,
2406  do_metadata_stats_validation);
2407  const auto num_row_groups = get_parquet_table_size(first_reader).first;
2408  auto row_group_metadata = metadata_scan_rowgroup_interval(
2409  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2410 
2411  // We want each (filepath->FileReader) pair in the cache to be initialized before we
2412  // multithread so that we are not adding keys in a concurrent environment, so we add
2413  // cache entries for each path and initialize to an empty unique_ptr if the file has not
2414  // yet been opened.
2415  // Since we have already performed the first iteration, we skip it in the thread groups
2416  // so as not to process it twice.
2417  std::vector<std::string> cache_subset;
2418  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2419  file_reader_cache_->initializeIfEmpty(*path_it);
2420  cache_subset.emplace_back(*path_it);
2421  }
2422 
2423  // Iterate asynchronously over any paths beyond the first.
2424  auto paths_per_thread = partition_for_threads(cache_subset, g_max_import_threads);
2425  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2426  futures;
2427  for (const auto& path_group : paths_per_thread) {
2428  futures.emplace_back(std::async(
2429  std::launch::async,
2430  [&](const auto& paths, const auto& file_reader_cache)
2431  -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2432  std::list<RowGroupMetadata> reduced_metadata;
2433  MaxRowGroupSizeStats max_row_group_stats{0, 0};
2434  for (const auto& path : paths.get()) {
2435  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
2436  validate_equal_schema(first_reader, reader, first_path, path);
2437  auto local_max_row_group_stats = validate_parquet_metadata(
2438  reader->parquet_reader()->metadata(), path, schema);
2439  if (local_max_row_group_stats.max_row_group_size >
2440  max_row_group_stats.max_row_group_size) {
2441  max_row_group_stats = local_max_row_group_stats;
2442  }
2443  const auto num_row_groups = get_parquet_table_size(reader).first;
2444  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
2445  reduced_metadata.splice(
2446  reduced_metadata.end(),
2447  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
2448  }
2449  return {reduced_metadata, max_row_group_stats};
2450  },
2451  std::ref(path_group),
2452  std::ref(*file_reader_cache_)));
2453  }
2454 
2455  // Reduce all the row_group results.
2456  for (auto& future : futures) {
2457  auto [metadata, local_max_row_group_stats] = future.get();
2458  row_group_metadata.splice(row_group_metadata.end(), metadata);
2459  if (local_max_row_group_stats.max_row_group_size >
2460  max_row_group_stats.max_row_group_size) {
2461  max_row_group_stats = local_max_row_group_stats;
2462  }
2463  }
2464 
2465  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2466  throw_row_group_larger_than_fragment_size_error(
2467  max_row_group_stats, schema.getForeignTable()->maxFragRows);
2468  }
2469 
2470  return row_group_metadata;
2471 }
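// Usage sketch (illustrative; assumes a configured arrow::fs::FileSystem, a
// FileReaderMap cache, and no render group analyzers):
//   LazyParquetChunkLoader loader(file_system, &file_reader_cache, nullptr);
//   auto row_group_metadata = loader.metadataScan(file_paths, schema);
//   // yields one RowGroupMetadata entry per row group across all files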
2472 
2473 } // namespace foreign_storage