OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::ParquetImporter Class Reference

#include <ParquetImporter.h>

+ Inheritance diagram for foreign_storage::ParquetImporter:
+ Collaboration diagram for foreign_storage::ParquetImporter:

Public Member Functions

 ParquetImporter ()
 
 ParquetImporter (const int db_id, const ForeignTable *foreign_table, const UserMapping *user_mapping)
 
void populateChunkMetadata (ChunkMetadataVector &chunk_metadata_vector) override
 
void populateChunkBuffers (const ChunkToBufferMap &required_buffers, const ChunkToBufferMap &optional_buffers) override
 
std::string getSerializedDataWrapper () const override
 
void restoreDataWrapperInternals (const std::string &file_path, const ChunkMetadataVector &chunk_metadata_vector) override
 
bool isRestored () const override
 
ParallelismLevel getCachedParallelismLevel () const override
 
ParallelismLevel getNonCachedParallelismLevel () const override
 
std::unique_ptr
< import_export::ImportBatchResult > 
getNextImportBatch ()
 
std::vector< std::pair< const
ColumnDescriptor
*, StringDictionary * > > 
getStringDictionaries () const
 
- Public Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
 AbstractFileStorageDataWrapper ()
 
void validateServerOptions (const ForeignServer *foreign_server) const override
 
void validateTableOptions (const ForeignTable *foreign_table) const override
 
const std::set
< std::string_view > & 
getSupportedTableOptions () const override
 
void validateUserMappingOptions (const UserMapping *user_mapping, const ForeignServer *foreign_server) const override
 
const std::set
< std::string_view > & 
getSupportedUserMappingOptions () const override
 
- Public Member Functions inherited from foreign_storage::ForeignDataWrapper
 ForeignDataWrapper ()=default
 
virtual ~ForeignDataWrapper ()=default
 
virtual void validateSchema (const std::list< ColumnDescriptor > &columns) const
 

Private Member Functions

std::set< std::string > getAllFilePaths ()
 

Private Attributes

const int db_id_
 
const ForeignTable * foreign_table_
 
std::unique_ptr
< AbstractRowGroupIntervalTracker > 
row_group_interval_tracker_
 
std::unique_ptr
< ForeignTableSchema > 
schema_
 
std::shared_ptr
< arrow::fs::FileSystem > 
file_system_
 
std::unique_ptr< FileReaderMap > file_reader_cache_
 
std::vector< std::pair< const
ColumnDescriptor
*, StringDictionary * > > 
string_dictionaries_per_column_
 

Additional Inherited Members

- Public Types inherited from foreign_storage::ForeignDataWrapper
enum  ParallelismLevel { NONE, INTRA_FRAGMENT, INTER_FRAGMENT }
 
- Static Public Attributes inherited from foreign_storage::AbstractFileStorageDataWrapper
static const std::string STORAGE_TYPE_KEY = "STORAGE_TYPE"
 
static const std::string BASE_PATH_KEY = "BASE_PATH"
 
static const std::string FILE_PATH_KEY = "FILE_PATH"
 
static const std::string REGEX_PATH_FILTER_KEY = "REGEX_PATH_FILTER"
 
static const std::string LOCAL_FILE_STORAGE_TYPE = "LOCAL_FILE"
 
static const std::string S3_STORAGE_TYPE = "AWS_S3"
 
static const std::string FILE_SORT_ORDER_BY_KEY = shared::FILE_SORT_ORDER_BY_KEY
 
static const std::string FILE_SORT_REGEX_KEY = shared::FILE_SORT_REGEX_KEY
 
static const std::array
< std::string, 1 > 
supported_storage_types
 
- Static Protected Member Functions inherited from foreign_storage::AbstractFileStorageDataWrapper
static std::string getFullFilePath (const ForeignTable *foreign_table)
 Returns the path to the source file/dir of the table. Depending on options this may result from a concatenation of server and table path options. More...
 

Detailed Description

Definition at line 42 of file ParquetImporter.h.

Constructor & Destructor Documentation

foreign_storage::ParquetImporter::ParquetImporter ( )

Definition at line 213 of file ParquetImporter.cpp.

213 : db_id_(-1), foreign_table_(nullptr) {}
const ForeignTable * foreign_table_
foreign_storage::ParquetImporter::ParquetImporter ( const int  db_id,
const ForeignTable * foreign_table,
const UserMapping * user_mapping 
)

Definition at line 215 of file ParquetImporter.cpp.

References file_system_, foreign_storage::ForeignTable::foreign_server, foreign_storage::AbstractFileStorageDataWrapper::LOCAL_FILE_STORAGE_TYPE, foreign_storage::OptionsContainer::options, foreign_storage::AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY, and UNREACHABLE.

218  : db_id_(db_id)
219  , foreign_table_(foreign_table)
220  , schema_(std::make_unique<ForeignTableSchema>(db_id, foreign_table))
221  , file_reader_cache_(std::make_unique<FileReaderMap>()) {
222  auto& server_options = foreign_table->foreign_server->options;
223  if (server_options.find(STORAGE_TYPE_KEY)->second == LOCAL_FILE_STORAGE_TYPE) {
224  file_system_ = std::make_shared<arrow::fs::LocalFileSystem>();
225  } else {
226  UNREACHABLE();
227  }
228 }
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::unique_ptr< ForeignTableSchema > schema_
#define UNREACHABLE()
Definition: Logger.h:253
const ForeignTable * foreign_table_
std::unique_ptr< FileReaderMap > file_reader_cache_

Member Function Documentation

std::set< std::string > foreign_storage::ParquetImporter::getAllFilePaths ( )
private

Definition at line 230 of file ParquetImporter.cpp.

References CHECK_EQ, DEBUG_TIMER, file_system_, foreign_table_, foreign_storage::AbstractFileStorageDataWrapper::getFullFilePath(), foreign_storage::throw_file_access_error(), and foreign_storage::throw_file_not_found_error().

Referenced by getNextImportBatch().

230  {
231  auto timer = DEBUG_TIMER(__func__);
232  std::set<std::string> file_paths;
233  auto file_path = getFullFilePath(foreign_table_);
234  auto file_info_result = file_system_->GetFileInfo(file_path);
235  if (!file_info_result.ok()) {
236  throw_file_access_error(file_path, file_info_result.status().message());
237  } else {
238  auto& file_info = file_info_result.ValueOrDie();
239  if (file_info.type() == arrow::fs::FileType::NotFound) {
240  throw_file_not_found_error(file_path);
241  } else if (file_info.type() == arrow::fs::FileType::File) {
242  file_paths.emplace(file_path);
243  } else {
244  CHECK_EQ(arrow::fs::FileType::Directory, file_info.type());
245  arrow::fs::FileSelector file_selector{};
246  file_selector.base_dir = file_path;
247  file_selector.recursive = true;
248  auto selector_result = file_system_->GetFileInfo(file_selector);
249  if (!selector_result.ok()) {
250  throw_file_access_error(file_path, selector_result.status().message());
251  } else {
252  auto& file_info_vector = selector_result.ValueOrDie();
253  for (const auto& file_info : file_info_vector) {
254  if (file_info.type() == arrow::fs::FileType::File) {
255  file_paths.emplace(file_info.path());
256  }
257  }
258  }
259  }
260  }
261  return file_paths;
262 }
#define CHECK_EQ(x, y)
Definition: Logger.h:217
std::shared_ptr< arrow::fs::FileSystem > file_system_
void throw_file_access_error(const std::string &file_path, const std::string &message)
void throw_file_not_found_error(const std::string &file_path)
const ForeignTable * foreign_table_
#define DEBUG_TIMER(name)
Definition: Logger.h:352
static std::string getFullFilePath(const ForeignTable *foreign_table)
Returns the path to the source file/dir of the table. Depending on options this may result from a con...

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

ParallelismLevel foreign_storage::ParquetImporter::getCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when a cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 63 of file ParquetImporter.h.

References UNREACHABLE.

63  {
64  UNREACHABLE();
65  return {};
66  }
#define UNREACHABLE()
Definition: Logger.h:253
std::unique_ptr< import_export::ImportBatchResult > foreign_storage::ParquetImporter::getNextImportBatch ( )

Produce the next ImportBatchResult for import. This is the only functionality of ParquetImporter that is required to be implemented.

Returns
an ImportBatchResult for import.

Definition at line 283 of file ParquetImporter.cpp.

References db_id_, file_reader_cache_, file_system_, foreign_table_, getAllFilePaths(), row_group_interval_tracker_, schema_, and string_dictionaries_per_column_.

283  {
285  row_group_interval_tracker_ = std::make_unique<RowGroupIntervalTracker>(
287  }
288 
289  auto import_batch_result =
290  std::make_unique<ParquetImportBatchResult>(foreign_table_, db_id_, schema_.get());
291  auto [chunks, string_dictionaries] = import_batch_result->getChunksAndDictionaries();
292  if (!string_dictionaries_per_column_.size()) {
293  for (const auto& [column_id, dict] : string_dictionaries) {
294  string_dictionaries_per_column_.emplace_back(chunks[column_id].getColumnDesc(),
295  dict);
296  }
297  }
298 
299  LazyParquetChunkLoader chunk_loader(file_system_, file_reader_cache_.get());
300 
301  auto next_row_group = row_group_interval_tracker_->getNextRowGroupInterval();
302  size_t num_rows_completed, num_rows_rejected;
303  if (next_row_group.has_value()) {
304  std::tie(num_rows_completed, num_rows_rejected) = chunk_loader.loadRowGroups(
305  *next_row_group, chunks, *schema_, string_dictionaries);
306  } else {
307  return import_batch_result; // terminate without populating data, read the last row
308  // group
309  }
310 
311  import_batch_result->populateImportStatus(num_rows_completed, num_rows_rejected);
312  import_batch_result->populateInsertData(chunks);
313 
314  return import_batch_result;
315 }
std::set< std::string > getAllFilePaths()
std::shared_ptr< arrow::fs::FileSystem > file_system_
std::unique_ptr< AbstractRowGroupIntervalTracker > row_group_interval_tracker_
std::unique_ptr< ForeignTableSchema > schema_
const ForeignTable * foreign_table_
std::unique_ptr< FileReaderMap > file_reader_cache_
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_

+ Here is the call graph for this function:

ParallelismLevel foreign_storage::ParquetImporter::getNonCachedParallelismLevel ( ) const
inlineoverridevirtual

Gets the desired level of parallelism for the data wrapper when no cache is in use. This affects the optional buffers that the data wrapper is made aware of during data requests.

Reimplemented from foreign_storage::ForeignDataWrapper.

Definition at line 68 of file ParquetImporter.h.

References UNREACHABLE.

68  {
69  UNREACHABLE();
70  return {};
71  }
#define UNREACHABLE()
Definition: Logger.h:253
std::string foreign_storage::ParquetImporter::getSerializedDataWrapper ( ) const
overridevirtual

Serialize the internal state of the data wrapper and return it as a string, if implemented.

Implements foreign_storage::ForeignDataWrapper.

Definition at line 273 of file ParquetImporter.cpp.

References UNREACHABLE.

273  {
274  UNREACHABLE();
275  return {};
276 }
#define UNREACHABLE()
Definition: Logger.h:253
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > foreign_storage::ParquetImporter::getStringDictionaries ( ) const

Return string dictionaries that are used per column.

Returns
a vector of (ColumnDescriptor, StringDictionary) pairs

Definition at line 279 of file ParquetImporter.cpp.

References string_dictionaries_per_column_.

279  {
281 }
std::vector< std::pair< const ColumnDescriptor *, StringDictionary * > > string_dictionaries_per_column_
bool foreign_storage::ParquetImporter::isRestored ( ) const
overridevirtual

Implements foreign_storage::ForeignDataWrapper.

Definition at line 323 of file ParquetImporter.cpp.

References UNREACHABLE.

323  {
324  UNREACHABLE();
325  return {};
326 }
#define UNREACHABLE()
Definition: Logger.h:253
void foreign_storage::ParquetImporter::populateChunkBuffers ( const ChunkToBufferMap & required_buffers,
const ChunkToBufferMap & optional_buffers 
)
overridevirtual

Populates given chunk buffers identified by chunk keys. All provided chunk buffers are expected to be for the same fragment.

Parameters
required_buffers - chunk buffers that must always be populated
optional_buffers - chunk buffers that can optionally be populated if the data wrapper has to scan through chunk data anyway (typically for row-wise data formats)

Implements foreign_storage::ForeignDataWrapper.

Definition at line 268 of file ParquetImporter.cpp.

References UNREACHABLE.

269  {
270  UNREACHABLE();
271 }
#define UNREACHABLE()
Definition: Logger.h:253
void foreign_storage::ParquetImporter::populateChunkMetadata ( ChunkMetadataVector & chunk_metadata_vector)
overridevirtual

Populates given chunk metadata vector with metadata for all chunks in related foreign table.

Parameters
chunk_metadata_vector - vector that will be populated with chunk metadata

Implements foreign_storage::ForeignDataWrapper.

Definition at line 264 of file ParquetImporter.cpp.

References UNREACHABLE.

264  {
265  UNREACHABLE();
266 }
#define UNREACHABLE()
Definition: Logger.h:253
void foreign_storage::ParquetImporter::restoreDataWrapperInternals ( const std::string &  file_path,
const ChunkMetadataVector & chunk_metadata 
)
overridevirtual

Restore the internal state of the data wrapper.

Parameters
file_path - location of the file created by serializeMetadata
chunk_metadata_vector - vector of chunk metadata recovered from disk

Implements foreign_storage::ForeignDataWrapper.

Definition at line 317 of file ParquetImporter.cpp.

References UNREACHABLE.

319  {
320  UNREACHABLE();
321 }
#define UNREACHABLE()
Definition: Logger.h:253

Member Data Documentation

const int foreign_storage::ParquetImporter::db_id_
private

Definition at line 90 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::unique_ptr<FileReaderMap> foreign_storage::ParquetImporter::file_reader_cache_
private

Definition at line 99 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::shared_ptr<arrow::fs::FileSystem> foreign_storage::ParquetImporter::file_system_
private

Definition at line 98 of file ParquetImporter.h.

Referenced by getAllFilePaths(), getNextImportBatch(), and ParquetImporter().

const ForeignTable* foreign_storage::ParquetImporter::foreign_table_
private

Definition at line 91 of file ParquetImporter.h.

Referenced by getAllFilePaths(), and getNextImportBatch().

std::unique_ptr<AbstractRowGroupIntervalTracker> foreign_storage::ParquetImporter::row_group_interval_tracker_
private

Definition at line 95 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::unique_ptr<ForeignTableSchema> foreign_storage::ParquetImporter::schema_
private

Definition at line 97 of file ParquetImporter.h.

Referenced by getNextImportBatch().

std::vector<std::pair<const ColumnDescriptor*, StringDictionary*> > foreign_storage::ParquetImporter::string_dictionaries_per_column_
private

Definition at line 101 of file ParquetImporter.h.

Referenced by getNextImportBatch(), and getStringDictionaries().


The documentation for this class was generated from the following files: