OmniSciDB  bf83d84833
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
S3Archive.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef ARCHIVE_S3ARCHIVE_H_
18 #define ARCHIVE_S3ARCHIVE_H_
19 
20 #include <cstdio>
21 #include <exception>
22 #include <map>
23 #include <thread>
24 #include "Archive.h"
25 
26 #include <openssl/evp.h>
27 
28 #ifdef HAVE_AWS_S3
29 #include <aws/core/Aws.h>
30 #include <aws/s3/S3Client.h>
31 #endif // HAVE_AWS_S3
32 
33 // this is the base archive class for files hosted on AWS S3.
34 // known variants:
35 // . parquet files
36 // . compressed files
37 // no mix of the above is supported yet
38 class S3Archive : public Archive {
39  public:
40  S3Archive(const std::string& url, const bool plain_text) : Archive(url, plain_text) {
41 // init aws api should be singleton because because
42 // it's bad to call Aws::InitAPI and Aws::ShutdownAPI
43 // multiple times.
44 #ifdef HAVE_AWS_S3
45  {
46  std::unique_lock<std::mutex> lck(awsapi_mtx);
47  if (0 == awsapi_count++) {
48  Aws::InitAPI(awsapi_options);
49  }
50  }
51 #endif // HAVE_AWS_S3
52 
53  // these envs are on server side so are global settings
54  // which make few senses in case of private s3 resources
55  char* env;
56  if (0 != (env = getenv("AWS_REGION"))) {
57  s3_region = env;
58  }
59  if (0 != (env = getenv("AWS_ACCESS_KEY_ID"))) {
60  s3_access_key = env;
61  }
62  if (0 != (env = getenv("AWS_SECRET_ACCESS_KEY"))) {
63  s3_secret_key = env;
64  }
65  if (0 != (env = getenv("AWS_ENDPOINT"))) {
66  s3_endpoint = env;
67  }
68  }
69 
70  S3Archive(const std::string& url,
71  const std::string& s3_access_key,
72  const std::string& s3_secret_key,
73  const std::string& s3_region,
74  const std::string& s3_endpoint,
75  const bool plain_text)
76  : S3Archive(url, plain_text) {
77  this->s3_access_key = s3_access_key;
78  this->s3_secret_key = s3_secret_key;
79  this->s3_region = s3_region;
80  this->s3_endpoint = s3_endpoint;
81 
82  // this must be local to omnisci_server not client
83  // or posix dir path accessible to omnisci_server
84  auto env_s3_temp_dir = getenv("TMPDIR");
85  s3_temp_dir = env_s3_temp_dir ? env_s3_temp_dir : "/tmp";
86  }
87 
88  ~S3Archive() override {
89 #ifdef HAVE_AWS_S3
90  for (auto& thread : threads) {
91  if (thread.joinable()) {
92  thread.join();
93  }
94  }
95  std::unique_lock<std::mutex> lck(awsapi_mtx);
96  if (0 == --awsapi_count) {
97  Aws::ShutdownAPI(awsapi_options);
98  }
99 #endif // HAVE_AWS_S3
100  }
101 
102 #ifdef HAVE_AWS_S3
103  void init_for_read() override;
104 #else
105  void init_for_read() override {
106  throw std::runtime_error("AWS S3 support not available");
107  }
108 #endif
109  const std::vector<std::string>& get_objkeys() { return objkeys; }
110 #ifdef HAVE_AWS_S3
111  const std::string land(const std::string& objkey,
112  std::exception_ptr& teptr,
113  const bool for_detection);
114  void vacuum(const std::string& objkey);
115 #else
116  const std::string land(const std::string& objkey,
117  std::exception_ptr& teptr,
118  const bool for_detection) {
119  throw std::runtime_error("AWS S3 support not available");
120  }
121  void vacuum(const std::string& objkey) {
122  throw std::runtime_error("AWS S3 support not available");
123  }
124 #endif // HAVE_AWS_S3
125  size_t get_total_file_size() const { return total_file_size; }
126 
127  private:
128 #ifdef HAVE_AWS_S3
129  static int awsapi_count;
130  static std::mutex awsapi_mtx;
131  static Aws::SDKOptions awsapi_options;
132 
133  std::unique_ptr<Aws::S3::S3Client> s3_client;
134  std::vector<std::thread> threads;
135 #endif // HAVE_AWS_S3
136  std::string s3_access_key; // per-query credentials to override the
137  std::string s3_secret_key; // settings in ~/.aws/credentials or environment
138  std::string s3_region;
139  std::string s3_endpoint;
140  std::string s3_temp_dir;
141 
142  std::string bucket_name;
143  std::string prefix_name;
144  std::vector<std::string> objkeys;
145  std::map<const std::string, const std::string> file_paths;
146  size_t total_file_size{0};
147 };
148 
149 class S3ParquetArchive : public S3Archive {
150  public:
151  S3ParquetArchive(const std::string& url,
152  const std::string& s3_access_key,
153  const std::string& s3_secret_key,
154  const std::string& s3_region,
155  const std::string& s3_endpoint,
156  const bool plain_text)
157  : S3Archive(url, s3_access_key, s3_secret_key, s3_region, s3_endpoint, plain_text) {
158  }
159 };
160 
161 #endif /* ARCHIVE_S3ARCHIVE_H_ */
std::string s3_endpoint
Definition: S3Archive.h:139
std::string s3_region
Definition: S3Archive.h:138
size_t total_file_size
Definition: S3Archive.h:146
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
Definition: S3Archive.h:116
std::string prefix_name
Definition: S3Archive.h:143
std::string bucket_name
Definition: S3Archive.h:142
std::string s3_access_key
Definition: S3Archive.h:136
std::map< const std::string, const std::string > file_paths
Definition: S3Archive.h:145
~S3Archive() override
Definition: S3Archive.h:88
const std::vector< std::string > & get_objkeys()
Definition: S3Archive.h:109
S3Archive(const std::string &url, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_region, const std::string &s3_endpoint, const bool plain_text)
Definition: S3Archive.h:70
void init_for_read() override
Definition: S3Archive.h:105
std::string s3_temp_dir
Definition: S3Archive.h:140
std::string url
Definition: Archive.h:187
std::string s3_secret_key
Definition: S3Archive.h:137
S3Archive(const std::string &url, const bool plain_text)
Definition: S3Archive.h:40
bool plain_text
Definition: Archive.h:191
std::vector< std::string > objkeys
Definition: S3Archive.h:144
S3ParquetArchive(const std::string &url, const std::string &s3_access_key, const std::string &s3_secret_key, const std::string &s3_region, const std::string &s3_endpoint, const bool plain_text)
Definition: S3Archive.h:151
void vacuum(const std::string &objkey)
Definition: S3Archive.h:121
size_t get_total_file_size() const
Definition: S3Archive.h:125