LazyParquetChunkLoader.cpp
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "LazyParquetChunkLoader.h"

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/column_scanner.h>
#include <parquet/exception.h>
#include <parquet/platform.h>
#include <parquet/statistics.h>
#include <parquet/types.h>

#include "ParquetArrayEncoder.h"
#include "ParquetArrayImportEncoder.h"
#include "ParquetDateFromTimestampEncoder.h"
#include "ParquetDateInSecondsEncoder.h"
#include "ParquetDecimalEncoder.h"
#include "ParquetFixedLengthArrayEncoder.h"
#include "ParquetFixedLengthEncoder.h"
#include "ParquetGeospatialEncoder.h"
#include "ParquetGeospatialImportEncoder.h"
#include "ParquetStringEncoder.h"
#include "ParquetStringImportEncoder.h"
#include "ParquetStringNoneEncoder.h"
#include "ParquetTimeEncoder.h"
#include "ParquetTimestampEncoder.h"
#include "ParquetVariableLengthArrayEncoder.h"
#include "Shared/misc.h"

namespace foreign_storage {

namespace {

bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
  return (parquet_column->logical_type()->is_none() &&
          parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
         parquet_column->logical_type()->is_string();
}
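
// For illustration (not part of the original source): both of the following
// Parquet schema fields satisfy is_valid_parquet_string() -- an annotated
// string and a raw, un-annotated BYTE_ARRAY:
//
//   optional binary name (STRING);
//   optional binary name;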

bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
  const parquet::schema::Node* node = parquet_column->schema_node().get();
  if ((node->name() != "element" && node->name() != "item") ||
      !(node->is_required() ||
        node->is_optional())) {  // Ensure the innermost node is named "element",
                                 // as required by the Parquet specification
                                 // (though testing shows that pyarrow generates
                                 // this node with the name "item"); this field
                                 // must be either required or optional.
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  if (node->name() != "list" || !node->is_repeated() ||
      !node->is_group()) {  // Ensure the second innermost node is a repeated
                            // group named "list", as required by the Parquet
                            // specification.
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  if (!node->logical_type()->is_list() ||
      !(node->is_optional() ||
        node->is_required())) {  // Ensure the third innermost node has logical
                                 // type LIST and is either optional or
                                 // required, as required by the Parquet
                                 // specification.
    return false;
  }
  node = node->parent();  // This must now be the root node of the schema, as
                          // required by FSI (lists can not be embedded into a
                          // deeper nested structure).
  if (!node) {  // required nested structure
    return false;
  }
  node = node->parent();
  if (node) {  // implies the previous node was not the root node
    return false;
  }
  return true;
}
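
// For reference, the three-level structure accepted above, as given in the
// Parquet format specification (pyarrow emits "item" in place of "element"):
//
//   <list-repetition> group <name> (LIST) {
//     repeated group list {
//       <element-repetition> <element-type> element;
//     }
//   }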

template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
    const ColumnDescriptor* column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor,
    AbstractBuffer* buffer) {
  switch (parquet_column_descriptor->physical_type()) {
    case parquet::Type::INT32:
      return std::make_shared<ParquetDecimalEncoder<V, int32_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::INT64:
      return std::make_shared<ParquetDecimalEncoder<V, int64_t, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<
          ParquetDecimalEncoder<V, parquet::FixedLenByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    case parquet::Type::BYTE_ARRAY:
      return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray, NullType>>(
          buffer, column_descriptor, parquet_column_descriptor);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (parquet_column->logical_type()->is_decimal()) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int64_t>(
          omnisci_column, parquet_column, buffer);
    }
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    if (is_metadata_scan_or_for_import) {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    } else {
      switch (omnisci_column->columnType.get_comp_param()) {
        case 16:
          return create_parquet_decimal_encoder_with_omnisci_type<int16_t, int16_t>(
              omnisci_column, parquet_column, buffer);
        case 32:
          return create_parquet_decimal_encoder_with_omnisci_type<int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        default:
          UNREACHABLE();
      }
    }
  }
  return {};
}
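
// For illustration: the Parquet specification permits a DECIMAL column to be
// backed by any of the physical types handled above, e.g.
//
//   DECIMAL(9,2)   -> INT32                  (precision <= 9)
//   DECIMAL(18,5)  -> INT64                  (precision <= 18)
//   DECIMAL(38,10) -> FIXED_LEN_BYTE_ARRAY or BYTE_ARRAY
//
// which is why the encoder is selected on physical_type() rather than on the
// logical type alone.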

template <typename V, typename T, typename U, typename NullType>
std::shared_ptr<ParquetEncoder>
create_parquet_signed_or_unsigned_integral_encoder_with_types(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const bool is_signed) {
  CHECK(sizeof(NullType) == omnisci_data_type_byte_size);
  if (is_signed) {
    return std::make_shared<ParquetFixedLengthEncoder<V, T, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  } else {
    return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U, NullType>>(
        buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
  }
}

template <typename V, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder_with_omnisci_type(
    AbstractBuffer* buffer,
    const size_t omnisci_data_type_byte_size,
    const size_t parquet_data_type_byte_size,
    const int bit_width,
    const bool is_signed) {
  switch (bit_width) {
    case 8:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint8_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 16:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint16_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 32:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int32_t,
                                                                           uint32_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    case 64:
      return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
                                                                           int64_t,
                                                                           uint64_t,
                                                                           NullType>(
          buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto physical_type = parquet_column->physical_type();

  int bit_width = -1;
  bool is_signed = false;
  // handle the integral case with no Parquet annotation
  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
    if (physical_type == parquet::Type::INT32) {
      bit_width = 32;
    } else if (physical_type == parquet::Type::INT64) {
      bit_width = 64;
    } else {
      UNREACHABLE();
    }
    is_signed = true;
  }
  // handle the integral case with a Parquet annotation
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    bit_width = int_logical_column->bit_width();
    is_signed = int_logical_column->is_signed();
  }

  if (bit_width == -1) {  // no valid logical type (with or without annotation) found
    return {};
  }

  const size_t omnisci_data_type_byte_size = column_type.get_size();
  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);

  switch (omnisci_data_type_byte_size) {
    case 8:
      CHECK(column_type.get_compression() == kENCODING_NONE);
      return create_parquet_integral_encoder_with_omnisci_type<int64_t, int64_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 4:
      if (is_metadata_scan_or_for_import && column_type.get_type() == kBIGINT) {
        return create_parquet_integral_encoder_with_omnisci_type<int64_t, int32_t>(
            buffer,
            omnisci_data_type_byte_size,
            parquet_data_type_byte_size,
            bit_width,
            is_signed);
      }
      return create_parquet_integral_encoder_with_omnisci_type<int32_t, int32_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 2:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int16_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int16_t, int16_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    case 1:
      if (is_metadata_scan_or_for_import) {
        switch (column_type.get_type()) {
          case kBIGINT:
            return create_parquet_integral_encoder_with_omnisci_type<int64_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kINT:
            return create_parquet_integral_encoder_with_omnisci_type<int32_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kSMALLINT:
            return create_parquet_integral_encoder_with_omnisci_type<int16_t, int8_t>(
                buffer,
                omnisci_data_type_byte_size,
                parquet_data_type_byte_size,
                bit_width,
                is_signed);
          case kTINYINT:
            break;
          default:
            UNREACHABLE();
        }
      }
      return create_parquet_integral_encoder_with_omnisci_type<int8_t, int8_t>(
          buffer,
          omnisci_data_type_byte_size,
          parquet_data_type_byte_size,
          bit_width,
          is_signed);
    default:
      UNREACHABLE();
  }
  return {};
}
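
// Example dispatch, for illustration: a Parquet column annotated UINT_16
// (bit_width = 16, is_signed = false) mapped to a 4-byte OmniSci INT column
// takes the `case 4` branch above and, via the bit-width switch, constructs
// ParquetUnsignedFixedLengthEncoder<int32_t, int32_t, uint16_t, int32_t>.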

std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (!column_type.is_fp()) {
    return {};
  }
  CHECK_EQ(column_type.get_compression(), kENCODING_NONE);
  switch (column_type.get_type()) {
    case kFLOAT:
      switch (parquet_column->physical_type()) {
        case parquet::Type::FLOAT:
          return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
              buffer, omnisci_column, parquet_column);
        case parquet::Type::DOUBLE:
          return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    case kDOUBLE:
      CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
      return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
          buffer, omnisci_column, parquet_column);
    default:
      UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_none() &&
      !omnisci_column->columnType.is_string()) {  // boolean
    if (column_type.get_compression() == kENCODING_NONE) {
      switch (column_type.get_type()) {
        case kBOOLEAN:
          return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
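
// The third template argument above is the divisor applied to normalize raw
// Parquet timestamp values to seconds; e.g. a MICROS value of 1609459200123456
// becomes 1609459200123456 / (1000L * 1000L) = 1609459200, i.e.
// 2021-01-01 00:00:00 UTC.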

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (timestamp_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<ParquetDateFromTimestampEncoder<V, T, 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateFromTimestampEncoder<V, T, 1000L * kSecsPerDay, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateFromTimestampEncoder<V, T, 1000L * 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateFromTimestampEncoder<V, T, 1000L * 1000L * kSecsPerDay, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<
              ParquetDateFromTimestampEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
              buffer, omnisci_column, parquet_column);
        }
        return std::make_shared<
            ParquetDateFromTimestampEncoder<V,
                                            T,
                                            1000L * 1000L * 1000L * kSecsPerDay,
                                            NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}
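
// In the non-import/non-scan branches above the divisor also folds in
// kSecsPerDay (86400), so values land directly in days since the epoch; e.g.
// a MILLIS value of 1609459200000 becomes
// 1609459200000 / (1000L * 86400) = 18628 days, the DATE ENCODING DAYS
// representation of 2021-01-01.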

std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  auto precision = column_type.get_precision();
  if (parquet_column->logical_type()->is_timestamp()) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (precision == 0) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      CHECK(column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return create_parquet_timestamp_encoder_with_types<int64_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_timestamp_encoder_with_types<int32_t, int64_t, int32_t>(
            omnisci_column, parquet_column, buffer);
      }
    }
  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
    if (parquet_column->physical_type() == parquet::Type::INT32) {
      CHECK(column_type.get_compression() == kENCODING_FIXED &&
            column_type.get_comp_param() == 32);
      if (is_metadata_scan_or_for_import) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      } else {
        return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t, int32_t>>(
            buffer, omnisci_column, parquet_column);
      }
    } else if (parquet_column->physical_type() == parquet::Type::INT64) {
      if (column_type.get_compression() == kENCODING_NONE) {
        return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int64_t>>(
            buffer, omnisci_column, parquet_column);
      } else if (column_type.get_compression() == kENCODING_FIXED) {
        CHECK(column_type.get_comp_param() == 32);
        if (is_metadata_scan_or_for_import) {
          return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

template <typename V, typename T, typename NullType>
std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    switch (time_logical_type->time_unit()) {
      case parquet::LogicalType::TimeUnit::MILLIS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::MICROS:
        return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      case parquet::LogicalType::TimeUnit::NANOS:
        return std::make_shared<
            ParquetTimeEncoder<V, T, 1000L * 1000L * 1000L, NullType>>(
            buffer, omnisci_column, parquet_column);
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
          parquet_column->logical_type().get())) {
    if (column_type.get_compression() == kENCODING_NONE) {
      if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
        return create_parquet_time_encoder_with_types<int64_t, int32_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      } else {
        return create_parquet_time_encoder_with_types<int64_t, int64_t, int64_t>(
            omnisci_column, parquet_column, buffer);
      }
    } else if (column_type.get_compression() == kENCODING_FIXED) {
      if (is_metadata_scan_or_for_import) {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int64_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int64_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      } else {
        if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
          CHECK(parquet_column->physical_type() == parquet::Type::INT32);
          return create_parquet_time_encoder_with_types<int32_t, int32_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        } else {
          CHECK(time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::MICROS ||
                time_logical_column->time_unit() ==
                    parquet::LogicalType::TimeUnit::NANOS);
          CHECK(parquet_column->physical_type() == parquet::Type::INT64);
          return create_parquet_time_encoder_with_types<int32_t, int64_t, int32_t>(
              omnisci_column, parquet_column, buffer);
        }
      }
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
    CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
    if (is_metadata_scan_or_for_import) {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, true);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int64_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, true);
      } else {
        UNREACHABLE();
      }
    } else {
      if (column_type.get_comp_param() ==
          0) {  // DATE ENCODING FIXED (32) uses comp param 0
        return create_parquet_date_from_timestamp_encoder_with_types<int32_t,
                                                                     int64_t,
                                                                     int32_t>(
            omnisci_column, parquet_column, buffer, false);
      } else if (column_type.get_comp_param() == 16) {
        return create_parquet_date_from_timestamp_encoder_with_types<int16_t,
                                                                     int64_t,
                                                                     int16_t>(
            omnisci_column, parquet_column, buffer, false);
      } else {
        UNREACHABLE();
      }
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer,
    const bool is_metadata_scan_or_for_import) {
  auto column_type = omnisci_column->columnType;
  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
    if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
      if (is_metadata_scan_or_for_import) {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int32_t>>(
              buffer);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int16_t>>(
              buffer);
        } else {
          UNREACHABLE();
        }
      } else {
        if (column_type.get_comp_param() ==
            0) {  // DATE ENCODING FIXED (32) uses comp param 0
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else if (column_type.get_comp_param() == 16) {
          return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        } else {
          UNREACHABLE();
        }
      }
    } else if (column_type.get_compression() == kENCODING_NONE) {  // for array types
      return std::make_shared<ParquetDateInSecondsEncoder</*NullType=*/int64_t>>(
          buffer, omnisci_column, parquet_column);
    } else {
      UNREACHABLE();
    }
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    const Chunk_NS::Chunk& chunk,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    bool is_for_import) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) ||
      !omnisci_column->columnType.is_string()) {
    return {};
  }
  if (column_type.get_compression() == kENCODING_NONE) {
    if (is_for_import) {
      return std::make_shared<ParquetStringImportEncoder>(chunk.getBuffer());
    } else {
      return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
                                                        chunk.getIndexBuf());
    }
  } else if (column_type.get_compression() == kENCODING_DICT) {
    chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
    std::unique_ptr<ChunkMetadata>& logical_chunk_metadata = chunk_metadata.back();
    logical_chunk_metadata->sqlType = omnisci_column->columnType;
    switch (column_type.get_size()) {
      case 1:
        return std::make_shared<ParquetStringEncoder<uint8_t>>(
            chunk.getBuffer(),
            string_dictionary,
            is_for_import ? nullptr : logical_chunk_metadata.get());
      case 2:
        return std::make_shared<ParquetStringEncoder<uint16_t>>(
            chunk.getBuffer(),
            string_dictionary,
            is_for_import ? nullptr : logical_chunk_metadata.get());
      case 4:
        return std::make_shared<ParquetStringEncoder<int32_t>>(
            chunk.getBuffer(),
            string_dictionary,
            is_for_import ? nullptr : logical_chunk_metadata.get());
      default:
        UNREACHABLE();
    }
  } else {
    UNREACHABLE();
  }
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import) {
  auto column_type = omnisci_column->columnType;
  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
    return {};
  }
  if (is_for_import) {
    return std::make_shared<ParquetGeospatialImportEncoder>(chunks);
  }
  if (is_metadata_scan) {
    return std::make_shared<ParquetGeospatialEncoder>();
  }
  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
    chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
    auto& chunk_metadata_ptr = chunk_metadata.back();
    chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
  }
  return std::make_shared<ParquetGeospatialEncoder>(
      parquet_column, chunks, chunk_metadata);
}

// Forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
// `create_parquet_array_encoder` each make use of the other, so one of the two
// functions must have a forward declaration.
std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import);

std::shared_ptr<ParquetEncoder> create_parquet_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan = false,
    const bool is_for_import = false) {
  CHECK(!(is_metadata_scan && is_for_import));
  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
  if (auto encoder = create_parquet_geospatial_encoder(omnisci_column,
                                                       parquet_column,
                                                       chunks,
                                                       chunk_metadata,
                                                       is_metadata_scan,
                                                       is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_array_encoder(omnisci_column,
                                                  parquet_column,
                                                  chunks,
                                                  string_dictionary,
                                                  chunk_metadata,
                                                  is_metadata_scan,
                                                  is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_decimal_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_integral_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder =
          create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
    return encoder;
  }
  if (auto encoder = create_parquet_time_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_from_timestamp_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_date_encoder(
          omnisci_column, parquet_column, buffer, is_metadata_scan || is_for_import)) {
    return encoder;
  }
  if (auto encoder = create_parquet_string_encoder(
          omnisci_column,
          parquet_column,
          chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
          string_dictionary,
          chunk_metadata,
          is_for_import)) {
    return encoder;
  }
  UNREACHABLE();
  return {};
}

std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_import(
    std::list<Chunk_NS::Chunk>& chunks,
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    StringDictionary* string_dictionary) {
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(omnisci_column,
                                parquet_column,
                                chunks,
                                string_dictionary,
                                chunk_metadata,
                                false,
                                true);
}

std::shared_ptr<ParquetEncoder> create_parquet_encoder_for_metadata_scan(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column) {
  std::list<Chunk_NS::Chunk> chunks;
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  return create_parquet_encoder(
      omnisci_column, parquet_column, chunks, nullptr, chunk_metadata, true);
}

std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
    const bool is_metadata_scan,
    const bool is_for_import) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
    return {};
  }
  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
      get_sub_type_column_descriptor(omnisci_column);
  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
                                        parquet_column,
                                        chunks,
                                        string_dictionary,
                                        chunk_metadata,
                                        is_metadata_scan,
                                        is_for_import);
  CHECK(encoder.get());
  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
  CHECK(scalar_encoder);
  if (!is_for_import) {
    if (omnisci_column->columnType.is_fixlen_array()) {
      encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
          is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
          scalar_encoder,
          omnisci_column);
    } else {
      encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
          is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
          is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
          scalar_encoder,
          omnisci_column);
    }
  } else {  // is_for_import
    encoder = std::make_shared<ParquetArrayImportEncoder>(
        chunks.begin()->getBuffer(), scalar_encoder, omnisci_column);
  }
  return encoder;
}

void validate_definition_levels(
    const parquet::ParquetFileReader* reader,
    const int row_group_index,
    const int column_index,
    const int16_t* def_levels,
    const int64_t num_levels,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (!is_valid_parquet_list) {
    return;
  }
  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
      reader->metadata()->RowGroup(row_group_index);
  auto column_metadata = group_metadata->ColumnChunk(column_index);
  auto stats = validate_and_get_column_metadata_statistics(column_metadata.get());
  if (!stats->HasMinMax()) {
    auto find_it = std::find_if(def_levels,
                                def_levels + num_levels,
                                [](const int16_t def_level) { return def_level == 3; });
    if (find_it != def_levels + num_levels) {
      throw std::runtime_error(
          "No minimum and maximum statistic set in list column but non-null & non-empty "
          "array/value detected.");
    }
  }
}
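
// For a list column with the max definition level of 3 enforced below, the
// Dremel-encoded definition levels distinguish (per the Parquet encoding
// rules): 0 = null list, 1 = empty list, 2 = null element, 3 = present
// (non-null) element -- hence the search for a definition level of 3 above.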

void validate_max_repetition_and_definition_level(
    const ColumnDescriptor* omnisci_column_descriptor,
    const parquet::ColumnDescriptor* parquet_column_descriptor) {
  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
    throw std::runtime_error(
        "Unsupported mapping detected. Column '" + parquet_column_descriptor->name() +
        "' detected to be a parquet list but OmniSci mapped column '" +
        omnisci_column_descriptor->columnName + "' is not an array.");
  }
  if (is_valid_parquet_list) {
    if (parquet_column_descriptor->max_repetition_level() != 1 ||
        parquet_column_descriptor->max_definition_level() != 3) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->name() +
          "'. Expected a max repetition level of 1 and max definition level of 3 for "
          "list column but column has a max "
          "repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  } else {
    if (parquet_column_descriptor->max_repetition_level() != 0 ||
        parquet_column_descriptor->max_definition_level() != 1) {
      throw std::runtime_error(
          "Incorrect schema max repetition level detected in column '" +
          parquet_column_descriptor->name() +
          "'. Expected a max repetition level of 0 and max definition level of 1 for "
          "flat column but column has a max "
          "repetition level of " +
          std::to_string(parquet_column_descriptor->max_repetition_level()) +
          " and a max definition level of " +
          std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
    }
  }
}

void resize_values_buffer(const ColumnDescriptor* omnisci_column,
                          const parquet::ColumnDescriptor* parquet_column,
                          std::vector<int8_t>& values) {
  auto max_type_byte_size =
      std::max(omnisci_column->columnType.get_size(),
               parquet::GetTypeByteSize(parquet_column->physical_type()));
  size_t values_size =
      LazyParquetChunkLoader::batch_reader_num_elements * max_type_byte_size;
  values.resize(values_size);
}

bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
                              const parquet::ColumnDescriptor* parquet_column) {
  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
          parquet_column->logical_type().get())) {
    return omnisci_column->columnType.get_precision() ==
               decimal_logical_column->precision() &&
           omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
           omnisci_column->columnType.is_decimal() &&
           (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
            omnisci_column->columnType.get_compression() == kENCODING_FIXED);
  }
  return false;
}

bool validate_floating_point_mapping(const ColumnDescriptor* omnisci_column,
                                     const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_fp()) {
    return false;
  }
  // Check if the mapping is a valid coerced or non-coerced floating point
  // mapping with no annotation (floating point columns have no annotation in
  // the Parquet specification).
  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
    return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
           (parquet_column->physical_type() == parquet::Type::FLOAT &&
            omnisci_column->columnType.get_type() == kFLOAT);
  }
  return false;
}

bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
                               const parquet::ColumnDescriptor* parquet_column) {
  if (!omnisci_column->columnType.is_integer()) {
    return false;
  }
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_FIXED);
    const int bits_per_byte = 8;
    // unsigned types are permitted to map to a wider integral type in order to
    // avoid precision loss
    const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
    return omnisci_column->columnType.get_size() * bits_per_byte <=
           int_logical_column->bit_width() * bit_widening_factor;
  }
  // check if mapping is a valid coerced or non-coerced integral mapping with no
  // annotation
  if ((omnisci_column->columnType.get_compression() == kENCODING_NONE ||
       omnisci_column->columnType.get_compression() == kENCODING_FIXED)) {
    return (parquet_column->physical_type() == parquet::Type::INT64) ||
           (parquet_column->physical_type() == parquet::Type::INT32 &&
            omnisci_column->columnType.get_size() <= 4);
  }
  return false;
}
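
// Worked examples of the check above: an unsigned UINT_32 column (bit_width
// 32, widening factor 2) may map to BIGINT (8 bytes * 8 = 64 <= 64) or INT
// (4 * 8 = 32 <= 64), while a signed INT_32 column (factor 1) may map to INT
// (32 <= 32) but not to BIGINT (64 > 32).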

bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 9;
}

bool is_nanosecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
}

bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 6;
}

bool is_microsecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
}

bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
  return omnisci_column->columnType.get_dimension() == 3;
}

bool is_millisecond_precision(
    const parquet::TimestampLogicalType* timestamp_logical_column) {
  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
}

bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  bool is_none_encoded_mapping =
      omnisci_column->columnType.get_compression() == kENCODING_NONE &&
      (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
       omnisci_column->columnType.get_type() == kBOOLEAN);
  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
}

bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
                                const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
        ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  // check the annotated case
  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
          parquet_column->logical_type().get())) {
    if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
      return omnisci_column->columnType.get_dimension() == 0 ||
             ((is_nanosecond_precision(omnisci_column) &&
               is_nanosecond_precision(timestamp_logical_column)) ||
              (is_microsecond_precision(omnisci_column) &&
               is_microsecond_precision(timestamp_logical_column)) ||
              (is_millisecond_precision(omnisci_column) &&
               is_millisecond_precision(timestamp_logical_column)));
    }
    if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
      return omnisci_column->columnType.get_dimension() == 0;
    }
  }
  // check the unannotated case
  if (parquet_column->logical_type()->is_none() &&
      ((parquet_column->physical_type() == parquet::Type::INT32 &&
        omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
        omnisci_column->columnType.get_comp_param() == 32) ||
       parquet_column->physical_type() == parquet::Type::INT64)) {
    return true;
  }
  return false;
}

bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kTIME &&
        (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
         (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
          omnisci_column->columnType.get_comp_param() == 32)))) {
    return false;
  }
  if (parquet_column->logical_type()->is_time()) {
    return true;
  }
  return false;
}

bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
                           const parquet::ColumnDescriptor* parquet_column) {
  if (!(omnisci_column->columnType.get_type() == kDATE &&
        ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
          (omnisci_column->columnType.get_comp_param() ==
               0  // DATE ENCODING DAYS (32) specifies comp_param of 0
           || omnisci_column->columnType.get_comp_param() == 16)) ||
         omnisci_column->columnType.get_compression() ==
             kENCODING_NONE  // for array types
         ))) {
    return false;
  }
  return parquet_column->logical_type()->is_date() ||
         parquet_column->logical_type()
             ->is_timestamp();  // to support TIMESTAMP -> DATE coercion
}

bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
                             const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_string() &&
         (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
          omnisci_column->columnType.get_compression() == kENCODING_DICT);
}

bool validate_array_mapping(const ColumnDescriptor* omnisci_column,
                            const parquet::ColumnDescriptor* parquet_column) {
  if (is_valid_parquet_list_column(parquet_column) &&
      omnisci_column->columnType.is_array()) {
    auto omnisci_column_sub_type_column = get_sub_type_column_descriptor(omnisci_column);
    return LazyParquetChunkLoader::isColumnMappingSupported(
        omnisci_column_sub_type_column.get(), parquet_column);
  }
  return false;
}

bool validate_geospatial_mapping(const ColumnDescriptor* omnisci_column,
                                 const parquet::ColumnDescriptor* parquet_column) {
  return is_valid_parquet_string(parquet_column) &&
         omnisci_column->columnType.is_geometry();
}

void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
                           const parquet::arrow::FileReader* new_file_reader,
                           const std::string& reference_file_path,
                           const std::string& new_file_path) {
  const auto reference_num_columns =
      reference_file_reader->parquet_reader()->metadata()->num_columns();
  const auto new_num_columns =
      new_file_reader->parquet_reader()->metadata()->num_columns();
  if (reference_num_columns != new_num_columns) {
    throw std::runtime_error{"Parquet file \"" + new_file_path +
                             "\" has a different schema. Please ensure that all Parquet "
                             "files use the same schema. Reference Parquet file: \"" +
                             reference_file_path + "\" has " +
                             std::to_string(reference_num_columns) +
                             " columns. New Parquet file \"" + new_file_path +
                             "\" has " + std::to_string(new_num_columns) + " columns."};
  }

  for (int i = 0; i < reference_num_columns; i++) {
    validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
                                     get_column_descriptor(new_file_reader, i),
                                     reference_file_path,
                                     new_file_path);
  }
}

void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
                              const ColumnDescriptor* omnisci_column) {
  parquet::Type::type physical_type = parquet_column->physical_type();
  auto logical_type = parquet_column->logical_type();
  bool allowed_type =
      LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
  if (!allowed_type) {
    if (logical_type->is_timestamp()) {
      auto timestamp_type =
          dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
      CHECK(timestamp_type);

      if (!timestamp_type->is_adjusted_to_utc()) {
        LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
                     << omnisci_column->columnName
                     << "\". Only UTC timezone is currently supported.";
      }
    }
    std::string parquet_type;
    if (parquet_column->logical_type()->is_none()) {
      parquet_type = parquet::TypeToString(physical_type);
    } else {
      parquet_type = logical_type->ToString();
    }
    std::string omnisci_type = omnisci_column->columnType.get_type_name();
    throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
                             "\" to OmniSci type \"" + omnisci_type +
                             "\" is not allowed. Please use an appropriate column type."};
  }
}

void validate_number_of_columns(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema) {
  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
    throw_number_of_columns_mismatch_error(
        schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
  }
}

void throw_missing_metadata_error(const int row_group_index,
                                  const int column_index,
                                  const std::string& file_path) {
  throw std::runtime_error{
      "Statistics metadata is required for all row groups. Metadata is missing for "
      "row group index: " +
      std::to_string(row_group_index) +
      ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
}

void throw_row_group_larger_than_fragment_size_error(const int row_group_index,
                                                     const int64_t max_row_group_size,
                                                     const int fragment_size,
                                                     const std::string& file_path) {
  throw std::runtime_error{
      "Parquet file has a row group size that is larger than the fragment size. "
      "Please set the table fragment size to a number that is larger than the "
      "row group size. Row group index: " +
      std::to_string(row_group_index) +
      ", row group size: " + std::to_string(max_row_group_size) +
      ", fragment size: " + std::to_string(fragment_size) + ", file path: " + file_path};
}

void validate_column_mapping_and_row_group_metadata(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema) {
  auto column_it = schema.getLogicalColumns().begin();
  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
    const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
    try {
      validate_allowed_mapping(descr, *column_it);
    } catch (std::runtime_error& e) {
      std::stringstream error_message;
      error_message << e.what() << " Parquet column: " << descr->name()
                    << ", OmniSci column: " << (*column_it)->columnName
                    << ", Parquet file: " << file_path << ".";
      throw std::runtime_error(error_message.str());
    }

    auto fragment_size = schema.getForeignTable()->maxFragRows;
    int64_t max_row_group_size = 0;
    int max_row_group_index = 0;
    for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
      auto group_metadata = file_metadata->RowGroup(r);
      auto num_rows = group_metadata->num_rows();
      if (num_rows > max_row_group_size) {
        max_row_group_size = num_rows;
        max_row_group_index = r;
      }

      auto column_chunk = group_metadata->ColumnChunk(i);
      bool contains_metadata = column_chunk->is_stats_set();
      if (contains_metadata) {
        auto stats = column_chunk->statistics();
        bool is_all_nulls = stats->null_count() == column_chunk->num_values();
        bool is_list = is_valid_parquet_list_column(file_metadata->schema()->Column(i));
        // Given a list, it is possible it has no min or max if it is comprised
        // only of empty lists & nulls. This can not be detected by comparing
        // the null count; therefore we afford list types the benefit of the
        // doubt in this situation.
        if (!(stats->HasMinMax() || is_all_nulls || is_list)) {
          contains_metadata = false;
        }
      }

      if (!contains_metadata) {
        throw_missing_metadata_error(r, i, file_path);
      }
    }

    if (max_row_group_size > fragment_size) {
      throw_row_group_larger_than_fragment_size_error(
          max_row_group_index, max_row_group_size, fragment_size, file_path);
    }
  }
}

void validate_parquet_metadata(
    const std::shared_ptr<parquet::FileMetaData>& file_metadata,
    const std::string& file_path,
    const ForeignTableSchema& schema) {
  validate_number_of_columns(file_metadata, file_path, schema);
  validate_column_mapping_and_row_group_metadata(file_metadata, file_path, schema);
}

std::list<RowGroupMetadata> metadata_scan_rowgroup_interval(
    const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
    const RowGroupInterval& row_group_interval,
    const ReaderPtr& reader,
    const ForeignTableSchema& schema) {
  std::list<RowGroupMetadata> row_group_metadata;
  auto column_interval =
      Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
                           schema.getLogicalAndPhysicalColumns().back()->columnId};

  auto file_metadata = reader->parquet_reader()->metadata();
  for (int row_group = row_group_interval.start_index;
       row_group <= row_group_interval.end_index;
       ++row_group) {
    auto& row_group_metadata_item = row_group_metadata.emplace_back();
    row_group_metadata_item.row_group_index = row_group;
    row_group_metadata_item.file_path = row_group_interval.file_path;

    std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
        file_metadata->RowGroup(row_group);

    for (int column_id = column_interval.start; column_id <= column_interval.end;
         column_id++) {
      const auto column_descriptor = schema.getColumnDescriptor(column_id);
      auto parquet_column_index = schema.getParquetColumnIndex(column_id);
      auto encoder_map_iter =
          encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
      CHECK(encoder_map_iter != encoder_map.end());
      try {
        auto metadata = encoder_map_iter->second->getRowGroupMetadata(
            group_metadata.get(), parquet_column_index, column_descriptor->columnType);
        row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
      } catch (const std::exception& e) {
        std::stringstream error_message;
        error_message << e.what() << " in row group " << row_group
                      << " of Parquet file '" << row_group_interval.file_path << "'.";
        throw std::runtime_error(error_message.str());
      }
    }
  }
  return row_group_metadata;
}

std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_import(
    const std::map<int, Chunk_NS::Chunk> chunks,
    const ForeignTableSchema& schema,
    const ReaderPtr& reader,
    const std::map<int, StringDictionary*> column_dictionaries) {
  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
  auto file_metadata = reader->parquet_reader()->metadata();
  for (auto& [column_id, chunk] : chunks) {
    const auto column_descriptor = schema.getColumnDescriptor(column_id);
    if (column_descriptor->isGeoPhyCol) {  // skip physical columns
      continue;
    }
    auto parquet_column_descriptor =
        file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
    auto find_it = column_dictionaries.find(column_id);
    StringDictionary* dictionary =
        (find_it == column_dictionaries.end() ? nullptr : find_it->second);
    std::list<Chunk_NS::Chunk> chunks_for_import;
    chunks_for_import.push_back(chunk);
    if (column_descriptor->columnType.is_geometry()) {
      for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
        chunks_for_import.push_back(chunks.at(column_id + i + 1));
      }
    }
    encoder_map[column_id] = create_parquet_encoder_for_import(
        chunks_for_import, column_descriptor, parquet_column_descriptor, dictionary);
  }
  return encoder_map;
}

std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map_for_metadata_scan(
    const Interval<ColumnType>& column_interval,
    const ForeignTableSchema& schema,
    const ReaderPtr& reader) {
  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
  auto file_metadata = reader->parquet_reader()->metadata();
  for (int column_id = column_interval.start; column_id <= column_interval.end;
       column_id++) {
    const auto column_descriptor = schema.getColumnDescriptor(column_id);
    auto parquet_column_descriptor =
        file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
    encoder_map[column_id] = create_parquet_encoder_for_metadata_scan(
        column_descriptor, parquet_column_descriptor);
    column_id += column_descriptor->columnType.get_physical_cols();
  }
  return encoder_map;
}
}  // namespace
1536 
1537 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroups(
1538  const std::vector<RowGroupInterval>& row_group_intervals,
1539  const int parquet_column_index,
1540  const ColumnDescriptor* column_descriptor,
1541  std::list<Chunk_NS::Chunk>& chunks,
1542  StringDictionary* string_dictionary) {
1543  auto timer = DEBUG_TIMER(__func__);
1544  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1545  // `def_levels` and `rep_levels` below are used to store the read definition
1546  // and repetition levels of the Dremel encoding implemented by the Parquet
1547  // format
1548  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1549  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1550  std::vector<int8_t> values;
1551 
1552  CHECK(!row_group_intervals.empty());
1553  const auto& first_file_path = row_group_intervals.front().file_path;
1554 
1555  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1556  auto first_parquet_column_descriptor =
1557  get_column_descriptor(first_file_reader, parquet_column_index);
1558  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1559  auto encoder = create_parquet_encoder(column_descriptor,
1560  first_parquet_column_descriptor,
1561  chunks,
1562  string_dictionary,
1563  chunk_metadata);
1564  CHECK(encoder.get());
1565 
1566  for (const auto& row_group_interval : row_group_intervals) {
1567  const auto& file_path = row_group_interval.file_path;
1568  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1569 
1570  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1571  CHECK(row_group_interval.start_index >= 0 &&
1572  row_group_interval.end_index < num_row_groups);
1573  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1574 
1575  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1576  auto parquet_column_descriptor =
1577  get_column_descriptor(file_reader, parquet_column_index);
1578  validate_equal_column_descriptor(first_parquet_column_descriptor,
1579  parquet_column_descriptor,
1580  first_file_path,
1581  file_path);
1582 
1583  validate_max_repetition_and_definition_level(column_descriptor,
1584  parquet_column_descriptor);
1585  int64_t values_read = 0;
1586  for (int row_group_index = row_group_interval.start_index;
1587  row_group_index <= row_group_interval.end_index;
1588  ++row_group_index) {
1589  auto group_reader = parquet_reader->RowGroup(row_group_index);
1590  std::shared_ptr<parquet::ColumnReader> col_reader =
1591  group_reader->Column(parquet_column_index);
1592 
1593  try {
1594  while (col_reader->HasNext()) {
1595  int64_t levels_read =
1596  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1597  def_levels.data(),
1598  rep_levels.data(),
1599  reinterpret_cast<uint8_t*>(values.data()),
1600  &values_read,
1601  col_reader.get());
1602 
1603  validate_definition_levels(parquet_reader,
1604  row_group_index,
1605  parquet_column_index,
1606  def_levels.data(),
1607  levels_read,
1608  parquet_column_descriptor);
1609 
1610  encoder->appendData(def_levels.data(),
1611  rep_levels.data(),
1612  values_read,
1613  levels_read,
1614  values.data());
1615  }
1616  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1617  array_encoder->finalizeRowGroup();
1618  }
1619  } catch (const std::exception& error) {
1620  throw ForeignStorageException(
1621  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1622  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1623  "', Parquet file: '" + file_path + "'");
1624  }
1625  }
1626  }
1627  return chunk_metadata;
1628 }
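// Usage sketch (illustrative; the file path is hypothetical): to append row
// groups 0 through 2 of one file into the chunk, a caller would pass
//   appendRowGroups({RowGroupInterval{"/data/t.parquet", 0, 2}},
//                   parquet_column_index, column_descriptor, chunks, dict);
// A single encoder instance spans the whole interval, so the returned chunk
// metadata accumulates statistics across all three row groups.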
1629 
1630 bool LazyParquetChunkLoader::isColumnMappingSupported(
1631  const ColumnDescriptor* omnisci_column,
1632  const parquet::ColumnDescriptor* parquet_column) {
1633  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1634  return true;
1635  }
1636  if (validate_array_mapping(omnisci_column, parquet_column)) {
1637  return true;
1638  }
1639  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1640  return true;
1641  }
1642  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1643  return true;
1644  }
1645  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1646  return true;
1647  }
1648  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1649  return true;
1650  }
1651  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1652  return true;
1653  }
1654  if (validate_time_mapping(omnisci_column, parquet_column)) {
1655  return true;
1656  }
1657  if (validate_date_mapping(omnisci_column, parquet_column)) {
1658  return true;
1659  }
1660  if (validate_string_mapping(omnisci_column, parquet_column)) {
1661  return true;
1662  }
1663  return false;
1664 }
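// Editorial note (not in the original source): a Parquet column is considered
// mappable to an OmniSci column if any one of the validators above accepts
// the pair; the validators are tried in a fixed order, the first match wins,
// and unsupported pairings fall through to the final `false`.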
1665 
1666 LazyParquetChunkLoader::LazyParquetChunkLoader(
1667  std::shared_ptr<arrow::fs::FileSystem> file_system,
1668  FileReaderMap* file_map)
1669  : file_system_(file_system), file_reader_cache_(file_map) {}
1670 
1671 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
1672  const std::vector<RowGroupInterval>& row_group_intervals,
1673  const int parquet_column_index,
1674  std::list<Chunk_NS::Chunk>& chunks,
1675  StringDictionary* string_dictionary) {
1676  CHECK(!chunks.empty());
1677  auto const& chunk = *chunks.begin();
1678  auto column_descriptor = chunk.getColumnDesc();
1679  auto buffer = chunk.getBuffer();
1680  CHECK(buffer);
1681 
1682  try {
1683  auto metadata = appendRowGroups(row_group_intervals,
1684  parquet_column_index,
1685  column_descriptor,
1686  chunks,
1687  string_dictionary);
1688  return metadata;
1689  } catch (const std::exception& error) {
1690  throw ForeignStorageException(error.what());
1691  }
1692 
1693  return {};
1694 }
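// Usage sketch (illustrative; names and the file path are hypothetical):
//
//   LazyParquetChunkLoader loader(file_system, file_reader_cache);
//   auto chunk_metadata = loader.loadChunk(
//       {RowGroupInterval{"/data/t.parquet", 0, 0}},  // one row group
//       parquet_column_index,
//       chunks,             // first chunk wraps the destination buffer
//       string_dictionary); // may be nullptr for non-dictionary columns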
1695 
1696 struct ParquetBatchData {
1697  ParquetBatchData()
1698  : def_levels(LazyParquetChunkLoader::batch_reader_num_elements)
1699  , rep_levels(LazyParquetChunkLoader::batch_reader_num_elements) {}
1700  std::vector<int16_t> def_levels;
1701  std::vector<int16_t> rep_levels;
1702  std::vector<int8_t> values;
1703  int64_t values_read;
1704  int64_t levels_read;
1705 };
1706 
1707 class ParquetRowGroupReader {
1708  public:
1709  ParquetRowGroupReader(std::shared_ptr<parquet::ColumnReader> col_reader,
1710  const ColumnDescriptor* column_descriptor,
1711  const parquet::ColumnDescriptor* parquet_column_descriptor,
1712  ParquetEncoder* encoder,
1713  InvalidRowGroupIndices& invalid_indices,
1714  const int row_group_index,
1715  const int parquet_column_index,
1716  const parquet::ParquetFileReader* parquet_reader)
1717  : col_reader_(col_reader)
1718  , column_descriptor_(column_descriptor)
1719  , parquet_column_descriptor_(parquet_column_descriptor)
1720  , encoder_(encoder)
1721  , invalid_indices_(invalid_indices)
1722  , row_group_index_(row_group_index)
1723  , parquet_column_index_(parquet_column_index)
1724  , parquet_reader_(parquet_reader) {
1725  import_encoder = dynamic_cast<ParquetImportEncoder*>(encoder);
1726  CHECK(import_encoder);
1727  }
1728 
1729  void readAndValidateRowGroup() {
1730  while (col_reader_->HasNext()) {
1731  ParquetBatchData batch_data;
1732  resize_values_buffer(
1733  column_descriptor_, parquet_column_descriptor_, batch_data.values);
1734  batch_data.levels_read =
1735  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1736  batch_data.def_levels.data(),
1737  batch_data.rep_levels.data(),
1738  reinterpret_cast<uint8_t*>(batch_data.values.data()),
1739  &batch_data.values_read,
1740  col_reader_.get());
1741 
1742  const SQLTypeInfo& column_type =
1743  column_descriptor_->columnType;
1744  validate_definition_levels(parquet_reader_,
1745  row_group_index_,
1746  parquet_column_index_,
1747  batch_data.def_levels.data(),
1748  batch_data.levels_read,
1749  parquet_column_descriptor_);
1750  import_encoder->validateAndAppendData(batch_data.def_levels.data(),
1751  batch_data.rep_levels.data(),
1752  batch_data.values_read,
1753  batch_data.levels_read,
1754  batch_data.values.data(),
1755  column_type,
1756  invalid_indices_);
1757  }
1758  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder_)) {
1759  array_encoder->finalizeRowGroup();
1760  }
1761  }
1762 
1763  void eraseInvalidRowGroupData() {
1764  import_encoder->eraseInvalidIndicesInBuffer(invalid_indices_);
1765  }
1766 
1767  private:
1768  std::shared_ptr<parquet::ColumnReader> col_reader_;
1769  const ColumnDescriptor* column_descriptor_;
1770  const parquet::ColumnDescriptor* parquet_column_descriptor_;
1771  ParquetEncoder* encoder_;
1772  ParquetImportEncoder* import_encoder;
1773  InvalidRowGroupIndices& invalid_indices_;
1774  const int row_group_index_;
1775  const int parquet_column_index_;
1776  const parquet::ParquetFileReader* parquet_reader_;
1777 };
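// Editorial note (not in the original source): ParquetRowGroupReader is used
// in two phases by loadRowGroups below. readAndValidateRowGroup() appends a
// column's data while recording failing row indices in the shared
// invalid_indices_ set; eraseInvalidRowGroupData() then compacts each
// column's buffer, so a row rejected in any one column is dropped from all
// columns of the row group.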
1778 
1779 std::pair<size_t, size_t> LazyParquetChunkLoader::loadRowGroups(
1780  const RowGroupInterval& row_group_interval,
1781  const std::map<int, Chunk_NS::Chunk>& chunks,
1782  const ForeignTableSchema& schema,
1783  const std::map<int, StringDictionary*>& column_dictionaries) {
1784  auto timer = DEBUG_TIMER(__func__);
1785 
1786  const auto& file_path = row_group_interval.file_path;
1787  auto file_reader = file_reader_cache_->insert(file_path, file_system_);
1788  auto file_metadata = file_reader->parquet_reader()->metadata();
1789 
1790  validate_number_of_columns(file_metadata, file_path, schema);
1791 
1792  // check for unsupported Parquet-to-OmniSci column mappings (such as fixed
1793  // length encoded columns) and indicate to the user they should not be used
1794  for (const auto column_descriptor : schema.getLogicalColumns()) {
1795  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
1796  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
1797  try {
1798  validate_allowed_mapping(parquet_column, column_descriptor);
1799  } catch (std::runtime_error& e) {
1800  std::stringstream error_message;
1801  error_message << e.what() << " Parquet column: " << parquet_column->name()
1802  << ", OmniSci column: " << column_descriptor->columnName
1803  << ", Parquet file: " << file_path << ".";
1804  throw std::runtime_error(error_message.str());
1805  }
1806  }
1807 
1808  auto encoder_map =
1809  populate_encoder_map_for_import(chunks, schema, file_reader, column_dictionaries);
1810 
1811  CHECK(row_group_interval.start_index == row_group_interval.end_index);
1812  auto row_group_index = row_group_interval.start_index;
1813  std::map<int, ParquetRowGroupReader> row_group_reader_map;
1814 
1815  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1816  auto group_reader = parquet_reader->RowGroup(row_group_index);
1817  InvalidRowGroupIndices invalid_indices;
1818  for (auto& [column_id, encoder] : encoder_map) {
1819  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
1820  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1821  auto parquet_column_descriptor =
1822  file_metadata->schema()->Column(parquet_column_index);
1823 
1824  // validate bounds and repetition/definition levels before reading
1825  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1826  CHECK(row_group_interval.start_index >= 0 &&
1827  row_group_interval.end_index < num_row_groups);
1828  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1829  validate_max_repetition_and_definition_level(column_descriptor,
1830  parquet_column_descriptor);
1831 
1832  std::shared_ptr<parquet::ColumnReader> col_reader =
1833  group_reader->Column(parquet_column_index);
1834 
1835  row_group_reader_map.insert({column_id,
1836  ParquetRowGroupReader(col_reader,
1837  column_descriptor,
1838  parquet_column_descriptor,
1839  encoder_map[column_id].get(),
1840  invalid_indices,
1841  row_group_index,
1842  parquet_column_index,
1843  parquet_reader)});
1844  }
1845 
1846  for (auto& [_, reader] : row_group_reader_map) {
1847  reader.readAndValidateRowGroup(); // reads and validates entire row group per column
1848  }
1849 
1850  for (auto& [_, reader] : row_group_reader_map) {
1851  reader.eraseInvalidRowGroupData(); // removes invalid encoded data in buffers
1852  }
1853 
1854  // update the element count for each encoder
1855  for (const auto column_descriptor : schema.getLogicalColumns()) {
1856  auto column_id = column_descriptor->columnId;
1857  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
1858  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
1859  invalid_indices.size());
1860  size_t updated_num_elems = db_encoder->getNumElems() +
1861  group_reader->metadata()->num_rows() -
1862  invalid_indices.size();
1863  db_encoder->setNumElems(updated_num_elems);
1864  if (column_descriptor->columnType.is_geometry()) {
1865  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1866  auto db_encoder =
1867  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
1868  db_encoder->setNumElems(updated_num_elems);
1869  }
1870  }
1871  }
1872 
1873  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
1874  invalid_indices.size()};
1875 }
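// Editorial note (not in the original source): the returned pair is
// {rows successfully imported, rows rejected} for the single row group
// loaded; e.g. a 1000-row row group with 3 rows failing validation yields
// {997, 3}.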
1876 
1877 std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
1878  const std::vector<std::string>& file_paths,
1879  const ForeignTableSchema& schema) {
1880  auto timer = DEBUG_TIMER(__func__);
1881  auto column_interval =
1882  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1883  schema.getLogicalAndPhysicalColumns().back()->columnId};
1884  CHECK(!file_paths.empty());
1885 
1886  // The encoder map needs to be populated before we can start scanning row groups,
1887  // so we peel the first file_path out of the async loop below and use it to populate the map.
1888  const auto& first_path = *file_paths.begin();
1889  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
1890  validate_parquet_metadata(
1891  first_reader->parquet_reader()->metadata(), first_path, schema);
1892  auto encoder_map =
1893  populate_encoder_map_for_metadata_scan(column_interval, schema, first_reader);
1894  const auto num_row_groups = get_parquet_table_size(first_reader).first;
1895  auto row_group_metadata = metadata_scan_rowgroup_interval(
1896  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
1897 
1898  // We want each (filepath->FileReader) pair in the cache to be initialized
1899  // before we multithread so that we are not adding keys in a concurrent
1900  // environment. To that end, we add a cache entry for each path, initialized
1901  // to an empty unique_ptr if the file has not yet been opened.
1902  // Since we have already performed the first iteration, we skip it in the
1903  // thread groups so as not to process it twice.
1904  std::vector<std::string> cache_subset;
1905  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
1906  file_reader_cache_->initializeIfEmpty(*path_it);
1907  cache_subset.emplace_back(*path_it);
1908  }
1909 
1910  // Iterate asynchronously over any paths beyond the first.
1911  auto paths_per_thread = partition_for_threads(cache_subset, g_max_import_threads);
1912  std::vector<std::future<std::list<RowGroupMetadata>>> futures;
1913  for (const auto& path_group : paths_per_thread) {
1914  futures.emplace_back(std::async(
1915  std::launch::async,
1916  [&](const auto& paths, const auto& file_reader_cache) {
1917  std::list<RowGroupMetadata> reduced_metadata;
1918  for (const auto& path : paths.get()) {
1919  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
1920  validate_equal_schema(first_reader, reader, first_path, path);
1921  validate_parquet_metadata(reader->parquet_reader()->metadata(), path, schema);
1922  const auto num_row_groups = get_parquet_table_size(reader).first;
1923  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
1924  reduced_metadata.splice(
1925  reduced_metadata.end(),
1926  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
1927  }
1928  return reduced_metadata;
1929  },
1930  std::ref(path_group),
1931  std::ref(*file_reader_cache_)));
1932  }
1933 
1934  // Reduce all the row_group results.
1935  for (auto& future : futures) {
1936  row_group_metadata.splice(row_group_metadata.end(), future.get());
1937  }
1938  return row_group_metadata;
1939 }
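// Usage sketch (illustrative; the file paths are hypothetical):
//
//   LazyParquetChunkLoader loader(file_system, file_reader_cache);
//   auto row_group_metadata =
//       loader.metadataScan({"/data/a.parquet", "/data/b.parquet"}, schema);
//
// All files must share the schema of the first file; each returned
// RowGroupMetadata entry describes one row group of one file.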
1940 
1941 } // namespace foreign_storage