OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::LazyParquetChunkLoader Class Reference

#include <LazyParquetChunkLoader.h>

+ Collaboration diagram for foreign_storage::LazyParquetChunkLoader:

Public Member Functions

 LazyParquetChunkLoader (std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache)
 
std::list< std::unique_ptr
< ChunkMetadata > > 
loadChunk (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr)
 
std::list< RowGroupMetadata > metadataScan (const std::vector< std::string > &file_paths, const ForeignTableSchema &schema)
 Perform a metadata scan for the paths specified. More...
 
std::pair< size_t, size_t > loadRowGroups (const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries)
 Load row groups of data into given chunks. More...
 

Static Public Member Functions

static bool isColumnMappingSupported (const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
 

Static Public Attributes

static const int batch_reader_num_elements = 4096
 

Private Member Functions

std::list< std::unique_ptr
< ChunkMetadata > > 
appendRowGroups (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary)
 

Private Attributes

std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
FileReaderMap * file_reader_cache_
 

Detailed Description

A lazy parquet to chunk loader

Definition at line 35 of file LazyParquetChunkLoader.h.

Constructor & Destructor Documentation

foreign_storage::LazyParquetChunkLoader::LazyParquetChunkLoader ( std::shared_ptr< arrow::fs::FileSystem >  file_system,
FileReaderMap *  file_reader_cache 
)

Definition at line 1666 of file LazyParquetChunkLoader.cpp.

1669  : file_system_(file_system), file_reader_cache_(file_map) {}
std::shared_ptr< arrow::fs::FileSystem > file_system_

Member Function Documentation

std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::appendRowGroups ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
const ColumnDescriptor *  column_descriptor,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary 
)
private

Definition at line 1537 of file LazyParquetChunkLoader.cpp.

References batch_reader_num_elements, CHECK, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::create_parquet_encoder(), DEBUG_TIMER, file_reader_cache_, file_system_, foreign_storage::get_column_descriptor(), foreign_storage::get_parquet_table_size(), foreign_storage::FileReaderMap::getOrInsert(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::resize_values_buffer(), to_string(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_definition_levels(), foreign_storage::validate_equal_column_descriptor(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level().

Referenced by loadChunk().

1542  {
1543  auto timer = DEBUG_TIMER(__func__);
1544  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1545  // `def_levels` and `rep_levels` below are used to store the read definition
1546  // and repetition levels of the Dremel encoding implemented by the Parquet
1547  // format
1548  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1549  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1550  std::vector<int8_t> values;
1551 
1552  CHECK(!row_group_intervals.empty());
1553  const auto& first_file_path = row_group_intervals.front().file_path;
1554 
1555  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1556  auto first_parquet_column_descriptor =
1557  get_column_descriptor(first_file_reader, parquet_column_index);
1558  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1559  auto encoder = create_parquet_encoder(column_descriptor,
1560  first_parquet_column_descriptor,
1561  chunks,
1562  string_dictionary,
1563  chunk_metadata);
1564  CHECK(encoder.get());
1565 
1566  for (const auto& row_group_interval : row_group_intervals) {
1567  const auto& file_path = row_group_interval.file_path;
1568  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1569 
1570  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1571  CHECK(row_group_interval.start_index >= 0 &&
1572  row_group_interval.end_index < num_row_groups);
1573  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1574 
1575  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1576  auto parquet_column_descriptor =
1577  get_column_descriptor(file_reader, parquet_column_index);
1578  validate_equal_column_descriptor(first_parquet_column_descriptor,
1579  parquet_column_descriptor,
1580  first_file_path,
1581  file_path);
1582 
1583  validate_max_repetition_and_definition_level(column_descriptor,
1584  parquet_column_descriptor);
1585  int64_t values_read = 0;
1586  for (int row_group_index = row_group_interval.start_index;
1587  row_group_index <= row_group_interval.end_index;
1588  ++row_group_index) {
1589  auto group_reader = parquet_reader->RowGroup(row_group_index);
1590  std::shared_ptr<parquet::ColumnReader> col_reader =
1591  group_reader->Column(parquet_column_index);
1592 
1593  try {
1594  while (col_reader->HasNext()) {
1595  int64_t levels_read =
1596  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1597  def_levels.data(),
1598  rep_levels.data(),
1599  reinterpret_cast<uint8_t*>(values.data()),
1600  &values_read,
1601  col_reader.get());
1602 
1603  validate_definition_levels(parquet_reader,
1604  row_group_index,
1605  parquet_column_index,
1606  def_levels.data(),
1607  levels_read,
1608  parquet_column_descriptor);
1609 
1610  encoder->appendData(def_levels.data(),
1611  rep_levels.data(),
1612  values_read,
1613  levels_read,
1614  values.data());
1615  }
1616  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1617  array_encoder->finalizeRowGroup();
1618  }
1619  } catch (const std::exception& error) {
1620  throw ForeignStorageException(
1621  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1622  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1623  "', Parquet file: '" + file_path + "'");
1624  }
1625  }
1626  }
1627  return chunk_metadata;
1628 }
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
void validate_equal_column_descriptor(const parquet::ColumnDescriptor *reference_descriptor, const parquet::ColumnDescriptor *new_descriptor, const std::string &reference_file_path, const std::string &new_file_path)
std::string to_string(char const *&&v)
const parquet::ColumnDescriptor * get_column_descriptor(const parquet::arrow::FileReader *reader, const int logical_column_index)
const ReaderPtr getOrInsert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
std::shared_ptr< ParquetEncoder > create_parquet_encoder(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, std::list< std::unique_ptr< ChunkMetadata >> &chunk_metadata, const bool is_metadata_scan=false, const bool is_for_import=false)
Create a Parquet specific encoder for a Parquet to OmniSci mapping.
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< arrow::fs::FileSystem > file_system_
void resize_values_buffer(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column, std::vector< int8_t > &values)
void validate_definition_levels(const parquet::ParquetFileReader *reader, const int row_group_index, const int column_index, const int16_t *def_levels, const int64_t num_levels, const parquet::ColumnDescriptor *parquet_column_descriptor)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported ( const ColumnDescriptor *  omnisci_column,
const parquet::ColumnDescriptor *  parquet_column 
)
static

Determine if a Parquet to OmniSci column mapping is supported.

Parameters
omnisci_column- the column descriptor of the OmniSci column
parquet_column- the column descriptor of the Parquet column
Returns
true if the column mapping is supported by LazyParquetChunkLoader, false otherwise

Definition at line 1630 of file LazyParquetChunkLoader.cpp.

References foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_date_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_decimal_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_floating_point_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_geospatial_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_integral_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_none_type_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_string_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_time_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_timestamp_mapping().

Referenced by foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping().

1632  {
1633  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1634  return true;
1635  }
1636  if (validate_array_mapping(omnisci_column, parquet_column)) {
1637  return true;
1638  }
1639  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1640  return true;
1641  }
1642  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1643  return true;
1644  }
1645  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1646  return true;
1647  }
1648  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1649  return true;
1650  }
1651  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1652  return true;
1653  }
1654  if (validate_time_mapping(omnisci_column, parquet_column)) {
1655  return true;
1656  }
1657  if (validate_date_mapping(omnisci_column, parquet_column)) {
1658  return true;
1659  }
1660  if (validate_string_mapping(omnisci_column, parquet_column)) {
1661  return true;
1662  }
1663  return false;
1664 }
bool validate_array_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_time_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_integral_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_date_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_timestamp_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_geospatial_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_decimal_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_none_type_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_floating_point_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
bool validate_string_mapping(const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::loadChunk ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary = nullptr 
)

Load a number of row groups of a column in a parquet file into a chunk

Parameters
row_group_interval- an inclusive interval [start,end] that specifies row groups to load
parquet_column_index- the logical column index in the parquet file (and omnisci db) of column to load
chunks- a list containing the chunks to load
string_dictionary- a string dictionary for the column corresponding to the column, if applicable
Returns
An empty list when no metadata update is applicable, otherwise a list of ChunkMetadata shared pointers with which to update the corresponding column chunk metadata. NOTE: Only ChunkMetadata.sqlType and the min & max values of the ChunkMetadata.chunkStats are valid, other values are not set.

NOTE: if more than one chunk is supplied, the first chunk is required to be the chunk corresponding to the logical column, while the remaining chunks correspond to physical columns (in ascending order of column id.) Similarly, if a metadata update is expected, the list of ChunkMetadata shared pointers returned will correspond directly to the list chunks.

Definition at line 1671 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), and CHECK.

Referenced by foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader().

1675  {
1676  CHECK(!chunks.empty());
1677  auto const& chunk = *chunks.begin();
1678  auto column_descriptor = chunk.getColumnDesc();
1679  auto buffer = chunk.getBuffer();
1680  CHECK(buffer);
1681 
1682  try {
1683  auto metadata = appendRowGroups(row_group_intervals,
1684  parquet_column_index,
1685  column_descriptor,
1686  chunks,
1687  string_dictionary);
1688  return metadata;
1689  } catch (const std::exception& error) {
1690  throw ForeignStorageException(error.what());
1691  }
1692 
1693  return {};
1694 }
std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups(const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary)
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::pair< size_t, size_t > foreign_storage::LazyParquetChunkLoader::loadRowGroups ( const RowGroupInterval row_group_interval,
const std::map< int, Chunk_NS::Chunk > &  chunks,
const ForeignTableSchema &  schema,
const std::map< int, StringDictionary * > &  column_dictionaries 
)

Load row groups of data into given chunks.

Parameters
row_group_interval- specifies which row groups to load
chunks- map of column index to chunk which data will be loaded into
schema- schema of the foreign table to perform metadata scan for
column_dictionaries- a map of string dictionaries for columns that require it
Returns
[num_rows_completed,num_rows_rejected] - returns number of rows loaded and rejected while loading

Note that only logical chunks are expected because the data is read into an intermediate form into the underlying buffers. This member is intended to be used for import.

NOTE: Currently, loading one row group at a time is required.

Definition at line 1779 of file LazyParquetChunkLoader.cpp.

References CHECK, DEBUG_TIMER, foreign_storage::RowGroupInterval::end_index, foreign_storage::RowGroupInterval::file_path, file_reader_cache_, file_system_, shared::get_from_map(), foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getColumnDescriptor(), foreign_storage::ForeignTableSchema::getLogicalColumns(), foreign_storage::ForeignTableSchema::getParquetColumnIndex(), i, foreign_storage::FileReaderMap::insert(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_import(), foreign_storage::RowGroupInterval::start_index, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_number_of_columns().

1783  {
1784  auto timer = DEBUG_TIMER(__func__);
1785 
1786  const auto& file_path = row_group_interval.file_path;
1787  auto file_reader = file_reader_cache_->insert(file_path, file_system_);
1788  auto file_metadata = file_reader->parquet_reader()->metadata();
1789 
1790  validate_number_of_columns(file_metadata, file_path, schema);
1791 
1792  // check for fixed length encoded columns and indicate to the user
1793  // they should not be used
1794  for (const auto column_descriptor : schema.getLogicalColumns()) {
1795  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
1796  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
1797  try {
1798  validate_allowed_mapping(parquet_column, column_descriptor);
1799  } catch (std::runtime_error& e) {
1800  std::stringstream error_message;
1801  error_message << e.what() << " Parquet column: " << parquet_column->name()
1802  << ", OmniSci column: " << column_descriptor->columnName
1803  << ", Parquet file: " << file_path << ".";
1804  throw std::runtime_error(error_message.str());
1805  }
1806  }
1807 
1808  auto encoder_map =
1809  populate_encoder_map_for_import(chunks, schema, file_reader, column_dictionaries);
1810 
1811  CHECK(row_group_interval.start_index == row_group_interval.end_index);
1812  auto row_group_index = row_group_interval.start_index;
1813  std::map<int, ParquetRowGroupReader> row_group_reader_map;
1814 
1815  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1816  auto group_reader = parquet_reader->RowGroup(row_group_index);
1817  InvalidRowGroupIndices invalid_indices;
1818  for (auto& [column_id, encoder] : encoder_map) {
1819  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
1820  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
1821  auto parquet_column_descriptor =
1822  file_metadata->schema()->Column(parquet_column_index);
1823 
1824  // validate
1825  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1826  CHECK(row_group_interval.start_index >= 0 &&
1827  row_group_interval.end_index < num_row_groups);
1828  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1829  validate_max_repetition_and_definition_level(column_descriptor,
1830  parquet_column_descriptor);
1831 
1832  std::shared_ptr<parquet::ColumnReader> col_reader =
1833  group_reader->Column(parquet_column_index);
1834 
1835  row_group_reader_map.insert({column_id,
1836  ParquetRowGroupReader(col_reader,
1837  column_descriptor,
1838  parquet_column_descriptor,
1839  encoder_map[column_id].get(),
1840  invalid_indices,
1841  row_group_index,
1842  parquet_column_index,
1843  parquet_reader)});
1844  }
1845 
1846  for (auto& [_, reader] : row_group_reader_map) {
1847  reader.readAndValidateRowGroup(); // reads and validates entire row group per column
1848  }
1849 
1850  for (auto& [_, reader] : row_group_reader_map) {
1851  reader.eraseInvalidRowGroupData(); // removes invalid encoded data in buffers
1852  }
1853 
1854  // update the element count for each encoder
1855  for (const auto column_descriptor : schema.getLogicalColumns()) {
1856  auto column_id = column_descriptor->columnId;
1857  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
1858  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
1859  invalid_indices.size());
1860  size_t updated_num_elems = db_encoder->getNumElems() +
1861  group_reader->metadata()->num_rows() -
1862  invalid_indices.size();
1863  db_encoder->setNumElems(updated_num_elems);
1864  if (column_descriptor->columnType.is_geometry()) {
1865  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
1866  auto db_encoder =
1867  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
1868  db_encoder->setNumElems(updated_num_elems);
1869  }
1870  }
1871  }
1872 
1873  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
1874  invalid_indices.size()};
1875 }
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_import(const std::map< int, Chunk_NS::Chunk > chunks, const ForeignTableSchema &schema, const ReaderPtr &reader, const std::map< int, StringDictionary * > column_dictionaries)
std::set< int64_t > InvalidRowGroupIndices
void validate_allowed_mapping(const parquet::ColumnDescriptor *parquet_column, const ColumnDescriptor *omnisci_column)
void validate_number_of_columns(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
V & get_from_map(std::map< K, V > &map, const K &key)
Definition: misc.h:58
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
void validate_max_repetition_and_definition_level(const ColumnDescriptor *omnisci_column_descriptor, const parquet::ColumnDescriptor *parquet_column_descriptor)
std::shared_ptr< arrow::fs::FileSystem > file_system_

+ Here is the call graph for this function:

std::list< RowGroupMetadata > foreign_storage::LazyParquetChunkLoader::metadataScan ( const std::vector< std::string > &  file_paths,
const ForeignTableSchema &  schema 
)

Perform a metadata scan for the paths specified.

Parameters
file_paths- (ordered) files of the metadata scan
schema- schema of the foreign table to perform metadata scan for
Returns
a list of the row group metadata extracted from file_paths

Definition at line 1877 of file LazyParquetChunkLoader.cpp.

References threading_serial::async(), CHECK, DEBUG_TIMER, file_reader_cache_, file_system_, g_max_import_threads, foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getLogicalAndPhysicalColumns(), foreign_storage::FileReaderMap::initializeIfEmpty(), foreign_storage::FileReaderMap::insert(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::metadata_scan_rowgroup_interval(), foreign_storage::partition_for_threads(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_metadata_scan(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_parquet_metadata().

Referenced by foreign_storage::ParquetDataWrapper::metadataScanFiles().

1879  {
1880  auto timer = DEBUG_TIMER(__func__);
1881  auto column_interval =
1882  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
1883  schema.getLogicalAndPhysicalColumns().back()->columnId};
1884  CHECK(!file_paths.empty());
1885 
1886  // The encoder map needs to be populated before we can start scanning rowgroups, so we
1887  // peel the first file_path out of the async loop below to perform population.
1888  const auto& first_path = *file_paths.begin();
1889  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
1890  validate_parquet_metadata(
1891  first_reader->parquet_reader()->metadata(), first_path, schema);
1892  auto encoder_map =
1893  populate_encoder_map_for_metadata_scan(column_interval, schema, first_reader);
1894  const auto num_row_groups = get_parquet_table_size(first_reader).first;
1895  auto row_group_metadata = metadata_scan_rowgroup_interval(
1896  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
1897 
1898  // We want each (filepath->FileReader) pair in the cache to be initialized before we
1899  // multithread so that we are not adding keys in a concurrent environment, so we add
1900  // cache entries for each path and initialize to an empty unique_ptr if the file has not
1901  // yet been opened.
1902  // Since we have already performed the first iteration, we skip it in the thread groups
1903  // so as not to process it twice.
1904  std::vector<std::string> cache_subset;
1905  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
1906  file_reader_cache_->initializeIfEmpty(*path_it);
1907  cache_subset.emplace_back(*path_it);
1908  }
1909 
1910  // Iterate asynchronously over any paths beyond the first.
1911  auto paths_per_thread = partition_for_threads(cache_subset, g_max_import_threads);
1912  std::vector<std::future<std::list<RowGroupMetadata>>> futures;
1913  for (const auto& path_group : paths_per_thread) {
1914  futures.emplace_back(std::async(
1915  std::launch::async,
1916  [&](const auto& paths, const auto& file_reader_cache) {
1917  std::list<RowGroupMetadata> reduced_metadata;
1918  for (const auto& path : paths.get()) {
1919  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
1920  validate_equal_schema(first_reader, reader, first_path, path);
1921  validate_parquet_metadata(reader->parquet_reader()->metadata(), path, schema);
1922  const auto num_row_groups = get_parquet_table_size(reader).first;
1923  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
1924  reduced_metadata.splice(
1925  reduced_metadata.end(),
1926  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
1927  }
1928  return reduced_metadata;
1929  },
1930  std::ref(path_group),
1931  std::ref(*file_reader_cache_)));
1932  }
1933 
1934  // Reduce all the row_group results.
1935  for (auto& future : futures) {
1936  row_group_metadata.splice(row_group_metadata.end(), future.get());
1937  }
1938  return row_group_metadata;
1939 }
void validate_parquet_metadata(const std::shared_ptr< parquet::FileMetaData > &file_metadata, const std::string &file_path, const ForeignTableSchema &schema)
auto partition_for_threads(const std::set< T > &items, size_t max_threads)
Definition: ParquetShared.h:41
std::pair< int, int > get_parquet_table_size(const ReaderPtr &reader)
std::map< int, std::shared_ptr< ParquetEncoder > > populate_encoder_map_for_metadata_scan(const Interval< ColumnType > &column_interval, const ForeignTableSchema &schema, const ReaderPtr &reader)
void validate_equal_schema(const parquet::arrow::FileReader *reference_file_reader, const parquet::arrow::FileReader *new_file_reader, const std::string &reference_file_path, const std::string &new_file_path)
future< Result > async(Fn &&fn, Args &&...args)
std::list< RowGroupMetadata > metadata_scan_rowgroup_interval(const std::map< int, std::shared_ptr< ParquetEncoder >> &encoder_map, const RowGroupInterval &row_group_interval, const ReaderPtr &reader, const ForeignTableSchema &schema)
const ReaderPtr insert(const std::string &path, std::shared_ptr< arrow::fs::FileSystem > &file_system)
void initializeIfEmpty(const std::string &path)
#define CHECK(condition)
Definition: Logger.h:209
#define DEBUG_TIMER(name)
Definition: Logger.h:352
std::shared_ptr< arrow::fs::FileSystem > file_system_
size_t g_max_import_threads
Definition: Importer.cpp:85

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

const int foreign_storage::LazyParquetChunkLoader::batch_reader_num_elements = 4096
static
FileReaderMap* foreign_storage::LazyParquetChunkLoader::file_reader_cache_
private

Definition at line 122 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), loadRowGroups(), and metadataScan().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::LazyParquetChunkLoader::file_system_
private

Definition at line 121 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), loadRowGroups(), and metadataScan().


The documentation for this class was generated from the following files: