OmniSciDB  95562058bd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
foreign_storage::CsvReaderS3 Class Reference

#include <CsvReaderS3.h>

+ Inheritance diagram for foreign_storage::CsvReaderS3:
+ Collaboration diagram for foreign_storage::CsvReaderS3:

Public Member Functions

 CsvReaderS3 (const std::string &obj_key, size_t file_size, const import_export::CopyParams &copy_params, const ForeignServer *server_options, const UserMapping *user_mapping)
 
 CsvReaderS3 (const std::string &obj_key, const import_export::CopyParams &copy_params, const ForeignServer *server_options, const UserMapping *user_mapping, const rapidjson::Value &value)
 
size_t read (void *buffer, size_t max_size) override
 
size_t readRegion (void *buffer, size_t offset, size_t size) override
 
bool isScanFinished () override
 
size_t getRemainingSize () override
 
bool isRemainingSizeKnown () override
 
void serialize (rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
 
void increaseFileSize (size_t new_size)
 
- Public Member Functions inherited from foreign_storage::CsvReader
 CsvReader (const std::string &file_path, const import_export::CopyParams &copy_params)
 
virtual ~CsvReader ()=default
 
virtual void checkForMoreRows (size_t file_offset, const ForeignServer *server_options=nullptr, const UserMapping *user_mapping=nullptr)
 

Private Member Functions

void skipHeader ()
 

Private Attributes

size_t file_size_
 
bool scan_finished_
 
std::unique_ptr
< Aws::S3::S3Client > 
s3_client_
 
std::string obj_key_
 
std::string bucket_name_
 
import_export::CopyParams copy_params_
 
size_t current_offset_
 
size_t header_offset_
 

Additional Inherited Members

- Protected Attributes inherited from foreign_storage::CsvReader
import_export::CopyParams copy_params_
 
std::string file_path_
 

Detailed Description

Definition at line 56 of file CsvReaderS3.h.

Constructor & Destructor Documentation

foreign_storage::CsvReaderS3::CsvReaderS3 ( const std::string &  obj_key,
size_t  file_size,
const import_export::CopyParams &  copy_params,
const ForeignServer *  server_options,
const UserMapping *  user_mapping 
)

Definition at line 96 of file CsvReaderS3.cpp.

References bucket_name_, file_size_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_credentials(), foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_s3_config(), header_offset_, foreign_storage::OptionsContainer::options, s3_client_, scan_finished_, and skipHeader().

101  : CsvReader(obj_key, copy_params)
103  , scan_finished_(false)
104  , obj_key_(obj_key)
105  , copy_params_(copy_params)
106  , current_offset_(0)
107  , header_offset_(0) {
108  bucket_name_ = server_options->options.find(ForeignServer::S3_BUCKET_KEY)->second;
109  s3_client_.reset(new Aws::S3::S3Client(get_credentials(user_mapping),
110  get_s3_config(server_options)));
111  skipHeader();
112  if (header_offset_ >= file_size_) {
113  scan_finished_ = true;
114  }
116 }
Aws::Client::ClientConfiguration get_s3_config(const ForeignServer *server_options)
Definition: CsvReaderS3.cpp:29
CsvReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.h:36
std::shared_ptr< Aws::Auth::AWSCredentialsProvider > get_credentials(const UserMapping *user_mapping)
Definition: CsvReaderS3.cpp:80
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:94
import_export::CopyParams copy_params_
Definition: CsvReaderS3.h:98
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31

+ Here is the call graph for this function:

foreign_storage::CsvReaderS3::CsvReaderS3 ( const std::string &  obj_key,
const import_export::CopyParams &  copy_params,
const ForeignServer *  server_options,
const UserMapping *  user_mapping,
const rapidjson::Value &  value 
)

Definition at line 118 of file CsvReaderS3.cpp.

References bucket_name_, file_size_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_credentials(), foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_s3_config(), foreign_storage::json_utils::get_value_from_object(), header_offset_, foreign_storage::OptionsContainer::options, s3_client_, and scan_finished_.

123  : CsvReader(obj_key, copy_params)
124  , scan_finished_(false)
125  , obj_key_(obj_key)
126  , copy_params_(copy_params)
127  , current_offset_(0)
128  , header_offset_(0) {
129  bucket_name_ = server_options->options.find(ForeignServer::S3_BUCKET_KEY)->second;
130  s3_client_.reset(new Aws::S3::S3Client(get_credentials(user_mapping),
131  get_s3_config(server_options)));
132  scan_finished_ = true;
133  json_utils::get_value_from_object(value, header_offset_, "header_offset");
134  json_utils::get_value_from_object(value, file_size_, "file_size");
135 }
Aws::Client::ClientConfiguration get_s3_config(const ForeignServer *server_options)
Definition: CsvReaderS3.cpp:29
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
CsvReader(const std::string &file_path, const import_export::CopyParams &copy_params)
Definition: CsvReader.h:36
std::shared_ptr< Aws::Auth::AWSCredentialsProvider > get_credentials(const UserMapping *user_mapping)
Definition: CsvReaderS3.cpp:80
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:94
import_export::CopyParams copy_params_
Definition: CsvReaderS3.h:98

+ Here is the call graph for this function:

Member Function Documentation

size_t foreign_storage::CsvReaderS3::getRemainingSize ( )
inlineoverridevirtual
Returns
size of the CSV remaining to be read

Implements foreign_storage::CsvReader.

Definition at line 79 of file CsvReaderS3.h.

References current_offset_, and file_size_.

void foreign_storage::CsvReaderS3::increaseFileSize ( size_t  new_size)

Definition at line 205 of file CsvReaderS3.cpp.

References CHECK, CHECK_GT, file_size_, and scan_finished_.

Referenced by foreign_storage::MultiS3Reader::checkForMoreRows().

205  {
207  CHECK_GT(new_size, file_size_);
209  file_size_ = new_size;
210  scan_finished_ = false;
211 }
#define CHECK_GT(x, y)
Definition: Logger.h:209
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the caller graph for this function:

bool foreign_storage::CsvReaderS3::isRemainingSizeKnown ( )
inlineoverridevirtual
Returns
true if the remaining size is known

Implements foreign_storage::CsvReader.

Definition at line 81 of file CsvReaderS3.h.

81 { return true; };
bool foreign_storage::CsvReaderS3::isScanFinished ( )
inlineoverridevirtual
Returns
true if the entire CSV has been read

Implements foreign_storage::CsvReader.

Definition at line 77 of file CsvReaderS3.h.

References scan_finished_.

Referenced by readRegion().

77 { return scan_finished_; }

+ Here is the caller graph for this function:

size_t foreign_storage::CsvReaderS3::read ( void *  buffer,
size_t  max_size 
)
overridevirtual

Read up to max_size bytes from the archive into buffer, starting from the end of the last read.

Parameters
buffer- buffer to load into
max_size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::CsvReader.

Definition at line 144 of file CsvReaderS3.cpp.

References bucket_name_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::create_request(), current_offset_, file_size_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_access_error_message(), header_offset_, obj_key_, s3_client_, and scan_finished_.

Referenced by readRegion().

144  {
145  size_t byte_start = header_offset_ + current_offset_;
146  size_t byte_end = byte_start + max_size;
147  auto object_request = create_request(bucket_name_, obj_key_, byte_start, byte_end);
148  auto get_object_outcome = s3_client_->GetObject(object_request);
149 
150  if (!get_object_outcome.IsSuccess()) {
151  throw std::runtime_error{
153  obj_key_,
154  get_object_outcome.GetError().GetExceptionName(),
155  get_object_outcome.GetError().GetMessage())};
156  }
157  get_object_outcome.GetResult().GetBody().read(static_cast<char*>(buffer), max_size);
158 
159  size_t read_bytes = get_object_outcome.GetResult().GetBody().gcount();
160  current_offset_ += read_bytes;
161  if (current_offset_ + header_offset_ >= file_size_) {
162  scan_finished_ = true;
163  }
164  return read_bytes;
165 }
std::string get_access_error_message(const std::string &bucket, const std::string &object_name, const std::string &exception_name, const std::string &message)
Definition: CsvReaderS3.cpp:72
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:94
Aws::S3::Model::GetObjectRequest create_request(const std::string &bucket_name, const std::string &obj_name, size_t start=0, size_t end=0)
Definition: CsvReaderS3.cpp:58

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

size_t foreign_storage::CsvReaderS3::readRegion ( void *  buffer,
size_t  offset,
size_t  size 
)
inlineoverridevirtual

Read up to size bytes from the archive into buffer, starting at the given offset. isScanFinished() must return true before readRegion can be used.

Parameters
buffer- buffer to load into
offset- starting point into the archive to read
size- maximum number of bytes to read into the buffer
Returns
number of bytes actually read

Implements foreign_storage::CsvReader.

Definition at line 71 of file CsvReaderS3.h.

References CHECK, current_offset_, isScanFinished(), and read().

71  {
73  current_offset_ = offset;
74  return read(buffer, size);
75  }
bool isScanFinished() override
Definition: CsvReaderS3.h:77
size_t read(void *buffer, size_t max_size) override
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

void foreign_storage::CsvReaderS3::serialize ( rapidjson::Value &  value,
rapidjson::Document::AllocatorType &  allocator 
) const
overridevirtual

Serialize internal state to the given JSON object. This JSON will later be used to restore the reader state through a constructor. Must be called when isScanFinished() is true.

Parameters
value- JSON object to store the needed state in; this function can store any needed data, or none
allocator- allocator to use for JSON construction

Implements foreign_storage::CsvReader.

Definition at line 137 of file CsvReaderS3.cpp.

References foreign_storage::json_utils::add_value_to_object(), CHECK, file_size_, header_offset_, and scan_finished_.

138  {
140  json_utils::add_value_to_object(value, header_offset_, "header_offset", allocator);
141  json_utils::add_value_to_object(value, file_size_, "file_size", allocator);
142 };
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
#define CHECK(condition)
Definition: Logger.h:197

+ Here is the call graph for this function:

void foreign_storage::CsvReaderS3::skipHeader ( )
private

Definition at line 167 of file CsvReaderS3.cpp.

References bucket_name_, copy_params_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::create_request(), file_size_, foreign_storage::anonymous_namespace{CsvReaderS3.cpp}::get_access_error_message(), import_export::CopyParams::has_header, header_offset_, import_export::NO_HEADER, obj_key_, and s3_client_.

Referenced by CsvReaderS3().

167  {
169  size_t header_size = 1024;
170  bool header_found = false;
171  while (header_found == false) {
172  auto object_request = create_request(bucket_name_, obj_key_, 0, header_size);
173 
174  std::unique_ptr<char[]> header_buff = std::make_unique<char[]>(header_size);
175  auto get_object_outcome = s3_client_->GetObject(object_request);
176 
177  if (!get_object_outcome.IsSuccess()) {
178  throw std::runtime_error{
180  obj_key_,
181  get_object_outcome.GetError().GetExceptionName(),
182  get_object_outcome.GetError().GetMessage())};
183  }
184 
185  get_object_outcome.GetResult().GetBody().getline((header_buff.get()), header_size);
186  if (get_object_outcome.GetResult().GetBody().fail()) {
187  // We didnt get a full line
188  if (header_size == file_size_) {
189  // File only contains one header line
191  break;
192  }
193  header_size *= 2;
194  if (header_size > file_size_) {
195  header_size = file_size_;
196  }
197  } else {
198  header_offset_ = get_object_outcome.GetResult().GetBody().gcount();
199  header_found = true;
200  }
201  }
202  }
203 }
std::string get_access_error_message(const std::string &bucket, const std::string &object_name, const std::string &exception_name, const std::string &message)
Definition: CsvReaderS3.cpp:72
ImportHeaderRow has_header
Definition: CopyParams.h:48
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:94
import_export::CopyParams copy_params_
Definition: CsvReaderS3.h:98
Aws::S3::Model::GetObjectRequest create_request(const std::string &bucket_name, const std::string &obj_name, size_t start=0, size_t end=0)
Definition: CsvReaderS3.cpp:58

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

Member Data Documentation

std::string foreign_storage::CsvReaderS3::bucket_name_
private

Definition at line 97 of file CsvReaderS3.h.

Referenced by CsvReaderS3(), read(), and skipHeader().

import_export::CopyParams foreign_storage::CsvReaderS3::copy_params_
private

Definition at line 98 of file CsvReaderS3.h.

Referenced by skipHeader().

size_t foreign_storage::CsvReaderS3::current_offset_
private

Definition at line 100 of file CsvReaderS3.h.

Referenced by getRemainingSize(), read(), and readRegion().

size_t foreign_storage::CsvReaderS3::file_size_
private
size_t foreign_storage::CsvReaderS3::header_offset_
private

Definition at line 101 of file CsvReaderS3.h.

Referenced by CsvReaderS3(), read(), serialize(), and skipHeader().

std::string foreign_storage::CsvReaderS3::obj_key_
private

Definition at line 96 of file CsvReaderS3.h.

Referenced by read(), and skipHeader().

std::unique_ptr<Aws::S3::S3Client> foreign_storage::CsvReaderS3::s3_client_
private

Definition at line 94 of file CsvReaderS3.h.

Referenced by CsvReaderS3(), read(), and skipHeader().

bool foreign_storage::CsvReaderS3::scan_finished_
private

Definition at line 93 of file CsvReaderS3.h.

Referenced by CsvReaderS3(), increaseFileSize(), isScanFinished(), read(), and serialize().


The documentation for this class was generated from the following files: