LazyParquetChunkLoader.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LazyParquetChunkLoader.h"
18 
19 #include <arrow/api.h>
20 #include <arrow/io/api.h>
21 #include <parquet/arrow/reader.h>
22 #include <parquet/column_scanner.h>
23 #include <parquet/exception.h>
24 #include <parquet/platform.h>
25 #include <parquet/statistics.h>
26 #include <parquet/types.h>
27 
28 #include "ParquetDateInDaysFromTimestampEncoder.h"
29 #include "ParquetDateInSecondsEncoder.h"
31 #include "ParquetDecimalEncoder.h"
32 #include "ParquetFixedLengthArrayEncoder.h"
33 #include "ParquetFixedLengthEncoder.h"
34 #include "ParquetGeospatialEncoder.h"
35 #include "ParquetStringEncoder.h"
36 #include "ParquetStringNoneEncoder.h"
37 #include "ParquetTimeEncoder.h"
38 #include "ParquetTimestampEncoder.h"
39 #include "ParquetVariableLengthArrayEncoder.h"
40 
41 namespace foreign_storage {
42 
43 namespace {
44 
45 bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
46  return (parquet_column->logical_type()->is_none() &&
47  parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
48  parquet_column->logical_type()->is_string();
49 }
50 
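// Illustrative sketch, not part of the original source: the two schema shapes
// accepted by is_valid_parquet_string() above, written in Parquet message
// syntax. Both columns are backed by the BYTE_ARRAY physical type; only the
// second carries the String logical annotation.
//
//   message hypothetical_example {
//     optional binary raw_col;             // no logical type (NONE)
//     optional binary str_col (String);    // String logical type
//   }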
87 bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
88  const parquet::schema::Node* node = parquet_column->schema_node().get();
89  if ((node->name() != "element" && node->name() != "item") ||
90  !(node->is_required() ||
91  node->is_optional())) { // ensure the first (innermost) node is named
92  // "element", as the parquet specification requires;
93  // testing shows, however, that pyarrow generates
94  // this column with the name "item"; in either case
95  // the field must be either required or optional
96  return false;
97  }
98  node = node->parent();
99  if (!node) { // required nested structure
100  return false;
101  }
102  if (node->name() != "list" || !node->is_repeated() ||
103  !node->is_group()) { // ensure second innermost node is named "list" which is
104  // a repeated group; this is
105  // required by the parquet specification
106  return false;
107  }
108  node = node->parent();
109  if (!node) { // required nested structure
110  return false;
111  }
112  if (!node->logical_type()->is_list() ||
113  !(node->is_optional() ||
114  node->is_required())) { // ensure the third (outermost) node has logical
115  // type LIST and is either optional or required;
116  // this is required by the parquet specification
117  return false;
118  }
119  node =
120  node->parent(); // this must now be the root node of schema which is required by
121  // FSI (lists can not be embedded into a deeper nested structure)
122  if (!node) { // required nested structure
123  return false;
124  }
125  node = node->parent();
126  if (node) { // implies the previous node was not the root node
127  return false;
128  }
129  return true;
130 }
131 
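// For reference, a minimal sketch of the three-level LIST structure that the
// traversal above checks from the innermost node outward, per the Parquet
// specification (pyarrow may name the innermost field "item" instead of
// "element"):
//
//   optional group my_list (LIST) {   // third node: LIST-annotated group
//     repeated group list {           // second node: repeated group "list"
//       optional int32 element;       // first node: "element" (or "item")
//     }
//   }
//
// The two trailing parent() calls confirm that the LIST group hangs directly
// off the schema root, since FSI does not support deeper nesting.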
132 template <typename V>
133 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
134  const ColumnDescriptor* column_descriptor,
135  const parquet::ColumnDescriptor* parquet_column_descriptor,
136  AbstractBuffer* buffer) {
137  switch (parquet_column_descriptor->physical_type()) {
138  case parquet::Type::INT32:
139  return std::make_shared<ParquetDecimalEncoder<V, int32_t>>(
140  buffer, column_descriptor, parquet_column_descriptor);
141  case parquet::Type::INT64:
142  return std::make_shared<ParquetDecimalEncoder<V, int64_t>>(
143  buffer, column_descriptor, parquet_column_descriptor);
144  case parquet::Type::FIXED_LEN_BYTE_ARRAY:
145  return std::make_shared<ParquetDecimalEncoder<V, parquet::FixedLenByteArray>>(
146  buffer, column_descriptor, parquet_column_descriptor);
147  case parquet::Type::BYTE_ARRAY:
148  return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray>>(
149  buffer, column_descriptor, parquet_column_descriptor);
150  default:
151  UNREACHABLE();
152  }
153  return {};
154 }
155 
156 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
157  const ColumnDescriptor* omnisci_column,
158  const parquet::ColumnDescriptor* parquet_column,
159  AbstractBuffer* buffer,
160  const bool is_metadata_scan) {
161  if (parquet_column->logical_type()->is_decimal()) {
162  if (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
163  is_metadata_scan) {
164  return create_parquet_decimal_encoder_with_omnisci_type<int64_t>(
165  omnisci_column, parquet_column, buffer);
166  } else if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
167  switch (omnisci_column->columnType.get_comp_param()) {
168  case 16:
169  return create_parquet_decimal_encoder_with_omnisci_type<int16_t>(
170  omnisci_column, parquet_column, buffer);
171  case 32:
172  return create_parquet_decimal_encoder_with_omnisci_type<int32_t>(
173  omnisci_column, parquet_column, buffer);
174  default:
175  UNREACHABLE();
176  }
177  } else {
178  UNREACHABLE();
179  }
180  }
181  return {};
182 }
183 
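// A minimal usage sketch under assumed inputs (hypothetical descriptors `cd`
// and `pq_cd` and buffer `buf`, not from the original source): a DECIMAL
// column with FIXED ENCODING (16) selects the int16_t OmniSci value type,
// while a metadata scan always widens to int64_t so statistics can be
// decoded uniformly.
//
//   auto encoder = create_parquet_decimal_encoder(cd, pq_cd, buf,
//                                                 /*is_metadata_scan=*/false);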
198 template <typename V, typename T, typename U>
199 std::shared_ptr<ParquetEncoder>
200 create_parquet_signed_or_unsigned_integral_encoder_with_types(
201  AbstractBuffer* buffer,
202  const size_t omnisci_data_type_byte_size,
203  const size_t parquet_data_type_byte_size,
204  const bool is_signed) {
205  if (is_signed) {
206  return std::make_shared<ParquetFixedLengthEncoder<V, T>>(
207  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
208  } else {
209  return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U>>(
210  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
211  }
212 }
213 
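// Sketch of the distinction made above: a signed mapping needs only the
// OmniSci type V and the Parquet type T, while an unsigned mapping also
// threads through the unsigned intermediate type U so values widen without
// sign extension. An assumed instantiation, mirroring the case 8 branch of
// the switch below:
//
//   create_parquet_signed_or_unsigned_integral_encoder_with_types<
//       int16_t /*V*/, int32_t /*T*/, uint8_t /*U*/>(
//       buffer, /*omnisci_size=*/2, /*parquet_size=*/4, /*is_signed=*/false);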
233 template <typename V>
234 std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder_with_omnisci_type(
235  AbstractBuffer* buffer,
236  const size_t omnisci_data_type_byte_size,
237  const size_t parquet_data_type_byte_size,
238  const int bit_width,
239  const bool is_signed) {
240  switch (bit_width) {
241  case 8:
242  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
243  int32_t,
244  uint8_t>(
245  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
246  case 16:
247  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
248  int32_t,
249  uint16_t>(
250  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
251  case 32:
252  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
253  int32_t,
254  uint32_t>(
255  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
256  case 64:
257  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
258  int64_t,
259  uint64_t>(
260  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
261  default:
262  UNREACHABLE();
263  }
264  return {};
265 }
266 
267 std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
268  const ColumnDescriptor* omnisci_column,
269  const parquet::ColumnDescriptor* parquet_column,
270  AbstractBuffer* buffer,
271  const bool is_metadata_scan) {
272  auto column_type = omnisci_column->columnType;
273  auto physical_type = parquet_column->physical_type();
274 
275  int bit_width = -1;
276  bool is_signed = false;
277  // handle the integral case with no Parquet annotation
278  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
279  if (physical_type == parquet::Type::INT32) {
280  bit_width = 32;
281  } else if (physical_type == parquet::Type::INT64) {
282  bit_width = 64;
283  } else {
284  UNREACHABLE();
285  }
286  is_signed = true;
287  }
288  // handle the integral case with Parquet annotation
289  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
290  parquet_column->logical_type().get())) {
291  bit_width = int_logical_column->bit_width();
292  is_signed = int_logical_column->is_signed();
293  }
294 
295  if (bit_width == -1) { // no valid logical type (with or without annotation) found
296  return {};
297  }
298 
299  const size_t omnisci_data_type_byte_size = column_type.get_size();
300  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);
301 
302  switch (omnisci_data_type_byte_size) {
303  case 8:
304  CHECK(column_type.get_compression() == kENCODING_NONE);
305  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
306  buffer,
307  omnisci_data_type_byte_size,
308  parquet_data_type_byte_size,
309  bit_width,
310  is_signed);
311  case 4:
312  if (is_metadata_scan && column_type.get_type() == kBIGINT) {
313  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
314  buffer,
315  omnisci_data_type_byte_size,
316  parquet_data_type_byte_size,
317  bit_width,
318  is_signed);
319  }
320  return create_parquet_integral_encoder_with_omnisci_type<int32_t>(
321  buffer,
322  omnisci_data_type_byte_size,
323  parquet_data_type_byte_size,
324  bit_width,
325  is_signed);
326  case 2:
327  if (is_metadata_scan) {
328  switch (column_type.get_type()) {
329  case kBIGINT:
330  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
331  buffer,
332  omnisci_data_type_byte_size,
333  parquet_data_type_byte_size,
334  bit_width,
335  is_signed);
336  case kINT:
337  return create_parquet_integral_encoder_with_omnisci_type<int32_t>(
338  buffer,
339  omnisci_data_type_byte_size,
340  parquet_data_type_byte_size,
341  bit_width,
342  is_signed);
343  case kSMALLINT:
344  break;
345  default:
346  UNREACHABLE();
347  }
348  }
349  return create_parquet_integral_encoder_with_omnisci_type<int16_t>(
350  buffer,
351  omnisci_data_type_byte_size,
352  parquet_data_type_byte_size,
353  bit_width,
354  is_signed);
355  case 1:
356  if (is_metadata_scan) {
357  switch (column_type.get_type()) {
358  case kBIGINT:
359  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
360  buffer,
361  omnisci_data_type_byte_size,
362  parquet_data_type_byte_size,
363  bit_width,
364  is_signed);
365  case kINT:
366  return create_parquet_integral_encoder_with_omnisci_type<int32_t>(
367  buffer,
368  omnisci_data_type_byte_size,
369  parquet_data_type_byte_size,
370  bit_width,
371  is_signed);
372  case kSMALLINT:
373  return create_parquet_integral_encoder_with_omnisci_type<int16_t>(
374  buffer,
375  omnisci_data_type_byte_size,
376  parquet_data_type_byte_size,
377  bit_width,
378  is_signed);
379  case kTINYINT:
380  break;
381  default:
382  UNREACHABLE();
383  }
384  }
385  return create_parquet_integral_encoder_with_omnisci_type<int8_t>(
386  buffer,
387  omnisci_data_type_byte_size,
388  parquet_data_type_byte_size,
389  bit_width,
390  is_signed);
391  default:
392  UNREACHABLE();
393  }
394  return {};
395 }
396 
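// Worked example of the dispatch above: a Parquet column annotated
// IntLogicalType(bit_width=16, is_signed=true) read into a 2-byte OmniSci
// SMALLINT takes the `case 2` branch and yields a
// ParquetFixedLengthEncoder<int16_t, int32_t>, because Parquet backs 8- and
// 16-bit integers with the INT32 physical type.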
397 std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
398  const ColumnDescriptor* omnisci_column,
399  const parquet::ColumnDescriptor* parquet_column,
400  AbstractBuffer* buffer) {
401  auto column_type = omnisci_column->columnType;
402  if (!column_type.is_fp()) {
403  return {};
404  }
405  CHECK_EQ(column_type.get_compression(), kENCODING_NONE);
406  switch (column_type.get_type()) {
407  case kFLOAT:
408  switch (parquet_column->physical_type()) {
409  case parquet::Type::FLOAT:
410  return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
411  buffer, omnisci_column, parquet_column);
412  case parquet::Type::DOUBLE:
413  return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
414  buffer, omnisci_column, parquet_column);
415  default:
416  UNREACHABLE();
417  }
418  case kDOUBLE:
419  CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
420  return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
421  buffer, omnisci_column, parquet_column);
422  default:
423  UNREACHABLE();
424  }
425  return {};
426 }
427 
428 std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
429  const ColumnDescriptor* omnisci_column,
430  const parquet::ColumnDescriptor* parquet_column,
431  AbstractBuffer* buffer) {
432  auto column_type = omnisci_column->columnType;
433  if (parquet_column->logical_type()->is_none() &&
434  !omnisci_column->columnType.is_string()) { // boolean
435  if (column_type.get_compression() == kENCODING_NONE) {
436  switch (column_type.get_type()) {
437  case kBOOLEAN:
438  return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
439  buffer, omnisci_column, parquet_column);
440  default:
441  UNREACHABLE();
442  }
443  } else {
444  UNREACHABLE();
445  }
446  }
447  return {};
448 }
449 
450 template <typename V, typename T>
451 std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
452  const ColumnDescriptor* omnisci_column,
453  const parquet::ColumnDescriptor* parquet_column,
454  AbstractBuffer* buffer) {
455  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
456  parquet_column->logical_type().get())) {
457  switch (timestamp_logical_type->time_unit()) {
458  case parquet::LogicalType::TimeUnit::MILLIS:
459  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L>>(
460  buffer, omnisci_column, parquet_column);
461  case parquet::LogicalType::TimeUnit::MICROS:
462  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L>>(
463  buffer, omnisci_column, parquet_column);
464  case parquet::LogicalType::TimeUnit::NANOS:
465  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L * 1000L>>(
466  buffer, omnisci_column, parquet_column);
467  default:
468  UNREACHABLE();
469  }
470  } else {
471  UNREACHABLE();
472  }
473  return {};
474 }
475 
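// The third template argument above appears to serve as the denominator that
// converts raw Parquet ticks to seconds; e.g. for a MICROS-annotated column,
// a raw value of 1'600'000'000'000'000 microseconds corresponds to
// 1'600'000'000 seconds, which is what a dimension-0 TIMESTAMP column stores.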
476 template <typename V, typename T>
477 std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder_with_types(
478  const ColumnDescriptor* omnisci_column,
479  const parquet::ColumnDescriptor* parquet_column,
480  AbstractBuffer* buffer) {
481  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
482  parquet_column->logical_type().get())) {
483  switch (timestamp_logical_type->time_unit()) {
484  case parquet::LogicalType::TimeUnit::MILLIS:
485  return std::make_shared<
486  ParquetDateInDaysFromTimestampEncoder<V, T, 1000L>>(
487  buffer, omnisci_column, parquet_column);
488  case parquet::LogicalType::TimeUnit::MICROS:
489  return std::make_shared<
490  ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L>>(
491  buffer, omnisci_column, parquet_column);
492  case parquet::LogicalType::TimeUnit::NANOS:
493  return std::make_shared<
494  ParquetDateInDaysFromTimestampEncoder<V, T, 1000L * 1000L * 1000L>>(
495  buffer, omnisci_column, parquet_column);
496  default:
497  UNREACHABLE();
498  }
499  } else {
500  UNREACHABLE();
501  }
502  return {};
503 }
504 
505 std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
506  const ColumnDescriptor* omnisci_column,
507  const parquet::ColumnDescriptor* parquet_column,
508  AbstractBuffer* buffer,
509  const bool is_metadata_scan) {
510  auto column_type = omnisci_column->columnType;
511  auto precision = column_type.get_precision();
512  if (parquet_column->logical_type()->is_timestamp()) {
513  if (column_type.get_compression() == kENCODING_NONE) {
514  if (precision == 0) {
515  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t>(
516  omnisci_column, parquet_column, buffer);
517  } else {
518  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
519  buffer, omnisci_column, parquet_column);
520  }
521  } else if (column_type.get_compression() == kENCODING_FIXED) {
522  CHECK(column_type.get_comp_param() == 32);
523  if (is_metadata_scan) {
524  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t>(
525  omnisci_column, parquet_column, buffer);
526  } else {
527  return create_parquet_timestamp_encoder_with_types<int32_t, int64_t>(
528  omnisci_column, parquet_column, buffer);
529  }
530  }
531  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
532  if (parquet_column->physical_type() == parquet::Type::INT32) {
533  CHECK(column_type.get_compression() == kENCODING_FIXED &&
534  column_type.get_comp_param() == 32);
535  if (is_metadata_scan) {
536  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t>>(
537  buffer, omnisci_column, parquet_column);
538  } else {
539  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
540  buffer, omnisci_column, parquet_column);
541  }
542  } else if (parquet_column->physical_type() == parquet::Type::INT64) {
543  if (column_type.get_compression() == kENCODING_NONE) {
544  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
545  buffer, omnisci_column, parquet_column);
546  } else if (column_type.get_compression() == kENCODING_FIXED) {
547  CHECK(column_type.get_comp_param() == 32);
548  if (is_metadata_scan) {
549  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
550  buffer, omnisci_column, parquet_column);
551  } else {
552  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t>>(
553  buffer, omnisci_column, parquet_column);
554  }
555  }
556  } else {
557  UNREACHABLE();
558  }
559  }
560  return {};
561 }
562 
563 template <typename V, typename T>
564 std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
565  const ColumnDescriptor* omnisci_column,
566  const parquet::ColumnDescriptor* parquet_column,
567  AbstractBuffer* buffer) {
568  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
569  parquet_column->logical_type().get())) {
570  switch (time_logical_type->time_unit()) {
571  case parquet::LogicalType::TimeUnit::MILLIS:
572  return std::make_shared<ParquetTimeEncoder<V, T, 1000L>>(
573  buffer, omnisci_column, parquet_column);
574  case parquet::LogicalType::TimeUnit::MICROS:
575  return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L>>(
576  buffer, omnisci_column, parquet_column);
577  case parquet::LogicalType::TimeUnit::NANOS:
578  return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L * 1000L>>(
579  buffer, omnisci_column, parquet_column);
580  default:
581  UNREACHABLE();
582  }
583  } else {
584  UNREACHABLE();
585  }
586  return {};
587 }
588 
589 std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
590  const ColumnDescriptor* omnisci_column,
591  const parquet::ColumnDescriptor* parquet_column,
592  AbstractBuffer* buffer,
593  const bool is_metadata_scan) {
594  auto column_type = omnisci_column->columnType;
595  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
596  parquet_column->logical_type().get())) {
597  if (column_type.get_compression() == kENCODING_NONE) {
598  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
599  return create_parquet_time_encoder_with_types<int64_t, int32_t>(
600  omnisci_column, parquet_column, buffer);
601  } else {
602  return create_parquet_time_encoder_with_types<int64_t, int64_t>(
603  omnisci_column, parquet_column, buffer);
604  }
605  } else if (column_type.get_compression() == kENCODING_FIXED) {
606  if (is_metadata_scan) {
607  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
608  CHECK(parquet_column->physical_type() == parquet::Type::INT32);
609  return create_parquet_time_encoder_with_types<int64_t, int32_t>(
610  omnisci_column, parquet_column, buffer);
611  } else {
612  CHECK(time_logical_column->time_unit() ==
613  parquet::LogicalType::TimeUnit::MICROS ||
614  time_logical_column->time_unit() ==
615  parquet::LogicalType::TimeUnit::NANOS);
616  CHECK(parquet_column->physical_type() == parquet::Type::INT64);
617  return create_parquet_time_encoder_with_types<int64_t, int64_t>(
618  omnisci_column, parquet_column, buffer);
619  }
620  } else {
621  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
622  CHECK(parquet_column->physical_type() == parquet::Type::INT32);
623  return create_parquet_time_encoder_with_types<int32_t, int32_t>(
624  omnisci_column, parquet_column, buffer);
625  } else {
626  CHECK(time_logical_column->time_unit() ==
627  parquet::LogicalType::TimeUnit::MICROS ||
628  time_logical_column->time_unit() ==
629  parquet::LogicalType::TimeUnit::NANOS);
630  CHECK(parquet_column->physical_type() == parquet::Type::INT64);
631  return create_parquet_time_encoder_with_types<int32_t, int64_t>(
632  omnisci_column, parquet_column, buffer);
633  }
634  }
635  } else {
636  UNREACHABLE();
637  }
638  }
639  return {};
640 }
641 
642 std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
643  const ColumnDescriptor* omnisci_column,
644  const parquet::ColumnDescriptor* parquet_column,
645  AbstractBuffer* buffer,
646  const bool is_metadata_scan) {
647  auto column_type = omnisci_column->columnType;
648  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
649  CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
650  if (is_metadata_scan) {
651  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t>(
652  omnisci_column, parquet_column, buffer);
653  } else {
654  if (column_type.get_comp_param() ==
655  0) { // DATE ENCODING FIXED (32) uses comp param 0
656  return create_parquet_date_from_timestamp_encoder_with_types<int32_t, int64_t>(
657  omnisci_column, parquet_column, buffer);
658  } else if (column_type.get_comp_param() == 16) {
659  return create_parquet_date_from_timestamp_encoder_with_types<int16_t, int64_t>(
660  omnisci_column, parquet_column, buffer);
661  } else {
662  UNREACHABLE();
663  }
664  }
665  }
666  return {};
667 }
668 
669 std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
670  const ColumnDescriptor* omnisci_column,
671  const parquet::ColumnDescriptor* parquet_column,
672  AbstractBuffer* buffer,
673  const bool is_metadata_scan) {
674  auto column_type = omnisci_column->columnType;
675  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
676  if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
677  if (is_metadata_scan) {
678  return std::make_shared<ParquetDateInSecondsEncoder>(
679  buffer, omnisci_column, parquet_column);
680  } else {
681  if (column_type.get_comp_param() ==
682  0) { // DATE ENCODING FIXED (32) uses comp param 0
683  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
684  buffer, omnisci_column, parquet_column);
685  } else if (column_type.get_comp_param() == 16) {
686  return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
687  buffer, omnisci_column, parquet_column);
688  } else {
689  UNREACHABLE();
690  }
691  }
692  } else if (column_type.get_compression() == kENCODING_NONE) { // for array types
693  return std::make_shared<ParquetDateInSecondsEncoder>(
694  buffer, omnisci_column, parquet_column);
695  } else {
696  UNREACHABLE();
697  }
698  }
699  return {};
700 }
701 
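// Note on the comp_param convention used by both date factories above: DATE
// ENCODING FIXED (32) is stored with a comp_param of 0, so the 32-bit branch
// is the `comp_param == 0` case and only the 16-bit variant identifies
// itself explicitly via comp_param == 16.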
702 std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
703  const ColumnDescriptor* omnisci_column,
704  const parquet::ColumnDescriptor* parquet_column,
705  const Chunk_NS::Chunk& chunk,
706  StringDictionary* string_dictionary,
707  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) {
708  auto column_type = omnisci_column->columnType;
709  if (!is_valid_parquet_string(parquet_column) ||
710  !omnisci_column->columnType.is_string()) {
711  return {};
712  }
713  if (column_type.get_compression() == kENCODING_NONE) {
714  return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
715  chunk.getIndexBuf());
716  } else if (column_type.get_compression() == kENCODING_DICT) {
717  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
718  auto& logical_chunk_metadata = chunk_metadata.back();
719  logical_chunk_metadata->sqlType = omnisci_column->columnType;
720  switch (column_type.get_size()) {
721  case 1:
722  return std::make_shared<ParquetStringEncoder<uint8_t>>(
723  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
724  case 2:
725  return std::make_shared<ParquetStringEncoder<uint16_t>>(
726  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
727  case 4:
728  return std::make_shared<ParquetStringEncoder<int32_t>>(
729  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
730  default:
731  UNREACHABLE();
732  }
733  } else {
734  UNREACHABLE();
735  }
736  return {};
737 }
738 
739 std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
740  const ColumnDescriptor* omnisci_column,
741  const parquet::ColumnDescriptor* parquet_column,
742  std::list<Chunk_NS::Chunk>& chunks,
743  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
744  const bool is_metadata_scan) {
745  auto column_type = omnisci_column->columnType;
746  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
747  return {};
748  }
749  if (is_metadata_scan) {
750  return std::make_shared<ParquetGeospatialEncoder>();
751  }
752  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
753  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
754  auto& chunk_metadata_ptr = chunk_metadata.back();
755  chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
756  }
757  return std::make_shared<ParquetGeospatialEncoder>(
758  parquet_column, chunks, chunk_metadata);
759 }
760 
761 // forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
762 // `create_parquet_array_encoder` each make use of each other, so
763 // one of the two functions must have a forward declaration
764 std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
765  const ColumnDescriptor* omnisci_column,
766  const parquet::ColumnDescriptor* parquet_column,
767  std::list<Chunk_NS::Chunk>& chunks,
768  StringDictionary* string_dictionary,
769  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
770  const bool is_metadata_scan);
771 
801 std::shared_ptr<ParquetEncoder> create_parquet_encoder(
802  const ColumnDescriptor* omnisci_column,
803  const parquet::ColumnDescriptor* parquet_column,
804  std::list<Chunk_NS::Chunk>& chunks,
805  StringDictionary* string_dictionary,
806  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
807  const bool is_metadata_scan = false) {
808  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
809  if (auto encoder = create_parquet_geospatial_encoder(
810  omnisci_column, parquet_column, chunks, chunk_metadata, is_metadata_scan)) {
811  return encoder;
812  }
813  if (auto encoder = create_parquet_array_encoder(omnisci_column,
814  parquet_column,
815  chunks,
816  string_dictionary,
817  chunk_metadata,
818  is_metadata_scan)) {
819  return encoder;
820  }
821  if (auto encoder = create_parquet_decimal_encoder(
822  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
823  return encoder;
824  }
825  if (auto encoder = create_parquet_integral_encoder(
826  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
827  return encoder;
828  }
829  if (auto encoder =
830  create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
831  return encoder;
832  }
833  if (auto encoder = create_parquet_timestamp_encoder(
834  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
835  return encoder;
836  }
837  if (auto encoder =
838  create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
839  return encoder;
840  }
841  if (auto encoder = create_parquet_time_encoder(
842  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
843  return encoder;
844  }
845  if (auto encoder = create_parquet_date_from_timestamp_encoder(
846  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
847  return encoder;
848  }
849  if (auto encoder = create_parquet_date_encoder(
850  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
851  return encoder;
852  }
853  if (auto encoder = create_parquet_string_encoder(
854  omnisci_column,
855  parquet_column,
856  chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
857  string_dictionary,
858  chunk_metadata)) {
859  return encoder;
860  }
861  UNREACHABLE();
862  return {};
863 }
864 
869 std::shared_ptr<ParquetEncoder> create_parquet_encoder(
870  const ColumnDescriptor* omnisci_column,
871  const parquet::ColumnDescriptor* parquet_column) {
872  std::list<Chunk_NS::Chunk> chunks;
873  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
874  return create_parquet_encoder(
875  omnisci_column, parquet_column, chunks, nullptr, chunk_metadata, true);
876 }
877 
878 std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
879  const ColumnDescriptor* omnisci_column,
880  const parquet::ColumnDescriptor* parquet_column,
881  std::list<Chunk_NS::Chunk>& chunks,
882  StringDictionary* string_dictionary,
883  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
884  const bool is_metadata_scan) {
885  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
886  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
887  return {};
888  }
889  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
890  get_sub_type_column_descriptor(omnisci_column);
891  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
892  parquet_column,
893  chunks,
894  string_dictionary,
895  chunk_metadata,
896  is_metadata_scan);
897  CHECK(encoder.get());
898  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
899  CHECK(scalar_encoder);
900  if (omnisci_column->columnType.is_fixlen_array()) {
901  encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
902  is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
903  scalar_encoder,
904  omnisci_column);
905  } else {
906  encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
907  is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
908  is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
909  scalar_encoder,
910  omnisci_column);
911  }
912  return encoder;
913 }
914 
915 void validate_max_repetition_and_definition_level(
916  const ColumnDescriptor* omnisci_column_descriptor,
917  const parquet::ColumnDescriptor* parquet_column_descriptor) {
918  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
919  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
920  throw std::runtime_error(
921  "Unsupported mapping detected. Column '" + parquet_column_descriptor->name() +
922  "' detected to be a parquet list but OmniSci mapped column '" +
923  omnisci_column_descriptor->columnName + "' is not an array.");
924  }
925  if (is_valid_parquet_list) {
926  if (parquet_column_descriptor->max_repetition_level() != 1 ||
927  parquet_column_descriptor->max_definition_level() != 3) {
928  throw std::runtime_error(
929  "Incorrect schema max repetition level detected in column '" +
930  parquet_column_descriptor->name() +
931  "'. Expected a max repetition level of 1 and max definition level of 3 for "
932  "list column but column has a max "
933  "repetition level of " +
934  std::to_string(parquet_column_descriptor->max_repetition_level()) +
935  " and a max definition level of " +
936  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
937  }
938  } else {
939  if (parquet_column_descriptor->max_repetition_level() != 0 ||
940  parquet_column_descriptor->max_definition_level() != 1) {
941  throw std::runtime_error(
942  "Incorrect schema max repetition level detected in column '" +
943  parquet_column_descriptor->name() +
944  "'. Expected a max repetition level of 0 and max definition level of 1 for "
945  "flat column but column has a max "
946  "repetition level of " +
947  std::to_string(parquet_column_descriptor->max_repetition_level()) +
948  " and a max definition level of " +
949  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
950  }
951  }
952 }
953 
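// Why a list column must report exactly (max repetition 1, max definition 3),
// sketched against the LIST structure validated earlier: each of the three
// optional/repeated nodes (the LIST group, the repeated "list" group, and
// "element") adds one definition level, and the single repeated node adds
// the lone repetition level. A flat optional column has one optional node
// and no repeated nodes, hence (0, 1).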
954 void resize_values_buffer(const ColumnDescriptor* omnisci_column,
955  const parquet::ColumnDescriptor* parquet_column,
956  std::vector<int8_t>& values) {
957  auto max_type_byte_size =
958  std::max(omnisci_column->columnType.get_size(),
959  parquet::GetTypeByteSize(parquet_column->physical_type()));
960  size_t values_size =
961  LazyParquetChunkLoader::batch_reader_num_elements * max_type_byte_size;
962  values.resize(values_size);
963 }
964 
965 std::list<std::unique_ptr<ChunkMetadata>> append_row_groups(
966  const std::vector<RowGroupInterval>& row_group_intervals,
967  const int parquet_column_index,
968  const ColumnDescriptor* column_descriptor,
969  std::list<Chunk_NS::Chunk>& chunks,
970  StringDictionary* string_dictionary,
971  std::shared_ptr<arrow::fs::FileSystem> file_system) {
972  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
973  // `def_levels` and `rep_levels` below are used to store the read definition
974  // and repetition levels of the Dremel encoding implemented by the Parquet
975  // format
976  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
977  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
978  std::vector<int8_t> values;
979 
980  CHECK(!row_group_intervals.empty());
981  std::unique_ptr<parquet::arrow::FileReader> first_file_reader;
982  const auto& first_file_path = row_group_intervals.front().file_path;
983  open_parquet_table(first_file_path, first_file_reader, file_system);
984  auto first_parquet_column_descriptor =
985  get_column_descriptor(first_file_reader.get(), parquet_column_index);
986  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
987  auto encoder = create_parquet_encoder(column_descriptor,
988  first_parquet_column_descriptor,
989  chunks,
990  string_dictionary,
991  chunk_metadata);
992  CHECK(encoder.get());
993 
994  for (const auto& row_group_interval : row_group_intervals) {
995  std::unique_ptr<parquet::arrow::FileReader> file_reader;
996  open_parquet_table(row_group_interval.file_path, file_reader, file_system);
997 
998  int num_row_groups, num_columns;
999  std::tie(num_row_groups, num_columns) = get_parquet_table_size(file_reader);
1000  CHECK(row_group_interval.start_index >= 0 &&
1001  row_group_interval.end_index < num_row_groups);
1002  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1003 
1004  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1005  auto parquet_column_descriptor =
1006  get_column_descriptor(file_reader.get(), parquet_column_index);
1007  validate_equal_column_descriptor(first_parquet_column_descriptor,
1008  parquet_column_descriptor,
1009  first_file_path,
1010  row_group_interval.file_path);
1011 
1012  validate_max_repetition_and_definition_level(column_descriptor,
1013  parquet_column_descriptor);
1014  int64_t values_read = 0;
1015  for (int row_group_index = row_group_interval.start_index;
1016  row_group_index <= row_group_interval.end_index;
1017  ++row_group_index) {
1018  auto group_reader = parquet_reader->RowGroup(row_group_index);
1019  std::shared_ptr<parquet::ColumnReader> col_reader =
1020  group_reader->Column(parquet_column_index);
1021 
1022  try {
1023  while (col_reader->HasNext()) {
1024  int64_t levels_read =
1025  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1026  def_levels.data(),
1027  rep_levels.data(),
1028  reinterpret_cast<uint8_t*>(values.data()),
1029  &values_read,
1030  col_reader.get());
1031  encoder->appendData(def_levels.data(),
1032  rep_levels.data(),
1033  values_read,
1034  levels_read,
1035  !col_reader->HasNext(),
1036  values.data());
1037  }
1038  } catch (const std::exception& error) {
1039  throw std::runtime_error(
1040  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1041  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1042  "', Parquet file: '" + row_group_interval.file_path + "'");
1043  }
1044  }
1045  }
1046  return chunk_metadata;
1047 }
1048 
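// Dremel levels, illustrated for three rows of a nullable int list column
// [[1, 2], [], NULL] under the (max rep 1, max def 3) schema validated
// above: the values stream as (rep=0, def=3, v=1), (rep=1, def=3, v=2) for
// the first row, (rep=0, def=1) for the empty list, and (rep=0, def=0) for
// the NULL list. rep=0 marks the start of a new row; def counts how many
// levels of the optional/repeated nesting are actually present.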
1049 bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
1050  const parquet::ColumnDescriptor* parquet_column) {
1051  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
1052  parquet_column->logical_type().get())) {
1053  return omnisci_column->columnType.get_precision() ==
1054  decimal_logical_column->precision() &&
1055  omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
1056  omnisci_column->columnType.is_decimal() &&
1057  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1058  omnisci_column->columnType.get_compression() == kENCODING_FIXED);
1059  }
1060  return false;
1061 }
1062 
1063 bool validate_floating_point_mapping(const ColumnDescriptor* omnisci_column,
1064  const parquet::ColumnDescriptor* parquet_column) {
1065  if (!omnisci_column->columnType.is_fp()) {
1066  return false;
1067  }
1068  // check if mapping is a valid coerced or non-coerced floating point mapping
1069  // with no annotation (floating point columns have no annotation in the
1070  // Parquet specification)
1071  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
1072  return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
1073  (parquet_column->physical_type() == parquet::Type::FLOAT &&
1074  omnisci_column->columnType.get_type() == kFLOAT);
1075  }
1076  return false;
1077 }
1078 
1079 bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
1080  const parquet::ColumnDescriptor* parquet_column) {
1081  if (!omnisci_column->columnType.is_integer()) {
1082  return false;
1083  }
1084  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1085  parquet_column->logical_type().get())) {
1086  CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1087  omnisci_column->columnType.get_compression() == kENCODING_FIXED);
1088  const int bits_per_byte = 8;
1089  // unsigned types are permitted to map to a wider integral type in order to avoid
1090  // precision loss
1091  const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
1092  return omnisci_column->columnType.get_size() * bits_per_byte <=
1093  int_logical_column->bit_width() * bit_widening_factor;
1094  }
1095  // check if mapping is a valid coerced or non-coerced integral mapping with no
1096  // annotation
1097  if ((omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1098  omnisci_column->columnType.get_compression() == kENCODING_FIXED)) {
1099  return (parquet_column->physical_type() == parquet::Type::INT64) ||
1100  (parquet_column->physical_type() == parquet::Type::INT32 &&
1101  omnisci_column->columnType.get_size() <= 4);
1102  }
1103  return false;
1104 }
1105 
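// Worked example of the widening rule above: an unsigned UINT32 Parquet
// column (bit_width 32) validates against an 8-byte OmniSci BIGINT, since
// 8 * 8 <= 32 * 2, whereas a signed INT32 column does not (8 * 8 <= 32 * 1
// fails) and can map to at most a 4-byte INT.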
1106 bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
1107  return omnisci_column->columnType.get_dimension() == 9;
1108 }
1109 
1110 bool is_nanosecond_precision(
1111  const parquet::TimestampLogicalType* timestamp_logical_column) {
1112  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
1113 }
1114 
1115 bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
1116  return omnisci_column->columnType.get_dimension() == 6;
1117 }
1118 
1119 bool is_microsecond_precision(
1120  const parquet::TimestampLogicalType* timestamp_logical_column) {
1121  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
1122 }
1123 
1124 bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
1125  return omnisci_column->columnType.get_dimension() == 3;
1126 }
1127 
1128 bool is_millisecond_precision(
1129  const parquet::TimestampLogicalType* timestamp_logical_column) {
1130  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
1131 }
1132 
1133 bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
1134  const parquet::ColumnDescriptor* parquet_column) {
1135  bool is_none_encoded_mapping =
1136  omnisci_column->columnType.get_compression() == kENCODING_NONE &&
1137  (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
1138  omnisci_column->columnType.get_type() == kBOOLEAN);
1139  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
1140 }
1141 
1142 bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
1143  const parquet::ColumnDescriptor* parquet_column) {
1144  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
1145  ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
1146  (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1147  omnisci_column->columnType.get_comp_param() == 32)))) {
1148  return false;
1149  }
1150  // check the annotated case
1151  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1152  parquet_column->logical_type().get())) {
1153  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
1154  return omnisci_column->columnType.get_dimension() == 0 ||
1155  ((is_nanosecond_precision(omnisci_column) &&
1156  is_nanosecond_precision(timestamp_logical_column)) ||
1157  (is_microsecond_precision(omnisci_column) &&
1158  is_microsecond_precision(timestamp_logical_column)) ||
1159  (is_millisecond_precision(omnisci_column) &&
1160  is_millisecond_precision(timestamp_logical_column)));
1161  }
1162  if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
1163  return omnisci_column->columnType.get_dimension() == 0;
1164  }
1165  }
1166  // check the unannotated case
1167  if (parquet_column->logical_type()->is_none() &&
1168  ((parquet_column->physical_type() == parquet::Type::INT32 &&
1169  omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1170  omnisci_column->columnType.get_comp_param() == 32) ||
1171  parquet_column->physical_type() == parquet::Type::INT64)) {
1172  return true;
1173  }
1174  return false;
1175 }
1176 
1177 bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
1178  const parquet::ColumnDescriptor* parquet_column) {
1179  if (!(omnisci_column->columnType.get_type() == kTIME &&
1180  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1181  (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1182  omnisci_column->columnType.get_comp_param() == 32)))) {
1183  return false;
1184  }
1185  if (parquet_column->logical_type()->is_time()) {
1186  return true;
1187  }
1188  return false;
1189 }
1190 
1191 bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
1192  const parquet::ColumnDescriptor* parquet_column) {
1193  if (!(omnisci_column->columnType.get_type() == kDATE &&
1194  ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
1195  (omnisci_column->columnType.get_comp_param() ==
1196  0 // DATE ENCODING DAYS (32) specifies comp_param of 0
1197  || omnisci_column->columnType.get_comp_param() == 16)) ||
1198  omnisci_column->columnType.get_compression() ==
1199  kENCODING_NONE // for array types
1200  ))) {
1201  return false;
1202  }
1203  return parquet_column->logical_type()->is_date() ||
1204  parquet_column->logical_type()
1205  ->is_timestamp(); // to support TIMESTAMP -> DATE coercion
1206 }
1207 
1208 bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
1209  const parquet::ColumnDescriptor* parquet_column) {
1210  return is_valid_parquet_string(parquet_column) &&
1211  omnisci_column->columnType.is_string() &&
1212  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1213  omnisci_column->columnType.get_compression() == kENCODING_DICT);
1214 }
1215 
1216 bool validate_array_mapping(const ColumnDescriptor* omnisci_column,
1217  const parquet::ColumnDescriptor* parquet_column) {
1218  if (is_valid_parquet_list_column(parquet_column) &&
1219  omnisci_column->columnType.is_array()) {
1220  auto omnisci_column_sub_type_column = get_sub_type_column_descriptor(omnisci_column);
1221  return LazyParquetChunkLoader::isColumnMappingSupported(
1222  omnisci_column_sub_type_column.get(), parquet_column);
1223  }
1224  return false;
1225 }
1226 
1227 bool validate_geospatial_mapping(const ColumnDescriptor* omnisci_column,
1228  const parquet::ColumnDescriptor* parquet_column) {
1229  return is_valid_parquet_string(parquet_column) &&
1230  omnisci_column->columnType.is_geometry();
1231 }
1232 
1233 void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
1234  const parquet::arrow::FileReader* new_file_reader,
1235  const std::string& reference_file_path,
1236  const std::string& new_file_path) {
1237  const auto reference_num_columns =
1238  reference_file_reader->parquet_reader()->metadata()->num_columns();
1239  const auto new_num_columns =
1240  new_file_reader->parquet_reader()->metadata()->num_columns();
1241  if (reference_num_columns != new_num_columns) {
1242  throw std::runtime_error{"Parquet file \"" + new_file_path +
1243  "\" has a different schema. Please ensure that all Parquet "
1244  "files use the same schema. Reference Parquet file: \"" +
1245  reference_file_path + "\" has " +
1246  std::to_string(reference_num_columns) +
1247  " columns. New Parquet file \"" + new_file_path + "\" has " +
1248  std::to_string(new_num_columns) + " columns."};
1249  }
1250 
1251  for (int i = 0; i < reference_num_columns; i++) {
1252  validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
1253  get_column_descriptor(new_file_reader, i),
1254  reference_file_path,
1255  new_file_path);
1256  }
1257 }
1258 
1259 void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
1260  const ColumnDescriptor* omnisci_column) {
1261  parquet::Type::type physical_type = parquet_column->physical_type();
1262  auto logical_type = parquet_column->logical_type();
1263  bool allowed_type =
1264  LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
1265  if (!allowed_type) {
1266  if (logical_type->is_timestamp()) {
1267  auto timestamp_type =
1268  dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
1269  CHECK(timestamp_type);
1270 
1271  if (!timestamp_type->is_adjusted_to_utc()) {
1272  LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
1273  << omnisci_column->columnName
1274  << "\". Only UTC timezone is currently supported.";
1275  }
1276  }
1277  std::string parquet_type;
1278  if (parquet_column->logical_type()->is_none()) {
1279  parquet_type = parquet::TypeToString(physical_type);
1280  } else {
1281  parquet_type = logical_type->ToString();
1282  }
1283  std::string omnisci_type = omnisci_column->columnType.get_type_name();
1284  throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
1285  "\" to OmniSci type \"" + omnisci_type +
1286  "\" is not allowed. Please use an appropriate column type."};
1287  }
1288 }
1289 
1290 void validate_number_of_columns(
1291  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1292  const std::string& file_path,
1293  const ForeignTableSchema& schema) {
1294  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
1295  throw_number_of_columns_mismatch_error(
1296  schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
1297  }
1298 }
1299 
1300 void throw_missing_metadata_error(const int row_group_index,
1301  const int column_index,
1302  const std::string& file_path) {
1303  throw std::runtime_error{
1304  "Statistics metadata is required for all row groups. Metadata is missing for "
1305  "row group index: " +
1306  std::to_string(row_group_index) +
1307  ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
1308 }
1309 
1310 void throw_row_group_larger_than_fragment_size_error(const int row_group_index,
1311  const int64_t max_row_group_size,
1312  const int fragment_size,
1313  const std::string& file_path) {
1314  throw std::runtime_error{
1315  "Parquet file has a row group size that is larger than the fragment size. "
1316  "Please set the table fragment size to a number that is larger than the "
1317  "row group size. Row group index: " +
1318  std::to_string(row_group_index) +
1319  ", row group size: " + std::to_string(max_row_group_size) +
1320  ", fragment size: " + std::to_string(fragment_size) + ", file path: " + file_path};
1321 }
1322 
1323 void validate_column_mapping_and_row_group_metadata(
1324  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1325  const std::string& file_path,
1326  const ForeignTableSchema& schema) {
1327  auto column_it = schema.getLogicalColumns().begin();
1328  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1329  const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
1330  try {
1331  validate_allowed_mapping(descr, *column_it);
1332  } catch (std::runtime_error& e) {
1333  std::stringstream error_message;
1334  error_message << e.what() << " Parquet column: " << descr->name()
1335  << ", OmniSci column: " << (*column_it)->columnName
1336  << ", Parquet file: " << file_path << ".";
1337  throw std::runtime_error(error_message.str());
1338  }
1339 
1340  auto fragment_size = schema.getForeignTable()->maxFragRows;
1341  int64_t max_row_group_size = 0;
1342  int max_row_group_index = 0;
1343  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
1344  auto group_metadata = file_metadata->RowGroup(r);
1345  auto num_rows = group_metadata->num_rows();
1346  if (num_rows > max_row_group_size) {
1347  max_row_group_size = num_rows;
1348  max_row_group_index = r;
1349  }
1350 
1351  auto column_chunk = group_metadata->ColumnChunk(i);
1352  bool contains_metadata = column_chunk->is_stats_set();
1353  if (contains_metadata) {
1354  auto stats = column_chunk->statistics();
1355  bool is_all_nulls = stats->null_count() == column_chunk->num_values();
1356  if (!stats->HasMinMax() && !is_all_nulls) {
1357  contains_metadata = false;
1358  }
1359  }
1360 
1361  if (!contains_metadata) {
1362  throw_missing_metadata_error(r, i, file_path);
1363  }
1364  }
1365 
1366  if (max_row_group_size > fragment_size) {
1367  throw_row_group_larger_than_fragment_size_error(
1368  max_row_group_index, max_row_group_size, fragment_size, file_path);
1369  }
1370  }
1371 }
1372 
1373 void validate_parquet_metadata(
1374  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1375  const std::string& file_path,
1376  const ForeignTableSchema& schema) {
1377  validate_number_of_columns(file_metadata, file_path, schema);
1378 
1379  validate_column_mapping_and_row_group_metadata(file_metadata, file_path, schema);
1380 }
1381 
1382 void metadata_scan_rowgroup_interval(
1383  const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1384  const RowGroupInterval& row_group_interval,
1385  const std::unique_ptr<parquet::arrow::FileReader>& reader,
1386  const ForeignTableSchema& schema,
1387  std::list<RowGroupMetadata>& row_group_metadata) {
1388  auto column_interval =
1389  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1390  schema.getLogicalAndPhysicalColumns().back()->columnId};
1391 
1392  auto file_metadata = reader->parquet_reader()->metadata();
1393  for (int row_group = row_group_interval.start_index;
1394  row_group <= row_group_interval.end_index;
1395  ++row_group) {
1396  auto& row_group_metadata_item = row_group_metadata.emplace_back();
1397  row_group_metadata_item.row_group_index = row_group;
1398  row_group_metadata_item.file_path = row_group_interval.file_path;
1399 
1400  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1401  file_metadata->RowGroup(row_group);
1402 
1403  for (int column_id = column_interval.start; column_id <= column_interval.end;
1404  column_id++) {
1405  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1406  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1407  auto encoder_map_iter =
1408  encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
1409  CHECK(encoder_map_iter != encoder_map.end());
1410  try {
1411  auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1412  group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1413  row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1414  } catch (const std::exception& e) {
1415  std::stringstream error_message;
1416  error_message << e.what() << " in row group " << row_group << " of Parquet file '"
1417  << row_group_interval.file_path << "'.";
1418  throw std::runtime_error(error_message.str());
1419  }
1420  }
1421  }
1422 }
1423 
1424 void populate_encoder_map(std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1425  const Interval<ColumnType>& column_interval,
1426  const ForeignTableSchema& schema,
1427  const std::unique_ptr<parquet::arrow::FileReader>& reader) {
1428  auto file_metadata = reader->parquet_reader()->metadata();
1429  for (int column_id = column_interval.start; column_id <= column_interval.end;
1430  column_id++) {
1431  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1432  auto parquet_column_descriptor =
1433  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1434  encoder_map[column_id] =
1435  create_parquet_encoder(column_descriptor, parquet_column_descriptor);
1436  column_id += column_descriptor->columnType.get_physical_cols();
1437  }
1438 }
1439 
1440 } // namespace
1441 
1442 bool LazyParquetChunkLoader::isColumnMappingSupported(
1443  const ColumnDescriptor* omnisci_column,
1444  const parquet::ColumnDescriptor* parquet_column) {
1445  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1446  return true;
1447  }
1448  if (validate_array_mapping(omnisci_column, parquet_column)) {
1449  return true;
1450  }
1451  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1452  return true;
1453  }
1454  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1455  return true;
1456  }
1457  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1458  return true;
1459  }
1460  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1461  return true;
1462  }
1463  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1464  return true;
1465  }
1466  if (validate_time_mapping(omnisci_column, parquet_column)) {
1467  return true;
1468  }
1469  if (validate_date_mapping(omnisci_column, parquet_column)) {
1470  return true;
1471  }
1472  if (validate_string_mapping(omnisci_column, parquet_column)) {
1473  return true;
1474  }
1475  return false;
1476 }
1477 
1478 LazyParquetChunkLoader::LazyParquetChunkLoader(
1479  std::shared_ptr<arrow::fs::FileSystem> file_system)
1480  : file_system_(file_system) {}
1481 
1482 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
1483  const std::vector<RowGroupInterval>& row_group_intervals,
1484  const int parquet_column_index,
1485  std::list<Chunk_NS::Chunk>& chunks,
1486  StringDictionary* string_dictionary) {
1487  CHECK(!chunks.empty());
1488  auto const& chunk = *chunks.begin();
1489  auto column_descriptor = chunk.getColumnDesc();
1490  auto buffer = chunk.getBuffer();
1491  CHECK(buffer);
1492 
1493  try {
1494  auto metadata = append_row_groups(row_group_intervals,
1495  parquet_column_index,
1496  column_descriptor,
1497  chunks,
1498  string_dictionary,
1499  file_system_);
1500  return metadata;
1501  } catch (const std::exception& error) {
1502  throw ForeignStorageException(error.what());
1503  }
1504 
1505  return {};
1506 }
1507 
1508 std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
1509  const std::set<std::string>& file_paths,
1510  const ForeignTableSchema& schema) {
1511  std::list<RowGroupMetadata> row_group_metadata;
1512  auto column_interval =
1513  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1514  schema.getLogicalAndPhysicalColumns().back()->columnId};
1515  CHECK(!file_paths.empty());
1516  std::unique_ptr<parquet::arrow::FileReader> first_file_reader;
1517  const auto& first_file_path = *file_paths.begin();
1518  open_parquet_table(first_file_path, first_file_reader, file_system_);
1519  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1520  for (const auto& file_path : file_paths) {
1521  std::unique_ptr<parquet::arrow::FileReader> reader;
1522  open_parquet_table(file_path, reader, file_system_);
1523  validate_equal_schema(
1524  first_file_reader.get(), reader.get(), first_file_path, file_path);
1525  int num_row_groups = get_parquet_table_size(reader).first;
1526  auto row_group_interval = RowGroupInterval{file_path, 0, num_row_groups - 1};
1527  validate_parquet_metadata(reader->parquet_reader()->metadata(), file_path, schema);
1528  if (file_path == first_file_path) {
1529  populate_encoder_map(encoder_map, column_interval, schema, first_file_reader);
1530  }
1531  metadata_scan_rowgroup_interval(
1532  encoder_map, row_group_interval, reader, schema, row_group_metadata);
1533  }
1534  return row_group_metadata;
1535 }
1536 
1537 } // namespace foreign_storage
bool is_integer() const
Definition: sqltypes.h:480
void throw_number_of_columns_mismatch_error(size_t num_table_cols, size_t num_file_cols, const std::string &file_path)
An AbstractBuffer is a unit of data management for a data manager.
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
specifies the content in-memory of a row in the column metadata table
int get_precision() const
Definition: sqltypes.h:314
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
Definition: sqltypes.h:52
std::shared_ptr< ParquetEncoder > create_parquet_geospatial_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:319
AbstractBuffer * getBuffer() const
Definition: Chunk.h:104
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:313
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::string get_type_name() const
Definition: sqltypes.h:414
std::shared_ptr< ParquetEncoder > create_parquet_date_from_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
void populate_encoder_map(std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const std::unique_ptr< parquet::arrow::FileReader > &reader)
std::shared_ptr< ParquetEncoder > create_parquet_floating_point_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:320
const ColumnDescriptor * getColumnDescriptor(const int column_id) const
bool is_millisecond_precision(const ColumnDescriptor *omnisci_column)
const std::list< const ColumnDescriptor * > & getLogicalColumns() const
#define CHECK(condition)
Definition: Logger.h:197
bool is_geometry() const
Definition: sqltypes.h:490
const ForeignTable * getForeignTable() const
bool is_valid_parquet_string(const parquet::ColumnDescriptor *parquet_column)
void open_parquet_table(const std::string &file_path, std::unique_ptr< parquet::arrow::FileReader > &reader, std::shared_ptr< arrow::fs::FileSystem > &file_system)
std::list< std::unique_ptr< ChunkMetadata > > append_row_groups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::shared_ptr< arrow::fs::FileSystem > file_system)
std::shared_ptr< ParquetEncoder > create_parquet_array_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan)
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
bool is_microsecond_precision(const ColumnDescriptor *omnisci_column)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan)
Definition: sqltypes.h:44
void validate_column_mapping_and_row_group_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:478
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan)
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder_with_omnisci_type(const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, AbstractBuffer *buffer)
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder_with_types(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool is_decimal() const
Definition: sqltypes.h:481
std::shared_ptr< ParquetEncoder > create_parquet_date_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer, const bool is_metadata_scan)
std::string columnName
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool is_array() const
Definition: sqltypes.h:486
std::list< RowGroupMetadata > metadataScan(const std::set< std::string > &file_paths, const ForeignTableSchema &schema)
Perform a metadata scan for the paths specified.
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan=false)
Create a Parquet specific encoder for a Parquet to OmniSci mapping.
const std::list< const ColumnDescriptor * > & getLogicalAndPhysicalColumns() const