OmniSciDB  085a039ca4
foreign_storage::LazyParquetChunkLoader Class Reference

#include <LazyParquetChunkLoader.h>


Public Member Functions

 LazyParquetChunkLoader (std::shared_ptr< arrow::fs::FileSystem > file_system, FileReaderMap *file_reader_cache, const RenderGroupAnalyzerMap *render_group_analyzer_map)
 
std::list< std::unique_ptr< ChunkMetadata > > loadChunk (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary=nullptr, RejectedRowIndices *rejected_row_indices=nullptr)
 
std::list< RowGroupMetadata > metadataScan (const std::vector< std::string > &file_paths, const ForeignTableSchema &schema, const bool do_metadata_stats_validation=true)
 Perform a metadata scan for the paths specified. More...
 
std::pair< size_t, size_t > loadRowGroups (const RowGroupInterval &row_group_interval, const std::map< int, Chunk_NS::Chunk > &chunks, const ForeignTableSchema &schema, const std::map< int, StringDictionary * > &column_dictionaries, const int num_threads=1)
 Load row groups of data into given chunks. More...
 
DataPreview previewFiles (const std::vector< std::string > &files, const size_t max_num_rows)
 Preview rows of data and column types in a set of files. More...
 

Static Public Member Functions

static bool isColumnMappingSupported (const ColumnDescriptor *omnisci_column, const parquet::ColumnDescriptor *parquet_column)
 

Static Public Attributes

static const int batch_reader_num_elements = 4096
 

Private Member Functions

std::list< std::unique_ptr< ChunkMetadata > > appendRowGroups (const std::vector< RowGroupInterval > &row_group_intervals, const int parquet_column_index, const ColumnDescriptor *column_descriptor, std::list< Chunk_NS::Chunk > &chunks, StringDictionary *string_dictionary, RejectedRowIndices *rejected_row_indices, const bool is_for_detect=false, const std::optional< int64_t > max_levels_read=std::nullopt)
 

Static Private Member Functions

static SQLTypeInfo suggestColumnMapping (const parquet::ColumnDescriptor *parquet_column)
 

Private Attributes

std::shared_ptr< arrow::fs::FileSystem > file_system_
 
FileReaderMap * file_reader_cache_
 
const RenderGroupAnalyzerMap * render_group_analyzer_map_
 

Detailed Description

A lazy Parquet-to-chunk loader: loads row groups of Parquet columns into OmniSci chunks on demand.

Definition at line 37 of file LazyParquetChunkLoader.h.
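
The following is a minimal construction sketch, not taken from the source: the local file system, the empty reader cache, and the null render group analyzer map are assumptions for illustration (callers in the codebase, such as ParquetDataWrapper, supply their own instances).

// Hedged sketch: build a loader over the local file system.
#include <arrow/filesystem/localfs.h>

auto file_system = std::make_shared<arrow::fs::LocalFileSystem>();
foreign_storage::FileReaderMap file_reader_cache;
foreign_storage::LazyParquetChunkLoader loader(
    file_system, &file_reader_cache, /*render_group_analyzer_map=*/nullptr);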

Constructor & Destructor Documentation

foreign_storage::LazyParquetChunkLoader::LazyParquetChunkLoader ( std::shared_ptr< arrow::fs::FileSystem >  file_system,
FileReaderMap *  file_reader_cache,
const RenderGroupAnalyzerMap *  render_group_analyzer_map 
)

Definition at line 1963 of file LazyParquetChunkLoader.cpp.

1963 LazyParquetChunkLoader::LazyParquetChunkLoader(
1964     std::shared_ptr<arrow::fs::FileSystem> file_system,
1965     FileReaderMap* file_map,
1966     const RenderGroupAnalyzerMap* render_group_analyzer_map)
1967  : file_system_(file_system)
1968  , file_reader_cache_(file_map)
1969  , render_group_analyzer_map_{render_group_analyzer_map} {}

Member Function Documentation

std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::appendRowGroups ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
const ColumnDescriptor *  column_descriptor,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary,
RejectedRowIndices *  rejected_row_indices,
const bool  is_for_detect = false,
const std::optional< int64_t >  max_levels_read = std::nullopt 
)
private

Definition at line 1783 of file LazyParquetChunkLoader.cpp.

References batch_reader_num_elements, CHECK, ColumnDescriptor::columnType, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::create_parquet_encoder(), DEBUG_TIMER, file_reader_cache_, file_system_, foreign_storage::get_column_descriptor(), foreign_storage::get_parquet_table_size(), foreign_storage::FileReaderMap::getOrInsert(), render_group_analyzer_map_, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::resize_values_buffer(), to_string(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_definition_levels(), foreign_storage::validate_equal_column_descriptor(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level().

Referenced by loadChunk(), and previewFiles().

1791  {
1792  auto timer = DEBUG_TIMER(__func__);
1793  std::list<std::unique_ptr<ChunkMetadata>> chunk_metadata;
1794  // `def_levels` and `rep_levels` below are used to store the read definition
1795  // and repetition levels of the Dremel encoding implemented by the Parquet
1796  // format
1797  std::vector<int16_t> def_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1798  std::vector<int16_t> rep_levels(LazyParquetChunkLoader::batch_reader_num_elements);
1799  std::vector<int8_t> values;
1800 
1801  CHECK(!row_group_intervals.empty());
1802  const auto& first_file_path = row_group_intervals.front().file_path;
1803 
1804  auto first_file_reader = file_reader_cache_->getOrInsert(first_file_path, file_system_);
1805  auto first_parquet_column_descriptor =
1806  get_column_descriptor(first_file_reader, parquet_column_index);
1807  resize_values_buffer(column_descriptor, first_parquet_column_descriptor, values);
1808  auto encoder = create_parquet_encoder(column_descriptor,
1809  first_parquet_column_descriptor,
1810  chunks,
1811  string_dictionary,
1812  chunk_metadata,
1813  render_group_analyzer_map_,
1814  false,
1815  false,
1816  is_for_detect);
1817  CHECK(encoder.get());
1818 
1819  if (rejected_row_indices) { // error tracking is enabled
1820  encoder->initializeErrorTracking(column_descriptor->columnType);
1821  }
1822 
1823  bool early_exit = false;
1824  int64_t total_levels_read = 0;
1825  for (const auto& row_group_interval : row_group_intervals) {
1826  const auto& file_path = row_group_interval.file_path;
1827  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
1828 
1829  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
1830  CHECK(row_group_interval.start_index >= 0 &&
1831  row_group_interval.end_index < num_row_groups);
1832  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
1833 
1834  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
1835  auto parquet_column_descriptor =
1836  get_column_descriptor(file_reader, parquet_column_index);
1837  validate_equal_column_descriptor(first_parquet_column_descriptor,
1838  parquet_column_descriptor,
1839  first_file_path,
1840  file_path);
1841 
1842  validate_max_repetition_and_definition_level(column_descriptor,
1843  parquet_column_descriptor);
1844  int64_t values_read = 0;
1845  for (int row_group_index = row_group_interval.start_index;
1846  row_group_index <= row_group_interval.end_index;
1847  ++row_group_index) {
1848  auto group_reader = parquet_reader->RowGroup(row_group_index);
1849  std::shared_ptr<parquet::ColumnReader> col_reader =
1850  group_reader->Column(parquet_column_index);
1851 
1852  try {
1853  while (col_reader->HasNext()) {
1854  int64_t levels_read =
1855  parquet::ScanAllValues(LazyParquetChunkLoader::batch_reader_num_elements,
1856  def_levels.data(),
1857  rep_levels.data(),
1858  reinterpret_cast<uint8_t*>(values.data()),
1859  &values_read,
1860  col_reader.get());
1861 
1862  validate_definition_levels(parquet_reader,
1863  row_group_index,
1864  parquet_column_index,
1865  def_levels.data(),
1866  levels_read,
1867  parquet_column_descriptor);
1868 
1869  if (rejected_row_indices) { // error tracking is enabled
1870  encoder->appendDataTrackErrors(def_levels.data(),
1871  rep_levels.data(),
1872  values_read,
1873  levels_read,
1874  values.data());
1875  } else { // no error tracking enabled
1876  encoder->appendData(def_levels.data(),
1877  rep_levels.data(),
1878  values_read,
1879  levels_read,
1880  values.data());
1881  }
1882 
1883  if (max_levels_read.has_value()) {
1884  total_levels_read += levels_read;
1885  if (total_levels_read >= max_levels_read.value()) {
1886  early_exit = true;
1887  break;
1888  }
1889  }
1890  }
1891  if (auto array_encoder = dynamic_cast<ParquetArrayEncoder*>(encoder.get())) {
1892  array_encoder->finalizeRowGroup();
1893  }
1894  } catch (const std::exception& error) {
1895  throw ForeignStorageException(
1896  std::string(error.what()) + " Row group: " + std::to_string(row_group_index) +
1897  ", Parquet column: '" + col_reader->descr()->path()->ToDotString() +
1898  "', Parquet file: '" + file_path + "'");
1899  }
1900  if (max_levels_read.has_value() && early_exit) {
1901  break;
1902  }
1903  }
1904  if (max_levels_read.has_value() && early_exit) {
1905  break;
1906  }
1907  }
1908 
1909  if (rejected_row_indices) { // error tracking is enabled
1910  *rejected_row_indices = encoder->getRejectedRowIndices();
1911  }
1912  return chunk_metadata;
1913 }


bool foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported ( const ColumnDescriptor *  omnisci_column,
const parquet::ColumnDescriptor *  parquet_column 
)
static

Determine if a Parquet to OmniSci column mapping is supported.

Parameters
omnisci_column - the column descriptor of the OmniSci column
parquet_column - the column descriptor of the Parquet column
Returns
true if the column mapping is supported by LazyParquetChunkLoader, false otherwise
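
For illustration, a caller holding both descriptors might gate a load on this check; the error handling shown is an assumption, not code from the source.

// Sketch: refuse to proceed when the Parquet type cannot be mapped to the
// OmniSci column type. `omnisci_column` and `parquet_column` are assumed to
// be valid descriptor pointers obtained elsewhere.
if (!foreign_storage::LazyParquetChunkLoader::isColumnMappingSupported(
        omnisci_column, parquet_column)) {
  throw std::runtime_error("Unsupported Parquet to OmniSci mapping for column: " +
                           omnisci_column->columnName);
}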

Definition at line 1927 of file LazyParquetChunkLoader.cpp.

References foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_date_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_decimal_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_floating_point_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_geospatial_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_integral_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_none_type_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_string_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_time_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_timestamp_mapping().

Referenced by foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_array_mapping().

1929  {
1930  if (validate_geospatial_mapping(omnisci_column, parquet_column)) {
1931  return true;
1932  }
1933  if (validate_array_mapping(omnisci_column, parquet_column)) {
1934  return true;
1935  }
1936  if (validate_decimal_mapping(omnisci_column, parquet_column)) {
1937  return true;
1938  }
1939  if (validate_floating_point_mapping(omnisci_column, parquet_column)) {
1940  return true;
1941  }
1942  if (validate_integral_mapping(omnisci_column, parquet_column)) {
1943  return true;
1944  }
1945  if (validate_none_type_mapping(omnisci_column, parquet_column)) {
1946  return true;
1947  }
1948  if (validate_timestamp_mapping(omnisci_column, parquet_column)) {
1949  return true;
1950  }
1951  if (validate_time_mapping(omnisci_column, parquet_column)) {
1952  return true;
1953  }
1954  if (validate_date_mapping(omnisci_column, parquet_column)) {
1955  return true;
1956  }
1957  if (validate_string_mapping(omnisci_column, parquet_column)) {
1958  return true;
1959  }
1960  return false;
1961 }


std::list< std::unique_ptr< ChunkMetadata > > foreign_storage::LazyParquetChunkLoader::loadChunk ( const std::vector< RowGroupInterval > &  row_group_intervals,
const int  parquet_column_index,
std::list< Chunk_NS::Chunk > &  chunks,
StringDictionary *  string_dictionary = nullptr,
RejectedRowIndices *  rejected_row_indices = nullptr 
)

Load a number of row groups of a column in a parquet file into a chunk

Parameters
row_group_intervals - a list of inclusive intervals [start,end] that specify the row groups to load
parquet_column_index - the logical column index in the parquet file (and omnisci db) of the column to load
chunks - a list containing the chunks to load
string_dictionary - a string dictionary for the column, if applicable
rejected_row_indices - optional; if specified, errors will be tracked in this data structure while loading
Returns
An empty list when no metadata update is applicable, otherwise a list of ChunkMetadata unique pointers with which to update the corresponding column chunk metadata. NOTE: only ChunkMetadata.sqlType and the min & max values of ChunkMetadata.chunkStats are valid; other values are not set.

NOTE: if more than one chunk is supplied, the first chunk is required to be the chunk corresponding to the logical column, while the remaining chunks correspond to physical columns (in ascending order of column id). Similarly, if a metadata update is expected, the list of ChunkMetadata unique pointers returned will correspond directly to the list of chunks. A call-site sketch follows.
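
A hedged call-site sketch; `loader`, `chunks`, `dict`, and `parquet_column_index` are assumptions standing in for state the caller (e.g. the data wrapper) prepares, and the file path is a placeholder.

// Sketch: load row groups [0,2] of one file into a prepared chunk list whose
// first element is the logical column's chunk.
std::vector<foreign_storage::RowGroupInterval> intervals{
    {"/data/part-0.parquet", 0, 2}};
RejectedRowIndices rejected_rows;  // populated because a tracker is passed in
auto chunk_metadata = loader.loadChunk(
    intervals, parquet_column_index, chunks, dict, &rejected_rows);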

Definition at line 1971 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), and CHECK.

Referenced by foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader().

1976  {
1977  CHECK(!chunks.empty());
1978  auto const& chunk = *chunks.begin();
1979  auto column_descriptor = chunk.getColumnDesc();
1980  auto buffer = chunk.getBuffer();
1981  CHECK(buffer);
1982 
1983  try {
1984  auto metadata = appendRowGroups(row_group_intervals,
1985  parquet_column_index,
1986  column_descriptor,
1987  chunks,
1988  string_dictionary,
1989  rejected_row_indices);
1990  return metadata;
1991  } catch (const std::exception& error) {
1992  throw ForeignStorageException(error.what());
1993  }
1994 
1995  return {};
1996 }


std::pair< size_t, size_t > foreign_storage::LazyParquetChunkLoader::loadRowGroups ( const RowGroupInterval &  row_group_interval,
const std::map< int, Chunk_NS::Chunk > &  chunks,
const ForeignTableSchema &  schema,
const std::map< int, StringDictionary * > &  column_dictionaries,
const int  num_threads = 1 
)

Load row groups of data into given chunks.

Parameters
row_group_interval - specifies which row groups to load
chunks - map of column index to chunk into which data will be loaded
schema - schema of the foreign table
column_dictionaries - a map of string dictionaries for columns that require them
num_threads - number of threads to utilize while reading (if applicable)
Returns
[num_rows_completed, num_rows_rejected] - the number of rows loaded and the number of rows rejected while loading

Note that only logical chunks are expected because the data is read in an intermediate form into the underlying buffers. This member is intended to be used for import.

NOTE: Currently, loading one row group at a time is required.
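
A hedged call-site sketch, with `loader`, `chunks_by_id`, `schema`, and `dictionaries` assumed to be prepared by the import path and the file path a placeholder:

// Sketch: import a single row group (start_index must equal end_index)
// using four reader threads.
foreign_storage::RowGroupInterval interval{"/data/part-0.parquet", 5, 5};
auto [num_rows_completed, num_rows_rejected] = loader.loadRowGroups(
    interval, chunks_by_id, schema, dictionaries, /*num_threads=*/4);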

Definition at line 2081 of file LazyParquetChunkLoader.cpp.

References threading_serial::async(), CHECK, DEBUG_TIMER, foreign_storage::RowGroupInterval::end_index, foreign_storage::RowGroupInterval::file_path, file_system_, shared::get_from_map(), foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getColumnDescriptor(), foreign_storage::ForeignTableSchema::getLogicalColumns(), foreign_storage::ForeignTableSchema::getParquetColumnIndex(), foreign_storage::open_parquet_table(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_import(), render_group_analyzer_map_, foreign_storage::RowGroupInterval::start_index, logger::thread_id(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_allowed_mapping(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_max_repetition_and_definition_level(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_number_of_columns().

2086  {
2087  auto timer = DEBUG_TIMER(__func__);
2088 
2089  const auto& file_path = row_group_interval.file_path;
2090 
2091  // do not use caching with file-readers, open a new one for every request
2092  auto file_reader_owner = open_parquet_table(file_path, file_system_);
2093  auto file_reader = file_reader_owner.get();
2094  auto file_metadata = file_reader->parquet_reader()->metadata();
2095 
2096  validate_number_of_columns(file_metadata, file_path, schema);
2097 
2098  // check for fixed length encoded columns and indicate to the user
2099  // they should not be used
2100  for (const auto column_descriptor : schema.getLogicalColumns()) {
2101  auto parquet_column_index = schema.getParquetColumnIndex(column_descriptor->columnId);
2102  auto parquet_column = file_metadata->schema()->Column(parquet_column_index);
2103  try {
2104  validate_allowed_mapping(parquet_column, column_descriptor);
2105  } catch (std::runtime_error& e) {
2106  std::stringstream error_message;
2107  error_message << e.what() << " Parquet column: " << parquet_column->name()
2108  << ", HeavyDB column: " << column_descriptor->columnName
2109  << ", Parquet file: " << file_path << ".";
2110  throw std::runtime_error(error_message.str());
2111  }
2112  }
2113 
2114  CHECK(row_group_interval.start_index == row_group_interval.end_index);
2115  auto row_group_index = row_group_interval.start_index;
2116  std::map<int, ParquetRowGroupReader> row_group_reader_map;
2117 
2118  parquet::ParquetFileReader* parquet_reader = file_reader->parquet_reader();
2119  auto group_reader = parquet_reader->RowGroup(row_group_index);
2120 
2121  std::vector<InvalidRowGroupIndices> invalid_indices_per_thread(num_threads);
2122 
2123  auto encoder_map = populate_encoder_map_for_import(chunks,
2124  schema,
2125  file_reader,
2126  column_dictionaries,
2127  group_reader->metadata()->num_rows(),
2128  render_group_analyzer_map_
2129  );
2130  std::vector<std::set<int>> partitions(num_threads);
2131  std::map<int, int> column_id_to_thread;
2132  for (auto& [column_id, encoder] : encoder_map) {
2133  auto thread_id = column_id % num_threads;
2134  column_id_to_thread[column_id] = thread_id;
2135  partitions[thread_id].insert(column_id);
2136  }
2137 
2138  for (auto& [column_id, encoder] : encoder_map) {
2139  const auto& column_descriptor = schema.getColumnDescriptor(column_id);
2140  const auto parquet_column_index = schema.getParquetColumnIndex(column_id);
2141  auto parquet_column_descriptor =
2142  file_metadata->schema()->Column(parquet_column_index);
2143 
2144  // validate
2145  auto [num_row_groups, num_columns] = get_parquet_table_size(file_reader);
2146  CHECK(row_group_interval.start_index >= 0 &&
2147  row_group_interval.end_index < num_row_groups);
2148  CHECK(parquet_column_index >= 0 && parquet_column_index < num_columns);
2149  validate_max_repetition_and_definition_level(column_descriptor,
2150  parquet_column_descriptor);
2151 
2152  std::shared_ptr<parquet::ColumnReader> col_reader =
2153  group_reader->Column(parquet_column_index);
2154 
2155  row_group_reader_map.insert(
2156  {column_id,
2157  ParquetRowGroupReader(col_reader,
2158  column_descriptor,
2159  parquet_column_descriptor,
2160  shared::get_from_map(encoder_map, column_id).get(),
2161  invalid_indices_per_thread[shared::get_from_map(
2162  column_id_to_thread, column_id)],
2163  row_group_index,
2164  parquet_column_index,
2165  parquet_reader)});
2166  }
2167 
2168  std::vector<std::future<void>> futures;
2169  for (int ithread = 0; ithread < num_threads; ++ithread) {
2170  auto column_ids_for_thread = partitions[ithread];
2171  futures.emplace_back(
2172  std::async(std::launch::async, [&row_group_reader_map, column_ids_for_thread] {
2173  for (const auto column_id : column_ids_for_thread) {
2174  shared::get_from_map(row_group_reader_map, column_id)
2175  .readAndValidateRowGroup(); // reads and validate entire row group per
2176  // column
2177  }
2178  }));
2179  }
2180 
2181  for (auto& future : futures) {
2182  future.wait();
2183  }
2184 
2185  for (auto& future : futures) {
2186  future.get();
2187  }
2188 
2189  // merge/reduce invalid indices
2190  InvalidRowGroupIndices invalid_indices;
2191  for (auto& thread_invalid_indices : invalid_indices_per_thread) {
2192  invalid_indices.merge(thread_invalid_indices);
2193  }
2194 
2195  for (auto& [_, reader] : row_group_reader_map) {
2196  reader.eraseInvalidRowGroupData(
2197  invalid_indices); // removes invalid encoded data in buffers
2198  }
2199 
2200  // update the element count for each encoder
2201  for (const auto column_descriptor : schema.getLogicalColumns()) {
2202  auto column_id = column_descriptor->columnId;
2203  auto db_encoder = shared::get_from_map(chunks, column_id).getBuffer()->getEncoder();
2204  CHECK(static_cast<size_t>(group_reader->metadata()->num_rows()) >=
2205  invalid_indices.size());
2206  size_t updated_num_elems = db_encoder->getNumElems() +
2207  group_reader->metadata()->num_rows() -
2208  invalid_indices.size();
2209  db_encoder->setNumElems(updated_num_elems);
2210  if (column_descriptor->columnType.is_geometry()) {
2211  for (int i = 0; i < column_descriptor->columnType.get_physical_cols(); ++i) {
2212  auto db_encoder =
2213  shared::get_from_map(chunks, column_id + i + 1).getBuffer()->getEncoder();
2214  db_encoder->setNumElems(updated_num_elems);
2215  }
2216  }
2217  }
2218 
2219  return {group_reader->metadata()->num_rows() - invalid_indices.size(),
2220  invalid_indices.size()};
2221 }


std::list< RowGroupMetadata > foreign_storage::LazyParquetChunkLoader::metadataScan ( const std::vector< std::string > &  file_paths,
const ForeignTableSchema &  schema,
const bool  do_metadata_stats_validation = true 
)

Perform a metadata scan for the paths specified.

Parameters
file_paths - (ordered) files of the metadata scan
schema - schema of the foreign table to perform metadata scan for
do_metadata_stats_validation - validate stats in metadata of parquet files if true
Returns
a list of the row group metadata extracted from file_paths
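
A hedged usage sketch (`loader` and `schema` assumed, file paths placeholders); note the files must share an identical Parquet schema, which the scan validates:

// Sketch: scan an ordered set of files; each RowGroupMetadata entry carries
// per-row-group metadata the data wrapper uses to build fragments.
auto row_group_metadata = loader.metadataScan(
    {"/data/part-0.parquet", "/data/part-1.parquet"}, schema);
for (const auto& row_group : row_group_metadata) {
  // inspect or persist the scanned metadata here
}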

Definition at line 2386 of file LazyParquetChunkLoader.cpp.

References threading_serial::async(), CHECK, DEBUG_TIMER, file_reader_cache_, file_system_, g_max_import_threads, foreign_storage::get_parquet_table_size(), foreign_storage::ForeignTableSchema::getForeignTable(), foreign_storage::ForeignTableSchema::getLogicalAndPhysicalColumns(), foreign_storage::FileReaderMap::initializeIfEmpty(), foreign_storage::FileReaderMap::insert(), TableDescriptor::maxFragRows, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::metadata_scan_rowgroup_interval(), foreign_storage::partition_for_threads(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::populate_encoder_map_for_metadata_scan(), render_group_analyzer_map_, foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::throw_row_group_larger_than_fragment_size_error(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema(), and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_parquet_metadata().

Referenced by foreign_storage::ParquetDataWrapper::metadataScanFiles().

2389  {
2390  auto timer = DEBUG_TIMER(__func__);
2391  auto column_interval =
2392  Interval<ColumnType>{schema.getLogicalAndPhysicalColumns().front()->columnId,
2393  schema.getLogicalAndPhysicalColumns().back()->columnId};
2394  CHECK(!file_paths.empty());
2395 
2396  // The encoder map needs to be populated before we can start scanning rowgroups, so we
2397  // peel the first file_path out of the async loop below to perform population.
2398  const auto& first_path = *file_paths.begin();
2399  auto first_reader = file_reader_cache_->insert(first_path, file_system_);
2400  auto max_row_group_stats = validate_parquet_metadata(
2401  first_reader->parquet_reader()->metadata(), first_path, schema);
2402  auto encoder_map = populate_encoder_map_for_metadata_scan(column_interval,
2403  schema,
2404  first_reader,
2405  render_group_analyzer_map_,
2406  do_metadata_stats_validation);
2407  const auto num_row_groups = get_parquet_table_size(first_reader).first;
2408  auto row_group_metadata = metadata_scan_rowgroup_interval(
2409  encoder_map, {first_path, 0, num_row_groups - 1}, first_reader, schema);
2410 
2411  // We want each (filepath->FileReader) pair in the cache to be initialized before we
2412  // multithread so that we are not adding keys in a concurrent environment, so we add
2413  // cache entries for each path and initialize to an empty unique_ptr if the file has not
2414  // yet been opened.
2415  // Since we have already performed the first iteration, we skip it in the thread groups
2416  // so as not to process it twice.
2417  std::vector<std::string> cache_subset;
2418  for (auto path_it = ++(file_paths.begin()); path_it != file_paths.end(); ++path_it) {
2419  file_reader_cache_->initializeIfEmpty(*path_it);
2420  cache_subset.emplace_back(*path_it);
2421  }
2422 
2423  // Iterate asyncronously over any paths beyond the first.
2424  auto paths_per_thread = partition_for_threads(cache_subset, g_max_import_threads);
2425  std::vector<std::future<std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats>>>
2426  futures;
2427  for (const auto& path_group : paths_per_thread) {
2428  futures.emplace_back(std::async(
2429  std::launch::async,
2430  [&](const auto& paths, const auto& file_reader_cache)
2431  -> std::pair<std::list<RowGroupMetadata>, MaxRowGroupSizeStats> {
2432  std::list<RowGroupMetadata> reduced_metadata;
2433  MaxRowGroupSizeStats max_row_group_stats{0, 0};
2434  for (const auto& path : paths.get()) {
2435  auto reader = file_reader_cache.get().getOrInsert(path, file_system_);
2436  validate_equal_schema(first_reader, reader, first_path, path);
2437  auto local_max_row_group_stats = validate_parquet_metadata(
2438  reader->parquet_reader()->metadata(), path, schema);
2439  if (local_max_row_group_stats.max_row_group_size >
2440  max_row_group_stats.max_row_group_size) {
2441  max_row_group_stats = local_max_row_group_stats;
2442  }
2443  const auto num_row_groups = get_parquet_table_size(reader).first;
2444  const auto interval = RowGroupInterval{path, 0, num_row_groups - 1};
2445  reduced_metadata.splice(
2446  reduced_metadata.end(),
2447  metadata_scan_rowgroup_interval(encoder_map, interval, reader, schema));
2448  }
2449  return {reduced_metadata, max_row_group_stats};
2450  },
2451  std::ref(path_group),
2452  std::ref(*file_reader_cache_)));
2453  }
2454 
2455  // Reduce all the row_group results.
2456  for (auto& future : futures) {
2457  auto [metadata, local_max_row_group_stats] = future.get();
2458  row_group_metadata.splice(row_group_metadata.end(), metadata);
2459  if (local_max_row_group_stats.max_row_group_size >
2460  max_row_group_stats.max_row_group_size) {
2461  max_row_group_stats = local_max_row_group_stats;
2462  }
2463  }
2464 
2465  if (max_row_group_stats.max_row_group_size > schema.getForeignTable()->maxFragRows) {
2466  throw_row_group_larger_than_fragment_size_error(
2467  max_row_group_stats, schema.getForeignTable()->maxFragRows);
2468  }
2469 
2470  return row_group_metadata;
2471 }


DataPreview foreign_storage::LazyParquetChunkLoader::previewFiles ( const std::vector< std::string > &  files,
const size_t  max_num_rows 
)

Preview rows of data and column types in a set of files.

Parameters
files - files to preview
max_num_rows - maximum number of rows to preview
Returns
a DataPreview instance that contains relevant preview information
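
A hedged usage sketch (`loader` assumed, path a placeholder; the get_type_name() accessor on SQLTypeInfo is also an assumption here):

// Sketch: preview up to 100 rows and report each detected column type.
#include <iostream>

auto preview = loader.previewFiles({"/data/part-0.parquet"}, /*max_num_rows=*/100);
for (size_t i = 0; i < preview.column_names.size(); ++i) {
  std::cout << preview.column_names[i] << ": "
            << preview.column_types[i].get_type_name() << "\n";
}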

Definition at line 2230 of file LazyParquetChunkLoader.cpp.

References appendRowGroups(), CHECK, CHECK_EQ, CHECK_GE, foreign_storage::PreviewContext::column_chunks, foreign_storage::PreviewContext::column_descriptors, foreign_storage::DataPreview::column_names, foreign_storage::DataPreview::column_types, ColumnDescriptor::columnId, ColumnDescriptor::columnName, ColumnDescriptor::columnType, foreign_storage::create_futures_for_workers(), foreign_storage::PreviewContext::detect_buffers, foreign_storage::detect_geo_type(), file_reader_cache_, file_system_, g_max_import_threads, foreign_storage::FileReaderMap::getOrInsert(), gpu_enabled::iota(), ColumnDescriptor::isSystemCol, ColumnDescriptor::isVirtualCol, kENCODING_NONE, foreign_storage::DataPreview::num_rejected_rows, foreign_storage::PreviewContext::rejected_row_indices_per_column, foreign_storage::DataPreview::sample_rows, suggestColumnMapping(), ColumnDescriptor::tableId, and foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::validate_equal_schema().

2231  {
2232  CHECK(!files.empty());
2233 
2234  auto first_file = *files.begin();
2235  auto first_file_reader = file_reader_cache_->getOrInsert(*files.begin(), file_system_);
2236 
2237  for (auto current_file_it = ++files.begin(); current_file_it != files.end();
2238  ++current_file_it) {
2239  auto file_reader = file_reader_cache_->getOrInsert(*current_file_it, file_system_);
2240  validate_equal_schema(first_file_reader, file_reader, first_file, *current_file_it);
2241  }
2242 
2243  auto first_file_metadata = first_file_reader->parquet_reader()->metadata();
2244  auto num_columns = first_file_metadata->num_columns();
2245 
2246  DataPreview data_preview;
2247  data_preview.num_rejected_rows = 0;
2248 
2249  auto current_file_it = files.begin();
2250  while (data_preview.sample_rows.size() < max_num_rows &&
2251  current_file_it != files.end()) {
2252  size_t total_num_rows = data_preview.sample_rows.size();
2253  size_t max_num_rows_to_append = max_num_rows - data_preview.sample_rows.size();
2254 
2255  // gather enough rows in row groups to produce required samples
2256  std::vector<RowGroupInterval> row_group_intervals;
2257  for (; current_file_it != files.end(); ++current_file_it) {
2258  const auto& file_path = *current_file_it;
2259  auto file_reader = file_reader_cache_->getOrInsert(file_path, file_system_);
2260  auto file_metadata = file_reader->parquet_reader()->metadata();
2261  auto num_row_groups = file_metadata->num_row_groups();
2262  int end_row_group = 0;
2263  for (int i = 0; i < num_row_groups && total_num_rows < max_num_rows; ++i) {
2264  const size_t next_num_rows = file_metadata->RowGroup(i)->num_rows();
2265  total_num_rows += next_num_rows;
2266  end_row_group = i;
2267  }
2268  row_group_intervals.push_back(RowGroupInterval{file_path, 0, end_row_group});
2269  }
2270 
2271  PreviewContext preview_context;
2272  for (int i = 0; i < num_columns; ++i) {
2273  auto col = first_file_metadata->schema()->Column(i);
2274  ColumnDescriptor& cd = preview_context.column_descriptors.emplace_back();
2275  auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(col);
2276  cd.columnType = sql_type;
2277  cd.columnName =
2278  sql_type.is_array() ? col->path()->ToDotVector()[0] + "_array" : col->name();
2279  cd.isSystemCol = false;
2280  cd.isVirtualCol = false;
2281  cd.tableId = -1;
2282  cd.columnId = i + 1;
2283  data_preview.column_names.emplace_back(cd.columnName);
2284  data_preview.column_types.emplace_back(sql_type);
2285  preview_context.detect_buffers.push_back(
2286  std::make_unique<TypedParquetDetectBuffer>());
2287  preview_context.rejected_row_indices_per_column.push_back(
2288  std::make_unique<RejectedRowIndices>());
2289  auto& detect_buffer = preview_context.detect_buffers.back();
2290  auto& chunk = preview_context.column_chunks.emplace_back(&cd);
2291  chunk.setPinnable(false);
2292  chunk.setBuffer(detect_buffer.get());
2293  }
2294 
2295  std::function<void(const std::vector<int>&)> append_row_groups_for_column =
2296  [&](const std::vector<int>& column_indices) {
2297  for (const auto& column_index : column_indices) {
2298  auto& chunk = preview_context.column_chunks[column_index];
2299  auto chunk_list = std::list<Chunk_NS::Chunk>{chunk};
2300  auto& rejected_row_indices =
2301  preview_context.rejected_row_indices_per_column[column_index];
2302  appendRowGroups(row_group_intervals,
2303  column_index,
2304  chunk.getColumnDesc(),
2305  chunk_list,
2306  nullptr,
2307  rejected_row_indices.get(),
2308  true,
2309  max_num_rows_to_append);
2310  }
2311  };
2312 
2313  std::vector<int> columns(num_columns);
2314  std::iota(columns.begin(), columns.end(), 0);
2315  auto futures = create_futures_for_workers(
2316  columns, g_max_import_threads, append_row_groups_for_column);
2317  for (auto& future : futures) {
2318  future.wait();
2319  }
2320  for (auto& future : futures) {
2321  future.get();
2322  }
2323 
2324  // merge all `rejected_row_indices_per_column`
2325  auto rejected_row_indices = std::make_unique<RejectedRowIndices>();
2326  for (int i = 0; i < num_columns; ++i) {
2327  rejected_row_indices->insert(
2328  preview_context.rejected_row_indices_per_column[i]->begin(),
2329  preview_context.rejected_row_indices_per_column[i]->end());
2330  }
2331 
2332  size_t num_rows = 0;
2333  auto buffers_it = preview_context.detect_buffers.begin();
2334  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2335  CHECK(buffers_it != preview_context.detect_buffers.end());
2336  auto& strings = buffers_it->get()->getStrings();
2337  if (i == 0) {
2338  num_rows = strings.size();
2339  } else {
2340  CHECK_EQ(num_rows, strings.size());
2341  }
2342  }
2343 
2344  size_t num_rejected_rows = rejected_row_indices->size();
2345  data_preview.num_rejected_rows += num_rejected_rows;
2346  CHECK_GE(num_rows, num_rejected_rows);
2347  auto row_count = num_rows - num_rejected_rows;
2348 
2349  auto offset_row = data_preview.sample_rows.size();
2350  data_preview.sample_rows.resize(std::min(offset_row + row_count, max_num_rows));
2351 
2352  for (size_t irow = 0, rows_appended = 0;
2353  irow < num_rows && offset_row + rows_appended < max_num_rows;
2354  ++irow) {
2355  if (rejected_row_indices->find(irow) != rejected_row_indices->end()) {
2356  continue;
2357  }
2358  auto& row_data = data_preview.sample_rows[offset_row + rows_appended];
2359  row_data.resize(num_columns);
2360  auto buffers_it = preview_context.detect_buffers.begin();
2361  for (int i = 0; i < num_columns; ++i, ++buffers_it) {
2362  CHECK(buffers_it != preview_context.detect_buffers.end());
2363  auto& strings = buffers_it->get()->getStrings();
2364  row_data[i] = strings[irow];
2365  }
2366  ++rows_appended;
2367  }
2368  }
2369 
2370  // attempt to detect geo columns
2371  for (int i = 0; i < num_columns; ++i) {
2372  auto type_info = data_preview.column_types[i];
2373  if (type_info.is_string()) {
2374  auto tentative_geo_type =
2375  foreign_storage::detect_geo_type(data_preview.sample_rows, i);
2376  if (tentative_geo_type.has_value()) {
2377  data_preview.column_types[i].set_type(tentative_geo_type.value());
2378  data_preview.column_types[i].set_compression(kENCODING_NONE);
2379  }
2380  }
2381  }
2382 
2383  return data_preview;
2384 }


SQLTypeInfo foreign_storage::LazyParquetChunkLoader::suggestColumnMapping ( const parquet::ColumnDescriptor *  parquet_column)
staticprivate

Suggest a possible Parquet to OmniSci column mapping based on heuristics.

Parameters
parquet_column - the column descriptor of the Parquet column
Returns
a supported OmniSci SQLTypeInfo given the Parquet column type

NOTE: the suggested type may be entirely inappropriate for a specific use-case; however, it is guaranteed to be an allowed mapping. For example, no attempt is made to detect geo-types, and strings are always suggested in their place.
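
A brief sketch mirroring the previewFiles() call site (the geo example restates the NOTE above; `parquet_column` is an assumed descriptor pointer):

// Sketch: the suggestion is always a loadable mapping, but may not be the
// type a user ultimately wants; e.g. WKT geo data is suggested as a string.
auto sql_type = LazyParquetChunkLoader::suggestColumnMapping(parquet_column);
if (sql_type.is_array()) {
  // parquet_column was detected as a valid Parquet list column
}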

Definition at line 1915 of file LazyParquetChunkLoader.cpp.

References foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::is_valid_parquet_list_column(), foreign_storage::anonymous_namespace{LazyParquetChunkLoader.cpp}::suggest_column_scalar_type(), and run_benchmark_import::type.

Referenced by previewFiles().

1916  {
1917  auto type = suggest_column_scalar_type(parquet_column);
1918 
1919  // array case
1920  if (is_valid_parquet_list_column(parquet_column)) {
1921  return type.get_array_type();
1922  }
1923 
1924  return type;
1925 }


Member Data Documentation

const int foreign_storage::LazyParquetChunkLoader::batch_reader_num_elements = 4096
static
FileReaderMap* foreign_storage::LazyParquetChunkLoader::file_reader_cache_
private

Definition at line 171 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), metadataScan(), and previewFiles().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::LazyParquetChunkLoader::file_system_
private
const RenderGroupAnalyzerMap* foreign_storage::LazyParquetChunkLoader::render_group_analyzer_map_
private

Definition at line 173 of file LazyParquetChunkLoader.h.

Referenced by appendRowGroups(), loadRowGroups(), and metadataScan().


The documentation for this class was generated from the following files:
LazyParquetChunkLoader.h
LazyParquetChunkLoader.cpp