OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: DBHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "DBHandler.h"
25 #include "DistributedLoader.h"
27 #include "TokenCompletionHints.h"
28 
29 #ifdef HAVE_PROFILER
30 #include <gperftools/heap-profiler.h>
31 #endif // HAVE_PROFILER
32 
33 #include "MapDRelease.h"
34 
35 #include "Calcite/Calcite.h"
36 #include "gen-cpp/CalciteServer.h"
37 
39 
40 #include "Catalog/Catalog.h"
44 #include "DistributedHandler.h"
46 #include "Geospatial/GDAL.h"
47 #include "Geospatial/Transforms.h"
48 #include "Geospatial/Types.h"
49 #include "ImportExport/Importer.h"
50 #include "LockMgr/LockMgr.h"
52 #include "Parser/ParserWrapper.h"
54 #include "Parser/parser.h"
57 #include "QueryEngine/Execute.h"
67 #include "Shared/ArrowUtil.h"
68 #include "Shared/StringTransform.h"
69 #include "Shared/import_helpers.h"
71 #include "Shared/measure.h"
72 #include "Shared/scope.h"
73 
74 #include <fcntl.h>
75 #include <picosha2.h>
76 #include <sys/types.h>
77 #include <algorithm>
78 #include <boost/algorithm/string.hpp>
79 #include <boost/filesystem.hpp>
80 #include <boost/make_shared.hpp>
81 #include <boost/process/search_path.hpp>
82 #include <boost/program_options.hpp>
83 #include <boost/regex.hpp>
84 #include <boost/tokenizer.hpp>
85 #include <cmath>
86 #include <csignal>
87 #include <fstream>
88 #include <future>
89 #include <map>
90 #include <memory>
91 #include <random>
92 #include <regex>
93 #include <string>
94 #include <thread>
95 #include <typeinfo>
96 
97 #include <arrow/api.h>
98 #include <arrow/io/api.h>
99 #include <arrow/ipc/api.h>
100 
101 #include "Shared/ArrowUtil.h"
102 
103 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
104 
107 
108 #define INVALID_SESSION_ID ""
109 
// Builds a TOmniSciException whose error_msg is `errstr`, logs that message at
// ERROR severity and throws the exception.
// The body is wrapped in do { } while (0) so the macro expands to exactly one
// statement: `if (c) THROW_MAPD_EXCEPTION(x); else ...` stays well-formed, which
// a bare `{ ... }` block followed by `;` would not. `errstr` is parenthesised so
// any expression can be passed safely; it is evaluated once.
#define THROW_MAPD_EXCEPTION(errstr) \
  do {                               \
    TOmniSciException ex;            \
    ex.error_msg = (errstr);         \
    LOG(ERROR) << ex.error_msg;      \
    throw ex;                        \
  } while (0)
117 
118 thread_local std::string TrackingProcessor::client_address;
120 
121 namespace {
122 
123 SessionMap::iterator get_session_from_map(const TSessionId& session,
124  SessionMap& session_map) {
125  auto session_it = session_map.find(session);
126  if (session_it == session_map.end()) {
127  THROW_MAPD_EXCEPTION("Session not valid.");
128  }
129  return session_it;
130 }
131 
// Exception used to signal that a session has to be disconnected. Handlers in
// this file catch it, disconnect the session and forward what() to the client
// through THROW_MAPD_EXCEPTION.
struct ForceDisconnect : public std::runtime_error {
  // explicit: a plain string must never convert implicitly into this exception.
  explicit ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
};
135 
136 } // namespace
137 
138 template <>
139 SessionMap::iterator DBHandler::get_session_it_unsafe(
140  const TSessionId& session,
141  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
142  auto session_it = get_session_from_map(session, sessions_);
143  try {
144  check_session_exp_unsafe(session_it);
145  } catch (const ForceDisconnect& e) {
146  read_lock.unlock();
147  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
148  auto session_it2 = get_session_from_map(session, sessions_);
149  disconnect_impl(session_it2, write_lock);
150  THROW_MAPD_EXCEPTION(e.what());
151  }
152  return session_it;
153 }
154 
155 template <>
156 SessionMap::iterator DBHandler::get_session_it_unsafe(
157  const TSessionId& session,
158  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
159  auto session_it = get_session_from_map(session, sessions_);
160  try {
161  check_session_exp_unsafe(session_it);
162  } catch (const ForceDisconnect& e) {
163  disconnect_impl(session_it, write_lock);
164  THROW_MAPD_EXCEPTION(e.what());
165  }
166  return session_it;
167 }
168 
169 template <>
171  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
172  std::vector<std::string> expired_sessions;
173  for (auto session_pair : sessions_) {
174  auto session_it = get_session_from_map(session_pair.first, sessions_);
175  try {
176  check_session_exp_unsafe(session_it);
177  } catch (const ForceDisconnect& e) {
178  expired_sessions.emplace_back(session_it->second->get_session_id());
179  }
180  }
181 
182  for (auto session_id : expired_sessions) {
183  if (leaf_aggregator_.leafCount() > 0) {
184  try {
185  leaf_aggregator_.disconnect(session_id);
186  } catch (TOmniSciException& toe) {
187  LOG(INFO) << " Problem disconnecting from leaves : " << toe.what();
188  } catch (std::exception& e) {
189  LOG(INFO)
190  << " Problem disconnecting from leaves, check leaf logs for additonal info";
191  }
192  }
193  sessions_.erase(session_id);
194  }
195  if (render_handler_) {
196  write_lock.unlock();
197  for (auto session_id : expired_sessions) {
198  // NOTE: the render disconnect is done after the session lock is released to
199  // avoid a deadlock. See: https://omnisci.atlassian.net/browse/BE-3324
200  // This out-of-scope solution is a compromise for now until a better session
201  // handling/locking mechanism is developed for the renderer. Note as well that the
202  // session_id cannot be immediately reused. If a render request were to slip in
203  // after the lock is released and before the render disconnect could cause a
204  // problem.
205  render_handler_->disconnect(session_id);
206  }
207  }
208 }
209 
210 #ifdef ENABLE_GEOS
211 extern std::unique_ptr<std::string> g_libgeos_so_filename;
212 #endif
213 
214 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
215  const std::vector<LeafHostInfo>& string_leaves,
216  const std::string& base_data_path,
217  const bool allow_multifrag,
218  const bool jit_debug,
219  const bool intel_jit_profile,
220  const bool read_only,
221  const bool allow_loop_joins,
222  const bool enable_rendering,
223  const bool renderer_use_vulkan_driver,
224  const bool enable_auto_clear_render_mem,
225  const int render_oom_retry_threshold,
226  const size_t render_mem_bytes,
227  const size_t max_concurrent_render_sessions,
228  const size_t reserved_gpu_mem,
229  const bool render_compositor_use_last_gpu,
230  const size_t num_reader_threads,
231  const AuthMetadata& authMetadata,
232  SystemParameters& system_parameters,
233  const bool legacy_syntax,
234  const int idle_session_duration,
235  const int max_session_duration,
236  const bool enable_runtime_udf_registration,
237  const std::string& udf_filename,
238  const std::string& clang_path,
239  const std::vector<std::string>& clang_options,
240 #ifdef ENABLE_GEOS
241  const std::string& libgeos_so_filename,
242 #endif
243  const DiskCacheConfig& disk_cache_config,
244  const bool is_new_db)
245  : leaf_aggregator_(db_leaves)
246  , db_leaves_(db_leaves)
247  , string_leaves_(string_leaves)
248  , base_data_path_(base_data_path)
249  , random_gen_(std::random_device{}())
250  , session_id_dist_(0, INT32_MAX)
251  , jit_debug_(jit_debug)
252  , intel_jit_profile_(intel_jit_profile)
253  , allow_multifrag_(allow_multifrag)
254  , read_only_(read_only)
255  , allow_loop_joins_(allow_loop_joins)
256  , authMetadata_(authMetadata)
257  , system_parameters_(system_parameters)
258  , legacy_syntax_(legacy_syntax)
259  , dispatch_queue_(
260  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
261  , super_user_rights_(false)
262  , idle_session_duration_(idle_session_duration * 60)
263  , max_session_duration_(max_session_duration * 60)
264  , runtime_udf_registration_enabled_(enable_runtime_udf_registration)
265 
266  , enable_rendering_(enable_rendering)
267  , renderer_use_vulkan_driver_(renderer_use_vulkan_driver)
268  , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
269  , render_oom_retry_threshold_(render_oom_retry_threshold)
270  , max_concurrent_render_sessions_(max_concurrent_render_sessions)
271  , reserved_gpu_mem_(reserved_gpu_mem)
272  , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
273  , render_mem_bytes_(render_mem_bytes)
274  , num_reader_threads_(num_reader_threads)
275 #ifdef ENABLE_GEOS
276  , libgeos_so_filename_(libgeos_so_filename)
277 #endif
278  , disk_cache_config_(disk_cache_config)
279  , udf_filename_(udf_filename)
280  , clang_path_(clang_path)
281  , clang_options_(clang_options)
282 
283 {
284  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
285  initialize(is_new_db);
286 }
287 
// One-time server bring-up, guarded by initialized_. Visible steps, in order:
//  1. choose CPU-only or GPU mode (cpu_mode_only_),
//  2. create the CUDA manager (HAVE_CUDA builds only) and the data manager,
//  3. compile the UDF file if udf_filename_ is set,
//  4. start Calcite and register extension functions / table functions,
//  5. initialise the system catalog,
//  6. create the optional render handler and the aggregator or leaf handler,
//  7. apply the libgeos override (ENABLE_GEOS builds only).
// Failures in steps 2-5 go through LOG(FATAL), except a CudaMgr failure which is
// logged with LOG(ERROR) and falls back to CPU mode with rendering off. Failures
// in step 6 are logged with LOG(ERROR) and the handler is simply not created.
// NOTE(review): this listing skips several embedded line numbers (e.g. 292,
// 298-299, 303, 306, 330, 342-347, 359, 388, 395, 403, 409, 412, 418, 421, 425,
// 436-438, 442), so some statements are absent and a few calls below look
// unbalanced. Confirm against the original file before changing any code here.
288 void DBHandler::initialize(const bool is_new_db) {
// Second and later calls are rejected; only the first call proceeds.
289  if (!initialized_) {
290  initialized_ = true;
291  } else {
// NOTE(review): the statement that consumes the message below is not visible.
293  "Server already initialized; service restart required to activate any new "
294  "entitlements.");
295  return;
296  }
297 
// NOTE(review): the condition opening this if/else is not visible in the listing.
300  cpu_mode_only_ = true;
301  } else {
302 #ifdef HAVE_CUDA
304  cpu_mode_only_ = false;
305 #else
307  LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
308  cpu_mode_only_ = true;
309 #endif
310  }
311 
// Rendering is forced off when no GPUs are configured.
312  bool is_rendering_enabled = enable_rendering_;
313  if (system_parameters_.num_gpus == 0) {
314  is_rendering_enabled = false;
315  }
316 
317  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
318  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
319  // manage cannot ask for
320  size_t total_reserved = reserved_gpu_mem_;
321  if (is_rendering_enabled) {
322  total_reserved += render_mem_bytes_;
323  }
324 
// cuda_mgr stays null unless the CUDA block below creates it.
325  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
326 #ifdef HAVE_CUDA
327  if (!cpu_mode_only_ || is_rendering_enabled) {
328  try {
329  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
331  } catch (const std::exception& e) {
332  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
333  << e.what();
334  cpu_mode_only_ = true;
335  is_rendering_enabled = false;
336  }
337  }
338 #endif // HAVE_CUDA
339 
// Ownership of cuda_mgr is moved into the data manager.
340  try {
341  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
343  std::move(cuda_mgr),
345  total_reserved,
348  } catch (const std::exception& e) {
349  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
350  }
351 
// Stays empty unless a UDF file is configured and compiles with result 0.
352  std::string udf_ast_filename("");
353 
354  try {
355  if (!udf_filename_.empty()) {
// This local cuda_mgr shadows the unique_ptr above, which has been moved from.
356  const auto cuda_mgr = data_mgr_->getCudaMgr();
357  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
358  cuda_mgr ? cuda_mgr->getDeviceArch()
360  UdfCompiler compiler(udf_filename_, device_arch, clang_path_, clang_options_);
361  int compile_result = compiler.compileUdf();
362 
// A non-zero compile result is ignored here; udf_ast_filename just stays empty.
363  if (compile_result == 0) {
364  udf_ast_filename = compiler.getAstFileName();
365  }
366  }
367  } catch (const std::exception& e) {
368  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
369  }
370 
371  try {
372  calcite_ =
373  std::make_shared<Calcite>(system_parameters_, base_data_path_, udf_ast_filename);
374  } catch (const std::exception& e) {
375  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
376  }
377 
// UDF whitelist entries are only added when a UDF file was configured.
378  try {
379  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
380  if (!udf_filename_.empty()) {
381  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
382  }
383  } catch (const std::exception& e) {
384  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
385  }
386 
// NOTE(review): the statement inside this try block is not visible.
387  try {
389  } catch (const std::exception& e) {
390  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
391  }
392 
// Registers table functions with Calcite as compile-time (is_runtime=false),
// together with an empty UDF list.
393  try {
394  auto udtfs = ThriftSerializers::to_thrift(
396  std::vector<TUserDefinedFunction> udfs = {};
397  calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
398  } catch (const std::exception& e) {
399  LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
400  }
401 
// GPU mode was requested but the data manager reports no GPUs: use CPU mode.
402  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
404  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
405  cpu_mode_only_ = true;
406  }
407 
// NOTE(review): the case labels of this switch are not visible in the listing.
408  switch (executor_device_type_) {
410  LOG(INFO) << "Started in GPU mode" << std::endl;
411  break;
413  LOG(INFO) << "Started in CPU mode" << std::endl;
414  break;
415  }
416 
417  try {
419  SysCatalog::instance().init(base_data_path_,
420  data_mgr_,
422  calcite_,
423  is_new_db,
424  !db_leaves_.empty(),
426  } catch (const std::exception& e) {
427  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
428  }
429 
430  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
431  start_time_ = std::time(nullptr);
432 
// On failure render_handler_ is left unset and only an error is logged.
433  if (is_rendering_enabled) {
434  try {
435  render_handler_.reset(new RenderHandler(this,
439  false,
440  0,
441  false,
443  } catch (const std::exception& e) {
444  LOG(ERROR) << "Backend rendering disabled: " << e.what();
445  }
446  }
447 
// With leaves configured an aggregator handler is created; otherwise a leaf
// handler is created when g_cluster is set.
448  if (leaf_aggregator_.leafCount() > 0) {
449  try {
450  agg_handler_.reset(new MapDAggHandler(this));
451  } catch (const std::exception& e) {
452  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
453  }
454  } else if (g_cluster) {
455  try {
456  leaf_handler_.reset(new MapDLeafHandler(this));
457  } catch (const std::exception& e) {
458  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
459  }
460  }
461 
462 #ifdef ENABLE_GEOS
// Publishes a non-empty libgeos path through the global g_libgeos_so_filename.
463  if (!libgeos_so_filename_.empty()) {
464  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename_));
465  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
466  }
467 #endif
468 }
469 
471 
// NOTE(review): the line(s) carrying this function's return type and name are
// missing from the listing (embedded numbers 470-472 are skipped); only the
// parameter list and the body are visible.
// Parses `query_str` with SQLParser into `parse_trees`. Throws std::runtime_error
// when the parser reports errors ("Syntax error at: <last parsed text>"), when
// more than one statement was produced, or when none was produced.
473  const std::string& query_str,
474  std::list<std::unique_ptr<Parser::Stmt>>& parse_trees) {
475  int num_parse_errors = 0;
// Filled in by the parser; used below to build the syntax error message.
476  std::string last_parsed;
477  SQLParser parser;
478  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
479  if (num_parse_errors > 0) {
480  throw std::runtime_error("Syntax error at: " + last_parsed);
481  }
// Exactly one statement is accepted.
482  if (parse_trees.size() > 1) {
483  throw std::runtime_error("multiple SQL statements not allowed");
484  } else if (parse_trees.size() != 1) {
485  throw std::runtime_error("empty SQL statment not allowed");
486  }
487 }
488 void DBHandler::check_read_only(const std::string& str) {
489  if (DBHandler::read_only_) {
490  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
491  }
492 }
493 
// NOTE(review): the line carrying this function's return type and name is
// missing from the listing (embedded number 494 is skipped).
// Registers a new session for `catalog_ptr` in sessions_ under a freshly
// generated 64-character id, owned by Calcite's internal proxy user, and returns
// that id. sessions_mutex_ is held for the duration of the call.
495  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
496  // We would create an in memory session for calcite with super user privileges which
497  // would be used for getting all tables metadata when a user runs the query. The
498  // session would be under the name of a proxy user/password which would only persist
499  // till server's lifetime or execution of calcite query(in memory) whichever is the
500  // earliest.
501  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
// Regenerate until the id does not collide with an existing session.
502  std::string session_id;
503  do {
504  session_id = generate_random_string(64);
505  } while (sessions_.find(session_id) != sessions_.end());
// The first and fifth arguments are -1 and are read back as userId / defaultDbId
// by the proxy-user check later in this file.
// NOTE(review): the two `true` flags presumably mark the user as super and as
// able to log in; confirm against the UserMetadata constructor.
506  Catalog_Namespace::UserMetadata user_meta(-1,
507  calcite_->getInternalSessionProxyUserName(),
508  calcite_->getInternalSessionProxyPassword(),
509  true,
510  -1,
511  true);
512  const auto emplace_ret =
513  sessions_.emplace(session_id,
514  std::make_shared<Catalog_Namespace::SessionInfo>(
515  catalog_ptr, user_meta, executor_device_type_, session_id));
// The id was just checked to be unused, so insertion must succeed.
516  CHECK(emplace_ret.second);
517  return session_id;
518 }
519 
// NOTE(review): the line carrying this function's return type and name is
// missing from the listing (embedded number 520 is skipped).
// True when `user_meta` is named after Calcite's internal proxy user, has
// userId == -1 and defaultDbId == -1, and is flagged as super user.
521  const Catalog_Namespace::UserMetadata user_meta) {
522  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
523  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
524  user_meta.isSuper.load();
525 }
526 
527 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
528  // Remove InMemory calcite Session.
529  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
530  const auto it = sessions_.find(session_id);
531  CHECK(it != sessions_.end());
532  sessions_.erase(it);
533 }
534 
// Password-less connect. Logs `username` into `dbname` with an empty password,
// checks the user's privileges on that database and, when allowed, creates the
// session through connect_impl(), which writes the new id into `session`.
// Login errors and a failed privilege check surface as TOmniSciException.
// NOTE(review): embedded numbers 542 and 553 are skipped in this listing, so the
// declaration of `user_meta` and one statement on `dbObject` are not visible.
535 // internal connection for connections with no password
536 void DBHandler::internal_connect(TSessionId& session,
537  const std::string& username,
538  const std::string& dbname) {
539  auto stdlog = STDLOG(); // session_info set by connect_impl()
540  std::string username2 = username; // login() may reset username given as argument
541  std::string dbname2 = dbname; // login() may reset dbname given as argument
543  std::shared_ptr<Catalog> cat = nullptr;
544  try {
545  cat =
546  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
547  } catch (std::exception& e) {
// Any login failure is converted into a TOmniSciException.
548  THROW_MAPD_EXCEPTION(e.what());
549  }
550 
// Build the database-level object the privilege check runs against.
551  DBObject dbObject(dbname2, DatabaseDBObjectType);
552  dbObject.loadKey(*cat);
554  std::vector<DBObject> dbObjects;
555  dbObjects.push_back(dbObject);
556  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
557  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
558  " is not allowed to access database " + dbname2 + ".");
559  }
// The password argument is an empty string on this path.
560  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
561 }
562 
// NOTE(review): the signature line of this function is missing from the listing
// (embedded number 563 is skipped).
// True when at least one leaf is configured in leaf_aggregator_.
564  return leaf_aggregator_.leafCount() > 0;
565 }
566 
567 void DBHandler::krb5_connect(TKrb5Session& session,
568  const std::string& inputToken,
569  const std::string& dbname) {
570  THROW_MAPD_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
571 }
572 
// Password-based connect. Logs `username` into `dbname`, checks the user's
// privileges on that database, creates the session through connect_impl() (which
// writes the new id into `session`), copies the login restriction onto the new
// session and finally lets SysCatalog post-process the id for encryption.
// Login errors and a failed privilege check surface as TOmniSciException and are
// also recorded on the request log.
// NOTE(review): embedded numbers 581 and 593 are skipped in this listing, so the
// declaration of `user_meta` and one statement on `dbObject` are not visible.
573 void DBHandler::connect(TSessionId& session,
574  const std::string& username,
575  const std::string& passwd,
576  const std::string& dbname) {
577  auto stdlog = STDLOG(); // session_info set by connect_impl()
578  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
579  std::string username2 = username; // login() may reset username given as argument
580  std::string dbname2 = dbname; // login() may reset dbname given as argument
582  std::shared_ptr<Catalog> cat = nullptr;
583  try {
// The last argument is the negation of super_user_rights_.
// NOTE(review): presumably it toggles a login check; confirm in SysCatalog::login().
584  cat = SysCatalog::instance().login(
585  dbname2, username2, passwd, user_meta, !super_user_rights_);
586  } catch (std::exception& e) {
// The log records the user and database exactly as the client supplied them.
587  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
588  THROW_MAPD_EXCEPTION(e.what());
589  }
590 
// Build the database-level object the privilege check runs against.
591  DBObject dbObject(dbname2, DatabaseDBObjectType);
592  dbObject.loadKey(*cat);
594  std::vector<DBObject> dbObjects;
595  dbObjects.push_back(dbObject);
596  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
597  stdlog.appendNameValuePairs(
598  "user", username, "db", dbname, "exception", "Missing Privileges");
599  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
600  " is not allowed to access database " + dbname2 + ".");
601  }
602  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
603 
604  // Restriction is returned as part of the users metadata on login but
605  // is per session so transfering it over here
606  // Currently only SAML can even set a Restriction
607  auto restriction = std::make_shared<Restriction>(user_meta.restriction);
// The session is looked up again by the id connect_impl() just produced.
608  auto login_session = get_session_ptr(session);
609  login_session->set_restriction(restriction);
610 
611  // if pki auth session will come back encrypted with user pubkey
612  SysCatalog::instance().check_for_session_encryption(passwd, session);
613 }
614 
615 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::create_new_session(
616  TSessionId& session,
617  const std::string& dbname,
618  const Catalog_Namespace::UserMetadata& user_meta,
619  std::shared_ptr<Catalog> cat) {
620  do {
621  session = generate_random_string(32);
622  } while (sessions_.find(session) != sessions_.end());
623  std::pair<SessionMap::iterator, bool> emplace_retval =
624  sessions_.emplace(session,
625  std::make_shared<Catalog_Namespace::SessionInfo>(
626  cat, user_meta, executor_device_type_, session));
627  CHECK(emplace_retval.second);
628  auto& session_ptr = emplace_retval.first->second;
629  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
630  return session_ptr;
631 }
632 
// Shared tail of the connect paths. Under sessions_mutex_ it first expires idle
// sessions and enforces the session limit, then (in a second locked scope)
// creates the new session, attaches it to `stdlog` and records the client
// connection info. On an aggregator (leaves configured, super_user_rights_ off)
// the session is also opened on the leaves and the function returns right there.
// Otherwise the user's roles are appended to the request log.
//   session - out: receives the new session id.
//   passwd  - forwarded to the leaves only.
// Raises TOmniSciException "Too many active sessions" when the limit is hit.
// NOTE(review): embedded number 645 is skipped in this listing, so the first half
// of the session-limit condition is not visible.
633 void DBHandler::connect_impl(TSessionId& session,
634  const std::string& passwd,
635  const std::string& dbname,
636  const Catalog_Namespace::UserMetadata& user_meta,
637  std::shared_ptr<Catalog> cat,
638  query_state::StdLog& stdlog) {
639  // TODO(sy): Is there any reason to have dbname as a parameter
640  // here when the cat parameter already provides cat->name()?
641  // Should dbname and cat->name() ever differ?
642  {
643  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
// NOTE(review): expire_idle_sessions_unsafe() unlocks write_lock when a render
// handler exists, so the sessions_.size() read below may run without the lock.
// Confirm whether that is acceptable.
644  expire_idle_sessions_unsafe(write_lock);
646  sessions_.size() + 1 > static_cast<size_t>(system_parameters_.num_sessions)) {
647  THROW_MAPD_EXCEPTION("Too many active sessions");
648  }
649  }
// NOTE(review): the lock is dropped between the limit check above and the
// insertion below, so concurrent connects could each pass the check.
650  {
651  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
652  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
653  stdlog.setSessionInfo(session_ptr);
654  session_ptr->set_connection_info(getConnectionInfo().toString());
655  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
656  // while doing warmup
657  if (leaf_aggregator_.leafCount() > 0) {
658  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
// Early exit: the roles logging at the end is skipped on this path.
659  return;
660  }
661  }
662  }
// Super users are logged with the single role "super"; everyone else gets the
// role list returned by SysCatalog::getRoles().
663  auto const roles =
664  stdlog.getConstSessionInfo()->get_currentUser().isSuper
665  ? std::vector<std::string>{{"super"}}
666  : SysCatalog::instance().getRoles(
667  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
668  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
669 }
670 
671 void DBHandler::disconnect(const TSessionId& session) {
672  auto stdlog = STDLOG();
673  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
674 
675  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
676  auto session_it = get_session_it_unsafe(session, write_lock);
677  stdlog.setSessionInfo(session_it->second);
678  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
679 
680  LOG(INFO) << "User " << session_it->second->get_currentUser().userLoggable()
681  << " disconnected from database " << dbname
682  << " with public_session_id: " << session_it->second->get_public_session_id();
683 
684  disconnect_impl(session_it, write_lock);
685 }
686 
687 void DBHandler::disconnect_impl(const SessionMap::iterator& session_it,
688  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
689  // session_it existence should already have been checked (i.e. called via
690  // get_session_it_unsafe(...))
691 
692  const auto session_id = session_it->second->get_session_id();
693  std::exception_ptr leaf_exception = nullptr;
694  try {
695  if (leaf_aggregator_.leafCount() > 0) {
696  leaf_aggregator_.disconnect(session_id);
697  }
698  } catch (...) {
699  leaf_exception = std::current_exception();
700  }
701 
702  {
703  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
704  render_group_assignment_map_.erase(session_id);
705  }
706 
707  sessions_.erase(session_it);
708  write_lock.unlock();
709 
710  if (render_handler_) {
711  render_handler_->disconnect(session_id);
712  }
713 }
714 
715 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
716  auto stdlog = STDLOG(get_session_ptr(session));
717  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
718  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
719  auto session_it = get_session_it_unsafe(session, write_lock);
720 
721  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
722 
723  try {
724  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
725  dbname2, session_it->second->get_currentUser().userName);
726  session_it->second->set_catalog_ptr(cat);
727  if (leaf_aggregator_.leafCount() > 0) {
728  leaf_aggregator_.switch_database(session, dbname);
729  return;
730  }
731  } catch (std::exception& e) {
732  THROW_MAPD_EXCEPTION(e.what());
733  }
734 }
735 
736 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
737  auto stdlog = STDLOG(get_session_ptr(session1));
738  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
739  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
740  auto session_it = get_session_it_unsafe(session1, write_lock);
741 
742  try {
743  const Catalog_Namespace::UserMetadata& user_meta =
744  session_it->second->get_currentUser();
745  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
746  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
747  if (leaf_aggregator_.leafCount() > 0) {
748  leaf_aggregator_.clone_session(session1, session2);
749  return;
750  }
751  } catch (std::exception& e) {
752  THROW_MAPD_EXCEPTION(e.what());
753  }
754 }
755 
// Interrupts the query running under `query_session`, on behalf of
// `interrupt_session`. Does nothing unless g_enable_dynamic_watchdog or
// allow_query_interrupt is set. Otherwise, under a shared lock on
// sessions_mutex_, it resolves `interrupt_session`, forwards the interrupt to the
// leaves when any are configured, and then interrupts the local executor.
// NOTE(review): embedded numbers 765, 773 and 776 are skipped in this listing, so
// the initialiser of allow_query_interrupt and the statement that obtains
// `executor` are only partly visible.
756 void DBHandler::interrupt(const TSessionId& query_session,
757  const TSessionId& interrupt_session) {
758  // if this is for distributed setting, query_session becomes a parent session (agg)
759  // and the interrupt session is one of existing session in the leaf node (leaf)
760  // so we can think there exists a logical mapping
761  // between query_session (agg) and interrupt_session (leaf)
762  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
763  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
764  const auto allow_query_interrupt =
766  if (g_enable_dynamic_watchdog || allow_query_interrupt) {
767  // Shared lock to allow simultaneous interrupts of multiple sessions
768  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
769 
// May raise TOmniSciException for an unknown session, or for one that has to be
// force-disconnected (in which case read_lock is released by the callee).
770  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
771  auto& cat = session_it->second.get()->getCatalog();
772  const auto dbname = cat.getCurrentDB().dbName;
774  jit_debug_ ? "/tmp" : "",
775  jit_debug_ ? "mapdquery" : "",
777  CHECK(executor);
778 
779  VLOG(1) << "Received interrupt: "
780  << "Session " << *session_it->second << ", Executor " << executor
781  << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
782  << session_it->second->get_currentUser().userLoggable() << ", Database "
783  << dbname << std::endl;
784 
// The leaves are interrupted first, then the local executor.
785  if (leaf_aggregator_.leafCount() > 0) {
786  leaf_aggregator_.interrupt(query_session, interrupt_session);
787  }
788  executor->interrupt(query_session, interrupt_session);
789 
790  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
791  << " interrupted session with database " << dbname << std::endl;
792  }
793 }
794 
795 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
796  auto stdlog = STDLOG(get_session_ptr(session));
797  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
798  const auto rendering_enabled = bool(render_handler_);
799  _return.read_only = read_only_;
800  _return.version = MAPD_RELEASE;
801  _return.rendering_enabled = rendering_enabled;
802  _return.poly_rendering_enabled = rendering_enabled;
803  _return.start_time = start_time_;
804  _return.edition = MAPD_EDITION;
805  _return.host_name = omnisci::get_hostname();
806 }
807 
808 void DBHandler::get_status(std::vector<TServerStatus>& _return,
809  const TSessionId& session) {
810  auto stdlog = STDLOG(get_session_ptr(session));
811  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
812  const auto rendering_enabled = bool(render_handler_);
813  TServerStatus ret;
814  ret.read_only = read_only_;
815  ret.version = MAPD_RELEASE;
816  ret.rendering_enabled = rendering_enabled;
817  ret.poly_rendering_enabled = rendering_enabled;
818  ret.start_time = start_time_;
819  ret.edition = MAPD_EDITION;
820  ret.host_name = omnisci::get_hostname();
821 
822  // TSercivePort tcp_port{}
823 
824  if (g_cluster) {
825  ret.role =
826  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
827  } else {
828  ret.role = TRole::type::SERVER;
829  }
830 
831  _return.push_back(ret);
832  if (leaf_aggregator_.leafCount() > 0) {
833  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
834  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
835  }
836 }
837 
838 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
839  const TSessionId& session) {
840  auto stdlog = STDLOG(get_session_ptr(session));
841  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
842  THardwareInfo ret;
843  const auto cuda_mgr = data_mgr_->getCudaMgr();
844  if (cuda_mgr) {
845  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
846  ret.start_gpu = cuda_mgr->getStartGpu();
847  if (ret.start_gpu >= 0) {
848  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
849  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
850  }
851  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
852  TGpuSpecification gpu_spec;
853  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
854  gpu_spec.num_sm = deviceProperties->numMPs;
855  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
856  gpu_spec.memory = deviceProperties->globalMem;
857  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
858  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
859  ret.gpu_info.push_back(gpu_spec);
860  }
861  }
862 
863  // start hardware/OS dependent code
864  ret.num_cpu_hw = std::thread::hardware_concurrency();
865  // ^ This might return diffrent results in case of hyper threading
866  // end hardware/OS dependent code
867 
868  _return.hardware_info.push_back(ret);
869  if (leaf_aggregator_.leafCount() > 0) {
870  ret.host_name = "aggregator";
871  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
872  _return.hardware_info.insert(_return.hardware_info.end(),
873  leaf_hardware.hardware_info.begin(),
874  leaf_hardware.hardware_info.end());
875  }
876 }
877 
878 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
879  auto session_ptr = get_session_ptr(session);
880  auto stdlog = STDLOG(session_ptr);
881  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
882  auto user_metadata = session_ptr->get_currentUser();
883 
884  _return.user = user_metadata.userName;
885  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
886  _return.start_time = session_ptr->get_start_time();
887  _return.is_super = user_metadata.isSuper;
888 }
889 
// NOTE(review): the opening line of this definition (function name and first
// parameter) is absent from this listing. The body refers to that parameter as
// `tv`, and the recursive calls below suggest the function is
// value_to_thrift_column -- confirm against the real source.
//
// Appends one value to `column`: the value goes into one of column.data's
// vectors (arr_col, str_col, int_col or real_col) and a matching flag is pushed
// onto column.nulls.
//   ti     - SQL type of the value; selects the branch and the null sentinel.
//   column - Thrift column being filled.
891  const SQLTypeInfo& ti,
892  TColumn& column) {
893  if (ti.is_array()) {
  // Array value: each element is appended recursively to a nested column,
  // which is then stored as a single entry of arr_col.
894  TColumn tColumn;
895  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
896  CHECK(array_tv);
897  bool is_null = !array_tv->is_initialized();
898  if (!is_null) {
899  const auto& vec = array_tv->get();
900  for (const auto& elem_tv : vec) {
901  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
902  }
903  }
904  column.data.arr_col.push_back(tColumn);
  // Flagged null only when the array is uninitialized and the type is nullable.
905  column.nulls.push_back(is_null && !ti.get_notnull());
906  } else if (ti.is_geometry()) {
  // Geometry arrives either as a scalar (handled as a nullable string) or as
  // an array whose elements are converted as kDOUBLE.
907  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
908  if (scalar_tv) {
909  auto s_n = boost::get<NullableString>(scalar_tv);
910  auto s = boost::get<std::string>(s_n);
911  if (s) {
912  column.data.str_col.push_back(*s);
913  } else {
914  column.data.str_col.emplace_back(""); // null string
915  auto null_p = boost::get<void*>(s_n);
916  CHECK(null_p && !*null_p);
917  }
918  column.nulls.push_back(!s && !ti.get_notnull());
919  } else {
920  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
921  CHECK(array_tv);
922  bool is_null = !array_tv->is_initialized();
923  if (!is_null) {
924  auto elem_type = SQLTypeInfo(kDOUBLE, false);
925  TColumn tColumn;
926  const auto& vec = array_tv->get();
927  for (const auto& elem_tv : vec) {
928  value_to_thrift_column(elem_tv, elem_type, tColumn);
929  }
930  column.data.arr_col.push_back(tColumn);
931  column.nulls.push_back(false);
932  } else {
  // Uninitialized array: push an empty nested column as a placeholder.
933  TColumn tColumn;
934  column.data.arr_col.push_back(tColumn);
935  column.nulls.push_back(is_null && !ti.get_notnull());
936  }
937  }
938  } else {
  // Plain scalar: dispatch on the alternative actually held by the variant.
939  CHECK(!ti.is_column());
940  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
941  CHECK(scalar_tv);
942  if (boost::get<int64_t>(scalar_tv)) {
943  int64_t data = *(boost::get<int64_t>(scalar_tv));
944 
  // Decimals are emitted as doubles: the stored integer is divided by
  // 10^scale when the scale is positive. Other integers go to int_col.
945  if (ti.is_decimal()) {
946  double val = static_cast<double>(data);
947  if (ti.get_scale() > 0) {
948  val /= pow(10.0, std::abs(ti.get_scale()));
949  }
950  column.data.real_col.push_back(val);
951  } else {
952  column.data.int_col.push_back(data);
953  }
954 
  // Null detection compares the raw integer with the sentinel of the SQL
  // type; a column declared NOT NULL is never flagged.
955  switch (ti.get_type()) {
956  case kBOOLEAN:
957  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
958  break;
959  case kTINYINT:
960  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
961  break;
962  case kSMALLINT:
963  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
964  break;
965  case kINT:
966  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
967  break;
968  case kNUMERIC:
969  case kDECIMAL:
970  case kBIGINT:
971  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
972  break;
973  case kTIME:
974  case kTIMESTAMP:
975  case kDATE:
976  case kINTERVAL_DAY_TIME:
  // NOTE(review): source line 977 is absent from this listing at this point;
  // presumably one more case label -- confirm against the real source.
978  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
979  break;
980  default:
981  column.nulls.push_back(false);
982  }
983  } else if (boost::get<double>(scalar_tv)) {
984  double data = *(boost::get<double>(scalar_tv));
985  column.data.real_col.push_back(data);
  // A double may carry a kFLOAT-typed value, so the sentinel depends on ti.
986  if (ti.get_type() == kFLOAT) {
987  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
988  } else {
989  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
990  }
991  } else if (boost::get<float>(scalar_tv)) {
992  CHECK_EQ(kFLOAT, ti.get_type());
993  float data = *(boost::get<float>(scalar_tv));
994  column.data.real_col.push_back(data);
995  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
996  } else if (boost::get<NullableString>(scalar_tv)) {
997  auto s_n = boost::get<NullableString>(scalar_tv);
998  auto s = boost::get<std::string>(s_n);
999  if (s) {
1000  column.data.str_col.push_back(*s);
1001  } else {
1002  column.data.str_col.emplace_back(""); // null string
  // When no string is held, the variant must hold a null void* instead.
1003  auto null_p = boost::get<void*>(s_n);
1004  CHECK(null_p && !*null_p);
1005  }
1006  column.nulls.push_back(!s && !ti.get_notnull());
1007  } else {
  // No other scalar alternative is expected.
1008  CHECK(false);
1009  }
1010  }
1011 }
1012 
// NOTE(review): the signature line of this definition is absent from this
// listing. The body uses parameters named `tv` (the value) and `ti` (its SQL
// type) and returns a TDatum; the recursive call below suggests the name
// value_to_thrift -- confirm against the real source.
//
// Converts a single target value into a TDatum, setting datum.is_null from the
// null sentinel of the SQL type.
1014  TDatum datum;
1015  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1016  if (!scalar_tv) {
  // Not a scalar: must be an array, converted element by element.
1017  CHECK(ti.is_array());
1018  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1019  CHECK(array_tv);
1020  if (array_tv->is_initialized()) {
1021  const auto& vec = array_tv->get();
1022  for (const auto& elem_tv : vec) {
1023  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
1024  datum.val.arr_val.push_back(scalar_col_val);
1025  }
1026  // Datum is not null, at worst it's an empty array Datum
1027  datum.is_null = false;
1028  } else {
1029  datum.is_null = true;
1030  }
1031  return datum;
1032  }
1033  if (boost::get<int64_t>(scalar_tv)) {
1034  int64_t data = *(boost::get<int64_t>(scalar_tv));
1035 
  // Decimals are returned as doubles (integer divided by 10^scale when the
  // scale is positive); every other integer type is returned in int_val.
1036  if (ti.is_decimal()) {
1037  double val = static_cast<double>(data);
1038  if (ti.get_scale() > 0) {
1039  val /= pow(10.0, std::abs(ti.get_scale()));
1040  }
1041  datum.val.real_val = val;
1042  } else {
1043  datum.val.int_val = data;
1044  }
1045 
  // NOTE(review): for decimal types only real_val was assigned above, yet the
  // kDECIMAL/kNUMERIC case below tests datum.val.int_val, i.e. whatever value
  // TDatum construction left there rather than `data`. Confirm that null
  // decimals are actually detected here.
1046  switch (ti.get_type()) {
1047  case kBOOLEAN:
1048  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
1049  break;
1050  case kTINYINT:
1051  datum.is_null = (datum.val.int_val == NULL_TINYINT);
1052  break;
1053  case kSMALLINT:
1054  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
1055  break;
1056  case kINT:
1057  datum.is_null = (datum.val.int_val == NULL_INT);
1058  break;
1059  case kDECIMAL:
1060  case kNUMERIC:
1061  case kBIGINT:
1062  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1063  break;
1064  case kTIME:
1065  case kTIMESTAMP:
1066  case kDATE:
1067  case kINTERVAL_DAY_TIME:
1068  case kINTERVAL_YEAR_MONTH:
1069  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1070  break;
1071  default:
1072  datum.is_null = false;
1073  }
1074  } else if (boost::get<double>(scalar_tv)) {
1075  datum.val.real_val = *(boost::get<double>(scalar_tv));
  // A double may carry a kFLOAT-typed value, so the sentinel depends on ti.
1076  if (ti.get_type() == kFLOAT) {
1077  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1078  } else {
1079  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
1080  }
1081  } else if (boost::get<float>(scalar_tv)) {
1082  CHECK_EQ(kFLOAT, ti.get_type());
1083  datum.val.real_val = *(boost::get<float>(scalar_tv));
1084  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1085  } else if (boost::get<NullableString>(scalar_tv)) {
1086  auto s_n = boost::get<NullableString>(scalar_tv);
1087  auto s = boost::get<std::string>(s_n);
1088  if (s) {
1089  datum.val.str_val = *s;
1090  } else {
  // When no string is held, the variant must hold a null void* instead.
1091  auto null_p = boost::get<void*>(s_n);
1092  CHECK(null_p && !*null_p);
1093  }
1094  datum.is_null = !s;
1095  } else {
  // No other scalar alternative is expected.
1096  CHECK(false);
1097  }
1098  return datum;
1099 }
1100 
// NOTE(review): the line naming this function is absent from this listing. The
// parameter list matches the sql_execute_local(...) call made elsewhere in this
// file, so that is presumably its name -- confirm.
//
// Classifies the query, records the classification in `_return.query_type`,
// then times the statements inside the lambda below and adds that time to
// `_return.total_time_ms`. `nonce` is copied to the result unchanged.
1102  TQueryResult& _return,
1103  const QueryStateProxy& query_state_proxy,
1104  const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1105  const std::string& query_str,
1106  const bool column_format,
1107  const std::string& nonce,
1108  const int32_t first_n,
1109  const int32_t at_most_n,
1110  const bool use_calcite) {
1111  _return.total_time_ms = 0;
1112  _return.nonce = nonce;
1113  ParserWrapper pw{query_str};
  // NOTE(review): the `case ...: {` label lines of this switch (source lines
  // 1115, 1120, 1125 and 1130) are absent from this listing; only the bodies
  // and the default branch are visible.
1114  switch (pw.getQueryType()) {
1116  _return.query_type = TQueryType::READ;
1117  VLOG(1) << "query type: READ";
1118  break;
1119  }
1121  _return.query_type = TQueryType::WRITE;
1122  VLOG(1) << "query type: WRITE";
1123  break;
1124  }
1126  _return.query_type = TQueryType::SCHEMA_READ;
1127  VLOG(1) << "query type: SCHEMA READ";
1128  break;
1129  }
1131  _return.query_type = TQueryType::SCHEMA_WRITE;
1132  VLOG(1) << "query type: SCHEMA WRITE";
1133  break;
1134  }
1135  default: {
1136  _return.query_type = TQueryType::UNKNOWN;
1137  LOG(WARNING) << "query type: UNKNOWN";
1138  break;
1139  }
1140  }
  // NOTE(review): source lines 1141, 1143 and 1150 are absent here. Presumably
  // they declare `result`, open the call that receives the arguments below,
  // and open the conversion call whose arguments are on line 1151 -- confirm.
1142  _return.total_time_ms += measure<>::execution([&]() {
1144  query_state_proxy,
1145  column_format,
1146  session_ptr->get_executor_device_type(),
1147  first_n,
1148  at_most_n,
1149  use_calcite);
1151  _return, result, query_state_proxy, query_str, column_format, first_n, at_most_n);
1152  });
1153 }
1154 
// Converts an execution result into the Thrift TQueryResult `_return`.
// The result's execution time is added to `_return.execution_time_ms`; an empty
// result leaves the rest of `_return` untouched.
// NOTE(review): source line 1156 (the parameter the body calls `result`) is
// absent from this listing.
1155 void DBHandler::convertData(TQueryResult& _return,
1157  const QueryStateProxy& query_state_proxy,
1158  const std::string& query_str,
1159  const bool column_format,
1160  const int32_t first_n,
1161  const int32_t at_most_n) {
1162  _return.execution_time_ms += result.getExecutionTime();
1163  if (result.empty()) {
1164  return;
1165  }
1166 
  // NOTE(review): the case labels of this switch (source lines 1168, 1177, 1180
  // and 1183) are absent from this listing, so which result type maps to which
  // converter cannot be read from here.
1167  switch (result.getResultType()) {
  // First branch: row conversion honoring the caller's first_n / at_most_n.
1169  convertRows(_return,
1170  query_state_proxy,
1171  result.getTargetsMeta(),
1172  *result.getRows(),
1173  column_format,
1174  first_n,
1175  at_most_n);
1176  break;
1178  convertResult(_return, *result.getRows(), true);
1179  break;
1181  convertExplain(_return, *result.getRows(), true);
1182  break;
  // Last branch: row conversion with both limits passed as -1.
1184  convertRows(_return,
1185  query_state_proxy,
1186  result.getTargetsMeta(),
1187  *result.getRows(),
1188  column_format,
1189  -1,
1190  -1);
1191  break;
1192  }
1193 }
1194 
// Executes `query_str` for the given session and fills `_return`.
// A query beginning with "execute relalg" disables Calcite (use_calcite is
// false) and runs the remainder of the string, trimmed, instead.
// When leaf_aggregator_ reports at least one leaf the query is sent through
// agg_handler_->cluster_execute, otherwise through sql_execute_local.
// Any std::exception is rethrown through THROW_MAPD_EXCEPTION with a message
// chosen from the original what() text.
1195 void DBHandler::sql_execute(TQueryResult& _return,
1196  const TSessionId& session,
1197  const std::string& query_str,
1198  const bool column_format,
1199  const std::string& nonce,
1200  const int32_t first_n,
1201  const int32_t at_most_n) {
1202  const std::string exec_ra_prefix = "execute relalg";
1203  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1204  auto actual_query =
1205  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1206  auto session_ptr = get_session_ptr(session);
1207  auto query_state = create_query_state(session_ptr, actual_query);
1208  auto stdlog = STDLOG(session_ptr, query_state);
1209  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1210  stdlog.appendNameValuePairs("nonce", nonce);
1211  auto timer = DEBUG_TIMER(__func__);
1212  try {
  // Removes this session's entry from geo_copy_from_sessions when the guard
  // is destroyed.
1213  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
1214  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1215  };
1216 
1217  if (first_n >= 0 && at_most_n >= 0) {
  // NOTE(review): source line 1218 is absent here; presumably it opens the
  // THROW_MAPD_EXCEPTION( call that takes the message below -- confirm.
1219  std::string("At most one of first_n and at_most_n can be set"));
1220  }
1221 
1222  if (leaf_aggregator_.leafCount() > 0) {
1223  if (!agg_handler_) {
1224  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
1225  }
1226  _return.total_time_ms = measure<>::execution([&]() {
1227  agg_handler_->cluster_execute(_return,
1228  query_state->createQueryStateProxy(),
1229  query_state->getQueryStr(),
1230  column_format,
1231  nonce,
1232  first_n,
1233  at_most_n,
  // NOTE(review): source line 1234 (the final argument of cluster_execute) is
  // absent from this listing.
1235  });
1236  _return.nonce = nonce;
1237  } else {
1238  sql_execute_local(_return,
1239  query_state->createQueryStateProxy(),
1240  session_ptr,
1241  actual_query,
1242  column_format,
1243  nonce,
1244  first_n,
1245  at_most_n,
1246  use_calcite);
1247  }
  // Adds the time of any geo COPY FROM import queued for this session.
1248  _return.total_time_ms += process_geo_copy_from(session);
1249  std::string debug_json = timer.stopAndGetJson();
1250  if (!debug_json.empty()) {
1251  _return.__set_debug(std::move(debug_json));
1252  }
1253  stdlog.appendNameValuePairs(
1254  "execution_time_ms",
1255  _return.execution_time_ms,
1256  "total_time_ms", // BE-3420 - Redundant with duration field
1257  stdlog.duration<std::chrono::milliseconds>());
1258  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1259  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1260  } catch (const std::exception& e) {
  // Replace some known error texts with shorter, user-facing messages.
1261  if (strstr(e.what(), "java.lang.NullPointerException")) {
1262  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
1263  "query failed from broken view or other schema related issue");
1264  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1265  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1266  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
  // NOTE(review): the message below misspells "statement"; it is a runtime
  // string, so changing it may affect clients that match on it.
1267  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1268  } else {
1269  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1270  }
1271  }
1272 }
1273 
// NOTE(review): the line carrying this function's name and its first parameter
// (the object the body calls `_return`) is absent from this listing. The body
// mirrors DBHandler::sql_execute but reports through _return.setExecutionTime /
// getExecutionTime -- presumably an overload returning an execution result
// object; confirm against the real source.
//
// A query beginning with "execute relalg" disables Calcite (use_calcite is
// false) and runs the remainder of the string, trimmed, instead. Any
// std::exception is rethrown through THROW_MAPD_EXCEPTION.
1275  const TSessionId& session,
1276  const std::string& query_str,
1277  const bool column_format,
1278  const int32_t first_n,
1279  const int32_t at_most_n) {
1280  const std::string exec_ra_prefix = "execute relalg";
1281  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1282  auto actual_query =
1283  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1284 
1285  auto session_ptr = get_session_ptr(session);
1286  auto query_state = create_query_state(session_ptr, actual_query);
1287  auto stdlog = STDLOG(session_ptr, query_state);
1288  auto timer = DEBUG_TIMER(__func__);
1289 
1290  try {
  // Removes this session's entry from geo_copy_from_sessions when the guard
  // is destroyed.
1291  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
1292  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1293  };
1294 
1295  if (first_n >= 0 && at_most_n >= 0) {
  // NOTE(review): source line 1296 is absent here; presumably it opens the
  // THROW_MAPD_EXCEPTION( call that takes the message below -- confirm.
1297  std::string("At most one of first_n and at_most_n can be set"));
1298  }
1299  auto total_time_ms = measure<>::execution([&]() {
  // NOTE(review): source line 1300 (the start of the timed call that receives
  // the arguments below) is absent from this listing.
1301  query_state->createQueryStateProxy(),
1302  column_format,
1303  session_ptr->get_executor_device_type(),
1304  first_n,
1305  at_most_n,
1306  use_calcite);
1307  });
1308 
  // Reported time = timed call above + any geo COPY FROM import for the session.
1309  _return.setExecutionTime(total_time_ms + process_geo_copy_from(session));
1310 
1311  stdlog.appendNameValuePairs(
1312  "execution_time_ms",
1313  _return.getExecutionTime(),
1314  "total_time_ms", // BE-3420 - Redundant with duration field
1315  stdlog.duration<std::chrono::milliseconds>());
1316  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1317  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1318  } catch (const std::exception& e) {
  // Replace some known error texts with shorter, user-facing messages.
1319  if (strstr(e.what(), "java.lang.NullPointerException")) {
1320  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
1321  "query failed from broken view or other schema related issue");
1322  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1323  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1324  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
  // NOTE(review): the message below misspells "statement"; it is a runtime
  // string, so changing it may affect clients that match on it.
1325  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1326  } else {
1327  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1328  }
1329  }
1330 }
1331 
// Runs the deferred import of a geo COPY FROM statement, if one was recorded
// for `session_id` in geo_copy_from_sessions.
// Returns the time measured for the import in milliseconds, or 0 when nothing
// was recorded for the session.
1332 int64_t DBHandler::process_geo_copy_from(const TSessionId& session_id) {
1333  int64_t total_time_ms(0);
1334  // if the SQL statement we just executed was a geo COPY FROM, the import
1335  // parameters were captured, and this flag set, so we do the actual import here
1336  if (auto geo_copy_from_state = geo_copy_from_sessions(session_id)) {
1337  // import_geo_table() calls create_table() which calls this function to
1338  // do the work, so reset the flag now to avoid executing this part a
1339  // second time at the end of that, which would fail as the table was
1340  // already created! Also reset the flag with a ScopeGuard on exiting
1341  // this function any other way, such as an exception from the code above!
  // NOTE(review): no ScopeGuard is created in this function; the guards
  // visible in this file live in the sql_execute callers.
1342  geo_copy_from_sessions.remove(session_id);
1343 
1344  // create table as replicated?
1345  TCreateParams create_params;
1346  if (geo_copy_from_state->geo_copy_from_partitions == "REPLICATED") {
1347  create_params.is_replicated = true;
1348  }
1349 
1350  // now do (and time) the import
1351  total_time_ms = measure<>::execution([&]() {
  // NOTE(review): source line 1352 (the start of the call receiving the
  // arguments below; presumably import_geo_table, per the comment above) is
  // absent from this listing.
1353  session_id,
1354  geo_copy_from_state->geo_copy_from_table,
1355  geo_copy_from_state->geo_copy_from_file_name,
1356  copyparams_to_thrift(geo_copy_from_state->geo_copy_from_copy_params),
1357  TRowDescriptor(),
1358  create_params);
1359  });
1360  }
1361  return total_time_ms;
1362 }
1363 
// Executes `query_str` and returns the result as a data frame in `_return`.
// For GPU requests the session's executor device type, the presence of GPUs and
// the range of `device_id` are validated first. Statements flagged by the
// parser as DDL, update DML or ExplainType::Other fall through to the final
// error; std::exceptions inside the try block are rethrown through
// THROW_MAPD_EXCEPTION.
1364 void DBHandler::sql_execute_df(TDataFrame& _return,
1365  const TSessionId& session,
1366  const std::string& query_str,
1367  const TDeviceType::type device_type,
1368  const int32_t device_id,
1369  const int32_t first_n,
1370  const TArrowTransport::type transport_method) {
1371  auto session_ptr = get_session_ptr(session);
1372  auto query_state = create_query_state(session_ptr, query_str);
1373  auto stdlog = STDLOG(session_ptr, query_state);
1374 
1375  if (device_type == TDeviceType::GPU) {
1376  const auto executor_device_type = session_ptr->get_executor_device_type();
1377  if (executor_device_type != ExecutorDeviceType::GPU) {
  // NOTE(review): source line 1378 is absent here; presumably it opens the
  // THROW_MAPD_EXCEPTION( call that takes the message below -- confirm.
1379  std::string("Exception: GPU mode is not allowed in this session"));
1380  }
1381  if (!data_mgr_->gpusPresent()) {
1382  THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
1383  }
1384  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
  // NOTE(review): source line 1385 is absent here; presumably the opening of
  // the throw that takes the message below.
1386  std::string("Exception: invalid device_id or unavailable GPU with this ID"));
1387  }
1388  }
1389  _return.execution_time_ms = 0;
1390 
  // NOTE(review): source lines 1392-1393 (the arguments of this shared lock,
  // i.e. which mutex is locked) are absent from this listing.
1391  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
1394  auto query_state_proxy = query_state->createQueryStateProxy();
1395  try {
1396  ParserWrapper pw{query_str};
1397  if (!pw.is_ddl && !pw.is_update_dml &&
1398  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
1399  std::string query_ra;
  // NOTE(review): source line 1400 is absent here; presumably it declares the
  // `locks` object assigned inside the lambda below.
  // Parsing to relational algebra is timed into execution_time_ms.
1401  _return.execution_time_ms += measure<>::execution([&]() {
1402  TPlanResult result;
1403  std::tie(result, locks) =
1404  parse_to_ra(query_state_proxy, query_str, {}, true, system_parameters_);
1405  query_ra = result.plan_result;
1406  });
1407 
1408  if (pw.isCalciteExplain()) {
  // NOTE(review): "not unsupported" in the message below is a double
  // negative; it is a runtime string, so it is left untouched here.
1409  throw std::runtime_error("explain is not unsupported by current thrift API");
1410  }
  // NOTE(review): source lines 1411-1412 and 1416 are absent here. Presumably
  // they obtain `executor`, open a block (closed on line 1418) and supply one
  // more argument of enrollQuerySession -- confirm.
1413  executor->enrollQuerySession(session_ptr->get_session_id(),
1414  query_str,
1415  query_state->getQuerySubmittedTime(),
1417  QuerySessionStatus::QueryStatus::PENDING_QUEUE);
1418  }
1419  execute_rel_alg_df(_return,
1420  query_ra,
1421  query_state_proxy,
1422  *session_ptr,
1423  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
  // NOTE(review): source line 1424 (the else-operand of the conditional above)
  // is absent from this listing.
1425  static_cast<size_t>(device_id),
1426  first_n,
1427  transport_method);
1428  return;
1429  }
1430  } catch (std::exception& e) {
1431  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1432  }
  // Reached only when the statement was rejected by the parser check above.
  // NOTE(review): source line 1433 is absent here (presumably the opening of
  // the throw); the message below also says "not unsupported".
1434  "Exception: DDL or update DML are not unsupported by current thrift API");
1435 }
1436 
1437 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1438  const TSessionId& session,
1439  const std::string& query_str,
1440  const int32_t device_id,
1441  const int32_t first_n) {
1442  auto stdlog = STDLOG(get_session_ptr(session));
1443  sql_execute_df(_return,
1444  session,
1445  query_str,
1446  TDeviceType::GPU,
1447  device_id,
1448  first_n,
1449  TArrowTransport::SHARED_MEMORY);
1450 }
1451 
// Releases a data frame previously handed out to a client.
// For GPU frames the handle must be present in ipc_handle_to_dev_ptr_; its
// entry is read and erased under handle_to_dev_ptr_mutex_. If the handle's
// count in the map is not exactly 1, a TOmniSciException is logged and thrown.
1452 // For now we have only one user of a data frame in all cases.
1453 void DBHandler::deallocate_df(const TSessionId& session,
1454  const TDataFrame& df,
1455  const TDeviceType::type device_type,
1456  const int32_t device_id) {
1457  auto stdlog = STDLOG(get_session_ptr(session));
1458  std::string serialized_cuda_handle = "";
1459  if (device_type == TDeviceType::GPU) {
1460  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1461  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1462  TOmniSciException ex;
1463  ex.error_msg = std::string(
1464  "Exception: current data frame handle is not bookkept or been inserted "
1465  "twice");
1466  LOG(ERROR) << ex.error_msg;
1467  throw ex;
1468  }
1469  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1470  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1471  }
  // The Thrift string handles are copied into byte vectors.
1472  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1473  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
  // NOTE(review): source lines 1474 and 1476 are absent from this listing.
  // Presumably 1474 declares `result` (brace-initialized from the values on
  // line 1475) and 1476 opens the deallocation call that receives the
  // arguments on lines 1477-1480 -- confirm against the real source.
1475  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1477  result,
1478  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1479  device_id,
1480  data_mgr_);
1481 }
1482 
1483 std::string DBHandler::apply_copy_to_shim(const std::string& query_str) {
1484  auto result = query_str;
1485  {
1486  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
1487  // boost::regex::icase};
1488  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
1489  boost::regex::extended | boost::regex::icase};
1490  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
1491  result.replace(
1492  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
1493  });
1494  }
1495  return result;
1496 }
1497 
// Validates a SELECT statement without returning data: on success `_return`
// receives the row descriptor of the result. EXPLAIN, DDL and update DML
// statements are rejected. Every std::exception is rethrown through
// THROW_MAPD_EXCEPTION with an "Exception: " prefix.
1498 void DBHandler::sql_validate(TRowDescriptor& _return,
1499  const TSessionId& session,
1500  const std::string& query_str) {
1501  try {
1502  auto stdlog = STDLOG(get_session_ptr(session));
1503  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1504  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1505  stdlog.setQueryState(query_state);
1506 
1507  ParserWrapper pw{query_str};
1508  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1509  pw.is_update_dml) {
1510  throw std::runtime_error("Can only validate SELECT statements.");
1511  }
1512 
  // NOTE(review): source lines 1514-1515 (the arguments of this shared lock)
  // are absent from this listing.
1513  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
1516 
1517  TPlanResult parse_result;
  // NOTE(review): source lines 1518 and 1523 are absent here; presumably the
  // declaration of `locks` and one more parse_to_ra argument -- confirm.
1519  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1520  query_state->getQueryStr(),
1521  {},
1522  true,
1524  /*check_privileges=*/true);
1525  const auto query_ra = parse_result.plan_result;
1526 
  // Run the plan in validate mode and post-process its row descriptor.
1527  const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
1528  _return = fixup_row_descriptor(result.row_set.row_desc,
1529  query_state->getConstSessionInfo()->getCatalog());
1530  } catch (const std::exception& e) {
1531  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
1532  }
1533 }
1534 
// File-local helpers for ranking completion hints.
1535 namespace {
1536 
// NOTE(review): source line 1537 (the header of the struct holding the two sets
// below) is absent from this listing.
// Both sets hold upper-cased names: plain column-like tokens, and the part
// before the dot of two-part dotted tokens.
1538  std::unordered_set<std::string> uc_column_names;
1539  std::unordered_set<std::string> uc_column_table_qualifiers;
1540 };
1541 
1542 // Extract what looks like a (qualified) identifier from the partial query.
1543 // The results will be used to rank the auto-completion results: tables which
1544 // contain at least one of the identifiers first.
// NOTE(review): source line 1545 (return type and function name) is absent from
// this listing; the caller in this file invokes it as
// extract_projection_tokens_for_completion(sql).
1546  const std::string& sql) {
  // A token is a run of alphanumerics, underscores and dots.
1547  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1548  boost::regex::extended | boost::regex::icase};
1549  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1550  boost::sregex_token_iterator end;
1551  std::unordered_set<std::string> uc_column_names;
1552  std::unordered_set<std::string> uc_column_table_qualifiers;
1553  for (; tok_it != end; ++tok_it) {
1554  std::string column_name = *tok_it;
1555  std::vector<std::string> column_tokens;
1556  boost::split(column_tokens, column_name, boost::is_any_of("."));
  // Exactly two dot-separated parts: record the first part as a table
  // qualifier. Anything else is recorded whole as a column name.
1557  if (column_tokens.size() == 2) {
1558  // If the column name is qualified, take user's word.
1559  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1560  } else {
1561  uc_column_names.insert(to_upper(column_name));
1562  }
1563  }
1564  return {uc_column_names, uc_column_table_qualifiers};
1565 }
1566 
1567 } // namespace
1568 
1569 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1570  const TSessionId& session,
1571  const std::string& sql,
1572  const int cursor) {
1573  auto stdlog = STDLOG(get_session_ptr(session));
1574  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1575  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1576  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1577  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1578  proj_tokens.uc_column_names, visible_tables, stdlog);
1579  // Add the table qualifiers explicitly specified by the user.
1580  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1581  proj_tokens.uc_column_table_qualifiers.end());
1582  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1583  std::sort(
1584  hints.begin(),
1585  hints.end(),
1586  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1587  if (lhs.type == TCompletionHintType::TABLE &&
1588  rhs.type == TCompletionHintType::TABLE) {
1589  // Between two tables, one which is compatible with the specified
1590  // projections and one which isn't, pick the one which is compatible.
1591  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1592  compatible_table_names.end() &&
1593  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1594  compatible_table_names.end()) {
1595  return true;
1596  }
1597  }
1598  return lhs.type < rhs.type;
1599  });
1600 }
1601 
// Collects completion hints without sorting them.
// `visible_tables` is filled with the tables and views obtained through
// get_tables_impl for the session. If the text before the cursor already
// contains a FROM keyword, the Calcite-provided hints are kept as-is; otherwise
// token-based completions are added. A std::exception from the first stage is
// logged and rethrown as TOmniSciException.
1602 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1603  std::vector<std::string>& visible_tables,
1604  query_state::StdLog& stdlog,
1605  const std::string& sql,
1606  const int cursor) {
1607  const auto& session_info = *stdlog.getConstSessionInfo();
1608  try {
1609  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1610  // Filter out keywords suggested by Calcite which we don't support.
  // NOTE(review): source line 1611 is absent here; presumably it assigns
  // `hints` from a filtering helper wrapping the call below -- confirm.
1612  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1613  } catch (const std::exception& e) {
1614  TOmniSciException ex;
1615  ex.error_msg = "Exception: " + std::string(e.what());
1616  LOG(ERROR) << ex.error_msg;
1617  throw ex;
1618  }
1619  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
  // A negative cursor means "end of string"; otherwise clamp to the length.
1620  const size_t length_to_cursor =
1621  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1622  // Trust hints from Calcite after the FROM keyword.
1623  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1624  return;
1625  }
1626  // Before FROM, the query is too incomplete for context-sensitive completions.
1627  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1628 }
1629 
1630 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1631  query_state::StdLog& stdlog,
1632  std::vector<std::string>& visible_tables,
1633  const std::string& sql,
1634  const int cursor) {
1635  const auto last_word =
1636  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1637  boost::regex select_expr{R"(\s*select\s+)",
1638  boost::regex::extended | boost::regex::icase};
1639  const size_t length_to_cursor =
1640  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1641  // After SELECT but before FROM, look for all columns in all tables which match the
1642  // prefix.
1643  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1644  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1645  // Trust the fully qualified columns the most.
1646  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1647  return;
1648  }
1649  // Not much information to use, just retrieve column names which match the prefix.
1650  if (should_suggest_column_hints(sql)) {
1651  get_column_hints(hints, last_word, column_names_by_table);
1652  return;
1653  }
1654  const std::string kFromKeyword{"FROM"};
1655  if (boost::istarts_with(kFromKeyword, last_word)) {
1656  TCompletionHint keyword_hint;
1657  keyword_hint.type = TCompletionHintType::KEYWORD;
1658  keyword_hint.replaced = last_word;
1659  keyword_hint.hints.emplace_back(kFromKeyword);
1660  hints.push_back(keyword_hint);
1661  }
1662  } else {
1663  const std::string kSelectKeyword{"SELECT"};
1664  if (boost::istarts_with(kSelectKeyword, last_word)) {
1665  TCompletionHint keyword_hint;
1666  keyword_hint.type = TCompletionHintType::KEYWORD;
1667  keyword_hint.replaced = last_word;
1668  keyword_hint.hints.emplace_back(kSelectKeyword);
1669  hints.push_back(keyword_hint);
1670  }
1671  }
1672 }
1673 
1674 std::unordered_map<std::string, std::unordered_set<std::string>>
1675 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1676  query_state::StdLog& stdlog) {
1677  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1678  for (auto it = table_names.begin(); it != table_names.end();) {
1679  TTableDetails table_details;
1680  try {
1681  get_table_details_impl(table_details, stdlog, *it, false, false);
1682  } catch (const TOmniSciException& e) {
1683  // Remove the corrupted Table/View name from the list for further processing.
1684  it = table_names.erase(it);
1685  continue;
1686  }
1687  for (const auto& column_type : table_details.row_desc) {
1688  column_names_by_table[*it].emplace(column_type.col_name);
1689  }
1690  ++it;
1691  }
1692  return column_names_by_table;
1693 }
1694 
1698 }
1699 
// NOTE(review): the line carrying this function's return type and name is
// absent from this listing; the caller in this file invokes it as
// get_uc_compatible_table_names_by_column(...).
//
// Returns the upper-cased names of the tables in `table_names` that have at
// least one column whose upper-cased name is in `uc_column_names`. Tables whose
// details cannot be fetched are erased from `table_names`.
1701  const std::unordered_set<std::string>& uc_column_names,
1702  std::vector<std::string>& table_names,
1703  query_state::StdLog& stdlog) {
1704  std::unordered_set<std::string> compatible_table_names_by_column;
1705  for (auto it = table_names.begin(); it != table_names.end();) {
1706  TTableDetails table_details;
1707  try {
1708  get_table_details_impl(table_details, stdlog, *it, false, false);
1709  } catch (const TOmniSciException& e) {
1710  // Remove the corrupted Table/View name from the list for further processing.
1711  it = table_names.erase(it);
1712  continue;
1713  }
1714  for (const auto& column_type : table_details.row_desc) {
1715  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1716  compatible_table_names_by_column.emplace(to_upper(*it));
  // One matching column is enough; stop scanning this table.
1717  break;
1718  }
1719  }
1720  ++it;
1721  }
1722  return compatible_table_names_by_column;
1723 }
1724 
// Runs the relational-algebra plan `query_ra` with just_validate set and
// returns the converted TQueryResult. The work is wrapped in a dispatch-queue
// task; this function waits for the task's future before converting.
1725 TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
1726  QueryStateProxy query_state_proxy) {
1727  TQueryResult _return;
  // NOTE(review): source line 1728 is absent here; presumably it declares the
  // `result` object captured by the lambda below.
1729  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1730  [this, &result, &query_state_proxy, &query_ra](const size_t executor_index) {
1731  execute_rel_alg(result,
1732  query_state_proxy,
1733  query_ra,
1734  true,
  // NOTE(review): source lines 1735 and 1740 (two further arguments of
  // execute_rel_alg) are absent from this listing.
1736  -1,
1737  -1,
1738  /*just_validate=*/true,
1739  /*find_filter_push_down_candidates=*/false,
1741  executor_index);
1742  });
  // NOTE(review): source line 1743 is absent here.
1744  dispatch_queue_->submit(execute_rel_alg_task, /*is_update_delete=*/false);
1745  auto result_future = execute_rel_alg_task->get_future();
  // Waits for the task; the lambda captures locals of this frame by reference.
1746  result_future.get();
1747  DBHandler::convertData(_return, result, query_state_proxy, query_ra, true, -1, -1);
1748  return _return;
1749 }
1750 
1751 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1752  auto stdlog = STDLOG(get_session_ptr(session));
1753  auto session_ptr = stdlog.getConstSessionInfo();
1754  if (!session_ptr->get_currentUser().isSuper) {
1755  // WARNING: This appears to not include roles a user is a member of,
1756  // if the role has no permissions granted to it.
1757  roles =
1758  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1759  session_ptr->getCatalog().getCurrentDB().dbId);
1760  } else {
1761  roles = SysCatalog::instance().getRoles(
1762  false, true, session_ptr->get_currentUser().userName);
1763  }
1764 }
1765 
// Returns whether `roleName` is granted to `granteeName`, as reported by
// SysCatalog::isRoleGrantedToGrantee(granteeName, roleName, false).
// Non-super users are restricted first: asking about a different user grantee
// throws, and so does asking about a grantee for which
// isRoleGrantedToGrantee(current user, granteeName, true) is false.
1766 bool DBHandler::has_role(const TSessionId& sessionId,
1767  const std::string& granteeName,
1768  const std::string& roleName) {
1769  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1770  const auto session_ptr = stdlog.getConstSessionInfo();
1771  const auto current_user = session_ptr->get_currentUser();
1772  if (!current_user.isSuper) {
  // `user` is non-null when granteeName resolves to a user grantee.
1773  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1774  user && current_user.userName != granteeName) {
1775  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1776  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1777  current_user.userName, granteeName, true)) {
  // NOTE(review): source line 1778 is absent here; presumably it opens the
  // THROW_MAPD_EXCEPTION( call that takes the message below -- confirm.
1779  "Only super users can check roles assignment that have not been directly "
1780  "granted to a user.");
1781  }
1782  }
1783  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1784 }
1785 
1786 static TDBObject serialize_db_object(const std::string& roleName,
1787  const DBObject& inObject) {
1788  TDBObject outObject;
1789  outObject.objectName = inObject.getName();
1790  outObject.grantee = roleName;
1791  outObject.objectId = inObject.getObjectKey().objectId;
1792  const auto ap = inObject.getPrivileges();
1793  switch (inObject.getObjectKey().permissionType) {
1794  case DatabaseDBObjectType:
1795  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1796  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1797  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1798  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1799  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1800 
1801  break;
1802  case TableDBObjectType:
1803  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1804  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1805  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1806  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1807  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1808  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1809  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1810  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1811  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1812 
1813  break;
1814  case DashboardDBObjectType:
1815  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1816  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1817  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1818  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1819  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1820 
1821  break;
1822  case ViewDBObjectType:
1823  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1824  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1825  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1826  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1827  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1828  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1829  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1830 
1831  break;
1832  case ServerDBObjectType:
1833  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1834  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1835  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1836  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1837  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::SERVER_USAGE));
1838 
1839  break;
1840  default:
1841  CHECK(false);
1842  }
1843  const int type_val = static_cast<int>(inObject.getType());
1844  CHECK(type_val >= 0 && type_val < 6);
1845  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1846  return outObject;
1847 }
1848 
// Checks the database-level permissions requested in `permissions` against `privs`.
// Raises via THROW_MAPD_EXCEPTION when permissions.database_permissions_ is not set.
// Returns false if any requested flag is not backed by privs.hasPermission(...),
// true otherwise. The visible flag/privilege pairs are create_/CREATE_DATABASE,
// delete_/DROP_DATABASE and access_/ACCESS.
// NOTE(review): this listing lacks the line that opens the definition (return type,
// function name, declaration of `privs`) and the continuation of the
// view_sql_editor_ clause (listing line 1858) — restore both from the upstream file.
1850  const TDBObjectPermissions& permissions) {
1851  if (!permissions.__isset.database_permissions_) {
1852  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1853  }
1854  auto perms = permissions.database_permissions_;
1855  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1856  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1857  (perms.view_sql_editor_ &&
1859  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1860  return false;
1861  } else {
1862  return true;
1863  }
1864 }
1865 
// Checks the table-level permissions requested in `permissions` against `privs`.
// Raises via THROW_MAPD_EXCEPTION when permissions.table_permissions_ is not set.
// Returns false if any requested flag (create_, drop_, select_, insert_, update_,
// delete_, truncate_, alter_) is not backed by the matching TablePrivileges value;
// returns true otherwise.
// NOTE(review): the line that opens the definition (return type, function name,
// declaration of `privs`) is absent from this listing.
1867  const TDBObjectPermissions& permissions) {
1868  if (!permissions.__isset.table_permissions_) {
1869  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1870  }
1871  auto perms = permissions.table_permissions_;
1872  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1873  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1874  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1875  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1876  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1877  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1878  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1879  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1880  return false;
1881  } else {
1882  return true;
1883  }
1884 }
1885 
// Checks the dashboard-level permissions requested in `permissions` against `privs`.
// Raises via THROW_MAPD_EXCEPTION when permissions.dashboard_permissions_ is not set.
// Returns false if any requested flag (create_, delete_, view_, edit_) is not backed
// by the matching DashboardPrivileges value; returns true otherwise.
// NOTE(review): the line that opens the definition (return type, function name,
// declaration of `privs`) is absent from this listing.
1887  const TDBObjectPermissions& permissions) {
1888  if (!permissions.__isset.dashboard_permissions_) {
1889  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1890  }
1891  auto perms = permissions.dashboard_permissions_;
1892  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1893  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1894  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1895  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1896  return false;
1897  } else {
1898  return true;
1899  }
1900 }
1901 
// Checks the view-level permissions requested in `permissions` against `privs`.
// Raises via THROW_MAPD_EXCEPTION when permissions.view_permissions_ is not set.
// Returns false if any requested flag (create_, drop_, select_, insert_, update_,
// delete_) is not backed by the matching ViewPrivileges value; returns true otherwise.
// NOTE(review): the line that opens the definition (return type, function name,
// declaration of `privs`) is absent from this listing.
1903  const TDBObjectPermissions& permissions) {
1904  if (!permissions.__isset.view_permissions_) {
1905  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1906  }
1907  auto perms = permissions.view_permissions_;
1908  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1909  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1910  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1911  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1912  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1913  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1914  return false;
1915  } else {
1916  return true;
1917  }
1918 }
1919 
// Checks the server-level permissions requested in `permissions` against `privs`.
// Returns false if any requested flag (create_, drop_, alter_, usage_) is not backed
// by the matching ServerPrivileges value; returns true otherwise.
// NOTE(review): unlike the database/table/dashboard/view variants, an unset
// server_permissions_ field is handled with CHECK(...) rather than
// THROW_MAPD_EXCEPTION — confirm whether that difference is intended.
// NOTE(review): the line that opens the definition (return type, function name,
// declaration of `privs`) is absent from this listing.
1921  const TDBObjectPermissions& permissions) {
1922  CHECK(permissions.__isset.server_permissions_);
1923  auto perms = permissions.server_permissions_;
1924  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
1925  (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
1926  (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER)) ||
1927  (perms.usage_ && !privs.hasPermission(ServerPrivileges::SERVER_USAGE))) {
1928  return false;
1929  } else {
1930  return true;
1931  }
1932 }
1933 
// Reports whether grantee `granteeName` holds the requested `permissions` on the
// object `objectName` of kind `objectType`, resolved in the session's catalog.
//
// Flow visible in this listing:
//  - a non-superuser caller is rejected unless
//    isRoleGrantedToGrantee(caller, granteeName, false) holds;
//  - if granteeName is a user whose metadata says isSuper, the answer is true;
//  - an unknown grantee raises via THROW_MAPD_EXCEPTION;
//  - objectType selects a key ("database", "table", "dashboard", "view", "server")
//    into permissionFuncMap_; any other value raises "Invalid object type";
//  - the grantee's DBObject for the requested key is looked up and, when found,
//    the selected function decides; otherwise the result is false.
//
// NOTE(review): several lines are absent from this listing (1945, 1950, 1959,
// the `case` labels and whatever assigns `type` inside the switch). The
// declarations of `user_meta` and `type` are therefore not visible here.
1934 bool DBHandler::has_object_privilege(const TSessionId& sessionId,
1935  const std::string& granteeName,
1936  const std::string& objectName,
1937  const TDBObjectType::type objectType,
1938  const TDBObjectPermissions& permissions) {
1939  auto stdlog = STDLOG(get_session_ptr(sessionId));
1940  auto session_ptr = stdlog.getConstSessionInfo();
1941  auto const& cat = session_ptr->getCatalog();
1942  auto const& current_user = session_ptr->get_currentUser();
1943  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1944  current_user.userName, granteeName, false)) {
1946  "Users except superusers can only check privileges for self or roles granted "
1947  "to "
1948  "them.")
1949  }
      // A superuser grantee is reported as having every privilege.
1951  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1952  user_meta.isSuper) {
1953  return true;
1954  }
1955  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1956  if (!grnt) {
1957  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1958  }
1960  std::string func_name;
1961  switch (objectType) {
1964  func_name = "database";
1965  break;
1968  func_name = "table";
1969  break;
1972  func_name = "dashboard";
1973  break;
1976  func_name = "view";
1977  break;
1980  func_name = "server";
1981  break;
1982  default:
1983  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1984  }
1985  DBObject req_object(objectName, type);
1986  req_object.loadKey(cat);
1987 
1988  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1989  if (grantee_object) {
1990  // if grantee has privs on the object
1991  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1992  } else {
1993  // no privileges on that object
1994  return false;
1995  }
1996 }
1997 
1998 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1999  const TSessionId& sessionId,
2000  const std::string& roleName) {
2001  auto stdlog = STDLOG(get_session_ptr(sessionId));
2002  auto session_ptr = stdlog.getConstSessionInfo();
2003  auto const& user = session_ptr->get_currentUser();
2004  if (!user.isSuper &&
2005  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
2006  return;
2007  }
2008  auto* rl = SysCatalog::instance().getGrantee(roleName);
2009  if (rl) {
2010  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
2011  for (auto& dbObject : *rl->getDbObjects(true)) {
2012  if (dbObject.first.dbId != dbId) {
2013  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
2014  // usecase for now, though)
2015  continue;
2016  }
2017  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
2018  TDBObjectsForRole.push_back(tdbObject);
2019  }
2020  } else {
2021  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
2022  }
2023 }
2024 
// Collects into `TDBObjects` the privileges that grantees hold on the object
// `objectName` of Thrift kind `type`, resolved in the session's catalog.
//
// Flow visible in this listing:
//  - `type` is mapped to a DBObjectType; an unknown value raises "unknown object type";
//  - dashboards are keyed by integer id (std::stoi of objectName, or -1 when the
//    name is empty); for a non-empty table/view name the catalog's isView flag
//    decides between view and table;
//  - any std::exception while building/loading the key is reported as
//    "Object with name ... does not exist.";
//  - a superuser caller first gets a synthetic entry named "super";
//  - for every name returned by SysCatalog::getRoles(...), both the object-level
//    entry and the database-level entry (empty object name) are appended when found.
//
// NOTE(review): the `case` labels of the switch, the dashboard assignment
// (listing lines 2039-2040) and one constructor argument (listing line 2084) are
// absent from this listing.
2025 void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
2026  const TSessionId& sessionId,
2027  const std::string& objectName,
2028  const TDBObjectType::type type) {
2029  auto stdlog = STDLOG(get_session_ptr(sessionId));
2030  auto session_ptr = stdlog.getConstSessionInfo();
2031  DBObjectType object_type;
2032  switch (type) {
2034  object_type = DBObjectType::DatabaseDBObjectType;
2035  break;
2037  object_type = DBObjectType::TableDBObjectType;
2038  break;
2041  break;
2043  object_type = DBObjectType::ViewDBObjectType;
2044  break;
2046  object_type = DBObjectType::ServerDBObjectType;
2047  break;
2048  default:
2049  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
2050  ": unknown object type (" + std::to_string(type) + ").");
2051  }
2052  DBObject object_to_find(objectName, object_type);
2053 
2054  // TODO(adb): Use DatabaseLock to protect method
2055  try {
2056  if (object_type == DashboardDBObjectType) {
2057  if (objectName == "") {
2058  object_to_find = DBObject(-1, object_type);
2059  } else {
          // std::stoi throws on a non-numeric name; the catch below turns that
          // into the "does not exist" error.
2060  object_to_find = DBObject(std::stoi(objectName), object_type);
2061  }
2062  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
2063  !objectName.empty()) {
2064  // special handling for view / table
2065  auto const& cat = session_ptr->getCatalog();
2066  auto td = cat.getMetadataForTable(objectName, false);
2067  if (td) {
2068  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
2069  object_to_find = DBObject(objectName, object_type);
2070  }
2071  }
2072  object_to_find.loadKey(session_ptr->getCatalog());
2073  } catch (const std::exception&) {
2074  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
2075  }
2076 
2077  // object type on database level
2078  DBObject object_to_find_dblevel("", object_type);
2079  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
2080  // if user is superuser respond with a full priv
2081  if (session_ptr->get_currentUser().isSuper) {
2082  // using ALL_TABLE here to set max permissions
2083  DBObject dbObj{object_to_find.getObjectKey(),
2085  session_ptr->get_currentUser().userId};
2086  dbObj.setName("super");
2087  TDBObjects.push_back(
2088  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
      // NOTE(review): the `;` after the closing brace below is an empty statement.
2089  };
2090 
2091  std::vector<std::string> grantees =
2092  SysCatalog::instance().getRoles(true,
2093  session_ptr->get_currentUser().isSuper,
2094  session_ptr->get_currentUser().userName);
2095  for (const auto& grantee : grantees) {
      // object_found is assigned inside the conditions below before it is read.
2096  DBObject* object_found;
2097  auto* gr = SysCatalog::instance().getGrantee(grantee);
2098  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
2099  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2100  }
2101  // check object permissions on Database level
2102  if (gr &&
2103  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
2104  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2105  }
2106  }
2107 }
2108 
2109 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
2110  const TSessionId& sessionId,
2111  const std::string& granteeName) {
2112  auto stdlog = STDLOG(get_session_ptr(sessionId));
2113  auto session_ptr = stdlog.getConstSessionInfo();
2114  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
2115  if (grantee) {
2116  if (session_ptr->get_currentUser().isSuper) {
2117  roles = grantee->getRoles();
2118  } else if (grantee->isUser()) {
2119  if (session_ptr->get_currentUser().userName == granteeName) {
2120  roles = grantee->getRoles();
2121  } else {
2123  "Only a superuser is authorized to request list of roles granted to "
2124  "another "
2125  "user.");
2126  }
2127  } else {
2128  CHECK(!grantee->isUser());
2129  // granteeName is actually a roleName here and we can check a role
2130  // only if it is granted to us
2131  if (SysCatalog::instance().isRoleGrantedToGrantee(
2132  session_ptr->get_currentUser().userName, granteeName, false)) {
2133  roles = grantee->getRoles();
2134  } else {
2135  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
2136  }
2137  }
2138  } else {
2139  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
2140  }
2141 }
2142 
namespace {
/**
 * Flattens a table -> column-names map into a single string for logging.
 *
 * Each table contributes ":<table>" followed by ",<column>" for every column, in
 * the map's iteration order; an empty map yields an empty string.
 *
 * The declarator line was missing from the listing; the name is taken from the
 * call site in this file and the return type from `oss.str()`.
 */
std::string dump_table_col_names(
    const std::map<std::string, std::vector<std::string>>& table_col_names) {
  std::ostringstream oss;
  for (const auto& [table_name, col_names] : table_col_names) {
    oss << ":" << table_name;
    for (const auto& col_name : col_names) {
      oss << "," << col_name;
    }
  }
  return oss.str();
}
}  // namespace
2156 
// Forwards a pixel lookup to render_handler_->get_result_row_for_pixel, writing
// the outcome into `_return`.
// All scalar arguments plus a flattened form of `table_col_names` are recorded in
// the request log. Raises via THROW_MAPD_EXCEPTION when render_handler_ is null
// ("Backend rendering is disabled.") or when the render handler throws a
// std::exception (message prefixed with "Exception: ").
// NOTE(review): the line that opens the definition (return type and function name)
// is absent from this listing.
2158  TPixelTableRowResult& _return,
2159  const TSessionId& session,
2160  const int64_t widget_id,
2161  const TPixel& pixel,
2162  const std::map<std::string, std::vector<std::string>>& table_col_names,
2163  const bool column_format,
2164  const int32_t pixel_radius,
2165  const std::string& nonce) {
2166  auto stdlog = STDLOG(get_session_ptr(session),
2167  "widget_id",
2168  widget_id,
2169  "pixel.x",
2170  pixel.x,
2171  "pixel.y",
2172  pixel.y,
2173  "column_format",
2174  column_format,
2175  "pixel_radius",
2176  pixel_radius,
2177  "table_col_names",
2178  dump_table_col_names(table_col_names),
2179  "nonce",
2180  nonce);
2181  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2182  auto session_ptr = stdlog.getSessionInfo();
2183  if (!render_handler_) {
2184  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
2185  }
2186 
2187  try {
2188  render_handler_->get_result_row_for_pixel(_return,
2189  session_ptr,
2190  widget_id,
2191  pixel,
2192  table_col_names,
2193  column_format,
2194  pixel_radius,
2195  nonce);
2196  } catch (std::exception& e) {
2197  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2198  }
2199 }
2200 
// Builds the Thrift TColumnType describing column descriptor `cd`.
// Copies name, source name, id, type, encoding, nullability and array flag; sets
// `size` only for array and kDATE columns; sets precision/scale for non-geo
// columns; fills comp_param either from the dictionary's dictNBits or from the
// column's own comp_param (32 for a days-encoded date whose comp_param is 0).
// NOTE(review): the line that opens the definition, the call made for geo columns
// (listing line 2215) and the first half of the condition that guards the
// dictionary branch (listing line 2222) are absent from this listing.
2202  const ColumnDescriptor* cd) {
2203  TColumnType col_type;
2204  col_type.col_name = cd->columnName;
2205  col_type.src_name = cd->sourceName;
2206  col_type.col_id = cd->columnId;
2207  col_type.col_type.type = type_to_thrift(cd->columnType);
2208  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
2209  col_type.col_type.nullable = !cd->columnType.get_notnull();
2210  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
2211  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
2212  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
2213  }
2214  if (IS_GEO(cd->columnType.get_type())) {
2216  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
2217  } else {
2218  col_type.col_type.precision = cd->columnType.get_precision();
2219  col_type.col_type.scale = cd->columnType.get_scale();
2220  }
2221  col_type.is_system = cd->isSystemCol;
2223  cat != nullptr) {
2224  // have to get the actual size of the encoding from the dictionary definition
2225  const int dict_id = cd->columnType.get_comp_param();
      // NOTE(review): this early return skips the is_reserved_keyword assignment
      // made at the end of the function — confirm that is intended.
2226  if (!cat->getMetadataForDict(dict_id, false)) {
2227  col_type.col_type.comp_param = 0;
2228  return col_type;
2229  }
      // NOTE(review): same lookup as just above; the `!dd` branch can only fire if
      // the second call returns something different from the first.
2230  auto dd = cat->getMetadataForDict(dict_id, false);
2231  if (!dd) {
2232  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
2233  }
2234  col_type.col_type.comp_param = dd->dictNBits;
2235  } else {
2236  col_type.col_type.comp_param =
2237  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
2238  ? 32
2239  : cd->columnType.get_comp_param();
2240  }
2241  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
2242  return col_type;
2243 }
2244 
2245 void DBHandler::get_internal_table_details(TTableDetails& _return,
2246  const TSessionId& session,
2247  const std::string& table_name) {
2248  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2249  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2250  get_table_details_impl(_return, stdlog, table_name, true, false);
2251 }
2252 
2253 void DBHandler::get_table_details(TTableDetails& _return,
2254  const TSessionId& session,
2255  const std::string& table_name) {
2256  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2257  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2258  get_table_details_impl(_return, stdlog, table_name, false, false);
2259 }
2260 
// Shared implementation behind get_table_details / get_internal_table_details.
// Fills `_return` with the row descriptor and table-level properties of
// `table_name` from the session's catalog.
//
//  - Views: the view SQL is parsed and validated to derive the row descriptor. If
//    the accessed-objects privilege check throws std::runtime_error, the view SQL
//    is replaced in the reply by a "[Not enough privileges ...]" placeholder. Any
//    std::exception in this path is rethrown as a runtime_error that embeds the
//    view name, the original message and the query text.
//  - Tables: every column except the deleted-marker column is converted with
//    populateThriftColumnType; `get_system` / `get_physical` are passed through to
//    getAllColumnMetadataForTable.
//  - In both cases missing access privileges produce an "Unable to access ..." error.
// Every std::runtime_error leaving the body is re-raised via THROW_MAPD_EXCEPTION
// with an "Exception: " prefix.
//
// NOTE(review): listing lines 2270, 2287 and 2343 are absent (the expression that
// acquires `td_with_lock`, one parse_to_ra argument, and the result used when
// td->partitions is empty).
2261 void DBHandler::get_table_details_impl(TTableDetails& _return,
2262  query_state::StdLog& stdlog,
2263  const std::string& table_name,
2264  const bool get_system,
2265  const bool get_physical) {
2266  try {
2267  auto session_info = stdlog.getSessionInfo();
2268  auto& cat = session_info->getCatalog();
2269  const auto td_with_lock =
2271  cat, table_name, false);
2272  const auto td = td_with_lock();
2273  CHECK(td);
2274 
2275  bool have_privileges_on_view_sources = true;
2276  if (td->isView) {
2277  auto query_state = create_query_state(session_info, td->viewSQL);
2278  stdlog.setQueryState(query_state);
2279  try {
2280  if (hasTableAccessPrivileges(td, *session_info)) {
2281  // TODO(adb): we should take schema read locks on all tables making up the
2282  // view, at minimum
2283  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
2284  query_state->getQueryStr(),
2285  {},
2286  false,
2288  false);
2289  try {
2290  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2291  query_ra.first);
2292  } catch (const std::runtime_error&) {
          // Not fatal: only the view SQL is hidden from the reply further down.
2293  have_privileges_on_view_sources = false;
2294  }
2295 
2296  const auto result = validate_rel_alg(query_ra.first.plan_result,
2297  query_state->createQueryStateProxy());
2298 
2299  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
2300  } else {
2301  throw std::runtime_error(
2302  "Unable to access view " + table_name +
2303  ". The view may not exist, or the logged in user may not "
2304  "have permission to access the view.");
2305  }
2306  } catch (const std::exception& e) {
2307  throw std::runtime_error("View '" + table_name +
2308  "' query has failed with an error: '" +
2309  std::string(e.what()) +
2310  "'.\nThe view must be dropped and re-created to "
2311  "resolve the error. \nQuery:\n" +
2312  query_state->getQueryStr());
2313  }
2314  } else {
2315  if (hasTableAccessPrivileges(td, *session_info)) {
2316  const auto col_descriptors =
2317  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
2318  const auto deleted_cd = cat.getDeletedColumn(td);
2319  for (const auto cd : col_descriptors) {
2320  if (cd == deleted_cd) {
2321  continue;
2322  }
2323  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
2324  }
2325  } else {
2326  throw std::runtime_error(
2327  "Unable to access table " + table_name +
2328  ". The table may not exist, or the logged in user may not "
2329  "have permission to access the table.");
2330  }
2331  }
2332  _return.fragment_size = td->maxFragRows;
2333  _return.page_size = td->fragPageSize;
2334  _return.max_rows = td->maxRows;
2335  _return.view_sql =
2336  (have_privileges_on_view_sources ? td->viewSQL
2337  : "[Not enough privileges to see the view SQL]");
2338  _return.shard_count = td->nShards;
2339  _return.key_metainfo = td->keyMetainfo;
2340  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
2341  _return.partition_detail =
2342  td->partitions.empty()
2344  : (table_is_replicated(td)
2345  ? TPartitionDetail::REPLICATED
2346  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
2347  : TPartitionDetail::OTHER));
2348  } catch (const std::runtime_error& e) {
2349  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2350  }
2351 }
2352 
2353 void DBHandler::get_link_view(TFrontendView& _return,
2354  const TSessionId& session,
2355  const std::string& link) {
2356  auto stdlog = STDLOG(get_session_ptr(session));
2357  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2358  auto session_ptr = stdlog.getConstSessionInfo();
2359  auto const& cat = session_ptr->getCatalog();
2360  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2361  if (!ld) {
2362  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
2363  }
2364  _return.view_state = ld->viewState;
2365  _return.view_name = ld->link;
2366  _return.update_time = ld->updateTime;
2367  _return.view_metadata = ld->viewMetadata;
2368 }
2369 
// Reports whether the session's current user may access the table described by `td`.
// Superusers always may. Otherwise a DBObject key is loaded from the session's
// catalog and the answer is SysCatalog::hasAnyPrivileges(user, {dbObject}).
// NOTE(review): the line that opens the definition and the declaration of
// `dbObject` (listing line 2380) are absent from this listing, so how `td` feeds
// into `dbObject` is not visible here.
2371  const TableDescriptor* td,
2372  const Catalog_Namespace::SessionInfo& session_info) {
2373  auto& cat = session_info.getCatalog();
2374  auto user_metadata = session_info.get_currentUser();
2375 
2376  if (user_metadata.isSuper) {
2377  return true;
2378  }
2379 
2381  dbObject.loadKey(cat);
2382  std::vector<DBObject> privObjects = {dbObject};
2383 
2384  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2385 }
2386 
2387 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2388  const Catalog_Namespace::SessionInfo& session_info,
2389  const GetTablesType get_tables_type) {
2390  table_names = session_info.getCatalog().getTableNamesForUser(
2391  session_info.get_currentUser(), get_tables_type);
2392 }
2393 
2394 void DBHandler::get_tables(std::vector<std::string>& table_names,
2395  const TSessionId& session) {
2396  auto stdlog = STDLOG(get_session_ptr(session));
2397  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2399  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2400 }
2401 
2402 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2403  const TSessionId& session) {
2404  auto stdlog = STDLOG(get_session_ptr(session));
2405  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2406  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2407 }
2408 
2409 void DBHandler::get_views(std::vector<std::string>& table_names,
2410  const TSessionId& session) {
2411  auto stdlog = STDLOG(get_session_ptr(session));
2412  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2413  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2414 }
2415 
// Appends to `_return` one TTableMeta per table in the session's catalog that the
// current user may access; shard tables (td->shard >= 0) are skipped.
//
//  - Views: the view SQL is parsed and executed in validate-only mode to obtain
//    the column list; physical columns are excluded from both the lists and
//    num_cols. Any std::exception is swallowed with a WARNING log, so a broken
//    view is still reported, with no columns.
//  - Tables: column types and names come from getAllColumnMetadataForTable, with
//    the deleted-marker column left out of the lists.
//
// NOTE(review): for tables, num_cols is col_descriptors.size(), which still counts
// a deleted-marker column if one is present — confirm whether that is intended.
// NOTE(review): the table branch re-checks hasTableAccessPrivileges, which was
// already checked at the top of the loop body.
// NOTE(review): listing lines 2448, 2458, 2463 and 2465 are absent (the
// declaration of `locks`, two execute_rel_alg arguments, and the call that fills
// `result`).
2416 void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
2417  QueryStateProxy query_state_proxy,
2418  const Catalog_Namespace::SessionInfo& session_info,
2419  const bool with_table_locks) {
2420  const auto& cat = session_info.getCatalog();
2421  const auto tables = cat.getAllTableMetadata();
2422  _return.reserve(tables.size());
2423 
2424  for (const auto td : tables) {
2425  if (td->shard >= 0) {
2426  // skip shards, they're not standalone tables
2427  continue;
2428  }
2429  if (!hasTableAccessPrivileges(td, session_info)) {
2430  // skip table, as there are no privileges to access it
2431  continue;
2432  }
2433 
2434  TTableMeta ret;
2435  ret.table_name = td->tableName;
2436  ret.is_view = td->isView;
2437  ret.is_replicated = table_is_replicated(td);
2438  ret.shard_count = td->nShards;
2439  ret.max_rows = td->maxRows;
2440  ret.table_id = td->tableId;
2441 
2442  std::vector<TTypeInfo> col_types;
2443  std::vector<std::string> col_names;
2444  size_t num_cols = 0;
2445  if (td->isView) {
2446  try {
2447  TPlanResult parse_result;
2449  std::tie(parse_result, locks) = parse_to_ra(
2450  query_state_proxy, td->viewSQL, {}, with_table_locks, system_parameters_);
2451  const auto query_ra = parse_result.plan_result;
2452 
2453  ExecutionResult ex_result;
2454  execute_rel_alg(ex_result,
2455  query_state_proxy,
2456  query_ra,
2457  true,
2459  -1,
2460  -1,
2461  /*just_validate=*/true,
2462  /*find_push_down_candidates=*/false,
2464  TQueryResult result;
2466  result, ex_result, query_state_proxy, query_ra, true, -1, -1);
2467  num_cols = result.row_set.row_desc.size();
2468  for (const auto& col : result.row_set.row_desc) {
2469  if (col.is_physical) {
2470  num_cols--;
2471  continue;
2472  }
2473  col_types.push_back(col.col_type);
2474  col_names.push_back(col.col_name);
2475  }
2476  } catch (std::exception& e) {
2477  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
2478  }
2479  } else {
2480  try {
2481  if (hasTableAccessPrivileges(td, session_info)) {
2482  const auto col_descriptors =
2483  cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
2484  const auto deleted_cd = cat.getDeletedColumn(td);
2485  for (const auto cd : col_descriptors) {
2486  if (cd == deleted_cd) {
2487  continue;
2488  }
2489  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2490  col_names.push_back(cd->columnName);
2491  }
2492  num_cols = col_descriptors.size();
2493  } else {
2494  continue;
2495  }
2496  } catch (const std::runtime_error& e) {
2497  THROW_MAPD_EXCEPTION(e.what());
2498  }
2499  }
2500 
2501  ret.num_cols = num_cols;
2502  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2503  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2504 
2505  _return.push_back(ret);
2506  }
2507 }
2508 
// Thrift entry point that fills `_return` with metadata for the tables visible to
// the session's user by delegating to get_tables_meta_impl.
// A query state with empty query text is created and attached to the request log,
// and a shared lock is taken before delegating. Any std::exception from the
// implementation is re-raised via THROW_MAPD_EXCEPTION with the same message.
// NOTE(review): the arguments of the shared lock (listing lines 2518-2519) are
// absent from this listing, so which mutex it guards is not visible here.
2509 void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
2510  const TSessionId& session) {
2511  auto stdlog = STDLOG(get_session_ptr(session));
2512  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2513  auto session_ptr = stdlog.getConstSessionInfo();
2514  auto query_state = create_query_state(session_ptr, "");
2515  stdlog.setQueryState(query_state);
2516 
2517  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
2520 
2521  try {
2522  get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
2523  } catch (const std::exception& e) {
2524  THROW_MAPD_EXCEPTION(e.what());
2525  }
2526 }
2527 
2528 void DBHandler::get_users(std::vector<std::string>& user_names,
2529  const TSessionId& session) {
2530  auto stdlog = STDLOG(get_session_ptr(session));
2531  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2532  auto session_ptr = stdlog.getConstSessionInfo();
2533  std::list<Catalog_Namespace::UserMetadata> user_list;
2534 
2535  if (!session_ptr->get_currentUser().isSuper) {
2536  user_list = SysCatalog::instance().getAllUserMetadata(
2537  session_ptr->getCatalog().getCurrentDB().dbId);
2538  } else {
2539  user_list = SysCatalog::instance().getAllUserMetadata();
2540  }
2541  for (auto u : user_list) {
2542  user_names.push_back(u.userName);
2543  }
2544 }
2545 
2546 void DBHandler::get_version(std::string& version) {
2547  version = MAPD_RELEASE;
2548 }
2549 
// Superuser-only request to clear GPU memory.
// A non-superuser caller is rejected via THROW_MAPD_EXCEPTION. A std::exception
// raised inside the try block is re-raised via THROW_MAPD_EXCEPTION with the same
// message. When a render handler exists, its clear_gpu_memory() is invoked too.
// NOTE(review): the statement inside the try block (listing line 2558) and the
// statement executed when leaf_aggregator_.leafCount() > 0 (listing line 2567) are
// absent from this listing.
2550 void DBHandler::clear_gpu_memory(const TSessionId& session) {
2551  auto stdlog = STDLOG(get_session_ptr(session));
2552  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2553  auto session_ptr = stdlog.getConstSessionInfo();
2554  if (!session_ptr->get_currentUser().isSuper) {
2555  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2556  }
2557  try {
2559  } catch (const std::exception& e) {
2560  THROW_MAPD_EXCEPTION(e.what());
2561  }
2562  if (render_handler_) {
2563  render_handler_->clear_gpu_memory();
2564  }
2565 
2566  if (leaf_aggregator_.leafCount() > 0) {
2568  }
2569 }
2570 
// Superuser-only request to clear CPU memory.
// A non-superuser caller is rejected via THROW_MAPD_EXCEPTION. A std::exception
// raised inside the try block is re-raised via THROW_MAPD_EXCEPTION with the same
// message. When a render handler exists, its clear_cpu_memory() is invoked too.
// NOTE(review): the statement inside the try block (listing line 2579) and the
// statement executed when leaf_aggregator_.leafCount() > 0 (listing line 2588) are
// absent from this listing.
2571 void DBHandler::clear_cpu_memory(const TSessionId& session) {
2572  auto stdlog = STDLOG(get_session_ptr(session));
2573  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2574  auto session_ptr = stdlog.getConstSessionInfo();
2575  if (!session_ptr->get_currentUser().isSuper) {
2576  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2577  }
2578  try {
2580  } catch (const std::exception& e) {
2581  THROW_MAPD_EXCEPTION(e.what());
2582  }
2583  if (render_handler_) {
2584  render_handler_->clear_cpu_memory();
2585  }
2586 
2587  if (leaf_aggregator_.leafCount() > 0) {
2589  }
2590 }
2591 
// Enrolls `parent_session` with the executor as a RUNNING query session (label and
// start time attached) and, when this node has leaves, forwards the same
// registration to them through leaf_aggregator_.
// `leaf_session` is only used to resolve and log the calling session.
// NOTE(review): the declaration of `executor` (listing line 2601) and one
// enrollQuerySession argument (listing line 2605) are absent from this listing;
// `session_ptr` is not referenced in the lines that are visible.
2592 void DBHandler::set_cur_session(const TSessionId& parent_session,
2593  const TSessionId& leaf_session,
2594  const std::string& start_time_str,
2595  const std::string& label) {
2596  // internal API to manage query interruption in distributed mode
2597  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2598  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2599  auto session_ptr = stdlog.getConstSessionInfo();
2600 
2602  executor->enrollQuerySession(parent_session,
2603  label,
2604  start_time_str,
2606  QuerySessionStatus::QueryStatus::RUNNING);
2607  if (leaf_aggregator_.leafCount() > 0) {
2608  leaf_aggregator_.set_cur_session(parent_session, start_time_str, label);
2609  }
2610 }
2611 
// Clears the executor's query-session status for `parent_session` /
// `start_time_str` and, when this node has leaves, forwards the invalidation to
// them through leaf_aggregator_.
// `leaf_session` is only used to resolve and log the calling session.
// NOTE(review): the declaration of `executor` (listing line 2619) is absent from
// this listing.
2612 void DBHandler::invalidate_cur_session(const TSessionId& parent_session,
2613  const TSessionId& leaf_session,
2614  const std::string& start_time_str,
2615  const std::string& label) {
2616  // internal API to manage query interruption in distributed mode
2617  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2618  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2620  executor->clearQuerySessionStatus(parent_session, start_time_str, false);
2621  if (leaf_aggregator_.leafCount() > 0) {
2622  leaf_aggregator_.invalidate_cur_session(parent_session, start_time_str, label);
2623  }
2624 }
2625 
// Returns the INVALID_SESSION_ID constant.
// NOTE(review): the line that opens the definition (return type and function name)
// is absent from this listing.
2627  return INVALID_SESSION_ID;
2628 }
2629 
// Reports buffer-pool usage into `_return`.
// `memory_level` equal to "gpu" selects GPU_LEVEL memory info from the DataMgr;
// any other value selects CPU_LEVEL. Each MemoryInfo becomes one TNodeMemoryInfo
// whose node_memory_data lists slab, page range, touch counter, chunk key and
// free/used state per entry. host_name is filled only when this node has leaves.
// When leaves exist, their reports are inserted in front of the local entries.
// NOTE(review): listing lines 2638 and 2642 are absent; they presumably assign
// `mem_level`, which is later passed to getLeafMemoryInfo — confirm upstream.
2630 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2631  const TSessionId& session,
2632  const std::string& memory_level) {
2633  auto stdlog = STDLOG(get_session_ptr(session));
2634  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2635  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2636  Data_Namespace::MemoryLevel mem_level;
2637  if (!memory_level.compare("gpu")) {
2639  internal_memory =
2640  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2641  } else {
2643  internal_memory =
2644  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2645  }
2646 
      // NOTE(review): both loops below iterate by value, copying each element.
2647  for (auto memInfo : internal_memory) {
2648  TNodeMemoryInfo nodeInfo;
2649  if (leaf_aggregator_.leafCount() > 0) {
2650  nodeInfo.host_name = omnisci::get_hostname();
2651  }
2652  nodeInfo.page_size = memInfo.pageSize;
2653  nodeInfo.max_num_pages = memInfo.maxNumPages;
2654  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2655  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2656  for (auto gpu : memInfo.nodeMemoryData) {
2657  TMemoryData md;
2658  md.slab = gpu.slabNum;
2659  md.start_page = gpu.startPage;
2660  md.num_pages = gpu.numPages;
2661  md.touch = gpu.touch;
2662  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2663  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2664  nodeInfo.node_memory_data.push_back(md);
2665  }
2666  _return.push_back(nodeInfo);
2667  }
2668  if (leaf_aggregator_.leafCount() > 0) {
2669  std::vector<TNodeMemoryInfo> leafSummary =
2670  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
2671  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2672  }
2673 }
2674 
// Appends one TDBInfo (database name and owner name) to dbinfos for every
// entry of dbs, using the current user of the given session.
2675 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
2676  auto stdlog = STDLOG(get_session_ptr(session));
2677  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2678  auto session_ptr = stdlog.getConstSessionInfo();
2679  const auto& user = session_ptr->get_currentUser();
// NOTE(review): the declaration of dbs is not visible in this listing (listing
// line 2680 is absent); presumably it is initialised from the call below -
// confirm against the full source.
2681  SysCatalog::instance().getDatabaseListForUser(user);
// db is a non-const reference, so the name/owner strings are moved out of dbs
// rather than copied.
2682  for (auto& db : dbs) {
2683  TDBInfo dbinfo;
2684  dbinfo.db_name = std::move(db.dbName);
2685  dbinfo.db_owner = std::move(db.dbOwnerName);
2686  dbinfos.push_back(std::move(dbinfo));
2687  }
2688 }
2689 
2690 void DBHandler::set_execution_mode(const TSessionId& session,
2691  const TExecuteMode::type mode) {
2692  auto stdlog = STDLOG(get_session_ptr(session));
2693  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2694  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
2695  auto session_it = get_session_it_unsafe(session, write_lock);
2696  if (leaf_aggregator_.leafCount() > 0) {
2697  leaf_aggregator_.set_execution_mode(session, mode);
2698  try {
2699  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2700  } catch (const TOmniSciException& e) {
2701  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2702  }
2703  return;
2704  }
2705  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2706 }
2707 
2708 namespace {
2709 
2711  if (td && td->nShards) {
2712  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2713  }
2714 }
2715 
2716 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
2717  const std::vector<std::string>& column_names) {
2718  std::unordered_set<std::string> unique_names;
2719  for (const auto& name : column_names) {
2720  auto lower_name = to_lower(name);
2721  if (unique_names.find(lower_name) != unique_names.end()) {
2722  THROW_MAPD_EXCEPTION("Column " + name + " is mentioned multiple times");
2723  } else {
2724  unique_names.insert(lower_name);
2725  }
2726  }
2727  for (const auto& cd : descs) {
2728  auto iter = unique_names.find(to_lower(cd->columnName));
2729  if (iter != unique_names.end()) {
2730  unique_names.erase(iter);
2731  }
2732  }
2733  if (!unique_names.empty()) {
2734  THROW_MAPD_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
2735  }
2736 }
2737 
2738 // Return vector of IDs mapping column descriptors to the list of comumn names.
2739 // The size of the vector is the number of actual columns (geophisical columns excluded).
2740 // ID is either a position in column_names matching the descriptor, or -1 if the column
2741 // is missing from the column_names
2742 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
2743  const std::vector<std::string>& column_names) {
2744  std::vector<int> desc_to_column_ids;
2745  if (column_names.empty()) {
2746  int col_idx = 0;
2747  for (const auto& cd : descs) {
2748  if (!cd->isGeoPhyCol) {
2749  desc_to_column_ids.push_back(col_idx);
2750  ++col_idx;
2751  }
2752  }
2753  } else {
2754  for (const auto& cd : descs) {
2755  if (!cd->isGeoPhyCol) {
2756  bool found = false;
2757  for (size_t j = 0; j < column_names.size(); ++j) {
2758  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
2759  found = true;
2760  desc_to_column_ids.push_back(j);
2761  break;
2762  }
2763  }
2764  if (!found) {
2765  if (!cd->columnType.get_notnull()) {
2766  desc_to_column_ids.push_back(-1);
2767  } else {
2768  THROW_MAPD_EXCEPTION("Column '" + cd->columnName +
2769  "' cannot be omitted due to NOT NULL constraint");
2770  }
2771  }
2772  }
2773  }
2774  }
2775  return desc_to_column_ids;
2776 }
2777 
2778 } // namespace
2779 
2780 void DBHandler::load_table_binary(const TSessionId& session,
2781  const std::string& table_name,
2782  const std::vector<TRow>& rows,
2783  const std::vector<std::string>& column_names) {
2784  try {
2785  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2786  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2787  auto session_ptr = stdlog.getConstSessionInfo();
2788 
2789  if (rows.empty()) {
2790  THROW_MAPD_EXCEPTION("No rows to insert");
2791  }
2792 
2793  std::unique_ptr<import_export::Loader> loader;
2794  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2795  auto schema_read_lock = prepare_loader_generic(*session_ptr,
2796  table_name,
2797  rows.front().cols.size(),
2798  &loader,
2799  &import_buffers,
2800  column_names,
2801  "load_table_binary");
2802 
2803  auto col_descs = loader->get_column_descs();
2804  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
2805 
2806  TDatum empty_value;
2807  empty_value.is_null = true;
2808  size_t rows_completed = 0;
2809  for (auto const& row : rows) {
2810  size_t col_idx = 0;
2811  try {
2812  for (auto cd : col_descs) {
2813  int mapped_idx = desc_id_to_column_id[col_idx];
2814  if (mapped_idx != -1) {
2815  import_buffers[col_idx]->add_value(
2816  cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
2817  } else {
2818  import_buffers[col_idx]->add_value(cd, empty_value, true);
2819  }
2820  col_idx++;
2821  }
2822  rows_completed++;
2823  } catch (const std::exception& e) {
2824  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2825  import_buffers[col_idx_to_pop]->pop_value();
2826  }
2827  LOG(ERROR) << "Input exception thrown: " << e.what()
2828  << ". Row discarded, issue at column : " << (col_idx + 1)
2829  << " data :" << row;
2830  }
2831  }
2832  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2833  session_ptr->getCatalog(), table_name);
2834  if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
2835  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
2836  }
2837  } catch (const std::exception& e) {
2838  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2839  }
2840 }
2841 
// Common setup shared by the load_table* entry points in this file (they call it
// as prepare_loader_generic): validates the request, creates the loader and the
// per-column import buffers, and returns the table schema lock container.
//
// NOTE(review): the line carrying the function name is absent from this listing
// (listing line 2843), as are listing lines 2858, 2865 and 2891 below.
//
// session_info   - session whose catalog and privileges are used.
// table_name     - destination table.
// num_cols       - number of columns supplied by the caller; must be non-zero.
// loader         - out: reset to a DistributedLoader when leaves are attached,
//                  otherwise to an import_export::Loader.
// import_buffers - out: assigned the result of setup_column_loaders().
// column_names   - optional destination column names (may be empty).
// load_type      - label passed to check_read_only().
2842 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
2844  const Catalog_Namespace::SessionInfo& session_info,
2845  const std::string& table_name,
2846  size_t num_cols,
2847  std::unique_ptr<import_export::Loader>* loader,
2848  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
2849  const std::vector<std::string>& column_names,
2850  std::string load_type) {
2851  if (num_cols == 0) {
2852  THROW_MAPD_EXCEPTION("No columns to insert");
2853  }
2854  check_read_only(load_type);
2855  auto& cat = session_info.getCatalog();
// NOTE(review): the expression that produces the lock container's constructor
// argument is cut off here (listing line 2858 absent); only its trailing
// arguments "cat, table_name, true" are visible.
2856  auto td_with_lock =
2857  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
2859  cat, table_name, true));
2860  const auto td = (*td_with_lock)();
2861  CHECK(td);
2862 
2863  if (g_cluster && !leaf_aggregator_.leafCount()) {
2864  // Sharded table rows need to be routed to the leaf by an aggregator.
// NOTE(review): the statement inside this branch is absent from the listing
// (listing line 2865); presumably it rejects sharded tables - confirm.
2866  }
2867  check_table_load_privileges(session_info, table_name);
2868 
2869  if (leaf_aggregator_.leafCount() > 0) {
2870  loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
2871  } else {
2872  loader->reset(new import_export::Loader(cat, td));
2873  }
2874 
2875  auto col_descs = (*loader)->get_column_descs();
2876  check_valid_column_names(col_descs, column_names);
2877  if (column_names.empty()) {
2878  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2879  // Subtracting 1 (rowid) until TableDescriptor is updated.
// Expected count = nColumns minus geo physical columns, minus 1, or minus 2 when
// the table has a deleted column.
2880  auto geo_physical_cols = std::count_if(
2881  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2882  const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
2883  (td->hasDeletedCol ? 2 : 1);
2884  if (num_cols != num_table_cols) {
2885  throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
2886  ") does not match number of columns in table " +
2887  td->tableName + " (" + std::to_string(num_table_cols) +
2888  ")");
2889  }
2890  } else if (num_cols != column_names.size()) {
// NOTE(review): the call that consumes this message is absent from the listing
// (listing line 2891); presumably an exception is raised with it - confirm.
2892  "Number of columns specified does not match the "
2893  "number of columns given (" +
2894  std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
2895  }
2896 
2897  *import_buffers = import_export::setup_column_loaders(td, loader->get());
// The lock container is handed to the caller, which keeps it alive for the load.
2898  return std::move(td_with_lock);
2899 }
2900 namespace {
2901 
2902 size_t get_column_size(const TColumn& column) {
2903  if (!column.nulls.empty()) {
2904  return column.nulls.size();
2905  } else {
2906  // it is a very bold estimate but later we check it against REAL data
2907  // and if this function returns a wrong result (e.g. both int and string
2908  // vectors are filled with values), we get an error
2909  return column.data.int_col.size() + column.data.arr_col.size() +
2910  column.data.real_col.size() + column.data.str_col.size();
2911  }
2912 }
2913 
2914 } // namespace
2915 
// Columnar binary load entry point. Forwards session, table_name, cols and
// column_names unchanged, together with AssignRenderGroupsMode::kNone.
// NOTE(review): the line naming the callee is absent from this listing
// (listing line 2920); only the argument list of the forwarded call is visible.
2916 void DBHandler::load_table_binary_columnar(const TSessionId& session,
2917  const std::string& table_name,
2918  const std::vector<TColumn>& cols,
2919  const std::vector<std::string>& column_names) {
2921  session, table_name, cols, column_names, AssignRenderGroupsMode::kNone);
2922 }
2923 
2925  const TSessionId& session,
2926  const std::string& table_name,
2927  const std::vector<TColumn>& cols,
2928  const std::vector<std::string>& column_names,
2929  const bool assign_render_groups) {
2931  table_name,
2932  cols,
2933  column_names,
2934  assign_render_groups
2937 }
2938 
2940  const TSessionId& session,
2941  const std::string& table_name,
2942  const std::vector<TColumn>& cols,
2943  const std::vector<std::string>& column_names,
2944  const AssignRenderGroupsMode assign_render_groups_mode) {
2945  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2946  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2947  auto session_ptr = stdlog.getConstSessionInfo();
2948 
2949  if (assign_render_groups_mode == AssignRenderGroupsMode::kCleanUp) {
2950  // throw if the user tries to pass column data on a clean-up
2951  if (cols.size()) {
2953  "load_table_binary_columnar_polys: Column data must be empty when called with "
2954  "assign_render_groups = false");
2955  }
2956 
2957  // mutex the map access
2958  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
2959 
2960  // drop persistent render group assignment data for this session and table
2961  // keep the per-session map in case other tables are active (ideally not)
2962  auto itr_session = render_group_assignment_map_.find(session);
2963  if (itr_session != render_group_assignment_map_.end()) {
2964  LOG(INFO) << "load_table_binary_columnar_polys: Cleaning up Render Group "
2965  "Assignment Persistent Data for Session '"
2966  << session << "', Table '" << table_name << "'";
2967  itr_session->second.erase(table_name);
2968  }
2969 
2970  // just doing clean-up, so we're done
2971  return;
2972  }
2973 
2974  std::unique_ptr<import_export::Loader> loader;
2975  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2976  auto schema_read_lock = prepare_loader_generic(*session_ptr,
2977  table_name,
2978  cols.size(),
2979  &loader,
2980  &import_buffers,
2981  column_names,
2982  "load_table_binary_columnar");
2983 
2984  auto desc_id_to_column_id =
2985  column_ids_by_names(loader->get_column_descs(), column_names);
2986  size_t num_rows = get_column_size(cols.front());
2987  size_t import_idx = 0; // index into the TColumn vector being loaded
2988  size_t col_idx = 0; // index into column description vector
2989  TColumn empty_column;
2990  empty_column.nulls.resize(num_rows, true);
2991  empty_column.data.arr_col.resize(num_rows);
2992  empty_column.data.int_col.resize(num_rows);
2993  empty_column.data.str_col.resize(num_rows);
2994  empty_column.data.real_col.resize(num_rows);
2995  try {
2996  size_t skip_physical_cols = 0;
2997  for (auto cd : loader->get_column_descs()) {
2998  if (skip_physical_cols > 0) {
2999  if (!cd->isGeoPhyCol) {
3000  throw std::runtime_error("Unexpected physical column");
3001  }
3002  skip_physical_cols--;
3003  continue;
3004  }
3005  int mapped_idx = desc_id_to_column_id[import_idx];
3006  const TColumn& value = mapped_idx == -1 ? empty_column : cols[mapped_idx];
3007  size_t col_rows = import_buffers[col_idx]->add_values(cd, value);
3008  if (col_rows != num_rows) {
3009  std::ostringstream oss;
3010  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
3011  << cd->columnName << " , expecting " << num_rows << " rows, column "
3012  << col_idx << " has " << col_rows << " rows";
3013  THROW_MAPD_EXCEPTION(oss.str());
3014  }
3015  // Advance to the next column in the table
3016  col_idx++;
3017 
3018  // For geometry columns: process WKT strings and fill physical columns
3019  if (cd->columnType.is_geometry()) {
3020  auto geo_col_idx = col_idx - 1;
3021  const auto wkt_or_wkb_hex_column =
3022  import_buffers[geo_col_idx]->getGeoStringBuffer();
3023  std::vector<std::vector<double>> coords_column, bounds_column;
3024  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
3025  std::vector<int> render_groups_column;
3026  SQLTypeInfo ti = cd->columnType;
3027  if (num_rows != wkt_or_wkb_hex_column->size() ||
3028  !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
3029  ti,
3030  coords_column,
3031  bounds_column,
3032  ring_sizes_column,
3033  poly_rings_column,
3034  false)) {
3035  std::ostringstream oss;
3036  oss << "load_table_binary_columnar: Invalid geometry in column "
3037  << cd->columnName;
3038  THROW_MAPD_EXCEPTION(oss.str());
3039  }
3040 
3041  // start or continue assigning render groups for poly columns?
3042  if (IS_GEO_POLY(cd->columnType.get_type()) &&
3043  assign_render_groups_mode == AssignRenderGroupsMode::kAssign) {
3044  // get RGA to use
3045  import_export::RenderGroupAnalyzer* render_group_analyzer{};
3046  {
3047  // mutex the map access
3048  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
3049 
3050  // emplace new RGA or fetch existing RGA from map
3051  auto [itr_table, emplaced_table] = render_group_assignment_map_.try_emplace(
3052  session, RenderGroupAssignmentTableMap());
3053  LOG_IF(INFO, emplaced_table)
3054  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
3055  "Persistent Data for Session '"
3056  << session << "'";
3057  auto [itr_column, emplaced_column] = itr_table->second.try_emplace(
3058  table_name, RenderGroupAssignmentColumnMap());
3059  LOG_IF(INFO, emplaced_column)
3060  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
3061  "Persistent Data for Table '"
3062  << table_name << "'";
3063  auto [itr_analyzer, emplaced_analyzer] = itr_column->second.try_emplace(
3064  cd->columnName, std::make_unique<import_export::RenderGroupAnalyzer>());
3065  LOG_IF(INFO, emplaced_analyzer)
3066  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
3067  "Persistent Data for Column '"
3068  << cd->columnName << "'";
3069  render_group_analyzer = itr_analyzer->second.get();
3070  CHECK(render_group_analyzer);
3071 
3072  // seed new RGA from existing table/column, to handle appends
3073  if (emplaced_analyzer) {
3074  LOG(INFO) << "load_table_binary_columnar_polys: Seeding Render Groups from "
3075  "existing table...";
3076  render_group_analyzer->seedFromExistingTableContents(
3077  session_ptr->getCatalog(), table_name, cd->columnName);
3078  LOG(INFO) << "load_table_binary_columnar_polys: Done";
3079  }
3080  }
3081 
3082  // assign render groups for this set of bounds
3083  LOG(INFO) << "load_table_binary_columnar_polys: Assigning Render Groups...";
3084  render_groups_column.reserve(bounds_column.size());
3085  for (auto const& bounds : bounds_column) {
3086  CHECK_EQ(bounds.size(), 4u);
3087  int rg = render_group_analyzer->insertBoundsAndReturnRenderGroup(bounds);
3088  render_groups_column.push_back(rg);
3089  }
3090  LOG(INFO) << "load_table_binary_columnar_polys: Done";
3091  } else {
3092  // render groups all zero
3093  render_groups_column.resize(bounds_column.size(), 0);
3094  }
3095 
3096  // Populate physical columns, advance col_idx
3098  session_ptr->getCatalog(),
3099  cd,
3100  import_buffers,
3101  col_idx,
3102  coords_column,
3103  bounds_column,
3104  ring_sizes_column,
3105  poly_rings_column,
3106  render_groups_column);
3107  skip_physical_cols = cd->columnType.get_physical_cols();
3108  }
3109  // Advance to the next column of values being loaded
3110  import_idx++;
3111  }
3112  } catch (const std::exception& e) {
3113  std::ostringstream oss;
3114  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
3115  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3116  THROW_MAPD_EXCEPTION(oss.str());
3117  }
3118  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3119  session_ptr->getCatalog(), table_name);
3120  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3121  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3122  }
3123 }
3124 
3125 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
3126 
// Evaluates the Arrow expression `s` exactly once; if the resulting
// ::arrow::Status is not OK, logs its text and throws a TOmniSciException whose
// error_msg carries that text. The log line uses the captured status, so the
// argument expression is never re-evaluated on the error path.
#define ARROW_THRIFT_THROW_NOT_OK(s) \
  do {                               \
    ::arrow::Status _s = (s);        \
    if (UNLIKELY(!_s.ok())) {        \
      TOmniSciException ex;          \
      ex.error_msg = _s.ToString();  \
      LOG(ERROR) << ex.error_msg;    \
      throw ex;                      \
    }                                \
  } while (0)
3137 
3138 namespace {
3139 
3140 RecordBatchVector loadArrowStream(const std::string& stream) {
3141  RecordBatchVector batches;
3142  try {
3143  // TODO(wesm): Make this simpler in general, see ARROW-1600
3144  auto stream_buffer =
3145  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
3146  static_cast<int64_t>(stream.size()));
3147 
3148  arrow::io::BufferReader buf_reader(stream_buffer);
3149  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3150  ARROW_ASSIGN_OR_THROW(batch_reader,
3151  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3152 
3153  while (true) {
3154  std::shared_ptr<arrow::RecordBatch> batch;
3155  // Read batch (zero-copy) from the stream
3156  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
3157  if (batch == nullptr) {
3158  break;
3159  }
3160  batches.emplace_back(std::move(batch));
3161  }
3162  } catch (const std::exception& e) {
3163  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
3164  }
3165  return batches;
3166 }
3167 
3168 } // namespace
3169 
3170 void DBHandler::load_table_binary_arrow(const TSessionId& session,
3171  const std::string& table_name,
3172  const std::string& arrow_stream,
3173  const bool use_column_names) {
3174  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3175  auto session_ptr = stdlog.getConstSessionInfo();
3176 
3177  RecordBatchVector batches = loadArrowStream(arrow_stream);
3178  // Assuming have one batch for now
3179  if (batches.size() != 1) {
3180  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
3181  }
3182  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
3183  std::unique_ptr<import_export::Loader> loader;
3184  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3185  std::vector<std::string> column_names;
3186  if (use_column_names) {
3187  column_names = batch->schema()->field_names();
3188  }
3189  auto schema_read_lock =
3190  prepare_loader_generic(*session_ptr,
3191  table_name,
3192  static_cast<size_t>(batch->num_columns()),
3193  &loader,
3194  &import_buffers,
3195  column_names,
3196  "load_table_binary_arrow");
3197 
3198  auto desc_id_to_column_id =
3199  column_ids_by_names(loader->get_column_descs(), column_names);
3200  size_t num_rows = 0;
3201  size_t col_idx = 0;
3202  std::shared_ptr<arrow::Array> empty_array;
3203  {
3204  arrow::BooleanBuilder builder;
3205  ARROW_THROW_NOT_OK(builder.Resize(batch->num_rows()));
3206  ARROW_THROW_NOT_OK(builder.AppendNulls(batch->num_rows()));
3207  auto status = builder.Finish(&empty_array);
3208  if (!status.ok()) {
3209  THROW_MAPD_EXCEPTION("Failed to load data: " + status.message());
3210  }
3211  }
3212 
3213  try {
3214  for (auto cd : loader->get_column_descs()) {
3215  int mapped_idx = desc_id_to_column_id[col_idx];
3216  if (mapped_idx != -1) {
3217  auto& array = *batch->column(mapped_idx);
3218  import_export::ArraySliceRange row_slice(0, array.length());
3219  num_rows = import_buffers[col_idx]->add_arrow_values(
3220  cd, array, true, row_slice, nullptr);
3221  } else {
3222  import_export::ArraySliceRange row_slice(0, empty_array->length());
3223  num_rows = import_buffers[col_idx]->add_arrow_values(
3224  cd, *empty_array, false, row_slice, nullptr);
3225  }
3226  col_idx++;
3227  }
3228  } catch (const std::exception& e) {
3229  LOG(ERROR) << "Input exception thrown: " << e.what()
3230  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3231  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
3232  // other import paths
3233  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3234  }
3235  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3236  session_ptr->getCatalog(), table_name);
3237  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3238  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3239  }
3240 }
3241 
// Row-wise load of string-valued rows (TStringRow) into table_name.
//
// session      - session id used for logging, catalog access and the load.
// table_name   - destination table.
// rows         - rows to insert; must not be empty. The column count is taken
//                from the first row.
// column_names - optional destination column names; when empty the row fields
//                are matched to table columns positionally.
//
// For geometry columns the string value is additionally parsed and the column's
// physical columns are populated. Any std::exception escaping the body is
// rethrown through THROW_MAPD_EXCEPTION with an "Exception: " prefix.
3242 void DBHandler::load_table(const TSessionId& session,
3243  const std::string& table_name,
3244  const std::vector<TStringRow>& rows,
3245  const std::vector<std::string>& column_names) {
3246  try {
3247  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3248  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3249  auto session_ptr = stdlog.getConstSessionInfo();
3250 
3251  if (rows.empty()) {
3252  THROW_MAPD_EXCEPTION("No rows to insert");
3253  }
3254 
3255  std::unique_ptr<import_export::Loader> loader;
3256  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3257  auto schema_read_lock =
3258  prepare_loader_generic(*session_ptr,
3259  table_name,
3260  static_cast<size_t>(rows.front().cols.size()),
3261  &loader,
3262  &import_buffers,
3263  column_names,
3264  "load_table");
3265 
3266  auto col_descs = loader->get_column_descs();
3267  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
// Default-constructed copy parameters are used for the string conversion.
3268  import_export::CopyParams copy_params;
3269  size_t rows_completed = 0;
3270  for (auto const& row : rows) {
3271  size_t import_idx = 0; // index into the TStringRow being loaded
3272  size_t col_idx = 0; // index into column description vector
3273  try {
// Number of descriptors still to skip because they are physical columns of the
// geometry column handled just before.
3274  size_t skip_physical_cols = 0;
3275  for (auto cd : col_descs) {
3276  if (skip_physical_cols > 0) {
3277  if (!cd->isGeoPhyCol) {
3278  throw std::runtime_error("Unexpected physical column");
3279  }
3280  skip_physical_cols--;
3281  continue;
3282  }
// A column missing from column_names (mapped_idx == -1) is loaded as NULL with
// an empty string value.
3283  const std::string empty_val = "";
3284  int mapped_idx = desc_id_to_column_id[import_idx];
3285  const std::string& value =
3286  mapped_idx == -1 ? empty_val : row.cols[mapped_idx].str_val;
3287  bool is_null = mapped_idx == -1 || row.cols[mapped_idx].is_null;
3288  import_buffers[col_idx]->add_value(cd, value, is_null, copy_params);
3289  // Advance to the next column within the table
3290  col_idx++;
3291 
3292  if (cd->columnType.is_geometry()) {
3293  // Populate physical columns
3294  std::vector<double> coords, bounds;
3295  std::vector<int> ring_sizes, poly_rings;
3296  int render_group = 0;
3297  SQLTypeInfo ti{cd->columnType};
// NOTE(review): the head of the condition below is absent from this listing
// (listing line 3298); only the arguments of the call it wraps are visible. A
// NULL value is passed on as an empty string.
3299  !is_null ? value : std::string(),
3300  ti,
3301  coords,
3302  bounds,
3303  ring_sizes,
3304  poly_rings,
3305  false)) {
3306  throw std::runtime_error("Invalid geometry");
3307  }
3308  if (cd->columnType.get_type() != ti.get_type()) {
3309  throw std::runtime_error("Geometry type mismatch");
3310  }
// NOTE(review): the name of the call that receives these arguments is absent
// from this listing (listing line 3311); per the comment above it populates the
// physical columns.
3312  session_ptr->getCatalog(),
3313  cd,
3314  import_buffers,
3315  col_idx,
3316  coords,
3317  bounds,
3318  ring_sizes,
3319  poly_rings,
3320  render_group);
3321  skip_physical_cols = cd->columnType.get_physical_cols();
3322  }
3323  // Advance to the next field within the row
3324  import_idx++;
3325  }
3326  rows_completed++;
3327  } catch (const std::exception& e) {
// Pop the values already buffered for the failing row, log it, then abort the
// whole request: despite the "Row discarded" wording the exception is rethrown.
3328  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
3329  import_buffers[col_idx_to_pop]->pop_value();
3330  }
3331  LOG(ERROR) << "Input exception thrown: " << e.what()
3332  << ". Row discarded, issue at column : " << (col_idx + 1)
3333  << " data :" << row;
3334  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3335  }
3336  }
3337  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3338  session_ptr->getCatalog(), table_name);
// Only the rows that were fully buffered are handed to the loader.
3339  if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3340  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3341  }
3342 
3343  } catch (const std::exception& e) {
3344  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
3345  }
3346 }
3347 
3348 char DBHandler::unescape_char(std::string str) {
3349  char out = str[0];
3350  if (str.size() == 2 && str[0] == '\\') {
3351  if (str[1] == 't') {
3352  out = '\t';
3353  } else if (str[1] == 'n') {
3354  out = '\n';
3355  } else if (str[1] == '0') {
3356  out = '\0';
3357  } else if (str[1] == '\'') {
3358  out = '\'';
3359  } else if (str[1] == '\\') {
3360  out = '\\';
3361  }
3362  }
3363  return out;
3364 }
3365 
3367  import_export::CopyParams copy_params;
3368  switch (cp.has_header) {
3369  case TImportHeaderRow::AUTODETECT:
3371  break;
3372  case TImportHeaderRow::NO_HEADER:
3374  break;
3375  case TImportHeaderRow::HAS_HEADER:
3377  break;
3378  default:
3379  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
3380  std::to_string((int)cp.has_header));
3381  break;
3382  }
3383  copy_params.quoted = cp.quoted;
3384  if (cp.delimiter.length() > 0) {
3385  copy_params.delimiter = unescape_char(cp.delimiter);
3386  } else {
3387  copy_params.delimiter = '\0';
3388  }
3389  if (cp.null_str.length() > 0) {
3390  copy_params.null_str = cp.null_str;
3391  }
3392  if (cp.quote.length() > 0) {
3393  copy_params.quote = unescape_char(cp.quote);
3394  }
3395  if (cp.escape.length() > 0) {
3396  copy_params.escape = unescape_char(cp.escape);
3397  }
3398  if (cp.line_delim.length() > 0) {
3399  copy_params.line_delim = unescape_char(cp.line_delim);
3400  }
3401  if (cp.array_delim.length() > 0) {
3402  copy_params.array_delim = unescape_char(cp.array_delim);
3403  }
3404  if (cp.array_begin.length() > 0) {
3405  copy_params.array_begin = unescape_char(cp.array_begin);
3406  }
3407  if (cp.array_end.length() > 0) {
3408  copy_params.array_end = unescape_char(cp.array_end);
3409  }
3410  if (cp.threads != 0) {
3411  copy_params.threads = cp.threads;
3412  }
3413  if (cp.s3_access_key.length() > 0) {
3414  copy_params.s3_access_key = cp.s3_access_key;
3415  }
3416  if (cp.s3_secret_key.length() > 0) {
3417  copy_params.s3_secret_key = cp.s3_secret_key;
3418  }
3419  if (cp.s3_region.length() > 0) {
3420  copy_params.s3_region = cp.s3_region;
3421  }
3422  if (cp.s3_endpoint.length() > 0) {
3423  copy_params.s3_endpoint = cp.s3_endpoint;
3424  }
3425  switch (cp.file_type) {
3426  case TFileType::POLYGON:
3428  break;
3429  case TFileType::DELIMITED:
3431  break;
3432 #ifdef ENABLE_IMPORT_PARQUET
3433  case TFileType::PARQUET:
3434  copy_params.file_type = import_export::FileType::PARQUET;
3435  break;
3436 #endif
3437  default:
3438  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
3439  std::to_string((int)cp.file_type));
3440  break;
3441  }
3442  switch (cp.geo_coords_encoding) {
3443  case TEncodingType::GEOINT:
3444  copy_params.geo_coords_encoding = kENCODING_GEOINT;
3445  break;
3446  case TEncodingType::NONE:
3447  copy_params.geo_coords_encoding = kENCODING_NONE;
3448  break;
3449  default:
3450  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
3451  std::to_string((int)cp.geo_coords_encoding));
3452  break;
3453  }
3454  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3455  switch (cp.geo_coords_type) {
3456  case TDatumType::GEOGRAPHY:
3457  copy_params.geo_coords_type = kGEOGRAPHY;
3458  break;
3459  case TDatumType::GEOMETRY:
3460  copy_params.geo_coords_type = kGEOMETRY;
3461  break;
3462  default:
3463  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
3464  std::to_string((int)cp.geo_coords_type));
3465  break;
3466  }
3467  switch (cp.geo_coords_srid) {
3468  case 4326:
3469  case 3857:
3470  case 900913:
3471  copy_params.geo_coords_srid = cp.geo_coords_srid;
3472  break;
3473  default:
3474  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
3475  std::to_string((int)cp.geo_coords_srid));
3476  break;
3477  }
3478  copy_params.sanitize_column_names = cp.sanitize_column_names;
3479  copy_params.geo_layer_name = cp.geo_layer_name;
3480  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3481  copy_params.geo_explode_collections = cp.geo_explode_collections;
3482  copy_params.source_srid = cp.source_srid;
3483  return copy_params;
3484 }
3485 
3487  TCopyParams copy_params;
3488  copy_params.delimiter = cp.delimiter;
3489  copy_params.null_str = cp.null_str;
3490  switch (cp.has_header) {
3492  copy_params.has_header = TImportHeaderRow::AUTODETECT;
3493  break;
3495  copy_params.has_header = TImportHeaderRow::NO_HEADER;
3496  break;
3498  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
3499  break;
3500  default:
3501  CHECK(false);
3502  break;
3503  }
3504  copy_params.quoted = cp.quoted;
3505  copy_params.quote = cp.quote;
3506  copy_params.escape = cp.escape;
3507  copy_params.line_delim = cp.line_delim;
3508  copy_params.array_delim = cp.array_delim;
3509  copy_params.array_begin = cp.array_begin;
3510  copy_params.array_end = cp.array_end;
3511  copy_params.threads = cp.threads;
3512  copy_params.s3_access_key = cp.s3_access_key;
3513  copy_params.s3_secret_key = cp.s3_secret_key;
3514  copy_params.s3_region = cp.s3_region;
3515  copy_params.s3_endpoint = cp.s3_endpoint;
3516  switch (cp.file_type) {
3518  copy_params.file_type = TFileType::POLYGON;
3519  break;
3520  default:
3521  copy_params.file_type = TFileType::DELIMITED;
3522  break;
3523  }
3524  switch (cp.geo_coords_encoding) {
3525  case kENCODING_GEOINT:
3526  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
3527  break;
3528  default:
3529  copy_params.geo_coords_encoding = TEncodingType::NONE;
3530  break;
3531  }
3532  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3533  switch (cp.geo_coords_type) {
3534  case kGEOGRAPHY:
3535  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
3536  break;
3537  case kGEOMETRY:
3538  copy_params.geo_coords_type = TDatumType::GEOMETRY;
3539  break;
3540  default:
3541  CHECK(false);
3542  break;
3543  }
3544  copy_params.geo_coords_srid = cp.geo_coords_srid;
3545  copy_params.sanitize_column_names = cp.sanitize_column_names;
3546  copy_params.geo_layer_name = cp.geo_layer_name;
3547  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3548  copy_params.geo_explode_collections = cp.geo_explode_collections;
3549  copy_params.source_srid = cp.source_srid;
3550  return copy_params;
3551 }
3552 
3553 namespace {
// Rewrites a network path in place so that it addresses a GDAL virtual file
// system: "http://" and "https://" paths get "/vsicurl/" prepended, and a
// leading "s3://" is replaced by "/vsis3/". Scheme matching is case-insensitive.
// Paths with any other prefix are left unchanged.
3554 void add_vsi_network_prefix(std::string& path) {
3555  // do we support network file access?
3556  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
3557 
3558  // modify head of filename based on source location
3559  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
3560  if (!gdal_network) {
// NOTE(review): the call that consumes this message is absent from the listing
// (listing line 3561); presumably an exception is raised with it - confirm.
3562  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
3563  }
3564  // invoke GDAL CURL virtual file reader
3565  path = "/vsicurl/" + path;
3566  } else if (boost::istarts_with(path, "s3://")) {
3567  if (!gdal_network) {
// NOTE(review): as above, the call taking this message is absent from the
// listing (listing line 3568).
3569  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
3570  }
3571  // invoke GDAL S3 virtual file reader
// replace_first matches case-sensitively, unlike the istarts_with test above.
3572  boost::replace_first(path, "s3://", "/vsis3/");
3573  }
3574 }
3575 
3576 void add_vsi_geo_prefix(std::string& path) {
3577  // single gzip'd file (not an archive)?
3578  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3579  path = "/vsigzip/" + path;
3580  }
3581 }
3582 
3583 void add_vsi_archive_prefix(std::string& path) {
3584  // check for compressed file or file bundle
3585  if (boost::iends_with(path, ".zip")) {
3586  // zip archive
3587  path = "/vsizip/" + path;
3588  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3589  boost::iends_with(path, ".tar.gz")) {
3590  // tar archive (compressed or uncompressed)
3591  path = "/vsitar/" + path;
3592  }
3593 }
3594 
3595 std::string remove_vsi_prefixes(const std::string& path_in) {
3596  std::string path(path_in);
3597 
3598  // these will be first
3599  if (boost::istarts_with(path, "/vsizip/")) {
3600  boost::replace_first(path, "/vsizip/", "");
3601  } else if (boost::istarts_with(path, "/vsitar/")) {
3602  boost::replace_first(path, "/vsitar/", "");
3603  } else if (boost::istarts_with(path, "/vsigzip/")) {
3604  boost::replace_first(path, "/vsigzip/", "");
3605  }
3606 
3607  // then these
3608  if (boost::istarts_with(path, "/vsicurl/")) {
3609  boost::replace_first(path, "/vsicurl/", "");
3610  } else if (boost::istarts_with(path, "/vsis3/")) {
3611  boost::replace_first(path, "/vsis3/", "s3://");
3612  }
3613 
3614  return path;
3615 }
3616 
3617 bool path_is_relative(const std::string& path) {
3618  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
3619  boost::istarts_with(path, "https://")) {
3620  return false;
3621  }
3622  return !boost::filesystem::path(path).is_absolute();
3623 }
3624 
3625 bool path_has_valid_filename(const std::string& path) {
3626  auto filename = boost::filesystem::path(path).filename().string();
3627  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
3628  return false;
3629  }
3630  return true;
3631 }
3632 
3633 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
3634  if (!path_has_valid_filename(path)) {
3635  return false;
3636  }
3637  if (include_gz) {
3638  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
3639  return true;
3640  }
3641  }
3642  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
3643  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
3644  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
3645  boost::iends_with(path, ".gdb.zip")) {
3646  return true;
3647  }
3648  return false;
3649 }
3650 
3651 bool is_a_supported_archive_file(const std::string& path) {
3652  if (!path_has_valid_filename(path)) {
3653  return false;
3654  }
3655  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
3656  return true;
3657  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3658  boost::iends_with(path, ".tar.gz")) {
3659  return true;
3660  }
3661  return false;
3662 }
3663 
3664 std::string find_first_geo_file_in_archive(const std::string& archive_path,
3665  const import_export::CopyParams& copy_params) {
3666  // get the recursive list of all files in the archive
3667  std::vector<std::string> files =
3668  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
3669 
3670  // report the list
3671  LOG(INFO) << "Found " << files.size() << " files in Archive "
3672  << remove_vsi_prefixes(archive_path);
3673  for (const auto& file : files) {
3674  LOG(INFO) << " " << file;
3675  }
3676 
3677  // scan the list for the first candidate file
3678  bool found_suitable_file = false;
3679  std::string file_name;
3680  for (const auto& file : files) {
3681  if (is_a_supported_geo_file(file, false)) {
3682  file_name = file;
3683  found_suitable_file = true;
3684  break;
3685  }
3686  }
3687 
3688  // if we didn't find anything
3689  if (!found_suitable_file) {
3690  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
3691  remove_vsi_prefixes(archive_path);
3692  file_name.clear();
3693  }
3694 
3695  // done
3696  return file_name;
3697 }
3698 
3699 bool is_local_file(const std::string& file_path) {
3700  return (!boost::istarts_with(file_path, "s3://") &&
3701  !boost::istarts_with(file_path, "http://") &&
3702  !boost::istarts_with(file_path, "https://"));
3703 }
3704 
// Runs a validation step on `file_path` only when is_local_file() reports it as local;
// remote (s3/http/https) paths are not checked here.
// NOTE(review): line 3707, which holds the actual validation call inside the `if`, is
// missing from this listing - the body shown here is incomplete.
3705 void validate_import_file_path_if_local(const std::string& file_path) {
3706  if (is_local_file(file_path)) {
3708  }
3709 }
3710 } // namespace
3711 
// Thrift handler: inspects an import file and fills `_return` with the detected column
// descriptions (row_set.row_desc), sample rows (row_set.rows) and the copy params that
// were used (copy_params).
//
// _return       result object populated by this call
// session       session id; also hashed to locate the per-session import directory
// file_name_in  file to inspect; relative names are resolved under import_path_
// cp            thrift copy params supplied by the client
//
// Any std::exception raised during detection is rethrown through THROW_MAPD_EXCEPTION
// with a "detect_column_types error: " prefix. The call is subject to check_read_only.
//
// NOTE(review): lines 3720, 3729, 3779 and 3843 are missing from this listing. Among
// other things the declaration of `copy_params` and the head of the first `if` inside
// the try block are not visible, so the comments below only describe what is shown.
3712 void DBHandler::detect_column_types(TDetectResult& _return,
3713  const TSessionId& session,
3714  const std::string& file_name_in,
3715  const TCopyParams& cp) {
3716  auto stdlog = STDLOG(get_session_ptr(session));
3717  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3718  check_read_only("detect_column_types");
3719 
// (3720 missing - presumably where `copy_params` is derived from `cp`; confirm)
3721 
3722  std::string file_name{file_name_in};
3723  if (path_is_relative(file_name)) {
3724  // assume relative paths are relative to data_path / mapd_import / <session>
3725  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3726  boost::filesystem::path(file_name).filename();
3727  file_name = file_path.string();
3728  }
// (3729 missing)
3730 
3731  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
3732  if (copy_params.file_type == import_export::FileType::POLYGON) {
3733  if (is_a_supported_geo_file(file_name, true)) {
3734  // prepare to detect geo file directly
3735  add_vsi_network_prefix(file_name);
3736  add_vsi_geo_prefix(file_name);
3737  } else if (is_a_supported_archive_file(file_name)) {
3738  // find the archive file
3739  add_vsi_network_prefix(file_name);
3740  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
3741  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3742  }
3743  // find geo file in archive
3744  add_vsi_archive_prefix(file_name);
3745  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3746  // prepare to detect that geo file
// when no geo file was found, file_name stays the archive path itself
3747  if (geo_file.size()) {
3748  file_name = file_name + std::string("/") + geo_file;
3749  }
3750  } else {
3751  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
3752  file_name_in);
3753  }
3754  }
3755 
3756  auto file_path = boost::filesystem::path(file_name);
3757  // can be a s3 url
3758  if (!boost::istarts_with(file_name, "s3://")) {
// NOTE(review): this repeats the relative-path resolution done near the top, but keyed
// on is_absolute() only (http/https names are not exempted here).
3759  if (!boost::filesystem::path(file_name).is_absolute()) {
3760  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3761  boost::filesystem::path(file_name).filename();
3762  file_name = file_path.string();
3763  }
3764 
3765  if (copy_params.file_type == import_export::FileType::POLYGON) {
3766  // check for geo file
3767  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3768  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3769  }
3770  } else {
3771  // check for regular file
3772  if (!boost::filesystem::exists(file_path)) {
3773  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3774  }
3775  }
3776  }
3777 
3778  try {
// (3779 missing - the start of the condition selecting the Detector-based branch)
3780 #ifdef ENABLE_IMPORT_PARQUET
3781  || (copy_params.file_type == import_export::FileType::PARQUET)
3782 #endif
3783  ) {
// Detector-based branch: types, encodings and headers come from import_export::Detector
3784  import_export::Detector detector(file_path, copy_params);
3785  std::vector<SQLTypes> best_types = detector.best_sqltypes;
3786  std::vector<EncodingType> best_encodings = detector.best_encodings;
3787  std::vector<std::string> headers = detector.get_headers();
// the detector may have adjusted the copy params; use its version from here on
3788  copy_params = detector.get_copy_params();
3789 
3790  _return.copy_params = copyparams_to_thrift(copy_params);
3791  _return.row_set.row_desc.resize(best_types.size());
3792  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
3793  TColumnType col;
3794  SQLTypes t = best_types[col_idx];
3795  EncodingType encodingType = best_encodings[col_idx];
3796  SQLTypeInfo ti(t, false, encodingType);
3797  if (IS_GEO(t)) {
3798  // set this so encoding_to_thrift does the right thing
3799  ti.set_compression(copy_params.geo_coords_encoding);
3800  // fill in these directly
3801  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3802  col.col_type.scale = copy_params.geo_coords_srid;
3803  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3804  }
3805  col.col_type.type = type_to_thrift(ti);
3806  col.col_type.encoding = encoding_to_thrift(ti);
3807  if (copy_params.sanitize_column_names) {
3808  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3809  } else {
3810  col.col_name = headers[col_idx];
3811  }
3812  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3813  _return.row_set.row_desc[col_idx] = col;
3814  }
// ask the detector for up to 100 sample rows
3815  size_t num_samples = 100;
3816  auto sample_data = detector.get_sample_rows(num_samples);
3817 
// every sample value is returned as a string datum; an empty string is flagged null
3818  TRow sample_row;
3819  for (auto row : sample_data) {
3820  sample_row.cols.clear();
3821  for (const auto& s : row) {
3822  TDatum td;
3823  td.val.str_val = s;
3824  td.is_null = s.empty();
3825  sample_row.cols.push_back(td);
3826  }
3827  _return.row_set.rows.push_back(sample_row);
3828  }
3829  } else if (copy_params.file_type == import_export::FileType::POLYGON) {
3830  // @TODO simon.eves get this from somewhere!
3831  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
3832 
3833  check_geospatial_files(file_path, copy_params);
3834  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
3835  file_path.string(), geoColumnName, copy_params);
// `cd` is a copy, so sanitizing the name here does not modify the entries of `cds`
3836  for (auto cd : cds) {
3837  if (copy_params.sanitize_column_names) {
3838  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
3839  }
3840  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
3841  }
3842  std::map<std::string, std::vector<std::string>> sample_data;
// (3843 missing - the call that fills `sample_data`; only its arguments are visible)
3844  file_path.string(), geoColumnName, sample_data, 100, copy_params);
3845  if (sample_data.size() > 0) {
// the row count is taken from the first column's sample vector
3846  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
3847  TRow sample_row;
3848  for (auto cd : cds) {
3849  TDatum td;
// samples are looked up by the column's source name; at() throws if `i` is out of range
3850  td.val.str_val = sample_data[cd.sourceName].at(i);
3851  td.is_null = td.val.str_val.empty();
3852  sample_row.cols.push_back(td);
3853  }
3854  _return.row_set.rows.push_back(sample_row);
3855  }
3856  }
3857  _return.copy_params = copyparams_to_thrift(copy_params);
3858  }
3859  } catch (const std::exception& e) {
3860  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
3861  }
3862 }
3863 
3864 void DBHandler::render_vega(TRenderResult& _return,
3865  const TSessionId& session,
3866  const int64_t widget_id,
3867  const std::string& vega_json,
3868  const int compression_level,
3869  const std::string& nonce) {
3870  auto stdlog = STDLOG(get_session_ptr(session),
3871  "widget_id",
3872  widget_id,
3873  "compression_level",
3874  compression_level,
3875  "vega_json",
3876  vega_json,
3877  "nonce",
3878  nonce);
3879  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3880  stdlog.appendNameValuePairs("nonce", nonce);
3881  auto session_ptr = stdlog.getConstSessionInfo();
3882  if (!render_handler_) {
3883  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
3884  }
3885 
3886  _return.total_time_ms = measure<>::execution([&]() {
3887  try {
3888  render_handler_->render_vega(
3889  _return,
3890  std::make_shared<Catalog_Namespace::SessionInfo>(*session_ptr),
3891  widget_id,
3892  vega_json,
3893  compression_level,
3894  nonce);
3895  } catch (std::exception& e) {
3896  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3897  }
3898  });
3899 }
3900 
// Dashboard privilege check: builds a DBObject for `dashboard_id`, loads its key from
// the session's catalog, attaches `requestedPermissions` and returns the result of
// SysCatalog::checkPrivileges for the session's current user.
// NOTE(review): line 3901 (return type, function name and the `session_info`
// parameter) is missing from this listing; only the tail of the signature is visible.
3902  int32_t dashboard_id,
3903  AccessPrivileges requestedPermissions) {
3904  DBObject object(dashboard_id, DashboardDBObjectType);
3905  auto& catalog = session_info.getCatalog();
3906  auto& user = session_info.get_currentUser();
3907  object.loadKey(catalog);
3908  object.setPrivileges(requestedPermissions);
3909  std::vector<DBObject> privs = {object};
3910  return SysCatalog::instance().checkPrivileges(user, privs);
3911 }
3912 
3913 // custom expressions
3914 namespace {
3917 
3918 std::unique_ptr<Catalog_Namespace::CustomExpression> create_custom_expr_from_thrift_obj(
3919  const TCustomExpression& t_custom_expr) {
3920  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
3921  << "Unexpected data source type: "
3922  << static_cast<int>(t_custom_expr.data_source_type);
3923  DataSourceType data_source_type = DataSourceType::TABLE;
3924  return std::make_unique<CustomExpression>(t_custom_expr.name,
3925  t_custom_expr.expression_json,
3926  data_source_type,
3927  t_custom_expr.data_source_id);
3928 }
3929 
// Converts a catalog CustomExpression into a TCustomExpression, copying id, name,
// expression_json, data_source_id and is_deleted. The data source type must be TABLE
// (enforced by the CHECK) and is reported as TDataSourceType TABLE.
// NOTE(review): line 3930 (return type and function name) is missing from this listing.
3931  const CustomExpression& custom_expr) {
3932  TCustomExpression t_custom_expr;
3933  t_custom_expr.id = custom_expr.id;
3934  t_custom_expr.name = custom_expr.name;
3935  t_custom_expr.expression_json = custom_expr.expression_json;
3936  t_custom_expr.data_source_id = custom_expr.data_source_id;
3937  t_custom_expr.is_deleted = custom_expr.is_deleted;
3938  CHECK(custom_expr.data_source_type == DataSourceType::TABLE)
3939  << "Unexpected data source type: "
3940  << static_cast<int>(custom_expr.data_source_type);
3941  t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
3942  return t_custom_expr;
3943 }
3944 } // namespace
3945 
3946 int32_t DBHandler::create_custom_expression(const TSessionId& session,
3947  const TCustomExpression& t_custom_expr) {
3948  auto stdlog = STDLOG(get_session_ptr(session));
3949  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3950  check_read_only("create_custom_expression");
3951 
3952  auto session_ptr = stdlog.getConstSessionInfo();
3953  if (!session_ptr->get_currentUser().isSuper) {
3954  THROW_MAPD_EXCEPTION("Custom expressions can only be created by super users.")
3955  }
3956  auto& catalog = session_ptr->getCatalog();
3957  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
3958  << "Unexpected data source type: "
3959  << static_cast<int>(t_custom_expr.data_source_type);
3960  if (catalog.getMetadataForTable(t_custom_expr.data_source_id, false) == nullptr) {
3961  THROW_MAPD_EXCEPTION("Custom expression references a table that does not exist.")
3962  }
3963  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
3964  return catalog.createCustomExpression(
3965  create_custom_expr_from_thrift_obj(t_custom_expr));
3966 }
3967 
3968 void DBHandler::get_custom_expressions(std::vector<TCustomExpression>& _return,
3969  const TSessionId& session) {
3970  auto stdlog = STDLOG(get_session_ptr(session));
3971  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3972 
3973  auto session_ptr = stdlog.getConstSessionInfo();
3974  auto& catalog = session_ptr->getCatalog();
3975  mapd_shared_lock<mapd_shared_mutex> read_lock(custom_expressions_mutex_);
3976  auto custom_expressions =
3977  catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
3978  for (const auto& custom_expression : custom_expressions) {
3979  _return.emplace_back(create_thrift_obj_from_custom_expr(*custom_expression));
3980  }
3981 }
3982 
3983 void DBHandler::update_custom_expression(const TSessionId& session,
3984  const int32_t id,
3985  const std::string& expression_json) {
3986  auto stdlog = STDLOG(get_session_ptr(session));
3987  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3988  check_read_only("update_custom_expression");
3989 
3990  auto session_ptr = stdlog.getConstSessionInfo();
3991  if (!session_ptr->get_currentUser().isSuper) {
3992  THROW_MAPD_EXCEPTION("Custom expressions can only be updated by super users.")
3993  }
3994  auto& catalog = session_ptr->getCatalog();
3995  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
3996  catalog.updateCustomExpression(id, expression_json);
3997 }
3998 
// Thrift handler: deletes the custom expressions listed in `custom_expression_ids`,
// passing `do_soft_delete` through to Catalog::deleteCustomExpressions.
// Only super users may call it (THROW_MAPD_EXCEPTION otherwise); the call is subject to
// check_read_only and runs under the exclusive custom_expressions_mutex_ lock.
// NOTE(review): line 3999 (return type and function name) is missing from this listing.
4000  const TSessionId& session,
4001  const std::vector<int32_t>& custom_expression_ids,
4002  const bool do_soft_delete) {
4003  auto stdlog = STDLOG(get_session_ptr(session));
4004  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4005  check_read_only("delete_custom_expressions");
4006 
4007  auto session_ptr = stdlog.getConstSessionInfo();
4008  if (!session_ptr->get_currentUser().isSuper) {
4009  THROW_MAPD_EXCEPTION("Custom expressions can only be deleted by super users.")
4010  }
4011  auto& catalog = session_ptr->getCatalog();
4012  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
4013  catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4014 }
4015 
4016 // dashboards
// Thrift handler: fills `dashboard` with the dashboard identified by `dashboard_id`,
// as built by get_dashboard_impl.
// Throws via THROW_MAPD_EXCEPTION when the dashboard does not exist or when the
// VIEW_DASHBOARD privilege check fails for the session.
// NOTE(review): lines 4024 and 4030 are missing from this listing - the declaration of
// `user_meta` and the opening of the privilege check are not visible here.
4017 void DBHandler::get_dashboard(TDashboard& dashboard,
4018  const TSessionId& session,
4019  const int32_t dashboard_id) {
4020  auto stdlog = STDLOG(get_session_ptr(session));
4021  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4022  auto session_ptr = stdlog.getConstSessionInfo();
4023  auto const& cat = session_ptr->getCatalog();
4025  auto dash = cat.getMetadataForDashboard(dashboard_id);
4026  if (!dash) {
4027  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
4028  " doesn't exist");
4029  }
// (4030 missing - start of the condition whose arguments follow)
4031  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4032  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
4033  std::to_string(dashboard_id));
4034  }
// reset the name, then look up the dashboard owner's metadata by user id
4035  user_meta.userName = "";
4036  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4037  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
4038 }
4039 
// Thrift handler: appends to `dashboards` every dashboard of the session's catalog that
// passes the VIEW_DASHBOARD check; dashboards failing the check are silently skipped.
// Entries are built with populate_state == false, so dashboard_state is left unset.
// NOTE(review): lines 4046 and 4050 are missing from this listing - the declaration of
// `user_meta` and the opening of the per-dashboard privilege check are not visible.
4040 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
4041  const TSessionId& session) {
4042  auto stdlog = STDLOG(get_session_ptr(session));
4043  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4044  auto session_ptr = stdlog.getConstSessionInfo();
4045  auto const& cat = session_ptr->getCatalog();
4047  const auto dashes = cat.getAllDashboardsMetadata();
4048  user_meta.userName = "";
4049  for (const auto dash : dashes) {
// (4050 missing - start of the condition whose arguments follow)
4051  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4052  // dashboardState is intentionally not populated here
4053  // for payload reasons
4054  // use get_dashboard call to get state
4055  dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
4056  }
4057  }
4058 }
4059 
// Builds the TDashboard returned to clients from a catalog DashboardDescriptor.
//
// session_ptr     session on whose behalf the dashboard is returned
// user_meta       overwritten here with the dashboard owner's metadata
// dash            catalog descriptor of the dashboard
// populate_state  when false, dashboard_state is not copied into the result
//
// Besides the descriptor fields, the result carries the caller's permissions on the
// dashboard and the is_dash_shared flag.
// NOTE(review): lines 4060 and 4062 (return type / function name and the declaration of
// the `user_meta` parameter) are missing from this listing.
4061  const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4063  const DashboardDescriptor* dash,
4064  const bool populate_state) {
4065  auto const& cat = session_ptr->getCatalog();
4066  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
// role/object records for this dashboard; used at the end to decide is_dash_shared
4067  auto objects_list = SysCatalog::instance().getMetadataForObject(
4068  cat.getCurrentDB().dbId,
4069  static_cast<int>(DBObjectType::DashboardDBObjectType),
4070  dash->dashboardId);
4071  TDashboard dashboard;
4072  dashboard.dashboard_name = dash->dashboardName;
4073  if (populate_state)
4074  dashboard.dashboard_state = dash->dashboardState;
4075  dashboard.image_hash = dash->imageHash;
4076  dashboard.update_time = dash->updateTime;
4077  dashboard.dashboard_metadata = dash->dashboardMetadata;
4078  dashboard.dashboard_id = dash->dashboardId;
4079  dashboard.dashboard_owner = dash->user;
4080  TDashboardPermissions perms;
4081  // Super user has all permissions.
4082  if (session_ptr->get_currentUser().isSuper) {
4083  perms.create_ = true;
4084  perms.delete_ = true;
4085  perms.edit_ = true;
4086  perms.view_ = true;
4087  } else {
4088  // Collect all grants on current user
4089  // add them to the permissions.
4090  auto obj_to_find =
4091  DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
4092  obj_to_find.loadKey(session_ptr->getCatalog());
// isSuper is necessarily false in this branch (the super-user case is handled above)
4093  std::vector<std::string> grantees =
4094  SysCatalog::instance().getRoles(true,
4095  session_ptr->get_currentUser().isSuper,
4096  session_ptr->get_currentUser().userName);
// OR together the privileges found on this dashboard for each returned grantee
4097  for (const auto& grantee : grantees) {
4098  DBObject* object_found;
4099  auto* gr = SysCatalog::instance().getGrantee(grantee);
4100  if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
4101  const auto obj_privs = object_found->getPrivileges();
4102  perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4103  perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4104  perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4105  perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4106  }
4107  }
4108  }
4109  dashboard.dashboard_permissions = perms;
// not shared when there is no object record, or the only record belongs to the owner
4110  if (objects_list.empty() ||
4111  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
4112  dashboard.is_dash_shared = false;
4113  } else {
4114  dashboard.is_dash_shared = true;
4115  }
4116  return dashboard;
4117 }
4118 
// Thrift handler: creates a dashboard owned by the session's current user and returns
// the id produced by Catalog::createDashboard.
//
// Throws via THROW_MAPD_EXCEPTION when the privilege check fails, when the current user
// already has a dashboard named `dashboard_name`, or when creation raises a
// std::exception. The call is subject to check_read_only.
// NOTE(review): lines 4131 and 4141 are missing from this listing - the remaining
// argument(s) of checkDBAccessPrivileges and the declaration of `dd` are not visible.
4119 int32_t DBHandler::create_dashboard(const TSessionId& session,
4120  const std::string& dashboard_name,
4121  const std::string& dashboard_state,
4122  const std::string& image_hash,
4123  const std::string& dashboard_metadata) {
4124  auto stdlog = STDLOG(get_session_ptr(session));
4125  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4126  auto session_ptr = stdlog.getConstSessionInfo();
4127  check_read_only("create_dashboard");
4128  auto& cat = session_ptr->getCatalog();
4129 
4130  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
4132  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
4133  }
4134 
// the existing-dashboard lookup is keyed by the current user's id (as a string) and name
4135  auto dash = cat.getMetadataForDashboard(
4136  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
4137  if (dash) {
4138  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4139  }
4140 
4142  dd.dashboardName = dashboard_name;
4143  dd.dashboardState = dashboard_state;
4144  dd.imageHash = image_hash;
4145  dd.dashboardMetadata = dashboard_metadata;
4146  dd.userId = session_ptr->get_currentUser().userId;
4147  dd.user = session_ptr->get_currentUser().userName;
4148 
4149  try {
4150  auto id = cat.createDashboard(dd);
4151  // TODO: transactionally unsafe
4152  SysCatalog::instance().createDBObject(
4153  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
4154  return id;
4155  } catch (const std::exception& e) {
4156  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
4157  }
4158 }
4159 
// Thrift handler: replaces the stored dashboard `dashboard_id` with the supplied name,
// state, image hash and metadata, and records `dashboard_owner` as its owner.
//
// Throws via THROW_MAPD_EXCEPTION when the EDIT_DASHBOARD check fails, when
// `dashboard_owner` is not a known user, or when Catalog::replaceDashboard raises a
// std::exception. The call is subject to check_read_only.
// NOTE(review): lines 4174, 4179 and 4184 are missing from this listing - the opening
// of the privilege check and the declarations of `dd` and `user` are not visible.
4160 void DBHandler::replace_dashboard(const TSessionId& session,
4161  const int32_t dashboard_id,
4162  const std::string& dashboard_name,
4163  const std::string& dashboard_owner,
4164  const std::string& dashboard_state,
4165  const std::string& image_hash,
4166  const std::string& dashboard_metadata) {
4167  auto stdlog = STDLOG(get_session_ptr(session));
4168  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4169  auto session_ptr = stdlog.getConstSessionInfo();
4170  CHECK(session_ptr);
4171  check_read_only("replace_dashboard");
4172  auto& cat = session_ptr->getCatalog();
4173 
// (4174 missing - start of the condition whose arguments follow)
4175  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
4176  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
4177  }
4178 
4180  dd.dashboardName = dashboard_name;
4181  dd.dashboardState = dashboard_state;
4182  dd.imageHash = image_hash;
4183  dd.dashboardMetadata = dashboard_metadata;
// resolve the owner name to user metadata; its userId is stored on the descriptor
4185  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
4186  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
4187  " does not exist");
4188  }
4189  dd.userId = user.userId;
4190  dd.user = dashboard_owner;
4191  dd.dashboardId = dashboard_id;
4192 
4193  try {
4194  cat.replaceDashboard(dd);
4195  } catch (const std::exception& e) {
4196  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
4197  }
4198 }
4199 
4200 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
4201  delete_dashboards(session, {dashboard_id});
4202 }
4203 
4204 void DBHandler::delete_dashboards(const TSessionId& session,
4205  const std::vector<int32_t>& dashboard_ids) {
4206  auto stdlog = STDLOG(get_session_ptr(session));
4207  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4208  auto session_ptr = stdlog.getConstSessionInfo();
4209  check_read_only("delete_dashboards");
4210  auto& cat = session_ptr->getCatalog();
4211  // Checks will be performed in catalog
4212  try {
4213  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4214  } catch (const std::exception& e) {
4215  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
4216  }
4217 }
4218 
// Validates a share/unshare request for `dashboard_id` and returns the names from
// `groups` that are accepted.
//
// Throws via THROW_MAPD_EXCEPTION when the dashboard or any listed user/role does not
// exist. Throws std::runtime_error (not THROW_MAPD_EXCEPTION) when the caller is
// neither the dashboard owner nor a super user.
// NOTE(review): line 4234 (declaration of `user_meta`) is missing from this listing.
// NOTE(review): in the visible code `user_meta.isSuper` is reset to false on every
// iteration and nothing shown updates it before it is tested, so `!user_meta.isSuper`
// always holds and every existing group is accepted - confirm intent.
4219 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
4220  int32_t dashboard_id,
4221  std::vector<std::string> groups) {
4222  const auto session_info = get_session_copy(session);
4223  auto& cat = session_info.getCatalog();
4224  auto dash = cat.getMetadataForDashboard(dashboard_id);
4225  if (!dash) {
4226  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
4227  " does not exist");
4228  } else if (session_info.get_currentUser().userId != dash->userId &&
4229  !session_info.get_currentUser().isSuper) {
4230  throw std::runtime_error(
4231  "User should be either owner of dashboard or super user to share/unshare it");
4232  }
4233  std::vector<std::string> valid_groups;
4235  for (auto& group : groups) {
4236  user_meta.isSuper = false; // initialize default flag
4237  if (!SysCatalog::instance().getGrantee(group)) {
4238  THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
4239  } else if (!user_meta.isSuper) {
4240  valid_groups.push_back(group);
4241  }
4242  }
4243  return valid_groups;
4244 }
4245 
4246 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
4247  for (auto const& group : groups) {
4248  if (!SysCatalog::instance().getGrantee(group)) {
4249  THROW_MAPD_EXCEPTION("Exception: User/Role '" + group + "' does not exist");
4250  }
4251  }
4252 }
4253 
// Checks every id in `dashboard_ids` before a share/unshare operation: the dashboard
// must exist and the session's user must be its owner or a super user.
// All failures are collected first, grouped by error text, and then reported together
// in a single THROW_MAPD_EXCEPTION; nothing is thrown when every id passes.
// NOTE(review): line 4254 (return type and function name) is missing from this listing.
4255  const Catalog_Namespace::SessionInfo& session_info,
4256  const std::vector<int32_t>& dashboard_ids) {
4257  auto& cat = session_info.getCatalog();
// error message -> ids that failed with that message
4258  std::map<std::string, std::list<int32_t>> errors;
4259  for (auto const& dashboard_id : dashboard_ids) {
4260  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
4261  if (!dashboard) {
4262  errors["Dashboard id does not exist"].push_back(dashboard_id);
4263  } else if (session_info.get_currentUser().userId != dashboard->userId &&
4264  !session_info.get_currentUser().isSuper) {
4265  errors["User should be either owner of dashboard or super user to share/unshare it"]
4266  .push_back(dashboard_id);
4267  }
4268  }
4269  if (!errors.empty()) {
4270  std::stringstream error_stream;
4271  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
4272  for (const auto& [error, id_list] : errors) {
4273  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
4274  }
4275  THROW_MAPD_EXCEPTION(error_stream.str());
4276  }
4277 }
4278 
// Shared implementation of share_dashboards / unshare_dashboards.
//
// dashboard_ids  dashboards to (un)share
// groups         users/roles receiving or losing the privileges
// permissions    which privileges are affected; at least one flag must be set
// do_share       true grants the privileges, false revokes them
//
// Validates the request (read-only mode, non-empty permission set, existing groups,
// dashboard ownership), builds one DBObject per dashboard and applies them in a single
// batch grant or revoke on the system catalog.
// NOTE(review): lines 4302, 4305, 4308 and 4311 are missing from this listing - the
// bodies of the four permission `if`s (presumably adding to `privs`) are not visible.
4279 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
4280  const std::vector<int32_t>& dashboard_ids,
4281  const std::vector<std::string>& groups,
4282  const TDashboardPermissions& permissions,
4283  const bool do_share) {
4284  auto stdlog = STDLOG(get_session_ptr(session));
4285  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4286  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
4287  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
4288  !permissions.view_) {
4289  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for " +
4290  std::string(do_share ? "grants" : "revokes"));
4291  }
4292  auto session_ptr = stdlog.getConstSessionInfo();
4293  auto const& catalog = session_ptr->getCatalog();
4294  auto& sys_catalog = SysCatalog::instance();
4295  validateGroups(groups);
4296  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
4297  std::vector<DBObject> batch_objects;
4298  for (auto const& dashboard_id : dashboard_ids) {
4299  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
4300  AccessPrivileges privs;
4301  if (permissions.delete_) {
4303  }
4304  if (permissions.create_) {
4306  }
4307  if (permissions.edit_) {
4309  }
4310  if (permissions.view_) {
4312  }
4313  object.setPrivileges(privs);
4314  batch_objects.push_back(object);
4315  }
4316  if (do_share) {
4317  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4318  } else {
4319  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4320  }
4321 }
4322 
4323 void DBHandler::share_dashboards(const TSessionId& session,
4324  const std::vector<int32_t>& dashboard_ids,
4325  const std::vector<std::string>& groups,
4326  const TDashboardPermissions& permissions) {
4327  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
4328 }
4329 
4330 // NOOP: Grants not available for objects as of now
4331 void DBHandler::share_dashboard(const TSessionId& session,
4332  const int32_t dashboard_id,
4333  const std::vector<std::string>& groups,
4334  const std::vector<std::string>& objects,
4335  const TDashboardPermissions& permissions,
4336  const bool grant_role = false) {
4337  share_dashboards(session, {dashboard_id}, groups, permissions);
4338 }
4339 
4340 void DBHandler::unshare_dashboards(const TSessionId& session,
4341  const std::vector<int32_t>& dashboard_ids,
4342  const std::vector<std::string>& groups,
4343  const TDashboardPermissions& permissions) {
4344  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
4345 }
4346 
4347 void DBHandler::unshare_dashboard(const TSessionId& session,
4348  const int32_t dashboard_id,
4349  const std::vector<std::string>& groups,
4350  const std::vector<std::string>& objects,
4351  const TDashboardPermissions& permissions) {
4352  unshare_dashboards(session, {dashboard_id}, groups, permissions);
4353 }
4354 
// Thrift handler: appends to `dashboard_grantees` one entry per role/user that holds
// privileges on dashboard `dashboard_id`, together with its create/delete/edit/view
// flags. The dashboard owner's own entry is left out.
// The dashboard must exist and the caller must be its owner or a super user.
// NOTE(review): lines 4355, 4363 and 4370 are missing from this listing - the function
// header, the declaration of `user_meta` and the opening of the statement carrying the
// ownership error message are not visible here.
4356  std::vector<TDashboardGrantees>& dashboard_grantees,
4357  const TSessionId& session,
4358  const int32_t dashboard_id) {
4359  auto stdlog = STDLOG(get_session_ptr(session));
4360  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4361  auto session_ptr = stdlog.getConstSessionInfo();
4362  auto const& cat = session_ptr->getCatalog();
4364  auto dash = cat.getMetadataForDashboard(dashboard_id);
4365  if (!dash) {
4366  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
4367  " does not exist");
4368  } else if (session_ptr->get_currentUser().userId != dash->userId &&
4369  !session_ptr->get_currentUser().isSuper) {
// (4370 missing - opening of the statement that carries the message below)
4371  "User should be either owner of dashboard or super user to access grantees");
4372  }
4373  std::vector<ObjectRoleDescriptor*> objectsList;
4374  objectsList = SysCatalog::instance().getMetadataForObject(
4375  cat.getCurrentDB().dbId,
4376  static_cast<int>(DBObjectType::DashboardDBObjectType),
4377  dashboard_id); // by default the object type can only be dashboards
// reset, then load the owner's metadata so the owner's entry can be skipped below
4378  user_meta.userId = -1;
4379  user_meta.userName = "";
4380  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4381  for (auto object : objectsList) {
4382  if (user_meta.userName == object->roleName) {
4383  // Mask owner
4384  continue;
4385  }
4386  TDashboardGrantees grantee;
4387  TDashboardPermissions perm;
4388  grantee.name = object->roleName;
4389  grantee.is_user = object->roleType;
4390  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4391  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4392  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4393  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4394  grantee.permissions = perm;
4395  dashboard_grantees.push_back(grantee);
4396  }
4397 }
4398 
4399 void DBHandler::create_link(std::string& _return,
4400  const TSessionId& session,
4401  const std::string& view_state,
4402  const std::string& view_metadata) {
4403  auto stdlog = STDLOG(get_session_ptr(session));
4404  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4405  auto session_ptr = stdlog.getConstSessionInfo();
4406  // check_read_only("create_link");
4407  auto& cat = session_ptr->getCatalog();
4408 
4409  LinkDescriptor ld;
4410  ld.userId = session_ptr->get_currentUser().userId;
4411  ld.viewState = view_state;
4412  ld.viewMetadata = view_metadata;
4413 
4414  try {
4415  _return = cat.createLink(ld, 6);
4416  } catch (const std::exception& e) {
4417  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
4418  }
4419 }
4420 
// Builds and returns a TColumnType whose name, datum type and is_array flag are taken
// from the arguments; all other fields keep their default-constructed values.
// NOTE(review): the line that opens this definition (return type, function name and
// the `type` parameter) is not present in this listing — confirm against the original.
4422  const std::string& name,
4423  const bool is_array) {
4424  TColumnType ct;
4425  ct.col_name = name;
4426  ct.col_type.type = type;
4427  ct.col_type.is_array = is_array;
4428  return ct;
4429 }
4430 
// When `file_path` has a shapefile extension (.shp, .shx or .dbf, compared
// case-insensitively), checks the companion file for each of those three extensions,
// trying the upper-case and then the lower-case spelling of the extension.
// Throws std::runtime_error naming the file if the combined check fails.
// Paths with any other extension are accepted without checks.
// NOTE(review): the two lines holding the actual existence-check calls are not present
// in this listing; only their arguments and the `&&` joining them are visible.
4431 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
4432  const import_export::CopyParams& copy_params) {
4433  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
4434  if (std::find(shp_ext.begin(),
4435  shp_ext.end(),
4436  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
4437  shp_ext.end()) {
4438  for (auto ext : shp_ext) {
// work on a copy so that replace_extension() does not alter file_path itself
4439  auto aux_file = file_path;
4441  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
4442  copy_params) &&
4444  aux_file.replace_extension(ext).string(), copy_params)) {
4445  throw std::runtime_error("required file for shapefile does not exist: " +
4446  aux_file.filename().string());
4447  }
4448  }
4449  }
4450 }
4451 
4452 void DBHandler::create_table(const TSessionId& session,
4453  const std::string& table_name,
4454  const TRowDescriptor& rd,
4455  const TFileType::type file_type,
4456  const TCreateParams& create_params) {
4457  auto stdlog = STDLOG("table_name", table_name);
4458  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4459  check_read_only("create_table");
4460 
4461  if (ImportHelpers::is_reserved_name(table_name)) {
4462  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
4463  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
4464  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
4465  }
4466 
4467  auto rds = rd;
4468 
4469  // no longer need to manually add the poly column for a TFileType::POLYGON table
4470  // a column of the correct geo type has already been added
4471  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
4472 
4473  std::string stmt{"CREATE TABLE " + table_name};
4474  std::vector<std::string> col_stmts;
4475 
4476  for (auto col : rds) {
4477  if (ImportHelpers::is_reserved_name(col.col_name)) {
4478  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
4479  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
4480  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
4481  }
4482  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
4483  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
4484  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
4485  " for column: " + col.col_name);
4486  }
4487 
4488  if (col.col_type.type == TDatumType::DECIMAL) {
4489  // if no precision or scale passed in set to default 14,7
4490  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
4491  col.col_type.precision = 14;
4492  col.col_type.scale = 7;
4493  }
4494  }
4495 
4496  std::string col_stmt;
4497  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
4498 
4499  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
4500  // `nullable` argument, leading this to default to false. Uncomment for v2.
4501  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
4502 
4503  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
4504  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
4505  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
4506  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
4507  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
4508  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
4509  }
4510  } else if (col.col_type.type == TDatumType::STR) {
4511  // non DICT encoded strings
4512  col_stmt.append(" ENCODING NONE");
4513  } else if (col.col_type.type == TDatumType::POINT ||
4514  col.col_type.type == TDatumType::LINESTRING ||
4515  col.col_type.type == TDatumType::POLYGON ||
4516  col.col_type.type == TDatumType::MULTIPOLYGON) {
4517  // non encoded compressable geo
4518  if (col.col_type.scale == 4326) {
4519  col_stmt.append(" ENCODING NONE");
4520  }
4521  }
4522  col_stmts.push_back(col_stmt);
4523  }
4524 
4525  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
4526 
4527  if (create_params.is_replicated) {
4528  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
4529  }
4530 
4531  stmt.append(";");
4532 
4533  TQueryResult ret;
4534  sql_execute(ret, session, stmt, true, "", -1, -1);
4535 }
4536 
// Imports file `file_name_in` into the existing table `table_name`.
//
// A non-"s3://" relative file name is resolved to
// import_path_ / sha256(session) / <file name>, and a local file that does not exist is
// rejected. When no delimiter is given, ',' is used ('\t' for a ".tsv" extension).
// The import goes through a DistributedLoader when leaf_aggregator_ reports leaves,
// otherwise directly through the catalog. Any std::exception is re-thrown through
// THROW_MAPD_EXCEPTION with an "Exception: " prefix.
// NOTE(review): several lines are not present in this listing — the declarations of
// `executor` and `copy_params`, the condition guarding the query-session enrollment,
// the call producing `td_with_lock`, and the statement between the path construction
// and the S3 check. Confirm against the original file.
4537 void DBHandler::import_table(const TSessionId& session,
4538  const std::string& table_name,
4539  const std::string& file_name_in,
4540  const TCopyParams& cp) {
4541  try {
4542  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4543  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4544  auto session_ptr = stdlog.getConstSessionInfo();
4545  check_read_only("import_table");
4546  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
4547 
4548  auto& cat = session_ptr->getCatalog();
// register this import as a running query session, keyed by session id and start time
4550  auto start_time = ::toString(std::chrono::system_clock::now());
4552  executor->enrollQuerySession(session,
4553  "IMPORT_TABLE",
4554  start_time,
4556  QuerySessionStatus::QueryStatus::RUNNING);
4557  }
4558 
// scope guard: clears the query session status registered above when this scope exits
4559  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
4560  // reset the runtime query interrupt status
4562  executor->clearQuerySessionStatus(session, start_time, false);
4563  }
4564  };
4565  const auto td_with_lock =
4567  cat, table_name);
// the target table must already exist
4568  const auto td = td_with_lock();
4569  CHECK(td);
4570  check_table_load_privileges(*session_ptr, table_name);
4571 
4572  std::string file_name{file_name_in};
4573  auto file_path = boost::filesystem::path(file_name);
// local (non-S3) files: resolve relative names into the per-session import directory
// and make sure the file is there
4575  if (!boost::istarts_with(file_name, "s3://")) {
4576  if (!boost::filesystem::path(file_name).is_absolute()) {
4577  file_path = import_path_ / picosha2::hash256_hex_string(session) /
4578  boost::filesystem::path(file_name).filename();
4579  file_name = file_path.string();
4580  }
4581  if (!boost::filesystem::exists(file_path)) {
4582  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
4583  }
4584  }
4586 
4587  // TODO(andrew): add delimiter detection to Importer
4588  if (copy_params.delimiter == '\0') {
4589  copy_params.delimiter = ',';
4590  if (boost::filesystem::extension(file_path) == ".tsv") {
4591  copy_params.delimiter = '\t';
4592  }
4593  }
4594 
// hold the table's insert-data write lock for the duration of the import
4595  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
4596  session_ptr->getCatalog(), table_name);
4597  std::unique_ptr<import_export::Importer> importer;
4598  if (leaf_aggregator_.leafCount() > 0) {
4599  importer.reset(new import_export::Importer(
4600  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4601  file_path.string(),
4602  copy_params));
4603  } else {
4604  importer.reset(
4605  new import_export::Importer(cat, td, file_path.string(), copy_params));
4606  }
// time the import; the result is divided by 1000 and printed as seconds on stdout
4607  auto ms = measure<>::execution([&]() { importer->import(session_ptr.get()); });
4608  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
4609  } catch (const std::exception& e) {
4610  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
4611  }
4612 }
4613 
// File-local helpers used by the geo import column checks that follow this namespace.
4614 namespace {
4615 
4616 // helper functions for error checking below
4617 // these would usefully be added as methods of TDatumType
4618 // but that's not possible as it's auto-generated by Thrift
4619 
// Predicate over a TDatumType value `t`: true for POLYGON, MULTIPOLYGON and whatever
// further alternatives the missing line lists.
// NOTE(review): the signature line and the tail of the return expression are not
// present in this listing; presumably this is the body of TTypeInfo_IsGeo, which the
// import code below calls — confirm against the original file.
4621  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
4623 }
4624 
// The string converters below are only compiled when column matching is enabled.
4625 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4626 
// Formats a TDatumType value via its stream insertion operator.
4627 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
4628  std::stringstream ss;
4629  ss << t;
4630  return ss.str();
4631 }
4632 
// Maps a geo sub-type code to "GEOGRAPHY" / "GEOMETRY"; any other value yields "INVALID".
4633 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
4634  std::string result;
4635  switch (p) {
4636  case SQLTypes::kGEOGRAPHY:
4637  result = "GEOGRAPHY";
4638  break;
4639  case SQLTypes::kGEOMETRY:
4640  result = "GEOMETRY";
4641  break;
4642  default:
4643  result = "INVALID";
4644  break;
4645  }
4646  return result;
4647 }
4648 
// Formats a TEncodingType value via its stream insertion operator.
4649 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
4650  std::stringstream ss;
4651  ss << t;
4652  return ss.str();
4653 }
4654 
4655 #endif
4656 
4657 } // namespace
4658 
// Raises (via THROW_MAPD_EXCEPTION) an error describing a column attribute mismatch
// found while appending a geo file to an existing table.
//   attr     - name of the mismatching attribute (e.g. "name", "type")
//   got      - value found in the incoming row descriptor
//   expected - value found in the existing table's column
// Expands in place and relies on `file_path`, `this_table_name` and `cd` being in scope
// at the point of use. The per-layer table name (`this_table_name`) is reported rather
// than the base `table_name`, so multi-layer imports name the table actually being
// checked; for single-layer imports the two names are identical.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected)                       \
  THROW_MAPD_EXCEPTION("Could not append geo file '" + file_path.filename().string() + \
                       "' to table '" + this_table_name + "'. Column '" +              \
                       cd->columnName + "' " + attr + " mismatch (got '" + got +       \
                       "', expected '" + expected + "')");
4664 
// Imports the layers of a geo file (or of the first geo file found inside an archive)
// into one table per layer.
//
// Outline of what the visible code does:
//  1. resolves a relative `file_name_in` into import_path_ / sha256(session) / <name>
//     and adds the VSI prefixes appropriate for a geo file or an archive;
//  2. checks through GDAL that the file and its dependent files exist;
//  3. lists the layers, keeps the loadable ones, and honours an explicit
//     copy_params.geo_layer_name if one was given;
//  4. for each layer: obtains a row descriptor (`row_desc` if non-empty, otherwise by
//     detection), creates the table if it does not exist, verifies column count and
//     names, checks load privileges, and runs the GDAL import;
//  5. per-layer failures are collected and thrown together as one exception at the end.
// When more than one layer is loaded, each table is named "<table_name>_<layer>" and
// none of those tables may already exist.
// NOTE(review): a number of lines are not present in this listing (declarations of
// `executor` and `copy_params`, the `case` labels of the layer switch, several
// condition/call lines). Confirm against the original file before relying on details.
4665 void DBHandler::import_geo_table(const TSessionId& session,
4666  const std::string& table_name,
4667  const std::string& file_name_in,
4668  const TCopyParams& cp,
4669  const TRowDescriptor& row_desc,
4670  const TCreateParams& create_params) {
4671  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4672  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4673  auto session_ptr = stdlog.getConstSessionInfo();
// NOTE(review): the label passed here is "import_table", not "import_geo_table" —
// confirm whether that is intentional.
4674  check_read_only("import_table");
4675 
4676  auto& cat = session_ptr->getCatalog();
// register this import as a running query session, keyed by session id and start time
4678  auto start_time = ::toString(std::chrono::system_clock::now());
4680  executor->enrollQuerySession(session,
4681  "IMPORT_GEO_TABLE",
4682  start_time,
4684  QuerySessionStatus::QueryStatus::RUNNING);
4685  }
4686 
// scope guard: clears the query session status registered above when this scope exits
4687  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
4688  // reset the runtime query interrupt status
4690  executor->clearQuerySessionStatus(session, start_time, false);
4691  }
4692  };
4693 
4695 
4696  std::string file_name{file_name_in};
4697 
4698  if (path_is_relative(file_name)) {
4699  // assume relative paths are relative to data_path / mapd_import / <session>
4700  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4701  boost::filesystem::path(file_name).filename();
4702  file_name = file_path.string();
4703  }
4705 
4706  if (is_a_supported_geo_file(file_name, true)) {
4707  // prepare to load geo file directly
4708  add_vsi_network_prefix(file_name);
4709  add_vsi_geo_prefix(file_name);
4710  } else if (is_a_supported_archive_file(file_name)) {
4711  // find the archive file
4712  add_vsi_network_prefix(file_name);
4713  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4714  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4715  }
4716  // find geo file in archive
4717  add_vsi_archive_prefix(file_name);
4718  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4719  // prepare to load that geo file
4720  if (geo_file.size()) {
4721  file_name = file_name + std::string("/") + geo_file;
4722  }
4723  } else {
4724  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4725  file_name_in);
4726  }
4727 
4728  // log what we're about to try to do
4729  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
4730  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
4731 
4732  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
4733  auto file_path = boost::filesystem::path(file_name);
4734  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4735  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
4736  }
4737 
4738  // use GDAL to check any dependent files exist (ditto)
4739  try {
4740  check_geospatial_files(file_path, copy_params);
4741  } catch (const std::exception& e) {
4742  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4743  }
4744 
4745  // get layer info and deconstruct
4746  // in general, we will get a combination of layers of these four types:
4747  // EMPTY: no rows, report and skip
4748  // GEO: create a geo table from this
4749  // NON_GEO: create a regular table from this
4750  // UNSUPPORTED_GEO: report and skip
4751  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
4752  try {
4753  layer_info = import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4754  } catch (const std::exception& e) {
4755  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4756  }
4757 
4758  // categorize the results
4759  using LayerNameToContentsMap =
4760  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
4761  LayerNameToContentsMap load_layers;
4762  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
4763  for (const auto& layer : layer_info) {
// only the first two branches below add the layer to load_layers
// (the `case` labels themselves are not present in this listing)
4764  switch (layer.contents) {
4766  LOG(INFO) << "import_geo_table: '" << layer.name
4767  << "' (will import as geo table)";
4768  load_layers[layer.name] = layer.contents;
4769  break;
4771  LOG(INFO) << "import_geo_table: '" << layer.name
4772  << "' (will import as regular table)";
4773  load_layers[layer.name] = layer.contents;
4774  break;
4776  LOG(WARNING) << "import_geo_table: '" << layer.name
4777  << "' (will not import, unsupported geo type)";
4778  break;
4780  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
4781  break;
4782  default:
4783  break;
4784  }
4785  }
4786 
4787  // if nothing is loadable, stop now
4788  if (load_layers.size() == 0) {
4789  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
4790  }
4791 
4792  // if we've been given an explicit layer name, check that it exists and is loadable
4793  // scan the original list, as it may exist but not have been gathered as loadable
4794  if (copy_params.geo_layer_name.size()) {
4795  bool found = false;
4796  for (const auto& layer : layer_info) {
4797  if (copy_params.geo_layer_name == layer.name) {
4800  // forget all the other layers and just load this one
4801  load_layers.clear();
4802  load_layers[layer.name] = layer.contents;
4803  found = true;
4804  break;
4805  } else if (layer.contents ==
4807  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4808  copy_params.geo_layer_name +
4809  "' has unsupported geo type!");
4810  } else if (layer.contents ==
4812  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4813  copy_params.geo_layer_name + "' is empty!");
4814  }
4815  }
4816  }
4817  if (!found) {
4818  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4819  copy_params.geo_layer_name + "' not found!");
4820  }
4821  }
4822 
4823  // Immerse import of multiple layers is not yet supported
4824  // @TODO fix this!
4825  if (row_desc.size() > 0 && load_layers.size() > 1) {
4827  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
4828  }
4829 
4830  // one definition of layer table name construction
4831  // we append the layer name if we're loading more than one table
4832  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
4833  const std::string& layer_name) {
4834  if (load_layers.size() > 1) {
4835  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
4836  if (sanitized_layer_name != layer_name) {
4837  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
4838  << sanitized_layer_name << "' for table name";
4839  }
4840  return table_name + "_" + sanitized_layer_name;
4841  }
4842  return table_name;
4843  };
4844 
4845  // if we're importing multiple tables, then NONE of them must exist already
4846  if (load_layers.size() > 1) {
4847  for (const auto& layer : load_layers) {
4848  // construct table name
4849  auto this_table_name = construct_layer_table_name(table_name, layer.first);
4850 
4851  // table must not exist
4852  if (cat.getMetadataForTable(this_table_name)) {
4853  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
4854  "' already exists, aborting!");
4855  }
4856  }
4857  }
4858 
4859  // prepare to gather errors that would otherwise be exceptions, as we can only throw
4860  // one
4861  std::vector<std::string> caught_exception_messages;
4862 
4863  // prepare to time multi-layer import
4864  double total_import_ms = 0.0;
4865 
4866  // now we're safe to start importing
4867  // we loop over the layers we're going to attempt to load
4868  for (const auto& layer : load_layers) {
4869  // unpack
4870  const auto& layer_name = layer.first;
4871  const auto& layer_contents = layer.second;
4872  bool is_geo_layer =
4874 
4875  // construct table name again
4876  auto this_table_name = construct_layer_table_name(table_name, layer_name);
4877 
4878  // report
4879  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
4880 
4881  // we need a row descriptor
4882  TRowDescriptor rd;
4883  if (row_desc.size() > 0) {
4884  // we have a valid RowDescriptor
4885  // this is the case where Immerse has already detected and created
4886  // all we need to do is import and trust that the data will match
4887  // use the provided row descriptor
4888  // table must already exist (we check this below)
4889  rd = row_desc;
4890  } else {
4891  // we don't have a RowDescriptor
4892  // we have to detect the file ourselves
4893  TDetectResult cds;
4894  TCopyParams cp_copy = cp; // retain S3 auth tokens
4895  cp_copy.geo_layer_name = layer_name;
4896  cp_copy.file_type = TFileType::POLYGON;
4897  try {
4898  detect_column_types(cds, session, file_name_in, cp_copy);
4899  } catch (const std::exception& e) {
4900  // capture the error and abort this layer
4901  caught_exception_messages.emplace_back(
4902  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
4903  continue;
4904  }
4905  rd = cds.row_set.row_desc;
4906 
4907  // then, if the table does NOT already exist, create it
4908  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4909  if (!td) {
4910  try {
4911  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
4912  } catch (const std::exception& e) {
4913  // capture the error and abort this layer
4914  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
4915  layer_name + "':" + e.what());
4916  continue;
4917  }
4918  }
4919  }
4920 
4921  // by this point, the table should exist, one way or another
4922  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4923  if (!td) {
4924  // capture the error and abort this layer
4925  std::string exception_message =
4926  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
4927  this_table_name + "'; table does not exist or failed to create.";
4928  caught_exception_messages.emplace_back(exception_message);
4929  continue;
4930  }
4931 
4932  // then, we have to verify that the structure matches
4933  // get column descriptors (non-system, non-deleted, logical columns only)
4934  const auto col_descriptors =
4935  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4936 
4937  // first, compare the column count
4938  if (col_descriptors.size() != rd.size()) {
4939  // capture the error and abort this layer
4940  std::string exception_message =
4941  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
4942  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
4943  ", expecting " + std::to_string(col_descriptors.size()) + ")";
4944  caught_exception_messages.emplace_back(exception_message);
4945  continue;
4946  }
4947 
4948  try {
4949  // then the names and types
// rd and col_descriptors are walked in parallel; rd_index tracks the position in rd
4950  int rd_index = 0;
4951  for (auto cd : col_descriptors) {
4952  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
4953  std::string gname = rd[rd_index].col_name; // got
4954  std::string ename = cd->columnName; // expecting
4955 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4956  TTypeInfo gti = rd[rd_index].col_type; // got
4957 #endif
4958  TTypeInfo eti = cd_col_type.col_type; // expecting
4959  // check for name match
4960  if (gname != ename) {
4961  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
4962  gname == OMNISCI_GEO_PREFIX) {
4963  // rename incoming geo column to match existing legacy default geo column
// NOTE(review): gname was read from this same element above, so this assignment leaves
// the name unchanged; the comment suggests `ename` may have been intended — confirm.
4964  rd[rd_index].col_name = gname;
4965  LOG(INFO)
4966  << "import_geo_table: Renaming incoming geo column to match existing "
4967  "legacy default geo column";
4968  } else {
4969  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
4970  }
4971  }
4972 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4973  // check for type attributes match
4974  // these attrs must always match regardless of type
4975  if (gti.type != eti.type) {
4977  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
4978  }
4979  if (gti.is_array != eti.is_array) {
4981  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
4982  }
4983  if (gti.nullable != eti.nullable) {
4985  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
4986  }
4987  if (TTypeInfo_IsGeo(eti.type)) {
4988  // for geo, only these other attrs must also match
4989  // encoding and comp_param are allowed to differ
4990  // this allows appending to existing geo table
4991  // without needing to know the existing encoding
4992  if (gti.precision != eti.precision) {
4994  "geo sub-type",
4995  TTypeInfo_GeoSubTypeToString(gti.precision),
4996  TTypeInfo_GeoSubTypeToString(eti.precision));
4997  }
4998  if (gti.scale != eti.scale) {
5000  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
5001  }
5002  if (gti.encoding != eti.encoding) {
5003  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
5004  }
5005  if (gti.comp_param != eti.comp_param) {
5006  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
5007  }
5008  } else {
5009  // non-geo, all other attrs must also match
5010  // @TODO consider relaxing some of these dependent on type
5011  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
5012  if (gti.precision != eti.precision) {
5014  std::to_string(gti.precision),
5015  std::to_string(eti.precision));
5016  }
5017  if (gti.scale != eti.scale) {
5019  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
5020  }
5021  if (gti.encoding != eti.encoding) {
5023  "encoding",
5024  TTypeInfo_EncodingToString(gti.encoding),
5025  TTypeInfo_EncodingToString(eti.encoding));
5026  }
5027  if (gti.comp_param != eti.comp_param) {
5029  std::to_string(gti.comp_param),
5030  std::to_string(eti.comp_param));
5031  }
5032  }
5033 #endif
5034  rd_index++;
5035  }
5036  } catch (const std::exception& e) {
5037  // capture the error and abort this layer
5038  caught_exception_messages.emplace_back(e.what());
5039  continue;
5040  }
5041 
// map each destination column name to the source field name handed to importGDAL()
// NOTE(review): when src_name is empty the fallback sanitizes that same empty src_name;
// r.col_name may have been intended as the fallback — confirm.
5042  std::map<std::string, std::string> colname_to_src;
5043  for (auto r : rd) {
5044  colname_to_src[r.col_name] =
5045  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
5046  }
5047 
5048  try {
5049  check_table_load_privileges(*session_ptr, this_table_name);
5050  } catch (const std::exception& e) {
5051  // capture the error and abort this layer
5052  caught_exception_messages.emplace_back(e.what());
5053  continue;
5054  }
5055 
5056  if (is_geo_layer) {
5057  // Final check to ensure that we actually have a geo column
5058  // of the expected name and type before doing the actual import,
5059  // in case the user naively overrode the name or type in Immerse
5060  // Preview (which as of 6/8/18 it still allows you to do).
5061  // This avoids a fatal assert later when it fails to find the
5062  // column. We should make Immerse more robust and disallow this.
5063  bool have_geo_column_with_correct_name = false;
5064  for (const auto& r : rd) {
5065  if (TTypeInfo_IsGeo(r.col_type.type)) {
5066  // TODO(team): allow user to override the geo column name
5067  if (r.col_name == OMNISCI_GEO_PREFIX) {
5068  have_geo_column_with_correct_name = true;
5069  } else if (r.col_name == LEGACY_GEO_PREFIX) {
5070  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
5071  // Normalize column names for geo append with legacy column naming scheme
5072  colname_to_src[r.col_name] = r.col_name;
5073  have_geo_column_with_correct_name = true;
5074  }
5075  }
5076  }
5077  if (!have_geo_column_with_correct_name) {
5078  std::string exception_message = "Table " + this_table_name +
5079  " does not have a geo column with name '" +
5080  OMNISCI_GEO_PREFIX + "'. Import aborted!";
5081  caught_exception_messages.emplace_back(exception_message);
5082  continue;
5083  }
5084  }
5085 
5086  try {
5087  // import this layer only?
5088  copy_params.geo_layer_name = layer_name;
5089 
5090  // create an importer
5091  std::unique_ptr<import_export::Importer> importer;
5092  if (leaf_aggregator_.leafCount() > 0) {
5093  importer.reset(new import_export::Importer(
5094  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
5095  file_path.string(),
5096  copy_params));
5097  } else {
5098  importer.reset(
5099  new import_export::Importer(cat, td, file_path.string(), copy_params));
5100  }
5101 
5102  // import
5103  auto ms = measure<>::execution(
5104  [&]() { importer->importGDAL(colname_to_src, session_ptr.get()); });
5105  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
5106  << "s";
5107  total_import_ms += ms;
5108  } catch (const std::exception& e) {
5109  std::string exception_message =
5110  "Import of Layer '" + this_table_name + "' failed: " + e.what();
5111  caught_exception_messages.emplace_back(exception_message);
5112  continue;
5113  }
5114  }
5115 
5116  // did we catch any exceptions?
5117  if (caught_exception_messages.size()) {
5118  // combine all the strings into one and throw a single Thrift exception
5119  std::string combined_exception_message = "Failed to import geo file:\n";
5120  for (const auto& message : caught_exception_messages) {
5121  combined_exception_message += message + "\n";
5122  }
5123  THROW_MAPD_EXCEPTION(combined_exception_message);
5124  } else {
5125  // report success and total time
5126  LOG(INFO) << "Import Successful!";
5127  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
5128  }
5129 }
5130 
5131 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
5132 
5133 void DBHandler::import_table_status(TImportStatus& _return,
5134  const TSessionId& session,
5135  const std::string& import_id) {
5136  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
5137  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5138  auto is = import_export::Importer::get_import_status(import_id);
5139  _return.elapsed = is.elapsed.count();
5140  _return.rows_completed = is.rows_completed;
5141  _return.rows_estimated = is.rows_estimated;
5142  _return.rows_rejected = is.rows_rejected;
5143 }
5144 
// Resolves the path of the first geo file inside the archive `archive_path_in`.
// On success `_return` is "<archive_path_in>/<geo file>"; when the path is not a
// supported archive, or no geo file is found in it, `_return` is `archive_path_in`
// unchanged. A supported archive that does not exist raises an exception.
// NOTE(review): the line that opens this definition (return type, function name and
// the `_return` parameter) is not present in this listing — confirm against the
// original file.
5146  const TSessionId& session,
5147  const std::string& archive_path_in,
5148  const TCopyParams& copy_params) {
5149  auto stdlog =
5150  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
5151  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5152 
// working copy; the VSI prefixes are added to this, never to archive_path_in
5153  std::string archive_path(archive_path_in);
5154 
5155  if (path_is_relative(archive_path)) {
5156  // assume relative paths are relative to data_path / mapd_import / <session>
5157  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5158  boost::filesystem::path(archive_path).filename();
5159  archive_path = file_path.string();
5160  }
5161  validate_import_file_path_if_local(archive_path);
5162 
5163  if (is_a_supported_archive_file(archive_path)) {
5164  // find the archive file
5165  add_vsi_network_prefix(archive_path);
5166  if (!import_export::Importer::gdalFileExists(archive_path,
5167  thrift_to_copyparams(copy_params))) {
5168  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
5169  }
5170  // find geo file in archive
5171  add_vsi_archive_prefix(archive_path);
5172  std::string geo_file =
5173  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
5174  // what did we get?
5175  if (geo_file.size()) {
5176  // prepend it with the original path
5177  _return = archive_path_in + std::string("/") + geo_file;
5178  } else {
5179  // just return the original path
5180  _return = archive_path_in;
5181  }
5182  } else {
5183  // just return the original path
5184  _return = archive_path_in;
5185  }
5186 }
5187 
// Lists the files contained in the archive `archive_path_in`. Each entry written to
// `_return` is prefixed with "<archive_path_in>/". When the path is not a supported
// archive, `_return` is left untouched; a supported archive that does not exist raises
// an exception.
// NOTE(review): the line that begins the call filling `_return` is not present in this
// listing; only its arguments are visible — confirm against the original file.
5188 void DBHandler::get_all_files_in_archive(std::vector<std::string>& _return,
5189  const TSessionId& session,
5190  const std::string& archive_path_in,
5191  const TCopyParams& copy_params) {
5192  auto stdlog =
5193  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
5194  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5195 
// working copy; the VSI prefixes are added to this, never to archive_path_in
5196  std::string archive_path(archive_path_in);
5197  if (path_is_relative(archive_path)) {
5198  // assume relative paths are relative to data_path / mapd_import / <session>
5199  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5200  boost::filesystem::path(archive_path).filename();
5201  archive_path = file_path.string();
5202  }
5203  validate_import_file_path_if_local(archive_path);
5204 
5205  if (is_a_supported_archive_file(archive_path)) {
5206  // find the archive file
5207  add_vsi_network_prefix(archive_path);
5208  if (!import_export::Importer::gdalFileExists(archive_path,
5209  thrift_to_copyparams(copy_params))) {
5210  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
5211  }
5212  // find all files in archive
5213  add_vsi_archive_prefix(archive_path);
5215  archive_path, thrift_to_copyparams(copy_params));
5216  // prepend them all with original path
5217  for (auto& s : _return) {
5218  s = archive_path_in + '/' + s;
5219  }
5220  }
5221 }
5222 
// Lists the layers of a geo file (or of the first geo file found inside an archive)
// and appends one TGeoFileLayerInfo (name + contents category) per layer to `_return`.
// Raises an exception when the path is neither a supported geo file nor a supported
// archive, or when the archive / geo file does not exist.
// NOTE(review): the declaration of `copy_params` and the `case` labels of the switch
// below are not present in this listing — confirm against the original file.
5223 void DBHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
5224  const TSessionId& session,
5225  const std::string& file_name_in,
5226  const TCopyParams& cp) {
5227  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
5228  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5229 
// working copy; the VSI prefixes are added to this, error messages use file_name_in
5230  std::string file_name(file_name_in);
5231 
5233 
5234  // handle relative paths
5235  if (path_is_relative(file_name)) {
5236  // assume relative paths are relative to data_path / mapd_import / <session>
5237  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5238  boost::filesystem::path(file_name).filename();
5239  file_name = file_path.string();
5240  }
5242 
5243  // validate file_name
5244  if (is_a_supported_geo_file(file_name, true)) {
5245  // prepare to load geo file directly
5246  add_vsi_network_prefix(file_name);
5247  add_vsi_geo_prefix(file_name);
5248  } else if (is_a_supported_archive_file(file_name)) {
5249  // find the archive file
5250  add_vsi_network_prefix(file_name);
5251  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
5252  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
5253  }
5254  // find geo file in archive
5255  add_vsi_archive_prefix(file_name);
5256  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
5257  // prepare to load that geo file
5258  if (geo_file.size()) {
5259  file_name = file_name + std::string("/") + geo_file;
5260  }
5261  } else {
5262  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
5263  file_name_in);
5264  }
5265 
5266  // check the file actually exists
5267  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
5268  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
5269  }
5270 
5271  // find all layers
5272  auto internal_layer_info =
5273  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
5274 
5275  // convert to Thrift type
5276  for (const auto& internal_layer : internal_layer_info) {
5277  TGeoFileLayerInfo layer;
5278  layer.name = internal_layer.name;
// translate the importer's contents category into its Thrift counterpart;
// an unrecognized category trips CHECK(false)
5279  switch (internal_layer.contents) {
5281  layer.contents = TGeoFileLayerContents::EMPTY;
5282  break;
5284  layer.contents = TGeoFileLayerContents::GEO;
5285  break;
5287  layer.contents = TGeoFileLayerContents::NON_GEO;
5288  break;
5290  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
5291  break;
5292  default:
5293  CHECK(false);
5294  }
5295  _return.emplace_back(layer); // no suitable constructor to just pass parameters
5296  }
5297 }
5298 
5299 void DBHandler::start_heap_profile(const TSessionId& session) {
5300  auto stdlog = STDLOG(get_session_ptr(session));
5301 #ifdef HAVE_PROFILER
5302  if (IsHeapProfilerRunning()) {
5303  THROW_MAPD_EXCEPTION("Profiler already started");
5304  }
5305  HeapProfilerStart("omnisci");
5306 #else
5307  THROW_MAPD_EXCEPTION("Profiler not enabled");
5308 #endif // HAVE_PROFILER
5309 }
5310 
5311 void DBHandler::stop_heap_profile(const TSessionId& session) {
5312  auto stdlog = STDLOG(get_session_ptr(session));
5313 #ifdef HAVE_PROFILER
5314  if (!IsHeapProfilerRunning()) {
5315  THROW_MAPD_EXCEPTION("Profiler not running");
5316  }
5317  HeapProfilerStop();
5318 #else
5319  THROW_MAPD_EXCEPTION("Profiler not enabled");
5320 #endif // HAVE_PROFILER
5321 }
5322 
5323 void DBHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
5324  auto stdlog = STDLOG(get_session_ptr(session));
5325 #ifdef HAVE_PROFILER
5326  if (!IsHeapProfilerRunning()) {
5327  THROW_MAPD_EXCEPTION("Profiler not running");
5328  }
5329  auto profile_buff = GetHeapProfile();
5330  profile = profile_buff;
5331  free(profile_buff);
5332 #else
5333  THROW_MAPD_EXCEPTION("Profiler not enabled");
5334 #endif // HAVE_PROFILER
5335 }
5336 
5337 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
5338 void DBHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
5339  if (session_it->second.use_count() > 2 ||
5340  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
5341  // SessionInfo is being used in more than one active operation. Original copy + one
5342  // stored in StdLog. Skip the checks.
5343  return;
5344  }
5345  time_t last_used_time = session_it->second->get_last_used_time();
5346  time_t start_time = session_it->second->get_start_time();
5347  const auto current_session_duration = time(0) - last_used_time;
5348  if (current_session_duration > idle_session_duration_) {
5349  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
5350  << " idle duration " << current_session_duration
5351  << " seconds exceeds maximum idle duration " << idle_session_duration_
5352  << " seconds. Invalidating session.";
5353  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
5354  }
5355  const auto total_session_duration = time(0) - start_time;
5356  if (total_session_duration > max_session_duration_) {
5357  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
5358  << " total duration " << total_session_duration
5359  << " seconds exceeds maximum total session duration "
5360  << max_session_duration_ << " seconds. Invalidating session.";
5361  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
5362  }
5363 }
5364 
5365 std::shared_ptr<const Catalog_Namespace::SessionInfo> DBHandler::get_const_session_ptr(
5366  const TSessionId& session) {
5367  return get_session_ptr(session);
5368 }
5369 
// NOTE(review): the signature line of this definition is not visible in this block;
// presumably this is DBHandler::get_session_copy(const TSessionId& session), as called by
// check_table_load_privileges(const TSessionId&, ...) -- confirm against the original.
//
// Looks up `session` while holding a shared (read) lock on sessions_mutex_ and returns
// the dereferenced SessionInfo stored for it.
5371  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
 // the lock is passed to the lookup helper and stays held until this function returns
5372  return *get_session_it_unsafe(session, read_lock)->second;
5373 }
5374 
5375 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_copy_ptr(
5376  const TSessionId& session) {
5377  // Note(Wamsi): We have `get_const_session_ptr` which would return as const
5378  // SessionInfo stored in the map. You can use `get_const_session_ptr` instead of the
5379  // copy of SessionInfo but beware that it can be changed in teh map. So if you do not
5380  // care about the changes then use `get_const_session_ptr` if you do then use this
5381  // function to get a copy. We should eventually aim to merge both
5382  // `get_const_session_ptr` and `get_session_copy_ptr`.
5383  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
5384  auto& session_info_ref = *get_session_it_unsafe(session, read_lock)->second;
5385  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
5386 }
5387 
5388 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_ptr(
5389  const TSessionId& session_id) {
5390  // Note(Wamsi): This method will give you a shared_ptr to master SessionInfo itself.
5391  // Should be used only when you need to make updates to original SessionInfo object.
5392  // Currently used by `update_session_last_used_duration`
5393 
5394  // 1) `session_id` will be empty during intial connect. 2)`sessionmapd iterator` will
5395  // be invalid during disconnect. SessionInfo will be erased from map by the time it
5396  // reaches here. In both the above cases, we would return `nullptr` and can skip
5397  // SessionInfo updates.
5398  if (session_id.empty()) {
5399  return {};
5400  }
5401  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
5402  return get_session_it_unsafe(session_id, read_lock)->second;
5403 }
5404 
// NOTE(review): the first line of this definition's signature is not visible in this
// block; presumably this is the DBHandler::check_table_load_privileges overload taking a
// SessionInfo, which the TSessionId overload calls -- confirm against the original.
//
// Checks the session's current user against SysCatalog for `table_name` and raises an
// error via THROW_MAPD_EXCEPTION when the privilege check fails.
//
// session_info - session whose current user and catalog are used for the check
// table_name   - table the privilege object is built for
5406  const Catalog_Namespace::SessionInfo& session_info,
5407  const std::string& table_name) {
5408  auto user_metadata = session_info.get_currentUser();
5409  auto& cat = session_info.getCatalog();
 // build the privilege object for the table and resolve its key in the session's catalog
5410  DBObject dbObject(table_name, TableDBObjectType);
5411  dbObject.loadKey(cat);
 // NOTE(review): no statement selecting which privilege is requested on dbObject is
 // visible here; the error message below implies insert privileges -- confirm against
 // the original source.
5413  std::vector<DBObject> privObjects;
5414  privObjects.push_back(dbObject);
5415  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
5416  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
5417  user_metadata.userLoggable() +
5418  " has no insert privileges for table " + table_name + ".");
5419  }
5420 }
5421 
5422 void DBHandler::check_table_load_privileges(const TSessionId& session,
5423  const std::string& table_name) {
5424  const auto session_info = get_session_copy(session);
5425  check_table_load_privileges(session_info, table_name);
5426 }
5427 
// NOTE(review): the first line of this definition's signature is not visible in this
// block (presumably DBHandler::set_execution_mode_nolock taking `session_ptr`), and the
// statements that actually apply the new mode in each case are not visible either --
// confirm against the original source.
//
// Handles a request to switch execution mode: rejects GPU mode with a TOmniSciException
// when cpu_mode_only_ is set, and logs which mode the user selected.
5429  const TExecuteMode::type mode) {
 // loggable user name, used only in the log messages below
5430  const std::string user_name = session_ptr->get_currentUser().userLoggable();
5431  switch (mode) {
5432  case TExecuteMode::GPU:
 // GPU mode is refused when cpu_mode_only_ is set
5433  if (cpu_mode_only_) {
5434  TOmniSciException e;
5435  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
5436  throw e;
5437  }
5439  LOG(INFO) << "User " << user_name << " sets GPU mode.";
5440  break;
5441  case TExecuteMode::CPU:
5443  LOG(INFO) << "User " << user_name << " sets CPU mode.";
5444  break;
 // no default case: other TExecuteMode values fall through the switch untouched
5445  }
5446 }
5447 
5448 std::vector<PushedDownFilterInfo> DBHandler::execute_rel_alg(
5449  ExecutionResult& _return,
5450  QueryStateProxy query_state_proxy,
5451  const std::string& query_ra,
5452  const bool column_format,
5453  const ExecutorDeviceType executor_device_type,
5454  const int32_t first_n,
5455  const int32_t at_most_n,
5456  const bool just_validate,
5457  const bool find_push_down_candidates,
5458  const ExplainInfo& explain_info,
5459  const std::optional<size_t> executor_index) const {
5460  query_state::Timer timer = query_state_proxy.createTimer(__func__);
5461 
5462  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
5463  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
5464 
5465  const auto& cat = query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog();
5466  auto executor = Executor::getExecutor(
5467  executor_index ? *executor_index : Executor::UNITARY_EXECUTOR_ID,
5468  jit_debug_ ? "/tmp" : "",
5469  jit_debug_ ? "mapdquery" : "",
5471  RelAlgExecutor ra_executor(executor.get(),
5472  cat,
5473  query_ra,
5474  query_state_proxy.getQueryState().shared_from_this());
5475  // handle hints
5476  const auto& query_hints = ra_executor.getParsedQueryHints();
5477  const bool cpu_mode_enabled = query_hints.isHintRegistered(QueryHint::kCpuMode);
5478  CompilationOptions co = {
5479  cpu_mode_enabled ? ExecutorDeviceType::CPU : executor_device_type,
5480  /*hoist_literals=*/true,
5483  /*allow_lazy_fetch=*/true,
5484  /*filter_on_deleted_column=*/true,
5488  auto validate_or_explain_query =
5489  explain_info.justExplain() || explain_info.justCalciteExplain() || just_validate;
5492  explain_info.justExplain(),
5493  allow_loop_joins_ || just_validate,
5495  jit_debug_,
5496  just_validate,
5499  find_push_down_candidates,
5500  explain_info.justCalciteExplain(),
5502  g_enable_runtime_query_interrupt && !validate_or_explain_query &&
5503  !query_state_proxy.getQueryState()
5505  ->get_session_id()