OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ThriftClient.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "Shared/ThriftClient.h"
18 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
19 #include "Shared/ThriftConfig.h"
20 #endif
21 
22 #include <iostream>
23 #include <sstream>
24 
25 #include <boost/algorithm/string.hpp>
26 #include <boost/core/ignore_unused.hpp>
27 #include <boost/filesystem.hpp>
28 
29 #include <thrift/protocol/TBinaryProtocol.h>
30 #include <thrift/transport/THttpClient.h>
31 #include <thrift/transport/TSocket.h>
33 
34 using namespace ::apache::thrift::transport;
35 using namespace ::apache::thrift::protocol;
37 
38 void check_standard_ca(std::string& ca_cert_file) {
39  if (ca_cert_file.empty()) {
40  static std::list<std::string> v_known_ca_paths({
41  "/etc/ssl/certs/ca-certificates.crt",
42  "/etc/pki/tls/certs/ca-bundle.crt",
43  "/usr/share/ssl/certs/ca-bundle.crt",
44  "/usr/local/share/certs/ca-root.crt",
45  "/etc/ssl/cert.pem",
46  "/etc/ssl/ca-bundle.pem",
47  });
48  for (const auto& known_ca_path : v_known_ca_paths) {
49  if (boost::filesystem::exists(known_ca_path)) {
50  ca_cert_file = known_ca_path;
51  break;
52  }
53  }
54  }
55 }
56 
57 class InsecureAccessManager : public AccessManager {
58  public:
59  Decision verify(const sockaddr_storage& sa) throw() override {
60  boost::ignore_unused(sa);
61  return ALLOW;
62  };
63  Decision verify(const std::string& host, const char* name, int size) throw() override {
64  boost::ignore_unused(host);
65  boost::ignore_unused(name);
66  boost::ignore_unused(size);
67  return ALLOW;
68  };
69  Decision verify(const sockaddr_storage& sa,
70  const char* data,
71  int size) throw() override {
72  boost::ignore_unused(sa);
73  boost::ignore_unused(data);
74  boost::ignore_unused(size);
75  return ALLOW;
76  };
77 };
78 
79 /*
80  * The Http client that comes with Thrift constructs a very simple set of HTTP
81  * headers, ignoring cookies. This class simply inherits from THttpClient to
82  * override the two methods - parseHeader (where it collects any cookies) and
83  * flush where it inserts the cookies into the http header.
84  *
85  * The methods that are over ridden here are virtual in the parent class, as is
86  * the parents class's destructor.
87  *
88  */
89 class ProxyTHttpClient : public THttpClient {
90  public:
91  // mimic and call the super constructors.
92  ProxyTHttpClient(std::shared_ptr<TTransport> transport,
93  std::string host,
94  std::string path)
95 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
96  : THttpClient(transport, host, path, shared::default_tconfig()) {
97  }
98 #else
99  : THttpClient(transport, host, path) {
100  }
101 #endif
102 
103  ProxyTHttpClient(std::string host, int port, std::string path)
104 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
105  : THttpClient(host, port, path, shared::default_tconfig()) {
106  }
107 #else
108  : THttpClient(host, port, path) {
109  }
110 #endif
111 
112  ~ProxyTHttpClient() override {}
113  // thrift parseHeader d and call the super constructor.
114  void parseHeader(char* header) override {
115  // note boost::istarts_with is case insensitive
116  if (boost::istarts_with(header, "set-cookie:")) {
117  std::string tmp(header);
118  std::string cookie = tmp.substr(tmp.find(":") + 1, std::string::npos);
119  cookies_.push_back(cookie);
120  }
121  THttpClient::parseHeader(header);
122  }
123 
124  void flush() override {
125  /*
126  * Unfortunately the decision to write the header and the body in the same
127  * method precludes using the parent class's flush method here; in what is
128  * effectively a copy of 'flush' in THttpClient with the addition of
129  * cookies, a better error report for a header that is too large and
130  * 'Connection: keep-alive'.
131  */
132  uint8_t* buf;
133  uint32_t len;
134  writeBuffer_.getBuffer(&buf, &len);
135 
136  constexpr static const char* CRLF = "\r\n";
137 
138  std::ostringstream h;
139  h << "POST " << path_ << " HTTP/1.1" << CRLF << "Host: " << host_ << CRLF
140  << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF
141  << "Accept: application/x-thrift" << CRLF << "User-Agent: Thrift/"
142  << THRIFT_PACKAGE_VERSION << " (C++/THttpClient)" << CRLF
143  << "Connection: keep-alive" << CRLF;
144  if (!cookies_.empty()) {
145  std::string cookie = "Cookie:" + boost::algorithm::join(cookies_, ";");
146  h << cookie << CRLF;
147  }
148  h << CRLF;
149 
150  cookies_.clear();
151  std::string header = h.str();
152  if (header.size() > (std::numeric_limits<uint32_t>::max)()) {
153  throw TTransportException(
154  "Header too big [" + std::to_string(header.size()) +
155  "]. Max = " + std::to_string((std::numeric_limits<uint32_t>::max)()));
156  }
157  // Write the header, then the data, then flush
158  transport_->write((const uint8_t*)header.c_str(),
159  static_cast<uint32_t>(header.size()));
160  transport_->write(buf, len);
161  transport_->flush();
162 
163  // Reset the buffer and header variables
164  writeBuffer_.resetBuffer();
165  readHeaders_ = true;
166  }
167 
168  std::vector<std::string> cookies_;
169 };
// Stores the host, port, connection type and skip-host-verify flag, starts with an
// empty trust_cert_file_, and may adopt a caller-supplied SSL socket factory.
// NOTE(review): the line that completes the `if (factory && ...` condition
// (listing number 182) is missing from this text. Only the BINARY_SSL alternative
// of the connection-type test is visible, so the full condition under which the
// factory is adopted cannot be confirmed from here.
171 ThriftClientConnection::ThriftClientConnection(const std::string& server_host,
172  const int port,
173  const ThriftConnectionType conn_type,
174  bool skip_host_verify,
175  std::shared_ptr<TSSLSocketFactory> factory)
176  : server_host_(server_host)
177  , port_(port)
178  , conn_type_(conn_type)
179  , skip_host_verify_(skip_host_verify)
180  , trust_cert_file_("") {
181  if (factory && (conn_type_ == ThriftConnectionType::BINARY_SSL ||
// using_X509_store_ marks that the factory came from the caller. The
// transport-opening functions in this file test it before deciding whether to
// load a CA file or to take the plain-socket path.
183  using_X509_store_ = true;
184  factory_ = factory;
// Same cipher list string that open_buffered_client_transport applies to a
// factory it builds itself.
185  factory_->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
// With skip_host_verify_ set, install the access manager that answers ALLOW to
// every verify() call.
186  if (skip_host_verify_) {
187  factory_->access(
188  std::shared_ptr<InsecureAccessManager>(new InsecureAccessManager()));
189  }
190  }
191 }
192 
// Opens `mytransport` and returns it wrapped in either a TJSONProtocol or a
// TBinaryProtocol.
// NOTE(review): this text is missing listing lines 195-197, 199-201, 204 and
// 213-214. That includes the code that creates `mytransport` and the condition
// that chooses between the two protocol classes, so neither is described here.
193 std::shared_ptr<TProtocol> ThriftClientConnection::get_protocol() {
194  std::shared_ptr<apache::thrift::transport::TTransport> mytransport;
198  port_,
202 
203  } else {
205  }
206 
// A failure in open() is re-thrown with the host and port appended to the message.
// The new exception is constructed as the base apache::thrift::TException, so the
// dynamic type of the exception that was caught is not preserved.
207  try {
208  mytransport->open();
209  } catch (const apache::thrift::TException& e) {
210  throw apache::thrift::TException(std::string(e.what()) + ": host " + server_host_ +
211  ", port " + std::to_string(port_));
212  }
215  return std::shared_ptr<TProtocol>(new TJSONProtocol(mytransport));
216  } else {
217  return std::shared_ptr<TProtocol>(new TBinaryProtocol(mytransport));
218  }
219 }
220 
// Builds a TBufferedTransport over either a plain TSocket or an SSL socket
// obtained from factory_, and returns it.
//   server_host, port - endpoint passed to the socket.
//   ca_cert_name      - CA certificate file; when empty and using_X509_store_ is
//                       false, the plain-socket path is taken.
//   with_timeout      - when true, the keep-alive flag and the three timeouts are
//                       applied to the socket; when false, none of them is.
//   with_keepalive, connect_timeout, recv_timeout, send_timeout
//                     - values handed to the socket setters (units are not visible
//                       here; see the Thrift TSocket documentation).
// NOTE(review): the first line of this definition (return type and function name,
// listing number 221) is missing from this text.
222  const std::string& server_host,
223  const int port,
224  const std::string& ca_cert_name,
225  bool with_timeout,
226  bool with_keepalive,
227  unsigned connect_timeout,
228  unsigned recv_timeout,
229  unsigned send_timeout) {
230  std::shared_ptr<TTransport> transport;
231 
232  if (!factory_ && !ca_cert_name.empty()) {
233  // need to build a factory once for ssl connection
// NOTE(review): the factory built here loads ca_cert_name as trusted
// certificates, but also calls authenticate(false) and installs
// InsecureAccessManager unconditionally. Confirm that this is intended.
234  factory_ =
235  std::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory(SSLProtocol::SSLTLS));
236  factory_->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
237  factory_->loadTrustedCertificates(ca_cert_name.c_str());
238  factory_->authenticate(false);
239  factory_->access(std::shared_ptr<InsecureAccessManager>(new InsecureAccessManager()));
240  }
// Plain socket: no caller-supplied factory is in use and no CA file was given.
241  if (!using_X509_store_ && ca_cert_name.empty()) {
242 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
243  const auto socket =
244  std::make_shared<TSocket>(server_host, port, shared::default_tconfig());
245 #else
246  const auto socket = std::make_shared<TSocket>(server_host, port);
247 #endif
// NOTE(review): with_keepalive only takes effect when with_timeout is true.
248  if (with_timeout) {
249  socket->setKeepAlive(with_keepalive);
250  socket->setConnTimeout(connect_timeout);
251  socket->setRecvTimeout(recv_timeout);
252  socket->setSendTimeout(send_timeout);
253 #ifdef __APPLE__
254  socket->setLinger(false, 0);
255 #endif
256  }
257 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
258  transport = std::make_shared<TBufferedTransport>(socket, shared::default_tconfig());
259 #else
260  transport = std::make_shared<TBufferedTransport>(socket);
261 #endif
262  } else {
// SSL socket from factory_. factory_ is dereferenced without a null check; it is
// presumably always set on this path (built above or adopted in the
// constructor) -- TODO confirm.
263  std::shared_ptr<TSocket> secure_socket = factory_->createSocket(server_host, port);
// Same socket options as the plain-socket branch.
264  if (with_timeout) {
265  secure_socket->setKeepAlive(with_keepalive);
266  secure_socket->setConnTimeout(connect_timeout);
267  secure_socket->setRecvTimeout(recv_timeout);
268  secure_socket->setSendTimeout(send_timeout);
269 #ifdef __APPLE__
270  secure_socket->setLinger(false, 0);
271 #endif
272  }
273 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
274  transport = std::shared_ptr<TTransport>(
275  new TBufferedTransport(secure_socket, shared::default_tconfig()));
276 #else
277  transport = std::shared_ptr<TTransport>(new TBufferedTransport(secure_socket));
278 #endif
279  }
280 
281  return transport;
282 }
283 
// Builds a ProxyTHttpClient transport with path "/" and returns it. With use_https
// set, the client wraps an SSL socket created by factory_; otherwise it is
// constructed directly from server_host and port.
//   trust_cert_fileX - copied into the member trust_cert_file_.
//   skip_verify      - only read on the use_https path (see below).
// NOTE(review): the first line of this definition (listing number 284) and
// listing line 291 are missing from this text.
285  const std::string& server_host,
286  const int port,
287  const std::string& trust_cert_fileX,
288  bool use_https,
289  bool skip_verify) {
290  trust_cert_file_ = trust_cert_fileX;
// Presumably the missing line 291 fills in a default CA file when none was given
// (check_standard_ca is defined in this file) -- TODO confirm.
292 
// A factory is created here whenever none exists yet, including when use_https is
// false and the factory is not used by this call.
293  if (!factory_) {
294  factory_ =
295  std::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory(SSLProtocol::SSLTLS));
296  }
297  std::shared_ptr<TTransport> transport;
298  std::shared_ptr<TTransport> socket;
299  if (use_https) {
// skip_verify: call authenticate(false) and install the access manager that
// answers ALLOW to every verify() call.
300  if (skip_verify) {
301  factory_->authenticate(false);
302  factory_->access(
303  std::shared_ptr<InsecureAccessManager>(new InsecureAccessManager()));
304  }
// The CA file is loaded only when the factory was not supplied by the caller.
305  if (!using_X509_store_) {
306  factory_->loadTrustedCertificates(trust_cert_file_.c_str());
307  }
308  socket = factory_->createSocket(server_host, port);
309  // transport = std::shared_ptr<TTransport>(new THttpClient(socket,
310  // server_host,
311  // "/"));
312  transport =
313  std::shared_ptr<TTransport>(new ProxyTHttpClient(socket, server_host, "/"));
314  } else {
315  transport = std::shared_ptr<TTransport>(new ProxyTHttpClient(server_host, port, "/"));
316  }
317  return transport;
318 }
ThriftConnectionType conn_type_
Definition: ThriftClient.h:77
unsigned connect_timeout
void check_standard_ca(std::string &ca_cert_file)
ThriftConnectionType
Definition: ThriftClient.h:29
std::shared_ptr< TTransport > open_http_client_transport(const std::string &server_host, const int port, const std::string &trust_cert_file_, bool use_https, bool skip_verify)
std::string join(T const &container, std::string const &delim)
virtual ~ThriftClientConnection()
unsigned send_timeout
std::string to_string(char const *&&v)
std::shared_ptr< apache::thrift::TConfiguration > default_tconfig()
Definition: ThriftConfig.h:26
Decision verify(const sockaddr_storage &sa) override
Decision verify(const std::string &host, const char *name, int size) override
std::shared_ptr< TTransport > open_buffered_client_transport(const std::string &server_host, const int port, const std::string &ca_cert_name, const bool with_timeout=false, const bool with_keepalive=true, const unsigned connect_timeout=0, const unsigned recv_timeount=0, const unsigned send_timeout=0)
ProxyTHttpClient(std::shared_ptr< TTransport > transport, std::string host, std::string path)
Decision verify(const sockaddr_storage &sa, const char *data, int size) override
std::string ca_cert_name_
Definition: ThriftClient.h:79
bool with_keepalive
std::string trust_cert_file_
Definition: ThriftClient.h:80
unsigned recv_timeout
std::string server_host_
Definition: ThriftClient.h:75
std::shared_ptr< TProtocol > get_protocol()
std::shared_ptr< TSSLSocketFactory > factory_
Definition: ThriftClient.h:82
string name
Definition: setup.in.py:72
AccessManager::Decision Decision