20 #ifdef HAVE_THRIFT_THREADFACTORY
21 #include <thrift/concurrency/ThreadFactory.h>
23 #include <thrift/concurrency/PlatformThreadFactory.h>
26 #include <thrift/concurrency/ThreadManager.h>
27 #include <thrift/protocol/TBinaryProtocol.h>
28 #include <thrift/protocol/TJSONProtocol.h>
29 #include <thrift/server/TThreadedServer.h>
30 #include <thrift/transport/TBufferTransports.h>
31 #include <thrift/transport/THttpServer.h>
32 #include <thrift/transport/TSSLServerSocket.h>
33 #include <thrift/transport/TSSLSocket.h>
34 #include <thrift/transport/TServerSocket.h>
43 #include <boost/algorithm/string.hpp>
44 #include <boost/algorithm/string/trim.hpp>
45 #include <boost/filesystem.hpp>
46 #include <boost/locale/generator.hpp>
47 #include <boost/make_shared.hpp>
48 #include <boost/program_options.hpp>
59 #include "MapDRelease.h"
67 using namespace ::apache::thrift;
68 using namespace ::apache::thrift::concurrency;
69 using namespace ::apache::thrift::protocol;
70 using namespace ::apache::thrift::server;
71 using namespace ::apache::thrift::transport;
96 signal(signum, handler);
99 memset(&act, 0,
sizeof(act));
100 if (handler != SIG_DFL && handler != SIG_IGN) {
102 sigfillset(&act.sa_mask);
104 act.sa_handler = handler;
105 sigaction(signum, &act, NULL);
118 int expected_signal{-1};
119 if (!
g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
134 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
141 std::this_thread::sleep_for(std::chrono::seconds(2));
151 kill(getpid(), signum);
153 std::this_thread::sleep_for(std::chrono::seconds(5));
183 throw std::runtime_error(std::string(
"Thrift server exited: ") +
184 std::strerror(errno));
186 }
catch (std::exception& e) {
187 LOG(
ERROR) <<
"Exception: " << e.what() <<
": port " << port << std::endl;
197 LOG(
ERROR) <<
"Failed to disconnect warmup session, possible failure to run warmup "
204 std::string base_path,
205 std::string query_file_path) {
207 if (query_file_path.empty()) {
210 if (handler->isAggregator()) {
211 LOG(
INFO) <<
"Skipping warmup query execution on the aggregator, queries should be "
212 "run directly on the leaf nodes.";
216 LOG(
INFO) <<
"Running DB warmup with queries from " << query_file_path;
220 std::string user_keyword, user_name, db_name;
221 std::ifstream query_file;
227 query_file.open(query_file_path);
228 while (std::getline(query_file, db_info)) {
229 if (db_info.length() == 0) {
232 std::istringstream iss(db_info);
233 iss >> user_keyword >> user_name >> db_name;
234 if (user_keyword.compare(0, 4,
"USER") == 0) {
243 std::string single_query;
244 while (std::getline(query_file, single_query)) {
245 boost::algorithm::trim(single_query);
246 if (single_query.length() == 0 || single_query[0] ==
'-') {
249 if (single_query[0] ==
'}') {
250 single_query.clear();
253 if (single_query.find(
';') == single_query.npos) {
254 std::string multiline_query;
255 std::getline(query_file, multiline_query,
';');
256 single_query += multiline_query;
260 g_warmup_handler->sql_execute(ret, sessionId, single_query,
true,
"", -1, -1);
262 LOG(
WARNING) <<
"Exception while executing '" << single_query
265 single_query.clear();
272 LOG(
WARNING) <<
"\nSyntax error in the file: " << query_file_path.c_str()
273 <<
" Missing expected keyword USER. Following line will be ignored: "
274 << db_info.c_str() << std::endl;
278 }
catch (
const std::exception& e) {
280 <<
"Exception while executing warmup queries. "
281 <<
"Warmup may not be fully completed. Will proceed nevertheless.\nError was: "
291 int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
293 throw std::runtime_error(
"heartbeat() thread startup failed");
298 VLOG(1) <<
"heartbeat thread starting";
300 using namespace std::chrono;
301 std::this_thread::sleep_for(1s);
303 VLOG(1) <<
"heartbeat thread exiting";
307 if (signum >= 1 && signum != SIGTERM) {
308 LOG(
INFO) <<
"Interrupt signal (" << signum <<
") received.";
312 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
356 const unsigned int wait_interval =
361 prog_config_opts.
base_path +
"/mapd_data");
365 apache::thrift::GlobalOutput.setOutputFunction([](
const char* msg) {});
371 boost::locale::generator generator;
372 std::locale::global(generator.generate(
""));
377 mapd::make_shared<DBHandler>(prog_config_opts.
db_leaves,
404 prog_config_opts.libgeos_so_filename,
408 }
catch (
const std::exception& e) {
409 LOG(
FATAL) <<
"Failed to initialize service handler: " << e.what();
416 mapd::shared_ptr<TServerSocket> serverSocket;
417 mapd::shared_ptr<TServerSocket> httpServerSocket;
420 mapd::shared_ptr<TSSLSocketFactory> sslSocketFactory;
422 mapd::shared_ptr<TSSLSocketFactory>(
new TSSLSocketFactory(SSLProtocol::SSLTLS));
423 sslSocketFactory->loadCertificate(
425 sslSocketFactory->loadPrivateKey(
428 sslSocketFactory->authenticate(
true);
430 sslSocketFactory->authenticate(
false);
432 sslSocketFactory->ciphers(
"ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
433 serverSocket = mapd::shared_ptr<TServerSocket>(
new TSSLServerSocket(
435 httpServerSocket = mapd::shared_ptr<TServerSocket>(
436 new TSSLServerSocket(prog_config_opts.
http_port, sslSocketFactory));
437 LOG(
INFO) <<
" OmniSci server using encrypted connection. Cert file ["
441 LOG(
INFO) <<
" OmniSci server using unencrypted connection";
442 serverSocket = mapd::shared_ptr<TServerSocket>(
445 mapd::shared_ptr<TServerSocket>(
new TServerSocket(prog_config_opts.
http_port));
454 mapd::shared_ptr<TProcessor> processor(
456 mapd::shared_ptr<TTransportFactory> bufTransportFactory(
457 new TBufferedTransportFactory());
458 mapd::shared_ptr<TProtocolFactory> bufProtocolFactory(
new TBinaryProtocolFactory());
460 mapd::shared_ptr<TServerTransport> bufServerTransport(serverSocket);
461 TThreadedServer bufServer(
462 processor, bufServerTransport, bufTransportFactory, bufProtocolFactory);
473 auto warmup_queries = [&prog_config_opts]() {
482 mapd::shared_ptr<TServerTransport> httpServerTransport(httpServerSocket);
483 mapd::shared_ptr<TTransportFactory> httpTransportFactory(
484 new THttpServerTransportFactory());
485 mapd::shared_ptr<TProtocolFactory> httpProtocolFactory(
new TJSONProtocolFactory());
486 TThreadedServer httpServer(
487 processor, httpServerTransport, httpTransportFactory, httpProtocolFactory);
488 if (start_http_server) {
493 std::thread httpThread(
505 LOG(
FATAL) <<
"No High Availability module available, please contact OmniSci support";
509 file_delete_thread.join();
510 heartbeat_thread.join();
517 if (signum <= 0 || signum == SIGTERM) {
524 int main(
int argc,
char** argv) {
525 bool has_clust_topo =
false;
530 if (
auto return_code =
535 if (!has_clust_topo) {
540 }
catch (std::runtime_error& e) {
541 std::cerr <<
"Can't start: " << e.what() << std::endl;
543 }
catch (boost::program_options::error& e) {
544 std::cerr <<
"Usage Error: " << e.what() << std::endl;
mapd_shared_mutex g_thrift_mutex
TThreadedServer * g_thrift_buf_server
int idle_session_duration
mapd::shared_ptr< DBHandler > g_warmup_handler
std::vector< LeafHostInfo > string_leaves
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file) noexcept
std::string udf_compiler_path
std::once_flag g_shutdown_once_flag
DiskCacheConfig disk_cache_config
std::string udf_file_name
shared utility for the db server and string dictionary server to remove old files ...
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
bool render_compositor_use_last_gpu
bool renderer_use_vulkan_driver
void run_warmup_queries(mapd::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
TThreadedServer * g_thrift_http_server
size_t max_concurrent_render_sessions
singleton class to handle concurrency and state for the blosc library. A C++ wrapper over a pure C librar...
size_t num_reader_threads
std::vector< LeafHostInfo > db_leaves
std::shared_timed_mutex mapd_shared_mutex
bool enable_auto_clear_render_mem
bool g_enable_thrift_logs
int render_oom_retry_threshold
mapd::shared_ptr< DBHandler > g_mapd_handler
void start_server(TThreadedServer &server, const int port)
std::string db_query_file
AuthMetadata authMetadata
std::atomic< int > g_saw_signal
static void start(std::atomic< bool > &is_program_running)
void validate_base_path()
void register_signal_handlers()
bool g_enable_experimental_string_functions
std::atomic< bool > g_running
std::vector< std::string > udf_compiler_options
bool ssl_transport_client_auth
void register_signal_handler(int signum, void(*handler)(int))
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
void omnisci_signal_handler(int signum)
int startMapdServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
bool enable_legacy_syntax
SystemParameters system_parameters
std::string ssl_cert_file