OmniSciDB  8a228a1076
DBHandler.cpp
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: DBHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "DBHandler.h"
25 #include "DistributedLoader.h"
26 #include "MapDServer.h"
28 #include "TokenCompletionHints.h"
29 
30 #ifdef HAVE_PROFILER
31 #include <gperftools/heap-profiler.h>
32 #endif // HAVE_PROFILER
33 
34 #include "MapDRelease.h"
35 
36 #include "Calcite/Calcite.h"
37 #include "gen-cpp/CalciteServer.h"
38 
40 
41 #include "Catalog/Catalog.h"
46 #include "DistributedHandler.h"
48 #include "ImportExport/GDAL.h"
49 #include "ImportExport/Importer.h"
50 #include "LockMgr/LockMgr.h"
51 #include "Parser/ParserWrapper.h"
53 #include "Parser/parser.h"
56 #include "QueryEngine/Execute.h"
65 #include "Shared/StringTransform.h"
66 #include "Shared/SysInfo.h"
67 #include "Shared/geo_types.h"
68 #include "Shared/geosupport.h"
69 #include "Shared/import_helpers.h"
71 #include "Shared/measure.h"
72 #include "Shared/scope.h"
73 
74 #include <fcntl.h>
75 #include <picosha2.h>
76 #include <sys/time.h>
77 #include <sys/types.h>
78 #include <sys/wait.h>
79 #include <unistd.h>
80 #include <algorithm>
81 #include <boost/algorithm/string.hpp>
82 #include <boost/filesystem.hpp>
83 #include <boost/make_shared.hpp>
84 #include <boost/process/search_path.hpp>
85 #include <boost/program_options.hpp>
86 #include <boost/regex.hpp>
87 #include <boost/tokenizer.hpp>
88 #include <cmath>
89 #include <csignal>
90 #include <fstream>
91 #include <future>
92 #include <map>
93 #include <memory>
94 #include <random>
95 #include <regex>
96 #include <string>
97 #include <thread>
98 #include <typeinfo>
99 
100 #include <arrow/api.h>
101 #include <arrow/io/api.h>
102 #include <arrow/ipc/api.h>
103 
104 #include "Shared/ArrowUtil.h"
105 
106 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
107 
110 
111 #define INVALID_SESSION_ID ""
112 
113 #define THROW_MAPD_EXCEPTION(errstr) \
114  { \
115  TOmniSciException ex; \
116  ex.error_msg = errstr; \
117  LOG(ERROR) << ex.error_msg; \
118  throw ex; \
119  }
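// Hypothetical call site for the macro above: it logs the message at ERROR level
// and then throws a TOmniSciException carrying the same text, e.g. as in
// get_session_from_map() below:
//
//   if (session_it == session_map.end()) {
//     THROW_MAPD_EXCEPTION("Session not valid.");
//   }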
120 
121 thread_local std::string TrackingProcessor::client_address;
123 
124 namespace {
125 
126 SessionMap::iterator get_session_from_map(const TSessionId& session,
127  SessionMap& session_map) {
128  auto session_it = session_map.find(session);
129  if (session_it == session_map.end()) {
130  THROW_MAPD_EXCEPTION("Session not valid.");
131  }
132  return session_it;
133 }
134 
135 struct ForceDisconnect : public std::runtime_error {
136  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
137 };
138 
139 } // namespace
140 
141 template <>
142 SessionMap::iterator DBHandler::get_session_it_unsafe(
143  const TSessionId& session,
144  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
145  auto session_it = get_session_from_map(session, sessions_);
146  try {
147  check_session_exp_unsafe(session_it);
148  } catch (const ForceDisconnect& e) {
149  read_lock.unlock();
150  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
151  auto session_it2 = get_session_from_map(session, sessions_);
152  disconnect_impl(session_it2, write_lock);
153  THROW_MAPD_EXCEPTION(e.what());
154  }
155  return session_it;
156 }
157 
158 template <>
159 SessionMap::iterator DBHandler::get_session_it_unsafe(
160  const TSessionId& session,
161  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
162  auto session_it = get_session_from_map(session, sessions_);
163  try {
164  check_session_exp_unsafe(session_it);
165  } catch (const ForceDisconnect& e) {
166  disconnect_impl(session_it, write_lock);
167  THROW_MAPD_EXCEPTION(e.what());
168  }
169  return session_it;
170 }
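// Note on the two overloads above: the shared-lock variant cannot disconnect an
// expired session while only holding a read lock, so it drops the shared lock,
// re-acquires sessions_mutex_ exclusively, and looks the session up again (the
// iterator may have been invalidated in the meantime). A minimal sketch of that
// upgrade, assuming mapd_shared_mutex behaves like std::shared_mutex:
//
//   read_lock.unlock();                                           // drop shared lock
//   mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
//   auto session_it2 = get_session_from_map(session, sessions_);  // re-validate
//   disconnect_impl(session_it2, write_lock);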
171 
172 #ifdef ENABLE_GEOS
173 extern std::unique_ptr<std::string> g_libgeos_so_filename;
174 #endif
175 
176 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
177  const std::vector<LeafHostInfo>& string_leaves,
178  const std::string& base_data_path,
179  const bool cpu_only,
180  const bool allow_multifrag,
181  const bool jit_debug,
182  const bool intel_jit_profile,
183  const bool read_only,
184  const bool allow_loop_joins,
185  const bool enable_rendering,
186  const bool enable_auto_clear_render_mem,
187  const int render_oom_retry_threshold,
188  const size_t render_mem_bytes,
189  const size_t max_concurrent_render_sessions,
190  const int num_gpus,
191  const int start_gpu,
192  const size_t reserved_gpu_mem,
193  const bool render_compositor_use_last_gpu,
194  const size_t num_reader_threads,
195  const AuthMetadata& authMetadata,
196  const SystemParameters& system_parameters,
197  const bool legacy_syntax,
198  const int idle_session_duration,
199  const int max_session_duration,
200  const bool enable_runtime_udf_registration,
201  const std::string& udf_filename,
202  const std::string& clang_path,
203  const std::vector<std::string>& clang_options,
204 #ifdef ENABLE_GEOS
205  const std::string& libgeos_so_filename,
206 #endif
207  const DiskCacheConfig& disk_cache_config)
208  : leaf_aggregator_(db_leaves)
209  , string_leaves_(string_leaves)
210  , base_data_path_(base_data_path)
211  , random_gen_(std::random_device{}())
212  , session_id_dist_(0, INT32_MAX)
213  , jit_debug_(jit_debug)
214  , intel_jit_profile_(intel_jit_profile)
215  , allow_multifrag_(allow_multifrag)
216  , read_only_(read_only)
217  , allow_loop_joins_(allow_loop_joins)
218  , authMetadata_(authMetadata)
219  , system_parameters_(system_parameters)
220  , legacy_syntax_(legacy_syntax)
221  , dispatch_queue_(
222  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
223  , super_user_rights_(false)
224  , idle_session_duration_(idle_session_duration * 60)
225  , max_session_duration_(max_session_duration * 60)
226  , runtime_udf_registration_enabled_(enable_runtime_udf_registration) {
227  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
228  // Register foreign storage interfaces here
230  bool is_rendering_enabled = enable_rendering;
231 
232  try {
233  if (cpu_only) {
234  is_rendering_enabled = false;
236  cpu_mode_only_ = true;
237  } else {
238 #ifdef HAVE_CUDA
240  cpu_mode_only_ = false;
241 #else
243  LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
244  cpu_mode_only_ = true;
245  is_rendering_enabled = false;
246 #endif
247  }
248  } catch (const std::exception& e) {
 249  LOG(FATAL) << "Failed to set executor device type: " << e.what();
250  }
251 
252  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
 253  // calculate the total amount of memory we need to reserve from each GPU that the
 254  // Buffer manager cannot ask for
255  size_t total_reserved = reserved_gpu_mem;
256  if (is_rendering_enabled) {
257  total_reserved += render_mem_bytes;
258  }
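// Illustrative arithmetic with hypothetical settings: with reserved_gpu_mem of
// 384 MB and rendering enabled with render_mem_bytes of 1 GB, each GPU keeps
//
//   total_reserved = (384ULL << 20) + (1ULL << 30)  // 1,476,395,008 bytes
//
// out of reach of the buffer manager.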
259 
260  try {
261  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
262  system_parameters,
264  num_gpus,
265  start_gpu,
266  total_reserved,
267  num_reader_threads,
268  disk_cache_config));
269  } catch (const std::exception& e) {
270  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
271  }
272 
273  std::string udf_ast_filename("");
274 
275  try {
276  if (!udf_filename.empty()) {
277  const auto cuda_mgr = data_mgr_->getCudaMgr();
278  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
279  cuda_mgr ? cuda_mgr->getDeviceArch()
281  UdfCompiler compiler(udf_filename, device_arch, clang_path, clang_options);
282  int compile_result = compiler.compileUdf();
283 
284  if (compile_result == 0) {
285  udf_ast_filename = compiler.getAstFileName();
286  }
287  }
288  } catch (const std::exception& e) {
289  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
290  }
291 
292  try {
293  calcite_ =
294  std::make_shared<Calcite>(system_parameters, base_data_path_, udf_ast_filename);
295  } catch (const std::exception& e) {
296  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
297  }
298 
299  try {
300  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
301  if (!udf_filename.empty()) {
302  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
303  }
304  } catch (const std::exception& e) {
305  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
306  }
307 
308  try {
310  } catch (const std::exception& e) {
311  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
312  }
313 
314  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
316  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
317  cpu_mode_only_ = true;
318  }
319 
320  switch (executor_device_type_) {
322  LOG(INFO) << "Started in GPU mode" << std::endl;
323  break;
325  LOG(INFO) << "Started in CPU mode" << std::endl;
326  break;
327  }
328 
329  try {
330  SysCatalog::instance().init(base_data_path_,
331  data_mgr_,
332  authMetadata,
333  calcite_,
334  false,
335  !db_leaves.empty(),
337  } catch (const std::exception& e) {
338  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
339  }
340 
341  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
342  start_time_ = std::time(nullptr);
343 
344  if (is_rendering_enabled) {
345  try {
346  render_handler_.reset(new RenderHandler(this,
347  render_mem_bytes,
348  max_concurrent_render_sessions,
349  render_compositor_use_last_gpu,
350  false,
351  0,
353  } catch (const std::exception& e) {
354  LOG(ERROR) << "Backend rendering disabled: " << e.what();
355  }
356  }
357 
358  if (leaf_aggregator_.leafCount() > 0) {
359  try {
360  agg_handler_.reset(new MapDAggHandler(this));
361  } catch (const std::exception& e) {
362  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
363  }
364  } else if (g_cluster) {
365  try {
366  leaf_handler_.reset(new MapDLeafHandler(this));
367  } catch (const std::exception& e) {
368  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
369  }
370  }
371 
372 #ifdef ENABLE_GEOS
373  if (!libgeos_so_filename.empty()) {
374  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename));
375  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
376  }
377 #endif
378 }
379 
381 
383  const std::string& query_str,
384  std::list<std::unique_ptr<Parser::Stmt>>& parse_trees) {
385  int num_parse_errors = 0;
386  std::string last_parsed;
387  SQLParser parser;
388  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
389  if (num_parse_errors > 0) {
390  throw std::runtime_error("Syntax error at: " + last_parsed);
391  }
392  if (parse_trees.size() > 1) {
393  throw std::runtime_error("multiple SQL statements not allowed");
394  } else if (parse_trees.size() != 1) {
 395  throw std::runtime_error("empty SQL statement not allowed");
396  }
397 }
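// Illustrative behavior of the check above (hypothetical inputs):
//
//   "SELECT 1;"            -> one parse tree, accepted
//   "SELECT 1; SELECT 2;"  -> throws "multiple SQL statements not allowed"
//   ""                     -> throws "empty SQL statement not allowed"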
398 void DBHandler::check_read_only(const std::string& str) {
399  if (DBHandler::read_only_) {
400  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
401  }
402 }
403 
405  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
 406  // Create an in-memory session for Calcite with super-user privileges; it is used
 407  // to fetch all table metadata when a user runs a query. The session is registered
 408  // under a proxy user/password and persists only until the server shuts down or
 409  // the in-memory Calcite query finishes, whichever
 410  // comes first.
411  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
412  std::string session_id;
413  do {
414  session_id = generate_random_string(64);
415  } while (sessions_.find(session_id) != sessions_.end());
416  Catalog_Namespace::UserMetadata user_meta(-1,
417  calcite_->getInternalSessionProxyUserName(),
418  calcite_->getInternalSessionProxyPassword(),
419  true,
420  -1,
421  true);
422  const auto emplace_ret =
423  sessions_.emplace(session_id,
424  std::make_shared<Catalog_Namespace::SessionInfo>(
425  catalog_ptr, user_meta, executor_device_type_, session_id));
426  CHECK(emplace_ret.second);
427  return session_id;
428 }
429 
431  const Catalog_Namespace::UserMetadata user_meta) {
432  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
433  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
434  user_meta.isSuper.load();
435 }
436 
437 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
438  // Remove InMemory calcite Session.
439  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
440  const auto it = sessions_.find(session_id);
441  CHECK(it != sessions_.end());
442  sessions_.erase(it);
443 }
444 
445 // internal connection for connections with no password
446 void DBHandler::internal_connect(TSessionId& session,
447  const std::string& username,
448  const std::string& dbname) {
449  auto stdlog = STDLOG(); // session_info set by connect_impl()
450  std::string username2 = username; // login() may reset username given as argument
451  std::string dbname2 = dbname; // login() may reset dbname given as argument
453  std::shared_ptr<Catalog> cat = nullptr;
454  try {
455  cat =
456  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
457  } catch (std::exception& e) {
458  THROW_MAPD_EXCEPTION(e.what());
459  }
460 
461  DBObject dbObject(dbname2, DatabaseDBObjectType);
462  dbObject.loadKey(*cat);
463  dbObject.setPrivileges(AccessPrivileges::ACCESS);
464  std::vector<DBObject> dbObjects;
465  dbObjects.push_back(dbObject);
466  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
467  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
468  " is not allowed to access database " + dbname2 + ".");
469  }
470  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
471 }
472 
473 void DBHandler::krb5_connect(TKrb5Session& session,
474  const std::string& inputToken,
475  const std::string& dbname) {
 476  THROW_MAPD_EXCEPTION("Unauthorized Access. Kerberos login not supported");
477 }
478 
479 void DBHandler::connect(TSessionId& session,
480  const std::string& username,
481  const std::string& passwd,
482  const std::string& dbname) {
483  auto stdlog = STDLOG(); // session_info set by connect_impl()
484  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
485  std::string username2 = username; // login() may reset username given as argument
486  std::string dbname2 = dbname; // login() may reset dbname given as argument
488  std::shared_ptr<Catalog> cat = nullptr;
489  try {
490  cat = SysCatalog::instance().login(
491  dbname2, username2, passwd, user_meta, !super_user_rights_);
492  } catch (std::exception& e) {
493  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
494  THROW_MAPD_EXCEPTION(e.what());
495  }
496 
497  DBObject dbObject(dbname2, DatabaseDBObjectType);
498  dbObject.loadKey(*cat);
499  dbObject.setPrivileges(AccessPrivileges::ACCESS);
500  std::vector<DBObject> dbObjects;
501  dbObjects.push_back(dbObject);
502  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
503  stdlog.appendNameValuePairs(
504  "user", username, "db", dbname, "exception", "Missing Privileges");
505  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
506  " is not allowed to access database " + dbname2 + ".");
507  }
508  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
509  // if pki auth session will come back encrypted with user pubkey
510  SysCatalog::instance().check_for_session_encryption(passwd, session);
511 }
512 
513 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::create_new_session(
514  TSessionId& session,
515  const std::string& dbname,
516  const Catalog_Namespace::UserMetadata& user_meta,
517  std::shared_ptr<Catalog> cat) {
518  do {
519  session = generate_random_string(32);
520  } while (sessions_.find(session) != sessions_.end());
521  std::pair<SessionMap::iterator, bool> emplace_retval =
522  sessions_.emplace(session,
523  std::make_shared<Catalog_Namespace::SessionInfo>(
524  cat, user_meta, executor_device_type_, session));
525  CHECK(emplace_retval.second);
526  auto& session_ptr = emplace_retval.first->second;
527  LOG(INFO) << "User " << user_meta.userName << " connected to database " << dbname;
528  return session_ptr;
529 }
530 
531 void DBHandler::connect_impl(TSessionId& session,
532  const std::string& passwd,
533  const std::string& dbname,
534  const Catalog_Namespace::UserMetadata& user_meta,
535  std::shared_ptr<Catalog> cat,
536  query_state::StdLog& stdlog) {
537  // TODO(sy): Is there any reason to have dbname as a parameter
538  // here when the cat parameter already provides cat->name()?
539  // Should dbname and cat->name() ever differ?
540  {
541  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
542  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
543  stdlog.setSessionInfo(session_ptr);
544  session_ptr->set_connection_info(getConnectionInfo().toString());
545  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
546  // while doing warmup
547  if (leaf_aggregator_.leafCount() > 0) {
548  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
549  return;
550  }
551  }
552  }
553  auto const roles =
554  stdlog.getConstSessionInfo()->get_currentUser().isSuper
555  ? std::vector<std::string>{{"super"}}
556  : SysCatalog::instance().getRoles(
557  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
558  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
559 }
560 
561 void DBHandler::disconnect(const TSessionId& session) {
562  auto stdlog = STDLOG();
563  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
564 
565  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
566  auto session_it = get_session_it_unsafe(session, write_lock);
567  stdlog.setSessionInfo(session_it->second);
568  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
569 
570  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
571  << " disconnected from database " << dbname
572  << " with public_session_id: " << session_it->second->get_public_session_id();
573  disconnect_impl(session_it, write_lock);
574 }
575 
576 void DBHandler::disconnect_impl(const SessionMap::iterator& session_it,
577  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
578  // session_it existence should already have been checked (i.e. called via
579  // get_session_it_unsafe(...))
580  const auto session_id = session_it->second->get_session_id();
581  if (leaf_aggregator_.leafCount() > 0) {
582  leaf_aggregator_.disconnect(session_id);
583  }
584  sessions_.erase(session_it);
585  write_lock.unlock();
586 
587  if (render_handler_) {
588  render_handler_->disconnect(session_id);
589  }
590 }
591 
592 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
593  auto stdlog = STDLOG(get_session_ptr(session));
594  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
595  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
596  auto session_it = get_session_it_unsafe(session, write_lock);
597 
598  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
599 
600  try {
601  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
602  dbname2, session_it->second->get_currentUser().userName);
603  session_it->second->set_catalog_ptr(cat);
604  if (leaf_aggregator_.leafCount() > 0) {
605  leaf_aggregator_.switch_database(session, dbname);
606  return;
607  }
608  } catch (std::exception& e) {
609  THROW_MAPD_EXCEPTION(e.what());
610  }
611 }
612 
613 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
614  auto stdlog = STDLOG(get_session_ptr(session1));
615  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
616  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
617  auto session_it = get_session_it_unsafe(session1, write_lock);
618 
619  try {
620  const Catalog_Namespace::UserMetadata& user_meta =
621  session_it->second->get_currentUser();
622  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
623  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
624  if (leaf_aggregator_.leafCount() > 0) {
625  leaf_aggregator_.clone_session(session1, session2);
626  return;
627  }
628  } catch (std::exception& e) {
629  THROW_MAPD_EXCEPTION(e.what());
630  }
631 }
632 
633 void DBHandler::interrupt(const TSessionId& query_session,
634  const TSessionId& interrupt_session) {
 635  // In a distributed setting, query_session is the parent session on the aggregator
 636  // (agg) and interrupt_session is an existing session on a leaf node (leaf), so
 637  // there is a logical mapping
 638  // between query_session (agg) and interrupt_session (leaf)
639  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
640  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
642  // Shared lock to allow simultaneous interrupts of multiple sessions
643  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
644 
645  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
646  auto& cat = session_it->second.get()->getCatalog();
647  const auto dbname = cat.getCurrentDB().dbName;
649  jit_debug_ ? "/tmp" : "",
650  jit_debug_ ? "mapdquery" : "",
652  CHECK(executor);
653 
654  VLOG(1) << "Received interrupt: "
655  << "Session " << *session_it->second << ", Executor " << executor
656  << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
657  << session_it->second->get_currentUser().userName << ", Database " << dbname
658  << std::endl;
659 
660  if (leaf_aggregator_.leafCount() > 0) {
661  leaf_aggregator_.interrupt(query_session, interrupt_session);
662  }
663  executor->interrupt(query_session, interrupt_session);
664 
665  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
666  << " interrupted session with database " << dbname << std::endl;
667  }
668 }
669 
670 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
671  auto stdlog = STDLOG(get_session_ptr(session));
672  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
673  const auto rendering_enabled = bool(render_handler_);
674  _return.read_only = read_only_;
675  _return.version = MAPD_RELEASE;
676  _return.rendering_enabled = rendering_enabled;
677  _return.poly_rendering_enabled = rendering_enabled;
678  _return.start_time = start_time_;
679  _return.edition = MAPD_EDITION;
680  _return.host_name = get_hostname();
681 }
682 
683 void DBHandler::get_status(std::vector<TServerStatus>& _return,
684  const TSessionId& session) {
685  auto stdlog = STDLOG(get_session_ptr(session));
686  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
687  const auto rendering_enabled = bool(render_handler_);
688  TServerStatus ret;
689  ret.read_only = read_only_;
690  ret.version = MAPD_RELEASE;
691  ret.rendering_enabled = rendering_enabled;
692  ret.poly_rendering_enabled = rendering_enabled;
693  ret.start_time = start_time_;
694  ret.edition = MAPD_EDITION;
695  ret.host_name = get_hostname();
696 
697  // TSercivePort tcp_port{}
698 
699  if (g_cluster) {
700  ret.role =
701  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
702  } else {
703  ret.role = TRole::type::SERVER;
704  }
705 
706  _return.push_back(ret);
707  if (leaf_aggregator_.leafCount() > 0) {
708  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
709  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
710  }
711 }
712 
713 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
714  const TSessionId& session) {
715  auto stdlog = STDLOG(get_session_ptr(session));
716  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
717  THardwareInfo ret;
718  const auto cuda_mgr = data_mgr_->getCudaMgr();
719  if (cuda_mgr) {
720  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
721  ret.start_gpu = cuda_mgr->getStartGpu();
722  if (ret.start_gpu >= 0) {
723  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
 724  // ^ This will break as soon as we allow non-contiguous GPU allocations to MapD
725  }
726  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
727  TGpuSpecification gpu_spec;
728  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
729  gpu_spec.num_sm = deviceProperties->numMPs;
730  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
731  gpu_spec.memory = deviceProperties->globalMem;
732  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
733  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
734  ret.gpu_info.push_back(gpu_spec);
735  }
736  }
737 
738  // start hardware/OS dependent code
739  ret.num_cpu_hw = std::thread::hardware_concurrency();
 740  // ^ This might return different results when hyper-threading is enabled
741  // end hardware/OS dependent code
742 
743  _return.hardware_info.push_back(ret);
744  if (leaf_aggregator_.leafCount() > 0) {
745  ret.host_name = "aggregator";
746  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
747  _return.hardware_info.insert(_return.hardware_info.end(),
748  leaf_hardware.hardware_info.begin(),
749  leaf_hardware.hardware_info.end());
750  }
751 }
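// Worked example of the allocation math above (hypothetical host): if
// getDeviceCount() reports 8 GPUs and start_gpu is 2, the reply carries
// num_gpu_hw = 8 and num_gpu_allocated = 8 - 2 = 6, i.e. GPUs 2..7, which is why
// the comment above notes this breaks for non-contiguous GPU assignments.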
752 
753 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
754  auto session_ptr = get_session_ptr(session);
755  auto stdlog = STDLOG(session_ptr);
756  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
757  auto user_metadata = session_ptr->get_currentUser();
758 
759  _return.user = user_metadata.userName;
760  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
761  _return.start_time = session_ptr->get_start_time();
762  _return.is_super = user_metadata.isSuper;
763 }
764 
766  const SQLTypeInfo& ti,
767  TColumn& column) {
768  if (ti.is_array()) {
769  TColumn tColumn;
770  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
771  CHECK(array_tv);
772  bool is_null = !array_tv->is_initialized();
773  if (!is_null) {
774  const auto& vec = array_tv->get();
775  for (const auto& elem_tv : vec) {
776  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
777  }
778  }
779  column.data.arr_col.push_back(tColumn);
780  column.nulls.push_back(is_null && !ti.get_notnull());
781  } else if (ti.is_column()) {
782  value_to_thrift_column(tv, ti.get_elem_type(), column);
783  } else if (ti.is_geometry()) {
784  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
785  if (scalar_tv) {
786  auto s_n = boost::get<NullableString>(scalar_tv);
787  auto s = boost::get<std::string>(s_n);
788  if (s) {
789  column.data.str_col.push_back(*s);
790  } else {
791  column.data.str_col.emplace_back(""); // null string
792  auto null_p = boost::get<void*>(s_n);
793  CHECK(null_p && !*null_p);
794  }
795  column.nulls.push_back(!s && !ti.get_notnull());
796  } else {
797  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
798  CHECK(array_tv);
799  bool is_null = !array_tv->is_initialized();
800  if (!is_null) {
801  auto elem_type = SQLTypeInfo(kDOUBLE, false);
802  TColumn tColumn;
803  const auto& vec = array_tv->get();
804  for (const auto& elem_tv : vec) {
805  value_to_thrift_column(elem_tv, elem_type, tColumn);
806  }
807  column.data.arr_col.push_back(tColumn);
808  column.nulls.push_back(false);
809  } else {
810  TColumn tColumn;
811  column.data.arr_col.push_back(tColumn);
812  column.nulls.push_back(is_null && !ti.get_notnull());
813  }
814  }
815  } else {
816  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
817  CHECK(scalar_tv);
818  if (boost::get<int64_t>(scalar_tv)) {
819  int64_t data = *(boost::get<int64_t>(scalar_tv));
820 
821  if (ti.is_decimal()) {
822  double val = static_cast<double>(data);
823  if (ti.get_scale() > 0) {
824  val /= pow(10.0, std::abs(ti.get_scale()));
825  }
826  column.data.real_col.push_back(val);
827  } else {
828  column.data.int_col.push_back(data);
829  }
830 
831  switch (ti.get_type()) {
832  case kBOOLEAN:
833  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
834  break;
835  case kTINYINT:
836  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
837  break;
838  case kSMALLINT:
839  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
840  break;
841  case kINT:
842  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
843  break;
844  case kNUMERIC:
845  case kDECIMAL:
846  case kBIGINT:
847  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
848  break;
849  case kTIME:
850  case kTIMESTAMP:
851  case kDATE:
852  case kINTERVAL_DAY_TIME:
854  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
855  break;
856  default:
857  column.nulls.push_back(false);
858  }
859  } else if (boost::get<double>(scalar_tv)) {
860  double data = *(boost::get<double>(scalar_tv));
861  column.data.real_col.push_back(data);
862  if (ti.get_type() == kFLOAT) {
863  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
864  } else {
865  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
866  }
867  } else if (boost::get<float>(scalar_tv)) {
868  CHECK_EQ(kFLOAT, ti.get_type());
869  float data = *(boost::get<float>(scalar_tv));
870  column.data.real_col.push_back(data);
871  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
872  } else if (boost::get<NullableString>(scalar_tv)) {
873  auto s_n = boost::get<NullableString>(scalar_tv);
874  auto s = boost::get<std::string>(s_n);
875  if (s) {
876  column.data.str_col.push_back(*s);
877  } else {
878  column.data.str_col.emplace_back(""); // null string
879  auto null_p = boost::get<void*>(s_n);
880  CHECK(null_p && !*null_p);
881  }
882  column.nulls.push_back(!s && !ti.get_notnull());
883  } else {
884  CHECK(false);
885  }
886  }
887 }
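// Worked example for the decimal branch above: decimal targets arrive as scaled
// int64 values, so a DECIMAL(10, 2) value stored as 12345 is emitted into
// real_col as 12345 / 10^2 = 123.45. The NULL_* constants (NULL_SMALLINT,
// NULL_BIGINT, ...) are in-band sentinels compared against the raw value to
// populate the parallel nulls vector.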
888 
889 TDatum DBHandler::value_to_thrift(const TargetValue& tv, const SQLTypeInfo& ti) {
890  TDatum datum;
891  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
892  if (!scalar_tv) {
893  CHECK(ti.is_array());
894  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
895  CHECK(array_tv);
896  if (array_tv->is_initialized()) {
897  const auto& vec = array_tv->get();
898  for (const auto& elem_tv : vec) {
899  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
900  datum.val.arr_val.push_back(scalar_col_val);
901  }
902  // Datum is not null, at worst it's an empty array Datum
903  datum.is_null = false;
904  } else {
905  datum.is_null = true;
906  }
907  return datum;
908  }
909  if (boost::get<int64_t>(scalar_tv)) {
910  int64_t data = *(boost::get<int64_t>(scalar_tv));
911 
912  if (ti.is_decimal()) {
913  double val = static_cast<double>(data);
914  if (ti.get_scale() > 0) {
915  val /= pow(10.0, std::abs(ti.get_scale()));
916  }
917  datum.val.real_val = val;
918  } else {
919  datum.val.int_val = data;
920  }
921 
922  switch (ti.get_type()) {
923  case kBOOLEAN:
924  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
925  break;
926  case kTINYINT:
927  datum.is_null = (datum.val.int_val == NULL_TINYINT);
928  break;
929  case kSMALLINT:
930  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
931  break;
932  case kINT:
933  datum.is_null = (datum.val.int_val == NULL_INT);
934  break;
935  case kDECIMAL:
936  case kNUMERIC:
937  case kBIGINT:
938  datum.is_null = (datum.val.int_val == NULL_BIGINT);
939  break;
940  case kTIME:
941  case kTIMESTAMP:
942  case kDATE:
943  case kINTERVAL_DAY_TIME:
945  datum.is_null = (datum.val.int_val == NULL_BIGINT);
946  break;
947  default:
948  datum.is_null = false;
949  }
950  } else if (boost::get<double>(scalar_tv)) {
951  datum.val.real_val = *(boost::get<double>(scalar_tv));
952  if (ti.get_type() == kFLOAT) {
953  datum.is_null = (datum.val.real_val == NULL_FLOAT);
954  } else {
955  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
956  }
957  } else if (boost::get<float>(scalar_tv)) {
958  CHECK_EQ(kFLOAT, ti.get_type());
959  datum.val.real_val = *(boost::get<float>(scalar_tv));
960  datum.is_null = (datum.val.real_val == NULL_FLOAT);
961  } else if (boost::get<NullableString>(scalar_tv)) {
962  auto s_n = boost::get<NullableString>(scalar_tv);
963  auto s = boost::get<std::string>(s_n);
964  if (s) {
965  datum.val.str_val = *s;
966  } else {
967  auto null_p = boost::get<void*>(s_n);
968  CHECK(null_p && !*null_p);
969  }
970  datum.is_null = !s;
971  } else {
972  CHECK(false);
973  }
974  return datum;
975 }
976 
977 void DBHandler::sql_execute(TQueryResult& _return,
978  const TSessionId& session,
979  const std::string& query_str,
980  const bool column_format,
981  const std::string& nonce,
982  const int32_t first_n,
983  const int32_t at_most_n) {
984  auto session_ptr = get_session_ptr(session);
985  auto query_state = create_query_state(session_ptr, query_str);
986  auto stdlog = STDLOG(session_ptr, query_state);
987  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
988  stdlog.appendNameValuePairs("nonce", nonce);
989  auto timer = DEBUG_TIMER(__func__);
990 
991  try {
992  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
993  geo_copy_from_sessions.remove(session_ptr->get_session_id());
994  };
995 
996  if (first_n >= 0 && at_most_n >= 0) {
998  std::string("At most one of first_n and at_most_n can be set"));
999  }
1000 
1001  if (leaf_aggregator_.leafCount() > 0) {
1002  if (!agg_handler_) {
1003  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
1004  }
1005  _return.total_time_ms = measure<>::execution([&]() {
1006  agg_handler_->cluster_execute(_return,
1007  query_state->createQueryStateProxy(),
1008  query_state->getQueryStr(),
1009  column_format,
1010  nonce,
1011  first_n,
1012  at_most_n,
1014  });
1015  _return.nonce = nonce;
1016  } else {
1017  _return.total_time_ms = measure<>::execution([&]() {
1019  query_state->createQueryStateProxy(),
1020  column_format,
1021  nonce,
1022  session_ptr->get_executor_device_type(),
1023  first_n,
1024  at_most_n);
1025  });
1026  }
1027 
1028  // if the SQL statement we just executed was a geo COPY FROM, the import
1029  // parameters were captured, and this flag set, so we do the actual import here
1030  if (auto geo_copy_from_state =
1031  geo_copy_from_sessions(session_ptr->get_session_id())) {
1032  // import_geo_table() calls create_table() which calls this function to
1033  // do the work, so reset the flag now to avoid executing this part a
1034  // second time at the end of that, which would fail as the table was
1035  // already created! Also reset the flag with a ScopeGuard on exiting
1036  // this function any other way, such as an exception from the code above!
1037  geo_copy_from_sessions.remove(session_ptr->get_session_id());
1038 
1039  // create table as replicated?
1040  TCreateParams create_params;
1041  if (geo_copy_from_state->geo_copy_from_partitions == "REPLICATED") {
1042  create_params.is_replicated = true;
1043  }
1044 
1045  // now do (and time) the import
1046  _return.total_time_ms = measure<>::execution([&]() {
1048  session,
1049  geo_copy_from_state->geo_copy_from_table,
1050  geo_copy_from_state->geo_copy_from_file_name,
1051  copyparams_to_thrift(geo_copy_from_state->geo_copy_from_copy_params),
1052  TRowDescriptor(),
1053  create_params);
1054  });
1055  }
1056  std::string debug_json = timer.stopAndGetJson();
1057  if (!debug_json.empty()) {
1058  _return.__set_debug(std::move(debug_json));
1059  }
1060  stdlog.appendNameValuePairs(
1061  "execution_time_ms",
1062  _return.execution_time_ms,
1063  "total_time_ms", // BE-3420 - Redundant with duration field
1064  stdlog.duration<std::chrono::milliseconds>());
1065  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1066  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1067  } catch (const std::exception& e) {
1068  if (strstr(e.what(), "java.lang.NullPointerException")) {
1069  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
 1070  "query failed from broken view or other schema-related issue");
1071  } else if (strstr(e.what(), "Parse failed: Encountered \";\"")) {
1072  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1073  } else if (strstr(e.what(),
1074  "Parse failed: Encountered \"<EOF>\" at line 0, column 0")) {
 1075  THROW_MAPD_EXCEPTION("empty SQL statement not allowed");
1076  } else {
1077  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1078  }
1079  }
1080 }
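// Client-side sketch of invoking the handler method above through the generated
// Thrift client (the client object name is an assumption; the argument order
// mirrors the handler signature). Passing -1 for first_n and at_most_n disables
// both row limits; setting both to non-negative values is rejected above.
//
//   TQueryResult result;
//   client.sql_execute(result, session_id, "SELECT COUNT(*) FROM flights;",
//                      /*column_format=*/true, /*nonce=*/"", /*first_n=*/-1,
//                      /*at_most_n=*/-1);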
1081 
1082 void DBHandler::sql_execute_df(TDataFrame& _return,
1083  const TSessionId& session,
1084  const std::string& query_str,
1085  const TDeviceType::type device_type,
1086  const int32_t device_id,
1087  const int32_t first_n) {
1088  auto session_ptr = get_session_ptr(session);
1089  auto query_state = create_query_state(session_ptr, query_str);
1090  auto stdlog = STDLOG(session_ptr, query_state);
1091 
1092  if (device_type == TDeviceType::GPU) {
1093  const auto executor_device_type = session_ptr->get_executor_device_type();
1094  if (executor_device_type != ExecutorDeviceType::GPU) {
1096  std::string("Exception: GPU mode is not allowed in this session"));
1097  }
1098  if (!data_mgr_->gpusPresent()) {
1099  THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
1100  }
1101  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1103  std::string("Exception: invalid device_id or unavailable GPU with this ID"));
1104  }
1105  }
1106  _return.execution_time_ms = 0;
1107 
1108  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
1111 
1112  try {
1113  ParserWrapper pw{query_str};
1114  if (!pw.is_ddl && !pw.is_update_dml &&
1115  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
1116  std::string query_ra;
1118  _return.execution_time_ms += measure<>::execution([&]() {
1119  TPlanResult result;
1120  std::tie(result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1121  query_str,
1122  {},
1123  true,
1125  query_ra = result.plan_result;
1126  });
1127 
1128  if (pw.isCalciteExplain()) {
 1129  throw std::runtime_error("explain is not supported by the current thrift API");
1130  }
1131  execute_rel_alg_df(_return,
1132  query_ra,
1133  query_state->createQueryStateProxy(),
1134  *session_ptr,
1135  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
1137  static_cast<size_t>(device_id),
1138  first_n);
1139  return;
1140  }
1141  } catch (std::exception& e) {
1142  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1143  }
 1145  "Exception: DDL or update DML are not supported by the current thrift API");
1146 }
1147 
1148 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1149  const TSessionId& session,
1150  const std::string& query_str,
1151  const int32_t device_id,
1152  const int32_t first_n) {
1153  auto stdlog = STDLOG(get_session_ptr(session));
1154  sql_execute_df(_return, session, query_str, TDeviceType::GPU, device_id, first_n);
1155 }
1156 
1157 // For now we have only one user of a data frame in all cases.
1158 void DBHandler::deallocate_df(const TSessionId& session,
1159  const TDataFrame& df,
1160  const TDeviceType::type device_type,
1161  const int32_t device_id) {
1162  auto stdlog = STDLOG(get_session_ptr(session));
1163  std::string serialized_cuda_handle = "";
1164  if (device_type == TDeviceType::GPU) {
1165  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1166  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1167  TOmniSciException ex;
1168  ex.error_msg = std::string(
 1169  "Exception: current data frame handle is not tracked or was inserted "
 1170  "twice");
1171  LOG(ERROR) << ex.error_msg;
1172  throw ex;
1173  }
1174  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1175  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1176  }
1177  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1178  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1180  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1182  result,
1183  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1184  device_id,
1185  data_mgr_);
1186 }
1187 
1188 std::string DBHandler::apply_copy_to_shim(const std::string& query_str) {
1189  auto result = query_str;
1190  {
1191  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
1192  // boost::regex::icase};
1193  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
1194  boost::regex::extended | boost::regex::icase};
1195  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
1196  result.replace(
1197  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
1198  });
1199  }
1200  return result;
1201 }
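// Illustrative rewrite performed by the shim above (hypothetical statement):
//
//   in : COPY (SELECT a, b FROM t WHERE x > 0) TO '/tmp/out.csv';
//   out: COPY (#~#SELECT a, b FROM t WHERE x > 0#~#) TO '/tmp/out.csv';
//
// The #~# markers delimit the inner query so the downstream COPY TO parsing can
// treat it as a single opaque payload.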
1202 
1203 void DBHandler::sql_validate(TRowDescriptor& _return,
1204  const TSessionId& session,
1205  const std::string& query_str) {
1206  auto stdlog = STDLOG(get_session_ptr(session));
1207  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1208  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1209  stdlog.setQueryState(query_state);
1210 
1211  ParserWrapper pw{query_str};
1212  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1213  pw.is_update_dml) {
1214  THROW_MAPD_EXCEPTION("Can only validate SELECT statements.");
1215  }
1216  DBHandler::validate_rel_alg(_return, query_state->createQueryStateProxy());
1217 }
1218 
1219 namespace {
1220 
1222  std::unordered_set<std::string> uc_column_names;
1223  std::unordered_set<std::string> uc_column_table_qualifiers;
1224 };
1225 
1226 // Extract what look like (qualified) identifiers from the partial query.
1227 // The results are used to rank the auto-completion results: tables which
1228 // contain at least one of the identifiers come first.
1230  const std::string& sql) {
1231  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1232  boost::regex::extended | boost::regex::icase};
1233  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1234  boost::sregex_token_iterator end;
1235  std::unordered_set<std::string> uc_column_names;
1236  std::unordered_set<std::string> uc_column_table_qualifiers;
1237  for (; tok_it != end; ++tok_it) {
1238  std::string column_name = *tok_it;
1239  std::vector<std::string> column_tokens;
1240  boost::split(column_tokens, column_name, boost::is_any_of("."));
1241  if (column_tokens.size() == 2) {
1242  // If the column name is qualified, take user's word.
1243  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1244  } else {
1245  uc_column_names.insert(to_upper(column_name));
1246  }
1247  }
1248  return {uc_column_names, uc_column_table_qualifiers};
1249 }
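// Illustrative result of the extraction above for the hypothetical partial query
// "SELECT dep.name, salary FROM ":
//
//   uc_column_table_qualifiers = { "DEP" }
//   uc_column_names            = { "SELECT", "SALARY", "FROM" }
//
// Keywords land in uc_column_names as well; they are harmless for ranking since
// they rarely match a real column name.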
1250 
1251 } // namespace
1252 
1253 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1254  const TSessionId& session,
1255  const std::string& sql,
1256  const int cursor) {
1257  auto stdlog = STDLOG(get_session_ptr(session));
1258  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1259  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1260  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1261  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1262  proj_tokens.uc_column_names, visible_tables, stdlog);
1263  // Add the table qualifiers explicitly specified by the user.
1264  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1265  proj_tokens.uc_column_table_qualifiers.end());
1266  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1267  std::sort(
1268  hints.begin(),
1269  hints.end(),
1270  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1271  if (lhs.type == TCompletionHintType::TABLE &&
1272  rhs.type == TCompletionHintType::TABLE) {
1273  // Between two tables, one which is compatible with the specified
1274  // projections and one which isn't, pick the one which is compatible.
1275  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1276  compatible_table_names.end() &&
1277  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1278  compatible_table_names.end()) {
1279  return true;
1280  }
1281  }
1282  return lhs.type < rhs.type;
1283  });
1284 }
1285 
1286 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1287  std::vector<std::string>& visible_tables,
1288  query_state::StdLog& stdlog,
1289  const std::string& sql,
1290  const int cursor) {
1291  const auto& session_info = *stdlog.getConstSessionInfo();
1292  try {
1293  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1294  // Filter out keywords suggested by Calcite which we don't support.
1296  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1297  } catch (const std::exception& e) {
1298  TOmniSciException ex;
1299  ex.error_msg = "Exception: " + std::string(e.what());
1300  LOG(ERROR) << ex.error_msg;
1301  throw ex;
1302  }
1303  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1304  const size_t length_to_cursor =
1305  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1306  // Trust hints from Calcite after the FROM keyword.
1307  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1308  return;
1309  }
1310  // Before FROM, the query is too incomplete for context-sensitive completions.
1311  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1312 }
1313 
1314 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1315  query_state::StdLog& stdlog,
1316  std::vector<std::string>& visible_tables,
1317  const std::string& sql,
1318  const int cursor) {
1319  const auto last_word =
1320  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1321  boost::regex select_expr{R"(\s*select\s+)",
1322  boost::regex::extended | boost::regex::icase};
1323  const size_t length_to_cursor =
1324  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1325  // After SELECT but before FROM, look for all columns in all tables which match the
1326  // prefix.
1327  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1328  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1329  // Trust the fully qualified columns the most.
1330  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1331  return;
1332  }
1333  // Not much information to use, just retrieve column names which match the prefix.
1334  if (should_suggest_column_hints(sql)) {
1335  get_column_hints(hints, last_word, column_names_by_table);
1336  return;
1337  }
1338  const std::string kFromKeyword{"FROM"};
1339  if (boost::istarts_with(kFromKeyword, last_word)) {
1340  TCompletionHint keyword_hint;
1341  keyword_hint.type = TCompletionHintType::KEYWORD;
1342  keyword_hint.replaced = last_word;
1343  keyword_hint.hints.emplace_back(kFromKeyword);
1344  hints.push_back(keyword_hint);
1345  }
1346  } else {
1347  const std::string kSelectKeyword{"SELECT"};
1348  if (boost::istarts_with(kSelectKeyword, last_word)) {
1349  TCompletionHint keyword_hint;
1350  keyword_hint.type = TCompletionHintType::KEYWORD;
1351  keyword_hint.replaced = last_word;
1352  keyword_hint.hints.emplace_back(kSelectKeyword);
1353  hints.push_back(keyword_hint);
1354  }
1355  }
1356 }
1357 
1358 std::unordered_map<std::string, std::unordered_set<std::string>>
1359 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1360  query_state::StdLog& stdlog) {
1361  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1362  for (auto it = table_names.begin(); it != table_names.end();) {
1363  TTableDetails table_details;
1364  try {
1365  get_table_details_impl(table_details, stdlog, *it, false, false);
1366  } catch (const TOmniSciException& e) {
1367  // Remove the corrupted Table/View name from the list for further processing.
1368  it = table_names.erase(it);
1369  continue;
1370  }
1371  for (const auto& column_type : table_details.row_desc) {
1372  column_names_by_table[*it].emplace(column_type.col_name);
1373  }
1374  ++it;
1375  }
1376  return column_names_by_table;
1377 }
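// Sketch of the resulting map for two hypothetical tables:
//
//   column_names_by_table = {
//     { "flights",  { "carrier", "dep_delay", "origin" } },
//     { "airports", { "code", "name" } }
//   }
//
// Tables whose details cannot be fetched are erased from table_names so later
// completion steps do not retry them.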
1378 
1382 }
1383 
1385  const std::unordered_set<std::string>& uc_column_names,
1386  std::vector<std::string>& table_names,
1387  query_state::StdLog& stdlog) {
1388  std::unordered_set<std::string> compatible_table_names_by_column;
1389  for (auto it = table_names.begin(); it != table_names.end();) {
1390  TTableDetails table_details;
1391  try {
1392  get_table_details_impl(table_details, stdlog, *it, false, false);
1393  } catch (const TOmniSciException& e) {
1394  // Remove the corrupted Table/View name from the list for further processing.
1395  it = table_names.erase(it);
1396  continue;
1397  }
1398  for (const auto& column_type : table_details.row_desc) {
1399  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1400  compatible_table_names_by_column.emplace(to_upper(*it));
1401  break;
1402  }
1403  }
1404  ++it;
1405  }
1406  return compatible_table_names_by_column;
1407 }
1408 
1409 void DBHandler::validate_rel_alg(TRowDescriptor& _return,
1410  QueryStateProxy query_state_proxy) {
1411  try {
1412  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
1415 
1416  // TODO(adb): for a validate query we do not need write locks, though the lock would
1417  // generally be short lived.
1418  TPlanResult parse_result;
1420  std::tie(parse_result, locks) =
1421  parse_to_ra(query_state_proxy,
1422  query_state_proxy.getQueryState().getQueryStr(),
1423  {},
1424  true,
1426  const auto query_ra = parse_result.plan_result;
1427 
1428  TQueryResult result;
1430  query_state_proxy,
1431  query_ra,
1432  true,
1434  -1,
1435  -1,
1436  /*just_validate=*/true,
1437  /*find_filter_push_down_candidates=*/false,
1439 
1440  _return = fixup_row_descriptor(
1441  result.row_set.row_desc,
1442  query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog());
1443  } catch (std::exception& e) {
1444  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1445  }
1446 }
1447 
1448 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1449  auto stdlog = STDLOG(get_session_ptr(session));
1450  auto session_ptr = stdlog.getConstSessionInfo();
1451  if (!session_ptr->get_currentUser().isSuper) {
1452  // WARNING: This appears to not include roles a user is a member of,
1453  // if the role has no permissions granted to it.
1454  roles =
1455  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1456  session_ptr->getCatalog().getCurrentDB().dbId);
1457  } else {
1458  roles = SysCatalog::instance().getRoles(
1459  false, true, session_ptr->get_currentUser().userName);
1460  }
1461 }
1462 
1463 bool DBHandler::has_role(const TSessionId& sessionId,
1464  const std::string& granteeName,
1465  const std::string& roleName) {
1466  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1467  const auto session_ptr = stdlog.getConstSessionInfo();
1468  const auto current_user = session_ptr->get_currentUser();
1469  if (!current_user.isSuper) {
1470  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1471  user && current_user.userName != granteeName) {
 1472  THROW_MAPD_EXCEPTION("Only super users can check other users' roles.");
1473  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1474  current_user.userName, granteeName, true)) {
 1476  "Only super users can check role assignments that have not been directly "
1477  "granted to a user.");
1478  }
1479  }
1480  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1481 }
1482 
1483 static TDBObject serialize_db_object(const std::string& roleName,
1484  const DBObject& inObject) {
1485  TDBObject outObject;
1486  outObject.objectName = inObject.getName();
1487  outObject.grantee = roleName;
1488  const auto ap = inObject.getPrivileges();
1489  switch (inObject.getObjectKey().permissionType) {
1490  case DatabaseDBObjectType:
1491  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1492  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1493  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1494  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1495  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1496 
1497  break;
1498  case TableDBObjectType:
1499  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1500  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1501  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1502  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1503  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1504  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1505  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1506  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1507  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1508 
1509  break;
1510  case DashboardDBObjectType:
1511  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1512  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1513  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1514  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1515  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1516 
1517  break;
1518  case ViewDBObjectType:
1519  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1520  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1521  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1522  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1523  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1524  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1525  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1526 
1527  break;
1528  default:
1529  CHECK(false);
1530  }
1531  const int type_val = static_cast<int>(inObject.getType());
1532  CHECK(type_val >= 0 && type_val < 5);
1533  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1534  return outObject;
1535 }
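// The privs vector built above is positional; for TableDBObjectType, for example,
// the order is:
//
//   [ CREATE_TABLE, DROP_TABLE, SELECT_FROM_TABLE, INSERT_INTO_TABLE,
//     UPDATE_IN_TABLE, DELETE_FROM_TABLE, TRUNCATE_TABLE, ALTER_TABLE ]
//
// Clients presumably interpret the booleans by index, so the order must stay in
// sync with the consumers of TDBObject.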
1536 
1538  const TDBObjectPermissions& permissions) {
1539  if (!permissions.__isset.database_permissions_) {
1540  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1541  }
1542  auto perms = permissions.database_permissions_;
1543  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1544  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1545  (perms.view_sql_editor_ &&
1547  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1548  return false;
1549  } else {
1550  return true;
1551  }
1552 }
1553 
1555  const TDBObjectPermissions& permissions) {
1556  if (!permissions.__isset.table_permissions_) {
1557  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1558  }
1559  auto perms = permissions.table_permissions_;
1560  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1561  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1562  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1563  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1564  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1565  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1566  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1567  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1568  return false;
1569  } else {
1570  return true;
1571  }
1572 }
1573 
1574 static bool has_dashboard_permission(const AccessPrivileges& privs,
1575  const TDBObjectPermissions& permissions) {
1576  if (!permissions.__isset.dashboard_permissions_) {
1577  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1578  }
1579  auto perms = permissions.dashboard_permissions_;
1580  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1581  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1582  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1583  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1584  return false;
1585  } else {
1586  return true;
1587  }
1588 }
1589 
1590 static bool has_view_permission(const AccessPrivileges& privs,
1591  const TDBObjectPermissions& permissions) {
1592  if (!permissions.__isset.view_permissions_) {
1593  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1594  }
1595  auto perms = permissions.view_permissions_;
1596  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1597  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1598  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1599  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1600  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1601  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1602  return false;
1603  } else {
1604  return true;
1605  }
1606 }
1607 
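// has_object_privilege: only superusers may query privileges for grantees other
// than themselves or roles granted to them. The Thrift object type is mapped to an
// internal DBObjectType, the object's key is loaded from the catalog, and the check
// is dispatched through permissionFuncMap_ to the matching helper above.
// A rough client-side sketch (the Thrift client object and the names used are
// illustrative only, not taken from this file):
//   TDBObjectPermissions perms;
//   perms.table_permissions_.select_ = true;
//   perms.__isset.table_permissions_ = true;  // required, see the helpers above
//   bool ok = client.has_object_privilege(session, "analyst_role", "flights",
//                                         TDBObjectType::TableDBObjectType, perms);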
1608 bool DBHandler::has_object_privilege(const TSessionId& sessionId,
1609  const std::string& granteeName,
1610  const std::string& objectName,
1611  const TDBObjectType::type objectType,
1612  const TDBObjectPermissions& permissions) {
1613  auto stdlog = STDLOG(get_session_ptr(sessionId));
1614  auto session_ptr = stdlog.getConstSessionInfo();
1615  auto const& cat = session_ptr->getCatalog();
1616  auto const& current_user = session_ptr->get_currentUser();
1617  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1618  current_user.userName, granteeName, false)) {
1619  THROW_MAPD_EXCEPTION(
1620  "Users except superusers can only check privileges for self or roles granted "
1621  "to "
1622  "them.")
1623  }
1624  Catalog_Namespace::UserMetadata user_meta;
1625  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1626  user_meta.isSuper) {
1627  return true;
1628  }
1629  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1630  if (!grnt) {
1631  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1632  }
1633  DBObjectType type;
1634  std::string func_name;
1635  switch (objectType) {
1636  case TDBObjectType::DatabaseDBObjectType:
1637  type = DBObjectType::DatabaseDBObjectType;
1638  func_name = "database";
1639  break;
1640  case TDBObjectType::TableDBObjectType:
1641  type = DBObjectType::TableDBObjectType;
1642  func_name = "table";
1643  break;
1644  case TDBObjectType::DashboardDBObjectType:
1645  type = DBObjectType::DashboardDBObjectType;
1646  func_name = "dashboard";
1647  break;
1648  case TDBObjectType::ViewDBObjectType:
1649  type = DBObjectType::ViewDBObjectType;
1650  func_name = "view";
1651  break;
1652  default:
1653  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1654  }
1655  DBObject req_object(objectName, type);
1656  req_object.loadKey(cat);
1657 
1658  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1659  if (grantee_object) {
1660  // if grantee has privs on the object
1661  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1662  } else {
1663  // no privileges on that object
1664  return false;
1665  }
1666 }
1667 
1668 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1669  const TSessionId& sessionId,
1670  const std::string& roleName) {
1671  auto stdlog = STDLOG(get_session_ptr(sessionId));
1672  auto session_ptr = stdlog.getConstSessionInfo();
1673  auto const& user = session_ptr->get_currentUser();
1674  if (!user.isSuper &&
1675  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1676  return;
1677  }
1678  auto* rl = SysCatalog::instance().getGrantee(roleName);
1679  if (rl) {
1680  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1681  for (auto& dbObject : *rl->getDbObjects(true)) {
1682  if (dbObject.first.dbId != dbId) {
1683  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1684  // use case for now, though)
1685  continue;
1686  }
1687  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1688  TDBObjectsForRole.push_back(tdbObject);
1689  }
1690  } else {
1691  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
1692  }
1693 }
1694 
1695 void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
1696  const TSessionId& sessionId,
1697  const std::string& objectName,
1698  const TDBObjectType::type type) {
1699  auto stdlog = STDLOG(get_session_ptr(sessionId));
1700  auto session_ptr = stdlog.getConstSessionInfo();
1701  DBObjectType object_type;
1702  switch (type) {
1703  case TDBObjectType::DatabaseDBObjectType:
1704  object_type = DBObjectType::DatabaseDBObjectType;
1705  break;
1706  case TDBObjectType::TableDBObjectType:
1707  object_type = DBObjectType::TableDBObjectType;
1708  break;
1709  case TDBObjectType::DashboardDBObjectType:
1710  object_type = DBObjectType::DashboardDBObjectType;
1711  break;
1712  case TDBObjectType::ViewDBObjectType:
1713  object_type = DBObjectType::ViewDBObjectType;
1714  break;
1715  default:
1716  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
1717  ": unknown object type (" + std::to_string(type) + ").");
1718  }
1719  DBObject object_to_find(objectName, object_type);
1720 
1721  // TODO(adb): Use DatabaseLock to protect method
1722  try {
1723  if (object_type == DashboardDBObjectType) {
1724  if (objectName == "") {
1725  object_to_find = DBObject(-1, object_type);
1726  } else {
1727  object_to_find = DBObject(std::stoi(objectName), object_type);
1728  }
1729  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
1730  !objectName.empty()) {
1731  // special handling for view / table
1732  auto const& cat = session_ptr->getCatalog();
1733  auto td = cat.getMetadataForTable(objectName, false);
1734  if (td) {
1735  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1736  object_to_find = DBObject(objectName, object_type);
1737  }
1738  }
1739  object_to_find.loadKey(session_ptr->getCatalog());
1740  } catch (const std::exception&) {
1741  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
1742  }
1743 
1744  // object type on database level
1745  DBObject object_to_find_dblevel("", object_type);
1746  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
1747  // if user is superuser respond with a full priv
1748  if (session_ptr->get_currentUser().isSuper) {
1749  // using ALL_TABLE here to set max permissions
1750  DBObject dbObj{object_to_find.getObjectKey(),
1751  AccessPrivileges::ALL_TABLE,
1752  session_ptr->get_currentUser().userId};
1753  dbObj.setName("super");
1754  TDBObjects.push_back(
1755  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
1756  };
1757 
1758  std::vector<std::string> grantees =
1759  SysCatalog::instance().getRoles(true,
1760  session_ptr->get_currentUser().isSuper,
1761  session_ptr->get_currentUser().userName);
1762  for (const auto& grantee : grantees) {
1763  DBObject* object_found;
1764  auto* gr = SysCatalog::instance().getGrantee(grantee);
1765  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
1766  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1767  }
1768  // check object permissions on Database level
1769  if (gr &&
1770  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
1771  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1772  }
1773  }
1774 }
1775 
1776 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
1777  const TSessionId& sessionId,
1778  const std::string& granteeName) {
1779  auto stdlog = STDLOG(get_session_ptr(sessionId));
1780  auto session_ptr = stdlog.getConstSessionInfo();
1781  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
1782  if (grantee) {
1783  if (session_ptr->get_currentUser().isSuper) {
1784  roles = grantee->getRoles();
1785  } else if (grantee->isUser()) {
1786  if (session_ptr->get_currentUser().userName == granteeName) {
1787  roles = grantee->getRoles();
1788  } else {
1789  THROW_MAPD_EXCEPTION(
1790  "Only a superuser is authorized to request list of roles granted to "
1791  "another "
1792  "user.");
1793  }
1794  } else {
1795  CHECK(!grantee->isUser());
1796  // granteeName is actually a roleName here and we can check a role
1797  // only if it is granted to us
1798  if (SysCatalog::instance().isRoleGrantedToGrantee(
1799  session_ptr->get_currentUser().userName, granteeName, false)) {
1800  roles = grantee->getRoles();
1801  } else {
1802  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
1803  }
1804  }
1805  } else {
1806  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
1807  }
1808 }
1809 
1810 namespace {
1811 std::string dump_table_col_names(
1812  const std::map<std::string, std::vector<std::string>>& table_col_names) {
1813  std::ostringstream oss;
1814  for (const auto& [table_name, col_names] : table_col_names) {
1815  oss << ":" << table_name;
1816  for (const auto& col_name : col_names) {
1817  oss << "," << col_name;
1818  }
1819  }
1820  return oss.str();
1821 }
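// For example, a map {"flights": {"origin", "dest"}} is rendered by
// dump_table_col_names as ":flights,origin,dest" for the structured log entry.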
1822 } // namespace
1823 
1824 void DBHandler::get_result_row_for_pixel(
1825  TPixelTableRowResult& _return,
1826  const TSessionId& session,
1827  const int64_t widget_id,
1828  const TPixel& pixel,
1829  const std::map<std::string, std::vector<std::string>>& table_col_names,
1830  const bool column_format,
1831  const int32_t pixel_radius,
1832  const std::string& nonce) {
1833  auto stdlog = STDLOG(get_session_ptr(session),
1834  "widget_id",
1835  widget_id,
1836  "pixel.x",
1837  pixel.x,
1838  "pixel.y",
1839  pixel.y,
1840  "column_format",
1841  column_format,
1842  "pixel_radius",
1843  pixel_radius,
1844  "table_col_names",
1845  dump_table_col_names(table_col_names),
1846  "nonce",
1847  nonce);
1848  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1849  auto session_ptr = stdlog.getSessionInfo();
1850  if (!render_handler_) {
1851  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
1852  }
1853 
1854  try {
1855  render_handler_->get_result_row_for_pixel(_return,
1856  session_ptr,
1857  widget_id,
1858  pixel,
1859  table_col_names,
1860  column_format,
1861  pixel_radius,
1862  nonce);
1863  } catch (std::exception& e) {
1864  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1865  }
1866 }
1867 
1868 namespace {
1869 
1870 inline void fixup_geo_column_descriptor(TColumnType& col_type,
1871  const SQLTypes subtype,
1872  const int output_srid) {
1873  col_type.col_type.precision = static_cast<int>(subtype);
1874  col_type.col_type.scale = output_srid;
1875 }
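// Geo columns have no separate precision/scale, so fixup_geo_column_descriptor
// repurposes those Thrift fields to carry the geo subtype and the output SRID;
// clients are expected to decode them accordingly.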
1876 
1877 } // namespace
1878 
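// populateThriftColumnType converts a ColumnDescriptor into the Thrift TColumnType.
// Note the comp_param handling below: for dictionary-encoded columns the actual
// dictionary bit width is reported when a catalog is available, and date-in-days
// columns with an unset comp_param default to 32 bits.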
1879 TColumnType DBHandler::populateThriftColumnType(const Catalog* cat,
1880  const ColumnDescriptor* cd) {
1881  TColumnType col_type;
1882  col_type.col_name = cd->columnName;
1883  col_type.src_name = cd->sourceName;
1884  col_type.col_id = cd->columnId;
1885  col_type.col_type.type = type_to_thrift(cd->columnType);
1886  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
1887  col_type.col_type.nullable = !cd->columnType.get_notnull();
1888  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
1889  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
1890  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
1891  }
1892  if (IS_GEO(cd->columnType.get_type())) {
1893  fixup_geo_column_descriptor(
1894  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
1895  } else {
1896  col_type.col_type.precision = cd->columnType.get_precision();
1897  col_type.col_type.scale = cd->columnType.get_scale();
1898  }
1899  col_type.is_system = cd->isSystemCol;
1900  if (cd->columnType.get_compression() == kENCODING_DICT &&
1901  cat != nullptr) {
1902  // have to get the actual size of the encoding from the dictionary definition
1903  const int dict_id = cd->columnType.get_comp_param();
1904  if (!cat->getMetadataForDict(dict_id, false)) {
1905  col_type.col_type.comp_param = 0;
1906  return col_type;
1907  }
1908  auto dd = cat->getMetadataForDict(dict_id, false);
1909  if (!dd) {
1910  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
1911  }
1912  col_type.col_type.comp_param = dd->dictNBits;
1913  } else {
1914  col_type.col_type.comp_param =
1915  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
1916  ? 32
1917  : cd->columnType.get_comp_param();
1918  }
1919  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
1920  return col_type;
1921 }
1922 
1923 void DBHandler::get_internal_table_details(TTableDetails& _return,
1924  const TSessionId& session,
1925  const std::string& table_name) {
1926  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1927  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1928  get_table_details_impl(_return, stdlog, table_name, true, false);
1929 }
1930 
1931 void DBHandler::get_table_details(TTableDetails& _return,
1932  const TSessionId& session,
1933  const std::string& table_name) {
1934  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1935  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1936  get_table_details_impl(_return, stdlog, table_name, false, false);
1937 }
1938 
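// get_table_details_impl: for views, the view SQL is parsed and run in
// validate-only mode to derive the row descriptor, and Calcite is asked to verify
// privileges on the underlying objects (view_sql is masked if that check fails);
// for tables, the column descriptors are listed directly, skipping the hidden
// "deleted" column.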
1939 void DBHandler::get_table_details_impl(TTableDetails& _return,
1940  query_state::StdLog& stdlog,
1941  const std::string& table_name,
1942  const bool get_system,
1943  const bool get_physical) {
1944  try {
1945  auto session_info = stdlog.getSessionInfo();
1946  auto& cat = session_info->getCatalog();
1947  const auto td_with_lock =
1948  lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>::acquireTableDescriptor(
1949  cat, table_name, false);
1950  const auto td = td_with_lock();
1951  CHECK(td);
1952 
1953  bool have_privileges_on_view_sources = true;
1954  if (td->isView) {
1955  auto query_state = create_query_state(session_info, td->viewSQL);
1956  stdlog.setQueryState(query_state);
1957  try {
1958  if (hasTableAccessPrivileges(td, *session_info)) {
1959  // TODO(adb): we should take schema read locks on all tables making up the
1960  // view, at minimum
1961  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
1962  query_state->getQueryStr(),
1963  {},
1964  false,
1965  system_parameters_,
1966  false);
1967  try {
1968  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
1969  query_ra.first);
1970  } catch (const std::runtime_error&) {
1971  have_privileges_on_view_sources = false;
1972  }
1973 
1974  TQueryResult result;
1975  execute_rel_alg(result,
1976  query_state->createQueryStateProxy(),
1977  query_ra.first.plan_result,
1978  true,
1980  -1,
1981  -1,
1982  /*just_validate=*/true,
1983  /*find_filter_push_down_candidates=*/false,
1985  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
1986  } else {
1987  throw std::runtime_error(
1988  "Unable to access view " + table_name +
1989  ". The view may not exist, or the logged in user may not "
1990  "have permission to access the view.");
1991  }
1992  } catch (const std::exception& e) {
1993  throw std::runtime_error("View '" + table_name +
1994  "' query has failed with an error: '" +
1995  std::string(e.what()) +
1996  "'.\nThe view must be dropped and re-created to "
1997  "resolve the error. \nQuery:\n" +
1998  query_state->getQueryStr());
1999  }
2000  } else {
2001  if (hasTableAccessPrivileges(td, *session_info)) {
2002  const auto col_descriptors =
2003  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
2004  const auto deleted_cd = cat.getDeletedColumn(td);
2005  for (const auto cd : col_descriptors) {
2006  if (cd == deleted_cd) {
2007  continue;
2008  }
2009  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
2010  }
2011  } else {
2012  throw std::runtime_error(
2013  "Unable to access table " + table_name +
2014  ". The table may not exist, or the logged in user may not "
2015  "have permission to access the table.");
2016  }
2017  }
2018  _return.fragment_size = td->maxFragRows;
2019  _return.page_size = td->fragPageSize;
2020  _return.max_rows = td->maxRows;
2021  _return.view_sql =
2022  (have_privileges_on_view_sources ? td->viewSQL
2023  : "[Not enough privileges to see the view SQL]");
2024  _return.shard_count = td->nShards;
2025  _return.key_metainfo = td->keyMetainfo;
2026  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
2027  _return.partition_detail =
2028  td->partitions.empty()
2029  ? TPartitionDetail::DEFAULT
2030  : (table_is_replicated(td)
2031  ? TPartitionDetail::REPLICATED
2032  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
2033  : TPartitionDetail::OTHER));
2034  } catch (const std::runtime_error& e) {
2035  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2036  }
2037 }
2038 
2039 void DBHandler::get_link_view(TFrontendView& _return,
2040  const TSessionId& session,
2041  const std::string& link) {
2042  auto stdlog = STDLOG(get_session_ptr(session));
2043  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2044  auto session_ptr = stdlog.getConstSessionInfo();
2045  auto const& cat = session_ptr->getCatalog();
2046  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2047  if (!ld) {
2048  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
2049  }
2050  _return.view_state = ld->viewState;
2051  _return.view_name = ld->link;
2052  _return.update_time = ld->updateTime;
2053  _return.view_metadata = ld->viewMetadata;
2054 }
2055 
2056 bool DBHandler::hasTableAccessPrivileges(
2057  const TableDescriptor* td,
2058  const Catalog_Namespace::SessionInfo& session_info) {
2059  auto& cat = session_info.getCatalog();
2060  auto user_metadata = session_info.get_currentUser();
2061 
2062  if (user_metadata.isSuper) {
2063  return true;
2064  }
2065 
2066  DBObject dbObject(td->tableName, TableDBObjectType);
2067  dbObject.loadKey(cat);
2068  std::vector<DBObject> privObjects = {dbObject};
2069 
2070  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2071 }
2072 
2073 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2074  const Catalog_Namespace::SessionInfo& session_info,
2075  const GetTablesType get_tables_type) {
2076  table_names = session_info.getCatalog().getTableNamesForUser(
2077  session_info.get_currentUser(), get_tables_type);
2078 }
2079 
2080 void DBHandler::get_tables(std::vector<std::string>& table_names,
2081  const TSessionId& session) {
2082  auto stdlog = STDLOG(get_session_ptr(session));
2083  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2084  get_tables_impl(
2085  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2086 }
2087 
2088 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2089  const TSessionId& session) {
2090  auto stdlog = STDLOG(get_session_ptr(session));
2091  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2092  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2093 }
2094 
2095 void DBHandler::get_views(std::vector<std::string>& table_names,
2096  const TSessionId& session) {
2097  auto stdlog = STDLOG(get_session_ptr(session));
2098  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2099  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2100 }
2101 
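// get_tables_meta_impl walks all table metadata, skipping shards and tables the
// session has no privileges on. View column metadata is obtained by parsing the
// view SQL and executing it in validate-only mode; broken views are logged with a
// warning and skipped rather than failing the whole call.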
2102 void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
2103  QueryStateProxy query_state_proxy,
2104  const Catalog_Namespace::SessionInfo& session_info,
2105  const bool with_table_locks) {
2106  const auto& cat = session_info.getCatalog();
2107  const auto tables = cat.getAllTableMetadata();
2108  _return.reserve(tables.size());
2109 
2110  for (const auto td : tables) {
2111  if (td->shard >= 0) {
2112  // skip shards, they're not standalone tables
2113  continue;
2114  }
2115  if (!hasTableAccessPrivileges(td, session_info)) {
2116  // skip table, as there are no privileges to access it
2117  continue;
2118  }
2119 
2120  TTableMeta ret;
2121  ret.table_name = td->tableName;
2122  ret.is_view = td->isView;
2123  ret.is_replicated = table_is_replicated(td);
2124  ret.shard_count = td->nShards;
2125  ret.max_rows = td->maxRows;
2126  ret.table_id = td->tableId;
2127 
2128  std::vector<TTypeInfo> col_types;
2129  std::vector<std::string> col_names;
2130  size_t num_cols = 0;
2131  if (td->isView) {
2132  try {
2133  TPlanResult parse_result;
2134  lockmgr::LockedTableDescriptors locks;
2135  std::tie(parse_result, locks) = parse_to_ra(
2136  query_state_proxy, td->viewSQL, {}, with_table_locks, system_parameters_);
2137  const auto query_ra = parse_result.plan_result;
2138 
2139  TQueryResult result;
2140  execute_rel_alg(result,
2141  query_state_proxy,
2142  query_ra,
2143  true,
2145  -1,
2146  -1,
2147  /*just_validate=*/true,
2148  /*find_push_down_candidates=*/false,
2150  num_cols = result.row_set.row_desc.size();
2151  for (const auto& col : result.row_set.row_desc) {
2152  if (col.is_physical) {
2153  num_cols--;
2154  continue;
2155  }
2156  col_types.push_back(col.col_type);
2157  col_names.push_back(col.col_name);
2158  }
2159  } catch (std::exception& e) {
2160  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
2161  }
2162  } else {
2163  try {
2164  if (hasTableAccessPrivileges(td, session_info)) {
2165  const auto col_descriptors =
2166  cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
2167  const auto deleted_cd = cat.getDeletedColumn(td);
2168  for (const auto cd : col_descriptors) {
2169  if (cd == deleted_cd) {
2170  continue;
2171  }
2172  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2173  col_names.push_back(cd->columnName);
2174  }
2175  num_cols = col_descriptors.size();
2176  } else {
2177  continue;
2178  }
2179  } catch (const std::runtime_error& e) {
2180  THROW_MAPD_EXCEPTION(e.what());
2181  }
2182  }
2183 
2184  ret.num_cols = num_cols;
2185  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2186  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2187 
2188  _return.push_back(ret);
2189  }
2190 }
2191 
2192 void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
2193  const TSessionId& session) {
2194  auto stdlog = STDLOG(get_session_ptr(session));
2195  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2196  auto session_ptr = stdlog.getConstSessionInfo();
2197  auto query_state = create_query_state(session_ptr, "");
2198  stdlog.setQueryState(query_state);
2199 
2200  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
2203 
2204  try {
2205  get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
2206  } catch (const std::exception& e) {
2207  THROW_MAPD_EXCEPTION(e.what());
2208  }
2209 }
2210 
2211 void DBHandler::get_users(std::vector<std::string>& user_names,
2212  const TSessionId& session) {
2213  auto stdlog = STDLOG(get_session_ptr(session));
2214  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2215  auto session_ptr = stdlog.getConstSessionInfo();
2216  std::list<Catalog_Namespace::UserMetadata> user_list;
2217 
2218  if (!session_ptr->get_currentUser().isSuper) {
2219  user_list = SysCatalog::instance().getAllUserMetadata(
2220  session_ptr->getCatalog().getCurrentDB().dbId);
2221  } else {
2222  user_list = SysCatalog::instance().getAllUserMetadata();
2223  }
2224  for (auto u : user_list) {
2225  user_names.push_back(u.userName);
2226  }
2227 }
2228 
2229 void DBHandler::get_version(std::string& version) {
2230  version = MAPD_RELEASE;
2231 }
2232 
2233 void DBHandler::clear_gpu_memory(const TSessionId& session) {
2234  auto stdlog = STDLOG(get_session_ptr(session));
2235  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2236  auto session_ptr = stdlog.getConstSessionInfo();
2237  if (!session_ptr->get_currentUser().isSuper) {
2238  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2239  }
2240  try {
2241  SysCatalog::instance().getDataMgr().clearMemory(Data_Namespace::MemoryLevel::GPU_LEVEL);
2242  } catch (const std::exception& e) {
2243  THROW_MAPD_EXCEPTION(e.what());
2244  }
2245  if (render_handler_) {
2246  render_handler_->clear_gpu_memory();
2247  }
2248 
2249  if (leaf_aggregator_.leafCount() > 0) {
2251  }
2252 }
2253 
2254 void DBHandler::clear_cpu_memory(const TSessionId& session) {
2255  auto stdlog = STDLOG(get_session_ptr(session));
2256  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2257  auto session_ptr = stdlog.getConstSessionInfo();
2258  if (!session_ptr->get_currentUser().isSuper) {
2259  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2260  }
2261  try {
2262  SysCatalog::instance().getDataMgr().clearMemory(Data_Namespace::MemoryLevel::CPU_LEVEL);
2263  } catch (const std::exception& e) {
2264  THROW_MAPD_EXCEPTION(e.what());
2265  }
2266  if (render_handler_) {
2267  render_handler_->clear_cpu_memory();
2268  }
2269 
2270  if (leaf_aggregator_.leafCount() > 0) {
2272  }
2273 }
2274 
2276  return INVALID_SESSION_ID;
2277 }
2278 
2279 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2280  const TSessionId& session,
2281  const std::string& memory_level) {
2282  auto stdlog = STDLOG(get_session_ptr(session));
2283  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2284  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2285  Data_Namespace::MemoryLevel mem_level;
2286  if (!memory_level.compare("gpu")) {
2287  mem_level = Data_Namespace::MemoryLevel::GPU_LEVEL;
2288  internal_memory =
2289  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2290  } else {
2291  mem_level = Data_Namespace::MemoryLevel::CPU_LEVEL;
2292  internal_memory =
2293  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2294  }
2295 
2296  for (auto memInfo : internal_memory) {
2297  TNodeMemoryInfo nodeInfo;
2298  if (leaf_aggregator_.leafCount() > 0) {
2299  nodeInfo.host_name = get_hostname();
2300  }
2301  nodeInfo.page_size = memInfo.pageSize;
2302  nodeInfo.max_num_pages = memInfo.maxNumPages;
2303  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2304  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2305  for (auto gpu : memInfo.nodeMemoryData) {
2306  TMemoryData md;
2307  md.slab = gpu.slabNum;
2308  md.start_page = gpu.startPage;
2309  md.num_pages = gpu.numPages;
2310  md.touch = gpu.touch;
2311  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2312  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2313  nodeInfo.node_memory_data.push_back(md);
2314  }
2315  _return.push_back(nodeInfo);
2316  }
2317  if (leaf_aggregator_.leafCount() > 0) {
2318  std::vector<TNodeMemoryInfo> leafSummary =
2319  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
2320  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2321  }
2322 }
2323 
2324 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
2325  auto stdlog = STDLOG(get_session_ptr(session));
2326  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2327  auto session_ptr = stdlog.getConstSessionInfo();
2328  const auto& user = session_ptr->get_currentUser();
2329  Catalog_Namespace::DBSummaryList dbs =
2330  SysCatalog::instance().getDatabaseListForUser(user);
2331  for (auto& db : dbs) {
2332  TDBInfo dbinfo;
2333  dbinfo.db_name = std::move(db.dbName);
2334  dbinfo.db_owner = std::move(db.dbOwnerName);
2335  dbinfos.push_back(std::move(dbinfo));
2336  }
2337 }
2338 
2339 void DBHandler::set_execution_mode(const TSessionId& session,
2340  const TExecuteMode::type mode) {
2341  auto stdlog = STDLOG(get_session_ptr(session));
2342  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2343  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
2344  auto session_it = get_session_it_unsafe(session, write_lock);
2345  if (leaf_aggregator_.leafCount() > 0) {
2346  leaf_aggregator_.set_execution_mode(session, mode);
2347  try {
2348  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2349  } catch (const TOmniSciException& e) {
2350  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2351  }
2352  return;
2353  }
2354  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2355 }
2356 
2357 namespace {
2358 
2359 void check_table_not_sharded(const TableDescriptor* td) {
2360  if (td && td->nShards) {
2361  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2362  }
2363 }
2364 
2365 } // namespace
2366 
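// The load_table_* entry points below share the same pattern: acquire a schema
// read lock on the target table, verify load privileges, build either a
// DistributedLoader (aggregator) or a local Loader, stage incoming values into
// TypedImportBuffers, then take the insert-data write lock and load. In the
// row-wise paths a row that fails conversion is logged and dropped; the columnar
// and Arrow paths abort the import on error.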
2367 void DBHandler::load_table_binary(const TSessionId& session,
2368  const std::string& table_name,
2369  const std::vector<TRow>& rows) {
2370  try {
2371  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2372  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2373  auto session_ptr = stdlog.getConstSessionInfo();
2374  check_read_only("load_table_binary");
2375  auto& cat = session_ptr->getCatalog();
2376  const auto td_with_lock =
2377  lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>::acquireTableDescriptor(
2378  cat, table_name, true);
2379  const auto td = td_with_lock();
2380  CHECK(td);
2381  if (g_cluster && !leaf_aggregator_.leafCount()) {
2382  // Sharded table rows need to be routed to the leaf by an aggregator.
2383  check_table_not_sharded(td);
2384  }
2385  check_table_load_privileges(*session_ptr, table_name);
2386  if (rows.empty()) {
2387  return;
2388  }
2389  std::unique_ptr<import_export::Loader> loader;
2390  if (leaf_aggregator_.leafCount() > 0) {
2391  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2392  } else {
2393  loader.reset(new import_export::Loader(cat, td));
2394  }
2395  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2396  // Subtracting 1 (rowid) until TableDescriptor is updated.
2397  if (rows.front().cols.size() !=
2398  static_cast<size_t>(td->nColumns) - (td->hasDeletedCol ? 2 : 1)) {
2399  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
2400  }
2401  auto col_descs = loader->get_column_descs();
2402  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2403  for (auto cd : col_descs) {
2404  import_buffers.push_back(std::unique_ptr<import_export::TypedImportBuffer>(
2405  new import_export::TypedImportBuffer(cd, loader->getStringDict(cd))));
2406  }
2407  for (auto const& row : rows) {
2408  size_t col_idx = 0;
2409  try {
2410  for (auto cd : col_descs) {
2411  import_buffers[col_idx]->add_value(
2412  cd, row.cols[col_idx], row.cols[col_idx].is_null);
2413  col_idx++;
2414  }
2415  } catch (const std::exception& e) {
2416  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2417  import_buffers[col_idx_to_pop]->pop_value();
2418  }
2419  LOG(ERROR) << "Input exception thrown: " << e.what()
2420  << ". Row discarded, issue at column : " << (col_idx + 1)
2421  << " data :" << row;
2422  }
2423  }
2424  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2425  session_ptr->getCatalog(), table_name);
2426  loader->load(import_buffers, rows.size());
2427  } catch (const std::exception& e) {
2428  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2429  }
2430 }
2431 
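// prepare_columnar_loader factors out the shared setup for the columnar and Arrow
// load paths: it returns the table schema read lock (so it outlives the load),
// builds the loader and import buffers, and checks that the supplied column count
// matches the table's columns minus geo physical and system columns.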
2432 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
2433 DBHandler::prepare_columnar_loader(
2434  const Catalog_Namespace::SessionInfo& session_info,
2435  const std::string& table_name,
2436  size_t num_cols,
2437  std::unique_ptr<import_export::Loader>* loader,
2438  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers) {
2439  auto& cat = session_info.getCatalog();
2440  auto td_with_lock =
2441  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
2442  lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>::acquireTableDescriptor(
2443  cat, table_name, true));
2444  const auto td = (*td_with_lock)();
2445  CHECK(td);
2446  if (g_cluster && !leaf_aggregator_.leafCount()) {
2447  // Sharded table rows need to be routed to the leaf by an aggregator.
2448  check_table_not_sharded(td);
2449  }
2450  check_table_load_privileges(session_info, table_name);
2451 
2452  if (leaf_aggregator_.leafCount() > 0) {
2453  loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
2454  } else {
2455  loader->reset(new import_export::Loader(cat, td));
2456  }
2457 
2458  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2459  // Subtracting 1 (rowid) until TableDescriptor is updated.
2460  auto col_descs = (*loader)->get_column_descs();
2461  const auto geo_physical_cols = std::count_if(
2462  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2463  const auto num_table_cols =
2464  static_cast<size_t>(td->nColumns) - geo_physical_cols - (td->hasDeletedCol ? 2 : 1);
2465  if (num_cols != num_table_cols) {
2466  throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
2467  ") does not match number of columns in table " +
2468  td->tableName + " (" + std::to_string(num_table_cols) + ")");
2469  }
2470 
2471  *import_buffers = import_export::setup_column_loaders(td, loader->get());
2472  return std::move(td_with_lock);
2473 }
2474 
2475 void DBHandler::load_table_binary_columnar(const TSessionId& session,
2476  const std::string& table_name,
2477  const std::vector<TColumn>& cols) {
2478  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2479  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2480  auto session_ptr = stdlog.getConstSessionInfo();
2481  check_read_only("load_table_binary_columnar");
2482  auto const& cat = session_ptr->getCatalog();
2483 
2484  std::unique_ptr<import_export::Loader> loader;
2485  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2486  auto read_lock = prepare_columnar_loader(
2487  *session_ptr, table_name, cols.size(), &loader, &import_buffers);
2488  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2489  session_ptr->getCatalog(), table_name);
2490 
2491  size_t numRows = 0;
2492  size_t import_idx = 0; // index into the TColumn vector being loaded
2493  size_t col_idx = 0; // index into column description vector
2494  try {
2495  size_t skip_physical_cols = 0;
2496  for (auto cd : loader->get_column_descs()) {
2497  if (skip_physical_cols > 0) {
2498  if (!cd->isGeoPhyCol) {
2499  throw std::runtime_error("Unexpected physical column");
2500  }
2501  skip_physical_cols--;
2502  continue;
2503  }
2504  size_t colRows = import_buffers[col_idx]->add_values(cd, cols[import_idx]);
2505  if (col_idx == 0) {
2506  numRows = colRows;
2507  } else if (colRows != numRows) {
2508  std::ostringstream oss;
2509  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
2510  << cd->columnName << " , expecting " << numRows << " rows, column "
2511  << col_idx << " has " << colRows << " rows";
2512  THROW_MAPD_EXCEPTION(oss.str());
2513  }
2514  // Advance to the next column in the table
2515  col_idx++;
2516 
2517  // For geometry columns: process WKT strings and fill physical columns
2518  if (cd->columnType.is_geometry()) {
2519  auto geo_col_idx = col_idx - 1;
2520  const auto wkt_or_wkb_hex_column =
2521  import_buffers[geo_col_idx]->getGeoStringBuffer();
2522  std::vector<std::vector<double>> coords_column, bounds_column;
2523  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2524  int render_group = 0;
2525  SQLTypeInfo ti = cd->columnType;
2526  if (numRows != wkt_or_wkb_hex_column->size() ||
2527  !Geo_namespace::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
2528  ti,
2529  coords_column,
2530  bounds_column,
2531  ring_sizes_column,
2532  poly_rings_column,
2533  false)) {
2534  std::ostringstream oss;
2535  oss << "load_table_binary_columnar: Invalid geometry in column "
2536  << cd->columnName;
2537  THROW_MAPD_EXCEPTION(oss.str());
2538  }
2539  // Populate physical columns, advance col_idx
2540  import_export::Importer::set_geo_physical_import_buffer_columnar(
2541  cat,
2542  cd,
2543  import_buffers,
2544  col_idx,
2545  coords_column,
2546  bounds_column,
2547  ring_sizes_column,
2548  poly_rings_column,
2549  render_group);
2550  skip_physical_cols = cd->columnType.get_physical_cols();
2551  }
2552  // Advance to the next column of values being loaded
2553  import_idx++;
2554  }
2555  } catch (const std::exception& e) {
2556  std::ostringstream oss;
2557  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
2558  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2559  THROW_MAPD_EXCEPTION(oss.str());
2560  }
2561  loader->load(import_buffers, numRows);
2562 }
2563 
2564 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
2565 
2566 #define ARROW_THRIFT_THROW_NOT_OK(s) \
2567  do { \
2568  ::arrow::Status _s = (s); \
2569  if (UNLIKELY(!_s.ok())) { \
2570  TOmniSciException ex; \
2571  ex.error_msg = _s.ToString(); \
2572  LOG(ERROR) << s.ToString(); \
2573  throw ex; \
2574  } \
2575  } while (0)
2576 
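// loadArrowStream wraps the raw Thrift string in an arrow::Buffer and reads record
// batches from it with RecordBatchStreamReader (zero-copy where possible);
// load_table_binary_arrow currently accepts exactly one record batch per call.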
2577 namespace {
2578 
2579 RecordBatchVector loadArrowStream(const std::string& stream) {
2580  RecordBatchVector batches;
2581  try {
2582  // TODO(wesm): Make this simpler in general, see ARROW-1600
2583  auto stream_buffer =
2584  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
2585  static_cast<int64_t>(stream.size()));
2586 
2587  arrow::io::BufferReader buf_reader(stream_buffer);
2588  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
2589  ARROW_ASSIGN_OR_THROW(batch_reader,
2590  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
2591 
2592  while (true) {
2593  std::shared_ptr<arrow::RecordBatch> batch;
2594  // Read batch (zero-copy) from the stream
2595  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
2596  if (batch == nullptr) {
2597  break;
2598  }
2599  batches.emplace_back(std::move(batch));
2600  }
2601  } catch (const std::exception& e) {
2602  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
2603  }
2604  return batches;
2605 }
2606 
2607 } // namespace
2608 
2609 void DBHandler::load_table_binary_arrow(const TSessionId& session,
2610  const std::string& table_name,
2611  const std::string& arrow_stream) {
2612  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2613  auto session_ptr = stdlog.getConstSessionInfo();
2614  check_read_only("load_table_binary_arrow");
2615 
2616  RecordBatchVector batches = loadArrowStream(arrow_stream);
2617 
2618  // Assuming we have one batch for now
2619  if (batches.size() != 1) {
2620  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
2621  }
2622 
2623  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
2624 
2625  std::unique_ptr<import_export::Loader> loader;
2626  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2627  auto read_lock = prepare_columnar_loader(*session_ptr,
2628  table_name,
2629  static_cast<size_t>(batch->num_columns()),
2630  &loader,
2631  &import_buffers);
2632  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2633  session_ptr->getCatalog(), table_name);
2634 
2635  size_t numRows = 0;
2636  size_t col_idx = 0;
2637  try {
2638  for (auto cd : loader->get_column_descs()) {
2639  auto& array = *batch->column(col_idx);
2640  import_export::ArraySliceRange row_slice(0, array.length());
2641  numRows =
2642  import_buffers[col_idx]->add_arrow_values(cd, array, true, row_slice, nullptr);
2643  col_idx++;
2644  }
2645  } catch (const std::exception& e) {
2646  LOG(ERROR) << "Input exception thrown: " << e.what()
2647  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2648  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
2649  // other import paths
2650  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2651  }
2652  loader->load(import_buffers, numRows);
2653 }
2654 
2655 void DBHandler::load_table(const TSessionId& session,
2656  const std::string& table_name,
2657  const std::vector<TStringRow>& rows) {
2658  try {
2659  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2660  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2661  auto session_ptr = stdlog.getConstSessionInfo();
2662  check_read_only("load_table");
2663  auto& cat = session_ptr->getCatalog();
2664  const auto td_with_lock =
2665  lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>::acquireTableDescriptor(
2666  cat, table_name, true);
2667  const auto td = td_with_lock();
2668  CHECK(td);
2669 
2670  if (g_cluster && !leaf_aggregator_.leafCount()) {
2671  // Sharded table rows need to be routed to the leaf by an aggregator.
2672  check_table_not_sharded(td);
2673  }
2674  check_table_load_privileges(*session_ptr, table_name);
2675  if (rows.empty()) {
2676  return;
2677  }
2678  std::unique_ptr<import_export::Loader> loader;
2679  if (leaf_aggregator_.leafCount() > 0) {
2680  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2681  } else {
2682  loader.reset(new import_export::Loader(cat, td));
2683  }
2684  auto col_descs = loader->get_column_descs();
2685  auto geo_physical_cols = std::count_if(
2686  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2687  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2688  // Subtracting 1 (rowid) until TableDescriptor is updated.
2689  if (rows.front().cols.size() != static_cast<size_t>(td->nColumns) -
2690  geo_physical_cols - (td->hasDeletedCol ? 2 : 1)) {
2691  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name +
2692  " (" + std::to_string(rows.front().cols.size()) + " vs " +
2693  std::to_string(td->nColumns - geo_physical_cols - 1) + ")");
2694  }
2695  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
2696  for (auto cd : col_descs) {
2697  import_buffers.push_back(std::unique_ptr<import_export::TypedImportBuffer>(
2698  new import_export::TypedImportBuffer(cd, loader->getStringDict(cd))));
2699  }
2700  import_export::CopyParams copy_params;
2701  size_t rows_completed = 0;
2702  for (auto const& row : rows) {
2703  size_t import_idx = 0; // index into the TStringRow being loaded
2704  size_t col_idx = 0; // index into column description vector
2705  try {
2706  size_t skip_physical_cols = 0;
2707  for (auto cd : col_descs) {
2708  if (skip_physical_cols > 0) {
2709  if (!cd->isGeoPhyCol) {
2710  throw std::runtime_error("Unexpected physical column");
2711  }
2712  skip_physical_cols--;
2713  continue;
2714  }
2715  import_buffers[col_idx]->add_value(cd,
2716  row.cols[import_idx].str_val,
2717  row.cols[import_idx].is_null,
2718  copy_params);
2719  // Advance to the next column within the table
2720  col_idx++;
2721 
2722  if (cd->columnType.is_geometry()) {
2723  // Populate physical columns
2724  std::vector<double> coords, bounds;
2725  std::vector<int> ring_sizes, poly_rings;
2726  int render_group = 0;
2727  SQLTypeInfo ti;
2728  if (row.cols[import_idx].is_null ||
2729  !Geo_namespace::GeoTypesFactory::getGeoColumns(
2730  row.cols[import_idx].str_val,
2731  ti,
2732  coords,
2733  bounds,
2734  ring_sizes,
2735  poly_rings,
2736  false)) {
2737  throw std::runtime_error("Invalid geometry");
2738  }
2739  if (cd->columnType.get_type() != ti.get_type()) {
2740  throw std::runtime_error("Geometry type mismatch");
2741  }
2742  import_export::Importer::set_geo_physical_import_buffer(cat,
2743  cd,
2744  import_buffers,
2745  col_idx,
2746  coords,
2747  bounds,
2748  ring_sizes,
2749  poly_rings,
2750  render_group);
2751  skip_physical_cols = cd->columnType.get_physical_cols();
2752  }
2753  // Advance to the next field within the row
2754  import_idx++;
2755  }
2756  rows_completed++;
2757  } catch (const std::exception& e) {
2758  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2759  import_buffers[col_idx_to_pop]->pop_value();
2760  }
2761  LOG(ERROR) << "Input exception thrown: " << e.what()
2762  << ". Row discarded, issue at column : " << (col_idx + 1)
2763  << " data :" << row;
2764  }
2765  }
2766  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2767  session_ptr->getCatalog(), table_name);
2768  loader->load(import_buffers, rows_completed);
2769  } catch (const std::exception& e) {
2770  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2771  }
2772 }
2773 
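// unescape_char turns the two-character escape sequences accepted in TCopyParams
// strings ("\t", "\n", "\0", "\'", "\\") into the corresponding single character;
// any other input falls through to its first character, e.g. "|" stays '|'.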
2774 char DBHandler::unescape_char(std::string str) {
2775  char out = str[0];
2776  if (str.size() == 2 && str[0] == '\\') {
2777  if (str[1] == 't') {
2778  out = '\t';
2779  } else if (str[1] == 'n') {
2780  out = '\n';
2781  } else if (str[1] == '0') {
2782  out = '\0';
2783  } else if (str[1] == '\'') {
2784  out = '\'';
2785  } else if (str[1] == '\\') {
2786  out = '\\';
2787  }
2788  }
2789  return out;
2790 }
2791 
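// thrift_to_copyparams / copyparams_to_thrift translate between the Thrift
// TCopyParams wire struct and import_export::CopyParams, validating enum values
// (header handling, file type, geo encoding/type/SRID) and rejecting anything
// unknown with an exception rather than silently defaulting.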
2792 import_export::CopyParams DBHandler::thrift_to_copyparams(const TCopyParams& cp) {
2793  import_export::CopyParams copy_params;
2794  switch (cp.has_header) {
2795  case TImportHeaderRow::AUTODETECT:
2796  copy_params.has_header = import_export::ImportHeaderRow::AUTODETECT;
2797  break;
2798  case TImportHeaderRow::NO_HEADER:
2799  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
2800  break;
2801  case TImportHeaderRow::HAS_HEADER:
2802  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
2803  break;
2804  default:
2805  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
2806  std::to_string((int)cp.has_header));
2807  break;
2808  }
2809  copy_params.quoted = cp.quoted;
2810  if (cp.delimiter.length() > 0) {
2811  copy_params.delimiter = unescape_char(cp.delimiter);
2812  } else {
2813  copy_params.delimiter = '\0';
2814  }
2815  if (cp.null_str.length() > 0) {
2816  copy_params.null_str = cp.null_str;
2817  }
2818  if (cp.quote.length() > 0) {
2819  copy_params.quote = unescape_char(cp.quote);
2820  }
2821  if (cp.escape.length() > 0) {
2822  copy_params.escape = unescape_char(cp.escape);
2823  }
2824  if (cp.line_delim.length() > 0) {
2825  copy_params.line_delim = unescape_char(cp.line_delim);
2826  }
2827  if (cp.array_delim.length() > 0) {
2828  copy_params.array_delim = unescape_char(cp.array_delim);
2829  }
2830  if (cp.array_begin.length() > 0) {
2831  copy_params.array_begin = unescape_char(cp.array_begin);
2832  }
2833  if (cp.array_end.length() > 0) {
2834  copy_params.array_end = unescape_char(cp.array_end);
2835  }
2836  if (cp.threads != 0) {
2837  copy_params.threads = cp.threads;
2838  }
2839  if (cp.s3_access_key.length() > 0) {
2840  copy_params.s3_access_key = cp.s3_access_key;
2841  }
2842  if (cp.s3_secret_key.length() > 0) {
2843  copy_params.s3_secret_key = cp.s3_secret_key;
2844  }
2845  if (cp.s3_region.length() > 0) {
2846  copy_params.s3_region = cp.s3_region;
2847  }
2848  if (cp.s3_endpoint.length() > 0) {
2849  copy_params.s3_endpoint = cp.s3_endpoint;
2850  }
2851  switch (cp.file_type) {
2852  case TFileType::POLYGON:
2853  copy_params.file_type = import_export::FileType::POLYGON;
2854  break;
2855  case TFileType::DELIMITED:
2856  copy_params.file_type = import_export::FileType::DELIMITED;
2857  break;
2858 #ifdef ENABLE_IMPORT_PARQUET
2859  case TFileType::PARQUET:
2860  copy_params.file_type = import_export::FileType::PARQUET;
2861  break;
2862 #endif
2863  default:
2864  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
2865  std::to_string((int)cp.file_type));
2866  break;
2867  }
2868  switch (cp.geo_coords_encoding) {
2869  case TEncodingType::GEOINT:
2870  copy_params.geo_coords_encoding = kENCODING_GEOINT;
2871  break;
2872  case TEncodingType::NONE:
2873  copy_params.geo_coords_encoding = kENCODING_NONE;
2874  break;
2875  default:
2876  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
2877  std::to_string((int)cp.geo_coords_encoding));
2878  break;
2879  }
2880  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2881  switch (cp.geo_coords_type) {
2882  case TDatumType::GEOGRAPHY:
2883  copy_params.geo_coords_type = kGEOGRAPHY;
2884  break;
2885  case TDatumType::GEOMETRY:
2886  copy_params.geo_coords_type = kGEOMETRY;
2887  break;
2888  default:
2889  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
2890  std::to_string((int)cp.geo_coords_type));
2891  break;
2892  }
2893  switch (cp.geo_coords_srid) {
2894  case 4326:
2895  case 3857:
2896  case 900913:
2897  copy_params.geo_coords_srid = cp.geo_coords_srid;
2898  break;
2899  default:
2900  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
2901  std::to_string((int)cp.geo_coords_srid));
2902  break;
2903  }
2904  copy_params.sanitize_column_names = cp.sanitize_column_names;
2905  copy_params.geo_layer_name = cp.geo_layer_name;
2906  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2907  copy_params.geo_explode_collections = cp.geo_explode_collections;
2908  return copy_params;
2909 }
2910 
2911 TCopyParams DBHandler::copyparams_to_thrift(const import_export::CopyParams& cp) {
2912  TCopyParams copy_params;
2913  copy_params.delimiter = cp.delimiter;
2914  copy_params.null_str = cp.null_str;
2915  switch (cp.has_header) {
2917  copy_params.has_header = TImportHeaderRow::AUTODETECT;
2918  break;
2919  case import_export::ImportHeaderRow::NO_HEADER:
2920  copy_params.has_header = TImportHeaderRow::NO_HEADER;
2921  break;
2922  case import_export::ImportHeaderRow::HAS_HEADER:
2923  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
2924  break;
2925  default:
2926  CHECK(false);
2927  break;
2928  }
2929  copy_params.quoted = cp.quoted;
2930  copy_params.quote = cp.quote;
2931  copy_params.escape = cp.escape;
2932  copy_params.line_delim = cp.line_delim;
2933  copy_params.array_delim = cp.array_delim;
2934  copy_params.array_begin = cp.array_begin;
2935  copy_params.array_end = cp.array_end;
2936  copy_params.threads = cp.threads;
2937  copy_params.s3_access_key = cp.s3_access_key;
2938  copy_params.s3_secret_key = cp.s3_secret_key;
2939  copy_params.s3_region = cp.s3_region;
2940  copy_params.s3_endpoint = cp.s3_endpoint;
2941  switch (cp.file_type) {
2942  case import_export::FileType::POLYGON:
2943  copy_params.file_type = TFileType::POLYGON;
2944  break;
2945  default:
2946  copy_params.file_type = TFileType::DELIMITED;
2947  break;
2948  }
2949  switch (cp.geo_coords_encoding) {
2950  case kENCODING_GEOINT:
2951  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
2952  break;
2953  default:
2954  copy_params.geo_coords_encoding = TEncodingType::NONE;
2955  break;
2956  }
2957  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2958  switch (cp.geo_coords_type) {
2959  case kGEOGRAPHY:
2960  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
2961  break;
2962  case kGEOMETRY:
2963  copy_params.geo_coords_type = TDatumType::GEOMETRY;
2964  break;
2965  default:
2966  CHECK(false);
2967  break;
2968  }
2969  copy_params.geo_coords_srid = cp.geo_coords_srid;
2970  copy_params.sanitize_column_names = cp.sanitize_column_names;
2971  copy_params.geo_layer_name = cp.geo_layer_name;
2972  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2973  copy_params.geo_explode_collections = cp.geo_explode_collections;
2974  return copy_params;
2975 }
2976 
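// The helpers below rewrite geo import paths into GDAL virtual file system form:
// http(s):// becomes /vsicurl/, s3:// becomes /vsis3/, and .zip/.tar/.gz files gain
// /vsizip/, /vsitar/ or /vsigzip/ prefixes. As a sketch (bucket and file names are
// illustrative), "s3://bucket/data.zip" first becomes "/vsis3/bucket/data.zip" via
// add_vsi_network_prefix and then "/vsizip//vsis3/bucket/data.zip" via
// add_vsi_archive_prefix.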
2977 namespace {
2978 void add_vsi_network_prefix(std::string& path) {
2979  // do we support network file access?
2980  bool gdal_network = import_export::GDAL::supportsNetworkFileAccess();
2981 
2982  // modify head of filename based on source location
2983  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
2984  if (!gdal_network) {
2985  THROW_MAPD_EXCEPTION(
2986  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
2987  }
2988  // invoke GDAL CURL virtual file reader
2989  path = "/vsicurl/" + path;
2990  } else if (boost::istarts_with(path, "s3://")) {
2991  if (!gdal_network) {
2992  THROW_MAPD_EXCEPTION(
2993  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
2994  }
2995  // invoke GDAL S3 virtual file reader
2996  boost::replace_first(path, "s3://", "/vsis3/");
2997  }
2998 }
2999 
3000 void add_vsi_geo_prefix(std::string& path) {
3001  // single gzip'd file (not an archive)?
3002  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3003  path = "/vsigzip/" + path;
3004  }
3005 }
3006 
3007 void add_vsi_archive_prefix(std::string& path) {
3008  // check for compressed file or file bundle
3009  if (boost::iends_with(path, ".zip")) {
3010  // zip archive
3011  path = "/vsizip/" + path;
3012  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3013  boost::iends_with(path, ".tar.gz")) {
3014  // tar archive (compressed or uncompressed)
3015  path = "/vsitar/" + path;
3016  }
3017 }
3018 
3019 std::string remove_vsi_prefixes(const std::string& path_in) {
3020  std::string path(path_in);
3021 
3022  // these will be first
3023  if (boost::istarts_with(path, "/vsizip/")) {
3024  boost::replace_first(path, "/vsizip/", "");
3025  } else if (boost::istarts_with(path, "/vsitar/")) {
3026  boost::replace_first(path, "/vsitar/", "");
3027  } else if (boost::istarts_with(path, "/vsigzip/")) {
3028  boost::replace_first(path, "/vsigzip/", "");
3029  }
3030 
3031  // then these
3032  if (boost::istarts_with(path, "/vsicurl/")) {
3033  boost::replace_first(path, "/vsicurl/", "");
3034  } else if (boost::istarts_with(path, "/vsis3/")) {
3035  boost::replace_first(path, "/vsis3/", "s3://");
3036  }
3037 
3038  return path;
3039 }
3040 
3041 bool path_is_relative(const std::string& path) {
3042  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
3043  boost::istarts_with(path, "https://")) {
3044  return false;
3045  }
3046  return !boost::filesystem::path(path).is_absolute();
3047 }
3048 
3049 bool path_has_valid_filename(const std::string& path) {
3050  auto filename = boost::filesystem::path(path).filename().string();
3051  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
3052  return false;
3053  }
3054  return true;
3055 }
3056 
3057 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
3058  if (!path_has_valid_filename(path)) {
3059  return false;
3060  }
3061  if (include_gz) {
3062  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
3063  return true;
3064  }
3065  }
3066  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
3067  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
3068  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
3069  boost::iends_with(path, ".gdb.zip")) {
3070  return true;
3071  }
3072  return false;
3073 }
3074 
3075 bool is_a_supported_archive_file(const std::string& path) {
3076  if (!path_has_valid_filename(path)) {
3077  return false;
3078  }
3079  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
3080  return true;
3081  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3082  boost::iends_with(path, ".tar.gz")) {
3083  return true;
3084  }
3085  return false;
3086 }
3087 
3088 std::string find_first_geo_file_in_archive(const std::string& archive_path,
3089  const import_export::CopyParams& copy_params) {
3090  // get the recursive list of all files in the archive
3091  std::vector<std::string> files =
3092  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
3093 
3094  // report the list
3095  LOG(INFO) << "Found " << files.size() << " files in Archive "
3096  << remove_vsi_prefixes(archive_path);
3097  for (const auto& file : files) {
3098  LOG(INFO) << " " << file;
3099  }
3100 
3101  // scan the list for the first candidate file
3102  bool found_suitable_file = false;
3103  std::string file_name;
3104  for (const auto& file : files) {
3105  if (is_a_supported_geo_file(file, false)) {
3106  file_name = file;
3107  found_suitable_file = true;
3108  break;
3109  }
3110  }
3111 
3112  // if we didn't find anything
3113  if (!found_suitable_file) {
3114  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
3115  remove_vsi_prefixes(archive_path);
3116  file_name.clear();
3117  }
3118 
3119  // done
3120  return file_name;
3121 }
3122 
3123 bool is_local_file(const std::string& file_path) {
3124  return (!boost::istarts_with(file_path, "s3://") &&
3125  !boost::istarts_with(file_path, "http://") &&
3126  !boost::istarts_with(file_path, "https://"));
3127 }
3128 
3129 void validate_import_file_path_if_local(const std::string& file_path) {
3130  if (is_local_file(file_path)) {
3132  }
3133 }
3134 } // namespace
3135 
3136 void DBHandler::detect_column_types(TDetectResult& _return,
3137  const TSessionId& session,
3138  const std::string& file_name_in,
3139  const TCopyParams& cp) {
3140  auto stdlog = STDLOG(get_session_ptr(session));
3141  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3142  check_read_only("detect_column_types");
3143 
3144  import_export::CopyParams copy_params = thrift_to_copyparams(cp);
3145 
3146  std::string file_name{file_name_in};
3147  if (path_is_relative(file_name)) {
3148  // assume relative paths are relative to data_path / mapd_import / <session>
3149  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3150  boost::filesystem::path(file_name).filename();
3151  file_name = file_path.string();
3152  }
3153  validate_import_file_path_if_local(file_name);
3154 
3155  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
3156  if (copy_params.file_type == import_export::FileType::POLYGON) {
3157  if (is_a_supported_geo_file(file_name, true)) {
3158  // prepare to detect geo file directly
3159  add_vsi_network_prefix(file_name);
3160  add_vsi_geo_prefix(file_name);
3161  } else if (is_a_supported_archive_file(file_name)) {
3162  // find the archive file
3163  add_vsi_network_prefix(file_name);
3164  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
3165  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3166  }
3167  // find geo file in archive
3168  add_vsi_archive_prefix(file_name);
3169  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3170  // prepare to detect that geo file
3171  if (geo_file.size()) {
3172  file_name = file_name + std::string("/") + geo_file;
3173  }
3174  } else {
3175  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
3176  file_name_in);
3177  }
3178  }
3179 
3180  auto file_path = boost::filesystem::path(file_name);
3181  // can be a s3 url
3182  if (!boost::istarts_with(file_name, "s3://")) {
3183  if (!boost::filesystem::path(file_name).is_absolute()) {
3184  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3185  boost::filesystem::path(file_name).filename();
3186  file_name = file_path.string();
3187  }
3188 
3189  if (copy_params.file_type == import_export::FileType::POLYGON) {
3190  // check for geo file
3191  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3192  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3193  }
3194  } else {
3195  // check for regular file
3196  if (!boost::filesystem::exists(file_path)) {
3197  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3198  }
3199  }
3200  }
3201 
3202  try {
3203  if (copy_params.file_type == import_export::FileType::DELIMITED
3204 #ifdef ENABLE_IMPORT_PARQUET
3205  || (copy_params.file_type == import_export::FileType::PARQUET)
3206 #endif
3207  ) {
3208  import_export::Detector detector(file_path, copy_params);
3209  std::vector<SQLTypes> best_types = detector.best_sqltypes;
3210  std::vector<EncodingType> best_encodings = detector.best_encodings;
3211  std::vector<std::string> headers = detector.get_headers();
3212  copy_params = detector.get_copy_params();
3213 
3214  _return.copy_params = copyparams_to_thrift(copy_params);
3215  _return.row_set.row_desc.resize(best_types.size());
3216  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
3217  TColumnType col;
3218  SQLTypes t = best_types[col_idx];
3219  EncodingType encodingType = best_encodings[col_idx];
3220  SQLTypeInfo ti(t, false, encodingType);
3221  if (IS_GEO(t)) {
3222  // set this so encoding_to_thrift does the right thing
3223  ti.set_compression(copy_params.geo_coords_encoding);
3224  // fill in these directly
3225  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3226  col.col_type.scale = copy_params.geo_coords_srid;
3227  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3228  }
3229  col.col_type.type = type_to_thrift(ti);
3230  col.col_type.encoding = encoding_to_thrift(ti);
3231  if (copy_params.sanitize_column_names) {
3232  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3233  } else {
3234  col.col_name = headers[col_idx];
3235  }
3236  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3237  _return.row_set.row_desc[col_idx] = col;
3238  }
3239  size_t num_samples = 100;
3240  auto sample_data = detector.get_sample_rows(num_samples);
3241 
3242  TRow sample_row;
3243  for (auto row : sample_data) {
3244  sample_row.cols.clear();
3245  for (const auto& s : row) {
3246  TDatum td;
3247  td.val.str_val = s;
3248  td.is_null = s.empty();
3249  sample_row.cols.push_back(td);
3250  }
3251  _return.row_set.rows.push_back(sample_row);
3252  }
3253  } else if (copy_params.file_type == import_export::FileType::POLYGON) {
3254  // @TODO simon.eves get this from somewhere!
3255  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
3256 
3257  check_geospatial_files(file_path, copy_params);
3258  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
3259  file_path.string(), geoColumnName, copy_params);
3260  for (auto cd : cds) {
3261  if (copy_params.sanitize_column_names) {
3262  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
3263  }
3264  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
3265  }
3266  std::map<std::string, std::vector<std::string>> sample_data;
3268  file_path.string(), geoColumnName, sample_data, 100, copy_params);
3269  if (sample_data.size() > 0) {
3270  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
3271  TRow sample_row;
3272  for (auto cd : cds) {
3273  TDatum td;
3274  td.val.str_val = sample_data[cd.sourceName].at(i);
3275  td.is_null = td.val.str_val.empty();
3276  sample_row.cols.push_back(td);
3277  }
3278  _return.row_set.rows.push_back(sample_row);
3279  }
3280  }
3281  _return.copy_params = copyparams_to_thrift(copy_params);
3282  }
3283  } catch (const std::exception& e) {
3284  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
3285  }
3286 }
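// Minimal sketch of how detect_column_types() packs each detected sample cell into a
// Thrift datum above: the raw text becomes the string value and emptiness doubles as
// the null flag. (Helper name is illustrative, not part of the Thrift API.)
TDatum make_sample_datum_example(const std::string& cell) {
  TDatum td;
  td.val.str_val = cell;
  td.is_null = cell.empty();
  return td;
}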
3287 
3288 void DBHandler::render_vega(TRenderResult& _return,
3289  const TSessionId& session,
3290  const int64_t widget_id,
3291  const std::string& vega_json,
3292  const int compression_level,
3293  const std::string& nonce) {
3294  auto stdlog = STDLOG(get_session_ptr(session),
3295  "widget_id",
3296  widget_id,
3297  "compression_level",
3298  compression_level,
3299  "vega_json",
3300  vega_json,
3301  "nonce",
3302  nonce);
3303  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3304  stdlog.appendNameValuePairs("nonce", nonce);
3305  auto session_ptr = stdlog.getConstSessionInfo();
3306  if (!render_handler_) {
3307  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
3308  }
3309 
3310  _return.total_time_ms = measure<>::execution([&]() {
3311  try {
3312  render_handler_->render_vega(
3313  _return,
3314  std::make_shared<Catalog_Namespace::SessionInfo>(*session_ptr),
3315  widget_id,
3316  vega_json,
3317  compression_level,
3318  nonce);
3319  } catch (std::exception& e) {
3320  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3321  }
3322  });
3323 }
3324 
3326  int32_t dashboard_id,
3327  AccessPrivileges requestedPermissions) {
3328  DBObject object(dashboard_id, DashboardDBObjectType);
3329  auto& catalog = session_info.getCatalog();
3330  auto& user = session_info.get_currentUser();
3331  object.loadKey(catalog);
3332  object.setPrivileges(requestedPermissions);
3333  std::vector<DBObject> privs = {object};
3334  return SysCatalog::instance().checkPrivileges(user, privs);
3335 }
3336 
3337 // dashboards
3338 void DBHandler::get_dashboard(TDashboard& dashboard,
3339  const TSessionId& session,
3340  const int32_t dashboard_id) {
3341  auto stdlog = STDLOG(get_session_ptr(session));
3342  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3343  auto session_ptr = stdlog.getConstSessionInfo();
3344  auto const& cat = session_ptr->getCatalog();
3346  auto dash = cat.getMetadataForDashboard(dashboard_id);
3347  if (!dash) {
3348  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
3349  " doesn't exist");
3350  }
3352  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3353  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
3354  std::to_string(dashboard_id));
3355  }
3356  user_meta.userName = "";
3357  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3358  auto objects_list = SysCatalog::instance().getMetadataForObject(
3359  cat.getCurrentDB().dbId,
3360  static_cast<int>(DBObjectType::DashboardDBObjectType),
3361  dashboard_id);
3362  dashboard.dashboard_name = dash->dashboardName;
3363  dashboard.dashboard_state = dash->dashboardState;
3364  dashboard.image_hash = dash->imageHash;
3365  dashboard.update_time = dash->updateTime;
3366  dashboard.dashboard_metadata = dash->dashboardMetadata;
3367  dashboard.dashboard_owner = dash->user;
3368  dashboard.dashboard_id = dash->dashboardId;
3369  if (objects_list.empty() ||
3370  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3371  dashboard.is_dash_shared = false;
3372  } else {
3373  dashboard.is_dash_shared = true;
3374  }
3375 }
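// Sketch of the sharing heuristic used above (and again in get_dashboards()): a
// dashboard counts as shared when any grantee other than the owner holds an object
// role on it. (Helper name is illustrative.)
bool is_dashboard_shared_example(const std::vector<ObjectRoleDescriptor*>& objects_list,
                                 const std::string& owner_name) {
  return !(objects_list.empty() ||
           (objects_list.size() == 1 && objects_list[0]->roleName == owner_name));
}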
3376 
3377 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
3378  const TSessionId& session) {
3379  auto stdlog = STDLOG(get_session_ptr(session));
3380  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3381  auto session_ptr = stdlog.getConstSessionInfo();
3382  auto const& cat = session_ptr->getCatalog();
3384  const auto dashes = cat.getAllDashboardsMetadata();
3385  user_meta.userName = "";
3386  for (const auto d : dashes) {
3387  SysCatalog::instance().getMetadataForUserById(d->userId, user_meta);
3389  *session_ptr, d->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3390  auto objects_list = SysCatalog::instance().getMetadataForObject(
3391  cat.getCurrentDB().dbId,
3392  static_cast<int>(DBObjectType::DashboardDBObjectType),
3393  d->dashboardId);
3394  TDashboard dash;
3395  dash.dashboard_name = d->dashboardName;
3396  dash.image_hash = d->imageHash;
3397  dash.update_time = d->updateTime;
3398  dash.dashboard_metadata = d->dashboardMetadata;
3399  dash.dashboard_id = d->dashboardId;
3400  dash.dashboard_owner = d->user;
3401  // dashboardState is intentionally not populated here
3402  // for payload reasons
3403  // use get_dashboard call to get state
3404  if (objects_list.empty() ||
3405  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3406  dash.is_dash_shared = false;
3407  } else {
3408  dash.is_dash_shared = true;
3409  }
3410  dashboards.push_back(dash);
3411  }
3412  }
3413 }
3414 
3415 int32_t DBHandler::create_dashboard(const TSessionId& session,
3416  const std::string& dashboard_name,
3417  const std::string& dashboard_state,
3418  const std::string& image_hash,
3419  const std::string& dashboard_metadata) {
3420  auto stdlog = STDLOG(get_session_ptr(session));
3421  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3422  auto session_ptr = stdlog.getConstSessionInfo();
3423  check_read_only("create_dashboard");
3424  auto& cat = session_ptr->getCatalog();
3425 
3426  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
3427  AccessPrivileges::CREATE_DASHBOARD)) {
3428  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
3429  }
3430 
3431  auto dash = cat.getMetadataForDashboard(
3432  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
3433  if (dash) {
3434  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
3435  }
3436 
3438  dd.dashboardName = dashboard_name;
3439  dd.dashboardState = dashboard_state;
3440  dd.imageHash = image_hash;
3441  dd.dashboardMetadata = dashboard_metadata;
3442  dd.userId = session_ptr->get_currentUser().userId;
3443  dd.user = session_ptr->get_currentUser().userName;
3444 
3445  try {
3446  auto id = cat.createDashboard(dd);
3447  // TODO: transactionally unsafe
3448  SysCatalog::instance().createDBObject(
3449  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
3450  return id;
3451  } catch (const std::exception& e) {
3452  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3453  }
3454 }
3455 
3456 void DBHandler::replace_dashboard(const TSessionId& session,
3457  const int32_t dashboard_id,
3458  const std::string& dashboard_name,
3459  const std::string& dashboard_owner,
3460  const std::string& dashboard_state,
3461  const std::string& image_hash,
3462  const std::string& dashboard_metadata) {
3463  auto stdlog = STDLOG(get_session_ptr(session));
3464  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3465  auto session_ptr = stdlog.getConstSessionInfo();
3466  check_read_only("replace_dashboard");
3467  auto& cat = session_ptr->getCatalog();
3468 
3470  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
3471  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
3472  }
3473 
3475  dd.dashboardName = dashboard_name;
3476  dd.dashboardState = dashboard_state;
3477  dd.imageHash = image_hash;
3478  dd.dashboardMetadata = dashboard_metadata;
3480  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
3481  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
3482  " does not exist");
3483  }
3484  dd.userId = user.userId;
3485  dd.user = dashboard_owner;
3486  dd.dashboardId = dashboard_id;
3487 
3488  try {
3489  cat.replaceDashboard(dd);
3490  } catch (const std::exception& e) {
3491  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3492  }
3493 }
3494 
3495 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
3496  delete_dashboards(session, {dashboard_id});
3497 }
3498 
3499 void DBHandler::delete_dashboards(const TSessionId& session,
3500  const std::vector<int32_t>& dashboard_ids) {
3501  auto stdlog = STDLOG(get_session_ptr(session));
3502  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3503  auto session_ptr = stdlog.getConstSessionInfo();
3504  check_read_only("delete_dashboards");
3505  auto& cat = session_ptr->getCatalog();
3506  // Checks will be performed in catalog
3507  try {
3508  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
3509  } catch (const std::exception& e) {
3510  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3511  }
3512 }
3513 
3514 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
3515  int32_t dashboard_id,
3516  std::vector<std::string> groups) {
3517  const auto session_info = get_session_copy(session);
3518  auto& cat = session_info.getCatalog();
3519  auto dash = cat.getMetadataForDashboard(dashboard_id);
3520  if (!dash) {
3521  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3522  " does not exist");
3523  } else if (session_info.get_currentUser().userId != dash->userId &&
3524  !session_info.get_currentUser().isSuper) {
3525  throw std::runtime_error(
3526  "User should be either owner of dashboard or super user to share/unshare it");
3527  }
3528  std::vector<std::string> valid_groups;
3530  for (auto& group : groups) {
3531  user_meta.isSuper = false; // initialize default flag
3532  if (!SysCatalog::instance().getGrantee(group)) {
3533  THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
3534  } else if (!user_meta.isSuper) {
3535  valid_groups.push_back(group);
3536  }
3537  }
3538  return valid_groups;
3539 }
3540 
3541 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
3542  for (auto const& group : groups) {
3543  if (!SysCatalog::instance().getGrantee(group)) {
3544  THROW_MAPD_EXCEPTION("Exception: User/Role '" + group + "' does not exist");
3545  }
3546  }
3547 }
3548 
3549 void DBHandler::validateDashboardIdsForSharing(
3550  const Catalog_Namespace::SessionInfo& session_info,
3551  const std::vector<int32_t>& dashboard_ids) {
3552  auto& cat = session_info.getCatalog();
3553  std::map<std::string, std::list<int32_t>> errors;
3554  for (auto const& dashboard_id : dashboard_ids) {
3555  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
3556  if (!dashboard) {
3557  errors["Dashboard id does not exist"].push_back(dashboard_id);
3558  } else if (session_info.get_currentUser().userId != dashboard->userId &&
3559  !session_info.get_currentUser().isSuper) {
3560  errors["User should be either owner of dashboard or super user to share/unshare it"]
3561  .push_back(dashboard_id);
3562  }
3563  }
3564  if (!errors.empty()) {
3565  std::stringstream error_stream;
3566  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
3567  for (const auto& [error, id_list] : errors) {
3568  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
3569  }
3570  THROW_MAPD_EXCEPTION(error_stream.str());
3571  }
3572 }
3573 
3574 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
3575  const std::vector<int32_t>& dashboard_ids,
3576  const std::vector<std::string>& groups,
3577  const TDashboardPermissions& permissions,
3578  const bool do_share) {
3579  auto stdlog = STDLOG(get_session_ptr(session));
3580  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3581  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
3582  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3583  !permissions.view_) {
3584  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for " +
3585  std::string(do_share ? "grants" : "revokes"));
3586  }
3587  auto session_ptr = stdlog.getConstSessionInfo();
3588  auto const& catalog = session_ptr->getCatalog();
3589  auto& sys_catalog = SysCatalog::instance();
3590  validateGroups(groups);
3591  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
3592  std::vector<DBObject> batch_objects;
3593  for (auto const& dashboard_id : dashboard_ids) {
3594  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
3595  AccessPrivileges privs;
3596  if (permissions.delete_) {
3597  privs.add(AccessPrivileges::DELETE_DASHBOARD);
3598  }
3599  if (permissions.create_) {
3600  privs.add(AccessPrivileges::CREATE_DASHBOARD);
3601  }
3602  if (permissions.edit_) {
3603  privs.add(AccessPrivileges::EDIT_DASHBOARD);
3604  }
3605  if (permissions.view_) {
3606  privs.add(AccessPrivileges::VIEW_DASHBOARD);
3607  }
3608  object.setPrivileges(privs);
3609  batch_objects.push_back(object);
3610  }
3611  if (do_share) {
3612  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
3613  } else {
3614  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
3615  }
3616 }
3617 
3618 void DBHandler::share_dashboards(const TSessionId& session,
3619  const std::vector<int32_t>& dashboard_ids,
3620  const std::vector<std::string>& groups,
3621  const TDashboardPermissions& permissions) {
3622  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
3623 }
3624 
3625 // NOOP: Grants not available for objects as of now
3626 void DBHandler::share_dashboard(const TSessionId& session,
3627  const int32_t dashboard_id,
3628  const std::vector<std::string>& groups,
3629  const std::vector<std::string>& objects,
3630  const TDashboardPermissions& permissions,
3631  const bool grant_role = false) {
3632  share_dashboards(session, {dashboard_id}, groups, permissions);
3633 }
3634 
3635 void DBHandler::unshare_dashboards(const TSessionId& session,
3636  const std::vector<int32_t>& dashboard_ids,
3637  const std::vector<std::string>& groups,
3638  const TDashboardPermissions& permissions) {
3639  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
3640 }
3641 
3642 void DBHandler::unshare_dashboard(const TSessionId& session,
3643  const int32_t dashboard_id,
3644  const std::vector<std::string>& groups,
3645  const std::vector<std::string>& objects,
3646  const TDashboardPermissions& permissions) {
3647  unshare_dashboards(session, {dashboard_id}, groups, permissions);
3648 }
3649 
3651  std::vector<TDashboardGrantees>& dashboard_grantees,
3652  const TSessionId& session,
3653  int32_t dashboard_id) {
3654  auto stdlog = STDLOG(get_session_ptr(session));
3655  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3656  auto session_ptr = stdlog.getConstSessionInfo();
3657  auto const& cat = session_ptr->getCatalog();
3659  auto dash = cat.getMetadataForDashboard(dashboard_id);
3660  if (!dash) {
3661  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3662  " does not exist");
3663  } else if (session_ptr->get_currentUser().userId != dash->userId &&
3664  !session_ptr->get_currentUser().isSuper) {
3666  "User should be either owner of dashboard or super user to access grantees");
3667  }
3668  std::vector<ObjectRoleDescriptor*> objectsList;
3669  objectsList = SysCatalog::instance().getMetadataForObject(
3670  cat.getCurrentDB().dbId,
3671  static_cast<int>(DBObjectType::DashboardDBObjectType),
3672  dashboard_id); // By default the object type can only be dashboards
3673  user_meta.userId = -1;
3674  user_meta.userName = "";
3675  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3676  for (auto object : objectsList) {
3677  if (user_meta.userName == object->roleName) {
3678  // Mask owner
3679  continue;
3680  }
3681  TDashboardGrantees grantee;
3682  TDashboardPermissions perm;
3683  grantee.name = object->roleName;
3684  grantee.is_user = object->roleType;
3685  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
3686  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
3687  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
3688  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
3689  grantee.permissions = perm;
3690  dashboard_grantees.push_back(grantee);
3691  }
3692 }
3693 
3694 void DBHandler::create_link(std::string& _return,
3695  const TSessionId& session,
3696  const std::string& view_state,
3697  const std::string& view_metadata) {
3698  auto stdlog = STDLOG(get_session_ptr(session));
3699  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3700  auto session_ptr = stdlog.getConstSessionInfo();
3701  // check_read_only("create_link");
3702  auto& cat = session_ptr->getCatalog();
3703 
3704  LinkDescriptor ld;
3705  ld.userId = session_ptr->get_currentUser().userId;
3706  ld.viewState = view_state;
3707  ld.viewMetadata = view_metadata;
3708 
3709  try {
3710  _return = cat.createLink(ld, 6);
3711  } catch (const std::exception& e) {
3712  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3713  }
3714 }
3715 
3717  const std::string& name,
3718  const bool is_array) {
3719  TColumnType ct;
3720  ct.col_name = name;
3721  ct.col_type.type = type;
3722  ct.col_type.is_array = is_array;
3723  return ct;
3724 }
3725 
3726 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
3727  const import_export::CopyParams& copy_params) {
3728  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
3729  if (std::find(shp_ext.begin(),
3730  shp_ext.end(),
3731  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
3732  shp_ext.end()) {
3733  for (auto ext : shp_ext) {
3734  auto aux_file = file_path;
3735  if (!import_export::Importer::gdalFileExists(
3736  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
3737  copy_params) &&
3738  !import_export::Importer::gdalFileExists(
3739  aux_file.replace_extension(ext).string(), copy_params)) {
3740  throw std::runtime_error("required file for shapefile does not exist: " +
3741  aux_file.filename().string());
3742  }
3743  }
3744  }
3745 }
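// Sketch of the sidecar rule enforced above: importing "parcels.shp" also requires
// "parcels.shx" and "parcels.dbf" (upper- or lower-case extensions are accepted).
// Helper name and file names are illustrative.
std::vector<std::string> shapefile_companions_example(boost::filesystem::path shp_file) {
  std::vector<std::string> required;
  for (const std::string ext : {".shx", ".dbf"}) {
    required.push_back(shp_file.replace_extension(ext).string());
  }
  return required;
}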
3746 
3747 void DBHandler::create_table(const TSessionId& session,
3748  const std::string& table_name,
3749  const TRowDescriptor& rd,
3750  const TFileType::type file_type,
3751  const TCreateParams& create_params) {
3752  auto stdlog = STDLOG("table_name", table_name);
3753  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3754  check_read_only("create_table");
3755 
3756  if (ImportHelpers::is_reserved_name(table_name)) {
3757  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
3758  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
3759  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
3760  }
3761 
3762  auto rds = rd;
3763 
3764  // no longer need to manually add the poly column for a TFileType::POLYGON table
3765  // a column of the correct geo type has already been added
3766  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
3767 
3768  std::string stmt{"CREATE TABLE " + table_name};
3769  std::vector<std::string> col_stmts;
3770 
3771  for (auto col : rds) {
3772  if (ImportHelpers::is_reserved_name(col.col_name)) {
3773  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
3774  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
3775  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
3776  }
3777  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
3778  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
3779  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
3780  " for column: " + col.col_name);
3781  }
3782 
3783  if (col.col_type.type == TDatumType::DECIMAL) {
3784  // if no precision or scale passed in set to default 14,7
3785  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
3786  col.col_type.precision = 14;
3787  col.col_type.scale = 7;
3788  }
3789  }
3790 
3791  std::string col_stmt;
3792  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
3793 
3794  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
3795  // `nullable` argument, leading this to default to false. Uncomment for v2.
3796  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
3797 
3798  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
3799  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
3800  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
3801  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
3802  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
3803  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
3804  }
3805  } else if (col.col_type.type == TDatumType::STR) {
3806  // non DICT encoded strings
3807  col_stmt.append(" ENCODING NONE");
3808  } else if (col.col_type.type == TDatumType::POINT ||
3809  col.col_type.type == TDatumType::LINESTRING ||
3810  col.col_type.type == TDatumType::POLYGON ||
3811  col.col_type.type == TDatumType::MULTIPOLYGON) {
3812  // non-encoded, compressible geo
3813  if (col.col_type.scale == 4326) {
3814  col_stmt.append(" ENCODING NONE");
3815  }
3816  }
3817  col_stmts.push_back(col_stmt);
3818  }
3819 
3820  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
3821 
3822  if (create_params.is_replicated) {
3823  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
3824  }
3825 
3826  stmt.append(";");
3827 
3828  TQueryResult ret;
3829  sql_execute(ret, session, stmt, true, "", -1, -1);
3830 }
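// Minimal sketch of the defaulting rule applied in the column loop above: a DECIMAL
// column arriving with no explicit precision/scale is given DECIMAL(14,7) before the
// DDL text is generated. (Helper name is illustrative.)
void apply_default_decimal_example(TColumnType& col) {
  if (col.col_type.type == TDatumType::DECIMAL && col.col_type.precision == 0 &&
      col.col_type.scale == 0) {
    col.col_type.precision = 14;
    col.col_type.scale = 7;
  }
}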
3831 
3832 void DBHandler::import_table(const TSessionId& session,
3833  const std::string& table_name,
3834  const std::string& file_name_in,
3835  const TCopyParams& cp) {
3836  try {
3837  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3838  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3839  auto session_ptr = stdlog.getConstSessionInfo();
3840  check_read_only("import_table");
3841  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
3842 
3843  auto& cat = session_ptr->getCatalog();
3844  const auto td_with_lock =
3846  cat, table_name);
3847  const auto td = td_with_lock();
3848  CHECK(td);
3849  check_table_load_privileges(*session_ptr, table_name);
3850 
3851  std::string file_name{file_name_in};
3852  auto file_path = boost::filesystem::path(file_name);
3854  if (!boost::istarts_with(file_name, "s3://")) {
3855  if (!boost::filesystem::path(file_name).is_absolute()) {
3856  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3857  boost::filesystem::path(file_name).filename();
3858  file_name = file_path.string();
3859  }
3860  if (!boost::filesystem::exists(file_path)) {
3861  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3862  }
3863  }
3865 
3866  // TODO(andrew): add delimiter detection to Importer
3867  if (copy_params.delimiter == '\0') {
3868  copy_params.delimiter = ',';
3869  if (boost::filesystem::extension(file_path) == ".tsv") {
3870  copy_params.delimiter = '\t';
3871  }
3872  }
3873 
3874  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3875  session_ptr->getCatalog(), table_name);
3876  std::unique_ptr<import_export::Importer> importer;
3877  if (leaf_aggregator_.leafCount() > 0) {
3878  importer.reset(new import_export::Importer(
3879  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
3880  file_path.string(),
3881  copy_params));
3882  } else {
3883  importer.reset(
3884  new import_export::Importer(cat, td, file_path.string(), copy_params));
3885  }
3886  auto ms = measure<>::execution([&]() { importer->import(); });
3887  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
3888  } catch (const std::exception& e) {
3889  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
3890  }
3891 }
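// Sketch of the delimiter defaulting above: '\0' means "unspecified", which maps to a
// comma unless the file extension is ".tsv", in which case a tab is used. (Helper name
// is illustrative.)
char default_delimiter_example(const boost::filesystem::path& file_path, char requested) {
  if (requested != '\0') {
    return requested;
  }
  return file_path.extension() == ".tsv" ? '\t' : ',';
}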
3892 
3893 namespace {
3894 
3895 // helper functions for error checking below
3896 // these would usefully be added as methods of TDatumType
3897 // but that's not possible as it's auto-generated by Thrift
3898 
3899 bool TTypeInfo_IsGeo(const TDatumType::type& t) {
3900  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
3901  t == TDatumType::LINESTRING || t == TDatumType::POINT);
3902 }
3903 
3904 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3905 
3906 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
3907  std::stringstream ss;
3908  ss << t;
3909  return ss.str();
3910 }
3911 
3912 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
3913  std::string result;
3914  switch (p) {
3915  case SQLTypes::kGEOGRAPHY:
3916  result = "GEOGRAPHY";
3917  break;
3918  case SQLTypes::kGEOMETRY:
3919  result = "GEOMETRY";
3920  break;
3921  default:
3922  result = "INVALID";
3923  break;
3924  }
3925  return result;
3926 }
3927 
3928 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
3929  std::stringstream ss;
3930  ss << t;
3931  return ss.str();
3932 }
3933 
3934 #endif
3935 
3936 } // namespace
3937 
3938 #define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
3939  THROW_MAPD_EXCEPTION("Could not append geo file '" + file_path.filename().string() + \
3940  "' to table '" + table_name + "'. Column '" + cd->columnName + \
3941  "' " + attr + " mismatch (got '" + got + "', expected '" + \
3942  expected + "')");
3943 
3944 void DBHandler::import_geo_table(const TSessionId& session,
3945  const std::string& table_name,
3946  const std::string& file_name_in,
3947  const TCopyParams& cp,
3948  const TRowDescriptor& row_desc,
3949  const TCreateParams& create_params) {
3950  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3951  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3952  auto session_ptr = stdlog.getConstSessionInfo();
3953  check_read_only("import_table");
3954  auto& cat = session_ptr->getCatalog();
3955 
3956  auto copy_params = thrift_to_copyparams(cp);
3957 
3958  std::string file_name{file_name_in};
3959 
3960  if (path_is_relative(file_name)) {
3961  // assume relative paths are relative to data_path / mapd_import / <session>
3962  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3963  boost::filesystem::path(file_name).filename();
3964  file_name = file_path.string();
3965  }
3966  validate_import_file_path_if_local(file_name);
3967 
3968  if (is_a_supported_geo_file(file_name, true)) {
3969  // prepare to load geo file directly
3970  add_vsi_network_prefix(file_name);
3971  add_vsi_geo_prefix(file_name);
3972  } else if (is_a_supported_archive_file(file_name)) {
3973  // find the archive file
3974  add_vsi_network_prefix(file_name);
3975  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
3976  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3977  }
3978  // find geo file in archive
3979  add_vsi_archive_prefix(file_name);
3980  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3981  // prepare to load that geo file
3982  if (geo_file.size()) {
3983  file_name = file_name + std::string("/") + geo_file;
3984  }
3985  } else {
3986  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
3987  file_name_in);
3988  }
3989 
3990  // log what we're about to try to do
3991  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
3992  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
3993 
3994  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
3995  auto file_path = boost::filesystem::path(file_name);
3996  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3997  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
3998  }
3999 
4000  // use GDAL to check any dependent files exist (ditto)
4001  try {
4002  check_geospatial_files(file_path, copy_params);
4003  } catch (const std::exception& e) {
4004  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4005  }
4006 
4007  // get layer info and deconstruct
4008  // in general, we will get a combination of layers of these four types:
4009  // EMPTY: no rows, report and skip
4010  // GEO: create a geo table from this
4011  // NON_GEO: create a regular table from this
4012  // UNSUPPORTED_GEO: report and skip
4013  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
4014  try {
4015  layer_info = import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4016  } catch (const std::exception& e) {
4017  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
4018  }
4019 
4020  // categorize the results
4021  using LayerNameToContentsMap =
4022  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
4023  LayerNameToContentsMap load_layers;
4024  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
4025  for (const auto& layer : layer_info) {
4026  switch (layer.contents) {
4027  case import_export::Importer::GeoFileLayerContents::GEO:
4028  LOG(INFO) << "import_geo_table: '" << layer.name
4029  << "' (will import as geo table)";
4030  load_layers[layer.name] = layer.contents;
4031  break;
4032  case import_export::Importer::GeoFileLayerContents::NON_GEO:
4033  LOG(INFO) << "import_geo_table: '" << layer.name
4034  << "' (will import as regular table)";
4035  load_layers[layer.name] = layer.contents;
4036  break;
4037  case import_export::Importer::GeoFileLayerContents::UNSUPPORTED_GEO:
4038  LOG(WARNING) << "import_geo_table: '" << layer.name
4039  << "' (will not import, unsupported geo type)";
4040  break;
4041  case import_export::Importer::GeoFileLayerContents::EMPTY:
4042  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
4043  break;
4044  default:
4045  break;
4046  }
4047  }
4048 
4049  // if nothing is loadable, stop now
4050  if (load_layers.size() == 0) {
4051  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
4052  }
4053 
4054  // if we've been given an explicit layer name, check that it exists and is loadable
4055  // scan the original list, as it may exist but not have been gathered as loadable
4056  if (copy_params.geo_layer_name.size()) {
4057  bool found = false;
4058  for (const auto& layer : layer_info) {
4059  if (copy_params.geo_layer_name == layer.name) {
4062  // forget all the other layers and just load this one
4063  load_layers.clear();
4064  load_layers[layer.name] = layer.contents;
4065  found = true;
4066  break;
4067  } else if (layer.contents ==
4069  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4070  copy_params.geo_layer_name +
4071  "' has unsupported geo type!");
4072  } else if (layer.contents ==
4074  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4075  copy_params.geo_layer_name + "' is empty!");
4076  }
4077  }
4078  }
4079  if (!found) {
4080  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
4081  copy_params.geo_layer_name + "' not found!");
4082  }
4083  }
4084 
4085  // Immerse import of multiple layers is not yet supported
4086  // @TODO fix this!
4087  if (row_desc.size() > 0 && load_layers.size() > 1) {
4089  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
4090  }
4091 
4092  // one definition of layer table name construction
4093  // we append the layer name if we're loading more than one table
4094  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
4095  const std::string& layer_name) {
4096  if (load_layers.size() > 1) {
4097  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
4098  if (sanitized_layer_name != layer_name) {
4099  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
4100  << sanitized_layer_name << "' for table name";
4101  }
4102  return table_name + "_" + sanitized_layer_name;
4103  }
4104  return table_name;
4105  };
4106 
4107  // if we're importing multiple tables, then NONE of them must exist already
4108  if (load_layers.size() > 1) {
4109  for (const auto& layer : load_layers) {
4110  // construct table name
4111  auto this_table_name = construct_layer_table_name(table_name, layer.first);
4112 
4113  // table must not exist
4114  if (cat.getMetadataForTable(this_table_name)) {
4115  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
4116  "' already exists, aborting!");
4117  }
4118  }
4119  }
4120 
4121  // prepare to gather errors that would otherwise be exceptions, as we can only throw
4122  // one
4123  std::vector<std::string> caught_exception_messages;
4124 
4125  // prepare to time multi-layer import
4126  double total_import_ms = 0.0;
4127 
4128  // now we're safe to start importing
4129  // we loop over the layers we're going to attempt to load
4130  for (const auto& layer : load_layers) {
4131  // unpack
4132  const auto& layer_name = layer.first;
4133  const auto& layer_contents = layer.second;
4134  bool is_geo_layer =
4135  (layer_contents == import_export::Importer::GeoFileLayerContents::GEO);
4136 
4137  // construct table name again
4138  auto this_table_name = construct_layer_table_name(table_name, layer_name);
4139 
4140  // report
4141  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
4142 
4143  // we need a row descriptor
4144  TRowDescriptor rd;
4145  if (row_desc.size() > 0) {
4146  // we have a valid RowDescriptor
4147  // this is the case where Immerse has already detected and created
4148  // all we need to do is import and trust that the data will match
4149  // use the provided row descriptor
4150  // table must already exist (we check this below)
4151  rd = row_desc;
4152  } else {
4153  // we don't have a RowDescriptor
4154  // we have to detect the file ourselves
4155  TDetectResult cds;
4156  TCopyParams cp_copy = cp; // retain S3 auth tokens
4157  cp_copy.geo_layer_name = layer_name;
4158  cp_copy.file_type = TFileType::POLYGON;
4159  try {
4160  detect_column_types(cds, session, file_name_in, cp_copy);
4161  } catch (const std::exception& e) {
4162  // capture the error and abort this layer
4163  caught_exception_messages.emplace_back(
4164  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
4165  continue;
4166  }
4167  rd = cds.row_set.row_desc;
4168 
4169  // then, if the table does NOT already exist, create it
4170  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4171  if (!td) {
4172  try {
4173  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
4174  } catch (const std::exception& e) {
4175  // capture the error and abort this layer
4176  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
4177  layer_name + "':" + e.what());
4178  continue;
4179  }
4180  }
4181  }
4182 
4183  // by this point, the table should exist, one way or another
4184  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4185  if (!td) {
4186  // capture the error and abort this layer
4187  std::string exception_message =
4188  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
4189  this_table_name + "'; table does not exist or failed to create.";
4190  caught_exception_messages.emplace_back(exception_message);
4191  continue;
4192  }
4193 
4194  // then, we have to verify that the structure matches
4195  // get column descriptors (non-system, non-deleted, logical columns only)
4196  const auto col_descriptors =
4197  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4198 
4199  // first, compare the column count
4200  if (col_descriptors.size() != rd.size()) {
4201  // capture the error and abort this layer
4202  std::string exception_message =
4203  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
4204  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
4205  ", expecting " + std::to_string(col_descriptors.size()) + ")";
4206  caught_exception_messages.emplace_back(exception_message);
4207  continue;
4208  }
4209 
4210  try {
4211  // then the names and types
4212  int rd_index = 0;
4213  for (auto cd : col_descriptors) {
4214  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
4215  std::string gname = rd[rd_index].col_name; // got
4216  std::string ename = cd->columnName; // expecting
4217 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4218  TTypeInfo gti = rd[rd_index].col_type; // got
4219 #endif
4220  TTypeInfo eti = cd_col_type.col_type; // expecting
4221  // check for name match
4222  if (gname != ename) {
4223  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
4224  gname == OMNISCI_GEO_PREFIX) {
4225  // rename incoming geo column to match existing legacy default geo column
4226  rd[rd_index].col_name = gname;
4227  LOG(INFO)
4228  << "import_geo_table: Renaming incoming geo column to match existing "
4229  "legacy default geo column";
4230  } else {
4231  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
4232  }
4233  }
4234 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4235  // check for type attributes match
4236  // these attrs must always match regardless of type
4237  if (gti.type != eti.type) {
4239  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
4240  }
4241  if (gti.is_array != eti.is_array) {
4243  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
4244  }
4245  if (gti.nullable != eti.nullable) {
4247  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
4248  }
4249  if (TTypeInfo_IsGeo(eti.type)) {
4250  // for geo, only these other attrs must also match
4251  // encoding and comp_param are allowed to differ
4252  // this allows appending to existing geo table
4253  // without needing to know the existing encoding
4254  if (gti.precision != eti.precision) {
4256  "geo sub-type",
4257  TTypeInfo_GeoSubTypeToString(gti.precision),
4258  TTypeInfo_GeoSubTypeToString(eti.precision));
4259  }
4260  if (gti.scale != eti.scale) {
4262  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
4263  }
4264  if (gti.encoding != eti.encoding) {
4265  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
4266  }
4267  if (gti.comp_param != eti.comp_param) {
4268  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
4269  }
4270  } else {
4271  // non-geo, all other attrs must also match
4272  // @TODO consider relaxing some of these dependent on type
4273  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
4274  if (gti.precision != eti.precision) {
4276  std::to_string(gti.precision),
4277  std::to_string(eti.precision));
4278  }
4279  if (gti.scale != eti.scale) {
4281  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
4282  }
4283  if (gti.encoding != eti.encoding) {
4285  "encoding",
4286  TTypeInfo_EncodingToString(gti.encoding),
4287  TTypeInfo_EncodingToString(eti.encoding));
4288  }
4289  if (gti.comp_param != eti.comp_param) {
4291  std::to_string(gti.comp_param),
4292  std::to_string(eti.comp_param));
4293  }
4294  }
4295 #endif
4296  rd_index++;
4297  }
4298  } catch (const std::exception& e) {
4299  // capture the error and abort this layer
4300  caught_exception_messages.emplace_back(e.what());
4301  continue;
4302  }
4303 
4304  std::map<std::string, std::string> colname_to_src;
4305  for (auto r : rd) {
4306  colname_to_src[r.col_name] =
4307  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
4308  }
4309 
4310  try {
4311  check_table_load_privileges(*session_ptr, this_table_name);
4312  } catch (const std::exception& e) {
4313  // capture the error and abort this layer
4314  caught_exception_messages.emplace_back(e.what());
4315  continue;
4316  }
4317 
4318  if (is_geo_layer) {
4319  // Final check to ensure that we actually have a geo column
4320  // of the expected name and type before doing the actual import,
4321  // in case the user naively overrode the name or type in Immerse
4322  // Preview (which as of 6/8/18 it still allows you to do).
4323  // This avoids a fatal assert later when it fails to find the
4324  // column. We should make Immerse more robust and disallow this.
4325  bool have_geo_column_with_correct_name = false;
4326  for (const auto& r : rd) {
4327  if (TTypeInfo_IsGeo(r.col_type.type)) {
4328  // TODO(team): allow user to override the geo column name
4329  if (r.col_name == OMNISCI_GEO_PREFIX) {
4330  have_geo_column_with_correct_name = true;
4331  } else if (r.col_name == LEGACY_GEO_PREFIX) {
4332  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
4333  // Normalize column names for geo append with legacy column naming scheme
4334  colname_to_src[r.col_name] = r.col_name;
4335  have_geo_column_with_correct_name = true;
4336  }
4337  }
4338  }
4339  if (!have_geo_column_with_correct_name) {
4340  std::string exception_message = "Table " + this_table_name +
4341  " does not have a geo column with name '" +
4342  OMNISCI_GEO_PREFIX + "'. Import aborted!";
4343  caught_exception_messages.emplace_back(exception_message);
4344  continue;
4345  }
4346  }
4347 
4348  try {
4349  // import this layer only?
4350  copy_params.geo_layer_name = layer_name;
4351 
4352  // create an importer
4353  std::unique_ptr<import_export::Importer> importer;
4354  if (leaf_aggregator_.leafCount() > 0) {
4355  importer.reset(new import_export::Importer(
4356  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4357  file_path.string(),
4358  copy_params));
4359  } else {
4360  importer.reset(
4361  new import_export::Importer(cat, td, file_path.string(), copy_params));
4362  }
4363 
4364  // import
4365  auto ms = measure<>::execution([&]() { importer->importGDAL(colname_to_src); });
4366  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
4367  << "s";
4368  total_import_ms += ms;
4369  } catch (const std::exception& e) {
4370  std::string exception_message =
4371  "Import of Layer '" + this_table_name + "' failed: " + e.what();
4372  caught_exception_messages.emplace_back(exception_message);
4373  continue;
4374  }
4375  }
4376 
4377  // did we catch any exceptions?
4378  if (caught_exception_messages.size()) {
4379  // combine all the strings into one and throw a single Thrift exception
4380  std::string combined_exception_message = "Failed to import geo file:\n";
4381  for (const auto& message : caught_exception_messages) {
4382  combined_exception_message += message + "\n";
4383  }
4384  THROW_MAPD_EXCEPTION(combined_exception_message);
4385  } else {
4386  // report success and total time
4387  LOG(INFO) << "Import Successful!";
4388  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
4389  }
4390 }
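// Sketch of the per-layer error handling used above: a failing layer is recorded and
// the remaining layers continue, then all messages are reported as one combined
// exception. (Helper name is illustrative.)
std::string combine_layer_errors_example(const std::vector<std::string>& messages) {
  std::string combined = "Failed to import geo file:\n";
  for (const auto& message : messages) {
    combined += message + "\n";
  }
  return combined;
}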
4391 
4392 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
4393 
4394 void DBHandler::import_table_status(TImportStatus& _return,
4395  const TSessionId& session,
4396  const std::string& import_id) {
4397  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
4398  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4399  auto is = import_export::Importer::get_import_status(import_id);
4400  _return.elapsed = is.elapsed.count();
4401  _return.rows_completed = is.rows_completed;
4402  _return.rows_estimated = is.rows_estimated;
4403  _return.rows_rejected = is.rows_rejected;
4404 }
4405 
4406 void DBHandler::get_first_geo_file_in_archive(std::string& _return,
4407  const TSessionId& session,
4408  const std::string& archive_path_in,
4409  const TCopyParams& copy_params) {
4410  auto stdlog =
4411  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
4412  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4413 
4414  std::string archive_path(archive_path_in);
4415 
4416  if (path_is_relative(archive_path)) {
4417  // assume relative paths are relative to data_path / mapd_import / <session>
4418  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4419  boost::filesystem::path(archive_path).filename();
4420  archive_path = file_path.string();
4421  }
4422  validate_import_file_path_if_local(archive_path);
4423 
4424  if (is_a_supported_archive_file(archive_path)) {
4425  // find the archive file
4426  add_vsi_network_prefix(archive_path);
4427  if (!import_export::Importer::gdalFileExists(archive_path,
4428  thrift_to_copyparams(copy_params))) {
4429  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4430  }
4431  // find geo file in archive
4432  add_vsi_archive_prefix(archive_path);
4433  std::string geo_file =
4434  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
4435  // what did we get?
4436  if (geo_file.size()) {
4437  // prepend it with the original path
4438  _return = archive_path_in + std::string("/") + geo_file;
4439  } else {
4440  // just return the original path
4441  _return = archive_path_in;
4442  }
4443  } else {
4444  // just return the original path
4445  _return = archive_path_in;
4446  }
4447 }
4448 
4449 void DBHandler::get_all_files_in_archive(std::vector<std::string>& _return,
4450  const TSessionId& session,
4451  const std::string& archive_path_in,
4452  const TCopyParams& copy_params) {
4453  auto stdlog =
4454  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
4455  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4456 
4457  std::string archive_path(archive_path_in);
4458  if (path_is_relative(archive_path)) {
4459  // assume relative paths are relative to data_path / mapd_import / <session>
4460  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4461  boost::filesystem::path(archive_path).filename();
4462  archive_path = file_path.string();
4463  }
4464  validate_import_file_path_if_local(archive_path);
4465 
4466  if (is_a_supported_archive_file(archive_path)) {
4467  // find the archive file
4468  add_vsi_network_prefix(archive_path);
4469  if (!import_export::Importer::gdalFileExists(archive_path,
4470  thrift_to_copyparams(copy_params))) {
4471  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4472  }
4473  // find all files in archive
4474  add_vsi_archive_prefix(archive_path);
4475  _return = import_export::Importer::gdalGetAllFilesInArchive(
4476  archive_path, thrift_to_copyparams(copy_params));
4477  // prepend them all with original path
4478  for (auto& s : _return) {
4479  s = archive_path_in + '/' + s;
4480  }
4481  }
4482 }
4483 
4484 void DBHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
4485  const TSessionId& session,
4486  const std::string& file_name_in,
4487  const TCopyParams& cp) {
4488  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
4489  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4490 
4491  std::string file_name(file_name_in);
4492 
4493  const auto copy_params = thrift_to_copyparams(cp);
4494 
4495  // handle relative paths
4496  if (path_is_relative(file_name)) {
4497  // assume relative paths are relative to data_path / mapd_import / <session>
4498  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4499  boost::filesystem::path(file_name).filename();
4500  file_name = file_path.string();
4501  }
4502  validate_import_file_path_if_local(file_name);
4503 
4504  // validate file_name
4505  if (is_a_supported_geo_file(file_name, true)) {
4506  // prepare to load geo file directly
4507  add_vsi_network_prefix(file_name);
4508  add_vsi_geo_prefix(file_name);
4509  } else if (is_a_supported_archive_file(file_name)) {
4510  // find the archive file
4511  add_vsi_network_prefix(file_name);
4512  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4513  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4514  }
4515  // find geo file in archive
4516  add_vsi_archive_prefix(file_name);
4517  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4518  // prepare to load that geo file
4519  if (geo_file.size()) {
4520  file_name = file_name + std::string("/") + geo_file;
4521  }
4522  } else {
4523  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4524  file_name_in);
4525  }
4526 
4527  // check the file actually exists
4528  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4529  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
4530  }
4531 
4532  // find all layers
4533  auto internal_layer_info =
4534  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4535 
4536  // convert to Thrift type
4537  for (const auto& internal_layer : internal_layer_info) {
4538  TGeoFileLayerInfo layer;
4539  layer.name = internal_layer.name;
4540  switch (internal_layer.contents) {
4541  case import_export::Importer::GeoFileLayerContents::EMPTY:
4542  layer.contents = TGeoFileLayerContents::EMPTY;
4543  break;
4544  case import_export::Importer::GeoFileLayerContents::GEO:
4545  layer.contents = TGeoFileLayerContents::GEO;
4546  break;
4547  case import_export::Importer::GeoFileLayerContents::NON_GEO:
4548  layer.contents = TGeoFileLayerContents::NON_GEO;
4549  break;
4550  case import_export::Importer::GeoFileLayerContents::UNSUPPORTED_GEO:
4551  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
4552  break;
4553  default:
4554  CHECK(false);
4555  }
4556  _return.emplace_back(layer); // no suitable constructor to just pass parameters
4557  }
4558 }
4559 
4560 void DBHandler::start_heap_profile(const TSessionId& session) {
4561  auto stdlog = STDLOG(get_session_ptr(session));
4562 #ifdef HAVE_PROFILER
4563  if (IsHeapProfilerRunning()) {
4564  THROW_MAPD_EXCEPTION("Profiler already started");
4565  }
4566  HeapProfilerStart("omnisci");
4567 #else
4568  THROW_MAPD_EXCEPTION("Profiler not enabled");
4569 #endif // HAVE_PROFILER
4570 }
4571 
4572 void DBHandler::stop_heap_profile(const TSessionId& session) {
4573  auto stdlog = STDLOG(get_session_ptr(session));
4574 #ifdef HAVE_PROFILER
4575  if (!IsHeapProfilerRunning()) {
4576  THROW_MAPD_EXCEPTION("Profiler not running");
4577  }
4578  HeapProfilerStop();
4579 #else
4580  THROW_MAPD_EXCEPTION("Profiler not enabled");
4581 #endif // HAVE_PROFILER
4582 }
4583 
4584 void DBHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
4585  auto stdlog = STDLOG(get_session_ptr(session));
4586 #ifdef HAVE_PROFILER
4587  if (!IsHeapProfilerRunning()) {
4588  THROW_MAPD_EXCEPTION("Profiler not running");
4589  }
4590  auto profile_buff = GetHeapProfile();
4591  profile = profile_buff;
4592  free(profile_buff);
4593 #else
4594  THROW_MAPD_EXCEPTION("Profiler not enabled");
4595 #endif // HAVE_PROFILER
4596 }
4597 
4598 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
4599 void DBHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
4600  if (session_it->second.use_count() > 2 ||
4601  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
4602  // SessionInfo is being used in more than one active operation. Original copy + one
4603  // stored in StdLog. Skip the checks.
4604  return;
4605  }
4606  time_t last_used_time = session_it->second->get_last_used_time();
4607  time_t start_time = session_it->second->get_start_time();
4608  const auto current_session_duration = time(0) - last_used_time;
4609  if (current_session_duration > idle_session_duration_) {
4610  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
4611  << " idle duration " << current_session_duration
4612  << " seconds exceeds maximum idle duration " << idle_session_duration_
4613  << " seconds. Invalidating session.";
4614  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
4615  }
4616  const auto total_session_duration = time(0) - start_time;
4617  if (total_session_duration > max_session_duration_) {
4618  LOG(INFO) << "Session " << session_it->second->get_public_session_id()
4619  << " total duration " << total_session_duration
4620  << " seconds exceeds maximum total session duration "
4621  << max_session_duration_ << " seconds. Invalidating session.";
4622  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
4623  }
4624 }
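// Sketch of the two timeout checks above, expressed as a pure function over the
// session's last-used and start times (all values in seconds; name illustrative).
bool session_expired_example(time_t now,
                             time_t last_used,
                             time_t started,
                             time_t idle_limit,
                             time_t total_limit) {
  return (now - last_used) > idle_limit || (now - started) > total_limit;
}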
4625 
4626 std::shared_ptr<const Catalog_Namespace::SessionInfo> DBHandler::get_const_session_ptr(
4627  const TSessionId& session) {
4628  return get_session_ptr(session);
4629 }
4630 
4631 Catalog_Namespace::SessionInfo DBHandler::get_session_copy(const TSessionId& session) {
4632  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4633  return *get_session_it_unsafe(session, read_lock)->second;
4634 }
4635 
4636 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_copy_ptr(
4637  const TSessionId& session) {
4638  // Note(Wamsi): We also have `get_const_session_ptr`, which returns the const
4639  // SessionInfo stored in the map. You can use `get_const_session_ptr` instead of a
4640  // copy of SessionInfo, but beware that the underlying entry can still change in the
4641  // map. If you do not care about such changes, use `get_const_session_ptr`; if you
4642  // do, use this function to get a copy. We should eventually aim to merge
4643  // `get_const_session_ptr` and `get_session_copy_ptr`.
4644  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4645  auto& session_info_ref = *get_session_it_unsafe(session, read_lock)->second;
4646  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
4647 }
4648 
4649 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::get_session_ptr(
4650  const TSessionId& session_id) {
4651  // Note(Wamsi): This method gives you a shared_ptr to the master SessionInfo itself.
4652  // It should be used only when you need to make updates to the original SessionInfo
4653  // object. Currently used by `update_session_last_used_duration`.
4654 
4655  // 1) `session_id` will be empty during the initial connect. 2) The session map
4656  // iterator will be invalid during disconnect, since the SessionInfo will have been
4657  // erased from the map by the time it reaches here. In both cases we return `nullptr`
4658  // and can skip SessionInfo updates.
4659  if (session_id.empty()) {
4660  return {};
4661  }
4662  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4663  return get_session_it_unsafe(session_id, read_lock)->second;
4664 }
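// Summary of the session accessors above: get_session_copy() and get_session_copy_ptr()
// hand back an independent copy of the SessionInfo, get_const_session_ptr() exposes the
// shared object read-only, and get_session_ptr() exposes the shared object for in-place
// updates. All of them take sessions_mutex_ as a shared (read) lock and resolve the
// session id via get_session_it_unsafe().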
4665 
4666 void DBHandler::check_table_load_privileges(
4667  const Catalog_Namespace::SessionInfo& session_info,
4668  const std::string& table_name) {
4669  auto user_metadata = session_info.get_currentUser();
4670  auto& cat = session_info.getCatalog();
4671  DBObject dbObject(table_name, TableDBObjectType);
4672  dbObject.loadKey(cat);
4673  dbObject.setPrivileges(AccessPrivileges::INSERT_INTO_TABLE);
4674  std::vector<DBObject> privObjects;
4675  privObjects.push_back(dbObject);
4676  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
4677  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
4678  user_metadata.userName + " has no insert privileges for table " +
4679  table_name + ".");
4680  }
4681 }
4682 
4683 void DBHandler::check_table_load_privileges(const TSessionId& session,
4684  const std::string& table_name) {
4685  const auto session_info = get_session_copy(session);
4686  check_table_load_privileges(session_info, table_name);
4687 }
4688 
4689 void DBHandler::set_execution_mode_nolock(Catalog_Namespace::SessionInfo* session_ptr,
4690  const TExecuteMode::type mode) {
4691  const std::string& user_name = session_ptr->get_currentUser().userName;
4692  switch (mode) {
4693  case TExecuteMode::GPU:
4694  if (cpu_mode_only_) {
4695  TOmniSciException e;
4696  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
4697  throw e;
4698  }
4699  session_ptr->set_executor_device_type(ExecutorDeviceType::GPU);
4700  LOG(INFO) << "User " << user_name << " sets GPU mode.";
4701  break;
4702  case TExecuteMode::CPU:
4703  session_ptr->set_executor_device_type(ExecutorDeviceType::CPU);
4704  LOG(INFO) << "User " << user_name << " sets CPU mode.";
4705  break;
4706  }
4707 }
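// Note: this helper only mutates the session's executor device type; the "_nolock"
// suffix indicates that the caller is presumed to already hold whatever session lock is
// required.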
4708 
4709 std::vector<PushedDownFilterInfo> DBHandler::execute_rel_alg(
4710  TQueryResult& _return,
4711  QueryStateProxy query_state_proxy,
4712  const std::string& query_ra,
4713  const bool column_format,
4714  const ExecutorDeviceType executor_device_type,
4715  const int32_t first_n,
4716  const int32_t at_most_n,
4717  const bool just_validate,
4718  const bool find_push_down_candidates,
4719  const ExplainInfo& explain_info,
4720  const std::optional<size_t> executor_index) const {
4721  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4722 
4723  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
4724  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
4725 
4726  const auto& cat = query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog();
4727  auto executor = Executor::getExecutor(
4728  executor_index ? *executor_index : Executor::UNITARY_EXECUTOR_ID,
4729  jit_debug_ ? "/tmp" : "",
4730  jit_debug_ ? "mapdquery" : "",
4731  system_parameters_);
4732  RelAlgExecutor ra_executor(executor.get(),
4733  cat,
4734  query_ra,
4735  query_state_proxy.getQueryState().shared_from_this());
4736  // handle hints
4737  const auto& query_hints = ra_executor.getParsedQueryHints();
4738  CompilationOptions co = {
4739  query_hints.cpu_mode ? ExecutorDeviceType::CPU : executor_device_type,
4740  /*hoist_literals=*/true,
4743  /*allow_lazy_fetch=*/true,
4744  /*filter_on_deleted_column=*/true,
4750  explain_info.justExplain(),
4751  allow_loop_joins_ || just_validate,
4753  jit_debug_,
4754  just_validate,
4757  find_push_down_candidates,
4758  explain_info.justCalciteExplain(),
4762  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4765  nullptr,
4766  nullptr),
4767  {}};
4768  _return.execution_time_ms += measure<>::execution([&]() {
4769  result = ra_executor.executeRelAlgQuery(co, eo, explain_info.explain_plan, nullptr);
4770  });
4771  // reduce execution time by the time spent during queue waiting
4772  _return.execution_time_ms -= result.getRows()->getQueueTime();
4773  VLOG(1) << cat.getDataMgr().getSystemMemoryUsage();
4774  const auto& filter_push_down_info = result.getPushedDownFilterInfo();
4775  if (!filter_push_down_info.empty()) {
4776  return filter_push_down_info;
4777  }
4778  if (explain_info.justExplain()) {
4779  convert_explain(_return, *result.getRows(), column_format);
4780  } else if (!explain_info.justCalciteExplain()) {
4781  convert_rows(_return,
4782  timer.createQueryStateProxy(),
4783  result.getTargetsMeta(),
4784  *result.getRows(),
4785  column_format,
4786  first_n,
4787  at_most_n);
4788  }
4789  return {};
4790 }
4791 
4792 void DBHandler::execute_rel_alg_df(TDataFrame& _return,
4793  const std::string& query_ra,
4794  QueryStateProxy query_state_proxy,
4795  const Catalog_Namespace::SessionInfo& session_info,
4796  const ExecutorDeviceType device_type,
4797  const size_t device_id,
4798  const int32_t first_n) const {
4799  const auto& cat = session_info.getCatalog();
4800  CHECK(device_type == ExecutorDeviceType::CPU ||
4803  jit_debug_ ? "/tmp" : "",
4804  jit_debug_ ? "mapdquery" : "",
4806  RelAlgExecutor ra_executor(executor.get(),
4807  cat,
4808  query_ra,
4809  query_state_proxy.getQueryState().shared_from_this());
4810  const auto& query_hints = ra_executor.getParsedQueryHints();
4811  CompilationOptions co = {query_hints.cpu_mode ? ExecutorDeviceType::CPU : device_type,
4812  /*hoist_literals=*/true,
4815  /*allow_lazy_fetch=*/true,
4816  /*filter_on_deleted_column=*/true,
4821  false,
4824  jit_debug_,
4825  false,
4828  false,
4829  false,
4833  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4836  nullptr,
4837  nullptr),
4838  {}};
4839  _return.execution_time_ms += measure<>::execution(
4840  [&]() { result = ra_executor.executeRelAlgQuery(co, eo, false, nullptr); });
4841  _return.execution_time_ms -= result.getRows()->getQueueTime();
4842  const auto rs = result.getRows();
4843  const auto converter =
4844  std::make_unique<ArrowResultSetConverter>(rs,
4845  data_mgr_,
4846  device_type,
4847  device_id,
4848  getTargetNames(result.getTargetsMeta()),
4849  first_n);
4850  ArrowResult arrow_result;
4851  _return.arrow_conversion_time_ms +=
4852  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
4853  _return.sm_handle =
4854  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
4855  _return.sm_size = arrow_result.sm_size;
4856  _return.df_handle =
4857  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
4858  if (device_type == ExecutorDeviceType::GPU) {
4859  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
4860  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
4861  ipc_handle_to_dev_ptr_.insert(
4862  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
4863  }
4864  _return.df_size = arrow_result.df_size;
4865 }
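// Note: unlike execute_rel_alg(), this path returns the result as an Arrow record batch
// rather than as Thrift rows. sm_handle/sm_size roughly describe the shared memory
// segment holding the serialized Arrow schema and df_handle/df_size the one holding the
// data itself; for GPU results the serialized CUDA IPC handle is also remembered in
// ipc_handle_to_dev_ptr_ so that the corresponding deallocation path (not shown in this
// section) can later release the device memory.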
4866 
4867 std::vector<TargetMetaInfo> DBHandler::getTargetMetaInfo(
4868  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4869  std::vector<TargetMetaInfo> result;
4870  for (const auto& target : targets) {
4871  CHECK(target);
4872  CHECK(target->get_expr());
4873  result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
4874  }
4875  return result;
4876 }
4877 
4878 std::vector<std::string> DBHandler::getTargetNames(
4879  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4880  std::vector<std::string> names;
4881  for (const auto& target : targets) {
4882  CHECK(target);
4883  CHECK(target->get_expr());
4884  names.push_back(target->get_resname());
4885  }
4886  return names;
4887 }
4888 
4889 std::vector<std::string> DBHandler::getTargetNames(
4890  const std::vector<TargetMetaInfo>& targets) const {
4891  std::vector<std::string> names;
4892  for (const auto& target : targets) {
4893  names.push_back(target.get_resname());
4894  }
4895  return names;
4896 }
4897 
4898 TColumnType DBHandler::convert_target_metainfo(const TargetMetaInfo& target,
4899  const size_t idx) const {
4900  TColumnType proj_info;
4901  proj_info.col_name = target.get_resname();
4902  if (proj_info.col_name.empty()) {
4903  proj_info.col_name = "result_" + std::to_string(idx + 1);
4904  }
4905  const auto& target_ti = target.get_type_info();
4906  proj_info.col_type.type = type_to_thrift(target_ti);
4907  proj_info.col_type.encoding = encoding_to_thrift(target_ti);
4908  proj_info.col_type.nullable = !target_ti.get_notnull();
4909  proj_info.col_type.is_array = target_ti.get_type() == kARRAY;
4910  if (IS_GEO(target_ti.get_type())) {
4911  fixup_geo_column_descriptor(
4912  proj_info, target_ti.get_subtype(), target_ti.get_output_srid());
4913  } else {
4914  proj_info.col_type.precision = target_ti.get_precision();
4915  proj_info.col_type.scale = target_ti.get_scale();
4916  }
4917  if (target_ti.get_type() == kDATE) {
4918  proj_info.col_type.size = target_ti.get_size();
4919  }
4920  proj_info.col_type.comp_param =
4921  (target_ti.is_date_in_days() && target_ti.get_comp_param() == 0)
4922  ? 32
4923  : target_ti.get_comp_param();
4924  return proj_info;
4925 }
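// Example: for a DATE column stored with date-in-days encoding and no explicit
// compression parameter, get_comp_param() is 0, so the descriptor above reports
// comp_param = 32 (bits); every other type passes its compression parameter through
// unchanged.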
4926 
4927 TRowDescriptor DBHandler::convert_target_metainfo(
4928  const std::vector<TargetMetaInfo>& targets) const {
4929  TRowDescriptor row_desc;
4930  size_t i = 0;
4931  for (const auto& target : targets) {
4932  row_desc.push_back(convert_target_metainfo(target, i));
4933  ++i;
4934  }
4935  return row_desc;
4936 }
4937 
4938 template <class R>
4939 void DBHandler::convert_rows(TQueryResult& _return,
4940  QueryStateProxy query_state_proxy,
4941  const std::vector<TargetMetaInfo>& targets,
4942  const R& results,
4943  const bool column_format,
4944  const int32_t first_n,
4945  const int32_t at_most_n) const {
4946  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4947  _return.row_set.row_desc = convert_target_metainfo(targets);
4948  int32_t fetched{0};
4949  if (column_format) {
4950  _return.row_set.is_columnar = true;
4951  std::vector<TColumn> tcolumns(results.colCount());
4952  while (first_n == -1 || fetched < first_n) {
4953  const auto crt_row = results.getNextRow(true, true);
4954  if (crt_row.empty()) {
4955  break;
4956  }
4957  ++fetched;
4958  if (at_most_n >= 0 && fetched > at_most_n) {
4959  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4960  std::to_string(at_most_n));
4961  }
4962  for (size_t i = 0; i < results.colCount(); ++i) {
4963  const auto agg_result = crt_row[i];
4964  value_to_thrift_column(agg_result, targets[i].get_type_info(), tcolumns[i]);
4965  }
4966  }
4967  for (size_t i = 0; i < results.colCount(); ++i) {
4968  _return.row_set.columns.push_back(tcolumns[i]);
4969  }
4970  } else {
4971  _return.row_set.is_columnar = false;
4972  while (first_n == -1 || fetched < first_n) {
4973  const auto crt_row = results.getNextRow(true, true);
4974  if (crt_row.empty()) {
4975  break;
4976  }
4977  ++fetched;
4978  if (at_most_n >= 0 && fetched > at_most_n) {
4979  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4980  std::to_string(at_most_n));
4981  }
4982  TRow trow;
4983  trow.cols.reserve(results.colCount());
4984  for (size_t i = 0; i < results.colCount(); ++i) {
4985  const auto agg_result = crt_row[i];
4986  trow.cols.push_back(value_to_thrift(agg_result, targets[i].get_type_info()));
4987  }
4988  _return.row_set.rows.push_back(trow);
4989  }
4990  }
4991 }
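// Illustration: for a two-column result {x INT, y TEXT} with rows (1, 'a') and (2, 'b'),
// column_format = true yields
//   row_set.columns = [ {1, 2}, {'a', 'b'} ]
// while column_format = false yields
//   row_set.rows = [ (1, 'a'), (2, 'b') ]
// In both layouts first_n caps the number of rows fetched, while at_most_n turns an
// over-long result into an exception rather than silently truncating it.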
4992 
4993 TRowDescriptor DBHandler::fixup_row_descriptor(const TRowDescriptor& row_desc,
4994  const Catalog& cat) {
4995  TRowDescriptor fixedup_row_desc;
4996  for (const TColumnType& col_desc : row_desc) {
4997  auto fixedup_col_desc = col_desc;
4998  if (col_desc.col_type.encoding == TEncodingType::DICT &&
4999  col_desc.col_type.comp_param > 0) {
5000  const auto dd = cat.getMetadataForDict(col_desc.col_type.comp_param, false);
5001  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
5002  }
5003  fixedup_row_desc.push_back(fixedup_col_desc);
5004  }
5005 
5006  return fixedup_row_desc;
5007 }
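// Note: the fixup above only touches dictionary-encoded columns. Their comp_param is
// used internally as a dictionary id (see the getMetadataForDict() lookup) but is
// reported to clients as the dictionary's bit width (dd->dictNBits) instead.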
5008 
5009 // create a simple result set to return a single-column result
5010 void DBHandler::create_simple_result(TQueryResult& _return,
5011  const ResultSet& results,
5012  const bool column_format,
5013  const std::string label) const {
5014  CHECK_EQ(size_t(1), results.rowCount());
5015  TColumnType proj_info;
5016  proj_info.col_name = label;
5017  proj_info.col_type.type = TDatumType::STR;
5018  proj_info.col_type.nullable = false;
5019  proj_info.col_type.is_array = false;
5020  _return.row_set.row_desc.push_back(proj_info);
5021  const auto crt_row = results.getNextRow(true, true);
5022  const auto tv = crt_row[0];
5023  CHECK(results.getNextRow(true, true).empty());
5024  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
5025  CHECK(scalar_tv);
5026  const auto s_n = boost::get<NullableString>(scalar_tv);
5027  CHECK(s_n);
5028  const auto s = boost::get<std::string>(s_n);
5029  CHECK(s);
5030  if (column_format) {
5031  TColumn tcol;
5032  tcol.data.str_col.push_back(*s);
5033  tcol.nulls.push_back(false);
5034  _return.row_set.is_columnar = true;
5035  _return.row_set.columns.push_back(tcol);
5036  } else {
5037  TDatum explanation;
5038  explanation.val.str_val = *s;
5039  explanation.is_null = false;
5040  TRow trow;
5041  trow.cols.push_back(explanation);
5042  _return.row_set.is_columnar = false;
5043  _return.row_set.rows.push_back(trow);
5044  }
5045 }
5046 
5047 void DBHandler::convert_explain(TQueryResult& _return,
5048  const ResultSet& results,
5049  const bool column_format) const {
5050  create_simple_result(_return, results, column_format, "Explanation");
5051 }
5052 
5053 void DBHandler::convert_result(TQueryResult& _return,
5054  const ResultSet& results,
5055  const bool column_format) const {
5056  create_simple_result(_return, results, column_format, "Result");
5057 }
5058 
5059 // this should all be moved out of here into the catalog
5060 bool DBHandler::user_can_access_table(const Catalog_Namespace::SessionInfo& session_info,
5061  const TableDescriptor* td,
5062  const AccessPrivileges access_priv) {
5063  CHECK(td);
5064  auto& cat = session_info.getCatalog();
5065  std::vector<DBObject> privObjects;
5066  DBObject dbObject(td->tableName, TableDBObjectType);
5067  dbObject.loadKey(cat);
5068  dbObject.setPrivileges(access_priv);
5069  privObjects.push_back(dbObject);
5070  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
5071  privObjects);
5072 };
5073 
5074 void DBHandler::check_and_invalidate_sessions(Parser::DDLStmt* ddl) {
5075  const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl);
5076  if (drop_db_stmt) {
5077  invalidate_sessions(*drop_db_stmt->getDatabaseName(), drop_db_stmt);
5078  return;
5079  }
5080  const auto rename_db_stmt = dynamic_cast<Parser::RenameDatabaseStmt*>(ddl);
5081  if (rename_db_stmt) {
5082  invalidate_sessions(*rename_db_stmt->getPreviousDatabaseName(), rename_db_stmt);
5083  return;
5084  }
5085  const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl);
5086  if (drop_user_stmt) {
5087  invalidate_sessions(*drop_user_stmt->getUserName(), drop_user_stmt);
5088  return;
5089  }
5090  const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl);
5091  if (rename_user_stmt) {
5092  invalidate_sessions(*rename_user_stmt->getOldUserName(), rename_user_stmt);
5093  return;
5094  }
5095 }
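// Note: only DDL that drops or renames a database or a user invalidates existing
// sessions here; every other DDL statement leaves the session map untouched.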
5096 
5097 void DBHandler::sql_execute_impl(TQueryResult& _return,
5098  QueryStateProxy query_state_proxy,
5099  const bool column_format,
5100  const std::string& nonce,
5101  const ExecutorDeviceType executor_device_type,
5102  const int32_t first_n,
5103  const int32_t at_most_n) {
5104  if (leaf_handler_) {
5105  leaf_handler_->flush_queue();
5106  }
5107 
5108  _return.nonce = nonce;
5109  _return.execution_time_ms = 0;
5110  auto const query_str = strip(query_state_proxy.getQueryState().getQueryStr());
5111  auto session_ptr = query_state_proxy.getQueryState().getConstSessionInfo();
5112  // Call to DistributedValidate() below may change cat.
5113  auto& cat = session_ptr->getCatalog();
5114 
5115  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
5116 
5117  mapd_unique_lock<mapd_shared_mutex> executeWriteLock;
5118  mapd_shared_lock<mapd_shared_mutex> executeReadLock;
5119 
5120  lockmgr::LockedTableDescriptors locks;
5121  ParserWrapper pw{query_str};
5122  switch (pw.getQueryType()) {
5123  case ParserWrapper::QueryType::Read: {
5124  _return.query_type = TQueryType::READ;
5125  VLOG(1) << "query type: READ";
5126  break;
5127  }
5128  case ParserWrapper::QueryType::Write: {
5129  _return.query_type = TQueryType::WRITE;
5130  VLOG(1) << "query type: WRITE";
5131  break;
5132  }
5133  case ParserWrapper::QueryType::SchemaRead: {
5134  _return.query_type = TQueryType::SCHEMA_READ;
5135  VLOG(1) << "query type: SCHEMA READ";
5136  break;
5137  }
5138  case ParserWrapper::QueryType::SchemaWrite: {
5139  _return.query_type = TQueryType::SCHEMA_WRITE;
5140  VLOG(1) << "query type: SCHEMA WRITE";
5141  break;
5142  }
5143  default: {
5144  _return.query_type = TQueryType::UNKNOWN;
5145  LOG(WARNING) << "query type: UNKNOWN";
5146  break;
5147  }
5148  }
5149  if (pw.isCalcitePathPermissable(read_only_)) {
5150  executeReadLock = mapd_shared_lock<mapd_shared_mutex>(
5153 
5154  std::string query_ra;
5155  _return.execution_time_ms += measure<>::execution([&]() {
5156  TPlanResult result;
5157  std::tie(result, locks) =
5158  parse_to_ra(query_state_proxy, query_str, {}, true, system_parameters_);
5159  query_ra = result.plan_result;
5160  });
5161 
5162  std::string query_ra_calcite_explain;
5163  if (pw.isCalciteExplain() && (!g_enable_filter_push_down || g_cluster)) {
5164  // return the ra as the result
5165  convert_explain(_return, ResultSet(query_ra), true);
5166  return;
5167  } else if (pw.isCalciteExplain()) {
5168  // remove the "explain calcite " prefix from the beginning of query_str:
5169  std::string temp_query_str =
5170  query_str.substr(std::string("explain calcite ").length());
5171 
5172  CHECK(!locks.empty());
5173  query_ra_calcite_explain =
5174  parse_to_ra(query_state_proxy, temp_query_str, {}, false, system_parameters_)
5175  .first.plan_result;
5176  } else if (pw.isCalciteDdl()) {
5177  executeDdl(_return, query_ra, session_ptr);
5178  return;
5179  }
5180  const auto explain_info = pw.getExplainInfo();
5181  std::vector<PushedDownFilterInfo> filter_push_down_requests;
5182  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
5183  [this,
5184  &filter_push_down_requests,
5185  &_return,
5186  &query_state_proxy,
5187  &explain_info,
5188  &query_ra_calcite_explain,
5189  &query_ra,
5190  &query_str,
5191  &locks,
5192  column_format,
5193  executor_device_type,
5194  first_n,
5195  at_most_n](const size_t executor_index) {
5196  filter_push_down_requests = execute_rel_alg(
5197  _return,
5198  query_state_proxy,
5199  explain_info.justCalciteExplain() ? query_ra_calcite_explain : query_ra,
5200  column_format,
5201  executor_device_type,
5202  first_n,
5203  at_most_n,
5204  /*just_validate=*/false,
5206  explain_info,
5207  executor_index);
5208  if (explain_info.justCalciteExplain() && filter_push_down_requests.empty()) {
5209  // we only reach here if filter push down was enabled, but no filter
5210  // push down candidate was found
5211  convert_explain(_return, ResultSet(query_ra), true);
5212  } else if (!filter_push_down_requests.empty()) {
5213  CHECK(!locks.empty());
5215  query_state_proxy,
5216  query_ra,
5217  column_format,
5218  executor_device_type,
5219  first_n,
5220  at_most_n,
5221  explain_info.justExplain(),
5222  explain_info.justCalciteExplain(),
5223  filter_push_down_requests);
5224  } else if (explain_info.justCalciteExplain() &&
5225  filter_push_down_requests.empty()) {
5226  // return the ra as the result:
5227  // If we reach here, 'filter_push_down_requests' turned out to be empty,
5228  // i.e., no filter push down was applied, so we continue with the calcite
5229  // explanation of the initial (unchanged) query.
5230  CHECK(!locks.empty());
5231  query_ra =
5232  parse_to_ra(query_state_proxy, query_str, {}, false, system_parameters_)
5233  .first.plan_result;
5234  convert_explain(_return, ResultSet(query_ra), true);
5235  }
5236  });
5238  dispatch_queue_->submit(execute_rel_alg_task);
5239  auto result_future = execute_rel_alg_task->get_future();
5240  result_future.get();
5241  return;
5242  } else if (pw.is_optimize || pw.is_validate) {
5243  // Get the Stmt object
5244  DBHandler::parser_with_error_handler(query_str, parse_trees);
5245 
5246  if (pw.is_optimize) {
5247  const auto optimize_stmt =
5248  dynamic_cast<Parser::OptimizeTableStmt*>(parse_trees.front().get());
5249  CHECK(optimize_stmt);
5250 
5251  _return.execution_time_ms += measure<>::execution([&]() {
5252  const auto td_with_lock =
5253  lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>::acquireTableDescriptor(
5254  cat, optimize_stmt->getTableName());
5255  const auto td = td_with_lock();
5256 
5257  if (!td || !user_can_access_table(
5258  *session_ptr, td, AccessPrivileges::DELETE_FROM_TABLE)) {
5259  throw std::runtime_error("Table " + optimize_stmt->getTableName() +
5260  " does not exist.");
5261  }
5262  if (td->isView) {
5263  throw std::runtime_error("OPTIMIZE TABLE command is not supported on views.");
5264  }
5265 
5266  // acquire write lock on table data
5267  auto data_lock =
5268  lockmgr::TableDataLockMgr::getWriteLockForTable(cat, optimize_stmt->getTableName());
5269 
5270  auto executor = Executor::getExecutor(
5271  Executor::UNITARY_EXECUTOR_ID);
5272  const TableOptimizer optimizer(td, executor.get(), cat);
5273  if (optimize_stmt->shouldVacuumDeletedRows()) {
5274  optimizer.vacuumDeletedRows();
5275  }
5276  optimizer.recomputeMetadata();
5277  });
5278 
5279  return;
5280  }
5281  if (pw.is_validate) {
5282  // check user is superuser
5283  if (!session_ptr->get_currentUser().isSuper) {
5284  throw std::runtime_error("Superuser is required to run VALIDATE");
5285  }
5286  const auto validate_stmt =
5287  dynamic_cast<Parser::ValidateStmt*>(parse_trees.front().get());
5288  CHECK(validate_stmt);
5289 
5290  // Prevent any other query from running while doing validate
5291  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5294 
5295  std::string output{"Result for validate"};
5297  _return.execution_time_ms += measure<>::execution([&]() {
5298  const DistributedValidate validator(validate_stmt->getType(),
5299  validate_stmt->isRepairTypeRemove(),
5300  cat, // tables may be dropped here
5302  *session_ptr,
5303  *this);
5304  output = validator.validate(query_state_proxy);
5305  });
5306  } else {
5307  output = "Not running on a cluster, nothing to validate";
5308  }
5309  convert_result(_return, ResultSet(output), true);
5310  return;
5311  }
5312  }
5313  LOG(INFO) << "passing query to legacy processor";
5314 
5315  const auto result = apply_copy_to_shim(query_str);
5316  DBHandler::parser_with_error_handler(result, parse_trees);
5317  auto handle_ddl = [&query_state_proxy, &session_ptr, &_return, &locks, this](
5318  Parser::DDLStmt* ddl) -> bool {
5319  if (!ddl) {
5320  return false;
5321  }
5322  const auto show_create_stmt = dynamic_cast<Parser::ShowCreateTableStmt*>(ddl);
5323  if (show_create_stmt) {
5324  _return.execution_time_ms +=
5325  measure<>::execution([&]() { ddl->execute(*session_ptr); });
5326  const auto create_string = show_create_stmt->getCreateStmt();
5327  convert_result(_return, ResultSet(create_string), true);
5328  return true;
5329  }
5330 
5331  const auto import_stmt = dynamic_cast<Parser::CopyTableStmt*>(ddl);
5332  if (import_stmt) {
5333  if (g_cluster && !leaf_aggregator_.leafCount()) {
5334  // Don't allow COPY FROM imports directly on a leaf node
5335  throw std::runtime_error(
5336  "Cannot import on an individual leaf. Please import from the Aggregator.");
5337  } else if (leaf_aggregator_.leafCount() > 0) {
5338  _return.execution_time_ms += measure<>::execution(
5339  [&]() { execute_distributed_copy_statement(import_stmt, *session_ptr); });
5340  } else {
5341  _return.execution_time_ms +=
5342  measure<>::execution([&]() { ddl->execute(*session_ptr); });
5343  }
5344 
5345  // Read response message
5346  convert_result(_return, ResultSet(*import_stmt->return_message.get()), true);
5347  _return.success = import_stmt->get_success();
5348 
5349  // get geo_copy_from info
5350  if (import_stmt->was_geo_copy_from()) {
5351  GeoCopyFromState geo_copy_from_state;
5352  import_stmt->get_geo_copy_from_payload(
5353  geo_copy_from_state.geo_copy_from_table,
5354  geo_copy_from_state.geo_copy_from_file_name,
5355  geo_copy_from_state.geo_copy_from_copy_params,
5356  geo_copy_from_state.geo_copy_from_partitions);
5357  geo_copy_from_sessions.add(session_ptr->get_session_id(), geo_copy_from_state);
5358  }
5359  return true;
5360  }
5361 
5362  // Check for DDL statements requiring locking and get locks
5363  auto export_stmt = dynamic_cast<Parser::ExportQueryStmt*>(ddl);
5364  if (export_stmt) {
5365  const auto query_string = export_stmt->get_select_stmt();
5366  TPlanResult result;
5367  CHECK(locks.empty());
5368  std::tie(result, locks) =
5369  parse_to_ra(query_state_proxy, query_string, {}, true, system_parameters_);
5370  }
5371  _return.execution_time_ms += measure<>::execution([&]() {
5372  ddl->execute(*session_ptr);
5373  check_and_invalidate_sessions(ddl);
5374  });
5375  return true;
5376  };
5377 
5378  for (const auto& stmt : parse_trees) {
5379  auto select_stmt = dynamic_cast<Parser::SelectStmt*>(stmt.get());
5380  if (!select_stmt) {
5381  check_read_only("Non-SELECT statements");
5382  }
5383  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt.get());
5384  if (!handle_ddl(ddl)) {
5385  auto stmtp = dynamic_cast<Parser::InsertValuesStmt*>(stmt.get());
5386  CHECK(stmtp); // no other statements supported
5387 
5388  if (parse_trees.size() != 1) {
5389  throw std::runtime_error("Can only run one INSERT INTO query at a time.");
5390  }
5391 
5392  _return.execution_time_ms +=
5393  measure<>::execution([&]() { stmtp->execute(*session_ptr); });
5394  }
5395  }
5396 }
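// Summary: sql_execute_impl() routes a statement one of three ways: (1) anything
// Calcite can plan goes through parse_to_ra() and execute_rel_alg() on the dispatch
// queue, (2) OPTIMIZE TABLE and VALIDATE are handled inline under explicit table locks,
// and (3) everything else (COPY, DDL, INSERT VALUES) falls through to the legacy parser
// and the handle_ddl lambda above.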
5397 
5398 void DBHandler::execute_rel_alg_with_filter_push_down(
5399  TQueryResult& _return,
5400  QueryStateProxy query_state_proxy,
5401  std::string& query_ra,
5402  const bool column_format,
5403  const ExecutorDeviceType executor_device_type,
5404  const int32_t first_n,
5405  const int32_t at_most_n,
5406  const bool just_explain,
5407  const bool just_calcite_explain,
5408  const std::vector<PushedDownFilterInfo>& filter_push_down_requests) {
5409  // collecting the selected filters' info to be sent to Calcite:
5410  std::vector<TFilterPushDownInfo> filter_push_down_info;
5411  for (const auto& req : filter_push_down_requests) {
5412  TFilterPushDownInfo filter_push_down_info_for_request;
5413  filter_push_down_info_for_request.input_prev = req.input_prev;
5414  filter_push_down_info_for_request.input_start = req.input_start;
5415  filter_push_down_info_for_request.input_next = req.input_next;
5416  filter_push_down_info.push_back(filter_push_down_info_for_request);
5417  }
5418  // deriving the new relational algebra plan with respect to the pushed down filters
5419  _return.execution_time_ms += measure<>::execution([&]() {
5420  query_ra = parse_to_ra(query_state_proxy,
5421  query_state_proxy.getQueryState().getQueryStr(),
5422  filter_push_down_info,
5423  false,
5425  .first.plan_result;
5426  });
5427 
5428  if (just_calcite_explain) {
5429  // return the new ra as the result
5430  convert_explain(_return, ResultSet(query_ra), true);
5431  return;
5432  }
5433 
5434  // execute the new relational algebra plan:
5435  auto explain_info = ExplainInfo::defaults();
5436  explain_info.explain = just_explain;
5437  execute_rel_alg(_return,
5438  query_state_proxy,
5439  query_ra,
5440  column_format,
5441  executor_device_type,
5442  first_n,
5443  at_most_n,
5444  /*just_validate=*/false,
5445  /*find_push_down_candidates=*/false,
5446  explain_info);
5447 }
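// Sketch of the filter push-down flow: execute_rel_alg() is first run with
// find_push_down_candidates enabled; if it reports PushedDownFilterInfo entries, the
// function above asks Calcite to re-plan the original query with those filters pushed
// down (parse_to_ra() with the TFilterPushDownInfo list) and then re-executes the new
// plan with find_push_down_candidates = false, so the second pass cannot recurse.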
5448 
5449 void DBHandler::execute_distributed_copy_statement(
5450  Parser::CopyTableStmt* copy_stmt,
5451  const Catalog_Namespace::SessionInfo& session_info) {
5452  auto importer_factory = [&session_info, this](
5453  const Catalog& catalog,
5454  const TableDescriptor* td,
5455  const std::string& file_path,
5456  const import_export::CopyParams& copy_params) {
5457  return std::make_unique<import_export::Importer>(
5458  new DistributedLoader(session_info, td, &leaf_aggregator_),
5459  file_path,
5460  copy_params);
5461  };
5462  copy_stmt->execute(session_info, importer_factory);
5463 }
5464 
5465 std::pair<TPlanResult, lockmgr::LockedTableDescriptors> DBHandler::parse_to_ra(
5466  QueryStateProxy query_state_proxy,
5467  const std::string& query_str,
5468  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
5469  const bool acquire_locks,
5470  const SystemParameters system_parameters,
5471  bool check_privileges) {
5472  query_state::Timer timer = query_state_proxy.createTimer(__func__);
5473  ParserWrapper pw{query_str};
5474  const std::string actual_query{pw.isSelectExplain() ? pw.actual_query : query_str};
5475  TPlanResult result;
5476  if (pw.isCalcitePathPermissable()) {
5477  auto cat = query_state_proxy.getQueryState().getConstSessionInfo()->get_catalog_ptr();
5478  auto session_cleanup_handler = [&](const auto& session_id) {
5479  removeInMemoryCalciteSession(session_id);
5480  };
5481  auto process_calcite_request = [&] {
5482  const auto& in_memory_session_id = createInMemoryCalciteSession(cat);
5483  try {
5484  result = calcite_->process(timer.createQueryStateProxy(),
5485  legacy_syntax_ ? pg_shim(actual_query) : actual_query,
5486  filter_push_down_info,
5488  pw.isCalciteExplain(),
5489  system_parameters.enable_calcite_view_optimize,
5490  check_privileges,
5491  in_memory_session_id);
5492  session_cleanup_handler(in_memory_session_id);
5493  } catch (std::exception&) {
5494  session_cleanup_handler(in_memory_session_id);
5495  throw;
5496  }
5497  };
5498  process_calcite_request();
5499  lockmgr::LockedTableDescriptors locks;
5500  if (acquire_locks) {
5501  std::set<std::string> read_only_tables;
5502  for (const auto& table : result.resolved_accessed_objects.tables_selected_from) {
5503  read_only_tables.insert(table);
5504  }
5505  std::vector<std::string> tables;
5506  tables.insert(tables.end(),
5507  result.resolved_accessed_objects.tables_selected_from.begin(),
5508  result.resolved_accessed_objects.tables_selected_from.end());
5509  tables.insert(tables.end(),
5510  result.resolved_accessed_objects.tables_inserted_into.begin(),
5511  result.resolved_accessed_objects.tables_inserted_into.end());
5512  tables.insert(tables.end(),
5513  result.resolved_accessed_objects.tables_updated_in.begin(),
5514  result.resolved_accessed_objects.tables_updated_in.end());
5515  tables.insert(tables.end(),
5516  result.resolved_accessed_objects.tables_deleted_from.begin(),
5517  result.resolved_accessed_objects.tables_deleted_from.end());
5518  // avoid deadlocks by enforcing a deterministic locking sequence
5519  std::sort(tables.begin(), tables.end());
5520  for (const auto& table : tables) {
5521  // first, obtain table schema locks
5522  // then, obtain table data locks
5523  if (read_only_tables.count(table)) {
5524  locks.emplace_back(
5527  lockmgr::ReadLock>::acquireTableDescriptor(*cat.get(), table)));
5528  locks.emplace_back(
5531  cat->getDatabaseId(), (*locks.back())())));
5532  } else {
5533  locks.emplace_back(
5536  lockmgr::WriteLock>::acquireTableDescriptor(*cat.get(), table)));
5537  // TODO(adb): Should we be taking this lock for inserts? Are inserts even
5538  // going down this path?
5539  locks.emplace_back(
5542  cat->getDatabaseId(), (*locks.back())())));
5543  }
5544  }
5545  }
5546  return std::make_pair(result, std::move(locks));
5547  }
5548  return std::make_pair(result, lockmgr::LockedTableDescriptors{});
5549 }
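// Example: if a query selects from table B and inserts into table A, the code above
// collects {B, A}, sorts it to {A, B}, and then for each table takes the schema lock
// before the data/insert lock. Acquiring locks in this fixed order (alphabetical by
// table, schema before data) across all queries is what prevents two concurrent
// statements from deadlocking on the same pair of tables.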
5550 
5551 int64_t DBHandler::query_get_outer_fragment_count(const TSessionId& session,
5552  const std::string& select_query) {
5553  auto stdlog = STDLOG(get_session_ptr(session));
5554  if (!leaf_handler_) {
5555  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5556  }
5557  try {
5558  return leaf_handler_->query_get_outer_fragment_count(session, select_query);
5559  } catch (std::exception& e) {
5560  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5561  }
5562 }
5563 
5564 void DBHandler::check_table_consistency(TTableMeta& _return,
5565  const TSessionId& session,
5566  const int32_t table_id) {
5567  auto stdlog = STDLOG(get_session_ptr(session));
5568  if (!leaf_handler_) {
5569  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5570  }
5571  try {
5572  leaf_handler_->check_table_consistency(_return, session, table_id);
5573  } catch (std::exception& e) {
5574  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5575  }
5576 }
5577 
5578 void DBHandler::start_query(TPendingQuery& _return,
5579  const TSessionId& leaf_session,
5580  const TSessionId& parent_session,
5581  const std::string& query_ra,
5582  const bool just_explain,
5583  const std::vector<int64_t>& outer_fragment_indices) {
5584  auto stdlog = STDLOG(get_session_ptr(leaf_session));
5585  auto session_ptr = stdlog.getConstSessionInfo();
5586  if (!leaf_handler_) {
5587  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5588  }
5589  LOG(INFO) << "start_query :" << *session_ptr << " :" << just_explain;
5590  auto time_ms = measure<>::execution([&]() {
5591  try {
5592  leaf_handler_->start_query(_return,
5593  leaf_session,
5594  parent_session,
5595  query_ra,
5596  just_explain,
5597  outer_fragment_indices);
5598  } catch (std::exception& e) {
5599  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5600  }
5601  });
5602  LOG(INFO) << "start_query-COMPLETED " << time_ms << "ms "
5603  << "id is " << _return.id;
5604 }
5605 
5606 void DBHandler::execute_query_step(TStepResult& _return,
5607  const TPendingQuery& pending_query) {
5608  if (!leaf_handler_) {
5609  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5610  }
5611  LOG(INFO) << "execute_query_step : id:" << pending_query.id;