OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::ParquetImporter Class Reference

#include <ParquetImporter.h>

+ Inheritance diagram for foreign_storage::ParquetImporter:
+ Collaboration diagram for foreign_storage::ParquetImporter:

Public Member Functions

 ParquetImporter ()
 
 ParquetImporter (const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers, AbstractBuffer *delete_buffer) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
std::unique_ptr
< import_export::ImportBatchResult > 
getNextImportBatch ()
 
std::vector< std::pair< const
ColumnDescriptor
*, StringDictionary * > > 
getStringDictionaries () const
 
int getMaxNumUsefulThreads () const
 
void setNumThreads (const int num_threads)
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
const std::set< std::string > getAlterableTableOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 
virtual void createRenderGroupAnalyzers ()
 Create RenderGroupAnalyzers for poly columns. More...
 
virtual bool isLazyFragmentFetchingEnabled () const
 

Private Member Functions

std::set< std::string > getAllFilePaths ()
 

Private Attributes

const int db_id_
 
const ForeignTable * foreign_table_
 
int num_threads_
 
std::unique_ptr
< AbstractRowGroupIntervalTracker > 
row_group_interval_tracker_
 
std::unique_ptr
< ForeignTableSchema > 
schema_
 
std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 
std::vector< std::pair< const
ColumnDescriptor
*, StringDictionary * > > 
string_dictionaries_per_column_
 
std::shared_mutex row_group_interval_tracker_mutex_
 
std::shared_mutex string_dictionaries_per_column_mutex_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static shared::FilePathOptions getFilePathOptions (const ForeignTable *foreign_table)
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::string ALLOW_FILE_ROLL_OFF_KEY = "ALLOW_FILE_ROLL_OFF"
 
static const std::string THREADS_KEY = "THREADS"
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 
static bool allowFileRollOff (const ForeignTable *foreign_table)
 

Detailed Description

Definition at line 40 of file ParquetImporter.h.

Constructor & Destructor Documentation

foreign_storage::ParquetImporter::ParquetImporter ( )

Definition at line 227 of file ParquetImporter.cpp.

foreign_storage::ParquetImporter::ParquetImporter ( const int  db_id,
const ForeignTable * foreign_table,
const UserMapping * user_mapping 
)

Definition at line 230 of file ParquetImporter.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

233  : db_id_(db_id)
234  , foreign_table_(foreign_table)
235  , num_threads_(1)
236  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
237  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
238  auto& server_options = foreign_table->foreign_server->options;
239  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
240  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
241  } else {
242  UNREACHABLE();
243  }
244 }
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:337
const ForeignTable * foreign_table_
std::unique_ptr< FileReaderMap > file_reader_cache_

Member Function Documentation

std::set< std::string > foreign_storage::ParquetImporter::getAllFilePaths ( )
private

Definition at line 246 of file ParquetImporter.cpp.

References CHECK_EQ, DEBUG_TIMER, file_system_, foreign_table_, foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::throw_file_access_error(), and foreign_storage::throw_file_not_found_error().

Referenced by getNextImportBatch().

246  {
247  auto timer = DEBUG_TIMER(__func__);
248  std::set<std::string> file_paths;
249  auto file_path = getFullFilePath(foreign_table_);
250  arrow::Result<arrow::fs::FileInfo> file_info_result;
251  arrow::Result<std::vector<arrow::fs::FileInfo>> selector_result;
252  {
253  auto get_file_info_timer = DEBUG_TIMER("GetFileInfo-file_info");
254  file_info_result = file_system_->GetFileInfo(file_path);
255  }
256  if (!file_info_result.ok()) {
257  throw_file_access_error(file_path, file_info_result.status().message());
258  } else {
259  auto& file_info = file_info_result.ValueOrDie();
260  if (file_info.type() == arrow::fs::FileType::NotFound) {
261  throw_file_not_found_error(file_path);
262  } else if (file_info.type() == arrow::fs::FileType::File) {
263  file_paths.emplace(file_path);
264  } else {
265  CHECK_EQ(arrow::fs::FileType::Directory, file_info.type());
266  arrow::fs::FileSelector file_selector{};
267  file_selector.base_dir = file_path;
268  file_selector.recursive = true;
269  {
270  auto get_file_info_timer = DEBUG_TIMER("GetFileInfo-selector");
271  selector_result = file_system_->GetFileInfo(file_selector);
272  }
273  if (!selector_result.ok()) {
274  throw_file_access_error(file_path, selector_result.status().message());
275  } else {
276  auto& file_info_vector = selector_result.ValueOrDie();
277  for (const auto& file_info : file_info_vector) {
278  if (file_info.type() == arrow::fs::FileType::File) {
279  file_paths.emplace(file_info.path());
280  }
281  }
282  }
283  }
284  }
285  return file_paths;
286 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::shared_ptr< arrow::fs::FileSystem > file_system_
void throw_file_access_error(const std::string &file_path, const std::string &message)
void throw_file_not_found_error(const std::string &file_path)
const ForeignTable * foreign_table_
#define DEBUG_TIMER(name)
Definition: Logger.h:411
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::ParquetImporter::getCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 62 of file ParquetImporter.h.

References UNREACHABLE.

62  {
63  UNREACHABLE();
64  return {};
65  }
#define UNREACHABLE()
Definition: Logger.h:337
int foreign_storage::ParquetImporter::getMaxNumUsefulThreads ( ) const

Get the maximum number of threads that can do useful computation.

Definition at line 219 of file ParquetImporter.cpp.

References schema_.

219  {
220  return schema_->numLogicalColumns();
221 }
std::unique_ptr< ForeignTableSchema > schema_
std::unique_ptr< import_export::ImportBatchResult > foreign_storage::ParquetImporter::getNextImportBatch ( )

Produce the next ImportBatchResult for import. This is the only functionality of ParquetImporter that is required to be implemented.

Returns
an ImportBatchResult for import.

Definition at line 308 of file ParquetImporter.cpp.

References db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), num_threads_, row_group_interval_tracker_, row_group_interval_tracker_mutex_, schema_, string_dictionaries_per_column_, and string_dictionaries_per_column_mutex_.

308  {
309  {
310  std::unique_lock row_group_interval_tracker_lock(row_group_interval_tracker_mutex_);
312  row_group_interval_tracker_ = std::make_unique<RowGroupIntervalTracker>(
314  }
315  }
316 
317  auto import_batch_result =
318  std::make_unique<ParquetImportBatchResult>(foreign_table_, db_id_, schema_.get());
319  auto [chunks, string_dictionaries] = import_batch_result->getChunksAndDictionaries();
320 
321  {
322  std::unique_lock string_dictionaries_per_column_lock(
324  if (!string_dictionaries_per_column_.size()) {
325  for (const auto& [column_id, dict] : string_dictionaries) {
326  string_dictionaries_per_column_.emplace_back(chunks[column_id].getColumnDesc(),
327  dict);
328  }
329  }
330  }
331 
332  // this code path is deprecated and does not need a RenderGroupAnalyzerMap
333  LazyParquetChunkLoader chunk_loader(
334  file_system_, file_reader_cache_.get(), nullptr, foreign_table_->tableName);
335 
336  std::optional<RowGroupInterval> next_row_group;
337  {
338  std::unique_lock row_group_interval_tracker_lock(row_group_interval_tracker_mutex_);
339  next_row_group = row_group_interval_tracker_->getNextRowGroupInterval();
340  }
341  size_t num_rows_completed, num_rows_rejected;
342  if (next_row_group.has_value()) {
343  std::tie(num_rows_completed, num_rows_rejected) = chunk_loader.loadRowGroups(
344  *next_row_group, chunks, *schema_, string_dictionaries, num_threads_);
345  } else {
346  return import_batch_result; // terminate without populating data, read the last row
347  // group
348  }
349 
350  import_batch_result->populateImportStatus(num_rows_completed, num_rows_rejected);
351  import_batch_result->populateInsertData(chunks);
352 
353  return import_batch_result;
354 }
std::set< std::string > getAllFilePaths()
std::shared_mutex row_group_interval_tracker_mutex_
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::unique_ptr< AbstractRowGroupIntervalTracker > row_group_interval_tracker_
std::unique_ptr< ForeignTableSchema > schema_
std::shared_mutex string_dictionaries_per_column_mutex_
std::unique_lock< T > unique_lock
const ForeignTable * foreign_table_
std::unique_ptr< FileReaderMap > file_reader_cache_
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::ParquetImporter::getNonCachedParallelismLevel ( ) const
inline override virtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 67 of file ParquetImporter.h.

References UNREACHABLE.

67  {
68  UNREACHABLE();
69  return {};
70  }
#define UNREACHABLE()
Definition: Logger.h:337
std::string foreign_storage::ParquetImporter::getSerializedDataWrapper ( ) const
override virtual

Serialize internal state of wrapper into file at given path if implemented

Implements foreign_storage::ForeignDataWrapper.

Definition at line 298 of file ParquetImporter.cpp.

References UNREACHABLE.

298  {
299  UNREACHABLE();
300  return {};
301 }
#define UNREACHABLE()
Definition: Logger.h:337
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > foreign_storage::ParquetImporter::getStringDictionaries ( ) const

Return string dictionaries that are used per column.

Returns
a vector of StringDictionary and ColumnDescriptor pairs

Definition at line 304 of file ParquetImporter.cpp.

References string_dictionaries_per_column_.

304  {
306 }
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_
bool foreign_storage::ParquetImporter::isRestored ( ) const
override virtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 362 of file ParquetImporter.cpp.

References UNREACHABLE.

362  {
363  UNREACHABLE();
364  return {};
365 }
#define UNREACHABLE()
Definition: Logger.h:337
void foreign_storage::ParquetImporter::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers,
AbstractBuffer * delete_buffer 
)
override virtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can be optionally populated, if the data wrapper has to scan through chunk data anyways (typically for row wise data formats)
delete_buffer - chunk buffer for fragment's delete column, if non-null data wrapper is expected to mark deleted rows in buffer and continue processing

Implements foreign_storage::ForeignDataWrapper.

Definition at line 292 of file ParquetImporter.cpp.

References UNREACHABLE.

294  {
295  UNREACHABLE();
296 }
#define UNREACHABLE()
Definition: Logger.h:337
void foreign_storage::ParquetImporter::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
override virtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 288 of file ParquetImporter.cpp.

References UNREACHABLE.

288  {
289  UNREACHABLE();
290 }
#define UNREACHABLE()
Definition: Logger.h:337
void foreign_storage::ParquetImporter::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector & chunk_metadata 
)
override virtual

Restore internal state of datawrapper

Parameters
file_path - location of file created by serializeMetadata
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 356 of file ParquetImporter.cpp.

References UNREACHABLE.

358  {
359  UNREACHABLE();
360 }
#define UNREACHABLE()
Definition: Logger.h:337
void foreign_storage::ParquetImporter::setNumThreads ( const int  num_threads)

Set the number of threads to use internally when reading batches.

Definition at line 223 of file ParquetImporter.cpp.

References num_threads_.

223  {
224  num_threads_ = num_threads;
225 }

Member Data Documentation

const int foreign_storage::ParquetImporter::db_id_
private

Definition at line 99 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::unique_ptr<FileReaderMap> foreign_storage::ParquetImporter::file_reader_cache_
private

Definition at line 109 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetImporter::file_system_
private

Definition at line 108 of file ParquetImporter.h.

Referenced by getAllFilePaths(), getNextImportBatch(), and ParquetImporter().

const ForeignTable* foreign_storage::ParquetImporter::foreign_table_
private

Definition at line 100 of file ParquetImporter.h.

Referenced by getAllFilePaths(), and getNextImportBatch().

int foreign_storage::ParquetImporter::num_threads_
private

Definition at line 101 of file ParquetImporter.h.

Referenced by getNextImportBatch(), and setNumThreads().

std::unique_ptr<AbstractRowGroupIntervalTracker> foreign_storage::ParquetImporter::row_group_interval_tracker_
private

Definition at line 105 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::shared_mutex foreign_storage::ParquetImporter::row_group_interval_tracker_mutex_
private

Definition at line 113 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::unique_ptr<ForeignTableSchema> foreign_storage::ParquetImporter::schema_
private

Definition at line 107 of file ParquetImporter.h.

Referenced by getMaxNumUsefulThreads(), and getNextImportBatch().

std::vector<std::pair<const ColumnDescriptor*, StringDictionary*> > foreign_storage::ParquetImporter::string_dictionaries_per_column_
private

Definition at line 111 of file ParquetImporter.h.

Referenced by getNextImportBatch(), and getStringDictionaries().

std::shared_mutex foreign_storage::ParquetImporter::string_dictionaries_per_column_mutex_
private

Definition at line 114 of file ParquetImporter.h.

Referenced by getNextImportBatch().


The documentation for this class was generated from the following files: