OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
/home/jenkins-slave/workspace/core-os-doxygen/MapDServer.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
19 
20 #ifdef HAVE_THRIFT_THREADFACTORY
21 #include <thrift/concurrency/ThreadFactory.h>
22 #else
23 #include <thrift/concurrency/PlatformThreadFactory.h>
24 #endif
25 
26 #include <thrift/concurrency/ThreadManager.h>
27 #include <thrift/protocol/TBinaryProtocol.h>
28 #include <thrift/protocol/TJSONProtocol.h>
29 #include <thrift/server/TThreadedServer.h>
30 #include <thrift/transport/TBufferTransports.h>
31 #include <thrift/transport/THttpServer.h>
32 #include <thrift/transport/TSSLServerSocket.h>
33 #include <thrift/transport/TSSLSocket.h>
34 #include <thrift/transport/TServerSocket.h>
35 
36 #include "Logger/Logger.h"
38 #include "Shared/file_delete.h"
40 #include "Shared/mapd_shared_ptr.h"
41 #include "Shared/scope.h"
42 
43 #include <boost/algorithm/string.hpp>
44 #include <boost/algorithm/string/trim.hpp>
45 #include <boost/filesystem.hpp>
46 #include <boost/locale/generator.hpp>
47 #include <boost/make_shared.hpp>
48 #include <boost/program_options.hpp>
49 
50 #include <csignal>
51 #include <cstdlib>
52 #include <sstream>
53 #include <thread>
54 #include <vector>
55 
56 #ifdef HAVE_AWS_S3
57 #include "DataMgr/OmniSciAwsSdk.h"
58 #endif
59 #include "MapDRelease.h"
60 #include "Shared/Compressor.h"
62 #include "Shared/file_delete.h"
63 #include "Shared/mapd_shared_ptr.h"
64 #include "Shared/scope.h"
66 
67 using namespace ::apache::thrift;
68 using namespace ::apache::thrift::concurrency;
69 using namespace ::apache::thrift::protocol;
70 using namespace ::apache::thrift::server;
71 using namespace ::apache::thrift::transport;
72 
73 extern bool g_enable_thrift_logs;
74 
// Process-wide "keep running" flag. Cleared by omnisci_signal_handler() and by
// startMapdServer(); polled by heartbeat() and handed to the file_delete thread.
75 std::atomic<bool> g_running{true};
// Number of the first signal seen by omnisci_signal_handler(); -1 means none yet.
76 std::atomic<int> g_saw_signal{-1};
77 
// Non-owning pointers to the Thrift servers that live on startMapdServer()'s stack.
// heartbeat() reads them to call stop(); both are null until a server is published.
79 TThreadedServer* g_thrift_http_server{nullptr};
80 TThreadedServer* g_thrift_buf_server{nullptr};
81 
82 mapd::shared_ptr<DBHandler> g_warmup_handler =
83  0; // global "g_warmup_handler" needed to avoid circular dependency
84 // between "DBHandler" & function "run_warmup_queries"
// The handler shut down by shutdown_handler() and by heartbeat()'s emergency path.
85 mapd::shared_ptr<DBHandler> g_mapd_handler = 0;
// Shared by the normal and the emergency shutdown paths so that only one of them runs.
86 std::once_flag g_shutdown_once_flag;
87 
// Body of the shutdown hook that startMapdServer() registers with atexit().
// NOTE(review): the signature line is absent from this listing; the cross-reference
// index at the end of the file names it `void shutdown_handler()` - confirm.
//
// Does nothing when no handler has been created. Otherwise calls
// g_mapd_handler->shutdown() through g_shutdown_once_flag, the same flag heartbeat()
// uses for emergency_shutdown(), so at most one of the two ever runs.
89  if (g_mapd_handler) {
90  std::call_once(g_shutdown_once_flag, []() { g_mapd_handler->shutdown(); });
91  }
92 }
93 
// Installs `handler` as the disposition for signal `signum`.
//
// On Windows this simply calls signal(). Elsewhere it goes through sigaction(); when
// `handler` is a real function (neither SIG_DFL nor SIG_IGN) every signal is masked
// for as long as that function runs. The result of the underlying call is ignored.
void register_signal_handler(int signum, void (*handler)(int)) {
#ifdef _WIN32
  signal(signum, handler);
#else
  const bool is_custom_handler = !(handler == SIG_DFL || handler == SIG_IGN);

  struct sigaction action;
  memset(&action, 0, sizeof(action));
  action.sa_handler = handler;
  if (is_custom_handler) {
    // keep all other signals out while the custom handler is executing
    sigfillset(&action.sa_mask);
  }
  sigaction(signum, &action, NULL);
#endif
}
108 
109 // Signal handler to set a global flag telling the server to exit.
110 // Do not call other functions inside this (or any) signal handler
111 // unless you really know what you are doing. See also:
112 // man 7 signal-safety
113 // man 7 signal
114 // https://en.wikipedia.org/wiki/Reentrancy_(computing)
115 void omnisci_signal_handler(int signum) {
116  // Record the signal number for logging during shutdown.
117  // Only records the first signal if called more than once.
118  int expected_signal{-1};
119  if (!g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
120  return; // this wasn't the first signal
121  }
122 
123  // This point should never be reached more than once.
124 
125  // Tell heartbeat() to shutdown by unsetting the 'g_running' flag.
126  // If 'g_running' is already false, this has no effect and the
127  // shutdown is already in progress.
128  g_running = false;
129 
130  // Handle core dumps specially by pausing inside this signal handler
131  // because on some systems, some signals will execute their default
132  // action immediately when and if the signal handler returns.
133  // We would like to do some emergency cleanup before core dump.
134  if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
135 #ifndef _WIN32
136  || signum == SIGQUIT
137 #endif
138  ) {
139  // Wait briefly to give heartbeat() a chance to flush the logs and
140  // do any other emergency shutdown tasks.
141  std::this_thread::sleep_for(std::chrono::seconds(2));
142 
143  // Explicitly trigger whatever default action this signal would
144  // have done, such as terminate the process or dump core.
145  // Signals are currently blocked so this new signal will be queued
146  // until this signal handler returns.
147  register_signal_handler(signum, SIG_DFL);
148 #ifdef _WIN32
149  raise(signum);
150 #else
151  kill(getpid(), signum);
152 #endif
153  std::this_thread::sleep_for(std::chrono::seconds(5));
154 
155 #ifndef __APPLE__
156  // as a last resort, abort
157  // primary used in Docker environments, where we can end up with PID 1 and fail to
158  // catch unix signals
159  quick_exit(signum);
160 #endif
161  }
162 }
163 
// NOTE(review): these lines are presumably the tail of register_signal_handlers(),
// which the cross-reference index at the end of the file places here. Its signature
// and the other registrations it makes are absent from this listing, so only the
// SIGPIPE handling that is actually visible is described.
166 #ifndef _WIN32
169 #endif
173 #ifndef _WIN32
174  // Thrift secure socket can cause problems with SIGPIPE
// SIGPIPE is set to be ignored (non-Windows builds only).
175  register_signal_handler(SIGPIPE, SIG_IGN);
176 #endif
177 }
178 
179 void start_server(TThreadedServer& server, const int port) {
180  try {
181  server.serve();
182  if (errno != 0) {
183  throw std::runtime_error(std::string("Thrift server exited: ") +
184  std::strerror(errno));
185  }
186  } catch (std::exception& e) {
187  LOG(ERROR) << "Exception: " << e.what() << ": port " << port << std::endl;
188  }
189 }
190 
191 void releaseWarmupSession(TSessionId& sessionId, std::ifstream& query_file) noexcept {
192  query_file.close();
193  if (sessionId != g_warmup_handler->getInvalidSessionId()) {
194  try {
195  g_warmup_handler->disconnect(sessionId);
196  } catch (...) {
197  LOG(ERROR) << "Failed to disconnect warmup session, possible failure to run warmup "
198  "queries.";
199  }
200  }
201 }
202 
203 void run_warmup_queries(mapd::shared_ptr<DBHandler> handler,
204  std::string base_path,
205  std::string query_file_path) {
206  // run warmup queries to load cache if requested
207  if (query_file_path.empty()) {
208  return;
209  }
210  if (handler->isAggregator()) {
211  LOG(INFO) << "Skipping warmup query execution on the aggregator, queries should be "
212  "run directly on the leaf nodes.";
213  return;
214  }
215 
216  LOG(INFO) << "Running DB warmup with queries from " << query_file_path;
217  try {
218  g_warmup_handler = handler;
219  std::string db_info;
220  std::string user_keyword, user_name, db_name;
221  std::ifstream query_file;
224  TSessionId sessionId = g_warmup_handler->getInvalidSessionId();
225 
226  ScopeGuard session_guard = [&] { releaseWarmupSession(sessionId, query_file); };
227  query_file.open(query_file_path);
228  while (std::getline(query_file, db_info)) {
229  if (db_info.length() == 0) {
230  continue;
231  }
232  std::istringstream iss(db_info);
233  iss >> user_keyword >> user_name >> db_name;
234  if (user_keyword.compare(0, 4, "USER") == 0) {
235  // connect to DB for given user_name/db_name with super_user_rights (without
236  // password), & start session
237  g_warmup_handler->super_user_rights_ = true;
238  g_warmup_handler->connect(sessionId, user_name, "", db_name);
239  g_warmup_handler->super_user_rights_ = false;
240 
241  // read and run one query at a time for the DB with the setup connection
242  TQueryResult ret;
243  std::string single_query;
244  while (std::getline(query_file, single_query)) {
245  boost::algorithm::trim(single_query);
246  if (single_query.length() == 0 || single_query[0] == '-') {
247  continue;
248  }
249  if (single_query[0] == '}') {
250  single_query.clear();
251  break;
252  }
253  if (single_query.find(';') == single_query.npos) {
254  std::string multiline_query;
255  std::getline(query_file, multiline_query, ';');
256  single_query += multiline_query;
257  }
258 
259  try {
260  g_warmup_handler->sql_execute(ret, sessionId, single_query, true, "", -1, -1);
261  } catch (...) {
262  LOG(WARNING) << "Exception while executing '" << single_query
263  << "', ignoring";
264  }
265  single_query.clear();
266  }
267 
268  // stop session and disconnect from the DB
269  g_warmup_handler->disconnect(sessionId);
270  sessionId = g_warmup_handler->getInvalidSessionId();
271  } else {
272  LOG(WARNING) << "\nSyntax error in the file: " << query_file_path.c_str()
273  << " Missing expected keyword USER. Following line will be ignored: "
274  << db_info.c_str() << std::endl;
275  }
276  db_info.clear();
277  }
278  } catch (const std::exception& e) {
279  LOG(WARNING)
280  << "Exception while executing warmup queries. "
281  << "Warmup may not be fully completed. Will proceed nevertheless.\nError was: "
282  << e.what();
283  }
284 }
285 
// Body of the heartbeat thread: waits for g_running to be cleared, then shuts the
// server down.
//
// On non-Windows builds every signal is first blocked for this thread. The thread
// then polls g_running once a second. When the flag clears it either runs the
// emergency shutdown (recorded signal is SIGABRT, SIGSEGV, SIGFPE or SIGQUIT) or
// stops both Thrift servers so that the serving threads can return.
//
// Throws std::runtime_error when pthread_sigmask() fails.
// NOTE(review): startMapdServer() runs this as a std::thread entry point, where an
// escaping exception terminates the process.
286 void heartbeat() {
287 #ifndef _WIN32
288  // Block all signals for this heartbeat thread, only.
289  sigset_t set;
290  sigfillset(&set);
291  int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
292  if (result != 0) {
293  throw std::runtime_error("heartbeat() thread startup failed");
294  }
295 #endif
296 
297  // Sleep until omnisci_signal_handler or anything clears the g_running flag.
298  VLOG(1) << "heartbeat thread starting";
299  while (::g_running) {
300  using namespace std::chrono;
301  std::this_thread::sleep_for(1s);
302  }
303  VLOG(1) << "heartbeat thread exiting";
304 
305  // Get the signal number if there was a signal.
// g_saw_signal is still -1 when the flag was cleared without any signal.
306  int signum = g_saw_signal;
307  if (signum >= 1 && signum != SIGTERM) {
308  LOG(INFO) << "Interrupt signal (" << signum << ") received.";
309  }
310 
311  // if dumping core, try to do some quick stuff
312  if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
313 #ifndef _WIN32
314  || signum == SIGQUIT
315 #endif
316  ) {
317  if (g_mapd_handler) {
// Shares its once-flag with the normal shutdown path, so only one of them runs.
318  std::call_once(g_shutdown_once_flag,
319  []() { g_mapd_handler->emergency_shutdown(); });
320  }
// NOTE(review): a statement appears to be missing from this listing at this point.
322  return;
323  // core dump should begin soon after this, see omnisci_signal_handler()
324  }
325 
326  // trigger an orderly shutdown by telling Thrift to stop serving
327  {
// Shared lock: the server pointers are only read here; startMapdServer() takes the
// exclusive lock on the same mutex when it publishes them.
328  mapd_shared_lock<mapd_shared_mutex> read_lock(g_thrift_mutex);
329  auto httpserv = g_thrift_http_server;
330  if (httpserv) {
331  httpserv->stop();
332  }
333  auto bufserv = g_thrift_buf_server;
334  if (bufserv) {
335  bufserv->stop();
336  }
337  // main() should return soon after this
338  }
339 }
340 
// Brings up the database server and blocks until it has shut down.
//
// Steps visible here: registers the atexit() hooks, starts the file-cleanup and
// heartbeat threads, builds SSL or plain Thrift server sockets, serves the binary
// protocol (and, when start_http_server is true, the HTTP/JSON protocol) on separate
// threads, runs the warmup queries, then joins every thread.
//
// @param prog_config_opts   parsed server options
// @param start_http_server  when false the HTTP/JSON server object is still built but
//                           never served
// @return 0 when no signal was recorded or the recorded signal is SIGTERM; otherwise
//         the signal number held in g_saw_signal
//
// NOTE(review): several statements of this function are absent from this listing
// (they show up below as dangling expressions and unmatched braces); check the
// original file before relying on the details.
341 int startMapdServer(CommandLineOptions& prog_config_opts, bool start_http_server = true) {
342  // try to enforce an orderly shutdown even after a signal
// NOTE(review): the statement this comment introduces is missing from the listing.
344 
345  // register shutdown procedures for when a normal shutdown happens
346  // be aware that atexit() functions run in reverse order
// i.e. shutdown_handler runs first and logger::shutdown after it.
347  atexit(&logger::shutdown);
348  atexit(&shutdown_handler);
349 
350 #ifdef HAVE_AWS_S3
352  ScopeGuard aws_sdk_guard = [] { omnisci_aws_sdk::shutdown_sdk(); };
353 #endif
354 
355  // start background thread to clean up _DELETE_ME files
356  const unsigned int wait_interval =
357  3; // wait time in secs after looking for deleted file before looking again
358  std::thread file_delete_thread(file_delete,
359  std::ref(g_running),
360  wait_interval,
361  prog_config_opts.base_path + "/mapd_data");
362  std::thread heartbeat_thread(heartbeat);
363 
// Thrift's own output is discarded unless thrift logging is enabled.
364  if (!g_enable_thrift_logs) {
365  apache::thrift::GlobalOutput.setOutputFunction([](const char* msg) {});
366  }
367 
// NOTE(review): the line opening this block is missing from the listing.
369  // Use the locale setting of the server by default. The generate parameter can be
370  // updated appropriately if a locale override option is ever supported.
371  boost::locale::generator generator;
372  std::locale::global(generator.generate(""));
373  }
374 
375  try {
376  if (prog_config_opts.system_parameters.master_address.empty()) {
// NOTE(review): the target of this make_shared expression is missing from the
// listing; presumably the result is stored in g_mapd_handler - confirm.
378  mapd::make_shared<DBHandler>(prog_config_opts.db_leaves,
379  prog_config_opts.string_leaves,
380  prog_config_opts.base_path,
381  prog_config_opts.allow_multifrag,
382  prog_config_opts.jit_debug,
383  prog_config_opts.intel_jit_profile,
384  prog_config_opts.read_only,
385  prog_config_opts.allow_loop_joins,
386  prog_config_opts.enable_rendering,
387  prog_config_opts.renderer_use_vulkan_driver,
388  prog_config_opts.enable_auto_clear_render_mem,
389  prog_config_opts.render_oom_retry_threshold,
390  prog_config_opts.render_mem_bytes,
391  prog_config_opts.max_concurrent_render_sessions,
392  prog_config_opts.reserved_gpu_mem,
393  prog_config_opts.render_compositor_use_last_gpu,
394  prog_config_opts.num_reader_threads,
395  prog_config_opts.authMetadata,
396  prog_config_opts.system_parameters,
397  prog_config_opts.enable_legacy_syntax,
398  prog_config_opts.idle_session_duration,
399  prog_config_opts.max_session_duration,
400  prog_config_opts.enable_runtime_udf,
401  prog_config_opts.udf_file_name,
402  prog_config_opts.udf_compiler_path,
403  prog_config_opts.udf_compiler_options,
404 #ifdef ENABLE_GEOS
405  prog_config_opts.libgeos_so_filename,
406 #endif
407  prog_config_opts.disk_cache_config,
408  false);
409  } else { // running ha server
410  LOG(FATAL)
411  << "No High Availability module available, please contact OmniSci support";
412  }
413  } catch (const std::exception& e) {
414  LOG(FATAL) << "Failed to initialize service handler: " << e.what();
415  }
416 
// NOTE(review): the body of this block is missing from the listing.
417  if (g_enable_fsi) {
419  }
420 
// Both listening sockets are SSL when a certificate file and a key file are
// configured, plain TCP otherwise.
421  mapd::shared_ptr<TServerSocket> serverSocket;
422  mapd::shared_ptr<TServerSocket> httpServerSocket;
423  if (!prog_config_opts.system_parameters.ssl_cert_file.empty() &&
424  !prog_config_opts.system_parameters.ssl_key_file.empty()) {
425  mapd::shared_ptr<TSSLSocketFactory> sslSocketFactory;
426  sslSocketFactory =
427  mapd::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory(SSLProtocol::SSLTLS));
428  sslSocketFactory->loadCertificate(
429  prog_config_opts.system_parameters.ssl_cert_file.c_str());
430  sslSocketFactory->loadPrivateKey(
431  prog_config_opts.system_parameters.ssl_key_file.c_str());
432  if (prog_config_opts.system_parameters.ssl_transport_client_auth) {
433  sslSocketFactory->authenticate(true);
434  } else {
435  sslSocketFactory->authenticate(false);
436  }
437  sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
438  serverSocket = mapd::shared_ptr<TServerSocket>(new TSSLServerSocket(
439  prog_config_opts.system_parameters.omnisci_server_port, sslSocketFactory));
440  httpServerSocket = mapd::shared_ptr<TServerSocket>(
441  new TSSLServerSocket(prog_config_opts.http_port, sslSocketFactory));
442  LOG(INFO) << " OmniSci server using encrypted connection. Cert file ["
443  << prog_config_opts.system_parameters.ssl_cert_file << "], key file ["
444  << prog_config_opts.system_parameters.ssl_key_file << "]";
445  } else {
446  LOG(INFO) << " OmniSci server using unencrypted connection";
447  serverSocket = mapd::shared_ptr<TServerSocket>(
448  new TServerSocket(prog_config_opts.system_parameters.omnisci_server_port));
449  httpServerSocket =
450  mapd::shared_ptr<TServerSocket>(new TServerSocket(prog_config_opts.http_port));
451  }
452 
// Runs under the exclusive lock when this function exits.
// NOTE(review): the guarded statement is missing from the listing; presumably it
// clears the global server pointers, which would otherwise dangle - confirm.
453  ScopeGuard pointer_to_thrift_guard = [] {
454  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
456  };
457 
458  mapd::shared_ptr<TProcessor> processor(
459  new TrackingProcessor(g_mapd_handler, prog_config_opts.log_user_origin));
460  mapd::shared_ptr<TTransportFactory> bufTransportFactory(
461  new TBufferedTransportFactory());
462  mapd::shared_ptr<TProtocolFactory> bufProtocolFactory(new TBinaryProtocolFactory());
463 
464  mapd::shared_ptr<TServerTransport> bufServerTransport(serverSocket);
465  TThreadedServer bufServer(
466  processor, bufServerTransport, bufTransportFactory, bufProtocolFactory);
467  {
// Publish the stack-allocated server so that heartbeat() can stop it.
468  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
469  g_thrift_buf_server = &bufServer;
470  }
471 
472  std::thread bufThread(start_server,
473  std::ref(bufServer),
474  prog_config_opts.system_parameters.omnisci_server_port);
475 
476  // TEMPORARY
477  auto warmup_queries = [&prog_config_opts]() {
478  // run warm up queries if any exists
// NOTE(review): the callee name is missing from the listing; the arguments match
// run_warmup_queries(handler, base_path, query_file_path).
480  g_mapd_handler, prog_config_opts.base_path, prog_config_opts.db_query_file);
// Clearing g_running here makes heartbeat() stop the servers right after warmup.
481  if (prog_config_opts.exit_after_warmup) {
482  g_running = false;
483  }
484  };
485 
486  mapd::shared_ptr<TServerTransport> httpServerTransport(httpServerSocket);
487  mapd::shared_ptr<TTransportFactory> httpTransportFactory(
488  new THttpServerTransportFactory());
489  mapd::shared_ptr<TProtocolFactory> httpProtocolFactory(new TJSONProtocolFactory());
490  TThreadedServer httpServer(
491  processor, httpServerTransport, httpTransportFactory, httpProtocolFactory);
492  if (start_http_server) {
493  {
494  mapd_lock_guard<mapd_shared_mutex> write_lock(g_thrift_mutex);
495  g_thrift_http_server = &httpServer;
496  }
497  std::thread httpThread(
498  start_server, std::ref(httpServer), prog_config_opts.http_port);
499 
500  warmup_queries();
501 
// Each join returns once start_server() has returned for that server.
502  bufThread.join();
503  httpThread.join();
504  } else {
505  warmup_queries();
506  bufThread.join();
507  }
508 
// The servers may have stopped without g_running being cleared; clear it so that
// heartbeat() leaves its loop (file_delete presumably watches the same flag).
509  g_running = false;
510  file_delete_thread.join();
511  heartbeat_thread.join();
512 
// NOTE(review): the body of this block is missing from the listing.
513  if (g_enable_fsi) {
515  }
516 
// SIGTERM counts as a normal exit; any other recorded signal becomes the exit code.
517  int signum = g_saw_signal;
518  if (signum <= 0 || signum == SIGTERM) {
519  return 0;
520  } else {
521  return signum;
522  }
523 }
524 
525 int main(int argc, char** argv) {
526  bool has_clust_topo = false;
527 
528  CommandLineOptions prog_config_opts(argv[0], has_clust_topo);
529 
530  try {
531  if (auto return_code =
532  prog_config_opts.parse_command_line(argc, argv, !has_clust_topo)) {
533  return *return_code;
534  }
535 
536  if (!has_clust_topo) {
537  prog_config_opts.validate_base_path();
538  prog_config_opts.validate();
539  return (startMapdServer(prog_config_opts));
540  }
541  } catch (std::runtime_error& e) {
542  std::cerr << "Can't start: " << e.what() << std::endl;
543  return 1;
544  } catch (boost::program_options::error& e) {
545  std::cerr << "Usage Error: " << e.what() << std::endl;
546  return 1;
547  }
548 }
mapd_shared_mutex g_thrift_mutex
Definition: MapDServer.cpp:78
TThreadedServer * g_thrift_buf_server
Definition: MapDServer.cpp:80
mapd::shared_ptr< DBHandler > g_warmup_handler
Definition: MapDServer.cpp:82
std::vector< LeafHostInfo > string_leaves
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file) noexcept
Definition: MapDServer.cpp:191
std::string udf_compiler_path
std::once_flag g_shutdown_once_flag
Definition: MapDServer.cpp:86
#define LOG(tag)
Definition: Logger.h:194
DiskCacheConfig disk_cache_config
shared utility for the db server and string dictionary server to remove old files ...
void heartbeat()
Definition: MapDServer.cpp:286
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
void run_warmup_queries(mapd::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
Definition: MapDServer.cpp:203
TThreadedServer * g_thrift_http_server
Definition: MapDServer.cpp:79
size_t max_concurrent_render_sessions
singleton class to handle concurrency and state for blosc library. A C++ wrapper over a pure C librar...
std::vector< LeafHostInfo > db_leaves
std::shared_timed_mutex mapd_shared_mutex
bool g_enable_thrift_logs
Definition: initdb.cpp:42
mapd::shared_ptr< DBHandler > g_mapd_handler
Definition: MapDServer.cpp:85
void start_server(TThreadedServer &server, const int port)
Definition: MapDServer.cpp:179
std::string ssl_key_file
AuthMetadata authMetadata
void shutdown_handler()
Definition: MapDServer.cpp:88
std::atomic< int > g_saw_signal
Definition: MapDServer.cpp:76
static void start(std::atomic< bool > &is_program_running)
void register_signal_handlers()
Definition: MapDServer.cpp:164
void shutdown()
Definition: Logger.cpp:314
bool g_enable_experimental_string_functions
std::atomic< bool > g_running
Definition: MapDServer.cpp:75
std::vector< std::string > udf_compiler_options
void register_signal_handler(int signum, void(*handler)(int))
Definition: MapDServer.cpp:94
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
Definition: File.cpp:280
bool g_enable_fsi
Definition: Catalog.cpp:92
void omnisci_signal_handler(int signum)
Definition: MapDServer.cpp:115
int startMapdServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
Definition: MapDServer.cpp:341
#define VLOG(n)
Definition: Logger.h:297
std::string master_address
SystemParameters system_parameters
std::string ssl_cert_file