OmniSciDB  91042dcc5b
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: DBHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "DBHandler.h"
25 #include "DistributedLoader.h"
26 #include "TokenCompletionHints.h"
27 
28 #ifdef HAVE_PROFILER
29 #include <gperftools/heap-profiler.h>
30 #endif // HAVE_PROFILER
31 
32 #include "MapDRelease.h"
33 
34 #include "Calcite/Calcite.h"
35 #include "gen-cpp/CalciteServer.h"
36 
39 
40 #include "Catalog/Catalog.h"
44 #include "DistributedHandler.h"
46 #include "Geospatial/Compression.h"
47 #include "Geospatial/GDAL.h"
48 #include "Geospatial/Transforms.h"
49 #include "Geospatial/Types.h"
51 #include "ImportExport/Importer.h"
52 #include "LockMgr/LockMgr.h"
54 #include "Parser/ParserWrapper.h"
56 #include "Parser/parser.h"
59 #include "QueryEngine/Execute.h"
69 #include "Shared/ArrowUtil.h"
70 #include "Shared/StringTransform.h"
71 #include "Shared/file_path_util.h"
72 #include "Shared/import_helpers.h"
74 #include "Shared/measure.h"
75 #include "Shared/scope.h"
77 
78 #ifdef HAVE_AWS_S3
79 #include <aws/core/auth/AWSCredentialsProviderChain.h>
80 #endif
81 #include <fcntl.h>
82 #include <picosha2.h>
83 #include <sys/types.h>
84 #include <algorithm>
85 #include <boost/algorithm/string.hpp>
86 #include <boost/filesystem.hpp>
87 #include <boost/make_shared.hpp>
88 #include <boost/process/search_path.hpp>
89 #include <boost/program_options.hpp>
90 #include <boost/regex.hpp>
91 #include <boost/tokenizer.hpp>
92 #include <chrono>
93 #include <cmath>
94 #include <csignal>
95 #include <fstream>
96 #include <future>
97 #include <map>
98 #include <memory>
99 #include <random>
100 #include <regex>
101 #include <string>
102 #include <thread>
103 #include <typeinfo>
104 
105 #include <arrow/api.h>
106 #include <arrow/io/api.h>
107 #include <arrow/ipc/api.h>
108 
109 #include "Shared/ArrowUtil.h"
110 
111 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
112 
113 #ifdef ENABLE_IMPORT_PARQUET
114 extern bool g_enable_parquet_import_fsi;
115 #endif
116 
117 #ifdef HAVE_AWS_S3
118 extern bool g_allow_s3_server_privileges;
119 #endif
120 
121 extern bool g_enable_system_tables;
123 
126 
127 #define INVALID_SESSION_ID ""
128 
// Logs `errstr` at ERROR level and throws it wrapped in a TOmniSciException.
//
// - `errstr` is evaluated exactly once and is parenthesized before use.
// - The temporary has a deliberately unusual name so that an `errstr` expression
//   mentioning a caller-side variable (for example one called `ex`) is not
//   captured by the macro's own local.
// - The expansion stays a plain brace block, so existing call sites written as
//   `THROW_MAPD_EXCEPTION(...);` keep compiling unchanged.
#define THROW_MAPD_EXCEPTION(errstr)                   \
  {                                                    \
    TOmniSciException thrown_mapd_exception_;          \
    thrown_mapd_exception_.error_msg = (errstr);       \
    LOG(ERROR) << thrown_mapd_exception_.error_msg;    \
    throw thrown_mapd_exception_;                      \
  }
136 
137 thread_local std::string TrackingProcessor::client_address;
139 
140 namespace {
141 
142 SessionMap::iterator get_session_from_map(const TSessionId& session,
143  SessionMap& session_map) {
144  auto session_it = session_map.find(session);
145  if (session_it == session_map.end()) {
146  THROW_MAPD_EXCEPTION("Session not valid.");
147  }
148  return session_it;
149 }
150 
151 struct ForceDisconnect : public std::runtime_error {
152  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
153 };
154 
155 } // namespace
156 
157 template <>
158 SessionMap::iterator DBHandler::get_session_it_unsafe(
159  const TSessionId& session,
160  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
161  auto session_it = get_session_from_map(session, sessions_);
162  try {
163  check_session_exp_unsafe(session_it);
164  } catch (const ForceDisconnect& e) {
165  read_lock.unlock();
166  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
167  auto session_it2 = get_session_from_map(session, sessions_);
168  disconnect_impl(session_it2, write_lock);
169  THROW_MAPD_EXCEPTION(e.what());
170  }
171  return session_it;
172 }
173 
174 template <>
175 SessionMap::iterator DBHandler::get_session_it_unsafe(
176  const TSessionId& session,
177  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
178  auto session_it = get_session_from_map(session, sessions_);
179  try {
180  check_session_exp_unsafe(session_it);
181  } catch (const ForceDisconnect& e) {
182  disconnect_impl(session_it, write_lock);
183  THROW_MAPD_EXCEPTION(e.what());
184  }
185  return session_it;
186 }
187 
188 template <>
190  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
191  std::vector<std::string> expired_sessions;
192  for (auto session_pair : sessions_) {
193  auto session_it = get_session_from_map(session_pair.first, sessions_);
194  try {
195  check_session_exp_unsafe(session_it);
196  } catch (const ForceDisconnect& e) {
197  expired_sessions.emplace_back(session_it->second->get_session_id());
198  }
199  }
200 
201  for (auto session_id : expired_sessions) {
202  if (leaf_aggregator_.leafCount() > 0) {
203  try {
204  leaf_aggregator_.disconnect(session_id);
205  } catch (TOmniSciException& toe) {
206  LOG(INFO) << " Problem disconnecting from leaves : " << toe.what();
207  } catch (std::exception& e) {
208  LOG(INFO)
209  << " Problem disconnecting from leaves, check leaf logs for additonal info";
210  }
211  }
212  sessions_.erase(session_id);
213  }
214  if (render_handler_) {
215  write_lock.unlock();
216  for (auto session_id : expired_sessions) {
217  // NOTE: the render disconnect is done after the session lock is released to
218  // avoid a deadlock. See: https://omnisci.atlassian.net/browse/BE-3324
219  // This out-of-scope solution is a compromise for now until a better session
220  // handling/locking mechanism is developed for the renderer. Note as well that the
221  // session_id cannot be immediately reused: a render request that slips in
222  // after the lock is released and before the render disconnect could cause a
223  // problem.
224  render_handler_->disconnect(session_id);
225  }
226  }
227 }
228 
229 #ifdef ENABLE_GEOS
230 extern std::unique_ptr<std::string> g_libgeos_so_filename;
231 #endif
232 
233 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
234  const std::vector<LeafHostInfo>& string_leaves,
235  const std::string& base_data_path,
236  const bool allow_multifrag,
237  const bool jit_debug,
238  const bool intel_jit_profile,
239  const bool read_only,
240  const bool allow_loop_joins,
241  const bool enable_rendering,
242  const bool renderer_use_vulkan_driver,
243  const bool renderer_use_ppll_polys,
244  const bool renderer_prefer_igpu,
245  const unsigned renderer_vulkan_timeout_ms,
246  const bool enable_auto_clear_render_mem,
247  const int render_oom_retry_threshold,
248  const size_t render_mem_bytes,
249  const size_t max_concurrent_render_sessions,
250  const size_t reserved_gpu_mem,
251  const bool render_compositor_use_last_gpu,
252  const size_t num_reader_threads,
253  const AuthMetadata& authMetadata,
254  SystemParameters& system_parameters,
255  const bool legacy_syntax,
256  const int idle_session_duration,
257  const int max_session_duration,
258  const bool enable_runtime_udf_registration,
259  const std::string& udf_filename,
260  const std::string& clang_path,
261  const std::vector<std::string>& clang_options,
262 #ifdef ENABLE_GEOS
263  const std::string& libgeos_so_filename,
264 #endif
265  const File_Namespace::DiskCacheConfig& disk_cache_config,
266  const bool is_new_db)
267  : leaf_aggregator_(db_leaves)
268  , db_leaves_(db_leaves)
269  , string_leaves_(string_leaves)
270  , base_data_path_(base_data_path)
271  , random_gen_(std::random_device{}())
272  , session_id_dist_(0, INT32_MAX)
273  , jit_debug_(jit_debug)
274  , intel_jit_profile_(intel_jit_profile)
275  , allow_multifrag_(allow_multifrag)
276  , read_only_(read_only)
277  , allow_loop_joins_(allow_loop_joins)
278  , authMetadata_(authMetadata)
279  , system_parameters_(system_parameters)
280  , legacy_syntax_(legacy_syntax)
281  , dispatch_queue_(
282  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
283  , super_user_rights_(false)
284  , idle_session_duration_(idle_session_duration * 60)
285  , max_session_duration_(max_session_duration * 60)
286  , runtime_udf_registration_enabled_(enable_runtime_udf_registration)
287 
288  , enable_rendering_(enable_rendering)
289  , renderer_use_vulkan_driver_(renderer_use_vulkan_driver)
290  , renderer_use_ppll_polys_(renderer_use_ppll_polys)
291  , renderer_prefer_igpu_(renderer_prefer_igpu)
292  , renderer_vulkan_timeout_(renderer_vulkan_timeout_ms)
293  , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
294  , render_oom_retry_threshold_(render_oom_retry_threshold)
295  , max_concurrent_render_sessions_(max_concurrent_render_sessions)
296  , reserved_gpu_mem_(reserved_gpu_mem)
297  , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
298  , render_mem_bytes_(render_mem_bytes)
299  , num_reader_threads_(num_reader_threads)
300 #ifdef ENABLE_GEOS
301  , libgeos_so_filename_(libgeos_so_filename)
302 #endif
303  , disk_cache_config_(disk_cache_config)
304  , udf_filename_(udf_filename)
305  , clang_path_(clang_path)
306  , clang_options_(clang_options)
307 
308 {
309  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
310  initialize(is_new_db);
311 }
312 
313 void DBHandler::initialize(const bool is_new_db) {
314  if (!initialized_) {
315  initialized_ = true;
316  } else {
318  "Server already initialized; service restart required to activate any new "
319  "entitlements.");
320  return;
321  }
322 
325  cpu_mode_only_ = true;
326  } else {
327 #ifdef HAVE_CUDA
329  cpu_mode_only_ = false;
330 #else
332  LOG(WARNING) << "This build isn't CUDA enabled, will run on CPU";
333  cpu_mode_only_ = true;
334 #endif
335  }
336 
337  bool is_rendering_enabled = enable_rendering_;
338  if (system_parameters_.num_gpus == 0) {
339  is_rendering_enabled = false;
340  }
341 
342  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
343  // Calculate the total amount of memory to reserve on each GPU, i.e. memory that the
344  // buffer manager must not ask for.
345  size_t total_reserved = reserved_gpu_mem_;
346  if (is_rendering_enabled) {
347  total_reserved += render_mem_bytes_;
348  }
349 
350  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
351 #ifdef HAVE_CUDA
352  if (!cpu_mode_only_ || is_rendering_enabled) {
353  try {
354  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
356  } catch (const std::exception& e) {
357  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
358  << e.what();
359  cpu_mode_only_ = true;
360  is_rendering_enabled = false;
361  }
362  }
363 #endif // HAVE_CUDA
364 
365  try {
366  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
368  std::move(cuda_mgr),
370  total_reserved,
373  } catch (const std::exception& e) {
374  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
375  }
376 
377  std::string udf_ast_filename("");
378 
379  try {
380  if (!udf_filename_.empty()) {
381  const auto cuda_mgr = data_mgr_->getCudaMgr();
382  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
383  cuda_mgr ? cuda_mgr->getDeviceArch()
385  UdfCompiler compiler(device_arch, clang_path_, clang_options_);
386 
387  const auto [cpu_udf_ir_file, cuda_udf_ir_file] = compiler.compileUdf(udf_filename_);
388  Executor::addUdfIrToModule(cpu_udf_ir_file, /*is_cuda_ir=*/false);
389  if (!cuda_udf_ir_file.empty()) {
390  Executor::addUdfIrToModule(cuda_udf_ir_file, /*is_cuda_ir=*/true);
391  }
392  udf_ast_filename = compiler.getAstFileName(udf_filename_);
393  }
394  } catch (const std::exception& e) {
395  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
396  }
397 
398  try {
399  calcite_ =
400  std::make_shared<Calcite>(system_parameters_, base_data_path_, udf_ast_filename);
401  } catch (const std::exception& e) {
402  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
403  }
404 
405  try {
406  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
407  if (!udf_filename_.empty()) {
408  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
409  }
410  } catch (const std::exception& e) {
411  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
412  }
413 
414  try {
416  } catch (const std::exception& e) {
417  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
418  }
419 
420  try {
421  auto udtfs = ThriftSerializers::to_thrift(
423  std::vector<TUserDefinedFunction> udfs = {};
424  calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
425  } catch (const std::exception& e) {
426  LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
427  }
428 
429  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
431  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
432  cpu_mode_only_ = true;
433  }
434 
435  switch (executor_device_type_) {
437  LOG(INFO) << "Started in GPU mode" << std::endl;
438  break;
440  LOG(INFO) << "Started in CPU mode" << std::endl;
441  break;
442  }
443 
444  try {
446  SysCatalog::instance().init(base_data_path_,
447  data_mgr_,
449  calcite_,
450  is_new_db,
451  !db_leaves_.empty(),
453  } catch (const std::exception& e) {
454  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
455  }
456 
457  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
458  start_time_ = std::time(nullptr);
459 
460  if (is_rendering_enabled) {
461  try {
462  render_handler_.reset(new RenderHandler(this,
466  false,
467  0,
468  false,
469  false,
473  } catch (const std::exception& e) {
474  LOG(ERROR) << "Backend rendering disabled: " << e.what();
475  }
476  }
477 
478  if (leaf_aggregator_.leafCount() > 0) {
479  try {
480  agg_handler_.reset(new MapDAggHandler(this));
481  } catch (const std::exception& e) {
482  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
483  }
484  } else if (g_cluster) {
485  try {
486  leaf_handler_.reset(new MapDLeafHandler(this));
487  } catch (const std::exception& e) {
488  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
489  }
490  }
491 
492 #ifdef ENABLE_GEOS
493  if (!libgeos_so_filename_.empty()) {
494  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename_));
495  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
496  }
497 #endif
498 }
499 
501  shutdown();
502 }
503 
505  const std::string& query_str,
506  std::list<std::unique_ptr<Parser::Stmt>>& parse_trees) {
507  int num_parse_errors = 0;
508  std::string last_parsed;
509  SQLParser parser;
510  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
511  if (num_parse_errors > 0) {
512  throw std::runtime_error("Syntax error at: " + last_parsed);
513  }
514  if (parse_trees.size() > 1) {
515  throw std::runtime_error("multiple SQL statements not allowed");
516  } else if (parse_trees.size() != 1) {
517  throw std::runtime_error("empty SQL statment not allowed");
518  }
519 }
520 void DBHandler::check_read_only(const std::string& str) {
521  if (DBHandler::read_only_) {
522  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
523  }
524 }
525 
527  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
528  // We would create an in memory session for calcite with super user privileges which
529  // would be used for getting all tables metadata when a user runs the query. The
530  // session would be under the name of a proxy user/password which would only persist
531  // till server's lifetime or execution of calcite query(in memory) whichever is the
532  // earliest.
533  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
534  std::string session_id;
535  do {
536  session_id = generate_random_string(64);
537  } while (sessions_.find(session_id) != sessions_.end());
538  Catalog_Namespace::UserMetadata user_meta(-1,
539  calcite_->getInternalSessionProxyUserName(),
540  calcite_->getInternalSessionProxyPassword(),
541  true,
542  -1,
543  true,
544  false);
545  const auto emplace_ret =
546  sessions_.emplace(session_id,
547  std::make_shared<Catalog_Namespace::SessionInfo>(
548  catalog_ptr, user_meta, executor_device_type_, session_id));
549  CHECK(emplace_ret.second);
550  return session_id;
551 }
552 
554  const Catalog_Namespace::UserMetadata user_meta) {
555  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
556  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
557  user_meta.isSuper.load();
558 }
559 
560 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
561  // Remove InMemory calcite Session.
562  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
563  const auto it = sessions_.find(session_id);
564  CHECK(it != sessions_.end());
565  sessions_.erase(it);
566 }
567 
568 // internal connection for connections with no password
569 void DBHandler::internal_connect(TSessionId& session,
570  const std::string& username,
571  const std::string& dbname) {
572  auto stdlog = STDLOG(); // session_info set by connect_impl()
573  std::string username2 = username; // login() may reset username given as argument
574  std::string dbname2 = dbname; // login() may reset dbname given as argument
576  std::shared_ptr<Catalog> cat = nullptr;
577  try {
578  cat =
579  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
580  } catch (std::exception& e) {
581  THROW_MAPD_EXCEPTION(e.what());
582  }
583 
584  DBObject dbObject(dbname2, DatabaseDBObjectType);
585  dbObject.loadKey(*cat);
587  std::vector<DBObject> dbObjects;
588  dbObjects.push_back(dbObject);
589  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
590  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
591  " is not allowed to access database " + dbname2 + ".");
592  }
593  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
594 }
595 
597  return leaf_aggregator_.leafCount() > 0;
598 }
599 
600 void DBHandler::krb5_connect(TKrb5Session& session,
601  const std::string& inputToken,
602  const std::string& dbname) {
603  THROW_MAPD_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
604 }
605 
606 void DBHandler::connect(TSessionId& session,
607  const std::string& username,
608  const std::string& passwd,
609  const std::string& dbname) {
610  auto stdlog = STDLOG(); // session_info set by connect_impl()
611  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
612  std::string username2 = username; // login() may reset username given as argument
613  std::string dbname2 = dbname; // login() may reset dbname given as argument
615  std::shared_ptr<Catalog> cat = nullptr;
616  try {
617  cat = SysCatalog::instance().login(
618  dbname2, username2, passwd, user_meta, !super_user_rights_);
619  } catch (std::exception& e) {
620  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
621  THROW_MAPD_EXCEPTION(e.what());
622  }
623 
624  DBObject dbObject(dbname2, DatabaseDBObjectType);
625  dbObject.loadKey(*cat);
627  std::vector<DBObject> dbObjects;
628  dbObjects.push_back(dbObject);
629  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
630  stdlog.appendNameValuePairs(
631  "user", username, "db", dbname, "exception", "Missing Privileges");
632  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
633  " is not allowed to access database " + dbname2 + ".");
634  }
635  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
636 
637  // if pki auth session will come back encrypted with user pubkey
638  SysCatalog::instance().check_for_session_encryption(passwd, session);
639 }
640 
641 std::shared_ptr<Catalog_Namespace::SessionInfo> DBHandler::create_new_session(
642  TSessionId& session,
643  const std::string& dbname,
644  const Catalog_Namespace::UserMetadata& user_meta,
645  std::shared_ptr<Catalog> cat) {
646  do {
647  session = generate_random_string(32);
648  } while (sessions_.find(session) != sessions_.end());
649  std::pair<SessionMap::iterator, bool> emplace_retval =
650  sessions_.emplace(session,
651  std::make_shared<Catalog_Namespace::SessionInfo>(
652  cat, user_meta, executor_device_type_, session));
653  CHECK(emplace_retval.second);
654  auto& session_ptr = emplace_retval.first->second;
655  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
656  return session_ptr;
657 }
658 
659 void DBHandler::connect_impl(TSessionId& session,
660  const std::string& passwd,
661  const std::string& dbname,
662  const Catalog_Namespace::UserMetadata& user_meta,
663  std::shared_ptr<Catalog> cat,
664  query_state::StdLog& stdlog) {
665  // TODO(sy): Is there any reason to have dbname as a parameter
666  // here when the cat parameter already provides cat->name()?
667  // Should dbname and cat->name() ever differ?
668  {
669  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
670  expire_idle_sessions_unsafe(write_lock);
672  sessions_.size() + 1 > static_cast<size_t>(system_parameters_.num_sessions)) {
673  THROW_MAPD_EXCEPTION("Too many active sessions");
674  }
675  }
676  {
677  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
678  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
679  stdlog.setSessionInfo(session_ptr);
680  session_ptr->set_connection_info(getConnectionInfo().toString());
681  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
682  // while doing warmup
683  if (leaf_aggregator_.leafCount() > 0) {
684  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
685  return;
686  }
687  }
688  }
689  auto const roles =
690  stdlog.getConstSessionInfo()->get_currentUser().isSuper
691  ? std::vector<std::string>{{"super"}}
692  : SysCatalog::instance().getRoles(
693  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
694  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
695 }
696 
697 void DBHandler::disconnect(const TSessionId& session) {
698  auto stdlog = STDLOG();
699  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
700 
701  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
702  auto session_it = get_session_it_unsafe(session, write_lock);
703  stdlog.setSessionInfo(session_it->second);
704  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
705 
706  LOG(INFO) << "User " << session_it->second->get_currentUser().userLoggable()
707  << " disconnected from database " << dbname
708  << " with public_session_id: " << session_it->second->get_public_session_id();
709 
710  disconnect_impl(session_it, write_lock);
711 }
712 
713 void DBHandler::disconnect_impl(const SessionMap::iterator& session_it,
714  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
715  // session_it existence should already have been checked (i.e. called via
716  // get_session_it_unsafe(...))
717 
718  const auto session_id = session_it->second->get_session_id();
719  std::exception_ptr leaf_exception = nullptr;
720  try {
721  if (leaf_aggregator_.leafCount() > 0) {
722  leaf_aggregator_.disconnect(session_id);
723  }
724  } catch (...) {
725  leaf_exception = std::current_exception();
726  }
727 
728  {
729  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
730  render_group_assignment_map_.erase(session_id);
731  }
732 
733  sessions_.erase(session_it);
734  write_lock.unlock();
735 
736  if (render_handler_) {
737  render_handler_->disconnect(session_id);
738  }
739 }
740 
741 void DBHandler::switch_database(const TSessionId& session, const std::string& dbname) {
742  auto stdlog = STDLOG(get_session_ptr(session));
743  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
744  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
745  auto session_it = get_session_it_unsafe(session, write_lock);
746 
747  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
748 
749  try {
750  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
751  dbname2, session_it->second->get_currentUser().userName);
752  session_it->second->set_catalog_ptr(cat);
753  if (leaf_aggregator_.leafCount() > 0) {
754  leaf_aggregator_.switch_database(session, dbname);
755  return;
756  }
757  } catch (std::exception& e) {
758  THROW_MAPD_EXCEPTION(e.what());
759  }
760 }
761 
762 void DBHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
763  auto stdlog = STDLOG(get_session_ptr(session1));
764  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
765  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
766  auto session_it = get_session_it_unsafe(session1, write_lock);
767 
768  try {
769  const Catalog_Namespace::UserMetadata& user_meta =
770  session_it->second->get_currentUser();
771  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
772  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
773  if (leaf_aggregator_.leafCount() > 0) {
774  leaf_aggregator_.clone_session(session1, session2);
775  return;
776  }
777  } catch (std::exception& e) {
778  THROW_MAPD_EXCEPTION(e.what());
779  }
780 }
781 
782 void DBHandler::interrupt(const TSessionId& query_session,
783  const TSessionId& interrupt_session) {
784  // if this is for distributed setting, query_session becomes a parent session (agg)
785  // and the interrupt session is one of existing session in the leaf node (leaf)
786  // so we can think there exists a logical mapping
787  // between query_session (agg) and interrupt_session (leaf)
788  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
789  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
790  const auto allow_query_interrupt =
792  if (g_enable_dynamic_watchdog || allow_query_interrupt) {
793  // Shared lock to allow simultaneous interrupts of multiple sessions
794  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
795 
796  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
797  auto& cat = session_it->second.get()->getCatalog();
798  const auto dbname = cat.getCurrentDB().dbName;
800  jit_debug_ ? "/tmp" : "",
801  jit_debug_ ? "mapdquery" : "",
803  CHECK(executor);
804 
805  if (leaf_aggregator_.leafCount() > 0) {
806  leaf_aggregator_.interrupt(query_session, interrupt_session);
807  }
808  auto target_executor_ids = executor->getExecutorIdsRunningQuery(query_session);
809  if (target_executor_ids.empty()) {
810  mapd_shared_lock<mapd_shared_mutex> session_read_lock(executor->getSessionLock());
811  if (executor->checkIsQuerySessionEnrolled(query_session, session_read_lock)) {
812  session_read_lock.unlock();
813  VLOG(1) << "Received interrupt: "
814  << "User " << session_it->second->get_currentUser().userLoggable()
815  << ", Database " << dbname << std::endl;
816  executor->interrupt(query_session, interrupt_session);
817  }
818  } else {
819  for (auto& executor_id : target_executor_ids) {
820  VLOG(1) << "Received interrupt: "
821  << "Executor " << executor_id << ", User "
822  << session_it->second->get_currentUser().userLoggable() << ", Database "
823  << dbname << std::endl;
824  auto target_executor = Executor::getExecutor(executor_id);
825  target_executor->interrupt(query_session, interrupt_session);
826  }
827  }
828 
829  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
830  << " interrupted session with database " << dbname << std::endl;
831  }
832 }
833 
835  if (g_cluster) {
836  if (leaf_aggregator_.leafCount() > 0) {
837  return TRole::type::AGGREGATOR;
838  }
839  return TRole::type::LEAF;
840  }
841  return TRole::type::SERVER;
842 }
843 
844 void DBHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
845  auto stdlog = STDLOG(get_session_ptr(session));
846  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
847  const auto rendering_enabled = bool(render_handler_);
848  _return.read_only = read_only_;
849  _return.version = MAPD_RELEASE;
850  _return.rendering_enabled = rendering_enabled;
851  _return.start_time = start_time_;
852  _return.edition = MAPD_EDITION;
853  _return.host_name = omnisci::get_hostname();
854  _return.poly_rendering_enabled = rendering_enabled;
855  _return.role = getServerRole();
856  _return.renderer_status_json =
857  render_handler_ ? render_handler_->get_renderer_status_json() : "";
858 }
859 
860 void DBHandler::get_status(std::vector<TServerStatus>& _return,
861  const TSessionId& session) {
862  //
863  // get_status() is now called locally at startup on the aggregator
864  // in order to validate that all nodes of a cluster are running the
865  // same software version and the same renderer status
866  //
867  // In that context, it is called with the InvalidSessionID, and
868  // with the local super-user flag set.
869  //
870  // Hence, we allow this session-less mode only in distributed mode, and
871  // then on a leaf (always), or on the aggregator (only in super-user mode)
872  //
873  auto const allow_invalid_session = g_cluster && (!isAggregator() || super_user_rights_);
874  if (!allow_invalid_session || session != getInvalidSessionId()) {
875  auto stdlog = STDLOG(get_session_ptr(session));
876  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
877  } else {
878  LOG(INFO) << "get_status() called in session-less mode";
879  }
880  const auto rendering_enabled = bool(render_handler_);
881  TServerStatus ret;
882  ret.read_only = read_only_;
883  ret.version = MAPD_RELEASE;
884  ret.rendering_enabled = rendering_enabled;
885  ret.start_time = start_time_;
886  ret.edition = MAPD_EDITION;
887  ret.host_name = omnisci::get_hostname();
888  ret.poly_rendering_enabled = rendering_enabled;
889  ret.role = getServerRole();
890  ret.renderer_status_json =
891  render_handler_ ? render_handler_->get_renderer_status_json() : "";
892 
893  _return.push_back(ret);
894  if (leaf_aggregator_.leafCount() > 0) {
895  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
896  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
897  }
898 }
899 
900 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
901  const TSessionId& session) {
902  auto stdlog = STDLOG(get_session_ptr(session));
903  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
904  THardwareInfo ret;
905  const auto cuda_mgr = data_mgr_->getCudaMgr();
906  if (cuda_mgr) {
907  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
908  ret.start_gpu = cuda_mgr->getStartGpu();
909  if (ret.start_gpu >= 0) {
910  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
911  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
912  }
913  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
914  TGpuSpecification gpu_spec;
915  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
916  gpu_spec.num_sm = deviceProperties->numMPs;
917  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
918  gpu_spec.memory = deviceProperties->globalMem;
919  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
920  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
921  ret.gpu_info.push_back(gpu_spec);
922  }
923  }
924 
925  // start hardware/OS dependent code
926  ret.num_cpu_hw = std::thread::hardware_concurrency();
927  // ^ This might return diffrent results in case of hyper threading
928  // end hardware/OS dependent code
929 
930  _return.hardware_info.push_back(ret);
931  if (leaf_aggregator_.leafCount() > 0) {
932  ret.host_name = "aggregator";
933  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
934  _return.hardware_info.insert(_return.hardware_info.end(),
935  leaf_hardware.hardware_info.begin(),
936  leaf_hardware.hardware_info.end());
937  }
938 }
939 
940 void DBHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
941  auto session_ptr = get_session_ptr(session);
942  CHECK(session_ptr);
943  auto stdlog = STDLOG(session_ptr);
944  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
945  auto user_metadata = session_ptr->get_currentUser();
946 
947  _return.user = user_metadata.userName;
948  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
949  _return.start_time = session_ptr->get_start_time();
950  _return.is_super = user_metadata.isSuper;
951 }
952 
// NOTE(review): listing line 953, which should carry this definition's name and
// first parameter, is absent from this extract. The recursive calls below suggest
// it is value_to_thrift_column(tv, ti, column) -- confirm against the full file.
//
// Appends one target value `tv` of SQL type `ti` to the columnar Thrift buffer
// `column`. The payload goes to arr_col, str_col, real_col or int_col depending
// on the type, and one entry is appended to column.nulls on every path that does
// not trip a CHECK.
954  const SQLTypeInfo& ti,
955  TColumn& column) {
956  if (ti.is_array()) {
// Arrays: every element is converted recursively into a nested TColumn; an
// uninitialized array contributes an empty nested column.
957  TColumn tColumn;
958  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
959  CHECK(array_tv);
960  bool is_null = !array_tv->is_initialized();
961  if (!is_null) {
962  const auto& vec = array_tv->get();
963  for (const auto& elem_tv : vec) {
964  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
965  }
966  }
967  column.data.arr_col.push_back(tColumn);
// A null is only reported when the type is nullable.
968  column.nulls.push_back(is_null && !ti.get_notnull());
969  } else if (ti.is_geometry()) {
// Geometry arrives either as a scalar holding a nullable string, or as an array
// whose elements are converted with a kDOUBLE element type.
970  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
971  if (scalar_tv) {
972  auto s_n = boost::get<NullableString>(scalar_tv);
973  auto s = boost::get<std::string>(s_n);
974  if (s) {
975  column.data.str_col.push_back(*s);
976  } else {
977  column.data.str_col.emplace_back(""); // null string
978  auto null_p = boost::get<void*>(s_n);
979  CHECK(null_p && !*null_p);
980  }
981  column.nulls.push_back(!s && !ti.get_notnull());
982  } else {
983  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
984  CHECK(array_tv);
985  bool is_null = !array_tv->is_initialized();
986  if (!is_null) {
987  auto elem_type = SQLTypeInfo(kDOUBLE, false);
988  TColumn tColumn;
989  const auto& vec = array_tv->get();
990  for (const auto& elem_tv : vec) {
991  value_to_thrift_column(elem_tv, elem_type, tColumn);
992  }
993  column.data.arr_col.push_back(tColumn);
994  column.nulls.push_back(false);
995  } else {
996  TColumn tColumn;
997  column.data.arr_col.push_back(tColumn);
998  column.nulls.push_back(is_null && !ti.get_notnull());
999  }
1000  }
1001  } else {
// Everything else must be a scalar holding an int64_t, double, float or nullable
// string; any other alternative trips CHECK(false).
1002  CHECK(!ti.is_column());
1003  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1004  CHECK(scalar_tv);
1005  if (boost::get<int64_t>(scalar_tv)) {
1006  int64_t data = *(boost::get<int64_t>(scalar_tv));
1007 
// Decimals are converted to double (divided by 10^scale when the scale is
// positive) and stored in real_col; every other integer-backed type goes to int_col.
1008  if (ti.is_decimal()) {
1009  double val = static_cast<double>(data);
1010  if (ti.get_scale() > 0) {
1011  val /= pow(10.0, std::abs(ti.get_scale()));
1012  }
1013  column.data.real_col.push_back(val);
1014  } else {
1015  column.data.int_col.push_back(data);
1016  }
1017 
// The null test compares the raw integer with the NULL_* constant chosen by the
// type; types not listed are reported as non-null.
1018  switch (ti.get_type()) {
1019  case kBOOLEAN:
1020  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
1021  break;
1022  case kTINYINT:
1023  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
1024  break;
1025  case kSMALLINT:
1026  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
1027  break;
1028  case kINT:
1029  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
1030  break;
1031  case kNUMERIC:
1032  case kDECIMAL:
1033  case kBIGINT:
1034  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1035  break;
1036  case kTIME:
1037  case kTIMESTAMP:
1038  case kDATE:
1039  case kINTERVAL_DAY_TIME:
1040  case kINTERVAL_YEAR_MONTH:
1041  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1042  break;
1043  default:
1044  column.nulls.push_back(false);
1045  }
1046  } else if (boost::get<double>(scalar_tv)) {
1047  double data = *(boost::get<double>(scalar_tv));
1048  column.data.real_col.push_back(data);
// A double payload may still belong to a kFLOAT column, hence the two sentinels.
1049  if (ti.get_type() == kFLOAT) {
1050  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1051  } else {
1052  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
1053  }
1054  } else if (boost::get<float>(scalar_tv)) {
1055  CHECK_EQ(kFLOAT, ti.get_type());
1056  float data = *(boost::get<float>(scalar_tv));
1057  column.data.real_col.push_back(data);
1058  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1059  } else if (boost::get<NullableString>(scalar_tv)) {
1060  auto s_n = boost::get<NullableString>(scalar_tv);
1061  auto s = boost::get<std::string>(s_n);
1062  if (s) {
1063  column.data.str_col.push_back(*s);
1064  } else {
1065  column.data.str_col.emplace_back(""); // null string
1066  auto null_p = boost::get<void*>(s_n);
1067  CHECK(null_p && !*null_p);
1068  }
1069  column.nulls.push_back(!s && !ti.get_notnull());
1070  } else {
1071  CHECK(false);
1072  }
1073  }
1074 }
1075 
// NOTE(review): listing line 1076, which should carry this definition's header, is
// absent from this extract. The recursive call below suggests it is
// value_to_thrift(tv, ti) returning a TDatum -- confirm against the full file.
//
// Converts one target value into a row-wise TDatum: arrays become arr_val (one
// datum per element), scalars fill int_val, real_val or str_val, and is_null is
// set from the NULL_* sentinel matching the SQL type `ti`.
1077  TDatum datum;
1078  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1079  if (!scalar_tv) {
// Not a scalar: the value must be an array of a type that reports is_array().
1080  CHECK(ti.is_array());
1081  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1082  CHECK(array_tv);
1083  if (array_tv->is_initialized()) {
1084  const auto& vec = array_tv->get();
1085  for (const auto& elem_tv : vec) {
1086  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
1087  datum.val.arr_val.push_back(scalar_col_val);
1088  }
1089  // Datum is not null, at worst it's an empty array Datum
1090  datum.is_null = false;
1091  } else {
1092  datum.is_null = true;
1093  }
1094  return datum;
1095  }
1096  if (boost::get<int64_t>(scalar_tv)) {
1097  int64_t data = *(boost::get<int64_t>(scalar_tv));
1098 
// Decimals are converted to double (divided by 10^scale when the scale is
// positive) and stored in real_val; other integer-backed types go to int_val.
1099  if (ti.is_decimal()) {
1100  double val = static_cast<double>(data);
1101  if (ti.get_scale() > 0) {
1102  val /= pow(10.0, std::abs(ti.get_scale()));
1103  }
1104  datum.val.real_val = val;
1105  } else {
1106  datum.val.int_val = data;
1107  }
1108 
1109  switch (ti.get_type()) {
1110  case kBOOLEAN:
1111  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
1112  break;
1113  case kTINYINT:
1114  datum.is_null = (datum.val.int_val == NULL_TINYINT);
1115  break;
1116  case kSMALLINT:
1117  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
1118  break;
1119  case kINT:
1120  datum.is_null = (datum.val.int_val == NULL_INT);
1121  break;
// NOTE(review): for decimal types only real_val was assigned above, yet this test
// reads int_val -- it looks like NULL decimals would not be flagged here (the
// columnar conversion compares the raw integer instead). Confirm and fix upstream.
1122  case kDECIMAL:
1123  case kNUMERIC:
1124  case kBIGINT:
1125  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1126  break;
1127  case kTIME:
1128  case kTIMESTAMP:
1129  case kDATE:
1130  case kINTERVAL_DAY_TIME:
1131  case kINTERVAL_YEAR_MONTH:
1132  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1133  break;
1134  default:
1135  datum.is_null = false;
1136  }
1137  } else if (boost::get<double>(scalar_tv)) {
1138  datum.val.real_val = *(boost::get<double>(scalar_tv));
// A double payload may still belong to a kFLOAT column, hence the two sentinels.
1139  if (ti.get_type() == kFLOAT) {
1140  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1141  } else {
1142  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
1143  }
1144  } else if (boost::get<float>(scalar_tv)) {
1145  CHECK_EQ(kFLOAT, ti.get_type());
1146  datum.val.real_val = *(boost::get<float>(scalar_tv));
1147  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1148  } else if (boost::get<NullableString>(scalar_tv)) {
1149  auto s_n = boost::get<NullableString>(scalar_tv);
1150  auto s = boost::get<std::string>(s_n);
1151  if (s) {
1152  datum.val.str_val = *s;
1153  } else {
// No string present: the alternative must be a null void* marker.
1154  auto null_p = boost::get<void*>(s_n);
1155  CHECK(null_p && !*null_p);
1156  }
1157  datum.is_null = !s;
1158  } else {
1159  CHECK(false);
1160  }
1161  return datum;
1162 }
1163 
// NOTE(review): listing lines 1164, 1178, 1183, 1188, 1193, 1205, 1207 and 1214
// are absent from this extract, including the line naming this definition
// (presumably DBHandler::sql_execute_local, judging by the call site in
// sql_execute), the case labels of the switch below and the names of the two
// calls made inside the timed lambda.
//
// Runs `query_str` on this server and fills `_return`: the nonce is echoed back,
// query_type is set from ParserWrapper's classification of the statement, and the
// time measured around the lambda at the end is added to total_time_ms.
1165  TQueryResult& _return,
1166  const QueryStateProxy& query_state_proxy,
1167  const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1168  const std::string& query_str,
1169  const bool column_format,
1170  const std::string& nonce,
1171  const int32_t first_n,
1172  const int32_t at_most_n,
1173  const bool use_calcite) {
1174  _return.total_time_ms = 0;
1175  _return.nonce = nonce;
1176  ParserWrapper pw{query_str};
// Map the parser's query type onto the Thrift enum; anything unrecognized is
// logged at WARNING level and reported as UNKNOWN.
1177  switch (pw.getQueryType()) {
1179  _return.query_type = TQueryType::READ;
1180  VLOG(1) << "query type: READ";
1181  break;
1182  }
1184  _return.query_type = TQueryType::WRITE;
1185  VLOG(1) << "query type: WRITE";
1186  break;
1187  }
1189  _return.query_type = TQueryType::SCHEMA_READ;
1190  VLOG(1) << "query type: SCHEMA READ";
1191  break;
1192  }
1194  _return.query_type = TQueryType::SCHEMA_WRITE;
1195  VLOG(1) << "query type: SCHEMA WRITE";
1196  break;
1197  }
1198  default: {
1199  _return.query_type = TQueryType::UNKNOWN;
1200  LOG(WARNING) << "query type: UNKNOWN";
1201  break;
1202  }
1203  }
1204 
// The timed section covers two calls: the first receives the executor device type
// of the session, the second takes the argument list of convertData (presumably
// execution followed by conversion -- the callee names are on missing lines).
1206  _return.total_time_ms += measure<>::execution([&]() {
1208  query_state_proxy,
1209  column_format,
1210  session_ptr->get_executor_device_type(),
1211  first_n,
1212  at_most_n,
1213  use_calcite);
1215  _return, result, query_state_proxy, query_str, column_format, first_n, at_most_n);
1216  });
1217 }
1218 
// Converts an execution result into the Thrift query result `_return`.
//
// The execution time of `result` is always added to _return.execution_time_ms;
// an empty result leaves the rest of `_return` untouched. Otherwise the converter
// is chosen by result.getResultType(): convertRows (honouring first_n/at_most_n),
// convertResult, convertExplain, or convertRows with both limits disabled (-1).
//
// NOTE(review): listing lines 1220 (the `result` parameter) and 1232, 1241, 1244,
// 1247 (the case labels) are absent from this extract.
1219 void DBHandler::convertData(TQueryResult& _return,
1221  const QueryStateProxy& query_state_proxy,
1222  const std::string& query_str,
1223  const bool column_format,
1224  const int32_t first_n,
1225  const int32_t at_most_n) {
1226  _return.execution_time_ms += result.getExecutionTime();
1227  if (result.empty()) {
1228  return;
1229  }
1230 
1231  switch (result.getResultType()) {
1233  convertRows(_return,
1234  query_state_proxy,
1235  result.getTargetsMeta(),
1236  *result.getRows(),
1237  column_format,
1238  first_n,
1239  at_most_n);
1240  break;
1242  convertResult(_return, *result.getRows(), true);
1243  break;
1245  convertExplain(_return, *result.getRows(), true);
1246  break;
// This last case ignores the caller's limits and passes -1 for both.
1248  convertRows(_return,
1249  query_state_proxy,
1250  result.getTargetsMeta(),
1251  *result.getRows(),
1252  column_format,
1253  -1,
1254  -1);
1255  break;
1256  }
1257 }
1258 
// Thrift entry point that executes one SQL statement and returns a TQueryResult.
//
// A query starting with "execute relalg" bypasses Calcite: the prefix is stripped
// and the trimmed remainder is used as the query. With leaves attached the work
// goes to agg_handler_->cluster_execute, otherwise to sql_execute_local. Any
// deferred COPY FROM recorded for the session is processed afterwards and its
// time added to total_time_ms. Every std::exception is rethrown through
// THROW_MAPD_EXCEPTION, with three known message patterns replaced by friendlier
// texts.
//
// NOTE(review): listing lines 1282 and 1298 are absent from this extract (the
// statement opening the first_n/at_most_n error and one cluster_execute argument).
1259 void DBHandler::sql_execute(TQueryResult& _return,
1260  const TSessionId& session,
1261  const std::string& query_str,
1262  const bool column_format,
1263  const std::string& nonce,
1264  const int32_t first_n,
1265  const int32_t at_most_n) {
1266  const std::string exec_ra_prefix = "execute relalg";
1267  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1268  auto actual_query =
1269  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1270  auto session_ptr = get_session_ptr(session);
1271  auto query_state = create_query_state(session_ptr, actual_query);
1272  auto stdlog = STDLOG(session_ptr, query_state);
1273  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1274  stdlog.appendNameValuePairs("nonce", nonce);
1275  auto timer = DEBUG_TIMER(__func__);
1276  try {
// Whatever happens below, drop the session's deferred COPY FROM state on exit.
1277  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1278  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1279  };
1280 
// first_n and at_most_n are mutually exclusive; a negative value means "not set".
1281  if (first_n >= 0 && at_most_n >= 0) {
1283  std::string("At most one of first_n and at_most_n can be set"));
1284  }
1285 
1286  if (leaf_aggregator_.leafCount() > 0) {
1287  if (!agg_handler_) {
1288  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
1289  }
1290  _return.total_time_ms = measure<>::execution([&]() {
1291  agg_handler_->cluster_execute(_return,
1292  query_state->createQueryStateProxy(),
1293  query_state->getQueryStr(),
1294  column_format,
1295  nonce,
1296  first_n,
1297  at_most_n,
1299  });
1300  _return.nonce = nonce;
1301  } else {
1302  sql_execute_local(_return,
1303  query_state->createQueryStateProxy(),
1304  session_ptr,
1305  actual_query,
1306  column_format,
1307  nonce,
1308  first_n,
1309  at_most_n,
1310  use_calcite);
1311  }
1312  _return.total_time_ms += process_deferred_copy_from(session);
// Attach the debug timer output only when there is some.
1313  std::string debug_json = timer.stopAndGetJson();
1314  if (!debug_json.empty()) {
1315  _return.__set_debug(std::move(debug_json));
1316  }
1317  stdlog.appendNameValuePairs(
1318  "execution_time_ms",
1319  _return.execution_time_ms,
1320  "total_time_ms", // BE-3420 - Redundant with duration field
1321  stdlog.duration<std::chrono::milliseconds>());
1322  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1323  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1324  } catch (const std::exception& e) {
// Substring matches on the message pick a friendlier replacement text.
1325  if (strstr(e.what(), "java.lang.NullPointerException")) {
1326  THROW_MAPD_EXCEPTION("query failed from broken view or other schema related issue");
1327  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1328  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1329  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1330  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1331  } else {
1332  THROW_MAPD_EXCEPTION(e.what());
1333  }
1334  }
1335 }
1336 
// NOTE(review): listing lines 1337 (the line naming this definition and its first
// parameter, referred to below as `_return`), 1360 and 1364 are absent from this
// extract. Judging by _return.setExecutionTime()/getExecutionTime(), this looks
// like the sql_execute overload that fills an execution result object rather than
// a TQueryResult -- confirm against the full file.
//
// Executes one statement for `session` and records the measured time plus the
// time of any deferred COPY FROM as the result's execution time. A query starting
// with "execute relalg" bypasses Calcite, with the prefix stripped and the
// remainder trimmed. Every std::exception is rethrown through THROW_MAPD_EXCEPTION,
// with three known message patterns replaced by friendlier texts.
1338  const TSessionId& session,
1339  const std::string& query_str,
1340  const bool column_format,
1341  const int32_t first_n,
1342  const int32_t at_most_n) {
1343  const std::string exec_ra_prefix = "execute relalg";
1344  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1345  auto actual_query =
1346  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1347 
1348  auto session_ptr = get_session_ptr(session);
1349  CHECK(session_ptr);
1350  auto query_state = create_query_state(session_ptr, actual_query);
1351  auto stdlog = STDLOG(session_ptr, query_state);
1352  auto timer = DEBUG_TIMER(__func__);
1353 
1354  try {
// Whatever happens below, drop the session's deferred COPY FROM state on exit.
1355  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1356  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1357  };
1358 
// first_n and at_most_n are mutually exclusive; a negative value means "not set".
1359  if (first_n >= 0 && at_most_n >= 0) {
1361  std::string("At most one of first_n and at_most_n can be set"));
1362  }
1363  auto total_time_ms = measure<>::execution([&]() {
1365  query_state->createQueryStateProxy(),
1366  column_format,
1367  session_ptr->get_executor_device_type(),
1368  first_n,
1369  at_most_n,
1370  use_calcite);
1371  });
1372 
1373  _return.setExecutionTime(total_time_ms + process_deferred_copy_from(session));
1374 
1375  stdlog.appendNameValuePairs(
1376  "execution_time_ms",
1377  _return.getExecutionTime(),
1378  "total_time_ms", // BE-3420 - Redundant with duration field
1379  stdlog.duration<std::chrono::milliseconds>());
1380  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1381  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1382  } catch (const std::exception& e) {
// Substring matches on the message pick a friendlier replacement text.
1383  if (strstr(e.what(), "java.lang.NullPointerException")) {
1384  THROW_MAPD_EXCEPTION("query failed from broken view or other schema related issue");
1385  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1386  THROW_MAPD_EXCEPTION("multiple SQL statements not allowed");
1387  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1388  THROW_MAPD_EXCEPTION("empty SQL statment not allowed");
1389  } else {
1390  THROW_MAPD_EXCEPTION(e.what());
1391  }
1392  }
1393 }
1394 
1395 int64_t DBHandler::process_deferred_copy_from(const TSessionId& session_id) {
1396  int64_t total_time_ms(0);
1397  // if the SQL statement we just executed was a geo COPY FROM, the import
1398  // parameters were captured, and this flag set, so we do the actual import here
1399  if (auto deferred_copy_from_state = deferred_copy_from_sessions(session_id)) {
1400  // import_geo_table() calls create_table() which calls this function to
1401  // do the work, so reset the flag now to avoid executing this part a
1402  // second time at the end of that, which would fail as the table was
1403  // already created! Also reset the flag with a ScopeGuard on exiting
1404  // this function any other way, such as an exception from the code above!
1405  deferred_copy_from_sessions.remove(session_id);
1406 
1407  // create table as replicated?
1408  TCreateParams create_params;
1409  if (deferred_copy_from_state->partitions == "REPLICATED") {
1410  create_params.is_replicated = true;
1411  }
1412 
1413  // now do (and time) the import
1414  total_time_ms = measure<>::execution([&]() {
1415  importGeoTableGlobFilterSort(session_id,
1416  deferred_copy_from_state->table,
1417  deferred_copy_from_state->file_name,
1418  deferred_copy_from_state->copy_params,
1419  TRowDescriptor(),
1420  create_params);
1421  });
1422  }
1423  return total_time_ms;
1424 }
1425 
// Executes a read-only query and returns the result as an Arrow data frame.
//
// GPU results are only allowed when the session executes on GPU, the server has
// GPUs and `device_id` is within [0, device count). Non-read queries and Calcite
// EXPLAIN are rejected. The handles, buffer and sizes produced by
// ArrowResultSetConverter are copied into `_return`; for GPU results the
// serialized CUDA handle is additionally remembered under the data frame handle.
//
// NOTE(review): listing lines 1448, 1473 and 1474 are absent from this extract
// (the statement opening the device_id error and the rest of the
// executor_results_device_type initializer).
1426 void DBHandler::sql_execute_df(TDataFrame& _return,
1427  const TSessionId& session,
1428  const std::string& query_str,
1429  const TDeviceType::type results_device_type,
1430  const int32_t device_id,
1431  const int32_t first_n,
1432  const TArrowTransport::type transport_method) {
1433  auto session_ptr = get_session_ptr(session);
1434  CHECK(session_ptr);
1435  auto query_state = create_query_state(session_ptr, query_str);
1436  auto stdlog = STDLOG(session_ptr, query_state);
1437 
1438  const auto executor_device_type = session_ptr->get_executor_device_type();
1439 
// Validate the GPU request before doing any work.
1440  if (results_device_type == TDeviceType::GPU) {
1441  if (executor_device_type != ExecutorDeviceType::GPU) {
1442  THROW_MAPD_EXCEPTION(std::string("GPU mode is not allowed in this session"));
1443  }
1444  if (!data_mgr_->gpusPresent()) {
1445  THROW_MAPD_EXCEPTION(std::string("No GPU is available in this server"));
1446  }
1447  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1449  std::string("Invalid device_id or unavailable GPU with this ID"));
1450  }
1451  }
1452  ParserWrapper pw{query_str};
1453  if (pw.getQueryType() != ParserWrapper::QueryType::Read) {
1454  THROW_MAPD_EXCEPTION(std::string(
1455  "Only read queries supported for the Arrow sql_execute_df endpoint."));
1456  }
1457  if (pw.isCalciteExplain()) {
1458  THROW_MAPD_EXCEPTION(std::string(
1459  "Explain is currently unsupported by the Arrow sql_execute_df endpoint."));
1460  }
1461 
1462  ExecutionResult execution_result;
1463  sql_execute_impl(execution_result,
1464  query_state->createQueryStateProxy(),
1465  true, /* column_format - does this do anything? */
1466  executor_device_type,
1467  first_n,
1468  -1, /* at_most_n */
1469  true);
1470 
1471  const auto result_set = execution_result.getRows();
1472  const auto executor_results_device_type = results_device_type == TDeviceType::CPU
// The reported execution time excludes the time the result set spent queued.
1475  _return.execution_time_ms =
1476  execution_result.getExecutionTime() - result_set->getQueueTime();
1477  const auto converter = std::make_unique<ArrowResultSetConverter>(
1478  result_set,
1479  data_mgr_,
1480  executor_results_device_type,
1481  device_id,
1482  getTargetNames(execution_result.getTargetsMeta()),
1483  first_n,
1484  ArrowTransport(transport_method));
1485  ArrowResult arrow_result;
1486  _return.arrow_conversion_time_ms +=
1487  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
1488  _return.sm_handle =
1489  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
1490  _return.sm_size = arrow_result.sm_size;
1491  _return.df_handle =
1492  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
1493  _return.df_buffer =
1494  std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
1495  if (executor_results_device_type == ExecutorDeviceType::GPU) {
// Bookkeeping for deallocate_df: the map is guarded by handle_to_dev_ptr_mutex_
// and a handle must not already be present.
1496  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1497  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
1498  ipc_handle_to_dev_ptr_.insert(
1499  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
1500  }
1501  _return.df_size = arrow_result.df_size;
1502 }
1503 
1504 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1505  const TSessionId& session,
1506  const std::string& query_str,
1507  const int32_t device_id,
1508  const int32_t first_n) {
1509  auto stdlog = STDLOG(get_session_ptr(session));
1510  sql_execute_df(_return,
1511  session,
1512  query_str,
1513  TDeviceType::GPU,
1514  device_id,
1515  first_n,
1516  TArrowTransport::SHARED_MEMORY);
1517 }
1518 
// Releases a data frame previously handed out to a client.
//
// For GPU data frames the serialized CUDA handle stored under df.df_handle is
// looked up and removed under handle_to_dev_ptr_mutex_; a handle that is not
// present exactly once raises TOmniSciException. The shared-memory and data-frame
// handles are then copied into byte vectors for the release call at the end.
//
// NOTE(review): listing lines 1541 and 1543 are absent from this extract (the
// declaration of `result` and the name of the call that consumes it).
1519 // For now we have only one user of a data frame in all cases.
1520 void DBHandler::deallocate_df(const TSessionId& session,
1521  const TDataFrame& df,
1522  const TDeviceType::type device_type,
1523  const int32_t device_id) {
1524  auto stdlog = STDLOG(get_session_ptr(session));
1525  std::string serialized_cuda_handle = "";
1526  if (device_type == TDeviceType::GPU) {
1527  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1528  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1529  TOmniSciException ex;
1530  ex.error_msg = std::string(
1531  "Current data frame handle is not bookkept or been inserted "
1532  "twice");
1533  LOG(ERROR) << ex.error_msg;
1534  throw ex;
1535  }
// Take the handle out of the bookkeeping map before releasing it.
1536  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1537  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1538  }
1539  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1540  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1542  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1544  result,
1545  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1546  device_id,
1547  data_mgr_);
1548 }
1549 
// Validates a SELECT statement without returning rows and reports the row
// descriptor of its result in `_return`.
//
// EXPLAIN, DDL and update DML statements are rejected. The query is parsed to
// relational algebra with privilege checks enabled, validated through
// validate_rel_alg, and the resulting row descriptor is passed through
// fixup_row_descriptor. Any std::exception is rethrown via THROW_MAPD_EXCEPTION.
//
// NOTE(review): listing lines 1566, 1567, 1570 and 1575 are absent from this
// extract (the arguments of the shared lock, the declaration of `locks` and one
// parse_to_ra argument).
1550 void DBHandler::sql_validate(TRowDescriptor& _return,
1551  const TSessionId& session,
1552  const std::string& query_str) {
1553  try {
1554  auto stdlog = STDLOG(get_session_ptr(session));
1555  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1556  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1557  stdlog.setQueryState(query_state);
1558 
1559  ParserWrapper pw{query_str};
1560  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1561  pw.is_update_dml) {
1562  throw std::runtime_error("Can only validate SELECT statements.");
1563  }
1564 
// A shared (read) lock is held for the remainder of the try block.
1565  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
1568 
1569  TPlanResult parse_result;
1571  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1572  query_state->getQueryStr(),
1573  {},
1574  true,
1576  /*check_privileges=*/true);
1577  const auto query_ra = parse_result.plan_result;
1578 
1579  const auto result = validate_rel_alg(query_ra, query_state->createQueryStateProxy());
1580  _return = fixup_row_descriptor(result.row_set.row_desc,
1581  query_state->getConstSessionInfo()->getCatalog());
1582  } catch (const std::exception& e) {
1583  THROW_MAPD_EXCEPTION(std::string(e.what()));
1584  }
1585 }
1586 
// File-local helpers used to rank auto-completion hints.
//
// NOTE(review): listing lines 1589 and 1597 are absent from this extract -- the
// line opening the struct below and the line naming the function (called as
// extract_projection_tokens_for_completion elsewhere in this file).
1587 namespace {
1588 
// Upper-cased identifiers found in a partial query: unqualified names, and the
// qualifier part of names written as qualifier.column.
1590  std::unordered_set<std::string> uc_column_names;
1591  std::unordered_set<std::string> uc_column_table_qualifiers;
1592 };
1593 
1594 // Extract what looks like a (qualified) identifier from the partial query.
1595 // The results will be used to rank the auto-completion results: tables which
1596 // contain at least one of the identifiers first.
1598  const std::string& sql) {
// A token is any run of alphanumerics, underscores and dots.
1599  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1600  boost::regex::extended | boost::regex::icase};
1601  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1602  boost::sregex_token_iterator end;
1603  std::unordered_set<std::string> uc_column_names;
1604  std::unordered_set<std::string> uc_column_table_qualifiers;
1605  for (; tok_it != end; ++tok_it) {
1606  std::string column_name = *tok_it;
1607  std::vector<std::string> column_tokens;
1608  boost::split(column_tokens, column_name, boost::is_any_of("."));
// Exactly two dot-separated parts are treated as qualifier.column; any other
// token is recorded whole as a column name.
1609  if (column_tokens.size() == 2) {
1610  // If the column name is qualified, take user's word.
1611  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1612  } else {
1613  uc_column_names.insert(to_upper(column_name));
1614  }
1615  }
1616  return {uc_column_names, uc_column_table_qualifiers};
1617 }
1618 
1619 } // namespace
1620 
1621 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1622  const TSessionId& session,
1623  const std::string& sql,
1624  const int cursor) {
1625  auto stdlog = STDLOG(get_session_ptr(session));
1626  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1627  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1628  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1629  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1630  proj_tokens.uc_column_names, visible_tables, stdlog);
1631  // Add the table qualifiers explicitly specified by the user.
1632  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1633  proj_tokens.uc_column_table_qualifiers.end());
1634  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1635  std::sort(
1636  hints.begin(),
1637  hints.end(),
1638  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1639  if (lhs.type == TCompletionHintType::TABLE &&
1640  rhs.type == TCompletionHintType::TABLE) {
1641  // Between two tables, one which is compatible with the specified
1642  // projections and one which isn't, pick the one which is compatible.
1643  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1644  compatible_table_names.end() &&
1645  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1646  compatible_table_names.end()) {
1647  return true;
1648  }
1649  }
1650  return lhs.type < rhs.type;
1651  });
1652 }
1653 
// Collects completion hints for `sql` without ordering them.
//
// `visible_tables` is filled with the physical tables and views visible to the
// session of `stdlog`, and Calcite is asked for hints. When the text up to the
// cursor contains a FROM keyword surrounded by whitespace, Calcite's hints are
// returned as they are; otherwise get_token_based_completions adds its own. A
// std::exception from the first step is logged and rethrown as TOmniSciException.
//
// NOTE(review): listing line 1664 is absent from this extract (the statement that
// receives the result of calcite_->getCompletionHints).
1654 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1655  std::vector<std::string>& visible_tables,
1656  query_state::StdLog& stdlog,
1657  const std::string& sql,
1658  const int cursor) {
1659  const auto& session_info = *stdlog.getConstSessionInfo();
1660  try {
1661  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1662 
1663  // Filter out keywords suggested by Calcite which we don't support.
1665  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1666  } catch (const std::exception& e) {
1667  TOmniSciException ex;
1668  ex.error_msg = std::string(e.what());
1669  LOG(ERROR) << ex.error_msg;
1670  throw ex;
1671  }
1672  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
// A negative cursor means "end of text"; otherwise the cursor is clamped to the
// length of the query so the search range below stays inside the string.
1673  const size_t length_to_cursor =
1674  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1675  // Trust hints from Calcite after the FROM keyword.
1676  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1677  return;
1678  }
1679  // Before FROM, the query is too incomplete for context-sensitive completions.
1680  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1681 }
1682 
1683 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1684  query_state::StdLog& stdlog,
1685  std::vector<std::string>& visible_tables,
1686  const std::string& sql,
1687  const int cursor) {
1688  const auto last_word =
1689  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1690  boost::regex select_expr{R"(\s*select\s+)",
1691  boost::regex::extended | boost::regex::icase};
1692  const size_t length_to_cursor =
1693  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1694  // After SELECT but before FROM, look for all columns in all tables which match the
1695  // prefix.
1696  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1697  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1698  // Trust the fully qualified columns the most.
1699  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1700  return;
1701  }
1702  // Not much information to use, just retrieve column names which match the prefix.
1703  if (should_suggest_column_hints(sql)) {
1704  get_column_hints(hints, last_word, column_names_by_table);
1705  return;
1706  }
1707  const std::string kFromKeyword{"FROM"};
1708  if (boost::istarts_with(kFromKeyword, last_word)) {
1709  TCompletionHint keyword_hint;
1710  keyword_hint.type = TCompletionHintType::KEYWORD;
1711  keyword_hint.replaced = last_word;
1712  keyword_hint.hints.emplace_back(kFromKeyword);
1713  hints.push_back(keyword_hint);
1714  }
1715  } else {
1716  const std::string kSelectKeyword{"SELECT"};
1717  if (boost::istarts_with(kSelectKeyword, last_word)) {
1718  TCompletionHint keyword_hint;
1719  keyword_hint.type = TCompletionHintType::KEYWORD;
1720  keyword_hint.replaced = last_word;
1721  keyword_hint.hints.emplace_back(kSelectKeyword);
1722  hints.push_back(keyword_hint);
1723  }
1724  }
1725 }
1726 
1727 std::unordered_map<std::string, std::unordered_set<std::string>>
1728 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1729  query_state::StdLog& stdlog) {
1730  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1731  for (auto it = table_names.begin(); it != table_names.end();) {
1732  TTableDetails table_details;
1733  try {
1734  get_table_details_impl(table_details, stdlog, *it, false, false);
1735  } catch (const TOmniSciException& e) {
1736  // Remove the corrupted Table/View name from the list for further processing.
1737  it = table_names.erase(it);
1738  continue;
1739  }
1740  for (const auto& column_type : table_details.row_desc) {
1741  column_names_by_table[*it].emplace(column_type.col_name);
1742  }
1743  ++it;
1744  }
1745  return column_names_by_table;
1746 }
1747 
1751 }
1752 
// NOTE(review): listing line 1753, which should carry this definition's return
// type and name, is absent from this extract. The call site in
// get_completion_hints suggests it is
// DBHandler::get_uc_compatible_table_names_by_column -- confirm against the full file.
//
// Returns the upper-cased names of the tables in `table_names` that have at least
// one column whose upper-cased name appears in `uc_column_names`. Tables whose
// details cannot be fetched are erased from `table_names` as a side effect.
1754  const std::unordered_set<std::string>& uc_column_names,
1755  std::vector<std::string>& table_names,
1756  query_state::StdLog& stdlog) {
1757  std::unordered_set<std::string> compatible_table_names_by_column;
// The iterator is advanced by hand because entries may be erased while looping.
1758  for (auto it = table_names.begin(); it != table_names.end();) {
1759  TTableDetails table_details;
1760  try {
1761  get_table_details_impl(table_details, stdlog, *it, false, false);
1762  } catch (const TOmniSciException& e) {
1763  // Remove the corrupted Table/View name from the list for further processing.
1764  it = table_names.erase(it);
1765  continue;
1766  }
1767  for (const auto& column_type : table_details.row_desc) {
// One matching column is enough; stop scanning this table.
1768  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1769  compatible_table_names_by_column.emplace(to_upper(*it));
1770  break;
1771  }
1772  }
1773  ++it;
1774  }
1775  return compatible_table_names_by_column;
1776 }
1777 
// Validates a relational-algebra query without executing it for results.
//
// The work is wrapped in a QueryDispatchQueue task that calls execute_rel_alg
// with just_validate set, submitted to dispatch_queue_, and waited for through
// the task's future. The outcome is then converted into a TQueryResult with
// convertData (column format, no row limits).
//
// NOTE(review): listing lines 1781, 1789, 1794 and 1797 are absent from this
// extract (the declaration of `result`, two execute_rel_alg arguments and one
// line before the submit call).
1778 TQueryResult DBHandler::validate_rel_alg(const std::string& query_ra,
1779  QueryStateProxy query_state_proxy) {
1780  TQueryResult _return;
// The lambda captures locals by reference; that is only sound because this
// function blocks on the future below before returning.
1782  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1783  [this, &result, &query_state_proxy, &query_ra](const size_t executor_index) {
1784  auto qid_scope_guard = query_state_proxy.getQueryState().setThreadLocalQueryId();
1785  execute_rel_alg(result,
1786  query_state_proxy,
1787  query_ra,
1788  true,
1790  -1,
1791  -1,
1792  /*just_validate=*/true,
1793  /*find_filter_push_down_candidates=*/false,
1795  executor_index);
1796  });
1798  dispatch_queue_->submit(execute_rel_alg_task, /*is_update_delete=*/false);
1799  auto result_future = execute_rel_alg_task->get_future();
1800  result_future.get();
1801  DBHandler::convertData(_return, result, query_state_proxy, query_ra, true, -1, -1);
1802  return _return;
1803 }
1804 
1805 void DBHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1806  auto stdlog = STDLOG(get_session_ptr(session));
1807  auto session_ptr = stdlog.getConstSessionInfo();
1808  if (!session_ptr->get_currentUser().isSuper) {
1809  // WARNING: This appears to not include roles a user is a member of,
1810  // if the role has no permissions granted to it.
1811  roles =
1812  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1813  session_ptr->getCatalog().getCurrentDB().dbId);
1814  } else {
1815  roles = SysCatalog::instance().getRoles(
1816  false, true, session_ptr->get_currentUser().userName);
1817  }
1818 }
1819 
// Returns whether `roleName` is granted to `granteeName`, as reported by
// SysCatalog::isRoleGrantedToGrantee(granteeName, roleName, false).
//
// Access rules applied to non-superuser callers before the lookup:
//  - if `granteeName` resolves to a user grantee other than the caller, an
//    exception is thrown;
//  - otherwise the grantee must pass
//    isRoleGrantedToGrantee(caller, granteeName, true).
//
// NOTE(review): the line that opens the second error (embedded line 1832) is
// not visible in this listing; presumably a THROW_MAPD_EXCEPTION( call --
// confirm against the original file.
1820 bool DBHandler::has_role(const TSessionId& sessionId,
1821  const std::string& granteeName,
1822  const std::string& roleName) {
1823  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1824  const auto session_ptr = stdlog.getConstSessionInfo();
1825  const auto current_user = session_ptr->get_currentUser();
1826  if (!current_user.isSuper) {
// `user` is non-null only when granteeName names a user grantee.
1827  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1828  user && current_user.userName != granteeName) {
1829  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1830  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1831  current_user.userName, granteeName, true)) {
1833  "Only super users can check roles assignment that have not been directly "
1834  "granted to a user.");
1835  }
1836  }
1837  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1838 }
1839 
1840 static TDBObject serialize_db_object(const std::string& roleName,
1841  const DBObject& inObject) {
1842  TDBObject outObject;
1843  outObject.objectName = inObject.getName();
1844  outObject.grantee = roleName;
1845  outObject.objectId = inObject.getObjectKey().objectId;
1846  const auto ap = inObject.getPrivileges();
1847  switch (inObject.getObjectKey().permissionType) {
1848  case DatabaseDBObjectType:
1849  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1850  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1851  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1852  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1853  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1854 
1855  break;
1856  case TableDBObjectType:
1857  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1858  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1859  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1860  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1861  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1862  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1863  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1864  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1865  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1866 
1867  break;
1868  case DashboardDBObjectType:
1869  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1870  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1871  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1872  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1873  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1874 
1875  break;
1876  case ViewDBObjectType:
1877  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1878  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1879  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1880  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1881  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1882  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1883  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1884 
1885  break;
1886  case ServerDBObjectType:
1887  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1888  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1889  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1890  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1891  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::SERVER_USAGE));
1892 
1893  break;
1894  default:
1895  CHECK(false);
1896  }
1897  const int type_val = static_cast<int>(inObject.getType());
1898  CHECK(type_val >= 0 && type_val < 6);
1899  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1900  return outObject;
1901 }
1902 
// Database-level permission check: returns true only if `privs` holds every
// database privilege that is requested (flag set) in
// permissions.database_permissions_. Throws if the database permissions
// struct is not set on `permissions`.
//
// NOTE(review): the first line of this signature and one line of the
// condition below (embedded lines 1903 and 1912) are not visible in this
// listing -- confirm against the original file.
1904  const TDBObjectPermissions& permissions) {
1905  if (!permissions.__isset.database_permissions_) {
1906  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1907  }
1908  auto perms = permissions.database_permissions_;
// Fail as soon as one requested privilege is missing.
1909  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1910  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1911  (perms.view_sql_editor_ &&
1913  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1914  return false;
1915  } else {
1916  return true;
1917  }
1918 }
1919 
// Table-level permission check: returns true only if `privs` holds every
// table privilege that is requested (flag set) in
// permissions.table_permissions_. Throws if the table permissions struct is
// not set on `permissions`.
//
// NOTE(review): the first line of this signature (embedded line 1920) is not
// visible in this listing.
1921  const TDBObjectPermissions& permissions) {
1922  if (!permissions.__isset.table_permissions_) {
1923  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1924  }
1925  auto perms = permissions.table_permissions_;
// Fail as soon as one requested privilege is missing.
1926  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1927  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1928  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1929  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1930  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1931  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1932  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1933  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1934  return false;
1935  } else {
1936  return true;
1937  }
1938 }
1939 
// Dashboard-level permission check: returns true only if `privs` holds every
// dashboard privilege that is requested (flag set) in
// permissions.dashboard_permissions_. Throws if the dashboard permissions
// struct is not set on `permissions`.
//
// NOTE(review): the first line of this signature (embedded line 1940) is not
// visible in this listing.
1941  const TDBObjectPermissions& permissions) {
1942  if (!permissions.__isset.dashboard_permissions_) {
1943  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1944  }
1945  auto perms = permissions.dashboard_permissions_;
// Fail as soon as one requested privilege is missing.
1946  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1947  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1948  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1949  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1950  return false;
1951  } else {
1952  return true;
1953  }
1954 }
1955 
// View-level permission check: returns true only if `privs` holds every view
// privilege that is requested (flag set) in permissions.view_permissions_.
// Throws if the view permissions struct is not set on `permissions`.
//
// NOTE(review): the first line of this signature (embedded line 1956) is not
// visible in this listing.
1957  const TDBObjectPermissions& permissions) {
1958  if (!permissions.__isset.view_permissions_) {
1959  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1960  }
1961  auto perms = permissions.view_permissions_;
// Fail as soon as one requested privilege is missing.
1962  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1963  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1964  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1965  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1966  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1967  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1968  return false;
1969  } else {
1970  return true;
1971  }
1972 }
1973 
// Server-level permission check: returns true only if `privs` holds every
// server privilege that is requested (flag set) in
// permissions.server_permissions_.
//
// Unlike the database/table/dashboard/view checks in this file, an unset
// permissions struct is enforced with CHECK here rather than
// THROW_MAPD_EXCEPTION.
//
// NOTE(review): the first line of this signature (embedded line 1974) is not
// visible in this listing.
1975  const TDBObjectPermissions& permissions) {
1976  CHECK(permissions.__isset.server_permissions_);
1977  auto perms = permissions.server_permissions_;
// Fail as soon as one requested privilege is missing.
1978  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
1979  (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
1980  (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER)) ||
1981  (perms.usage_ && !privs.hasPermission(ServerPrivileges::SERVER_USAGE))) {
1982  return false;
1983  } else {
1984  return true;
1985  }
1986 }
1987 
// Reports whether grantee `granteeName` holds the requested `permissions` on
// the object `objectName` of type `objectType`.
//
// Flow, as visible here:
//  1. Non-superuser callers must pass
//     isRoleGrantedToGrantee(caller, granteeName, false); otherwise an
//     exception is raised.
//  2. If `granteeName` is a user flagged isSuper, the answer is true.
//  3. The grantee must exist, otherwise an exception is thrown.
//  4. `objectType` selects a key into permissionFuncMap_; unknown types throw.
//  5. The object's key is loaded from the session catalog and looked up on
//     the grantee; if found, the selected permission function decides,
//     otherwise the result is false.
//
// NOTE(review): several lines are not visible in this listing (gaps in the
// embedded numbering): the throw opening the first error message, the
// declarations of `user_meta` and `type`, and the `case` labels of the
// switch. Confirm against the original file.
1988 bool DBHandler::has_object_privilege(const TSessionId& sessionId,
1989  const std::string& granteeName,
1990  const std::string& objectName,
1991  const TDBObjectType::type objectType,
1992  const TDBObjectPermissions& permissions) {
1993  auto stdlog = STDLOG(get_session_ptr(sessionId));
1994  auto session_ptr = stdlog.getConstSessionInfo();
1995  auto const& cat = session_ptr->getCatalog();
1996  auto const& current_user = session_ptr->get_currentUser();
1997  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1998  current_user.userName, granteeName, false)) {
2000  "Users except superusers can only check privileges for self or roles granted "
2001  "to "
2002  "them.")
2003  }
// A superuser grantee is reported as having every privilege.
2005  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
2006  user_meta.isSuper) {
2007  return true;
2008  }
2009  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
2010  if (!grnt) {
2011  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
2012  }
// func_name is the lookup key into permissionFuncMap_ used below.
2014  std::string func_name;
2015  switch (objectType) {
2018  func_name = "database";
2019  break;
2022  func_name = "table";
2023  break;
2026  func_name = "dashboard";
2027  break;
2030  func_name = "view";
2031  break;
2034  func_name = "server";
2035  break;
2036  default:
2037  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
2038  }
2039  DBObject req_object(objectName, type);
2040  req_object.loadKey(cat);
2041 
2042  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
2043  if (grantee_object) {
2044  // if grantee has privs on the object
2045  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
2046  } else {
2047  // no privileges on that object
2048  return false;
2049  }
2050 }
2051 
2052 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
2053  const TSessionId& sessionId,
2054  const std::string& roleName) {
2055  auto stdlog = STDLOG(get_session_ptr(sessionId));
2056  auto session_ptr = stdlog.getConstSessionInfo();
2057  auto const& user = session_ptr->get_currentUser();
2058  if (!user.isSuper &&
2059  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
2060  return;
2061  }
2062  auto* rl = SysCatalog::instance().getGrantee(roleName);
2063  if (rl) {
2064  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
2065  for (auto& dbObject : *rl->getDbObjects(true)) {
2066  if (dbObject.first.dbId != dbId) {
2067  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
2068  // usecase for now, though)
2069  continue;
2070  }
2071  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
2072  TDBObjectsForRole.push_back(tdbObject);
2073  }
2074  } else {
2075  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
2076  }
2077 }
2078 
// Appends to `TDBObjects` the serialized privileges found on object
// `objectName` of Thrift type `type`.
//
// Steps visible here:
//  - `type` is mapped to a DBObjectType; unknown values throw.
//  - Dashboards are addressed by integer id parsed from `objectName`
//    (empty name -> id -1).
//  - For a non-empty table/view name, the table metadata decides whether the
//    object is treated as a view or as a table.
//  - Any std::exception while resolving/loading the object key is reported as
//    "Object with name ... does not exist.".
//  - If the caller is a superuser, one entry named "super" is added under the
//    caller's user name.
//  - For every grantee name returned by SysCatalog::getRoles(...), entries are
//    added for the object itself and for the same object type with an empty
//    name (the "database level" object).
//
// NOTE(review): several lines are not visible in this listing (gaps in the
// embedded numbering): the `case` labels of the switch, the dashboard
// assignment, and one constructor argument of `dbObj`. Confirm against the
// original file.
2079 void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
2080  const TSessionId& sessionId,
2081  const std::string& objectName,
2082  const TDBObjectType::type type) {
2083  auto stdlog = STDLOG(get_session_ptr(sessionId));
2084  auto session_ptr = stdlog.getConstSessionInfo();
2085  DBObjectType object_type;
2086  switch (type) {
2088  object_type = DBObjectType::DatabaseDBObjectType;
2089  break;
2091  object_type = DBObjectType::TableDBObjectType;
2092  break;
2095  break;
2097  object_type = DBObjectType::ViewDBObjectType;
2098  break;
2100  object_type = DBObjectType::ServerDBObjectType;
2101  break;
2102  default:
2103  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
2104  ": unknown object type (" + std::to_string(type) + ").");
2105  }
2106  DBObject object_to_find(objectName, object_type);
2107 
2108  // TODO(adb): Use DatabaseLock to protect method
2109  try {
2110  if (object_type == DashboardDBObjectType) {
// Dashboards are keyed by numeric id; std::stoi may throw on a non-numeric
// name, which is caught below.
2111  if (objectName == "") {
2112  object_to_find = DBObject(-1, object_type);
2113  } else {
2114  object_to_find = DBObject(std::stoi(objectName), object_type);
2115  }
2116  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
2117  !objectName.empty()) {
2118  // special handling for view / table
2119  auto const& cat = session_ptr->getCatalog();
2120  auto td = cat.getMetadataForTable(objectName, false);
2121  if (td) {
// The catalog metadata, not the requested type, decides view vs. table.
2122  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
2123  object_to_find = DBObject(objectName, object_type);
2124  }
2125  }
2126  object_to_find.loadKey(session_ptr->getCatalog());
2127  } catch (const std::exception&) {
2128  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
2129  }
2130 
2131  // object type on database level
2132  DBObject object_to_find_dblevel("", object_type);
2133  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
2134  // if user is superuser respond with a full priv
2135  if (session_ptr->get_currentUser().isSuper) {
2136  // using ALL_TABLE here to set max permissions
2137  DBObject dbObj{object_to_find.getObjectKey(),
2139  session_ptr->get_currentUser().userId};
2140  dbObj.setName("super");
2141  TDBObjects.push_back(
2142  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
2143  };
2144 
2145  std::vector<std::string> grantees =
2146  SysCatalog::instance().getRoles(true,
2147  session_ptr->get_currentUser().isSuper,
2148  session_ptr->get_currentUser().userName);
2149  for (const auto& grantee : grantees) {
2150  DBObject* object_found;
2151  auto* gr = SysCatalog::instance().getGrantee(grantee);
2152  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
2153  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2154  }
2155  // check object permissions on Database level
2156  if (gr &&
2157  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
2158  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2159  }
2160  }
2161 }
2162 
// Shared implementation behind get_all_roles_for_user() and
// get_all_effective_roles_for_user(): fills `roles` with
// grantee->getRoles(/*only_direct=*/!effective) for `granteeName`.
//
// Authorization rules visible here:
//  - a superuser caller may query any grantee;
//  - if the grantee is a user, only that same user may query it;
//  - if the grantee is a role, it must pass
//    isRoleGrantedToGrantee(caller, granteeName, false).
// An unknown grantee name raises an exception.
//
// NOTE(review): the first line of this signature and the line opening the
// "Only a superuser ..." error (embedded lines 2163 and 2177) are not visible
// in this listing.
2164  std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr,
2165  std::vector<std::string>& roles,
2166  const TSessionId& sessionId,
2167  const std::string& granteeName,
2168  bool effective) {
2169  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
2170  if (grantee) {
2171  if (session_ptr->get_currentUser().isSuper) {
2172  roles = grantee->getRoles(/*only_direct=*/!effective);
2173  } else if (grantee->isUser()) {
2174  if (session_ptr->get_currentUser().userName == granteeName) {
2175  roles = grantee->getRoles(/*only_direct=*/!effective);
2176  } else {
2178  "Only a superuser is authorized to request list of roles granted to another "
2179  "user.");
2180  }
2181  } else {
2182  CHECK(!grantee->isUser());
2183  // granteeName is actually a roleName here and we can check a role
2184  // only if it is granted to us
2185  if (SysCatalog::instance().isRoleGrantedToGrantee(
2186  session_ptr->get_currentUser().userName, granteeName, false)) {
2187  roles = grantee->getRoles(/*only_direct=*/!effective);
2188  } else {
2189  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
2190  }
2191  }
2192  } else {
2193  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
2194  }
2195 }
2196 
2197 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
2198  const TSessionId& sessionId,
2199  const std::string& granteeName) {
2200  // WARNING: This function only returns directly granted roles.
2201  // See also: get_all_effective_roles_for_user() for all of a user's roles.
2202  auto stdlog = STDLOG(get_session_ptr(sessionId));
2203  auto session_ptr = stdlog.getConstSessionInfo();
2204  getAllRolesForUserImpl(session_ptr, roles, sessionId, granteeName, /*effective=*/false);
2205 }
2206 
2207 void DBHandler::get_all_effective_roles_for_user(std::vector<std::string>& roles,
2208  const TSessionId& sessionId,
2209  const std::string& granteeName) {
2210  auto stdlog = STDLOG(get_session_ptr(sessionId));
2211  auto session_ptr = stdlog.getConstSessionInfo();
2212  getAllRolesForUserImpl(session_ptr, roles, sessionId, granteeName, /*effective=*/true);
2213 }
2214 
2215 namespace {
// Flattens a table -> column-names map into a single string of the form
// ":table1,colA,colB:table2,colC". Each table name is prefixed with ':' and
// each column name with ','. Entries follow the std::map iteration order.
//
// NOTE(review): the first line of this signature (embedded line 2216) is not
// visible in this listing.
2217  const std::map<std::string, std::vector<std::string>>& table_col_names) {
2218  std::ostringstream oss;
2219  for (const auto& [table_name, col_names] : table_col_names) {
2220  oss << ":" << table_name;
2221  for (const auto& col_name : col_names) {
2222  oss << "," << col_name;
2223  }
2224  }
2225  return oss.str();
2226 }
2227 } // namespace
2228 
// Forwards a pixel hit-test request to render_handler_ and stores the answer
// in `_return`.
//
// All request parameters are recorded in the STDLOG entry (the table/column
// map is flattened with dump_table_col_names()). Throws if render_handler_ is
// not set ("Backend rendering is disabled."); any std::exception raised by the
// render handler is rethrown through THROW_MAPD_EXCEPTION with its what() text.
//
// NOTE(review): the first line of this signature (embedded line 2229) is not
// visible in this listing.
2230  TPixelTableRowResult& _return,
2231  const TSessionId& session,
2232  const int64_t widget_id,
2233  const TPixel& pixel,
2234  const std::map<std::string, std::vector<std::string>>& table_col_names,
2235  const bool column_format,
2236  const int32_t pixel_radius,
2237  const std::string& nonce) {
2238  auto stdlog = STDLOG(get_session_ptr(session),
2239  "widget_id",
2240  widget_id,
2241  "pixel.x",
2242  pixel.x,
2243  "pixel.y",
2244  pixel.y,
2245  "column_format",
2246  column_format,
2247  "pixel_radius",
2248  pixel_radius,
2249  "table_col_names",
2250  dump_table_col_names(table_col_names),
2251  "nonce",
2252  nonce);
2253  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2254  auto session_ptr = stdlog.getSessionInfo();
2255  if (!render_handler_) {
2256  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
2257  }
2258 
2259  try {
2260  render_handler_->get_result_row_for_pixel(_return,
2261  session_ptr,
2262  widget_id,
2263  pixel,
2264  table_col_names,
2265  column_format,
2266  pixel_radius,
2267  nonce);
2268  } catch (std::exception& e) {
2269  THROW_MAPD_EXCEPTION(e.what());
2270  }
2271 }
2272 
// Builds the Thrift TColumnType describing column descriptor `cd`.
//
// Copies name, source name, id, type, encoding, nullability and array flag;
// `size` is only filled for arrays and dates. Non-geo columns carry
// precision/scale from the column type. comp_param is taken from the
// dictionary definition (dictNBits) in the branch guarded by `cat != nullptr`,
// otherwise from the column type, with 32 substituted for date-in-days columns
// whose comp_param is 0. The default value literal is set when one exists.
//
// NOTE(review): several lines are not visible in this listing (gaps in the
// embedded numbering): the first line of the signature, the call that
// receives the geo arguments, and the first half of the condition that ends
// in `cat != nullptr`. Confirm against the original file.
2274  const ColumnDescriptor* cd) {
2275  TColumnType col_type;
2276  col_type.col_name = cd->columnName;
2277  col_type.src_name = cd->sourceName;
2278  col_type.col_id = cd->columnId;
2279  col_type.col_type.type = type_to_thrift(cd->columnType);
2280  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
2281  col_type.col_type.nullable = !cd->columnType.get_notnull();
2282  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
2283  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
2284  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
2285  }
2286  if (IS_GEO(cd->columnType.get_type())) {
2288  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
2289  } else {
2290  col_type.col_type.precision = cd->columnType.get_precision();
2291  col_type.col_type.scale = cd->columnType.get_scale();
2292  }
2293  col_type.is_system = cd->isSystemCol;
2295  cat != nullptr) {
2296  // have to get the actual size of the encoding from the dictionary definition
2297  const int dict_id = cd->columnType.get_comp_param();
// Missing dictionary: report comp_param 0 and return early. Note that this
// early return skips the is_reserved_keyword/default_value assignments below.
2298  if (!cat->getMetadataForDict(dict_id, false)) {
2299  col_type.col_type.comp_param = 0;
2300  return col_type;
2301  }
// NOTE(review): this repeats the lookup made just above; the `!dd` throw can
// only trigger if the second lookup returns a different answer than the first.
2302  auto dd = cat->getMetadataForDict(dict_id, false);
2303  if (!dd) {
2304  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
2305  }
2306  col_type.col_type.comp_param = dd->dictNBits;
2307  } else {
2308  col_type.col_type.comp_param =
2309  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
2310  ? 32
2311  : cd->columnType.get_comp_param();
2312  }
2313  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
2314  if (cd->default_value.has_value()) {
2315  col_type.__set_default_value(cd->getDefaultValueLiteral());
2316  }
2317  return col_type;
2318 }
2319 
2320 void DBHandler::get_internal_table_details(TTableDetails& _return,
2321  const TSessionId& session,
2322  const std::string& table_name,
2323  const bool include_system_columns) {
2324  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2325  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2326  get_table_details_impl(_return, stdlog, table_name, include_system_columns, false);
2327 }
2328 
// Fills `_return` with the details of `table_name` looked up in
// `database_name`, delegating to get_table_details_impl() with
// get_system=true and get_physical=false.
//
// NOTE(review): the first line of this signature (embedded line 2329) is not
// visible in this listing.
2330  TTableDetails& _return,
2331  const TSessionId& session,
2332  const std::string& table_name,
2333  const std::string& database_name) {
2334  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2335  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2336  get_table_details_impl(_return, stdlog, table_name, true, false, database_name);
2337 }
2338 
// Fills `_return` with the details of `table_name` from the session's
// catalog, delegating to get_table_details_impl() with get_system=false and
// get_physical=false while holding a shared (read) lock.
//
// NOTE(review): the argument lines of the lock construction (embedded lines
// 2346-2347) are not visible in this listing, so which mutex is locked cannot
// be told from here.
2339 void DBHandler::get_table_details(TTableDetails& _return,
2340  const TSessionId& session,
2341  const std::string& table_name) {
2342  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2343  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2344 
2345  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
2348  get_table_details_impl(_return, stdlog, table_name, false, false);
2349 }
2350 
// Fills `_return` with the details of `table_name` looked up in
// `database_name`, delegating to get_table_details_impl() with
// get_system=false and get_physical=false while holding a shared (read) lock.
//
// NOTE(review): the argument lines of the lock construction (embedded lines
// 2359-2360) are not visible in this listing, so which mutex is locked cannot
// be told from here.
2351 void DBHandler::get_table_details_for_database(TTableDetails& _return,
2352  const TSessionId& session,
2353  const std::string& table_name,
2354  const std::string& database_name) {
2355  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2356  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2357 
2358  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
2361  get_table_details_impl(_return, stdlog, table_name, false, false, database_name);
2362 }
2363 
// Core implementation of the get_*table_details* entry points.
//
// Parameters:
//   _return        - receives the row descriptor and table-level properties.
//   stdlog         - provides the session; for views it is also given the
//                    query state created from the view SQL.
//   table_name     - table or view to describe.
//   get_system     - forwarded to getAllColumnMetadataForTable().
//   get_physical   - forwarded to getAllColumnMetadataForTable().
//   database_name  - when empty, the session's catalog is used; otherwise the
//                    catalog returned by SysCatalog for that name.
//
// Views: the view SQL is parsed to relational algebra and validated with
// validate_rel_alg() to obtain the row descriptor. If the privilege check on
// the accessed objects raises std::runtime_error, the view SQL is replaced by
// a placeholder text in the output. Any std::exception on this path is
// rethrown as std::runtime_error with a message advising to drop and
// re-create the view.
//
// Tables: column descriptors are converted with populateThriftColumnType(),
// skipping the catalog's deleted column.
//
// Every std::runtime_error leaving the body is converted with
// THROW_MAPD_EXCEPTION.
//
// NOTE(review): a few lines are not visible in this listing (gaps in the
// embedded numbering): the call that produces `td_with_lock`, one argument of
// parse_to_ra(), and the first branch value of the partition_detail
// conditional. Confirm against the original file.
2364 void DBHandler::get_table_details_impl(TTableDetails& _return,
2365  query_state::StdLog& stdlog,
2366  const std::string& table_name,
2367  const bool get_system,
2368  const bool get_physical,
2369  const std::string& database_name) {
2370  try {
2371  auto session_info = stdlog.getSessionInfo();
2372  auto& cat = (database_name.empty())
2373  ? session_info->getCatalog()
2374  : *SysCatalog::instance().getCatalog(database_name);
2375  const auto td_with_lock =
2377  cat, table_name, false);
2378  const auto td = td_with_lock();
2379  CHECK(td);
2380 
2381  bool have_privileges_on_view_sources = true;
2382  if (td->isView) {
2383  auto query_state = create_query_state(session_info, td->viewSQL);
2384  stdlog.setQueryState(query_state);
2385  try {
2386  if (hasTableAccessPrivileges(td, *session_info)) {
2387  const auto [query_ra, locks] = parse_to_ra(query_state->createQueryStateProxy(),
2388  query_state->getQueryStr(),
2389  {},
2390  true,
2392  false);
2393  try {
2394  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2395  query_ra);
2396  } catch (const std::runtime_error&) {
// Not fatal: the row descriptor is still produced, only view_sql is hidden.
2397  have_privileges_on_view_sources = false;
2398  }
2399 
2400  const auto result = validate_rel_alg(query_ra.plan_result,
2401  query_state->createQueryStateProxy());
2402 
2403  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
2404  } else {
2405  throw std::runtime_error(
2406  "Unable to access view " + table_name +
2407  ". The view may not exist, or the logged in user may not "
2408  "have permission to access the view.");
2409  }
2410  } catch (const std::exception& e) {
// This also wraps the "Unable to access view" error thrown just above.
2411  throw std::runtime_error("View '" + table_name +
2412  "' query has failed with an error: '" +
2413  std::string(e.what()) +
2414  "'.\nThe view must be dropped and re-created to "
2415  "resolve the error. \nQuery:\n" +
2416  query_state->getQueryStr());
2417  }
2418  } else {
2419  if (hasTableAccessPrivileges(td, *session_info)) {
2420  const auto col_descriptors =
2421  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
2422  const auto deleted_cd = cat.getDeletedColumn(td);
2423  for (const auto cd : col_descriptors) {
2424  if (cd == deleted_cd) {
2425  continue;
2426  }
2427  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
2428  }
2429  } else {
2430  throw std::runtime_error(
2431  "Unable to access table " + table_name +
2432  ". The table may not exist, or the logged in user may not "
2433  "have permission to access the table.");
2434  }
2435  }
2436  _return.fragment_size = td->maxFragRows;
2437  _return.page_size = td->fragPageSize;
2438  _return.max_rows = td->maxRows;
2439  _return.view_sql =
2440  (have_privileges_on_view_sources ? td->viewSQL
2441  : "[Not enough privileges to see the view SQL]");
2442  _return.shard_count = td->nShards;
2443  _return.key_metainfo = td->keyMetainfo;
2444  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
2445  _return.partition_detail =
2446  td->partitions.empty()
2448  : (table_is_replicated(td)
2449  ? TPartitionDetail::REPLICATED
2450  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
2451  : TPartitionDetail::OTHER));
2452  } catch (const std::runtime_error& e) {
2453  THROW_MAPD_EXCEPTION(std::string(e.what()));
2454  }
2455 }
2456 
2457 void DBHandler::get_link_view(TFrontendView& _return,
2458  const TSessionId& session,
2459  const std::string& link) {
2460  auto stdlog = STDLOG(get_session_ptr(session));
2461  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2462  auto session_ptr = stdlog.getConstSessionInfo();
2463  auto const& cat = session_ptr->getCatalog();
2464  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2465  if (!ld) {
2466  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
2467  }
2468  _return.view_state = ld->viewState;
2469  _return.view_name = ld->link;
2470  _return.update_time = ld->updateTime;
2471  _return.view_metadata = ld->viewMetadata;
2472 }
2473 
// Returns whether the session's user may access table `td`: superusers
// always may; otherwise the answer is
// SysCatalog::hasAnyPrivileges(user, {dbObject}).
//
// NOTE(review): the first line of this signature and the declaration of
// `dbObject` (embedded lines 2474 and 2484) are not visible in this listing,
// so how the object is built from `td` cannot be told from here.
2475  const TableDescriptor* td,
2476  const Catalog_Namespace::SessionInfo& session_info) {
2477  auto& cat = session_info.getCatalog();
2478  auto user_metadata = session_info.get_currentUser();
2479 
2480  if (user_metadata.isSuper) {
2481  return true;
2482  }
2483 
2485  dbObject.loadKey(cat);
2486  std::vector<DBObject> privObjects = {dbObject};
2487 
2488  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2489 }
2490 
2491 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2492  const Catalog_Namespace::SessionInfo& session_info,
2493  const GetTablesType get_tables_type,
2494  const std::string& database_name) {
2495  if (database_name.empty()) {
2496  table_names = session_info.getCatalog().getTableNamesForUser(
2497  session_info.get_currentUser(), get_tables_type);
2498  } else {
2499  auto request_cat = SysCatalog::instance().getCatalog(database_name);
2500  table_names = request_cat->getTableNamesForUser(session_info.get_currentUser(),
2501  get_tables_type);
2502  }
2503 }
2504 
// Fills `table_names` for the session's user, requesting
// GET_PHYSICAL_TABLES_AND_VIEWS.
//
// NOTE(review): the line that opens the call receiving these arguments
// (embedded line 2509) is not visible in this listing; presumably
// get_tables_impl( -- confirm against the original file.
2505 void DBHandler::get_tables(std::vector<std::string>& table_names,
2506  const TSessionId& session) {
2507  auto stdlog = STDLOG(get_session_ptr(session));
2508  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2510  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2511 }
2512 
// Fills `table_names` for the session's user by calling get_tables_impl()
// with `database_name`.
//
// NOTE(review): the table-type argument of the call (embedded line 2521) is
// not visible in this listing.
2513 void DBHandler::get_tables_for_database(std::vector<std::string>& table_names,
2514  const TSessionId& session,
2515  const std::string& database_name) {
2516  auto stdlog = STDLOG(get_session_ptr(session));
2517  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2518 
2519  get_tables_impl(table_names,
2520  *stdlog.getConstSessionInfo(),
2522  database_name);
2523 }
2524 
2525 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2526  const TSessionId& session) {
2527  auto stdlog = STDLOG(get_session_ptr(session));
2528  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2529  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2530 }
2531 
2532 void DBHandler::get_views(std::vector<std::string>& table_names,
2533  const TSessionId& session) {
2534  auto stdlog = STDLOG(get_session_ptr(session));
2535  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2536  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2537 }
2538 
// Appends one TTableMeta per accessible table/view of the session's catalog
// to `_return`.
//
// Shards (td.shard >= 0) and tables the user has no access privileges on are
// skipped. For views, the view SQL is parsed (with `with_table_locks`
// forwarded to parse_to_ra()) and run through execute_rel_alg() with
// just_validate=true to derive the column names/types; columns flagged
// is_physical are left out. A view that fails on this path is logged as a
// warning and still reported, with the column data collected so far. For
// tables, the column metadata comes from the catalog, skipping the deleted
// column; a std::runtime_error there is converted with THROW_MAPD_EXCEPTION.
//
// NOTE(review): several lines are not visible in this listing (gaps in the
// embedded numbering): the declaration of `locks`, two arguments and the tail
// of the execute_rel_alg() call, and the head of the call that fills `result`.
// Confirm against the original file.
2539 void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
2540  QueryStateProxy query_state_proxy,
2541  const Catalog_Namespace::SessionInfo& session_info,
2542  const bool with_table_locks) {
2543  const auto& cat = session_info.getCatalog();
2544  // Get copies of table descriptors here in order to avoid possible use of dangling
2545  // pointers, if tables are concurrently dropped.
2546  const auto tables = cat.getAllTableMetadataCopy();
2547  _return.reserve(tables.size());
2548 
2549  for (const auto& td : tables) {
2550  if (td.shard >= 0) {
2551  // skip shards, they're not standalone tables
2552  continue;
2553  }
2554  if (!hasTableAccessPrivileges(&td, session_info)) {
2555  // skip table, as there are no privileges to access it
2556  continue;
2557  }
2558 
2559  TTableMeta ret;
2560  ret.table_name = td.tableName;
2561  ret.is_view = td.isView;
2562  ret.is_replicated = table_is_replicated(&td);
2563  ret.shard_count = td.nShards;
2564  ret.max_rows = td.maxRows;
2565  ret.table_id = td.tableId;
2566 
2567  std::vector<TTypeInfo> col_types;
2568  std::vector<std::string> col_names;
2569  size_t num_cols = 0;
2570  if (td.isView) {
2571  try {
2572  TPlanResult parse_result;
2574  std::tie(parse_result, locks) = parse_to_ra(
2575  query_state_proxy, td.viewSQL, {}, with_table_locks, system_parameters_);
2576  const auto query_ra = parse_result.plan_result;
2577 
2578  ExecutionResult ex_result;
2579  execute_rel_alg(ex_result,
2580  query_state_proxy,
2581  query_ra,
2582  true,
2584  -1,
2585  -1,
2586  /*just_validate=*/true,
2587  /*find_push_down_candidates=*/false,
2589  TQueryResult result;
2591  result, ex_result, query_state_proxy, query_ra, true, -1, -1);
// Start from the full descriptor size and subtract physical columns below.
2592  num_cols = result.row_set.row_desc.size();
2593  for (const auto& col : result.row_set.row_desc) {
2594  if (col.is_physical) {
2595  num_cols--;
2596  continue;
2597  }
2598  col_types.push_back(col.col_type);
2599  col_names.push_back(col.col_name);
2600  }
2601  } catch (std::exception& e) {
// Best effort: a broken view does not abort the listing.
2602  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td.tableName;
2603  }
2604  } else {
2605  try {
// NOTE(review): the same privilege check already ran at the top of the loop
// body, so the `else { continue; }` branch looks redundant -- confirm.
2606  if (hasTableAccessPrivileges(&td, session_info)) {
2607  const auto col_descriptors =
2608  cat.getAllColumnMetadataForTable(td.tableId, false, true, false);
2609  const auto deleted_cd = cat.getDeletedColumn(&td);
2610  for (const auto cd : col_descriptors) {
2611  if (cd == deleted_cd) {
2612  continue;
2613  }
2614  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2615  col_names.push_back(cd->columnName);
2616  }
2617  num_cols = col_descriptors.size();
2618  } else {
2619  continue;
2620  }
2621  } catch (const std::runtime_error& e) {
2622  THROW_MAPD_EXCEPTION(e.what());
2623  }
2624  }
2625 
2626  ret.num_cols = num_cols;
2627  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2628  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2629 
2630  _return.push_back(ret);
2631  }
2632 }
2633 
// Thrift entry point: fills `_return` with table metadata for the session by
// calling get_tables_meta_impl() under a shared (read) lock. A query state
// with an empty query string is created and attached to the log entry. Any
// std::exception from the implementation is converted with
// THROW_MAPD_EXCEPTION.
//
// NOTE(review): the argument lines of the lock construction (embedded lines
// 2643-2644) are not visible in this listing, so which mutex is locked cannot
// be told from here.
2634 void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
2635  const TSessionId& session) {
2636  auto stdlog = STDLOG(get_session_ptr(session));
2637  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2638  auto session_ptr = stdlog.getConstSessionInfo();
2639  auto query_state = create_query_state(session_ptr, "");
2640  stdlog.setQueryState(query_state);
2641 
2642  auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
2645 
2646  try {
2647  get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
2648  } catch (const std::exception& e) {
2649  THROW_MAPD_EXCEPTION(e.what());
2650  }
2651 }
2652 
2653 void DBHandler::get_users(std::vector<std::string>& user_names,
2654  const TSessionId& session) {
2655  auto stdlog = STDLOG(get_session_ptr(session));
2656  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2657  auto session_ptr = stdlog.getConstSessionInfo();
2658  std::list<Catalog_Namespace::UserMetadata> user_list;
2659 
2660  if (!session_ptr->get_currentUser().isSuper) {
2661  user_list = SysCatalog::instance().getAllUserMetadata(
2662  session_ptr->getCatalog().getCurrentDB().dbId);
2663  } else {
2664  user_list = SysCatalog::instance().getAllUserMetadata();
2665  }
2666  for (auto u : user_list) {
2667  user_names.push_back(u.userName);
2668  }
2669 }
2670 
2671 void DBHandler::get_version(std::string& version) {
2672  version = MAPD_RELEASE;
2673 }
2674 
// Thrift endpoint for clearing GPU memory. Only superusers may call it; any
// other caller gets a THROW_MAPD_EXCEPTION. When a render handler is
// configured, its clear_gpu_memory() is invoked as well.
// NOTE(review): listing lines 2683 (the statement inside the try block) and
// 2692 (the body of the leaf-aggregator branch) are absent from this view, so
// what is cleared locally and what is forwarded to leaves cannot be confirmed.
2675 void DBHandler::clear_gpu_memory(const TSessionId& session) {
2676  auto stdlog = STDLOG(get_session_ptr(session));
2677  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2678  auto session_ptr = stdlog.getConstSessionInfo();
2679  if (!session_ptr->get_currentUser().isSuper) {
2680  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2681  }
2682  try {
2684  } catch (const std::exception& e) {
2685  THROW_MAPD_EXCEPTION(e.what());
2686  }
2687  if (render_handler_) {
2688  render_handler_->clear_gpu_memory();
2689  }
2690 
2691  if (leaf_aggregator_.leafCount() > 0) {
2693  }
2694 }
2695 
// Thrift endpoint for clearing CPU memory. Only superusers may call it; any
// other caller gets a THROW_MAPD_EXCEPTION. When a render handler is
// configured, its clear_cpu_memory() is invoked as well.
// NOTE(review): listing lines 2704 (the statement inside the try block) and
// 2713 (the body of the leaf-aggregator branch) are absent from this view, so
// what is cleared locally and what is forwarded to leaves cannot be confirmed.
2696 void DBHandler::clear_cpu_memory(const TSessionId& session) {
2697  auto stdlog = STDLOG(get_session_ptr(session));
2698  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2699  auto session_ptr = stdlog.getConstSessionInfo();
2700  if (!session_ptr->get_currentUser().isSuper) {
2701  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2702  }
2703  try {
2705  } catch (const std::exception& e) {
2706  THROW_MAPD_EXCEPTION(e.what());
2707  }
2708  if (render_handler_) {
2709  render_handler_->clear_cpu_memory();
2710  }
2711 
2712  if (leaf_aggregator_.leafCount() > 0) {
2714  }
2715 }
2716 
2717 void DBHandler::clearRenderMemory(const TSessionId& session) {
2718  auto stdlog = STDLOG(get_session_ptr(session));
2719  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2720  auto session_ptr = stdlog.getConstSessionInfo();
2721  if (!session_ptr->get_currentUser().isSuper) {
2722  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_render_memory");
2723  }
2724  if (render_handler_) {
2725  render_handler_->clear_cpu_memory();
2726  render_handler_->clear_gpu_memory();
2727  }
2728 }
2729 
// Enrolls `parent_session` as a query session on an executor, tagging it with
// `label` and `start_time_str`. The status recorded is RUNNING_QUERY_KERNEL
// when `for_running_query_kernel` is true and RUNNING_IMPORTER otherwise.
// The leaf session is only used to obtain the logging/session context.
// NOTE(review): listing lines 2740 (where `executor` is obtained) and 2744
// (one argument of enrollQuerySession) are absent from this view.
2730 void DBHandler::set_cur_session(const TSessionId& parent_session,
2731  const TSessionId& leaf_session,
2732  const std::string& start_time_str,
2733  const std::string& label,
2734  bool for_running_query_kernel) {
2735  // internal API to manage query interruption in distributed mode
2736  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2737  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2738  auto session_ptr = stdlog.getConstSessionInfo();
2739 
2741  executor->enrollQuerySession(parent_session,
2742  label,
2743  start_time_str,
2745  for_running_query_kernel
2746  ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
2747  : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
2748 }
2749 
// Clears the query-session status that was recorded for `parent_session` with
// the given `start_time_str`. In the visible lines `label` and
// `for_running_query_kernel` are not used.
// NOTE(review): listing line 2758 (where `executor` is obtained) is absent
// from this view.
2750 void DBHandler::invalidate_cur_session(const TSessionId& parent_session,
2751  const TSessionId& leaf_session,
2752  const std::string& start_time_str,
2753  const std::string& label,
2754  bool for_running_query_kernel) {
2755  // internal API to manage query interruption in distributed mode
2756  auto stdlog = STDLOG(get_session_ptr(leaf_session));
2757  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2759  executor->clearQuerySessionStatus(parent_session, start_time_str);
2760 }
2761 
// NOTE(review): the signature of this function (listing line 2762) is absent
// from this view; the visible body only returns INVALID_SESSION_ID.
2763  return INVALID_SESSION_ID;
2764 }
2765 
// Thrift endpoint that reports buffer-pool memory usage.
// `memory_level` selects the pool: the exact string "gpu" selects GPU_LEVEL,
// anything else selects CPU_LEVEL. One TNodeMemoryInfo is appended to
// `_return` per MemoryInfo entry returned by the data manager, and when this
// node has leaves their summaries are inserted at the front of `_return`.
2766 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2767  const TSessionId& session,
2768  const std::string& memory_level) {
2769  auto stdlog = STDLOG(get_session_ptr(session));
2770  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2771  std::vector<Data_Namespace::MemoryInfo> internal_memory;
// NOTE(review): `mem_level` is declared without an initializer and no visible
// line assigns it; presumably listing lines 2774 and 2778 (absent from this
// view) set it in each branch before it is passed to getLeafMemoryInfo below.
2772  Data_Namespace::MemoryLevel mem_level;
2773  if (!memory_level.compare("gpu")) {
2775  internal_memory =
2776  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2777  } else {
2779  internal_memory =
2780  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2781  }
2782 
2783  for (auto memInfo : internal_memory) {
2784  TNodeMemoryInfo nodeInfo;
// The host name is only filled in when this node aggregates leaves.
2785  if (leaf_aggregator_.leafCount() > 0) {
2786  nodeInfo.host_name = omnisci::get_hostname();
2787  }
2788  nodeInfo.page_size = memInfo.pageSize;
2789  nodeInfo.max_num_pages = memInfo.maxNumPages;
2790  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2791  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
// Despite the loop variable's name, this loop runs for both GPU and CPU levels.
2792  for (auto gpu : memInfo.nodeMemoryData) {
2793  TMemoryData md;
2794  md.slab = gpu.slabNum;
2795  md.start_page = gpu.startPage;
2796  md.num_pages = gpu.numPages;
2797  md.touch = gpu.touch;
2798  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2799  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2800  nodeInfo.node_memory_data.push_back(md);
2801  }
2802  _return.push_back(nodeInfo);
2803  }
2804  if (leaf_aggregator_.leafCount() > 0) {
2805  std::vector<TNodeMemoryInfo> leafSummary =
2806  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
// Leaf entries are placed ahead of this node's own entries.
2807  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2808  }
2809 }
2810 
// Thrift endpoint that appends one TDBInfo (name and owner name) to `dbinfos`
// for each database the system catalog returns for the session's current user.
// NOTE(review): listing line 2816, which declares `dbs`, is absent from this
// view; the strings of each entry are moved out of it while building the reply.
2811 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos, const TSessionId& session) {
2812  auto stdlog = STDLOG(get_session_ptr(session));
2813  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2814  auto session_ptr = stdlog.getConstSessionInfo();
2815  const auto& user = session_ptr->get_currentUser();
2817  SysCatalog::instance().getDatabaseListForUser(user);
2818  for (auto& db : dbs) {
2819  TDBInfo dbinfo;
2820  dbinfo.db_name = std::move(db.dbName);
2821  dbinfo.db_owner = std::move(db.dbOwnerName);
2822  dbinfos.push_back(std::move(dbinfo));
2823  }
2824 }
2825 
2826 void DBHandler::set_execution_mode(const TSessionId& session,
2827  const TExecuteMode::type mode) {
2828  auto stdlog = STDLOG(get_session_ptr(session));
2829  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2830  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
2831  auto session_it = get_session_it_unsafe(session, write_lock);
2832  if (leaf_aggregator_.leafCount() > 0) {
2833  leaf_aggregator_.set_execution_mode(session, mode);
2834  try {
2835  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2836  } catch (const TOmniSciException& e) {
2837  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2838  }
2839  return;
2840  }
2841  DBHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2842 }
2843 
2844 namespace {
2845 
// NOTE(review): the signature of this helper (listing line 2846) is absent
// from this view. The visible body throws std::runtime_error when `td` is
// non-null and reports a non-zero shard count; otherwise it does nothing.
2847  if (td && td->nShards) {
2848  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2849  }
2850 }
2851 
2852 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
2853  const std::vector<std::string>& column_names) {
2854  std::unordered_set<std::string> unique_names;
2855  for (const auto& name : column_names) {
2856  auto lower_name = to_lower(name);
2857  if (unique_names.find(lower_name) != unique_names.end()) {
2858  THROW_MAPD_EXCEPTION("Column " + name + " is mentioned multiple times");
2859  } else {
2860  unique_names.insert(lower_name);
2861  }
2862  }
2863  for (const auto& cd : descs) {
2864  auto iter = unique_names.find(to_lower(cd->columnName));
2865  if (iter != unique_names.end()) {
2866  unique_names.erase(iter);
2867  }
2868  }
2869  if (!unique_names.empty()) {
2870  THROW_MAPD_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
2871  }
2872 }
2873 
2874 // Return vector of IDs mapping column descriptors to the list of comumn names.
2875 // The size of the vector is the number of actual columns (geophisical columns excluded).
2876 // ID is either a position in column_names matching the descriptor, or -1 if the column
2877 // is missing from the column_names
2878 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
2879  const std::vector<std::string>& column_names) {
2880  std::vector<int> desc_to_column_ids;
2881  if (column_names.empty()) {
2882  int col_idx = 0;
2883  for (const auto& cd : descs) {
2884  if (!cd->isGeoPhyCol) {
2885  desc_to_column_ids.push_back(col_idx);
2886  ++col_idx;
2887  }
2888  }
2889  } else {
2890  for (const auto& cd : descs) {
2891  if (!cd->isGeoPhyCol) {
2892  bool found = false;
2893  for (size_t j = 0; j < column_names.size(); ++j) {
2894  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
2895  found = true;
2896  desc_to_column_ids.push_back(j);
2897  break;
2898  }
2899  }
2900  if (!found) {
2901  if (!cd->columnType.get_notnull()) {
2902  desc_to_column_ids.push_back(-1);
2903  } else {
2904  THROW_MAPD_EXCEPTION("Column '" + cd->columnName +
2905  "' cannot be omitted due to NOT NULL constraint");
2906  }
2907  }
2908  }
2909  }
2910  }
2911  return desc_to_column_ids;
2912 }
2913 
2914 } // namespace
2915 
// NOTE(review): the signature line of this member (listing line 2916) is
// absent from this view; call sites in this file invoke a function named
// fillGeoColumns with exactly this parameter list, so this is presumably it.
//
// Converts the geo strings already stored in the import buffer just before
// `col_idx` into coordinate, bounds, ring-size and poly-ring columns, works
// out a render group per row, and hands everything to a helper that fills the
// physical columns and advances `col_idx`.
// Raises through THROW_MAPD_EXCEPTION when the buffer does not hold exactly
// `num_rows` strings or when the geo conversion reports failure.
2917  const TSessionId& session,
2918  const Catalog& catalog,
2919  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
2920  const ColumnDescriptor* cd,
2921  size_t& col_idx,
2922  size_t num_rows,
2923  const std::string& table_name,
2924  bool assign_render_groups) {
// `col_idx` has already been advanced past the geo column itself.
2925  auto geo_col_idx = col_idx - 1;
2926  const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
2927  std::vector<std::vector<double>> coords_column, bounds_column;
2928  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2929  std::vector<int> render_groups_column;
2930  SQLTypeInfo ti = cd->columnType;
2931  if (num_rows != wkt_or_wkb_hex_column->size() ||
2932  !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
2933  ti,
2934  coords_column,
2935  bounds_column,
2936  ring_sizes_column,
2937  poly_rings_column,
2938  false)) {
2939  std::ostringstream oss;
2940  oss << "Invalid geometry in column " << cd->columnName;
2941  THROW_MAPD_EXCEPTION(oss.str());
2942  }
2943 
2944  // start or continue assigning render groups for poly columns?
2945  if (IS_GEO_POLY(cd->columnType.get_type()) && assign_render_groups) {
2946  // get RGA to use
2947  import_export::RenderGroupAnalyzer* render_group_analyzer{};
2948  {
2949  // mutex the map access
2950  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
2951 
// The analyzers are kept in a three-level map: session -> table -> column.
2952  // emplace new RGA or fetch existing RGA from map
2953  auto [itr_table, emplaced_table] = render_group_assignment_map_.try_emplace(
2954  session, RenderGroupAssignmentTableMap());
2955  LOG_IF(INFO, emplaced_table)
2956  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2957  "Persistent Data for Session '"
2958  << session << "'";
2959  auto [itr_column, emplaced_column] =
2960  itr_table->second.try_emplace(table_name, RenderGroupAssignmentColumnMap());
2961  LOG_IF(INFO, emplaced_column)
2962  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2963  "Persistent Data for Table '"
2964  << table_name << "'";
2965  auto [itr_analyzer, emplaced_analyzer] = itr_column->second.try_emplace(
2966  cd->columnName, std::make_unique<import_export::RenderGroupAnalyzer>());
2967  LOG_IF(INFO, emplaced_analyzer)
2968  << "load_table_binary_columnar_polys: Creating Render Group Assignment "
2969  "Persistent Data for Column '"
2970  << cd->columnName << "'";
2971  render_group_analyzer = itr_analyzer->second.get();
2972  CHECK(render_group_analyzer);
2973 
// Seeding happens while the map mutex is still held.
2974  // seed new RGA from existing table/column, to handle appends
2975  if (emplaced_analyzer) {
2976  LOG(INFO) << "load_table_binary_columnar_polys: Seeding Render Groups from "
2977  "existing table...";
2978  render_group_analyzer->seedFromExistingTableContents(
2979  catalog, table_name, cd->columnName);
2980  LOG(INFO) << "load_table_binary_columnar_polys: Done";
2981  }
2982  }
2983 
// NOTE(review): the analyzer pointer is used below after the mutex has been
// released; confirm nothing can erase the map entry concurrently.
2984  // assign render groups for this set of bounds
2985  LOG(INFO) << "load_table_binary_columnar_polys: Assigning Render Groups...";
2986  render_groups_column.reserve(bounds_column.size());
2987  for (auto const& bounds : bounds_column) {
2988  CHECK_EQ(bounds.size(), 4u);
2989  int rg = render_group_analyzer->insertBoundsAndReturnRenderGroup(bounds);
2990  render_groups_column.push_back(rg);
2991  }
2992  LOG(INFO) << "load_table_binary_columnar_polys: Done";
2993  } else {
2994  // render groups all zero
2995  render_groups_column.resize(bounds_column.size(), 0);
2996  }
2997 
// NOTE(review): the name of the callee (listing line 2999) is absent from
// this view; only its trailing arguments are visible.
2998  // Populate physical columns, advance col_idx
3000  cd,
3001  import_buffers,
3002  col_idx,
3003  coords_column,
3004  bounds_column,
3005  ring_sizes_column,
3006  poly_rings_column,
3007  render_groups_column);
3008 }
3009 
// NOTE(review): the signature line of this member (listing line 3010) is
// absent from this view; call sites in this file invoke a function named
// fillMissingBuffers with exactly this parameter list, so this is presumably it.
//
// Walks the column descriptors and, for every column whose entry in
// `desc_id_to_column_id` is -1 (not supplied by the caller), appends
// `num_rows` default values to its import buffer. For geometry columns the
// physical columns are then filled through fillGeoColumns.
// `col_idx` indexes import_buffers (physical geo columns included), whereas
// `import_idx` indexes desc_id_to_column_id (physical geo columns excluded).
3011  const TSessionId& session,
3012  const Catalog& catalog,
3013  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
3014  const std::list<const ColumnDescriptor*>& cds,
3015  const std::vector<int>& desc_id_to_column_id,
3016  size_t num_rows,
3017  const std::string& table_name,
3018  bool assign_render_groups) {
3019  size_t skip_physical_cols = 0;
3020  size_t col_idx = 0, import_idx = 0;
3021  for (const auto& cd : cds) {
// Physical columns that follow a geometry column are skipped here.
3022  if (skip_physical_cols > 0) {
3023  CHECK(cd->isGeoPhyCol);
3024  skip_physical_cols--;
3025  continue;
3026  } else if (cd->columnType.is_geometry()) {
3027  skip_physical_cols = cd->columnType.get_physical_cols();
3028  }
3029  if (desc_id_to_column_id[import_idx] == -1) {
3030  import_buffers[col_idx]->addDefaultValues(cd, num_rows);
3031  col_idx++;
3032  if (cd->columnType.is_geometry()) {
// fillGeoColumns takes col_idx by reference and advances it.
3033  fillGeoColumns(session,
3034  catalog,
3035  import_buffers,
3036  cd,
3037  col_idx,
3038  num_rows,
3039  table_name,
3040  assign_render_groups);
3041  }
3042  } else {
// Column was supplied by the caller: step over it and its physical columns.
3043  col_idx++;
3044  col_idx += skip_physical_cols;
3045  }
3046  import_idx++;
3047  }
3048 }
3049 
// Thrift endpoint that appends row-wise binary data (`rows`) to `table_name`.
// `column_names` optionally names the columns being supplied; the width of the
// first row is what is validated against the table by prepare_loader_generic.
// A row whose conversion throws is logged and discarded; the remaining rows
// are still loaded. Every std::exception escaping the body is rethrown through
// THROW_MAPD_EXCEPTION.
3050 void DBHandler::load_table_binary(const TSessionId& session,
3051  const std::string& table_name,
3052  const std::vector<TRow>& rows,
3053  const std::vector<std::string>& column_names) {
3054  try {
3055  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3056  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3057  auto session_ptr = stdlog.getConstSessionInfo();
3058 
3059  if (rows.empty()) {
3060  THROW_MAPD_EXCEPTION("No rows to insert");
3061  }
3062 
// NOTE(review): the argument of this lock (listing lines 3064-3065) is absent
// from this view.
3063  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
3066  std::unique_ptr<import_export::Loader> loader;
3067  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3068  auto schema_read_lock = prepare_loader_generic(*session_ptr,
3069  table_name,
3070  rows.front().cols.size(),
3071  &loader,
3072  &import_buffers,
3073  column_names,
3074  "load_table_binary");
3075 
3076  auto col_descs = loader->get_column_descs();
3077  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3078 
3079  size_t rows_completed = 0;
3080  for (auto const& row : rows) {
3081  size_t col_idx = 0;
3082  try {
// NOTE(review): unlike load_table, this loop uses one index for both the
// import buffers and desc_id_to_column_id and does not skip physical geo
// columns, while column_ids_by_names omits those from its result; confirm how
// tables with geo columns are expected to behave here.
3083  for (auto cd : col_descs) {
3084  auto mapped_idx = desc_id_to_column_id[col_idx];
3085  if (mapped_idx != -1) {
3086  import_buffers[col_idx]->add_value(
3087  cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
3088  }
3089  col_idx++;
3090  }
3091  rows_completed++;
3092  } catch (const std::exception& e) {
// Undo the partially added row. NOTE(review): this pops from every buffer
// below col_idx, including buffers that were skipped above because their
// mapped index is -1 and so never received a value for this row.
3093  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
3094  import_buffers[col_idx_to_pop]->pop_value();
3095  }
3096  LOG(ERROR) << "Input exception thrown: " << e.what()
3097  << ". Row discarded, issue at column : " << (col_idx + 1)
3098  << " data :" << row;
3099  }
3100  }
3101  fillMissingBuffers(session,
3102  session_ptr->getCatalog(),
3103  import_buffers,
3104  col_descs,
3105  desc_id_to_column_id,
3106  rows_completed,
3107  table_name,
3108  false);
3109  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3110  session_ptr->getCatalog(), table_name);
// NOTE(review): rows.size() is passed although discarded rows were removed
// from the buffers and the defaults above were sized with rows_completed;
// load_table passes rows_completed at the equivalent call. Confirm intent.
3111  if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
3112  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3113  }
3114  } catch (const std::exception& e) {
3115  THROW_MAPD_EXCEPTION(std::string(e.what()));
3116  }
3117 }
3118 
// NOTE(review): the line carrying this member's name (listing line 3120) is
// absent from this view; callers in this file invoke prepare_loader_generic
// with exactly this parameter list, so this is presumably it.
//
// Common set-up shared by the load_table* endpoints:
//  - rejects an empty column count and calls check_read_only(load_type);
//  - obtains the table descriptor together with a schema read lock, which is
//    returned to the caller;
//  - checks load privileges for the table;
//  - creates a DistributedLoader when this node has leaves, otherwise a local
//    import_export::Loader, and stores it in `*loader`;
//  - validates `column_names` and the column count against the table;
//  - fills `*import_buffers` with one buffer per column loader.
3119 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
3121  const Catalog_Namespace::SessionInfo& session_info,
3122  const std::string& table_name,
3123  size_t num_cols,
3124  std::unique_ptr<import_export::Loader>* loader,
3125  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
3126  const std::vector<std::string>& column_names,
3127  std::string load_type) {
3128  if (num_cols == 0) {
3129  THROW_MAPD_EXCEPTION("No columns to insert");
3130  }
3131  check_read_only(load_type);
3132  auto& cat = session_info.getCatalog();
// NOTE(review): the call that acquires the lock (listing line 3135) is absent
// from this view; only its trailing arguments are visible.
3133  auto td_with_lock =
3134  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
3136  cat, table_name, true));
3137  const auto td = (*td_with_lock)();
3138  CHECK(td);
3139 
// NOTE(review): the body of this branch (listing line 3142) is absent from
// this view.
3140  if (g_cluster && !leaf_aggregator_.leafCount()) {
3141  // Sharded table rows need to be routed to the leaf by an aggregator.
3143  }
3144  check_table_load_privileges(session_info, table_name);
3145 
3146  if (leaf_aggregator_.leafCount() > 0) {
3147  loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
3148  } else {
3149  loader->reset(new import_export::Loader(cat, td));
3150  }
3151 
3152  auto col_descs = (*loader)->get_column_descs();
3153  check_valid_column_names(col_descs, column_names);
3154  if (column_names.empty()) {
3155  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
3156  // Subtracting 1 (rowid) until TableDescriptor is updated.
3157  auto geo_physical_cols = std::count_if(
3158  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
// Subtracts 2 instead of 1 when the table has a deleted column.
3159  const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
3160  (td->hasDeletedCol ? 2 : 1);
3161  if (num_cols != num_table_cols) {
3162  throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
3163  ") does not match number of columns in table " +
3164  td->tableName + " (" + std::to_string(num_table_cols) +
3165  ")");
3166  }
3167  } else if (num_cols != column_names.size()) {
// NOTE(review): the statement that raises with this message (listing line
// 3168) is absent from this view.
3169  "Number of columns specified does not match the "
3170  "number of columns given (" +
3171  std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
3172  }
3173 
3174  *import_buffers = import_export::setup_column_loaders(td, loader->get());
// The declared return type differs from td_with_lock's type, hence the
// explicit move.
3175  return std::move(td_with_lock);
3176 }
3177 namespace {
3178 
3179 size_t get_column_size(const TColumn& column) {
3180  if (!column.nulls.empty()) {
3181  return column.nulls.size();
3182  } else {
3183  // it is a very bold estimate but later we check it against REAL data
3184  // and if this function returns a wrong result (e.g. both int and string
3185  // vectors are filled with values), we get an error
3186  return column.data.int_col.size() + column.data.arr_col.size() +
3187  column.data.real_col.size() + column.data.str_col.size();
3188  }
3189 }
3190 
3191 } // namespace
3192 
// Thrift endpoint for loading column-wise binary data (`cols`) into
// `table_name`. It forwards all arguments unchanged, together with
// AssignRenderGroupsMode::kNone.
// NOTE(review): the name of the callee (listing line 3197) is absent from this
// view.
3193 void DBHandler::load_table_binary_columnar(const TSessionId& session,
3194  const std::string& table_name,
3195  const std::vector<TColumn>& cols,
3196  const std::vector<std::string>& column_names) {
3198  session, table_name, cols, column_names, AssignRenderGroupsMode::kNone);
3199 }
3200 
// NOTE(review): the signature line (listing line 3201), the callee with its
// first argument (3207) and the two values selected by `assign_render_groups`
// (3212-3213) are absent from this view. Judging by the log messages elsewhere
// in this file this is presumably load_table_binary_columnar_polys.
//
// The visible lines forward table_name, cols and column_names and choose the
// final argument based on `assign_render_groups`.
3202  const TSessionId& session,
3203  const std::string& table_name,
3204  const std::vector<TColumn>& cols,
3205  const std::vector<std::string>& column_names,
3206  const bool assign_render_groups) {
3208  table_name,
3209  cols,
3210  column_names,
3211  assign_render_groups
3214 }
3215 
// NOTE(review): the signature line of this member (listing line 3216) is
// absent from this view.
//
// Shared implementation behind the column-wise binary load endpoints.
//  - kCleanUp mode: `cols` must be empty; the render-group data kept for this
//    session/table pair is erased and the function returns without loading.
//  - otherwise: every supplied TColumn is appended to its import buffer, geo
//    columns get their physical columns filled, omitted columns are filled
//    with defaults, and the buffers are handed to the loader.
// Render groups are only assigned in kAssign mode.
3217  const TSessionId& session,
3218  const std::string& table_name,
3219  const std::vector<TColumn>& cols,
3220  const std::vector<std::string>& column_names,
3221  const AssignRenderGroupsMode assign_render_groups_mode) {
3222  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3223  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3224  auto session_ptr = stdlog.getConstSessionInfo();
3225 
3226  if (assign_render_groups_mode == AssignRenderGroupsMode::kCleanUp) {
3227  // throw if the user tries to pass column data on a clean-up
3228  if (cols.size()) {
// NOTE(review): the statement that raises with this message (listing line
// 3229) is absent from this view.
3230  "load_table_binary_columnar_polys: Column data must be empty when called with "
3231  "assign_render_groups = false");
3232  }
3233 
3234  // mutex the map access
3235  std::lock_guard<std::mutex> lock(render_group_assignment_mutex_);
3236 
3237  // drop persistent render group assignment data for this session and table
3238  // keep the per-session map in case other tables are active (ideally not)
3239  auto itr_session = render_group_assignment_map_.find(session);
3240  if (itr_session != render_group_assignment_map_.end()) {
3241  LOG(INFO) << "load_table_binary_columnar_polys: Cleaning up Render Group "
3242  "Assignment Persistent Data for Session '"
3243  << session << "', Table '" << table_name << "'";
3244  itr_session->second.erase(table_name);
3245  }
3246 
3247  // just doing clean-up, so we're done
3248  return;
3249  }
3250 
// NOTE(review): the argument of this lock (listing lines 3252-3253) is absent
// from this view.
3251  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
3254  std::unique_ptr<import_export::Loader> loader;
3255  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3256  auto schema_read_lock = prepare_loader_generic(*session_ptr,
3257  table_name,
3258  cols.size(),
3259  &loader,
3260  &import_buffers,
3261  column_names,
3262  "load_table_binary_columnar");
3263 
3264  auto desc_id_to_column_id =
3265  column_ids_by_names(loader->get_column_descs(), column_names);
// cols.front() is safe here: prepare_loader_generic raises "No columns to
// insert" when it is passed a column count of 0.
3266  size_t num_rows = get_column_size(cols.front());
3267  size_t import_idx = 0; // index into the TColumn vector being loaded
3268  size_t col_idx = 0; // index into column description vector
3269  try {
3270  size_t skip_physical_cols = 0;
3271  for (auto cd : loader->get_column_descs()) {
3272  if (skip_physical_cols > 0) {
3273  CHECK(cd->isGeoPhyCol);
3274  skip_physical_cols--;
3275  continue;
3276  }
3277  auto mapped_idx = desc_id_to_column_id[import_idx];
3278  if (mapped_idx != -1) {
3279  size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
// Every supplied column must carry the row count estimated from the first one.
3280  if (col_rows != num_rows) {
3281  std::ostringstream oss;
3282  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
3283  << cd->columnName << " , expecting " << num_rows << " rows, column "
3284  << col_idx << " has " << col_rows << " rows";
3285  THROW_MAPD_EXCEPTION(oss.str());
3286  }
3287  // Advance to the next column in the table
3288  col_idx++;
3289  // For geometry columns: process WKT strings and fill physical columns
3290  if (cd->columnType.is_geometry()) {
3291  fillGeoColumns(session,
3292  session_ptr->getCatalog(),
3293  import_buffers,
3294  cd,
3295  col_idx,
3296  num_rows,
3297  table_name,
3298  assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
3299  skip_physical_cols = cd->columnType.get_physical_cols();
3300  }
3301  } else {
// Column omitted by the caller: step over it (and its physical columns); it
// is filled with defaults by fillMissingBuffers below.
3302  col_idx++;
3303  if (cd->columnType.is_geometry()) {
3304  skip_physical_cols = cd->columnType.get_physical_cols();
3305  col_idx += skip_physical_cols;
3306  }
3307  }
3308  // Advance to the next column of values being loaded
3309  import_idx++;
3310  }
3311  } catch (const std::exception& e) {
3312  std::ostringstream oss;
3313  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
3314  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3315  THROW_MAPD_EXCEPTION(oss.str());
3316  }
3317  fillMissingBuffers(session,
3318  session_ptr->getCatalog(),
3319  import_buffers,
3320  loader->get_column_descs(),
3321  desc_id_to_column_id,
3322  num_rows,
3323  table_name,
3324  assign_render_groups_mode == AssignRenderGroupsMode::kAssign);
3325  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3326  session_ptr->getCatalog(), table_name);
3327  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3328  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3329  }
3330 }
3331 
3332 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
3333 
3334 #define ARROW_THRIFT_THROW_NOT_OK(s) \
3335  do { \
3336  ::arrow::Status _s = (s); \
3337  if (UNLIKELY(!_s.ok())) { \
3338  TOmniSciException ex; \
3339  ex.error_msg = _s.ToString(); \
3340  LOG(ERROR) << s.ToString(); \
3341  throw ex; \
3342  } \
3343  } while (0)
3344 
3345 namespace {
3346 
3347 RecordBatchVector loadArrowStream(const std::string& stream) {
3348  RecordBatchVector batches;
3349  try {
3350  // TODO(wesm): Make this simpler in general, see ARROW-1600
3351  auto stream_buffer =
3352  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
3353  static_cast<int64_t>(stream.size()));
3354 
3355  arrow::io::BufferReader buf_reader(stream_buffer);
3356  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3357  ARROW_ASSIGN_OR_THROW(batch_reader,
3358  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3359 
3360  while (true) {
3361  std::shared_ptr<arrow::RecordBatch> batch;
3362  // Read batch (zero-copy) from the stream
3363  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
3364  if (batch == nullptr) {
3365  break;
3366  }
3367  batches.emplace_back(std::move(batch));
3368  }
3369  } catch (const std::exception& e) {
3370  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
3371  }
3372  return batches;
3373 }
3374 
3375 } // namespace
3376 
// Thrift endpoint that loads a serialized Arrow stream into `table_name`.
// The stream must decode to exactly one record batch. When
// `use_column_names` is set, the batch's field names select the target
// columns; otherwise the column list passed on is empty.
// Conversion errors are logged and rethrown through THROW_MAPD_EXCEPTION.
3377 void DBHandler::load_table_binary_arrow(const TSessionId& session,
3378  const std::string& table_name,
3379  const std::string& arrow_stream,
3380  const bool use_column_names) {
3381  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3382  auto session_ptr = stdlog.getConstSessionInfo();
3383 
// loadArrowStream logs parse errors instead of throwing, so a malformed
// stream surfaces through the batch-count check below.
3384  RecordBatchVector batches = loadArrowStream(arrow_stream);
3385  // Assuming have one batch for now
3386  if (batches.size() != 1) {
3387  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
3388  }
3389 
3390  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
3391  std::unique_ptr<import_export::Loader> loader;
3392  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3393  std::vector<std::string> column_names;
3394  if (use_column_names) {
3395  column_names = batch->schema()->field_names();
3396  }
// NOTE(review): the argument of this lock (listing lines 3398-3399) is absent
// from this view.
3397  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
3400  auto schema_read_lock =
3401  prepare_loader_generic(*session_ptr,
3402  table_name,
3403  static_cast<size_t>(batch->num_columns()),
3404  &loader,
3405  &import_buffers,
3406  column_names,
3407  "load_table_binary_arrow");
3408 
3409  auto desc_id_to_column_id =
3410  column_ids_by_names(loader->get_column_descs(), column_names);
3411  size_t num_rows = 0;
3412  size_t col_idx = 0;
3413  try {
3414  for (auto cd : loader->get_column_descs()) {
3415  auto mapped_idx = desc_id_to_column_id[col_idx];
3416  if (mapped_idx != -1) {
3417  auto& array = *batch->column(mapped_idx);
3418  import_export::ArraySliceRange row_slice(0, array.length());
// NOTE(review): num_rows is overwritten for every mapped column, so the value
// used later is the one reported for the last mapped column.
3419  num_rows = import_buffers[col_idx]->add_arrow_values(
3420  cd, array, true, row_slice, nullptr);
3421  }
3422  col_idx++;
3423  }
3424  } catch (const std::exception& e) {
3425  LOG(ERROR) << "Input exception thrown: " << e.what()
3426  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3427  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
3428  // other import paths
3429  THROW_MAPD_EXCEPTION(e.what());
3430  }
3431  fillMissingBuffers(session,
3432  session_ptr->getCatalog(),
3433  import_buffers,
3434  loader->get_column_descs(),
3435  desc_id_to_column_id,
3436  num_rows,
3437  table_name,
3438  false);
3439  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3440  session_ptr->getCatalog(), table_name);
3441  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3442  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3443  }
3444 }
3445 
// Thrift endpoint that appends row-wise string data (`rows`) to `table_name`.
// `column_names` optionally names the columns being supplied.
// The load runs in three passes:
//  1. every row's string fields are converted into the import buffers;
//  2. for geometry columns supplied by the caller, the physical columns are
//     filled in one batch through fillGeoColumns;
//  3. columns the caller omitted are filled with defaults.
// Every std::exception escaping the body is rethrown through
// THROW_MAPD_EXCEPTION.
3446 void DBHandler::load_table(const TSessionId& session,
3447  const std::string& table_name,
3448  const std::vector<TStringRow>& rows,
3449  const std::vector<std::string>& column_names) {
3450  try {
3451  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3452  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3453  auto session_ptr = stdlog.getConstSessionInfo();
3454 
3455  if (rows.empty()) {
3456  THROW_MAPD_EXCEPTION("No rows to insert");
3457  }
3458 
// NOTE(review): the argument of this lock (listing lines 3460-3461) is absent
// from this view.
3459  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
3462  std::unique_ptr<import_export::Loader> loader;
3463  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3464  auto schema_read_lock =
3465  prepare_loader_generic(*session_ptr,
3466  table_name,
3467  static_cast<size_t>(rows.front().cols.size()),
3468  &loader,
3469  &import_buffers,
3470  column_names,
3471  "load_table");
3472 
3473  auto col_descs = loader->get_column_descs();
3474  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3475  import_export::CopyParams copy_params;
3476  size_t rows_completed = 0;
3477  for (auto const& row : rows) {
3478  size_t import_idx = 0; // index into the TStringRow being loaded
3479  size_t col_idx = 0; // index into column description vector
3480  try {
3481  size_t skip_physical_cols = 0;
3482  for (auto cd : col_descs) {
3483  if (skip_physical_cols > 0) {
3484  CHECK(cd->isGeoPhyCol);
3485  skip_physical_cols--;
3486  continue;
3487  }
3488  auto mapped_idx = desc_id_to_column_id[import_idx];
3489  if (mapped_idx != -1) {
3490  import_buffers[col_idx]->add_value(cd,
3491  row.cols[mapped_idx].str_val,
3492  row.cols[mapped_idx].is_null,
3493  copy_params);
3494  }
3495  col_idx++;
3496  if (cd->columnType.is_geometry()) {
3497  // physical geo columns will be filled separately lately
3498  skip_physical_cols = cd->columnType.get_physical_cols();
3499  col_idx += skip_physical_cols;
3500  }
3501  // Advance to the next field within the row
3502  import_idx++;
3503  }
3504  rows_completed++;
3505  } catch (const std::exception& e) {
// NOTE(review): the log text says "Row discarded", but the rethrow below
// aborts the whole load at the first failing row.
3506  LOG(ERROR) << "Input exception thrown: " << e.what()
3507  << ". Row discarded, issue at column : " << (col_idx + 1)
3508  << " data :" << row;
3509  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3510  }
3511  }
3512  // do batch filling of geo columns separately
// This condition always holds here because an empty `rows` was rejected above.
3513  if (rows.size() != 0) {
// `row` is only used for the error log in the catch block below.
3514  const auto& row = rows[0];
3515  size_t col_idx = 0; // index into column description vector
3516  try {
3517  size_t import_idx = 0;
3518  size_t skip_physical_cols = 0;
3519  for (auto cd : col_descs) {
3520  if (skip_physical_cols > 0) {
3521  skip_physical_cols--;
3522  continue;
3523  }
3524  auto mapped_idx = desc_id_to_column_id[import_idx];
3525  col_idx++;
3526  if (cd->columnType.is_geometry()) {
3527  skip_physical_cols = cd->columnType.get_physical_cols();
3528  if (mapped_idx != -1) {
// fillGeoColumns takes col_idx by reference and advances it.
3529  fillGeoColumns(session,
3530  session_ptr->getCatalog(),
3531  import_buffers,
3532  cd,
3533  col_idx,
3534  rows_completed,
3535  table_name,
3536  false);
3537  } else {
3538  col_idx += skip_physical_cols;
3539  }
3540  }
3541  import_idx++;
3542  }
3543  } catch (const std::exception& e) {
3544  LOG(ERROR) << "Input exception thrown: " << e.what()
3545  << ". Row discarded, issue at column : " << (col_idx + 1)
3546  << " data :" << row;
3547  THROW_MAPD_EXCEPTION(e.what());
3548  }
3549  }
3550  fillMissingBuffers(session,
3551  session_ptr->getCatalog(),
3552  import_buffers,
3553  col_descs,
3554  desc_id_to_column_id,
3555  rows_completed,
3556  table_name,
3557  false);
3558  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3559  session_ptr->getCatalog(), table_name);
3560  if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3561  THROW_MAPD_EXCEPTION(loader->getErrorMessage());
3562  }
3563 
3564  } catch (const std::exception& e) {
3565  THROW_MAPD_EXCEPTION(std::string(e.what()));
3566  }
3567 }
3568 
3569 char DBHandler::unescape_char(std::string str) {
3570  char out = str[0];
3571  if (str.size() == 2 && str[0] == '\\') {
3572  if (str[1] == 't') {
3573  out = '\t';
3574  } else if (str[1] == 'n') {
3575  out = '\n';
3576  } else if (str[1] == '0') {
3577  out = '\0';
3578  } else if (str[1] == '\'') {
3579  out = '\'';
3580  } else if (str[1] == '\\') {
3581  out = '\\';
3582  }
3583  }
3584  return out;
3585 }
3586 
3588  import_export::CopyParams copy_params;
3589  switch (cp.has_header) {
3590  case TImportHeaderRow::AUTODETECT:
3592  break;
3593  case TImportHeaderRow::NO_HEADER:
3595  break;
3596  case TImportHeaderRow::HAS_HEADER:
3598  break;
3599  default:
3600  CHECK(false);
3601  }
3602  copy_params.quoted = cp.quoted;
3603  if (cp.delimiter.length() > 0) {
3604  copy_params.delimiter = unescape_char(cp.delimiter);
3605  } else {
3606  copy_params.delimiter = '\0';
3607  }
3608  if (cp.null_str.length() > 0) {
3609  copy_params.null_str = cp.null_str;
3610  }
3611  if (cp.quote.length() > 0) {
3612  copy_params.quote = unescape_char(cp.quote);
3613  }
3614  if (cp.escape.length() > 0) {
3615  copy_params.escape = unescape_char(cp.escape);
3616  }
3617  if (cp.line_delim.length() > 0) {
3618  copy_params.line_delim = unescape_char(cp.line_delim);
3619  }
3620  if (cp.array_delim.length() > 0) {
3621  copy_params.array_delim = unescape_char(cp.array_delim);
3622  }
3623  if (cp.array_begin.length() > 0) {
3624  copy_params.array_begin = unescape_char(cp.array_begin);
3625  }
3626  if (cp.array_end.length() > 0) {
3627  copy_params.array_end = unescape_char(cp.array_end);
3628  }
3629  if (cp.threads != 0) {
3630  copy_params.threads = cp.threads;
3631  }
3632  if (cp.s3_access_key.length() > 0) {
3633  copy_params.s3_access_key = cp.s3_access_key;
3634  }
3635  if (cp.s3_secret_key.length() > 0) {
3636  copy_params.s3_secret_key = cp.s3_secret_key;
3637  }
3638  if (cp.s3_session_token.length() > 0) {
3639  copy_params.s3_session_token = cp.s3_session_token;
3640  }
3641  if (cp.s3_region.length() > 0) {
3642  copy_params.s3_region = cp.s3_region;
3643  }
3644  if (cp.s3_endpoint.length() > 0) {
3645  copy_params.s3_endpoint = cp.s3_endpoint;
3646  }
3647 #ifdef HAVE_AWS_S3
3648  if (g_allow_s3_server_privileges && cp.s3_access_key.length() == 0 &&
3649  cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
3650  const auto& server_credentials =
3651  Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
3652  copy_params.s3_access_key = server_credentials.GetAWSAccessKeyId();
3653  copy_params.s3_secret_key = server_credentials.GetAWSSecretKey();
3654  copy_params.s3_session_token = server_credentials.GetSessionToken();
3655  }
3656 #endif
3657  if (cp.use_source_type) {
3658  switch (cp.source_type) {
3659  case TSourceType::DELIMITED:
3661  break;
3662  case TSourceType::GEO:
3664  break;
3665  case TSourceType::PARQUET:
3666 #ifdef ENABLE_IMPORT_PARQUET
3668  break;
3669 #else
3670  THROW_MAPD_EXCEPTION("Parquet not supported");
3671 #endif
3672  case TSourceType::ODBC:
3673  THROW_MAPD_EXCEPTION("ODBC source not supported");
3674  case TSourceType::RASTER:
3676  break;
3677  default:
3678  CHECK(false);
3679  }
3680  } else {
3681  switch (cp.file_type) {
3682  case TFileType::DELIMITED:
3684  break;
3685  case TFileType::GEO:
3687  break;
3688  case TFileType::PARQUET:
3689 #ifdef ENABLE_IMPORT_PARQUET
3691  break;
3692 #else
3693  THROW_MAPD_EXCEPTION("Parquet not supported");
3694 #endif
3695  case TFileType::RASTER:
3697  break;
3698  default:
3699  CHECK(false);
3700  }
3701  }
3702 
3703  switch (cp.geo_coords_encoding) {
3704  case TEncodingType::GEOINT:
3705  copy_params.geo_coords_encoding = kENCODING_GEOINT;
3706  break;
3707  case TEncodingType::NONE:
3708  copy_params.geo_coords_encoding = kENCODING_NONE;
3709  break;
3710  default:
3711  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
3712  std::to_string((int)cp.geo_coords_encoding));
3713  }
3714  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3715  switch (cp.geo_coords_type) {
3716  case TDatumType::GEOGRAPHY:
3717  copy_params.geo_coords_type = kGEOGRAPHY;
3718  break;
3719  case TDatumType::GEOMETRY:
3720  copy_params.geo_coords_type = kGEOMETRY;
3721  break;
3722  default:
3723  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
3724  std::to_string((int)cp.geo_coords_type));
3725  }
3726  switch (cp.geo_coords_srid) {
3727  case 4326:
3728  case 3857:
3729  case 900913:
3730  copy_params.geo_coords_srid = cp.geo_coords_srid;
3731  break;
3732  default:
3733  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
3734  std::to_string((int)cp.geo_coords_srid));
3735  }
3736  copy_params.sanitize_column_names = cp.sanitize_column_names;
3737  copy_params.geo_layer_name = cp.geo_layer_name;
3738  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3739  copy_params.geo_explode_collections = cp.geo_explode_collections;
3740  copy_params.source_srid = cp.source_srid;
3741  switch (cp.raster_point_type) {
3742  case TRasterPointType::NONE:
3744  break;
3745  case TRasterPointType::AUTO:
3747  break;
3750  break;
3751  case TRasterPointType::INT:
3753  break;
3756  break;
3759  break;
3762  break;
3763  default:
3764  CHECK(false);
3765  }
3766  copy_params.raster_import_bands = cp.raster_import_bands;
3767  if (cp.raster_scanlines_per_thread < 0) {
3768  THROW_MAPD_EXCEPTION("Invalid raster_scanlines_per_thread in TCopyParams (" +
3769  std::to_string((int)cp.raster_scanlines_per_thread));
3770  } else {
3771  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
3772  }
3773  switch (cp.raster_point_transform) {
3774  case TRasterPointTransform::NONE:
3776  break;
3777  case TRasterPointTransform::AUTO:
3779  break;
3780  case TRasterPointTransform::FILE:
3782  break;
3783  case TRasterPointTransform::WORLD:
3785  break;
3786  default:
3787  CHECK(false);
3788  }
3789  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
3790  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
3791  copy_params.add_metadata_columns = cp.add_metadata_columns;
3792  return copy_params;
3793 }
3794 
3796  TCopyParams copy_params;
3797  copy_params.delimiter = cp.delimiter;
3798  copy_params.null_str = cp.null_str;
3799  switch (cp.has_header) {
3801  copy_params.has_header = TImportHeaderRow::AUTODETECT;
3802  break;
3804  copy_params.has_header = TImportHeaderRow::NO_HEADER;
3805  break;
3807  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
3808  break;
3809  default:
3810  CHECK(false);
3811  }
3812  copy_params.quoted = cp.quoted;
3813  copy_params.quote = cp.quote;
3814  copy_params.escape = cp.escape;
3815  copy_params.line_delim = cp.line_delim;
3816  copy_params.array_delim = cp.array_delim;
3817  copy_params.array_begin = cp.array_begin;
3818  copy_params.array_end = cp.array_end;
3819  copy_params.threads = cp.threads;
3820  copy_params.s3_access_key = cp.s3_access_key;
3821  copy_params.s3_secret_key = cp.s3_secret_key;
3822  copy_params.s3_session_token = cp.s3_session_token;
3823  copy_params.s3_region = cp.s3_region;
3824  copy_params.s3_endpoint = cp.s3_endpoint;
3825  switch (cp.source_type) {
3827  copy_params.file_type = TFileType::DELIMITED;
3828  copy_params.source_type = TSourceType::DELIMITED;
3829  break;
3831  copy_params.file_type = TFileType::GEO;
3832  copy_params.source_type = TSourceType::GEO;
3833  break;
3835  copy_params.file_type = TFileType::PARQUET;
3836  copy_params.source_type = TSourceType::PARQUET;
3837  break;
3839  copy_params.file_type = TFileType::RASTER;
3840  copy_params.source_type = TSourceType::RASTER;
3841  break;
3843  copy_params.source_type = TSourceType::ODBC;
3844  break;
3845  default:
3846  CHECK(false);
3847  }
3848  switch (cp.geo_coords_encoding) {
3849  case kENCODING_GEOINT:
3850  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
3851  break;
3852  default:
3853  copy_params.geo_coords_encoding = TEncodingType::NONE;
3854  break;
3855  }
3856  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3857  switch (cp.geo_coords_type) {
3858  case kGEOGRAPHY:
3859  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
3860  break;
3861  case kGEOMETRY:
3862  copy_params.geo_coords_type = TDatumType::GEOMETRY;
3863  break;
3864  default:
3865  CHECK(false);
3866  }
3867  copy_params.geo_coords_srid = cp.geo_coords_srid;
3868  copy_params.sanitize_column_names = cp.sanitize_column_names;
3869  copy_params.geo_layer_name = cp.geo_layer_name;
3870  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
3871  copy_params.geo_explode_collections = cp.geo_explode_collections;
3872  copy_params.source_srid = cp.source_srid;
3873  switch (cp.raster_point_type) {
3875  copy_params.raster_point_type = TRasterPointType::NONE;
3876  break;
3878  copy_params.raster_point_type = TRasterPointType::AUTO;
3879  break;
3881  copy_params.raster_point_type = TRasterPointType::SMALLINT;
3882  break;
3884  copy_params.raster_point_type = TRasterPointType::INT;
3885  break;
3887  copy_params.raster_point_type = TRasterPointType::FLOAT;
3888  break;
3890  copy_params.raster_point_type = TRasterPointType::DOUBLE;
3891  break;
3893  copy_params.raster_point_type = TRasterPointType::POINT;
3894  break;
3895  default:
3896  CHECK(false);
3897  }
3898  copy_params.raster_import_bands = cp.raster_import_bands;
3899  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
3900  switch (cp.raster_point_transform) {
3902  copy_params.raster_point_transform = TRasterPointTransform::NONE;
3903  break;
3905  copy_params.raster_point_transform = TRasterPointTransform::AUTO;
3906  break;
3908  copy_params.raster_point_transform = TRasterPointTransform::FILE;
3909  break;
3911  copy_params.raster_point_transform = TRasterPointTransform::WORLD;
3912  break;
3913  default:
3914  CHECK(false);
3915  }
3916  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
3917  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
3918  copy_params.odbc_dsn = cp.odbc_dsn;
3919  copy_params.odbc_connection_string = cp.odbc_connection_string;
3920  copy_params.odbc_sql_select = cp.odbc_sql_select;
3921  copy_params.odbc_username = cp.odbc_username;
3922  copy_params.odbc_password = cp.odbc_password;
3923  copy_params.odbc_credential_string = cp.odbc_credential_string;
3924  copy_params.add_metadata_columns = cp.add_metadata_columns;
3925  return copy_params;
3926 }
3927 
3928 namespace {
3929 void add_vsi_network_prefix(std::string& path) {
3930  // do we support network file access?
3931  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
3932 
3933  // modify head of filename based on source location
3934  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
3935  if (!gdal_network) {
3937  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
3938  }
3939  // invoke GDAL CURL virtual file reader
3940  path = "/vsicurl/" + path;
3941  } else if (boost::istarts_with(path, "s3://")) {
3942  if (!gdal_network) {
3944  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
3945  }
3946  // invoke GDAL S3 virtual file reader
3947  boost::replace_first(path, "s3://", "/vsis3/");
3948  }
3949 }
3950 
3951 void add_vsi_geo_prefix(std::string& path) {
3952  // single gzip'd file (not an archive)?
3953  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
3954  path = "/vsigzip/" + path;
3955  }
3956 }
3957 
3958 void add_vsi_archive_prefix(std::string& path) {
3959  // check for compressed file or file bundle
3960  if (boost::iends_with(path, ".zip")) {
3961  // zip archive
3962  path = "/vsizip/" + path;
3963  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3964  boost::iends_with(path, ".tar.gz")) {
3965  // tar archive (compressed or uncompressed)
3966  path = "/vsitar/" + path;
3967  }
3968 }
3969 
3970 std::string remove_vsi_prefixes(const std::string& path_in) {
3971  std::string path(path_in);
3972 
3973  // these will be first
3974  if (boost::istarts_with(path, "/vsizip/")) {
3975  boost::replace_first(path, "/vsizip/", "");
3976  } else if (boost::istarts_with(path, "/vsitar/")) {
3977  boost::replace_first(path, "/vsitar/", "");
3978  } else if (boost::istarts_with(path, "/vsigzip/")) {
3979  boost::replace_first(path, "/vsigzip/", "");
3980  }
3981 
3982  // then these
3983  if (boost::istarts_with(path, "/vsicurl/")) {
3984  boost::replace_first(path, "/vsicurl/", "");
3985  } else if (boost::istarts_with(path, "/vsis3/")) {
3986  boost::replace_first(path, "/vsis3/", "s3://");
3987  }
3988 
3989  return path;
3990 }
3991 
3992 bool path_is_relative(const std::string& path) {
3993  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
3994  boost::istarts_with(path, "https://")) {
3995  return false;
3996  }
3997  return !boost::filesystem::path(path).is_absolute();
3998 }
3999 
4000 bool path_has_valid_filename(const std::string& path) {
4001  auto filename = boost::filesystem::path(path).filename().string();
4002  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
4003  return false;
4004  }
4005  return true;
4006 }
4007 
4008 bool is_a_supported_geo_file(const std::string& path) {
4009  if (!path_has_valid_filename(path)) {
4010  return false;
4011  }
4012  // this is now just for files that we want to recognize
4013  // as geo when inside an archive (see below)
4014  // @TODO(se) make this more flexible?
4015  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
4016  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
4017  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
4018  boost::iends_with(path, ".gdb.zip") || boost::iends_with(path, ".fgb")) {
4019  return true;
4020  }
4021  return false;
4022 }
4023 
4024 bool is_a_supported_archive_file(const std::string& path) {
4025  if (!path_has_valid_filename(path)) {
4026  return false;
4027  }
4028  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
4029  return true;
4030  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
4031  boost::iends_with(path, ".tar.gz")) {
4032  return true;
4033  }
4034  return false;
4035 }
4036 
4037 std::string find_first_geo_file_in_archive(const std::string& archive_path,
4038  const import_export::CopyParams& copy_params) {
4039  // get the recursive list of all files in the archive
4040  std::vector<std::string> files =
4041  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
4042 
4043  // report the list
4044  LOG(INFO) << "Found " << files.size() << " files in Archive "
4045  << remove_vsi_prefixes(archive_path);
4046  for (const auto& file : files) {
4047  LOG(INFO) << " " << file;
4048  }
4049 
4050  // scan the list for the first candidate file
4051  bool found_suitable_file = false;
4052  std::string file_name;
4053  for (const auto& file : files) {
4054  if (is_a_supported_geo_file(file)) {
4055  file_name = file;
4056  found_suitable_file = true;
4057  break;
4058  }
4059  }
4060 
4061  // if we didn't find anything
4062  if (!found_suitable_file) {
4063  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
4064  remove_vsi_prefixes(archive_path);
4065  file_name.clear();
4066  }
4067 
4068  // done
4069  return file_name;
4070 }
4071 
4072 bool is_local_file(const std::string& file_path) {
4073  return (!boost::istarts_with(file_path, "s3://") &&
4074  !boost::istarts_with(file_path, "http://") &&
4075  !boost::istarts_with(file_path, "https://"));
4076 }
4077 
4078 void validate_import_file_path_if_local(const std::string& file_path) {
4079  if (is_local_file(file_path)) {
4081  }
4082 }
4083 } // namespace
4084 
4085 void DBHandler::detect_column_types(TDetectResult& _return,
4086  const TSessionId& session,
4087  const std::string& file_name_in,
4088  const TCopyParams& cp) {
4089  auto stdlog = STDLOG(get_session_ptr(session));
4090  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4091  check_read_only("detect_column_types");
4092 
4093  bool is_raster = false;
4094  boost::filesystem::path file_path;
4096  if (copy_params.source_type != import_export::SourceType::kOdbc) {
4097  std::string file_name{file_name_in};
4098  if (path_is_relative(file_name)) {
4099  // assume relative paths are relative to data_path / mapd_import / <session>
4100  auto temp_file_path = import_path_ / picosha2::hash256_hex_string(session) /
4101  boost::filesystem::path(file_name).filename();
4102  file_name = temp_file_path.string();
4103  }
4105 
4106  // if it's a geo or raster import, handle alternative paths (S3, HTTP, archive etc.)
4107  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
4108  if (is_a_supported_archive_file(file_name)) {
4109  // find the archive file
4110  add_vsi_network_prefix(file_name);
4111  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4112  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4113  }
4114  // find geo file in archive
4115  add_vsi_archive_prefix(file_name);
4116  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4117  // prepare to detect that geo file
4118  if (geo_file.size()) {
4119  file_name = file_name + std::string("/") + geo_file;
4120  }
4121  } else {
4122  // prepare to detect geo file directly
4123  add_vsi_network_prefix(file_name);
4124  add_vsi_geo_prefix(file_name);
4125  }
4126  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
4127  // prepare to detect raster file directly
4128  add_vsi_network_prefix(file_name);
4129  add_vsi_geo_prefix(file_name);
4130  is_raster = true;
4131  }
4132 
4133  file_path = boost::filesystem::path(file_name);
4134  // can be a s3 url
4135  if (!boost::istarts_with(file_name, "s3://")) {
4136  if (!boost::filesystem::path(file_name).is_absolute()) {
4137  file_path = import_path_ / picosha2::hash256_hex_string(session) /
4138  boost::filesystem::path(file_name).filename();
4139  file_name = file_path.string();
4140  }
4141 
4142  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4144  // check for geo or raster file
4145  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4146  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
4147  }
4148  } else {
4149  // check for regular file
4150  if (!boost::filesystem::exists(file_path)) {
4151  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
4152  }
4153  }
4154  }
4155  }
4156 
4157  try {
4159 #ifdef ENABLE_IMPORT_PARQUET
4161 #endif
4162  ) {
4163  import_export::Detector detector(file_path, copy_params);
4164  auto best_types = detector.getBestColumnTypes();
4165  std::vector<std::string> headers = detector.get_headers();
4166  copy_params = detector.get_copy_params();
4167 
4168  _return.copy_params = copyparams_to_thrift(copy_params);
4169  _return.row_set.row_desc.resize(best_types.size());
4170  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
4171  TColumnType col;
4172  auto& ti = best_types[col_idx];
4173  col.col_type.precision = ti.get_precision();
4174  col.col_type.scale = ti.get_scale();
4175  if (ti.is_geometry()) {
4176  // set this so encoding_to_thrift does the right thing
4177  ti.set_compression(copy_params.geo_coords_encoding);
4178  // fill in these directly
4179  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
4180  col.col_type.scale = copy_params.geo_coords_srid;
4181  col.col_type.comp_param = copy_params.geo_coords_comp_param;
4182  }
4183  col.col_type.type = type_to_thrift(ti);
4184  col.col_type.encoding = encoding_to_thrift(ti);
4185  if (copy_params.sanitize_column_names) {
4186  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
4187  } else {
4188  col.col_name = headers[col_idx];
4189  }
4190  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
4191  _return.row_set.row_desc[col_idx] = col;
4192  }
4193  auto sample_data =
4195 
4196  TRow sample_row;
4197  for (auto row : sample_data) {
4198  sample_row.cols.clear();
4199  for (const auto& s : row) {
4200  TDatum td;
4201  td.val.str_val = s;
4202  td.is_null = s.empty();
4203  sample_row.cols.push_back(td);
4204  }
4205  _return.row_set.rows.push_back(sample_row);
4206  }
4207  } else if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4209  // @TODO simon.eves get this from somewhere!
4210  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
4211 
4212  check_geospatial_files(file_path, copy_params);
4213  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
4214  file_path.string(), is_raster, geoColumnName, copy_params);
4215  for (auto cd : cds) {
4216  if (copy_params.sanitize_column_names) {
4217  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
4218  }
4219  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
4220  }
4221  if (!is_raster) {
4222  // @TODO(se) support for raster?
4223  std::map<std::string, std::vector<std::string>> sample_data;
4225  file_path.string(),
4226  geoColumnName,
4227  sample_data,
4229  copy_params);
4230  if (sample_data.size() > 0) {
4231  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
4232  TRow sample_row;
4233  for (auto cd : cds) {
4234  TDatum td;
4235  td.val.str_val = sample_data[cd.sourceName].at(i);
4236  td.is_null = td.val.str_val.empty();
4237  sample_row.cols.push_back(td);
4238  }
4239  _return.row_set.rows.push_back(sample_row);
4240  }
4241  }
4242  }
4243  _return.copy_params = copyparams_to_thrift(copy_params);
4244  }
4245  } catch (const std::exception& e) {
4246  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
4247  }
4248 }
4249 
4250 void DBHandler::render_vega(TRenderResult& _return,
4251  const TSessionId& session,
4252  const int64_t widget_id,
4253  const std::string& vega_json,
4254  const int compression_level,
4255  const std::string& nonce) {
4256  auto stdlog = STDLOG(get_session_ptr(session),
4257  "widget_id",
4258  widget_id,
4259  "compression_level",
4260  compression_level,
4261  "vega_json",
4262  vega_json,
4263  "nonce",
4264  nonce);
4265  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4266  stdlog.appendNameValuePairs("nonce", nonce);
4267  auto session_ptr = stdlog.getConstSessionInfo();
4268  if (!render_handler_) {
4269  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
4270  }
4271 
4272  // cast away const-ness of incoming Thrift string ref
4273  // to allow it to be passed down as an r-value and
4274  // ultimately std::moved into the RenderSession
4275  auto& non_const_vega_json = const_cast<std::string&>(vega_json);
4276 
4277  _return.total_time_ms = measure<>::execution([&]() {
4278  try {
4279  render_handler_->render_vega(_return,
4280  stdlog.getSessionInfo(),
4281  widget_id,
4282  std::move(non_const_vega_json),
4283  compression_level,
4284  nonce);
4285  } catch (std::exception& e) {
4286  THROW_MAPD_EXCEPTION(e.what());
4287  }
4288  });
4289 }
4290 
4292  int32_t dashboard_id,
4293  AccessPrivileges requestedPermissions) {
4294  DBObject object(dashboard_id, DashboardDBObjectType);
4295  auto& catalog = session_info.getCatalog();
4296  auto& user = session_info.get_currentUser();
4297  object.loadKey(catalog);
4298  object.setPrivileges(requestedPermissions);
4299  std::vector<DBObject> privs = {object};
4300  return SysCatalog::instance().checkPrivileges(user, privs);
4301 }
4302 
4303 // custom expressions
4304 namespace {
4307 
4308 std::unique_ptr<Catalog_Namespace::CustomExpression> create_custom_expr_from_thrift_obj(
4309  const TCustomExpression& t_custom_expr,
4310  const Catalog& catalog) {
4311  if (t_custom_expr.data_source_name.empty()) {
4312  THROW_MAPD_EXCEPTION("Custom expression data source name cannot be empty.")
4313  }
4314  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4315  << "Unexpected data source type: "
4316  << static_cast<int>(t_custom_expr.data_source_type);
4317  auto td = catalog.getMetadataForTable(t_custom_expr.data_source_name, false);
4318  if (!td) {
4319  THROW_MAPD_EXCEPTION("Custom expression references a table \"" +
4320  t_custom_expr.data_source_name + "\" that does not exist.")
4321  }
4322  DataSourceType data_source_type = DataSourceType::TABLE;
4323  return std::make_unique<CustomExpression>(
4324  t_custom_expr.name, t_custom_expr.expression_json, data_source_type, td->tableId);
4325 }
4326 
4327 TCustomExpression create_thrift_obj_from_custom_expr(const CustomExpression& custom_expr,
4328  const Catalog& catalog) {
4329  TCustomExpression t_custom_expr;
4330  t_custom_expr.id = custom_expr.id;
4331  t_custom_expr.name = custom_expr.name;
4332  t_custom_expr.expression_json = custom_expr.expression_json;
4333  t_custom_expr.data_source_id = custom_expr.data_source_id;
4334  t_custom_expr.is_deleted = custom_expr.is_deleted;
4335  CHECK(custom_expr.data_source_type == DataSourceType::TABLE)
4336  << "Unexpected data source type: "
4337  << static_cast<int>(custom_expr.data_source_type);
4338  t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
4339  auto td = catalog.getMetadataForTable(custom_expr.data_source_id, false);
4340  if (td) {
4341  t_custom_expr.data_source_name = td->tableName;
4342  } else {
4343  LOG(WARNING)
4344  << "Custom expression references a deleted data source. Custom expression id: "
4345  << custom_expr.id << ", name: " << custom_expr.name;
4346  }
4347  return t_custom_expr;
4348 }
4349 } // namespace
4350 
4351 int32_t DBHandler::create_custom_expression(const TSessionId& session,
4352  const TCustomExpression& t_custom_expr) {
4353  auto stdlog = STDLOG(get_session_ptr(session));
4354  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4355  check_read_only("create_custom_expression");
4356 
4357  auto session_ptr = stdlog.getConstSessionInfo();
4358  if (!session_ptr->get_currentUser().isSuper) {
4359  THROW_MAPD_EXCEPTION("Custom expressions can only be created by super users.")
4360  }
4361  auto& catalog = session_ptr->getCatalog();
4362  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
4363  return catalog.createCustomExpression(
4364  create_custom_expr_from_thrift_obj(t_custom_expr, catalog));
4365 }
4366 
4367 void DBHandler::get_custom_expressions(std::vector<TCustomExpression>& _return,
4368  const TSessionId& session) {
4369  auto stdlog = STDLOG(get_session_ptr(session));
4370  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4371 
4372  auto session_ptr = stdlog.getConstSessionInfo();
4373  auto& catalog = session_ptr->getCatalog();
4374  mapd_shared_lock<mapd_shared_mutex> read_lock(custom_expressions_mutex_);
4375  auto custom_expressions =
4376  catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
4377  for (const auto& custom_expression : custom_expressions) {
4378  _return.emplace_back(create_thrift_obj_from_custom_expr(*custom_expression, catalog));
4379  }
4380 }
4381 
4382 void DBHandler::update_custom_expression(const TSessionId& session,
4383  const int32_t id,
4384  const std::string& expression_json) {
4385  auto stdlog = STDLOG(get_session_ptr(session));
4386  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4387  check_read_only("update_custom_expression");
4388 
4389  auto session_ptr = stdlog.getConstSessionInfo();
4390  if (!session_ptr->get_currentUser().isSuper) {
4391  THROW_MAPD_EXCEPTION("Custom expressions can only be updated by super users.")
4392  }
4393  auto& catalog = session_ptr->getCatalog();
4394  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
4395  catalog.updateCustomExpression(id, expression_json);
4396 }
4397 
4399  const TSessionId& session,
4400  const std::vector<int32_t>& custom_expression_ids,
4401  const bool do_soft_delete) {
4402  auto stdlog = STDLOG(get_session_ptr(session));
4403  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4404  check_read_only("delete_custom_expressions");
4405 
4406  auto session_ptr = stdlog.getConstSessionInfo();
4407  if (!session_ptr->get_currentUser().isSuper) {
4408  THROW_MAPD_EXCEPTION("Custom expressions can only be deleted by super users.")
4409  }
4410  auto& catalog = session_ptr->getCatalog();
4411  mapd_unique_lock<mapd_shared_mutex> write_lock(custom_expressions_mutex_);
4412  catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4413 }
4414 
4415 // dashboards
4416 void DBHandler::get_dashboard(TDashboard& dashboard,
4417  const TSessionId& session,
4418  const int32_t dashboard_id) {
4419  auto stdlog = STDLOG(get_session_ptr(session));
4420  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4421  auto session_ptr = stdlog.getConstSessionInfo();
4422  auto const& cat = session_ptr->getCatalog();
4424  auto dash = cat.getMetadataForDashboard(dashboard_id);
4425  if (!dash) {
4426  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
4427  " doesn't exist");
4428  }
4430  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4431  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
4432  std::to_string(dashboard_id));
4433  }
4434  user_meta.userName = "";
4435  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4436  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
4437 }
4438 
4439 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
4440  const TSessionId& session) {
4441  auto stdlog = STDLOG(get_session_ptr(session));
4442  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4443  auto session_ptr = stdlog.getConstSessionInfo();
4444  auto const& cat = session_ptr->getCatalog();
4446  const auto dashes = cat.getAllDashboardsMetadata();
4447  user_meta.userName = "";
4448  for (const auto dash : dashes) {
4450  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4451  // dashboardState is intentionally not populated here
4452  // for payload reasons
4453  // use get_dashboard call to get state
4454  dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
4455  }
4456  }
4457 }
4458 
// get_dashboard_impl: converts a DashboardDescriptor into a TDashboard.
// Copies the descriptor fields, computes the session user's permissions on the
// dashboard, and sets is_dash_shared from the object-role entries recorded for it.
// When populate_state is false, dashboard_state is left unset.
// `user_meta` is overwritten with the metadata of the dashboard's owner.
// NOTE(review): the opening line of this definition and the `user_meta`
// parameter line (original lines 4459 and 4461) are absent from this listing;
// visible callers invoke it as
// get_dashboard_impl(session_ptr, user_meta, dash[, populate_state]).
4460  const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4462  const DashboardDescriptor* dash,
4463  const bool populate_state) {
4464  auto const& cat = session_ptr->getCatalog();
4465  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4466  auto objects_list = SysCatalog::instance().getMetadataForObject(
4467  cat.getCurrentDB().dbId,
4468  static_cast<int>(DBObjectType::DashboardDBObjectType),
4469  dash->dashboardId);
4470  TDashboard dashboard;
4471  dashboard.dashboard_name = dash->dashboardName;
4472  if (populate_state) {
4473  dashboard.dashboard_state = dash->dashboardState;
4474  }
4475  dashboard.image_hash = dash->imageHash;
4476  dashboard.update_time = dash->updateTime;
4477  dashboard.dashboard_metadata = dash->dashboardMetadata;
4478  dashboard.dashboard_id = dash->dashboardId;
4479  dashboard.dashboard_owner = dash->user;
4480  TDashboardPermissions perms;
4481  // Super user has all permissions.
4482  if (session_ptr->get_currentUser().isSuper) {
4483  perms.create_ = true;
4484  perms.delete_ = true;
4485  perms.edit_ = true;
4486  perms.view_ = true;
4487  } else {
4488  // Collect all grants on current user
4489  // add them to the permissions.
4490  auto obj_to_find =
4491  DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
4492  obj_to_find.loadKey(session_ptr->getCatalog());
4493  std::vector<std::string> grantees =
4494  SysCatalog::instance().getRoles(true,
4495  session_ptr->get_currentUser().isSuper,
4496  session_ptr->get_currentUser().userName);
// Each permission flag is OR-ed across every grantee returned above, so a
// privilege held through any one of them is reported.
4497  for (const auto& grantee : grantees) {
4498  DBObject* object_found;
4499  auto* gr = SysCatalog::instance().getGrantee(grantee);
4500  if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
4501  const auto obj_privs = object_found->getPrivileges();
4502  perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4503  perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4504  perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4505  perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4506  }
4507  }
4508  }
4509  dashboard.dashboard_permissions = perms;
// Not shared when there are no object-role entries, or when the only entry
// belongs to the role named after the dashboard owner.
4510  if (objects_list.empty() ||
4511  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
4512  dashboard.is_dash_shared = false;
4513  } else {
4514  dashboard.is_dash_shared = true;
4515  }
4516  return dashboard;
4517 }
4518 
4519 namespace {
4520 bool is_info_schema_db(const std::string& db_name) {
4521  return (db_name == INFORMATION_SCHEMA_DB &&
4522  SysCatalog::instance().hasExecutedMigration(INFORMATION_SCHEMA_MIGRATION));
4523 }
4524 
4525 void check_not_info_schema_db(const std::string& db_name,
4526  bool throw_mapd_exception = false) {
4527  if (is_info_schema_db(db_name)) {
4528  std::string error_message{"Write requests/queries are not allowed in the " +
4529  INFORMATION_SCHEMA_DB + " database."};
4530  if (throw_mapd_exception) {
4531  THROW_MAPD_EXCEPTION(error_message)
4532  } else {
4533  throw std::runtime_error(error_message);
4534  }
4535  }
4536 }
4537 } // namespace
4538 
// Creates a dashboard owned by the session's current user and returns the id
// produced by the catalog's createDashboard().
// Raises via THROW_MAPD_EXCEPTION when the privilege check fails, when
// getMetadataForDashboard() already finds a dashboard for this user id and name,
// or when dashboard/DB-object creation throws a std::exception.
// NOTE(review): original lines 4549, 4554 and 4564 are absent from this listing
// (the condition guarding check_not_info_schema_db, the remaining arguments of
// checkDBAccessPrivileges, and presumably the declaration of `dd`) - confirm
// against the full file.
4539 int32_t DBHandler::create_dashboard(const TSessionId& session,
4540  const std::string& dashboard_name,
4541  const std::string& dashboard_state,
4542  const std::string& image_hash,
4543  const std::string& dashboard_metadata) {
4544  auto stdlog = STDLOG(get_session_ptr(session));
4545  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4546  auto session_ptr = stdlog.getConstSessionInfo();
4547  check_read_only("create_dashboard");
4548  auto& cat = session_ptr->getCatalog();
4550  check_not_info_schema_db(cat.name(), true);
4551  }
4552 
4553  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
4555  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
4556  }
4557 
// The duplicate check is keyed by the current user's id (as a string) plus the
// dashboard name.
4558  auto dash = cat.getMetadataForDashboard(
4559  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
4560  if (dash) {
4561  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4562  }
4563 
4565  dd.dashboardName = dashboard_name;
4566  dd.dashboardState = dashboard_state;
4567  dd.imageHash = image_hash;
4568  dd.dashboardMetadata = dashboard_metadata;
4569  dd.userId = session_ptr->get_currentUser().userId;
4570  dd.user = session_ptr->get_currentUser().userName;
4571 
4572  try {
4573  auto id = cat.createDashboard(dd);
4574  // TODO: transactionally unsafe
4575  SysCatalog::instance().createDBObject(
4576  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
4577  return id;
4578  } catch (const std::exception& e) {
4579  THROW_MAPD_EXCEPTION(e.what());
4580  }
4581 }
4582 
// Replaces the dashboard identified by `dashboard_id` with the supplied name,
// state, image hash and metadata via the catalog's replaceDashboard().
// The descriptor's userId/user come from `dashboard_owner` (looked up through
// SysCatalog::getMetadataForUser), not from the session user.
// Raises via THROW_MAPD_EXCEPTION when the EDIT_DASHBOARD check fails, when the
// owner does not exist, or when replaceDashboard() throws a std::exception.
// NOTE(review): original lines 4596, 4600, 4605 and 4610 are absent from this
// listing (the guard around check_not_info_schema_db, the opening of the
// privilege check, and presumably the declarations of `dd` and `user`) -
// confirm against the full file.
4583 void DBHandler::replace_dashboard(const TSessionId& session,
4584  const int32_t dashboard_id,
4585  const std::string& dashboard_name,
4586  const std::string& dashboard_owner,
4587  const std::string& dashboard_state,
4588  const std::string& image_hash,
4589  const std::string& dashboard_metadata) {
4590  auto stdlog = STDLOG(get_session_ptr(session));
4591  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4592  auto session_ptr = stdlog.getConstSessionInfo();
4593  CHECK(session_ptr);
4594  check_read_only("replace_dashboard");
4595  auto& cat = session_ptr->getCatalog();
4597  check_not_info_schema_db(cat.name(), true);
4598  }
4599 
4601  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
4602  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
4603  }
4604 
4606  dd.dashboardName = dashboard_name;
4607  dd.dashboardState = dashboard_state;
4608  dd.imageHash = image_hash;
4609  dd.dashboardMetadata = dashboard_metadata;
4611  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
4612  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
4613  " does not exist");
4614  }
4615  dd.userId = user.userId;
4616  dd.user = dashboard_owner;
4617  dd.dashboardId = dashboard_id;
4618 
4619  try {
4620  cat.replaceDashboard(dd);
4621  } catch (const std::exception& e) {
4622  THROW_MAPD_EXCEPTION(e.what());
4623  }
4624 }
4625 
4626 void DBHandler::delete_dashboard(const TSessionId& session, const int32_t dashboard_id) {
4627  delete_dashboards(session, {dashboard_id});
4628 }
4629 
// Deletes the dashboards listed in `dashboard_ids` on behalf of the session's
// current user by calling the catalog's deleteMetadataForDashboards().
// Any std::exception thrown by that call is re-raised via THROW_MAPD_EXCEPTION
// carrying the same message.
// NOTE(review): original line 4637 (the condition guarding
// check_not_info_schema_db) is absent from this listing - confirm against the
// full file.
4630 void DBHandler::delete_dashboards(const TSessionId& session,
4631  const std::vector<int32_t>& dashboard_ids) {
4632  auto stdlog = STDLOG(get_session_ptr(session));
4633  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4634  auto session_ptr = stdlog.getConstSessionInfo();
4635  check_read_only("delete_dashboards");
4636  auto& cat = session_ptr->getCatalog();
4638  check_not_info_schema_db(cat.name(), true);
4639  }
4640  // Checks will be performed in catalog
4641  try {
4642  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4643  } catch (const std::exception& e) {
4644  THROW_MAPD_EXCEPTION(e.what());
4645  }
4646 }
4647 
4648 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session,
4649  int32_t dashboard_id,
4650  std::vector<std::string> groups) {
4651  const auto session_info = get_session_copy(session);
4652  auto& cat = session_info.getCatalog();
4653  auto dash = cat.getMetadataForDashboard(dashboard_id);
4654  if (!dash) {
4655  THROW_MAPD_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4656  " does not exist");
4657  } else if (session_info.get_currentUser().userId != dash->userId &&
4658  !session_info.get_currentUser().isSuper) {
4659  throw std::runtime_error(
4660  "User should be either owner of dashboard or super user to share/unshare it");
4661  }
4662  std::vector<std::string> valid_groups;
4664  for (auto& group : groups) {
4665  user_meta.isSuper = false; // initialize default flag
4666  if (!SysCatalog::instance().getGrantee(group)) {
4667  THROW_MAPD_EXCEPTION("User/Role " + group + " does not exist");
4668  } else if (!user_meta.isSuper) {
4669  valid_groups.push_back(group);
4670  }
4671  }
4672  return valid_groups;
4673 }
4674 
4675 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
4676  for (auto const& group : groups) {
4677  if (!SysCatalog::instance().getGrantee(group)) {
4678  THROW_MAPD_EXCEPTION("User/Role '" + group + "' does not exist");
4679  }
4680  }
4681 }
4682 
// validateDashboardIdsForSharing: checks every id in `dashboard_ids` before a
// share/unshare operation. An id fails when the catalog has no dashboard for it,
// or when the session user is neither the dashboard's owner nor a super user.
// All failures are collected first, grouped by error message, and then reported
// together in a single THROW_MAPD_EXCEPTION.
// NOTE(review): the opening line of this definition (original line 4683) is
// absent from this listing; the name is taken from the call in
// shareOrUnshareDashboards.
4684  const Catalog_Namespace::SessionInfo& session_info,
4685  const std::vector<int32_t>& dashboard_ids) {
4686  auto& cat = session_info.getCatalog();
// Maps an error message to the list of dashboard ids that triggered it.
4687  std::map<std::string, std::list<int32_t>> errors;
4688  for (auto const& dashboard_id : dashboard_ids) {
4689  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
4690  if (!dashboard) {
4691  errors["Dashboard id does not exist"].push_back(dashboard_id);
4692  } else if (session_info.get_currentUser().userId != dashboard->userId &&
4693  !session_info.get_currentUser().isSuper) {
4694  errors["User should be either owner of dashboard or super user to share/unshare it"]
4695  .push_back(dashboard_id);
4696  }
4697  }
4698  if (!errors.empty()) {
4699  std::stringstream error_stream;
4700  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
4701  for (const auto& [error, id_list] : errors) {
4702  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
4703  }
4704  THROW_MAPD_EXCEPTION(error_stream.str());
4705  }
4706 }
4707 
// Shared implementation behind share_dashboards() and unshare_dashboards().
// Requires at least one flag in `permissions` to be set, validates `groups` and
// `dashboard_ids`, builds one DBObject per dashboard id carrying the requested
// privileges, and then grants (do_share == true) or revokes (do_share == false)
// them for all `groups` in a single batch call on the system catalog.
// NOTE(review): the statements inside the four `if (permissions.*_)` bodies
// (original lines 4731, 4734, 4737, 4740) are absent from this listing; they
// presumably add the matching privilege to `privs` - confirm against the full
// file.
4708 void DBHandler::shareOrUnshareDashboards(const TSessionId& session,
4709  const std::vector<int32_t>& dashboard_ids,
4710  const std::vector<std::string>& groups,
4711  const TDashboardPermissions& permissions,
4712  const bool do_share) {
4713  auto stdlog = STDLOG(get_session_ptr(session));
4714  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4715  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
4716  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
4717  !permissions.view_) {
4718  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for " +
4719  std::string(do_share ? "grants" : "revokes"));
4720  }
4721  auto session_ptr = stdlog.getConstSessionInfo();
4722  auto const& catalog = session_ptr->getCatalog();
4723  auto& sys_catalog = SysCatalog::instance();
4724  validateGroups(groups);
4725  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
4726  std::vector<DBObject> batch_objects;
4727  for (auto const& dashboard_id : dashboard_ids) {
4728  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
4729  AccessPrivileges privs;
4730  if (permissions.delete_) {
4732  }
4733  if (permissions.create_) {
4735  }
4736  if (permissions.edit_) {
4738  }
4739  if (permissions.view_) {
4741  }
4742  object.setPrivileges(privs);
4743  batch_objects.push_back(object);
4744  }
4745  if (do_share) {
4746  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4747  } else {
4748  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
4749  }
4750 }
4751 
4752 void DBHandler::share_dashboards(const TSessionId& session,
4753  const std::vector<int32_t>& dashboard_ids,
4754  const std::vector<std::string>& groups,
4755  const TDashboardPermissions& permissions) {
4756  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, true);
4757 }
4758 
4759 // NOOP: Grants not available for objects as of now
4760 void DBHandler::share_dashboard(const TSessionId& session,
4761  const int32_t dashboard_id,
4762  const std::vector<std::string>& groups,
4763  const std::vector<std::string>& objects,
4764  const TDashboardPermissions& permissions,
4765  const bool grant_role = false) {
4766  share_dashboards(session, {dashboard_id}, groups, permissions);
4767 }
4768 
4769 void DBHandler::unshare_dashboards(const TSessionId& session,
4770  const std::vector<int32_t>& dashboard_ids,
4771  const std::vector<std::string>& groups,
4772  const TDashboardPermissions& permissions) {
4773  shareOrUnshareDashboards(session, dashboard_ids, groups, permissions, false);
4774 }
4775 
4776 void DBHandler::unshare_dashboard(const TSessionId& session,
4777  const int32_t dashboard_id,
4778  const std::vector<std::string>& groups,
4779  const std::vector<std::string>& objects,
4780  const TDashboardPermissions& permissions) {
4781  unshare_dashboards(session, {dashboard_id}, groups, permissions);
4782 }
4783 
// get_dashboard_grantees: appends to `dashboard_grantees` one entry per
// object-role record of the given dashboard, skipping the record whose role name
// equals the dashboard owner's user name. Each entry carries the role name, its
// roleType (stored in is_user) and the four dashboard permission flags.
// Only the dashboard owner or a super user gets past the access check.
// NOTE(review): the opening line of this definition and original lines 4792 and
// 4799 are absent from this listing (presumably the declaration of `user_meta`
// and the start of the throwing statement for the access check) - confirm
// against the full file.
4785  std::vector<TDashboardGrantees>& dashboard_grantees,
4786  const TSessionId& session,
4787  const int32_t dashboard_id) {
4788  auto stdlog = STDLOG(get_session_ptr(session));
4789  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4790  auto session_ptr = stdlog.getConstSessionInfo();
4791  auto const& cat = session_ptr->getCatalog();
4793  auto dash = cat.getMetadataForDashboard(dashboard_id);
4794  if (!dash) {
4795  THROW_MAPD_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4796  " does not exist");
4797  } else if (session_ptr->get_currentUser().userId != dash->userId &&
4798  !session_ptr->get_currentUser().isSuper) {
4800  "User should be either owner of dashboard or super user to access grantees");
4801  }
4802  std::vector<ObjectRoleDescriptor*> objectsList;
4803  objectsList = SysCatalog::instance().getMetadataForObject(
4804  cat.getCurrentDB().dbId,
4805  static_cast<int>(DBObjectType::DashboardDBObjectType),
4806  dashboard_id); // By default the object type can only be a dashboard
4807  user_meta.userId = -1;
4808  user_meta.userName = "";
4809  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4810  for (auto object : objectsList) {
4811  if (user_meta.userName == object->roleName) {
4812  // Mask owner
4813  continue;
4814  }
4815  TDashboardGrantees grantee;
4816  TDashboardPermissions perm;
4817  grantee.name = object->roleName;
4818  grantee.is_user = object->roleType;
4819  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4820  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4821  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4822  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4823  grantee.permissions = perm;
4824  dashboard_grantees.push_back(grantee);
4825  }
4826 }
4827 
4828 void DBHandler::create_link(std::string& _return,
4829  const TSessionId& session,
4830  const std::string& view_state,
4831  const std::string& view_metadata) {
4832  auto stdlog = STDLOG(get_session_ptr(session));
4833  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4834  auto session_ptr = stdlog.getConstSessionInfo();
4835  // check_read_only("create_link");
4836  auto& cat = session_ptr->getCatalog();
4837 
4838  LinkDescriptor ld;
4839  ld.userId = session_ptr->get_currentUser().userId;
4840  ld.viewState = view_state;
4841  ld.viewMetadata = view_metadata;
4842 
4843  try {
4844  _return = cat.createLink(ld, 6);
4845  } catch (const std::exception& e) {
4846  THROW_MAPD_EXCEPTION(e.what());
4847  }
4848 }
4849 
// Builds a TColumnType whose col_name, col_type.type and col_type.is_array are
// set from the arguments; all other fields keep their default values.
// NOTE(review): the opening line of this definition (original line 4850, which
// declares the function name and the `type` parameter) is absent from this
// listing - confirm against the full file.
4851  const std::string& name,
4852  const bool is_array) {
4853  TColumnType ct;
4854  ct.col_name = name;
4855  ct.col_type.type = type;
4856  ct.col_type.is_array = is_array;
4857  return ct;
4858 }
4859 
// When `file_path` has a .shp, .shx or .dbf extension (compared
// case-insensitively), verifies that the companion file for each of those three
// extensions exists next to it; throws std::runtime_error naming the first one
// that is missing. Paths with any other extension are not checked.
// NOTE(review): the two existence-check calls (original lines 4869 and 4872) are
// absent from this listing. From the visible arguments, each companion is
// probed with an upper-case and then a lower-case extension, and the error is
// raised only when both probes fail - confirm against the full file.
4860 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
4861  const import_export::CopyParams& copy_params) {
4862  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
4863  if (std::find(shp_ext.begin(),
4864  shp_ext.end(),
4865  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
4866  shp_ext.end()) {
4867  for (auto ext : shp_ext) {
4868  auto aux_file = file_path;
4870  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
4871  copy_params) &&
4873  aux_file.replace_extension(ext).string(), copy_params)) {
4874  throw std::runtime_error("required file for shapefile does not exist: " +
4875  aux_file.filename().string());
4876  }
4877  }
4878  }
4879 }
4880 
4881 void DBHandler::create_table(const TSessionId& session,
4882  const std::string& table_name,
4883  const TRowDescriptor& rd,
4884  const TFileType::type file_type,
4885  const TCreateParams& create_params) {
4886  // @TODO(se) remove file_type which is unused
4887  auto stdlog = STDLOG("table_name", table_name);
4888  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4889  check_read_only("create_table");
4890 
4891  if (ImportHelpers::is_reserved_name(table_name)) {
4892  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
4893  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
4894  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
4895  }
4896 
4897  auto rds = rd;
4898 
4899  std::string stmt{"CREATE TABLE " + table_name};
4900  std::vector<std::string> col_stmts;
4901 
4902  for (auto col : rds) {
4903  if (ImportHelpers::is_reserved_name(col.col_name)) {
4904  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
4905  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
4906  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
4907  }
4908  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
4909  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
4910  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
4911  " for column: " + col.col_name);
4912  }
4913 
4914  if (col.col_type.type == TDatumType::DECIMAL) {
4915  // if no precision or scale passed in set to default 14,7
4916  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
4917  col.col_type.precision = 14;
4918  col.col_type.scale = 7;
4919  }
4920  }
4921 
4922  std::string col_stmt;
4923  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
4924  if (col.__isset.default_value) {
4925  col_stmt.append(" DEFAULT " + col.default_value);
4926  }
4927 
4928  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
4929  // `nullable` argument, leading this to default to false. Uncomment for v2.
4930  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
4931 
4932  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
4933  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
4934  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
4935  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
4936  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
4937  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
4938  }
4939  } else if (col.col_type.type == TDatumType::STR) {
4940  // non DICT encoded strings
4941  col_stmt.append(" ENCODING NONE");
4942  } else if (col.col_type.type == TDatumType::POINT ||
4943  col.col_type.type == TDatumType::LINESTRING ||
4944  col.col_type.type == TDatumType::POLYGON ||
4945  col.col_type.type == TDatumType::MULTIPOLYGON) {
4946  // non encoded compressable geo
4947  if (col.col_type.scale == 4326) {
4948  col_stmt.append(" ENCODING NONE");
4949  }
4950  }
4951  col_stmts.push_back(col_stmt);
4952  }
4953 
4954  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
4955 
4956  if (create_params.is_replicated) {
4957  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
4958  }
4959 
4960  stmt.append(";");
4961 
4962  TQueryResult ret;
4963  sql_execute(ret, session, stmt, true, "", -1, -1);
4964 }
4965 
// Imports the delimited file `file_name_in` into the existing table
// `table_name`.
// Visible steps: enroll the import as a query session on the executor, check
// table load privileges, resolve the file path, default the delimiter, take the
// insert-data write lock, and run an Importer (backed by a DistributedLoader
// when leaf_aggregator_ reports leaves).
// The whole body sits in a try block; any std::exception is re-raised via
// THROW_MAPD_EXCEPTION with the same message.
// NOTE(review): many original lines are absent from this listing, including the
// execute-lock arguments, the definitions of `executor` and `copy_params`, the
// conditions around the query-session enroll/clear calls and the table-lock
// acquisition - confirm against the full file before relying on this summary.
4966 void DBHandler::import_table(const TSessionId& session,
4967  const std::string& table_name,
4968  const std::string& file_name_in,
4969  const TCopyParams& cp) {
4970  try {
4971  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
4972  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4973  auto session_ptr = stdlog.getConstSessionInfo();
4974  check_read_only("import_table");
4975  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
4976 
4977  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
4980  auto& cat = session_ptr->getCatalog();
4982  auto start_time = ::toString(std::chrono::system_clock::now());
4984  executor->enrollQuerySession(session,
4985  "IMPORT_TABLE",
4986  start_time,
4988  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
4989  }
4990 
4991  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
4992  // reset the runtime query interrupt status
4994  executor->clearQuerySessionStatus(session, start_time);
4995  }
4996  };
4997  const auto td_with_lock =
4999  cat, table_name);
5000  const auto td = td_with_lock();
5001  CHECK(td);
5002  check_table_load_privileges(*session_ptr, table_name);
5003 
5004  std::string file_name{file_name_in};
5005  auto file_path = boost::filesystem::path(file_name);
// Names starting with "s3://" skip local path resolution and the existence
// check. A relative local name is resolved to
// import_path_ / sha256(session) / <file name only>.
5007  if (!boost::istarts_with(file_name, "s3://")) {
5008  if (!boost::filesystem::path(file_name).is_absolute()) {
5009  file_path = import_path_ / picosha2::hash256_hex_string(session) /
5010  boost::filesystem::path(file_name).filename();
5011  file_name = file_path.string();
5012  }
5013  if (!boost::filesystem::exists(file_path)) {
5014  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
5015  }
5016  }
5018 
5019  // TODO(andrew): add delimiter detection to Importer
// An unset ('\0') delimiter defaults to ',' or to a tab for ".tsv" files.
5020  if (copy_params.delimiter == '\0') {
5021  copy_params.delimiter = ',';
5022  if (boost::filesystem::extension(file_path) == ".tsv") {
5023  copy_params.delimiter = '\t';
5024  }
5025  }
5026 
5027  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
5028  session_ptr->getCatalog(), table_name);
5029  std::unique_ptr<import_export::Importer> importer;
5030  if (leaf_aggregator_.leafCount() > 0) {
5031  importer.reset(new import_export::Importer(
5032  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
5033  file_path.string(),
5034  copy_params));
5035  } else {
5036  importer.reset(
5037  new import_export::Importer(cat, td, file_path.string(), copy_params));
5038  }
5039  auto ms = measure<>::execution([&]() { importer->import(session_ptr.get()); });
5040  LOG(INFO) << "Total Import Time: " << (double)ms / 1000.0 << " Seconds.";
5041  } catch (const std::exception& e) {
5042  THROW_MAPD_EXCEPTION(std::string(e.what()));
5043  }
5044 }
5045 
// File-local helpers used by the geo import error checks below.
5046 namespace {
5047 
5048 // helper functions for error checking below
5049 // these would usefully be added as methods of TDatumType
5050 // but that's not possible as it's auto-generated by Thrift
5051 
// NOTE(review): the signature line of the next helper (original line 5052) and
// the tail of its return expression (original line 5054) are absent from this
// listing; the visible part tests `t` against POLYGON and MULTIPOLYGON.
5053  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
5055 }
5056 
// The string conversions below are compiled only when
// ENABLE_GEO_IMPORT_COLUMN_MATCHING is set.
5057 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
5058 
// Returns the text produced by streaming `t` into a std::stringstream.
5059 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
5060  std::stringstream ss;
5061  ss << t;
5062  return ss.str();
5063 }
5064 
// Maps SQLTypes::kGEOGRAPHY to "GEOGRAPHY" and SQLTypes::kGEOMETRY to
// "GEOMETRY"; every other value yields "INVALID".
5065 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
5066  std::string result;
5067  switch (p) {
5068  case SQLTypes::kGEOGRAPHY:
5069  result = "GEOGRAPHY";
5070  break;
5071  case SQLTypes::kGEOMETRY:
5072  result = "GEOMETRY";
5073  break;
5074  default:
5075  result = "INVALID";
5076  break;
5077  }
5078  return result;
5079 }
5080 
// Returns the text produced by streaming `t` into a std::stringstream.
5081 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
5082  std::stringstream ss;
5083  ss << t;
5084  return ss.str();
5085 }
5086 
5087 #endif
5088 
5089 } // namespace
5090 
// Raises, via THROW_MAPD_EXCEPTION, a message reporting that column attribute
// `attr` had value `got` where `expected` was required while appending a
// geo/raster file to a table.
// The expansion refers to `file_path`, `table_name` and `cd`, so all three must
// be in scope at the point of use. It also already ends with a semicolon.
5091 #define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
5092  THROW_MAPD_EXCEPTION("Could not append geo/raster file '" + \
5093  file_path.filename().string() + "' to table '" + table_name + \
5094  "'. Column '" + cd->columnName + "' " + attr + \
5095  " mismatch (got '" + got + "', expected '" + expected + "')");
5096 
// Thrift entry point for geo/raster import. Converts the Thrift copy params with
// thrift_to_copyparams() and forwards all arguments to a helper.
// NOTE(review): the line naming the callee (original line 5106) is absent from
// this listing; the comment on importGeoTableGlobFilterSort says it is called by
// this endpoint - confirm against the full file.
5097 void DBHandler::import_geo_table(const TSessionId& session,
5098  const std::string& table_name,
5099  const std::string& file_name,
5100  const TCopyParams& cp,
5101  const TRowDescriptor& row_desc,
5102  const TCreateParams& create_params) {
5103  // this is the direct Thrift endpoint
5104  // it does NOT support the separate FSI regex/filter/sort options
5105  // but it DOES support basic globbing specified in the filename itself
5107  session, table_name, file_name, thrift_to_copyparams(cp), row_desc, create_params);
5108 }
5109 
// Expands `file_name` into a list of local files using the glob/filter/sort
// options in `copy_params`, then imports each resulting file in turn.
// If expansion throws shared::FileNotFoundException, the original `file_name`
// is used as the only entry.
// NOTE(review): original lines 5121 and 5134 are absent from this listing (a
// call taking copy_params.file_sort_regex as its last argument, and the name of
// the per-file import call) - confirm against the full file.
5110 void DBHandler::importGeoTableGlobFilterSort(const TSessionId& session,
5111  const std::string& table_name,
5112  const std::string& file_name,
5113  const import_export::CopyParams& copy_params,
5114  const TRowDescriptor& row_desc,
5115  const TCreateParams& create_params) {
5116  // this is called by the above direct Thrift endpoint
5117  // and also for a deferred COPY FROM for geo/raster
5118  // it DOES support the full FSI regex/filter/sort options
5119  std::vector<std::string> file_names;
5120  try {
5122  copy_params.file_sort_regex);
5123  file_names = shared::local_glob_filter_sort_files(file_name,
5124  copy_params.regex_path_filter,
5125  copy_params.file_sort_order_by,
5126  copy_params.file_sort_regex,
5127  false);
5128  } catch (const shared::FileNotFoundException& e) {
5129  // no files match, just try the original filename, might be remote
5130  file_names.push_back(file_name);
5131  }
5132  // import whatever we found
// Note: the loop variable below shadows the `file_name` parameter.
5133  for (auto const& file_name : file_names) {
5135  session, table_name, file_name, copy_params, row_desc, create_params);
5136  }
5137 }
5138 
5139 void DBHandler::importGeoTableSingle(const TSessionId& session,
5140  const std::string& table_name,
5141  const std::string& file_name_in,
5142  const import_export::CopyParams& copy_params,
5143  const TRowDescriptor& row_desc,
5144  const TCreateParams& create_params) {
5145  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
5146  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5147  auto session_ptr = stdlog.getConstSessionInfo();
5148  check_read_only("import_table");
5149 
5150  auto& cat = session_ptr->getCatalog();
5152  auto start_time = ::toString(std::chrono::system_clock::now());
5154  executor->enrollQuerySession(session,
5155  "IMPORT_GEO_TABLE",
5156  start_time,
5158  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5159  }
5160 
5161  ScopeGuard clearInterruptStatus = [executor, &session, &start_time] {
5162  // reset the runtime query interrupt status
5164  executor->clearQuerySessionStatus(session, start_time);
5165  }
5166  };
5167 
5168  std::string file_name{file_name_in};
5169 
5170  if (path_is_relative(file_name)) {
5171  // assume relative paths are relative to data_path / mapd_import / <session>
5172  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
5173  boost::filesystem::path(file_name).filename();
5174  file_name = file_path.string();
5175  }
5177 
5178  bool is_raster = false;
5179  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
5180  if (is_a_supported_archive_file(file_name)) {
5181  // find the archive file
5182  add_vsi_network_prefix(file_name);
5183  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
5184  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
5185  }
5186  // find geo file in archive
5187  add_vsi_archive_prefix(file_name);
5188  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
5189  // prepare to load that geo file
5190  if (geo_file.size()) {
5191  file_name = file_name + std::string("/") + geo_file;
5192  }
5193  } else {
5194  // prepare to load geo file directly
5195  add_vsi_network_prefix(file_name);
5196  add_vsi_geo_prefix(file_name);
5197  }
5198  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
5199  // prepare to load geo raster file directly
5200  add_vsi_network_prefix(file_name);
5201  add_vsi_geo_prefix(file_name);
5202  is_raster = true;
5203  } else {
5205  "import_geo_table called with file_type other than GEO or RASTER");
5206  }
5207 
5208  // log what we're about to try to do
5209  VLOG(1) << "import_geo_table: Original filename: " << file_name_in;
5210  VLOG(1) << "import_geo_table: Actual filename: " << file_name;
5211  VLOG(1) << "import_geo_table: Raster: " << is_raster;
5212 
5213  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
5214  auto file_path = boost::filesystem::path(file_name);
5215  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
5216  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
5217  }
5218 
5219  // use GDAL to check any dependent files exist (ditto)
5220  try {
5221  check_geospatial_files(file_path, copy_params);
5222  } catch (const std::exception& e) {
5223  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5224  }
5225 
5226  // get layer info and deconstruct
5227  // in general, we will get a combination of layers of these four types:
5228  // EMPTY: no rows, report and skip
5229  // GEO: create a geo table from this
5230  // NON_GEO: create a regular table from this
5231  // UNSUPPORTED_GEO: report and skip
5232  std::vector<import_export::Importer::GeoFileLayerInfo> layer_info;
5233  if (!is_raster) {
5234  try {
5235  layer_info =
5236  import_export::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
5237  } catch (const std::exception& e) {
5238  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
5239  }
5240  }
5241 
5242  // categorize the results
5243  using LayerNameToContentsMap =
5244  std::map<std::string, import_export::Importer::GeoFileLayerContents>;
5245  LayerNameToContentsMap load_layers;
5246  LOG_IF(INFO, layer_info.size() > 0)
5247  << "import_geo_table: Found the following layers in the geo file:";
5248  for (const auto& layer : layer_info) {
5249  switch (layer.contents) {
5251  LOG(INFO) << "import_geo_table: '" << layer.name
5252  << "' (will import as geo table)";
5253  load_layers[layer.name] = layer.contents;
5254  break;
5256  LOG(INFO) << "import_geo_table: '" << layer.name
5257  << "' (will import as regular table)";
5258  load_layers[layer.name] = layer.contents;
5259  break;
5261  LOG(WARNING) << "import_geo_table: '" << layer.name
5262  << "' (will not import, unsupported geo type)";
5263  break;
5265  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
5266  break;
5267  default:
5268  break;
5269  }
5270  }
5271 
5272  // if nothing is loadable, stop now
5273  if (!is_raster && load_layers.size() == 0) {
5274  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
5275  }
5276 
5277  // if we've been given an explicit layer name, check that it exists and is loadable
5278  // scan the original list, as it may exist but not have been gathered as loadable
5279  if (!is_raster && copy_params.geo_layer_name.size()) {
5280  bool found = false;
5281  for (const auto& layer : layer_info) {
5282  if (copy_params.geo_layer_name == layer.name) {
5285  // forget all the other layers and just load this one
5286  load_layers.clear();
5287  load_layers[layer.name] = layer.contents;
5288  found = true;
5289  break;
5290  } else if (layer.contents ==
5292  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
5293  copy_params.geo_layer_name +
5294  "' has unsupported geo type!");
5295  } else if (layer.contents ==
5297  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
5298  copy_params.geo_layer_name + "' is empty!");
5299  }
5300  }
5301  }
5302  if (!found) {
5303  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
5304  copy_params.geo_layer_name + "' not found!");
5305  }
5306  }
5307 
5308  // Immerse import of multiple layers is not yet supported
5309  // @TODO fix this!
5310  if (!is_raster && row_desc.size() > 0 && load_layers.size() > 1) {
5312  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
5313  }
5314 
5315  // one definition of layer table name construction
5316  // we append the layer name if we're loading more than one table
5317  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
5318  const std::string& layer_name) {
5319  if (load_layers.size() > 1) {
5320  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
5321  if (sanitized_layer_name != layer_name) {
5322  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
5323  << sanitized_layer_name << "' for table name";
5324  }
5325  return table_name + "_" + sanitized_layer_name;
5326  }
5327  return table_name;
5328  };
5329 
5330  // if we're importing multiple tables, then NONE of them must exist already
5331  if (!is_raster && load_layers.size() > 1) {
5332  for (const auto& layer : load_layers) {
5333  // construct table name
5334  auto this_table_name = construct_layer_table_name(table_name, layer.first);
5335 
5336  // table must not exist
5337  if (cat.getMetadataForTable(this_table_name)) {
5338  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
5339  "' already exists, aborting!");
5340  }
5341  }
5342  }
5343 
5344  // prepare to gather errors that would otherwise be exceptions, as we can only throw
5345  // one
5346  std::vector<std::string> caught_exception_messages;
5347 
5348  // prepare to time multi-layer import
5349  double total_import_ms = 0.0;
5350 
5351  // for geo raster, we make a single dummy layer
5352  // the name is irrelevant, but set it to the filename so the log makes sense
5353  if (is_raster) {
5354  CHECK_EQ(load_layers.size(), 0u);
5355  load_layers.emplace(file_name, import_export::Importer::GeoFileLayerContents::GEO);
5356  }
5357 
5358  // now we're safe to start importing
5359  // we loop over the layers we're going to attempt to load
5360  for (const auto& layer : load_layers) {
5361  // unpack
5362  const auto& layer_name = layer.first;
5363  const auto& layer_contents = layer.second;
5364  bool is_geo_layer =
5366 
5367  // construct table name again
5368  auto this_table_name = construct_layer_table_name(table_name, layer_name);
5369 
5370  // report
5371  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
5372 
5373  // we need a row descriptor
5374  TRowDescriptor rd;
5375  if (row_desc.size() > 0) {
5376  // we have a valid RowDescriptor
5377  // this is the case where Immerse has already detected and created
5378  // all we need to do is import and trust that the data will match
5379  // use the provided row descriptor
5380  // table must already exist (we check this below)
5381  rd = row_desc;
5382  } else {
5383  // we don't have a RowDescriptor
5384  // we have to detect the file ourselves
5385  TDetectResult cds;
5386  TCopyParams cp_copy = copyparams_to_thrift(copy_params);
5387  cp_copy.geo_layer_name = layer_name;
5388  try {
5389  detect_column_types(cds, session, file_name_in, cp_copy);
5390  } catch (const std::exception& e) {
5391  // capture the error and abort this layer
5392  caught_exception_messages.emplace_back("Column Type Detection failed for '" +
5393  layer_name + "':" + e.what());
5394  continue;
5395  }
5396  rd = cds.row_set.row_desc;
5397 
5398  // then, if the table does NOT already exist, create it
5399  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
5400  if (!td) {
5401  try {
5402  create_table(session, this_table_name, rd, cp_copy.file_type, create_params);
5403  } catch (const std::exception& e) {
5404  // capture the error and abort this layer
5405  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
5406  layer_name + "':" + e.what());
5407  continue;
5408  }
5409  }
5410  }
5411 
5412  // match locking sequence for CopyTableStmt::execute
5413  mapd_shared_lock<mapd_shared_mutex> execute_read_lock(
5416 
5417  const TableDescriptor* td{nullptr};
5418  std::unique_ptr<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>> td_with_lock;
5419  std::unique_ptr<lockmgr::WriteLock> insert_data_lock;
5420 
5421  try {
5422  td_with_lock =
5423  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>>(
5425  lockmgr::WriteLock>::acquireTableDescriptor(cat, this_table_name));
5426  td = (*td_with_lock)();
5427  insert_data_lock = std::make_unique<lockmgr::WriteLock>(
5429  } catch (const std::runtime_error& e) {
5430  // capture the error and abort this layer
5431  std::string exception_message = "Could not import geo/raster file '" +
5432  file_path.filename().string() + "' to table '" +
5433  this_table_name +
5434  "'; table does not exist or failed to create.";
5435  caught_exception_messages.emplace_back(exception_message);
5436  continue;
5437  }
5438  CHECK(td);
5439 
5440  // then, we have to verify that the structure matches
5441  // get column descriptors (non-system, non-deleted, logical columns only)
5442  const auto col_descriptors =
5443  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
5444 
5445  // first, compare the column count
5446  if (col_descriptors.size() != rd.size()) {
5447  // capture the error and abort this layer
5448  std::string exception_message = "Could not append geo/raster file '" +
5449  file_path.filename().string() + "' to table '" +
5450  this_table_name + "'. Column count mismatch (got " +
5451  std::to_string(rd.size()) + ", expecting " +
5452  std::to_string(col_descriptors.size()) + ")";
5453  caught_exception_messages.emplace_back(exception_message);
5454  continue;
5455  }
5456 
5457  try {
5458  // then the names and types
5459  int rd_index = 0;
5460  for (auto cd : col_descriptors) {
5461  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
5462  std::string gname = rd[rd_index].col_name; // got
5463  std::string ename = cd->columnName; // expecting
5464 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
5465  TTypeInfo gti = rd[rd_index].col_type; // got
5466 #endif
5467  TTypeInfo eti = cd_col_type.col_type; // expecting
5468  // check for name match
5469  if (gname != ename) {
5470  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
5471  gname == OMNISCI_GEO_PREFIX) {
5472  // rename incoming geo column to match existing legacy default geo column
5473  rd[rd_index].col_name = gname;
5474  LOG(INFO)
5475  << "import_geo_table: Renaming incoming geo column to match existing "
5476  "legacy default geo column";
5477  } else {
5478  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
5479  }
5480  }
5481 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
5482  // check for type attributes match
5483  // these attrs must always match regardless of type
5484  if (gti.type != eti.type) {
5486  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
5487  }
5488  if (gti.is_array != eti.is_array) {
5490  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
5491  }
5492  if (gti.nullable != eti.nullable) {
5494  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
5495  }
5496  if (TTypeInfo_IsGeo(eti.type)) {
5497  // for geo, only these other attrs must also match
5498  // encoding and comp_param are allowed to differ
5499  // this allows appending to existing geo table
5500  // without needing to know the existing encoding
5501  if (gti.precision != eti.precision) {
5503  "geo sub-type",
5504  TTypeInfo_GeoSubTypeToString(gti.precision),
5505  TTypeInfo_GeoSubTypeToString(eti.precision));
5506  }
5507  if (gti.scale != eti.scale) {
5509  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
5510  }
5511  if (gti.encoding != eti.encoding) {
5512  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
5513  }
5514  if (gti.comp_param != eti.comp_param) {
5515  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
5516  }
5517  } else {
5518  // non-geo, all other attrs must also match
5519  // @TODO consider relaxing some of these dependent on type
5520  // e.g. DECIMAL precision/scale,