DBHandler.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "DBHandler.h"
24 #include "DistributedLoader.h"
25 #include "TokenCompletionHints.h"
26 
27 #ifdef HAVE_PROFILER
28 #include <gperftools/heap-profiler.h>
29 #endif // HAVE_PROFILER
30 
31 #include "MapDRelease.h"
32 
33 #include "Calcite/Calcite.h"
34 #include "gen-cpp/CalciteServer.h"
35 
38 
39 #include "Catalog/Catalog.h"
44 #include "DistributedHandler.h"
46 #include "Geospatial/ColumnNames.h"
47 #include "Geospatial/Compression.h"
48 #include "Geospatial/GDAL.h"
49 #include "Geospatial/Types.h"
50 #include "ImportExport/Importer.h"
51 #include "LockMgr/LockMgr.h"
53 #include "Parser/ParserWrapper.h"
57 #include "QueryEngine/Execute.h"
67 #include "RequestInfo.h"
68 #ifdef HAVE_RUNTIME_LIBS
70 #endif
71 #include "Shared/ArrowUtil.h"
72 #include "Shared/DateTimeParser.h"
73 #include "Shared/StringTransform.h"
74 #include "Shared/SysDefinitions.h"
75 #include "Shared/file_path_util.h"
77 #include "Shared/import_helpers.h"
78 #include "Shared/measure.h"
79 #include "Shared/misc.h"
80 #include "Shared/scope.h"
82 
83 #ifdef HAVE_AWS_S3
84 #include <aws/core/auth/AWSCredentialsProviderChain.h>
85 #endif
86 #include <fcntl.h>
87 #include <picosha2.h>
88 #include <sys/types.h>
89 #include <algorithm>
90 #include <boost/algorithm/string.hpp>
91 #include <boost/filesystem.hpp>
92 #include <boost/make_shared.hpp>
93 #include <boost/process/search_path.hpp>
94 #include <boost/program_options.hpp>
95 #include <boost/tokenizer.hpp>
96 #include <chrono>
97 #include <cmath>
98 #include <csignal>
99 #include <fstream>
100 #include <future>
101 #include <map>
102 #include <memory>
103 #include <random>
104 #include <string>
105 #include <thread>
106 #include <typeinfo>
107 
108 #include <arrow/api.h>
109 #include <arrow/io/api.h>
110 #include <arrow/ipc/api.h>
111 
112 #include "Shared/ArrowUtil.h"
113 #include "Shared/distributed.h"
114 
115 #ifdef ENABLE_IMPORT_PARQUET
116 extern bool g_enable_parquet_import_fsi;
117 #endif
118 
119 #ifdef HAVE_AWS_S3
120 extern bool g_allow_s3_server_privileges;
121 #endif
122 
123 extern bool g_enable_system_tables;
126 extern bool g_allow_memory_status_log;
127 
130 
131 #define INVALID_SESSION_ID ""
132 
133 #define SET_REQUEST_ID(parent_request_id) \
134  if (g_uniform_request_ids_per_thrift_call && parent_request_id) \
135  logger::set_request_id(parent_request_id); \
136  else if (logger::set_new_request_id(); parent_request_id) \
137  LOG(INFO) << "This request has parent request_id(" << parent_request_id << ')'
138 
139 #define THROW_DB_EXCEPTION(errstr) \
140  { \
141  TDBException ex; \
142  ex.error_msg = errstr; \
143  LOG(ERROR) << ex.error_msg; \
144  throw ex; \
145  }
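// Illustrative usage of THROW_DB_EXCEPTION (hypothetical helper, not part of the
// original file): the macro logs the message and throws a Thrift TDBException,
// so validation code in the handlers below typically looks like this.
namespace {
[[maybe_unused]] void require_nonempty(const std::string& value,
                                       const std::string& what) {
  if (value.empty()) {
    THROW_DB_EXCEPTION(what + " must not be empty");
  }
}
}  // namespace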
146 
147 thread_local std::string TrackingProcessor::client_address;
149 
150 namespace {
151 
153  const int32_t user_id,
154  const std::string& dashboard_name) {
155  return (cat.getMetadataForDashboard(std::to_string(user_id), dashboard_name));
156 }
157 
158 struct ForceDisconnect : public std::runtime_error {
159  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
160 };
161 
162 } // namespace
163 
164 #ifdef ENABLE_GEOS
165 // from Geospatial/GeosValidation.cpp
166 extern std::unique_ptr<std::string> g_libgeos_so_filename;
167 #endif
168 
169 DBHandler::DBHandler(const std::vector<LeafHostInfo>& db_leaves,
170  const std::vector<LeafHostInfo>& string_leaves,
171  const std::string& base_data_path,
172  const bool allow_multifrag,
173  const bool jit_debug,
174  const bool intel_jit_profile,
175  const bool read_only,
176  const bool allow_loop_joins,
177  const bool enable_rendering,
178  const bool renderer_prefer_igpu,
179  const unsigned renderer_vulkan_timeout_ms,
180  const bool renderer_use_parallel_executors,
181  const bool enable_auto_clear_render_mem,
182  const int render_oom_retry_threshold,
183  const size_t render_mem_bytes,
184  const size_t max_concurrent_render_sessions,
185  const size_t reserved_gpu_mem,
186  const bool render_compositor_use_last_gpu,
187  const bool renderer_enable_slab_allocation,
188  const size_t num_reader_threads,
189  const AuthMetadata& authMetadata,
190  SystemParameters& system_parameters,
191  const bool legacy_syntax,
192  const int idle_session_duration,
193  const int max_session_duration,
194  const std::string& udf_filename,
195  const std::string& clang_path,
196  const std::vector<std::string>& clang_options,
197 #ifdef ENABLE_GEOS
198  const std::string& libgeos_so_filename,
199 #endif
200 #ifdef HAVE_TORCH_TFS
201  const std::string& torch_lib_path,
202 #endif
203  const File_Namespace::DiskCacheConfig& disk_cache_config,
204  const bool is_new_db)
205  : leaf_aggregator_(db_leaves)
206  , db_leaves_(db_leaves)
207  , string_leaves_(string_leaves)
208  , base_data_path_(base_data_path)
209  , random_gen_(std::random_device{}())
210  , session_id_dist_(0, INT32_MAX)
211  , jit_debug_(jit_debug)
212  , intel_jit_profile_(intel_jit_profile)
213  , allow_multifrag_(allow_multifrag)
214  , read_only_(read_only)
215  , allow_loop_joins_(allow_loop_joins)
216  , authMetadata_(authMetadata)
217  , system_parameters_(system_parameters)
218  , legacy_syntax_(legacy_syntax)
219  , dispatch_queue_(
220  std::make_unique<QueryDispatchQueue>(system_parameters.num_executors))
221  , super_user_rights_(false)
222  , idle_session_duration_(idle_session_duration * 60)
223  , max_session_duration_(max_session_duration * 60)
224  , enable_rendering_(enable_rendering)
225  , renderer_prefer_igpu_(renderer_prefer_igpu)
226  , renderer_vulkan_timeout_(renderer_vulkan_timeout_ms)
227  , renderer_use_parallel_executors_(renderer_use_parallel_executors)
228  , enable_auto_clear_render_mem_(enable_auto_clear_render_mem)
229  , render_oom_retry_threshold_(render_oom_retry_threshold)
230  , render_mem_bytes_(render_mem_bytes)
231  , max_concurrent_render_sessions_(max_concurrent_render_sessions)
232  , reserved_gpu_mem_(reserved_gpu_mem)
233  , render_compositor_use_last_gpu_(render_compositor_use_last_gpu)
234  , renderer_enable_slab_allocation_{renderer_enable_slab_allocation}
235  , num_reader_threads_(num_reader_threads)
236 #ifdef ENABLE_GEOS
237  , libgeos_so_filename_(libgeos_so_filename)
238 #endif
239 #ifdef HAVE_TORCH_TFS
240  , torch_lib_path_(torch_lib_path)
241 #endif
242  , disk_cache_config_(disk_cache_config)
243  , udf_filename_(udf_filename)
244  , clang_path_(clang_path)
245  , clang_options_(clang_options)
246  , max_num_sessions_(-1) {
247  LOG(INFO) << "HeavyDB Server " << MAPD_RELEASE;
248  initialize(is_new_db);
249  resetSessionsStore();
250 }
251 
253  size_t num_cpu_slots{0};
254  size_t num_gpu_slots{0};
255  size_t cpu_result_mem{0};
256  size_t cpu_buffer_pool_mem{0};
257  size_t gpu_buffer_pool_mem{0};
258  LOG(INFO) << "Initializing Executor Resource Manager";
259 
260  if (g_cpu_threads_override != 0) {
261  LOG(INFO) << "\tSetting Executor resource pool available CPU threads/slots to "
262  "user-specified value of "
263  << g_cpu_threads_override << ".";
264  num_cpu_slots = g_cpu_threads_override;
265  } else {
266  LOG(INFO) << "\tSetting Executor resource pool available CPU threads/slots to default "
267  "value of "
268  << cpu_threads() << ".";
269  // Setting the number of CPU slots to cpu_threads() causes the ExecutorResourceMgr
270  // to set the logical number of available CPU slots to mirror the number of threads in
271  // the tbb thread pool used elsewhere in the system. We may want to consider a
272  // capability to set the executor resource pool's thread count independently, as some
273  // fraction of what cpu_threads() returns, to give some breathing room for the
274  // other processes in the system that use CPU threads.
275  num_cpu_slots = cpu_threads();
276  }
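  // Illustrative only (not part of the original file): the fractional-allocation
  // idea described in the comment above could look roughly like
  //   num_cpu_slots = std::max(size_t(1),
  //                            static_cast<size_t>(cpu_threads() * cpu_slot_fraction));
  // where cpu_slot_fraction would be a hypothetical configuration flag.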
277  LOG(INFO) << "\tSetting max per-query CPU threads to ratio of "
279  << num_cpu_slots << " available threads, or "
281  num_cpu_slots)
282  << " threads.";
283 
284  // system_parameters_.num_gpus will be -1 if there are no GPUs enabled so we need to
285  // guard against this
286  num_gpu_slots = system_parameters_.num_gpus < 0 ? static_cast<size_t>(0)
288 
289  cpu_buffer_pool_mem = data_mgr_->getCpuBufferPoolSize();
292  } else {
293  const size_t system_mem_bytes = DataMgr::getTotalSystemMemory();
294  CHECK_GT(system_mem_bytes, size_t(0));
295  const size_t remaining_cpu_mem_bytes = system_mem_bytes >= cpu_buffer_pool_mem
296  ? system_mem_bytes - cpu_buffer_pool_mem
297  : 0UL;
298  cpu_result_mem =
299  std::max(static_cast<size_t>(remaining_cpu_mem_bytes *
301  static_cast<size_t>(1UL << 32));
302  }
303  // Below gets the total combined size of all GPU buffer pools.
304  // We will likely move to per-device pool resource management,
305  // but are keeping it simple for now.
306  gpu_buffer_pool_mem = data_mgr_->getGpuBufferPoolSize();
307 
308  // When we move to using the BufferMgrs directly in
309  // ExecutorResourcePool, there won't be a need for
310  // the buffer_pool_max_occupancy variable (a safety
311  // "fudge" factor), since what the resource pool sees
312  // and what the BufferMgrs see will be exactly the same.
313 
314  // However, we need to ensure we can quickly access the
315  // chunk state of the BufferMgrs without going through a coarse lock
316  // before we do this, so use this fudge ratio for now.
317 
318  // Note that if we are not conservative enough with the below and
319  // overshoot, the error will still be caught, and if on GPU, the query
320  // can be re-run on CPU.
321 
322  constexpr double buffer_pool_max_occupancy{0.95};
323  const size_t conservative_cpu_buffer_pool_mem =
324  static_cast<size_t>(cpu_buffer_pool_mem * buffer_pool_max_occupancy);
325  const size_t conservative_gpu_buffer_pool_mem =
326  static_cast<size_t>(gpu_buffer_pool_mem * buffer_pool_max_occupancy);
327 
328  LOG(INFO)
329  << "\tSetting Executor resource pool reserved space for CPU buffer pool memory to "
330  << format_num_bytes(conservative_cpu_buffer_pool_mem) << ".";
331  if (gpu_buffer_pool_mem > 0UL) {
332  LOG(INFO) << "\tSetting Executor resource pool reserved space for GPU buffer pool "
333  "memory to "
334  << format_num_bytes(conservative_gpu_buffer_pool_mem) << ".";
335  }
336  LOG(INFO) << "\tSetting Executor resource pool reserved space for CPU result memory to "
337  << format_num_bytes(cpu_result_mem) << ".";
338 
340  num_cpu_slots,
341  num_gpu_slots,
342  cpu_result_mem,
343  conservative_cpu_buffer_pool_mem,
344  conservative_gpu_buffer_pool_mem,
352 }
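// Illustrative sketch (not part of the original file): the conservative buffer
// pool sizing applied above, factored into a standalone helper. The 0.95
// occupancy value mirrors the buffer_pool_max_occupancy "fudge" factor used in
// the function above; the helper name is hypothetical.
namespace {
[[maybe_unused]] size_t conservative_pool_bytes(const size_t pool_bytes,
                                                const double max_occupancy = 0.95) {
  // Leave headroom so the executor resource pool never claims more memory than
  // the BufferMgrs can actually serve.
  return static_cast<size_t>(pool_bytes * max_occupancy);
}
}  // namespace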
353 
355 #ifndef _WIN32
356  size_t temp;
357  CHECK(!__builtin_mul_overflow(g_num_tuple_threshold_switch_to_baseline,
359  &temp))
360  << "The product of g_num_tuple_threshold_switch_to_baseline and "
361  "g_ratio_num_hash_entry_to_num_tuple_switch_to_baseline exceeds 64 bits.";
362 #endif
363 }
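// Illustrative, portable alternative to the GCC/Clang __builtin_mul_overflow
// check above (hypothetical helper, not part of the original file): the same
// multiplication overflow test expressed without compiler builtins.
namespace {
[[maybe_unused]] bool product_overflows_size_t(const size_t a, const size_t b) {
  // static_cast<size_t>(-1) is the maximum value representable by size_t.
  return b != 0 && a > static_cast<size_t>(-1) / b;
}
}  // namespace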
364 
366  if (sessions_store_) {
367  // Disconnect any existing sessions.
368  auto sessions = sessions_store_->getAllSessions();
369  for (auto session : sessions) {
370  sessions_store_->disconnect(session->get_session_id());
371  }
372  }
375  1,
379  [this](auto& session_ptr) { disconnect_impl(session_ptr); });
380 }
381 
382 void DBHandler::initialize(const bool is_new_db) {
383  if (!initialized_) {
384  initialized_ = true;
385  } else {
387  "Server already initialized; service restart required to activate any new "
388  "entitlements.");
389  return;
390  }
391 
394  cpu_mode_only_ = true;
395  } else {
396 #ifdef HAVE_CUDA
398  cpu_mode_only_ = false;
399 #else
401  LOG(WARNING) << "This build isn't CUDA enabled, will run on CPU";
402  cpu_mode_only_ = true;
403 #endif
404  }
405 
406  bool is_rendering_enabled = enable_rendering_;
407  if (system_parameters_.num_gpus == 0) {
408  is_rendering_enabled = false;
409  }
410 
411  const auto data_path =
412  boost::filesystem::path(base_data_path_) / shared::kDataDirectoryName;
413  // Calculate the total amount of memory we need to reserve from each GPU that the Buffer
414  // Manager cannot ask for.
415  size_t total_reserved = reserved_gpu_mem_;
416  if (is_rendering_enabled) {
417  total_reserved += render_mem_bytes_;
418  }
419 
420  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
421 #ifdef HAVE_CUDA
422  if (!cpu_mode_only_ || is_rendering_enabled) {
423  try {
424  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(
426  if (system_parameters_.num_gpus < 0) {
427  system_parameters_.num_gpus = cuda_mgr->getDeviceCount();
428  } else {
430  std::min(system_parameters_.num_gpus, cuda_mgr->getDeviceCount());
431  }
432  } catch (const std::exception& e) {
433  LOG(ERROR) << "Unable to instantiate CudaMgr, falling back to CPU-only mode. "
434  << e.what();
436  cpu_mode_only_ = true;
437  is_rendering_enabled = false;
438  }
439  }
440 #endif // HAVE_CUDA
441 
443 
444  try {
445  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
447  std::move(cuda_mgr),
449  total_reserved,
452  } catch (const std::exception& e) {
453  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
454  }
457  }
458 
459  std::string udf_ast_filename("");
460 
461  try {
462  if (!udf_filename_.empty()) {
463  const auto cuda_mgr = data_mgr_->getCudaMgr();
464  const CudaMgr_Namespace::NvidiaDeviceArch device_arch =
465  cuda_mgr ? cuda_mgr->getDeviceArch()
467  UdfCompiler compiler(device_arch, clang_path_, clang_options_);
468 
469  const auto [cpu_udf_ir_file, cuda_udf_ir_file] = compiler.compileUdf(udf_filename_);
470  Executor::addUdfIrToModule(cpu_udf_ir_file, /*is_cuda_ir=*/false);
471  if (!cuda_udf_ir_file.empty()) {
472  Executor::addUdfIrToModule(cuda_udf_ir_file, /*is_cuda_ir=*/true);
473  }
474  udf_ast_filename = compiler.getAstFileName(udf_filename_);
475  }
476  } catch (const std::exception& e) {
477  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
478  }
479 
480  try {
481  calcite_ =
482  std::make_shared<Calcite>(system_parameters_, base_data_path_, udf_ast_filename);
483  } catch (const std::exception& e) {
484  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
485  }
486 
487  try {
488  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
489  if (!udf_filename_.empty()) {
490  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
491  }
492  } catch (const std::exception& e) {
493  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
494  }
495 
496  try {
498  } catch (const std::exception& e) {
499  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
500  }
501 
502 #ifdef HAVE_RUNTIME_LIBS
503  try {
504 #ifdef HAVE_TORCH_TFS
505  RuntimeLibManager::loadRuntimeLibs(torch_lib_path_);
506 #else
508 #endif
509  } catch (const std::exception& e) {
510  LOG(ERROR) << "Failed to load runtime libraries: " << e.what();
511  LOG(ERROR) << "Support for runtime library table functions is disabled.";
512  }
513 #endif
514 
515  try {
516  auto udtfs = ThriftSerializers::to_thrift(
518  std::vector<TUserDefinedFunction> udfs = {};
519  calcite_->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
520  } catch (const std::exception& e) {
521  LOG(FATAL) << "Failed to register compile-time table functions: " << e.what();
522  }
523 
524  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
526  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
527  cpu_mode_only_ = true;
528  }
529 
530  LOG(INFO) << "Started in " << executor_device_type_ << " mode.";
531 
532  try {
535  data_mgr_,
537  calcite_,
538  is_new_db,
539  !db_leaves_.empty(),
541  } catch (const std::exception& e) {
542  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
543  }
544 
545  import_path_ = boost::filesystem::path(base_data_path_) / shared::kDefaultImportDirName;
546  start_time_ = std::time(nullptr);
547 
548  if (is_rendering_enabled) {
549  try {
550  render_handler_.reset(new RenderHandler(this,
554  false,
555  false,
561  } catch (const std::exception& e) {
562  LOG(ERROR) << "Backend rendering disabled: " << e.what();
563  }
564  }
565 
567 
568 #ifdef ENABLE_GEOS
569  if (!libgeos_so_filename_.empty()) {
570  g_libgeos_so_filename.reset(new std::string(libgeos_so_filename_));
571  LOG(INFO) << "Overriding default geos library with '" + *g_libgeos_so_filename + "'";
572  }
573 #endif
574 }
575 
577  shutdown();
578 }
579 
580 void DBHandler::check_read_only(const std::string& str) {
581  if (DBHandler::read_only_) {
582  THROW_DB_EXCEPTION(str + " disabled: server running in read-only mode.");
583  }
584 }
585 
587  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
588  // Create an in-memory session for Calcite with super-user privileges, which is
589  // used for getting all table metadata when a user runs a query. The session is
590  // created under the name of a proxy user/password and persists only until the
591  // server shuts down or the in-memory Calcite query finishes, whichever comes
592  // first.
594  std::string session_id;
595  do {
597  } while (calcite_sessions_.find(session_id) != calcite_sessions_.end());
598  Catalog_Namespace::UserMetadata user_meta(-1,
599  calcite_->getInternalSessionProxyUserName(),
600  calcite_->getInternalSessionProxyPassword(),
601  true,
602  -1,
603  true,
604  false);
605  const auto emplace_ret = calcite_sessions_.emplace(
606  session_id,
607  std::make_shared<Catalog_Namespace::SessionInfo>(
608  catalog_ptr, user_meta, executor_device_type_, session_id));
609  CHECK(emplace_ret.second);
610  return session_id;
611 }
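// Illustrative pairing (not part of the original file): callers typically create
// the proxy session above only for the duration of a Calcite metadata request and
// remove it again with removeInMemoryCalciteSession() below, e.g. via a ScopeGuard,
// once the request completes.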
612 
613 void DBHandler::removeInMemoryCalciteSession(const std::string& session_id) {
614  // Remove InMemory calcite Session.
616  CHECK(calcite_sessions_.erase(session_id)) << session_id;
617 }
618 
619 // internal connection for connections with no password
620 void DBHandler::internal_connect(TSessionId& session_id,
621  const std::string& username,
622  const std::string& dbname) {
624  auto stdlog = STDLOG(); // session_id set by connect_impl()
625  std::string username2 = username; // login() may reset username given as argument
626  std::string dbname2 = dbname; // login() may reset dbname given as argument
628  std::shared_ptr<Catalog> cat = nullptr;
629  try {
630  cat =
631  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
632  } catch (std::exception& e) {
633  THROW_DB_EXCEPTION(e.what());
634  }
635 
636  DBObject dbObject(dbname2, DatabaseDBObjectType);
637  dbObject.loadKey(*cat);
639  std::vector<DBObject> dbObjects;
640  dbObjects.push_back(dbObject);
641  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
642  THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
643  " is not allowed to access database " + dbname2 + ".");
644  }
645  connect_impl(session_id, std::string(), dbname2, user_meta, cat, stdlog);
646 }
647 
649  return leaf_aggregator_.leafCount() > 0;
650 }
651 
652 void DBHandler::krb5_connect(TKrb5Session& session,
653  const std::string& inputToken,
654  const std::string& dbname) {
655  THROW_DB_EXCEPTION("Unauthorized Access. Kerberos login not supported");
656 }
657 
658 void DBHandler::connect(TSessionId& session_id,
659  const std::string& username,
660  const std::string& passwd,
661  const std::string& dbname) {
663  auto stdlog = STDLOG(); // session_info set by connect_impl()
664  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
665  std::string username2 = username; // login() may reset username given as argument
666  std::string dbname2 = dbname; // login() may reset dbname given as argument
668  std::shared_ptr<Catalog> cat = nullptr;
669  try {
670  cat = SysCatalog::instance().login(
671  dbname2, username2, passwd, user_meta, !super_user_rights_);
672  } catch (std::exception& e) {
673  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
674  THROW_DB_EXCEPTION(e.what());
675  }
676 
677  DBObject dbObject(dbname2, DatabaseDBObjectType);
678  dbObject.loadKey(*cat);
680  std::vector<DBObject> dbObjects;
681  dbObjects.push_back(dbObject);
682  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
683  stdlog.appendNameValuePairs(
684  "user", username, "db", dbname, "exception", "Missing Privileges");
685  THROW_DB_EXCEPTION("Unauthorized Access: user " + user_meta.userLoggable() +
686  " is not allowed to access database " + dbname2 + ".");
687  }
688  connect_impl(session_id, passwd, dbname2, user_meta, cat, stdlog);
689 
690  // if PKI auth is used, session_id will come back encrypted with the user's public key
691  SysCatalog::instance().check_for_session_encryption(passwd, session_id);
692 }
693 
694 void DBHandler::connect_impl(TSessionId& session_id,
695  const std::string& passwd,
696  const std::string& dbname,
697  const Catalog_Namespace::UserMetadata& user_meta,
698  std::shared_ptr<Catalog> cat,
699  query_state::StdLog& stdlog) {
700  // TODO(sy): Is there any reason to have dbname as a parameter
701  // here when the cat parameter already provides cat->name()?
702  // Should dbname and cat->name() ever differ?
703  auto session_ptr = sessions_store_->add(user_meta, cat, executor_device_type_);
704  session_id = session_ptr->get_session_id();
705  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database " << dbname;
706  stdlog.setSessionInfo(session_ptr);
707  session_ptr->set_connection_info(getConnectionInfo().toString());
708  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time
709  // while doing warmup
710  }
711  auto const roles =
712  stdlog.getConstSessionInfo()->get_currentUser().isSuper
713  ? std::vector<std::string>{{"super"}}
714  : SysCatalog::instance().getRoles(
715  false, false, stdlog.getConstSessionInfo()->get_currentUser().userName);
716  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
717 }
718 
719 void DBHandler::disconnect(const TSessionId& session_id_or_json) {
720  heavyai::RequestInfo const request_info(session_id_or_json);
721  SET_REQUEST_ID(request_info.requestId());
722  auto session_ptr = get_session_ptr(request_info.sessionId());
723  auto stdlog = STDLOG(session_ptr, "client", getConnectionInfo().toString());
724  sessions_store_->disconnect(request_info.sessionId());
725 }
726 
728  const auto session_id = session_ptr->get_session_id();
729  std::exception_ptr leaf_exception = nullptr;
730  try {
731  if (leaf_aggregator_.leafCount() > 0) {
732  leaf_aggregator_.disconnect(session_id);
733  }
734  } catch (...) {
735  leaf_exception = std::current_exception();
736  }
737 
738  if (render_handler_) {
739  render_handler_->disconnect(session_id);
740  }
741 
742  if (leaf_exception) {
743  std::rethrow_exception(leaf_exception);
744  }
745 }
746 
747 void DBHandler::switch_database(const TSessionId& session_id_or_json,
748  const std::string& dbname) {
749  heavyai::RequestInfo const request_info(session_id_or_json);
750  SET_REQUEST_ID(request_info.requestId());
751  auto session_ptr = get_session_ptr(request_info.sessionId());
752  auto stdlog = STDLOG(session_ptr);
753  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
754  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
755  try {
756  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
757  dbname2, session_ptr->get_currentUser().userName);
758  session_ptr->set_catalog_ptr(cat);
759  if (leaf_aggregator_.leafCount() > 0) {
760  leaf_aggregator_.switch_database(request_info.sessionId(), dbname);
761  return;
762  }
763  } catch (std::exception& e) {
764  THROW_DB_EXCEPTION(e.what());
765  }
766 }
767 
768 void DBHandler::clone_session(TSessionId& session2_id,
769  const TSessionId& session1_id_or_json) {
770  heavyai::RequestInfo const request_info(session1_id_or_json);
771  SET_REQUEST_ID(request_info.requestId());
772  auto session1_ptr = get_session_ptr(request_info.sessionId());
773  auto stdlog = STDLOG(session1_ptr);
774  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
775 
776  try {
777  const Catalog_Namespace::UserMetadata& user_meta = session1_ptr->get_currentUser();
778  std::shared_ptr<Catalog> cat = session1_ptr->get_catalog_ptr();
779  auto session2_ptr = sessions_store_->add(user_meta, cat, executor_device_type_);
780  session2_id = session2_ptr->get_session_id();
781  LOG(INFO) << "User " << user_meta.userLoggable() << " connected to database "
782  << cat->name();
783  if (leaf_aggregator_.leafCount() > 0) {
784  leaf_aggregator_.clone_session(request_info.sessionId(), session2_id);
785  return;
786  }
787  } catch (std::exception& e) {
788  THROW_DB_EXCEPTION(e.what());
789  }
790 }
791 
792 void DBHandler::interrupt(const TSessionId& query_session_id_or_json,
793  const TSessionId& interrupt_session_id_or_json) {
794  // In a distributed setting, query_session becomes a parent session (agg)
795  // and the interrupt session is one of the existing sessions on a leaf node (leaf),
796  // so we can think of there being a logical mapping
797  // between query_session (agg) and interrupt_session (leaf).
798  heavyai::RequestInfo const query_request_info(query_session_id_or_json);
799  heavyai::RequestInfo const interrupt_request_info(interrupt_session_id_or_json);
800  SET_REQUEST_ID(interrupt_request_info.requestId());
801  auto session_ptr = get_session_ptr(interrupt_request_info.sessionId());
802  auto& cat = session_ptr->getCatalog();
803  auto stdlog = STDLOG(session_ptr);
804  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
805  const auto allow_query_interrupt =
807  if (g_enable_dynamic_watchdog || allow_query_interrupt) {
808  const auto dbname = cat.getCurrentDB().dbName;
810  jit_debug_ ? "/tmp" : "",
811  jit_debug_ ? "mapdquery" : "",
813  CHECK(executor);
814 
815  if (leaf_aggregator_.leafCount() > 0) {
816  leaf_aggregator_.interrupt(query_request_info.sessionId(),
817  interrupt_request_info.sessionId());
818  }
819  auto target_executor_ids =
820  executor->getExecutorIdsRunningQuery(query_request_info.sessionId());
821  if (target_executor_ids.empty()) {
823  executor->getSessionLock());
824  if (executor->checkIsQuerySessionEnrolled(query_request_info.sessionId(),
825  session_read_lock)) {
826  session_read_lock.unlock();
827  VLOG(1) << "Received interrupt: "
828  << "User " << session_ptr->get_currentUser().userLoggable()
829  << ", Database " << dbname << std::endl;
830  executor->interrupt(query_request_info.sessionId(),
831  interrupt_request_info.sessionId());
832  }
833  } else {
834  for (auto& executor_id : target_executor_ids) {
835  VLOG(1) << "Received interrupt: "
836  << "Executor " << executor_id << ", User "
837  << session_ptr->get_currentUser().userLoggable() << ", Database "
838  << dbname << std::endl;
839  auto target_executor = Executor::getExecutor(executor_id);
840  target_executor->interrupt(query_request_info.sessionId(),
841  interrupt_request_info.sessionId());
842  }
843  }
844 
845  LOG(INFO) << "User " << session_ptr->get_currentUser().userName
846  << " interrupted session with database " << dbname << std::endl;
847  }
848 }
849 
851  if (g_cluster) {
852  if (leaf_aggregator_.leafCount() > 0) {
853  return TRole::type::AGGREGATOR;
854  }
855  return TRole::type::LEAF;
856  }
857  return TRole::type::SERVER;
858 }
859 void DBHandler::get_server_status(TServerStatus& _return,
860  const TSessionId& session_id_or_json) {
861  heavyai::RequestInfo const request_info(session_id_or_json);
862  SET_REQUEST_ID(request_info.requestId());
863  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
864  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
865  const auto rendering_enabled = bool(render_handler_);
866  _return.read_only = read_only_;
867  _return.version = MAPD_RELEASE;
868  _return.rendering_enabled = rendering_enabled;
869  _return.start_time = start_time_;
870  _return.edition = MAPD_EDITION;
871  _return.host_name = heavyai::get_hostname();
872  _return.poly_rendering_enabled = rendering_enabled;
873  _return.role = getServerRole();
874  _return.renderer_status_json =
875  render_handler_ ? render_handler_->get_renderer_status_json() : "";
876 }
877 
878 void DBHandler::get_status(std::vector<TServerStatus>& _return,
879  const TSessionId& session_id_or_json) {
880  //
881  // get_status() is now called locally at startup on the aggregator
882  // in order to validate that all nodes of a cluster are running the
883  // same software version and the same renderer status
884  //
885  // In that context, it is called with the InvalidSessionID, and
886  // with the local super-user flag set.
887  //
888  // Hence, we allow this session-less mode only in distributed mode, and
889  // then on a leaf (always), or on the aggregator (only in super-user mode)
890  //
891  heavyai::RequestInfo const request_info(session_id_or_json);
892  SET_REQUEST_ID(request_info.requestId());
893  auto const allow_invalid_session = g_cluster && (!isAggregator() || super_user_rights_);
894 
895  if (!allow_invalid_session || request_info.sessionId() != getInvalidSessionId()) {
896  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
897  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
898  } else {
899  LOG(INFO) << "get_status() called in session-less mode";
900  }
901  const auto rendering_enabled = bool(render_handler_);
902  TServerStatus ret;
903  ret.read_only = read_only_;
904  ret.version = MAPD_RELEASE;
905  ret.rendering_enabled = rendering_enabled;
906  ret.start_time = start_time_;
907  ret.edition = MAPD_EDITION;
908  ret.host_name = heavyai::get_hostname();
909  ret.poly_rendering_enabled = rendering_enabled;
910  ret.role = getServerRole();
911  ret.renderer_status_json =
912  render_handler_ ? render_handler_->get_renderer_status_json() : "";
913  ret.host_id = "";
914 
915  _return.push_back(ret);
916  if (leaf_aggregator_.leafCount() > 0) {
917  std::vector<TServerStatus> leaf_status =
918  leaf_aggregator_.getLeafStatus(request_info.sessionId());
919  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
920  }
921 }
922 
923 void DBHandler::get_hardware_info(TClusterHardwareInfo& _return,
924  const TSessionId& session_id_or_json) {
925  heavyai::RequestInfo const request_info(session_id_or_json);
926  SET_REQUEST_ID(request_info.requestId());
927  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
928  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
929  THardwareInfo ret;
930  const auto cuda_mgr = data_mgr_->getCudaMgr();
931  if (cuda_mgr) {
932  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
933  ret.start_gpu = cuda_mgr->getStartGpu();
934  if (ret.start_gpu >= 0) {
935  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
936  // ^ This will break as soon as we allow non-contiguous GPU allocations to MapD
937  }
938  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
939  TGpuSpecification gpu_spec;
940  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
941  gpu_spec.num_sm = deviceProperties->numMPs;
942  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
943  gpu_spec.memory = deviceProperties->globalMem;
944  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
945  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
946  ret.gpu_info.push_back(gpu_spec);
947  }
948  }
949 
950  // start hardware/OS dependent code
951  ret.num_cpu_hw = std::thread::hardware_concurrency();
952  // ^ This might return different results in the case of hyper-threading
953  // end hardware/OS dependent code
954 
955  _return.hardware_info.push_back(ret);
956 }
957 
958 void DBHandler::get_session_info(TSessionInfo& _return,
959  const TSessionId& session_id_or_json) {
960  heavyai::RequestInfo const request_info(session_id_or_json);
961  SET_REQUEST_ID(request_info.requestId());
962  auto session_ptr = get_session_ptr(request_info.sessionId());
963  CHECK(session_ptr);
964  auto stdlog = STDLOG(session_ptr);
965  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
966  auto user_metadata = session_ptr->get_currentUser();
967  _return.user = user_metadata.userName;
968  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
969  _return.start_time = session_ptr->get_start_time();
970  _return.is_super = user_metadata.isSuper;
971 }
972 
973 void DBHandler::set_leaf_info(const TSessionId& session, const TLeafInfo& info) {
974  g_distributed_leaf_idx = info.leaf_id;
975  g_distributed_num_leaves = info.num_leaves;
976 }
977 
979  const SQLTypeInfo& ti,
980  TColumn& column) {
981  if (ti.is_array()) {
983  << "element types of arrays should always be nullable";
984  TColumn tColumn;
985  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
986  CHECK(array_tv);
987  bool is_null = !array_tv->is_initialized();
988  if (!is_null) {
989  const auto& vec = array_tv->get();
990  for (const auto& elem_tv : vec) {
991  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
992  }
993  }
994  column.data.arr_col.push_back(tColumn);
995  column.nulls.push_back(is_null && !ti.get_notnull());
996  } else if (ti.is_geometry()) {
997  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
998  if (scalar_tv) {
999  auto s_n = boost::get<NullableString>(scalar_tv);
1000  auto s = boost::get<std::string>(s_n);
1001  if (s) {
1002  column.data.str_col.push_back(*s);
1003  } else {
1004  column.data.str_col.emplace_back(""); // null string
1005  auto null_p = boost::get<void*>(s_n);
1006  CHECK(null_p && !*null_p);
1007  }
1008  column.nulls.push_back(!s && !ti.get_notnull());
1009  } else {
1010  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1011  CHECK(array_tv);
1012  bool is_null = !array_tv->is_initialized();
1013  if (!is_null) {
1014  auto elem_type = SQLTypeInfo(kDOUBLE, false);
1015  TColumn tColumn;
1016  const auto& vec = array_tv->get();
1017  for (const auto& elem_tv : vec) {
1018  value_to_thrift_column(elem_tv, elem_type, tColumn);
1019  }
1020  column.data.arr_col.push_back(tColumn);
1021  column.nulls.push_back(false);
1022  } else {
1023  TColumn tColumn;
1024  column.data.arr_col.push_back(tColumn);
1025  column.nulls.push_back(is_null && !ti.get_notnull());
1026  }
1027  }
1028  } else {
1029  CHECK(!ti.is_column());
1030  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1031  CHECK(scalar_tv);
1032  if (boost::get<int64_t>(scalar_tv)) {
1033  int64_t data = *(boost::get<int64_t>(scalar_tv));
1034 
1035  if (ti.is_decimal()) {
1036  double val = static_cast<double>(data);
1037  if (ti.get_scale() > 0) {
1038  val /= pow(10.0, std::abs(ti.get_scale()));
1039  }
1040  column.data.real_col.push_back(val);
1041  } else {
1042  column.data.int_col.push_back(data);
1043  }
1044 
1045  switch (ti.get_type()) {
1046  case kBOOLEAN:
1047  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
1048  break;
1049  case kTINYINT:
1050  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
1051  break;
1052  case kSMALLINT:
1053  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
1054  break;
1055  case kINT:
1056  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
1057  break;
1058  case kNUMERIC:
1059  case kDECIMAL:
1060  case kBIGINT:
1061  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1062  break;
1063  case kTIME:
1064  case kTIMESTAMP:
1065  case kDATE:
1066  case kINTERVAL_DAY_TIME:
1067  case kINTERVAL_YEAR_MONTH:
1068  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
1069  break;
1070  default:
1071  column.nulls.push_back(false);
1072  }
1073  } else if (boost::get<double>(scalar_tv)) {
1074  double data = *(boost::get<double>(scalar_tv));
1075  column.data.real_col.push_back(data);
1076  if (ti.get_type() == kFLOAT) {
1077  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1078  } else {
1079  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
1080  }
1081  } else if (boost::get<float>(scalar_tv)) {
1082  CHECK_EQ(kFLOAT, ti.get_type());
1083  float data = *(boost::get<float>(scalar_tv));
1084  column.data.real_col.push_back(data);
1085  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
1086  } else if (boost::get<NullableString>(scalar_tv)) {
1087  auto s_n = boost::get<NullableString>(scalar_tv);
1088  auto s = boost::get<std::string>(s_n);
1089  if (s) {
1090  column.data.str_col.push_back(*s);
1091  } else {
1092  column.data.str_col.emplace_back(""); // null string
1093  auto null_p = boost::get<void*>(s_n);
1094  CHECK(null_p && !*null_p);
1095  }
1096  column.nulls.push_back(!s && !ti.get_notnull());
1097  } else {
1098  CHECK(false);
1099  }
1100  }
1101 }
1102 
1104  TDatum datum;
1105  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
1106  if (!scalar_tv) {
1107  CHECK(ti.is_array());
1109  << "element types of arrays should always be nullable";
1110  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
1111  CHECK(array_tv);
1112  if (array_tv->is_initialized()) {
1113  const auto& vec = array_tv->get();
1114  for (const auto& elem_tv : vec) {
1115  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
1116  datum.val.arr_val.push_back(scalar_col_val);
1117  }
1118  // Datum is not null, at worst it's an empty array Datum
1119  datum.is_null = false;
1120  } else {
1121  datum.is_null = true;
1122  }
1123  return datum;
1124  }
1125  if (boost::get<int64_t>(scalar_tv)) {
1126  int64_t data = *(boost::get<int64_t>(scalar_tv));
1127 
1128  if (ti.is_decimal()) {
1129  double val = static_cast<double>(data);
1130  if (ti.get_scale() > 0) {
1131  val /= pow(10.0, std::abs(ti.get_scale()));
1132  }
1133  datum.val.real_val = val;
1134  } else {
1135  datum.val.int_val = data;
1136  }
1137 
1138  switch (ti.get_type()) {
1139  case kBOOLEAN:
1140  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
1141  break;
1142  case kTINYINT:
1143  datum.is_null = (datum.val.int_val == NULL_TINYINT);
1144  break;
1145  case kSMALLINT:
1146  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
1147  break;
1148  case kINT:
1149  datum.is_null = (datum.val.int_val == NULL_INT);
1150  break;
1151  case kDECIMAL:
1152  case kNUMERIC:
1153  case kBIGINT:
1154  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1155  break;
1156  case kTIME:
1157  case kTIMESTAMP:
1158  case kDATE:
1159  case kINTERVAL_DAY_TIME:
1160  case kINTERVAL_YEAR_MONTH:
1161  datum.is_null = (datum.val.int_val == NULL_BIGINT);
1162  break;
1163  default:
1164  datum.is_null = false;
1165  }
1166  } else if (boost::get<double>(scalar_tv)) {
1167  datum.val.real_val = *(boost::get<double>(scalar_tv));
1168  if (ti.get_type() == kFLOAT) {
1169  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1170  } else {
1171  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
1172  }
1173  } else if (boost::get<float>(scalar_tv)) {
1174  CHECK_EQ(kFLOAT, ti.get_type());
1175  datum.val.real_val = *(boost::get<float>(scalar_tv));
1176  datum.is_null = (datum.val.real_val == NULL_FLOAT);
1177  } else if (boost::get<NullableString>(scalar_tv)) {
1178  auto s_n = boost::get<NullableString>(scalar_tv);
1179  auto s = boost::get<std::string>(s_n);
1180  if (s) {
1181  datum.val.str_val = *s;
1182  } else {
1183  auto null_p = boost::get<void*>(s_n);
1184  CHECK(null_p && !*null_p);
1185  }
1186  datum.is_null = !s;
1187  } else {
1188  CHECK(false);
1189  }
1190  return datum;
1191 }
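// Illustrative helper (not part of the original file) mirroring the decimal
// rescaling performed in value_to_thrift_column() and value_to_thrift() above:
// a DECIMAL value is stored as a scaled integer and converted back to a double
// by dividing by 10^scale, e.g. a DECIMAL(10,2) payload of 12345 becomes 123.45.
namespace {
[[maybe_unused]] double decode_decimal(const int64_t scaled_value, const int scale) {
  return scale > 0 ? static_cast<double>(scaled_value) / pow(10.0, scale)
                   : static_cast<double>(scaled_value);
}
}  // namespace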
1192 
1194  TQueryResult& _return,
1195  const QueryStateProxy& query_state_proxy,
1196  const std::shared_ptr<Catalog_Namespace::SessionInfo> session_ptr,
1197  const std::string& query_str,
1198  const bool column_format,
1199  const std::string& nonce,
1200  const int32_t first_n,
1201  const int32_t at_most_n,
1202  const bool use_calcite) {
1203  _return.total_time_ms = 0;
1204  _return.nonce = nonce;
1205  ParserWrapper pw{query_str};
1206  switch (pw.getQueryType()) {
1208  _return.query_type = TQueryType::READ;
1209  VLOG(1) << "query type: READ";
1210  break;
1211  }
1213  _return.query_type = TQueryType::WRITE;
1214  VLOG(1) << "query type: WRITE";
1215  break;
1216  }
1218  _return.query_type = TQueryType::SCHEMA_READ;
1219  VLOG(1) << "query type: SCHEMA READ";
1220  break;
1221  }
1223  _return.query_type = TQueryType::SCHEMA_WRITE;
1224  VLOG(1) << "query type: SCHEMA WRITE";
1225  break;
1226  }
1227  default: {
1228  _return.query_type = TQueryType::UNKNOWN;
1229  LOG(WARNING) << "query type: UNKNOWN";
1230  break;
1231  }
1232  }
1233 
1236  _return.total_time_ms += measure<>::execution([&]() {
1238  query_state_proxy,
1239  column_format,
1240  session_ptr->get_executor_device_type(),
1241  first_n,
1242  at_most_n,
1243  use_calcite,
1244  locks);
1246  _return, result, query_state_proxy, column_format, first_n, at_most_n);
1247  });
1248 }
1249 
1250 void DBHandler::convertData(TQueryResult& _return,
1252  const QueryStateProxy& query_state_proxy,
1253  const bool column_format,
1254  const int32_t first_n,
1255  const int32_t at_most_n) {
1256  _return.execution_time_ms += result.getExecutionTime();
1257  if (result.empty()) {
1258  return;
1259  }
1260 
1261  switch (result.getResultType()) {
1263  convertRows(_return,
1264  query_state_proxy,
1265  result.getTargetsMeta(),
1266  *result.getRows(),
1267  column_format,
1268  first_n,
1269  at_most_n);
1270  break;
1272  convertResult(_return, *result.getRows(), true);
1273  break;
1275  convertExplain(_return, *result.getRows(), true);
1276  break;
1278  convertRows(_return,
1279  query_state_proxy,
1280  result.getTargetsMeta(),
1281  *result.getRows(),
1282  column_format,
1283  -1,
1284  -1);
1285  break;
1286  }
1287 }
1288 
1289 void DBHandler::sql_execute(TQueryResult& _return,
1290  const TSessionId& session_id_or_json,
1291  const std::string& query_str,
1292  const bool column_format,
1293  const std::string& nonce,
1294  const int32_t first_n,
1295  const int32_t at_most_n) {
1296  heavyai::RequestInfo const request_info(session_id_or_json);
1297  SET_REQUEST_ID(request_info.requestId());
1298  const std::string exec_ra_prefix = "execute relalg";
1299  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1300  auto actual_query =
1301  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1302  auto session_ptr = get_session_ptr(request_info.sessionId());
1303  auto query_state = create_query_state(session_ptr, actual_query);
1304  auto stdlog = STDLOG(session_ptr, query_state);
1305  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1306  stdlog.appendNameValuePairs("nonce", nonce);
1307  auto timer = DEBUG_TIMER(__func__);
1308  try {
1309  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1310  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1311  };
1312 
1313  if (first_n >= 0 && at_most_n >= 0) {
1314  THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
1315  }
1316 
1317  if (leaf_aggregator_.leafCount() > 0) {
1318  if (!agg_handler_) {
1319  THROW_DB_EXCEPTION("Distributed support is disabled.");
1320  }
1321  _return.total_time_ms = measure<>::execution([&]() {
1322  agg_handler_->cluster_execute(_return,
1323  query_state->createQueryStateProxy(),
1324  query_state->getQueryStr(),
1325  column_format,
1326  nonce,
1327  first_n,
1328  at_most_n,
1330  });
1331  _return.nonce = nonce;
1332  } else {
1333  sql_execute_local(_return,
1334  query_state->createQueryStateProxy(),
1335  session_ptr,
1336  actual_query,
1337  column_format,
1338  nonce,
1339  first_n,
1340  at_most_n,
1341  use_calcite);
1342  }
1343  _return.total_time_ms += process_deferred_copy_from(request_info.sessionId());
1344  std::string debug_json = timer.stopAndGetJson();
1345  if (!debug_json.empty()) {
1346  _return.__set_debug(std::move(debug_json));
1347  }
1348  stdlog.appendNameValuePairs(
1349  "execution_time_ms",
1350  _return.execution_time_ms,
1351  "total_time_ms", // BE-3420 - Redundant with duration field
1352  stdlog.duration<std::chrono::milliseconds>());
1353  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1354  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1355  } catch (const std::exception& e) {
1356  if (strstr(e.what(), "java.lang.NullPointerException")) {
1357  THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
1358  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1359  THROW_DB_EXCEPTION("multiple SQL statements not allowed");
1360  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1361  THROW_DB_EXCEPTION("empty SQL statement not allowed");
1362  } else {
1363  THROW_DB_EXCEPTION(e.what());
1364  }
1365  }
1366 }
1367 
1369  const TSessionId& session_id_or_json,
1370  const std::string& query_str,
1371  const bool column_format,
1372  const int32_t first_n,
1373  const int32_t at_most_n,
1375  heavyai::RequestInfo const request_info(session_id_or_json);
1376  SET_REQUEST_ID(request_info.requestId());
1377  const std::string exec_ra_prefix = "execute relalg";
1378  const bool use_calcite = !boost::starts_with(query_str, exec_ra_prefix);
1379  auto actual_query =
1380  use_calcite ? query_str : boost::trim_copy(query_str.substr(exec_ra_prefix.size()));
1381 
1382  auto session_ptr = get_session_ptr(request_info.sessionId());
1383  CHECK(session_ptr);
1384  auto query_state = create_query_state(session_ptr, actual_query);
1385  auto stdlog = STDLOG(session_ptr, query_state);
1386  auto timer = DEBUG_TIMER(__func__);
1387 
1388  try {
1389  ScopeGuard reset_was_deferred_copy_from = [this, &session_ptr] {
1390  deferred_copy_from_sessions.remove(session_ptr->get_session_id());
1391  };
1392 
1393  if (first_n >= 0 && at_most_n >= 0) {
1394  THROW_DB_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
1395  }
1396  auto total_time_ms = measure<>::execution([&]() {
1398  query_state->createQueryStateProxy(),
1399  column_format,
1400  session_ptr->get_executor_device_type(),
1401  first_n,
1402  at_most_n,
1403  use_calcite,
1404  locks);
1405  });
1406 
1407  _return.setExecutionTime(total_time_ms +
1408  process_deferred_copy_from(request_info.sessionId()));
1409 
1410  stdlog.appendNameValuePairs(
1411  "execution_time_ms",
1412  _return.getExecutionTime(),
1413  "total_time_ms", // BE-3420 - Redundant with duration field
1414  stdlog.duration<std::chrono::milliseconds>());
1415  VLOG(1) << "Table Schema Locks:\n" << lockmgr::TableSchemaLockMgr::instance();
1416  VLOG(1) << "Table Data Locks:\n" << lockmgr::TableDataLockMgr::instance();
1417  } catch (const std::exception& e) {
1418  if (strstr(e.what(), "java.lang.NullPointerException")) {
1419  THROW_DB_EXCEPTION("query failed from broken view or other schema related issue");
1420  } else if (strstr(e.what(), "SQL Error: Encountered \";\"")) {
1421  THROW_DB_EXCEPTION("multiple SQL statements not allowed");
1422  } else if (strstr(e.what(), "SQL Error: Encountered \"<EOF>\" at line 0, column 0")) {
1423  THROW_DB_EXCEPTION("empty SQL statement not allowed");
1424  } else {
1425  THROW_DB_EXCEPTION(e.what());
1426  }
1427  }
1428 }
1429 
1430 int64_t DBHandler::process_deferred_copy_from(const TSessionId& session_id) {
1431  int64_t total_time_ms(0);
1432  // if the SQL statement we just executed was a geo COPY FROM, the import
1433  // parameters were captured and this flag was set, so we do the actual import here
1434  if (auto deferred_copy_from_state = deferred_copy_from_sessions(session_id)) {
1435  // import_geo_table() calls create_table() which calls this function to
1436  // do the work, so reset the flag now to avoid executing this part a
1437  // second time at the end of that, which would fail as the table was
1438  // already created! Also reset the flag with a ScopeGuard on exiting
1439  // this function any other way, such as an exception from the code above!
1440  deferred_copy_from_sessions.remove(session_id);
1441 
1442  // create table as replicated?
1443  TCreateParams create_params;
1444  if (deferred_copy_from_state->partitions == "REPLICATED") {
1445  create_params.is_replicated = true;
1446  }
1447 
1448  // now do (and time) the import
1449  total_time_ms = measure<>::execution([&]() {
1450  importGeoTableGlobFilterSort(session_id,
1451  deferred_copy_from_state->table,
1452  deferred_copy_from_state->file_name,
1453  deferred_copy_from_state->copy_params,
1454  TRowDescriptor(),
1455  create_params);
1456  });
1457  }
1458  return total_time_ms;
1459 }
1460 
1461 void DBHandler::sql_execute_df(TDataFrame& _return,
1462  const TSessionId& session_id_or_json,
1463  const std::string& query_str,
1464  const TDeviceType::type results_device_type,
1465  const int32_t device_id,
1466  const int32_t first_n,
1467  const TArrowTransport::type transport_method) {
1468  heavyai::RequestInfo const request_info(session_id_or_json);
1469  SET_REQUEST_ID(request_info.requestId());
1470  auto session_ptr = get_session_ptr(request_info.sessionId());
1471  CHECK(session_ptr);
1472  auto query_state = create_query_state(session_ptr, query_str);
1473  auto stdlog = STDLOG(session_ptr, query_state);
1474 
1475  const auto executor_device_type = session_ptr->get_executor_device_type();
1476 
1477  if (results_device_type == TDeviceType::GPU) {
1478  if (executor_device_type != ExecutorDeviceType::GPU) {
1479  THROW_DB_EXCEPTION(std::string("GPU mode is not allowed in this session"));
1480  }
1481  if (!data_mgr_->gpusPresent()) {
1482  THROW_DB_EXCEPTION(std::string("No GPU is available in this server"));
1483  }
1484  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1486  std::string("Invalid device_id or unavailable GPU with this ID"));
1487  }
1488  }
1489  ParserWrapper pw{query_str};
1490  if (pw.getQueryType() != ParserWrapper::QueryType::Read) {
1491  THROW_DB_EXCEPTION(std::string(
1492  "Only read queries supported for the Arrow sql_execute_df endpoint."));
1493  }
1494  if (ExplainInfo(query_str).isCalciteExplain()) {
1495  THROW_DB_EXCEPTION(std::string(
1496  "Explain is currently unsupported by the Arrow sql_execute_df endpoint."));
1497  }
1498 
1499  ExecutionResult execution_result;
1501  sql_execute_impl(execution_result,
1502  query_state->createQueryStateProxy(),
1503  true, /* column_format - does this do anything? */
1504  executor_device_type,
1505  first_n,
1506  -1, /* at_most_n */
1507  true,
1508  locks);
1509 
1510  const auto result_set = execution_result.getRows();
1511  const auto executor_results_device_type = results_device_type == TDeviceType::CPU
1514  _return.execution_time_ms =
1515  execution_result.getExecutionTime() - result_set->getQueueTime();
1516  const auto converter = std::make_unique<ArrowResultSetConverter>(
1517  result_set,
1518  data_mgr_,
1519  executor_results_device_type,
1520  device_id,
1521  getTargetNames(execution_result.getTargetsMeta()),
1522  first_n,
1523  ArrowTransport(transport_method));
1524  ArrowResult arrow_result;
1525  _return.arrow_conversion_time_ms +=
1526  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
1527  _return.sm_handle =
1528  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
1529  _return.sm_size = arrow_result.sm_size;
1530  _return.df_handle =
1531  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
1532  _return.df_buffer =
1533  std::string(arrow_result.df_buffer.begin(), arrow_result.df_buffer.end());
1534  if (executor_results_device_type == ExecutorDeviceType::GPU) {
1535  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1536  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
1537  ipc_handle_to_dev_ptr_.insert(
1538  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
1539  }
1540  _return.df_size = arrow_result.df_size;
1541 }
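// Note (not part of the original file): for GPU results, the serialized CUDA IPC
// handle stored in ipc_handle_to_dev_ptr_ above is looked up and erased again in
// deallocate_df() below, so every GPU data frame returned by sql_execute_df()
// should eventually be released with a matching deallocate_df() call.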
1542 
1543 void DBHandler::sql_execute_gdf(TDataFrame& _return,
1544  const TSessionId& session_id_or_json,
1545  const std::string& query_str,
1546  const int32_t device_id,
1547  const int32_t first_n) {
1548  heavyai::RequestInfo request_info(session_id_or_json);
1549  SET_REQUEST_ID(request_info.requestId());
1550  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1551  request_info.setRequestId(logger::request_id());
1552  sql_execute_df(_return,
1553  request_info.json(),
1554  query_str,
1555  TDeviceType::GPU,
1556  device_id,
1557  first_n,
1558  TArrowTransport::SHARED_MEMORY);
1559 }
1560 
1561 // For now we have only one user of a data frame in all cases.
1562 void DBHandler::deallocate_df(const TSessionId& session_id_or_json,
1563  const TDataFrame& df,
1564  const TDeviceType::type device_type,
1565  const int32_t device_id) {
1566  heavyai::RequestInfo const request_info(session_id_or_json);
1567  SET_REQUEST_ID(request_info.requestId());
1568  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1569  std::string serialized_cuda_handle = "";
1570  if (device_type == TDeviceType::GPU) {
1571  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1572  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1573  TDBException ex;
1574  ex.error_msg = std::string(
1575  "Current data frame handle is not bookkept or has been inserted "
1576  "twice");
1577  LOG(ERROR) << ex.error_msg;
1578  throw ex;
1579  }
1580  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1581  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1582  }
1583  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1584  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1586  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1588  result,
1589  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1590  device_id,
1591  data_mgr_);
1592 }
1593 
1594 void DBHandler::sql_validate(TRowDescriptor& _return,
1595  const TSessionId& session_id_or_json,
1596  const std::string& query_str) {
1597  heavyai::RequestInfo const request_info(session_id_or_json);
1598  SET_REQUEST_ID(request_info.requestId());
1599  try {
1600  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1601  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1602  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1603  stdlog.setQueryState(query_state);
1604 
1605  ParserWrapper pw{query_str};
1606  if (ExplainInfo(query_str).isExplain() || pw.is_ddl || pw.is_update_dml) {
1607  throw std::runtime_error("Can only validate SELECT statements.");
1608  }
1609 
1610  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
1611 
1612  TPlanResult parse_result;
1614  std::tie(parse_result, locks) = parse_to_ra(query_state->createQueryStateProxy(),
1615  query_state->getQueryStr(),
1616  {},
1617  true,
1619  /*check_privileges=*/true);
1620  const auto query_ra = parse_result.plan_result;
1621  _return = validateRelAlg(query_ra, query_state->createQueryStateProxy());
1622  } catch (const std::exception& e) {
1623  THROW_DB_EXCEPTION(std::string(e.what()));
1624  }
1625 }
1626 
1627 namespace {
1628 
1630  std::unordered_set<std::string> uc_column_names;
1631  std::unordered_set<std::string> uc_column_table_qualifiers;
1632 };
1633 
1634 // Extract what looks like a (qualified) identifier from the partial query.
1635 // The results will be used to rank the auto-completion results: tables which
1636 // contain at least one of the identifiers first.
1638  const std::string& sql) {
1639  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1640  boost::regex::extended | boost::regex::icase};
1641  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1642  boost::sregex_token_iterator end;
1643  std::unordered_set<std::string> uc_column_names;
1644  std::unordered_set<std::string> uc_column_table_qualifiers;
1645  for (; tok_it != end; ++tok_it) {
1646  std::string column_name = *tok_it;
1647  std::vector<std::string> column_tokens;
1648  boost::split(column_tokens, column_name, boost::is_any_of("."));
1649  if (column_tokens.size() == 2) {
1650  // If the column name is qualified, take the user's word for the table qualifier.
1651  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1652  } else {
1653  uc_column_names.insert(to_upper(column_name));
1654  }
1655  }
1656  return {uc_column_names, uc_column_table_qualifiers};
1657 }
1658 
1659 } // namespace
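The anonymous helper above tokenizes the partial query with a simple identifier regex and splits qualified names on '.'. For illustration only (not part of DBHandler.cpp), a self-contained sketch of the same tokenization; it assumes nothing beyond boost.regex and boost.algorithm, and the sample SQL text is made up:

#include <boost/algorithm/string.hpp>
#include <boost/regex.hpp>
#include <iostream>
#include <string>
#include <vector>

int main() {
  const std::string sql = "SELECT tbl.carrier, dest FR";
  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
                        boost::regex::extended | boost::regex::icase};
  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0), end;
  for (; tok_it != end; ++tok_it) {
    std::string token = *tok_it;
    std::vector<std::string> parts;
    boost::split(parts, token, boost::is_any_of("."));
    if (parts.size() == 2) {
      // "tbl.carrier" contributes the table qualifier TBL.
      std::cout << "qualifier: " << boost::to_upper_copy(parts.front()) << '\n';
    } else {
      // "SELECT", "dest" and "FR" become upper-cased unqualified candidates.
      std::cout << "candidate: " << boost::to_upper_copy(token) << '\n';
    }
  }
  return 0;
}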
1660 
1661 void DBHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1662  const TSessionId& session_id_or_json,
1663  const std::string& sql,
1664  const int cursor) {
1665  heavyai::RequestInfo const request_info(session_id_or_json);
1666  SET_REQUEST_ID(request_info.requestId());
1667  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1668  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1669  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1670  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1671  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1672  proj_tokens.uc_column_names, visible_tables, stdlog);
1673  // Add the table qualifiers explicitly specified by the user.
1674  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1675  proj_tokens.uc_column_table_qualifiers.end());
1676  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1677  std::sort(
1678  hints.begin(),
1679  hints.end(),
1680  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1681  if (lhs.type == TCompletionHintType::TABLE &&
1682  rhs.type == TCompletionHintType::TABLE) {
1683  // Between two tables, one which is compatible with the specified
1684  // projections and one which isn't, pick the one which is compatible.
1685  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1686  compatible_table_names.end() &&
1687  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1688  compatible_table_names.end()) {
1689  return true;
1690  }
1691  }
1692  return lhs.type < rhs.type;
1693  });
1694 }
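As a hedged aside, a minimal model of the ranking implemented above, using a plain struct in place of the Thrift TCompletionHint (the struct, table names, and enum constants are hypothetical stand-ins): a table hint compatible with the projected columns sorts ahead of one that is not, and everything else falls back to the hint-type ordering, COLUMN (most specific) before KEYWORD.

#include <algorithm>
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

struct FakeHint {
  int type;          // stand-in for TCompletionHintType; smaller means more specific
  std::string name;  // stand-in for hints.back()
};

int main() {
  const int kColumn = 0;
  const int kTable = 1;
  const std::unordered_set<std::string> compatible{"FLIGHTS"};
  std::vector<FakeHint> hints{{kTable, "ZIPS"}, {kTable, "FLIGHTS"}, {kColumn, "carrier"}};
  std::sort(hints.begin(), hints.end(), [&](const FakeHint& lhs, const FakeHint& rhs) {
    if (lhs.type == kTable && rhs.type == kTable) {
      if (compatible.count(lhs.name) && !compatible.count(rhs.name)) {
        return true;  // compatible table ranks before the incompatible one
      }
    }
    return lhs.type < rhs.type;
  });
  for (const auto& hint : hints) {
    std::cout << hint.name << '\n';  // prints: carrier, FLIGHTS, ZIPS
  }
  return 0;
}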
1695 
1696 void DBHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1697  std::vector<std::string>& visible_tables,
1698  query_state::StdLog& stdlog,
1699  const std::string& sql,
1700  const int cursor) {
1701  const auto& session_info = *stdlog.getConstSessionInfo();
1702  try {
1703  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1704 
1705  // Filter out keywords suggested by Calcite which we don't support.
1707  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1708  } catch (const std::exception& e) {
1709  TDBException ex;
1710  ex.error_msg = std::string(e.what());
1711  LOG(ERROR) << ex.error_msg;
1712  throw ex;
1713  }
1714  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1715  const size_t length_to_cursor =
1716  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1717  // Trust hints from Calcite after the FROM keyword.
1718  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1719  return;
1720  }
1721  // Before FROM, the query is too incomplete for context-sensitive completions.
1722  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1723 }
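For illustration only (not part of this file), a standalone sketch of the cursor-bounded regex check used above, assuming only boost.regex and the standard library; the sample inputs are made up. A negative cursor means "search the whole string"; otherwise the search is restricted to the text before the cursor:

#include <algorithm>
#include <boost/regex.hpp>
#include <iostream>
#include <string>

// Returns true when a FROM keyword appears before the cursor position.
bool has_from_before_cursor(const std::string& sql, const int cursor) {
  static const boost::regex from_expr{R"(\s+from\s+)",
                                      boost::regex::extended | boost::regex::icase};
  const size_t length_to_cursor =
      cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
  return boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr);
}

int main() {
  std::cout << has_from_before_cursor("SELECT a FROM t WHERE ", -1) << '\n';  // 1
  std::cout << has_from_before_cursor("SELECT a FROM t", 8) << '\n';          // 0
  return 0;
}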
1724 
1725 void DBHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1726  query_state::StdLog& stdlog,
1727  std::vector<std::string>& visible_tables,
1728  const std::string& sql,
1729  const int cursor) {
1730  const auto last_word =
1731  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1732  boost::regex select_expr{R"(\s*select\s+)",
1733  boost::regex::extended | boost::regex::icase};
1734  const size_t length_to_cursor =
1735  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1736  // After SELECT but before FROM, look for all columns in all tables which match the
1737  // prefix.
1738  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1739  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1740  // Trust the fully qualified columns the most.
1741  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1742  return;
1743  }
1744  // Not much information to use, just retrieve column names which match the prefix.
1745  if (should_suggest_column_hints(sql)) {
1746  get_column_hints(hints, last_word, column_names_by_table);
1747  return;
1748  }
1749  const std::string kFromKeyword{"FROM"};
1750  if (boost::istarts_with(kFromKeyword, last_word)) {
1751  TCompletionHint keyword_hint;
1752  keyword_hint.type = TCompletionHintType::KEYWORD;
1753  keyword_hint.replaced = last_word;
1754  keyword_hint.hints.emplace_back(kFromKeyword);
1755  hints.push_back(keyword_hint);
1756  }
1757  } else {
1758  const std::string kSelectKeyword{"SELECT"};
1759  if (boost::istarts_with(kSelectKeyword, last_word)) {
1760  TCompletionHint keyword_hint;
1761  keyword_hint.type = TCompletionHintType::KEYWORD;
1762  keyword_hint.replaced = last_word;
1763  keyword_hint.hints.emplace_back(kSelectKeyword);
1764  hints.push_back(keyword_hint);
1765  }
1766  }
1767 }
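A small standalone aside (with made-up example values): the keyword suggestions above rely on boost::istarts_with(keyword, last_word), so a keyword is offered whenever the word under the cursor is a case-insensitive prefix of it.

#include <boost/algorithm/string/predicate.hpp>
#include <iostream>

int main() {
  // The word typed so far is tested as a prefix of the candidate keyword.
  std::cout << boost::istarts_with("FROM", "fr") << '\n';     // 1 -> suggest FROM
  std::cout << boost::istarts_with("SELECT", "sel") << '\n';  // 1 -> suggest SELECT
  std::cout << boost::istarts_with("FROM", "fx") << '\n';     // 0 -> no hint
  return 0;
}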
1768 
1769 std::unordered_map<std::string, std::unordered_set<std::string>>
1770 DBHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1771  query_state::StdLog& stdlog) {
1772  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1773  for (auto it = table_names.begin(); it != table_names.end();) {
1774  TTableDetails table_details;
1775  try {
1776  get_table_details_impl(table_details, stdlog, *it, false, false);
1777  } catch (const TDBException& e) {
1778  // Remove the corrupted Table/View name from the list for further processing.
1779  it = table_names.erase(it);
1780  continue;
1781  }
1782  for (const auto& column_type : table_details.row_desc) {
1783  column_names_by_table[*it].emplace(column_type.col_name);
1784  }
1785  ++it;
1786  }
1787  return column_names_by_table;
1788 }
1789 
1793 }
1794 
1796  const std::unordered_set<std::string>& uc_column_names,
1797  std::vector<std::string>& table_names,
1798  query_state::StdLog& stdlog) {
1799  std::unordered_set<std::string> compatible_table_names_by_column;
1800  for (auto it = table_names.begin(); it != table_names.end();) {
1801  TTableDetails table_details;
1802  try {
1803  get_table_details_impl(table_details, stdlog, *it, false, false);
1804  } catch (const TDBException& e) {
1805  // Remove the corrupted Table/View name from the list for further processing.
1806  it = table_names.erase(it);
1807  continue;
1808  }
1809  for (const auto& column_type : table_details.row_desc) {
1810  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1811  compatible_table_names_by_column.emplace(to_upper(*it));
1812  break;
1813  }
1814  }
1815  ++it;
1816  }
1817  return compatible_table_names_by_column;
1818 }
1819 
1820 void DBHandler::dispatch_query_task(std::shared_ptr<QueryDispatchQueue::Task> query_task,
1821  const bool is_update_delete) {
1823  dispatch_queue_->submit(std::move(query_task), is_update_delete);
1824 }
1825 
1826 TRowDescriptor DBHandler::validateRelAlg(const std::string& query_ra,
1827  QueryStateProxy query_state_proxy) {
1828  TQueryResult query_result;
1829  ExecutionResult execution_result;
1830  auto execute_rel_alg_task = std::make_shared<QueryDispatchQueue::Task>(
1831  [this,
1832  &execution_result,
1833  query_state_proxy,
1834  &query_ra,
1835  parent_thread_local_ids =
1836  logger::thread_local_ids()](const size_t executor_index) {
1837  logger::LocalIdsScopeGuard lisg = parent_thread_local_ids.setNewThreadId();
1838  execute_rel_alg(execution_result,
1839  query_state_proxy,
1840  query_ra,
1841  true,
1843  -1,
1844  -1,
1845  /*just_validate=*/true,
1846  /*find_filter_push_down_candidates=*/false,
1847  ExplainInfo(),
1848  executor_index);
1849  });
1850  dispatch_query_task(execute_rel_alg_task, /*is_update_delete=*/false);
1851  auto result_future = execute_rel_alg_task->get_future();
1852  result_future.get();
1853  DBHandler::convertData(query_result, execution_result, query_state_proxy, true, -1, -1);
1854 
1855  const auto& row_desc = query_result.row_set.row_desc;
1856  const auto& targets_meta = execution_result.getTargetsMeta();
1857  CHECK_EQ(row_desc.size(), targets_meta.size());
1858 
1859  // TODO: Below fixup logic should no longer be needed after the comp_param refactor
1860  TRowDescriptor fixedup_row_desc;
1861  for (size_t i = 0; i < row_desc.size(); i++) {
1862  const auto& col_desc = row_desc[i];
1863  auto fixedup_col_desc = col_desc;
1864  if (col_desc.col_type.encoding == TEncodingType::DICT &&
1865  col_desc.col_type.comp_param > 0) {
1866  const auto& type_info = targets_meta[i].get_type_info();
1867  CHECK_EQ(type_info.get_compression(), kENCODING_DICT);
1869  type_info.getStringDictKey().db_id);
1870  const auto dd = cat->getMetadataForDict(col_desc.col_type.comp_param, false);
1871  CHECK(dd);
1872  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
1873  }
1874  fixedup_row_desc.push_back(fixedup_col_desc);
1875  }
1876  return fixedup_row_desc;
1877 }
1878 
1879 void DBHandler::get_roles(std::vector<std::string>& roles,
1880  const TSessionId& session_id_or_json) {
1881  heavyai::RequestInfo const request_info(session_id_or_json);
1882  SET_REQUEST_ID(request_info.requestId());
1883  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
1884  auto session_ptr = stdlog.getConstSessionInfo();
1885  if (!session_ptr->get_currentUser().isSuper) {
1886  // WARNING: This appears to not include roles a user is a member of,
1887  // if the role has no permissions granted to it.
1888  roles =
1889  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1890  session_ptr->getCatalog().getCurrentDB().dbId);
1891  } else {
1892  roles = SysCatalog::instance().getRoles(
1893  false, true, session_ptr->get_currentUser().userName);
1894  }
1895 }
1896 
1897 bool DBHandler::has_role(const TSessionId& session_id_or_json,
1898  const std::string& granteeName,
1899  const std::string& roleName) {
1900  heavyai::RequestInfo const request_info(session_id_or_json);
1901  SET_REQUEST_ID(request_info.requestId());
1902  const auto session_ptr = get_session_ptr(request_info.sessionId());
1903  const auto stdlog = STDLOG(session_ptr);
1904  const auto current_user = session_ptr->get_currentUser();
1905  if (!current_user.isSuper) {
1906  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1907  user && current_user.userName != granteeName) {
1908  THROW_DB_EXCEPTION("Only super users can check other user's roles.");
1909  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1910  current_user.userName, granteeName, true)) {
1912  "Only super users can check roles assignment that have not been directly "
1913  "granted to a user.");
1914  }
1915  }
1916  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1917 }
1918 
1919 static TDBObject serialize_db_object(const std::string& roleName,
1920  const DBObject& inObject) {
1921  TDBObject outObject;
1922  outObject.objectName = inObject.getName();
1923  outObject.grantee = roleName;
1924  outObject.objectId = inObject.getObjectKey().objectId;
1925  const auto ap = inObject.getPrivileges();
1926  switch (inObject.getObjectKey().permissionType) {
1927  case DatabaseDBObjectType:
1928  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1929  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1930  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1931  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1932  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1933 
1934  break;
1935  case TableDBObjectType:
1936  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1937  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1938  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1939  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1940  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1941  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1942  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1943  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1944  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1945 
1946  break;
1947  case DashboardDBObjectType:
1948  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1949  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1950  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1951  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1952  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1953 
1954  break;
1955  case ViewDBObjectType:
1956  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1957  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1958  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1959  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1960  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1961  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1962  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1963 
1964  break;
1965  case ServerDBObjectType:
1966  outObject.privilegeObjectType = TDBObjectType::ServerDBObjectType;
1967  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::CREATE_SERVER));
1968  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::DROP_SERVER));
1969  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::ALTER_SERVER));
1970  outObject.privs.push_back(ap.hasPermission(ServerPrivileges::SERVER_USAGE));
1971 
1972  break;
1973  default:
1974  CHECK(false);
1975  }
1976  const int type_val = static_cast<int>(inObject.getType());
1977  CHECK(type_val >= 0 && type_val < 6);
1978  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1979  return outObject;
1980 }
1981 
1983  const TDBObjectPermissions& permissions) {
1984  if (!permissions.__isset.database_permissions_) {
1985  THROW_DB_EXCEPTION("Database permissions not set for check.")
1986  }
1987  auto perms = permissions.database_permissions_;
1988  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1989  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1990  (perms.view_sql_editor_ &&
1992  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1993  return false;
1994  } else {
1995  return true;
1996  }
1997 }
1998 
2000  const TDBObjectPermissions& permissions) {
2001  if (!permissions.__isset.table_permissions_) {
2002  THROW_DB_EXCEPTION("Table permissions not set for check.")
2003  }
2004  auto perms = permissions.table_permissions_;
2005  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
2006  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
2007  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
2008  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
2009  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
2010  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
2011  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
2012  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
2013  return false;
2014  } else {
2015  return true;
2016  }
2017 }
2018 
2020  const TDBObjectPermissions& permissions) {
2021  if (!permissions.__isset.dashboard_permissions_) {
2022  THROW_DB_EXCEPTION("Dashboard permissions not set for check.")
2023  }
2024  auto perms = permissions.dashboard_permissions_;
2025  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
2026  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
2027  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
2028  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
2029  return false;
2030  } else {
2031  return true;
2032  }
2033 }
2034 
2036  const TDBObjectPermissions& permissions) {
2037  if (!permissions.__isset.view_permissions_) {
2038  THROW_DB_EXCEPTION("View permissions not set for check.")
2039  }
2040  auto perms = permissions.view_permissions_;
2041  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
2042  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
2043  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
2044  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
2045  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
2046  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
2047  return false;
2048  } else {
2049  return true;
2050  }
2051 }
2052 
2054  const TDBObjectPermissions& permissions) {
2055  CHECK(permissions.__isset.server_permissions_);
2056  auto perms = permissions.server_permissions_;
2057  if ((perms.create_ && !privs.hasPermission(ServerPrivileges::CREATE_SERVER)) ||
2058  (perms.drop_ && !privs.hasPermission(ServerPrivileges::DROP_SERVER)) ||
2059  (perms.alter_ && !privs.hasPermission(ServerPrivileges::ALTER_SERVER)) ||
2060  (perms.usage_ && !privs.hasPermission(ServerPrivileges::SERVER_USAGE))) {
2061  return false;
2062  } else {
2063  return true;
2064  }
2065 }
2066 
2067 bool DBHandler::has_object_privilege(const TSessionId& session_id_or_json,
2068  const std::string& granteeName,
2069  const std::string& objectName,
2070  const TDBObjectType::type objectType,
2071  const TDBObjectPermissions& permissions) {
2072  heavyai::RequestInfo const request_info(session_id_or_json);
2073  SET_REQUEST_ID(request_info.requestId());
2074  auto session_ptr = get_session_ptr(request_info.sessionId());
2075  auto stdlog = STDLOG(session_ptr);
2076  auto const& cat = session_ptr->getCatalog();
2077  auto const& current_user = session_ptr->get_currentUser();
2078  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
2079  current_user.userName, granteeName, false)) {
2081  "Users except superusers can only check privileges for self or roles granted "
2082  "to "
2083  "them.")
2084  }
2086  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
2087  user_meta.isSuper) {
2088  return true;
2089  }
2090  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
2091  if (!grnt) {
2092  THROW_DB_EXCEPTION("User or Role " + granteeName + " does not exist.")
2093  }
2095  std::string func_name;
2096  switch (objectType) {
2099  func_name = "database";
2100  break;
2103  func_name = "table";
2104  break;
2107  func_name = "dashboard";
2108  break;
2111  func_name = "view";
2112  break;
2115  func_name = "server";
2116  break;
2117  default:
2118  THROW_DB_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
2119  }
2120  DBObject req_object(objectName, type);
2121  req_object.loadKey(cat);
2122 
2123  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
2124  if (grantee_object) {
2125  // if grantee has privs on the object
2126  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
2127  } else {
2128  // no privileges on that object
2129  return false;
2130  }
2131 }
2132 
2133 void DBHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
2134  const TSessionId& session_id_or_json,
2135  const std::string& roleName) {
2136  heavyai::RequestInfo const request_info(session_id_or_json);
2137  SET_REQUEST_ID(request_info.requestId());
2138  auto session_ptr = get_session_ptr(request_info.sessionId());
2139  auto stdlog = STDLOG(session_ptr);
2140  auto const& user = session_ptr->get_currentUser();
2141  if (!user.isSuper &&
2142  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
2143  return;
2144  }
2145  auto* rl = SysCatalog::instance().getGrantee(roleName);
2146  if (rl) {
2147  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
2148  for (auto& dbObject : *rl->getDbObjects(true)) {
2149  if (dbObject.first.dbId != dbId) {
2150  // TODO (max): this does not scale well when there are many databases (not a
2151  // typical use case for now, though)
2152  continue;
2153  }
2154  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
2155  TDBObjectsForRole.push_back(tdbObject);
2156  }
2157  } else {
2158  THROW_DB_EXCEPTION("User or role " + roleName + " does not exist.");
2159  }
2160 }
2161 
2162 void DBHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
2163  const TSessionId& session_id_or_json,
2164  const std::string& objectName,
2165  const TDBObjectType::type type) {
2166  heavyai::RequestInfo const request_info(session_id_or_json);
2167  SET_REQUEST_ID(request_info.requestId());
2168  auto session_ptr = get_session_ptr(request_info.sessionId());
2169  auto stdlog = STDLOG(session_ptr);
2170  const auto& cat = session_ptr->getCatalog();
2171  DBObjectType object_type;
2172  switch (type) {
2174  object_type = DBObjectType::DatabaseDBObjectType;
2175  break;
2177  object_type = DBObjectType::TableDBObjectType;
2178  break;
2181  break;
2183  object_type = DBObjectType::ViewDBObjectType;
2184  break;
2186  object_type = DBObjectType::ServerDBObjectType;
2187  break;
2188  default:
2189  THROW_DB_EXCEPTION("Failed to get object privileges for " + objectName +
2190  ": unknown object type (" + std::to_string(type) + ").");
2191  }
2192  DBObject object_to_find(objectName, object_type);
2193 
2194  // TODO(adb): Use DatabaseLock to protect method
2195  try {
2196  if (object_type == DashboardDBObjectType) {
2197  if (objectName == "") {
2198  object_to_find = DBObject(-1, object_type);
2199  } else {
2200  object_to_find = DBObject(std::stoi(objectName), object_type);
2201  }
2202  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
2203  !objectName.empty()) {
2204  // special handling for view / table
2205  auto td = cat.getMetadataForTable(objectName, false);
2206  if (td) {
2207  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
2208  object_to_find = DBObject(objectName, object_type);
2209  }
2210  }
2211  object_to_find.loadKey(cat);
2212  } catch (const std::exception&) {
2213  THROW_DB_EXCEPTION("Object with name " + objectName + " does not exist.");
2214  }
2215 
2216  // object type on database level
2217  DBObject object_to_find_dblevel("", object_type);
2218  object_to_find_dblevel.loadKey(cat);
2219  // If the user is a superuser, respond with full privileges.
2220  if (session_ptr->get_currentUser().isSuper) {
2221  // using ALL_TABLE here to set max permissions
2222  DBObject dbObj{object_to_find.getObjectKey(),
2224  session_ptr->get_currentUser().userId};
2225  dbObj.setName("super");
2226  TDBObjects.push_back(
2227  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
2228  };
2229 
2230  std::vector<std::string> grantees =
2231  SysCatalog::instance().getRoles(true,
2232  session_ptr->get_currentUser().isSuper,
2233  session_ptr->get_currentUser().userName);
2234  for (const auto& grantee : grantees) {
2235  DBObject* object_found;
2236  auto* gr = SysCatalog::instance().getGrantee(grantee);
2237  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
2238  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2239  }
2240  // check object permissions on Database level
2241  if (gr &&
2242  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
2243  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
2244  }
2245  }
2246 }
2247 
2249  std::shared_ptr<Catalog_Namespace::SessionInfo const> session_ptr,
2250  std::vector<std::string>& roles,
2251  const std::string& granteeName,
2252  bool effective) {
2253  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
2254  if (grantee) {
2255  if (session_ptr->get_currentUser().isSuper) {
2256  roles = grantee->getRoles(/*only_direct=*/!effective);
2257  } else if (grantee->isUser()) {
2258  if (session_ptr->get_currentUser().userName == granteeName) {
2259  roles = grantee->getRoles(/*only_direct=*/!effective);
2260  } else {
2262  "Only a superuser is authorized to request list of roles granted to another "
2263  "user.");
2264  }
2265  } else {
2266  CHECK(!grantee->isUser());
2267  // granteeName is actually a roleName here and we can check a role
2268  // only if it is granted to us
2269  if (SysCatalog::instance().isRoleGrantedToGrantee(
2270  session_ptr->get_currentUser().userName, granteeName, false)) {
2271  roles = grantee->getRoles(/*only_direct=*/!effective);
2272  } else {
2273  THROW_DB_EXCEPTION("A user can check only roles granted to him.");
2274  }
2275  }
2276  } else {
2277  THROW_DB_EXCEPTION("Grantee " + granteeName + " does not exist.");
2278  }
2279 }
2280 
2281 void DBHandler::get_all_roles_for_user(std::vector<std::string>& roles,
2282  const TSessionId& session_id_or_json,
2283  const std::string& granteeName) {
2284  // WARNING: This function only returns directly granted roles.
2285  // See also: get_all_effective_roles_for_user() for all of a user's roles.
2286  heavyai::RequestInfo const request_info(session_id_or_json);
2287  SET_REQUEST_ID(request_info.requestId());
2288  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2289  auto session_ptr = stdlog.getConstSessionInfo();
2290  getAllRolesForUserImpl(session_ptr, roles, granteeName, /*effective=*/false);
2291 }
2292 
2293 void DBHandler::get_all_effective_roles_for_user(std::vector<std::string>& roles,
2294  const TSessionId& session_id_or_json,
2295  const std::string& granteeName) {
2296  heavyai::RequestInfo const request_info(session_id_or_json);
2297  SET_REQUEST_ID(request_info.requestId());
2298  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2299  auto session_ptr = stdlog.getConstSessionInfo();
2300  getAllRolesForUserImpl(session_ptr, roles, granteeName, /*effective=*/true);
2301 }
2302 
2303 namespace {
2305  const std::map<std::string, std::vector<std::string>>& table_col_names) {
2306  std::ostringstream oss;
2307  for (const auto& [table_name, col_names] : table_col_names) {
2308  oss << ":" << table_name;
2309  for (const auto& col_name : col_names) {
2310  oss << "," << col_name;
2311  }
2312  }
2313  return oss.str();
2314 }
2315 } // namespace
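As a standalone illustration (not part of this file), the helper above flattens the table-to-columns map into a compact log string; for {"flights": [carrier, dest], "zips": [zip]} it yields ":flights,carrier,dest:zips,zip". A minimal sketch using only the standard library, with made-up table and column names:

#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <vector>

int main() {
  const std::map<std::string, std::vector<std::string>> table_col_names{
      {"flights", {"carrier", "dest"}}, {"zips", {"zip"}}};
  std::ostringstream oss;
  for (const auto& [table_name, col_names] : table_col_names) {
    oss << ":" << table_name;
    for (const auto& col_name : col_names) {
      oss << "," << col_name;
    }
  }
  std::cout << oss.str() << '\n';  // prints: :flights,carrier,dest:zips,zip
  return 0;
}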
2316 
2318  TPixelTableRowResult& _return,
2319  const TSessionId& session_id_or_json,
2320  const int64_t widget_id,
2321  const TPixel& pixel,
2322  const std::map<std::string, std::vector<std::string>>& table_col_names,
2323  const bool column_format,
2324  const int32_t pixel_radius,
2325  const std::string& nonce) {
2326  heavyai::RequestInfo const request_info(session_id_or_json);
2327  SET_REQUEST_ID(request_info.requestId());
2328  auto session_ptr = get_session_ptr(request_info.sessionId());
2329  auto stdlog = STDLOG(session_ptr,
2330  "widget_id",
2331  widget_id,
2332  "pixel.x",
2333  pixel.x,
2334  "pixel.y",
2335  pixel.y,
2336  "column_format",
2337  column_format,
2338  "pixel_radius",
2339  pixel_radius,
2340  "table_col_names",
2341  dump_table_col_names(table_col_names),
2342  "nonce",
2343  nonce);
2344  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2345  if (!render_handler_) {
2346  THROW_DB_EXCEPTION("Backend rendering is disabled.");
2347  }
2348 
2349  try {
2350  render_handler_->get_result_row_for_pixel(_return,
2351  session_ptr,
2352  widget_id,
2353  pixel,
2354  table_col_names,
2355  column_format,
2356  pixel_radius,
2357  nonce);
2358  } catch (std::exception& e) {
2359  THROW_DB_EXCEPTION(e.what());
2360  }
2361 }
2362 
2364  const ColumnDescriptor* cd) {
2365  TColumnType col_type;
2366  col_type.col_name = cd->columnName;
2367  col_type.src_name = cd->sourceName;
2368  col_type.col_id = cd->columnId;
2369  col_type.col_type.type = type_to_thrift(cd->columnType);
2370  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
2371  col_type.col_type.nullable = !cd->columnType.get_notnull();
2372  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
2373  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
2374  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
2375  }
2376  if (IS_GEO(cd->columnType.get_type())) {
2378  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
2379  } else {
2380  col_type.col_type.precision = cd->columnType.get_precision();
2381  col_type.col_type.scale = cd->columnType.get_scale();
2382  }
2383  col_type.is_system = cd->isSystemCol;
2385  cat != nullptr) {
2386  // have to get the actual size of the encoding from the dictionary definition
2387  const int dict_id = cd->columnType.get_comp_param();
2388  if (!cat->getMetadataForDict(dict_id, false)) {
2389  col_type.col_type.comp_param = 0;
2390  return col_type;
2391  }
2392  auto dd = cat->getMetadataForDict(dict_id, false);
2393  if (!dd) {
2394  THROW_DB_EXCEPTION("Dictionary doesn't exist");
2395  }
2396  col_type.col_type.comp_param = dd->dictNBits;
2397  } else {
2398  col_type.col_type.comp_param =
2399  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
2400  ? 32
2401  : cd->columnType.get_comp_param();
2402  }
2403  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
2404  if (cd->default_value.has_value()) {
2405  col_type.__set_default_value(cd->getDefaultValueLiteral());
2406  }
2407  return col_type;
2408 }
2409 
2410 void DBHandler::get_internal_table_details(TTableDetails& _return,
2411  const TSessionId& session_id_or_json,
2412  const std::string& table_name,
2413  const bool include_system_columns) {
2414  heavyai::RequestInfo const request_info(session_id_or_json);
2415  SET_REQUEST_ID(request_info.requestId());
2416  auto stdlog =
2417  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
2418  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2419  get_table_details_impl(_return, stdlog, table_name, include_system_columns, false);
2420 }
2421 
2423  TTableDetails& _return,
2424  const TSessionId& session_id_or_json,
2425  const std::string& table_name,
2426  const std::string& database_name) {
2427  heavyai::RequestInfo const request_info(session_id_or_json);
2428  SET_REQUEST_ID(request_info.requestId());
2429  auto stdlog =
2430  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
2431  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2432  get_table_details_impl(_return, stdlog, table_name, true, false, database_name);
2433 }
2434 
2435 void DBHandler::get_table_details(TTableDetails& _return,
2436  const TSessionId& session_id_or_json,
2437  const std::string& table_name) {
2438  heavyai::RequestInfo const request_info(session_id_or_json);
2439  SET_REQUEST_ID(request_info.requestId());
2440  auto stdlog =
2441  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
2442  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2443 
2444  auto execute_read_lock = legacylockmgr::getExecuteReadLock();
2445  get_table_details_impl(_return, stdlog, table_name, false, false);
2446 }
2447 
2448 void DBHandler::get_table_details_for_database(TTableDetails& _return,
2449  const TSessionId& session_id_or_json,
2450  const std::string& table_name,
2451  const std::string& database_name) {
2452  heavyai::RequestInfo const request_info(session_id_or_json);
2453  SET_REQUEST_ID(request_info.requestId());
2454  auto stdlog =
2455  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
2456  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2457 
2458  auto execute_read_lock = legacylockmgr::getExecuteReadLock();
2459  get_table_details_impl(_return, stdlog, table_name, false, false, database_name);
2460 }
2461 
2462 namespace {
2463 TTableRefreshInfo get_refresh_info(const TableDescriptor* td) {
2464  CHECK(td->isForeignTable());
2465  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
2466  CHECK(foreign_table);
2467  TTableRefreshInfo refresh_info;
2468  const auto& update_type =
2470  CHECK(update_type.has_value());
2471  if (update_type.value() == foreign_storage::ForeignTable::ALL_REFRESH_UPDATE_TYPE) {
2472  refresh_info.update_type = TTableRefreshUpdateType::ALL;
2473  } else if (update_type.value() ==
2475  refresh_info.update_type = TTableRefreshUpdateType::APPEND;
2476  } else {
2477  UNREACHABLE() << "Unexpected refresh update type: " << update_type.value();
2478  }
2479 
2480  const auto& timing_type =
2482  CHECK(timing_type.has_value());
2483  if (timing_type.value() == foreign_storage::ForeignTable::MANUAL_REFRESH_TIMING_TYPE) {
2484  refresh_info.timing_type = TTableRefreshTimingType::MANUAL;
2485  refresh_info.interval_count = -1;
2486  } else if (timing_type.value() ==
2488  refresh_info.timing_type = TTableRefreshTimingType::SCHEDULED;
2489  const auto& start_date_time = foreign_table->getOption(
2491  CHECK(start_date_time.has_value());
2492  auto start_date_time_epoch = dateTimeParse<kTIMESTAMP>(start_date_time.value(), 0);
2493  refresh_info.start_date_time =
2494  shared::convert_temporal_to_iso_format({kTIMESTAMP}, start_date_time_epoch);
2495  const auto& interval =
2496  foreign_table->getOption(foreign_storage::ForeignTable::REFRESH_INTERVAL_KEY);
2497  CHECK(interval.has_value());
2498  const auto& interval_str = interval.value();
2499  refresh_info.interval_count =
2500  std::stoi(interval_str.substr(0, interval_str.length() - 1));
2501  auto interval_type = std::toupper(interval_str[interval_str.length() - 1]);
2502  if (interval_type == 'H') {
2503  refresh_info.interval_type = TTableRefreshIntervalType::HOUR;
2504  } else if (interval_type == 'D') {
2505  refresh_info.interval_type = TTableRefreshIntervalType::DAY;
2506  } else if (interval_type == 'S') {
2507  // This use case is for development only.
2508  refresh_info.interval_type = TTableRefreshIntervalType::NONE;
2509  } else {
2510  UNREACHABLE() << "Unexpected interval type: " << interval_str;
2511  }
2512  } else {
2513  UNREACHABLE() << "Unexpected refresh timing type: " << timing_type.value();
2514  }
2515  if (foreign_table->last_refresh_time !=
2517  refresh_info.last_refresh_time = shared::convert_temporal_to_iso_format(
2518  {kTIMESTAMP}, foreign_table->last_refresh_time);
2519  }
2520  if (foreign_table->next_refresh_time !=
2522  refresh_info.next_refresh_time = shared::convert_temporal_to_iso_format(
2523  {kTIMESTAMP}, foreign_table->next_refresh_time);
2524  }
2525  return refresh_info;
2526 }
2527 } // namespace
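As a hedged aside, the refresh interval option parsed above is expected to look like "<count><unit>", e.g. "12H" or "7D": the count is everything before the last character and the unit is the upper-cased last character (H for hourly, D for daily, S for seconds in development use). A standalone sketch with a made-up option value:

#include <cctype>
#include <iostream>
#include <string>

int main() {
  const std::string interval_str = "12H";  // hypothetical REFRESH_INTERVAL option value
  const int interval_count =
      std::stoi(interval_str.substr(0, interval_str.length() - 1));
  const char interval_type =
      static_cast<char>(std::toupper(interval_str[interval_str.length() - 1]));
  std::cout << "count=" << interval_count << " unit=" << interval_type << '\n';
  // prints: count=12 unit=H
  return 0;
}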
2528 
2529 void DBHandler::get_table_details_impl(TTableDetails& _return,
2530  query_state::StdLog& stdlog,
2531  const std::string& table_name,
2532  const bool get_system,
2533  const bool get_physical,
2534  const std::string& database_name) {
2535  try {
2536  auto session_info = stdlog.getSessionInfo();
2537  auto cat = (database_name.empty())
2538  ? &session_info->getCatalog()
2539  : SysCatalog::instance().getCatalog(database_name).get();
2540  if (!cat) {
2541  THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
2542  }
2543  const auto td_with_lock =
2545  *cat, table_name, false);
2546  const auto td = td_with_lock();
2547  CHECK(td);
2548 
2549  bool have_privileges_on_view_sources = true;
2550  if (td->isView) {
2551  auto query_state = create_query_state(session_info, td->viewSQL);
2552  stdlog.setQueryState(query_state);
2553  try {
2554  if (hasTableAccessPrivileges(td, *session_info)) {
2555  const auto [query_ra, locks] = parse_to_ra(query_state->createQueryStateProxy(),
2556  query_state->getQueryStr(),
2557  {},
2558  true,
2560  false);
2561  try {
2562  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
2563  query_ra);
2564  } catch (const std::runtime_error&) {
2565  have_privileges_on_view_sources = false;
2566  }
2567 
2568  _return.row_desc =
2569  validateRelAlg(query_ra.plan_result, query_state->createQueryStateProxy());
2570  } else {
2571  throw std::runtime_error(
2572  "Unable to access view " + table_name +
2573  ". The view may not exist, or the logged in user may not "
2574  "have permission to access the view.");
2575  }
2576  } catch (const std::exception& e) {
2577  throw std::runtime_error("View '" + table_name +
2578  "' query has failed with an error: '" +
2579  std::string(e.what()) +
2580  "'.\nThe view must be dropped and re-created to "
2581  "resolve the error. \nQuery:\n" +
2582  query_state->getQueryStr());
2583  }
2584  } else {
2585  if (hasTableAccessPrivileges(td, *session_info)) {
2586  const auto col_descriptors = cat->getAllColumnMetadataForTable(
2587  td->tableId, get_system, true, get_physical);
2588  const auto deleted_cd = cat->getDeletedColumn(td);
2589  for (const auto cd : col_descriptors) {
2590  if (cd == deleted_cd) {
2591  continue;
2592  }
2593  _return.row_desc.push_back(populateThriftColumnType(cat, cd));
2594  }
2595  } else {
2596  throw std::runtime_error(
2597  "Unable to access table " + table_name +
2598  ". The table may not exist, or the logged in user may not "
2599  "have permission to access the table.");
2600  }
2601  }
2602  _return.fragment_size = td->maxFragRows;
2603  _return.page_size = td->fragPageSize;
2604  _return.max_rows = td->maxRows;
2605  _return.view_sql =
2606  (have_privileges_on_view_sources ? td->viewSQL
2607  : "[Not enough privileges to see the view SQL]");
2608  _return.shard_count = td->nShards * std::max(g_leaf_count, size_t(1));
2609  if (td->nShards > 0) {
2610  auto cd = cat->getMetadataForColumn(td->tableId, td->shardedColumnId);
2611  CHECK(cd);
2612  _return.sharded_column_name = cd->columnName;
2613  }
2614  _return.key_metainfo = td->keyMetainfo;
2615  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
2616  _return.partition_detail =
2617  td->partitions.empty()
2618  ? TPartitionDetail::DEFAULT
2619  : (table_is_replicated(td)
2620  ? TPartitionDetail::REPLICATED
2621  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
2622  : TPartitionDetail::OTHER));
2623  if (td->isView) {
2624  _return.table_type = TTableType::VIEW;
2625  } else if (td->isTemporaryTable()) {
2626  _return.table_type = TTableType::TEMPORARY;
2627  } else if (td->isForeignTable()) {
2628  _return.table_type = TTableType::FOREIGN;
2629  _return.refresh_info = get_refresh_info(td);
2630  } else {
2631  _return.table_type = TTableType::DEFAULT;
2632  }
2633 
2634  } catch (const std::runtime_error& e) {
2635  THROW_DB_EXCEPTION(std::string(e.what()));
2636  }
2637 }
2638 
2639 void DBHandler::get_link_view(TFrontendView& _return,
2640  const TSessionId& session_id_or_json,
2641  const std::string& link) {
2642  heavyai::RequestInfo const request_info(session_id_or_json);
2643  SET_REQUEST_ID(request_info.requestId());
2644  auto session_ptr = get_session_ptr(request_info.sessionId());
2645  auto stdlog = STDLOG(session_ptr);
2646  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2647  auto const& cat = session_ptr->getCatalog();
2648  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
2649  if (!ld) {
2650  THROW_DB_EXCEPTION("Link " + link + " is not valid.");
2651  }
2652  _return.view_state = ld->viewState;
2653  _return.view_name = ld->link;
2654  _return.update_time = ld->updateTime;
2655  _return.view_metadata = ld->viewMetadata;
2656 }
2657 
2659  const TableDescriptor* td,
2660  const Catalog_Namespace::SessionInfo& session_info) {
2661  auto& cat = session_info.getCatalog();
2662  auto user_metadata = session_info.get_currentUser();
2663 
2664  if (user_metadata.isSuper) {
2665  return true;
2666  }
2667 
2669  dbObject.loadKey(cat);
2670  std::vector<DBObject> privObjects = {dbObject};
2671 
2672  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2673 }
2674 
2675 void DBHandler::get_tables_impl(std::vector<std::string>& table_names,
2676  const Catalog_Namespace::SessionInfo& session_info,
2677  const GetTablesType get_tables_type,
2678  const std::string& database_name) {
2679  if (database_name.empty()) {
2680  table_names = session_info.getCatalog().getTableNamesForUser(
2681  session_info.get_currentUser(), get_tables_type);
2682  } else {
2683  auto request_cat = SysCatalog::instance().getCatalog(database_name);
2684  if (!request_cat) {
2685  THROW_DB_EXCEPTION("Database " + database_name + " does not exist.");
2686  }
2687  table_names = request_cat->getTableNamesForUser(session_info.get_currentUser(),
2688  get_tables_type);
2689  }
2690 }
2691 
2692 void DBHandler::get_tables(std::vector<std::string>& table_names,
2693  const TSessionId& session_id_or_json) {
2694  heavyai::RequestInfo const request_info(session_id_or_json);
2695  SET_REQUEST_ID(request_info.requestId());
2696  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2697  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2699  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2700 }
2701 
2702 void DBHandler::get_tables_for_database(std::vector<std::string>& table_names,
2703  const TSessionId& session_id_or_json,
2704  const std::string& database_name) {
2705  heavyai::RequestInfo const request_info(session_id_or_json);
2706  SET_REQUEST_ID(request_info.requestId());
2707  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2708  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2709 
2710  get_tables_impl(table_names,
2711  *stdlog.getConstSessionInfo(),
2713  database_name);
2714 }
2715 
2716 void DBHandler::get_physical_tables(std::vector<std::string>& table_names,
2717  const TSessionId& session_id_or_json) {
2718  heavyai::RequestInfo const request_info(session_id_or_json);
2719  SET_REQUEST_ID(request_info.requestId());
2720  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2721  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2722  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2723 }
2724 
2725 void DBHandler::get_views(std::vector<std::string>& table_names,
2726  const TSessionId& session_id_or_json) {
2727  heavyai::RequestInfo const request_info(session_id_or_json);
2728  SET_REQUEST_ID(request_info.requestId());
2729  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2730  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2731  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2732 }
2733 
2734 void DBHandler::get_tables_meta_impl(std::vector<TTableMeta>& _return,
2735  QueryStateProxy query_state_proxy,
2736  const Catalog_Namespace::SessionInfo& session_info,
2737  const bool with_table_locks) {
2738  const auto& cat = session_info.getCatalog();
2739  // Get copies of table descriptors here in order to avoid possible use of dangling
2740  // pointers, if tables are concurrently dropped.
2741  const auto tables = cat.getAllTableMetadataCopy();
2742  _return.reserve(tables.size());
2743 
2744  for (const auto& td : tables) {
2745  if (td.shard >= 0) {
2746  // skip shards, they're not standalone tables
2747  continue;
2748  }
2749  if (!hasTableAccessPrivileges(&td, session_info)) {
2750  // skip table, as there are no privileges to access it
2751  continue;
2752  }
2753 
2754  TTableMeta ret;
2755  ret.table_name = td.tableName;
2756  ret.is_view = td.isView;
2757  ret.is_replicated = table_is_replicated(&td);
2758  ret.shard_count = td.nShards;
2759  ret.max_rows = td.maxRows;
2760  ret.table_id = td.tableId;
2761 
2762  std::vector<TTypeInfo> col_types;
2763  std::vector<std::string> col_names;
2764  size_t num_cols = 0;
2765  if (td.isView) {
2766  try {
2767  TPlanResult parse_result;
2769  std::tie(parse_result, locks) = parse_to_ra(
2770  query_state_proxy, td.viewSQL, {}, with_table_locks, system_parameters_);
2771  const auto query_ra = parse_result.plan_result;
2772 
2773  ExecutionResult ex_result;
2774  execute_rel_alg(ex_result,
2775  query_state_proxy,
2776  query_ra,
2777  true,
2779  -1,
2780  -1,
2781  /*just_validate=*/true,
2782  /*find_push_down_candidates=*/false,
2783  ExplainInfo());
2784  TQueryResult result;
2785  DBHandler::convertData(result, ex_result, query_state_proxy, true, -1, -1);
2786  num_cols = result.row_set.row_desc.size();
2787  for (const auto& col : result.row_set.row_desc) {
2788  if (col.is_physical) {
2789  num_cols--;
2790  continue;
2791  }
2792  col_types.push_back(col.col_type);
2793  col_names.push_back(col.col_name);
2794  }
2795  } catch (std::exception& e) {
2796  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td.tableName;
2797  }
2798  } else {
2799  try {
2800  if (hasTableAccessPrivileges(&td, session_info)) {
2801  const auto col_descriptors =
2802  cat.getAllColumnMetadataForTable(td.tableId, false, true, false);
2803  const auto deleted_cd = cat.getDeletedColumn(&td);
2804  for (const auto cd : col_descriptors) {
2805  if (cd == deleted_cd) {
2806  continue;
2807  }
2808  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2809  col_names.push_back(cd->columnName);
2810  }
2811  num_cols = col_descriptors.size();
2812  } else {
2813  continue;
2814  }
2815  } catch (const std::runtime_error& e) {
2816  THROW_DB_EXCEPTION(e.what());
2817  }
2818  }
2819 
2820  ret.num_cols = num_cols;
2821  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2822  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2823 
2824  _return.push_back(ret);
2825  }
2826 }
2827 
2828 void DBHandler::get_tables_meta(std::vector<TTableMeta>& _return,
2829  const TSessionId& session_id_or_json) {
2830  heavyai::RequestInfo const request_info(session_id_or_json);
2831  SET_REQUEST_ID(request_info.requestId());
2832  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2833  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2834  auto session_ptr = stdlog.getConstSessionInfo();
2835  auto query_state = create_query_state(session_ptr, "");
2836  stdlog.setQueryState(query_state);
2837 
2838  auto execute_read_lock = legacylockmgr::getExecuteReadLock();
2839 
2840  try {
2841  get_tables_meta_impl(_return, query_state->createQueryStateProxy(), *session_ptr);
2842  } catch (const std::exception& e) {
2843  THROW_DB_EXCEPTION(e.what());
2844  }
2845 }
2846 
2847 void DBHandler::get_users(std::vector<std::string>& user_names,
2848  const TSessionId& session_id_or_json) {
2849  heavyai::RequestInfo const request_info(session_id_or_json);
2850  SET_REQUEST_ID(request_info.requestId());
2851  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2852  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2853  auto session_ptr = stdlog.getConstSessionInfo();
2854  std::list<Catalog_Namespace::UserMetadata> user_list;
2855 
2856  if (!session_ptr->get_currentUser().isSuper) {
2857  user_list = SysCatalog::instance().getAllUserMetadata(
2858  session_ptr->getCatalog().getCurrentDB().dbId);
2859  } else {
2860  user_list = SysCatalog::instance().getAllUserMetadata();
2861  }
2862  for (auto u : user_list) {
2863  user_names.push_back(u.userName);
2864  }
2865 }
2866 
2867 void DBHandler::get_version(std::string& version) {
2868  version = MAPD_RELEASE;
2869 }
2870 
2871 namespace {
2872 
2876  return [] {
2877  // We need to resume the executor resource manager (ERM) queue if we throw any
2878  // exception that the heavydb server can handle without shutting down.
2880  };
2881  }
2882  return [] {};
2883 }
2884 
2885 } // namespace
2886 
2887 void DBHandler::clear_gpu_memory(const TSessionId& session_id_or_json) {
2888  heavyai::RequestInfo const request_info(session_id_or_json);
2889  SET_REQUEST_ID(request_info.requestId());
2890  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2891  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2892  auto session_ptr = stdlog.getConstSessionInfo();
2893  if (!session_ptr->get_currentUser().isSuper) {
2894  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2895  }
2897  // clear renderer memory first
2898  // this will block until any running render finishes
2899  if (render_handler_) {
2900  render_handler_->clear_gpu_memory();
2901  }
2902  // then clear the QE memory
2903  // the renderer will have disconnected from any QE memory
2904  try {
2906  } catch (const std::exception& e) {
2907  THROW_DB_EXCEPTION(e.what());
2908  }
2909 }
2910 
2911 void DBHandler::clear_cpu_memory(const TSessionId& session_id_or_json) {
2912  heavyai::RequestInfo const request_info(session_id_or_json);
2913  SET_REQUEST_ID(request_info.requestId());
2914  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2915  auto session_ptr = stdlog.getConstSessionInfo();
2916  if (!session_ptr->get_currentUser().isSuper) {
2917  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2918  }
2920  // clear renderer memory first
2921  // this will block until any running render finishes
2922  if (render_handler_) {
2923  render_handler_->clear_cpu_memory();
2924  }
2925  // then clear the QE memory
2926  // the renderer will have disconnected from any QE memory
2927  try {
2929  } catch (const std::exception& e) {
2930  THROW_DB_EXCEPTION(e.what());
2931  }
2932 }
2933 
2934 void DBHandler::clearRenderMemory(const TSessionId& session_id_or_json) {
2935  heavyai::RequestInfo const request_info(session_id_or_json);
2936  SET_REQUEST_ID(request_info.requestId());
2937  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
2938  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2939  auto session_ptr = stdlog.getConstSessionInfo();
2940  if (!session_ptr->get_currentUser().isSuper) {
2941  THROW_DB_EXCEPTION("Superuser privilege is required to run clear_render_memory");
2942  }
2943  if (render_handler_) {
2945  render_handler_->clear_cpu_memory();
2946  render_handler_->clear_gpu_memory();
2947  }
2948 }
2949 
2950 void DBHandler::pause_executor_queue(const TSessionId& session) {
2951  auto stdlog = STDLOG(get_session_ptr(session));
2952  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2953  auto session_ptr = stdlog.getConstSessionInfo();
2954  if (!session_ptr->get_currentUser().isSuper) {
2955  THROW_DB_EXCEPTION("Superuser privilege is required to run PAUSE EXECUTOR QUEUE");
2956  }
2957  try {
2959  } catch (const std::exception& e) {
2960  THROW_DB_EXCEPTION(e.what());
2961  }
2962 }
2963 
2964 void DBHandler::resume_executor_queue(const TSessionId& session) {
2965  auto stdlog = STDLOG(get_session_ptr(session));
2966  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2967  auto session_ptr = stdlog.getConstSessionInfo();
2968  if (!session_ptr->get_currentUser().isSuper) {
2969  THROW_DB_EXCEPTION("Superuser privilege is required to run RESUME EXECUTOR QUEUE");
2970  }
2971  try {
2973  } catch (const std::exception& e) {
2974  THROW_DB_EXCEPTION(e.what());
2975  }
2976 }
2977 
2978 void DBHandler::set_cur_session(const TSessionId& parent_session_id_or_json,
2979  const TSessionId& leaf_session_id_or_json,
2980  const std::string& start_time_str,
2981  const std::string& label,
2982  bool for_running_query_kernel) {
2983  // internal API to manage query interruption in distributed mode
2984  heavyai::RequestInfo const parent_request_info(parent_session_id_or_json);
2985  heavyai::RequestInfo const leaf_request_info(leaf_session_id_or_json);
2986  SET_REQUEST_ID(leaf_request_info.requestId());
2987  auto stdlog = STDLOG(get_session_ptr(leaf_request_info.sessionId()));
2988  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2989  auto session_ptr = stdlog.getConstSessionInfo();
2990 
2992  executor->enrollQuerySession(parent_request_info.sessionId(),
2993  label,
2994  start_time_str,
2996  for_running_query_kernel
2997  ? QuerySessionStatus::QueryStatus::RUNNING_QUERY_KERNEL
2998  : QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
2999 }
3000 
3001 void DBHandler::invalidate_cur_session(const TSessionId& parent_session_id_or_json,
3002  const TSessionId& leaf_session_id_or_json,
3003  const std::string& start_time_str,
3004  const std::string& label,
3005  bool for_running_query_kernel) {
3006  // internal API to manage query interruption in distributed mode
3007  heavyai::RequestInfo const parent_request_info(parent_session_id_or_json);
3008  heavyai::RequestInfo const leaf_request_info(leaf_session_id_or_json);
3009  SET_REQUEST_ID(leaf_request_info.requestId());
3010  auto stdlog = STDLOG(get_session_ptr(leaf_request_info.sessionId()));
3011  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3013  executor->clearQuerySessionStatus(parent_request_info.sessionId(), start_time_str);
3014 }
3015 
3017  return INVALID_SESSION_ID;
3018 }
3019 
3020 void DBHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
3021  const TSessionId& session_id_or_json,
3022  const std::string& memory_level) {
3023  heavyai::RequestInfo const request_info(session_id_or_json);
3024  SET_REQUEST_ID(request_info.requestId());
3025  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
3026  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3027  std::vector<Data_Namespace::MemoryInfo> internal_memory;
3028  if (!memory_level.compare("gpu")) {
3029  internal_memory =
3030  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
3031  } else {
3032  internal_memory =
3033  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
3034  }
3035 
3036  for (auto memInfo : internal_memory) {
3037  TNodeMemoryInfo nodeInfo;
3038  nodeInfo.page_size = memInfo.pageSize;
3039  nodeInfo.max_num_pages = memInfo.maxNumPages;
3040  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
3041  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
3042  for (auto gpu : memInfo.nodeMemoryData) {
3043  TMemoryData md;
3044  md.slab = gpu.slabNum;
3045  md.start_page = gpu.startPage;
3046  md.num_pages = gpu.numPages;
3047  md.touch = gpu.touch;
3048  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
3049  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
3050  nodeInfo.node_memory_data.push_back(md);
3051  }
3052  _return.push_back(nodeInfo);
3053  }
3054 }
3055 
3056 void DBHandler::get_databases(std::vector<TDBInfo>& dbinfos,
3057  const TSessionId& session_id_or_json) {
3058  heavyai::RequestInfo const request_info(session_id_or_json);
3059  SET_REQUEST_ID(request_info.requestId());
3060  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
3061  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3062  auto session_ptr = stdlog.getConstSessionInfo();
3063  const auto& user = session_ptr->get_currentUser();
3065  SysCatalog::instance().getDatabaseListForUser(user);
3066  for (auto& db : dbs) {
3067  TDBInfo dbinfo;
3068  dbinfo.db_name = std::move(db.dbName);
3069  dbinfo.db_owner = std::move(db.dbOwnerName);
3070  dbinfos.push_back(std::move(dbinfo));
3071  }
3072 }
3073 
3074 TExecuteMode::type DBHandler::getExecutionMode(const TSessionId& session_id) {
3075  auto executor = get_session_ptr(session_id)->get_executor_device_type();
3076  switch (executor) {
3077  case ExecutorDeviceType::CPU:
3078  return TExecuteMode::CPU;
3079  case ExecutorDeviceType::GPU:
3080  return TExecuteMode::GPU;
3081  default:
3082  UNREACHABLE();
3083  }
3084  UNREACHABLE();
3085  return TExecuteMode::CPU;
3086 }
3087 void DBHandler::set_execution_mode(const TSessionId& session_id_or_json,
3088  const TExecuteMode::type mode) {
3089  heavyai::RequestInfo const request_info(session_id_or_json);
3090  SET_REQUEST_ID(request_info.requestId());
3091  auto session_ptr = get_session_ptr(request_info.sessionId());
3092  auto stdlog = STDLOG(session_ptr);
3093  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3094  DBHandler::set_execution_mode_nolock(session_ptr.get(), mode);
3095 }
3096 
3097 namespace {
3098 
3099 void check_table_not_sharded(const TableDescriptor* td) {
3100  if (td && td->nShards) {
3101  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
3102  }
3103 }
3104 
3105 void check_valid_column_names(const std::list<const ColumnDescriptor*>& descs,
3106  const std::vector<std::string>& column_names) {
3107  std::unordered_set<std::string> unique_names;
3108  for (const auto& name : column_names) {
3109  auto lower_name = to_lower(name);
3110  if (unique_names.find(lower_name) != unique_names.end()) {
3111  THROW_DB_EXCEPTION("Column " + name + " is mentioned multiple times");
3112  } else {
3113  unique_names.insert(lower_name);
3114  }
3115  }
3116  for (const auto& cd : descs) {
3117  auto iter = unique_names.find(to_lower(cd->columnName));
3118  if (iter != unique_names.end()) {
3119  unique_names.erase(iter);
3120  }
3121  }
3122  if (!unique_names.empty()) {
3123  THROW_DB_EXCEPTION("Column " + *unique_names.begin() + " does not exist");
3124  }
3125 }
3126 
3127 // Return a vector of IDs mapping column descriptors to the list of column names.
3128 // The size of the vector is the number of actual columns (geo physical columns excluded).
3129 // Each ID is either the position in column_names matching the descriptor, or -1 if the
3130 // column is missing from column_names.
3131 std::vector<int> column_ids_by_names(const std::list<const ColumnDescriptor*>& descs,
3132  const std::vector<std::string>& column_names) {
3133  std::vector<int> desc_to_column_ids;
3134  if (column_names.empty()) {
3135  int col_idx = 0;
3136  for (const auto& cd : descs) {
3137  if (!cd->isGeoPhyCol) {
3138  desc_to_column_ids.push_back(col_idx);
3139  ++col_idx;
3140  }
3141  }
3142  } else {
3143  for (const auto& cd : descs) {
3144  if (!cd->isGeoPhyCol) {
3145  bool found = false;
3146  for (size_t j = 0; j < column_names.size(); ++j) {
3147  if (to_lower(cd->columnName) == to_lower(column_names[j])) {
3148  found = true;
3149  desc_to_column_ids.push_back(j);
3150  break;
3151  }
3152  }
3153  if (!found) {
3154  if (!cd->columnType.get_notnull()) {
3155  desc_to_column_ids.push_back(-1);
3156  } else {
3157  THROW_DB_EXCEPTION("Column '" + cd->columnName +
3158  "' cannot be omitted due to NOT NULL constraint");
3159  }
3160  }
3161  }
3162  }
3163  }
3164  return desc_to_column_ids;
3165 }
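// Note: the mapping returned above is consumed by the load_table* paths below.
// Entry i gives, for the i-th non-physical column descriptor, the position of
// its data in the client-supplied columns, or -1 when the column was omitted
// and is later defaulted via fillMissingBuffers().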
3166 
3167 void log_cache_size(Catalog const& cat) {
3168  std::ostringstream oss;
3169  oss << "Cache size information {";
3170  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
3171  // 1. Data recycler
3172  // 1.a Resultset Recycler
3173  auto resultset_cache_size =
3174  executor->getResultSetRecyclerHolder()
3175  .getResultSetRecycler()
3176  ->getResultSetRecyclerMetricTracker()
3177  .getCurrentCacheSize(DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
3178  if (resultset_cache_size) {
3179  oss << "\"query_resultset\": " << *resultset_cache_size << " bytes, ";
3180  }
3181 
3182  // 1.b Join Hash Table Recycler
3183  auto perfect_join_ht_cache_size =
3186  auto baseline_join_ht_cache_size =
3189  auto bbox_intersect_ht_cache_size =
3193  auto bbox_intersect_ht_tuner_cache_size =
3197  auto sum_hash_table_cache_size =
3198  perfect_join_ht_cache_size + baseline_join_ht_cache_size +
3199  bbox_intersect_ht_cache_size + bbox_intersect_ht_tuner_cache_size;
3200  oss << "\"hash_tables\": " << sum_hash_table_cache_size << " bytes, ";
3201 
3202  // 1.c Chunk Metadata Recycler
3203  auto chunk_metadata_cache_size =
3204  executor->getResultSetRecyclerHolder()
3205  .getChunkMetadataRecycler()
3206  ->getCurrentCacheSizeForDevice(CacheItemType::CHUNK_METADATA,
3207  DataRecyclerUtil::CPU_DEVICE_IDENTIFIER);
3208  oss << "\"chunk_metadata\": " << chunk_metadata_cache_size << " bytes, ";
3209 
3210  // 2. Query Plan Dag
3211  auto query_plan_dag_cache_size =
3212  executor->getQueryPlanDagCache().getCurrentNodeMapSize();
3213  oss << "\"query_plan_dag\": " << query_plan_dag_cache_size << " bytes, ";
3214 
3215  // 3. Compiled (GPU) Code
3216  oss << "\"compiled_GPU code\": "
3217  << QueryEngine::getInstance()->gpu_code_accessor->getCacheSize() << " bytes, ";
3218 
3219  // 4. String Dictionary
3220  oss << "\"string_dictionary\": " << cat.getTotalMemorySizeForDictionariesForDatabase()
3221  << " bytes";
3222  oss << "}";
3223  LOG(INFO) << oss.str();
3224 }
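// Note: log_cache_size() emits a single INFO line shaped like JSON containing the
// current sizes of the result-set, hash-table and chunk-metadata recyclers, the
// query plan DAG cache, the compiled GPU code cache and the string dictionaries.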
3225 
3226 void log_system_cpu_memory_status(std::string const& query,
3227  Catalog const& cat) {
3228  if (g_allow_memory_status_log) {
3229  std::ostringstream oss;
3230  oss << query << "\n" << cat.getDataMgr().getSystemMemoryUsage();
3231  LOG(INFO) << oss.str();
3232  log_cache_size(cat);
3233  }
3234 }
3235 } // namespace
3236 
3237 void DBHandler::fillGeoColumns(
3238  const TSessionId& session_id,
3239  const Catalog& catalog,
3240  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
3241  const ColumnDescriptor* cd,
3242  size_t& col_idx,
3243  size_t num_rows,
3244  const std::string& table_name) {
3245  auto geo_col_idx = col_idx - 1;
3246  const auto wkt_or_wkb_hex_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
3247  std::vector<std::vector<double>> coords_column, bounds_column;
3248  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
3249  SQLTypeInfo ti = cd->columnType;
3250  const bool validate_with_geos_if_available = false;
3251  if (num_rows != wkt_or_wkb_hex_column->size() ||
3252  !Geospatial::GeoTypesFactory::getGeoColumns(wkt_or_wkb_hex_column,
3253  ti,
3254  coords_column,
3255  bounds_column,
3256  ring_sizes_column,
3257  poly_rings_column,
3258  validate_with_geos_if_available)) {
3259  std::ostringstream oss;
3260  oss << "Invalid geometry in column " << cd->columnName;
3261  THROW_DB_EXCEPTION(oss.str());
3262  }
3263 
3264  // Populate physical columns, advance col_idx
3265  import_export::Importer::set_geo_physical_import_buffer_columnar(catalog,
3266  cd,
3267  import_buffers,
3268  col_idx,
3269  coords_column,
3270  bounds_column,
3271  ring_sizes_column,
3272  poly_rings_column);
3273 }
3274 
3275 void DBHandler::fillMissingBuffers(
3276  const TSessionId& session_id,
3277  const Catalog& catalog,
3278  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>& import_buffers,
3279  const std::list<const ColumnDescriptor*>& cds,
3280  const std::vector<int>& desc_id_to_column_id,
3281  size_t num_rows,
3282  const std::string& table_name) {
3283  size_t skip_physical_cols = 0;
3284  size_t col_idx = 0, import_idx = 0;
3285  for (const auto& cd : cds) {
3286  if (skip_physical_cols > 0) {
3287  CHECK(cd->isGeoPhyCol);
3288  skip_physical_cols--;
3289  continue;
3290  } else if (cd->columnType.is_geometry()) {
3291  skip_physical_cols = cd->columnType.get_physical_cols();
3292  }
3293  if (desc_id_to_column_id[import_idx] == -1) {
3294  import_buffers[col_idx]->addDefaultValues(cd, num_rows);
3295  col_idx++;
3296  if (cd->columnType.is_geometry()) {
3297  fillGeoColumns(
3298  session_id, catalog, import_buffers, cd, col_idx, num_rows, table_name);
3299  }
3300  } else {
3301  col_idx++;
3302  col_idx += skip_physical_cols;
3303  }
3304  import_idx++;
3305  }
3306 }
3307 
3308 namespace {
3309 std::string get_load_tag(const std::string& load_tag, const std::string& table_name) {
3310  std::ostringstream oss;
3311  oss << load_tag << "(" << table_name << ")";
3312  return oss.str();
3313 }
3314 
3315 std::string get_import_tag(const std::string& import_tag,
3316  const std::string& table_name,
3317  const std::string& file_path) {
3318  std::ostringstream oss;
3319  oss << import_tag << "(" << table_name << ", file_path:" << file_path << ")";
3320  return oss.str();
3321 }
3322 } // namespace
3323 
3324 void DBHandler::load_table_binary(const TSessionId& session_id_or_json,
3325  const std::string& table_name,
3326  const std::vector<TRow>& rows,
3327  const std::vector<std::string>& column_names) {
3328  try {
3329  heavyai::RequestInfo const request_info(session_id_or_json);
3330  SET_REQUEST_ID(request_info.requestId());
3331  auto stdlog =
3332  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
3333  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3334  auto session_ptr = stdlog.getConstSessionInfo();
3335 
3336  if (rows.empty()) {
3337  THROW_DB_EXCEPTION("No rows to insert");
3338  }
3339 
3340  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
3341  std::unique_ptr<import_export::Loader> loader;
3342  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3343  auto schema_read_lock = prepare_loader_generic(*session_ptr,
3344  table_name,
3345  rows.front().cols.size(),
3346  &loader,
3347  &import_buffers,
3348  column_names,
3349  "load_table_binary");
3350 
3351  auto col_descs = loader->get_column_descs();
3352  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3353 
3354  size_t rows_completed = 0;
3355  auto const load_tag = get_load_tag("load_table_binary", table_name);
3356  log_system_cpu_memory_status("start_" + load_tag, session_ptr->getCatalog());
3357  ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3358  log_system_cpu_memory_status("finish_" + load_tag, session_ptr->getCatalog());
3359  };
3360  for (auto const& row : rows) {
3361  size_t col_idx = 0;
3362  try {
3363  for (auto cd : col_descs) {
3364  auto mapped_idx = desc_id_to_column_id[col_idx];
3365  if (mapped_idx != -1) {
3366  import_buffers[col_idx]->add_value(
3367  cd, row.cols[mapped_idx], row.cols[mapped_idx].is_null);
3368  }
3369  col_idx++;
3370  }
3371  rows_completed++;
3372  } catch (const std::exception& e) {
3373  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
3374  import_buffers[col_idx_to_pop]->pop_value();
3375  }
3376  LOG(ERROR) << "Input exception thrown: " << e.what()
3377  << ". Row discarded, issue at column : " << (col_idx + 1)
3378  << " data :" << row;
3379  }
3380  }
3381  fillMissingBuffers(request_info.sessionId(),
3382  session_ptr->getCatalog(),
3383  import_buffers,
3384  col_descs,
3385  desc_id_to_column_id,
3386  rows_completed,
3387  table_name);
3388  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3389  session_ptr->getCatalog(), table_name);
3390  if (!loader->load(import_buffers, rows.size(), session_ptr.get())) {
3391  THROW_DB_EXCEPTION(loader->getErrorMessage());
3392  }
3393  } catch (const std::exception& e) {
3394  THROW_DB_EXCEPTION(std::string(e.what()));
3395  }
3396 }
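// Note: load_table_binary() is the row-wise binary Thrift load path. Values are
// appended into TypedImportBuffers column by column; a row that throws is rolled
// back with pop_value() and skipped, omitted columns are defaulted via
// fillMissingBuffers(), and the buffers are flushed while holding the table's
// insert-data write lock.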
3397 
3398 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
3399 DBHandler::prepare_loader_generic(
3400  const Catalog_Namespace::SessionInfo& session_info,
3401  const std::string& table_name,
3402  size_t num_cols,
3403  std::unique_ptr<import_export::Loader>* loader,
3404  std::vector<std::unique_ptr<import_export::TypedImportBuffer>>* import_buffers,
3405  const std::vector<std::string>& column_names,
3406  std::string load_type) {
3407  if (num_cols == 0) {
3408  THROW_DB_EXCEPTION("No columns to insert");
3409  }
3410  check_read_only(load_type);
3411  auto& cat = session_info.getCatalog();
3412  auto td_with_lock =
3413  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
3414  lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>::acquireTableDescriptor(
3415  cat, table_name, true));
3416  const auto td = (*td_with_lock)();
3417  CHECK(td);
3418 
3419  if (g_cluster && !leaf_aggregator_.leafCount()) {
3420  // Sharded table rows need to be routed to the leaf by an aggregator.
3421  check_table_not_sharded(td);
3422  }
3423  check_table_load_privileges(session_info, table_name);
3424 
3425  loader->reset(new import_export::Loader(cat, td));
3426 
3427  auto col_descs = (*loader)->get_column_descs();
3428  check_valid_column_names(col_descs, column_names);
3429  if (column_names.empty()) {
3430  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
3431  // Subtracting 1 (rowid) until TableDescriptor is updated.
3432  auto geo_physical_cols = std::count_if(
3433  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
3434  const auto num_table_cols = static_cast<size_t>(td->nColumns) - geo_physical_cols -
3435  (td->hasDeletedCol ? 2 : 1);
3436  if (num_cols != num_table_cols) {
3437  throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
3438  ") does not match number of columns in table " +
3439  td->tableName + " (" + std::to_string(num_table_cols) +
3440  ")");
3441  }
3442  } else if (num_cols != column_names.size()) {
3444  "Number of columns specified does not match the "
3445  "number of columns given (" +
3446  std::to_string(num_cols) + " vs " + std::to_string(column_names.size()) + ")");
3447  }
3448 
3449  *import_buffers = import_export::setup_column_loaders(td, loader->get());
3450  return std::move(td_with_lock);
3451 }
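// Note: prepare_loader_generic() is shared by all load_table* entry points. It
// enforces read-only mode, takes a table schema read lock, checks load privileges
// and column-name validity/arity, and returns the schema lock container so the
// caller keeps the lock alive for the duration of the load.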
3452 namespace {
3453 
3454 size_t get_column_size(const TColumn& column) {
3455  if (!column.nulls.empty()) {
3456  return column.nulls.size();
3457  } else {
3458  // this is a rough estimate, but it is later checked against the real data;
3459  // if this function returns a wrong result (e.g. both the int and string
3460  // vectors are filled with values), an error is raised
3461  return column.data.int_col.size() + column.data.arr_col.size() +
3462  column.data.real_col.size() + column.data.str_col.size();
3463  }
3464 }
3465 
3466 } // namespace
3467 
3468 void DBHandler::load_table_binary_columnar(const TSessionId& session_id_or_json,
3469  const std::string& table_name,
3470  const std::vector<TColumn>& cols,
3471  const std::vector<std::string>& column_names) {
3472  heavyai::RequestInfo const request_info(session_id_or_json);
3473  SET_REQUEST_ID(request_info.requestId());
3474  auto stdlog =
3475  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
3476  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3477  auto session_ptr = stdlog.getConstSessionInfo();
3478 
3479  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
3480  std::unique_ptr<import_export::Loader> loader;
3481  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3482  auto schema_read_lock = prepare_loader_generic(*session_ptr,
3483  table_name,
3484  cols.size(),
3485  &loader,
3486  &import_buffers,
3487  column_names,
3488  "load_table_binary_columnar");
3489 
3490  auto desc_id_to_column_id =
3491  column_ids_by_names(loader->get_column_descs(), column_names);
3492  size_t num_rows = get_column_size(cols.front());
3493  size_t import_idx = 0; // index into the TColumn vector being loaded
3494  size_t col_idx = 0; // index into column description vector
3495  auto const load_tag = get_load_tag("load_table_binary_columnar", table_name);
3496  log_system_cpu_memory_status("start_" + load_tag, session_ptr->getCatalog());
3497  ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3498  log_system_cpu_memory_status("finish_" + load_tag, session_ptr->getCatalog());
3499  };
3500  try {
3501  size_t skip_physical_cols = 0;
3502  for (auto cd : loader->get_column_descs()) {
3503  if (skip_physical_cols > 0) {
3504  CHECK(cd->isGeoPhyCol);
3505  skip_physical_cols--;
3506  continue;
3507  }
3508  auto mapped_idx = desc_id_to_column_id[import_idx];
3509  if (mapped_idx != -1) {
3510  size_t col_rows = import_buffers[col_idx]->add_values(cd, cols[mapped_idx]);
3511  if (col_rows != num_rows) {
3512  std::ostringstream oss;
3513  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
3514  << cd->columnName << " , expecting " << num_rows << " rows, column "
3515  << col_idx << " has " << col_rows << " rows";
3516  THROW_DB_EXCEPTION(oss.str());
3517  }
3518  // Advance to the next column in the table
3519  col_idx++;
3520  // For geometry columns: process WKT strings and fill physical columns
3521  if (cd->columnType.is_geometry()) {
3522  fillGeoColumns(request_info.sessionId(),
3523  session_ptr->getCatalog(),
3524  import_buffers,
3525  cd,
3526  col_idx,
3527  num_rows,
3528  table_name);
3529  skip_physical_cols = cd->columnType.get_physical_cols();
3530  }
3531  } else {
3532  col_idx++;
3533  if (cd->columnType.is_geometry()) {
3534  skip_physical_cols = cd->columnType.get_physical_cols();
3535  col_idx += skip_physical_cols;
3536  }
3537  }
3538  // Advance to the next column of values being loaded
3539  import_idx++;
3540  }
3541  } catch (const std::exception& e) {
3542  std::ostringstream oss;
3543  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
3544  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3545  THROW_DB_EXCEPTION(oss.str());
3546  }
3547  fillMissingBuffers(request_info.sessionId(),
3548  session_ptr->getCatalog(),
3549  import_buffers,
3550  loader->get_column_descs(),
3551  desc_id_to_column_id,
3552  num_rows,
3553  table_name);
3554  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3555  session_ptr->getCatalog(), table_name);
3556  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3557  THROW_DB_EXCEPTION(loader->getErrorMessage());
3558  }
3559 }
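// Note: the columnar path appends whole TColumn vectors and requires every
// supplied column to contain the same number of rows; geometry columns trigger
// fillGeoColumns() to populate the derived physical coords/bounds/rings columns.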
3560 
3561 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
3562 
3563 #define ARROW_THRIFT_THROW_NOT_OK(s) \
3564  do { \
3565  ::arrow::Status _s = (s); \
3566  if (UNLIKELY(!_s.ok())) { \
3567  TDBException ex; \
3568  ex.error_msg = _s.ToString(); \
3569  LOG(ERROR) << s.ToString(); \
3570  throw ex; \
3571  } \
3572  } while (0)
3573 
3574 namespace {
3575 
3576 RecordBatchVector loadArrowStream(const std::string& stream) {
3577  RecordBatchVector batches;
3578  try {
3579  // TODO(wesm): Make this simpler in general, see ARROW-1600
3580  auto stream_buffer =
3581  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
3582  static_cast<int64_t>(stream.size()));
3583 
3584  arrow::io::BufferReader buf_reader(stream_buffer);
3585  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
3586  ARROW_ASSIGN_OR_THROW(batch_reader,
3587  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader));
3588 
3589  while (true) {
3590  std::shared_ptr<arrow::RecordBatch> batch;
3591  // Read batch (zero-copy) from the stream
3592  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
3593  if (batch == nullptr) {
3594  break;
3595  }
3596  batches.emplace_back(std::move(batch));
3597  }
3598  } catch (const std::exception& e) {
3599  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
3600  }
3601  return batches;
3602 }
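// Note: loadArrowStream() wraps the Thrift string payload in an arrow::Buffer and
// reads IPC record batches until the stream is exhausted; parse errors are logged
// and whatever batches were read are returned for the caller to validate.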
3603 
3604 } // namespace
3605 
3606 void DBHandler::load_table_binary_arrow(const TSessionId& session_id_or_json,
3607  const std::string& table_name,
3608  const std::string& arrow_stream,
3609  const bool use_column_names) {
3610  heavyai::RequestInfo const request_info(session_id_or_json);
3611  SET_REQUEST_ID(request_info.requestId());
3612  auto stdlog =
3613  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
3614  auto session_ptr = stdlog.getConstSessionInfo();
3615 
3616  RecordBatchVector batches = loadArrowStream(arrow_stream);
3617  // Assuming we have exactly one batch for now
3618  if (batches.size() != 1) {
3619  THROW_DB_EXCEPTION("Expected a single Arrow record batch. Import aborted");
3620  }
3621 
3622  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
3623  std::unique_ptr<import_export::Loader> loader;
3624  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3625  std::vector<std::string> column_names;
3626  if (use_column_names) {
3627  column_names = batch->schema()->field_names();
3628  }
3629  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
3630  auto schema_read_lock =
3631  prepare_loader_generic(*session_ptr,
3632  table_name,
3633  static_cast<size_t>(batch->num_columns()),
3634  &loader,
3635  &import_buffers,
3636  column_names,
3637  "load_table_binary_arrow");
3638 
3639  auto desc_id_to_column_id =
3640  column_ids_by_names(loader->get_column_descs(), column_names);
3641  size_t num_rows = 0;
3642 
3643  // col_idx indexes "desc_id_to_column_id"
3644  size_t col_idx = 0;
3645  auto const load_tag = get_load_tag("load_table_binary_arrow", table_name);
3646  log_system_cpu_memory_status("start_" + load_tag, session_ptr->getCatalog());
3647  ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3648  log_system_cpu_memory_status("finish_" + load_tag, session_ptr->getCatalog());
3649  };
3650  try {
3651  for (auto cd : loader->get_column_descs()) {
3652  if (cd->isGeoPhyCol) {
3653  // Skip when "cd" is a physical column, as those are generated
3654  // in fillGeoColumns:
3655  // * Point: coords col
3656  // * MultiPoint/LineString: coords/bounds cols
3657  // etc...
3658  continue;
3659  }
3660  auto mapped_idx = desc_id_to_column_id[col_idx];
3661  if (mapped_idx != -1) {
3662  auto& array = *batch->column(mapped_idx);
3663  import_export::ArraySliceRange row_slice(0, array.length());
3664 
3665  // col_id indexes "import_buffers"
3666  size_t col_id = cd->columnId;
3667 
3668  // When importing a buffer with "add_arrow_values", the index into
3669  // "import_buffers" is given by the "columnId" attribute of a ColumnDescriptor.
3670  // This index will differ from "col_idx" if any of the imported columns is a
3671  // geometry column, since geometry columns carry extra physical columns (e.g. a
3672  // LineString also has "coords" and "bounds").
3673  num_rows = import_buffers[col_id - 1]->add_arrow_values(
3674  cd, array, true, row_slice, nullptr);
3675  // For geometry columns: process WKT strings and fill physical columns
3676  if (cd->columnType.is_geometry()) {
3677  fillGeoColumns(request_info.sessionId(),
3678  session_ptr->getCatalog(),
3679  import_buffers,
3680  cd,
3681  col_id,
3682  num_rows,
3683  table_name);
3684  }
3685  }
3686  // Advance to the next column in the table
3687  col_idx++;
3688  }
3689  } catch (const std::exception& e) {
3690  LOG(ERROR) << "Input exception thrown: " << e.what()
3691  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
3692  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
3693  // other import paths
3694  THROW_DB_EXCEPTION(e.what());
3695  }
3696  fillMissingBuffers(request_info.sessionId(),
3697  session_ptr->getCatalog(),
3698  import_buffers,
3699  loader->get_column_descs(),
3700  desc_id_to_column_id,
3701  num_rows,
3702  table_name);
3703  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3704  session_ptr->getCatalog(), table_name);
3705  if (!loader->load(import_buffers, num_rows, session_ptr.get())) {
3706  THROW_DB_EXCEPTION(loader->getErrorMessage());
3707  }
3708 }
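// Note: load_table_binary_arrow() currently accepts exactly one record batch.
// Columns are matched by schema field names when use_column_names is set, and
// import buffers are addressed by columnId so that geo physical columns
// (generated by fillGeoColumns()) are skipped in the main loop.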
3709 
3710 void DBHandler::load_table(const TSessionId& session_id_or_json,
3711  const std::string& table_name,
3712  const std::vector<TStringRow>& rows,
3713  const std::vector<std::string>& column_names) {
3714  try {
3715  heavyai::RequestInfo const request_info(session_id_or_json);
3716  SET_REQUEST_ID(request_info.requestId());
3717  auto stdlog =
3718  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
3719  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3720  auto session_ptr = stdlog.getConstSessionInfo();
3721 
3722  if (rows.empty()) {
3723  THROW_DB_EXCEPTION("No rows to insert");
3724  }
3725  auto const load_tag = get_load_tag("load_table", table_name);
3726  log_system_cpu_memory_status("start_" + load_tag, session_ptr->getCatalog());
3727  ScopeGuard cleanup = [&load_tag, &session_ptr]() {
3728  log_system_cpu_memory_status("finish_" + load_tag, session_ptr->getCatalog());
3729  };
3730  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
3731  std::unique_ptr<import_export::Loader> loader;
3732  std::vector<std::unique_ptr<import_export::TypedImportBuffer>> import_buffers;
3733  auto schema_read_lock =
3734  prepare_loader_generic(*session_ptr,
3735  table_name,
3736  static_cast<size_t>(rows.front().cols.size()),
3737  &loader,
3738  &import_buffers,
3739  column_names,
3740  "load_table");
3741 
3742  auto col_descs = loader->get_column_descs();
3743  auto desc_id_to_column_id = column_ids_by_names(col_descs, column_names);
3744  import_export::CopyParams copy_params;
3745  size_t rows_completed = 0;
3746  for (auto const& row : rows) {
3747  size_t import_idx = 0; // index into the TStringRow being loaded
3748  size_t col_idx = 0; // index into column description vector
3749  try {
3750  size_t skip_physical_cols = 0;
3751  for (auto cd : col_descs) {
3752  if (skip_physical_cols > 0) {
3753  CHECK(cd->isGeoPhyCol);
3754  skip_physical_cols--;
3755  continue;
3756  }
3757  auto mapped_idx = desc_id_to_column_id[import_idx];
3758  if (mapped_idx != -1) {
3759  import_buffers[col_idx]->add_value(cd,
3760  row.cols[mapped_idx].str_val,
3761  row.cols[mapped_idx].is_null,
3762  copy_params);
3763  }
3764  col_idx++;
3765  if (cd->columnType.is_geometry()) {
3766  // physical geo columns will be filled separately later
3767  skip_physical_cols = cd->columnType.get_physical_cols();
3768  col_idx += skip_physical_cols;
3769  }
3770  // Advance to the next field within the row
3771  import_idx++;
3772  }
3773  rows_completed++;
3774  } catch (const std::exception& e) {
3775  LOG(ERROR) << "Input exception thrown: " << e.what()
3776  << ". Row discarded, issue at column : " << (col_idx + 1)
3777  << " data :" << row;
3778  THROW_DB_EXCEPTION(std::string("Exception: ") + e.what());
3779  }
3780  }
3781  // do batch filling of geo columns separately
3782  if (rows.size() != 0) {
3783  const auto& row = rows[0];
3784  size_t col_idx = 0; // index into column description vector
3785  try {
3786  size_t import_idx = 0;
3787  size_t skip_physical_cols = 0;
3788  for (auto cd : col_descs) {
3789  if (skip_physical_cols > 0) {
3790  skip_physical_cols--;
3791  continue;
3792  }
3793  auto mapped_idx = desc_id_to_column_id[import_idx];
3794  col_idx++;
3795  if (cd->columnType.is_geometry()) {
3796  skip_physical_cols = cd->columnType.get_physical_cols();
3797  if (mapped_idx != -1) {
3798  fillGeoColumns(request_info.sessionId(),
3799  session_ptr->getCatalog(),
3800  import_buffers,
3801  cd,
3802  col_idx,
3803  rows_completed,
3804  table_name);
3805  } else {
3806  col_idx += skip_physical_cols;
3807  }
3808  }
3809  import_idx++;
3810  }
3811  } catch (const std::exception& e) {
3812  LOG(ERROR) << "Input exception thrown: " << e.what()
3813  << ". Row discarded, issue at column : " << (col_idx + 1)
3814  << " data :" << row;
3815  THROW_DB_EXCEPTION(e.what());
3816  }
3817  }
3818  fillMissingBuffers(request_info.sessionId(),
3819  session_ptr->getCatalog(),
3820  import_buffers,
3821  col_descs,
3822  desc_id_to_column_id,
3823  rows_completed,
3824  table_name);
3825  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3826  session_ptr->getCatalog(), table_name);
3827  if (!loader->load(import_buffers, rows_completed, session_ptr.get())) {
3828  THROW_DB_EXCEPTION(loader->getErrorMessage());
3829  }
3830 
3831  } catch (const std::exception& e) {
3832  THROW_DB_EXCEPTION(std::string(e.what()));
3833  }
3834 }
3835 
3836 char DBHandler::unescape_char(std::string str) {
3837  char out = str[0];
3838  if (str.size() == 2 && str[0] == '\\') {
3839  if (str[1] == 't') {
3840  out = '\t';
3841  } else if (str[1] == 'n') {
3842  out = '\n';
3843  } else if (str[1] == '0') {
3844  out = '\0';
3845  } else if (str[1] == '\'') {
3846  out = '\'';
3847  } else if (str[1] == '\\') {
3848  out = '\\';
3849  }
3850  }
3851  return out;
3852 }
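// Note: unescape_char() maps the two-character escape sequences \t, \n, \0, \'
// and \\ from TCopyParams into single delimiter/quote/escape characters; any
// other input falls back to its first character.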
3853 
3854 import_export::CopyParams DBHandler::thrift_to_copyparams(const TCopyParams& cp) {
3855  import_export::CopyParams copy_params;
3856  switch (cp.has_header) {
3857  case TImportHeaderRow::AUTODETECT:
3859  break;
3860  case TImportHeaderRow::NO_HEADER:
3862  break;
3863  case TImportHeaderRow::HAS_HEADER:
3865  break;
3866  default:
3867  CHECK(false);
3868  }
3869  copy_params.quoted = cp.quoted;
3870  if (cp.delimiter.length() > 0) {
3871  copy_params.delimiter = unescape_char(cp.delimiter);
3872  } else {
3873  copy_params.delimiter = '\0';
3874  }
3875  if (cp.null_str.length() > 0) {
3876  copy_params.null_str = cp.null_str;
3877  }
3878  if (cp.quote.length() > 0) {
3879  copy_params.quote = unescape_char(cp.quote);
3880  }
3881  if (cp.escape.length() > 0) {
3882  copy_params.escape = unescape_char(cp.escape);
3883  }
3884  if (cp.line_delim.length() > 0) {
3885  copy_params.line_delim = unescape_char(cp.line_delim);
3886  }
3887  if (cp.array_delim.length() > 0) {
3888  copy_params.array_delim = unescape_char(cp.array_delim);
3889  }
3890  if (cp.array_begin.length() > 0) {
3891  copy_params.array_begin = unescape_char(cp.array_begin);
3892  }
3893  if (cp.array_end.length() > 0) {
3894  copy_params.array_end = unescape_char(cp.array_end);
3895  }
3896  if (cp.threads != 0) {
3897  copy_params.threads = cp.threads;
3898  }
3899  if (cp.s3_access_key.length() > 0) {
3900  copy_params.s3_access_key = cp.s3_access_key;
3901  }
3902  if (cp.s3_secret_key.length() > 0) {
3903  copy_params.s3_secret_key = cp.s3_secret_key;
3904  }
3905  if (cp.s3_session_token.length() > 0) {
3906  copy_params.s3_session_token = cp.s3_session_token;
3907  }
3908  if (cp.s3_region.length() > 0) {
3909  copy_params.s3_region = cp.s3_region;
3910  }
3911  if (cp.s3_endpoint.length() > 0) {
3912  copy_params.s3_endpoint = cp.s3_endpoint;
3913  }
3914 #ifdef HAVE_AWS_S3
3915  if (g_allow_s3_server_privileges && cp.s3_access_key.length() == 0 &&
3916  cp.s3_secret_key.length() == 0 && cp.s3_session_token.length() == 0) {
3917  const auto& server_credentials =
3918  Aws::Auth::DefaultAWSCredentialsProviderChain().GetAWSCredentials();
3919  copy_params.s3_access_key = server_credentials.GetAWSAccessKeyId();
3920  copy_params.s3_secret_key = server_credentials.GetAWSSecretKey();
3921  copy_params.s3_session_token = server_credentials.GetSessionToken();
3922  }
3923 #endif
3924 
3925  switch (cp.source_type) {
3926  case TSourceType::DELIMITED_FILE:
3927  copy_params.source_type = import_export::SourceType::kDelimitedFile;
3928  break;
3929  case TSourceType::GEO_FILE:
3930  copy_params.source_type = import_export::SourceType::kGeoFile;
3931  break;
3932  case TSourceType::PARQUET_FILE:
3933 #ifdef ENABLE_IMPORT_PARQUET
3934  copy_params.source_type = import_export::SourceType::kParquetFile;
3935  break;
3936 #else
3937  THROW_DB_EXCEPTION("Parquet not supported");
3938 #endif
3939  case TSourceType::ODBC:
3940  THROW_DB_EXCEPTION("ODBC source not supported");
3941  case TSourceType::RASTER_FILE:
3942  copy_params.source_type = import_export::SourceType::kRasterFile;
3943  break;
3944  default:
3945  CHECK(false);
3946  }
3947 
3948  switch (cp.geo_coords_encoding) {
3949  case TEncodingType::GEOINT:
3950  copy_params.geo_coords_encoding = kENCODING_GEOINT;
3951  break;
3952  case TEncodingType::NONE:
3953  copy_params.geo_coords_encoding = kENCODING_NONE;
3954  break;
3955  default:
3956  THROW_DB_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
3957  std::to_string((int)cp.geo_coords_encoding));
3958  }
3959  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
3960  switch (cp.geo_coords_type) {
3961  case TDatumType::GEOGRAPHY:
3962  copy_params.geo_coords_type = kGEOGRAPHY;
3963  break;
3964  case TDatumType::GEOMETRY:
3965  copy_params.geo_coords_type = kGEOMETRY;
3966  break;
3967  default:
3968  THROW_DB_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
3969  std::to_string((int)cp.geo_coords_type));
3970  }
3971  switch (cp.geo_coords_srid) {
3972  case 4326:
3973  case 3857:
3974  case 900913:
3975  copy_params.geo_coords_srid = cp.geo_coords_srid;
3976  break;
3977  default:
3978  THROW_DB_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
3979  std::to_string((int)cp.geo_coords_srid));
3980  }
3981  copy_params.sanitize_column_names = cp.sanitize_column_names;
3982  copy_params.geo_layer_name = cp.geo_layer_name;
3983  copy_params.geo_explode_collections = cp.geo_explode_collections;
3984  copy_params.source_srid = cp.source_srid;
3985  switch (cp.raster_point_type) {
3986  case TRasterPointType::NONE:
3988  break;
3989  case TRasterPointType::AUTO:
3991  break;
3992  case TRasterPointType::SMALLINT:
3994  break;
3995  case TRasterPointType::INT:
3997  break;
3998  case TRasterPointType::FLOAT:
4000  break;
4001  case TRasterPointType::DOUBLE:
4003  break;
4004  case TRasterPointType::POINT:
4006  break;
4007  default:
4008  CHECK(false);
4009  }
4010  copy_params.raster_import_bands = cp.raster_import_bands;
4011  if (cp.raster_scanlines_per_thread < 0) {
4012  THROW_DB_EXCEPTION("Invalid raster_scanlines_per_thread in TCopyParams (" +
4013  std::to_string((int)cp.raster_scanlines_per_thread));
4014  } else {
4015  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
4016  }
4017  switch (cp.raster_point_transform) {
4018  case TRasterPointTransform::NONE:
4020  break;
4021  case TRasterPointTransform::AUTO:
4023  break;
4024  case TRasterPointTransform::FILE:
4026  break;
4027  case TRasterPointTransform::WORLD:
4029  break;
4030  default:
4031  CHECK(false);
4032  }
4033  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
4034  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
4035  copy_params.dsn = cp.odbc_dsn;
4036  copy_params.connection_string = cp.odbc_connection_string;
4037  copy_params.sql_select = cp.odbc_sql_select;
4038  copy_params.sql_order_by = cp.odbc_sql_order_by;
4039  copy_params.username = cp.odbc_username;
4040  copy_params.password = cp.odbc_password;
4041  copy_params.credential_string = cp.odbc_credential_string;
4042  copy_params.add_metadata_columns = cp.add_metadata_columns;
4043  copy_params.trim_spaces = cp.trim_spaces;
4044  copy_params.geo_validate_geometry = cp.geo_validate_geometry;
4045  copy_params.raster_drop_if_all_null = cp.raster_drop_if_all_null;
4046  return copy_params;
4047 }
4048 
4049 TCopyParams DBHandler::copyparams_to_thrift(const import_export::CopyParams& cp) {
4050  TCopyParams copy_params;
4051  copy_params.delimiter = cp.delimiter;
4052  copy_params.null_str = cp.null_str;
4053  switch (cp.has_header) {
4055  copy_params.has_header = TImportHeaderRow::AUTODETECT;
4056  break;
4058  copy_params.has_header = TImportHeaderRow::NO_HEADER;
4059  break;
4061  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
4062  break;
4063  default:
4064  CHECK(false);
4065  }
4066  copy_params.quoted = cp.quoted;
4067  copy_params.quote = cp.quote;
4068  copy_params.escape = cp.escape;
4069  copy_params.line_delim = cp.line_delim;
4070  copy_params.array_delim = cp.array_delim;
4071  copy_params.array_begin = cp.array_begin;
4072  copy_params.array_end = cp.array_end;
4073  copy_params.threads = cp.threads;
4074  copy_params.s3_access_key = cp.s3_access_key;
4075  copy_params.s3_secret_key = cp.s3_secret_key;
4076  copy_params.s3_session_token = cp.s3_session_token;
4077  copy_params.s3_region = cp.s3_region;
4078  copy_params.s3_endpoint = cp.s3_endpoint;
4079  switch (cp.source_type) {
4080  case import_export::SourceType::kDelimitedFile:
4081  copy_params.source_type = TSourceType::DELIMITED_FILE;
4082  break;
4083  case import_export::SourceType::kGeoFile:
4084  copy_params.source_type = TSourceType::GEO_FILE;
4085  break;
4086  case import_export::SourceType::kParquetFile:
4087  copy_params.source_type = TSourceType::PARQUET_FILE;
4088  break;
4089  case import_export::SourceType::kRasterFile:
4090  copy_params.source_type = TSourceType::RASTER_FILE;
4091  break;
4092  case import_export::SourceType::kOdbc:
4093  copy_params.source_type = TSourceType::ODBC;
4094  break;
4095  default:
4096  CHECK(false);
4097  }
4098  switch (cp.geo_coords_encoding) {
4099  case kENCODING_GEOINT:
4100  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
4101  break;
4102  default:
4103  copy_params.geo_coords_encoding = TEncodingType::NONE;
4104  break;
4105  }
4106  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
4107  switch (cp.geo_coords_type) {
4108  case kGEOGRAPHY:
4109  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
4110  break;
4111  case kGEOMETRY:
4112  copy_params.geo_coords_type = TDatumType::GEOMETRY;
4113  break;
4114  default:
4115  CHECK(false);
4116  }
4117  copy_params.geo_coords_srid = cp.geo_coords_srid;
4118  copy_params.sanitize_column_names = cp.sanitize_column_names;
4119  copy_params.geo_layer_name = cp.geo_layer_name;
4120  copy_params.geo_assign_render_groups = false;
4121  copy_params.geo_explode_collections = cp.geo_explode_collections;
4122  copy_params.source_srid = cp.source_srid;
4123  switch (cp.raster_point_type) {
4125  copy_params.raster_point_type = TRasterPointType::NONE;
4126  break;
4128  copy_params.raster_point_type = TRasterPointType::AUTO;
4129  break;
4131  copy_params.raster_point_type = TRasterPointType::SMALLINT;
4132  break;
4134  copy_params.raster_point_type = TRasterPointType::INT;
4135  break;
4137  copy_params.raster_point_type = TRasterPointType::FLOAT;
4138  break;
4140  copy_params.raster_point_type = TRasterPointType::DOUBLE;
4141  break;
4143  copy_params.raster_point_type = TRasterPointType::POINT;
4144  break;
4145  default:
4146  CHECK(false);
4147  }
4148  copy_params.raster_import_bands = cp.raster_import_bands;
4149  copy_params.raster_scanlines_per_thread = cp.raster_scanlines_per_thread;
4150  switch (cp.raster_point_transform) {
4152  copy_params.raster_point_transform = TRasterPointTransform::NONE;
4153  break;
4155  copy_params.raster_point_transform = TRasterPointTransform::AUTO;
4156  break;
4158  copy_params.raster_point_transform = TRasterPointTransform::FILE;
4159  break;
4161  copy_params.raster_point_transform = TRasterPointTransform::WORLD;
4162  break;
4163  default:
4164  CHECK(false);
4165  }
4166  copy_params.raster_point_compute_angle = cp.raster_point_compute_angle;
4167  copy_params.raster_import_dimensions = cp.raster_import_dimensions;
4168  copy_params.odbc_dsn = cp.dsn;
4169  copy_params.odbc_connection_string = cp.connection_string;
4170  copy_params.odbc_sql_select = cp.sql_select;
4171  copy_params.odbc_sql_order_by = cp.sql_order_by;
4172  copy_params.odbc_username = cp.username;
4173  copy_params.odbc_password = cp.password;
4174  copy_params.odbc_credential_string = cp.credential_string;
4175  copy_params.add_metadata_columns = cp.add_metadata_columns;
4176  copy_params.trim_spaces = cp.trim_spaces;
4177  copy_params.geo_validate_geometry = cp.geo_validate_geometry;
4178  copy_params.raster_drop_if_all_null = cp.raster_drop_if_all_null;
4179  return copy_params;
4180 }
4181 
4182 namespace {
4183 void add_vsi_network_prefix(std::string& path) {
4184  // do we support network file access?
4185  bool gdal_network = Geospatial::GDAL::supportsNetworkFileAccess();
4186 
4187  // modify head of filename based on source location
4188  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
4189  if (!gdal_network) {
4191  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
4192  }
4193  // invoke GDAL CURL virtual file reader
4194  path = "/vsicurl/" + path;
4195  } else if (boost::istarts_with(path, "s3://")) {
4196  if (!gdal_network) {
4198  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
4199  }
4200  // invoke GDAL S3 virtual file reader
4201  boost::replace_first(path, "s3://", "/vsis3/");
4202  }
4203 }
4204 
4205 void add_vsi_geo_prefix(std::string& path) {
4206  // single gzip'd file (not an archive)?
4207  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
4208  path = "/vsigzip/" + path;
4209  }
4210 }
4211 
4212 void add_vsi_archive_prefix(std::string& path) {
4213  // check for compressed file or file bundle
4214  if (boost::iends_with(path, ".zip")) {
4215  // zip archive
4216  path = "/vsizip/" + path;
4217  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
4218  boost::iends_with(path, ".tar.gz")) {
4219  // tar archive (compressed or uncompressed)
4220  path = "/vsitar/" + path;
4221  }
4222 }
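// Note: the add_vsi_* helpers rewrite user-supplied paths into GDAL virtual file
// system paths (/vsicurl/, /vsis3/, /vsigzip/, /vsizip/, /vsitar/) so remote,
// gzipped or archived geo files can be opened directly; remove_vsi_prefixes()
// reverses this for user-facing messages.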
4223 
4224 std::string remove_vsi_prefixes(const std::string& path_in) {
4225  std::string path(path_in);
4226 
4227  // these will be first
4228  if (boost::istarts_with(path, "/vsizip/")) {
4229  boost::replace_first(path, "/vsizip/", "");
4230  } else if (boost::istarts_with(path, "/vsitar/")) {
4231  boost::replace_first(path, "/vsitar/", "");
4232  } else if (boost::istarts_with(path, "/vsigzip/")) {
4233  boost::replace_first(path, "/vsigzip/", "");
4234  }
4235 
4236  // then these
4237  if (boost::istarts_with(path, "/vsicurl/")) {
4238  boost::replace_first(path, "/vsicurl/", "");
4239  } else if (boost::istarts_with(path, "/vsis3/")) {
4240  boost::replace_first(path, "/vsis3/", "s3://");
4241  }
4242 
4243  return path;
4244 }
4245 
4246 bool path_is_relative(const std::string& path) {
4247  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
4248  boost::istarts_with(path, "https://")) {
4249  return false;
4250  }
4251  return !boost::filesystem::path(path).is_absolute();
4252 }
4253 
4254 bool path_has_valid_filename(const std::string& path) {
4255  auto filename = boost::filesystem::path(path).filename().string();
4256  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
4257  return false;
4258  }
4259  return true;
4260 }
4261 
4262 bool is_a_supported_geo_file(const std::string& path) {
4263  if (!path_has_valid_filename(path)) {
4264  return false;
4265  }
4266  // this is now just for files that we want to recognize
4267  // as geo when inside an archive (see below)
4268  // @TODO(se) make this more flexible?
4269  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
4270  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
4271  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
4272  boost::iends_with(path, ".gdb.zip") || boost::iends_with(path, ".fgb")) {
4273  return true;
4274  }
4275  return false;
4276 }
4277 
4278 bool is_a_supported_archive_file(const std::string& path) {
4279  if (!path_has_valid_filename(path)) {
4280  return false;
4281  }
4282  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
4283  return true;
4284  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
4285  boost::iends_with(path, ".tar.gz")) {
4286  return true;
4287  }
4288  return false;
4289 }
4290 
4291 std::string find_first_geo_file_in_archive(const std::string& archive_path,
4292  const import_export::CopyParams& copy_params) {
4293  // get the recursive list of all files in the archive
4294  std::vector<std::string> files =
4295  import_export::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
4296 
4297  // report the list
4298  LOG(INFO) << "Found " << files.size() << " files in Archive "
4299  << remove_vsi_prefixes(archive_path);
4300  for (const auto& file : files) {
4301  LOG(INFO) << " " << file;
4302  }
4303 
4304  // scan the list for the first candidate file
4305  bool found_suitable_file = false;
4306  std::string file_name;
4307  for (const auto& file : files) {
4308  if (is_a_supported_geo_file(file)) {
4309  file_name = file;
4310  found_suitable_file = true;
4311  break;
4312  }
4313  }
4314 
4315  // if we didn't find anything
4316  if (!found_suitable_file) {
4317  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
4318  remove_vsi_prefixes(archive_path);
4319  file_name.clear();
4320  }
4321 
4322  // done
4323  return file_name;
4324 }
4325 
4326 bool is_local_file(const std::string& file_path) {
4327  return (!boost::istarts_with(file_path, "s3://") &&
4328  !boost::istarts_with(file_path, "http://") &&
4329  !boost::istarts_with(file_path, "https://"));
4330 }
4331 
4332 void validate_import_file_path_if_local(const std::string& file_path) {
4333  if (is_local_file(file_path)) {
4334  ddl_utils::validate_allowed_file_path(
4335  file_path, ddl_utils::DataTransferType::IMPORT, true);
4336  }
4337 }
4338 } // namespace
4339 
4340 void DBHandler::detect_column_types(TDetectResult& _return,
4341  const TSessionId& session_id_or_json,
4342  const std::string& file_name_in,
4343  const TCopyParams& cp) {
4344  heavyai::RequestInfo const request_info(session_id_or_json);
4345  SET_REQUEST_ID(request_info.requestId());
4346  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4347  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4348  check_read_only("detect_column_types");
4349 
4350  bool is_raster = false;
4351  boost::filesystem::path file_path;
4352  auto copy_params = thrift_to_copyparams(cp);
4353  if (copy_params.source_type != import_export::SourceType::kOdbc) {
4354  std::string file_name{file_name_in};
4355  if (path_is_relative(file_name)) {
4356  // assume relative paths are relative to data_path / import / <session>
4357  auto temp_file_path = import_path_ /
4358  picosha2::hash256_hex_string(request_info.sessionId()) /
4359  boost::filesystem::path(file_name).filename();
4360  file_name = temp_file_path.string();
4361  }
4362  validate_import_file_path_if_local(file_name);
4363 
4364  if ((copy_params.source_type == import_export::SourceType::kGeoFile ||
4365  copy_params.source_type == import_export::SourceType::kRasterFile) &&
4366  is_local_file(file_name)) {
4367  const shared::FilePathOptions options{copy_params.regex_path_filter,
4368  copy_params.file_sort_order_by,
4369  copy_params.file_sort_regex};
4370  auto file_paths = shared::local_glob_filter_sort_files(file_name, options, false);
4371  // For geo and raster detect, pick the first file if multiple files are provided
4372  // (e.g. through file globbing).
4373  CHECK(!file_paths.empty());
4374  file_name = file_paths[0];
4375  }
4376 
4377  // if it's a geo or raster import, handle alternative paths (S3, HTTP, archive etc.)
4378  if (copy_params.source_type == import_export::SourceType::kGeoFile) {
4379  if (is_a_supported_archive_file(file_name)) {
4380  // find the archive file
4381  add_vsi_network_prefix(file_name);
4382  if (!import_export::Importer::gdalFileExists(file_name, copy_params)) {
4383  THROW_DB_EXCEPTION("Archive does not exist: " + file_name_in);
4384  }
4385  // find geo file in archive
4386  add_vsi_archive_prefix(file_name);
4387  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4388  // prepare to detect that geo file
4389  if (geo_file.size()) {
4390  file_name = file_name + std::string("/") + geo_file;
4391  }
4392  } else {
4393  // prepare to detect geo file directly
4394  add_vsi_network_prefix(file_name);
4395  add_vsi_geo_prefix(file_name);
4396  }
4397  } else if (copy_params.source_type == import_export::SourceType::kRasterFile) {
4398  // prepare to detect raster file directly
4399  add_vsi_network_prefix(file_name);
4400  add_vsi_geo_prefix(file_name);
4401  is_raster = true;
4402  }
4403 
4404  file_path = boost::filesystem::path(file_name);
4405  // can be an S3 URL
4406  if (!boost::istarts_with(file_name, "s3://")) {
4407  if (!boost::filesystem::path(file_name).is_absolute()) {
4408  file_path = import_path_ /
4409  picosha2::hash256_hex_string(request_info.sessionId()) /
4410  boost::filesystem::path(file_name).filename();
4411  file_name = file_path.string();
4412  }
4413 
4414  if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4415  copy_params.source_type == import_export::SourceType::kRasterFile) {
4416  // check for geo or raster file
4417  if (!import_export::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4418  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
4419  "\" does not exist.")
4420  }
4421  } else {
4422  // check for regular file
4423  if (!shared::file_or_glob_path_exists(file_path.string())) {
4424  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
4425  "\" does not exist.");
4426  }
4427  }
4428  }
4429  }
4430 
4431  try {
4432  if (copy_params.source_type == import_export::SourceType::kDelimitedFile
4433 #ifdef ENABLE_IMPORT_PARQUET
4434  || copy_params.source_type == import_export::SourceType::kParquetFile
4435 #endif
4436  ) {
4437  import_export::Detector detector(file_path, copy_params);
4438  auto best_types = detector.getBestColumnTypes();
4439  std::vector<std::string> headers = detector.get_headers();
4440  copy_params = detector.get_copy_params();
4441 
4442  _return.copy_params = copyparams_to_thrift(copy_params);
4443  _return.row_set.row_desc.resize(best_types.size());
4444  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
4445  TColumnType col;
4446  auto& ti = best_types[col_idx];
4447  col.col_type.precision = ti.get_precision();
4448  col.col_type.scale = ti.get_scale();
4449  col.col_type.comp_param = ti.get_comp_param();
4450  if (ti.is_geometry()) {
4451  // set this so encoding_to_thrift does the right thing
4452  ti.set_compression(copy_params.geo_coords_encoding);
4453  // fill in these directly
4454  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
4455  col.col_type.scale = copy_params.geo_coords_srid;
4456  col.col_type.comp_param = copy_params.geo_coords_comp_param;
4457  }
4458  col.col_type.type = type_to_thrift(ti);
4459  col.col_type.encoding = encoding_to_thrift(ti);
4460  if (ti.is_array()) {
4461  col.col_type.is_array = true;
4462  }
4463  if (copy_params.sanitize_column_names) {
4464  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
4465  } else {
4466  col.col_name = headers[col_idx];
4467  }
4468  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
4469  _return.row_set.row_desc[col_idx] = col;
4470  }
4471  auto sample_data = detector.get_sample_rows(shared::kDefaultSampleRowsCount);
4472 
4473  TRow sample_row;
4474  for (auto row : sample_data) {
4475  sample_row.cols.clear();
4476  for (const auto& s : row) {
4477  TDatum td;
4478  td.val.str_val = s;
4479  td.is_null = s.empty();
4480  sample_row.cols.push_back(td);
4481  }
4482  _return.row_set.rows.push_back(sample_row);
4483  }
4484  } else if (copy_params.source_type == import_export::SourceType::kGeoFile ||
4485  copy_params.source_type == import_export::SourceType::kRasterFile) {
4486  check_geospatial_files(file_path, copy_params);
4487  std::list<ColumnDescriptor> cds = import_export::Importer::gdalToColumnDescriptors(
4488  file_path.string(), is_raster, Geospatial::kGeoColumnName, copy_params);
4489  for (auto cd : cds) {
4490  if (copy_params.sanitize_column_names) {
4491  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
4492  }
4493  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
4494  }
4495  if (!is_raster) {
4496  // @TODO(se) support for raster?
4497  std::map<std::string, std::vector<std::string>> sample_data;
4498  import_export::Importer::readMetadataSampleGDAL(file_path.string(),
4499  Geospatial::kGeoColumnName,
4500  sample_data,
4501  shared::kDefaultSampleRowsCount,
4502  copy_params);
4503  if (sample_data.size() > 0) {
4504  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
4505  TRow sample_row;
4506  for (auto cd : cds) {
4507  TDatum td;
4508  td.val.str_val = sample_data[cd.sourceName].at(i);
4509  td.is_null = td.val.str_val.empty();
4510  sample_row.cols.push_back(td);
4511  }
4512  _return.row_set.rows.push_back(sample_row);
4513  }
4514  }
4515  }
4516  _return.copy_params = copyparams_to_thrift(copy_params);
4517  }
4518  } catch (const std::exception& e) {
4519  THROW_DB_EXCEPTION("detect_column_types error: " + std::string(e.what()));
4520  }
4521 }
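// Note: detect_column_types() resolves relative paths against the per-session
// import directory, rewrites geo/raster paths for GDAL access, then either runs
// the delimited/Parquet Detector or gdalToColumnDescriptors() and returns the
// inferred row descriptor, sample rows and the (possibly adjusted) copy params.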
4522 
4523 void DBHandler::render_vega(TRenderResult& _return,
4524  const TSessionId& session_id_or_json,
4525  const int64_t widget_id,
4526  const std::string& vega_json,
4527  const int compression_level,
4528  const std::string& nonce) {
4529  heavyai::RequestInfo const request_info(session_id_or_json);
4530  SET_REQUEST_ID(request_info.requestId());
4531  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()),
4532  "widget_id",
4533  widget_id,
4534  "compression_level",
4535  compression_level,
4536  "vega_json",
4537  vega_json,
4538  "nonce",
4539  nonce);
4540  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4541  stdlog.appendNameValuePairs("nonce", nonce);
4542  if (!render_handler_) {
4543  THROW_DB_EXCEPTION("Backend rendering is disabled.");
4544  }
4545 
4546  // cast away const-ness of incoming Thrift string ref
4547  // to allow it to be passed down as an r-value and
4548  // ultimately std::moved into the RenderSession
4549  auto& non_const_vega_json = const_cast<std::string&>(vega_json);
4550 
4551  _return.total_time_ms = measure<>::execution([&]() {
4552  try {
4553  render_handler_->render_vega(_return,
4554  stdlog.getSessionInfo(),
4555  widget_id,
4556  std::move(non_const_vega_json),
4557  compression_level,
4558  nonce);
4559  } catch (std::exception& e) {
4560  THROW_DB_EXCEPTION(e.what());
4561  }
4562  });
4563 }
4564 
4565 bool DBHandler::is_allowed_on_dashboard(const Catalog_Namespace::SessionInfo& session_info,
4566  int32_t dashboard_id,
4567  AccessPrivileges requestedPermissions) {
4568  DBObject object(dashboard_id, DashboardDBObjectType);
4569  auto& catalog = session_info.getCatalog();
4570  auto& user = session_info.get_currentUser();
4571  object.loadKey(catalog);
4572  object.setPrivileges(requestedPermissions);
4573  std::vector<DBObject> privs = {object};
4574  return SysCatalog::instance().checkPrivileges(user, privs);
4575 }
4576 
4577 // custom expressions
4578 namespace {
4579 using Catalog_Namespace::CustomExpression;
4580 using Catalog_Namespace::DataSourceType;
4581 
4582 std::unique_ptr<Catalog_Namespace::CustomExpression> create_custom_expr_from_thrift_obj(
4583  const TCustomExpression& t_custom_expr,
4584  const Catalog& catalog) {
4585  if (t_custom_expr.data_source_name.empty()) {
4586  THROW_DB_EXCEPTION("Custom expression data source name cannot be empty.")
4587  }
4588  CHECK(t_custom_expr.data_source_type == TDataSourceType::type::TABLE)
4589  << "Unexpected data source type: "
4590  << static_cast<int>(t_custom_expr.data_source_type);
4591  auto td = catalog.getMetadataForTable(t_custom_expr.data_source_name, false);
4592  if (!td) {
4593  THROW_DB_EXCEPTION("Custom expression references a table \"" +
4594  t_custom_expr.data_source_name + "\" that does not exist.")
4595  }
4596  DataSourceType data_source_type = DataSourceType::TABLE;
4597  return std::make_unique<CustomExpression>(
4598  t_custom_expr.name, t_custom_expr.expression_json, data_source_type, td->tableId);
4599 }
4600 
4601 TCustomExpression create_thrift_obj_from_custom_expr(const CustomExpression& custom_expr,
4602  const Catalog& catalog) {
4603  TCustomExpression t_custom_expr;
4604  t_custom_expr.id = custom_expr.id;
4605  t_custom_expr.name = custom_expr.name;
4606  t_custom_expr.expression_json = custom_expr.expression_json;
4607  t_custom_expr.data_source_id = custom_expr.data_source_id;
4608  t_custom_expr.is_deleted = custom_expr.is_deleted;
4609  CHECK(custom_expr.data_source_type == DataSourceType::TABLE)
4610  << "Unexpected data source type: "
4611  << static_cast<int>(custom_expr.data_source_type);
4612  t_custom_expr.data_source_type = TDataSourceType::type::TABLE;
4613  auto td = catalog.getMetadataForTable(custom_expr.data_source_id, false);
4614  if (td) {
4615  t_custom_expr.data_source_name = td->tableName;
4616  } else {
4617  LOG(WARNING)
4618  << "Custom expression references a deleted data source. Custom expression id: "
4619  << custom_expr.id << ", name: " << custom_expr.name;
4620  }
4621  return t_custom_expr;
4622 }
4623 } // namespace
4624 
4625 int32_t DBHandler::create_custom_expression(const TSessionId& session_id_or_json,
4626  const TCustomExpression& t_custom_expr) {
4627  heavyai::RequestInfo const request_info(session_id_or_json);
4628  SET_REQUEST_ID(request_info.requestId());
4629  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4630  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4631  check_read_only("create_custom_expression");
4632 
4633  auto session_ptr = stdlog.getConstSessionInfo();
4634  if (!session_ptr->get_currentUser().isSuper) {
4635  THROW_DB_EXCEPTION("Custom expressions can only be created by super users.")
4636  }
4637  auto& catalog = session_ptr->getCatalog();
4639  return catalog.createCustomExpression(
4640  create_custom_expr_from_thrift_obj(t_custom_expr, catalog));
4641 }
4642 
4643 void DBHandler::get_custom_expressions(std::vector<TCustomExpression>& _return,
4644  const TSessionId& session_id_or_json) {
4645  heavyai::RequestInfo const request_info(session_id_or_json);
4646  SET_REQUEST_ID(request_info.requestId());
4647  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4648  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4649 
4650  auto session_ptr = stdlog.getConstSessionInfo();
4651  auto& catalog = session_ptr->getCatalog();
4653  auto custom_expressions =
4654  catalog.getCustomExpressionsForUser(session_ptr->get_currentUser());
4655  for (const auto& custom_expression : custom_expressions) {
4656  _return.emplace_back(create_thrift_obj_from_custom_expr(*custom_expression, catalog));
4657  }
4658 }
4659 
4660 void DBHandler::update_custom_expression(const TSessionId& session_id_or_json,
4661  const int32_t id,
4662  const std::string& expression_json) {
4663  heavyai::RequestInfo const request_info(session_id_or_json);
4664  SET_REQUEST_ID(request_info.requestId());
4665  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4666  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4667  check_read_only("update_custom_expression");
4668 
4669  auto session_ptr = stdlog.getConstSessionInfo();
4670  if (!session_ptr->get_currentUser().isSuper) {
4671  THROW_DB_EXCEPTION("Custom expressions can only be updated by super users.")
4672  }
4673  auto& catalog = session_ptr->getCatalog();
4675  catalog.updateCustomExpression(id, expression_json);
4676 }
4677 
4678 void DBHandler::delete_custom_expressions(
4679  const TSessionId& session_id_or_json,
4680  const std::vector<int32_t>& custom_expression_ids,
4681  const bool do_soft_delete) {
4682  heavyai::RequestInfo const request_info(session_id_or_json);
4683  SET_REQUEST_ID(request_info.requestId());
4684  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4685  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4686  check_read_only("delete_custom_expressions");
4687 
4688  auto session_ptr = stdlog.getConstSessionInfo();
4689  if (!session_ptr->get_currentUser().isSuper) {
4690  THROW_DB_EXCEPTION("Custom expressions can only be deleted by super users.")
4691  }
4692  auto& catalog = session_ptr->getCatalog();
4694  catalog.deleteCustomExpressions(custom_expression_ids, do_soft_delete);
4695 }
4696 
4697 // dashboards
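// get_dashboard: looks up a dashboard by id, verifies that it exists and that
// the caller has VIEW_DASHBOARD privilege on it, then resolves the owner's
// user metadata and delegates to get_dashboard_impl to build the Thrift
// object, including the dashboard state.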
4698 void DBHandler::get_dashboard(TDashboard& dashboard,
4699  const TSessionId& session_id_or_json,
4700  const int32_t dashboard_id) {
4701  heavyai::RequestInfo const request_info(session_id_or_json);
4702  SET_REQUEST_ID(request_info.requestId());
4703  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4704  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4705  auto session_ptr = stdlog.getConstSessionInfo();
4706  auto const& cat = session_ptr->getCatalog();
4707  Catalog_Namespace::UserMetadata user_meta;
4708  auto dash = cat.getMetadataForDashboard(dashboard_id);
4709  if (!dash) {
4710  THROW_DB_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
4711  " doesn't exist");
4712  }
4713  if (!is_allowed_on_dashboard(
4714  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4715  THROW_DB_EXCEPTION("User has no view privileges for the dashboard with id " +
4716  std::to_string(dashboard_id));
4717  }
4718  user_meta.userName = "";
4719  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4720  dashboard = get_dashboard_impl(session_ptr, user_meta, dash);
4721 }
4722 
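// get_dashboards: returns every dashboard in the current database that the
// caller is allowed to view. Unlike get_dashboard, the per-dashboard state is
// not populated (populate_state = false) to keep the response small.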
4723 void DBHandler::get_dashboards(std::vector<TDashboard>& dashboards,
4724  const TSessionId& session_id_or_json) {
4725  heavyai::RequestInfo const request_info(session_id_or_json);
4726  SET_REQUEST_ID(request_info.requestId());
4727  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4728  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4729  auto session_ptr = stdlog.getConstSessionInfo();
4730  auto const& cat = session_ptr->getCatalog();
4731  Catalog_Namespace::UserMetadata user_meta;
4732  const auto dashes = cat.getAllDashboardsMetadata();
4733  user_meta.userName = "";
4734  for (const auto dash : dashes) {
4735  if (is_allowed_on_dashboard(
4736  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
4737  // dashboardState is intentionally not populated here
4738  // to keep the response payload small;
4739  // use the get_dashboard call to retrieve the state
4740  dashboards.push_back(get_dashboard_impl(session_ptr, user_meta, dash, false));
4741  }
4742  }
4743 }
4744 
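// Builds a TDashboard from a DashboardDescriptor. Super users get all four
// dashboard permissions; otherwise the permissions are the union of the grants
// held by each role the caller belongs to. is_dash_shared is true when the
// dashboard object has grantees other than its owner.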
4745 TDashboard DBHandler::get_dashboard_impl(
4746  const std::shared_ptr<Catalog_Namespace::SessionInfo const>& session_ptr,
4747  Catalog_Namespace::UserMetadata& user_meta,
4748  const DashboardDescriptor* dash,
4749  const bool populate_state) {
4750  auto const& cat = session_ptr->getCatalog();
4751  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
4752  auto objects_list = SysCatalog::instance().getMetadataForObject(
4753  cat.getCurrentDB().dbId,
4754  static_cast<int>(DBObjectType::DashboardDBObjectType),
4755  dash->dashboardId);
4756  TDashboard dashboard;
4757  dashboard.dashboard_name = dash->dashboardName;
4758  if (populate_state) {
4759  dashboard.dashboard_state = dash->dashboardState;
4760  }
4761  dashboard.image_hash = dash->imageHash;
4762  dashboard.update_time = dash->updateTime;
4763  dashboard.dashboard_metadata = dash->dashboardMetadata;
4764  dashboard.dashboard_id = dash->dashboardId;
4765  dashboard.dashboard_owner = dash->user;
4766  TDashboardPermissions perms;
4767  // Super user has all permissions.
4768  if (session_ptr->get_currentUser().isSuper) {
4769  perms.create_ = true;
4770  perms.delete_ = true;
4771  perms.edit_ = true;
4772  perms.view_ = true;
4773  } else {
4774  // Collect all grants for the current user's roles
4775  // and add them to the permissions.
4776  auto obj_to_find =
4777  DBObject(dashboard.dashboard_id, DBObjectType::DashboardDBObjectType);
4778  obj_to_find.loadKey(cat);
4779  std::vector<std::string> grantees =
4780  SysCatalog::instance().getRoles(true,
4781  session_ptr->get_currentUser().isSuper,
4782  session_ptr->get_currentUser().userName);
4783  for (const auto& grantee : grantees) {
4784  DBObject* object_found;
4785  auto* gr = SysCatalog::instance().getGrantee(grantee);
4786  if (gr && (object_found = gr->findDbObject(obj_to_find.getObjectKey(), true))) {
4787  const auto obj_privs = object_found->getPrivileges();
4788  perms.create_ |= obj_privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
4789  perms.delete_ |= obj_privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
4790  perms.edit_ |= obj_privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
4791  perms.view_ |= obj_privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
4792  }
4793  }
4794  }
4795  dashboard.dashboard_permissions = perms;
4796  if (objects_list.empty() ||
4797  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
4798  dashboard.is_dash_shared = false;
4799  } else {
4800  dashboard.is_dash_shared = true;
4801  }
4802  return dashboard;
4803 }
4804 
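// Helpers that guard the read-only information schema database: a database
// counts as the info schema only after the corresponding migration has run,
// and write requests against it are rejected either with a Thrift TDBException
// or a std::runtime_error, depending on the caller.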
4805 namespace dbhandler {
4806 bool is_info_schema_db(const std::string& db_name) {
4807  return (db_name == shared::kInfoSchemaDbName &&
4808  SysCatalog::instance().hasExecutedMigration(shared::kInfoSchemaMigrationName));
4809 }
4810 
4811 void check_not_info_schema_db(const std::string& db_name, bool throw_db_exception) {
4812  if (is_info_schema_db(db_name)) {
4813  std::string error_message{"Write requests/queries are not allowed in the " +
4814  shared::kInfoSchemaDbName + " database."};
4815  if (throw_db_exception) {
4816  THROW_DB_EXCEPTION(error_message)
4817  } else {
4818  throw std::runtime_error(error_message);
4819  }
4820  }
4821 }
4822 } // namespace dbhandler
4823 
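// create_dashboard: read-only mode and the CREATE_DASHBOARD database privilege
// are enforced, duplicate names per owner are rejected, and the new dashboard
// is registered as a DB object for the creating user. Note the TODO below: the
// catalog insert and the DB object creation are not performed transactionally.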
4824 int32_t DBHandler::create_dashboard(const TSessionId& session_id_or_json,
4825  const std::string& dashboard_name,
4826  const std::string& dashboard_state,
4827  const std::string& image_hash,
4828  const std::string& dashboard_metadata) {
4829  heavyai::RequestInfo const request_info(session_id_or_json);
4830  SET_REQUEST_ID(request_info.requestId());
4831  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4832  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4833  auto session_ptr = stdlog.getConstSessionInfo();
4834  CHECK(session_ptr);
4835  check_read_only("create_dashboard");
4836  auto& cat = session_ptr->getCatalog();
4839  }
4840 
4841  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
4842  AccessPrivileges::CREATE_DASHBOARD)) {
4843  THROW_DB_EXCEPTION("Not enough privileges to create a dashboard.");
4844  }
4845 
4846  if (dashboard_exists(cat, session_ptr->get_currentUser().userId, dashboard_name)) {
4847  THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4848  }
4849 
4850  DashboardDescriptor dd;
4851  dd.dashboardName = dashboard_name;
4852  dd.dashboardState = dashboard_state;
4853  dd.imageHash = image_hash;
4854  dd.dashboardMetadata = dashboard_metadata;
4855  dd.userId = session_ptr->get_currentUser().userId;
4856  dd.user = session_ptr->get_currentUser().userName;
4857 
4858  try {
4859  auto id = cat.createDashboard(dd);
4860  // TODO: transactionally unsafe
4861  SysCatalog::instance().createDBObject(
4862  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
4863  return id;
4864  } catch (const std::exception& e) {
4865  THROW_DB_EXCEPTION(e.what());
4866  }
4867 }
4868 
4869 void DBHandler::replace_dashboard(const TSessionId& session_id_or_json,
4870  const int32_t dashboard_id,
4871  const std::string& dashboard_name,
4872  const std::string& dashboard_owner,
4873  const std::string& dashboard_state,
4874  const std::string& image_hash,
4875  const std::string& dashboard_metadata) {
4876  heavyai::RequestInfo const request_info(session_id_or_json);
4877  SET_REQUEST_ID(request_info.requestId());
4878  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4879  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4880  auto session_ptr = stdlog.getConstSessionInfo();
4881  CHECK(session_ptr);
4882  check_read_only("replace_dashboard");
4883  auto& cat = session_ptr->getCatalog();
4886  }
4887 
4888  if (!is_allowed_on_dashboard(
4889  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
4890  THROW_DB_EXCEPTION("Not enough privileges to replace a dashboard.");
4891  }
4892 
4893  if (auto dash = cat.getMetadataForDashboard(
4894  std::to_string(session_ptr->get_currentUser().userId), dashboard_name)) {
4895  if (dash->dashboardId != dashboard_id) {
4896  THROW_DB_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
4897  }
4898  }
4899 
4900  DashboardDescriptor dd;
4901  dd.dashboardName = dashboard_name;
4902  dd.dashboardState = dashboard_state;
4903  dd.imageHash = image_hash;
4904  dd.dashboardMetadata = dashboard_metadata;
4905  Catalog_Namespace::UserMetadata user;
4906  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
4907  THROW_DB_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
4908  " does not exist");
4909  }
4910  dd.userId = user.userId;
4911  dd.user = dashboard_owner;
4912  dd.dashboardId = dashboard_id;
4913 
4914  try {
4915  cat.replaceDashboard(dd);
4916  } catch (const std::exception& e) {
4917  THROW_DB_EXCEPTION(e.what());
4918  }
4919 }
4920 
4921 void DBHandler::delete_dashboard(const TSessionId& session_id_or_json,
4922  const int32_t dashboard_id) {
4923  delete_dashboards(session_id_or_json, {dashboard_id});
4924 }
4925 
4926 void DBHandler::delete_dashboards(const TSessionId& session_id_or_json,
4927  const std::vector<int32_t>& dashboard_ids) {
4928  heavyai::RequestInfo const request_info(session_id_or_json);
4929  SET_REQUEST_ID(request_info.requestId());
4930  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
4931  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4932  auto session_ptr = stdlog.getConstSessionInfo();
4933  check_read_only("delete_dashboards");
4934  auto& cat = session_ptr->getCatalog();
4937  }
4938  // Checks will be performed in catalog
4939  try {
4940  cat.deleteMetadataForDashboards(dashboard_ids, session_ptr->get_currentUser());
4941  } catch (const std::exception& e) {
4942  THROW_DB_EXCEPTION(e.what());
4943  }
4944 }
4945 
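// get_valid_groups: only the dashboard owner or a super user may share or
// unshare a dashboard; each entry in `groups` must name an existing user or
// role, otherwise an exception is thrown.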
4946 std::vector<std::string> DBHandler::get_valid_groups(const TSessionId& session_id_or_json,
4947  int32_t dashboard_id,
4948  std::vector<std::string> groups) {
4949  heavyai::RequestInfo const request_info(session_id_or_json);
4950  SET_REQUEST_ID(request_info.requestId());
4951  const auto session_info = get_session_copy(request_info.sessionId());
4952  auto& cat = session_info.getCatalog();
4953  auto dash = cat.getMetadataForDashboard(dashboard_id);
4954  if (!dash) {
4955  THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
4956  " does not exist");
4957  } else if (session_info.get_currentUser().userId != dash->userId &&
4958  !session_info.get_currentUser().isSuper) {
4959  throw std::runtime_error(
4960  "User should be either owner of dashboard or super user to share/unshare it");
4961  }
4962  std::vector<std::string> valid_groups;
4963  Catalog_Namespace::UserMetadata user_meta;
4964  for (auto& group : groups) {
4965  user_meta.isSuper = false; // initialize default flag
4966  if (!SysCatalog::instance().getGrantee(group)) {
4967  THROW_DB_EXCEPTION("User/Role " + group + " does not exist");
4968  } else if (!user_meta.isSuper) {
4969  valid_groups.push_back(group);
4970  }
4971  }
4972  return valid_groups;
4973 }
4974 
4975 void DBHandler::validateGroups(const std::vector<std::string>& groups) {
4976  for (auto const& group : groups) {
4977  if (!SysCatalog::instance().getGrantee(group)) {
4978  THROW_DB_EXCEPTION("User/Role '" + group + "' does not exist");
4979  }
4980  }
4981 }
4982 
4983 void DBHandler::validateDashboardIdsForSharing(
4984  const Catalog_Namespace::SessionInfo& session_info,
4985  const std::vector<int32_t>& dashboard_ids) {
4986  auto& cat = session_info.getCatalog();
4987  std::map<std::string, std::list<int32_t>> errors;
4988  for (auto const& dashboard_id : dashboard_ids) {
4989  auto dashboard = cat.getMetadataForDashboard(dashboard_id);
4990  if (!dashboard) {
4991  errors["Dashboard id does not exist"].push_back(dashboard_id);
4992  } else if (session_info.get_currentUser().userId != dashboard->userId &&
4993  !session_info.get_currentUser().isSuper) {
4994  errors["User should be either owner of dashboard or super user to share/unshare it"]
4995  .push_back(dashboard_id);
4996  }
4997  }
4998  if (!errors.empty()) {
4999  std::stringstream error_stream;
5000  error_stream << "Share/Unshare dashboard(s) failed with error(s)\n";
5001  for (const auto& [error, id_list] : errors) {
5002  error_stream << "Dashboard ids " << join(id_list, ", ") << ": " << error << "\n";
5003  }
5004  THROW_DB_EXCEPTION(error_stream.str());
5005  }
5006 }
5007 
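// Shared implementation for share_dashboards/unshare_dashboards: at least one
// permission flag must be set, the groups and dashboard ids are validated, and
// one DBObject per dashboard is built with the combined privileges before the
// batch grant or revoke. Illustrative client-side call (names are for
// illustration only):
//   TDashboardPermissions perms;
//   perms.view_ = true;
//   client.share_dashboards(session_id, {42}, {"analysts"}, perms);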
5008 void DBHandler::shareOrUnshareDashboards(const TSessionId& session_id,
5009  const std::vector<int32_t>& dashboard_ids,
5010  const std::vector<std::string>& groups,
5011  const TDashboardPermissions& permissions,
5012  const bool do_share) {
5013  auto stdlog = STDLOG(get_session_ptr(session_id));
5014  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5015  check_read_only(do_share ? "share_dashboards" : "unshare_dashboards");
5016  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
5017  !permissions.view_) {
5018  THROW_DB_EXCEPTION("At least one privilege should be assigned for " +
5019  std::string(do_share ? "grants" : "revokes"));
5020  }
5021  auto session_ptr = stdlog.getConstSessionInfo();
5022  auto const& catalog = session_ptr->getCatalog();
5023  auto& sys_catalog = SysCatalog::instance();
5024  validateGroups(groups);
5025  validateDashboardIdsForSharing(*session_ptr, dashboard_ids);
5026  std::vector<DBObject> batch_objects;
5027  for (auto const& dashboard_id : dashboard_ids) {
5028  DBObject object(dashboard_id, DBObjectType::DashboardDBObjectType);
5029  AccessPrivileges privs;
5030  if (permissions.delete_) {
5031  privs.add(AccessPrivileges::DELETE_DASHBOARD);
5032  }
5033  if (permissions.create_) {
5034  privs.add(AccessPrivileges::CREATE_DASHBOARD);
5035  }
5036  if (permissions.edit_) {
5037  privs.add(AccessPrivileges::EDIT_DASHBOARD);
5038  }
5039  if (permissions.view_) {
5040  privs.add(AccessPrivileges::VIEW_DASHBOARD);
5041  }
5042  object.setPrivileges(privs);
5043  batch_objects.push_back(object);
5044  }
5045  if (do_share) {
5046  sys_catalog.grantDBObjectPrivilegesBatch(groups, batch_objects, catalog);
5047  } else {
5048  sys_catalog.revokeDBObjectPrivilegesBatch(groups, batch_objects, catalog);
5049  }
5050 }
5051 
5052 void DBHandler::share_dashboards(const TSessionId& session_id_or_json,
5053  const std::vector<int32_t>& dashboard_ids,
5054  const std::vector<std::string>& groups,
5055  const TDashboardPermissions& permissions) {
5056  heavyai::RequestInfo const request_info(session_id_or_json);
5057  SET_REQUEST_ID(request_info.requestId());
5058  shareOrUnshareDashboards(
5059  request_info.sessionId(), dashboard_ids, groups, permissions, true);
5060 }
5061 
5062 // NOOP: the objects argument is currently ignored; grants on individual objects are not available as of now
5063 void DBHandler::share_dashboard(const TSessionId& session_id_or_json,
5064  const int32_t dashboard_id,
5065  const std::vector<std::string>& groups,
5066  const std::vector<std::string>& objects,
5067  const TDashboardPermissions& permissions,
5068  const bool grant_role = false) {
5069  share_dashboards(session_id_or_json, {dashboard_id}, groups, permissions);
5070 }
5071 
5072 void DBHandler::unshare_dashboards(const TSessionId& session_id_or_json,
5073  const std::vector<int32_t>& dashboard_ids,
5074  const std::vector<std::string>& groups,
5075  const TDashboardPermissions& permissions) {
5076  heavyai::RequestInfo const request_info(session_id_or_json);
5077  SET_REQUEST_ID(request_info.requestId());
5078  shareOrUnshareDashboards(
5079  request_info.sessionId(), dashboard_ids, groups, permissions, false);
5080 }
5081 
5082 void DBHandler::unshare_dashboard(const TSessionId& session_id_or_json,
5083  const int32_t dashboard_id,
5084  const std::vector<std::string>& groups,
5085  const std::vector<std::string>& objects,
5086  const TDashboardPermissions& permissions) {
5087  unshare_dashboards(session_id_or_json, {dashboard_id}, groups, permissions);
5088 }
5089 
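// get_dashboard_grantees: only the owner or a super user may list grantees.
// The owner's own entry is masked out; every other user or role with grants on
// the dashboard object is returned together with its per-permission flags.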
5090 void DBHandler::get_dashboard_grantees(
5091  std::vector<TDashboardGrantees>& dashboard_grantees,
5092  const TSessionId& session_id_or_json,
5093  const int32_t dashboard_id) {
5094  heavyai::RequestInfo const request_info(session_id_or_json);
5095  SET_REQUEST_ID(request_info.requestId());
5096  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
5097  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5098  auto session_ptr = stdlog.getConstSessionInfo();
5099  auto const& cat = session_ptr->getCatalog();
5100  Catalog_Namespace::UserMetadata user_meta;
5101  auto dash = cat.getMetadataForDashboard(dashboard_id);
5102  if (!dash) {
5103  THROW_DB_EXCEPTION("Dashboard id " + std::to_string(dashboard_id) +
5104  " does not exist");
5105  } else if (session_ptr->get_currentUser().userId != dash->userId &&
5106  !session_ptr->get_currentUser().isSuper) {
5107  THROW_DB_EXCEPTION(
5108  "User should be either owner of dashboard or super user to access grantees");
5109  }
5110  std::vector<ObjectRoleDescriptor*> objectsList;
5111  objectsList = SysCatalog::instance().getMetadataForObject(
5112  cat.getCurrentDB().dbId,
5113  static_cast<int>(DBObjectType::DashboardDBObjectType),
5114  dashboard_id); // By default, the object type can only be dashboards
5115  user_meta.userId = -1;
5116  user_meta.userName = "";
5117  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
5118  for (auto object : objectsList) {
5119  if (user_meta.userName == object->roleName) {
5120  // Mask owner
5121  continue;
5122  }
5123  TDashboardGrantees grantee;
5124  TDashboardPermissions perm;
5125  grantee.name = object->roleName;
5126  grantee.is_user = object->roleType;
5127  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
5128  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
5129  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
5130  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
5131  grantee.permissions = perm;
5132  dashboard_grantees.push_back(grantee);
5133  }
5134 }
5135 
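// create_link: stores the view state and metadata as a LinkDescriptor owned by
// the current user and returns the link string generated by the catalog. Note
// that the check_read_only call is commented out for this endpoint.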
5136 void DBHandler::create_link(std::string& _return,
5137  const TSessionId& session_id_or_json,
5138  const std::string& view_state,
5139  const std::string& view_metadata) {
5140  heavyai::RequestInfo const request_info(session_id_or_json);
5141  SET_REQUEST_ID(request_info.requestId());
5142  auto stdlog = STDLOG(get_session_ptr(request_info.sessionId()));
5143  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5144  auto session_ptr = stdlog.getConstSessionInfo();
5145  // check_read_only("create_link");
5146  auto& cat = session_ptr->getCatalog();
5147 
5148  LinkDescriptor ld;
5149  ld.userId = session_ptr->get_currentUser().userId;
5150  ld.viewState = view_state;
5151  ld.viewMetadata = view_metadata;
5152 
5153  try {
5154  _return = cat.createLink(ld, 6);
5155  } catch (const std::exception& e) {
5156  THROW_DB_EXCEPTION(e.what());
5157  }
5158 }
5159 
5160 TColumnType DBHandler::create_geo_column(const TDatumType::type type,
5161  const std::string& name,
5162  const bool is_array) {
5163  TColumnType ct;
5164  ct.col_name = name;
5165  ct.col_type.type = type;
5166  ct.col_type.is_array = is_array;
5167  return ct;
5168 }
5169 
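// check_geospatial_files: for shapefile imports, all three companion files
// (.shp, .shx, .dbf) must be present next to the named file, in either lower
// or upper case. For example, importing "parcels.shp" also requires
// "parcels.shx"/"parcels.SHX" and "parcels.dbf"/"parcels.DBF" to exist
// (file names are illustrative only).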
5170 void DBHandler::check_geospatial_files(const boost::filesystem::path file_path,
5171  const import_export::CopyParams& copy_params) {
5172  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
5173  if (std::find(shp_ext.begin(),
5174  shp_ext.end(),
5175  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
5176  shp_ext.end()) {
5177  for (auto ext : shp_ext) {
5178  auto aux_file = file_path;
5179  if (!import_export::Importer::gdalFileExists(
5180  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
5181  copy_params) &&
5182  !import_export::Importer::gdalFileExists(
5183  aux_file.replace_extension(ext).string(), copy_params)) {
5184  throw std::runtime_error("required file for shapefile does not exist: " +
5185  aux_file.filename().string());
5186  }
5187  }
5188  }
5189 }
5190 
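// create_table: validates the table and column names, then builds a CREATE
// TABLE statement from the row descriptor and runs it through sql_execute.
// For example, a DECIMAL column passed without precision/scale defaults to
// DECIMAL(14,7), and a replicated table gets
//   WITH (PARTITIONS = 'REPLICATED')
// appended to the generated statement (illustrative only).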
5191 void DBHandler::create_table(const TSessionId& session_id_or_json,
5192  const std::string& table_name,
5193  const TRowDescriptor& rd,
5194  const TCreateParams& create_params) {
5195  heavyai::RequestInfo request_info(session_id_or_json);
5196  SET_REQUEST_ID(request_info.requestId());
5197  auto stdlog = STDLOG("table_name", table_name);
5198  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5199  check_read_only("create_table");
5200 
5201  if (ImportHelpers::is_reserved_name(table_name)) {
5202  THROW_DB_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
5203  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
5204  THROW_DB_EXCEPTION("Invalid characters in table name: " + table_name);
5205  }
5206 
5207  auto rds = rd;
5208 
5209  std::string stmt{"CREATE TABLE " + table_name};
5210  std::vector<std::string> col_stmts;
5211 
5212  for (auto col : rds) {
5213  if (ImportHelpers::is_reserved_name(col.col_name)) {
5214  THROW_DB_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
5215  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
5216  THROW_DB_EXCEPTION("Invalid characters in column name: " + col.col_name);
5217  }
5218  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
5219  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
5220  THROW_DB_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
5221  " for column: " + col.col_name);
5222  }
5223 
5224  if (col.col_type.type == TDatumType::DECIMAL) {
5225  // if no precision or scale was passed in, default to (14, 7)
5226  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
5227  col.col_type.precision = 14;
5228  col.col_type.scale = 7;
5229  }
5230  }
5231 
5232  std::string col_stmt;
5233  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
5234  if (col.__isset.default_value) {
5235  col_stmt.append(" DEFAULT " + col.default_value);
5236  }
5237 
5238  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
5239  // `nullable` argument, leading this to default to false. Uncomment for v2.
5240  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
5241 
5242  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
5243  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
5244  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
5245  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
5246  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT ||
5247  thrift_to_encoding(col.col_type.encoding) == kENCODING_DATE_IN_DAYS) {
5248  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
5249  }
5250  } else if (col.col_type.type == TDatumType::STR) {
5251  // non DICT encoded strings
5252  col_stmt.append(" ENCODING NONE");
5253  } else if (col.col_type.type == TDatumType::POINT ||
5254  col.col_type.type == TDatumType::MULTIPOINT ||
5255  col.col_type.type == TDatumType::LINESTRING ||
5256  col.col_type.type == TDatumType::MULTILINESTRING ||
5257  col.col_type.type == TDatumType::POLYGON ||
5258  col.col_type.type == TDatumType::MULTIPOLYGON) {
5259  // non-encoded compressible geo
5260  if (col.col_type.scale == 4326) {
5261  col_stmt.append(" ENCODING NONE");
5262  }
5263  }
5264  col_stmts.push_back(col_stmt);
5265  }
5266 
5267  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
5268 
5269  if (create_params.is_replicated) {
5270  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
5271  }
5272 
5273  stmt.append(";");
5274 
5275  TQueryResult ret;
5276  request_info.setRequestId(logger::request_id());
5277  sql_execute(ret, request_info.json(), stmt, true, "", -1, -1);
5278 }
5279 
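// import_table: loads data into an existing table. Non-S3 relative paths are
// resolved under the per-session import directory, the delimiter defaults to
// ',' (or '\t' for .tsv files) when unset, and the import runs under a table
// schema lock plus an insert-data write lock. ODBC sources use the SQL select
// statement from the copy parameters instead of a file path.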
5280 void DBHandler::import_table(const TSessionId& session_id_or_json,
5281  const std::string& table_name,
5282  const std::string& file_name_in,
5283  const TCopyParams& cp) {
5284  try {
5285  heavyai::RequestInfo const request_info(session_id_or_json);
5286  SET_REQUEST_ID(request_info.requestId());
5287  auto stdlog =
5288  STDLOG(get_session_ptr(request_info.sessionId()), "table_name", table_name);
5289  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
5290  auto session_ptr = stdlog.getConstSessionInfo();
5291  check_read_only("import_table");
5292  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
5293 
5294  const auto execute_read_lock = legacylockmgr::getExecuteReadLock();
5295  auto& cat = session_ptr->getCatalog();
5296  auto executor = Executor::getExecutor(Executor::UNITARY_EXECUTOR_ID);
5297  auto start_time = ::toString(std::chrono::system_clock::now());
5298  if (g_enable_non_kernel_time_query_interrupt) {
5299  executor->enrollQuerySession(request_info.sessionId(),
5300  "IMPORT_TABLE",
5301  start_time,
5302  Executor::UNITARY_EXECUTOR_ID,
5303  QuerySessionStatus::QueryStatus::RUNNING_IMPORTER);
5304  }
5305 
5306  ScopeGuard clearInterruptStatus = [executor, &request_info, &start_time] {
5307  // reset the runtime query interrupt status
5308  if (g_enable_non_kernel_time_query_interrupt) {
5309  executor->clearQuerySessionStatus(request_info.sessionId(), start_time);
5310  }
5311  };
5312  const auto td_with_lock =
5313  lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>::acquireTableDescriptor(
5314  cat, table_name);
5315  const auto td = td_with_lock();
5316  CHECK(td);
5317  check_table_load_privileges(*session_ptr, table_name);
5318 
5319  std::string copy_from_source;
5320  auto copy_params = thrift_to_copyparams(cp);
5321  if (copy_params.source_type == import_export::SourceType::kOdbc) {
5322  copy_from_source = copy_params.sql_select;
5323  } else {
5324  std::string file_name{file_name_in};
5325  auto file_path = boost::filesystem::path(file_name);
5326  if (!boost::istarts_with(file_name, "s3://")) {
5327  if (!boost::filesystem::path(file_name).is_absolute()) {
5328  file_path = import_path_ /
5329  picosha2::hash256_hex_string(request_info.sessionId()) /
5330  boost::filesystem::path(file_name).filename();
5331  file_name = file_path.string();
5332  }
5333  if (!shared::file_or_glob_path_exists(file_path.string())) {
5334  THROW_DB_EXCEPTION("File or directory \"" + file_path.string() +
5335  "\" does not exist.");
5336  }
5337  }
5339 
5340  // TODO(andrew): add delimiter detection to Importer
5341  if (copy_params.delimiter == '\0') {
5342  copy_params.delimiter = ',';
5343  if (boost::filesystem::path(file_path).extension() == ".tsv") {
5344  copy_params.delimiter = '\t';
5345  }
5346  }
5347  copy_from_source = file_path.string();
5348  }
5349  auto const load_tag = get_import_tag("import_table", table_name, copy_from_source);
5350  log_system_cpu_memory_status("start_" + load_tag, session_ptr->getCatalog());
5351  ScopeGuard cleanup = [&load_tag, &session_ptr]() {
5352  log_system_cpu_memory_status("finish_" + load_tag, session_ptr->getCatalog());
5353  };
5354  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
5355  session_ptr->getCatalog(), table_name);
5356  std::unique_ptr<import_export::AbstractImporter> importer;
5357  importer = import_export::create_importer(cat, td, copy_from_source, copy_params);
5358  auto ms = measure<>::execution([&]() { importer->import(session_ptr.get()); });
5359  LOG(INFO) << "Total Import Time: " << (double)ms / 1000.0 << " Seconds.";
5360  } catch (const TDBException& e) {
5361  throw;
5362  } catch (const std::exception& e) {
5363  THROW_DB_EXCEPTION(std::string(e.what()));
5364  }
5365 }
5366 
5367 namespace {
5368 
5369 // Helper functions for the error checking below.
5370 // These would ideally be methods of TDatumType,
5371 // but that is not possible because the type is auto-generated by Thrift.
5372