OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
S3Archive Class Reference

#include <S3Archive.h>

+ Inheritance diagram for S3Archive:
+ Collaboration diagram for S3Archive:

Public Member Functions

 S3Archive (const std::string &url, const bool plain_text)
 
 S3Archive (const std::string &url, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_region, const std::string &s3_endpoint, const bool plain_text)
 
 ~S3Archive () override
 
void init_for_read () override
 
const std::vector< std::string > & get_objkeys ()
 
const std::string land (const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
 
void vacuum (const std::string &objkey)
 
size_t get_total_file_size () const
 
- Public Member Functions inherited from Archive
 Archive (const std::string url, const bool plain_text)
 
virtual ~Archive ()
 
virtual std::string archive_error (int err)
 
virtual bool read_next_header ()
 
virtual bool read_data_block (const void **buff, size_t *size, int64_t *offset)
 
virtual int64_t get_position_compressed () const
 
virtual int open ()
 
virtual int close ()
 
virtual ssize_t read (const void **buff)
 
const std::string url_part (const int i)
 
std::string entryName ()
 

Private Attributes

std::string s3_access_key
 
std::string s3_secret_key
 
std::string s3_region
 
std::string s3_endpoint
 
std::string s3_temp_dir
 
std::string bucket_name
 
std::string prefix_name
 
std::vector< std::string > objkeys
 
std::map< const std::string,
const std::string > 
file_paths
 
size_t total_file_size {0}
 

Additional Inherited Members

- Static Public Member Functions inherited from Archive
static ssize_t read (struct archive *a, void *client_data, const void **buff)
 
static int open (struct archive *a, void *client_data)
 
static int close (struct archive *a, void *client_data)
 
static void parse_url (const std::string url, std::map< int, std::string > &url_parts)
 
- Protected Attributes inherited from Archive
std::string url
 
std::map< int, std::string > url_parts
 
archive * ar = 0
 
archive_entry * entry
 
bool plain_text
 

Detailed Description

Definition at line 38 of file S3Archive.h.

Constructor & Destructor Documentation

S3Archive::S3Archive ( const std::string &  url,
const bool  plain_text 
)
inline

Definition at line 40 of file S3Archive.h.

References s3_access_key, s3_endpoint, s3_region, and s3_secret_key.

40  : Archive(url, plain_text) {
41 // init aws api should be singleton because
42 // it's bad to call Aws::InitAPI and Aws::ShutdownAPI
43 // multiple times.
44 #ifdef HAVE_AWS_S3
45  {
46  std::unique_lock<std::mutex> lck(awsapi_mtx);
47  if (0 == awsapi_count++) {
48  Aws::InitAPI(awsapi_options);
49  }
50  }
51 #endif // HAVE_AWS_S3
52 
53  // these envs are on server side so are global settings
54  // which make little sense in case of private s3 resources
55  char* env;
56  if (0 != (env = getenv("AWS_REGION"))) {
57  s3_region = env;
58  }
59  if (0 != (env = getenv("AWS_ACCESS_KEY_ID"))) {
60  s3_access_key = env;
61  }
62  if (0 != (env = getenv("AWS_SECRET_ACCESS_KEY"))) {
63  s3_secret_key = env;
64  }
65  if (0 != (env = getenv("AWS_ENDPOINT"))) {
66  s3_endpoint = env;
67  }
68  }
std::string s3_endpoint
Definition: S3Archive.h:133
std::string s3_region
Definition: S3Archive.h:132
std::string s3_access_key
Definition: S3Archive.h:130
std::string url
Definition: Archive.h:187
std::string s3_secret_key
Definition: S3Archive.h:131
bool plain_text
Definition: Archive.h:191
Archive(const std::string url, const bool plain_text)
Definition: Archive.h:31
S3Archive::S3Archive ( const std::string &  url,
const std::string &  s3_access_key,
const std::string &  s3_secret_key,
const std::string &  s3_region,
const std::string &  s3_endpoint,
const bool  plain_text 
)
inline

Definition at line 70 of file S3Archive.h.

References s3_access_key, s3_endpoint, s3_region, s3_secret_key, and s3_temp_dir.

79  this->s3_region = s3_region;
80  this->s3_endpoint = s3_endpoint;
81 
82  // this must be local to omnisci_server not client
83  // or posix dir path accessible to omnisci_server
84  auto env_s3_temp_dir = getenv("TMPDIR");
85  s3_temp_dir = env_s3_temp_dir ? env_s3_temp_dir : "/tmp";
86  }
std::string s3_endpoint
Definition: S3Archive.h:133
std::string s3_region
Definition: S3Archive.h:132
std::string s3_access_key
Definition: S3Archive.h:130
std::string s3_temp_dir
Definition: S3Archive.h:134
std::string url
Definition: Archive.h:187
std::string s3_secret_key
Definition: S3Archive.h:131
S3Archive(const std::string &url, const bool plain_text)
Definition: S3Archive.h:40
bool plain_text
Definition: Archive.h:191
S3Archive::~S3Archive ( )
inline override

Definition at line 88 of file S3Archive.h.

88  {
89 #ifdef HAVE_AWS_S3
90  for (auto& thread : threads) {
91  if (thread.joinable()) {
92  thread.join();
93  }
94  }
95  std::unique_lock<std::mutex> lck(awsapi_mtx);
96  if (0 == --awsapi_count) {
97  Aws::ShutdownAPI(awsapi_options);
98  }
99 #endif // HAVE_AWS_S3
100  }

Member Function Documentation

const std::vector<std::string>& S3Archive::get_objkeys ( )
inline

Definition at line 103 of file S3Archive.h.

References objkeys.

103 { return objkeys; }
std::vector< std::string > objkeys
Definition: S3Archive.h:138
size_t S3Archive::get_total_file_size ( ) const
inline

Definition at line 119 of file S3Archive.h.

References total_file_size.

119 { return total_file_size; }
size_t total_file_size
Definition: S3Archive.h:140
void S3Archive::init_for_read ( )
override virtual

Reimplemented from Archive.

Definition at line 35 of file S3Archive.cpp.

References bucket_name, logger::INFO, LOG, objkeys, prefix_name, s3_access_key, s3_endpoint, s3_region, s3_secret_key, s3_temp_dir, total_file_size, Archive::url, and Archive::url_part().

35  {
36  boost::filesystem::create_directories(s3_temp_dir);
37  if (!boost::filesystem::is_directory(s3_temp_dir)) {
38  throw std::runtime_error("failed to create s3_temp_dir directory '" + s3_temp_dir +
39  "'");
40  }
41 
42  try {
43  bucket_name = url_part(4);
44  prefix_name = url_part(5);
45 
46  // a prefix '/obj/' should become 'obj/'
47  // a prefix '/obj' should become 'obj'
48  if (prefix_name.size() && '/' == prefix_name.front()) {
49  prefix_name = prefix_name.substr(1);
50  }
51 
52  Aws::S3::Model::ListObjectsV2Request objects_request;
53  objects_request.WithBucket(bucket_name);
54  objects_request.WithPrefix(prefix_name);
55  objects_request.SetMaxKeys(1 << 20);
56 
57  // for a daemon like omnisci_server it seems improper to set s3 credentials
58  // via AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env's because that way
59  // credentials are configured *globally* while different users with private
60  // s3 resources may need separate credentials to access. In that case, use
61  // WITH s3_access_key/s3_secret_key parameters.
62  Aws::Client::ClientConfiguration s3_config;
63  s3_config.region = s3_region.size() ? s3_region : Aws::Region::US_EAST_1;
64  s3_config.endpointOverride = s3_endpoint;
65 
66  /*
67  Fix a wrong ca path established at building libcurl on Centos being carried to
68  Ubuntu. To fix the issue, this is the sequence for locating the ca file: 1) if
69  `SSL_CERT_DIR` or `SSL_CERT_FILE` is set, set it to S3 ClientConfiguration. 2) if
70  none ^ is set, omnisci_server searches a list of known ca file paths. 3) if 2)
71  finds nothing, it is users' call to set correct SSL_CERT_DIR or SSL_CERT_FILE. S3
72  c++ sdk: "we only want to override the default path if someone has explicitly told
73  us to."
74  */
75  std::list<std::string> v_known_ca_paths({
76  "/etc/ssl/certs/ca-certificates.crt",
77  "/etc/pki/tls/certs/ca-bundle.crt",
78  "/usr/share/ssl/certs/ca-bundle.crt",
79  "/usr/local/share/certs/ca-root.crt",
80  "/etc/ssl/cert.pem",
81  "/etc/ssl/ca-bundle.pem",
82  });
83  char* env;
84  if (nullptr != (env = getenv("SSL_CERT_DIR"))) {
85  s3_config.caPath = env;
86  }
87  if (nullptr != (env = getenv("SSL_CERT_FILE"))) {
88  v_known_ca_paths.push_front(env);
89  }
90  for (const auto& known_ca_path : v_known_ca_paths) {
91  if (boost::filesystem::exists(known_ca_path)) {
92  s3_config.caFile = known_ca_path;
93  break;
94  }
95  }
96 
97  if (!s3_access_key.empty() && !s3_secret_key.empty()) {
98  s3_client.reset(new Aws::S3::S3Client(
99  Aws::Auth::AWSCredentials(s3_access_key, s3_secret_key), s3_config));
100  } else {
101  s3_client.reset(new Aws::S3::S3Client(
102  std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
103  }
104  while (true) {
105  auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
106  if (list_objects_outcome.IsSuccess()) {
107  // pass only object keys to next stage, which may be Importer::import_parquet,
108  // Importer::import_compressed or else, depending on copy_params (eg. .is_parquet)
109  auto object_list = list_objects_outcome.GetResult().GetContents();
110  if (0 == object_list.size()) {
111  if (objkeys.empty()) {
112  throw std::runtime_error("no object was found with s3 url '" + url + "'");
113  }
114  }
115 
116  LOG(INFO) << "Found " << (objkeys.empty() ? "" : "another ") << object_list.size()
117  << " objects with url '" + url + "':";
118  for (auto const& obj : object_list) {
119  std::string objkey = obj.GetKey().c_str();
120  LOG(INFO) << "\t" << objkey << " (size = " << obj.GetSize() << " bytes)";
121  total_file_size += obj.GetSize();
122  // skip _SUCCESS and keys with trailing / or basename with leading '.'
123  boost::filesystem::path path{objkey};
124  if (0 == obj.GetSize()) {
125  continue;
126  }
127  if ('/' == objkey.back()) {
128  continue;
129  }
130  if ('.' == path.filename().string().front()) {
131  continue;
132  }
133  objkeys.push_back(objkey);
134  }
135  } else {
136  // could not ListObject
137  // could be the object is there but we do not have listObject Privilege
138  // We can treat it as a specific object, so should try to parse it and pass to
139  // getObject as a singleton
140  // Null prefix in urls such as 's3://bucket/' should be ignored.
141  if (objkeys.empty()) {
142  if (!prefix_name.empty()) {
143  objkeys.push_back(prefix_name);
144  } else {
145  throw std::runtime_error("failed to list objects of s3 url '" + url + "': " +
146  list_objects_outcome.GetError().GetExceptionName() +
147  ": " + list_objects_outcome.GetError().GetMessage());
148  }
149  }
150  }
151  // continue to read next 1000 files
152  if (list_objects_outcome.GetResult().GetIsTruncated()) {
153  objects_request.SetContinuationToken(
154  list_objects_outcome.GetResult().GetNextContinuationToken());
155  } else {
156  break;
157  }
158  }
159  } catch (...) {
160  throw;
161  }
162 }
std::string s3_endpoint
Definition: S3Archive.h:133
std::string s3_region
Definition: S3Archive.h:132
#define LOG(tag)
Definition: Logger.h:188
size_t total_file_size
Definition: S3Archive.h:140
std::string prefix_name
Definition: S3Archive.h:137
std::string bucket_name
Definition: S3Archive.h:136
std::string s3_access_key
Definition: S3Archive.h:130
std::string s3_temp_dir
Definition: S3Archive.h:134
const std::string url_part(const int i)
Definition: Archive.h:182
std::string url
Definition: Archive.h:187
std::string s3_secret_key
Definition: S3Archive.h:131
std::vector< std::string > objkeys
Definition: S3Archive.h:138

+ Here is the call graph for this function:

const std::string S3Archive::land ( const std::string &  objkey,
std::exception_ptr &  teptr,
const bool  for_detection 
)
inline

Definition at line 110 of file S3Archive.h.

112  {
113  throw std::runtime_error("AWS S3 support not available");
114  }
void S3Archive::vacuum ( const std::string &  objkey)
inline

Definition at line 115 of file S3Archive.h.

115  {
116  throw std::runtime_error("AWS S3 support not available");
117  }

Member Data Documentation

std::string S3Archive::bucket_name
private

Definition at line 136 of file S3Archive.h.

Referenced by init_for_read().

std::map<const std::string, const std::string> S3Archive::file_paths
private

Definition at line 139 of file S3Archive.h.

std::vector<std::string> S3Archive::objkeys
private

Definition at line 138 of file S3Archive.h.

Referenced by get_objkeys(), and init_for_read().

std::string S3Archive::prefix_name
private

Definition at line 137 of file S3Archive.h.

Referenced by init_for_read().

std::string S3Archive::s3_access_key
private
std::string S3Archive::s3_endpoint
private
std::string S3Archive::s3_region
private
std::string S3Archive::s3_secret_key
private
std::string S3Archive::s3_temp_dir
private

Definition at line 134 of file S3Archive.h.

Referenced by init_for_read(), and S3Archive().

size_t S3Archive::total_file_size {0}
private

Definition at line 140 of file S3Archive.h.

Referenced by get_total_file_size(), and init_for_read().


The documentation for this class was generated from the following files: