OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::LazyParquetChunkLoader Class Reference

#include <LazyParquetChunkLoader.h>

+ Collaboration diagram for foreign_storage::LazyParquetChunkLoader:

Public Member Functions

 LazyParquetChunkLoader (std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache)
 
std::list< std::unique_ptr
< ChunkMetadata > > 
loadChunk (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr)
 
std::list< RowGroupMetadata > metadataScan (const std::set< std::string > &file_paths, const ForeignTableSchema &schema)
 Perform a metadata scan for the paths specified. More...
 

Static Public Member Functions

static bool isColumnMappingSupported (const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
 

Static Public Attributes

static const int batch_reader_num_elements = 4096
 

Private Member Functions

std::list< std::unique_ptr
< ChunkMetadata > > 
appendRowGroups (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary)
 

Private Attributes

std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
FileReaderMap * file_reader_cache_
 

Detailed Description

A lazy parquet to chunk loader

Definition at line 35 of file LazyParquetChunkLoader.h.

Constructor & Destructor Documentation

foreign_storage::LazyParquetChunkLoader::LazyParquetChunkLoader ( std::shared_ptr< arrow::fs::FileSystem >  file_system,
FileReaderMap *  file_reader_cache 
)

Definition at line 1478 of file LazyParquetChunkLoader.cpp.

1481  : file_system_(file_system), file_reader_cache_(file_map) {}
std::shared_ptr< arrow::fs::FileSystem > file_system_

Member Function Documentation

std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::appendRowGroups ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
const ColumnDescriptor *  column_descriptor,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary 
)
private

Definition at line 1359 of file LazyParquetChunkLoader.cpp.

References batch_reader_num_elements, CHECK, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::create_parquet_encoder(), DEBUG_TIMER, file_reader_cache_, file_system_, foreign_storage::get_column_descriptor(), foreign_storage::get_parquet_table_size(), foreign_storage::FileReaderMap::getOrInsert(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::resize_values_buffer(), to_string(), foreign_storage::validate_equal_column_descriptor(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level().

Referenced by loadChunk().

1364  {
1365  auto timer = DEBUG_TIMER(__func__);
1366  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1367  // `def_levels` and `rep_levels` below are used to store the read definition
1368  // and repetition levels of the Dremel encoding implemented by the Parquet
1369  // format
1370  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1371  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1372  std::vector<int8_t> values;
1373 
1374  CHECK(!row_group_intervals.empty());
1375  const auto& first_file_path = row_group_intervals.front().file_path;
1376 
1377  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1378  auto first_parquet_column_descriptor =
1379  get_column_descriptor(first_file_reader, parquet_column_index);
1380  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1381  auto encoder = create_parquet_encoder(column_descriptor,
1382  first_parquet_column_descriptor,
1383  chunks,
1384  string_dictionary,
1385  chunk_metadata);
1386  CHECK(encoder.get());
1387 
1388  for (const auto& row_group_interval : row_group_intervals) {
1389  const auto& file_path = row_group_interval.file_path;
1390  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1391 
1392  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1393  CHECK(row_group_interval.start_index >= 0 &&
1394  row_group_interval.end_index < num_row_groups);
1395  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1396 
1397  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1398  auto parquet_column_descriptor =
1399  get_column_descriptor(file_reader, parquet_column_index);
1400  validate_equal_column_descriptor(first_parquet_column_descriptor,
1401  parquet_column_descriptor,
1402  first_file_path,
1403  file_path);
1404 
1406  parquet_column_descriptor);
1407  int64_t values_read = 0;
1408  for (int row_group_index = row_group_interval.start_index;
1409  row_group_index <= row_group_interval.end_index;
1410  ++row_group_index) {
1411  auto group_reader = parquet_reader->RowGroup(row_group_index);
1412  std::shared_ptr<parquet::ColumnReader> col_reader =
1413  group_reader->Column(parquet_column_index);
1414 
1415  try {
1416  while (col_reader->HasNext()) {
1417  int64_t levels_read =
1419  def_levels.data(),
1420  rep_levels.data(),
1421  reinterpret_cast<uint8_t*>(values.data()),
1422  &values_read,
1423  col_reader.get());
1424  encoder->appendData(def_levels.data(),
1425  rep_levels.data(),
1426  values_read,
1427  levels_read,
1428  !col_reader->HasNext(),
1429  values.data());
1430  }
1431  } catch (const std::exception& error) {
1432  throw ForeignStorageException(
1433  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1434  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1435  "', Parquet file: '" + file_path + "'");
1436  }
1437  }
1438  }
1439  return chunk_metadata;
1440 }
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
std::string to_string(char const *&&v)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:89
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan=false)
Create a Parquet specific encoder for a Parquet to OmniSci mapping.

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported ( const ColumnDescriptor *  omnisci_column,
const parquet::ColumnDescriptor *  parquet_column 
)
static

Determine if a Parquet to OmniSci column mapping is supported.

Parameters
omnisci_column- the column descriptor of the OmniSci column
parquet_column- the column descriptor of the Parquet column
Returns
true if the column mapping is supported by LazyParquetChunkLoader, false otherwise

Definition at line 1442 of file LazyParquetChunkLoader.cpp.

References foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_date_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_decimal_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_floating_point_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_geospatial_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_integral_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_none_type_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_string_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_time_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_timestamp_mapping().

Referenced by foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping().

1444  {
1445  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1446  return true;
1447  }
1448  if (validate_array_mapping(omnisci_column, parquet_column)) {
1449  return true;
1450  }
1451  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1452  return true;
1453  }
1454  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1455  return true;
1456  }
1457  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1458  return true;
1459  }
1460  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1461  return true;
1462  }
1463  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1464  return true;
1465  }
1466  if (validate_time_mapping(omnisci_column, parquet_column)) {
1467  return true;
1468  }
1469  if (validate_date_mapping(omnisci_column, parquet_column)) {
1470  return true;
1471  }
1472  if (validate_string_mapping(omnisci_column, parquet_column)) {
1473  return true;
1474  }
1475  return false;
1476 }
bool validate_array_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::loadChunk ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary = nullptr 
)

Load a number of row groups of a column in a parquet file into a chunk

Parameters
row_group_intervals- a list of inclusive intervals [start,end] that specify the row groups to load
parquet_column_index- the logical column index in the parquet file (and omnisci db) of column to load
chunks- a list containing the chunks to load
string_dictionary- a string dictionary for the column being loaded, if applicable
Returns
An empty list when no metadata update is applicable, otherwise a list of ChunkMetadata shared pointers with which to update the corresponding column chunk metadata. NOTE: Only ChunkMetadata.sqlType and the min & max values of the ChunkMetadata.chunkStats are valid, other values are not set.

NOTE: if more than one chunk is supplied, the first chunk is required to be the chunk corresponding to the logical column, while the remaining chunks correspond to physical columns (in ascending order of column id.) Similarly, if a metadata update is expected, the list of ChunkMetadata shared pointers returned will correspond directly to the list chunks.

Definition at line 1483 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), and CHECK.

Referenced by foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader().

1487  {
1488  CHECK(!chunks.empty());
1489  auto const& chunk = *chunks.begin();
1490  auto column_descriptor = chunk.getColumnDesc();
1491  auto buffer = chunk.getBuffer();
1492  CHECK(buffer);
1493 
1494  try {
1495  auto metadata = appendRowGroups(row_group_intervals,
1496  parquet_column_index,
1497  column_descriptor,
1498  chunks,
1499  string_dictionary);
1500  return metadata;
1501  } catch (const std::exception& error) {
1502  throw ForeignStorageException(error.what());
1503  }
1504 
1505  return {};
1506 }
std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary)
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::list< RowGroupMetadata > foreign_storage::LazyParquetChunkLoader::metadataScan ( const std::set< std::string > &  file_paths,
const ForeignTableSchema &  schema 
)

Perform a metadata scan for the paths specified.

Parameters
file_paths- (ordered) files of the metadata scan
schema- schema of the foreign table to perform metadata scan for
Returns
a list of the row group metadata extracted from file_paths

Definition at line 1508 of file LazyParquetChunkLoader.cpp.

References CHECK, DEBUG_TIMER, file_reader_cache_, file_system_, g_max_import_threads, foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getLogicalAndPhysicalColumns(), foreign_storage::FileReaderMap::initializeIfEmpty(), foreign_storage::FileReaderMap::insert(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::metadata_scan_rowgroup_interval(), foreign_storage::partition_for_threads(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_parquet_metadata().

Referenced by foreign_storage::ParquetDataWrapper::metadataScanFiles().

1510  {
1511  auto timer = DEBUG_TIMER(__func__);
1512  auto column_interval =
1513  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1514  schema.getLogicalAndPhysicalColumns().back()->columnId};
1515  CHECK(!file_paths.empty());
1516 
1517  // The encoder map needs to be populated before we can start scanning rowgroups, so we
1518  // peel the first file_path out of the async loop below to perform population.
1519  const auto& first_path = *file_paths.begin();
1520  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
1522  first_reader->parquet_reader()->metadata(), first_path, schema);
1523  auto encoder_map = populate_encoder_map(column_interval, schema, first_reader);
1524  const auto num_row_groups = get_parquet_table_size(first_reader).first;
1525  auto row_group_metadata = metadata_scan_rowgroup_interval(
1526  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
1527 
1528  // We want each (filepath->FileReader) pair in the cache to be initialized before we
1529  // multithread so that we are not adding keys in a concurrent environment, so we add
1530  // cache entries for each path and initialize to an empty unique_ptr if the file has not
1531  // yet been opened.
1532  // Since we have already performed the first iteration, we skip it in the thread groups
1533  // so as not to process it twice.
1534  std::set<std::string> cache_subset;
1535  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
1537  cache_subset.insert(*path_it);
1538  }
1539 
1540  // Iterate asynchronously over any paths beyond the first.
1541  auto paths_per_thread = partition_for_threads(cache_subset, g_max_import_threads);
1542  std::vector<std::future<std::list<RowGroupMetadata>>> futures;
1543  for (const auto& path_group : paths_per_thread) {
1544  futures.emplace_back(std::async(
1545  std::launch::async,
1546  [&](const auto& paths, const auto& file_reader_cache) {
1547  std::list<RowGroupMetadata> reduced_metadata;
1548  for (const auto& path : paths.get()) {
1549  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
1550  validate_equal_schema(first_reader, reader, first_path, path);
1551  validate_parquet_metadata(reader->parquet_reader()->metadata(), path, schema);
1552  const auto num_row_groups = get_parquet_table_size(reader).first;
1553  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
1554  reduced_metadata.splice(
1555  reduced_metadata.end(),
1556  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
1557  }
1558  return reduced_metadata;
1559  },
1560  std::ref(path_group),
1561  std::ref(*file_reader_cache_)));
1562  }
1563 
1564  // Reduce all the row_group results.
1565  for (auto& future : futures) {
1566  row_group_metadata.splice(row_group_metadata.end(), future.get());
1567  }
1568  return row_group_metadata;
1569 }
void validate_parquet_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
Definition: ParquetShared.h:41
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
void validate_equal_schema(const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map(const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const ReaderPtr &reader)
std::list< RowGroupMetadata > metadata_scan_rowgroup_interval(const std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const RowGroupInterval &row_group_interval, const ReaderPtr &reader, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
Definition: ParquetShared.h:98
void initializeIfEmpty(const std::string &path)
#define CHECK(condition)
Definition: Logger.h:203
#define DEBUG_TIMER(name)
Definition: Logger.h:319
std::shared_ptr< arrow::fs::FileSystem > file_system_
size_t g_max_import_threads
Definition: Importer.cpp:84

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

const int foreign_storage::LazyParquetChunkLoader::batch_reader_num_elements = 4096
static
FileReaderMap* foreign_storage::LazyParquetChunkLoader::file_reader_cache_
private

Definition at line 99 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), and metadataScan().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::LazyParquetChunkLoader::file_system_
private

Definition at line 98 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), and metadataScan().


The documentation for this class was generated from the following files: