OmniSciDB  dfae7c3b14
S3Archive.cpp
Go to the documentation of this file.
/*
 * Copyright 2020 OmniSci, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #include "S3Archive.h"
18 
19 #include <aws/core/Aws.h>
20 #include <aws/core/auth/AWSCredentialsProvider.h>
21 #include <aws/s3/model/GetObjectRequest.h>
22 #include <aws/s3/model/ListObjectsV2Request.h>
23 #include <aws/s3/model/Object.h>
24 #include <atomic>
25 #include <boost/filesystem.hpp>
26 #include <fstream>
27 #include <memory>
28 
29 #include "Logger/Logger.h"
30 
31 int S3Archive::awsapi_count;
32 std::mutex S3Archive::awsapi_mtx;
33 Aws::SDKOptions S3Archive::awsapi_options;
34 
36  boost::filesystem::create_directories(s3_temp_dir);
37  if (!boost::filesystem::is_directory(s3_temp_dir)) {
38  throw std::runtime_error("failed to create s3_temp_dir directory '" + s3_temp_dir +
39  "'");
40  }
41 
42  try {
43  bucket_name = url_part(4);
44  prefix_name = url_part(5);
45 
46  // a prefix '/obj/' should become 'obj/'
47  // a prefix '/obj' should become 'obj'
48  if (prefix_name.size() && '/' == prefix_name.front()) {
49  prefix_name = prefix_name.substr(1);
50  }
51 
52  Aws::S3::Model::ListObjectsV2Request objects_request;
53  objects_request.WithBucket(bucket_name);
54  objects_request.WithPrefix(prefix_name);
55  objects_request.SetMaxKeys(1 << 20);
56 
57  // for a daemon like omnisci_server it seems improper to set s3 credentials
58  // via AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env's because that way
59  // credentials are configured *globally* while different users with private
60  // s3 resources may need separate credentials to access.in that case, use
61  // WITH s3_access_key/s3_secret_key parameters.
62  Aws::Client::ClientConfiguration s3_config;
63  s3_config.region = s3_region.size() ? s3_region : Aws::Region::US_EAST_1;
64  s3_config.endpointOverride = s3_endpoint;
65 
66  /*
67  Fix a wrong ca path established at building libcurl on Centos being carried to
68  Ubuntu. To fix the issue, this is this sequence of locating ca file: 1) if
69  `SSL_CERT_DIR` or `SSL_CERT_FILE` is set, set it to S3 ClientConfiguration. 2) if
70  none ^ is set, omnisci_server searches a list of known ca file paths. 3) if 2)
71  finds nothing, it is users' call to set correct SSL_CERT_DIR or SSL_CERT_FILE. S3
72  c++ sdk: "we only want to override the default path if someone has explicitly told
73  us to."
74  */
75  std::list<std::string> v_known_ca_paths({
76  "/etc/ssl/certs/ca-certificates.crt",
77  "/etc/pki/tls/certs/ca-bundle.crt",
78  "/usr/share/ssl/certs/ca-bundle.crt",
79  "/usr/local/share/certs/ca-root.crt",
80  "/etc/ssl/cert.pem",
81  "/etc/ssl/ca-bundle.pem",
82  });
83  char* env;
84  if (nullptr != (env = getenv("SSL_CERT_DIR"))) {
85  s3_config.caPath = env;
86  }
87  if (nullptr != (env = getenv("SSL_CERT_FILE"))) {
88  v_known_ca_paths.push_front(env);
89  }
90  for (const auto& known_ca_path : v_known_ca_paths) {
91  if (boost::filesystem::exists(known_ca_path)) {
92  s3_config.caFile = known_ca_path;
93  break;
94  }
95  }
96 
97  if (!s3_access_key.empty() && !s3_secret_key.empty()) {
98  s3_client.reset(new Aws::S3::S3Client(
99  Aws::Auth::AWSCredentials(s3_access_key, s3_secret_key), s3_config));
100  } else {
101  s3_client.reset(new Aws::S3::S3Client(
102  std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
103  }
104  while (true) {
105  auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
106  if (list_objects_outcome.IsSuccess()) {
107  // pass only object keys to next stage, which may be Importer::import_parquet,
108  // Importer::import_compressed or else, depending on copy_params (eg. .is_parquet)
109  auto object_list = list_objects_outcome.GetResult().GetContents();
110  if (0 == object_list.size()) {
111  if (objkeys.empty()) {
112  throw std::runtime_error("no object was found with s3 url '" + url + "'");
113  }
114  }
115 
116  LOG(INFO) << "Found " << (objkeys.empty() ? "" : "another ") << object_list.size()
117  << " objects with url '" + url + "':";
118  for (auto const& obj : object_list) {
119  std::string objkey = obj.GetKey().c_str();
120  LOG(INFO) << "\t" << objkey << " (size = " << obj.GetSize() << " bytes)";
121  total_file_size += obj.GetSize();
122  // skip _SUCCESS and keys with trailing / or basename with heading '.'
123  boost::filesystem::path path{objkey};
124  if (0 == obj.GetSize()) {
125  continue;
126  }
127  if ('/' == objkey.back()) {
128  continue;
129  }
130  if ('.' == path.filename().string().front()) {
131  continue;
132  }
133  objkeys.push_back(objkey);
134  }
135  } else {
136  // could not ListObject
137  // could be the object is there but we do not have listObject Privilege
138  // We can treat it as a specific object, so should try to parse it and pass to
139  // getObject as a singleton
140  // Null prefix in urls such like 's3://bucket/' should be ignored.
141  if (objkeys.empty()) {
142  if (!prefix_name.empty()) {
143  objkeys.push_back(prefix_name);
144  } else {
145  throw std::runtime_error("failed to list objects of s3 url '" + url + "': " +
146  list_objects_outcome.GetError().GetExceptionName() +
147  ": " + list_objects_outcome.GetError().GetMessage());
148  }
149  }
150  }
151  // continue to read next 1000 files
152  if (list_objects_outcome.GetResult().GetIsTruncated()) {
153  objects_request.SetContinuationToken(
154  list_objects_outcome.GetResult().GetNextContinuationToken());
155  } else {
156  break;
157  }
158  }
159  } catch (...) {
160  throw;
161  }
162 }
163 
164 // a bit complicated with S3 archive of parquet files is that these files
165 // use parquet api (not libarchive) and the files must be landed locally
166 // to be imported. besides, since parquet archives are often big in size
167 // to avoid extra EBS cost to customers, generally we don't want to land
168 // them at once but one by one.
169 //
170 // likely in the future there will be other file types that need to
171 // land entirely to be imported... (avro?)
172 const std::string S3Archive::land(const std::string& objkey,
173  std::exception_ptr& teptr,
174  const bool for_detection) {
175  // 7z file needs entire landing; other file types use a named pipe
176  static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
177  // need a dummy ext b/c no-ext now indicate plain_text
178  std::string file_path = s3_temp_dir + "/s3tmp_" + std::to_string(++seqno) + ".s3";
179  boost::filesystem::remove(file_path);
180 
181  auto ext = strrchr(objkey.c_str(), '.');
182  auto use_pipe = (nullptr == ext || 0 != strcmp(ext, ".7z"));
183 #ifdef ENABLE_IMPORT_PARQUET
184  use_pipe = use_pipe && (nullptr == ext || 0 != strcmp(ext, ".parquet"));
185 #endif
186  if (use_pipe) {
187  if (mkfifo(file_path.c_str(), 0660) < 0) {
188  throw std::runtime_error("failed to create named pipe '" + file_path +
189  "': " + strerror(errno));
190  }
191  }
192 
193  /*
194  Here is the background info that makes the thread interaction here a bit subtle:
195  1) We need two threading modes for the `th_writer` thread below:
196  a) synchronous mode to land .7z files or any file that must land fully as a local
197  file before it can be processed by libarchive. b) asynchronous mode to land a file
198  that can be processed by libarchive as a stream. With this mode, the file is streamed
199  into a temporary named pipe. 2) Cooperating with the `th_writer` thread is the
200  `th_pipe_writer` thread in Importer.cpp. For mode b), th_pipe_writer thread reads data
201  from the named pipe written by th_writer. Before it reads, it needs to open the pipe.
202  It will be blocked indefinitely (hang) if th_writer exits from any error before
203  th_pipe_writer opens the pipe. 3) AWS S3 client s3_client->GetObject returns an
204  'object' rather than a pointer to the object. That makes it hard to use smart pointer
205  for RAII. Calling s3_client->GetObject in th_writer body appears to the immediate
206  approach. 4) If s3_client->GetObject were called inside th_writer and th_writer is in
207  async mode, a tragic scenario is that th_writer receives an error (eg. bad
208  credentials) from AWS S3 server then quits when th_pipe_writer has proceeded to open
209  the named pipe and get blocked (hangs).
210 
211  So a viable approach is to move s3_client->GetObject out of th_writer body but *move*
212  the `object outcome` into th_writer body. This way we can better assure any error of
213  s3_client->GetObject will be thrown immediately to upstream (ie. th_pipe_writer) and
214  `object outcome` will be released later after the object is landed.
215  */
216  Aws::S3::Model::GetObjectRequest object_request;
217  object_request.WithBucket(bucket_name).WithKey(objkey);
218 
219  // set a download byte range (max 10mb) to avoid getting stuck on detecting big s3 files
220  if (use_pipe && for_detection) {
221  object_request.SetRange("bytes=0-10000000");
222  }
223 
224  auto get_object_outcome = s3_client->GetObject(object_request);
225  if (!get_object_outcome.IsSuccess()) {
226  throw std::runtime_error("failed to get object '" + objkey + "' of s3 url '" + url +
227  "': " + get_object_outcome.GetError().GetExceptionName() +
228  ": " + get_object_outcome.GetError().GetMessage());
229  }
230 
231  // streaming means asynch
232  std::atomic<bool> is_get_object_outcome_moved(false);
233  // fix a race between S3Archive::land and S3Archive::~S3Archive on S3Archive itself
234  auto& bucket_name = this->bucket_name;
235  auto th_writer =
236  std::thread([=, &teptr, &get_object_outcome, &is_get_object_outcome_moved]() {
237  try {
238  // this static mutex protect the static google::last_tm_time_for_raw_log from
239  // concurrent LOG(INFO)s that call RawLog__SetLastTime to write the variable!
240  static std::mutex mutex_glog;
241 #define S3_LOG_WITH_LOCK(x) \
242  { \
243  std::lock_guard<std::mutex> lock(mutex_glog); \
244  x; \
245  }
246  S3_LOG_WITH_LOCK(LOG(INFO) << "downloading s3://" << bucket_name << "/"
247  << objkey << " to " << (use_pipe ? "pipe " : "file ")
248  << file_path << "...")
249  auto get_object_outcome_moved =
250  decltype(get_object_outcome)(std::move(get_object_outcome));
251  is_get_object_outcome_moved = true;
252  Aws::OFStream local_file;
253  local_file.open(file_path.c_str(),
254  std::ios::out | std::ios::binary | std::ios::trunc);
255  local_file << get_object_outcome_moved.GetResult().GetBody().rdbuf();
257  << "downloaded s3://" << bucket_name << "/" << objkey << " to "
258  << (use_pipe ? "pipe " : "file ") << file_path << ".")
259  } catch (...) {
260  // need this way to capture any exception occurring when
261  // this thread runs as a disjoint asynchronous thread
262  if (use_pipe) {
263  teptr = std::current_exception();
264  } else {
265  throw;
266  }
267  }
268  });
269 
270  if (use_pipe) {
271  // in async (pipe) case, this function needs to wait for get_object_outcome
272  // to be moved before it can exits; otherwise, the move() above will boom!!
273  while (!is_get_object_outcome_moved) {
274  std::this_thread::yield();
275  }
276 
277  // no more detach this thread b/c detach thread is not possible to terminate
278  // safely. when sanity test exits and glog is destructed too soon, the LOG(INFO)
279  // above may still be holding glog rwlock while glog dtor tries to destruct the lock,
280  // this causing a race, though unlikely this race would happen in production env.
281  threads.push_back(std::move(th_writer));
282  // join is delayed to ~S3Archive; any exception happening to rdbuf()
283  // is passed to the upstream Importer th_pipe_writer thread via teptr.
284  } else {
285  try {
286  th_writer.join();
287  } catch (...) {
288  throw;
289  }
290  }
291 
292  file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
293  return file_path;
294 }
295 
296 void S3Archive::vacuum(const std::string& objkey) {
297  auto it = file_paths.find(objkey);
298  if (file_paths.end() == it) {
299  return;
300  }
301  boost::filesystem::remove(it->second);
302  file_paths.erase(it);
303 }
std::string s3_endpoint
Definition: S3Archive.h:133
std::string s3_region
Definition: S3Archive.h:132
#define LOG(tag)
Definition: Logger.h:188
size_t total_file_size
Definition: S3Archive.h:140
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
Definition: S3Archive.h:110
std::string prefix_name
Definition: S3Archive.h:137
std::string bucket_name
Definition: S3Archive.h:136
std::string to_string(char const *&&v)
std::string s3_access_key
Definition: S3Archive.h:130
std::map< const std::string, const std::string > file_paths
Definition: S3Archive.h:139
void init_for_read() override
Definition: S3Archive.cpp:35
std::string s3_temp_dir
Definition: S3Archive.h:134
const std::string url_part(const int i)
Definition: Archive.h:182
std::string url
Definition: Archive.h:187
std::string s3_secret_key
Definition: S3Archive.h:131
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out
std::vector< std::string > objkeys
Definition: S3Archive.h:138
void vacuum(const std::string &objkey)
Definition: S3Archive.h:115
#define S3_LOG_WITH_LOCK(x)