OmniSciDB  2e3a973ef4
LazyParquetChunkLoader.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LazyParquetChunkLoader.h"
18 
19 #include <arrow/api.h>
20 #include <arrow/io/api.h>
21 #include <parquet/arrow/reader.h>
22 #include <parquet/column_scanner.h>
23 #include <parquet/exception.h>
24 #include <parquet/platform.h>
25 #include <parquet/types.h>
26 
28 #include "ParquetDecimalEncoder.h"
32 #include "ParquetStringEncoder.h"
34 #include "ParquetTimeEncoder.h"
37 
38 namespace foreign_storage {
39 
40 namespace {
41 
42 bool is_valid_parquet_string(const parquet::ColumnDescriptor* parquet_column) {
43  return (parquet_column->logical_type()->is_none() &&
44  parquet_column->physical_type() == parquet::Type::BYTE_ARRAY) ||
45  parquet_column->logical_type()->is_string();
46 }
47 
// Detect a valid parquet LIST column. Walks the column's schema node chain
// from the innermost element node up to the schema root, checking each level
// against the three-level LIST structure mandated by the parquet
// specification. Additionally requires (for FSI) that the LIST group is a
// direct child of the schema root, i.e. lists cannot be embedded in a deeper
// nested structure.
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor* parquet_column) {
  const parquet::schema::Node* node = parquet_column->schema_node().get();
  if ((node->name() != "element" && node->name() != "item") ||
      !(node->is_required() ||
        node->is_optional())) {  // ensure first innermost node is named "element"
                                 // which is required by the parquet specification;
                                 // however testing shows that pyarrow generates this
                                 // column with the name of "item"
                                 // this field must be either required or optional
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  if (node->name() != "list" || !node->is_repeated() ||
      !node->is_group()) {  // ensure second innermost node is named "list" which is
                            // a repeated group; this is
                            // required by the parquet specification
    return false;
  }
  node = node->parent();
  if (!node) {  // required nested structure
    return false;
  }
  if (!node->logical_type()->is_list() ||
      !(node->is_optional() ||
        node->is_required())) {  // ensure third outermost node has logical type LIST
                                 // which is either optional or required; this is required
                                 // by the parquet specification
    return false;
  }
  node =
      node->parent();  // this must now be the root node of schema which is required by
                       // FSI (lists can not be embedded into a deeper nested structure)
  if (!node) {  // required nested structure
    return false;
  }
  node = node->parent();
  if (node) {  // implies the previous node was not the root node
    return false;
  }
  return true;
}
128 
129 template <typename V>
130 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder_with_omnisci_type(
131  const ColumnDescriptor* column_descriptor,
132  const parquet::ColumnDescriptor* parquet_column_descriptor,
133  AbstractBuffer* buffer) {
134  switch (parquet_column_descriptor->physical_type()) {
135  case parquet::Type::INT32:
136  return std::make_shared<ParquetDecimalEncoder<V, int32_t>>(
137  buffer, column_descriptor, parquet_column_descriptor);
138  case parquet::Type::INT64:
139  return std::make_shared<ParquetDecimalEncoder<V, int64_t>>(
140  buffer, column_descriptor, parquet_column_descriptor);
141  case parquet::Type::FIXED_LEN_BYTE_ARRAY:
142  return std::make_shared<ParquetDecimalEncoder<V, parquet::FixedLenByteArray>>(
143  buffer, column_descriptor, parquet_column_descriptor);
144  case parquet::Type::BYTE_ARRAY:
145  return std::make_shared<ParquetDecimalEncoder<V, parquet::ByteArray>>(
146  buffer, column_descriptor, parquet_column_descriptor);
147  default:
148  UNREACHABLE();
149  }
150  return {};
151 }
152 
153 std::shared_ptr<ParquetEncoder> create_parquet_decimal_encoder(
154  const ColumnDescriptor* omnisci_column,
155  const parquet::ColumnDescriptor* parquet_column,
156  AbstractBuffer* buffer) {
157  if (parquet_column->logical_type()->is_decimal()) {
158  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
159  return create_parquet_decimal_encoder_with_omnisci_type<int64_t>(
160  omnisci_column, parquet_column, buffer);
161  } else if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
162  switch (omnisci_column->columnType.get_comp_param()) {
163  case 16:
164  return create_parquet_decimal_encoder_with_omnisci_type<int16_t>(
165  omnisci_column, parquet_column, buffer);
166  case 32:
167  return create_parquet_decimal_encoder_with_omnisci_type<int32_t>(
168  omnisci_column, parquet_column, buffer);
169  default:
170  UNREACHABLE();
171  }
172  } else {
173  UNREACHABLE();
174  }
175  }
176  return {};
177 }
178 
// Create an integral encoder if the parquet column carries an INT logical
// type; otherwise return an empty pointer. For signed columns the OmniSci
// column size selects a same-width fixed-length encoder; unsigned columns
// are mapped into a wider signed OmniSci type to avoid loss of information.
//
// NOTE(review): in this view the unsigned cases below are truncated — the
// template argument lists after `std::make_shared<` are missing (extraction
// artifact). Verify the unsigned encoder types against the upstream source
// before relying on this text.
std::shared_ptr<ParquetEncoder> create_parquet_integral_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    AbstractBuffer* buffer) {
  auto column_type = omnisci_column->columnType;
  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
          parquet_column->logical_type().get())) {
    if (int_logical_column->is_signed()) {  // signed
      switch (column_type.get_size()) {
        case 8:
          // 8-byte storage is only valid without fixed encoding
          CHECK(column_type.get_compression() == kENCODING_NONE);
          return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
              buffer, omnisci_column, parquet_column);
        case 4:
          return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        case 2:
          return std::make_shared<ParquetFixedLengthEncoder<int16_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        case 1:
          return std::make_shared<ParquetFixedLengthEncoder<int8_t, int32_t>>(
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    } else {  // unsigned, requires using a larger bit depth signed integer within omnisci
              // to prevent the possibility of loss of information
      switch (column_type.get_size()) {
        case 8:
          CHECK(column_type.get_compression() == kENCODING_NONE);
          return std::make_shared<
              buffer, omnisci_column, parquet_column);
        case 4:
          return std::make_shared<
              buffer, omnisci_column, parquet_column);
        case 2:
          return std::make_shared<
              buffer, omnisci_column, parquet_column);
        default:
          UNREACHABLE();
      }
    }
  }
  return {};
}
227 
228 std::shared_ptr<ParquetEncoder> create_parquet_none_type_encoder(
229  const ColumnDescriptor* omnisci_column,
230  const parquet::ColumnDescriptor* parquet_column,
231  AbstractBuffer* buffer) {
232  auto column_type = omnisci_column->columnType;
233  if (parquet_column->logical_type()->is_none() &&
234  !omnisci_column->columnType.is_string()) { // boolean, int32, int64, float & double
235  if (column_type.get_compression() == kENCODING_NONE) {
236  switch (column_type.get_type()) {
237  case kBIGINT:
238  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
239  buffer, omnisci_column, parquet_column);
240  case kINT:
241  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
242  buffer, omnisci_column, parquet_column);
243  case kBOOLEAN:
244  return std::make_shared<ParquetFixedLengthEncoder<int8_t, bool>>(
245  buffer, omnisci_column, parquet_column);
246  case kFLOAT:
247  return std::make_shared<ParquetFixedLengthEncoder<float, float>>(
248  buffer, omnisci_column, parquet_column);
249  case kDOUBLE:
250  return std::make_shared<ParquetFixedLengthEncoder<double, double>>(
251  buffer, omnisci_column, parquet_column);
252  default:
253  UNREACHABLE();
254  }
255  } else {
256  UNREACHABLE();
257  }
258  }
259  return {};
260 }
261 
262 std::shared_ptr<ParquetEncoder> create_parquet_timestamp_encoder(
263  const ColumnDescriptor* omnisci_column,
264  const parquet::ColumnDescriptor* parquet_column,
265  AbstractBuffer* buffer) {
266  auto column_type = omnisci_column->columnType;
267  if (parquet_column->logical_type()->is_timestamp()) {
268  auto precision = column_type.get_precision();
269  if (precision == 0) {
270  return std::make_shared<ParquetTimestampEncoder<int64_t, int64_t>>(
271  buffer, omnisci_column, parquet_column);
272  } else {
273  return std::make_shared<ParquetFixedLengthEncoder<int64_t, int64_t>>(
274  buffer, omnisci_column, parquet_column);
275  }
276  }
277  return {};
278 }
279 
280 std::shared_ptr<ParquetEncoder> create_parquet_time_encoder(
281  const ColumnDescriptor* omnisci_column,
282  const parquet::ColumnDescriptor* parquet_column,
283  AbstractBuffer* buffer) {
284  auto column_type = omnisci_column->columnType;
285  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
286  parquet_column->logical_type().get())) {
287  if (column_type.get_compression() == kENCODING_NONE) {
288  if (time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS) {
289  return std::make_shared<ParquetTimeEncoder<int64_t, int32_t>>(
290  buffer, omnisci_column, parquet_column);
291  } else {
292  return std::make_shared<ParquetTimeEncoder<int64_t, int64_t>>(
293  buffer, omnisci_column, parquet_column);
294  }
295  } else if (column_type.get_compression() == kENCODING_FIXED) {
296  return std::make_shared<ParquetTimeEncoder<int32_t, int32_t>>(
297  buffer, omnisci_column, parquet_column);
298  } else {
299  UNREACHABLE();
300  }
301  }
302  return {};
303 }
304 
305 std::shared_ptr<ParquetEncoder> create_parquet_date_encoder(
306  const ColumnDescriptor* omnisci_column,
307  const parquet::ColumnDescriptor* parquet_column,
308  AbstractBuffer* buffer) {
309  auto column_type = omnisci_column->columnType;
310  if (parquet_column->logical_type()->is_date()) {
311  if (column_type.get_compression() == kENCODING_DATE_IN_DAYS) {
312  return std::make_shared<ParquetFixedLengthEncoder<int32_t, int32_t>>(
313  buffer, omnisci_column, parquet_column);
314  } else if (column_type.get_compression() == kENCODING_NONE) { // for array types
315  return std::make_shared<ParquetDateInSecondsEncoder>(
316  buffer, omnisci_column, parquet_column);
317  } else {
318  UNREACHABLE();
319  }
320  }
321  return {};
322 }
323 
324 std::shared_ptr<ParquetEncoder> create_parquet_string_encoder(
325  const ColumnDescriptor* omnisci_column,
326  const parquet::ColumnDescriptor* parquet_column,
327  Chunk_NS::Chunk& chunk,
328  StringDictionary* string_dictionary,
329  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) {
330  auto column_type = omnisci_column->columnType;
331  if (!is_valid_parquet_string(parquet_column)) {
332  return {};
333  }
334  if (column_type.get_compression() == kENCODING_NONE) {
335  return std::make_shared<ParquetStringNoneEncoder>(chunk.getBuffer(),
336  chunk.getIndexBuf());
337  } else if (column_type.get_compression() == kENCODING_DICT) {
338  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
339  auto& logical_chunk_metadata = chunk_metadata.back();
340  logical_chunk_metadata->sqlType = omnisci_column->columnType;
341  CHECK(string_dictionary);
342  switch (column_type.get_size()) {
343  case 1:
344  return std::make_shared<ParquetStringEncoder<uint8_t>>(
345  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
346  case 2:
347  return std::make_shared<ParquetStringEncoder<uint16_t>>(
348  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
349  case 4:
350  return std::make_shared<ParquetStringEncoder<int32_t>>(
351  chunk.getBuffer(), string_dictionary, logical_chunk_metadata);
352  default:
353  UNREACHABLE();
354  }
355  } else {
356  UNREACHABLE();
357  }
358  return {};
359 }
360 
361 std::shared_ptr<ParquetEncoder> create_parquet_geospatial_encoder(
362  const ColumnDescriptor* omnisci_column,
363  const parquet::ColumnDescriptor* parquet_column,
364  std::list<Chunk_NS::Chunk>& chunks,
365  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) {
366  auto column_type = chunks.begin()->getColumnDesc()->columnType;
367  if (!is_valid_parquet_string(parquet_column) || !column_type.is_geometry()) {
368  return {};
369  }
370  for (auto chunks_iter = chunks.begin(); chunks_iter != chunks.end(); ++chunks_iter) {
371  chunk_metadata.emplace_back(std::make_unique<ChunkMetadata>());
372  auto& chunk_metadata_ptr = chunk_metadata.back();
373  chunk_metadata_ptr->sqlType = chunks_iter->getColumnDesc()->columnType;
374  }
375  return std::make_shared<ParquetGeospatialEncoder>(
376  parquet_column, chunks, chunk_metadata);
377 }
378 
// forward declare `create_parquet_array_encoder`: `create_parquet_encoder` and
// `create_parquet_array_encoder` each make use of each other (array elements
// are encoded by a scalar encoder obtained from `create_parquet_encoder`), so
// one of the two functions must have a forward declaration
std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
    const ColumnDescriptor* omnisci_column,
    const parquet::ColumnDescriptor* parquet_column,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata);
388 
389 std::shared_ptr<ParquetEncoder> create_parquet_encoder(
390  const ColumnDescriptor* omnisci_column,
391  const parquet::ColumnDescriptor* parquet_column,
392  std::list<Chunk_NS::Chunk>& chunks,
393  StringDictionary* string_dictionary,
394  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) {
395  CHECK(!chunks.empty());
396  auto& chunk = *chunks.begin();
397  auto buffer = chunk.getBuffer();
398  if (auto encoder = create_parquet_geospatial_encoder(
399  omnisci_column, parquet_column, chunks, chunk_metadata)) {
400  return encoder;
401  }
402  if (auto encoder = create_parquet_array_encoder(
403  omnisci_column, parquet_column, chunks, string_dictionary, chunk_metadata)) {
404  return encoder;
405  }
406  if (auto encoder =
407  create_parquet_decimal_encoder(omnisci_column, parquet_column, buffer)) {
408  return encoder;
409  }
410  if (auto encoder =
411  create_parquet_integral_encoder(omnisci_column, parquet_column, buffer)) {
412  return encoder;
413  }
414  if (auto encoder =
415  create_parquet_none_type_encoder(omnisci_column, parquet_column, buffer)) {
416  return encoder;
417  }
418  if (auto encoder =
419  create_parquet_timestamp_encoder(omnisci_column, parquet_column, buffer)) {
420  return encoder;
421  }
422  if (auto encoder =
423  create_parquet_time_encoder(omnisci_column, parquet_column, buffer)) {
424  return encoder;
425  }
426  if (auto encoder =
427  create_parquet_date_encoder(omnisci_column, parquet_column, buffer)) {
428  return encoder;
429  }
430  if (auto encoder = create_parquet_string_encoder(
431  omnisci_column, parquet_column, chunk, string_dictionary, chunk_metadata)) {
432  return encoder;
433  }
434  UNREACHABLE();
435  return {};
436 }
437 
438 std::shared_ptr<ParquetEncoder> create_parquet_array_encoder(
439  const ColumnDescriptor* omnisci_column,
440  const parquet::ColumnDescriptor* parquet_column,
441  std::list<Chunk_NS::Chunk>& chunks,
442  StringDictionary* string_dictionary,
443  std::list<std::unique_ptr<ChunkMetadata>>& chunk_metadata) {
444  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column);
445  if (!is_valid_parquet_list || !omnisci_column->columnType.is_array()) {
446  return {};
447  }
448  std::unique_ptr<ColumnDescriptor> omnisci_column_sub_type_column =
449  get_sub_type_column_descriptor(omnisci_column);
450  auto encoder = create_parquet_encoder(omnisci_column_sub_type_column.get(),
451  parquet_column,
452  chunks,
453  string_dictionary,
454  chunk_metadata);
455  CHECK(encoder.get());
456  auto scalar_encoder = std::dynamic_pointer_cast<ParquetScalarEncoder>(encoder);
457  CHECK(scalar_encoder);
458  auto& chunk = *chunks.begin();
459  if (omnisci_column->columnType.is_fixlen_array()) {
460  encoder = std::make_shared<ParquetFixedLengthArrayEncoder>(
461  chunk.getBuffer(), scalar_encoder, omnisci_column);
462  } else {
463  encoder = std::make_shared<ParquetVariableLengthArrayEncoder>(
464  chunk.getBuffer(), chunk.getIndexBuf(), scalar_encoder, omnisci_column);
465  }
466  return encoder;
467 }
468 
470  const ColumnDescriptor* omnisci_column_descriptor,
471  const parquet::ColumnDescriptor* parquet_column_descriptor) {
472  bool is_valid_parquet_list = is_valid_parquet_list_column(parquet_column_descriptor);
473  if (is_valid_parquet_list && !omnisci_column_descriptor->columnType.is_array()) {
474  throw std::runtime_error(
475  "Unsupported mapping detected. Column '" + parquet_column_descriptor->name() +
476  "' detected to be a parquet list but OmniSci mapped column '" +
477  omnisci_column_descriptor->columnName + "' is not an array.");
478  }
479  if (is_valid_parquet_list) {
480  if (parquet_column_descriptor->max_repetition_level() != 1 ||
481  parquet_column_descriptor->max_definition_level() != 3) {
482  throw std::runtime_error(
483  "Incorrect schema max repetition level detected in column '" +
484  parquet_column_descriptor->name() +
485  "'. Expected a max repetition level of 1 and max definition level of 3 for "
486  "list column but column has a max "
487  "repetition level of " +
488  std::to_string(parquet_column_descriptor->max_repetition_level()) +
489  " and a max definition level of " +
490  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
491  }
492  } else {
493  if (parquet_column_descriptor->max_repetition_level() != 0 ||
494  parquet_column_descriptor->max_definition_level() != 1) {
495  throw std::runtime_error(
496  "Incorrect schema max repetition level detected in column '" +
497  parquet_column_descriptor->name() +
498  "'. Expected a max repetition level of 0 and max definition level of 1 for "
499  "flat column but column has a max "
500  "repetition level of " +
501  std::to_string(parquet_column_descriptor->max_repetition_level()) +
502  " and a max definition level of " +
503  std::to_string(parquet_column_descriptor->max_definition_level()) + ".");
504  }
505  }
506 }
507 
508 void resize_values_buffer(const ColumnDescriptor* omnisci_column,
509  const parquet::ColumnDescriptor* parquet_column,
510  std::vector<int8_t>& values) {
511  auto max_type_byte_size =
512  std::max(omnisci_column->columnType.get_size(),
513  parquet::GetTypeByteSize(parquet_column->physical_type()));
514  size_t values_size =
516  values.resize(values_size);
517 }
518 
// Read the given row-group intervals of a parquet column (possibly spanning
// multiple files) and append the decoded data into `chunks` via an encoder
// created for the OmniSci/parquet column pair. Returns the chunk metadata
// produced by the encoder factories. All files are validated to have a
// column descriptor equal to the first file's.
//
// NOTE(review): two statements are truncated in this view (extraction
// artifact): the call whose trailing argument is `parquet_column_descriptor)`
// after validate_equal_column_descriptor, and the call that initializes
// `levels_read` inside the scan loop. Verify both against upstream.
std::list<std::unique_ptr<ChunkMetadata>> append_row_groups(
    const std::vector<RowGroupInterval>& row_group_intervals,
    const int parquet_column_index,
    const ColumnDescriptor* column_descriptor,
    std::list<Chunk_NS::Chunk>& chunks,
    StringDictionary* string_dictionary,
    std::shared_ptr<arrow::fs::FileSystem> file_system) {
  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
  // `def_levels` and `rep_levels` below are used to store the read definition
  // and repetition levels of the Dremel encoding implemented by the Parquet
  // format
  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
  std::vector<int8_t> values;

  CHECK(!row_group_intervals.empty());
  // the first file's column descriptor is the reference all other files are
  // validated against, and is used to construct the encoder
  std::unique_ptr<parquet::arrow::FileReader> first_file_reader;
  const auto& first_file_path = row_group_intervals.front().file_path;
  open_parquet_table(first_file_path, first_file_reader, file_system);
  auto first_parquet_column_descriptor =
      get_column_descriptor(first_file_reader.get(), parquet_column_index);
  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
  auto encoder = create_parquet_encoder(column_descriptor,
                                        first_parquet_column_descriptor,
                                        chunks,
                                        string_dictionary,
                                        chunk_metadata);
  CHECK(encoder.get());

  for (const auto& row_group_interval : row_group_intervals) {
    std::unique_ptr<parquet::arrow::FileReader> file_reader;
    open_parquet_table(row_group_interval.file_path, file_reader, file_system);

    // bounds-check the requested interval and column against this file
    int num_row_groups, num_columns;
    std::tie(num_row_groups, num_columns) = get_parquet_table_size(file_reader);
    CHECK(row_group_interval.start_index >= 0 &&
          row_group_interval.end_index < num_row_groups);
    CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);

    parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
    auto parquet_column_descriptor =
        get_column_descriptor(file_reader.get(), parquet_column_index);
    validate_equal_column_descriptor(first_parquet_column_descriptor,
                                     parquet_column_descriptor,
                                     first_file_path,
                                     row_group_interval.file_path);

        parquet_column_descriptor);
    int64_t values_read = 0;
    for (int row_group_index = row_group_interval.start_index;
         row_group_index <= row_group_interval.end_index;
         ++row_group_index) {
      auto group_reader = parquet_reader->RowGroup(row_group_index);
      std::shared_ptr<parquet::ColumnReader> col_reader =
          group_reader->Column(parquet_column_index);

      while (col_reader->HasNext()) {
        int64_t levels_read =
                def_levels.data(),
                rep_levels.data(),
                reinterpret_cast<uint8_t*>(values.data()),
                &values_read,
                col_reader.get());
        // the last batch of the last row group is flagged so the encoder can
        // finalize its metadata
        encoder->appendData(def_levels.data(),
                            rep_levels.data(),
                            values_read,
                            levels_read,
                            !col_reader->HasNext(),
                            values.data());
      }
    }
  }
  return chunk_metadata;
}
595 
596 bool validate_decimal_mapping(const ColumnDescriptor* omnisci_column,
597  const parquet::ColumnDescriptor* parquet_column) {
598  if (auto decimal_logical_column = dynamic_cast<const parquet::DecimalLogicalType*>(
599  parquet_column->logical_type().get())) {
600  return omnisci_column->columnType.get_precision() ==
601  decimal_logical_column->precision() &&
602  omnisci_column->columnType.get_scale() == decimal_logical_column->scale() &&
603  omnisci_column->columnType.is_decimal() &&
604  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
605  omnisci_column->columnType.get_compression() == kENCODING_FIXED);
606  }
607  return false;
608 }
609 
610 bool validate_integral_mapping(const ColumnDescriptor* omnisci_column,
611  const parquet::ColumnDescriptor* parquet_column) {
612  if (auto int_logical_column = dynamic_cast<const parquet::IntLogicalType*>(
613  parquet_column->logical_type().get())) {
614  if (int_logical_column->is_signed()) { // signed
615  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
616  switch (int_logical_column->bit_width()) {
617  case 64:
618  return omnisci_column->columnType.get_type() == kBIGINT;
619  case 32:
620  return omnisci_column->columnType.get_type() == kINT;
621  case 16:
622  return omnisci_column->columnType.get_type() == kSMALLINT;
623  case 8:
624  return omnisci_column->columnType.get_type() == kTINYINT;
625  default:
626  UNREACHABLE();
627  }
628  } else if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
629  return omnisci_column->columnType.get_comp_param() ==
630  int_logical_column->bit_width();
631  }
632  } else { // unsigned
633  if (omnisci_column->columnType.get_compression() == kENCODING_NONE) {
634  switch (int_logical_column->bit_width()) {
635  case 64:
636  return false;
637  case 32:
638  return omnisci_column->columnType.get_type() == kBIGINT;
639  case 16:
640  return omnisci_column->columnType.get_type() == kINT;
641  case 8:
642  return omnisci_column->columnType.get_type() == kSMALLINT;
643  default:
644  UNREACHABLE();
645  }
646  } else if (omnisci_column->columnType.get_compression() == kENCODING_FIXED) {
647  switch (int_logical_column->bit_width()) {
648  case 64:
649  case 32:
650  return false;
651  case 16:
652  return omnisci_column->columnType.get_comp_param() == 32;
653  case 8:
654  return omnisci_column->columnType.get_comp_param() == 16;
655  default:
656  UNREACHABLE();
657  }
658  }
659  }
660  }
661  return false;
662 }
663 
664 bool validate_none_type_mapping(const ColumnDescriptor* omnisci_column,
665  const parquet::ColumnDescriptor* parquet_column) {
666  return parquet_column->logical_type()->is_none() &&
667  omnisci_column->columnType.get_compression() == kENCODING_NONE &&
668  ((parquet_column->physical_type() == parquet::Type::BOOLEAN &&
669  omnisci_column->columnType.get_type() == kBOOLEAN) ||
670  (parquet_column->physical_type() == parquet::Type::INT32 &&
671  omnisci_column->columnType.get_type() == kINT) ||
672  (parquet_column->physical_type() == parquet::Type::INT64 &&
673  omnisci_column->columnType.get_type() == kBIGINT) ||
674  (parquet_column->physical_type() == parquet::Type::FLOAT &&
675  omnisci_column->columnType.get_type() == kFLOAT) ||
676  (parquet_column->physical_type() == parquet::Type::DOUBLE &&
677  omnisci_column->columnType.get_type() == kDOUBLE));
678 }
679 
680 bool validate_timestamp_mapping(const ColumnDescriptor* omnisci_column,
681  const parquet::ColumnDescriptor* parquet_column) {
682  if (!(omnisci_column->columnType.get_type() == kTIMESTAMP &&
683  omnisci_column->columnType.get_compression() == kENCODING_NONE)) {
684  return false;
685  }
686  if (auto timestamp_logical_column = dynamic_cast<const parquet::TimestampLogicalType*>(
687  parquet_column->logical_type().get())) {
688  if (!timestamp_logical_column->is_adjusted_to_utc()) {
689  return false;
690  }
691  return omnisci_column->columnType.get_dimension() == 0 ||
692  ((omnisci_column->columnType.get_dimension() == 9 &&
693  timestamp_logical_column->time_unit() ==
694  parquet::LogicalType::TimeUnit::NANOS) ||
695  (omnisci_column->columnType.get_dimension() == 6 &&
696  timestamp_logical_column->time_unit() ==
697  parquet::LogicalType::TimeUnit::MICROS) ||
698  (omnisci_column->columnType.get_dimension() == 3 &&
699  timestamp_logical_column->time_unit() ==
700  parquet::LogicalType::TimeUnit::MILLIS));
701  }
702  return false;
703 }
704 
705 bool validate_time_mapping(const ColumnDescriptor* omnisci_column,
706  const parquet::ColumnDescriptor* parquet_column) {
707  if (!(omnisci_column->columnType.get_type() == kTIME &&
708  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
709  (omnisci_column->columnType.get_compression() == kENCODING_FIXED &&
710  omnisci_column->columnType.get_comp_param() == 32)))) {
711  return false;
712  }
713  if (auto time_logical_column = dynamic_cast<const parquet::TimeLogicalType*>(
714  parquet_column->logical_type().get())) {
715  if (!time_logical_column->is_adjusted_to_utc()) {
716  return false;
717  }
718  return omnisci_column->columnType.get_compression() == kENCODING_NONE ||
719  time_logical_column->time_unit() == parquet::LogicalType::TimeUnit::MILLIS;
720  }
721  return false;
722 }
723 
724 bool validate_date_mapping(const ColumnDescriptor* omnisci_column,
725  const parquet::ColumnDescriptor* parquet_column) {
726  if (!(omnisci_column->columnType.get_type() == kDATE &&
727  ((omnisci_column->columnType.get_compression() == kENCODING_DATE_IN_DAYS &&
728  omnisci_column->columnType.get_comp_param() ==
729  0) // DATE ENCODING DAYS (32) specifies comp_param of 0
730  || omnisci_column->columnType.get_compression() ==
731  kENCODING_NONE // for array types
732  ))) {
733  return false;
734  }
735  return parquet_column->logical_type()->is_date();
736 }
737 
738 bool validate_string_mapping(const ColumnDescriptor* omnisci_column,
739  const parquet::ColumnDescriptor* parquet_column) {
740  return is_valid_parquet_string(parquet_column) &&
741  omnisci_column->columnType.is_string() &&
742  (omnisci_column->columnType.get_compression() == kENCODING_NONE ||
743  omnisci_column->columnType.get_compression() == kENCODING_DICT);
744 }
745 
746 bool validate_array_mapping(const ColumnDescriptor* omnisci_column,
747  const parquet::ColumnDescriptor* parquet_column) {
748  if (is_valid_parquet_list_column(parquet_column) &&
749  omnisci_column->columnType.is_array()) {
750  auto omnisci_column_sub_type_column = get_sub_type_column_descriptor(omnisci_column);
752  omnisci_column_sub_type_column.get(), parquet_column);
753  }
754  return false;
755 }
756 
758  const parquet::ColumnDescriptor* parquet_column) {
759  return is_valid_parquet_string(parquet_column) &&
760  omnisci_column->columnType.is_geometry();
761 }
762 
763 } // namespace
764 
766  const ColumnDescriptor* omnisci_column,
767  const parquet::ColumnDescriptor* parquet_column) {
768  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
769  return true;
770  }
771  if (validate_array_mapping(omnisci_column, parquet_column)) {
772  return true;
773  }
774  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
775  return true;
776  }
777  if (validate_integral_mapping(omnisci_column, parquet_column)) {
778  return true;
779  }
780  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
781  return true;
782  }
783  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
784  return true;
785  }
786  if (validate_time_mapping(omnisci_column, parquet_column)) {
787  return true;
788  }
789  if (validate_date_mapping(omnisci_column, parquet_column)) {
790  return true;
791  }
792  if (validate_string_mapping(omnisci_column, parquet_column)) {
793  return true;
794  }
795  return false;
796 }
797 
799  std::shared_ptr<arrow::fs::FileSystem> file_system)
800  : file_system_(file_system) {}
801 
802 std::list<std::unique_ptr<ChunkMetadata>> LazyParquetChunkLoader::loadChunk(
803  const std::vector<RowGroupInterval>& row_group_intervals,
804  const int parquet_column_index,
805  std::list<Chunk_NS::Chunk>& chunks,
806  StringDictionary* string_dictionary) {
807  CHECK(!chunks.empty());
808  auto const& chunk = *chunks.begin();
809  auto column_descriptor = chunk.getColumnDesc();
810  auto buffer = chunk.getBuffer();
811  CHECK(buffer);
812 
813  auto metadata = append_row_groups(row_group_intervals,
814  parquet_column_index,
815  column_descriptor,
816  chunks,
817  string_dictionary,
818  file_system_);
819  return metadata;
820 }
821 
822 } // namespace foreign_storage
bool validate_array_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool is_array() const
Definition: sqltypes.h:425
std::shared_ptr< ParquetEncoder > create_parquet_geospatial_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata)
int get_precision() const
Definition: sqltypes.h:262
std::shared_ptr< ParquetEncoder > create_parquet_timestamp_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool is_string() const
Definition: sqltypes.h:417
Definition: sqltypes.h:51
static bool isColumnMappingSupported(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
AbstractBuffer * getBuffer() const
Definition: Chunk.h:104
std::unique_ptr< ColumnDescriptor > get_sub_type_column_descriptor(const ColumnDescriptor *column)
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:268
#define UNREACHABLE()
Definition: Logger.h:241
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata)
std::shared_ptr< ParquetEncoder > create_parquet_none_type_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
AbstractBuffer * getIndexBuf() const
Definition: Chunk.h:106
bool is_valid_parquet_list_column(const parquet::ColumnDescriptor *parquet_column)
Detect a valid list parquet column.
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
HOST DEVICE EncodingType get_compression() const
Definition: sqltypes.h:267
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::list< std::unique_ptr< ChunkMetadata > > loadChunk(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr)
bool is_decimal() const
Definition: sqltypes.h:420
HOST DEVICE int get_scale() const
Definition: sqltypes.h:264
std::pair< int, int > get_parquet_table_size(const std::unique_ptr< parquet::arrow::FileReader > &reader)
std::string to_string(char const *&&v)
LazyParquetChunkLoader(std::shared_ptr< arrow::fs::FileSystem > file_system)
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
std::shared_ptr< ParquetEncoder > create_parquet_integral_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
An AbstractBuffer is a unit of data management for a data manager.
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
specifies the content in-memory of a row in the column metadata table
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_date_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
Definition: sqltypes.h:55
bool is_geometry() const
Definition: sqltypes.h:429
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
std::shared_ptr< ParquetEncoder > create_parquet_array_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata)
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
HOST DEVICE int get_dimension() const
Definition: sqltypes.h:261
std::shared_ptr< ParquetEncoder > create_parquet_time_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, AbstractBuffer *buffer)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
std::shared_ptr< ParquetEncoder > create_parquet_string_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, Chunk_NS::Chunk &chunk, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata)
#define CHECK(condition)
Definition: Logger.h:197
bool is_valid_parquet_string(const parquet::ColumnDescriptor *parquet_column)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
void open_parquet_table(const std::string &file_path, std::unique_ptr< parquet::arrow::FileReader > &reader, std::shared_ptr< arrow::fs::FileSystem > &file_system)
bool is_fixlen_array() const
Definition: sqltypes.h:427
std::list< std::unique_ptr< ChunkMetadata > > append_row_groups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::shared_ptr< arrow::fs::FileSystem > file_system)
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
Definition: sqltypes.h:47
SQLTypeInfo columnType
std::shared_ptr< ParquetEncoder > create_parquet_decimal_encoder_with_omnisci_type(const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, AbstractBuffer *buffer)
std::string columnName
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)