OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::ParquetDataWrapper Class Reference

#include <ParquetDataWrapper.h>

+ Inheritance diagram for foreign_storage::ParquetDataWrapper:
+ Collaboration diagram for foreign_storage::ParquetDataWrapper:

Public Member Functions

 ParquetDataWrapper ()
 
 ParquetDataWrapper (const int db_id, const ForeignTable *foreign_table, const bool do_metadata_stats_validation=true)
 
 ParquetDataWrapper (const ForeignTable *foreign_table, std::shared_ptr< arrow::fs::FileSystem > file_system)
 Constructor intended for detect use-case only. More...
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
void createRenderGroupAnalyzers () override
 Create RenderGroupAnalyzers for poly columns. More...
 
DataPreview getDataPreview (const size_t num_rows)
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual const std::set
< std::string > 
getAlterableTableOptions () const
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Private Member Functions

std::list< const
ColumnDescriptor * > 
getColumnsToInitialize (const Interval< ColumnType > &column_interval)
 
void initializeChunkBuffers (const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
 
void fetchChunkMetadata ()
 
void loadBuffersUsingLazyParquetChunkLoader (const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
 
std::set< std::string > getProcessedFilePaths ()
 
std::vector< std::string > getAllFilePaths ()
 
bool moveToNextFragment (size_t new_rows_count) const
 
void finalizeFragmentMap ()
 
void addNewFragment (int row_group, const std::string &file_path)
 
bool isNewFile (const std::string &file_path) const
 
void addNewFile (const std::string &file_path)
 
void resetParquetMetadata ()
 
void metadataScanFiles (const std::vector< std::string > &file_paths)
 

Private Attributes

const bool do_metadata_stats_validation_
 
std::map< int, std::vector
< RowGroupInterval > > 
fragment_to_row_group_interval_map_
 
std::map< ChunkKey,
std::shared_ptr< ChunkMetadata > > 
chunk_metadata_map_
 
const int db_id_
 
const ForeignTable * foreign_table_
 
int last_fragment_index_
 
size_t last_fragment_row_count_
 
size_t total_row_count_
 
int last_row_group_
 
bool is_restored_
 
std::unique_ptr
< ForeignTableSchema > 
schema_
 
std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 
std::mutex delete_buffer_mutex_
 
RenderGroupAnalyzerMap render_group_analyzer_map_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 40 of file ParquetDataWrapper.h.

Constructor & Destructor Documentation

foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( )
foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const int  db_id,
const ForeignTable *  foreign_table,
const bool  do_metadata_stats_validation = true 
)

Definition at line 92 of file ParquetDataWrapper.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

95  : do_metadata_stats_validation_(do_metadata_stats_validation)
96  , db_id_(db_id)
97  , foreign_table_(foreign_table)
100  , total_row_count_(0)
101  , last_row_group_(0)
102  , is_restored_(false)
103  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
104  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
105  auto& server_options = foreign_table->foreign_server->options;
106  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
107  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
108  } else {
109  UNREACHABLE();
110  }
111 }
std::unique_ptr< FileReaderMap > file_reader_cache_
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:267
std::shared_ptr< arrow::fs::FileSystem > file_system_
foreign_storage::ParquetDataWrapper::ParquetDataWrapper ( const ForeignTable *  foreign_table,
std::shared_ptr< arrow::fs::FileSystem >  file_system 
)

Constructor intended for detect use-case only.

Definition at line 79 of file ParquetDataWrapper.cpp.

82  , db_id_(-1)
83  , foreign_table_(foreign_table)
86  , total_row_count_(0)
87  , last_row_group_(0)
88  , is_restored_(false)
89  , file_system_(file_system)
90  , file_reader_cache_(std::make_unique<FileReaderMap>()) {}
std::unique_ptr< FileReaderMap > file_reader_cache_
std::shared_ptr< arrow::fs::FileSystem > file_system_

Member Function Documentation

void foreign_storage::ParquetDataWrapper::addNewFile ( const std::string &  file_path)
private

Definition at line 226 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

226  {
227  const auto last_fragment_entry =
229  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
230 
231  // The entry for the first fragment starts out as an empty vector
232  if (last_fragment_entry->second.empty()) {
234  } else {
235  last_fragment_entry->second.back().end_index = last_row_group_;
236  }
237  last_fragment_entry->second.emplace_back(RowGroupInterval{file_path, 0});
238 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::addNewFragment ( int  row_group,
const std::string &  file_path 
)
private

Definition at line 200 of file ParquetDataWrapper.cpp.

References CHECK, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, and last_row_group_.

Referenced by metadataScanFiles().

200  {
201  const auto last_fragment_entry =
203  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
204 
205  last_fragment_entry->second.back().end_index = last_row_group_;
209  RowGroupInterval{file_path, row_group});
210 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::createRenderGroupAnalyzers ( )
override virtual

Create RenderGroupAnalyzers for poly columns.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 623 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_GE, db_id_, foreign_table_, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), IS_GEO_POLY, render_group_analyzer_map_, and TableDescriptor::tableId.

623  {
624  // must have these
625  CHECK_GE(db_id_, 0);
627 
628  // populate map for all poly columns in this table
630  CHECK(catalog);
631  auto columns =
632  catalog->getAllColumnMetadataForTable(foreign_table_->tableId, false, false, true);
633  for (auto const& column : columns) {
634  if (IS_GEO_POLY(column->columnType.get_type())) {
636  .try_emplace(column->columnId,
637  std::make_unique<import_export::RenderGroupAnalyzer>())
638  .second);
639  }
640  }
641 }
#define CHECK_GE(x, y)
Definition: Logger.h:236
static SysCatalog & instance()
Definition: SysCatalog.h:337
RenderGroupAnalyzerMap render_group_analyzer_map_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:223
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::fetchChunkMetadata ( )
private

Definition at line 240 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, chunk_metadata_map_, shared::contains(), db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), Catalog_Namespace::SysCatalog::getCatalog(), getProcessedFilePaths(), Catalog_Namespace::SysCatalog::instance(), foreign_storage::ForeignTable::isAppendMode(), metadataScanFiles(), resetParquetMetadata(), foreign_storage::throw_removed_file_error(), foreign_storage::throw_removed_row_in_file_error(), and total_row_count_.

Referenced by populateChunkMetadata().

240  {
242  CHECK(catalog);
243  std::vector<std::string> new_file_paths;
244  auto processed_file_paths = getProcessedFilePaths();
245  if (foreign_table_->isAppendMode() && !processed_file_paths.empty()) {
246  auto all_file_paths = getAllFilePaths();
247  for (const auto& file_path : processed_file_paths) {
248  if (!shared::contains(all_file_paths, file_path)) {
249  throw_removed_file_error(file_path);
250  }
251  }
252 
253  for (const auto& file_path : all_file_paths) {
254  if (!shared::contains(processed_file_paths, file_path)) {
255  new_file_paths.emplace_back(file_path);
256  }
257  }
258 
259  // Single file append
260  // If an append occurs with multiple files, then we assume any existing files have
261  // not been altered. If an append occurs on a single file, then we check to see if
262  // it has changed.
263  if (new_file_paths.empty() && all_file_paths.size() == 1) {
264  CHECK_EQ(processed_file_paths.size(), static_cast<size_t>(1));
265  const auto& file_path = *all_file_paths.begin();
266  CHECK_EQ(*processed_file_paths.begin(), file_path);
267 
268  // Since an existing file is being appended to we need to update the cached
269  // FileReader as the existing one will be out of date.
270  auto reader = file_reader_cache_->insert(file_path, file_system_);
271  size_t row_count = reader->parquet_reader()->metadata()->num_rows();
272 
273  if (row_count < total_row_count_) {
275  } else if (row_count > total_row_count_) {
276  new_file_paths = all_file_paths;
277  chunk_metadata_map_.clear();
279  }
280  }
281  } else {
282  CHECK(chunk_metadata_map_.empty());
283  new_file_paths = getAllFilePaths();
285  }
286 
287  if (!new_file_paths.empty()) {
288  metadataScanFiles(new_file_paths);
289  }
290 }
bool contains(const T &container, const U &element)
Definition: misc.h:196
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::set< std::string > getProcessedFilePaths()
std::unique_ptr< FileReaderMap > file_reader_cache_
std::vector< std::string > getAllFilePaths()
void throw_removed_row_in_file_error(const std::string &file_path)
void throw_removed_file_error(const std::string &file_path)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
static SysCatalog & instance()
Definition: SysCatalog.h:337
std::shared_ptr< arrow::fs::FileSystem > file_system_
bool isAppendMode() const
Checks if the table is in append mode.
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void metadataScanFiles(const std::vector< std::string > &file_paths)
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::finalizeFragmentMap ( )
private

Definition at line 195 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_, last_fragment_index_, and last_row_group_.

Referenced by metadataScanFiles().

195  {
198 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::vector< std::string > foreign_storage::ParquetDataWrapper::getAllFilePaths ( )
private

Definition at line 302 of file ParquetDataWrapper.cpp.

References DEBUG_TIMER, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_ORDER_BY_KEY, foreign_storage::AbstractFileStorageDataWrapper::FILE_SORT_REGEX_KEY, foreign_storage::ForeignTable::foreign_server, foreign_table_, foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::OptionsContainer::getOption(), foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, shared::local_glob_filter_sort_files(), foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

Referenced by fetchChunkMetadata(), and getDataPreview().

302  {
303  auto timer = DEBUG_TIMER(__func__);
304  std::vector<std::string> found_file_paths;
305  auto file_path = getFullFilePath(foreign_table_);
306  const auto& regex_pattern = foreign_table_->getOption(REGEX_PATH_FILTER_KEY);
307  const auto& sort_by = foreign_table_->getOption(FILE_SORT_ORDER_BY_KEY);
308  const auto& sort_regex = foreign_table_->getOption(FILE_SORT_REGEX_KEY);
309 
310  auto& server_options = foreign_table_->foreign_server->options;
311  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
312  found_file_paths = shared::local_glob_filter_sort_files(
313  file_path, regex_pattern, sort_by, sort_regex);
314  } else {
315  UNREACHABLE();
316  }
317  return found_file_paths;
318 }
#define UNREACHABLE()
Definition: Logger.h:267
std::optional< std::string > getOption(const std::string_view &key) const
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const std::optional< std::string > &filter_regex, const std::optional< std::string > &sort_by, const std::optional< std::string > &sort_regex, const bool recurse)
const ForeignServer * foreign_server
Definition: ForeignTable.h:56
#define DEBUG_TIMER(name)
Definition: Logger.h:370
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::ParquetDataWrapper::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 68 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTER_FRAGMENT.

std::list< const ColumnDescriptor * > foreign_storage::ParquetDataWrapper::getColumnsToInitialize ( const Interval< ColumnType > &  column_interval)
private

Definition at line 124 of file ParquetDataWrapper.cpp.

References CHECK, db_id_, foreign_storage::Interval< T >::end, Catalog_Namespace::SysCatalog::getCatalog(), Catalog_Namespace::SysCatalog::instance(), schema_, and foreign_storage::Interval< T >::start.

Referenced by initializeChunkBuffers().

125  {
127  CHECK(catalog);
128  const auto& columns = schema_->getLogicalAndPhysicalColumns();
129  auto column_start = column_interval.start;
130  auto column_end = column_interval.end;
131  std::list<const ColumnDescriptor*> columns_to_init;
132  for (const auto column : columns) {
133  auto column_id = column->columnId;
134  if (column_id >= column_start && column_id <= column_end) {
135  columns_to_init.push_back(column);
136  }
137  }
138  return columns_to_init;
139 }
std::unique_ptr< ForeignTableSchema > schema_
static SysCatalog & instance()
Definition: SysCatalog.h:337
T const end
Definition: Intervals.h:68
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

DataPreview foreign_storage::ParquetDataWrapper::getDataPreview ( const size_t  num_rows)

Definition at line 609 of file ParquetDataWrapper.cpp.

References file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), and render_group_analyzer_map_.

609  {
610  LazyParquetChunkLoader chunk_loader(
612  auto file_paths = getAllFilePaths();
613  if (file_paths.empty()) {
614  throw ForeignStorageException{"No file found at \"" +
616  }
617  return chunk_loader.previewFiles(file_paths, num_rows);
618 }
std::unique_ptr< FileReaderMap > file_reader_cache_
std::vector< std::string > getAllFilePaths()
RenderGroupAnalyzerMap render_group_analyzer_map_
std::shared_ptr< arrow::fs::FileSystem > file_system_
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::ParquetDataWrapper::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 70 of file ParquetDataWrapper.h.

References foreign_storage::ForeignDataWrapper::INTRA_FRAGMENT.

std::set< std::string > foreign_storage::ParquetDataWrapper::getProcessedFilePaths ( )
private

Definition at line 292 of file ParquetDataWrapper.cpp.

References fragment_to_row_group_interval_map_.

Referenced by fetchChunkMetadata().

292  {
293  std::set<std::string> file_paths;
294  for (const auto& entry : fragment_to_row_group_interval_map_) {
295  for (const auto& row_group_interval : entry.second) {
296  file_paths.emplace(row_group_interval.file_path);
297  }
298  }
299  return file_paths;
300 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_

+ Here is the caller graph for this function:

std::string foreign_storage::ParquetDataWrapper::getSerializedDataWrapper ( ) const
override virtual

Serialize the internal state of the wrapper, if implemented. This implementation returns the state as a JSON string rather than writing it to a file.

Implements foreign_storage::ForeignDataWrapper.

Definition at line 566 of file ParquetDataWrapper.cpp.

References foreign_storage::json_utils::add_value_to_object(), fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, total_row_count_, and foreign_storage::json_utils::write_to_string().

566  {
567  rapidjson::Document d;
568  d.SetObject();
569 
572  "fragment_to_row_group_interval_map",
573  d.GetAllocator());
574  json_utils::add_value_to_object(d, last_row_group_, "last_row_group", d.GetAllocator());
576  d, last_fragment_index_, "last_fragment_index", d.GetAllocator());
578  d, last_fragment_row_count_, "last_fragment_row_count", d.GetAllocator());
580  d, total_row_count_, "total_row_count", d.GetAllocator());
581  return json_utils::write_to_string(d);
582 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:149
std::string write_to_string(const rapidjson::Document &document)

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::initializeChunkBuffers ( const int  fragment_index,
const Interval< ColumnType > &  column_interval,
const ChunkToBufferMap &  required_buffers,
const bool  reserve_buffers_and_set_stats = false 
)
private

Definition at line 141 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, db_id_, foreign_table_, shared::get_from_map(), getColumnsToInitialize(), kENCODING_NONE, and TableDescriptor::tableId.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

145  {
146  for (const auto column : getColumnsToInitialize(column_interval)) {
147  Chunk_NS::Chunk chunk{column, false};
148  ChunkKey data_chunk_key;
149  if (column->columnType.is_varlen_indeed()) {
150  data_chunk_key = {
151  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 1};
152  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
153  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
154  chunk.setBuffer(data_buffer);
155 
156  ChunkKey index_chunk_key{
157  db_id_, foreign_table_->tableId, column->columnId, fragment_index, 2};
158  CHECK(required_buffers.find(index_chunk_key) != required_buffers.end());
159  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
160  chunk.setIndexBuffer(index_buffer);
161  } else {
162  data_chunk_key = {
163  db_id_, foreign_table_->tableId, column->columnId, fragment_index};
164  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
165  auto data_buffer = shared::get_from_map(required_buffers, data_chunk_key);
166  chunk.setBuffer(data_buffer);
167  }
168  chunk.initEncoder();
169  if (reserve_buffers_and_set_stats) {
170  const auto metadata_it = chunk_metadata_map_.find(data_chunk_key);
171  CHECK(metadata_it != chunk_metadata_map_.end());
172  auto buffer = chunk.getBuffer();
173  auto& metadata = metadata_it->second;
174  auto encoder = buffer->getEncoder();
175  encoder->resetChunkStats(metadata->chunkStats);
176  encoder->setNumElems(metadata->numElements);
177  if ((column->columnType.is_string() &&
178  column->columnType.get_compression() == kENCODING_NONE) ||
179  column->columnType.is_geometry()) {
180  // non-dictionary string or geometry WKT string
181  auto index_buffer = chunk.getIndexBuf();
182  index_buffer->reserve(sizeof(StringOffsetT) * (metadata->numElements + 1));
183  } else if (!column->columnType.is_fixlen_array() && column->columnType.is_array()) {
184  auto index_buffer = chunk.getIndexBuf();
185  index_buffer->reserve(sizeof(ArrayOffsetT) * (metadata->numElements + 1));
186  } else {
187  size_t num_bytes_to_reserve =
188  metadata->numElements * column->columnType.get_size();
189  buffer->reserve(num_bytes_to_reserve);
190  }
191  }
192  }
193 }
std::vector< int > ChunkKey
Definition: types.h:37
int32_t StringOffsetT
Definition: sqltypes.h:1113
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
std::list< const ColumnDescriptor * > getColumnsToInitialize(const Interval< ColumnType > &column_interval)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:62
int32_t ArrayOffsetT
Definition: sqltypes.h:1114
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isNewFile ( const std::string &  file_path) const
private

Definition at line 212 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, fragment_to_row_group_interval_map_, and last_fragment_index_.

Referenced by metadataScanFiles().

212  {
213  const auto last_fragment_entry =
215  CHECK(last_fragment_entry != fragment_to_row_group_interval_map_.end());
216 
217  // The entry for the first fragment starts out as an empty vector
218  if (last_fragment_entry->second.empty()) {
220  return true;
221  } else {
222  return (last_fragment_entry->second.back().file_path != file_path);
223  }
224 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::isRestored ( ) const
override virtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 605 of file ParquetDataWrapper.cpp.

References is_restored_.

605  {
606  return is_restored_;
607 }
void foreign_storage::ParquetDataWrapper::loadBuffersUsingLazyParquetChunkLoader ( const int  logical_column_id,
const int  fragment_id,
const ChunkToBufferMap &  required_buffers,
AbstractBuffer *  delete_buffer 
)
private

Definition at line 382 of file ParquetDataWrapper.cpp.

References Data_Namespace::AbstractBuffer::append(), CHECK, CHECK_GT, chunk_metadata_map_, ColumnDescriptor::columnType, db_id_, delete_buffer_mutex_, foreign_storage::Interval< T >::end, file_reader_cache_, file_system_, foreign_table_, fragment_to_row_group_interval_map_, SQLTypeInfo::get_comp_param(), SQLTypeInfo::get_elem_type(), shared::get_from_map(), SQLTypeInfo::get_physical_cols(), Catalog_Namespace::SysCatalog::getCatalog(), Data_Namespace::AbstractBuffer::getMemoryPtr(), initializeChunkBuffers(), Catalog_Namespace::SysCatalog::instance(), SQLTypeInfo::is_array(), SQLTypeInfo::is_dict_encoded_string(), SQLTypeInfo::is_geometry(), foreign_storage::LazyParquetChunkLoader::loadChunk(), render_group_analyzer_map_, schema_, Data_Namespace::AbstractBuffer::size(), foreign_storage::Interval< T >::start, and TableDescriptor::tableId.

Referenced by populateChunkBuffers().

386  {
388  CHECK(catalog);
389  const ColumnDescriptor* logical_column =
390  schema_->getColumnDescriptor(logical_column_id);
391  auto parquet_column_index = schema_->getParquetColumnIndex(logical_column_id);
392 
393  const Interval<ColumnType> column_interval = {
394  logical_column_id,
395  logical_column_id + logical_column->columnType.get_physical_cols()};
396  initializeChunkBuffers(fragment_id, column_interval, required_buffers, true);
397 
398  const auto& row_group_intervals =
400 
401  const bool is_dictionary_encoded_string_column =
402  logical_column->columnType.is_dict_encoded_string() ||
403  (logical_column->columnType.is_array() &&
404  logical_column->columnType.get_elem_type().is_dict_encoded_string());
405 
406  StringDictionary* string_dictionary = nullptr;
407  if (is_dictionary_encoded_string_column) {
408  auto dict_descriptor =
409  catalog->getMetadataForDict(logical_column->columnType.get_comp_param(), true);
410  CHECK(dict_descriptor);
411  string_dictionary = dict_descriptor->stringDict.get();
412  }
413 
414  std::list<Chunk_NS::Chunk> chunks;
415  for (int column_id = column_interval.start; column_id <= column_interval.end;
416  ++column_id) {
417  auto column_descriptor = schema_->getColumnDescriptor(column_id);
418  Chunk_NS::Chunk chunk{column_descriptor, false};
419  if (column_descriptor->columnType.is_varlen_indeed()) {
420  ChunkKey data_chunk_key = {
421  db_id_, foreign_table_->tableId, column_id, fragment_id, 1};
422  auto buffer = shared::get_from_map(required_buffers, data_chunk_key);
423  chunk.setBuffer(buffer);
424  ChunkKey index_chunk_key = {
425  db_id_, foreign_table_->tableId, column_id, fragment_id, 2};
426  auto index_buffer = shared::get_from_map(required_buffers, index_chunk_key);
427  chunk.setIndexBuffer(index_buffer);
428  } else {
429  ChunkKey chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
430  auto buffer = shared::get_from_map(required_buffers, chunk_key);
431  chunk.setBuffer(buffer);
432  }
433  chunks.emplace_back(chunk);
434  }
435 
436  std::unique_ptr<RejectedRowIndices> rejected_row_indices;
437  if (delete_buffer) {
438  rejected_row_indices = std::make_unique<RejectedRowIndices>();
439  }
440  LazyParquetChunkLoader chunk_loader(
442  auto metadata = chunk_loader.loadChunk(row_group_intervals,
443  parquet_column_index,
444  chunks,
445  string_dictionary,
446  rejected_row_indices.get());
447 
448  if (delete_buffer) {
449  // all modifying operations on `delete_buffer` must be synchronized as it is a
450  // shared buffer
451  std::unique_lock<std::mutex> delete_buffer_lock(delete_buffer_mutex_);
452 
453  CHECK(!chunks.empty());
454  CHECK(chunks.begin()->getBuffer()->hasEncoder());
455  auto num_rows_in_chunk = chunks.begin()->getBuffer()->getEncoder()->getNumElems();
456 
457  // ensure delete buffer is sized appropriately
458  if (delete_buffer->size() < num_rows_in_chunk) {
459  auto remaining_rows = num_rows_in_chunk - delete_buffer->size();
460  std::vector<int8_t> data(remaining_rows, false);
461  delete_buffer->append(data.data(), remaining_rows);
462  }
463 
464  // compute a logical OR with current `delete_buffer` contents and this chunks
465  // rejected indices
466  CHECK(rejected_row_indices);
467  auto delete_buffer_data = delete_buffer->getMemoryPtr();
468  for (const auto& rejected_index : *rejected_row_indices) {
469  CHECK_GT(delete_buffer->size(), static_cast<size_t>(rejected_index));
470  delete_buffer_data[rejected_index] = true;
471  }
472  }
473 
474  auto metadata_iter = metadata.begin();
475  for (int column_id = column_interval.start; column_id <= column_interval.end;
476  ++column_id, ++metadata_iter) {
477  auto column = schema_->getColumnDescriptor(column_id);
478  ChunkKey data_chunk_key = {db_id_, foreign_table_->tableId, column_id, fragment_id};
479  if (column->columnType.is_varlen_indeed()) {
480  data_chunk_key.emplace_back(1);
481  }
482  CHECK(chunk_metadata_map_.find(data_chunk_key) != chunk_metadata_map_.end());
483 
484  // Allocate new shared_ptr for metadata so we dont modify old one which may be used
485  // by executor
486  auto cached_metadata_previous =
487  shared::get_from_map(chunk_metadata_map_, data_chunk_key);
488  shared::get_from_map(chunk_metadata_map_, data_chunk_key) =
489  std::make_shared<ChunkMetadata>();
490  auto cached_metadata = shared::get_from_map(chunk_metadata_map_, data_chunk_key);
491  *cached_metadata = *cached_metadata_previous;
492 
493  CHECK(required_buffers.find(data_chunk_key) != required_buffers.end());
494  cached_metadata->numBytes =
495  shared::get_from_map(required_buffers, data_chunk_key)->size();
496 
497  // for certain types, update the metadata statistics
498  // should update the cache, and the internal chunk_metadata_map_
499  if (is_dictionary_encoded_string_column || logical_column->columnType.is_geometry()) {
500  CHECK(metadata_iter != metadata.end());
501  auto& chunk_metadata_ptr = *metadata_iter;
502  cached_metadata->chunkStats.max = chunk_metadata_ptr->chunkStats.max;
503  cached_metadata->chunkStats.min = chunk_metadata_ptr->chunkStats.min;
504 
505  // Update stats on buffer so it is saved in cache
506  shared::get_from_map(required_buffers, data_chunk_key)
507  ->getEncoder()
508  ->resetChunkStats(cached_metadata->chunkStats);
509  }
510  }
511 }
std::vector< int > ChunkKey
Definition: types.h:37
std::unique_ptr< FileReaderMap > file_reader_cache_
std::unique_ptr< ForeignTableSchema > schema_
virtual int8_t * getMemoryPtr()=0
#define CHECK_GT(x, y)
Definition: Logger.h:235
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
int get_physical_cols() const
Definition: sqltypes.h:360
static SysCatalog & instance()
Definition: SysCatalog.h:337
T const end
Definition: Intervals.h:68
RenderGroupAnalyzerMap render_group_analyzer_map_
void initializeChunkBuffers(const int fragment_index, const Interval< ColumnType > &column_interval, const ChunkToBufferMap &required_buffers, const bool reserve_buffers_and_set_stats=false)
specifies the content in-memory of a row in the column metadata table
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
V & get_from_map(std::map< K, V, comp > &map, const K &key)
Definition: misc.h:62
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
HOST DEVICE int get_comp_param() const
Definition: sqltypes.h:338
#define CHECK(condition)
Definition: Logger.h:223
bool is_geometry() const
Definition: sqltypes.h:522
bool is_dict_encoded_string() const
Definition: sqltypes.h:548
SQLTypeInfo columnType
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:865
bool is_array() const
Definition: sqltypes.h:518

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::metadataScanFiles ( const std::vector< std::string > &  file_paths)
private

Definition at line 320 of file ParquetDataWrapper.cpp.

References addNewFile(), addNewFragment(), CHECK, CHECK_EQ, chunk_metadata_map_, db_id_, do_metadata_stats_validation_, file_reader_cache_, file_system_, finalizeFragmentMap(), foreign_table_, isNewFile(), last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::LazyParquetChunkLoader::metadataScan(), moveToNextFragment(), foreign_storage::anonymous_namespace{ParquetDataWrapper.cpp}::reduce_metadata(), schema_, TableDescriptor::tableId, and total_row_count_.

Referenced by fetchChunkMetadata().

320  {
321  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get(), nullptr);
322  auto row_group_metadata =
323  chunk_loader.metadataScan(file_paths, *schema_, do_metadata_stats_validation_);
324  auto column_interval =
325  Interval<ColumnType>{schema_->getLogicalAndPhysicalColumns().front()->columnId,
326  schema_->getLogicalAndPhysicalColumns().back()->columnId};
327 
328  for (const auto& row_group_metadata_item : row_group_metadata) {
329  const auto& column_chunk_metadata = row_group_metadata_item.column_chunk_metadata;
330  CHECK(static_cast<int>(column_chunk_metadata.size()) ==
331  schema_->numLogicalAndPhysicalColumns());
332  auto column_chunk_metadata_iter = column_chunk_metadata.begin();
333  const int64_t import_row_count = (*column_chunk_metadata_iter)->numElements;
334  int row_group = row_group_metadata_item.row_group_index;
335  const auto& file_path = row_group_metadata_item.file_path;
336  if (moveToNextFragment(import_row_count)) {
337  addNewFragment(row_group, file_path);
338  } else if (isNewFile(file_path)) {
339  CHECK_EQ(row_group, 0);
340  addNewFile(file_path);
341  }
342  last_row_group_ = row_group;
343 
344  for (int column_id = column_interval.start; column_id <= column_interval.end;
345  column_id++, column_chunk_metadata_iter++) {
346  CHECK(column_chunk_metadata_iter != column_chunk_metadata.end());
347  const auto column_descriptor = schema_->getColumnDescriptor(column_id);
348 
349  const auto& type_info = column_descriptor->columnType;
350  ChunkKey chunk_key{
352  ChunkKey data_chunk_key = chunk_key;
353  if (type_info.is_varlen_indeed()) {
354  data_chunk_key.emplace_back(1);
355  }
356  std::shared_ptr<ChunkMetadata> chunk_metadata = *column_chunk_metadata_iter;
357  if (chunk_metadata_map_.find(data_chunk_key) == chunk_metadata_map_.end()) {
358  chunk_metadata_map_[data_chunk_key] = chunk_metadata;
359  } else {
360  reduce_metadata(chunk_metadata_map_[data_chunk_key], chunk_metadata);
361  }
362  }
363  last_fragment_row_count_ += import_row_count;
364  total_row_count_ += import_row_count;
365  }
367 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::vector< int > ChunkKey
Definition: types.h:37
std::unique_ptr< FileReaderMap > file_reader_cache_
std::unique_ptr< ForeignTableSchema > schema_
void reduce_metadata(std::shared_ptr< ChunkMetadata > reduce_to, std::shared_ptr< ChunkMetadata > reduce_from)
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void addNewFile(const std::string &file_path)
void addNewFragment(int row_group, const std::string &file_path)
bool isNewFile(const std::string &file_path) const
bool moveToNextFragment(size_t new_rows_count) const
std::shared_ptr< arrow::fs::FileSystem > file_system_
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

bool foreign_storage::ParquetDataWrapper::moveToNextFragment ( size_t  new_rows_count) const
private

Definition at line 369 of file ParquetDataWrapper.cpp.

References foreign_table_, last_fragment_row_count_, and TableDescriptor::maxFragRows.

Referenced by metadataScanFiles().

369  {
370  return (last_fragment_row_count_ + new_rows_count) >
371  static_cast<size_t>(foreign_table_->maxFragRows);
372 }

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers,
AbstractBuffer * delete_buffer
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can optionally be populated if the data wrapper has to scan through the chunk data anyway (typically for row-wise data formats)
delete_buffer - chunk buffer for the fragment's delete column; if non-null, the data wrapper is expected to mark deleted rows in this buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 513 of file ParquetDataWrapper.cpp.

References CHECK, CHECK_EQ, CHUNK_KEY_COLUMN_IDX, CHUNK_KEY_FRAGMENT_IDX, foreign_storage::create_futures_for_workers(), g_max_import_threads, loadBuffersUsingLazyParquetChunkLoader(), and schema_.

515  {
516  ChunkToBufferMap buffers_to_load;
517  buffers_to_load.insert(required_buffers.begin(), required_buffers.end());
518  buffers_to_load.insert(optional_buffers.begin(), optional_buffers.end());
519 
520  CHECK(!buffers_to_load.empty());
521 
522  std::set<ForeignStorageMgr::ParallelismHint> col_frag_hints;
523  for (const auto& [chunk_key, buffer] : buffers_to_load) {
524  CHECK_EQ(buffer->size(), static_cast<size_t>(0));
525  col_frag_hints.emplace(
526  schema_->getLogicalColumn(chunk_key[CHUNK_KEY_COLUMN_IDX])->columnId,
527  chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
528  }
529 
530  std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
531  [&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
532  for (const auto& [col_id, frag_id] : hint_set) {
533  loadBuffersUsingLazyParquetChunkLoader(
534  col_id, frag_id, buffers_to_load, delete_buffer);
535  }
536  };
537  auto futures = create_futures_for_workers(col_frag_hints, g_max_import_threads, lambda);
538 
539  // We wait on all futures, then call get because we want all threads to have finished
540  // before we propagate a potential exception.
541  for (auto& future : futures) {
542  future.wait();
543  }
544 
545  for (auto& future : futures) {
546  future.get();
547  }
548 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::unique_ptr< ForeignTableSchema > schema_
#define CHUNK_KEY_FRAGMENT_IDX
Definition: types.h:42
std::map< ChunkKey, AbstractBuffer * > ChunkToBufferMap
std::vector< std::future< void > > create_futures_for_workers(const Container &items, size_t max_threads, std::function< void(const Container &)> lambda)
Definition: FsiChunkUtils.h:75
void loadBuffersUsingLazyParquetChunkLoader(const int logical_column_id, const int fragment_id, const ChunkToBufferMap &required_buffers, AbstractBuffer *delete_buffer)
#define CHECK(condition)
Definition: Logger.h:223
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
size_t g_max_import_threads
Definition: Importer.cpp:106

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
overridevirtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 374 of file ParquetDataWrapper.cpp.

References chunk_metadata_map_, and fetchChunkMetadata().

375  {
377  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_map_) {
378  chunk_metadata_vector.emplace_back(chunk_key, chunk_metadata);
379  }
380 }
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_

+ Here is the call graph for this function:

void foreign_storage::ParquetDataWrapper::resetParquetMetadata ( )
private

Definition at line 113 of file ParquetDataWrapper.cpp.

References file_reader_cache_, fragment_to_row_group_interval_map_, last_fragment_index_, last_fragment_row_count_, last_row_group_, and total_row_count_.

Referenced by fetchChunkMetadata().

+ Here is the caller graph for this function:

void foreign_storage::ParquetDataWrapper::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector & chunk_metadata_vector
)
overridevirtual

Restores the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeMetadata
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 584 of file ParquetDataWrapper.cpp.

References CHECK, chunk_metadata_map_, fragment_to_row_group_interval_map_, foreign_storage::json_utils::get_value_from_object(), is_restored_, last_fragment_index_, last_fragment_row_count_, last_row_group_, foreign_storage::json_utils::read_from_file(), and total_row_count_.

586  {
587  auto d = json_utils::read_from_file(file_path);
588  CHECK(d.IsObject());
589 
591  d, fragment_to_row_group_interval_map_, "fragment_to_row_group_interval_map");
593  json_utils::get_value_from_object(d, last_fragment_index_, "last_fragment_index");
595  d, last_fragment_row_count_, "last_fragment_row_count");
597 
598  CHECK(chunk_metadata_map_.empty());
599  for (const auto& [chunk_key, chunk_metadata] : chunk_metadata_vector) {
600  chunk_metadata_map_[chunk_key] = chunk_metadata;
601  }
602  is_restored_ = true;
603 }
std::map< int, std::vector< RowGroupInterval > > fragment_to_row_group_interval_map_
std::map< ChunkKey, std::shared_ptr< ChunkMetadata > > chunk_metadata_map_
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:164
rapidjson::Document read_from_file(const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:223

+ Here is the call graph for this function:

Member Data Documentation

std::map<ChunkKey, std::shared_ptr<ChunkMetadata> > foreign_storage::ParquetDataWrapper::chunk_metadata_map_
private
const int foreign_storage::ParquetDataWrapper::db_id_
private
std::mutex foreign_storage::ParquetDataWrapper::delete_buffer_mutex_
private

Definition at line 121 of file ParquetDataWrapper.h.

Referenced by loadBuffersUsingLazyParquetChunkLoader().

const bool foreign_storage::ParquetDataWrapper::do_metadata_stats_validation_
private

Definition at line 107 of file ParquetDataWrapper.h.

Referenced by metadataScanFiles().

std::unique_ptr<FileReaderMap> foreign_storage::ParquetDataWrapper::file_reader_cache_
private
std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetDataWrapper::file_system_
private
std::map<int, std::vector<RowGroupInterval> > foreign_storage::ParquetDataWrapper::fragment_to_row_group_interval_map_
private
bool foreign_storage::ParquetDataWrapper::is_restored_
private

Definition at line 116 of file ParquetDataWrapper.h.

Referenced by isRestored(), and restoreDataWrapperInternals().

int foreign_storage::ParquetDataWrapper::last_fragment_index_
private
size_t foreign_storage::ParquetDataWrapper::last_fragment_row_count_
private
int foreign_storage::ParquetDataWrapper::last_row_group_
private
RenderGroupAnalyzerMap foreign_storage::ParquetDataWrapper::render_group_analyzer_map_
private
std::unique_ptr<ForeignTableSchema> foreign_storage::ParquetDataWrapper::schema_
private
size_t foreign_storage::ParquetDataWrapper::total_row_count_
private

The documentation for this class was generated from the following files: