OmniSciDB  95562058bd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::LocalMultiFileReader Class Reference

#include <CsvReader.h>

+ Inheritance diagram for foreign_storage::LocalMultiFileReader:
+ Collaboration diagram for foreign_storage::LocalMultiFileReader:

Public Member Functions

 LocalMultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 LocalMultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
void checkForMoreRows (size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
- Public Member Functions inherited from foreign_storage::MultiFileReader
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t getRemainingSize () override
 
bool isRemainingSizeKnown () override
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
- Public Member Functions inherited from foreign_storage::CsvReader
 CsvReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~CsvReader ()=default
 

Private Member Functions

void insertFile (std::string location)
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::MultiFileReader
std::vector< std::unique_ptr
< CsvReader > > 
files_
 
std::vector< std::string > file_locations_
 
std::vector< size_t > cumulative_sizes_
 
size_t current_index_
 
size_t current_offset_
 
- Protected Attributes inherited from foreign_storage::CsvReader
import_export::CopyParams copy_params_
 
std::string file_path_
 

Detailed Description

Definition at line 325 of file CsvReader.h.

Constructor & Destructor Documentation

foreign_storage::LocalMultiFileReader::LocalMultiFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params 
)

Definition at line 529 of file CsvReader.cpp.

References insertFile().

531  : MultiFileReader(file_path, copy_params) {
532  std::set<std::string> file_locations;
533  if (boost::filesystem::is_directory(file_path)) {
534  // Find all files in this directory
535  for (boost::filesystem::recursive_directory_iterator it(file_path), eit; it != eit;
536  ++it) {
537  if (!boost::filesystem::is_directory(it->path())) {
538  file_locations.insert(it->path().string());
539  }
540  }
541  } else {
542  file_locations.insert(file_path);
543  }
544  for (const auto& location : file_locations) {
545  insertFile(location);
546  }
547 }
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:476
void insertFile(std::string location)
Definition: CsvReader.cpp:589

+ Here is the call graph for this function:

foreign_storage::LocalMultiFileReader::LocalMultiFileReader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const rapidjson::Value &  value 
)

Definition at line 569 of file CsvReader.cpp.

References foreign_storage::CsvReader::copy_params_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::MultiFileReader::files_, and foreign_storage::anonymous_namespace{CsvReader.cpp}::is_compressed_file().

572  : MultiFileReader(file_path, copy_params, value) {
573  // Constructs file from files_metadata
574  for (size_t index = 0; index < file_locations_.size(); index++) {
575  if (is_compressed_file(file_locations_[index])) {
576  files_.emplace_back(std::make_unique<CompressedFileReader>(
577  file_locations_[index],
578  copy_params_,
579  value["files_metadata"].GetArray()[index]));
580  } else {
581  files_.emplace_back(
582  std::make_unique<SingleFileReader>(file_locations_[index],
583  copy_params_,
584  value["files_metadata"].GetArray()[index]));
585  }
586  }
587 }
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:476
bool is_compressed_file(const std::string &location)
Definition: CsvReader.cpp:550
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
std::vector< std::string > file_locations_
Definition: CsvReader.h:314

+ Here is the call graph for this function:

Member Function Documentation

void foreign_storage::LocalMultiFileReader::checkForMoreRows ( size_t  file_offset,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)
overridevirtual

Rescan the target files. Throws an exception if the rescan fails (i.e., files are not in a valid appended state or not supported).

Parameters
file_offset- where to resume the scan from (end of the last row), as not all of the bytes may have been consumed by the upstream component
server_options- only needed for S3 backed CSV
user_mapping- only needed for S3 backed CSV

Reimplemented from foreign_storage::CsvReader.

Definition at line 603 of file CsvReader.cpp.

References CHECK, foreign_storage::MultiFileReader::cumulative_sizes_, foreign_storage::MultiFileReader::current_index_, foreign_storage::MultiFileReader::current_offset_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::CsvReader::file_path_, foreign_storage::MultiFileReader::files_, insertFile(), foreign_storage::MultiFileReader::isScanFinished(), and foreign_storage::throw_removed_file_error().

605  {
606  // Look for new files
607  std::set<std::string> new_locations;
609  CHECK(file_offset == current_offset_);
610  if (boost::filesystem::is_directory(file_path_)) {
611  // Find all files in this directory
612  std::set<std::string> all_file_paths;
613  for (boost::filesystem::recursive_directory_iterator it(file_path_), eit; it != eit;
614  ++it) {
615  bool new_file =
616  std::find(file_locations_.begin(), file_locations_.end(), it->path()) ==
617  file_locations_.end();
618  if (!boost::filesystem::is_directory(it->path()) && new_file) {
619  new_locations.insert(it->path().string());
620  }
621  all_file_paths.emplace(it->path().string());
622  }
623 
624  for (const auto& file_path : file_locations_) {
625  if (all_file_paths.find(file_path) == all_file_paths.end()) {
626  throw_removed_file_error(file_path);
627  }
628  }
629  }
630  if (new_locations.size() > 0) {
631  for (const auto& location : new_locations) {
632  insertFile(location);
633  }
634  } else if (files_.size() == 1) {
635  // Single file, check if it has new data
636  files_[0].get()->checkForMoreRows(file_offset);
637  if (!files_[0].get()->isScanFinished()) {
638  current_index_ = 0;
639  cumulative_sizes_ = {};
640  }
641  }
642 }
void insertFile(std::string location)
Definition: CsvReader.cpp:589
void throw_removed_file_error(const std::string &file_path)
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
#define CHECK(condition)
Definition: Logger.h:197
bool isScanFinished() override
Definition: CsvReader.h:307
std::vector< std::string > file_locations_
Definition: CsvReader.h:314
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:317

+ Here is the call graph for this function:

void foreign_storage::LocalMultiFileReader::insertFile ( std::string  location)
private

Definition at line 589 of file CsvReader.cpp.

References foreign_storage::CsvReader::copy_params_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::MultiFileReader::files_, and foreign_storage::anonymous_namespace{CsvReader.cpp}::is_compressed_file().

Referenced by checkForMoreRows(), and LocalMultiFileReader().

589  {
590  if (is_compressed_file(location)) {
591  files_.emplace_back(std::make_unique<CompressedFileReader>(location, copy_params_));
592  } else {
593  files_.emplace_back(std::make_unique<SingleFileReader>(location, copy_params_));
594  }
595  if (files_.back()->isScanFinished()) {
596  // skip any initially empty files
597  files_.pop_back();
598  } else {
599  file_locations_.push_back(location);
600  }
601 }
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
bool is_compressed_file(const std::string &location)
Definition: CsvReader.cpp:550
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
std::vector< std::string > file_locations_
Definition: CsvReader.h:314

+ Here is the call graph for this function:

+ Here is the caller graph for this function:


The documentation for this class was generated from the following files: