19 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
23 #ifdef HAVE_THRIFT_THREADFACTORY
24 #include <thrift/concurrency/ThreadFactory.h>
26 #include <thrift/concurrency/PlatformThreadFactory.h>
29 #include <thrift/concurrency/ThreadManager.h>
30 #include <thrift/protocol/TBinaryProtocol.h>
31 #include <thrift/server/TThreadedServer.h>
32 #include <thrift/transport/TBufferTransports.h>
33 #include <thrift/transport/THttpServer.h>
34 #include <thrift/transport/TSSLServerSocket.h>
35 #include <thrift/transport/TSSLSocket.h>
36 #include <thrift/transport/TServerSocket.h>
45 #include <boost/algorithm/string.hpp>
46 #include <boost/algorithm/string/trim.hpp>
47 #include <boost/filesystem.hpp>
48 #include <boost/locale/generator.hpp>
49 #include <boost/make_shared.hpp>
50 #include <boost/program_options.hpp>
53 #include <tbb/global_control.h>
73 using namespace ::apache::thrift;
74 using namespace ::apache::thrift::concurrency;
75 using namespace ::apache::thrift::protocol;
76 using namespace ::apache::thrift::server;
77 using namespace ::apache::thrift::transport;
// Fragment of the signal-registration code. The enclosing signature and the
// original lines 103, 107 and 109 are not part of this excerpt.
// NOTE(review): both signal() and sigaction() are called below; presumably a
// preprocessor branch on the missing lines selects one of them - confirm.
102 signal(signum, handler);
// Start from an all-zero sigaction so no field is left uninitialized.
104 struct sigaction act;
105 memset(&act, 0,
sizeof(act));
// The check excludes SIG_DFL and SIG_IGN. The sigfillset() that follows puts
// every signal into sa_mask, i.e. the set of signals blocked while the
// handler runs; it appears to be guarded by this check (line 107 is missing).
106 if (handler != SIG_DFL && handler != SIG_IGN) {
108 sigfillset(&act.sa_mask);
110 act.sa_handler = handler;
111 sigaction(signum, &act, NULL);
// Fragment of a signal handler (presumably heavydb_signal_handler - the
// signature is not part of this excerpt).
// Atomically store signum into g_saw_signal, but only if it still holds -1.
// The branch below is entered when the exchange FAILS, i.e. some other value
// had already been recorded.
124 int expected_signal{-1};
125 if (!
g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
// SIGABRT, SIGSEGV and SIGFPE are singled out; the rest of this condition and
// its body are on lines that are not visible.
140 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
// Pause for two seconds.
147 std::this_thread::sleep_for(std::chrono::seconds(2));
// Send the same signal to this very process, then pause for five seconds.
157 kill(getpid(), signum);
159 std::this_thread::sleep_for(std::chrono::seconds(5));
// Thread entry point for one Thrift server.
//   server - the TThreadedServer this thread is responsible for
//   port   - only used to label the error message logged below
// Original lines 187-189 and 192 are not part of this excerpt; presumably they
// open the try block, run the server and test errno - TODO confirm.
186 void start_server(std::shared_ptr<TThreadedServer> server,
const int port) {
// The error text is derived from the current errno value.
190 throw std::runtime_error(std::string(
"Thrift server exited: ") +
191 std::strerror(errno));
// Any std::exception (including the runtime_error above) is logged together
// with the port and not rethrown in the visible code.
193 }
catch (std::exception& e) {
194 LOG(
ERROR) <<
"Exception: " << e.what() <<
": port " << port << std::endl;
// Fragment of the warmup-session cleanup (presumably releaseWarmupSession -
// the signature is not part of this excerpt).
// Disconnect only when sessionId differs from the handler's invalid-session
// marker, i.e. when a session was actually opened.
200 if (sessionId != g_warmup_handler->getInvalidSessionId()) {
202 g_warmup_handler->disconnect(sessionId);
// Error path for a failed disconnect; the guarding construct and the rest of
// the message are on lines that are not visible.
204 LOG(
ERROR) <<
"Failed to disconnect warmup session, possible failure to run warmup "
// Tail of the parameter list and body fragments of the warmup routine
// (presumably run_warmup_queries; the line carrying the function name and
// many body lines are not part of this excerpt).
// Visible behaviour: open the file named by query_file_path, read header lines
// naming a user and a database, connect through g_warmup_handler, execute the
// statements that follow, then disconnect.
211 std::string base_path,
212 std::string query_file_path) {
// An empty path is tested first; the branch body is not visible.
214 if (query_file_path.empty()) {
// Aggregator nodes only log that warmup is skipped.
217 if (handler->isAggregator()) {
218 LOG(
INFO) <<
"Skipping warmup query execution on the aggregator, queries should be "
219 "run directly on the leaf nodes.";
223 LOG(
INFO) <<
"Running DB warmup with queries from " << query_file_path;
// Publish the handler in the global that the rest of this routine uses.
225 g_warmup_handler = handler;
227 std::string user_keyword, user_name, db_name;
228 std::ifstream query_file;
// Start out with the invalid-session marker; connect() below fills it in.
231 TSessionId sessionId = g_warmup_handler->getInvalidSessionId();
234 query_file.open(query_file_path);
// Outer loop: one iteration per header line read into db_info.
235 while (std::getline(query_file, db_info)) {
// Empty header lines are tested here; the branch body is not visible.
236 if (db_info.length() == 0) {
// Split the header into three whitespace-separated tokens:
// keyword, user name, database name.
239 std::istringstream iss(db_info);
240 iss >> user_keyword >> user_name >> db_name;
// Only the first four characters of the keyword are compared with "USER".
241 if (user_keyword.compare(0, 4,
"USER") == 0) {
// super_user_rights_ is switched on just for the connect() call, which passes
// an empty string as its third argument (presumably the password - TODO
// confirm), and switched off again right after.
244 g_warmup_handler->super_user_rights_ =
true;
245 g_warmup_handler->connect(sessionId, user_name,
"", db_name);
246 g_warmup_handler->super_user_rights_ =
false;
// Inner loop: one iteration per line of the query section.
250 std::string single_query;
251 while (std::getline(query_file, single_query)) {
252 boost::algorithm::trim(single_query);
// Blank lines and lines starting with '-' are tested here; the branch body is
// not visible (presumably they are skipped - TODO confirm).
253 if (single_query.length() == 0 || single_query[0] ==
'-') {
// A line starting with '}' clears the buffer; what follows the clear() is not
// visible (presumably it ends this user's section - TODO confirm).
256 if (single_query[0] ==
'}') {
257 single_query.clear();
// No ';' on this line: keep reading up to the next ';' and append that text,
// so a statement may span several lines.
// NOTE(review): the first getline dropped the line break and nothing visible
// re-inserts whitespace before appending - confirm tokens from adjacent lines
// cannot end up glued together.
260 if (single_query.find(
';') == single_query.npos) {
261 std::string multiline_query;
262 std::getline(query_file, multiline_query,
';');
263 single_query += multiline_query;
// Run the statement in the session opened above.
267 g_warmup_handler->sql_execute(ret, sessionId, single_query,
true,
"", -1, -1);
// A failing statement is logged as a warning; the guarding catch clause is on
// a line that is not visible.
269 LOG(
WARNING) <<
"Exception while executing '" << single_query
272 single_query.clear();
// Close the session and put sessionId back to the invalid-session marker.
276 g_warmup_handler->disconnect(sessionId);
277 sessionId = g_warmup_handler->getInvalidSessionId();
// Reported for a header line whose keyword is not USER.
279 LOG(
WARNING) <<
"\nSyntax error in the file: " << query_file_path.c_str()
280 <<
" Missing expected keyword USER. Following line will be ignored: "
281 << db_info.c_str() << std::endl;
// Any std::exception escaping the code above is reported; per the message
// text, start-up continues regardless.
285 }
catch (
const std::exception& e) {
287 <<
"Exception while executing warmup queries. "
288 <<
"Warmup may not be fully completed. Will proceed nevertheless.\nError was: "
// Stop sequence for the three global Thrift servers (the enclosing function
// is not part of this excerpt). The same pattern is applied to each server:
// copy the global shared_ptr into a local, call stop() through the local copy
// only if it is non-null, then reset the global.
298 if (
auto thrift_http_server = g_thrift_http_server; thrift_http_server) {
299 thrift_http_server->stop();
301 g_thrift_http_server.reset();
// Same steps for the HTTP binary-protocol server.
303 if (
auto thrift_http_binary_server = g_thrift_http_binary_server;
304 thrift_http_binary_server) {
305 thrift_http_binary_server->stop();
307 g_thrift_http_binary_server.reset();
// Same steps for the TCP server.
309 if (
auto thrift_tcp_server = g_thrift_tcp_server; thrift_tcp_server) {
310 thrift_tcp_server->stop();
312 g_thrift_tcp_server.reset();
// Fragment of the heartbeat thread body (name taken from the messages below;
// the signature and several original lines are not part of this excerpt).
// Block the signals contained in `set` for the calling thread; `set` is built
// on lines that are not visible.
320 int result = pthread_sigmask(SIG_BLOCK, &set, NULL);
// NOTE(review): presumably thrown when `result` is non-zero - the check itself
// is on a missing line.
322 throw std::runtime_error(
"heartbeat() thread startup failed");
327 VLOG(1) <<
"heartbeat thread starting";
// The using-directive makes the `1s` duration literal available. The sleep is
// presumably the body of a polling loop whose header is not visible.
329 using namespace std::chrono;
330 std::this_thread::sleep_for(1s);
332 VLOG(1) <<
"heartbeat thread exiting";
// Any positive signal number other than SIGTERM is logged as an interrupt.
336 if (signum >= 1 && signum != SIGTERM) {
337 LOG(
INFO) <<
"Interrupt signal (" << signum <<
") received.";
// SIGABRT, SIGSEGV and SIGFPE are singled out; the rest of this condition and
// its body are on lines that are not visible.
341 if (signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE
// Copy the global handler pointer into a local and call emergency_shutdown()
// only when it is non-null.
347 if (
auto db_handler = g_db_handler; db_handler) {
348 db_handler->emergency_shutdown();
361 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
// Transport factory derived from Thrift's TBufferedTransportFactory. It adds a
// default constructor that forwards to the base class and overrides
// getTransport(); the override's body is not part of this excerpt.
// NOTE(review): going by the class name, the override presumably lifts a
// message-size limit on the returned transport - confirm against the full file.
363 class UnboundedTBufferedTransportFactory :
public TBufferedTransportFactory {
365 UnboundedTBufferedTransportFactory() : TBufferedTransportFactory() {}
// Takes the underlying transport and returns the transport to be used for it.
367 std::shared_ptr<TTransport> getTransport(
368 std::shared_ptr<TTransport> transport)
override {
// Transport factory derived from Thrift's THttpServerTransportFactory. It adds
// a default constructor that forwards to the base class and overrides
// getTransport(); the override's body is not part of this excerpt.
// NOTE(review): going by the class name, the override presumably lifts a
// message-size limit on the returned transport - confirm against the full file.
373 class UnboundedTHttpServerTransportFactory :
public THttpServerTransportFactory {
375 UnboundedTHttpServerTransportFactory() : THttpServerTransportFactory() {}
// Takes the underlying transport and returns the transport to be used for it.
377 std::shared_ptr<TTransport> getTransport(
378 std::shared_ptr<TTransport> transport)
override {
// Last parameter and opening lines of the server start-up routine (presumably
// startHeavyDBServer; the line carrying the name is not part of this excerpt).
// start_http_server defaults to true, so callers that omit it get the HTTP
// servers as well.
386 bool start_http_server =
true) {
388 LOG(
INFO) <<
"HeavyDB starting up";
// num_cpu_threads is computed on lines that are not visible.
392 LOG(
INFO) <<
"Initializing TBB with " << num_cpu_threads <<
" threads.";
// Limits TBB's max_allowed_parallelism; the limit stays in force for as long
// as tbb_control is alive. The value argument is on a missing line.
393 tbb::global_control tbb_control(tbb::global_control::max_allowed_parallelism,
397 LOG(
INFO) <<
"TBB max concurrency: " << tbb_max_concurrency <<
" threads.";
401 #endif // HAVE_AWS_S3
// Owns every worker thread started by this routine.
402 std::set<std::unique_ptr<std::thread>> server_threads;
// Helper that walks all server threads; going by the log text below, each one
// is joined on lines that are not visible. Captures locals by reference.
403 auto wait_for_server_threads = [&] {
404 for (
auto& th : server_threads) {
// A system_error carrying errc::invalid_argument is deliberately not logged
// (std::thread::join reports that code for a thread that is not joinable);
// every other join failure is logged as a warning.
407 }
catch (
const std::system_error& e) {
408 if (e.code() != std::errc::invalid_argument) {
409 LOG(
WARNING) <<
"std::thread join failed: " << e.what();
411 }
catch (
const std::exception& e) {
412 LOG(
WARNING) <<
"std::thread join failed: " << e.what();
// Shutdown path (its enclosing construct is not visible): log, drop the
// global handler reference, then wait for the worker threads.
421 LOG(
INFO) <<
"HeavyDB shutting down";
431 g_db_handler.reset();
433 wait_for_server_threads();
437 #endif // HAVE_AWS_S3
// Fragment: background threads and construction of the DBHandler. Many
// original lines in this range are not part of this excerpt.
444 const unsigned int wait_interval =
// Each background thread is heap-allocated and handed to server_threads; the
// first thread's callable is on a missing line.
446 server_threads.insert(std::make_unique<std::thread>(
// Heartbeat thread.
451 server_threads.insert(std::make_unique<std::thread>(
heartbeat));
// With Thrift logging disabled, Thrift's global output is routed to a lambda
// that ignores the message.
453 if (!g_enable_thrift_logs) {
454 apache::thrift::GlobalOutput.setOutputFunction([](
const char* msg) {});
// DBHandler is constructed from fields of prog_config_opts; most of the
// argument list is on lines that are not visible.
462 std::make_shared<DBHandler>(prog_config_opts.
db_leaves,
491 prog_config_opts.libgeos_so_filename,
// torch_lib_path is passed only in builds that define HAVE_TORCH_TFS.
493 #ifdef HAVE_TORCH_TFS
494 prog_config_opts.torch_lib_path,
500 <<
"No High Availability module available, please contact OmniSci support";
// A std::exception raised while building the handler is logged at FATAL level.
502 }
catch (
const std::exception& e) {
503 LOG(
FATAL) <<
"Failed to initialize service handler: " << e.what();
// Guarded by both flags; the body of this branch is not visible.
514 if (g_enable_fsi && g_enable_foreign_table_scheduled_refresh) {
// Fragment: creation of the listening sockets. One socket per endpoint, filled
// in by either the encrypted or the unencrypted path below; the condition that
// selects the path is on a line that is not visible.
519 std::shared_ptr<TServerSocket> tcp_socket;
520 std::shared_ptr<TServerSocket> http_socket;
521 std::shared_ptr<TServerSocket> http_binary_socket;
// Encrypted path: one SSL socket factory is configured and used for the
// sockets created below. Certificate and key arguments are on missing lines.
526 auto sslSocketFactory = std::make_shared<TSSLSocketFactory>(SSLProtocol::SSLTLS);
527 sslSocketFactory->loadCertificate(
529 sslSocketFactory->loadPrivateKey(
// authenticate() is called with true or false; the selecting condition is not
// visible (presumably the client-authentication option - TODO confirm).
532 sslSocketFactory->authenticate(
true);
534 sslSocketFactory->authenticate(
false);
// OpenSSL cipher list: all ciphers minus anonymous DH, low-strength, export
// and MD5-based ones, ordered by strength.
536 sslSocketFactory->ciphers(
"ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
537 tcp_socket = std::make_shared<TSSLServerSocket>(
// HTTP sockets are created only when the caller asked for the HTTP servers.
539 if (start_http_server) {
540 http_socket = std::make_shared<TSSLServerSocket>(prog_config_opts.
http_port,
544 http_binary_socket = std::make_shared<TSSLServerSocket>(
547 LOG(
INFO) <<
" HeavyDB server using encrypted connection. Cert file ["
// Unencrypted path: plain TServerSocket instances.
553 LOG(
INFO) <<
" HeavyDB server using unencrypted connection";
554 tcp_socket = std::make_shared<TServerSocket>(
556 if (start_http_server) {
557 http_socket = std::make_shared<TServerSocket>(prog_config_opts.
http_port);
// Fragment: construction of the Thrift servers. A single processor is shared
// by all servers built below; its constructor arguments are on missing lines.
566 std::shared_ptr<TProcessor> processor{std::make_shared<TrackingProcessor>(
// TCP server: buffered transport + binary protocol. Under
// HAVE_THRIFT_MESSAGE_LIMIT the Unbounded* factory is used; the second tcp_tf
// definition is the alternative (the #else/#endif lines are not visible).
570 std::shared_ptr<TServerTransport> tcp_st = tcp_socket;
571 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
572 std::shared_ptr<TTransportFactory> tcp_tf{
573 std::make_shared<UnboundedTBufferedTransportFactory>()};
575 std::shared_ptr<TTransportFactory> tcp_tf{
576 std::make_shared<TBufferedTransportFactory>()};
578 std::shared_ptr<TProtocolFactory> tcp_pf{std::make_shared<TBinaryProtocolFactory>()};
// The server is stored in the global and a worker thread is added for it (the
// thread's callable is on a missing line).
579 g_thrift_tcp_server.reset(
new TThreadedServer(processor, tcp_st, tcp_tf, tcp_pf));
580 server_threads.insert(std::make_unique<std::thread>(
// HTTP server: HTTP transport + JSON protocol, only when requested.
586 if (start_http_server) {
587 std::shared_ptr<TServerTransport> http_st = http_socket;
588 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
589 std::shared_ptr<TTransportFactory> http_tf{
590 std::make_shared<UnboundedTHttpServerTransportFactory>()};
592 std::shared_ptr<TTransportFactory> http_tf{
593 std::make_shared<THttpServerTransportFactory>()};
595 std::shared_ptr<TProtocolFactory> http_pf{std::make_shared<TJSONProtocolFactory>()};
596 g_thrift_http_server.reset(
new TThreadedServer(processor, http_st, http_tf, http_pf));
597 server_threads.insert(std::make_unique<std::thread>(
// HTTP binary server: HTTP transport + binary protocol. Its guarding condition
// is on a line that is not visible.
603 std::shared_ptr<TServerTransport> http_binary_st = http_binary_socket;
604 #ifdef HAVE_THRIFT_MESSAGE_LIMIT
605 std::shared_ptr<TTransportFactory> http_binary_tf{
606 std::make_shared<UnboundedTHttpServerTransportFactory>()};
608 std::shared_ptr<TTransportFactory> http_binary_tf{
609 std::make_shared<THttpServerTransportFactory>()};
611 std::shared_ptr<TProtocolFactory> http_binary_pf{
612 std::make_shared<TBinaryProtocolFactory>()};
613 g_thrift_http_binary_server.reset(
614 new TThreadedServer(processor, http_binary_st, http_binary_tf, http_binary_pf));
615 server_threads.insert(std::make_unique<std::thread>(
// Wait for the worker threads before finishing.
627 wait_for_server_threads();
// A non-positive signal number or SIGTERM takes this branch; its body is not
// visible.
631 if (signum <= 0 || signum == SIGTERM) {
// Program entry point. Only fragments of the body are part of this excerpt.
638 int main(
int argc,
char** argv) {
// Starts out false; code on lines that are not visible may change it.
639 bool has_clust_topo =
false;
// The initializer of return_code is on a missing line.
644 if (
auto return_code =
649 if (!has_clust_topo) {
// Runtime failures and command-line (boost::program_options) errors are
// reported on stderr with different prefixes.
654 }
catch (std::runtime_error& e) {
655 std::cerr <<
"Server Error: " << e.what() << std::endl;
657 }
catch (boost::program_options::error& e) {
658 std::cerr <<
"Usage Error: " << e.what() << std::endl;
const std::string kDataDirectoryName
void run_warmup_queries(std::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
int idle_session_duration
unsigned renderer_vulkan_timeout_ms
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
std::shared_ptr< DBHandler > g_db_handler
std::string udf_file_name
bool renderer_enable_slab_allocation
shared utility for the db server and string dictionary server to remove old files ...
void start_server(std::shared_ptr< TThreadedServer > server, const int port)
void checkDropRenderGroupColumnsMigration() const
std::shared_ptr< TThreadedServer > g_thrift_http_binary_server
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
bool render_compositor_use_last_gpu
bool renderer_prefer_igpu
bool renderer_use_parallel_executors
std::atomic< int > g_saw_signal
size_t max_concurrent_render_sessions
static SysCatalog & instance()
singleton class to handle concurrency and state for blosc library. A C++ wrapper over a pure C librar...
tbb::task_arena g_tbb_arena
size_t num_reader_threads
std::shared_ptr< apache::thrift::TConfiguration > default_tconfig()
std::vector< LeafHostInfo > db_leaves
int startHeavyDBServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
bool g_enable_http_binary_server
bool enable_auto_clear_render_mem
std::shared_ptr< TThreadedServer > g_thrift_http_server
int render_oom_retry_threshold
std::string db_query_file
AuthMetadata authMetadata
static void resolveIncompleteAlterColumnCommandsForAllCatalogs()
static void start(std::atomic< bool > &is_program_running)
void validate_base_path()
std::vector< std::string > udf_compiler_options
bool g_enable_foreign_table_scheduled_refresh
bool ssl_transport_client_auth
void register_signal_handler(int signum, void(*handler)(int))
std::shared_ptr< DBHandler > g_warmup_handler
std::atomic< bool > g_running
File_Namespace::DiskCacheConfig disk_cache_config
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
bool g_enable_thrift_logs
void heavydb_signal_handler(int signum)
void register_signal_handlers()
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file) noexcept
bool enable_legacy_syntax
std::string master_address
std::shared_ptr< TThreadedServer > g_thrift_tcp_server
SystemParameters system_parameters
std::string ssl_cert_file