OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::ParquetImporter Class Reference

#include <ParquetImporter.h>

+ Inheritance diagram for foreign_storage::ParquetImporter:
+ Collaboration diagram for foreign_storage::ParquetImporter:

Public Member Functions

 ParquetImporter ()
 
 ParquetImporter (const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
std::unique_ptr
< import_export::ImportBatchResult > 
getNextImportBatch ()
 
std::vector< std::pair< const
ColumnDescriptor
*, StringDictionary * > > 
getStringDictionaries () const
 
int getMaxNumUsefulThreads () const
 
void setNumThreads (const int num_threads)
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
const std::set< std::string > getAlterableTableOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 
virtual bool isLazyFragmentFetchingEnabled () const
 

Private Member Functions

std::set< std::string > getAllFilePaths ()
 

Private Attributes

const int db_id_
 
const ForeignTable * foreign_table_
 
int num_threads_
 
std::unique_ptr
< AbstractRowGroupIntervalTracker > 
row_group_interval_tracker_
 
std::unique_ptr
< ForeignTableSchema > 
schema_
 
std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 
std::vector< std::pair< const
ColumnDescriptor
*, StringDictionary * > > 
string_dictionaries_per_column_
 
std::shared_mutex row_group_interval_tracker_mutex_
 
std::shared_mutex string_dictionaries_per_column_mutex_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static shared::FilePathOptions getFilePathOptions (const ForeignTable *foreign_table)
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::string ALLOW_FILE_ROLL_OFF_KEY = "ALLOW_FILE_ROLL_OFF"
 
static const std::string THREADS_KEY = "THREADS"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 
static bool allowFileRollOff (const ForeignTable *foreign_table)
 

Detailed Description

Definition at line 40 of file ParquetImporter.h.

Constructor & Destructor Documentation

foreign_storage::ParquetImporter::ParquetImporter ( )

Definition at line 226 of file ParquetImporter.cpp.

foreign_storage::ParquetImporter::ParquetImporter ( const int  db_id,
const ForeignTable * foreign_table,
const UserMapping * user_mapping 
)

Definition at line 229 of file ParquetImporter.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

232  : db_id_(db_id)
233  , foreign_table_(foreign_table)
234  , num_threads_(1)
235  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
236  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
237  auto& server_options = foreign_table->foreign_server->options;
238  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
239  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
240  } else {
241  UNREACHABLE();
242  }
243 }
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:338
const ForeignTable * foreign_table_
std::unique_ptr< FileReaderMap > file_reader_cache_

Member Function Documentation

std::set< std::string > foreign_storage::ParquetImporter::getAllFilePaths ( )
private

Definition at line 245 of file ParquetImporter.cpp.

References CHECK_EQ, DEBUG_TIMER, file_system_, foreign_table_, foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::throw_file_access_error(), and foreign_storage::throw_file_not_found_error().

Referenced by getNextImportBatch().

245  {
246  auto timer = DEBUG_TIMER(__func__);
247  std::set<std::string> file_paths;
248  auto file_path = getFullFilePath(foreign_table_);
249  arrow::Result<arrow::fs::FileInfo> file_info_result;
250  arrow::Result<std::vector<arrow::fs::FileInfo>> selector_result;
251  {
252  auto get_file_info_timer = DEBUG_TIMER("GetFileInfo-file_info");
253  file_info_result = file_system_->GetFileInfo(file_path);
254  }
255  if (!file_info_result.ok()) {
256  throw_file_access_error(file_path, file_info_result.status().message());
257  } else {
258  auto& file_info = file_info_result.ValueOrDie();
259  if (file_info.type() == arrow::fs::FileType::NotFound) {
260  throw_file_not_found_error(file_path);
261  } else if (file_info.type() == arrow::fs::FileType::File) {
262  file_paths.emplace(file_path);
263  } else {
264  CHECK_EQ(arrow::fs::FileType::Directory, file_info.type());
265  arrow::fs::FileSelector file_selector{};
266  file_selector.base_dir = file_path;
267  file_selector.recursive = true;
268  {
269  auto get_file_info_timer = DEBUG_TIMER("GetFileInfo-selector");
270  selector_result = file_system_->GetFileInfo(file_selector);
271  }
272  if (!selector_result.ok()) {
273  throw_file_access_error(file_path, selector_result.status().message());
274  } else {
275  auto& file_info_vector = selector_result.ValueOrDie();
276  for (const auto& file_info : file_info_vector) {
277  if (file_info.type() == arrow::fs::FileType::File) {
278  file_paths.emplace(file_info.path());
279  }
280  }
281  }
282  }
283  }
284  return file_paths;
285 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::shared_ptr< arrow::fs::FileSystem > file_system_
void throw_file_access_error(const std::string &file_path, const std::string &message)
void throw_file_not_found_error(const std::string &file_path)
const ForeignTable * foreign_table_
#define DEBUG_TIMER(name)
Definition: Logger.h:412
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::ParquetImporter::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 62 of file ParquetImporter.h.

References UNREACHABLE.

62  {
63  UNREACHABLE();
64  return {};
65  }
#define UNREACHABLE()
Definition: Logger.h:338
int foreign_storage::ParquetImporter::getMaxNumUsefulThreads ( ) const

Get the maximum number of threads that can do useful computation.

Definition at line 218 of file ParquetImporter.cpp.

References schema_.

218  {
219  return schema_->numLogicalColumns();
220 }
std::unique_ptr< ForeignTableSchema > schema_
std::unique_ptr< import_export::ImportBatchResult > foreign_storage::ParquetImporter::getNextImportBatch ( )

Produce the next ImportBatchResult for import. This is the only functionality of ParquetImporter that is required to be implemented.

Returns
an ImportBatchResult for import.

Definition at line 307 of file ParquetImporter.cpp.

References db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), num_threads_, row_group_interval_tracker_, row_group_interval_tracker_mutex_, schema_, string_dictionaries_per_column_, and string_dictionaries_per_column_mutex_.

307  {
308  {
309  std::unique_lock row_group_interval_tracker_lock(row_group_interval_tracker_mutex_);
311  row_group_interval_tracker_ = std::make_unique<RowGroupIntervalTracker>(
313  }
314  }
315 
316  auto import_batch_result =
317  std::make_unique<ParquetImportBatchResult>(foreign_table_, db_id_, schema_.get());
318  auto [chunks, string_dictionaries] = import_batch_result->getChunksAndDictionaries();
319 
320  {
321  std::unique_lock string_dictionaries_per_column_lock(
323  if (!string_dictionaries_per_column_.size()) {
324  for (const auto& [column_id, dict] : string_dictionaries) {
325  string_dictionaries_per_column_.emplace_back(chunks[column_id].getColumnDesc(),
326  dict);
327  }
328  }
329  }
330 
331  LazyParquetChunkLoader chunk_loader(
333 
334  std::optional<RowGroupInterval> next_row_group;
335  {
336  std::unique_lock row_group_interval_tracker_lock(row_group_interval_tracker_mutex_);
337  next_row_group = row_group_interval_tracker_->getNextRowGroupInterval();
338  }
339  size_t num_rows_completed, num_rows_rejected;
340  if (next_row_group.has_value()) {
341  std::tie(num_rows_completed, num_rows_rejected) = chunk_loader.loadRowGroups(
342  *next_row_group, chunks, *schema_, string_dictionaries, num_threads_);
343  } else {
344  return import_batch_result; // terminate without populating data, read the last row
345  // group
346  }
347 
348  import_batch_result->populateImportStatus(num_rows_completed, num_rows_rejected);
349  import_batch_result->populateInsertData(chunks);
350 
351  return import_batch_result;
352 }
std::set< std::string > getAllFilePaths()
std::shared_mutex row_group_interval_tracker_mutex_
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::unique_ptr< AbstractRowGroupIntervalTracker > row_group_interval_tracker_
std::unique_ptr< ForeignTableSchema > schema_
std::shared_mutex string_dictionaries_per_column_mutex_
std::unique_lock< T > unique_lock
const ForeignTable * foreign_table_
std::unique_ptr< FileReaderMap > file_reader_cache_
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::ParquetImporter::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 67 of file ParquetImporter.h.

References UNREACHABLE.

67  {
68  UNREACHABLE();
69  return {};
70  }
#define UNREACHABLE()
Definition: Logger.h:338
std::string foreign_storage::ParquetImporter::getSerializedDataWrapper ( ) const
override virtual

Serialize internal state of wrapper into file at given path if implemented

Implements foreign_storage::ForeignDataWrapper.

Definition at line 297 of file ParquetImporter.cpp.

References UNREACHABLE.

297  {
298  UNREACHABLE();
299  return {};
300 }
#define UNREACHABLE()
Definition: Logger.h:338
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > foreign_storage::ParquetImporter::getStringDictionaries ( ) const

Return string dictionaries that are used per column.

Returns
a vector of StringDictionary and ColumnDescriptor pairs

Definition at line 303 of file ParquetImporter.cpp.

References string_dictionaries_per_column_.

303  {
305 }
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_
bool foreign_storage::ParquetImporter::isRestored ( ) const
override virtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 360 of file ParquetImporter.cpp.

References UNREACHABLE.

360  {
361  UNREACHABLE();
362  return {};
363 }
#define UNREACHABLE()
Definition: Logger.h:338
void foreign_storage::ParquetImporter::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers,
AbstractBuffer * delete_buffer 
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)
delete_buffer - chunk buffer for the fragment's delete column; if non-null, the data wrapper is expected to mark deleted rows in the buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 291 of file ParquetImporter.cpp.

References UNREACHABLE.

293  {
294  UNREACHABLE();
295 }
#define UNREACHABLE()
Definition: Logger.h:338
void foreign_storage::ParquetImporter::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
override virtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 287 of file ParquetImporter.cpp.

References UNREACHABLE.

287  {
288  UNREACHABLE();
289 }
#define UNREACHABLE()
Definition: Logger.h:338
void foreign_storage::ParquetImporter::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector & chunk_metadata 
)
override virtual

Restore internal state of datawrapper

Parameters
file_path - location of file created by serializeMetadata
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 354 of file ParquetImporter.cpp.

References UNREACHABLE.

356  {
357  UNREACHABLE();
358 }
#define UNREACHABLE()
Definition: Logger.h:338
void foreign_storage::ParquetImporter::setNumThreads ( const int  num_threads)

Set the number of threads to use internally when reading batches.

Definition at line 222 of file ParquetImporter.cpp.

References num_threads_.

222  {
223  num_threads_ = num_threads;
224 }

Member Data Documentation

const int foreign_storage::ParquetImporter::db_id_
private

Definition at line 99 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::unique_ptr<FileReaderMap> foreign_storage::ParquetImporter::file_reader_cache_
private

Definition at line 109 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetImporter::file_system_
private

Definition at line 108 of file ParquetImporter.h.

Referenced by getAllFilePaths(), getNextImportBatch(), and ParquetImporter().

const ForeignTable* foreign_storage::ParquetImporter::foreign_table_
private

Definition at line 100 of file ParquetImporter.h.

Referenced by getAllFilePaths(), and getNextImportBatch().

int foreign_storage::ParquetImporter::num_threads_
private

Definition at line 101 of file ParquetImporter.h.

Referenced by getNextImportBatch(), and setNumThreads().

std::unique_ptr<AbstractRowGroupIntervalTracker> foreign_storage::ParquetImporter::row_group_interval_tracker_
private

Definition at line 105 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::shared_mutex foreign_storage::ParquetImporter::row_group_interval_tracker_mutex_
private

Definition at line 113 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::unique_ptr<ForeignTableSchema> foreign_storage::ParquetImporter::schema_
private

Definition at line 107 of file ParquetImporter.h.

Referenced by getMaxNumUsefulThreads(), and getNextImportBatch().

std::vector<std::pair<const ColumnDescriptor*, StringDictionary*> > foreign_storage::ParquetImporter::string_dictionaries_per_column_
private

Definition at line 111 of file ParquetImporter.h.

Referenced by getNextImportBatch(), and getStringDictionaries().

std::shared_mutex foreign_storage::ParquetImporter::string_dictionaries_per_column_mutex_
private

Definition at line 114 of file ParquetImporter.h.

Referenced by getNextImportBatch().


The documentation for this class was generated from the following files: