OmniSciDB  0b528656ed
/home/jenkins-slave/workspace/core-os-doxygen/MapDServer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "MapDServer.h"
20 
21 #ifdef HAVE_THRIFT_THREADFACTORY
22 #include <thrift/concurrency/ThreadFactory.h>
23 #else
24 #include <thrift/concurrency/PlatformThreadFactory.h>
25 #endif
26 
27 #include <thrift/concurrency/ThreadManager.h>
28 #include <thrift/protocol/TBinaryProtocol.h>
29 #include <thrift/protocol/TJSONProtocol.h>
30 #include <thrift/server/TThreadedServer.h>
31 #include <thrift/transport/TBufferTransports.h>
32 #include <thrift/transport/THttpServer.h>
33 #include <thrift/transport/TSSLServerSocket.h>
34 #include <thrift/transport/TSSLSocket.h>
35 #include <thrift/transport/TServerSocket.h>
36 
37 #include "Archive/S3Archive.h"
38 #include "Shared/Logger.h"
40 #include "Shared/file_delete.h"
42 #include "Shared/mapd_shared_ptr.h"
43 #include "Shared/scope.h"
44 
45 #include <boost/algorithm/string.hpp>
46 #include <boost/algorithm/string/trim.hpp>
47 #include <boost/filesystem.hpp>
48 #include <boost/locale/generator.hpp>
49 #include <boost/make_shared.hpp>
50 #include <boost/program_options.hpp>
51 
52 #include <csignal>
53 #include <cstdlib>
54 #include <sstream>
55 #include <thread>
56 #include <vector>
57 #include "MapDRelease.h"
58 #include "Shared/Compressor.h"
60 #include "Shared/file_delete.h"
61 #include "Shared/mapd_shared_ptr.h"
62 #include "Shared/scope.h"
63 
64 using namespace ::apache::thrift;
65 using namespace ::apache::thrift::concurrency;
66 using namespace ::apache::thrift::protocol;
67 using namespace ::apache::thrift::server;
68 using namespace ::apache::thrift::transport;
69 
70 extern bool g_enable_thrift_logs;
71 
72 std::atomic<bool> g_running{true};
73 std::atomic<int> g_saw_signal{-1};
74 
76 TThreadedServer* g_thrift_http_server{nullptr};
77 TThreadedServer* g_thrift_buf_server{nullptr};
78 
79 mapd::shared_ptr<DBHandler> g_warmup_handler =
80  0; // global "g_warmup_handler" needed to avoid circular dependency
81 // between "DBHandler" & function "run_warmup_queries"
82 mapd::shared_ptr<DBHandler> g_mapd_handler = 0;
83 std::once_flag g_shutdown_once_flag;
84 
86  if (g_mapd_handler) {
87  std::call_once(g_shutdown_once_flag, []() { g_mapd_handler->shutdown(); });
88  }
89 }
90 
// Install `handler` for signal `signum` using sigaction(2) with no SA_* flags.
//
// When `handler` is a real function (not SIG_DFL / SIG_IGN), sa_mask is filled
// so that every signal is blocked while the handler runs. For SIG_DFL and
// SIG_IGN the mask is left empty. The sigaction() result is not checked.
// Only sigfillset()/sigaction() are called, so this may also be used from
// inside a signal handler.
void register_signal_handler(int signum, void (*handler)(int)) {
  struct sigaction action {};
  action.sa_handler = handler;

  const bool installs_function = (handler != SIG_DFL) && (handler != SIG_IGN);
  if (installs_function) {
    sigfillset(&action.sa_mask);
  }

  sigaction(signum, &action, nullptr);
}
101 
102 // Signal handler to set a global flag telling the server to exit.
103 // Do not call other functions inside this (or any) signal handler
104 // unless you really know what you are doing. See also:
105 // man 7 signal-safety
106 // man 7 signal
107 // https://en.wikipedia.org/wiki/Reentrancy_(computing)
108 void omnisci_signal_handler(int signum) {
109  // Record the signal number for logging during shutdown.
110  // Only records the first signal if called more than once.
111  int expected_signal{-1};
112  if (!g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
113  return; // this wasn't the first signal
114  }
115 
116  // This point should never be reached more than once.
117 
118  // Tell heartbeat() to shutdown by unsetting the 'g_running' flag.
119  // If 'g_running' is already false, this has no effect and the
120  // shutdown is already in progress.
121  g_running = false;
122 
123  // Handle core dumps specially by pausing inside this signal handler
124  // because on some systems, some signals will execute their default
125  // action immediately when and if the signal handler returns.
126  // We would like to do some emergency cleanup before core dump.
127  if (signum == SIGQUIT || signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE) {
128  // Wait briefly to give heartbeat() a chance to flush the logs and
129  // do any other emergency shutdown tasks.
130  sleep(2);
131 
132  // Explicitly trigger whatever default action this signal would
133  // have done, such as terminate the process or dump core.
134  // Signals are currently blocked so this new signal will be queued
135  // until this signal handler returns.
136  register_signal_handler(signum, SIG_DFL);
137  kill(getpid(), signum);
138  sleep(5);
139 
140 #ifndef __APPLE__
141  // as a last resort, abort
142  // primary used in Docker environments, where we can end up with PID 1 and fail to
143  // catch unix signals
144  quick_exit(signum);
145 #endif
146  }
147 }
148 
156  // Thrift secure socket can cause problems with SIGPIPE
157  register_signal_handler(SIGPIPE, SIG_IGN);
158 }
159 
160 void start_server(TThreadedServer& server, const int port) {
161  try {
162  server.serve();
163  if (errno != 0) {
164  throw std::runtime_error(std::string("Thrift server exited: ") +
165  std::strerror(errno));
166  }
167  } catch (std::exception& e) {
168  LOG(ERROR) << "Exception: " << e.what() << ": port " << port << std::endl;
169  }
170 }
171 
172 void releaseWarmupSession(TSessionId& sessionId, std::ifstream& query_file) {
173  query_file.close();
174  if (sessionId != g_warmup_handler->getInvalidSessionId()) {
175  g_warmup_handler->disconnect(sessionId);
176  }
177 }
178 
179 void run_warmup_queries(mapd::shared_ptr<DBHandler> handler,
180  std::string base_path,
181  std::string query_file_path) {
182  // run warmup queries to load cache if requested
183  if (query_file_path.empty()) {
184  return;
185  }
186  LOG(INFO) << "Running DB warmup with queries from " << query_file_path;
187  try {
188  g_warmup_handler = handler;
189  std::string db_info;
190  std::string user_keyword, user_name, db_name;
191  std::ifstream query_file;
194  TSessionId sessionId = g_warmup_handler->getInvalidSessionId();
195 
196  ScopeGuard session_guard = [&] { releaseWarmupSession(sessionId, query_file); };
197  query_file.open(query_file_path);
198  while (std::getline(query_file, db_info)) {
199  if (db_info.length() == 0) {
200  continue;
201  }
202  std::istringstream iss(db_info);
203  iss >> user_keyword >> user_name >> db_name;
204  if (user_keyword.compare(0, 4, "USER") == 0) {
205  // connect to DB for given user_name/db_name with super_user_rights (without
206  // password), & start session
207  g_warmup_handler->super_user_rights_ = true;
208  g_warmup_handler->connect(sessionId, user_name, "", db_name);
209  g_warmup_handler->super_user_rights_ = false;
210 
211  // read and run one query at a time for the DB with the setup connection
212  TQueryResult ret;
213  std::string single_query;
214  while (std::getline(query_file, single_query)) {
215  boost::algorithm::trim(single_query);
216  if (single_query.length() == 0 || single_query[0] == '-') {
217  continue;
218  }
219  if (single_query[0] == '}') {
220  single_query.clear();
221  break;
222  }
223  if (single_query.find(';') == single_query.npos) {
224  std::string multiline_query;
225  std::getline(query_file, multiline_query, ';');
226  single_query += multiline_query;
227  }
228 
229  try {
230  g_warmup_handler->sql_execute(ret, sessionId, single_query, true, "", -1, -1);
231  } catch (...) {
232  LOG(WARNING) << "Exception while executing '" << single_query
233  << "', ignoring";
234  }
235  single_query.clear();
236  }
237 
238  // stop session and disconnect from the DB
239  g_warmup_handler->disconnect(sessionId);
240  sessionId = g_warmup_handler->getInvalidSessionId();
241  } else {
242  LOG(WARNING) << "\nSyntax error in the file: " << query_file_path.c_str()
243  << " Missing expected keyword USER. Following line will be ignored: "
244  << db_info.c_str() << std::endl;
245  }
246  db_info.clear();
247  }
248  } catch (...) {
249  LOG(WARNING) << "Exception while executing warmup queries. "
250  << "Warmup may not be fully completed. Will proceed nevertheless."
251  << std::endl;
252  }
253 }
254 
255 void heartbeat() {
256  // Block all signals for this heartbeat thread, only.
257  sigset_t set;
258  sigfillset(&set);
259  int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
260  if (result != 0) {
261  throw std::runtime_error("heartbeat() thread startup failed");
262  }
263 
264  // Sleep until omnisci_signal_handler or anything clears the g_running flag.
265  VLOG(1) << "heartbeat thread starting";
266  while (::g_running) {
267  using namespace std::chrono;
268  std::this_thread::sleep_for(1s);
269  }
270  VLOG(1) << "heartbeat thread exiting";
271 
272  // Get the signal number if there was a signal.
273  int signum = g_saw_signal;
274  if (signum >= 1 && signum != SIGTERM) {
275  LOG(INFO) << "Interrupt signal (" << signum << ") received.";
276  }
277 
278  // if dumping core, try to do some quick stuff
279  if (signum == SIGQUIT || signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE) {
280  if (g_mapd_handler) {
281  std::call_once(g_shutdown_once_flag,
282  []() { g_mapd_handler->emergency_shutdown(); });
283  }
285  return;
286  // core dump should begin soon after this, see omnisci_signal_handler()
287  }
288 
289  // trigger an orderly shutdown by telling Thrift to stop serving
290  {
291  mapd_shared_lock<mapd_shared_mutex> read_lock(g_thrift_mutex);
292  auto httpserv = g_thrift_http_server;
293  if (httpserv) {
294  httpserv->stop();
295  }
296  auto bufserv = g_thrift_buf_server;
297  if (bufserv) {
298  bufserv->stop();
299  }
300  // main() should return soon after this
301  }
302 }
303 
304 int startMapdServer(CommandLineOptions& prog_config_opts, bool start_http_server = true) {
305  // try to enforce an orderly shutdown even after a signal
307 
308  // register shutdown procedures for when a normal shutdown happens
309  // be aware that atexit() functions run in reverse order
310  atexit(&logger::shutdown);
311  atexit(&shutdown_handler);
312 
313 #ifdef HAVE_AWS_S3
314  // hold a s3 archive here to survive from a segfault that happens on centos
315  // when s3 transactions and others openssl-ed sessions are interleaved...
316  auto s3_survivor = std::make_unique<S3Archive>("s3://omnisci/s3_survivor.txt", true);
317 #endif
318 
319  // start background thread to clean up _DELETE_ME files
320  const unsigned int wait_interval =
321  3; // wait time in secs after looking for deleted file before looking again
322  std::thread file_delete_thread(file_delete,
323  std::ref(g_running),
324  wait_interval,
325  prog_config_opts.base_path + "/mapd_data");
326  std::thread heartbeat_thread(heartbeat);
327 
328  if (!g_enable_thrift_logs) {
329  apache::thrift::GlobalOutput.setOutputFunction([](const char* msg) {});
330  }
331 
333  // Use the locale setting of the server by default. The generate parameter can be
334  // updated appropriately if a locale override option is ever supported.
335  boost::locale::generator generator;
336  std::locale::global(generator.generate(""));
337  }
338 
339  try {
341  mapd::make_shared<DBHandler>(prog_config_opts.db_leaves,
342  prog_config_opts.string_leaves,
343  prog_config_opts.base_path,
344  prog_config_opts.cpu_only,
345  prog_config_opts.allow_multifrag,
346  prog_config_opts.jit_debug,
347  prog_config_opts.intel_jit_profile,
348  prog_config_opts.read_only,
349  prog_config_opts.allow_loop_joins,
350  prog_config_opts.enable_rendering,
351  prog_config_opts.enable_auto_clear_render_mem,
352  prog_config_opts.render_oom_retry_threshold,
353  prog_config_opts.render_mem_bytes,
354  prog_config_opts.max_concurrent_render_sessions,
355  prog_config_opts.num_gpus,
356  prog_config_opts.start_gpu,
357  prog_config_opts.reserved_gpu_mem,
358  prog_config_opts.render_compositor_use_last_gpu,
359  prog_config_opts.num_reader_threads,
360  prog_config_opts.authMetadata,
361  prog_config_opts.system_parameters,
362  prog_config_opts.enable_legacy_syntax,
363  prog_config_opts.idle_session_duration,
364  prog_config_opts.max_session_duration,
365  prog_config_opts.enable_runtime_udf,
366  prog_config_opts.udf_file_name,
367  prog_config_opts.udf_compiler_path,
368  prog_config_opts.udf_compiler_options
369 #ifdef ENABLE_GEOS
370  ,
371  prog_config_opts.libgeos_so_filename
372 #endif
373  );
374  } catch (const std::exception& e) {
375  LOG(FATAL) << "Failed to initialize service handler: " << e.what();
376  }
377 
378  mapd::shared_ptr<TServerSocket> serverSocket;
379  mapd::shared_ptr<TServerSocket> httpServerSocket;
380  if (!prog_config_opts.system_parameters.ssl_cert_file.empty() &&
381  !prog_config_opts.system_parameters.ssl_key_file.empty()) {
382  mapd::shared_ptr<TSSLSocketFactory> sslSocketFactory;
383  sslSocketFactory =
384  mapd::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory(SSLProtocol::SSLTLS));
385  sslSocketFactory->loadCertificate(
386  prog_config_opts.system_parameters.ssl_cert_file.c_str());
387  sslSocketFactory->loadPrivateKey(
388  prog_config_opts.system_parameters.ssl_key_file.c_str());
389  if (prog_config_opts.system_parameters.ssl_transport_client_auth) {
390  sslSocketFactory->authenticate(true);
391  } else {
392  sslSocketFactory->authenticate(false);
393  }
394  sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
395  serverSocket = mapd::shared_ptr<TServerSocket>(new TSSLServerSocket(
396  prog_config_opts.system_parameters.omnisci_server_port, sslSocketFactory));
397  httpServerSocket = mapd::shared_ptr<TServerSocket>(
398  new TSSLServerSocket(prog_config_opts.http_port, sslSocketFactory));
399  LOG(INFO) << " OmniSci server using encrypted connection. Cert file ["
400  << prog_config_opts.system_parameters.ssl_cert_file << "], key file ["
401  << prog_config_opts.system_parameters.ssl_key_file << "]";
402  } else {
403  LOG(INFO) << " OmniSci server using unencrypted connection";
404  serverSocket = mapd::shared_ptr<TServerSocket>(
405  new TServerSocket(prog_config_opts.system_parameters.omnisci_server_port));
406  httpServerSocket =
407  mapd::shared_ptr<TServerSocket>(new TServerSocket(prog_config_opts.http_port));
408  }
409 
410  ScopeGuard pointer_to_thrift_guard = [] {
411  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
413  };
414 
415  if (prog_config_opts.system_parameters.ha_group_id.empty()) {
416  mapd::shared_ptr<TProcessor> processor(new TrackingProcessor(g_mapd_handler));
417  mapd::shared_ptr<TTransportFactory> bufTransportFactory(
418  new TBufferedTransportFactory());
419  mapd::shared_ptr<TProtocolFactory> bufProtocolFactory(new TBinaryProtocolFactory());
420 
421  mapd::shared_ptr<TServerTransport> bufServerTransport(serverSocket);
422  TThreadedServer bufServer(
423  processor, bufServerTransport, bufTransportFactory, bufProtocolFactory);
424  {
425  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
426  g_thrift_buf_server = &bufServer;
427  }
428 
429  std::thread bufThread(start_server,
430  std::ref(bufServer),
431  prog_config_opts.system_parameters.omnisci_server_port);
432 
433  // TEMPORARY
434  auto warmup_queries = [&prog_config_opts]() {
435  // run warm up queries if any exists
437  g_mapd_handler, prog_config_opts.base_path, prog_config_opts.db_query_file);
438  if (prog_config_opts.exit_after_warmup) {
439  g_running = false;
440  }
441  };
442 
443  mapd::shared_ptr<TServerTransport> httpServerTransport(httpServerSocket);
444  mapd::shared_ptr<TTransportFactory> httpTransportFactory(
445  new THttpServerTransportFactory());
446  mapd::shared_ptr<TProtocolFactory> httpProtocolFactory(new TJSONProtocolFactory());
447  TThreadedServer httpServer(
448  processor, httpServerTransport, httpTransportFactory, httpProtocolFactory);
449  if (start_http_server) {
450  {
451  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
452  g_thrift_http_server = &httpServer;
453  }
454  std::thread httpThread(
455  start_server, std::ref(httpServer), prog_config_opts.http_port);
456 
457  warmup_queries();
458 
459  bufThread.join();
460  httpThread.join();
461  } else {
462  warmup_queries();
463  bufThread.join();
464  }
465  } else { // running ha server
466  LOG(FATAL) << "No High Availability module available, please contact OmniSci support";
467  }
468 
469  g_running = false;
470  file_delete_thread.join();
471  heartbeat_thread.join();
473 
474  int signum = g_saw_signal;
475  if (signum <= 0 || signum == SIGTERM) {
476  return 0;
477  } else {
478  return signum;
479  }
480 }
481 
482 int main(int argc, char** argv) {
483  bool has_clust_topo = false;
484 
485  CommandLineOptions prog_config_opts(argv[0], has_clust_topo);
486 
487  try {
488  if (auto return_code =
489  prog_config_opts.parse_command_line(argc, argv, !has_clust_topo)) {
490  return *return_code;
491  }
492 
493  if (!has_clust_topo) {
494  prog_config_opts.validate_base_path();
495  prog_config_opts.validate();
496  return (startMapdServer(prog_config_opts));
497  }
498  } catch (std::runtime_error& e) {
499  std::cerr << "Can't start: " << e.what() << std::endl;
500  return 1;
501  } catch (boost::program_options::error& e) {
502  std::cerr << "Usage Error: " << e.what() << std::endl;
503  return 1;
504  }
505 }
mapd_shared_mutex g_thrift_mutex
Definition: MapDServer.cpp:75
TThreadedServer * g_thrift_buf_server
Definition: MapDServer.cpp:77
mapd::shared_ptr< DBHandler > g_warmup_handler
Definition: MapDServer.cpp:79
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
std::once_flag g_shutdown_once_flag
Definition: MapDServer.cpp:83
#define LOG(tag)
Definition: Logger.h:188
shared utility for the db server and string dictionary server to remove old files ...
void heartbeat()
Definition: MapDServer.cpp:255
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
void run_warmup_queries(mapd::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
Definition: MapDServer.cpp:179
TThreadedServer * g_thrift_http_server
Definition: MapDServer.cpp:76
int main(int argc, char **argv)
Definition: MapDServer.cpp:482
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file)
Definition: MapDServer.cpp:172
size_t max_concurrent_render_sessions
singleton class to handle concurrency and state for blosc library. A C++ wrapper over a pure C library...
std::vector< LeafHostInfo > db_leaves
std::shared_timed_mutex mapd_shared_mutex
mapd::shared_ptr< DBHandler > g_mapd_handler
Definition: MapDServer.cpp:82
void start_server(TThreadedServer &server, const int port)
Definition: MapDServer.cpp:160
std::string ssl_key_file
AuthMetadata authMetadata
void shutdown_handler()
Definition: MapDServer.cpp:85
std::atomic< int > g_saw_signal
Definition: MapDServer.cpp:73
void register_signal_handlers()
Definition: MapDServer.cpp:149
void shutdown()
Definition: Logger.cpp:310
bool g_enable_experimental_string_functions
std::atomic< bool > g_running
Definition: MapDServer.cpp:72
std::vector< std::string > udf_compiler_options
void register_signal_handler(int signum, void(*handler)(int))
Definition: MapDServer.cpp:91
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_thrift_logs
Definition: initdb.cpp:41
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
Definition: File.cpp:227
void omnisci_signal_handler(int signum)
Definition: MapDServer.cpp:108
int startMapdServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
Definition: MapDServer.cpp:304
std::string ha_group_id
#define VLOG(n)
Definition: Logger.h:291
SystemParameters system_parameters
std::string ssl_cert_file