OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
S3Archive.h
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef ARCHIVE_S3ARCHIVE_H_
18 #define ARCHIVE_S3ARCHIVE_H_
19 
20 #include <cstdio>
21 #include <exception>
22 #include <map>
23 #include <optional>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 #include "Archive.h"
28 
29 #include <openssl/evp.h>
30 
31 #ifdef HAVE_AWS_S3
32 #include <aws/core/Aws.h>
33 #include <aws/s3/S3Client.h>
34 #endif // HAVE_AWS_S3
35 
36 // this is the base archive class for files hosted on AWS S3.
37 // known variants:
38 // . parquet files
39 // . compressed files
40 // no mix of the above is supported yet
41 class S3Archive : public Archive {
42  public:
43  S3Archive(const std::string& url, const bool plain_text) : Archive(url, plain_text) {
44  // these envs are on server side so are global settings
45  // which make few senses in case of private s3 resources
46  char* env;
47  if (0 != (env = getenv("AWS_REGION"))) {
48  s3_region = env;
49  }
50  if (0 != (env = getenv("AWS_ACCESS_KEY_ID"))) {
51  s3_access_key = env;
52  }
53  if (0 != (env = getenv("AWS_SECRET_ACCESS_KEY"))) {
54  s3_secret_key = env;
55  }
56  if (0 != (env = getenv("AWS_SESSION_TOKEN"))) {
57  s3_session_token = env;
58  }
59 
60  if (0 != (env = getenv("AWS_ENDPOINT"))) {
61  s3_endpoint = env;
62  }
63  }
64 
65  S3Archive(const std::string& url,
66  const std::string& s3_access_key,
67  const std::string& s3_secret_key,
68  const std::string& s3_session_token,
69  const std::string& s3_region,
70  const std::string& s3_endpoint,
71  const bool plain_text,
72  const std::optional<std::string>& regex_path_filter,
73  const std::optional<std::string>& file_sort_order_by,
74  const std::optional<std::string>& file_sort_regex,
75  const std::string& s3_temp_dir_path = {})
76  : S3Archive(url, plain_text) {
77  this->s3_access_key = s3_access_key;
78  this->s3_secret_key = s3_secret_key;
79  this->s3_session_token = s3_session_token;
80  this->s3_region = s3_region;
81  this->s3_endpoint = s3_endpoint;
82  this->regex_path_filter = regex_path_filter;
83  this->file_sort_order_by = file_sort_order_by;
84  this->file_sort_regex = file_sort_regex;
85 
86  if (s3_temp_dir_path.empty()) {
87  // this must be local to heavydb not client
88  // or posix dir path accessible to heavydb
89  auto env_s3_temp_dir = getenv("TMPDIR");
90  s3_temp_dir = env_s3_temp_dir ? env_s3_temp_dir : "/tmp";
91  } else {
92  s3_temp_dir = s3_temp_dir_path;
93  }
94  }
95 
96  ~S3Archive() override {
97 #ifdef HAVE_AWS_S3
98  for (auto& thread : threads) {
99  if (thread.joinable()) {
100  thread.join();
101  }
102  }
103 #endif // HAVE_AWS_S3
104  }
105 
106 #ifdef HAVE_AWS_S3
107  void init_for_read() override;
108 #else
109  void init_for_read() override {
110  throw std::runtime_error("AWS S3 support not available");
111  }
112 #endif
113  const std::vector<std::string>& get_objkeys() {
114  return objkeys;
115  }
116 #ifdef HAVE_AWS_S3
117  const std::string land(const std::string& objkey,
118  std::exception_ptr& teptr,
119  const bool for_detection,
120  const bool allow_named_pipe_use = true,
121  const bool track_file_paths = true);
122  void vacuum(const std::string& objkey);
123 #else
124  const std::string land(const std::string& objkey,
125  std::exception_ptr& teptr,
126  const bool for_detection) {
127  throw std::runtime_error("AWS S3 support not available");
128  }
129  void vacuum(const std::string& objkey) {
130  throw std::runtime_error("AWS S3 support not available");
131  }
132 #endif // HAVE_AWS_S3
133  size_t get_total_file_size() const {
134  return total_file_size;
135  }
136 
137  private:
138 #ifdef HAVE_AWS_S3
139  static int awsapi_count;
140  static std::mutex awsapi_mtx;
141  static Aws::SDKOptions awsapi_options;
142 
143  std::unique_ptr<Aws::S3::S3Client> s3_client;
144  std::vector<std::thread> threads;
145 #endif // HAVE_AWS_S3
146  std::string s3_access_key; // per-query credentials to override the
147  std::string s3_secret_key; // settings in ~/.aws/credentials or environment
148  std::string s3_session_token;
149  std::string s3_region;
150  std::string s3_endpoint;
151  std::string s3_temp_dir;
152 
153  std::string bucket_name;
154  std::string prefix_name;
155  std::optional<std::string> regex_path_filter;
156  std::optional<std::string> file_sort_order_by;
157  std::optional<std::string> file_sort_regex;
158  std::vector<std::string> objkeys;
159  std::map<const std::string, const std::string> file_paths;
160  size_t total_file_size{0};
161 };
162 
163 class S3ParquetArchive : public S3Archive {
164  public:
165  S3ParquetArchive(const std::string& url,
166  const std::string& s3_access_key,
167  const std::string& s3_secret_key,
168  const std::string& s3_session_token,
169  const std::string& s3_region,
170  const std::string& s3_endpoint,
171  const bool plain_text,
172  const std::optional<std::string>& regex_path_filter,
173  const std::optional<std::string>& file_sort_order_by,
174  const std::optional<std::string>& file_sort_regex)
175  : S3Archive(url,
176  s3_access_key,
177  s3_secret_key,
178  s3_session_token,
179  s3_region,
180  s3_endpoint,
181  plain_text,
182  regex_path_filter,
183  file_sort_order_by,
184  file_sort_regex) {}
185 };
186 
187 #endif /* ARCHIVE_S3ARCHIVE_H_ */
std::string s3_endpoint
Definition: S3Archive.h:150
std::string s3_region
Definition: S3Archive.h:149
size_t total_file_size
Definition: S3Archive.h:160
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
Definition: S3Archive.h:124
std::string prefix_name
Definition: S3Archive.h:154
std::string bucket_name
Definition: S3Archive.h:153
S3ParquetArchive(const std::string &url, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token, const std::string &s3_region, const std::string &s3_endpoint, const bool plain_text, const std::optional< std::string > &regex_path_filter, const std::optional< std::string > &file_sort_order_by, const std::optional< std::string > &file_sort_regex)
Definition: S3Archive.h:165
std::string s3_access_key
Definition: S3Archive.h:146
std::map< const std::string, const std::string > file_paths
Definition: S3Archive.h:159
~S3Archive() override
Definition: S3Archive.h:96
S3Archive(const std::string &url, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_session_token, const std::string &s3_region, const std::string &s3_endpoint, const bool plain_text, const std::optional< std::string > &regex_path_filter, const std::optional< std::string > &file_sort_order_by, const std::optional< std::string > &file_sort_regex, const std::string &s3_temp_dir_path={})
Definition: S3Archive.h:65
const std::vector< std::string > & get_objkeys()
Definition: S3Archive.h:113
std::optional< std::string > file_sort_regex
Definition: S3Archive.h:157
std::optional< std::string > regex_path_filter
Definition: S3Archive.h:155
void init_for_read() override
Definition: S3Archive.h:109
std::optional< std::string > file_sort_order_by
Definition: S3Archive.h:156
std::string s3_session_token
Definition: S3Archive.h:148
std::string s3_temp_dir
Definition: S3Archive.h:151
std::string url
Definition: Archive.h:202
std::string s3_secret_key
Definition: S3Archive.h:147
S3Archive(const std::string &url, const bool plain_text)
Definition: S3Archive.h:43
bool plain_text
Definition: Archive.h:206
std::vector< std::string > objkeys
Definition: S3Archive.h:158
void vacuum(const std::string &objkey)
Definition: S3Archive.h:129
size_t get_total_file_size() const
Definition: S3Archive.h:133