19 #include <aws/core/Aws.h>
20 #include <aws/core/auth/AWSCredentialsProvider.h>
21 #include <aws/s3/model/GetObjectRequest.h>
22 #include <aws/s3/model/ListObjectsV2Request.h>
23 #include <aws/s3/model/Object.h>
25 #include <boost/filesystem.hpp>
33 if (!boost::filesystem::is_directory(
s3_temp_dir)) {
34 throw std::runtime_error(
"failed to create s3_temp_dir directory '" +
s3_temp_dir +
48 Aws::S3::Model::ListObjectsV2Request objects_request;
51 objects_request.SetMaxKeys(1 << 20);
58 Aws::Client::ClientConfiguration s3_config;
71 std::list<std::string> v_known_ca_paths({
72 "/etc/ssl/certs/ca-certificates.crt",
73 "/etc/pki/tls/certs/ca-bundle.crt",
74 "/usr/share/ssl/certs/ca-bundle.crt",
75 "/usr/local/share/certs/ca-root.crt",
77 "/etc/ssl/ca-bundle.pem",
80 if (
nullptr != (env = getenv(
"SSL_CERT_DIR"))) {
81 s3_config.caPath = env;
83 if (
nullptr != (env = getenv(
"SSL_CERT_FILE"))) {
84 v_known_ca_paths.push_front(env);
86 for (
const auto& known_ca_path : v_known_ca_paths) {
87 if (boost::filesystem::exists(known_ca_path)) {
88 s3_config.caFile = known_ca_path;
94 s3_client.reset(
new Aws::S3::S3Client(
98 s3_client.reset(
new Aws::S3::S3Client(
99 std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
102 auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
103 if (list_objects_outcome.IsSuccess()) {
106 auto object_list = list_objects_outcome.GetResult().GetContents();
107 if (0 == object_list.size()) {
109 throw std::runtime_error(
"no object was found with s3 url '" +
url +
"'");
113 LOG(
INFO) <<
"Found " << (
objkeys.empty() ?
"" :
"another ") << object_list.size()
114 <<
" objects with url '" +
url +
"':";
115 for (
auto const& obj : object_list) {
116 std::string objkey = obj.GetKey().c_str();
117 LOG(
INFO) <<
"\t" << objkey <<
" (size = " << obj.GetSize() <<
" bytes)";
120 boost::filesystem::path path{objkey};
121 if (0 == obj.GetSize()) {
124 if (
'/' == objkey.back()) {
127 if (
'.' == path.filename().string().front()) {
142 throw std::runtime_error(
"failed to list objects of s3 url '" +
url +
"': " +
143 list_objects_outcome.GetError().GetExceptionName() +
144 ": " + list_objects_outcome.GetError().GetMessage());
149 if (list_objects_outcome.GetResult().GetIsTruncated()) {
150 objects_request.SetContinuationToken(
151 list_objects_outcome.GetResult().GetNextContinuationToken());
170 std::exception_ptr& teptr,
171 const bool for_detection) {
173 static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
176 boost::filesystem::remove(file_path);
178 auto ext = strrchr(objkey.c_str(),
'.');
179 auto use_pipe = (
nullptr == ext || 0 != strcmp(ext,
".7z"));
180 #ifdef ENABLE_IMPORT_PARQUET
181 use_pipe = use_pipe && (
nullptr == ext || 0 != strcmp(ext,
".parquet"));
184 if (mkfifo(file_path.c_str(), 0660) < 0) {
185 throw std::runtime_error(
"failed to create named pipe '" + file_path +
186 "': " + strerror(errno));
213 Aws::S3::Model::GetObjectRequest object_request;
214 object_request.WithBucket(
bucket_name).WithKey(objkey);
217 if (use_pipe && for_detection) {
218 object_request.SetRange(
"bytes=0-10000000");
221 auto get_object_outcome = s3_client->GetObject(object_request);
222 if (!get_object_outcome.IsSuccess()) {
223 throw std::runtime_error(
"failed to get object '" + objkey +
"' of s3 url '" +
url +
224 "': " + get_object_outcome.GetError().GetExceptionName() +
225 ": " + get_object_outcome.GetError().GetMessage());
229 std::atomic<bool> is_get_object_outcome_moved(
false);
233 std::thread([=, &teptr, &get_object_outcome, &is_get_object_outcome_moved]() {
237 static std::mutex mutex_glog;
238 #define S3_LOG_WITH_LOCK(x) \
240 std::lock_guard<std::mutex> lock(mutex_glog); \
244 << objkey <<
" to " << (use_pipe ?
"pipe " :
"file ")
245 << file_path <<
"...")
246 auto get_object_outcome_moved =
247 decltype(get_object_outcome)(std::move(get_object_outcome));
248 is_get_object_outcome_moved =
true;
249 Aws::OFStream local_file;
250 local_file.
open(file_path.c_str(),
251 std::ios::out | std::ios::binary | std::ios::trunc);
252 local_file << get_object_outcome_moved.GetResult().GetBody().rdbuf();
255 << (use_pipe ? "pipe " : "file ") << file_path << ".")
260 teptr = std::current_exception();
270 while (!is_get_object_outcome_moved) {
271 std::this_thread::yield();
278 threads.push_back(std::move(th_writer));
289 file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
298 boost::filesystem::remove(it->second);
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
std::string s3_access_key
std::map< const std::string, const std::string > file_paths
bool g_enable_smem_group_by true
void init_for_read() override
std::string s3_session_token
const std::string url_part(const int i)
std::string s3_secret_key
std::vector< std::string > objkeys
void vacuum(const std::string &objkey)
#define S3_LOG_WITH_LOCK(x)