OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DBHandler.h"
24 #include "DistributedLoader.h"
25 #include "TokenCompletionHints.h"
26 
27 #ifdef HAVE_PROFILER
28 #include <gperftools/heap-profiler.h>
29 #endif // HAVE_PROFILER
30 
31 #include "MapDRelease.h"
32 
33 #include "Calcite/Calcite.h"
34 #include "gen-cpp/CalciteServer.h"
35 
38 
39 #include "Catalog/Catalog.h"
44 #include "DistributedHandler.h"
46 #include "Geospatial/ColumnNames.h"
47 #include "Geospatial/Compression.h"
48 #include "Geospatial/GDAL.h"
49 #include "Geospatial/Types.h"
50 #include "ImportExport/Importer.h"
51 #include "LockMgr/LockMgr.h"
53 #include "Parser/ParserWrapper.h"
57 #include "QueryEngine/Execute.h"
67 #include "RequestInfo.h"
68 #include "Shared/ArrowUtil.h"
69 #include "Shared/DateTimeParser.h"
70 #include "Shared/StringTransform.h"
71 #include "Shared/SysDefinitions.h"
72 #include "Shared/file_path_util.h"
74 #include "Shared/import_helpers.h"
75 #include "Shared/measure.h"
76 #include "Shared/misc.h"
77 #include "Shared/scope.h"
79 
80 #ifdef HAVE_AWS_S3
81 #include <aws/core/auth/AWSCredentialsProviderChain.h>
82 #endif
83 #include <fcntl.h>
84 #include <picosha2.h>
85 #include <sys/types.h>
86 #include <algorithm>
87 #include <boost/algorithm/string.hpp>
88 #include <boost/filesystem.hpp>
89 #include <boost/make_shared.hpp>
90 #include <boost/process/search_path.hpp>
91 #include <boost/program_options.hpp>
92 #include <boost/tokenizer.hpp>
93 #include <chrono>
94 #include <cmath>
95 #include <csignal>
96 #include <fstream>
97 #include <future>
98 #include <map>
99 #include <memory>
100 #include <random>
101 #include <string>
102 #include <thread>
103 #include <typeinfo>
104 
105 #include <arrow/api.h>
106 #include <arrow/io/api.h>
107 #include <arrow/ipc/api.h>
108 
109 #include "Shared/ArrowUtil.h"
110 #include "Shared/distributed.h"
112 
113 #ifdef ENABLE_IMPORT_PARQUET
114 extern bool g_enable_parquet_import_fsi;
115 #endif
116 
117 #ifdef HAVE_AWS_S3
118 extern bool g_allow_s3_server_privileges;
119 #endif
120 
121 extern bool g_enable_system_tables;
124 
127 
// Sentinel used where a TSessionId is required but no real session exists.
#define INVALID_SESSION_ID ""

// Tag the current thread's log output with a request id. When uniform ids per
// Thrift call are enabled and a parent id was supplied, reuse the parent's id;
// otherwise mint a fresh id (C++17 if-with-initializer in the else-if) and,
// when a parent exists, log the parent/child relationship.
#define SET_REQUEST_ID(parent_request_id) \
 if (g_uniform_request_ids_per_thrift_call && parent_request_id) \
 logger::set_request_id(parent_request_id); \
 else if (logger::set_new_request_id(); parent_request_id) \
 LOG(INFO) << "This request has parent request_id(" << parent_request_id << ')'

// Build a Thrift TDBException carrying errstr, log it server-side at ERROR,
// and throw it so the Thrift layer reports it to the client.
#define THROW_DB_EXCEPTION(errstr) \
 { \
 TDBException ex; \
 ex.error_msg = errstr; \
 LOG(ERROR) << ex.error_msg; \
 throw ex; \
 }
143 
144 thread_local std::string TrackingProcessor::client_address;
146 
147 namespace {
148 
// NOTE(review): the listing elides this helper's first line — presumably a
// file-local function taking a Catalog (`cat`) plus the parameters below.
// Looks up dashboard metadata keyed by the owner's user id (stored as a
// string) and the dashboard name.
 const int32_t user_id,
 const std::string& dashboard_name) {
  return (cat.getMetadataForDashboard(std::to_string(user_id), dashboard_name));
}
154 
// Thrown to forcibly terminate a client connection; the message carries the
// reason. Marked explicit (single-argument constructor) so a std::string can
// never be silently converted into a disconnect exception.
struct ForceDisconnect : public std::runtime_error {
  explicit ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
};
158 
159 } // namespace
160 
161 #ifdef ENABLE_GEOS
162 extern std::unique_ptr<std::string> g_libgeos_so_filename;
163 #endif
164 
// Construct the main Thrift service handler. Nearly all arguments are
// server-wide configuration captured verbatim into members; the heavyweight
// bring-up (DataMgr, Calcite, catalogs, renderer) happens in initialize(),
// and the session store is (re)created by resetSessionsStore().
DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
                     const std::vector<LeafHostInfo>& string_leaves,
                     const std::string& base_data_path,
                     const bool allow_multifrag,
                     const bool jit_debug,
                     const bool intel_jit_profile,
                     const bool read_only,
                     const bool allow_loop_joins,
                     const bool enable_rendering,
                     const bool renderer_use_ppll_polys,
                     const bool renderer_prefer_igpu,
                     const unsigned renderer_vulkan_timeout_ms,
                     const bool renderer_use_parallel_executors,
                     const bool enable_auto_clear_render_mem,
                     const int render_oom_retry_threshold,
                     const size_t render_mem_bytes,
                     const size_t max_concurrent_render_sessions,
                     const size_t reserved_gpu_mem,
                     const bool render_compositor_use_last_gpu,
                     const size_t num_reader_threads,
                     const AuthMetadata& authMetadata,
                     SystemParameters& system_parameters,
                     const bool legacy_syntax,
                     const int idle_session_duration,
                     const int max_session_duration,
                     const std::string& udf_filename,
                     const std::string& clang_path,
                     const std::vector<std::string>& clang_options,
#ifdef ENABLE_GEOS
                     const std::string& libgeos_so_filename,
#endif
                     const File_Namespace::DiskCacheConfig& disk_cache_config,
                     const bool is_new_db)
    : leaf_aggregator_(db_leaves)
    , db_leaves_(db_leaves)
    , string_leaves_(string_leaves)
    , base_data_path_(base_data_path)
    , random_gen_(std::random_device{}())  // nondeterministic seed for session ids
    , session_id_dist_(0, INT32_MAX)
    , jit_debug_(jit_debug)
    , intel_jit_profile_(intel_jit_profile)
    , allow_multifrag_(allow_multifrag)
    , read_only_(read_only)
    , allow_loop_joins_(allow_loop_joins)
    , authMetadata_(authMetadata)
    , system_parameters_(system_parameters)
    , legacy_syntax_(legacy_syntax)
    , dispatch_queue_(
          std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
    , super_user_rights_(false)
    , idle_session_duration_(idle_session_duration * 60)  // minutes -> seconds
    , max_session_duration_(max_session_duration * 60)    // minutes -> seconds
    , enable_rendering_(enable_rendering)
    , renderer_use_ppll_polys_(renderer_use_ppll_polys)
    , renderer_prefer_igpu_(renderer_prefer_igpu)
    , renderer_vulkan_timeout_(renderer_vulkan_timeout_ms)
    , renderer_use_parallel_executors_(renderer_use_parallel_executors)
    , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
    , render_oom_retry_threshold_(render_oom_retry_threshold)
    , max_concurrent_render_sessions_(max_concurrent_render_sessions)
    , reserved_gpu_mem_(reserved_gpu_mem)
    , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
    , render_mem_bytes_(render_mem_bytes)
    , num_reader_threads_(num_reader_threads)
#ifdef ENABLE_GEOS
    , libgeos_so_filename_(libgeos_so_filename)
#endif
    , disk_cache_config_(disk_cache_config)
    , udf_filename_(udf_filename)
    , clang_path_(clang_path)
    , clang_options_(clang_options)
    , max_num_sessions_(-1) {  // -1: no session-count cap configured yet
  LOG(INFO) << "HeavyDB Server " << MAPD_RELEASE;
  initialize(is_new_db);
  resetSessionsStore();
}
241 
  // NOTE(review): the listing elides this function's signature — presumably
  // void DBHandler::resetSessionsStore() — and most of the store re-creation
  // call whose trailing arguments appear below.
  if (sessions_store_) {
    // Disconnect any existing sessions.
    auto sessions = sessions_store_->getAllSessions();
    for (auto session : sessions) {
      sessions_store_->disconnect(session->get_session_id());
    }
  }
  // (elided: sessions-store factory call; `1` and the eviction callback below
  // are its trailing arguments — the callback runs disconnect_impl() when a
  // session is removed from the store)
  1,
  [this](auto& session_ptr) { disconnect_impl(session_ptr); });
}
258 
// One-time server bring-up: device-mode selection, DataMgr, optional UDF
// compilation, Calcite, extension/table functions, system catalog, optional
// renderer, and optional GEOS override. A failure in any mandatory subsystem
// is fatal (LOG(FATAL) aborts the process).
// NOTE(review): the doxygen listing this file was recovered from elides a
// number of lines (mostly argument lists); comments below flag those spots.
void DBHandler::initialize(const bool is_new_db) {
  // One-shot guard: a second call only reports (log-call start elided) and
  // returns without re-initializing anything.
  if (!initialized_) {
    initialized_ = true;
  } else {
    "Server already initialized; service restart required to activate any new "
    "entitlements.");
    return;
  }

  // (elided: opening of the requested-device-type if/else)
  cpu_mode_only_ = true;
  } else {
#ifdef HAVE_CUDA
  cpu_mode_only_ = false;
#else
  // GPU mode was requested but this binary has no CUDA support.
  LOG(WARNING) << "This build isn't CUDA enabled, will run on CPU";
  cpu_mode_only_ = true;
#endif
  }

  // Rendering requires at least one GPU.
  bool is_rendering_enabled = enable_rendering_;
  if (system_parameters_.num_gpus == 0) {
    is_rendering_enabled = false;
  }

  const auto data_path =
      boost::filesystem::path(base_data_path_) / shared::kDataDirectoryName;
  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
  // manage cannot ask for
  size_t total_reserved = reserved_gpu_mem_;
  if (is_rendering_enabled) {
    total_reserved += render_mem_bytes_;
  }

  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
#ifdef HAVE_CUDA
  if (!cpu_mode_only_ || is_rendering_enabled) {
    try {
      // (elided: CudaMgr constructor arguments)
      cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
    } catch (const std::exception& e) {
      // CUDA probing failed at runtime: degrade to CPU-only and disable
      // rendering rather than aborting startup.
      LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
                 << e.what();
      cpu_mode_only_ = true;
      is_rendering_enabled = false;
    }
  }
#endif  // HAVE_CUDA

  try {
    // (elided: remaining DataMgr constructor arguments)
    data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
                                                std::move(cuda_mgr),
                                                total_reserved,
  } catch (const std::exception& e) {
    LOG(FATAL) << "Failed to initialize data manager: " << e.what();
  }

  std::string udf_ast_filename("");

  // Compile configured user-defined functions to LLVM IR for CPU and (when a
  // CudaMgr exists) GPU, and register the IR with the Executor.
  try {
    if (!udf_filename_.empty()) {
      const auto cuda_mgr = data_mgr_->getCudaMgr();
      const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
          cuda_mgr ? cuda_mgr->getDeviceArch()
      // (elided: fallback architecture used in the CPU-only case)
      UdfCompiler compiler(device_arch, clang_path_, clang_options_);

      const auto [cpu_udf_ir_file, cuda_udf_ir_file] = compiler.compileUdf(udf_filename_);
      Executor::addUdfIrToModule(cpu_udf_ir_file, /*is_cuda_ir=*/false);
      if (!cuda_udf_ir_file.empty()) {
        Executor::addUdfIrToModule(cuda_udf_ir_file, /*is_cuda_ir=*/true);
      }
      udf_ast_filename = compiler.getAstFileName(udf_filename_);
    }
  } catch (const std::exception& e) {
    LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
  }

  try {
    calcite_ =
        std::make_shared<Calcite>(system_parameters_, base_data_path_, udf_ast_filename);
  } catch (const std::exception& e) {
    LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
  }

  // Make Calcite's extension-function whitelist (plus any UDFs) visible to
  // the local execution engine.
  try {
    ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
    if (!udf_filename_.empty()) {
      ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
    }
  } catch (const std::exception& e) {
    LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
  }

  try {
    // (elided: table-functions factory initialization call)
  } catch (const std::exception& e) {
    LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
  }

  // Register compile-time table functions with Calcite; no runtime UDFs yet.
  try {
    auto udtfs = ThriftSerializers::to_thrift(
    // (elided: argument to to_thrift)
    std::vector<TUserDefinedFunction> udfs = {};
    calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
  } catch (const std::exception& e) {
    LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
  }

  // Final device sanity check: configured for GPU but none actually present.
  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
    LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
    cpu_mode_only_ = true;
  }

  LOG(INFO) << "Started in " << executor_device_type_ << " mode.";

  try {
    // (elided: some SysCatalog::init arguments)
    SysCatalog::instance().init(base_data_path_,
                                data_mgr_,
                                calcite_,
                                is_new_db,
                                !db_leaves_.empty(),
  } catch (const std::exception& e) {
    LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
  }

  import_path_ = boost::filesystem::path(base_data_path_) / shared::kDefaultImportDirName;
  start_time_ = std::time(nullptr);

  // Backend rendering is optional: a failure here only disables the feature.
  if (is_rendering_enabled) {
    try {
      // (elided: most RenderHandler constructor arguments)
      render_handler_.reset(new RenderHandler(this,
                                              false,
                                              0,
                                              false,
    } catch (const std::exception& e) {
      LOG(ERROR) << "Backend rendering disabled: " << e.what();
    }
  }

#ifdef ENABLE_GEOS
  // Allow a user-supplied libgeos shared object to replace the default.
  if (!libgeos_so_filename_.empty()) {
    g_libgeos_so_filename.reset(new std::string(libgeos_so_filename_));
    LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
  }
#endif
}
428 
  // NOTE(review): elided signature above — presumably DBHandler::~DBHandler().
  // All teardown is delegated to shutdown().
  shutdown();
}
432 
433 void DBHandler::check_read_only(const std::string& str) {
434  if (DBHandler::read_only_) {
435  THROW_DB_EXCEPTION(str + " disabled: server running in read-only mode.");
436  }
437 }
438 
// NOTE(review): elided signature above — presumably
// std::string DBHandler::createInMemoryCalciteSession(...) taking the
// catalog pointer below and returning the new proxy session id.
 const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
  // We would create an in memory session for calcite with super user privileges which
  // would be used for getting all tables metadata when a user runs the query. The
  // session would be under the name of a proxy user/password which would only persist
  // till server's lifetime or execution of calcite query(in memory) whichever is the
  // earliest.
  std::string session_id;
  do {
    // (elided: random session-id generation)
  } while (calcite_sessions_.find(session_id) != calcite_sessions_.end());
  // Proxy super-user identity supplied by Calcite; the -1 ids are placeholders
  // for a user/default-db that exist only in memory.
  Catalog_Namespace::UserMetadata user_meta(-1,
                                            calcite_->getInternalSessionProxyUserName(),
                                            calcite_->getInternalSessionProxyPassword(),
                                            true,
                                            -1,
                                            true,
                                            false);
  const auto emplace_ret = calcite_sessions_.emplace(
      session_id,
      std::make_shared<Catalog_Namespace::SessionInfo>(
          catalog_ptr, user_meta, executor_device_type_, session_id));
  CHECK(emplace_ret.second);  // the id was verified unique above
  return session_id;
}
465 
// Drop the given in-memory Calcite proxy session; CHECK-fails if the id is
// unknown. NOTE(review): the listing elides one line here — presumably the
// lock guarding calcite_sessions_; confirm against the repository.
void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
  // Remove InMemory calcite Session.
  CHECK(calcite_sessions_.erase(session_id)) << session_id;
}
471 
472 // internal connection for connections with no password
// internal connection for connections with no password
// Resolves user and database via SysCatalog::login() with an empty password
// and password-check disabled, verifies database-access privilege, then
// defers session creation to connect_impl().
// NOTE(review): the listing elides two lines here — likely the request-id
// setup and the Catalog_Namespace::UserMetadata user_meta declaration.
void DBHandler::internal_connect(TSessionId& session_id,
                                 const std::string& username,
                                 const std::string& dbname) {
  auto stdlog = STDLOG();  // session_id set by connect_impl()
  std::string username2 = username;  // login() may reset username given as argument
  std::string dbname2 = dbname;      // login() may reset dbname given as argument
  std::shared_ptr<Catalog> cat = nullptr;
  try {
    cat =
        SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
  } catch (std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }

  // Even an internal user must hold ACCESS on the target database.
  DBObject dbObject(dbname2, DatabaseDBObjectType);
  dbObject.loadKey(*cat);
  std::vector<DBObject> dbObjects;
  dbObjects.push_back(dbObject);
  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
    THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
                       " is not allowed to access database " + dbname2 + ".");
  }
  connect_impl(session_id, std::string(), dbname2, user_meta, cat, stdlog);
}
500 
  // NOTE(review): elided signature above — presumably
  // bool DBHandler::isAggregator() const: this node aggregates leaves.
  return leaf_aggregator_.leafCount() > 0;
}
504 
505 void DBHandler::krb5_connect(TKrb5Session& session,
506  const std::string& inputToken,
507  const std::string& dbname) {
508  THROW_DB_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
509 }
510 
// Thrift endpoint: authenticate `username`/`passwd` against `dbname`, verify
// database-access privilege, create a session via connect_impl(), and (for
// PKI auth) return the session id encrypted with the user's public key.
// NOTE(review): the listing elides a few lines here — likely the request-id
// setup and the Catalog_Namespace::UserMetadata user_meta declaration.
void DBHandler::connect(TSessionId& session_id,
                        const std::string& username,
                        const std::string& passwd,
                        const std::string& dbname) {
  auto stdlog = STDLOG();  // session_info set by connect_impl()
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  std::string username2 = username;  // login() may reset username given as argument
  std::string dbname2 = dbname;      // login() may reset dbname given as argument
  std::shared_ptr<Catalog> cat = nullptr;
  try {
    // Password checking is skipped when running with super-user rights.
    cat = SysCatalog::instance().login(
        dbname2, username2, passwd, user_meta, !super_user_rights_);
  } catch (std::exception& e) {
    stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
    THROW_DB_EXCEPTION(e.what());
  }

  // Authentication succeeded; now authorize access to the database itself.
  DBObject dbObject(dbname2, DatabaseDBObjectType);
  dbObject.loadKey(*cat);
  std::vector<DBObject> dbObjects;
  dbObjects.push_back(dbObject);
  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
    stdlog.appendNameValuePairs(
        "user", username, "db", dbname, "exception", "Missing Privileges");
    THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
                       " is not allowed to access database " + dbname2 + ".");
  }
  connect_impl(session_id, passwd, dbname2, user_meta, cat, stdlog);

  // if pki auth session_id will come back encrypted with user pubkey
  SysCatalog::instance().check_for_session_encryption(passwd, session_id);
}
546 
547 void DBHandler::connect_impl(TSessionId& session_id,
548  const std::string& passwd,
549  const std::string& dbname,
550  const Catalog_Namespace::UserMetadata& user_meta,
551  std::shared_ptr<Catalog> cat,
552  query_state::StdLog& stdlog) {
553  // TODO(sy): Is there any reason to have dbname as a parameter
554  // here when the cat parameter already provides cat->name()?
555  // Should dbname and cat->name() ever differ?
556  auto session_ptr = sessions_store_->add(user_meta, cat, executor_device_type_);
557  session_id = session_ptr->get_session_id();
558  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
559  stdlog.setSessionInfo(session_ptr);
560  session_ptr->set_connection_info(getConnectionInfo().toString());
561  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
562  // while doing warmup
563  }
564  auto const roles =
565  stdlog.getConstSessionInfo()->get_currentUser().isSuper
566  ? std::vector<std::string>{{"super"}}
567  : SysCatalog::instance().getRoles(
568  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
569  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
570 }
571 
572 void DBHandler::disconnect(const TSessionId& session_id_or_json) {
573  heavyai::RequestInfo const request_info(session_id_or_json);
574  SET_REQUEST_ID(request_info.requestId());
575  auto session_ptr = get_session_ptr(request_info.sessionId());
576  auto stdlog = STDLOG(session_ptr, "client", getConnectionInfo().toString());
577  sessions_store_->disconnect(request_info.sessionId());
578 }
579 
  // NOTE(review): elided signature above — presumably
  // void DBHandler::disconnect_impl(...): shared cleanup used both by
  // explicit disconnects and by session-store eviction.
  const auto session_id = session_ptr->get_session_id();
  // A leaf-disconnect failure is captured and rethrown only AFTER local
  // cleanup (render-group map, render handler) has run, so local state is
  // never left behind because a leaf was unreachable.
  std::exception_ptr leaf_exception = nullptr;
  try {
    if (leaf_aggregator_.leafCount() > 0) {
      leaf_aggregator_.disconnect(session_id);
    }
  } catch (...) {
    leaf_exception = std::current_exception();
  }

  {
    // Scoped lock: only the map erase happens under the mutex.
    std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
    render_group_assignment_map_.erase(session_id);
  }

  if (render_handler_) {
    render_handler_->disconnect(session_id);
  }

  if (leaf_exception) {
    std::rethrow_exception(leaf_exception);
  }
}
604 
605 void DBHandler::switch_database(const TSessionId& session_id_or_json,
606  const std::string& dbname) {
607  heavyai::RequestInfo const request_info(session_id_or_json);
608  SET_REQUEST_ID(request_info.requestId());
609  auto session_ptr = get_session_ptr(request_info.sessionId());
610  auto stdlog = STDLOG(session_ptr);
611  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
612  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
613  try {
614  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
615  dbname2, session_ptr->get_currentUser().userName);
616  session_ptr->set_catalog_ptr(cat);
617  if (leaf_aggregator_.leafCount() > 0) {
618  leaf_aggregator_.switch_database(request_info.sessionId(), dbname);
619  return;
620  }
621  } catch (std::exception& e) {
622  THROW_DB_EXCEPTION(e.what());
623  }
624 }
625 
626 void DBHandler::clone_session(TSessionId& session2_id,
627  const TSessionId& session1_id_or_json) {
628  heavyai::RequestInfo const request_info(session1_id_or_json);
629  SET_REQUEST_ID(request_info.requestId());
630  auto session1_ptr = get_session_ptr(request_info.sessionId());
631  auto stdlog = STDLOG(session1_ptr);
632  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
633 
634  try {
635  const Catalog_Namespace::UserMetadata& user_meta = session1_ptr->get_currentUser();
636  std::shared_ptr<Catalog> cat = session1_ptr->get_catalog_ptr();
637  auto session2_ptr = sessions_store_->add(user_meta, cat, executor_device_type_);
638  session2_id = session2_ptr->get_session_id();
639  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database "
640  << cat->name();
641  if (leaf_aggregator_.leafCount() > 0) {
642  leaf_aggregator_.clone_session(request_info.sessionId(), session2_id);
643  return;
644  }
645  } catch (std::exception& e) {
646  THROW_DB_EXCEPTION(e.what());
647  }
648 }
649 
// Thrift endpoint: interrupt the running query of `query_session` on behalf
// of `interrupt_session`. Only active when dynamic watchdog or runtime query
// interrupt is enabled.
// NOTE(review): the listing elides several lines below (the interrupt-flag
// expression, the Executor::getExecutor(...) call whose jit_debug_ arguments
// remain, and a session read-lock declaration); comments flag those spots.
void DBHandler::interrupt(const TSessionId& query_session_id_or_json,
                          const TSessionId& interrupt_session_id_or_json) {
  // if this is for distributed setting, query_session becomes a parent session (agg)
  // and the interrupt session is one of existing session in the leaf node (leaf)
  // so we can think there exists a logical mapping
  // between query_session (agg) and interrupt_session (leaf)
  heavyai::RequestInfo const query_request_info(query_session_id_or_json);
  heavyai::RequestInfo const interrupt_request_info(interrupt_session_id_or_json);
  SET_REQUEST_ID(interrupt_request_info.requestId());
  auto session_ptr = get_session_ptr(interrupt_request_info.sessionId());
  auto& cat = session_ptr->getCatalog();
  auto stdlog = STDLOG(session_ptr);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  // (elided: right-hand side of the interrupt-enabled flag)
  const auto allow_query_interrupt =
  if (g_enable_dynamic_watchdog || allow_query_interrupt) {
    const auto dbname = cat.getCurrentDB().dbName;
    // (elided: Executor::getExecutor(...) call taking the two args below)
    jit_debug_ ? "/tmp" : "",
    jit_debug_ ? "mapdquery" : "",
    CHECK(executor);

    // Distributed: fan the interrupt out to every leaf first.
    if (leaf_aggregator_.leafCount() > 0) {
      leaf_aggregator_.interrupt(query_request_info.sessionId(),
                                 interrupt_request_info.sessionId());
    }
    auto target_executor_ids =
        executor->getExecutorIdsRunningQuery(query_request_info.sessionId());
    if (target_executor_ids.empty()) {
      // No executor is actively running the query; interrupt via the default
      // executor only if the session is at least enrolled (e.g. queued).
      // (elided: session read-lock declaration)
      executor->getSessionLock());
      if (executor->checkIsQuerySessionEnrolled(query_request_info.sessionId(),
                                                session_read_lock)) {
        session_read_lock.unlock();
        VLOG(1) << "Received interrupt: "
                << "User " << session_ptr->get_currentUser().userLoggable()
                << ", Database " << dbname << std::endl;
        executor->interrupt(query_request_info.sessionId(),
                            interrupt_request_info.sessionId());
      }
    } else {
      // Interrupt every executor currently running the target query.
      for (auto& executor_id : target_executor_ids) {
        VLOG(1) << "Received interrupt: "
                << "Executor " << executor_id << ", User "
                << session_ptr->get_currentUser().userLoggable() << ", Database "
                << dbname << std::endl;
        auto target_executor = Executor::getExecutor(executor_id);
        target_executor->interrupt(query_request_info.sessionId(),
                                   interrupt_request_info.sessionId());
      }
    }

    LOG(INFO) << "User " << session_ptr->get_currentUser().userName
              << " interrupted session with database " << dbname << std::endl;
  }
}
707 
  // NOTE(review): elided signature above — presumably
  // TRole::type DBHandler::getServerRole() const. Classifies this node:
  // in a cluster it is an AGGREGATOR (has leaves) or a LEAF; standalone
  // deployments are plain SERVER.
  if (g_cluster) {
    if (leaf_aggregator_.leafCount() > 0) {
      return TRole::type::AGGREGATOR;
    }
    return TRole::type::LEAF;
  }
  return TRole::type::SERVER;
}
717 void DBHandler::get_server_status(TServerStatus& _return,
718  const TSessionId& session_id_or_json) {
719  heavyai::RequestInfo const request_info(session_id_or_json);
720  SET_REQUEST_ID(request_info.requestId());
721  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
722  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
723  const auto rendering_enabled = bool(render_handler_);
724  _return.read_only = read_only_;
725  _return.version = MAPD_RELEASE;
726  _return.rendering_enabled = rendering_enabled;
727  _return.start_time = start_time_;
728  _return.edition = MAPD_EDITION;
729  _return.host_name = heavyai::get_hostname();
730  _return.poly_rendering_enabled = rendering_enabled;
731  _return.role = getServerRole();
732  _return.renderer_status_json =
733  render_handler_ ? render_handler_->get_renderer_status_json() : "";
734 }
735 
736 void DBHandler::get_status(std::vector<TServerStatus>& _return,
737  const TSessionId& session_id_or_json) {
738  //
739  // get_status() is now called locally at startup on the aggregator
740  // in order to validate that all nodes of a cluster are running the
741  // same software version and the same renderer status
742  //
743  // In that context, it is called with the InvalidSessionID, and
744  // with the local super-user flag set.
745  //
746  // Hence, we allow this session-less mode only in distributed mode, and
747  // then on a leaf (always), or on the aggregator (only in super-user mode)
748  //
749  heavyai::RequestInfo const request_info(session_id_or_json);
750  SET_REQUEST_ID(request_info.requestId());
751  auto const allow_invalid_session = g_cluster && (!isAggregator() || super_user_rights_);
752 
753  if (!allow_invalid_session || request_info.sessionId() != getInvalidSessionId()) {
754  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
755  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
756  } else {
757  LOG(INFO) << "get_status() called in session-less mode";
758  }
759  const auto rendering_enabled = bool(render_handler_);
760  TServerStatus ret;
761  ret.read_only = read_only_;
762  ret.version = MAPD_RELEASE;
763  ret.rendering_enabled = rendering_enabled;
764  ret.start_time = start_time_;
765  ret.edition = MAPD_EDITION;
766  ret.host_name = heavyai::get_hostname();
767  ret.poly_rendering_enabled = rendering_enabled;
768  ret.role = getServerRole();
769  ret.renderer_status_json =
770  render_handler_ ? render_handler_->get_renderer_status_json() : "";
771 
772  _return.push_back(ret);
773  if (leaf_aggregator_.leafCount() > 0) {
774  std::vector<TServerStatus> leaf_status =
775  leaf_aggregator_.getLeafStatus(request_info.sessionId());
776  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
777  }
778 }
779 
780 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
781  const TSessionId& session_id_or_json) {
782  heavyai::RequestInfo const request_info(session_id_or_json);
783  SET_REQUEST_ID(request_info.requestId());
784  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
785  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
786  THardwareInfo ret;
787  const auto cuda_mgr = data_mgr_->getCudaMgr();
788  if (cuda_mgr) {
789  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
790  ret.start_gpu = cuda_mgr->getStartGpu();
791  if (ret.start_gpu >= 0) {
792  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
793  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
794  }
795  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
796  TGpuSpecification gpu_spec;
797  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
798  gpu_spec.num_sm = deviceProperties->numMPs;
799  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
800  gpu_spec.memory = deviceProperties->globalMem;
801  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
802  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
803  ret.gpu_info.push_back(gpu_spec);
804  }
805  }
806 
807  // start hardware/OS dependent code
808  ret.num_cpu_hw = std::thread::hardware_concurrency();
809  // ^ This might return diffrent results in case of hyper threading
810  // end hardware/OS dependent code
811 
812  _return.hardware_info.push_back(ret);
813 }
814 
815 void DBHandler::get_session_info(TSessionInfo& _return,
816  const TSessionId& session_id_or_json) {
817  heavyai::RequestInfo const request_info(session_id_or_json);
818  SET_REQUEST_ID(request_info.requestId());
819  auto session_ptr = get_session_ptr(request_info.sessionId());
820  CHECK(session_ptr);
821  auto stdlog = STDLOG(session_ptr);
822  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
823  auto user_metadata = session_ptr->get_currentUser();
824  _return.user = user_metadata.userName;
825  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
826  _return.start_time = session_ptr->get_start_time();
827  _return.is_super = user_metadata.isSuper;
828 }
829 
830 void DBHandler::set_leaf_info(const TSessionId& session, const TLeafInfo& info) {
831  g_distributed_leaf_idx = info.leaf_id;
832  g_distributed_num_leaves = info.num_leaves;
833 }
834 
// NOTE(review): elided signature line above — presumably
// void DBHandler::value_to_thrift_column(const TargetValue& tv, ...) taking
// the parameters below. Appends one TargetValue to a columnar Thrift result:
// the value goes into the matching column.data.* vector and a parallel
// null-flag is pushed onto column.nulls.
 const SQLTypeInfo& ti,
 TColumn& column) {
  if (ti.is_array()) {
    // Arrays: recursively convert each element into a nested TColumn.
    TColumn tColumn;
    const auto array_tv = boost::get<ArrayTargetValue>(&tv);
    CHECK(array_tv);
    bool is_null = !array_tv->is_initialized();
    if (!is_null) {
      const auto& vec = array_tv->get();
      for (const auto& elem_tv : vec) {
        value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
      }
    }
    column.data.arr_col.push_back(tColumn);
    column.nulls.push_back(is_null && !ti.get_notnull());
  } else if (ti.is_geometry()) {
    // Geometry arrives either as WKT-style text (scalar string) or as a
    // coordinate array of doubles.
    const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
    if (scalar_tv) {
      auto s_n = boost::get<NullableString>(scalar_tv);
      auto s = boost::get<std::string>(s_n);
      if (s) {
        column.data.str_col.push_back(*s);
      } else {
        column.data.str_col.emplace_back(""); // null string
        auto null_p = boost::get<void*>(s_n);
        CHECK(null_p && !*null_p);
      }
      column.nulls.push_back(!s && !ti.get_notnull());
    } else {
      const auto array_tv = boost::get<ArrayTargetValue>(&tv);
      CHECK(array_tv);
      bool is_null = !array_tv->is_initialized();
      if (!is_null) {
        // Coordinates are always doubles regardless of the geo subtype.
        auto elem_type = SQLTypeInfo(kDOUBLE, false);
        TColumn tColumn;
        const auto& vec = array_tv->get();
        for (const auto& elem_tv : vec) {
          value_to_thrift_column(elem_tv, elem_type, tColumn);
        }
        column.data.arr_col.push_back(tColumn);
        column.nulls.push_back(false);
      } else {
        // Null geometry: keep the arr_col vector aligned with an empty entry.
        TColumn tColumn;
        column.data.arr_col.push_back(tColumn);
        column.nulls.push_back(is_null && !ti.get_notnull());
      }
    }
  } else {
    CHECK(!ti.is_column());
    const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
    CHECK(scalar_tv);
    if (boost::get<int64_t>(scalar_tv)) {
      int64_t data = *(boost::get<int64_t>(scalar_tv));

      if (ti.is_decimal()) {
        // Decimals are stored as scaled integers; rescale into a double.
        double val = static_cast<double>(data);
        if (ti.get_scale() > 0) {
          val /= pow(10.0, std::abs(ti.get_scale()));
        }
        column.data.real_col.push_back(val);
      } else {
        column.data.int_col.push_back(data);
      }

      // Null detection depends on the type's integer null sentinel.
      switch (ti.get_type()) {
        case kBOOLEAN:
          column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
          break;
        case kTINYINT:
          column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
          break;
        case kSMALLINT:
          column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
          break;
        case kINT:
          column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
          break;
        case kNUMERIC:
        case kDECIMAL:
        case kBIGINT:
          column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
          break;
        case kTIME:
        case kTIMESTAMP:
        case kDATE:
        case kINTERVAL_DAY_TIME:
        // (elided: one more case label here — likely kINTERVAL_YEAR_MONTH)
          column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
          break;
        default:
          column.nulls.push_back(false);
      }
    } else if (boost::get<double>(scalar_tv)) {
      double data = *(boost::get<double>(scalar_tv));
      column.data.real_col.push_back(data);
      if (ti.get_type() == kFLOAT) {
        column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
      } else {
        column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
      }
    } else if (boost::get<float>(scalar_tv)) {
      CHECK_EQ(kFLOAT, ti.get_type());
      float data = *(boost::get<float>(scalar_tv));
      column.data.real_col.push_back(data);
      column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
    } else if (boost::get<NullableString>(scalar_tv)) {
      auto s_n = boost::get<NullableString>(scalar_tv);
      auto s = boost::get<std::string>(s_n);
      if (s) {
        column.data.str_col.push_back(*s);
      } else {
        column.data.str_col.emplace_back(""); // null string
        auto null_p = boost::get<void*>(s_n);
        CHECK(null_p && !*null_p);
      }
      column.nulls.push_back(!s && !ti.get_notnull());
    } else {
      CHECK(false);  // unhandled ScalarTargetValue alternative
    }
  }
}
957 
958 TDatum DBHandler::value_to_thrift(const TargetValue& tv, const SQLTypeInfo& ti) {
959  TDatum datum;
960  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
961  if (!scalar_tv) {
962  CHECK(ti.is_array());
963  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
964  CHECK(array_tv);
965  if (array_tv->is_initialized()) {
966  const auto& vec = array_tv->get();
967  for (const auto& elem_tv : vec) {
968  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
969  datum.val.arr_val.push_back(scalar_col_val);
970  }
971  // Datum is not null, at worst it's an empty array Datum
972  datum.is_null = false;
973  } else {
974  datum.is_null = true;
975  }
976  return datum;
977  }
978  if (boost::get<int64_t>(scalar_tv)) {
979  int64_t data = *(boost::get<int64_t>(scalar_tv));
980 
981  if (ti.is_decimal()) {
982  double val = static_cast<double>(data);
983  if (ti.get_scale() > 0) {
984  val /= pow(10.0, std::abs(ti.get_scale()));
985  }
986  datum.val.real_val = val;
987  } else {
988  datum.val.int_val = data;
989  }
990 
991  switch (ti.get_type()) {
992  case kBOOLEAN:
993  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
994  break;
995  case kTINYINT:
996  datum.is_null = (datum.val.int_val == NULL_TINYINT);
997  break;
998  case kSMALLINT:
999  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
1000  break;
1001  case kINT:
1002  datum.is_null = (datum.val.int_val == NULL_INT);
1003  break;
1004  case kDECIMAL:
1005  case kNUMERIC:
1006  case kBIGINT:
1007  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1008  break;
1009  case kTIME:
1010  case kTIMESTAMP:
1011  case kDATE:
1012  case kINTERVAL_DAY_TIME:
1013  case kINTERVAL_YEAR_MONTH:
1014  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1015  break;
1016  default:
1017  datum.is_null = false;
1018  }
1019  } else if (boost::get<double>(scalar_tv)) {
1020  datum.val.real_val = *(boost::get<double>(scalar_tv));
1021  if (ti.get_type() == kFLOAT) {
1022  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1023  } else {
1024  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
1025  }
1026  } else if (boost::get<float>(scalar_tv)) {
1027  CHECK_EQ(kFLOAT, ti.get_type());
1028  datum.val.real_val = *(boost::get<float>(scalar_tv));
1029  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1030  } else if (boost::get<NullableString>(scalar_tv)) {
1031  auto s_n = boost::get<NullableString>(scalar_tv);
1032  auto s = boost::get<std::string>(s_n);
1033  if (s) {
1034  datum.val.str_val = *s;
1035  } else {
1036  auto null_p = boost::get<void*>(s_n);
1037  CHECK(null_p && !*null_p);
1038  }
1039  datum.is_null = !s;
1040  } else {
1041  CHECK(false);
1042  }
1043  return datum;
1044 }
1045 
// NOTE(review): the signature line of this definition
// (DBHandler::sql_execute_local) was dropped by the doc extraction, as were
// the case labels of the switch below and the executor-invocation lines at
// the bottom — verify against upstream before editing.
    TQueryResult& _return,
    const QueryStateProxy& query_state_proxy,
    const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
    const std::string& query_str,
    const bool column_format,
    const std::string& nonce,
    const int32_t first_n,
    const int32_t at_most_n,
    const bool use_calcite) {
  // Reset the result envelope before executing.
  _return.total_time_ms = 0;
  _return.nonce = nonce;
  // Classify the statement so clients can distinguish reads from writes.
  ParserWrapper pw{query_str};
  switch (pw.getQueryType()) {
    // NOTE(review): case labels missing from this span; the bodies tag the
    // result as READ / WRITE / SCHEMA_READ / SCHEMA_WRITE respectively.
      _return.query_type = TQueryType::READ;
      VLOG(1) << "query type: READ";
      break;
    }
      _return.query_type = TQueryType::WRITE;
      VLOG(1) << "query type: WRITE";
      break;
    }
      _return.query_type = TQueryType::SCHEMA_READ;
      VLOG(1) << "query type: SCHEMA READ";
      break;
    }
      _return.query_type = TQueryType::SCHEMA_WRITE;
      VLOG(1) << "query type: SCHEMA WRITE";
      break;
    }
    default: {
      _return.query_type = TQueryType::UNKNOWN;
      LOG(WARNING) << "query type: UNKNOWN";
      break;
    }
  }

  // Execute and convert inside one timed scope; the elapsed time is
  // accumulated into total_time_ms.
  // NOTE(review): the lines declaring the locks/result and naming the
  // execution and conversion calls are missing from this span.
  _return.total_time_ms += measure<>::execution([&]() {
        query_state_proxy,
        column_format,
        session_ptr->get_executor_device_type(),
        first_n,
        at_most_n,
        use_calcite,
        locks);
        _return, result, query_state_proxy, column_format, first_n, at_most_n);
  });
}
1102 
// Folds an execution result into the Thrift TQueryResult envelope, dispatching
// on the result type (row set / simple result / explain / pushdown info).
void DBHandler::convertData(TQueryResult& _return,
                            // NOTE(review): the parameter line declaring the
                            // ExecutionResult argument (named `result` below)
                            // is missing from this span.
                            const QueryStateProxy& query_state_proxy,
                            const bool column_format,
                            const int32_t first_n,
                            const int32_t at_most_n) {
  _return.execution_time_ms += result.getExecutionTime();
  if (result.empty()) {
    return;  // nothing further to convert
  }

  switch (result.getResultType()) {
    // NOTE(review): case labels missing from this span; branches correspond
    // to query-result, simple-result, explain, and a final row-conversion
    // variant that ignores first_n/at_most_n.
      convertRows(_return,
                  query_state_proxy,
                  result.getTargetsMeta(),
                  *result.getRows(),
                  column_format,
                  first_n,
                  at_most_n);
      break;
      convertResult(_return, *result.getRows(), true);
      break;
      convertExplain(_return, *result.getRows(), true);
      break;
      convertRows(_return,
                  query_state_proxy,
                  result.getTargetsMeta(),
                  *result.getRows(),
                  column_format,
                  -1,
                  -1);
      break;
  }
}
1141 
// Thrift entry point: execute a SQL string and return the result as a
// TQueryResult. Handles the distributed (aggregator) and local paths.
void DBHandler::sql_execute(TQueryResult& _return,
                            const TSessionId& session_id_or_json,
                            const std::string& query_str,
                            const bool column_format,
                            const std::string& nonce,
                            const int32_t first_n,
                            const int32_t at_most_n) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  // "execute relalg <RA>" bypasses Calcite and runs the serialized plan.
  const std::string exec_ra_prefix = "execute relalg";
  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
  auto actual_query =
      use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
  auto session_ptr = get_session_ptr(request_info.sessionId());
  auto query_state = create_query_state(session_ptr, actual_query);
  auto stdlog = STDLOG(session_ptr, query_state);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  stdlog.appendNameValuePairs("nonce", nonce);
  auto timer = DEBUG_TIMER(__func__);
  try {
    // Always clear any captured deferred COPY FROM state on scope exit.
    ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
      deferred_copy_from_sessions.remove(session_ptr->get_session_id());
    };

    if (first_n >= 0 && at_most_n >= 0) {
      THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
    }

    if (leaf_aggregator_.leafCount() > 0) {
      // Distributed path: fan out via the aggregate handler.
      if (!agg_handler_) {
        THROW_DB_EXCEPTION("Distributed support is disabled.");
      }
      _return.total_time_ms = measure<>::execution([&]() {
        agg_handler_->cluster_execute(_return,
                                      query_state->createQueryStateProxy(),
                                      query_state->getQueryStr(),
                                      column_format,
                                      nonce,
                                      first_n,
                                      at_most_n,
                                      // NOTE(review): the final argument line
                                      // of this call is missing from this span.
      });
      _return.nonce = nonce;
    } else {
      // Single-node path.
      sql_execute_local(_return,
                        query_state->createQueryStateProxy(),
                        session_ptr,
                        actual_query,
                        column_format,
                        nonce,
                        first_n,
                        at_most_n,
                        use_calcite);
    }
    // A geo COPY FROM may have deferred its import; run it now and add its time.
    _return.total_time_ms += process_deferred_copy_from(request_info.sessionId());
    std::string debug_json = timer.stopAndGetJson();
    if (!debug_json.empty()) {
      _return.__set_debug(std::move(debug_json));
    }
    stdlog.appendNameValuePairs(
        "execution_time_ms",
        _return.execution_time_ms,
        "total_time_ms",  // BE-3420 - Redundant with duration field
        stdlog.duration<std::chrono::milliseconds>());
    VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
    VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
  } catch (const std::exception& e) {
    // Translate well-known Calcite failure strings into friendlier errors.
    if (strstr(e.what(), "java.lang.NullPointerException")) {
      THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
    } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
      THROW_DB_EXCEPTION("multiple SQL statements not allowed");
    } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
      THROW_DB_EXCEPTION("empty SQL statment not allowed");
    } else {
      THROW_DB_EXCEPTION(e.what());
    }
  }
}
1220 
// NOTE(review): the signature line of this overload (DBHandler::sql_execute
// returning via an ExecutionResult& `_return`) is missing from this span, as
// is one further parameter line and the executor-invocation line below —
// verify against upstream.
    const TSessionId& session_id_or_json,
    const std::string& query_str,
    const bool column_format,
    const int32_t first_n,
    const int32_t at_most_n,
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  // "execute relalg <RA>" bypasses Calcite and runs the serialized plan.
  const std::string exec_ra_prefix = "execute relalg";
  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
  auto actual_query =
      use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));

  auto session_ptr = get_session_ptr(request_info.sessionId());
  CHECK(session_ptr);
  auto query_state = create_query_state(session_ptr, actual_query);
  auto stdlog = STDLOG(session_ptr, query_state);
  auto timer = DEBUG_TIMER(__func__);

  try {
    // Always clear any captured deferred COPY FROM state on scope exit.
    ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
      deferred_copy_from_sessions.remove(session_ptr->get_session_id());
    };

    if (first_n >= 0 && at_most_n >= 0) {
      THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
    }
    auto total_time_ms = measure<>::execution([&]() {
      // NOTE(review): the call line (local execution into _return, declaring
      // or passing `locks`) is missing from this span.
          query_state->createQueryStateProxy(),
          column_format,
          session_ptr->get_executor_device_type(),
          first_n,
          at_most_n,
          use_calcite,
          locks);
    });

    // Include any deferred geo COPY FROM import in the reported time.
    _return.setExecutionTime(total_time_ms +
                             process_deferred_copy_from(request_info.sessionId()));

    stdlog.appendNameValuePairs(
        "execution_time_ms",
        _return.getExecutionTime(),
        "total_time_ms",  // BE-3420 - Redundant with duration field
        stdlog.duration<std::chrono::milliseconds>());
    VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
    VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
  } catch (const std::exception& e) {
    // Translate well-known Calcite failure strings into friendlier errors.
    if (strstr(e.what(), "java.lang.NullPointerException")) {
      THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
    } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
      THROW_DB_EXCEPTION("multiple SQL statements not allowed");
    } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
      THROW_DB_EXCEPTION("empty SQL statment not allowed");
    } else {
      THROW_DB_EXCEPTION(e.what());
    }
  }
}
1282 
1283 int64_t DBHandler::process_deferred_copy_from(const TSessionId& session_id) {
1284  int64_t total_time_ms(0);
1285  // if the SQL statement we just executed was a geo COPY FROM, the import
1286  // parameters were captured, and this flag set, so we do the actual import here
1287  if (auto deferred_copy_from_state = deferred_copy_from_sessions(session_id)) {
1288  // import_geo_table() calls create_table() which calls this function to
1289  // do the work, so reset the flag now to avoid executing this part a
1290  // second time at the end of that, which would fail as the table was
1291  // already created! Also reset the flag with a ScopeGuard on exiting
1292  // this function any other way, such as an exception from the code above!
1293  deferred_copy_from_sessions.remove(session_id);
1294 
1295  // create table as replicated?
1296  TCreateParams create_params;
1297  if (deferred_copy_from_state->partitions == "REPLICATED") {
1298  create_params.is_replicated = true;
1299  }
1300 
1301  // now do (and time) the import
1302  total_time_ms = measure<>::execution([&]() {
1303  importGeoTableGlobFilterSort(session_id,
1304  deferred_copy_from_state->table,
1305  deferred_copy_from_state->file_name,
1306  deferred_copy_from_state->copy_params,
1307  TRowDescriptor(),
1308  create_params);
1309  });
1310  }
1311  return total_time_ms;
1312 }
1313 
// Arrow endpoint: execute a read query and return the result as an Arrow
// data frame (shared memory or wire), optionally resident on a GPU.
void DBHandler::sql_execute_df(TDataFrame& _return,
                               const TSessionId& session_id_or_json,
                               const std::string& query_str,
                               const TDeviceType::type results_device_type,
                               const int32_t device_id,
                               const int32_t first_n,
                               const TArrowTransport::type transport_method) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto session_ptr = get_session_ptr(request_info.sessionId());
  CHECK(session_ptr);
  auto query_state = create_query_state(session_ptr, query_str);
  auto stdlog = STDLOG(session_ptr, query_state);

  const auto executor_device_type = session_ptr->get_executor_device_type();

  // Validate GPU result requests against session mode and hardware.
  if (results_device_type == TDeviceType::GPU) {
    if (executor_device_type != ExecutorDeviceType::GPU) {
      THROW_DB_EXCEPTION(std::string("GPU mode is not allowed in this session"));
    }
    if (!data_mgr_->gpusPresent()) {
      THROW_DB_EXCEPTION(std::string("No GPU is available in this server"));
    }
    if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
      // NOTE(review): the THROW_DB_EXCEPTION( line opening this statement is
      // missing from this span.
          std::string("Invalid device_id or unavailable GPU with this ID"));
    }
  }
  // Only plain read queries are supported over Arrow.
  ParserWrapper pw{query_str};
  if (pw.getQueryType() != ParserWrapper::QueryType::Read) {
    THROW_DB_EXCEPTION(std::string(
        "Only read queries supported for the Arrow sql_execute_df endpoint."));
  }
  if (ExplainInfo(query_str).isCalciteExplain()) {
    THROW_DB_EXCEPTION(std::string(
        "Explain is currently unsupported by the Arrow sql_execute_df endpoint."));
  }

  ExecutionResult execution_result;
  // NOTE(review): the declaration of `locks` is missing from this span.
  sql_execute_impl(execution_result,
                   query_state->createQueryStateProxy(),
                   true, /* column_format - does this do anything? */
                   executor_device_type,
                   first_n,
                   -1, /* at_most_n */
                   true,
                   locks);

  const auto result_set = execution_result.getRows();
  // NOTE(review): the ternary mapping TDeviceType -> ExecutorDeviceType is
  // missing its continuation lines in this span.
  const auto executor_results_device_type = results_device_type == TDeviceType::CPU
  _return.execution_time_ms =
      execution_result.getExecutionTime() - result_set->getQueueTime();
  // Convert the result set to Arrow in the requested memory space.
  const auto converter = std::make_unique<ArrowResultSetConverter>(
      result_set,
      data_mgr_,
      executor_results_device_type,
      device_id,
      getTargetNames(execution_result.getTargetsMeta()),
      first_n,
      ArrowTransport(transport_method));
  ArrowResult arrow_result;
  _return.arrow_conversion_time_ms +=
      measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
  _return.sm_handle =
      std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
  _return.sm_size = arrow_result.sm_size;
  _return.df_handle =
      std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
  _return.df_buffer =
      std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
  if (executor_results_device_type == ExecutorDeviceType::GPU) {
    // Remember the CUDA IPC handle so deallocate_df can release it later.
    std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
    CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
    ipc_handle_to_dev_ptr_.insert(
        std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
  }
  _return.df_size = arrow_result.df_size;
}
1395 
1396 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1397  const TSessionId& session_id_or_json,
1398  const std::string& query_str,
1399  const int32_t device_id,
1400  const int32_t first_n) {
1401  heavyai::RequestInfo request_info(session_id_or_json);
1402  SET_REQUEST_ID(request_info.requestId());
1403  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1404  request_info.setRequestId(logger::request_id());
1405  sql_execute_df(_return,
1406  request_info.json(),
1407  query_str,
1408  TDeviceType::GPU,
1409  device_id,
1410  first_n,
1411  TArrowTransport::SHARED_MEMORY);
1412 }
1413 
1414 // For now we have only one user of a data frame in all cases.
// For now we have only one user of a data frame in all cases.
// Releases the shared-memory / GPU resources backing a previously returned
// Arrow data frame.
void DBHandler::deallocate_df(const TSessionId& session_id_or_json,
                              const TDataFrame& df,
                              const TDeviceType::type device_type,
                              const int32_t device_id) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  std::string serialized_cuda_handle = "";
  if (device_type == TDeviceType::GPU) {
    // Look up (and remove) the CUDA IPC handle recorded by sql_execute_df.
    std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
    if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
      TDBException ex;
      ex.error_msg = std::string(
          "Current data frame handle is not bookkept or been inserted "
          "twice");
      LOG(ERROR) << ex.error_msg;
      throw ex;
    }
    serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
    ipc_handle_to_dev_ptr_.erase(df.df_handle);
  }
  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
  // NOTE(review): the line constructing the ArrowResult from the fields below
  // is missing from this span.
      sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
  // NOTE(review): the deallocation call line (presumably
  // ArrowResultSet::deallocateArrowResultBuffer) is missing from this span.
      result,
      device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
      device_id,
      data_mgr_);
}
1446 
// Validates a SELECT statement without executing it and returns the row
// descriptor the query would produce. Throws TDBException on any failure.
void DBHandler::sql_validate(TRowDescriptor& _return,
                             const TSessionId& session_id_or_json,
                             const std::string& query_str) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  try {
    auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
    stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
    auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
    stdlog.setQueryState(query_state);

    // Validation only makes sense for plain SELECTs.
    ParserWrapper pw{query_str};
    if (ExplainInfo(query_str).isExplain() || pw.is_ddl || pw.is_update_dml) {
      throw std::runtime_error("Can only validate SELECT statements.");
    }

    const auto execute_read_lock =
    // NOTE(review): the continuation of this lock-acquisition expression is
    // missing from this span.

    TPlanResult parse_result;
    // NOTE(review): the declaration of `locks` is missing from this span.
    std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
                                                query_state->getQueryStr(),
                                                {},
                                                true,
                                                // NOTE(review): one argument
                                                // line is missing here.
                                                /*check_privileges=*/true);
    const auto query_ra = parse_result.plan_result;

    // Run the plan in validate-only mode; the result carries the row desc.
    const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
    _return = fixup_row_descriptor(result.row_set.row_desc,
                                   query_state->getConstSessionInfo()->getCatalog());
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(std::string(e.what()));
  }
}
1485 
namespace {

// NOTE(review): the struct header line (the aggregate returned by
// extract_projection_tokens_for_completion below) is missing from this span —
// verify its name against upstream.
  // Upper-cased bare column identifiers seen in the partial query.
  std::unordered_set<std::string> uc_column_names;
  // Upper-cased table qualifiers from dotted identifiers (e.g. "t" in "t.c").
  std::unordered_set<std::string> uc_column_table_qualifiers;
};

// Extract what looks like a (qualified) identifier from the partial query.
// The results will be used to rank the auto-completion results: tables which
// contain at least one of the identifiers first.
// NOTE(review): the function signature line is missing from this span.
    const std::string& sql) {
  // Identifiers: runs of alphanumerics, underscores and dots.
  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
                        boost::regex::extended | boost::regex::icase};
  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
  boost::sregex_token_iterator end;
  std::unordered_set<std::string> uc_column_names;
  std::unordered_set<std::string> uc_column_table_qualifiers;
  for (; tok_it != end; ++tok_it) {
    std::string column_name = *tok_it;
    std::vector<std::string> column_tokens;
    boost::split(column_tokens, column_name, boost::is_any_of("."));
    if (column_tokens.size() == 2) {
      // If the column name is qualified, take user's word.
      uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
    } else {
      uc_column_names.insert(to_upper(column_name));
    }
  }
  return {uc_column_names, uc_column_table_qualifiers};
}

}  // namespace
1519 
1520 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1521  const TSessionId& session_id_or_json,
1522  const std::string& sql,
1523  const int cursor) {
1524  heavyai::RequestInfo const request_info(session_id_or_json);
1525  SET_REQUEST_ID(request_info.requestId());
1526  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1527  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1528  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1529  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1530  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1531  proj_tokens.uc_column_names, visible_tables, stdlog);
1532  // Add the table qualifiers explicitly specified by the user.
1533  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1534  proj_tokens.uc_column_table_qualifiers.end());
1535  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1536  std::sort(
1537  hints.begin(),
1538  hints.end(),
1539  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1540  if (lhs.type == TCompletionHintType::TABLE &&
1541  rhs.type == TCompletionHintType::TABLE) {
1542  // Between two tables, one which is compatible with the specified
1543  // projections and one which isn't, pick the one which is compatible.
1544  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1545  compatible_table_names.end() &&
1546  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1547  compatible_table_names.end()) {
1548  return true;
1549  }
1550  }
1551  return lhs.type < rhs.type;
1552  });
1553 }
1554 
// Collects raw completion hints (Calcite first, then token-based fallback)
// without ranking them; also fills `visible_tables` for the session.
void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
                                              std::vector<std::string>& visible_tables,
                                              query_state::StdLog& stdlog,
                                              const std::string& sql,
                                              const int cursor) {
  const auto& session_info = *stdlog.getConstSessionInfo();
  try {
    get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);

    // Filter out keywords suggested by Calcite which we don't support.
    // NOTE(review): the line consuming this call (presumably
    // hints = just_whitelisted_keyword_hints(...)) is missing from this span.
        calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
  } catch (const std::exception& e) {
    TDBException ex;
    ex.error_msg = std::string(e.what());
    LOG(ERROR) << ex.error_msg;
    throw ex;
  }
  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
  const size_t length_to_cursor =
      cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
  // Trust hints from Calcite after the FROM keyword.
  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
    return;
  }
  // Before FROM, the query is too incomplete for context-sensitive completions.
  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
}
1583 
1584 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1585  query_state::StdLog& stdlog,
1586  std::vector<std::string>& visible_tables,
1587  const std::string& sql,
1588  const int cursor) {
1589  const auto last_word =
1590  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1591  boost::regex select_expr{R"(\s*select\s+)",
1592  boost::regex::extended | boost::regex::icase};
1593  const size_t length_to_cursor =
1594  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1595  // After SELECT but before FROM, look for all columns in all tables which match the
1596  // prefix.
1597  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1598  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1599  // Trust the fully qualified columns the most.
1600  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1601  return;
1602  }
1603  // Not much information to use, just retrieve column names which match the prefix.
1604  if (should_suggest_column_hints(sql)) {
1605  get_column_hints(hints, last_word, column_names_by_table);
1606  return;
1607  }
1608  const std::string kFromKeyword{"FROM"};
1609  if (boost::istarts_with(kFromKeyword, last_word)) {
1610  TCompletionHint keyword_hint;
1611  keyword_hint.type = TCompletionHintType::KEYWORD;
1612  keyword_hint.replaced = last_word;
1613  keyword_hint.hints.emplace_back(kFromKeyword);
1614  hints.push_back(keyword_hint);
1615  }
1616  } else {
1617  const std::string kSelectKeyword{"SELECT"};
1618  if (boost::istarts_with(kSelectKeyword, last_word)) {
1619  TCompletionHint keyword_hint;
1620  keyword_hint.type = TCompletionHintType::KEYWORD;
1621  keyword_hint.replaced = last_word;
1622  keyword_hint.hints.emplace_back(kSelectKeyword);
1623  hints.push_back(keyword_hint);
1624  }
1625  }
1626 }
1627 
1628 std::unordered_map<std::string, std::unordered_set<std::string>>
1629 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1630  query_state::StdLog& stdlog) {
1631  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1632  for (auto it = table_names.begin(); it != table_names.end();) {
1633  TTableDetails table_details;
1634  try {
1635  get_table_details_impl(table_details, stdlog, *it, false, false);
1636  } catch (const TDBException& e) {
1637  // Remove the corrupted Table/View name from the list for further processing.
1638  it = table_names.erase(it);
1639  continue;
1640  }
1641  for (const auto& column_type : table_details.row_desc) {
1642  column_names_by_table[*it].emplace(column_type.col_name);
1643  }
1644  ++it;
1645  }
1646  return column_names_by_table;
1647 }
1648 
1652 }
1653 
// NOTE(review): the signature line of this definition
// (DBHandler::get_uc_compatible_table_names_by_column, returning an
// std::unordered_set<std::string>) is missing from this span.
    const std::unordered_set<std::string>& uc_column_names,
    std::vector<std::string>& table_names,
    query_state::StdLog& stdlog) {
  // Collect upper-cased names of tables containing at least one of the given
  // upper-cased column names; prunes unreadable tables from `table_names`.
  std::unordered_set<std::string> compatible_table_names_by_column;
  for (auto it = table_names.begin(); it != table_names.end();) {
    TTableDetails table_details;
    try {
      get_table_details_impl(table_details, stdlog, *it, false, false);
    } catch (const TDBException& e) {
      // Remove the corrupted Table/View name from the list for further processing.
      it = table_names.erase(it);
      continue;
    }
    for (const auto& column_type : table_details.row_desc) {
      if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
        // One matching column is enough to call the table compatible.
        compatible_table_names_by_column.emplace(to_upper(*it));
        break;
      }
    }
    ++it;
  }
  return compatible_table_names_by_column;
}
1678 
// Hands a prepared query task to the dispatch queue; write queries are
// flagged via `is_update_delete` so the queue can treat them specially.
void DBHandler::dispatch_query_task(std::shared_ptr<QueryDispatchQueue::Task> query_task,
                                    const bool is_update_delete) {
  // NOTE(review): one line is missing here in this span (possibly a guard or
  // CHECK on dispatch_queue_) — verify against upstream.
  dispatch_queue_->submit(std::move(query_task), is_update_delete);
}
1684 
// Runs a relational algebra plan through the executor in validate-only mode
// and returns the (row-less) result converted to a TQueryResult.
TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
                                         QueryStateProxy query_state_proxy) {
  TQueryResult _return;
  // NOTE(review): a line is missing here (the ExecutionResult `result`
  // declaration captured by reference in the task below).
  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
      [this,
       &result,
       query_state_proxy,
       &query_ra,
       parent_thread_local_ids =
           logger::thread_local_ids()](const size_t executor_index) {
        // Propagate logging thread-local ids into the dispatch worker.
        logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
        execute_rel_alg(result,
                        query_state_proxy,
                        query_ra,
                        true,
                        // NOTE(review): one argument line is missing here
                        // (likely the device type, e.g. ExecutorDeviceType::CPU).
                        -1,
                        -1,
                        /*just_validate=*/true,
                        /*find_filter_push_down_candidates=*/false,
                        ExplainInfo(),
                        executor_index);
      });
  dispatch_query_task(execute_rel_alg_task, /*is_update_delete=*/false);
  auto result_future = execute_rel_alg_task->get_future();
  // Block until validation completes; exceptions propagate via the future.
  result_future.get();
  DBHandler::convertData(_return, result, query_state_proxy, true, -1, -1);
  return _return;
}
1715 
1716 void DBHandler::get_roles(std::vector<std::string>& roles,
1717  const TSessionId& session_id_or_json) {
1718  heavyai::RequestInfo const request_info(session_id_or_json);
1719  SET_REQUEST_ID(request_info.requestId());
1720  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1721  auto session_ptr = stdlog.getConstSessionInfo();
1722  if (!session_ptr->get_currentUser().isSuper) {
1723  // WARNING: This appears to not include roles a user is a member of,
1724  // if the role has no permissions granted to it.
1725  roles =
1726  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1727  session_ptr->getCatalog().getCurrentDB().dbId);
1728  } else {
1729  roles = SysCatalog::instance().getRoles(
1730  false, true, session_ptr->get_currentUser().userName);
1731  }
1732 }
1733 
// Returns whether `roleName` is granted (directly) to `granteeName`.
// Non-superusers may only query themselves or roles directly granted to them.
bool DBHandler::has_role(const TSessionId& session_id_or_json,
                         const std::string& granteeName,
                         const std::string& roleName) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  const auto session_ptr = get_session_ptr(request_info.sessionId());
  const auto stdlog = STDLOG(session_ptr);
  const auto current_user = session_ptr->get_currentUser();
  if (!current_user.isSuper) {
    // A non-superuser asking about a *user* other than themselves is denied.
    if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
        user && current_user.userName != granteeName) {
      THROW_DB_EXCEPTION("Only super users can check other user's roles.");
    } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
                   current_user.userName, granteeName, true)) {
      // NOTE(review): the THROW_DB_EXCEPTION( line opening this statement is
      // missing from this span.
          "Only super users can check roles assignment that have not been directly "
          "granted to a user.");
    }
  }
  // Direct (non-effective) grant check.
  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
}
1755 
1756 static TDBObject serialize_db_object(const std::string& roleName,
1757  const DBObject& inObject) {
1758  TDBObject outObject;
1759  outObject.objectName = inObject.getName();
1760  outObject.grantee = roleName;
1761  outObject.objectId = inObject.getObjectKey().objectId;
1762  const auto ap = inObject.getPrivileges();
1763  switch (inObject.getObjectKey().permissionType) {
1764  case DatabaseDBObjectType:
1765  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1766  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1767  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1768  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1769  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1770 
1771  break;
1772  case TableDBObjectType:
1773  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1774  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1775  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1776  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1777  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1778  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1779  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1780  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1781  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1782 
1783  break;
1784  case DashboardDBObjectType:
1785  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1786  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1787  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1788  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1789  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1790 
1791  break;
1792  case ViewDBObjectType:
1793  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1794  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1795  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1796  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1797  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1798  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1799  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1800 
1801  break;
1802  case ServerDBObjectType:
1803  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1804  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1805  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1806  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1807  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::SERVER_USAGE));
1808 
1809  break;
1810  default:
1811  CHECK(false);
1812  }
1813  const int type_val = static_cast<int>(inObject.getType());
1814  CHECK(type_val >= 0 && type_val < 6);
1815  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1816  return outObject;
1817 }
1818 
// Returns true iff the granted privileges `privs` cover every database-level
// permission requested in the Thrift `permissions` union
// (create / drop / view-sql-editor / access). Throws when the union's
// database_permissions_ member is not set.
// NOTE(review): the function signature line and part of the view_sql_editor_
// condition are missing from this excerpt (extraction damage) — restore from
// upstream before compiling.
    const TDBObjectPermissions& permissions) {
  if (!permissions.__isset.database_permissions_) {
    THROW_DB_EXCEPTION("Database permissions not set for check.")
  }
  auto perms = permissions.database_permissions_;
  // Deny if any requested flag is not present in the granted privilege set.
  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
      (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
      (perms.view_sql_editor_ &&
      (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
    return false;
  } else {
    return true;
  }
}
1835 
// Returns true iff the granted privileges `privs` cover every table-level
// permission requested in the Thrift `permissions` union. Throws when the
// union's table_permissions_ member is not set.
// NOTE(review): the function signature line is missing from this excerpt
// (extraction damage).
    const TDBObjectPermissions& permissions) {
  if (!permissions.__isset.table_permissions_) {
    THROW_DB_EXCEPTION("Table permissions not set for check.")
  }
  auto perms = permissions.table_permissions_;
  // Deny if any requested flag is not present in the granted privilege set.
  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
      (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
      (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
      (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
      (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
      (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
      (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
      (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
    return false;
  } else {
    return true;
  }
}
1855 
// Returns true iff the granted privileges `privs` cover every dashboard-level
// permission requested in the Thrift `permissions` union. Throws when the
// union's dashboard_permissions_ member is not set.
// NOTE(review): the function signature line is missing from this excerpt
// (extraction damage).
    const TDBObjectPermissions& permissions) {
  if (!permissions.__isset.dashboard_permissions_) {
    THROW_DB_EXCEPTION("Dashboard permissions not set for check.")
  }
  auto perms = permissions.dashboard_permissions_;
  // Deny if any requested flag is not present in the granted privilege set.
  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
      (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
      (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
      (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
    return false;
  } else {
    return true;
  }
}
1871 
// Returns true iff the granted privileges `privs` cover every view-level
// permission requested in the Thrift `permissions` union. Throws when the
// union's view_permissions_ member is not set.
// NOTE(review): the function signature line is missing from this excerpt
// (extraction damage).
    const TDBObjectPermissions& permissions) {
  if (!permissions.__isset.view_permissions_) {
    THROW_DB_EXCEPTION("View permissions not set for check.")
  }
  auto perms = permissions.view_permissions_;
  // Deny if any requested flag is not present in the granted privilege set.
  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
      (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
      (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
      (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
      (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
      (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
    return false;
  } else {
    return true;
  }
}
1889 
// Returns true iff the granted privileges `privs` cover every server-level
// permission requested in the Thrift `permissions` union.
// NOTE(review): unlike the sibling checkers above, an unset
// server_permissions_ member trips a CHECK (process abort) rather than a
// client-facing exception — presumably intentional, but worth confirming.
// The function signature line is missing from this excerpt (extraction
// damage).
    const TDBObjectPermissions& permissions) {
  CHECK(permissions.__isset.server_permissions_);
  auto perms = permissions.server_permissions_;
  // Deny if any requested flag is not present in the granted privilege set.
  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
      (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
      (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER)) ||
      (perms.usage_ && !privs.hasPermission(ServerPrivileges::SERVER_USAGE))) {
    return false;
  } else {
    return true;
  }
}
1903 
// Thrift endpoint: report whether `granteeName` holds the requested
// `permissions` on the object named `objectName` of the given `objectType`.
// Superusers (and checks against superuser grantees) short-circuit to true;
// non-superusers may only query themselves or roles granted to them.
// NOTE(review): several lines are missing from this excerpt (extraction
// damage): the THROW_DB_EXCEPTION( opener for the authorization error, the
// declaration of `user_meta`, the `case TDBObjectType::...` labels and the
// assignments to `type` inside the switch. Restore from upstream before
// compiling.
bool DBHandler::has_object_privilege(const TSessionId& session_id_or_json,
                                     const std::string& granteeName,
                                     const std::string& objectName,
                                     const TDBObjectType::type objectType,
                                     const TDBObjectPermissions& permissions) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto session_ptr = get_session_ptr(request_info.sessionId());
  auto stdlog = STDLOG(session_ptr);
  auto const& cat = session_ptr->getCatalog();
  auto const& current_user = session_ptr->get_currentUser();
  // Authorization: only superusers may probe arbitrary grantees.
  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
                                   current_user.userName, granteeName, false)) {
      "Users except superusers can only check privileges for self or roles granted "
      "to "
      "them.")
  }
  // A superuser grantee implicitly has every privilege.
  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
      user_meta.isSuper) {
    return true;
  }
  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
  if (!grnt) {
    THROW_DB_EXCEPTION("User or Role " + granteeName + " does not exist.")
  }
  // Map the Thrift object type onto the key used by permissionFuncMap_.
  std::string func_name;
  switch (objectType) {
      func_name = "database";
      break;
      func_name = "table";
      break;
      func_name = "dashboard";
      break;
      func_name = "view";
      break;
      func_name = "server";
      break;
    default:
      THROW_DB_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
  }
  DBObject req_object(objectName, type);
  req_object.loadKey(cat);

  // Include inherited privileges (recursive lookup through granted roles).
  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
  if (grantee_object) {
    // if grantee has privs on the object
    return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
  } else {
    // no privileges on that object
    return false;
  }
}
1969 
1970 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1971  const TSessionId& session_id_or_json,
1972  const std::string& roleName) {
1973  heavyai::RequestInfo const request_info(session_id_or_json);
1974  SET_REQUEST_ID(request_info.requestId());
1975  auto session_ptr = get_session_ptr(request_info.sessionId());
1976  auto stdlog = STDLOG(session_ptr);
1977  auto const& user = session_ptr->get_currentUser();
1978  if (!user.isSuper &&
1979  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1980  return;
1981  }
1982  auto* rl = SysCatalog::instance().getGrantee(roleName);
1983  if (rl) {
1984  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1985  for (auto& dbObject : *rl->getDbObjects(true)) {
1986  if (dbObject.first.dbId != dbId) {
1987  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1988  // usecase for now, though)
1989  continue;
1990  }
1991  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1992  TDBObjectsForRole.push_back(tdbObject);
1993  }
1994  } else {
1995  THROW_DB_EXCEPTION("User or role " + roleName + " does not exist.");
1996  }
1997 }
1998 
// Thrift endpoint: list, for every role/user visible to the caller, the
// privileges held on the object `objectName` of Thrift type `type` — both on
// the object itself and on its database-level counterpart. Superusers get an
// extra synthetic "super" entry carrying maximal permissions.
// NOTE(review): the `case TDBObjectType::...` labels inside the switch and
// the privilege-set argument of the DBObject brace-init (orig. line between
// getObjectKey() and userId) are missing from this excerpt (extraction
// damage). Restore from upstream before compiling.
void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
                                    const TSessionId& session_id_or_json,
                                    const std::string& objectName,
                                    const TDBObjectType::type type) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto session_ptr = get_session_ptr(request_info.sessionId());
  auto stdlog = STDLOG(session_ptr);
  const auto& cat = session_ptr->getCatalog();
  // Translate the Thrift object type to the internal enum.
  DBObjectType object_type;
  switch (type) {
      object_type = DBObjectType::DatabaseDBObjectType;
      break;
      object_type = DBObjectType::TableDBObjectType;
      break;
      break;
      object_type = DBObjectType::ViewDBObjectType;
      break;
      object_type = DBObjectType::ServerDBObjectType;
      break;
    default:
      THROW_DB_EXCEPTION("Failed to get object privileges for " + objectName +
                         ": unknown object type (" + std::to_string(type) + ").");
  }
  DBObject object_to_find(objectName, object_type);

  // TODO(adb): Use DatabaseLock to protect method
  try {
    if (object_type == DashboardDBObjectType) {
      // Dashboards are addressed by numeric id; empty name means "all" (-1).
      if (objectName == "") {
        object_to_find = DBObject(-1, object_type);
      } else {
        object_to_find = DBObject(std::stoi(objectName), object_type);
      }
    } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
               !objectName.empty()) {
      // special handling for view / table
      auto td = cat.getMetadataForTable(objectName, false);
      if (td) {
        // Correct the requested type: the name may resolve to a view even if
        // the caller asked for a table, and vice versa.
        object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
        object_to_find = DBObject(objectName, object_type);
      }
    }
    object_to_find.loadKey(cat);
  } catch (const std::exception&) {
    THROW_DB_EXCEPTION("Object with name " + objectName + " does not exist.");
  }

  // object type on database level
  DBObject object_to_find_dblevel("", object_type);
  object_to_find_dblevel.loadKey(cat);
  // if user is superuser respond with a full priv
  if (session_ptr->get_currentUser().isSuper) {
    // using ALL_TABLE here to set max permissions
    DBObject dbObj{object_to_find.getObjectKey(),
                   session_ptr->get_currentUser().userId};
    dbObj.setName("super");
    TDBObjects.push_back(
        serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
  };

  // Enumerate roles/users visible to the caller and report their grants.
  std::vector<std::string> grantees =
      SysCatalog::instance().getRoles(true,
                                      session_ptr->get_currentUser().isSuper,
                                      session_ptr->get_currentUser().userName);
  for (const auto& grantee : grantees) {
    DBObject* object_found;
    auto* gr = SysCatalog::instance().getGrantee(grantee);
    if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
      TDBObjects.push_back(serialize_db_object(grantee, *object_found));
    }
    // check object permissions on Database level
    if (gr &&
        (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
      TDBObjects.push_back(serialize_db_object(grantee, *object_found));
    }
  }
}
2084 
// Shared implementation for get_all_roles_for_user() and
// get_all_effective_roles_for_user(): fills `roles` with the roles granted to
// `granteeName`. When `effective` is true, indirectly inherited roles are
// included (only_direct=false); otherwise only direct grants are returned.
// Authorization: superusers may query anyone; a user may query themselves;
// a role may be queried only by users it is granted to.
// NOTE(review): the function signature line and the THROW_DB_EXCEPTION(
// opener for the "another user" error are missing from this excerpt
// (extraction damage).
    std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr,
    std::vector<std::string>& roles,
    const std::string& granteeName,
    bool effective) {
  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
  if (grantee) {
    if (session_ptr->get_currentUser().isSuper) {
      roles = grantee->getRoles(/*only_direct=*/!effective);
    } else if (grantee->isUser()) {
      if (session_ptr->get_currentUser().userName == granteeName) {
        roles = grantee->getRoles(/*only_direct=*/!effective);
      } else {
        "Only a superuser is authorized to request list of roles granted to another "
        "user.");
      }
    } else {
      CHECK(!grantee->isUser());
      // granteeName is actually a roleName here and we can check a role
      // only if it is granted to us
      if (SysCatalog::instance().isRoleGrantedToGrantee(
              session_ptr->get_currentUser().userName, granteeName, false)) {
        roles = grantee->getRoles(/*only_direct=*/!effective);
      } else {
        THROW_DB_EXCEPTION("A user can check only roles granted to him.");
      }
    }
  } else {
    THROW_DB_EXCEPTION("Grantee " + granteeName + " does not exist.");
  }
}
2117 
2118 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
2119  const TSessionId& session_id_or_json,
2120  const std::string& granteeName) {
2121  // WARNING: This function only returns directly granted roles.
2122  // See also: get_all_effective_roles_for_user() for all of a user's roles.
2123  heavyai::RequestInfo const request_info(session_id_or_json);
2124  SET_REQUEST_ID(request_info.requestId());
2125  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2126  auto session_ptr = stdlog.getConstSessionInfo();
2127  getAllRolesForUserImpl(session_ptr, roles, granteeName, /*effective=*/false);
2128 }
2129 
2130 void DBHandler::get_all_effective_roles_for_user(std::vector<std::string>& roles,
2131  const TSessionId& session_id_or_json,
2132  const std::string& granteeName) {
2133  heavyai::RequestInfo const request_info(session_id_or_json);
2134  SET_REQUEST_ID(request_info.requestId());
2135  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2136  auto session_ptr = stdlog.getConstSessionInfo();
2137  getAllRolesForUserImpl(session_ptr, roles, granteeName, /*effective=*/true);
2138 }
2139 
namespace {
// Render a {table -> columns} request parameter as a compact single-line
// string (":table1,colA,colB:table2,colC") for request logging.
// The function signature line was dropped by extraction; it is reconstructed
// here from the body (returns the accumulated stream contents, i.e.
// std::string) and from the call site in get_result_row_for_pixel().
std::string dump_table_col_names(
    const std::map<std::string, std::vector<std::string>>& table_col_names) {
  std::ostringstream oss;
  for (const auto& [table_name, col_names] : table_col_names) {
    oss << ":" << table_name;
    for (const auto& col_name : col_names) {
      oss << "," << col_name;
    }
  }
  return oss.str();
}
}  // namespace
2153 
// Thrift endpoint: hit-test a rendered widget at `pixel` (within
// `pixel_radius`) and return the underlying table row(s), restricted to the
// requested columns. Delegates to the backend render handler; errors from it
// are rethrown as DB exceptions.
// NOTE(review): the function signature line (void
// DBHandler::get_result_row_for_pixel() is missing from this excerpt
// (extraction damage).
    TPixelTableRowResult& _return,
    const TSessionId& session_id_or_json,
    const int64_t widget_id,
    const TPixel& pixel,
    const std::map<std::string, std::vector<std::string>>& table_col_names,
    const bool column_format,
    const int32_t pixel_radius,
    const std::string& nonce) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto session_ptr = get_session_ptr(request_info.sessionId());
  // Log every request parameter for diagnostics.
  auto stdlog = STDLOG(session_ptr,
                       "widget_id",
                       widget_id,
                       "pixel.x",
                       pixel.x,
                       "pixel.y",
                       pixel.y,
                       "column_format",
                       column_format,
                       "pixel_radius",
                       pixel_radius,
                       "table_col_names",
                       dump_table_col_names(table_col_names),
                       "nonce",
                       nonce);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  if (!render_handler_) {
    THROW_DB_EXCEPTION("Backend rendering is disabled.");
  }

  try {
    render_handler_->get_result_row_for_pixel(_return,
                                              session_ptr,
                                              widget_id,
                                              pixel,
                                              table_col_names,
                                              column_format,
                                              pixel_radius,
                                              nonce);
  } catch (std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
}
2199 
// Convert a catalog ColumnDescriptor into its Thrift TColumnType
// representation: type/encoding/nullability, size for arrays and dates,
// geo subtype/SRID or precision/scale, and the effective dictionary width
// for dict-encoded strings.
// NOTE(review): missing from this excerpt (extraction damage): the function
// signature line, the call that fixes up geo column info, and the condition
// guarding the dictionary branch (presumably a dict-encoded-string check plus
// `cat != nullptr`). Restore from upstream before compiling.
    const ColumnDescriptor* cd) {
  TColumnType col_type;
  col_type.col_name = cd->columnName;
  col_type.src_name = cd->sourceName;
  col_type.col_id = cd->columnId;
  col_type.col_type.type = type_to_thrift(cd->columnType);
  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
  col_type.col_type.nullable = !cd->columnType.get_notnull();
  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
    col_type.col_type.size = cd->columnType.get_size();  // only for arrays and dates
  }
  if (IS_GEO(cd->columnType.get_type())) {
      col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
  } else {
    col_type.col_type.precision = cd->columnType.get_precision();
    col_type.col_type.scale = cd->columnType.get_scale();
  }
  col_type.is_system = cd->isSystemCol;
      cat != nullptr) {
    // have to get the actual size of the encoding from the dictionary definition
    const int dict_id = cd->columnType.get_comp_param();
    // NOTE(review): getMetadataForDict() is called twice with identical
    // arguments here, and the `if (!dd)` branch below is unreachable after
    // the first null check — a single lookup would suffice.
    if (!cat->getMetadataForDict(dict_id, false)) {
      col_type.col_type.comp_param = 0;
      return col_type;
    }
    auto dd = cat->getMetadataForDict(dict_id, false);
    if (!dd) {
      THROW_DB_EXCEPTION("Dictionary doesn't exist");
    }
    col_type.col_type.comp_param = dd->dictNBits;
  } else {
    // Dates stored in days with an unset comp_param default to 32 bits.
    col_type.col_type.comp_param =
        (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
            ? 32
            : cd->columnType.get_comp_param();
  }
  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
  if (cd->default_value.has_value()) {
    col_type.__set_default_value(cd->getDefaultValueLiteral());
  }
  return col_type;
}
2246 
2247 void DBHandler::get_internal_table_details(TTableDetails& _return,
2248  const TSessionId& session_id_or_json,
2249  const std::string& table_name,
2250  const bool include_system_columns) {
2251  heavyai::RequestInfo const request_info(session_id_or_json);
2252  SET_REQUEST_ID(request_info.requestId());
2253  auto stdlog =
2254  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
2255  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2256  get_table_details_impl(_return, stdlog, table_name, include_system_columns, false);
2257 }
2258 
// Thrift endpoint: like get_internal_table_details(), but resolves
// `table_name` within the explicitly named `database_name` and always
// includes system columns (get_system=true).
// NOTE(review): the function signature line is missing from this excerpt
// (extraction damage).
    TTableDetails& _return,
    const TSessionId& session_id_or_json,
    const std::string& table_name,
    const std::string& database_name) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog =
      STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  get_table_details_impl(_return, stdlog, table_name, true, false, database_name);
}
2271 
// Thrift endpoint: public table details for the session's current database
// (no system or physical columns).
// NOTE(review): the initializer of `execute_read_lock` (the lock-acquisition
// expression spanning the following lines) is missing from this excerpt
// (extraction damage); restore from upstream before compiling.
void DBHandler::get_table_details(TTableDetails& _return,
                                  const TSessionId& session_id_or_json,
                                  const std::string& table_name) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog =
      STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());

  // Hold an executor read lock for the duration of the lookup.
  auto execute_read_lock =
  get_table_details_impl(_return, stdlog, table_name, false, false);
}
2287 
// Thrift endpoint: public table details resolved within an explicitly named
// database (no system or physical columns).
// NOTE(review): the initializer of `execute_read_lock` (the lock-acquisition
// expression spanning the following lines) is missing from this excerpt
// (extraction damage); restore from upstream before compiling.
void DBHandler::get_table_details_for_database(TTableDetails& _return,
                                               const TSessionId& session_id_or_json,
                                               const std::string& table_name,
                                               const std::string& database_name) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog =
      STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());

  // Hold an executor read lock for the duration of the lookup.
  auto execute_read_lock =
  get_table_details_impl(_return, stdlog, table_name, false, false, database_name);
}
2304 
namespace {
// Build the Thrift refresh-info block for a foreign table: update type
// (ALL/APPEND), timing (MANUAL/SCHEDULED with start time and interval), and
// last/next refresh timestamps in ISO format.
// NOTE(review): several lines are missing from this excerpt (extraction
// damage): the getOption() argument lines for the update-type, timing-type
// and start-date-time keys, the APPEND/SCHEDULED comparison operands, and
// the NULL-refresh-time sentinels in the two trailing conditions. Restore
// from upstream before compiling.
TTableRefreshInfo get_refresh_info(const TableDescriptor* td) {
  CHECK(td->isForeignTable());
  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
  CHECK(foreign_table);
  TTableRefreshInfo refresh_info;
  // Update type: ALL re-fetches everything, APPEND only new data.
  const auto& update_type =
  CHECK(update_type.has_value());
  if (update_type.value() == foreign_storage::ForeignTable::ALL_REFRESH_UPDATE_TYPE) {
    refresh_info.update_type = TTableRefreshUpdateType::ALL;
  } else if (update_type.value() ==
    refresh_info.update_type = TTableRefreshUpdateType::APPEND;
  } else {
    UNREACHABLE() << "Unexpected refresh update type: " << update_type.value();
  }

  // Timing: manual refresh has no interval; scheduled refresh carries a
  // start time plus an interval such as "12H" or "3D".
  const auto& timing_type =
  CHECK(timing_type.has_value());
  if (timing_type.value() == foreign_storage::ForeignTable::MANUAL_REFRESH_TIMING_TYPE) {
    refresh_info.timing_type = TTableRefreshTimingType::MANUAL;
    refresh_info.interval_count = -1;
  } else if (timing_type.value() ==
    refresh_info.timing_type = TTableRefreshTimingType::SCHEDULED;
    const auto& start_date_time = foreign_table->getOption(
    CHECK(start_date_time.has_value());
    auto start_date_time_epoch = dateTimeParse<kTIMESTAMP>(start_date_time.value(), 0);
    refresh_info.start_date_time =
        shared::convert_temporal_to_iso_format({kTIMESTAMP}, start_date_time_epoch);
    // Interval string format: "<count><unit>", unit in {H, D, S}.
    const auto& interval =
        foreign_table->getOption(foreign_storage::ForeignTable::REFRESH_INTERVAL_KEY);
    CHECK(interval.has_value());
    const auto& interval_str = interval.value();
    refresh_info.interval_count =
        std::stoi(interval_str.substr(0, interval_str.length() - 1));
    auto interval_type = std::toupper(interval_str[interval_str.length() - 1]);
    if (interval_type == 'H') {
      refresh_info.interval_type = TTableRefreshIntervalType::HOUR;
    } else if (interval_type == 'D') {
      refresh_info.interval_type = TTableRefreshIntervalType::DAY;
    } else if (interval_type == 'S') {
      // This use case is for development only.
      refresh_info.interval_type = TTableRefreshIntervalType::NONE;
    } else {
      UNREACHABLE() << "Unexpected interval type: " << interval_str;
    }
  } else {
    UNREACHABLE() << "Unexpected refresh timing type: " << timing_type.value();
  }
  // Only report timestamps that have actually been set.
  if (foreign_table->last_refresh_time !=
    refresh_info.last_refresh_time = shared::convert_temporal_to_iso_format(
        {kTIMESTAMP}, foreign_table->last_refresh_time);
  }
  if (foreign_table->next_refresh_time !=
    refresh_info.next_refresh_time = shared::convert_temporal_to_iso_format(
        {kTIMESTAMP}, foreign_table->next_refresh_time);
  }
  return refresh_info;
}
}  // namespace
2371 
// Shared implementation behind the get_table_details* / 
// get_internal_table_details* endpoints. Resolves the table (or view) in the
// session's database — or in `database_name` when given — verifies access
// privileges, fills the row descriptor (for views, by validating the view
// query), and copies table-level metadata into `_return`.
// NOTE(review): missing from this excerpt (extraction damage): the
// acquireTableDescriptor() call initializing `td_with_lock`, and one
// argument line of the parse_to_ra() call. Restore from upstream before
// compiling.
void DBHandler::get_table_details_impl(TTableDetails& _return,
                                       query_state::StdLog& stdlog,
                                       const std::string& table_name,
                                       const bool get_system,
                                       const bool get_physical,
                                       const std::string& database_name) {
  try {
    auto session_info = stdlog.getSessionInfo();
    // Default to the session's catalog; otherwise look up the named database.
    auto cat = (database_name.empty())
                   ? &session_info->getCatalog()
                   : SysCatalog::instance().getCatalog(database_name).get();
    if (!cat) {
      THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
    }
    // Acquire the table descriptor under a schema read lock.
    const auto td_with_lock =
        *cat, table_name, false);
    const auto td = td_with_lock();
    CHECK(td);

    bool have_privileges_on_view_sources = true;
    if (td->isView) {
      // For views, derive the row descriptor by parsing and validating the
      // stored view SQL; also probe whether the caller may see the sources.
      auto query_state = create_query_state(session_info, td->viewSQL);
      stdlog.setQueryState(query_state);
      try {
        if (hasTableAccessPrivileges(td, *session_info)) {
          const auto [query_ra, locks] = parse_to_ra(query_state->createQueryStateProxy(),
                                                     query_state->getQueryStr(),
                                                     {},
                                                     true,
                                                     false);
          try {
            calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
                                                     query_ra);
          } catch (const std::runtime_error&) {
            // Caller may see the view but not its underlying objects; the
            // view SQL is redacted below in that case.
            have_privileges_on_view_sources = false;
          }

          const auto result = validate_rel_alg(query_ra.plan_result,
                                               query_state->createQueryStateProxy());

          _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, *cat);
        } else {
          throw std::runtime_error(
              "Unable to access view " + table_name +
              ". The view may not exist, or the logged in user may not "
              "have permission to access the view.");
        }
      } catch (const std::exception& e) {
        throw std::runtime_error("View '" + table_name +
                                 "' query has failed with an error: '" +
                                 std::string(e.what()) +
                                 "'.\nThe view must be dropped and re-created to "
                                 "resolve the error. \nQuery:\n" +
                                 query_state->getQueryStr());
      }
    } else {
      // For tables, the row descriptor comes straight from column metadata,
      // skipping the internal "deleted" column.
      if (hasTableAccessPrivileges(td, *session_info)) {
        const auto col_descriptors = cat->getAllColumnMetadataForTable(
            td->tableId, get_system, true, get_physical);
        const auto deleted_cd = cat->getDeletedColumn(td);
        for (const auto cd : col_descriptors) {
          if (cd == deleted_cd) {
            continue;
          }
          _return.row_desc.push_back(populateThriftColumnType(cat, cd));
        }
      } else {
        throw std::runtime_error(
            "Unable to access table " + table_name +
            ". The table may not exist, or the logged in user may not "
            "have permission to access the table.");
      }
    }
    // Copy table-level metadata.
    _return.fragment_size = td->maxFragRows;
    _return.page_size = td->fragPageSize;
    _return.max_rows = td->maxRows;
    _return.view_sql =
        (have_privileges_on_view_sources ? td->viewSQL
                                         : "[Not enough privileges to see the view SQL]");
    _return.shard_count = td->nShards;
    _return.key_metainfo = td->keyMetainfo;
    _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
    _return.partition_detail =
        td->partitions.empty()
            ? TPartitionDetail::DEFAULT
            : (table_is_replicated(td)
                   ? TPartitionDetail::REPLICATED
                   : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
                                                  : TPartitionDetail::OTHER));
    if (td->isView) {
      _return.table_type = TTableType::VIEW;
    } else if (td->isTemporaryTable()) {
      _return.table_type = TTableType::TEMPORARY;
    } else if (td->isForeignTable()) {
      _return.table_type = TTableType::FOREIGN;
      _return.refresh_info = get_refresh_info(td);
    } else {
      _return.table_type = TTableType::DEFAULT;
    }

  } catch (const std::runtime_error& e) {
    THROW_DB_EXCEPTION(std::string(e.what()));
  }
}
2478 
2479 void DBHandler::get_link_view(TFrontendView& _return,
2480  const TSessionId& session_id_or_json,
2481  const std::string& link) {
2482  heavyai::RequestInfo const request_info(session_id_or_json);
2483  SET_REQUEST_ID(request_info.requestId());
2484  auto session_ptr = get_session_ptr(request_info.sessionId());
2485  auto stdlog = STDLOG(session_ptr);
2486  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2487  auto const& cat = session_ptr->getCatalog();
2488  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2489  if (!ld) {
2490  THROW_DB_EXCEPTION("Link " + link + " is not valid.");
2491  }
2492  _return.view_state = ld->viewState;
2493  _return.view_name = ld->link;
2494  _return.update_time = ld->updateTime;
2495  _return.view_metadata = ld->viewMetadata;
2496 }
2497 
// Returns true when the session's user may access table `td`: superusers
// always may; otherwise the SysCatalog is asked whether the user holds ANY
// privilege on the corresponding DBObject.
// NOTE(review): the function signature line and the declaration of
// `dbObject` (presumably constructed from the table's name/type) are missing
// from this excerpt (extraction damage); restore from upstream before
// compiling.
    const TableDescriptor* td,
    const Catalog_Namespace::SessionInfo& session_info) {
  auto& cat = session_info.getCatalog();
  auto user_metadata = session_info.get_currentUser();

  if (user_metadata.isSuper) {
    return true;
  }

  dbObject.loadKey(cat);
  std::vector<DBObject> privObjects = {dbObject};

  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
}
2514 
2515 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2516  const Catalog_Namespace::SessionInfo& session_info,
2517  const GetTablesType get_tables_type,
2518  const std::string& database_name) {
2519  if (database_name.empty()) {
2520  table_names = session_info.getCatalog().getTableNamesForUser(
2521  session_info.get_currentUser(), get_tables_type);
2522  } else {
2523  auto request_cat = SysCatalog::instance().getCatalog(database_name);
2524  if (!request_cat) {
2525  THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
2526  }
2527  table_names = request_cat->getTableNamesForUser(session_info.get_currentUser(),
2528  get_tables_type);
2529  }
2530 }
2531 
// Thrift endpoint: list all physical tables AND views visible to the user in
// the current database.
// NOTE(review): the opening line of the get_tables_impl( call is missing
// from this excerpt (extraction damage); restore from upstream before
// compiling.
void DBHandler::get_tables(std::vector<std::string>& table_names,
                           const TSessionId& session_id_or_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
      table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
}
2541 
// Thrift endpoint: list all physical tables and views visible to the user in
// an explicitly named database.
// NOTE(review): the tables-type argument line of the get_tables_impl call
// (presumably GET_PHYSICAL_TABLES_AND_VIEWS) is missing from this excerpt
// (extraction damage); restore from upstream before compiling.
void DBHandler::get_tables_for_database(std::vector<std::string>& table_names,
                                        const TSessionId& session_id_or_json,
                                        const std::string& database_name) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());

  get_tables_impl(table_names,
                  *stdlog.getConstSessionInfo(),
                  database_name);
}
2555 
2556 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2557  const TSessionId& session_id_or_json) {
2558  heavyai::RequestInfo const request_info(session_id_or_json);
2559  SET_REQUEST_ID(request_info.requestId());
2560  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2561  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2562  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2563 }
2564 
2565 void DBHandler::get_views(std::vector<std::string>& table_names,
2566  const TSessionId& session_id_or_json) {
2567  heavyai::RequestInfo const request_info(session_id_or_json);
2568  SET_REQUEST_ID(request_info.requestId());
2569  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2570  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2571  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2572 }
2573 
// Shared implementation for get_tables_meta(): build a TTableMeta entry for
// every accessible, non-shard table. For views, the column list is derived by
// validating the view SQL (broken views are logged and skipped); for tables
// it comes from column metadata, minus the internal "deleted" column.
// NOTE(review): missing from this excerpt (extraction damage): the
// declaration of `locks` (lock container filled by parse_to_ra) and one
// argument line of the execute_rel_alg() call. Restore from upstream before
// compiling.
void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
                                     QueryStateProxy query_state_proxy,
                                     const Catalog_Namespace::SessionInfo& session_info,
                                     const bool with_table_locks) {
  const auto& cat = session_info.getCatalog();
  // Get copies of table descriptors here in order to avoid possible use of dangling
  // pointers, if tables are concurrently dropped.
  const auto tables = cat.getAllTableMetadataCopy();
  _return.reserve(tables.size());

  for (const auto& td : tables) {
    if (td.shard >= 0) {
      // skip shards, they're not standalone tables
      continue;
    }
    if (!hasTableAccessPrivileges(&td, session_info)) {
      // skip table, as there are no privileges to access it
      continue;
    }

    TTableMeta ret;
    ret.table_name = td.tableName;
    ret.is_view = td.isView;
    ret.is_replicated = table_is_replicated(&td);
    ret.shard_count = td.nShards;
    ret.max_rows = td.maxRows;
    ret.table_id = td.tableId;

    std::vector<TTypeInfo> col_types;
    std::vector<std::string> col_names;
    size_t num_cols = 0;
    if (td.isView) {
      try {
        // Validate the view SQL to discover its output columns.
        TPlanResult parse_result;
        std::tie(parse_result, locks) = parse_to_ra(
            query_state_proxy, td.viewSQL, {}, with_table_locks, system_parameters_);
        const auto query_ra = parse_result.plan_result;

        ExecutionResult ex_result;
        execute_rel_alg(ex_result,
                        query_state_proxy,
                        query_ra,
                        true,
                        -1,
                        -1,
                        /*just_validate=*/true,
                        /*find_push_down_candidates=*/false,
                        ExplainInfo());
        TQueryResult result;
        DBHandler::convertData(result, ex_result, query_state_proxy, true, -1, -1);
        num_cols = result.row_set.row_desc.size();
        for (const auto& col : result.row_set.row_desc) {
          // Physical columns are excluded from the reported column count.
          if (col.is_physical) {
            num_cols--;
            continue;
          }
          col_types.push_back(col.col_type);
          col_names.push_back(col.col_name);
        }
      } catch (std::exception& e) {
        LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td.tableName;
      }
    } else {
      try {
        if (hasTableAccessPrivileges(&td, session_info)) {
          const auto col_descriptors =
              cat.getAllColumnMetadataForTable(td.tableId, false, true, false);
          const auto deleted_cd = cat.getDeletedColumn(&td);
          for (const auto cd : col_descriptors) {
            if (cd == deleted_cd) {
              continue;
            }
            col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
            col_names.push_back(cd->columnName);
          }
          num_cols = col_descriptors.size();
        } else {
          continue;
        }
      } catch (const std::runtime_error& e) {
        THROW_DB_EXCEPTION(e.what());
      }
    }

    ret.num_cols = num_cols;
    std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
    std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));

    _return.push_back(ret);
  }
}
2667 
// Thrift endpoint: returns metadata (names, column types, shard/row limits)
// for all tables and views visible to the session's user.
void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
                                const TSessionId& session_id_or_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  // An empty-query QueryState is needed so view SQL can be parsed in the impl.
  auto query_state = create_query_state(session_ptr, "");
  stdlog.setQueryState(query_state);

  auto execute_read_lock =
      // NOTE(review): initializer (original lines 2679-2681) is elided from
      // this listing; presumably acquires the global execute read lock.

  try {
    get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
}
2689 
2690 void DBHandler::get_users(std::vector<std::string>& user_names,
2691  const TSessionId& session_id_or_json) {
2692  heavyai::RequestInfo const request_info(session_id_or_json);
2693  SET_REQUEST_ID(request_info.requestId());
2694  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2695  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2696  auto session_ptr = stdlog.getConstSessionInfo();
2697  std::list<Catalog_Namespace::UserMetadata> user_list;
2698 
2699  if (!session_ptr->get_currentUser().isSuper) {
2700  user_list = SysCatalog::instance().getAllUserMetadata(
2701  session_ptr->getCatalog().getCurrentDB().dbId);
2702  } else {
2703  user_list = SysCatalog::instance().getAllUserMetadata();
2704  }
2705  for (auto u : user_list) {
2706  user_names.push_back(u.userName);
2707  }
2708 }
2709 
2710 void DBHandler::get_version(std::string& version) {
2711  version = MAPD_RELEASE;
2712 }
2713 
// Thrift endpoint: flushes GPU buffer-pool memory (and renderer GPU memory
// when rendering is enabled). Restricted to superusers.
void DBHandler::clear_gpu_memory(const TSessionId& session_id_or_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_DB_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
  }
  try {
    // NOTE(review): the actual GPU-memory clear call (original line 2724) is
    // elided from this listing.
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
  // Renderer keeps its own GPU allocations; flush those too when present.
  if (render_handler_) {
    render_handler_->clear_gpu_memory();
  }
}
2732 
// Thrift endpoint: flushes CPU buffer-pool memory (and renderer CPU memory
// when rendering is enabled). Restricted to superusers.
// NOTE(review): unlike clear_gpu_memory, this endpoint does not append the
// "client" name/value pair to the stdlog — confirm whether intentional.
void DBHandler::clear_cpu_memory(const TSessionId& session_id_or_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_DB_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
  }
  try {
    // NOTE(review): the actual CPU-memory clear call (original line 2742) is
    // elided from this listing.
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
  // Renderer keeps its own CPU allocations; flush those too when present.
  if (render_handler_) {
    render_handler_->clear_cpu_memory();
  }
}
2750 
2751 void DBHandler::clearRenderMemory(const TSessionId& session_id_or_json) {
2752  heavyai::RequestInfo const request_info(session_id_or_json);
2753  SET_REQUEST_ID(request_info.requestId());
2754  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2755  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2756  auto session_ptr = stdlog.getConstSessionInfo();
2757  if (!session_ptr->get_currentUser().isSuper) {
2758  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_render_memory");
2759  }
2760  if (render_handler_) {
2761  render_handler_->clear_cpu_memory();
2762  render_handler_->clear_gpu_memory();
2763  }
2764 }
2765 
// Internal API used between aggregator and leaves to manage query
// interruption in distributed mode: registers the parent session's query
// (identified by label/start time) with the executor's session tracker.
void DBHandler::set_cur_session(const TSessionId& parent_session_id_or_json,
                                const TSessionId& leaf_session_id_or_json,
                                const std::string& start_time_str,
                                const std::string& label,
                                bool for_running_query_kernel) {
  // internal API to manage query interruption in distributed mode
  heavyai::RequestInfo const parent_request_info(parent_session_id_or_json);
  heavyai::RequestInfo const leaf_request_info(leaf_session_id_or_json);
  SET_REQUEST_ID(leaf_request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(leaf_request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();

  // NOTE(review): the acquisition of `executor` (original line 2779) is
  // elided from this listing.
  executor->enrollQuerySession(parent_request_info.sessionId(),
                               label,
                               start_time_str,
                               // NOTE(review): one argument (original line
                               // 2783) is elided from this listing.
                               for_running_query_kernel
                                   ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
                                   : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
}
2788 
// Internal API (distributed mode): counterpart of set_cur_session — clears
// the parent session's query status from the executor's session tracker.
void DBHandler::invalidate_cur_session(const TSessionId& parent_session_id_or_json,
                                       const TSessionId& leaf_session_id_or_json,
                                       const std::string& start_time_str,
                                       const std::string& label,
                                       bool for_running_query_kernel) {
  // internal API to manage query interruption in distributed mode
  heavyai::RequestInfo const parent_request_info(parent_session_id_or_json);
  heavyai::RequestInfo const leaf_request_info(leaf_session_id_or_json);
  SET_REQUEST_ID(leaf_request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(leaf_request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  // NOTE(review): the acquisition of `executor` (original line 2800) is
  // elided from this listing.
  executor->clearQuerySessionStatus(parent_request_info.sessionId(), start_time_str);
}
2803 
  // NOTE(review): the enclosing function's signature (original line 2804) is
  // elided from this listing; visibly it just returns the invalid-session
  // sentinel — confirm against version control.
  return INVALID_SESSION_ID;
}
2807 
2808 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2809  const TSessionId& session_id_or_json,
2810  const std::string& memory_level) {
2811  heavyai::RequestInfo const request_info(session_id_or_json);
2812  SET_REQUEST_ID(request_info.requestId());
2813  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2814  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2815  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2816  if (!memory_level.compare("gpu")) {
2817  internal_memory =
2818  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2819  } else {
2820  internal_memory =
2821  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2822  }
2823 
2824  for (auto memInfo : internal_memory) {
2825  TNodeMemoryInfo nodeInfo;
2826  nodeInfo.page_size = memInfo.pageSize;
2827  nodeInfo.max_num_pages = memInfo.maxNumPages;
2828  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2829  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2830  for (auto gpu : memInfo.nodeMemoryData) {
2831  TMemoryData md;
2832  md.slab = gpu.slabNum;
2833  md.start_page = gpu.startPage;
2834  md.num_pages = gpu.numPages;
2835  md.touch = gpu.touch;
2836  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2837  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2838  nodeInfo.node_memory_data.push_back(md);
2839  }
2840  _return.push_back(nodeInfo);
2841  }
2842 }
2843 
// Thrift endpoint: lists the databases (name + owner) visible to the
// session's user.
void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos,
                              const TSessionId& session_id_or_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  const auto& user = session_ptr->get_currentUser();
  // NOTE(review): the declaration of `dbs` (original line 2852) is elided
  // from this listing; it receives the result of the call below.
      SysCatalog::instance().getDatabaseListForUser(user);
  for (auto& db : dbs) {
    TDBInfo dbinfo;
    // `dbs` is local, so the strings can be moved rather than copied.
    dbinfo.db_name = std::move(db.dbName);
    dbinfo.db_owner = std::move(db.dbOwnerName);
    dbinfos.push_back(std::move(dbinfo));
  }
}
2861 
// Maps the session's executor device type onto the Thrift TExecuteMode enum.
TExecuteMode::type DBHandler::getExecutionMode(const TSessionId& session_id) {
  auto executor = get_session_ptr(session_id)->get_executor_device_type();
  switch (executor) {
    // NOTE(review): case label (original line 2865) elided from this
    // listing — presumably the CPU device type.
      return TExecuteMode::CPU;
    // NOTE(review): case label (original line 2867) elided from this
    // listing — presumably the GPU device type.
      return TExecuteMode::GPU;
    default:
      UNREACHABLE();
  }
  UNREACHABLE();
  return TExecuteMode::CPU;  // not reached; keeps compilers happy about a return
}
2875 void DBHandler::set_execution_mode(const TSessionId& session_id_or_json,
2876  const TExecuteMode::type mode) {
2877  heavyai::RequestInfo const request_info(session_id_or_json);
2878  SET_REQUEST_ID(request_info.requestId());
2879  auto session_ptr = get_session_ptr(request_info.sessionId());
2880  auto stdlog = STDLOG(session_ptr);
2881  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2882  DBHandler::set_execution_mode_nolock(session_ptr.get(), mode);
2883 }
2884 
2885 namespace {
2886 
// NOTE(review): the function header (original line 2887) is elided from this
// listing; the visible body rejects importing directly into a sharded table
// on a leaf node (rows must be routed through an aggregator).
  if (td && td->nShards) {
    throw std::runtime_error("Cannot import a sharded table directly to a leaf");
  }
}
2892 
2893 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
2894  const std::vector<std::string>& column_names) {
2895  std::unordered_set<std::string> unique_names;
2896  for (const auto& name : column_names) {
2897  auto lower_name = to_lower(name);
2898  if (unique_names.find(lower_name) != unique_names.end()) {
2899  THROW_DB_EXCEPTION("Column " + name + " is mentioned multiple times");
2900  } else {
2901  unique_names.insert(lower_name);
2902  }
2903  }
2904  for (const auto& cd : descs) {
2905  auto iter = unique_names.find(to_lower(cd->columnName));
2906  if (iter != unique_names.end()) {
2907  unique_names.erase(iter);
2908  }
2909  }
2910  if (!unique_names.empty()) {
2911  THROW_DB_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
2912  }
2913 }
2914 
2915 // Return vector of IDs mapping column descriptors to the list of comumn names.
2916 // The size of the vector is the number of actual columns (geophisical columns excluded).
2917 // ID is either a position in column_names matching the descriptor, or -1 if the column
2918 // is missing from the column_names
2919 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
2920  const std::vector<std::string>& column_names) {
2921  std::vector<int> desc_to_column_ids;
2922  if (column_names.empty()) {
2923  int col_idx = 0;
2924  for (const auto& cd : descs) {
2925  if (!cd->isGeoPhyCol) {
2926  desc_to_column_ids.push_back(col_idx);
2927  ++col_idx;
2928  }
2929  }
2930  } else {
2931  for (const auto& cd : descs) {
2932  if (!cd->isGeoPhyCol) {
2933  bool found = false;
2934  for (size_t j = 0; j < column_names.size(); ++j) {
2935  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
2936  found = true;
2937  desc_to_column_ids.push_back(j);
2938  break;
2939  }
2940  }
2941  if (!found) {
2942  if (!cd->columnType.get_notnull()) {
2943  desc_to_column_ids.push_back(-1);
2944  } else {
2945  THROW_DB_EXCEPTION("Column '" + cd->columnName +
2946  "' cannot be omitted due to NOT NULL constraint");
2947  }
2948  }
2949  }
2950  }
2951  }
2952  return desc_to_column_ids;
2953 }
2954 
2955 } // namespace
2956 
// NOTE(review): the function header line (original 2957) is elided from this
// listing; presumably `void DBHandler::fillGeoColumns(`.
// Parses the geo strings (WKT or WKB hex) already buffered for geo column
// `cd` (at import_buffers[col_idx - 1]) into coords/bounds/ring buffers,
// optionally assigns render groups for polygon columns, and fills the
// column's physical import buffers, advancing col_idx past them.
    const TSessionId& session_id,
    const Catalog& catalog,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
    const ColumnDescriptor* cd,
    size_t& col_idx,
    size_t num_rows,
    const std::string& table_name,
    bool assign_render_groups) {
  // The logical geo column sits immediately before its physical columns.
  auto geo_col_idx = col_idx - 1;
  const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
  std::vector<std::vector<double>> coords_column, bounds_column;
  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
  std::vector<int> render_groups_column;
  SQLTypeInfo ti = cd->columnType;
  // A row-count mismatch or any unparsable geometry aborts the load.
  if (num_rows != wkt_or_wkb_hex_column->size() ||
      !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
                                                  ti,
                                                  coords_column,
                                                  bounds_column,
                                                  ring_sizes_column,
                                                  poly_rings_column,
                                                  true)) {
    std::ostringstream oss;
    oss << "Invalid geometry in column " << cd->columnName;
    THROW_DB_EXCEPTION(oss.str());
  }

  // start or continue assigning render groups for poly columns?
  if (IS_GEO_POLY(cd->columnType.get_type()) && assign_render_groups &&
      // NOTE(review): the remainder of this condition (original line 2987) is
      // elided from this listing.
    // get RGA to use
    import_export::RenderGroupAnalyzer* render_group_analyzer{};
    {
      // mutex the map access
      std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);

      // emplace new RGA or fetch existing RGA from map
      auto [itr_table, emplaced_table] = render_group_assignment_map_.try_emplace(
          session_id, RenderGroupAssignmentTableMap());
      LOG_IF(INFO, emplaced_table)
          << "load_table_binary_columnar_polys: Creating Render Group Assignment "
             "Persistent Data for Session '"
          << session_id << "'";
      auto [itr_column, emplaced_column] =
          itr_table->second.try_emplace(table_name, RenderGroupAssignmentColumnMap());
      LOG_IF(INFO, emplaced_column)
          << "load_table_binary_columnar_polys: Creating Render Group Assignment "
             "Persistent Data for Table '"
          << table_name << "'";
      auto [itr_analyzer, emplaced_analyzer] = itr_column->second.try_emplace(
          cd->columnName, std::make_unique<import_export::RenderGroupAnalyzer>());
      LOG_IF(INFO, emplaced_analyzer)
          << "load_table_binary_columnar_polys: Creating Render Group Assignment "
             "Persistent Data for Column '"
          << cd->columnName << "'";
      render_group_analyzer = itr_analyzer->second.get();
      CHECK(render_group_analyzer);

      // seed new RGA from existing table/column, to handle appends
      if (emplaced_analyzer) {
        LOG(INFO) << "load_table_binary_columnar_polys: Seeding Render Groups from "
                     "existing table...";
        render_group_analyzer->seedFromExistingTableContents(
            catalog, table_name, cd->columnName);
        LOG(INFO) << "load_table_binary_columnar_polys: Done";
      }
    }

    // assign render groups for this set of bounds
    LOG(INFO) << "load_table_binary_columnar_polys: Assigning Render Groups...";
    render_groups_column.reserve(bounds_column.size());
    for (auto const& bounds : bounds_column) {
      CHECK_EQ(bounds.size(), 4u);
      int rg = render_group_analyzer->insertBoundsAndReturnRenderGroup(bounds);
      render_groups_column.push_back(rg);
    }
    LOG(INFO) << "load_table_binary_columnar_polys: Done";
  } else {
    // render groups all zero
    render_groups_column.resize(bounds_column.size(), 0);
  }

  // Populate physical columns, advance col_idx
  // NOTE(review): the callee of this call (original line 3041) is elided from
  // this listing; it consumes the parsed geo buffers below.
      cd,
      import_buffers,
      col_idx,
      coords_column,
      bounds_column,
      ring_sizes_column,
      poly_rings_column,
      render_groups_column);
}
3051 
// NOTE(review): the function header line (original 3052) is elided from this
// listing; presumably `void DBHandler::fillMissingBuffers(`.
// For each table column the client omitted (desc_id_to_column_id[i] == -1),
// fills its import buffer with num_rows default values — including the geo
// physical columns for geometry types — so every buffer ends up aligned.
    const TSessionId& session_id,
    const Catalog& catalog,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
    const std::list<const ColumnDescriptor*>& cds,
    const std::vector<int>& desc_id_to_column_id,
    size_t num_rows,
    const std::string& table_name,
    bool assign_render_groups) {
  size_t skip_physical_cols = 0;
  size_t col_idx = 0, import_idx = 0;
  for (const auto& cd : cds) {
    if (skip_physical_cols > 0) {
      // Geo physical columns were handled with their logical geo column.
      CHECK(cd->isGeoPhyCol);
      skip_physical_cols--;
      continue;
    } else if (cd->columnType.is_geometry()) {
      skip_physical_cols = cd->columnType.get_physical_cols();
    }
    if (desc_id_to_column_id[import_idx] == -1) {
      // Column omitted by the client: synthesize default values.
      import_buffers[col_idx]->addDefaultValues(cd, num_rows);
      col_idx++;
      if (cd->columnType.is_geometry()) {
        // Parse the defaulted geo strings into the physical column buffers.
        fillGeoColumns(session_id,
                       catalog,
                       import_buffers,
                       cd,
                       col_idx,
                       num_rows,
                       table_name,
                       assign_render_groups);
      }
    } else {
      // Column was supplied by the client; its buffers are already filled.
      col_idx++;
      col_idx += skip_physical_cols;
    }
    import_idx++;
  }
}
3091 
// Thrift endpoint: row-wise binary load of `rows` into `table_name`.
// `column_names`, when non-empty, restricts/reorders the target columns;
// otherwise values map positionally. Rows that fail conversion are logged
// and discarded; the remainder is still loaded.
void DBHandler::load_table_binary(const TSessionId& session_id_or_json,
                                  const std::string& table_name,
                                  const std::vector<TRow>& rows,
                                  const std::vector<std::string>& column_names) {
  try {
    heavyai::RequestInfo const request_info(session_id_or_json);
    SET_REQUEST_ID(request_info.requestId());
    auto stdlog =
        STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
    stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
    auto session_ptr = stdlog.getConstSessionInfo();

    if (rows.empty()) {
      THROW_DB_EXCEPTION("No rows to insert");
    }

    const auto execute_read_lock =
        // NOTE(review): initializer (original lines 3109-3111) is elided from
        // this listing; presumably acquires the global execute read lock.
    std::unique_ptr<import_export::Loader> loader;
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
    // Validates table/columns, takes a schema read lock, and creates the
    // loader plus one import buffer per column.
    auto schema_read_lock = prepare_loader_generic(*session_ptr,
                                                   table_name,
                                                   rows.front().cols.size(),
                                                   &loader,
                                                   &import_buffers,
                                                   column_names,
                                                   "load_table_binary");

    auto col_descs = loader->get_column_descs();
    // Maps each descriptor to its index in column_names (-1 = omitted).
    auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);

    size_t rows_completed = 0;
    for (auto const& row : rows) {
      size_t col_idx = 0;
      try {
        for (auto cd : col_descs) {
          auto mapped_idx = desc_id_to_column_id[col_idx];
          if (mapped_idx != -1) {
            import_buffers[col_idx]->add_value(
                cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
          }
          col_idx++;
        }
        rows_completed++;
      } catch (const std::exception& e) {
        // Roll back the partially-added row and continue with the rest.
        for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
          import_buffers[col_idx_to_pop]->pop_value();
        }
        LOG(ERROR) << "Input exception thrown: " << e.what()
                   << ". Row discarded, issue at column : " << (col_idx + 1)
                   << " data :" << row;
      }
    }
    // Back-fill columns the client omitted so all buffers are row-aligned.
    fillMissingBuffers(request_info.sessionId(),
                       session_ptr->getCatalog(),
                       import_buffers,
                       col_descs,
                       desc_id_to_column_id,
                       rows_completed,
                       table_name,
                       false);
    auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
        session_ptr->getCatalog(), table_name);
    // NOTE(review): passes rows.size() rather than rows_completed; the two
    // differ when rows were discarded above — confirm intended.
    if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
      THROW_DB_EXCEPTION(loader->getErrorMessage());
    }
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(std::string(e.what()));
  }
}
3164 
// Shared validation/setup for the load_table* endpoints: verifies column
// counts and names, takes a schema read lock on the target table, and
// creates the Loader plus one import buffer per column. Returns the schema
// lock container, which the caller must keep alive for the load's duration.
std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
// NOTE(review): the qualified function name line (original 3166) is elided
// from this listing; presumably `DBHandler::prepare_loader_generic(`.
    const Catalog_Namespace::SessionInfo& session_info,
    const std::string& table_name,
    size_t num_cols,
    std::unique_ptr<import_export::Loader>* loader,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
    const std::vector<std::string>& column_names,
    std::string load_type) {
  if (num_cols == 0) {
    THROW_DB_EXCEPTION("No columns to insert");
  }
  check_read_only(load_type);
  auto& cat = session_info.getCatalog();
  auto td_with_lock =
      std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
          // NOTE(review): original line 3181 is elided from this listing;
          // presumably the acquire call producing the lock container.
          cat, table_name, true));
  const auto td = (*td_with_lock)();
  CHECK(td);

  if (g_cluster && !leaf_aggregator_.leafCount()) {
    // Sharded table rows need to be routed to the leaf by an aggregator.
    // NOTE(review): original line 3188 is elided from this listing;
    // presumably the sharded-table guard call.
  }
  check_table_load_privileges(session_info, table_name);

  loader->reset(new import_export::Loader(cat, td));

  auto col_descs = (*loader)->get_column_descs();
  check_valid_column_names(col_descs, column_names);
  if (column_names.empty()) {
    // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
    // Subtracting 1 (rowid) until TableDescriptor is updated.
    auto geo_physical_cols = std::count_if(
        col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
    const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
                                (td->hasDeletedCol ? 2 : 1);
    if (num_cols != num_table_cols) {
      throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
                               ") does not match number of columns in table " +
                               td->tableName + " (" + std::to_string(num_table_cols) +
                               ")");
    }
  } else if (num_cols != column_names.size()) {
    // NOTE(review): original line 3210 is elided from this listing;
    // presumably `THROW_DB_EXCEPTION(` opening the message below.
        "Number of columns specified does not match the "
        "number of columns given (" +
        std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
  }

  *import_buffers = import_export::setup_column_loaders(td, loader->get());
  return std::move(td_with_lock);
}
3219 namespace {
3220 
3221 size_t get_column_size(const TColumn& column) {
3222  if (!column.nulls.empty()) {
3223  return column.nulls.size();
3224  } else {
3225  // it is a very bold estimate but later we check it against REAL data
3226  // and if this function returns a wrong result (e.g. both int and string
3227  // vectors are filled with values), we get an error
3228  return column.data.int_col.size() + column.data.arr_col.size() +
3229  column.data.real_col.size() + column.data.str_col.size();
3230  }
3231 }
3232 
3233 } // namespace
3234 
// Thrift endpoint: columnar binary load; thin wrapper over the internal
// columnar loader.
void DBHandler::load_table_binary_columnar(const TSessionId& session_id_or_json,
                                           const std::string& table_name,
                                           const std::vector<TColumn>& cols,
                                           const std::vector<std::string>& column_names) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  // NOTE(review): the callee line (original 3241) is elided from this
  // listing; presumably loadTableBinaryColumnarInternal(request_info.sessionId(),
                                     table_name,
                                     cols,
                                     column_names,
  // NOTE(review): the trailing render-group-mode argument (original line
  // 3245) is elided from this listing.
}
3247 
// NOTE(review): the function header line (original 3248) is elided from this
// listing; presumably `void DBHandler::load_table_binary_columnar_polys(`.
// Columnar load variant that controls render-group assignment for polygon
// columns via `assign_render_groups`.
    const TSessionId& session_id_or_json,
    const std::string& table_name,
    const std::vector<TColumn>& cols,
    const std::vector<std::string>& column_names,
    const bool assign_render_groups) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  // NOTE(review): the callee line (original 3256) is elided from this
  // listing; presumably loadTableBinaryColumnarInternal(...,
                                     table_name,
                                     cols,
                                     column_names,
                                     assign_render_groups
  // NOTE(review): the mode-selection continuation (original lines 3261-3262)
  // is elided from this listing.
}
3264 
// NOTE(review): the function header line (original 3265) is elided from this
// listing; presumably `void DBHandler::loadTableBinaryColumnarInternal(`.
// Core columnar load: validates the session/table, adds each supplied
// TColumn into its matching import buffer (filling geo physical buffers
// along the way), back-fills omitted columns, then runs the loader under
// the insert-data write lock. kCleanUp mode only drops this session's
// render-group assignment state for the table and returns.
    const TSessionId& session_id,
    const std::string& table_name,
    const std::vector<TColumn>& cols,
    const std::vector<std::string>& column_names,
    const AssignRenderGroupsMode assign_render_groups_mode) {
  auto stdlog = STDLOG(get_session_ptr(session_id), "table_name", table_name);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();

  if (assign_render_groups_mode == AssignRenderGroupsMode::kCleanUp) {
    // throw if the user tries to pass column data on a clean-up
    if (cols.size()) {
      // NOTE(review): original line 3278 is elided from this listing;
      // presumably `THROW_DB_EXCEPTION(` opening the message below.
          "load_table_binary_columnar_polys: Column data must be empty when called with "
          "assign_render_groups = false");
    }

    // mutex the map access
    std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);

    // drop persistent render group assignment data for this session and table
    // keep the per-session map in case other tables are active (ideally not)
    auto itr_session = render_group_assignment_map_.find(session_id);
    if (itr_session != render_group_assignment_map_.end()) {
      LOG(INFO) << "load_table_binary_columnar_polys: Cleaning up Render Group "
                   "Assignment Persistent Data for Session '"
                << session_id << "', Table '" << table_name << "'";
      itr_session->second.erase(table_name);
    }

    // just doing clean-up, so we're done
    return;
  }

  const auto execute_read_lock =
      // NOTE(review): initializer (original lines 3301-3303) is elided from
      // this listing; presumably acquires the global execute read lock.
  std::unique_ptr<import_export::Loader> loader;
  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
  // Validates table/columns, takes a schema read lock, and creates the
  // loader plus one import buffer per column.
  auto schema_read_lock = prepare_loader_generic(*session_ptr,
                                                 table_name,
                                                 cols.size(),
                                                 &loader,
                                                 &import_buffers,
                                                 column_names,
                                                 "load_table_binary_columnar");

  // Maps each descriptor to its index in column_names (-1 = omitted).
  auto desc_id_to_column_id =
      column_ids_by_names(loader->get_column_descs(), column_names);
  size_t num_rows = get_column_size(cols.front());
  size_t import_idx = 0;  // index into the TColumn vector being loaded
  size_t col_idx = 0;     // index into column description vector
  try {
    size_t skip_physical_cols = 0;
    for (auto cd : loader->get_column_descs()) {
      if (skip_physical_cols > 0) {
        // Geo physical columns were produced by fillGeoColumns below.
        CHECK(cd->isGeoPhyCol);
        skip_physical_cols--;
        continue;
      }
      auto mapped_idx = desc_id_to_column_id[import_idx];
      if (mapped_idx != -1) {
        size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
        if (col_rows != num_rows) {
          // Every column must contribute exactly the same number of rows.
          std::ostringstream oss;
          oss << "load_table_binary_columnar: Inconsistent number of rows in column "
              << cd->columnName << " , expecting " << num_rows << " rows, column "
              << col_idx << " has " << col_rows << " rows";
          THROW_DB_EXCEPTION(oss.str());
        }
        // Advance to the next column in the table
        col_idx++;
        // For geometry columns: process WKT strings and fill physical columns
        if (cd->columnType.is_geometry()) {
          fillGeoColumns(session_id,
                         session_ptr->getCatalog(),
                         import_buffers,
                         cd,
                         col_idx,
                         num_rows,
                         table_name,
                         assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
          skip_physical_cols = cd->columnType.get_physical_cols();
        }
      } else {
        // Omitted column: its buffers are back-filled by fillMissingBuffers.
        col_idx++;
        if (cd->columnType.is_geometry()) {
          skip_physical_cols = cd->columnType.get_physical_cols();
          col_idx += skip_physical_cols;
        }
      }
      // Advance to the next column of values being loaded
      import_idx++;
    }
  } catch (const std::exception& e) {
    std::ostringstream oss;
    oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
        << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
    THROW_DB_EXCEPTION(oss.str());
  }
  fillMissingBuffers(session_id,
                     session_ptr->getCatalog(),
                     import_buffers,
                     loader->get_column_descs(),
                     desc_id_to_column_id,
                     num_rows,
                     table_name,
                     assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
      session_ptr->getCatalog(), table_name);
  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
    THROW_DB_EXCEPTION(loader->getErrorMessage());
  }
}
3381 
3382 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
3383 
// Evaluates an arrow::Status-producing expression once; on failure, logs the
// status and rethrows it as a Thrift TDBException.
// Fix: log `_s` (the saved status), not `s` — logging `s.ToString()`
// re-evaluated the macro argument a second time, repeating any side effects
// (e.g. another ReadNext call) on the error path.
#define ARROW_THRIFT_THROW_NOT_OK(s)  \
  do {                                \
    ::arrow::Status _s = (s);         \
    if (UNLIKELY(!_s.ok())) {         \
      TDBException ex;                \
      ex.error_msg = _s.ToString();   \
      LOG(ERROR) << _s.ToString();    \
      throw ex;                       \
    }                                 \
  } while (0)
3394 
3395 namespace {
3396 
3397 RecordBatchVector loadArrowStream(const std::string& stream) {
3398  RecordBatchVector batches;
3399  try {
3400  // TODO(wesm): Make this simpler in general, see ARROW-1600
3401  auto stream_buffer =
3402  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
3403  static_cast<int64_t>(stream.size()));
3404 
3405  arrow::io::BufferReader buf_reader(stream_buffer);
3406  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3407  ARROW_ASSIGN_OR_THROW(batch_reader,
3408  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3409 
3410  while (true) {
3411  std::shared_ptr<arrow::RecordBatch> batch;
3412  // Read batch (zero-copy) from the stream
3413  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
3414  if (batch == nullptr) {
3415  break;
3416  }
3417  batches.emplace_back(std::move(batch));
3418  }
3419  } catch (const std::exception& e) {
3420  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
3421  }
3422  return batches;
3423 }
3424 
3425 } // namespace
3426 
// Thrift endpoint: loads a single Arrow record batch into `table_name`.
// When use_column_names is set, the batch's schema field names select the
// target columns; otherwise values map positionally.
// NOTE(review): unlike the other load endpoints, this one does not append
// the "client" name/value pair to the stdlog — confirm whether intentional.
void DBHandler::load_table_binary_arrow(const TSessionId& session_id_or_json,
                                        const std::string& table_name,
                                        const std::string& arrow_stream,
                                        const bool use_column_names) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog =
      STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
  auto session_ptr = stdlog.getConstSessionInfo();

  RecordBatchVector batches = loadArrowStream(arrow_stream);
  // Assuming have one batch for now
  if (batches.size() != 1) {
    THROW_DB_EXCEPTION("Expected a single Arrow record batch. Import aborted");
  }

  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
  std::unique_ptr<import_export::Loader> loader;
  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
  std::vector<std::string> column_names;
  if (use_column_names) {
    column_names = batch->schema()->field_names();
  }
  const auto execute_read_lock =
      // NOTE(review): initializer (original lines 3451-3453) is elided from
      // this listing; presumably acquires the global execute read lock.
  auto schema_read_lock =
      prepare_loader_generic(*session_ptr,
                             table_name,
                             static_cast<size_t>(batch->num_columns()),
                             &loader,
                             &import_buffers,
                             column_names,
                             "load_table_binary_arrow");

  // Maps each descriptor to its index in column_names (-1 = omitted).
  auto desc_id_to_column_id =
      column_ids_by_names(loader->get_column_descs(), column_names);
  size_t num_rows = 0;
  size_t col_idx = 0;
  try {
    for (auto cd : loader->get_column_descs()) {
      auto mapped_idx = desc_id_to_column_id[col_idx];
      if (mapped_idx != -1) {
        auto& array = *batch->column(mapped_idx);
        import_export::ArraySliceRange row_slice(0, array.length());
        num_rows = import_buffers[col_idx]->add_arrow_values(
            cd, array, true, row_slice, nullptr);
      }
      col_idx++;
    }
  } catch (const std::exception& e) {
    LOG(ERROR) << "Input exception thrown: " << e.what()
               << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
    // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
    // other import paths
    THROW_DB_EXCEPTION(e.what());
  }
  // Back-fill columns the client omitted so all buffers are row-aligned.
  fillMissingBuffers(request_info.sessionId(),
                     session_ptr->getCatalog(),
                     import_buffers,
                     loader->get_column_descs(),
                     desc_id_to_column_id,
                     num_rows,
                     table_name,
                     false);
  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
      session_ptr->getCatalog(), table_name);
  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
    THROW_DB_EXCEPTION(loader->getErrorMessage());
  }
}
3499 
// Thrift endpoint: row-wise bulk load of string-valued rows into table_name.
//
// Each TStringRow carries one value per *logical* (user-visible) column; geo
// columns expand into extra physical columns, which are skipped during the
// per-row pass and batch-filled afterwards via fillGeoColumns. A non-empty
// column_names maps row fields to table columns by name; otherwise positional
// order is used (see column_ids_by_names). Throws TDBException on parse
// errors, permission failures, or loader failure.
//
// NOTE(review): this is a rendered listing; the initializer of
// execute_read_lock (a few lines) is not visible here.
void DBHandler::load_table(const TSessionId& session_id_or_json,
                           const std::string& table_name,
                           const std::vector<TStringRow>& rows,
                           const std::vector<std::string>& column_names) {
  try {
    heavyai::RequestInfo const request_info(session_id_or_json);
    SET_REQUEST_ID(request_info.requestId());
    auto stdlog =
        STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
    stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
    auto session_ptr = stdlog.getConstSessionInfo();

    if (rows.empty()) {
      THROW_DB_EXCEPTION("No rows to insert");
    }

    const auto execute_read_lock =
    std::unique_ptr<import_export::Loader> loader;
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
    // Resolves the table, checks privileges, and sizes loader/import_buffers
    // from the first row's column count.
    auto schema_read_lock =
        prepare_loader_generic(*session_ptr,
                               table_name,
                               static_cast<size_t>(rows.front().cols.size()),
                               &loader,
                               &import_buffers,
                               column_names,
                               "load_table");

    auto col_descs = loader->get_column_descs();
    // Entries of -1 mark table columns that the caller did not supply.
    auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
    import_export::CopyParams copy_params;
    size_t rows_completed = 0;
    for (auto const& row : rows) {
      size_t import_idx = 0;  // index into the TStringRow being loaded
      size_t col_idx = 0;     // index into column description vector
      try {
        size_t skip_physical_cols = 0;
        for (auto cd : col_descs) {
          if (skip_physical_cols > 0) {
            CHECK(cd->isGeoPhyCol);
            skip_physical_cols--;
            continue;
          }
          auto mapped_idx = desc_id_to_column_id[import_idx];
          if (mapped_idx != -1) {
            import_buffers[col_idx]->add_value(cd,
                                               row.cols[mapped_idx].str_val,
                                               row.cols[mapped_idx].is_null,
                                               copy_params);
          }
          col_idx++;
          if (cd->columnType.is_geometry()) {
            // physical geo columns will be filled separately lately
            skip_physical_cols = cd->columnType.get_physical_cols();
            col_idx += skip_physical_cols;
          }
          // Advance to the next field within the row
          import_idx++;
        }
        rows_completed++;
      } catch (const std::exception& e) {
        LOG(ERROR) << "Input exception thrown: " << e.what()
                   << ". Row discarded, issue at column : " << (col_idx + 1)
                   << " data :" << row;
        THROW_DB_EXCEPTION(std::string("Exception: ") + e.what());
      }
    }
    // do batch filling of geo columns separately
    if (rows.size() != 0) {
      const auto& row = rows[0];
      size_t col_idx = 0;  // index into column description vector
      try {
        size_t import_idx = 0;
        size_t skip_physical_cols = 0;
        for (auto cd : col_descs) {
          if (skip_physical_cols > 0) {
            skip_physical_cols--;
            continue;
          }
          auto mapped_idx = desc_id_to_column_id[import_idx];
          col_idx++;
          if (cd->columnType.is_geometry()) {
            skip_physical_cols = cd->columnType.get_physical_cols();
            if (mapped_idx != -1) {
              // Expand WKT/geo strings into the physical coordinate columns
              // for all rows at once.
              fillGeoColumns(request_info.sessionId(),
                             session_ptr->getCatalog(),
                             import_buffers,
                             cd,
                             col_idx,
                             rows_completed,
                             table_name,
                             false);
            } else {
              col_idx += skip_physical_cols;
            }
          }
          import_idx++;
        }
      } catch (const std::exception& e) {
        LOG(ERROR) << "Input exception thrown: " << e.what()
                   << ". Row discarded, issue at column : " << (col_idx + 1)
                   << " data :" << row;
        THROW_DB_EXCEPTION(e.what());
      }
    }
    // Fill buffers for table columns that were not mapped from the input.
    fillMissingBuffers(request_info.sessionId(),
                       session_ptr->getCatalog(),
                       import_buffers,
                       col_descs,
                       desc_id_to_column_id,
                       rows_completed,
                       table_name,
                       false);
    auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
        session_ptr->getCatalog(), table_name);
    if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
      THROW_DB_EXCEPTION(loader->getErrorMessage());
    }

  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(std::string(e.what()));
  }
}
3626 
3627 char DBHandler::unescape_char(std::string str) {
3628  char out = str[0];
3629  if (str.size() == 2 && str[0] == '\\') {
3630  if (str[1] == 't') {
3631  out = '\t';
3632  } else if (str[1] == 'n') {
3633  out = '\n';
3634  } else if (str[1] == '0') {
3635  out = '\0';
3636  } else if (str[1] == '\'') {
3637  out = '\'';
3638  } else if (str[1] == '\\') {
3639  out = '\\';
3640  }
3641  }
3642  return out;
3643 }
3644 
  // Body of the Thrift-to-engine conversion of copy parameters: translates a
  // client-supplied TCopyParams into import_export::CopyParams, unescaping
  // single-character delimiter/quote options and validating enum-like fields
  // (invalid values raise TDBException; impossible values hit CHECK(false)).
  //
  // NOTE(review): this is a rendered listing; the function signature line and
  // many case-body assignment lines (hyperlinked tokens) are not visible here.
  import_export::CopyParams copy_params;
  switch (cp.has_header) {
    case TImportHeaderRow::AUTODETECT:
      break;
    case TImportHeaderRow::NO_HEADER:
      break;
    case TImportHeaderRow::HAS_HEADER:
      break;
    default:
      CHECK(false);
  }
  copy_params.quoted = cp.quoted;
  // Empty strings mean "unset": delimiter falls back to NUL, the rest keep
  // the CopyParams defaults.
  if (cp.delimiter.length() > 0) {
    copy_params.delimiter = unescape_char(cp.delimiter);
  } else {
    copy_params.delimiter = '\0';
  }
  if (cp.null_str.length() > 0) {
    copy_params.null_str = cp.null_str;
  }
  if (cp.quote.length() > 0) {
    copy_params.quote = unescape_char(cp.quote);
  }
  if (cp.escape.length() > 0) {
    copy_params.escape = unescape_char(cp.escape);
  }
  if (cp.line_delim.length() > 0) {
    copy_params.line_delim = unescape_char(cp.line_delim);
  }
  if (cp.array_delim.length() > 0) {
    copy_params.array_delim = unescape_char(cp.array_delim);
  }
  if (cp.array_begin.length() > 0) {
    copy_params.array_begin = unescape_char(cp.array_begin);
  }
  if (cp.array_end.length() > 0) {
    copy_params.array_end = unescape_char(cp.array_end);
  }
  if (cp.threads != 0) {
    copy_params.threads = cp.threads;
  }
  if (cp.s3_access_key.length() > 0) {
    copy_params.s3_access_key = cp.s3_access_key;
  }
  if (cp.s3_secret_key.length() > 0) {
    copy_params.s3_secret_key = cp.s3_secret_key;
  }
  if (cp.s3_session_token.length() > 0) {
    copy_params.s3_session_token = cp.s3_session_token;
  }
  if (cp.s3_region.length() > 0) {
    copy_params.s3_region = cp.s3_region;
  }
  if (cp.s3_endpoint.length() > 0) {
    copy_params.s3_endpoint = cp.s3_endpoint;
  }
#ifdef HAVE_AWS_S3
  // When the client supplied no S3 credentials at all, optionally fall back
  // to the server's AWS credential chain (env vars, profile, IAM role).
  if (g_allow_s3_server_privileges && cp.s3_access_key.length() == 0 &&
      cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
    const auto& server_credentials =
        Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
    copy_params.s3_access_key = server_credentials.GetAWSAccessKeyId();
    copy_params.s3_secret_key = server_credentials.GetAWSSecretKey();
    copy_params.s3_session_token = server_credentials.GetSessionToken();
  }
#endif

  switch (cp.source_type) {
    case TSourceType::DELIMITED_FILE:
      break;
    case TSourceType::GEO_FILE:
      break;
    case TSourceType::PARQUET_FILE:
#ifdef ENABLE_IMPORT_PARQUET
      break;
#else
      THROW_DB_EXCEPTION("Parquet not supported");
#endif
    case TSourceType::ODBC:
      THROW_DB_EXCEPTION("ODBC source not supported");
    case TSourceType::RASTER_FILE:
      break;
    default:
      CHECK(false);
  }

  switch (cp.geo_coords_encoding) {
    case TEncodingType::GEOINT:
      copy_params.geo_coords_encoding = kENCODING_GEOINT;
      break;
    case TEncodingType::NONE:
      copy_params.geo_coords_encoding = kENCODING_NONE;
      break;
    default:
      THROW_DB_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
                         std::to_string((int)cp.geo_coords_encoding));
  }
  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
  switch (cp.geo_coords_type) {
    case TDatumType::GEOGRAPHY:
      copy_params.geo_coords_type = kGEOGRAPHY;
      break;
    case TDatumType::GEOMETRY:
      copy_params.geo_coords_type = kGEOMETRY;
      break;
    default:
      THROW_DB_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
                         std::to_string((int)cp.geo_coords_type));
  }
  // Only the well-known web/geographic SRIDs are accepted.
  switch (cp.geo_coords_srid) {
    case 4326:
    case 3857:
    case 900913:
      copy_params.geo_coords_srid = cp.geo_coords_srid;
      break;
    default:
      THROW_DB_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
                         std::to_string((int)cp.geo_coords_srid));
  }
  copy_params.sanitize_column_names = cp.sanitize_column_names;
  copy_params.geo_layer_name = cp.geo_layer_name;
  // Render-group assignment can be globally disabled by server flag.
  copy_params.geo_assign_render_groups =
      cp.geo_assign_render_groups && g_enable_assign_render_groups;
  copy_params.geo_explode_collections = cp.geo_explode_collections;
  copy_params.source_srid = cp.source_srid;
  switch (cp.raster_point_type) {
    case TRasterPointType::NONE:
      break;
    case TRasterPointType::AUTO:
      break;
    case TRasterPointType::SMALLINT:
      break;
    case TRasterPointType::INT:
      break;
    case TRasterPointType::FLOAT:
      break;
    case TRasterPointType::DOUBLE:
      break;
    case TRasterPointType::POINT:
      break;
    default:
      CHECK(false);
  }
  copy_params.raster_import_bands = cp.raster_import_bands;
  if (cp.raster_scanlines_per_thread < 0) {
    THROW_DB_EXCEPTION("Invalid raster_scanlines_per_thread in TCopyParams (" +
                       std::to_string((int)cp.raster_scanlines_per_thread));
  } else {
    copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
  }
  switch (cp.raster_point_transform) {
    case TRasterPointTransform::NONE:
      break;
    case TRasterPointTransform::AUTO:
      break;
    case TRasterPointTransform::FILE:
      break;
    case TRasterPointTransform::WORLD:
      break;
    default:
      CHECK(false);
  }
  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
  copy_params.dsn = cp.odbc_dsn;
  copy_params.connection_string = cp.odbc_connection_string;
  copy_params.sql_select = cp.odbc_sql_select;
  copy_params.sql_order_by = cp.odbc_sql_order_by;
  copy_params.username = cp.odbc_username;
  copy_params.password = cp.odbc_password;
  copy_params.credential_string = cp.odbc_credential_string;
  copy_params.add_metadata_columns = cp.add_metadata_columns;
  copy_params.trim_spaces = cp.trim_spaces;
  return copy_params;
}
3839 
  // Body of the engine-to-Thrift conversion of copy parameters: mirrors
  // thrift_to_copyparams, mapping import_export::CopyParams fields back to a
  // TCopyParams for the client.
  //
  // NOTE(review): this is a rendered listing; the function signature line and
  // the case labels of several switches (hyperlinked enum tokens) are not
  // visible here.
  TCopyParams copy_params;
  copy_params.delimiter = cp.delimiter;
  copy_params.null_str = cp.null_str;
  switch (cp.has_header) {
      copy_params.has_header = TImportHeaderRow::AUTODETECT;
      break;
      copy_params.has_header = TImportHeaderRow::NO_HEADER;
      break;
      copy_params.has_header = TImportHeaderRow::HAS_HEADER;
      break;
    default:
      CHECK(false);
  }
  copy_params.quoted = cp.quoted;
  copy_params.quote = cp.quote;
  copy_params.escape = cp.escape;
  copy_params.line_delim = cp.line_delim;
  copy_params.array_delim = cp.array_delim;
  copy_params.array_begin = cp.array_begin;
  copy_params.array_end = cp.array_end;
  copy_params.threads = cp.threads;
  copy_params.s3_access_key = cp.s3_access_key;
  copy_params.s3_secret_key = cp.s3_secret_key;
  copy_params.s3_session_token = cp.s3_session_token;
  copy_params.s3_region = cp.s3_region;
  copy_params.s3_endpoint = cp.s3_endpoint;
  switch (cp.source_type) {
      copy_params.source_type = TSourceType::DELIMITED_FILE;
      break;
      copy_params.source_type = TSourceType::GEO_FILE;
      break;
      copy_params.source_type = TSourceType::PARQUET_FILE;
      break;
      copy_params.source_type = TSourceType::RASTER_FILE;
      break;
      copy_params.source_type = TSourceType::ODBC;
      break;
    default:
      CHECK(false);
  }
  // Anything other than GEOINT compression reports as NONE.
  switch (cp.geo_coords_encoding) {
    case kENCODING_GEOINT:
      copy_params.geo_coords_encoding = TEncodingType::GEOINT;
      break;
    default:
      copy_params.geo_coords_encoding = TEncodingType::NONE;
      break;
  }
  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
  switch (cp.geo_coords_type) {
    case kGEOGRAPHY:
      copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
      break;
    case kGEOMETRY:
      copy_params.geo_coords_type = TDatumType::GEOMETRY;
      break;
    default:
      CHECK(false);
  }
  copy_params.geo_coords_srid = cp.geo_coords_srid;
  copy_params.sanitize_column_names = cp.sanitize_column_names;
  copy_params.geo_layer_name = cp.geo_layer_name;
  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
  copy_params.geo_explode_collections = cp.geo_explode_collections;
  copy_params.source_srid = cp.source_srid;
  switch (cp.raster_point_type) {
      copy_params.raster_point_type = TRasterPointType::NONE;
      break;
      copy_params.raster_point_type = TRasterPointType::AUTO;
      break;
      copy_params.raster_point_type = TRasterPointType::SMALLINT;
      break;
      copy_params.raster_point_type = TRasterPointType::INT;
      break;
      copy_params.raster_point_type = TRasterPointType::FLOAT;
      break;
      copy_params.raster_point_type = TRasterPointType::DOUBLE;
      break;
      copy_params.raster_point_type = TRasterPointType::POINT;
      break;
    default:
      CHECK(false);
  }
  copy_params.raster_import_bands = cp.raster_import_bands;
  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
  switch (cp.raster_point_transform) {
      copy_params.raster_point_transform = TRasterPointTransform::NONE;
      break;
      copy_params.raster_point_transform = TRasterPointTransform::AUTO;
      break;
      copy_params.raster_point_transform = TRasterPointTransform::FILE;
      break;
      copy_params.raster_point_transform = TRasterPointTransform::WORLD;
      break;
    default:
      CHECK(false);
  }
  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
  copy_params.odbc_dsn = cp.dsn;
  copy_params.odbc_connection_string = cp.connection_string;
  copy_params.odbc_sql_select = cp.sql_select;
  copy_params.odbc_sql_order_by = cp.sql_order_by;
  copy_params.odbc_username = cp.username;
  copy_params.odbc_password = cp.password;
  copy_params.odbc_credential_string = cp.credential_string;
  copy_params.add_metadata_columns = cp.add_metadata_columns;
  copy_params.trim_spaces = cp.trim_spaces;
  return copy_params;
}
3970 
3971 namespace {
// Prepend the matching GDAL network virtual-file-system prefix for remote
// paths: /vsicurl/ for HTTP(S) URLs, /vsis3/ for s3:// URLs. Local paths are
// left untouched. Raises an error when GDAL lacks network support.
// NOTE(review): the THROW_DB_EXCEPTION( openers of the two error statements
// are not visible in this rendered listing.
void add_vsi_network_prefix(std::string& path) {
  // do we support network file access?
  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();

  // modify head of filename based on source location
  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
    if (!gdal_network) {
          "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
    }
    // invoke GDAL CURL virtual file reader
    path = "/vsicurl/" + path;
  } else if (boost::istarts_with(path, "s3://")) {
    if (!gdal_network) {
          "S3 geo file import not supported! Update to GDAL 2.2 or later!");
    }
    // invoke GDAL S3 virtual file reader
    boost::replace_first(path, "s3://", "/vsis3/");
  }
}
3993 
3994 void add_vsi_geo_prefix(std::string& path) {
3995  // single gzip'd file (not an archive)?
3996  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3997  path = "/vsigzip/" + path;
3998  }
3999 }
4000 
4001 void add_vsi_archive_prefix(std::string& path) {
4002  // check for compressed file or file bundle
4003  if (boost::iends_with(path, ".zip")) {
4004  // zip archive
4005  path = "/vsizip/" + path;
4006  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
4007  boost::iends_with(path, ".tar.gz")) {
4008  // tar archive (compressed or uncompressed)
4009  path = "/vsitar/" + path;
4010  }
4011 }
4012 
4013 std::string remove_vsi_prefixes(const std::string& path_in) {
4014  std::string path(path_in);
4015 
4016  // these will be first
4017  if (boost::istarts_with(path, "/vsizip/")) {
4018  boost::replace_first(path, "/vsizip/", "");
4019  } else if (boost::istarts_with(path, "/vsitar/")) {
4020  boost::replace_first(path, "/vsitar/", "");
4021  } else if (boost::istarts_with(path, "/vsigzip/")) {
4022  boost::replace_first(path, "/vsigzip/", "");
4023  }
4024 
4025  // then these
4026  if (boost::istarts_with(path, "/vsicurl/")) {
4027  boost::replace_first(path, "/vsicurl/", "");
4028  } else if (boost::istarts_with(path, "/vsis3/")) {
4029  boost::replace_first(path, "/vsis3/", "s3://");
4030  }
4031 
4032  return path;
4033 }
4034 
4035 bool path_is_relative(const std::string& path) {
4036  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
4037  boost::istarts_with(path, "https://")) {
4038  return false;
4039  }
4040  return !boost::filesystem::path(path).is_absolute();
4041 }
4042 
4043 bool path_has_valid_filename(const std::string& path) {
4044  auto filename = boost::filesystem::path(path).filename().string();
4045  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
4046  return false;
4047  }
4048  return true;
4049 }
4050 
4051 bool is_a_supported_geo_file(const std::string& path) {
4052  if (!path_has_valid_filename(path)) {
4053  return false;
4054  }
4055  // this is now just for files that we want to recognize
4056  // as geo when inside an archive (see below)
4057  // @TODO(se) make this more flexible?
4058  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
4059  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
4060  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
4061  boost::iends_with(path, ".gdb.zip") || boost::iends_with(path, ".fgb")) {
4062  return true;
4063  }
4064  return false;
4065 }
4066 
4067 bool is_a_supported_archive_file(const std::string& path) {
4068  if (!path_has_valid_filename(path)) {
4069  return false;
4070  }
4071  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
4072  return true;
4073  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
4074  boost::iends_with(path, ".tar.gz")) {
4075  return true;
4076  }
4077  return false;
4078 }
4079 
4080 std::string find_first_geo_file_in_archive(const std::string& archive_path,
4081  const import_export::CopyParams& copy_params) {
4082  // get the recursive list of all files in the archive
4083  std::vector<std::string> files =
4084  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
4085 
4086  // report the list
4087  LOG(INFO) << "Found " << files.size() << " files in Archive "
4088  << remove_vsi_prefixes(archive_path);
4089  for (const auto& file : files) {
4090  LOG(INFO) << " " << file;
4091  }
4092 
4093  // scan the list for the first candidate file
4094  bool found_suitable_file = false;
4095  std::string file_name;
4096  for (const auto& file : files) {
4097  if (is_a_supported_geo_file(file)) {
4098  file_name = file;
4099  found_suitable_file = true;
4100  break;
4101  }
4102  }
4103 
4104  // if we didn't find anything
4105  if (!found_suitable_file) {
4106  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
4107  remove_vsi_prefixes(archive_path);
4108  file_name.clear();
4109  }
4110 
4111  // done
4112  return file_name;
4113 }
4114 
4115 bool is_local_file(const std::string& file_path) {
4116  return (!boost::istarts_with(file_path, "s3://") &&
4117  !boost::istarts_with(file_path, "http://") &&
4118  !boost::istarts_with(file_path, "https://"));
4119 }
4120 
// For local (non-remote) paths only, validate that the file is an allowed
// import location before it is used.
// NOTE(review): the opener of the validation call (presumably
// ddl_utils::validate_allowed_file_path) is not visible in this rendered
// listing.
void validate_import_file_path_if_local(const std::string& file_path) {
  if (is_local_file(file_path)) {
        file_path, ddl_utils::DataTransferType::IMPORT, true);
  }
}
4127 } // namespace
4128 
// Thrift endpoint: inspect an import source and report detected column
// types, suggested copy params, and a sample of rows in _return.
//
// The first section normalizes the path: relative paths are rebased under
// data_path/import/<hashed session>, geo/raster paths get GDAL VSI prefixes
// (network/archive/gzip), and existence is verified. The second section runs
// the appropriate detector: import_export::Detector for delimited (and,
// when enabled, Parquet) sources, or GDAL metadata inspection for geo and
// raster sources.
//
// NOTE(review): this is a rendered listing; several lines are not visible
// here, including the initialization of copy_params (from thrift_to_copyparams),
// parts of compound conditions on source_type, and some call openers/arguments
// in the sampling code below.
void DBHandler::detect_column_types(TDetectResult& _return,
                                    const TSessionId& session_id_or_json,
                                    const std::string& file_name_in,
                                    const TCopyParams& cp) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  check_read_only("detect_column_types");

  bool is_raster = false;
  boost::filesystem::path file_path;
  if (copy_params.source_type != import_export::SourceType::kOdbc) {
    std::string file_name{file_name_in};
    if (path_is_relative(file_name)) {
      // assume relative paths are relative to data_path / import / <session>
      auto temp_file_path = import_path_ /
                            picosha2::hash256_hex_string(request_info.sessionId()) /
                            boost::filesystem::path(file_name).filename();
      file_name = temp_file_path.string();
    }

    if ((copy_params.source_type == import_export::SourceType::kGeoFile ||
        is_local_file(file_name)) {
      const shared::FilePathOptions options{copy_params.regex_path_filter,
                                            copy_params.file_sort_order_by,
                                            copy_params.file_sort_regex};
      auto file_paths = shared::local_glob_filter_sort_files(file_name, options, false);
      // For geo and raster detect, pick the first file, if multiple files are provided
      // (e.g. through file globbing).
      CHECK(!file_paths.empty());
      file_name = file_paths[0];
    }

    // if it's a geo or raster import, handle alternative paths (S3, HTTP, archive etc.)
    if (copy_params.source_type == import_export::SourceType::kGeoFile) {
      if (is_a_supported_archive_file(file_name)) {
        // find the archive file
        add_vsi_network_prefix(file_name);
        if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
          THROW_DB_EXCEPTION("Archive does not exist: " + file_name_in);
        }
        // find geo file in archive
        add_vsi_archive_prefix(file_name);
        std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
        // prepare to detect that geo file
        if (geo_file.size()) {
          file_name = file_name + std::string("/") + geo_file;
        }
      } else {
        // prepare to detect geo file directly
        add_vsi_network_prefix(file_name);
        add_vsi_geo_prefix(file_name);
      }
    } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
      // prepare to detect raster file directly
      add_vsi_network_prefix(file_name);
      add_vsi_geo_prefix(file_name);
      is_raster = true;
    }

    file_path = boost::filesystem::path(file_name);
    // can be a s3 url
    if (!boost::istarts_with(file_name, "s3://")) {
      if (!boost::filesystem::path(file_name).is_absolute()) {
        file_path = import_path_ /
                    picosha2::hash256_hex_string(request_info.sessionId()) /
                    boost::filesystem::path(file_name).filename();
        file_name = file_path.string();
      }

      if (copy_params.source_type == import_export::SourceType::kGeoFile ||
        // check for geo or raster file
        if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
          THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
                             "\" does not exist.")
        }
      } else {
        // check for regular file
        if (!shared::file_or_glob_path_exists(file_path.string())) {
          THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
                             "\" does not exist.");
        }
      }
    }
  }

  try {
#ifdef ENABLE_IMPORT_PARQUET
#endif
    ) {
      // Delimited (and optionally Parquet) detection: sample the file and let
      // the Detector infer the best type per column.
      import_export::Detector detector(file_path, copy_params);
      auto best_types = detector.getBestColumnTypes();
      std::vector<std::string> headers = detector.get_headers();
      copy_params = detector.get_copy_params();

      _return.copy_params = copyparams_to_thrift(copy_params);
      _return.row_set.row_desc.resize(best_types.size());
      for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
        TColumnType col;
        auto& ti = best_types[col_idx];
        col.col_type.precision = ti.get_precision();
        col.col_type.scale = ti.get_scale();
        col.col_type.comp_param = ti.get_comp_param();
        if (ti.is_geometry()) {
          // set this so encoding_to_thrift does the right thing
          ti.set_compression(copy_params.geo_coords_encoding);
          // fill in these directly
          col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
          col.col_type.scale = copy_params.geo_coords_srid;
          col.col_type.comp_param = copy_params.geo_coords_comp_param;
        }
        col.col_type.type = type_to_thrift(ti);
        col.col_type.encoding = encoding_to_thrift(ti);
        if (ti.is_array()) {
          col.col_type.is_array = true;
        }
        if (copy_params.sanitize_column_names) {
          col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
        } else {
          col.col_name = headers[col_idx];
        }
        col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
        _return.row_set.row_desc[col_idx] = col;
      }
      auto sample_data =

      TRow sample_row;
      // empty strings are reported back to the client as nulls
      for (auto row : sample_data) {
        sample_row.cols.clear();
        for (const auto& s : row) {
          TDatum td;
          td.val.str_val = s;
          td.is_null = s.empty();
          sample_row.cols.push_back(td);
        }
        _return.row_set.rows.push_back(sample_row);
      }
    } else if (copy_params.source_type == import_export::SourceType::kGeoFile ||
      // Geo/raster detection: derive column descriptors via GDAL.
      check_geospatial_files(file_path, copy_params);
      std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
          file_path.string(), is_raster, Geospatial::kGeoColumnName, copy_params);
      for (auto cd : cds) {
        if (copy_params.sanitize_column_names) {
          cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
        }
        _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
      }
      if (!is_raster) {
        // @TODO(se) support for raster?
        std::map<std::string, std::vector<std::string>> sample_data;
          file_path.string(),
          sample_data,
          copy_params);
        if (sample_data.size() > 0) {
          for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
            TRow sample_row;
            for (auto cd : cds) {
              TDatum td;
              td.val.str_val = sample_data[cd.sourceName].at(i);
              td.is_null = td.val.str_val.empty();
              sample_row.cols.push_back(td);
            }
            _return.row_set.rows.push_back(sample_row);
          }
        }
      }
      _return.copy_params = copyparams_to_thrift(copy_params);
    }
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION("detect_column_types error: " + std::string(e.what()));
  }
}
4313 
// Thrift endpoint: render the given Vega JSON spec for a widget through the
// backend render handler, returning the rendered result and total render
// time in _return. Throws TDBException when backend rendering is disabled
// or when the render handler reports an error.
void DBHandler::render_vega(TRenderResult& _return,
                            const TSessionId& session_id_or_json,
                            const int64_t widget_id,
                            const std::string& vega_json,
                            const int compression_level,
                            const std::string& nonce) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()),
                       "widget_id",
                       widget_id,
                       "compression_level",
                       compression_level,
                       "vega_json",
                       vega_json,
                       "nonce",
                       nonce);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  stdlog.appendNameValuePairs("nonce", nonce);
  if (!render_handler_) {
    THROW_DB_EXCEPTION("Backend rendering is disabled.");
  }

  // cast away const-ness of incoming Thrift string ref
  // to allow it to be passed down as an r-value and
  // ultimately std::moved into the RenderSession
  auto& non_const_vega_json = const_cast<std::string&>(vega_json);

  // Time the whole render call; the measured wall-clock duration is reported
  // back to the client in total_time_ms.
  _return.total_time_ms = measure<>::execution([&]() {
    try {
      render_handler_->render_vega(_return,
                                   stdlog.getSessionInfo(),
                                   widget_id,
                                   std::move(non_const_vega_json),
                                   compression_level,
                                   nonce);
    } catch (std::exception& e) {
      THROW_DB_EXCEPTION(e.what());
    }
  });
}
4355 
// Returns whether the session's current user holds the requested privileges
// on the dashboard identified by dashboard_id, by building a Dashboard
// DBObject and delegating to SysCatalog::checkPrivileges.
// NOTE(review): the first line of this function's signature (return type,
// name, and session_info parameter) is not visible in this rendered listing.
    int32_t dashboard_id,
    AccessPrivileges requestedPermissions) {
  DBObject object(dashboard_id, DashboardDBObjectType);
  auto& catalog = session_info.getCatalog();
  auto& user = session_info.get_currentUser();
  object.loadKey(catalog);
  object.setPrivileges(requestedPermissions);
  std::vector<DBObject> privs = {object};
  return SysCatalog::instance().checkPrivileges(user, privs);
}
4367 
4368 // custom expressions
4369 namespace {
4372 
4373 std::unique_ptr<Catalog_Namespace::CustomExpression> create_custom_expr_from_thrift_obj(
4374  const TCustomExpression& t_custom_expr,
4375  const Catalog& catalog) {
4376  if (t_custom_expr.data_source_name.empty()) {
4377  THROW_DB_EXCEPTION("Custom expression data source name cannot be empty.")
4378  }
4379  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4380  << "Unexpected data source type: "
4381  << static_cast<int>(t_custom_expr.data_source_type);
4382  auto td = catalog.getMetadataForTable(t_custom_expr.data_source_name, false);
4383  if (!td) {
4384  THROW_DB_EXCEPTION("Custom expression references a table \"" +
4385  t_custom_expr.data_source_name + "\" that does not exist.")
4386  }
4387  DataSourceType data_source_type = DataSourceType::TABLE;
4388  return std::make_unique<CustomExpression>(
4389  t_custom_expr.name, t_custom_expr.expression_json, data_source_type, td->tableId);
4390 }
4391 
4392 TCustomExpression create_thrift_obj_from_custom_expr(const CustomExpression& custom_expr,
4393  const Catalog& catalog) {
4394  TCustomExpression t_custom_expr;
4395  t_custom_expr.id = custom_expr.id;
4396  t_custom_expr.name = custom_expr.name;
4397  t_custom_expr.expression_json = custom_expr.expression_json;
4398  t_custom_expr.data_source_id = custom_expr.data_source_id;
4399  t_custom_expr.is_deleted = custom_expr.is_deleted;
4400  CHECK(custom_expr.data_source_type == DataSourceType::TABLE)
4401  << "Unexpected data source type: "
4402  << static_cast<int>(custom_expr.data_source_type);
4403  t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
4404  auto td = catalog.getMetadataForTable(custom_expr.data_source_id, false);
4405  if (td) {
4406  t_custom_expr.data_source_name = td->tableName;
4407  } else {
4408  LOG(WARNING)
4409  << "Custom expression references a deleted data source. Custom expression id: "
4410  << custom_expr.id << ", name: " << custom_expr.name;
4411  }
4412  return t_custom_expr;
4413 }
4414 } // namespace
4415 
// Thrift endpoint: create a new custom expression in the catalog (super
// users only; rejected in read-only mode). Returns the catalog-assigned id
// of the new expression, as reported by Catalog::createCustomExpression.
// NOTE(review): a line between obtaining the catalog and the create call
// (presumably a lock acquisition) is not visible in this rendered listing.
int32_t DBHandler::create_custom_expression(const TSessionId& session_id_or_json,
                                            const TCustomExpression& t_custom_expr) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  check_read_only("create_custom_expression");

  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_DB_EXCEPTION("Custom expressions can only be created by super users.")
  }
  auto& catalog = session_ptr->getCatalog();
  return catalog.createCustomExpression(
      create_custom_expr_from_thrift_obj(t_custom_expr, catalog));
}
4433 
// Thrift endpoint: returns, via _return, the custom expressions visible to
// the calling user (per-user filtering is delegated to the catalog).
void DBHandler::get_custom_expressions(std::vector<TCustomExpression>& _return,
                                       const TSessionId& session_id_or_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());

  auto session_ptr = stdlog.getConstSessionInfo();
  auto& catalog = session_ptr->getCatalog();
  auto custom_expressions =
      catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
  for (const auto& custom_expression : custom_expressions) {
    _return.emplace_back(create_thrift_obj_from_custom_expr(*custom_expression, catalog));
  }
}
4450 
// Thrift endpoint: replaces the JSON body of an existing custom expression.
// Super-user only; blocked by check_read_only().
void DBHandler::update_custom_expression(const TSessionId& session_id_or_json,
                                         const int32_t id,
                                         const std::string& expression_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  check_read_only("update_custom_expression");

  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_DB_EXCEPTION("Custom expressions can only be updated by super users.")
  }
  auto& catalog = session_ptr->getCatalog();
  catalog.updateCustomExpression(id, expression_json);
}
4468 
    // delete_custom_expressions: deletes (or soft-deletes, when
    // do_soft_delete is set) the given custom expression ids.
    // Super-user only; blocked by check_read_only().
    const TSessionId& session_id_or_json,
    const std::vector<int32_t>& custom_expression_ids,
    const bool do_soft_delete) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  check_read_only("delete_custom_expressions");

  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_DB_EXCEPTION("Custom expressions can only be deleted by super users.")
  }
  auto& catalog = session_ptr->getCatalog();
  catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
}
4487 
4488 // dashboards
// Thrift endpoint: fetches a single dashboard, including its full state,
// after verifying it exists and the caller holds view privileges on it.
void DBHandler::get_dashboard(TDashboard& dashboard,
                              const TSessionId& session_id_or_json,
                              const int32_t dashboard_id) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  auto const& cat = session_ptr->getCatalog();
  auto dash = cat.getMetadataForDashboard(dashboard_id);
  if (!dash) {
    THROW_DB_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
                       " doesn't exist");
  }
  // Requires VIEW_DASHBOARD privilege on this dashboard.
      *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
    THROW_DB_EXCEPTION("User has no view privileges for the dashboard with id " +
                       std::to_string(dashboard_id));
  }
  // user_meta is filled below with the dashboard owner's metadata.
  user_meta.userName = "";
  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
}
4513 
// Thrift endpoint: lists every dashboard the caller may view. Dashboard
// state is deliberately omitted from each entry; see comment below.
void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
                               const TSessionId& session_id_or_json) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  auto const& cat = session_ptr->getCatalog();
  const auto dashes = cat.getAllDashboardsMetadata();
  user_meta.userName = "";
  for (const auto dash : dashes) {
      *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
      // dashboardState is intentionally not populated here
      // for payload reasons
      // use get_dashboard call to get state
      dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
    }
  }
}
4535 
    // get_dashboard_impl: builds the Thrift TDashboard for `dash`, computing
    // the caller's effective permissions and whether the dashboard is shared.
    // `populate_state` controls whether the (potentially large) dashboard
    // state payload is copied into the result.
    const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
    const DashboardDescriptor* dash,
    const bool populate_state) {
  auto const& cat = session_ptr->getCatalog();
  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
  // All grants recorded against this dashboard object in the current DB.
  auto objects_list = SysCatalog::instance().getMetadataForObject(
      cat.getCurrentDB().dbId,
      static_cast<int>(DBObjectType::DashboardDBObjectType),
      dash->dashboardId);
  TDashboard dashboard;
  dashboard.dashboard_name = dash->dashboardName;
  if (populate_state) {
    dashboard.dashboard_state = dash->dashboardState;
  }
  dashboard.image_hash = dash->imageHash;
  dashboard.update_time = dash->updateTime;
  dashboard.dashboard_metadata = dash->dashboardMetadata;
  dashboard.dashboard_id = dash->dashboardId;
  dashboard.dashboard_owner = dash->user;
  TDashboardPermissions perms;
  // Super user has all permissions.
  if (session_ptr->get_currentUser().isSuper) {
    perms.create_ = true;
    perms.delete_ = true;
    perms.edit_ = true;
    perms.view_ = true;
  } else {
    // Collect all grants on current user
    // add them to the permissions.
    auto obj_to_find =
        DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
    obj_to_find.loadKey(cat);
    std::vector<std::string> grantees =
        SysCatalog::instance().getRoles(true,
                                        session_ptr->get_currentUser().isSuper,
                                        session_ptr->get_currentUser().userName);
    for (const auto& grantee : grantees) {
      DBObject* object_found;
      auto* gr = SysCatalog::instance().getGrantee(grantee);
      if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
        // OR-accumulate privileges across every role the user holds.
        const auto obj_privs = object_found->getPrivileges();
        perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
        perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
        perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
        perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
      }
    }
  }
  dashboard.dashboard_permissions = perms;
  // Shared iff grants exist beyond the owner's own entry.
  if (objects_list.empty() ||
      (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
    dashboard.is_dash_shared = false;
  } else {
    dashboard.is_dash_shared = true;
  }
  return dashboard;
}
4595 
4596 namespace dbhandler {
4597 bool is_info_schema_db(const std::string& db_name) {
4598  return (db_name == shared::kInfoSchemaDbName &&
4599  SysCatalog::instance().hasExecutedMigration(shared::kInfoSchemaMigrationName));
4600 }
4601 
4602 void check_not_info_schema_db(const std::string& db_name, bool throw_db_exception) {
4603  if (is_info_schema_db(db_name)) {
4604  std::string error_message{"Write requests/queries are not allowed in the " +
4605  shared::kInfoSchemaDbName + " database."};
4606  if (throw_db_exception) {
4607  THROW_DB_EXCEPTION(error_message)
4608  } else {
4609  throw std::runtime_error(error_message);
4610  }
4611  }
4612 }
4613 } // namespace dbhandler
4614 
// Thrift endpoint: creates a dashboard owned by the calling user and returns
// its id. Requires dashboard-creation privileges; duplicate names per owner
// are rejected.
int32_t DBHandler::create_dashboard(const TSessionId& session_id_or_json,
                                    const std::string& dashboard_name,
                                    const std::string& dashboard_state,
                                    const std::string& image_hash,
                                    const std::string& dashboard_metadata) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  CHECK(session_ptr);
  check_read_only("create_dashboard");
  auto& cat = session_ptr->getCatalog();
  }

  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
    THROW_DB_EXCEPTION("Not enough privileges to create a dashboard.");
  }

  if (dashboard_exists(cat, session_ptr->get_currentUser().userId, dashboard_name)) {
    THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
  }

  dd.dashboardName = dashboard_name;
  dd.dashboardState = dashboard_state;
  dd.imageHash = image_hash;
  dd.dashboardMetadata = dashboard_metadata;
  dd.userId = session_ptr->get_currentUser().userId;
  dd.user = session_ptr->get_currentUser().userName;

  try {
    auto id = cat.createDashboard(dd);
    // TODO: transactionally unsafe
    // Registers the new dashboard as a DB object owned by the creator.
    SysCatalog::instance().createDBObject(
        session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
    return id;
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
}
4659 
// Thrift endpoint: overwrites an existing dashboard's name, owner, state,
// image hash, and metadata. Requires edit privileges on the dashboard; the
// new name must not collide with another dashboard of the same owner, and
// the new owner must exist.
void DBHandler::replace_dashboard(const TSessionId& session_id_or_json,
                                  const int32_t dashboard_id,
                                  const std::string& dashboard_name,
                                  const std::string& dashboard_owner,
                                  const std::string& dashboard_state,
                                  const std::string& image_hash,
                                  const std::string& dashboard_metadata) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  CHECK(session_ptr);
  check_read_only("replace_dashboard");
  auto& cat = session_ptr->getCatalog();
  }

  // Requires EDIT_DASHBOARD privilege on this dashboard.
      *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
    THROW_DB_EXCEPTION("Not enough privileges to replace a dashboard.");
  }

  // Reject a rename that would collide with another of the caller's dashboards.
  if (auto dash = cat.getMetadataForDashboard(
          std::to_string(session_ptr->get_currentUser().userId), dashboard_name)) {
    if (dash->dashboardId != dashboard_id) {
      THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
    }
  }

  dd.dashboardName = dashboard_name;
  dd.dashboardState = dashboard_state;
  dd.imageHash = image_hash;
  dd.dashboardMetadata = dashboard_metadata;
  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
    THROW_DB_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
                       " does not exist");
  }
  dd.userId = user.userId;
  dd.user = dashboard_owner;
  dd.dashboardId = dashboard_id;

  try {
    cat.replaceDashboard(dd);
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
}
4711 
4712 void DBHandler::delete_dashboard(const TSessionId& session_id_or_json,
4713  const int32_t dashboard_id) {
4714  delete_dashboards(session_id_or_json, {dashboard_id});
4715 }
4716 
// Thrift endpoint: deletes the given dashboards. Ownership/privilege checks
// are performed inside the catalog call.
void DBHandler::delete_dashboards(const TSessionId& session_id_or_json,
                                  const std::vector<int32_t>& dashboard_ids) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  check_read_only("delete_dashboards");
  auto& cat = session_ptr->getCatalog();
  }
  // Checks will be performed in catalog
  try {
    cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(e.what());
  }
}
4736 
// Filters `groups` down to user/role names that exist as grantees, throwing
// on the first name that does not. The caller must be the dashboard's owner
// or a super user.
std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session_id_or_json,
                                                     int32_t dashboard_id,
                                                     std::vector<std::string> groups) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  const auto session_info = get_session_copy(request_info.sessionId());
  auto& cat = session_info.getCatalog();
  auto dash = cat.getMetadataForDashboard(dashboard_id);
  if (!dash) {
    THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
                       " does not exist");
  } else if (session_info.get_currentUser().userId != dash->userId &&
             !session_info.get_currentUser().isSuper) {
    throw std::runtime_error(
        "User should be either owner of dashboard or super user to share/unshare it");
  }
  std::vector<std::string> valid_groups;
  for (auto& group : groups) {
    user_meta.isSuper = false;  // initialize default flag
    if (!SysCatalog::instance().getGrantee(group)) {
      THROW_DB_EXCEPTION("User/Role " + group + " does not exist");
    } else if (!user_meta.isSuper) {
      // NOTE(review): nothing in this loop sets user_meta.isSuper after the
      // reset above, so this branch appears to always be taken for existing
      // grantees -- confirm whether super users were meant to be excluded.
      valid_groups.push_back(group);
    }
  }
  return valid_groups;
}
4765 
4766 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
4767  for (auto const& group : groups) {
4768  if (!SysCatalog::instance().getGrantee(group)) {
4769  THROW_DB_EXCEPTION("User/Role '" + group + "' does not exist");
4770  }
4771  }
4772 }
4773 
    // validateDashboardIdsForSharing: verifies every id names an existing
    // dashboard owned by the caller (or that the caller is a super user);
    // accumulates all failures and throws a single combined exception.
    const Catalog_Namespace::SessionInfo& session_info,
    const std::vector<int32_t>& dashboard_ids) {
  auto& cat = session_info.getCatalog();
  // Error message -> list of offending dashboard ids.
  std::map<std::string, std::list<int32_t>> errors;
  for (auto const& dashboard_id : dashboard_ids) {
    auto dashboard = cat.getMetadataForDashboard(dashboard_id);
    if (!dashboard) {
      errors["Dashboard id does not exist"].push_back(dashboard_id);
    } else if (session_info.get_currentUser().userId != dashboard->userId &&
               !session_info.get_currentUser().isSuper) {
      errors["User should be either owner of dashboard or super user to share/unshare it"]
          .push_back(dashboard_id);
    }
  }
  if (!errors.empty()) {
    std::stringstream error_stream;
    error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
    for (const auto& [error, id_list] : errors) {
      error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
    }
    THROW_DB_EXCEPTION(error_stream.str());
  }
}
4798 
// Common implementation behind share_dashboards/unshare_dashboards: builds a
// DBObject per dashboard carrying the requested privileges and grants or
// revokes them for every group in one batch.
void DBHandler::shareOrUnshareDashboards(const TSessionId& session_id,
                                         const std::vector<int32_t>& dashboard_ids,
                                         const std::vector<std::string>& groups,
                                         const TDashboardPermissions& permissions,
                                         const bool do_share) {
  auto stdlog = STDLOG(get_session_ptr(session_id));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
  // At least one permission flag must be set for the request to mean anything.
  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
      !permissions.view_) {
    THROW_DB_EXCEPTION("At least one privilege should be assigned for " +
                       std::string(do_share ? "grants" : "revokes"));
  }
  auto session_ptr = stdlog.getConstSessionInfo();
  auto const& catalog = session_ptr->getCatalog();
  auto& sys_catalog = SysCatalog::instance();
  validateGroups(groups);
  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
  std::vector<DBObject> batch_objects;
  for (auto const& dashboard_id : dashboard_ids) {
    DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
    AccessPrivileges privs;
    // NOTE(review): each branch below is expected to add the matching
    // DashboardPrivileges flag to `privs`; the add() calls appear elided in
    // this rendering -- confirm against version control.
    if (permissions.delete_) {
    }
    if (permissions.create_) {
    }
    if (permissions.edit_) {
    }
    if (permissions.view_) {
    }
    object.setPrivileges(privs);
    batch_objects.push_back(object);
  }
  if (do_share) {
    sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
  } else {
    sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
  }
}
4842 
// Thrift endpoint: grants the given dashboard permissions to the groups.
void DBHandler::share_dashboards(const TSessionId& session_id_or_json,
                                 const std::vector<int32_t>& dashboard_ids,
                                 const std::vector<std::string>& groups,
                                 const TDashboardPermissions& permissions) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  // Delegates to shareOrUnshareDashboards with do_share = true.
      request_info.sessionId(), dashboard_ids, groups, permissions, true);
}
4852 
4853 // NOOP: Grants not available for objects as of now
4854 void DBHandler::share_dashboard(const TSessionId& session_id_or_json,
4855  const int32_t dashboard_id,
4856  const std::vector<std::string>& groups,
4857  const std::vector<std::string>& objects,
4858  const TDashboardPermissions& permissions,
4859  const bool grant_role = false) {
4860  share_dashboards(session_id_or_json, {dashboard_id}, groups, permissions);
4861 }
4862 
// Thrift endpoint: revokes the given dashboard permissions from the groups.
void DBHandler::unshare_dashboards(const TSessionId& session_id_or_json,
                                   const std::vector<int32_t>& dashboard_ids,
                                   const std::vector<std::string>& groups,
                                   const TDashboardPermissions& permissions) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  // Delegates to shareOrUnshareDashboards with do_share = false.
      request_info.sessionId(), dashboard_ids, groups, permissions, false);
}
4872 
4873 void DBHandler::unshare_dashboard(const TSessionId& session_id_or_json,
4874  const int32_t dashboard_id,
4875  const std::vector<std::string>& groups,
4876  const std::vector<std::string>& objects,
4877  const TDashboardPermissions& permissions) {
4878  unshare_dashboards(session_id_or_json, {dashboard_id}, groups, permissions);
4879 }
4880 
    // get_dashboard_grantees: returns every user/role granted access to the
    // dashboard, with their per-grantee permissions; the owner's own entry is
    // masked out. Caller must be the owner or a super user.
    std::vector<TDashboardGrantees>& dashboard_grantees,
    const TSessionId& session_id_or_json,
    const int32_t dashboard_id) {
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  auto const& cat = session_ptr->getCatalog();
  auto dash = cat.getMetadataForDashboard(dashboard_id);
  if (!dash) {
    THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
                       " does not exist");
  } else if (session_ptr->get_currentUser().userId != dash->userId &&
             !session_ptr->get_currentUser().isSuper) {
      "User should be either owner of dashboard or super user to access grantees");
  }
  std::vector<ObjectRoleDescriptor*> objectsList;
  objectsList = SysCatalog::instance().getMetadataForObject(
      cat.getCurrentDB().dbId,
      static_cast<int>(DBObjectType::DashboardDBObjectType),
      dashboard_id);  // By default, the object type can only be dashboards
  user_meta.userId = -1;
  user_meta.userName = "";
  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
  for (auto object : objectsList) {
    if (user_meta.userName == object->roleName) {
      // Mask owner
      continue;
    }
    TDashboardGrantees grantee;
    TDashboardPermissions perm;
    grantee.name = object->roleName;
    grantee.is_user = object->roleType;
    perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
    perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
    perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
    perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
    grantee.permissions = perm;
    dashboard_grantees.push_back(grantee);
  }
}
4926 
4927 void DBHandler::create_link(std::string& _return,
4928  const TSessionId& session_id_or_json,
4929  const std::string& view_state,
4930  const std::string& view_metadata) {
4931  heavyai::RequestInfo const request_info(session_id_or_json);
4932  SET_REQUEST_ID(request_info.requestId());
4933  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4934  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4935  auto session_ptr = stdlog.getConstSessionInfo();
4936  // check_read_only("create_link");
4937  auto& cat = session_ptr->getCatalog();
4938 
4939  LinkDescriptor ld;
4940  ld.userId = session_ptr->get_currentUser().userId;
4941  ld.viewState = view_state;
4942  ld.viewMetadata = view_metadata;
4943 
4944  try {
4945  _return = cat.createLink(ld, 6);
4946  } catch (const std::exception& e) {
4947  THROW_DB_EXCEPTION(e.what());
4948  }
4949 }
4950 
    // create_geo_column: builds a minimal TColumnType for a geo column with
    // the given name, datum type, and array flag.
    const std::string& name,
    const bool is_array) {
  TColumnType ct;
  ct.col_name = name;
  ct.col_type.type = type;
  ct.col_type.is_array = is_array;
  return ct;
}
4960 
// For shapefile imports, verifies that all three companion files (.shp,
// .shx, .dbf) exist alongside the given file, checking both upper- and
// lower-case extensions; throws if any is missing. Non-shapefile paths are
// ignored.
void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
                                       const import_export::CopyParams& copy_params) {
  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
  if (std::find(shp_ext.begin(),
                shp_ext.end(),
                boost::algorithm::to_lower_copy(file_path.extension().string())) !=
      shp_ext.end()) {
    for (auto ext : shp_ext) {
      auto aux_file = file_path;
      // Accept either upper-case or lower-case extension variants.
          aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
          copy_params) &&
              aux_file.replace_extension(ext).string(), copy_params)) {
        throw std::runtime_error("required file for shapefile does not exist: " +
                                 aux_file.filename().string());
      }
    }
  }
}
4981 
4982 void DBHandler::create_table(const TSessionId& session_id_or_json,
4983  const std::string& table_name,
4984  const TRowDescriptor& rd,
4985  const TCreateParams& create_params) {
4986  heavyai::RequestInfo request_info(session_id_or_json);
4987  SET_REQUEST_ID(request_info.requestId());
4988  auto stdlog = STDLOG("table_name", table_name);
4989  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4990  check_read_only("create_table");
4991 
4992  if (ImportHelpers::is_reserved_name(table_name)) {
4993  THROW_DB_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
4994  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
4995  THROW_DB_EXCEPTION("Invalid characters in table name: " + table_name);
4996  }
4997 
4998  auto rds = rd;
4999 
5000  std::string stmt{"CREATE TABLE " + table_name};
5001  std::vector<std::string> col_stmts;
5002 
5003  for (auto col : rds) {
5004  if (ImportHelpers::is_reserved_name(col.col_name)) {
5005  THROW_DB_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
5006  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
5007  THROW_DB_EXCEPTION("Invalid characters in column name: " + col.col_name);
5008  }
5009  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
5010  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
5011  THROW_DB_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
5012  " for column: " + col.col_name);
5013  }
5014 
5015  if (col.col_type.type == TDatumType::DECIMAL) {
5016  // if no precision or scale passed in set to default 14,7
5017  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
5018  col.col_type.precision = 14;
5019  col.col_type.scale = 7;
5020  }
5021  }
5022 
5023  std::string col_stmt;
5024  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
5025  if (col.__isset.default_value) {
5026  col_stmt.append(" DEFAULT " + col.default_value);
5027  }
5028 
5029  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
5030  // `nullable` argument, leading this to default to false. Uncomment for v2.
5031  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
5032 
5033  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
5034  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
5035  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
5036  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
5037  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT ||
5038  thrift_to_encoding(col.col_type.encoding) == kENCODING_DATE_IN_DAYS) {
5039  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
5040  }
5041  } else if (col.col_type.type == TDatumType::STR) {
5042  // non DICT encoded strings
5043  col_stmt.append(" ENCODING NONE");
5044  } else if (col.col_type.type == TDatumType::POINT ||
5045  col.col_type.type == TDatumType::MULTIPOINT ||
5046  col.col_type.type == TDatumType::LINESTRING ||
5047  col.col_type.type == TDatumType::MULTILINESTRING ||
5048  col.col_type.type == TDatumType::POLYGON ||
5049  col.col_type.type == TDatumType::MULTIPOLYGON) {
5050  // non encoded compressable geo
5051  if (col.col_type.scale == 4326) {
5052  col_stmt.append(" ENCODING NONE");
5053  }
5054  }
5055  col_stmts.push_back(col_stmt);
5056  }
5057 
5058  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
5059 
5060  if (create_params.is_replicated) {
5061  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
5062  }
5063 
5064  stmt.append(";");
5065 
5066  TQueryResult ret;
5067  request_info.setRequestId(logger::request_id());
5068  sql_execute(ret, request_info.json(), stmt, true, "", -1, -1);
5069 }
5070 
// Thrift endpoint: bulk-imports data into an existing table from a local
// file/glob, an s3:// URI, or (for ODBC sources) a SQL SELECT. Relative
// paths are resolved under the per-session import directory. The import
// session is enrolled with the executor so it can be interrupted, and an
// insert-data write lock is held for the actual import.
void DBHandler::import_table(const TSessionId& session_id_or_json,
                             const std::string& table_name,
                             const std::string& file_name_in,
                             const TCopyParams& cp) {
  try {
    heavyai::RequestInfo const request_info(session_id_or_json);
    SET_REQUEST_ID(request_info.requestId());
    auto stdlog =
        STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
    stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
    auto session_ptr = stdlog.getConstSessionInfo();
    check_read_only("import_table");
    LOG(INFO) << "import_table " << table_name << " from " << file_name_in;

    const auto execute_read_lock =
    auto& cat = session_ptr->getCatalog();
    auto start_time = ::toString(std::chrono::system_clock::now());
    // Enroll so the session's import can be tracked/interrupted.
    executor->enrollQuerySession(request_info.sessionId(),
                                 "IMPORT_TABLE",
                                 start_time,
                                 QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
    }

    ScopeGuard clearInterruptStatus = [executor, &request_info, &start_time] {
      // reset the runtime query interrupt status
      executor->clearQuerySessionStatus(request_info.sessionId(), start_time);
    }
    };
    const auto td_with_lock =
        cat, table_name);
    const auto td = td_with_lock();
    CHECK(td);
    check_table_load_privileges(*session_ptr, table_name);

    std::string copy_from_source;
    if (copy_params.source_type == import_export::SourceType::kOdbc) {
      // ODBC imports read from a SELECT statement rather than a file.
      copy_from_source = copy_params.sql_select;
    } else {
      std::string file_name{file_name_in};
      auto file_path = boost::filesystem::path(file_name);
      if (!boost::istarts_with(file_name, "s3://")) {
        if (!boost::filesystem::path(file_name).is_absolute()) {
          // Relative paths resolve under import/<sha256(session id)>/.
          file_path = import_path_ /
                      picosha2::hash256_hex_string(request_info.sessionId()) /
                      boost::filesystem::path(file_name).filename();
          file_name = file_path.string();
        }
        if (!shared::file_or_glob_path_exists(file_path.string())) {
          THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
                             "\" does not exist.");
        }
      }

      // TODO(andrew): add delimiter detection to Importer
      if (copy_params.delimiter == '\0') {
        copy_params.delimiter = ',';
        if (boost::filesystem::extension(file_path) == ".tsv") {
          copy_params.delimiter = '\t';
        }
      }
      copy_from_source = file_path.string();
    }

    const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
        session_ptr->getCatalog(), table_name);
    std::unique_ptr<import_export::AbstractImporter> importer;
    importer = import_export::create_importer(cat, td, copy_from_source, copy_params);
    auto ms = measure<>::execution([&]() { importer->import(session_ptr.get()); });
    LOG(INFO) << "Total Import Time: " << (double)ms / 1000.0 << " Seconds.";
  } catch (const TDBException& e) {
    // Already a Thrift DB exception; propagate unchanged.
    throw;
  } catch (const std::exception& e) {
    THROW_DB_EXCEPTION(std::string(e.what()));
  }
}
5156 
5157 namespace {
5158 
// helper functions for error checking below
// these would usefully be added as methods of TDatumType
// but that's not possible as it's auto-generated by Thrift

  // True for any geo column type: point/linestring/polygon and their
  // multi- variants.
  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
          t == TDatumType::LINESTRING || t == TDatumType::MULTILINESTRING ||
          t == TDatumType::POINT || t == TDatumType::MULTIPOINT);
}
5168 
  // Stream-formats the type value to its textual name; presumably relies on
  // a Thrift-generated operator<< for TDatumType -- confirm.
  std::stringstream ss;
  ss << t;
  return ss.str();
}
5174 
// Formats the warning logged when a column attribute (type, size, etc.) of
// an appended geo/raster file disagrees with the existing table column.
std::string get_mismatch_attr_warning_text(const std::string& table_name,
                                           const std::string& file_path,
                                           const std::string& column_name,
                                           const std::string& attr,
                                           const std::string& got,
                                           const std::string& expected) {
  std::string warning{"Issue encountered in geo/raster file '"};
  warning += file_path;
  warning += "' while appending to table '";
  warning += table_name;
  warning += "'. Column '";
  warning += column_name;
  warning += "' ";
  warning += attr;
  warning += " mismatch (got '";
  warning += got;
  warning += "', expected '";
  warning += expected;
  warning += "')";
  return warning;
}
5185 
5186 } // namespace
5187 
// Throws the hard-error variant of the attribute-mismatch message above.
// Expects `file_path`, `table_name`, and a column descriptor `cd` to be in
// scope at the expansion site.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
  THROW_DB_EXCEPTION("Could not append geo/raster file '" + \
                     file_path.filename().string() + "' to table '" + table_name + \
                     "'. Column '" + cd->columnName + "' " + attr + " mismatch (got '" + \
                     got + "', expected '" + expected + "')");
5193 
// Thrift endpoint for geo/raster imports; delegates to
// importGeoTableGlobFilterSort after unpacking the request info.
void DBHandler::import_geo_table(const TSessionId& session_id_or_json,
                                 const std::string& table_name,
                                 const std::string& file_name,
                                 const TCopyParams& cp,
                                 const TRowDescriptor& row_desc,
                                 const TCreateParams& create_params) {
  // this is the direct Thrift endpoint
  // it does NOT support the separate FSI regex/filter/sort options
  // but it DOES support basic globbing specified in the filename itself
  heavyai::RequestInfo const request_info(session_id_or_json);
  SET_REQUEST_ID(request_info.requestId());
  importGeoTableGlobFilterSort(request_info.sessionId(),
                               table_name,
                               file_name,
                               row_desc,
                               create_params);
}
5212 
// Expands `file_name` via local glob/regex-filter/sort options and imports
// each matching file in order; if nothing matches locally, the original name
// is tried as-is (it may be a remote path).
void DBHandler::importGeoTableGlobFilterSort(const TSessionId& session_id,
                                             const std::string& table_name,
                                             const std::string& file_name,
                                             const import_export::CopyParams& copy_params,
                                             const TRowDescriptor& row_desc,
                                             const TCreateParams& create_params) {
  // this is called by the above direct Thrift endpoint
  // and also for a deferred COPY FROM for geo/raster
  // it DOES support the full FSI regex/filter/sort options
  std::vector<std::string> file_names;
  try {
    const shared::FilePathOptions options{copy_params.regex_path_filter,
                                          copy_params.file_sort_order_by,
                                          copy_params.file_sort_regex};
    file_names = shared::local_glob_filter_sort_files(file_name, options, false);
  } catch (const shared::FileNotFoundException& e) {
    // no files match, just try the original filename, might be remote
    file_names.push_back(file_name);
  }
  // import whatever we found
  for (auto const& file_name : file_names) {
        session_id, table_name, file_name, copy_params, row_desc, create_params);
  }
}
5239 
5240 void DBHandler::importGeoTableSingle(const TSessionId& session_id,
5241  const std::string& table_name,
5242  const std::string& file_name_in,
5243  const import_export::CopyParams& copy_params,
5244  const TRowDescriptor& row_desc,
5245  const TCreateParams& create_params) {
5246  auto stdlog = STDLOG(get_session_ptr(session_id), "table_name", table_name);
5247  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5248  auto session_ptr = stdlog.getConstSessionInfo();
5249  check_read_only("import_table");
5250 
5251  auto& cat = session_ptr->getCatalog();
5253  auto start_time = ::toString(std::chrono::system_clock::now());
5255  executor->enrollQuerySession(session_id,
5256  "IMPORT_GEO_TABLE",
5257  start_time,
5259  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5260  }
5261 
5262  ScopeGuard clearInterruptStatus = [executor, &session_id, &start_time] {
5263  // reset the runtime query interrupt status
5265  executor->clearQuerySessionStatus(session_id, start_time);
5266  }
5267  };
5268 
5269  std::string file_name{file_name_in};
5270 
5271  if (path_is_relative(file_name)) {
5272  // assume relative paths are relative to data_path / import / <session>
5273  auto file_path = import_path_ / picosha2::hash256_hex_string(session_id) /
5274  boost::filesystem::path(file_name).filename();
5275  file_name = file_path.string();
5276  }
5278 
5279  bool is_raster = false;
5280  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
5281  if (is_a_supported_archive_file(file_name)) {
5282  // find the archive file
5283  add_vsi_network_prefix(file_name);
5284  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
5285  THROW_DB_EXCEPTION("Archive does not exist: " + file_name_in);
5286  }
5287  // find geo file in archive
5288  add_vsi_archive_prefix(file_name);
5289  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
5290  // prepare to load that geo file
5291  if (geo_file.size()) {
5292  file_name = file_name + std::string("/") + geo_file;
5293  }
5294  } else {
5295  // prepare to load geo file directly
5296  add_vsi_network_prefix(file_name);
5297  add_vsi_geo_prefix(file_name);
5298  }
5299  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
5300  // prepare to load geo raster file directly
5301  add_vsi_network_prefix(file_name);
5302  add_vsi_geo_prefix(file_name);
5303  is_raster = true;
5304  } else {
5305  THROW_DB_EXCEPTION("import_geo_table called with file_type other than GEO or RASTER");
5306  }
5307 
5308  // log what we're about to try to do
5309  VLOG(1) << "import_geo_table: Original filename: " << file_name_in;
5310  VLOG(1) << "import_geo_table: Actual filename: " << file_name;
5311  VLOG(1) << "import_geo_table: Raster: " << is_raster;
5312 
5313  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
5314  auto file_path = boost::filesystem::path(file_name);
5315  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
5316  THROW_DB_EXCEPTION("File does not exist: " + file_path.filename().string());
5317  }
5318 
5319  // use GDAL to check any dependent files exist (ditto)
5320  try {
5321  check_geospatial_files(file_path, copy_params);
5322  } catch (const std::exception& e) {
5323  THROW_DB_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5324  }
5325 
5326  // get layer info and deconstruct
5327  // in general, we will get a combination of layers of these four types:
5328  // EMPTY: no rows, report and skip
5329  // GEO: create a geo table from this
5330  // NON_GEO: create a regular table from this
5331  // UNSUPPORTED_GEO: report and skip
5332  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
5333  if (!is_raster) {
5334  try {
5335  layer_info =
5336  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
5337  } catch (const std::exception& e) {
5338  THROW_DB_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5339  }
5340  }
5341 
5342  // categorize the results
5343  using LayerNameToContentsMap =
5344  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
5345  LayerNameToContentsMap load_layers;
5346  LOG_IF(INFO, layer_info.size() > 0)
5347  << "import_geo_table: Found the following layers in the geo file:";
5348  for (const auto& layer : layer_info) {
5349  switch (layer.contents) {
5351  LOG(INFO) << "import_geo_table: '" << layer.name
5352  << "' (will import as geo table)";
5353  load_layers[layer.name] = layer.contents;
5354  break;
5356  LOG(INFO) << "import_geo_table: '" << layer.name
5357  << "' (will import as regular table)";
5358  load_layers[layer.name] = layer.contents;
5359  break;
5361  LOG(WARNING) << "import_geo_table: '" << layer.name
5362  << "' (will not import, unsupported geo type)";
5363  break;
5365  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
5366  break;
5367  default:
5368  break;
5369  }
5370  }
5371 
5372  // if nothing is loadable, stop now
5373  if (!is_raster && load_layers.size() == 0) {
5374  THROW_DB_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
5375  }
5376 
5377  // if we've been given an explicit layer name, check that it exists and is loadable
5378  // scan the original list, as it may exist but not have been gathered as loadable
5379  if (!is_raster && copy_params.geo_layer_name.size()) {
5380  bool found = false;
5381  for (const auto& layer : layer_info) {
5382  if (copy_params.geo_layer_name == layer.name) {
5385  // forget all the other layers and just load this one
5386  load_layers.clear();
5387  load_layers[layer.name] = layer.contents;
5388  found = true;
5389  break;
5390  } else if (layer.contents ==
5392  THROW_DB_EXCEPTION("import_geo_table: Explicit geo layer '" +
5393  copy_params.geo_layer_name + "' has unsupported geo type!");
5394  } else if (layer.contents ==