OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DBHandler.h"
24 #include "DistributedLoader.h"
25 #include "TokenCompletionHints.h"
26 
27 #ifdef HAVE_PROFILER
28 #include <gperftools/heap-profiler.h>
29 #endif // HAVE_PROFILER
30 
31 #include "MapDRelease.h"
32 
33 #include "Calcite/Calcite.h"
34 #include "gen-cpp/CalciteServer.h"
35 
38 
39 #include "Catalog/Catalog.h"
44 #include "DistributedHandler.h"
46 #include "Geospatial/ColumnNames.h"
47 #include "Geospatial/Compression.h"
48 #include "Geospatial/GDAL.h"
49 #include "Geospatial/Types.h"
50 #include "ImportExport/Importer.h"
51 #include "LockMgr/LockMgr.h"
53 #include "Parser/ParserWrapper.h"
57 #include "QueryEngine/Execute.h"
67 #include "Shared/ArrowUtil.h"
68 #include "Shared/DateTimeParser.h"
69 #include "Shared/StringTransform.h"
70 #include "Shared/SysDefinitions.h"
71 #include "Shared/file_path_util.h"
73 #include "Shared/import_helpers.h"
74 #include "Shared/measure.h"
75 #include "Shared/misc.h"
76 #include "Shared/scope.h"
78 
79 #ifdef HAVE_AWS_S3
80 #include <aws/core/auth/AWSCredentialsProviderChain.h>
81 #endif
82 #include <fcntl.h>
83 #include <picosha2.h>
84 #include <sys/types.h>
85 #include <algorithm>
86 #include <boost/algorithm/string.hpp>
87 #include <boost/filesystem.hpp>
88 #include <boost/make_shared.hpp>
89 #include <boost/process/search_path.hpp>
90 #include <boost/program_options.hpp>
91 #include <boost/tokenizer.hpp>
92 #include <chrono>
93 #include <cmath>
94 #include <csignal>
95 #include <fstream>
96 #include <future>
97 #include <map>
98 #include <memory>
99 #include <random>
100 #include <string>
101 #include <thread>
102 #include <typeinfo>
103 
104 #include <arrow/api.h>
105 #include <arrow/io/api.h>
106 #include <arrow/ipc/api.h>
107 
108 #include "Shared/ArrowUtil.h"
109 #include "Shared/distributed.h"
111 
112 #ifdef ENABLE_IMPORT_PARQUET
113 extern bool g_enable_parquet_import_fsi;
114 #endif
115 
116 #ifdef HAVE_AWS_S3
117 extern bool g_allow_s3_server_privileges;
118 #endif
119 
120 extern bool g_enable_system_tables;
122 
125 
126 #define INVALID_SESSION_ID ""
127 
128 #define THROW_DB_EXCEPTION(errstr) \
129  { \
130  TDBException ex; \
131  ex.error_msg = errstr; \
132  LOG(ERROR) << ex.error_msg; \
133  throw ex; \
134  }
135 
136 thread_local std::string TrackingProcessor::client_address;
138 
139 namespace {
140 
142  const int32_t user_id,
143  const std::string& dashboard_name) {
144  return (cat.getMetadataForDashboard(std::to_string(user_id), dashboard_name));
145 }
146 
147 struct ForceDisconnect : public std::runtime_error {
148  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
149 };
150 
151 } // namespace
152 
153 #ifdef ENABLE_GEOS
154 extern std::unique_ptr<std::string> g_libgeos_so_filename;
155 #endif
156 
157 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
158  const std::vector<LeafHostInfo>& string_leaves,
159  const std::string& base_data_path,
160  const bool allow_multifrag,
161  const bool jit_debug,
162  const bool intel_jit_profile,
163  const bool read_only,
164  const bool allow_loop_joins,
165  const bool enable_rendering,
166  const bool renderer_use_ppll_polys,
167  const bool renderer_prefer_igpu,
168  const unsigned renderer_vulkan_timeout_ms,
169  const bool enable_auto_clear_render_mem,
170  const int render_oom_retry_threshold,
171  const size_t render_mem_bytes,
172  const size_t max_concurrent_render_sessions,
173  const size_t reserved_gpu_mem,
174  const bool render_compositor_use_last_gpu,
175  const size_t num_reader_threads,
176  const AuthMetadata& authMetadata,
177  SystemParameters& system_parameters,
178  const bool legacy_syntax,
179  const int idle_session_duration,
180  const int max_session_duration,
181  const std::string& udf_filename,
182  const std::string& clang_path,
183  const std::vector<std::string>& clang_options,
184 #ifdef ENABLE_GEOS
185  const std::string& libgeos_so_filename,
186 #endif
187  const File_Namespace::DiskCacheConfig& disk_cache_config,
188  const bool is_new_db)
189  : leaf_aggregator_(db_leaves)
190  , db_leaves_(db_leaves)
191  , string_leaves_(string_leaves)
192  , base_data_path_(base_data_path)
193  , random_gen_(std::random_device{}())
194  , session_id_dist_(0, INT32_MAX)
195  , jit_debug_(jit_debug)
196  , intel_jit_profile_(intel_jit_profile)
197  , allow_multifrag_(allow_multifrag)
198  , read_only_(read_only)
199  , allow_loop_joins_(allow_loop_joins)
200  , authMetadata_(authMetadata)
201  , system_parameters_(system_parameters)
202  , legacy_syntax_(legacy_syntax)
203  , dispatch_queue_(
204  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
206  base_data_path,
207  1,
208  idle_session_duration * 60,
209  max_session_duration * 60,
210  system_parameters.num_sessions,
211  [this](auto& session_ptr) { disconnect_impl(session_ptr); }))
212  , super_user_rights_(false)
213  , idle_session_duration_(idle_session_duration * 60)
214  , max_session_duration_(max_session_duration * 60)
215  , enable_rendering_(enable_rendering)
216  , renderer_use_ppll_polys_(renderer_use_ppll_polys)
217  , renderer_prefer_igpu_(renderer_prefer_igpu)
218  , renderer_vulkan_timeout_(renderer_vulkan_timeout_ms)
219  , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
220  , render_oom_retry_threshold_(render_oom_retry_threshold)
221  , max_concurrent_render_sessions_(max_concurrent_render_sessions)
222  , reserved_gpu_mem_(reserved_gpu_mem)
223  , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
224  , render_mem_bytes_(render_mem_bytes)
225  , num_reader_threads_(num_reader_threads)
226 #ifdef ENABLE_GEOS
227  , libgeos_so_filename_(libgeos_so_filename)
228 #endif
229  , disk_cache_config_(disk_cache_config)
230  , udf_filename_(udf_filename)
231  , clang_path_(clang_path)
232  , clang_options_(clang_options)
233 
234 {
235  LOG(INFO) << "HeavyDB Server " << MAPD_RELEASE;
236  initialize(is_new_db);
237 }
238 
239 void DBHandler::initialize(const bool is_new_db) {
240  if (!initialized_) {
241  initialized_ = true;
242  } else {
244  "Server already initialized; service restart required to activate any new "
245  "entitlements.");
246  return;
247  }
248 
251  cpu_mode_only_ = true;
252  } else {
253 #ifdef HAVE_CUDA
255  cpu_mode_only_ = false;
256 #else
258  LOG(WARNING) << "This build isn't CUDA enabled, will run on CPU";
259  cpu_mode_only_ = true;
260 #endif
261  }
262 
263  bool is_rendering_enabled = enable_rendering_;
264  if (system_parameters_.num_gpus == 0) {
265  is_rendering_enabled = false;
266  }
267 
268  const auto data_path =
269  boost::filesystem::path(base_data_path_) / shared::kDataDirectoryName;
270  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
271  // manage cannot ask for
272  size_t total_reserved = reserved_gpu_mem_;
273  if (is_rendering_enabled) {
274  total_reserved += render_mem_bytes_;
275  }
276 
277  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
278 #ifdef HAVE_CUDA
279  if (!cpu_mode_only_ || is_rendering_enabled) {
280  try {
281  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
283  } catch (const std::exception& e) {
284  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
285  << e.what();
287  cpu_mode_only_ = true;
288  is_rendering_enabled = false;
289  }
290  }
291 #endif // HAVE_CUDA
292 
293  try {
294  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
296  std::move(cuda_mgr),
298  total_reserved,
301  } catch (const std::exception& e) {
302  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
303  }
304 
305  std::string udf_ast_filename("");
306 
307  try {
308  if (!udf_filename_.empty()) {
309  const auto cuda_mgr = data_mgr_->getCudaMgr();
310  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
311  cuda_mgr ? cuda_mgr->getDeviceArch()
313  UdfCompiler compiler(device_arch, clang_path_, clang_options_);
314 
315  const auto [cpu_udf_ir_file, cuda_udf_ir_file] = compiler.compileUdf(udf_filename_);
316  Executor::addUdfIrToModule(cpu_udf_ir_file, /*is_cuda_ir=*/false);
317  if (!cuda_udf_ir_file.empty()) {
318  Executor::addUdfIrToModule(cuda_udf_ir_file, /*is_cuda_ir=*/true);
319  }
320  udf_ast_filename = compiler.getAstFileName(udf_filename_);
321  }
322  } catch (const std::exception& e) {
323  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
324  }
325 
326  try {
327  calcite_ =
328  std::make_shared<Calcite>(system_parameters_, base_data_path_, udf_ast_filename);
329  } catch (const std::exception& e) {
330  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
331  }
332 
333  try {
334  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
335  if (!udf_filename_.empty()) {
336  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
337  }
338  } catch (const std::exception& e) {
339  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
340  }
341 
342  try {
344  } catch (const std::exception& e) {
345  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
346  }
347 
348  try {
349  auto udtfs = ThriftSerializers::to_thrift(
351  std::vector<TUserDefinedFunction> udfs = {};
352  calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
353  } catch (const std::exception& e) {
354  LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
355  }
356 
357  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
359  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
360  cpu_mode_only_ = true;
361  }
362 
363  LOG(INFO) << "Started in " << executor_device_type_ << " mode.";
364 
365  try {
367  SysCatalog::instance().init(base_data_path_,
368  data_mgr_,
370  calcite_,
371  is_new_db,
372  !db_leaves_.empty(),
374  } catch (const std::exception& e) {
375  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
376  }
377 
378  import_path_ = boost::filesystem::path(base_data_path_) / shared::kDefaultImportDirName;
379  start_time_ = std::time(nullptr);
380 
381  if (is_rendering_enabled) {
382  try {
383  render_handler_.reset(new RenderHandler(this,
387  false,
388  0,
389  false,
393  } catch (const std::exception& e) {
394  LOG(ERROR) << "Backend rendering disabled: " << e.what();
395  }
396  }
397 
399 
400 #ifdef ENABLE_GEOS
401  if (!libgeos_so_filename_.empty()) {
402  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename_));
403  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
404  }
405 #endif
406 }
407 
409  shutdown();
410 }
411 
412 void DBHandler::check_read_only(const std::string& str) {
413  if (DBHandler::read_only_) {
414  THROW_DB_EXCEPTION(str + " disabled: server running in read-only mode.");
415  }
416 }
417 
419  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
420  // We would create an in memory session for calcite with super user privileges which
421  // would be used for getting all tables metadata when a user runs the query. The
422  // session would be under the name of a proxy user/password which would only persist
423  // till server's lifetime or execution of calcite query(in memory) whichever is the
424  // earliest.
426  std::string session_id;
427  do {
429  } while (calcite_sessions_.find(session_id) != calcite_sessions_.end());
430  Catalog_Namespace::UserMetadata user_meta(-1,
431  calcite_->getInternalSessionProxyUserName(),
432  calcite_->getInternalSessionProxyPassword(),
433  true,
434  -1,
435  true,
436  false);
437  const auto emplace_ret = calcite_sessions_.emplace(
438  session_id,
439  std::make_shared<Catalog_Namespace::SessionInfo>(
440  catalog_ptr, user_meta, executor_device_type_, session_id));
441  CHECK(emplace_ret.second);
442  return session_id;
443 }
444 
445 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
446  // Remove InMemory calcite Session.
448  const auto it = calcite_sessions_.find(session_id);
449  CHECK(it != calcite_sessions_.end());
450  calcite_sessions_.erase(it);
451 }
452 
453 // internal connection for connections with no password
454 void DBHandler::internal_connect(TSessionId& session,
455  const std::string& username,
456  const std::string& dbname) {
457  auto stdlog = STDLOG(); // session_info set by connect_impl()
458  std::string username2 = username; // login() may reset username given as argument
459  std::string dbname2 = dbname; // login() may reset dbname given as argument
461  std::shared_ptr<Catalog> cat = nullptr;
462  try {
463  cat =
464  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
465  } catch (std::exception& e) {
466  THROW_DB_EXCEPTION(e.what());
467  }
468 
469  DBObject dbObject(dbname2, DatabaseDBObjectType);
470  dbObject.loadKey(*cat);
472  std::vector<DBObject> dbObjects;
473  dbObjects.push_back(dbObject);
474  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
475  THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
476  " is not allowed to access database " + dbname2 + ".");
477  }
478  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
479 }
480 
482  return leaf_aggregator_.leafCount() > 0;
483 }
484 
485 void DBHandler::krb5_connect(TKrb5Session& session,
486  const std::string& inputToken,
487  const std::string& dbname) {
488  THROW_DB_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
489 }
490 
491 void DBHandler::connect(TSessionId& session,
492  const std::string& username,
493  const std::string& passwd,
494  const std::string& dbname) {
495  auto stdlog = STDLOG(); // session_info set by connect_impl()
496  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
497  std::string username2 = username; // login() may reset username given as argument
498  std::string dbname2 = dbname; // login() may reset dbname given as argument
500  std::shared_ptr<Catalog> cat = nullptr;
501  try {
502  cat = SysCatalog::instance().login(
503  dbname2, username2, passwd, user_meta, !super_user_rights_);
504  } catch (std::exception& e) {
505  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
506  THROW_DB_EXCEPTION(e.what());
507  }
508 
509  DBObject dbObject(dbname2, DatabaseDBObjectType);
510  dbObject.loadKey(*cat);
512  std::vector<DBObject> dbObjects;
513  dbObjects.push_back(dbObject);
514  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
515  stdlog.appendNameValuePairs(
516  "user", username, "db", dbname, "exception", "Missing Privileges");
517  THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
518  " is not allowed to access database " + dbname2 + ".");
519  }
520  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
521 
522  // if pki auth session will come back encrypted with user pubkey
523  SysCatalog::instance().check_for_session_encryption(passwd, session);
524 }
525 
526 void DBHandler::connect_impl(TSessionId& session,
527  const std::string& passwd,
528  const std::string& dbname,
529  const Catalog_Namespace::UserMetadata& user_meta,
530  std::shared_ptr<Catalog> cat,
531  query_state::StdLog& stdlog) {
532  // TODO(sy): Is there any reason to have dbname as a parameter
533  // here when the cat parameter already provides cat->name()?
534  // Should dbname and cat->name() ever differ?
535  auto session_ptr = sessions_store_->add(user_meta, cat, executor_device_type_);
536  session = session_ptr->get_session_id();
537  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
538  stdlog.setSessionInfo(session_ptr);
539  session_ptr->set_connection_info(getConnectionInfo().toString());
540  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
541  // while doing warmup
542  }
543  auto const roles =
544  stdlog.getConstSessionInfo()->get_currentUser().isSuper
545  ? std::vector<std::string>{{"super"}}
546  : SysCatalog::instance().getRoles(
547  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
548  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
549 }
550 
551 void DBHandler::disconnect(const TSessionId& session) {
552  auto stdlog = STDLOG();
553  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
554  auto session_ptr = get_session_ptr(session);
555  stdlog.setSessionInfo(session_ptr);
556  sessions_store_->disconnect(session);
557 }
558 
560  const auto session_id = session_ptr->get_session_id();
561  std::exception_ptr leaf_exception = nullptr;
562  try {
563  if (leaf_aggregator_.leafCount() > 0) {
564  leaf_aggregator_.disconnect(session_id);
565  }
566  } catch (...) {
567  leaf_exception = std::current_exception();
568  }
569 
570  {
571  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
572  render_group_assignment_map_.erase(session_id);
573  }
574 
575  if (render_handler_) {
576  render_handler_->disconnect(session_id);
577  }
578 
579  if (leaf_exception) {
580  std::rethrow_exception(leaf_exception);
581  }
582 }
583 
584 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
585  auto stdlog = STDLOG(get_session_ptr(session));
586  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
587  auto session_ptr = get_session_ptr(session);
588  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
589  try {
590  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
591  dbname2, session_ptr->get_currentUser().userName);
592  session_ptr->set_catalog_ptr(cat);
593  if (leaf_aggregator_.leafCount() > 0) {
594  leaf_aggregator_.switch_database(session, dbname);
595  return;
596  }
597  } catch (std::exception& e) {
598  THROW_DB_EXCEPTION(e.what());
599  }
600 }
601 
602 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
603  auto session1_ptr = get_session_ptr(session1);
604  auto stdlog = STDLOG(session1_ptr);
605  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
606 
607  try {
608  const Catalog_Namespace::UserMetadata& user_meta = session1_ptr->get_currentUser();
609  std::shared_ptr<Catalog> cat = session1_ptr->get_catalog_ptr();
610  auto session2_ptr = sessions_store_->add(user_meta, cat, executor_device_type_);
611  session2 = session2_ptr->get_session_id();
612  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database "
613  << cat->name();
614  if (leaf_aggregator_.leafCount() > 0) {
615  leaf_aggregator_.clone_session(session1, session2);
616  return;
617  }
618  } catch (std::exception& e) {
619  THROW_DB_EXCEPTION(e.what());
620  }
621 }
622 
623 void DBHandler::interrupt(const TSessionId& query_session,
624  const TSessionId& interrupt_session) {
625  // if this is for distributed setting, query_session becomes a parent session (agg)
626  // and the interrupt session is one of existing session in the leaf node (leaf)
627  // so we can think there exists a logical mapping
628  // between query_session (agg) and interrupt_session (leaf)
629  auto session_ptr = get_session_ptr(interrupt_session);
630  auto& cat = session_ptr->getCatalog();
631  auto stdlog = STDLOG(session_ptr);
632  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
633  const auto allow_query_interrupt =
635  if (g_enable_dynamic_watchdog || allow_query_interrupt) {
636  const auto dbname = cat.getCurrentDB().dbName;
638  jit_debug_ ? "/tmp" : "",
639  jit_debug_ ? "mapdquery" : "",
641  CHECK(executor);
642 
643  if (leaf_aggregator_.leafCount() > 0) {
644  leaf_aggregator_.interrupt(query_session, interrupt_session);
645  }
646  auto target_executor_ids = executor->getExecutorIdsRunningQuery(query_session);
647  if (target_executor_ids.empty()) {
649  executor->getSessionLock());
650  if (executor->checkIsQuerySessionEnrolled(query_session, session_read_lock)) {
651  session_read_lock.unlock();
652  VLOG(1) << "Received interrupt: "
653  << "User " << session_ptr->get_currentUser().userLoggable()
654  << ", Database " << dbname << std::endl;
655  executor->interrupt(query_session, interrupt_session);
656  }
657  } else {
658  for (auto& executor_id : target_executor_ids) {
659  VLOG(1) << "Received interrupt: "
660  << "Executor " << executor_id << ", User "
661  << session_ptr->get_currentUser().userLoggable() << ", Database "
662  << dbname << std::endl;
663  auto target_executor = Executor::getExecutor(executor_id);
664  target_executor->interrupt(query_session, interrupt_session);
665  }
666  }
667 
668  LOG(INFO) << "User " << session_ptr->get_currentUser().userName
669  << " interrupted session with database " << dbname << std::endl;
670  }
671 }
672 
674  if (g_cluster) {
675  if (leaf_aggregator_.leafCount() > 0) {
676  return TRole::type::AGGREGATOR;
677  }
678  return TRole::type::LEAF;
679  }
680  return TRole::type::SERVER;
681 }
682 
683 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
684  auto stdlog = STDLOG(get_session_ptr(session));
685  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
686  const auto rendering_enabled = bool(render_handler_);
687  _return.read_only = read_only_;
688  _return.version = MAPD_RELEASE;
689  _return.rendering_enabled = rendering_enabled;
690  _return.start_time = start_time_;
691  _return.edition = MAPD_EDITION;
692  _return.host_name = heavyai::get_hostname();
693  _return.poly_rendering_enabled = rendering_enabled;
694  _return.role = getServerRole();
695  _return.renderer_status_json =
696  render_handler_ ? render_handler_->get_renderer_status_json() : "";
697 }
698 
699 void DBHandler::get_status(std::vector<TServerStatus>& _return,
700  const TSessionId& session) {
701  //
702  // get_status() is now called locally at startup on the aggregator
703  // in order to validate that all nodes of a cluster are running the
704  // same software version and the same renderer status
705  //
706  // In that context, it is called with the InvalidSessionID, and
707  // with the local super-user flag set.
708  //
709  // Hence, we allow this session-less mode only in distributed mode, and
710  // then on a leaf (always), or on the aggregator (only in super-user mode)
711  //
712  auto const allow_invalid_session = g_cluster && (!isAggregator() || super_user_rights_);
713  if (!allow_invalid_session || session != getInvalidSessionId()) {
714  auto stdlog = STDLOG(get_session_ptr(session));
715  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
716  } else {
717  LOG(INFO) << "get_status() called in session-less mode";
718  }
719  const auto rendering_enabled = bool(render_handler_);
720  TServerStatus ret;
721  ret.read_only = read_only_;
722  ret.version = MAPD_RELEASE;
723  ret.rendering_enabled = rendering_enabled;
724  ret.start_time = start_time_;
725  ret.edition = MAPD_EDITION;
726  ret.host_name = heavyai::get_hostname();
727  ret.poly_rendering_enabled = rendering_enabled;
728  ret.role = getServerRole();
729  ret.renderer_status_json =
730  render_handler_ ? render_handler_->get_renderer_status_json() : "";
731 
732  _return.push_back(ret);
733  if (leaf_aggregator_.leafCount() > 0) {
734  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
735  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
736  }
737 }
738 
739 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
740  const TSessionId& session) {
741  auto stdlog = STDLOG(get_session_ptr(session));
742  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
743  THardwareInfo ret;
744  const auto cuda_mgr = data_mgr_->getCudaMgr();
745  if (cuda_mgr) {
746  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
747  ret.start_gpu = cuda_mgr->getStartGpu();
748  if (ret.start_gpu >= 0) {
749  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
750  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
751  }
752  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
753  TGpuSpecification gpu_spec;
754  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
755  gpu_spec.num_sm = deviceProperties->numMPs;
756  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
757  gpu_spec.memory = deviceProperties->globalMem;
758  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
759  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
760  ret.gpu_info.push_back(gpu_spec);
761  }
762  }
763 
764  // start hardware/OS dependent code
765  ret.num_cpu_hw = std::thread::hardware_concurrency();
766  // ^ This might return diffrent results in case of hyper threading
767  // end hardware/OS dependent code
768 
769  _return.hardware_info.push_back(ret);
770 }
771 
772 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
773  auto session_ptr = get_session_ptr(session);
774  CHECK(session_ptr);
775  auto stdlog = STDLOG(session_ptr);
776  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
777  auto user_metadata = session_ptr->get_currentUser();
778  _return.user = user_metadata.userName;
779  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
780  _return.start_time = session_ptr->get_start_time();
781  _return.is_super = user_metadata.isSuper;
782 }
783 
784 void DBHandler::set_leaf_info(const TSessionId& session, const TLeafInfo& info) {
785  g_distributed_leaf_idx = info.leaf_id;
786  g_distributed_num_leaves = info.num_leaves;
787 }
788 
790  const SQLTypeInfo& ti,
791  TColumn& column) {
792  if (ti.is_array()) {
793  TColumn tColumn;
794  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
795  CHECK(array_tv);
796  bool is_null = !array_tv->is_initialized();
797  if (!is_null) {
798  const auto& vec = array_tv->get();
799  for (const auto& elem_tv : vec) {
800  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
801  }
802  }
803  column.data.arr_col.push_back(tColumn);
804  column.nulls.push_back(is_null && !ti.get_notnull());
805  } else if (ti.is_geometry()) {
806  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
807  if (scalar_tv) {
808  auto s_n = boost::get<NullableString>(scalar_tv);
809  auto s = boost::get<std::string>(s_n);
810  if (s) {
811  column.data.str_col.push_back(*s);
812  } else {
813  column.data.str_col.emplace_back(""); // null string
814  auto null_p = boost::get<void*>(s_n);
815  CHECK(null_p && !*null_p);
816  }
817  column.nulls.push_back(!s && !ti.get_notnull());
818  } else {
819  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
820  CHECK(array_tv);
821  bool is_null = !array_tv->is_initialized();
822  if (!is_null) {
823  auto elem_type = SQLTypeInfo(kDOUBLE, false);
824  TColumn tColumn;
825  const auto& vec = array_tv->get();
826  for (const auto& elem_tv : vec) {
827  value_to_thrift_column(elem_tv, elem_type, tColumn);
828  }
829  column.data.arr_col.push_back(tColumn);
830  column.nulls.push_back(false);
831  } else {
832  TColumn tColumn;
833  column.data.arr_col.push_back(tColumn);
834  column.nulls.push_back(is_null && !ti.get_notnull());
835  }
836  }
837  } else {
838  CHECK(!ti.is_column());
839  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
840  CHECK(scalar_tv);
841  if (boost::get<int64_t>(scalar_tv)) {
842  int64_t data = *(boost::get<int64_t>(scalar_tv));
843 
844  if (ti.is_decimal()) {
845  double val = static_cast<double>(data);
846  if (ti.get_scale() > 0) {
847  val /= pow(10.0, std::abs(ti.get_scale()));
848  }
849  column.data.real_col.push_back(val);
850  } else {
851  column.data.int_col.push_back(data);
852  }
853 
854  switch (ti.get_type()) {
855  case kBOOLEAN:
856  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
857  break;
858  case kTINYINT:
859  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
860  break;
861  case kSMALLINT:
862  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
863  break;
864  case kINT:
865  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
866  break;
867  case kNUMERIC:
868  case kDECIMAL:
869  case kBIGINT:
870  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
871  break;
872  case kTIME:
873  case kTIMESTAMP:
874  case kDATE:
875  case kINTERVAL_DAY_TIME:
877  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
878  break;
879  default:
880  column.nulls.push_back(false);
881  }
882  } else if (boost::get<double>(scalar_tv)) {
883  double data = *(boost::get<double>(scalar_tv));
884  column.data.real_col.push_back(data);
885  if (ti.get_type() == kFLOAT) {
886  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
887  } else {
888  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
889  }
890  } else if (boost::get<float>(scalar_tv)) {
891  CHECK_EQ(kFLOAT, ti.get_type());
892  float data = *(boost::get<float>(scalar_tv));
893  column.data.real_col.push_back(data);
894  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
895  } else if (boost::get<NullableString>(scalar_tv)) {
896  auto s_n = boost::get<NullableString>(scalar_tv);
897  auto s = boost::get<std::string>(s_n);
898  if (s) {
899  column.data.str_col.push_back(*s);
900  } else {
901  column.data.str_col.emplace_back(""); // null string
902  auto null_p = boost::get<void*>(s_n);
903  CHECK(null_p && !*null_p);
904  }
905  column.nulls.push_back(!s && !ti.get_notnull());
906  } else {
907  CHECK(false);
908  }
909  }
910 }
911 
912 TDatum DBHandler::value_to_thrift(const TargetValue& tv, const SQLTypeInfo& ti) {
913  TDatum datum;
914  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
915  if (!scalar_tv) {
916  CHECK(ti.is_array());
917  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
918  CHECK(array_tv);
919  if (array_tv->is_initialized()) {
920  const auto& vec = array_tv->get();
921  for (const auto& elem_tv : vec) {
922  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
923  datum.val.arr_val.push_back(scalar_col_val);
924  }
925  // Datum is not null, at worst it's an empty array Datum
926  datum.is_null = false;
927  } else {
928  datum.is_null = true;
929  }
930  return datum;
931  }
932  if (boost::get<int64_t>(scalar_tv)) {
933  int64_t data = *(boost::get<int64_t>(scalar_tv));
934 
935  if (ti.is_decimal()) {
936  double val = static_cast<double>(data);
937  if (ti.get_scale() > 0) {
938  val /= pow(10.0, std::abs(ti.get_scale()));
939  }
940  datum.val.real_val = val;
941  } else {
942  datum.val.int_val = data;
943  }
944 
945  switch (ti.get_type()) {
946  case kBOOLEAN:
947  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
948  break;
949  case kTINYINT:
950  datum.is_null = (datum.val.int_val == NULL_TINYINT);
951  break;
952  case kSMALLINT:
953  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
954  break;
955  case kINT:
956  datum.is_null = (datum.val.int_val == NULL_INT);
957  break;
958  case kDECIMAL:
959  case kNUMERIC:
960  case kBIGINT:
961  datum.is_null = (datum.val.int_val == NULL_BIGINT);
962  break;
963  case kTIME:
964  case kTIMESTAMP:
965  case kDATE:
966  case kINTERVAL_DAY_TIME:
968  datum.is_null = (datum.val.int_val == NULL_BIGINT);
969  break;
970  default:
971  datum.is_null = false;
972  }
973  } else if (boost::get<double>(scalar_tv)) {
974  datum.val.real_val = *(boost::get<double>(scalar_tv));
975  if (ti.get_type() == kFLOAT) {
976  datum.is_null = (datum.val.real_val == NULL_FLOAT);
977  } else {
978  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
979  }
980  } else if (boost::get<float>(scalar_tv)) {
981  CHECK_EQ(kFLOAT, ti.get_type());
982  datum.val.real_val = *(boost::get<float>(scalar_tv));
983  datum.is_null = (datum.val.real_val == NULL_FLOAT);
984  } else if (boost::get<NullableString>(scalar_tv)) {
985  auto s_n = boost::get<NullableString>(scalar_tv);
986  auto s = boost::get<std::string>(s_n);
987  if (s) {
988  datum.val.str_val = *s;
989  } else {
990  auto null_p = boost::get<void*>(s_n);
991  CHECK(null_p && !*null_p);
992  }
993  datum.is_null = !s;
994  } else {
995  CHECK(false);
996  }
997  return datum;
998 }
999 
1001  TQueryResult& _return,
1002  const QueryStateProxy& query_state_proxy,
1003  const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1004  const std::string& query_str,
1005  const bool column_format,
1006  const std::string& nonce,
1007  const int32_t first_n,
1008  const int32_t at_most_n,
1009  const bool use_calcite) {
1010  _return.total_time_ms = 0;
1011  _return.nonce = nonce;
1012  ParserWrapper pw{query_str};
1013  switch (pw.getQueryType()) {
1015  _return.query_type = TQueryType::READ;
1016  VLOG(1) << "query type: READ";
1017  break;
1018  }
1020  _return.query_type = TQueryType::WRITE;
1021  VLOG(1) << "query type: WRITE";
1022  break;
1023  }
1025  _return.query_type = TQueryType::SCHEMA_READ;
1026  VLOG(1) << "query type: SCHEMA READ";
1027  break;
1028  }
1030  _return.query_type = TQueryType::SCHEMA_WRITE;
1031  VLOG(1) << "query type: SCHEMA WRITE";
1032  break;
1033  }
1034  default: {
1035  _return.query_type = TQueryType::UNKNOWN;
1036  LOG(WARNING) << "query type: UNKNOWN";
1037  break;
1038  }
1039  }
1040 
1043  _return.total_time_ms += measure<>::execution([&]() {
1045  query_state_proxy,
1046  column_format,
1047  session_ptr->get_executor_device_type(),
1048  first_n,
1049  at_most_n,
1050  use_calcite,
1051  locks);
1053  _return, result, query_state_proxy, column_format, first_n, at_most_n);
1054  });
1055 }
1056 
1057 void DBHandler::convertData(TQueryResult& _return,
1059  const QueryStateProxy& query_state_proxy,
1060  const bool column_format,
1061  const int32_t first_n,
1062  const int32_t at_most_n) {
1063  _return.execution_time_ms += result.getExecutionTime();
1064  if (result.empty()) {
1065  return;
1066  }
1067 
1068  switch (result.getResultType()) {
1070  convertRows(_return,
1071  query_state_proxy,
1072  result.getTargetsMeta(),
1073  *result.getRows(),
1074  column_format,
1075  first_n,
1076  at_most_n);
1077  break;
1079  convertResult(_return, *result.getRows(), true);
1080  break;
1082  convertExplain(_return, *result.getRows(), true);
1083  break;
1085  convertRows(_return,
1086  query_state_proxy,
1087  result.getTargetsMeta(),
1088  *result.getRows(),
1089  column_format,
1090  -1,
1091  -1);
1092  break;
1093  }
1094 }
1095 
1096 void DBHandler::sql_execute(TQueryResult& _return,
1097  const TSessionId& session,
1098  const std::string& query_str,
1099  const bool column_format,
1100  const std::string& nonce,
1101  const int32_t first_n,
1102  const int32_t at_most_n) {
1103  const std::string exec_ra_prefix = "execute relalg";
1104  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1105  auto actual_query =
1106  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1107  auto session_ptr = get_session_ptr(session);
1108  auto query_state = create_query_state(session_ptr, actual_query);
1109  auto stdlog = STDLOG(session_ptr, query_state);
1110  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1111  stdlog.appendNameValuePairs("nonce", nonce);
1112  auto timer = DEBUG_TIMER(__func__);
1113  try {
1114  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1115  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1116  };
1117 
1118  if (first_n >= 0 && at_most_n >= 0) {
1119  THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
1120  }
1121 
1122  if (leaf_aggregator_.leafCount() > 0) {
1123  if (!agg_handler_) {
1124  THROW_DB_EXCEPTION("Distributed support is disabled.");
1125  }
1126  _return.total_time_ms = measure<>::execution([&]() {
1127  agg_handler_->cluster_execute(_return,
1128  query_state->createQueryStateProxy(),
1129  query_state->getQueryStr(),
1130  column_format,
1131  nonce,
1132  first_n,
1133  at_most_n,
1135  });
1136  _return.nonce = nonce;
1137  } else {
1138  sql_execute_local(_return,
1139  query_state->createQueryStateProxy(),
1140  session_ptr,
1141  actual_query,
1142  column_format,
1143  nonce,
1144  first_n,
1145  at_most_n,
1146  use_calcite);
1147  }
1148  _return.total_time_ms += process_deferred_copy_from(session);
1149  std::string debug_json = timer.stopAndGetJson();
1150  if (!debug_json.empty()) {
1151  _return.__set_debug(std::move(debug_json));
1152  }
1153  stdlog.appendNameValuePairs(
1154  "execution_time_ms",
1155  _return.execution_time_ms,
1156  "total_time_ms", // BE-3420 - Redundant with duration field
1157  stdlog.duration<std::chrono::milliseconds>());
1158  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1159  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1160  } catch (const std::exception& e) {
1161  if (strstr(e.what(), "java.lang.NullPointerException")) {
1162  THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
1163  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1164  THROW_DB_EXCEPTION("multiple SQL statements not allowed");
1165  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1166  THROW_DB_EXCEPTION("empty SQL statment not allowed");
1167  } else {
1168  THROW_DB_EXCEPTION(e.what());
1169  }
1170  }
1171 }
1172 
1174  const TSessionId& session,
1175  const std::string& query_str,
1176  const bool column_format,
1177  const int32_t first_n,
1178  const int32_t at_most_n,
1180  const std::string exec_ra_prefix = "execute relalg";
1181  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1182  auto actual_query =
1183  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1184 
1185  auto session_ptr = get_session_ptr(session);
1186  CHECK(session_ptr);
1187  auto query_state = create_query_state(session_ptr, actual_query);
1188  auto stdlog = STDLOG(session_ptr, query_state);
1189  auto timer = DEBUG_TIMER(__func__);
1190 
1191  try {
1192  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1193  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1194  };
1195 
1196  if (first_n >= 0 && at_most_n >= 0) {
1197  THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
1198  }
1199  auto total_time_ms = measure<>::execution([&]() {
1201  query_state->createQueryStateProxy(),
1202  column_format,
1203  session_ptr->get_executor_device_type(),
1204  first_n,
1205  at_most_n,
1206  use_calcite,
1207  locks);
1208  });
1209 
1210  _return.setExecutionTime(total_time_ms + process_deferred_copy_from(session));
1211 
1212  stdlog.appendNameValuePairs(
1213  "execution_time_ms",
1214  _return.getExecutionTime(),
1215  "total_time_ms", // BE-3420 - Redundant with duration field
1216  stdlog.duration<std::chrono::milliseconds>());
1217  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1218  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1219  } catch (const std::exception& e) {
1220  if (strstr(e.what(), "java.lang.NullPointerException")) {
1221  THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
1222  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1223  THROW_DB_EXCEPTION("multiple SQL statements not allowed");
1224  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1225  THROW_DB_EXCEPTION("empty SQL statment not allowed");
1226  } else {
1227  THROW_DB_EXCEPTION(e.what());
1228  }
1229  }
1230 }
1231 
1232 int64_t DBHandler::process_deferred_copy_from(const TSessionId& session_id) {
1233  int64_t total_time_ms(0);
1234  // if the SQL statement we just executed was a geo COPY FROM, the import
1235  // parameters were captured, and this flag set, so we do the actual import here
1236  if (auto deferred_copy_from_state = deferred_copy_from_sessions(session_id)) {
1237  // import_geo_table() calls create_table() which calls this function to
1238  // do the work, so reset the flag now to avoid executing this part a
1239  // second time at the end of that, which would fail as the table was
1240  // already created! Also reset the flag with a ScopeGuard on exiting
1241  // this function any other way, such as an exception from the code above!
1242  deferred_copy_from_sessions.remove(session_id);
1243 
1244  // create table as replicated?
1245  TCreateParams create_params;
1246  if (deferred_copy_from_state->partitions == "REPLICATED") {
1247  create_params.is_replicated = true;
1248  }
1249 
1250  // now do (and time) the import
1251  total_time_ms = measure<>::execution([&]() {
1252  importGeoTableGlobFilterSort(session_id,
1253  deferred_copy_from_state->table,
1254  deferred_copy_from_state->file_name,
1255  deferred_copy_from_state->copy_params,
1256  TRowDescriptor(),
1257  create_params);
1258  });
1259  }
1260  return total_time_ms;
1261 }
1262 
1263 void DBHandler::sql_execute_df(TDataFrame& _return,
1264  const TSessionId& session,
1265  const std::string& query_str,
1266  const TDeviceType::type results_device_type,
1267  const int32_t device_id,
1268  const int32_t first_n,
1269  const TArrowTransport::type transport_method) {
1270  auto session_ptr = get_session_ptr(session);
1271  CHECK(session_ptr);
1272  auto query_state = create_query_state(session_ptr, query_str);
1273  auto stdlog = STDLOG(session_ptr, query_state);
1274 
1275  const auto executor_device_type = session_ptr->get_executor_device_type();
1276 
1277  if (results_device_type == TDeviceType::GPU) {
1278  if (executor_device_type != ExecutorDeviceType::GPU) {
1279  THROW_DB_EXCEPTION(std::string("GPU mode is not allowed in this session"));
1280  }
1281  if (!data_mgr_->gpusPresent()) {
1282  THROW_DB_EXCEPTION(std::string("No GPU is available in this server"));
1283  }
1284  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1286  std::string("Invalid device_id or unavailable GPU with this ID"));
1287  }
1288  }
1289  ParserWrapper pw{query_str};
1290  if (pw.getQueryType() != ParserWrapper::QueryType::Read) {
1291  THROW_DB_EXCEPTION(std::string(
1292  "Only read queries supported for the Arrow sql_execute_df endpoint."));
1293  }
1294  if (ExplainInfo(query_str).isCalciteExplain()) {
1295  THROW_DB_EXCEPTION(std::string(
1296  "Explain is currently unsupported by the Arrow sql_execute_df endpoint."));
1297  }
1298 
1299  ExecutionResult execution_result;
1301  sql_execute_impl(execution_result,
1302  query_state->createQueryStateProxy(),
1303  true, /* column_format - does this do anything? */
1304  executor_device_type,
1305  first_n,
1306  -1, /* at_most_n */
1307  true,
1308  locks);
1309 
1310  const auto result_set = execution_result.getRows();
1311  const auto executor_results_device_type = results_device_type == TDeviceType::CPU
1314  _return.execution_time_ms =
1315  execution_result.getExecutionTime() - result_set->getQueueTime();
1316  const auto converter = std::make_unique<ArrowResultSetConverter>(
1317  result_set,
1318  data_mgr_,
1319  executor_results_device_type,
1320  device_id,
1321  getTargetNames(execution_result.getTargetsMeta()),
1322  first_n,
1323  ArrowTransport(transport_method));
1324  ArrowResult arrow_result;
1325  _return.arrow_conversion_time_ms +=
1326  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
1327  _return.sm_handle =
1328  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
1329  _return.sm_size = arrow_result.sm_size;
1330  _return.df_handle =
1331  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
1332  _return.df_buffer =
1333  std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
1334  if (executor_results_device_type == ExecutorDeviceType::GPU) {
1335  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1336  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
1337  ipc_handle_to_dev_ptr_.insert(
1338  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
1339  }
1340  _return.df_size = arrow_result.df_size;
1341 }
1342 
1343 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1344  const TSessionId& session,
1345  const std::string& query_str,
1346  const int32_t device_id,
1347  const int32_t first_n) {
1348  auto stdlog = STDLOG(get_session_ptr(session));
1349  sql_execute_df(_return,
1350  session,
1351  query_str,
1352  TDeviceType::GPU,
1353  device_id,
1354  first_n,
1355  TArrowTransport::SHARED_MEMORY);
1356 }
1357 
1358 // For now we have only one user of a data frame in all cases.
1359 void DBHandler::deallocate_df(const TSessionId& session,
1360  const TDataFrame& df,
1361  const TDeviceType::type device_type,
1362  const int32_t device_id) {
1363  auto stdlog = STDLOG(get_session_ptr(session));
1364  std::string serialized_cuda_handle = "";
1365  if (device_type == TDeviceType::GPU) {
1366  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1367  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1368  TDBException ex;
1369  ex.error_msg = std::string(
1370  "Current data frame handle is not bookkept or been inserted "
1371  "twice");
1372  LOG(ERROR) << ex.error_msg;
1373  throw ex;
1374  }
1375  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1376  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1377  }
1378  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1379  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1381  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1383  result,
1384  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1385  device_id,
1386  data_mgr_);
1387 }
1388 
1389 void DBHandler::sql_validate(TRowDescriptor& _return,
1390  const TSessionId& session,
1391  const std::string& query_str) {
1392  try {
1393  auto stdlog = STDLOG(get_session_ptr(session));
1394  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1395  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1396  stdlog.setQueryState(query_state);
1397 
1398  ParserWrapper pw{query_str};
1399  if (ExplainInfo(query_str).isExplain() || pw.is_ddl || pw.is_update_dml) {
1400  throw std::runtime_error("Can only validate SELECT statements.");
1401  }
1402 
1403  const auto execute_read_lock =
1407 
1408  TPlanResult parse_result;
1410  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1411  query_state->getQueryStr(),
1412  {},
1413  true,
1415  /*check_privileges=*/true);
1416  const auto query_ra = parse_result.plan_result;
1417 
1418  const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
1419  _return = fixup_row_descriptor(result.row_set.row_desc,
1420  query_state->getConstSessionInfo()->getCatalog());
1421  } catch (const std::exception& e) {
1422  THROW_DB_EXCEPTION(std::string(e.what()));
1423  }
1424 }
1425 
1426 namespace {
1427 
1429  std::unordered_set<std::string> uc_column_names;
1430  std::unordered_set<std::string> uc_column_table_qualifiers;
1431 };
1432 
1433 // Extract what looks like a (qualified) identifier from the partial query.
1434 // The results will be used to rank the auto-completion results: tables which
1435 // contain at least one of the identifiers first.
1437  const std::string& sql) {
1438  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1439  boost::regex::extended | boost::regex::icase};
1440  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1441  boost::sregex_token_iterator end;
1442  std::unordered_set<std::string> uc_column_names;
1443  std::unordered_set<std::string> uc_column_table_qualifiers;
1444  for (; tok_it != end; ++tok_it) {
1445  std::string column_name = *tok_it;
1446  std::vector<std::string> column_tokens;
1447  boost::split(column_tokens, column_name, boost::is_any_of("."));
1448  if (column_tokens.size() == 2) {
1449  // If the column name is qualified, take user's word.
1450  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1451  } else {
1452  uc_column_names.insert(to_upper(column_name));
1453  }
1454  }
1455  return {uc_column_names, uc_column_table_qualifiers};
1456 }
1457 
1458 } // namespace
1459 
1460 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1461  const TSessionId& session,
1462  const std::string& sql,
1463  const int cursor) {
1464  auto stdlog = STDLOG(get_session_ptr(session));
1465  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1466  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1467  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1468  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1469  proj_tokens.uc_column_names, visible_tables, stdlog);
1470  // Add the table qualifiers explicitly specified by the user.
1471  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1472  proj_tokens.uc_column_table_qualifiers.end());
1473  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1474  std::sort(
1475  hints.begin(),
1476  hints.end(),
1477  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1478  if (lhs.type == TCompletionHintType::TABLE &&
1479  rhs.type == TCompletionHintType::TABLE) {
1480  // Between two tables, one which is compatible with the specified
1481  // projections and one which isn't, pick the one which is compatible.
1482  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1483  compatible_table_names.end() &&
1484  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1485  compatible_table_names.end()) {
1486  return true;
1487  }
1488  }
1489  return lhs.type < rhs.type;
1490  });
1491 }
1492 
1493 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1494  std::vector<std::string>& visible_tables,
1495  query_state::StdLog& stdlog,
1496  const std::string& sql,
1497  const int cursor) {
1498  const auto& session_info = *stdlog.getConstSessionInfo();
1499  try {
1500  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1501 
1502  // Filter out keywords suggested by Calcite which we don't support.
1504  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1505  } catch (const std::exception& e) {
1506  TDBException ex;
1507  ex.error_msg = std::string(e.what());
1508  LOG(ERROR) << ex.error_msg;
1509  throw ex;
1510  }
1511  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1512  const size_t length_to_cursor =
1513  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1514  // Trust hints from Calcite after the FROM keyword.
1515  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1516  return;
1517  }
1518  // Before FROM, the query is too incomplete for context-sensitive completions.
1519  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1520 }
1521 
1522 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1523  query_state::StdLog& stdlog,
1524  std::vector<std::string>& visible_tables,
1525  const std::string& sql,
1526  const int cursor) {
1527  const auto last_word =
1528  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1529  boost::regex select_expr{R"(\s*select\s+)",
1530  boost::regex::extended | boost::regex::icase};
1531  const size_t length_to_cursor =
1532  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1533  // After SELECT but before FROM, look for all columns in all tables which match the
1534  // prefix.
1535  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1536  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1537  // Trust the fully qualified columns the most.
1538  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1539  return;
1540  }
1541  // Not much information to use, just retrieve column names which match the prefix.
1542  if (should_suggest_column_hints(sql)) {
1543  get_column_hints(hints, last_word, column_names_by_table);
1544  return;
1545  }
1546  const std::string kFromKeyword{"FROM"};
1547  if (boost::istarts_with(kFromKeyword, last_word)) {
1548  TCompletionHint keyword_hint;
1549  keyword_hint.type = TCompletionHintType::KEYWORD;
1550  keyword_hint.replaced = last_word;
1551  keyword_hint.hints.emplace_back(kFromKeyword);
1552  hints.push_back(keyword_hint);
1553  }
1554  } else {
1555  const std::string kSelectKeyword{"SELECT"};
1556  if (boost::istarts_with(kSelectKeyword, last_word)) {
1557  TCompletionHint keyword_hint;
1558  keyword_hint.type = TCompletionHintType::KEYWORD;
1559  keyword_hint.replaced = last_word;
1560  keyword_hint.hints.emplace_back(kSelectKeyword);
1561  hints.push_back(keyword_hint);
1562  }
1563  }
1564 }
1565 
1566 std::unordered_map<std::string, std::unordered_set<std::string>>
1567 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1568  query_state::StdLog& stdlog) {
1569  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1570  for (auto it = table_names.begin(); it != table_names.end();) {
1571  TTableDetails table_details;
1572  try {
1573  get_table_details_impl(table_details, stdlog, *it, false, false);
1574  } catch (const TDBException& e) {
1575  // Remove the corrupted Table/View name from the list for further processing.
1576  it = table_names.erase(it);
1577  continue;
1578  }
1579  for (const auto& column_type : table_details.row_desc) {
1580  column_names_by_table[*it].emplace(column_type.col_name);
1581  }
1582  ++it;
1583  }
1584  return column_names_by_table;
1585 }
1586 
1590 }
1591 
1593  const std::unordered_set<std::string>& uc_column_names,
1594  std::vector<std::string>& table_names,
1595  query_state::StdLog& stdlog) {
1596  std::unordered_set<std::string> compatible_table_names_by_column;
1597  for (auto it = table_names.begin(); it != table_names.end();) {
1598  TTableDetails table_details;
1599  try {
1600  get_table_details_impl(table_details, stdlog, *it, false, false);
1601  } catch (const TDBException& e) {
1602  // Remove the corrupted Table/View name from the list for further processing.
1603  it = table_names.erase(it);
1604  continue;
1605  }
1606  for (const auto& column_type : table_details.row_desc) {
1607  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1608  compatible_table_names_by_column.emplace(to_upper(*it));
1609  break;
1610  }
1611  }
1612  ++it;
1613  }
1614  return compatible_table_names_by_column;
1615 }
1616 
1617 TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
1618  QueryStateProxy query_state_proxy) {
1619  TQueryResult _return;
1621  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1622  [this, &result, &query_state_proxy, &query_ra](const size_t executor_index) {
1623  auto qid_scope_guard = query_state_proxy.getQueryState().setThreadLocalQueryId();
1624  execute_rel_alg(result,
1625  query_state_proxy,
1626  query_ra,
1627  true,
1629  -1,
1630  -1,
1631  /*just_validate=*/true,
1632  /*find_filter_push_down_candidates=*/false,
1633  ExplainInfo(),
1634  executor_index);
1635  });
1637  dispatch_queue_->submit(execute_rel_alg_task, /*is_update_delete=*/false);
1638  auto result_future = execute_rel_alg_task->get_future();
1639  result_future.get();
1640  DBHandler::convertData(_return, result, query_state_proxy, true, -1, -1);
1641  return _return;
1642 }
1643 
1644 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1645  auto stdlog = STDLOG(get_session_ptr(session));
1646  auto session_ptr = stdlog.getConstSessionInfo();
1647  if (!session_ptr->get_currentUser().isSuper) {
1648  // WARNING: This appears to not include roles a user is a member of,
1649  // if the role has no permissions granted to it.
1650  roles =
1651  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1652  session_ptr->getCatalog().getCurrentDB().dbId);
1653  } else {
1654  roles = SysCatalog::instance().getRoles(
1655  false, true, session_ptr->get_currentUser().userName);
1656  }
1657 }
1658 
1659 bool DBHandler::has_role(const TSessionId& sessionId,
1660  const std::string& granteeName,
1661  const std::string& roleName) {
1662  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1663  const auto session_ptr = stdlog.getConstSessionInfo();
1664  const auto current_user = session_ptr->get_currentUser();
1665  if (!current_user.isSuper) {
1666  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1667  user && current_user.userName != granteeName) {
1668  THROW_DB_EXCEPTION("Only super users can check other user's roles.");
1669  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1670  current_user.userName, granteeName, true)) {
1672  "Only super users can check roles assignment that have not been directly "
1673  "granted to a user.");
1674  }
1675  }
1676  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1677 }
1678 
1679 static TDBObject serialize_db_object(const std::string& roleName,
1680  const DBObject& inObject) {
1681  TDBObject outObject;
1682  outObject.objectName = inObject.getName();
1683  outObject.grantee = roleName;
1684  outObject.objectId = inObject.getObjectKey().objectId;
1685  const auto ap = inObject.getPrivileges();
1686  switch (inObject.getObjectKey().permissionType) {
1687  case DatabaseDBObjectType:
1688  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1689  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1690  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1691  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1692  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1693 
1694  break;
1695  case TableDBObjectType:
1696  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1697  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1698  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1699  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1700  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1701  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1702  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1703  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1704  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1705 
1706  break;
1707  case DashboardDBObjectType:
1708  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1709  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1710  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1711  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1712  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1713 
1714  break;
1715  case ViewDBObjectType:
1716  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1717  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1718  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1719  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1720  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1721  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1722  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1723 
1724  break;
1725  case ServerDBObjectType:
1726  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1727  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1728  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1729  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1730  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::SERVER_USAGE));
1731 
1732  break;
1733  default:
1734  CHECK(false);
1735  }
1736  const int type_val = static_cast<int>(inObject.getType());
1737  CHECK(type_val >= 0 && type_val < 6);
1738  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1739  return outObject;
1740 }
1741 
1743  const TDBObjectPermissions& permissions) {
1744  if (!permissions.__isset.database_permissions_) {
1745  THROW_DB_EXCEPTION("Database permissions not set for check.")
1746  }
1747  auto perms = permissions.database_permissions_;
1748  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1749  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1750  (perms.view_sql_editor_ &&
1752  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1753  return false;
1754  } else {
1755  return true;
1756  }
1757 }
1758 
1760  const TDBObjectPermissions& permissions) {
1761  if (!permissions.__isset.table_permissions_) {
1762  THROW_DB_EXCEPTION("Table permissions not set for check.")
1763  }
1764  auto perms = permissions.table_permissions_;
1765  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1766  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1767  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1768  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1769  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1770  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1771  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1772  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1773  return false;
1774  } else {
1775  return true;
1776  }
1777 }
1778 
1780  const TDBObjectPermissions& permissions) {
1781  if (!permissions.__isset.dashboard_permissions_) {
1782  THROW_DB_EXCEPTION("Dashboard permissions not set for check.")
1783  }
1784  auto perms = permissions.dashboard_permissions_;
1785  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1786  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1787  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1788  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1789  return false;
1790  } else {
1791  return true;
1792  }
1793 }
1794 
1796  const TDBObjectPermissions& permissions) {
1797  if (!permissions.__isset.view_permissions_) {
1798  THROW_DB_EXCEPTION("View permissions not set for check.")
1799  }
1800  auto perms = permissions.view_permissions_;
1801  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1802  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1803  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1804  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1805  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1806  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1807  return false;
1808  } else {
1809  return true;
1810  }
1811 }
1812 
1814  const TDBObjectPermissions& permissions) {
1815  CHECK(permissions.__isset.server_permissions_);
1816  auto perms = permissions.server_permissions_;
1817  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
1818  (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
1819  (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER)) ||
1820  (perms.usage_ && !privs.hasPermission(ServerPrivileges::SERVER_USAGE))) {
1821  return false;
1822  } else {
1823  return true;
1824  }
1825 }
1826 
1827 bool DBHandler::has_object_privilege(const TSessionId& sessionId,
1828  const std::string& granteeName,
1829  const std::string& objectName,
1830  const TDBObjectType::type objectType,
1831  const TDBObjectPermissions& permissions) {
1832  auto stdlog = STDLOG(get_session_ptr(sessionId));
1833  auto session_ptr = stdlog.getConstSessionInfo();
1834  auto const& cat = session_ptr->getCatalog();
1835  auto const& current_user = session_ptr->get_currentUser();
1836  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1837  current_user.userName, granteeName, false)) {
1839  "Users except superusers can only check privileges for self or roles granted "
1840  "to "
1841  "them.")
1842  }
1844  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1845  user_meta.isSuper) {
1846  return true;
1847  }
1848  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1849  if (!grnt) {
1850  THROW_DB_EXCEPTION("User or Role " + granteeName + " does not exist.")
1851  }
1853  std::string func_name;
1854  switch (objectType) {
1857  func_name = "database";
1858  break;
1861  func_name = "table";
1862  break;
1865  func_name = "dashboard";
1866  break;
1869  func_name = "view";
1870  break;
1873  func_name = "server";
1874  break;
1875  default:
1876  THROW_DB_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1877  }
1878  DBObject req_object(objectName, type);
1879  req_object.loadKey(cat);
1880 
1881  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1882  if (grantee_object) {
1883  // if grantee has privs on the object
1884  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1885  } else {
1886  // no privileges on that object
1887  return false;
1888  }
1889 }
1890 
1891 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1892  const TSessionId& sessionId,
1893  const std::string& roleName) {
1894  auto stdlog = STDLOG(get_session_ptr(sessionId));
1895  auto session_ptr = stdlog.getConstSessionInfo();
1896  auto const& user = session_ptr->get_currentUser();
1897  if (!user.isSuper &&
1898  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1899  return;
1900  }
1901  auto* rl = SysCatalog::instance().getGrantee(roleName);
1902  if (rl) {
1903  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1904  for (auto& dbObject : *rl->getDbObjects(true)) {
1905  if (dbObject.first.dbId != dbId) {
1906  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1907  // usecase for now, though)
1908  continue;
1909  }
1910  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1911  TDBObjectsForRole.push_back(tdbObject);
1912  }
1913  } else {
1914  THROW_DB_EXCEPTION("User or role " + roleName + " does not exist.");
1915  }
1916 }
1917 
1918 void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
1919  const TSessionId& sessionId,
1920  const std::string& objectName,
1921  const TDBObjectType::type type) {
1922  auto stdlog = STDLOG(get_session_ptr(sessionId));
1923  auto session_ptr = stdlog.getConstSessionInfo();
1924  const auto& cat = session_ptr->getCatalog();
1925  DBObjectType object_type;
1926  switch (type) {
1928  object_type = DBObjectType::DatabaseDBObjectType;
1929  break;
1931  object_type = DBObjectType::TableDBObjectType;
1932  break;
1935  break;
1937  object_type = DBObjectType::ViewDBObjectType;
1938  break;
1940  object_type = DBObjectType::ServerDBObjectType;
1941  break;
1942  default:
1943  THROW_DB_EXCEPTION("Failed to get object privileges for " + objectName +
1944  ": unknown object type (" + std::to_string(type) + ").");
1945  }
1946  DBObject object_to_find(objectName, object_type);
1947 
1948  // TODO(adb): Use DatabaseLock to protect method
1949  try {
1950  if (object_type == DashboardDBObjectType) {
1951  if (objectName == "") {
1952  object_to_find = DBObject(-1, object_type);
1953  } else {
1954  object_to_find = DBObject(std::stoi(objectName), object_type);
1955  }
1956  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
1957  !objectName.empty()) {
1958  // special handling for view / table
1959  auto td = cat.getMetadataForTable(objectName, false);
1960  if (td) {
1961  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1962  object_to_find = DBObject(objectName, object_type);
1963  }
1964  }
1965  object_to_find.loadKey(cat);
1966  } catch (const std::exception&) {
1967  THROW_DB_EXCEPTION("Object with name " + objectName + " does not exist.");
1968  }
1969 
1970  // object type on database level
1971  DBObject object_to_find_dblevel("", object_type);
1972  object_to_find_dblevel.loadKey(cat);
1973  // if user is superuser respond with a full priv
1974  if (session_ptr->get_currentUser().isSuper) {
1975  // using ALL_TABLE here to set max permissions
1976  DBObject dbObj{object_to_find.getObjectKey(),
1978  session_ptr->get_currentUser().userId};
1979  dbObj.setName("super");
1980  TDBObjects.push_back(
1981  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
1982  };
1983 
1984  std::vector<std::string> grantees =
1985  SysCatalog::instance().getRoles(true,
1986  session_ptr->get_currentUser().isSuper,
1987  session_ptr->get_currentUser().userName);
1988  for (const auto& grantee : grantees) {
1989  DBObject* object_found;
1990  auto* gr = SysCatalog::instance().getGrantee(grantee);
1991  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
1992  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1993  }
1994  // check object permissions on Database level
1995  if (gr &&
1996  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
1997  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1998  }
1999  }
2000 }
2001 
2003  std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr,
2004  std::vector<std::string>& roles,
2005  const TSessionId& sessionId,
2006  const std::string& granteeName,
2007  bool effective) {
2008  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
2009  if (grantee) {
2010  if (session_ptr->get_currentUser().isSuper) {
2011  roles = grantee->getRoles(/*only_direct=*/!effective);
2012  } else if (grantee->isUser()) {
2013  if (session_ptr->get_currentUser().userName == granteeName) {
2014  roles = grantee->getRoles(/*only_direct=*/!effective);
2015  } else {
2017  "Only a superuser is authorized to request list of roles granted to another "
2018  "user.");
2019  }
2020  } else {
2021  CHECK(!grantee->isUser());
2022  // granteeName is actually a roleName here and we can check a role
2023  // only if it is granted to us
2024  if (SysCatalog::instance().isRoleGrantedToGrantee(
2025  session_ptr->get_currentUser().userName, granteeName, false)) {
2026  roles = grantee->getRoles(/*only_direct=*/!effective);
2027  } else {
2028  THROW_DB_EXCEPTION("A user can check only roles granted to him.");
2029  }
2030  }
2031  } else {
2032  THROW_DB_EXCEPTION("Grantee " + granteeName + " does not exist.");
2033  }
2034 }
2035 
2036 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
2037  const TSessionId& sessionId,
2038  const std::string& granteeName) {
2039  // WARNING: This function only returns directly granted roles.
2040  // See also: get_all_effective_roles_for_user() for all of a user's roles.
2041  auto stdlog = STDLOG(get_session_ptr(sessionId));
2042  auto session_ptr = stdlog.getConstSessionInfo();
2043  getAllRolesForUserImpl(session_ptr, roles, sessionId, granteeName, /*effective=*/false);
2044 }
2045 
2046 void DBHandler::get_all_effective_roles_for_user(std::vector<std::string>& roles,
2047  const TSessionId& sessionId,
2048  const std::string& granteeName) {
2049  auto stdlog = STDLOG(get_session_ptr(sessionId));
2050  auto session_ptr = stdlog.getConstSessionInfo();
2051  getAllRolesForUserImpl(session_ptr, roles, sessionId, granteeName, /*effective=*/true);
2052 }
2053 
2054 namespace {
2056  const std::map<std::string, std::vector<std::string>>& table_col_names) {
2057  std::ostringstream oss;
2058  for (const auto& [table_name, col_names] : table_col_names) {
2059  oss << ":" << table_name;
2060  for (const auto& col_name : col_names) {
2061  oss << "," << col_name;
2062  }
2063  }
2064  return oss.str();
2065 }
2066 } // namespace
2067 
2069  TPixelTableRowResult& _return,
2070  const TSessionId& session,
2071  const int64_t widget_id,
2072  const TPixel& pixel,
2073  const std::map<std::string, std::vector<std::string>>& table_col_names,
2074  const bool column_format,
2075  const int32_t pixel_radius,
2076  const std::string& nonce) {
2077  auto stdlog = STDLOG(get_session_ptr(session),
2078  "widget_id",
2079  widget_id,
2080  "pixel.x",
2081  pixel.x,
2082  "pixel.y",
2083  pixel.y,
2084  "column_format",
2085  column_format,
2086  "pixel_radius",
2087  pixel_radius,
2088  "table_col_names",
2089  dump_table_col_names(table_col_names),
2090  "nonce",
2091  nonce);
2092  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2093  auto session_ptr = stdlog.getSessionInfo();
2094  if (!render_handler_) {
2095  THROW_DB_EXCEPTION("Backend rendering is disabled.");
2096  }
2097 
2098  try {
2099  render_handler_->get_result_row_for_pixel(_return,
2100  session_ptr,
2101  widget_id,
2102  pixel,
2103  table_col_names,
2104  column_format,
2105  pixel_radius,
2106  nonce);
2107  } catch (std::exception& e) {
2108  THROW_DB_EXCEPTION(e.what());
2109  }
2110 }
2111 
2113  const ColumnDescriptor* cd) {
2114  TColumnType col_type;
2115  col_type.col_name = cd->columnName;
2116  col_type.src_name = cd->sourceName;
2117  col_type.col_id = cd->columnId;
2118  col_type.col_type.type = type_to_thrift(cd->columnType);
2119  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
2120  col_type.col_type.nullable = !cd->columnType.get_notnull();
2121  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
2122  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
2123  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
2124  }
2125  if (IS_GEO(cd->columnType.get_type())) {
2127  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
2128  } else {
2129  col_type.col_type.precision = cd->columnType.get_precision();
2130  col_type.col_type.scale = cd->columnType.get_scale();
2131  }
2132  col_type.is_system = cd->isSystemCol;
2134  cat != nullptr) {
2135  // have to get the actual size of the encoding from the dictionary definition
2136  const int dict_id = cd->columnType.get_comp_param();
2137  if (!cat->getMetadataForDict(dict_id, false)) {
2138  col_type.col_type.comp_param = 0;
2139  return col_type;
2140  }
2141  auto dd = cat->getMetadataForDict(dict_id, false);
2142  if (!dd) {
2143  THROW_DB_EXCEPTION("Dictionary doesn't exist");
2144  }
2145  col_type.col_type.comp_param = dd->dictNBits;
2146  } else {
2147  col_type.col_type.comp_param =
2148  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
2149  ? 32
2150  : cd->columnType.get_comp_param();
2151  }
2152  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
2153  if (cd->default_value.has_value()) {
2154  col_type.__set_default_value(cd->getDefaultValueLiteral());
2155  }
2156  return col_type;
2157 }
2158 
2159 void DBHandler::get_internal_table_details(TTableDetails& _return,
2160  const TSessionId& session,
2161  const std::string& table_name,
2162  const bool include_system_columns) {
2163  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2164  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2165  get_table_details_impl(_return, stdlog, table_name, include_system_columns, false);
2166 }
2167 
2169  TTableDetails& _return,
2170  const TSessionId& session,
2171  const std::string& table_name,
2172  const std::string& database_name) {
2173  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2174  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2175  get_table_details_impl(_return, stdlog, table_name, true, false, database_name);
2176 }
2177 
2178 void DBHandler::get_table_details(TTableDetails& _return,
2179  const TSessionId& session,
2180  const std::string& table_name) {
2181  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2182  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2183 
2184  auto execute_read_lock =
2188  get_table_details_impl(_return, stdlog, table_name, false, false);
2189 }
2190 
2191 void DBHandler::get_table_details_for_database(TTableDetails& _return,
2192  const TSessionId& session,
2193  const std::string& table_name,
2194  const std::string& database_name) {
2195  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2196  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2197 
2198  auto execute_read_lock =
2202  get_table_details_impl(_return, stdlog, table_name, false, false, database_name);
2203 }
2204 
2205 namespace {
2206 TTableRefreshInfo get_refresh_info(const TableDescriptor* td) {
2207  CHECK(td->isForeignTable());
2208  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
2209  CHECK(foreign_table);
2210  TTableRefreshInfo refresh_info;
2211  const auto& update_type =
2213  CHECK(update_type.has_value());
2214  if (update_type.value() == foreign_storage::ForeignTable::ALL_REFRESH_UPDATE_TYPE) {
2215  refresh_info.update_type = TTableRefreshUpdateType::ALL;
2216  } else if (update_type.value() ==
2218  refresh_info.update_type = TTableRefreshUpdateType::APPEND;
2219  } else {
2220  UNREACHABLE() << "Unexpected refresh update type: " << update_type.value();
2221  }
2222 
2223  const auto& timing_type =
2225  CHECK(timing_type.has_value());
2226  if (timing_type.value() == foreign_storage::ForeignTable::MANUAL_REFRESH_TIMING_TYPE) {
2227  refresh_info.timing_type = TTableRefreshTimingType::MANUAL;
2228  refresh_info.interval_count = -1;
2229  } else if (timing_type.value() ==
2231  refresh_info.timing_type = TTableRefreshTimingType::SCHEDULED;
2232  const auto& start_date_time = foreign_table->getOption(
2234  CHECK(start_date_time.has_value());
2235  auto start_date_time_epoch = dateTimeParse<kTIMESTAMP>(start_date_time.value(), 0);
2236  refresh_info.start_date_time =
2237  shared::convert_temporal_to_iso_format({kTIMESTAMP}, start_date_time_epoch);
2238  const auto& interval =
2239  foreign_table->getOption(foreign_storage::ForeignTable::REFRESH_INTERVAL_KEY);
2240  CHECK(interval.has_value());
2241  const auto& interval_str = interval.value();
2242  refresh_info.interval_count =
2243  std::stoi(interval_str.substr(0, interval_str.length() - 1));
2244  auto interval_type = std::toupper(interval_str[interval_str.length() - 1]);
2245  if (interval_type == 'H') {
2246  refresh_info.interval_type = TTableRefreshIntervalType::HOUR;
2247  } else if (interval_type == 'D') {
2248  refresh_info.interval_type = TTableRefreshIntervalType::DAY;
2249  } else if (interval_type == 'S') {
2250  // This use case is for development only.
2251  refresh_info.interval_type = TTableRefreshIntervalType::NONE;
2252  } else {
2253  UNREACHABLE() << "Unexpected interval type: " << interval_str;
2254  }
2255  } else {
2256  UNREACHABLE() << "Unexpected refresh timing type: " << timing_type.value();
2257  }
2258  if (foreign_table->last_refresh_time !=
2260  refresh_info.last_refresh_time = shared::convert_temporal_to_iso_format(
2261  {kTIMESTAMP}, foreign_table->last_refresh_time);
2262  }
2263  if (foreign_table->next_refresh_time !=
2265  refresh_info.next_refresh_time = shared::convert_temporal_to_iso_format(
2266  {kTIMESTAMP}, foreign_table->next_refresh_time);
2267  }
2268  return refresh_info;
2269 }
2270 } // namespace
2271 
2272 void DBHandler::get_table_details_impl(TTableDetails& _return,
2273  query_state::StdLog& stdlog,
2274  const std::string& table_name,
2275  const bool get_system,
2276  const bool get_physical,
2277  const std::string& database_name) {
2278  try {
2279  auto session_info = stdlog.getSessionInfo();
2280  auto cat = (database_name.empty())
2281  ? &session_info->getCatalog()
2282  : SysCatalog::instance().getCatalog(database_name).get();
2283  if (!cat) {
2284  THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
2285  }
2286  const auto td_with_lock =
2288  *cat, table_name, false);
2289  const auto td = td_with_lock();
2290  CHECK(td);
2291 
2292  bool have_privileges_on_view_sources = true;
2293  if (td->isView) {
2294  auto query_state = create_query_state(session_info, td->viewSQL);
2295  stdlog.setQueryState(query_state);
2296  try {
2297  if (hasTableAccessPrivileges(td, *session_info)) {
2298  const auto [query_ra, locks] = parse_to_ra(query_state->createQueryStateProxy(),
2299  query_state->getQueryStr(),
2300  {},
2301  true,
2303  false);
2304  try {
2305  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2306  query_ra);
2307  } catch (const std::runtime_error&) {
2308  have_privileges_on_view_sources = false;
2309  }
2310 
2311  const auto result = validate_rel_alg(query_ra.plan_result,
2312  query_state->createQueryStateProxy());
2313 
2314  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, *cat);
2315  } else {
2316  throw std::runtime_error(
2317  "Unable to access view " + table_name +
2318  ". The view may not exist, or the logged in user may not "
2319  "have permission to access the view.");
2320  }
2321  } catch (const std::exception& e) {
2322  throw std::runtime_error("View '" + table_name +
2323  "' query has failed with an error: '" +
2324  std::string(e.what()) +
2325  "'.\nThe view must be dropped and re-created to "
2326  "resolve the error. \nQuery:\n" +
2327  query_state->getQueryStr());
2328  }
2329  } else {
2330  if (hasTableAccessPrivileges(td, *session_info)) {
2331  const auto col_descriptors = cat->getAllColumnMetadataForTable(
2332  td->tableId, get_system, true, get_physical);
2333  const auto deleted_cd = cat->getDeletedColumn(td);
2334  for (const auto cd : col_descriptors) {
2335  if (cd == deleted_cd) {
2336  continue;
2337  }
2338  _return.row_desc.push_back(populateThriftColumnType(cat, cd));
2339  }
2340  } else {
2341  throw std::runtime_error(
2342  "Unable to access table " + table_name +
2343  ". The table may not exist, or the logged in user may not "
2344  "have permission to access the table.");
2345  }
2346  }
2347  _return.fragment_size = td->maxFragRows;
2348  _return.page_size = td->fragPageSize;
2349  _return.max_rows = td->maxRows;
2350  _return.view_sql =
2351  (have_privileges_on_view_sources ? td->viewSQL
2352  : "[Not enough privileges to see the view SQL]");
2353  _return.shard_count = td->nShards;
2354  _return.key_metainfo = td->keyMetainfo;
2355  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
2356  _return.partition_detail =
2357  td->partitions.empty()
2358  ? TPartitionDetail::DEFAULT
2359  : (table_is_replicated(td)
2360  ? TPartitionDetail::REPLICATED
2361  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
2362  : TPartitionDetail::OTHER));
2363  if (td->isView) {
2364  _return.table_type = TTableType::VIEW;
2365  } else if (td->isTemporaryTable()) {
2366  _return.table_type = TTableType::TEMPORARY;
2367  } else if (td->isForeignTable()) {
2368  _return.table_type = TTableType::FOREIGN;
2369  _return.refresh_info = get_refresh_info(td);
2370  } else {
2371  _return.table_type = TTableType::DEFAULT;
2372  }
2373 
2374  } catch (const std::runtime_error& e) {
2375  THROW_DB_EXCEPTION(std::string(e.what()));
2376  }
2377 }
2378 
2379 void DBHandler::get_link_view(TFrontendView& _return,
2380  const TSessionId& session,
2381  const std::string& link) {
2382  auto stdlog = STDLOG(get_session_ptr(session));
2383  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2384  auto session_ptr = stdlog.getConstSessionInfo();
2385  auto const& cat = session_ptr->getCatalog();
2386  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2387  if (!ld) {
2388  THROW_DB_EXCEPTION("Link " + link + " is not valid.");
2389  }
2390  _return.view_state = ld->viewState;
2391  _return.view_name = ld->link;
2392  _return.update_time = ld->updateTime;
2393  _return.view_metadata = ld->viewMetadata;
2394 }
2395 
2397  const TableDescriptor* td,
2398  const Catalog_Namespace::SessionInfo& session_info) {
2399  auto& cat = session_info.getCatalog();
2400  auto user_metadata = session_info.get_currentUser();
2401 
2402  if (user_metadata.isSuper) {
2403  return true;
2404  }
2405 
2407  dbObject.loadKey(cat);
2408  std::vector<DBObject> privObjects = {dbObject};
2409 
2410  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2411 }
2412 
2413 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2414  const Catalog_Namespace::SessionInfo& session_info,
2415  const GetTablesType get_tables_type,
2416  const std::string& database_name) {
2417  if (database_name.empty()) {
2418  table_names = session_info.getCatalog().getTableNamesForUser(
2419  session_info.get_currentUser(), get_tables_type);
2420  } else {
2421  auto request_cat = SysCatalog::instance().getCatalog(database_name);
2422  if (!request_cat) {
2423  THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
2424  }
2425  table_names = request_cat->getTableNamesForUser(session_info.get_currentUser(),
2426  get_tables_type);
2427  }
2428 }
2429 
2430 void DBHandler::get_tables(std::vector<std::string>& table_names,
2431  const TSessionId& session) {
2432  auto stdlog = STDLOG(get_session_ptr(session));
2433  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2435  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2436 }
2437 
2438 void DBHandler::get_tables_for_database(std::vector<std::string>& table_names,
2439  const TSessionId& session,
2440  const std::string& database_name) {
2441  auto stdlog = STDLOG(get_session_ptr(session));
2442  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2443 
2444  get_tables_impl(table_names,
2445  *stdlog.getConstSessionInfo(),
2447  database_name);
2448 }
2449 
2450 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2451  const TSessionId& session) {
2452  auto stdlog = STDLOG(get_session_ptr(session));
2453  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2454  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2455 }
2456 
2457 void DBHandler::get_views(std::vector<std::string>& table_names,
2458  const TSessionId& session) {
2459  auto stdlog = STDLOG(get_session_ptr(session));
2460  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2461  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2462 }
2463 
2464 void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
2465  QueryStateProxy query_state_proxy,
2466  const Catalog_Namespace::SessionInfo& session_info,
2467  const bool with_table_locks) {
2468  const auto& cat = session_info.getCatalog();
2469  // Get copies of table descriptors here in order to avoid possible use of dangling
2470  // pointers, if tables are concurrently dropped.
2471  const auto tables = cat.getAllTableMetadataCopy();
2472  _return.reserve(tables.size());
2473 
2474  for (const auto& td : tables) {
2475  if (td.shard >= 0) {
2476  // skip shards, they're not standalone tables
2477  continue;
2478  }
2479  if (!hasTableAccessPrivileges(&td, session_info)) {
2480  // skip table, as there are no privileges to access it
2481  continue;
2482  }
2483 
2484  TTableMeta ret;
2485  ret.table_name = td.tableName;
2486  ret.is_view = td.isView;
2487  ret.is_replicated = table_is_replicated(&td);
2488  ret.shard_count = td.nShards;
2489  ret.max_rows = td.maxRows;
2490  ret.table_id = td.tableId;
2491 
2492  std::vector<TTypeInfo> col_types;
2493  std::vector<std::string> col_names;
2494  size_t num_cols = 0;
2495  if (td.isView) {
2496  try {
2497  TPlanResult parse_result;
2499  std::tie(parse_result, locks) = parse_to_ra(
2500  query_state_proxy, td.viewSQL, {}, with_table_locks, system_parameters_);
2501  const auto query_ra = parse_result.plan_result;
2502 
2503  ExecutionResult ex_result;
2504  execute_rel_alg(ex_result,
2505  query_state_proxy,
2506  query_ra,
2507  true,
2509  -1,
2510  -1,
2511  /*just_validate=*/true,
2512  /*find_push_down_candidates=*/false,
2513  ExplainInfo());
2514  TQueryResult result;
2515  DBHandler::convertData(result, ex_result, query_state_proxy, true, -1, -1);
2516  num_cols = result.row_set.row_desc.size();
2517  for (const auto& col : result.row_set.row_desc) {
2518  if (col.is_physical) {
2519  num_cols--;
2520  continue;
2521  }
2522  col_types.push_back(col.col_type);
2523  col_names.push_back(col.col_name);
2524  }
2525  } catch (std::exception& e) {
2526  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td.tableName;
2527  }
2528  } else {
2529  try {
2530  if (hasTableAccessPrivileges(&td, session_info)) {
2531  const auto col_descriptors =
2532  cat.getAllColumnMetadataForTable(td.tableId, false, true, false);
2533  const auto deleted_cd = cat.getDeletedColumn(&td);
2534  for (const auto cd : col_descriptors) {
2535  if (cd == deleted_cd) {
2536  continue;
2537  }
2538  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2539  col_names.push_back(cd->columnName);
2540  }
2541  num_cols = col_descriptors.size();
2542  } else {
2543  continue;
2544  }
2545  } catch (const std::runtime_error& e) {
2546  THROW_DB_EXCEPTION(e.what());
2547  }
2548  }
2549 
2550  ret.num_cols = num_cols;
2551  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2552  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2553 
2554  _return.push_back(ret);
2555  }
2556 }
2557 
2558 void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
2559  const TSessionId& session) {
2560  auto stdlog = STDLOG(get_session_ptr(session));
2561  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2562  auto session_ptr = stdlog.getConstSessionInfo();
2563  auto query_state = create_query_state(session_ptr, "");
2564  stdlog.setQueryState(query_state);
2565 
2566  auto execute_read_lock =
2570 
2571  try {
2572  get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
2573  } catch (const std::exception& e) {
2574  THROW_DB_EXCEPTION(e.what());
2575  }
2576 }
2577 
2578 void DBHandler::get_users(std::vector<std::string>& user_names,
2579  const TSessionId& session) {
2580  auto stdlog = STDLOG(get_session_ptr(session));
2581  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2582  auto session_ptr = stdlog.getConstSessionInfo();
2583  std::list<Catalog_Namespace::UserMetadata> user_list;
2584 
2585  if (!session_ptr->get_currentUser().isSuper) {
2586  user_list = SysCatalog::instance().getAllUserMetadata(
2587  session_ptr->getCatalog().getCurrentDB().dbId);
2588  } else {
2589  user_list = SysCatalog::instance().getAllUserMetadata();
2590  }
2591  for (auto u : user_list) {
2592  user_names.push_back(u.userName);
2593  }
2594 }
2595 
2596 void DBHandler::get_version(std::string& version) {
2597  version = MAPD_RELEASE;
2598 }
2599 
2600 void DBHandler::clear_gpu_memory(const TSessionId& session) {
2601  auto stdlog = STDLOG(get_session_ptr(session));
2602  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2603  auto session_ptr = stdlog.getConstSessionInfo();
2604  if (!session_ptr->get_currentUser().isSuper) {
2605  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2606  }
2607  try {
2609  } catch (const std::exception& e) {
2610  THROW_DB_EXCEPTION(e.what());
2611  }
2612  if (render_handler_) {
2613  render_handler_->clear_gpu_memory();
2614  }
2615 }
2616 
2617 void DBHandler::clear_cpu_memory(const TSessionId& session) {
2618  auto stdlog = STDLOG(get_session_ptr(session));
2619  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2620  auto session_ptr = stdlog.getConstSessionInfo();
2621  if (!session_ptr->get_currentUser().isSuper) {
2622  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2623  }
2624  try {
2626  } catch (const std::exception& e) {
2627  THROW_DB_EXCEPTION(e.what());
2628  }
2629  if (render_handler_) {
2630  render_handler_->clear_cpu_memory();
2631  }
2632 }
2633 
2634 void DBHandler::clearRenderMemory(const TSessionId& session) {
2635  auto stdlog = STDLOG(get_session_ptr(session));
2636  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2637  auto session_ptr = stdlog.getConstSessionInfo();
2638  if (!session_ptr->get_currentUser().isSuper) {
2639  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_render_memory");
2640  }
2641  if (render_handler_) {
2642  render_handler_->clear_cpu_memory();
2643  render_handler_->clear_gpu_memory();
2644  }
2645 }
2646 
2647 void DBHandler::set_cur_session(const TSessionId& parent_session,
2648  const TSessionId& leaf_session,
2649  const std::string& start_time_str,
2650  const std::string& label,
2651  bool for_running_query_kernel) {
2652  // internal API to manage query interruption in distributed mode
2653  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2654  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2655  auto session_ptr = stdlog.getConstSessionInfo();
2656 
2658  executor->enrollQuerySession(parent_session,
2659  label,
2660  start_time_str,
2662  for_running_query_kernel
2663  ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
2664  : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
2665 }
2666 
2667 void DBHandler::invalidate_cur_session(const TSessionId& parent_session,
2668  const TSessionId& leaf_session,
2669  const std::string& start_time_str,
2670  const std::string& label,
2671  bool for_running_query_kernel) {
2672  // internal API to manage query interruption in distributed mode
2673  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2674  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2676  executor->clearQuerySessionStatus(parent_session, start_time_str);
2677 }
2678 
2680  return INVALID_SESSION_ID;
2681 }
2682 
2683 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2684  const TSessionId& session,
2685  const std::string& memory_level) {
2686  auto stdlog = STDLOG(get_session_ptr(session));
2687  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2688  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2689  Data_Namespace::MemoryLevel mem_level;
2690  if (!memory_level.compare("gpu")) {
2692  internal_memory =
2693  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2694  } else {
2696  internal_memory =
2697  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2698  }
2699 
2700  for (auto memInfo : internal_memory) {
2701  TNodeMemoryInfo nodeInfo;
2702  nodeInfo.page_size = memInfo.pageSize;
2703  nodeInfo.max_num_pages = memInfo.maxNumPages;
2704  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2705  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2706  for (auto gpu : memInfo.nodeMemoryData) {
2707  TMemoryData md;
2708  md.slab = gpu.slabNum;
2709  md.start_page = gpu.startPage;
2710  md.num_pages = gpu.numPages;
2711  md.touch = gpu.touch;
2712  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2713  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2714  nodeInfo.node_memory_data.push_back(md);
2715  }
2716  _return.push_back(nodeInfo);
2717  }
2718 }
2719 
2720 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
2721  auto stdlog = STDLOG(get_session_ptr(session));
2722  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2723  auto session_ptr = stdlog.getConstSessionInfo();
2724  const auto& user = session_ptr->get_currentUser();
2726  SysCatalog::instance().getDatabaseListForUser(user);
2727  for (auto& db : dbs) {
2728  TDBInfo dbinfo;
2729  dbinfo.db_name = std::move(db.dbName);
2730  dbinfo.db_owner = std::move(db.dbOwnerName);
2731  dbinfos.push_back(std::move(dbinfo));
2732  }
2733 }
2734 
2736  auto executor = get_session_ptr(session)->get_executor_device_type();
2737  switch (executor) {
2739  return TExecuteMode::CPU;
2741  return TExecuteMode::GPU;
2742  default:
2743  UNREACHABLE();
2744  }
2745  UNREACHABLE();
2746  return TExecuteMode::CPU;
2747 }
2748 
2749 void DBHandler::set_execution_mode(const TSessionId& session,
2750  const TExecuteMode::type mode) {
2751  auto session_ptr = get_session_ptr(session);
2752  auto stdlog = STDLOG(session_ptr);
2753  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2754  DBHandler::set_execution_mode_nolock(session_ptr.get(), mode);
2755 }
2756 
2757 namespace {
2758 
2760  if (td && td->nShards) {
2761  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2762  }
2763 }
2764 
2765 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
2766  const std::vector<std::string>& column_names) {
2767  std::unordered_set<std::string> unique_names;
2768  for (const auto& name : column_names) {
2769  auto lower_name = to_lower(name);
2770  if (unique_names.find(lower_name) != unique_names.end()) {
2771  THROW_DB_EXCEPTION("Column " + name + " is mentioned multiple times");
2772  } else {
2773  unique_names.insert(lower_name);
2774  }
2775  }
2776  for (const auto& cd : descs) {
2777  auto iter = unique_names.find(to_lower(cd->columnName));
2778  if (iter != unique_names.end()) {
2779  unique_names.erase(iter);
2780  }
2781  }
2782  if (!unique_names.empty()) {
2783  THROW_DB_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
2784  }
2785 }
2786 
2787 // Return vector of IDs mapping column descriptors to the list of comumn names.
2788 // The size of the vector is the number of actual columns (geophisical columns excluded).
2789 // ID is either a position in column_names matching the descriptor, or -1 if the column
2790 // is missing from the column_names
2791 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
2792  const std::vector<std::string>& column_names) {
2793  std::vector<int> desc_to_column_ids;
2794  if (column_names.empty()) {
2795  int col_idx = 0;
2796  for (const auto& cd : descs) {
2797  if (!cd->isGeoPhyCol) {
2798  desc_to_column_ids.push_back(col_idx);
2799  ++col_idx;
2800  }
2801  }
2802  } else {
2803  for (const auto& cd : descs) {
2804  if (!cd->isGeoPhyCol) {
2805  bool found = false;
2806  for (size_t j = 0; j < column_names.size(); ++j) {
2807  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
2808  found = true;
2809  desc_to_column_ids.push_back(j);
2810  break;
2811  }
2812  }
2813  if (!found) {
2814  if (!cd->columnType.get_notnull()) {
2815  desc_to_column_ids.push_back(-1);
2816  } else {
2817  THROW_DB_EXCEPTION("Column '" + cd->columnName +
2818  "' cannot be omitted due to NOT NULL constraint");
2819  }
2820  }
2821  }
2822  }
2823  }
2824  return desc_to_column_ids;
2825 }
2826 
2827 } // namespace
2828 
2830  const TSessionId& session,
2831  const Catalog& catalog,
2832  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
2833  const ColumnDescriptor* cd,
2834  size_t& col_idx,
2835  size_t num_rows,
2836  const std::string& table_name,
2837  bool assign_render_groups) {
2838  auto geo_col_idx = col_idx - 1;
2839  const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
2840  std::vector<std::vector<double>> coords_column, bounds_column;
2841  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2842  std::vector<int> render_groups_column;
2843  SQLTypeInfo ti = cd->columnType;
2844  if (num_rows != wkt_or_wkb_hex_column->size() ||
2845  !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
2846  ti,
2847  coords_column,
2848  bounds_column,
2849  ring_sizes_column,
2850  poly_rings_column,
2851  true)) {
2852  std::ostringstream oss;
2853  oss << "Invalid geometry in column " << cd->columnName;
2854  THROW_DB_EXCEPTION(oss.str());
2855  }
2856 
2857  // start or continue assigning render groups for poly columns?
2858  if (IS_GEO_POLY(cd->columnType.get_type()) && assign_render_groups &&
2860  // get RGA to use
2861  import_export::RenderGroupAnalyzer* render_group_analyzer{};
2862  {
2863  // mutex the map access
2864  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
2865 
2866  // emplace new RGA or fetch existing RGA from map
2867  auto [itr_table, emplaced_table] = render_group_assignment_map_.try_emplace(
2868  session, RenderGroupAssignmentTableMap());
2869  LOG_IF(INFO, emplaced_table)
2870  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2871  "Persistent Data for Session '"
2872  << session << "'";
2873  auto [itr_column, emplaced_column] =
2874  itr_table->second.try_emplace(table_name, RenderGroupAssignmentColumnMap());
2875  LOG_IF(INFO, emplaced_column)
2876  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2877  "Persistent Data for Table '"
2878  << table_name << "'";
2879  auto [itr_analyzer, emplaced_analyzer] = itr_column->second.try_emplace(
2880  cd->columnName, std::make_unique<import_export::RenderGroupAnalyzer>());
2881  LOG_IF(INFO, emplaced_analyzer)
2882  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2883  "Persistent Data for Column '"
2884  << cd->columnName << "'";
2885  render_group_analyzer = itr_analyzer->second.get();
2886  CHECK(render_group_analyzer);
2887 
2888  // seed new RGA from existing table/column, to handle appends
2889  if (emplaced_analyzer) {
2890  LOG(INFO) << "load_table_binary_columnar_polys: Seeding Render Groups from "
2891  "existing table...";
2892  render_group_analyzer->seedFromExistingTableContents(
2893  catalog, table_name, cd->columnName);
2894  LOG(INFO) << "load_table_binary_columnar_polys: Done";
2895  }
2896  }
2897 
2898  // assign render groups for this set of bounds
2899  LOG(INFO) << "load_table_binary_columnar_polys: Assigning Render Groups...";
2900  render_groups_column.reserve(bounds_column.size());
2901  for (auto const& bounds : bounds_column) {
2902  CHECK_EQ(bounds.size(), 4u);
2903  int rg = render_group_analyzer->insertBoundsAndReturnRenderGroup(bounds);
2904  render_groups_column.push_back(rg);
2905  }
2906  LOG(INFO) << "load_table_binary_columnar_polys: Done";
2907  } else {
2908  // render groups all zero
2909  render_groups_column.resize(bounds_column.size(), 0);
2910  }
2911 
2912  // Populate physical columns, advance col_idx
2914  cd,
2915  import_buffers,
2916  col_idx,
2917  coords_column,
2918  bounds_column,
2919  ring_sizes_column,
2920  poly_rings_column,
2921  render_groups_column);
2922 }
2923 
2925  const TSessionId& session,
2926  const Catalog& catalog,
2927  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
2928  const std::list<const ColumnDescriptor*>& cds,
2929  const std::vector<int>& desc_id_to_column_id,
2930  size_t num_rows,
2931  const std::string& table_name,
2932  bool assign_render_groups) {
2933  size_t skip_physical_cols = 0;
2934  size_t col_idx = 0, import_idx = 0;
2935  for (const auto& cd : cds) {
2936  if (skip_physical_cols > 0) {
2937  CHECK(cd->isGeoPhyCol);
2938  skip_physical_cols--;
2939  continue;
2940  } else if (cd->columnType.is_geometry()) {
2941  skip_physical_cols = cd->columnType.get_physical_cols();
2942  }
2943  if (desc_id_to_column_id[import_idx] == -1) {
2944  import_buffers[col_idx]->addDefaultValues(cd, num_rows);
2945  col_idx++;
2946  if (cd->columnType.is_geometry()) {
2947  fillGeoColumns(session,
2948  catalog,
2949  import_buffers,
2950  cd,
2951  col_idx,
2952  num_rows,
2953  table_name,
2954  assign_render_groups);
2955  }
2956  } else {
2957  col_idx++;
2958  col_idx += skip_physical_cols;
2959  }
2960  import_idx++;
2961  }
2962 }
2963 
2964 void DBHandler::load_table_binary(const TSessionId& session,
2965  const std::string& table_name,
2966  const std::vector<TRow>& rows,
2967  const std::vector<std::string>& column_names) {
2968  try {
2969  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2970  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2971  auto session_ptr = stdlog.getConstSessionInfo();
2972 
2973  if (rows.empty()) {
2974  THROW_DB_EXCEPTION("No rows to insert");
2975  }
2976 
2977  const auto execute_read_lock =
2981  std::unique_ptr<import_export::Loader> loader;
2982  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2983  auto schema_read_lock = prepare_loader_generic(*session_ptr,
2984  table_name,
2985  rows.front().cols.size(),
2986  &loader,
2987  &import_buffers,
2988  column_names,
2989  "load_table_binary");
2990 
2991  auto col_descs = loader->get_column_descs();
2992  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
2993 
2994  size_t rows_completed = 0;
2995  for (auto const& row : rows) {
2996  size_t col_idx = 0;
2997  try {
2998  for (auto cd : col_descs) {
2999  auto mapped_idx = desc_id_to_column_id[col_idx];
3000  if (mapped_idx != -1) {
3001  import_buffers[col_idx]->add_value(
3002  cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
3003  }
3004  col_idx++;
3005  }
3006  rows_completed++;
3007  } catch (const std::exception& e) {
3008  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
3009  import_buffers[col_idx_to_pop]->pop_value();
3010  }
3011  LOG(ERROR) << "Input exception thrown: " << e.what()
3012  << ". Row discarded, issue at column : " << (col_idx + 1)
3013  << " data :" << row;
3014  }
3015  }
3016  fillMissingBuffers(session,
3017  session_ptr->getCatalog(),
3018  import_buffers,
3019  col_descs,
3020  desc_id_to_column_id,
3021  rows_completed,
3022  table_name,
3023  false);
3024  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3025  session_ptr->getCatalog(), table_name);
3026  if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
3027  THROW_DB_EXCEPTION(loader->getErrorMessage());
3028  }
3029  } catch (const std::exception& e) {
3030  THROW_DB_EXCEPTION(std::string(e.what()));
3031  }
3032 }
3033 
3034 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
3036  const Catalog_Namespace::SessionInfo& session_info,
3037  const std::string& table_name,
3038  size_t num_cols,
3039  std::unique_ptr<import_export::Loader>* loader,
3040  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
3041  const std::vector<std::string>& column_names,
3042  std::string load_type) {
3043  if (num_cols == 0) {
3044  THROW_DB_EXCEPTION("No columns to insert");
3045  }
3046  check_read_only(load_type);
3047  auto& cat = session_info.getCatalog();
3048  auto td_with_lock =
3049  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
3051  cat, table_name, true));
3052  const auto td = (*td_with_lock)();
3053  CHECK(td);
3054 
3055  if (g_cluster && !leaf_aggregator_.leafCount()) {
3056  // Sharded table rows need to be routed to the leaf by an aggregator.
3058  }
3059  check_table_load_privileges(session_info, table_name);
3060 
3061  loader->reset(new import_export::Loader(cat, td));
3062 
3063  auto col_descs = (*loader)->get_column_descs();
3064  check_valid_column_names(col_descs, column_names);
3065  if (column_names.empty()) {
3066  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
3067  // Subtracting 1 (rowid) until TableDescriptor is updated.
3068  auto geo_physical_cols = std::count_if(
3069  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
3070  const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
3071  (td->hasDeletedCol ? 2 : 1);
3072  if (num_cols != num_table_cols) {
3073  throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
3074  ") does not match number of columns in table " +
3075  td->tableName + " (" + std::to_string(num_table_cols) +
3076  ")");
3077  }
3078  } else if (num_cols != column_names.size()) {
3080  "Number of columns specified does not match the "
3081  "number of columns given (" +
3082  std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
3083  }
3084 
3085  *import_buffers = import_export::setup_column_loaders(td, loader->get());
3086  return std::move(td_with_lock);
3087 }
3088 namespace {
3089 
3090 size_t get_column_size(const TColumn& column) {
3091  if (!column.nulls.empty()) {
3092  return column.nulls.size();
3093  } else {
3094  // it is a very bold estimate but later we check it against REAL data
3095  // and if this function returns a wrong result (e.g. both int and string
3096  // vectors are filled with values), we get an error
3097  return column.data.int_col.size() + column.data.arr_col.size() +
3098  column.data.real_col.size() + column.data.str_col.size();
3099  }
3100 }
3101 
3102 } // namespace
3103 
3104 void DBHandler::load_table_binary_columnar(const TSessionId& session,
3105  const std::string& table_name,
3106  const std::vector<TColumn>& cols,
3107  const std::vector<std::string>& column_names) {
3109  session, table_name, cols, column_names, AssignRenderGroupsMode::kNone);
3110 }
3111 
3113  const TSessionId& session,
3114  const std::string& table_name,
3115  const std::vector<TColumn>& cols,
3116  const std::vector<std::string>& column_names,
3117  const bool assign_render_groups) {
3119  table_name,
3120  cols,
3121  column_names,
3122  assign_render_groups
3125 }
3126 
3128  const TSessionId& session,
3129  const std::string& table_name,
3130  const std::vector<TColumn>& cols,
3131  const std::vector<std::string>& column_names,
3132  const AssignRenderGroupsMode assign_render_groups_mode) {
3133  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3134  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3135  auto session_ptr = stdlog.getConstSessionInfo();
3136 
3137  if (assign_render_groups_mode == AssignRenderGroupsMode::kCleanUp) {
3138  // throw if the user tries to pass column data on a clean-up
3139  if (cols.size()) {
3141  "load_table_binary_columnar_polys: Column data must be empty when called with "
3142  "assign_render_groups = false");
3143  }
3144 
3145  // mutex the map access
3146  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
3147 
3148  // drop persistent render group assignment data for this session and table
3149  // keep the per-session map in case other tables are active (ideally not)
3150  auto itr_session = render_group_assignment_map_.find(session);
3151  if (itr_session != render_group_assignment_map_.end()) {
3152  LOG(INFO) << "load_table_binary_columnar_polys: Cleaning up Render Group "
3153  "Assignment Persistent Data for Session '"
3154  << session << "', Table '" << table_name << "'";
3155  itr_session->second.erase(table_name);
3156  }
3157 
3158  // just doing clean-up, so we're done
3159  return;
3160  }
3161 
3162  const auto execute_read_lock =
3166  std::unique_ptr<import_export::Loader> loader;
3167  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3168  auto schema_read_lock = prepare_loader_generic(*session_ptr,
3169  table_name,
3170  cols.size(),
3171  &loader,
3172  &import_buffers,
3173  column_names,
3174  "load_table_binary_columnar");
3175 
3176  auto desc_id_to_column_id =
3177  column_ids_by_names(loader->get_column_descs(), column_names);
3178  size_t num_rows = get_column_size(cols.front());
3179  size_t import_idx = 0; // index into the TColumn vector being loaded
3180  size_t col_idx = 0; // index into column description vector
3181  try {
3182  size_t skip_physical_cols = 0;
3183  for (auto cd : loader->get_column_descs()) {
3184  if (skip_physical_cols > 0) {
3185  CHECK(cd->isGeoPhyCol);
3186  skip_physical_cols--;
3187  continue;
3188  }
3189  auto mapped_idx = desc_id_to_column_id[import_idx];
3190  if (mapped_idx != -1) {
3191  size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
3192  if (col_rows != num_rows) {
3193  std::ostringstream oss;
3194  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
3195  << cd->columnName << " , expecting " << num_rows << " rows, column "
3196  << col_idx << " has " << col_rows << " rows";
3197  THROW_DB_EXCEPTION(oss.str());
3198  }
3199  // Advance to the next column in the table
3200  col_idx++;
3201  // For geometry columns: process WKT strings and fill physical columns
3202  if (cd->columnType.is_geometry()) {
3203  fillGeoColumns(session,
3204  session_ptr->getCatalog(),
3205  import_buffers,
3206  cd,
3207  col_idx,
3208  num_rows,
3209  table_name,
3210  assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
3211  skip_physical_cols = cd->columnType.get_physical_cols();
3212  }
3213  } else {
3214  col_idx++;
3215  if (cd->columnType.is_geometry()) {
3216  skip_physical_cols = cd->columnType.get_physical_cols();
3217  col_idx += skip_physical_cols;
3218  }
3219  }
3220  // Advance to the next column of values being loaded
3221  import_idx++;
3222  }
3223  } catch (const std::exception& e) {
3224  std::ostringstream oss;
3225  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
3226  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3227  THROW_DB_EXCEPTION(oss.str());
3228  }
3229  fillMissingBuffers(session,
3230  session_ptr->getCatalog(),
3231  import_buffers,
3232  loader->get_column_descs(),
3233  desc_id_to_column_id,
3234  num_rows,
3235  table_name,
3236  assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
3237  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3238  session_ptr->getCatalog(), table_name);
3239  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3240  THROW_DB_EXCEPTION(loader->getErrorMessage());
3241  }
3242 }
3243 
3244 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
3245 
3246 #define ARROW_THRIFT_THROW_NOT_OK(s) \
3247  do { \
3248  ::arrow::Status _s = (s); \
3249  if (UNLIKELY(!_s.ok())) { \
3250  TDBException ex; \
3251  ex.error_msg = _s.ToString(); \
3252  LOG(ERROR) << s.ToString(); \
3253  throw ex; \
3254  } \
3255  } while (0)
3256 
3257 namespace {
3258 
3259 RecordBatchVector loadArrowStream(const std::string& stream) {
3260  RecordBatchVector batches;
3261  try {
3262  // TODO(wesm): Make this simpler in general, see ARROW-1600
3263  auto stream_buffer =
3264  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
3265  static_cast<int64_t>(stream.size()));
3266 
3267  arrow::io::BufferReader buf_reader(stream_buffer);
3268  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3269  ARROW_ASSIGN_OR_THROW(batch_reader,
3270  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3271 
3272  while (true) {
3273  std::shared_ptr<arrow::RecordBatch> batch;
3274  // Read batch (zero-copy) from the stream
3275  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
3276  if (batch == nullptr) {
3277  break;
3278  }
3279  batches.emplace_back(std::move(batch));
3280  }
3281  } catch (const std::exception& e) {
3282  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
3283  }
3284  return batches;
3285 }
3286 
3287 } // namespace
3288 
3289 void DBHandler::load_table_binary_arrow(const TSessionId& session,
3290  const std::string& table_name,
3291  const std::string& arrow_stream,
3292  const bool use_column_names) {
3293  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3294  auto session_ptr = stdlog.getConstSessionInfo();
3295 
3296  RecordBatchVector batches = loadArrowStream(arrow_stream);
3297  // Assuming have one batch for now
3298  if (batches.size() != 1) {
3299  THROW_DB_EXCEPTION("Expected a single Arrow record batch. Import aborted");
3300  }
3301 
3302  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
3303  std::unique_ptr<import_export::Loader> loader;
3304  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3305  std::vector<std::string> column_names;
3306  if (use_column_names) {
3307  column_names = batch->schema()->field_names();
3308  }
3309  const auto execute_read_lock =
3313  auto schema_read_lock =
3314  prepare_loader_generic(*session_ptr,
3315  table_name,
3316  static_cast<size_t>(batch->num_columns()),
3317  &loader,
3318  &import_buffers,
3319  column_names,
3320  "load_table_binary_arrow");
3321 
3322  auto desc_id_to_column_id =
3323  column_ids_by_names(loader->get_column_descs(), column_names);
3324  size_t num_rows = 0;
3325  size_t col_idx = 0;
3326  try {
3327  for (auto cd : loader->get_column_descs()) {
3328  auto mapped_idx = desc_id_to_column_id[col_idx];
3329  if (mapped_idx != -1) {
3330  auto& array = *batch->column(mapped_idx);
3331  import_export::ArraySliceRange row_slice(0, array.length());
3332  num_rows = import_buffers[col_idx]->add_arrow_values(
3333  cd, array, true, row_slice, nullptr);
3334  }
3335  col_idx++;
3336  }
3337  } catch (const std::exception& e) {
3338  LOG(ERROR) << "Input exception thrown: " << e.what()
3339  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3340  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
3341  // other import paths
3342  THROW_DB_EXCEPTION(e.what());
3343  }
3344  fillMissingBuffers(session,
3345  session_ptr->getCatalog(),
3346  import_buffers,
3347  loader->get_column_descs(),
3348  desc_id_to_column_id,
3349  num_rows,
3350  table_name,
3351  false);
3352  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3353  session_ptr->getCatalog(), table_name);
3354  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3355  THROW_DB_EXCEPTION(loader->getErrorMessage());
3356  }
3357 }
3358 
3359 void DBHandler::load_table(const TSessionId& session,
3360  const std::string& table_name,
3361  const std::vector<TStringRow>& rows,
3362  const std::vector<std::string>& column_names) {
3363  try {
3364  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3365  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3366  auto session_ptr = stdlog.getConstSessionInfo();
3367 
3368  if (rows.empty()) {
3369  THROW_DB_EXCEPTION("No rows to insert");
3370  }
3371 
3372  const auto execute_read_lock =
3376  std::unique_ptr<import_export::Loader> loader;
3377  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3378  auto schema_read_lock =
3379  prepare_loader_generic(*session_ptr,
3380  table_name,
3381  static_cast<size_t>(rows.front().cols.size()),
3382  &loader,
3383  &import_buffers,
3384  column_names,
3385  "load_table");
3386 
3387  auto col_descs = loader->get_column_descs();
3388  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3389  import_export::CopyParams copy_params;
3390  size_t rows_completed = 0;
3391  for (auto const& row : rows) {
3392  size_t import_idx = 0; // index into the TStringRow being loaded
3393  size_t col_idx = 0; // index into column description vector
3394  try {
3395  size_t skip_physical_cols = 0;
3396  for (auto cd : col_descs) {
3397  if (skip_physical_cols > 0) {
3398  CHECK(cd->isGeoPhyCol);
3399  skip_physical_cols--;
3400  continue;
3401  }
3402  auto mapped_idx = desc_id_to_column_id[import_idx];
3403  if (mapped_idx != -1) {
3404  import_buffers[col_idx]->add_value(cd,
3405  row.cols[mapped_idx].str_val,
3406  row.cols[mapped_idx].is_null,
3407  copy_params);
3408  }
3409  col_idx++;
3410  if (cd->columnType.is_geometry()) {
3411  // physical geo columns will be filled separately lately
3412  skip_physical_cols = cd->columnType.get_physical_cols();
3413  col_idx += skip_physical_cols;
3414  }
3415  // Advance to the next field within the row
3416  import_idx++;
3417  }
3418  rows_completed++;
3419  } catch (const std::exception& e) {
3420  LOG(ERROR) << "Input exception thrown: " << e.what()
3421  << ". Row discarded, issue at column : " << (col_idx + 1)
3422  << " data :" << row;
3423  THROW_DB_EXCEPTION(std::string("Exception: ") + e.what());
3424  }
3425  }
3426  // do batch filling of geo columns separately
3427  if (rows.size() != 0) {
3428  const auto& row = rows[0];
3429  size_t col_idx = 0; // index into column description vector
3430  try {
3431  size_t import_idx = 0;
3432  size_t skip_physical_cols = 0;
3433  for (auto cd : col_descs) {
3434  if (skip_physical_cols > 0) {
3435  skip_physical_cols--;
3436  continue;
3437  }
3438  auto mapped_idx = desc_id_to_column_id[import_idx];
3439  col_idx++;
3440  if (cd->columnType.is_geometry()) {
3441  skip_physical_cols = cd->columnType.get_physical_cols();
3442  if (mapped_idx != -1) {
3443  fillGeoColumns(session,
3444  session_ptr->getCatalog(),
3445  import_buffers,
3446  cd,
3447  col_idx,
3448  rows_completed,
3449  table_name,
3450  false);
3451  } else {
3452  col_idx += skip_physical_cols;
3453  }
3454  }
3455  import_idx++;
3456  }
3457  } catch (const std::exception& e) {
3458  LOG(ERROR) << "Input exception thrown: " << e.what()
3459  << ". Row discarded, issue at column : " << (col_idx + 1)
3460  << " data :" << row;
3461  THROW_DB_EXCEPTION(e.what());
3462  }
3463  }
3464  fillMissingBuffers(session,
3465  session_ptr->getCatalog(),
3466  import_buffers,
3467  col_descs,
3468  desc_id_to_column_id,
3469  rows_completed,
3470  table_name,
3471  false);
3472  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3473  session_ptr->getCatalog(), table_name);
3474  if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3475  THROW_DB_EXCEPTION(loader->getErrorMessage());
3476  }
3477 
3478  } catch (const std::exception& e) {
3479  THROW_DB_EXCEPTION(std::string(e.what()));
3480  }
3481 }
3482 
3483 char DBHandler::unescape_char(std::string str) {
3484  char out = str[0];
3485  if (str.size() == 2 && str[0] == '\\') {
3486  if (str[1] == 't') {
3487  out = '\t';
3488  } else if (str[1] == 'n') {
3489  out = '\n';
3490  } else if (str[1] == '0') {
3491  out = '\0';
3492  } else if (str[1] == '\'') {
3493  out = '\'';
3494  } else if (str[1] == '\\') {
3495  out = '\\';
3496  }
3497  }
3498  return out;
3499 }
3500 
3502  import_export::CopyParams copy_params;
3503  switch (cp.has_header) {
3504  case TImportHeaderRow::AUTODETECT:
3506  break;
3507  case TImportHeaderRow::NO_HEADER:
3509  break;
3510  case TImportHeaderRow::HAS_HEADER:
3512  break;
3513  default:
3514  CHECK(false);
3515  }
3516  copy_params.quoted = cp.quoted;
3517  if (cp.delimiter.length() > 0) {
3518  copy_params.delimiter = unescape_char(cp.delimiter);
3519  } else {
3520  copy_params.delimiter = '\0';
3521  }
3522  if (cp.null_str.length() > 0) {
3523  copy_params.null_str = cp.null_str;
3524  }
3525  if (cp.quote.length() > 0) {
3526  copy_params.quote = unescape_char(cp.quote);
3527  }
3528  if (cp.escape.length() > 0) {
3529  copy_params.escape = unescape_char(cp.escape);
3530  }
3531  if (cp.line_delim.length() > 0) {
3532  copy_params.line_delim = unescape_char(cp.line_delim);
3533  }
3534  if (cp.array_delim.length() > 0) {
3535  copy_params.array_delim = unescape_char(cp.array_delim);
3536  }
3537  if (cp.array_begin.length() > 0) {
3538  copy_params.array_begin = unescape_char(cp.array_begin);
3539  }
3540  if (cp.array_end.length() > 0) {
3541  copy_params.array_end = unescape_char(cp.array_end);
3542  }
3543  if (cp.threads != 0) {
3544  copy_params.threads = cp.threads;
3545  }
3546  if (cp.s3_access_key.length() > 0) {
3547  copy_params.s3_access_key = cp.s3_access_key;
3548  }
3549  if (cp.s3_secret_key.length() > 0) {
3550  copy_params.s3_secret_key = cp.s3_secret_key;
3551  }
3552  if (cp.s3_session_token.length() > 0) {
3553  copy_params.s3_session_token = cp.s3_session_token;
3554  }
3555  if (cp.s3_region.length() > 0) {
3556  copy_params.s3_region = cp.s3_region;
3557  }
3558  if (cp.s3_endpoint.length() > 0) {
3559  copy_params.s3_endpoint = cp.s3_endpoint;
3560  }
3561 #ifdef HAVE_AWS_S3
3562  if (g_allow_s3_server_privileges && cp.s3_access_key.length() == 0 &&
3563  cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
3564  const auto& server_credentials =
3565  Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
3566  copy_params.s3_access_key = server_credentials.GetAWSAccessKeyId();
3567  copy_params.s3_secret_key = server_credentials.GetAWSSecretKey();
3568  copy_params.s3_session_token = server_credentials.GetSessionToken();
3569  }
3570 #endif
3571 
3572  switch (cp.source_type) {
3573  case TSourceType::DELIMITED_FILE:
3575  break;
3576  case TSourceType::GEO_FILE:
3578  break;
3579  case TSourceType::PARQUET_FILE:
3580 #ifdef ENABLE_IMPORT_PARQUET
3582  break;
3583 #else
3584  THROW_DB_EXCEPTION("Parquet not supported");
3585 #endif
3586  case TSourceType::ODBC:
3587  THROW_DB_EXCEPTION("ODBC source not supported");
3588  case TSourceType::RASTER_FILE:
3590  break;
3591  default:
3592  CHECK(false);
3593  }
3594 
3595  switch (cp.geo_coords_encoding) {
3596  case TEncodingType::GEOINT:
3597  copy_params.geo_coords_encoding = kENCODING_GEOINT;
3598  break;
3599  case TEncodingType::NONE:
3600  copy_params.geo_coords_encoding = kENCODING_NONE;
3601  break;
3602  default:
3603  THROW_DB_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
3604  std::to_string((int)cp.geo_coords_encoding));
3605  }
3606  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3607  switch (cp.geo_coords_type) {
3608  case TDatumType::GEOGRAPHY:
3609  copy_params.geo_coords_type = kGEOGRAPHY;
3610  break;
3611  case TDatumType::GEOMETRY:
3612  copy_params.geo_coords_type = kGEOMETRY;
3613  break;
3614  default:
3615  THROW_DB_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
3616  std::to_string((int)cp.geo_coords_type));
3617  }
3618  switch (cp.geo_coords_srid) {
3619  case 4326:
3620  case 3857:
3621  case 900913:
3622  copy_params.geo_coords_srid = cp.geo_coords_srid;
3623  break;
3624  default:
3625  THROW_DB_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
3626  std::to_string((int)cp.geo_coords_srid));
3627  }
3628  copy_params.sanitize_column_names = cp.sanitize_column_names;
3629  copy_params.geo_layer_name = cp.geo_layer_name;
3630  copy_params.geo_assign_render_groups =
3631  cp.geo_assign_render_groups && g_enable_assign_render_groups;
3632  copy_params.geo_explode_collections = cp.geo_explode_collections;
3633  copy_params.source_srid = cp.source_srid;
3634  switch (cp.raster_point_type) {
3635  case TRasterPointType::NONE:
3637  break;
3638  case TRasterPointType::AUTO:
3640  break;
3641  case TRasterPointType::SMALLINT:
3643  break;
3644  case TRasterPointType::INT:
3646  break;
3647  case TRasterPointType::FLOAT:
3649  break;
3650  case TRasterPointType::DOUBLE:
3652  break;
3653  case TRasterPointType::POINT:
3655  break;
3656  default:
3657  CHECK(false);
3658  }
3659  copy_params.raster_import_bands = cp.raster_import_bands;
3660  if (cp.raster_scanlines_per_thread < 0) {
3661  THROW_DB_EXCEPTION("Invalid raster_scanlines_per_thread in TCopyParams (" +
3662  std::to_string((int)cp.raster_scanlines_per_thread));
3663  } else {
3664  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
3665  }
3666  switch (cp.raster_point_transform) {
3667  case TRasterPointTransform::NONE:
3669  break;
3670  case TRasterPointTransform::AUTO:
3672  break;
3673  case TRasterPointTransform::FILE:
3675  break;
3676  case TRasterPointTransform::WORLD:
3678  break;
3679  default:
3680  CHECK(false);
3681  }
3682  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
3683  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
3684  copy_params.dsn = cp.odbc_dsn;
3685  copy_params.connection_string = cp.odbc_connection_string;
3686  copy_params.sql_select = cp.odbc_sql_select;
3687  copy_params.sql_order_by = cp.odbc_sql_order_by;
3688  copy_params.username = cp.odbc_username;
3689  copy_params.password = cp.odbc_password;
3690  copy_params.credential_string = cp.odbc_credential_string;
3691  copy_params.add_metadata_columns = cp.add_metadata_columns;
3692  copy_params.trim_spaces = cp.trim_spaces;
3693  return copy_params;
3694 }
3695 
3697  TCopyParams copy_params;
3698  copy_params.delimiter = cp.delimiter;
3699  copy_params.null_str = cp.null_str;
3700  switch (cp.has_header) {
3702  copy_params.has_header = TImportHeaderRow::AUTODETECT;
3703  break;
3705  copy_params.has_header = TImportHeaderRow::NO_HEADER;
3706  break;
3708  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
3709  break;
3710  default:
3711  CHECK(false);
3712  }
3713  copy_params.quoted = cp.quoted;
3714  copy_params.quote = cp.quote;
3715  copy_params.escape = cp.escape;
3716  copy_params.line_delim = cp.line_delim;
3717  copy_params.array_delim = cp.array_delim;
3718  copy_params.array_begin = cp.array_begin;
3719  copy_params.array_end = cp.array_end;
3720  copy_params.threads = cp.threads;
3721  copy_params.s3_access_key = cp.s3_access_key;
3722  copy_params.s3_secret_key = cp.s3_secret_key;
3723  copy_params.s3_session_token = cp.s3_session_token;
3724  copy_params.s3_region = cp.s3_region;
3725  copy_params.s3_endpoint = cp.s3_endpoint;
3726  switch (cp.source_type) {
3728  copy_params.source_type = TSourceType::DELIMITED_FILE;
3729  break;
3731  copy_params.source_type = TSourceType::GEO_FILE;
3732  break;
3734  copy_params.source_type = TSourceType::PARQUET_FILE;
3735  break;
3737  copy_params.source_type = TSourceType::RASTER_FILE;
3738  break;
3740  copy_params.source_type = TSourceType::ODBC;
3741  break;
3742  default:
3743  CHECK(false);
3744  }
3745  switch (cp.geo_coords_encoding) {
3746  case kENCODING_GEOINT:
3747  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
3748  break;
3749  default:
3750  copy_params.geo_coords_encoding = TEncodingType::NONE;
3751  break;
3752  }
3753  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3754  switch (cp.geo_coords_type) {
3755  case kGEOGRAPHY:
3756  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
3757  break;
3758  case kGEOMETRY:
3759  copy_params.geo_coords_type = TDatumType::GEOMETRY;
3760  break;
3761  default:
3762  CHECK(false);
3763  }
3764  copy_params.geo_coords_srid = cp.geo_coords_srid;
3765  copy_params.sanitize_column_names = cp.sanitize_column_names;
3766  copy_params.geo_layer_name = cp.geo_layer_name;
3767  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3768  copy_params.geo_explode_collections = cp.geo_explode_collections;
3769  copy_params.source_srid = cp.source_srid;
3770  switch (cp.raster_point_type) {
3772  copy_params.raster_point_type = TRasterPointType::NONE;
3773  break;
3775  copy_params.raster_point_type = TRasterPointType::AUTO;
3776  break;
3778  copy_params.raster_point_type = TRasterPointType::SMALLINT;
3779  break;
3781  copy_params.raster_point_type = TRasterPointType::INT;
3782  break;
3784  copy_params.raster_point_type = TRasterPointType::FLOAT;
3785  break;
3787  copy_params.raster_point_type = TRasterPointType::DOUBLE;
3788  break;
3790  copy_params.raster_point_type = TRasterPointType::POINT;
3791  break;
3792  default:
3793  CHECK(false);
3794  }
3795  copy_params.raster_import_bands = cp.raster_import_bands;
3796  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
3797  switch (cp.raster_point_transform) {
3799  copy_params.raster_point_transform = TRasterPointTransform::NONE;
3800  break;
3802  copy_params.raster_point_transform = TRasterPointTransform::AUTO;
3803  break;
3805  copy_params.raster_point_transform = TRasterPointTransform::FILE;
3806  break;
3808  copy_params.raster_point_transform = TRasterPointTransform::WORLD;
3809  break;
3810  default:
3811  CHECK(false);
3812  }
3813  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
3814  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
3815  copy_params.odbc_dsn = cp.dsn;
3816  copy_params.odbc_connection_string = cp.connection_string;
3817  copy_params.odbc_sql_select = cp.sql_select;
3818  copy_params.odbc_sql_order_by = cp.sql_order_by;
3819  copy_params.odbc_username = cp.username;
3820  copy_params.odbc_password = cp.password;
3821  copy_params.odbc_credential_string = cp.credential_string;
3822  copy_params.add_metadata_columns = cp.add_metadata_columns;
3823  copy_params.trim_spaces = cp.trim_spaces;
3824  return copy_params;
3825 }
3826 
3827 namespace {
3828 void add_vsi_network_prefix(std::string& path) {
3829  // do we support network file access?
3830  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
3831 
3832  // modify head of filename based on source location
3833  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
3834  if (!gdal_network) {
3836  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
3837  }
3838  // invoke GDAL CURL virtual file reader
3839  path = "/vsicurl/" + path;
3840  } else if (boost::istarts_with(path, "s3://")) {
3841  if (!gdal_network) {
3843  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
3844  }
3845  // invoke GDAL S3 virtual file reader
3846  boost::replace_first(path, "s3://", "/vsis3/");
3847  }
3848 }
3849 
3850 void add_vsi_geo_prefix(std::string& path) {
3851  // single gzip'd file (not an archive)?
3852  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3853  path = "/vsigzip/" + path;
3854  }
3855 }
3856 
3857 void add_vsi_archive_prefix(std::string& path) {
3858  // check for compressed file or file bundle
3859  if (boost::iends_with(path, ".zip")) {
3860  // zip archive
3861  path = "/vsizip/" + path;
3862  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3863  boost::iends_with(path, ".tar.gz")) {
3864  // tar archive (compressed or uncompressed)
3865  path = "/vsitar/" + path;
3866  }
3867 }
3868 
3869 std::string remove_vsi_prefixes(const std::string& path_in) {
3870  std::string path(path_in);
3871 
3872  // these will be first
3873  if (boost::istarts_with(path, "/vsizip/")) {
3874  boost::replace_first(path, "/vsizip/", "");
3875  } else if (boost::istarts_with(path, "/vsitar/")) {
3876  boost::replace_first(path, "/vsitar/", "");
3877  } else if (boost::istarts_with(path, "/vsigzip/")) {
3878  boost::replace_first(path, "/vsigzip/", "");
3879  }
3880 
3881  // then these
3882  if (boost::istarts_with(path, "/vsicurl/")) {
3883  boost::replace_first(path, "/vsicurl/", "");
3884  } else if (boost::istarts_with(path, "/vsis3/")) {
3885  boost::replace_first(path, "/vsis3/", "s3://");
3886  }
3887 
3888  return path;
3889 }
3890 
3891 bool path_is_relative(const std::string& path) {
3892  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
3893  boost::istarts_with(path, "https://")) {
3894  return false;
3895  }
3896  return !boost::filesystem::path(path).is_absolute();
3897 }
3898 
3899 bool path_has_valid_filename(const std::string& path) {
3900  auto filename = boost::filesystem::path(path).filename().string();
3901  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
3902  return false;
3903  }
3904  return true;
3905 }
3906 
3907 bool is_a_supported_geo_file(const std::string& path) {
3908  if (!path_has_valid_filename(path)) {
3909  return false;
3910  }
3911  // this is now just for files that we want to recognize
3912  // as geo when inside an archive (see below)
3913  // @TODO(se) make this more flexible?
3914  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
3915  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
3916  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
3917  boost::iends_with(path, ".gdb.zip") || boost::iends_with(path, ".fgb")) {
3918  return true;
3919  }
3920  return false;
3921 }
3922 
3923 bool is_a_supported_archive_file(const std::string& path) {
3924  if (!path_has_valid_filename(path)) {
3925  return false;
3926  }
3927  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
3928  return true;
3929  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3930  boost::iends_with(path, ".tar.gz")) {
3931  return true;
3932  }
3933  return false;
3934 }
3935 
3936 std::string find_first_geo_file_in_archive(const std::string& archive_path,
3937  const import_export::CopyParams& copy_params) {
3938  // get the recursive list of all files in the archive
3939  std::vector<std::string> files =
3940  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
3941 
3942  // report the list
3943  LOG(INFO) << "Found " << files.size() << " files in Archive "
3944  << remove_vsi_prefixes(archive_path);
3945  for (const auto& file : files) {
3946  LOG(INFO) << " " << file;
3947  }
3948 
3949  // scan the list for the first candidate file
3950  bool found_suitable_file = false;
3951  std::string file_name;
3952  for (const auto& file : files) {
3953  if (is_a_supported_geo_file(file)) {
3954  file_name = file;
3955  found_suitable_file = true;
3956  break;
3957  }
3958  }
3959 
3960  // if we didn't find anything
3961  if (!found_suitable_file) {
3962  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
3963  remove_vsi_prefixes(archive_path);
3964  file_name.clear();
3965  }
3966 
3967  // done
3968  return file_name;
3969 }
3970 
3971 bool is_local_file(const std::string& file_path) {
3972  return (!boost::istarts_with(file_path, "s3://") &&
3973  !boost::istarts_with(file_path, "http://") &&
3974  !boost::istarts_with(file_path, "https://"));
3975 }
3976 
3977 void validate_import_file_path_if_local(const std::string& file_path) {
3978  if (is_local_file(file_path)) {
3980  file_path, ddl_utils::DataTransferType::IMPORT, true);
3981  }
3982 }
3983 } // namespace
3984 
3985 void DBHandler::detect_column_types(TDetectResult& _return,
3986  const TSessionId& session,
3987  const std::string& file_name_in,
3988  const TCopyParams& cp) {
3989  auto stdlog = STDLOG(get_session_ptr(session));
3990  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3991  check_read_only("detect_column_types");
3992 
3993  bool is_raster = false;
3994  boost::filesystem::path file_path;
3996  if (copy_params.source_type != import_export::SourceType::kOdbc) {
3997  std::string file_name{file_name_in};
3998  if (path_is_relative(file_name)) {
3999  // assume relative paths are relative to data_path / import / <session>
4000  auto temp_file_path = import_path_ / picosha2::hash256_hex_string(session) /
4001  boost::filesystem::path(file_name).filename();
4002  file_name = temp_file_path.string();
4003  }
4005 
4006  if ((copy_params.source_type == import_export::SourceType::kGeoFile ||
4008  is_local_file(file_name)) {
4009  const shared::FilePathOptions options{copy_params.regex_path_filter,
4010  copy_params.file_sort_order_by,
4011  copy_params.file_sort_regex};
4012  auto file_paths = shared::local_glob_filter_sort_files(file_name, options, false);
4013  // For geo and raster detect, pick the first file, if multiple files are provided
4014  // (e.g. through file globbing).
4015  CHECK(!file_paths.empty());
4016  file_name = file_paths[0];
4017  }
4018 
4019  // if it's a geo or raster import, handle alternative paths (S3, HTTP, archive etc.)
4020  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
4021  if (is_a_supported_archive_file(file_name)) {
4022  // find the archive file
4023  add_vsi_network_prefix(file_name);
4024  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4025  THROW_DB_EXCEPTION("Archive does not exist: " + file_name_in);
4026  }
4027  // find geo file in archive
4028  add_vsi_archive_prefix(file_name);
4029  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4030  // prepare to detect that geo file
4031  if (geo_file.size()) {
4032  file_name = file_name + std::string("/") + geo_file;
4033  }
4034  } else {
4035  // prepare to detect geo file directly
4036  add_vsi_network_prefix(file_name);
4037  add_vsi_geo_prefix(file_name);
4038  }
4039  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
4040  // prepare to detect raster file directly
4041  add_vsi_network_prefix(file_name);
4042  add_vsi_geo_prefix(file_name);
4043  is_raster = true;
4044  }
4045 
4046  file_path = boost::filesystem::path(file_name);
4047  // can be a s3 url
4048  if (!boost::istarts_with(file_name, "s3://")) {
4049  if (!boost::filesystem::path(file_name).is_absolute()) {
4050  file_path = import_path_ / picosha2::hash256_hex_string(session) /
4051  boost::filesystem::path(file_name).filename();
4052  file_name = file_path.string();
4053  }
4054 
4055  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4057  // check for geo or raster file
4058  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4059  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
4060  "\" does not exist.")
4061  }
4062  } else {
4063  // check for regular file
4064  if (!shared::file_or_glob_path_exists(file_path.string())) {
4065  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
4066  "\" does not exist.");
4067  }
4068  }
4069  }
4070  }
4071 
4072  try {
4074 #ifdef ENABLE_IMPORT_PARQUET
4076 #endif
4077  ) {
4078  import_export::Detector detector(file_path, copy_params);
4079  auto best_types = detector.getBestColumnTypes();
4080  std::vector<std::string> headers = detector.get_headers();
4081  copy_params = detector.get_copy_params();
4082 
4083  _return.copy_params = copyparams_to_thrift(copy_params);
4084  _return.row_set.row_desc.resize(best_types.size());
4085  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
4086  TColumnType col;
4087  auto& ti = best_types[col_idx];
4088  col.col_type.precision = ti.get_precision();
4089  col.col_type.scale = ti.get_scale();
4090  col.col_type.comp_param = ti.get_comp_param();
4091  if (ti.is_geometry()) {
4092  // set this so encoding_to_thrift does the right thing
4093  ti.set_compression(copy_params.geo_coords_encoding);
4094  // fill in these directly
4095  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
4096  col.col_type.scale = copy_params.geo_coords_srid;
4097  col.col_type.comp_param = copy_params.geo_coords_comp_param;
4098  }
4099  col.col_type.type = type_to_thrift(ti);
4100  col.col_type.encoding = encoding_to_thrift(ti);
4101  if (ti.is_array()) {
4102  col.col_type.is_array = true;
4103  }
4104  if (copy_params.sanitize_column_names) {
4105  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
4106  } else {
4107  col.col_name = headers[col_idx];
4108  }
4109  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
4110  _return.row_set.row_desc[col_idx] = col;
4111  }
4112  auto sample_data =
4114 
4115  TRow sample_row;
4116  for (auto row : sample_data) {
4117  sample_row.cols.clear();
4118  for (const auto& s : row) {
4119  TDatum td;
4120  td.val.str_val = s;
4121  td.is_null = s.empty();
4122  sample_row.cols.push_back(td);
4123  }
4124  _return.row_set.rows.push_back(sample_row);
4125  }
4126  } else if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4128  check_geospatial_files(file_path, copy_params);
4129  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
4130  file_path.string(), is_raster, Geospatial::kGeoColumnName, copy_params);
4131  for (auto cd : cds) {
4132  if (copy_params.sanitize_column_names) {
4133  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
4134  }
4135  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
4136  }
4137  if (!is_raster) {
4138  // @TODO(se) support for raster?
4139  std::map<std::string, std::vector<std::string>> sample_data;
4141  file_path.string(),
4143  sample_data,
4145  copy_params);
4146  if (sample_data.size() > 0) {
4147  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
4148  TRow sample_row;
4149  for (auto cd : cds) {
4150  TDatum td;
4151  td.val.str_val = sample_data[cd.sourceName].at(i);
4152  td.is_null = td.val.str_val.empty();
4153  sample_row.cols.push_back(td);
4154  }
4155  _return.row_set.rows.push_back(sample_row);
4156  }
4157  }
4158  }
4159  _return.copy_params = copyparams_to_thrift(copy_params);
4160  }
4161  } catch (const std::exception& e) {
4162  THROW_DB_EXCEPTION("detect_column_types error: " + std::string(e.what()));
4163  }
4164 }
4165 
4166 void DBHandler::render_vega(TRenderResult& _return,
4167  const TSessionId& session,
4168  const int64_t widget_id,
4169  const std::string& vega_json,
4170  const int compression_level,
4171  const std::string& nonce) {
4172  auto stdlog = STDLOG(get_session_ptr(session),
4173  "widget_id",
4174  widget_id,
4175  "compression_level",
4176  compression_level,
4177  "vega_json",
4178  vega_json,
4179  "nonce",
4180  nonce);
4181  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4182  stdlog.appendNameValuePairs("nonce", nonce);
4183  if (!render_handler_) {
4184  THROW_DB_EXCEPTION("Backend rendering is disabled.");
4185  }
4186 
4187  // cast away const-ness of incoming Thrift string ref
4188  // to allow it to be passed down as an r-value and
4189  // ultimately std::moved into the RenderSession
4190  auto& non_const_vega_json = const_cast<std::string&>(vega_json);
4191 
4192  _return.total_time_ms = measure<>::execution([&]() {
4193  try {
4194  render_handler_->render_vega(_return,
4195  stdlog.getSessionInfo(),
4196  widget_id,
4197  std::move(non_const_vega_json),
4198  compression_level,
4199  nonce);
4200  } catch (std::exception& e) {
4201  THROW_DB_EXCEPTION(e.what());
4202  }
4203  });
4204 }
4205 
4207  int32_t dashboard_id,
4208  AccessPrivileges requestedPermissions) {
4209  DBObject object(dashboard_id, DashboardDBObjectType);
4210  auto& catalog = session_info.getCatalog();
4211  auto& user = session_info.get_currentUser();
4212  object.loadKey(catalog);
4213  object.setPrivileges(requestedPermissions);
4214  std::vector<DBObject> privs = {object};
4215  return SysCatalog::instance().checkPrivileges(user, privs);
4216 }
4217 
4218 // custom expressions
4219 namespace {
4222 
4223 std::unique_ptr<Catalog_Namespace::CustomExpression> create_custom_expr_from_thrift_obj(
4224  const TCustomExpression& t_custom_expr,
4225  const Catalog& catalog) {
4226  if (t_custom_expr.data_source_name.empty()) {
4227  THROW_DB_EXCEPTION("Custom expression data source name cannot be empty.")
4228  }
4229  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4230  << "Unexpected data source type: "
4231  << static_cast<int>(t_custom_expr.data_source_type);
4232  auto td = catalog.getMetadataForTable(t_custom_expr.data_source_name, false);
4233  if (!td) {
4234  THROW_DB_EXCEPTION("Custom expression references a table \"" +
4235  t_custom_expr.data_source_name + "\" that does not exist.")
4236  }
4237  DataSourceType data_source_type = DataSourceType::TABLE;
4238  return std::make_unique<CustomExpression>(
4239  t_custom_expr.name, t_custom_expr.expression_json, data_source_type, td->tableId);
4240 }
4241 
4242 TCustomExpression create_thrift_obj_from_custom_expr(const CustomExpression& custom_expr,
4243  const Catalog& catalog) {
4244  TCustomExpression t_custom_expr;
4245  t_custom_expr.id = custom_expr.id;
4246  t_custom_expr.name = custom_expr.name;
4247  t_custom_expr.expression_json = custom_expr.expression_json;
4248  t_custom_expr.data_source_id = custom_expr.data_source_id;
4249  t_custom_expr.is_deleted = custom_expr.is_deleted;
4250  CHECK(custom_expr.data_source_type == DataSourceType::TABLE)
4251  << "Unexpected data source type: "
4252  << static_cast<int>(custom_expr.data_source_type);
4253  t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
4254  auto td = catalog.getMetadataForTable(custom_expr.data_source_id, false);
4255  if (td) {
4256  t_custom_expr.data_source_name = td->tableName;
4257  } else {
4258  LOG(WARNING)
4259  << "Custom expression references a deleted data source. Custom expression id: "
4260  << custom_expr.id << ", name: " << custom_expr.name;
4261  }
4262  return t_custom_expr;
4263 }
4264 } // namespace
4265 
4266 int32_t DBHandler::create_custom_expression(const TSessionId& session,
4267  const TCustomExpression& t_custom_expr) {
4268  auto stdlog = STDLOG(get_session_ptr(session));
4269  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4270  check_read_only("create_custom_expression");
4271 
4272  auto session_ptr = stdlog.getConstSessionInfo();
4273  if (!session_ptr->get_currentUser().isSuper) {
4274  THROW_DB_EXCEPTION("Custom expressions can only be created by super users.")
4275  }
4276  auto& catalog = session_ptr->getCatalog();
4278  return catalog.createCustomExpression(
4279  create_custom_expr_from_thrift_obj(t_custom_expr, catalog));
4280 }
4281 
4282 void DBHandler::get_custom_expressions(std::vector<TCustomExpression>& _return,
4283  const TSessionId& session) {
4284  auto stdlog = STDLOG(get_session_ptr(session));
4285  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4286 
4287  auto session_ptr = stdlog.getConstSessionInfo();
4288  auto& catalog = session_ptr->getCatalog();
4290  auto custom_expressions =
4291  catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
4292  for (const auto& custom_expression : custom_expressions) {
4293  _return.emplace_back(create_thrift_obj_from_custom_expr(*custom_expression, catalog));
4294  }
4295 }
4296 
4297 void DBHandler::update_custom_expression(const TSessionId& session,
4298  const int32_t id,
4299  const std::string& expression_json) {
4300  auto stdlog = STDLOG(get_session_ptr(session));
4301  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4302  check_read_only("update_custom_expression");
4303 
4304  auto session_ptr = stdlog.getConstSessionInfo();
4305  if (!session_ptr->get_currentUser().isSuper) {
4306  THROW_DB_EXCEPTION("Custom expressions can only be updated by super users.")
4307  }
4308  auto& catalog = session_ptr->getCatalog();
4310  catalog.updateCustomExpression(id, expression_json);
4311 }
4312 
4314  const TSessionId& session,
4315  const std::vector<int32_t>& custom_expression_ids,
4316  const bool do_soft_delete) {
4317  auto stdlog = STDLOG(get_session_ptr(session));
4318  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4319  check_read_only("delete_custom_expressions");
4320 
4321  auto session_ptr = stdlog.getConstSessionInfo();
4322  if (!session_ptr->get_currentUser().isSuper) {
4323  THROW_DB_EXCEPTION("Custom expressions can only be deleted by super users.")
4324  }
4325  auto& catalog = session_ptr->getCatalog();
4327  catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4328 }
4329 
4330 // dashboards
4331 void DBHandler::get_dashboard(TDashboard& dashboard,
4332  const TSessionId& session,
4333  const int32_t dashboard_id) {
4334  auto stdlog = STDLOG(get_session_ptr(session));
4335  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4336  auto session_ptr = stdlog.getConstSessionInfo();
4337  auto const& cat = session_ptr->getCatalog();
4339  auto dash = cat.getMetadataForDashboard(dashboard_id);
4340  if (!dash) {
4341  THROW_DB_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
4342  " doesn't exist");
4343  }
4345  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4346  THROW_DB_EXCEPTION("User has no view privileges for the dashboard with id " +
4347  std::to_string(dashboard_id));
4348  }
4349  user_meta.userName = "";
4350  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4351  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
4352 }
4353 
4354 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
4355  const TSessionId& session) {
4356  auto stdlog = STDLOG(get_session_ptr(session));
4357  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4358  auto session_ptr = stdlog.getConstSessionInfo();
4359  auto const& cat = session_ptr->getCatalog();
4361  const auto dashes = cat.getAllDashboardsMetadata();
4362  user_meta.userName = "";
4363  for (const auto dash : dashes) {
4365  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4366  // dashboardState is intentionally not populated here
4367  // for payload reasons
4368  // use get_dashboard call to get state
4369  dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
4370  }
4371  }
4372 }
4373 
4375  const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4377  const DashboardDescriptor* dash,
4378  const bool populate_state) {
4379  auto const& cat = session_ptr->getCatalog();
4380  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4381  auto objects_list = SysCatalog::instance().getMetadataForObject(
4382  cat.getCurrentDB().dbId,
4383  static_cast<int>(DBObjectType::DashboardDBObjectType),
4384  dash->dashboardId);
4385  TDashboard dashboard;
4386  dashboard.dashboard_name = dash->dashboardName;
4387  if (populate_state) {
4388  dashboard.dashboard_state = dash->dashboardState;
4389  }
4390  dashboard.image_hash = dash->imageHash;
4391  dashboard.update_time = dash->updateTime;
4392  dashboard.dashboard_metadata = dash->dashboardMetadata;
4393  dashboard.dashboard_id = dash->dashboardId;
4394  dashboard.dashboard_owner = dash->user;
4395  TDashboardPermissions perms;
4396  // Super user has all permissions.
4397  if (session_ptr->get_currentUser().isSuper) {
4398  perms.create_ = true;
4399  perms.delete_ = true;
4400  perms.edit_ = true;
4401  perms.view_ = true;
4402  } else {
4403  // Collect all grants on current user
4404  // add them to the permissions.
4405  auto obj_to_find =
4406  DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
4407  obj_to_find.loadKey(cat);
4408  std::vector<std::string> grantees =
4409  SysCatalog::instance().getRoles(true,
4410  session_ptr->get_currentUser().isSuper,
4411  session_ptr->get_currentUser().userName);
4412  for (const auto& grantee : grantees) {
4413  DBObject* object_found;
4414  auto* gr = SysCatalog::instance().getGrantee(grantee);
4415  if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
4416  const auto obj_privs = object_found->getPrivileges();
4417  perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4418  perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4419  perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4420  perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4421  }
4422  }
4423  }
4424  dashboard.dashboard_permissions = perms;
4425  if (objects_list.empty() ||
4426  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
4427  dashboard.is_dash_shared = false;
4428  } else {
4429  dashboard.is_dash_shared = true;
4430  }
4431  return dashboard;
4432 }
4433 
4434 namespace dbhandler {
4435 bool is_info_schema_db(const std::string& db_name) {
4436  return (db_name == shared::kInfoSchemaDbName &&
4437  SysCatalog::instance().hasExecutedMigration(shared::kInfoSchemaMigrationName));
4438 }
4439 
4440 void check_not_info_schema_db(const std::string& db_name, bool throw_db_exception) {
4441  if (is_info_schema_db(db_name)) {
4442  std::string error_message{"Write requests/queries are not allowed in the " +
4443  shared::kInfoSchemaDbName + " database."};
4444  if (throw_db_exception) {
4445  THROW_DB_EXCEPTION(error_message)
4446  } else {
4447  throw std::runtime_error(error_message);
4448  }
4449  }
4450 }
4451 } // namespace dbhandler
4452 
4453 int32_t DBHandler::create_dashboard(const TSessionId& session,
4454  const std::string& dashboard_name,
4455  const std::string& dashboard_state,
4456  const std::string& image_hash,
4457  const std::string& dashboard_metadata) {
4458  auto stdlog = STDLOG(get_session_ptr(session));
4459  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4460  auto session_ptr = stdlog.getConstSessionInfo();
4461  CHECK(session_ptr);
4462  check_read_only("create_dashboard");
4463  auto& cat = session_ptr->getCatalog();
4466  }
4467 
4468  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
4470  THROW_DB_EXCEPTION("Not enough privileges to create a dashboard.");
4471  }
4472 
4473  if (dashboard_exists(cat, session_ptr->get_currentUser().userId, dashboard_name)) {
4474  THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4475  }
4476 
4478  dd.dashboardName = dashboard_name;
4479  dd.dashboardState = dashboard_state;
4480  dd.imageHash = image_hash;
4481  dd.dashboardMetadata = dashboard_metadata;
4482  dd.userId = session_ptr->get_currentUser().userId;
4483  dd.user = session_ptr->get_currentUser().userName;
4484 
4485  try {
4486  auto id = cat.createDashboard(dd);
4487  // TODO: transactionally unsafe
4488  SysCatalog::instance().createDBObject(
4489  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
4490  return id;
4491  } catch (const std::exception& e) {
4492  THROW_DB_EXCEPTION(e.what());
4493  }
4494 }
4495 
4496 void DBHandler::replace_dashboard(const TSessionId& session,
4497  const int32_t dashboard_id,
4498  const std::string& dashboard_name,
4499  const std::string& dashboard_owner,
4500  const std::string& dashboard_state,
4501  const std::string& image_hash,
4502  const std::string& dashboard_metadata) {
4503  auto stdlog = STDLOG(get_session_ptr(session));
4504  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4505  auto session_ptr = stdlog.getConstSessionInfo();
4506  CHECK(session_ptr);
4507  check_read_only("replace_dashboard");
4508  auto& cat = session_ptr->getCatalog();
4511  }
4512 
4514  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
4515  THROW_DB_EXCEPTION("Not enough privileges to replace a dashboard.");
4516  }
4517 
4518  if (auto dash = cat.getMetadataForDashboard(
4519  std::to_string(session_ptr->get_currentUser().userId), dashboard_name)) {
4520  if (dash->dashboardId != dashboard_id) {
4521  THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4522  }
4523  }
4524 
4526  dd.dashboardName = dashboard_name;
4527  dd.dashboardState = dashboard_state;
4528  dd.imageHash = image_hash;
4529  dd.dashboardMetadata = dashboard_metadata;
4531  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
4532  THROW_DB_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
4533  " does not exist");
4534  }
4535  dd.userId = user.userId;
4536  dd.user = dashboard_owner;
4537  dd.dashboardId = dashboard_id;
4538 
4539  try {
4540  cat.replaceDashboard(dd);
4541  } catch (const std::exception& e) {
4542  THROW_DB_EXCEPTION(e.what());
4543  }
4544 }
4545 
4546 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
4547  delete_dashboards(session, {dashboard_id});
4548 }
4549 
4550 void DBHandler::delete_dashboards(const TSessionId& session,
4551  const std::vector<int32_t>& dashboard_ids) {
4552  auto stdlog = STDLOG(get_session_ptr(session));
4553  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4554  auto session_ptr = stdlog.getConstSessionInfo();
4555  check_read_only("delete_dashboards");
4556  auto& cat = session_ptr->getCatalog();
4559  }
4560  // Checks will be performed in catalog
4561  try {
4562  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4563  } catch (const std::exception& e) {
4564  THROW_DB_EXCEPTION(e.what());
4565  }
4566 }
4567 
4568 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
4569  int32_t dashboard_id,
4570  std::vector<std::string> groups) {
4571  const auto session_info = get_session_copy(session);
4572  auto& cat = session_info.getCatalog();
4573  auto dash = cat.getMetadataForDashboard(dashboard_id);
4574  if (!dash) {
4575  THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4576  " does not exist");
4577  } else if (session_info.get_currentUser().userId != dash->userId &&
4578  !session_info.get_currentUser().isSuper) {
4579  throw std::runtime_error(
4580  "User should be either owner of dashboard or super user to share/unshare it");
4581  }
4582  std::vector<std::string> valid_groups;
4584  for (auto& group : groups) {
4585  user_meta.isSuper = false; // initialize default flag
4586  if (!SysCatalog::instance().getGrantee(group)) {
4587  THROW_DB_EXCEPTION("User/Role " + group + " does not exist");
4588  } else if (!user_meta.isSuper) {
4589  valid_groups.push_back(group);
4590  }
4591  }
4592  return valid_groups;
4593 }
4594 
4595 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
4596  for (auto const& group : groups) {
4597  if (!SysCatalog::instance().getGrantee(group)) {
4598  THROW_DB_EXCEPTION("User/Role '" + group + "' does not exist");
4599  }
4600  }
4601 }
4602 
4604  const Catalog_Namespace::SessionInfo& session_info,
4605  const std::vector<int32_t>& dashboard_ids) {
4606  auto& cat = session_info.getCatalog();
4607  std::map<std::string, std::list<int32_t>> errors;
4608  for (auto const& dashboard_id : dashboard_ids) {
4609  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
4610  if (!dashboard) {
4611  errors["Dashboard id does not exist"].push_back(dashboard_id);
4612  } else if (session_info.get_currentUser().userId != dashboard->userId &&
4613  !session_info.get_currentUser().isSuper) {
4614  errors["User should be either owner of dashboard or super user to share/unshare it"]
4615  .push_back(dashboard_id);
4616  }
4617  }
4618  if (!errors.empty()) {
4619  std::stringstream error_stream;
4620  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
4621  for (const auto& [error, id_list] : errors) {
4622  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
4623  }
4624  THROW_DB_EXCEPTION(error_stream.str());
4625  }
4626 }
4627 
4628 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
4629  const std::vector<int32_t>& dashboard_ids,
4630  const std::vector<std::string>& groups,
4631  const TDashboardPermissions& permissions,
4632  const bool do_share) {
4633  auto stdlog = STDLOG(get_session_ptr(session));
4634  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4635  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
4636  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
4637  !permissions.view_) {
4638  THROW_DB_EXCEPTION("At least one privilege should be assigned for " +
4639  std::string(do_share ? "grants" : "revokes"));
4640  }
4641  auto session_ptr = stdlog.getConstSessionInfo();
4642  auto const& catalog = session_ptr->getCatalog();
4643  auto& sys_catalog = SysCatalog::instance();
4644  validateGroups(groups);
4645  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
4646  std::vector<DBObject> batch_objects;
4647  for (auto const& dashboard_id : dashboard_ids) {
4648  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
4649  AccessPrivileges privs;
4650  if (permissions.delete_) {
4652  }
4653  if (permissions.create_) {
4655  }
4656  if (permissions.edit_) {
4658  }
4659  if (permissions.view_) {
4661  }
4662  object.setPrivileges(privs);
4663  batch_objects.push_back(object);
4664  }
4665  if (do_share) {
4666  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4667  } else {
4668  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4669  }
4670 }
4671 
4672 void DBHandler::share_dashboards(const TSessionId& session,
4673  const std::vector<int32_t>& dashboard_ids,
4674  const std::vector<std::string>& groups,
4675  const TDashboardPermissions& permissions) {
4676  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
4677 }
4678 
4679 // NOOP: Grants not available for objects as of now
4680 void DBHandler::share_dashboard(const TSessionId& session,
4681  const int32_t dashboard_id,
4682  const std::vector<std::string>& groups,
4683  const std::vector<std::string>& objects,
4684  const TDashboardPermissions& permissions,
4685  const bool grant_role = false) {
4686  share_dashboards(session, {dashboard_id}, groups, permissions);
4687 }
4688 
4689 void DBHandler::unshare_dashboards(const TSessionId& session,
4690  const std::vector<int32_t>& dashboard_ids,
4691  const std::vector<std::string>& groups,
4692  const TDashboardPermissions& permissions) {
4693  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
4694 }
4695 
4696 void DBHandler::unshare_dashboard(const TSessionId& session,
4697  const int32_t dashboard_id,
4698  const std::vector<std::string>& groups,
4699  const std::vector<std::string>& objects,
4700  const TDashboardPermissions& permissions) {
4701  unshare_dashboards(session, {dashboard_id}, groups, permissions);
4702 }
4703 
4705  std::vector<TDashboardGrantees>& dashboard_grantees,
4706  const TSessionId& session,
4707  const int32_t dashboard_id) {
4708  auto stdlog = STDLOG(get_session_ptr(session));
4709  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4710  auto session_ptr = stdlog.getConstSessionInfo();
4711  auto const& cat = session_ptr->getCatalog();
4713  auto dash = cat.getMetadataForDashboard(dashboard_id);
4714  if (!dash) {
4715  THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4716  " does not exist");
4717  } else if (session_ptr->get_currentUser().userId != dash->userId &&
4718  !session_ptr->get_currentUser().isSuper) {
4720  "User should be either owner of dashboard or super user to access grantees");
4721  }
4722  std::vector<ObjectRoleDescriptor*> objectsList;
4723  objectsList = SysCatalog::instance().getMetadataForObject(
4724  cat.getCurrentDB().dbId,
4725  static_cast<int>(DBObjectType::DashboardDBObjectType),
4726  dashboard_id); // By default objecttypecan be only dashabaords
4727  user_meta.userId = -1;
4728  user_meta.userName = "";
4729  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4730  for (auto object : objectsList) {
4731  if (user_meta.userName == object->roleName) {
4732  // Mask owner
4733  continue;
4734  }
4735  TDashboardGrantees grantee;
4736  TDashboardPermissions perm;
4737  grantee.name = object->roleName;
4738  grantee.is_user = object->roleType;
4739  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4740  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4741  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4742  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4743  grantee.permissions = perm;
4744  dashboard_grantees.push_back(grantee);
4745  }
4746 }
4747 
4748 void DBHandler::create_link(std::string& _return,
4749  const TSessionId& session,
4750  const std::string& view_state,
4751  const std::string& view_metadata) {
4752  auto stdlog = STDLOG(get_session_ptr(session));
4753  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4754  auto session_ptr = stdlog.getConstSessionInfo();
4755  // check_read_only("create_link");
4756  auto& cat = session_ptr->getCatalog();
4757 
4758  LinkDescriptor ld;
4759  ld.userId = session_ptr->get_currentUser().userId;
4760  ld.viewState = view_state;
4761  ld.viewMetadata = view_metadata;
4762 
4763  try {
4764  _return = cat.createLink(ld, 6);
4765  } catch (const std::exception& e) {
4766  THROW_DB_EXCEPTION(e.what());
4767  }
4768 }
4769 
4771  const std::string& name,
4772  const bool is_array) {
4773  TColumnType ct;
4774  ct.col_name = name;
4775  ct.col_type.type = type;
4776  ct.col_type.is_array = is_array;
4777  return ct;
4778 }
4779 
4780 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
4781  const import_export::CopyParams& copy_params) {
4782  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
4783  if (std::find(shp_ext.begin(),
4784  shp_ext.end(),
4785  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
4786  shp_ext.end()) {
4787  for (auto ext : shp_ext) {
4788  auto aux_file = file_path;
4790  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
4791  copy_params) &&
4793  aux_file.replace_extension(ext).string(), copy_params)) {
4794  throw std::runtime_error("required file for shapefile does not exist: " +
4795  aux_file.filename().string());
4796  }
4797  }
4798  }
4799 }
4800 
4801 void DBHandler::create_table(const TSessionId& session,
4802  const std::string& table_name,
4803  const TRowDescriptor& rd,
4804  const TCreateParams& create_params) {
4805  // @TODO(se) remove file_type which is unused
4806  auto stdlog = STDLOG("table_name", table_name);
4807  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4808  check_read_only("create_table");
4809 
4810  if (ImportHelpers::is_reserved_name(table_name)) {
4811  THROW_DB_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
4812  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
4813  THROW_DB_EXCEPTION("Invalid characters in table name: " + table_name);
4814  }
4815 
4816  auto rds = rd;
4817 
4818  std::string stmt{"CREATE TABLE " + table_name};
4819  std::vector<std::string> col_stmts;
4820 
4821  for (auto col : rds) {
4822  if (ImportHelpers::is_reserved_name(col.col_name)) {
4823  THROW_DB_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
4824  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
4825  THROW_DB_EXCEPTION("Invalid characters in column name: " + col.col_name);
4826  }
4827  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
4828  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
4829  THROW_DB_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
4830  " for column: " + col.col_name);
4831  }
4832 
4833  if (col.col_type.type == TDatumType::DECIMAL) {
4834  // if no precision or scale passed in set to default 14,7
4835  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
4836  col.col_type.precision = 14;
4837  col.col_type.scale = 7;
4838  }
4839  }
4840 
4841  std::string col_stmt;
4842  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
4843  if (col.__isset.default_value) {
4844  col_stmt.append(" DEFAULT " + col.default_value);
4845  }
4846 
4847  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
4848  // `nullable` argument, leading this to default to false. Uncomment for v2.
4849  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
4850 
4851  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
4852  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
4853  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
4854  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
4855  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT ||
4856  thrift_to_encoding(col.col_type.encoding) == kENCODING_DATE_IN_DAYS) {
4857  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
4858  }
4859  } else if (col.col_type.type == TDatumType::STR) {
4860  // non DICT encoded strings
4861  col_stmt.append(" ENCODING NONE");
4862  } else if (col.col_type.type == TDatumType::POINT ||
4863  col.col_type.type == TDatumType::MULTIPOINT ||
4864  col.col_type.type == TDatumType::LINESTRING ||
4865  col.col_type.type == TDatumType::MULTILINESTRING ||
4866  col.col_type.type == TDatumType::POLYGON ||
4867  col.col_type.type == TDatumType::MULTIPOLYGON) {
4868  // non encoded compressable geo
4869  if (col.col_type.scale == 4326) {
4870  col_stmt.append(" ENCODING NONE");
4871  }
4872  }
4873  col_stmts.push_back(col_stmt);
4874  }
4875 
4876  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
4877 
4878  if (create_params.is_replicated) {
4879  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
4880  }
4881 
4882  stmt.append(";");
4883 
4884  TQueryResult ret;
4885  sql_execute(ret, session, stmt, true, "", -1, -1);
4886 }
4887 
4888 void DBHandler::import_table(const TSessionId& session,
4889  const std::string& table_name,
4890  const std::string& file_name_in,
4891  const TCopyParams& cp) {
4892  try {
4893  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4894  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4895  auto session_ptr = stdlog.getConstSessionInfo();
4896  check_read_only("import_table");
4897  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
4898 
4899  const auto execute_read_lock =
4903  auto& cat = session_ptr->getCatalog();
4905  auto start_time = ::toString(std::chrono::system_clock::now());
4907  executor->enrollQuerySession(session,
4908  "IMPORT_TABLE",
4909  start_time,
4911  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
4912  }
4913 
4914  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
4915  // reset the runtime query interrupt status
4917  executor->clearQuerySessionStatus(session, start_time);
4918  }
4919  };
4920  const auto td_with_lock =
4922  cat, table_name);
4923  const auto td = td_with_lock();
4924  CHECK(td);
4925  check_table_load_privileges(*session_ptr, table_name);
4926 
4927  std::string copy_from_source;
4929  if (copy_params.source_type == import_export::SourceType::kOdbc) {
4930  copy_from_source = copy_params.sql_select;
4931  } else {
4932  std::string file_name{file_name_in};
4933  auto file_path = boost::filesystem::path(file_name);
4934  if (!boost::istarts_with(file_name, "s3://")) {
4935  if (!boost::filesystem::path(file_name).is_absolute()) {
4936  file_path = import_path_ / picosha2::hash256_hex_string(session) /
4937  boost::filesystem::path(file_name).filename();
4938  file_name = file_path.string();
4939  }
4940  if (!shared::file_or_glob_path_exists(file_path.string())) {
4941  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
4942  "\" does not exist.");
4943  }
4944  }
4946 
4947  // TODO(andrew): add delimiter detection to Importer
4948  if (copy_params.delimiter == '\0') {
4949  copy_params.delimiter = ',';
4950  if (boost::filesystem::extension(file_path) == ".tsv") {
4951  copy_params.delimiter = '\t';
4952  }
4953  }
4954  copy_from_source = file_path.string();
4955  }
4956 
4957  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
4958  session_ptr->getCatalog(), table_name);
4959  std::unique_ptr<import_export::AbstractImporter> importer;
4960  importer = import_export::create_importer(cat, td, copy_from_source, copy_params);
4961  auto ms = measure<>::execution([&]() { importer->import(session_ptr.get()); });
4962  LOG(INFO) << "Total Import Time: " << (double)ms / 1000.0 << " Seconds.";
4963  } catch (const TDBException& e) {
4964  throw;
4965  } catch (const std::exception& e) {
4966  THROW_DB_EXCEPTION(std::string(e.what()));
4967  }
4968 }
4969 
4970 namespace {
4971 
4972 // helper functions for error checking below
4973 // these would usefully be added as methods of TDatumType
4974 // but that's not possible as it's auto-generated by Thrift
4975 
4977  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
4978  t == TDatumType::LINESTRING || t == TDatumType::MULTILINESTRING ||
4979  t == TDatumType::POINT || t == TDatumType::MULTIPOINT);
4980 }
4981 
4983  std::stringstream ss;
4984  ss << t;
4985  return ss.str();
4986 }
4987 
4988 } // namespace
4989 
4990 #define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
4991  THROW_DB_EXCEPTION("Could not append geo/raster file '" + \
4992  file_path.filename().string() + "' to table '" + table_name + \
4993  "'. Column '" + cd->columnName + "' " + attr + " mismatch (got '" + \
4994  got + "', expected '" + expected + "')");
4995 
4996 void DBHandler::import_geo_table(const TSessionId& session,
4997  const std::string& table_name,
4998  const std::string& file_name,
4999  const TCopyParams& cp,
5000  const TRowDescriptor& row_desc,
5001  const TCreateParams& create_params) {
5002  // this is the direct Thrift endpoint
5003  // it does NOT support the separate FSI regex/filter/sort options
5004  // but it DOES support basic globbing specified in the filename itself
5006  session, table_name, file_name, thrift_to_copyparams(cp), row_desc, create_params);
5007 }
5008 
5009 void DBHandler::importGeoTableGlobFilterSort(const TSessionId& session,
5010  const std::string& table_name,
5011  const std::string& file_name,
5012  const import_export::CopyParams& copy_params,
5013  const TRowDescriptor& row_desc,
5014  const TCreateParams& create_params) {
5015  // this is called by the above direct Thrift endpoint
5016  // and also for a deferred COPY FROM for geo/raster
5017  // it DOES support the full FSI regex/filter/sort options
5018  std::vector<std::string> file_names;
5019  try {
5020  const shared::FilePathOptions options{copy_params.regex_path_filter,
5021  copy_params.file_sort_order_by,
5022  copy_params.file_sort_regex};
5024  file_names = shared::local_glob_filter_sort_files(file_name, options, false);
5025  } catch (const shared::FileNotFoundException& e) {
5026  // no files match, just try the original filename, might be remote
5027  file_names.push_back(file_name);
5028  }
5029  // import whatever we found
5030  for (auto const& file_name : file_names) {
5032  session, table_name, file_name, copy_params, row_desc, create_params);
5033  }
5034 }
5035 
5036 void DBHandler::importGeoTableSingle(const TSessionId& session,
5037  const std::string& table_name,
5038  const std::string& file_name_in,
5039  const import_export::CopyParams& copy_params,
5040  const TRowDescriptor& row_desc,
5041  const TCreateParams& create_params) {
5042  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
5043  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5044  auto session_ptr = stdlog.getConstSessionInfo();
5045  check_read_only("import_table");
5046 
5047  auto& cat = session_ptr->getCatalog();
5049  auto start_time = ::toString(std::chrono::system_clock::now());
5051  executor->enrollQuerySession(session,
5052  "IMPORT_GEO_TABLE",
5053  start_time,
5055  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5056  }
5057 
5058  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
5059  // reset the runtime query interrupt status
5061  executor->clearQuerySessionStatus(session, start_time);
5062  }
5063  };
5064 
5065  std::string file_name{file_name_in};
5066 
5067  if (path_is_relative(file_name)) {
5068  // assume relative paths are relative to data_path / import / <session>
5069  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5070  boost::filesystem::path(file_name).filename();
5071  file_name = file_path.string();
5072  }
5074 
5075  bool is_raster = false;
5076  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
5077  if (is_a_supported_archive_file(file_name)) {
5078  // find the archive file
5079  add_vsi_network_prefix(file_name);
5080  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
5081  THROW_DB_EXCEPTION("Archive does not exist: " + file_name_in);
5082  }
5083  // find geo file in archive
5084  add_vsi_archive_prefix(file_name);
5085  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
5086  // prepare to load that geo file
5087  if (geo_file.size()) {
5088  file_name = file_name + std::string("/") + geo_file;
5089  }
5090  } else {
5091  // prepare to load geo file directly
5092  add_vsi_network_prefix(file_name);
5093  add_vsi_geo_prefix(file_name);
5094  }
5095  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
5096  // prepare to load geo raster file directly
5097  add_vsi_network_prefix(file_name);
5098  add_vsi_geo_prefix(file_name);
5099  is_raster = true;
5100  } else {
5101  THROW_DB_EXCEPTION("import_geo_table called with file_type other than GEO or RASTER");
5102  }
5103 
5104  // log what we're about to try to do
5105  VLOG(1) << "import_geo_table: Original filename: " << file_name_in;
5106  VLOG(1) << "import_geo_table: Actual filename: " << file_name;
5107  VLOG(1) << "import_geo_table: Raster: " << is_raster;
5108 
5109  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
5110  auto file_path = boost::filesystem::path(file_name);
5111  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
5112  THROW_DB_EXCEPTION("File does not exist: " + file_path.filename().string());
5113  }
5114 
5115  // use GDAL to check any dependent files exist (ditto)
5116  try {
5117  check_geospatial_files(file_path, copy_params);
5118  } catch (const std::exception& e) {
5119  THROW_DB_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5120  }
5121 
5122  // get layer info and deconstruct
5123  // in general, we will get a combination of layers of these four types:
5124  // EMPTY: no rows, report and skip
5125  // GEO: create a geo table from this
5126  // NON_GEO: create a regular table from this
5127  // UNSUPPORTED_GEO: report and skip
5128  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
5129  if (!is_raster) {
5130  try {
5131  layer_info =
5132  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
5133  } catch (const std::exception& e) {
5134  THROW_DB_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5135  }
5136  }
5137 
5138  // categorize the results
5139  using LayerNameToContentsMap =
5140  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
5141  LayerNameToContentsMap load_layers;
5142  LOG_IF(INFO, layer_info.size() > 0)
5143  << "import_geo_table: Found the following layers in the geo file:";
5144  for (const auto& layer : layer_info) {
5145  switch (layer.contents) {
5147  LOG(INFO) << "import_geo_table: '" << layer.name
5148  << "' (will import as geo table)";
5149  load_layers[layer.name] = layer.contents;
5150  break;
5152  LOG(INFO) << "import_geo_table: '" << layer.name
5153  << "' (will import as regular table)";
5154  load_layers[layer.name] = layer.contents;
5155  break;
5157  LOG(WARNING) << "import_geo_table: '" << layer.name
5158  << "' (will not import, unsupported geo type)";
5159  break;
5161  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
5162  break;
5163  default:
5164  break;
5165  }
5166  }
5167 
5168  // if nothing is loadable, stop now
5169  if (!is_raster && load_layers.size() == 0) {
5170  THROW_DB_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
5171  }
5172 
5173  // if we've been given an explicit layer name, check that it exists and is loadable
5174  // scan the original list, as it may exist but not have been gathered as loadable
5175  if (!is_raster && copy_params.geo_layer_name.size()) {
5176  bool found = false;
5177  for (const auto& layer : layer_info) {
5178  if (copy_params.geo_layer_name == layer.name) {
5181  // forget all the other layers and just load this one
5182  load_layers.clear();
5183  load_layers[layer.name] = layer.contents;
5184  found = true;
5185  break;
5186  } else if (layer.contents ==
5188  THROW_DB_EXCEPTION("import_geo_table: Explicit geo layer '" +
5189  copy_params.geo_layer_name + "' has unsupported geo type!");
5190  } else if (layer.contents ==
5192  THROW_DB_EXCEPTION("import_geo_table: Explicit geo layer '" +
5193  copy_params.geo_layer_name + "' is empty!");
5194  }
5195  }
5196  }
5197  if (!found) {
5198  THROW_DB_EXCEPTION("import_geo_table: Explicit geo layer '" +
5199  copy_params.geo_layer_name + "' not found!");
5200  }
5201  }
5202 
5203  // Immerse import of multiple layers is not yet supported
5204  // @TODO fix this!
5205  if (!is_raster && row_desc.size() > 0 && load_layers.size() > 1) {
5207  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
5208  }
5209 
5210  // one definition of layer table name construction
5211  // we append the layer name if we're loading more than one table
5212  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
5213  const std::string& layer_name) {
5214  if (load_layers.size() > 1) {
5215  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
5216  if (sanitized_layer_name != layer_name) {
5217  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
5218  << sanitized_layer_name << "' for table name";
5219  }
5220  return table_name + "_" + sanitized_layer_name;
5221  }
5222  return table_name;
5223  };
5224 
5225  // if we're importing multiple tables, then NONE of them must exist already
5226  if (!is_raster && load_layers.size() > 1) {
5227  for (const auto& layer : load_layers) {
5228  // construct table name
5229  auto this_table_name = construct_layer_table_name(table_name, layer.first);
5230 
5231  // table must not exist
5232  if (cat.getMetadataForTable(this_table_name)) {
5233  THROW_DB_EXCEPTION("import_geo_table: Table '" + this_table_name +
5234  "' already exists, aborting!");
5235  }
5236  }
5237  }
5238 
5239  // prepare to gather errors that would otherwise be exceptions, as we can only throw
5240  // one
5241  std::vector<std::string> caught_exception_messages;
5242 
5243  // prepare to time multi-layer import
5244  double total_import_ms = 0.0;
5245 
5246  // for geo raster, we make a single dummy layer
5247  // the name is irrelevant, but set it to the filename so the log makes sense
5248  if (is_raster) {
5249  CHECK_EQ(load_layers.size(), 0u);
5250  load_layers.emplace(file_name, import_export::Importer::GeoFileLayerContents::GEO);
5251  }
5252 
5253  // now we're safe to start importing
5254  // we loop over the layers we're going to attempt to load
5255  for (const auto& layer : load_layers) {
5256  // unpack
5257  const auto& layer_name = layer.first;
5258  const auto& layer_contents = layer.second;
5259  bool is_geo_layer =
5261 
5262  // construct table name again
5263  auto this_table_name = construct_layer_table_name(table_name, layer_name);
5264 
5265  // report
5266  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
5267 
5268  // we need a row descriptor
5269  TRowDescriptor rd;
5270  if (row_desc.size() > 0) {
5271  // we have a valid RowDescriptor
5272  // this is the case where Immerse has already detected and created
5273  // all we need to do is import and trust that the data will match
5274  // use the provided row descriptor
5275  // table must already exist (we check this below)
5276  rd = row_desc;
5277  } else {
5278  // we don't have a RowDescriptor
5279  // we have to detect the file ourselves
5280  TDetectResult cds;
5281  TCopyParams cp_copy = copyparams_to_thrift(copy_params);
5282  cp_copy.geo_layer_name = layer_name;
5283  try {
5284  detect_column_types(cds, session, file_name_in, cp_copy);
5285  } catch (const std::exception& e) {
5286  // capture the error and abort this layer
5287  caught_exception_messages.emplace_back("Column Type Detection failed for '" +
5288  layer_name + "':" + e.what());
5289  continue;
5290  }
5291  rd = cds.row_set.row_desc;
5292 
5293  // then, if the table does NOT already exist, create it
5294  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
5295  if (!td) {
5296  try {
5297  create_table(session, this_table_name, rd, create_params);
5298  } catch (const std::exception& e) {
5299  // capture the error and abort this layer
5300  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
5301  layer_name + "':" + e.what());
5302  continue;
5303  }
5304  }
5305  }
5306 
5307  // match locking sequence for CopyTableStmt::execute
5311 
5312  const TableDescriptor* td{nullptr};
5313  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5314  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5315 
5316  try {
5317  td_with_lock =
5318  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5320  lockmgr::ReadLock>::acquireTableDescriptor(cat, this_table_name));
5321  td = (*td_with_lock)();
5322  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5324  } catch (const std::runtime_error& e) {
5325  // capture the error and abort this layer
5326  std::string exception_message = "Could not import geo/raster file '" +
5327  file_path.filename().string() + "' to table '" +
5328  this_table_name +
5329  "'; table does not exist or failed to create.";
5330  caught_exception_messages.emplace_back(exception_message);
5331  continue;
5332  }
5333  CHECK(td);
5334 
5335  // then, we have to verify that the structure matches
5336  // get column descriptors (non-system, non-deleted, logical columns only)
5337  const auto col_descriptors =
5338  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5339 
5340  // first, compare the column count
5341  if (col_descriptors.size() != rd.size()) {
5342  // capture the error and abort this layer
5343  std::string exception_message = "Could not append geo/raster file '" +
5344  file_path.filename().string() + "' to table '" +
5345  this_table_name + "'. Column count mismatch (got " +
5346  std::to_string(rd.size()) + ", expecting " +
5347  std::to_string(col_descriptors.size()) + ")";
5348  caught_exception_messages.emplace_back(exception_message);
5349  continue;
5350  }
5351 
5352  try {
5353  // validate column type match
5354  // also handle geo column name changes
5355  int rd_index = 0;
5356  for (auto const* cd : col_descriptors) {
5357  auto const cd_col_type = populateThriftColumnType(&cat, cd);
5358 
5359  // for types, all we care about is that the got and expected types are either both
5360  // geo or both non-geo, and if they're geo that the exact geo type matches
5361  auto const gtype = rd[rd_index].col_type.type; // importer type
5362  auto const etype = cd_col_type.col_type.type; // existing table type
5363  if (TTypeInfo_IsGeo(gtype) && TTypeInfo_IsGeo(etype)) {
5364  if (gtype != etype) {
5366  "type", TTypeInfo_TypeToString(gtype), TTypeInfo_TypeToString(etype));
5367  }
5368  } else if (TTypeInfo_IsGeo(gtype) != TTypeInfo_IsGeo(etype)) {
5370  "type", TTypeInfo_TypeToString(gtype), TTypeInfo_TypeToString(etype));
5371  }
5372 
5373  // for names, we keep the existing table geo column name (for example, to handle
5374  // the case where an existing table has a geo column with a legacy name), but all
5375  // other column names must match, otherwise the import will fail
5376  auto const gname = rd[rd_index].col_name; // importer name
5377  auto const ename = cd->columnName; // existing table name
5378  if (gname != ename) {
5379  if (TTypeInfo_IsGeo(gtype)) {
5380  LOG(INFO) << "import_geo_table: Renaming incoming geo column to match "
5381  "existing table column name '"
5382  << ename << "'";
5383  rd[rd_index].col_name = ename;
5384  } else {
5385  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
5386  }
5387  }
5388  rd_index++;
5389  }
5390  } catch (const std::exception& e) {
5391  // capture the error and abort this layer
5392  caught_exception_messages.emplace_back(e.what());
5393  continue;
5394  }
5395 
5396  std::map<std::string, std::string> colname_to_src;
5397  for (auto r : rd) {
5398  colname_to_src[r.col_name] =
5399  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
5400  }
5401 
5402  try {
5403  check_table_load_privileges(*session_ptr, this_table_name);
5404  } catch (const std::exception& e) {
5405  // capture the error and abort this layer
5406  caught_exception_messages.emplace_back(e.what());
5407  continue;
5408  }
5409 
5410  if (!is_raster && is_geo_layer) {
5411  // Final check to ensure that we have exactly one geo column
5412  // before doing the actual import, in case the user naively
5413  // overrode the types in Immerse Preview (which as of 6/17/21
5414  // it still allows you to do). We should make Immerse more
5415  // robust and disallow re-typing of columns to/from geo types
5416  // completely. Currently, if multiple columns are re-typed
5417  // such that there is still exactly one geo column (but it's
5418  // the wrong one) then this test will pass, but the import
5419  // will then reject some (or more likely all) of the rows.
5420  int num_geo_columns{0};
5421  for (auto const& col : rd) {
5422  if (TTypeInfo_IsGeo(col.col_type.type)) {
5423  num_geo_columns++;
5424  }
5425  }
5426  if (num_geo_columns != 1) {
5427  std::string exception_message =
5428  "Table '" + this_table_name +
5429  "' must have exactly one geo column. Import aborted!";
5430  caught_exception_messages.emplace_back(exception_message);
5431  continue;
5432  }
5433  }
5434 
5435  std::string layer_or_raster = is_raster ? "Raster" : "Layer";
5436 
5437  try {
5438  // import this layer only?
5439  import_export::CopyParams copy_params_copy = copy_params;
5440  copy_params_copy.geo_layer_name = layer_name;
5441 
5442  // create an importer
5443  std::unique_ptr<import_export::Importer> importer;
5444  importer.reset(
5445  new import_export::Importer(cat, td, file_path.string(), copy_params_copy));
5446 
5447  // import
5448  auto ms = measure<>::execution(
5449  [&]() { importer->importGDAL(colname_to_src, session_ptr.get(), is_raster); });
5450  LOG(INFO) << "Import of " << layer_or_raster << " '" << layer_name << "' took "
5451  << (double)ms / 1000.0 << "s";
5452  total_import_ms += ms;
5453  } catch (const std::exception& e) {
5454  std::string exception_message = "Import of " + layer_or_raster + " '" +
5455  this_table_name + "' failed: " + e.what();
5456  caught_exception_messages.emplace_back(exception_message);
5457  continue;
5458  }
5459  }
5460 
5461  // did we catch any exceptions?
5462  if (caught_exception_messages.size()) {
5463  // combine all the strings into one and throw a single Thrift exception
5464  std::string combined_exception_message = "Failed to import geo/raster file: ";
5465  bool comma{false};
5466  for (const auto& message : caught_exception_messages) {
5467  combined_exception_message += comma ? (", " + message) : message;
5468  comma = true;
5469  }
5470  THROW_DB_EXCEPTION(combined_exception_message);
5471  } else {
5472  // report success and total time
5473  LOG(INFO) << "Import Successful!";
5474  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
5475  }
5476 }
5477 
5478 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
5479 
5480 void DBHandler::import_table_status(TImportStatus& _return,
5481  const TSessionId& session,
5482  const std::string& import_id) {
5483  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
5484  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5485  auto is = import_export::Importer::get_import_status(import_id);
5486  _return.elapsed = is.elapsed.count();
5487  _return.rows_completed = is.rows_completed;
5488  _return.rows_estimated = is.rows_estimated;
5489  _return.rows_rejected = is.rows_rejected;
5490 }
5491 
5493  const TSessionId& session,
5494  const std::string& archive_path_in,
5495  const TCopyParams& copy_params) {
5496  auto stdlog =
5497  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
5498  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5499 
5500  std::string archive_path(archive_path_in);
5501 
5502  if (path_is_relative(archive_path)) {
5503  // assume relative paths are relative to data_path / import / <session>
5504  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5505  boost::filesystem::path(archive_path).filename();
5506  archive_path = file_path.string();
5507  }
5508  validate_import_file_path_if_local(archive_path);
5509 
5510  if (is_a_supported_archive_file(archive_path)) {
5511  // find the archive file
5512  add_vsi_network_prefix(archive_path);
5513  if (!