OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
S3Archive.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "S3Archive.h"
18 
19 #include <aws/core/Aws.h>
20 #include <aws/core/auth/AWSCredentialsProvider.h>
21 #include <aws/core/auth/AWSCredentialsProviderChain.h>
22 #include <aws/s3/model/GetObjectRequest.h>
23 #include <aws/s3/model/ListObjectsV2Request.h>
24 #include <aws/s3/model/Object.h>
25 #include <atomic>
26 #include <boost/filesystem.hpp>
27 #include <fstream>
28 #include <memory>
29 
31 #include "DataMgr/OmniSciAwsSdk.h"
32 #include "Logger/Logger.h"
33 
35 
36 namespace {
38  const std::string& env_variable_name) {
39  char* env;
40  if (param.empty() && (0 != (env = getenv(env_variable_name.c_str())))) {
41  param = env;
42  }
43 }
44 
45 }; // namespace
46 
48  boost::filesystem::create_directories(s3_temp_dir);
49  if (!boost::filesystem::is_directory(s3_temp_dir)) {
50  throw std::runtime_error("failed to create s3_temp_dir directory '" + s3_temp_dir +
51  "'");
52  }
53 
54  try {
55  bucket_name = url_part(4);
56  prefix_name = url_part(5);
57 
58  // a prefix '/obj/' should become 'obj/'
59  // a prefix '/obj' should become 'obj'
60  if (prefix_name.size() && '/' == prefix_name.front()) {
61  prefix_name = prefix_name.substr(1);
62  }
63 
64  Aws::S3::Model::ListObjectsV2Request objects_request;
65  objects_request.WithBucket(bucket_name);
66  objects_request.WithPrefix(prefix_name);
67  objects_request.SetMaxKeys(1 << 20);
68 
71  }
72 
73  if (s3_region.empty()) {
74  throw std::runtime_error(
75  "Required parameter \"s3_region\" not set. Please specify the \"s3_region\" "
76  "configuration parameter.");
77  }
78 
79  // for a daemon like heavydb it seems improper to set s3 credentials
80  // via AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env's because that way
81  // credentials are configured *globally* while different users with private
82  // s3 resources may need separate credentials to access. In that case, use
83  // WITH s3_access_key/s3_secret_key parameters.
84  Aws::Client::ClientConfiguration s3_config;
85  s3_config.region = s3_region;
86  s3_config.endpointOverride = s3_endpoint;
87  auto ssl_config = omnisci_aws_sdk::get_ssl_config();
88  s3_config.caPath = ssl_config.ca_path;
89  s3_config.caFile = ssl_config.ca_file;
90 
91  if (!s3_access_key.empty() && !s3_secret_key.empty()) {
92  s3_client.reset(new Aws::S3::S3Client(
93  Aws::Auth::AWSCredentials(s3_access_key, s3_secret_key, s3_session_token),
94  s3_config));
95  } else if (g_allow_s3_server_privileges) {
96  s3_client.reset(new Aws::S3::S3Client(
97  std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(), s3_config));
98  } else {
99  s3_client.reset(new Aws::S3::S3Client(
100  std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
101  }
102  while (true) {
103  auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
104  if (list_objects_outcome.IsSuccess()) {
105  // pass only object keys to next stage, which may be Importer::import_parquet,
106  // Importer::import_compressed or else, depending on copy_params (eg. .is_parquet)
107  auto object_list = list_objects_outcome.GetResult().GetContents();
110 
111  if (0 == object_list.size()) {
112  if (objkeys.empty()) {
113  throw std::runtime_error("no object was found with s3 url '" + url + "'");
114  }
115  }
116 
117  LOG(INFO) << "Found " << (objkeys.empty() ? "" : "another ") << object_list.size()
118  << " objects with url '" + url + "':";
119  for (auto const& obj : object_list) {
120  std::string objkey = obj.GetKey().c_str();
121  LOG(INFO) << "\t" << objkey << " (size = " << obj.GetSize() << " bytes)";
122  total_file_size += obj.GetSize();
123  // skip _SUCCESS and keys with trailing / or basename with heading '.'
124  boost::filesystem::path path{objkey};
125  if (0 == obj.GetSize()) {
126  continue;
127  }
128  if ('/' == objkey.back()) {
129  continue;
130  }
131  if ('.' == path.filename().string().front()) {
132  continue;
133  }
134  objkeys.push_back(objkey);
135  }
136  } else {
137  // could not ListObject
138  // could be the object is there but we do not have listObject Privilege
139  // We can treat it as a specific object, so should try to parse it and pass to
140  // getObject as a singleton
141  // Null prefix in urls such like 's3://bucket/' should be ignored.
142  if (objkeys.empty()) {
143  if (!prefix_name.empty()) {
144  objkeys.push_back(prefix_name);
145  } else {
146  throw std::runtime_error("failed to list objects of s3 url '" + url + "': " +
147  list_objects_outcome.GetError().GetExceptionName() +
148  ": " + list_objects_outcome.GetError().GetMessage());
149  }
150  }
151  }
152  // continue to read next 1000 files
153  if (list_objects_outcome.GetResult().GetIsTruncated()) {
154  objects_request.SetContinuationToken(
155  list_objects_outcome.GetResult().GetNextContinuationToken());
156  } else {
157  break;
158  }
159  }
160  } catch (...) {
161  throw;
162  }
163 }
164 
165 // a bit complicated with S3 archive of parquet files is that these files
166 // use parquet api (not libarchive) and the files must be landed locally
167 // to be imported. besides, since parquet archives are often big in size
168 // to avoid extra EBS cost to customers, generally we don't want to land
169 // them at once but one by one.
170 //
171 // likely in the future there will be other file types that need to
172 // land entirely to be imported... (avro?)
173 const std::string S3Archive::land(const std::string& objkey,
174  std::exception_ptr& teptr,
175  const bool for_detection,
176  const bool allow_named_pipe_use,
177  const bool track_file_paths) {
178  // 7z file needs entire landing; other file types use a named pipe
179  static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
180  // need a dummy ext b/c no-ext now indicate plain_text
181  std::string file_path = s3_temp_dir + "/s3tmp_" + std::to_string(++seqno) + ".s3";
182  boost::filesystem::remove(file_path);
183 
184  auto ext = strrchr(objkey.c_str(), '.');
185  auto use_pipe = (nullptr == ext || 0 != strcmp(ext, ".7z"));
186 #ifdef ENABLE_IMPORT_PARQUET
187  use_pipe = use_pipe && (nullptr == ext || 0 != strcmp(ext, ".parquet"));
188 #endif
189  if (!allow_named_pipe_use) { // override using a named pipe no matter the configuration
190  use_pipe = false;
191  }
192  if (use_pipe) {
193  if (mkfifo(file_path.c_str(), 0660) < 0) {
194  throw std::runtime_error("failed to create named pipe '" + file_path +
195  "': " + strerror(errno));
196  }
197  }
198 
199  /*
200  Here is the background info that makes the thread interaction here a bit subtle:
201  1) We need two threading modes for the `th_writer` thread below:
202  a) synchronous mode to land .7z files or any file that must land fully as a local
203  file before it can be processed by libarchive. b) asynchronous mode to land a file
204  that can be processed by libarchive as a stream. With this mode, the file is streamed
205  into a temporary named pipe. 2) Cooperating with the `th_writer` thread is the
206  `th_pipe_writer` thread in Importer.cpp. For mode b), th_pipe_writer thread reads data
207  from the named pipe written by th_writer. Before it reads, it needs to open the pipe.
208  It will be blocked indefinitely (hang) if th_writer exits from any error before
209  th_pipe_writer opens the pipe. 3) AWS S3 client s3_client->GetObject returns an
210  'object' rather than a pointer to the object. That makes it hard to use smart pointer
211  for RAII. Calling s3_client->GetObject in th_writer body appears to the immediate
212  approach. 4) If s3_client->GetObject were called inside th_writer and th_writer is in
213  async mode, a tragic scenario is that th_writer receives an error (eg. bad
214  credentials) from AWS S3 server then quits when th_pipe_writer has proceeded to open
215  the named pipe and get blocked (hangs).
216 
217  So a viable approach is to move s3_client->GetObject out of th_writer body but *move*
218  the `object outcome` into th_writer body. This way we can better assure any error of
219  s3_client->GetObject will be thrown immediately to upstream (ie. th_pipe_writer) and
220  `object outcome` will be released later after the object is landed.
221  */
222  Aws::S3::Model::GetObjectRequest object_request;
223  object_request.WithBucket(bucket_name).WithKey(objkey);
224 
225  // set a download byte range (max 10mb) to avoid getting stuck on detecting big s3 files
226  if (use_pipe && for_detection) {
227  object_request.SetRange("bytes=0-10000000");
228  }
229 
230  auto get_object_outcome = s3_client->GetObject(object_request);
231  if (!get_object_outcome.IsSuccess()) {
232  throw std::runtime_error("failed to get object '" + objkey + "' of s3 url '" + url +
233  "': " + get_object_outcome.GetError().GetExceptionName() +
234  ": " + get_object_outcome.GetError().GetMessage());
235  }
236 
237  // streaming means asynch
238  std::atomic<bool> is_get_object_outcome_moved(false);
239  // fix a race between S3Archive::land and S3Archive::~S3Archive on S3Archive itself
240  auto& bucket_name = this->bucket_name;
241  auto th_writer =
242  std::thread([=, &teptr, &get_object_outcome, &is_get_object_outcome_moved]() {
243  try {
244  // this static mutex protect the static google::last_tm_time_for_raw_log from
245  // concurrent LOG(INFO)s that call RawLog__SetLastTime to write the variable!
246  static std::mutex mutex_glog;
247 #define S3_LOG_WITH_LOCK(x) \
248  { \
249  std::lock_guard<std::mutex> lock(mutex_glog); \
250  x; \
251  }
252  S3_LOG_WITH_LOCK(LOG(INFO) << "downloading s3://" << bucket_name << "/"
253  << objkey << " to " << (use_pipe ? "pipe " : "file ")
254  << file_path << "...")
255  auto get_object_outcome_moved =
256  decltype(get_object_outcome)(std::move(get_object_outcome));
257  is_get_object_outcome_moved = true;
258  Aws::OFStream local_file;
259  local_file.open(file_path.c_str(),
260  std::ios::out | std::ios::binary | std::ios::trunc);
261  local_file << get_object_outcome_moved.GetResult().GetBody().rdbuf();
263  << "downloaded s3://" << bucket_name << "/" << objkey << " to "
264  << (use_pipe ? "pipe " : "file ") << file_path << ".")
265  } catch (...) {
266  // need this way to capture any exception occurring when
267  // this thread runs as a disjoint asynchronous thread
268  if (use_pipe) {
269  teptr = std::current_exception();
270  } else {
271  throw;
272  }
273  }
274  });
275 
276  if (use_pipe) {
277  // in async (pipe) case, this function needs to wait for get_object_outcome
278  // to be moved before it can exits; otherwise, the move() above will boom!!
279  while (!is_get_object_outcome_moved) {
280  std::this_thread::yield();
281  }
282 
283  // no more detach this thread b/c detach thread is not possible to terminate
284  // safely. when sanity test exits and glog is destructed too soon, the LOG(INFO)
285  // above may still be holding glog rwlock while glog dtor tries to destruct the lock,
286  // this causing a race, though unlikely this race would happen in production env.
287  threads.push_back(std::move(th_writer));
288  // join is delayed to ~S3Archive; any exception happening to rdbuf()
289  // is passed to the upstream Importer th_pipe_writer thread via teptr.
290  } else {
291  try {
292  th_writer.join();
293  } catch (...) {
294  throw;
295  }
296  }
297 
298  if (track_file_paths) { // `file_paths` may be shared between threads, so is not
299  // thread-safe
300  file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
301  }
302  return file_path;
303 }
304 
305 void S3Archive::vacuum(const std::string& objkey) {
306  auto it = file_paths.find(objkey);
307  if (file_paths.end() == it) {
308  return;
309  }
310  boost::filesystem::remove(it->second);
311  file_paths.erase(it);
312 }
std::string s3_endpoint
Definition: S3Archive.h:146
std::string s3_region
Definition: S3Archive.h:145
#define LOG(tag)
Definition: Logger.h:283
size_t total_file_size
Definition: S3Archive.h:156
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
Definition: S3Archive.h:122
std::string prefix_name
Definition: S3Archive.h:150
std::string bucket_name
Definition: S3Archive.h:149
std::string to_string(char const *&&v)
std::string s3_access_key
Definition: S3Archive.h:142
std::vector< Aws::S3::Model::Object > s3_objects_filter_sort_files(const std::vector< Aws::S3::Model::Object > &file_paths, const shared::FilePathOptions &options)
SslConfig get_ssl_config()
std::map< const std::string, const std::string > file_paths
Definition: S3Archive.h:155
std::optional< std::string > file_sort_regex
Definition: S3Archive.h:153
bool g_enable_smem_group_by true
std::optional< std::string > regex_path_filter
Definition: S3Archive.h:151
void init_for_read() override
Definition: S3Archive.h:109
std::optional< std::string > file_sort_order_by
Definition: S3Archive.h:152
std::string s3_session_token
Definition: S3Archive.h:144
std::string s3_temp_dir
Definition: S3Archive.h:147
const std::string url_part(const int i)
Definition: Archive.h:185
void get_s3_parameter_from_env_if_unset_or_empty(std::string &param, const std::string &env_variable_name)
Definition: S3Archive.cpp:37
virtual int open()
Definition: Archive.h:134
std::string url
Definition: Archive.h:190
std::string s3_secret_key
Definition: S3Archive.h:143
bool g_allow_s3_server_privileges
Definition: S3Archive.cpp:34
std::vector< std::string > objkeys
Definition: S3Archive.h:154
void vacuum(const std::string &objkey)
Definition: S3Archive.h:127
#define S3_LOG_WITH_LOCK(x)