OmniSciDB  85c2d10cdc
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
/home/jenkins-slave/workspace/core-os-doxygen/MapDServer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
19 
20 #ifdef HAVE_THRIFT_THREADFACTORY
21 #include <thrift/concurrency/ThreadFactory.h>
22 #else
23 #include <thrift/concurrency/PlatformThreadFactory.h>
24 #endif
25 
26 #include <thrift/concurrency/ThreadManager.h>
27 #include <thrift/protocol/TBinaryProtocol.h>
28 #include <thrift/protocol/TJSONProtocol.h>
29 #include <thrift/server/TThreadedServer.h>
30 #include <thrift/transport/TBufferTransports.h>
31 #include <thrift/transport/THttpServer.h>
32 #include <thrift/transport/TSSLServerSocket.h>
33 #include <thrift/transport/TSSLSocket.h>
34 #include <thrift/transport/TServerSocket.h>
35 
36 #include "Logger/Logger.h"
38 #include "Shared/file_delete.h"
40 #include "Shared/mapd_shared_ptr.h"
41 #include "Shared/scope.h"
42 
43 #include <boost/algorithm/string.hpp>
44 #include <boost/algorithm/string/trim.hpp>
45 #include <boost/filesystem.hpp>
46 #include <boost/locale/generator.hpp>
47 #include <boost/make_shared.hpp>
48 #include <boost/program_options.hpp>
49 
50 #include <csignal>
51 #include <cstdlib>
52 #include <sstream>
53 #include <thread>
54 #include <vector>
55 
56 #ifdef HAVE_AWS_S3
57 #include "DataMgr/OmniSciAwsSdk.h"
58 #endif
59 #include "MapDRelease.h"
60 #include "Shared/Compressor.h"
62 #include "Shared/file_delete.h"
63 #include "Shared/mapd_shared_ptr.h"
64 #include "Shared/scope.h"
66 
67 using namespace ::apache::thrift;
68 using namespace ::apache::thrift::concurrency;
69 using namespace ::apache::thrift::protocol;
70 using namespace ::apache::thrift::server;
71 using namespace ::apache::thrift::transport;
72 
73 extern bool g_enable_thrift_logs;
74 
75 std::atomic<bool> g_running{true};
76 std::atomic<int> g_saw_signal{-1};
77 
79 TThreadedServer* g_thrift_http_server{nullptr};
80 TThreadedServer* g_thrift_buf_server{nullptr};
81 
82 mapd::shared_ptr<DBHandler> g_warmup_handler =
83  0; // global "g_warmup_handler" needed to avoid circular dependency
84 // between "DBHandler" & function "run_warmup_queries"
85 mapd::shared_ptr<DBHandler> g_mapd_handler = 0;
86 std::once_flag g_shutdown_once_flag;
87 
// NOTE(review): original line 88 is missing from this listing. Per the index entry
// "void shutdown_handler() Definition: MapDServer.cpp:88", it is the header of
// shutdown_handler(); only the body below is visible.
// shutdown_handler() is registered with atexit() in startMapdServer(). If a service
// handler exists, it calls that handler's shutdown(). The call is guarded by
// g_shutdown_once_flag, so this path and heartbeat()'s emergency_shutdown() path
// together run at most once.
89  if (g_mapd_handler) {
90  std::call_once(g_shutdown_once_flag, []() { g_mapd_handler->shutdown(); });
91  }
92 }
93 
/**
 * Installs `handler` as the disposition for signal `signum`.
 *
 * On POSIX builds this uses sigaction() with no flags. When `handler` is a real function
 * rather than SIG_DFL or SIG_IGN, every signal is blocked while that function runs;
 * otherwise the mask is left empty. Windows builds fall back to the C signal() API.
 * The return value of signal()/sigaction() is not checked.
 *
 * Note: omnisci_signal_handler() calls this from inside a signal handler.
 */
void register_signal_handler(int signum, void (*handler)(int)) {
#ifdef _WIN32
  signal(signum, handler);
#else
  struct sigaction action {};  // zero-initialised: empty mask, no flags
  const bool is_real_handler = !(handler == SIG_DFL || handler == SIG_IGN);
  if (is_real_handler) {
    // Keep every other signal out while the handler body executes.
    sigfillset(&action.sa_mask);
  }
  action.sa_handler = handler;
  sigaction(signum, &action, nullptr);
#endif
}
108 
109 // Signal handler to set a global flag telling the server to exit.
110 // Do not call other functions inside this (or any) signal handler
111 // unless you really know what you are doing. See also:
112 // man 7 signal-safety
113 // man 7 signal
114 // https://en.wikipedia.org/wiki/Reentrancy_(computing)
115 void omnisci_signal_handler(int signum) {
116  // Record the signal number for logging during shutdown.
117  // Only records the first signal if called more than once.
118  int expected_signal{-1};
119  if (!g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
120  return; // this wasn't the first signal
121  }
122 
123  // This point should never be reached more than once.
124 
125  // Tell heartbeat() to shutdown by unsetting the 'g_running' flag.
126  // If 'g_running' is already false, this has no effect and the
127  // shutdown is already in progress.
128  g_running = false;
129 
130  // Handle core dumps specially by pausing inside this signal handler
131  // because on some systems, some signals will execute their default
132  // action immediately when and if the signal handler returns.
133  // We would like to do some emergency cleanup before core dump.
134  if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
135 #ifndef _WIN32
136  || signum == SIGQUIT
137 #endif
138  ) {
139  // Wait briefly to give heartbeat() a chance to flush the logs and
140  // do any other emergency shutdown tasks.
141  std::this_thread::sleep_for(std::chrono::seconds(2));
142 
143  // Explicitly trigger whatever default action this signal would
144  // have done, such as terminate the process or dump core.
145  // Signals are currently blocked so this new signal will be queued
146  // until this signal handler returns.
147  register_signal_handler(signum, SIG_DFL);
148 #ifdef _WIN32
149  raise(signum);
150 #else
151  kill(getpid(), signum);
152 #endif
153  std::this_thread::sleep_for(std::chrono::seconds(5));
154 
155 #ifndef __APPLE__
156  // as a last resort, abort
157  // primary used in Docker environments, where we can end up with PID 1 and fail to
158  // catch unix signals
159  quick_exit(signum);
160 #endif
161  }
162 }
163 
// NOTE(review): this listing is missing original lines 164-165, 167-168 and 170-172 of
// register_signal_handlers(). That includes its header (index entry "void
// register_signal_handlers() Definition: MapDServer.cpp:164") and whatever statements
// stood on those lines. Only the tail of the function is visible here.
166 #ifndef _WIN32
169 #endif
173 #ifndef _WIN32
174  // Thrift secure socket can cause problems with SIGPIPE
  // so SIGPIPE is ignored instead of taking its default action (process termination on
  // POSIX systems).
175  register_signal_handler(SIGPIPE, SIG_IGN);
176 #endif
177 }
178 
179 void start_server(TThreadedServer& server, const int port) {
180  try {
181  server.serve();
182  if (errno != 0) {
183  throw std::runtime_error(std::string("Thrift server exited: ") +
184  std::strerror(errno));
185  }
186  } catch (std::exception& e) {
187  LOG(ERROR) << "Exception: " << e.what() << ": port " << port << std::endl;
188  }
189 }
190 
191 void releaseWarmupSession(TSessionId& sessionId, std::ifstream& query_file) noexcept {
192  query_file.close();
193  if (sessionId != g_warmup_handler->getInvalidSessionId()) {
194  try {
195  g_warmup_handler->disconnect(sessionId);
196  } catch (...) {
197  LOG(ERROR) << "Failed to disconnect warmup session, possible failure to run warmup "
198  "queries.";
199  }
200  }
201 }
202 
203 void run_warmup_queries(mapd::shared_ptr<DBHandler> handler,
204  std::string base_path,
205  std::string query_file_path) {
206  // run warmup queries to load cache if requested
207  if (query_file_path.empty()) {
208  return;
209  }
210  if (handler->isAggregator()) {
211  LOG(INFO) << "Skipping warmup query execution on the aggregator, queries should be "
212  "run directly on the leaf nodes.";
213  return;
214  }
215 
216  LOG(INFO) << "Running DB warmup with queries from " << query_file_path;
217  try {
218  g_warmup_handler = handler;
219  std::string db_info;
220  std::string user_keyword, user_name, db_name;
221  std::ifstream query_file;
224  TSessionId sessionId = g_warmup_handler->getInvalidSessionId();
225 
226  ScopeGuard session_guard = [&] { releaseWarmupSession(sessionId, query_file); };
227  query_file.open(query_file_path);
228  while (std::getline(query_file, db_info)) {
229  if (db_info.length() == 0) {
230  continue;
231  }
232  std::istringstream iss(db_info);
233  iss >> user_keyword >> user_name >> db_name;
234  if (user_keyword.compare(0, 4, "USER") == 0) {
235  // connect to DB for given user_name/db_name with super_user_rights (without
236  // password), & start session
237  g_warmup_handler->super_user_rights_ = true;
238  g_warmup_handler->connect(sessionId, user_name, "", db_name);
239  g_warmup_handler->super_user_rights_ = false;
240 
241  // read and run one query at a time for the DB with the setup connection
242  TQueryResult ret;
243  std::string single_query;
244  while (std::getline(query_file, single_query)) {
245  boost::algorithm::trim(single_query);
246  if (single_query.length() == 0 || single_query[0] == '-') {
247  continue;
248  }
249  if (single_query[0] == '}') {
250  single_query.clear();
251  break;
252  }
253  if (single_query.find(';') == single_query.npos) {
254  std::string multiline_query;
255  std::getline(query_file, multiline_query, ';');
256  single_query += multiline_query;
257  }
258 
259  try {
260  g_warmup_handler->sql_execute(ret, sessionId, single_query, true, "", -1, -1);
261  } catch (...) {
262  LOG(WARNING) << "Exception while executing '" << single_query
263  << "', ignoring";
264  }
265  single_query.clear();
266  }
267 
268  // stop session and disconnect from the DB
269  g_warmup_handler->disconnect(sessionId);
270  sessionId = g_warmup_handler->getInvalidSessionId();
271  } else {
272  LOG(WARNING) << "\nSyntax error in the file: " << query_file_path.c_str()
273  << " Missing expected keyword USER. Following line will be ignored: "
274  << db_info.c_str() << std::endl;
275  }
276  db_info.clear();
277  }
278  } catch (const std::exception& e) {
279  LOG(WARNING)
280  << "Exception while executing warmup queries. "
281  << "Warmup may not be fully completed. Will proceed nevertheless.\nError was: "
282  << e.what();
283  }
284 }
285 
// Body of the background heartbeat thread started by startMapdServer().
//
// Step by step:
// 1. On non-Windows builds, it blocks all signals for this thread. If
//    pthread_sigmask() fails it throws std::runtime_error.
// 2. It polls g_running once per second until the flag is cleared.
// 3. It logs the recorded signal, if any, unless that signal is SIGTERM.
// 4. For SIGABRT/SIGSEGV/SIGFPE (and SIGQUIT outside Windows), it runs the handler's
//    emergency_shutdown() (at most once, via g_shutdown_once_flag) and returns.
// 5. Otherwise it calls stop() on whichever Thrift servers are registered, so that
//    their serve() calls in startMapdServer() can return.
//
// NOTE(review): original line 321 (between the emergency shutdown and `return;`) is
// absent from this listing; confirm what it contained before relying on that path.
// NOTE(review): an exception escaping a std::thread's entry function calls
// std::terminate, so the pthread_sigmask failure path ends the process.
286 void heartbeat() {
287 #ifndef _WIN32
288  // Block all signals for this heartbeat thread, only.
  // Presumably so that signals are handled on other threads and never interrupt the
  // shutdown work this thread performs -- TODO confirm intent.
289  sigset_t set;
290  sigfillset(&set);
291  int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
292  if (result != 0) {
293  throw std::runtime_error("heartbeat() thread startup failed");
294  }
295 #endif
296 
297  // Sleep until omnisci_signal_handler or anything clears the g_running flag.
298  VLOG(1) << "heartbeat thread starting";
299  while (::g_running) {
300  using namespace std::chrono;
301  std::this_thread::sleep_for(1s);
302  }
303  VLOG(1) << "heartbeat thread exiting";
304 
305  // Get the signal number if there was a signal.
  // g_saw_signal stays -1 when shutdown was requested without a signal.
306  int signum = g_saw_signal;
307  if (signum >= 1 && signum != SIGTERM) {
308  LOG(INFO) << "Interrupt signal (" << signum << ") received.";
309  }
310 
311  // if dumping core, try to do some quick stuff
312  if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
313 #ifndef _WIN32
314  || signum == SIGQUIT
315 #endif
316  ) {
317  if (g_mapd_handler) {
  // Shares g_shutdown_once_flag with shutdown_handler(), so only one of the two
  // shutdown paths ever runs.
318  std::call_once(g_shutdown_once_flag,
319  []() { g_mapd_handler->emergency_shutdown(); });
320  }
322  return;
323  // core dump should begin soon after this, see omnisci_signal_handler()
324  }
325 
326  // trigger an orderly shutdown by telling Thrift to stop serving
327  {
  // Shared lock pairs with the write locks taken when startMapdServer() publishes or
  // clears these server pointers.
328  mapd_shared_lock<mapd_shared_mutex> read_lock(g_thrift_mutex);
329  auto httpserv = g_thrift_http_server;
330  if (httpserv) {
331  httpserv->stop();
332  }
333  auto bufserv = g_thrift_buf_server;
334  if (bufserv) {
335  bufserv->stop();
336  }
337  // main() should return soon after this
338  }
339 }
340 
// Starts the OmniSci server and blocks until it has shut down.
//
// In the visible code it:
// - registers atexit() shutdown hooks;
// - starts a thread that cleans up _DELETE_ME files under <base_path>/mapd_data, and
//   the heartbeat() thread;
// - silences Thrift's own log output unless g_enable_thrift_logs is set;
// - sets the global locale and constructs the DBHandler;
// - creates the binary-protocol and HTTP/JSON server sockets (TLS when both an SSL cert
//   file and key file are configured);
// - serves the binary protocol, and the HTTP/JSON protocol when start_http_server is
//   true, each on its own thread;
// - runs the warmup queries;
// - finally joins every thread it started.
//
// Returns 0 if no signal was recorded or the recorded signal was SIGTERM. Otherwise it
// returns the number of the first signal received (see omnisci_signal_handler()).
//
// NOTE(review): this listing is missing original lines 343, 351, 368, 376, 413, 450,
// 475 and 513 of this function. Among them:
// - the line that opens the block closed at line 373;
// - the start of the statement that receives make_shared<DBHandler>(...);
// - both g_enable_fsi bodies;
// - the body of pointer_to_thrift_guard;
// - the start of the call whose arguments appear at line 476 (presumably
//   run_warmup_queries).
// Comments below describe only the visible code.
341 int startMapdServer(CommandLineOptions& prog_config_opts, bool start_http_server = true) {
342  // try to enforce an orderly shutdown even after a signal
344 
345  // register shutdown procedures for when a normal shutdown happens
346  // be aware that atexit() functions run in reverse order
  // (so shutdown_handler, registered second, runs before logger::shutdown).
347  atexit(&logger::shutdown);
348  atexit(&shutdown_handler);
349 
350 #ifdef HAVE_AWS_S3
352  ScopeGuard aws_sdk_guard = [] { omnisci_aws_sdk::shutdown_sdk(); };
353 #endif
354 
355  // start background thread to clean up _DELETE_ME files
356  const unsigned int wait_interval =
357  3; // wait time in secs after looking for deleted file before looking again
358  std::thread file_delete_thread(file_delete,
359  std::ref(g_running),
360  wait_interval,
361  prog_config_opts.base_path + "/mapd_data");
  // heartbeat() waits for g_running to be cleared, then stops the Thrift servers.
362  std::thread heartbeat_thread(heartbeat);
363 
364  if (!g_enable_thrift_logs) {
365  apache::thrift::GlobalOutput.setOutputFunction([](const char* msg) {});
366  }
367 
  // NOTE(review): the line opening this block (line 368) is missing from the listing;
  // the `}` at line 373 closes it.
369  // Use the locale setting of the server by default. The generate parameter can be
370  // updated appropriately if a locale override option is ever supported.
371  boost::locale::generator generator;
372  std::locale::global(generator.generate(""));
373  }
374 
  // Failing to construct the service handler is reported through LOG(FATAL).
375  try {
377  mapd::make_shared<DBHandler>(prog_config_opts.db_leaves,
378  prog_config_opts.string_leaves,
379  prog_config_opts.base_path,
380  prog_config_opts.allow_multifrag,
381  prog_config_opts.jit_debug,
382  prog_config_opts.intel_jit_profile,
383  prog_config_opts.read_only,
384  prog_config_opts.allow_loop_joins,
385  prog_config_opts.enable_rendering,
386  prog_config_opts.renderer_use_vulkan_driver,
387  prog_config_opts.enable_auto_clear_render_mem,
388  prog_config_opts.render_oom_retry_threshold,
389  prog_config_opts.render_mem_bytes,
390  prog_config_opts.max_concurrent_render_sessions,
391  prog_config_opts.reserved_gpu_mem,
392  prog_config_opts.render_compositor_use_last_gpu,
393  prog_config_opts.num_reader_threads,
394  prog_config_opts.authMetadata,
395  prog_config_opts.system_parameters,
396  prog_config_opts.enable_legacy_syntax,
397  prog_config_opts.idle_session_duration,
398  prog_config_opts.max_session_duration,
399  prog_config_opts.enable_runtime_udf,
400  prog_config_opts.udf_file_name,
401  prog_config_opts.udf_compiler_path,
402  prog_config_opts.udf_compiler_options,
403 #ifdef ENABLE_GEOS
404  prog_config_opts.libgeos_so_filename,
405 #endif
406  prog_config_opts.disk_cache_config,
407  false);
408  } catch (const std::exception& e) {
409  LOG(FATAL) << "Failed to initialize service handler: " << e.what();
410  }
411 
412  if (g_enable_fsi) {
414  }
415 
  // NOTE(review): file_delete_thread and heartbeat_thread are joinable from here until
  // the joins near the end. If an exception escapes this function in between (for
  // example, if loading the TLS certificate or key below throws), their std::thread
  // destructors call std::terminate.
  // TLS is used only when both an SSL cert file and an SSL key file are configured;
  // the same socket factory serves both ports.
416  mapd::shared_ptr<TServerSocket> serverSocket;
417  mapd::shared_ptr<TServerSocket> httpServerSocket;
418  if (!prog_config_opts.system_parameters.ssl_cert_file.empty() &&
419  !prog_config_opts.system_parameters.ssl_key_file.empty()) {
420  mapd::shared_ptr<TSSLSocketFactory> sslSocketFactory;
421  sslSocketFactory =
422  mapd::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory(SSLProtocol::SSLTLS));
423  sslSocketFactory->loadCertificate(
424  prog_config_opts.system_parameters.ssl_cert_file.c_str());
425  sslSocketFactory->loadPrivateKey(
426  prog_config_opts.system_parameters.ssl_key_file.c_str());
427  if (prog_config_opts.system_parameters.ssl_transport_client_auth) {
428  sslSocketFactory->authenticate(true);
429  } else {
430  sslSocketFactory->authenticate(false);
431  }
432  sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
433  serverSocket = mapd::shared_ptr<TServerSocket>(new TSSLServerSocket(
434  prog_config_opts.system_parameters.omnisci_server_port, sslSocketFactory));
435  httpServerSocket = mapd::shared_ptr<TServerSocket>(
436  new TSSLServerSocket(prog_config_opts.http_port, sslSocketFactory));
437  LOG(INFO) << " OmniSci server using encrypted connection. Cert file ["
438  << prog_config_opts.system_parameters.ssl_cert_file << "], key file ["
439  << prog_config_opts.system_parameters.ssl_key_file << "]";
440  } else {
441  LOG(INFO) << " OmniSci server using unencrypted connection";
442  serverSocket = mapd::shared_ptr<TServerSocket>(
443  new TServerSocket(prog_config_opts.system_parameters.omnisci_server_port));
444  httpServerSocket =
445  mapd::shared_ptr<TServerSocket>(new TServerSocket(prog_config_opts.http_port));
446  }
447 
  // Runs on scope exit while holding the write lock on g_thrift_mutex. Its body (line
  // 450) is missing from this listing; it presumably clears g_thrift_http_server /
  // g_thrift_buf_server before the stack-allocated servers below are destroyed.
  // TODO confirm.
448  ScopeGuard pointer_to_thrift_guard = [] {
449  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
451  };
452 
453  if (prog_config_opts.system_parameters.ha_group_id.empty()) {
454  mapd::shared_ptr<TProcessor> processor(
455  new TrackingProcessor(g_mapd_handler, prog_config_opts.log_user_origin));
456  mapd::shared_ptr<TTransportFactory> bufTransportFactory(
457  new TBufferedTransportFactory());
458  mapd::shared_ptr<TProtocolFactory> bufProtocolFactory(new TBinaryProtocolFactory());
459 
460  mapd::shared_ptr<TServerTransport> bufServerTransport(serverSocket);
  // The servers live on this stack frame. Their addresses are published under
  // g_thrift_mutex so that heartbeat() can call stop() on them.
461  TThreadedServer bufServer(
462  processor, bufServerTransport, bufTransportFactory, bufProtocolFactory);
463  {
464  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
465  g_thrift_buf_server = &bufServer;
466  }
467 
468  std::thread bufThread(start_server,
469  std::ref(bufServer),
470  prog_config_opts.system_parameters.omnisci_server_port);
471 
  // NOTE(review): warmup runs after bufThread has started serving, so client
  // connections may be accepted while warmup toggles super_user_rights_ on the shared
  // g_mapd_handler.
  // With exit_after_warmup, clearing g_running lets heartbeat() stop the servers, so
  // the joins below return.
472  // TEMPORARY
473  auto warmup_queries = [&prog_config_opts]() {
474  // run warm up queries if any exists
476  g_mapd_handler, prog_config_opts.base_path, prog_config_opts.db_query_file);
477  if (prog_config_opts.exit_after_warmup) {
478  g_running = false;
479  }
480  };
481 
482  mapd::shared_ptr<TServerTransport> httpServerTransport(httpServerSocket);
483  mapd::shared_ptr<TTransportFactory> httpTransportFactory(
484  new THttpServerTransportFactory());
485  mapd::shared_ptr<TProtocolFactory> httpProtocolFactory(new TJSONProtocolFactory());
486  TThreadedServer httpServer(
487  processor, httpServerTransport, httpTransportFactory, httpProtocolFactory);
488  if (start_http_server) {
489  {
490  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
491  g_thrift_http_server = &httpServer;
492  }
493  std::thread httpThread(
494  start_server, std::ref(httpServer), prog_config_opts.http_port);
495 
496  warmup_queries();
497 
498  bufThread.join();
499  httpThread.join();
500  } else {
501  warmup_queries();
502  bufThread.join();
503  }
504  } else { // running ha server
505  LOG(FATAL) << "No High Availability module available, please contact OmniSci support";
506  }
507 
  // The servers have returned: tell the file-delete and heartbeat threads to finish.
508  g_running = false;
509  file_delete_thread.join();
510  heartbeat_thread.join();
511 
512  if (g_enable_fsi) {
514  }
515 
  // Map the recorded signal (if any) to the process exit code.
516  int signum = g_saw_signal;
517  if (signum <= 0 || signum == SIGTERM) {
518  return 0;
519  } else {
520  return signum;
521  }
522 }
523 
524 int main(int argc, char** argv) {
525  bool has_clust_topo = false;
526 
527  CommandLineOptions prog_config_opts(argv[0], has_clust_topo);
528 
529  try {
530  if (auto return_code =
531  prog_config_opts.parse_command_line(argc, argv, !has_clust_topo)) {
532  return *return_code;
533  }
534 
535  if (!has_clust_topo) {
536  prog_config_opts.validate_base_path();
537  prog_config_opts.validate();
538  return (startMapdServer(prog_config_opts));
539  }
540  } catch (std::runtime_error& e) {
541  std::cerr << "Can't start: " << e.what() << std::endl;
542  return 1;
543  } catch (boost::program_options::error& e) {
544  std::cerr << "Usage Error: " << e.what() << std::endl;
545  return 1;
546  }
547 }
mapd_shared_mutex g_thrift_mutex
Definition: MapDServer.cpp:78
TThreadedServer * g_thrift_buf_server
Definition: MapDServer.cpp:80
mapd::shared_ptr< DBHandler > g_warmup_handler
Definition: MapDServer.cpp:82
std::vector< LeafHostInfo > string_leaves
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file) noexcept
Definition: MapDServer.cpp:191
std::string udf_compiler_path
std::once_flag g_shutdown_once_flag
Definition: MapDServer.cpp:86
#define LOG(tag)
Definition: Logger.h:188
DiskCacheConfig disk_cache_config
shared utility for the db server and string dictionary server to remove old files ...
void heartbeat()
Definition: MapDServer.cpp:286
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
void run_warmup_queries(mapd::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
Definition: MapDServer.cpp:203
TThreadedServer * g_thrift_http_server
Definition: MapDServer.cpp:79
size_t max_concurrent_render_sessions
singleton class to handle concurrency and state for the blosc library. A C++ wrapper over a pure C library...
std::vector< LeafHostInfo > db_leaves
std::shared_timed_mutex mapd_shared_mutex
bool g_enable_thrift_logs
Definition: initdb.cpp:42
mapd::shared_ptr< DBHandler > g_mapd_handler
Definition: MapDServer.cpp:85
void start_server(TThreadedServer &server, const int port)
Definition: MapDServer.cpp:179
std::string ssl_key_file
AuthMetadata authMetadata
void shutdown_handler()
Definition: MapDServer.cpp:88
std::atomic< int > g_saw_signal
Definition: MapDServer.cpp:76
static void start(std::atomic< bool > &is_program_running)
void register_signal_handlers()
Definition: MapDServer.cpp:164
void shutdown()
Definition: Logger.cpp:314
bool g_enable_experimental_string_functions
std::atomic< bool > g_running
Definition: MapDServer.cpp:75
std::vector< std::string > udf_compiler_options
void register_signal_handler(int signum, void(*handler)(int))
Definition: MapDServer.cpp:94
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
Definition: File.cpp:280
bool g_enable_fsi
Definition: Catalog.cpp:92
void omnisci_signal_handler(int signum)
Definition: MapDServer.cpp:115
int startMapdServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
Definition: MapDServer.cpp:341
std::string ha_group_id
#define VLOG(n)
Definition: Logger.h:291
SystemParameters system_parameters
std::string ssl_cert_file