OmniSciDB  c1a53651b2
foreign_storage::LazyParquetChunkLoader Class Reference

#include <LazyParquetChunkLoader.h>


Public Member Functions

 LazyParquetChunkLoader (std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const RenderGroupAnalyzerMap *render_group_analyzer_map, const std::string &foreign_table_name)
 
std::list< std::unique_ptr< ChunkMetadata > > loadChunk (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
 
std::list< RowGroupMetadata > metadataScan (const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
 Perform a metadata scan for the paths specified.
 
std::pair< size_t, size_t > loadRowGroups (const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
 Load row groups of data into given chunks.
 
DataPreview previewFiles (const std::vector< std::string > &files, const size_t max_num_rows, const ForeignTable &table)
 Preview rows of data and column types in a set of files.
 

Static Public Member Functions

static bool isColumnMappingSupported (const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
 

Static Public Attributes

static const int batch_reader_num_elements = 4096
 

Private Member Functions

std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
 

Static Private Member Functions

static SQLTypeInfo suggestColumnMapping (const parquet::ColumnDescriptor *parquet_column)
 

Private Attributes

std::shared_ptr< arrow::fs::FileSystem > file_system_
 
FileReaderMap * file_reader_cache_
 
const RenderGroupAnalyzerMap * render_group_analyzer_map_
 
std::string foreign_table_name_
 

Detailed Description

A lazy Parquet-to-chunk loader

Definition at line 37 of file LazyParquetChunkLoader.h.
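
A minimal usage sketch follows (assumptions: a local Arrow file system, an already-constructed ForeignTableSchema, and that a null RenderGroupAnalyzerMap pointer is acceptable when no render-group analysis is needed; none of these details are mandated by this page):

  #include <arrow/filesystem/localfs.h>

  #include "LazyParquetChunkLoader.h"
  #include "ParquetShared.h"

  // Hedged sketch: schema and foreign table setup are elided; `schema` is
  // assumed to describe the foreign table backed by `file_paths`.
  std::list<foreign_storage::RowGroupMetadata> scan_files(
      const std::vector<std::string>& file_paths,
      const foreign_storage::ForeignTableSchema& schema) {
    auto file_system = std::make_shared<arrow::fs::LocalFileSystem>();
    foreign_storage::FileReaderMap file_reader_cache;

    foreign_storage::LazyParquetChunkLoader loader(
        file_system,
        &file_reader_cache,
        /*render_group_analyzer_map=*/nullptr,  // assumption: may be null
        /*foreign_table_name=*/"example_table");

    // Extract per-row-group metadata for every file (see metadataScan below).
    return loader.metadataScan(file_paths, schema);
  }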

Constructor & Destructor Documentation

foreign_storage::LazyParquetChunkLoader::LazyParquetChunkLoader ( std::shared_ptr< arrow::fs::FileSystem >  file_system,
FileReaderMap *  file_reader_cache,
const RenderGroupAnalyzerMap *  render_group_analyzer_map,
const std::string &  foreign_table_name 
)

Definition at line 1978 of file LazyParquetChunkLoader.cpp.

1983  : file_system_(file_system)
1984  , file_reader_cache_(file_map)
1985  , render_group_analyzer_map_{render_group_analyzer_map}
1986  , foreign_table_name_(foreign_table_name) {}

Member Function Documentation

std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::appendRowGroups ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
const ColumnDescriptor *  column_descriptor,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary,
RejectedRowIndices *  rejected_row_indices,
const bool  is_for_detect = false,
const std::optional< int64_t >  max_levels_read = std::nullopt 
)
private

Definition at line 1785 of file LazyParquetChunkLoader.cpp.

References batch_reader_num_elements, CHECK, ColumnDescriptor::columnType, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::create_parquet_encoder(), DEBUG_TIMER, file_reader_cache_, file_system_, foreign_table_name_, foreign_storage::get_column_descriptor(), foreign_storage::get_parquet_table_size(), foreign_storage::FileReaderMap::getOrInsert(), render_group_analyzer_map_, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::resize_values_buffer(), to_string(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_definition_levels(), foreign_storage::validate_equal_column_descriptor(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level().

Referenced by loadChunk(), and previewFiles().

1793  {
1794  auto timer = DEBUG_TIMER(__func__);
1795  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1796  // `def_levels` and `rep_levels` below are used to store the read definition
1797  // and repetition levels of the Dremel encoding implemented by the Parquet
1798  // format
1799  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1800  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1801  std::vector<int8_t> values;
1802 
1803  CHECK(!row_group_intervals.empty());
1804  const auto& first_file_path = row_group_intervals.front().file_path;
1805 
1806  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1807  auto first_parquet_column_descriptor =
1808  get_column_descriptor(first_file_reader, parquet_column_index);
1809  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1810  auto encoder = create_parquet_encoder(column_descriptor,
1811  first_parquet_column_descriptor,
1812  chunks,
1813  string_dictionary,
1814  chunk_metadata,
1815  render_group_analyzer_map_,
1816  false,
1817  false,
1818  is_for_detect);
1819  CHECK(encoder.get());
1820 
1821  if (rejected_row_indices) { // error tracking is enabled
1822  encoder->initializeErrorTracking(column_descriptor->columnType);
1823  }
1824 
1825  bool early_exit = false;
1826  int64_t total_levels_read = 0;
1827  for (const auto& row_group_interval : row_group_intervals) {
1828  const auto& file_path = row_group_interval.file_path;
1829  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1830 
1831  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1832  CHECK(row_group_interval.start_index >= 0 &&
1833  row_group_interval.end_index < num_row_groups);
1834  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1835 
1836  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1837  auto parquet_column_descriptor =
1838  get_column_descriptor(file_reader, parquet_column_index);
1839  validate_equal_column_descriptor(first_parquet_column_descriptor,
1840  parquet_column_descriptor,
1841  first_file_path,
1842  file_path);
1843 
1844  validate_max_repetition_and_definition_level(column_descriptor,
1845  parquet_column_descriptor);
1846  int64_t values_read = 0;
1847  for (int row_group_index = row_group_interval.start_index;
1848  row_group_index <= row_group_interval.end_index;
1849  ++row_group_index) {
1850  auto group_reader = parquet_reader->RowGroup(row_group_index);
1851  std::shared_ptr<parquet::ColumnReader> col_reader =
1852  group_reader->Column(parquet_column_index);
1853 
1854  try {
1855  while (col_reader->HasNext()) {
1856  int64_t levels_read =
1857  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1858  def_levels.data(),
1859  rep_levels.data(),
1860  reinterpret_cast<uint8_t*>(values.data()),
1861  &values_read,
1862  col_reader.get());
1863 
1864  validate_definition_levels(parquet_reader,
1865  row_group_index,
1866  parquet_column_index,
1867  def_levels.data(),
1868  levels_read,
1869  parquet_column_descriptor);
1870 
1871  if (rejected_row_indices) { // error tracking is enabled
1872  encoder->appendDataTrackErrors(def_levels.data(),
1873  rep_levels.data(),
1874  values_read,
1875  levels_read,
1876  values.data());
1877  } else { // no error tracking enabled
1878  encoder->appendData(def_levels.data(),
1879  rep_levels.data(),
1880  values_read,
1881  levels_read,
1882  values.data());
1883  }
1884 
1885  if (max_levels_read.has_value()) {
1886  total_levels_read += levels_read;
1887  if (total_levels_read >= max_levels_read.value()) {
1888  early_exit = true;
1889  break;
1890  }
1891  }
1892  }
1893  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1894  array_encoder->finalizeRowGroup();
1895  }
1896  } catch (const std::exception& error) {
1897  // check for a specific error to detect a possible unexpected switch of data
1898  // source in order to respond with informative error message
1899  if (boost::regex_search(error.what(),
1900  boost::regex{"Deserializing page header failed."})) {
1901  throw ForeignStorageException(
1902  "Unable to read from foreign data source, possible cause is an unexpected "
1903  "change of source. Please use the \"REFRESH FOREIGN TABLES\" command on "
1904  "the "
1905  "foreign table "
1906  "if data source has been updated. Foreign table: " +
1907  foreign_table_name_);
1908  }
1909 
1910  throw ForeignStorageException(
1911  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1912  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1913  "', Parquet file: '" + file_path + "'");
1914  }
1915  if (max_levels_read.has_value() && early_exit) {
1916  break;
1917  }
1918  }
1919  if (max_levels_read.has_value() && early_exit) {
1920  break;
1921  }
1922  }
1923 
1924  if (rejected_row_indices) { // error tracking is enabled
1925  *rejected_row_indices = encoder->getRejectedRowIndices();
1926  }
1927  return chunk_metadata;
1928 }
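
For context on the definition and repetition levels consumed above, here is a minimal, self-contained sketch of the same batch-read pattern written directly against the parquet-cpp API (assumptions: an INT64 physical column at index 0 and no error handling; this illustrates the underlying API rather than reproducing the loader's exact read path):

  #include <parquet/column_reader.h>
  #include <parquet/file_reader.h>

  #include <memory>
  #include <string>
  #include <vector>

  // Read one column of one row group in batches, capturing the Dremel
  // definition/repetition levels alongside the decoded values.
  void read_first_column(const std::string& parquet_path) {
    auto file_reader = parquet::ParquetFileReader::OpenFile(parquet_path);
    auto row_group = file_reader->RowGroup(0);
    auto column = std::static_pointer_cast<parquet::Int64Reader>(
        row_group->Column(0));  // assumes an INT64 physical type

    constexpr int64_t kBatchSize = 4096;  // mirrors batch_reader_num_elements
    std::vector<int16_t> def_levels(kBatchSize);
    std::vector<int16_t> rep_levels(kBatchSize);
    std::vector<int64_t> values(kBatchSize);

    while (column->HasNext()) {
      int64_t values_read = 0;
      // ReadBatch returns the number of levels read; values_read can be
      // smaller when nulls or repeated (list) entries are present.
      column->ReadBatch(kBatchSize,
                        def_levels.data(),
                        rep_levels.data(),
                        values.data(),
                        &values_read);
    }
  }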


bool foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported ( const ColumnDescriptor *  omnisci_column,
const parquet::ColumnDescriptor *  parquet_column 
)
static

Determine if a Parquet to OmniSci column mapping is supported.

Parameters
omnisci_column- the column descriptor of the OmniSci column
parquet_column- the column descriptor of the Parquet column
Returns
true if the column mapping is supported by LazyParquetChunkLoader, false otherwise

Definition at line 1942 of file LazyParquetChunkLoader.cpp.

References foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_date_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_decimal_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_floating_point_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_geospatial_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_integral_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_none_type_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_string_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_time_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_timestamp_mapping().

Referenced by foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping().

1944  {
1945  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1946  return true;
1947  }
1948  if (validate_array_mapping(omnisci_column, parquet_column)) {
1949  return true;
1950  }
1951  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1952  return true;
1953  }
1954  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1955  return true;
1956  }
1957  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1958  return true;
1959  }
1960  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1961  return true;
1962  }
1963  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1964  return true;
1965  }
1966  if (validate_time_mapping(omnisci_column, parquet_column)) {
1967  return true;
1968  }
1969  if (validate_date_mapping(omnisci_column, parquet_column)) {
1970  return true;
1971  }
1972  if (validate_string_mapping(omnisci_column, parquet_column)) {
1973  return true;
1974  }
1975  return false;
1976 }
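
A hedged sketch of a pre-flight check built on this predicate (assumptions: `all_column_mappings_supported` is a hypothetical helper, and `catalog_columns` is a caller-supplied list of OmniSci column descriptors ordered to match the Parquet schema):

  // Verify every OmniSci column has a supported Parquet mapping before
  // attempting any load.
  bool all_column_mappings_supported(
      const std::vector<const ColumnDescriptor*>& catalog_columns,
      const parquet::SchemaDescriptor* parquet_schema) {
    for (size_t i = 0; i < catalog_columns.size(); ++i) {
      if (!foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported(
              catalog_columns[i], parquet_schema->Column(static_cast<int>(i)))) {
        return false;
      }
    }
    return true;
  }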


std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::loadChunk ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary = nullptr,
RejectedRowIndices *  rejected_row_indices = nullptr 
)

Load row groups of a column from one or more Parquet files into a chunk

Parameters
row_group_intervals- a vector of inclusive intervals [start,end], each specifying row groups to load from a file
parquet_column_index- the logical column index, in both the Parquet file and the OmniSci database, of the column to load
chunks- a list containing the chunks to load data into
string_dictionary- a string dictionary for the column, if applicable
rejected_row_indices- optional; if specified, the indices of rows rejected while loading are tracked in this data structure
Returns
An empty list when no metadata update is applicable, otherwise a list of ChunkMetadata unique pointers with which to update the corresponding column chunk metadata.

NOTE: if more than one chunk is supplied, the first chunk is required to be the chunk corresponding to the logical column, while the remaining chunks correspond to physical columns (in ascending order of column id). Similarly, if a metadata update is expected, the returned list of ChunkMetadata unique pointers will correspond directly to the list of chunks.

Definition at line 1988 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), and CHECK.

Referenced by foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader().

1993  {
1994  CHECK(!chunks.empty());
1995  auto const& chunk = *chunks.begin();
1996  auto column_descriptor = chunk.getColumnDesc();
1997  auto buffer = chunk.getBuffer();
1998  CHECK(buffer);
1999 
2000  try {
2001  auto metadata = appendRowGroups(row_group_intervals,
2002  parquet_column_index,
2003  column_descriptor,
2004  chunks,
2005  string_dictionary,
2006  rejected_row_indices);
2007  return metadata;
2008  } catch (const std::exception& error) {
2009  throw ForeignStorageException(error.what());
2010  }
2011 
2012  return {};
2013 }
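
A hedged call-site sketch (assumptions: `loader` is a constructed LazyParquetChunkLoader, `chunk` is a Chunk_NS::Chunk already wired to a valid buffer for the target column, and the file path is illustrative):

  // Load row groups [0, 2] of one file into the chunk for Parquet column 0.
  std::vector<foreign_storage::RowGroupInterval> row_group_intervals{
      {"/data/example.parquet", /*start_index=*/0, /*end_index=*/2}};
  std::list<Chunk_NS::Chunk> chunks{chunk};
  RejectedRowIndices rejected_row_indices;

  auto chunk_metadata = loader.loadChunk(row_group_intervals,
                                         /*parquet_column_index=*/0,
                                         chunks,
                                         /*string_dictionary=*/nullptr,
                                         &rejected_row_indices);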


std::pair< size_t, size_t > foreign_storage::LazyParquetChunkLoader::loadRowGroups ( const RowGroupInterval &  row_group_interval,
const std::map< int, Chunk_NS::Chunk > &  chunks,
const ForeignTableSchema &  schema,
const std::map< int, StringDictionary * > &  column_dictionaries,
const int  num_threads = 1 
)

Load row groups of data into given chunks.

Parameters
row_group_interval- specifies which row groups to load
chunks- map of column id to the chunk into which data will be loaded
schema- schema of the foreign table
column_dictionaries- a map of string dictionaries for the columns that require them
num_threads- number of threads to utilize while reading (if applicable)
Returns
a pair [num_rows_completed, num_rows_rejected]: the number of rows loaded and the number of rows rejected while loading

Note that only logical chunks are expected, because the data is read in an intermediate form into the underlying buffers. This member is intended to be used for import.

NOTE: Currently, loading one row group at a time is required.

Definition at line 2098 of file LazyParquetChunkLoader.cpp.

References threading_serial::async(), CHECK, DEBUG_TIMER, foreign_storage::RowGroupInterval::end_index, foreign_storage::RowGroupInterval::file_path, file_system_, shared::get_from_map(), foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getColumnDescriptor(), foreign_storage::ForeignTableSchema::getLogicalColumns(), foreign_storage::ForeignTableSchema::getParquetColumnIndex(), foreign_storage::open_parquet_table(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_import(), render_group_analyzer_map_, foreign_storage::RowGroupInterval::start_index, logger::thread_id(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_number_of_columns().

2103  {
2104  auto timer = DEBUG_TIMER(__func__);
2105 
2106  const auto& file_path = row_group_interval.file_path;
2107 
2108  // do not use caching with file-readers, open a new one for every request
2109  auto file_reader_owner = open_parquet_table(file_path, file_system_);
2110  auto file_reader = file_reader_owner.get();
2111  auto file_metadata = file_reader->parquet_reader()->metadata();
2112 
2113  validate_number_of_columns(file_metadata, file_path, schema);
2114 
2115  // check for fixed length encoded columns and indicate to the user
2116  // they should not be used
2117  for (const auto column_descriptor : schema.getLogicalColumns()) {
2118  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
2119  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2120  try {
2121  validate_allowed_mapping(parquet_column, column_descriptor);
2122  } catch (std::runtime_error& e) {
2123  std::stringstream error_message;
2124  error_message << e.what() << " Parquet column: " << parquet_column->name()
2125  << ", HeavyDB column: " << column_descriptor->columnName
2126  << ", Parquet file: " << file_path << ".";
2127  throw std::runtime_error(error_message.str());
2128  }
2129  }
2130 
2131  CHECK(row_group_interval.start_index == row_group_interval.end_index);
2132  auto row_group_index = row_group_interval.start_index;
2133  std::map<int, ParquetRowGroupReader> row_group_reader_map;
2134 
2135  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2136  auto group_reader = parquet_reader->RowGroup(row_group_index);
2137 
2138  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2139 
2140  auto encoder_map = populate_encoder_map_for_import(chunks,
2141  schema,
2142  file_reader,
2143  column_dictionaries,
2144  group_reader->metadata()->num_rows(),
2145  render_group_analyzer_map_);
2146 
2147  std::vector<std::set<int>> partitions(num_threads);
2148  std::map<int, int> column_id_to_thread;
2149  for (auto& [column_id, encoder] : encoder_map) {
2150  auto thread_id = column_id % num_threads;
2151  column_id_to_thread[column_id] = thread_id;
2152  partitions[thread_id].insert(column_id);
2153  }
2154 
2155  for (auto& [column_id, encoder] : encoder_map) {
2156  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
2157  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
2158  auto parquet_column_descriptor =
2159  file_metadata->schema()->Column(parquet_column_index);
2160 
2161  // validate
2162  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
2163  CHECK(row_group_interval.start_index >= 0 &&
2164  row_group_interval.end_index < num_row_groups);
2165  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2166  validate_max_repetition_and_definition_level(column_descriptor,
2167  parquet_column_descriptor);
2168 
2169  std::shared_ptr<parquet::ColumnReader> col_reader =
2170  group_reader->Column(parquet_column_index);
2171 
2172  row_group_reader_map.insert(
2173  {column_id,
2174  ParquetRowGroupReader(col_reader,
2175  column_descriptor,
2176  parquet_column_descriptor,
2177  shared::get_from_map(encoder_map, column_id).get(),
2178  invalid_indices_per_thread[shared::get_from_map(
2179  column_id_to_thread, column_id)],
2180  row_group_index,
2181  parquet_column_index,
2182  parquet_reader)});
2183  }
2184 
2185  std::vector<std::future<void>> futures;
2186  for (int ithread = 0; ithread < num_threads; ++ithread) {
2187  auto column_ids_for_thread = partitions[ithread];
2188  futures.emplace_back(
2189  std::async(std::launch::async, [&row_group_reader_map, column_ids_for_thread] {
2190  for (const auto column_id : column_ids_for_thread) {
2191  shared::get_from_map(row_group_reader_map, column_id)
2192  .readAndValidateRowGroup(); // reads and validates entire row group per
2193  // column
2194  }
2195  }));
2196  }
2197 
2198  for (auto& future : futures) {
2199  future.wait();
2200  }
2201 
2202  for (auto& future : futures) {
2203  future.get();
2204  }
2205 
2206  // merge/reduce invalid indices
2207  InvalidRowGroupIndices invalid_indices;
2208  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
2209  invalid_indices.merge(thread_invalid_indices);
2210  }
2211 
2212  for (auto& [_, reader] : row_group_reader_map) {
2213  reader.eraseInvalidRowGroupData(
2214  invalid_indices); // removes invalid encoded data in buffers
2215  }
2216 
2217  // update the element count for each encoder
2218  for (const auto column_descriptor : schema.getLogicalColumns()) {
2219  auto column_id = column_descriptor->columnId;
2220  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
2221  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2222  invalid_indices.size());
2223  size_t updated_num_elems = db_encoder->getNumElems() +
2224  group_reader->metadata()->num_rows() -
2225  invalid_indices.size();
2226  db_encoder->setNumElems(updated_num_elems);
2227  if (column_descriptor->columnType.is_geometry()) {
2228  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2229  auto db_encoder =
2230  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
2231  db_encoder->setNumElems(updated_num_elems);
2232  }
2233  }
2234  }
2235 
2236  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2237  invalid_indices.size()};
2238 }
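
A hedged import-path sketch (assumptions: `chunks` maps each column id to a chunk wired to an import buffer, `column_dictionaries` covers the dictionary-encoded columns, and exactly one row group is requested, per the note above):

  // Import a single row group; start_index must equal end_index.
  foreign_storage::RowGroupInterval interval{"/data/example.parquet", 0, 0};

  auto [num_rows_completed, num_rows_rejected] =
      loader.loadRowGroups(interval, chunks, schema, column_dictionaries,
                           /*num_threads=*/4);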


std::list< RowGroupMetadata > foreign_storage::LazyParquetChunkLoader::metadataScan ( const std::vector< std::string > &  file_paths,
const ForeignTableSchema &  schema,
const bool  do_metadata_stats_validation = true 
)

Perform a metadata scan for the paths specified.

Parameters
file_paths- ordered list of the files to scan
schema- schema of the foreign table to perform the metadata scan for
do_metadata_stats_validation- if true, validate the statistics in the metadata of the Parquet files
Returns
a list of the row group metadata extracted from file_paths

Definition at line 2406 of file LazyParquetChunkLoader.cpp.

References threading_serial::async(), CHECK, DEBUG_TIMER, file_reader_cache_, file_system_, foreign_storage::get_num_threads(), foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getForeignTable(), foreign_storage::ForeignTableSchema::getLogicalAndPhysicalColumns(), foreign_storage::FileReaderMap::initializeIfEmpty(), foreign_storage::FileReaderMap::insert(), TableDescriptor::maxFragRows, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::metadata_scan_rowgroup_interval(), foreign_storage::partition_for_threads(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_metadata_scan(), render_group_analyzer_map_, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::throw_row_group_larger_than_fragment_size_error(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_parquet_metadata().

Referenced by foreign_storage::ParquetDataWrapper::getRowGroupMetadataForFilePaths().

2409  {
2410  auto timer = DEBUG_TIMER(__func__);
2411  auto column_interval =
2412  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
2413  schema.getLogicalAndPhysicalColumns().back()->columnId};
2414  CHECK(!file_paths.empty());
2415 
2416  // The encoder map needs to be populated before we can start scanning rowgroups, so we
2417  // peel the first file_path out of the async loop below to perform population.
2418  const auto& first_path = *file_paths.begin();
2419  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
2420  auto max_row_group_stats = validate_parquet_metadata(
2421  first_reader->parquet_reader()->metadata(), first_path, schema);
2422  auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
2423  schema,
2424  first_reader,
2425  render_group_analyzer_map_,
2426  do_metadata_stats_validation);
2427  const auto num_row_groups = get_parquet_table_size(first_reader).first;
2428  auto row_group_metadata = metadata_scan_rowgroup_interval(
2429  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2430 
2431  // We want each (filepath->FileReader) pair in the cache to be initialized before we
2432  // multithread so that we are not adding keys in a concurrent environment, so we add
2433  // cache entries for each path and initialize to an empty unique_ptr if the file has not
2434  // yet been opened.
2435  // Since we have already performed the first iteration, we skip it in the thread groups
2436  // so as not to process it twice.
2437  std::vector<std::string> cache_subset;
2438  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2439  file_reader_cache_->initializeIfEmpty(*path_it);
2440  cache_subset.emplace_back(*path_it);
2441  }
2442 
2443  // Iterate asynchronously over any paths beyond the first.
2444  auto table_ptr = schema.getForeignTable();
2445  CHECK(table_ptr);
2446  auto num_threads = foreign_storage::get_num_threads(*table_ptr);
2447  auto paths_per_thread = partition_for_threads(cache_subset, num_threads);
2448  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2449  futures;
2450  for (const auto& path_group : paths_per_thread) {
2451  futures.emplace_back(std::async(
2452  std::launch::async,
2453  [&](const auto& paths, const auto& file_reader_cache)
2454  -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2455  std::list<RowGroupMetadata> reduced_metadata;
2456  MaxRowGroupSizeStats max_row_group_stats{0, 0};
2457  for (const auto& path : paths.get()) {
2458  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
2459  validate_equal_schema(first_reader, reader, first_path, path);
2460  auto local_max_row_group_stats = validate_parquet_metadata(
2461  reader->parquet_reader()->metadata(), path, schema);
2462  if (local_max_row_group_stats.max_row_group_size >
2463  max_row_group_stats.max_row_group_size) {
2464  max_row_group_stats = local_max_row_group_stats;
2465  }
2466  const auto num_row_groups = get_parquet_table_size(reader).first;
2467  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
2468  reduced_metadata.splice(
2469  reduced_metadata.end(),
2470  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
2471  }
2472  return {reduced_metadata, max_row_group_stats};
2473  },
2474  std::ref(path_group),
2475  std::ref(*file_reader_cache_)));
2476  }
2477 
2478  // Reduce all the row_group results.
2479  for (auto& future : futures) {
2480  auto [metadata, local_max_row_group_stats] = future.get();
2481  row_group_metadata.splice(row_group_metadata.end(), metadata);
2482  if (local_max_row_group_stats.max_row_group_size >
2483  max_row_group_stats.max_row_group_size) {
2484  max_row_group_stats = local_max_row_group_stats;
2485  }
2486  }
2487 
2488  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2489  throw_row_group_larger_than_fragment_size_error(
2490  max_row_group_stats, schema.getForeignTable()->maxFragRows);
2491  }
2492 
2493  return row_group_metadata;
2494 }
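
A hedged sketch of a scan over an ordered file list (assumption: only the size of the returned list is inspected, so nothing is assumed about the layout of RowGroupMetadata):

  auto row_group_metadata = loader.metadataScan(
      file_paths, schema, /*do_metadata_stats_validation=*/true);
  LOG(INFO) << "Metadata scan covered " << row_group_metadata.size()
            << " row groups across " << file_paths.size() << " files.";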


DataPreview foreign_storage::LazyParquetChunkLoader::previewFiles ( const std::vector< std::string > &  files,
const size_t  max_num_rows,
const ForeignTable &  table 
)

Preview rows of data and column types in a set of files.

Parameters
files- files to preview
max_num_rows- maximum number of rows to preview
table- foreign table for preview
Returns
a DataPreview instance that contains relevant preview information

Definition at line 2247 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), CHECK, CHECK_EQ, CHECK_GE, foreign_storage::PreviewContext::column_chunks, foreign_storage::PreviewContext::column_descriptors, foreign_storage::DataPreview::column_names, foreign_storage::DataPreview::column_types, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, foreign_storage::create_futures_for_workers(), foreign_storage::PreviewContext::detect_buffers, foreign_storage::detect_geo_type(), file_reader_cache_, file_system_, foreign_storage::get_num_threads(), foreign_storage::FileReaderMap::getOrInsert(), gpu_enabled::iota(), ColumnDescriptor::isSystemCol, ColumnDescriptor::isVirtualCol, kENCODING_NONE, foreign_storage::DataPreview::num_rejected_rows, foreign_storage::PreviewContext::rejected_row_indices_per_column, foreign_storage::DataPreview::sample_rows, suggestColumnMapping(), ColumnDescriptor::tableId, and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema().

2249  {
2250  CHECK(!files.empty());
2251 
2252  auto first_file = *files.begin();
2253  auto first_file_reader = file_reader_cache_->getOrInsert(*files.begin(), file_system_);
2254 
2255  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
2256  ++current_file_it) {
2257  auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
2258  validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
2259  }
2260 
2261  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2262  auto num_columns = first_file_metadata->num_columns();
2263 
2264  DataPreview data_preview;
2265  data_preview.num_rejected_rows = 0;
2266 
2267  auto current_file_it = files.begin();
2268  while (data_preview.sample_rows.size() < max_num_rows &&
2269  current_file_it != files.end()) {
2270  size_t total_num_rows = data_preview.sample_rows.size();
2271  size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();
2272 
2273  // gather enough rows in row groups to produce required samples
2274  std::vector<RowGroupInterval> row_group_intervals;
2275  for (; current_file_it != files.end(); ++current_file_it) {
2276  const auto& file_path = *current_file_it;
2277  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
2278  auto file_metadata = file_reader->parquet_reader()->metadata();
2279  auto num_row_groups = file_metadata->num_row_groups();
2280  int end_row_group = 0;
2281  for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2282  const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2283  total_num_rows += next_num_rows;
2284  end_row_group = i;
2285  }
2286  row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
2287  }
2288 
2289  PreviewContext preview_context;
2290  for (int i = 0; i < num_columns; ++i) {
2291  auto col = first_file_metadata->schema()->Column(i);
2292  ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
2293  auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
2294  cd.columnType = sql_type;
2295  cd.columnName =
2296  sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
2297  cd.isSystemCol = false;
2298  cd.isVirtualCol = false;
2299  cd.tableId = -1;
2300  cd.columnId = i + 1;
2301  data_preview.column_names.emplace_back(cd.columnName);
2302  data_preview.column_types.emplace_back(sql_type);
2303  preview_context.detect_buffers.push_back(
2304  std::make_unique<TypedParquetDetectBuffer>());
2305  preview_context.rejected_row_indices_per_column.push_back(
2306  std::make_unique<RejectedRowIndices>());
2307  auto& detect_buffer = preview_context.detect_buffers.back();
2308  auto& chunk = preview_context.column_chunks.emplace_back(&cd);
2309  chunk.setPinnable(false);
2310  chunk.setBuffer(detect_buffer.get());
2311  }
2312 
2313  std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2314  [&](const std::vector<int>& column_indices) {
2315  for (const auto& column_index : column_indices) {
2316  auto& chunk = preview_context.column_chunks[column_index];
2317  auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2318  auto& rejected_row_indices =
2319  preview_context.rejected_row_indices_per_column[column_index];
2320  appendRowGroups(row_group_intervals,
2321  column_index,
2322  chunk.getColumnDesc(),
2323  chunk_list,
2324  nullptr,
2325  rejected_row_indices.get(),
2326  true,
2327  max_num_rows_to_append);
2328  }
2329  };
2330 
2331  auto num_threads = foreign_storage::get_num_threads(foreign_table);
2332 
2333  std::vector<int> columns(num_columns);
2334  std::iota(columns.begin(), columns.end(), 0);
2335  auto futures =
2336  create_futures_for_workers(columns, num_threads, append_row_groups_for_column);
2337  for (auto& future : futures) {
2338  future.wait();
2339  }
2340  for (auto& future : futures) {
2341  future.get();
2342  }
2343 
2344  // merge all `rejected_row_indices_per_column`
2345  auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2346  for (int i = 0; i < num_columns; ++i) {
2347  rejected_row_indices->insert(
2348  preview_context.rejected_row_indices_per_column[i]->begin(),
2349  preview_context.rejected_row_indices_per_column[i]->end());
2350  }
2351 
2352  size_t num_rows = 0;
2353  auto buffers_it = preview_context.detect_buffers.begin();
2354  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2355  CHECK(buffers_it != preview_context.detect_buffers.end());
2356  auto& strings = buffers_it->get()->getStrings();
2357  if (i == 0) {
2358  num_rows = strings.size();
2359  } else {
2360  CHECK_EQ(num_rows, strings.size());
2361  }
2362  }
2363 
2364  size_t num_rejected_rows = rejected_row_indices->size();
2365  data_preview.num_rejected_rows += num_rejected_rows;
2366  CHECK_GE(num_rows, num_rejected_rows);
2367  auto row_count = num_rows - num_rejected_rows;
2368 
2369  auto offset_row = data_preview.sample_rows.size();
2370  data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2371 
2372  for (size_t irow = 0, rows_appended = 0;
2373  irow < num_rows && offset_row + rows_appended < max_num_rows;
2374  ++irow) {
2375  if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2376  continue;
2377  }
2378  auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
2379  row_data.resize(num_columns);
2380  auto buffers_it = preview_context.detect_buffers.begin();
2381  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2382  CHECK(buffers_it != preview_context.detect_buffers.end());
2383  auto& strings = buffers_it->get()->getStrings();
2384  row_data[i] = strings[irow];
2385  }
2386  ++rows_appended;
2387  }
2388  }
2389 
2390  // attempt to detect geo columns
2391  for (int i = 0; i < num_columns; ++i) {
2392  auto type_info = data_preview.column_types[i];
2393  if (type_info.is_string()) {
2394  auto tentative_geo_type =
2395  foreign_storage::detect_geo_type(data_preview.sample_rows, i);
2396  if (tentative_geo_type.has_value()) {
2397  data_preview.column_types[i].set_type(tentative_geo_type.value());
2398  data_preview.column_types[i].set_compression(kENCODING_NONE);
2399  }
2400  }
2401  }
2402 
2403  return data_preview;
2404 }
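
A hedged preview sketch (assumptions: `foreign_table` is a valid ForeignTable for the files being previewed, and SQLTypeInfo::get_type_name() is used only for display):

  // Preview up to 100 sample rows and report the detected column types.
  auto preview = loader.previewFiles(files, /*max_num_rows=*/100, foreign_table);
  for (size_t i = 0; i < preview.column_names.size(); ++i) {
    LOG(INFO) << preview.column_names[i] << ": "
              << preview.column_types[i].get_type_name();
  }
  LOG(INFO) << preview.sample_rows.size() << " sample rows, "
            << preview.num_rejected_rows << " rejected.";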


SQLTypeInfo foreign_storage::LazyParquetChunkLoader::suggestColumnMapping ( const parquet::ColumnDescriptor *  parquet_column)
static private

Suggest a possible Parquet to OmniSci column mapping based on heuristics.

Parameters
parquet_column- the column descriptor of the Parquet column
Returns
a supported OmniSci SQLTypeInfo given the Parquet column type

NOTE: the suggested type may be entirely inappropriate for a specific use case; however, it is guaranteed to be an allowed mapping. For example, no attempt is made to detect geo-types, and strings are always suggested in their place.

Definition at line 1930 of file LazyParquetChunkLoader.cpp.

References foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::is_valid_parquet_list_column(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::suggest_column_scalar_type(), and run_benchmark_import::type.

Referenced by previewFiles().

1931  {
1932  auto type = suggest_column_scalar_type(parquet_column);
1933 
1934  // array case
1935  if (is_valid_parquet_list_column(parquet_column)) {
1936  return type.get_array_type();
1937  }
1938 
1939  return type;
1940 }


Member Data Documentation

const int foreign_storage::LazyParquetChunkLoader::batch_reader_num_elements = 4096
static

Referenced by appendRowGroups().
FileReaderMap* foreign_storage::LazyParquetChunkLoader::file_reader_cache_
private

Definition at line 172 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), metadataScan(), and previewFiles().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::LazyParquetChunkLoader::file_system_
private

Referenced by appendRowGroups(), loadRowGroups(), metadataScan(), and previewFiles().
std::string foreign_storage::LazyParquetChunkLoader::foreign_table_name_
private

Definition at line 175 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups().

const RenderGroupAnalyzerMap* foreign_storage::LazyParquetChunkLoader::render_group_analyzer_map_
private

Definition at line 174 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), loadRowGroups(), and metadataScan().


The documentation for this class was generated from the following files:
LazyParquetChunkLoader.h
LazyParquetChunkLoader.cpp