OmniSciDB  94e8789169
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: DBHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "DBHandler.h"
25 #include "DistributedLoader.h"
27 #include "TokenCompletionHints.h"
28 
29 #ifdef HAVE_PROFILER
30 #include <gperftools/heap-profiler.h>
31 #endif // HAVE_PROFILER
32 
33 #include "MapDRelease.h"
34 
35 #include "Calcite/Calcite.h"
36 #include "gen-cpp/CalciteServer.h"
37 
39 
40 #include "Catalog/Catalog.h"
45 #include "DistributedHandler.h"
47 #include "Geospatial/GDAL.h"
48 #include "Geospatial/Transforms.h"
49 #include "Geospatial/Types.h"
50 #include "ImportExport/Importer.h"
51 #include "LockMgr/LockMgr.h"
53 #include "Parser/ParserWrapper.h"
55 #include "Parser/parser.h"
58 #include "QueryEngine/Execute.h"
68 #include "Shared/ArrowUtil.h"
69 #include "Shared/StringTransform.h"
70 #include "Shared/import_helpers.h"
72 #include "Shared/measure.h"
73 #include "Shared/scope.h"
74 
75 #include <fcntl.h>
76 #include <picosha2.h>
77 #include <sys/time.h>
78 #include <sys/types.h>
79 #include <sys/wait.h>
80 #include <unistd.h>
81 #include <algorithm>
82 #include <boost/algorithm/string.hpp>
83 #include <boost/filesystem.hpp>
84 #include <boost/make_shared.hpp>
85 #include <boost/process/search_path.hpp>
86 #include <boost/program_options.hpp>
87 #include <boost/regex.hpp>
88 #include <boost/tokenizer.hpp>
89 #include <cmath>
90 #include <csignal>
91 #include <fstream>
92 #include <future>
93 #include <map>
94 #include <memory>
95 #include <random>
96 #include <regex>
97 #include <string>
98 #include <thread>
99 #include <typeinfo>
100 
101 #include <arrow/api.h>
102 #include <arrow/io/api.h>
103 #include <arrow/ipc/api.h>
104 
105 #include "Shared/ArrowUtil.h"
106 
// Compile-time switch, set to 0 (disabled); it is not referenced anywhere in the
// portion of the file shown in this listing.
107 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
108
111
// Placeholder value for "no session"; defined as the empty string.
112 #define INVALID_SESSION_ID ""
113
// Logs `errstr` at ERROR level and throws it as a TOmniSciException whose
// error_msg is `errstr`.
// NOTE(review): the expansion is a bare `{ ... }` block, not the usual
// `do { ... } while (0)`. Because of that it cannot be the sole statement of an
// `if` that is followed by `else`. Every use visible in this listing ends with `;`.
// Also do not name a local `ex` in code that passes it to this macro: the
// expansion declares its own `ex`.
114 #define THROW_MAPD_EXCEPTION(errstr) \
115  { \
116  TOmniSciException ex; \
117  ex.error_msg = errstr; \
118  LOG(ERROR) << ex.error_msg; \
119  throw ex; \
120  }
121
// Per-thread storage for TrackingProcessor::client_address (presumably the
// address of the client served by the current thread; confirm in
// TrackingProcessor).
122 thread_local std::string TrackingProcessor::client_address;
124 
125 namespace {
126 
127 SessionMap::iterator get_session_from_map(const TSessionId& session,
128  SessionMap& session_map) {
129  auto session_it = session_map.find(session);
130  if (session_it == session_map.end()) {
131  THROW_MAPD_EXCEPTION("Session not valid.");
132  }
133  return session_it;
134 }
135 
136 struct ForceDisconnect : public std::runtime_error {
137  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
138 };
139 
140 } // namespace
141 
142 template <>
143 SessionMap::iterator DBHandler::get_session_it_unsafe(
144  const TSessionId& session,
145  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
146  auto session_it = get_session_from_map(session, sessions_);
147  try {
148  check_session_exp_unsafe(session_it);
149  } catch (const ForceDisconnect& e) {
150  read_lock.unlock();
151  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
152  auto session_it2 = get_session_from_map(session, sessions_);
153  disconnect_impl(session_it2, write_lock);
154  THROW_MAPD_EXCEPTION(e.what());
155  }
156  return session_it;
157 }
158 
159 template <>
160 SessionMap::iterator DBHandler::get_session_it_unsafe(
161  const TSessionId& session,
162  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
163  auto session_it = get_session_from_map(session, sessions_);
164  try {
165  check_session_exp_unsafe(session_it);
166  } catch (const ForceDisconnect& e) {
167  disconnect_impl(session_it, write_lock);
168  THROW_MAPD_EXCEPTION(e.what());
169  }
170  return session_it;
171 }
172 
// Declared here (defined elsewhere). It is assigned near the end of the
// DBHandler constructor when a non-empty libgeos_so_filename is supplied.
173 #ifdef ENABLE_GEOS
174 extern std::unique_ptr<std::string> g_libgeos_so_filename;
175 #endif
176
// Constructs the Thrift service handler and initializes its subsystems in this
// order:
//   - executor device type
//   - CudaMgr (HAVE_CUDA builds only)
//   - DataMgr
//   - optional UDF compilation
//   - Calcite
//   - extension-function and table-function registration
//   - SysCatalog
//   - finally the optional render handler, the distributed aggregator/leaf
//     handler, and the libgeos override.
//
// Failures in the core steps are reported with LOG(FATAL). Failures while
// creating the render handler or a distributed handler are reported with
// LOG(ERROR), and that feature is left unset.
//
// NOTE(review): this copy is a documentation extract. Original lines 231, 295,
// 324, 331, 345 and 348 are missing below and must be restored from the real
// source before this compiles.
177 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
178  const std::vector<LeafHostInfo>& string_leaves,
179  const std::string& base_data_path,
180  const bool cpu_only,
181  const bool allow_multifrag,
182  const bool jit_debug,
183  const bool intel_jit_profile,
184  const bool read_only,
185  const bool allow_loop_joins,
186  const bool enable_rendering,
187  const bool renderer_use_vulkan_driver,
188  const bool enable_auto_clear_render_mem,
189  const int render_oom_retry_threshold,
190  const size_t render_mem_bytes,
191  const size_t max_concurrent_render_sessions,
192  const int num_gpus,
193  const int start_gpu,
194  const size_t reserved_gpu_mem,
195  const bool render_compositor_use_last_gpu,
196  const size_t num_reader_threads,
197  const AuthMetadata& authMetadata,
198  const SystemParameters& system_parameters,
199  const bool legacy_syntax,
200  const int idle_session_duration,
201  const int max_session_duration,
202  const bool enable_runtime_udf_registration,
203  const std::string& udf_filename,
204  const std::string& clang_path,
205  const std::vector<std::string>& clang_options,
206 #ifdef ENABLE_GEOS
207  const std::string& libgeos_so_filename,
208 #endif
209  const DiskCacheConfig& disk_cache_config)
210  : leaf_aggregator_(db_leaves)
211  , string_leaves_(string_leaves)
212  , base_data_path_(base_data_path)
213  , random_gen_(std::random_device{}())
214  , session_id_dist_(0, INT32_MAX)
215  , jit_debug_(jit_debug)
216  , intel_jit_profile_(intel_jit_profile)
217  , allow_multifrag_(allow_multifrag)
218  , read_only_(read_only)
219  , allow_loop_joins_(allow_loop_joins)
220  , authMetadata_(authMetadata)
221  , system_parameters_(system_parameters)
222  , legacy_syntax_(legacy_syntax)
223  , dispatch_queue_(
224  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
225  , super_user_rights_(false)
// Both session durations are stored multiplied by 60 (presumably minutes to
// seconds; confirm against check_session_exp_unsafe).
226  , idle_session_duration_(idle_session_duration * 60)
227  , max_session_duration_(max_session_duration * 60)
228  , runtime_udf_registration_enabled_(enable_runtime_udf_registration) {
229  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
230  // Register foreign storage interfaces here
// NOTE(review): original line 231 (the registration statement) is missing here.
232  bool is_rendering_enabled = enable_rendering;
233
// Choose the executor device type. CPU is used when cpu_only is set or when the
// build lacks HAVE_CUDA; both cases also disable rendering. GPU is used otherwise.
234  try {
235  if (cpu_only) {
236  is_rendering_enabled = false;
237  executor_device_type_ = ExecutorDeviceType::CPU;
238  cpu_mode_only_ = true;
239  } else {
240 #ifdef HAVE_CUDA
241  executor_device_type_ = ExecutorDeviceType::GPU;
242  cpu_mode_only_ = false;
243 #else
244  executor_device_type_ = ExecutorDeviceType::CPU;
245  LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
246  cpu_mode_only_ = true;
247  is_rendering_enabled = false;
248 #endif
249  }
250  } catch (const std::exception& e) {
// NOTE(review): the message below reads "Failed to executor device type" (a verb
// appears to be missing). It is runtime text and is left unchanged here.
251  LOG(FATAL) << "Failed to executor device type: " << e.what();
252  }
253
254  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
255  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
256  // manage cannot ask for
257  size_t total_reserved = reserved_gpu_mem;
258  if (is_rendering_enabled) {
259  total_reserved += render_mem_bytes;
260  }
261
// In HAVE_CUDA builds, a CudaMgr is created when GPU execution or rendering is
// still wanted. If construction throws, the server falls back to CPU-only mode
// with rendering disabled. The (possibly null) CudaMgr is then moved into the
// DataMgr.
262  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
263 #ifdef HAVE_CUDA
264  if (!cpu_mode_only_ || is_rendering_enabled) {
265  try {
266  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(num_gpus, start_gpu);
267  } catch (const std::exception& e) {
268  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
269  << e.what();
270  cpu_mode_only_ = true;
271  is_rendering_enabled = false;
272  }
273  }
274 #endif // HAVE_CUDA
275
276  try {
277  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
278  system_parameters,
279  std::move(cuda_mgr),
280  !cpu_mode_only_,
281  total_reserved,
282  num_reader_threads,
283  disk_cache_config));
284  } catch (const std::exception& e) {
285  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
286  }
287
// If a UDF file was given, compile it for the device architecture reported by
// the DataMgr's CudaMgr. udf_ast_filename stays empty unless compileUdf()
// returns 0.
288  std::string udf_ast_filename("");
289
290  try {
291  if (!udf_filename.empty()) {
292  const auto cuda_mgr = data_mgr_->getCudaMgr();
293  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
294  cuda_mgr ? cuda_mgr->getDeviceArch()
// NOTE(review): original line 295 is missing here. It should hold the ':'
// alternative of this conditional, used when there is no CudaMgr.
296  UdfCompiler compiler(udf_filename, device_arch, clang_path, clang_options);
297  int compile_result = compiler.compileUdf();
298
299  if (compile_result == 0) {
300  udf_ast_filename = compiler.getAstFileName();
301  }
302  }
303  } catch (const std::exception& e) {
304  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
305  }
306
307  try {
308  calcite_ =
309  std::make_shared<Calcite>(system_parameters, base_data_path_, udf_ast_filename);
310  } catch (const std::exception& e) {
311  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
312  }
313
// Load Calcite's extension-function whitelist, plus the UDF whitelist when a UDF
// file was supplied.
314  try {
315  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
316  if (!udf_filename.empty()) {
317  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
318  }
319  } catch (const std::exception& e) {
320  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
321  }
322
323  try {
// NOTE(review): original line 324 (the body of this try) is missing here.
325  } catch (const std::exception& e) {
326  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
327  }
328
// Send the compile-time table functions to Calcite, with an empty UDF list and
// is_runtime=false.
329  try {
330  auto udtfs = ThriftSerializers::to_thrift(
// NOTE(review): original line 331 (the to_thrift argument list) is missing here.
332  std::vector<TUserDefinedFunction> udfs = {};
333  calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
334  } catch (const std::exception& e) {
335  LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
336  }
337
// GPU mode was requested but the DataMgr reports no GPUs: fall back to CPU
// execution.
// NOTE(review): is_rendering_enabled is not reset on this path.
338  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
339  executor_device_type_ = ExecutorDeviceType::CPU;
340  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
341  cpu_mode_only_ = true;
342  }
343
344  switch (executor_device_type_) {
// NOTE(review): the case labels (original lines 345 and 348, presumably the GPU
// and CPU enumerators) are missing from this extract.
346  LOG(INFO) << "Started in GPU mode" << std::endl;
347  break;
349  LOG(INFO) << "Started in CPU mode" << std::endl;
350  break;
351  }
352
// Initialize the process-wide system catalog with the DataMgr, auth settings,
// Calcite and string-leaf hosts. The fifth and sixth arguments are `false` and
// `!db_leaves.empty()`; confirm their meaning against SysCatalog::init.
353  try {
354  SysCatalog::instance().init(base_data_path_,
355  data_mgr_,
356  authMetadata,
357  calcite_,
358  false,
359  !db_leaves.empty(),
360  string_leaves_);
361  } catch (const std::exception& e) {
362  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
363  }
364
365  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
366  start_time_ = std::time(nullptr);
367
// Rendering is optional; if RenderHandler construction throws, rendering is
// simply left disabled.
// NOTE(review): renderer_use_vulkan_driver, enable_auto_clear_render_mem and
// render_oom_retry_threshold are not referenced anywhere in this constructor
// body. The call below passes the literals false, 0, false instead; confirm
// whether that is intentional.
368  if (is_rendering_enabled) {
369  try {
370  render_handler_.reset(new RenderHandler(this,
371  render_mem_bytes,
372  max_concurrent_render_sessions,
373  render_compositor_use_last_gpu,
374  false,
375  0,
376  false,
377  system_parameters_));
378  } catch (const std::exception& e) {
379  LOG(ERROR) << "Backend rendering disabled: " << e.what();
380  }
381  }
382
// Distributed support:
//   - With leaves configured, install a MapDAggHandler.
//   - Otherwise, when g_cluster is set, install a MapDLeafHandler.
// A construction failure only disables that support.
383  if (leaf_aggregator_.leafCount() > 0) {
384  try {
385  agg_handler_.reset(new MapDAggHandler(this));
386  } catch (const std::exception& e) {
387  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
388  }
389  } else if (g_cluster) {
390  try {
391  leaf_handler_.reset(new MapDLeafHandler(this));
392  } catch (const std::exception& e) {
393  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
394  }
395  }
396
// Optional override of the libgeos shared-library file name.
397 #ifdef ENABLE_GEOS
398  if (!libgeos_so_filename.empty()) {
399  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename));
400  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
401  }
402 #endif
403 }
404 
406 
// Parses `query_str` with SQLParser into `parse_trees` and requires exactly one
// statement.
// Throws std::runtime_error in three cases:
//   - the parser reports errors; the message includes `last_parsed` as filled
//     in by SQLParser::parse
//   - more than one statement was parsed
//   - no statement was parsed
// NOTE(review): this extract is missing the function's name line (original line
// 407); restore it before compiling. The "statment" typo in the last message is
// runtime text and is left as-is.
408  const std::string& query_str,
409  std::list<std::unique_ptr<Parser::Stmt>>& parse_trees) {
410  int num_parse_errors = 0;
411  std::string last_parsed;
412  SQLParser parser;
413  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
414  if (num_parse_errors > 0) {
415  throw std::runtime_error("Syntax error at: " + last_parsed);
416  }
417  if (parse_trees.size() > 1) {
418  throw std::runtime_error("multiple SQL statements not allowed");
419  } else if (parse_trees.size() != 1) {
420  throw std::runtime_error("empty SQL statment not allowed");
421  }
422 }
423 void DBHandler::check_read_only(const std::string& str) {
424  if (DBHandler::read_only_) {
425  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
426  }
427 }
428 
// NOTE(review): the signature line (original line 429) is missing from this
// extract. The body returns the new session id as a std::string.
// Registers a session for Calcite's internal proxy user on `catalog_ptr`, under
// a fresh random 64-character id not already present in sessions_, and returns
// that id.
// The UserMetadata is built from the proxy user name and password provided by
// calcite_, plus the literals -1, true, -1, true. isInMemoryCalciteSession()
// recognizes such a session by userId == -1, defaultDbId == -1 and isSuper;
// confirm the UserMetadata constructor's argument order.
// sessions_mutex_ is held through a mapd_lock_guard for the whole call.
430  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
431  // We would create an in memory session for calcite with super user privileges which
432  // would be used for getting all tables metadata when a user runs the query. The
433  // session would be under the name of a proxy user/password which would only persist
434  // till server's lifetime or execution of calcite query(in memory) whichever is the
435  // earliest.
436  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
437  std::string session_id;
// Draw ids until one is unused.
438  do {
439  session_id = generate_random_string(64);
440  } while (sessions_.find(session_id) != sessions_.end());
441  Catalog_Namespace::UserMetadata user_meta(-1,
442  calcite_->getInternalSessionProxyUserName(),
443  calcite_->getInternalSessionProxyPassword(),
444  true,
445  -1,
446  true);
447  const auto emplace_ret =
448  sessions_.emplace(session_id,
449  std::make_shared<Catalog_Namespace::SessionInfo>(
450  catalog_ptr, user_meta, executor_device_type_, session_id));
451  CHECK(emplace_ret.second);
452  return session_id;
453 }
454 
// NOTE(review): the signature line (original line 455) is missing from this
// extract.
// Returns true when `user_meta` matches the Calcite proxy user created by
// createInMemoryCalciteSession(): the proxy user name from calcite_,
// userId == -1, defaultDbId == -1, and isSuper set.
// NOTE(review): `user_meta` is taken by value, so every call copies it. Making
// it a const reference would also require changing the declaration in
// DBHandler.h.
456  const Catalog_Namespace::UserMetadata user_meta) {
457  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
458  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
459  user_meta.isSuper.load();
460 }
461 
462 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
463  // Remove InMemory calcite Session.
464  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
465  const auto it = sessions_.find(session_id);
466  CHECK(it != sessions_.end());
467  sessions_.erase(it);
468 }
469 
470 // internal connection for connections with no password
// Logs `username` into `dbname` through SysCatalog::login with an empty
// password and a final argument of `false`; confirm that argument's meaning in
// SysCatalog::login.
// It then checks the user's privileges on the database object and creates the
// session through connect_impl(), writing the new id to `session`.
// Throws TOmniSciException when login fails or the privilege check fails.
// NOTE(review): original lines 477 (presumably the `user_meta` declaration) and
// 488 (presumably the privilege set on `dbObject`) are missing from this
// extract.
471 void DBHandler::internal_connect(TSessionId& session,
472  const std::string& username,
473  const std::string& dbname) {
474  auto stdlog = STDLOG(); // session_info set by connect_impl()
475  std::string username2 = username; // login() may reset username given as argument
476  std::string dbname2 = dbname; // login() may reset dbname given as argument
478  std::shared_ptr<Catalog> cat = nullptr;
479  try {
480  cat =
481  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
482  } catch (std::exception& e) {
483  THROW_MAPD_EXCEPTION(e.what());
484  }
485
// Require access privileges on the database the user logged into.
486  DBObject dbObject(dbname2, DatabaseDBObjectType);
487  dbObject.loadKey(*cat);
489  std::vector<DBObject> dbObjects;
490  dbObjects.push_back(dbObject);
491  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
492  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
493  " is not allowed to access database " + dbname2 + ".");
494  }
495  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
496 }
497 
498 void DBHandler::krb5_connect(TKrb5Session& session,
499  const std::string& inputToken,
500  const std::string& dbname) {
501  THROW_MAPD_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
502 }
503 
// Password login. The steps are:
//   1. Authenticate `username`/`passwd` against `dbname` via SysCatalog::login.
//      The last argument is `!super_user_rights_`; confirm its meaning there.
//   2. Check the user's privileges on the database.
//   3. Create the session through connect_impl(), writing the new id to
//      `session`.
//   4. Pass the session id to SysCatalog::check_for_session_encryption (per the
//      comment below, used for PKI-authenticated sessions).
// On failure, the user, database and reason are appended to stdlog and a
// TOmniSciException is thrown.
// NOTE(review): original lines 512 (presumably the `user_meta` declaration) and
// 524 (presumably the privilege set on `dbObject`) are missing from this
// extract.
504 void DBHandler::connect(TSessionId& session,
505  const std::string& username,
506  const std::string& passwd,
507  const std::string& dbname) {
508  auto stdlog = STDLOG(); // session_info set by connect_impl()
509  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
510  std::string username2 = username; // login() may reset username given as argument
511  std::string dbname2 = dbname; // login() may reset dbname given as argument
513  std::shared_ptr<Catalog> cat = nullptr;
514  try {
515  cat = SysCatalog::instance().login(
516  dbname2, username2, passwd, user_meta, !super_user_rights_);
517  } catch (std::exception& e) {
518  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
519  THROW_MAPD_EXCEPTION(e.what());
520  }
521
// Require access privileges on the database the user logged into.
522  DBObject dbObject(dbname2, DatabaseDBObjectType);
523  dbObject.loadKey(*cat);
525  std::vector<DBObject> dbObjects;
526  dbObjects.push_back(dbObject);
527  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
528  stdlog.appendNameValuePairs(
529  "user", username, "db", dbname, "exception", "Missing Privileges");
530  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
531  " is not allowed to access database " + dbname2 + ".");
532  }
533  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
534
535  // TODO Fake restriction returned for my user
536  // next phase this info will come from the SAML login
537  // auto login_session = get_session_ptr(session, false);
538  // std::string col = "t1";
539  // std::vector<std::string> vals = {"a", "b", "c"};
540  // auto rest = std::make_shared<Restriction>(col, vals);
541  // login_session->set_restriction(rest);
542
543  // if pki auth session will come back encrypted with user pubkey
544  SysCatalog::instance().check_for_session_encryption(passwd, session);
545 }
546 
547 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::create_new_session(
548  TSessionId& session,
549  const std::string& dbname,
550  const Catalog_Namespace::UserMetadata& user_meta,
551  std::shared_ptr<Catalog> cat) {
552  do {
553  session = generate_random_string(32);
554  } while (sessions_.find(session) != sessions_.end());
555  std::pair<SessionMap::iterator, bool> emplace_retval =
556  sessions_.emplace(session,
557  std::make_shared<Catalog_Namespace::SessionInfo>(
558  cat, user_meta, executor_device_type_, session));
559  CHECK(emplace_retval.second);
560  auto& session_ptr = emplace_retval.first->second;
561  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
562  return session_ptr;
563 }
564 
565 void DBHandler::connect_impl(TSessionId& session,
566  const std::string& passwd,
567  const std::string& dbname,
568  const Catalog_Namespace::UserMetadata& user_meta,
569  std::shared_ptr<Catalog> cat,
570  query_state::StdLog& stdlog) {
571  // TODO(sy): Is there any reason to have dbname as a parameter
572  // here when the cat parameter already provides cat->name()?
573  // Should dbname and cat->name() ever differ?
574  {
575  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
576  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
577  stdlog.setSessionInfo(session_ptr);
578  session_ptr->set_connection_info(getConnectionInfo().toString());
579  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
580  // while doing warmup
581  if (leaf_aggregator_.leafCount() > 0) {
582  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
583  return;
584  }
585  }
586  }
587  auto const roles =
588  stdlog.getConstSessionInfo()->get_currentUser().isSuper
589  ? std::vector<std::string>{{"super"}}
590  : SysCatalog::instance().getRoles(
591  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
592  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
593 }
594 
595 void DBHandler::disconnect(const TSessionId& session) {
596  auto stdlog = STDLOG();
597  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
598 
599  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
600  auto session_it = get_session_it_unsafe(session, write_lock);
601  stdlog.setSessionInfo(session_it->second);
602  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
603 
604  LOG(INFO) << "User " << session_it->second->get_currentUser().userLoggable()
605  << " disconnected from database " << dbname
606  << " with public_session_id: " << session_it->second->get_public_session_id();
607 
608  disconnect_impl(session_it, write_lock);
609 }
610 
611 void DBHandler::disconnect_impl(const SessionMap::iterator& session_it,
612  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
613  // session_it existence should already have been checked (i.e. called via
614  // get_session_it_unsafe(...))
615  const auto session_id = session_it->second->get_session_id();
616  if (leaf_aggregator_.leafCount() > 0) {
617  leaf_aggregator_.disconnect(session_id);
618  }
619  sessions_.erase(session_it);
620  write_lock.unlock();
621 
622  if (render_handler_) {
623  render_handler_->disconnect(session_id);
624  }
625 }
626 
627 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
628  auto stdlog = STDLOG(get_session_ptr(session));
629  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
630  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
631  auto session_it = get_session_it_unsafe(session, write_lock);
632 
633  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
634 
635  try {
636  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
637  dbname2, session_it->second->get_currentUser().userName);
638  session_it->second->set_catalog_ptr(cat);
639  if (leaf_aggregator_.leafCount() > 0) {
640  leaf_aggregator_.switch_database(session, dbname);
641  return;
642  }
643  } catch (std::exception& e) {
644  THROW_MAPD_EXCEPTION(e.what());
645  }
646 }
647 
648 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
649  auto stdlog = STDLOG(get_session_ptr(session1));
650  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
651  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
652  auto session_it = get_session_it_unsafe(session1, write_lock);
653 
654  try {
655  const Catalog_Namespace::UserMetadata& user_meta =
656  session_it->second->get_currentUser();
657  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
658  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
659  if (leaf_aggregator_.leafCount() > 0) {
660  leaf_aggregator_.clone_session(session1, session2);
661  return;
662  }
663  } catch (std::exception& e) {
664  THROW_MAPD_EXCEPTION(e.what());
665  }
666 }
667 
// Interrupts the query running under `query_session` on behalf of
// `interrupt_session`. On an aggregator the request is forwarded to the leaves
// first; then the local executor's interrupt() is called.
// sessions_mutex_ is held in shared mode so several interrupts can run at once.
// get_session_it_unsafe() may release that lock and throw if the session must
// be force-disconnected.
// NOTE(review): original lines 676, 683 and 686 are missing from this extract.
// Line 676 opens the block closed by "702  }"; lines 683 and 686 start and
// finish the statement that declares `executor`.
668 void DBHandler::interrupt(const TSessionId& query_session,
669  const TSessionId& interrupt_session) {
670  // if this is for distributed setting, query_session becomes a parent session (agg)
671  // and the interrupt session is one of existing session in the leaf node (leaf)
672  // so we can think there exists a logical mapping
673  // between query_session (agg) and interrupt_session (leaf)
674  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
675  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
677  // Shared lock to allow simultaneous interrupts of multiple sessions
678  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
679
680  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
681  auto& cat = session_it->second.get()->getCatalog();
682  const auto dbname = cat.getCurrentDB().dbName;
684  jit_debug_ ? "/tmp" : "",
685  jit_debug_ ? "mapdquery" : "",
687  CHECK(executor);
688
689  VLOG(1) << "Received interrupt: "
690  << "Session " << *session_it->second << ", Executor " << executor
691  << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
692  << session_it->second->get_currentUser().userLoggable() << ", Database "
693  << dbname << std::endl;
694
695  if (leaf_aggregator_.leafCount() > 0) {
696  leaf_aggregator_.interrupt(query_session, interrupt_session);
697  }
698  executor->interrupt(query_session, interrupt_session);
699
700  LOG(INFO) << "User " << session_it->second->get_currentUser().userLoggable()
701  << " interrupted session with database " << dbname << std::endl;
702  }
703 }
704 
705 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
706  auto stdlog = STDLOG(get_session_ptr(session));
707  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
708  const auto rendering_enabled = bool(render_handler_);
709  _return.read_only = read_only_;
710  _return.version = MAPD_RELEASE;
711  _return.rendering_enabled = rendering_enabled;
712  _return.poly_rendering_enabled = rendering_enabled;
713  _return.start_time = start_time_;
714  _return.edition = MAPD_EDITION;
715  _return.host_name = omnisci::get_hostname();
716 }
717 
718 void DBHandler::get_status(std::vector<TServerStatus>& _return,
719  const TSessionId& session) {
720  auto stdlog = STDLOG(get_session_ptr(session));
721  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
722  const auto rendering_enabled = bool(render_handler_);
723  TServerStatus ret;
724  ret.read_only = read_only_;
725  ret.version = MAPD_RELEASE;
726  ret.rendering_enabled = rendering_enabled;
727  ret.poly_rendering_enabled = rendering_enabled;
728  ret.start_time = start_time_;
729  ret.edition = MAPD_EDITION;
730  ret.host_name = omnisci::get_hostname();
731 
732  // TSercivePort tcp_port{}
733 
734  if (g_cluster) {
735  ret.role =
736  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
737  } else {
738  ret.role = TRole::type::SERVER;
739  }
740 
741  _return.push_back(ret);
742  if (leaf_aggregator_.leafCount() > 0) {
743  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
744  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
745  }
746 }
747 
748 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
749  const TSessionId& session) {
750  auto stdlog = STDLOG(get_session_ptr(session));
751  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
752  THardwareInfo ret;
753  const auto cuda_mgr = data_mgr_->getCudaMgr();
754  if (cuda_mgr) {
755  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
756  ret.start_gpu = cuda_mgr->getStartGpu();
757  if (ret.start_gpu >= 0) {
758  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
759  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
760  }
761  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
762  TGpuSpecification gpu_spec;
763  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
764  gpu_spec.num_sm = deviceProperties->numMPs;
765  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
766  gpu_spec.memory = deviceProperties->globalMem;
767  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
768  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
769  ret.gpu_info.push_back(gpu_spec);
770  }
771  }
772 
773  // start hardware/OS dependent code
774  ret.num_cpu_hw = std::thread::hardware_concurrency();
775  // ^ This might return diffrent results in case of hyper threading
776  // end hardware/OS dependent code
777 
778  _return.hardware_info.push_back(ret);
779  if (leaf_aggregator_.leafCount() > 0) {
780  ret.host_name = "aggregator";
781  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
782  _return.hardware_info.insert(_return.hardware_info.end(),
783  leaf_hardware.hardware_info.begin(),
784  leaf_hardware.hardware_info.end());
785  }
786 }
787 
788 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
789  auto session_ptr = get_session_ptr(session);
790  auto stdlog = STDLOG(session_ptr);
791  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
792  auto user_metadata = session_ptr->get_currentUser();
793 
794  _return.user = user_metadata.userName;
795  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
796  _return.start_time = session_ptr->get_start_time();
797  _return.is_super = user_metadata.isSuper;
798 }
799 
// NOTE(review): the function's name line (original line 800) is missing from
// this extract. The recursive calls show it is value_to_thrift_column(tv, ti,
// column), with `tv` presumably a `const TargetValue&` (as in value_to_thrift).
// Appends one value `tv` of SQL type `ti` to the Thrift column `column`. The
// value goes into the matching data vector (int_col, real_col, str_col or
// arr_col), and exactly one entry is pushed to column.nulls.
//   - Arrays: each element is converted recursively into a nested TColumn,
//     which is pushed to arr_col.
//   - Geometry: either a (possibly null) string, or an array whose elements
//     are converted as kDOUBLE into a nested TColumn.
//   - Scalars: int64 (decimals are divided by 10^scale and stored in
//     real_col), double, float, or nullable string.
// Null flags compare the value against the per-type NULL_* sentinel and are
// always false when `ti` is NOT NULL. An unexpected variant fails a CHECK.
802  TColumn& column) {
803  if (ti.is_array()) {
804  TColumn tColumn;
805  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
806  CHECK(array_tv);
807  bool is_null = !array_tv->is_initialized();
808  if (!is_null) {
809  const auto& vec = array_tv->get();
810  for (const auto& elem_tv : vec) {
811  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
812  }
813  }
814  column.data.arr_col.push_back(tColumn);
815  column.nulls.push_back(is_null && !ti.get_notnull());
816  } else if (ti.is_geometry()) {
// Geometry arrives as a scalar (string form) or as an array value.
817  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
818  if (scalar_tv) {
819  auto s_n = boost::get<NullableString>(scalar_tv);
820  auto s = boost::get<std::string>(s_n);
821  if (s) {
822  column.data.str_col.push_back(*s);
823  } else {
824  column.data.str_col.emplace_back(""); // null string
825  auto null_p = boost::get<void*>(s_n);
826  CHECK(null_p && !*null_p);
827  }
828  column.nulls.push_back(!s && !ti.get_notnull());
829  } else {
830  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
831  CHECK(array_tv);
832  bool is_null = !array_tv->is_initialized();
833  if (!is_null) {
834  auto elem_type = SQLTypeInfo(kDOUBLE, false);
835  TColumn tColumn;
836  const auto& vec = array_tv->get();
837  for (const auto& elem_tv : vec) {
838  value_to_thrift_column(elem_tv, elem_type, tColumn);
839  }
840  column.data.arr_col.push_back(tColumn);
841  column.nulls.push_back(false);
842  } else {
843  TColumn tColumn;
844  column.data.arr_col.push_back(tColumn);
845  column.nulls.push_back(is_null && !ti.get_notnull());
846  }
847  }
848  } else {
849  CHECK(!ti.is_column());
850  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
851  CHECK(scalar_tv);
852  if (boost::get<int64_t>(scalar_tv)) {
853  int64_t data = *(boost::get<int64_t>(scalar_tv));
854
// Decimals are stored as scaled integers; they are sent to clients as doubles.
855  if (ti.is_decimal()) {
856  double val = static_cast<double>(data);
857  if (ti.get_scale() > 0) {
858  val /= pow(10.0, std::abs(ti.get_scale()));
859  }
860  column.data.real_col.push_back(val);
861  } else {
862  column.data.int_col.push_back(data);
863  }
864
// The null test uses the raw (unscaled) integer against the type's sentinel.
865  switch (ti.get_type()) {
866  case kBOOLEAN:
867  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
868  break;
869  case kTINYINT:
870  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
871  break;
872  case kSMALLINT:
873  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
874  break;
875  case kINT:
876  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
877  break;
878  case kNUMERIC:
879  case kDECIMAL:
880  case kBIGINT:
881  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
882  break;
883  case kTIME:
884  case kTIMESTAMP:
885  case kDATE:
886  case kINTERVAL_DAY_TIME:
// NOTE(review): original line 887 (presumably another case label) is missing here.
888  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
889  break;
890  default:
891  column.nulls.push_back(false);
892  }
893  } else if (boost::get<double>(scalar_tv)) {
894  double data = *(boost::get<double>(scalar_tv));
895  column.data.real_col.push_back(data);
896  if (ti.get_type() == kFLOAT) {
897  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
898  } else {
899  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
900  }
901  } else if (boost::get<float>(scalar_tv)) {
902  CHECK_EQ(kFLOAT, ti.get_type());
903  float data = *(boost::get<float>(scalar_tv));
904  column.data.real_col.push_back(data);
905  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
906  } else if (boost::get<NullableString>(scalar_tv)) {
907  auto s_n = boost::get<NullableString>(scalar_tv);
908  auto s = boost::get<std::string>(s_n);
909  if (s) {
910  column.data.str_col.push_back(*s);
911  } else {
912  column.data.str_col.emplace_back(""); // null string
913  auto null_p = boost::get<void*>(s_n);
914  CHECK(null_p && !*null_p);
915  }
916  column.nulls.push_back(!s && !ti.get_notnull());
917  } else {
918  CHECK(false);
919  }
920  }
921 }
922 
923 TDatum DBHandler::value_to_thrift(const TargetValue& tv, const SQLTypeInfo& ti) {
924  TDatum datum;
925  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
926  if (!scalar_tv) {
927  CHECK(ti.is_array());
928  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
929  CHECK(array_tv);
930  if (array_tv->is_initialized()) {
931  const auto& vec = array_tv->get();
932  for (const auto& elem_tv : vec) {
933  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
934  datum.val.arr_val.push_back(scalar_col_val);
935  }
936  // Datum is not null, at worst it's an empty array Datum
937  datum.is_null = false;
938  } else {
939  datum.is_null = true;
940  }
941  return datum;
942  }
943  if (boost::get<int64_t>(scalar_tv)) {
944  int64_t data = *(boost::get<int64_t>(scalar_tv));
945 
946  if (ti.is_decimal()) {
947  double val = static_cast<double>(data);
948  if (ti.get_scale() > 0) {
949  val /= pow(10.0, std::abs(ti.get_scale()));
950  }
951  datum.val.real_val = val;
952  } else {
953  datum.val.int_val = data;
954  }
955 
956  switch (ti.get_type()) {
957  case kBOOLEAN:
958  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
959  break;
960  case kTINYINT:
961  datum.is_null = (datum.val.int_val == NULL_TINYINT);
962  break;
963  case kSMALLINT:
964  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
965  break;
966  case kINT:
967  datum.is_null = (datum.val.int_val == NULL_INT);
968  break;
969  case kDECIMAL:
970  case kNUMERIC:
971  case kBIGINT:
972  datum.is_null = (datum.val.int_val == NULL_BIGINT);
973  break;
974  case kTIME:
975  case kTIMESTAMP:
976  case kDATE:
977  case kINTERVAL_DAY_TIME:
979  datum.is_null = (datum.val.int_val == NULL_BIGINT);
980  break;
981  default:
982  datum.is_null = false;
983  }
984  } else if (boost::get<double>(scalar_tv)) {
985  datum.val.real_val = *(boost::get<double>(scalar_tv));
986  if (ti.get_type() == kFLOAT) {
987  datum.is_null = (datum.val.real_val == NULL_FLOAT);
988  } else {
989  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
990  }
991  } else if (boost::get<float>(scalar_tv)) {
992  CHECK_EQ(kFLOAT, ti.get_type());
993  datum.val.real_val = *(boost::get<float>(scalar_tv));
994  datum.is_null = (datum.val.real_val == NULL_FLOAT);
995  } else if (boost::get<NullableString>(scalar_tv)) {
996  auto s_n = boost::get<NullableString>(scalar_tv);
997  auto s = boost::get<std::string>(s_n);
998  if (s) {
999  datum.val.str_val = *s;
1000  } else {
1001  auto null_p = boost::get<void*>(s_n);
1002  CHECK(null_p && !*null_p);
1003  }
1004  datum.is_null = !s;
1005  } else {
1006  CHECK(false);
1007  }
1008  return datum;
1009 }
1010 
1011 void DBHandler::sql_execute(TQueryResult& _return,
1012  const TSessionId& session,
1013  const std::string& query_str,
1014  const bool column_format,
1015  const std::string& nonce,
1016  const int32_t first_n,
1017  const int32_t at_most_n) {
1018  auto session_ptr = get_session_ptr(session);
1019  auto query_state = create_query_state(session_ptr, query_str);
1020  auto stdlog = STDLOG(session_ptr, query_state);
1021  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1022  stdlog.appendNameValuePairs("nonce", nonce);
1023  auto timer = DEBUG_TIMER(__func__);
1024 
1025  try {
1026  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
1027  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1028  };
1029 
1030  if (first_n >= 0 && at_most_n >= 0) {
1032  std::string("At most one of first_n and at_most_n can be set"));
1033  }
1034 
1035  if (leaf_aggregator_.leafCount() > 0) {
1036  if (!agg_handler_) {
1037  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
1038  }
1039  _return.total_time_ms = measure<>::execution([&]() {
1040  agg_handler_->cluster_execute(_return,
1041  query_state->createQueryStateProxy(),
1042  query_state->getQueryStr(),
1043  column_format,
1044  nonce,
1045  first_n,
1046  at_most_n,
1048  });
1049  _return.nonce = nonce;
1050  } else {
1051  _return.total_time_ms = measure<>::execution([&]() {
1053  query_state->createQueryStateProxy(),
1054  column_format,
1055  nonce,
1056  session_ptr->get_executor_device_type(),
1057  first_n,
1058  at_most_n);
1059  });
1060  }
1061 
1062  // if the SQL statement we just executed was a geo COPY FROM, the import
1063  // parameters were captured, and this flag set, so we do the actual import here
1064  if (auto geo_copy_from_state =
1065  geo_copy_from_sessions(session_ptr->get_session_id())) {
1066  // import_geo_table() calls create_table() which calls this function to
1067  // do the work, so reset the flag now to avoid executing this part a
1068  // second time at the end of that, which would fail as the table was
1069  // already created! Also reset the flag with a ScopeGuard on exiting
1070  // this function any other way, such as an exception from the code above!
1071  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1072 
1073  // create table as replicated?
1074  TCreateParams create_params;
1075  if (geo_copy_from_state->geo_copy_from_partitions == "REPLICATED") {
1076  create_params.is_replicated = true;
1077  }
1078 
1079  // now do (and time) the import
1080  _return.total_time_ms = measure<>::execution([&]() {
1082  session,
1083  geo_copy_from_state->geo_copy_from_table,
1084  geo_copy_from_state->geo_copy_from_file_name,
1085  copyparams_to_thrift(geo_copy_from_state->geo_copy_from_copy_params),
1086  TRowDescriptor(),
1087  create_params);
1088  });
1089  }
1090  std::string debug_json = timer.stopAndGetJson();
1091  if (!debug_json.empty()) {
1092  _return.__set_debug(std::move(debug_json));
1093  }
1094  stdlog.appendNameValuePairs(
1095  "execution_time_ms",
1096  _return.execution_time_ms,
1097  "total_time_ms", // BE-3420 - Redundant with duration field
1098  stdlog.duration<std::chrono::milliseconds>());
1099  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1100  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1101  } catch (const std::exception& e) {
1102  if (strstr(e.what(), "java.lang.NullPointerException")) {
1103  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
1104  "query failed from broken view or other schema related issue");
1105  } else if (strstr(e.what(), "Parse failed: Encountered \";\"")) {
1106  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1107  } else if (strstr(e.what(),
1108  "Parse failed: Encountered \"<EOF>\" at line 0, column 0")) {
1109  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1110  } else {
1111  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1112  }
1113  }
1114 }
1115 
1116 void DBHandler::sql_execute_df(TDataFrame& _return,
1117  const TSessionId& session,
1118  const std::string& query_str,
1119  const TDeviceType::type device_type,
1120  const int32_t device_id,
1121  const int32_t first_n,
1122  const TArrowTransport::type transport_method) {
1123  auto session_ptr = get_session_ptr(session);
1124  auto query_state = create_query_state(session_ptr, query_str);
1125  auto stdlog = STDLOG(session_ptr, query_state);
1126 
1127  if (device_type == TDeviceType::GPU) {
1128  const auto executor_device_type = session_ptr->get_executor_device_type();
1129  if (executor_device_type != ExecutorDeviceType::GPU) {
1131  std::string("Exception: GPU mode is not allowed in this session"));
1132  }
1133  if (!data_mgr_->gpusPresent()) {
1134  THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
1135  }
1136  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1138  std::string("Exception: invalid device_id or unavailable GPU with this ID"));
1139  }
1140  }
1141  _return.execution_time_ms = 0;
1142 
1143  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
1146  auto query_state_proxy = query_state->createQueryStateProxy();
1147  try {
1148  ParserWrapper pw{query_str};
1149  if (!pw.is_ddl && !pw.is_update_dml &&
1150  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
1151  std::string query_ra;
1153  _return.execution_time_ms += measure<>::execution([&]() {
1154  TPlanResult result;
1155  std::tie(result, locks) =
1156  parse_to_ra(query_state_proxy, query_str, {}, true, system_parameters_);
1157  query_ra = result.plan_result;
1158  });
1159 
1160  if (pw.isCalciteExplain()) {
1161  throw std::runtime_error("explain is not unsupported by current thrift API");
1162  }
1165  auto submitted_time = std::chrono::system_clock::now();
1166  query_state_proxy.getQueryState().setQuerySubmittedTime(submitted_time);
1167  executor->enrollQuerySession(session_ptr->get_session_id(),
1168  query_str,
1169  submitted_time,
1171  QuerySessionStatus::QueryStatus::PENDING_QUEUE);
1172  }
1173  execute_rel_alg_df(_return,
1174  query_ra,
1175  query_state_proxy,
1176  *session_ptr,
1177  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
1179  static_cast<size_t>(device_id),
1180  first_n,
1181  transport_method);
1182  return;
1183  }
1184  } catch (std::exception& e) {
1185  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1186  }
1188  "Exception: DDL or update DML are not unsupported by current thrift API");
1189 }
1190 
1191 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1192  const TSessionId& session,
1193  const std::string& query_str,
1194  const int32_t device_id,
1195  const int32_t first_n) {
1196  auto stdlog = STDLOG(get_session_ptr(session));
1197  sql_execute_df(_return,
1198  session,
1199  query_str,
1200  TDeviceType::GPU,
1201  device_id,
1202  first_n,
1203  TArrowTransport::SHARED_MEMORY);
1204 }
1205 
1206 // For now we have only one user of a data frame in all cases.
1207 void DBHandler::deallocate_df(const TSessionId& session,
1208  const TDataFrame& df,
1209  const TDeviceType::type device_type,
1210  const int32_t device_id) {
1211  auto stdlog = STDLOG(get_session_ptr(session));
1212  std::string serialized_cuda_handle = "";
1213  if (device_type == TDeviceType::GPU) {
1214  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1215  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1216  TOmniSciException ex;
1217  ex.error_msg = std::string(
1218  "Exception: current data frame handle is not bookkept or been inserted "
1219  "twice");
1220  LOG(ERROR) << ex.error_msg;
1221  throw ex;
1222  }
1223  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1224  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1225  }
1226  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1227  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1229  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1231  result,
1232  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1233  device_id,
1234  data_mgr_);
1235 }
1236 
1237 std::string DBHandler::apply_copy_to_shim(const std::string& query_str) {
1238  auto result = query_str;
1239  {
1240  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
1241  // boost::regex::icase};
1242  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
1243  boost::regex::extended | boost::regex::icase};
1244  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
1245  result.replace(
1246  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
1247  });
1248  }
1249  return result;
1250 }
1251 
1252 void DBHandler::sql_validate(TRowDescriptor& _return,
1253  const TSessionId& session,
1254  const std::string& query_str) {
1255  try {
1256  auto stdlog = STDLOG(get_session_ptr(session));
1257  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1258  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1259  stdlog.setQueryState(query_state);
1260 
1261  ParserWrapper pw{query_str};
1262  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1263  pw.is_update_dml) {
1264  throw std::runtime_error("Can only validate SELECT statements.");
1265  }
1266 
1267  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
1270 
1271  TPlanResult parse_result;
1273  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1274  query_state->getQueryStr(),
1275  {},
1276  true,
1278  /*check_privileges=*/true);
1279  const auto query_ra = parse_result.plan_result;
1280 
1281  const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
1282  _return = fixup_row_descriptor(result.row_set.row_desc,
1283  query_state->getConstSessionInfo()->getCatalog());
1284  } catch (const std::exception& e) {
1285  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
1286  }
1287 }
1288 
1289 namespace {
1290 
1292  std::unordered_set<std::string> uc_column_names;
1293  std::unordered_set<std::string> uc_column_table_qualifiers;
1294 };
1295 
1296 // Extract what looks like a (qualified) identifier from the partial query.
1297 // The results will be used to rank the auto-completion results: tables which
1298 // contain at least one of the identifiers first.
1300  const std::string& sql) {
1301  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1302  boost::regex::extended | boost::regex::icase};
1303  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1304  boost::sregex_token_iterator end;
1305  std::unordered_set<std::string> uc_column_names;
1306  std::unordered_set<std::string> uc_column_table_qualifiers;
1307  for (; tok_it != end; ++tok_it) {
1308  std::string column_name = *tok_it;
1309  std::vector<std::string> column_tokens;
1310  boost::split(column_tokens, column_name, boost::is_any_of("."));
1311  if (column_tokens.size() == 2) {
1312  // If the column name is qualified, take user's word.
1313  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1314  } else {
1315  uc_column_names.insert(to_upper(column_name));
1316  }
1317  }
1318  return {uc_column_names, uc_column_table_qualifiers};
1319 }
1320 
1321 } // namespace
1322 
1323 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1324  const TSessionId& session,
1325  const std::string& sql,
1326  const int cursor) {
1327  auto stdlog = STDLOG(get_session_ptr(session));
1328  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1329  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1330  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1331  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1332  proj_tokens.uc_column_names, visible_tables, stdlog);
1333  // Add the table qualifiers explicitly specified by the user.
1334  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1335  proj_tokens.uc_column_table_qualifiers.end());
1336  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1337  std::sort(
1338  hints.begin(),
1339  hints.end(),
1340  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1341  if (lhs.type == TCompletionHintType::TABLE &&
1342  rhs.type == TCompletionHintType::TABLE) {
1343  // Between two tables, one which is compatible with the specified
1344  // projections and one which isn't, pick the one which is compatible.
1345  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1346  compatible_table_names.end() &&
1347  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1348  compatible_table_names.end()) {
1349  return true;
1350  }
1351  }
1352  return lhs.type < rhs.type;
1353  });
1354 }
1355 
1356 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1357  std::vector<std::string>& visible_tables,
1358  query_state::StdLog& stdlog,
1359  const std::string& sql,
1360  const int cursor) {
1361  const auto& session_info = *stdlog.getConstSessionInfo();
1362  try {
1363  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1364  // Filter out keywords suggested by Calcite which we don't support.
1366  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1367  } catch (const std::exception& e) {
1368  TOmniSciException ex;
1369  ex.error_msg = "Exception: " + std::string(e.what());
1370  LOG(ERROR) << ex.error_msg;
1371  throw ex;
1372  }
1373  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1374  const size_t length_to_cursor =
1375  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1376  // Trust hints from Calcite after the FROM keyword.
1377  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1378  return;
1379  }
1380  // Before FROM, the query is too incomplete for context-sensitive completions.
1381  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1382 }
1383 
1384 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1385  query_state::StdLog& stdlog,
1386  std::vector<std::string>& visible_tables,
1387  const std::string& sql,
1388  const int cursor) {
1389  const auto last_word =
1390  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1391  boost::regex select_expr{R"(\s*select\s+)",
1392  boost::regex::extended | boost::regex::icase};
1393  const size_t length_to_cursor =
1394  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1395  // After SELECT but before FROM, look for all columns in all tables which match the
1396  // prefix.
1397  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1398  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1399  // Trust the fully qualified columns the most.
1400  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1401  return;
1402  }
1403  // Not much information to use, just retrieve column names which match the prefix.
1404  if (should_suggest_column_hints(sql)) {
1405  get_column_hints(hints, last_word, column_names_by_table);
1406  return;
1407  }
1408  const std::string kFromKeyword{"FROM"};
1409  if (boost::istarts_with(kFromKeyword, last_word)) {
1410  TCompletionHint keyword_hint;
1411  keyword_hint.type = TCompletionHintType::KEYWORD;
1412  keyword_hint.replaced = last_word;
1413  keyword_hint.hints.emplace_back(kFromKeyword);
1414  hints.push_back(keyword_hint);
1415  }
1416  } else {
1417  const std::string kSelectKeyword{"SELECT"};
1418  if (boost::istarts_with(kSelectKeyword, last_word)) {
1419  TCompletionHint keyword_hint;
1420  keyword_hint.type = TCompletionHintType::KEYWORD;
1421  keyword_hint.replaced = last_word;
1422  keyword_hint.hints.emplace_back(kSelectKeyword);
1423  hints.push_back(keyword_hint);
1424  }
1425  }
1426 }
1427 
1428 std::unordered_map<std::string, std::unordered_set<std::string>>
1429 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1430  query_state::StdLog& stdlog) {
1431  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1432  for (auto it = table_names.begin(); it != table_names.end();) {
1433  TTableDetails table_details;
1434  try {
1435  get_table_details_impl(table_details, stdlog, *it, false, false);
1436  } catch (const TOmniSciException& e) {
1437  // Remove the corrupted Table/View name from the list for further processing.
1438  it = table_names.erase(it);
1439  continue;
1440  }
1441  for (const auto& column_type : table_details.row_desc) {
1442  column_names_by_table[*it].emplace(column_type.col_name);
1443  }
1444  ++it;
1445  }
1446  return column_names_by_table;
1447 }
1448 
1452 }
1453 
1455  const std::unordered_set<std::string>& uc_column_names,
1456  std::vector<std::string>& table_names,
1457  query_state::StdLog& stdlog) {
1458  std::unordered_set<std::string> compatible_table_names_by_column;
1459  for (auto it = table_names.begin(); it != table_names.end();) {
1460  TTableDetails table_details;
1461  try {
1462  get_table_details_impl(table_details, stdlog, *it, false, false);
1463  } catch (const TOmniSciException& e) {
1464  // Remove the corrupted Table/View name from the list for further processing.
1465  it = table_names.erase(it);
1466  continue;
1467  }
1468  for (const auto& column_type : table_details.row_desc) {
1469  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1470  compatible_table_names_by_column.emplace(to_upper(*it));
1471  break;
1472  }
1473  }
1474  ++it;
1475  }
1476  return compatible_table_names_by_column;
1477 }
1478 
1479 TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
1480  QueryStateProxy query_state_proxy) {
1481  TQueryResult result;
1482  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1483  [this, &result, &query_state_proxy, &query_ra](const size_t executor_index) {
1484  execute_rel_alg(result,
1485  query_state_proxy,
1486  query_ra,
1487  true,
1489  -1,
1490  -1,
1491  /*just_validate=*/true,
1492  /*find_filter_push_down_candidates=*/false,
1494  executor_index);
1495  });
1497  dispatch_queue_->submit(execute_rel_alg_task, /*is_update_delete=*/false);
1498  auto result_future = execute_rel_alg_task->get_future();
1499  result_future.get();
1500  return result;
1501 }
1502 
1503 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1504  auto stdlog = STDLOG(get_session_ptr(session));
1505  auto session_ptr = stdlog.getConstSessionInfo();
1506  if (!session_ptr->get_currentUser().isSuper) {
1507  // WARNING: This appears to not include roles a user is a member of,
1508  // if the role has no permissions granted to it.
1509  roles =
1510  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1511  session_ptr->getCatalog().getCurrentDB().dbId);
1512  } else {
1513  roles = SysCatalog::instance().getRoles(
1514  false, true, session_ptr->get_currentUser().userName);
1515  }
1516 }
1517 
1518 bool DBHandler::has_role(const TSessionId& sessionId,
1519  const std::string& granteeName,
1520  const std::string& roleName) {
1521  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1522  const auto session_ptr = stdlog.getConstSessionInfo();
1523  const auto current_user = session_ptr->get_currentUser();
1524  if (!current_user.isSuper) {
1525  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1526  user && current_user.userName != granteeName) {
1527  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1528  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1529  current_user.userName, granteeName, true)) {
1531  "Only super users can check roles assignment that have not been directly "
1532  "granted to a user.");
1533  }
1534  }
1535  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1536 }
1537 
1538 static TDBObject serialize_db_object(const std::string& roleName,
1539  const DBObject& inObject) {
1540  TDBObject outObject;
1541  outObject.objectName = inObject.getName();
1542  outObject.grantee = roleName;
1543  const auto ap = inObject.getPrivileges();
1544  switch (inObject.getObjectKey().permissionType) {
1545  case DatabaseDBObjectType:
1546  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1547  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1548  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1549  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1550  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1551 
1552  break;
1553  case TableDBObjectType:
1554  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1555  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1556  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1557  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1558  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1559  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1560  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1561  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1562  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1563 
1564  break;
1565  case DashboardDBObjectType:
1566  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1567  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1568  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1569  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1570  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1571 
1572  break;
1573  case ViewDBObjectType:
1574  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1575  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1576  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1577  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1578  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1579  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1580  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1581 
1582  break;
1583  case ServerDBObjectType:
1584  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1585  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1586  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1587  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1588 
1589  break;
1590  default:
1591  CHECK(false);
1592  }
1593  const int type_val = static_cast<int>(inObject.getType());
1594  CHECK(type_val >= 0 && type_val < 6);
1595  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1596  return outObject;
1597 }
1598 
1600  const TDBObjectPermissions& permissions) {
1601  if (!permissions.__isset.database_permissions_) {
1602  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1603  }
1604  auto perms = permissions.database_permissions_;
1605  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1606  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1607  (perms.view_sql_editor_ &&
1609  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1610  return false;
1611  } else {
1612  return true;
1613  }
1614 }
1615 
1617  const TDBObjectPermissions& permissions) {
1618  if (!permissions.__isset.table_permissions_) {
1619  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1620  }
1621  auto perms = permissions.table_permissions_;
1622  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1623  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1624  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1625  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1626  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1627  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1628  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1629  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1630  return false;
1631  } else {
1632  return true;
1633  }
1634 }
1635 
1637  const TDBObjectPermissions& permissions) {
1638  if (!permissions.__isset.dashboard_permissions_) {
1639  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1640  }
1641  auto perms = permissions.dashboard_permissions_;
1642  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1643  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1644  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1645  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1646  return false;
1647  } else {
1648  return true;
1649  }
1650 }
1651 
1653  const TDBObjectPermissions& permissions) {
1654  if (!permissions.__isset.view_permissions_) {
1655  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1656  }
1657  auto perms = permissions.view_permissions_;
1658  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1659  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1660  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1661  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1662  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1663  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1664  return false;
1665  } else {
1666  return true;
1667  }
1668 }
1669 
1671  const TDBObjectPermissions& permissions) {
1672  CHECK(permissions.__isset.server_permissions_);
1673  auto perms = permissions.server_permissions_;
1674  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
1675  (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
1676  (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER))) {
1677  return false;
1678  } else {
1679  return true;
1680  }
1681 }
1682 
// Thrift handler: returns whether `granteeName` (a user or a role) holds the
// requested `permissions` on the object named `objectName` of kind
// `objectType`. The object key is resolved against the session's catalog.
//
// Rules visible in this function:
//  - a non-superuser may only ask about itself or about roles granted to it
//    (SysCatalog::isRoleGrantedToGrantee); otherwise an exception is thrown;
//  - if the grantee is a superuser, the answer is always true;
//  - an unknown grantee or an unhandled object type throws;
//  - a grantee with no privilege entry for the object yields false.
// The actual permission comparison is delegated to the permissionFuncMap_
// entry selected by `func_name`.
//
// NOTE(review): some lines of this function are not present in this listing:
// the opener of the first exception, the declaration of `user_meta`, the
// switch's case labels and the declaration of `type`. The code below is kept
// verbatim.
bool DBHandler::has_object_privilege(const TSessionId& sessionId,
                                     const std::string& granteeName,
                                     const std::string& objectName,
                                     const TDBObjectType::type objectType,
                                     const TDBObjectPermissions& permissions) {
  auto stdlog = STDLOG(get_session_ptr(sessionId));
  auto session_ptr = stdlog.getConstSessionInfo();
  auto const& cat = session_ptr->getCatalog();
  auto const& current_user = session_ptr->get_currentUser();
  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
                                   current_user.userName, granteeName, false)) {
        "Users except superusers can only check privileges for self or roles granted "
        "to "
        "them.")
  }
  // A superuser grantee implicitly holds every privilege.
  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
      user_meta.isSuper) {
    return true;
  }
  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
  if (!grnt) {
    THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
  }
  // Pick the key into permissionFuncMap_ matching the object kind.
  std::string func_name;
  switch (objectType) {
      func_name = "database";
      break;
      func_name = "table";
      break;
      func_name = "dashboard";
      break;
      func_name = "view";
      break;
      func_name = "server";
      break;
    default:
      THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
  }
  DBObject req_object(objectName, type);
  req_object.loadKey(cat);

  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
  if (grantee_object) {
    // if grantee has privs on the object
    return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
  } else {
    // no privileges on that object
    return false;
  }
}
1746 
1747 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1748  const TSessionId& sessionId,
1749  const std::string& roleName) {
1750  auto stdlog = STDLOG(get_session_ptr(sessionId));
1751  auto session_ptr = stdlog.getConstSessionInfo();
1752  auto const& user = session_ptr->get_currentUser();
1753  if (!user.isSuper &&
1754  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1755  return;
1756  }
1757  auto* rl = SysCatalog::instance().getGrantee(roleName);
1758  if (rl) {
1759  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1760  for (auto& dbObject : *rl->getDbObjects(true)) {
1761  if (dbObject.first.dbId != dbId) {
1762  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1763  // usecase for now, though)
1764  continue;
1765  }
1766  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1767  TDBObjectsForRole.push_back(tdbObject);
1768  }
1769  } else {
1770  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
1771  }
1772 }
1773 
// Thrift handler: lists the privilege objects relevant to `objectName` of kind
// `type` in the session's current database.
//
// Result contents, in order:
//  1. if the caller is a superuser, a synthetic object named "super" built for
//     the requested object key (the source comment says it carries maximal
//     permissions);
//  2. for each grantee name returned by
//     SysCatalog::getRoles(true, <caller is super>, <caller name>): its
//     privilege object on the requested object, if any, followed by its
//     database-level privilege object for that object type (empty name), if any.
// Dashboards are addressed by numeric id: `objectName` is parsed with std::stoi,
// and an empty name means -1. For tables and views, the real kind is taken from
// the catalog's isView flag, so the key matches the actual object.
// Throws if the type is unknown or the object key cannot be loaded. This
// includes a non-numeric dashboard name.
//
// NOTE(review): the switch's case labels (including the Dashboard case's
// assignment) and one constructor argument of the superuser DBObject are not
// present in this listing. The code below is kept verbatim.
void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
                                    const TSessionId& sessionId,
                                    const std::string& objectName,
                                    const TDBObjectType::type type) {
  auto stdlog = STDLOG(get_session_ptr(sessionId));
  auto session_ptr = stdlog.getConstSessionInfo();
  DBObjectType object_type;
  switch (type) {
      object_type = DBObjectType::DatabaseDBObjectType;
      break;
      object_type = DBObjectType::TableDBObjectType;
      break;
      break;
      object_type = DBObjectType::ViewDBObjectType;
      break;
      object_type = DBObjectType::ServerDBObjectType;
      break;
    default:
      THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
                           ": unknown object type (" + std::to_string(type) + ").");
  }
  DBObject object_to_find(objectName, object_type);

  // TODO(adb): Use DatabaseLock to protect method
  try {
    if (object_type == DashboardDBObjectType) {
      // Dashboards are keyed by numeric id; std::stoi throws on a non-numeric
      // name, which the catch below reports as a missing object.
      if (objectName == "") {
        object_to_find = DBObject(-1, object_type);
      } else {
        object_to_find = DBObject(std::stoi(objectName), object_type);
      }
    } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
               !objectName.empty()) {
      // special handling for view / table
      // Trust the catalog's isView flag over the caller-supplied kind.
      auto const& cat = session_ptr->getCatalog();
      auto td = cat.getMetadataForTable(objectName, false);
      if (td) {
        object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
        object_to_find = DBObject(objectName, object_type);
      }
    }
    object_to_find.loadKey(session_ptr->getCatalog());
  } catch (const std::exception&) {
    THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
  }

  // object type on database level
  DBObject object_to_find_dblevel("", object_type);
  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
  // if user is superuser respond with a full priv
  if (session_ptr->get_currentUser().isSuper) {
    // using ALL_TABLE here to set max permissions
    DBObject dbObj{object_to_find.getObjectKey(),
                   session_ptr->get_currentUser().userId};
    dbObj.setName("super");
    TDBObjects.push_back(
        serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
  };

  std::vector<std::string> grantees =
      SysCatalog::instance().getRoles(true,
                                      session_ptr->get_currentUser().isSuper,
                                      session_ptr->get_currentUser().userName);
  for (const auto& grantee : grantees) {
    DBObject* object_found;
    auto* gr = SysCatalog::instance().getGrantee(grantee);
    if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
      TDBObjects.push_back(serialize_db_object(grantee, *object_found));
    }
    // check object permissions on Database level
    if (gr &&
        (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
      TDBObjects.push_back(serialize_db_object(grantee, *object_found));
    }
  }
}
1857 
// Thrift handler: replaces `roles` with the roles granted to `granteeName`.
//
// Authorization:
//  - superusers may query any grantee;
//  - when `granteeName` is a user, a regular user may query only itself;
//  - when `granteeName` is a role, the caller must have that role granted
//    (SysCatalog::isRoleGrantedToGrantee).
// Throws if the grantee does not exist or the caller is not authorized.
//
// NOTE(review): the THROW_MAPD_EXCEPTION( opener for the "another user"
// message is not present in this listing. The code below is kept verbatim.
void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
                                       const TSessionId& sessionId,
                                       const std::string& granteeName) {
  auto stdlog = STDLOG(get_session_ptr(sessionId));
  auto session_ptr = stdlog.getConstSessionInfo();
  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
  if (grantee) {
    if (session_ptr->get_currentUser().isSuper) {
      roles = grantee->getRoles();
    } else if (grantee->isUser()) {
      if (session_ptr->get_currentUser().userName == granteeName) {
        roles = grantee->getRoles();
      } else {
            "Only a superuser is authorized to request list of roles granted to "
            "another "
            "user.");
      }
    } else {
      // Redundant with the else-if above; kept as a sanity check.
      CHECK(!grantee->isUser());
      // granteeName is actually a roleName here and we can check a role
      // only if it is granted to us
      if (SysCatalog::instance().isRoleGrantedToGrantee(
              session_ptr->get_currentUser().userName, granteeName, false)) {
        roles = grantee->getRoles();
      } else {
        THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
      }
    }
  } else {
    THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
  }
}
1891 
namespace {
    const std::map<std::string, std::vector<std::string>>& table_col_names) {
  // Renders the table -> column-names map as ":t1,c1,c2:t2,c3" for the
  // request log. Tables appear in std::map key order.
  // NOTE(review): the line naming this function is not present in this listing.
  std::ostringstream oss;
  for (const auto& [table_name, col_names] : table_col_names) {
    oss << ":" << table_name;
    for (const auto& col_name : col_names) {
      oss << "," << col_name;
    }
  }
  return oss.str();
}
}  // namespace
1905 
    TPixelTableRowResult& _return,
    const TSessionId& session,
    const int64_t widget_id,
    const TPixel& pixel,
    const std::map<std::string, std::vector<std::string>>& table_col_names,
    const bool column_format,
    const int32_t pixel_radius,
    const std::string& nonce) {
  // Thrift handler: forwards all arguments to
  // render_handler_->get_result_row_for_pixel. Judging by the names, it
  // presumably returns the result row(s) rendered at `pixel` for widget
  // `widget_id` (TODO confirm against the render handler).
  // Every argument is recorded in the standard log. Throws if backend
  // rendering is disabled (no render_handler_). Any std::exception from the
  // render handler is rethrown as a Thrift exception.
  // NOTE(review): the function's header line is not present in this listing.
  auto stdlog = STDLOG(get_session_ptr(session),
                       "widget_id",
                       widget_id,
                       "pixel.x",
                       pixel.x,
                       "pixel.y",
                       pixel.y,
                       "column_format",
                       column_format,
                       "pixel_radius",
                       pixel_radius,
                       "table_col_names",
                       dump_table_col_names(table_col_names),
                       "nonce",
                       nonce);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getSessionInfo();
  if (!render_handler_) {
    THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
  }

  try {
    render_handler_->get_result_row_for_pixel(_return,
                                              session_ptr,
                                              widget_id,
                                              pixel,
                                              table_col_names,
                                              column_format,
                                              pixel_radius,
                                              nonce);
  } catch (std::exception& e) {
    THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
  }
}
1949 
    const ColumnDescriptor* cd) {
  // Converts a catalog ColumnDescriptor into its Thrift TColumnType.
  //  - `size` is set only for arrays and dates.
  //  - Geo columns hand subtype/output SRID to a helper; other columns copy
  //    precision and scale.
  //  - In the branch whose condition ends with `cat != nullptr`, comp_param
  //    becomes the dictionary's bit width (dictNBits), or 0 if the dictionary
  //    metadata is missing. Otherwise comp_param is copied, with date-in-days
  //    columns whose comp_param is 0 reported as 32.
  // NOTE(review): the header line, the opener of the geo helper call and the
  // first line of the dictionary-branch condition are not present in this
  // listing. The code below is kept verbatim.
  TColumnType col_type;
  col_type.col_name = cd->columnName;
  col_type.src_name = cd->sourceName;
  col_type.col_id = cd->columnId;
  col_type.col_type.type = type_to_thrift(cd->columnType);
  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
  col_type.col_type.nullable = !cd->columnType.get_notnull();
  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
    col_type.col_type.size = cd->columnType.get_size();  // only for arrays and dates
  }
  if (IS_GEO(cd->columnType.get_type())) {
        col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
  } else {
    col_type.col_type.precision = cd->columnType.get_precision();
    col_type.col_type.scale = cd->columnType.get_scale();
  }
  col_type.is_system = cd->isSystemCol;
      cat != nullptr) {
    // have to get the actual size of the encoding from the dictionary definition
    const int dict_id = cd->columnType.get_comp_param();
    if (!cat->getMetadataForDict(dict_id, false)) {
      col_type.col_type.comp_param = 0;
      // NOTE(review): this early return skips the is_reserved_keyword
      // assignment at the end of the function, so that flag keeps its
      // default value for columns whose dictionary metadata is missing.
      return col_type;
    }
    // NOTE(review): repeats the lookup just above. `dd` cannot be null here,
    // so the throw below is unreachable; a single lookup would suffice.
    auto dd = cat->getMetadataForDict(dict_id, false);
    if (!dd) {
      THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
    }
    col_type.col_type.comp_param = dd->dictNBits;
  } else {
    col_type.col_type.comp_param =
        (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
            ? 32
            : cd->columnType.get_comp_param();
  }
  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
  return col_type;
}
1993 
1994 void DBHandler::get_internal_table_details(TTableDetails& _return,
1995  const TSessionId& session,
1996  const std::string& table_name) {
1997  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1998  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1999  get_table_details_impl(_return, stdlog, table_name, true, false);
2000 }
2001 
2002 void DBHandler::get_table_details(TTableDetails& _return,
2003  const TSessionId& session,
2004  const std::string& table_name) {
2005  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2006  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2007  get_table_details_impl(_return, stdlog, table_name, false, false);
2008 }
2009 
// Shared implementation of get_table_details / get_internal_table_details.
//
// Fills `_return` with the row descriptor and storage properties of table or
// view `table_name` in the session's catalog. The descriptor is obtained
// through a lock container (`td_with_lock`) that lives for the whole call.
//  - Views: the view SQL is parsed and validated to derive the row descriptor.
//    If the caller lacks privileges on the objects the view reads, view_sql is
//    replaced with a placeholder. Any failure in this path is rethrown as a
//    "view must be dropped and re-created" error.
//  - Tables: columns come from the catalog. `get_system` / `get_physical`
//    control whether system / physical columns are included. The deleted-row
//    column is always omitted.
// Both paths require hasTableAccessPrivileges. std::runtime_error is rethrown
// as a Thrift exception.
//
// NOTE(review): the opener of the lock-acquisition call and one parse_to_ra
// argument are not present in this listing. The code below is kept verbatim.
void DBHandler::get_table_details_impl(TTableDetails& _return,
                                       query_state::StdLog& stdlog,
                                       const std::string& table_name,
                                       const bool get_system,
                                       const bool get_physical) {
  try {
    auto session_info = stdlog.getSessionInfo();
    auto& cat = session_info->getCatalog();
    const auto td_with_lock =
            cat, table_name, false);
    const auto td = td_with_lock();
    CHECK(td);

    bool have_privileges_on_view_sources = true;
    if (td->isView) {
      auto query_state = create_query_state(session_info, td->viewSQL);
      stdlog.setQueryState(query_state);
      try {
        if (hasTableAccessPrivileges(td, *session_info)) {
          // TODO(adb): we should take schema read locks on all tables making up the
          // view, at minimum
          const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
                                            query_state->getQueryStr(),
                                            {},
                                            false,
                                            false);
          try {
            calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
                                                     query_ra.first);
          } catch (const std::runtime_error&) {
            // Missing source privileges only hide the SQL; they are not an error.
            have_privileges_on_view_sources = false;
          }

          const auto result = validate_rel_alg(query_ra.first.plan_result,
                                               query_state->createQueryStateProxy());

          _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
        } else {
          // NOTE(review): the handler below catches this too, so a caller
          // without access sees it wrapped as a "view must be dropped and
          // re-created" error.
          throw std::runtime_error(
              "Unable to access view " + table_name +
              ". The view may not exist, or the logged in user may not "
              "have permission to access the view.");
        }
      } catch (const std::exception& e) {
        throw std::runtime_error("View '" + table_name +
                                 "' query has failed with an error: '" +
                                 std::string(e.what()) +
                                 "'.\nThe view must be dropped and re-created to "
                                 "resolve the error. \nQuery:\n" +
                                 query_state->getQueryStr());
      }
    } else {
      if (hasTableAccessPrivileges(td, *session_info)) {
        const auto col_descriptors =
            cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
        const auto deleted_cd = cat.getDeletedColumn(td);
        for (const auto cd : col_descriptors) {
          if (cd == deleted_cd) {
            continue;
          }
          _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
        }
      } else {
        throw std::runtime_error(
            "Unable to access table " + table_name +
            ". The table may not exist, or the logged in user may not "
            "have permission to access the table.");
      }
    }
    _return.fragment_size = td->maxFragRows;
    _return.page_size = td->fragPageSize;
    _return.max_rows = td->maxRows;
    _return.view_sql =
        (have_privileges_on_view_sources ? td->viewSQL
                                         : "[Not enough privileges to see the view SQL]");
    _return.shard_count = td->nShards;
    _return.key_metainfo = td->keyMetainfo;
    // Tables persisted at CPU_LEVEL are reported as temporary.
    _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
    _return.partition_detail =
        td->partitions.empty()
            ? TPartitionDetail::DEFAULT
            : (table_is_replicated(td)
                   ? TPartitionDetail::REPLICATED
                   : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
                                                  : TPartitionDetail::OTHER));
  } catch (const std::runtime_error& e) {
    // NOTE(review): only std::runtime_error is converted here; other
    // std::exception types propagate unwrapped.
    THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
  }
}
2101 
2102 void DBHandler::get_link_view(TFrontendView& _return,
2103  const TSessionId& session,
2104  const std::string& link) {
2105  auto stdlog = STDLOG(get_session_ptr(session));
2106  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2107  auto session_ptr = stdlog.getConstSessionInfo();
2108  auto const& cat = session_ptr->getCatalog();
2109  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2110  if (!ld) {
2111  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
2112  }
2113  _return.view_state = ld->viewState;
2114  _return.view_name = ld->link;
2115  _return.update_time = ld->updateTime;
2116  _return.view_metadata = ld->viewMetadata;
2117 }
2118 
    const TableDescriptor* td,
    const Catalog_Namespace::SessionInfo& session_info) {
  // Returns true if the session's current user may access table/view `td`.
  // Superusers always may. Otherwise SysCatalog::hasAnyPrivileges decides,
  // based on a DBObject for `td` whose key is loaded from the session's catalog.
  // NOTE(review): the header line and the declaration of `dbObject` are not
  // present in this listing. The code below is kept verbatim.
  auto& cat = session_info.getCatalog();
  // NOTE(review): copies the UserMetadata; a const reference would avoid the
  // copy if hasAnyPrivileges accepts one — confirm its signature.
  auto user_metadata = session_info.get_currentUser();

  if (user_metadata.isSuper) {
    return true;
  }

  dbObject.loadKey(cat);
  std::vector<DBObject> privObjects = {dbObject};

  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
}
2135 
2136 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2137  const Catalog_Namespace::SessionInfo& session_info,
2138  const GetTablesType get_tables_type) {
2139  table_names = session_info.getCatalog().getTableNamesForUser(
2140  session_info.get_currentUser(), get_tables_type);
2141 }
2142 
// Thrift handler: lists the tables and views visible to the caller
// (GET_PHYSICAL_TABLES_AND_VIEWS).
// NOTE(review): the call opener on the line before the argument list is not
// present in this listing. It is presumably `get_tables_impl(`, as in the
// sibling handlers. The code below is kept verbatim.
void DBHandler::get_tables(std::vector<std::string>& table_names,
                           const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
      table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
}
2150 
2151 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2152  const TSessionId& session) {
2153  auto stdlog = STDLOG(get_session_ptr(session));
2154  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2155  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2156 }
2157 
2158 void DBHandler::get_views(std::vector<std::string>& table_names,
2159  const TSessionId& session) {
2160  auto stdlog = STDLOG(get_session_ptr(session));
2161  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2162  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2163 }
2164 
// Appends one TTableMeta to `_return` for every table or view in the session's
// catalog that is not a shard and that the session may access. Each entry
// holds the name, view flag, replication flag, shard count, max rows, id and
// column names/types.
//  - Views: column info comes from parsing and validating the view SQL, with
//    table locks taken only when `with_table_locks` is set. Physical columns
//    are skipped. A view that fails to parse or validate is logged and still
//    listed, normally with no columns.
//  - Tables: columns come from the catalog, skipping the deleted-row column.
//    A std::runtime_error in this path is rethrown as a Thrift exception.
//
// NOTE(review): the declaration of `locks` and two execute_rel_alg arguments
// are not present in this listing. The code below is kept verbatim.
void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
                                     QueryStateProxy query_state_proxy,
                                     const Catalog_Namespace::SessionInfo& session_info,
                                     const bool with_table_locks) {
  const auto& cat = session_info.getCatalog();
  const auto tables = cat.getAllTableMetadata();
  _return.reserve(tables.size());

  for (const auto td : tables) {
    if (td->shard >= 0) {
      // skip shards, they're not standalone tables
      continue;
    }
    if (!hasTableAccessPrivileges(td, session_info)) {
      // skip table, as there are no privileges to access it
      continue;
    }

    TTableMeta ret;
    ret.table_name = td->tableName;
    ret.is_view = td->isView;
    ret.is_replicated = table_is_replicated(td);
    ret.shard_count = td->nShards;
    ret.max_rows = td->maxRows;
    ret.table_id = td->tableId;

    std::vector<TTypeInfo> col_types;
    std::vector<std::string> col_names;
    size_t num_cols = 0;
    if (td->isView) {
      try {
        TPlanResult parse_result;
        std::tie(parse_result, locks) = parse_to_ra(
            query_state_proxy, td->viewSQL, {}, with_table_locks, system_parameters_);
        const auto query_ra = parse_result.plan_result;

        TQueryResult result;
        execute_rel_alg(result,
                        query_state_proxy,
                        query_ra,
                        true,
                        -1,
                        -1,
                        /*just_validate=*/true,
                        /*find_push_down_candidates=*/false,
        num_cols = result.row_set.row_desc.size();
        for (const auto& col : result.row_set.row_desc) {
          if (col.is_physical) {
            num_cols--;
            continue;
          }
          col_types.push_back(col.col_type);
          col_names.push_back(col.col_name);
        }
      } catch (std::exception& e) {
        // Best effort: a broken view is still listed, without column info.
        LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
      }
    } else {
      try {
        // NOTE(review): redundant, since access was already checked at the top
        // of the loop.
        if (hasTableAccessPrivileges(td, session_info)) {
          const auto col_descriptors =
              cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
          const auto deleted_cd = cat.getDeletedColumn(td);
          for (const auto cd : col_descriptors) {
            if (cd == deleted_cd) {
              continue;
            }
            col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
            col_names.push_back(cd->columnName);
          }
          // NOTE(review): if the deleted-row column is ever part of
          // col_descriptors, this count exceeds col_types.size() by one
          // (the view branch keeps them equal) — confirm intent.
          num_cols = col_descriptors.size();
        } else {
          continue;
        }
      } catch (const std::runtime_error& e) {
        THROW_MAPD_EXCEPTION(e.what());
      }
    }

    ret.num_cols = num_cols;
    std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
    std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));

    _return.push_back(ret);
  }
}
2254 
// Thrift handler: returns TTableMeta for every table or view the caller can
// access (see get_tables_meta_impl). It runs under a fresh query state
// attached to the standard log, and holds `execute_read_lock` (a
// mapd_shared_lock) for the rest of the call. Any std::exception is rethrown
// as a Thrift exception.
// NOTE(review): the mutex argument lines of `execute_read_lock` are not
// present in this listing. The code below is kept verbatim.
void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
                                const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  auto query_state = create_query_state(session_ptr, "");
  stdlog.setQueryState(query_state);

  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(

  try {
    get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
}
2273 
2274 void DBHandler::get_users(std::vector<std::string>& user_names,
2275  const TSessionId& session) {
2276  auto stdlog = STDLOG(get_session_ptr(session));
2277  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2278  auto session_ptr = stdlog.getConstSessionInfo();
2279  std::list<Catalog_Namespace::UserMetadata> user_list;
2280 
2281  if (!session_ptr->get_currentUser().isSuper) {
2282  user_list = SysCatalog::instance().getAllUserMetadata(
2283  session_ptr->getCatalog().getCurrentDB().dbId);
2284  } else {
2285  user_list = SysCatalog::instance().getAllUserMetadata();
2286  }
2287  for (auto u : user_list) {
2288  user_names.push_back(u.userName);
2289  }
2290 }
2291 
2292 void DBHandler::get_version(std::string& version) {
2293  version = MAPD_RELEASE;
2294 }
2295 
// Thrift handler (superuser only): clears GPU memory.
// The step inside the try block is rethrown as a Thrift exception on failure.
// Then the render handler, if present, clears its GPU memory, and a further
// step runs when leaf_aggregator_ has leaves.
// NOTE(review): the statement inside the try block and the statement in the
// leaf-aggregator branch are not present in this listing. The code below is
// kept verbatim.
void DBHandler::clear_gpu_memory(const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
  }
  try {
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_gpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
  }
}
2316 
// Thrift handler (superuser only): clears CPU memory.
// The step inside the try block is rethrown as a Thrift exception on failure.
// Then the render handler, if present, clears its CPU memory, and a further
// step runs when leaf_aggregator_ has leaves.
// NOTE(review): the statement inside the try block and the statement in the
// leaf-aggregator branch are not present in this listing. The code below is
// kept verbatim.
void DBHandler::clear_cpu_memory(const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
  }
  try {
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_cpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
  }
}
2337 
  // Returns the INVALID_SESSION_ID sentinel.
  // NOTE(review): the function's header line is not present in this listing.
  return INVALID_SESSION_ID;
}
2341 
// Thrift handler: reports buffer-pool memory usage from the DataMgr.
// A `memory_level` of exactly "gpu" selects GPU memory; any other value
// selects CPU memory. Each MemoryInfo becomes one TNodeMemoryInfo, with one
// TMemoryData per entry of its nodeMemoryData. When leaf_aggregator_ has
// leaves, local entries are tagged with this host's name and the leaves'
// summaries are inserted before them.
// NOTE(review): the lines that presumably assign `mem_level` in each branch
// are not present in this listing. The code below is kept verbatim.
void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
                           const TSessionId& session,
                           const std::string& memory_level) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  std::vector<Data_Namespace::MemoryInfo> internal_memory;
  Data_Namespace::MemoryLevel mem_level;
  if (!memory_level.compare("gpu")) {
    internal_memory =
        SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
  } else {
    internal_memory =
        SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
  }

  // NOTE(review): `memInfo` and `gpu` are copied per iteration; const
  // references would avoid copying nodeMemoryData / chunk_key.
  for (auto memInfo : internal_memory) {
    TNodeMemoryInfo nodeInfo;
    if (leaf_aggregator_.leafCount() > 0) {
      nodeInfo.host_name = omnisci::get_hostname();
    }
    nodeInfo.page_size = memInfo.pageSize;
    nodeInfo.max_num_pages = memInfo.maxNumPages;
    nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
    nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
    // Despite the name, `gpu` iterates segments for whichever level was chosen.
    for (auto gpu : memInfo.nodeMemoryData) {
      TMemoryData md;
      md.slab = gpu.slabNum;
      md.start_page = gpu.startPage;
      md.num_pages = gpu.numPages;
      md.touch = gpu.touch;
      md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
      md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
      nodeInfo.node_memory_data.push_back(md);
    }
    _return.push_back(nodeInfo);
  }
  if (leaf_aggregator_.leafCount() > 0) {
    std::vector<TNodeMemoryInfo> leafSummary =
        leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
    _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
  }
}
2386 
// Thrift handler: appends the name and owner of each database returned by
// SysCatalog::getDatabaseListForUser for the current user. The strings are
// moved out of `dbs`, which is not used afterwards.
// NOTE(review): the declaration line of `dbs` is not present in this listing.
// The code below is kept verbatim.
void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  const auto& user = session_ptr->get_currentUser();
      SysCatalog::instance().getDatabaseListForUser(user);
  for (auto& db : dbs) {
    TDBInfo dbinfo;
    dbinfo.db_name = std::move(db.dbName);
    dbinfo.db_owner = std::move(db.dbOwnerName);
    dbinfos.push_back(std::move(dbinfo));
  }
}
2401 
2402 void DBHandler::set_execution_mode(const TSessionId& session,
2403  const TExecuteMode::type mode) {
2404  auto stdlog = STDLOG(get_session_ptr(session));
2405  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2406  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
2407  auto session_it = get_session_it_unsafe(session, write_lock);
2408  if (leaf_aggregator_.leafCount() > 0) {
2409  leaf_aggregator_.set_execution_mode(session, mode);
2410  try {
2411  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2412  } catch (const TOmniSciException& e) {
2413  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2414  }
2415  return;
2416  }
2417  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2418 }
2419 
2420 namespace {
2421 
  // Rejects a direct load into a sharded table (non-zero nShards); the error
  // text says this is about loading on a leaf. A null `td` is accepted.
  // NOTE(review): the function's header line is not present in this listing.
  if (td && td->nShards) {
    throw std::runtime_error("Cannot import a sharded table directly to a leaf");
  }
}
2427 
2428 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
2429  const std::vector<std::string>& column_names) {
2430  std::unordered_set<std::string> unique_names;
2431  for (const auto& name : column_names) {
2432  auto lower_name = to_lower(name);
2433  if (unique_names.find(lower_name) != unique_names.end()) {
2434  THROW_MAPD_EXCEPTION("Column " + name + " is mentioned multiple times");
2435  } else {
2436  unique_names.insert(lower_name);
2437  }
2438  }
2439  for (const auto& cd : descs) {
2440  auto iter = unique_names.find(to_lower(cd->columnName));
2441  if (iter != unique_names.end()) {
2442  unique_names.erase(iter);
2443  }
2444  }
2445  if (!unique_names.empty()) {
2446  THROW_MAPD_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
2447  }
2448 }
2449 
2450 // Return vector of IDs mapping column descriptors to the list of comumn names.
2451 // The size of the vector is the number of actual columns (geophisical columns excluded).
2452 // ID is either a position in column_names matching the descriptor, or -1 if the column
2453 // is missing from the column_names
2454 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
2455  const std::vector<std::string>& column_names) {
2456  std::vector<int> desc_to_column_ids;
2457  if (column_names.empty()) {
2458  int col_idx = 0;
2459  for (const auto& cd : descs) {
2460  if (!cd->isGeoPhyCol) {
2461  desc_to_column_ids.push_back(col_idx);
2462  ++col_idx;
2463  }
2464  }
2465  } else {
2466  for (const auto& cd : descs) {
2467  if (!cd->isGeoPhyCol) {
2468  bool found = false;
2469  for (size_t j = 0; j < column_names.size(); ++j) {
2470  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
2471  found = true;
2472  desc_to_column_ids.push_back(j);
2473  break;
2474  }
2475  }
2476  if (!found) {
2477  if (!cd->columnType.get_notnull()) {
2478  desc_to_column_ids.push_back(-1);
2479  } else {
2480  THROW_MAPD_EXCEPTION("Column '" + cd->columnName +
2481  "' cannot be omitted due to NOT NULL constraint");
2482  }
2483  }
2484  }
2485  }
2486  }
2487  return desc_to_column_ids;
2488 }
2489 
2490 } // namespace
2491 
// Thrift handler: appends row-oriented `rows` to table `table_name`.
//
// `column_names` optionally names the target column of each value in a row
// (case-insensitive). Listed columns may be omitted only if nullable; omitted
// columns are loaded as NULL. Rows whose values fail to convert are logged and
// discarded, and the remaining rows are loaded. A DistributedLoader is used
// when leaf_aggregator_ has leaves, otherwise a local import_export::Loader.
// Requires a writable server and table load privileges. Every std::exception
// is rethrown as a Thrift exception.
//
// NOTE(review): the opener of the lock-acquisition call, the body of the
// `g_cluster && !leafCount()` branch and one THROW_MAPD_EXCEPTION( opener are
// not present in this listing. The code below is kept verbatim.
void DBHandler::load_table_binary(const TSessionId& session,
                                  const std::string& table_name,
                                  const std::vector<TRow>& rows,
                                  const std::vector<std::string>& column_names) {
  try {
    auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
    stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
    auto session_ptr = stdlog.getConstSessionInfo();
    check_read_only("load_table_binary");
    auto& cat = session_ptr->getCatalog();
    const auto td_with_lock =
            cat, table_name, true);
    const auto td = td_with_lock();
    CHECK(td);
    if (g_cluster && !leaf_aggregator_.leafCount()) {
      // Sharded table rows need to be routed to the leaf by an aggregator.
    }
    check_table_load_privileges(*session_ptr, table_name);
    if (rows.empty()) {
      THROW_MAPD_EXCEPTION("No rows to insert");
    }
    std::unique_ptr<import_export::Loader> loader;
    if (leaf_aggregator_.leafCount() > 0) {
      loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
    } else {
      loader.reset(new import_export::Loader(cat, td));
    }
    auto col_descs = loader->get_column_descs();
    // NOTE(review): desc_id_to_column_id has one entry per non-geo-physical
    // column, but the row loop below indexes it by position over all of
    // col_descs. If col_descs contains geo physical columns, that indexing
    // runs past the end.
    auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
    check_valid_column_names(col_descs, column_names);
    // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
    // Subtracting 1 (rowid) until TableDescriptor is updated.
    // NOTE(review): only the first row's width is checked. A later, shorter row
    // makes row.cols[mapped_idx] below index out of range (unchecked).
    if (column_names.empty() &&
        rows.front().cols.size() !=
            static_cast<size_t>(td->nColumns) - (td->hasDeletedCol ? 2 : 1)) {
      THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
    } else if (!column_names.empty() && rows.front().cols.size() != column_names.size()) {
          "Number of columns specified does not match the "
          "number of columns given (" +
          std::to_string(rows.front().cols.size()) + " vs " +
          std::to_string(column_names.size()) + ")");
    }
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
    for (auto cd : col_descs) {
      import_buffers.push_back(std::unique_ptr<import_export::TypedImportBuffer>(
          new import_export::TypedImportBuffer(cd, loader->getStringDict(cd))));
    }
    TDatum empty_value;
    empty_value.is_null = true;
    size_t rows_completed = 0;
    for (auto const& row : rows) {
      size_t col_idx = 0;
      try {
        for (auto cd : col_descs) {
          int mapped_idx = desc_id_to_column_id[col_idx];
          if (mapped_idx != -1) {
            import_buffers[col_idx]->add_value(
                cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
          } else {
            // Column not supplied by the caller: store NULL.
            import_buffers[col_idx]->add_value(cd, empty_value, true);
          }
          col_idx++;
        }
        rows_completed++;
      } catch (const std::exception& e) {
        // Roll back the values already appended for this row (buffers
        // 0..col_idx-1) before discarding it.
        for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
          import_buffers[col_idx_to_pop]->pop_value();
        }
        LOG(ERROR) << "Input exception thrown: " << e.what()
                   << ". Row discarded, issue at column : " << (col_idx + 1)
                   << " data :" << row;
      }
    }
    // The insert-data write lock is taken only around the actual load.
    auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
        session_ptr->getCatalog(), table_name);
    loader->load(import_buffers, rows_completed);
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
  }
}
2575 
// Shared setup for columnar loads:
//  - takes a table-schema read lock on `table_name`
//    (TableSchemaLockContainer<ReadLock>) and checks load privileges;
//  - creates `*loader`: a DistributedLoader when leaf_aggregator_ has leaves,
//    otherwise a local import_export::Loader;
//  - validates `column_names` against the table;
//  - checks `num_cols`. With no names, it must equal nColumns minus geo
//    physical columns minus rowid (and the deleted-row column, if any). With
//    names, it must equal column_names.size();
//  - fills `*import_buffers`.
// Returns the lock container so the caller keeps the table schema locked while
// loading. Throws if num_cols is 0 or any check fails.
// NOTE(review): the function-name line and a few statement openers are not
// present in this listing. The code below is kept verbatim.
std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
    const Catalog_Namespace::SessionInfo& session_info,
    const std::string& table_name,
    size_t num_cols,
    std::unique_ptr<import_export::Loader>* loader,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
    const std::vector<std::string>& column_names) {
  if (num_cols == 0) {
    THROW_MAPD_EXCEPTION("No columns to insert");
  }
  auto& cat = session_info.getCatalog();
  auto td_with_lock =
      std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
              cat, table_name, true));
  const auto td = (*td_with_lock)();
  CHECK(td);
  if (g_cluster && !leaf_aggregator_.leafCount()) {
    // Sharded table rows need to be routed to the leaf by an aggregator.
  }
  check_table_load_privileges(session_info, table_name);

  if (leaf_aggregator_.leafCount() > 0) {
    loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
  } else {
    loader->reset(new import_export::Loader(cat, td));
  }
  auto col_descs = (*loader)->get_column_descs();
  check_valid_column_names(col_descs, column_names);
  if (column_names.empty()) {
    // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
    // Subtracting 1 (rowid) until TableDescriptor is updated.
    auto geo_physical_cols = std::count_if(
        col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
    const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
                                (td->hasDeletedCol ? 2 : 1);
    if (num_cols != num_table_cols) {
      throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
                               ") does not match number of columns in table " +
                               td->tableName + " (" + std::to_string(num_table_cols) +
                               ")");
    }
  } else if (num_cols != column_names.size()) {
        "Number of columns specified does not match the "
        "number of columns given (" +
        std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
  }
  *import_buffers = import_export::setup_column_loaders(td, loader->get());
  // Explicit move: converts the derived-lock unique_ptr to the base return type.
  return std::move(td_with_lock);
}
2629 
2630 namespace {
2631 
2632 size_t get_column_size(const TColumn& column) {
2633  if (!column.nulls.empty()) {
2634  return column.nulls.size();
2635  } else {
2636  // it is a very bold estimate but later we check it against REAL data
2637  // and if this function returns a wrong result (e.g. both int and string
2638  // vectors are filled with values), we get an error
2639  return column.data.int_col.size() + column.data.arr_col.size() +
2640  column.data.real_col.size() + column.data.str_col.size();
2641  }
2642 }
2643 
2644 } // namespace
2645 
// Thrift entry point: loads column-oriented data into table_name.
// cols holds one TColumn per supplied column. column_ids_by_names maps each
// (non-physical) table column to an index into cols, or to -1 when the column
// is not supplied; such columns are loaded from an all-null placeholder sized
// to the first TColumn's row count. For geometry columns the string values are
// parsed with Geospatial::GeoTypesFactory::getGeoColumns, and the result fills
// the geo physical columns that follow them. Every column must produce the
// same number of rows as cols.front(), otherwise the import is aborted.
2646 void DBHandler::load_table_binary_columnar(const TSessionId& session,
2647  const std::string& table_name,
2648  const std::vector<TColumn>& cols,
2649  const std::vector<std::string>& column_names) {
2650  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2651  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2652  auto session_ptr = stdlog.getConstSessionInfo();
2653  check_read_only("load_table_binary_columnar");
2654  auto const& cat = session_ptr->getCatalog();
2655  std::unique_ptr<import_export::Loader> loader;
2656  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2658  *session_ptr, table_name, cols.size(), &loader, &import_buffers, column_names);
2659  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2660  session_ptr->getCatalog(), table_name);
2661  auto desc_id_to_column_id =
2662  column_ids_by_names(loader->get_column_descs(), column_names);
2663  size_t num_rows = get_column_size(cols.front());
2664  size_t import_idx = 0; // index into the TColumn vector being loaded
2665  size_t col_idx = 0; // index into column description vector
// All-null column used for table columns the client did not supply.
2666  TColumn empty_column;
2667  empty_column.nulls.resize(num_rows, true);
2668  empty_column.data.arr_col.resize(num_rows);
2669  empty_column.data.int_col.resize(num_rows);
2670  empty_column.data.str_col.resize(num_rows);
2671  empty_column.data.real_col.resize(num_rows);
2672  try {
// Number of upcoming descriptors that are geo physical columns already filled
// from the preceding geometry column; they are skipped by this loop.
2673  size_t skip_physical_cols = 0;
2674  for (auto cd : loader->get_column_descs()) {
2675  if (skip_physical_cols > 0) {
2676  if (!cd->isGeoPhyCol) {
2677  throw std::runtime_error("Unexpected physical column");
2678  }
2679  skip_physical_cols--;
2680  continue;
2681  }
2682  int mapped_idx = desc_id_to_column_id[import_idx];
2683  const TColumn& value = mapped_idx == -1 ? empty_column : cols[mapped_idx];
2684  size_t col_rows = import_buffers[col_idx]->add_values(cd, value);
2685  if (col_rows != num_rows) {
// NOTE(review): col_idx in this message is 0-based, while the catch below
// reports col_idx + 1. This throw happens inside the try, so if
// TOmniSciException derives from std::exception it is re-wrapped by that catch
// — confirm the intended message.
2686  std::ostringstream oss;
2687  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
2688  << cd->columnName << " , expecting " << num_rows << " rows, column "
2689  << col_idx << " has " << col_rows << " rows";
2690  THROW_MAPD_EXCEPTION(oss.str());
2691  }
2692  // Advance to the next column in the table
2693  col_idx++;
2694 
2695  // For geometry columns: process WKT strings and fill physical columns
2696  if (cd->columnType.is_geometry()) {
2697  auto geo_col_idx = col_idx - 1;
2698  const auto wkt_or_wkb_hex_column =
2699  import_buffers[geo_col_idx]->getGeoStringBuffer();
2700  std::vector<std::vector<double>> coords_column, bounds_column;
2701  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2702  int render_group = 0;
2703  SQLTypeInfo ti = cd->columnType;
2704  if (num_rows != wkt_or_wkb_hex_column->size() ||
2705  !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
2706  ti,
2707  coords_column,
2708  bounds_column,
2709  ring_sizes_column,
2710  poly_rings_column,
2711  false)) {
2712  std::ostringstream oss;
2713  oss << "load_table_binary_columnar: Invalid geometry in column "
2714  << cd->columnName;
2715  THROW_MAPD_EXCEPTION(oss.str());
2716  }
2717  // Populate physical columns, advance col_idx
2719  cat,
2720  cd,
2721  import_buffers,
2722  col_idx,
2723  coords_column,
2724  bounds_column,
2725  ring_sizes_column,
2726  poly_rings_column,
2727  render_group);
2728  skip_physical_cols = cd->columnType.get_physical_cols();
2729  }
2730  // Advance to the next column of values being loaded
2731  import_idx++;
2732  }
2733  } catch (const std::exception& e) {
2734  std::ostringstream oss;
2735  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
2736  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2737  THROW_MAPD_EXCEPTION(oss.str());
2738  }
2739  loader->load(import_buffers, num_rows);
2740 }
2741 
2742 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
2743 
2744 #define ARROW_THRIFT_THROW_NOT_OK(s) \
2745  do { \
2746  ::arrow::Status _s = (s); \
2747  if (UNLIKELY(!_s.ok())) { \
2748  TOmniSciException ex; \
2749  ex.error_msg = _s.ToString(); \
2750  LOG(ERROR) << s.ToString(); \
2751  throw ex; \
2752  } \
2753  } while (0)
2754 
2755 namespace {
2756 
2757 RecordBatchVector loadArrowStream(const std::string& stream) {
2758  RecordBatchVector batches;
2759  try {
2760  // TODO(wesm): Make this simpler in general, see ARROW-1600
2761  auto stream_buffer =
2762  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
2763  static_cast<int64_t>(stream.size()));
2764 
2765  arrow::io::BufferReader buf_reader(stream_buffer);
2766  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
2767  ARROW_ASSIGN_OR_THROW(batch_reader,
2768  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
2769 
2770  while (true) {
2771  std::shared_ptr<arrow::RecordBatch> batch;
2772  // Read batch (zero-copy) from the stream
2773  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
2774  if (batch == nullptr) {
2775  break;
2776  }
2777  batches.emplace_back(std::move(batch));
2778  }
2779  } catch (const std::exception& e) {
2780  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
2781  }
2782  return batches;
2783 }
2784 
2785 } // namespace
2786 
2787 void DBHandler::load_table_binary_arrow(const TSessionId& session,
2788  const std::string& table_name,
2789  const std::string& arrow_stream,
2790  const bool use_column_names) {
2791  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2792  auto session_ptr = stdlog.getConstSessionInfo();
2793  check_read_only("load_table_binary_arrow");
2794  RecordBatchVector batches = loadArrowStream(arrow_stream);
2795  // Assuming have one batch for now
2796  if (batches.size() != 1) {
2797  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
2798  }
2799  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
2800  std::unique_ptr<import_export::Loader> loader;
2801  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2802  std::vector<std::string> column_names;
2803  if (use_column_names) {
2804  column_names = batch->schema()->field_names();
2805  }
2806  auto read_lock = prepare_columnar_loader(*session_ptr,
2807  table_name,
2808  static_cast<size_t>(batch->num_columns()),
2809  &loader,
2810  &import_buffers,
2811  column_names);
2812  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2813  session_ptr->getCatalog(), table_name);
2814  auto desc_id_to_column_id =
2815  column_ids_by_names(loader->get_column_descs(), column_names);
2816  size_t num_rows = 0;
2817  size_t col_idx = 0;
2818  std::shared_ptr<arrow::Array> empty_array;
2819  {
2820  arrow::BooleanBuilder builder;
2821  ARROW_THROW_NOT_OK(builder.Resize(batch->num_rows()));
2822  ARROW_THROW_NOT_OK(builder.AppendNulls(batch->num_rows()));
2823  auto status = builder.Finish(&empty_array);
2824  if (!status.ok()) {
2825  THROW_MAPD_EXCEPTION("Failed to load data: " + status.message());
2826  }
2827  }
2828 
2829  try {
2830  for (auto cd : loader->get_column_descs()) {
2831  int mapped_idx = desc_id_to_column_id[col_idx];
2832  if (mapped_idx != -1) {
2833  auto& array = *batch->column(mapped_idx);
2834  import_export::ArraySliceRange row_slice(0, array.length());
2835  num_rows = import_buffers[col_idx]->add_arrow_values(
2836  cd, array, true, row_slice, nullptr);
2837  } else {
2838  import_export::ArraySliceRange row_slice(0, empty_array->length());
2839  num_rows = import_buffers[col_idx]->add_arrow_values(
2840  cd, *empty_array, false, row_slice, nullptr);
2841  }
2842  col_idx++;
2843  }
2844  } catch (const std::exception& e) {
2845  LOG(ERROR) << "Input exception thrown: " << e.what()
2846  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2847  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
2848  // other import paths
2849  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2850  }
2851  loader->load(import_buffers, num_rows);
2852 }
2853 
// Thrift entry point: loads row-oriented string data (TStringRow) into
// table_name. Each field is appended to its column's TypedImportBuffer. For
// geometry columns, the parsed geometry also populates the geo physical
// columns that follow it. A row that fails to parse is logged and discarded:
// the values already appended for it are popped, and the load continues with
// the next row. Any other failure is re-reported through THROW_MAPD_EXCEPTION
// with an "Exception: " prefix.
2854 void DBHandler::load_table(const TSessionId& session,
2855  const std::string& table_name,
2856  const std::vector<TStringRow>& rows,
2857  const std::vector<std::string>& column_names) {
2858  try {
2859  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2860  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2861  auto session_ptr = stdlog.getConstSessionInfo();
2862  check_read_only("load_table");
2863  auto& cat = session_ptr->getCatalog();
2864  const auto td_with_lock =
2866  cat, table_name, true);
2867  const auto td = td_with_lock();
2868  CHECK(td);
2869 
2870  if (g_cluster && !leaf_aggregator_.leafCount()) {
2871  // Sharded table rows need to be routed to the leaf by an aggregator.
2873  }
2874  check_table_load_privileges(*session_ptr, table_name);
2875  if (rows.empty()) {
2876  THROW_MAPD_EXCEPTION("No rows to insert");
2877  }
2878  std::unique_ptr<import_export::Loader> loader;
2879  if (leaf_aggregator_.leafCount() > 0) {
2880  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2881  } else {
2882  loader.reset(new import_export::Loader(cat, td));
2883  }
2884  auto col_descs = loader->get_column_descs();
2885  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
2886  check_valid_column_names(col_descs, column_names);
// NOTE(review): only rows.front() is checked for width below. Every row is
// later indexed with row.cols[mapped_idx], so a later row with fewer fields
// reads out of bounds. Consider validating each row's width.
2887  if (column_names.empty()) {
2888  auto geo_physical_cols = std::count_if(
2889  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2890  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2891  // Subtracting 1 (rowid) until TableDescriptor is updated.
2892  if (rows.front().cols.size() != static_cast<size_t>(td->nColumns) -
2893  geo_physical_cols -
2894  (td->hasDeletedCol ? 2 : 1)) {
// NOTE(review): the expected count printed below subtracts 1, but the check
// above subtracts (td->hasDeletedCol ? 2 : 1). For tables with a deleted
// column the message overstates the expected count by one.
2895  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name +
2896  " (" + std::to_string(rows.front().cols.size()) + " vs " +
2897  std::to_string(td->nColumns - geo_physical_cols - 1) + ")");
2898  }
2899  } else {
2900  if (rows.front().cols.size() != column_names.size()) {
2902  "Number of columns specified does not match the "
2903  "number of columns given (" +
2904  std::to_string(rows.front().cols.size()) + " vs " +
2905  std::to_string(column_names.size()) + ")");
2906  }
2907  }
2908 
// One import buffer per column descriptor, including geo physical columns.
2909  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2910  for (auto cd : col_descs) {
2911  import_buffers.push_back(std::unique_ptr<import_export::TypedImportBuffer>(
2912  new import_export::TypedImportBuffer(cd, loader->getStringDict(cd))));
2913  }
2914  import_export::CopyParams copy_params;
2915  size_t rows_completed = 0;
2916  for (auto const& row : rows) {
2917  size_t import_idx = 0; // index into the TStringRow being loaded
2918  size_t col_idx = 0; // index into column description vector
2919  try {
// Number of upcoming descriptors that are geo physical columns already filled
// from the preceding geometry column; they are skipped by this loop.
2920  size_t skip_physical_cols = 0;
2921  for (auto cd : col_descs) {
2922  if (skip_physical_cols > 0) {
2923  if (!cd->isGeoPhyCol) {
2924  throw std::runtime_error("Unexpected physical column");
2925  }
2926  skip_physical_cols--;
2927  continue;
2928  }
// Columns not named by the client (mapped_idx == -1) are loaded as NULL.
2929  const std::string empty_val = "";
2930  int mapped_idx = desc_id_to_column_id[import_idx];
2931  const std::string& value =
2932  mapped_idx == -1 ? empty_val : row.cols[mapped_idx].str_val;
2933  bool is_null = mapped_idx == -1 || row.cols[mapped_idx].is_null;
2934  import_buffers[col_idx]->add_value(cd, value, is_null, copy_params);
2935  // Advance to the next column within the table
2936  col_idx++;
2937 
2938  if (cd->columnType.is_geometry()) {
2939  // Populate physical columns
2940  std::vector<double> coords, bounds;
2941  std::vector<int> ring_sizes, poly_rings;
2942  int render_group = 0;
2943  SQLTypeInfo ti{cd->columnType};
2945  !is_null ? value : std::string(),
2946  ti,
2947  coords,
2948  bounds,
2949  ring_sizes,
2950  poly_rings,
2951  false)) {
2952  throw std::runtime_error("Invalid geometry");
2953  }
2954  if (cd->columnType.get_type() != ti.get_type()) {
2955  throw std::runtime_error("Geometry type mismatch");
2956  }
2958  cd,
2959  import_buffers,
2960  col_idx,
2961  coords,
2962  bounds,
2963  ring_sizes,
2964  poly_rings,
2965  render_group);
2966  skip_physical_cols = cd->columnType.get_physical_cols();
2967  }
2968  // Advance to the next field within the row
2969  import_idx++;
2970  }
2971  rows_completed++;
2972  } catch (const std::exception& e) {
// Discard the bad row: pop the value appended to each of the first col_idx
// buffers, then log and continue with the next row.
2973  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2974  import_buffers[col_idx_to_pop]->pop_value();
2975  }
2976  LOG(ERROR) << "Input exception thrown: " << e.what()
2977  << ". Row discarded, issue at column : " << (col_idx + 1)
2978  << " data :" << row;
2979  }
2980  }
// The insert write lock is taken only around the final load, after parsing.
2981  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2982  session_ptr->getCatalog(), table_name);
2983  loader->load(import_buffers, rows_completed);
2984  } catch (const std::exception& e) {
2985  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2986  }
2987 }
2988 
2989 char DBHandler::unescape_char(std::string str) {
2990  char out = str[0];
2991  if (str.size() == 2 && str[0] == '\\') {
2992  if (str[1] == 't') {
2993  out = '\t';
2994  } else if (str[1] == 'n') {
2995  out = '\n';
2996  } else if (str[1] == '0') {
2997  out = '\0';
2998  } else if (str[1] == '\'') {
2999  out = '\'';
3000  } else if (str[1] == '\\') {
3001  out = '\\';
3002  }
3003  }
3004  return out;
3005 }
3006 
// Builds an import_export::CopyParams from the Thrift TCopyParams `cp`.
// Enum-valued fields are validated, and unsupported values are rejected via
// THROW_MAPD_EXCEPTION. String and count fields override the CopyParams
// defaults only when non-empty or non-zero. The exception is delimiter, which
// is set to '\0' when empty (presumably meaning "detect" — TODO confirm).
// Escape sequences such as "\t" are decoded with unescape_char.
3008  import_export::CopyParams copy_params;
3009  switch (cp.has_header) {
3010  case TImportHeaderRow::AUTODETECT:
3012  break;
3013  case TImportHeaderRow::NO_HEADER:
3015  break;
3016  case TImportHeaderRow::HAS_HEADER:
3018  break;
3019  default:
3020  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
3021  std::to_string((int)cp.has_header));
3022  break;
3023  }
3024  copy_params.quoted = cp.quoted;
3025  if (cp.delimiter.length() > 0) {
3026  copy_params.delimiter = unescape_char(cp.delimiter);
3027  } else {
3028  copy_params.delimiter = '\0';
3029  }
3030  if (cp.null_str.length() > 0) {
3031  copy_params.null_str = cp.null_str;
3032  }
3033  if (cp.quote.length() > 0) {
3034  copy_params.quote = unescape_char(cp.quote);
3035  }
3036  if (cp.escape.length() > 0) {
3037  copy_params.escape = unescape_char(cp.escape);
3038  }
3039  if (cp.line_delim.length() > 0) {
3040  copy_params.line_delim = unescape_char(cp.line_delim);
3041  }
3042  if (cp.array_delim.length() > 0) {
3043  copy_params.array_delim = unescape_char(cp.array_delim);
3044  }
3045  if (cp.array_begin.length() > 0) {
3046  copy_params.array_begin = unescape_char(cp.array_begin);
3047  }
3048  if (cp.array_end.length() > 0) {
3049  copy_params.array_end = unescape_char(cp.array_end);
3050  }
3051  if (cp.threads != 0) {
3052  copy_params.threads = cp.threads;
3053  }
3054  if (cp.s3_access_key.length() > 0) {
3055  copy_params.s3_access_key = cp.s3_access_key;
3056  }
3057  if (cp.s3_secret_key.length() > 0) {
3058  copy_params.s3_secret_key = cp.s3_secret_key;
3059  }
3060  if (cp.s3_region.length() > 0) {
3061  copy_params.s3_region = cp.s3_region;
3062  }
3063  if (cp.s3_endpoint.length() > 0) {
3064  copy_params.s3_endpoint = cp.s3_endpoint;
3065  }
// PARQUET is accepted only when the server is built with ENABLE_IMPORT_PARQUET.
3066  switch (cp.file_type) {
3067  case TFileType::POLYGON:
3069  break;
3070  case TFileType::DELIMITED:
3072  break;
3073 #ifdef ENABLE_IMPORT_PARQUET
3074  case TFileType::PARQUET:
3075  copy_params.file_type = import_export::FileType::PARQUET;
3076  break;
3077 #endif
3078  default:
3079  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
3080  std::to_string((int)cp.file_type));
3081  break;
3082  }
3083  switch (cp.geo_coords_encoding) {
3084  case TEncodingType::GEOINT:
3085  copy_params.geo_coords_encoding = kENCODING_GEOINT;
3086  break;
3087  case TEncodingType::NONE:
3088  copy_params.geo_coords_encoding = kENCODING_NONE;
3089  break;
3090  default:
3091  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
3092  std::to_string((int)cp.geo_coords_encoding));
3093  break;
3094  }
3095  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3096  switch (cp.geo_coords_type) {
3097  case TDatumType::GEOGRAPHY:
3098  copy_params.geo_coords_type = kGEOGRAPHY;
3099  break;
3100  case TDatumType::GEOMETRY:
3101  copy_params.geo_coords_type = kGEOMETRY;
3102  break;
3103  default:
3104  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
3105  std::to_string((int)cp.geo_coords_type));
3106  break;
3107  }
// Only SRIDs 4326, 3857 and 900913 are accepted for the coordinate system.
3108  switch (cp.geo_coords_srid) {
3109  case 4326:
3110  case 3857:
3111  case 900913:
3112  copy_params.geo_coords_srid = cp.geo_coords_srid;
3113  break;
3114  default:
// NOTE(review): this message opens "(" but never closes it; the other enum
// errors above use ": " instead.
3115  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
3116  std::to_string((int)cp.geo_coords_srid));
3117  break;
3118  }
3119  copy_params.sanitize_column_names = cp.sanitize_column_names;
3120  copy_params.geo_layer_name = cp.geo_layer_name;
3121  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3122  copy_params.geo_explode_collections = cp.geo_explode_collections;
3123  copy_params.source_srid = cp.source_srid;
3124  return copy_params;
3125 }
3126 
// Builds a Thrift TCopyParams from the import_export::CopyParams `cp`.
// Most fields are copied directly. The enum fields are translated:
//   - file_type: the single explicit case yields POLYGON; every other type
//     (presumably including PARQUET) is reported as DELIMITED;
//   - geo_coords_encoding: GEOINT is kept, everything else becomes NONE;
//   - has_header / geo_coords_type: unexpected values trip CHECK(false).
3128  TCopyParams copy_params;
3129  copy_params.delimiter = cp.delimiter;
3130  copy_params.null_str = cp.null_str;
3131  switch (cp.has_header) {
3133  copy_params.has_header = TImportHeaderRow::AUTODETECT;
3134  break;
3136  copy_params.has_header = TImportHeaderRow::NO_HEADER;
3137  break;
3139  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
3140  break;
3141  default:
3142  CHECK(false);
3143  break;
3144  }
3145  copy_params.quoted = cp.quoted;
3146  copy_params.quote = cp.quote;
3147  copy_params.escape = cp.escape;
3148  copy_params.line_delim = cp.line_delim;
3149  copy_params.array_delim = cp.array_delim;
3150  copy_params.array_begin = cp.array_begin;
3151  copy_params.array_end = cp.array_end;
3152  copy_params.threads = cp.threads;
3153  copy_params.s3_access_key = cp.s3_access_key;
3154  copy_params.s3_secret_key = cp.s3_secret_key;
3155  copy_params.s3_region = cp.s3_region;
3156  copy_params.s3_endpoint = cp.s3_endpoint;
3157  switch (cp.file_type) {
3159  copy_params.file_type = TFileType::POLYGON;
3160  break;
3161  default:
3162  copy_params.file_type = TFileType::DELIMITED;
3163  break;
3164  }
3165  switch (cp.geo_coords_encoding) {
3166  case kENCODING_GEOINT:
3167  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
3168  break;
3169  default:
3170  copy_params.geo_coords_encoding = TEncodingType::NONE;
3171  break;
3172  }
3173  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3174  switch (cp.geo_coords_type) {
3175  case kGEOGRAPHY:
3176  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
3177  break;
3178  case kGEOMETRY:
3179  copy_params.geo_coords_type = TDatumType::GEOMETRY;
3180  break;
3181  default:
3182  CHECK(false);
3183  break;
3184  }
3185  copy_params.geo_coords_srid = cp.geo_coords_srid;
3186  copy_params.sanitize_column_names = cp.sanitize_column_names;
3187  copy_params.geo_layer_name = cp.geo_layer_name;
3188  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3189  copy_params.geo_explode_collections = cp.geo_explode_collections;
3190  copy_params.source_srid = cp.source_srid;
3191  return copy_params;
3192 }
3193 
3194 namespace {
// Rewrites a remote path for GDAL's virtual file systems: http:// and https://
// URLs get a "/vsicurl/" prefix, and a leading "s3://" becomes "/vsis3/".
// Scheme matching is case-insensitive. When GDAL lacks network file access
// (Geospatial::GDAL::supportsNetworkFileAccess() is false), such paths are
// rejected. Other paths are left unchanged.
3195 void add_vsi_network_prefix(std::string& path) {
3196  // do we support network file access?
3197  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
3198 
3199  // modify head of filename based on source location
3200  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
3201  if (!gdal_network) {
3203  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
3204  }
3205  // invoke GDAL CURL virtual file reader
3206  path = "/vsicurl/" + path;
3207  } else if (boost::istarts_with(path, "s3://")) {
3208  if (!gdal_network) {
3210  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
3211  }
3212  // invoke GDAL S3 virtual file reader
3213  boost::replace_first(path, "s3://", "/vsis3/");
3214  }
3215 }
3216 
// Wraps a single gzip-compressed file (".gz", but not ".tar.gz") in GDAL's
// "/vsigzip/" handler. Extension matching is case-insensitive.
3217 void add_vsi_geo_prefix(std::string& path) {
3218  // single gzip'd file (not an archive)?
3219  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3220  path = "/vsigzip/" + path;
3221  }
3222 }
3223 
// Wraps an archive for GDAL: ".zip" gets "/vsizip/"; ".tar", ".tgz" and
// ".tar.gz" get "/vsitar/". Other paths are left unchanged.
3224 void add_vsi_archive_prefix(std::string& path) {
3225  // check for compressed file or file bundle
3226  if (boost::iends_with(path, ".zip")) {
3227  // zip archive
3228  path = "/vsizip/" + path;
3229  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3230  boost::iends_with(path, ".tar.gz")) {
3231  // tar archive (compressed or uncompressed)
3232  path = "/vsitar/" + path;
3233  }
3234 }
3235 
// Undoes the prefixes added above, e.g. for user-facing messages. At most one
// archive/gzip prefix is stripped first, then at most one network prefix;
// "/vsis3/" is turned back into "s3://". Detection uses istarts_with
// (case-insensitive) but replace_first is case-sensitive. This is fine for the
// lowercase prefixes this file adds.
3236 std::string remove_vsi_prefixes(const std::string& path_in) {
3237  std::string path(path_in);
3238 
3239  // these will be first
3240  if (boost::istarts_with(path, "/vsizip/")) {
3241  boost::replace_first(path, "/vsizip/", "");
3242  } else if (boost::istarts_with(path, "/vsitar/")) {
3243  boost::replace_first(path, "/vsitar/", "");
3244  } else if (boost::istarts_with(path, "/vsigzip/")) {
3245  boost::replace_first(path, "/vsigzip/", "");
3246  }
3247 
3248  // then these
3249  if (boost::istarts_with(path, "/vsicurl/")) {
3250  boost::replace_first(path, "/vsicurl/", "");
3251  } else if (boost::istarts_with(path, "/vsis3/")) {
3252  boost::replace_first(path, "/vsis3/", "s3://");
3253  }
3254 
3255  return path;
3256 }
3257 
// True for local paths that are not absolute. s3://, http:// and https:// URLs
// are never considered relative.
3258 bool path_is_relative(const std::string& path) {
3259  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
3260  boost::istarts_with(path, "https://")) {
3261  return false;
3262  }
3263  return !boost::filesystem::path(path).is_absolute();
3264 }
3265 
// Rejects paths whose final component is empty or begins with '.' or '/'.
3266 bool path_has_valid_filename(const std::string& path) {
3267  auto filename = boost::filesystem::path(path).filename().string();
3268  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
3269  return false;
3270  }
3271  return true;
3272 }
3273 
// Case-insensitive extension check for geo files GDAL is asked to open
// directly: .shp, .geojson, .json, .kml, .kmz, .gdb, .gdb.zip, plus .geojson.gz
// and .json.gz when include_gz is true.
3274 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
3275  if (!path_has_valid_filename(path)) {
3276  return false;
3277  }
3278  if (include_gz) {
3279  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
3280  return true;
3281  }
3282  }
3283  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
3284  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
3285  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
3286  boost::iends_with(path, ".gdb.zip")) {
3287  return true;
3288  }
3289  return false;
3290 }
3291 
// Case-insensitive archive check: .zip (but not .gdb.zip, which
// is_a_supported_geo_file accepts as a geo file), .tar, .tgz and .tar.gz.
3292 bool is_a_supported_archive_file(const std::string& path) {
3293  if (!path_has_valid_filename(path)) {
3294  return false;
3295  }
3296  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
3297  return true;
3298  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3299  boost::iends_with(path, ".tar.gz")) {
3300  return true;
3301  }
3302  return false;
3303 }
3304 
// Lists every file in the archive via Importer::gdalGetAllFilesInArchive, logs
// the list, and returns the first entry accepted by
// is_a_supported_geo_file(file, false). Returns an empty string when no entry
// qualifies.
3305 std::string find_first_geo_file_in_archive(const std::string& archive_path,
3306  const import_export::CopyParams& copy_params) {
3307  // get the recursive list of all files in the archive
3308  std::vector<std::string> files =
3309  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
3310 
3311  // report the list
3312  LOG(INFO) << "Found " << files.size() << " files in Archive "
3313  << remove_vsi_prefixes(archive_path);
3314  for (const auto& file : files) {
3315  LOG(INFO) << " " << file;
3316  }
3317 
3318  // scan the list for the first candidate file
3319  bool found_suitable_file = false;
3320  std::string file_name;
3321  for (const auto& file : files) {
3322  if (is_a_supported_geo_file(file, false)) {
3323  file_name = file;
3324  found_suitable_file = true;
3325  break;
3326  }
3327  }
3328 
3329  // if we didn't find anything
3330  if (!found_suitable_file) {
3331  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
3332  remove_vsi_prefixes(archive_path);
3333  file_name.clear();
3334  }
3335 
3336  // done
3337  return file_name;
3338 }
3339 
// True unless the path is an s3://, http:// or https:// URL (case-insensitive).
// This is the same scheme list that path_is_relative checks.
3340 bool is_local_file(const std::string& file_path) {
3341  return (!boost::istarts_with(file_path, "s3://") &&
3342  !boost::istarts_with(file_path, "http://") &&
3343  !boost::istarts_with(file_path, "https://"));
3344 }
3345 
// Applies import path validation only to local files; remote URLs are not
// validated here.
3346 void validate_import_file_path_if_local(const std::string& file_path) {
3347  if (is_local_file(file_path)) {
3349  }
3350 }
3351 } // namespace
3352 
// Thrift entry point: samples an import file and returns the detected column
// names and types, up to 100 sample rows, and the effective copy params.
//   - Relative paths resolve to import_path_/<sha256(session)>/<filename>.
//   - For POLYGON (geo) imports, network URLs and archives are rewritten into
//     GDAL virtual-file paths. For an archive, the first supported geo file
//     inside it is selected.
//   - The first detection branch (which also covers PARQUET when
//     ENABLE_IMPORT_PARQUET is defined) samples with import_export::Detector.
//     The POLYGON branch uses the GDAL helpers on import_export::Importer.
// Exceptions raised during detection are re-reported with a
// "detect_column_types error: " prefix.
3353 void DBHandler::detect_column_types(TDetectResult& _return,
3354  const TSessionId& session,
3355  const std::string& file_name_in,
3356  const TCopyParams& cp) {
3357  auto stdlog = STDLOG(get_session_ptr(session));
3358  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3359  check_read_only("detect_column_types");
3360 
3362 
3363  std::string file_name{file_name_in};
3364  if (path_is_relative(file_name)) {
3365  // assume relative paths are relative to data_path / mapd_import / <session>
3366  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3367  boost::filesystem::path(file_name).filename();
3368  file_name = file_path.string();
3369  }
3371 
3372  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
3373  if (copy_params.file_type == import_export::FileType::POLYGON) {
3374  if (is_a_supported_geo_file(file_name, true)) {
3375  // prepare to detect geo file directly
3376  add_vsi_network_prefix(file_name);
3377  add_vsi_geo_prefix(file_name);
3378  } else if (is_a_supported_archive_file(file_name)) {
3379  // find the archive file
3380  add_vsi_network_prefix(file_name);
3381  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
3382  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3383  }
3384  // find geo file in archive
3385  add_vsi_archive_prefix(file_name);
3386  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3387  // prepare to detect that geo file
3388  if (geo_file.size()) {
3389  file_name = file_name + std::string("/") + geo_file;
3390  }
3391  } else {
3392  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
3393  file_name_in);
3394  }
3395  }
3396 
// NOTE(review): local relative paths were already made absolute above. In the
// non-POLYGON case an http(s):// path reaches this point unchanged, is not
// absolute, and is rewritten below into a local path under import_path_ —
// confirm whether HTTP sources are intentionally unsupported here.
3397  auto file_path = boost::filesystem::path(file_name);
3398  // can be a s3 url
3399  if (!boost::istarts_with(file_name, "s3://")) {
3400  if (!boost::filesystem::path(file_name).is_absolute()) {
3401  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3402  boost::filesystem::path(file_name).filename();
3403  file_name = file_path.string();
3404  }
3405 
3406  if (copy_params.file_type == import_export::FileType::POLYGON) {
3407  // check for geo file
3408  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3409  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3410  }
3411  } else {
3412  // check for regular file
3413  if (!boost::filesystem::exists(file_path)) {
3414  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3415  }
3416  }
3417  }
3418 
3419  try {
3421 #ifdef ENABLE_IMPORT_PARQUET
3422  || (copy_params.file_type == import_export::FileType::PARQUET)
3423 #endif
3424  ) {
3425  import_export::Detector detector(file_path, copy_params);
3426  std::vector<SQLTypes> best_types = detector.best_sqltypes;
3427  std::vector<EncodingType> best_encodings = detector.best_encodings;
3428  std::vector<std::string> headers = detector.get_headers();
3429  copy_params = detector.get_copy_params();
3430 
3431  _return.copy_params = copyparams_to_thrift(copy_params);
3432  _return.row_set.row_desc.resize(best_types.size());
3433  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
3434  TColumnType col;
3435  SQLTypes t = best_types[col_idx];
3436  EncodingType encodingType = best_encodings[col_idx];
3437  SQLTypeInfo ti(t, false, encodingType);
3438  if (IS_GEO(t)) {
3439  // set this so encoding_to_thrift does the right thing
3440  ti.set_compression(copy_params.geo_coords_encoding);
3441  // fill in these directly
3442  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3443  col.col_type.scale = copy_params.geo_coords_srid;
3444  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3445  }
3446  col.col_type.type = type_to_thrift(ti);
3447  col.col_type.encoding = encoding_to_thrift(ti);
3448  if (copy_params.sanitize_column_names) {
3449  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3450  } else {
3451  col.col_name = headers[col_idx];
3452  }
3453  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3454  _return.row_set.row_desc[col_idx] = col;
3455  }
// Sample values are returned as strings; an empty string is flagged as null.
3456  size_t num_samples = 100;
3457  auto sample_data = detector.get_sample_rows(num_samples);
3458 
3459  TRow sample_row;
3460  for (auto row : sample_data) {
3461  sample_row.cols.clear();
3462  for (const auto& s : row) {
3463  TDatum td;
3464  td.val.str_val = s;
3465  td.is_null = s.empty();
3466  sample_row.cols.push_back(td);
3467  }
3468  _return.row_set.rows.push_back(sample_row);
3469  }
3470  } else if (copy_params.file_type == import_export::FileType::POLYGON) {
3471  // @TODO simon.eves get this from somewhere!
3472  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
3473 
3474  check_geospatial_files(file_path, copy_params);
3475  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
3476  file_path.string(), geoColumnName, copy_params);
3477  for (auto cd : cds) {
3478  if (copy_params.sanitize_column_names) {
3479  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
3480  }
3481  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
3482  }
// sample_data maps a source column name to that column's sample values.
3483  std::map<std::string, std::vector<std::string>> sample_data;
3485  file_path.string(), geoColumnName, sample_data, 100, copy_params);
3486  if (sample_data.size() > 0) {
3487  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
3488  TRow sample_row;
3489  for (auto cd : cds) {
3490  TDatum td;
3491  td.val.str_val = sample_data[cd.sourceName].at(i);
3492  td.is_null = td.val.str_val.empty();
3493  sample_row.cols.push_back(td);
3494  }
3495  _return.row_set.rows.push_back(sample_row);
3496  }
3497  }
3498  _return.copy_params = copyparams_to_thrift(copy_params);
3499  }
3500  } catch (const std::exception& e) {
3501  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
3502  }
3503 }
3504 
3505 void DBHandler::render_vega(TRenderResult& _return,
3506  const TSessionId& session,
3507  const int64_t widget_id,
3508  const std::string& vega_json,
3509  const int compression_level,
3510  const std::string& nonce) {
3511  auto stdlog = STDLOG(get_session_ptr(session),
3512  "widget_id",
3513  widget_id,
3514  "compression_level",
3515  compression_level,
3516  "vega_json",
3517  vega_json,
3518  "nonce",
3519  nonce);
3520  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3521  stdlog.appendNameValuePairs("nonce", nonce);
3522  auto session_ptr = stdlog.getConstSessionInfo();
3523  if (!render_handler_) {
3524  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
3525  }
3526 
3527  _return.total_time_ms = measure<>::execution([&]() {
3528  try {
3529  render_handler_->render_vega(
3530  _return,
3531  std::make_shared<Catalog_Namespace::SessionInfo>(*session_ptr),
3532  widget_id,
3533  vega_json,
3534  compression_level,
3535  nonce);
3536  } catch (std::exception& e) {
3537  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3538  }
3539  });
3540 }
3541 
3543  int32_t dashboard_id,
3544  AccessPrivileges requestedPermissions) {
3545  DBObject object(dashboard_id, DashboardDBObjectType);
3546  auto& catalog = session_info.getCatalog();
3547  auto& user = session_info.get_currentUser();
3548  object.loadKey(catalog);
3549  object.setPrivileges(requestedPermissions);
3550  std::vector<DBObject> privs = {object};
3551  return SysCatalog::instance().checkPrivileges(user, privs);
3552 }
3553 
3554 // dashboards
3555 void DBHandler::get_dashboard(TDashboard& dashboard,
3556  const TSessionId& session,
3557  const int32_t dashboard_id) {
3558  auto stdlog = STDLOG(get_session_ptr(session));
3559  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3560  auto session_ptr = stdlog.getConstSessionInfo();
3561  auto const& cat = session_ptr->getCatalog();
3563  auto dash = cat.getMetadataForDashboard(dashboard_id);
3564  if (!dash) {
3565  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
3566  " doesn't exist");
3567  }
3569  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3570  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
3571  std::to_string(dashboard_id));
3572  }
3573  user_meta.userName = "";
3574  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3575  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
3576 }
3577 
3578 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
3579  const TSessionId& session) {
3580  auto stdlog = STDLOG(get_session_ptr(session));
3581  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3582  auto session_ptr = stdlog.getConstSessionInfo();
3583  auto const& cat = session_ptr->getCatalog();
3585  const auto dashes = cat.getAllDashboardsMetadata();
3586  user_meta.userName = "";
3587  for (const auto dash : dashes) {
3589  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3590  // dashboardState is intentionally not populated here
3591  // for payload reasons
3592  // use get_dashboard call to get state
3593  dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
3594  }
3595  }
3596 }
3597 
3599  const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
3601  const DashboardDescriptor* dash,
3602  const bool populate_state) {
3603  auto const& cat = session_ptr->getCatalog();
3604  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3605  auto objects_list = SysCatalog::instance().getMetadataForObject(
3606  cat.getCurrentDB().dbId,
3607  static_cast<int>(DBObjectType::DashboardDBObjectType),
3608  dash->dashboardId);
3609  TDashboard dashboard;
3610  dashboard.dashboard_name = dash->dashboardName;
3611  if (populate_state)
3612  dashboard.dashboard_state = dash->dashboardState;
3613  dashboard.image_hash = dash->imageHash;
3614  dashboard.update_time = dash->updateTime;
3615  dashboard.dashboard_metadata = dash->dashboardMetadata;
3616  dashboard.dashboard_id = dash->dashboardId;
3617  dashboard.dashboard_owner = dash->user;
3618  TDashboardPermissions perms;
3619  // Super user has all permissions.
3620  if (session_ptr->get_currentUser().isSuper) {
3621  perms.create_ = true;
3622  perms.delete_ = true;
3623  perms.edit_ = true;
3624  perms.view_ = true;
3625  } else {
3626  // Collect all grants on current user
3627  // add them to the permissions.
3628  auto obj_to_find =
3629  DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
3630  obj_to_find.loadKey(session_ptr->getCatalog());
3631  std::vector<std::string> grantees =
3632  SysCatalog::instance().getRoles(true,
3633  session_ptr->get_currentUser().isSuper,
3634  session_ptr->get_currentUser().userName);
3635  for (const auto& grantee : grantees) {
3636  DBObject* object_found;
3637  auto* gr = SysCatalog::instance().getGrantee(grantee);
3638  if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
3639  const auto obj_privs = object_found->getPrivileges();
3640  perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
3641  perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
3642  perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
3643  perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
3644  }
3645  }
3646  }
3647  dashboard.dashboard_permissions = perms;
3648  if (objects_list.empty() ||
3649  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3650  dashboard.is_dash_shared = false;
3651  } else {
3652  dashboard.is_dash_shared = true;
3653  }
3654  return dashboard;
3655 }
3656 
3657 int32_t DBHandler::create_dashboard(const TSessionId& session,
3658  const std::string& dashboard_name,
3659  const std::string& dashboard_state,
3660  const std::string& image_hash,
3661  const std::string& dashboard_metadata) {
3662  auto stdlog = STDLOG(get_session_ptr(session));
3663  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3664  auto session_ptr = stdlog.getConstSessionInfo();
3665  check_read_only("create_dashboard");
3666  auto& cat = session_ptr->getCatalog();
3667 
3668  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
3670  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
3671  }
3672 
3673  auto dash = cat.getMetadataForDashboard(
3674  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
3675  if (dash) {
3676  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
3677  }
3678 
3680  dd.dashboardName = dashboard_name;
3681  dd.dashboardState = dashboard_state;
3682  dd.imageHash = image_hash;
3683  dd.dashboardMetadata = dashboard_metadata;
3684  dd.userId = session_ptr->get_currentUser().userId;
3685  dd.user = session_ptr->get_currentUser().userName;
3686 
3687  try {
3688  auto id = cat.createDashboard(dd);
3689  // TODO: transactionally unsafe
3690  SysCatalog::instance().createDBObject(
3691  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
3692  return id;
3693  } catch (const std::exception& e) {
3694  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3695  }
3696 }
3697 
3698 void DBHandler::replace_dashboard(const TSessionId& session,
3699  const int32_t dashboard_id,
3700  const std::string& dashboard_name,
3701  const std::string& dashboard_owner,
3702  const std::string& dashboard_state,
3703  const std::string& image_hash,
3704  const std::string& dashboard_metadata) {
3705  auto stdlog = STDLOG(get_session_ptr(session));
3706  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3707  auto session_ptr = stdlog.getConstSessionInfo();
3708  CHECK(session_ptr);
3709  check_read_only("replace_dashboard");
3710  auto& cat = session_ptr->getCatalog();
3711 
3713  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
3714  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
3715  }
3716 
3718  dd.dashboardName = dashboard_name;
3719  dd.dashboardState = dashboard_state;
3720  dd.imageHash = image_hash;
3721  dd.dashboardMetadata = dashboard_metadata;
3723  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
3724  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
3725  " does not exist");
3726  }
3727  dd.userId = user.userId;
3728  dd.user = dashboard_owner;
3729  dd.dashboardId = dashboard_id;
3730 
3731  try {
3732  cat.replaceDashboard(dd);
3733  } catch (const std::exception& e) {
3734  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3735  }
3736 }
3737 
3738 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
3739  delete_dashboards(session, {dashboard_id});
3740 }
3741 
3742 void DBHandler::delete_dashboards(const TSessionId& session,
3743  const std::vector<int32_t>& dashboard_ids) {
3744  auto stdlog = STDLOG(get_session_ptr(session));
3745  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3746  auto session_ptr = stdlog.getConstSessionInfo();
3747  check_read_only("delete_dashboards");
3748  auto& cat = session_ptr->getCatalog();
3749  // Checks will be performed in catalog
3750  try {
3751  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
3752  } catch (const std::exception& e) {
3753  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3754  }
3755 }
3756 
3757 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
3758  int32_t dashboard_id,
3759  std::vector<std::string> groups) {
3760  const auto session_info = get_session_copy(session);
3761  auto& cat = session_info.getCatalog();
3762  auto dash = cat.getMetadataForDashboard(dashboard_id);
3763  if (!dash) {
3764  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3765  " does not exist");
3766  } else if (session_info.get_currentUser().userId != dash->userId &&
3767  !session_info.get_currentUser().isSuper) {
3768  throw std::runtime_error(
3769  "User should be either owner of dashboard or super user to share/unshare it");
3770  }
3771  std::vector<std::string> valid_groups;
3773  for (auto& group : groups) {
3774  user_meta.isSuper = false; // initialize default flag
3775  if (!SysCatalog::instance().getGrantee(group)) {
3776  THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
3777  } else if (!user_meta.isSuper) {
3778  valid_groups.push_back(group);
3779  }
3780  }
3781  return valid_groups;
3782 }
3783 
3784 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
3785  for (auto const& group : groups) {
3786  if (!SysCatalog::instance().getGrantee(group)) {
3787  THROW_MAPD_EXCEPTION("Exception: User/Role '" + group + "' does not exist");
3788  }
3789  }
3790 }
3791 
3793  const Catalog_Namespace::SessionInfo& session_info,
3794  const std::vector<int32_t>& dashboard_ids) {
3795  auto& cat = session_info.getCatalog();
3796  std::map<std::string, std::list<int32_t>> errors;
3797  for (auto const& dashboard_id : dashboard_ids) {
3798  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
3799  if (!dashboard) {
3800  errors["Dashboard id does not exist"].push_back(dashboard_id);
3801  } else if (session_info.get_currentUser().userId != dashboard->userId &&
3802  !session_info.get_currentUser().isSuper) {
3803  errors["User should be either owner of dashboard or super user to share/unshare it"]
3804  .push_back(dashboard_id);
3805  }
3806  }
3807  if (!errors.empty()) {
3808  std::stringstream error_stream;
3809  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
3810  for (const auto& [error, id_list] : errors) {
3811  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
3812  }
3813  THROW_MAPD_EXCEPTION(error_stream.str());
3814  }
3815 }
3816 
3817 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
3818  const std::vector<int32_t>& dashboard_ids,
3819  const std::vector<std::string>& groups,
3820  const TDashboardPermissions& permissions,
3821  const bool do_share) {
3822  auto stdlog = STDLOG(get_session_ptr(session));
3823  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3824  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
3825  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3826  !permissions.view_) {
3827  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for " +
3828  std::string(do_share ? "grants" : "revokes"));
3829  }
3830  auto session_ptr = stdlog.getConstSessionInfo();
3831  auto const& catalog = session_ptr->getCatalog();
3832  auto& sys_catalog = SysCatalog::instance();
3833  validateGroups(groups);
3834  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
3835  std::vector<DBObject> batch_objects;
3836  for (auto const& dashboard_id : dashboard_ids) {
3837  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
3838  AccessPrivileges privs;
3839  if (permissions.delete_) {
3841  }
3842  if (permissions.create_) {
3844  }
3845  if (permissions.edit_) {
3847  }
3848  if (permissions.view_) {
3850  }
3851  object.setPrivileges(privs);
3852  batch_objects.push_back(object);
3853  }
3854  if (do_share) {
3855  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
3856  } else {
3857  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
3858  }
3859 }
3860 
3861 void DBHandler::share_dashboards(const TSessionId& session,
3862  const std::vector<int32_t>& dashboard_ids,
3863  const std::vector<std::string>& groups,
3864  const TDashboardPermissions& permissions) {
3865  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
3866 }
3867 
3868 // NOOP: Grants not available for objects as of now
3869 void DBHandler::share_dashboard(const TSessionId& session,
3870  const int32_t dashboard_id,
3871  const std::vector<std::string>& groups,
3872  const std::vector<std::string>& objects,
3873  const TDashboardPermissions& permissions,
3874  const bool grant_role = false) {
3875  share_dashboards(session, {dashboard_id}, groups, permissions);
3876 }
3877 
3878 void DBHandler::unshare_dashboards(const TSessionId& session,
3879  const std::vector<int32_t>& dashboard_ids,
3880  const std::vector<std::string>& groups,
3881  const TDashboardPermissions& permissions) {
3882  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
3883 }
3884 
3885 void DBHandler::unshare_dashboard(const TSessionId& session,
3886  const int32_t dashboard_id,
3887  const std::vector<std::string>& groups,
3888  const std::vector<std::string>& objects,
3889  const TDashboardPermissions& permissions) {
3890  unshare_dashboards(session, {dashboard_id}, groups, permissions);
3891 }
3892 
3894  std::vector<TDashboardGrantees>& dashboard_grantees,
3895  const TSessionId& session,
3896  int32_t dashboard_id) {
3897  auto stdlog = STDLOG(get_session_ptr(session));
3898  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3899  auto session_ptr = stdlog.getConstSessionInfo();
3900  auto const& cat = session_ptr->getCatalog();
3902  auto dash = cat.getMetadataForDashboard(dashboard_id);
3903  if (!dash) {
3904  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3905  " does not exist");
3906  } else if (session_ptr->get_currentUser().userId != dash->userId &&
3907  !session_ptr->get_currentUser().isSuper) {
3909  "User should be either owner of dashboard or super user to access grantees");
3910  }
3911  std::vector<ObjectRoleDescriptor*> objectsList;
3912  objectsList = SysCatalog::instance().getMetadataForObject(
3913  cat.getCurrentDB().dbId,
3914  static_cast<int>(DBObjectType::DashboardDBObjectType),
3915  dashboard_id); // By default objecttypecan be only dashabaords
3916  user_meta.userId = -1;
3917  user_meta.userName = "";
3918  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3919  for (auto object : objectsList) {
3920  if (user_meta.userName == object->roleName) {
3921  // Mask owner
3922  continue;
3923  }
3924  TDashboardGrantees grantee;
3925  TDashboardPermissions perm;
3926  grantee.name = object->roleName;
3927  grantee.is_user = object->roleType;
3928  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
3929  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
3930  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
3931  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
3932  grantee.permissions = perm;
3933  dashboard_grantees.push_back(grantee);
3934  }
3935 }
3936 
3937 void DBHandler::create_link(std::string& _return,
3938  const TSessionId& session,
3939  const std::string& view_state,
3940  const std::string& view_metadata) {
3941  auto stdlog = STDLOG(get_session_ptr(session));
3942  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3943  auto session_ptr = stdlog.getConstSessionInfo();
3944  // check_read_only("create_link");
3945  auto& cat = session_ptr->getCatalog();
3946 
3947  LinkDescriptor ld;
3948  ld.userId = session_ptr->get_currentUser().userId;
3949  ld.viewState = view_state;
3950  ld.viewMetadata = view_metadata;
3951 
3952  try {
3953  _return = cat.createLink(ld, 6);
3954  } catch (const std::exception& e) {
3955  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3956  }
3957 }
3958 
3960  const std::string& name,
3961  const bool is_array) {
3962  TColumnType ct;
3963  ct.col_name = name;
3964  ct.col_type.type = type;
3965  ct.col_type.is_array = is_array;
3966  return ct;
3967 }
3968 
3969 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
3970  const import_export::CopyParams& copy_params) {
3971  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
3972  if (std::find(shp_ext.begin(),
3973  shp_ext.end(),
3974  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
3975  shp_ext.end()) {
3976  for (auto ext : shp_ext) {
3977  auto aux_file = file_path;
3979  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
3980  copy_params) &&
3982  aux_file.replace_extension(ext).string(), copy_params)) {
3983  throw std::runtime_error("required file for shapefile does not exist: " +
3984  aux_file.filename().string());
3985  }
3986  }
3987  }
3988 }
3989 
3990 void DBHandler::create_table(const TSessionId& session,
3991  const std::string& table_name,
3992  const TRowDescriptor& rd,
3993  const TFileType::type file_type,
3994  const TCreateParams& create_params) {
3995  auto stdlog = STDLOG("table_name", table_name);
3996  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3997  check_read_only("create_table");
3998 
3999  if (ImportHelpers::is_reserved_name(table_name)) {
4000  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
4001  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
4002  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
4003  }
4004 
4005  auto rds = rd;
4006 
4007  // no longer need to manually add the poly column for a TFileType::POLYGON table
4008  // a column of the correct geo type has already been added
4009  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
4010 
4011  std::string stmt{"CREATE TABLE " + table_name};
4012  std::vector<std::string> col_stmts;
4013 
4014  for (auto col : rds) {
4015  if (ImportHelpers::is_reserved_name(col.col_name)) {
4016  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
4017  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
4018  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
4019  }
4020  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
4021  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
4022  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
4023  " for column: " + col.col_name);
4024  }
4025 
4026  if (col.col_type.type == TDatumType::DECIMAL) {
4027  // if no precision or scale passed in set to default 14,7
4028  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
4029  col.col_type.precision = 14;
4030  col.col_type.scale = 7;
4031  }
4032  }
4033 
4034  std::string col_stmt;
4035  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
4036 
4037  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
4038  // `nullable` argument, leading this to default to false. Uncomment for v2.
4039  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
4040 
4041  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
4042  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
4043  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
4044  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
4045  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
4046  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
4047  }
4048  } else if (col.col_type.type == TDatumType::STR) {
4049  // non DICT encoded strings
4050  col_stmt.append(" ENCODING NONE");
4051  } else if (col.col_type.type == TDatumType::POINT ||
4052  col.col_type.type == TDatumType::LINESTRING ||
4053  col.col_type.type == TDatumType::POLYGON ||
4054  col.col_type.type == TDatumType::MULTIPOLYGON) {
4055  // non encoded compressable geo
4056  if (col.col_type.scale == 4326) {
4057  col_stmt.append(" ENCODING NONE");
4058  }
4059  }
4060  col_stmts.push_back(col_stmt);
4061  }
4062 
4063  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
4064 
4065  if (create_params.is_replicated) {
4066  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
4067  }
4068 
4069  stmt.append(";");
4070 
4071  TQueryResult ret;
4072  sql_execute(ret, session, stmt, true, "", -1, -1);
4073 }
4074 
4075 void DBHandler::import_table(const TSessionId& session,
4076  const std::string& table_name,
4077  const std::string& file_name_in,
4078  const TCopyParams& cp) {
4079  try {
4080  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4081  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4082  auto session_ptr = stdlog.getConstSessionInfo();
4083  check_read_only("import_table");
4084  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
4085 
4086  auto& cat = session_ptr->getCatalog();
4087  const auto td_with_lock =
4089  cat, table_name);
4090  const auto td = td_with_lock();
4091  CHECK(td);
4092  check_table_load_privileges(*session_ptr, table_name);
4093 
4094  std::string file_name{file_name_in};
4095  auto file_path = boost::filesystem::path(file_name);
4097  if (!boost::istarts_with(file_name, "s3://")) {
4098  if (!boost::filesystem::path(file_name).is_absolute()) {
4099  file_path = import_path_ / picosha2::hash256_hex_string(session) /
4100  boost::filesystem::path(file_name).filename();
4101  file_name = file_path.string();
4102  }
4103  if (!boost::filesystem::exists(file_path)) {
4104  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
4105  }
4106  }
4108 
4109  // TODO(andrew): add delimiter detection to Importer
4110  if (copy_params.delimiter == '\0') {
4111  copy_params.delimiter = ',';
4112  if (boost::filesystem::extension(file_path) == ".tsv") {
4113  copy_params.delimiter = '\t';
4114  }
4115  }
4116 
4117  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
4118  session_ptr->getCatalog(), table_name);
4119  std::unique_ptr<import_export::Importer> importer;
4120  if (leaf_aggregator_.leafCount() > 0) {
4121  importer.reset(new import_export::Importer(
4122  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4123  file_path.string(),
4124  copy_params));
4125  } else {
4126  importer.reset(
4127  new import_export::Importer(cat, td, file_path.string(), copy_params));
4128  }
4129  auto ms = measure<>::execution([&]() { importer->import(); });
4130  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
4131  } catch (const std::exception& e) {
4132  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
4133  }
4134 }
4135 
4136 namespace {
4137 
4138 // helper functions for error checking below
4139 // these would usefully be added as methods of TDatumType
4140 // but that's not possible as it's auto-generated by Thrift
4141 
4143  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
4144  t == TDatumType::LINESTRING || t == TDatumType::POINT);
4145 }
4146 
4147 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4148 
4149 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
4150  std::stringstream ss;
4151  ss << t;
4152  return ss.str();
4153 }
4154 
4155 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
4156  std::string result;
4157  switch (p) {
4158  case SQLTypes::kGEOGRAPHY:
4159  result = "GEOGRAPHY";
4160  break;
4161  case SQLTypes::kGEOMETRY:
4162  result = "GEOMETRY";
4163  break;
4164  default:
4165  result = "INVALID";
4166  break;
4167  }
4168  return result;
4169 }
4170 
4171 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
4172  std::stringstream ss;
4173  ss << t;
4174  return ss.str();
4175 }
4176 
4177 #endif
4178 
4179 } // namespace
4180 
// Raises (via THROW_MAPD_EXCEPTION) an error describing a column attribute
// mismatch while appending a geo file to an existing table.
// NOTE: the expansion refers to `file_path`, `table_name` and `cd`, which must
// be in scope at every use site; `attr`, `got` and `expected` must be
// concatenable with std::string. The expansion already ends in ';'.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected)                   \
  THROW_MAPD_EXCEPTION(std::string("Could not append geo file '") +                 \
                       file_path.filename().string() + "' to table '" + table_name + \
                       "'. Column '" + cd->columnName + "' " + attr +               \
                       " mismatch (got '" + got + "', expected '" + expected +      \
                       "')");
4186 
4187 void DBHandler::import_geo_table(const TSessionId& session,
4188  const std::string& table_name,
4189  const std::string& file_name_in,
4190  const TCopyParams& cp,
4191  const TRowDescriptor& row_desc,
4192  const TCreateParams& create_params) {
4193  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4194  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4195  auto session_ptr = stdlog.getConstSessionInfo();
4196  check_read_only("import_table");
4197  auto& cat = session_ptr->getCatalog();
4198 
4200 
4201  std::string file_name{file_name_in};
4202 
4203  if (path_is_relative(file_name)) {
4204  // assume relative paths are relative to data_path / mapd_import / <session>
4205  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4206  boost::filesystem::path(file_name).filename();
4207  file_name = file_path.string();
4208  }
4210 
4211  if (is_a_supported_geo_file(file_name, true)) {
4212  // prepare to load geo file directly
4213  add_vsi_network_prefix(file_name);
4214  add_vsi_geo_prefix(file_name);
4215  } else if (is_a_supported_archive_file(file_name)) {
4216  // find the archive file
4217  add_vsi_network_prefix(file_name);
4218  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4219  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4220  }
4221  // find geo file in archive
4222  add_vsi_archive_prefix(file_name);
4223  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4224  // prepare to load that geo file
4225  if (geo_file.size()) {
4226  file_name = file_name + std::string("/") + geo_file;
4227  }
4228  } else {
4229  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4230  file_name_in);
4231  }
4232 
4233  // log what we're about to try to do
4234  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
4235  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
4236 
4237  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
4238  auto file_path = boost::filesystem::path(file_name);
4239  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4240  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
4241  }
4242 
4243  // use GDAL to check any dependent files exist (ditto)
4244  try {
4245  check_geospatial_files(file_path, copy_params);
4246  } catch (const std::exception& e) {
4247  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4248  }
4249 
4250  // get layer info and deconstruct
4251  // in general, we will get a combination of layers of these four types:
4252  // EMPTY: no rows, report and skip
4253  // GEO: create a geo table from this
4254  // NON_GEO: create a regular table from this
4255  // UNSUPPORTED_GEO: report and skip
4256  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
4257  try {
4258  layer_info = import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4259  } catch (const std::exception& e) {
4260  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4261  }
4262 
4263  // categorize the results
4264  using LayerNameToContentsMap =
4265  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
4266  LayerNameToContentsMap load_layers;
4267  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
4268  for (const auto& layer : layer_info) {
4269  switch (layer.contents) {
4271  LOG(INFO) << "import_geo_table: '" << layer.name
4272  << "' (will import as geo table)";
4273  load_layers[layer.name] = layer.contents;
4274  break;
4276  LOG(INFO) << "import_geo_table: '" << layer.name
4277  << "' (will import as regular table)";
4278  load_layers[layer.name] = layer.contents;
4279  break;
4281  LOG(WARNING) << "import_geo_table: '" << layer.name
4282  << "' (will not import, unsupported geo type)";
4283  break;
4285  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
4286  break;
4287  default:
4288  break;
4289  }
4290  }
4291 
4292  // if nothing is loadable, stop now
4293  if (load_layers.size() == 0) {
4294  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
4295  }
4296 
4297  // if we've been given an explicit layer name, check that it exists and is loadable
4298  // scan the original list, as it may exist but not have been gathered as loadable
4299  if (copy_params.geo_layer_name.size()) {
4300  bool found = false;
4301  for (const auto& layer : layer_info) {
4302  if (copy_params.geo_layer_name == layer.name) {
4305  // forget all the other layers and just load this one
4306  load_layers.clear();
4307  load_layers[layer.name] = layer.contents;
4308  found = true;
4309  break;
4310  } else if (layer.contents ==
4312  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4313  copy_params.geo_layer_name +
4314  "' has unsupported geo type!");
4315  } else if (layer.contents ==
4317  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4318  copy_params.geo_layer_name + "' is empty!");
4319  }
4320  }
4321  }
4322  if (!found) {
4323  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4324  copy_params.geo_layer_name + "' not found!");
4325  }
4326  }
4327 
4328  // Immerse import of multiple layers is not yet supported
4329  // @TODO fix this!
4330  if (row_desc.size() > 0 && load_layers.size() > 1) {
4332  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
4333  }
4334 
4335  // one definition of layer table name construction
4336  // we append the layer name if we're loading more than one table
4337  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
4338  const std::string& layer_name) {
4339  if (load_layers.size() > 1) {
4340  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
4341  if (sanitized_layer_name != layer_name) {
4342  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
4343  << sanitized_layer_name << "' for table name";
4344  }
4345  return table_name + "_" + sanitized_layer_name;
4346  }
4347  return table_name;
4348  };
4349 
4350  // if we're importing multiple tables, then NONE of them must exist already
4351  if (load_layers.size() > 1) {
4352  for (const auto& layer : load_layers) {
4353  // construct table name
4354  auto this_table_name = construct_layer_table_name(table_name, layer.first);
4355 
4356  // table must not exist
4357  if (cat.getMetadataForTable(this_table_name)) {
4358  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
4359  "' already exists, aborting!");
4360  }
4361  }
4362  }
4363 
4364  // prepare to gather errors that would otherwise be exceptions, as we can only throw
4365  // one
4366  std::vector<std::string> caught_exception_messages;
4367 
4368  // prepare to time multi-layer import
4369  double total_import_ms = 0.0;
4370 
4371  // now we're safe to start importing
4372  // we loop over the layers we're going to attempt to load
4373  for (const auto& layer : load_layers) {
4374  // unpack
4375  const auto& layer_name = layer.first;
4376  const auto& layer_contents = layer.second;
4377  bool is_geo_layer =
4379 
4380  // construct table name again
4381  auto this_table_name = construct_layer_table_name(table_name, layer_name);
4382 
4383  // report
4384  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
4385 
4386  // we need a row descriptor
4387  TRowDescriptor rd;
4388  if (row_desc.size() > 0) {
4389  // we have a valid RowDescriptor
4390  // this is the case where Immerse has already detected and created
4391  // all we need to do is import and trust that the data will match
4392  // use the provided row descriptor
4393  // table must already exist (we check this below)
4394  rd = row_desc;
4395  } else {
4396  // we don't have a RowDescriptor
4397  // we have to detect the file ourselves
4398  TDetectResult cds;
4399  TCopyParams cp_copy = cp; // retain S3 auth tokens
4400  cp_copy.geo_layer_name = layer_name;
4401  cp_copy.file_type = TFileType::POLYGON;
4402  try {
4403  detect_column_types(cds, session, file_name_in, cp_copy);
4404  } catch (const std::exception& e) {
4405  // capture the error and abort this layer
4406  caught_exception_messages.emplace_back(
4407  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
4408  continue;
4409  }
4410  rd = cds.row_set.row_desc;
4411 
4412  // then, if the table does NOT already exist, create it
4413  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4414  if (!td) {
4415  try {
4416  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
4417  } catch (const std::exception& e) {
4418  // capture the error and abort this layer
4419  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
4420  layer_name + "':" + e.what());
4421  continue;
4422  }
4423  }
4424  }
4425 
4426  // by this point, the table should exist, one way or another
4427  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4428  if (!td) {
4429  // capture the error and abort this layer
4430  std::string exception_message =
4431  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
4432  this_table_name + "'; table does not exist or failed to create.";
4433  caught_exception_messages.emplace_back(exception_message);
4434  continue;
4435  }
4436 
4437  // then, we have to verify that the structure matches
4438  // get column descriptors (non-system, non-deleted, logical columns only)
4439  const auto col_descriptors =
4440  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4441 
4442  // first, compare the column count
4443  if (col_descriptors.size() != rd.size()) {
4444  // capture the error and abort this layer
4445  std::string exception_message =
4446  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
4447  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
4448  ", expecting " + std::to_string(col_descriptors.size()) + ")";
4449  caught_exception_messages.emplace_back(exception_message);
4450  continue;
4451  }
4452 
4453  try {
4454  // then the names and types
4455  int rd_index = 0;
4456  for (auto cd : col_descriptors) {
4457  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
4458  std::string gname = rd[rd_index].col_name; // got
4459  std::string ename = cd->columnName; // expecting
4460 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4461  TTypeInfo gti = rd[rd_index].col_type; // got
4462 #endif
4463  TTypeInfo eti = cd_col_type.col_type; // expecting
4464  // check for name match
4465  if (gname != ename) {
4466  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
4467  gname == OMNISCI_GEO_PREFIX) {
4468  // rename incoming geo column to match existing legacy default geo column
4469  rd[rd_index].col_name = gname;
4470  LOG(INFO)
4471  << "import_geo_table: Renaming incoming geo column to match existing "
4472  "legacy default geo column";
4473  } else {
4474  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
4475  }
4476  }
4477 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4478  // check for type attributes match
4479  // these attrs must always match regardless of type
4480  if (gti.type != eti.type) {
4482  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
4483  }
4484  if (gti.is_array != eti.is_array) {
4486  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
4487  }
4488  if (gti.nullable != eti.nullable) {
4490  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
4491  }
4492  if (TTypeInfo_IsGeo(eti.type)) {
4493  // for geo, only these other attrs must also match
4494  // encoding and comp_param are allowed to differ
4495  // this allows appending to existing geo table
4496  // without needing to know the existing encoding
4497  if (gti.precision != eti.precision) {
4499  "geo sub-type",
4500  TTypeInfo_GeoSubTypeToString(gti.precision),
4501  TTypeInfo_GeoSubTypeToString(eti.precision));
4502  }
4503  if (gti.scale != eti.scale) {
4505  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
4506  }
4507  if (gti.encoding != eti.encoding) {
4508  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
4509  }
4510  if (gti.comp_param != eti.comp_param) {
4511  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
4512  }
4513  } else {
4514  // non-geo, all other attrs must also match
4515  // @TODO consider relaxing some of these dependent on type
4516  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
4517  if (gti.precision != eti.precision) {
4519  std::to_string(gti.precision),
4520  std::to_string(eti.precision));
4521  }
4522  if (gti.scale != eti.scale) {
4524  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
4525  }
4526  if (gti.encoding != eti.encoding) {
4528  "encoding",
4529  TTypeInfo_EncodingToString(gti.encoding),
4530  TTypeInfo_EncodingToString(eti.encoding));
4531  }
4532  if (gti.comp_param != eti.comp_param) {
4534  std::to_string(gti.comp_param),
4535  std::to_string(eti.comp_param));
4536  }
4537  }
4538 #endif
4539  rd_index++;
4540  }
4541  } catch (const std::exception& e) {
4542  // capture the error and abort this layer
4543  caught_exception_messages.emplace_back(e.what());
4544  continue;
4545  }
4546 
4547  std::map<std::string, std::string> colname_to_src;
4548  for (auto r : rd) {
4549  colname_to_src[r.col_name] =
4550  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
4551  }
4552 
4553  try {
4554  check_table_load_privileges(*session_ptr, this_table_name);
4555  } catch (const std::exception& e) {
4556  // capture the error and abort this layer
4557  caught_exception_messages.emplace_back(e.what());
4558  continue;
4559  }
4560 
4561  if (is_geo_layer) {
4562  // Final check to ensure that we actually have a geo column
4563  // of the expected name and type before doing the actual import,
4564  // in case the user naively overrode the name or type in Immerse
4565  // Preview (which as of 6/8/18 it still allows you to do).
4566  // This avoids a fatal assert later when it fails to find the
4567  // column. We should make Immerse more robust and disallow this.
4568  bool have_geo_column_with_correct_name = false;
4569  for (const auto& r : rd) {
4570  if (TTypeInfo_IsGeo(r.col_type.type)) {
4571  // TODO(team): allow user to override the geo column name
4572  if (r.col_name == OMNISCI_GEO_PREFIX) {
4573  have_geo_column_with_correct_name = true;
4574  } else if (r.col_name == LEGACY_GEO_PREFIX) {
4575  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
4576  // Normalize column names for geo append with legacy column naming scheme
4577  colname_to_src[r.col_name] = r.col_name;
4578  have_geo_column_with_correct_name = true;
4579  }
4580  }
4581  }
4582  if (!have_geo_column_with_correct_name) {
4583  std::string exception_message = "Table " + this_table_name +
4584  " does not have a geo column with name '" +
4585  OMNISCI_GEO_PREFIX + "'. Import aborted!";
4586  caught_exception_messages.emplace_back(exception_message);
4587  continue;
4588  }
4589  }
4590 
4591  try {
4592  // import this layer only?
4593  copy_params.geo_layer_name = layer_name;
4594 
4595  // create an importer
4596  std::unique_ptr<import_export::Importer> importer;
4597  if (leaf_aggregator_.leafCount() > 0) {
4598  importer.reset(new import_export::Importer(
4599  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4600  file_path.string(),
4601  copy_params));
4602  } else {
4603  importer.reset(
4604  new import_export::Importer(cat, td, file_path.string(), copy_params));
4605  }
4606 
4607  // import
4608  auto ms = measure<>::execution([&]() { importer->importGDAL(colname_to_src); });
4609  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
4610  << "s";
4611  total_import_ms += ms;
4612  } catch (const std::exception& e) {
4613  std::string exception_message =
4614  "Import of Layer '" + this_table_name + "' failed: " + e.what();
4615  caught_exception_messages.emplace_back(exception_message);
4616  continue;
4617  }
4618  }
4619 
4620  // did we catch any exceptions?
4621  if (caught_exception_messages.size()) {
4622  // combine all the strings into one and throw a single Thrift exception
4623  std::string combined_exception_message = "Failed to import geo file:\n";
4624  for (const auto& message : caught_exception_messages) {
4625  combined_exception_message += message + "\n";
4626  }
4627  THROW_MAPD_EXCEPTION(combined_exception_message);
4628  } else {
4629  // report success and total time
4630  LOG(INFO) << "Import Successful!";
4631  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
4632  }
4633 }
4634 
4635 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
4636 
4637 void DBHandler::import_table_status(TImportStatus& _return,
4638  const TSessionId& session,
4639  const std::string& import_id) {
4640  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
4641  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4642  auto is = import_export::Importer::get_import_status(import_id);
4643  _return.elapsed = is.elapsed.count();
4644  _return.rows_completed = is.rows_completed;
4645  _return.rows_estimated = is.rows_estimated;
4646  _return.rows_rejected = is.rows_rejected;
4647 }
4648 
4650  const TSessionId& session,
4651  const std::string& archive_path_in,
4652  const TCopyParams& copy_params) {
4653  auto stdlog =
4654  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
4655  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4656 
4657  std::string archive_path(archive_path_in);
4658 
4659  if (path_is_relative(archive_path)) {
4660  // assume relative paths are relative to data_path / mapd_import / <session>
4661  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4662  boost::filesystem::path(archive_path).filename();
4663  archive_path = file_path.string();
4664  }
4665  validate_import_file_path_if_local(archive_path);
4666 
4667  if (is_a_supported_archive_file(archive_path)) {
4668  // find the archive file
4669  add_vsi_network_prefix(archive_path);
4670  if (!import_export::Importer::gdalFileExists(archive_path,
4671  thrift_to_copyparams(copy_params))) {
4672  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4673  }
4674  // find geo file in archive
4675  add_vsi_archive_prefix(archive_path);
4676  std::string geo_file =
4677  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
4678  // what did we get?
4679  if (geo_file.size()) {
4680  // prepend it with the original path
4681  _return = archive_path_in + std::string("/") + geo_file;
4682  } else {
4683  // just return the original path
4684  _return = archive_path_in;
4685  }
4686  } else {
4687  // just return the original path
4688  _return = archive_path_in;
4689  }
4690 }
4691 
4692 void DBHandler::get_all_files_in_archive(std::vector<std::string>& _return,
4693  const TSessionId& session,
4694  const std::string& archive_path_in,
4695  const TCopyParams& copy_params) {
4696  auto stdlog =
4697  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
4698  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4699 
4700  std::string archive_path(archive_path_in);
4701  if (path_is_relative(archive_path)) {
4702  // assume relative paths are relative to data_path / mapd_import / <session>
4703  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4704  boost::filesystem::path(archive_path).filename();
4705  archive_path = file_path.string();
4706  }
4707  validate_import_file_path_if_local(archive_path);
4708 
4709  if (is_a_supported_archive_file(archive_path)) {
4710  // find the archive file
4711  add_vsi_network_prefix(archive_path);
4712  if (!import_export::Importer::gdalFileExists(archive_path,
4713  thrift_to_copyparams(copy_params))) {
4714  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4715  }
4716  // find all files in archive
4717  add_vsi_archive_prefix(archive_path);
4719  archive_path, thrift_to_copyparams(copy_params));
4720  // prepend them all with original path
4721  for (auto& s : _return) {
4722  s = archive_path_in + '/' + s;
4723  }
4724  }
4725 }
4726 
4727 void DBHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
4728  const TSessionId& session,
4729  const std::string& file_name_in,
4730  const TCopyParams& cp) {
4731  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
4732  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4733 
4734  std::string file_name(file_name_in);
4735 
4737 
4738  // handle relative paths
4739  if (path_is_relative(file_name)) {
4740  // assume relative paths are relative to data_path / mapd_import / <session>
4741  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4742  boost::filesystem::path(file_name).filename();
4743  file_name = file_path.string();
4744  }
4746 
4747  // validate file_name
4748  if (is_a_supported_geo_file(file_name, true)) {
4749  // prepare to load geo file directly
4750  add_vsi_network_prefix(file_name);
4751  add_vsi_geo_prefix(file_name);
4752  } else if (is_a_supported_archive_file(file_name)) {
4753  // find the archive file
4754  add_vsi_network_prefix(file_name);
4755  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4756  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4757  }
4758  // find geo file in archive
4759  add_vsi_archive_prefix(file_name);
4760  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4761  // prepare to load that geo file
4762  if (geo_file.size()) {
4763  file_name = file_name + std::string("/") + geo_file;
4764  }
4765  } else {
4766  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4767  file_name_in);
4768  }
4769 
4770  // check the file actually exists
4771  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4772  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
4773  }
4774 
4775  // find all layers
4776  auto internal_layer_info =
4777  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4778 
4779  // convert to Thrift type
4780  for (const auto& internal_layer : internal_layer_info) {
4781  TGeoFileLayerInfo layer;
4782  layer.name = internal_layer.name;
4783  switch (internal_layer.contents) {
4785  layer.contents = TGeoFileLayerContents::EMPTY;
4786  break;
4788  layer.contents = TGeoFileLayerContents::GEO;
4789  break;
4791  layer.contents = TGeoFileLayerContents::NON_GEO;
4792  break;
4794  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
4795  break;
4796  default:
4797  CHECK(false);
4798  }
4799  _return.emplace_back(layer); // no suitable constructor to just pass parameters
4800  }
4801 }
4802 
4803 void DBHandler::start_heap_profile(const TSessionId& session) {
4804  auto stdlog = STDLOG(get_session_ptr(session));
4805 #ifdef HAVE_PROFILER
4806  if (IsHeapProfilerRunning()) {
4807  THROW_MAPD_EXCEPTION("Profiler already started");
4808  }
4809  HeapProfilerStart("omnisci");
4810 #else
4811  THROW_MAPD_EXCEPTION("Profiler not enabled");
4812 #endif // HAVE_PROFILER
4813 }
4814 
4815 void DBHandler::stop_heap_profile(const TSessionId& session) {
4816  auto stdlog = STDLOG(get_session_ptr(session));
4817 #ifdef HAVE_PROFILER
4818  if (!IsHeapProfilerRunning()) {
4819  THROW_MAPD_EXCEPTION("Profiler not running");
4820  }
4821  HeapProfilerStop();
4822 #else
4823  THROW_MAPD_EXCEPTION("Profiler not enabled");
4824 #endif // HAVE_PROFILER
4825 }
4826 
4827 void DBHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
4828  auto stdlog = STDLOG(get_session_ptr(session));
4829 #ifdef HAVE_PROFILER
4830  if (!IsHeapProfilerRunning()) {
4831  THROW_MAPD_EXCEPTION("Profiler not running");
4832  }
4833  auto profile_buff = GetHeapProfile();
4834  profile = profile_buff;
4835  free(profile_buff);
4836 #else
4837  THROW_MAPD_EXCEPTION("Profiler not enabled");
4838 #endif // HAVE_PROFILER
4839 }
4840 
4841 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
4842 void DBHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
4843  if (session_it->second.use_count() > 2 ||
4844  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
4845  // SessionInfo is being used in more than one active operation. Original copy + one
4846  // stored in StdLog. Skip the checks.
4847  return;
4848  }
4849  time_t last_used_time = session_it->second->get_last_used_time();
4850  time_t start_time = session_it->second->get_start_time();
4851  const auto current_session_duration = time(0) - last_used_time;
4852  if (current_session_duration > idle_session_duration_) {
4853  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
4854  << " idle duration " << current_session_duration
4855  << " seconds exceeds maximum idle duration " << idle_session_duration_
4856  << " seconds. Invalidating session.";
4857  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
4858  }
4859  const auto total_session_duration = time(0) - start_time;
4860  if (total_session_duration > max_session_duration_) {
4861  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
4862  << " total duration " << total_session_duration
4863  << " seconds exceeds maximum total session duration "
4864  << max_session_duration_ << " seconds. Invalidating session.";
4865  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
4866  }
4867 }
4868 
4869 std::shared_ptr<const Catalog_Namespace::SessionInfo> DBHandler::get_const_session_ptr(
4870  const TSessionId& session) {
4871  return get_session_ptr(session);
4872 }
4873 
4875  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4876  return *get_session_it_unsafe(session, read_lock)->second;
4877 }
4878 
4879 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_copy_ptr(
4880  const TSessionId& session) {
4881  // Note(Wamsi): We have `get_const_session_ptr` which would return as const
4882  // SessionInfo stored in the map. You can use `get_const_session_ptr` instead of the
4883  // copy of SessionInfo but beware that it can be changed in teh map. So if you do not
4884  // care about the changes then use `get_const_session_ptr` if you do then use this
4885  // function to get a copy. We should eventually aim to merge both
4886  // `get_const_session_ptr` and `get_session_copy_ptr`.
4887  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4888  auto& session_info_ref = *get_session_it_unsafe(session, read_lock)->second;
4889  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
4890 }
4891 
4892 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_ptr(
4893  const TSessionId& session_id) {
4894  // Note(Wamsi): This method will give you a shared_ptr to master SessionInfo itself.
4895  // Should be used only when you need to make updates to original SessionInfo object.
4896  // Currently used by `update_session_last_used_duration`
4897 
4898  // 1) `session_id` will be empty during intial connect. 2)`sessionmapd iterator` will
4899  // be invalid during disconnect. SessionInfo will be erased from map by the time it
4900  // reaches here. In both the above cases, we would return `nullptr` and can skip
4901  // SessionInfo updates.
4902  if (session_id.empty()) {
4903  return {};
4904  }
4905  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4906  return get_session_it_unsafe(session_id, read_lock)->second;
4907 }
4908 
4910  const Catalog_Namespace::SessionInfo& session_info,
4911  const std::string& table_name) {
4912  auto user_metadata = session_info.get_currentUser();
4913  auto& cat = session_info.getCatalog();
4914  DBObject dbObject(table_name, TableDBObjectType);
4915  dbObject.loadKey(cat);
4917  std::vector<DBObject> privObjects;
4918  privObjects.push_back(dbObject);
4919  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
4920  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
4921  user_metadata.userLoggable() +
4922  " has no insert privileges for table " + table_name + ".");
4923  }
4924 }
4925 
4926 void DBHandler::check_table_load_privileges(const TSessionId& session,
4927  const std::string& table_name) {
4928  const auto session_info = get_session_copy(session);
4929  check_table_load_privileges(session_info, table_name);
4930 }
4931 
4933  const TExecuteMode::type mode) {
4934  const std::string user_name = session_ptr->get_currentUser().userLoggable();
4935  switch (mode) {
4936  case TExecuteMode::GPU:
4937  if (cpu_mode_only_) {
4938  TOmniSciException e;
4939  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
4940  throw e;
4941  }
4943  LOG(INFO) << "User " << user_name << " sets GPU mode.";
4944  break;
4945  case TExecuteMode::CPU:
4947  LOG(INFO) << "User " << user_name << " sets CPU mode.";
4948  break;
4949  }
4950 }
4951 
4952 std::vector<PushedDownFilterInfo> DBHandler::execute_rel_alg(
4953  TQueryResult& _return,
4954  QueryStateProxy query_state_proxy,
4955  const std::string& query_ra,
4956  const bool column_format,
4957  const ExecutorDeviceType executor_device_type,
4958  const int32_t first_n,
4959  const int32_t at_most_n,
4960  const bool just_validate,
4961  const bool find_push_down_candidates,
4962  const ExplainInfo& explain_info,
4963  const std::optional<size_t> executor_index) const {
4964  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4965 
4966  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
4967  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
4968 
4969  const auto& cat = query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog();
4970  auto executor = Executor::getExecutor(
4971  executor_index ? *executor_index : Executor::UNITARY_EXECUTOR_ID,
4972  jit_debug_ ? "/tmp" : "",
4973  jit_debug_ ? "mapdquery" : "",
4975  RelAlgExecutor ra_executor(executor.get(),
4976  cat,
4977  query_ra,
4978  query_state_proxy.getQueryState().shared_from_this());
4979  // handle hints
4980  const auto& query_hints = ra_executor.getParsedQueryHints();
4981  CompilationOptions co = {
4982  query_hints.cpu_mode ? ExecutorDeviceType::CPU : executor_device_type,
4983  /*hoist_literals=*/true,
4986  /*allow_lazy_fetch=*/true,
4987  /*filter_on_deleted_column=*/true,
4991  auto validate_or_explain_query =
4992  explain_info.justExplain() || explain_info.justCalciteExplain() || just_validate;
4995  explain_info.justExplain(),
4996  allow_loop_joins_ || just_validate,
4998  jit_debug_,
4999  just_validate,
5002  find_push_down_candidates,
5003  explain_info.justCalciteExplain(),
5005  g_enable_runtime_query_interrupt && !validate_or_explain_query &&
5006  !query_state_proxy.getQueryState()
5008  ->get_session_id()
5009  .empty(),
5011  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
5014  nullptr,
5015  nullptr,
5016  0,
5017  0),
5018  {}};
5019  _return.execution_time_ms += measure<>::execution([&]() {
5020  result = ra_executor.executeRelAlgQuery(co, eo, explain_info.explain_plan, nullptr);
5021  });
5022  // reduce execution time by the time spent during queue waiting
5023  _return.execution_time_ms -= result.getRows()->getQueueTime();
5024  VLOG(1) << cat.getDataMgr().getSystemMemoryUsage();
5025  const auto& filter_push_down_info = result.getPushedDownFilterInfo();
5026  if (!filter_push_down_info.empty()) {
5027  return filter_push_down_info;
5028  }
5029  if (explain_info.justExplain()) {
5030  convert_explain(_return, *result.getRows(), column_format);
5031  } else if (!explain_info.justCalciteExplain()) {
5032  convert_rows(_return,
5033  timer.createQueryStateProxy(),
5034  result.getTargetsMeta(),
5035  *result.getRows(),
5036  column_format,
5037  first_n,
5038  at_most_n);
5039  }
5040  return {};
5041 }
5042 
5043 void DBHandler::execute_rel_alg_df(TDataFrame& _return,
5044  const std::string& query_ra,
5045  QueryStateProxy query_state_proxy,
5046  const Catalog_Namespace::SessionInfo& session_info,
5047  const ExecutorDeviceType device_type,
5048  const size_t device_id,
5049  const int32_t first_n,
5050  const TArrowTransport::type transport_method) const {
5051  const auto& cat = session_info.getCatalog();
5052  CHECK(device_type == ExecutorDeviceType::CPU ||
5055  jit_debug_ ? "/tmp" : "",
5056  jit_debug_ ? "mapdquery" : "",
5058  RelAlgExecutor ra_executor(executor.get(),
5059  cat,
5060  query_ra,
5061  query_state_proxy.getQueryState().shared_from_this());
5062  const auto& query_hints = ra_executor.getParsedQueryHints();
5063  CompilationOptions co = {query_hints.cpu_mode ? ExecutorDeviceType::CPU : device_type,
5064  /*hoist_literals=*/true,
5067  /*allow_lazy_fetch=*/true,
5068  /*filter_on_deleted_column=*/true,
5071  ExecutionOptions eo = {
5074  false,
5077  jit_debug_,
5078  false,
5081  false,
5082  false,
5084  g_enable_runtime_query_interrupt && !query_state_proxy.getQueryState()
5086  ->get_session_id()
5087  .empty(),
5089  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
5092  nullptr,
5093  nullptr,
5094  0,
5095  0),
5096  {}};
5097  _return.execution_time_ms += measure<>::execution(
5098  [&]() { result = ra_executor.executeRelAlgQuery(co, eo, false, nullptr); });
5099  _return.execution_time_ms -= result.getRows()->getQueueTime();
5100  const auto rs = result.getRows();
5101  const auto converter =
5102  std::make_unique<ArrowResultSetConverter>(rs,
5103  data_mgr_,
5104  device_type,
5105  device_id,
5106  getTargetNames(result.getTargetsMeta()),
5107  first_n,
5108  ArrowTransport(transport_method));
5109  ArrowResult arrow_result;
5110  _return.arrow_conversion_time_ms +=
5111  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
5112  _return.sm_handle =
5113  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
5114  _return.sm_size = arrow_result.sm_size;
5115  _return.df_handle =
5116  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
5117  _return.df_buffer =
5118  std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
5119  if (device_type == ExecutorDeviceType::GPU) {
5120  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
5121  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
5122  ipc_handle_to_dev_ptr_.insert(
5123  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
5124  }
5125  _return.df_size = arrow_result.df_size;
5126 }
5127 
5128 std::vector<TargetMetaInfo> DBHandler::getTargetMetaInfo(
5129  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
5130  std::vector<TargetMetaInfo> result;
5131  for (const auto& target : targets) {
5132  CHECK(target);
5133  CHECK(target->get_expr());
5134  result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
5135  }
5136  return result;
5137 }
5138 
5139 std::vector<std::string> DBHandler::getTargetNames(
5140  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
5141  std::vector<std::string> names;
5142  for (const auto& target : targets) {
5143  CHECK(target);
5144  CHECK(target->get_expr());
5145  names.push_back(target->get_resname());
5146  }
5147  return names;
5148 }
5149 
5150 std::vector<std::string> DBHandler::getTargetNames(
5151  const std::vector<TargetMetaInfo>& targets) const {
5152  std::vector<std::string> names;
5153  for (const auto& target : targets) {
5154  names.push_back(target.get_resname());
5155  }
5156  return names;
5157 }
5158 
5159 void DBHandler::convert_rows(TQueryResult& _return,
5160  QueryStateProxy query_state_proxy,
5161  const std::vector<TargetMetaInfo>& targets,
5162  const ResultSet& results,
5163  const bool column_format,
5164  const int32_t first_n,
5165  const int32_t at_most_n) const {
5166  query_state::Timer timer = query_state_proxy.createTimer(__func__);
5167  _return.row_set.row_desc = ThriftSerializers::target_meta_infos_to_thrift(targets);
5168  int32_t fetched{0};
5169  if (column_format) {
5170  _return.row_set.is_columnar = true;
5171  std::vector<TColumn> tcolumns(results.colCount());
5172  while (first_n == -1 || fetched < first_n) {
5173  const auto crt_row = results.getNextRow(true, true);
5174  if (crt_row.empty()) {
5175  break;
5176  }
5177  ++fetched;
5178  if (at_most_n >= 0 && fetched > at_most_n) {
5179  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
5180  std::to_string(at_most_n));
5181  }
5182  for (size_t i = 0; i < results.colCount(); ++i) {
5183  const auto agg_result = crt_row[i];
5184  value_to_thrift_column(agg_result, targets[i].get_type_info(), tcolumns[i]);
5185  }
5186  }
5187  for (size_t i = 0; i < results.colCount(); ++i) {
5188  _return.row_set.columns.push_back(tcolumns[i]);
5189  }
5190  } else {
5191  _return.row_set.is_columnar = false;
5192  while (first_n == -1 || fetched < first_n) {
5193  const auto crt_row = results.getNextRow(true, true);
5194  if (crt_row.empty()) {
5195  break;
5196  }
5197  ++fetched;
5198  if (at_most_n >= 0 && fetched > at_most_n) {
5199  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
5200  std::to_string(at_most_n));
5201  }
5202  TRow trow;
5203  trow.cols.reserve(results.colCount());
5204  for (size_t i = 0; i < results.colCount(); ++i) {
5205  const auto agg_result = crt_row[i];
5206  trow.cols.push_back(value_to_thrift(agg_result, targets[i].get_type_info()));
5207  }
5208  _return.row_set.rows.push_back(trow);
5209  }
5210  }
5211 }
5212 
5213 TRowDescriptor DBHandler::fixup_row_descriptor(const TRowDescriptor& row_desc,
5214  const Catalog& cat) {
5215  TRowDescriptor fixedup_row_desc;
5216  for (const TColumnType& col_desc : row_desc) {
5217  auto fixedup_col_desc = col_desc;
5218  if (col_desc.col_type.encoding == TEncodingType::DICT &&
5219  col_desc.col_type.comp_param > 0) {
5220  const auto dd = cat.getMetadataForDict(col_desc.col_type.comp_param, false);
5221  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
5222  }
5223  fixedup_row_desc.push_back(fixedup_col_desc);
5224  }
5225 
5226  return fixedup_row_desc;
5227 }
5228 
5229 // create simple result set to return a single column result
5230 void DBHandler::create_simple_result(TQueryResult& _return,
5231  const ResultSet& results,
5232  const bool column_format,
5233  const std::string label) const {
5234  CHECK_EQ(size_t(1), results.rowCount());
5235  TColumnType proj_info;
5236  proj_info.col_name = label;
5237  proj_info.col_type.type = TDatumType::STR;
5238  proj_info.col_type.nullable = false;
5239  proj_info.col_type.is_array = false;
5240  _return.row_set.row_desc.push_back(proj_info);
5241  const auto crt_row = results.getNextRow(true, true);
5242  const auto tv = crt_row[0];
5243  CHECK(results.getNextRow(true, true).empty());
5244  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
5245  CHECK(scalar_tv);
5246  const auto s_n = boost::get<NullableString>(scalar_tv);
5247  CHECK(s_n);
5248  const auto s = boost::get<std::string>(s_n);
5249  CHECK(s);
5250  if (column_format) {
5251  TColumn tcol;
5252  tcol.data.str_col.push_back(*s);
5253  tcol.nulls.push_back(false);
5254  _return.row_set.is_columnar = true;
5255  _return.row_set.columns.push_back(tcol);
5256  } else {
5257  TDatum explanation;
5258  explanation.val.str_val = *s;
5259  explanation.is_null = false;
5260  TRow trow;
5261  trow.cols.push_back(explanation);
5262  _return.row_set.is_columnar = false;
5263  _return.row_set.rows.push_back(trow);
5264  }
5265 }
5266 
5267 void DBHandler::convert_explain(TQueryResult& _return,
5268  const ResultSet& results,
5269  const bool column_format) const {
5270  create_simple_result(_return, results, column_format, "Explanation");
5271 }
5272 
5273 void DBHandler::convert_result(TQueryResult& _return,
5274  const ResultSet& results,
5275  const bool column_format) const {
5276  create_simple_result(_return, results, column_format, "Result");
5277 }
5278 
5279 // this all should be moved out of here to catalog
5281  const TableDescriptor* td,
5282  const AccessPrivileges access_priv) {
5283  CHECK(td);
5284  auto& cat = session_info.getCatalog();
5285  std::vector<DBObject> privObjects;
5286  DBObject dbObject(td->tableName, TableDBObjectType);
5287  dbObject.loadKey(cat);
5288  dbObject.setPrivileges(access_priv);
5289  privObjects.push_back(dbObject);
5290  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
5291  privObjects);
5292 };
5293 
5295  const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl);
5296  if (drop_db_stmt) {
5297  invalidate_sessions(*drop_db_stmt->getDatabaseName(), drop_db_stmt);
5298  return;
5299  }
5300  const auto rename_db_stmt = dynamic_cast<Parser::RenameDatabaseStmt*>(ddl);
5301  if (rename_db_stmt) {
5302  invalidate_sessions(*rename_db_stmt->getPreviousDatabaseName(), rename_db_stmt);
5303  return;
5304  }
5305  const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl);
5306  if (drop_user_stmt) {
5307  invalidate_sessions(*drop_user_stmt->getUserName(), drop_user_stmt);
5308  return;
5309  }
5310  const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl);
5311  if (rename_user_stmt) {
5312  invalidate_sessions(*rename_user_stmt->getOldUserName(), rename_user_stmt);
5313  return;
5314  }
5315 }
5316 
5317 void DBHandler::sql_execute_impl(TQueryResult& _return,
5318  QueryStateProxy query_state_proxy,
5319  const bool column_format,
5320  const std::string& nonce,
5321  const ExecutorDeviceType executor_device_type,
5322  const int32_t first_n,
5323  const int32_t at_most_n) {
5324  if (leaf_handler_) {
5325  leaf_handler_->flush_queue();
5326  }
5327 
5328  _return.nonce = nonce;
5329  _return.execution_time_ms = 0;
5330  auto const query_str = strip(query_state_proxy.getQueryState().getQueryStr());
5331  auto session_ptr = query_state_proxy.getQueryState().getConstSessionInfo();
5332  // Call to DistributedValidate() below may change cat.
5333  auto& cat = session_ptr->getCatalog();
5334 
5335  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
5336 
5337  mapd_unique_lock<mapd_shared_mutex> executeWriteLock;
5338  mapd_shared_lock<mapd_shared_mutex> executeReadLock;
5339 
5341  ParserWrapper pw{query_str};
5342  switch (pw.getQueryType()) {
5344  _return.query_type = TQueryType::READ;
5345  VLOG(1) << "query type: READ";
5346  break;
5347  }
5349  _return.query_type = TQueryType::WRITE;
5350  VLOG(1) << "query type: WRITE";
5351  break;
5352  }
5354  _return.query_type = TQueryType::SCHEMA_READ;
5355  VLOG(1) << "query type: SCHEMA READ";
5356  break;
5357  }
5359  _return.query_type = TQueryType::SCHEMA_WRITE;
5360  VLOG(1) << "query type: SCHEMA WRITE";
5361  break;
5362  }
5363  default: {
5364  _return.query_type = TQueryType::UNKNOWN;
5365  LOG(WARNING) << "query type: UNKNOWN";
5366  break;
5367  }
5368  }
5369  if (pw.isCalcitePathPermissable(read_only_)) {
5370  // run DDL before the locks as DDL statements should handle their own locking
5371  if (pw.isCalciteDdl()) {
5372  std::string query_ra;
5373  _return.execution_time_ms += measure<>::execution([&]() {
5374  TPlanResult result;
5375  std::tie(result, locks) =
5376  parse_to_ra(query_state_proxy, query_str, {}, false, system_parameters_);
5377  query_ra = result.plan_result;
5378  });
5379 
5380  executeDdl(_return, query_ra, session_ptr);
5381  return;
5382  }
5383 
5384  executeReadLock = mapd_shared_lock<mapd_shared_mutex>(
5387 
5388  std::string query_ra;
5389  _return.execution_time_ms += measure<>::execution([&]() {
5390  TPlanResult result;
5391  std::tie(result, locks) =
5392  parse_to_ra(query_state_proxy, query_str, {}, true, system_parameters_);
5393  query_ra = result.plan_result;
5394  });
5395 
5396  std::string query_ra_calcite_explain;
5397  if (pw.isCalciteExplain() && (!g_enable_filter_push_down || g_cluster)) {
5398  // return the ra as the result
5399  convert_explain(_return, ResultSet(query_ra), true);
5400  return;
5401  } else if (pw.isCalciteExplain()) {
5402  // removing the "explain calcite " from the beginning of the "query_str":
5403  std::string temp_query_str =
5404  query_str.substr(std::string("explain calcite ").length());
5405 
5406  CHECK(!locks.empty());
5407  query_ra_calcite_explain =
5408  parse_to_ra(query_state_proxy, temp_query_str, {}, false, system_parameters_)
5409  .first.plan_result;
5410  }
5411  const auto explain_info = pw.getExplainInfo();
5412  std::vector<PushedDownFilterInfo> filter_push_down_requests;
5413  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
5414  [this,
5415  &filter_push_down_requests,
5416  &_return,
5417  &query_state_proxy,
5418  &explain_info,
5419  &query_ra_calcite_explain,
5420  &query_ra,
5421  &query_str,
5422  &locks,
5423  column_format,
5424  executor_device_type,
5425  first_n,
5426  at_most_n](const size_t executor_index) {
5427  filter_push_down_requests = execute_rel_alg(
5428  _return,
5429  query_state_proxy,
5430  explain_info.justCalciteExplain() ? query_ra_calcite_explain : query_ra,
5431  column_format,
5432  executor_device_type,
5433  first_n,
5434  at_most_n,
5435  /*just_validate=*/false,
5437  explain_info,
5438  executor_index);
5439  if (explain_info.justCalciteExplain() && filter_push_down_requests.empty()) {
5440  // we only reach here if filter push down was enabled, but no filter
5441  // push down candidate was found
5442  convert_explain(_return, ResultSet(query_ra), true);
5443  } else if (!filter_push_down_requests.empty()) {
5444  CHECK(!locks.empty());
5446  query_state_proxy,
5447  query_ra,
5448  column_format,
5449  executor_device_type,
5450  first_n,
5451  at_most_n,
5452  explain_info.justExplain(),
5453  explain_info.justCalciteExplain(),
5454  filter_push_down_requests);
5455  } else if (explain_info.justCalciteExplain() &&
5456  filter_push_down_requests.empty()) {
5457  // return the ra as the result:
5458  // If we reach here, the 'filter_push_down_request' turned out to be
5459  // empty, i.e., no filter push down so we continue with the initial
5460  // (unchanged) query's calcite explanation.
5461  CHECK(!locks.empty());
5462  query_ra =
5463  parse_to_ra(query_state_proxy, query_str, {}, false, system_parameters_)
5464  .first.plan_result;
5465  convert_explain(_return, ResultSet(query_ra), true);
5466  }
5467  });
5469  if (g_enable_runtime_query_interrupt && !session_ptr->get_session_id().empty()) {
5471  auto submitted_time = std::chrono::system_clock::now();
5472  query_state_proxy.getQueryState().setQuerySubmittedTime(submitted_time);
5473  executor->enrollQuerySession(session_ptr->get_session_id(),
5474  query_str,
5475  submitted_time,
5477  QuerySessionStatus::QueryStatus::PENDING_QUEUE);
5478  }
5479  dispatch_queue_->submit(execute_rel_alg_task,
5480  pw.getDMLType() == ParserWrapper::DMLType::Update ||
5481  pw.getDMLType() == ParserWrapper::DMLType::Delete);
5482  auto result_future = execute_rel_alg_task->get_future();
5483  result_future.get();
5484  return;
5485  } else if (pw.is_optimize || pw.is_validate) {
5486  // Get the Stmt object
5487  DBHandler::parser_with_error_handler(query_str, parse_trees);
5488 
5489  if (pw.is_optimize) {
5490  const auto optimize_stmt =
5491  dynamic_cast<Parser::OptimizeTableStmt*>(parse_trees.front().get());
5492  CHECK(optimize_stmt);
5493 
5494  _return.execution_time_ms += measure<>::execution([&]() {
5495  const auto td_with_lock =
5497  cat, optimize_stmt->getTableName());
5498  const auto td = td_with_lock();
5499 
5500  if (!td || !user_can_access_table(
5501  *session_ptr, td, AccessPrivileges::DELETE_FROM_TABLE)) {
5502  throw std::runtime_error("Table " + optimize_stmt->getTableName() +
5503  " does not exist.");
5504  }
5505  if (td->isView) {
5506  throw std::runtime_error("OPTIMIZE TABLE command is not supported on views.");
5507  }
5508 
5509  // acquire write lock on table data
5510  auto data_lock =
5512 
5513  auto executor = Executor::getExecutor(
5515  const TableOptimizer optimizer(td, executor.get(), cat);
5516  if (optimize_stmt->shouldVacuumDeletedRows()) {
5517  optimizer.vacuumDeletedRows();
5518  }
5519  optimizer.recomputeMetadata();
5520  });
5521 
5522  return;
5523  }
5524  if (pw.is_validate) {
5525  // check user is superuser
5526  if (!session_ptr->get_currentUser().isSuper) {
5527  throw std::runtime_error("Superuser is required to run VALIDATE");
5528  }
5529  const auto validate_stmt =
5530  dynamic_cast<Parser::ValidateStmt*>(parse_trees.front().get());
5531  CHECK(validate_stmt);
5532 
5533  // Prevent any other query from running while doing