OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::LocalMultiFileReader Class Reference

#include <FileReader.h>

+ Inheritance diagram for foreign_storage::LocalMultiFileReader:
+ Collaboration diagram for foreign_storage::LocalMultiFileReader:

Public Member Functions

 LocalMultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const shared::FilePathOptions &options, const std::optional< size_t > &max_file_count)
 
 LocalMultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
void checkForMoreRows (size_t file_offset, const shared::FilePathOptions &options, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
- Public Member Functions inherited from foreign_storage::MultiFileReader
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t getRemainingSize () override
 
bool isRemainingSizeKnown () override
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () const override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
FirstLineByFilePath getFirstLineForEachFile () const override
 
bool isEndOfLastFile () override
 
std::string getCurrentFilePath () const override
 
virtual std::set< std::string > checkForRolledOffFiles (const shared::FilePathOptions &file_path_options)
 
- Public Member Functions inherited from foreign_storage::FileReader
 FileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~FileReader ()=default
 

Private Member Functions

std::vector< std::string > getAllFilePaths (const shared::FilePathOptions &file_path_options) const override
 
void insertFile (std::string location)
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::MultiFileReader
std::vector< std::unique_ptr< FileReader > > files_
 
std::vector< std::string > file_locations_
 
std::vector< size_t > cumulative_sizes_
 
size_t current_index_
 
size_t current_offset_
 
size_t starting_offset_
 
bool is_end_of_last_file_
 
- Protected Attributes inherited from foreign_storage::FileReader
import_export::CopyParams copy_params_
 
std::string file_path_
 

Detailed Description

Definition at line 396 of file FileReader.h.

Constructor & Destructor Documentation

foreign_storage::LocalMultiFileReader::LocalMultiFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const shared::FilePathOptions &  options,
const std::optional< size_t > &  max_file_count 
)

Definition at line 631 of file FileReader.cpp.

References insertFile(), and shared::local_glob_filter_sort_files().

635  : MultiFileReader(file_path, copy_params) {
636  auto file_paths = shared::local_glob_filter_sort_files(file_path, options);
637  if (max_file_count.has_value() && file_paths.size() > max_file_count.value()) {
638  file_paths.erase(file_paths.begin(),
639  file_paths.begin() + (file_paths.size() - max_file_count.value()));
640  }
641  for (const auto& file_path : file_paths) {
642  insertFile(file_path);
643  }
644 }
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:530
void insertFile(std::string location)
Definition: FileReader.cpp:666
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)

+ Here is the call graph for this function:

foreign_storage::LocalMultiFileReader::LocalMultiFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const rapidjson::Value &  value 
)

Definition at line 646 of file FileReader.cpp.

References foreign_storage::FileReader::copy_params_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::MultiFileReader::files_, and shared::is_compressed_file_extension().

649  : MultiFileReader(file_path, copy_params, value) {
650  // Constructs file from files_metadata
651  for (size_t index = 0; index < file_locations_.size(); index++) {
652  if (shared::is_compressed_file_extension(file_locations_[index])) {
653  files_.emplace_back(std::make_unique<CompressedFileReader>(
654  file_locations_[index],
655  copy_params_,
656  value["files_metadata"].GetArray()[index]));
657  } else {
658  files_.emplace_back(std::make_unique<SingleTextFileReader>(
659  file_locations_[index],
660  copy_params_,
661  value["files_metadata"].GetArray()[index]));
662  }
663  }
664 }
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:530
import_export::CopyParams copy_params_
Definition: FileReader.h:128
std::vector< std::string > file_locations_
Definition: FileReader.h:381
bool is_compressed_file_extension(const std::string &location)
Definition: file_type.cpp:49
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:380

+ Here is the call graph for this function:

Member Function Documentation

void foreign_storage::LocalMultiFileReader::checkForMoreRows ( size_t  file_offset,
const shared::FilePathOptions &  options,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)
overridevirtual

Rescan the target files. Throws an exception if the rescan fails (i.e., the files are not in a valid appended state or are not supported).

Parameters
file_offset - where to resume the scan from (end of the last row), as not all of the bytes may have been consumed by the upstream component
server_options- only needed for S3 backed files
user_mapping- only needed for S3 backed files

Reimplemented from foreign_storage::FileReader.

Definition at line 680 of file FileReader.cpp.

References CHECK, CHECK_GT, shared::contains(), foreign_storage::MultiFileReader::cumulative_sizes_, foreign_storage::MultiFileReader::current_index_, foreign_storage::MultiFileReader::current_offset_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::FileReader::file_path_, foreign_storage::MultiFileReader::files_, getAllFilePaths(), insertFile(), foreign_storage::MultiFileReader::is_end_of_last_file_, foreign_storage::MultiFileReader::isScanFinished(), foreign_storage::MultiFileReader::starting_offset_, and foreign_storage::throw_removed_file_error().

684  {
685  // Look for new files
686  std::set<std::string> new_locations;
687  CHECK(isScanFinished());
688  CHECK(file_offset == current_offset_);
689  if (boost::filesystem::is_directory(file_path_)) {
690  // Find all files in this directory
691  auto all_file_paths = getAllFilePaths(file_path_options);
692  for (const auto& path : all_file_paths) {
693  if (!shared::contains(file_locations_, path)) {
694  new_locations.insert(path);
695  }
696  }
697 
698  for (const auto& file_path : file_locations_) {
699  if (!shared::contains(all_file_paths, file_path)) {
700  throw_removed_file_error(file_path);
701  }
702  }
703  }
704 
705  if (!files_.empty()) {
706  // Check if last file has new data
707  size_t base = starting_offset_;
708  CHECK_GT(current_index_, size_t(0));
709  auto last_file_index = current_index_ - 1;
710  if (last_file_index > 0) {
711  base = cumulative_sizes_[last_file_index - 1];
712  }
713  files_.back()->checkForMoreRows(current_offset_ - base, file_path_options);
714  if (!files_.back()->isScanFinished()) {
715  // Go back to the last file, if more rows are found.
716  current_index_ = last_file_index;
717  is_end_of_last_file_ = false;
718  cumulative_sizes_.pop_back();
719  }
720  }
721 
722  if (new_locations.size() > 0) {
723  for (const auto& location : new_locations) {
724  insertFile(location);
725  }
726  }
727 }
bool contains(const T &container, const U &element)
Definition: misc.h:195
std::vector< std::string > getAllFilePaths(const shared::FilePathOptions &file_path_options) const override
Definition: FileReader.cpp:729
#define CHECK_GT(x, y)
Definition: Logger.h:305
void insertFile(std::string location)
Definition: FileReader.cpp:666
void throw_removed_file_error(const std::string &file_path)
bool isScanFinished() const override
Definition: FileReader.h:362
#define CHECK(condition)
Definition: Logger.h:291
std::vector< std::string > file_locations_
Definition: FileReader.h:381
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:380
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:384

+ Here is the call graph for this function:

std::vector< std::string > foreign_storage::LocalMultiFileReader::getAllFilePaths ( const shared::FilePathOptions &  file_path_options) const
overrideprivatevirtual

Implements foreign_storage::MultiFileReader.

Definition at line 729 of file FileReader.cpp.

References foreign_storage::FileReader::file_path_, and shared::local_glob_filter_sort_files().

Referenced by checkForMoreRows().

730  {
731  return shared::local_glob_filter_sort_files(file_path_, file_path_options);
732 }
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const FilePathOptions &options, const bool recurse)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::LocalMultiFileReader::insertFile ( std::string  location)
private

Definition at line 666 of file FileReader.cpp.

References foreign_storage::FileReader::copy_params_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::MultiFileReader::files_, and shared::is_compressed_file_extension().

Referenced by checkForMoreRows(), and LocalMultiFileReader().

666  {
667  if (shared::is_compressed_file_extension(location)) {
668  files_.emplace_back(std::make_unique<CompressedFileReader>(location, copy_params_));
669  } else {
670  files_.emplace_back(std::make_unique<SingleTextFileReader>(location, copy_params_));
671  }
672  if (files_.back()->isScanFinished()) {
673  // skip any initially empty files
674  files_.pop_back();
675  } else {
676  file_locations_.push_back(location);
677  }
678 }
import_export::CopyParams copy_params_
Definition: FileReader.h:128
std::vector< std::string > file_locations_
Definition: FileReader.h:381
bool is_compressed_file_extension(const std::string &location)
Definition: file_type.cpp:49
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:380

+ Here is the call graph for this function:

+ Here is the caller graph for this function:


The documentation for this class was generated from the following files: