20 #ifdef HAVE_THRIFT_THREADFACTORY 21 #include <thrift/concurrency/ThreadFactory.h> 23 #include <thrift/concurrency/PlatformThreadFactory.h> 26 #include <thrift/concurrency/ThreadManager.h> 27 #include <thrift/protocol/TBinaryProtocol.h> 28 #include <thrift/protocol/TJSONProtocol.h> 29 #include <thrift/server/TThreadedServer.h> 30 #include <thrift/transport/TBufferTransports.h> 31 #include <thrift/transport/THttpServer.h> 32 #include <thrift/transport/TSSLServerSocket.h> 33 #include <thrift/transport/TSSLSocket.h> 34 #include <thrift/transport/TServerSocket.h> 43 #include <boost/algorithm/string.hpp> 44 #include <boost/algorithm/string/trim.hpp> 45 #include <boost/filesystem.hpp> 46 #include <boost/locale/generator.hpp> 47 #include <boost/make_shared.hpp> 48 #include <boost/program_options.hpp> 60 #include "MapDRelease.h" 68 using namespace ::apache::thrift::concurrency;
70 using namespace ::apache::thrift::server;
96 memset(&act, 0,
sizeof(act));
97 if (handler != SIG_DFL && handler != SIG_IGN) {
99 sigfillset(&act.sa_mask);
101 act.sa_handler = handler;
102 sigaction(signum, &act, NULL);
114 int expected_signal{-1};
115 if (!
g_saw_signal.compare_exchange_strong(expected_signal, signum)) {
130 if (signum == SIGQUIT || signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE) {
140 kill(getpid(), signum);
167 throw std::runtime_error(std::string(
"Thrift server exited: ") +
168 std::strerror(errno));
170 }
catch (std::exception& e) {
171 LOG(
ERROR) <<
"Exception: " << e.what() <<
": port " << port << std::endl;
183 std::string base_path,
184 std::string query_file_path) {
186 if (query_file_path.empty()) {
189 LOG(
INFO) <<
"Running DB warmup with queries from " << query_file_path;
193 std::string user_keyword, user_name, db_name;
194 std::ifstream query_file;
200 query_file.open(query_file_path);
201 while (std::getline(query_file, db_info)) {
202 if (db_info.length() == 0) {
205 std::istringstream iss(db_info);
206 iss >> user_keyword >> user_name >> db_name;
207 if (user_keyword.compare(0, 4,
"USER") == 0) {
216 std::string single_query;
217 while (std::getline(query_file, single_query)) {
218 boost::algorithm::trim(single_query);
219 if (single_query.length() == 0 || single_query[0] ==
'-') {
222 if (single_query[0] ==
'}') {
223 single_query.clear();
226 if (single_query.find(
';') == single_query.npos) {
227 std::string multiline_query;
228 std::getline(query_file, multiline_query,
';');
229 single_query += multiline_query;
233 g_warmup_handler->sql_execute(ret, sessionId, single_query,
true,
"", -1, -1);
235 LOG(
WARNING) <<
"Exception while executing '" << single_query
238 single_query.clear();
245 LOG(
WARNING) <<
"\nSyntax error in the file: " << query_file_path.c_str()
246 <<
" Missing expected keyword USER. Following line will be ignored: " 247 << db_info.c_str() << std::endl;
252 LOG(
WARNING) <<
"Exception while executing warmup queries. " 253 <<
"Warmup may not be fully completed. Will proceed nevertheless." 262 int result = pthread_sigmask(SIG_BLOCK, &
set, NULL);
264 throw std::runtime_error(
"heartbeat() thread startup failed");
268 VLOG(1) <<
"heartbeat thread starting";
270 using namespace std::chrono;
271 std::this_thread::sleep_for(1s);
273 VLOG(1) <<
"heartbeat thread exiting";
277 if (signum >= 1 && signum != SIGTERM) {
278 LOG(
INFO) <<
"Interrupt signal (" << signum <<
") received.";
282 if (signum == SIGQUIT || signum == SIGABRT || signum == SIGSEGV || signum == SIGFPE) {
322 const unsigned int wait_interval =
327 prog_config_opts.
base_path +
"/mapd_data");
331 apache::thrift::GlobalOutput.setOutputFunction([](
const char* msg) {});
337 boost::locale::generator generator;
338 std::locale::global(generator.generate(
""));
343 mapd::make_shared<DBHandler>(prog_config_opts.
db_leaves,
373 prog_config_opts.libgeos_so_filename,
376 }
catch (
const std::exception& e) {
377 LOG(
FATAL) <<
"Failed to initialize service handler: " << e.what();
384 mapd::shared_ptr<TServerSocket> serverSocket;
385 mapd::shared_ptr<TServerSocket> httpServerSocket;
388 mapd::shared_ptr<TSSLSocketFactory> sslSocketFactory;
390 mapd::shared_ptr<TSSLSocketFactory>(
new TSSLSocketFactory(SSLProtocol::SSLTLS));
391 sslSocketFactory->loadCertificate(
393 sslSocketFactory->loadPrivateKey(
396 sslSocketFactory->authenticate(
true);
398 sslSocketFactory->authenticate(
false);
400 sslSocketFactory->ciphers(
"ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
401 serverSocket = mapd::shared_ptr<TServerSocket>(
new TSSLServerSocket(
403 httpServerSocket = mapd::shared_ptr<TServerSocket>(
404 new TSSLServerSocket(prog_config_opts.
http_port, sslSocketFactory));
405 LOG(
INFO) <<
" OmniSci server using encrypted connection. Cert file [" 409 LOG(
INFO) <<
" OmniSci server using unencrypted connection";
410 serverSocket = mapd::shared_ptr<TServerSocket>(
413 mapd::shared_ptr<TServerSocket>(
new TServerSocket(prog_config_opts.
http_port));
422 mapd::shared_ptr<TProcessor> processor(
424 mapd::shared_ptr<TTransportFactory> bufTransportFactory(
425 new TBufferedTransportFactory());
426 mapd::shared_ptr<TProtocolFactory> bufProtocolFactory(
new TBinaryProtocolFactory());
428 mapd::shared_ptr<TServerTransport> bufServerTransport(serverSocket);
429 TThreadedServer bufServer(
430 processor, bufServerTransport, bufTransportFactory, bufProtocolFactory);
441 auto warmup_queries = [&prog_config_opts]() {
450 mapd::shared_ptr<TServerTransport> httpServerTransport(httpServerSocket);
451 mapd::shared_ptr<TTransportFactory> httpTransportFactory(
452 new THttpServerTransportFactory());
453 mapd::shared_ptr<TProtocolFactory> httpProtocolFactory(
new TJSONProtocolFactory());
454 TThreadedServer httpServer(
455 processor, httpServerTransport, httpTransportFactory, httpProtocolFactory);
456 if (start_http_server) {
461 std::thread httpThread(
473 LOG(
FATAL) <<
"No High Availability module available, please contact OmniSci support";
477 file_delete_thread.join();
478 heartbeat_thread.join();
486 if (signum <= 0 || signum == SIGTERM) {
493 int main(
int argc,
char** argv) {
494 bool has_clust_topo =
false;
499 if (
auto return_code =
504 if (!has_clust_topo) {
509 }
catch (std::runtime_error& e) {
510 std::cerr <<
"Can't start: " << e.what() << std::endl;
512 }
catch (boost::program_options::error& e) {
513 std::cerr <<
"Usage Error: " << e.what() << std::endl;
mapd_shared_mutex g_thrift_mutex
TThreadedServer * g_thrift_buf_server
int idle_session_duration
mapd::shared_ptr< DBHandler > g_warmup_handler
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
std::once_flag g_shutdown_once_flag
DiskCacheConfig disk_cache_config
std::string udf_file_name
shared utility for the db server and string dictionary server to remove old files ...
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
bool render_compositor_use_last_gpu
bool renderer_use_vulkan_driver
void run_warmup_queries(mapd::shared_ptr< DBHandler > handler, std::string base_path, std::string query_file_path)
TThreadedServer * g_thrift_http_server
int main(int argc, char **argv)
void releaseWarmupSession(TSessionId &sessionId, std::ifstream &query_file)
size_t max_concurrent_render_sessions
singleton class to handle concurrancy and state for blosc library. A C++ wrapper over a pure C librar...
size_t num_reader_threads
std::vector< LeafHostInfo > db_leaves
std::shared_timed_mutex mapd_shared_mutex
bool enable_auto_clear_render_mem
int render_oom_retry_threshold
mapd::shared_ptr< DBHandler > g_mapd_handler
void start_server(TThreadedServer &server, const int port)
std::string db_query_file
AuthMetadata authMetadata
std::atomic< int > g_saw_signal
static void start(std::atomic< bool > &is_program_running)
void validate_base_path()
void register_signal_handlers()
bool g_enable_experimental_string_functions
std::atomic< bool > g_running
std::vector< std::string > udf_compiler_options
bool ssl_transport_client_auth
void register_signal_handler(int signum, void(*handler)(int))
mapd_shared_lock< mapd_shared_mutex > read_lock
mapd_unique_lock< mapd_shared_mutex > write_lock
bool g_enable_thrift_logs
void file_delete(std::atomic< bool > &program_is_running, const unsigned int wait_interval_seconds, const std::string base_path)
void omnisci_signal_handler(int signum)
int startMapdServer(CommandLineOptions &prog_config_opts, bool start_http_server=true)
bool enable_legacy_syntax
SystemParameters system_parameters
std::string ssl_cert_file