OmniSciDB  eb3a3d0a03
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: DBHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "DBHandler.h"
25 #include "DistributedLoader.h"
26 #include "TokenCompletionHints.h"
27 
28 #ifdef HAVE_PROFILER
29 #include <gperftools/heap-profiler.h>
30 #endif // HAVE_PROFILER
31 
32 #include "MapDRelease.h"
33 
34 #include "Calcite/Calcite.h"
35 #include "gen-cpp/CalciteServer.h"
36 
39 
40 #include "Catalog/Catalog.h"
44 #include "DistributedHandler.h"
46 #include "Geospatial/Compression.h"
47 #include "Geospatial/GDAL.h"
48 #include "Geospatial/Transforms.h"
49 #include "Geospatial/Types.h"
50 #include "ImportExport/Importer.h"
51 #include "LockMgr/LockMgr.h"
53 #include "Parser/ParserWrapper.h"
55 #include "Parser/parser.h"
58 #include "QueryEngine/Execute.h"
68 #include "Shared/ArrowUtil.h"
69 #include "Shared/StringTransform.h"
70 #include "Shared/import_helpers.h"
72 #include "Shared/measure.h"
73 #include "Shared/scope.h"
75 
76 #ifdef HAVE_AWS_S3
77 #include <aws/core/auth/AWSCredentialsProviderChain.h>
78 #endif
79 #include <fcntl.h>
80 #include <picosha2.h>
81 #include <sys/types.h>
82 #include <algorithm>
83 #include <boost/algorithm/string.hpp>
84 #include <boost/filesystem.hpp>
85 #include <boost/make_shared.hpp>
86 #include <boost/process/search_path.hpp>
87 #include <boost/program_options.hpp>
88 #include <boost/regex.hpp>
89 #include <boost/tokenizer.hpp>
90 #include <cmath>
91 #include <csignal>
92 #include <fstream>
93 #include <future>
94 #include <map>
95 #include <memory>
96 #include <random>
97 #include <regex>
98 #include <string>
99 #include <thread>
100 #include <typeinfo>
101 
102 #include <arrow/api.h>
103 #include <arrow/io/api.h>
104 #include <arrow/ipc/api.h>
105 
106 #include "Shared/ArrowUtil.h"
107 
108 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
109 
110 #ifdef HAVE_AWS_S3
111 extern bool g_allow_s3_server_privileges;
112 #endif
113 
116 
117 #define INVALID_SESSION_ID ""
118 
// Builds a TOmniSciException whose error_msg is `errstr`, logs that message at
// ERROR severity and throws the exception.
//
// The temporary is given a deliberately unusual name: the macro body is pasted
// into the caller's scope, so a short name such as `ex` would shadow a caller
// variable of the same name that appears inside `errstr`.
#define THROW_MAPD_EXCEPTION(errstr)                      \
  {                                                       \
    TOmniSciException mapd_exception_to_throw_;           \
    mapd_exception_to_throw_.error_msg = (errstr);        \
    LOG(ERROR) << mapd_exception_to_throw_.error_msg;     \
    throw mapd_exception_to_throw_;                       \
  }
126 
127 thread_local std::string TrackingProcessor::client_address;
129 
130 namespace {
131 
132 SessionMap::iterator get_session_from_map(const TSessionId& session,
133  SessionMap& session_map) {
134  auto session_it = session_map.find(session);
135  if (session_it == session_map.end()) {
136  THROW_MAPD_EXCEPTION("Session not valid.");
137  }
138  return session_it;
139 }
140 
// Exception type caught by the session lookup helpers in this file; on catching
// it they disconnect the session and report `cause` to the client.
struct ForceDisconnect : public std::runtime_error {
  // `explicit` prevents a std::string from silently converting to this exception.
  explicit ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
};
144 
145 } // namespace
146 
147 template <>
148 SessionMap::iterator DBHandler::get_session_it_unsafe(
149  const TSessionId& session,
150  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
151  auto session_it = get_session_from_map(session, sessions_);
152  try {
153  check_session_exp_unsafe(session_it);
154  } catch (const ForceDisconnect& e) {
155  read_lock.unlock();
156  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
157  auto session_it2 = get_session_from_map(session, sessions_);
158  disconnect_impl(session_it2, write_lock);
159  THROW_MAPD_EXCEPTION(e.what());
160  }
161  return session_it;
162 }
163 
164 template <>
165 SessionMap::iterator DBHandler::get_session_it_unsafe(
166  const TSessionId& session,
167  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
168  auto session_it = get_session_from_map(session, sessions_);
169  try {
170  check_session_exp_unsafe(session_it);
171  } catch (const ForceDisconnect& e) {
172  disconnect_impl(session_it, write_lock);
173  THROW_MAPD_EXCEPTION(e.what());
174  }
175  return session_it;
176 }
177 
178 template <>
180  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
181  std::vector<std::string> expired_sessions;
182  for (auto session_pair : sessions_) {
183  auto session_it = get_session_from_map(session_pair.first, sessions_);
184  try {
185  check_session_exp_unsafe(session_it);
186  } catch (const ForceDisconnect& e) {
187  expired_sessions.emplace_back(session_it->second->get_session_id());
188  }
189  }
190 
191  for (auto session_id : expired_sessions) {
192  if (leaf_aggregator_.leafCount() > 0) {
193  try {
194  leaf_aggregator_.disconnect(session_id);
195  } catch (TOmniSciException& toe) {
196  LOG(INFO) << " Problem disconnecting from leaves : " << toe.what();
197  } catch (std::exception& e) {
198  LOG(INFO)
199  << " Problem disconnecting from leaves, check leaf logs for additonal info";
200  }
201  }
202  sessions_.erase(session_id);
203  }
204  if (render_handler_) {
205  write_lock.unlock();
206  for (auto session_id : expired_sessions) {
      // NOTE: the render disconnect is done after the session lock is released to
      // avoid a deadlock. See: https://omnisci.atlassian.net/browse/BE-3324
      // This out-of-scope solution is a compromise for now until a better session
      // handling/locking mechanism is developed for the renderer. Note as well that
      // the session_id cannot be immediately reused: a render request that slips in
      // after the lock is released but before the render disconnect runs could
      // cause a problem.
214  render_handler_->disconnect(session_id);
215  }
216  }
217 }
218 
219 #ifdef ENABLE_GEOS
220 extern std::unique_ptr<std::string> g_libgeos_so_filename;
221 #endif
222 
223 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
224  const std::vector<LeafHostInfo>& string_leaves,
225  const std::string& base_data_path,
226  const bool allow_multifrag,
227  const bool jit_debug,
228  const bool intel_jit_profile,
229  const bool read_only,
230  const bool allow_loop_joins,
231  const bool enable_rendering,
232  const bool renderer_use_vulkan_driver,
233  const bool enable_auto_clear_render_mem,
234  const int render_oom_retry_threshold,
235  const size_t render_mem_bytes,
236  const size_t max_concurrent_render_sessions,
237  const size_t reserved_gpu_mem,
238  const bool render_compositor_use_last_gpu,
239  const size_t num_reader_threads,
240  const AuthMetadata& authMetadata,
241  SystemParameters& system_parameters,
242  const bool legacy_syntax,
243  const int idle_session_duration,
244  const int max_session_duration,
245  const bool enable_runtime_udf_registration,
246  const std::string& udf_filename,
247  const std::string& clang_path,
248  const std::vector<std::string>& clang_options,
249 #ifdef ENABLE_GEOS
250  const std::string& libgeos_so_filename,
251 #endif
252  const File_Namespace::DiskCacheConfig& disk_cache_config,
253  const bool is_new_db)
254  : leaf_aggregator_(db_leaves)
255  , db_leaves_(db_leaves)
256  , string_leaves_(string_leaves)
257  , base_data_path_(base_data_path)
258  , random_gen_(std::random_device{}())
259  , session_id_dist_(0, INT32_MAX)
260  , jit_debug_(jit_debug)
261  , intel_jit_profile_(intel_jit_profile)
262  , allow_multifrag_(allow_multifrag)
263  , read_only_(read_only)
264  , allow_loop_joins_(allow_loop_joins)
265  , authMetadata_(authMetadata)
266  , system_parameters_(system_parameters)
267  , legacy_syntax_(legacy_syntax)
268  , dispatch_queue_(
269  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
270  , super_user_rights_(false)
271  , idle_session_duration_(idle_session_duration * 60)
272  , max_session_duration_(max_session_duration * 60)
273  , runtime_udf_registration_enabled_(enable_runtime_udf_registration)
274 
275  , enable_rendering_(enable_rendering)
276  , renderer_use_vulkan_driver_(renderer_use_vulkan_driver)
277  , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
278  , render_oom_retry_threshold_(render_oom_retry_threshold)
279  , max_concurrent_render_sessions_(max_concurrent_render_sessions)
280  , reserved_gpu_mem_(reserved_gpu_mem)
281  , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
282  , render_mem_bytes_(render_mem_bytes)
283  , num_reader_threads_(num_reader_threads)
284 #ifdef ENABLE_GEOS
285  , libgeos_so_filename_(libgeos_so_filename)
286 #endif
287  , disk_cache_config_(disk_cache_config)
288  , udf_filename_(udf_filename)
289  , clang_path_(clang_path)
290  , clang_options_(clang_options)
291 
292 {
293  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
294  initialize(is_new_db);
295 }
296 
297 void DBHandler::initialize(const bool is_new_db) {
298  if (!initialized_) {
299  initialized_ = true;
300  } else {
302  "Server already initialized; service restart required to activate any new "
303  "entitlements.");
304  return;
305  }
306 
309  cpu_mode_only_ = true;
310  } else {
311 #ifdef HAVE_CUDA
313  cpu_mode_only_ = false;
314 #else
316  LOG(WARNING) << "This build isn't CUDA enabled, will run on CPU";
317  cpu_mode_only_ = true;
318 #endif
319  }
320 
321  bool is_rendering_enabled = enable_rendering_;
322  if (system_parameters_.num_gpus == 0) {
323  is_rendering_enabled = false;
324  }
325 
326  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
  // Calculate the total amount of memory to reserve on each GPU that the buffer
  // manager cannot ask for.
329  size_t total_reserved = reserved_gpu_mem_;
330  if (is_rendering_enabled) {
331  total_reserved += render_mem_bytes_;
332  }
333 
334  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
335 #ifdef HAVE_CUDA
336  if (!cpu_mode_only_ || is_rendering_enabled) {
337  try {
338  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
340  } catch (const std::exception& e) {
341  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
342  << e.what();
343  cpu_mode_only_ = true;
344  is_rendering_enabled = false;
345  }
346  }
347 #endif // HAVE_CUDA
348 
349  try {
350  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
352  std::move(cuda_mgr),
354  total_reserved,
357  } catch (const std::exception& e) {
358  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
359  }
360 
361  std::string udf_ast_filename("");
362 
363  try {
364  if (!udf_filename_.empty()) {
365  const auto cuda_mgr = data_mgr_->getCudaMgr();
366  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
367  cuda_mgr ? cuda_mgr->getDeviceArch()
369  UdfCompiler compiler(device_arch, clang_path_, clang_options_);
370 
371  const auto [cpu_udf_ir_file, cuda_udf_ir_file] = compiler.compileUdf(udf_filename_);
372  Executor::addUdfIrToModule(cpu_udf_ir_file, /*is_cuda_ir=*/false);
373  if (!cuda_udf_ir_file.empty()) {
374  Executor::addUdfIrToModule(cuda_udf_ir_file, /*is_cuda_ir=*/true);
375  }
376  udf_ast_filename = compiler.getAstFileName(udf_filename_);
377  }
378  } catch (const std::exception& e) {
379  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
380  }
381 
382  try {
383  calcite_ =
384  std::make_shared<Calcite>(system_parameters_, base_data_path_, udf_ast_filename);
385  } catch (const std::exception& e) {
386  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
387  }
388 
389  try {
390  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
391  if (!udf_filename_.empty()) {
392  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
393  }
394  } catch (const std::exception& e) {
395  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
396  }
397 
398  try {
400  } catch (const std::exception& e) {
401  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
402  }
403 
404  try {
405  auto udtfs = ThriftSerializers::to_thrift(
407  std::vector<TUserDefinedFunction> udfs = {};
408  calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
409  } catch (const std::exception& e) {
410  LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
411  }
412 
413  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
415  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
416  cpu_mode_only_ = true;
417  }
418 
419  switch (executor_device_type_) {
421  LOG(INFO) << "Started in GPU mode" << std::endl;
422  break;
424  LOG(INFO) << "Started in CPU mode" << std::endl;
425  break;
426  }
427 
428  try {
430  SysCatalog::instance().init(base_data_path_,
431  data_mgr_,
433  calcite_,
434  is_new_db,
435  !db_leaves_.empty(),
437  } catch (const std::exception& e) {
438  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
439  }
440 
441  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
442  start_time_ = std::time(nullptr);
443 
444  if (is_rendering_enabled) {
445  try {
446  render_handler_.reset(new RenderHandler(this,
450  false,
451  0,
452  false,
454  } catch (const std::exception& e) {
455  LOG(ERROR) << "Backend rendering disabled: " << e.what();
456  }
457  }
458 
459  if (leaf_aggregator_.leafCount() > 0) {
460  try {
461  agg_handler_.reset(new MapDAggHandler(this));
462  } catch (const std::exception& e) {
463  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
464  }
465  } else if (g_cluster) {
466  try {
467  leaf_handler_.reset(new MapDLeafHandler(this));
468  } catch (const std::exception& e) {
469  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
470  }
471  }
472 
473 #ifdef ENABLE_GEOS
474  if (!libgeos_so_filename_.empty()) {
475  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename_));
476  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
477  }
478 #endif
479 }
480 
482  shutdown();
483 }
484 
486  const std::string& query_str,
487  std::list<std::unique_ptr<Parser::Stmt>>& parse_trees) {
488  int num_parse_errors = 0;
489  std::string last_parsed;
490  SQLParser parser;
491  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
492  if (num_parse_errors > 0) {
493  throw std::runtime_error("Syntax error at: " + last_parsed);
494  }
495  if (parse_trees.size() > 1) {
496  throw std::runtime_error("multiple SQL statements not allowed");
497  } else if (parse_trees.size() != 1) {
498  throw std::runtime_error("empty SQL statment not allowed");
499  }
500 }
501 void DBHandler::check_read_only(const std::string& str) {
502  if (DBHandler::read_only_) {
503  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
504  }
505 }
506 
508  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
509  // We would create an in memory session for calcite with super user privileges which
510  // would be used for getting all tables metadata when a user runs the query. The
511  // session would be under the name of a proxy user/password which would only persist
512  // till server's lifetime or execution of calcite query(in memory) whichever is the
513  // earliest.
514  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
515  std::string session_id;
516  do {
517  session_id = generate_random_string(64);
518  } while (sessions_.find(session_id) != sessions_.end());
519  Catalog_Namespace::UserMetadata user_meta(-1,
520  calcite_->getInternalSessionProxyUserName(),
521  calcite_->getInternalSessionProxyPassword(),
522  true,
523  -1,
524  true,
525  false);
526  const auto emplace_ret =
527  sessions_.emplace(session_id,
528  std::make_shared<Catalog_Namespace::SessionInfo>(
529  catalog_ptr, user_meta, executor_device_type_, session_id));
530  CHECK(emplace_ret.second);
531  return session_id;
532 }
533 
535  const Catalog_Namespace::UserMetadata user_meta) {
536  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
537  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
538  user_meta.isSuper.load();
539 }
540 
541 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
542  // Remove InMemory calcite Session.
543  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
544  const auto it = sessions_.find(session_id);
545  CHECK(it != sessions_.end());
546  sessions_.erase(it);
547 }
548 
549 // internal connection for connections with no password
550 void DBHandler::internal_connect(TSessionId& session,
551  const std::string& username,
552  const std::string& dbname) {
553  auto stdlog = STDLOG(); // session_info set by connect_impl()
554  std::string username2 = username; // login() may reset username given as argument
555  std::string dbname2 = dbname; // login() may reset dbname given as argument
557  std::shared_ptr<Catalog> cat = nullptr;
558  try {
559  cat =
560  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
561  } catch (std::exception& e) {
562  THROW_MAPD_EXCEPTION(e.what());
563  }
564 
565  DBObject dbObject(dbname2, DatabaseDBObjectType);
566  dbObject.loadKey(*cat);
568  std::vector<DBObject> dbObjects;
569  dbObjects.push_back(dbObject);
570  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
571  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
572  " is not allowed to access database " + dbname2 + ".");
573  }
574  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
575 }
576 
578  return leaf_aggregator_.leafCount() > 0;
579 }
580 
581 void DBHandler::krb5_connect(TKrb5Session& session,
582  const std::string& inputToken,
583  const std::string& dbname) {
584  THROW_MAPD_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
585 }
586 
587 void DBHandler::connect(TSessionId& session,
588  const std::string& username,
589  const std::string& passwd,
590  const std::string& dbname) {
591  auto stdlog = STDLOG(); // session_info set by connect_impl()
592  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
593  std::string username2 = username; // login() may reset username given as argument
594  std::string dbname2 = dbname; // login() may reset dbname given as argument
596  std::shared_ptr<Catalog> cat = nullptr;
597  try {
598  cat = SysCatalog::instance().login(
599  dbname2, username2, passwd, user_meta, !super_user_rights_);
600  } catch (std::exception& e) {
601  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
602  THROW_MAPD_EXCEPTION(e.what());
603  }
604 
605  DBObject dbObject(dbname2, DatabaseDBObjectType);
606  dbObject.loadKey(*cat);
608  std::vector<DBObject> dbObjects;
609  dbObjects.push_back(dbObject);
610  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
611  stdlog.appendNameValuePairs(
612  "user", username, "db", dbname, "exception", "Missing Privileges");
613  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
614  " is not allowed to access database " + dbname2 + ".");
615  }
616  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
617 
618  // Restriction is returned as part of the users metadata on login but
619  // is per session so transfering it over here
620  // Currently only SAML can even set a Restriction
621  auto restriction = std::make_shared<Restriction>(user_meta.restriction);
622  auto login_session = get_session_ptr(session);
623  login_session->set_restriction(restriction);
624 
625  // if pki auth session will come back encrypted with user pubkey
626  SysCatalog::instance().check_for_session_encryption(passwd, session);
627 }
628 
629 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::create_new_session(
630  TSessionId& session,
631  const std::string& dbname,
632  const Catalog_Namespace::UserMetadata& user_meta,
633  std::shared_ptr<Catalog> cat) {
634  do {
635  session = generate_random_string(32);
636  } while (sessions_.find(session) != sessions_.end());
637  std::pair<SessionMap::iterator, bool> emplace_retval =
638  sessions_.emplace(session,
639  std::make_shared<Catalog_Namespace::SessionInfo>(
640  cat, user_meta, executor_device_type_, session));
641  CHECK(emplace_retval.second);
642  auto& session_ptr = emplace_retval.first->second;
643  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
644  return session_ptr;
645 }
646 
647 void DBHandler::connect_impl(TSessionId& session,
648  const std::string& passwd,
649  const std::string& dbname,
650  const Catalog_Namespace::UserMetadata& user_meta,
651  std::shared_ptr<Catalog> cat,
652  query_state::StdLog& stdlog) {
653  // TODO(sy): Is there any reason to have dbname as a parameter
654  // here when the cat parameter already provides cat->name()?
655  // Should dbname and cat->name() ever differ?
656  {
657  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
658  expire_idle_sessions_unsafe(write_lock);
660  sessions_.size() + 1 > static_cast<size_t>(system_parameters_.num_sessions)) {
661  THROW_MAPD_EXCEPTION("Too many active sessions");
662  }
663  }
664  {
665  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
666  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
667  stdlog.setSessionInfo(session_ptr);
668  session_ptr->set_connection_info(getConnectionInfo().toString());
669  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
670  // while doing warmup
671  if (leaf_aggregator_.leafCount() > 0) {
672  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
673  return;
674  }
675  }
676  }
677  auto const roles =
678  stdlog.getConstSessionInfo()->get_currentUser().isSuper
679  ? std::vector<std::string>{{"super"}}
680  : SysCatalog::instance().getRoles(
681  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
682  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
683 }
684 
685 void DBHandler::disconnect(const TSessionId& session) {
686  auto stdlog = STDLOG();
687  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
688 
689  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
690  auto session_it = get_session_it_unsafe(session, write_lock);
691  stdlog.setSessionInfo(session_it->second);
692  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
693 
694  LOG(INFO) << "User " << session_it->second->get_currentUser().userLoggable()
695  << " disconnected from database " << dbname
696  << " with public_session_id: " << session_it->second->get_public_session_id();
697 
698  disconnect_impl(session_it, write_lock);
699 }
700 
701 void DBHandler::disconnect_impl(const SessionMap::iterator& session_it,
702  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
703  // session_it existence should already have been checked (i.e. called via
704  // get_session_it_unsafe(...))
705 
706  const auto session_id = session_it->second->get_session_id();
707  std::exception_ptr leaf_exception = nullptr;
708  try {
709  if (leaf_aggregator_.leafCount() > 0) {
710  leaf_aggregator_.disconnect(session_id);
711  }
712  } catch (...) {
713  leaf_exception = std::current_exception();
714  }
715 
716  {
717  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
718  render_group_assignment_map_.erase(session_id);
719  }
720 
721  sessions_.erase(session_it);
722  write_lock.unlock();
723 
724  if (render_handler_) {
725  render_handler_->disconnect(session_id);
726  }
727 }
728 
729 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
730  auto stdlog = STDLOG(get_session_ptr(session));
731  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
732  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
733  auto session_it = get_session_it_unsafe(session, write_lock);
734 
735  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
736 
737  try {
738  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
739  dbname2, session_it->second->get_currentUser().userName);
740  session_it->second->set_catalog_ptr(cat);
741  if (leaf_aggregator_.leafCount() > 0) {
742  leaf_aggregator_.switch_database(session, dbname);
743  return;
744  }
745  } catch (std::exception& e) {
746  THROW_MAPD_EXCEPTION(e.what());
747  }
748 }
749 
750 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
751  auto stdlog = STDLOG(get_session_ptr(session1));
752  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
753  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
754  auto session_it = get_session_it_unsafe(session1, write_lock);
755 
756  try {
757  const Catalog_Namespace::UserMetadata& user_meta =
758  session_it->second->get_currentUser();
759  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
760  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
761  if (leaf_aggregator_.leafCount() > 0) {
762  leaf_aggregator_.clone_session(session1, session2);
763  return;
764  }
765  } catch (std::exception& e) {
766  THROW_MAPD_EXCEPTION(e.what());
767  }
768 }
769 
770 void DBHandler::interrupt(const TSessionId& query_session,
771  const TSessionId& interrupt_session) {
772  // if this is for distributed setting, query_session becomes a parent session (agg)
773  // and the interrupt session is one of existing session in the leaf node (leaf)
774  // so we can think there exists a logical mapping
775  // between query_session (agg) and interrupt_session (leaf)
776  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
777  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
778  const auto allow_query_interrupt =
780  if (g_enable_dynamic_watchdog || allow_query_interrupt) {
781  // Shared lock to allow simultaneous interrupts of multiple sessions
782  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
783 
784  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
785  auto& cat = session_it->second.get()->getCatalog();
786  const auto dbname = cat.getCurrentDB().dbName;
788  jit_debug_ ? "/tmp" : "",
789  jit_debug_ ? "mapdquery" : "",
791  CHECK(executor);
792 
793  if (leaf_aggregator_.leafCount() > 0) {
794  leaf_aggregator_.interrupt(query_session, interrupt_session);
795  }
796  auto target_executor_ids = executor->getExecutorIdsRunningQuery(query_session);
797  if (target_executor_ids.empty()) {
798  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
799  if (executor->checkIsQuerySessionEnrolled(query_session, session_read_lock)) {
800  session_read_lock.unlock();
801  VLOG(1) << "Received interrupt: "
802  << "Session " << *session_it->second << ", User "
803  << session_it->second->get_currentUser().userLoggable() << ", Database "
804  << dbname << std::endl;
805  executor->interrupt(query_session, interrupt_session);
806  }
807  } else {
808  for (auto& executor_id : target_executor_ids) {
809  VLOG(1) << "Received interrupt: "
810  << "Session " << *session_it->second << ", Executor " << executor_id
811  << ", User " << session_it->second->get_currentUser().userLoggable()
812  << ", Database " << dbname << std::endl;
813  auto target_executor = Executor::getExecutor(executor_id);
814  target_executor->interrupt(query_session, interrupt_session);
815  }
816  }
817 
818  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
819  << " interrupted session with database " << dbname << std::endl;
820  }
821 }
822 
824  if (g_cluster) {
825  if (leaf_aggregator_.leafCount() > 0) {
826  return TRole::type::AGGREGATOR;
827  }
828  return TRole::type::LEAF;
829  }
830  return TRole::type::SERVER;
831 }
832 
833 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
834  auto stdlog = STDLOG(get_session_ptr(session));
835  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
836  const auto rendering_enabled = bool(render_handler_);
837  _return.read_only = read_only_;
838  _return.version = MAPD_RELEASE;
839  _return.rendering_enabled = rendering_enabled;
840  _return.start_time = start_time_;
841  _return.edition = MAPD_EDITION;
842  _return.host_name = omnisci::get_hostname();
843  _return.poly_rendering_enabled = rendering_enabled;
844  _return.role = getServerRole();
845  _return.renderer_status_json =
846  render_handler_ ? render_handler_->get_renderer_status_json() : "";
847 }
848 
849 void DBHandler::get_status(std::vector<TServerStatus>& _return,
850  const TSessionId& session) {
851  //
852  // get_status() is now called locally at startup on the aggregator
853  // in order to validate that all nodes of a cluster are running the
854  // same software version and the same renderer status
855  //
856  // In that context, it is called with the InvalidSessionID, and
857  // with the local super-user flag set.
858  //
859  // Hence, we allow this session-less mode only in distributed mode, and
860  // then on a leaf (always), or on the aggregator (only in super-user mode)
861  //
862  auto const allow_invalid_session = g_cluster && (!isAggregator() || super_user_rights_);
863  if (!allow_invalid_session || session != getInvalidSessionId()) {
864  auto stdlog = STDLOG(get_session_ptr(session));
865  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
866  } else {
867  LOG(INFO) << "get_status() called in session-less mode";
868  }
869  const auto rendering_enabled = bool(render_handler_);
870  TServerStatus ret;
871  ret.read_only = read_only_;
872  ret.version = MAPD_RELEASE;
873  ret.rendering_enabled = rendering_enabled;
874  ret.start_time = start_time_;
875  ret.edition = MAPD_EDITION;
876  ret.host_name = omnisci::get_hostname();
877  ret.poly_rendering_enabled = rendering_enabled;
878  ret.role = getServerRole();
879  ret.renderer_status_json =
880  render_handler_ ? render_handler_->get_renderer_status_json() : "";
881 
882  _return.push_back(ret);
883  if (leaf_aggregator_.leafCount() > 0) {
884  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
885  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
886  }
887 }
888 
889 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
890  const TSessionId& session) {
891  auto stdlog = STDLOG(get_session_ptr(session));
892  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
893  THardwareInfo ret;
894  const auto cuda_mgr = data_mgr_->getCudaMgr();
895  if (cuda_mgr) {
896  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
897  ret.start_gpu = cuda_mgr->getStartGpu();
898  if (ret.start_gpu >= 0) {
899  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
900  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
901  }
902  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
903  TGpuSpecification gpu_spec;
904  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
905  gpu_spec.num_sm = deviceProperties->numMPs;
906  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
907  gpu_spec.memory = deviceProperties->globalMem;
908  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
909  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
910  ret.gpu_info.push_back(gpu_spec);
911  }
912  }
913 
914  // start hardware/OS dependent code
915  ret.num_cpu_hw = std::thread::hardware_concurrency();
916  // ^ This might return diffrent results in case of hyper threading
917  // end hardware/OS dependent code
918 
919  _return.hardware_info.push_back(ret);
920  if (leaf_aggregator_.leafCount() > 0) {
921  ret.host_name = "aggregator";
922  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
923  _return.hardware_info.insert(_return.hardware_info.end(),
924  leaf_hardware.hardware_info.begin(),
925  leaf_hardware.hardware_info.end());
926  }
927 }
928 
929 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
930  auto session_ptr = get_session_ptr(session);
931  CHECK(session_ptr);
932  auto stdlog = STDLOG(session_ptr);
933  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
934  auto user_metadata = session_ptr->get_currentUser();
935 
936  _return.user = user_metadata.userName;
937  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
938  _return.start_time = session_ptr->get_start_time();
939  _return.is_super = user_metadata.isSuper;
940 }
941 
943  const SQLTypeInfo& ti,
944  TColumn& column) {
945  if (ti.is_array()) {
946  TColumn tColumn;
947  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
948  CHECK(array_tv);
949  bool is_null = !array_tv->is_initialized();
950  if (!is_null) {
951  const auto& vec = array_tv->get();
952  for (const auto& elem_tv : vec) {
953  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
954  }
955  }
956  column.data.arr_col.push_back(tColumn);
957  column.nulls.push_back(is_null && !ti.get_notnull());
958  } else if (ti.is_geometry()) {
959  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
960  if (scalar_tv) {
961  auto s_n = boost::get<NullableString>(scalar_tv);
962  auto s = boost::get<std::string>(s_n);
963  if (s) {
964  column.data.str_col.push_back(*s);
965  } else {
966  column.data.str_col.emplace_back(""); // null string
967  auto null_p = boost::get<void*>(s_n);
968  CHECK(null_p && !*null_p);
969  }
970  column.nulls.push_back(!s && !ti.get_notnull());
971  } else {
972  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
973  CHECK(array_tv);
974  bool is_null = !array_tv->is_initialized();
975  if (!is_null) {
976  auto elem_type = SQLTypeInfo(kDOUBLE, false);
977  TColumn tColumn;
978  const auto& vec = array_tv->get();
979  for (const auto& elem_tv : vec) {
980  value_to_thrift_column(elem_tv, elem_type, tColumn);
981  }
982  column.data.arr_col.push_back(tColumn);
983  column.nulls.push_back(false);
984  } else {
985  TColumn tColumn;
986  column.data.arr_col.push_back(tColumn);
987  column.nulls.push_back(is_null && !ti.get_notnull());
988  }
989  }
990  } else {
991  CHECK(!ti.is_column());
992  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
993  CHECK(scalar_tv);
994  if (boost::get<int64_t>(scalar_tv)) {
995  int64_t data = *(boost::get<int64_t>(scalar_tv));
996 
997  if (ti.is_decimal()) {
998  double val = static_cast<double>(data);
999  if (ti.get_scale() > 0) {
1000  val /= pow(10.0, std::abs(ti.get_scale()));
1001  }
1002  column.data.real_col.push_back(val);
1003  } else {
1004  column.data.int_col.push_back(data);
1005  }
1006 
1007  switch (ti.get_type()) {
1008  case kBOOLEAN:
1009  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
1010  break;
1011  case kTINYINT:
1012  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
1013  break;
1014  case kSMALLINT:
1015  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
1016  break;
1017  case kINT:
1018  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
1019  break;
1020  case kNUMERIC:
1021  case kDECIMAL:
1022  case kBIGINT:
1023  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1024  break;
1025  case kTIME:
1026  case kTIMESTAMP:
1027  case kDATE:
1028  case kINTERVAL_DAY_TIME:
1029  case kINTERVAL_YEAR_MONTH:
1030  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1031  break;
1032  default:
1033  column.nulls.push_back(false);
1034  }
1035  } else if (boost::get<double>(scalar_tv)) {
1036  double data = *(boost::get<double>(scalar_tv));
1037  column.data.real_col.push_back(data);
1038  if (ti.get_type() == kFLOAT) {
1039  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1040  } else {
1041  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
1042  }
1043  } else if (boost::get<float>(scalar_tv)) {
1044  CHECK_EQ(kFLOAT, ti.get_type());
1045  float data = *(boost::get<float>(scalar_tv));
1046  column.data.real_col.push_back(data);
1047  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1048  } else if (boost::get<NullableString>(scalar_tv)) {
1049  auto s_n = boost::get<NullableString>(scalar_tv);
1050  auto s = boost::get<std::string>(s_n);
1051  if (s) {
1052  column.data.str_col.push_back(*s);
1053  } else {
1054  column.data.str_col.emplace_back(""); // null string
1055  auto null_p = boost::get<void*>(s_n);
1056  CHECK(null_p && !*null_p);
1057  }
1058  column.nulls.push_back(!s && !ti.get_notnull());
1059  } else {
1060  CHECK(false);
1061  }
1062  }
1063 }
1064 
1066  TDatum datum;
1067  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1068  if (!scalar_tv) {
1069  CHECK(ti.is_array());
1070  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1071  CHECK(array_tv);
1072  if (array_tv->is_initialized()) {
1073  const auto& vec = array_tv->get();
1074  for (const auto& elem_tv : vec) {
1075  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
1076  datum.val.arr_val.push_back(scalar_col_val);
1077  }
1078  // Datum is not null, at worst it's an empty array Datum
1079  datum.is_null = false;
1080  } else {
1081  datum.is_null = true;
1082  }
1083  return datum;
1084  }
1085  if (boost::get<int64_t>(scalar_tv)) {
1086  int64_t data = *(boost::get<int64_t>(scalar_tv));
1087 
1088  if (ti.is_decimal()) {
1089  double val = static_cast<double>(data);
1090  if (ti.get_scale() > 0) {
1091  val /= pow(10.0, std::abs(ti.get_scale()));
1092  }
1093  datum.val.real_val = val;
1094  } else {
1095  datum.val.int_val = data;
1096  }
1097 
1098  switch (ti.get_type()) {
1099  case kBOOLEAN:
1100  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
1101  break;
1102  case kTINYINT:
1103  datum.is_null = (datum.val.int_val == NULL_TINYINT);
1104  break;
1105  case kSMALLINT:
1106  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
1107  break;
1108  case kINT:
1109  datum.is_null = (datum.val.int_val == NULL_INT);
1110  break;
1111  case kDECIMAL:
1112  case kNUMERIC:
1113  case kBIGINT:
1114  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1115  break;
1116  case kTIME:
1117  case kTIMESTAMP:
1118  case kDATE:
1119  case kINTERVAL_DAY_TIME:
1120  case kINTERVAL_YEAR_MONTH:
1121  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1122  break;
1123  default:
1124  datum.is_null = false;
1125  }
1126  } else if (boost::get<double>(scalar_tv)) {
1127  datum.val.real_val = *(boost::get<double>(scalar_tv));
1128  if (ti.get_type() == kFLOAT) {
1129  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1130  } else {
1131  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
1132  }
1133  } else if (boost::get<float>(scalar_tv)) {
1134  CHECK_EQ(kFLOAT, ti.get_type());
1135  datum.val.real_val = *(boost::get<float>(scalar_tv));
1136  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1137  } else if (boost::get<NullableString>(scalar_tv)) {
1138  auto s_n = boost::get<NullableString>(scalar_tv);
1139  auto s = boost::get<std::string>(s_n);
1140  if (s) {
1141  datum.val.str_val = *s;
1142  } else {
1143  auto null_p = boost::get<void*>(s_n);
1144  CHECK(null_p && !*null_p);
1145  }
1146  datum.is_null = !s;
1147  } else {
1148  CHECK(false);
1149  }
1150  return datum;
1151 }
1152 
1154  TQueryResult& _return,
1155  const QueryStateProxy& query_state_proxy,
1156  const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1157  const std::string& query_str,
1158  const bool column_format,
1159  const std::string& nonce,
1160  const int32_t first_n,
1161  const int32_t at_most_n,
1162  const bool use_calcite) {
1163  _return.total_time_ms = 0;
1164  _return.nonce = nonce;
1165  ParserWrapper pw{query_str};
1166  switch (pw.getQueryType()) {
1168  _return.query_type = TQueryType::READ;
1169  VLOG(1) << "query type: READ";
1170  break;
1171  }
1173  _return.query_type = TQueryType::WRITE;
1174  VLOG(1) << "query type: WRITE";
1175  break;
1176  }
1178  _return.query_type = TQueryType::SCHEMA_READ;
1179  VLOG(1) << "query type: SCHEMA READ";
1180  break;
1181  }
1183  _return.query_type = TQueryType::SCHEMA_WRITE;
1184  VLOG(1) << "query type: SCHEMA WRITE";
1185  break;
1186  }
1187  default: {
1188  _return.query_type = TQueryType::UNKNOWN;
1189  LOG(WARNING) << "query type: UNKNOWN";
1190  break;
1191  }
1192  }
1194  _return.total_time_ms += measure<>::execution([&]() {
1196  query_state_proxy,
1197  column_format,
1198  session_ptr->get_executor_device_type(),
1199  first_n,
1200  at_most_n,
1201  use_calcite);
1203  _return, result, query_state_proxy, query_str, column_format, first_n, at_most_n);
1204  });
1205 }
1206 
// Translates the executor output held in `result` into the Thrift response `_return`.
//
// The execution time reported by `result` is added to `_return.execution_time_ms`
// first, so it is accounted for even when `result` is empty and nothing else is
// converted. Otherwise the conversion routine is chosen by `result.getResultType()`.
//
// NOTE(review): the declaration of the `result` parameter and the `case` labels of
// the switch are missing from this listing; confirm against the original file which
// result type maps to which branch.
1207 void DBHandler::convertData(TQueryResult& _return,
1209  const QueryStateProxy& query_state_proxy,
1210  const std::string& query_str,
1211  const bool column_format,
1212  const int32_t first_n,
1213  const int32_t at_most_n) {
1214  _return.execution_time_ms += result.getExecutionTime();
1215  if (result.empty()) {
1216  return;
1217  }
1218 
1219  switch (result.getResultType()) {
// First branch: rows are converted with the caller's first_n / at_most_n limits.
1221  convertRows(_return,
1222  query_state_proxy,
1223  result.getTargetsMeta(),
1224  *result.getRows(),
1225  column_format,
1226  first_n,
1227  at_most_n);
1228  break;
1230  convertResult(_return, *result.getRows(), true);
1231  break;
1233  convertExplain(_return, *result.getRows(), true);
1234  break;
// Last branch: rows are converted with -1 passed for both limits instead of the
// caller's values.
1236  convertRows(_return,
1237  query_state_proxy,
1238  result.getTargetsMeta(),
1239  *result.getRows(),
1240  column_format,
1241  -1,
1242  -1);
1243  break;
1244  }
1245 }
1246 
// Thrift entry point that executes `query_str` and fills `_return` with the result.
//
// A query starting with "execute relalg" bypasses Calcite: the prefix is stripped,
// the remainder is trimmed and `use_calcite` is false. When leaves are attached the
// query is forwarded to `agg_handler_->cluster_execute`, otherwise it runs through
// `sql_execute_local`. Afterwards the time returned by `process_geo_copy_from` is
// added to `_return.total_time_ms`.
//
// Any std::exception raised inside the try block is re-thrown via
// THROW_MAPD_EXCEPTION, with a few well-known error texts replaced by friendlier ones.
//
// NOTE(review): two lines are missing from this listing (after the
// `first_n >= 0 && at_most_n >= 0` test and at the end of the `cluster_execute`
// argument list); confirm them against the original file.
1247 void DBHandler::sql_execute(TQueryResult& _return,
1248  const TSessionId& session,
1249  const std::string& query_str,
1250  const bool column_format,
1251  const std::string& nonce,
1252  const int32_t first_n,
1253  const int32_t at_most_n) {
1254  const std::string exec_ra_prefix = "execute relalg";
1255  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1256  auto actual_query =
1257  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1258  auto session_ptr = get_session_ptr(session);
1259  auto query_state = create_query_state(session_ptr, actual_query);
1260  auto stdlog = STDLOG(session_ptr, query_state);
1261  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1262  stdlog.appendNameValuePairs("nonce", nonce);
1263  auto timer = DEBUG_TIMER(__func__);
1264  try {
// Presumably runs when the try block is left (normally or by exception) and clears
// any pending geo COPY FROM state for this session -- TODO confirm ScopeGuard semantics.
1265  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
1266  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1267  };
1268 
// first_n and at_most_n are mutually exclusive; a negative value means "not set".
1269  if (first_n >= 0 && at_most_n >= 0) {
1271  std::string("At most one of first_n and at_most_n can be set"));
1272  }
1273 
1274  if (leaf_aggregator_.leafCount() > 0) {
1275  if (!agg_handler_) {
1276  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
1277  }
// Distributed path: total_time_ms is assigned (not accumulated) from the measured call.
1278  _return.total_time_ms = measure<>::execution([&]() {
1279  agg_handler_->cluster_execute(_return,
1280  query_state->createQueryStateProxy(),
1281  query_state->getQueryStr(),
1282  column_format,
1283  nonce,
1284  first_n,
1285  at_most_n,
1287  });
1288  _return.nonce = nonce;
1289  } else {
1290  sql_execute_local(_return,
1291  query_state->createQueryStateProxy(),
1292  session_ptr,
1293  actual_query,
1294  column_format,
1295  nonce,
1296  first_n,
1297  at_most_n,
1298  use_calcite);
1299  }
1300  _return.total_time_ms += process_geo_copy_from(session);
// Attach the debug timer output to the response only when there is any.
1301  std::string debug_json = timer.stopAndGetJson();
1302  if (!debug_json.empty()) {
1303  _return.__set_debug(std::move(debug_json));
1304  }
1305  stdlog.appendNameValuePairs(
1306  "execution_time_ms",
1307  _return.execution_time_ms,
1308  "total_time_ms", // BE-3420 - Redundant with duration field
1309  stdlog.duration<std::chrono::milliseconds>());
1310  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1311  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1312  } catch (const std::exception& e) {
// Substring matches on the error text select a more readable replacement message.
1313  if (strstr(e.what(), "java.lang.NullPointerException")) {
1314  THROW_MAPD_EXCEPTION("query failed from broken view or other schema related issue");
1315  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1316  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1317  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
// NOTE(review): "statment" in the message below is a typo for "statement"; clients
// may match on the text, so change it deliberately.
1318  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1319  } else {
1320  THROW_MAPD_EXCEPTION(e.what());
1321  }
1322  }
1323 }
1324 
1326  const TSessionId& session,
1327  const std::string& query_str,
1328  const bool column_format,
1329  const int32_t first_n,
1330  const int32_t at_most_n) {
1331  const std::string exec_ra_prefix = "execute relalg";
1332  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1333  auto actual_query =
1334  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1335 
1336  auto session_ptr = get_session_ptr(session);
1337  CHECK(session_ptr);
1338  auto query_state = create_query_state(session_ptr, actual_query);
1339  auto stdlog = STDLOG(session_ptr, query_state);
1340  auto timer = DEBUG_TIMER(__func__);
1341 
1342  try {
1343  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
1344  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1345  };
1346 
1347  if (first_n >= 0 && at_most_n >= 0) {
1349  std::string("At most one of first_n and at_most_n can be set"));
1350  }
1351  auto total_time_ms = measure<>::execution([&]() {
1353  query_state->createQueryStateProxy(),
1354  column_format,
1355  session_ptr->get_executor_device_type(),
1356  first_n,
1357  at_most_n,
1358  use_calcite);
1359  });
1360 
1361  _return.setExecutionTime(total_time_ms + process_geo_copy_from(session));
1362 
1363  stdlog.appendNameValuePairs(
1364  "execution_time_ms",
1365  _return.getExecutionTime(),
1366  "total_time_ms", // BE-3420 - Redundant with duration field
1367  stdlog.duration<std::chrono::milliseconds>());
1368  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1369  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1370  } catch (const std::exception& e) {
1371  if (strstr(e.what(), "java.lang.NullPointerException")) {
1372  THROW_MAPD_EXCEPTION("query failed from broken view or other schema related issue");
1373  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1374  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1375  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1376  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1377  } else {
1378  THROW_MAPD_EXCEPTION(e.what());
1379  }
1380  }
1381 }
1382 
// Performs the deferred import for a geo COPY FROM statement, if one is pending for
// `session_id`, and returns the time the import took in milliseconds (0 when nothing
// was pending).
//
// NOTE(review): the line naming the function invoked inside the timed lambda is
// missing from this listing; per the comment below it is presumably
// import_geo_table() -- confirm against the original file.
1383 int64_t DBHandler::process_geo_copy_from(const TSessionId& session_id) {
1384  int64_t total_time_ms(0);
1385  // if the SQL statement we just executed was a geo COPY FROM, the import
1386  // parameters were captured, and this flag set, so we do the actual import here
1387  if (auto geo_copy_from_state = geo_copy_from_sessions(session_id)) {
1388  // import_geo_table() calls create_table() which calls this function to
1389  // do the work, so reset the flag now to avoid executing this part a
1390  // second time at the end of that, which would fail as the table was
1391  // already created! Also reset the flag with a ScopeGuard on exiting
1392  // this function any other way, such as an exception from the code above!
1393  geo_copy_from_sessions.remove(session_id);
1394 
1395  // create table as replicated?
1396  TCreateParams create_params;
1397  if (geo_copy_from_state->geo_copy_from_partitions == "REPLICATED") {
1398  create_params.is_replicated = true;
1399  }
1400 
1401  // now do (and time) the import
1402  total_time_ms = measure<>::execution([&]() {
// Arguments: session, target table, source file, copy parameters converted to their
// Thrift form, an empty row descriptor, and the creation parameters built above.
1404  session_id,
1405  geo_copy_from_state->geo_copy_from_table,
1406  geo_copy_from_state->geo_copy_from_file_name,
1407  copyparams_to_thrift(geo_copy_from_state->geo_copy_from_copy_params),
1408  TRowDescriptor(),
1409  create_params);
1410  });
1411  }
1412  return total_time_ms;
1413 }
1414 
// Executes `query_str` and returns the result as a data frame in `_return`.
//
// When GPU results are requested, the session must run in GPU mode, the server must
// have GPUs and `device_id` must lie in [0, device count); otherwise an exception is
// thrown. Only statements that are neither DDL nor update DML (and whose explain type
// is not `Other`) are executed; everything else ends in the exception thrown by the
// last statement of the function.
//
// NOTE(review): several lines are missing from this listing (the argument of the
// read lock, the declaration of `locks`, part of the query-session enrollment and
// part of the `execute_rel_alg_df` argument list); confirm against the original file.
1415 void DBHandler::sql_execute_df(TDataFrame& _return,
1416  const TSessionId& session,
1417  const std::string& query_str,
1418  const TDeviceType::type results_device_type,
1419  const int32_t device_id,
1420  const int32_t first_n,
1421  const TArrowTransport::type transport_method) {
1422  auto session_ptr = get_session_ptr(session);
1423  CHECK(session_ptr);
1424  auto query_state = create_query_state(session_ptr, query_str);
1425  auto stdlog = STDLOG(session_ptr, query_state);
1426 
1427  const auto executor_device_type = session_ptr->get_executor_device_type();
1428 
// Validate the GPU request before doing any work.
1429  if (results_device_type == TDeviceType::GPU) {
1430  if (executor_device_type != ExecutorDeviceType::GPU) {
1431  THROW_MAPD_EXCEPTION(std::string("GPU mode is not allowed in this session"));
1432  }
1433  if (!data_mgr_->gpusPresent()) {
1434  THROW_MAPD_EXCEPTION(std::string("No GPU is available in this server"));
1435  }
1436  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1438  std::string("Invalid device_id or unavailable GPU with this ID"));
1439  }
1440  }
1441  _return.execution_time_ms = 0;
1442 
1443  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
1446  auto query_state_proxy = query_state->createQueryStateProxy();
1447  try {
1448  ParserWrapper pw{query_str};
1449  if (!pw.is_ddl && !pw.is_update_dml &&
1450  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
1451  std::string query_ra;
// Parsing time is included in the reported execution time.
1453  _return.execution_time_ms += measure<>::execution([&]() {
1454  TPlanResult result;
1455  std::tie(result, locks) =
1456  parse_to_ra(query_state_proxy, query_str, {}, true, system_parameters_);
1457  query_ra = result.plan_result;
1458  });
1459 
1460  if (pw.isCalciteExplain()) {
// NOTE(review): "not unsupported" is a double negative; the intended meaning is
// evidently "not supported".
1461  throw std::runtime_error("explain is not unsupported by current thrift API");
1462  }
1465  executor->enrollQuerySession(session_ptr->get_session_id(),
1466  query_str,
1467  query_state->getQuerySubmittedTime(),
1469  QuerySessionStatus::QueryStatus::PENDING_QUEUE);
1470  }
1471  execute_rel_alg_df(_return,
1472  query_ra,
1473  query_state_proxy,
1474  *session_ptr,
1475  executor_device_type,
1476  results_device_type == TDeviceType::CPU
1479  static_cast<size_t>(device_id),
1480  first_n,
1481  transport_method);
1482  return;
1483  }
1484  } catch (std::exception& e) {
1485  THROW_MAPD_EXCEPTION(e.what());
1486  }
// Reached only when the statement was rejected by the filter above.
// NOTE(review): same "not unsupported" double negative as in the explain message.
1487  THROW_MAPD_EXCEPTION("DDL or update DML are not unsupported by current thrift API");
1488 }
1489 
1490 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1491  const TSessionId& session,
1492  const std::string& query_str,
1493  const int32_t device_id,
1494  const int32_t first_n) {
1495  auto stdlog = STDLOG(get_session_ptr(session));
1496  sql_execute_df(_return,
1497  session,
1498  query_str,
1499  TDeviceType::GPU,
1500  device_id,
1501  first_n,
1502  TArrowTransport::SHARED_MEMORY);
1503 }
1504 
1505 // For now we have only one user of a data frame in all cases.
// Releases the resources behind a data frame previously handed out to a client.
//
// For GPU data frames the serialized CUDA handle recorded under `df.df_handle` is
// looked up and removed from `ipc_handle_to_dev_ptr_` while holding
// `handle_to_dev_ptr_mutex_`; a handle that is not recorded exactly once raises a
// TOmniSciException. For CPU data frames the serialized handle stays empty.
//
// NOTE(review): the lines declaring `result` and naming the deallocation routine
// that receives it are missing from this listing; confirm against the original file.
1506 void DBHandler::deallocate_df(const TSessionId& session,
1507  const TDataFrame& df,
1508  const TDeviceType::type device_type,
1509  const int32_t device_id) {
1510  auto stdlog = STDLOG(get_session_ptr(session));
1511  std::string serialized_cuda_handle = "";
1512  if (device_type == TDeviceType::GPU) {
1513  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1514  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1515  TOmniSciException ex;
1516  ex.error_msg = std::string(
1517  "Current data frame handle is not bookkept or been inserted "
1518  "twice");
1519  LOG(ERROR) << ex.error_msg;
1520  throw ex;
1521  }
// Take the stored handle and drop the bookkeeping entry in the same critical section.
1522  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1523  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1524  }
// Copy the handle strings into byte vectors for the structure built below.
1525  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1526  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1528  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1530  result,
1531  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1532  device_id,
1533  data_mgr_);
1534 }
1535 
// Validates a SELECT statement without returning data: the statement is parsed to
// relational algebra, passed to validate_rel_alg(), and the resulting row descriptor
// (after fixup_row_descriptor) is returned in `_return`.
//
// Statements with any explain type other than `None`, DDL and update DML are rejected.
// Every std::exception is re-thrown via THROW_MAPD_EXCEPTION with the same message.
//
// NOTE(review): the argument of the read lock, the declaration of `locks` and one
// `parse_to_ra` argument are missing from this listing; confirm against the original
// file.
1536 void DBHandler::sql_validate(TRowDescriptor& _return,
1537  const TSessionId& session,
1538  const std::string& query_str) {
1539  try {
1540  auto stdlog = STDLOG(get_session_ptr(session));
1541  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1542  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1543  stdlog.setQueryState(query_state);
1544 
1545  ParserWrapper pw{query_str};
1546  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1547  pw.is_update_dml) {
1548  throw std::runtime_error("Can only validate SELECT statements.");
1549  }
1550 
1551  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
1554 
1555  TPlanResult parse_result;
1557  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1558  query_state->getQueryStr(),
1559  {},
1560  true,
1562  /*check_privileges=*/true);
1563  const auto query_ra = parse_result.plan_result;
1564 
// Only the row descriptor of the validation result is handed back to the caller.
1565  const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
1566  _return = fixup_row_descriptor(result.row_set.row_desc,
1567  query_state->getConstSessionInfo()->getCatalog());
1568  } catch (const std::exception& e) {
1569  THROW_MAPD_EXCEPTION(std::string(e.what()));
1570  }
1571 }
1572 
// File-local helpers for ranking auto-completion results.
//
// NOTE(review): the struct header and the function signature are missing from this
// listing; the function takes `const std::string& sql` and returns the two sets
// declared in the struct -- confirm the names against the original file.
1573 namespace {
1574 
// Upper-cased identifiers found in a partial query: plain column names, and the
// table qualifiers of `qualifier.column` tokens.
1576  std::unordered_set<std::string> uc_column_names;
1577  std::unordered_set<std::string> uc_column_table_qualifiers;
1578 };
1579 
1580 // Extract what looks like a (qualified) identifier from the partial query.
1581 // The results will be used to rank the auto-completion results: tables which
1582 // contain at least one of the identifiers first.
1584  const std::string& sql) {
// A token is a run of alphanumerics, underscores and dots.
1585  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1586  boost::regex::extended | boost::regex::icase};
1587  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1588  boost::sregex_token_iterator end;
1589  std::unordered_set<std::string> uc_column_names;
1590  std::unordered_set<std::string> uc_column_table_qualifiers;
1591  for (; tok_it != end; ++tok_it) {
1592  std::string column_name = *tok_it;
1593  std::vector<std::string> column_tokens;
1594  boost::split(column_tokens, column_name, boost::is_any_of("."));
// Exactly two dot-separated parts are treated as `qualifier.column`; any other
// token (including ones with more dots) is recorded whole as a column name.
1595  if (column_tokens.size() == 2) {
1596  // If the column name is qualified, take user's word.
1597  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1598  } else {
1599  uc_column_names.insert(to_upper(column_name));
1600  }
1601  }
1602  return {uc_column_names, uc_column_table_qualifiers};
1603 }
1604 
1605 } // namespace
1606 
1607 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1608  const TSessionId& session,
1609  const std::string& sql,
1610  const int cursor) {
1611  auto stdlog = STDLOG(get_session_ptr(session));
1612  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1613  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1614  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1615  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1616  proj_tokens.uc_column_names, visible_tables, stdlog);
1617  // Add the table qualifiers explicitly specified by the user.
1618  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1619  proj_tokens.uc_column_table_qualifiers.end());
1620  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1621  std::sort(
1622  hints.begin(),
1623  hints.end(),
1624  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1625  if (lhs.type == TCompletionHintType::TABLE &&
1626  rhs.type == TCompletionHintType::TABLE) {
1627  // Between two tables, one which is compatible with the specified
1628  // projections and one which isn't, pick the one which is compatible.
1629  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1630  compatible_table_names.end() &&
1631  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1632  compatible_table_names.end()) {
1633  return true;
1634  }
1635  }
1636  return lhs.type < rhs.type;
1637  });
1638 }
1639 
// Collects completion hints for `sql` without sorting them and fills
// `visible_tables` with the tables and views returned by get_tables_impl() for the
// session.
//
// Hints come from Calcite first. If the text up to the cursor already contains a
// FROM keyword those hints are returned as they are; otherwise token-based
// completions are added on top. A negative `cursor` means "end of `sql`", and a
// cursor beyond the end is clamped to the length of `sql`.
//
// NOTE(review): the line that assigns the (filtered) Calcite hints to `hints` is
// missing from this listing; confirm against the original file.
1640 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1641  std::vector<std::string>& visible_tables,
1642  query_state::StdLog& stdlog,
1643  const std::string& sql,
1644  const int cursor) {
1645  const auto& session_info = *stdlog.getConstSessionInfo();
1646  try {
1647  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1648 
1649  // Filter out keywords suggested by Calcite which we don't support.
1651  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1652  } catch (const std::exception& e) {
// Convert any failure into the Thrift exception type, logging it on the way.
1653  TOmniSciException ex;
1654  ex.error_msg = std::string(e.what());
1655  LOG(ERROR) << ex.error_msg;
1656  throw ex;
1657  }
1658  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1659  const size_t length_to_cursor =
1660  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1661  // Trust hints from Calcite after the FROM keyword.
1662  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1663  return;
1664  }
1665  // Before FROM, the query is too incomplete for context-sensitive completions.
1666  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1667 }
1668 
1669 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1670  query_state::StdLog& stdlog,
1671  std::vector<std::string>& visible_tables,
1672  const std::string& sql,
1673  const int cursor) {
1674  const auto last_word =
1675  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1676  boost::regex select_expr{R"(\s*select\s+)",
1677  boost::regex::extended | boost::regex::icase};
1678  const size_t length_to_cursor =
1679  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1680  // After SELECT but before FROM, look for all columns in all tables which match the
1681  // prefix.
1682  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1683  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1684  // Trust the fully qualified columns the most.
1685  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1686  return;
1687  }
1688  // Not much information to use, just retrieve column names which match the prefix.
1689  if (should_suggest_column_hints(sql)) {
1690  get_column_hints(hints, last_word, column_names_by_table);
1691  return;
1692  }
1693  const std::string kFromKeyword{"FROM"};
1694  if (boost::istarts_with(kFromKeyword, last_word)) {
1695  TCompletionHint keyword_hint;
1696  keyword_hint.type = TCompletionHintType::KEYWORD;
1697  keyword_hint.replaced = last_word;
1698  keyword_hint.hints.emplace_back(kFromKeyword);
1699  hints.push_back(keyword_hint);
1700  }
1701  } else {
1702  const std::string kSelectKeyword{"SELECT"};
1703  if (boost::istarts_with(kSelectKeyword, last_word)) {
1704  TCompletionHint keyword_hint;
1705  keyword_hint.type = TCompletionHintType::KEYWORD;
1706  keyword_hint.replaced = last_word;
1707  keyword_hint.hints.emplace_back(kSelectKeyword);
1708  hints.push_back(keyword_hint);
1709  }
1710  }
1711 }
1712 
1713 std::unordered_map<std::string, std::unordered_set<std::string>>
1714 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1715  query_state::StdLog& stdlog) {
1716  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1717  for (auto it = table_names.begin(); it != table_names.end();) {
1718  TTableDetails table_details;
1719  try {
1720  get_table_details_impl(table_details, stdlog, *it, false, false);
1721  } catch (const TOmniSciException& e) {
1722  // Remove the corrupted Table/View name from the list for further processing.
1723  it = table_names.erase(it);
1724  continue;
1725  }
1726  for (const auto& column_type : table_details.row_desc) {
1727  column_names_by_table[*it].emplace(column_type.col_name);
1728  }
1729  ++it;
1730  }
1731  return column_names_by_table;
1732 }
1733 
1737 }
1738 
1740  const std::unordered_set<std::string>& uc_column_names,
1741  std::vector<std::string>& table_names,
1742  query_state::StdLog& stdlog) {
1743  std::unordered_set<std::string> compatible_table_names_by_column;
1744  for (auto it = table_names.begin(); it != table_names.end();) {
1745  TTableDetails table_details;
1746  try {
1747  get_table_details_impl(table_details, stdlog, *it, false, false);
1748  } catch (const TOmniSciException& e) {
1749  // Remove the corrupted Table/View name from the list for further processing.
1750  it = table_names.erase(it);
1751  continue;
1752  }
1753  for (const auto& column_type : table_details.row_desc) {
1754  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1755  compatible_table_names_by_column.emplace(to_upper(*it));
1756  break;
1757  }
1758  }
1759  ++it;
1760  }
1761  return compatible_table_names_by_column;
1762 }
1763 
// Runs the relational algebra in `query_ra` through execute_rel_alg() in
// validate-only mode on the dispatch queue and returns the converted result.
//
// The task captures its inputs by reference; this is safe here because the function
// waits on the task's future before returning. Calling `result_future.get()` also
// re-throws any exception stored in the future.
//
// NOTE(review): the declaration of `result` and some `execute_rel_alg` arguments are
// missing from this listing; confirm against the original file.
1764 TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
1765  QueryStateProxy query_state_proxy) {
1766  TQueryResult _return;
1768  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1769  [this, &result, &query_state_proxy, &query_ra](const size_t executor_index) {
1770  auto qid_scope_guard = query_state_proxy.getQueryState().setThreadLocalQueryId();
1771  execute_rel_alg(result,
1772  query_state_proxy,
1773  query_ra,
1774  true,
1776  -1,
1777  -1,
1778  /*just_validate=*/true,
1779  /*find_filter_push_down_candidates=*/false,
1781  executor_index);
1782  });
1784  dispatch_queue_->submit(execute_rel_alg_task, /*is_update_delete=*/false);
// Block until the task has run so the by-reference captures above stay valid.
1785  auto result_future = execute_rel_alg_task->get_future();
1786  result_future.get();
1787  DBHandler::convertData(_return, result, query_state_proxy, query_ra, true, -1, -1);
1788  return _return;
1789 }
1790 
1791 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1792  auto stdlog = STDLOG(get_session_ptr(session));
1793  auto session_ptr = stdlog.getConstSessionInfo();
1794  if (!session_ptr->get_currentUser().isSuper) {
1795  // WARNING: This appears to not include roles a user is a member of,
1796  // if the role has no permissions granted to it.
1797  roles =
1798  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1799  session_ptr->getCatalog().getCurrentDB().dbId);
1800  } else {
1801  roles = SysCatalog::instance().getRoles(
1802  false, true, session_ptr->get_currentUser().userName);
1803  }
1804 }
1805 
// Returns whether role `roleName` is granted to `granteeName`.
//
// Callers that are not super users are restricted: the call throws when
// `granteeName` is a user grantee other than the caller, and the second branch
// rejects the request when `isRoleGrantedToGrantee(caller, granteeName, true)` is
// false.
//
// NOTE(review): the line opening the second throw statement is missing from this
// listing; it is presumably another THROW_MAPD_EXCEPTION -- confirm against the
// original file.
1806 bool DBHandler::has_role(const TSessionId& sessionId,
1807  const std::string& granteeName,
1808  const std::string& roleName) {
1809  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1810  const auto session_ptr = stdlog.getConstSessionInfo();
1811  const auto current_user = session_ptr->get_currentUser();
1812  if (!current_user.isSuper) {
// `user` is non-null only when `granteeName` resolves to a user grantee.
1813  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1814  user && current_user.userName != granteeName) {
1815  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1816  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1817  current_user.userName, granteeName, true)) {
1819  "Only super users can check roles assignment that have not been directly "
1820  "granted to a user.");
1821  }
1822  }
1823  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1824 }
1825 
1826 static TDBObject serialize_db_object(const std::string& roleName,
1827  const DBObject& inObject) {
1828  TDBObject outObject;
1829  outObject.objectName = inObject.getName();
1830  outObject.grantee = roleName;
1831  outObject.objectId = inObject.getObjectKey().objectId;
1832  const auto ap = inObject.getPrivileges();
1833  switch (inObject.getObjectKey().permissionType) {
1834  case DatabaseDBObjectType:
1835  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1836  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1837  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1838  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1839  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1840 
1841  break;
1842  case TableDBObjectType:
1843  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1844  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1845  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1846  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1847  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1848  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1849  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1850  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1851  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1852 
1853  break;
1854  case DashboardDBObjectType:
1855  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1856  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1857  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1858  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1859  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1860 
1861  break;
1862  case ViewDBObjectType:
1863  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1864  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1865  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1866  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1867  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1868  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1869  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1870 
1871  break;
1872  case ServerDBObjectType:
1873  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1874  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1875  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1876  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1877  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::SERVER_USAGE));
1878 
1879  break;
1880  default:
1881  CHECK(false);
1882  }
1883  const int type_val = static_cast<int>(inObject.getType());
1884  CHECK(type_val >= 0 && type_val < 6);
1885  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1886  return outObject;
1887 }
1888 
1890  const TDBObjectPermissions& permissions) {
  // Database-level permission check: returns false as soon as one permission flagged
  // in permissions.database_permissions_ is not held by `privs`, true otherwise.
  // Throws a MapD exception if database_permissions_ is not set on the request.
  // NOTE(review): the first line of this signature and the continuation of the
  // view_sql_editor_ clause are not visible in this listing -- confirm against the
  // full file.
1891  if (!permissions.__isset.database_permissions_) {
1892  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1893  }
1894  auto perms = permissions.database_permissions_;
1895  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1896  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1897  (perms.view_sql_editor_ &&
1899  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1900  return false;
1901  } else {
1902  return true;
1903  }
1904 }
1905 
1907  const TDBObjectPermissions& permissions) {
  // Table-level permission check: returns false if any permission flagged in
  // permissions.table_permissions_ is not held by `privs`, true otherwise.
  // Throws a MapD exception if table_permissions_ is not set on the request.
  // NOTE(review): the first line of this signature is not visible in this listing.
1908  if (!permissions.__isset.table_permissions_) {
1909  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1910  }
1911  auto perms = permissions.table_permissions_;
1912  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1913  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1914  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1915  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1916  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1917  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1918  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1919  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1920  return false;
1921  } else {
1922  return true;
1923  }
1924 }
1925 
1927  const TDBObjectPermissions& permissions) {
  // Dashboard-level permission check: returns false if any permission flagged in
  // permissions.dashboard_permissions_ is not held by `privs`, true otherwise.
  // Throws a MapD exception if dashboard_permissions_ is not set on the request.
  // NOTE(review): the first line of this signature is not visible in this listing.
1928  if (!permissions.__isset.dashboard_permissions_) {
1929  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1930  }
1931  auto perms = permissions.dashboard_permissions_;
1932  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1933  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1934  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1935  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1936  return false;
1937  } else {
1938  return true;
1939  }
1940 }
1941 
1943  const TDBObjectPermissions& permissions) {
  // View-level permission check: returns false if any permission flagged in
  // permissions.view_permissions_ is not held by `privs`, true otherwise.
  // Throws a MapD exception if view_permissions_ is not set on the request.
  // NOTE(review): the first line of this signature is not visible in this listing.
1944  if (!permissions.__isset.view_permissions_) {
1945  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1946  }
1947  auto perms = permissions.view_permissions_;
1948  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1949  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1950  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1951  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1952  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1953  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1954  return false;
1955  } else {
1956  return true;
1957  }
1958 }
1959 
1961  const TDBObjectPermissions& permissions) {
  // Server-level permission check: returns false if any permission flagged in
  // permissions.server_permissions_ is not held by `privs`, true otherwise.
  // Note that an unset server_permissions_ trips CHECK here instead of raising a
  // MapD exception as the database/table/dashboard/view checks in this file do.
  // NOTE(review): the first line of this signature is not visible in this listing.
1962  CHECK(permissions.__isset.server_permissions_);
1963  auto perms = permissions.server_permissions_;
1964  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
1965  (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
1966  (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER)) ||
1967  (perms.usage_ && !privs.hasPermission(ServerPrivileges::SERVER_USAGE))) {
1968  return false;
1969  } else {
1970  return true;
1971  }
1972 }
1973 
// Reports whether grantee `granteeName` holds all of the requested `permissions` on
// the object `objectName` of kind `objectType`.
//  - Callers that are not superusers must pass the isRoleGrantedToGrantee() check
//    against `granteeName`, otherwise an error is raised.
//  - If `granteeName` is itself a superuser the answer is always true.
//  - An unknown grantee or an unsupported object type raises a MapD exception.
//  - The per-type check is dispatched through permissionFuncMap_ using the key
//    chosen in the switch ("database", "table", "dashboard", "view", "server").
// NOTE(review): several lines are not visible in this listing (the throw call before
// the first message, the declarations of `user_meta` and `type`, and the case labels
// of the switch) -- confirm against the full file.
1974 bool DBHandler::has_object_privilege(const TSessionId& sessionId,
1975  const std::string& granteeName,
1976  const std::string& objectName,
1977  const TDBObjectType::type objectType,
1978  const TDBObjectPermissions& permissions) {
1979  auto stdlog = STDLOG(get_session_ptr(sessionId));
1980  auto session_ptr = stdlog.getConstSessionInfo();
1981  auto const& cat = session_ptr->getCatalog();
1982  auto const& current_user = session_ptr->get_currentUser();
1983  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1984  current_user.userName, granteeName, false)) {
1986  "Users except superusers can only check privileges for self or roles granted "
1987  "to "
1988  "them.")
1989  }
1991  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1992  user_meta.isSuper) {
1993  return true;
1994  }
1995  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1996  if (!grnt) {
1997  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1998  }
2000  std::string func_name;
2001  switch (objectType) {
2004  func_name = "database";
2005  break;
2008  func_name = "table";
2009  break;
2012  func_name = "dashboard";
2013  break;
2016  func_name = "view";
2017  break;
2020  func_name = "server";
2021  break;
2022  default:
2023  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
2024  }
2025  DBObject req_object(objectName, type);
2026  req_object.loadKey(cat);
2027 
2028  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
2029  if (grantee_object) {
2030  // if grantee has privs on the object
2031  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
2032  } else {
2033  // no privileges on that object
2034  return false;
2035  }
2036 }
2037 
2038 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
2039  const TSessionId& sessionId,
2040  const std::string& roleName) {
2041  auto stdlog = STDLOG(get_session_ptr(sessionId));
2042  auto session_ptr = stdlog.getConstSessionInfo();
2043  auto const& user = session_ptr->get_currentUser();
2044  if (!user.isSuper &&
2045  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
2046  return;
2047  }
2048  auto* rl = SysCatalog::instance().getGrantee(roleName);
2049  if (rl) {
2050  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
2051  for (auto& dbObject : *rl->getDbObjects(true)) {
2052  if (dbObject.first.dbId != dbId) {
2053  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
2054  // usecase for now, though)
2055  continue;
2056  }
2057  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
2058  TDBObjectsForRole.push_back(tdbObject);
2059  }
2060  } else {
2061  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
2062  }
2063 }
2064 
// Collects, into `TDBObjects`, the privileges that grantees hold on the object
// `objectName` of Thrift type `type`.
//  - The Thrift type is mapped to a DBObjectType; an unknown type raises an error.
//  - Dashboards are looked up by numeric id (empty name maps to id -1); for
//    tables/views the catalog decides whether the name is a view or a table.
//  - Any std::exception while resolving the object is reported as "does not exist".
//  - A superuser caller first gets a synthetic entry named "super".
//  - For every grantee returned by SysCatalog::getRoles(), both the object-level entry
//    and the database-level entry (empty name, same type) are appended when found.
// NOTE(review): the case labels of the switch, the dashboard assignment, and one
// argument of the `dbObj` initializer are not visible in this listing -- confirm
// against the full file.
2065 void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
2066  const TSessionId& sessionId,
2067  const std::string& objectName,
2068  const TDBObjectType::type type) {
2069  auto stdlog = STDLOG(get_session_ptr(sessionId));
2070  auto session_ptr = stdlog.getConstSessionInfo();
2071  DBObjectType object_type;
2072  switch (type) {
2074  object_type = DBObjectType::DatabaseDBObjectType;
2075  break;
2077  object_type = DBObjectType::TableDBObjectType;
2078  break;
2081  break;
2083  object_type = DBObjectType::ViewDBObjectType;
2084  break;
2086  object_type = DBObjectType::ServerDBObjectType;
2087  break;
2088  default:
2089  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
2090  ": unknown object type (" + std::to_string(type) + ").");
2091  }
2092  DBObject object_to_find(objectName, object_type);
2093 
2094  // TODO(adb): Use DatabaseLock to protect method
2095  try {
2096  if (object_type == DashboardDBObjectType) {
2097  if (objectName == "") {
2098  object_to_find = DBObject(-1, object_type);
2099  } else {
2100  object_to_find = DBObject(std::stoi(objectName), object_type);
2101  }
2102  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
2103  !objectName.empty()) {
2104  // special handling for view / table
2105  auto const& cat = session_ptr->getCatalog();
2106  auto td = cat.getMetadataForTable(objectName, false);
2107  if (td) {
2108  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
2109  object_to_find = DBObject(objectName, object_type);
2110  }
2111  }
2112  object_to_find.loadKey(session_ptr->getCatalog());
2113  } catch (const std::exception&) {
2114  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
2115  }
2116 
2117  // object type on database level
2118  DBObject object_to_find_dblevel("", object_type);
2119  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
2120  // if user is superuser respond with a full priv
2121  if (session_ptr->get_currentUser().isSuper) {
2122  // using ALL_TABLE here to set max permissions
2123  DBObject dbObj{object_to_find.getObjectKey(),
2125  session_ptr->get_currentUser().userId};
2126  dbObj.setName("super");
2127  TDBObjects.push_back(
2128  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
2129  };
2130 
2131  std::vector<std::string> grantees =
2132  SysCatalog::instance().getRoles(true,
2133  session_ptr->get_currentUser().isSuper,
2134  session_ptr->get_currentUser().userName);
2135  for (const auto& grantee : grantees) {
2136  DBObject* object_found;
2137  auto* gr = SysCatalog::instance().getGrantee(grantee);
2138  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
2139  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2140  }
2141  // check object permissions on Database level
2142  if (gr &&
2143  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
2144  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2145  }
2146  }
2147 }
2148 
// Fills `roles` with grantee->getRoles() for `granteeName`, subject to who is asking:
//  - a superuser may query any grantee;
//  - a regular user may query themselves, but not another user;
//  - a regular user may query a role only if isRoleGrantedToGrantee() holds for them.
// An unknown grantee raises a MapD exception.
// NOTE(review): the line opening the "Only a superuser is authorized..." error is not
// visible in this listing; presumably it is a THROW_MAPD_EXCEPTION( call -- confirm.
2149 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
2150  const TSessionId& sessionId,
2151  const std::string& granteeName) {
2152  auto stdlog = STDLOG(get_session_ptr(sessionId));
2153  auto session_ptr = stdlog.getConstSessionInfo();
2154  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
2155  if (grantee) {
2156  if (session_ptr->get_currentUser().isSuper) {
2157  roles = grantee->getRoles();
2158  } else if (grantee->isUser()) {
2159  if (session_ptr->get_currentUser().userName == granteeName) {
2160  roles = grantee->getRoles();
2161  } else {
2163  "Only a superuser is authorized to request list of roles granted to "
2164  "another "
2165  "user.");
2166  }
2167  } else {
2168  CHECK(!grantee->isUser());
2169  // granteeName is actually a roleName here and we can check a role
2170  // only if it is granted to us
2171  if (SysCatalog::instance().isRoleGrantedToGrantee(
2172  session_ptr->get_currentUser().userName, granteeName, false)) {
2173  roles = grantee->getRoles();
2174  } else {
2175  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
2176  }
2177  }
2178  } else {
2179  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
2180  }
2181 }
2182 
2183 namespace {
// File-local helper that flattens a table -> column-names map into one string for
// logging: each table contributes ":<table>" followed by ",<column>" per column, in
// the map's iteration order.
// NOTE(review): the first line of the signature is not visible in this listing; the
// call site in this file refers to it as dump_table_col_names -- confirm.
2185  const std::map<std::string, std::vector<std::string>>& table_col_names) {
2186  std::ostringstream oss;
2187  for (const auto& [table_name, col_names] : table_col_names) {
2188  oss << ":" << table_name;
2189  for (const auto& col_name : col_names) {
2190  oss << "," << col_name;
2191  }
2192  }
2193  return oss.str();
2194 }
2195 } // namespace
2196 
2198  TPixelTableRowResult& _return,
2199  const TSessionId& session,
2200  const int64_t widget_id,
2201  const TPixel& pixel,
2202  const std::map<std::string, std::vector<std::string>>& table_col_names,
2203  const bool column_format,
2204  const int32_t pixel_radius,
2205  const std::string& nonce) {
  // Handler that forwards a pixel row lookup to render_handler_, after recording all
  // request parameters in the request log. Raises a MapD exception when no render
  // handler is configured, and converts any std::exception thrown by the render
  // handler into a MapD exception carrying the same message.
  // NOTE(review): the first line of this signature is not visible in this listing.
2206  auto stdlog = STDLOG(get_session_ptr(session),
2207  "widget_id",
2208  widget_id,
2209  "pixel.x",
2210  pixel.x,
2211  "pixel.y",
2212  pixel.y,
2213  "column_format",
2214  column_format,
2215  "pixel_radius",
2216  pixel_radius,
2217  "table_col_names",
2218  dump_table_col_names(table_col_names),
2219  "nonce",
2220  nonce);
2221  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2222  auto session_ptr = stdlog.getSessionInfo();
2223  if (!render_handler_) {
2224  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
2225  }
2226 
2227  try {
2228  render_handler_->get_result_row_for_pixel(_return,
2229  session_ptr,
2230  widget_id,
2231  pixel,
2232  table_col_names,
2233  column_format,
2234  pixel_radius,
2235  nonce);
2236  } catch (std::exception& e) {
2237  THROW_MAPD_EXCEPTION(e.what());
2238  }
2239 }
2240 
2242  const ColumnDescriptor* cd) {
  // Builds the Thrift TColumnType for column descriptor `cd`: names, id, type,
  // encoding, nullability, array flag, size (arrays and dates only), precision/scale
  // or geo-specific fields, system/reserved-keyword flags, comp_param and, when
  // present, the default value literal.
  // NOTE(review): the first line of this signature, the call that receives the geo
  // arguments, and the start of the condition ending in `cat != nullptr` are not
  // visible in this listing -- confirm against the full file.
2243  TColumnType col_type;
2244  col_type.col_name = cd->columnName;
2245  col_type.src_name = cd->sourceName;
2246  col_type.col_id = cd->columnId;
2247  col_type.col_type.type = type_to_thrift(cd->columnType);
2248  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
2249  col_type.col_type.nullable = !cd->columnType.get_notnull();
2250  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
2251  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
2252  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
2253  }
2254  if (IS_GEO(cd->columnType.get_type())) {
2256  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
2257  } else {
2258  col_type.col_type.precision = cd->columnType.get_precision();
2259  col_type.col_type.scale = cd->columnType.get_scale();
2260  }
2261  col_type.is_system = cd->isSystemCol;
2263  cat != nullptr) {
2264  // have to get the actual size of the encoding from the dictionary definition
2265  const int dict_id = cd->columnType.get_comp_param();
  // A missing dictionary yields comp_param 0 and an early return; note that this
  // return skips the is_reserved_keyword and default_value assignments below.
2266  if (!cat->getMetadataForDict(dict_id, false)) {
2267  col_type.col_type.comp_param = 0;
2268  return col_type;
2269  }
  // NOTE(review): the dictionary is looked up a second time here; given the early
  // return above, the null check that follows appears to be unreachable in
  // practice -- confirm.
2270  auto dd = cat->getMetadataForDict(dict_id, false);
2271  if (!dd) {
2272  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
2273  }
2274  col_type.col_type.comp_param = dd->dictNBits;
2275  } else {
  // Date-in-days columns with comp_param 0 are reported as 32.
2276  col_type.col_type.comp_param =
2277  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
2278  ? 32
2279  : cd->columnType.get_comp_param();
2280  }
2281  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
2282  if (cd->default_value.has_value()) {
2283  col_type.__set_default_value(cd->getDefaultValueLiteral());
2284  }
2285  return col_type;
2286 }
2287 
2288 void DBHandler::get_internal_table_details(TTableDetails& _return,
2289  const TSessionId& session,
2290  const std::string& table_name) {
2291  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2292  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2293  get_table_details_impl(_return, stdlog, table_name, true, false);
2294 }
2295 
2297  TTableDetails& _return,
2298  const TSessionId& session,
2299  const std::string& table_name,
2300  const std::string& database_name) {
  // Logs the request, then delegates to get_table_details_impl() for `table_name` in
  // `database_name` with get_system=true and get_physical=false.
  // NOTE(review): the first line of this signature is not visible in this listing.
2301  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2302  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2303  get_table_details_impl(_return, stdlog, table_name, true, false, database_name);
2304 }
2305 
2306 void DBHandler::get_table_details(TTableDetails& _return,
2307  const TSessionId& session,
2308  const std::string& table_name) {
2309  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2310  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2311  get_table_details_impl(_return, stdlog, table_name, false, false);
2312 }
2313 
2314 void DBHandler::get_table_details_for_database(TTableDetails& _return,
2315  const TSessionId& session,
2316  const std::string& table_name,
2317  const std::string& database_name) {
2318  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2319  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2320  get_table_details_impl(_return, stdlog, table_name, false, false, database_name);
2321 }
2322 
// Shared implementation behind the get_*table_details* handlers.
// Resolves the catalog (the session's own, or the one named by `database_name`),
// looks up `table_name`, and fills `_return` with the row descriptor and table
// properties. `get_system` and `get_physical` are forwarded to
// getAllColumnMetadataForTable() for regular tables.
// For views, the view SQL is parsed and validated to derive the row descriptor; any
// failure is rethrown as a runtime_error asking for the view to be re-created.
// All std::runtime_error failures are converted to MapD exceptions at the end.
// NOTE(review): a few lines are not visible in this listing (the expression that
// produces `td_with_lock`, one parse_to_ra argument, and the branch value used when
// `partitions` is empty) -- confirm against the full file.
2323 void DBHandler::get_table_details_impl(TTableDetails& _return,
2324  query_state::StdLog& stdlog,
2325  const std::string& table_name,
2326  const bool get_system,
2327  const bool get_physical,
2328  const std::string& database_name) {
2329  try {
2330  auto session_info = stdlog.getSessionInfo();
2331  auto& cat = (database_name.empty())
2332  ? session_info->getCatalog()
2333  : *SysCatalog::instance().getCatalog(database_name);
2334  const auto td_with_lock =
2336  cat, table_name, false);
2337  const auto td = td_with_lock();
2338  CHECK(td);
2339 
2340  bool have_privileges_on_view_sources = true;
2341  if (td->isView) {
2342  auto query_state = create_query_state(session_info, td->viewSQL);
2343  stdlog.setQueryState(query_state);
2344  try {
2345  if (hasTableAccessPrivileges(td, *session_info)) {
2346  // TODO(adb): we should take schema read locks on all tables making up the
2347  // view, at minimum
2348  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
2349  query_state->getQueryStr(),
2350  {},
2351  false,
2353  false);
  // Missing privileges on the view's source objects do not fail the request; they
  // only cause the view SQL to be masked in the response further below.
2354  try {
2355  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2356  query_ra.first);
2357  } catch (const std::runtime_error&) {
2358  have_privileges_on_view_sources = false;
2359  }
2360 
2361  const auto result = validate_rel_alg(query_ra.first.plan_result,
2362  query_state->createQueryStateProxy());
2363 
2364  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
2365  } else {
2366  throw std::runtime_error(
2367  "Unable to access view " + table_name +
2368  ". The view may not exist, or the logged in user may not "
2369  "have permission to access the view.");
2370  }
2371  } catch (const std::exception& e) {
2372  throw std::runtime_error("View '" + table_name +
2373  "' query has failed with an error: '" +
2374  std::string(e.what()) +
2375  "'.\nThe view must be dropped and re-created to "
2376  "resolve the error. \nQuery:\n" +
2377  query_state->getQueryStr());
2378  }
2379  } else {
2380  if (hasTableAccessPrivileges(td, *session_info)) {
2381  const auto col_descriptors =
2382  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
2383  const auto deleted_cd = cat.getDeletedColumn(td);
  // The column returned by getDeletedColumn() is left out of the row descriptor.
2384  for (const auto cd : col_descriptors) {
2385  if (cd == deleted_cd) {
2386  continue;
2387  }
2388  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
2389  }
2390  } else {
2391  throw std::runtime_error(
2392  "Unable to access table " + table_name +
2393  ". The table may not exist, or the logged in user may not "
2394  "have permission to access the table.");
2395  }
2396  }
2397  _return.fragment_size = td->maxFragRows;
2398  _return.page_size = td->fragPageSize;
2399  _return.max_rows = td->maxRows;
2400  _return.view_sql =
2401  (have_privileges_on_view_sources ? td->viewSQL
2402  : "[Not enough privileges to see the view SQL]");
2403  _return.shard_count = td->nShards;
2404  _return.key_metainfo = td->keyMetainfo;
  // A table is reported as temporary when its persistence level is CPU_LEVEL.
2405  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
2406  _return.partition_detail =
2407  td->partitions.empty()
2409  : (table_is_replicated(td)
2410  ? TPartitionDetail::REPLICATED
2411  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
2412  : TPartitionDetail::OTHER));
2413  } catch (const std::runtime_error& e) {
2414  THROW_MAPD_EXCEPTION(std::string(e.what()));
2415  }
2416 }
2417 
2418 void DBHandler::get_link_view(TFrontendView& _return,
2419  const TSessionId& session,
2420  const std::string& link) {
2421  auto stdlog = STDLOG(get_session_ptr(session));
2422  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2423  auto session_ptr = stdlog.getConstSessionInfo();
2424  auto const& cat = session_ptr->getCatalog();
2425  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2426  if (!ld) {
2427  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
2428  }
2429  _return.view_state = ld->viewState;
2430  _return.view_name = ld->link;
2431  _return.update_time = ld->updateTime;
2432  _return.view_metadata = ld->viewMetadata;
2433 }
2434 
2436  const TableDescriptor* td,
2437  const Catalog_Namespace::SessionInfo& session_info) {
  // Reports whether the session's user may access the table described by `td`:
  // superusers always may; everyone else needs SysCatalog::hasAnyPrivileges() to
  // hold for the table's DB object.
  // NOTE(review): the first line of this signature and the declaration of `dbObject`
  // are not visible in this listing -- confirm against the full file.
2438  auto& cat = session_info.getCatalog();
2439  auto user_metadata = session_info.get_currentUser();
2440 
2441  if (user_metadata.isSuper) {
2442  return true;
2443  }
2444 
2446  dbObject.loadKey(cat);
2447  std::vector<DBObject> privObjects = {dbObject};
2448 
2449  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2450 }
2451 
2452 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2453  const Catalog_Namespace::SessionInfo& session_info,
2454  const GetTablesType get_tables_type,
2455  const std::string& database_name) {
2456  if (database_name.empty()) {
2457  table_names = session_info.getCatalog().getTableNamesForUser(
2458  session_info.get_currentUser(), get_tables_type);
2459  } else {
2460  auto request_cat = SysCatalog::instance().getCatalog(database_name);
2461  table_names = request_cat->getTableNamesForUser(session_info.get_currentUser(),
2462  get_tables_type);
2463  }
2464 }
2465 
// Lists the names of both physical tables and views visible to the session's user.
// NOTE(review): the line naming the callee is not visible in this listing; given the
// argument list it is presumably a call to get_tables_impl( -- confirm.
2466 void DBHandler::get_tables(std::vector<std::string>& table_names,
2467  const TSessionId& session) {
2468  auto stdlog = STDLOG(get_session_ptr(session));
2469  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2471  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2472 }
2473 
// Lists table names visible to the session's user in `database_name` by delegating to
// get_tables_impl().
// NOTE(review): the table-type argument passed between the session info and
// `database_name` is not visible in this listing -- confirm against the full file.
2474 void DBHandler::get_tables_for_database(std::vector<std::string>& table_names,
2475  const TSessionId& session,
2476  const std::string& database_name) {
2477  auto stdlog = STDLOG(get_session_ptr(session));
2478  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2479 
2480  get_tables_impl(table_names,
2481  *stdlog.getConstSessionInfo(),
2483  database_name);
2484 }
2485 
2486 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2487  const TSessionId& session) {
2488  auto stdlog = STDLOG(get_session_ptr(session));
2489  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2490  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2491 }
2492 
2493 void DBHandler::get_views(std::vector<std::string>& table_names,
2494  const TSessionId& session) {
2495  auto stdlog = STDLOG(get_session_ptr(session));
2496  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2497  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2498 }
2499 
// Appends one TTableMeta per accessible table or view of the session's catalog to
// `_return`. Shards (td->shard >= 0) and tables the user has no privileges on are
// skipped.
//  - For views, the view SQL is parsed and validated (just_validate=true) to obtain
//    column names and types; physical columns are excluded from the result.
//  - For regular tables, the column list comes from the catalog, minus the column
//    returned by getDeletedColumn().
// `with_table_locks` is forwarded to parse_to_ra() for views.
// NOTE(review): several lines are not visible in this listing (the declaration of
// `locks`, two execute_rel_alg arguments plus the end of that call, and the callee
// that fills `result`) -- confirm against the full file.
2500 void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
2501  QueryStateProxy query_state_proxy,
2502  const Catalog_Namespace::SessionInfo& session_info,
2503  const bool with_table_locks) {
2504  const auto& cat = session_info.getCatalog();
2505  const auto tables = cat.getAllTableMetadata();
2506  _return.reserve(tables.size());
2507 
2508  for (const auto td : tables) {
2509  if (td->shard >= 0) {
2510  // skip shards, they're not standalone tables
2511  continue;
2512  }
2513  if (!hasTableAccessPrivileges(td, session_info)) {
2514  // skip table, as there are no privileges to access it
2515  continue;
2516  }
2517 
2518  TTableMeta ret;
2519  ret.table_name = td->tableName;
2520  ret.is_view = td->isView;
2521  ret.is_replicated = table_is_replicated(td);
2522  ret.shard_count = td->nShards;
2523  ret.max_rows = td->maxRows;
2524  ret.table_id = td->tableId;
2525 
2526  std::vector<TTypeInfo> col_types;
2527  std::vector<std::string> col_names;
2528  size_t num_cols = 0;
2529  if (td->isView) {
2530  try {
2531  TPlanResult parse_result;
2533  std::tie(parse_result, locks) = parse_to_ra(
2534  query_state_proxy, td->viewSQL, {}, with_table_locks, system_parameters_);
2535  const auto query_ra = parse_result.plan_result;
2536 
2537  ExecutionResult ex_result;
2538  execute_rel_alg(ex_result,
2539  query_state_proxy,
2540  query_ra,
2541  true,
2543  -1,
2544  -1,
2545  /*just_validate=*/true,
2546  /*find_push_down_candidates=*/false,
2548  TQueryResult result;
2550  result, ex_result, query_state_proxy, query_ra, true, -1, -1);
  // Start from the full column count and subtract each physical column skipped.
2551  num_cols = result.row_set.row_desc.size();
2552  for (const auto& col : result.row_set.row_desc) {
2553  if (col.is_physical) {
2554  num_cols--;
2555  continue;
2556  }
2557  col_types.push_back(col.col_type);
2558  col_names.push_back(col.col_name);
2559  }
2560  } catch (std::exception& e) {
  // A view that fails here is only logged; its entry is still appended below with
  // whatever column information had been collected before the failure.
2561  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
2562  }
2563  } else {
2564  try {
  // NOTE(review): privileges were already checked at the top of the loop, so this
  // second check (and its `continue` branch) appears to be redundant -- confirm.
2565  if (hasTableAccessPrivileges(td, session_info)) {
2566  const auto col_descriptors =
2567  cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
2568  const auto deleted_cd = cat.getDeletedColumn(td);
2569  for (const auto cd : col_descriptors) {
2570  if (cd == deleted_cd) {
2571  continue;
2572  }
2573  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2574  col_names.push_back(cd->columnName);
2575  }
  // num_cols counts every returned descriptor, including the deleted column
  // skipped in the loop above when one is present.
2576  num_cols = col_descriptors.size();
2577  } else {
2578  continue;
2579  }
2580  } catch (const std::runtime_error& e) {
2581  THROW_MAPD_EXCEPTION(e.what());
2582  }
2583  }
2584 
2585  ret.num_cols = num_cols;
2586  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2587  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2588 
2589  _return.push_back(ret);
2590  }
2591 }
2592 
// Thrift entry point that returns metadata for the tables and views accessible to the
// session's user. Creates an empty-query query state for logging, takes a shared
// (read) lock, and delegates to get_tables_meta_impl(); any std::exception is
// converted into a MapD exception with the same message.
// NOTE(review): the arguments of the shared-lock constructor are not visible in this
// listing, so which mutex is locked cannot be told from here -- confirm.
2593 void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
2594  const TSessionId& session) {
2595  auto stdlog = STDLOG(get_session_ptr(session));
2596  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2597  auto session_ptr = stdlog.getConstSessionInfo();
2598  auto query_state = create_query_state(session_ptr, "");
2599  stdlog.setQueryState(query_state);
2600 
2601  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
2604 
2605  try {
2606  get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
2607  } catch (const std::exception& e) {
2608  THROW_MAPD_EXCEPTION(e.what());
2609  }
2610 }
2611 
2612 void DBHandler::get_users(std::vector<std::string>& user_names,
2613  const TSessionId& session) {
2614  auto stdlog = STDLOG(get_session_ptr(session));
2615  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2616  auto session_ptr = stdlog.getConstSessionInfo();
2617  std::list<Catalog_Namespace::UserMetadata> user_list;
2618 
2619  if (!session_ptr->get_currentUser().isSuper) {
2620  user_list = SysCatalog::instance().getAllUserMetadata(
2621  session_ptr->getCatalog().getCurrentDB().dbId);
2622  } else {
2623  user_list = SysCatalog::instance().getAllUserMetadata();
2624  }
2625  for (auto u : user_list) {
2626  user_names.push_back(u.userName);
2627  }
2628 }
2629 
2630 void DBHandler::get_version(std::string& version) {
2631  version = MAPD_RELEASE;
2632 }
2633 
// Superuser-only request to clear GPU memory. Non-superusers get a MapD exception.
// After the local clear (inside the try block), the render handler, if any, is asked
// to clear its GPU memory as well, followed by a step that runs only when
// leaf_aggregator_ reports at least one leaf.
// NOTE(review): the statement inside the try block and the statement inside the
// leafCount() branch are not visible in this listing -- confirm against the full file.
2634 void DBHandler::clear_gpu_memory(const TSessionId& session) {
2635  auto stdlog = STDLOG(get_session_ptr(session));
2636  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2637  auto session_ptr = stdlog.getConstSessionInfo();
2638  if (!session_ptr->get_currentUser().isSuper) {
2639  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2640  }
2641  try {
2643  } catch (const std::exception& e) {
2644  THROW_MAPD_EXCEPTION(e.what());
2645  }
2646  if (render_handler_) {
2647  render_handler_->clear_gpu_memory();
2648  }
2649 
2650  if (leaf_aggregator_.leafCount() > 0) {
2652  }
2653 }
2654 
// Superuser-only request to clear CPU memory. Non-superusers get a MapD exception.
// After the local clear (inside the try block), the render handler, if any, is asked
// to clear its CPU memory as well, followed by a step that runs only when
// leaf_aggregator_ reports at least one leaf.
// NOTE(review): the statement inside the try block and the statement inside the
// leafCount() branch are not visible in this listing -- confirm against the full file.
2655 void DBHandler::clear_cpu_memory(const TSessionId& session) {
2656  auto stdlog = STDLOG(get_session_ptr(session));
2657  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2658  auto session_ptr = stdlog.getConstSessionInfo();
2659  if (!session_ptr->get_currentUser().isSuper) {
2660  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2661  }
2662  try {
2664  } catch (const std::exception& e) {
2665  THROW_MAPD_EXCEPTION(e.what());
2666  }
2667  if (render_handler_) {
2668  render_handler_->clear_cpu_memory();
2669  }
2670 
2671  if (leaf_aggregator_.leafCount() > 0) {
2673  }
2674 }
2675 
// Enrolls a query session on an executor under the id `parent_session`.
// The handler's own session lookup and logging use `leaf_session`.
// `for_running_query_kernel` selects the status recorded for the enrollment:
// RUNNING_QUERY_KERNEL when true, RUNNING_IMPORTER when false.
// NOTE(review): the line that obtains `executor` (original 2686) and one
// argument line of the enrollQuerySession call (original 2690) are absent from
// this listing -- confirm both against the full source.
2676 void DBHandler::set_cur_session(const TSessionId& parent_session,
2677  const TSessionId& leaf_session,
2678  const std::string& start_time_str,
2679  const std::string& label,
2680  bool for_running_query_kernel) {
2681  // internal API to manage query interruption in distributed mode
2682  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2683  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2684  auto session_ptr = stdlog.getConstSessionInfo();
2685 
2687  executor->enrollQuerySession(parent_session,
2688  label,
2689  start_time_str,
2691  for_running_query_kernel
2692  ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
2693  : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
2694 }
2695 
// Counterpart of set_cur_session: clears the query session status that an
// executor holds for `parent_session` / `start_time_str`.
// `label` and `for_running_query_kernel` are not used by the visible code.
// NOTE(review): the line that obtains `executor` (original 2704) is absent from
// this listing -- confirm it against the full source.
2696 void DBHandler::invalidate_cur_session(const TSessionId& parent_session,
2697  const TSessionId& leaf_session,
2698  const std::string& start_time_str,
2699  const std::string& label,
2700  bool for_running_query_kernel) {
2701  // internal API to manage query interruption in distributed mode
2702  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2703  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2705  executor->clearQuerySessionStatus(parent_session, start_time_str);
2706 }
2707 
2709  return INVALID_SESSION_ID;
2710 }
2711 
// Reports buffer-pool memory usage for this node and, when leaves are attached,
// for the leaves as well.
//
// _return       output vector; one TNodeMemoryInfo is appended per MemoryInfo
//               entry returned by the data manager, and the leaf summary is
//               inserted in front of those entries.
// session       id of the calling session.
// memory_level  "gpu" selects MemoryLevel::GPU_LEVEL; any other value selects
//               MemoryLevel::CPU_LEVEL.
//
// NOTE(review): `mem_level` is declared without an initializer and no assignment
// to it is visible here (original lines 2720 and 2724 are absent from this
// listing); presumably those lines set it in each branch -- confirm, since it is
// passed to getLeafMemoryInfo below.
2712 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2713  const TSessionId& session,
2714  const std::string& memory_level) {
2715  auto stdlog = STDLOG(get_session_ptr(session));
2716  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2717  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2718  Data_Namespace::MemoryLevel mem_level;
 // std::string::compare returns 0 on equality, hence the negation.
2719  if (!memory_level.compare("gpu")) {
2721  internal_memory =
2722  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2723  } else {
2725  internal_memory =
2726  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2727  }
2728 
 // Translate each internal MemoryInfo record into its Thrift counterpart.
2729  for (auto memInfo : internal_memory) {
2730  TNodeMemoryInfo nodeInfo;
 // The host name is only filled in when leaves are attached.
2731  if (leaf_aggregator_.leafCount() > 0) {
2732  nodeInfo.host_name = omnisci::get_hostname();
2733  }
2734  nodeInfo.page_size = memInfo.pageSize;
2735  nodeInfo.max_num_pages = memInfo.maxNumPages;
2736  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2737  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
 // `gpu` is just the loop variable name; the same loop runs for CPU-level data.
2738  for (auto gpu : memInfo.nodeMemoryData) {
2739  TMemoryData md;
2740  md.slab = gpu.slabNum;
2741  md.start_page = gpu.startPage;
2742  md.num_pages = gpu.numPages;
2743  md.touch = gpu.touch;
2744  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2745  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2746  nodeInfo.node_memory_data.push_back(md);
2747  }
2748  _return.push_back(nodeInfo);
2749  }
 // Leaf results go in front of this node's own entries.
2750  if (leaf_aggregator_.leafCount() > 0) {
2751  std::vector<TNodeMemoryInfo> leafSummary =
2752  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
2753  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2754  }
2755 }
2756 
// Appends to `dbinfos` one TDBInfo (name and owner name) per database returned
// by SysCatalog::getDatabaseListForUser() for the session's current user.
// NOTE(review): the declaration of `dbs` (original line 2762) is absent from
// this listing; it is evidently initialized from the getDatabaseListForUser()
// call on the following line -- confirm its exact type against the full source.
2757 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
2758  auto stdlog = STDLOG(get_session_ptr(session));
2759  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2760  auto session_ptr = stdlog.getConstSessionInfo();
2761  const auto& user = session_ptr->get_currentUser();
2763  SysCatalog::instance().getDatabaseListForUser(user);
 // The strings are moved out of each `db` entry, so `dbs` must not be read
 // again after this loop.
2764  for (auto& db : dbs) {
2765  TDBInfo dbinfo;
2766  dbinfo.db_name = std::move(db.dbName);
2767  dbinfo.db_owner = std::move(db.dbOwnerName);
2768  dbinfos.push_back(std::move(dbinfo));
2769  }
2770 }
2771 
2772 void DBHandler::set_execution_mode(const TSessionId& session,
2773  const TExecuteMode::type mode) {
2774  auto stdlog = STDLOG(get_session_ptr(session));
2775  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2776  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
2777  auto session_it = get_session_it_unsafe(session, write_lock);
2778  if (leaf_aggregator_.leafCount() > 0) {
2779  leaf_aggregator_.set_execution_mode(session, mode);
2780  try {
2781  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2782  } catch (const TOmniSciException& e) {
2783  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2784  }
2785  return;
2786  }
2787  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2788 }
2789 
2790 namespace {
2791 
2793  if (td && td->nShards) {
2794  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2795  }
2796 }
2797 
2798 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
2799  const std::vector<std::string>& column_names) {
2800  std::unordered_set<std::string> unique_names;
2801  for (const auto& name : column_names) {
2802  auto lower_name = to_lower(name);
2803  if (unique_names.find(lower_name) != unique_names.end()) {
2804  THROW_MAPD_EXCEPTION("Column " + name + " is mentioned multiple times");
2805  } else {
2806  unique_names.insert(lower_name);
2807  }
2808  }
2809  for (const auto& cd : descs) {
2810  auto iter = unique_names.find(to_lower(cd->columnName));
2811  if (iter != unique_names.end()) {
2812  unique_names.erase(iter);
2813  }
2814  }
2815  if (!unique_names.empty()) {
2816  THROW_MAPD_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
2817  }
2818 }
2819 
2820 // Return vector of IDs mapping column descriptors to the list of comumn names.
2821 // The size of the vector is the number of actual columns (geophisical columns excluded).
2822 // ID is either a position in column_names matching the descriptor, or -1 if the column
2823 // is missing from the column_names
2824 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
2825  const std::vector<std::string>& column_names) {
2826  std::vector<int> desc_to_column_ids;
2827  if (column_names.empty()) {
2828  int col_idx = 0;
2829  for (const auto& cd : descs) {
2830  if (!cd->isGeoPhyCol) {
2831  desc_to_column_ids.push_back(col_idx);
2832  ++col_idx;
2833  }
2834  }
2835  } else {
2836  for (const auto& cd : descs) {
2837  if (!cd->isGeoPhyCol) {
2838  bool found = false;
2839  for (size_t j = 0; j < column_names.size(); ++j) {
2840  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
2841  found = true;
2842  desc_to_column_ids.push_back(j);
2843  break;
2844  }
2845  }
2846  if (!found) {
2847  if (!cd->columnType.get_notnull()) {
2848  desc_to_column_ids.push_back(-1);
2849  } else {
2850  THROW_MAPD_EXCEPTION("Column '" + cd->columnName +
2851  "' cannot be omitted due to NOT NULL constraint");
2852  }
2853  }
2854  }
2855  }
2856  }
2857  return desc_to_column_ids;
2858 }
2859 
2860 } // namespace
2861 
2863  const TSessionId& session,
2864  const Catalog& catalog,
2865  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
2866  const ColumnDescriptor* cd,
2867  size_t& col_idx,
2868  size_t num_rows,
2869  const std::string& table_name,
2870  bool assign_render_groups) {
2871  auto geo_col_idx = col_idx - 1;
2872  const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
2873  std::vector<std::vector<double>> coords_column, bounds_column;
2874  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2875  std::vector<int> render_groups_column;
2876  SQLTypeInfo ti = cd->columnType;
2877  if (num_rows != wkt_or_wkb_hex_column->size() ||
2878  !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
2879  ti,
2880  coords_column,
2881  bounds_column,
2882  ring_sizes_column,
2883  poly_rings_column,
2884  false)) {
2885  std::ostringstream oss;
2886  oss << "Invalid geometry in column " << cd->columnName;
2887  THROW_MAPD_EXCEPTION(oss.str());
2888  }
2889 
2890  // start or continue assigning render groups for poly columns?
2891  if (IS_GEO_POLY(cd->columnType.get_type()) && assign_render_groups) {
2892  // get RGA to use
2893  import_export::RenderGroupAnalyzer* render_group_analyzer{};
2894  {
2895  // mutex the map access
2896  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
2897 
2898  // emplace new RGA or fetch existing RGA from map
2899  auto [itr_table, emplaced_table] = render_group_assignment_map_.try_emplace(
2900  session, RenderGroupAssignmentTableMap());
2901  LOG_IF(INFO, emplaced_table)
2902  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2903  "Persistent Data for Session '"
2904  << session << "'";
2905  auto [itr_column, emplaced_column] =
2906  itr_table->second.try_emplace(table_name, RenderGroupAssignmentColumnMap());
2907  LOG_IF(INFO, emplaced_column)
2908  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2909  "Persistent Data for Table '"
2910  << table_name << "'";
2911  auto [itr_analyzer, emplaced_analyzer] = itr_column->second.try_emplace(
2912  cd->columnName, std::make_unique<import_export::RenderGroupAnalyzer>());
2913  LOG_IF(INFO, emplaced_analyzer)
2914  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2915  "Persistent Data for Column '"
2916  << cd->columnName << "'";
2917  render_group_analyzer = itr_analyzer->second.get();
2918  CHECK(render_group_analyzer);
2919 
2920  // seed new RGA from existing table/column, to handle appends
2921  if (emplaced_analyzer) {
2922  LOG(INFO) << "load_table_binary_columnar_polys: Seeding Render Groups from "
2923  "existing table...";
2924  render_group_analyzer->seedFromExistingTableContents(
2925  catalog, table_name, cd->columnName);
2926  LOG(INFO) << "load_table_binary_columnar_polys: Done";
2927  }
2928  }
2929 
2930  // assign render groups for this set of bounds
2931  LOG(INFO) << "load_table_binary_columnar_polys: Assigning Render Groups...";
2932  render_groups_column.reserve(bounds_column.size());
2933  for (auto const& bounds : bounds_column) {
2934  CHECK_EQ(bounds.size(), 4u);
2935  int rg = render_group_analyzer->insertBoundsAndReturnRenderGroup(bounds);
2936  render_groups_column.push_back(rg);
2937  }
2938  LOG(INFO) << "load_table_binary_columnar_polys: Done";
2939  } else {
2940  // render groups all zero
2941  render_groups_column.resize(bounds_column.size(), 0);
2942  }
2943 
2944  // Populate physical columns, advance col_idx
2946  cd,
2947  import_buffers,
2948  col_idx,
2949  coords_column,
2950  bounds_column,
2951  ring_sizes_column,
2952  poly_rings_column,
2953  render_groups_column);
2954 }
2955 
2957  const TSessionId& session,
2958  const Catalog& catalog,
2959  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
2960  const std::list<const ColumnDescriptor*>& cds,
2961  const std::vector<int>& desc_id_to_column_id,
2962  size_t num_rows,
2963  const std::string& table_name,
2964  bool assign_render_groups) {
2965  size_t skip_physical_cols = 0;
2966  size_t col_idx = 0, import_idx = 0;
2967  for (const auto& cd : cds) {
2968  if (skip_physical_cols > 0) {
2969  CHECK(cd->isGeoPhyCol);
2970  skip_physical_cols--;
2971  continue;
2972  } else if (cd->columnType.is_geometry()) {
2973  skip_physical_cols = cd->columnType.get_physical_cols();
2974  }
2975  if (desc_id_to_column_id[import_idx] == -1) {
2976  import_buffers[col_idx]->addDefaultValues(cd, num_rows);
2977  col_idx++;
2978  if (cd->columnType.is_geometry()) {
2979  fillGeoColumns(session,
2980  catalog,
2981  import_buffers,
2982  cd,
2983  col_idx,
2984  num_rows,
2985  table_name,
2986  assign_render_groups);
2987  }
2988  } else {
2989  col_idx++;
2990  col_idx += skip_physical_cols;
2991  }
2992  import_idx++;
2993  }
2994 }
2995 
2996 void DBHandler::load_table_binary(const TSessionId& session,
2997  const std::string& table_name,
2998  const std::vector<TRow>& rows,
2999  const std::vector<std::string>& column_names) {
3000  try {
3001  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3002  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3003  auto session_ptr = stdlog.getConstSessionInfo();
3004 
3005  if (rows.empty()) {
3006  THROW_MAPD_EXCEPTION("No rows to insert");
3007  }
3008 
3009  std::unique_ptr<import_export::Loader> loader;
3010  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3011  auto schema_read_lock = prepare_loader_generic(*session_ptr,
3012  table_name,
3013  rows.front().cols.size(),
3014  &loader,
3015  &import_buffers,
3016  column_names,
3017  "load_table_binary");
3018 
3019  auto col_descs = loader->get_column_descs();
3020  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3021 
3022  size_t rows_completed = 0;
3023  for (auto const& row : rows) {
3024  size_t col_idx = 0;
3025  try {
3026  for (auto cd : col_descs) {
3027  auto mapped_idx = desc_id_to_column_id[col_idx];
3028  if (mapped_idx != -1) {
3029  import_buffers[col_idx]->add_value(
3030  cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
3031  }
3032  col_idx++;
3033  }
3034  rows_completed++;
3035  } catch (const std::exception& e) {
3036  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
3037  import_buffers[col_idx_to_pop]->pop_value();
3038  }
3039  LOG(ERROR) << "Input exception thrown: " << e.what()
3040  << ". Row discarded, issue at column : " << (col_idx + 1)
3041  << " data :" << row;
3042  }
3043  }
3044  fillMissingBuffers(session,
3045  session_ptr->getCatalog(),
3046  import_buffers,
3047  col_descs,
3048  desc_id_to_column_id,
3049  rows_completed,
3050  table_name,
3051  false);
3052  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3053  session_ptr->getCatalog(), table_name);
3054  if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
3055  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3056  }
3057  } catch (const std::exception& e) {
3058  THROW_MAPD_EXCEPTION(std::string(e.what()));
3059  }
3060 }
3061 
3062 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
3064  const Catalog_Namespace::SessionInfo& session_info,
3065  const std::string& table_name,
3066  size_t num_cols,
3067  std::unique_ptr<import_export::Loader>* loader,
3068  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
3069  const std::vector<std::string>& column_names,
3070  std::string load_type) {
3071  if (num_cols == 0) {
3072  THROW_MAPD_EXCEPTION("No columns to insert");
3073  }
3074  check_read_only(load_type);
3075  auto& cat = session_info.getCatalog();
3076  auto td_with_lock =
3077  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
3079  cat, table_name, true));
3080  const auto td = (*td_with_lock)();
3081  CHECK(td);
3082 
3083  if (g_cluster && !leaf_aggregator_.leafCount()) {
3084  // Sharded table rows need to be routed to the leaf by an aggregator.
3086  }
3087  check_table_load_privileges(session_info, table_name);
3088 
3089  if (leaf_aggregator_.leafCount() > 0) {
3090  loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
3091  } else {
3092  loader->reset(new import_export::Loader(cat, td));
3093  }
3094 
3095  auto col_descs = (*loader)->get_column_descs();
3096  check_valid_column_names(col_descs, column_names);
3097  if (column_names.empty()) {
3098  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
3099  // Subtracting 1 (rowid) until TableDescriptor is updated.
3100  auto geo_physical_cols = std::count_if(
3101  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
3102  const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
3103  (td->hasDeletedCol ? 2 : 1);
3104  if (num_cols != num_table_cols) {
3105  throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
3106  ") does not match number of columns in table " +
3107  td->tableName + " (" + std::to_string(num_table_cols) +
3108  ")");
3109  }
3110  } else if (num_cols != column_names.size()) {
3112  "Number of columns specified does not match the "
3113  "number of columns given (" +
3114  std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
3115  }
3116 
3117  *import_buffers = import_export::setup_column_loaders(td, loader->get());
3118  return std::move(td_with_lock);
3119 }
3120 namespace {
3121 
3122 size_t get_column_size(const TColumn& column) {
3123  if (!column.nulls.empty()) {
3124  return column.nulls.size();
3125  } else {
3126  // it is a very bold estimate but later we check it against REAL data
3127  // and if this function returns a wrong result (e.g. both int and string
3128  // vectors are filled with values), we get an error
3129  return column.data.int_col.size() + column.data.arr_col.size() +
3130  column.data.real_col.size() + column.data.str_col.size();
3131  }
3132 }
3133 
3134 } // namespace
3135 
// Columnar binary load without render-group assignment: forwards all arguments
// unchanged together with AssignRenderGroupsMode::kNone.
// NOTE(review): the line naming the callee (original 3140) is absent from this
// listing -- confirm against the full source which function receives the call.
3136 void DBHandler::load_table_binary_columnar(const TSessionId& session,
3137  const std::string& table_name,
3138  const std::vector<TColumn>& cols,
3139  const std::vector<std::string>& column_names) {
3141  session, table_name, cols, column_names, AssignRenderGroupsMode::kNone);
3142 }
3143 
3145  const TSessionId& session,
3146  const std::string& table_name,
3147  const std::vector<TColumn>& cols,
3148  const std::vector<std::string>& column_names,
3149  const bool assign_render_groups) {
3151  table_name,
3152  cols,
3153  column_names,
3154  assign_render_groups
3157 }
3158 
3160  const TSessionId& session,
3161  const std::string& table_name,
3162  const std::vector<TColumn>& cols,
3163  const std::vector<std::string>& column_names,
3164  const AssignRenderGroupsMode assign_render_groups_mode) {
3165  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3166  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3167  auto session_ptr = stdlog.getConstSessionInfo();
3168 
3169  if (assign_render_groups_mode == AssignRenderGroupsMode::kCleanUp) {
3170  // throw if the user tries to pass column data on a clean-up
3171  if (cols.size()) {
3173  "load_table_binary_columnar_polys: Column data must be empty when called with "
3174  "assign_render_groups = false");
3175  }
3176 
3177  // mutex the map access
3178  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
3179 
3180  // drop persistent render group assignment data for this session and table
3181  // keep the per-session map in case other tables are active (ideally not)
3182  auto itr_session = render_group_assignment_map_.find(session);
3183  if (itr_session != render_group_assignment_map_.end()) {
3184  LOG(INFO) << "load_table_binary_columnar_polys: Cleaning up Render Group "
3185  "Assignment Persistent Data for Session '"
3186  << session << "', Table '" << table_name << "'";
3187  itr_session->second.erase(table_name);
3188  }
3189 
3190  // just doing clean-up, so we're done
3191  return;
3192  }
3193 
3194  std::unique_ptr<import_export::Loader> loader;
3195  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3196  auto schema_read_lock = prepare_loader_generic(*session_ptr,
3197  table_name,
3198  cols.size(),
3199  &loader,
3200  &import_buffers,
3201  column_names,
3202  "load_table_binary_columnar");
3203 
3204  auto desc_id_to_column_id =
3205  column_ids_by_names(loader->get_column_descs(), column_names);
3206  size_t num_rows = get_column_size(cols.front());
3207  size_t import_idx = 0; // index into the TColumn vector being loaded
3208  size_t col_idx = 0; // index into column description vector
3209  try {
3210  size_t skip_physical_cols = 0;
3211  for (auto cd : loader->get_column_descs()) {
3212  if (skip_physical_cols > 0) {
3213  CHECK(cd->isGeoPhyCol);
3214  skip_physical_cols--;
3215  continue;
3216  }
3217  auto mapped_idx = desc_id_to_column_id[import_idx];
3218  if (mapped_idx != -1) {
3219  size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
3220  if (col_rows != num_rows) {
3221  std::ostringstream oss;
3222  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
3223  << cd->columnName << " , expecting " << num_rows << " rows, column "
3224  << col_idx << " has " << col_rows << " rows";
3225  THROW_MAPD_EXCEPTION(oss.str());
3226  }
3227  // Advance to the next column in the table
3228  col_idx++;
3229  // For geometry columns: process WKT strings and fill physical columns
3230  if (cd->columnType.is_geometry()) {
3231  fillGeoColumns(session,
3232  session_ptr->getCatalog(),
3233  import_buffers,
3234  cd,
3235  col_idx,
3236  num_rows,
3237  table_name,
3238  assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
3239  skip_physical_cols = cd->columnType.get_physical_cols();
3240  }
3241  } else {
3242  col_idx++;
3243  if (cd->columnType.is_geometry()) {
3244  skip_physical_cols = cd->columnType.get_physical_cols();
3245  col_idx += skip_physical_cols;
3246  }
3247  }
3248  // Advance to the next column of values being loaded
3249  import_idx++;
3250  }
3251  } catch (const std::exception& e) {
3252  std::ostringstream oss;
3253  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
3254  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3255  THROW_MAPD_EXCEPTION(oss.str());
3256  }
3257  fillMissingBuffers(session,
3258  session_ptr->getCatalog(),
3259  import_buffers,
3260  loader->get_column_descs(),
3261  desc_id_to_column_id,
3262  num_rows,
3263  table_name,
3264  assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
3265  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3266  session_ptr->getCatalog(), table_name);
3267  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3268  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3269  }
3270 }
3271 
3272 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
3273 
// Evaluates `s` (an expression yielding an arrow::Status) exactly once. When
// the status is not OK, logs its text and throws a TOmniSciException whose
// error_msg carries that text.
// The log statement must use the local copy `_s`: naming the macro argument
// there would evaluate the caller's expression (e.g. a ReadNext() call) a
// second time on the failure path.
#define ARROW_THRIFT_THROW_NOT_OK(s) \
  do {                               \
    ::arrow::Status _s = (s);        \
    if (UNLIKELY(!_s.ok())) {        \
      TOmniSciException ex;          \
      ex.error_msg = _s.ToString();  \
      LOG(ERROR) << _s.ToString();   \
      throw ex;                      \
    }                                \
  } while (0)
3284 
3285 namespace {
3286 
3287 RecordBatchVector loadArrowStream(const std::string& stream) {
3288  RecordBatchVector batches;
3289  try {
3290  // TODO(wesm): Make this simpler in general, see ARROW-1600
3291  auto stream_buffer =
3292  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
3293  static_cast<int64_t>(stream.size()));
3294 
3295  arrow::io::BufferReader buf_reader(stream_buffer);
3296  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3297  ARROW_ASSIGN_OR_THROW(batch_reader,
3298  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3299 
3300  while (true) {
3301  std::shared_ptr<arrow::RecordBatch> batch;
3302  // Read batch (zero-copy) from the stream
3303  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
3304  if (batch == nullptr) {
3305  break;
3306  }
3307  batches.emplace_back(std::move(batch));
3308  }
3309  } catch (const std::exception& e) {
3310  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
3311  }
3312  return batches;
3313 }
3314 
3315 } // namespace
3316 
3317 void DBHandler::load_table_binary_arrow(const TSessionId& session,
3318  const std::string& table_name,
3319  const std::string& arrow_stream,
3320  const bool use_column_names) {
3321  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3322  auto session_ptr = stdlog.getConstSessionInfo();
3323 
3324  RecordBatchVector batches = loadArrowStream(arrow_stream);
3325  // Assuming have one batch for now
3326  if (batches.size() != 1) {
3327  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
3328  }
3329  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
3330  std::unique_ptr<import_export::Loader> loader;
3331  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3332  std::vector<std::string> column_names;
3333  if (use_column_names) {
3334  column_names = batch->schema()->field_names();
3335  }
3336  auto schema_read_lock =
3337  prepare_loader_generic(*session_ptr,
3338  table_name,
3339  static_cast<size_t>(batch->num_columns()),
3340  &loader,
3341  &import_buffers,
3342  column_names,
3343  "load_table_binary_arrow");
3344 
3345  auto desc_id_to_column_id =
3346  column_ids_by_names(loader->get_column_descs(), column_names);
3347  size_t num_rows = 0;
3348  size_t col_idx = 0;
3349  try {
3350  for (auto cd : loader->get_column_descs()) {
3351  auto mapped_idx = desc_id_to_column_id[col_idx];
3352  if (mapped_idx != -1) {
3353  auto& array = *batch->column(mapped_idx);
3354  import_export::ArraySliceRange row_slice(0, array.length());
3355  num_rows = import_buffers[col_idx]->add_arrow_values(
3356  cd, array, true, row_slice, nullptr);
3357  }
3358  col_idx++;
3359  }
3360  } catch (const std::exception& e) {
3361  LOG(ERROR) << "Input exception thrown: " << e.what()
3362  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3363  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
3364  // other import paths
3365  THROW_MAPD_EXCEPTION(e.what());
3366  }
3367  fillMissingBuffers(session,
3368  session_ptr->getCatalog(),
3369  import_buffers,
3370  loader->get_column_descs(),
3371  desc_id_to_column_id,
3372  num_rows,
3373  table_name,
3374  false);
3375  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3376  session_ptr->getCatalog(), table_name);
3377  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3378  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3379  }
3380 }
3381 
// Loads row-oriented string data (TStringRow) into `table_name`.
//
// Steps, in order:
//   1. every row is appended to the import buffers, one value per logical
//      column (geo physical columns are skipped here);
//   2. for each supplied geometry column, fillGeoColumns() is called once for
//      the whole batch to populate its physical columns;
//   3. columns that were not supplied are filled by fillMissingBuffers();
//   4. the accepted rows are handed to the loader under the table's insert lock.
//
// session       id of the calling session.
// table_name    destination table.
// rows          rows to insert; must not be empty.
// column_names  optional explicit column list; empty means all table columns.
//
// Throws a MapD exception (THROW_MAPD_EXCEPTION) when `rows` is empty, when a
// value cannot be added, when geo processing fails, or when the loader reports
// a failure.
3382 void DBHandler::load_table(const TSessionId& session,
3383  const std::string& table_name,
3384  const std::vector<TStringRow>& rows,
3385  const std::vector<std::string>& column_names) {
3386  try {
3387  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3388  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3389  auto session_ptr = stdlog.getConstSessionInfo();
3390 
3391  if (rows.empty()) {
3392  THROW_MAPD_EXCEPTION("No rows to insert");
3393  }
3394 
3395  std::unique_ptr<import_export::Loader> loader;
3396  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3397  auto schema_read_lock =
3398  prepare_loader_generic(*session_ptr,
3399  table_name,
3400  static_cast<size_t>(rows.front().cols.size()),
3401  &loader,
3402  &import_buffers,
3403  column_names,
3404  "load_table");
3405 
3406  auto col_descs = loader->get_column_descs();
3407  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3408  import_export::CopyParams copy_params;
3409  size_t rows_completed = 0;
3410  for (auto const& row : rows) {
3411  size_t import_idx = 0; // index into the TStringRow being loaded
3412  size_t col_idx = 0; // index into column description vector
3413  try {
3414  size_t skip_physical_cols = 0;
3415  for (auto cd : col_descs) {
 // Descriptors of geo physical columns directly follow their geometry
 // column; they carry no value of their own in the incoming row.
3416  if (skip_physical_cols > 0) {
3417  CHECK(cd->isGeoPhyCol);
3418  skip_physical_cols--;
3419  continue;
3420  }
3421  auto mapped_idx = desc_id_to_column_id[import_idx];
3422  if (mapped_idx != -1) {
3423  import_buffers[col_idx]->add_value(cd,
3424  row.cols[mapped_idx].str_val,
3425  row.cols[mapped_idx].is_null,
3426  copy_params);
3427  }
3428  col_idx++;
3429  if (cd->columnType.is_geometry()) {
3430  // physical geo columns will be filled separately later
3431  skip_physical_cols = cd->columnType.get_physical_cols();
3432  col_idx += skip_physical_cols;
3433  }
3434  // Advance to the next field within the row
3435  import_idx++;
3436  }
3437  rows_completed++;
3438  } catch (const std::exception& e) {
 // Although the log text says "Row discarded", the exception thrown right
 // after it leaves the row loop, so a failing row ends the whole load.
3439  LOG(ERROR) << "Input exception thrown: " << e.what()
3440  << ". Row discarded, issue at column : " << (col_idx + 1)
3441  << " data :" << row;
3442  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3443  }
3444  }
3445  // do batch filling of geo columns separately
 // `rows` was already rejected above when empty, so this test always holds.
3446  if (rows.size() != 0) {
 // The first row is only used for the error log below.
3447  const auto& row = rows[0];
3448  size_t col_idx = 0; // index into column description vector
3449  try {
3450  size_t import_idx = 0;
3451  size_t skip_physical_cols = 0;
3452  for (auto cd : col_descs) {
3453  if (skip_physical_cols > 0) {
3454  skip_physical_cols--;
3455  continue;
3456  }
3457  auto mapped_idx = desc_id_to_column_id[import_idx];
 // col_idx is advanced before the geometry check: fillGeoColumns receives
 // it by reference, already pointing past the geometry column's own buffer.
3458  col_idx++;
3459  if (cd->columnType.is_geometry()) {
3460  skip_physical_cols = cd->columnType.get_physical_cols();
3461  if (mapped_idx != -1) {
3462  fillGeoColumns(session,
3463  session_ptr->getCatalog(),
3464  import_buffers,
3465  cd,
3466  col_idx,
3467  rows_completed,
3468  table_name,
3469  false);
3470  } else {
 // Geometry column not supplied: step over its physical buffers here;
 // fillMissingBuffers() below is given the same descriptor list.
3471  col_idx += skip_physical_cols;
3472  }
3473  }
3474  import_idx++;
3475  }
3476  } catch (const std::exception& e) {
3477  LOG(ERROR) << "Input exception thrown: " << e.what()
3478  << ". Row discarded, issue at column : " << (col_idx + 1)
3479  << " data :" << row;
3480  THROW_MAPD_EXCEPTION(e.what());
3481  }
3482  }
3483  fillMissingBuffers(session,
3484  session_ptr->getCatalog(),
3485  import_buffers,
3486  col_descs,
3487  desc_id_to_column_id,
3488  rows_completed,
3489  table_name,
3490  false);
3491  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3492  session_ptr->getCatalog(), table_name);
3493  if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3494  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3495  }
3496 
3497  } catch (const std::exception& e) {
3498  THROW_MAPD_EXCEPTION(std::string(e.what()));
3499  }
3500 }
3501 
3502 char DBHandler::unescape_char(std::string str) {
3503  char out = str[0];
3504  if (str.size() == 2 && str[0] == '\\') {
3505  if (str[1] == 't') {
3506  out = '\t';
3507  } else if (str[1] == 'n') {
3508  out = '\n';
3509  } else if (str[1] == '0') {
3510  out = '\0';
3511  } else if (str[1] == '\'') {
3512  out = '\'';
3513  } else if (str[1] == '\\') {
3514  out = '\\';
3515  }
3516  }
3517  return out;
3518 }
3519 
// Body of the conversion from the thrift-side copy parameters `cp` into an
// import_export::CopyParams, which is returned at the end.
// NOTE(review): the line carrying this definition's signature (listing line 3520) is
// absent from this extract, and so are the statements under several `case` labels
// below (listing lines 3524, 3527, 3530, 3594, 3597). Nothing is assumed about them.
3521  import_export::CopyParams copy_params;
// header mode: anything other than the three known values is rejected
3522  switch (cp.has_header) {
3523  case TImportHeaderRow::AUTODETECT:
3525  break;
3526  case TImportHeaderRow::NO_HEADER:
3528  break;
3529  case TImportHeaderRow::HAS_HEADER:
3531  break;
3532  default:
3533  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
3534  std::to_string((int)cp.has_header));
3535  break;
3536  }
3537  copy_params.quoted = cp.quoted;
// an empty delimiter is stored as '\0'; otherwise it is decoded with unescape_char
3538  if (cp.delimiter.length() > 0) {
3539  copy_params.delimiter = unescape_char(cp.delimiter);
3540  } else {
3541  copy_params.delimiter = '\0';
3542  }
// the remaining optional fields are only overwritten when the caller supplied a
// non-empty (or, for threads, non-zero) value; single-character options go through
// unescape_char
3543  if (cp.null_str.length() > 0) {
3544  copy_params.null_str = cp.null_str;
3545  }
3546  if (cp.quote.length() > 0) {
3547  copy_params.quote = unescape_char(cp.quote);
3548  }
3549  if (cp.escape.length() > 0) {
3550  copy_params.escape = unescape_char(cp.escape);
3551  }
3552  if (cp.line_delim.length() > 0) {
3553  copy_params.line_delim = unescape_char(cp.line_delim);
3554  }
3555  if (cp.array_delim.length() > 0) {
3556  copy_params.array_delim = unescape_char(cp.array_delim);
3557  }
3558  if (cp.array_begin.length() > 0) {
3559  copy_params.array_begin = unescape_char(cp.array_begin);
3560  }
3561  if (cp.array_end.length() > 0) {
3562  copy_params.array_end = unescape_char(cp.array_end);
3563  }
3564  if (cp.threads != 0) {
3565  copy_params.threads = cp.threads;
3566  }
3567  if (cp.s3_access_key.length() > 0) {
3568  copy_params.s3_access_key = cp.s3_access_key;
3569  }
3570  if (cp.s3_secret_key.length() > 0) {
3571  copy_params.s3_secret_key = cp.s3_secret_key;
3572  }
3573  if (cp.s3_session_token.length() > 0) {
3574  copy_params.s3_session_token = cp.s3_session_token;
3575  }
3576  if (cp.s3_region.length() > 0) {
3577  copy_params.s3_region = cp.s3_region;
3578  }
3579  if (cp.s3_endpoint.length() > 0) {
3580  copy_params.s3_endpoint = cp.s3_endpoint;
3581  }
3582 #ifdef HAVE_AWS_S3
// when g_allow_s3_server_privileges is set and the caller passed no access key,
// secret key or session token, the credentials are taken from the AWS default
// credentials provider chain instead
3583  if (g_allow_s3_server_privileges && cp.s3_access_key.length() == 0 &&
3584  cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
3585  const auto& server_credentials =
3586  Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
3587  copy_params.s3_access_key = server_credentials.GetAWSAccessKeyId();
3588  copy_params.s3_secret_key = server_credentials.GetAWSSecretKey();
3589  copy_params.s3_session_token = server_credentials.GetSessionToken();
3590  }
3591 #endif
// file type: PARQUET is only accepted when ENABLE_IMPORT_PARQUET is defined
3592  switch (cp.file_type) {
3593  case TFileType::POLYGON:
3595  break;
3596  case TFileType::DELIMITED:
3598  break;
3599 #ifdef ENABLE_IMPORT_PARQUET
3600  case TFileType::PARQUET:
3601  copy_params.file_type = import_export::FileType::PARQUET;
3602  break;
3603 #endif
3604  default:
3605  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
3606  std::to_string((int)cp.file_type));
3607  break;
3608  }
// geo coordinate encoding: only GEOINT and NONE are accepted
3609  switch (cp.geo_coords_encoding) {
3610  case TEncodingType::GEOINT:
3611  copy_params.geo_coords_encoding = kENCODING_GEOINT;
3612  break;
3613  case TEncodingType::NONE:
3614  copy_params.geo_coords_encoding = kENCODING_NONE;
3615  break;
3616  default:
3617  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
3618  std::to_string((int)cp.geo_coords_encoding));
3619  break;
3620  }
3621  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
// geo coordinate type: only GEOGRAPHY and GEOMETRY are accepted
3622  switch (cp.geo_coords_type) {
3623  case TDatumType::GEOGRAPHY:
3624  copy_params.geo_coords_type = kGEOGRAPHY;
3625  break;
3626  case TDatumType::GEOMETRY:
3627  copy_params.geo_coords_type = kGEOMETRY;
3628  break;
3629  default:
3630  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
3631  std::to_string((int)cp.geo_coords_type));
3632  break;
3633  }
// SRID: only 4326, 3857 and 900913 are accepted
// NOTE(review): the error text below opens a parenthesis that is never closed
3634  switch (cp.geo_coords_srid) {
3635  case 4326:
3636  case 3857:
3637  case 900913:
3638  copy_params.geo_coords_srid = cp.geo_coords_srid;
3639  break;
3640  default:
3641  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
3642  std::to_string((int)cp.geo_coords_srid));
3643  break;
3644  }
// the remaining fields are copied across unconditionally
3645  copy_params.sanitize_column_names = cp.sanitize_column_names;
3646  copy_params.geo_layer_name = cp.geo_layer_name;
3647  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3648  copy_params.geo_explode_collections = cp.geo_explode_collections;
3649  copy_params.source_srid = cp.source_srid;
3650  return copy_params;
3651 }
3652 
// Body of the conversion from `cp` into a thrift TCopyParams, which is returned at
// the end. Most fields are copied across directly; enum-like fields are mapped
// through the switches below.
// NOTE(review): the signature line (listing line 3653) and the `case` labels at
// listing lines 3658, 3661, 3664 and 3685 are absent from this extract, so the
// assignments that follow them appear here without their labels.
3654  TCopyParams copy_params;
3655  copy_params.delimiter = cp.delimiter;
3656  copy_params.null_str = cp.null_str;
// header mode; an unexpected value trips CHECK(false)
3657  switch (cp.has_header) {
3659  copy_params.has_header = TImportHeaderRow::AUTODETECT;
3660  break;
3662  copy_params.has_header = TImportHeaderRow::NO_HEADER;
3663  break;
3665  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
3666  break;
3667  default:
3668  CHECK(false);
3669  break;
3670  }
3671  copy_params.quoted = cp.quoted;
3672  copy_params.quote = cp.quote;
3673  copy_params.escape = cp.escape;
3674  copy_params.line_delim = cp.line_delim;
3675  copy_params.array_delim = cp.array_delim;
3676  copy_params.array_begin = cp.array_begin;
3677  copy_params.array_end = cp.array_end;
3678  copy_params.threads = cp.threads;
3679  copy_params.s3_access_key = cp.s3_access_key;
3680  copy_params.s3_secret_key = cp.s3_secret_key;
3681  copy_params.s3_session_token = cp.s3_session_token;
3682  copy_params.s3_region = cp.s3_region;
3683  copy_params.s3_endpoint = cp.s3_endpoint;
// file type: every value without its own label is reported as DELIMITED
3684  switch (cp.file_type) {
3686  copy_params.file_type = TFileType::POLYGON;
3687  break;
3688  default:
3689  copy_params.file_type = TFileType::DELIMITED;
3690  break;
3691  }
// geo coordinate encoding: anything other than kENCODING_GEOINT is reported as NONE
3692  switch (cp.geo_coords_encoding) {
3693  case kENCODING_GEOINT:
3694  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
3695  break;
3696  default:
3697  copy_params.geo_coords_encoding = TEncodingType::NONE;
3698  break;
3699  }
3700  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
// geo coordinate type; an unexpected value trips CHECK(false)
3701  switch (cp.geo_coords_type) {
3702  case kGEOGRAPHY:
3703  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
3704  break;
3705  case kGEOMETRY:
3706  copy_params.geo_coords_type = TDatumType::GEOMETRY;
3707  break;
3708  default:
3709  CHECK(false);
3710  break;
3711  }
3712  copy_params.geo_coords_srid = cp.geo_coords_srid;
3713  copy_params.sanitize_column_names = cp.sanitize_column_names;
3714  copy_params.geo_layer_name = cp.geo_layer_name;
3715  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3716  copy_params.geo_explode_collections = cp.geo_explode_collections;
3717  copy_params.source_srid = cp.source_srid;
3718  return copy_params;
3719 }
3720 
3721 namespace {
// Rewrites `path` in place so that network locations are addressed through a GDAL
// virtual file reader: http:// and https:// paths get "/vsicurl/" prepended, and a
// leading "s3://" is replaced by "/vsis3/". Other paths are left untouched.
// Scheme matching is case-insensitive.
// NOTE(review): listing lines 3729 and 3736, which open the statements that use the
// two "not supported" messages below, are absent from this extract.
3722 void add_vsi_network_prefix(std::string& path) {
3723  // do we support network file access?
3724  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
3725 
3726  // modify head of filename based on source location
3727  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
3728  if (!gdal_network) {
3730  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
3731  }
3732  // invoke GDAL CURL virtual file reader
3733  path = "/vsicurl/" + path;
3734  } else if (boost::istarts_with(path, "s3://")) {
3735  if (!gdal_network) {
3737  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
3738  }
3739  // invoke GDAL S3 virtual file reader
3740  boost::replace_first(path, "s3://", "/vsis3/");
3741  }
3742 }
3743 
3744 void add_vsi_geo_prefix(std::string& path) {
3745  // single gzip'd file (not an archive)?
3746  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3747  path = "/vsigzip/" + path;
3748  }
3749 }
3750 
3751 void add_vsi_archive_prefix(std::string& path) {
3752  // check for compressed file or file bundle
3753  if (boost::iends_with(path, ".zip")) {
3754  // zip archive
3755  path = "/vsizip/" + path;
3756  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3757  boost::iends_with(path, ".tar.gz")) {
3758  // tar archive (compressed or uncompressed)
3759  path = "/vsitar/" + path;
3760  }
3761 }
3762 
3763 std::string remove_vsi_prefixes(const std::string& path_in) {
3764  std::string path(path_in);
3765 
3766  // these will be first
3767  if (boost::istarts_with(path, "/vsizip/")) {
3768  boost::replace_first(path, "/vsizip/", "");
3769  } else if (boost::istarts_with(path, "/vsitar/")) {
3770  boost::replace_first(path, "/vsitar/", "");
3771  } else if (boost::istarts_with(path, "/vsigzip/")) {
3772  boost::replace_first(path, "/vsigzip/", "");
3773  }
3774 
3775  // then these
3776  if (boost::istarts_with(path, "/vsicurl/")) {
3777  boost::replace_first(path, "/vsicurl/", "");
3778  } else if (boost::istarts_with(path, "/vsis3/")) {
3779  boost::replace_first(path, "/vsis3/", "s3://");
3780  }
3781 
3782  return path;
3783 }
3784 
3785 bool path_is_relative(const std::string& path) {
3786  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
3787  boost::istarts_with(path, "https://")) {
3788  return false;
3789  }
3790  return !boost::filesystem::path(path).is_absolute();
3791 }
3792 
3793 bool path_has_valid_filename(const std::string& path) {
3794  auto filename = boost::filesystem::path(path).filename().string();
3795  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
3796  return false;
3797  }
3798  return true;
3799 }
3800 
3801 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
3802  if (!path_has_valid_filename(path)) {
3803  return false;
3804  }
3805  if (include_gz) {
3806  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
3807  return true;
3808  }
3809  }
3810  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
3811  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
3812  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
3813  boost::iends_with(path, ".gdb.zip") || boost::iends_with(path, ".fgb")) {
3814  return true;
3815  }
3816  return false;
3817 }
3818 
3819 bool is_a_supported_archive_file(const std::string& path) {
3820  if (!path_has_valid_filename(path)) {
3821  return false;
3822  }
3823  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
3824  return true;
3825  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3826  boost::iends_with(path, ".tar.gz")) {
3827  return true;
3828  }
3829  return false;
3830 }
3831 
3832 std::string find_first_geo_file_in_archive(const std::string& archive_path,
3833  const import_export::CopyParams& copy_params) {
3834  // get the recursive list of all files in the archive
3835  std::vector<std::string> files =
3836  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
3837 
3838  // report the list
3839  LOG(INFO) << "Found " << files.size() << " files in Archive "
3840  << remove_vsi_prefixes(archive_path);
3841  for (const auto& file : files) {
3842  LOG(INFO) << " " << file;
3843  }
3844 
3845  // scan the list for the first candidate file
3846  bool found_suitable_file = false;
3847  std::string file_name;
3848  for (const auto& file : files) {
3849  if (is_a_supported_geo_file(file, false)) {
3850  file_name = file;
3851  found_suitable_file = true;
3852  break;
3853  }
3854  }
3855 
3856  // if we didn't find anything
3857  if (!found_suitable_file) {
3858  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
3859  remove_vsi_prefixes(archive_path);
3860  file_name.clear();
3861  }
3862 
3863  // done
3864  return file_name;
3865 }
3866 
3867 bool is_local_file(const std::string& file_path) {
3868  return (!boost::istarts_with(file_path, "s3://") &&
3869  !boost::istarts_with(file_path, "http://") &&
3870  !boost::istarts_with(file_path, "https://"));
3871 }
3872 
// Runs a validation step on `file_path` only when is_local_file() reports it as
// local; remote paths are skipped.
// NOTE(review): the statement inside the `if` (listing line 3875) is absent from
// this extract, so the validation that is performed is not visible here.
3873 void validate_import_file_path_if_local(const std::string& file_path) {
3874  if (is_local_file(file_path)) {
3876  }
3877 }
3878 } // namespace
3879 
// Inspects an import file and fills `_return` with the detected column descriptors,
// up to 100 sample rows, and the copy parameters that were used.
//
// _return      receives row_set.row_desc, row_set.rows and copy_params
// session      session id; also hashed to locate the per-session import directory
// file_name_in file path or URL as supplied by the client
// cp           thrift copy parameters from the client
//
// Any std::exception raised during detection is rethrown through
// THROW_MAPD_EXCEPTION with a "detect_column_types error: " prefix.
//
// NOTE(review): listing lines 3888, 3897, 3947 and 4011 are absent from this
// extract. `copy_params` is used below without a visible declaration, the `if`
// that opens the first detection branch is missing its first line, and the call
// that fills `sample_data` in the POLYGON branch has lost its opening line.
3880 void DBHandler::detect_column_types(TDetectResult& _return,
3881  const TSessionId& session,
3882  const std::string& file_name_in,
3883  const TCopyParams& cp) {
3884  auto stdlog = STDLOG(get_session_ptr(session));
3885  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3886  check_read_only("detect_column_types");
3887 
3889 
3890  std::string file_name{file_name_in};
3891  if (path_is_relative(file_name)) {
3892  // assume relative paths are relative to data_path / mapd_import / <session>
3893  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3894  boost::filesystem::path(file_name).filename();
3895  file_name = file_path.string();
3896  }
3898 
3899  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
3900  if (copy_params.file_type == import_export::FileType::POLYGON) {
3901  if (is_a_supported_geo_file(file_name, true)) {
3902  // prepare to detect geo file directly
3903  add_vsi_network_prefix(file_name);
3904  add_vsi_geo_prefix(file_name);
3905  } else if (is_a_supported_archive_file(file_name)) {
3906  // find the archive file
3907  add_vsi_network_prefix(file_name);
3908  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
3909  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3910  }
3911  // find geo file in archive
3912  add_vsi_archive_prefix(file_name);
3913  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3914  // prepare to detect that geo file
// when no geo file was found, file_name keeps pointing at the archive itself
3915  if (geo_file.size()) {
3916  file_name = file_name + std::string("/") + geo_file;
3917  }
3918  } else {
3919  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
3920  file_name_in);
3921  }
3922  }
3923 
3924  auto file_path = boost::filesystem::path(file_name);
3925  // can be a s3 url
// existence is only checked for paths that do not start with s3://
3926  if (!boost::istarts_with(file_name, "s3://")) {
3927  if (!boost::filesystem::path(file_name).is_absolute()) {
3928  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3929  boost::filesystem::path(file_name).filename();
3930  file_name = file_path.string();
3931  }
3932 
3933  if (copy_params.file_type == import_export::FileType::POLYGON) {
3934  // check for geo file
3935  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3936  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3937  }
3938  } else {
3939  // check for regular file
3940  if (!boost::filesystem::exists(file_path)) {
3941  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3942  }
3943  }
3944  }
3945 
3946  try {
3948 #ifdef ENABLE_IMPORT_PARQUET
3949  || (copy_params.file_type == import_export::FileType::PARQUET)
3950 #endif
3951  ) {
// first branch: column types, encodings and headers come from import_export::Detector
3952  import_export::Detector detector(file_path, copy_params);
3953  std::vector<SQLTypes> best_types = detector.best_sqltypes;
3954  std::vector<EncodingType> best_encodings = detector.best_encodings;
3955  std::vector<std::string> headers = detector.get_headers();
// the detector may have adjusted the copy parameters, so take them back from it
3956  copy_params = detector.get_copy_params();
3957 
3958  _return.copy_params = copyparams_to_thrift(copy_params);
3959  _return.row_set.row_desc.resize(best_types.size());
3960  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
3961  TColumnType col;
3962  SQLTypes t = best_types[col_idx];
3963  EncodingType encodingType = best_encodings[col_idx];
3964  SQLTypeInfo ti(t, false, encodingType);
3965  if (IS_GEO(t)) {
3966  // set this so encoding_to_thrift does the right thing
3967  ti.set_compression(copy_params.geo_coords_encoding);
3968  // fill in these directly
3969  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3970  col.col_type.scale = copy_params.geo_coords_srid;
3971  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3972  }
3973  col.col_type.type = type_to_thrift(ti);
3974  col.col_type.encoding = encoding_to_thrift(ti);
3975  if (copy_params.sanitize_column_names) {
3976  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3977  } else {
3978  col.col_name = headers[col_idx];
3979  }
3980  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3981  _return.row_set.row_desc[col_idx] = col;
3982  }
3983  size_t num_samples = 100;
3984  auto sample_data = detector.get_sample_rows(num_samples);
3985 
// every sample value is returned as a string; an empty string is flagged as null
3986  TRow sample_row;
3987  for (auto row : sample_data) {
3988  sample_row.cols.clear();
3989  for (const auto& s : row) {
3990  TDatum td;
3991  td.val.str_val = s;
3992  td.is_null = s.empty();
3993  sample_row.cols.push_back(td);
3994  }
3995  _return.row_set.rows.push_back(sample_row);
3996  }
3997  } else if (copy_params.file_type == import_export::FileType::POLYGON) {
3998  // @TODO simon.eves get this from somewhere!
3999  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
4000 
4001  check_geospatial_files(file_path, copy_params);
4002  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
4003  file_path.string(), geoColumnName, copy_params);
4004  for (auto cd : cds) {
4005  if (copy_params.sanitize_column_names) {
4006  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
4007  }
4008  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
4009  }
// sample_data maps a column's source name to its sample values (see the lookup by
// cd.sourceName below); the row count is taken from the first entry
4010  std::map<std::string, std::vector<std::string>> sample_data;
4012  file_path.string(), geoColumnName, sample_data, 100, copy_params);
4013  if (sample_data.size() > 0) {
4014  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
4015  TRow sample_row;
4016  for (auto cd : cds) {
4017  TDatum td;
4018  td.val.str_val = sample_data[cd.sourceName].at(i);
4019  td.is_null = td.val.str_val.empty();
4020  sample_row.cols.push_back(td);
4021  }
4022  _return.row_set.rows.push_back(sample_row);
4023  }
4024  }
4025  _return.copy_params = copyparams_to_thrift(copy_params);
4026  }
4027  } catch (const std::exception& e) {
4028  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
4029  }
4030 }
4031 
4032 void DBHandler::render_vega(TRenderResult& _return,
4033  const TSessionId& session,
4034  const int64_t widget_id,
4035  const std::string& vega_json,
4036  const int compression_level,
4037  const std::string& nonce) {
4038  auto stdlog = STDLOG(get_session_ptr(session),
4039  "widget_id",
4040  widget_id,
4041  "compression_level",
4042  compression_level,
4043  "vega_json",
4044  vega_json,
4045  "nonce",
4046  nonce);
4047  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4048  stdlog.appendNameValuePairs("nonce", nonce);
4049  auto session_ptr = stdlog.getConstSessionInfo();
4050  if (!render_handler_) {
4051  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
4052  }
4053 
4054  _return.total_time_ms = measure<>::execution([&]() {
4055  try {
4056  render_handler_->render_vega(_return,
4057  stdlog.getSessionInfo(),
4058  widget_id,
4059  vega_json,
4060  compression_level,
4061  nonce);
4062  } catch (std::exception& e) {
4063  THROW_MAPD_EXCEPTION(e.what());
4064  }
4065  });
4066 }
4067 
// Remainder of a privilege check for a single dashboard: builds a DBObject for
// `dashboard_id`, loads its key from the session's catalog, attaches
// `requestedPermissions`, and returns what SysCatalog::checkPrivileges reports for
// the session's current user.
// NOTE(review): the first line of this definition (listing line 4068: return type,
// name and first parameter) is absent from this extract; `session_info` used below
// is presumably that first parameter — confirm against the header.
4069  int32_t dashboard_id,
4070  AccessPrivileges requestedPermissions) {
4071  DBObject object(dashboard_id, DashboardDBObjectType);
4072  auto& catalog = session_info.getCatalog();
4073  auto& user = session_info.get_currentUser();
4074  object.loadKey(catalog);
4075  object.setPrivileges(requestedPermissions);
4076  std::vector<DBObject> privs = {object};
4077  return SysCatalog::instance().checkPrivileges(user, privs);
4078 }
4079 
4080 // custom expressions
4081 namespace {
4084 
4085 std::unique_ptr<Catalog_Namespace::CustomExpression> create_custom_expr_from_thrift_obj(
4086  const TCustomExpression& t_custom_expr) {
4087  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4088  << "Unexpected data source type: "
4089  << static_cast<int>(t_custom_expr.data_source_type);
4090  DataSourceType data_source_type = DataSourceType::TABLE;
4091  return std::make_unique<CustomExpression>(t_custom_expr.name,
4092  t_custom_expr.expression_json,
4093  data_source_type,
4094  t_custom_expr.data_source_id);
4095 }
4096 
// Remainder of the conversion from a catalog CustomExpression to its thrift form:
// id, name, expression JSON, data source id and the is_deleted flag are copied
// across. Only the TABLE data source type is expected; any other value fails the
// CHECK.
// NOTE(review): the first line of this definition (listing line 4097: return type
// and name) is absent from this extract.
4098  const CustomExpression& custom_expr) {
4099  TCustomExpression t_custom_expr;
4100  t_custom_expr.id = custom_expr.id;
4101  t_custom_expr.name = custom_expr.name;
4102  t_custom_expr.expression_json = custom_expr.expression_json;
4103  t_custom_expr.data_source_id = custom_expr.data_source_id;
4104  t_custom_expr.is_deleted = custom_expr.is_deleted;
4105  CHECK(custom_expr.data_source_type == DataSourceType::TABLE)
4106  << "Unexpected data source type: "
4107  << static_cast<int>(custom_expr.data_source_type);
4108  t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
4109  return t_custom_expr;
4110 }
4111 } // namespace
4112 
4113 int32_t DBHandler::create_custom_expression(const TSessionId& session,
4114  const TCustomExpression& t_custom_expr) {
4115  auto stdlog = STDLOG(get_session_ptr(session));
4116  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4117  check_read_only("create_custom_expression");
4118 
4119  auto session_ptr = stdlog.getConstSessionInfo();
4120  if (!session_ptr->get_currentUser().isSuper) {
4121  THROW_MAPD_EXCEPTION("Custom expressions can only be created by super users.")
4122  }
4123  auto& catalog = session_ptr->getCatalog();
4124  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4125  << "Unexpected data source type: "
4126  << static_cast<int>(t_custom_expr.data_source_type);
4127  if (catalog.getMetadataForTable(t_custom_expr.data_source_id, false) == nullptr) {
4128  THROW_MAPD_EXCEPTION("Custom expression references a table that does not exist.")
4129  }
4130  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
4131  return catalog.createCustomExpression(
4132  create_custom_expr_from_thrift_obj(t_custom_expr));
4133 }
4134 
4135 void DBHandler::get_custom_expressions(std::vector<TCustomExpression>& _return,
4136  const TSessionId& session) {
4137  auto stdlog = STDLOG(get_session_ptr(session));
4138  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4139 
4140  auto session_ptr = stdlog.getConstSessionInfo();
4141  auto& catalog = session_ptr->getCatalog();
4142  mapd_shared_lock<mapd_shared_mutex> read_lock(custom_expressions_mutex_);
4143  auto custom_expressions =
4144  catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
4145  for (const auto& custom_expression : custom_expressions) {
4146  _return.emplace_back(create_thrift_obj_from_custom_expr(*custom_expression));
4147  }
4148 }
4149 
4150 void DBHandler::update_custom_expression(const TSessionId& session,
4151  const int32_t id,
4152  const std::string& expression_json) {
4153  auto stdlog = STDLOG(get_session_ptr(session));
4154  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4155  check_read_only("update_custom_expression");
4156 
4157  auto session_ptr = stdlog.getConstSessionInfo();
4158  if (!session_ptr->get_currentUser().isSuper) {
4159  THROW_MAPD_EXCEPTION("Custom expressions can only be updated by super users.")
4160  }
4161  auto& catalog = session_ptr->getCatalog();
4162  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
4163  catalog.updateCustomExpression(id, expression_json);
4164 }
4165 
// Remainder of the handler that deletes the custom expressions listed in
// `custom_expression_ids` from the session's catalog; `do_soft_delete` is passed
// through to Catalog::deleteCustomExpressions unchanged. Rejected (via
// THROW_MAPD_EXCEPTION) for callers that are not super users; check_read_only is
// consulted first. The catalog call is made while holding
// custom_expressions_mutex_ exclusively.
// NOTE(review): the first line of this definition (listing line 4166: return type
// and name) is absent from this extract.
4167  const TSessionId& session,
4168  const std::vector<int32_t>& custom_expression_ids,
4169  const bool do_soft_delete) {
4170  auto stdlog = STDLOG(get_session_ptr(session));
4171  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4172  check_read_only("delete_custom_expressions");
4173 
4174  auto session_ptr = stdlog.getConstSessionInfo();
4175  if (!session_ptr->get_currentUser().isSuper) {
4176  THROW_MAPD_EXCEPTION("Custom expressions can only be deleted by super users.")
4177  }
4178  auto& catalog = session_ptr->getCatalog();
4179  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
4180  catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4181 }
4182 
4183 // dashboards
// Fetches one dashboard by id into `dashboard`.
// Throws through THROW_MAPD_EXCEPTION when the id is unknown or when the caller
// lacks the VIEW_DASHBOARD privilege on it.
// NOTE(review): listing lines 4191 and 4197 are absent from this extract:
// `user_meta` is used below without a visible declaration, and the opening line of
// the privilege check (the `if` whose arguments start at 4198) is missing.
4184 void DBHandler::get_dashboard(TDashboard& dashboard,
4185  const TSessionId& session,
4186  const int32_t dashboard_id) {
4187  auto stdlog = STDLOG(get_session_ptr(session));
4188  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4189  auto session_ptr = stdlog.getConstSessionInfo();
4190  auto const& cat = session_ptr->getCatalog();
4192  auto dash = cat.getMetadataForDashboard(dashboard_id);
4193  if (!dash) {
4194  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
4195  " doesn't exist");
4196  }
4198  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4199  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
4200  std::to_string(dashboard_id));
4201  }
// clear the name before looking up the dashboard's owner by user id
4202  user_meta.userName = "";
4203  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4204  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
4205 }
4206 
// Appends to `dashboards` one entry per dashboard in the session's catalog that
// passes the VIEW_DASHBOARD check below. Entries are built with populate_state set
// to false, so dashboard_state is left unset.
// NOTE(review): listing lines 4213 and 4217 are absent from this extract:
// `user_meta` is used below without a visible declaration, and the opening line of
// the per-dashboard privilege check (arguments start at 4218) is missing.
4207 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
4208  const TSessionId& session) {
4209  auto stdlog = STDLOG(get_session_ptr(session));
4210  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4211  auto session_ptr = stdlog.getConstSessionInfo();
4212  auto const& cat = session_ptr->getCatalog();
4214  const auto dashes = cat.getAllDashboardsMetadata();
4215  user_meta.userName = "";
4216  for (const auto dash : dashes) {
4218  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4219  // dashboardState is intentionally not populated here
4220  // for payload reasons
4221  // use get_dashboard call to get state
4222  dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
4223  }
4224  }
4225 }
4226 
// Remainder of the helper that turns a DashboardDescriptor into a thrift
// TDashboard for the given session.
//
// session_ptr    caller's session; supplies the catalog and the current user
// dash           dashboard metadata to convert
// populate_state when false, dashboard_state is left unset
//
// The result carries the caller's permissions on the dashboard and an
// is_dash_shared flag.
// NOTE(review): listing lines 4227 and 4229 are absent from this extract (return
// type/name and the parameter declaring `user_meta`, which is written to below).
4228  const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4230  const DashboardDescriptor* dash,
4231  const bool populate_state) {
4232  auto const& cat = session_ptr->getCatalog();
// user_meta is (re)filled with the metadata of the dashboard's owner
4233  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4234  auto objects_list = SysCatalog::instance().getMetadataForObject(
4235  cat.getCurrentDB().dbId,
4236  static_cast<int>(DBObjectType::DashboardDBObjectType),
4237  dash->dashboardId);
4238  TDashboard dashboard;
4239  dashboard.dashboard_name = dash->dashboardName;
4240  if (populate_state)
4241  dashboard.dashboard_state = dash->dashboardState;
4242  dashboard.image_hash = dash->imageHash;
4243  dashboard.update_time = dash->updateTime;
4244  dashboard.dashboard_metadata = dash->dashboardMetadata;
4245  dashboard.dashboard_id = dash->dashboardId;
4246  dashboard.dashboard_owner = dash->user;
4247  TDashboardPermissions perms;
4248  // Super user has all permissions.
4249  if (session_ptr->get_currentUser().isSuper) {
4250  perms.create_ = true;
4251  perms.delete_ = true;
4252  perms.edit_ = true;
4253  perms.view_ = true;
4254  } else {
4255  // Collect all grants on current user
4256  // add them to the permissions.
// NOTE(review): the |= accumulation below relies on the flags of a freshly
// constructed TDashboardPermissions starting out false — confirm
4257  auto obj_to_find =
4258  DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
4259  obj_to_find.loadKey(session_ptr->getCatalog());
4260  std::vector<std::string> grantees =
4261  SysCatalog::instance().getRoles(true,
4262  session_ptr->get_currentUser().isSuper,
4263  session_ptr->get_currentUser().userName);
4264  for (const auto& grantee : grantees) {
4265  DBObject* object_found;
4266  auto* gr = SysCatalog::instance().getGrantee(grantee);
// object_found is assigned inside the condition; grantees that cannot be resolved
// or that hold nothing on this dashboard are skipped
4267  if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
4268  const auto obj_privs = object_found->getPrivileges();
4269  perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4270  perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4271  perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4272  perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4273  }
4274  }
4275  }
4276  dashboard.dashboard_permissions = perms;
// not shared when no object metadata was returned, or when the single entry's role
// name equals the owner's user name
4277  if (objects_list.empty() ||
4278  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
4279  dashboard.is_dash_shared = false;
4280  } else {
4281  dashboard.is_dash_shared = true;
4282  }
4283  return dashboard;
4284 }
4285 
// Creates a dashboard owned by the session's current user and returns the id
// reported by Catalog::createDashboard.
//
// Throws through THROW_MAPD_EXCEPTION when the caller lacks the required database
// privilege, when the current user already has a dashboard with this name, or when
// creation raises a std::exception; check_read_only is consulted first.
// NOTE(review): listing lines 4298 and 4308 are absent from this extract: the
// second argument of checkDBAccessPrivileges is missing, and `dd` is used below
// without a visible declaration.
4286 int32_t DBHandler::create_dashboard(const TSessionId& session,
4287  const std::string& dashboard_name,
4288  const std::string& dashboard_state,
4289  const std::string& image_hash,
4290  const std::string& dashboard_metadata) {
4291  auto stdlog = STDLOG(get_session_ptr(session));
4292  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4293  auto session_ptr = stdlog.getConstSessionInfo();
4294  check_read_only("create_dashboard");
4295  auto& cat = session_ptr->getCatalog();
4296 
4297  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
4299  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
4300  }
4301 
// existing dashboards are looked up by (current user id as string, name)
4302  auto dash = cat.getMetadataForDashboard(
4303  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
4304  if (dash) {
4305  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4306  }
4307 
4309  dd.dashboardName = dashboard_name;
4310  dd.dashboardState = dashboard_state;
4311  dd.imageHash = image_hash;
4312  dd.dashboardMetadata = dashboard_metadata;
4313  dd.userId = session_ptr->get_currentUser().userId;
4314  dd.user = session_ptr->get_currentUser().userName;
4315 
4316  try {
4317  auto id = cat.createDashboard(dd);
4318  // TODO: transactionally unsafe
4319  SysCatalog::instance().createDBObject(
4320  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
4321  return id;
4322  } catch (const std::exception& e) {
4323  THROW_MAPD_EXCEPTION(e.what());
4324  }
4325 }
4326 
// Replaces dashboard `dashboard_id` with the supplied name, state, image hash and
// metadata, and sets its owner to `dashboard_owner`.
//
// Throws through THROW_MAPD_EXCEPTION when the caller lacks the EDIT_DASHBOARD
// privilege, when `dashboard_owner` is not a known user, or when
// Catalog::replaceDashboard raises a std::exception; check_read_only is consulted
// first.
// NOTE(review): listing lines 4341, 4346 and 4351 are absent from this extract:
// the opening line of the privilege check (arguments start at 4342) is missing, and
// `dd` and `user` are used below without visible declarations.
4327 void DBHandler::replace_dashboard(const TSessionId& session,
4328  const int32_t dashboard_id,
4329  const std::string& dashboard_name,
4330  const std::string& dashboard_owner,
4331  const std::string& dashboard_state,
4332  const std::string& image_hash,
4333  const std::string& dashboard_metadata) {
4334  auto stdlog = STDLOG(get_session_ptr(session));
4335  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4336  auto session_ptr = stdlog.getConstSessionInfo();
4337  CHECK(session_ptr);
4338  check_read_only("replace_dashboard");
4339  auto& cat = session_ptr->getCatalog();
4340 
4342  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
4343  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
4344  }
4345 
4347  dd.dashboardName = dashboard_name;
4348  dd.dashboardState = dashboard_state;
4349  dd.imageHash = image_hash;
4350  dd.dashboardMetadata = dashboard_metadata;
// the owner name must resolve to an existing user; its id becomes the owner id
4352  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
4353  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
4354  " does not exist");
4355  }
4356  dd.userId = user.userId;
4357  dd.user = dashboard_owner;
4358  dd.dashboardId = dashboard_id;
4359 
4360  try {
4361  cat.replaceDashboard(dd);
4362  } catch (const std::exception& e) {
4363  THROW_MAPD_EXCEPTION(e.what());
4364  }
4365 }
4366 
4367 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
4368  delete_dashboards(session, {dashboard_id});
4369 }
4370 
4371 void DBHandler::delete_dashboards(const TSessionId& session,
4372  const std::vector<int32_t>& dashboard_ids) {
4373  auto stdlog = STDLOG(get_session_ptr(session));
4374  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4375  auto session_ptr = stdlog.getConstSessionInfo();
4376  check_read_only("delete_dashboards");
4377  auto& cat = session_ptr->getCatalog();
4378  // Checks will be performed in catalog
4379  try {
4380  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4381  } catch (const std::exception& e) {
4382  THROW_MAPD_EXCEPTION(e.what());
4383  }
4384 }
4385 
// Returns the entries of `groups` that may be used for sharing dashboard
// `dashboard_id`.
//
// Throws through THROW_MAPD_EXCEPTION when the dashboard id is unknown or when a
// group name does not resolve to a grantee. Throws std::runtime_error (not the
// thrift exception) when the caller is neither the dashboard owner nor a super user.
// NOTE(review): listing line 4401 is absent from this extract, so `user_meta` is
// used below without a visible declaration. In the visible lines nothing updates
// user_meta between the reset of isSuper and the `!user_meta.isSuper` test, so
// every existing grantee ends up in the result — confirm this is intended.
4386 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
4387  int32_t dashboard_id,
4388  std::vector<std::string> groups) {
4389  const auto session_info = get_session_copy(session);
4390  auto& cat = session_info.getCatalog();
4391  auto dash = cat.getMetadataForDashboard(dashboard_id);
4392  if (!dash) {
4393  THROW_MAPD_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4394  " does not exist");
4395  } else if (session_info.get_currentUser().userId != dash->userId &&
4396  !session_info.get_currentUser().isSuper) {
4397  throw std::runtime_error(
4398  "User should be either owner of dashboard or super user to share/unshare it");
4399  }
4400  std::vector<std::string> valid_groups;
4402  for (auto& group : groups) {
4403  user_meta.isSuper = false; // initialize default flag
4404  if (!SysCatalog::instance().getGrantee(group)) {
4405  THROW_MAPD_EXCEPTION("User/Role " + group + " does not exist");
4406  } else if (!user_meta.isSuper) {
4407  valid_groups.push_back(group);
4408  }
4409  }
4410  return valid_groups;
4411 }
4412 
4413 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
4414  for (auto const& group : groups) {
4415  if (!SysCatalog::instance().getGrantee(group)) {
4416  THROW_MAPD_EXCEPTION("User/Role '" + group + "' does not exist");
4417  }
4418  }
4419 }
4420 
// Remainder of the check that every id in `dashboard_ids` refers to an existing
// dashboard which the session's current user owns, or that the user is a super
// user. All failures are collected first, grouped by message, and then reported
// together in one THROW_MAPD_EXCEPTION; nothing is thrown when there are none.
// NOTE(review): the first line of this definition (listing line 4421: return type
// and name) is absent from this extract.
4422  const Catalog_Namespace::SessionInfo& session_info,
4423  const std::vector<int32_t>& dashboard_ids) {
4424  auto& cat = session_info.getCatalog();
// error message -> ids that failed with that message
4425  std::map<std::string, std::list<int32_t>> errors;
4426  for (auto const& dashboard_id : dashboard_ids) {
4427  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
4428  if (!dashboard) {
4429  errors["Dashboard id does not exist"].push_back(dashboard_id);
4430  } else if (session_info.get_currentUser().userId != dashboard->userId &&
4431  !session_info.get_currentUser().isSuper) {
4432  errors["User should be either owner of dashboard or super user to share/unshare it"]
4433  .push_back(dashboard_id);
4434  }
4435  }
4436  if (!errors.empty()) {
4437  std::stringstream error_stream;
4438  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
4439  for (const auto& [error, id_list] : errors) {
4440  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
4441  }
4442  THROW_MAPD_EXCEPTION(error_stream.str());
4443  }
4444 }
4445 
4446 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
4447  const std::vector<int32_t>& dashboard_ids,
4448  const std::vector<std::string>& groups,
4449  const TDashboardPermissions& permissions,
4450  const bool do_share) {
4451  auto stdlog = STDLOG(get_session_ptr(session));
4452  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4453  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
4454  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
4455  !permissions.view_) {
4456  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for " +
4457  std::string(do_share ? "grants" : "revokes"));
4458  }
4459  auto session_ptr = stdlog.getConstSessionInfo();
4460  auto const& catalog = session_ptr->getCatalog();
4461  auto& sys_catalog = SysCatalog::instance();
4462  validateGroups(groups);
4463  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
4464  std::vector<DBObject> batch_objects;
4465  for (auto const& dashboard_id : dashboard_ids) {
4466  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
4467  AccessPrivileges privs;
4468  if (permissions.delete_) {
4470  }
4471  if (permissions.create_) {
4473  }
4474  if (permissions.edit_) {
4476  }
4477  if (permissions.view_) {
4479  }
4480  object.setPrivileges(privs);
4481  batch_objects.push_back(object);
4482  }
4483  if (do_share) {
4484  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4485  } else {
4486  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4487  }
4488 }
4489 
4490 void DBHandler::share_dashboards(const TSessionId& session,
4491  const std::vector<int32_t>& dashboard_ids,
4492  const std::vector<std::string>& groups,
4493  const TDashboardPermissions& permissions) {
4494  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
4495 }
4496 
4497 // NOOP: Grants not available for objects as of now
4498 void DBHandler::share_dashboard(const TSessionId& session,
4499  const int32_t dashboard_id,
4500  const std::vector<std::string>& groups,
4501  const std::vector<std::string>& objects,
4502  const TDashboardPermissions& permissions,
4503  const bool grant_role = false) {
4504  share_dashboards(session, {dashboard_id}, groups, permissions);
4505 }
4506 
4507 void DBHandler::unshare_dashboards(const TSessionId& session,
4508  const std::vector<int32_t>& dashboard_ids,
4509  const std::vector<std::string>& groups,
4510  const TDashboardPermissions& permissions) {
4511  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
4512 }
4513 
4514 void DBHandler::unshare_dashboard(const TSessionId& session,
4515  const int32_t dashboard_id,
4516  const std::vector<std::string>& groups,
4517  const std::vector<std::string>& objects,
4518  const TDashboardPermissions& permissions) {
4519  unshare_dashboards(session, {dashboard_id}, groups, permissions);
4520 }
4521 
4523  std::vector<TDashboardGrantees>& dashboard_grantees,
4524  const TSessionId& session,
4525  const int32_t dashboard_id) {
4526  auto stdlog = STDLOG(get_session_ptr(session));
4527  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4528  auto session_ptr = stdlog.getConstSessionInfo();
4529  auto const& cat = session_ptr->getCatalog();
4531  auto dash = cat.getMetadataForDashboard(dashboard_id);
4532  if (!dash) {
4533  THROW_MAPD_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4534  " does not exist");
4535  } else if (session_ptr->get_currentUser().userId != dash->userId &&
4536  !session_ptr->get_currentUser().isSuper) {
4538  "User should be either owner of dashboard or super user to access grantees");
4539  }
4540  std::vector<ObjectRoleDescriptor*> objectsList;
4541  objectsList = SysCatalog::instance().getMetadataForObject(
4542  cat.getCurrentDB().dbId,
4543  static_cast<int>(DBObjectType::DashboardDBObjectType),
4544  dashboard_id); // By default objecttypecan be only dashabaords
4545  user_meta.userId = -1;
4546  user_meta.userName = "";
4547  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4548  for (auto object : objectsList) {
4549  if (user_meta.userName == object->roleName) {
4550  // Mask owner
4551  continue;
4552  }
4553  TDashboardGrantees grantee;
4554  TDashboardPermissions perm;
4555  grantee.name = object->roleName;
4556  grantee.is_user = object->roleType;
4557  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4558  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4559  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4560  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4561  grantee.permissions = perm;
4562  dashboard_grantees.push_back(grantee);
4563  }
4564 }
4565 
4566 void DBHandler::create_link(std::string& _return,
4567  const TSessionId& session,
4568  const std::string& view_state,
4569  const std::string& view_metadata) {
4570  auto stdlog = STDLOG(get_session_ptr(session));
4571  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4572  auto session_ptr = stdlog.getConstSessionInfo();
4573  // check_read_only("create_link");
4574  auto& cat = session_ptr->getCatalog();
4575 
4576  LinkDescriptor ld;
4577  ld.userId = session_ptr->get_currentUser().userId;
4578  ld.viewState = view_state;
4579  ld.viewMetadata = view_metadata;
4580 
4581  try {
4582  _return = cat.createLink(ld, 6);
4583  } catch (const std::exception& e) {
4584  THROW_MAPD_EXCEPTION(e.what());
4585  }
4586 }
4587 
4589  const std::string& name,
4590  const bool is_array) {
4591  TColumnType ct;
4592  ct.col_name = name;
4593  ct.col_type.type = type;
4594  ct.col_type.is_array = is_array;
4595  return ct;
4596 }
4597 
4598 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
4599  const import_export::CopyParams& copy_params) {
4600  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
4601  if (std::find(shp_ext.begin(),
4602  shp_ext.end(),
4603  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
4604  shp_ext.end()) {
4605  for (auto ext : shp_ext) {
4606  auto aux_file = file_path;
4608  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
4609  copy_params) &&
4611  aux_file.replace_extension(ext).string(), copy_params)) {
4612  throw std::runtime_error("required file for shapefile does not exist: " +
4613  aux_file.filename().string());
4614  }
4615  }
4616  }
4617 }
4618 
4619 void DBHandler::create_table(const TSessionId& session,
4620  const std::string& table_name,
4621  const TRowDescriptor& rd,
4622  const TFileType::type file_type,
4623  const TCreateParams& create_params) {
4624  auto stdlog = STDLOG("table_name", table_name);
4625  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4626  check_read_only("create_table");
4627 
4628  if (ImportHelpers::is_reserved_name(table_name)) {
4629  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
4630  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
4631  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
4632  }
4633 
4634  auto rds = rd;
4635 
4636  // no longer need to manually add the poly column for a TFileType::POLYGON table
4637  // a column of the correct geo type has already been added
4638  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
4639 
4640  std::string stmt{"CREATE TABLE " + table_name};
4641  std::vector<std::string> col_stmts;
4642 
4643  for (auto col : rds) {
4644  if (ImportHelpers::is_reserved_name(col.col_name)) {
4645  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
4646  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
4647  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
4648  }
4649  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
4650  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
4651  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
4652  " for column: " + col.col_name);
4653  }
4654 
4655  if (col.col_type.type == TDatumType::DECIMAL) {
4656  // if no precision or scale passed in set to default 14,7
4657  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
4658  col.col_type.precision = 14;
4659  col.col_type.scale = 7;
4660  }
4661  }
4662 
4663  std::string col_stmt;
4664  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
4665  if (col.__isset.default_value) {
4666  col_stmt.append(" DEFAULT " + col.default_value);
4667  }
4668 
4669  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
4670  // `nullable` argument, leading this to default to false. Uncomment for v2.
4671  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
4672 
4673  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
4674  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
4675  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
4676  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
4677  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
4678  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
4679  }
4680  } else if (col.col_type.type == TDatumType::STR) {
4681  // non DICT encoded strings
4682  col_stmt.append(" ENCODING NONE");
4683  } else if (col.col_type.type == TDatumType::POINT ||
4684  col.col_type.type == TDatumType::LINESTRING ||
4685  col.col_type.type == TDatumType::POLYGON ||
4686  col.col_type.type == TDatumType::MULTIPOLYGON) {
4687  // non encoded compressable geo
4688  if (col.col_type.scale == 4326) {
4689  col_stmt.append(" ENCODING NONE");
4690  }
4691  }
4692  col_stmts.push_back(col_stmt);
4693  }
4694 
4695  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
4696 
4697  if (create_params.is_replicated) {
4698  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
4699  }
4700 
4701  stmt.append(";");
4702 
4703  TQueryResult ret;
4704  sql_execute(ret, session, stmt, true, "", -1, -1);
4705 }
4706 
4707 void DBHandler::import_table(const TSessionId& session,
4708  const std::string& table_name,
4709  const std::string& file_name_in,
4710  const TCopyParams& cp) {
4711  try {
4712  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4713  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4714  auto session_ptr = stdlog.getConstSessionInfo();
4715  check_read_only("import_table");
4716  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
4717 
4718  auto& cat = session_ptr->getCatalog();
4720  auto start_time = ::toString(std::chrono::system_clock::now());
4722  executor->enrollQuerySession(session,
4723  "IMPORT_TABLE",
4724  start_time,
4726  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
4727  }
4728 
4729  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
4730  // reset the runtime query interrupt status
4732  executor->clearQuerySessionStatus(session, start_time);
4733  }
4734  };
4735  const auto td_with_lock =
4737  cat, table_name);
4738  const auto td = td_with_lock();
4739  CHECK(td);
4740  check_table_load_privileges(*session_ptr, table_name);
4741 
4742  std::string file_name{file_name_in};
4743  auto file_path = boost::filesystem::path(file_name);
4745  if (!boost::istarts_with(file_name, "s3://")) {
4746  if (!boost::filesystem::path(file_name).is_absolute()) {
4747  file_path = import_path_ / picosha2::hash256_hex_string(session) /
4748  boost::filesystem::path(file_name).filename();
4749  file_name = file_path.string();
4750  }
4751  if (!boost::filesystem::exists(file_path)) {
4752  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
4753  }
4754  }
4756 
4757  // TODO(andrew): add delimiter detection to Importer
4758  if (copy_params.delimiter == '\0') {
4759  copy_params.delimiter = ',';
4760  if (boost::filesystem::extension(file_path) == ".tsv") {
4761  copy_params.delimiter = '\t';
4762  }
4763  }
4764 
4765  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
4766  session_ptr->getCatalog(), table_name);
4767  std::unique_ptr<import_export::Importer> importer;
4768  if (leaf_aggregator_.leafCount() > 0) {
4769  importer.reset(new import_export::Importer(
4770  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4771  file_path.string(),
4772  copy_params));
4773  } else {
4774  importer.reset(
4775  new import_export::Importer(cat, td, file_path.string(), copy_params));
4776  }
4777  auto ms = measure<>::execution([&]() { importer->import(session_ptr.get()); });
4778  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
4779  } catch (const std::exception& e) {
4780  THROW_MAPD_EXCEPTION(std::string(e.what()));
4781  }
4782 }
4783 
4784 namespace {
4785 
4786 // helper functions for error checking below
4787 // these would usefully be added as methods of TDatumType
4788 // but that's not possible as it's auto-generated by Thrift
4789 
4791  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
4793 }
4794 
4795 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4796 
4797 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
4798  std::stringstream ss;
4799  ss << t;
4800  return ss.str();
4801 }
4802 
4803 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
4804  std::string result;
4805  switch (p) {
4806  case SQLTypes::kGEOGRAPHY:
4807  result = "GEOGRAPHY";
4808  break;
4809  case SQLTypes::kGEOMETRY:
4810  result = "GEOMETRY";
4811  break;
4812  default:
4813  result = "INVALID";
4814  break;
4815  }
4816  return result;
4817 }
4818 
4819 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
4820  std::stringstream ss;
4821  ss << t;
4822  return ss.str();
4823 }
4824 
4825 #endif
4826 
4827 } // namespace
4828 
// Raises (through THROW_MAPD_EXCEPTION) a "column attribute mismatch" error for a geo
// append. Expects `file_path`, `table_name` and `cd` to be in scope at the call site.
// The macro intentionally does not end in ';' -- the caller supplies it -- so that a
// use does not expand to an extra empty statement.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected)                        \
  THROW_MAPD_EXCEPTION("Could not append geo file '" + file_path.filename().string() +   \
                       "' to table '" + table_name + "'. Column '" + cd->columnName +    \
                       "' " + attr + " mismatch (got '" + got + "', expected '" +        \
                       expected + "')")
4834 
4835 void DBHandler::import_geo_table(const TSessionId& session,
4836  const std::string& table_name,
4837  const std::string& file_name_in,
4838  const TCopyParams& cp,
4839  const TRowDescriptor& row_desc,
4840  const TCreateParams& create_params) {
4841  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4842  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4843  auto session_ptr = stdlog.getConstSessionInfo();
4844  check_read_only("import_table");
4845 
4846  auto& cat = session_ptr->getCatalog();
4848  auto start_time = ::toString(std::chrono::system_clock::now());
4850  executor->enrollQuerySession(session,
4851  "IMPORT_GEO_TABLE",
4852  start_time,
4854  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
4855  }
4856 
4857  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
4858  // reset the runtime query interrupt status
4860  executor->clearQuerySessionStatus(session, start_time);
4861  }
4862  };
4863 
4865 
4866  std::string file_name{file_name_in};
4867 
4868  if (path_is_relative(file_name)) {
4869  // assume relative paths are relative to data_path / mapd_import / <session>
4870  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4871  boost::filesystem::path(file_name).filename();
4872  file_name = file_path.string();
4873  }
4875 
4876  if (is_a_supported_geo_file(file_name, true)) {
4877  // prepare to load geo file directly
4878  add_vsi_network_prefix(file_name);
4879  add_vsi_geo_prefix(file_name);
4880  } else if (is_a_supported_archive_file(file_name)) {
4881  // find the archive file
4882  add_vsi_network_prefix(file_name);
4883  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4884  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4885  }
4886  // find geo file in archive
4887  add_vsi_archive_prefix(file_name);
4888  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4889  // prepare to load that geo file
4890  if (geo_file.size()) {
4891  file_name = file_name + std::string("/") + geo_file;
4892  }
4893  } else {
4894  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4895  file_name_in);
4896  }
4897 
4898  // log what we're about to try to do
4899  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
4900  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
4901 
4902  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
4903  auto file_path = boost::filesystem::path(file_name);
4904  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4905  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
4906  }
4907 
4908  // use GDAL to check any dependent files exist (ditto)
4909  try {
4910  check_geospatial_files(file_path, copy_params);
4911  } catch (const std::exception& e) {
4912  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4913  }
4914 
4915  // get layer info and deconstruct
4916  // in general, we will get a combination of layers of these four types:
4917  // EMPTY: no rows, report and skip
4918  // GEO: create a geo table from this
4919  // NON_GEO: create a regular table from this
4920  // UNSUPPORTED_GEO: report and skip
4921  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
4922  try {
4923  layer_info = import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4924  } catch (const std::exception& e) {
4925  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4926  }
4927 
4928  // categorize the results
4929  using LayerNameToContentsMap =
4930  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
4931  LayerNameToContentsMap load_layers;
4932  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
4933  for (const auto& layer : layer_info) {
4934  switch (layer.contents) {
4936  LOG(INFO) << "import_geo_table: '" << layer.name
4937  << "' (will import as geo table)";
4938  load_layers[layer.name] = layer.contents;
4939  break;
4941  LOG(INFO) << "import_geo_table: '" << layer.name
4942  << "' (will import as regular table)";
4943  load_layers[layer.name] = layer.contents;
4944  break;
4946  LOG(WARNING) << "import_geo_table: '" << layer.name
4947  << "' (will not import, unsupported geo type)";
4948  break;
4950  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
4951  break;
4952  default:
4953  break;
4954  }
4955  }
4956 
4957  // if nothing is loadable, stop now
4958  if (load_layers.size() == 0) {
4959  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
4960  }
4961 
4962  // if we've been given an explicit layer name, check that it exists and is loadable
4963  // scan the original list, as it may exist but not have been gathered as loadable
4964  if (copy_params.geo_layer_name.size()) {
4965  bool found = false;
4966  for (const auto& layer : layer_info) {
4967  if (copy_params.geo_layer_name == layer.name) {
4970  // forget all the other layers and just load this one
4971  load_layers.clear();
4972  load_layers[layer.name] = layer.contents;
4973  found = true;
4974  break;
4975  } else if (layer.contents ==
4977  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4978  copy_params.geo_layer_name +
4979  "' has unsupported geo type!");
4980  } else if (layer.contents ==
4982  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4983  copy_params.geo_layer_name + "' is empty!");
4984  }
4985  }
4986  }
4987  if (!found) {
4988  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4989  copy_params.geo_layer_name + "' not found!");
4990  }
4991  }
4992 
4993  // Immerse import of multiple layers is not yet supported
4994  // @TODO fix this!
4995  if (row_desc.size() > 0 && load_layers.size() > 1) {
4997  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
4998  }
4999 
5000  // one definition of layer table name construction
5001  // we append the layer name if we're loading more than one table
5002  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
5003  const std::string& layer_name) {
5004  if (load_layers.size() > 1) {
5005  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
5006  if (sanitized_layer_name != layer_name) {
5007  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
5008  << sanitized_layer_name << "' for table name";
5009  }
5010  return table_name + "_" + sanitized_layer_name;
5011  }
5012  return table_name;
5013  };
5014 
5015  // if we're importing multiple tables, then NONE of them must exist already
5016  if (load_layers.size() > 1) {
5017  for (const auto& layer : load_layers) {
5018  // construct table name
5019  auto this_table_name = construct_layer_table_name(table_name, layer.first);
5020 
5021  // table must not exist
5022  if (cat.getMetadataForTable(this_table_name)) {
5023  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
5024  "' already exists, aborting!");
5025  }
5026  }
5027  }
5028 
5029  // prepare to gather errors that would otherwise be exceptions, as we can only throw
5030  // one
5031  std::vector<std::string> caught_exception_messages;
5032 
5033  // prepare to time multi-layer import
5034  double total_import_ms = 0.0;
5035 
5036  // now we're safe to start importing
5037  // we loop over the layers we're going to attempt to load
5038  for (const auto& layer : load_layers) {
5039  // unpack
5040  const auto& layer_name = layer.first;
5041  const auto& layer_contents = layer.second;
5042  bool is_geo_layer =
5044 
5045  // construct table name again
5046  auto this_table_name = construct_layer_table_name(table_name, layer_name);
5047 
5048  // report
5049  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
5050 
5051  // we need a row descriptor
5052  TRowDescriptor rd;
5053  if (row_desc.size() > 0) {
5054  // we have a valid RowDescriptor
5055  // this is the case where Immerse has already detected and created
5056  // all we need to do is import and trust that the data will match
5057  // use the provided row descriptor
5058  // table must already exist (we check this below)
5059  rd = row_desc;
5060  } else {
5061  // we don't have a RowDescriptor
5062  // we have to detect the file ourselves
5063  TDetectResult cds;
5064  TCopyParams cp_copy = cp; // retain S3 auth tokens
5065  cp_copy.geo_layer_name = layer_name;
5066  cp_copy.file_type = TFileType::POLYGON;
5067  try {
5068  detect_column_types(cds, session, file_name_in, cp_copy);
5069  } catch (const std::exception& e) {
5070  // capture the error and abort this layer
5071  caught_exception_messages.emplace_back(
5072  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
5073  continue;
5074  }
5075  rd = cds.row_set.row_desc;
5076 
5077  // then, if the table does NOT already exist, create it
5078  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
5079  if (!td) {
5080  try {
5081  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
5082  } catch (const std::exception& e) {
5083  // capture the error and abort this layer
5084  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
5085  layer_name + "':" + e.what());
5086  continue;
5087  }
5088  }
5089  }
5090 
5091  // match locking sequence for CopyTableStmt::execute
5092  mapd_unique_lock<mapd_shared_mutex> execute_read_lock(
5095 
5096  const TableDescriptor* td{nullptr};
5097  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>> td_with_lock;
5098  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5099 
5100  try {
5101  td_with_lock =
5102  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>>(
5104  lockmgr::WriteLock>::acquireTableDescriptor(cat, this_table_name));
5105  td = (*td_with_lock)();
5106  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5108  } catch (const std::runtime_error& e) {
5109  // capture the error and abort this layer
5110  std::string exception_message =
5111  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
5112  this_table_name + "'; table does not exist or failed to create.";
5113  caught_exception_messages.emplace_back(exception_message);
5114  continue;
5115  }
5116  CHECK(td);
5117 
5118  // then, we have to verify that the structure matches
5119  // get column descriptors (non-system, non-deleted, logical columns only)
5120  const auto col_descriptors =
5121  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5122 
5123  // first, compare the column count
5124  if (col_descriptors.size() != rd.size()) {
5125  // capture the error and abort this layer
5126  std::string exception_message =
5127  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
5128  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
5129  ", expecting " + std::to_string(col_descriptors.size()) + ")";
5130  caught_exception_messages.emplace_back(exception_message);
5131  continue;
5132  }
5133 
5134  try {
5135  // then the names and types
5136  int rd_index = 0;
5137  for (auto cd : col_descriptors) {
5138  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
5139  std::string gname = rd[rd_index].col_name; // got
5140  std::string ename = cd->columnName; // expecting
5141 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
5142  TTypeInfo gti = rd[rd_index].col_type; // got
5143 #endif
5144  TTypeInfo eti = cd_col_type.col_type; // expecting
5145  // check for name match
5146  if (gname != ename) {
5147  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
5148  gname == OMNISCI_GEO_PREFIX) {
5149  // rename incoming geo column to match existing legacy default geo column
5150  rd[rd_index].col_name = gname;
5151  LOG(INFO)
5152  << "import_geo_table: Renaming incoming geo column to match existing "
5153  "legacy default geo column";
5154  } else {
5155  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
5156  }
5157  }
5158 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
5159  // check for type attributes match
5160  // these attrs must always match regardless of type
5161  if (gti.type != eti.type) {
5163  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
5164  }
5165  if (gti.is_array != eti.is_array) {
5167  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
5168  }
5169  if (gti.nullable != eti.nullable) {
5171  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
5172  }
5173  if (TTypeInfo_IsGeo(eti.type)) {
5174  // for geo, only these other attrs must also match
5175  // encoding and comp_param are allowed to differ
5176  // this allows appending to existing geo table
5177  // without needing to know the existing encoding
5178  if (gti.precision != eti.precision) {
5180  "geo sub-type",
5181  TTypeInfo_GeoSubTypeToString(gti.precision),
5182  TTypeInfo_GeoSubTypeToString(eti.precision));
5183  }
5184  if (gti.scale != eti.scale) {
5186  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
5187  }
5188  if (gti.encoding != eti.encoding) {
5189  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
5190  }
5191  if (gti.comp_param != eti.comp_param) {
5192  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
5193  }
5194  } else {
5195  // non-geo, all other attrs must also match
5196  // @TODO consider relaxing some of these dependent on type
5197  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
5198  if (gti.precision != eti.precision) {
5200  std::to_string(gti.precision),
5201  std::to_string(eti.precision));
5202  }
5203  if (gti.scale != eti.scale) {
5205  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
5206  }
5207  if (gti.encoding != eti.encoding) {
5209  "encoding",
5210  TTypeInfo_EncodingToString(gti.encoding),
5211  TTypeInfo_EncodingToString(eti.encoding));
5212  }
5213  if (gti.comp_param != eti.comp_param) {
5215  std::to_string(gti.comp_param),
5216  std::to_string(eti.comp_param));
5217  }
5218  }
5219 #endif
5220  rd_index++;
5221  }
5222  } catch (const std::exception& e) {
5223  // capture the error and abort this layer
5224  caught_exception_messages.emplace_back(e.what());
5225  continue;
5226  }
5227 
5228  std::map<std::string, std::string> colname_to_src;
5229  for (auto r : rd) {
5230  colname_to_src[r.col_name] =
5231  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
5232  }
5233 
5234  try {
5235  check_table_load_privileges(*session_ptr, this_table_name);
5236  } catch (const std::exception& e) {
5237  // capture the error and abort this layer
5238  caught_exception_messages.emplace_back(e.what());
5239  continue;
5240  }
5241 
5242  if (is_geo_layer) {
5243  // Final check to ensure that we have exactly one geo column
5244  // before doing the actual import, in case the user naively
5245  // overrode the types in Immerse Preview (which as of 6/17/21
5246  // it still allows you to do). We should make Immerse more
5247  // robust and disallow re-typing of columns to/from geo types
5248  // completely. Currently, if multiple columns are re-typed
5249  // such that there is still exactly one geo column (but it's
5250  // the wrong one) then this test will pass, but the import
5251  // will then reject some (or more likely all) of the rows.
5252  int num_geo_columns{0};
5253  for (auto const& col : rd) {
5254  if (TTypeInfo_IsGeo(col.col_type.type)) {
5255  num_geo_columns++;
5256  }
5257  }
5258  if (num_geo_columns != 1) {
5259  std::string exception_message =
5260  "Table '" + this_table_name +
5261  "' must have exactly one geo column. Import aborted!";
5262  caught_exception_messages.emplace_back(exception_message);
5263  continue;
5264  }
5265  }
5266 
5267  try {
5268  // import this layer only?
5269  copy_params.geo_layer_name = layer_name;
5270 
5271  // create an importer
5272  std::unique_ptr<import_export::Importer> importer;
5273  if (leaf_aggregator_.leafCount() > 0) {
5274  importer.reset(new import_export::Importer(
5275  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
5276  file_path.string(),
5277  copy_params));
5278  } else {
5279  importer.reset(
5280  new import_export::Importer(cat, td, file_path.string(), copy_params));
5281  }
5282 
5283  // import
5284  auto ms = measure<>::execution(
5285  [&]() { importer->importGDAL(colname_to_src, session_ptr.get()); });
5286  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
5287  << "s";
5288  total_import_ms += ms;
5289  } catch (const std::exception& e) {
5290  std::string exception_message =
5291  "Import of Layer '" + this_table_name + "' failed: " + e.what();
5292  caught_exception_messages.emplace_back(exception_message);
5293  continue;
5294  }
5295  }
5296 
5297  // did we catch any exceptions?
5298  if (caught_exception_messages.size()) {
5299  // combine all the strings into one and throw a single Thrift exception
5300  std::string combined_exception_message = "Failed to import geo file:\n";
5301  for (const auto& message : caught_exception_messages) {
5302  combined_exception_message += message + "\n";
5303  }
5304  THROW_MAPD_EXCEPTION(combined_exception_message);
5305  } else {
5306  // report success and total time
5307  LOG(INFO) << "Import Successful!";
5308  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
5309  }
5310 }
5311 
5312 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
5313 
5314 void DBHandler::import_table_status(TImportStatus& _return,
5315  const TSessionId& session,
5316  const std::string& import_id) {
5317  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
5318  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5319  auto is = import_export::Importer::get_import_status(import_id);
5320  _return.elapsed = is.elapsed.count();
5321  _return.rows_completed = is.rows_completed;
5322  _return.rows_estimated = is.rows_estimated;
5323  _return.rows_rejected = is.rows_rejected;
5324 }
5325 
5327  const TSessionId& session,
5328  const std::string& archive_path_in,
5329  const TCopyParams& copy_params) {
5330  auto stdlog =
5331  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
5332  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5333 
5334  std::string archive_path(archive_path_in);
5335 
5336  if (path_is_relative(archive_path)) {
5337  // assume relative paths are relative to data_path / mapd_import / <session>
5338  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5339  boost::filesystem::path(archive_path).filename();
5340  archive_path = file_path.string();
5341  }
5342  validate_import_file_path_if_local(archive_path);
5343 
5344  if (is_a_supported_archive_file(archive_path)) {
5345  // find the archive file
5346  add_vsi_network_prefix(archive_path);
5347  if (!import_export::Importer::gdalFileExists(archive_path,
5348  thrift_to_copyparams(copy_params))) {
5349  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
5350  }
5351  // find geo file in archive
5352  add_vsi_archive_prefix(archive_path);
5353  std::string geo_file =
5354  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
5355  // what did we get?
5356  if (geo_file.size()) {
5357  // prepend it with the original path
5358  _return = archive_path_in + std::string("/") + geo_file;
5359  } else {
5360  // just return the original path
5361  _return = archive_path_in;
5362  }
5363  } else {
5364  // just return the original path
5365  _return = archive_path_in;
5366  }
5367 }
5368 
5369 void DBHandler::get_all_files_in_archive(std::vector<std::string>& _return,
5370  const TSessionId& session,
5371  const std::string& archive_path_in,
5372  const TCopyParams& copy_params) {
5373  auto stdlog =
5374  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
5375  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5376 
5377  std::string archive_path(archive_path_in);
5378  if (path_is_relative(archive_path)) {
5379  // assume relative paths are relative to data_path / mapd_import / <session>
5380  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5381  boost::filesystem::path(archive_path).filename();
5382  archive_path = file_path.string();
5383  }
5384  validate_import_file_path_if_local(archive_path);
5385 
5386  if (is_a_supported_archive_file(archive_path)) {
5387  // find the archive file
5388  add_vsi_network_prefix(archive_path);
5389  if (!import_export::Importer::gdalFileExists(archive_path,
5390  thrift_to_copyparams(copy_params))) {
5391  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
5392  }
5393  // find all files in archive
5394  add_vsi_archive_prefix(archive_path);
5396  archive_path, thrift_to_copyparams(copy_params));
5397  // prepend them all with original path
5398  for (auto& s : _return) {
5399  s = archive_path_in + '/' + s;
5400  }
5401  }
5402 }
5403 
5404 void DBHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
5405  const TSessionId& session,
5406  const std::string& file_name_in,
5407  const TCopyParams& cp) {
5408  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
5409  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5410 
5411  std::string file_name(file_name_in);
5412 
5414 
5415  // handle relative paths
5416  if (path_is_relative(file_name)) {
5417  // assume relative paths are relative to data_path / mapd_import / <session>
5418  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5419  boost::filesystem::path(file_name).filename();
5420  file_name = file_path.string();
5421  }
5423 
5424  // validate file_name
5425  if (is_a_supported_geo_file(file_name, true)) {
5426  // prepare to load geo file directly
5427  add_vsi_network_prefix(file_name);
5428  add_vsi_geo_prefix(file_name);
5429  } else if (is_a_supported_archive_file(file_name)) {
5430  // find the archive file
5431  add_vsi_network_prefix(file_name);
5432  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
5433  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
5434  }
5435  // find geo file in archive
5436  add_vsi_archive_prefix(file_name);
5437  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
5438  // prepare to load that geo file
5439  if (geo_file.size()) {
5440  file_name = file_name + std::string("/") + geo_file;
5441  }
5442  } else {
5443  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
5444  file_name_in);
5445  }
5446 
5447  // check the file actually exists
5448  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
5449  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
5450  }
5451 
5452  // find all layers
5453  auto internal_layer_info =
5454  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
5455 
5456  // convert to Thrift type
5457  for (const auto& internal_layer : internal_layer_info) {
5458  TGeoFileLayerInfo layer;
5459  layer.name = internal_layer.name;
5460  switch (internal_layer.contents) {
5462  layer.contents = TGeoFileLayerContents::EMPTY;
5463  break;
5465  layer.contents = TGeoFileLayerContents::GEO;
5466  break;
5468  layer.contents = TGeoFileLayerContents::NON_GEO;
5469  break;
5471  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
5472  break;
5473  default:
5474  CHECK(false);
5475  }
5476  _return.emplace_back(layer); // no suitable constructor to just pass parameters
5477  }
5478 }
5479 
5480 void DBHandler::start_heap_profile(const TSessionId& session) {
5481  auto stdlog = STDLOG(get_session_ptr(session));
5482 #ifdef HAVE_PROFILER
5483  if (IsHeapProfilerRunning()) {
5484  THROW_MAPD_EXCEPTION("Profiler already started");
5485  }
5486  HeapProfilerStart("omnisci");
5487 #else
5488  THROW_MAPD_EXCEPTION("Profiler not enabled");
5489 #endif // HAVE_PROFILER
5490 }
5491 
5492 void DBHandler::stop_heap_profile(const TSessionId& session) {
5493  auto stdlog = STDLOG(get_session_ptr(session));
5494 #ifdef HAVE_PROFILER
5495  if (!IsHeapProfilerRunning()) {
5496  THROW_MAPD_EXCEPTION("Profiler not running");
5497  }
5498  HeapProfilerStop();
5499 #else
5500  THROW_MAPD_EXCEPTION("Profiler not enabled");
5501 #endif // HAVE_PROFILER
5502 }
5503 
5504 void DBHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
5505  auto stdlog = STDLOG(get_session_ptr(session));
5506 #ifdef HAVE_PROFILER
5507  if (!IsHeapProfilerRunning()) {
5508  THROW_MAPD_EXCEPTION("Profiler not running");
5509  }
5510  auto profile_buff = GetHeapProfile();
5511  profile = profile_buff;
5512  free(profile_buff);
5513 #else
5514  THROW_MAPD_EXCEPTION("Profiler not enabled");
5515 #endif // HAVE_PROFILER
5516 }
5517 
5518 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
5519 void DBHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
5520  if (session_it->second.use_count() > 2 ||
5521  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
5522  // SessionInfo is being used in more than one active operation. Original copy + one
5523  // stored in StdLog. Skip the checks.
5524  return;
5525  }
5526  time_t last_used_time = session_it->second->get_last_used_time();
5527  time_t start_time = session_it->second->get_start_time();
5528  const auto current_session_duration = time(0) - last_used_time;
5529  if (current_session_duration > idle_session_duration_) {
5530  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
5531  << " idle duration " << current_session_duration
5532  << " seconds exceeds maximum idle duration " << idle_session_duration_
5533  << " seconds. Invalidating session.";
5534  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
5535  }
5536  const auto total_session_duration = time(0) - start_time;
5537  if (total_session_duration > max_session_duration_) {
5538  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
5539  << " total duration " << total_session_duration
5540  << " seconds exceeds maximum total session duration "
5541  << max_session_duration_ << " seconds. Invalidating session.";
5542  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
5543  }
5544 }
5545 
5546 std::shared_ptr<const Catalog_Namespace::SessionInfo> DBHandler::get_const_session_ptr(
5547  const TSessionId& session) {
5548  return get_session_ptr(session);
5549 }
5550