OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
S3Archive.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "S3Archive.h"
17 #include <atomic>
18 #include <boost/filesystem.hpp>
19 #include "Shared/Logger.h"
20 
21 #include <aws/core/Aws.h>
22 #include <aws/core/auth/AWSCredentialsProvider.h>
23 #include <aws/s3/model/GetObjectRequest.h>
24 #include <aws/s3/model/ListObjectsV2Request.h>
25 #include <aws/s3/model/Object.h>
26 #include <fstream>
27 #include <memory>
28 
// Definitions for the process-wide AWS SDK statics declared in S3Archive.h.
// NOTE(review): the count/mutex pair presumably guards Aws::InitAPI /
// Aws::ShutdownAPI reference counting performed elsewhere (not visible in
// this file chunk) -- confirm against S3Archive.h; awsapi_options is the
// Aws::SDKOptions passed to those calls.
int S3Archive::awsapi_count;
std::mutex S3Archive::awsapi_mtx;
Aws::SDKOptions S3Archive::awsapi_options;
32 
// Resolve this archive's s3:// url into a list of object keys (`objkeys`).
// Creates the local staging directory `s3_temp_dir`, configures an S3 client
// (per-archive credentials, region, endpoint, CA bundle) and pages through
// ListObjectsV2 results, skipping zero-byte objects, directory markers
// (trailing '/') and dot-files. If listing fails but a prefix was given, the
// prefix itself is treated as a single object key (the caller may lack
// ListObjects privilege yet still be allowed GetObject).
// Throws std::runtime_error on directory-creation or listing failure.
// (Signature reconstructed from the `void init_for_read() override`
// declaration; definition site S3Archive.cpp:33.)
void S3Archive::init_for_read() {
  boost::filesystem::create_directories(s3_temp_dir);
  if (!boost::filesystem::is_directory(s3_temp_dir)) {
    throw std::runtime_error("failed to create s3_temp_dir directory '" + s3_temp_dir +
                             "'");
  }

  try {
    // url_part() splits the s3 url: part 4 is the bucket, part 5 the key prefix.
    bucket_name = url_part(4);
    prefix_name = url_part(5);

    // a prefix '/obj/' should become 'obj/'
    // a prefix '/obj' should become 'obj'
    if (prefix_name.size() && '/' == prefix_name.front()) {
      prefix_name = prefix_name.substr(1);
    }

    Aws::S3::Model::ListObjectsV2Request objects_request;
    objects_request.WithBucket(bucket_name);
    objects_request.WithPrefix(prefix_name);
    // ask for up to 1M keys per page; the server returns fewer (the
    // continuation-token loop below fetches the remainder).
    objects_request.SetMaxKeys(1 << 20);

    // for a daemon like omnisci_server it seems improper to set s3 credentials
    // via AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env's because that way
    // credentials are configured *globally* while different users with private
    // s3 resources may need separate credentials to access. In that case, use
    // WITH s3_access_key/s3_secret_key parameters.
    Aws::Client::ClientConfiguration s3_config;
    s3_config.region = s3_region.size() ? s3_region : Aws::Region::US_EAST_1;
    s3_config.endpointOverride = s3_endpoint;

    /*
      Fix a wrong ca path established at building libcurl on Centos being
      carried to Ubuntu. To fix the issue, this is the sequence of locating the
      ca file:
        1) if `SSL_CERT_DIR` or `SSL_CERT_FILE` is set, set it on the S3
           ClientConfiguration.
        2) if none of ^ is set, omnisci_server searches a list of known ca
           file paths.
        3) if 2) finds nothing, it is the user's call to set a correct
           SSL_CERT_DIR or SSL_CERT_FILE.
      S3 c++ sdk: "we only want to override the default path if someone has
      explicitly told us to."
    */
    std::list<std::string> v_known_ca_paths({
        "/etc/ssl/certs/ca-certificates.crt",
        "/etc/pki/tls/certs/ca-bundle.crt",
        "/usr/share/ssl/certs/ca-bundle.crt",
        "/usr/local/share/certs/ca-root.crt",
        "/etc/ssl/cert.pem",
        "/etc/ssl/ca-bundle.pem",
    });
    char* env;
    if (nullptr != (env = getenv("SSL_CERT_DIR"))) {
      s3_config.caPath = env;
    }
    // an explicit SSL_CERT_FILE takes priority over the built-in candidates.
    if (nullptr != (env = getenv("SSL_CERT_FILE"))) {
      v_known_ca_paths.push_front(env);
    }
    for (const auto& known_ca_path : v_known_ca_paths) {
      if (boost::filesystem::exists(known_ca_path)) {
        s3_config.caFile = known_ca_path;
        break;
      }
    }

    // use the per-archive access/secret key pair when both are present;
    // otherwise fall back to anonymous credentials (public buckets).
    if (!s3_access_key.empty() && !s3_secret_key.empty()) {
      s3_client.reset(new Aws::S3::S3Client(
          Aws::Auth::AWSCredentials(s3_access_key, s3_secret_key), s3_config));
    } else {
      s3_client.reset(new Aws::S3::S3Client(
          std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
    }
    while (true) {
      auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
      if (list_objects_outcome.IsSuccess()) {
        // pass only object keys to next stage, which may be Importer::import_parquet,
        // Importer::import_compressed or else, depending on copy_params (eg. .is_parquet)
        auto object_list = list_objects_outcome.GetResult().GetContents();
        if (0 == object_list.size()) {
          // an empty page is only fatal if no earlier page produced keys
          if (objkeys.empty()) {
            throw std::runtime_error("no object was found with s3 url '" + url + "'");
          }
        }

        LOG(INFO) << "Found " << (objkeys.empty() ? "" : "another ") << object_list.size()
                  << " objects with url '" + url + "':";
        for (auto const& obj : object_list) {
          std::string objkey = obj.GetKey().c_str();
          LOG(INFO) << "\t" << objkey << " (size = " << obj.GetSize() << " bytes)";
          total_file_size += obj.GetSize();
          // skip _SUCCESS and keys with trailing / or basename with heading '.'
          boost::filesystem::path path{objkey};
          if (0 == obj.GetSize()) {
            continue;
          }
          if ('/' == objkey.back()) {
            continue;
          }
          if ('.' == path.filename().string().front()) {
            continue;
          }
          objkeys.push_back(objkey);
        }
      } else {
        // could not ListObject
        // could be the object is there but we do not have listObject Privilege
        // We can treat it as a specific object, so should try to parse it and pass to
        // getObject as a singleton
        // Null prefix in urls such like 's3://bucket/' should be ignored.
        if (objkeys.empty()) {
          if (!prefix_name.empty()) {
            objkeys.push_back(prefix_name);
          } else {
            throw std::runtime_error("failed to list objects of s3 url '" + url + "': " +
                                     list_objects_outcome.GetError().GetExceptionName() +
                                     ": " + list_objects_outcome.GetError().GetMessage());
          }
        }
      }
      // continue to read next 1000 files
      if (list_objects_outcome.GetResult().GetIsTruncated()) {
        objects_request.SetContinuationToken(
            list_objects_outcome.GetResult().GetNextContinuationToken());
      } else {
        break;
      }
    }
  } catch (...) {
    // pure pass-through: rethrow any error to the caller unchanged
    throw;
  }
}
161 
// a bit complicated with S3 archive of parquet files is that these files
// use parquet api (not libarchive) and the files must be landed locally
// to be imported. besides, since parquet archives are often big in size
// to avoid extra EBS cost to customers, generally we don't want to land
// them at once but one by one.
//
// likely in the future there will be other file types that need to
// land entirely to be imported... (avro?)
//
// Download object `objkey` into `s3_temp_dir` and return the local path.
// 7z (and, when enabled, parquet) objects are landed fully as a regular
// file (synchronous); everything else is streamed through a named pipe
// consumed by Importer's th_pipe_writer thread (asynchronous). In pipe
// mode, download errors are reported through `teptr`; in file mode they
// are thrown directly. `for_detection` limits pipe-mode downloads to the
// first ~10MB.
const std::string S3Archive::land(const std::string& objkey,
                                  std::exception_ptr& teptr,
                                  const bool for_detection) {
  // 7z file needs entire landing; other file types use a named pipe
  // seqno yields unique temp names: pid in the high 32 bits, startup time in
  // the low bits, incremented once per landed object.
  static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
  // need a dummy ext b/c no-ext now indicate plain_text
  std::string file_path = s3_temp_dir + "/s3tmp_" + std::to_string(++seqno) + ".s3";
  boost::filesystem::remove(file_path);

  auto ext = strrchr(objkey.c_str(), '.');
  auto use_pipe = (nullptr == ext || 0 != strcmp(ext, ".7z"));
#ifdef ENABLE_IMPORT_PARQUET
  use_pipe = use_pipe && (nullptr == ext || 0 != strcmp(ext, ".parquet"));
#endif
  if (use_pipe) {
    if (mkfifo(file_path.c_str(), 0660) < 0) {
      throw std::runtime_error("failed to create named pipe '" + file_path +
                               "': " + strerror(errno));
    }
  }

  /*
    Here is the background info that makes the thread interaction here a bit
    subtle:
    1) We need two threading modes for the `th_writer` thread below:
       a) synchronous mode to land .7z files or any file that must land fully
          as a local file before it can be processed by libarchive.
       b) asynchronous mode to land a file that can be processed by libarchive
          as a stream. With this mode, the file is streamed into a temporary
          named pipe.
    2) Cooperating with the `th_writer` thread is the `th_pipe_writer` thread
       in Importer.cpp. For mode b), th_pipe_writer thread reads data from the
       named pipe written by th_writer. Before it reads, it needs to open the
       pipe. It will be blocked indefinitely (hang) if th_writer exits from
       any error before th_pipe_writer opens the pipe.
    3) AWS S3 client s3_client->GetObject returns an 'object' rather than a
       pointer to the object. That makes it hard to use smart pointer for
       RAII. Calling s3_client->GetObject in th_writer body appears to be the
       immediate approach.
    4) If s3_client->GetObject were called inside th_writer and th_writer is
       in async mode, a tragic scenario is that th_writer receives an error
       (eg. bad credentials) from AWS S3 server then quits when th_pipe_writer
       has proceeded to open the named pipe and get blocked (hangs).

    So a viable approach is to move s3_client->GetObject out of th_writer body
    but *move* the `object outcome` into th_writer body. This way we can
    better assure any error of s3_client->GetObject will be thrown immediately
    to upstream (ie. th_pipe_writer) and `object outcome` will be released
    later after the object is landed.
  */
  Aws::S3::Model::GetObjectRequest object_request;
  object_request.WithBucket(bucket_name).WithKey(objkey);

  // set a download byte range (max 10mb) to avoid getting stuck on detecting big s3 files
  if (use_pipe && for_detection) {
    object_request.SetRange("bytes=0-10000000");
  }

  auto get_object_outcome = s3_client->GetObject(object_request);
  if (!get_object_outcome.IsSuccess()) {
    throw std::runtime_error("failed to get object '" + objkey + "' of s3 url '" + url +
                             "': " + get_object_outcome.GetError().GetExceptionName() +
                             ": " + get_object_outcome.GetError().GetMessage());
  }

  // streaming means asynch
  std::atomic<bool> is_get_object_outcome_moved(false);
  // fix a race between S3Archive::land and S3Archive::~S3Archive on S3Archive itself
  auto& bucket_name = this->bucket_name;
  auto th_writer =
      std::thread([=, &teptr, &get_object_outcome, &is_get_object_outcome_moved]() {
        try {
          // this static mutex protect the static google::last_tm_time_for_raw_log from
          // concurrent LOG(INFO)s that call RawLog__SetLastTime to write the variable!
          static std::mutex mutex_glog;
#define MAPD_S3_LOG(x)                             \
  {                                                \
    std::unique_lock<std::mutex> lock(mutex_glog); \
    x;                                             \
  }
          MAPD_S3_LOG(LOG(INFO)
                      << "downloading s3://" << bucket_name << "/" << objkey << " to "
                      << (use_pipe ? "pipe " : "file ") << file_path << "...")
          // take ownership of the outcome so it outlives `land()` itself
          auto get_object_outcome_moved =
              decltype(get_object_outcome)(std::move(get_object_outcome));
          is_get_object_outcome_moved = true;
          Aws::OFStream local_file;
          local_file.open(file_path.c_str(),
                          std::ios::out | std::ios::binary | std::ios::trunc);
          local_file << get_object_outcome_moved.GetResult().GetBody().rdbuf();
          MAPD_S3_LOG(LOG(INFO)
                      << "downloaded s3://" << bucket_name << "/" << objkey << " to "
                      << (use_pipe ? "pipe " : "file ") << file_path << ".")
        } catch (...) {
          // need this way to capture any exception occurring when
          // this thread runs as a disjoint asynchronous thread
          if (use_pipe) {
            teptr = std::current_exception();
          } else {
            throw;
          }
        }
      });

  if (use_pipe) {
    // in async (pipe) case, this function needs to wait for get_object_outcome
    // to be moved before it can exit; otherwise, the move() above will boom!!
    while (!is_get_object_outcome_moved) {
      std::this_thread::yield();
    }

    // no more detach this thread b/c detach thread is not possible to terminate
    // safely. when sanity test exits and glog is destructed too soon, the LOG(INFO)
    // above may still be holding glog rwlock while glog dtor tries to destruct the lock,
    // this causing a race, though unlikely this race would happen in production env.
    threads.push_back(std::move(th_writer));
    // join is delayed to ~S3Archive; any exception happening to rdbuf()
    // is passed to the upstream Importer th_pipe_writer thread via teptr.
  } else {
    try {
      th_writer.join();
    } catch (...) {
      throw;
    }
  }

  // remember the objkey -> local-path mapping so vacuum() can clean it up
  file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
  return file_path;
}
293 
294 void S3Archive::vacuum(const std::string& objkey) {
295  auto it = file_paths.find(objkey);
296  if (file_paths.end() == it) {
297  return;
298  }
299  boost::filesystem::remove(it->second);
300  file_paths.erase(it);
301 }
std::string s3_endpoint
Definition: S3Archive.h:133
#define MAPD_S3_LOG(x)
std::string s3_region
Definition: S3Archive.h:132
#define LOG(tag)
Definition: Logger.h:185
size_t total_file_size
Definition: S3Archive.h:140
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
Definition: S3Archive.h:110
std::string prefix_name
Definition: S3Archive.h:137
std::string bucket_name
Definition: S3Archive.h:136
std::string to_string(char const *&&v)
std::string s3_access_key
Definition: S3Archive.h:130
std::map< const std::string, const std::string > file_paths
Definition: S3Archive.h:139
bool g_enable_smem_group_by true
void init_for_read() override
Definition: S3Archive.cpp:33
std::string s3_temp_dir
Definition: S3Archive.h:134
const std::string url_part(const int i)
Definition: Archive.h:183
virtual int open()
Definition: Archive.h:132
std::string url
Definition: Archive.h:186
std::string s3_secret_key
Definition: S3Archive.h:131
const int8_t const int64_t const uint64_t const int32_t const int64_t int64_t ** out
std::vector< std::string > objkeys
Definition: S3Archive.h:138
void vacuum(const std::string &objkey)
Definition: S3Archive.h:115