OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::ParquetDataWrapper Class Reference

#include <ParquetDataWrapper.h>

+ Inheritance diagram for foreign_storage::ParquetDataWrapper:
+ Collaboration diagram for foreign_storage::ParquetDataWrapper:

Public Member Functions

 ParquetDataWrapper ()
 
 ParquetDataWrapper (const int db_id, const ForeignTable *foreign_table, const bool do_metadata_stats_validation=true)
 
 ParquetDataWrapper (const ForeignTable *foreign_table, std::shared_ptr< arrow::fs::FileSystem > file_system)
 Constructor intended for detect use-case only. More...
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
void createRenderGroupAnalyzers () override
 Create RenderGroupAnalyzers for poly columns. More...
 
DataPreview getDataPreview (const size_t num_rows)
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
const std::set< std::string > getAlterableTableOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 
virtual bool isLazyFragmentFetchingEnabled () const
 

Private Member Functions

std::list< const
ColumnDescriptor * > 
getColumnsToInitialize (const Interval< ColumnType > &column_interval)
 
void initializeChunkBuffers (const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
 
void fetchChunkMetadata ()
 
void loadBuffersUsingLazyParquetChunkLoader (const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
 
std::vector< std::string > getOrderedProcessedFilePaths ()
 
std::vector< std::string > getAllFilePaths ()
 
bool moveToNextFragment (size_t new_rows_count) const
 
void finalizeFragmentMap ()
 
void addNewFragment (int row_group, const std::string &file_path)
 
bool isNewFile (const std::string &file_path) const
 
void addNewFile (const std::string &file_path)
 
void setLastFileRowCount (const std::string &file_path)
 
void resetParquetMetadata ()
 
void metadataScanFiles (const std::vector< std::string > &file_paths)
 
void metadataScanRowGroupMetadata (const std::list< RowGroupMetadata > &row_group_metadata)
 
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths (const std::vector< std::string > &file_paths) const
 
std::map< FilePathAndRowGroup,
RowGroupMetadata > 
getRowGroupMetadataMap (const std::vector< std::string > &file_paths) const
 
void updateChunkMetadataForFragment (const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
 
void metadataScanRowGroupIntervals (const std::vector< RowGroupInterval > &row_group_intervals)
 
void updateMetadataForRolledOffFiles (const std::set< std::string > &rolled_off_files)
 
void removeMetadataForLastFile (const std::string &last_file_path)
 

Private Attributes

const bool do_metadata_stats_validation_
 
std::map< int, std::vector
< RowGroupInterval > > 
fragment_to_row_group_interval_map_
 
std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
int last_fragment_index_
 
size_t last_fragment_row_count_
 
size_t total_row_count_
 
size_t last_file_row_count_
 
int last_row_group_
 
bool is_restored_
 
std::unique_ptr
< ForeignTableSchema > 
schema_
 
std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 
std::mutex delete_buffer_mutex_
 
RenderGroupAnalyzerMap render_group_analyzer_map_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static shared::FilePathOptions getFilePathOptions (const ForeignTable *foreign_table)
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::string ALLOW_FILE_ROLL_OFF_KEY = "ALLOW_FILE_ROLL_OFF"
 
static const std::string THREADS_KEY = "THREADS"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 
static bool allowFileRollOff (const ForeignTable *foreign_table)
 

Detailed Description

Definition at line 42 of file ParquetDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( )
foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const int  db_id,
const ForeignTable * foreign_table,
const bool  do_metadata_stats_validation = true 
)

Definition at line 93 of file ParquetDataWrapper.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

96  : do_metadata_stats_validation_(do_metadata_stats_validation)
97  , db_id_(db_id)
98  , foreign_table_(foreign_table)
101  , total_row_count_(0)
103  , last_row_group_(0)
104  , is_restored_(false)
105  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
106  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
107  auto& server_options = foreign_table->foreign_server->options;
108  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
109  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
110  } else {
111  UNREACHABLE();
112  }
113 }
std::unique_ptr< FileReaderMap > file_reader_cache_
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:337
std::shared_ptr< arrow::fs::FileSystem > file_system_
foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const ForeignTable * foreign_table,
std::shared_ptr< arrow::fs::FileSystem >  file_system 
)

Constructor intended for detect use-case only.

Definition at line 79 of file ParquetDataWrapper.cpp.

82  , db_id_(-1)
83  , foreign_table_(foreign_table)
86  , total_row_count_(0)
88  , last_row_group_(0)
89  , is_restored_(false)
90  , file_system_(file_system)
91  , file_reader_cache_(std::make_unique<FileReaderMap>()) {}
std::unique_ptr< FileReaderMap > file_reader_cache_
std::shared_ptr< arrow::fs::FileSystem > file_system_

Member Function Documentation

void foreign_storage::ParquetDataWrapper::addNewFile ( const std::string &  file_path)
private

Definition at line 233 of file ParquetDataWrapper.cpp.

References foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), CHECK, CHECK_EQ, foreign_table_, fragment_to_row_group_interval_map_, last_fragment_index_, last_row_group_, and setLastFileRowCount().

Referenced by metadataScanRowGroupMetadata().

233  {
234  const auto last_fragment_entry =
236  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
237 
238  // The entry for the first fragment starts out as an empty vector
239  if (last_fragment_entry->second.empty()) {
240  // File roll off can result in empty older fragments.
243  }
244  } else {
245  last_fragment_entry->second.back().end_index = last_row_group_;
246  }
247  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
248  setLastFileRowCount(file_path);
249 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
void setLastFileRowCount(const std::string &file_path)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
static bool allowFileRollOff(const ForeignTable *foreign_table)
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::addNewFragment ( int  row_group,
const std::string &  file_path 
)
private

Definition at line 203 of file ParquetDataWrapper.cpp.

References CHECK, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and setLastFileRowCount().

Referenced by metadataScanRowGroupMetadata().

203  {
204  const auto last_fragment_entry =
206  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
207 
208  last_fragment_entry->second.back().end_index = last_row_group_;
212  RowGroupInterval{file_path, row_group});
213  setLastFileRowCount(file_path);
214 }
void setLastFileRowCount(const std::string &file_path)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::createRenderGroupAnalyzers ( )
overridevirtual

Create RenderGroupAnalyzers for poly columns.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 742 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_GE, db_id_, foreign_table_, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), IS_GEO_POLY, render_group_analyzer_map_, and TableDescriptor::tableId.

742  {
743  // must have these
744  CHECK_GE(db_id_, 0);
746 
747  // populate map for all poly columns in this table
749  CHECK(catalog);
750  auto columns =
751  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
752  for (auto const& column : columns) {
753  if (IS_GEO_POLY(column->columnType.get_type())) {
755  .try_emplace(column->columnId,
756  std::make_unique<import_export::RenderGroupAnalyzer>())
757  .second);
758  }
759  }
760 }
#define CHECK_GE(x, y)
Definition: Logger.h:306
static SysCatalog & instance()
Definition: SysCatalog.h:343
RenderGroupAnalyzerMap render_group_analyzer_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:291
#define IS_GEO_POLY(T)
Definition: sqltypes.h:305

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::fetchChunkMetadata ( )
private

Definition at line 256 of file ParquetDataWrapper.cpp.

References foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), CHECK, CHECK_EQ, shared::check_for_rolled_off_file_paths(), chunk_metadata_map_, shared::contains(), db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), Catalog_Namespace::SysCatalog::getCatalog(), getOrderedProcessedFilePaths(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), last_file_row_count_, metadataScanFiles(), removeMetadataForLastFile(), resetParquetMetadata(), foreign_storage::throw_removed_file_error(), foreign_storage::throw_removed_row_in_file_error(), and updateMetadataForRolledOffFiles().

Referenced by populateChunkMetadata().

256  {
258  CHECK(catalog);
259  std::vector<std::string> new_file_paths;
260  auto processed_file_paths = getOrderedProcessedFilePaths();
261  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
262  auto all_file_paths = getAllFilePaths();
264  const auto rolled_off_files =
265  shared::check_for_rolled_off_file_paths(all_file_paths, processed_file_paths);
266  updateMetadataForRolledOffFiles(rolled_off_files);
267  }
268 
269  for (const auto& file_path : processed_file_paths) {
270  if (!shared::contains(all_file_paths, file_path)) {
271  throw_removed_file_error(file_path);
272  }
273  }
274 
275  // For multi-file appends, reprocess the last file in order to account for appends
276  // that may have occurred to this file. For single file appends, reprocess file if new
277  // rows have been added.
278  if (!processed_file_paths.empty()) {
279  // Single file append
280  if (all_file_paths.size() == 1) {
281  CHECK_EQ(processed_file_paths.size(), size_t(1));
282  CHECK_EQ(processed_file_paths[0], all_file_paths[0]);
283  }
284 
285  const auto& last_file_path = processed_file_paths.back();
286  // Since an existing file is being appended to we need to update the cached
287  // FileReader as the existing one will be out of date.
288  auto reader = file_reader_cache_->insert(last_file_path, file_system_);
289  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
290  if (row_count < last_file_row_count_) {
291  throw_removed_row_in_file_error(last_file_path);
292  } else if (row_count > last_file_row_count_) {
293  removeMetadataForLastFile(last_file_path);
294  new_file_paths.emplace_back(last_file_path);
295  }
296  }
297 
298  for (const auto& file_path : all_file_paths) {
299  if (!shared::contains(processed_file_paths, file_path)) {
300  new_file_paths.emplace_back(file_path);
301  }
302  }
303  } else {
304  CHECK(chunk_metadata_map_.empty());
305  new_file_paths = getAllFilePaths();
307  }
308 
309  if (!new_file_paths.empty()) {
310  metadataScanFiles(new_file_paths);
311  }
312 }
bool contains(const T &container, const U &element)
Definition: misc.h:195
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unique_ptr< FileReaderMap > file_reader_cache_
void updateMetadataForRolledOffFiles(const std::set< std::string > &rolled_off_files)
std::vector< std::string > getOrderedProcessedFilePaths()
std::vector< std::string > getAllFilePaths()
void throw_removed_row_in_file_error(const std::string &file_path)
std::set< std::string > check_for_rolled_off_file_paths(const std::vector< std::string > &all_file_paths, std::vector< std::string > &processed_file_paths)
void throw_removed_file_error(const std::string &file_path)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::shared_ptr< arrow::fs::FileSystem > file_system_
bool isAppendMode() const
Checks if the table is in append mode.
void removeMetadataForLastFile(const std::string &last_file_path)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
static bool allowFileRollOff(const ForeignTable *foreign_table)
void metadataScanFiles(const std::vector< std::string > &file_paths)
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::finalizeFragmentMap ( )
private

Definition at line 198 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanRowGroupMetadata().

198  {
201 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::vector< std::string > foreign_storage::ParquetDataWrapper::getAllFilePaths ( )
private

Definition at line 395 of file ParquetDataWrapper.cpp.

References DEBUG_TIMER, foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::AbstractFileStorageDataWrapper::getFilePathOptions(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, shared::local_glob_filter_sort_files(), foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

Referenced by fetchChunkMetadata(), and getDataPreview().

395  {
396  auto timer = DEBUG_TIMER(__func__);
397  std::vector<std::string> found_file_paths;
398  auto file_path = getFullFilePath(foreign_table_);
399  const auto file_path_options = getFilePathOptions(foreign_table_);
400  auto& server_options = foreign_table_->foreign_server->options;
401  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
402  found_file_paths = shared::local_glob_filter_sort_files(file_path, file_path_options);
403  } else {
404  UNREACHABLE();
405  }
406  return found_file_paths;
407 }
static shared::FilePathOptions getFilePathOptions(const ForeignTable *foreign_table)
#define UNREACHABLE()
Definition: Logger.h:337
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define DEBUG_TIMER(name)
Definition: Logger.h:411
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::ParquetDataWrapper::getCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 70 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTER_FRAGMENT.

std::list< const ColumnDescriptor * > foreign_storage::ParquetDataWrapper::getColumnsToInitialize ( const Interval< ColumnType > &  column_interval)
private

Definition at line 127 of file ParquetDataWrapper.cpp.

References CHECK, db_id_, foreign_storage::Interval< T >::end, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), schema_, and foreign_storage::Interval< T >::start.

Referenced by initializeChunkBuffers().

128  {
130  CHECK(catalog);
131  const auto& columns = schema_->getLogicalAndPhysicalColumns();
132  auto column_start = column_interval.start;
133  auto column_end = column_interval.end;
134  std::list<const ColumnDescriptor*> columns_to_init;
135  for (const auto column : columns) {
136  auto column_id = column->columnId;
137  if (column_id >= column_start && column_id <= column_end) {
138  columns_to_init.push_back(column);
139  }
140  }
141  return columns_to_init;
142 }
std::unique_ptr< ForeignTableSchema > schema_
static SysCatalog & instance()
Definition: SysCatalog.h:343
T const end
Definition: Intervals.h:68
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

DataPreview foreign_storage::ParquetDataWrapper::getDataPreview ( const size_t  num_rows)

Definition at line 726 of file ParquetDataWrapper.cpp.

References file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), render_group_analyzer_map_, and TableDescriptor::tableName.

726  {
727  LazyParquetChunkLoader chunk_loader(file_system_,
728  file_reader_cache_.get(),
731  auto file_paths = getAllFilePaths();
732  if (file_paths.empty()) {
733  throw ForeignStorageException{"No file found at \"" +
735  }
736  return chunk_loader.previewFiles(file_paths, num_rows, *foreign_table_);
737 }
std::unique_ptr< FileReaderMap > file_reader_cache_
std::string tableName
std::vector< std::string > getAllFilePaths()
RenderGroupAnalyzerMap render_group_analyzer_map_
std::shared_ptr< arrow::fs::FileSystem > file_system_
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::ParquetDataWrapper::getNonCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 72 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::vector< std::string > foreign_storage::ParquetDataWrapper::getOrderedProcessedFilePaths ( )
private

Definition at line 383 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_.

Referenced by fetchChunkMetadata().

383  {
384  std::vector<std::string> file_paths;
385  for (const auto& entry : fragment_to_row_group_interval_map_) {
386  for (const auto& row_group_interval : entry.second) {
387  if (file_paths.empty() || file_paths.back() != row_group_interval.file_path) {
388  file_paths.emplace_back(row_group_interval.file_path);
389  }
390  }
391  }
392  return file_paths;
393 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::list< RowGroupMetadata > foreign_storage::ParquetDataWrapper::getRowGroupMetadataForFilePaths ( const std::vector< std::string > &  file_paths) const
private

Definition at line 441 of file ParquetDataWrapper.cpp.

References do_metadata_stats_validation_, file_reader_cache_, file_system_, foreign_table_, foreign_storage::LazyParquetChunkLoader::metadataScan(), schema_, and TableDescriptor::tableName.

Referenced by getRowGroupMetadataMap(), and metadataScanFiles().

442  {
443  LazyParquetChunkLoader chunk_loader(
445  return chunk_loader.metadataScan(file_paths, *schema_, do_metadata_stats_validation_);
446 }
std::unique_ptr< FileReaderMap > file_reader_cache_
std::string tableName
std::unique_ptr< ForeignTableSchema > schema_
std::shared_ptr< arrow::fs::FileSystem > file_system_

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::map< FilePathAndRowGroup, RowGroupMetadata > foreign_storage::ParquetDataWrapper::getRowGroupMetadataMap ( const std::vector< std::string > &  file_paths) const
private

Definition at line 860 of file ParquetDataWrapper.cpp.

References getRowGroupMetadataForFilePaths().

Referenced by metadataScanRowGroupIntervals(), and updateMetadataForRolledOffFiles().

861  {
862  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
863  std::map<FilePathAndRowGroup, RowGroupMetadata> row_group_metadata_map;
864  for (const auto& row_group_metadata_item : row_group_metadata) {
865  row_group_metadata_map[{row_group_metadata_item.file_path,
866  row_group_metadata_item.row_group_index}] =
867  row_group_metadata_item;
868  }
869  return row_group_metadata_map;
870 }
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths(const std::vector< std::string > &file_paths) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::string foreign_storage::ParquetDataWrapper::getSerializedDataWrapper ( ) const
overridevirtual

Serialize internal state of wrapper into file at given path if implemented

Implements foreign_storage::ForeignDataWrapper.

Definition at line 678 of file ParquetDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), fragment_to_row_group_interval_map_, last_file_row_count_, last_fragment_index_, last_fragment_row_count_, last_row_group_, total_row_count_, and foreign_storage::json_utils::write_to_string().

678  {
679  rapidjson::Document d;
680  d.SetObject();
681 
684  "fragment_to_row_group_interval_map",
685  d.GetAllocator());
686  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
688  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
690  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
692  d, total_row_count_, "total_row_count", d.GetAllocator());
694  d, last_file_row_count_, "last_file_row_count", d.GetAllocator());
695  return json_utils::write_to_string(d);
696 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:157
std::string write_to_string(const rapidjson::Document &document)

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::initializeChunkBuffers ( const int  fragment_index,
const Interval< ColumnType > &  column_interval,
const ChunkToBufferMap required_buffers,
const bool  reserve_buffers_and_set_stats = false 
)
private

Definition at line 144 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), getColumnsToInitialize(), kENCODING_NONE, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

148  {
149  for (const auto column : getColumnsToInitialize(column_interval)) {
150  Chunk_NS::Chunk chunk{column, false};
151  ChunkKey data_chunk_key;
152  if (column->columnType.is_varlen_indeed()) {
153  data_chunk_key = {
154  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
155  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
156  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
157  chunk.setBuffer(data_buffer);
158 
159  ChunkKey index_chunk_key{
160  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
161  CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
162  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
163  chunk.setIndexBuffer(index_buffer);
164  } else {
165  data_chunk_key = {
166  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
167  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
168  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
169  chunk.setBuffer(data_buffer);
170  }
171  chunk.initEncoder();
172  if (reserve_buffers_and_set_stats) {
173  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
174  CHECK(metadata_it != chunk_metadata_map_.end());
175  auto buffer = chunk.getBuffer();
176  auto& metadata = metadata_it->second;
177  auto encoder = buffer->getEncoder();
178  encoder->resetChunkStats(metadata->chunkStats);
179  encoder->setNumElems(metadata->numElements);
180  if ((column->columnType.is_string() &&
181  column->columnType.get_compression() == kENCODING_NONE) ||
182  column->columnType.is_geometry()) {
183  // non-dictionary string or geometry WKT string
184  auto index_buffer = chunk.getIndexBuf();
185  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
186  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
187  auto index_buffer = chunk.getIndexBuf();
188  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
189  } else {
190  size_t num_bytes_to_reserve =
191  metadata->numElements * column->columnType.get_size();
192  buffer->reserve(num_bytes_to_reserve);
193  }
194  }
195  }
196 }
std::vector< int > ChunkKey
Definition: types.h:36
int32_t StringOffsetT
Definition: sqltypes.h:1258
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
int32_t ArrayOffsetT
Definition: sqltypes.h:1259
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isNewFile ( const std::string &  file_path) const
private

Definition at line 216 of file ParquetDataWrapper.cpp.

References foreign_storage::AbstractFileStorageDataWrapper::allowFileRollOff(), CHECK, CHECK_EQ, foreign_table_, fragment_to_row_group_interval_map_, and last_fragment_index_.

Referenced by metadataScanRowGroupMetadata().

216  {
217  const auto last_fragment_entry =
219  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
220 
221  // The entry for the first fragment starts out as an empty vector
222  if (last_fragment_entry->second.empty()) {
223  // File roll off can result in empty older fragments.
226  }
227  return true;
228  } else {
229  return (last_fragment_entry->second.back().file_path != file_path);
230  }
231 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
static bool allowFileRollOff(const ForeignTable *foreign_table)
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isRestored ( ) const
overridevirtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 722 of file ParquetDataWrapper.cpp.

References is_restored_.

722  {
723  return is_restored_;
724 }
void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader ( const int  logical_column_id,
const int  fragment_id,
const ChunkToBufferMap required_buffers,
AbstractBuffer delete_buffer 
)
private

Definition at line 487 of file ParquetDataWrapper.cpp.

References Data_Namespace::AbstractBuffer::append(), CHECK, CHECK_GT, chunk_metadata_map_, ColumnDescriptor::columnType, db_id_, delete_buffer_mutex_, foreign_storage::Interval< T >::end, file_reader_cache_, file_system_, foreign_table_, fragment_to_row_group_interval_map_, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_elem_type(), shared::get_from_map(), SQLTypeInfo::get_physical_cols(), Catalog_Namespace::SysCatalog::getCatalog(), Data_Namespace::AbstractBuffer::getMemoryPtr(), initializeChunkBuffers(), Catalog_Namespace::SysCatalog::instance(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_geometry(), foreign_storage::LazyParquetChunkLoader::loadChunk(), render_group_analyzer_map_, schema_, Data_Namespace::AbstractBuffer::size(), foreign_storage::Interval< T >::start, TableDescriptor::tableId, and TableDescriptor::tableName.

Referenced by populateChunkBuffers().

491  {
492  const auto& row_group_intervals =
494  // File roll off can lead to an empty row group interval vector.
495  if (row_group_intervals.empty()) {
496  return;
497  }
498 
500  CHECK(catalog);
501  const ColumnDescriptor* logical_column =
502  schema_->getColumnDescriptor(logical_column_id);
503  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
504 
505  const Interval<ColumnType> column_interval = {
506  logical_column_id,
507  logical_column_id + logical_column->columnType.get_physical_cols()};
508  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
509 
510  const bool is_dictionary_encoded_string_column =
511  logical_column->columnType.is_dict_encoded_string() ||
512  (logical_column->columnType.is_array() &&
513  logical_column->columnType.get_elem_type().is_dict_encoded_string());
514 
515  StringDictionary* string_dictionary = nullptr;
516  if (is_dictionary_encoded_string_column) {
517  auto dict_descriptor =
518  catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
519  CHECK(dict_descriptor);
520  string_dictionary = dict_descriptor->stringDict.get();
521  }
522 
523  std::list<Chunk_NS::Chunk> chunks;
524  for (int column_id = column_interval.start; column_id <= column_interval.end;
525  ++column_id) {
526  auto column_descriptor = schema_->getColumnDescriptor(column_id);
527  Chunk_NS::Chunk chunk{column_descriptor, false};
528  if (column_descriptor->columnType.is_varlen_indeed()) {
529  ChunkKey data_chunk_key = {
530  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
531  auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
532  chunk.setBuffer(buffer);
533  ChunkKey index_chunk_key = {
534  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
535  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
536  chunk.setIndexBuffer(index_buffer);
537  } else {
538  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
539  auto buffer = shared::get_from_map(required_buffers, chunk_key);
540  chunk.setBuffer(buffer);
541  }
542  chunks.emplace_back(chunk);
543  }
544 
545  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
546  if (delete_buffer) {
547  rejected_row_indices = std::make_unique<RejectedRowIndices>();
548  }
549  LazyParquetChunkLoader chunk_loader(file_system_,
550  file_reader_cache_.get(),
553  auto metadata = chunk_loader.loadChunk(row_group_intervals,
554  parquet_column_index,
555  chunks,
556  string_dictionary,
557  rejected_row_indices.get());
558 
559  if (delete_buffer) {
560  // all modifying operations on `delete_buffer` must be synchronized as it is a
561  // shared buffer
562  std::unique_lock<std::mutex> delete_buffer_lock(delete_buffer_mutex_);
563 
564  CHECK(!chunks.empty());
565  CHECK(chunks.begin()->getBuffer()->hasEncoder());
566  auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();
567 
568  // ensure delete buffer is sized appropriately
569  if (delete_buffer->size() < num_rows_in_chunk) {
570  auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
571  std::vector<int8_t> data(remaining_rows, false);
572  delete_buffer->append(data.data(), remaining_rows);
573  }
574 
 575  // compute a logical OR with current `delete_buffer` contents and this chunk's
 576  // rejected indices
577  CHECK(rejected_row_indices);
578  auto delete_buffer_data = delete_buffer->getMemoryPtr();
579  for (const auto& rejected_index : *rejected_row_indices) {
580  CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
581  delete_buffer_data[rejected_index] = true;
582  }
583  }
584 
585  auto metadata_iter = metadata.begin();
586  for (int column_id = column_interval.start; column_id <= column_interval.end;
587  ++column_id, ++metadata_iter) {
588  auto column = schema_->getColumnDescriptor(column_id);
589  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
590  if (column->columnType.is_varlen_indeed()) {
591  data_chunk_key.emplace_back(1);
592  }
593  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
594 
 595  // Allocate new shared_ptr for metadata so we don't modify the old one, which may be
 596  // used by the executor
597  auto cached_metadata_previous =
598  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
599  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
600  std::make_shared<ChunkMetadata>();
601  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
602  *cached_metadata = *cached_metadata_previous;
603 
604  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
605  cached_metadata->numBytes =
606  shared::get_from_map(required_buffers, data_chunk_key)->size();
607 
608  // for certain types, update the metadata statistics
609  // should update the cache, and the internal chunk_metadata_map_
610  if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
611  CHECK(metadata_iter != metadata.end());
612  cached_metadata->chunkStats = (*metadata_iter)->chunkStats;
613 
614  // Update stats on buffer so it is saved in cache
615  shared::get_from_map(required_buffers, data_chunk_key)
616  ->getEncoder()
617  ->resetChunkStats(cached_metadata->chunkStats);
618  }
619  }
620 }
std::vector< int > ChunkKey
Definition: types.h:36
std::unique_ptr< FileReaderMap > file_reader_cache_
std::string tableName
std::unique_ptr< ForeignTableSchema > schema_
virtual int8_t * getMemoryPtr()=0
#define CHECK_GT(x, y)
Definition: Logger.h:305
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
int get_physical_cols() const
Definition: sqltypes.h:414
static SysCatalog & instance()
Definition: SysCatalog.h:343
T const end
Definition: Intervals.h:68
RenderGroupAnalyzerMap render_group_analyzer_map_
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
specifies the content in-memory of a row in the column metadata table
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:392
#define CHECK(condition)
Definition: Logger.h:291
bool is_geometry() const
Definition: sqltypes.h:592
bool is_dict_encoded_string() const
Definition: sqltypes.h:632
SQLTypeInfo columnType
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:963
bool is_array() const
Definition: sqltypes.h:588

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::metadataScanFiles ( const std::vector< std::string > &  file_paths)
private

Definition at line 409 of file ParquetDataWrapper.cpp.

References getRowGroupMetadataForFilePaths(), and metadataScanRowGroupMetadata().

Referenced by fetchChunkMetadata().

409  {
410  auto row_group_metadata = getRowGroupMetadataForFilePaths(file_paths);
411  metadataScanRowGroupMetadata(row_group_metadata);
412 }
void metadataScanRowGroupMetadata(const std::list< RowGroupMetadata > &row_group_metadata)
std::list< RowGroupMetadata > getRowGroupMetadataForFilePaths(const std::vector< std::string > &file_paths) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::metadataScanRowGroupIntervals ( const std::vector< RowGroupInterval > &  row_group_intervals)
private

Definition at line 840 of file ParquetDataWrapper.cpp.

References shared::get_from_map(), getRowGroupMetadataMap(), and metadataScanRowGroupMetadata().

Referenced by removeMetadataForLastFile().

841  {
842  std::vector<std::string> file_paths;
843  for (const auto& row_group_interval : row_group_intervals) {
844  file_paths.emplace_back(row_group_interval.file_path);
845  }
846  auto row_group_metadata_map = getRowGroupMetadataMap(file_paths);
847  std::list<RowGroupMetadata> row_group_metadata;
848  for (const auto& row_group_interval : row_group_intervals) {
849  for (auto row_group = row_group_interval.start_index;
850  row_group <= row_group_interval.end_index;
851  row_group++) {
852  row_group_metadata.emplace_back(shared::get_from_map(
853  row_group_metadata_map, {row_group_interval.file_path, row_group}));
854  }
855  }
856  metadataScanRowGroupMetadata(row_group_metadata);
857 }
void metadataScanRowGroupMetadata(const std::list< RowGroupMetadata > &row_group_metadata)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
std::map< FilePathAndRowGroup, RowGroupMetadata > getRowGroupMetadataMap(const std::vector< std::string > &file_paths) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::metadataScanRowGroupMetadata ( const std::list< RowGroupMetadata > &  row_group_metadata)
private

Definition at line 414 of file ParquetDataWrapper.cpp.

References addNewFile(), addNewFragment(), CHECK_EQ, finalizeFragmentMap(), isNewFile(), last_fragment_index_, last_fragment_row_count_, last_row_group_, moveToNextFragment(), schema_, total_row_count_, and updateChunkMetadataForFragment().

Referenced by metadataScanFiles(), and metadataScanRowGroupIntervals().

415  {
416  auto column_interval =
417  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
418  schema_->getLogicalAndPhysicalColumns().back()->columnId};
419  for (const auto& row_group_metadata_item : row_group_metadata) {
420  const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
421  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
422  const auto import_row_count = (*column_chunk_metadata_iter)->numElements;
423  auto row_group = row_group_metadata_item.row_group_index;
424  const auto& file_path = row_group_metadata_item.file_path;
425  if (moveToNextFragment(import_row_count)) {
426  addNewFragment(row_group, file_path);
427  } else if (isNewFile(file_path)) {
428  CHECK_EQ(row_group, 0);
429  addNewFile(file_path);
430  }
431  last_row_group_ = row_group;
433  column_interval, column_chunk_metadata, last_fragment_index_);
434 
435  last_fragment_row_count_ += import_row_count;
436  total_row_count_ += import_row_count;
437  }
439 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unique_ptr< ForeignTableSchema > schema_
void addNewFile(const std::string &file_path)
void addNewFragment(int row_group, const std::string &file_path)
void updateChunkMetadataForFragment(const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::moveToNextFragment ( size_t  new_rows_count) const
private

Definition at line 474 of file ParquetDataWrapper.cpp.

References foreign_table_, last_fragment_row_count_, and TableDescriptor::maxFragRows.

Referenced by metadataScanRowGroupMetadata().

474  {
475  return (last_fragment_row_count_ + new_rows_count) >
476  static_cast<size_t>(foreign_table_->maxFragRows);
477 }

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkBuffers ( const ChunkToBufferMap required_buffers,
const ChunkToBufferMap optional_buffers,
AbstractBuffer delete_buffer 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers- chunk buffers that must always be populated
optional_buffers- chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row wise data formats)
delete_buffer- chunk buffer for fragment's delete column, if non-null data wrapper is expected to mark deleted rows in buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 622 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, foreign_storage::create_futures_for_workers(), foreign_table_, foreign_storage::get_num_threads(), loadBuffersUsingLazyParquetChunkLoader(), and schema_.

624  {
625  ChunkToBufferMap buffers_to_load;
626  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
627  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
628 
629  CHECK(!buffers_to_load.empty());
630 
631  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
632  for (const auto& [chunk_key, buffer] : buffers_to_load) {
633  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
634  col_frag_hints.emplace(
635  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
636  chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
637  }
638 
639  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
640  [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
641  for (const auto& [col_id, frag_id] : hint_set) {
643  col_id, frag_id, buffers_to_load, delete_buffer);
644  }
645  };
646 
649  auto futures = create_futures_for_workers(col_frag_hints, num_threads, lambda);
650 
651  // We wait on all futures, then call get because we want all threads to have finished
652  // before we propagate a potential exception.
653  for (auto& future : futures) {
654  future.wait();
655  }
656 
657  for (auto& future : futures) {
658  future.get();
659  }
660 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
size_t get_num_threads(const ForeignTable &table)
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:74
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
#define CHECK(condition)
Definition: Logger.h:291
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkMetadata ( ChunkMetadataVector chunk_metadata_vector)
overridevirtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector- vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 479 of file ParquetDataWrapper.cpp.

References chunk_metadata_map_, and fetchChunkMetadata().

480  {
482  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
483  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
484  }
485 }
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::removeMetadataForLastFile ( const std::string &  last_file_path)
private

Definition at line 762 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_FRAGMENT_IDX, chunk_metadata_map_, fragment_to_row_group_interval_map_, shared::get_from_map(), last_fragment_index_, last_fragment_row_count_, last_row_group_, metadataScanRowGroupIntervals(), resetParquetMetadata(), and total_row_count_.

Referenced by fetchChunkMetadata().

762  {
763  std::optional<int32_t> first_deleted_fragment_id;
764  for (auto it = fragment_to_row_group_interval_map_.begin();
766  const auto fragment_id = it->first;
767  const auto& row_group_intervals = it->second;
768  for (const auto& row_group_interval : row_group_intervals) {
769  if (first_deleted_fragment_id.has_value()) {
770  // All subsequent fragments should map to the last file.
771  CHECK_EQ(last_file_path, row_group_interval.file_path);
772  } else if (last_file_path == row_group_interval.file_path) {
773  first_deleted_fragment_id = fragment_id;
774  }
775  }
776  if (first_deleted_fragment_id.has_value() &&
777  first_deleted_fragment_id.value() < fragment_id) {
779  } else {
780  it++;
781  }
782  }
783  CHECK(first_deleted_fragment_id.has_value());
784 
785  std::map<int32_t, size_t> remaining_fragments_row_counts;
786  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
787  auto fragment_id = it->first[CHUNK_KEY_FRAGMENT_IDX];
788  if (fragment_id >= first_deleted_fragment_id.value()) {
789  it = chunk_metadata_map_.erase(it);
790  } else {
791  auto fragment_count_it = remaining_fragments_row_counts.find(fragment_id);
792  if (fragment_count_it == remaining_fragments_row_counts.end()) {
793  remaining_fragments_row_counts[fragment_id] = it->second->numElements;
794  } else {
795  CHECK_EQ(remaining_fragments_row_counts[fragment_id], it->second->numElements);
796  }
797  it++;
798  }
799  }
800 
801  total_row_count_ = 0;
802  for (const auto [fragment_id, row_count] : remaining_fragments_row_counts) {
803  total_row_count_ += row_count;
804  }
805 
806  // Re-populate metadata for last fragment with deleted rows, excluding metadata for the
807  // last file.
808  auto row_group_intervals_to_scan = shared::get_from_map(
809  fragment_to_row_group_interval_map_, first_deleted_fragment_id.value());
810  auto it = std::find_if(row_group_intervals_to_scan.begin(),
811  row_group_intervals_to_scan.end(),
812  [&last_file_path](const auto& row_group_interval) {
813  return row_group_interval.file_path == last_file_path;
814  });
815  CHECK(it != row_group_intervals_to_scan.end());
816  row_group_intervals_to_scan.erase(it, row_group_intervals_to_scan.end());
817 
818  if (first_deleted_fragment_id.value() > 0) {
819  last_fragment_index_ = first_deleted_fragment_id.value() - 1;
821  shared::get_from_map(remaining_fragments_row_counts, last_fragment_index_);
822  const auto& last_row_group_intervals =
824  if (last_row_group_intervals.empty()) {
825  last_row_group_ = 0;
826  } else {
827  last_row_group_ = last_row_group_intervals.back().end_index;
828  }
829  fragment_to_row_group_interval_map_.erase(first_deleted_fragment_id.value());
830  } else {
831  CHECK_EQ(total_row_count_, size_t(0));
833  }
834 
835  if (!row_group_intervals_to_scan.empty()) {
836  metadataScanRowGroupIntervals(row_group_intervals_to_scan);
837  }
838 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
void metadataScanRowGroupIntervals(const std::vector< RowGroupInterval > &row_group_intervals)
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::resetParquetMetadata ( )
private

Definition at line 115 of file ParquetDataWrapper.cpp.

References file_reader_cache_, fragment_to_row_group_interval_map_, last_file_row_count_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and total_row_count_.

Referenced by fetchChunkMetadata(), and removeMetadataForLastFile().

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector chunk_metadata 
)
overridevirtual

Restores the internal state of the data wrapper

Parameters
file_path- location of file created by serializeMetadata
chunk_metadata_vector- vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 698 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, fragment_to_row_group_interval_map_, foreign_storage::json_utils::get_value_from_object(), is_restored_, last_file_row_count_, last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::json_utils::read_from_file(), and total_row_count_.

700  {
701  auto d = json_utils::read_from_file(file_path);
702  CHECK(d.IsObject());
703 
705  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
707  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
709  d, last_fragment_row_count_, "last_fragment_row_count");
711  if (d.HasMember("last_file_row_count")) {
712  json_utils::get_value_from_object(d, last_file_row_count_, "last_file_row_count");
713  }
714 
715  CHECK(chunk_metadata_map_.empty());
716  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
717  chunk_metadata_map_[chunk_key] = chunk_metadata;
718  }
719  is_restored_ = true;
720 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:172
rapidjson::Document read_from_file(const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::setLastFileRowCount ( const std::string &  file_path)
private

Definition at line 251 of file ParquetDataWrapper.cpp.

References file_reader_cache_, file_system_, and last_file_row_count_.

Referenced by addNewFile(), and addNewFragment().

251  {
252  auto reader = file_reader_cache_->getOrInsert(file_path, file_system_);
253  last_file_row_count_ = reader->parquet_reader()->metadata()->num_rows();
254 }
std::unique_ptr< FileReaderMap > file_reader_cache_
std::shared_ptr< arrow::fs::FileSystem > file_system_

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::updateChunkMetadataForFragment ( const Interval< ColumnType > &  column_interval,
const std::list< std::shared_ptr< ChunkMetadata >> &  column_chunk_metadata,
int32_t  fragment_id 
)
private

Definition at line 448 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, foreign_storage::Interval< T >::end, foreign_table_, foreign_storage::anonymous_namespace{ParquetDataWrapper.cpp}::reduce_metadata(), schema_, foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by metadataScanRowGroupMetadata(), and updateMetadataForRolledOffFiles().

451  {
452  CHECK_EQ(static_cast<int>(column_chunk_metadata.size()),
453  schema_->numLogicalAndPhysicalColumns());
454  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
455  for (auto column_id = column_interval.start; column_id <= column_interval.end;
456  column_id++, column_chunk_metadata_iter++) {
457  CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
458  const auto column_descriptor = schema_->getColumnDescriptor(column_id);
459  const auto& type_info = column_descriptor->columnType;
460  ChunkKey const data_chunk_key =
461  type_info.is_varlen_indeed()
462  ? ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id, 1}
463  : ChunkKey{db_id_, foreign_table_->tableId, column_id, fragment_id};
464  std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
465 
466  if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
467  chunk_metadata_map_[data_chunk_key] = chunk_metadata;
468  } else {
469  reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
470  }
471  }
472 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
std::unique_ptr< ForeignTableSchema > schema_
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
T const end
Definition: Intervals.h:68
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::updateMetadataForRolledOffFiles ( const std::set< std::string > &  rolled_off_files)
private

Definition at line 314 of file ParquetDataWrapper.cpp.

References CHECK, CHUNK_KEY_FRAGMENT_IDX, chunk_metadata_map_, shared::contains(), fragment_to_row_group_interval_map_, shared::get_from_map(), getRowGroupMetadataMap(), schema_, and updateChunkMetadataForFragment().

Referenced by fetchChunkMetadata().

315  {
316  if (!rolled_off_files.empty()) {
317  std::set<int32_t> deleted_fragment_ids;
318  std::optional<int32_t> partially_deleted_fragment_id;
319  std::vector<std::string> remaining_files_in_partially_deleted_fragment;
320  for (auto& [fragment_id, row_group_interval_vec] :
322  for (auto it = row_group_interval_vec.begin();
323  it != row_group_interval_vec.end();) {
324  if (shared::contains(rolled_off_files, it->file_path)) {
325  it = row_group_interval_vec.erase(it);
326  } else {
327  remaining_files_in_partially_deleted_fragment.emplace_back(it->file_path);
328  it++;
329  }
330  }
331  if (row_group_interval_vec.empty()) {
332  deleted_fragment_ids.emplace(fragment_id);
333  } else {
334  CHECK(!remaining_files_in_partially_deleted_fragment.empty());
335  partially_deleted_fragment_id = fragment_id;
336  break;
337  }
338  }
339 
340  for (auto it = chunk_metadata_map_.begin(); it != chunk_metadata_map_.end();) {
341  const auto& chunk_key = it->first;
342  if (shared::contains(deleted_fragment_ids, chunk_key[CHUNK_KEY_FRAGMENT_IDX])) {
343  auto& chunk_metadata = it->second;
344  chunk_metadata->numElements = 0;
345  chunk_metadata->numBytes = 0;
346  it++;
347  } else if (partially_deleted_fragment_id.has_value() &&
348  chunk_key[CHUNK_KEY_FRAGMENT_IDX] == partially_deleted_fragment_id) {
349  // Metadata for the partially deleted fragment will be re-populated.
350  it = chunk_metadata_map_.erase(it);
351  } else {
352  it++;
353  }
354  }
355 
356  if (partially_deleted_fragment_id.has_value()) {
357  // Create map of row group to row group metadata for remaining files in the
358  // fragment.
359  auto row_group_metadata_map =
360  getRowGroupMetadataMap(remaining_files_in_partially_deleted_fragment);
361 
362  // Re-populate metadata for remaining row groups in partially deleted fragment.
363  auto column_interval =
364  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
365  schema_->getLogicalAndPhysicalColumns().back()->columnId};
366  auto row_group_intervals = shared::get_from_map(
367  fragment_to_row_group_interval_map_, partially_deleted_fragment_id.value());
368  for (const auto& row_group_interval : row_group_intervals) {
369  for (auto row_group = row_group_interval.start_index;
370  row_group <= row_group_interval.end_index;
371  row_group++) {
372  const auto& row_group_metadata_item = shared::get_from_map(
373  row_group_metadata_map, {row_group_interval.file_path, row_group});
374  updateChunkMetadataForFragment(column_interval,
375  row_group_metadata_item.column_chunk_metadata,
376  partially_deleted_fragment_id.value());
377  }
378  }
379  }
380  }
381 }
bool contains(const T &container, const U &element)
Definition: misc.h:195
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:41
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void updateChunkMetadataForFragment(const Interval< ColumnType > &column_interval, const std::list< std::shared_ptr< ChunkMetadata >> &column_chunk_metadata, int32_t fragment_id)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:61
#define CHECK(condition)
Definition: Logger.h:291
std::map< FilePathAndRowGroup, RowGroupMetadata > getRowGroupMetadataMap(const std::vector< std::string > &file_paths) const

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::mutex foreign_storage::ParquetDataWrapper::delete_buffer_mutex_
private

Definition at line 147 of file ParquetDataWrapper.h.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

const bool foreign_storage::ParquetDataWrapper::do_metadata_stats_validation_
private

Definition at line 132 of file ParquetDataWrapper.h.

Referenced by getRowGroupMetadataForFilePaths().

std::unique_ptr<FileReaderMap> foreign_storage::ParquetDataWrapper::file_reader_cache_
private
std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetDataWrapper::file_system_
private
bool foreign_storage::ParquetDataWrapper::is_restored_
private

Definition at line 142 of file ParquetDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

size_t foreign_storage::ParquetDataWrapper::last_file_row_count_
private
size_t foreign_storage::ParquetDataWrapper::last_fragment_row_count_
private
RenderGroupAnalyzerMap foreign_storage::ParquetDataWrapper::render_group_analyzer_map_
private
size_t foreign_storage::ParquetDataWrapper::total_row_count_
private

The documentation for this class was generated from the following files: