OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Calcite.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "Calcite.h"
24 #include "Catalog/Catalog.h"
25 #include "Logger/Logger.h"
27 #include "Shared/SysDefinitions.h"
29 #include "Shared/ThriftClient.h"
30 #include "Shared/fixautotools.h"
31 #include "Shared/measure.h"
33 
34 #include <thrift/protocol/TBinaryProtocol.h>
35 #include <thrift/transport/TSocket.h>
36 #include <thrift/transport/TTransportUtils.h>
37 #include <type_traits>
38 
39 #ifdef _MSC_VER
40 #include <process.h>
41 #endif
42 
43 #include "gen-cpp/CalciteServer.h"
44 
45 #include "rapidjson/document.h"
46 
47 #include <utility>
48 
49 using namespace rapidjson;
50 using namespace apache::thrift;
51 using namespace apache::thrift::protocol;
52 using namespace apache::thrift::transport;
53 
namespace {
/**
 * @brief Replaces the current process image via execlp(), optionally injecting the
 *        JVM remote-debug options in front of the regular arguments.
 *
 * execlp() hands its first variadic argument to the new program as argv[0] (the
 * program name); real options only start at argv[1]. The wrapper therefore supplies
 * @p path as argv[0] itself, so that every option given by the caller - including
 * the first one - actually reaches the launched program.
 *
 * @param path          Program to execute; looked up through PATH by execlp().
 * @param x_debug       Debug option, forwarded only when ENABLE_JAVA_REMOTE_DEBUG is
 *                      defined.
 * @param remote_debug  Remote debug agent option, forwarded only when
 *                      ENABLE_JAVA_REMOTE_DEBUG is defined.
 * @param standard_args Remaining program arguments; the caller must terminate the
 *                      list with a null `char*`, as execlp() requires.
 * @return Only returns if execlp() failed; the value is execlp()'s result and errno
 *         describes the failure.
 */
template <typename XDEBUG_OPTION,
          typename REMOTE_DEBUG_OPTION,
          typename... REMAINING_ARGS>
int wrapped_execlp(char const* path,
                   XDEBUG_OPTION&& x_debug,
                   REMOTE_DEBUG_OPTION&& remote_debug,
                   REMAINING_ARGS&&... standard_args) {
#ifdef ENABLE_JAVA_REMOTE_DEBUG
  return execlp(path,
                path,  // argv[0]
                x_debug,
                remote_debug,
                std::forward<REMAINING_ARGS>(standard_args)...);
#else
  // The debug options are intentionally dropped in regular builds.
  static_cast<void>(x_debug);
  static_cast<void>(remote_debug);
  return execlp(path,
                path,  // argv[0]
                std::forward<REMAINING_ARGS>(standard_args)...);
#endif
}
}  // namespace
70 
71 static void start_calcite_server_as_daemon(const int db_port,
72  const int port,
73  const std::string& data_dir,
74  const size_t calcite_max_mem,
75  const std::string& ssl_trust_store,
76  const std::string& ssl_trust_password_X,
77  const std::string& ssl_keystore,
78  const std::string& ssl_keystore_password_X,
79  const std::string& ssl_key_file,
80  const std::string& db_config_file,
81  const std::string& udf_filename) {
82  auto root_abs_path = heavyai::get_root_abs_path();
83  std::string const xDebug = "-Xdebug";
84  std::string const remoteDebug =
85  "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005";
86  std::string xmxP = "-Xmx" + std::to_string(calcite_max_mem) + "m";
87  std::string jarP = "-jar";
88  std::string jarD =
89  root_abs_path + "/bin/calcite-1.0-SNAPSHOT-jar-with-dependencies.jar";
90  std::string extensionsP = "-e";
91  std::string extensionsD = root_abs_path + "/QueryEngine/";
92  std::string dataP = "-d";
93  std::string dataD = data_dir;
94  std::string localPortP = "-p";
95  std::string localPortD = std::to_string(port);
96  std::string dbPortP = "-m";
97  std::string dbPortD = std::to_string(db_port);
98  std::string TrustStoreP = "-T";
99  std::string TrustPasswdP = "-P";
100  std::string ConfigFileP = "-c";
101  std::string KeyStoreP = "-Y";
102  std::string KeyStorePasswdP = "-Z";
103  // FIXME: this path should be getting pulled from logger rather than hardcoded
104  std::string logDirectory =
105  "-DLOG_DIR=" + data_dir + "/" + shared::kDefaultLogDirName + "/";
106  std::string userDefinedFunctionsP = "";
107  std::string userDefinedFunctionsD = "";
108 
109  if (!udf_filename.empty()) {
110  userDefinedFunctionsP += "-u";
111  userDefinedFunctionsD += udf_filename;
112  }
113 
114  // If a config file hasn't been supplied then put the password in the params
115  // otherwise send an empty string and Calcite should get it from the config file.
116  std::string key_store_password = (db_config_file == "") ? ssl_keystore_password_X : "";
117  std::string trust_store_password = (db_config_file == "") ? ssl_trust_password_X : "";
118 #ifdef _MSC_VER
119  // TODO: enable UDF support
120  std::vector<std::string> args_vec;
121  args_vec.push_back("java");
122  args_vec.push_back(xDebug);
123  args_vec.push_back(remoteDebug);
124  args_vec.push_back(xmxP);
125  args_vec.push_back(logDirectory);
126  args_vec.push_back(jarP);
127  args_vec.push_back(jarD);
128  args_vec.push_back(extensionsP);
129  args_vec.push_back(extensionsD);
130  args_vec.push_back(dataP);
131  args_vec.push_back(dataD);
132  args_vec.push_back(localPortP);
133  args_vec.push_back(localPortD);
134  args_vec.push_back(dbPortP);
135  args_vec.push_back(dbPortD);
136  if (!ssl_trust_store.empty()) {
137  args_vec.push_back(TrustStoreP);
138  args_vec.push_back(ssl_trust_store);
139  }
140  if (!trust_store_password.empty()) {
141  args_vec.push_back(TrustPasswdP);
142  args_vec.push_back(trust_store_password);
143  }
144  if (!ssl_keystore.empty()) {
145  args_vec.push_back(KeyStoreP);
146  args_vec.push_back(ssl_keystore);
147  }
148  if (!key_store_password.empty()) {
149  args_vec.push_back(KeyStorePasswdP);
150  args_vec.push_back(key_store_password);
151  }
152  if (!db_config_file.empty()) {
153  args_vec.push_back(ConfigFileP);
154  args_vec.push_back(db_config_file);
155  }
156  std::string args{boost::algorithm::join(args_vec, " ")};
157  STARTUPINFO startup_info;
158  PROCESS_INFORMATION proc_info;
159  ZeroMemory(&startup_info, sizeof(startup_info));
160  startup_info.cb = sizeof(startup_info);
161  ZeroMemory(&proc_info, sizeof(proc_info));
162  LOG(INFO) << "Startup command: " << args;
163  std::wstring wargs = std::wstring(args.begin(), args.end());
164  const auto ret = CreateProcess(NULL,
165  (LPWSTR)wargs.c_str(),
166  NULL,
167  NULL,
168  false,
169  0,
170  NULL,
171  NULL,
172  &startup_info,
173  &proc_info);
174  if (ret == 0) {
175  LOG(FATAL) << "Failed to start Calcite server " << GetLastError();
176  }
177 #else
178  int pid = fork();
179  if (pid == 0) {
180  int i;
181 
182  if (udf_filename.empty()) {
183  i = wrapped_execlp("java",
184  xDebug.c_str(),
185  remoteDebug.c_str(),
186  xmxP.c_str(),
187  logDirectory.c_str(),
188  jarP.c_str(),
189  jarD.c_str(),
190  extensionsP.c_str(),
191  extensionsD.c_str(),
192  dataP.c_str(),
193  dataD.c_str(),
194  localPortP.c_str(),
195  localPortD.c_str(),
196  dbPortP.c_str(),
197  dbPortD.c_str(),
198  TrustStoreP.c_str(),
199  ssl_trust_store.c_str(),
200  TrustPasswdP.c_str(),
201  trust_store_password.c_str(),
202  KeyStoreP.c_str(),
203  ssl_keystore.c_str(),
204  KeyStorePasswdP.c_str(),
205  key_store_password.c_str(),
206  ConfigFileP.c_str(),
207  db_config_file.c_str(),
208  (char*)0);
209  } else {
210  i = wrapped_execlp("java",
211  xDebug.c_str(),
212  remoteDebug.c_str(),
213  xmxP.c_str(),
214  logDirectory.c_str(),
215  jarP.c_str(),
216  jarD.c_str(),
217  extensionsP.c_str(),
218  extensionsD.c_str(),
219  dataP.c_str(),
220  dataD.c_str(),
221  localPortP.c_str(),
222  localPortD.c_str(),
223  dbPortP.c_str(),
224  dbPortD.c_str(),
225  TrustStoreP.c_str(),
226  ssl_trust_store.c_str(),
227  TrustPasswdP.c_str(),
228  trust_store_password.c_str(),
229  KeyStoreP.c_str(),
230  ssl_keystore.c_str(),
231  KeyStorePasswdP.c_str(),
232  key_store_password.c_str(),
233  ConfigFileP.c_str(),
234  db_config_file.c_str(),
235  userDefinedFunctionsP.c_str(),
236  userDefinedFunctionsD.c_str(),
237  (char*)0);
238  }
239 
240  if (i) {
241  int errsv = errno;
242  LOG(FATAL) << "Failed to start Calcite server [errno=" << errsv
243  << "]: " << strerror(errsv);
244  } else {
245  LOG(INFO) << "Successfully started Calcite server";
246  }
247  }
248 #endif
249 }
250 
251 std::pair<std::shared_ptr<CalciteServerClient>, std::shared_ptr<TTransport>>
253  const auto transport = connMgr_->open_buffered_client_transport("localhost",
254  port,
255  ssl_ca_file_,
256  true,
257  service_keepalive_,
258  service_timeout_,
259  service_timeout_,
260  service_timeout_);
261  try {
262  transport->open();
263 
264  } catch (TException& tx) {
265  throw tx;
266  } catch (std::exception& ex) {
267  throw ex;
268  }
269  std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
270  std::shared_ptr<CalciteServerClient> client;
271  client.reset(new CalciteServerClient(protocol));
272  std::pair<std::shared_ptr<CalciteServerClient>, std::shared_ptr<TTransport>> ret;
273  return std::make_pair(client, transport);
274 }
275 
276 void Calcite::runServer(const int db_port,
277  const int port,
278  const std::string& data_dir,
279  const size_t calcite_max_mem,
280  const std::string& udf_filename) {
281  LOG(INFO) << "Running Calcite server as a daemon";
282 
283  // ping server to see if for any reason there is an orphaned one
284  int ping_time = ping();
285  if (ping_time > -1) {
286  // we have an orphaned server shut it down
287  LOG(ERROR)
288  << "Appears to be orphaned Calcite server already running, shutting it down";
289  LOG(ERROR) << "Please check that you are not trying to run two servers on same port";
290  LOG(ERROR) << "Attempting to shutdown orphaned Calcite server";
291  try {
292  auto clientP = getClient(remote_calcite_port_);
293  clientP.first->shutdown();
294  clientP.second->close();
295  LOG(ERROR) << "orphaned Calcite server shutdown";
296 
297  } catch (TException& tx) {
298  LOG(ERROR) << "Failed to shutdown orphaned Calcite server, reason: " << tx.what();
299  }
300  }
301 
302  // start the calcite server as a separate process
304  port,
305  data_dir,
306  calcite_max_mem,
307  ssl_trust_store_,
308  ssl_trust_password_,
309  ssl_keystore_,
310  ssl_keystore_password_,
311  ssl_key_file_,
312  db_config_file_,
313  udf_filename);
314 
315  // check for new server for 30 seconds max
316  std::this_thread::sleep_for(std::chrono::milliseconds(200));
317  int retry_max = 300;
318  for (int i = 2; i <= retry_max; i++) {
319  int ping_time = ping(i, retry_max);
320  if (ping_time > -1) {
321  LOG(INFO) << "Calcite server start took " << i * 100 << " ms ";
322  LOG(INFO) << "ping took " << ping_time << " ms ";
323  server_available_ = true;
324  return;
325  } else {
326  // wait 100 ms
327  std::this_thread::sleep_for(std::chrono::milliseconds(100));
328  }
329  }
330  server_available_ = false;
331  LOG(FATAL) << "Could not connect to Calcite remote server running on port [" << port
332  << "]";
333 }
334 
335 // ping existing server
336 // return -1 if no ping response
337 // params set to default values in header
338 int Calcite::ping(int retry_num, int max_retry) {
339  try {
340  auto ms = measure<>::execution([&]() {
341  auto clientP = getClient(remote_calcite_port_);
342  clientP.first->ping();
343  clientP.second->close();
344  });
345  return ms;
346 
347  } catch (TException& tx) {
348  if (retry_num >= max_retry) {
349  LOG(ERROR) << "Problems connecting to Calcite. Thrift error - " << tx.what();
350  }
351  return -1;
352  }
353 }
354 
355 Calcite::Calcite(const int db_port,
356  const int calcite_port,
357  const std::string& data_dir,
358  const size_t calcite_max_mem,
359  const size_t service_timeout,
360  const bool service_keepalive,
361  const std::string& udf_filename)
362  : server_available_(false)
363  , service_timeout_(service_timeout)
364  , service_keepalive_(service_keepalive) {
365  init(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
366 }
367 
368 void Calcite::init(const int db_port,
369  const int calcite_port,
370  const std::string& data_dir,
371  const size_t calcite_max_mem,
372  const std::string& udf_filename) {
373  LOG(INFO) << "Creating Calcite Handler, Calcite Port is " << calcite_port
374  << " base data dir is " << data_dir;
375  connMgr_ = std::make_shared<ThriftClientConnection>();
376  if (calcite_port < 0) {
377  CHECK(false) << "JNI mode no longer supported.";
378  }
379  if (calcite_port == 0) {
380  // dummy process for initheavy
381  remote_calcite_port_ = calcite_port;
382  server_available_ = false;
383  } else {
384  remote_calcite_port_ = calcite_port;
385  runServer(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
386  server_available_ = true;
387  }
388 }
389 
390 Calcite::Calcite(const SystemParameters& system_parameters,
391  const std::string& data_dir,
392  const std::string& udf_filename)
393  : service_timeout_(system_parameters.calcite_timeout)
394  , service_keepalive_(system_parameters.calcite_keepalive)
395  , ssl_trust_store_(system_parameters.ssl_trust_store)
396  , ssl_trust_password_(system_parameters.ssl_trust_password)
397  , ssl_key_file_(system_parameters.ssl_key_file)
398  , ssl_keystore_(system_parameters.ssl_keystore)
399  , ssl_keystore_password_(system_parameters.ssl_keystore_password)
400  , ssl_ca_file_(system_parameters.ssl_trust_ca_file)
401  , db_config_file_(system_parameters.config_file) {
402  init(system_parameters.omnisci_server_port,
403  system_parameters.calcite_port,
404  data_dir,
405  system_parameters.calcite_max_mem,
406  udf_filename);
407 }
408 
409 void Calcite::updateMetadata(std::string catalog, std::string table) {
410  if (server_available_) {
411  auto ms = measure<>::execution([&]() {
412  auto clientP = getClient(remote_calcite_port_);
413  clientP.first->updateMetadata(catalog, table);
414  clientP.second->close();
415  });
416  LOG(INFO) << "Time to updateMetadata " << ms << " (ms)";
417  } else {
418  LOG(INFO) << "Not routing to Calcite, server is not up";
419  }
420 }
421 
422 namespace {
424  const Catalog_Namespace::Catalog& accessed_catalog) {
425  const auto db_name = accessed_catalog.name();
426  DBObject db_object(db_name, DatabaseDBObjectType);
427  db_object.loadKey(accessed_catalog);
429 
430  const auto& user = session_info.get_currentUser();
431  if (!Catalog_Namespace::SysCatalog::instance().checkPrivileges(user, {db_object})) {
432  throw std::runtime_error("Unauthorized Access: user " + user.userLoggable() +
433  " is not allowed to access database " + db_name + ".");
434  }
435 }
436 
438  std::vector<std::vector<std::string>> tableOrViewNames,
439  AccessPrivileges tablePrivs,
440  AccessPrivileges viewPrivs) {
441  for (auto tableOrViewName : tableOrViewNames) {
442  // Calcite returns table names in the form of a {table_name, database_name} vector.
443  const auto catalog =
445  CHECK(catalog);
446  check_db_access(session_info, *catalog);
447 
448  const TableDescriptor* tableMeta =
449  catalog->getMetadataForTable(tableOrViewName[0], false);
450 
451  if (!tableMeta) {
452  throw std::runtime_error("unknown table of view: " + tableOrViewName[0]);
453  }
454 
455  DBObjectKey key;
456  key.dbId = catalog->getCurrentDB().dbId;
459  key.objectId = tableMeta->tableId;
460  AccessPrivileges privs = tableMeta->isView ? viewPrivs : tablePrivs;
461  DBObject dbobject(key, privs, tableMeta->userId);
462  std::vector<DBObject> privObjects{dbobject};
463 
464  if (!privs.hasAny()) {
465  throw std::runtime_error("Operation not supported for object " +
466  tableOrViewName[0]);
467  }
468 
469  if (!Catalog_Namespace::SysCatalog::instance().checkPrivileges(
470  session_info.get_currentUser(), privObjects)) {
471  throw std::runtime_error("Violation of access privileges: user " +
472  session_info.get_currentUser().userLoggable() +
473  " has no proper privileges for object " +
474  tableOrViewName[0]);
475  }
476  }
477 }
478 } // namespace
479 
480 TPlanResult Calcite::process(query_state::QueryStateProxy query_state_proxy,
481  std::string sql_string,
482  const TQueryParsingOption& query_parsing_option,
483  const TOptimizationOption& optimization_option,
484  const std::string& calcite_session_id) {
485  TPlanResult result = processImpl(query_state_proxy,
486  std::move(sql_string),
487  query_parsing_option,
488  optimization_option,
489  calcite_session_id);
490  if (query_parsing_option.check_privileges && !query_parsing_option.is_explain) {
491  checkAccessedObjectsPrivileges(query_state_proxy, result);
492  }
493  return result;
494 }
495 
497  query_state::QueryStateProxy query_state_proxy,
498  TPlanResult plan) const {
499  AccessPrivileges NOOP;
500  // check the individual tables
501  auto const session_ptr = query_state_proxy->getConstSessionInfo();
502  // TODO: Replace resolved tables vector with a `FullyQualifiedTableName` struct.
503  checkPermissionForTables(*session_ptr,
504  plan.primary_accessed_objects.tables_selected_from,
507  checkPermissionForTables(*session_ptr,
508  plan.primary_accessed_objects.tables_inserted_into,
510  NOOP);
511  checkPermissionForTables(*session_ptr,
512  plan.primary_accessed_objects.tables_updated_in,
514  NOOP);
515  checkPermissionForTables(*session_ptr,
516  plan.primary_accessed_objects.tables_deleted_from,
518  NOOP);
519 }
520 
521 std::vector<TCompletionHint> Calcite::getCompletionHints(
522  const Catalog_Namespace::SessionInfo& session_info,
523  const std::vector<std::string>& visible_tables,
524  const std::string sql_string,
525  const int cursor) {
526  std::vector<TCompletionHint> hints;
527  auto& cat = session_info.getCatalog();
528  const auto user = session_info.get_currentUser().userName;
529  const auto session = session_info.get_session_id();
530  const auto catalog = cat.getCurrentDB().dbName;
531  auto client = getClient(remote_calcite_port_);
532  client.first->getCompletionHints(
533  hints, user, session, catalog, visible_tables, sql_string, cursor);
534  return hints;
535 }
536 
537 std::vector<std::string> Calcite::get_db_objects(const std::string ra) {
538  std::vector<std::string> v_db_obj;
539  Document document;
540  document.Parse(ra.c_str());
541  const Value& rels = document["rels"];
542  CHECK(rels.IsArray());
543  for (auto& v : rels.GetArray()) {
544  std::string relOp(v["relOp"].GetString());
545  if (!relOp.compare("EnumerableTableScan")) {
546  std::string x;
547  auto t = v["table"].GetArray();
548  x = t[1].GetString();
549  v_db_obj.push_back(x);
550  }
551  }
552 
553  return v_db_obj;
554 }
555 
557  const std::string sql_string,
558  const TQueryParsingOption& query_parsing_option,
559  const TOptimizationOption& optimization_option,
560  const std::string& calcite_session_id) {
561  query_state::Timer timer = query_state_proxy.createTimer(__func__);
562  const auto& user_session_info = query_state_proxy->getConstSessionInfo();
563  const auto& cat = user_session_info->getCatalog();
564  const std::string user = getInternalSessionProxyUserName();
565  const std::string catalog = cat.getCurrentDB().dbName;
566  LOG(INFO) << "User " << user << " catalog " << catalog << " sql '"
567  << hide_sensitive_data_from_query(sql_string) << "'";
568  LOG(IR) << "SQL query\n"
569  << hide_sensitive_data_from_query(sql_string) << "\nEnd of SQL query";
570  LOG(PTX) << "SQL query\n"
571  << hide_sensitive_data_from_query(sql_string) << "\nEnd of SQL query";
572  LOG(EXECUTOR) << "SQL query\n"
573  << hide_sensitive_data_from_query(sql_string) << "\nEnd of SQL query";
574 
575  std::vector<TRestriction> trestrictions;
576 
577  TPlanResult ret;
578  if (server_available_) {
579  try {
580  // calcite_session_id would be an empty string when accessed by internal resources
581  // that would not access `process` through handler instance, like for eg: Unit
582  // Tests. In these cases we would use the session_id from query state.
583  auto ms = measure<>::execution([&]() {
584  auto clientP = getClient(remote_calcite_port_);
585  clientP.first->process(ret,
586  user,
587  calcite_session_id.empty()
588  ? user_session_info->get_session_id()
589  : calcite_session_id,
590  catalog,
591  sql_string,
592  query_parsing_option,
593  optimization_option,
594  trestrictions);
595  clientP.second->close();
596  });
597 
598  // LOG(INFO) << ret.plan_result;
599  LOG(INFO) << "Time in Thrift "
600  << (ms > ret.execution_time_ms ? ms - ret.execution_time_ms : 0)
601  << " (ms), Time in Java Calcite server " << ret.execution_time_ms
602  << " (ms)";
603  } catch (InvalidParseRequest& e) {
604  throw std::invalid_argument(e.whyUp);
605  } catch (const TTransportException& ex) {
606  if (ex.getType() == TTransportException::TIMED_OUT) {
607  LOG(WARNING) << "Calcite request timed out: " << ex.what();
608  } else {
609  LOG(FATAL) << "Error occurred trying to communicate with Calcite server, the "
610  "error was: '"
611  << ex.what() << "', heavydb restart will be required";
612  }
613  throw;
614  } catch (const std::exception& ex) {
615  LOG(FATAL)
616  << "Error occurred trying to communicate with Calcite server, the error was: '"
617  << ex.what() << "', heavydb restart will be required";
618  return ret; // satisfy return-type warning
619  }
620  } else {
621  LOG(FATAL) << "Not routing to Calcite, server is not up";
622  }
623  return ret;
624 }
625 
627  if (server_available_) {
628  TPlanResult ret;
629  std::string whitelist;
630 
631  auto clientP = getClient(remote_calcite_port_);
632  clientP.first->getExtensionFunctionWhitelist(whitelist);
633  clientP.second->close();
634  VLOG(1) << whitelist;
635  return whitelist;
636  } else {
637  LOG(FATAL) << "Not routing to Calcite, server is not up";
638  return "";
639  }
640  CHECK(false);
641  return "";
642 }
643 
645  if (server_available_) {
646  TPlanResult ret;
647  std::string whitelist;
648 
649  auto clientP = getClient(remote_calcite_port_);
650  clientP.first->getUserDefinedFunctionWhitelist(whitelist);
651  clientP.second->close();
652  VLOG(1) << "User defined functions whitelist loaded from Calcite: " << whitelist;
653  return whitelist;
654  } else {
655  LOG(FATAL) << "Not routing to Calcite, server is not up";
656  return "";
657  }
658  UNREACHABLE();
659  return "";
660 }
661 
663  std::call_once(shutdown_once_flag_,
664  [this, log]() { this->inner_close_calcite_server(log); });
665 }
666 
668  if (server_available_) {
669  LOG_IF(INFO, log) << "Shutting down Calcite server";
670  try {
671  auto clientP = getClient(remote_calcite_port_);
672  clientP.first->shutdown();
673  clientP.second->close();
674  } catch (const std::exception& e) {
675  if (std::string(e.what()) != "connect() failed: Connection refused" &&
676  std::string(e.what()) != "socket open() error: Connection refused" &&
677  std::string(e.what()) != "No more data to read.") {
678  std::cerr << "Error shutting down Calcite server: " << e.what() << std::endl;
679  } // else Calcite already shut down
680  }
681  LOG_IF(INFO, log) << "shut down Calcite";
682  server_available_ = false;
683  }
684 }
685 
687  close_calcite_server(false);
688 }
689 
691  if (server_available_) {
692  TPlanResult ret;
693  std::string whitelist;
694  auto clientP = getClient(remote_calcite_port_);
695  clientP.first->getRuntimeExtensionFunctionWhitelist(whitelist);
696  clientP.second->close();
697  VLOG(1) << "Runtime extension functions whitelist loaded from Calcite: " << whitelist;
698  return whitelist;
699  } else {
700  LOG(FATAL) << "Not routing to Calcite, server is not up";
701  return "";
702  }
703  UNREACHABLE();
704  return "";
705 }
706 
708  const std::vector<TUserDefinedFunction>& udfs,
709  const std::vector<TUserDefinedTableFunction>& udtfs,
710  bool isruntime) {
711  if (server_available_) {
712  auto clientP = getClient(remote_calcite_port_);
713  clientP.first->setRuntimeExtensionFunctions(udfs, udtfs, isruntime);
714  clientP.second->close();
715  } else {
716  LOG(FATAL) << "Not routing to Calcite, server is not up";
717  }
718 }
719 
720 TQueryParsingOption Calcite::getCalciteQueryParsingOption(bool legacy_syntax,
721  bool is_explain,
722  bool check_privileges,
723  bool is_explain_detail) {
724  TQueryParsingOption query_parsing_info;
725  query_parsing_info.legacy_syntax = legacy_syntax;
726  query_parsing_info.is_explain = is_explain;
727  query_parsing_info.check_privileges = check_privileges;
728  query_parsing_info.is_explain_detail = is_explain_detail;
729  // `EXPLAIN CALCITE DETAIL` requires `is_explain` set to TRUE
730  CHECK_LE(is_explain_detail, is_explain);
731  return query_parsing_info;
732 }
733 
735  bool is_view_optimize,
736  bool enable_watchdog,
737  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
738  bool distributed_mode) {
739  TOptimizationOption optimization_option;
740  optimization_option.filter_push_down_info = filter_push_down_info;
741  optimization_option.is_view_optimize = is_view_optimize;
742  optimization_option.enable_watchdog = enable_watchdog;
743  optimization_option.distributed_mode = distributed_mode;
744  return optimization_option;
745 }
std::vector< std::string > get_db_objects(const std::string ra)
Definition: Calcite.cpp:537
std::string hide_sensitive_data_from_query(std::string const &query_str)
int wrapped_execlp(char const *path, XDEBUG_OPTION &&x_debug, REMOTE_DEBUG_OPTION &&remote_debug, REMAINING_ARGS &&...standard_args)
Definition: Calcite.cpp:58
TPlanResult processImpl(query_state::QueryStateProxy, std::string sql_string, const TQueryParsingOption &query_parsing_option, const TOptimizationOption &optimization_option, const std::string &calcite_session_id)
Definition: Calcite.cpp:556
std::string cat(Ts &&...args)
std::once_flag shutdown_once_flag_
Definition: Calcite.h:146
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
std::string get_root_abs_path()
void init(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
Definition: Calcite.cpp:368
Calcite()
Definition: Calcite.h:74
#define LOG(tag)
Definition: Logger.h:285
static std::string const getInternalSessionProxyUserName()
Definition: Calcite.h:97
const std::string kDefaultLogDirName
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:161
std::string join(T const &container, std::string const &delim)
#define UNREACHABLE()
Definition: Logger.h:338
TQueryParsingOption getCalciteQueryParsingOption(bool legacy_syntax, bool is_explain, bool check_privileges, bool is_explain_detail)
Definition: Calcite.cpp:720
bool hasAny() const
Definition: DBObject.h:140
void setRuntimeExtensionFunctions(const std::vector< TUserDefinedFunction > &udfs, const std::vector< TUserDefinedTableFunction > &udtfs, bool isruntime=true)
Definition: Calcite.cpp:707
int32_t objectId
Definition: DBObject.h:55
std::shared_ptr< ThriftClientConnection > connMgr_
Definition: Calcite.h:134
void setPrivileges(const AccessPrivileges &privs)
Definition: DBObject.h:227
Timer createTimer(char const *event_name)
Definition: QueryState.cpp:129
TOptimizationOption getCalciteOptimizationOption(bool is_view_optimize, bool enable_watchdog, const std::vector< TFilterPushDownInfo > &filter_push_down_info, bool distributed_mode)
Definition: Calcite.cpp:734
static const AccessPrivileges SELECT_FROM_TABLE
Definition: DBObject.h:160
std::string to_string(char const *&&v)
#define LOG_IF(severity, condition)
Definition: Logger.h:384
virtual void updateMetadata(std::string catalog, std::string table)
Definition: Calcite.cpp:409
std::string name() const
Definition: Catalog.h:348
This file contains the class specification and related data structures for Catalog.
void runServer(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
Definition: Calcite.cpp:276
static SysCatalog & instance()
Definition: SysCatalog.h:343
static void start_calcite_server_as_daemon(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &ssl_trust_store, const std::string &ssl_trust_password_X, const std::string &ssl_keystore, const std::string &ssl_keystore_password_X, const std::string &ssl_key_file, const std::string &db_config_file, const std::string &udf_filename)
Definition: Calcite.cpp:71
std::vector< TCompletionHint > getCompletionHints(const Catalog_Namespace::SessionInfo &session_info, const std::vector< std::string > &visible_tables, const std::string sql_string, const int cursor)
Definition: Calcite.cpp:521
static const AccessPrivileges DELETE_FROM_TABLE
Definition: DBObject.h:163
int ping(int retry_num=0, int max_retry=50)
Definition: Calcite.cpp:338
std::string getUserDefinedFunctionWhitelist()
Definition: Calcite.cpp:644
std::string get_session_id() const
Definition: SessionInfo.h:93
void loadKey()
Definition: DBObject.cpp:190
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
void checkPermissionForTables(const Catalog_Namespace::SessionInfo &session_info, std::vector< std::vector< std::string >> tableOrViewNames, AccessPrivileges tablePrivs, AccessPrivileges viewPrivs)
Definition: Calcite.cpp:437
void check_db_access(const Catalog_Namespace::SessionInfo &session_info, const Catalog_Namespace::Catalog &accessed_catalog)
Definition: Calcite.cpp:423
#define CHECK_LE(x, y)
Definition: Logger.h:304
static const AccessPrivileges SELECT_FROM_VIEW
Definition: DBObject.h:180
Catalog & getCatalog() const
Definition: SessionInfo.h:75
int remote_calcite_port_
Definition: Calcite.h:138
int32_t dbId
Definition: DBObject.h:54
void inner_close_calcite_server(bool log)
Definition: Calcite.cpp:667
static const AccessPrivileges ACCESS
Definition: DBObject.h:153
bool server_available_
Definition: Calcite.h:135
bool g_enable_watchdog false
Definition: Execute.cpp:80
#define CHECK(condition)
Definition: Logger.h:291
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:252
int32_t permissionType
Definition: DBObject.h:53
TPlanResult process(query_state::QueryStateProxy, std::string sql_string, const TQueryParsingOption &query_parsing_option, const TOptimizationOption &optimization_option, const std::string &calcite_session_id="")
Definition: Calcite.cpp:480
static const AccessPrivileges UPDATE_IN_TABLE
Definition: DBObject.h:162
void checkAccessedObjectsPrivileges(query_state::QueryStateProxy query_state_prox, TPlanResult plan) const
Definition: Calcite.cpp:496
std::string userLoggable() const
Definition: SysCatalog.cpp:158
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:84
virtual ~Calcite()
Definition: Calcite.cpp:686
std::string getExtensionFunctionWhitelist()
Definition: Calcite.cpp:626
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:88
std::string getRuntimeExtensionFunctionWhitelist()
Definition: Calcite.cpp:690
#define VLOG(n)
Definition: Logger.h:388
void close_calcite_server(bool log=true)
Definition: Calcite.cpp:662