OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DBHandler.h"
24 #include "DistributedLoader.h"
25 #include "TokenCompletionHints.h"
26 
27 #ifdef HAVE_PROFILER
28 #include <gperftools/heap-profiler.h>
29 #endif // HAVE_PROFILER
30 
31 #include "MapDRelease.h"
32 
33 #include "Calcite/Calcite.h"
34 #include "gen-cpp/CalciteServer.h"
35 
38 
39 #include "Catalog/Catalog.h"
44 #include "DistributedHandler.h"
46 #include "Geospatial/ColumnNames.h"
47 #include "Geospatial/Compression.h"
48 #include "Geospatial/GDAL.h"
49 #include "Geospatial/Types.h"
50 #include "ImportExport/Importer.h"
51 #include "LockMgr/LockMgr.h"
53 #include "Parser/ParserWrapper.h"
57 #include "QueryEngine/Execute.h"
67 #include "Shared/ArrowUtil.h"
68 #include "Shared/DateTimeParser.h"
69 #include "Shared/StringTransform.h"
70 #include "Shared/SysDefinitions.h"
71 #include "Shared/file_path_util.h"
73 #include "Shared/import_helpers.h"
74 #include "Shared/measure.h"
75 #include "Shared/misc.h"
76 #include "Shared/scope.h"
78 
79 #ifdef HAVE_AWS_S3
80 #include <aws/core/auth/AWSCredentialsProviderChain.h>
81 #endif
82 #include <fcntl.h>
83 #include <picosha2.h>
84 #include <sys/types.h>
85 #include <algorithm>
86 #include <boost/algorithm/string.hpp>
87 #include <boost/filesystem.hpp>
88 #include <boost/make_shared.hpp>
89 #include <boost/process/search_path.hpp>
90 #include <boost/program_options.hpp>
91 #include <boost/tokenizer.hpp>
92 #include <chrono>
93 #include <cmath>
94 #include <csignal>
95 #include <fstream>
96 #include <future>
97 #include <map>
98 #include <memory>
99 #include <random>
100 #include <string>
101 #include <thread>
102 #include <typeinfo>
103 
104 #include <arrow/api.h>
105 #include <arrow/io/api.h>
106 #include <arrow/ipc/api.h>
107 
108 #include "Shared/ArrowUtil.h"
109 #include "Shared/distributed.h"
111 
112 #ifdef ENABLE_IMPORT_PARQUET
113 extern bool g_enable_parquet_import_fsi;
114 #endif
115 
116 #ifdef HAVE_AWS_S3
117 extern bool g_allow_s3_server_privileges;
118 #endif
119 
120 extern bool g_enable_system_tables;
122 
125 
126 #define INVALID_SESSION_ID ""
127 
128 #define THROW_DB_EXCEPTION(errstr) \
129  { \
130  TDBException ex; \
131  ex.error_msg = errstr; \
132  LOG(ERROR) << ex.error_msg; \
133  throw ex; \
134  }
135 
136 thread_local std::string TrackingProcessor::client_address;
138 
139 namespace {
140 
142  const int32_t user_id,
143  const std::string& dashboard_name) {
144  return (cat.getMetadataForDashboard(std::to_string(user_id), dashboard_name));
145 }
146 
147 SessionMap::iterator get_session_from_map(const TSessionId& session,
148  SessionMap& session_map) {
149  auto session_it = session_map.find(session);
150  if (session_it == session_map.end()) {
151  THROW_DB_EXCEPTION("Session not valid.");
152  }
153  return session_it;
154 }
155 
// Exception raised by the session-expiry checks to signal that a session must
// be forcibly disconnected. `cause` becomes the what() message.
// The constructor is explicit so that a plain std::string can never silently
// convert into a disconnect request.
struct ForceDisconnect : public std::runtime_error {
  explicit ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
};
159 
160 } // namespace
161 
162 template <>
163 SessionMap::iterator DBHandler::get_session_it_unsafe(
164  const TSessionId& session,
166  auto session_it = get_session_from_map(session, sessions_);
167  try {
168  check_session_exp_unsafe(session_it);
169  } catch (const ForceDisconnect& e) {
170  read_lock.unlock();
172  auto session_it2 = get_session_from_map(session, sessions_);
173  disconnect_impl(session_it2, write_lock);
174  THROW_DB_EXCEPTION(e.what());
175  }
176  return session_it;
177 }
178 
179 template <>
180 SessionMap::iterator DBHandler::get_session_it_unsafe(
181  const TSessionId& session,
183  auto session_it = get_session_from_map(session, sessions_);
184  try {
185  check_session_exp_unsafe(session_it);
186  } catch (const ForceDisconnect& e) {
187  disconnect_impl(session_it, write_lock);
188  THROW_DB_EXCEPTION(e.what());
189  }
190  return session_it;
191 }
192 
193 template <>
196  std::vector<std::string> expired_sessions;
197  for (auto session_pair : sessions_) {
198  auto session_it = get_session_from_map(session_pair.first, sessions_);
199  try {
200  check_session_exp_unsafe(session_it);
201  } catch (const ForceDisconnect& e) {
202  expired_sessions.emplace_back(session_it->second->get_session_id());
203  }
204  }
205 
206  for (auto session_id : expired_sessions) {
207  if (leaf_aggregator_.leafCount() > 0) {
208  try {
209  leaf_aggregator_.disconnect(session_id);
210  } catch (TDBException& toe) {
211  LOG(INFO) << " Problem disconnecting from leaves : " << toe.what();
212  } catch (std::exception& e) {
213  LOG(INFO)
214  << " Problem disconnecting from leaves, check leaf logs for additonal info";
215  }
216  }
217  sessions_.erase(session_id);
218  }
219  if (render_handler_) {
220  write_lock.unlock();
221  for (auto session_id : expired_sessions) {
222  // NOTE: the render disconnect is done after the session lock is released to
223  // avoid a deadlock. See: https://heavyai.atlassian.net/browse/BE-3324
224  // This out-of-scope solution is a compromise for now until a better session
225  // handling/locking mechanism is developed for the renderer. Note as well that the
      // session_id cannot be immediately reused. If a render request were to slip in
      // after the lock is released and before the render disconnect, it could cause
      // a problem.
229  render_handler_->disconnect(session_id);
230  }
231  }
232 }
233 
234 #ifdef ENABLE_GEOS
235 extern std::unique_ptr<std::string> g_libgeos_so_filename;
236 #endif
237 
238 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
239  const std::vector<LeafHostInfo>& string_leaves,
240  const std::string& base_data_path,
241  const bool allow_multifrag,
242  const bool jit_debug,
243  const bool intel_jit_profile,
244  const bool read_only,
245  const bool allow_loop_joins,
246  const bool enable_rendering,
247  const bool renderer_use_ppll_polys,
248  const bool renderer_prefer_igpu,
249  const unsigned renderer_vulkan_timeout_ms,
250  const bool enable_auto_clear_render_mem,
251  const int render_oom_retry_threshold,
252  const size_t render_mem_bytes,
253  const size_t max_concurrent_render_sessions,
254  const size_t reserved_gpu_mem,
255  const bool render_compositor_use_last_gpu,
256  const size_t num_reader_threads,
257  const AuthMetadata& authMetadata,
258  SystemParameters& system_parameters,
259  const bool legacy_syntax,
260  const int idle_session_duration,
261  const int max_session_duration,
262  const std::string& udf_filename,
263  const std::string& clang_path,
264  const std::vector<std::string>& clang_options,
265 #ifdef ENABLE_GEOS
266  const std::string& libgeos_so_filename,
267 #endif
268  const File_Namespace::DiskCacheConfig& disk_cache_config,
269  const bool is_new_db)
270  : leaf_aggregator_(db_leaves)
271  , db_leaves_(db_leaves)
272  , string_leaves_(string_leaves)
273  , base_data_path_(base_data_path)
274  , random_gen_(std::random_device{}())
275  , session_id_dist_(0, INT32_MAX)
276  , jit_debug_(jit_debug)
277  , intel_jit_profile_(intel_jit_profile)
278  , allow_multifrag_(allow_multifrag)
279  , read_only_(read_only)
280  , allow_loop_joins_(allow_loop_joins)
281  , authMetadata_(authMetadata)
282  , system_parameters_(system_parameters)
283  , legacy_syntax_(legacy_syntax)
284  , dispatch_queue_(
285  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
286  , super_user_rights_(false)
287  , idle_session_duration_(idle_session_duration * 60)
288  , max_session_duration_(max_session_duration * 60)
289  , enable_rendering_(enable_rendering)
290  , renderer_use_ppll_polys_(renderer_use_ppll_polys)
291  , renderer_prefer_igpu_(renderer_prefer_igpu)
292  , renderer_vulkan_timeout_(renderer_vulkan_timeout_ms)
293  , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
294  , render_oom_retry_threshold_(render_oom_retry_threshold)
295  , max_concurrent_render_sessions_(max_concurrent_render_sessions)
296  , reserved_gpu_mem_(reserved_gpu_mem)
297  , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
298  , render_mem_bytes_(render_mem_bytes)
299  , num_reader_threads_(num_reader_threads)
300 #ifdef ENABLE_GEOS
301  , libgeos_so_filename_(libgeos_so_filename)
302 #endif
303  , disk_cache_config_(disk_cache_config)
304  , udf_filename_(udf_filename)
305  , clang_path_(clang_path)
306  , clang_options_(clang_options)
307 
308 {
309  LOG(INFO) << "HeavyDB Server " << MAPD_RELEASE;
310  initialize(is_new_db);
311 }
312 
313 void DBHandler::initialize(const bool is_new_db) {
314  if (!initialized_) {
315  initialized_ = true;
316  } else {
318  "Server already initialized; service restart required to activate any new "
319  "entitlements.");
320  return;
321  }
322 
325  cpu_mode_only_ = true;
326  } else {
327 #ifdef HAVE_CUDA
329  cpu_mode_only_ = false;
330 #else
332  LOG(WARNING) << "This build isn't CUDA enabled, will run on CPU";
333  cpu_mode_only_ = true;
334 #endif
335  }
336 
337  bool is_rendering_enabled = enable_rendering_;
338  if (system_parameters_.num_gpus == 0) {
339  is_rendering_enabled = false;
340  }
341 
342  const auto data_path =
343  boost::filesystem::path(base_data_path_) / shared::kDataDirectoryName;
  // Calculate the total amount of memory to reserve on each GPU that the buffer
  // manager is not allowed to allocate.
346  size_t total_reserved = reserved_gpu_mem_;
347  if (is_rendering_enabled) {
348  total_reserved += render_mem_bytes_;
349  }
350 
351  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
352 #ifdef HAVE_CUDA
353  if (!cpu_mode_only_ || is_rendering_enabled) {
354  try {
355  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
357  } catch (const std::exception& e) {
358  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
359  << e.what();
361  cpu_mode_only_ = true;
362  is_rendering_enabled = false;
363  }
364  }
365 #endif // HAVE_CUDA
366 
367  try {
368  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
370  std::move(cuda_mgr),
372  total_reserved,
375  } catch (const std::exception& e) {
376  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
377  }
378 
379  std::string udf_ast_filename("");
380 
381  try {
382  if (!udf_filename_.empty()) {
383  const auto cuda_mgr = data_mgr_->getCudaMgr();
384  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
385  cuda_mgr ? cuda_mgr->getDeviceArch()
387  UdfCompiler compiler(device_arch, clang_path_, clang_options_);
388 
389  const auto [cpu_udf_ir_file, cuda_udf_ir_file] = compiler.compileUdf(udf_filename_);
390  Executor::addUdfIrToModule(cpu_udf_ir_file, /*is_cuda_ir=*/false);
391  if (!cuda_udf_ir_file.empty()) {
392  Executor::addUdfIrToModule(cuda_udf_ir_file, /*is_cuda_ir=*/true);
393  }
394  udf_ast_filename = compiler.getAstFileName(udf_filename_);
395  }
396  } catch (const std::exception& e) {
397  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
398  }
399 
400  try {
401  calcite_ =
402  std::make_shared<Calcite>(system_parameters_, base_data_path_, udf_ast_filename);
403  } catch (const std::exception& e) {
404  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
405  }
406 
407  try {
408  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
409  if (!udf_filename_.empty()) {
410  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
411  }
412  } catch (const std::exception& e) {
413  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
414  }
415 
416  try {
418  } catch (const std::exception& e) {
419  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
420  }
421 
422  try {
423  auto udtfs = ThriftSerializers::to_thrift(
425  std::vector<TUserDefinedFunction> udfs = {};
426  calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
427  } catch (const std::exception& e) {
428  LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
429  }
430 
431  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
433  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
434  cpu_mode_only_ = true;
435  }
436 
437  LOG(INFO) << "Started in " << executor_device_type_ << " mode.";
438 
439  try {
441  SysCatalog::instance().init(base_data_path_,
442  data_mgr_,
444  calcite_,
445  is_new_db,
446  !db_leaves_.empty(),
448  } catch (const std::exception& e) {
449  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
450  }
451 
452  import_path_ = boost::filesystem::path(base_data_path_) / shared::kDefaultImportDirName;
453  start_time_ = std::time(nullptr);
454 
455  if (is_rendering_enabled) {
456  try {
457  render_handler_.reset(new RenderHandler(this,
461  false,
462  0,
463  false,
467  } catch (const std::exception& e) {
468  LOG(ERROR) << "Backend rendering disabled: " << e.what();
469  }
470  }
471 
473 
474  if (leaf_aggregator_.leafCount() > 0) {
475  try {
476  agg_handler_.reset(new HeavyDBAggHandler(this));
477  } catch (const std::exception& e) {
478  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
479  }
480  } else if (g_cluster) {
481  try {
482  leaf_handler_.reset(new HeavyDBLeafHandler(this));
483  } catch (const std::exception& e) {
484  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
485  }
486  }
487 
488 #ifdef ENABLE_GEOS
489  if (!libgeos_so_filename_.empty()) {
490  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename_));
491  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
492  }
493 #endif
494 }
495 
497  shutdown();
498 }
499 
500 void DBHandler::check_read_only(const std::string& str) {
501  if (DBHandler::read_only_) {
502  THROW_DB_EXCEPTION(str + " disabled: server running in read-only mode.");
503  }
504 }
505 
507  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
508  // We would create an in memory session for calcite with super user privileges which
509  // would be used for getting all tables metadata when a user runs the query. The
510  // session would be under the name of a proxy user/password which would only persist
511  // till server's lifetime or execution of calcite query(in memory) whichever is the
512  // earliest.
514  std::string session_id;
515  do {
516  session_id = generate_random_string(64);
517  } while (sessions_.find(session_id) != sessions_.end());
518  Catalog_Namespace::UserMetadata user_meta(-1,
519  calcite_->getInternalSessionProxyUserName(),
520  calcite_->getInternalSessionProxyPassword(),
521  true,
522  -1,
523  true,
524  false);
525  const auto emplace_ret =
526  sessions_.emplace(session_id,
527  std::make_shared<Catalog_Namespace::SessionInfo>(
528  catalog_ptr, user_meta, executor_device_type_, session_id));
529  CHECK(emplace_ret.second);
530  return session_id;
531 }
532 
534  const Catalog_Namespace::UserMetadata user_meta) {
535  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
536  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
537  user_meta.isSuper.load();
538 }
539 
540 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
541  // Remove InMemory calcite Session.
543  const auto it = sessions_.find(session_id);
544  CHECK(it != sessions_.end());
545  sessions_.erase(it);
546 }
547 
548 // internal connection for connections with no password
549 void DBHandler::internal_connect(TSessionId& session,
550  const std::string& username,
551  const std::string& dbname) {
552  auto stdlog = STDLOG(); // session_info set by connect_impl()
553  std::string username2 = username; // login() may reset username given as argument
554  std::string dbname2 = dbname; // login() may reset dbname given as argument
556  std::shared_ptr<Catalog> cat = nullptr;
557  try {
558  cat =
559  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
560  } catch (std::exception& e) {
561  THROW_DB_EXCEPTION(e.what());
562  }
563 
564  DBObject dbObject(dbname2, DatabaseDBObjectType);
565  dbObject.loadKey(*cat);
567  std::vector<DBObject> dbObjects;
568  dbObjects.push_back(dbObject);
569  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
570  THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
571  " is not allowed to access database " + dbname2 + ".");
572  }
573  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
574 }
575 
577  return leaf_aggregator_.leafCount() > 0;
578 }
579 
580 void DBHandler::krb5_connect(TKrb5Session& session,
581  const std::string& inputToken,
582  const std::string& dbname) {
583  THROW_DB_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
584 }
585 
586 void DBHandler::connect(TSessionId& session,
587  const std::string& username,
588  const std::string& passwd,
589  const std::string& dbname) {
590  auto stdlog = STDLOG(); // session_info set by connect_impl()
591  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
592  std::string username2 = username; // login() may reset username given as argument
593  std::string dbname2 = dbname; // login() may reset dbname given as argument
595  std::shared_ptr<Catalog> cat = nullptr;
596  try {
597  cat = SysCatalog::instance().login(
598  dbname2, username2, passwd, user_meta, !super_user_rights_);
599  } catch (std::exception& e) {
600  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
601  THROW_DB_EXCEPTION(e.what());
602  }
603 
604  DBObject dbObject(dbname2, DatabaseDBObjectType);
605  dbObject.loadKey(*cat);
607  std::vector<DBObject> dbObjects;
608  dbObjects.push_back(dbObject);
609  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
610  stdlog.appendNameValuePairs(
611  "user", username, "db", dbname, "exception", "Missing Privileges");
612  THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
613  " is not allowed to access database " + dbname2 + ".");
614  }
615  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
616 
617  // if pki auth session will come back encrypted with user pubkey
618  SysCatalog::instance().check_for_session_encryption(passwd, session);
619 }
620 
621 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::create_new_session(
622  TSessionId& session,
623  const std::string& dbname,
624  const Catalog_Namespace::UserMetadata& user_meta,
625  std::shared_ptr<Catalog> cat) {
626  do {
627  session = generate_random_string(32);
628  } while (sessions_.find(session) != sessions_.end());
629  std::pair<SessionMap::iterator, bool> emplace_retval =
630  sessions_.emplace(session,
631  std::make_shared<Catalog_Namespace::SessionInfo>(
632  cat, user_meta, executor_device_type_, session));
633  CHECK(emplace_retval.second);
634  auto& session_ptr = emplace_retval.first->second;
635  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
636  return session_ptr;
637 }
638 
639 void DBHandler::connect_impl(TSessionId& session,
640  const std::string& passwd,
641  const std::string& dbname,
642  const Catalog_Namespace::UserMetadata& user_meta,
643  std::shared_ptr<Catalog> cat,
644  query_state::StdLog& stdlog) {
645  // TODO(sy): Is there any reason to have dbname as a parameter
646  // here when the cat parameter already provides cat->name()?
647  // Should dbname and cat->name() ever differ?
648  {
650  expire_idle_sessions_unsafe(write_lock);
652  sessions_.size() + 1 > static_cast<size_t>(system_parameters_.num_sessions)) {
653  THROW_DB_EXCEPTION("Too many active sessions");
654  }
655  }
656  {
658  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
659  stdlog.setSessionInfo(session_ptr);
660  session_ptr->set_connection_info(getConnectionInfo().toString());
661  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
662  // while doing warmup
663  if (leaf_aggregator_.leafCount() > 0) {
664  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
665  return;
666  }
667  }
668  }
669  auto const roles =
670  stdlog.getConstSessionInfo()->get_currentUser().isSuper
671  ? std::vector<std::string>{{"super"}}
672  : SysCatalog::instance().getRoles(
673  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
674  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
675 }
676 
677 void DBHandler::disconnect(const TSessionId& session) {
678  auto stdlog = STDLOG();
679  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
680 
682  auto session_it = get_session_it_unsafe(session, write_lock);
683  stdlog.setSessionInfo(session_it->second);
684  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
685 
686  LOG(INFO) << "User " << session_it->second->get_currentUser().userLoggable()
687  << " disconnected from database " << dbname
688  << " with public_session_id: " << session_it->second->get_public_session_id();
689 
690  disconnect_impl(session_it, write_lock);
691 }
692 
693 void DBHandler::disconnect_impl(const SessionMap::iterator& session_it,
695  // session_it existence should already have been checked (i.e. called via
696  // get_session_it_unsafe(...))
697 
698  const auto session_id = session_it->second->get_session_id();
699  std::exception_ptr leaf_exception = nullptr;
700  try {
701  if (leaf_aggregator_.leafCount() > 0) {
702  leaf_aggregator_.disconnect(session_id);
703  }
704  } catch (...) {
705  leaf_exception = std::current_exception();
706  }
707 
708  {
709  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
710  render_group_assignment_map_.erase(session_id);
711  }
712 
713  sessions_.erase(session_it);
714  write_lock.unlock();
715 
716  if (render_handler_) {
717  render_handler_->disconnect(session_id);
718  }
719 }
720 
721 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
722  auto stdlog = STDLOG(get_session_ptr(session));
723  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
725  auto session_it = get_session_it_unsafe(session, write_lock);
726 
727  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
728 
729  try {
730  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
731  dbname2, session_it->second->get_currentUser().userName);
732  session_it->second->set_catalog_ptr(cat);
733  if (leaf_aggregator_.leafCount() > 0) {
734  leaf_aggregator_.switch_database(session, dbname);
735  return;
736  }
737  } catch (std::exception& e) {
738  THROW_DB_EXCEPTION(e.what());
739  }
740 }
741 
742 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
743  auto stdlog = STDLOG(get_session_ptr(session1));
744  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
746  auto session_it = get_session_it_unsafe(session1, write_lock);
747 
748  try {
749  const Catalog_Namespace::UserMetadata& user_meta =
750  session_it->second->get_currentUser();
751  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
752  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
753  if (leaf_aggregator_.leafCount() > 0) {
754  leaf_aggregator_.clone_session(session1, session2);
755  return;
756  }
757  } catch (std::exception& e) {
758  THROW_DB_EXCEPTION(e.what());
759  }
760 }
761 
762 void DBHandler::interrupt(const TSessionId& query_session,
763  const TSessionId& interrupt_session) {
764  // if this is for distributed setting, query_session becomes a parent session (agg)
765  // and the interrupt session is one of existing session in the leaf node (leaf)
766  // so we can think there exists a logical mapping
767  // between query_session (agg) and interrupt_session (leaf)
768  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
769  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
770  const auto allow_query_interrupt =
772  if (g_enable_dynamic_watchdog || allow_query_interrupt) {
773  // Shared lock to allow simultaneous interrupts of multiple sessions
775 
776  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
777  auto& cat = session_it->second.get()->getCatalog();
778  const auto dbname = cat.getCurrentDB().dbName;
780  jit_debug_ ? "/tmp" : "",
781  jit_debug_ ? "mapdquery" : "",
783  CHECK(executor);
784 
785  if (leaf_aggregator_.leafCount() > 0) {
786  leaf_aggregator_.interrupt(query_session, interrupt_session);
787  }
788  auto target_executor_ids = executor->getExecutorIdsRunningQuery(query_session);
789  if (target_executor_ids.empty()) {
791  executor->getSessionLock());
792  if (executor->checkIsQuerySessionEnrolled(query_session, session_read_lock)) {
793  session_read_lock.unlock();
794  VLOG(1) << "Received interrupt: "
795  << "User " << session_it->second->get_currentUser().userLoggable()
796  << ", Database " << dbname << std::endl;
797  executor->interrupt(query_session, interrupt_session);
798  }
799  } else {
800  for (auto& executor_id : target_executor_ids) {
801  VLOG(1) << "Received interrupt: "
802  << "Executor " << executor_id << ", User "
803  << session_it->second->get_currentUser().userLoggable() << ", Database "
804  << dbname << std::endl;
805  auto target_executor = Executor::getExecutor(executor_id);
806  target_executor->interrupt(query_session, interrupt_session);
807  }
808  }
809 
810  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
811  << " interrupted session with database " << dbname << std::endl;
812  }
813 }
814 
816  if (g_cluster) {
817  if (leaf_aggregator_.leafCount() > 0) {
818  return TRole::type::AGGREGATOR;
819  }
820  return TRole::type::LEAF;
821  }
822  return TRole::type::SERVER;
823 }
824 
825 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
826  auto stdlog = STDLOG(get_session_ptr(session));
827  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
828  const auto rendering_enabled = bool(render_handler_);
829  _return.read_only = read_only_;
830  _return.version = MAPD_RELEASE;
831  _return.rendering_enabled = rendering_enabled;
832  _return.start_time = start_time_;
833  _return.edition = MAPD_EDITION;
834  _return.host_name = heavyai::get_hostname();
835  _return.poly_rendering_enabled = rendering_enabled;
836  _return.role = getServerRole();
837  _return.renderer_status_json =
838  render_handler_ ? render_handler_->get_renderer_status_json() : "";
839 }
840 
841 void DBHandler::get_status(std::vector<TServerStatus>& _return,
842  const TSessionId& session) {
843  //
844  // get_status() is now called locally at startup on the aggregator
845  // in order to validate that all nodes of a cluster are running the
846  // same software version and the same renderer status
847  //
848  // In that context, it is called with the InvalidSessionID, and
849  // with the local super-user flag set.
850  //
851  // Hence, we allow this session-less mode only in distributed mode, and
852  // then on a leaf (always), or on the aggregator (only in super-user mode)
853  //
854  auto const allow_invalid_session = g_cluster && (!isAggregator() || super_user_rights_);
855  if (!allow_invalid_session || session != getInvalidSessionId()) {
856  auto stdlog = STDLOG(get_session_ptr(session));
857  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
858  } else {
859  LOG(INFO) << "get_status() called in session-less mode";
860  }
861  const auto rendering_enabled = bool(render_handler_);
862  TServerStatus ret;
863  ret.read_only = read_only_;
864  ret.version = MAPD_RELEASE;
865  ret.rendering_enabled = rendering_enabled;
866  ret.start_time = start_time_;
867  ret.edition = MAPD_EDITION;
868  ret.host_name = heavyai::get_hostname();
869  ret.poly_rendering_enabled = rendering_enabled;
870  ret.role = getServerRole();
871  ret.renderer_status_json =
872  render_handler_ ? render_handler_->get_renderer_status_json() : "";
873 
874  _return.push_back(ret);
875  if (leaf_aggregator_.leafCount() > 0) {
876  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
877  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
878  }
879 }
880 
881 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
882  const TSessionId& session) {
883  auto stdlog = STDLOG(get_session_ptr(session));
884  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
885  THardwareInfo ret;
886  const auto cuda_mgr = data_mgr_->getCudaMgr();
887  if (cuda_mgr) {
888  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
889  ret.start_gpu = cuda_mgr->getStartGpu();
890  if (ret.start_gpu >= 0) {
891  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
892  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
893  }
894  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
895  TGpuSpecification gpu_spec;
896  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
897  gpu_spec.num_sm = deviceProperties->numMPs;
898  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
899  gpu_spec.memory = deviceProperties->globalMem;
900  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
901  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
902  ret.gpu_info.push_back(gpu_spec);
903  }
904  }
905 
906  // start hardware/OS dependent code
907  ret.num_cpu_hw = std::thread::hardware_concurrency();
908  // ^ This might return diffrent results in case of hyper threading
909  // end hardware/OS dependent code
910 
911  _return.hardware_info.push_back(ret);
912  if (leaf_aggregator_.leafCount() > 0) {
913  ret.host_name = "aggregator";
914  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
915  _return.hardware_info.insert(_return.hardware_info.end(),
916  leaf_hardware.hardware_info.begin(),
917  leaf_hardware.hardware_info.end());
918  }
919 }
920 
921 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
922  auto session_ptr = get_session_ptr(session);
923  CHECK(session_ptr);
924  auto stdlog = STDLOG(session_ptr);
925  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
926  auto user_metadata = session_ptr->get_currentUser();
927 
928  _return.user = user_metadata.userName;
929  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
930  _return.start_time = session_ptr->get_start_time();
931  _return.is_super = user_metadata.isSuper;
932 }
933 
934 void DBHandler::set_leaf_info(const TSessionId& session, const TLeafInfo& info) {
935  g_distributed_leaf_idx = info.leaf_id;
936  g_distributed_num_leaves = info.num_leaves;
937 }
938 
940  const SQLTypeInfo& ti,
941  TColumn& column) {
942  if (ti.is_array()) {
943  TColumn tColumn;
944  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
945  CHECK(array_tv);
946  bool is_null = !array_tv->is_initialized();
947  if (!is_null) {
948  const auto& vec = array_tv->get();
949  for (const auto& elem_tv : vec) {
950  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
951  }
952  }
953  column.data.arr_col.push_back(tColumn);
954  column.nulls.push_back(is_null && !ti.get_notnull());
955  } else if (ti.is_geometry()) {
956  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
957  if (scalar_tv) {
958  auto s_n = boost::get<NullableString>(scalar_tv);
959  auto s = boost::get<std::string>(s_n);
960  if (s) {
961  column.data.str_col.push_back(*s);
962  } else {
963  column.data.str_col.emplace_back(""); // null string
964  auto null_p = boost::get<void*>(s_n);
965  CHECK(null_p && !*null_p);
966  }
967  column.nulls.push_back(!s && !ti.get_notnull());
968  } else {
969  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
970  CHECK(array_tv);
971  bool is_null = !array_tv->is_initialized();
972  if (!is_null) {
973  auto elem_type = SQLTypeInfo(kDOUBLE, false);
974  TColumn tColumn;
975  const auto& vec = array_tv->get();
976  for (const auto& elem_tv : vec) {
977  value_to_thrift_column(elem_tv, elem_type, tColumn);
978  }
979  column.data.arr_col.push_back(tColumn);
980  column.nulls.push_back(false);
981  } else {
982  TColumn tColumn;
983  column.data.arr_col.push_back(tColumn);
984  column.nulls.push_back(is_null && !ti.get_notnull());
985  }
986  }
987  } else {
988  CHECK(!ti.is_column());
989  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
990  CHECK(scalar_tv);
991  if (boost::get<int64_t>(scalar_tv)) {
992  int64_t data = *(boost::get<int64_t>(scalar_tv));
993 
994  if (ti.is_decimal()) {
995  double val = static_cast<double>(data);
996  if (ti.get_scale() > 0) {
997  val /= pow(10.0, std::abs(ti.get_scale()));
998  }
999  column.data.real_col.push_back(val);
1000  } else {
1001  column.data.int_col.push_back(data);
1002  }
1003 
1004  switch (ti.get_type()) {
1005  case kBOOLEAN:
1006  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
1007  break;
1008  case kTINYINT:
1009  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
1010  break;
1011  case kSMALLINT:
1012  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
1013  break;
1014  case kINT:
1015  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
1016  break;
1017  case kNUMERIC:
1018  case kDECIMAL:
1019  case kBIGINT:
1020  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1021  break;
1022  case kTIME:
1023  case kTIMESTAMP:
1024  case kDATE:
1025  case kINTERVAL_DAY_TIME:
1026  case kINTERVAL_YEAR_MONTH:
1027  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1028  break;
1029  default:
1030  column.nulls.push_back(false);
1031  }
1032  } else if (boost::get<double>(scalar_tv)) {
1033  double data = *(boost::get<double>(scalar_tv));
1034  column.data.real_col.push_back(data);
1035  if (ti.get_type() == kFLOAT) {
1036  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1037  } else {
1038  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
1039  }
1040  } else if (boost::get<float>(scalar_tv)) {
1041  CHECK_EQ(kFLOAT, ti.get_type());
1042  float data = *(boost::get<float>(scalar_tv));
1043  column.data.real_col.push_back(data);
1044  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1045  } else if (boost::get<NullableString>(scalar_tv)) {
1046  auto s_n = boost::get<NullableString>(scalar_tv);
1047  auto s = boost::get<std::string>(s_n);
1048  if (s) {
1049  column.data.str_col.push_back(*s);
1050  } else {
1051  column.data.str_col.emplace_back(""); // null string
1052  auto null_p = boost::get<void*>(s_n);
1053  CHECK(null_p && !*null_p);
1054  }
1055  column.nulls.push_back(!s && !ti.get_notnull());
1056  } else {
1057  CHECK(false);
1058  }
1059  }
1060 }
1061 
1063  TDatum datum;
1064  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1065  if (!scalar_tv) {
1066  CHECK(ti.is_array());
1067  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1068  CHECK(array_tv);
1069  if (array_tv->is_initialized()) {
1070  const auto& vec = array_tv->get();
1071  for (const auto& elem_tv : vec) {
1072  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
1073  datum.val.arr_val.push_back(scalar_col_val);
1074  }
1075  // Datum is not null, at worst it's an empty array Datum
1076  datum.is_null = false;
1077  } else {
1078  datum.is_null = true;
1079  }
1080  return datum;
1081  }
1082  if (boost::get<int64_t>(scalar_tv)) {
1083  int64_t data = *(boost::get<int64_t>(scalar_tv));
1084 
1085  if (ti.is_decimal()) {
1086  double val = static_cast<double>(data);
1087  if (ti.get_scale() > 0) {
1088  val /= pow(10.0, std::abs(ti.get_scale()));
1089  }
1090  datum.val.real_val = val;
1091  } else {
1092  datum.val.int_val = data;
1093  }
1094 
1095  switch (ti.get_type()) {
1096  case kBOOLEAN:
1097  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
1098  break;
1099  case kTINYINT:
1100  datum.is_null = (datum.val.int_val == NULL_TINYINT);
1101  break;
1102  case kSMALLINT:
1103  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
1104  break;
1105  case kINT:
1106  datum.is_null = (datum.val.int_val == NULL_INT);
1107  break;
1108  case kDECIMAL:
1109  case kNUMERIC:
1110  case kBIGINT:
1111  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1112  break;
1113  case kTIME:
1114  case kTIMESTAMP:
1115  case kDATE:
1116  case kINTERVAL_DAY_TIME:
1117  case kINTERVAL_YEAR_MONTH:
1118  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1119  break;
1120  default:
1121  datum.is_null = false;
1122  }
1123  } else if (boost::get<double>(scalar_tv)) {
1124  datum.val.real_val = *(boost::get<double>(scalar_tv));
1125  if (ti.get_type() == kFLOAT) {
1126  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1127  } else {
1128  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
1129  }
1130  } else if (boost::get<float>(scalar_tv)) {
1131  CHECK_EQ(kFLOAT, ti.get_type());
1132  datum.val.real_val = *(boost::get<float>(scalar_tv));
1133  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1134  } else if (boost::get<NullableString>(scalar_tv)) {
1135  auto s_n = boost::get<NullableString>(scalar_tv);
1136  auto s = boost::get<std::string>(s_n);
1137  if (s) {
1138  datum.val.str_val = *s;
1139  } else {
1140  auto null_p = boost::get<void*>(s_n);
1141  CHECK(null_p && !*null_p);
1142  }
1143  datum.is_null = !s;
1144  } else {
1145  CHECK(false);
1146  }
1147  return datum;
1148 }
1149 
1151  TQueryResult& _return,
1152  const QueryStateProxy& query_state_proxy,
1153  const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1154  const std::string& query_str,
1155  const bool column_format,
1156  const std::string& nonce,
1157  const int32_t first_n,
1158  const int32_t at_most_n,
1159  const bool use_calcite) {
1160  _return.total_time_ms = 0;
1161  _return.nonce = nonce;
1162  ParserWrapper pw{query_str};
1163  switch (pw.getQueryType()) {
1165  _return.query_type = TQueryType::READ;
1166  VLOG(1) << "query type: READ";
1167  break;
1168  }
1170  _return.query_type = TQueryType::WRITE;
1171  VLOG(1) << "query type: WRITE";
1172  break;
1173  }
1175  _return.query_type = TQueryType::SCHEMA_READ;
1176  VLOG(1) << "query type: SCHEMA READ";
1177  break;
1178  }
1180  _return.query_type = TQueryType::SCHEMA_WRITE;
1181  VLOG(1) << "query type: SCHEMA WRITE";
1182  break;
1183  }
1184  default: {
1185  _return.query_type = TQueryType::UNKNOWN;
1186  LOG(WARNING) << "query type: UNKNOWN";
1187  break;
1188  }
1189  }
1190 
1193  _return.total_time_ms += measure<>::execution([&]() {
1195  query_state_proxy,
1196  column_format,
1197  session_ptr->get_executor_device_type(),
1198  first_n,
1199  at_most_n,
1200  use_calcite,
1201  locks);
1203  _return, result, query_state_proxy, column_format, first_n, at_most_n);
1204  });
1205 }
1206 
1207 void DBHandler::convertData(TQueryResult& _return,
1209  const QueryStateProxy& query_state_proxy,
1210  const bool column_format,
1211  const int32_t first_n,
1212  const int32_t at_most_n) {
1213  _return.execution_time_ms += result.getExecutionTime();
1214  if (result.empty()) {
1215  return;
1216  }
1217 
1218  switch (result.getResultType()) {
1220  convertRows(_return,
1221  query_state_proxy,
1222  result.getTargetsMeta(),
1223  *result.getRows(),
1224  column_format,
1225  first_n,
1226  at_most_n);
1227  break;
1229  convertResult(_return, *result.getRows(), true);
1230  break;
1232  convertExplain(_return, *result.getRows(), true);
1233  break;
1235  convertRows(_return,
1236  query_state_proxy,
1237  result.getTargetsMeta(),
1238  *result.getRows(),
1239  column_format,
1240  -1,
1241  -1);
1242  break;
1243  }
1244 }
1245 
1246 void DBHandler::sql_execute(TQueryResult& _return,
1247  const TSessionId& session,
1248  const std::string& query_str,
1249  const bool column_format,
1250  const std::string& nonce,
1251  const int32_t first_n,
1252  const int32_t at_most_n) {
1253  const std::string exec_ra_prefix = "execute relalg";
1254  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1255  auto actual_query =
1256  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1257  auto session_ptr = get_session_ptr(session);
1258  auto query_state = create_query_state(session_ptr, actual_query);
1259  auto stdlog = STDLOG(session_ptr, query_state);
1260  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1261  stdlog.appendNameValuePairs("nonce", nonce);
1262  auto timer = DEBUG_TIMER(__func__);
1263  try {
1264  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1265  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1266  };
1267 
1268  if (first_n >= 0 && at_most_n >= 0) {
1269  THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
1270  }
1271 
1272  if (leaf_aggregator_.leafCount() > 0) {
1273  if (!agg_handler_) {
1274  THROW_DB_EXCEPTION("Distributed support is disabled.");
1275  }
1276  _return.total_time_ms = measure<>::execution([&]() {
1277  agg_handler_->cluster_execute(_return,
1278  query_state->createQueryStateProxy(),
1279  query_state->getQueryStr(),
1280  column_format,
1281  nonce,
1282  first_n,
1283  at_most_n,
1285  });
1286  _return.nonce = nonce;
1287  } else {
1288  sql_execute_local(_return,
1289  query_state->createQueryStateProxy(),
1290  session_ptr,
1291  actual_query,
1292  column_format,
1293  nonce,
1294  first_n,
1295  at_most_n,
1296  use_calcite);
1297  }
1298  _return.total_time_ms += process_deferred_copy_from(session);
1299  std::string debug_json = timer.stopAndGetJson();
1300  if (!debug_json.empty()) {
1301  _return.__set_debug(std::move(debug_json));
1302  }
1303  stdlog.appendNameValuePairs(
1304  "execution_time_ms",
1305  _return.execution_time_ms,
1306  "total_time_ms", // BE-3420 - Redundant with duration field
1307  stdlog.duration<std::chrono::milliseconds>());
1308  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1309  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1310  } catch (const std::exception& e) {
1311  if (strstr(e.what(), "java.lang.NullPointerException")) {
1312  THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
1313  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1314  THROW_DB_EXCEPTION("multiple SQL statements not allowed");
1315  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1316  THROW_DB_EXCEPTION("empty SQL statment not allowed");
1317  } else {
1318  THROW_DB_EXCEPTION(e.what());
1319  }
1320  }
1321 }
1322 
1324  const TSessionId& session,
1325  const std::string& query_str,
1326  const bool column_format,
1327  const int32_t first_n,
1328  const int32_t at_most_n,
1330  const std::string exec_ra_prefix = "execute relalg";
1331  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1332  auto actual_query =
1333  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1334 
1335  auto session_ptr = get_session_ptr(session);
1336  CHECK(session_ptr);
1337  auto query_state = create_query_state(session_ptr, actual_query);
1338  auto stdlog = STDLOG(session_ptr, query_state);
1339  auto timer = DEBUG_TIMER(__func__);
1340 
1341  try {
1342  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1343  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1344  };
1345 
1346  if (first_n >= 0 && at_most_n >= 0) {
1347  THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
1348  }
1349  auto total_time_ms = measure<>::execution([&]() {
1351  query_state->createQueryStateProxy(),
1352  column_format,
1353  session_ptr->get_executor_device_type(),
1354  first_n,
1355  at_most_n,
1356  use_calcite,
1357  locks);
1358  });
1359 
1360  _return.setExecutionTime(total_time_ms + process_deferred_copy_from(session));
1361 
1362  stdlog.appendNameValuePairs(
1363  "execution_time_ms",
1364  _return.getExecutionTime(),
1365  "total_time_ms", // BE-3420 - Redundant with duration field
1366  stdlog.duration<std::chrono::milliseconds>());
1367  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1368  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1369  } catch (const std::exception& e) {
1370  if (strstr(e.what(), "java.lang.NullPointerException")) {
1371  THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
1372  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1373  THROW_DB_EXCEPTION("multiple SQL statements not allowed");
1374  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1375  THROW_DB_EXCEPTION("empty SQL statment not allowed");
1376  } else {
1377  THROW_DB_EXCEPTION(e.what());
1378  }
1379  }
1380 }
1381 
1382 int64_t DBHandler::process_deferred_copy_from(const TSessionId& session_id) {
1383  int64_t total_time_ms(0);
1384  // if the SQL statement we just executed was a geo COPY FROM, the import
1385  // parameters were captured, and this flag set, so we do the actual import here
1386  if (auto deferred_copy_from_state = deferred_copy_from_sessions(session_id)) {
1387  // import_geo_table() calls create_table() which calls this function to
1388  // do the work, so reset the flag now to avoid executing this part a
1389  // second time at the end of that, which would fail as the table was
1390  // already created! Also reset the flag with a ScopeGuard on exiting
1391  // this function any other way, such as an exception from the code above!
1392  deferred_copy_from_sessions.remove(session_id);
1393 
1394  // create table as replicated?
1395  TCreateParams create_params;
1396  if (deferred_copy_from_state->partitions == "REPLICATED") {
1397  create_params.is_replicated = true;
1398  }
1399 
1400  // now do (and time) the import
1401  total_time_ms = measure<>::execution([&]() {
1402  importGeoTableGlobFilterSort(session_id,
1403  deferred_copy_from_state->table,
1404  deferred_copy_from_state->file_name,
1405  deferred_copy_from_state->copy_params,
1406  TRowDescriptor(),
1407  create_params);
1408  });
1409  }
1410  return total_time_ms;
1411 }
1412 
1413 void DBHandler::sql_execute_df(TDataFrame& _return,
1414  const TSessionId& session,
1415  const std::string& query_str,
1416  const TDeviceType::type results_device_type,
1417  const int32_t device_id,
1418  const int32_t first_n,
1419  const TArrowTransport::type transport_method) {
1420  auto session_ptr = get_session_ptr(session);
1421  CHECK(session_ptr);
1422  auto query_state = create_query_state(session_ptr, query_str);
1423  auto stdlog = STDLOG(session_ptr, query_state);
1424 
1425  const auto executor_device_type = session_ptr->get_executor_device_type();
1426 
1427  if (results_device_type == TDeviceType::GPU) {
1428  if (executor_device_type != ExecutorDeviceType::GPU) {
1429  THROW_DB_EXCEPTION(std::string("GPU mode is not allowed in this session"));
1430  }
1431  if (!data_mgr_->gpusPresent()) {
1432  THROW_DB_EXCEPTION(std::string("No GPU is available in this server"));
1433  }
1434  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1436  std::string("Invalid device_id or unavailable GPU with this ID"));
1437  }
1438  }
1439  ParserWrapper pw{query_str};
1440  if (pw.getQueryType() != ParserWrapper::QueryType::Read) {
1441  THROW_DB_EXCEPTION(std::string(
1442  "Only read queries supported for the Arrow sql_execute_df endpoint."));
1443  }
1444  if (ExplainInfo(query_str).isCalciteExplain()) {
1445  THROW_DB_EXCEPTION(std::string(
1446  "Explain is currently unsupported by the Arrow sql_execute_df endpoint."));
1447  }
1448 
1449  ExecutionResult execution_result;
1451  sql_execute_impl(execution_result,
1452  query_state->createQueryStateProxy(),
1453  true, /* column_format - does this do anything? */
1454  executor_device_type,
1455  first_n,
1456  -1, /* at_most_n */
1457  true,
1458  locks);
1459 
1460  const auto result_set = execution_result.getRows();
1461  const auto executor_results_device_type = results_device_type == TDeviceType::CPU
1464  _return.execution_time_ms =
1465  execution_result.getExecutionTime() - result_set->getQueueTime();
1466  const auto converter = std::make_unique<ArrowResultSetConverter>(
1467  result_set,
1468  data_mgr_,
1469  executor_results_device_type,
1470  device_id,
1471  getTargetNames(execution_result.getTargetsMeta()),
1472  first_n,
1473  ArrowTransport(transport_method));
1474  ArrowResult arrow_result;
1475  _return.arrow_conversion_time_ms +=
1476  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
1477  _return.sm_handle =
1478  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
1479  _return.sm_size = arrow_result.sm_size;
1480  _return.df_handle =
1481  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
1482  _return.df_buffer =
1483  std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
1484  if (executor_results_device_type == ExecutorDeviceType::GPU) {
1485  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1486  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
1487  ipc_handle_to_dev_ptr_.insert(
1488  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
1489  }
1490  _return.df_size = arrow_result.df_size;
1491 }
1492 
1493 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1494  const TSessionId& session,
1495  const std::string& query_str,
1496  const int32_t device_id,
1497  const int32_t first_n) {
1498  auto stdlog = STDLOG(get_session_ptr(session));
1499  sql_execute_df(_return,
1500  session,
1501  query_str,
1502  TDeviceType::GPU,
1503  device_id,
1504  first_n,
1505  TArrowTransport::SHARED_MEMORY);
1506 }
1507 
1508 // For now we have only one user of a data frame in all cases.
1509 void DBHandler::deallocate_df(const TSessionId& session,
1510  const TDataFrame& df,
1511  const TDeviceType::type device_type,
1512  const int32_t device_id) {
1513  auto stdlog = STDLOG(get_session_ptr(session));
1514  std::string serialized_cuda_handle = "";
1515  if (device_type == TDeviceType::GPU) {
1516  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1517  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1518  TDBException ex;
1519  ex.error_msg = std::string(
1520  "Current data frame handle is not bookkept or been inserted "
1521  "twice");
1522  LOG(ERROR) << ex.error_msg;
1523  throw ex;
1524  }
1525  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1526  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1527  }
1528  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1529  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1531  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1533  result,
1534  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1535  device_id,
1536  data_mgr_);
1537 }
1538 
1539 void DBHandler::sql_validate(TRowDescriptor& _return,
1540  const TSessionId& session,
1541  const std::string& query_str) {
1542  try {
1543  auto stdlog = STDLOG(get_session_ptr(session));
1544  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1545  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1546  stdlog.setQueryState(query_state);
1547 
1548  ParserWrapper pw{query_str};
1549  if (ExplainInfo(query_str).isExplain() || pw.is_ddl || pw.is_update_dml) {
1550  throw std::runtime_error("Can only validate SELECT statements.");
1551  }
1552 
1553  const auto execute_read_lock =
1557 
1558  TPlanResult parse_result;
1560  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1561  query_state->getQueryStr(),
1562  {},
1563  true,
1565  /*check_privileges=*/true);
1566  const auto query_ra = parse_result.plan_result;
1567 
1568  const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
1569  _return = fixup_row_descriptor(result.row_set.row_desc,
1570  query_state->getConstSessionInfo()->getCatalog());
1571  } catch (const std::exception& e) {
1572  THROW_DB_EXCEPTION(std::string(e.what()));
1573  }
1574 }
1575 
1576 namespace {
1577 
1579  std::unordered_set<std::string> uc_column_names;
1580  std::unordered_set<std::string> uc_column_table_qualifiers;
1581 };
1582 
1583 // Extract what looks like a (qualified) identifier from the partial query.
1584 // The results will be used to rank the auto-completion results: tables which
1585 // contain at least one of the identifiers first.
1587  const std::string& sql) {
1588  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1589  boost::regex::extended | boost::regex::icase};
1590  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1591  boost::sregex_token_iterator end;
1592  std::unordered_set<std::string> uc_column_names;
1593  std::unordered_set<std::string> uc_column_table_qualifiers;
1594  for (; tok_it != end; ++tok_it) {
1595  std::string column_name = *tok_it;
1596  std::vector<std::string> column_tokens;
1597  boost::split(column_tokens, column_name, boost::is_any_of("."));
1598  if (column_tokens.size() == 2) {
1599  // If the column name is qualified, take user's word.
1600  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1601  } else {
1602  uc_column_names.insert(to_upper(column_name));
1603  }
1604  }
1605  return {uc_column_names, uc_column_table_qualifiers};
1606 }
1607 
1608 } // namespace
1609 
1610 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1611  const TSessionId& session,
1612  const std::string& sql,
1613  const int cursor) {
1614  auto stdlog = STDLOG(get_session_ptr(session));
1615  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1616  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1617  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1618  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1619  proj_tokens.uc_column_names, visible_tables, stdlog);
1620  // Add the table qualifiers explicitly specified by the user.
1621  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1622  proj_tokens.uc_column_table_qualifiers.end());
1623  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1624  std::sort(
1625  hints.begin(),
1626  hints.end(),
1627  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1628  if (lhs.type == TCompletionHintType::TABLE &&
1629  rhs.type == TCompletionHintType::TABLE) {
1630  // Between two tables, one which is compatible with the specified
1631  // projections and one which isn't, pick the one which is compatible.
1632  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1633  compatible_table_names.end() &&
1634  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1635  compatible_table_names.end()) {
1636  return true;
1637  }
1638  }
1639  return lhs.type < rhs.type;
1640  });
1641 }
1642 
1643 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1644  std::vector<std::string>& visible_tables,
1645  query_state::StdLog& stdlog,
1646  const std::string& sql,
1647  const int cursor) {
1648  const auto& session_info = *stdlog.getConstSessionInfo();
1649  try {
1650  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1651 
1652  // Filter out keywords suggested by Calcite which we don't support.
1654  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1655  } catch (const std::exception& e) {
1656  TDBException ex;
1657  ex.error_msg = std::string(e.what());
1658  LOG(ERROR) << ex.error_msg;
1659  throw ex;
1660  }
1661  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1662  const size_t length_to_cursor =
1663  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1664  // Trust hints from Calcite after the FROM keyword.
1665  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1666  return;
1667  }
1668  // Before FROM, the query is too incomplete for context-sensitive completions.
1669  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1670 }
1671 
1672 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1673  query_state::StdLog& stdlog,
1674  std::vector<std::string>& visible_tables,
1675  const std::string& sql,
1676  const int cursor) {
1677  const auto last_word =
1678  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1679  boost::regex select_expr{R"(\s*select\s+)",
1680  boost::regex::extended | boost::regex::icase};
1681  const size_t length_to_cursor =
1682  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1683  // After SELECT but before FROM, look for all columns in all tables which match the
1684  // prefix.
1685  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1686  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1687  // Trust the fully qualified columns the most.
1688  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1689  return;
1690  }
1691  // Not much information to use, just retrieve column names which match the prefix.
1692  if (should_suggest_column_hints(sql)) {
1693  get_column_hints(hints, last_word, column_names_by_table);
1694  return;
1695  }
1696  const std::string kFromKeyword{"FROM"};
1697  if (boost::istarts_with(kFromKeyword, last_word)) {
1698  TCompletionHint keyword_hint;
1699  keyword_hint.type = TCompletionHintType::KEYWORD;
1700  keyword_hint.replaced = last_word;
1701  keyword_hint.hints.emplace_back(kFromKeyword);
1702  hints.push_back(keyword_hint);
1703  }
1704  } else {
1705  const std::string kSelectKeyword{"SELECT"};
1706  if (boost::istarts_with(kSelectKeyword, last_word)) {
1707  TCompletionHint keyword_hint;
1708  keyword_hint.type = TCompletionHintType::KEYWORD;
1709  keyword_hint.replaced = last_word;
1710  keyword_hint.hints.emplace_back(kSelectKeyword);
1711  hints.push_back(keyword_hint);
1712  }
1713  }
1714 }
1715 
1716 std::unordered_map<std::string, std::unordered_set<std::string>>
1717 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1718  query_state::StdLog& stdlog) {
1719  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1720  for (auto it = table_names.begin(); it != table_names.end();) {
1721  TTableDetails table_details;
1722  try {
1723  get_table_details_impl(table_details, stdlog, *it, false, false);
1724  } catch (const TDBException& e) {
1725  // Remove the corrupted Table/View name from the list for further processing.
1726  it = table_names.erase(it);
1727  continue;
1728  }
1729  for (const auto& column_type : table_details.row_desc) {
1730  column_names_by_table[*it].emplace(column_type.col_name);
1731  }
1732  ++it;
1733  }
1734  return column_names_by_table;
1735 }
1736 
1740 }
1741 
1743  const std::unordered_set<std::string>& uc_column_names,
1744  std::vector<std::string>& table_names,
1745  query_state::StdLog& stdlog) {
1746  std::unordered_set<std::string> compatible_table_names_by_column;
1747  for (auto it = table_names.begin(); it != table_names.end();) {
1748  TTableDetails table_details;
1749  try {
1750  get_table_details_impl(table_details, stdlog, *it, false, false);
1751  } catch (const TDBException& e) {
1752  // Remove the corrupted Table/View name from the list for further processing.
1753  it = table_names.erase(it);
1754  continue;
1755  }
1756  for (const auto& column_type : table_details.row_desc) {
1757  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1758  compatible_table_names_by_column.emplace(to_upper(*it));
1759  break;
1760  }
1761  }
1762  ++it;
1763  }
1764  return compatible_table_names_by_column;
1765 }
1766 
1767 TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
1768  QueryStateProxy query_state_proxy) {
1769  TQueryResult _return;
1771  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1772  [this, &result, &query_state_proxy, &query_ra](const size_t executor_index) {
1773  auto qid_scope_guard = query_state_proxy.getQueryState().setThreadLocalQueryId();
1774  execute_rel_alg(result,
1775  query_state_proxy,
1776  query_ra,
1777  true,
1779  -1,
1780  -1,
1781  /*just_validate=*/true,
1782  /*find_filter_push_down_candidates=*/false,
1783  ExplainInfo(),
1784  executor_index);
1785  });
1787  dispatch_queue_->submit(execute_rel_alg_task, /*is_update_delete=*/false);
1788  auto result_future = execute_rel_alg_task->get_future();
1789  result_future.get();
1790  DBHandler::convertData(_return, result, query_state_proxy, true, -1, -1);
1791  return _return;
1792 }
1793 
1794 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1795  auto stdlog = STDLOG(get_session_ptr(session));
1796  auto session_ptr = stdlog.getConstSessionInfo();
1797  if (!session_ptr->get_currentUser().isSuper) {
1798  // WARNING: This appears to not include roles a user is a member of,
1799  // if the role has no permissions granted to it.
1800  roles =
1801  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1802  session_ptr->getCatalog().getCurrentDB().dbId);
1803  } else {
1804  roles = SysCatalog::instance().getRoles(
1805  false, true, session_ptr->get_currentUser().userName);
1806  }
1807 }
1808 
1809 bool DBHandler::has_role(const TSessionId& sessionId,
1810  const std::string& granteeName,
1811  const std::string& roleName) {
1812  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1813  const auto session_ptr = stdlog.getConstSessionInfo();
1814  const auto current_user = session_ptr->get_currentUser();
1815  if (!current_user.isSuper) {
1816  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1817  user && current_user.userName != granteeName) {
1818  THROW_DB_EXCEPTION("Only super users can check other user's roles.");
1819  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1820  current_user.userName, granteeName, true)) {
1822  "Only super users can check roles assignment that have not been directly "
1823  "granted to a user.");
1824  }
1825  }
1826  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1827 }
1828 
1829 static TDBObject serialize_db_object(const std::string& roleName,
1830  const DBObject& inObject) {
1831  TDBObject outObject;
1832  outObject.objectName = inObject.getName();
1833  outObject.grantee = roleName;
1834  outObject.objectId = inObject.getObjectKey().objectId;
1835  const auto ap = inObject.getPrivileges();
1836  switch (inObject.getObjectKey().permissionType) {
1837  case DatabaseDBObjectType:
1838  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1839  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1840  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1841  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1842  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1843 
1844  break;
1845  case TableDBObjectType:
1846  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1847  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1848  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1849  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1850  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1851  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1852  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1853  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1854  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1855 
1856  break;
1857  case DashboardDBObjectType:
1858  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1859  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1860  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1861  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1862  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1863 
1864  break;
1865  case ViewDBObjectType:
1866  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1867  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1868  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1869  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1870  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1871  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1872  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1873 
1874  break;
1875  case ServerDBObjectType:
1876  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1877  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1878  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1879  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1880  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::SERVER_USAGE));
1881 
1882  break;
1883  default:
1884  CHECK(false);
1885  }
1886  const int type_val = static_cast<int>(inObject.getType());
1887  CHECK(type_val >= 0 && type_val < 6);
1888  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1889  return outObject;
1890 }
1891 
// Database-level permission check: returns false if any flag requested in
// permissions.database_permissions_ is not held in `privs`, true otherwise.
// Throws if database_permissions_ is not set.
// NOTE(review): the signature line (source line 1892) is absent from this listing.
1893  const TDBObjectPermissions& permissions) {
1894  if (!permissions.__isset.database_permissions_) {
1895  THROW_DB_EXCEPTION("Database permissions not set for check.")
1896  }
1897  auto perms = permissions.database_permissions_;
1898  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1899  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1900  (perms.view_sql_editor_ &&
// NOTE(review): source line 1901 is absent from this listing (presumably the
// VIEW_SQL_EDITOR privilege test); confirm against the real file.
1902  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1903  return false;
1904  } else {
1905  return true;
1906  }
1907 }
1908 
// Table-level permission check: returns false if any flag requested in
// permissions.table_permissions_ (create/drop/select/insert/update/delete/
// truncate/alter) is not held in `privs`, true otherwise.
// Throws if table_permissions_ is not set.
// NOTE(review): the signature line (source line 1909) is absent from this listing.
1910  const TDBObjectPermissions& permissions) {
1911  if (!permissions.__isset.table_permissions_) {
1912  THROW_DB_EXCEPTION("Table permissions not set for check.")
1913  }
1914  auto perms = permissions.table_permissions_;
1915  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1916  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1917  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1918  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1919  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1920  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1921  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1922  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1923  return false;
1924  } else {
1925  return true;
1926  }
1927 }
1928 
// Dashboard-level permission check: returns false if any flag requested in
// permissions.dashboard_permissions_ (create/delete/view/edit) is not held in
// `privs`, true otherwise. Throws if dashboard_permissions_ is not set.
// NOTE(review): the signature line (source line 1929) is absent from this listing.
1930  const TDBObjectPermissions& permissions) {
1931  if (!permissions.__isset.dashboard_permissions_) {
1932  THROW_DB_EXCEPTION("Dashboard permissions not set for check.")
1933  }
1934  auto perms = permissions.dashboard_permissions_;
1935  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1936  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1937  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1938  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1939  return false;
1940  } else {
1941  return true;
1942  }
1943 }
1944 
// View-level permission check: returns false if any flag requested in
// permissions.view_permissions_ (create/drop/select/insert/update/delete) is
// not held in `privs`, true otherwise. Throws if view_permissions_ is not set.
// NOTE(review): the signature line (source line 1945) is absent from this listing.
1946  const TDBObjectPermissions& permissions) {
1947  if (!permissions.__isset.view_permissions_) {
1948  THROW_DB_EXCEPTION("View permissions not set for check.")
1949  }
1950  auto perms = permissions.view_permissions_;
1951  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1952  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1953  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1954  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1955  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1956  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1957  return false;
1958  } else {
1959  return true;
1960  }
1961 }
1962 
// Server-level permission check: returns false if any flag requested in
// permissions.server_permissions_ (create/drop/alter/usage) is not held in
// `privs`, true otherwise.
// NOTE(review): unlike the database/table/dashboard/view checks, a missing
// server_permissions_ trips a CHECK here instead of raising a DB exception to
// the client. Consider aligning with the sibling checks.
// NOTE(review): the signature line (source line 1963) is absent from this listing.
1964  const TDBObjectPermissions& permissions) {
1965  CHECK(permissions.__isset.server_permissions_);
1966  auto perms = permissions.server_permissions_;
1967  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
1968  (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
1969  (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER)) ||
1970  (perms.usage_ && !privs.hasPermission(ServerPrivileges::SERVER_USAGE))) {
1971  return false;
1972  } else {
1973  return true;
1974  }
1975 }
1976 
// Returns whether grantee `granteeName` holds all of `permissions` on object
// `objectName` of kind `objectType` in the session's current catalog.
// - Non-superusers may only query themselves or roles granted to them (else throws).
// - A grantee whose user metadata has isSuper set is reported as having the privilege.
// - Throws if the grantee does not exist or objectType is unrecognized.
// - If the grantee has no entry for the object (findDbObject(..., false)), returns false.
// The per-type comparison is dispatched through permissionFuncMap_ by name.
1977 bool DBHandler::has_object_privilege(const TSessionId& sessionId,
1978  const std::string& granteeName,
1979  const std::string& objectName,
1980  const TDBObjectType::type objectType,
1981  const TDBObjectPermissions& permissions) {
1982  auto stdlog = STDLOG(get_session_ptr(sessionId));
1983  auto session_ptr = stdlog.getConstSessionInfo();
1984  auto const& cat = session_ptr->getCatalog();
1985  auto const& current_user = session_ptr->get_currentUser();
1986  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1987  current_user.userName, granteeName, false)) {
// NOTE(review): source line 1988 is absent from this listing (presumably the
// THROW_DB_EXCEPTION( opener for the message below).
1989  "Users except superusers can only check privileges for self or roles granted "
1990  "to "
1991  "them.")
1992  }
// NOTE(review): source line 1993 is absent (presumably the declaration of `user_meta`).
1994  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1995  user_meta.isSuper) {
1996  return true;
1997  }
1998  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1999  if (!grnt) {
2000  THROW_DB_EXCEPTION("User or Role " + granteeName + " does not exist.")
2001  }
// NOTE(review): source lines 2002, 2005-2006, 2009-2010, 2013-2014, 2017-2018 and
// 2021-2022 are absent. They presumably declare `type` and hold the `case` labels
// and `type = ...` assignments for each object kind; confirm against the real file.
2003  std::string func_name;
2004  switch (objectType) {
2007  func_name = "database";
2008  break;
2011  func_name = "table";
2012  break;
2015  func_name = "dashboard";
2016  break;
2019  func_name = "view";
2020  break;
2023  func_name = "server";
2024  break;
2025  default:
2026  THROW_DB_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
2027  }
2028  DBObject req_object(objectName, type);
2029  req_object.loadKey(cat);
2030 
2031  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
2032  if (grantee_object) {
2033  // if grantee has privs on the object
2034  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
2035  } else {
2036  // no privileges on that object
2037  return false;
2038  }
2039 }
2040 
2041 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
2042  const TSessionId& sessionId,
2043  const std::string& roleName) {
2044  auto stdlog = STDLOG(get_session_ptr(sessionId));
2045  auto session_ptr = stdlog.getConstSessionInfo();
2046  auto const& user = session_ptr->get_currentUser();
2047  if (!user.isSuper &&
2048  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
2049  return;
2050  }
2051  auto* rl = SysCatalog::instance().getGrantee(roleName);
2052  if (rl) {
2053  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
2054  for (auto& dbObject : *rl->getDbObjects(true)) {
2055  if (dbObject.first.dbId != dbId) {
2056  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
2057  // usecase for now, though)
2058  continue;
2059  }
2060  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
2061  TDBObjectsForRole.push_back(tdbObject);
2062  }
2063  } else {
2064  THROW_DB_EXCEPTION("User or role " + roleName + " does not exist.");
2065  }
2066 }
2067 
// Appends to `TDBObjects` the privileges on object `objectName` of kind `type`:
//  - for a superuser, one synthetic entry named "super";
//  - for each grantee returned by SysCatalog::getRoles(true, isSuper, userName),
//    any entry it holds on the object itself and on the database-level object
//    of the same type.
// Dashboards are looked up by numeric id (std::stoi(objectName), or -1 when
// empty). For tables/views the catalog decides whether the name is a view.
// Any exception while resolving the object, including a non-numeric dashboard
// name, is reported as "Object with name ... does not exist."
2068 void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
2069  const TSessionId& sessionId,
2070  const std::string& objectName,
2071  const TDBObjectType::type type) {
2072  auto stdlog = STDLOG(get_session_ptr(sessionId));
2073  auto session_ptr = stdlog.getConstSessionInfo();
2074  DBObjectType object_type;
// NOTE(review): source lines 2076, 2079, 2082-2083, 2085 and 2088 are absent from
// this listing (presumably the `case TDBObjectType::...:` labels and the dashboard
// assignment); confirm against the real file.
2075  switch (type) {
2077  object_type = DBObjectType::DatabaseDBObjectType;
2078  break;
2080  object_type = DBObjectType::TableDBObjectType;
2081  break;
2084  break;
2086  object_type = DBObjectType::ViewDBObjectType;
2087  break;
2089  object_type = DBObjectType::ServerDBObjectType;
2090  break;
2091  default:
2092  THROW_DB_EXCEPTION("Failed to get object privileges for " + objectName +
2093  ": unknown object type (" + std::to_string(type) + ").");
2094  }
2095  DBObject object_to_find(objectName, object_type);
2096 
2097  // TODO(adb): Use DatabaseLock to protect method
2098  try {
2099  if (object_type == DashboardDBObjectType) {
2100  if (objectName == "") {
2101  object_to_find = DBObject(-1, object_type);
2102  } else {
2103  object_to_find = DBObject(std::stoi(objectName), object_type);
2104  }
2105  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
2106  !objectName.empty()) {
2107  // special handling for view / table
2108  auto const& cat = session_ptr->getCatalog();
2109  auto td = cat.getMetadataForTable(objectName, false);
2110  if (td) {
2111  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
2112  object_to_find = DBObject(objectName, object_type);
2113  }
2114  }
2115  object_to_find.loadKey(session_ptr->getCatalog());
2116  } catch (const std::exception&) {
2117  THROW_DB_EXCEPTION("Object with name " + objectName + " does not exist.");
2118  }
2119 
2120  // object type on database level
2121  DBObject object_to_find_dblevel("", object_type);
2122  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
2123  // if user is superuser respond with a full priv
2124  if (session_ptr->get_currentUser().isSuper) {
2125  // using ALL_TABLE here to set max permissions
// NOTE(review): source line 2127 (the privileges argument) is absent from this listing.
2126  DBObject dbObj{object_to_find.getObjectKey(),
2128  session_ptr->get_currentUser().userId};
2129  dbObj.setName("super");
2130  TDBObjects.push_back(
2131  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
// NOTE(review): the `;` after this closing brace is a redundant empty statement.
2132  };
2133 
2134  std::vector<std::string> grantees =
2135  SysCatalog::instance().getRoles(true,
2136  session_ptr->get_currentUser().isSuper,
2137  session_ptr->get_currentUser().userName);
2138  for (const auto& grantee : grantees) {
2139  DBObject* object_found;
2140  auto* gr = SysCatalog::instance().getGrantee(grantee);
2141  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
2142  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2143  }
2144  // check object permissions on Database level
2145  if (gr &&
2146  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
2147  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2148  }
2149  }
2150 }
2151 
// Shared implementation of get_all_roles_for_user / get_all_effective_roles_for_user.
// Writes grantee->getRoles(!effective) into `roles`; `effective == false`
// passes only_direct = true.
// Authorization: superusers may query anyone; a user may query themselves; a
// role may be queried only if it is granted to the current user.
// Throws if the grantee does not exist or the caller is not authorized.
// NOTE(review): `sessionId` is not used in the visible body.
// NOTE(review): the signature line (source line 2152) is absent from this listing.
2153  std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr,
2154  std::vector<std::string>& roles,
2155  const TSessionId& sessionId,
2156  const std::string& granteeName,
2157  bool effective) {
2158  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
2159  if (grantee) {
2160  if (session_ptr->get_currentUser().isSuper) {
2161  roles = grantee->getRoles(/*only_direct=*/!effective);
2162  } else if (grantee->isUser()) {
2163  if (session_ptr->get_currentUser().userName == granteeName) {
2164  roles = grantee->getRoles(/*only_direct=*/!effective);
2165  } else {
// NOTE(review): source line 2166 is absent (presumably the THROW_DB_EXCEPTION( opener).
2167  "Only a superuser is authorized to request list of roles granted to another "
2168  "user.");
2169  }
2170  } else {
2171  CHECK(!grantee->isUser());
2172  // granteeName is actually a roleName here and we can check a role
2173  // only if it is granted to us
2174  if (SysCatalog::instance().isRoleGrantedToGrantee(
2175  session_ptr->get_currentUser().userName, granteeName, false)) {
2176  roles = grantee->getRoles(/*only_direct=*/!effective);
2177  } else {
2178  THROW_DB_EXCEPTION("A user can check only roles granted to him.");
2179  }
2180  }
2181  } else {
2182  THROW_DB_EXCEPTION("Grantee " + granteeName + " does not exist.");
2183  }
2184 }
2185 
2186 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
2187  const TSessionId& sessionId,
2188  const std::string& granteeName) {
2189  // WARNING: This function only returns directly granted roles.
2190  // See also: get_all_effective_roles_for_user() for all of a user's roles.
2191  auto stdlog = STDLOG(get_session_ptr(sessionId));
2192  auto session_ptr = stdlog.getConstSessionInfo();
2193  getAllRolesForUserImpl(session_ptr, roles, sessionId, granteeName, /*effective=*/false);
2194 }
2195 
2196 void DBHandler::get_all_effective_roles_for_user(std::vector<std::string>& roles,
2197  const TSessionId& sessionId,
2198  const std::string& granteeName) {
2199  auto stdlog = STDLOG(get_session_ptr(sessionId));
2200  auto session_ptr = stdlog.getConstSessionInfo();
2201  getAllRolesForUserImpl(session_ptr, roles, sessionId, granteeName, /*effective=*/true);
2202 }
2203 
// File-local helper that flattens a table -> column-names map into one log
// string of the form ":table1,colA,colB:table2,colC" (tables in map order).
// NOTE(review): the function's signature line (source line 2205) is absent from this listing.
2204 namespace {
2206  const std::map<std::string, std::vector<std::string>>& table_col_names) {
2207  std::ostringstream oss;
2208  for (const auto& [table_name, col_names] : table_col_names) {
2209  oss << ":" << table_name;
2210  for (const auto& col_name : col_names) {
2211  oss << "," << col_name;
2212  }
2213  }
2214  return oss.str();
2215 }
2216 } // namespace
2217 
// Forwards a pixel hit-test request to render_handler_->get_result_row_for_pixel
// after logging the request parameters. Throws if backend rendering is disabled
// (render_handler_ is null); any std::exception from the render handler is
// re-raised as a DB exception carrying its message.
// NOTE(review): the signature line (source line 2218) is absent from this listing.
2219  TPixelTableRowResult& _return,
2220  const TSessionId& session,
2221  const int64_t widget_id,
2222  const TPixel& pixel,
2223  const std::map<std::string, std::vector<std::string>>& table_col_names,
2224  const bool column_format,
2225  const int32_t pixel_radius,
2226  const std::string& nonce) {
2227  auto stdlog = STDLOG(get_session_ptr(session),
2228  "widget_id",
2229  widget_id,
2230  "pixel.x",
2231  pixel.x,
2232  "pixel.y",
2233  pixel.y,
2234  "column_format",
2235  column_format,
2236  "pixel_radius",
2237  pixel_radius,
2238  "table_col_names",
2239  dump_table_col_names(table_col_names),
2240  "nonce",
2241  nonce);
2242  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2243  auto session_ptr = stdlog.getSessionInfo();
2244  if (!render_handler_) {
2245  THROW_DB_EXCEPTION("Backend rendering is disabled.");
2246  }
2247 
2248  try {
2249  render_handler_->get_result_row_for_pixel(_return,
2250  session_ptr,
2251  widget_id,
2252  pixel,
2253  table_col_names,
2254  column_format,
2255  pixel_radius,
2256  nonce);
2257  } catch (std::exception& e) {
2258  THROW_DB_EXCEPTION(e.what());
2259  }
2260 }
2261 
// Builds the Thrift TColumnType for column descriptor `cd`: name, source name,
// id, type, encoding, nullability, array/date size, geo or precision/scale
// info, system flag, comp_param, reserved-keyword flag and default value.
// For dictionary-encoded columns (guard partly missing below, with cat != nullptr),
// comp_param is the dictionary's dictNBits.
// NOTE(review): source lines 2262 (signature), 2276 and 2283 are absent from this
// listing; confirm the geo helper call and the dictionary-encoding condition
// against the real file.
2263  const ColumnDescriptor* cd) {
2264  TColumnType col_type;
2265  col_type.col_name = cd->columnName;
2266  col_type.src_name = cd->sourceName;
2267  col_type.col_id = cd->columnId;
2268  col_type.col_type.type = type_to_thrift(cd->columnType);
2269  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
2270  col_type.col_type.nullable = !cd->columnType.get_notnull();
2271  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
2272  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
2273  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
2274  }
2275  if (IS_GEO(cd->columnType.get_type())) {
2277  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
2278  } else {
2279  col_type.col_type.precision = cd->columnType.get_precision();
2280  col_type.col_type.scale = cd->columnType.get_scale();
2281  }
2282  col_type.is_system = cd->isSystemCol;
2284  cat != nullptr) {
2285  // have to get the actual size of the encoding from the dictionary definition
2286  const int dict_id = cd->columnType.get_comp_param();
// NOTE(review): this early return skips the is_reserved_keyword and
// default_value assignments below, so those fields are left unset for a column
// whose dictionary is not found. Confirm this is intended.
2287  if (!cat->getMetadataForDict(dict_id, false)) {
2288  col_type.col_type.comp_param = 0;
2289  return col_type;
2290  }
// NOTE(review): second lookup of the same dictionary; given the early return
// above, the `!dd` throw is only reachable if the dictionary disappears between
// the two calls. A single lookup would suffice.
2291  auto dd = cat->getMetadataForDict(dict_id, false);
2292  if (!dd) {
2293  THROW_DB_EXCEPTION("Dictionary doesn't exist");
2294  }
2295  col_type.col_type.comp_param = dd->dictNBits;
2296  } else {
// Date-in-days columns with comp_param 0 are reported as 32.
2297  col_type.col_type.comp_param =
2298  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
2299  ? 32
2300  : cd->columnType.get_comp_param();
2301  }
2302  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
2303  if (cd->default_value.has_value()) {
2304  col_type.__set_default_value(cd->getDefaultValueLiteral());
2305  }
2306  return col_type;
2307 }
2308 
2309 void DBHandler::get_internal_table_details(TTableDetails& _return,
2310  const TSessionId& session,
2311  const std::string& table_name,
2312  const bool include_system_columns) {
2313  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2314  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2315  get_table_details_impl(_return, stdlog, table_name, include_system_columns, false);
2316 }
2317 
// Table details for `table_name` in database `database_name`, always including
// system columns (get_system = true) and excluding physical columns.
// NOTE(review): the signature line (source line 2318) is absent from this listing.
2319  TTableDetails& _return,
2320  const TSessionId& session,
2321  const std::string& table_name,
2322  const std::string& database_name) {
2323  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2324  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2325  get_table_details_impl(_return, stdlog, table_name, true, false, database_name);
2326 }
2327 
// Table details for `table_name` in the session's current database, without
// system or physical columns, computed while holding `execute_read_lock`.
// NOTE(review): source lines 2335-2337 (the lock initializer) are absent from this listing.
2328 void DBHandler::get_table_details(TTableDetails& _return,
2329  const TSessionId& session,
2330  const std::string& table_name) {
2331  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2332  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2333 
2334  auto execute_read_lock =
2338  get_table_details_impl(_return, stdlog, table_name, false, false);
2339 }
2340 
// Table details for `table_name` in database `database_name`, without system or
// physical columns, computed while holding `execute_read_lock`.
// NOTE(review): source lines 2349-2351 (the lock initializer) are absent from this listing.
2341 void DBHandler::get_table_details_for_database(TTableDetails& _return,
2342  const TSessionId& session,
2343  const std::string& table_name,
2344  const std::string& database_name) {
2345  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2346  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2347 
2348  auto execute_read_lock =
2352  get_table_details_impl(_return, stdlog, table_name, false, false, database_name);
2353 }
2354 
// File-local helper: converts a foreign table's refresh options into a Thrift
// TTableRefreshInfo (update type, timing type, and for scheduled refreshes the
// ISO start time and an interval parsed as "<count><unit>", unit H/D/S, case
// insensitive). Last/next refresh times are included when they differ from the
// sentinel compared against below. Unexpected option values hit UNREACHABLE().
// NOTE(review): source lines 2362, 2367, 2374, 2380, 2383, 2409 and 2414 are
// absent from this listing (option keys / constants); confirm against the real file.
2355 namespace {
2356 TTableRefreshInfo get_refresh_info(const TableDescriptor* td) {
2357  CHECK(td->isForeignTable());
2358  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
2359  CHECK(foreign_table);
2360  TTableRefreshInfo refresh_info;
2361  const auto& update_type =
2363  CHECK(update_type.has_value());
2364  if (update_type.value() == foreign_storage::ForeignTable::ALL_REFRESH_UPDATE_TYPE) {
2365  refresh_info.update_type = TTableRefreshUpdateType::ALL;
2366  } else if (update_type.value() ==
2368  refresh_info.update_type = TTableRefreshUpdateType::APPEND;
2369  } else {
2370  UNREACHABLE() << "Unexpected refresh update type: " << update_type.value();
2371  }
2372 
2373  const auto& timing_type =
2375  CHECK(timing_type.has_value());
2376  if (timing_type.value() == foreign_storage::ForeignTable::MANUAL_REFRESH_TIMING_TYPE) {
2377  refresh_info.timing_type = TTableRefreshTimingType::MANUAL;
2378  refresh_info.interval_count = -1;
2379  } else if (timing_type.value() ==
2381  refresh_info.timing_type = TTableRefreshTimingType::SCHEDULED;
2382  const auto& start_date_time = foreign_table->getOption(
2384  CHECK(start_date_time.has_value());
2385  auto start_date_time_epoch = dateTimeParse<kTIMESTAMP>(start_date_time.value(), 0);
2386  refresh_info.start_date_time =
2387  shared::convert_temporal_to_iso_format({kTIMESTAMP}, start_date_time_epoch);
2388  const auto& interval =
2389  foreign_table->getOption(foreign_storage::ForeignTable::REFRESH_INTERVAL_KEY);
2390  CHECK(interval.has_value());
2391  const auto& interval_str = interval.value();
// Interval format: all characters but the last are the count, the last is the unit.
// NOTE(review): an empty interval string is not guarded here. std::stoi on the
// empty substring would throw std::invalid_argument. Presumably option
// validation prevents this; confirm.
2392  refresh_info.interval_count =
2393  std::stoi(interval_str.substr(0, interval_str.length() - 1));
2394  auto interval_type = std::toupper(interval_str[interval_str.length() - 1]);
2395  if (interval_type == 'H') {
2396  refresh_info.interval_type = TTableRefreshIntervalType::HOUR;
2397  } else if (interval_type == 'D') {
2398  refresh_info.interval_type = TTableRefreshIntervalType::DAY;
2399  } else if (interval_type == 'S') {
2400  // This use case is for development only.
2401  refresh_info.interval_type = TTableRefreshIntervalType::NONE;
2402  } else {
2403  UNREACHABLE() << "Unexpected interval type: " << interval_str;
2404  }
2405  } else {
2406  UNREACHABLE() << "Unexpected refresh timing type: " << timing_type.value();
2407  }
2408  if (foreign_table->last_refresh_time !=
2410  refresh_info.last_refresh_time = shared::convert_temporal_to_iso_format(
2411  {kTIMESTAMP}, foreign_table->last_refresh_time);
2412  }
2413  if (foreign_table->next_refresh_time !=
2415  refresh_info.next_refresh_time = shared::convert_temporal_to_iso_format(
2416  {kTIMESTAMP}, foreign_table->next_refresh_time);
2417  }
2418  return refresh_info;
2419 }
2420 } // namespace
// Shared implementation behind the get_*table_details* endpoints.
// Resolves the catalog (session catalog, or `database_name` if given) and the
// table, then fills `_return`:
//  - views: row descriptor from validating the view SQL; view_sql is hidden
//    if checkAccessedObjectsPrivileges throws std::runtime_error;
//  - tables: one TColumnType per column (minus the deleted-rows column),
//    honoring get_system / get_physical;
//  - common fields: fragment/page size, max rows, shard count, partitioning,
//    table type and, for foreign tables, refresh info.
// std::runtime_error raised inside is converted to a DB exception.
// NOTE(review): source lines 2436 and 2451 are absent from this listing.
2421 void DBHandler::get_table_details_impl(TTableDetails& _return,
2422  query_state::StdLog& stdlog,
2423  const std::string& table_name,
2424  const bool get_system,
2425  const bool get_physical,
2426  const std::string& database_name) {
2427  try {
2428  auto session_info = stdlog.getSessionInfo();
// NOTE(review): when database_name is given, `cat` is a raw pointer taken from
// a temporary shared_ptr, so it stays valid only while SysCatalog keeps another
// owner. Consider holding the shared_ptr for the duration of this call.
2429  auto cat = (database_name.empty())
2430  ? &session_info->getCatalog()
2431  : SysCatalog::instance().getCatalog(database_name).get();
2432  if (!cat) {
2433  THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
2434  }
2435  const auto td_with_lock =
2437  *cat, table_name, false);
2438  const auto td = td_with_lock();
2439  CHECK(td);
2440 
2441  bool have_privileges_on_view_sources = true;
2442  if (td->isView) {
2443  auto query_state = create_query_state(session_info, td->viewSQL);
2444  stdlog.setQueryState(query_state);
2445  try {
2446  if (hasTableAccessPrivileges(td, *session_info)) {
2447  const auto [query_ra, locks] = parse_to_ra(query_state->createQueryStateProxy(),
2448  query_state->getQueryStr(),
2449  {},
2450  true,
2452  false);
2453  try {
2454  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2455  query_ra);
2456  } catch (const std::runtime_error&) {
2457  have_privileges_on_view_sources = false;
2458  }
2459 
2460  const auto result = validate_rel_alg(query_ra.plan_result,
2461  query_state->createQueryStateProxy());
2462 
2463  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, *cat);
2464  } else {
// NOTE(review): this access error is caught by the handler below and re-worded
// as a failed view query that "must be dropped and re-created", which is
// misleading for a permission problem.
2465  throw std::runtime_error(
2466  "Unable to access view " + table_name +
2467  ". The view may not exist, or the logged in user may not "
2468  "have permission to access the view.");
2469  }
2470  } catch (const std::exception& e) {
2471  throw std::runtime_error("View '" + table_name +
2472  "' query has failed with an error: '" +
2473  std::string(e.what()) +
2474  "'.\nThe view must be dropped and re-created to "
2475  "resolve the error. \nQuery:\n" +
2476  query_state->getQueryStr());
2477  }
2478  } else {
2479  if (hasTableAccessPrivileges(td, *session_info)) {
2480  const auto col_descriptors = cat->getAllColumnMetadataForTable(
2481  td->tableId, get_system, true, get_physical);
2482  const auto deleted_cd = cat->getDeletedColumn(td);
2483  for (const auto cd : col_descriptors) {
2484  if (cd == deleted_cd) {
2485  continue;
2486  }
2487  _return.row_desc.push_back(populateThriftColumnType(cat, cd));
2488  }
2489  } else {
2490  throw std::runtime_error(
2491  "Unable to access table " + table_name +
2492  ". The table may not exist, or the logged in user may not "
2493  "have permission to access the table.");
2494  }
2495  }
2496  _return.fragment_size = td->maxFragRows;
2497  _return.page_size = td->fragPageSize;
2498  _return.max_rows = td->maxRows;
2499  _return.view_sql =
2500  (have_privileges_on_view_sources ? td->viewSQL
2501  : "[Not enough privileges to see the view SQL]");
2502  _return.shard_count = td->nShards;
2503  _return.key_metainfo = td->keyMetainfo;
2504  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
2505  _return.partition_detail =
2506  td->partitions.empty()
2507  ? TPartitionDetail::DEFAULT
2508  : (table_is_replicated(td)
2509  ? TPartitionDetail::REPLICATED
2510  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
2511  : TPartitionDetail::OTHER));
2512  if (td->isView) {
2513  _return.table_type = TTableType::VIEW;
2514  } else if (td->isTemporaryTable()) {
2515  _return.table_type = TTableType::TEMPORARY;
2516  } else if (td->isForeignTable()) {
2517  _return.table_type = TTableType::FOREIGN;
2518  _return.refresh_info = get_refresh_info(td);
2519  } else {
2520  _return.table_type = TTableType::DEFAULT;
2521  }
2522 
2523  } catch (const std::runtime_error& e) {
2524  THROW_DB_EXCEPTION(std::string(e.what()));
2525  }
2526 }
2527 
2528 void DBHandler::get_link_view(TFrontendView& _return,
2529  const TSessionId& session,
2530  const std::string& link) {
2531  auto stdlog = STDLOG(get_session_ptr(session));
2532  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2533  auto session_ptr = stdlog.getConstSessionInfo();
2534  auto const& cat = session_ptr->getCatalog();
2535  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2536  if (!ld) {
2537  THROW_DB_EXCEPTION("Link " + link + " is not valid.");
2538  }
2539  _return.view_state = ld->viewState;
2540  _return.view_name = ld->link;
2541  _return.update_time = ld->updateTime;
2542  _return.view_metadata = ld->viewMetadata;
2543 }
2544 
// Returns true if the session's user is a superuser, or holds any privilege
// (SysCatalog::hasAnyPrivileges) on the DBObject built for `td`.
// NOTE(review): source lines 2545 (signature) and 2555 (the `dbObject`
// declaration) are absent from this listing; confirm against the real file.
2546  const TableDescriptor* td,
2547  const Catalog_Namespace::SessionInfo& session_info) {
2548  auto& cat = session_info.getCatalog();
2549  auto user_metadata = session_info.get_currentUser();
2550 
2551  if (user_metadata.isSuper) {
2552  return true;
2553  }
2554 
2556  dbObject.loadKey(cat);
2557  std::vector<DBObject> privObjects = {dbObject};
2558 
2559  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2560 }
2561 
2562 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2563  const Catalog_Namespace::SessionInfo& session_info,
2564  const GetTablesType get_tables_type,
2565  const std::string& database_name) {
2566  if (database_name.empty()) {
2567  table_names = session_info.getCatalog().getTableNamesForUser(
2568  session_info.get_currentUser(), get_tables_type);
2569  } else {
2570  auto request_cat = SysCatalog::instance().getCatalog(database_name);
2571  if (!request_cat) {
2572  THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
2573  }
2574  table_names = request_cat->getTableNamesForUser(session_info.get_currentUser(),
2575  get_tables_type);
2576  }
2577 }
2578 
// Lists physical tables and views visible to the session's user in the
// current database (GET_PHYSICAL_TABLES_AND_VIEWS).
// NOTE(review): source line 2583 (presumably `get_tables_impl(`) is absent from this listing.
2579 void DBHandler::get_tables(std::vector<std::string>& table_names,
2580  const TSessionId& session) {
2581  auto stdlog = STDLOG(get_session_ptr(session));
2582  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2584  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2585 }
2586 
// Lists tables visible to the session's user in database `database_name` via
// get_tables_impl.
// NOTE(review): source line 2595 (the GetTablesType argument) is absent from this listing.
2587 void DBHandler::get_tables_for_database(std::vector<std::string>& table_names,
2588  const TSessionId& session,
2589  const std::string& database_name) {
2590  auto stdlog = STDLOG(get_session_ptr(session));
2591  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2592 
2593  get_tables_impl(table_names,
2594  *stdlog.getConstSessionInfo(),
2596  database_name);
2597 }
2598 
2599 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2600  const TSessionId& session) {
2601  auto stdlog = STDLOG(get_session_ptr(session));
2602  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2603  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2604 }
2605 
2606 void DBHandler::get_views(std::vector<std::string>& table_names,
2607  const TSessionId& session) {
2608  auto stdlog = STDLOG(get_session_ptr(session));
2609  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2610  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2611 }
2612 
// Appends one TTableMeta per table or view in the session's catalog that is not
// a shard and that the session's user can access (hasTableAccessPrivileges).
// View columns come from parsing and validating the view SQL
// (execute_rel_alg with just_validate=true), skipping physical columns.
// A view that throws is logged as broken and still appended, with whatever
// columns were collected (normally none). Table columns exclude the deleted-rows
// column.
// NOTE(review): source lines 2647 and 2657 are absent from this listing.
2613 void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
2614  QueryStateProxy query_state_proxy,
2615  const Catalog_Namespace::SessionInfo& session_info,
2616  const bool with_table_locks) {
2617  const auto& cat = session_info.getCatalog();
2618  // Get copies of table descriptors here in order to avoid possible use of dangling
2619  // pointers, if tables are concurrently dropped.
2620  const auto tables = cat.getAllTableMetadataCopy();
2621  _return.reserve(tables.size());
2622 
2623  for (const auto& td : tables) {
2624  if (td.shard >= 0) {
2625  // skip shards, they're not standalone tables
2626  continue;
2627  }
2628  if (!hasTableAccessPrivileges(&td, session_info)) {
2629  // skip table, as there are no privileges to access it
2630  continue;
2631  }
2632 
2633  TTableMeta ret;
2634  ret.table_name = td.tableName;
2635  ret.is_view = td.isView;
2636  ret.is_replicated = table_is_replicated(&td);
2637  ret.shard_count = td.nShards;
2638  ret.max_rows = td.maxRows;
2639  ret.table_id = td.tableId;
2640 
2641  std::vector<TTypeInfo> col_types;
2642  std::vector<std::string> col_names;
2643  size_t num_cols = 0;
2644  if (td.isView) {
2645  try {
2646  TPlanResult parse_result;
2648  std::tie(parse_result, locks) = parse_to_ra(
2649  query_state_proxy, td.viewSQL, {}, with_table_locks, system_parameters_);
2650  const auto query_ra = parse_result.plan_result;
2651 
2652  ExecutionResult ex_result;
2653  execute_rel_alg(ex_result,
2654  query_state_proxy,
2655  query_ra,
2656  true,
2658  -1,
2659  -1,
2660  /*just_validate=*/true,
2661  /*find_push_down_candidates=*/false,
2662  ExplainInfo());
2663  TQueryResult result;
2664  DBHandler::convertData(result, ex_result, query_state_proxy, true, -1, -1);
2665  num_cols = result.row_set.row_desc.size();
2666  for (const auto& col : result.row_set.row_desc) {
2667  if (col.is_physical) {
2668  num_cols--;
2669  continue;
2670  }
2671  col_types.push_back(col.col_type);
2672  col_names.push_back(col.col_name);
2673  }
2674  } catch (std::exception& e) {
2675  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td.tableName;
2676  }
2677  } else {
2678  try {
// NOTE(review): access was already checked at the top of the loop, so the
// `else continue` branch below is reachable only if privileges change in between.
2679  if (hasTableAccessPrivileges(&td, session_info)) {
2680  const auto col_descriptors =
2681  cat.getAllColumnMetadataForTable(td.tableId, false, true, false);
2682  const auto deleted_cd = cat.getDeletedColumn(&td);
2683  for (const auto cd : col_descriptors) {
2684  if (cd == deleted_cd) {
2685  continue;
2686  }
2687  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2688  col_names.push_back(cd->columnName);
2689  }
// NOTE(review): num_cols counts every descriptor, including deleted_cd if it was
// returned, while col_types/col_names skip it; the counts may disagree.
2690  num_cols = col_descriptors.size();
2691  } else {
2692  continue;
2693  }
2694  } catch (const std::runtime_error& e) {
2695  THROW_DB_EXCEPTION(e.what());
2696  }
2697  }
2698 
2699  ret.num_cols = num_cols;
2700  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2701  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2702 
2703  _return.push_back(ret);
2704  }
2705 }
2706 
// Thrift entry point: returns table/view metadata for the session's current database.
// Delegates to get_tables_meta_impl and rethrows any std::exception as a TDBException.
void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
                                const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  auto query_state = create_query_state(session_ptr, "");
  stdlog.setQueryState(query_state);

  auto execute_read_lock =

  try {
    get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
}
2726 
2727 void DBHandler::get_users(std::vector<std::string>& user_names,
2728  const TSessionId& session) {
2729  auto stdlog = STDLOG(get_session_ptr(session));
2730  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2731  auto session_ptr = stdlog.getConstSessionInfo();
2732  std::list<Catalog_Namespace::UserMetadata> user_list;
2733 
2734  if (!session_ptr->get_currentUser().isSuper) {
2735  user_list = SysCatalog::instance().getAllUserMetadata(
2736  session_ptr->getCatalog().getCurrentDB().dbId);
2737  } else {
2738  user_list = SysCatalog::instance().getAllUserMetadata();
2739  }
2740  for (auto u : user_list) {
2741  user_names.push_back(u.userName);
2742  }
2743 }
2744 
2745 void DBHandler::get_version(std::string& version) {
2746  version = MAPD_RELEASE;
2747 }
2748 
// Superuser-only endpoint for clearing GPU memory.
// Throws TDBException for non-superusers; a std::exception raised by the local clear
// step is rethrown as TDBException. The render handler, when present, also clears its
// GPU memory, and when leaves are attached the request is presumably forwarded to them
// (NOTE(review): confirm against the full source).
void DBHandler::clear_gpu_memory(const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_DB_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
  }
  try {
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_gpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
  }
}
2769 
// Superuser-only endpoint for clearing CPU memory.
// Throws TDBException for non-superusers; a std::exception raised by the local clear
// step is rethrown as TDBException. The render handler, when present, also clears its
// CPU memory, and when leaves are attached the request is presumably forwarded to them
// (NOTE(review): confirm against the full source).
void DBHandler::clear_cpu_memory(const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_DB_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
  }
  try {
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_cpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
  }
}
2790 
2791 void DBHandler::clearRenderMemory(const TSessionId& session) {
2792  auto stdlog = STDLOG(get_session_ptr(session));
2793  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2794  auto session_ptr = stdlog.getConstSessionInfo();
2795  if (!session_ptr->get_currentUser().isSuper) {
2796  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_render_memory");
2797  }
2798  if (render_handler_) {
2799  render_handler_->clear_cpu_memory();
2800  render_handler_->clear_gpu_memory();
2801  }
2802 }
2803 
// Registers `parent_session` with the executor's query-session tracking under the
// given label and start time. The status is RUNNING_QUERY_KERNEL when
// `for_running_query_kernel` is true, RUNNING_IMPORTER otherwise.
void DBHandler::set_cur_session(const TSessionId& parent_session,
                                const TSessionId& leaf_session,
                                const std::string& start_time_str,
                                const std::string& label,
                                bool for_running_query_kernel) {
  // internal API to manage query interruption in distributed mode
  auto stdlog = STDLOG(get_session_ptr(leaf_session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();

  executor->enrollQuerySession(parent_session,
                               label,
                               start_time_str,
                               for_running_query_kernel
                                   ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
                                   : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
}
2823 
// Counterpart of set_cur_session: clears the executor's query-session status recorded
// for `parent_session` at `start_time_str`. `label` and `for_running_query_kernel` are
// not referenced by the statements visible here.
void DBHandler::invalidate_cur_session(const TSessionId& parent_session,
                                       const TSessionId& leaf_session,
                                       const std::string& start_time_str,
                                       const std::string& label,
                                       bool for_running_query_kernel) {
  // internal API to manage query interruption in distributed mode
  auto stdlog = STDLOG(get_session_ptr(leaf_session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  executor->clearQuerySessionStatus(parent_session, start_time_str);
}
2835 
  // Always yields the INVALID_SESSION_ID sentinel.
  return INVALID_SESSION_ID;
}
2839 
2840 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2841  const TSessionId& session,
2842  const std::string& memory_level) {
2843  auto stdlog = STDLOG(get_session_ptr(session));
2844  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2845  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2846  Data_Namespace::MemoryLevel mem_level;
2847  if (!memory_level.compare("gpu")) {
2849  internal_memory =
2850  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2851  } else {
2853  internal_memory =
2854  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2855  }
2856 
2857  for (auto memInfo : internal_memory) {
2858  TNodeMemoryInfo nodeInfo;
2859  if (leaf_aggregator_.leafCount() > 0) {
2860  nodeInfo.host_name = heavyai::get_hostname();
2861  }
2862  nodeInfo.page_size = memInfo.pageSize;
2863  nodeInfo.max_num_pages = memInfo.maxNumPages;
2864  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2865  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2866  for (auto gpu : memInfo.nodeMemoryData) {
2867  TMemoryData md;
2868  md.slab = gpu.slabNum;
2869  md.start_page = gpu.startPage;
2870  md.num_pages = gpu.numPages;
2871  md.touch = gpu.touch;
2872  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2873  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2874  nodeInfo.node_memory_data.push_back(md);
2875  }
2876  _return.push_back(nodeInfo);
2877  }
2878  if (leaf_aggregator_.leafCount() > 0) {
2879  std::vector<TNodeMemoryInfo> leafSummary =
2880  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
2881  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2882  }
2883 }
2884 
2885 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
2886  auto stdlog = STDLOG(get_session_ptr(session));
2887  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2888  auto session_ptr = stdlog.getConstSessionInfo();
2889  const auto& user = session_ptr->get_currentUser();
2891  SysCatalog::instance().getDatabaseListForUser(user);
2892  for (auto& db : dbs) {
2893  TDBInfo dbinfo;
2894  dbinfo.db_name = std::move(db.dbName);
2895  dbinfo.db_owner = std::move(db.dbOwnerName);
2896  dbinfos.push_back(std::move(dbinfo));
2897  }
2898 }
2899 
  // Translate the session's executor device type into the Thrift execution mode.
  auto executor = get_session_ptr(session)->get_executor_device_type();
  switch (executor) {
      return TExecuteMode::CPU;
      return TExecuteMode::GPU;
    default:
      UNREACHABLE();
  }
  UNREACHABLE();
  // Fallback after UNREACHABLE(); presumably kept so every path returns a value.
  return TExecuteMode::CPU;
}
2913 
// Sets the execution mode of the given session.
// On an aggregator (leaves attached), the mode is first forwarded to the leaves. The
// local update is then best-effort: a TDBException from it is only logged.
// Without leaves, exceptions from the local update propagate to the caller.
void DBHandler::set_execution_mode(const TSessionId& session,
                                   const TExecuteMode::type mode) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_it = get_session_it_unsafe(session, write_lock);
  if (leaf_aggregator_.leafCount() > 0) {
    leaf_aggregator_.set_execution_mode(session, mode);
    try {
      DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
    } catch (const TDBException& e) {
      LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
    }
    return;
  }
  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
}
2931 
2932 namespace {
2933 
  // Rejects direct loads into sharded tables. A null `td` is accepted without checks.
  if (td && td->nShards) {
    throw std::runtime_error("Cannot import a sharded table directly to a leaf");
  }
}
2939 
2940 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
2941  const std::vector<std::string>& column_names) {
2942  std::unordered_set<std::string> unique_names;
2943  for (const auto& name : column_names) {
2944  auto lower_name = to_lower(name);
2945  if (unique_names.find(lower_name) != unique_names.end()) {
2946  THROW_DB_EXCEPTION("Column " + name + " is mentioned multiple times");
2947  } else {
2948  unique_names.insert(lower_name);
2949  }
2950  }
2951  for (const auto& cd : descs) {
2952  auto iter = unique_names.find(to_lower(cd->columnName));
2953  if (iter != unique_names.end()) {
2954  unique_names.erase(iter);
2955  }
2956  }
2957  if (!unique_names.empty()) {
2958  THROW_DB_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
2959  }
2960 }
2961 
2962 // Return vector of IDs mapping column descriptors to the list of comumn names.
2963 // The size of the vector is the number of actual columns (geophisical columns excluded).
2964 // ID is either a position in column_names matching the descriptor, or -1 if the column
2965 // is missing from the column_names
2966 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
2967  const std::vector<std::string>& column_names) {
2968  std::vector<int> desc_to_column_ids;
2969  if (column_names.empty()) {
2970  int col_idx = 0;
2971  for (const auto& cd : descs) {
2972  if (!cd->isGeoPhyCol) {
2973  desc_to_column_ids.push_back(col_idx);
2974  ++col_idx;
2975  }
2976  }
2977  } else {
2978  for (const auto& cd : descs) {
2979  if (!cd->isGeoPhyCol) {
2980  bool found = false;
2981  for (size_t j = 0; j < column_names.size(); ++j) {
2982  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
2983  found = true;
2984  desc_to_column_ids.push_back(j);
2985  break;
2986  }
2987  }
2988  if (!found) {
2989  if (!cd->columnType.get_notnull()) {
2990  desc_to_column_ids.push_back(-1);
2991  } else {
2992  THROW_DB_EXCEPTION("Column '" + cd->columnName +
2993  "' cannot be omitted due to NOT NULL constraint");
2994  }
2995  }
2996  }
2997  }
2998  }
2999  return desc_to_column_ids;
3000 }
3001 
3002 } // namespace
3003 
    const TSessionId& session,  // keys the persistent render group assignment state
    const Catalog& catalog,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
    const ColumnDescriptor* cd,  // the (logical) geo column being processed
    size_t& col_idx,  // on entry: one past the geo column's buffer; advanced in place
    size_t num_rows,  // number of geo values expected in the source buffer
    const std::string& table_name,
    bool assign_render_groups) {
  // Parses the WKT / WKB-hex values already buffered for `cd` (the buffer just before
  // col_idx) into coordinates, bounds and ring sizes, optionally assigns render groups
  // for polygon columns, then populates the column's physical import buffers.
  auto geo_col_idx = col_idx - 1;
  const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
  std::vector<std::vector<double>> coords_column, bounds_column;
  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
  std::vector<int> render_groups_column;
  SQLTypeInfo ti = cd->columnType;
  // A row-count mismatch or a failed conversion rejects the whole column.
  if (num_rows != wkt_or_wkb_hex_column->size() ||
      !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
                                                  ti,
                                                  coords_column,
                                                  bounds_column,
                                                  ring_sizes_column,
                                                  poly_rings_column,
                                                  true)) {
    std::ostringstream oss;
    oss << "Invalid geometry in column " << cd->columnName;
    THROW_DB_EXCEPTION(oss.str());
  }

  // start or continue assigning render groups for poly columns?
  if (IS_GEO_POLY(cd->columnType.get_type()) && assign_render_groups &&
    // get RGA to use
    import_export::RenderGroupAnalyzer* render_group_analyzer{};
    {
      // mutex the map access
      std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);

      // emplace new RGA or fetch existing RGA from map
      // (nested lookup: session -> table name -> column name -> analyzer)
      auto [itr_table, emplaced_table] = render_group_assignment_map_.try_emplace(
          session, RenderGroupAssignmentTableMap());
      LOG_IF(INFO, emplaced_table)
          << "load_table_binary_columnar_polys: Creating Render Group Assignment "
             "Persistent Data for Session '"
          << session << "'";
      auto [itr_column, emplaced_column] =
          itr_table->second.try_emplace(table_name, RenderGroupAssignmentColumnMap());
      LOG_IF(INFO, emplaced_column)
          << "load_table_binary_columnar_polys: Creating Render Group Assignment "
             "Persistent Data for Table '"
          << table_name << "'";
      auto [itr_analyzer, emplaced_analyzer] = itr_column->second.try_emplace(
          cd->columnName, std::make_unique<import_export::RenderGroupAnalyzer>());
      LOG_IF(INFO, emplaced_analyzer)
          << "load_table_binary_columnar_polys: Creating Render Group Assignment "
             "Persistent Data for Column '"
          << cd->columnName << "'";
      render_group_analyzer = itr_analyzer->second.get();
      CHECK(render_group_analyzer);

      // seed new RGA from existing table/column, to handle appends
      if (emplaced_analyzer) {
        LOG(INFO) << "load_table_binary_columnar_polys: Seeding Render Groups from "
                     "existing table...";
        render_group_analyzer->seedFromExistingTableContents(
            catalog, table_name, cd->columnName);
        LOG(INFO) << "load_table_binary_columnar_polys: Done";
      }
    }

    // assign render groups for this set of bounds
    // NOTE(review): the analyzer is used after the mutex above is released; this
    // assumes no concurrent call for the same session/table/column — confirm.
    LOG(INFO) << "load_table_binary_columnar_polys: Assigning Render Groups...";
    render_groups_column.reserve(bounds_column.size());
    for (auto const& bounds : bounds_column) {
      CHECK_EQ(bounds.size(), 4u);
      int rg = render_group_analyzer->insertBoundsAndReturnRenderGroup(bounds);
      render_groups_column.push_back(rg);
    }
    LOG(INFO) << "load_table_binary_columnar_polys: Done";
  } else {
    // render groups all zero
    render_groups_column.resize(bounds_column.size(), 0);
  }

  // Populate physical columns, advance col_idx
      cd,
      import_buffers,
      col_idx,
      coords_column,
      bounds_column,
      ring_sizes_column,
      poly_rings_column,
      render_groups_column);
}
3098 
    const TSessionId& session,
    const Catalog& catalog,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
    const std::list<const ColumnDescriptor*>& cds,
    const std::vector<int>& desc_id_to_column_id,  // -1 marks a column not supplied
    size_t num_rows,
    const std::string& table_name,
    bool assign_render_groups) {
  // Fills every logical column the client did not supply (id -1) with num_rows default
  // values. For such geo columns, the physical buffers are then derived via
  // fillGeoColumns. Supplied columns are left untouched.
  // col_idx walks import_buffers, which apparently hold one buffer per descriptor
  // (physical columns included). import_idx walks desc_id_to_column_id, which holds
  // logical columns only.
  size_t skip_physical_cols = 0;
  size_t col_idx = 0, import_idx = 0;
  for (const auto& cd : cds) {
    if (skip_physical_cols > 0) {
      CHECK(cd->isGeoPhyCol);
      skip_physical_cols--;
      continue;
    } else if (cd->columnType.is_geometry()) {
      skip_physical_cols = cd->columnType.get_physical_cols();
    }
    if (desc_id_to_column_id[import_idx] == -1) {
      import_buffers[col_idx]->addDefaultValues(cd, num_rows);
      col_idx++;
      if (cd->columnType.is_geometry()) {
        // fillGeoColumns advances col_idx past the physical columns itself
        fillGeoColumns(session,
                       catalog,
                       import_buffers,
                       cd,
                       col_idx,
                       num_rows,
                       table_name,
                       assign_render_groups);
      }
    } else {
      col_idx++;
      col_idx += skip_physical_cols;
    }
    import_idx++;
  }
}
3138 
// Thrift entry point: appends row-wise binary data (TRow values) to `table_name`.
// Rows whose values fail to convert are logged and dropped. Columns not listed in
// `column_names` are filled with defaults before the data is loaded.
//
// @param column_names  optional target columns (case-insensitive); empty means all
//                      columns in table order
// @throws TDBException when `rows` is empty, on validation/loader failures, or on any
//         other std::exception raised while loading.
void DBHandler::load_table_binary(const TSessionId& session,
                                  const std::string& table_name,
                                  const std::vector<TRow>& rows,
                                  const std::vector<std::string>& column_names) {
  try {
    auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
    stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
    auto session_ptr = stdlog.getConstSessionInfo();

    if (rows.empty()) {
      THROW_DB_EXCEPTION("No rows to insert");
    }

    const auto execute_read_lock =
    std::unique_ptr<import_export::Loader> loader;
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
    auto schema_read_lock = prepare_loader_generic(*session_ptr,
                                                   table_name,
                                                   rows.front().cols.size(),
                                                   &loader,
                                                   &import_buffers,
                                                   column_names,
                                                   "load_table_binary");

    auto col_descs = loader->get_column_descs();
    auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);

    size_t rows_completed = 0;
    for (auto const& row : rows) {
      size_t col_idx = 0;
      try {
        // NOTE(review): col_idx walks all col_descs but indexes desc_id_to_column_id,
        // which excludes geo physical columns; this looks misaligned for tables that
        // have geo columns — confirm.
        for (auto cd : col_descs) {
          auto mapped_idx = desc_id_to_column_id[col_idx];
          if (mapped_idx != -1) {
            import_buffers[col_idx]->add_value(
                cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
          }
          col_idx++;
        }
        rows_completed++;
      } catch (const std::exception& e) {
        // Roll back the values already added for this row.
        // NOTE(review): this also pops buffers of unmapped (-1) columns, to which
        // nothing was added for this row — confirm pop_value() tolerates that.
        for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
          import_buffers[col_idx_to_pop]->pop_value();
        }
        LOG(ERROR) << "Input exception thrown: " << e.what()
                   << ". Row discarded, issue at column : " << (col_idx + 1)
                   << " data :" << row;
      }
    }
    fillMissingBuffers(session,
                       session_ptr->getCatalog(),
                       import_buffers,
                       col_descs,
                       desc_id_to_column_id,
                       rows_completed,
                       table_name,
                       false);
    auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
        session_ptr->getCatalog(), table_name);
    // NOTE(review): rows.size() also counts discarded rows; rows_completed (used for
    // the default-filled columns above) may be the intended row count here.
    if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
      THROW_DB_EXCEPTION(loader->getErrorMessage());
    }
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(std::string(e.what()));
  }
}
3208 
// Common setup for the load endpoints. Validates the request, takes a read lock on the
// table schema, and creates the loader (a DistributedLoader when leaves are attached)
// together with its per-column import buffers.
//
// @param num_cols        number of columns the client is sending; must be non-zero
// @param loader          out: receives the new loader
// @param import_buffers  out: receives the import buffers created for the table
// @param column_names    optional explicit target columns; when empty, num_cols must
//                        equal the table's user-visible column count
// @param load_type       endpoint name, passed to check_read_only
// @return the schema read-lock container; callers keep it alive while loading
// @throws TDBException / std::runtime_error on zero columns, invalid column names or a
//         column-count mismatch, and presumably on failed privilege/read-only checks.
std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
    const Catalog_Namespace::SessionInfo& session_info,
    const std::string& table_name,
    size_t num_cols,
    std::unique_ptr<import_export::Loader>* loader,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
    const std::vector<std::string>& column_names,
    std::string load_type) {
  if (num_cols == 0) {
    THROW_DB_EXCEPTION("No columns to insert");
  }
  check_read_only(load_type);
  auto& cat = session_info.getCatalog();
  auto td_with_lock =
      std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
              cat, table_name, true));
  const auto td = (*td_with_lock)();
  CHECK(td);

  if (g_cluster && !leaf_aggregator_.leafCount()) {
    // Sharded table rows need to be routed to the leaf by an aggregator.
  }
  check_table_load_privileges(session_info, table_name);

  if (leaf_aggregator_.leafCount() > 0) {
    loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
  } else {
    loader->reset(new import_export::Loader(cat, td));
  }

  auto col_descs = (*loader)->get_column_descs();
  check_valid_column_names(col_descs, column_names);
  if (column_names.empty()) {
    // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
    // Subtracting 1 (rowid) until TableDescriptor is updated.
    auto geo_physical_cols = std::count_if(
        col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
    const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
                                (td->hasDeletedCol ? 2 : 1);
    if (num_cols != num_table_cols) {
      throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
                               ") does not match number of columns in table " +
                               td->tableName + " (" + std::to_string(num_table_cols) +
                               ")");
    }
  } else if (num_cols != column_names.size()) {
        "Number of columns specified does not match the "
        "number of columns given (" +
        std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
  }

  *import_buffers = import_export::setup_column_loaders(td, loader->get());
  return std::move(td_with_lock);
}
3267 namespace {
3268 
3269 size_t get_column_size(const TColumn& column) {
3270  if (!column.nulls.empty()) {
3271  return column.nulls.size();
3272  } else {
3273  // it is a very bold estimate but later we check it against REAL data
3274  // and if this function returns a wrong result (e.g. both int and string
3275  // vectors are filled with values), we get an error
3276  return column.data.int_col.size() + column.data.arr_col.size() +
3277  column.data.real_col.size() + column.data.str_col.size();
3278  }
3279 }
3280 
3281 } // namespace
3282 
// Thrift entry point for columnar binary loads without render group assignment:
// forwards to the shared columnar loader with AssignRenderGroupsMode::kNone.
void DBHandler::load_table_binary_columnar(const TSessionId& session,
                                           const std::string& table_name,
                                           const std::vector<TColumn>& cols,
                                           const std::vector<std::string>& column_names) {
      session, table_name, cols, column_names, AssignRenderGroupsMode::kNone);
}
3290 
    const TSessionId& session,
    const std::string& table_name,
    const std::vector<TColumn>& cols,
    const std::vector<std::string>& column_names,
    const bool assign_render_groups) {
  // Thrift entry point for polygon loads with render group assignment. Forwards to the
  // shared columnar loader, mapping the boolean onto an AssignRenderGroupsMode.
  // NOTE(review): the mapping expression is not visible here; going by the loader's
  // clean-up error message, `false` presumably selects the clean-up mode.
      table_name,
      cols,
      column_names,
      assign_render_groups
}
3305 
    const TSessionId& session,
    const std::string& table_name,
    const std::vector<TColumn>& cols,
    const std::vector<std::string>& column_names,
    const AssignRenderGroupsMode assign_render_groups_mode) {
  // Shared implementation of the columnar binary load endpoints.
  //  - kCleanUp: `cols` must be empty. Drops this session's persistent render group
  //    assignment data for `table_name` and returns without loading anything.
  //  - otherwise: loads `cols` (all of which must have the same row count). Geo
  //    physical columns are derived from the supplied geo strings, with render groups
  //    requested only in kAssign mode, and omitted columns are filled with defaults.
  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();

  if (assign_render_groups_mode == AssignRenderGroupsMode::kCleanUp) {
    // throw if the user tries to pass column data on a clean-up
    if (cols.size()) {
          "load_table_binary_columnar_polys: Column data must be empty when called with "
          "assign_render_groups = false");
    }

    // mutex the map access
    std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);

    // drop persistent render group assignment data for this session and table
    // keep the per-session map in case other tables are active (ideally not)
    auto itr_session = render_group_assignment_map_.find(session);
    if (itr_session != render_group_assignment_map_.end()) {
      LOG(INFO) << "load_table_binary_columnar_polys: Cleaning up Render Group "
                   "Assignment Persistent Data for Session '"
                << session << "', Table '" << table_name << "'";
      itr_session->second.erase(table_name);
    }

    // just doing clean-up, so we're done
    return;
  }

  const auto execute_read_lock =
  std::unique_ptr<import_export::Loader> loader;
  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
  auto schema_read_lock = prepare_loader_generic(*session_ptr,
                                                 table_name,
                                                 cols.size(),
                                                 &loader,
                                                 &import_buffers,
                                                 column_names,
                                                 "load_table_binary_columnar");

  auto desc_id_to_column_id =
      column_ids_by_names(loader->get_column_descs(), column_names);
  // `cols` is non-empty here: prepare_loader_generic rejects zero columns. Every column
  // must yield this many rows; mismatches are rejected in the loop below.
  size_t num_rows = get_column_size(cols.front());
  size_t import_idx = 0;  // index into the TColumn vector being loaded
  size_t col_idx = 0;     // index into column description vector
  try {
    size_t skip_physical_cols = 0;
    for (auto cd : loader->get_column_descs()) {
      if (skip_physical_cols > 0) {
        CHECK(cd->isGeoPhyCol);
        skip_physical_cols--;
        continue;
      }
      auto mapped_idx = desc_id_to_column_id[import_idx];
      if (mapped_idx != -1) {
        size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
        if (col_rows != num_rows) {
          std::ostringstream oss;
          oss << "load_table_binary_columnar: Inconsistent number of rows in column "
              << cd->columnName << " , expecting " << num_rows << " rows, column "
              << col_idx << " has " << col_rows << " rows";
          THROW_DB_EXCEPTION(oss.str());
        }
        // Advance to the next column in the table
        col_idx++;
        // For geometry columns: process WKT strings and fill physical columns
        if (cd->columnType.is_geometry()) {
          fillGeoColumns(session,
                         session_ptr->getCatalog(),
                         import_buffers,
                         cd,
                         col_idx,
                         num_rows,
                         table_name,
                         assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
          skip_physical_cols = cd->columnType.get_physical_cols();
        }
      } else {
        // Not supplied by the client: skip its buffer(s); fillMissingBuffers fills them.
        col_idx++;
        if (cd->columnType.is_geometry()) {
          skip_physical_cols = cd->columnType.get_physical_cols();
          col_idx += skip_physical_cols;
        }
      }
      // Advance to the next column of values being loaded
      import_idx++;
    }
  } catch (const std::exception& e) {
    std::ostringstream oss;
    oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
        << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
    THROW_DB_EXCEPTION(oss.str());
  }
  fillMissingBuffers(session,
                     session_ptr->getCatalog(),
                     import_buffers,
                     loader->get_column_descs(),
                     desc_id_to_column_id,
                     num_rows,
                     table_name,
                     assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
      session_ptr->getCatalog(), table_name);
  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
    THROW_DB_EXCEPTION(loader->getErrorMessage());
  }
}
3422 
3423 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
3424 
3425 #define ARROW_THRIFT_THROW_NOT_OK(s) \
3426  do { \
3427  ::arrow::Status _s = (s); \
3428  if (UNLIKELY(!_s.ok())) { \
3429  TDBException ex; \
3430  ex.error_msg = _s.ToString(); \
3431  LOG(ERROR) << s.ToString(); \
3432  throw ex; \
3433  } \
3434  } while (0)
3435 
3436 namespace {
3437 
3438 RecordBatchVector loadArrowStream(const std::string& stream) {
3439  RecordBatchVector batches;
3440  try {
3441  // TODO(wesm): Make this simpler in general, see ARROW-1600
3442  auto stream_buffer =
3443  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
3444  static_cast<int64_t>(stream.size()));
3445 
3446  arrow::io::BufferReader buf_reader(stream_buffer);
3447  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3448  ARROW_ASSIGN_OR_THROW(batch_reader,
3449  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3450 
3451  while (true) {
3452  std::shared_ptr<arrow::RecordBatch> batch;
3453  // Read batch (zero-copy) from the stream
3454  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
3455  if (batch == nullptr) {
3456  break;
3457  }
3458  batches.emplace_back(std::move(batch));
3459  }
3460  } catch (const std::exception& e) {
3461  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
3462  }
3463  return batches;
3464 }
3465 
3466 } // namespace
3467 
// Thrift entry point: loads a single Arrow record batch, serialized as an Arrow IPC
// stream, into `table_name`.
//
// @param arrow_stream      serialized Arrow IPC stream; exactly one record batch is
//                          accepted
// @param use_column_names  when true, the batch's schema field names select the target
//                          columns (case-insensitive); otherwise all table columns are
//                          expected in order
// @throws TDBException if the stream does not yield exactly one batch, on loader setup
//         or value-conversion errors, or if the final load fails.
void DBHandler::load_table_binary_arrow(const TSessionId& session,
                                        const std::string& table_name,
                                        const std::string& arrow_stream,
                                        const bool use_column_names) {
  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
  auto session_ptr = stdlog.getConstSessionInfo();

  RecordBatchVector batches = loadArrowStream(arrow_stream);
  // Assuming have one batch for now
  if (batches.size() != 1) {
    THROW_DB_EXCEPTION("Expected a single Arrow record batch. Import aborted");
  }

  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
  std::unique_ptr<import_export::Loader> loader;
  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
  std::vector<std::string> column_names;
  if (use_column_names) {
    column_names = batch->schema()->field_names();
  }
  const auto execute_read_lock =
  auto schema_read_lock =
      prepare_loader_generic(*session_ptr,
                             table_name,
                             static_cast<size_t>(batch->num_columns()),
                             &loader,
                             &import_buffers,
                             column_names,
                             "load_table_binary_arrow");

  auto desc_id_to_column_id =
      column_ids_by_names(loader->get_column_descs(), column_names);
  // Taken from the last mapped column's add_arrow_values() result.
  size_t num_rows = 0;
  size_t col_idx = 0;
  try {
    // NOTE(review): col_idx walks all column descriptors but indexes
    // desc_id_to_column_id, which excludes geo physical columns; this looks misaligned
    // for tables that have geo columns — confirm.
    for (auto cd : loader->get_column_descs()) {
      auto mapped_idx = desc_id_to_column_id[col_idx];
      if (mapped_idx != -1) {
        auto& array = *batch->column(mapped_idx);
        import_export::ArraySliceRange row_slice(0, array.length());
        num_rows = import_buffers[col_idx]->add_arrow_values(
            cd, array, true, row_slice, nullptr);
      }
      col_idx++;
    }
  } catch (const std::exception& e) {
    LOG(ERROR) << "Input exception thrown: " << e.what()
               << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
    // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
    // other import paths
    THROW_DB_EXCEPTION(e.what());
  }
  fillMissingBuffers(session,
                     session_ptr->getCatalog(),
                     import_buffers,
                     loader->get_column_descs(),
                     desc_id_to_column_id,
                     num_rows,
                     table_name,
                     false);
  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
      session_ptr->getCatalog(), table_name);
  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
    THROW_DB_EXCEPTION(loader->getErrorMessage());
  }
}
3537 
3538 void DBHandler::load_table(const TSessionId& session,
3539  const std::string& table_name,
3540  const std::vector<TStringRow>& rows,
3541  const std::vector<std::string>& column_names) {
3542  try {
3543  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3544  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3545  auto session_ptr = stdlog.getConstSessionInfo();
3546 
3547  if (rows.empty()) {
3548  THROW_DB_EXCEPTION("No rows to insert");
3549  }
3550 
3551  const auto execute_read_lock =
3555  std::unique_ptr<import_export::Loader> loader;
3556  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3557  auto schema_read_lock =
3558  prepare_loader_generic(*session_ptr,
3559  table_name,
3560  static_cast<size_t>(rows.front().cols.size()),
3561  &loader,
3562  &import_buffers,
3563  column_names,
3564  "load_table");
3565 
3566  auto col_descs = loader->get_column_descs();
3567  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3568  import_export::CopyParams copy_params;
3569  size_t rows_completed = 0;
3570  for (auto const& row : rows) {
3571  size_t import_idx = 0; // index into the TStringRow being loaded
3572  size_t col_idx = 0; // index into column description vector
3573  try {
3574  size_t skip_physical_cols = 0;
3575  for (auto cd : col_descs) {
3576  if (skip_physical_cols > 0) {
3577  CHECK(cd->isGeoPhyCol);
3578  skip_physical_cols--;
3579  continue;
3580  }
3581  auto mapped_idx = desc_id_to_column_id[import_idx];
3582  if (mapped_idx != -1) {
3583  import_buffers[col_idx]->add_value(cd,
3584  row.cols[mapped_idx].str_val,
3585  row.cols[mapped_idx].is_null,
3586  copy_params);
3587  }
3588  col_idx++;
3589  if (cd->columnType.is_geometry()) {
3590  // physical geo columns will be filled separately lately
3591  skip_physical_cols = cd->columnType.get_physical_cols();
3592  col_idx += skip_physical_cols;
3593  }
3594  // Advance to the next field within the row
3595  import_idx++;
3596  }
3597  rows_completed++;
3598  } catch (const std::exception& e) {
3599  LOG(ERROR) << "Input exception thrown: " << e.what()
3600  << ". Row discarded, issue at column : " << (col_idx + 1)
3601  << " data :" << row;
3602  THROW_DB_EXCEPTION(std::string("Exception: ") + e.what());
3603  }
3604  }
3605  // do batch filling of geo columns separately
3606  if (rows.size() != 0) {
3607  const auto& row = rows[0];
3608  size_t col_idx = 0; // index into column description vector
3609  try {
3610  size_t import_idx = 0;
3611  size_t skip_physical_cols = 0;
3612  for (auto cd : col_descs) {
3613  if (skip_physical_cols > 0) {
3614  skip_physical_cols--;
3615  continue;
3616  }
3617  auto mapped_idx = desc_id_to_column_id[import_idx];
3618  col_idx++;
3619  if (cd->columnType.is_geometry()) {
3620  skip_physical_cols = cd->columnType.get_physical_cols();
3621  if (mapped_idx != -1) {
3622  fillGeoColumns(session,
3623  session_ptr->getCatalog(),
3624  import_buffers,
3625  cd,
3626  col_idx,
3627  rows_completed,
3628  table_name,
3629  false);
3630  } else {
3631  col_idx += skip_physical_cols;
3632  }
3633  }
3634  import_idx++;
3635  }
3636  } catch (const std::exception& e) {
3637  LOG(ERROR) << "Input exception thrown: " << e.what()
3638  << ". Row discarded, issue at column : " << (col_idx + 1)
3639  << " data :" << row;
3640  THROW_DB_EXCEPTION(e.what());
3641  }
3642  }
3643  fillMissingBuffers(session,
3644  session_ptr->getCatalog(),
3645  import_buffers,
3646  col_descs,
3647  desc_id_to_column_id,
3648  rows_completed,
3649  table_name,
3650  false);
3651  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3652  session_ptr->getCatalog(), table_name);
3653  if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3654  THROW_DB_EXCEPTION(loader->getErrorMessage());
3655  }
3656 
3657  } catch (const std::exception& e) {
3658  THROW_DB_EXCEPTION(std::string(e.what()));
3659  }
3660 }
3661 
3662 char DBHandler::unescape_char(std::string str) {
3663  char out = str[0];
3664  if (str.size() == 2 && str[0] == '\\') {
3665  if (str[1] == 't') {
3666  out = '\t';
3667  } else if (str[1] == 'n') {
3668  out = '\n';
3669  } else if (str[1] == '0') {
3670  out = '\0';
3671  } else if (str[1] == '\'') {
3672  out = '\'';
3673  } else if (str[1] == '\\') {
3674  out = '\\';
3675  }
3676  }
3677  return out;
3678 }
3679 
3681  import_export::CopyParams copy_params;
3682  switch (cp.has_header) {
3683  case TImportHeaderRow::AUTODETECT:
3685  break;
3686  case TImportHeaderRow::NO_HEADER:
3688  break;
3689  case TImportHeaderRow::HAS_HEADER:
3691  break;
3692  default:
3693  CHECK(false);
3694  }
3695  copy_params.quoted = cp.quoted;
3696  if (cp.delimiter.length() > 0) {
3697  copy_params.delimiter = unescape_char(cp.delimiter);
3698  } else {
3699  copy_params.delimiter = '\0';
3700  }
3701  if (cp.null_str.length() > 0) {
3702  copy_params.null_str = cp.null_str;
3703  }
3704  if (cp.quote.length() > 0) {
3705  copy_params.quote = unescape_char(cp.quote);
3706  }
3707  if (cp.escape.length() > 0) {
3708  copy_params.escape = unescape_char(cp.escape);
3709  }
3710  if (cp.line_delim.length() > 0) {
3711  copy_params.line_delim = unescape_char(cp.line_delim);
3712  }
3713  if (cp.array_delim.length() > 0) {
3714  copy_params.array_delim = unescape_char(cp.array_delim);
3715  }
3716  if (cp.array_begin.length() > 0) {
3717  copy_params.array_begin = unescape_char(cp.array_begin);
3718  }
3719  if (cp.array_end.length() > 0) {
3720  copy_params.array_end = unescape_char(cp.array_end);
3721  }
3722  if (cp.threads != 0) {
3723  copy_params.threads = cp.threads;
3724  }
3725  if (cp.s3_access_key.length() > 0) {
3726  copy_params.s3_access_key = cp.s3_access_key;
3727  }
3728  if (cp.s3_secret_key.length() > 0) {
3729  copy_params.s3_secret_key = cp.s3_secret_key;
3730  }
3731  if (cp.s3_session_token.length() > 0) {
3732  copy_params.s3_session_token = cp.s3_session_token;
3733  }
3734  if (cp.s3_region.length() > 0) {
3735  copy_params.s3_region = cp.s3_region;
3736  }
3737  if (cp.s3_endpoint.length() > 0) {
3738  copy_params.s3_endpoint = cp.s3_endpoint;
3739  }
3740 #ifdef HAVE_AWS_S3
3741  if (g_allow_s3_server_privileges && cp.s3_access_key.length() == 0 &&
3742  cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
3743  const auto& server_credentials =
3744  Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
3745  copy_params.s3_access_key = server_credentials.GetAWSAccessKeyId();
3746  copy_params.s3_secret_key = server_credentials.GetAWSSecretKey();
3747  copy_params.s3_session_token = server_credentials.GetSessionToken();
3748  }
3749 #endif
3750 
3751  switch (cp.source_type) {
3752  case TSourceType::DELIMITED_FILE:
3754  break;
3755  case TSourceType::GEO_FILE:
3757  break;
3758  case TSourceType::PARQUET_FILE:
3759 #ifdef ENABLE_IMPORT_PARQUET
3761  break;
3762 #else
3763  THROW_DB_EXCEPTION("Parquet not supported");
3764 #endif
3765  case TSourceType::ODBC:
3766  THROW_DB_EXCEPTION("ODBC source not supported");
3767  case TSourceType::RASTER_FILE:
3769  break;
3770  default:
3771  CHECK(false);
3772  }
3773 
3774  switch (cp.geo_coords_encoding) {
3775  case TEncodingType::GEOINT:
3776  copy_params.geo_coords_encoding = kENCODING_GEOINT;
3777  break;
3778  case TEncodingType::NONE:
3779  copy_params.geo_coords_encoding = kENCODING_NONE;
3780  break;
3781  default:
3782  THROW_DB_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
3783  std::to_string((int)cp.geo_coords_encoding));
3784  }
3785  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3786  switch (cp.geo_coords_type) {
3787  case TDatumType::GEOGRAPHY:
3788  copy_params.geo_coords_type = kGEOGRAPHY;
3789  break;
3790  case TDatumType::GEOMETRY:
3791  copy_params.geo_coords_type = kGEOMETRY;
3792  break;
3793  default:
3794  THROW_DB_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
3795  std::to_string((int)cp.geo_coords_type));
3796  }
3797  switch (cp.geo_coords_srid) {
3798  case 4326:
3799  case 3857:
3800  case 900913:
3801  copy_params.geo_coords_srid = cp.geo_coords_srid;
3802  break;
3803  default:
3804  THROW_DB_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
3805  std::to_string((int)cp.geo_coords_srid));
3806  }
3807  copy_params.sanitize_column_names = cp.sanitize_column_names;
3808  copy_params.geo_layer_name = cp.geo_layer_name;
3809  copy_params.geo_assign_render_groups =
3810  cp.geo_assign_render_groups && g_enable_assign_render_groups;
3811  copy_params.geo_explode_collections = cp.geo_explode_collections;
3812  copy_params.source_srid = cp.source_srid;
3813  switch (cp.raster_point_type) {
3814  case TRasterPointType::NONE:
3816  break;
3817  case TRasterPointType::AUTO:
3819  break;
3820  case TRasterPointType::SMALLINT:
3822  break;
3823  case TRasterPointType::INT:
3825  break;
3826  case TRasterPointType::FLOAT:
3828  break;
3829  case TRasterPointType::DOUBLE:
3831  break;
3832  case TRasterPointType::POINT:
3834  break;
3835  default:
3836  CHECK(false);
3837  }
3838  copy_params.raster_import_bands = cp.raster_import_bands;
3839  if (cp.raster_scanlines_per_thread < 0) {
3840  THROW_DB_EXCEPTION("Invalid raster_scanlines_per_thread in TCopyParams (" +
3841  std::to_string((int)cp.raster_scanlines_per_thread));
3842  } else {
3843  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
3844  }
3845  switch (cp.raster_point_transform) {
3846  case TRasterPointTransform::NONE:
3848  break;
3849  case TRasterPointTransform::AUTO:
3851  break;
3852  case TRasterPointTransform::FILE:
3854  break;
3855  case TRasterPointTransform::WORLD:
3857  break;
3858  default:
3859  CHECK(false);
3860  }
3861  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
3862  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
3863  copy_params.dsn = cp.odbc_dsn;
3864  copy_params.connection_string = cp.odbc_connection_string;
3865  copy_params.sql_select = cp.odbc_sql_select;
3866  copy_params.sql_order_by = cp.odbc_sql_order_by;
3867  copy_params.username = cp.odbc_username;
3868  copy_params.password = cp.odbc_password;
3869  copy_params.credential_string = cp.odbc_credential_string;
3870  copy_params.add_metadata_columns = cp.add_metadata_columns;
3871  copy_params.trim_spaces = cp.trim_spaces;
3872  return copy_params;
3873 }
3874 
3876  TCopyParams copy_params;
3877  copy_params.delimiter = cp.delimiter;
3878  copy_params.null_str = cp.null_str;
3879  switch (cp.has_header) {
3881  copy_params.has_header = TImportHeaderRow::AUTODETECT;
3882  break;
3884  copy_params.has_header = TImportHeaderRow::NO_HEADER;
3885  break;
3887  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
3888  break;
3889  default:
3890  CHECK(false);
3891  }
3892  copy_params.quoted = cp.quoted;
3893  copy_params.quote = cp.quote;
3894  copy_params.escape = cp.escape;
3895  copy_params.line_delim = cp.line_delim;
3896  copy_params.array_delim = cp.array_delim;
3897  copy_params.array_begin = cp.array_begin;
3898  copy_params.array_end = cp.array_end;
3899  copy_params.threads = cp.threads;
3900  copy_params.s3_access_key = cp.s3_access_key;
3901  copy_params.s3_secret_key = cp.s3_secret_key;
3902  copy_params.s3_session_token = cp.s3_session_token;
3903  copy_params.s3_region = cp.s3_region;
3904  copy_params.s3_endpoint = cp.s3_endpoint;
3905  switch (cp.source_type) {
3907  copy_params.source_type = TSourceType::DELIMITED_FILE;
3908  break;
3910  copy_params.source_type = TSourceType::GEO_FILE;
3911  break;
3913  copy_params.source_type = TSourceType::PARQUET_FILE;
3914  break;
3916  copy_params.source_type = TSourceType::RASTER_FILE;
3917  break;
3919  copy_params.source_type = TSourceType::ODBC;
3920  break;
3921  default:
3922  CHECK(false);
3923  }
3924  switch (cp.geo_coords_encoding) {
3925  case kENCODING_GEOINT:
3926  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
3927  break;
3928  default:
3929  copy_params.geo_coords_encoding = TEncodingType::NONE;
3930  break;
3931  }
3932  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3933  switch (cp.geo_coords_type) {
3934  case kGEOGRAPHY:
3935  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
3936  break;
3937  case kGEOMETRY:
3938  copy_params.geo_coords_type = TDatumType::GEOMETRY;
3939  break;
3940  default:
3941  CHECK(false);
3942  }
3943  copy_params.geo_coords_srid = cp.geo_coords_srid;
3944  copy_params.sanitize_column_names = cp.sanitize_column_names;
3945  copy_params.geo_layer_name = cp.geo_layer_name;
3946  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3947  copy_params.geo_explode_collections = cp.geo_explode_collections;
3948  copy_params.source_srid = cp.source_srid;
3949  switch (cp.raster_point_type) {
3951  copy_params.raster_point_type = TRasterPointType::NONE;
3952  break;
3954  copy_params.raster_point_type = TRasterPointType::AUTO;
3955  break;
3957  copy_params.raster_point_type = TRasterPointType::SMALLINT;
3958  break;
3960  copy_params.raster_point_type = TRasterPointType::INT;
3961  break;
3963  copy_params.raster_point_type = TRasterPointType::FLOAT;
3964  break;
3966  copy_params.raster_point_type = TRasterPointType::DOUBLE;
3967  break;
3969  copy_params.raster_point_type = TRasterPointType::POINT;
3970  break;
3971  default:
3972  CHECK(false);
3973  }
3974  copy_params.raster_import_bands = cp.raster_import_bands;
3975  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
3976  switch (cp.raster_point_transform) {
3978  copy_params.raster_point_transform = TRasterPointTransform::NONE;
3979  break;
3981  copy_params.raster_point_transform = TRasterPointTransform::AUTO;
3982  break;
3984  copy_params.raster_point_transform = TRasterPointTransform::FILE;
3985  break;
3987  copy_params.raster_point_transform = TRasterPointTransform::WORLD;
3988  break;
3989  default:
3990  CHECK(false);
3991  }
3992  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
3993  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
3994  copy_params.odbc_dsn = cp.dsn;
3995  copy_params.odbc_connection_string = cp.connection_string;
3996  copy_params.odbc_sql_select = cp.sql_select;
3997  copy_params.odbc_sql_order_by = cp.sql_order_by;
3998  copy_params.odbc_username = cp.username;
3999  copy_params.odbc_password = cp.password;
4000  copy_params.odbc_credential_string = cp.credential_string;
4001  copy_params.add_metadata_columns = cp.add_metadata_columns;
4002  copy_params.trim_spaces = cp.trim_spaces;
4003  return copy_params;
4004 }
4005 
4006 namespace {
4007 void add_vsi_network_prefix(std::string& path) {
4008  // do we support network file access?
4009  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
4010 
4011  // modify head of filename based on source location
4012  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
4013  if (!gdal_network) {
4015  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
4016  }
4017  // invoke GDAL CURL virtual file reader
4018  path = "/vsicurl/" + path;
4019  } else if (boost::istarts_with(path, "s3://")) {
4020  if (!gdal_network) {
4022  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
4023  }
4024  // invoke GDAL S3 virtual file reader
4025  boost::replace_first(path, "s3://", "/vsis3/");
4026  }
4027 }
4028 
4029 void add_vsi_geo_prefix(std::string& path) {
4030  // single gzip'd file (not an archive)?
4031  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
4032  path = "/vsigzip/" + path;
4033  }
4034 }
4035 
4036 void add_vsi_archive_prefix(std::string& path) {
4037  // check for compressed file or file bundle
4038  if (boost::iends_with(path, ".zip")) {
4039  // zip archive
4040  path = "/vsizip/" + path;
4041  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
4042  boost::iends_with(path, ".tar.gz")) {
4043  // tar archive (compressed or uncompressed)
4044  path = "/vsitar/" + path;
4045  }
4046 }
4047 
4048 std::string remove_vsi_prefixes(const std::string& path_in) {
4049  std::string path(path_in);
4050 
4051  // these will be first
4052  if (boost::istarts_with(path, "/vsizip/")) {
4053  boost::replace_first(path, "/vsizip/", "");
4054  } else if (boost::istarts_with(path, "/vsitar/")) {
4055  boost::replace_first(path, "/vsitar/", "");
4056  } else if (boost::istarts_with(path, "/vsigzip/")) {
4057  boost::replace_first(path, "/vsigzip/", "");
4058  }
4059 
4060  // then these
4061  if (boost::istarts_with(path, "/vsicurl/")) {
4062  boost::replace_first(path, "/vsicurl/", "");
4063  } else if (boost::istarts_with(path, "/vsis3/")) {
4064  boost::replace_first(path, "/vsis3/", "s3://");
4065  }
4066 
4067  return path;
4068 }
4069 
4070 bool path_is_relative(const std::string& path) {
4071  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
4072  boost::istarts_with(path, "https://")) {
4073  return false;
4074  }
4075  return !boost::filesystem::path(path).is_absolute();
4076 }
4077 
4078 bool path_has_valid_filename(const std::string& path) {
4079  auto filename = boost::filesystem::path(path).filename().string();
4080  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
4081  return false;
4082  }
4083  return true;
4084 }
4085 
4086 bool is_a_supported_geo_file(const std::string& path) {
4087  if (!path_has_valid_filename(path)) {
4088  return false;
4089  }
4090  // this is now just for files that we want to recognize
4091  // as geo when inside an archive (see below)
4092  // @TODO(se) make this more flexible?
4093  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
4094  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
4095  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
4096  boost::iends_with(path, ".gdb.zip") || boost::iends_with(path, ".fgb")) {
4097  return true;
4098  }
4099  return false;
4100 }
4101 
4102 bool is_a_supported_archive_file(const std::string& path) {
4103  if (!path_has_valid_filename(path)) {
4104  return false;
4105  }
4106  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
4107  return true;
4108  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
4109  boost::iends_with(path, ".tar.gz")) {
4110  return true;
4111  }
4112  return false;
4113 }
4114 
4115 std::string find_first_geo_file_in_archive(const std::string& archive_path,
4116  const import_export::CopyParams& copy_params) {
4117  // get the recursive list of all files in the archive
4118  std::vector<std::string> files =
4119  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
4120 
4121  // report the list
4122  LOG(INFO) << "Found " << files.size() << " files in Archive "
4123  << remove_vsi_prefixes(archive_path);
4124  for (const auto& file : files) {
4125  LOG(INFO) << " " << file;
4126  }
4127 
4128  // scan the list for the first candidate file
4129  bool found_suitable_file = false;
4130  std::string file_name;
4131  for (const auto& file : files) {
4132  if (is_a_supported_geo_file(file)) {
4133  file_name = file;
4134  found_suitable_file = true;
4135  break;
4136  }
4137  }
4138 
4139  // if we didn't find anything
4140  if (!found_suitable_file) {
4141  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
4142  remove_vsi_prefixes(archive_path);
4143  file_name.clear();
4144  }
4145 
4146  // done
4147  return file_name;
4148 }
4149 
4150 bool is_local_file(const std::string& file_path) {
4151  return (!boost::istarts_with(file_path, "s3://") &&
4152  !boost::istarts_with(file_path, "http://") &&
4153  !boost::istarts_with(file_path, "https://"));
4154 }
4155 
4156 void validate_import_file_path_if_local(const std::string& file_path) {
4157  if (is_local_file(file_path)) {
4159  file_path, ddl_utils::DataTransferType::IMPORT, true);
4160  }
4161 }
4162 } // namespace
4163 
4164 void DBHandler::detect_column_types(TDetectResult& _return,
4165  const TSessionId& session,
4166  const std::string& file_name_in,
4167  const TCopyParams& cp) {
4168  auto stdlog = STDLOG(get_session_ptr(session));
4169  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4170  check_read_only("detect_column_types");
4171 
4172  bool is_raster = false;
4173  boost::filesystem::path file_path;
4175  if (copy_params.source_type != import_export::SourceType::kOdbc) {
4176  std::string file_name{file_name_in};
4177  if (path_is_relative(file_name)) {
4178  // assume relative paths are relative to data_path / import / <session>
4179  auto temp_file_path = import_path_ / picosha2::hash256_hex_string(session) /
4180  boost::filesystem::path(file_name).filename();
4181  file_name = temp_file_path.string();
4182  }
4184 
4185  if ((copy_params.source_type == import_export::SourceType::kGeoFile ||
4187  is_local_file(file_name)) {
4188  const shared::FilePathOptions options{copy_params.regex_path_filter,
4189  copy_params.file_sort_order_by,
4190  copy_params.file_sort_regex};
4191  auto file_paths = shared::local_glob_filter_sort_files(file_name, options, false);
4192  // For geo and raster detect, pick the first file, if multiple files are provided
4193  // (e.g. through file globbing).
4194  CHECK(!file_paths.empty());
4195  file_name = file_paths[0];
4196  }
4197 
4198  // if it's a geo or raster import, handle alternative paths (S3, HTTP, archive etc.)
4199  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
4200  if (is_a_supported_archive_file(file_name)) {
4201  // find the archive file
4202  add_vsi_network_prefix(file_name);
4203  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4204  THROW_DB_EXCEPTION("Archive does not exist: " + file_name_in);
4205  }
4206  // find geo file in archive
4207  add_vsi_archive_prefix(file_name);
4208  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4209  // prepare to detect that geo file
4210  if (geo_file.size()) {
4211  file_name = file_name + std::string("/") + geo_file;
4212  }
4213  } else {
4214  // prepare to detect geo file directly
4215  add_vsi_network_prefix(file_name);
4216  add_vsi_geo_prefix(file_name);
4217  }
4218  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
4219  // prepare to detect raster file directly
4220  add_vsi_network_prefix(file_name);
4221  add_vsi_geo_prefix(file_name);
4222  is_raster = true;
4223  }
4224 
4225  file_path = boost::filesystem::path(file_name);
4226  // can be a s3 url
4227  if (!boost::istarts_with(file_name, "s3://")) {
4228  if (!boost::filesystem::path(file_name).is_absolute()) {
4229  file_path = import_path_ / picosha2::hash256_hex_string(session) /
4230  boost::filesystem::path(file_name).filename();
4231  file_name = file_path.string();
4232  }
4233 
4234  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4236  // check for geo or raster file
4237  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4238  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
4239  "\" does not exist.")
4240  }
4241  } else {
4242  // check for regular file
4243  if (!shared::file_or_glob_path_exists(file_path.string())) {
4244  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
4245  "\" does not exist.");
4246  }
4247  }
4248  }
4249  }
4250 
4251  try {
4253 #ifdef ENABLE_IMPORT_PARQUET
4255 #endif
4256  ) {
4257  import_export::Detector detector(file_path, copy_params);
4258  auto best_types = detector.getBestColumnTypes();
4259  std::vector<std::string> headers = detector.get_headers();
4260  copy_params = detector.get_copy_params();
4261 
4262  _return.copy_params = copyparams_to_thrift(copy_params);
4263  _return.row_set.row_desc.resize(best_types.size());
4264  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
4265  TColumnType col;
4266  auto& ti = best_types[col_idx];
4267  col.col_type.precision = ti.get_precision();
4268  col.col_type.scale = ti.get_scale();
4269  col.col_type.comp_param = ti.get_comp_param();
4270  if (ti.is_geometry()) {
4271  // set this so encoding_to_thrift does the right thing
4272  ti.set_compression(copy_params.geo_coords_encoding);
4273  // fill in these directly
4274  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
4275  col.col_type.scale = copy_params.geo_coords_srid;
4276  col.col_type.comp_param = copy_params.geo_coords_comp_param;
4277  }
4278  col.col_type.type = type_to_thrift(ti);
4279  col.col_type.encoding = encoding_to_thrift(ti);
4280  if (ti.is_array()) {
4281  col.col_type.is_array = true;
4282  }
4283  if (copy_params.sanitize_column_names) {
4284  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
4285  } else {
4286  col.col_name = headers[col_idx];
4287  }
4288  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
4289  _return.row_set.row_desc[col_idx] = col;
4290  }
4291  auto sample_data =
4293 
4294  TRow sample_row;
4295  for (auto row : sample_data) {
4296  sample_row.cols.clear();
4297  for (const auto& s : row) {
4298  TDatum td;
4299  td.val.str_val = s;
4300  td.is_null = s.empty();
4301  sample_row.cols.push_back(td);
4302  }
4303  _return.row_set.rows.push_back(sample_row);
4304  }
4305  } else if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4307  check_geospatial_files(file_path, copy_params);
4308  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
4309  file_path.string(), is_raster, Geospatial::kGeoColumnName, copy_params);
4310  for (auto cd : cds) {
4311  if (copy_params.sanitize_column_names) {
4312  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
4313  }
4314  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
4315  }
4316  if (!is_raster) {
4317  // @TODO(se) support for raster?
4318  std::map<std::string, std::vector<std::string>> sample_data;
4320  file_path.string(),
4322  sample_data,
4324  copy_params);
4325  if (sample_data.size() > 0) {
4326  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
4327  TRow sample_row;
4328  for (auto cd : cds) {
4329  TDatum td;
4330  td.val.str_val = sample_data[cd.sourceName].at(i);
4331  td.is_null = td.val.str_val.empty();
4332  sample_row.cols.push_back(td);
4333  }
4334  _return.row_set.rows.push_back(sample_row);
4335  }
4336  }
4337  }
4338  _return.copy_params = copyparams_to_thrift(copy_params);
4339  }
4340  } catch (const std::exception& e) {
4341  THROW_DB_EXCEPTION("detect_column_types error: " + std::string(e.what()));
4342  }
4343 }
4344 
4345 void DBHandler::render_vega(TRenderResult& _return,
4346  const TSessionId& session,
4347  const int64_t widget_id,
4348  const std::string& vega_json,
4349  const int compression_level,
4350  const std::string& nonce) {
4351  auto stdlog = STDLOG(get_session_ptr(session),
4352  "widget_id",
4353  widget_id,
4354  "compression_level",
4355  compression_level,
4356  "vega_json",
4357  vega_json,
4358  "nonce",
4359  nonce);
4360  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4361  stdlog.appendNameValuePairs("nonce", nonce);
4362  auto session_ptr = stdlog.getConstSessionInfo();
4363  if (!render_handler_) {
4364  THROW_DB_EXCEPTION("Backend rendering is disabled.");
4365  }
4366 
4367  // cast away const-ness of incoming Thrift string ref
4368  // to allow it to be passed down as an r-value and
4369  // ultimately std::moved into the RenderSession
4370  auto& non_const_vega_json = const_cast<std::string&>(vega_json);
4371 
4372  _return.total_time_ms = measure<>::execution([&]() {
4373  try {
4374  render_handler_->render_vega(_return,
4375  stdlog.getSessionInfo(),
4376  widget_id,
4377  std::move(non_const_vega_json),
4378  compression_level,
4379  nonce);
4380  } catch (std::exception& e) {
4381  THROW_DB_EXCEPTION(e.what());
4382  }
4383  });
4384 }
4385 
4387  int32_t dashboard_id,
4388  AccessPrivileges requestedPermissions) {
4389  DBObject object(dashboard_id, DashboardDBObjectType);
4390  auto& catalog = session_info.getCatalog();
4391  auto& user = session_info.get_currentUser();
4392  object.loadKey(catalog);
4393  object.setPrivileges(requestedPermissions);
4394  std::vector<DBObject> privs = {object};
4395  return SysCatalog::instance().checkPrivileges(user, privs);
4396 }
4397 
4398 // custom expressions
4399 namespace {
4402 
4403 std::unique_ptr<Catalog_Namespace::CustomExpression> create_custom_expr_from_thrift_obj(
4404  const TCustomExpression& t_custom_expr,
4405  const Catalog& catalog) {
4406  if (t_custom_expr.data_source_name.empty()) {
4407  THROW_DB_EXCEPTION("Custom expression data source name cannot be empty.")
4408  }
4409  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4410  << "Unexpected data source type: "
4411  << static_cast<int>(t_custom_expr.data_source_type);
4412  auto td = catalog.getMetadataForTable(t_custom_expr.data_source_name, false);
4413  if (!td) {
4414  THROW_DB_EXCEPTION("Custom expression references a table \"" +
4415  t_custom_expr.data_source_name + "\" that does not exist.")
4416  }
4417  DataSourceType data_source_type = DataSourceType::TABLE;
4418  return std::make_unique<CustomExpression>(
4419  t_custom_expr.name, t_custom_expr.expression_json, data_source_type, td->tableId);
4420 }
4421 
4422 TCustomExpression create_thrift_obj_from_custom_expr(const CustomExpression& custom_expr,
4423  const Catalog& catalog) {
4424  TCustomExpression t_custom_expr;
4425  t_custom_expr.id = custom_expr.id;
4426  t_custom_expr.name = custom_expr.name;
4427  t_custom_expr.expression_json = custom_expr.expression_json;
4428  t_custom_expr.data_source_id = custom_expr.data_source_id;
4429  t_custom_expr.is_deleted = custom_expr.is_deleted;
4430  CHECK(custom_expr.data_source_type == DataSourceType::TABLE)
4431  << "Unexpected data source type: "
4432  << static_cast<int>(custom_expr.data_source_type);
4433  t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
4434  auto td = catalog.getMetadataForTable(custom_expr.data_source_id, false);
4435  if (td) {
4436  t_custom_expr.data_source_name = td->tableName;
4437  } else {
4438  LOG(WARNING)
4439  << "Custom expression references a deleted data source. Custom expression id: "
4440  << custom_expr.id << ", name: " << custom_expr.name;
4441  }
4442  return t_custom_expr;
4443 }
4444 } // namespace
4445 
4446 int32_t DBHandler::create_custom_expression(const TSessionId& session,
4447  const TCustomExpression& t_custom_expr) {
4448  auto stdlog = STDLOG(get_session_ptr(session));
4449  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4450  check_read_only("create_custom_expression");
4451 
4452  auto session_ptr = stdlog.getConstSessionInfo();
4453  if (!session_ptr->get_currentUser().isSuper) {
4454  THROW_DB_EXCEPTION("Custom expressions can only be created by super users.")
4455  }
4456  auto& catalog = session_ptr->getCatalog();
4458  return catalog.createCustomExpression(
4459  create_custom_expr_from_thrift_obj(t_custom_expr, catalog));
4460 }
4461 
4462 void DBHandler::get_custom_expressions(std::vector<TCustomExpression>& _return,
4463  const TSessionId& session) {
4464  auto stdlog = STDLOG(get_session_ptr(session));
4465  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4466 
4467  auto session_ptr = stdlog.getConstSessionInfo();
4468  auto& catalog = session_ptr->getCatalog();
4470  auto custom_expressions =
4471  catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
4472  for (const auto& custom_expression : custom_expressions) {
4473  _return.emplace_back(create_thrift_obj_from_custom_expr(*custom_expression, catalog));
4474  }
4475 }
4476 
4477 void DBHandler::update_custom_expression(const TSessionId& session,
4478  const int32_t id,
4479  const std::string& expression_json) {
4480  auto stdlog = STDLOG(get_session_ptr(session));
4481  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4482  check_read_only("update_custom_expression");
4483 
4484  auto session_ptr = stdlog.getConstSessionInfo();
4485  if (!session_ptr->get_currentUser().isSuper) {
4486  THROW_DB_EXCEPTION("Custom expressions can only be updated by super users.")
4487  }
4488  auto& catalog = session_ptr->getCatalog();
4490  catalog.updateCustomExpression(id, expression_json);
4491 }
4492 
4494  const TSessionId& session,
4495  const std::vector<int32_t>& custom_expression_ids,
4496  const bool do_soft_delete) {
4497  auto stdlog = STDLOG(get_session_ptr(session));
4498  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4499  check_read_only("delete_custom_expressions");
4500 
4501  auto session_ptr = stdlog.getConstSessionInfo();
4502  if (!session_ptr->get_currentUser().isSuper) {
4503  THROW_DB_EXCEPTION("Custom expressions can only be deleted by super users.")
4504  }
4505  auto& catalog = session_ptr->getCatalog();
4507  catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4508 }
4509 
4510 // dashboards
// Fills `dashboard` with the description of the dashboard `dashboard_id`,
// including dashboard_state (get_dashboards omits it and points callers here).
// Throws (THROW_DB_EXCEPTION) if the dashboard does not exist or the VIEW_DASHBOARD
// privilege check for the session fails.
4511 void DBHandler::get_dashboard(TDashboard& dashboard,
4512  const TSessionId& session,
4513  const int32_t dashboard_id) {
4514  auto stdlog = STDLOG(get_session_ptr(session));
4515  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4516  auto session_ptr = stdlog.getConstSessionInfo();
4517  auto const& cat = session_ptr->getCatalog();
// NOTE(review): source line 4518 is missing; `user_meta`, used below, is not declared
// anywhere in this view — its declaration was presumably on that line.
4519  auto dash = cat.getMetadataForDashboard(dashboard_id);
4520  if (!dash) {
4521  THROW_DB_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
4522  " doesn't exist");
4523  }
// NOTE(review): source line 4524 (the opening of this privilege-check condition) is
// missing from this listing.
4525  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4526  THROW_DB_EXCEPTION("User has no view privileges for the dashboard with id " +
4527  std::to_string(dashboard_id));
4528  }
4529  user_meta.userName = "";
// NOTE(review): get_dashboard_impl performs this same owner lookup again, so this
// call looks redundant — confirm before removing.
4530  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4531  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
4532 }
4533 
// Appends to `dashboards` every dashboard in the session's catalog that passes the
// VIEW_DASHBOARD privilege check for the session. dashboard_state is left empty
// (populate_state=false) to keep the payload small.
4534 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
4535  const TSessionId& session) {
4536  auto stdlog = STDLOG(get_session_ptr(session));
4537  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4538  auto session_ptr = stdlog.getConstSessionInfo();
4539  auto const& cat = session_ptr->getCatalog();
// NOTE(review): source line 4540 is missing; `user_meta` is not declared in this view.
4541  const auto dashes = cat.getAllDashboardsMetadata();
// NOTE(review): user_meta is cleared only once, before the loop, and then reused for
// every dashboard; if getMetadataForUserById leaves it untouched when a lookup fails,
// a previous owner's name could leak into is_dash_shared — confirm.
4542  user_meta.userName = "";
4543  for (const auto dash : dashes) {
// NOTE(review): source line 4544 (opening of this privilege-check condition) is missing.
4545  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4546  // dashboardState is intentionally not populated here
4547  // for payload reasons
4548  // use get_dashboard call to get state
4549  dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
4550  }
4551  }
4552 }
4553 
// Builds the Thrift TDashboard for `dash` as seen by the session's current user.
// - Resolves the dashboard owner into `user_meta` via getMetadataForUserById
//   (so `user_meta` is written through by this function).
// - dashboard_state is copied only when populate_state is true.
// - Superusers get all four permissions. Other users get the OR of the dashboard
//   privileges found on each grantee name returned by SysCatalog::getRoles(...) for
//   the current user.
// - is_dash_shared is false when getMetadataForObject returns no entries for this
//   dashboard, or exactly one whose roleName equals the owner's userName.
// NOTE(review): source lines 4554 (start of the signature) and 4556 (presumably the
// `user_meta` parameter) are missing from this listing.
4555  const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4557  const DashboardDescriptor* dash,
4558  const bool populate_state) {
4559  auto const& cat = session_ptr->getCatalog();
4560  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4561  auto objects_list = SysCatalog::instance().getMetadataForObject(
4562  cat.getCurrentDB().dbId,
4563  static_cast<int>(DBObjectType::DashboardDBObjectType),
4564  dash->dashboardId);
4565  TDashboard dashboard;
4566  dashboard.dashboard_name = dash->dashboardName;
4567  if (populate_state) {
4568  dashboard.dashboard_state = dash->dashboardState;
4569  }
4570  dashboard.image_hash = dash->imageHash;
4571  dashboard.update_time = dash->updateTime;
4572  dashboard.dashboard_metadata = dash->dashboardMetadata;
4573  dashboard.dashboard_id = dash->dashboardId;
4574  dashboard.dashboard_owner = dash->user;
4575  TDashboardPermissions perms;
4576  // Super user has all permissions.
4577  if (session_ptr->get_currentUser().isSuper) {
4578  perms.create_ = true;
4579  perms.delete_ = true;
4580  perms.edit_ = true;
4581  perms.view_ = true;
4582  } else {
4583  // Collect all grants on current user
4584  // add them to the permissions.
4585  auto obj_to_find =
4586  DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
4587  obj_to_find.loadKey(session_ptr->getCatalog());
// NOTE(review): inside this else-branch isSuper is always false, so the second
// argument passed to getRoles below is effectively constant.
4588  std::vector<std::string> grantees =
4589  SysCatalog::instance().getRoles(true,
4590  session_ptr->get_currentUser().isSuper,
4591  session_ptr->get_currentUser().userName);
4592  for (const auto& grantee : grantees) {
4593  DBObject* object_found;
4594  auto* gr = SysCatalog::instance().getGrantee(grantee);
// Assignment inside the condition is intentional: object_found is only read when
// the grantee exists and holds a matching object.
4595  if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
4596  const auto obj_privs = object_found->getPrivileges();
4597  perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4598  perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4599  perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4600  perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4601  }
4602  }
4603  }
4604  dashboard.dashboard_permissions = perms;
4605  if (objects_list.empty() ||
4606  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
4607  dashboard.is_dash_shared = false;
4608  } else {
4609  dashboard.is_dash_shared = true;
4610  }
4611  return dashboard;
4612 }
4613 
4614 namespace dbhandler {
4615 bool is_info_schema_db(const std::string& db_name) {
4616  return (db_name == shared::kInfoSchemaDbName &&
4617  SysCatalog::instance().hasExecutedMigration(shared::kInfoSchemaMigrationName));
4618 }
4619 
4620 void check_not_info_schema_db(const std::string& db_name, bool throw_db_exception) {
4621  if (is_info_schema_db(db_name)) {
4622  std::string error_message{"Write requests/queries are not allowed in the " +
4623  shared::kInfoSchemaDbName + " database."};
4624  if (throw_db_exception) {
4625  THROW_DB_EXCEPTION(error_message)
4626  } else {
4627  throw std::runtime_error(error_message);
4628  }
4629  }
4630 }
4631 } // namespace dbhandler
4632 
// Creates a dashboard owned by the session's current user and returns its new id.
// Calls check_read_only first, then throws (THROW_DB_EXCEPTION) when the
// dashboard-object privilege check fails or when the user already has a dashboard
// with this name. std::exceptions from the catalog calls are rethrown via
// THROW_DB_EXCEPTION.
4633 int32_t DBHandler::create_dashboard(const TSessionId& session,
4634  const std::string& dashboard_name,
4635  const std::string& dashboard_state,
4636  const std::string& image_hash,
4637  const std::string& dashboard_metadata) {
4638  auto stdlog = STDLOG(get_session_ptr(session));
4639  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4640  auto session_ptr = stdlog.getConstSessionInfo();
4641  CHECK(session_ptr);
4642  check_read_only("create_dashboard");
4643  auto& cat = session_ptr->getCatalog();
// NOTE(review): source lines 4644-4645 are missing; the closing brace below ends a
// block that was opened there.
4646  }
4647 
// NOTE(review): source line 4649 (the rest of this condition) is missing.
4648  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
4650  THROW_DB_EXCEPTION("Not enough privileges to create a dashboard.");
4651  }
4652 
4653  if (dashboard_exists(cat, session_ptr->get_currentUser().userId, dashboard_name)) {
4654  THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4655  }
4656 
// NOTE(review): source line 4657 is missing; `dd` is not declared in this view.
4658  dd.dashboardName = dashboard_name;
4659  dd.dashboardState = dashboard_state;
4660  dd.imageHash = image_hash;
4661  dd.dashboardMetadata = dashboard_metadata;
4662  dd.userId = session_ptr->get_currentUser().userId;
4663  dd.user = session_ptr->get_currentUser().userName;
4664 
4665  try {
4666  auto id = cat.createDashboard(dd);
// NOTE(review): nothing here undoes createDashboard if createDBObject throws, which
// is what the TODO below refers to.
4667  // TODO: transactionally unsafe
4668  SysCatalog::instance().createDBObject(
4669  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
4670  return id;
4671  } catch (const std::exception& e) {
4672  THROW_DB_EXCEPTION(e.what());
4673  }
4674 }
4675 
// Overwrites dashboard `dashboard_id` with new name, state, image hash, metadata
// and owner. Calls check_read_only first. Throws (THROW_DB_EXCEPTION) when the
// EDIT_DASHBOARD check fails, when another dashboard of the current user already
// has `dashboard_name`, or when `dashboard_owner` is not a known user.
// std::exceptions from Catalog::replaceDashboard are rethrown via THROW_DB_EXCEPTION.
4676 void DBHandler::replace_dashboard(const TSessionId& session,
4677  const int32_t dashboard_id,
4678  const std::string& dashboard_name,
4679  const std::string& dashboard_owner,
4680  const std::string& dashboard_state,
4681  const std::string& image_hash,
4682  const std::string& dashboard_metadata) {
4683  auto stdlog = STDLOG(get_session_ptr(session));
4684  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4685  auto session_ptr = stdlog.getConstSessionInfo();
4686  CHECK(session_ptr);
4687  check_read_only("replace_dashboard");
4688  auto& cat = session_ptr->getCatalog();
// NOTE(review): source lines 4689-4690 are missing; the closing brace below ends a
// block that was opened there.
4691  }
4692 
// NOTE(review): source line 4693 (the opening of this privilege-check condition) is
// missing.
4694  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
4695  THROW_DB_EXCEPTION("Not enough privileges to replace a dashboard.");
4696  }
4697 
// NOTE(review): the name-conflict lookup uses the *current* user's id, but the
// dashboard is saved under `dashboard_owner` (dd.userId = user.userId below). A clash
// with a dashboard already owned by a different new owner is not caught here — confirm.
4698  if (auto dash = cat.getMetadataForDashboard(
4699  std::to_string(session_ptr->get_currentUser().userId), dashboard_name)) {
4700  if (dash->dashboardId != dashboard_id) {
4701  THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4702  }
4703  }
4704 
// NOTE(review): source line 4705 is missing; `dd` is not declared in this view.
4706  dd.dashboardName = dashboard_name;
4707  dd.dashboardState = dashboard_state;
4708  dd.imageHash = image_hash;
4709  dd.dashboardMetadata = dashboard_metadata;
// NOTE(review): source line 4710 is missing; `user` is not declared in this view.
4711  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
4712  THROW_DB_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
4713  " does not exist");
4714  }
4715  dd.userId = user.userId;
4716  dd.user = dashboard_owner;
4717  dd.dashboardId = dashboard_id;
4718 
4719  try {
4720  cat.replaceDashboard(dd);
4721  } catch (const std::exception& e) {
4722  THROW_DB_EXCEPTION(e.what());
4723  }
4724 }
4725 
4726 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
4727  delete_dashboards(session, {dashboard_id});
4728 }
4729 
// Deletes the given dashboards from the session's catalog on behalf of the current
// user. Calls check_read_only first; ownership/privilege checks are left to
// Catalog::deleteMetadataForDashboards. std::exceptions are rethrown via
// THROW_DB_EXCEPTION.
4730 void DBHandler::delete_dashboards(const TSessionId& session,
4731  const std::vector<int32_t>& dashboard_ids) {
4732  auto stdlog = STDLOG(get_session_ptr(session));
4733  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4734  auto session_ptr = stdlog.getConstSessionInfo();
4735  check_read_only("delete_dashboards");
4736  auto& cat = session_ptr->getCatalog();
// NOTE(review): source lines 4737-4738 are missing; the closing brace below ends a
// block that was opened there.
4739  }
4740  // Checks will be performed in catalog
4741  try {
4742  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4743  } catch (const std::exception& e) {
4744  THROW_DB_EXCEPTION(e.what());
4745  }
4746 }
4747 
4748 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
4749  int32_t dashboard_id,
4750  std::vector<std::string> groups) {
4751  const auto session_info = get_session_copy(session);
4752  auto& cat = session_info.getCatalog();
4753  auto dash = cat.getMetadataForDashboard(dashboard_id);
4754  if (!dash) {
4755  THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4756  " does not exist");
4757  } else if (session_info.get_currentUser().userId != dash->userId &&
4758  !session_info.get_currentUser().isSuper) {
4759  throw std::runtime_error(
4760  "User should be either owner of dashboard or super user to share/unshare it");
4761  }
4762  std::vector<std::string> valid_groups;
4764  for (auto& group : groups) {
4765  user_meta.isSuper = false; // initialize default flag
4766  if (!SysCatalog::instance().getGrantee(group)) {
4767  THROW_DB_EXCEPTION("User/Role " + group + " does not exist");
4768  } else if (!user_meta.isSuper) {
4769  valid_groups.push_back(group);
4770  }
4771  }
4772  return valid_groups;
4773 }
4774 
4775 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
4776  for (auto const& group : groups) {
4777  if (!SysCatalog::instance().getGrantee(group)) {
4778  THROW_DB_EXCEPTION("User/Role '" + group + "' does not exist");
4779  }
4780  }
4781 }
4782 
// NOTE(review): the signature line (source 4783) is missing from this listing; this is
// the body of validateDashboardIdsForSharing (called from shareOrUnshareDashboards).
// Checks every id: the dashboard must exist and the session user must own it or be a
// superuser. Failures are grouped by error message (std::map, so messages are
// emitted in lexicographic order), and a single THROW_DB_EXCEPTION lists the
// offending ids per message.
4784  const Catalog_Namespace::SessionInfo& session_info,
4785  const std::vector<int32_t>& dashboard_ids) {
4786  auto& cat = session_info.getCatalog();
4787  std::map<std::string, std::list<int32_t>> errors;
4788  for (auto const& dashboard_id : dashboard_ids) {
4789  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
4790  if (!dashboard) {
4791  errors["Dashboard id does not exist"].push_back(dashboard_id);
4792  } else if (session_info.get_currentUser().userId != dashboard->userId &&
4793  !session_info.get_currentUser().isSuper) {
4794  errors["User should be either owner of dashboard or super user to share/unshare it"]
4795  .push_back(dashboard_id);
4796  }
4797  }
4798  if (!errors.empty()) {
4799  std::stringstream error_stream;
4800  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
4801  for (const auto& [error, id_list] : errors) {
4802  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
4803  }
4804  THROW_DB_EXCEPTION(error_stream.str());
4805  }
4806 }
4807 
// Shared implementation of share_dashboards / unshare_dashboards.
// Calls check_read_only (tagged by direction), requires at least one permission flag
// to be set, validates that every group exists and that every dashboard is
// shareable by the session user, then builds one DBObject per dashboard and
// grants (do_share) or revokes them for all groups in a single batch call.
4808 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
4809  const std::vector<int32_t>& dashboard_ids,
4810  const std::vector<std::string>& groups,
4811  const TDashboardPermissions& permissions,
4812  const bool do_share) {
4813  auto stdlog = STDLOG(get_session_ptr(session));
4814  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4815  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
4816  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
4817  !permissions.view_) {
4818  THROW_DB_EXCEPTION("At least one privilege should be assigned for " +
4819  std::string(do_share ? "grants" : "revokes"));
4820  }
4821  auto session_ptr = stdlog.getConstSessionInfo();
4822  auto const& catalog = session_ptr->getCatalog();
4823  auto& sys_catalog = SysCatalog::instance();
4824  validateGroups(groups);
4825  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
4826  std::vector<DBObject> batch_objects;
4827  for (auto const& dashboard_id : dashboard_ids) {
4828  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
4829  AccessPrivileges privs;
// NOTE(review): the bodies of the four ifs below (source lines 4831, 4834, 4837,
// 4840) are missing from this listing; as shown, `privs` would stay empty.
// Confirm against the original file.
4830  if (permissions.delete_) {
4832  }
4833  if (permissions.create_) {
4835  }
4836  if (permissions.edit_) {
4838  }
4839  if (permissions.view_) {
4841  }
4842  object.setPrivileges(privs);
4843  batch_objects.push_back(object);
4844  }
4845  if (do_share) {
4846  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4847  } else {
4848  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4849  }
4850 }
4851 
4852 void DBHandler::share_dashboards(const TSessionId& session,
4853  const std::vector<int32_t>& dashboard_ids,
4854  const std::vector<std::string>& groups,
4855  const TDashboardPermissions& permissions) {
4856  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
4857 }
4858 
4859 // NOOP: Grants not available for objects as of now
4860 void DBHandler::share_dashboard(const TSessionId& session,
4861  const int32_t dashboard_id,
4862  const std::vector<std::string>& groups,
4863  const std::vector<std::string>& objects,
4864  const TDashboardPermissions& permissions,
4865  const bool grant_role = false) {
4866  share_dashboards(session, {dashboard_id}, groups, permissions);
4867 }
4868 
4869 void DBHandler::unshare_dashboards(const TSessionId& session,
4870  const std::vector<int32_t>& dashboard_ids,
4871  const std::vector<std::string>& groups,
4872  const TDashboardPermissions& permissions) {
4873  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
4874 }
4875 
4876 void DBHandler::unshare_dashboard(const TSessionId& session,
4877  const int32_t dashboard_id,
4878  const std::vector<std::string>& groups,
4879  const std::vector<std::string>& objects,
4880  const TDashboardPermissions& permissions) {
4881  unshare_dashboards(session, {dashboard_id}, groups, permissions);
4882 }
4883 
// NOTE(review): the signature head (source line 4884) is missing from this listing.
// Appends to `dashboard_grantees` one entry per grantee of dashboard `dashboard_id`,
// skipping the dashboard owner, with the four dashboard permission flags read from
// that grantee's privileges. Throws if the dashboard does not exist or the session
// user is neither its owner nor a superuser.
4885  std::vector<TDashboardGrantees>& dashboard_grantees,
4886  const TSessionId& session,
4887  const int32_t dashboard_id) {
4888  auto stdlog = STDLOG(get_session_ptr(session));
4889  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4890  auto session_ptr = stdlog.getConstSessionInfo();
4891  auto const& cat = session_ptr->getCatalog();
// NOTE(review): source line 4892 is missing; `user_meta` is not declared in this view.
4893  auto dash = cat.getMetadataForDashboard(dashboard_id);
4894  if (!dash) {
4895  THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4896  " does not exist");
4897  } else if (session_ptr->get_currentUser().userId != dash->userId &&
4898  !session_ptr->get_currentUser().isSuper) {
// NOTE(review): source line 4899 (the throw that takes the message below) is missing.
4900  "User should be either owner of dashboard or super user to access grantees");
4901  }
4902  std::vector<ObjectRoleDescriptor*> objectsList;
4903  objectsList = SysCatalog::instance().getMetadataForObject(
4904  cat.getCurrentDB().dbId,
4905  static_cast<int>(DBObjectType::DashboardDBObjectType),
4906  dashboard_id); // The object type queried here is always DashboardDBObjectType
4907  user_meta.userId = -1;
4908  user_meta.userName = "";
4909  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4910  for (auto object : objectsList) {
4911  if (user_meta.userName == object->roleName) {
4912  // Mask owner
4913  continue;
4914  }
4915  TDashboardGrantees grantee;
4916  TDashboardPermissions perm;
4917  grantee.name = object->roleName;
4918  grantee.is_user = object->roleType;
4919  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4920  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4921  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4922  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4923  grantee.permissions = perm;
4924  dashboard_grantees.push_back(grantee);
4925  }
4926 }
4927 
4928 void DBHandler::create_link(std::string& _return,
4929  const TSessionId& session,
4930  const std::string& view_state,
4931  const std::string& view_metadata) {
4932  auto stdlog = STDLOG(get_session_ptr(session));
4933  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4934  auto session_ptr = stdlog.getConstSessionInfo();
4935  // check_read_only("create_link");
4936  auto& cat = session_ptr->getCatalog();
4937 
4938  LinkDescriptor ld;
4939  ld.userId = session_ptr->get_currentUser().userId;
4940  ld.viewState = view_state;
4941  ld.viewMetadata = view_metadata;
4942 
4943  try {
4944  _return = cat.createLink(ld, 6);
4945  } catch (const std::exception& e) {
4946  THROW_DB_EXCEPTION(e.what());
4947  }
4948 }
4949 
// NOTE(review): the signature head (source line 4950) is missing from this listing.
// Judging from the body, it takes a `type` (stored in ct.col_type.type), `name` and
// `is_array`, and returns a TColumnType with only those three fields set; all other
// fields keep their default-constructed values.
4951  const std::string& name,
4952  const bool is_array) {
4953  TColumnType ct;
4954  ct.col_name = name;
4955  ct.col_type.type = type;
4956  ct.col_type.is_array = is_array;
4957  return ct;
4958 }
4959 
// If `file_path` has a shapefile-component extension (.shp/.shx/.dbf, compared after
// lower-casing), verifies that all three companion files exist. For each extension
// it checks the upper-case variant and then the lower-case one, and throws
// std::runtime_error naming the file when the combined condition holds. Paths with
// any other extension are not checked.
4960 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
4961  const import_export::CopyParams& copy_params) {
4962  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
4963  if (std::find(shp_ext.begin(),
4964  shp_ext.end(),
4965  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
4966  shp_ext.end()) {
4967  for (auto ext : shp_ext) {
4968  auto aux_file = file_path;
// NOTE(review): source lines 4969 and 4972 (the openings of the two existence checks
// joined by && below) are missing from this listing; presumably each is a negated
// file-existence call taking (path, copy_params) — confirm. Note that
// replace_extension mutates aux_file in place, so the error message reports whichever
// variant was set last.
4970  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
4971  copy_params) &&
4973  aux_file.replace_extension(ext).string(), copy_params)) {
4974  throw std::runtime_error("required file for shapefile does not exist: " +
4975  aux_file.filename().string());
4976  }
4977  }
4978  }
4979 }
4980 
4981 void DBHandler::create_table(const TSessionId& session,
4982  const std::string& table_name,
4983  const TRowDescriptor& rd,
4984  const TCreateParams& create_params) {
4985  // @TODO(se) remove file_type which is unused
4986  auto stdlog = STDLOG("table_name", table_name);
4987  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4988  check_read_only("create_table");
4989 
4990  if (ImportHelpers::is_reserved_name(table_name)) {
4991  THROW_DB_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
4992  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
4993  THROW_DB_EXCEPTION("Invalid characters in table name: " + table_name);
4994  }
4995 
4996  auto rds = rd;
4997 
4998  std::string stmt{"CREATE TABLE " + table_name};
4999  std::vector<std::string> col_stmts;
5000 
5001  for (auto col : rds) {
5002  if (ImportHelpers::is_reserved_name(col.col_name)) {
5003  THROW_DB_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
5004  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
5005  THROW_DB_EXCEPTION("Invalid characters in column name: " + col.col_name);
5006  }
5007  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
5008  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
5009  THROW_DB_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
5010  " for column: " + col.col_name);
5011  }
5012 
5013  if (col.col_type.type == TDatumType::DECIMAL) {
5014  // if no precision or scale passed in set to default 14,7
5015  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
5016  col.col_type.precision = 14;
5017  col.col_type.scale = 7;
5018  }
5019  }
5020 
5021  std::string col_stmt;
5022  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
5023  if (col.__isset.default_value) {
5024  col_stmt.append(" DEFAULT " + col.default_value);
5025  }
5026 
5027  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
5028  // `nullable` argument, leading this to default to false. Uncomment for v2.
5029  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
5030 
5031  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
5032  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
5033  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
5034  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
5035  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT ||
5036  thrift_to_encoding(col.col_type.encoding) == kENCODING_DATE_IN_DAYS) {
5037  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
5038  }
5039  } else if (col.col_type.type == TDatumType::STR) {
5040  // non DICT encoded strings
5041  col_stmt.append(" ENCODING NONE");
5042  } else if (col.col_type.type == TDatumType::POINT ||
5043  col.col_type.type == TDatumType::LINESTRING ||
5044  col.col_type.type == TDatumType::POLYGON ||
5045  col.col_type.type == TDatumType::MULTIPOLYGON) {
5046  // non encoded compressable geo
5047  if (col.col_type.scale == 4326) {
5048  col_stmt.append(" ENCODING NONE");
5049  }
5050  }
5051  col_stmts.push_back(col_stmt);
5052  }
5053 
5054  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
5055 
5056  if (create_params.is_replicated) {
5057  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
5058  }
5059 
5060  stmt.append(";");
5061 
5062  TQueryResult ret;
5063  sql_execute(ret, session, stmt, true, "", -1, -1);
5064 }
5065 
// Thrift endpoint: imports `file_name_in` (or, for ODBC sources, the copy params'
// sql_select) into the existing table `table_name`.
// Flow visible here: check_read_only; catalog/table locking; load-privilege check;
// source resolution; insert-data write lock; importer creation and timed import.
// Relative local paths are rewritten to
// import_path_/<sha256(session)>/<filename only>, so any directory part of a
// relative path is dropped. Non-"s3://" paths must exist (file or glob). An unset
// delimiter defaults to ',' ('\t' when the extension is exactly ".tsv").
// TDBExceptions propagate unchanged; other std::exceptions are rethrown via
// THROW_DB_EXCEPTION.
// NOTE(review): this listing omits source lines 5078-5080, 5082, 5084, 5088, 5094,
// 5099, 5106 and 5123. `executor` and `copy_params` are used below without a visible
// declaration.
5066 void DBHandler::import_table(const TSessionId& session,
5067  const std::string& table_name,
5068  const std::string& file_name_in,
5069  const TCopyParams& cp) {
5070  try {
5071  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
5072  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5073  auto session_ptr = stdlog.getConstSessionInfo();
5074  check_read_only("import_table");
5075  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
5076 
5077  const auto execute_read_lock =
5081  auto& cat = session_ptr->getCatalog();
5083  auto start_time = ::toString(std::chrono::system_clock::now());
5085  executor->enrollQuerySession(session,
5086  "IMPORT_TABLE",
5087  start_time,
5089  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5090  }
5091 
// Clears the session's query status on scope exit; the captured references
// (session, start_time) outlive the guard because it is destroyed at the end of the
// enclosing try block.
5092  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
5093  // reset the runtime query interrupt status
5095  executor->clearQuerySessionStatus(session, start_time);
5096  }
5097  };
5098  const auto td_with_lock =
5100  cat, table_name);
5101  const auto td = td_with_lock();
5102  CHECK(td);
5103  check_table_load_privileges(*session_ptr, table_name);
5104 
5105  std::string copy_from_source;
5107  if (copy_params.source_type == import_export::SourceType::kOdbc) {
5108  copy_from_source = copy_params.sql_select;
5109  } else {
5110  std::string file_name{file_name_in};
5111  auto file_path = boost::filesystem::path(file_name);
5112  if (!boost::istarts_with(file_name, "s3://")) {
5113  if (!boost::filesystem::path(file_name).is_absolute()) {
5114  file_path = import_path_ / picosha2::hash256_hex_string(session) /
5115  boost::filesystem::path(file_name).filename();
5116  file_name = file_path.string();
5117  }
5118  if (!shared::file_or_glob_path_exists(file_path.string())) {
5119  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
5120  "\" does not exist.");
5121  }
5122  }
5124 
5125  // TODO(andrew): add delimiter detection to Importer
// NOTE(review): the ".tsv" comparison is case-sensitive, and the free function
// boost::filesystem::extension is deprecated in Boost.Filesystem
// (file_path.extension() is the member equivalent).
5126  if (copy_params.delimiter == '\0') {
5127  copy_params.delimiter = ',';
5128  if (boost::filesystem::extension(file_path) == ".tsv") {
5129  copy_params.delimiter = '\t';
5130  }
5131  }
5132  copy_from_source = file_path.string();
5133  }
5134 
5135  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
5136  session_ptr->getCatalog(), table_name);
5137  std::unique_ptr<import_export::AbstractImporter> importer;
// NOTE(review): source lines 5138-5139 are contiguous, so the leafCount() > 0 branch
// is genuinely empty here. In that case `importer` stays null and is dereferenced by
// importer->import(...) below — confirm whether a distributed importer belongs here
// or the branch should raise an error.
5138  if (leaf_aggregator_.leafCount() > 0) {
5139  } else {
5140  importer = import_export::create_importer(cat, td, copy_from_source, copy_params);
5141  }
5142  auto ms = measure<>::execution([&]() { importer->import(session_ptr.get()); });
5143  LOG(INFO) << "Total Import Time: " << (double)ms / 1000.0 << " Seconds.";
5144  } catch (const TDBException& e) {
5145  throw;
5146  } catch (const std::exception& e) {
5147  THROW_DB_EXCEPTION(std::string(e.what()));
5148  }
5149 }
5150 
5151 namespace {
5152 
5153 // helper functions for error checking below
5154 // these would usefully be added as methods of TDatumType
5155 // but that's not possible as it's auto-generated by Thrift
5156 
// NOTE(review): this helper's signature (source line 5157) is missing from the
// listing. Its body returns true exactly for the TDatumType values POLYGON,
// MULTIPOLYGON, LINESTRING and POINT.
5158  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
5159  t == TDatumType::LINESTRING || t == TDatumType::POINT);
5160 }
5161 
// NOTE(review): this helper's signature (source line 5162) is missing from the
// listing. Its body renders `t` to a string by streaming it into a std::stringstream
// (this relies on an operator<< being available for t's type).
5163  std::stringstream ss;
5164  ss << t;
5165  return ss.str();
5166 }
5167 
5168 } // namespace
5169 
// Raises (via THROW_DB_EXCEPTION) a "column attribute mismatch" error when appending
// a geo/raster file to an existing table. The expansion reads `file_path`,
// `table_name` and `cd` (pointer-like, with a `columnName` member) from the
// expansion site, so those names must be in scope there. `attr`, `got` and
// `expected` are concatenated into a std::string. The expansion already ends with
// ';'.
5170 #define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
5171  THROW_DB_EXCEPTION("Could not append geo/raster file '" + \
5172  file_path.filename().string() + "' to table '" + table_name + \
5173  "'. Column '" + cd->columnName + "' " + attr + " mismatch (got '" + \
5174  got + "', expected '" + expected + "')");
5175 
// Thrift endpoint for importing a geo/raster file: converts the Thrift copy params
// with thrift_to_copyparams and forwards all arguments to the shared
// implementation.
// NOTE(review): source line 5185 (the callee's name) is missing from this listing;
// per the comment in importGeoTableGlobFilterSort ("called by the above direct
// Thrift endpoint"), it is presumably importGeoTableGlobFilterSort — confirm.
5176 void DBHandler::import_geo_table(const TSessionId& session,
5177  const std::string& table_name,
5178  const std::string& file_name,
5179  const TCopyParams& cp,
5180  const TRowDescriptor& row_desc,
5181  const TCreateParams& create_params) {
5182  // this is the direct Thrift endpoint
5183  // it does NOT support the separate FSI regex/filter/sort options
5184  // but it DOES support basic globbing specified in the filename itself
5186  session, table_name, file_name, thrift_to_copyparams(cp), row_desc, create_params);
5187 }
5188 
// Expands `file_name` into a list of local files using the copy params' regex
// filter and sort options (shared::local_glob_filter_sort_files), then imports each
// file in turn. If no local file matches (shared::FileNotFoundException), the
// original name is used as-is, since it may refer to a remote object. Other
// exceptions from the expansion propagate.
5189 void DBHandler::importGeoTableGlobFilterSort(const TSessionId& session,
5190  const std::string& table_name,
5191  const std::string& file_name,
5192  const import_export::CopyParams& copy_params,
5193  const TRowDescriptor& row_desc,
5194  const TCreateParams& create_params) {
5195  // this is called by the above direct Thrift endpoint
5196  // and also for a deferred COPY FROM for geo/raster
5197  // it DOES support the full FSI regex/filter/sort options
5198  std::vector<std::string> file_names;
5199  try {
5200  const shared::FilePathOptions options{copy_params.regex_path_filter,
5201  copy_params.file_sort_order_by,
5202  copy_params.file_sort_regex};
// NOTE(review): source line 5203 is missing from this listing.
5204  file_names = shared::local_glob_filter_sort_files(file_name, options, false);
5205  } catch (const shared::FileNotFoundException& e) {
5206  // no files match, just try the original filename, might be remote
5207  file_names.push_back(file_name);
5208  }
5209  // import whatever we found
// NOTE(review): the loop variable shadows the `file_name` parameter. Source line 5211
// (the per-file import call's name) is missing; presumably importGeoTableSingle,
// whose parameter list matches the arguments below — confirm.
5210  for (auto const& file_name : file_names) {
5212  session, table_name, file_name, copy_params, row_desc, create_params);
5213  }
5214 }
5215 
5216 void DBHandler::importGeoTableSingle(const TSessionId& session,
5217  const std::string& table_name,
5218  const std::string& file_name_in,
5219  const import_export::CopyParams& copy_params,
5220  const TRowDescriptor& row_desc,
5221  const TCreateParams& create_params) {
5222  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
5223  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5224  auto session_ptr = stdlog.getConstSessionInfo();
5225  check_read_only("import_table");
5226 
5227  auto& cat = session_ptr->getCatalog();
5229  auto start_time = ::toString(std::chrono::system_clock::now());
5231  executor->enrollQuerySession(session,
5232  "IMPORT_GEO_TABLE",
5233  start_time,
5235  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5236  }
5237 
5238  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
5239  // reset the runtime query interrupt status
5241  executor->clearQuerySessionStatus(session, start_time);
5242  }
5243  };
5244 
5245  std::string file_name{file_name_in};
5246 
5247  if (path_is_relative(file_name)) {
5248  // assume relative paths are relative to data_path / import / <session>
5249  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5250  boost::filesystem::path(file_name).filename();
5251  file_name = file_path.string();
5252  }
5254 
5255  bool is_raster = false;
5256  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
5257  if (is_a_supported_archive_file(file_name)) {
5258  // find the archive file
5259  add_vsi_network_prefix(file_name);
5260  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
5261  THROW_DB_EXCEPTION("Archive does not exist: " + file_name_in);
5262  }
5263  // find geo file in archive
5264  add_vsi_archive_prefix(file_name);
5265  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
5266  // prepare to load that geo file
5267  if (geo_file.size()) {
5268  file_name = file_name + std::string("/") + geo_file;
5269  }
5270  } else {
5271  // prepare to load geo file directly
5272  add_vsi_network_prefix(file_name);
5273  add_vsi_geo_prefix(file_name);
5274  }
5275  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
5276  // prepare to load geo raster file directly
5277  add_vsi_network_prefix(file_name);
5278  add_vsi_geo_prefix(file_name);
5279  is_raster = true;
5280  } else {
5281  THROW_DB_EXCEPTION("import_geo_table called with file_type other than GEO or RASTER");
5282  }
5283 
5284  // log what we're about to try to do
5285  VLOG(1) << "import_geo_table: Original filename: " << file_name_in;
5286  VLOG(1) << "import_geo_table: Actual filename: " << file_name;
5287  VLOG(1) << "import_geo_table: Raster: " << is_raster;
5288 
5289  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
5290  auto file_path = boost::filesystem::path(file_name);
5291  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
5292  THROW_DB_EXCEPTION("File does not exist: " + file_path.filename().string());
5293  }
5294 
5295  // use GDAL to check any dependent files exist (ditto)
5296  try {
5297  check_geospatial_files(file_path, copy_params);
5298  } catch (const std::exception& e) {
5299  THROW_DB_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5300  }
5301 
5302  // get layer info and deconstruct
5303  // in general, we will get a combination of layers of these four types:
5304  // EMPTY: no rows, report and skip
5305  // GEO: create a geo table from this
5306  // NON_GEO: create a regular table from this
5307  // UNSUPPORTED_GEO: report and skip
5308  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
5309  if (!is_raster) {
5310  try {
5311  layer_info =
5312  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
5313  } catch (const std::exception& e) {
5314  THROW_DB_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5315  }
5316  }
5317 
5318  // categorize the results
5319  using LayerNameToContentsMap =
5320  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
5321  LayerNameToContentsMap load_layers;
5322  LOG_IF(INFO, layer_info.size() > 0)
5323  << "import_geo_table: Found the following layers in the geo file:";
5324  for (const auto& layer : layer_info) {
5325  switch (layer.contents) {
5327  LOG(INFO) << "import_geo_table: '" << layer.name
5328  << "' (will import as geo table)";
5329  load_layers[layer.name] = layer.contents;
5330  break;
5332  LOG(INFO) << "import_geo_table: '" << layer.name
5333  << "' (will import as regular table)";
5334  load_layers[layer.name] = layer.contents;
5335  break;
5337  LOG(WARNING) << "import_geo_table: '" << layer.name
5338  << "' (will not import, unsupported geo type)";
5339  break;
5341  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
5342  break;
5343  default:
5344  break;
5345  }
5346  }
5347 
5348  // if nothing is loadable, stop now
5349  if (!is_raster && load_layers.size() == 0) {
5350  THROW_DB_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
5351  }
5352 
5353  // if we've been given an explicit layer name, check that it exists and is loadable
5354  // scan the original list, as it may exist but not have been gathered as loadable
5355  if (!is_raster && copy_params.geo_layer_name.size()) {
5356  bool found = false;
5357  for (const auto& layer : layer_info) {
5358  if (copy_params.geo_layer_name == layer.name) {
5361  // forget all the other layers and just load this one
5362  load_layers.clear();
5363  load_layers[layer.name] = layer.contents;
5364  found = true;
5365  break;
5366  } else if (layer.contents ==
5368  THROW_DB_EXCEPTION("import_geo_table: Explicit geo layer '" +
5369  copy_params.geo_layer_name + "' has unsupported geo type!");
5370  } else if (layer.contents ==
5372  THROW_DB_EXCEPTION("import_geo_table: Explicit geo layer '" +
5373  copy_params.geo_layer_name + "' is empty!");
5374  }
5375  }
5376  }
5377  if (!found) {
5378  THROW_DB_EXCEPTION("import_geo_table: Explicit geo layer '" +
5379  copy_params.geo_layer_name + "' not found!");
5380  }
5381  }
5382 
5383  // Immerse import of multiple layers is not yet supported
5384  // @TODO fix this!
5385  if (!is_raster && row_desc.size() > 0 && load_layers.size() > 1) {
5387  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
5388  }
5389 
5390  // one definition of layer table name construction
5391  // we append the layer name if we're loading more than one table
5392  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
5393  const std::string& layer_name) {
5394  if (load_layers.size() > 1) {
5395  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
5396  if (sanitized_layer_name != layer_name) {
5397  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
5398  << sanitized_layer_name << "' for table name";
5399  }
5400  return table_name + "_" + sanitized_layer_name;
5401  }
5402  return table_name;
5403  };
5404 
5405  // if we're importing multiple tables, then NONE of them must exist already
5406  if (!is_raster && load_layers.size() > 1) {
5407  for (const auto& layer : load_layers) {
5408  // construct table name
5409  auto this_table_name = construct_layer_table_name(table_name, layer.first);
5410 
5411  // table must not exist
5412  if (cat.getMetadataForTable(this_table_name)) {
5413  THROW_DB_EXCEPTION("import_geo_table: Table '" + this_table_name +
5414  "' already exists, aborting!");
5415  }
5416  }
5417  }
5418 
5419  // prepare to gather errors that would otherwise be exceptions, as we can only throw
5420  // one
5421  std::vector<std::string> caught_exception_messages;
5422 
5423  // prepare to time multi-layer import
5424  double total_import_ms = 0.0;
5425 
5426  // for geo raster, we make a single dummy layer
5427  // the name is irrelevant, but set it to the filename so the log makes sense
5428  if (is_raster) {
5429  CHECK_EQ(load_layers.size(), 0u);
5430  load_layers.emplace(file_name, import_export::Importer::GeoFileLayerContents::GEO);
5431  }
5432 
5433  // now we're safe to start importing
5434  // we loop over the layers we're going to attempt to load
5435  for (const auto& layer : load_layers) {
5436  // unpack
5437  const auto& layer_name = layer.first;
5438  const auto& layer_contents = layer.second;
5439  bool is_geo_layer =
5441 
5442  // construct table name again
5443  auto this_table_name = construct_layer_table_name(table_name, layer_name);
5444 
5445  // report
5446  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
5447 
5448  // we need a row descriptor
5449  TRowDescriptor rd;
5450  if (row_desc.size() > 0) {
5451  // we have a valid RowDescriptor
5452  // this is the case where Immerse has already detected and created
5453  // all we need to do is import and trust that the data will match
5454  // use the provided row descriptor
5455  // table must already exist (we check this below)
5456  rd = row_desc;
5457  } else {
5458  // we don't have a RowDescriptor
5459  // we have to detect the file ourselves
5460  TDetectResult cds;
5461  TCopyParams cp_copy = copyparams_to_thrift(copy_params);
5462  cp_copy.geo_layer_name = layer_name;
5463  try {
5464  detect_column_types(cds, session, file_name_in, cp_copy);
5465  } catch (const std::exception& e) {
5466  // capture the error and abort this layer
5467  caught_exception_messages.emplace_back("Column Type Detection failed for '" +
5468  layer_name + "':" + e.what());
5469  continue;
5470  }
5471  rd = cds.row_set.row_desc;
5472 
5473  // then, if the table does NOT already exist, create it
5474  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
5475  if (!td) {
5476  try {
5477  create_table(session, this_table_name, rd, create_params);
5478  } catch (const std::exception& e) {
5479  // capture the error and abort this layer
5480  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
5481  layer_name + "':" + e.what());
5482  continue;
5483  }
5484  }
5485  }
5486 
5487  // match locking sequence for CopyTableStmt::execute
5491 
5492  const TableDescriptor* td{nullptr};
5493  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>> td_with_lock;
5494  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5495 
5496  try {
5497  td_with_lock =
5498  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
5500  lockmgr::ReadLock>::acquireTableDescriptor(cat, this_table_name));
5501  td = (*td_with_lock)();
5502  insert_data_lock = std::make_unique<lockmgr::WriteLock>(