OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Calcite Class Reference

#include <Calcite.h>

+ Collaboration diagram for Calcite:

Public Member Functions

 Calcite (const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const size_t service_timeout, const bool service_keepalive, const std::string &udf_filename="")
 
 Calcite (const SystemParameters &db_parameters, const std::string &data_dir, const std::string &udf_filename="")
 
 Calcite ()
 
TPlanResult process (query_state::QueryStateProxy, std::string sql_string, const TQueryParsingOption &query_parsing_option, const TOptimizationOption &optimization_option, const std::string &calcite_session_id="")
 
void checkAccessedObjectsPrivileges (query_state::QueryStateProxy query_state_prox, TPlanResult plan) const
 
std::vector< TCompletionHint > getCompletionHints (const Catalog_Namespace::SessionInfo &session_info, const std::vector< std::string > &visible_tables, const std::string sql_string, const int cursor)
 
std::string getExtensionFunctionWhitelist ()
 
std::string getUserDefinedFunctionWhitelist ()
 
virtual void updateMetadata (std::string catalog, std::string table)
 
void close_calcite_server (bool log=true)
 
virtual ~Calcite ()
 
std::string getRuntimeExtensionFunctionWhitelist ()
 
void setRuntimeExtensionFunctions (const std::vector< TUserDefinedFunction > &udfs, const std::vector< TUserDefinedTableFunction > &udtfs, bool isruntime=true)
 
TQueryParsingOption getCalciteQueryParsingOption (bool legacy_syntax, bool is_explain, bool check_privileges, bool is_explain_detail)
 
TOptimizationOption getCalciteOptimizationOption (bool is_view_optimize, bool enable_watchdog, const std::vector< TFilterPushDownInfo > &filter_push_down_info, bool distributed_mode)
 

Static Public Member Functions

static std::string const getInternalSessionProxyUserName ()
 
static std::string const getInternalSessionProxyPassword ()
 

Private Member Functions

void init (const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
 
void runServer (const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
 
TPlanResult processImpl (query_state::QueryStateProxy, std::string sql_string, const TQueryParsingOption &query_parsing_option, const TOptimizationOption &optimization_option, const std::string &calcite_session_id)
 
std::vector< std::string > get_db_objects (const std::string ra)
 
void inner_close_calcite_server (bool log)
 
std::pair< std::shared_ptr
< CalciteServerClient >
, std::shared_ptr< TTransport > > 
getClient (int port)
 
int ping (int retry_num=0, int max_retry=50)
 

Private Attributes

std::shared_ptr
< ThriftClientConnection
connMgr_
 
bool server_available_
 
size_t service_timeout_
 
bool service_keepalive_ = true
 
int remote_calcite_port_ = -1
 
std::string ssl_trust_store_
 
std::string ssl_trust_password_
 
std::string ssl_key_file_
 
std::string ssl_keystore_
 
std::string ssl_keystore_password_
 
std::string ssl_ca_file_
 
std::string db_config_file_
 
std::once_flag shutdown_once_flag_
 

Detailed Description

Definition at line 62 of file Calcite.h.

Constructor & Destructor Documentation

Calcite::Calcite ( const int  db_port,
const int  port,
const std::string &  data_dir,
const size_t  calcite_max_mem,
const size_t  service_timeout,
const bool  service_keepalive,
const std::string &  udf_filename = "" 
)

Definition at line 355 of file Calcite.cpp.

References init().

362  : server_available_(false)
363  , service_timeout_(service_timeout)
364  , service_keepalive_(service_keepalive) {
365  init(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
366 }
void init(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
Definition: Calcite.cpp:368
bool service_keepalive_
Definition: Calcite.h:137
size_t service_timeout_
Definition: Calcite.h:136
bool server_available_
Definition: Calcite.h:135

+ Here is the call graph for this function:

Calcite::Calcite ( const SystemParameters db_parameters,
const std::string &  data_dir,
const std::string &  udf_filename = "" 
)

Definition at line 390 of file Calcite.cpp.

References SystemParameters::calcite_max_mem, SystemParameters::calcite_port, init(), and SystemParameters::omnisci_server_port.

393  : service_timeout_(system_parameters.calcite_timeout)
394  , service_keepalive_(system_parameters.calcite_keepalive)
395  , ssl_trust_store_(system_parameters.ssl_trust_store)
396  , ssl_trust_password_(system_parameters.ssl_trust_password)
397  , ssl_key_file_(system_parameters.ssl_key_file)
398  , ssl_keystore_(system_parameters.ssl_keystore)
399  , ssl_keystore_password_(system_parameters.ssl_keystore_password)
400  , ssl_ca_file_(system_parameters.ssl_trust_ca_file)
401  , db_config_file_(system_parameters.config_file) {
402  init(system_parameters.omnisci_server_port,
403  system_parameters.calcite_port,
404  data_dir,
405  system_parameters.calcite_max_mem,
406  udf_filename);
407 }
std::string db_config_file_
Definition: Calcite.h:145
std::string ssl_key_file_
Definition: Calcite.h:141
void init(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
Definition: Calcite.cpp:368
std::string ssl_keystore_
Definition: Calcite.h:142
std::string ssl_keystore_password_
Definition: Calcite.h:143
std::string ssl_trust_store_
Definition: Calcite.h:139
bool service_keepalive_
Definition: Calcite.h:137
std::string ssl_ca_file_
Definition: Calcite.h:144
size_t service_timeout_
Definition: Calcite.h:136
std::string ssl_trust_password_
Definition: Calcite.h:140

+ Here is the call graph for this function:

Calcite::Calcite ( )
inline

Definition at line 74 of file Calcite.h.

74 {}
Calcite::~Calcite ( )
virtual

Definition at line 686 of file Calcite.cpp.

References close_calcite_server().

686  {
687  close_calcite_server(false);
688 }
void close_calcite_server(bool log=true)
Definition: Calcite.cpp:662

+ Here is the call graph for this function:

Member Function Documentation

void Calcite::checkAccessedObjectsPrivileges ( query_state::QueryStateProxy  query_state_prox,
TPlanResult  plan 
) const

Definition at line 496 of file Calcite.cpp.

References anonymous_namespace{Calcite.cpp}::checkPermissionForTables(), AccessPrivileges::DELETE_FROM_TABLE, query_state::QueryState::getConstSessionInfo(), AccessPrivileges::INSERT_INTO_TABLE, AccessPrivileges::SELECT_FROM_TABLE, AccessPrivileges::SELECT_FROM_VIEW, and AccessPrivileges::UPDATE_IN_TABLE.

Referenced by process().

498  {
499  AccessPrivileges NOOP;
500  // check the individual tables
501  auto const session_ptr = query_state_proxy->getConstSessionInfo();
502  // TODO: Replace resolved tables vector with a `FullyQualifiedTableName` struct.
503  checkPermissionForTables(*session_ptr,
504  plan.primary_accessed_objects.tables_selected_from,
507  checkPermissionForTables(*session_ptr,
508  plan.primary_accessed_objects.tables_inserted_into,
510  NOOP);
511  checkPermissionForTables(*session_ptr,
512  plan.primary_accessed_objects.tables_updated_in,
514  NOOP);
515  checkPermissionForTables(*session_ptr,
516  plan.primary_accessed_objects.tables_deleted_from,
518  NOOP);
519 }
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
static const AccessPrivileges SELECT_FROM_TABLE
Definition: DBObject.h:160
static const AccessPrivileges DELETE_FROM_TABLE
Definition: DBObject.h:163
void checkPermissionForTables(const Catalog_Namespace::SessionInfo &session_info, std::vector< std::vector< std::string >> tableOrViewNames, AccessPrivileges tablePrivs, AccessPrivileges viewPrivs)
Definition: Calcite.cpp:437
static const AccessPrivileges SELECT_FROM_VIEW
Definition: DBObject.h:180
static const AccessPrivileges UPDATE_IN_TABLE
Definition: DBObject.h:162

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Calcite::close_calcite_server ( bool  log = true)

Definition at line 662 of file Calcite.cpp.

References inner_close_calcite_server(), and shutdown_once_flag_.

Referenced by ~Calcite().

662  {
663  std::call_once(shutdown_once_flag_,
664  [this, log]() { this->inner_close_calcite_server(log); });
665 }
std::once_flag shutdown_once_flag_
Definition: Calcite.h:146
void inner_close_calcite_server(bool log)
Definition: Calcite.cpp:667

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

std::vector< std::string > Calcite::get_db_objects ( const std::string  ra)
private

Definition at line 537 of file Calcite.cpp.

References CHECK.

537  {
538  std::vector<std::string> v_db_obj;
539  Document document;
540  document.Parse(ra.c_str());
541  const Value& rels = document["rels"];
542  CHECK(rels.IsArray());
543  for (auto& v : rels.GetArray()) {
544  std::string relOp(v["relOp"].GetString());
545  if (!relOp.compare("EnumerableTableScan")) {
546  std::string x;
547  auto t = v["table"].GetArray();
548  x = t[1].GetString();
549  v_db_obj.push_back(x);
550  }
551  }
552 
553  return v_db_obj;
554 }
#define CHECK(condition)
Definition: Logger.h:291
TOptimizationOption Calcite::getCalciteOptimizationOption ( bool  is_view_optimize,
bool  enable_watchdog,
const std::vector< TFilterPushDownInfo > &  filter_push_down_info,
bool  distributed_mode 
)

Definition at line 734 of file Calcite.cpp.

738  {
739  TOptimizationOption optimization_option;
740  optimization_option.filter_push_down_info = filter_push_down_info;
741  optimization_option.is_view_optimize = is_view_optimize;
742  optimization_option.enable_watchdog = enable_watchdog;
743  optimization_option.distributed_mode = distributed_mode;
744  return optimization_option;
745 }
TQueryParsingOption Calcite::getCalciteQueryParsingOption ( bool  legacy_syntax,
bool  is_explain,
bool  check_privileges,
bool  is_explain_detail 
)

Definition at line 720 of file Calcite.cpp.

References CHECK_LE.

723  {
724  TQueryParsingOption query_parsing_info;
725  query_parsing_info.legacy_syntax = legacy_syntax;
726  query_parsing_info.is_explain = is_explain;
727  query_parsing_info.check_privileges = check_privileges;
728  query_parsing_info.is_explain_detail = is_explain_detail;
729  // `EXPLAIN CALCITE DETAIL` requires `is_explain` set to TRUE
730  CHECK_LE(is_explain_detail, is_explain);
731  return query_parsing_info;
732 }
#define CHECK_LE(x, y)
Definition: Logger.h:304
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > Calcite::getClient ( int  port)
private

Definition at line 252 of file Calcite.cpp.

Referenced by getCompletionHints(), getExtensionFunctionWhitelist(), getRuntimeExtensionFunctionWhitelist(), getUserDefinedFunctionWhitelist(), inner_close_calcite_server(), processImpl(), setRuntimeExtensionFunctions(), and updateMetadata().

252  {
253  const auto transport = connMgr_->open_buffered_client_transport("localhost",
254  port,
255  ssl_ca_file_,
256  true,
261  try {
262  transport->open();
263 
264  } catch (TException& tx) {
265  throw tx;
266  } catch (std::exception& ex) {
267  throw ex;
268  }
269  std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
270  std::shared_ptr<CalciteServerClient> client;
271  client.reset(new CalciteServerClient(protocol));
272  std::pair<std::shared_ptr<CalciteServerClient>, std::shared_ptr<TTransport>> ret;
273  return std::make_pair(client, transport);
274 }
std::shared_ptr< ThriftClientConnection > connMgr_
Definition: Calcite.h:134
bool service_keepalive_
Definition: Calcite.h:137
std::string ssl_ca_file_
Definition: Calcite.h:144
size_t service_timeout_
Definition: Calcite.h:136

+ Here is the caller graph for this function:

std::vector< TCompletionHint > Calcite::getCompletionHints ( const Catalog_Namespace::SessionInfo session_info,
const std::vector< std::string > &  visible_tables,
const std::string  sql_string,
const int  cursor 
)

Definition at line 521 of file Calcite.cpp.

References cat(), Catalog_Namespace::SessionInfo::get_currentUser(), Catalog_Namespace::SessionInfo::get_session_id(), Catalog_Namespace::SessionInfo::getCatalog(), getClient(), remote_calcite_port_, and Catalog_Namespace::UserMetadata::userName.

525  {
526  std::vector<TCompletionHint> hints;
527  auto& cat = session_info.getCatalog();
528  const auto user = session_info.get_currentUser().userName;
529  const auto session = session_info.get_session_id();
530  const auto catalog = cat.getCurrentDB().dbName;
531  auto client = getClient(remote_calcite_port_);
532  client.first->getCompletionHints(
533  hints, user, session, catalog, visible_tables, sql_string, cursor);
534  return hints;
535 }
std::string cat(Ts &&...args)
std::string get_session_id() const
Definition: SessionInfo.h:93
Catalog & getCatalog() const
Definition: SessionInfo.h:75
int remote_calcite_port_
Definition: Calcite.h:138
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88

+ Here is the call graph for this function:

std::string Calcite::getExtensionFunctionWhitelist ( )

Definition at line 626 of file Calcite.cpp.

References CHECK, logger::FATAL, getClient(), LOG, remote_calcite_port_, server_available_, and VLOG.

626  {
627  if (server_available_) {
628  TPlanResult ret;
629  std::string whitelist;
630 
631  auto clientP = getClient(remote_calcite_port_);
632  clientP.first->getExtensionFunctionWhitelist(whitelist);
633  clientP.second->close();
634  VLOG(1) << whitelist;
635  return whitelist;
636  } else {
637  LOG(FATAL) << "Not routing to Calcite, server is not up";
638  return "";
639  }
640  CHECK(false);
641  return "";
642 }
#define LOG(tag)
Definition: Logger.h:285
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
#define CHECK(condition)
Definition: Logger.h:291
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

static std::string const Calcite::getInternalSessionProxyPassword ( )
inlinestatic

Definition at line 98 of file Calcite.h.

References anonymous_namespace{Calcite.h}::kCalciteUserPassword.

98  {
99  return kCalciteUserPassword;
100  }
constexpr char const * kCalciteUserPassword
Definition: Calcite.h:40
static std::string const Calcite::getInternalSessionProxyUserName ( )
inlinestatic

Definition at line 97 of file Calcite.h.

References anonymous_namespace{Calcite.h}::kCalciteUserName.

Referenced by processImpl().

97 { return kCalciteUserName; }
constexpr char const * kCalciteUserName
Definition: Calcite.h:39

+ Here is the caller graph for this function:

std::string Calcite::getRuntimeExtensionFunctionWhitelist ( )

Definition at line 690 of file Calcite.cpp.

References logger::FATAL, getClient(), LOG, remote_calcite_port_, server_available_, UNREACHABLE, and VLOG.

690  {
691  if (server_available_) {
692  TPlanResult ret;
693  std::string whitelist;
694  auto clientP = getClient(remote_calcite_port_);
695  clientP.first->getRuntimeExtensionFunctionWhitelist(whitelist);
696  clientP.second->close();
697  VLOG(1) << "Runtime extension functions whitelist loaded from Calcite: " << whitelist;
698  return whitelist;
699  } else {
700  LOG(FATAL) << "Not routing to Calcite, server is not up";
701  return "";
702  }
703  UNREACHABLE();
704  return "";
705 }
#define LOG(tag)
Definition: Logger.h:285
#define UNREACHABLE()
Definition: Logger.h:338
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

std::string Calcite::getUserDefinedFunctionWhitelist ( )

Definition at line 644 of file Calcite.cpp.

References logger::FATAL, getClient(), LOG, remote_calcite_port_, server_available_, UNREACHABLE, and VLOG.

644  {
645  if (server_available_) {
646  TPlanResult ret;
647  std::string whitelist;
648 
649  auto clientP = getClient(remote_calcite_port_);
650  clientP.first->getUserDefinedFunctionWhitelist(whitelist);
651  clientP.second->close();
652  VLOG(1) << "User defined functions whitelist loaded from Calcite: " << whitelist;
653  return whitelist;
654  } else {
655  LOG(FATAL) << "Not routing to Calcite, server is not up";
656  return "";
657  }
658  UNREACHABLE();
659  return "";
660 }
#define LOG(tag)
Definition: Logger.h:285
#define UNREACHABLE()
Definition: Logger.h:338
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252
#define VLOG(n)
Definition: Logger.h:388

+ Here is the call graph for this function:

void Calcite::init ( const int  db_port,
const int  port,
const std::string &  data_dir,
const size_t  calcite_max_mem,
const std::string &  udf_filename 
)
private

Definition at line 368 of file Calcite.cpp.

References CHECK, connMgr_, logger::INFO, LOG, remote_calcite_port_, runServer(), and server_available_.

Referenced by Calcite().

372  {
373  LOG(INFO) << "Creating Calcite Handler, Calcite Port is " << calcite_port
374  << " base data dir is " << data_dir;
375  connMgr_ = std::make_shared<ThriftClientConnection>();
376  if (calcite_port < 0) {
377  CHECK(false) << "JNI mode no longer supported.";
378  }
379  if (calcite_port == 0) {
380  // dummy process for initheavy
381  remote_calcite_port_ = calcite_port;
382  server_available_ = false;
383  } else {
384  remote_calcite_port_ = calcite_port;
385  runServer(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
386  server_available_ = true;
387  }
388 }
#define LOG(tag)
Definition: Logger.h:285
std::shared_ptr< ThriftClientConnection > connMgr_
Definition: Calcite.h:134
void runServer(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
Definition: Calcite.cpp:276
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
#define CHECK(condition)
Definition: Logger.h:291

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Calcite::inner_close_calcite_server ( bool  log)
private

Definition at line 667 of file Calcite.cpp.

References getClient(), logger::INFO, LOG_IF, remote_calcite_port_, and server_available_.

Referenced by close_calcite_server().

667  {
668  if (server_available_) {
669  LOG_IF(INFO, log) << "Shutting down Calcite server";
670  try {
671  auto clientP = getClient(remote_calcite_port_);
672  clientP.first->shutdown();
673  clientP.second->close();
674  } catch (const std::exception& e) {
675  if (std::string(e.what()) != "connect() failed: Connection refused" &&
676  std::string(e.what()) != "socket open() error: Connection refused" &&
677  std::string(e.what()) != "No more data to read.") {
678  std::cerr << "Error shutting down Calcite server: " << e.what() << std::endl;
679  } // else Calcite already shut down
680  }
681  LOG_IF(INFO, log) << "shut down Calcite";
682  server_available_ = false;
683  }
684 }
#define LOG_IF(severity, condition)
Definition: Logger.h:384
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

int Calcite::ping ( int  retry_num = 0,
int  max_retry = 50 
)
private

Definition at line 338 of file Calcite.cpp.

References logger::ERROR, measure< TimeT >::execution(), and LOG.

338  {
339  try {
340  auto ms = measure<>::execution([&]() {
341  auto clientP = getClient(remote_calcite_port_);
342  clientP.first->ping();
343  clientP.second->close();
344  });
345  return ms;
346 
347  } catch (TException& tx) {
348  if (retry_num >= max_retry) {
349  LOG(ERROR) << "Problems connecting to Calcite. Thrift error - " << tx.what();
350  }
351  return -1;
352  }
353 }
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:285
int remote_calcite_port_
Definition: Calcite.h:138
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252

+ Here is the call graph for this function:

TPlanResult Calcite::process ( query_state::QueryStateProxy  query_state_proxy,
std::string  sql_string,
const TQueryParsingOption &  query_parsing_option,
const TOptimizationOption &  optimization_option,
const std::string &  calcite_session_id = "" 
)

Definition at line 480 of file Calcite.cpp.

References checkAccessedObjectsPrivileges(), processImpl(), and run_benchmark_import::result.

484  {
485  TPlanResult result = processImpl(query_state_proxy,
486  std::move(sql_string),
487  query_parsing_option,
488  optimization_option,
489  calcite_session_id);
490  if (query_parsing_option.check_privileges && !query_parsing_option.is_explain) {
491  checkAccessedObjectsPrivileges(query_state_proxy, result);
492  }
493  return result;
494 }
TPlanResult processImpl(query_state::QueryStateProxy, std::string sql_string, const TQueryParsingOption &query_parsing_option, const TOptimizationOption &optimization_option, const std::string &calcite_session_id)
Definition: Calcite.cpp:556
void checkAccessedObjectsPrivileges(query_state::QueryStateProxy query_state_prox, TPlanResult plan) const
Definition: Calcite.cpp:496

+ Here is the call graph for this function:

TPlanResult Calcite::processImpl ( query_state::QueryStateProxy  query_state_proxy,
std::string  sql_string,
const TQueryParsingOption &  query_parsing_option,
const TOptimizationOption &  optimization_option,
const std::string &  calcite_session_id 
)
private

Definition at line 556 of file Calcite.cpp.

References cat(), query_state::QueryStateProxy::createTimer(), measure< TimeT >::execution(), logger::EXECUTOR, logger::FATAL, getClient(), query_state::QueryState::getConstSessionInfo(), getInternalSessionProxyUserName(), hide_sensitive_data_from_query(), logger::INFO, logger::IR, LOG, logger::PTX, remote_calcite_port_, server_available_, and logger::WARNING.

Referenced by process().

560  {
561  query_state::Timer timer = query_state_proxy.createTimer(__func__);
562  const auto& user_session_info = query_state_proxy->getConstSessionInfo();
563  const auto& cat = user_session_info->getCatalog();
564  const std::string user = getInternalSessionProxyUserName();
565  const std::string catalog = cat.getCurrentDB().dbName;
566  LOG(INFO) << "User " << user << " catalog " << catalog << " sql '"
567  << hide_sensitive_data_from_query(sql_string) << "'";
568  LOG(IR) << "SQL query\n"
569  << hide_sensitive_data_from_query(sql_string) << "\nEnd of SQL query";
570  LOG(PTX) << "SQL query\n"
571  << hide_sensitive_data_from_query(sql_string) << "\nEnd of SQL query";
572  LOG(EXECUTOR) << "SQL query\n"
573  << hide_sensitive_data_from_query(sql_string) << "\nEnd of SQL query";
574 
575  std::vector<TRestriction> trestrictions;
576 
577  TPlanResult ret;
578  if (server_available_) {
579  try {
580  // calcite_session_id would be an empty string when accessed by internal resources
581  // that would not access `process` through handler instance, like for eg: Unit
582  // Tests. In these cases we would use the session_id from query state.
583  auto ms = measure<>::execution([&]() {
584  auto clientP = getClient(remote_calcite_port_);
585  clientP.first->process(ret,
586  user,
587  calcite_session_id.empty()
588  ? user_session_info->get_session_id()
589  : calcite_session_id,
590  catalog,
591  sql_string,
592  query_parsing_option,
593  optimization_option,
594  trestrictions);
595  clientP.second->close();
596  });
597 
598  // LOG(INFO) << ret.plan_result;
599  LOG(INFO) << "Time in Thrift "
600  << (ms > ret.execution_time_ms ? ms - ret.execution_time_ms : 0)
601  << " (ms), Time in Java Calcite server " << ret.execution_time_ms
602  << " (ms)";
603  } catch (InvalidParseRequest& e) {
604  throw std::invalid_argument(e.whyUp);
605  } catch (const TTransportException& ex) {
606  if (ex.getType() == TTransportException::TIMED_OUT) {
607  LOG(WARNING) << "Calcite request timed out: " << ex.what();
608  } else {
609  LOG(FATAL) << "Error occurred trying to communicate with Calcite server, the "
610  "error was: '"
611  << ex.what() << "', heavydb restart will be required";
612  }
613  throw;
614  } catch (const std::exception& ex) {
615  LOG(FATAL)
616  << "Error occurred trying to communicate with Calcite server, the error was: '"
617  << ex.what() << "', heavydb restart will be required";
618  return ret; // satisfy return-type warning
619  }
620  } else {
621  LOG(FATAL) << "Not routing to Calcite, server is not up";
622  }
623  return ret;
624 }
std::string hide_sensitive_data_from_query(std::string const &query_str)
std::string cat(Ts &&...args)
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:285
static std::string const getInternalSessionProxyUserName()
Definition: Calcite.h:97
Timer createTimer(char const *event_name)
Definition: QueryState.cpp:129
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:84

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Calcite::runServer ( const int  db_port,
const int  port,
const std::string &  data_dir,
const size_t  calcite_max_mem,
const std::string &  udf_filename 
)
private

Definition at line 276 of file Calcite.cpp.

References logger::ERROR, logger::FATAL, logger::INFO, LOG, and start_calcite_server_as_daemon().

Referenced by init().

280  {
281  LOG(INFO) << "Running Calcite server as a daemon";
282 
283  // ping server to see if for any reason there is an orphaned one
284  int ping_time = ping();
285  if (ping_time > -1) {
286  // we have an orphaned server shut it down
287  LOG(ERROR)
288  << "Appears to be orphaned Calcite server already running, shutting it down";
289  LOG(ERROR) << "Please check that you are not trying to run two servers on same port";
290  LOG(ERROR) << "Attempting to shutdown orphaned Calcite server";
291  try {
292  auto clientP = getClient(remote_calcite_port_);
293  clientP.first->shutdown();
294  clientP.second->close();
295  LOG(ERROR) << "orphaned Calcite server shutdown";
296 
297  } catch (TException& tx) {
298  LOG(ERROR) << "Failed to shutdown orphaned Calcite server, reason: " << tx.what();
299  }
300  }
301 
302  // start the calcite server as a seperate process
304  port,
305  data_dir,
306  calcite_max_mem,
313  udf_filename);
314 
315  // check for new server for 30 seconds max
316  std::this_thread::sleep_for(std::chrono::milliseconds(200));
317  int retry_max = 300;
318  for (int i = 2; i <= retry_max; i++) {
319  int ping_time = ping(i, retry_max);
320  if (ping_time > -1) {
321  LOG(INFO) << "Calcite server start took " << i * 100 << " ms ";
322  LOG(INFO) << "ping took " << ping_time << " ms ";
323  server_available_ = true;
324  return;
325  } else {
326  // wait 100 ms
327  std::this_thread::sleep_for(std::chrono::milliseconds(100));
328  }
329  }
330  server_available_ = false;
331  LOG(FATAL) << "Could not connect to Calcite remote server running on port [" << port
332  << "]";
333 }
std::string db_config_file_
Definition: Calcite.h:145
std::string ssl_key_file_
Definition: Calcite.h:141
#define LOG(tag)
Definition: Logger.h:285
std::string ssl_keystore_
Definition: Calcite.h:142
std::string ssl_keystore_password_
Definition: Calcite.h:143
std::string ssl_trust_store_
Definition: Calcite.h:139
static void start_calcite_server_as_daemon(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &ssl_trust_store, const std::string &ssl_trust_password_X, const std::string &ssl_keystore, const std::string &ssl_keystore_password_X, const std::string &ssl_key_file, const std::string &db_config_file, const std::string &udf_filename)
Definition: Calcite.cpp:71
int ping(int retry_num=0, int max_retry=50)
Definition: Calcite.cpp:338
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252
std::string ssl_trust_password_
Definition: Calcite.h:140

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void Calcite::setRuntimeExtensionFunctions ( const std::vector< TUserDefinedFunction > &  udfs,
const std::vector< TUserDefinedTableFunction > &  udtfs,
bool  isruntime = true 
)

Definition at line 707 of file Calcite.cpp.

References logger::FATAL, getClient(), LOG, remote_calcite_port_, and server_available_.

710  {
711  if (server_available_) {
712  auto clientP = getClient(remote_calcite_port_);
713  clientP.first->setRuntimeExtensionFunctions(udfs, udtfs, isruntime);
714  clientP.second->close();
715  } else {
716  LOG(FATAL) << "Not routing to Calcite, server is not up";
717  }
718 }
#define LOG(tag)
Definition: Logger.h:285
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252

+ Here is the call graph for this function:

void Calcite::updateMetadata ( std::string  catalog,
std::string  table 
)
virtual

Definition at line 409 of file Calcite.cpp.

References measure< TimeT >::execution(), getClient(), logger::INFO, LOG, remote_calcite_port_, and server_available_.

409  {
410  if (server_available_) {
411  auto ms = measure<>::execution([&]() {
412  auto clientP = getClient(remote_calcite_port_);
413  clientP.first->updateMetadata(catalog, table);
414  clientP.second->close();
415  });
416  LOG(INFO) << "Time to updateMetadata " << ms << " (ms)";
417  } else {
418  LOG(INFO) << "Not routing to Calcite, server is not up";
419  }
420 }
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
#define LOG(tag)
Definition: Logger.h:285
int remote_calcite_port_
Definition: Calcite.h:138
bool server_available_
Definition: Calcite.h:135
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252

+ Here is the call graph for this function:

Member Data Documentation

std::shared_ptr<ThriftClientConnection> Calcite::connMgr_
private

Definition at line 134 of file Calcite.h.

Referenced by init().

std::string Calcite::db_config_file_
private

Definition at line 145 of file Calcite.h.

bool Calcite::service_keepalive_ = true
private

Definition at line 137 of file Calcite.h.

size_t Calcite::service_timeout_
private

Definition at line 136 of file Calcite.h.

std::once_flag Calcite::shutdown_once_flag_
private

Definition at line 146 of file Calcite.h.

Referenced by close_calcite_server().

std::string Calcite::ssl_ca_file_
private

Definition at line 144 of file Calcite.h.

std::string Calcite::ssl_key_file_
private

Definition at line 141 of file Calcite.h.

std::string Calcite::ssl_keystore_
private

Definition at line 142 of file Calcite.h.

std::string Calcite::ssl_keystore_password_
private

Definition at line 143 of file Calcite.h.

std::string Calcite::ssl_trust_password_
private

Definition at line 140 of file Calcite.h.

std::string Calcite::ssl_trust_store_
private

Definition at line 139 of file Calcite.h.


The documentation for this class was generated from the following files: