OmniSciDB  2e3a973ef4
CsvReaderS3.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <aws/core/auth/AWSCredentialsProvider.h>
17 #include <aws/s3/model/GetObjectRequest.h>
18 #include <aws/s3/model/ListObjectsV2Request.h>
19 #include <aws/s3/model/Object.h>
20 
21 #include "Catalog/ForeignServer.h"
24 
25 namespace foreign_storage {
26 
27 namespace {
28 
29 Aws::Client::ClientConfiguration get_s3_config(const ForeignServer* server_options) {
30  Aws::Client::ClientConfiguration s3_config;
31  s3_config.region = server_options->options.find(ForeignServer::AWS_REGION_KEY)->second;
32 
33  // Find SSL certificate trust store to connect to S3
34  std::list<std::string> v_known_ca_paths({
35  "/etc/ssl/certs/ca-certificates.crt",
36  "/etc/pki/tls/certs/ca-bundle.crt",
37  "/usr/share/ssl/certs/ca-bundle.crt",
38  "/usr/local/share/certs/ca-root.crt",
39  "/etc/ssl/cert.pem",
40  "/etc/ssl/ca-bundle.pem",
41  });
42  char* env;
43  if (nullptr != (env = getenv("SSL_CERT_DIR"))) {
44  s3_config.caPath = env;
45  }
46  if (nullptr != (env = getenv("SSL_CERT_FILE"))) {
47  v_known_ca_paths.push_front(env);
48  }
49  for (const auto& known_ca_path : v_known_ca_paths) {
50  if (boost::filesystem::exists(known_ca_path)) {
51  s3_config.caFile = known_ca_path;
52  break;
53  }
54  }
55  return s3_config;
56 }
57 
58 Aws::S3::Model::GetObjectRequest create_request(const std::string& bucket_name,
59  const std::string& obj_name,
60  size_t start = 0,
61  size_t end = 0) {
62  CHECK(start <= end);
63  Aws::S3::Model::GetObjectRequest object_request;
64  object_request.WithBucket(bucket_name).WithKey(obj_name);
65  if (end > 0) {
66  object_request.SetRange(std::string("bytes=") + std::to_string(start) + "-" +
68  }
69  return object_request;
70 }
71 
/**
 * Formats the error text reported when an S3 object or prefix cannot be accessed.
 *
 * @param bucket name of the bucket that was queried
 * @param object_name key (or prefix) that was requested
 * @param exception_name short name of the failure
 * @param message detailed failure description
 * @return "Unable to access s3 file: <bucket>/<object_name>. <exception_name>: <message>"
 */
std::string get_access_error_message(const std::string& bucket,
                                     const std::string& object_name,
                                     const std::string& exception_name,
                                     const std::string& message) {
  std::string error_message{"Unable to access s3 file: "};
  error_message += bucket;
  error_message += "/";
  error_message += object_name;
  error_message += ". ";
  error_message += exception_name;
  error_message += ": ";
  error_message += message;
  return error_message;
}
79 
80 std::shared_ptr<Aws::Auth::AWSCredentialsProvider> get_credentials(
81  const UserMapping* user_mapping) {
82  if (user_mapping) {
83  const auto options = user_mapping->getUnencryptedOptions();
84  if (options.find(UserMapping::S3_ACCESS_KEY) != options.end() &&
85  options.find(UserMapping::S3_SECRET_KEY) != options.end()) {
86  return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
87  options.find(UserMapping::S3_ACCESS_KEY)->second,
88  options.find(UserMapping::S3_SECRET_KEY)->second);
89  }
90  }
91  return std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
92 }
93 
94 } // namespace
95 
96 CsvReaderS3::CsvReaderS3(const std::string& obj_key,
97  size_t file_size,
98  const import_export::CopyParams& copy_params,
99  const ForeignServer* server_options,
100  const UserMapping* user_mapping)
101  : CsvReader(obj_key, copy_params)
102  , file_size_(file_size)
103  , scan_finished_(false)
104  , obj_key_(obj_key)
105  , copy_params_(copy_params)
106  , current_offset_(0)
107  , header_offset_(0) {
108  bucket_name_ = server_options->options.find(ForeignServer::S3_BUCKET_KEY)->second;
109  s3_client_.reset(new Aws::S3::S3Client(get_credentials(user_mapping),
110  get_s3_config(server_options)));
111  skipHeader();
112  if (header_offset_ >= file_size_) {
113  scan_finished_ = true;
114  }
116 }
117 
118 CsvReaderS3::CsvReaderS3(const std::string& obj_key,
119  const import_export::CopyParams& copy_params,
120  const ForeignServer* server_options,
121  const UserMapping* user_mapping,
122  const rapidjson::Value& value)
123  : CsvReader(obj_key, copy_params)
124  , scan_finished_(false)
125  , obj_key_(obj_key)
126  , copy_params_(copy_params)
127  , current_offset_(0)
128  , header_offset_(0) {
129  bucket_name_ = server_options->options.find(ForeignServer::S3_BUCKET_KEY)->second;
130  s3_client_.reset(new Aws::S3::S3Client(get_credentials(user_mapping),
131  get_s3_config(server_options)));
132  scan_finished_ = true;
133  json_utils::get_value_from_object(value, header_offset_, "header_offset");
134  json_utils::get_value_from_object(value, file_size_, "file_size");
135 }
136 
137 void CsvReaderS3::serialize(rapidjson::Value& value,
138  rapidjson::Document::AllocatorType& allocator) const {
140  json_utils::add_value_to_object(value, header_offset_, "header_offset", allocator);
141  json_utils::add_value_to_object(value, file_size_, "file_size", allocator);
142 };
143 
144 size_t CsvReaderS3::read(void* buffer, size_t max_size) {
145  size_t byte_start = header_offset_ + current_offset_;
146  size_t byte_end = byte_start + max_size;
147  auto object_request = create_request(bucket_name_, obj_key_, byte_start, byte_end);
148  auto get_object_outcome = s3_client_->GetObject(object_request);
149 
150  if (!get_object_outcome.IsSuccess()) {
151  throw std::runtime_error{
153  obj_key_,
154  get_object_outcome.GetError().GetExceptionName(),
155  get_object_outcome.GetError().GetMessage())};
156  }
157  get_object_outcome.GetResult().GetBody().read(static_cast<char*>(buffer), max_size);
158 
159  size_t read_bytes = get_object_outcome.GetResult().GetBody().gcount();
160  current_offset_ += read_bytes;
161  if (current_offset_ + header_offset_ >= file_size_) {
162  scan_finished_ = true;
163  }
164  return read_bytes;
165 }
166 
// NOTE(review): the numbering embedded in this listing jumps from 166 to 169, so the
// opening lines of this definition (its signature and whatever directly followed it)
// are not visible here. The CsvReaderS3 constructor calls skipHeader(), which is
// presumably this function; the two closing braces at the end indicate one enclosing
// scope besides the function body -- confirm against the original file.
//
// Visible behavior: locates the end of the first line of the S3 object and stores it
// in header_offset_. A prefix of the object is downloaded and scanned for a complete
// line; when none is found the prefix size is doubled (capped at file_size_) and the
// download is retried.
// Initial number of bytes to request from the start of the object.
169  size_t header_size = 1024;
170  bool header_found = false;
171  while (header_found == false) {
// Ask for the byte range 0..header_size of the object.
172  auto object_request = create_request(bucket_name_, obj_key_, 0, header_size);
173 
174  std::unique_ptr<char[]> header_buff = std::make_unique<char[]>(header_size);
175  auto get_object_outcome = s3_client_->GetObject(object_request);
176 
177  if (!get_object_outcome.IsSuccess()) {
// NOTE(review): listing line 179 is missing, so the initializer below is incomplete as
// shown; presumably it opened a get_access_error_message(...) call taking the bucket
// name as first argument -- confirm against the original file.
178  throw std::runtime_error{
180  obj_key_,
181  get_object_outcome.GetError().GetExceptionName(),
182  get_object_outcome.GetError().GetMessage())};
183  }
184 
// getline stores at most header_size - 1 characters and sets failbit when no line
// delimiter is found within that limit (or nothing could be extracted).
185  get_object_outcome.GetResult().GetBody().getline((header_buff.get()), header_size);
186  if (get_object_outcome.GetResult().GetBody().fail()) {
187  // We didn't get a full line
188  if (header_size == file_size_) {
189  // File only contains one header line
// NOTE(review): listing line 190 is missing here; whatever statement preceded the
// break is not visible in this view.
191  break;
192  }
// Retry with a prefix twice as large, but never larger than the file itself.
193  header_size *= 2;
194  if (header_size > file_size_) {
195  header_size = file_size_;
196  }
197  } else {
// gcount() is the number of characters the getline call extracted, which for a
// successful getline includes the consumed line delimiter.
198  header_offset_ = get_object_outcome.GetResult().GetBody().gcount();
199  header_found = true;
200  }
201  }
202  }
203 }
204 
205 void CsvReaderS3::increaseFileSize(size_t new_size) {
207  CHECK_GT(new_size, file_size_);
209  file_size_ = new_size;
210  scan_finished_ = false;
211 }
212 
213 namespace {
214 
215 using S3FileInfo = std::pair<std::string, size_t>;
216 void list_files_s3(std::unique_ptr<Aws::S3::S3Client>& s3_client,
217  const std::string& prefix_name,
218  const std::string& bucket_name,
219  std::set<S3FileInfo>& file_info_set) {
220  Aws::S3::Model::ListObjectsV2Request objects_request;
221  objects_request.WithBucket(bucket_name);
222  objects_request.WithPrefix(prefix_name);
223  auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
224  if (list_objects_outcome.IsSuccess()) {
225  auto object_list = list_objects_outcome.GetResult().GetContents();
226  if (0 == object_list.size()) {
227  throw std::runtime_error{get_access_error_message(
228  bucket_name, prefix_name, "Error", "No object was found at the given path.")};
229  }
230  // Instantiate CsvReaderS3 for each valid object
231  for (auto const& obj : object_list) {
232  std::string objkey = obj.GetKey().c_str();
233 
234  // skip keys with trailing / or basename with heading '.'
235  boost::filesystem::path path{objkey};
236  if (0 == obj.GetSize()) {
237  continue;
238  }
239  if ('/' == objkey.back()) {
240  continue;
241  }
242  if ('.' == path.filename().string().front()) {
243  continue;
244  }
245  // TODO: remove filename restriction on txt when new S3 test datasests are added
246  if (boost::filesystem::extension(path) != ".csv" &&
247  boost::filesystem::extension(path) != ".tsv") {
248  continue;
249  }
250  file_info_set.insert(S3FileInfo(objkey, obj.GetSize()));
251  }
252  } else {
253  throw std::runtime_error{
254  get_access_error_message(bucket_name,
255  prefix_name,
256  list_objects_outcome.GetError().GetExceptionName(),
257  list_objects_outcome.GetError().GetMessage())};
258  }
259 }
260 } // namespace
261 
262 MultiS3Reader::MultiS3Reader(const std::string& prefix_name,
263  const import_export::CopyParams& copy_params,
264  const ForeignServer* foreign_server,
265  const UserMapping* user_mapping)
266  : MultiFileReader(prefix_name, copy_params) {
267  auto credentials = get_credentials(user_mapping);
268  auto config = get_s3_config(foreign_server);
269  s3_client_.reset(new Aws::S3::S3Client(credentials, config));
270  bucket_name_ = foreign_server->options.find(ForeignServer::S3_BUCKET_KEY)->second;
271  std::set<S3FileInfo> file_info_set;
272  list_files_s3(s3_client_, prefix_name, bucket_name_, file_info_set);
273  for (const auto& file_info : file_info_set) {
274  files_.emplace_back(std::make_unique<CsvReaderS3>(
275  file_info.first, file_info.second, copy_params, foreign_server, user_mapping));
276  file_locations_.push_back(file_info.first);
277  file_sizes_.push_back(file_info.second);
278  }
279 }
280 
281 MultiS3Reader::MultiS3Reader(const std::string& file_path,
282  const import_export::CopyParams& copy_params,
283  const ForeignServer* foreign_server,
284  const UserMapping* user_mapping,
285  const rapidjson::Value& value)
286  : MultiFileReader(file_path, copy_params, value) {
287  auto credentials = get_credentials(user_mapping);
288  auto config = get_s3_config(foreign_server);
289  s3_client_.reset(new Aws::S3::S3Client(credentials, config));
290  bucket_name_ = foreign_server->options.find(ForeignServer::S3_BUCKET_KEY)->second;
291  // reconstruct files from metadata
292  CHECK(value.HasMember("files_metadata"));
293  for (size_t index = 0; index < file_locations_.size(); index++) {
294  files_.emplace_back(
295  std::make_unique<CsvReaderS3>(file_locations_[index],
296  copy_params,
297  foreign_server,
298  user_mapping,
299  value["files_metadata"].GetArray()[index]));
300  }
301  json_utils::get_value_from_object(value, file_sizes_, "file_sizes");
302 }
303 
304 void MultiS3Reader::serialize(rapidjson::Value& value,
305  rapidjson::Document::AllocatorType& allocator) const {
306  json_utils::add_value_to_object(value, file_sizes_, "file_sizes", allocator);
307  MultiFileReader::serialize(value, allocator);
308 };
309 
// Looks for data that appeared in S3 since the last scan: either objects that are not
// yet tracked in file_locations_, or -- when exactly one file is tracked and no new one
// appeared -- growth of that single file.
//
// file_offset    must equal current_offset_ (enforced with CHECK).
// foreign_server must not be null (enforced with CHECK); passed to new file readers.
// user_mapping   passed to new file readers.
//
// Throws std::runtime_error when the single tracked file is absent from the listing or
// has shrunk.
310 void MultiS3Reader::checkForMoreRows(size_t file_offset,
311  const ForeignServer* foreign_server,
312  const UserMapping* user_mapping) {
// NOTE(review): listing line 313 is missing from this view.
314  CHECK(file_offset == current_offset_);
315  CHECK(foreign_server != nullptr);
316 
317  // Look for new files
318  std::set<S3FileInfo> file_info_set;
// NOTE(review): listing line 319 is missing from this view. Nothing visible here fills
// file_info_set, so the dropped line presumably is the list_files_s3(...) call that
// populates it -- confirm against the original file.
320  int new_files = 0;
// Every listed object whose key is not tracked yet gets its own reader.
321  for (const auto& file_info : file_info_set) {
322  if (std::find(file_locations_.begin(), file_locations_.end(), file_info.first) ==
323  file_locations_.end()) {
324  files_.emplace_back(std::make_unique<CsvReaderS3>(
325  file_info.first, file_info.second, copy_params_, foreign_server, user_mapping));
// NOTE(review): only file_locations_ is extended here; file_sizes_ gets no entry for
// the new file in the visible code -- verify whether that is intended.
326  file_locations_.push_back(file_info.first);
327  new_files++;
328  }
329  }
330  // If no new files added and only one file in archive, check for new rows
331  if (new_files == 0 && files_.size() == 1) {
// The tracked file must still be present in the listing. Only the first (smallest)
// entry of the set is compared against file_locations_.
332  if (file_info_set.size() < 1 ||
333  find(file_locations_.begin(),
334  file_locations_.end(),
335  file_info_set.begin()->first) == file_locations_.end()) {
336  throw std::runtime_error{
337  "Foreign table refreshed with APPEND mode missing entry \"" +
338  file_locations_[0] + "\"."};
339  }
// A file that became smaller is rejected.
340  if (file_info_set.begin()->second < file_sizes_[0]) {
341  throw std::runtime_error{
342  "Refresh of foreign table created with APPEND update mode failed as remote "
343  "file "
344  "reduced in size: \"" +
345  file_locations_[0] + "\"."};
346  }
347 
// The file grew: record the new size in its reader and in file_sizes_, then reset the
// reader index and the cumulative sizes.
348  if (file_info_set.begin()->second > file_sizes_[0]) {
349  CsvReaderS3* s3_reader = dynamic_cast<CsvReaderS3*>(files_[0].get());
350  CHECK(s3_reader != nullptr);
351  s3_reader->increaseFileSize(file_info_set.begin()->second);
352  file_sizes_[0] = file_info_set.begin()->second;
353  current_index_ = 0;
354  cumulative_sizes_ = {};
355  }
356  }
357 }
358 
359 } // namespace foreign_storage
std::map< std::string, std::string, std::less<> > options
MultiS3Reader(const std::string &file_path, const import_export::CopyParams &copy_params, const ForeignServer *server_options, const UserMapping *user_mapping)
import_export::CopyParams copy_params_
Definition: CsvReader.h:102
Aws::Client::ClientConfiguration get_s3_config(const ForeignServer *server_options)
Definition: CsvReaderS3.cpp:29
std::string get_access_error_message(const std::string &bucket, const std::string &object_name, const std::string &exception_name, const std::string &message)
Definition: CsvReaderS3.cpp:72
CsvReaderS3(const std::string &obj_key, size_t file_size, const import_export::CopyParams &copy_params, const ForeignServer *server_options, const UserMapping *user_mapping)
Definition: CsvReaderS3.cpp:96
std::vector< size_t > file_sizes_
Definition: CsvReaderS3.h:52
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
#define CHECK_GT(x, y)
Definition: Logger.h:209
std::string to_string(char const *&&v)
ImportHeaderRow has_header
Definition: CopyParams.h:48
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
void increaseFileSize(size_t new_size)
void checkForMoreRows(size_t file_offset, const ForeignServer *server_options, const UserMapping *user_mapping) override
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:51
std::shared_ptr< Aws::Auth::AWSCredentialsProvider > get_credentials(const UserMapping *user_mapping)
Definition: CsvReaderS3.cpp:80
size_t read(void *buffer, size_t max_size) override
std::unique_ptr< Aws::S3::S3Client > s3_client_
Definition: CsvReaderS3.h:94
std::vector< std::unique_ptr< CsvReader > > files_
Definition: CsvReader.h:313
#define CHECK(condition)
Definition: Logger.h:197
bool isScanFinished() override
Definition: CsvReader.h:307
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
std::vector< std::string > file_locations_
Definition: CsvReader.h:314
void list_files_s3(std::unique_ptr< Aws::S3::S3Client > &s3_client, const std::string &prefix_name, const std::string &bucket_name, std::set< S3FileInfo > &file_info_set)
void serialize(rapidjson::Value &value, rapidjson::Document::AllocatorType &allocator) const override
Definition: CsvReader.cpp:495
import_export::CopyParams copy_params_
Definition: CsvReaderS3.h:98
std::vector< size_t > cumulative_sizes_
Definition: CsvReader.h:317
Aws::S3::Model::GetObjectRequest create_request(const std::string &bucket_name, const std::string &obj_name, size_t start=0, size_t end=0)
Definition: CsvReaderS3.cpp:58
size_t file_size(const int fd)
Definition: omnisci_fs.cpp:31