OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
S3Archive Class Reference

#include <S3Archive.h>

+ Inheritance diagram for S3Archive:
+ Collaboration diagram for S3Archive:

Public Member Functions

 S3Archive (const std::string &url, const bool plain_text)
 
 S3Archive (const std::string &url, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_region, const std::string &s3_endpoint, const bool plain_text)
 
 ~S3Archive () override
 
void init_for_read () override
 
const std::vector< std::string > & get_objkeys ()
 
const std::string land (const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
 
void vacuum (const std::string &objkey)
 
size_t get_total_file_size () const
 
- Public Member Functions inherited from Archive
 Archive (const std::string url, const bool plain_text)
 
virtual ~Archive ()
 
virtual std::string archive_error (int err)
 
virtual bool read_next_header ()
 
virtual bool read_data_block (const void **buff, size_t *size, int64_t *offset)
 
virtual int64_t get_position_compressed () const
 
virtual int open ()
 
virtual int close ()
 
virtual ssize_t read (const void **buff)
 
const std::string url_part (const int i)
 

Private Attributes

std::string s3_access_key
 
std::string s3_secret_key
 
std::string s3_region
 
std::string s3_endpoint
 
std::string s3_temp_dir
 
std::string bucket_name
 
std::string prefix_name
 
std::vector< std::string > objkeys
 
std::map< const std::string,
const std::string > 
file_paths
 
size_t total_file_size {0}
 

Additional Inherited Members

- Static Public Member Functions inherited from Archive
static ssize_t read (struct archive *a, void *client_data, const void **buff)
 
static int open (struct archive *a, void *client_data)
 
static int close (struct archive *a, void *client_data)
 
static void parse_url (const std::string url, std::map< int, std::string > &url_parts)
 
- Protected Attributes inherited from Archive
std::string url
 
std::map< int, std::string > url_parts
 
archive * ar = 0
 
bool plain_text
 

Detailed Description

Definition at line 38 of file S3Archive.h.

Constructor & Destructor Documentation

S3Archive::S3Archive ( const std::string &  url,
const bool  plain_text 
)
inline

Definition at line 40 of file S3Archive.h.

References s3_access_key, s3_endpoint, s3_region, and s3_secret_key.

40  : Archive(url, plain_text) {
41 // init aws api should be singleton because
42 // it's bad to call Aws::InitAPI and Aws::ShutdownAPI
43 // multiple times.
44 #ifdef HAVE_AWS_S3
45  {
46  std::unique_lock<std::mutex> lck(awsapi_mtx);
47  if (0 == awsapi_count++) {
48  Aws::InitAPI(awsapi_options);
49  }
50  }
51 #endif // HAVE_AWS_S3
52 
53  // these envs are on server side so are global settings
54  // which make little sense in case of private s3 resources
55  char* env;
56  if (0 != (env = getenv("AWS_REGION"))) {
57  s3_region = env;
58  }
59  if (0 != (env = getenv("AWS_ACCESS_KEY_ID"))) {
60  s3_access_key = env;
61  }
62  if (0 != (env = getenv("AWS_SECRET_ACCESS_KEY"))) {
63  s3_secret_key = env;
64  }
65  if (0 != (env = getenv("AWS_ENDPOINT"))) {
66  s3_endpoint = env;
67  }
68  }
std::string s3_endpoint
Definition: S3Archive.h:133
std::string s3_region
Definition: S3Archive.h:132
std::string s3_access_key
Definition: S3Archive.h:130
std::string url
Definition: Archive.h:186
std::string s3_secret_key
Definition: S3Archive.h:131
bool plain_text
Definition: Archive.h:189
Archive(const std::string url, const bool plain_text)
Definition: Archive.h:31
S3Archive::S3Archive ( const std::string &  url,
const std::string &  s3_access_key,
const std::string &  s3_secret_key,
const std::string &  s3_region,
const std::string &  s3_endpoint,
const bool  plain_text 
)
inline

Definition at line 70 of file S3Archive.h.

References s3_access_key, s3_endpoint, s3_region, s3_secret_key, and s3_temp_dir.

79  this->s3_region = s3_region;
80  this->s3_endpoint = s3_endpoint;
81 
82  // this must be local to omnisci_server not client
83  // or posix dir path accessible to omnisci_server
84  auto env_s3_temp_dir = getenv("TMPDIR");
85  s3_temp_dir = env_s3_temp_dir ? env_s3_temp_dir : "/tmp";
86  }
std::string s3_endpoint
Definition: S3Archive.h:133
std::string s3_region
Definition: S3Archive.h:132
std::string s3_access_key
Definition: S3Archive.h:130
std::string s3_temp_dir
Definition: S3Archive.h:134
std::string url
Definition: Archive.h:186
std::string s3_secret_key
Definition: S3Archive.h:131
S3Archive(const std::string &url, const bool plain_text)
Definition: S3Archive.h:40
bool plain_text
Definition: Archive.h:189
S3Archive::~S3Archive ( )
inlineoverride

Definition at line 88 of file S3Archive.h.

88  {
89 #ifdef HAVE_AWS_S3
90  for (auto& thread : threads) {
91  if (thread.joinable()) {
92  thread.join();
93  }
94  }
95  std::unique_lock<std::mutex> lck(awsapi_mtx);
96  if (0 == --awsapi_count) {
97  Aws::ShutdownAPI(awsapi_options);
98  }
99 #endif // HAVE_AWS_S3
100  }

Member Function Documentation

const std::vector<std::string>& S3Archive::get_objkeys ( )
inline

Definition at line 103 of file S3Archive.h.

References objkeys.

103 { return objkeys; }
std::vector< std::string > objkeys
Definition: S3Archive.h:138
size_t S3Archive::get_total_file_size ( ) const
inline

Definition at line 119 of file S3Archive.h.

References total_file_size.

119 { return total_file_size; }
size_t total_file_size
Definition: S3Archive.h:140
void S3Archive::init_for_read ( )
overridevirtual

Reimplemented from Archive.

Definition at line 33 of file S3Archive.cpp.

References bucket_name, logger::INFO, LOG, objkeys, prefix_name, s3_access_key, s3_endpoint, s3_region, s3_secret_key, s3_temp_dir, total_file_size, Archive::url, and Archive::url_part().

33  {
34  boost::filesystem::create_directories(s3_temp_dir);
35  if (!boost::filesystem::is_directory(s3_temp_dir)) {
36  throw std::runtime_error("failed to create s3_temp_dir directory '" + s3_temp_dir +
37  "'");
38  }
39 
40  try {
41  bucket_name = url_part(4);
42  prefix_name = url_part(5);
43 
44  // a prefix '/obj/' should become 'obj/'
45  // a prefix '/obj' should become 'obj'
46  if (prefix_name.size() && '/' == prefix_name.front()) {
47  prefix_name = prefix_name.substr(1);
48  }
49 
50  Aws::S3::Model::ListObjectsV2Request objects_request;
51  objects_request.WithBucket(bucket_name);
52  objects_request.WithPrefix(prefix_name);
53  objects_request.SetMaxKeys(1 << 20);
54 
55  // for a daemon like omnisci_server it seems improper to set s3 credentials
56  // via AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env's because that way
57  // credentials are configured *globally* while different users with private
58  // s3 resources may need separate credentials to access. In that case, use
59  // WITH s3_access_key/s3_secret_key parameters.
60  Aws::Client::ClientConfiguration s3_config;
61  s3_config.region = s3_region.size() ? s3_region : Aws::Region::US_EAST_1;
62  s3_config.endpointOverride = s3_endpoint;
63 
64  /*
65  Fix a wrong ca path established at building libcurl on Centos being carried to
66  Ubuntu. To fix the issue, this is the sequence for locating the ca file: 1) if
67  `SSL_CERT_DIR` or `SSL_CERT_FILE` is set, set it to S3 ClientConfiguration. 2) if
68  none ^ is set, omnisci_server searches a list of known ca file paths. 3) if 2)
69  finds nothing, it is users' call to set correct SSL_CERT_DIR or SSL_CERT_FILE. S3
70  c++ sdk: "we only want to override the default path if someone has explicitly told
71  us to."
72  */
73  std::list<std::string> v_known_ca_paths({
74  "/etc/ssl/certs/ca-certificates.crt",
75  "/etc/pki/tls/certs/ca-bundle.crt",
76  "/usr/share/ssl/certs/ca-bundle.crt",
77  "/usr/local/share/certs/ca-root.crt",
78  "/etc/ssl/cert.pem",
79  "/etc/ssl/ca-bundle.pem",
80  });
81  char* env;
82  if (nullptr != (env = getenv("SSL_CERT_DIR"))) {
83  s3_config.caPath = env;
84  }
85  if (nullptr != (env = getenv("SSL_CERT_FILE"))) {
86  v_known_ca_paths.push_front(env);
87  }
88  for (const auto& known_ca_path : v_known_ca_paths) {
89  if (boost::filesystem::exists(known_ca_path)) {
90  s3_config.caFile = known_ca_path;
91  break;
92  }
93  }
94 
95  if (!s3_access_key.empty() && !s3_secret_key.empty()) {
96  s3_client.reset(new Aws::S3::S3Client(
97  Aws::Auth::AWSCredentials(s3_access_key, s3_secret_key), s3_config));
98  } else {
99  s3_client.reset(new Aws::S3::S3Client(
100  std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
101  }
102  while (true) {
103  auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
104  if (list_objects_outcome.IsSuccess()) {
105  // pass only object keys to next stage, which may be Importer::import_parquet,
106  // Importer::import_compressed or else, depending on copy_params (eg. .is_parquet)
107  auto object_list = list_objects_outcome.GetResult().GetContents();
108  if (0 == object_list.size()) {
109  if (objkeys.empty()) {
110  throw std::runtime_error("no object was found with s3 url '" + url + "'");
111  }
112  }
113 
114  LOG(INFO) << "Found " << (objkeys.empty() ? "" : "another ") << object_list.size()
115  << " objects with url '" + url + "':";
116  for (auto const& obj : object_list) {
117  std::string objkey = obj.GetKey().c_str();
118  LOG(INFO) << "\t" << objkey << " (size = " << obj.GetSize() << " bytes)";
119  total_file_size += obj.GetSize();
120  // skip _SUCCESS and keys with trailing / or basename with leading '.'
121  boost::filesystem::path path{objkey};
122  if (0 == obj.GetSize()) {
123  continue;
124  }
125  if ('/' == objkey.back()) {
126  continue;
127  }
128  if ('.' == path.filename().string().front()) {
129  continue;
130  }
131  objkeys.push_back(objkey);
132  }
133  } else {
134  // could not ListObject
135  // could be the object is there but we do not have listObject Privilege
136  // We can treat it as a specific object, so should try to parse it and pass to
137  // getObject as a singleton
138  // Null prefix in urls such like 's3://bucket/' should be ignored.
139  if (objkeys.empty()) {
140  if (!prefix_name.empty()) {
141  objkeys.push_back(prefix_name);
142  } else {
143  throw std::runtime_error("failed to list objects of s3 url '" + url + "': " +
144  list_objects_outcome.GetError().GetExceptionName() +
145  ": " + list_objects_outcome.GetError().GetMessage());
146  }
147  }
148  }
149  // continue to read next 1000 files
150  if (list_objects_outcome.GetResult().GetIsTruncated()) {
151  objects_request.SetContinuationToken(
152  list_objects_outcome.GetResult().GetNextContinuationToken());
153  } else {
154  break;
155  }
156  }
157  } catch (...) {
158  throw;
159  }
160 }
std::string s3_endpoint
Definition: S3Archive.h:133
std::string s3_region
Definition: S3Archive.h:132
#define LOG(tag)
Definition: Logger.h:185
size_t total_file_size
Definition: S3Archive.h:140
std::string prefix_name
Definition: S3Archive.h:137
std::string bucket_name
Definition: S3Archive.h:136
std::string s3_access_key
Definition: S3Archive.h:130
std::string s3_temp_dir
Definition: S3Archive.h:134
const std::string url_part(const int i)
Definition: Archive.h:183
std::string url
Definition: Archive.h:186
std::string s3_secret_key
Definition: S3Archive.h:131
std::vector< std::string > objkeys
Definition: S3Archive.h:138

+ Here is the call graph for this function:

const std::string S3Archive::land ( const std::string &  objkey,
std::exception_ptr &  teptr,
const bool  for_detection 
)
inline

Definition at line 110 of file S3Archive.h.

112  {
113  throw std::runtime_error("AWS S3 support not available");
114  }
void S3Archive::vacuum ( const std::string &  objkey)
inline

Definition at line 115 of file S3Archive.h.

115  {
116  throw std::runtime_error("AWS S3 support not available");
117  }

Member Data Documentation

std::string S3Archive::bucket_name
private

Definition at line 136 of file S3Archive.h.

Referenced by init_for_read().

std::map<const std::string, const std::string> S3Archive::file_paths
private

Definition at line 139 of file S3Archive.h.

std::vector<std::string> S3Archive::objkeys
private

Definition at line 138 of file S3Archive.h.

Referenced by get_objkeys(), and init_for_read().

std::string S3Archive::prefix_name
private

Definition at line 137 of file S3Archive.h.

Referenced by init_for_read().

std::string S3Archive::s3_access_key
private

Definition at line 130 of file S3Archive.h.

Referenced by init_for_read(), and S3Archive().

std::string S3Archive::s3_endpoint
private

Definition at line 133 of file S3Archive.h.

Referenced by init_for_read(), and S3Archive().

std::string S3Archive::s3_region
private

Definition at line 132 of file S3Archive.h.

Referenced by init_for_read(), and S3Archive().

std::string S3Archive::s3_secret_key
private

Definition at line 131 of file S3Archive.h.

Referenced by init_for_read(), and S3Archive().

std::string S3Archive::s3_temp_dir
private

Definition at line 134 of file S3Archive.h.

Referenced by init_for_read(), and S3Archive().

size_t S3Archive::total_file_size {0}
private

Definition at line 140 of file S3Archive.h.

Referenced by get_total_file_size(), and init_for_read().


The documentation for this class was generated from the following files: