OmniSciDB  2e3a973ef4
/home/jenkins-slave/workspace/core-os-doxygen/MapDServer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "MapDServer.h"
20 
21 #ifdef HAVE_THRIFT_THREADFACTORY
22 #include <thrift/concurrency/ThreadFactory.h>
23 #else
24 #include <thrift/concurrency/PlatformThreadFactory.h>
25 #endif
26 
27 #include <thrift/concurrency/ThreadManager.h>
28 #include <thrift/protocol/TBinaryProtocol.h>
29 #include <thrift/protocol/TJSONProtocol.h>
30 #include <thrift/server/TThreadedServer.h>
31 #include <thrift/transport/TBufferTransports.h>
32 #include <thrift/transport/THttpServer.h>
33 #include <thrift/transport/TSSLServerSocket.h>
34 #include <thrift/transport/TSSLSocket.h>
35 #include <thrift/transport/TServerSocket.h>
36 
37 #include "Archive/S3Archive.h"
38 #include "Logger/Logger.h"
40 #include "Shared/file_delete.h"
42 #include "Shared/mapd_shared_ptr.h"
43 #include "Shared/scope.h"
44 
45 #include <boost/algorithm/string.hpp>
46 #include <boost/algorithm/string/trim.hpp>
47 #include <boost/filesystem.hpp>
48 #include <boost/locale/generator.hpp>
49 #include <boost/make_shared.hpp>
50 #include <boost/program_options.hpp>
51 
52 #include <csignal>
53 #include <cstdlib>
54 #include <sstream>
55 #include <thread>
56 #include <vector>
57 
58 #include "MapDRelease.h"
60 #include "Shared/Compressor.h"
62 #include "Shared/file_delete.h"
63 #include "Shared/mapd_shared_ptr.h"
64 #include "Shared/scope.h"
65 
66 using namespace ::apache::thrift;
67 using namespace ::apache::thrift::concurrency;
68 using namespace ::apache::thrift::protocol;
69 using namespace ::apache::thrift::server;
70 using namespace ::apache::thrift::transport;
71 
72 extern bool g_enable_thrift_logs;
73 
74 std::atomic<bool> g_running{true};
75 std::atomic<int> g_saw_signal{-1};
76 
78 TThreadedServer* g_thrift_http_server{nullptr};
79 TThreadedServer* g_thrift_buf_server{nullptr};
80 
81 mapd::shared_ptr<DBHandler> g_warmup_handler =
82  0; // global "g_warmup_handler" needed to avoid circular dependency
83 // between "DBHandler" & function "run_warmup_queries"
84 mapd::shared_ptr<DBHandler> g_mapd_handler = 0;
85 std::once_flag g_shutdown_once_flag;
86 
88  if (g_mapd_handler) {
89  std::call_once(g_shutdown_once_flag, []() { g_mapd_handler->shutdown(); });
90  }
91 }
92 
// Installs 'handler' as the disposition of 'signum' via sigaction().
//
// When 'handler' is a real function (neither SIG_DFL nor SIG_IGN), every
// blockable signal is masked while it runs. No SA_* flags are set, and the
// result of sigaction() is not checked.
//
// signum:  the signal whose disposition is changed
// handler: a handler function, SIG_DFL or SIG_IGN
void register_signal_handler(int signum, void (*handler)(int)) {
  const bool installs_function = (handler != SIG_IGN) && (handler != SIG_DFL);

  struct sigaction new_action;
  memset(&new_action, 0, sizeof new_action);
  new_action.sa_handler = handler;
  if (installs_function) {
    // Block all signal deliveries while inside the signal handler.
    sigfillset(&new_action.sa_mask);
  }
  sigaction(signum, &new_action, nullptr);
}
103 
104 // Signal handler to set a global flag telling the server to exit.
105 // Do not call other functions inside this (or any) signal handler
106 // unless you really know what you are doing. See also:
107 // man 7 signal-safety
108 // man 7 signal
109 // https://en.wikipedia.org/wiki/Reentrancy_(computing)
110 void omnisci_signal_handler(int signum) {
111  // Record the signal number for logging during shutdown.
112  // Only records the first signal if called more than once.
113  int expected_signal{-1};
114  if (!g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
115  return; // this wasn't the first signal
116  }
117 
118  // This point should never be reached more than once.
119 
120  // Tell heartbeat() to shutdown by unsetting the 'g_running' flag.
121  // If 'g_running' is already false, this has no effect and the
122  // shutdown is already in progress.
123  g_running = false;
124 
125  // Handle core dumps specially by pausing inside this signal handler
126  // because on some systems, some signals will execute their default
127  // action immediately when and if the signal handler returns.
128  // We would like to do some emergency cleanup before core dump.
129  if (signum == SIGQUIT || signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE) {
130  // Wait briefly to give heartbeat() a chance to flush the logs and
131  // do any other emergency shutdown tasks.
132  sleep(2);
133 
134  // Explicitly trigger whatever default action this signal would
135  // have done, such as terminate the process or dump core.
136  // Signals are currently blocked so this new signal will be queued
137  // until this signal handler returns.
138  register_signal_handler(signum, SIG_DFL);
139  kill(getpid(), signum);
140  sleep(5);
141 
142 #ifndef __APPLE__
143  // as a last resort, abort
144  // primary used in Docker environments, where we can end up with PID 1 and fail to
145  // catch unix signals
146  quick_exit(signum);
147 #endif
148  }
149 }
150 
158  // Thrift secure socket can cause problems with SIGPIPE
159  register_signal_handler(SIGPIPE, SIG_IGN);
160 }
161 
162 void start_server(TThreadedServer& server, const int port) {
163  try {
164  server.serve();
165  if (errno != 0) {
166  throw std::runtime_error(std::string("Thrift server exited: ") +
167  std::strerror(errno));
168  }
169  } catch (std::exception& e) {
170  LOG(ERROR) << "Exception: " << e.what() << ": port " << port << std::endl;
171  }
172 }
173 
174 void releaseWarmupSession(TSessionId& sessionId, std::ifstream& query_file) {
175  query_file.close();
176  if (sessionId != g_warmup_handler->getInvalidSessionId()) {
177  g_warmup_handler->disconnect(sessionId);
178  }
179 }
180 
181 void run_warmup_queries(mapd::shared_ptr<DBHandler> handler,
182  std::string base_path,
183  std::string query_file_path) {
184  // run warmup queries to load cache if requested
185  if (query_file_path.empty()) {
186  return;
187  }
188  LOG(INFO) << "Running DB warmup with queries from " << query_file_path;
189  try {
190  g_warmup_handler = handler;
191  std::string db_info;
192  std::string user_keyword, user_name, db_name;
193  std::ifstream query_file;
196  TSessionId sessionId = g_warmup_handler->getInvalidSessionId();
197 
198  ScopeGuard session_guard = [&] { releaseWarmupSession(sessionId, query_file); };
199  query_file.open(query_file_path);
200  while (std::getline(query_file, db_info)) {
201  if (db_info.length() == 0) {
202  continue;
203  }
204  std::istringstream iss(db_info);
205  iss >> user_keyword >> user_name >> db_name;
206  if (user_keyword.compare(0, 4, "USER") == 0) {
207  // connect to DB for given user_name/db_name with super_user_rights (without
208  // password), & start session
209  g_warmup_handler->super_user_rights_ = true;
210  g_warmup_handler->connect(sessionId, user_name, "", db_name);
211  g_warmup_handler->super_user_rights_ = false;
212 
213  // read and run one query at a time for the DB with the setup connection
214  TQueryResult ret;
215  std::string single_query;
216  while (std::getline(query_file, single_query)) {
217  boost::algorithm::trim(single_query);
218  if (single_query.length() == 0 || single_query[0] == '-') {
219  continue;
220  }
221  if (single_query[0] == '}') {
222  single_query.clear();
223  break;
224  }
225  if (single_query.find(';') == single_query.npos) {
226  std::string multiline_query;
227  std::getline(query_file, multiline_query, ';');
228  single_query += multiline_query;
229  }
230 
231  try {
232  g_warmup_handler->sql_execute(ret, sessionId, single_query, true, "", -1, -1);
233  } catch (...) {
234  LOG(WARNING) << "Exception while executing '" << single_query
235  << "', ignoring";
236  }
237  single_query.clear();
238  }
239 
240  // stop session and disconnect from the DB
241  g_warmup_handler->disconnect(sessionId);
242  sessionId = g_warmup_handler->getInvalidSessionId();
243  } else {
244  LOG(WARNING) << "\nSyntax error in the file: " << query_file_path.c_str()
245  << " Missing expected keyword USER. Following line will be ignored: "
246  << db_info.c_str() << std::endl;
247  }
248  db_info.clear();
249  }
250  } catch (...) {
251  LOG(WARNING) << "Exception while executing warmup queries. "
252  << "Warmup may not be fully completed. Will proceed nevertheless."
253  << std::endl;
254  }
255 }
256 
257 void heartbeat() {
258  // Block all signals for this heartbeat thread, only.
259  sigset_t set;
260  sigfillset(&set);
261  int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
262  if (result != 0) {
263  throw std::runtime_error("heartbeat() thread startup failed");
264  }
265 
266  // Sleep until omnisci_signal_handler or anything clears the g_running flag.
267  VLOG(1) << "heartbeat thread starting";
268  while (::g_running) {
269  using namespace std::chrono;
270  std::this_thread::sleep_for(1s);
271  }
272  VLOG(1) << "heartbeat thread exiting";
273 
274  // Get the signal number if there was a signal.
275  int signum = g_saw_signal;
276  if (signum >= 1 && signum != SIGTERM) {
277  LOG(INFO) << "Interrupt signal (" << signum << ") received.";
278  }
279 
280  // if dumping core, try to do some quick stuff
281  if (signum == SIGQUIT || signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE) {
282  if (g_mapd_handler) {
283  std::call_once(g_shutdown_once_flag,
284  []() { g_mapd_handler->emergency_shutdown(); });
285  }
287  return;
288  // core dump should begin soon after this, see omnisci_signal_handler()
289  }
290 
291  // trigger an orderly shutdown by telling Thrift to stop serving
292  {
293  mapd_shared_lock<mapd_shared_mutex> read_lock(g_thrift_mutex);
294  auto httpserv = g_thrift_http_server;
295  if (httpserv) {
296  httpserv->stop();
297  }
298  auto bufserv = g_thrift_buf_server;
299  if (bufserv) {
300  bufserv->stop();
301  }
302  // main() should return soon after this
303  }
304 }
305 
306 int startMapdServer(CommandLineOptions& prog_config_opts, bool start_http_server = true) {
307  // try to enforce an orderly shutdown even after a signal
309 
310  // register shutdown procedures for when a normal shutdown happens
311  // be aware that atexit() functions run in reverse order
312  atexit(&logger::shutdown);
313  atexit(&shutdown_handler);
314 
315 #ifdef HAVE_AWS_S3
316  // hold a s3 archive here to survive from a segfault that happens on centos
317  // when s3 transactions and others openssl-ed sessions are interleaved...
318  auto s3_survivor = std::make_unique<S3Archive>("s3://omnisci/s3_survivor.txt", true);
319 #endif
320 
321  // start background thread to clean up _DELETE_ME files
322  const unsigned int wait_interval =
323  3; // wait time in secs after looking for deleted file before looking again
324  std::thread file_delete_thread(file_delete,
325  std::ref(g_running),
326  wait_interval,
327  prog_config_opts.base_path + "/mapd_data");
328  std::thread heartbeat_thread(heartbeat);
329 
330  if (!g_enable_thrift_logs) {
331  apache::thrift::GlobalOutput.setOutputFunction([](const char* msg) {});
332  }
333 
335  // Use the locale setting of the server by default. The generate parameter can be
336  // updated appropriately if a locale override option is ever supported.
337  boost::locale::generator generator;
338  std::locale::global(generator.generate(""));
339  }
340 
341  try {
343  mapd::make_shared<DBHandler>(prog_config_opts.db_leaves,
344  prog_config_opts.string_leaves,
345  prog_config_opts.base_path,
346  prog_config_opts.cpu_only,
347  prog_config_opts.allow_multifrag,
348  prog_config_opts.jit_debug,
349  prog_config_opts.intel_jit_profile,
350  prog_config_opts.read_only,
351  prog_config_opts.allow_loop_joins,
352  prog_config_opts.enable_rendering,
353  prog_config_opts.renderer_use_vulkan_driver,
354  prog_config_opts.enable_auto_clear_render_mem,
355  prog_config_opts.render_oom_retry_threshold,
356  prog_config_opts.render_mem_bytes,
357  prog_config_opts.max_concurrent_render_sessions,
358  prog_config_opts.num_gpus,
359  prog_config_opts.start_gpu,
360  prog_config_opts.reserved_gpu_mem,
361  prog_config_opts.render_compositor_use_last_gpu,
362  prog_config_opts.num_reader_threads,
363  prog_config_opts.authMetadata,
364  prog_config_opts.system_parameters,
365  prog_config_opts.enable_legacy_syntax,
366  prog_config_opts.idle_session_duration,
367  prog_config_opts.max_session_duration,
368  prog_config_opts.enable_runtime_udf,
369  prog_config_opts.udf_file_name,
370  prog_config_opts.udf_compiler_path,
371  prog_config_opts.udf_compiler_options,
372 #ifdef ENABLE_GEOS
373  prog_config_opts.libgeos_so_filename,
374 #endif
375  prog_config_opts.disk_cache_config);
376  } catch (const std::exception& e) {
377  LOG(FATAL) << "Failed to initialize service handler: " << e.what();
378  }
379 
380  if (g_enable_fsi) {
382  }
383 
384  mapd::shared_ptr<TServerSocket> serverSocket;
385  mapd::shared_ptr<TServerSocket> httpServerSocket;
386  if (!prog_config_opts.system_parameters.ssl_cert_file.empty() &&
387  !prog_config_opts.system_parameters.ssl_key_file.empty()) {
388  mapd::shared_ptr<TSSLSocketFactory> sslSocketFactory;
389  sslSocketFactory =
390  mapd::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory(SSLProtocol::SSLTLS));
391  sslSocketFactory->loadCertificate(
392  prog_config_opts.system_parameters.ssl_cert_file.c_str());
393  sslSocketFactory->loadPrivateKey(
394  prog_config_opts.system_parameters.ssl_key_file.c_str());
395  if (prog_config_opts.system_parameters.ssl_transport_client_auth) {
396  sslSocketFactory->authenticate(true);
397  } else {
398  sslSocketFactory->authenticate(false);
399  }
400  sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
401  serverSocket = mapd::shared_ptr<TServerSocket>(new TSSLServerSocket(
402  prog_config_opts.system_parameters.omnisci_server_port, sslSocketFactory));
403  httpServerSocket = mapd::shared_ptr<TServerSocket>(
404  new TSSLServerSocket(prog_config_opts.http_port, sslSocketFactory));
405  LOG(INFO) << " OmniSci server using encrypted connection. Cert file ["
406  << prog_config_opts.system_parameters.ssl_cert_file << "], key file ["
407  << prog_config_opts.system_parameters.ssl_key_file << "]";
408  } else {
409  LOG(INFO) << " OmniSci server using unencrypted connection";
410  serverSocket = mapd::shared_ptr<TServerSocket>(
411  new TServerSocket(prog_config_opts.system_parameters.omnisci_server_port));
412  httpServerSocket =
413  mapd::shared_ptr<TServerSocket>(new TServerSocket(prog_config_opts.http_port));
414  }
415 
416  ScopeGuard pointer_to_thrift_guard = [] {
417  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
419  };
420 
421  if (prog_config_opts.system_parameters.ha_group_id.empty()) {
422  mapd::shared_ptr<TProcessor> processor(
423  new TrackingProcessor(g_mapd_handler, prog_config_opts.log_user_origin));
424  mapd::shared_ptr<TTransportFactory> bufTransportFactory(
425  new TBufferedTransportFactory());
426  mapd::shared_ptr<TProtocolFactory> bufProtocolFactory(new TBinaryProtocolFactory());
427 
428  mapd::shared_ptr<TServerTransport> bufServerTransport(serverSocket);
429  TThreadedServer bufServer(
430  processor, bufServerTransport, bufTransportFactory, bufProtocolFactory);
431  {
432  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
433  g_thrift_buf_server = &bufServer;
434  }
435 
436  std::thread bufThread(start_server,
437  std::ref(bufServer),
438  prog_config_opts.system_parameters.omnisci_server_port);
439 
440  // TEMPORARY
441  auto warmup_queries = [&prog_config_opts]() {
442  // run warm up queries if any exists
444  g_mapd_handler, prog_config_opts.base_path, prog_config_opts.db_query_file);
445  if (prog_config_opts.exit_after_warmup) {
446  g_running = false;
447  }
448  };
449 
450  mapd::shared_ptr<TServerTransport> httpServerTransport(httpServerSocket);
451  mapd::shared_ptr<TTransportFactory> httpTransportFactory(
452  new THttpServerTransportFactory());
453  mapd::shared_ptr<TProtocolFactory> httpProtocolFactory(new TJSONProtocolFactory());
454  TThreadedServer httpServer(
455  processor, httpServerTransport, httpTransportFactory, httpProtocolFactory);
456  if (start_http_server) {
457  {
458  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
459  g_thrift_http_server = &httpServer;
460  }
461  std::thread httpThread(
462  start_server, std::ref(httpServer), prog_config_opts.http_port);
463 
464  warmup_queries();
465 
466  bufThread.join();
467  httpThread.join();
468  } else {
469  warmup_queries();
470  bufThread.join();
471  }
472  } else { // running ha server
473  LOG(FATAL) << "No High Availability module available, please contact OmniSci support";
474  }
475 
476  g_running = false;
477  file_delete_thread.join();
478  heartbeat_thread.join();
480 
481  if (g_enable_fsi) {
483  }
484 
485  int signum = g_saw_signal;
486  if (signum <= 0 || signum == SIGTERM) {
487  return 0;
488  } else {
489  return signum;
490  }
491 }
492 
493 int main(int argc, char** argv) {
494  bool has_clust_topo = false;
495 
496  CommandLineOptions prog_config_opts(argv[0], has_clust_topo);
497 
498  try {
499  if (auto return_code =
500  prog_config_opts.parse_command_line(argc, argv, !has_clust_topo)) {
501  return *return_code;
502  }
503 
504  if (!has_clust_topo) {
505  prog_config_opts.validate_base_path();
506  prog_config_opts.validate();
507  return (startMapdServer(prog_config_opts));
508  }
509  } catch (std::runtime_error& e) {
510  std::cerr << "Can't start: " << e.what() << std::endl;
511  return 1;
512  } catch (boost::program_options::error& e) {
513  std::cerr << "Usage Error: " << e.what() << std::endl;
514  return 1;
515  }
516 }
mapd_shared_mutex g_thrift_mutex
Definition: MapDServer.cpp:77
TThreadedServer * g_thrift_buf_server
Definition: MapDServer.cpp:79
mapd::shared_ptr< DBHandler > g_warmup_handler
Definition: MapDServer.cpp:81
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
std::once_flag g_shutdown_once_flag
Definition: MapDServer.cpp:85
#define LOG(tag)
Definition: Logger.h:188
DiskCacheConfig disk_cache_config
shared utility for the db server and string dictionary server to remove old files ...
void heartbeat()
Definition: MapDServer.cpp:257
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
void run_warmup_queries(mapd::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
Definition: MapDServer.cpp:181
TThreadedServer * g_thrift_http_server
Definition: MapDServer.cpp:78
int main(int argc, char **argv)
Definition: MapDServer.cpp:493
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file)
Definition: MapDServer.cpp:174
size_t max_concurrent_render_sessions
singleton class to handle concurrency and state for blosc library. A C++ wrapper over a pure C library...
std::vector< LeafHostInfo > db_leaves
std::shared_timed_mutex mapd_shared_mutex
mapd::shared_ptr< DBHandler > g_mapd_handler
Definition: MapDServer.cpp:84
void start_server(TThreadedServer &server, const int port)
Definition: MapDServer.cpp:162
std::string ssl_key_file
AuthMetadata authMetadata
void shutdown_handler()
Definition: MapDServer.cpp:87
std::atomic< int > g_saw_signal
Definition: MapDServer.cpp:75
static void start(std::atomic< bool > &is_program_running)
void register_signal_handlers()
Definition: MapDServer.cpp:151
void shutdown()
Definition: Logger.cpp:314
bool g_enable_experimental_string_functions
std::atomic< bool > g_running
Definition: MapDServer.cpp:74
std::vector< std::string > udf_compiler_options
void register_signal_handler(int signum, void(*handler)(int))
Definition: MapDServer.cpp:93
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_thrift_logs
Definition: initdb.cpp:41
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
Definition: File.cpp:232
bool g_enable_fsi
Definition: Catalog.cpp:91
void omnisci_signal_handler(int signum)
Definition: MapDServer.cpp:110
int startMapdServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
Definition: MapDServer.cpp:306
std::string ha_group_id
#define VLOG(n)
Definition: Logger.h:291
SystemParameters system_parameters
std::string ssl_cert_file