OmniSciDB  85c2d10cdc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
S3Archive.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "S3Archive.h"
18 
19 #include <aws/core/Aws.h>
20 #include <aws/core/auth/AWSCredentialsProvider.h>
21 #include <aws/s3/model/GetObjectRequest.h>
22 #include <aws/s3/model/ListObjectsV2Request.h>
23 #include <aws/s3/model/Object.h>
24 #include <atomic>
25 #include <boost/filesystem.hpp>
26 #include <fstream>
27 #include <memory>
28 
29 #include "Logger/Logger.h"
30 
32  boost::filesystem::create_directories(s3_temp_dir);
33  if (!boost::filesystem::is_directory(s3_temp_dir)) {
34  throw std::runtime_error("failed to create s3_temp_dir directory '" + s3_temp_dir +
35  "'");
36  }
37 
38  try {
39  bucket_name = url_part(4);
40  prefix_name = url_part(5);
41 
42  // a prefix '/obj/' should become 'obj/'
43  // a prefix '/obj' should become 'obj'
44  if (prefix_name.size() && '/' == prefix_name.front()) {
45  prefix_name = prefix_name.substr(1);
46  }
47 
48  Aws::S3::Model::ListObjectsV2Request objects_request;
49  objects_request.WithBucket(bucket_name);
50  objects_request.WithPrefix(prefix_name);
51  objects_request.SetMaxKeys(1 << 20);
52 
53  // for a daemon like omnisci_server it seems improper to set s3 credentials
54  // via AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env's because that way
55  // credentials are configured *globally* while different users with private
56  // s3 resources may need separate credentials to access. In that case, use
57  // WITH s3_access_key/s3_secret_key parameters.
58  Aws::Client::ClientConfiguration s3_config;
59  s3_config.region = s3_region.size() ? s3_region : Aws::Region::US_EAST_1;
60  s3_config.endpointOverride = s3_endpoint;
61 
62  /*
63  Fix a wrong ca path established at building libcurl on Centos being carried to
64  Ubuntu. To fix the issue, this is the sequence for locating the ca file: 1) if
65  `SSL_CERT_DIR` or `SSL_CERT_FILE` is set, set it to S3 ClientConfiguration. 2) if
66  none ^ is set, omnisci_server searches a list of known ca file paths. 3) if 2)
67  finds nothing, it is users' call to set correct SSL_CERT_DIR or SSL_CERT_FILE. S3
68  c++ sdk: "we only want to override the default path if someone has explicitly told
69  us to."
70  */
71  std::list<std::string> v_known_ca_paths({
72  "/etc/ssl/certs/ca-certificates.crt",
73  "/etc/pki/tls/certs/ca-bundle.crt",
74  "/usr/share/ssl/certs/ca-bundle.crt",
75  "/usr/local/share/certs/ca-root.crt",
76  "/etc/ssl/cert.pem",
77  "/etc/ssl/ca-bundle.pem",
78  });
79  char* env;
80  if (nullptr != (env = getenv("SSL_CERT_DIR"))) {
81  s3_config.caPath = env;
82  }
83  if (nullptr != (env = getenv("SSL_CERT_FILE"))) {
84  v_known_ca_paths.push_front(env);
85  }
86  for (const auto& known_ca_path : v_known_ca_paths) {
87  if (boost::filesystem::exists(known_ca_path)) {
88  s3_config.caFile = known_ca_path;
89  break;
90  }
91  }
92 
93  if (!s3_access_key.empty() && !s3_secret_key.empty()) {
94  s3_client.reset(new Aws::S3::S3Client(
95  Aws::Auth::AWSCredentials(s3_access_key, s3_secret_key, s3_session_token),
96  s3_config));
97  } else {
98  s3_client.reset(new Aws::S3::S3Client(
99  std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
100  }
101  while (true) {
102  auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
103  if (list_objects_outcome.IsSuccess()) {
104  // pass only object keys to next stage, which may be Importer::import_parquet,
105  // Importer::import_compressed or else, depending on copy_params (eg. .is_parquet)
106  auto object_list = list_objects_outcome.GetResult().GetContents();
107  if (0 == object_list.size()) {
108  if (objkeys.empty()) {
109  throw std::runtime_error("no object was found with s3 url '" + url + "'");
110  }
111  }
112 
113  LOG(INFO) << "Found " << (objkeys.empty() ? "" : "another ") << object_list.size()
114  << " objects with url '" + url + "':";
115  for (auto const& obj : object_list) {
116  std::string objkey = obj.GetKey().c_str();
117  LOG(INFO) << "\t" << objkey << " (size = " << obj.GetSize() << " bytes)";
118  total_file_size += obj.GetSize();
119  // skip _SUCCESS and keys with trailing / or basename with heading '.'
120  boost::filesystem::path path{objkey};
121  if (0 == obj.GetSize()) {
122  continue;
123  }
124  if ('/' == objkey.back()) {
125  continue;
126  }
127  if ('.' == path.filename().string().front()) {
128  continue;
129  }
130  objkeys.push_back(objkey);
131  }
132  } else {
133  // could not ListObject
134  // could be the object is there but we do not have listObject Privilege
135  // We can treat it as a specific object, so should try to parse it and pass to
136  // getObject as a singleton
137  // Null prefix in urls such like 's3://bucket/' should be ignored.
138  if (objkeys.empty()) {
139  if (!prefix_name.empty()) {
140  objkeys.push_back(prefix_name);
141  } else {
142  throw std::runtime_error("failed to list objects of s3 url '" + url + "': " +
143  list_objects_outcome.GetError().GetExceptionName() +
144  ": " + list_objects_outcome.GetError().GetMessage());
145  }
146  }
147  }
148  // continue to read next 1000 files
149  if (list_objects_outcome.GetResult().GetIsTruncated()) {
150  objects_request.SetContinuationToken(
151  list_objects_outcome.GetResult().GetNextContinuationToken());
152  } else {
153  break;
154  }
155  }
156  } catch (...) {
157  throw;
158  }
159 }
160 
161 // a bit complicated with S3 archive of parquet files is that these files
162 // use parquet api (not libarchive) and the files must be landed locally
163 // to be imported. besides, since parquet archives are often big in size
164 // to avoid extra EBS cost to customers, generally we don't want to land
165 // them at once but one by one.
166 //
167 // likely in the future there will be other file types that need to
168 // land entirely to be imported... (avro?)
169 const std::string S3Archive::land(const std::string& objkey,
170  std::exception_ptr& teptr,
171  const bool for_detection) {
172  // 7z file needs entire landing; other file types use a named pipe
173  static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
174  // need a dummy ext b/c no-ext now indicate plain_text
175  std::string file_path = s3_temp_dir + "/s3tmp_" + std::to_string(++seqno) + ".s3";
176  boost::filesystem::remove(file_path);
177 
178  auto ext = strrchr(objkey.c_str(), '.');
179  auto use_pipe = (nullptr == ext || 0 != strcmp(ext, ".7z"));
180 #ifdef ENABLE_IMPORT_PARQUET
181  use_pipe = use_pipe && (nullptr == ext || 0 != strcmp(ext, ".parquet"));
182 #endif
183  if (use_pipe) {
184  if (mkfifo(file_path.c_str(), 0660) < 0) {
185  throw std::runtime_error("failed to create named pipe '" + file_path +
186  "': " + strerror(errno));
187  }
188  }
189 
190  /*
191  Here is the background info that makes the thread interaction here a bit subtle:
192  1) We need two threading modes for the `th_writer` thread below:
193  a) synchronous mode to land .7z files or any file that must land fully as a local
194  file before it can be processed by libarchive. b) asynchronous mode to land a file
195  that can be processed by libarchive as a stream. With this mode, the file is streamed
196  into a temporary named pipe. 2) Cooperating with the `th_writer` thread is the
197  `th_pipe_writer` thread in Importer.cpp. For mode b), th_pipe_writer thread reads data
198  from the named pipe written by th_writer. Before it reads, it needs to open the pipe.
199  It will be blocked indefinitely (hang) if th_writer exits from any error before
200  th_pipe_writer opens the pipe. 3) AWS S3 client s3_client->GetObject returns an
201  'object' rather than a pointer to the object. That makes it hard to use smart pointer
202  for RAII. Calling s3_client->GetObject in th_writer body appears to the immediate
203  approach. 4) If s3_client->GetObject were called inside th_writer and th_writer is in
204  async mode, a tragic scenario is that th_writer receives an error (eg. bad
205  credentials) from AWS S3 server then quits when th_pipe_writer has proceeded to open
206  the named pipe and get blocked (hangs).
207 
208  So a viable approach is to move s3_client->GetObject out of th_writer body but *move*
209  the `object outcome` into th_writer body. This way we can better assure any error of
210  s3_client->GetObject will be thrown immediately to upstream (ie. th_pipe_writer) and
211  `object outcome` will be released later after the object is landed.
212  */
213  Aws::S3::Model::GetObjectRequest object_request;
214  object_request.WithBucket(bucket_name).WithKey(objkey);
215 
216  // set a download byte range (max 10mb) to avoid getting stuck on detecting big s3 files
217  if (use_pipe && for_detection) {
218  object_request.SetRange("bytes=0-10000000");
219  }
220 
221  auto get_object_outcome = s3_client->GetObject(object_request);
222  if (!get_object_outcome.IsSuccess()) {
223  throw std::runtime_error("failed to get object '" + objkey + "' of s3 url '" + url +
224  "': " + get_object_outcome.GetError().GetExceptionName() +
225  ": " + get_object_outcome.GetError().GetMessage());
226  }
227 
228  // streaming means asynch
229  std::atomic<bool> is_get_object_outcome_moved(false);
230  // fix a race between S3Archive::land and S3Archive::~S3Archive on S3Archive itself
231  auto& bucket_name = this->bucket_name;
232  auto th_writer =
233  std::thread([=, &teptr, &get_object_outcome, &is_get_object_outcome_moved]() {
234  try {
235  // this static mutex protect the static google::last_tm_time_for_raw_log from
236  // concurrent LOG(INFO)s that call RawLog__SetLastTime to write the variable!
237  static std::mutex mutex_glog;
238 #define S3_LOG_WITH_LOCK(x) \
239  { \
240  std::lock_guard<std::mutex> lock(mutex_glog); \
241  x; \
242  }
243  S3_LOG_WITH_LOCK(LOG(INFO) << "downloading s3://" << bucket_name << "/"
244  << objkey << " to " << (use_pipe ? "pipe " : "file ")
245  << file_path << "...")
246  auto get_object_outcome_moved =
247  decltype(get_object_outcome)(std::move(get_object_outcome));
248  is_get_object_outcome_moved = true;
249  Aws::OFStream local_file;
250  local_file.open(file_path.c_str(),
251  std::ios::out | std::ios::binary | std::ios::trunc);
252  local_file << get_object_outcome_moved.GetResult().GetBody().rdbuf();
254  << "downloaded s3://" << bucket_name << "/" << objkey << " to "
255  << (use_pipe ? "pipe " : "file ") << file_path << ".")
256  } catch (...) {
257  // need this way to capture any exception occurring when
258  // this thread runs as a disjoint asynchronous thread
259  if (use_pipe) {
260  teptr = std::current_exception();
261  } else {
262  throw;
263  }
264  }
265  });
266 
267  if (use_pipe) {
268  // in async (pipe) case, this function needs to wait for get_object_outcome
269  // to be moved before it can exits; otherwise, the move() above will boom!!
270  while (!is_get_object_outcome_moved) {
271  std::this_thread::yield();
272  }
273 
274  // no more detach this thread b/c detach thread is not possible to terminate
275  // safely. when sanity test exits and glog is destructed too soon, the LOG(INFO)
276  // above may still be holding glog rwlock while glog dtor tries to destruct the lock,
277  // this causing a race, though unlikely this race would happen in production env.
278  threads.push_back(std::move(th_writer));
279  // join is delayed to ~S3Archive; any exception happening to rdbuf()
280  // is passed to the upstream Importer th_pipe_writer thread via teptr.
281  } else {
282  try {
283  th_writer.join();
284  } catch (...) {
285  throw;
286  }
287  }
288 
289  file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
290  return file_path;
291 }
292 
293 void S3Archive::vacuum(const std::string& objkey) {
294  auto it = file_paths.find(objkey);
295  if (file_paths.end() == it) {
296  return;
297  }
298  boost::filesystem::remove(it->second);
299  file_paths.erase(it);
300 }
std::string s3_endpoint
Definition: S3Archive.h:130
std::string s3_region
Definition: S3Archive.h:129
#define LOG(tag)
Definition: Logger.h:188
size_t total_file_size
Definition: S3Archive.h:137
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
Definition: S3Archive.h:106
std::string prefix_name
Definition: S3Archive.h:134
std::string bucket_name
Definition: S3Archive.h:133
std::string to_string(char const *&&v)
std::string s3_access_key
Definition: S3Archive.h:126
std::map< const std::string, const std::string > file_paths
Definition: S3Archive.h:136
bool g_enable_smem_group_by true
void init_for_read() override
Definition: S3Archive.h:95
std::string s3_session_token
Definition: S3Archive.h:128
std::string s3_temp_dir
Definition: S3Archive.h:131
const std::string url_part(const int i)
Definition: Archive.h:182
virtual int open()
Definition: Archive.h:131
std::string url
Definition: Archive.h:187
std::string s3_secret_key
Definition: S3Archive.h:127
std::vector< std::string > objkeys
Definition: S3Archive.h:135
void vacuum(const std::string &objkey)
Definition: S3Archive.h:111
#define S3_LOG_WITH_LOCK(x)