OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
S3Archive.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "S3Archive.h"
18 
19 #include <aws/core/Aws.h>
20 #include <aws/core/auth/AWSCredentialsProvider.h>
21 #include <aws/core/auth/AWSCredentialsProviderChain.h>
22 #include <aws/s3/model/GetObjectRequest.h>
23 #include <aws/s3/model/ListObjectsV2Request.h>
24 #include <aws/s3/model/Object.h>
25 #include <atomic>
26 #include <boost/filesystem.hpp>
27 #include <fstream>
28 #include <memory>
29 
31 #include "DataMgr/OmniSciAwsSdk.h"
32 #include "Logger/Logger.h"
33 
35 
37  boost::filesystem::create_directories(s3_temp_dir);
38  if (!boost::filesystem::is_directory(s3_temp_dir)) {
39  throw std::runtime_error("failed to create s3_temp_dir directory '" + s3_temp_dir +
40  "'");
41  }
42 
43  try {
44  bucket_name = url_part(4);
45  prefix_name = url_part(5);
46 
47  // a prefix '/obj/' should become 'obj/'
48  // a prefix '/obj' should become 'obj'
49  if (prefix_name.size() && '/' == prefix_name.front()) {
50  prefix_name = prefix_name.substr(1);
51  }
52 
53  Aws::S3::Model::ListObjectsV2Request objects_request;
54  objects_request.WithBucket(bucket_name);
55  objects_request.WithPrefix(prefix_name);
56  objects_request.SetMaxKeys(1 << 20);
57 
58  // for a daemon like omnisci_server it seems improper to set s3 credentials
59  // via AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY env's because that way
60  // credentials are configured *globally* while different users with private
61  // s3 resources may need separate credentials to access. In that case, use
62  // WITH s3_access_key/s3_secret_key parameters.
63  Aws::Client::ClientConfiguration s3_config;
64  s3_config.region = s3_region.size() ? s3_region : Aws::Region::US_EAST_1;
65  s3_config.endpointOverride = s3_endpoint;
66  auto ssl_config = omnisci_aws_sdk::get_ssl_config();
67  s3_config.caPath = ssl_config.ca_path;
68  s3_config.caFile = ssl_config.ca_file;
69 
70  if (!s3_access_key.empty() && !s3_secret_key.empty()) {
71  s3_client.reset(new Aws::S3::S3Client(
72  Aws::Auth::AWSCredentials(s3_access_key, s3_secret_key, s3_session_token),
73  s3_config));
74  } else if (g_allow_s3_server_privileges) {
75  s3_client.reset(new Aws::S3::S3Client(
76  std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(), s3_config));
77  } else {
78  s3_client.reset(new Aws::S3::S3Client(
79  std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
80  }
81  while (true) {
82  auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
83  if (list_objects_outcome.IsSuccess()) {
84  // pass only object keys to next stage, which may be Importer::import_parquet,
85  // Importer::import_compressed or else, depending on copy_params (eg. .is_parquet)
86  auto object_list = list_objects_outcome.GetResult().GetContents();
89 
90  if (0 == object_list.size()) {
91  if (objkeys.empty()) {
92  throw std::runtime_error("no object was found with s3 url '" + url + "'");
93  }
94  }
95 
96  LOG(INFO) << "Found " << (objkeys.empty() ? "" : "another ") << object_list.size()
97  << " objects with url '" + url + "':";
98  for (auto const& obj : object_list) {
99  std::string objkey = obj.GetKey().c_str();
100  LOG(INFO) << "\t" << objkey << " (size = " << obj.GetSize() << " bytes)";
101  total_file_size += obj.GetSize();
102  // skip _SUCCESS and keys with trailing / or basename with heading '.'
103  boost::filesystem::path path{objkey};
104  if (0 == obj.GetSize()) {
105  continue;
106  }
107  if ('/' == objkey.back()) {
108  continue;
109  }
110  if ('.' == path.filename().string().front()) {
111  continue;
112  }
113  objkeys.push_back(objkey);
114  }
115  } else {
116  // could not ListObject
117  // could be the object is there but we do not have listObject Privilege
118  // We can treat it as a specific object, so should try to parse it and pass to
119  // getObject as a singleton
120  // Null prefix in urls such like 's3://bucket/' should be ignored.
121  if (objkeys.empty()) {
122  if (!prefix_name.empty()) {
123  objkeys.push_back(prefix_name);
124  } else {
125  throw std::runtime_error("failed to list objects of s3 url '" + url + "': " +
126  list_objects_outcome.GetError().GetExceptionName() +
127  ": " + list_objects_outcome.GetError().GetMessage());
128  }
129  }
130  }
131  // continue to read next 1000 files
132  if (list_objects_outcome.GetResult().GetIsTruncated()) {
133  objects_request.SetContinuationToken(
134  list_objects_outcome.GetResult().GetNextContinuationToken());
135  } else {
136  break;
137  }
138  }
139  } catch (...) {
140  throw;
141  }
142 }
143 
144 // a bit complicated with S3 archive of parquet files is that these files
145 // use parquet api (not libarchive) and the files must be landed locally
146 // to be imported. besides, since parquet archives are often big in size
147 // to avoid extra EBS cost to customers, generally we don't want to land
148 // them at once but one by one.
149 //
150 // likely in the future there will be other file types that need to
151 // land entirely to be imported... (avro?)
152 const std::string S3Archive::land(const std::string& objkey,
153  std::exception_ptr& teptr,
154  const bool for_detection) {
155  // 7z file needs entire landing; other file types use a named pipe
156  static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
157  // need a dummy ext b/c no-ext now indicate plain_text
158  std::string file_path = s3_temp_dir + "/s3tmp_" + std::to_string(++seqno) + ".s3";
159  boost::filesystem::remove(file_path);
160 
161  auto ext = strrchr(objkey.c_str(), '.');
162  auto use_pipe = (nullptr == ext || 0 != strcmp(ext, ".7z"));
163 #ifdef ENABLE_IMPORT_PARQUET
164  use_pipe = use_pipe && (nullptr == ext || 0 != strcmp(ext, ".parquet"));
165 #endif
166  if (use_pipe) {
167  if (mkfifo(file_path.c_str(), 0660) < 0) {
168  throw std::runtime_error("failed to create named pipe '" + file_path +
169  "': " + strerror(errno));
170  }
171  }
172 
173  /*
174  Here is the background info that makes the thread interaction here a bit subtle:
175  1) We need two threading modes for the `th_writer` thread below:
176  a) synchronous mode to land .7z files or any file that must land fully as a local
177  file before it can be processed by libarchive. b) asynchronous mode to land a file
178  that can be processed by libarchive as a stream. With this mode, the file is streamed
179  into a temporary named pipe. 2) Cooperating with the `th_writer` thread is the
180  `th_pipe_writer` thread in Importer.cpp. For mode b), th_pipe_writer thread reads data
181  from the named pipe written by th_writer. Before it reads, it needs to open the pipe.
182  It will be blocked indefinitely (hang) if th_writer exits from any error before
183  th_pipe_writer opens the pipe. 3) AWS S3 client s3_client->GetObject returns an
184  'object' rather than a pointer to the object. That makes it hard to use smart pointer
185  for RAII. Calling s3_client->GetObject in th_writer body appears to the immediate
186  approach. 4) If s3_client->GetObject were called inside th_writer and th_writer is in
187  async mode, a tragic scenario is that th_writer receives an error (eg. bad
188  credentials) from AWS S3 server then quits when th_pipe_writer has proceeded to open
189  the named pipe and get blocked (hangs).
190 
191  So a viable approach is to move s3_client->GetObject out of th_writer body but *move*
192  the `object outcome` into th_writer body. This way we can better assure any error of
193  s3_client->GetObject will be thrown immediately to upstream (ie. th_pipe_writer) and
194  `object outcome` will be released later after the object is landed.
195  */
196  Aws::S3::Model::GetObjectRequest object_request;
197  object_request.WithBucket(bucket_name).WithKey(objkey);
198 
199  // set a download byte range (max 10mb) to avoid getting stuck on detecting big s3 files
200  if (use_pipe && for_detection) {
201  object_request.SetRange("bytes=0-10000000");
202  }
203 
204  auto get_object_outcome = s3_client->GetObject(object_request);
205  if (!get_object_outcome.IsSuccess()) {
206  throw std::runtime_error("failed to get object '" + objkey + "' of s3 url '" + url +
207  "': " + get_object_outcome.GetError().GetExceptionName() +
208  ": " + get_object_outcome.GetError().GetMessage());
209  }
210 
211  // streaming means asynch
212  std::atomic<bool> is_get_object_outcome_moved(false);
213  // fix a race between S3Archive::land and S3Archive::~S3Archive on S3Archive itself
214  auto& bucket_name = this->bucket_name;
215  auto th_writer =
216  std::thread([=, &teptr, &get_object_outcome, &is_get_object_outcome_moved]() {
217  try {
218  // this static mutex protect the static google::last_tm_time_for_raw_log from
219  // concurrent LOG(INFO)s that call RawLog__SetLastTime to write the variable!
220  static std::mutex mutex_glog;
221 #define S3_LOG_WITH_LOCK(x) \
222  { \
223  std::lock_guard<std::mutex> lock(mutex_glog); \
224  x; \
225  }
226  S3_LOG_WITH_LOCK(LOG(INFO) << "downloading s3://" << bucket_name << "/"
227  << objkey << " to " << (use_pipe ? "pipe " : "file ")
228  << file_path << "...")
229  auto get_object_outcome_moved =
230  decltype(get_object_outcome)(std::move(get_object_outcome));
231  is_get_object_outcome_moved = true;
232  Aws::OFStream local_file;
233  local_file.open(file_path.c_str(),
234  std::ios::out | std::ios::binary | std::ios::trunc);
235  local_file << get_object_outcome_moved.GetResult().GetBody().rdbuf();
237  << "downloaded s3://" << bucket_name << "/" << objkey << " to "
238  << (use_pipe ? "pipe " : "file ") << file_path << ".")
239  } catch (...) {
240  // need this way to capture any exception occurring when
241  // this thread runs as a disjoint asynchronous thread
242  if (use_pipe) {
243  teptr = std::current_exception();
244  } else {
245  throw;
246  }
247  }
248  });
249 
250  if (use_pipe) {
251  // in async (pipe) case, this function needs to wait for get_object_outcome
252  // to be moved before it can exits; otherwise, the move() above will boom!!
253  while (!is_get_object_outcome_moved) {
254  std::this_thread::yield();
255  }
256 
257  // no more detach this thread b/c detach thread is not possible to terminate
258  // safely. when sanity test exits and glog is destructed too soon, the LOG(INFO)
259  // above may still be holding glog rwlock while glog dtor tries to destruct the lock,
260  // this causing a race, though unlikely this race would happen in production env.
261  threads.push_back(std::move(th_writer));
262  // join is delayed to ~S3Archive; any exception happening to rdbuf()
263  // is passed to the upstream Importer th_pipe_writer thread via teptr.
264  } else {
265  try {
266  th_writer.join();
267  } catch (...) {
268  throw;
269  }
270  }
271 
272  file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
273  return file_path;
274 }
275 
276 void S3Archive::vacuum(const std::string& objkey) {
277  auto it = file_paths.find(objkey);
278  if (file_paths.end() == it) {
279  return;
280  }
281  boost::filesystem::remove(it->second);
282  file_paths.erase(it);
283 }
std::string s3_endpoint
Definition: S3Archive.h:136
std::string s3_region
Definition: S3Archive.h:135
#define LOG(tag)
Definition: Logger.h:203
size_t total_file_size
Definition: S3Archive.h:146
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
Definition: S3Archive.h:112
std::string prefix_name
Definition: S3Archive.h:140
std::string bucket_name
Definition: S3Archive.h:139
std::string to_string(char const *&&v)
std::string s3_access_key
Definition: S3Archive.h:132
SslConfig get_ssl_config()
std::map< const std::string, const std::string > file_paths
Definition: S3Archive.h:145
std::optional< std::string > file_sort_regex
Definition: S3Archive.h:143
bool g_enable_smem_group_by true
std::optional< std::string > regex_path_filter
Definition: S3Archive.h:141
void init_for_read() override
Definition: S3Archive.h:101
std::optional< std::string > file_sort_order_by
Definition: S3Archive.h:142
std::string s3_session_token
Definition: S3Archive.h:134
std::string s3_temp_dir
Definition: S3Archive.h:137
const std::string url_part(const int i)
Definition: Archive.h:196
virtual int open()
Definition: Archive.h:145
std::string url
Definition: Archive.h:201
std::string s3_secret_key
Definition: S3Archive.h:133
bool g_allow_s3_server_privileges
Definition: S3Archive.cpp:34
std::vector< std::string > objkeys
Definition: S3Archive.h:144
void vacuum(const std::string &objkey)
Definition: S3Archive.h:117
std::vector< Aws::S3::Model::Object > s3_objects_filter_sort_files(const std::vector< Aws::S3::Model::Object > &file_paths, const std::optional< std::string > &filter_regex, const std::optional< std::string > &sort_by, const std::optional< std::string > &sort_regex)
#define S3_LOG_WITH_LOCK(x)