OmniSciDB  2e3a973ef4
foreign_storage::MultiS3Reader Class Reference

#include <CsvReaderS3.h>

+ Inheritance diagram for foreign_storage::MultiS3Reader:
+ Collaboration diagram for foreign_storage::MultiS3Reader:

Public Member Functions

 MultiS3Reader (const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server_options, const UserMapping *user_mapping)
 
 MultiS3Reader (const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server_options, const UserMapping *user_mapping, const rapidjson::Value &value)
 
void checkForMoreRows (size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
- Public Member Functions inherited from foreign_storage::MultiFileReader
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
 MultiFileReader (const std::string &file_path, const import_export::CopyParams &copy_params, const rapidjson::Value &value)
 
size_t getRemainingSize () override
 
bool isRemainingSizeKnown () override
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () override
 
- Public Member Functions inherited from foreign_storage::CsvReader
 CsvReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~CsvReader ()=default
 

Private Member Functions

void skipHeader ()
 

Private Attributes

size_t file_size_
 
bool scan_finished_
 
std::unique_ptr< Aws::S3::S3Client > s3_client_
 
std::vector< size_t > file_sizes_
 
std::string bucket_name_
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::MultiFileReader
std::vector< std::unique_ptr< CsvReader > > files_
 
std::vector< std::string > file_locations_
 
std::vector< size_t > cumulative_sizes_
 
size_t current_index_
 
size_t current_offset_
 
- Protected Attributes inherited from foreign_storage::CsvReader
import_export::CopyParams copy_params_
 
std::string file_path_
 

Detailed Description

Definition at line 28 of file CsvReaderS3.h.

Constructor & Destructor Documentation

◆ MultiS3Reader() [1/2]

foreign_storage::MultiS3Reader::MultiS3Reader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)

Definition at line 262 of file CsvReaderS3.cpp.

References bucket_name_, foreign_storage::MultiFileReader::file_locations_, file_sizes_, foreign_storage::MultiFileReader::files_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_credentials(), foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_s3_config(), foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::list_files_s3(), foreign_storage::OptionsContainer::options, and s3_client_.

266  : MultiFileReader(prefix_name, copy_params) {
267  auto credentials = get_credentials(user_mapping);
268  auto config = get_s3_config(foreign_server);
269  s3_client_.reset(new Aws::S3::S3Client(credentials, config));
270  bucket_name_ = foreign_server->options.find(ForeignServer::S3_BUCKET_KEY)->second;
271  std::set<S3FileInfo> file_info_set;
272  list_files_s3(s3_client_, prefix_name, bucket_name_, file_info_set);
273  for (const auto& file_info : file_info_set) {
274  files_.emplace_back(std::make_unique<CsvReaderS3>(
275  file_info.first, file_info.second, copy_params, foreign_server, user_mapping));
276  file_locations_.push_back(file_info.first);
277  file_sizes_.push_back(file_info.second);
278  }
279 }
Aws::Client::ClientConfiguration get_s3_config(const ForeignServer *server_options)
Definition: CsvReaderS3.cpp:29
std::vector< size_t > file_sizes_
Definition: CsvReaderS3.h:52
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:476
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:51
std::shared_ptr< Aws::Auth::AWSCredentialsProvider > get_credentials(const UserMapping *user_mapping)
Definition: CsvReaderS3.cpp:80
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
std::vector< std::string > file_locations_
Definition: CsvReader.h:314
void list_files_s3(std::unique_ptr< Aws::S3::S3Client > &s3_client, const std::string &prefix_name, const std::string &bucket_name, std::set< S3FileInfo > &file_info_set)
+ Here is the call graph for this function:

◆ MultiS3Reader() [2/2]

foreign_storage::MultiS3Reader::MultiS3Reader ( const std::string &  file_path,
const import_export::CopyParams &  copy_params,
const ForeignServer *  server_options,
const UserMapping *  user_mapping,
const rapidjson::Value &  value 
)

Definition at line 281 of file CsvReaderS3.cpp.

References bucket_name_, CHECK, foreign_storage::MultiFileReader::file_locations_, file_sizes_, foreign_storage::MultiFileReader::files_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_credentials(), foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_s3_config(), foreign_storage::json_utils::get_value_from_object(), foreign_storage::OptionsContainer::options, and s3_client_.

286  : MultiFileReader(file_path, copy_params, value) {
287  auto credentials = get_credentials(user_mapping);
288  auto config = get_s3_config(foreign_server);
289  s3_client_.reset(new Aws::S3::S3Client(credentials, config));
290  bucket_name_ = foreign_server->options.find(ForeignServer::S3_BUCKET_KEY)->second;
291  // reconstruct files from metadata
292  CHECK(value.HasMember("files_metadata"));
293  for (size_t index = 0; index < file_locations_.size(); index++) {
294  files_.emplace_back(
295  std::make_unique<CsvReaderS3>(file_locations_[index],
296  copy_params,
297  foreign_server,
298  user_mapping,
299  value["files_metadata"].GetArray()[index]));
300  }
301  json_utils::get_value_from_object(value, file_sizes_, "file_sizes");
302 }
Aws::Client::ClientConfiguration get_s3_config(const ForeignServer *server_options)
Definition: CsvReaderS3.cpp:29
std::vector< size_t > file_sizes_
Definition: CsvReaderS3.h:52
MultiFileReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.cpp:476
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:51
std::shared_ptr< Aws::Auth::AWSCredentialsProvider > get_credentials(const UserMapping *user_mapping)
Definition: CsvReaderS3.cpp:80
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
#define CHECK(condition)
Definition: Logger.h:197
std::vector< std::string > file_locations_
Definition: CsvReader.h:314
+ Here is the call graph for this function:

Member Function Documentation

◆ checkForMoreRows()

void foreign_storage::MultiS3Reader::checkForMoreRows ( size_t  file_offset,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)
override virtual

Rescan the target files. Throws an exception if the rescan fails (i.e., the files are not in a valid appended state or are not supported).

Parameters
file_offset - where to resume the scan from (end of the last row), as not all of the bytes may have been consumed by the upstream component
server_options - only needed for S3-backed CSV
user_mapping - only needed for S3-backed CSV

Reimplemented from foreign_storage::CsvReader.

Definition at line 310 of file CsvReaderS3.cpp.

References bucket_name_, CHECK, foreign_storage::CsvReader::copy_params_, foreign_storage::MultiFileReader::cumulative_sizes_, foreign_storage::MultiFileReader::current_index_, foreign_storage::MultiFileReader::current_offset_, foreign_storage::MultiFileReader::file_locations_, foreign_storage::CsvReader::file_path_, file_sizes_, foreign_storage::MultiFileReader::files_, foreign_storage::CsvReaderS3::increaseFileSize(), foreign_storage::MultiFileReader::isScanFinished(), foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::list_files_s3(), and s3_client_.

312  {
314  CHECK(file_offset == current_offset_);
315  CHECK(foreign_server != nullptr);
316 
317  // Look for new files
318  std::set<S3FileInfo> file_info_set;
320  int new_files = 0;
321  for (const auto& file_info : file_info_set) {
322  if (std::find(file_locations_.begin(), file_locations_.end(), file_info.first) ==
323  file_locations_.end()) {
324  files_.emplace_back(std::make_unique<CsvReaderS3>(
325  file_info.first, file_info.second, copy_params_, foreign_server, user_mapping));
326  file_locations_.push_back(file_info.first);
327  new_files++;
328  }
329  }
330  // If no new files added and only one file in archive, check for new rows
331  if (new_files == 0 && files_.size() == 1) {
332  if (file_info_set.size() < 1 ||
333  find(file_locations_.begin(),
334  file_locations_.end(),
335  file_info_set.begin()->first) == file_locations_.end()) {
336  throw std::runtime_error{
337  "Foreign table refreshed with APPEND mode missing entry \"" +
338  file_locations_[0] + "\"."};
339  }
340  if (file_info_set.begin()->second < file_sizes_[0]) {
341  throw std::runtime_error{
342  "Refresh of foreign table created with APPEND update mode failed as remote "
343  "file "
344  "reduced in size: \"" +
345  file_locations_[0] + "\"."};
346  }
347 
348  if (file_info_set.begin()->second > file_sizes_[0]) {
349  CsvReaderS3* s3_reader = dynamic_cast<CsvReaderS3*>(files_[0].get());
350  CHECK(s3_reader != nullptr);
351  s3_reader->increaseFileSize(file_info_set.begin()->second);
352  file_sizes_[0] = file_info_set.begin()->second;
353  current_index_ = 0;
354  cumulative_sizes_ = {};
355  }
356  }
357 }
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
std::vector< size_t > file_sizes_
Definition: CsvReaderS3.h:52
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:51
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
#define CHECK(condition)
Definition: Logger.h:197
bool isScanFinished() override
Definition: CsvReader.h:307
std::vector< std::string > file_locations_
Definition: CsvReader.h:314
void list_files_s3(std::unique_ptr< Aws::S3::S3Client > &s3_client, const std::string &prefix_name, const std::string &bucket_name, std::set< S3FileInfo > &file_info_set)
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:317
+ Here is the call graph for this function:

◆ serialize()

void foreign_storage::MultiS3Reader::serialize ( rapidjson::Value &  value,
rapidjson::Document::AllocatorType &  allocator 
) const
override virtual

Serialize internal state to the given JSON object. This JSON will later be used to restore the reader state through a constructor. Must be called when isScanFinished() is true.

Parameters
value - JSON object to store needed state in; this function can store any needed data or none
allocator - allocator to use for JSON construction

Reimplemented from foreign_storage::MultiFileReader.

Definition at line 304 of file CsvReaderS3.cpp.

References foreign_storage::json_utils::add_value_to_object(), file_sizes_, and foreign_storage::MultiFileReader::serialize().

Referenced by foreign_storage::CsvReaderS3::isRemainingSizeKnown().

305  {
306  json_utils::add_value_to_object(value, file_sizes_, "file_sizes", allocator);
307  MultiFileReader::serialize(value, allocator);
308 };
std::vector< size_t > file_sizes_
Definition: CsvReaderS3.h:52
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:495
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ skipHeader()

void foreign_storage::MultiS3Reader::skipHeader ( )
private

Referenced by foreign_storage::CsvReaderS3::isRemainingSizeKnown().

+ Here is the caller graph for this function:

Member Data Documentation

◆ bucket_name_

std::string foreign_storage::MultiS3Reader::bucket_name_
private

Definition at line 53 of file CsvReaderS3.h.

Referenced by checkForMoreRows(), and MultiS3Reader().

◆ file_size_

size_t foreign_storage::MultiS3Reader::file_size_
private

Definition at line 48 of file CsvReaderS3.h.

Referenced by foreign_storage::CsvReaderS3::getRemainingSize().

◆ file_sizes_

std::vector<size_t> foreign_storage::MultiS3Reader::file_sizes_
private

Definition at line 52 of file CsvReaderS3.h.

Referenced by checkForMoreRows(), MultiS3Reader(), and serialize().

◆ s3_client_

std::unique_ptr<Aws::S3::S3Client> foreign_storage::MultiS3Reader::s3_client_
private

Definition at line 51 of file CsvReaderS3.h.

Referenced by checkForMoreRows(), and MultiS3Reader().

◆ scan_finished_

bool foreign_storage::MultiS3Reader::scan_finished_
private

Definition at line 50 of file CsvReaderS3.h.

Referenced by foreign_storage::CsvReaderS3::isScanFinished().


The documentation for this class was generated from the following files: