OmniSciDB  340b00dbf6
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: DBHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "DBHandler.h"
25 #include "DistributedLoader.h"
27 #include "TokenCompletionHints.h"
28 
29 #ifdef HAVE_PROFILER
30 #include <gperftools/heap-profiler.h>
31 #endif // HAVE_PROFILER
32 
33 #include "MapDRelease.h"
34 
35 #include "Calcite/Calcite.h"
36 #include "gen-cpp/CalciteServer.h"
37 
39 
40 #include "Catalog/Catalog.h"
45 #include "DistributedHandler.h"
47 #include "Geospatial/GDAL.h"
48 #include "Geospatial/Transforms.h"
49 #include "Geospatial/Types.h"
50 #include "ImportExport/Importer.h"
51 #include "LockMgr/LockMgr.h"
53 #include "Parser/ParserWrapper.h"
55 #include "Parser/parser.h"
58 #include "QueryEngine/Execute.h"
67 #include "Shared/StringTransform.h"
68 #include "Shared/import_helpers.h"
70 #include "Shared/measure.h"
71 #include "Shared/scope.h"
72 
73 #include <fcntl.h>
74 #include <picosha2.h>
75 #include <sys/time.h>
76 #include <sys/types.h>
77 #include <sys/wait.h>
78 #include <unistd.h>
79 #include <algorithm>
80 #include <boost/algorithm/string.hpp>
81 #include <boost/filesystem.hpp>
82 #include <boost/make_shared.hpp>
83 #include <boost/process/search_path.hpp>
84 #include <boost/program_options.hpp>
85 #include <boost/regex.hpp>
86 #include <boost/tokenizer.hpp>
87 #include <cmath>
88 #include <csignal>
89 #include <fstream>
90 #include <future>
91 #include <map>
92 #include <memory>
93 #include <random>
94 #include <regex>
95 #include <string>
96 #include <thread>
97 #include <typeinfo>
98 
99 #include <arrow/api.h>
100 #include <arrow/io/api.h>
101 #include <arrow/ipc/api.h>
102 
103 #include "Shared/ArrowUtil.h"
104 
105 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
106 
109 
#define INVALID_SESSION_ID ""

// Builds a TOmniSciException whose error_msg is `errstr`, logs the message at ERROR
// level and throws the exception.
// The exception local deliberately uses an unusual name: with a short name such as
// `ex`, an argument like `THROW_MAPD_EXCEPTION(ex.what())` written inside a caller's
// `catch (... ex)` would silently bind to the freshly declared (empty) exception
// instead of the caller's object. `errstr` is parenthesized for the usual
// precedence safety.
// The expansion is a plain brace block (not do/while(0)) so that call sites that
// omit the trailing semicolon keep compiling.
#define THROW_MAPD_EXCEPTION(errstr)                \
  {                                                 \
    TOmniSciException omnisci_thrown_exception_;    \
    omnisci_thrown_exception_.error_msg = (errstr); \
    LOG(ERROR) << omnisci_thrown_exception_.error_msg; \
    throw omnisci_thrown_exception_;                \
  }
119 
// Out-of-class definition of the static thread_local data member
// TrackingProcessor::client_address; thread_local gives each thread its own string.
// NOTE(review): presumably records the address of the client served by the current
// thread (inferred from the name only) — confirm against TrackingProcessor.
120 thread_local std::string TrackingProcessor::client_address;
122 
123 namespace {
124 
125 SessionMap::iterator get_session_from_map(const TSessionId& session,
126  SessionMap& session_map) {
127  auto session_it = session_map.find(session);
128  if (session_it == session_map.end()) {
129  THROW_MAPD_EXCEPTION("Session not valid.");
130  }
131  return session_it;
132 }
133 
134 struct ForceDisconnect : public std::runtime_error {
135  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
136 };
137 
138 } // namespace
139 
140 template <>
141 SessionMap::iterator DBHandler::get_session_it_unsafe(
142  const TSessionId& session,
143  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
144  auto session_it = get_session_from_map(session, sessions_);
145  try {
146  check_session_exp_unsafe(session_it);
147  } catch (const ForceDisconnect& e) {
148  read_lock.unlock();
149  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
150  auto session_it2 = get_session_from_map(session, sessions_);
151  disconnect_impl(session_it2, write_lock);
152  THROW_MAPD_EXCEPTION(e.what());
153  }
154  return session_it;
155 }
156 
157 template <>
158 SessionMap::iterator DBHandler::get_session_it_unsafe(
159  const TSessionId& session,
160  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
161  auto session_it = get_session_from_map(session, sessions_);
162  try {
163  check_session_exp_unsafe(session_it);
164  } catch (const ForceDisconnect& e) {
165  disconnect_impl(session_it, write_lock);
166  THROW_MAPD_EXCEPTION(e.what());
167  }
168  return session_it;
169 }
170 
171 #ifdef ENABLE_GEOS
172 extern std::unique_ptr<std::string> g_libgeos_so_filename;
173 #endif
174 
175 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
176  const std::vector<LeafHostInfo>& string_leaves,
177  const std::string& base_data_path,
178  const bool cpu_only,
179  const bool allow_multifrag,
180  const bool jit_debug,
181  const bool intel_jit_profile,
182  const bool read_only,
183  const bool allow_loop_joins,
184  const bool enable_rendering,
185  const bool renderer_use_vulkan_driver,
186  const bool enable_auto_clear_render_mem,
187  const int render_oom_retry_threshold,
188  const size_t render_mem_bytes,
189  const size_t max_concurrent_render_sessions,
190  const int num_gpus,
191  const int start_gpu,
192  const size_t reserved_gpu_mem,
193  const bool render_compositor_use_last_gpu,
194  const size_t num_reader_threads,
195  const AuthMetadata& authMetadata,
196  const SystemParameters& system_parameters,
197  const bool legacy_syntax,
198  const int idle_session_duration,
199  const int max_session_duration,
200  const bool enable_runtime_udf_registration,
201  const std::string& udf_filename,
202  const std::string& clang_path,
203  const std::vector<std::string>& clang_options,
204 #ifdef ENABLE_GEOS
205  const std::string& libgeos_so_filename,
206 #endif
207  const DiskCacheConfig& disk_cache_config)
208  : leaf_aggregator_(db_leaves)
209  , string_leaves_(string_leaves)
210  , base_data_path_(base_data_path)
211  , random_gen_(std::random_device{}())
212  , session_id_dist_(0, INT32_MAX)
213  , jit_debug_(jit_debug)
214  , intel_jit_profile_(intel_jit_profile)
215  , allow_multifrag_(allow_multifrag)
216  , read_only_(read_only)
217  , allow_loop_joins_(allow_loop_joins)
218  , authMetadata_(authMetadata)
219  , system_parameters_(system_parameters)
220  , legacy_syntax_(legacy_syntax)
221  , dispatch_queue_(
222  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
223  , super_user_rights_(false)
224  , idle_session_duration_(idle_session_duration * 60)
225  , max_session_duration_(max_session_duration * 60)
226  , runtime_udf_registration_enabled_(enable_runtime_udf_registration) {
227  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
228  // Register foreign storage interfaces here
230  bool is_rendering_enabled = enable_rendering;
231 
232  try {
233  if (cpu_only) {
234  is_rendering_enabled = false;
235  executor_device_type_ = ExecutorDeviceType::CPU;
236  cpu_mode_only_ = true;
237  } else {
238 #ifdef HAVE_CUDA
239  executor_device_type_ = ExecutorDeviceType::GPU;
240  cpu_mode_only_ = false;
241 #else
242  executor_device_type_ = ExecutorDeviceType::CPU;
243  LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
244  cpu_mode_only_ = true;
245  is_rendering_enabled = false;
246 #endif
247  }
248  } catch (const std::exception& e) {
249  LOG(FATAL) << "Failed to executor device type: " << e.what();
250  }
251 
252  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
253  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
254  // manage cannot ask for
255  size_t total_reserved = reserved_gpu_mem;
256  if (is_rendering_enabled) {
257  total_reserved += render_mem_bytes;
258  }
259 
260  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
261 #ifdef HAVE_CUDA
262  if (!cpu_mode_only_ || is_rendering_enabled) {
263  try {
264  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(num_gpus, start_gpu);
265  } catch (const std::exception& e) {
266  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
267  << e.what();
268  cpu_mode_only_ = true;
269  is_rendering_enabled = false;
270  }
271  }
272 #endif // HAVE_CUDA
273 
274  try {
275  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
276  system_parameters,
277  std::move(cuda_mgr),
278  !cpu_mode_only_,
279  total_reserved,
280  num_reader_threads,
281  disk_cache_config));
282  } catch (const std::exception& e) {
283  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
284  }
285 
286  std::string udf_ast_filename("");
287 
288  try {
289  if (!udf_filename.empty()) {
290  const auto cuda_mgr = data_mgr_->getCudaMgr();
291  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
292  cuda_mgr ? cuda_mgr->getDeviceArch()
294  UdfCompiler compiler(udf_filename, device_arch, clang_path, clang_options);
295  int compile_result = compiler.compileUdf();
296 
297  if (compile_result == 0) {
298  udf_ast_filename = compiler.getAstFileName();
299  }
300  }
301  } catch (const std::exception& e) {
302  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
303  }
304 
305  try {
306  calcite_ =
307  std::make_shared<Calcite>(system_parameters, base_data_path_, udf_ast_filename);
308  } catch (const std::exception& e) {
309  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
310  }
311 
312  try {
313  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
314  if (!udf_filename.empty()) {
315  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
316  }
317  } catch (const std::exception& e) {
318  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
319  }
320 
321  try {
323  } catch (const std::exception& e) {
324  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
325  }
326 
327  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
328  executor_device_type_ = ExecutorDeviceType::CPU;
329  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
330  cpu_mode_only_ = true;
331  }
332 
333  switch (executor_device_type_) {
335  LOG(INFO) << "Started in GPU mode" << std::endl;
336  break;
338  LOG(INFO) << "Started in CPU mode" << std::endl;
339  break;
340  }
341 
342  try {
343  SysCatalog::instance().init(base_data_path_,
344  data_mgr_,
345  authMetadata,
346  calcite_,
347  false,
348  !db_leaves.empty(),
349  string_leaves_);
350  } catch (const std::exception& e) {
351  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
352  }
353 
354  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
355  start_time_ = std::time(nullptr);
356 
357  if (is_rendering_enabled) {
358  try {
359  render_handler_.reset(new RenderHandler(this,
360  render_mem_bytes,
361  max_concurrent_render_sessions,
362  render_compositor_use_last_gpu,
363  false,
364  0,
365  false,
366  system_parameters_));
367  } catch (const std::exception& e) {
368  LOG(ERROR) << "Backend rendering disabled: " << e.what();
369  }
370  }
371 
372  if (leaf_aggregator_.leafCount() > 0) {
373  try {
374  agg_handler_.reset(new MapDAggHandler(this));
375  } catch (const std::exception& e) {
376  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
377  }
378  } else if (g_cluster) {
379  try {
380  leaf_handler_.reset(new MapDLeafHandler(this));
381  } catch (const std::exception& e) {
382  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
383  }
384  }
385 
386 #ifdef ENABLE_GEOS
387  if (!libgeos_so_filename.empty()) {
388  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename));
389  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
390  }
391 #endif
392 }
393 
395 
397  const std::string& query_str,
398  std::list<std::unique_ptr<Parser::Stmt>>& parse_trees) {
399  int num_parse_errors = 0;
400  std::string last_parsed;
401  SQLParser parser;
402  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
403  if (num_parse_errors > 0) {
404  throw std::runtime_error("Syntax error at: " + last_parsed);
405  }
406  if (parse_trees.size() > 1) {
407  throw std::runtime_error("multiple SQL statements not allowed");
408  } else if (parse_trees.size() != 1) {
409  throw std::runtime_error("empty SQL statment not allowed");
410  }
411 }
412 void DBHandler::check_read_only(const std::string& str) {
413  if (DBHandler::read_only_) {
414  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
415  }
416 }
417 
419  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
420  // We would create an in memory session for calcite with super user privileges which
421  // would be used for getting all tables metadata when a user runs the query. The
422  // session would be under the name of a proxy user/password which would only persist
423  // till server's lifetime or execution of calcite query(in memory) whichever is the
424  // earliest.
425  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
426  std::string session_id;
427  do {
428  session_id = generate_random_string(64);
429  } while (sessions_.find(session_id) != sessions_.end());
430  Catalog_Namespace::UserMetadata user_meta(-1,
431  calcite_->getInternalSessionProxyUserName(),
432  calcite_->getInternalSessionProxyPassword(),
433  true,
434  -1,
435  true);
436  const auto emplace_ret =
437  sessions_.emplace(session_id,
438  std::make_shared<Catalog_Namespace::SessionInfo>(
439  catalog_ptr, user_meta, executor_device_type_, session_id));
440  CHECK(emplace_ret.second);
441  return session_id;
442 }
443 
445  const Catalog_Namespace::UserMetadata user_meta) {
446  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
447  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
448  user_meta.isSuper.load();
449 }
450 
451 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
452  // Remove InMemory calcite Session.
453  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
454  const auto it = sessions_.find(session_id);
455  CHECK(it != sessions_.end());
456  sessions_.erase(it);
457 }
458 
459 // internal connection for connections with no password
460 void DBHandler::internal_connect(TSessionId& session,
461  const std::string& username,
462  const std::string& dbname) {
463  auto stdlog = STDLOG(); // session_info set by connect_impl()
464  std::string username2 = username; // login() may reset username given as argument
465  std::string dbname2 = dbname; // login() may reset dbname given as argument
467  std::shared_ptr<Catalog> cat = nullptr;
468  try {
469  cat =
470  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
471  } catch (std::exception& e) {
472  THROW_MAPD_EXCEPTION(e.what());
473  }
474 
475  DBObject dbObject(dbname2, DatabaseDBObjectType);
476  dbObject.loadKey(*cat);
478  std::vector<DBObject> dbObjects;
479  dbObjects.push_back(dbObject);
480  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
481  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
482  " is not allowed to access database " + dbname2 + ".");
483  }
484  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
485 }
486 
487 void DBHandler::krb5_connect(TKrb5Session& session,
488  const std::string& inputToken,
489  const std::string& dbname) {
490  THROW_MAPD_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
491 }
492 
493 void DBHandler::connect(TSessionId& session,
494  const std::string& username,
495  const std::string& passwd,
496  const std::string& dbname) {
497  auto stdlog = STDLOG(); // session_info set by connect_impl()
498  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
499  std::string username2 = username; // login() may reset username given as argument
500  std::string dbname2 = dbname; // login() may reset dbname given as argument
502  std::shared_ptr<Catalog> cat = nullptr;
503  try {
504  cat = SysCatalog::instance().login(
505  dbname2, username2, passwd, user_meta, !super_user_rights_);
506  } catch (std::exception& e) {
507  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
508  THROW_MAPD_EXCEPTION(e.what());
509  }
510 
511  DBObject dbObject(dbname2, DatabaseDBObjectType);
512  dbObject.loadKey(*cat);
514  std::vector<DBObject> dbObjects;
515  dbObjects.push_back(dbObject);
516  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
517  stdlog.appendNameValuePairs(
518  "user", username, "db", dbname, "exception", "Missing Privileges");
519  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
520  " is not allowed to access database " + dbname2 + ".");
521  }
522  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
523  // if pki auth session will come back encrypted with user pubkey
524  SysCatalog::instance().check_for_session_encryption(passwd, session);
525 }
526 
527 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::create_new_session(
528  TSessionId& session,
529  const std::string& dbname,
530  const Catalog_Namespace::UserMetadata& user_meta,
531  std::shared_ptr<Catalog> cat) {
532  do {
533  session = generate_random_string(32);
534  } while (sessions_.find(session) != sessions_.end());
535  std::pair<SessionMap::iterator, bool> emplace_retval =
536  sessions_.emplace(session,
537  std::make_shared<Catalog_Namespace::SessionInfo>(
538  cat, user_meta, executor_device_type_, session));
539  CHECK(emplace_retval.second);
540  auto& session_ptr = emplace_retval.first->second;
541  LOG(INFO) << "User " << user_meta.userName << " connected to database " << dbname;
542  return session_ptr;
543 }
544 
545 void DBHandler::connect_impl(TSessionId& session,
546  const std::string& passwd,
547  const std::string& dbname,
548  const Catalog_Namespace::UserMetadata& user_meta,
549  std::shared_ptr<Catalog> cat,
550  query_state::StdLog& stdlog) {
551  // TODO(sy): Is there any reason to have dbname as a parameter
552  // here when the cat parameter already provides cat->name()?
553  // Should dbname and cat->name() ever differ?
554  {
555  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
556  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
557  stdlog.setSessionInfo(session_ptr);
558  session_ptr->set_connection_info(getConnectionInfo().toString());
559  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
560  // while doing warmup
561  if (leaf_aggregator_.leafCount() > 0) {
562  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
563  return;
564  }
565  }
566  }
567  auto const roles =
568  stdlog.getConstSessionInfo()->get_currentUser().isSuper
569  ? std::vector<std::string>{{"super"}}
570  : SysCatalog::instance().getRoles(
571  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
572  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
573 }
574 
575 void DBHandler::disconnect(const TSessionId& session) {
576  auto stdlog = STDLOG();
577  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
578 
579  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
580  auto session_it = get_session_it_unsafe(session, write_lock);
581  stdlog.setSessionInfo(session_it->second);
582  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
583 
584  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
585  << " disconnected from database " << dbname
586  << " with public_session_id: " << session_it->second->get_public_session_id();
587  disconnect_impl(session_it, write_lock);
588 }
589 
590 void DBHandler::disconnect_impl(const SessionMap::iterator& session_it,
591  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
592  // session_it existence should already have been checked (i.e. called via
593  // get_session_it_unsafe(...))
594  const auto session_id = session_it->second->get_session_id();
595  if (leaf_aggregator_.leafCount() > 0) {
596  leaf_aggregator_.disconnect(session_id);
597  }
598  sessions_.erase(session_it);
599  write_lock.unlock();
600 
601  if (render_handler_) {
602  render_handler_->disconnect(session_id);
603  }
604 }
605 
606 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
607  auto stdlog = STDLOG(get_session_ptr(session));
608  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
609  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
610  auto session_it = get_session_it_unsafe(session, write_lock);
611 
612  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
613 
614  try {
615  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
616  dbname2, session_it->second->get_currentUser().userName);
617  session_it->second->set_catalog_ptr(cat);
618  if (leaf_aggregator_.leafCount() > 0) {
619  leaf_aggregator_.switch_database(session, dbname);
620  return;
621  }
622  } catch (std::exception& e) {
623  THROW_MAPD_EXCEPTION(e.what());
624  }
625 }
626 
627 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
628  auto stdlog = STDLOG(get_session_ptr(session1));
629  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
630  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
631  auto session_it = get_session_it_unsafe(session1, write_lock);
632 
633  try {
634  const Catalog_Namespace::UserMetadata& user_meta =
635  session_it->second->get_currentUser();
636  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
637  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
638  if (leaf_aggregator_.leafCount() > 0) {
639  leaf_aggregator_.clone_session(session1, session2);
640  return;
641  }
642  } catch (std::exception& e) {
643  THROW_MAPD_EXCEPTION(e.what());
644  }
645 }
646 
647 void DBHandler::interrupt(const TSessionId& query_session,
648  const TSessionId& interrupt_session) {
649  // if this is for distributed setting, query_session becomes a parent session (agg)
650  // and the interrupt session is one of existing session in the leaf node (leaf)
651  // so we can think there exists a logical mapping
652  // between query_session (agg) and interrupt_session (leaf)
653  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
654  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
656  // Shared lock to allow simultaneous interrupts of multiple sessions
657  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
658 
659  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
660  auto& cat = session_it->second.get()->getCatalog();
661  const auto dbname = cat.getCurrentDB().dbName;
663  jit_debug_ ? "/tmp" : "",
664  jit_debug_ ? "mapdquery" : "",
666  CHECK(executor);
667 
668  VLOG(1) << "Received interrupt: "
669  << "Session " << *session_it->second << ", Executor " << executor
670  << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
671  << session_it->second->get_currentUser().userName << ", Database " << dbname
672  << std::endl;
673 
674  if (leaf_aggregator_.leafCount() > 0) {
675  leaf_aggregator_.interrupt(query_session, interrupt_session);
676  }
677  executor->interrupt(query_session, interrupt_session);
678 
679  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
680  << " interrupted session with database " << dbname << std::endl;
681  }
682 }
683 
684 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
685  auto stdlog = STDLOG(get_session_ptr(session));
686  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
687  const auto rendering_enabled = bool(render_handler_);
688  _return.read_only = read_only_;
689  _return.version = MAPD_RELEASE;
690  _return.rendering_enabled = rendering_enabled;
691  _return.poly_rendering_enabled = rendering_enabled;
692  _return.start_time = start_time_;
693  _return.edition = MAPD_EDITION;
694  _return.host_name = omnisci::get_hostname();
695 }
696 
697 void DBHandler::get_status(std::vector<TServerStatus>& _return,
698  const TSessionId& session) {
699  auto stdlog = STDLOG(get_session_ptr(session));
700  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
701  const auto rendering_enabled = bool(render_handler_);
702  TServerStatus ret;
703  ret.read_only = read_only_;
704  ret.version = MAPD_RELEASE;
705  ret.rendering_enabled = rendering_enabled;
706  ret.poly_rendering_enabled = rendering_enabled;
707  ret.start_time = start_time_;
708  ret.edition = MAPD_EDITION;
709  ret.host_name = omnisci::get_hostname();
710 
711  // TSercivePort tcp_port{}
712 
713  if (g_cluster) {
714  ret.role =
715  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
716  } else {
717  ret.role = TRole::type::SERVER;
718  }
719 
720  _return.push_back(ret);
721  if (leaf_aggregator_.leafCount() > 0) {
722  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
723  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
724  }
725 }
726 
727 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
728  const TSessionId& session) {
729  auto stdlog = STDLOG(get_session_ptr(session));
730  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
731  THardwareInfo ret;
732  const auto cuda_mgr = data_mgr_->getCudaMgr();
733  if (cuda_mgr) {
734  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
735  ret.start_gpu = cuda_mgr->getStartGpu();
736  if (ret.start_gpu >= 0) {
737  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
738  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
739  }
740  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
741  TGpuSpecification gpu_spec;
742  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
743  gpu_spec.num_sm = deviceProperties->numMPs;
744  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
745  gpu_spec.memory = deviceProperties->globalMem;
746  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
747  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
748  ret.gpu_info.push_back(gpu_spec);
749  }
750  }
751 
752  // start hardware/OS dependent code
753  ret.num_cpu_hw = std::thread::hardware_concurrency();
754  // ^ This might return diffrent results in case of hyper threading
755  // end hardware/OS dependent code
756 
757  _return.hardware_info.push_back(ret);
758  if (leaf_aggregator_.leafCount() > 0) {
759  ret.host_name = "aggregator";
760  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
761  _return.hardware_info.insert(_return.hardware_info.end(),
762  leaf_hardware.hardware_info.begin(),
763  leaf_hardware.hardware_info.end());
764  }
765 }
766 
767 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
768  auto session_ptr = get_session_ptr(session);
769  auto stdlog = STDLOG(session_ptr);
770  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
771  auto user_metadata = session_ptr->get_currentUser();
772 
773  _return.user = user_metadata.userName;
774  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
775  _return.start_time = session_ptr->get_start_time();
776  _return.is_super = user_metadata.isSuper;
777 }
778 
780  const SQLTypeInfo& ti,
781  TColumn& column) {
782  if (ti.is_array()) {
783  TColumn tColumn;
784  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
785  CHECK(array_tv);
786  bool is_null = !array_tv->is_initialized();
787  if (!is_null) {
788  const auto& vec = array_tv->get();
789  for (const auto& elem_tv : vec) {
790  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
791  }
792  }
793  column.data.arr_col.push_back(tColumn);
794  column.nulls.push_back(is_null && !ti.get_notnull());
795  } else if (ti.is_geometry()) {
796  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
797  if (scalar_tv) {
798  auto s_n = boost::get<NullableString>(scalar_tv);
799  auto s = boost::get<std::string>(s_n);
800  if (s) {
801  column.data.str_col.push_back(*s);
802  } else {
803  column.data.str_col.emplace_back(""); // null string
804  auto null_p = boost::get<void*>(s_n);
805  CHECK(null_p && !*null_p);
806  }
807  column.nulls.push_back(!s && !ti.get_notnull());
808  } else {
809  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
810  CHECK(array_tv);
811  bool is_null = !array_tv->is_initialized();
812  if (!is_null) {
813  auto elem_type = SQLTypeInfo(kDOUBLE, false);
814  TColumn tColumn;
815  const auto& vec = array_tv->get();
816  for (const auto& elem_tv : vec) {
817  value_to_thrift_column(elem_tv, elem_type, tColumn);
818  }
819  column.data.arr_col.push_back(tColumn);
820  column.nulls.push_back(false);
821  } else {
822  TColumn tColumn;
823  column.data.arr_col.push_back(tColumn);
824  column.nulls.push_back(is_null && !ti.get_notnull());
825  }
826  }
827  } else {
828  CHECK(!ti.is_column());
829  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
830  CHECK(scalar_tv);
831  if (boost::get<int64_t>(scalar_tv)) {
832  int64_t data = *(boost::get<int64_t>(scalar_tv));
833 
834  if (ti.is_decimal()) {
835  double val = static_cast<double>(data);
836  if (ti.get_scale() > 0) {
837  val /= pow(10.0, std::abs(ti.get_scale()));
838  }
839  column.data.real_col.push_back(val);
840  } else {
841  column.data.int_col.push_back(data);
842  }
843 
844  switch (ti.get_type()) {
845  case kBOOLEAN:
846  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
847  break;
848  case kTINYINT:
849  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
850  break;
851  case kSMALLINT:
852  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
853  break;
854  case kINT:
855  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
856  break;
857  case kNUMERIC:
858  case kDECIMAL:
859  case kBIGINT:
860  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
861  break;
862  case kTIME:
863  case kTIMESTAMP:
864  case kDATE:
865  case kINTERVAL_DAY_TIME:
867  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
868  break;
869  default:
870  column.nulls.push_back(false);
871  }
872  } else if (boost::get<double>(scalar_tv)) {
873  double data = *(boost::get<double>(scalar_tv));
874  column.data.real_col.push_back(data);
875  if (ti.get_type() == kFLOAT) {
876  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
877  } else {
878  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
879  }
880  } else if (boost::get<float>(scalar_tv)) {
881  CHECK_EQ(kFLOAT, ti.get_type());
882  float data = *(boost::get<float>(scalar_tv));
883  column.data.real_col.push_back(data);
884  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
885  } else if (boost::get<NullableString>(scalar_tv)) {
886  auto s_n = boost::get<NullableString>(scalar_tv);
887  auto s = boost::get<std::string>(s_n);
888  if (s) {
889  column.data.str_col.push_back(*s);
890  } else {
891  column.data.str_col.emplace_back(""); // null string
892  auto null_p = boost::get<void*>(s_n);
893  CHECK(null_p && !*null_p);
894  }
895  column.nulls.push_back(!s && !ti.get_notnull());
896  } else {
897  CHECK(false);
898  }
899  }
900 }
901 
// Convert one result-set value into a Thrift TDatum.
//
// `tv` holds either a scalar or an array value; `ti` is its SQL type.
//  - Arrays: each element is converted recursively with ti.get_elem_type();
//    an uninitialized array value produces a datum with is_null = true.
//  - int64_t scalars: decimals are divided by 10^scale into real_val, every
//    other type is stored in int_val; is_null compares against NULL_* sentinels.
//  - double / float scalars: stored in real_val; NULL_FLOAT / NULL_DOUBLE mark null.
//  - NullableString: stored in str_val; an absent string marks null.
// Any other alternative trips CHECK(false).
902 TDatum DBHandler::value_to_thrift(const TargetValue& tv, const SQLTypeInfo& ti) {
903  TDatum datum;
904  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
905  if (!scalar_tv) {
// Not a scalar, so the only alternative accepted here is an array value.
906  CHECK(ti.is_array());
907  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
908  CHECK(array_tv);
909  if (array_tv->is_initialized()) {
910  const auto& vec = array_tv->get();
911  for (const auto& elem_tv : vec) {
912  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
913  datum.val.arr_val.push_back(scalar_col_val);
914  }
915  // Datum is not null, at worst it's an empty array Datum
916  datum.is_null = false;
917  } else {
918  datum.is_null = true;
919  }
920  return datum;
921  }
922  if (boost::get<int64_t>(scalar_tv)) {
923  int64_t data = *(boost::get<int64_t>(scalar_tv));
924 
// Decimals arrive as scaled integers; convert to a floating-point value here.
925  if (ti.is_decimal()) {
926  double val = static_cast<double>(data);
927  if (ti.get_scale() > 0) {
928  val /= pow(10.0, std::abs(ti.get_scale()));
929  }
930  datum.val.real_val = val;
931  } else {
932  datum.val.int_val = data;
933  }
934 
// Null detection: compare the stored value against the sentinel for the type.
935  switch (ti.get_type()) {
936  case kBOOLEAN:
937  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
938  break;
939  case kTINYINT:
940  datum.is_null = (datum.val.int_val == NULL_TINYINT);
941  break;
942  case kSMALLINT:
943  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
944  break;
945  case kINT:
946  datum.is_null = (datum.val.int_val == NULL_INT);
947  break;
// NOTE(review): for kDECIMAL/kNUMERIC the branch above wrote real_val, not
// int_val, so this compares whatever int_val already held (presumably the
// TDatum default) rather than `data`. A NULL_BIGINT decimal may therefore not
// be reported as null. Confirm, and consider comparing `data` instead.
948  case kDECIMAL:
949  case kNUMERIC:
950  case kBIGINT:
951  datum.is_null = (datum.val.int_val == NULL_BIGINT);
952  break;
953  case kTIME:
954  case kTIMESTAMP:
955  case kDATE:
956  case kINTERVAL_DAY_TIME:
958  datum.is_null = (datum.val.int_val == NULL_BIGINT);
959  break;
960  default:
961  datum.is_null = false;
962  }
963  } else if (boost::get<double>(scalar_tv)) {
964  datum.val.real_val = *(boost::get<double>(scalar_tv));
// A double may still describe a FLOAT column, which uses the float sentinel.
965  if (ti.get_type() == kFLOAT) {
966  datum.is_null = (datum.val.real_val == NULL_FLOAT);
967  } else {
968  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
969  }
970  } else if (boost::get<float>(scalar_tv)) {
971  CHECK_EQ(kFLOAT, ti.get_type());
972  datum.val.real_val = *(boost::get<float>(scalar_tv));
973  datum.is_null = (datum.val.real_val == NULL_FLOAT);
974  } else if (boost::get<NullableString>(scalar_tv)) {
975  auto s_n = boost::get<NullableString>(scalar_tv);
976  auto s = boost::get<std::string>(s_n);
977  if (s) {
978  datum.val.str_val = *s;
979  } else {
// A null string is represented by the void* alternative holding nullptr.
980  auto null_p = boost::get<void*>(s_n);
981  CHECK(null_p && !*null_p);
982  }
983  datum.is_null = !s;
984  } else {
985  CHECK(false);
986  }
987  return datum;
988 }
989 
// Execute a SQL statement for `session` and fill `_return` with the result.
//
// Logs the client connection and `nonce`, and rejects requests that set both
// `first_n` and `at_most_n` (>= 0). With leaf nodes configured, the query is
// delegated to agg_handler_->cluster_execute(). Otherwise it runs on this server
// using the session's executor device type. If the statement was a geo COPY FROM,
// the recorded import is performed afterwards and _return.total_time_ms is
// overwritten with the import time. std::exceptions are translated into
// THROW_MAPD_EXCEPTION messages, with special cases for common Calcite parse errors.
990 void DBHandler::sql_execute(TQueryResult& _return,
991  const TSessionId& session,
992  const std::string& query_str,
993  const bool column_format,
994  const std::string& nonce,
995  const int32_t first_n,
996  const int32_t at_most_n) {
997  auto session_ptr = get_session_ptr(session);
998  auto query_state = create_query_state(session_ptr, query_str);
999  auto stdlog = STDLOG(session_ptr, query_state);
1000  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1001  stdlog.appendNameValuePairs("nonce", nonce);
1002  auto timer = DEBUG_TIMER(__func__);
1003 
1004  try {
// Always clear this session's pending geo COPY FROM state when leaving the try
// block, including on exceptions.
1005  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
1006  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1007  };
1008 
// first_n and at_most_n are mutually exclusive (a negative value means unset).
1009  if (first_n >= 0 && at_most_n >= 0) {
1011  std::string("At most one of first_n and at_most_n can be set"));
1012  }
1013 
// Distributed mode: forward the query to the aggregator handler.
1014  if (leaf_aggregator_.leafCount() > 0) {
1015  if (!agg_handler_) {
1016  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
1017  }
1018  _return.total_time_ms = measure<>::execution([&]() {
1019  agg_handler_->cluster_execute(_return,
1020  query_state->createQueryStateProxy(),
1021  query_state->getQueryStr(),
1022  column_format,
1023  nonce,
1024  first_n,
1025  at_most_n,
1027  });
1028  _return.nonce = nonce;
1029  } else {
// Single-node mode: execute locally on the session's executor device type.
1030  _return.total_time_ms = measure<>::execution([&]() {
1032  query_state->createQueryStateProxy(),
1033  column_format,
1034  nonce,
1035  session_ptr->get_executor_device_type(),
1036  first_n,
1037  at_most_n);
1038  });
1039  }
1040 
1041  // if the SQL statement we just executed was a geo COPY FROM, the import
1042  // parameters were captured, and this flag set, so we do the actual import here
1043  if (auto geo_copy_from_state =
1044  geo_copy_from_sessions(session_ptr->get_session_id())) {
1045  // import_geo_table() calls create_table() which calls this function to
1046  // do the work, so reset the flag now to avoid executing this part a
1047  // second time at the end of that, which would fail as the table was
1048  // already created! Also reset the flag with a ScopeGuard on exiting
1049  // this function any other way, such as an exception from the code above!
1050  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1051 
1052  // create table as replicated?
1053  TCreateParams create_params;
1054  if (geo_copy_from_state->geo_copy_from_partitions == "REPLICATED") {
1055  create_params.is_replicated = true;
1056  }
1057 
1058  // now do (and time) the import
// Note: this replaces the total_time_ms measured for the statement above.
1059  _return.total_time_ms = measure<>::execution([&]() {
1061  session,
1062  geo_copy_from_state->geo_copy_from_table,
1063  geo_copy_from_state->geo_copy_from_file_name,
1064  copyparams_to_thrift(geo_copy_from_state->geo_copy_from_copy_params),
1065  TRowDescriptor(),
1066  create_params);
1067  });
1068  }
// Attach the debug timer output (if any) and log timings.
1069  std::string debug_json = timer.stopAndGetJson();
1070  if (!debug_json.empty()) {
1071  _return.__set_debug(std::move(debug_json));
1072  }
1073  stdlog.appendNameValuePairs(
1074  "execution_time_ms",
1075  _return.execution_time_ms,
1076  "total_time_ms", // BE-3420 - Redundant with duration field
1077  stdlog.duration<std::chrono::milliseconds>());
1078  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1079  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1080  } catch (const std::exception& e) {
// Map recognizable Calcite failures to friendlier messages; everything else
// is forwarded with an "Exception: " prefix.
1081  if (strstr(e.what(), "java.lang.NullPointerException")) {
1082  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
1083  "query failed from broken view or other schema related issue");
1084  } else if (strstr(e.what(), "Parse failed: Encountered \";\"")) {
1085  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1086  } else if (strstr(e.what(),
1087  "Parse failed: Encountered \"<EOF>\" at line 0, column 0")) {
1088  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1089  } else {
1090  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1091  }
1092  }
1093 }
1094 
// Execute a read-only query and return its result as an Arrow data frame.
//
// For GPU requests the session must allow GPU execution, the server must have
// GPUs, and `device_id` must be in [0, device count). Only statements that are
// not DDL, not update DML and whose explain type is not ExplainType::Other are
// executed (via execute_rel_alg_df). Calcite EXPLAIN is rejected. All other
// statements reach the exception thrown at the end of the function.
1095 void DBHandler::sql_execute_df(TDataFrame& _return,
1096  const TSessionId& session,
1097  const std::string& query_str,
1098  const TDeviceType::type device_type,
1099  const int32_t device_id,
1100  const int32_t first_n,
1101  const TArrowTransport::type transport_method) {
1102  auto session_ptr = get_session_ptr(session);
1103  auto query_state = create_query_state(session_ptr, query_str);
1104  auto stdlog = STDLOG(session_ptr, query_state);
1105 
// Validate a GPU request before doing any parsing work.
1106  if (device_type == TDeviceType::GPU) {
1107  const auto executor_device_type = session_ptr->get_executor_device_type();
1108  if (executor_device_type != ExecutorDeviceType::GPU) {
1110  std::string("Exception: GPU mode is not allowed in this session"));
1111  }
1112  if (!data_mgr_->gpusPresent()) {
1113  THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
1114  }
1115  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1117  std::string("Exception: invalid device_id or unavailable GPU with this ID"));
1118  }
1119  }
1120  _return.execution_time_ms = 0;
1121 
// Shared (read) lock held until the function returns; the mutex argument is
// not visible in this listing.
1122  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
1125 
1126  try {
1127  ParserWrapper pw{query_str};
1128  if (!pw.is_ddl && !pw.is_update_dml &&
1129  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
1130  std::string query_ra;
// Parse to relational algebra; the parse time is counted in execution_time_ms.
1132  _return.execution_time_ms += measure<>::execution([&]() {
1133  TPlanResult result;
1134  std::tie(result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1135  query_str,
1136  {},
1137  true,
1139  query_ra = result.plan_result;
1140  });
1141 
1142  if (pw.isCalciteExplain()) {
1143  throw std::runtime_error("explain is not unsupported by current thrift API");
1144  }
1145  execute_rel_alg_df(_return,
1146  query_ra,
1147  query_state->createQueryStateProxy(),
1148  *session_ptr,
1149  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
1151  static_cast<size_t>(device_id),
1152  first_n,
1153  transport_method);
1154  return;
1155  }
1156  } catch (std::exception& e) {
1157  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1158  }
// Reached for DDL, update DML and ExplainType::Other statements.
1160  "Exception: DDL or update DML are not unsupported by current thrift API");
1161 }
1162 
1163 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1164  const TSessionId& session,
1165  const std::string& query_str,
1166  const int32_t device_id,
1167  const int32_t first_n) {
1168  auto stdlog = STDLOG(get_session_ptr(session));
1169  sql_execute_df(_return,
1170  session,
1171  query_str,
1172  TDeviceType::GPU,
1173  device_id,
1174  first_n,
1175  TArrowTransport::SHARED_MEMORY);
1176 }
1177 
1178 // For now we have only one user of a data frame in all cases.
// Release the buffers behind a data frame previously returned to a client.
// For GPU frames, the serialized CUDA IPC handle registered under df.df_handle is
// looked up and removed (under handle_to_dev_ptr_mutex_). A TOmniSciException is
// thrown if count() for that handle is not exactly 1. The shared-memory and
// data-frame handles, their sizes and the CUDA handle are then passed to a
// deallocation routine on lines not shown in this listing.
1179 void DBHandler::deallocate_df(const TSessionId& session,
1180  const TDataFrame& df,
1181  const TDeviceType::type device_type,
1182  const int32_t device_id) {
1183  auto stdlog = STDLOG(get_session_ptr(session));
1184  std::string serialized_cuda_handle = "";
1185  if (device_type == TDeviceType::GPU) {
1186  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
// NOTE(review): if ipc_handle_to_dev_ptr_ is a unique-key map, count() is only
// 0 or 1, so the "inserted twice" case in the message cannot actually occur.
1187  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1188  TOmniSciException ex;
1189  ex.error_msg = std::string(
1190  "Exception: current data frame handle is not bookkept or been inserted "
1191  "twice");
1192  LOG(ERROR) << ex.error_msg;
1193  throw ex;
1194  }
1195  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1196  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1197  }
// Copy the Thrift binary handles into byte vectors for the deallocation call.
1198  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1199  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1201  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1203  result,
1204  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1205  device_id,
1206  data_mgr_);
1207 }
1208 
1209 std::string DBHandler::apply_copy_to_shim(const std::string& query_str) {
1210  auto result = query_str;
1211  {
1212  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
1213  // boost::regex::icase};
1214  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
1215  boost::regex::extended | boost::regex::icase};
1216  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
1217  result.replace(
1218  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
1219  });
1220  }
1221  return result;
1222 }
1223 
// Validate a SELECT statement without executing it and return its output row
// descriptor. EXPLAIN, DDL and update DML are rejected. The query is parsed to
// relational algebra with privilege checking, validated through validate_rel_alg(),
// and the resulting descriptor is passed through fixup_row_descriptor(). Any
// std::exception, including session lookup failures, is re-thrown as a
// THROW_MAPD_EXCEPTION with an "Exception: " prefix.
1224 void DBHandler::sql_validate(TRowDescriptor& _return,
1225  const TSessionId& session,
1226  const std::string& query_str) {
1227  try {
1228  auto stdlog = STDLOG(get_session_ptr(session));
1229  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1230  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1231  stdlog.setQueryState(query_state);
1232 
1233  ParserWrapper pw{query_str};
1234  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1235  pw.is_update_dml) {
1236  throw std::runtime_error("Can only validate SELECT statements.");
1237  }
1238 
// Shared (read) lock held for the rest of the try block; its argument lines
// are not visible in this listing.
1239  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
1242 
1243  TPlanResult parse_result;
1245  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1246  query_state->getQueryStr(),
1247  {},
1248  true,
1250  /*check_privileges=*/true);
1251  const auto query_ra = parse_result.plan_result;
1252 
1253  const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
1254  _return = fixup_row_descriptor(result.row_set.row_desc,
1255  query_state->getConstSessionInfo()->getCatalog());
1256  } catch (const std::exception& e) {
1257  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
1258  }
1259 }
1260 
1261 namespace {
1262 
// Upper-cased identifier tokens collected from a partial query:
// plain names in uc_column_names, and the qualifier part of `a.b` tokens in
// uc_column_table_qualifiers.
1264  std::unordered_set<std::string> uc_column_names;
1265  std::unordered_set<std::string> uc_column_table_qualifiers;
1266 };
1267 
1268 // Extract what looks like a (qualified) identifier from the partial query.
1269 // The results will be used to rank the auto-completion results: tables which
1270 // contain at least one of the identifiers first.
// A token is any run of alphanumerics, '_' and '.'. Tokens that split into
// exactly two parts on '.' contribute their first part as a table qualifier.
// Every other token, including SQL keywords and names with two or more dots,
// is added whole to uc_column_names. All stored values are upper-cased.
1272  const std::string& sql) {
1273  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1274  boost::regex::extended | boost::regex::icase};
1275  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1276  boost::sregex_token_iterator end;
1277  std::unordered_set<std::string> uc_column_names;
1278  std::unordered_set<std::string> uc_column_table_qualifiers;
1279  for (; tok_it != end; ++tok_it) {
1280  std::string column_name = *tok_it;
1281  std::vector<std::string> column_tokens;
1282  boost::split(column_tokens, column_name, boost::is_any_of("."));
1283  if (column_tokens.size() == 2) {
1284  // If the column name is qualified, take user's word.
1285  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1286  } else {
1287  uc_column_names.insert(to_upper(column_name));
1288  }
1289  }
1290  return {uc_column_names, uc_column_table_qualifiers};
1291 }
1292 
1293 } // namespace
1294 
1295 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1296  const TSessionId& session,
1297  const std::string& sql,
1298  const int cursor) {
1299  auto stdlog = STDLOG(get_session_ptr(session));
1300  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1301  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1302  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1303  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1304  proj_tokens.uc_column_names, visible_tables, stdlog);
1305  // Add the table qualifiers explicitly specified by the user.
1306  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1307  proj_tokens.uc_column_table_qualifiers.end());
1308  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1309  std::sort(
1310  hints.begin(),
1311  hints.end(),
1312  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1313  if (lhs.type == TCompletionHintType::TABLE &&
1314  rhs.type == TCompletionHintType::TABLE) {
1315  // Between two tables, one which is compatible with the specified
1316  // projections and one which isn't, pick the one which is compatible.
1317  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1318  compatible_table_names.end() &&
1319  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1320  compatible_table_names.end()) {
1321  return true;
1322  }
1323  }
1324  return lhs.type < rhs.type;
1325  });
1326 }
1327 
// Collect completion hints for `sql` at `cursor`, without ordering them.
// Fills `visible_tables` with the session's physical tables and views, then
// assigns `hints` from Calcite's completion hints. Any std::exception here is
// logged and re-thrown as TOmniSciException. If the text before the cursor
// already contains a whitespace-delimited FROM, Calcite's hints are used as-is;
// otherwise token-based completions are appended.
1328 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1329  std::vector<std::string>& visible_tables,
1330  query_state::StdLog& stdlog,
1331  const std::string& sql,
1332  const int cursor) {
1333  const auto& session_info = *stdlog.getConstSessionInfo();
1334  try {
1335  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1336  // Filter out keywords suggested by Calcite which we don't support.
1338  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1339  } catch (const std::exception& e) {
1340  TOmniSciException ex;
1341  ex.error_msg = "Exception: " + std::string(e.what());
1342  LOG(ERROR) << ex.error_msg;
1343  throw ex;
1344  }
// Only the text before the cursor is inspected; a negative cursor means the
// whole statement.
1345  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1346  const size_t length_to_cursor =
1347  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1348  // Trust hints from Calcite after the FROM keyword.
1349  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1350  return;
1351  }
1352  // Before FROM, the query is too incomplete for context-sensitive completions.
1353  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1354 }
1355 
1356 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1357  query_state::StdLog& stdlog,
1358  std::vector<std::string>& visible_tables,
1359  const std::string& sql,
1360  const int cursor) {
1361  const auto last_word =
1362  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1363  boost::regex select_expr{R"(\s*select\s+)",
1364  boost::regex::extended | boost::regex::icase};
1365  const size_t length_to_cursor =
1366  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1367  // After SELECT but before FROM, look for all columns in all tables which match the
1368  // prefix.
1369  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1370  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1371  // Trust the fully qualified columns the most.
1372  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1373  return;
1374  }
1375  // Not much information to use, just retrieve column names which match the prefix.
1376  if (should_suggest_column_hints(sql)) {
1377  get_column_hints(hints, last_word, column_names_by_table);
1378  return;
1379  }
1380  const std::string kFromKeyword{"FROM"};
1381  if (boost::istarts_with(kFromKeyword, last_word)) {
1382  TCompletionHint keyword_hint;
1383  keyword_hint.type = TCompletionHintType::KEYWORD;
1384  keyword_hint.replaced = last_word;
1385  keyword_hint.hints.emplace_back(kFromKeyword);
1386  hints.push_back(keyword_hint);
1387  }
1388  } else {
1389  const std::string kSelectKeyword{"SELECT"};
1390  if (boost::istarts_with(kSelectKeyword, last_word)) {
1391  TCompletionHint keyword_hint;
1392  keyword_hint.type = TCompletionHintType::KEYWORD;
1393  keyword_hint.replaced = last_word;
1394  keyword_hint.hints.emplace_back(kSelectKeyword);
1395  hints.push_back(keyword_hint);
1396  }
1397  }
1398 }
1399 
1400 std::unordered_map<std::string, std::unordered_set<std::string>>
1401 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1402  query_state::StdLog& stdlog) {
1403  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1404  for (auto it = table_names.begin(); it != table_names.end();) {
1405  TTableDetails table_details;
1406  try {
1407  get_table_details_impl(table_details, stdlog, *it, false, false);
1408  } catch (const TOmniSciException& e) {
1409  // Remove the corrupted Table/View name from the list for further processing.
1410  it = table_names.erase(it);
1411  continue;
1412  }
1413  for (const auto& column_type : table_details.row_desc) {
1414  column_names_by_table[*it].emplace(column_type.col_name);
1415  }
1416  ++it;
1417  }
1418  return column_names_by_table;
1419 }
1420 
1424 }
1425 
1427  const std::unordered_set<std::string>& uc_column_names,
1428  std::vector<std::string>& table_names,
1429  query_state::StdLog& stdlog) {
// Return the upper-cased names of tables that have at least one column whose
// upper-cased name is in `uc_column_names`. As a side effect, tables whose details
// cannot be loaded are erased from the caller's `table_names`.
1430  std::unordered_set<std::string> compatible_table_names_by_column;
1431  for (auto it = table_names.begin(); it != table_names.end();) {
1432  TTableDetails table_details;
1433  try {
1434  get_table_details_impl(table_details, stdlog, *it, false, false);
1435  } catch (const TOmniSciException& e) {
1436  // Remove the corrupted Table/View name from the list for further processing.
1437  it = table_names.erase(it);
1438  continue;
1439  }
// One matching column is enough to mark the table compatible.
1440  for (const auto& column_type : table_details.row_desc) {
1441  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1442  compatible_table_names_by_column.emplace(to_upper(*it));
1443  break;
1444  }
1445  }
1446  ++it;
1447  }
1448  return compatible_table_names_by_column;
1449 }
1450 
// Validate a relational-algebra plan without executing it.
// The work is wrapped in a task that calls execute_rel_alg with
// just_validate=true and is submitted to dispatch_queue_. This thread then blocks
// on the task's future. Because it waits before returning, the lambda may capture
// `result`, `query_state_proxy` and `query_ra` by reference. Exceptions from the
// task presumably propagate through result_future.get() (TODO confirm with
// QueryDispatchQueue::Task).
1451 TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
1452  QueryStateProxy query_state_proxy) {
1453  TQueryResult result;
1454  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1455  [this, &result, &query_state_proxy, &query_ra](const size_t executor_index) {
1456  execute_rel_alg(result,
1457  query_state_proxy,
1458  query_ra,
1459  true,
1461  -1,
1462  -1,
1463  /*just_validate=*/true,
1464  /*find_filter_push_down_candidates=*/false,
1466  executor_index);
1467  });
1469  dispatch_queue_->submit(execute_rel_alg_task, /*is_update_delete=*/false);
1470  auto result_future = execute_rel_alg_task->get_future();
1471  result_future.get();
1472  return result;
1473 }
1474 
1475 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1476  auto stdlog = STDLOG(get_session_ptr(session));
1477  auto session_ptr = stdlog.getConstSessionInfo();
1478  if (!session_ptr->get_currentUser().isSuper) {
1479  // WARNING: This appears to not include roles a user is a member of,
1480  // if the role has no permissions granted to it.
1481  roles =
1482  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1483  session_ptr->getCatalog().getCurrentDB().dbId);
1484  } else {
1485  roles = SysCatalog::instance().getRoles(
1486  false, true, session_ptr->get_currentUser().userName);
1487  }
1488 }
1489 
// Return whether `roleName` is granted to `granteeName`; the last argument
// `false` in the final isRoleGrantedToGrantee call presumably includes indirect
// grants (TODO confirm). Non-super users get two restrictions:
//  - they may not ask about another user (a name that resolves to a user grantee);
//  - the grantee must be granted to them; `true` presumably means directly only,
//    judging by the error message (TODO confirm).
1490 bool DBHandler::has_role(const TSessionId& sessionId,
1491  const std::string& granteeName,
1492  const std::string& roleName) {
1493  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1494  const auto session_ptr = stdlog.getConstSessionInfo();
1495  const auto current_user = session_ptr->get_currentUser();
1496  if (!current_user.isSuper) {
// NOTE(review): when granteeName is the caller's own user name, this falls
// through to isRoleGrantedToGrantee(self, self, true). Confirm that returns true.
1497  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1498  user && current_user.userName != granteeName) {
1499  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1500  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1501  current_user.userName, granteeName, true)) {
1503  "Only super users can check roles assignment that have not been directly "
1504  "granted to a user.");
1505  }
1506  }
1507  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1508 }
1509 
1510 static TDBObject serialize_db_object(const std::string& roleName,
1511  const DBObject& inObject) {
1512  TDBObject outObject;
1513  outObject.objectName = inObject.getName();
1514  outObject.grantee = roleName;
1515  const auto ap = inObject.getPrivileges();
1516  switch (inObject.getObjectKey().permissionType) {
1517  case DatabaseDBObjectType:
1518  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1519  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1520  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1521  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1522  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1523 
1524  break;
1525  case TableDBObjectType:
1526  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1527  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1528  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1529  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1530  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1531  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1532  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1533  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1534  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1535 
1536  break;
1537  case DashboardDBObjectType:
1538  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1539  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1540  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1541  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1542  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1543 
1544  break;
1545  case ViewDBObjectType:
1546  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1547  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1548  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1549  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1550  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1551  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1552  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1553 
1554  break;
1555  default:
1556  CHECK(false);
1557  }
1558  const int type_val = static_cast<int>(inObject.getType());
1559  CHECK(type_val >= 0 && type_val < 5);
1560  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1561  return outObject;
1562 }
1563 
1565  const TDBObjectPermissions& permissions) {
// Return true only if every database permission requested in
// `permissions.database_permissions_` is held by `privs`. Unrequested flags are
// ignored. Throws if database_permissions_ is not set.
1566  if (!permissions.__isset.database_permissions_) {
1567  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1568  }
1569  auto perms = permissions.database_permissions_;
1570  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1571  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1572  (perms.view_sql_editor_ &&
1574  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1575  return false;
1576  } else {
1577  return true;
1578  }
1579 }
1580 
1582  const TDBObjectPermissions& permissions) {
// Return true only if every table permission requested in
// `permissions.table_permissions_` is held by `privs`. Unrequested flags are
// ignored. Throws if table_permissions_ is not set.
1583  if (!permissions.__isset.table_permissions_) {
1584  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1585  }
1586  auto perms = permissions.table_permissions_;
1587  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1588  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1589  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1590  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1591  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1592  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1593  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1594  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1595  return false;
1596  } else {
1597  return true;
1598  }
1599 }
1600 
1602  const TDBObjectPermissions& permissions) {
// Return true only if every dashboard permission requested in
// `permissions.dashboard_permissions_` is held by `privs`. Unrequested flags are
// ignored. Throws if dashboard_permissions_ is not set.
1603  if (!permissions.__isset.dashboard_permissions_) {
1604  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1605  }
1606  auto perms = permissions.dashboard_permissions_;
1607  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1608  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1609  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1610  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1611  return false;
1612  } else {
1613  return true;
1614  }
1615 }
1616 
1618  const TDBObjectPermissions& permissions) {
// Return true only if every view permission requested in
// `permissions.view_permissions_` is held by `privs`. Unrequested flags are
// ignored. Throws if view_permissions_ is not set.
1619  if (!permissions.__isset.view_permissions_) {
1620  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1621  }
1622  auto perms = permissions.view_permissions_;
1623  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1624  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1625  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1626  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1627  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1628  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1629  return false;
1630  } else {
1631  return true;
1632  }
1633 }
1634 
// Check whether `granteeName` holds the requested `permissions` on `objectName`.
// Non-super callers may only query grantees granted to them. A super-user grantee
// always passes. An unknown grantee or an unsupported object type throws.
// Otherwise the grantee's own DBObject for the key is looked up, and the check is
// delegated to permissionFuncMap_[func_name], where func_name is "database",
// "table", "dashboard" or "view". No matching object means no privilege.
1635 bool DBHandler::has_object_privilege(const TSessionId& sessionId,
1636  const std::string& granteeName,
1637  const std::string& objectName,
1638  const TDBObjectType::type objectType,
1639  const TDBObjectPermissions& permissions) {
1640  auto stdlog = STDLOG(get_session_ptr(sessionId));
1641  auto session_ptr = stdlog.getConstSessionInfo();
1642  auto const& cat = session_ptr->getCatalog();
1643  auto const& current_user = session_ptr->get_currentUser();
1644  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1645  current_user.userName, granteeName, false)) {
1647  "Users except superusers can only check privileges for self or roles granted "
1648  "to "
1649  "them.")
1650  }
// Super users implicitly hold every privilege.
1652  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1653  user_meta.isSuper) {
1654  return true;
1655  }
1656  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1657  if (!grnt) {
1658  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1659  }
// Select the DBObject type and the permission-checker key for objectType.
1661  std::string func_name;
1662  switch (objectType) {
1665  func_name = "database";
1666  break;
1669  func_name = "table";
1670  break;
1673  func_name = "dashboard";
1674  break;
1677  func_name = "view";
1678  break;
1679  default:
1680  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1681  }
1682  DBObject req_object(objectName, type);
1683  req_object.loadKey(cat);
1684 
1685  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1686  if (grantee_object) {
1687  // if grantee has privs on the object
// NOTE(review): operator[] inserts an empty entry if func_name is missing from
// permissionFuncMap_, presumably leading to std::bad_function_call. Confirm all
// four keys are registered.
1688  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1689  } else {
1690  // no privileges on that object
1691  return false;
1692  }
1693 }
1694 
1695 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1696  const TSessionId& sessionId,
1697  const std::string& roleName) {
1698  auto stdlog = STDLOG(get_session_ptr(sessionId));
1699  auto session_ptr = stdlog.getConstSessionInfo();
1700  auto const& user = session_ptr->get_currentUser();
1701  if (!user.isSuper &&
1702  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1703  return;
1704  }
1705  auto* rl = SysCatalog::instance().getGrantee(roleName);
1706  if (rl) {
1707  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1708  for (auto& dbObject : *rl->getDbObjects(true)) {
1709  if (dbObject.first.dbId != dbId) {
1710  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1711  // usecase for now, though)
1712  continue;
1713  }
1714  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1715  TDBObjectsForRole.push_back(tdbObject);
1716  }
1717  } else {
1718  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
1719  }
1720 }
1721 
/**
 * Service handler: reports the privileges that grantees visible to the session user
 * hold on one database object.
 *
 * `objectName`/`type` are resolved to a DBObject key in the session's current catalog.
 * Then, for every name returned by SysCatalog::getRoles(true, isSuper, userName), that
 * grantee's privileges on (a) the object itself and (b) the object type at database
 * level (a DBObject with an empty name) are appended to `TDBObjects`. A superuser
 * additionally gets a synthesized entry named "super", appended before the others.
 *
 * Dashboards are addressed by numeric id: `objectName` is parsed with std::stoi, and
 * an empty name maps to id -1. For tables/views that exist in the catalog, the
 * catalog's own kind (td->isView) overrides the requested type.
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) for an unknown `type`, or
 *         "Object with name ... does not exist." when resolving the key throws any
 *         std::exception (including std::stoi rejecting a non-numeric dashboard name).
 */
void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
                                    const TSessionId& sessionId,
                                    const std::string& objectName,
                                    const TDBObjectType::type type) {
  auto stdlog = STDLOG(get_session_ptr(sessionId));
  auto session_ptr = stdlog.getConstSessionInfo();
  DBObjectType object_type;
  // Map the requested TDBObjectType onto the catalog's DBObjectType.
  switch (type) {
    // NOTE(review): the `case` labels of this switch are missing from this copy of the
    // file (the source listing skipped those lines). As written, control always jumps
    // to `default:` and every statement above it is unreachable. Restore the labels
    // from upstream; presumably one `case TDBObjectType::...:` precedes each assignment.
      object_type = DBObjectType::DatabaseDBObjectType;
      break;
      object_type = DBObjectType::TableDBObjectType;
      break;
      // NOTE(review): lines are also missing before this `break;`, presumably the
      // dashboard case label and its `object_type` assignment (dashboards are
      // handled below).
      break;
      object_type = DBObjectType::ViewDBObjectType;
      break;
    default:
      THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
                           ": unknown object type (" + std::to_string(type) + ").");
  }
  DBObject object_to_find(objectName, object_type);

  // TODO(adb): Use DatabaseLock to protect method
  try {
    if (object_type == DashboardDBObjectType) {
      // Dashboards are keyed by numeric id; std::stoi throws on a non-numeric name,
      // which the catch below reports as "does not exist".
      if (objectName == "") {
        object_to_find = DBObject(-1, object_type);
      } else {
        object_to_find = DBObject(std::stoi(objectName), object_type);
      }
    } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
               !objectName.empty()) {
      // special handling for view / table
      // Trust the catalog over the caller: an existing table/view is re-typed by isView.
      auto const& cat = session_ptr->getCatalog();
      auto td = cat.getMetadataForTable(objectName, false);
      if (td) {
        object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
        object_to_find = DBObject(objectName, object_type);
      }
    }
    object_to_find.loadKey(session_ptr->getCatalog());
  } catch (const std::exception&) {
    THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
  }

  // object type on database level
  DBObject object_to_find_dblevel("", object_type);
  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
  // if user is superuser respond with a full priv
  if (session_ptr->get_currentUser().isSuper) {
    // using ALL_TABLE here to set max permissions
    DBObject dbObj{object_to_find.getObjectKey(),
                   // NOTE(review): a constructor argument appears to be missing here
                   // (the listing skipped a line); per the comment above it is
                   // presumably an ALL_TABLE privilege set. Restore from upstream.
                   session_ptr->get_currentUser().userId};
    dbObj.setName("super");
    TDBObjects.push_back(
        serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
  };

  std::vector<std::string> grantees =
      SysCatalog::instance().getRoles(true,
                                      session_ptr->get_currentUser().isSuper,
                                      session_ptr->get_currentUser().userName);
  for (const auto& grantee : grantees) {
    DBObject* object_found;
    auto* gr = SysCatalog::instance().getGrantee(grantee);
    if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
      TDBObjects.push_back(serialize_db_object(grantee, *object_found));
    }
    // check object permissions on Database level
    if (gr &&
        (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
      TDBObjects.push_back(serialize_db_object(grantee, *object_found));
    }
  }
}
1802 
/**
 * Service handler: returns the roles granted to `granteeName`, which may name a user
 * or a role.
 *
 * Access rules:
 *  - a superuser may query any grantee;
 *  - a non-superuser may query their own user name, but not another user's;
 *  - a non-superuser may query a role only if SysCatalog::isRoleGrantedToGrantee
 *    reports that role as granted to them.
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) when the grantee does not exist
 *         or the caller is not allowed to query it.
 */
void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
                                       const TSessionId& sessionId,
                                       const std::string& granteeName) {
  auto stdlog = STDLOG(get_session_ptr(sessionId));
  auto session_ptr = stdlog.getConstSessionInfo();
  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
  if (grantee) {
    if (session_ptr->get_currentUser().isSuper) {
      roles = grantee->getRoles();
    } else if (grantee->isUser()) {
      if (session_ptr->get_currentUser().userName == granteeName) {
        roles = grantee->getRoles();
      } else {
        // NOTE(review): the opening of the throw statement for the message below is
        // missing from this copy (the listing skipped a line); presumably
        // `THROW_MAPD_EXCEPTION(` as in the other branches. Restore from upstream.
            "Only a superuser is authorized to request list of roles granted to "
            "another "
            "user.");
      }
    } else {
      // Invariant assertion only: the else-if above already handled users.
      CHECK(!grantee->isUser());
      // granteeName is actually a roleName here and we can check a role
      // only if it is granted to us
      if (SysCatalog::instance().isRoleGrantedToGrantee(
              session_ptr->get_currentUser().userName, granteeName, false)) {
        roles = grantee->getRoles();
      } else {
        THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
      }
    }
  } else {
    THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
  }
}
1836 
namespace {

/**
 * Flattens a table -> column-names map into a single line of text.
 *
 * Each table contributes ":" followed by its name, then "," plus each of its column
 * names. Tables appear in std::map key order, so {"t2": [], "t1": ["a", "b"]}
 * becomes ":t1,a,b:t2". An empty map yields "".
 *
 * @param table_col_names table name -> column names to render.
 * @return the flattened string.
 */
std::string dump_table_col_names(
    const std::map<std::string, std::vector<std::string>>& table_col_names) {
  std::ostringstream oss;
  for (const auto& [table_name, col_names] : table_col_names) {
    oss << ":" << table_name;
    for (const auto& col_name : col_names) {
      oss << "," << col_name;
    }
  }
  return oss.str();
}

}  // namespace

/**
 * Service handler: asks the render handler for the table row data behind pixel
 * `pixel` of rendered widget `widget_id`, writing the answer into `_return`.
 *
 * Every request parameter is recorded in the STDLOG entry (the column map is
 * flattened with dump_table_col_names), along with the client connection.
 * Presumably `pixel_radius` widens the hit-test area around `pixel`; confirm against
 * the render handler.
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) when backend rendering is
 *         disabled (render_handler_ is null), or wrapping any std::exception raised by
 *         the render handler with an "Exception: " prefix.
 */
// NOTE(review): the first line of this definition (return type, `DBHandler::` and the
// method name) is missing from this copy of the file; the listing skipped it. Restore
// it from upstream; the parameter list below is otherwise complete.
    TPixelTableRowResult& _return,
    const TSessionId& session,
    const int64_t widget_id,
    const TPixel& pixel,
    const std::map<std::string, std::vector<std::string>>& table_col_names,
    const bool column_format,
    const int32_t pixel_radius,
    const std::string& nonce) {
  auto stdlog = STDLOG(get_session_ptr(session),
                       "widget_id",
                       widget_id,
                       "pixel.x",
                       pixel.x,
                       "pixel.y",
                       pixel.y,
                       "column_format",
                       column_format,
                       "pixel_radius",
                       pixel_radius,
                       "table_col_names",
                       dump_table_col_names(table_col_names),
                       "nonce",
                       nonce);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getSessionInfo();
  if (!render_handler_) {
    THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
  }

  try {
    render_handler_->get_result_row_for_pixel(_return,
                                              session_ptr,
                                              widget_id,
                                              pixel,
                                              table_col_names,
                                              column_format,
                                              pixel_radius,
                                              nonce);
  } catch (std::exception& e) {
    // NOTE(review): if TOmniSciException derives from std::exception (as Thrift
    // exceptions usually do), errors already in service form are re-wrapped here with
    // an extra "Exception: " prefix.
    THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
  }
}

/**
 * Builds the TColumnType description of column `cd`.
 *
 * Copies name, source name, id, SQL type, encoding and nullability. Array and DATE
 * columns also report their size. Geo columns have their geo fields filled by a
 * helper (its name is missing, see NOTE below); other columns report precision and
 * scale.
 *
 * comp_param: in the branch whose condition ends in `cat != nullptr` (the start of
 * the condition is missing, see NOTE below), it is the dictionary's bit width
 * (dictNBits) looked up through `cat`, or 0 when that dictionary is not found.
 * Otherwise it is the type's comp_param, except that date-in-days columns with
 * comp_param 0 report 32.
 *
 * `cat` may be null (the code checks it before use).
 */
// NOTE(review): the first line of this definition is missing from this copy of the
// file. Judging by the body it declares a `TColumnType` return, the qualified name
// and, presumably, the `cat` parameter. Restore from upstream.
    const ColumnDescriptor* cd) {
  TColumnType col_type;
  col_type.col_name = cd->columnName;
  col_type.src_name = cd->sourceName;
  col_type.col_id = cd->columnId;
  col_type.col_type.type = type_to_thrift(cd->columnType);
  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
  col_type.col_type.nullable = !cd->columnType.get_notnull();
  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
    col_type.col_type.size = cd->columnType.get_size();  // only for arrays and dates
  }
  if (IS_GEO(cd->columnType.get_type())) {
    // NOTE(review): the name of the function receiving the arguments below is missing
    // from this copy (the listing skipped a line); presumably a helper that fills the
    // geo fields of `col_type`. Restore from upstream.
        col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
  } else {
    col_type.col_type.precision = cd->columnType.get_precision();
    col_type.col_type.scale = cd->columnType.get_scale();
  }
  col_type.is_system = cd->isSystemCol;
  // NOTE(review): the start of this `if` condition is missing from this copy; only its
  // final `cat != nullptr) {` clause survives. Restore from upstream.
      cat != nullptr) {
    // have to get the actual size of the encoding from the dictionary definition
    const int dict_id = cd->columnType.get_comp_param();
    if (!cat->getMetadataForDict(dict_id, false)) {
      // NOTE(review): this early return skips the is_reserved_keyword assignment at
      // the end of the function, so such columns keep that field's default value.
      col_type.col_type.comp_param = 0;
      return col_type;
    }
    // NOTE(review): second lookup of the same dictionary. After the check above the
    // `!dd` branch is effectively unreachable (unless the dictionary disappears
    // between the two calls); a single lookup would suffice.
    auto dd = cat->getMetadataForDict(dict_id, false);
    if (!dd) {
      THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
    }
    col_type.col_type.comp_param = dd->dictNBits;
  } else {
    col_type.col_type.comp_param =
        (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
            ? 32
            : cd->columnType.get_comp_param();
  }
  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
  return col_type;
}
1938 
1939 void DBHandler::get_internal_table_details(TTableDetails& _return,
1940  const TSessionId& session,
1941  const std::string& table_name) {
1942  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1943  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1944  get_table_details_impl(_return, stdlog, table_name, true, false);
1945 }
1946 
1947 void DBHandler::get_table_details(TTableDetails& _return,
1948  const TSessionId& session,
1949  const std::string& table_name) {
1950  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1951  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1952  get_table_details_impl(_return, stdlog, table_name, false, false);
1953 }
1954 
/**
 * Fills `_return` with the column layout and storage properties of `table_name`.
 *
 * - Views: the view SQL is parsed and validated under the session to derive the row
 *   descriptor. If calcite_->checkAccessedObjectsPrivileges rejects the view's
 *   underlying objects, view_sql is replaced by a placeholder message.
 * - Tables: columns come from cat.getAllColumnMetadataForTable, with system/physical
 *   columns included per `get_system`/`get_physical`. The column returned by
 *   cat.getDeletedColumn(td) is always skipped.
 *
 * `td_with_lock` presumably holds a schema lock on the table for the whole call (its
 * acquisition call is missing in this copy, see NOTE below).
 *
 * @param stdlog request log entry; for views its query state is set to the view SQL.
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) wrapping any std::runtime_error
 *         raised here, prefixed with "Exception: ". Other exception types propagate
 *         unchanged.
 */
void DBHandler::get_table_details_impl(TTableDetails& _return,
                                       query_state::StdLog& stdlog,
                                       const std::string& table_name,
                                       const bool get_system,
                                       const bool get_physical) {
  try {
    auto session_info = stdlog.getSessionInfo();
    auto& cat = session_info->getCatalog();
    const auto td_with_lock =
        // NOTE(review): the callee of this initializer is missing from this copy (the
        // listing skipped a line); presumably a table-schema lock acquisition that
        // yields the descriptor. Restore from upstream.
            cat, table_name, false);
    const auto td = td_with_lock();
    CHECK(td);

    bool have_privileges_on_view_sources = true;
    if (td->isView) {
      auto query_state = create_query_state(session_info, td->viewSQL);
      stdlog.setQueryState(query_state);
      try {
        if (hasTableAccessPrivileges(td, *session_info)) {
          // TODO(adb): we should take schema read locks on all tables making up the
          // view, at minimum
          const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
                                            query_state->getQueryStr(),
                                            {},
                                            false,
                                            // NOTE(review): one argument line is
                                            // missing here in this copy; restore it
                                            // from upstream.
                                            false);
          try {
            calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
                                                     query_ra.first);
          } catch (const std::runtime_error&) {
            // Still describe the view, but hide its SQL (see view_sql below).
            have_privileges_on_view_sources = false;
          }

          const auto result = validate_rel_alg(query_ra.first.plan_result,
                                               query_state->createQueryStateProxy());

          _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
        } else {
          throw std::runtime_error(
              "Unable to access view " + table_name +
              ". The view may not exist, or the logged in user may not "
              "have permission to access the view.");
        }
      } catch (const std::exception& e) {
        // NOTE(review): this also catches the "Unable to access view" error thrown just
        // above, so a permission failure is reported as a broken view that "must be
        // dropped and re-created". Consider letting that case through unchanged.
        throw std::runtime_error("View '" + table_name +
                                 "' query has failed with an error: '" +
                                 std::string(e.what()) +
                                 "'.\nThe view must be dropped and re-created to "
                                 "resolve the error. \nQuery:\n" +
                                 query_state->getQueryStr());
      }
    } else {
      if (hasTableAccessPrivileges(td, *session_info)) {
        const auto col_descriptors =
            cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
        const auto deleted_cd = cat.getDeletedColumn(td);
        for (const auto cd : col_descriptors) {
          if (cd == deleted_cd) {
            continue;
          }
          _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
        }
      } else {
        throw std::runtime_error(
            "Unable to access table " + table_name +
            ". The table may not exist, or the logged in user may not "
            "have permission to access the table.");
      }
    }
    _return.fragment_size = td->maxFragRows;
    _return.page_size = td->fragPageSize;
    _return.max_rows = td->maxRows;
    _return.view_sql =
        (have_privileges_on_view_sources ? td->viewSQL
                                         : "[Not enough privileges to see the view SQL]");
    _return.shard_count = td->nShards;
    _return.key_metainfo = td->keyMetainfo;
    _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
    // DEFAULT when no partitioning is set; otherwise REPLICATED, SHARDED or OTHER.
    _return.partition_detail =
        td->partitions.empty()
            ? TPartitionDetail::DEFAULT
            : (table_is_replicated(td)
                   ? TPartitionDetail::REPLICATED
                   : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
                                                  : TPartitionDetail::OTHER));
  } catch (const std::runtime_error& e) {
    THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
  }
}
2046 
2047 void DBHandler::get_link_view(TFrontendView& _return,
2048  const TSessionId& session,
2049  const std::string& link) {
2050  auto stdlog = STDLOG(get_session_ptr(session));
2051  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2052  auto session_ptr = stdlog.getConstSessionInfo();
2053  auto const& cat = session_ptr->getCatalog();
2054  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2055  if (!ld) {
2056  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
2057  }
2058  _return.view_state = ld->viewState;
2059  _return.view_name = ld->link;
2060  _return.update_time = ld->updateTime;
2061  _return.view_metadata = ld->viewMetadata;
2062 }

/**
 * Returns true when the session's user may access table/view `td`.
 *
 * Superusers always may. Otherwise SysCatalog::hasAnyPrivileges is asked whether the
 * user holds any privilege on `dbObject`, which is presumably the DBObject for `td`
 * (its declaration is missing, see NOTE below).
 */
// NOTE(review): the first line of this definition is missing from this copy of the
// file. It presumably declares a `bool` return and the name `hasTableAccessPrivileges`,
// which callers in this file use. Restore it from upstream.
    const TableDescriptor* td,
    const Catalog_Namespace::SessionInfo& session_info) {
  auto& cat = session_info.getCatalog();
  // NOTE(review): `auto` copies the UserMetadata; `const auto&` would avoid the copy
  // if get_currentUser() returns a reference.
  auto user_metadata = session_info.get_currentUser();

  if (user_metadata.isSuper) {
    return true;
  }

  // NOTE(review): the declaration of `dbObject` is missing from this copy (the listing
  // skipped a line); presumably a DBObject built from td's name and table/view type.
  // Restore from upstream.
  dbObject.loadKey(cat);
  std::vector<DBObject> privObjects = {dbObject};

  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
}
2080 
2081 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2082  const Catalog_Namespace::SessionInfo& session_info,
2083  const GetTablesType get_tables_type) {
2084  table_names = session_info.getCatalog().getTableNamesForUser(
2085  session_info.get_currentUser(), get_tables_type);
2086 }
2087 
2088 void DBHandler::get_tables(std::vector<std::string>& table_names,
2089  const TSessionId& session) {
2090  auto stdlog = STDLOG(get_session_ptr(session));
2091  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2093  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2094 }
2095 
2096 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2097  const TSessionId& session) {
2098  auto stdlog = STDLOG(get_session_ptr(session));
2099  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2100  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2101 }
2102 
2103 void DBHandler::get_views(std::vector<std::string>& table_names,
2104  const TSessionId& session) {
2105  auto stdlog = STDLOG(get_session_ptr(session));
2106  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2107  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2108 }
2109 
/**
 * Appends one TTableMeta per standalone table or view in the session's current
 * catalog that the session user can access.
 *
 * Shards (td->shard >= 0) and inaccessible tables are skipped. For tables, column
 * names/types come from cat.getAllColumnMetadataForTable, skipping the column returned
 * by getDeletedColumn. For views, the view SQL is parsed with parse_to_ra
 * (`with_table_locks` is passed through, presumably to take table locks) and
 * validated with execute_rel_alg in just-validate mode. Physical columns of the
 * resulting row descriptor are not reported.
 *
 * @param query_state_proxy query state used to parse and validate view SQL.
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) if reading a table's column
 *         metadata throws std::runtime_error. View failures are only logged.
 */
void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
                                     QueryStateProxy query_state_proxy,
                                     const Catalog_Namespace::SessionInfo& session_info,
                                     const bool with_table_locks) {
  const auto& cat = session_info.getCatalog();
  const auto tables = cat.getAllTableMetadata();
  _return.reserve(tables.size());

  for (const auto td : tables) {
    if (td->shard >= 0) {
      // skip shards, they're not standalone tables
      continue;
    }
    if (!hasTableAccessPrivileges(td, session_info)) {
      // skip table, as there are no privileges to access it
      continue;
    }

    TTableMeta ret;
    ret.table_name = td->tableName;
    ret.is_view = td->isView;
    ret.is_replicated = table_is_replicated(td);
    ret.shard_count = td->nShards;
    ret.max_rows = td->maxRows;
    ret.table_id = td->tableId;

    std::vector<TTypeInfo> col_types;
    std::vector<std::string> col_names;
    size_t num_cols = 0;
    if (td->isView) {
      try {
        TPlanResult parse_result;
        // NOTE(review): the declaration of `locks` is missing from this copy (the
        // listing skipped a line); restore it from upstream.
        std::tie(parse_result, locks) = parse_to_ra(
            query_state_proxy, td->viewSQL, {}, with_table_locks, system_parameters_);
        const auto query_ra = parse_result.plan_result;

        TQueryResult result;
        execute_rel_alg(result,
                        query_state_proxy,
                        query_ra,
                        true,
                        // NOTE(review): one argument line is missing here in this
                        // copy; restore it from upstream.
                        -1,
                        -1,
                        /*just_validate=*/true,
                        /*find_push_down_candidates=*/false,
                        // NOTE(review): the final argument line (which also closes
                        // this call) is missing here in this copy; restore it.
        num_cols = result.row_set.row_desc.size();
        for (const auto& col : result.row_set.row_desc) {
          if (col.is_physical) {
            num_cols--;
            continue;
          }
          col_types.push_back(col.col_type);
          col_names.push_back(col.col_name);
        }
      } catch (std::exception& e) {
        // NOTE(review): despite the message, the broken view is not skipped. Control
        // falls through and the view is still appended to _return with num_cols == 0
        // and no columns. The exception text `e` is also dropped from the log.
        LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
      }
    } else {
      try {
        // NOTE(review): redundant; access was already checked at the top of the
        // loop, so the `continue` below is effectively unreachable.
        if (hasTableAccessPrivileges(td, session_info)) {
          const auto col_descriptors =
              cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
          const auto deleted_cd = cat.getDeletedColumn(td);
          for (const auto cd : col_descriptors) {
            if (cd == deleted_cd) {
              continue;
            }
            col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
            col_names.push_back(cd->columnName);
          }
          // NOTE(review): counts every descriptor, including `deleted_cd` if it is
          // present, while col_types/col_names skip it. Confirm whether this call
          // can return the deleted column; if so, num_cols overstates col_names.size().
          num_cols = col_descriptors.size();
        } else {
          continue;
        }
      } catch (const std::runtime_error& e) {
        THROW_MAPD_EXCEPTION(e.what());
      }
    }

    ret.num_cols = num_cols;
    std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
    std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));

    _return.push_back(ret);
  }
}
2199 
/**
 * Service handler: returns metadata for every table/view the session user can access
 * (see get_tables_meta_impl).
 *
 * A query state with an empty query string is created, attached to the log entry,
 * and its proxy is handed to get_tables_meta_impl. The work runs under a
 * mapd_shared_lock (shared mode) whose mutex argument is missing in this copy (see
 * NOTE below).
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) wrapping any std::exception.
 */
void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
                                const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  auto query_state = create_query_state(session_ptr, "");
  stdlog.setQueryState(query_state);

  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
      // NOTE(review): the constructor arguments of this lock (the line(s) naming the
      // mutex, and the closing `);`) are missing from this copy; restore from upstream.

  try {
    // NOTE(review): called with three arguments; presumably `with_table_locks` has a
    // default in the declaration. Confirm in DBHandler.h.
    get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
}
2218 
2219 void DBHandler::get_users(std::vector<std::string>& user_names,
2220  const TSessionId& session) {
2221  auto stdlog = STDLOG(get_session_ptr(session));
2222  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2223  auto session_ptr = stdlog.getConstSessionInfo();
2224  std::list<Catalog_Namespace::UserMetadata> user_list;
2225 
2226  if (!session_ptr->get_currentUser().isSuper) {
2227  user_list = SysCatalog::instance().getAllUserMetadata(
2228  session_ptr->getCatalog().getCurrentDB().dbId);
2229  } else {
2230  user_list = SysCatalog::instance().getAllUserMetadata();
2231  }
2232  for (auto u : user_list) {
2233  user_names.push_back(u.userName);
2234  }
2235 }
2236 
2237 void DBHandler::get_version(std::string& version) {
2238  version = MAPD_RELEASE;
2239 }
2240 
/**
 * Service handler: GPU memory clearing request (superuser only).
 *
 * Visible in this copy: the superuser check, an (empty, see NOTE) guarded section
 * whose std::exceptions are converted to TOmniSciException,
 * render_handler_->clear_gpu_memory() when backend rendering is enabled, and a
 * branch taken only when leaf_aggregator_ has leaves (body missing, see NOTE).
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) when the caller is not a
 *         superuser, or wrapping any std::exception from the guarded section.
 */
void DBHandler::clear_gpu_memory(const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
  }
  try {
    // NOTE(review): the statement this try block protects is missing from this copy
    // (the listing skipped a line); presumably the call that clears server-side GPU
    // memory. As written the try block is empty. Restore from upstream.
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_gpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
    // NOTE(review): the body of this branch is missing from this copy; presumably it
    // forwards the request to the leaves. Restore from upstream.
  }
}
2261 
/**
 * Service handler: CPU memory clearing request (superuser only).
 *
 * Visible in this copy: the superuser check, an (empty, see NOTE) guarded section
 * whose std::exceptions are converted to TOmniSciException,
 * render_handler_->clear_cpu_memory() when backend rendering is enabled, and a
 * branch taken only when leaf_aggregator_ has leaves (body missing, see NOTE).
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) when the caller is not a
 *         superuser, or wrapping any std::exception from the guarded section.
 */
void DBHandler::clear_cpu_memory(const TSessionId& session) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  if (!session_ptr->get_currentUser().isSuper) {
    THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
  }
  try {
    // NOTE(review): the statement this try block protects is missing from this copy
    // (the listing skipped a line); presumably the call that clears server-side CPU
    // memory. As written the try block is empty. Restore from upstream.
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_cpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
    // NOTE(review): the body of this branch is missing from this copy; presumably it
    // forwards the request to the leaves. Restore from upstream.
  }
}

// Returns the INVALID_SESSION_ID sentinel.
// NOTE(review): the first line of this definition (return type and name) is missing
// from this copy of the file; the listing skipped it. Restore from upstream.
  return INVALID_SESSION_ID;
}
2286 
/**
 * Service handler: reports memory usage as returned by DataMgr::getMemoryInfo.
 *
 * `memory_level` == "gpu" selects GPU_LEVEL; any other value selects CPU_LEVEL. One
 * TNodeMemoryInfo is produced per returned MemoryInfo, with one TMemoryData per entry
 * of its nodeMemoryData. When leaf_aggregator_ has leaves, local entries are tagged
 * with this host's name and the leaves' summaries are inserted ahead of them.
 */
void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
                           const TSessionId& session,
                           const std::string& memory_level) {
  auto stdlog = STDLOG(get_session_ptr(session));
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  std::vector<Data_Namespace::MemoryInfo> internal_memory;
  Data_Namespace::MemoryLevel mem_level;
  if (!memory_level.compare("gpu")) {
    // NOTE(review): a line is missing here in this copy (and likewise at the top of
    // the else branch). Presumably they assign `mem_level`, which is read below by
    // getLeafMemoryInfo and is otherwise uninitialized. Restore from upstream.
    internal_memory =
        SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
  } else {
    // NOTE(review): missing line here too (see the note in the branch above).
    internal_memory =
        SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
  }

  // NOTE(review): both loops below iterate by value, copying each MemoryInfo and each
  // nodeMemoryData entry; `const auto&` would avoid the copies.
  for (auto memInfo : internal_memory) {
    TNodeMemoryInfo nodeInfo;
    if (leaf_aggregator_.leafCount() > 0) {
      nodeInfo.host_name = omnisci::get_hostname();
    }
    nodeInfo.page_size = memInfo.pageSize;
    nodeInfo.max_num_pages = memInfo.maxNumPages;
    nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
    nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
    for (auto gpu : memInfo.nodeMemoryData) {
      TMemoryData md;
      md.slab = gpu.slabNum;
      md.start_page = gpu.startPage;
      md.num_pages = gpu.numPages;
      md.touch = gpu.touch;
      md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
      md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
      nodeInfo.node_memory_data.push_back(md);
    }
    _return.push_back(nodeInfo);
  }
  if (leaf_aggregator_.leafCount() > 0) {
    std::vector<TNodeMemoryInfo> leafSummary =
        leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
    // Leaf summaries go first, ahead of this node's own entries.
    _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
  }
}
2331 
2332 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
2333  auto stdlog = STDLOG(get_session_ptr(session));
2334  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2335  auto session_ptr = stdlog.getConstSessionInfo();
2336  const auto& user = session_ptr->get_currentUser();
2338  SysCatalog::instance().getDatabaseListForUser(user);
2339  for (auto& db : dbs) {
2340  TDBInfo dbinfo;
2341  dbinfo.db_name = std::move(db.dbName);
2342  dbinfo.db_owner = std::move(db.dbOwnerName);
2343  dbinfos.push_back(std::move(dbinfo));
2344  }
2345 }
2346 
2347 void DBHandler::set_execution_mode(const TSessionId& session,
2348  const TExecuteMode::type mode) {
2349  auto stdlog = STDLOG(get_session_ptr(session));
2350  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2351  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
2352  auto session_it = get_session_it_unsafe(session, write_lock);
2353  if (leaf_aggregator_.leafCount() > 0) {
2354  leaf_aggregator_.set_execution_mode(session, mode);
2355  try {
2356  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2357  } catch (const TOmniSciException& e) {
2358  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2359  }
2360  return;
2361  }
2362  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2363 }
2364 
namespace {

// Throws std::runtime_error("Cannot import a sharded table directly to a leaf") when
// `td` is non-null and has a non-zero shard count (td->nShards); otherwise does nothing.
// NOTE(review): the signature line of this helper is missing from this copy of the
// file (the listing skipped it); presumably it takes `const TableDescriptor* td`.
// Restore from upstream.
  if (td && td->nShards) {
    throw std::runtime_error("Cannot import a sharded table directly to a leaf");
  }
}

}  // namespace
2374 
/**
 * Service handler: appends row-oriented data (`rows`) to `table_name`.
 *
 * Uses a DistributedLoader when leaf_aggregator_ has leaves, otherwise an
 * import_export::Loader. Values are buffered row by row. A row whose values fail to
 * convert is logged and discarded (the values already buffered for it are popped).
 * The buffers are then loaded in one call under the table's insert-data write lock.
 * An empty `rows` returns early after the privilege check.
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) with an "Exception: " prefix
 *         for any std::exception raised in the body.
 */
void DBHandler::load_table_binary(const TSessionId& session,
                                  const std::string& table_name,
                                  const std::vector<TRow>& rows) {
  try {
    auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
    stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
    auto session_ptr = stdlog.getConstSessionInfo();
    check_read_only("load_table_binary");
    auto& cat = session_ptr->getCatalog();
    const auto td_with_lock =
        // NOTE(review): the callee of this initializer is missing from this copy (the
        // listing skipped a line); presumably a table-schema lock acquisition.
        // Restore from upstream.
            cat, table_name, true);
    const auto td = td_with_lock();
    CHECK(td);
    if (g_cluster && !leaf_aggregator_.leafCount()) {
      // Sharded table rows need to be routed to the leaf by an aggregator.
      // NOTE(review): the statement of this branch is missing from this copy;
      // presumably a check that rejects sharded tables. Restore from upstream.
    }
    check_table_load_privileges(*session_ptr, table_name);
    if (rows.empty()) {
      return;
    }
    std::unique_ptr<import_export::Loader> loader;
    if (leaf_aggregator_.leafCount() > 0) {
      loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
    } else {
      loader.reset(new import_export::Loader(cat, td));
    }
    // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
    // Subtracting 1 (rowid) until TableDescriptor is updated.
    // NOTE(review): only the first row's width is validated. A later, shorter row makes
    // `row.cols[col_idx]` below read past the end (if `cols` is a std::vector this is
    // undefined behavior, not an exception the per-row catch could handle). Also, if
    // TOmniSciException derives from std::exception, this error is re-wrapped by the
    // outer catch with an extra "Exception: " prefix.
    if (rows.front().cols.size() !=
        static_cast<size_t>(td->nColumns) - (td->hasDeletedCol ? 2 : 1)) {
      THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
    }
    auto col_descs = loader->get_column_descs();
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
    for (auto cd : col_descs) {
      import_buffers.push_back(std::unique_ptr<import_export::TypedImportBuffer>(
          new import_export::TypedImportBuffer(cd, loader->getStringDict(cd))));
    }
    for (auto const& row : rows) {
      size_t col_idx = 0;
      try {
        for (auto cd : col_descs) {
          import_buffers[col_idx]->add_value(
              cd, row.cols[col_idx], row.cols[col_idx].is_null);
          col_idx++;
        }
      } catch (const std::exception& e) {
        // Roll back the partially buffered row so all buffers stay the same length.
        for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
          import_buffers[col_idx_to_pop]->pop_value();
        }
        LOG(ERROR) << "Input exception thrown: " << e.what()
                   << ". Row discarded, issue at column : " << (col_idx + 1)
                   << " data :" << row;
      }
    }
    auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
        session_ptr->getCatalog(), table_name);
    // NOTE(review): passes rows.size() even when rows were discarded above; confirm
    // how Loader::load uses this count.
    loader->load(import_buffers, rows.size());
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
  }
}
2439 
/**
 * Prepares a columnar load into `table_name`.
 *
 * Takes a schema read lock on the table, checks load privileges, creates the loader
 * (a DistributedLoader when leaf_aggregator_ has leaves, else an import_export::Loader)
 * and its import buffers, and checks that `num_cols` equals the number of loadable
 * columns. That number is td->nColumns minus the geo physical columns, minus 1 for
 * rowid and 1 more when td->hasDeletedCol.
 *
 * @param num_cols       number of columns the caller will supply.
 * @param loader         out: receives the created loader.
 * @param import_buffers out: receives import_export::setup_column_loaders(td, loader).
 * @return the schema read-lock container; presumably the caller holds it for the whole
 *         load.
 * @throws std::runtime_error when `num_cols` does not match.
 */
std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
// NOTE(review): the function-name line is missing from this copy (the listing skipped
// it); presumably `DBHandler::prepare_columnar_loader(`, as called elsewhere in this
// file. Restore from upstream.
    const Catalog_Namespace::SessionInfo& session_info,
    const std::string& table_name,
    size_t num_cols,
    std::unique_ptr<import_export::Loader>* loader,
    std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers) {
  auto& cat = session_info.getCatalog();
  auto td_with_lock =
      std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
          // NOTE(review): a line is missing here in this copy; presumably the call that
          // acquires the locked table descriptor. Restore from upstream.
              cat, table_name, true));
  const auto td = (*td_with_lock)();
  CHECK(td);
  if (g_cluster && !leaf_aggregator_.leafCount()) {
    // Sharded table rows need to be routed to the leaf by an aggregator.
    // NOTE(review): the statement of this branch is missing from this copy; presumably
    // a check that rejects sharded tables. Restore from upstream.
  }
  check_table_load_privileges(session_info, table_name);

  if (leaf_aggregator_.leafCount() > 0) {
    loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
  } else {
    loader->reset(new import_export::Loader(cat, td));
  }

  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
  // Subtracting 1 (rowid) until TableDescriptor is updated.
  auto col_descs = (*loader)->get_column_descs();
  // Geo physical columns are derived from their geo column, so callers do not send them.
  const auto geo_physical_cols = std::count_if(
      col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
  const auto num_table_cols =
      static_cast<size_t>(td->nColumns) - geo_physical_cols - (td->hasDeletedCol ? 2 : 1);
  if (num_cols != num_table_cols) {
    throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
                             ") does not match number of columns in table " +
                             td->tableName + " (" + std::to_string(num_table_cols) + ")");
  }

  *import_buffers = import_export::setup_column_loaders(td, loader->get());
  return std::move(td_with_lock);
}
2482 
/**
 * Service handler: appends column-oriented data to `table_name`; `cols` holds one
 * TColumn per loadable table column, in table order.
 *
 * Geometry columns arrive as strings (WKT or WKB hex, per the variable name) and are
 * parsed here. Their geo physical columns are then filled from the parsed data rather
 * than from `cols`. Every column must contain the same number of rows.
 *
 * @throws TOmniSciException (via THROW_MAPD_EXCEPTION) on inconsistent row counts,
 *         invalid geometry, or any other std::exception while buffering; nothing is
 *         loaded in that case.
 */
void DBHandler::load_table_binary_columnar(const TSessionId& session,
                                           const std::string& table_name,
                                           const std::vector<TColumn>& cols) {
  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
  auto session_ptr = stdlog.getConstSessionInfo();
  check_read_only("load_table_binary_columnar");
  auto const& cat = session_ptr->getCatalog();

  std::unique_ptr<import_export::Loader> loader;
  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
  // NOTE(review): the head of this statement is missing from this copy (the listing
  // skipped a line); presumably it stores the lock returned by prepare_columnar_loader.
  // Restore from upstream.
      *session_ptr, table_name, cols.size(), &loader, &import_buffers);
  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
      session_ptr->getCatalog(), table_name);

  size_t numRows = 0;
  size_t import_idx = 0;  // index into the TColumn vector being loaded
  size_t col_idx = 0;     // index into column description vector
  try {
    // Number of geo physical column descriptors still to skip after a geo column.
    size_t skip_physical_cols = 0;
    for (auto cd : loader->get_column_descs()) {
      if (skip_physical_cols > 0) {
        if (!cd->isGeoPhyCol) {
          throw std::runtime_error("Unexpected physical column");
        }
        skip_physical_cols--;
        continue;
      }
      size_t colRows = import_buffers[col_idx]->add_values(cd, cols[import_idx]);
      if (col_idx == 0) {
        numRows = colRows;
      } else if (colRows != numRows) {
        std::ostringstream oss;
        oss << "load_table_binary_columnar: Inconsistent number of rows in column "
            << cd->columnName << " , expecting " << numRows << " rows, column "
            << col_idx << " has " << colRows << " rows";
        THROW_MAPD_EXCEPTION(oss.str());
      }
      // Advance to the next column in the table
      col_idx++;

      // For geometry columns: process WKT strings and fill physical columns
      if (cd->columnType.is_geometry()) {
        auto geo_col_idx = col_idx - 1;
        const auto wkt_or_wkb_hex_column =
            import_buffers[geo_col_idx]->getGeoStringBuffer();
        std::vector<std::vector<double>> coords_column, bounds_column;
        std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
        int render_group = 0;
        SQLTypeInfo ti = cd->columnType;
        if (numRows != wkt_or_wkb_hex_column->size() ||
            !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
                                                        ti,
                                                        coords_column,
                                                        bounds_column,
                                                        ring_sizes_column,
                                                        poly_rings_column,
                                                        false)) {
          std::ostringstream oss;
          oss << "load_table_binary_columnar: Invalid geometry in column "
              << cd->columnName;
          THROW_MAPD_EXCEPTION(oss.str());
        }
        // Populate physical columns, advance col_idx
        // NOTE(review): the name of the function receiving the arguments below is
        // missing from this copy (the listing skipped a line). Restore from upstream.
            cat,
            cd,
            import_buffers,
            col_idx,
            coords_column,
            bounds_column,
            ring_sizes_column,
            poly_rings_column,
            render_group);
        skip_physical_cols = cd->columnType.get_physical_cols();
      }
      // Advance to the next column of values being loaded
      import_idx++;
    }
  } catch (const std::exception& e) {
    // NOTE(review): if TOmniSciException derives from std::exception, the errors thrown
    // with THROW_MAPD_EXCEPTION above are caught here and wrapped a second time. Also,
    // (col_idx + 1) counts table descriptors, including geo physical columns, so it can
    // differ from the caller's position in `cols` (import_idx).
    std::ostringstream oss;
    oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
        << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
    THROW_MAPD_EXCEPTION(oss.str());
  }
  loader->load(import_buffers, numRows);
}
2571 
2572 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
2573 
// Evaluates `s` (an expression yielding ::arrow::Status) exactly once. If the status is
// not OK, logs it at ERROR level and throws a TOmniSciException whose error_msg is the
// status text.
// The log line uses the stored status, not `s` again: the macro previously logged
// `s.ToString()`, which re-ran the expression on the error path (for
// `batch_reader->ReadNext(&batch)` that consumed another batch and logged the second
// call's status instead of the failure being reported).
#define ARROW_THRIFT_THROW_NOT_OK(s) \
  do {                               \
    ::arrow::Status _s = (s);        \
    if (UNLIKELY(!_s.ok())) {        \
      TOmniSciException ex;          \
      ex.error_msg = _s.ToString();  \
      LOG(ERROR) << ex.error_msg;    \
      throw ex;                      \
    }                                \
  } while (0)
2584 
2585 namespace {
2586 
2587 RecordBatchVector loadArrowStream(const std::string& stream) {
2588  RecordBatchVector batches;
2589  try {
2590  // TODO(wesm): Make this simpler in general, see ARROW-1600
2591  auto stream_buffer =
2592  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
2593  static_cast<int64_t>(stream.size()));
2594 
2595  arrow::io::BufferReader buf_reader(stream_buffer);
2596  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
2597  ARROW_ASSIGN_OR_THROW(batch_reader,
2598  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
2599 
2600  while (true) {
2601  std::shared_ptr<arrow::RecordBatch> batch;
2602  // Read batch (zero-copy) from the stream
2603  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
2604  if (batch == nullptr) {
2605  break;
2606  }
2607  batches.emplace_back(std::move(batch));
2608  }
2609  } catch (const std::exception& e) {
2610  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
2611  }
2612  return batches;
2613 }
2614 
2615 } // namespace
2616 
2617 void DBHandler::load_table_binary_arrow(const TSessionId& session,
2618  const std::string& table_name,
2619  const std::string& arrow_stream) {
2620  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2621  auto session_ptr = stdlog.getConstSessionInfo();
2622  check_read_only("load_table_binary_arrow");
2623 
2624  RecordBatchVector batches = loadArrowStream(arrow_stream);
2625 
2626  // Assuming have one batch for now
2627  if (batches.size() != 1) {
2628  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
2629  }
2630 
2631  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
2632 
2633  std::unique_ptr<import_export::Loader> loader;
2634  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2635  auto read_lock = prepare_columnar_loader(*session_ptr,
2636  table_name,
2637  static_cast<size_t>(batch->num_columns()),
2638  &loader,
2639  &import_buffers);
2640  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2641  session_ptr->getCatalog(), table_name);
2642 
2643  size_t numRows = 0;
2644  size_t col_idx = 0;
2645  try {
2646  for (auto cd : loader->get_column_descs()) {
2647  auto& array = *batch->column(col_idx);
2648  import_export::ArraySliceRange row_slice(0, array.length());
2649  numRows =
2650  import_buffers[col_idx]->add_arrow_values(cd, array, true, row_slice, nullptr);
2651  col_idx++;
2652  }
2653  } catch (const std::exception& e) {
2654  LOG(ERROR) << "Input exception thrown: " << e.what()
2655  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2656  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
2657  // other import paths
2658  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2659  }
2660  loader->load(import_buffers, numRows);
2661 }
2662 
// Thrift endpoint: appends rows supplied as per-column strings (TStringRow) to
// table_name. Each row carries one value per logical column; geo physical
// columns are not supplied by the client but filled from the geometry value.
// A row whose values fail to convert is logged and skipped (values it already
// appended are popped); the remaining rows are loaded. std::exceptions are
// reported to the client via THROW_MAPD_EXCEPTION("Exception: ...").
2663 void DBHandler::load_table(const TSessionId& session,
2664  const std::string& table_name,
2665  const std::vector<TStringRow>& rows) {
2666  try {
2667  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2668  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2669  auto session_ptr = stdlog.getConstSessionInfo();
2670  check_read_only("load_table");
2671  auto& cat = session_ptr->getCatalog();
2672  const auto td_with_lock =
2674  cat, table_name, true);
2675  const auto td = td_with_lock();
2676  CHECK(td);
2677 
2678  if (g_cluster && !leaf_aggregator_.leafCount()) {
2679  // Sharded table rows need to be routed to the leaf by an aggregator.
2681  }
2682  check_table_load_privileges(*session_ptr, table_name);
2683  if (rows.empty()) {
2684  return;
2685  }
// With leaves attached a DistributedLoader is used (presumably forwarding rows
// to the leaves); otherwise a local import_export::Loader.
2686  std::unique_ptr<import_export::Loader> loader;
2687  if (leaf_aggregator_.leafCount() > 0) {
2688  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2689  } else {
2690  loader.reset(new import_export::Loader(cat, td));
2691  }
2692  auto col_descs = loader->get_column_descs();
2693  auto geo_physical_cols = std::count_if(
2694  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2695  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2696  // Subtracting 1 (rowid) until TableDescriptor is updated.
// NOTE(review): only rows.front() is validated. A later row with fewer values
// is indexed past the end of row.cols by row.cols[import_idx] below; consider
// checking every row's size. The message also always subtracts 1, while the
// check subtracts 2 when td->hasDeletedCol is set.
2697  if (rows.front().cols.size() != static_cast<size_t>(td->nColumns) -
2698  geo_physical_cols - (td->hasDeletedCol ? 2 : 1)) {
2699  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name +
2700  " (" + std::to_string(rows.front().cols.size()) + " vs " +
2701  std::to_string(td->nColumns - geo_physical_cols - 1) + ")");
2702  }
// One import buffer per column descriptor, including geo physical columns.
2703  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2704  for (auto cd : col_descs) {
2705  import_buffers.push_back(std::unique_ptr<import_export::TypedImportBuffer>(
2706  new import_export::TypedImportBuffer(cd, loader->getStringDict(cd))));
2707  }
2708  import_export::CopyParams copy_params;
2709  size_t rows_completed = 0;
2710  for (auto const& row : rows) {
2711  size_t import_idx = 0; // index into the TStringRow being loaded
2712  size_t col_idx = 0; // index into column description vector
2713  try {
2714  size_t skip_physical_cols = 0;
2715  for (auto cd : col_descs) {
// Physical columns following a geo column have no value in the row; they
// were populated together with their geo column.
2716  if (skip_physical_cols > 0) {
2717  if (!cd->isGeoPhyCol) {
2718  throw std::runtime_error("Unexpected physical column");
2719  }
2720  skip_physical_cols--;
2721  continue;
2722  }
2723  import_buffers[col_idx]->add_value(cd,
2724  row.cols[import_idx].str_val,
2725  row.cols[import_idx].is_null,
2726  copy_params);
2727  // Advance to the next column within the table
2728  col_idx++;
2729 
2730  if (cd->columnType.is_geometry()) {
2731  // Populate physical columns
2732  std::vector<double> coords, bounds;
2733  std::vector<int> ring_sizes, poly_rings;
2734  int render_group = 0;
2735  SQLTypeInfo ti{cd->columnType};
2737  !row.cols[import_idx].is_null ? row.cols[import_idx].str_val
2738  : std::string(),
2739  ti,
2740  coords,
2741  bounds,
2742  ring_sizes,
2743  poly_rings,
2744  false)) {
2745  throw std::runtime_error("Invalid geometry");
2746  }
2747  if (cd->columnType.get_type() != ti.get_type()) {
2748  throw std::runtime_error("Geometry type mismatch");
2749  }
2751  cd,
2752  import_buffers,
2753  col_idx,
2754  coords,
2755  bounds,
2756  ring_sizes,
2757  poly_rings,
2758  render_group);
2759  skip_physical_cols = cd->columnType.get_physical_cols();
2760  }
2761  // Advance to the next field within the row
2762  import_idx++;
2763  }
2764  rows_completed++;
2765  } catch (const std::exception& e) {
// Pop what this row appended to the first col_idx buffers so the buffers stay
// row-aligned, then drop the row.
2766  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2767  import_buffers[col_idx_to_pop]->pop_value();
2768  }
2769  LOG(ERROR) << "Input exception thrown: " << e.what()
2770  << ". Row discarded, issue at column : " << (col_idx + 1)
2771  << " data :" << row;
2772  }
2773  }
// The insert-data write lock is taken only for the final load step.
2774  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2775  session_ptr->getCatalog(), table_name);
2776  loader->load(import_buffers, rows_completed);
2777  } catch (const std::exception& e) {
2778  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2779  }
2780 }
2781 
2782 char DBHandler::unescape_char(std::string str) {
2783  char out = str[0];
2784  if (str.size() == 2 && str[0] == '\\') {
2785  if (str[1] == 't') {
2786  out = '\t';
2787  } else if (str[1] == 'n') {
2788  out = '\n';
2789  } else if (str[1] == '0') {
2790  out = '\0';
2791  } else if (str[1] == '\'') {
2792  out = '\'';
2793  } else if (str[1] == '\\') {
2794  out = '\\';
2795  }
2796  }
2797  return out;
2798 }
2799 
// Converts the Thrift-level TCopyParams `cp` into an import_export::CopyParams.
// String options and `threads` are copied only when non-empty / non-zero, so
// the CopyParams defaults apply otherwise. Enumerated options (has_header,
// file_type, geo_coords_encoding/type, geo_coords_srid) are validated and
// unknown values raise TOmniSciException.
// NOTE(review): this listing omits the function header (original line 2800)
// and some case bodies (e.g. 2804, 2807, 2810, 2861, 2864).
2801  import_export::CopyParams copy_params;
2802  switch (cp.has_header) {
2803  case TImportHeaderRow::AUTODETECT:
2805  break;
2806  case TImportHeaderRow::NO_HEADER:
2808  break;
2809  case TImportHeaderRow::HAS_HEADER:
2811  break;
2812  default:
2813  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
2814  std::to_string((int)cp.has_header));
2815  break;
2816  }
2817  copy_params.quoted = cp.quoted;
// An empty delimiter is stored as '\0' (presumably "detect / unspecified" for
// the importer - confirm); otherwise it is unescaped like the other char options.
2818  if (cp.delimiter.length() > 0) {
2819  copy_params.delimiter = unescape_char(cp.delimiter);
2820  } else {
2821  copy_params.delimiter = '\0';
2822  }
2823  if (cp.null_str.length() > 0) {
2824  copy_params.null_str = cp.null_str;
2825  }
2826  if (cp.quote.length() > 0) {
2827  copy_params.quote = unescape_char(cp.quote);
2828  }
2829  if (cp.escape.length() > 0) {
2830  copy_params.escape = unescape_char(cp.escape);
2831  }
2832  if (cp.line_delim.length() > 0) {
2833  copy_params.line_delim = unescape_char(cp.line_delim);
2834  }
2835  if (cp.array_delim.length() > 0) {
2836  copy_params.array_delim = unescape_char(cp.array_delim);
2837  }
2838  if (cp.array_begin.length() > 0) {
2839  copy_params.array_begin = unescape_char(cp.array_begin);
2840  }
2841  if (cp.array_end.length() > 0) {
2842  copy_params.array_end = unescape_char(cp.array_end);
2843  }
2844  if (cp.threads != 0) {
2845  copy_params.threads = cp.threads;
2846  }
2847  if (cp.s3_access_key.length() > 0) {
2848  copy_params.s3_access_key = cp.s3_access_key;
2849  }
2850  if (cp.s3_secret_key.length() > 0) {
2851  copy_params.s3_secret_key = cp.s3_secret_key;
2852  }
2853  if (cp.s3_region.length() > 0) {
2854  copy_params.s3_region = cp.s3_region;
2855  }
2856  if (cp.s3_endpoint.length() > 0) {
2857  copy_params.s3_endpoint = cp.s3_endpoint;
2858  }
// PARQUET is accepted only in builds with ENABLE_IMPORT_PARQUET; otherwise it
// falls into the default branch and is rejected.
2859  switch (cp.file_type) {
2860  case TFileType::POLYGON:
2862  break;
2863  case TFileType::DELIMITED:
2865  break;
2866 #ifdef ENABLE_IMPORT_PARQUET
2867  case TFileType::PARQUET:
2868  copy_params.file_type = import_export::FileType::PARQUET;
2869  break;
2870 #endif
2871  default:
2872  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
2873  std::to_string((int)cp.file_type));
2874  break;
2875  }
2876  switch (cp.geo_coords_encoding) {
2877  case TEncodingType::GEOINT:
2878  copy_params.geo_coords_encoding = kENCODING_GEOINT;
2879  break;
2880  case TEncodingType::NONE:
2881  copy_params.geo_coords_encoding = kENCODING_NONE;
2882  break;
2883  default:
2884  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
2885  std::to_string((int)cp.geo_coords_encoding));
2886  break;
2887  }
2888  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2889  switch (cp.geo_coords_type) {
2890  case TDatumType::GEOGRAPHY:
2891  copy_params.geo_coords_type = kGEOGRAPHY;
2892  break;
2893  case TDatumType::GEOMETRY:
2894  copy_params.geo_coords_type = kGEOMETRY;
2895  break;
2896  default:
2897  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
2898  std::to_string((int)cp.geo_coords_type));
2899  break;
2900  }
// Only SRIDs 4326, 3857 and 900913 are accepted.
// NOTE(review): the error text below opens "(" without a closing ")".
2901  switch (cp.geo_coords_srid) {
2902  case 4326:
2903  case 3857:
2904  case 900913:
2905  copy_params.geo_coords_srid = cp.geo_coords_srid;
2906  break;
2907  default:
2908  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
2909  std::to_string((int)cp.geo_coords_srid));
2910  break;
2911  }
2912  copy_params.sanitize_column_names = cp.sanitize_column_names;
2913  copy_params.geo_layer_name = cp.geo_layer_name;
2914  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2915  copy_params.geo_explode_collections = cp.geo_explode_collections;
2916  return copy_params;
2917 }
2918 
// Converts an import_export::CopyParams `cp` back into Thrift TCopyParams
// (e.g. to report detected settings to the client). Unlike the Thrift ->
// CopyParams direction, unexpected has_header / geo_coords_type values are
// treated as programming errors (CHECK(false)) rather than reported.
// NOTE(review): this listing omits the function header (original line 2919)
// and some case labels (2924, 2927, 2930, 2950).
2920  TCopyParams copy_params;
// NOTE(review): the single-char CopyParams fields (delimiter, quote, ...) are
// assigned to string fields; presumably this yields a one-character string,
// including for '\0' - confirm this round-trips with the Thrift -> CopyParams
// conversion, which maps an empty delimiter to '\0'.
2921  copy_params.delimiter = cp.delimiter;
2922  copy_params.null_str = cp.null_str;
2923  switch (cp.has_header) {
2925  copy_params.has_header = TImportHeaderRow::AUTODETECT;
2926  break;
2928  copy_params.has_header = TImportHeaderRow::NO_HEADER;
2929  break;
2931  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
2932  break;
2933  default:
2934  CHECK(false);
2935  break;
2936  }
2937  copy_params.quoted = cp.quoted;
2938  copy_params.quote = cp.quote;
2939  copy_params.escape = cp.escape;
2940  copy_params.line_delim = cp.line_delim;
2941  copy_params.array_delim = cp.array_delim;
2942  copy_params.array_begin = cp.array_begin;
2943  copy_params.array_end = cp.array_end;
2944  copy_params.threads = cp.threads;
2945  copy_params.s3_access_key = cp.s3_access_key;
2946  copy_params.s3_secret_key = cp.s3_secret_key;
2947  copy_params.s3_region = cp.s3_region;
2948  copy_params.s3_endpoint = cp.s3_endpoint;
// Only the case mapped to POLYGON is explicit; every other file type is
// reported as DELIMITED. NOTE(review): presumably this includes PARQUET, so a
// PARQUET CopyParams would not round-trip - confirm.
2949  switch (cp.file_type) {
2951  copy_params.file_type = TFileType::POLYGON;
2952  break;
2953  default:
2954  copy_params.file_type = TFileType::DELIMITED;
2955  break;
2956  }
// Any encoding other than GEOINT is reported as NONE.
2957  switch (cp.geo_coords_encoding) {
2958  case kENCODING_GEOINT:
2959  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
2960  break;
2961  default:
2962  copy_params.geo_coords_encoding = TEncodingType::NONE;
2963  break;
2964  }
2965  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2966  switch (cp.geo_coords_type) {
2967  case kGEOGRAPHY:
2968  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
2969  break;
2970  case kGEOMETRY:
2971  copy_params.geo_coords_type = TDatumType::GEOMETRY;
2972  break;
2973  default:
2974  CHECK(false);
2975  break;
2976  }
2977  copy_params.geo_coords_srid = cp.geo_coords_srid;
2978  copy_params.sanitize_column_names = cp.sanitize_column_names;
2979  copy_params.geo_layer_name = cp.geo_layer_name;
2980  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2981  copy_params.geo_explode_collections = cp.geo_explode_collections;
2982  return copy_params;
2983 }
2984 
2985 namespace {
// Rewrites a remote geo file location into a GDAL virtual file system path:
// http(s)://... gets a "/vsicurl/" prefix and s3://... becomes /vsis3/....
// Local paths are left unchanged. Remote paths are rejected (the throwing
// lines, originals 2993 and 3000, are not shown in this listing) when
// Geospatial::GDAL::supportsNetworkFileAccess() returns false.
// NOTE(review): the scheme is matched case-insensitively (istarts_with) but
// replace_first is case-sensitive, so e.g. "S3://bucket" is detected yet left
// unrewritten.
2986 void add_vsi_network_prefix(std::string& path) {
2987  // do we support network file access?
2988  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
2989 
2990  // modify head of filename based on source location
2991  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
2992  if (!gdal_network) {
2994  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
2995  }
2996  // invoke GDAL CURL virtual file reader
2997  path = "/vsicurl/" + path;
2998  } else if (boost::istarts_with(path, "s3://")) {
2999  if (!gdal_network) {
3001  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
3002  }
3003  // invoke GDAL S3 virtual file reader
3004  boost::replace_first(path, "s3://", "/vsis3/");
3005  }
3006 }
3007 
3008 void add_vsi_geo_prefix(std::string& path) {
3009  // single gzip'd file (not an archive)?
3010  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3011  path = "/vsigzip/" + path;
3012  }
3013 }
3014 
3015 void add_vsi_archive_prefix(std::string& path) {
3016  // check for compressed file or file bundle
3017  if (boost::iends_with(path, ".zip")) {
3018  // zip archive
3019  path = "/vsizip/" + path;
3020  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3021  boost::iends_with(path, ".tar.gz")) {
3022  // tar archive (compressed or uncompressed)
3023  path = "/vsitar/" + path;
3024  }
3025 }
3026 
3027 std::string remove_vsi_prefixes(const std::string& path_in) {
3028  std::string path(path_in);
3029 
3030  // these will be first
3031  if (boost::istarts_with(path, "/vsizip/")) {
3032  boost::replace_first(path, "/vsizip/", "");
3033  } else if (boost::istarts_with(path, "/vsitar/")) {
3034  boost::replace_first(path, "/vsitar/", "");
3035  } else if (boost::istarts_with(path, "/vsigzip/")) {
3036  boost::replace_first(path, "/vsigzip/", "");
3037  }
3038 
3039  // then these
3040  if (boost::istarts_with(path, "/vsicurl/")) {
3041  boost::replace_first(path, "/vsicurl/", "");
3042  } else if (boost::istarts_with(path, "/vsis3/")) {
3043  boost::replace_first(path, "/vsis3/", "s3://");
3044  }
3045 
3046  return path;
3047 }
3048 
3049 bool path_is_relative(const std::string& path) {
3050  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
3051  boost::istarts_with(path, "https://")) {
3052  return false;
3053  }
3054  return !boost::filesystem::path(path).is_absolute();
3055 }
3056 
3057 bool path_has_valid_filename(const std::string& path) {
3058  auto filename = boost::filesystem::path(path).filename().string();
3059  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
3060  return false;
3061  }
3062  return true;
3063 }
3064 
3065 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
3066  if (!path_has_valid_filename(path)) {
3067  return false;
3068  }
3069  if (include_gz) {
3070  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
3071  return true;
3072  }
3073  }
3074  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
3075  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
3076  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
3077  boost::iends_with(path, ".gdb.zip")) {
3078  return true;
3079  }
3080  return false;
3081 }
3082 
3083 bool is_a_supported_archive_file(const std::string& path) {
3084  if (!path_has_valid_filename(path)) {
3085  return false;
3086  }
3087  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
3088  return true;
3089  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3090  boost::iends_with(path, ".tar.gz")) {
3091  return true;
3092  }
3093  return false;
3094 }
3095 
3096 std::string find_first_geo_file_in_archive(const std::string& archive_path,
3097  const import_export::CopyParams& copy_params) {
3098  // get the recursive list of all files in the archive
3099  std::vector<std::string> files =
3100  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
3101 
3102  // report the list
3103  LOG(INFO) << "Found " << files.size() << " files in Archive "
3104  << remove_vsi_prefixes(archive_path);
3105  for (const auto& file : files) {
3106  LOG(INFO) << " " << file;
3107  }
3108 
3109  // scan the list for the first candidate file
3110  bool found_suitable_file = false;
3111  std::string file_name;
3112  for (const auto& file : files) {
3113  if (is_a_supported_geo_file(file, false)) {
3114  file_name = file;
3115  found_suitable_file = true;
3116  break;
3117  }
3118  }
3119 
3120  // if we didn't find anything
3121  if (!found_suitable_file) {
3122  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
3123  remove_vsi_prefixes(archive_path);
3124  file_name.clear();
3125  }
3126 
3127  // done
3128  return file_name;
3129 }
3130 
3131 bool is_local_file(const std::string& file_path) {
3132  return (!boost::istarts_with(file_path, "s3://") &&
3133  !boost::istarts_with(file_path, "http://") &&
3134  !boost::istarts_with(file_path, "https://"));
3135 }
3136 
// Applies import-path validation to local paths only; remote paths (see
// is_local_file) are not validated here.
// NOTE(review): the validation call itself (original line 3139) is not shown
// in this listing - presumably an allowed-directory check; confirm.
3137 void validate_import_file_path_if_local(const std::string& file_path) {
3138  if (is_local_file(file_path)) {
3140  }
3141 }
3142 } // namespace
3143 
// Thrift endpoint: inspects an import source and returns, in _return, the
// detected column descriptions, sample rows and the effective copy params.
// Delimited (and, when built with it, Parquet) files go through
// import_export::Detector. POLYGON sources go through the GDAL helpers, after
// remote/archive paths are rewritten to GDAL virtual file system paths.
// std::exceptions during detection are rethrown as
// "detect_column_types error: ...".
// NOTE(review): some original lines (3152, 3161, 3211, 3275) are not shown in
// this listing.
3144 void DBHandler::detect_column_types(TDetectResult& _return,
3145  const TSessionId& session,
3146  const std::string& file_name_in,
3147  const TCopyParams& cp) {
3148  auto stdlog = STDLOG(get_session_ptr(session));
3149  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3150  check_read_only("detect_column_types");
3151 
3153 
// Relative paths keep only their final filename component and are resolved
// under import_path_/<sha256(session)>/.
3154  std::string file_name{file_name_in};
3155  if (path_is_relative(file_name)) {
3156  // assume relative paths are relative to data_path / mapd_import / <session>
3157  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3158  boost::filesystem::path(file_name).filename();
3159  file_name = file_path.string();
3160  }
3162 
3163  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
3164  if (copy_params.file_type == import_export::FileType::POLYGON) {
3165  if (is_a_supported_geo_file(file_name, true)) {
3166  // prepare to detect geo file directly
3167  add_vsi_network_prefix(file_name);
3168  add_vsi_geo_prefix(file_name);
3169  } else if (is_a_supported_archive_file(file_name)) {
3170  // find the archive file
3171  add_vsi_network_prefix(file_name);
3172  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
3173  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3174  }
3175  // find geo file in archive
3176  add_vsi_archive_prefix(file_name);
3177  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3178  // prepare to detect that geo file
// NOTE(review): when no geo file is found, file_name stays the archive path
// and detection proceeds on the archive itself.
3179  if (geo_file.size()) {
3180  file_name = file_name + std::string("/") + geo_file;
3181  }
3182  } else {
3183  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
3184  file_name_in);
3185  }
3186  }
3187 
// NOTE(review): this second resolution repeats the one above for local paths.
// It also rewrites non-absolute http(s):// URLs (which path_is_relative leaves
// alone) to a local path for non-POLYGON imports - confirm this is intended.
3188  auto file_path = boost::filesystem::path(file_name);
3189  // can be a s3 url
3190  if (!boost::istarts_with(file_name, "s3://")) {
3191  if (!boost::filesystem::path(file_name).is_absolute()) {
3192  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3193  boost::filesystem::path(file_name).filename();
3194  file_name = file_path.string();
3195  }
3196 
3197  if (copy_params.file_type == import_export::FileType::POLYGON) {
3198  // check for geo file
3199  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3200  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3201  }
3202  } else {
3203  // check for regular file
3204  if (!boost::filesystem::exists(file_path)) {
3205  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3206  }
3207  }
3208  }
3209 
3210  try {
3212 #ifdef ENABLE_IMPORT_PARQUET
3213  || (copy_params.file_type == import_export::FileType::PARQUET)
3214 #endif
3215  ) {
3216  import_export::Detector detector(file_path, copy_params);
3217  std::vector<SQLTypes> best_types = detector.best_sqltypes;
3218  std::vector<EncodingType> best_encodings = detector.best_encodings;
3219  std::vector<std::string> headers = detector.get_headers();
3220  copy_params = detector.get_copy_params();
3221 
3222  _return.copy_params = copyparams_to_thrift(copy_params);
3223  _return.row_set.row_desc.resize(best_types.size());
3224  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
3225  TColumnType col;
3226  SQLTypes t = best_types[col_idx];
3227  EncodingType encodingType = best_encodings[col_idx];
3228  SQLTypeInfo ti(t, false, encodingType);
3229  if (IS_GEO(t)) {
3230  // set this so encoding_to_thrift does the right thing
3231  ti.set_compression(copy_params.geo_coords_encoding);
3232  // fill in these directly
3233  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3234  col.col_type.scale = copy_params.geo_coords_srid;
3235  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3236  }
3237  col.col_type.type = type_to_thrift(ti);
3238  col.col_type.encoding = encoding_to_thrift(ti);
3239  if (copy_params.sanitize_column_names) {
3240  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3241  } else {
3242  col.col_name = headers[col_idx];
3243  }
3244  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3245  _return.row_set.row_desc[col_idx] = col;
3246  }
3247  size_t num_samples = 100;
3248  auto sample_data = detector.get_sample_rows(num_samples);
3249 
// Sample values are returned as strings; an empty string is flagged as null.
3250  TRow sample_row;
3251  for (auto row : sample_data) {
3252  sample_row.cols.clear();
3253  for (const auto& s : row) {
3254  TDatum td;
3255  td.val.str_val = s;
3256  td.is_null = s.empty();
3257  sample_row.cols.push_back(td);
3258  }
3259  _return.row_set.rows.push_back(sample_row);
3260  }
3261  } else if (copy_params.file_type == import_export::FileType::POLYGON) {
3262  // @TODO simon.eves get this from somewhere!
3263  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
3264 
3265  check_geospatial_files(file_path, copy_params);
3266  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
3267  file_path.string(), geoColumnName, copy_params);
3268  for (auto cd : cds) {
3269  if (copy_params.sanitize_column_names) {
3270  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
3271  }
3272  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
3273  }
3274  std::map<std::string, std::vector<std::string>> sample_data;
3276  file_path.string(), geoColumnName, sample_data, 100, copy_params);
// Rows are assembled column-by-column from sample_data, keyed by each column's
// sourceName. The row count is taken from the first map entry. A column with
// fewer samples makes .at(i) throw, which the catch below reports.
3277  if (sample_data.size() > 0) {
3278  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
3279  TRow sample_row;
3280  for (auto cd : cds) {
3281  TDatum td;
3282  td.val.str_val = sample_data[cd.sourceName].at(i);
3283  td.is_null = td.val.str_val.empty();
3284  sample_row.cols.push_back(td);
3285  }
3286  _return.row_set.rows.push_back(sample_row);
3287  }
3288  }
3289  _return.copy_params = copyparams_to_thrift(copy_params);
3290  }
3291  } catch (const std::exception& e) {
3292  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
3293  }
3294 }
3295 
3296 void DBHandler::render_vega(TRenderResult& _return,
3297  const TSessionId& session,
3298  const int64_t widget_id,
3299  const std::string& vega_json,
3300  const int compression_level,
3301  const std::string& nonce) {
3302  auto stdlog = STDLOG(get_session_ptr(session),
3303  "widget_id",
3304  widget_id,
3305  "compression_level",
3306  compression_level,
3307  "vega_json",
3308  vega_json,
3309  "nonce",
3310  nonce);
3311  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3312  stdlog.appendNameValuePairs("nonce", nonce);
3313  auto session_ptr = stdlog.getConstSessionInfo();
3314  if (!render_handler_) {
3315  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
3316  }
3317 
3318  _return.total_time_ms = measure<>::execution([&]() {
3319  try {
3320  render_handler_->render_vega(
3321  _return,
3322  std::make_shared<Catalog_Namespace::SessionInfo>(*session_ptr),
3323  widget_id,
3324  vega_json,
3325  compression_level,
3326  nonce);
3327  } catch (std::exception& e) {
3328  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3329  }
3330  });
3331 }
3332 
// Returns whether the session's current user holds `requestedPermissions` on
// dashboard `dashboard_id` in the session's catalog, as decided by
// SysCatalog::checkPrivileges.
// NOTE(review): the function's opening line (original 3333) is not shown in
// this listing.
3334  int32_t dashboard_id,
3335  AccessPrivileges requestedPermissions) {
3336  DBObject object(dashboard_id, DashboardDBObjectType);
3337  auto& catalog = session_info.getCatalog();
3338  auto& user = session_info.get_currentUser();
3339  object.loadKey(catalog);
3340  object.setPrivileges(requestedPermissions);
3341  std::vector<DBObject> privs = {object};
3342  return SysCatalog::instance().checkPrivileges(user, privs);
3343 }
3344 
3345 // dashboards
// Thrift endpoint: fills `dashboard` (including its state) for dashboard_id.
// Throws TOmniSciException if the dashboard does not exist or the caller lacks
// VIEW_DASHBOARD on it.
3346 void DBHandler::get_dashboard(TDashboard& dashboard,
3347  const TSessionId& session,
3348  const int32_t dashboard_id) {
3349  auto stdlog = STDLOG(get_session_ptr(session));
3350  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3351  auto session_ptr = stdlog.getConstSessionInfo();
3352  auto const& cat = session_ptr->getCatalog();
3354  auto dash = cat.getMetadataForDashboard(dashboard_id);
3355  if (!dash) {
3356  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
3357  " doesn't exist");
3358  }
3360  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3361  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
3362  std::to_string(dashboard_id));
3363  }
// Resolve the owner's user name. The lookup result is ignored; if it fails,
// userName stays "".
3364  user_meta.userName = "";
3365  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3366  auto objects_list = SysCatalog::instance().getMetadataForObject(
3367  cat.getCurrentDB().dbId,
3368  static_cast<int>(DBObjectType::DashboardDBObjectType),
3369  dashboard_id);
3370  dashboard.dashboard_name = dash->dashboardName;
3371  dashboard.dashboard_state = dash->dashboardState;
3372  dashboard.image_hash = dash->imageHash;
3373  dashboard.update_time = dash->updateTime;
3374  dashboard.dashboard_metadata = dash->dashboardMetadata;
3375  dashboard.dashboard_owner = dash->user;
3376  dashboard.dashboard_id = dash->dashboardId;
// Not shared when there are no object entries, or the only entry belongs to
// the owner's role name.
3377  if (objects_list.empty() ||
3378  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3379  dashboard.is_dash_shared = false;
3380  } else {
3381  dashboard.is_dash_shared = true;
3382  }
3383 }
3384 
// Thrift endpoint: appends to `dashboards` every dashboard of the current
// catalog that the caller may view (VIEW_DASHBOARD), without dashboard_state.
3385 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
3386  const TSessionId& session) {
3387  auto stdlog = STDLOG(get_session_ptr(session));
3388  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3389  auto session_ptr = stdlog.getConstSessionInfo();
3390  auto const& cat = session_ptr->getCatalog();
3392  const auto dashes = cat.getAllDashboardsMetadata();
3393  user_meta.userName = "";
3394  for (const auto d : dashes) {
// NOTE(review): user_meta is reset only once, before the loop; if this lookup
// fails, userName keeps the previous dashboard owner's name. The lookup also
// runs for dashboards the caller is not allowed to view.
3395  SysCatalog::instance().getMetadataForUserById(d->userId, user_meta);
3397  *session_ptr, d->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3398  auto objects_list = SysCatalog::instance().getMetadataForObject(
3399  cat.getCurrentDB().dbId,
3400  static_cast<int>(DBObjectType::DashboardDBObjectType),
3401  d->dashboardId);
3402  TDashboard dash;
3403  dash.dashboard_name = d->dashboardName;
3404  dash.image_hash = d->imageHash;
3405  dash.update_time = d->updateTime;
3406  dash.dashboard_metadata = d->dashboardMetadata;
3407  dash.dashboard_id = d->dashboardId;
3408  dash.dashboard_owner = d->user;
3409  // dashboardState is intentionally not populated here
3410  // for payload reasons
3411  // use get_dashboard call to get state
3412  if (objects_list.empty() ||
3413  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3414  dash.is_dash_shared = false;
3415  } else {
3416  dash.is_dash_shared = true;
3417  }
3418  dashboards.push_back(dash);
3419  }
3420  }
3421 }
3422 
// Thrift endpoint: creates a dashboard owned by the caller and returns its id.
// Requires the dashboard-creation DB privilege. Rejects a name the caller
// already uses for another dashboard.
3423 int32_t DBHandler::create_dashboard(const TSessionId& session,
3424  const std::string& dashboard_name,
3425  const std::string& dashboard_state,
3426  const std::string& image_hash,
3427  const std::string& dashboard_metadata) {
3428  auto stdlog = STDLOG(get_session_ptr(session));
3429  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3430  auto session_ptr = stdlog.getConstSessionInfo();
3431  check_read_only("create_dashboard");
3432  auto& cat = session_ptr->getCatalog();
3433 
3434  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
3436  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
3437  }
3438 
// Uniqueness is checked per owner: the lookup key is (current user id, name).
3439  auto dash = cat.getMetadataForDashboard(
3440  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
3441  if (dash) {
3442  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
3443  }
3444 
3446  dd.dashboardName = dashboard_name;
3447  dd.dashboardState = dashboard_state;
3448  dd.imageHash = image_hash;
3449  dd.dashboardMetadata = dashboard_metadata;
3450  dd.userId = session_ptr->get_currentUser().userId;
3451  dd.user = session_ptr->get_currentUser().userName;
3452 
3453  try {
3454  auto id = cat.createDashboard(dd);
// NOTE(review): if createDBObject throws, the dashboard created above is not
// removed here.
3455  // TODO: transactionally unsafe
3456  SysCatalog::instance().createDBObject(
3457  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
3458  return id;
3459  } catch (const std::exception& e) {
3460  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3461  }
3462 }
3463 
// Thrift endpoint: overwrites dashboard `dashboard_id` with the given name,
// state, image hash, metadata and owner. Requires EDIT_DASHBOARD on the
// dashboard. The owner is resolved by user name and must exist.
3464 void DBHandler::replace_dashboard(const TSessionId& session,
3465  const int32_t dashboard_id,
3466  const std::string& dashboard_name,
3467  const std::string& dashboard_owner,
3468  const std::string& dashboard_state,
3469  const std::string& image_hash,
3470  const std::string& dashboard_metadata) {
3471  auto stdlog = STDLOG(get_session_ptr(session));
3472  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3473  auto session_ptr = stdlog.getConstSessionInfo();
3474  check_read_only("replace_dashboard");
3475  auto& cat = session_ptr->getCatalog();
3476 
3478  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
3479  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
3480  }
3481 
3483  dd.dashboardName = dashboard_name;
3484  dd.dashboardState = dashboard_state;
3485  dd.imageHash = image_hash;
3486  dd.dashboardMetadata = dashboard_metadata;
3488  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
3489  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
3490  " does not exist");
3491  }
// NOTE(review): ownership comes from the dashboard_owner argument, so any
// caller holding EDIT_DASHBOARD can reassign the owner - confirm intended.
3492  dd.userId = user.userId;
3493  dd.user = dashboard_owner;
3494  dd.dashboardId = dashboard_id;
3495 
3496  try {
3497  cat.replaceDashboard(dd);
3498  } catch (const std::exception& e) {
3499  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3500  }
3501 }
3502 
3503 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
3504  delete_dashboards(session, {dashboard_id});
3505 }
3506 
3507 void DBHandler::delete_dashboards(const TSessionId& session,
3508  const std::vector<int32_t>& dashboard_ids) {
3509  auto stdlog = STDLOG(get_session_ptr(session));
3510  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3511  auto session_ptr = stdlog.getConstSessionInfo();
3512  check_read_only("delete_dashboards");
3513  auto& cat = session_ptr->getCatalog();
3514  // Checks will be performed in catalog
3515  try {
3516  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
3517  } catch (const std::exception& e) {
3518  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3519  }
3520 }
3521 
3522 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
3523  int32_t dashboard_id,
3524  std::vector<std::string> groups) {
3525  const auto session_info = get_session_copy(session);
3526  auto& cat = session_info.getCatalog();
3527  auto dash = cat.getMetadataForDashboard(dashboard_id);
3528  if (!dash) {
3529  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3530  " does not exist");
3531  } else if (session_info.get_currentUser().userId != dash->userId &&
3532  !session_info.get_currentUser().isSuper) {
3533  throw std::runtime_error(
3534  "User should be either owner of dashboard or super user to share/unshare it");
3535  }
3536  std::vector<std::string> valid_groups;
3538  for (auto& group : groups) {
3539  user_meta.isSuper = false; // initialize default flag
3540  if (!SysCatalog::instance().getGrantee(group)) {
3541  THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
3542  } else if (!user_meta.isSuper) {
3543  valid_groups.push_back(group);
3544  }
3545  }
3546  return valid_groups;
3547 }
3548 
3549 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
3550  for (auto const& group : groups) {
3551  if (!SysCatalog::instance().getGrantee(group)) {
3552  THROW_MAPD_EXCEPTION("Exception: User/Role '" + group + "' does not exist");
3553  }
3554  }
3555 }
3556 
3558  const Catalog_Namespace::SessionInfo& session_info,
3559  const std::vector<int32_t>& dashboard_ids) {
3560  auto& cat = session_info.getCatalog();
3561  std::map<std::string, std::list<int32_t>> errors;
3562  for (auto const& dashboard_id : dashboard_ids) {
3563  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
3564  if (!dashboard) {
3565  errors["Dashboard id does not exist"].push_back(dashboard_id);
3566  } else if (session_info.get_currentUser().userId != dashboard->userId &&
3567  !session_info.get_currentUser().isSuper) {
3568  errors["User should be either owner of dashboard or super user to share/unshare it"]
3569  .push_back(dashboard_id);
3570  }
3571  }
3572  if (!errors.empty()) {
3573  std::stringstream error_stream;
3574  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
3575  for (const auto& [error, id_list] : errors) {
3576  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
3577  }
3578  THROW_MAPD_EXCEPTION(error_stream.str());
3579  }
3580 }
3581 
3582 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
3583  const std::vector<int32_t>& dashboard_ids,
3584  const std::vector<std::string>& groups,
3585  const TDashboardPermissions& permissions,
3586  const bool do_share) {
3587  auto stdlog = STDLOG(get_session_ptr(session));
3588  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3589  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
3590  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3591  !permissions.view_) {
3592  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for " +
3593  std::string(do_share ? "grants" : "revokes"));
3594  }
3595  auto session_ptr = stdlog.getConstSessionInfo();
3596  auto const& catalog = session_ptr->getCatalog();
3597  auto& sys_catalog = SysCatalog::instance();
3598  validateGroups(groups);
3599  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
3600  std::vector<DBObject> batch_objects;
3601  for (auto const& dashboard_id : dashboard_ids) {
3602  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
3603  AccessPrivileges privs;
3604  if (permissions.delete_) {
3606  }
3607  if (permissions.create_) {
3609  }
3610  if (permissions.edit_) {
3612  }
3613  if (permissions.view_) {
3615  }
3616  object.setPrivileges(privs);
3617  batch_objects.push_back(object);
3618  }
3619  if (do_share) {
3620  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
3621  } else {
3622  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
3623  }
3624 }
3625 
3626 void DBHandler::share_dashboards(const TSessionId& session,
3627  const std::vector<int32_t>& dashboard_ids,
3628  const std::vector<std::string>& groups,
3629  const TDashboardPermissions& permissions) {
3630  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
3631 }
3632 
3633 // NOOP: Grants not available for objects as of now
3634 void DBHandler::share_dashboard(const TSessionId& session,
3635  const int32_t dashboard_id,
3636  const std::vector<std::string>& groups,
3637  const std::vector<std::string>& objects,
3638  const TDashboardPermissions& permissions,
3639  const bool grant_role = false) {
3640  share_dashboards(session, {dashboard_id}, groups, permissions);
3641 }
3642 
3643 void DBHandler::unshare_dashboards(const TSessionId& session,
3644  const std::vector<int32_t>& dashboard_ids,
3645  const std::vector<std::string>& groups,
3646  const TDashboardPermissions& permissions) {
3647  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
3648 }
3649 
3650 void DBHandler::unshare_dashboard(const TSessionId& session,
3651  const int32_t dashboard_id,
3652  const std::vector<std::string>& groups,
3653  const std::vector<std::string>& objects,
3654  const TDashboardPermissions& permissions) {
3655  unshare_dashboards(session, {dashboard_id}, groups, permissions);
3656 }
3657 
3659  std::vector<TDashboardGrantees>& dashboard_grantees,
3660  const TSessionId& session,
3661  int32_t dashboard_id) {
3662  auto stdlog = STDLOG(get_session_ptr(session));
3663  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3664  auto session_ptr = stdlog.getConstSessionInfo();
3665  auto const& cat = session_ptr->getCatalog();
3667  auto dash = cat.getMetadataForDashboard(dashboard_id);
3668  if (!dash) {
3669  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3670  " does not exist");
3671  } else if (session_ptr->get_currentUser().userId != dash->userId &&
3672  !session_ptr->get_currentUser().isSuper) {
3674  "User should be either owner of dashboard or super user to access grantees");
3675  }
3676  std::vector<ObjectRoleDescriptor*> objectsList;
3677  objectsList = SysCatalog::instance().getMetadataForObject(
3678  cat.getCurrentDB().dbId,
3679  static_cast<int>(DBObjectType::DashboardDBObjectType),
3680  dashboard_id); // By default objecttypecan be only dashabaords
3681  user_meta.userId = -1;
3682  user_meta.userName = "";
3683  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3684  for (auto object : objectsList) {
3685  if (user_meta.userName == object->roleName) {
3686  // Mask owner
3687  continue;
3688  }
3689  TDashboardGrantees grantee;
3690  TDashboardPermissions perm;
3691  grantee.name = object->roleName;
3692  grantee.is_user = object->roleType;
3693  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
3694  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
3695  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
3696  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
3697  grantee.permissions = perm;
3698  dashboard_grantees.push_back(grantee);
3699  }
3700 }
3701 
3702 void DBHandler::create_link(std::string& _return,
3703  const TSessionId& session,
3704  const std::string& view_state,
3705  const std::string& view_metadata) {
3706  auto stdlog = STDLOG(get_session_ptr(session));
3707  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3708  auto session_ptr = stdlog.getConstSessionInfo();
3709  // check_read_only("create_link");
3710  auto& cat = session_ptr->getCatalog();
3711 
3712  LinkDescriptor ld;
3713  ld.userId = session_ptr->get_currentUser().userId;
3714  ld.viewState = view_state;
3715  ld.viewMetadata = view_metadata;
3716 
3717  try {
3718  _return = cat.createLink(ld, 6);
3719  } catch (const std::exception& e) {
3720  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3721  }
3722 }
3723 
3725  const std::string& name,
3726  const bool is_array) {
3727  TColumnType ct;
3728  ct.col_name = name;
3729  ct.col_type.type = type;
3730  ct.col_type.is_array = is_array;
3731  return ct;
3732 }
3733 
3734 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
3735  const import_export::CopyParams& copy_params) {
3736  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
3737  if (std::find(shp_ext.begin(),
3738  shp_ext.end(),
3739  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
3740  shp_ext.end()) {
3741  for (auto ext : shp_ext) {
3742  auto aux_file = file_path;
3744  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
3745  copy_params) &&
3747  aux_file.replace_extension(ext).string(), copy_params)) {
3748  throw std::runtime_error("required file for shapefile does not exist: " +
3749  aux_file.filename().string());
3750  }
3751  }
3752  }
3753 }
3754 
3755 void DBHandler::create_table(const TSessionId& session,
3756  const std::string& table_name,
3757  const TRowDescriptor& rd,
3758  const TFileType::type file_type,
3759  const TCreateParams& create_params) {
3760  auto stdlog = STDLOG("table_name", table_name);
3761  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3762  check_read_only("create_table");
3763 
3764  if (ImportHelpers::is_reserved_name(table_name)) {
3765  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
3766  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
3767  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
3768  }
3769 
3770  auto rds = rd;
3771 
3772  // no longer need to manually add the poly column for a TFileType::POLYGON table
3773  // a column of the correct geo type has already been added
3774  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
3775 
3776  std::string stmt{"CREATE TABLE " + table_name};
3777  std::vector<std::string> col_stmts;
3778 
3779  for (auto col : rds) {
3780  if (ImportHelpers::is_reserved_name(col.col_name)) {
3781  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
3782  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
3783  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
3784  }
3785  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
3786  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
3787  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
3788  " for column: " + col.col_name);
3789  }
3790 
3791  if (col.col_type.type == TDatumType::DECIMAL) {
3792  // if no precision or scale passed in set to default 14,7
3793  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
3794  col.col_type.precision = 14;
3795  col.col_type.scale = 7;
3796  }
3797  }
3798 
3799  std::string col_stmt;
3800  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
3801 
3802  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
3803  // `nullable` argument, leading this to default to false. Uncomment for v2.
3804  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
3805 
3806  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
3807  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
3808  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
3809  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
3810  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
3811  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
3812  }
3813  } else if (col.col_type.type == TDatumType::STR) {
3814  // non DICT encoded strings
3815  col_stmt.append(" ENCODING NONE");
3816  } else if (col.col_type.type == TDatumType::POINT ||
3817  col.col_type.type == TDatumType::LINESTRING ||
3818  col.col_type.type == TDatumType::POLYGON ||
3819  col.col_type.type == TDatumType::MULTIPOLYGON) {
3820  // non encoded compressable geo
3821  if (col.col_type.scale == 4326) {
3822  col_stmt.append(" ENCODING NONE");
3823  }
3824  }
3825  col_stmts.push_back(col_stmt);
3826  }
3827 
3828  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
3829 
3830  if (create_params.is_replicated) {
3831  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
3832  }
3833 
3834  stmt.append(";");
3835 
3836  TQueryResult ret;
3837  sql_execute(ret, session, stmt, true, "", -1, -1);
3838 }
3839 
3840 void DBHandler::import_table(const TSessionId& session,
3841  const std::string& table_name,
3842  const std::string& file_name_in,
3843  const TCopyParams& cp) {
3844  try {
3845  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3846  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3847  auto session_ptr = stdlog.getConstSessionInfo();
3848  check_read_only("import_table");
3849  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
3850 
3851  auto& cat = session_ptr->getCatalog();
3852  const auto td_with_lock =
3854  cat, table_name);
3855  const auto td = td_with_lock();
3856  CHECK(td);
3857  check_table_load_privileges(*session_ptr, table_name);
3858 
3859  std::string file_name{file_name_in};
3860  auto file_path = boost::filesystem::path(file_name);
3862  if (!boost::istarts_with(file_name, "s3://")) {
3863  if (!boost::filesystem::path(file_name).is_absolute()) {
3864  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3865  boost::filesystem::path(file_name).filename();
3866  file_name = file_path.string();
3867  }
3868  if (!boost::filesystem::exists(file_path)) {
3869  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3870  }
3871  }
3873 
3874  // TODO(andrew): add delimiter detection to Importer
3875  if (copy_params.delimiter == '\0') {
3876  copy_params.delimiter = ',';
3877  if (boost::filesystem::extension(file_path) == ".tsv") {
3878  copy_params.delimiter = '\t';
3879  }
3880  }
3881 
3882  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3883  session_ptr->getCatalog(), table_name);
3884  std::unique_ptr<import_export::Importer> importer;
3885  if (leaf_aggregator_.leafCount() > 0) {
3886  importer.reset(new import_export::Importer(
3887  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
3888  file_path.string(),
3889  copy_params));
3890  } else {
3891  importer.reset(
3892  new import_export::Importer(cat, td, file_path.string(), copy_params));
3893  }
3894  auto ms = measure<>::execution([&]() { importer->import(); });
3895  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
3896  } catch (const std::exception& e) {
3897  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
3898  }
3899 }
3900 
3901 namespace {
3902 
3903 // helper functions for error checking below
3904 // these would usefully be added as methods of TDatumType
3905 // but that's not possible as it's auto-generated by Thrift
3906 
3908  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
3909  t == TDatumType::LINESTRING || t == TDatumType::POINT);
3910 }
3911 
3912 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3913 
3914 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
3915  std::stringstream ss;
3916  ss << t;
3917  return ss.str();
3918 }
3919 
3920 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
3921  std::string result;
3922  switch (p) {
3923  case SQLTypes::kGEOGRAPHY:
3924  result = "GEOGRAPHY";
3925  break;
3926  case SQLTypes::kGEOMETRY:
3927  result = "GEOMETRY";
3928  break;
3929  default:
3930  result = "INVALID";
3931  break;
3932  }
3933  return result;
3934 }
3935 
3936 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
3937  std::stringstream ss;
3938  ss << t;
3939  return ss.str();
3940 }
3941 
3942 #endif
3943 
3944 } // namespace
3945 
// Throws a Thrift exception describing a per-column attribute mismatch found
// while appending a geo file to an existing table. This macro is only usable
// inside the per-layer loop of import_geo_table(). It reads the call-site
// locals `file_path`, `this_table_name` (the actual target table for the
// layer) and `cd` (the column descriptor being checked). The do/while(0)
// wrapper makes the call site's trailing ';' complete a single statement.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected)              \
  do {                                                                        \
    THROW_MAPD_EXCEPTION("Could not append geo file '" +                      \
                         file_path.filename().string() + "' to table '" +     \
                         this_table_name + "'. Column '" + cd->columnName +   \
                         "' " + attr + " mismatch (got '" + got +             \
                         "', expected '" + expected + "')");                  \
  } while (0)
3951 
3952 void DBHandler::import_geo_table(const TSessionId& session,
3953  const std::string& table_name,
3954  const std::string& file_name_in,
3955  const TCopyParams& cp,
3956  const TRowDescriptor& row_desc,
3957  const TCreateParams& create_params) {
3958  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3959  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3960  auto session_ptr = stdlog.getConstSessionInfo();
3961  check_read_only("import_table");
3962  auto& cat = session_ptr->getCatalog();
3963 
3965 
3966  std::string file_name{file_name_in};
3967 
3968  if (path_is_relative(file_name)) {
3969  // assume relative paths are relative to data_path / mapd_import / <session>
3970  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3971  boost::filesystem::path(file_name).filename();
3972  file_name = file_path.string();
3973  }
3975 
3976  if (is_a_supported_geo_file(file_name, true)) {
3977  // prepare to load geo file directly
3978  add_vsi_network_prefix(file_name);
3979  add_vsi_geo_prefix(file_name);
3980  } else if (is_a_supported_archive_file(file_name)) {
3981  // find the archive file
3982  add_vsi_network_prefix(file_name);
3983  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
3984  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3985  }
3986  // find geo file in archive
3987  add_vsi_archive_prefix(file_name);
3988  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3989  // prepare to load that geo file
3990  if (geo_file.size()) {
3991  file_name = file_name + std::string("/") + geo_file;
3992  }
3993  } else {
3994  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
3995  file_name_in);
3996  }
3997 
3998  // log what we're about to try to do
3999  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
4000  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
4001 
4002  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
4003  auto file_path = boost::filesystem::path(file_name);
4004  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4005  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
4006  }
4007 
4008  // use GDAL to check any dependent files exist (ditto)
4009  try {
4010  check_geospatial_files(file_path, copy_params);
4011  } catch (const std::exception& e) {
4012  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4013  }
4014 
4015  // get layer info and deconstruct
4016  // in general, we will get a combination of layers of these four types:
4017  // EMPTY: no rows, report and skip
4018  // GEO: create a geo table from this
4019  // NON_GEO: create a regular table from this
4020  // UNSUPPORTED_GEO: report and skip
4021  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
4022  try {
4023  layer_info = import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4024  } catch (const std::exception& e) {
4025  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4026  }
4027 
4028  // categorize the results
4029  using LayerNameToContentsMap =
4030  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
4031  LayerNameToContentsMap load_layers;
4032  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
4033  for (const auto& layer : layer_info) {
4034  switch (layer.contents) {
4036  LOG(INFO) << "import_geo_table: '" << layer.name
4037  << "' (will import as geo table)";
4038  load_layers[layer.name] = layer.contents;
4039  break;
4041  LOG(INFO) << "import_geo_table: '" << layer.name
4042  << "' (will import as regular table)";
4043  load_layers[layer.name] = layer.contents;
4044  break;
4046  LOG(WARNING) << "import_geo_table: '" << layer.name
4047  << "' (will not import, unsupported geo type)";
4048  break;
4050  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
4051  break;
4052  default:
4053  break;
4054  }
4055  }
4056 
4057  // if nothing is loadable, stop now
4058  if (load_layers.size() == 0) {
4059  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
4060  }
4061 
4062  // if we've been given an explicit layer name, check that it exists and is loadable
4063  // scan the original list, as it may exist but not have been gathered as loadable
4064  if (copy_params.geo_layer_name.size()) {
4065  bool found = false;
4066  for (const auto& layer : layer_info) {
4067  if (copy_params.geo_layer_name == layer.name) {
4070  // forget all the other layers and just load this one
4071  load_layers.clear();
4072  load_layers[layer.name] = layer.contents;
4073  found = true;
4074  break;
4075  } else if (layer.contents ==
4077  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4078  copy_params.geo_layer_name +
4079  "' has unsupported geo type!");
4080  } else if (layer.contents ==
4082  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4083  copy_params.geo_layer_name + "' is empty!");
4084  }
4085  }
4086  }
4087  if (!found) {
4088  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4089  copy_params.geo_layer_name + "' not found!");
4090  }
4091  }
4092 
4093  // Immerse import of multiple layers is not yet supported
4094  // @TODO fix this!
4095  if (row_desc.size() > 0 && load_layers.size() > 1) {
4097  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
4098  }
4099 
4100  // one definition of layer table name construction
4101  // we append the layer name if we're loading more than one table
4102  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
4103  const std::string& layer_name) {
4104  if (load_layers.size() > 1) {
4105  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
4106  if (sanitized_layer_name != layer_name) {
4107  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
4108  << sanitized_layer_name << "' for table name";
4109  }
4110  return table_name + "_" + sanitized_layer_name;
4111  }
4112  return table_name;
4113  };
4114 
4115  // if we're importing multiple tables, then NONE of them must exist already
4116  if (load_layers.size() > 1) {
4117  for (const auto& layer : load_layers) {
4118  // construct table name
4119  auto this_table_name = construct_layer_table_name(table_name, layer.first);
4120 
4121  // table must not exist
4122  if (cat.getMetadataForTable(this_table_name)) {
4123  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
4124  "' already exists, aborting!");
4125  }
4126  }
4127  }
4128 
4129  // prepare to gather errors that would otherwise be exceptions, as we can only throw
4130  // one
4131  std::vector<std::string> caught_exception_messages;
4132 
4133  // prepare to time multi-layer import
4134  double total_import_ms = 0.0;
4135 
4136  // now we're safe to start importing
4137  // we loop over the layers we're going to attempt to load
4138  for (const auto& layer : load_layers) {
4139  // unpack
4140  const auto& layer_name = layer.first;
4141  const auto& layer_contents = layer.second;
4142  bool is_geo_layer =
4144 
4145  // construct table name again
4146  auto this_table_name = construct_layer_table_name(table_name, layer_name);
4147 
4148  // report
4149  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
4150 
4151  // we need a row descriptor
4152  TRowDescriptor rd;
4153  if (row_desc.size() > 0) {
4154  // we have a valid RowDescriptor
4155  // this is the case where Immerse has already detected and created
4156  // all we need to do is import and trust that the data will match
4157  // use the provided row descriptor
4158  // table must already exist (we check this below)
4159  rd = row_desc;
4160  } else {
4161  // we don't have a RowDescriptor
4162  // we have to detect the file ourselves
4163  TDetectResult cds;
4164  TCopyParams cp_copy = cp; // retain S3 auth tokens
4165  cp_copy.geo_layer_name = layer_name;
4166  cp_copy.file_type = TFileType::POLYGON;
4167  try {
4168  detect_column_types(cds, session, file_name_in, cp_copy);
4169  } catch (const std::exception& e) {
4170  // capture the error and abort this layer
4171  caught_exception_messages.emplace_back(
4172  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
4173  continue;
4174  }
4175  rd = cds.row_set.row_desc;
4176 
4177  // then, if the table does NOT already exist, create it
4178  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4179  if (!td) {
4180  try {
4181  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
4182  } catch (const std::exception& e) {
4183  // capture the error and abort this layer
4184  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
4185  layer_name + "':" + e.what());
4186  continue;
4187  }
4188  }
4189  }
4190 
4191  // by this point, the table should exist, one way or another
4192  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4193  if (!td) {
4194  // capture the error and abort this layer
4195  std::string exception_message =
4196  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
4197  this_table_name + "'; table does not exist or failed to create.";
4198  caught_exception_messages.emplace_back(exception_message);
4199  continue;
4200  }
4201 
4202  // then, we have to verify that the structure matches
4203  // get column descriptors (non-system, non-deleted, logical columns only)
4204  const auto col_descriptors =
4205  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4206 
4207  // first, compare the column count
4208  if (col_descriptors.size() != rd.size()) {
4209  // capture the error and abort this layer
4210  std::string exception_message =
4211  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
4212  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
4213  ", expecting " + std::to_string(col_descriptors.size()) + ")";
4214  caught_exception_messages.emplace_back(exception_message);
4215  continue;
4216  }
4217 
4218  try {
4219  // then the names and types
4220  int rd_index = 0;
4221  for (auto cd : col_descriptors) {
4222  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
4223  std::string gname = rd[rd_index].col_name; // got
4224  std::string ename = cd->columnName; // expecting
4225 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4226  TTypeInfo gti = rd[rd_index].col_type; // got
4227 #endif
4228  TTypeInfo eti = cd_col_type.col_type; // expecting
4229  // check for name match
4230  if (gname != ename) {
4231  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
4232  gname == OMNISCI_GEO_PREFIX) {
4233  // rename incoming geo column to match existing legacy default geo column
4234  rd[rd_index].col_name = gname;
4235  LOG(INFO)
4236  << "import_geo_table: Renaming incoming geo column to match existing "
4237  "legacy default geo column";
4238  } else {
4239  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
4240  }
4241  }
4242 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4243  // check for type attributes match
4244  // these attrs must always match regardless of type
4245  if (gti.type != eti.type) {
4247  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
4248  }
4249  if (gti.is_array != eti.is_array) {
4251  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
4252  }
4253  if (gti.nullable != eti.nullable) {
4255  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
4256  }
4257  if (TTypeInfo_IsGeo(eti.type)) {
4258  // for geo, only these other attrs must also match
4259  // encoding and comp_param are allowed to differ
4260  // this allows appending to existing geo table
4261  // without needing to know the existing encoding
4262  if (gti.precision != eti.precision) {
4264  "geo sub-type",
4265  TTypeInfo_GeoSubTypeToString(gti.precision),
4266  TTypeInfo_GeoSubTypeToString(eti.precision));
4267  }
4268  if (gti.scale != eti.scale) {
4270  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
4271  }
4272  if (gti.encoding != eti.encoding) {
4273  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
4274  }
4275  if (gti.comp_param != eti.comp_param) {
4276  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
4277  }
4278  } else {
4279  // non-geo, all other attrs must also match
4280  // @TODO consider relaxing some of these dependent on type
4281  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
4282  if (gti.precision != eti.precision) {
4284  std::to_string(gti.precision),
4285  std::to_string(eti.precision));
4286  }
4287  if (gti.scale != eti.scale) {
4289  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
4290  }
4291  if (gti.encoding != eti.encoding) {
4293  "encoding",
4294  TTypeInfo_EncodingToString(gti.encoding),
4295  TTypeInfo_EncodingToString(eti.encoding));
4296  }
4297  if (gti.comp_param != eti.comp_param) {
4299  std::to_string(gti.comp_param),
4300  std::to_string(eti.comp_param));
4301  }
4302  }
4303 #endif
4304  rd_index++;
4305  }
4306  } catch (const std::exception& e) {
4307  // capture the error and abort this layer
4308  caught_exception_messages.emplace_back(e.what());
4309  continue;
4310  }
4311 
4312  std::map<std::string, std::string> colname_to_src;
4313  for (auto r : rd) {
4314  colname_to_src[r.col_name] =
4315  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
4316  }
4317 
4318  try {
4319  check_table_load_privileges(*session_ptr, this_table_name);
4320  } catch (const std::exception& e) {
4321  // capture the error and abort this layer
4322  caught_exception_messages.emplace_back(e.what());
4323  continue;
4324  }
4325 
4326  if (is_geo_layer) {
4327  // Final check to ensure that we actually have a geo column
4328  // of the expected name and type before doing the actual import,
4329  // in case the user naively overrode the name or type in Immerse
4330  // Preview (which as of 6/8/18 it still allows you to do).
4331  // This avoids a fatal assert later when it fails to find the
4332  // column. We should make Immerse more robust and disallow this.
4333  bool have_geo_column_with_correct_name = false;
4334  for (const auto& r : rd) {
4335  if (TTypeInfo_IsGeo(r.col_type.type)) {
4336  // TODO(team): allow user to override the geo column name
4337  if (r.col_name == OMNISCI_GEO_PREFIX) {
4338  have_geo_column_with_correct_name = true;
4339  } else if (r.col_name == LEGACY_GEO_PREFIX) {
4340  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
4341  // Normalize column names for geo append with legacy column naming scheme
4342  colname_to_src[r.col_name] = r.col_name;
4343  have_geo_column_with_correct_name = true;
4344  }
4345  }
4346  }
4347  if (!have_geo_column_with_correct_name) {
4348  std::string exception_message = "Table " + this_table_name +
4349  " does not have a geo column with name '" +
4350  OMNISCI_GEO_PREFIX + "'. Import aborted!";
4351  caught_exception_messages.emplace_back(exception_message);
4352  continue;
4353  }
4354  }
4355 
4356  try {
4357  // import this layer only?
4358  copy_params.geo_layer_name = layer_name;
4359 
4360  // create an importer
4361  std::unique_ptr<import_export::Importer> importer;
4362  if (leaf_aggregator_.leafCount() > 0) {
4363  importer.reset(new import_export::Importer(
4364  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4365  file_path.string(),
4366  copy_params));
4367  } else {
4368  importer.reset(
4369  new import_export::Importer(cat, td, file_path.string(), copy_params));
4370  }
4371 
4372  // import
4373  auto ms = measure<>::execution([&]() { importer->importGDAL(colname_to_src); });
4374  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
4375  << "s";
4376  total_import_ms += ms;
4377  } catch (const std::exception& e) {
4378  std::string exception_message =
4379  "Import of Layer '" + this_table_name + "' failed: " + e.what();
4380  caught_exception_messages.emplace_back(exception_message);
4381  continue;
4382  }
4383  }
4384 
4385  // did we catch any exceptions?
4386  if (caught_exception_messages.size()) {
4387  // combine all the strings into one and throw a single Thrift exception
4388  std::string combined_exception_message = "Failed to import geo file:\n";
4389  for (const auto& message : caught_exception_messages) {
4390  combined_exception_message += message + "\n";
4391  }
4392  THROW_MAPD_EXCEPTION(combined_exception_message);
4393  } else {
4394  // report success and total time
4395  LOG(INFO) << "Import Successful!";
4396  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
4397  }
4398 }
4399 
4400 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
4401 
4402 void DBHandler::import_table_status(TImportStatus& _return,
4403  const TSessionId& session,
4404  const std::string& import_id) {
4405  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
4406  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4407  auto is = import_export::Importer::get_import_status(import_id);
4408  _return.elapsed = is.elapsed.count();
4409  _return.rows_completed = is.rows_completed;
4410  _return.rows_estimated = is.rows_estimated;
4411  _return.rows_rejected = is.rows_rejected;
4412 }
4413 
4415  const TSessionId& session,
4416  const std::string& archive_path_in,
4417  const TCopyParams& copy_params) {
4418  auto stdlog =
4419  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
4420  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4421 
4422  std::string archive_path(archive_path_in);
4423 
4424  if (path_is_relative(archive_path)) {
4425  // assume relative paths are relative to data_path / mapd_import / <session>
4426  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4427  boost::filesystem::path(archive_path).filename();
4428  archive_path = file_path.string();
4429  }
4430  validate_import_file_path_if_local(archive_path);
4431 
4432  if (is_a_supported_archive_file(archive_path)) {
4433  // find the archive file
4434  add_vsi_network_prefix(archive_path);
4435  if (!import_export::Importer::gdalFileExists(archive_path,
4436  thrift_to_copyparams(copy_params))) {
4437  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4438  }
4439  // find geo file in archive
4440  add_vsi_archive_prefix(archive_path);
4441  std::string geo_file =
4442  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
4443  // what did we get?
4444  if (geo_file.size()) {
4445  // prepend it with the original path
4446  _return = archive_path_in + std::string("/") + geo_file;
4447  } else {
4448  // just return the original path
4449  _return = archive_path_in;
4450  }
4451  } else {
4452  // just return the original path
4453  _return = archive_path_in;
4454  }
4455 }
4456 
4457 void DBHandler::get_all_files_in_archive(std::vector<std::string>& _return,
4458  const TSessionId& session,
4459  const std::string& archive_path_in,
4460  const TCopyParams& copy_params) {
4461  auto stdlog =
4462  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
4463  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4464 
4465  std::string archive_path(archive_path_in);
4466  if (path_is_relative(archive_path)) {
4467  // assume relative paths are relative to data_path / mapd_import / <session>
4468  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4469  boost::filesystem::path(archive_path).filename();
4470  archive_path = file_path.string();
4471  }
4472  validate_import_file_path_if_local(archive_path);
4473 
4474  if (is_a_supported_archive_file(archive_path)) {
4475  // find the archive file
4476  add_vsi_network_prefix(archive_path);
4477  if (!import_export::Importer::gdalFileExists(archive_path,
4478  thrift_to_copyparams(copy_params))) {
4479  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4480  }
4481  // find all files in archive
4482  add_vsi_archive_prefix(archive_path);
4484  archive_path, thrift_to_copyparams(copy_params));
4485  // prepend them all with original path
4486  for (auto& s : _return) {
4487  s = archive_path_in + '/' + s;
4488  }
4489  }
4490 }
4491 
4492 void DBHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
4493  const TSessionId& session,
4494  const std::string& file_name_in,
4495  const TCopyParams& cp) {
4496  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
4497  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4498 
4499  std::string file_name(file_name_in);
4500 
4502 
4503  // handle relative paths
4504  if (path_is_relative(file_name)) {
4505  // assume relative paths are relative to data_path / mapd_import / <session>
4506  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4507  boost::filesystem::path(file_name).filename();
4508  file_name = file_path.string();
4509  }
4511 
4512  // validate file_name
4513  if (is_a_supported_geo_file(file_name, true)) {
4514  // prepare to load geo file directly
4515  add_vsi_network_prefix(file_name);
4516  add_vsi_geo_prefix(file_name);
4517  } else if (is_a_supported_archive_file(file_name)) {
4518  // find the archive file
4519  add_vsi_network_prefix(file_name);
4520  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4521  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4522  }
4523  // find geo file in archive
4524  add_vsi_archive_prefix(file_name);
4525  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4526  // prepare to load that geo file
4527  if (geo_file.size()) {
4528  file_name = file_name + std::string("/") + geo_file;
4529  }
4530  } else {
4531  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4532  file_name_in);
4533  }
4534 
4535  // check the file actually exists
4536  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4537  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
4538  }
4539 
4540  // find all layers
4541  auto internal_layer_info =
4542  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4543 
4544  // convert to Thrift type
4545  for (const auto& internal_layer : internal_layer_info) {
4546  TGeoFileLayerInfo layer;
4547  layer.name = internal_layer.name;
4548  switch (internal_layer.contents) {
4550  layer.contents = TGeoFileLayerContents::EMPTY;
4551  break;
4553  layer.contents = TGeoFileLayerContents::GEO;
4554  break;
4556  layer.contents = TGeoFileLayerContents::NON_GEO;
4557  break;
4559  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
4560  break;
4561  default:
4562  CHECK(false);
4563  }
4564  _return.emplace_back(layer); // no suitable constructor to just pass parameters
4565  }
4566 }
4567 
4568 void DBHandler::start_heap_profile(const TSessionId& session) {
4569  auto stdlog = STDLOG(get_session_ptr(session));
4570 #ifdef HAVE_PROFILER
4571  if (IsHeapProfilerRunning()) {
4572  THROW_MAPD_EXCEPTION("Profiler already started");
4573  }
4574  HeapProfilerStart("omnisci");
4575 #else
4576  THROW_MAPD_EXCEPTION("Profiler not enabled");
4577 #endif // HAVE_PROFILER
4578 }
4579 
4580 void DBHandler::stop_heap_profile(const TSessionId& session) {
4581  auto stdlog = STDLOG(get_session_ptr(session));
4582 #ifdef HAVE_PROFILER
4583  if (!IsHeapProfilerRunning()) {
4584  THROW_MAPD_EXCEPTION("Profiler not running");
4585  }
4586  HeapProfilerStop();
4587 #else
4588  THROW_MAPD_EXCEPTION("Profiler not enabled");
4589 #endif // HAVE_PROFILER
4590 }
4591 
4592 void DBHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
4593  auto stdlog = STDLOG(get_session_ptr(session));
4594 #ifdef HAVE_PROFILER
4595  if (!IsHeapProfilerRunning()) {
4596  THROW_MAPD_EXCEPTION("Profiler not running");
4597  }
4598  auto profile_buff = GetHeapProfile();
4599  profile = profile_buff;
4600  free(profile_buff);
4601 #else
4602  THROW_MAPD_EXCEPTION("Profiler not enabled");
4603 #endif // HAVE_PROFILER
4604 }
4605 
4606 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
4607 void DBHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
4608  if (session_it->second.use_count() > 2 ||
4609  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
4610  // SessionInfo is being used in more than one active operation. Original copy + one
4611  // stored in StdLog. Skip the checks.
4612  return;
4613  }
4614  time_t last_used_time = session_it->second->get_last_used_time();
4615  time_t start_time = session_it->second->get_start_time();
4616  const auto current_session_duration = time(0) - last_used_time;
4617  if (current_session_duration > idle_session_duration_) {
4618  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
4619  << " idle duration " << current_session_duration
4620  << " seconds exceeds maximum idle duration " << idle_session_duration_
4621  << " seconds. Invalidating session.";
4622  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
4623  }
4624  const auto total_session_duration = time(0) - start_time;
4625  if (total_session_duration > max_session_duration_) {
4626  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
4627  << " total duration " << total_session_duration
4628  << " seconds exceeds maximum total session duration "
4629  << max_session_duration_ << " seconds. Invalidating session.";
4630  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
4631  }
4632 }
4633 
4634 std::shared_ptr<const Catalog_Namespace::SessionInfo> DBHandler::get_const_session_ptr(
4635  const TSessionId& session) {
4636  return get_session_ptr(session);
4637 }
4638 
4640  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4641  return *get_session_it_unsafe(session, read_lock)->second;
4642 }
4643 
4644 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_copy_ptr(
4645  const TSessionId& session) {
4646  // Note(Wamsi): We have `get_const_session_ptr` which would return as const
4647  // SessionInfo stored in the map. You can use `get_const_session_ptr` instead of the
4648  // copy of SessionInfo but beware that it can be changed in teh map. So if you do not
4649  // care about the changes then use `get_const_session_ptr` if you do then use this
4650  // function to get a copy. We should eventually aim to merge both
4651  // `get_const_session_ptr` and `get_session_copy_ptr`.
4652  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4653  auto& session_info_ref = *get_session_it_unsafe(session, read_lock)->second;
4654  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
4655 }
4656 
4657 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_ptr(
4658  const TSessionId& session_id) {
4659  // Note(Wamsi): This method will give you a shared_ptr to master SessionInfo itself.
4660  // Should be used only when you need to make updates to original SessionInfo object.
4661  // Currently used by `update_session_last_used_duration`
4662 
4663  // 1) `session_id` will be empty during intial connect. 2)`sessionmapd iterator` will
4664  // be invalid during disconnect. SessionInfo will be erased from map by the time it
4665  // reaches here. In both the above cases, we would return `nullptr` and can skip
4666  // SessionInfo updates.
4667  if (session_id.empty()) {
4668  return {};
4669  }
4670  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4671  return get_session_it_unsafe(session_id, read_lock)->second;
4672 }
4673 
4675  const Catalog_Namespace::SessionInfo& session_info,
4676  const std::string& table_name) {
4677  auto user_metadata = session_info.get_currentUser();
4678  auto& cat = session_info.getCatalog();
4679  DBObject dbObject(table_name, TableDBObjectType);
4680  dbObject.loadKey(cat);
4682  std::vector<DBObject> privObjects;
4683  privObjects.push_back(dbObject);
4684  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
4685  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
4686  user_metadata.userName + " has no insert privileges for table " +
4687  table_name + ".");
4688  }
4689 }
4690 
4691 void DBHandler::check_table_load_privileges(const TSessionId& session,
4692  const std::string& table_name) {
4693  const auto session_info = get_session_copy(session);
4694  check_table_load_privileges(session_info, table_name);
4695 }
4696 
4698  const TExecuteMode::type mode) {
4699  const std::string& user_name = session_ptr->get_currentUser().userName;
4700  switch (mode) {
4701  case TExecuteMode::GPU:
4702  if (cpu_mode_only_) {
4703  TOmniSciException e;
4704  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
4705  throw e;
4706  }
4708  LOG(INFO) << "User " << user_name << " sets GPU mode.";
4709  break;
4710  case TExecuteMode::CPU:
4712  LOG(INFO) << "User " << user_name << " sets CPU mode.";
4713  break;
4714  }
4715 }
4716 
4717 std::vector<PushedDownFilterInfo> DBHandler::execute_rel_alg(
4718  TQueryResult& _return,
4719  QueryStateProxy query_state_proxy,
4720  const std::string& query_ra,
4721  const bool column_format,
4722  const ExecutorDeviceType executor_device_type,
4723  const int32_t first_n,
4724  const int32_t at_most_n,
4725  const bool just_validate,
4726  const bool find_push_down_candidates,
4727  const ExplainInfo& explain_info,
4728  const std::optional<size_t> executor_index) const {
4729  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4730 
4731  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
4732  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
4733 
4734  const auto& cat = query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog();
4735  auto executor = Executor::getExecutor(
4736  executor_index ? *executor_index : Executor::UNITARY_EXECUTOR_ID,
4737  jit_debug_ ? "/tmp" : "",
4738  jit_debug_ ? "mapdquery" : "",
4740  RelAlgExecutor ra_executor(executor.get(),
4741  cat,
4742  query_ra,
4743  query_state_proxy.getQueryState().shared_from_this());
4744  // handle hints
4745  const auto& query_hints = ra_executor.getParsedQueryHints();
4746  CompilationOptions co = {
4747  query_hints.cpu_mode ? ExecutorDeviceType::CPU : executor_device_type,
4748  /*hoist_literals=*/true,
4751  /*allow_lazy_fetch=*/true,
4752  /*filter_on_deleted_column=*/true,
4758  explain_info.justExplain(),
4759  allow_loop_joins_ || just_validate,
4761  jit_debug_,
4762  just_validate,
4765  find_push_down_candidates,
4766  explain_info.justCalciteExplain(),
4770  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4773  nullptr,
4774  nullptr),
4775  {}};
4776  _return.execution_time_ms += measure<>::execution([&]() {
4777  result = ra_executor.executeRelAlgQuery(co, eo, explain_info.explain_plan, nullptr);
4778  });
4779  // reduce execution time by the time spent during queue waiting
4780  _return.execution_time_ms -= result.getRows()->getQueueTime();
4781  VLOG(1) << cat.getDataMgr().getSystemMemoryUsage();
4782  const auto& filter_push_down_info = result.getPushedDownFilterInfo();
4783  if (!filter_push_down_info.empty()) {
4784  return filter_push_down_info;
4785  }
4786  if (explain_info.justExplain()) {
4787  convert_explain(_return, *result.getRows(), column_format);
4788  } else if (!explain_info.justCalciteExplain()) {
4789  convert_rows(_return,
4790  timer.createQueryStateProxy(),
4791  result.getTargetsMeta(),
4792  *result.getRows(),
4793  column_format,
4794  first_n,
4795  at_most_n);
4796  }
4797  return {};
4798 }
4799 
4800 void DBHandler::execute_rel_alg_df(TDataFrame& _return,
4801  const std::string& query_ra,
4802  QueryStateProxy query_state_proxy,
4803  const Catalog_Namespace::SessionInfo& session_info,
4804  const ExecutorDeviceType device_type,
4805  const size_t device_id,
4806  const int32_t first_n,
4807  const TArrowTransport::type transport_method) const {
4808  const auto& cat = session_info.getCatalog();
4809  CHECK(device_type == ExecutorDeviceType::CPU ||
4812  jit_debug_ ? "/tmp" : "",
4813  jit_debug_ ? "mapdquery" : "",
4815  RelAlgExecutor ra_executor(executor.get(),
4816  cat,
4817  query_ra,
4818  query_state_proxy.getQueryState().shared_from_this());
4819  const auto& query_hints = ra_executor.getParsedQueryHints();
4820  CompilationOptions co = {query_hints.cpu_mode ? ExecutorDeviceType::CPU : device_type,
4821  /*hoist_literals=*/true,
4824  /*allow_lazy_fetch=*/true,
4825  /*filter_on_deleted_column=*/true,
4830  false,
4833  jit_debug_,
4834  false,
4837  false,
4838  false,
4842  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4845  nullptr,
4846  nullptr),
4847  {}};
4848  _return.execution_time_ms += measure<>::execution(
4849  [&]() { result = ra_executor.executeRelAlgQuery(co, eo, false, nullptr); });
4850  _return.execution_time_ms -= result.getRows()->getQueueTime();
4851  const auto rs = result.getRows();
4852  const auto converter =
4853  std::make_unique<ArrowResultSetConverter>(rs,
4854  data_mgr_,
4855  device_type,
4856  device_id,
4857  getTargetNames(result.getTargetsMeta()),
4858  first_n,
4859  ArrowTransport(transport_method));
4860  ArrowResult arrow_result;
4861  _return.arrow_conversion_time_ms +=
4862  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
4863  _return.sm_handle =
4864  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
4865  _return.sm_size = arrow_result.sm_size;
4866  _return.df_handle =
4867  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
4868  _return.df_buffer =
4869  std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
4870  if (device_type == ExecutorDeviceType::GPU) {
4871  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
4872  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
4873  ipc_handle_to_dev_ptr_.insert(
4874  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
4875  }
4876  _return.df_size = arrow_result.df_size;
4877 }
4878 
4879 std::vector<TargetMetaInfo> DBHandler::getTargetMetaInfo(
4880  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4881  std::vector<TargetMetaInfo> result;
4882  for (const auto& target : targets) {
4883  CHECK(target);
4884  CHECK(target->get_expr());
4885  result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
4886  }
4887  return result;
4888 }
4889 
4890 std::vector<std::string> DBHandler::getTargetNames(
4891  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4892  std::vector<std::string> names;
4893  for (const auto& target : targets) {
4894  CHECK(target);
4895  CHECK(target->get_expr());
4896  names.push_back(target->get_resname());
4897  }
4898  return names;
4899 }
4900 
4901 std::vector<std::string> DBHandler::getTargetNames(
4902  const std::vector<TargetMetaInfo>& targets) const {
4903  std::vector<std::string> names;
4904  for (const auto& target : targets) {
4905  names.push_back(target.get_resname());
4906  }
4907  return names;
4908 }
4909 
4910 void DBHandler::convert_rows(TQueryResult& _return,
4911  QueryStateProxy query_state_proxy,
4912  const std::vector<TargetMetaInfo>& targets,
4913  const ResultSet& results,
4914  const bool column_format,
4915  const int32_t first_n,
4916  const int32_t at_most_n) const {
4917  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4918  _return.row_set.row_desc = ThriftSerializers::target_meta_infos_to_thrift(targets);
4919  int32_t fetched{0};
4920  if (column_format) {
4921  _return.row_set.is_columnar = true;
4922  std::vector<TColumn> tcolumns(results.colCount());
4923  while (first_n == -1 || fetched < first_n) {
4924  const auto crt_row = results.getNextRow(true, true);
4925  if (crt_row.empty()) {
4926  break;
4927  }
4928  ++fetched;
4929  if (at_most_n >= 0 && fetched > at_most_n) {
4930  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4931  std::to_string(at_most_n));
4932  }
4933  for (size_t i = 0; i < results.colCount(); ++i) {
4934  const auto agg_result = crt_row[i];
4935  value_to_thrift_column(agg_result, targets[i].get_type_info(), tcolumns[i]);
4936  }
4937  }
4938  for (size_t i = 0; i < results.colCount(); ++i) {
4939  _return.row_set.columns.push_back(tcolumns[i]);
4940  }
4941  } else {
4942  _return.row_set.is_columnar = false;
4943  while (first_n == -1 || fetched < first_n) {
4944  const auto crt_row = results.getNextRow(true, true);
4945  if (crt_row.empty()) {
4946  break;
4947  }
4948  ++fetched;
4949  if (at_most_n >= 0 && fetched > at_most_n) {
4950  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4951  std::to_string(at_most_n));
4952  }
4953  TRow trow;
4954  trow.cols.reserve(results.colCount());
4955  for (size_t i = 0; i < results.colCount(); ++i) {
4956  const auto agg_result = crt_row[i];
4957  trow.cols.push_back(value_to_thrift(agg_result, targets[i].get_type_info()));
4958  }
4959  _return.row_set.rows.push_back(trow);
4960  }
4961  }
4962 }
4963 
4964 TRowDescriptor DBHandler::fixup_row_descriptor(const TRowDescriptor& row_desc,
4965  const Catalog& cat) {
4966  TRowDescriptor fixedup_row_desc;
4967  for (const TColumnType& col_desc : row_desc) {
4968  auto fixedup_col_desc = col_desc;
4969  if (col_desc.col_type.encoding == TEncodingType::DICT &&
4970  col_desc.col_type.comp_param > 0) {
4971  const auto dd = cat.getMetadataForDict(col_desc.col_type.comp_param, false);
4972  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
4973  }
4974  fixedup_row_desc.push_back(fixedup_col_desc);
4975  }
4976 
4977  return fixedup_row_desc;
4978 }
4979 
4980 // create simple result set to return a single column result
4981 void DBHandler::create_simple_result(TQueryResult& _return,
4982  const ResultSet& results,
4983  const bool column_format,
4984  const std::string label) const {
4985  CHECK_EQ(size_t(1), results.rowCount());
4986  TColumnType proj_info;
4987  proj_info.col_name = label;
4988  proj_info.col_type.type = TDatumType::STR;
4989  proj_info.col_type.nullable = false;
4990  proj_info.col_type.is_array = false;
4991  _return.row_set.row_desc.push_back(proj_info);
4992  const auto crt_row = results.getNextRow(true, true);
4993  const auto tv = crt_row[0];
4994  CHECK(results.getNextRow(true, true).empty());
4995  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
4996  CHECK(scalar_tv);
4997  const auto s_n = boost::get<NullableString>(scalar_tv);
4998  CHECK(s_n);
4999  const auto s = boost::get<std::string>(s_n);
5000  CHECK(s);
5001  if (column_format) {
5002  TColumn tcol;
5003  tcol.data.str_col.push_back(*s);
5004  tcol.nulls.push_back(false);
5005  _return.row_set.is_columnar = true;
5006  _return.row_set.columns.push_back(tcol);
5007  } else {
5008  TDatum explanation;
5009  explanation.val.str_val = *s;
5010  explanation.is_null = false;
5011  TRow trow;
5012  trow.cols.push_back(explanation);
5013  _return.row_set.is_columnar = false;
5014  _return.row_set.rows.push_back(trow);
5015  }
5016 }
5017 
5018 void DBHandler::convert_explain(TQueryResult& _return,
5019  const ResultSet& results,
5020  const bool column_format) const {
5021  create_simple_result(_return, results, column_format, "Explanation");
5022 }
5023 
5024 void DBHandler::convert_result(TQueryResult& _return,
5025  const ResultSet& results,
5026  const bool column_format) const {
5027  create_simple_result(_return, results, column_format, "Result");
5028 }
5029 
5030 // this all should be moved out of here to catalog
5032  const TableDescriptor* td,
5033  const AccessPrivileges access_priv) {
5034  CHECK(td);
5035  auto& cat = session_info.getCatalog();
5036  std::vector<DBObject> privObjects;
5037  DBObject dbObject(td->tableName, TableDBObjectType);
5038  dbObject.loadKey(cat);
5039  dbObject.setPrivileges(access_priv);
5040  privObjects.push_back(dbObject);
5041  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
5042  privObjects);
5043 };
5044 
5046  const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl);
5047  if (drop_db_stmt) {
5048  invalidate_sessions(*drop_db_stmt->getDatabaseName(), drop_db_stmt);
5049  return;
5050  }
5051  const auto rename_db_stmt = dynamic_cast<Parser::RenameDatabaseStmt*>(ddl);
5052  if (rename_db_stmt) {
5053  invalidate_sessions(*rename_db_stmt->getPreviousDatabaseName(), rename_db_stmt);
5054  return;
5055  }
5056  const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl);
5057  if (drop_user_stmt) {
5058  invalidate_sessions(*drop_user_stmt->getUserName(), drop_user_stmt);
5059  return;
5060  }
5061  const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl);
5062  if (rename_user_stmt) {
5063  invalidate_sessions(*rename_user_stmt->getOldUserName(), rename_user_stmt);
5064  return;
5065  }
5066 }
5067 
5068 void DBHandler::sql_execute_impl(TQueryResult& _return,
5069  QueryStateProxy query_state_proxy,
5070  const bool column_format,
5071  const std::string& nonce,
5072  const ExecutorDeviceType executor_device_type,
5073  const int32_t first_n,
5074  const int32_t at_most_n) {
5075  if (leaf_handler_) {
5076  leaf_handler_->flush_queue();
5077  }
5078 
5079  _return.nonce = nonce;
5080  _return.execution_time_ms = 0;
5081  auto const query_str = strip(query_state_proxy.getQueryState().getQueryStr());
5082  auto session_ptr = query_state_proxy.getQueryState().getConstSessionInfo();
5083  // Call to DistributedValidate() below may change cat.
5084  auto& cat = session_ptr->getCatalog();
5085 
5086  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
5087 
5088  mapd_unique_lock<mapd_shared_mutex> executeWriteLock;
5089  mapd_shared_lock<mapd_shared_mutex> executeReadLock;
5090 
5092  ParserWrapper pw{query_str};
5093  switch (pw.getQueryType()) {
5095  _return.query_type = TQueryType::READ;
5096  VLOG(1) << "query type: READ";
5097  break;
5098  }
5100  _return.query_type = TQueryType::WRITE;
5101  VLOG(1) << "query type: WRITE";
5102  break;
5103  }
5105  _return.query_type = TQueryType::SCHEMA_READ;
5106  VLOG(1) << "query type: SCHEMA READ";
5107  break;
5108  }
5110  _return.query_type = TQueryType::SCHEMA_WRITE;
5111  VLOG(1) << "query type: SCHEMA WRITE";
5112  break;
5113  }
5114  default: {
5115  _return.query_type = TQueryType::UNKNOWN;
5116  LOG(WARNING) << "query type: UNKNOWN";
5117  break;
5118  }
5119  }
5120  if (pw.isCalcitePathPermissable(read_only_)) {
5121  // run DDL before the locks as DDL statements should handle their own locking
5122  if (pw.isCalciteDdl()) {
5123  std::string query_ra;
5124  _return.execution_time_ms += measure<>::execution([&]() {
5125  TPlanResult result;
5126  std::tie(result, locks) =
5127  parse_to_ra(query_state_proxy, query_str, {}, false, system_parameters_);
5128  query_ra = result.plan_result;
5129  });
5130 
5131  executeDdl(_return, query_ra, session_ptr);
5132  return;
5133  }
5134 
5135  executeReadLock = mapd_shared_lock<mapd_shared_mutex>(
5138 
5139  std::string query_ra;
5140  _return.execution_time_ms += measure<>::execution([&]() {
5141  TPlanResult result;
5142  std::tie(result, locks) =
5143  parse_to_ra(query_state_proxy, query_str, {}, true, system_parameters_);
5144  query_ra = result.plan_result;
5145  });
5146 
5147  std::string query_ra_calcite_explain;
5148  if (pw.isCalciteExplain() && (!g_enable_filter_push_down || g_cluster)) {
5149  // return the ra as the result
5150  convert_explain(_return, ResultSet(query_ra), true);
5151  return;
5152  } else if (pw.isCalciteExplain()) {
5153  // removing the "explain calcite " from the beginning of the "query_str":
5154  std::string temp_query_str =
5155  query_str.substr(std::string("explain calcite ").length());
5156 
5157  CHECK(!locks.empty());
5158  query_ra_calcite_explain =
5159  parse_to_ra(query_state_proxy, temp_query_str, {}, false, system_parameters_)
5160  .first.plan_result;
5161  }
5162  const auto explain_info = pw.getExplainInfo();
5163  std::vector<PushedDownFilterInfo> filter_push_down_requests;
5164  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
5165  [this,
5166  &filter_push_down_requests,
5167  &_return,
5168  &query_state_proxy,
5169  &explain_info,
5170  &query_ra_calcite_explain,
5171  &query_ra,
5172  &query_str,
5173  &locks,
5174  column_format,
5175  executor_device_type,
5176  first_n,
5177  at_most_n](const size_t executor_index) {
5178  filter_push_down_requests = execute_rel_alg(
5179  _return,
5180  query_state_proxy,
5181  explain_info.justCalciteExplain() ? query_ra_calcite_explain : query_ra,
5182  column_format,
5183  executor_device_type,
5184  first_n,
5185  at_most_n,
5186  /*just_validate=*/false,
5188  explain_info,
5189  executor_index);
5190  if (explain_info.justCalciteExplain() && filter_push_down_requests.empty()) {
5191  // we only reach here if filter push down was enabled, but no filter
5192  // push down candidate was found
5193  convert_explain(_return, ResultSet(query_ra), true);
5194  } else if (!filter_push_down_requests.empty()) {
5195  CHECK(!locks.empty());
5197  query_state_proxy,
5198  query_ra,
5199  column_format,
5200  executor_device_type,
5201  first_n,
5202  at_most_n,
5203  explain_info.justExplain(),
5204  explain_info.justCalciteExplain(),
5205  filter_push_down_requests);
5206  } else if (explain_info.justCalciteExplain() &&
5207  filter_push_down_requests.empty()) {
5208  // return the ra as the result:
5209  // If we reach here, the 'filter_push_down_request' turned out to be
5210  // empty, i.e., no filter push down so we continue with the initial
5211  // (unchanged) query's calcite explanation.
5212  CHECK(!locks.empty());
5213  query_ra =
5214  parse_to_ra(query_state_proxy, query_str, {}, false, system_parameters_)
5215  .first.plan_result;
5216  convert_explain(_return, ResultSet(query_ra), true);
5217  }
5218  });
5220  dispatch_queue_->submit(execute_rel_alg_task,
5221  pw.getDMLType() == ParserWrapper::DMLType::Update ||
5222  pw.getDMLType() == ParserWrapper::DMLType::Delete);
5223  auto result_future = execute_rel_alg_task->get_future();
5224  result_future.get();
5225  return;
5226  } else if (pw.is_optimize || pw.is_validate) {
5227  // Get the Stmt object
5228  DBHandler::parser_with_error_handler(query_str, parse_trees);
5229 
5230  if (pw.is_optimize) {
5231  const auto optimize_stmt =
5232  dynamic_cast<Parser::OptimizeTableStmt*>(parse_trees.front().get());
5233  CHECK(optimize_stmt);
5234 
5235  _return.execution_time_ms += measure<>::execution([&]() {
5236  const auto td_with_lock =
5238  cat, optimize_stmt->getTableName());
5239  const auto td = td_with_lock();
5240 
5241  if (!td || !user_can_access_table(
5242  *session_ptr, td, AccessPrivileges::DELETE_FROM_TABLE)) {
5243  throw std::runtime_error("Table " + optimize_stmt->getTableName() +
5244  " does not exist.");
5245  }
5246  if (td->isView) {
5247  throw std::runtime_error("OPTIMIZE TABLE command is not supported on views.");
5248  }
5249 
5250  // acquire write lock on table data
5251  auto data_lock =
5253 
5254  auto executor = Executor::getExecutor(
5256  const TableOptimizer optimizer(td, executor.get(), cat);
5257  if (optimize_stmt->shouldVacuumDeletedRows()) {
5258  optimizer.vacuumDeletedRows();
5259  }
5260  optimizer.recomputeMetadata();
5261  });
5262 
5263  return;
5264  }
5265  if (pw.is_validate) {
5266  // check user is superuser
5267  if (!session_ptr->get_currentUser().isSuper) {
5268  throw std::runtime_error("Superuser is required to run VALIDATE");
5269  }
5270  const auto validate_stmt =
5271  dynamic_cast<Parser::ValidateStmt*>(parse_trees.front().get());
5272  CHECK(validate_stmt);
5273 
5274  // Prevent any other query from running while doing validate
5275  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5278 
5279  std::string output{"Result for validate"};
5281  _return.execution_time_ms += measure<>::execution([&]() {
5282  const DistributedValidate validator(validate_stmt->getType(),
5283  validate_stmt->isRepairTypeRemove(),
5284  cat, // tables may be dropped here
5286  *session_ptr,
5287  *this);
5288  output = validator.validate(query_state_proxy);
5289  });
5290  } else {
5291  output = "Not running on a cluster nothing to validate";
5292  }
5293  convert_result(_return, ResultSet(output), true);
5294  return;
5295  }
5296  }
5297  LOG(INFO) << "passing query to legacy processor";
5298 
5299  const auto result = apply_copy_to_shim(query_str);
5300  DBHandler::parser_with_error_handler(result, parse_trees);
5301  auto handle_ddl = [&query_state_proxy, &session_ptr, &_return, &locks, this](
5302  Parser::DDLStmt* ddl) -> bool {
5303  if (!ddl) {
5304  return false;
5305  }
5306  const auto show_create_stmt = dynamic_cast<Parser::ShowCreateTableStmt*>(ddl);
5307  if (show_create_stmt) {
5308  _return.execution_time_ms +=
5309  measure<>::execution([&]() { ddl->execute(*session_ptr); });
5310  const auto create_string = show_create_stmt->getCreateStmt();
5311  convert_result(_return, ResultSet(create_string), true);
5312  return true;
5313  }
5314 
5315  const auto import_stmt = dynamic_cast<Parser::CopyTableStmt*>(ddl);
5316  if (import_stmt) {
5317  if (g_cluster && !leaf_aggregator_.leafCount()) {
5318  // Don't allow copy from imports directly on a leaf node
5319  throw std::runtime_error(
5320  "Cannot import on an individual leaf. Please import from the Aggregator.");
5321  } else if (leaf_aggregator_.leafCount() > 0) {
5322  _return.execution_time_ms += measure<>::execution(
5323  [&]() { execute_distributed_copy_statement(import_stmt, *session_ptr); });
5324  } else {
5325  _return.execution_time_ms +=
5326  measure<>::execution([&]() { ddl->execute(*session_ptr); });
5327  }
5328 
5329  // Read response message
5330  convert_result(_return, ResultSet(*import_stmt->return_message.get()), true);
5331  _return.success = import_stmt->get_success();
5332 
5333  // get geo_copy_from info
5334  if (import_stmt->was_geo_copy_from()) {
5335  GeoCopyFromState geo_copy_from_state;
5336  import_stmt->get_geo_copy_from_payload(
5337  geo_copy_from_state.geo_copy_from_table,
5338  geo_copy_from_state.geo_copy_from_file_name,
5339  geo_copy_from_state.geo_copy_from_copy_params,
5340  geo_copy_from_state.geo_copy_from_partitions);
5341  geo_copy_from_sessions.add(session_ptr->get_session_id(), geo_copy_from_state);
5342  }
5343  return true;
5344  }
5345 
5346  // Check for DDL statements requiring locking and get locks
5347  auto export_stmt = dynamic_cast<Parser::ExportQueryStmt*>(ddl);
5348  if (export_stmt) {
5349  const auto query_string = export_stmt->get_select_stmt();
5350  TPlanResult result;
5351  CHECK(locks.empty());
5352  std::tie(result, locks) =
5353  parse_to_ra(query_state_proxy, query_string, {}, true, system_parameters_);
5354  }
5355  _return.execution_time_ms += measure<>::execution([&]() {
5356  ddl->execute(*session_ptr);
5358  });
5359  return true;
5360  };
5361 
5362  for (const auto& stmt : parse_trees) {
5363  auto select_stmt = dynamic_cast<Parser::SelectStmt*>(stmt.get());
5364  if (!select_stmt) {
5365  check_read_only("Non-SELECT statements");
5366  }
5367  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt.get());
5368  if (!handle_ddl(ddl)) {
5369  auto stmtp = dynamic_cast<Parser::InsertValuesStmt*>(stmt.get());
5370  CHECK(stmtp); // no other statements supported
5371 
5372  if (parse_trees.size() != 1) {
5373  throw std::runtime_error("Can only run one INSERT INTO query at a time.");
5374  }
5375 
5376  _return.execution_time_ms +=
5377  measure<>::execution([&]() { stmtp->execute(*session_ptr); });
5378  }
5379  }
5380 }
5381 
5383  TQueryResult& _return,
5384  QueryStateProxy query_state_proxy,
5385  std::string& query_ra,
5386  const bool column_format,
5387  const ExecutorDeviceType executor_device_type,
5388  const int32_t first_n,
5389  const int32_t at_most_n,
5390  const bool just_explain,
5391  const bool just_calcite_explain,
5392  const std::vector<PushedDownFilterInfo>& filter_push_down_requests) {
// Re-plans the current query with Calcite using filter push-down candidates
// supplied by the caller, then either returns the new plan as an explain result
// or executes it.
//
// _return                    receives the explain output or the query results; the
//                            time spent re-planning is added to execution_time_ms.
// query_state_proxy          supplies the original SQL text (getQueryStr()) that is
//                            re-parsed here.
// query_ra                   in/out: overwritten with the newly derived plan.
// column_format, executor_device_type, first_n, at_most_n
//                            forwarded unchanged to execute_rel_alg.
// just_explain               copied into explain_info.explain for execution.
// just_calcite_explain       when true, return the new plan text and skip execution.
// filter_push_down_requests  candidate filters; only input_prev, input_start and
//                            input_next of each are sent to Calcite.
5393  // collecting the selected filters' info to be sent to Calcite:
5394  std::vector<TFilterPushDownInfo> filter_push_down_info;
5395  for (const auto& req : filter_push_down_requests) {
5396  TFilterPushDownInfo filter_push_down_info_for_request;
5397  filter_push_down_info_for_request.input_prev = req.input_prev;
5398  filter_push_down_info_for_request.input_start = req.input_start;
5399  filter_push_down_info_for_request.input_next = req.input_next;
5400  filter_push_down_info.push_back(filter_push_down_info_for_request);
5401  }
5402  // deriving the new relational algebra plan with respect to the pushed down filters
// acquire_locks is passed as false: this re-parse does not request table locks.
// NOTE(review): parse_to_ra's signature takes `system_parameters` after
// `acquire_locks`, but no such argument appears before `.first` below; the argument
// line seems to be missing from this copy. Restore it before compiling.
5403  _return.execution_time_ms += measure<>::execution([&]() {
5404  query_ra = parse_to_ra(query_state_proxy,
5405  query_state_proxy.getQueryState().getQueryStr(),
5406  filter_push_down_info,
5407  false,
5409  .first.plan_result;
5410  });
5411 
5412  if (just_calcite_explain) {
5413  // return the new ra as the result
5414  convert_explain(_return, ResultSet(query_ra), true);
5415  return;
5416  }
5417 
5418  // execute the new relational algebra plan:
// find_push_down_candidates is false, so this run does not search for further
// push-down candidates (presumably to avoid another re-planning round).
5419  auto explain_info = ExplainInfo::defaults();
5420  explain_info.explain = just_explain;
5421  execute_rel_alg(_return,
5422  query_state_proxy,
5423  query_ra,
5424  column_format,
5425  executor_device_type,
5426  first_n,
5427  at_most_n,
5428  /*just_validate=*/false,
5429  /*find_push_down_candidates=*/false,
5430  explain_info);
5431 }
5432 
5434  Parser::CopyTableStmt* copy_stmt,
5435  const Catalog_Namespace::SessionInfo& session_info) {
// Executes a COPY statement through an importer factory whose Importers load rows
// via a DistributedLoader bound to this handler's leaf_aggregator_. sql_execute_impl
// takes this path when leaf_aggregator_.leafCount() > 0; otherwise it calls the
// statement's plain execute(session) overload.
5436  auto importer_factory = [&session_info, this](
// `catalog` is part of the factory signature but is not used by this factory.
5437  const Catalog& catalog,
5438  const TableDescriptor* td,
5439  const std::string& file_path,
5440  const import_export::CopyParams& copy_params) {
// NOTE(review): the DistributedLoader is created with a raw `new` and handed to the
// Importer constructor; ownership is presumably taken by Importer. Confirm against
// Importer's constructor.
5441  return std::make_unique<import_export::Importer>(
5442  new DistributedLoader(session_info, td, &leaf_aggregator_),
5443  file_path,
5444  copy_params);
5445  };
// The factory captures session_info by reference and `this` by pointer.
// NOTE(review): whether CopyTableStmt::execute keeps the factory beyond this call is
// not visible here. If it does, those captures would dangle.
5446  copy_stmt->execute(session_info, importer_factory);
5447 }
5448 
// Sends query_str to Calcite and returns the resulting plan, optionally together with
// table locks for every table the plan touches.
//
// query_state_proxy      parent query state; a timer named after this function is
//                        created from it, and its session supplies the catalog.
// query_str              SQL text. For SELECT-EXPLAIN statements (pw.isSelectExplain())
//                        ParserWrapper's actual_query is sent instead of the full text;
//                        when legacy_syntax_ is set the SQL is first passed through
//                        pg_shim.
// filter_push_down_info  forwarded to calcite_->process.
// acquire_locks          when true, a schema lock and a data lock are acquired for each
//                        table listed in result.resolved_accessed_objects.
// system_parameters      only enable_calcite_view_optimize is read here.
// check_privileges       forwarded to calcite_->process.
// Returns {plan, locks}. If ParserWrapper says the statement may not take the Calcite
// path, a default-constructed TPlanResult and an empty lock container are returned.
// Exceptions thrown by calcite_->process propagate to the caller.
5449 std::pair<TPlanResult, lockmgr::LockedTableDescriptors> DBHandler::parse_to_ra(
5450  QueryStateProxy query_state_proxy,
5451  const std::string& query_str,
5452  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
5453  const bool acquire_locks,
5454  const SystemParameters system_parameters,
5455  bool check_privileges) {
5456  query_state::Timer timer = query_state_proxy.createTimer(__func__);
5457  ParserWrapper pw{query_str};
5458  const std::string actual_query{pw.isSelectExplain() ? pw.actual_query : query_str};
5459  TPlanResult result;
5460  if (pw.isCalcitePathPermissable()) {
5461  auto cat = query_state_proxy.getQueryState().getConstSessionInfo()->get_catalog_ptr();
5462  auto session_cleanup_handler = [&](const auto& session_id) {
5463  removeInMemoryCalciteSession(session_id);
5464  };
// A temporary in-memory Calcite session is created for this request and removed on
// both the success path and the std::exception path (which then rethrows).
// NOTE(review): an exception not derived from std::exception would skip the cleanup.
// An RAII guard would cover every exit path.
5465  auto process_calcite_request = [&] {
5466  const auto& in_memory_session_id = createInMemoryCalciteSession(cat);
5467  try {
5468  result = calcite_->process(timer.createQueryStateProxy(),
5469  legacy_syntax_ ? pg_shim(actual_query) : actual_query,
5470  filter_push_down_info,
5472  pw.isCalciteExplain(),
5473  system_parameters.enable_calcite_view_optimize,
5474  check_privileges,
5475  in_memory_session_id);
5476  session_cleanup_handler(in_memory_session_id);
5477  } catch (std::exception&) {
5478  session_cleanup_handler(in_memory_session_id);
5479  throw;
5480  }
5481  };
5482  process_calcite_request();
// NOTE(review): `locks` is used below but its declaration does not appear in this
// text, and several locks.emplace_back(...) calls below have unbalanced parentheses.
// Source lines appear to be missing from this copy; restore them before compiling.
5484  if (acquire_locks) {
// Tables that appear in tables_selected_from take the read-only data-lock branch
// below; every other accessed table takes the insert data-lock branch.
5485  std::set<std::string> read_only_tables;
5486  for (const auto& table : result.resolved_accessed_objects.tables_selected_from) {
5487  read_only_tables.insert(table);
5488  }
5489  std::vector<std::string> tables;
5490  tables.insert(tables.end(),
5491  result.resolved_accessed_objects.tables_selected_from.begin(),
5492  result.resolved_accessed_objects.tables_selected_from.end());
5493  tables.insert(tables.end(),
5494  result.resolved_accessed_objects.tables_inserted_into.begin(),
5495  result.resolved_accessed_objects.tables_inserted_into.end());
5496  tables.insert(tables.end(),
5497  result.resolved_accessed_objects.tables_updated_in.begin(),
5498  result.resolved_accessed_objects.tables_updated_in.end());
5499  tables.insert(tables.end(),
5500  result.resolved_accessed_objects.tables_deleted_from.begin(),
5501  result.resolved_accessed_objects.tables_deleted_from.end());
5502  // avoid deadlocks by enforcing a deterministic locking sequence:
5503  // tables are visited in sorted name order and, for each table, its schema
5504  // lock is acquired immediately before its data lock
5505  std::sort(tables.begin(), tables.end());
// NOTE(review): `tables` is not de-duplicated. A name present in more than one
// accessed-object list (e.g. a table both selected from and inserted into) is visited
// twice, acquiring its locks twice. Because such a table is also in read_only_tables,
// it takes the read-only branch both times and no insert data lock is requested for
// it. Consider tables.erase(std::unique(tables.begin(), tables.end()), tables.end()),
// and deriving the lock mode from the write lists. Confirm the intended semantics first.
5506  for (const auto& table : tables) {
5507  locks.emplace_back(
5510  lockmgr::ReadLock>::acquireTableDescriptor(*cat.get(), table)));
5511  if (read_only_tables.count(table)) {
5512  locks.emplace_back(
5515  cat->getDatabaseId(), (*locks.back())())));
5516  } else {
5517  // Acquire an insert data lock for updates/deletes, consistent w/ insert. The
5518  // table data lock will be acquired in the fragmenter during checkpoint.
5519  locks.emplace_back(
5522  cat->getDatabaseId(), (*locks.back())())));
5523  }
5524  }
5525  }
5526  return std::make_pair(result, std::move(locks));
5527  }
5528  return std::make_pair(result, lockmgr::LockedTableDescriptors{});
5529 }
5530 
5531 int64_t DBHandler::query_get_outer_fragment_count(const TSessionId& session,
5532  const std::string& select_query) {
5533  auto stdlog = STDLOG(get_session_ptr(session));
5534  if (!leaf_handler_) {
5535  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5536  }
5537  try {
5538  return leaf_handler_->query_get_outer_fragment_count(session, select_query);
5539  } catch (std::exception& e) {
5540  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5541  }
5542 }
5543 
5544 void DBHandler::check_table_consistency(TTableMeta& _return,
5545  const TSessionId& session,
5546  const int32_t table_id) {
5547  auto stdlog = STDLOG(get_session_ptr(session));
5548  if (!leaf_handler_) {
5549  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5550  }
5551  try {
5552  leaf_handler_->check_table_consistency(_return, session, table_id);
5553  } catch (std::exception& e) {
5554  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5555  }
5556 }
5557 
5558 void DBHandler::start_query(TPendingQuery& _return,
5559  const TSessionId& leaf_session,
5560  const TSessionId& parent_session,
5561  const std::string& query_ra,
5562  const bool just_explain,
5563  const std::vector<int64_t>& outer_fragment_indices) {
5564  auto stdlog = STDLOG(get_session_ptr(leaf_session));
5565  auto session_ptr = stdlog.getConstSessionInfo();
5566  if (!leaf_handler_) {
5567  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5568  }
5569