OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Calcite.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: Calcite.cpp
19  * Author: michael
20  *
21  * Created on November 23, 2015, 9:33 AM
22  */
23 
24 #include "Calcite.h"
25 #include "Catalog/Catalog.h"
26 #include "Logger/Logger.h"
29 #include "Shared/ThriftClient.h"
30 #include "Shared/fixautotools.h"
31 #include "Shared/measure.h"
33 
34 #include <thrift/protocol/TBinaryProtocol.h>
35 #include <thrift/transport/TSocket.h>
36 #include <thrift/transport/TTransportUtils.h>
37 #include <type_traits>
38 
39 #ifdef _MSC_VER
40 #include <process.h>
41 #endif
42 
43 #include "gen-cpp/CalciteServer.h"
44 
45 #include "rapidjson/document.h"
46 
47 #include <utility>
48 
49 using namespace rapidjson;
50 using namespace apache::thrift;
51 using namespace apache::thrift::protocol;
52 using namespace apache::thrift::transport;
53 
54 namespace {
55 template <typename XDEBUG_OPTION,
56  typename REMOTE_DEBUG_OPTION,
57  typename... REMAINING_ARGS>
58 int wrapped_execlp(char const* path,
59  XDEBUG_OPTION&& x_debug,
60  REMOTE_DEBUG_OPTION&& remote_debug,
61  REMAINING_ARGS&&... standard_args) {
62 #ifdef ENABLE_JAVA_REMOTE_DEBUG
63  return execlp(
64  path, x_debug, remote_debug, std::forward<REMAINING_ARGS>(standard_args)...);
65 #else
66  return execlp(path, std::forward<REMAINING_ARGS>(standard_args)...);
67 #endif
68 }
69 } // namespace
70 
71 static void start_calcite_server_as_daemon(const int db_port,
72  const int port,
73  const std::string& data_dir,
74  const size_t calcite_max_mem,
75  const std::string& ssl_trust_store,
76  const std::string& ssl_trust_password_X,
77  const std::string& ssl_keystore,
78  const std::string& ssl_keystore_password_X,
79  const std::string& ssl_key_file,
80  const std::string& db_config_file,
81  const std::string& udf_filename) {
82  auto root_abs_path = omnisci::get_root_abs_path();
83  std::string const xDebug = "-Xdebug";
84  std::string const remoteDebug =
85  "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005";
86  std::string xmxP = "-Xmx" + std::to_string(calcite_max_mem) + "m";
87  std::string jarP = "-jar";
88  std::string jarD =
89  root_abs_path + "/bin/calcite-1.0-SNAPSHOT-jar-with-dependencies.jar";
90  std::string extensionsP = "-e";
91  std::string extensionsD = root_abs_path + "/QueryEngine/";
92  std::string dataP = "-d";
93  std::string dataD = data_dir;
94  std::string localPortP = "-p";
95  std::string localPortD = std::to_string(port);
96  std::string dbPortP = "-m";
97  std::string dbPortD = std::to_string(db_port);
98  std::string TrustStoreP = "-T";
99  std::string TrustPasswdP = "-P";
100  std::string ConfigFileP = "-c";
101  std::string KeyStoreP = "-Y";
102  std::string KeyStorePasswdP = "-Z";
103  std::string logDirectory = "-DMAPD_LOG_DIR=" + data_dir;
104  std::string userDefinedFunctionsP = "";
105  std::string userDefinedFunctionsD = "";
106 
107  if (!udf_filename.empty()) {
108  userDefinedFunctionsP += "-u";
109  userDefinedFunctionsD += udf_filename;
110  }
111 
112  // If a config file hasn't been supplied then put the password in the params
113  // otherwise send an empty string and Calcite should get it from the config file.
114  std::string key_store_password = (db_config_file == "") ? ssl_keystore_password_X : "";
115  std::string trust_store_password = (db_config_file == "") ? ssl_trust_password_X : "";
116 #ifdef _MSC_VER
117  // TODO: enable UDF support
118  std::vector<std::string> args_vec;
119  args_vec.push_back("java");
120  args_vec.push_back(xDebug);
121  args_vec.push_back(remoteDebug);
122  args_vec.push_back(xmxP);
123  args_vec.push_back(logDirectory);
124  args_vec.push_back(jarP);
125  args_vec.push_back(jarD);
126  args_vec.push_back(extensionsP);
127  args_vec.push_back(extensionsD);
128  args_vec.push_back(dataP);
129  args_vec.push_back(dataD);
130  args_vec.push_back(localPortP);
131  args_vec.push_back(localPortD);
132  args_vec.push_back(dbPortP);
133  args_vec.push_back(dbPortD);
134  if (!ssl_trust_store.empty()) {
135  args_vec.push_back(TrustStoreP);
136  args_vec.push_back(ssl_trust_store);
137  }
138  if (!trust_store_password.empty()) {
139  args_vec.push_back(TrustPasswdP);
140  args_vec.push_back(trust_store_password);
141  }
142  if (!ssl_keystore.empty()) {
143  args_vec.push_back(KeyStoreP);
144  args_vec.push_back(ssl_keystore);
145  }
146  if (!key_store_password.empty()) {
147  args_vec.push_back(KeyStorePasswdP);
148  args_vec.push_back(key_store_password);
149  }
150  if (!db_config_file.empty()) {
151  args_vec.push_back(ConfigFileP);
152  args_vec.push_back(db_config_file);
153  }
154  std::string args{boost::algorithm::join(args_vec, " ")};
155  STARTUPINFO startup_info;
156  PROCESS_INFORMATION proc_info;
157  ZeroMemory(&startup_info, sizeof(startup_info));
158  startup_info.cb = sizeof(startup_info);
159  ZeroMemory(&proc_info, sizeof(proc_info));
160  LOG(INFO) << "Startup command: " << args;
161  std::wstring wargs = std::wstring(args.begin(), args.end());
162  const auto ret = CreateProcess(NULL,
163  (LPWSTR)wargs.c_str(),
164  NULL,
165  NULL,
166  false,
167  0,
168  NULL,
169  NULL,
170  &startup_info,
171  &proc_info);
172  if (ret == 0) {
173  LOG(FATAL) << "Failed to start Calcite server " << GetLastError();
174  }
175 #else
176  int pid = fork();
177  if (pid == 0) {
178  int i;
179 
180  if (udf_filename.empty()) {
181  i = wrapped_execlp("java",
182  xDebug.c_str(),
183  remoteDebug.c_str(),
184  xmxP.c_str(),
185  logDirectory.c_str(),
186  jarP.c_str(),
187  jarD.c_str(),
188  extensionsP.c_str(),
189  extensionsD.c_str(),
190  dataP.c_str(),
191  dataD.c_str(),
192  localPortP.c_str(),
193  localPortD.c_str(),
194  dbPortP.c_str(),
195  dbPortD.c_str(),
196  TrustStoreP.c_str(),
197  ssl_trust_store.c_str(),
198  TrustPasswdP.c_str(),
199  trust_store_password.c_str(),
200  KeyStoreP.c_str(),
201  ssl_keystore.c_str(),
202  KeyStorePasswdP.c_str(),
203  key_store_password.c_str(),
204  ConfigFileP.c_str(),
205  db_config_file.c_str(),
206  (char*)0);
207  } else {
208  i = wrapped_execlp("java",
209  xDebug.c_str(),
210  remoteDebug.c_str(),
211  xmxP.c_str(),
212  logDirectory.c_str(),
213  jarP.c_str(),
214  jarD.c_str(),
215  extensionsP.c_str(),
216  extensionsD.c_str(),
217  dataP.c_str(),
218  dataD.c_str(),
219  localPortP.c_str(),
220  localPortD.c_str(),
221  dbPortP.c_str(),
222  dbPortD.c_str(),
223  TrustStoreP.c_str(),
224  ssl_trust_store.c_str(),
225  TrustPasswdP.c_str(),
226  trust_store_password.c_str(),
227  KeyStoreP.c_str(),
228  ssl_keystore.c_str(),
229  KeyStorePasswdP.c_str(),
230  key_store_password.c_str(),
231  ConfigFileP.c_str(),
232  db_config_file.c_str(),
233  userDefinedFunctionsP.c_str(),
234  userDefinedFunctionsD.c_str(),
235  (char*)0);
236  }
237 
238  if (i) {
239  int errsv = errno;
240  LOG(FATAL) << "Failed to start Calcite server [errno=" << errsv
241  << "]: " << strerror(errsv);
242  } else {
243  LOG(INFO) << "Successfully started Calcite server";
244  }
245  }
246 #endif
247 }
248 
249 std::pair<std::shared_ptr<CalciteServerClient>, std::shared_ptr<TTransport>>
251  const auto transport = connMgr_->open_buffered_client_transport("localhost",
252  port,
253  ssl_ca_file_,
254  true,
255  service_keepalive_,
256  service_timeout_,
257  service_timeout_,
258  service_timeout_);
259  try {
260  transport->open();
261 
262  } catch (TException& tx) {
263  throw tx;
264  } catch (std::exception& ex) {
265  throw ex;
266  }
267  std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
268  std::shared_ptr<CalciteServerClient> client;
269  client.reset(new CalciteServerClient(protocol));
270  std::pair<std::shared_ptr<CalciteServerClient>, std::shared_ptr<TTransport>> ret;
271  return std::make_pair(client, transport);
272 }
273 
274 void Calcite::runServer(const int db_port,
275  const int port,
276  const std::string& data_dir,
277  const size_t calcite_max_mem,
278  const std::string& udf_filename) {
279  LOG(INFO) << "Running Calcite server as a daemon";
280 
281  // ping server to see if for any reason there is an orphaned one
282  int ping_time = ping();
283  if (ping_time > -1) {
284  // we have an orphaned server shut it down
285  LOG(ERROR)
286  << "Appears to be orphaned Calcite server already running, shutting it down";
287  LOG(ERROR) << "Please check that you are not trying to run two servers on same port";
288  LOG(ERROR) << "Attempting to shutdown orphaned Calcite server";
289  try {
290  auto clientP = getClient(remote_calcite_port_);
291  clientP.first->shutdown();
292  clientP.second->close();
293  LOG(ERROR) << "orphaned Calcite server shutdown";
294 
295  } catch (TException& tx) {
296  LOG(ERROR) << "Failed to shutdown orphaned Calcite server, reason: " << tx.what();
297  }
298  }
299 
300  // start the calcite server as a seperate process
302  port,
303  data_dir,
304  calcite_max_mem,
305  ssl_trust_store_,
306  ssl_trust_password_,
307  ssl_keystore_,
308  ssl_keystore_password_,
309  ssl_key_file_,
310  db_config_file_,
311  udf_filename);
312 
313  // check for new server for 30 seconds max
314  std::this_thread::sleep_for(std::chrono::milliseconds(200));
315  int retry_max = 300;
316  for (int i = 2; i <= retry_max; i++) {
317  int ping_time = ping(i, retry_max);
318  if (ping_time > -1) {
319  LOG(INFO) << "Calcite server start took " << i * 100 << " ms ";
320  LOG(INFO) << "ping took " << ping_time << " ms ";
321  server_available_ = true;
322  return;
323  } else {
324  // wait 100 ms
325  std::this_thread::sleep_for(std::chrono::milliseconds(100));
326  }
327  }
328  server_available_ = false;
329  LOG(FATAL) << "Could not connect to Calcite remote server running on port [" << port
330  << "]";
331 }
332 
333 // ping existing server
334 // return -1 if no ping response
335 // params set to default values in header
336 int Calcite::ping(int retry_num, int max_retry) {
337  try {
338  auto ms = measure<>::execution([&]() {
339  auto clientP = getClient(remote_calcite_port_);
340  clientP.first->ping();
341  clientP.second->close();
342  });
343  return ms;
344 
345  } catch (TException& tx) {
346  if (retry_num >= max_retry) {
347  LOG(ERROR) << "Problems connecting to Calcite. Thrift error - " << tx.what();
348  }
349  return -1;
350  }
351 }
352 
353 Calcite::Calcite(const int db_port,
354  const int calcite_port,
355  const std::string& data_dir,
356  const size_t calcite_max_mem,
357  const size_t service_timeout,
358  const bool service_keepalive,
359  const std::string& udf_filename)
360  : server_available_(false)
361  , service_timeout_(service_timeout)
362  , service_keepalive_(service_keepalive) {
363  init(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
364 }
365 
366 void Calcite::init(const int db_port,
367  const int calcite_port,
368  const std::string& data_dir,
369  const size_t calcite_max_mem,
370  const std::string& udf_filename) {
371  LOG(INFO) << "Creating Calcite Handler, Calcite Port is " << calcite_port
372  << " base data dir is " << data_dir;
373  connMgr_ = std::make_shared<ThriftClientConnection>();
374  if (calcite_port < 0) {
375  CHECK(false) << "JNI mode no longer supported.";
376  }
377  if (calcite_port == 0) {
378  // dummy process for initdb
379  remote_calcite_port_ = calcite_port;
380  server_available_ = false;
381  } else {
382  remote_calcite_port_ = calcite_port;
383  runServer(db_port, calcite_port, data_dir, calcite_max_mem, udf_filename);
384  server_available_ = true;
385  }
386 }
387 
388 Calcite::Calcite(const SystemParameters& system_parameters,
389  const std::string& data_dir,
390  const std::string& udf_filename)
391  : service_timeout_(system_parameters.calcite_timeout)
392  , service_keepalive_(system_parameters.calcite_keepalive)
393  , ssl_trust_store_(system_parameters.ssl_trust_store)
394  , ssl_trust_password_(system_parameters.ssl_trust_password)
395  , ssl_key_file_(system_parameters.ssl_key_file)
396  , ssl_keystore_(system_parameters.ssl_keystore)
397  , ssl_keystore_password_(system_parameters.ssl_keystore_password)
398  , ssl_ca_file_(system_parameters.ssl_trust_ca_file)
399  , db_config_file_(system_parameters.config_file) {
400  init(system_parameters.omnisci_server_port,
401  system_parameters.calcite_port,
402  data_dir,
403  system_parameters.calcite_max_mem,
404  udf_filename);
405 }
406 
407 void Calcite::updateMetadata(std::string catalog, std::string table) {
408  if (server_available_) {
409  auto ms = measure<>::execution([&]() {
410  auto clientP = getClient(remote_calcite_port_);
411  clientP.first->updateMetadata(catalog, table);
412  clientP.second->close();
413  });
414  LOG(INFO) << "Time to updateMetadata " << ms << " (ms)";
415  } else {
416  LOG(INFO) << "Not routing to Calcite, server is not up";
417  }
418 }
419 
421  std::vector<std::vector<std::string>> tableOrViewNames,
422  AccessPrivileges tablePrivs,
423  AccessPrivileges viewPrivs) {
424  // TODO MAT this needs to be able to check privileges from other catalogs
425  Catalog_Namespace::Catalog& catalog = session_info.getCatalog();
426 
427  for (auto tableOrViewName : tableOrViewNames) {
428  const TableDescriptor* tableMeta =
429  catalog.getMetadataForTable(tableOrViewName[0], false);
430 
431  if (!tableMeta) {
432  throw std::runtime_error("unknown table of view: " + tableOrViewName[0]);
433  }
434 
435  DBObjectKey key;
436  key.dbId = catalog.getCurrentDB().dbId;
439  key.objectId = tableMeta->tableId;
440  AccessPrivileges privs = tableMeta->isView ? viewPrivs : tablePrivs;
441  DBObject dbobject(key, privs, tableMeta->userId);
442  std::vector<DBObject> privObjects{dbobject};
443 
444  if (!privs.hasAny()) {
445  throw std::runtime_error("Operation not supported for object " +
446  tableOrViewName[0]);
447  }
448 
449  if (!Catalog_Namespace::SysCatalog::instance().checkPrivileges(
450  session_info.get_currentUser(), privObjects)) {
451  throw std::runtime_error("Violation of access privileges: user " +
452  session_info.get_currentUser().userLoggable() +
453  " has no proper privileges for object " +
454  tableOrViewName[0]);
455  }
456  }
457 }
458 
459 TPlanResult Calcite::process(
460  query_state::QueryStateProxy query_state_proxy,
461  std::string sql_string,
462  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
463  const bool legacy_syntax,
464  const bool is_explain,
465  const bool is_view_optimize,
466  const bool check_privileges,
467  const std::string& calcite_session_id) {
468  TPlanResult result = processImpl(query_state_proxy,
469  std::move(sql_string),
470  filter_push_down_info,
471  legacy_syntax,
472  is_explain,
473  is_view_optimize,
474  calcite_session_id);
475  if (check_privileges && !is_explain) {
476  checkAccessedObjectsPrivileges(query_state_proxy, result);
477  }
478  return result;
479 }
480 
482  query_state::QueryStateProxy query_state_proxy,
483  TPlanResult plan) const {
484  AccessPrivileges NOOP;
485  // check the individual tables
486  auto const session_ptr = query_state_proxy.getQueryState().getConstSessionInfo();
487  checkPermissionForTables(*session_ptr,
488  plan.primary_accessed_objects.tables_selected_from,
491  checkPermissionForTables(*session_ptr,
492  plan.primary_accessed_objects.tables_inserted_into,
494  NOOP);
495  checkPermissionForTables(*session_ptr,
496  plan.primary_accessed_objects.tables_updated_in,
498  NOOP);
499  checkPermissionForTables(*session_ptr,
500  plan.primary_accessed_objects.tables_deleted_from,
502  NOOP);
503 }
504 
505 std::vector<TCompletionHint> Calcite::getCompletionHints(
506  const Catalog_Namespace::SessionInfo& session_info,
507  const std::vector<std::string>& visible_tables,
508  const std::string sql_string,
509  const int cursor) {
510  std::vector<TCompletionHint> hints;
511  auto& cat = session_info.getCatalog();
512  const auto user = session_info.get_currentUser().userName;
513  const auto session = session_info.get_session_id();
514  const auto catalog = cat.getCurrentDB().dbName;
515  auto client = getClient(remote_calcite_port_);
516  client.first->getCompletionHints(
517  hints, user, session, catalog, visible_tables, sql_string, cursor);
518  return hints;
519 }
520 
521 std::vector<std::string> Calcite::get_db_objects(const std::string ra) {
522  std::vector<std::string> v_db_obj;
523  Document document;
524  document.Parse(ra.c_str());
525  const Value& rels = document["rels"];
526  CHECK(rels.IsArray());
527  for (auto& v : rels.GetArray()) {
528  std::string relOp(v["relOp"].GetString());
529  if (!relOp.compare("EnumerableTableScan")) {
530  std::string x;
531  auto t = v["table"].GetArray();
532  x = t[1].GetString();
533  v_db_obj.push_back(x);
534  }
535  }
536 
537  return v_db_obj;
538 }
539 
541  query_state::QueryStateProxy query_state_proxy,
542  const std::string sql_string,
543  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
544  const bool legacy_syntax,
545  const bool is_explain,
546  const bool is_view_optimize,
547  const std::string& calcite_session_id) {
548  query_state::Timer timer = query_state_proxy.createTimer(__func__);
549  const auto& user_session_info = query_state_proxy.getQueryState().getConstSessionInfo();
550  const auto& cat = user_session_info->getCatalog();
551  const std::string user = getInternalSessionProxyUserName();
552  const std::string catalog = cat.getCurrentDB().dbName;
553  LOG(INFO) << "User " << user << " catalog " << catalog << " sql '" << sql_string << "'";
554  LOG(IR) << "SQL query\n" << sql_string << "\nEnd of SQL query";
555  LOG(PTX) << "SQL query\n" << sql_string << "\nEnd of SQL query";
556 
557  TRestriction restriction;
558  auto rest = user_session_info->get_restriction_ptr();
559  if (rest != nullptr && !rest->column.empty()) {
560  VLOG(1) << "This users session has a restriction : " << *rest;
561  restriction.column = rest->column;
562  restriction.values = rest->values;
563  }
564  TPlanResult ret;
565  if (server_available_) {
566  try {
567  // calcite_session_id would be an empty string when accessed by internal resources
568  // that would not access `process` through handler instance, like for eg: Unit
569  // Tests. In these cases we would use the session_id from query state.
570  auto ms = measure<>::execution([&]() {
571  auto clientP = getClient(remote_calcite_port_);
572  clientP.first->process(ret,
573  user,
574  calcite_session_id.empty()
575  ? user_session_info->get_session_id()
576  : calcite_session_id,
577  catalog,
578  sql_string,
579  filter_push_down_info,
580  legacy_syntax,
581  is_explain,
582  is_view_optimize,
583  restriction);
584  clientP.second->close();
585  });
586 
587  // LOG(INFO) << ret.plan_result;
588  LOG(INFO) << "Time in Thrift "
589  << (ms > ret.execution_time_ms ? ms - ret.execution_time_ms : 0)
590  << " (ms), Time in Java Calcite server " << ret.execution_time_ms
591  << " (ms)";
592  } catch (InvalidParseRequest& e) {
593  throw std::invalid_argument(e.whyUp);
594  } catch (const std::exception& ex) {
595  LOG(FATAL)
596  << "Error occurred trying to communicate with Calcite server, the error was: '"
597  << ex.what() << "', omnisci_server restart will be required";
598  return ret; // satisfy return-type warning
599  }
600  } else {
601  LOG(FATAL) << "Not routing to Calcite, server is not up";
602  }
603  return ret;
604 }
605 
607  if (server_available_) {
608  TPlanResult ret;
609  std::string whitelist;
610 
611  auto clientP = getClient(remote_calcite_port_);
612  clientP.first->getExtensionFunctionWhitelist(whitelist);
613  clientP.second->close();
614  VLOG(1) << whitelist;
615  return whitelist;
616  } else {
617  LOG(FATAL) << "Not routing to Calcite, server is not up";
618  return "";
619  }
620  CHECK(false);
621  return "";
622 }
623 
625  if (server_available_) {
626  TPlanResult ret;
627  std::string whitelist;
628 
629  auto clientP = getClient(remote_calcite_port_);
630  clientP.first->getUserDefinedFunctionWhitelist(whitelist);
631  clientP.second->close();
632  VLOG(1) << "User defined functions whitelist loaded from Calcite: " << whitelist;
633  return whitelist;
634  } else {
635  LOG(FATAL) << "Not routing to Calcite, server is not up";
636  return "";
637  }
638  UNREACHABLE();
639  return "";
640 }
641 
643  std::call_once(shutdown_once_flag_,
644  [this, log]() { this->inner_close_calcite_server(log); });
645 }
646 
648  if (server_available_) {
649  LOG_IF(INFO, log) << "Shutting down Calcite server";
650  try {
651  auto clientP = getClient(remote_calcite_port_);
652  clientP.first->shutdown();
653  clientP.second->close();
654  } catch (const std::exception& e) {
655  if (std::string(e.what()) != "connect() failed: Connection refused" &&
656  std::string(e.what()) != "socket open() error: Connection refused" &&
657  std::string(e.what()) != "No more data to read.") {
658  std::cerr << "Error shutting down Calcite server: " << e.what() << std::endl;
659  } // else Calcite already shut down
660  }
661  LOG_IF(INFO, log) << "shut down Calcite";
662  server_available_ = false;
663  }
664 }
665 
667  close_calcite_server(false);
668 }
669 
671  if (server_available_) {
672  TPlanResult ret;
673  std::string whitelist;
674  auto clientP = getClient(remote_calcite_port_);
675  clientP.first->getRuntimeExtensionFunctionWhitelist(whitelist);
676  clientP.second->close();
677  VLOG(1) << "Runtime extension functions whitelist loaded from Calcite: " << whitelist;
678  return whitelist;
679  } else {
680  LOG(FATAL) << "Not routing to Calcite, server is not up";
681  return "";
682  }
683  UNREACHABLE();
684  return "";
685 }
686 
688  const std::vector<TUserDefinedFunction>& udfs,
689  const std::vector<TUserDefinedTableFunction>& udtfs,
690  bool isruntime) {
691  if (server_available_) {
692  auto clientP = getClient(remote_calcite_port_);
693  clientP.first->setRuntimeExtensionFunctions(udfs, udtfs, isruntime);
694  clientP.second->close();
695  } else {
696  LOG(FATAL) << "Not routing to Calcite, server is not up";
697  }
698 }
std::vector< std::string > get_db_objects(const std::string ra)
Definition: Calcite.cpp:521
int wrapped_execlp(char const *path, XDEBUG_OPTION &&x_debug, REMOTE_DEBUG_OPTION &&remote_debug, REMAINING_ARGS &&...standard_args)
Definition: Calcite.cpp:58
std::string cat(Ts &&...args)
std::once_flag shutdown_once_flag_
Definition: Calcite.h:137
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:111
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
void init(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
Definition: Calcite.cpp:366
#define LOG(tag)
Definition: Logger.h:203
static const AccessPrivileges INSERT_INTO_TABLE
Definition: DBObject.h:163
std::string join(T const &container, std::string const &delim)
TPlanResult processImpl(query_state::QueryStateProxy, std::string sql_string, const std::vector< TFilterPushDownInfo > &filter_push_down_info, const bool legacy_syntax, const bool is_explain, const bool is_view_optimize, const std::string &calcite_session_id)
Definition: Calcite.cpp:540
#define UNREACHABLE()
Definition: Logger.h:253
bool hasAny() const
Definition: DBObject.h:142
void setRuntimeExtensionFunctions(const std::vector< TUserDefinedFunction > &udfs, const std::vector< TUserDefinedTableFunction > &udtfs, bool isruntime=true)
Definition: Calcite.cpp:687
int32_t objectId
Definition: DBObject.h:57
std::shared_ptr< ThriftClientConnection > connMgr_
Definition: Calcite.h:125
Timer createTimer(char const *event_name)
Definition: QueryState.cpp:131
static const AccessPrivileges SELECT_FROM_TABLE
Definition: DBObject.h:162
std::string to_string(char const *&&v)
#define LOG_IF(severity, condition)
Definition: Logger.h:299
void updateMetadata(std::string catalog, std::string table)
Definition: Calcite.cpp:407
Calcite(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const size_t service_timeout, const bool service_keepalive, const std::string &udf_filename="")
Definition: Calcite.cpp:353
This file contains the class specification and related data structures for Catalog.
void runServer(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &udf_filename)
Definition: Calcite.cpp:274
void checkPermissionForTables(const Catalog_Namespace::SessionInfo &session_info, std::vector< std::vector< std::string >> tableOrViewNames, AccessPrivileges tablePrivs, AccessPrivileges viewPrivs)
Definition: Calcite.cpp:420
static SysCatalog & instance()
Definition: SysCatalog.h:325
static void start_calcite_server_as_daemon(const int db_port, const int port, const std::string &data_dir, const size_t calcite_max_mem, const std::string &ssl_trust_store, const std::string &ssl_trust_password_X, const std::string &ssl_keystore, const std::string &ssl_keystore_password_X, const std::string &ssl_key_file, const std::string &db_config_file, const std::string &udf_filename)
Definition: Calcite.cpp:71
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:221
std::vector< TCompletionHint > getCompletionHints(const Catalog_Namespace::SessionInfo &session_info, const std::vector< std::string > &visible_tables, const std::string sql_string, const int cursor)
Definition: Calcite.cpp:505
QueryState & getQueryState()
Definition: QueryState.h:181
static const AccessPrivileges DELETE_FROM_TABLE
Definition: DBObject.h:165
int ping(int retry_num=0, int max_retry=50)
Definition: Calcite.cpp:336
std::string getUserDefinedFunctionWhitelist()
Definition: Calcite.cpp:624
std::string get_session_id() const
Definition: SessionInfo.h:78
static const AccessPrivileges SELECT_FROM_VIEW
Definition: DBObject.h:182
Catalog & getCatalog() const
Definition: SessionInfo.h:67
TPlanResult process(query_state::QueryStateProxy, std::string sql_string, const std::vector< TFilterPushDownInfo > &filter_push_down_info, const bool legacy_syntax, const bool is_explain, const bool is_view_optimize, const bool check_privileges, const std::string &calcite_session_id="")
Definition: Calcite.cpp:459
int remote_calcite_port_
Definition: Calcite.h:129
int32_t dbId
Definition: DBObject.h:56
std::string const getInternalSessionProxyUserName()
Definition: Calcite.h:97
void inner_close_calcite_server(bool log)
Definition: Calcite.cpp:647
bool server_available_
Definition: Calcite.h:126
bool g_enable_watchdog false
Definition: Execute.cpp:76
#define CHECK(condition)
Definition: Logger.h:209
char * t
std::string get_root_abs_path()
std::pair< std::shared_ptr< CalciteServerClient >, std::shared_ptr< TTransport > > getClient(int port)
Definition: Calcite.cpp:250
int32_t permissionType
Definition: DBObject.h:55
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
static const AccessPrivileges UPDATE_IN_TABLE
Definition: DBObject.h:164
void checkAccessedObjectsPrivileges(query_state::QueryStateProxy query_state_prox, TPlanResult plan) const
Definition: Calcite.cpp:481
std::string userLoggable() const
Definition: SysCatalog.cpp:127
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:81
~Calcite()
Definition: Calcite.cpp:666
std::string getExtensionFunctionWhitelist()
Definition: Calcite.cpp:606
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:73
std::string getRuntimeExtensionFunctionWhitelist()
Definition: Calcite.cpp:670
#define VLOG(n)
Definition: Logger.h:303
void close_calcite_server(bool log=true)
Definition: Calcite.cpp:642