OmniSciDB  85c2d10cdc
LazyParquetChunkLoader.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LazyParquetChunkLoader.h"
18 
19 #include <arrow/api.h>
20 #include <arrow/io/api.h>
21 #include <parquet/arrow/reader.h>
22 #include <parquet/column_scanner.h>
23 #include <parquet/exception.h>
24 #include <parquet/platform.h>
25 #include <parquet/statistics.h>
26 #include <parquet/types.h>
27 
28 #include "ForeignStorageException.h"
29 #include "ParquetDateFromTimestampEncoder.h"
30 #include "ParquetDateInSecondsEncoder.h"
31 #include "ParquetDecimalEncoder.h"
32 #include "ParquetFixedLengthArrayEncoder.h"
33 #include "ParquetFixedLengthEncoder.h"
34 #include "ParquetGeospatialEncoder.h"
35 #include "ParquetStringEncoder.h"
36 #include "ParquetStringNoneEncoder.h"
37 #include "ParquetTimeEncoder.h"
38 #include "ParquetTimestampEncoder.h"
39 #include "ParquetVariableLengthArrayEncoder.h"
40 
41 namespace foreign_storage {
42 
43 namespace {
44 
45 bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
46  return (parquet_column->logical_type()->is_none() &&
47  parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
48  parquet_column->logical_type()->is_string();
49 }
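// Usage sketch for is_valid_parquet_string(); illustrative only, with
// hypothetical node and variable names, using the parquet schema API
// included above. Both a STRING-annotated column and an un-annotated
// BYTE_ARRAY column pass the check.
//
//   auto node = parquet::schema::PrimitiveNode::Make(
//       "txt", parquet::Repetition::OPTIONAL, parquet::LogicalType::String(),
//       parquet::Type::BYTE_ARRAY);
//   parquet::ColumnDescriptor descr(node, 1, 0);  // def level 1, rep level 0
//   bool ok = is_valid_parquet_string(&descr);    // true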
50 
87 bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
88  const parquet::schema::Node* node = parquet_column->schema_node().get();
89  if ((node->name() != "element" && node->name() != "item") ||
90  !(node->is_required() ||
91  node->is_optional())) { // ensure first innermost node is named "element"
92  // which is required by the parquet specification;
93  // however testing shows that pyarrow generates this
94  // column with the name of "item"
95  // this field must be either required or optional
96  return false;
97  }
98  node = node->parent();
99  if (!node) { // required nested structure
100  return false;
101  }
102  if (node->name() != "list" || !node->is_repeated() ||
103  !node->is_group()) { // ensure second innermost node is named "list" which is
104  // a repeated group; this is
105  // required by the parquet specification
106  return false;
107  }
108  node = node->parent();
109  if (!node) { // required nested structure
110  return false;
111  }
112  if (!node->logical_type()->is_list() ||
113  !(node->is_optional() ||
114  node->is_required())) { // ensure third innermost node has logical type LIST
115  // which is either optional or required; this is required
116  // by the parquet specification
117  return false;
118  }
119  node =
120  node->parent(); // this must now be the root node of schema which is required by
121  // FSI (lists can not be embedded into a deeper nested structure)
122  if (!node) { // required nested structure
123  return false;
124  }
125  node = node->parent();
126  if (node) { // implies the previous node was not the root node
127  return false;
128  }
129  return true;
130 }
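// For reference, the canonical three-level LIST layout from the Parquet
// format specification that the walk above validates, innermost "element"
// first (pyarrow may emit "item" in place of "element", which is accepted):
//
//   <list-repetition> group <name> (LIST) {
//     repeated group list {
//       <element-repetition> <element-type> element;
//     }
//   }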
131 
132 template <typename V>
133 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
134  const ColumnDescriptor* column_descriptor,
135  const parquet::ColumnDescriptor* parquet_column_descriptor,
136  AbstractBuffer* buffer) {
137  switch (parquet_column_descriptor->physical_type()) {
138  case parquet::Type::INT32:
139  return std::make_shared<ParquetDecimalEncoder<V, int32_t>>(
140  buffer, column_descriptor, parquet_column_descriptor);
141  case parquet::Type::INT64:
142  return std::make_shared<ParquetDecimalEncoder<V, int64_t>>(
143  buffer, column_descriptor, parquet_column_descriptor);
144  case parquet::Type::FIXED_LEN_BYTE_ARRAY:
145  return std::make_shared<ParquetDecimalEncoder<V, parquet::FixedLenByteArray>>(
146  buffer, column_descriptor, parquet_column_descriptor);
147  case parquet::Type::BYTE_ARRAY:
148  return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray>>(
149  buffer, column_descriptor, parquet_column_descriptor);
150  default:
151  UNREACHABLE();
152  }
153  return {};
154 }
155 
156 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
157  const ColumnDescriptor* omnisci_column,
158  const parquet::ColumnDescriptor* parquet_column,
159  AbstractBuffer* buffer,
160  const bool is_metadata_scan) {
161  if (parquet_column->logical_type()->is_decimal()) {
162  if (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
163  is_metadata_scan) {
164  return create_parquet_decimal_encoder_with_omnisci_type<int64_t>(
165  omnisci_column, parquet_column, buffer);
166  } else if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
167  switch (omnisci_column->columnType.get_comp_param()) {
168  case 16:
169  return create_parquet_decimal_encoder_with_omnisci_type<int16_t>(
170  omnisci_column, parquet_column, buffer);
171  case 32:
172  return create_parquet_decimal_encoder_with_omnisci_type<int32_t>(
173  omnisci_column, parquet_column, buffer);
174  default:
175  UNREACHABLE();
176  }
177  } else {
178  UNREACHABLE();
179  }
180  }
181  return {};
182 }
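// Worked example of the dispatch above (hypothetical column pair): a Parquet
// DECIMAL(10, 2) column with physical type INT64 mapped to an uncompressed
// OmniSci DECIMAL(10, 2) column yields ParquetDecimalEncoder<int64_t, int64_t>;
// the same Parquet column with ENCODING FIXED(16) yields the int16_t variant.
//
//   auto encoder = create_parquet_decimal_encoder(
//       omnisci_decimal_column, parquet_decimal_column, buffer, false);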
183 
198 template <typename V, typename T, typename U>
199 std::shared_ptr<ParquetEncoder>
200 create_parquet_signed_or_unsigned_integral_encoder_with_types(
201  AbstractBuffer* buffer,
202  const size_t omnisci_data_type_byte_size,
203  const size_t parquet_data_type_byte_size,
204  const bool is_signed) {
205  if (is_signed) {
206  return std::make_shared<ParquetFixedLengthEncoder<V, T>>(
207  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
208  } else {
209  return std::make_shared<ParquetUnsignedFixedLengthEncoder<V, T, U>>(
210  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size);
211  }
212 }
213 
233 template <typename V>
234 std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder_with_omnisci_type(
235  AbstractBuffer* buffer,
236  const size_t omnisci_data_type_byte_size,
237  const size_t parquet_data_type_byte_size,
238  const int bit_width,
239  const bool is_signed) {
240  switch (bit_width) {
241  case 8:
242  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
243  int32_t,
244  uint8_t>(
245  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
246  case 16:
247  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
248  int32_t,
249  uint16_t>(
250  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
251  case 32:
252  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
253  int32_t,
254  uint32_t>(
255  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
256  case 64:
257  return create_parquet_signed_or_unsigned_integral_encoder_with_types<V,
258  int64_t,
259  uint64_t>(
260  buffer, omnisci_data_type_byte_size, parquet_data_type_byte_size, is_signed);
261  default:
262  UNREACHABLE();
263  }
264  return {};
265 }
266 
267 std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
268  const ColumnDescriptor* omnisci_column,
269  const parquet::ColumnDescriptor* parquet_column,
270  AbstractBuffer* buffer,
271  const bool is_metadata_scan) {
272  auto column_type = omnisci_column->columnType;
273  auto physical_type = parquet_column->physical_type();
274 
275  int bit_width = -1;
276  bool is_signed = false;
277  // handle the integral case with no Parquet annotation
278  if (parquet_column->logical_type()->is_none() && column_type.is_integer()) {
279  if (physical_type == parquet::Type::INT32) {
280  bit_width = 32;
281  } else if (physical_type == parquet::Type::INT64) {
282  bit_width = 64;
283  } else {
284  UNREACHABLE();
285  }
286  is_signed = true;
287  }
288  // handle the integral case with Parquet annotation
289  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
290  parquet_column->logical_type().get())) {
291  bit_width = int_logical_column->bit_width();
292  is_signed = int_logical_column->is_signed();
293  }
294 
295  if (bit_width == -1) { // no valid logical type (with or without annotation) found
296  return {};
297  }
298 
299  const size_t omnisci_data_type_byte_size = column_type.get_size();
300  const size_t parquet_data_type_byte_size = parquet::GetTypeByteSize(physical_type);
301 
302  switch (omnisci_data_type_byte_size) {
303  case 8:
304  CHECK(column_type.get_compression() == kENCODING_NONE);
305  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
306  buffer,
307  omnisci_data_type_byte_size,
308  parquet_data_type_byte_size,
309  bit_width,
310  is_signed);
311  case 4:
312  if (is_metadata_scan && column_type.get_type() == kBIGINT) {
313  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
314  buffer,
315  omnisci_data_type_byte_size,
316  parquet_data_type_byte_size,
317  bit_width,
318  is_signed);
319  }
320  return create_parquet_integral_encoder_with_omnisci_type<int32_t>(
321  buffer,
322  omnisci_data_type_byte_size,
323  parquet_data_type_byte_size,
324  bit_width,
325  is_signed);
326  case 2:
327  if (is_metadata_scan) {
328  switch (column_type.get_type()) {
329  case kBIGINT:
330  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
331  buffer,
332  omnisci_data_type_byte_size,
333  parquet_data_type_byte_size,
334  bit_width,
335  is_signed);
336  case kINT:
337  return create_parquet_integral_encoder_with_omnisci_type<int32_t>(
338  buffer,
339  omnisci_data_type_byte_size,
340  parquet_data_type_byte_size,
341  bit_width,
342  is_signed);
343  case kSMALLINT:
344  break;
345  default:
346  UNREACHABLE();
347  }
348  }
349  return create_parquet_integral_encoder_with_omnisci_type<int16_t>(
350  buffer,
351  omnisci_data_type_byte_size,
352  parquet_data_type_byte_size,
353  bit_width,
354  is_signed);
355  case 1:
356  if (is_metadata_scan) {
357  switch (column_type.get_type()) {
358  case kBIGINT:
359  return create_parquet_integral_encoder_with_omnisci_type<int64_t>(
360  buffer,
361  omnisci_data_type_byte_size,
362  parquet_data_type_byte_size,
363  bit_width,
364  is_signed);
365  case kINT:
366  return create_parquet_integral_encoder_with_omnisci_type<int32_t>(
367  buffer,
368  omnisci_data_type_byte_size,
369  parquet_data_type_byte_size,
370  bit_width,
371  is_signed);
372  case kSMALLINT:
373  return create_parquet_integral_encoder_with_omnisci_type<int16_t>(
374  buffer,
375  omnisci_data_type_byte_size,
376  parquet_data_type_byte_size,
377  bit_width,
378  is_signed);
379  case kTINYINT:
380  break;
381  default:
382  UNREACHABLE();
383  }
384  }
385  return create_parquet_integral_encoder_with_omnisci_type<int8_t>(
386  buffer,
387  omnisci_data_type_byte_size,
388  parquet_data_type_byte_size,
389  bit_width,
390  is_signed);
391  default:
392  UNREACHABLE();
393  }
394  return {};
395 }
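// Worked example (hypothetical descriptors): a Parquet column annotated
// INT(8, false) loaded into an OmniSci SMALLINT gives bit_width = 8,
// is_signed = false, and omnisci_data_type_byte_size = 2, so the dispatch
// above resolves to
// create_parquet_signed_or_unsigned_integral_encoder_with_types<int16_t,
// int32_t, uint8_t>; the uint8_t intermediate keeps unsigned values read
// from the INT32 physical type from being sign-extended.
//
//   auto encoder = create_parquet_integral_encoder(
//       omnisci_smallint_column, parquet_uint8_column, buffer, false);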
396 
397 std::shared_ptr<ParquetEncoder> create_parquet_floating_point_encoder(
398  const ColumnDescriptor* omnisci_column,
399  const parquet::ColumnDescriptor* parquet_column,
400  AbstractBuffer* buffer) {
401  auto column_type = omnisci_column->columnType;
402  if (!column_type.is_fp()) {
403  return {};
404  }
405  CHECK_EQ(column_type.get_compression(), kENCODING_NONE);
406  switch (column_type.get_type()) {
407  case kFLOAT:
408  switch (parquet_column->physical_type()) {
409  case parquet::Type::FLOAT:
410  return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
411  buffer, omnisci_column, parquet_column);
412  case parquet::Type::DOUBLE:
413  return std::make_shared<ParquetFixedLengthEncoder<float, double>>(
414  buffer, omnisci_column, parquet_column);
415  default:
416  UNREACHABLE();
417  }
418  case kDOUBLE:
419  CHECK(parquet_column->physical_type() == parquet::Type::DOUBLE);
420  return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
421  buffer, omnisci_column, parquet_column);
422  default:
423  UNREACHABLE();
424  }
425  return {};
426 }
427 
428 std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
429  const ColumnDescriptor* omnisci_column,
430  const parquet::ColumnDescriptor* parquet_column,
431  AbstractBuffer* buffer) {
432  auto column_type = omnisci_column->columnType;
433  if (parquet_column->logical_type()->is_none() &&
434  !omnisci_column->columnType.is_string()) { // boolean
435  if (column_type.get_compression() == kENCODING_NONE) {
436  switch (column_type.get_type()) {
437  case kBOOLEAN:
438  return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
439  buffer, omnisci_column, parquet_column);
440  default:
441  UNREACHABLE();
442  }
443  } else {
444  UNREACHABLE();
445  }
446  }
447  return {};
448 }
449 
450 template <typename V, typename T>
451 std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder_with_types(
452  const ColumnDescriptor* omnisci_column,
453  const parquet::ColumnDescriptor* parquet_column,
454  AbstractBuffer* buffer) {
455  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
456  parquet_column->logical_type().get())) {
457  switch (timestamp_logical_type->time_unit()) {
458  case parquet::LogicalType::TimeUnit::MILLIS:
459  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L>>(
460  buffer, omnisci_column, parquet_column);
461  case parquet::LogicalType::TimeUnit::MICROS:
462  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L>>(
463  buffer, omnisci_column, parquet_column);
464  case parquet::LogicalType::TimeUnit::NANOS:
465  return std::make_shared<ParquetTimestampEncoder<V, T, 1000L * 1000L * 1000L>>(
466  buffer, omnisci_column, parquet_column);
467  default:
468  UNREACHABLE();
469  }
470  } else {
471  UNREACHABLE();
472  }
473  return {};
474 }
475 
476 template <typename V, typename T>
477 std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder_with_types(
478  const ColumnDescriptor* omnisci_column,
479  const parquet::ColumnDescriptor* parquet_column,
480  AbstractBuffer* buffer) {
481  if (auto timestamp_logical_type = dynamic_cast<const parquet::TimestampLogicalType*>(
482  parquet_column->logical_type().get())) {
483  switch (timestamp_logical_type->time_unit()) {
484  case parquet::LogicalType::TimeUnit::MILLIS:
485  return std::make_shared<
486  ParquetDateFromTimestampEncoder<V, T, 1000L>>(
487  buffer, omnisci_column, parquet_column);
488  case parquet::LogicalType::TimeUnit::MICROS:
489  return std::make_shared<
490  ParquetDateFromTimestampEncoder<V, T, 1000L * 1000L>>(
491  buffer, omnisci_column, parquet_column);
492  case parquet::LogicalType::TimeUnit::NANOS:
493  return std::make_shared<
494  ParquetDateFromTimestampEncoder<V, T, 1000L * 1000L * 1000L>>(
495  buffer, omnisci_column, parquet_column);
496  default:
497  UNREACHABLE();
498  }
499  } else {
500  UNREACHABLE();
501  }
502  return {};
503 }
504 
505 std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
506  const ColumnDescriptor* omnisci_column,
507  const parquet::ColumnDescriptor* parquet_column,
508  AbstractBuffer* buffer,
509  const bool is_metadata_scan) {
510  auto column_type = omnisci_column->columnType;
511  auto precision = column_type.get_precision();
512  if (parquet_column->logical_type()->is_timestamp()) {
513  if (column_type.get_compression() == kENCODING_NONE) {
514  if (precision == 0) {
515  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t>(
516  omnisci_column, parquet_column, buffer);
517  } else {
518  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
519  buffer, omnisci_column, parquet_column);
520  }
521  } else if (column_type.get_compression() == kENCODING_FIXED) {
522  CHECK(column_type.get_comp_param() == 32);
523  if (is_metadata_scan) {
524  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t>(
525  omnisci_column, parquet_column, buffer);
526  } else {
527  return create_parquet_timestamp_encoder_with_types<int32_t, int64_t>(
528  omnisci_column, parquet_column, buffer);
529  }
530  }
531  } else if (parquet_column->logical_type()->is_none() && column_type.is_timestamp()) {
532  if (parquet_column->physical_type() == parquet::Type::INT32) {
533  CHECK(column_type.get_compression() == kENCODING_FIXED &&
534  column_type.get_comp_param() == 32);
535  if (is_metadata_scan) {
536  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int32_t>>(
537  buffer, omnisci_column, parquet_column);
538  } else {
539  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
540  buffer, omnisci_column, parquet_column);
541  }
542  } else if (parquet_column->physical_type() == parquet::Type::INT64) {
543  if (column_type.get_compression() == kENCODING_NONE) {
544  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
545  buffer, omnisci_column, parquet_column);
546  } else if (column_type.get_compression() == kENCODING_FIXED) {
547  CHECK(column_type.get_comp_param() == 32);
548  if (is_metadata_scan) {
549  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
550  buffer, omnisci_column, parquet_column);
551  } else {
552  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int64_t>>(
553  buffer, omnisci_column, parquet_column);
554  }
555  }
556  } else {
557  UNREACHABLE();
558  }
559  }
560  return {};
561 }
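// Arithmetic sketch of the MILLIS case above (assumed from the 1000L
// conversion denominator; literals are illustrative): a TIMESTAMP(0) column
// stores seconds, so the encoder divides each Parquet value down.
//
//   int64_t parquet_millis = 1609459200123;           // 2021-01-01 00:00:00.123 UTC
//   int64_t omnisci_seconds = parquet_millis / 1000;  // 1609459200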
562 
563 template <typename V, typename T>
564 std::shared_ptr<ParquetEncoder> create_parquet_time_encoder_with_types(
565  const ColumnDescriptor* omnisci_column,
566  const parquet::ColumnDescriptor* parquet_column,
567  AbstractBuffer* buffer) {
568  if (auto time_logical_type = dynamic_cast<const parquet::TimeLogicalType*>(
569  parquet_column->logical_type().get())) {
570  switch (time_logical_type->time_unit()) {
571  case parquet::LogicalType::TimeUnit::MILLIS:
572  return std::make_shared<ParquetTimeEncoder<V, T, 1000L>>(
573  buffer, omnisci_column, parquet_column);
574  case parquet::LogicalType::TimeUnit::MICROS:
575  return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L>>(
576  buffer, omnisci_column, parquet_column);
577  case parquet::LogicalType::TimeUnit::NANOS:
578  return std::make_shared<ParquetTimeEncoder<V, T, 1000L * 1000L * 1000L>>(
579  buffer, omnisci_column, parquet_column);
580  default:
581  UNREACHABLE();
582  }
583  } else {
584  UNREACHABLE();
585  }
586  return {};
587 }
588 
589 std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
590  const ColumnDescriptor* omnisci_column,
591  const parquet::ColumnDescriptor* parquet_column,
592  AbstractBuffer* buffer,
593  const bool is_metadata_scan) {
594  auto column_type = omnisci_column->columnType;
595  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
596  parquet_column->logical_type().get())) {
597  if (column_type.get_compression() == kENCODING_NONE) {
598  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
599  return create_parquet_time_encoder_with_types<int64_t, int32_t>(
600  omnisci_column, parquet_column, buffer);
601  } else {
602  return create_parquet_time_encoder_with_types<int64_t, int64_t>(
603  omnisci_column, parquet_column, buffer);
604  }
605  } else if (column_type.get_compression() == kENCODING_FIXED) {
606  if (is_metadata_scan) {
607  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
608  CHECK(parquet_column->physical_type() == parquet::Type::INT32);
609  return create_parquet_time_encoder_with_types<int64_t, int32_t>(
610  omnisci_column, parquet_column, buffer);
611  } else {
612  CHECK(time_logical_column->time_unit() ==
613  parquet::LogicalType::TimeUnit::MICROS ||
614  time_logical_column->time_unit() ==
615  parquet::LogicalType::TimeUnit::NANOS);
616  CHECK(parquet_column->physical_type() == parquet::Type::INT64);
617  return create_parquet_time_encoder_with_types<int64_t, int64_t>(
618  omnisci_column, parquet_column, buffer);
619  }
620  } else {
621  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
622  CHECK(parquet_column->physical_type() == parquet::Type::INT32);
623  return create_parquet_time_encoder_with_types<int32_t, int32_t>(
624  omnisci_column, parquet_column, buffer);
625  } else {
626  CHECK(time_logical_column->time_unit() ==
627  parquet::LogicalType::TimeUnit::MICROS ||
628  time_logical_column->time_unit() ==
629  parquet::LogicalType::TimeUnit::NANOS);
630  CHECK(parquet_column->physical_type() == parquet::Type::INT64);
631  return create_parquet_time_encoder_with_types<int32_t, int64_t>(
632  omnisci_column, parquet_column, buffer);
633  }
634  }
635  } else {
636  UNREACHABLE();
637  }
638  }
639  return {};
640 }
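// Arithmetic sketch for the TIME cases above (assumed from the conversion
// denominators; the literal is illustrative). Note the physical-type pairing
// the CHECKs enforce: MILLIS rides on INT32, MICROS and NANOS on INT64.
//
//   int64_t parquet_micros = 45296000000;  // 12:34:56 as microseconds
//   int64_t omnisci_seconds = parquet_micros / (1000L * 1000L);  // 45296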
641 
642 std::shared_ptr<ParquetEncoder> create_parquet_date_from_timestamp_encoder(
643  const ColumnDescriptor* omnisci_column,
644  const parquet::ColumnDescriptor* parquet_column,
645  AbstractBuffer* buffer,
646  const bool is_metadata_scan) {
647  auto column_type = omnisci_column->columnType;
648  if (parquet_column->logical_type()->is_timestamp() && column_type.is_date()) {
649  CHECK(column_type.get_compression() == kENCODING_DATE_IN_DAYS);
650  if (is_metadata_scan) {
651  return create_parquet_timestamp_encoder_with_types<int64_t, int64_t>(
652  omnisci_column, parquet_column, buffer);
653  } else {
654  if (column_type.get_comp_param() ==
655  0) { // DATE ENCODING FIXED (32) uses comp param 0
656  return create_parquet_date_from_timestamp_encoder_with_types<int32_t, int64_t>(
657  omnisci_column, parquet_column, buffer);
658  } else if (column_type.get_comp_param() == 16) {
659  return create_parquet_date_from_timestamp_encoder_with_types<int16_t, int64_t>(
660  omnisci_column, parquet_column, buffer);
661  } else {
662  UNREACHABLE();
663  }
664  }
665  }
666  return {};
667 }
668 
669 std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
670  const ColumnDescriptor* omnisci_column,
671  const parquet::ColumnDescriptor* parquet_column,
672  AbstractBuffer* buffer,
673  const bool is_metadata_scan) {
674  auto column_type = omnisci_column->columnType;
675  if (parquet_column->logical_type()->is_date() && column_type.is_date()) {
676  if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
677  if (is_metadata_scan) {
678  return std::make_shared<ParquetDateInSecondsEncoder>(
679  buffer, omnisci_column, parquet_column);
680  } else {
681  if (column_type.get_comp_param() ==
682  0) { // DATE ENCODING FIXED (32) uses comp param 0
683  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
684  buffer, omnisci_column, parquet_column);
685  } else if (column_type.get_comp_param() == 16) {
686  return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
687  buffer, omnisci_column, parquet_column);
688  } else {
689  UNREACHABLE();
690  }
691  }
692  } else if (column_type.get_compression() == kENCODING_NONE) { // for array types
693  return std::make_shared<ParquetDateInSecondsEncoder>(
694  buffer, omnisci_column, parquet_column);
695  } else {
696  UNREACHABLE();
697  }
698  }
699  return {};
700 }
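// Reminder for the comp_param checks above: DATE ENCODING DAYS (32) reports a
// comp_param of 0 and DAYS (16) a comp_param of 16. An illustrative value:
//
//   int32_t parquet_days = 18628;                               // 2021-01-01
//   int16_t fixed16_days = static_cast<int16_t>(parquet_days);  // DAYS(16) path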
701 
702 std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
703  const ColumnDescriptor* omnisci_column,
704  const parquet::ColumnDescriptor* parquet_column,
705  const Chunk_NS::Chunk& chunk,
706  StringDictionary* string_dictionary,
707  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) {
708  auto column_type = omnisci_column->columnType;
709  if (!is_valid_parquet_string(parquet_column) ||
710  !omnisci_column->columnType.is_string()) {
711  return {};
712  }
713  if (column_type.get_compression() == kENCODING_NONE) {
714  return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
715  chunk.getIndexBuf());
716  } else if (column_type.get_compression() == kENCODING_DICT) {
717  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
718  auto& logical_chunk_metadata = chunk_metadata.back();
719  logical_chunk_metadata->sqlType = omnisci_column->columnType;
720  switch (column_type.get_size()) {
721  case 1:
722  return std::make_shared<ParquetStringEncoder<uint8_t>>(
723  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
724  case 2:
725  return std::make_shared<ParquetStringEncoder<uint16_t>>(
726  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
727  case 4:
728  return std::make_shared<ParquetStringEncoder<int32_t>>(
729  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
730  default:
731  UNREACHABLE();
732  }
733  } else {
734  UNREACHABLE();
735  }
736  return {};
737 }
738 
739 std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
740  const ColumnDescriptor* omnisci_column,
741  const parquet::ColumnDescriptor* parquet_column,
742  std::list<Chunk_NS::Chunk>& chunks,
743  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
744  const bool is_metadata_scan) {
745  auto column_type = omnisci_column->columnType;
746  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
747  return {};
748  }
749  if (is_metadata_scan) {
750  return std::make_shared<ParquetGeospatialEncoder>();
751  }
752  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
753  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
754  auto& chunk_metadata_ptr = chunk_metadata.back();
755  chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
756  }
757  return std::make_shared<ParquetGeospatialEncoder>(
758  parquet_column, chunks, chunk_metadata);
759 }
760 
761 // forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
762 // `create_parquet_array_encoder` each make use of each other, so
763 // one of the two functions must have a forward declaration
764 std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
765  const ColumnDescriptor* omnisci_column,
766  const parquet::ColumnDescriptor* parquet_column,
767  std::list<Chunk_NS::Chunk>& chunks,
768  StringDictionary* string_dictionary,
769  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
770  const bool is_metadata_scan);
771 
801 std::shared_ptr<ParquetEncoder> create_parquet_encoder(
802  const ColumnDescriptor* omnisci_column,
803  const parquet::ColumnDescriptor* parquet_column,
804  std::list<Chunk_NS::Chunk>& chunks,
805  StringDictionary* string_dictionary,
806  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
807  const bool is_metadata_scan = false) {
808  auto buffer = chunks.empty() ? nullptr : chunks.begin()->getBuffer();
809  if (auto encoder = create_parquet_geospatial_encoder(
810  omnisci_column, parquet_column, chunks, chunk_metadata, is_metadata_scan)) {
811  return encoder;
812  }
813  if (auto encoder = create_parquet_array_encoder(omnisci_column,
814  parquet_column,
815  chunks,
816  string_dictionary,
817  chunk_metadata,
818  is_metadata_scan)) {
819  return encoder;
820  }
821  if (auto encoder = create_parquet_decimal_encoder(
822  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
823  return encoder;
824  }
825  if (auto encoder = create_parquet_integral_encoder(
826  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
827  return encoder;
828  }
829  if (auto encoder =
830  create_parquet_floating_point_encoder(omnisci_column, parquet_column, buffer)) {
831  return encoder;
832  }
833  if (auto encoder = create_parquet_timestamp_encoder(
834  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
835  return encoder;
836  }
837  if (auto encoder =
838  create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
839  return encoder;
840  }
841  if (auto encoder = create_parquet_time_encoder(
842  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
843  return encoder;
844  }
845  if (auto encoder = create_parquet_date_from_timestamp_encoder(
846  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
847  return encoder;
848  }
849  if (auto encoder = create_parquet_date_encoder(
850  omnisci_column, parquet_column, buffer, is_metadata_scan)) {
851  return encoder;
852  }
853  if (auto encoder = create_parquet_string_encoder(
854  omnisci_column,
855  parquet_column,
856  chunks.empty() ? Chunk_NS::Chunk{} : *chunks.begin(),
857  string_dictionary,
858  chunk_metadata)) {
859  return encoder;
860  }
861  UNREACHABLE();
862  return {};
863 }
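// Minimal metadata-scan usage sketch, mirroring the overload that follows:
// with no chunks and no string dictionary, encoders are constructed against a
// null buffer purely to compute row group metadata.
//
//   std::list<Chunk_NS::Chunk> chunks;
//   std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
//   auto encoder = create_parquet_encoder(
//       omnisci_column, parquet_column, chunks, nullptr, chunk_metadata, true);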
864 
869 std::shared_ptr<ParquetEncoder> create_parquet_encoder(
870  const ColumnDescriptor* omnisci_column,
871  const parquet::ColumnDescriptor* parquet_column) {
872  std::list<Chunk_NS::Chunk> chunks;
873  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
874  return create_parquet_encoder(
875  omnisci_column, parquet_column, chunks, nullptr, chunk_metadata, true);
876 }
877 
878 std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
879  const ColumnDescriptor* omnisci_column,
880  const parquet::ColumnDescriptor* parquet_column,
881  std::list<Chunk_NS::Chunk>& chunks,
882  StringDictionary* string_dictionary,
883  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata,
884  const bool is_metadata_scan) {
885  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
886  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
887  return {};
888  }
889  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
890  get_sub_type_column_descriptor(omnisci_column);
891  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
892  parquet_column,
893  chunks,
894  string_dictionary,
895  chunk_metadata,
896  is_metadata_scan);
897  CHECK(encoder.get());
898  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
899  CHECK(scalar_encoder);
900  if (omnisci_column->columnType.is_fixlen_array()) {
901  encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
902  is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
903  scalar_encoder,
904  omnisci_column);
905  } else {
906  encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
907  is_metadata_scan ? nullptr : chunks.begin()->getBuffer(),
908  is_metadata_scan ? nullptr : chunks.begin()->getIndexBuf(),
909  scalar_encoder,
910  omnisci_column);
911  }
912  return encoder;
913 }
914 
915 void validate_max_repetition_and_definition_level(
916  const ColumnDescriptor* omnisci_column_descriptor,
917  const parquet::ColumnDescriptor* parquet_column_descriptor) {
918  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
919  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
920  throw std::runtime_error(
921  "Unsupported mapping detected. Column '" + parquet_column_descriptor->name() +
922  "' detected to be a parquet list but OmniSci mapped column '" +
923  omnisci_column_descriptor->columnName + "' is not an array.");
924  }
925  if (is_valid_parquet_list) {
926  if (parquet_column_descriptor->max_repetition_level() != 1 ||
927  parquet_column_descriptor->max_definition_level() != 3) {
928  throw std::runtime_error(
929  "Incorrect schema max repetition level detected in column '" +
930  parquet_column_descriptor->name() +
931  "'. Expected a max repetition level of 1 and max definition level of 3 for "
932  "list column but column has a max "
933  "repetition level of " +
934  std::to_string(parquet_column_descriptor->max_repetition_level()) +
935  " and a max definition level of " +
936  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
937  }
938  } else {
939  if (parquet_column_descriptor->max_repetition_level() != 0 ||
940  parquet_column_descriptor->max_definition_level() != 1) {
941  throw std::runtime_error(
942  "Incorrect schema max repetition level detected in column '" +
943  parquet_column_descriptor->name() +
944  "'. Expected a max repetition level of 0 and max definition level of 1 for "
945  "flat column but column has a max "
946  "repetition level of " +
947  std::to_string(parquet_column_descriptor->max_repetition_level()) +
948  " and a max definition level of " +
949  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
950  }
951  }
952 }
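// Level arithmetic behind the checks above: in the spec's three-level LIST
// shape, the optional LIST group and the optional element each add one
// definition level, and the repeated "list" group adds one definition and one
// repetition level, giving max levels of 3 and 1; a flat optional column has
// max levels of 1 and 0.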
953 
954 void resize_values_buffer(const ColumnDescriptor* omnisci_column,
955  const parquet::ColumnDescriptor* parquet_column,
956  std::vector<int8_t>& values) {
957  auto max_type_byte_size =
958  std::max(omnisci_column->columnType.get_size(),
959  parquet::GetTypeByteSize(parquet_column->physical_type()));
960  size_t values_size =
961  LazyParquetChunkLoader::batch_reader_num_elements * max_type_byte_size;
962  values.resize(values_size);
963 }
964 
965 bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
966  const parquet::ColumnDescriptor* parquet_column) {
967  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
968  parquet_column->logical_type().get())) {
969  return omnisci_column->columnType.get_precision() ==
970  decimal_logical_column->precision() &&
971  omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
972  omnisci_column->columnType.is_decimal() &&
973  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
974  omnisci_column->columnType.get_compression() == kENCODING_FIXED);
975  }
976  return false;
977 }
978 
979 bool validate_floating_point_mapping(const ColumnDescriptor* omnisci_column,
980  const parquet::ColumnDescriptor* parquet_column) {
981  if (!omnisci_column->columnType.is_fp()) {
982  return false;
983  }
984  // check if mapping is a valid coerced or non-coerced floating point mapping
985  // with no annotation (floating point columns have no annotation in the
986  // Parquet specification)
987  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
988  return (parquet_column->physical_type() == parquet::Type::DOUBLE) ||
989  (parquet_column->physical_type() == parquet::Type::FLOAT &&
990  omnisci_column->columnType.get_type() == kFLOAT);
991  }
992  return false;
993 }
994 
995 bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
996  const parquet::ColumnDescriptor* parquet_column) {
997  if (!omnisci_column->columnType.is_integer()) {
998  return false;
999  }
1000  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
1001  parquet_column->logical_type().get())) {
1002  CHECK(omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1003  omnisci_column->columnType.get_compression() == kENCODING_FIXED);
1004  const int bits_per_byte = 8;
1005  // unsigned types are permitted to map to a wider integral type in order to avoid
1006  // precision loss
1007  const int bit_widening_factor = int_logical_column->is_signed() ? 1 : 2;
1008  return omnisci_column->columnType.get_size() * bits_per_byte <=
1009  int_logical_column->bit_width() * bit_widening_factor;
1010  }
1011  // check if mapping is a valid coerced or non-coerced integral mapping with no
1012  // annotation
1013  if ((omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1014  omnisci_column->columnType.get_compression() == kENCODING_FIXED)) {
1015  return (parquet_column->physical_type() == parquet::Type::INT64) ||
1016  (parquet_column->physical_type() == parquet::Type::INT32 &&
1017  omnisci_column->columnType.get_size() <= 4);
1018  }
1019  return false;
1020 }
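// Worked check for the annotated branch above: Parquet INT(8, false) has a
// bit_width of 8 and a bit_widening_factor of 2, so SMALLINT passes
// (2 bytes * 8 <= 8 * 2), TINYINT passes (1 * 8 <= 16), but INT does not
// (4 * 8 <= 16 is false), since widening beyond double the bit width is
// rejected.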
1021 
1022 bool is_nanosecond_precision(const ColumnDescriptor* omnisci_column) {
1023  return omnisci_column->columnType.get_dimension() == 9;
1024 }
1025 
1026 bool is_nanosecond_precision(
1027  const parquet::TimestampLogicalType* timestamp_logical_column) {
1028  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::NANOS;
1029 }
1030 
1031 bool is_microsecond_precision(const ColumnDescriptor* omnisci_column) {
1032  return omnisci_column->columnType.get_dimension() == 6;
1033 }
1034 
1035 bool is_microsecond_precision(
1036  const parquet::TimestampLogicalType* timestamp_logical_column) {
1037  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MICROS;
1038 }
1039 
1040 bool is_millisecond_precision(const ColumnDescriptor* omnisci_column) {
1041  return omnisci_column->columnType.get_dimension() == 3;
1042 }
1043 
1044 bool is_millisecond_precision(
1045  const parquet::TimestampLogicalType* timestamp_logical_column) {
1046  return timestamp_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
1047 }
1048 
1049 bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
1050  const parquet::ColumnDescriptor* parquet_column) {
1051  bool is_none_encoded_mapping =
1052  omnisci_column->columnType.get_compression() == kENCODING_NONE &&
1053  (parquet_column->physical_type() == parquet::Type::BOOLEAN &&
1054  omnisci_column->columnType.get_type() == kBOOLEAN);
1055  return parquet_column->logical_type()->is_none() && is_none_encoded_mapping;
1056 }
1057 
1058 bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
1059  const parquet::ColumnDescriptor* parquet_column) {
1060  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
1061  ((omnisci_column->columnType.get_compression() == kENCODING_NONE) ||
1062  (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1063  omnisci_column->columnType.get_comp_param() == 32)))) {
1064  return false;
1065  }
1066  // check the annotated case
1067  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
1068  parquet_column->logical_type().get())) {
1069  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
1070  return omnisci_column->columnType.get_dimension() == 0 ||
1071  ((is_nanosecond_precision(omnisci_column) &&
1072  is_nanosecond_precision(timestamp_logical_column)) ||
1073  (is_microsecond_precision(omnisci_column) &&
1074  is_microsecond_precision(timestamp_logical_column)) ||
1075  (is_millisecond_precision(omnisci_column) &&
1076  is_millisecond_precision(timestamp_logical_column)));
1077  }
1078  if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
1079  return omnisci_column->columnType.get_dimension() == 0;
1080  }
1081  }
1082  // check the unannotated case
1083  if (parquet_column->logical_type()->is_none() &&
1084  ((parquet_column->physical_type() == parquet::Type::INT32 &&
1085  omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1086  omnisci_column->columnType.get_comp_param() == 32) ||
1087  parquet_column->physical_type() == parquet::Type::INT64)) {
1088  return true;
1089  }
1090  return false;
1091 }
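// Examples of the precision matching above: an uncompressed TIMESTAMP(3)
// column only accepts a Parquet TIMESTAMP(MILLIS) column, while TIMESTAMP(0)
// (dimension 0) accepts any time unit, since the encoder converts down to
// seconds.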
1092 
1093 bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
1094  const parquet::ColumnDescriptor* parquet_column) {
1095  if (!(omnisci_column->columnType.get_type() == kTIME &&
1096  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1097  (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
1098  omnisci_column->columnType.get_comp_param() == 32)))) {
1099  return false;
1100  }
1101  if (parquet_column->logical_type()->is_time()) {
1102  return true;
1103  }
1104  return false;
1105 }
1106 
1107 bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
1108  const parquet::ColumnDescriptor* parquet_column) {
1109  if (!(omnisci_column->columnType.get_type() == kDATE &&
1110  ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
1111  (omnisci_column->columnType.get_comp_param() ==
1112  0 // DATE ENCODING DAYS (32) specifies comp_param of 0
1113  || omnisci_column->columnType.get_comp_param() == 16)) ||
1114  omnisci_column->columnType.get_compression() ==
1115  kENCODING_NONE // for array types
1116  ))) {
1117  return false;
1118  }
1119  return parquet_column->logical_type()->is_date() ||
1120  parquet_column->logical_type()
1121  ->is_timestamp(); // to support TIMESTAMP -> DATE coercion
1122 }
1123 
1124 bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
1125  const parquet::ColumnDescriptor* parquet_column) {
1126  return is_valid_parquet_string(parquet_column) &&
1127  omnisci_column->columnType.is_string() &&
1128  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
1129  omnisci_column->columnType.get_compression() == kENCODING_DICT);
1130 }
1131 
1132 bool validate_array_mapping(const ColumnDescriptor* omnisci_column,
1133  const parquet::ColumnDescriptor* parquet_column) {
1134  if (is_valid_parquet_list_column(parquet_column) &&
1135  omnisci_column->columnType.is_array()) {
1136  auto omnisci_column_sub_type_column = get_sub_type_column_descriptor(omnisci_column);
1137  return LazyParquetChunkLoader::isColumnMappingSupported(
1138  omnisci_column_sub_type_column.get(), parquet_column);
1139  }
1140  return false;
1141 }
1142 
1143 bool validate_geospatial_mapping(const ColumnDescriptor* omnisci_column,
1144  const parquet::ColumnDescriptor* parquet_column) {
1145  return is_valid_parquet_string(parquet_column) &&
1146  omnisci_column->columnType.is_geometry();
1147 }
1148 
1149 void validate_equal_schema(const parquet::arrow::FileReader* reference_file_reader,
1150  const parquet::arrow::FileReader* new_file_reader,
1151  const std::string& reference_file_path,
1152  const std::string& new_file_path) {
1153  const auto reference_num_columns =
1154  reference_file_reader->parquet_reader()->metadata()->num_columns();
1155  const auto new_num_columns =
1156  new_file_reader->parquet_reader()->metadata()->num_columns();
1157  if (reference_num_columns != new_num_columns) {
1158  throw std::runtime_error{"Parquet file \"" + new_file_path +
1159  "\" has a different schema. Please ensure that all Parquet "
1160  "files use the same schema. Reference Parquet file: \"" +
1161  reference_file_path + "\" has " +
1162  std::to_string(reference_num_columns) +
1163  " columns. New Parquet file \"" + new_file_path + "\" has " +
1164  std::to_string(new_num_columns) + " columns."};
1165  }
1166 
1167  for (int i = 0; i < reference_num_columns; i++) {
1168  validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
1169  get_column_descriptor(new_file_reader, i),
1170  reference_file_path,
1171  new_file_path);
1172  }
1173 }
1174 
1175 void validate_allowed_mapping(const parquet::ColumnDescriptor* parquet_column,
1176  const ColumnDescriptor* omnisci_column) {
1177  parquet::Type::type physical_type = parquet_column->physical_type();
1178  auto logical_type = parquet_column->logical_type();
1179  bool allowed_type =
1180  LazyParquetChunkLoader::isColumnMappingSupported(omnisci_column, parquet_column);
1181  if (!allowed_type) {
1182  if (logical_type->is_timestamp()) {
1183  auto timestamp_type =
1184  dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
1185  CHECK(timestamp_type);
1186 
1187  if (!timestamp_type->is_adjusted_to_utc()) {
1188  LOG(WARNING) << "Non-UTC timezone specified in Parquet file for column \""
1189  << omnisci_column->columnName
1190  << "\". Only UTC timezone is currently supported.";
1191  }
1192  }
1193  std::string parquet_type;
1194  if (parquet_column->logical_type()->is_none()) {
1195  parquet_type = parquet::TypeToString(physical_type);
1196  } else {
1197  parquet_type = logical_type->ToString();
1198  }
1199  std::string omnisci_type = omnisci_column->columnType.get_type_name();
1200  throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
1201  "\" to OmniSci type \"" + omnisci_type +
1202  "\" is not allowed. Please use an appropriate column type."};
1203  }
1204 }
1205 
1206 void validate_number_of_columns(
1207  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1208  const std::string& file_path,
1209  const ForeignTableSchema& schema) {
1210  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
1211  throw_number_of_columns_mismatch_error(
1212  schema.numLogicalColumns(), file_metadata->num_columns(), file_path);
1213  }
1214 }
1215 
1216 void throw_missing_metadata_error(const int row_group_index,
1217  const int column_index,
1218  const std::string& file_path) {
1219  throw std::runtime_error{
1220  "Statistics metadata is required for all row groups. Metadata is missing for "
1221  "row group index: " +
1222  std::to_string(row_group_index) +
1223  ", column index: " + std::to_string(column_index) + ", file path: " + file_path};
1224 }
1225 
1226 void throw_row_group_larger_than_fragment_size_error(const int row_group_index,
1227  const int64_t max_row_group_size,
1228  const int fragment_size,
1229  const std::string& file_path) {
1230  throw std::runtime_error{
1231  "Parquet file has a row group size that is larger than the fragment size. "
1232  "Please set the table fragment size to a number that is larger than the "
1233  "row group size. Row group index: " +
1234  std::to_string(row_group_index) +
1235  ", row group size: " + std::to_string(max_row_group_size) +
1236  ", fragment size: " + std::to_string(fragment_size) + ", file path: " + file_path};
1237 }
1238 
1239 void validate_column_mapping_and_row_group_metadata(
1240  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1241  const std::string& file_path,
1242  const ForeignTableSchema& schema) {
1243  auto column_it = schema.getLogicalColumns().begin();
1244  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
1245  const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
1246  try {
1247  validate_allowed_mapping(descr, *column_it);
1248  } catch (std::runtime_error& e) {
1249  std::stringstream error_message;
1250  error_message << e.what() << " Parquet column: " << descr->name()
1251  << ", OmniSci column: " << (*column_it)->columnName
1252  << ", Parquet file: " << file_path << ".";
1253  throw std::runtime_error(error_message.str());
1254  }
1255 
1256  auto fragment_size = schema.getForeignTable()->maxFragRows;
1257  int64_t max_row_group_size = 0;
1258  int max_row_group_index = 0;
1259  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
1260  auto group_metadata = file_metadata->RowGroup(r);
1261  auto num_rows = group_metadata->num_rows();
1262  if (num_rows > max_row_group_size) {
1263  max_row_group_size = num_rows;
1264  max_row_group_index = r;
1265  }
1266 
1267  auto column_chunk = group_metadata->ColumnChunk(i);
1268  bool contains_metadata = column_chunk->is_stats_set();
1269  if (contains_metadata) {
1270  auto stats = column_chunk->statistics();
1271  bool is_all_nulls = stats->null_count() == column_chunk->num_values();
1272  if (!stats->HasMinMax() && !is_all_nulls) {
1273  contains_metadata = false;
1274  }
1275  }
1276 
1277  if (!contains_metadata) {
1278  throw_missing_metadata_error(r, i, file_path);
1279  }
1280  }
1281 
1282  if (max_row_group_size > fragment_size) {
1283  throw_row_group_larger_than_fragment_size_error(
1284  max_row_group_index, max_row_group_size, fragment_size, file_path);
1285  }
1286  }
1287 }
1288 
1289 void validate_parquet_metadata(
1290  const std::shared_ptr<parquet::FileMetaData>& file_metadata,
1291  const std::string& file_path,
1292  const ForeignTableSchema& schema) {
1293  validate_number_of_columns(file_metadata, file_path, schema);
1294  validate_column_mapping_and_row_group_metadata(file_metadata, file_path, schema);
1295 }
1296 
1297 std::list<RowGroupMetadata> metadata_scan_rowgroup_interval(
1298  const std::map<int, std::shared_ptr<ParquetEncoder>>& encoder_map,
1299  const RowGroupInterval& row_group_interval,
1300  const ReaderPtr& reader,
1301  const ForeignTableSchema& schema) {
1302  std::list<RowGroupMetadata> row_group_metadata;
1303  auto column_interval =
1304  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1305  schema.getLogicalAndPhysicalColumns().back()->columnId};
1306 
1307  auto file_metadata = reader->parquet_reader()->metadata();
1308  for (int row_group = row_group_interval.start_index;
1309  row_group <= row_group_interval.end_index;
1310  ++row_group) {
1311  auto& row_group_metadata_item = row_group_metadata.emplace_back();
1312  row_group_metadata_item.row_group_index = row_group;
1313  row_group_metadata_item.file_path = row_group_interval.file_path;
1314 
1315  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
1316  file_metadata->RowGroup(row_group);
1317 
1318  for (int column_id = column_interval.start; column_id <= column_interval.end;
1319  column_id++) {
1320  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1321  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1322  auto encoder_map_iter =
1323  encoder_map.find(schema.getLogicalColumn(column_id)->columnId);
1324  CHECK(encoder_map_iter != encoder_map.end());
1325  try {
1326  auto metadata = encoder_map_iter->second->getRowGroupMetadata(
1327  group_metadata.get(), parquet_column_index, column_descriptor->columnType);
1328  row_group_metadata_item.column_chunk_metadata.emplace_back(metadata);
1329  } catch (const std::exception& e) {
1330  std::stringstream error_message;
1331  error_message << e.what() << " in row group " << row_group << " of Parquet file '"
1332  << row_group_interval.file_path << "'.";
1333  throw std::runtime_error(error_message.str());
1334  }
1335  }
1336  }
1337  return row_group_metadata;
1338 }
1339 
1340 std::map<int, std::shared_ptr<ParquetEncoder>> populate_encoder_map(
1341  const Interval<ColumnType>& column_interval,
1342  const ForeignTableSchema& schema,
1343  const ReaderPtr& reader) {
1344  std::map<int, std::shared_ptr<ParquetEncoder>> encoder_map;
1345  auto file_metadata = reader->parquet_reader()->metadata();
1346  for (int column_id = column_interval.start; column_id <= column_interval.end;
1347  column_id++) {
1348  const auto column_descriptor = schema.getColumnDescriptor(column_id);
1349  auto parquet_column_descriptor =
1350  file_metadata->schema()->Column(schema.getParquetColumnIndex(column_id));
1351  encoder_map[column_id] =
1352  create_parquet_encoder(column_descriptor, parquet_column_descriptor);
1353  column_id += column_descriptor->columnType.get_physical_cols();
1354  }
1355  return encoder_map;
1356 }
1357 } // namespace
1358 
1359 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::appendRowGroups(
1360  const std::vector<RowGroupInterval>& row_group_intervals,
1361  const int parquet_column_index,
1362  const ColumnDescriptor* column_descriptor,
1363  std::list<Chunk_NS::Chunk>& chunks,
1364  StringDictionary* string_dictionary) {
1365  auto timer = DEBUG_TIMER(__func__);
1366  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1367  // `def_levels` and `rep_levels` below are used to store the read definition
1368  // and repetition levels of the Dremel encoding implemented by the Parquet
1369  // format
1370  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1371  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1372  std::vector<int8_t> values;
1373 
1374  CHECK(!row_group_intervals.empty());
1375  const auto& first_file_path = row_group_intervals.front().file_path;
1376 
1377  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1378  auto first_parquet_column_descriptor =
1379  get_column_descriptor(first_file_reader, parquet_column_index);
1380  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1381  auto encoder = create_parquet_encoder(column_descriptor,
1382  first_parquet_column_descriptor,
1383  chunks,
1384  string_dictionary,
1385  chunk_metadata);
1386  CHECK(encoder.get());
1387 
1388  for (const auto& row_group_interval : row_group_intervals) {
1389  const auto& file_path = row_group_interval.file_path;
1390  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1391 
1392  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1393  CHECK(row_group_interval.start_index >= 0 &&
1394  row_group_interval.end_index < num_row_groups);
1395  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1396 
1397  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1398  auto parquet_column_descriptor =
1399  get_column_descriptor(file_reader, parquet_column_index);
1400  validate_equal_column_descriptor(first_parquet_column_descriptor,
1401  parquet_column_descriptor,
1402  first_file_path,
1403  file_path);
1404 
1405  validate_max_repetition_and_definition_level(column_descriptor,
1406  parquet_column_descriptor);
1407  int64_t values_read = 0;
1408  for (int row_group_index = row_group_interval.start_index;
1409  row_group_index <= row_group_interval.end_index;
1410  ++row_group_index) {
1411  auto group_reader = parquet_reader->RowGroup(row_group_index);
1412  std::shared_ptr<parquet::ColumnReader> col_reader =
1413  group_reader->Column(parquet_column_index);
1414 
1415  try {
1416  while (col_reader->HasNext()) {
1417  int64_t levels_read =
1418  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1419  def_levels.data(),
1420  rep_levels.data(),
1421  reinterpret_cast<uint8_t*>(values.data()),
1422  &values_read,
1423  col_reader.get());
1424  encoder->appendData(def_levels.data(),
1425  rep_levels.data(),
1426  values_read,
1427  levels_read,
1428  !col_reader->HasNext(),
1429  values.data());
1430  }
1431  } catch (const std::exception& error) {
1432  throw ForeignStorageException(
1433  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1434  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1435  "', Parquet file: '" + file_path + "'");
1436  }
1437  }
1438  }
1439  return chunk_metadata;
1440 }
1441 
1442 bool LazyParquetChunkLoader::isColumnMappingSupported(
1443  const ColumnDescriptor* omnisci_column,
1444  const parquet::ColumnDescriptor* parquet_column) {
1445  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1446  return true;
1447  }
1448  if (validate_array_mapping(omnisci_column, parquet_column)) {
1449  return true;
1450  }
1451  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1452  return true;
1453  }
1454  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1455  return true;
1456  }
1457  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1458  return true;
1459  }
1460  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1461  return true;
1462  }
1463  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1464  return true;
1465  }
1466  if (validate_time_mapping(omnisci_column, parquet_column)) {
1467  return true;
1468  }
1469  if (validate_date_mapping(omnisci_column, parquet_column)) {
1470  return true;
1471  }
1472  if (validate_string_mapping(omnisci_column, parquet_column)) {
1473  return true;
1474  }
1475  return false;
1476 }
1477 
1478 LazyParquetChunkLoader::LazyParquetChunkLoader(
1479  std::shared_ptr<arrow::fs::FileSystem> file_system,
1480  FileReaderMap* file_map)
1481  : file_system_(file_system), file_reader_cache_(file_map) {}
1482 
1483 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
1484  const std::vector<RowGroupInterval>& row_group_intervals,
1485  const int parquet_column_index,
1486  std::list<Chunk_NS::Chunk>& chunks,
1487  StringDictionary* string_dictionary) {
1488  CHECK(!chunks.empty());
1489  auto const& chunk = *chunks.begin();
1490  auto column_descriptor = chunk.getColumnDesc();
1491  auto buffer = chunk.getBuffer();
1492  CHECK(buffer);
1493 
1494  try {
1495  auto metadata = appendRowGroups(row_group_intervals,
1496  parquet_column_index,
1497  column_descriptor,
1498  chunks,
1499  string_dictionary);
1500  return metadata;
1501  } catch (const std::exception& error) {
1502  throw ForeignStorageException(error.what());
1503  }
1504 
1505  return {};
1506 }
1507 
1508 std::list<RowGroupMetadata> LazyParquetChunkLoader::metadataScan(
1509  const std::set<std::string>& file_paths,
1510  const ForeignTableSchema& schema) {
1511  auto timer = DEBUG_TIMER(__func__);
1512  auto column_interval =
1513  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1514  schema.getLogicalAndPhysicalColumns().back()->columnId};
1515  CHECK(!file_paths.empty());
1516 
1517  // The encoder map needs to be populated before we can start scanning rowgroups, so we
1518  // peel the first file_path out of the async loop below to perform population.
1519  const auto& first_path = *file_paths.begin();
1520  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
1521  validate_parquet_metadata(
1522  first_reader->parquet_reader()->metadata(), first_path, schema);
1523  auto encoder_map = populate_encoder_map(column_interval, schema, first_reader);
1524  const auto num_row_groups = get_parquet_table_size(first_reader).first;
1525  auto row_group_metadata = metadata_scan_rowgroup_interval(
1526  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
1527 
1528  // We want each (filepath->FileReader) pair in the cache to be initialized before we
1529  // multithread so that we are not adding keys in a concurrent environment, so we add
1530  // cache entries for each path and initialize to an empty unique_ptr if the file has not
1531  // yet been opened.
1532  // Since we have already performed the first iteration, we skip it in the thread groups
1533  // so as not to process it twice.
1534  std::set<std::string> cache_subset;
1535  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
1536  file_reader_cache_->initializeIfEmpty(*path_it);
1537  cache_subset.insert(*path_it);
1538  }
1539 
1540  // Iterate asynchronously over any paths beyond the first.
1541  auto paths_per_thread = partition_for_threads(cache_subset, g_max_import_threads);
1542  std::vector<std::future<std::list<RowGroupMetadata>>> futures;
1543  for (const auto& path_group : paths_per_thread) {
1544  futures.emplace_back(std::async(
1545  std::launch::async,
1546  [&](const auto& paths, const auto& file_reader_cache) {
1547  std::list<RowGroupMetadata> reduced_metadata;
1548  for (const auto& path : paths.get()) {
1549  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
1550  validate_equal_schema(first_reader, reader, first_path, path);
1551  validate_parquet_metadata(reader->parquet_reader()->metadata(), path, schema);
1552  const auto num_row_groups = get_parquet_table_size(reader).first;
1553  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
1554  reduced_metadata.splice(
1555  reduced_metadata.end(),
1556  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
1557  }
1558  return reduced_metadata;
1559  },
1560  std::ref(path_group),
1561  std::ref(*file_reader_cache_)));
1562  }
1563 
1564  // Reduce all the row_group results.
1565  for (auto& future : futures) {
1566  row_group_metadata.splice(row_group_metadata.end(), future.get());
1567  }
1568  return row_group_metadata;
1569 }
1570 
1571 } // namespace foreign_storage
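// ---------------------------------------------------------------------------
// Illustrative, self-contained sketch (not part of this file): the
// fan-out/reduce pattern metadataScan uses above -- partition work across
// std::async tasks, then splice each task's result list into one output.
// partition_items and scan_one are hypothetical stand-ins for
// partition_for_threads and the per-file row-group scan.
// ---------------------------------------------------------------------------
#include <algorithm>
#include <future>
#include <list>
#include <set>
#include <string>
#include <vector>

// Stand-in for a per-file metadata scan.
std::list<std::string> scan_one(const std::string& path) {
  return {path + ": row group metadata"};
}

// Round-robin partition of a set into at most max_threads groups, mirroring
// what partition_for_threads is expected to provide.
std::vector<std::vector<std::string>> partition_items(
    const std::set<std::string>& items, const size_t max_threads) {
  const size_t num_groups = std::min(max_threads, items.size());
  if (num_groups == 0) {
    return {};
  }
  std::vector<std::vector<std::string>> groups(num_groups);
  size_t i = 0;
  for (const auto& item : items) {
    groups[i++ % num_groups].push_back(item);
  }
  return groups;
}

std::list<std::string> scan_all(const std::set<std::string>& paths,
                                const size_t max_threads) {
  std::vector<std::future<std::list<std::string>>> futures;
  for (const auto& group : partition_items(paths, max_threads)) {
    // Each task owns a disjoint group (captured by value), so the tasks
    // share no mutable state.
    futures.emplace_back(std::async(std::launch::async, [group] {
      std::list<std::string> local;
      for (const auto& path : group) {
        local.splice(local.end(), scan_one(path));
      }
      return local;
    }));
  }
  std::list<std::string> result;
  for (auto& future : futures) {
    // get() blocks until the task finishes and rethrows any exception the
    // task raised, matching the error propagation seen in metadataScan.
    result.splice(result.end(), future.get());
  }
  return result;
}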