OmniSciDB  2e3a973ef4
foreign_storage::anonymous_namespace{LazyParquetImporter.cpp} Namespace Reference

Functions

std::string type_to_string (SQLTypes type)
 
void validate_allowed_mapping (const parquet::ColumnDescriptor *descr, const ColumnDescriptor *cd)
 
void validate_parquet_metadata (const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
 
void initialize_bad_rows_tracker (import_export::BadRowsTracker &bad_rows_tracker, const int row_group, const std::string &file_path, import_export::Importer *importer)
 
void initialize_import_buffers_vec (const import_export::Loader *loader, const ForeignTableSchema &schema, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffer_vec)
 
void initialize_row_group_metadata_vec (const size_t num_logical_and_physical_columns, std::vector< RowGroupMetadata > &row_group_metadata_vec)
 
std::unique_ptr< import_export::TypedImportBuffer > & initialize_import_buffer (const size_t column_id, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffer_vec)
 
int64_t get_divisor (const SQLTypeInfo &type, const parquet::LogicalType::TimeUnit::unit time_unit)
 
std::pair< int64_t, int64_t > get_datetime_min_and_max (const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
 
int64_t convert_decimal_byte_array_to_int (const std::string &encoded_value)
 
std::pair< int64_t, int64_t > get_decimal_min_and_max (const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
 
void update_array_metadata_stats (ArrayMetadataStats &array_stats, const ColumnDescriptor *column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor, std::shared_ptr< parquet::Statistics > stats)
 
void read_parquet_metadata_into_import_buffer (const size_t num_rows, const int row_group, const ForeignTableSchema &schema, const std::unique_ptr< parquet::RowGroupMetaData > &group_metadata, const ColumnDescriptor *column_descriptor, import_export::BadRowsTracker &bad_rows_tracker, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffer_vec, ParquetLoaderMetadata &parquet_loader_metadata)
 
void read_parquet_data_into_import_buffer (const import_export::Loader *loader, const int row_group, const ForeignTableSchema &schema, const ColumnDescriptor *column_descriptor, std::unique_ptr< parquet::arrow::FileReader > &reader, import_export::BadRowsTracker &bad_rows_tracker, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffer_vec)
 
void import_row_group (const int row_group, const bool metadata_scan, const ForeignTableSchema &schema, const Interval< ColumnType > &column_interval, import_export::Importer *importer, std::unique_ptr< parquet::arrow::FileReader > &reader, ParquetLoaderMetadata &parquet_loader_metadata, import_export::BadRowsTracker &bad_rows_tracker, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffer_vec)
 
void import_row_groups (const RowGroupInterval &row_group_interval, bool &load_failed, const bool metadata_scan, const ForeignTableSchema &schema, const Interval< ColumnType > &column_interval, import_export::Importer *importer, std::unique_ptr< parquet::arrow::FileReader > &reader, ParquetLoaderMetadata &parquet_loader_metadata, import_export::BadRowsTracker &bad_rows_tracker, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffer_vec)
 
void validate_equal_schema (const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
 

Variables

constexpr int MILLIS_PER_SECOND = 1000
 
constexpr int MICROS_PER_SECOND = 1000 * 1000
 
constexpr int NANOS_PER_SECOND = 1000 * 1000 * 1000
 

Function Documentation

◆ convert_decimal_byte_array_to_int()

int64_t foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::convert_decimal_byte_array_to_int ( const std::string &  encoded_value)

Definition at line 267 of file LazyParquetImporter.cpp.

References CHECK, and run_benchmark_import::result.

Referenced by get_decimal_min_and_max().

267  {
268  auto byte_array = reinterpret_cast<const uint8_t*>(encoded_value.data());
269  auto result = arrow::Decimal128::FromBigEndian(byte_array, encoded_value.length());
270  CHECK(result.ok()) << result.status().message();
271  auto& decimal = result.ValueOrDie();
272  return static_cast<int64_t>(decimal);
273 }
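
The function above relies on arrow::Decimal128::FromBigEndian to decode the big-endian two's-complement byte string that Parquet uses for DECIMAL statistics. The following standalone sketch is hypothetical (it is not part of LazyParquetImporter.cpp and assumes the Arrow library is available and linked); it shows the same decoding pattern in isolation:

#include <arrow/util/decimal.h>

#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>

// Decode a big-endian two's-complement byte string, as stored in Parquet
// DECIMAL statistics, into its unscaled integer value.
int64_t decode_decimal_be(const std::string& encoded_value) {
  auto bytes = reinterpret_cast<const uint8_t*>(encoded_value.data());
  auto result = arrow::Decimal128::FromBigEndian(bytes, encoded_value.length());
  if (!result.ok()) {
    throw std::runtime_error(result.status().message());
  }
  return static_cast<int64_t>(result.ValueOrDie());
}

int main() {
  // 0x04D2 == 1234, i.e. the value 12.34 for a hypothetical DECIMAL(4, 2) column.
  std::string encoded{"\x04\xD2", 2};
  std::cout << decode_decimal_be(encoded) << '\n';  // prints 1234
}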

◆ get_datetime_min_and_max()

std::pair<int64_t, int64_t> foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::get_datetime_min_and_max ( const ColumnDescriptor *  column_descriptor,
const parquet::ColumnDescriptor *  parquet_column_descriptor,
std::shared_ptr< parquet::Statistics >  stats 
)

Gets the min and max Parquet chunk metadata for time, timestamp, or date column types.

Definition at line 203 of file LazyParquetImporter.cpp.

References CHECK, CHECK_EQ, ColumnDescriptor::columnName, ColumnDescriptor::columnType, get_divisor(), kTIME, run_benchmark_import::type, and UNREACHABLE.

Referenced by read_parquet_metadata_into_import_buffer().

206  {
207  int64_t min{0}, max{0};
208  auto& type = column_descriptor->columnType;
209  auto logical_type = parquet_column_descriptor->logical_type();
210  auto physical_type = parquet_column_descriptor->physical_type();
211  auto encoded_min = stats->EncodeMin();
212  auto encoded_max = stats->EncodeMax();
213 
214  if (type.is_timestamp()) {
215  CHECK(logical_type->is_timestamp());
216  CHECK_EQ(parquet::Type::INT64, physical_type);
217 
218  auto timestamp_type =
219  dynamic_cast<const parquet::TimestampLogicalType*>(logical_type.get());
220  CHECK(timestamp_type);
221 
222  if (!timestamp_type->is_adjusted_to_utc()) {
223  throw std::runtime_error{
224  "Non-UTC timezone specified in Parquet file for column \"" +
225  column_descriptor->columnName +
226  "\". Only UTC timezone is currently supported."};
227  }
228 
229  auto divisor = get_divisor(type, timestamp_type->time_unit());
230  min = reinterpret_cast<int64_t*>(encoded_min.data())[0];
231  max = reinterpret_cast<int64_t*>(encoded_max.data())[0];
232 
233  min /= divisor;
234  max /= divisor;
235  } else if (type.is_date()) {
236  CHECK_EQ(parquet::Type::INT32, physical_type);
237  min = reinterpret_cast<int32_t*>(encoded_min.data())[0];
238  max = reinterpret_cast<int32_t*>(encoded_max.data())[0];
239  } else if (type.get_type() == kTIME) {
240  CHECK(logical_type->is_time());
241 
242  auto time_type = dynamic_cast<const parquet::TimeLogicalType*>(logical_type.get());
243  CHECK(time_type);
244 
245  auto time_unit = time_type->time_unit();
246  auto divisor = get_divisor(type, time_unit);
247  if (time_unit == parquet::LogicalType::TimeUnit::MILLIS) {
248  CHECK_EQ(parquet::Type::INT32, physical_type);
249  min = reinterpret_cast<int32_t*>(encoded_min.data())[0];
250  max = reinterpret_cast<int32_t*>(encoded_max.data())[0];
251  } else {
252  CHECK(time_unit == parquet::LogicalType::TimeUnit::MICROS ||
253  time_unit == parquet::LogicalType::TimeUnit::NANOS);
254  CHECK_EQ(parquet::Type::INT64, physical_type);
255  min = reinterpret_cast<int64_t*>(encoded_min.data())[0];
256  max = reinterpret_cast<int64_t*>(encoded_max.data())[0];
257  }
258 
259  min /= divisor;
260  max /= divisor;
261  } else {
262  UNREACHABLE();
263  }
264  return {min, max};
265 }

◆ get_decimal_min_and_max()

std::pair<int64_t, int64_t> foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::get_decimal_min_and_max ( const ColumnDescriptor *  column_descriptor,
const parquet::ColumnDescriptor *  parquet_column_descriptor,
std::shared_ptr< parquet::Statistics >  stats 
)

Gets the min and max Parquet chunk metadata for a decimal type.

Definition at line 278 of file LazyParquetImporter.cpp.

References CHECK, CHECK_GT, ColumnDescriptor::columnType, convert_decimal_byte_array_to_int(), run_benchmark_import::type, and UNREACHABLE.

Referenced by read_parquet_metadata_into_import_buffer().

281  {
282  auto& type = column_descriptor->columnType;
283  auto logical_type = parquet_column_descriptor->logical_type();
284  CHECK(type.is_decimal() && logical_type->is_decimal());
285 
286  auto decimal_type =
287  dynamic_cast<const parquet::DecimalLogicalType*>(logical_type.get());
288  CHECK(decimal_type);
289 
290  auto physical_type = parquet_column_descriptor->physical_type();
291  auto encoded_min = stats->EncodeMin();
292  auto encoded_max = stats->EncodeMax();
293 
294  int64_t min{0}, max{0};
295  CHECK_GT(decimal_type->precision(), 0);
296  if (physical_type == parquet::Type::BYTE_ARRAY ||
297  physical_type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
298  min = convert_decimal_byte_array_to_int(encoded_min);
299  max = convert_decimal_byte_array_to_int(encoded_max);
300  } else if (physical_type == parquet::Type::INT32) {
301  min = reinterpret_cast<int32_t*>(encoded_min.data())[0];
302  max = reinterpret_cast<int32_t*>(encoded_max.data())[0];
303  } else if (physical_type == parquet::Type::INT64) {
304  min = reinterpret_cast<int64_t*>(encoded_min.data())[0];
305  max = reinterpret_cast<int64_t*>(encoded_max.data())[0];
306  } else {
307  UNREACHABLE();
308  }
309  return {min, max};
310 }

◆ get_divisor()

int64_t foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::get_divisor ( const SQLTypeInfo &  type,
const parquet::LogicalType::TimeUnit::unit  time_unit 
)

Gets the divisor to be used when converting from the given Parquet time unit to the OmniSci time unit/precision. The OmniSci time unit/precision is expected to either be in seconds or be an exact match of the Parquet time unit.

Parameters
type - OmniSci column type information
time_unit - Time unit/precision of the column as stored in the Parquet file
Returns
divisor

Definition at line 173 of file LazyParquetImporter.cpp.

References CHECK, SQLTypeInfo::get_precision(), foreign_storage::AllowedParquetMetadataTypeMappings::isSameTimeUnit(), MICROS_PER_SECOND, MILLIS_PER_SECOND, NANOS_PER_SECOND, and UNREACHABLE.

Referenced by get_datetime_min_and_max().

174  {
175  int64_t divisor = 1;
176  if (time_unit == parquet::LogicalType::TimeUnit::MILLIS) {
177  if (type.get_precision() == 0) {
178  divisor = MILLIS_PER_SECOND;
179  } else {
180  CHECK(AllowedParquetMetadataTypeMappings::isSameTimeUnit(type, time_unit));
181  }
182  } else if (time_unit == parquet::LogicalType::TimeUnit::MICROS) {
183  if (type.get_precision() == 0) {
184  divisor = MICROS_PER_SECOND;
185  } else {
186  CHECK(AllowedParquetMetadataTypeMappings::isSameTimeUnit(type, time_unit));
187  }
188  } else if (time_unit == parquet::LogicalType::TimeUnit::NANOS) {
189  if (type.get_precision() == 0) {
190  divisor = NANOS_PER_SECOND;
191  } else {
192  CHECK(AllowedParquetMetadataTypeMappings::isSameTimeUnit(type, time_unit));
193  }
194  } else {
195  UNREACHABLE();
196  }
197  return divisor;
198 }
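
As a worked example of the rule described above (hypothetical values, not taken from the source): a Parquet TIMESTAMP(MILLIS) column mapped to an OmniSci TIMESTAMP(0) column, i.e. second precision, gets a divisor of MILLIS_PER_SECOND, so encoded statistics are divided down to seconds; when the precisions already match, the divisor stays 1.

#include <cstdint>
#include <iostream>

int main() {
  constexpr int64_t kMillisPerSecond = 1000;   // mirrors the MILLIS_PER_SECOND constant in this namespace
  int64_t parquet_min_millis = 1609459200123;  // 2021-01-01 00:00:00.123 UTC
  // An OmniSci TIMESTAMP(0) column stores seconds, so the milliseconds value is divided down.
  int64_t omnisci_min_seconds = parquet_min_millis / kMillisPerSecond;
  std::cout << omnisci_min_seconds << '\n';    // prints 1609459200
}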

◆ import_row_group()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::import_row_group ( const int  row_group,
const bool  metadata_scan,
const ForeignTableSchema &  schema,
const Interval< ColumnType > &  column_interval,
import_export::Importer *  importer,
std::unique_ptr< parquet::arrow::FileReader > &  reader,
ParquetLoaderMetadata &  parquet_loader_metadata,
import_export::BadRowsTracker &  bad_rows_tracker,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffer_vec 
)

Definition at line 478 of file LazyParquetImporter.cpp.

References foreign_storage::Interval< T >::end, foreign_storage::ParquetLoaderMetadata::file_path, foreign_storage::ForeignTableSchema::getColumnDescriptor(), import_export::Importer::getLoader(), initialize_bad_rows_tracker(), initialize_import_buffers_vec(), initialize_row_group_metadata_vec(), import_export::Importer::load(), num_rows, foreign_storage::ForeignTableSchema::numLogicalAndPhysicalColumns(), read_parquet_data_into_import_buffer(), read_parquet_metadata_into_import_buffer(), foreign_storage::ParquetLoaderMetadata::row_group_metadata_vector, and foreign_storage::Interval< T >::start.

Referenced by import_row_groups().

487  {
488  auto loader = importer->getLoader();
489 
490  // Initialize
491  initialize_import_buffers_vec(loader, schema, import_buffer_vec);
492  initialize_row_group_metadata_vec(schema.numLogicalAndPhysicalColumns(),
493  parquet_loader_metadata.row_group_metadata_vector);
494  initialize_bad_rows_tracker(
495  bad_rows_tracker, row_group, parquet_loader_metadata.file_path, importer);
496 
497  // Read metadata
498  auto file_metadata = reader->parquet_reader()->metadata();
499  std::unique_ptr<parquet::RowGroupMetaData> group_metadata =
500  file_metadata->RowGroup(row_group);
501  const size_t num_rows = group_metadata->num_rows();
502 
503  // Process each column into corresponding import buffer
504  for (int column_id = column_interval.start; column_id <= column_interval.end;
505  column_id++) {
506  const auto column_descriptor = schema.getColumnDescriptor(column_id);
507  if (metadata_scan) {
508  // Parquet metadata only import branch
509  read_parquet_metadata_into_import_buffer(num_rows,
510  row_group,
511  schema,
512  group_metadata,
513  column_descriptor,
514  bad_rows_tracker,
515  import_buffer_vec,
516  parquet_loader_metadata);
517  } else {
518  // Regular import branch
519  read_parquet_data_into_import_buffer(loader,
520  row_group,
521  schema,
522  column_descriptor,
523  reader,
524  bad_rows_tracker,
525  import_buffer_vec);
526  }
527  column_id += column_descriptor->columnType.get_physical_cols();
528  }
529  importer->load(import_buffer_vec, num_rows);
530 }

◆ import_row_groups()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::import_row_groups ( const RowGroupInterval &  row_group_interval,
bool &  load_failed,
const bool  metadata_scan,
const ForeignTableSchema &  schema,
const Interval< ColumnType > &  column_interval,
import_export::Importer *  importer,
std::unique_ptr< parquet::arrow::FileReader > &  reader,
ParquetLoaderMetadata &  parquet_loader_metadata,
import_export::BadRowsTracker &  bad_rows_tracker,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffer_vec 
)

Definition at line 532 of file LazyParquetImporter.cpp.

References CHECK_LE, foreign_storage::RowGroupInterval::end_index, import_row_group(), and foreign_storage::RowGroupInterval::start_index.

Referenced by foreign_storage::LazyParquetImporter::partialImport().

542  {
543  CHECK_LE(row_group_interval.start_index, row_group_interval.end_index);
544  for (int row_group = row_group_interval.start_index;
545  row_group <= row_group_interval.end_index && !load_failed;
546  ++row_group) {
547  import_row_group(row_group,
548  metadata_scan,
549  schema,
550  column_interval,
551  importer,
552  reader,
553  parquet_loader_metadata,
554  bad_rows_tracker,
555  import_buffer_vec);
556  }
557 }

◆ initialize_bad_rows_tracker()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::initialize_bad_rows_tracker ( import_export::BadRowsTracker &  bad_rows_tracker,
const int  row_group,
const std::string &  file_path,
import_export::Importer *  importer 
)

Definition at line 122 of file LazyParquetImporter.cpp.

References import_export::BadRowsTracker::file_name, import_export::BadRowsTracker::importer, import_export::BadRowsTracker::nerrors, import_export::BadRowsTracker::row_group, and import_export::BadRowsTracker::rows.

Referenced by import_row_group().

125  {
126  bad_rows_tracker.rows.clear();
127  bad_rows_tracker.nerrors = 0;
128  bad_rows_tracker.file_name = file_path;
129  bad_rows_tracker.row_group = row_group;
130  bad_rows_tracker.importer = importer;
131 }

◆ initialize_import_buffer()

std::unique_ptr<import_export::TypedImportBuffer>& foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::initialize_import_buffer ( const size_t  column_id,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffer_vec 
)

Definition at line 150 of file LazyParquetImporter.cpp.

Referenced by read_parquet_data_into_import_buffer(), and read_parquet_metadata_into_import_buffer().

152  {
153  auto& import_buffer =
154  import_buffer_vec[column_id - 1]; // Column id starts at 1, hence the -1 offset
155  import_buffer->import_buffers = &import_buffer_vec;
156  import_buffer->col_idx = column_id;
157  return import_buffer;
158 }

◆ initialize_import_buffers_vec()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::initialize_import_buffers_vec ( const import_export::Loader *  loader,
const ForeignTableSchema &  schema,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffer_vec 
)

Definition at line 133 of file LazyParquetImporter.cpp.

References foreign_storage::ForeignTableSchema::getLogicalAndPhysicalColumns(), and import_export::Loader::getStringDict().

Referenced by import_row_group().

136  {
137  import_buffer_vec.clear();
138  for (const auto cd : schema.getLogicalAndPhysicalColumns()) {
139  import_buffer_vec.emplace_back(new TypedImportBuffer(cd, loader->getStringDict(cd)));
140  }
141 }

◆ initialize_row_group_metadata_vec()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::initialize_row_group_metadata_vec ( const size_t  num_logical_and_physical_columns,
std::vector< RowGroupMetadata > &  row_group_metadata_vec 
)

Definition at line 143 of file LazyParquetImporter.cpp.

Referenced by import_row_group().

145  {
146  row_group_metadata_vec.clear();
147  row_group_metadata_vec.resize(num_logical_and_physical_columns);
148 }

◆ read_parquet_data_into_import_buffer()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::read_parquet_data_into_import_buffer ( const import_export::Loader *  loader,
const int  row_group,
const ForeignTableSchema &  schema,
const ColumnDescriptor *  column_descriptor,
std::unique_ptr< parquet::arrow::FileReader > &  reader,
import_export::BadRowsTracker &  bad_rows_tracker,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffer_vec 
)

Definition at line 441 of file LazyParquetImporter.cpp.

References ColumnDescriptor::columnId, ColumnDescriptor::columnName, foreign_storage::ForeignTableSchema::getLogicalColumn(), foreign_storage::ForeignTableSchema::getParquetColumnIndex(), import_export::Loader::getTableDesc(), initialize_import_buffer(), import_export::BadRowsTracker::rows, and TableDescriptor::tableName.

Referenced by import_row_group().

448  {
449  auto column_id = column_descriptor->columnId;
450  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
451  std::shared_ptr<arrow::ChunkedArray> array;
452  PARQUET_THROW_NOT_OK(
453  reader->RowGroup(row_group)->Column(parquet_column_index)->Read(&array));
454  auto& import_buffer = initialize_import_buffer(column_id, import_buffer_vec);
455  auto num_bad_rows_before_add_values = bad_rows_tracker.rows.size();
456  // TODO: the following async call & wait suppresses a false-positive
457  // lock-order-inversion warning from TSAN; remove its use once
458  // TypedImportBuffer is refactored
459  std::async(std::launch::async, [&] {
460  for (auto chunk : array->chunks()) {
461  import_buffer->add_arrow_values(
462  column_descriptor, *chunk, false, {0, chunk->length()}, &bad_rows_tracker);
463  }
464  }).wait();
465  // TODO: remove this check for failure to import geo-type once null
466  // geo-types are fixed
467  auto logical_column = schema.getLogicalColumn(column_id);
468  if (logical_column->columnType.is_geometry() &&
469  bad_rows_tracker.rows.size() > num_bad_rows_before_add_values) {
470  std::stringstream error_msg;
471  error_msg << "Failure to import geo column '" << column_descriptor->columnName
472  << "' in table '" << loader->getTableDesc()->tableName << "' for row group "
473  << row_group << " and row " << *bad_rows_tracker.rows.rbegin() << ".";
474  throw std::runtime_error(error_msg.str());
475  }
476 }
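
For readers unfamiliar with the parquet::arrow::FileReader API used above, the following standalone sketch is hypothetical (it assumes a local file named example.parquet and that the Arrow and Parquet libraries are linked); it shows the same RowGroup(...)->Column(...)->Read(...) pattern outside of the importer:

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/exception.h>

#include <memory>

int main() {
  // Open the file and wrap it in an Arrow-aware Parquet reader.
  std::shared_ptr<arrow::io::ReadableFile> infile;
  PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open("example.parquet"));

  std::unique_ptr<parquet::arrow::FileReader> reader;
  PARQUET_THROW_NOT_OK(
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

  // Read the first column of the first row group into a ChunkedArray, the same
  // call pattern used by read_parquet_data_into_import_buffer().
  std::shared_ptr<arrow::ChunkedArray> array;
  PARQUET_THROW_NOT_OK(reader->RowGroup(0)->Column(0)->Read(&array));
  return 0;
}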

◆ read_parquet_metadata_into_import_buffer()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::read_parquet_metadata_into_import_buffer ( const size_t  num_rows,
const int  row_group,
const ForeignTableSchema &  schema,
const std::unique_ptr< parquet::RowGroupMetaData > &  group_metadata,
const ColumnDescriptor *  column_descriptor,
import_export::BadRowsTracker &  bad_rows_tracker,
std::vector< std::unique_ptr< TypedImportBuffer >> &  import_buffer_vec,
ParquetLoaderMetadata &  parquet_loader_metadata 
)

Definition at line 358 of file LazyParquetImporter.cpp.

References ARROW_ASSIGN_OR_THROW, CHECK, ColumnDescriptor::columnId, ColumnDescriptor::columnType, get_datetime_min_and_max(), get_decimal_min_and_max(), SQLTypeInfo::get_physical_cols(), foreign_storage::get_sub_type_column_descriptor(), foreign_storage::ForeignTableSchema::getParquetColumnIndex(), initialize_import_buffer(), is_datetime(), num_rows, foreign_storage::ParquetLoaderMetadata::row_group_metadata_vector, run_benchmark_import::type, and update_array_metadata_stats().

Referenced by import_row_group().

366  {
367  auto column_id = column_descriptor->columnId;
368  auto& import_buffer = initialize_import_buffer(column_id, import_buffer_vec);
369  auto parquet_column_index = schema.getParquetColumnIndex(column_id);
370  auto column_chunk = group_metadata->ColumnChunk(parquet_column_index);
371  CHECK(column_chunk->is_stats_set());
372  std::shared_ptr<parquet::Statistics> stats = column_chunk->statistics();
373  bool is_all_nulls = stats->null_count() == column_chunk->num_values();
374  CHECK(is_all_nulls || stats->HasMinMax());
375 
376  const auto& type = column_descriptor->columnType;
377  auto parquet_column_descriptor = group_metadata->schema()->Column(parquet_column_index);
378  auto& row_group_metadata_vec = parquet_loader_metadata.row_group_metadata_vector;
379  if (!is_all_nulls) {
380  if (is_datetime(type.get_type()) ||
381  (type.is_array() && is_datetime(type.get_elem_type().get_type()))) {
382  if (type.is_array()) {
383  auto sub_type_column_descriptor =
384  get_sub_type_column_descriptor(column_descriptor);
385  auto [min, max] = get_datetime_min_and_max(
386  sub_type_column_descriptor.get(), parquet_column_descriptor, stats);
387  auto& metadata = row_group_metadata_vec[column_id - 1];
388  metadata.array_stats.updateStats(
389  sub_type_column_descriptor->columnType, min, max);
390  } else {
391  auto [min, max] =
392  get_datetime_min_and_max(column_descriptor, parquet_column_descriptor, stats);
393  import_buffer->addBigint(min);
394  import_buffer->addBigint(max);
395  }
396  } else if (type.is_decimal() ||
397  (type.is_array() && type.get_elem_type().is_decimal())) {
398  if (type.is_array()) {
399  auto sub_type_column_descriptor =
400  get_sub_type_column_descriptor(column_descriptor);
401  auto [min, max] = get_decimal_min_and_max(
402  sub_type_column_descriptor.get(), parquet_column_descriptor, stats);
403  auto& metadata = row_group_metadata_vec[column_id - 1];
404  metadata.array_stats.updateStats(
405  sub_type_column_descriptor->columnType, min, max);
406  } else {
407  auto [min, max] =
408  get_decimal_min_and_max(column_descriptor, parquet_column_descriptor, stats);
409  import_buffer->addBigint(min);
410  import_buffer->addBigint(max);
411  }
412  } else if (!type.is_string() && !type.is_varlen()) {
413  std::shared_ptr<arrow::Scalar> min, max;
414  PARQUET_THROW_NOT_OK(parquet::arrow::StatisticsAsScalars(*stats, &min, &max));
415  ARROW_ASSIGN_OR_THROW(auto min_array, arrow::MakeArrayFromScalar(*min, 1));
416  ARROW_ASSIGN_OR_THROW(auto max_array, arrow::MakeArrayFromScalar(*max, 1));
417  import_buffer->add_arrow_values(
418  column_descriptor, *min_array, false, {0, 1}, &bad_rows_tracker);
419  import_buffer->add_arrow_values(
420  column_descriptor, *max_array, false, {0, 1}, &bad_rows_tracker);
421  } else if (type.is_array() && !type.get_elem_type().is_string()) {
422  auto sub_type_column_descriptor = get_sub_type_column_descriptor(column_descriptor);
423  update_array_metadata_stats(row_group_metadata_vec[column_id - 1].array_stats,
424  sub_type_column_descriptor.get(),
425  parquet_column_descriptor,
426  stats);
427  }
428  }
429 
430  // Set the same metadata for the logical column and all its related physical columns
431  auto physical_columns_count = column_descriptor->columnType.get_physical_cols();
432  for (auto i = column_id - 1; i < (column_id + physical_columns_count); i++) {
433  row_group_metadata_vec[i].metadata_only = true;
434  row_group_metadata_vec[i].row_group_index = row_group;
435  row_group_metadata_vec[i].is_all_nulls = is_all_nulls;
436  row_group_metadata_vec[i].has_nulls = stats->null_count() > 0;
437  row_group_metadata_vec[i].num_elements = num_rows;
438  }
439 }
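
The metadata path above never materializes column data; it only walks the row-group statistics. A minimal standalone sketch of that statistics walk (hypothetical; it assumes a local example.parquet and linked Parquet libraries) could look like:

#include <parquet/api/reader.h>

#include <iostream>

int main() {
  // Open the file with the low-level Parquet reader; no column data is read here.
  auto file_reader = parquet::ParquetFileReader::OpenFile("example.parquet");
  auto file_metadata = file_reader->metadata();

  for (int rg = 0; rg < file_metadata->num_row_groups(); ++rg) {
    auto group_metadata = file_metadata->RowGroup(rg);
    for (int col = 0; col < file_metadata->num_columns(); ++col) {
      auto column_chunk = group_metadata->ColumnChunk(col);
      if (!column_chunk->is_stats_set()) {
        continue;  // the importer treats missing statistics as an error
      }
      auto stats = column_chunk->statistics();
      std::cout << "row group " << rg << ", column " << col
                << ": null_count=" << stats->null_count()
                << ", has_min_max=" << stats->HasMinMax() << '\n';
    }
  }
  return 0;
}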

◆ type_to_string()

std::string foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::type_to_string ( SQLTypes  type)

Definition at line 36 of file LazyParquetImporter.cpp.

References SQLTypeInfo::get_type_name().

Referenced by validate_allowed_mapping().

36  {
37  return SQLTypeInfo(type, false).get_type_name();
38 }

◆ update_array_metadata_stats()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::update_array_metadata_stats ( ArrayMetadataStats &  array_stats,
const ColumnDescriptor *  column_descriptor,
const parquet::ColumnDescriptor *  parquet_column_descriptor,
std::shared_ptr< parquet::Statistics >  stats 
)

Definition at line 312 of file LazyParquetImporter.cpp.

References ColumnDescriptor::columnType, and foreign_storage::ArrayMetadataStats::updateStats().

Referenced by read_parquet_metadata_into_import_buffer().

316  {
317  auto encoded_min = stats->EncodeMin();
318  auto encoded_max = stats->EncodeMax();
319  switch (parquet_column_descriptor->physical_type()) {
320  case parquet::Type::BOOLEAN: {
321  auto min_value = reinterpret_cast<const bool*>(encoded_min.data())[0];
322  auto max_value = reinterpret_cast<const bool*>(encoded_max.data())[0];
323  array_stats.updateStats(column_descriptor->columnType, min_value, max_value);
324  break;
325  }
326  case parquet::Type::INT32: {
327  auto min_value = reinterpret_cast<const int32_t*>(encoded_min.data())[0];
328  auto max_value = reinterpret_cast<const int32_t*>(encoded_max.data())[0];
329  array_stats.updateStats(column_descriptor->columnType, min_value, max_value);
330  break;
331  }
332  case parquet::Type::INT64: {
333  auto min_value = reinterpret_cast<const int64_t*>(encoded_min.data())[0];
334  auto max_value = reinterpret_cast<const int64_t*>(encoded_max.data())[0];
335  array_stats.updateStats(column_descriptor->columnType, min_value, max_value);
336  break;
337  }
338  case parquet::Type::DOUBLE: {
339  auto min_value = reinterpret_cast<const double*>(encoded_min.data())[0];
340  auto max_value = reinterpret_cast<const double*>(encoded_max.data())[0];
341  array_stats.updateStats(column_descriptor->columnType, min_value, max_value);
342  break;
343  }
344  case parquet::Type::FLOAT: {
345  auto min_value = reinterpret_cast<const float*>(encoded_min.data())[0];
346  auto max_value = reinterpret_cast<const float*>(encoded_max.data())[0];
347  array_stats.updateStats(column_descriptor->columnType, min_value, max_value);
348  break;
349  }
350  default:
351  throw std::runtime_error(
352  "Unsupported physical type detected while"
353  " scanning metadata of parquet column '" +
354  parquet_column_descriptor->name() + "'.");
355  }
356 }

◆ validate_allowed_mapping()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::validate_allowed_mapping ( const parquet::ColumnDescriptor *  descr,
const ColumnDescriptor *  cd 
)

Definition at line 40 of file LazyParquetImporter.cpp.

References ColumnDescriptor::columnType, SQLTypeInfo::get_type(), foreign_storage::AllowedParquetMetadataTypeMappings::isColumnMappingSupported(), run_benchmark_import::type, and type_to_string().

Referenced by validate_parquet_metadata().

41  {
42  const auto type = cd->columnType.get_type();
43  parquet::Type::type physical_type = descr->physical_type();
44  auto logical_type = descr->logical_type();
45  bool allowed_type =
46  AllowedParquetMetadataTypeMappings::isColumnMappingSupported(cd, descr);
47  if (!allowed_type) {
48  std::string parquet_type;
49  if (descr->logical_type()->is_none()) {
50  parquet_type = parquet::TypeToString(physical_type);
51  } else {
52  parquet_type = logical_type->ToString();
53  }
54  std::string omnisci_type = type_to_string(type);
55  throw std::runtime_error{"Conversion from Parquet type \"" + parquet_type +
56  "\" to OmniSci type \"" + omnisci_type +
57  "\" is not allowed. Please use an appropriate column type."};
58  }
59 }

◆ validate_equal_schema()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::validate_equal_schema ( const parquet::arrow::FileReader *  reference_file_reader,
const parquet::arrow::FileReader *  new_file_reader,
const std::string &  reference_file_path,
const std::string &  new_file_path 
)

Definition at line 559 of file LazyParquetImporter.cpp.

References foreign_storage::get_column_descriptor(), to_string(), and foreign_storage::validate_equal_column_descriptor().

Referenced by foreign_storage::LazyParquetImporter::metadataScan().

562  {
563  const auto reference_num_columns =
564  reference_file_reader->parquet_reader()->metadata()->num_columns();
565  const auto new_num_columns =
566  new_file_reader->parquet_reader()->metadata()->num_columns();
567  if (reference_num_columns != new_num_columns) {
568  throw std::runtime_error{"Parquet file \"" + new_file_path +
569  "\" has a different schema. Please ensure that all Parquet "
570  "files use the same schema. Reference Parquet file: \"" +
571  reference_file_path + "\" has " +
572  std::to_string(reference_num_columns) +
573  " columns. New Parquet file \"" + new_file_path + "\" has " +
574  std::to_string(new_num_columns) + " columns."};
575  }
576 
577  for (int i = 0; i < reference_num_columns; i++) {
578  validate_equal_column_descriptor(get_column_descriptor(reference_file_reader, i),
579  get_column_descriptor(new_file_reader, i),
580  reference_file_path,
581  new_file_path);
582  }
583 }

◆ validate_parquet_metadata()

void foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::validate_parquet_metadata ( const std::shared_ptr< parquet::FileMetaData > &  file_metadata,
const std::string &  file_path,
const ForeignTableSchema &  schema 
)

Definition at line 61 of file LazyParquetImporter.cpp.

References foreign_storage::ForeignTableSchema::getForeignTable(), foreign_storage::ForeignTableSchema::getLogicalColumns(), TableDescriptor::maxFragRows, num_rows, foreign_storage::ForeignTableSchema::numLogicalColumns(), TableDescriptor::tableName, to_string(), and validate_allowed_mapping().

Referenced by foreign_storage::LazyParquetImporter::partialImport().

64  {
65  if (schema.numLogicalColumns() != file_metadata->num_columns()) {
66  std::stringstream error_msg;
67  error_msg << "Mismatched number of logical columns in table '"
68  << schema.getForeignTable()->tableName << "' ("
69  << schema.numLogicalColumns() << " columns) with parquet file '"
70  << file_path << "' (" << file_metadata->num_columns() << " columns.)";
71  throw std::runtime_error(error_msg.str());
72  }
73 
74  auto column_it = schema.getLogicalColumns().begin();
75  for (int i = 0; i < file_metadata->num_columns(); ++i, ++column_it) {
76  const parquet::ColumnDescriptor* descr = file_metadata->schema()->Column(i);
77  validate_allowed_mapping(descr, *column_it);
78 
79  auto fragment_size = schema.getForeignTable()->maxFragRows;
80  int64_t max_row_group_size = 0;
81  int max_row_group_index = 0;
82  for (int r = 0; r < file_metadata->num_row_groups(); ++r) {
83  auto group_metadata = file_metadata->RowGroup(r);
84  auto num_rows = group_metadata->num_rows();
85  if (num_rows > max_row_group_size) {
86  max_row_group_size = num_rows;
87  max_row_group_index = r;
88  }
89 
90  auto column_chunk = group_metadata->ColumnChunk(i);
91  bool contains_metadata = column_chunk->is_stats_set();
92  if (contains_metadata) {
93  auto stats = column_chunk->statistics();
94  bool is_all_nulls = (stats->null_count() == group_metadata->num_rows());
95  if (!stats->HasMinMax() && !is_all_nulls) {
96  contains_metadata = false;
97  }
98  }
99 
100  if (!contains_metadata) {
101  throw std::runtime_error{
102  "Statistics metadata is required for all row groups. Metadata is missing for "
103  "row group index: " +
104  std::to_string(r) + ", column index: " + std::to_string(i) +
105  ", file path: " + file_path};
106  }
107  }
108 
109  if (max_row_group_size > fragment_size) {
110  throw std::runtime_error{
111  "Parquet file has a row group size that is larger than the fragment size. "
112  "Please set the table fragment size to a number that is larger than the "
113  "row group size. Row group index: " +
114  std::to_string(max_row_group_index) +
115  ", row group size: " + std::to_string(max_row_group_size) +
116  ", fragment size: " + std::to_string(fragment_size) +
117  ", file path: " + file_path};
118  }
119  }
120 }

Variable Documentation

◆ MICROS_PER_SECOND

constexpr int foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::MICROS_PER_SECOND = 1000 * 1000

Definition at line 161 of file LazyParquetImporter.cpp.

Referenced by get_divisor().

◆ MILLIS_PER_SECOND

constexpr int foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::MILLIS_PER_SECOND = 1000

Definition at line 160 of file LazyParquetImporter.cpp.

Referenced by get_divisor().

◆ NANOS_PER_SECOND

constexpr int foreign_storage::anonymous_namespace{LazyParquetImporter.cpp}::NANOS_PER_SECOND = 1000 * 1000 * 1000

Definition at line 162 of file LazyParquetImporter.cpp.

Referenced by get_divisor().