19 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
23 #ifdef HAVE_THRIFT_THREADFACTORY
24 #include <thrift/concurrency/ThreadFactory.h>
26 #include <thrift/concurrency/PlatformThreadFactory.h>
29 #include <thrift/concurrency/ThreadManager.h>
30 #include <thrift/protocol/TBinaryProtocol.h>
31 #include <thrift/server/TThreadedServer.h>
32 #include <thrift/transport/TBufferTransports.h>
33 #include <thrift/transport/THttpServer.h>
34 #include <thrift/transport/TSSLServerSocket.h>
35 #include <thrift/transport/TSSLSocket.h>
36 #include <thrift/transport/TServerSocket.h>
45 #include <boost/algorithm/string.hpp>
46 #include <boost/algorithm/string/trim.hpp>
47 #include <boost/filesystem.hpp>
48 #include <boost/locale/generator.hpp>
49 #include <boost/make_shared.hpp>
50 #include <boost/program_options.hpp>
61 #include "MapDRelease.h"
69 using namespace ::apache::thrift;
70 using namespace ::apache::thrift::concurrency;
71 using namespace ::apache::thrift::protocol;
72 using namespace ::apache::thrift::server;
73 using namespace ::apache::thrift::transport;
98 signal(signum, handler);
100 struct sigaction act;
101 memset(&act, 0,
sizeof(act));
102 if (handler != SIG_DFL && handler != SIG_IGN) {
104 sigfillset(&act.sa_mask);
106 act.sa_handler = handler;
107 sigaction(signum, &act, NULL);
120 int expected_signal{-1};
121 if (!
g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
136 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
143 std::this_thread::sleep_for(std::chrono::seconds(2));
153 kill(getpid(), signum);
155 std::this_thread::sleep_for(std::chrono::seconds(5));
182 void start_server(std::shared_ptr<TThreadedServer> server,
const int port) {
186 throw std::runtime_error(std::string(
"Thrift server exited: ") +
187 std::strerror(errno));
189 }
catch (std::exception& e) {
190 LOG(
ERROR) <<
"Exception: " << e.what() <<
": port " << port << std::endl;
196 if (sessionId != g_warmup_handler->getInvalidSessionId()) {
198 g_warmup_handler->disconnect(sessionId);
200 LOG(
ERROR) <<
"Failed to disconnect warmup session, possible failure to run warmup "
207 std::string base_path,
208 std::string query_file_path) {
210 if (query_file_path.empty()) {
213 if (handler->isAggregator()) {
214 LOG(
INFO) <<
"Skipping warmup query execution on the aggregator, queries should be "
215 "run directly on the leaf nodes.";
219 LOG(
INFO) <<
"Running DB warmup with queries from " << query_file_path;
221 g_warmup_handler = handler;
223 std::string user_keyword, user_name, db_name;
224 std::ifstream query_file;
227 TSessionId sessionId = g_warmup_handler->getInvalidSessionId();
230 query_file.open(query_file_path);
231 while (std::getline(query_file, db_info)) {
232 if (db_info.length() == 0) {
235 std::istringstream iss(db_info);
236 iss >> user_keyword >> user_name >> db_name;
237 if (user_keyword.compare(0, 4,
"USER") == 0) {
240 g_warmup_handler->super_user_rights_ =
true;
241 g_warmup_handler->connect(sessionId, user_name,
"", db_name);
242 g_warmup_handler->super_user_rights_ =
false;
246 std::string single_query;
247 while (std::getline(query_file, single_query)) {
248 boost::algorithm::trim(single_query);
249 if (single_query.length() == 0 || single_query[0] ==
'-') {
252 if (single_query[0] ==
'}') {
253 single_query.clear();
256 if (single_query.find(
';') == single_query.npos) {
257 std::string multiline_query;
258 std::getline(query_file, multiline_query,
';');
259 single_query += multiline_query;
263 g_warmup_handler->sql_execute(ret, sessionId, single_query,
true,
"", -1, -1);
265 LOG(
WARNING) <<
"Exception while executing '" << single_query
268 single_query.clear();
272 g_warmup_handler->disconnect(sessionId);
273 sessionId = g_warmup_handler->getInvalidSessionId();
275 LOG(
WARNING) <<
"\nSyntax error in the file: " << query_file_path.c_str()
276 <<
" Missing expected keyword USER. Following line will be ignored: "
277 << db_info.c_str() << std::endl;
281 }
catch (
const std::exception& e) {
283 <<
"Exception while executing warmup queries. "
284 <<
"Warmup may not be fully completed. Will proceed nevertheless.\nError was: "
293 if (
auto thrift_http_server = g_thrift_http_server; thrift_http_server) {
294 thrift_http_server->stop();
296 g_thrift_http_server.reset();
298 if (
auto thrift_http_binary_server = g_thrift_http_binary_server;
299 thrift_http_binary_server) {
300 thrift_http_binary_server->stop();
302 g_thrift_http_binary_server.reset();
304 if (
auto thrift_tcp_server = g_thrift_tcp_server; thrift_tcp_server) {
305 thrift_tcp_server->stop();
307 g_thrift_tcp_server.reset();
315 int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
317 throw std::runtime_error(
"heartbeat() thread startup failed");
322 VLOG(1) <<
"heartbeat thread starting";
324 using namespace std::chrono;
325 std::this_thread::sleep_for(1s);
327 VLOG(1) <<
"heartbeat thread exiting";
331 if (signum >= 1 && signum != SIGTERM) {
332 LOG(
INFO) <<
"Interrupt signal (" << signum <<
") received.";
336 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
342 if (
auto db_handler = g_db_handler; db_handler) {
343 db_handler->emergency_shutdown();
356 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
358 class UnboundedTBufferedTransportFactory :
public TBufferedTransportFactory {
360 UnboundedTBufferedTransportFactory() : TBufferedTransportFactory() {}
362 std::shared_ptr<TTransport> getTransport(
363 std::shared_ptr<TTransport> transport)
override {
368 class UnboundedTHttpServerTransportFactory :
public THttpServerTransportFactory {
370 UnboundedTHttpServerTransportFactory() : THttpServerTransportFactory() {}
372 std::shared_ptr<TTransport> getTransport(
373 std::shared_ptr<TTransport> transport)
override {
381 bool start_http_server =
true) {
383 LOG(
INFO) <<
"HeavyDB starting up";
389 LOG(
INFO) <<
"TBB max concurrency: " << tbb_max_concurrency <<
" threads.";
393 #endif // HAVE_AWS_S3
394 std::set<std::unique_ptr<std::thread>> server_threads;
395 auto wait_for_server_threads = [&] {
396 for (
auto& th : server_threads) {
399 }
catch (
const std::system_error& e) {
400 if (e.code() != std::errc::invalid_argument) {
401 LOG(
WARNING) <<
"std::thread join failed: " << e.what();
403 }
catch (
const std::exception& e) {
404 LOG(
WARNING) <<
"std::thread join failed: " << e.what();
413 LOG(
INFO) <<
"HeavyDB shutting down";
423 g_db_handler.reset();
425 wait_for_server_threads();
429 #endif // HAVE_AWS_S3
436 const unsigned int wait_interval =
438 server_threads.insert(std::make_unique<std::thread>(
443 server_threads.insert(std::make_unique<std::thread>(
heartbeat));
445 if (!g_enable_thrift_logs) {
446 apache::thrift::GlobalOutput.setOutputFunction([](
const char* msg) {});
454 std::make_shared<DBHandler>(prog_config_opts.
db_leaves,
483 prog_config_opts.libgeos_so_filename,
489 <<
"No High Availability module available, please contact OmniSci support";
491 }
catch (
const std::exception& e) {
492 LOG(
FATAL) <<
"Failed to initialize service handler: " << e.what();
500 std::shared_ptr<TServerSocket> tcp_socket;
501 std::shared_ptr<TServerSocket> http_socket;
502 std::shared_ptr<TServerSocket> http_binary_socket;
507 auto sslSocketFactory = std::make_shared<TSSLSocketFactory>(SSLProtocol::SSLTLS);
508 sslSocketFactory->loadCertificate(
510 sslSocketFactory->loadPrivateKey(
513 sslSocketFactory->authenticate(
true);
515 sslSocketFactory->authenticate(
false);
517 sslSocketFactory->ciphers(
"ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
518 tcp_socket = std::make_shared<TSSLServerSocket>(
520 if (start_http_server) {
521 http_socket = std::make_shared<TSSLServerSocket>(prog_config_opts.
http_port,
525 http_binary_socket = std::make_shared<TSSLServerSocket>(
528 LOG(
INFO) <<
" HeavyDB server using encrypted connection. Cert file ["
534 LOG(
INFO) <<
" HeavyDB server using unencrypted connection";
535 tcp_socket = std::make_shared<TServerSocket>(
537 if (start_http_server) {
538 http_socket = std::make_shared<TServerSocket>(prog_config_opts.
http_port);
547 std::shared_ptr<TProcessor> processor{std::make_shared<TrackingProcessor>(
551 std::shared_ptr<TServerTransport> tcp_st = tcp_socket;
552 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
553 std::shared_ptr<TTransportFactory> tcp_tf{
554 std::make_shared<UnboundedTBufferedTransportFactory>()};
556 std::shared_ptr<TTransportFactory> tcp_tf{
557 std::make_shared<TBufferedTransportFactory>()};
559 std::shared_ptr<TProtocolFactory> tcp_pf{std::make_shared<TBinaryProtocolFactory>()};
560 g_thrift_tcp_server.reset(
new TThreadedServer(processor, tcp_st, tcp_tf, tcp_pf));
561 server_threads.insert(std::make_unique<std::thread>(
567 if (start_http_server) {
568 std::shared_ptr<TServerTransport> http_st = http_socket;
569 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
570 std::shared_ptr<TTransportFactory> http_tf{
571 std::make_shared<UnboundedTHttpServerTransportFactory>()};
573 std::shared_ptr<TTransportFactory> http_tf{
574 std::make_shared<THttpServerTransportFactory>()};
576 std::shared_ptr<TProtocolFactory> http_pf{std::make_shared<TJSONProtocolFactory>()};
577 g_thrift_http_server.reset(
new TThreadedServer(processor, http_st, http_tf, http_pf));
578 server_threads.insert(std::make_unique<std::thread>(
584 std::shared_ptr<TServerTransport> http_binary_st = http_binary_socket;
585 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
586 std::shared_ptr<TTransportFactory> http_binary_tf{
587 std::make_shared<UnboundedTHttpServerTransportFactory>()};
589 std::shared_ptr<TTransportFactory> http_binary_tf{
590 std::make_shared<THttpServerTransportFactory>()};
592 std::shared_ptr<TProtocolFactory> http_binary_pf{
593 std::make_shared<TBinaryProtocolFactory>()};
594 g_thrift_http_binary_server.reset(
595 new TThreadedServer(processor, http_binary_st, http_binary_tf, http_binary_pf));
596 server_threads.insert(std::make_unique<std::thread>(
608 wait_for_server_threads();
612 if (signum <= 0 || signum == SIGTERM) {
619 int main(
int argc,
char** argv) {
620 bool has_clust_topo =
false;
625 if (
auto return_code =
630 if (!has_clust_topo) {
635 }
catch (std::runtime_error& e) {
636 std::cerr <<
"Server Error: " << e.what() << std::endl;
638 }
catch (boost::program_options::error& e) {
639 std::cerr <<
"Usage Error: " << e.what() << std::endl;
const std::string kDataDirectoryName
void run_warmup_queries(std::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
int idle_session_duration
unsigned renderer_vulkan_timeout_ms
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
std::shared_ptr< DBHandler > g_db_handler
bool renderer_use_ppll_polys
std::string udf_file_name
shared utility for the db server and string dictionary server to remove old files ...
void start_server(std::shared_ptr< TThreadedServer > server, const int port)
std::shared_ptr< TThreadedServer > g_thrift_http_binary_server
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
bool render_compositor_use_last_gpu
bool renderer_prefer_igpu
bool renderer_use_parallel_executors
std::atomic< int > g_saw_signal
size_t max_concurrent_render_sessions
singleton class to handle concurrency and state for blosc library. A C++ wrapper over a pure C librar...
tbb::task_arena g_tbb_arena
size_t num_reader_threads
std::shared_ptr< apache::thrift::TConfiguration > default_tconfig()
std::vector< LeafHostInfo > db_leaves
int startHeavyDBServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
bool g_enable_http_binary_server
bool enable_auto_clear_render_mem
std::shared_ptr< TThreadedServer > g_thrift_http_server
int render_oom_retry_threshold
std::string db_query_file
AuthMetadata authMetadata
static void start(std::atomic< bool > &is_program_running)
void validate_base_path()
std::vector< std::string > udf_compiler_options
bool ssl_transport_client_auth
void register_signal_handler(int signum, void(*handler)(int))
std::shared_ptr< DBHandler > g_warmup_handler
std::atomic< bool > g_running
File_Namespace::DiskCacheConfig disk_cache_config
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
bool g_enable_thrift_logs
void heavydb_signal_handler(int signum)
void register_signal_handlers()
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file) noexcept
bool enable_legacy_syntax
std::string master_address
std::shared_ptr< TThreadedServer > g_thrift_tcp_server
SystemParameters system_parameters
std::string ssl_cert_file