OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::LocalMultiFileReader Class Reference

#include <FileReader.h>

+ Inheritance diagram for foreign_storage::LocalMultiFileReader:
+ Collaboration diagram for foreign_storage::LocalMultiFileReader:

Public Member Functions

 LocalMultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const std::optional< std::string > &regex_path_filter, const std::optional< std::string > &file_sort_order_by, const std::optional< std::string > &file_sort_regex)
 
 LocalMultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
void checkForMoreRows (size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
- Public Member Functions inherited from foreign_storage::MultiFileReader
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t getRemainingSize () override
 
bool isRemainingSizeKnown () override
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
FirstLineByFilePath getFirstLineForEachFile () const override
 
bool isEndOfLastFile () override
 
- Public Member Functions inherited from foreign_storage::FileReader
 FileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~FileReader ()=default
 

Private Member Functions

void insertFile (std::string location)
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::MultiFileReader
std::vector< std::unique_ptr
< FileReader > > 
files_
 
std::vector< std::string > file_locations_
 
std::vector< size_t > cumulative_sizes_
 
size_t current_index_
 
size_t current_offset_
 
bool is_end_of_last_file_
 
- Protected Attributes inherited from foreign_storage::FileReader
import_export::CopyParams copy_params_
 
std::string file_path_
 

Detailed Description

Definition at line 371 of file FileReader.h.

Constructor & Destructor Documentation

foreign_storage::LocalMultiFileReader::LocalMultiFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const std::optional< std::string > &  regex_path_filter,
const std::optional< std::string > &  file_sort_order_by,
const std::optional< std::string > &  file_sort_regex 
)

Definition at line 590 of file FileReader.cpp.

References insertFile(), and shared::local_glob_filter_sort_files().

596  : MultiFileReader(file_path, copy_params) {
597  auto found_file_locations = shared::local_glob_filter_sort_files(
598  file_path, regex_path_filter, file_sort_order_by, file_sort_regex);
599  for (const auto& location : found_file_locations) {
600  insertFile(location);
601  }
602 }
std::vector< std::string > local_glob_filter_sort_files(const std::string &file_path, const std::optional< std::string > &filter_regex, const std::optional< std::string > &sort_by, const std::optional< std::string > &sort_regex)
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:519
void insertFile(std::string location)
Definition: FileReader.cpp:632

+ Here is the call graph for this function:

foreign_storage::LocalMultiFileReader::LocalMultiFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const rapidjson::Value &  value 
)

Definition at line 612 of file FileReader.cpp.

References foreign_storage::FileReader::copy_params_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::MultiFileReader::files_, and foreign_storage::anonymous_namespace{FileReader.cpp}::is_compressed_file().

615  : MultiFileReader(file_path, copy_params, value) {
616  // Constructs file from files_metadata
617  for (size_t index = 0; index < file_locations_.size(); index++) {
618  if (is_compressed_file(file_locations_[index])) {
619  files_.emplace_back(std::make_unique<CompressedFileReader>(
620  file_locations_[index],
621  copy_params_,
622  value["files_metadata"].GetArray()[index]));
623  } else {
624  files_.emplace_back(std::make_unique<SingleTextFileReader>(
625  file_locations_[index],
626  copy_params_,
627  value["files_metadata"].GetArray()[index]));
628  }
629  }
630 }
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: FileReader.cpp:519
import_export::CopyParams copy_params_
Definition: FileReader.h:118
bool is_compressed_file(const std::string &location)
Definition: FileReader.cpp:605
std::vector< std::string > file_locations_
Definition: FileReader.h:358
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:357

+ Here is the call graph for this function:

Member Function Documentation

void foreign_storage::LocalMultiFileReader::checkForMoreRows ( size_t  file_offset,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)
overridevirtual

Rescan the target files. Throws an exception if the rescan fails (i.e., the files are not in a valid appended state or are not supported).

Parameters
file_offset - where to resume the scan from (end of the last row), as not all of the bytes may have been consumed by the upstream component
server_options - only needed for S3 backed files
user_mapping - only needed for S3 backed files

Reimplemented from foreign_storage::FileReader.

Definition at line 646 of file FileReader.cpp.

References CHECK, foreign_storage::MultiFileReader::cumulative_sizes_, foreign_storage::MultiFileReader::current_index_, foreign_storage::MultiFileReader::current_offset_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::FileReader::file_path_, foreign_storage::MultiFileReader::files_, insertFile(), foreign_storage::MultiFileReader::isScanFinished(), and foreign_storage::throw_removed_file_error().

648  {
649  // Look for new files
650  std::set<std::string> new_locations;
652  CHECK(file_offset == current_offset_);
653  if (boost::filesystem::is_directory(file_path_)) {
654  // Find all files in this directory
655  std::set<std::string> all_file_paths;
656  for (boost::filesystem::recursive_directory_iterator
657  it(file_path_, boost::filesystem::symlink_option::recurse),
658  eit;
659  it != eit;
660  ++it) {
661  bool new_file =
662  std::find(file_locations_.begin(), file_locations_.end(), it->path()) ==
663  file_locations_.end();
664  if (!boost::filesystem::is_directory(it->path()) && new_file) {
665  new_locations.insert(it->path().string());
666  }
667  all_file_paths.emplace(it->path().string());
668  }
669 
670  for (const auto& file_path : file_locations_) {
671  if (all_file_paths.find(file_path) == all_file_paths.end()) {
672  throw_removed_file_error(file_path);
673  }
674  }
675  }
676  if (new_locations.size() > 0) {
677  for (const auto& location : new_locations) {
678  insertFile(location);
679  }
680  } else if (files_.size() == 1) {
681  // Single file, check if it has new data
682  files_[0].get()->checkForMoreRows(file_offset);
683  if (!files_[0].get()->isScanFinished()) {
684  current_index_ = 0;
685  cumulative_sizes_ = {};
686  }
687  }
688 }
void insertFile(std::string location)
Definition: FileReader.cpp:632
void throw_removed_file_error(const std::string &file_path)
#define CHECK(condition)
Definition: Logger.h:209
std::vector< std::string > file_locations_
Definition: FileReader.h:358
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:357
std::vector< size_t > cumulative_sizes_
Definition: FileReader.h:361

+ Here is the call graph for this function:

void foreign_storage::LocalMultiFileReader::insertFile ( std::string  location)
private

Definition at line 632 of file FileReader.cpp.

References foreign_storage::FileReader::copy_params_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::MultiFileReader::files_, and foreign_storage::anonymous_namespace{FileReader.cpp}::is_compressed_file().

Referenced by checkForMoreRows(), and LocalMultiFileReader().

632  {
633  if (is_compressed_file(location)) {
634  files_.emplace_back(std::make_unique<CompressedFileReader>(location, copy_params_));
635  } else {
636  files_.emplace_back(std::make_unique<SingleTextFileReader>(location, copy_params_));
637  }
638  if (files_.back()->isScanFinished()) {
639  // skip any initially empty files
640  files_.pop_back();
641  } else {
642  file_locations_.push_back(location);
643  }
644 }
import_export::CopyParams copy_params_
Definition: FileReader.h:118
bool is_compressed_file(const std::string &location)
Definition: FileReader.cpp:605
std::vector< std::string > file_locations_
Definition: FileReader.h:358
std::vector< std::unique_ptr< FileReader > > files_
Definition: FileReader.h:357

+ Here is the call graph for this function:

+ Here is the caller graph for this function:


The documentation for this class was generated from the following files: