OmniSciDB  b24e664e58
MapDHandler.cpp
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: MapDHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "MapDHandler.h"
25 #include "DistributedLoader.h"
26 #include "MapDServer.h"
28 #include "TokenCompletionHints.h"
29 
30 #ifdef HAVE_PROFILER
31 #include <gperftools/heap-profiler.h>
32 #endif // HAVE_PROFILER
33 
34 #include "MapDRelease.h"
35 
36 #include "Calcite/Calcite.h"
37 #include "gen-cpp/CalciteServer.h"
38 
40 
41 #include "Catalog/Catalog.h"
43 #include "Import/Importer.h"
44 #include "LockMgr/TableLockMgr.h"
45 #include "MapDDistributedHandler.h"
46 #include "MapDRenderHandler.h"
47 #include "Parser/ParserWrapper.h"
49 #include "Parser/parser.h"
50 #include "Planner/Planner.h"
53 #include "QueryEngine/Execute.h"
62 #include "Shared/StringTransform.h"
63 #include "Shared/SysInfo.h"
64 #include "Shared/geo_types.h"
65 #include "Shared/geosupport.h"
66 #include "Shared/import_helpers.h"
68 #include "Shared/measure.h"
69 #include "Shared/scope.h"
70 
71 #include <fcntl.h>
72 #include <picosha2.h>
73 #include <sys/time.h>
74 #include <sys/types.h>
75 #include <sys/wait.h>
76 #include <unistd.h>
77 #include <algorithm>
78 #include <boost/algorithm/string.hpp>
79 #include <boost/filesystem.hpp>
80 #include <boost/make_shared.hpp>
81 #include <boost/process/search_path.hpp>
82 #include <boost/program_options.hpp>
83 #include <boost/regex.hpp>
84 #include <boost/tokenizer.hpp>
85 #include <cmath>
86 #include <csignal>
87 #include <fstream>
88 #include <future>
89 #include <map>
90 #include <memory>
91 #include <random>
92 #include <regex>
93 #include <string>
94 #include <thread>
95 #include <typeinfo>
96 
97 #include <arrow/api.h>
98 #include <arrow/io/api.h>
99 #include <arrow/ipc/api.h>
100 
101 #include "QueryEngine/ArrowUtil.h"
102 
103 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
104 
107 using namespace Lock_Namespace;
108 
109 #define INVALID_SESSION_ID ""
110 
111 #define THROW_MAPD_EXCEPTION(errstr) \
112  { \
113  TMapDException ex; \
114  ex.error_msg = errstr; \
115  LOG(ERROR) << ex.error_msg; \
116  throw ex; \
117  }
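// Illustrative usage sketch (not from the original file): THROW_MAPD_EXCEPTION converts an
// error string into the Thrift-generated TMapDException, logs it, and throws it so the Thrift
// server can marshal it back to the client. A handler endpoint typically uses it like this
// ("some_endpoint" is a hypothetical name used only for illustration):
//
//   void MapDHandler::some_endpoint(const TSessionId& session) {
//     if (read_only_) {
//       THROW_MAPD_EXCEPTION("some_endpoint disabled: server running in read-only mode.");
//     }
//   }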
118 
119 namespace {
120 
121 SessionMap::iterator get_session_from_map(const TSessionId& session,
122  SessionMap& session_map) {
123  auto session_it = session_map.find(session);
124  if (session_it == session_map.end()) {
125  THROW_MAPD_EXCEPTION("Session not valid.");
126  }
127  return session_it;
128 }
129 
130 struct ForceDisconnect : public std::runtime_error {
131  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
132 };
133 
134 } // namespace
135 
136 template <>
137 SessionMap::iterator MapDHandler::get_session_it_unsafe(
138  const TSessionId& session,
139  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
140  auto session_it = get_session_from_map(session, sessions_);
141  try {
142  check_session_exp_unsafe(session_it);
143  } catch (const ForceDisconnect& e) {
144  read_lock.unlock();
145  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
146  auto session_it2 = get_session_from_map(session, sessions_);
147  disconnect_impl(session_it2);
148  THROW_MAPD_EXCEPTION(e.what());
149  }
150  return session_it;
151 }
152 
153 template <>
154 SessionMap::iterator MapDHandler::get_session_it_unsafe(
155  const TSessionId& session,
156  mapd_lock_guard<mapd_shared_mutex>& write_lock) {
157  auto session_it = get_session_from_map(session, sessions_);
158  try {
159  check_session_exp_unsafe(session_it);
160  } catch (const ForceDisconnect& e) {
161  disconnect_impl(session_it);
162  THROW_MAPD_EXCEPTION(e.what());
163  }
164  return session_it;
165 }
166 
167 MapDHandler::MapDHandler(const std::vector<LeafHostInfo>& db_leaves,
168  const std::vector<LeafHostInfo>& string_leaves,
169  const std::string& base_data_path,
170  const bool cpu_only,
171  const bool allow_multifrag,
172  const bool jit_debug,
173  const bool intel_jit_profile,
174  const bool read_only,
175  const bool allow_loop_joins,
176  const bool enable_rendering,
177  const bool enable_auto_clear_render_mem,
178  const int render_oom_retry_threshold,
179  const size_t render_mem_bytes,
180  const int num_gpus,
181  const int start_gpu,
182  const size_t reserved_gpu_mem,
183  const size_t num_reader_threads,
184  const AuthMetadata& authMetadata,
185  const MapDParameters& mapd_parameters,
186  const bool legacy_syntax,
187  const int idle_session_duration,
188  const int max_session_duration,
189  const bool enable_runtime_udf_registration,
190  const std::string& udf_filename)
191  : leaf_aggregator_(db_leaves)
192  , string_leaves_(string_leaves)
193  , base_data_path_(base_data_path)
194  , random_gen_(std::random_device{}())
195  , session_id_dist_(0, INT32_MAX)
196  , jit_debug_(jit_debug)
197  , intel_jit_profile_(intel_jit_profile)
198  , allow_multifrag_(allow_multifrag)
199  , read_only_(read_only)
200  , allow_loop_joins_(allow_loop_joins)
201  , authMetadata_(authMetadata)
202  , mapd_parameters_(mapd_parameters)
203  , legacy_syntax_(legacy_syntax)
204  , super_user_rights_(false)
205  , idle_session_duration_(idle_session_duration * 60)
206  , max_session_duration_(max_session_duration * 60)
207  , runtime_udf_registration_enabled_(enable_runtime_udf_registration)
208  , _was_geo_copy_from(false) {
209  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
210  bool is_rendering_enabled = enable_rendering;
211 
212  try {
213  if (cpu_only) {
214  is_rendering_enabled = false;
215  executor_device_type_ = ExecutorDeviceType::CPU;
216  cpu_mode_only_ = true;
217  } else {
218 #ifdef HAVE_CUDA
219  executor_device_type_ = ExecutorDeviceType::GPU;
220  cpu_mode_only_ = false;
221 #else
222  executor_device_type_ = ExecutorDeviceType::CPU;
223  LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
224  cpu_mode_only_ = true;
225  is_rendering_enabled = false;
226 #endif
227  }
228  } catch (const std::exception& e) {
229  LOG(FATAL) << "Failed to set executor device type: " << e.what();
230  }
231 
232  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
233  // Calculate the total amount of memory we need to reserve from each GPU that the
234  // Buffer manager cannot ask for.
235  size_t total_reserved = reserved_gpu_mem;
236  if (is_rendering_enabled) {
237  total_reserved += render_mem_bytes;
238  }
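 // Worked example (figures are assumptions for illustration): with reserved_gpu_mem of
 // 384 MB and render_mem_bytes of 1024 MB, total_reserved = 384 MB + 1024 MB = 1408 MB
 // is withheld from every GPU before the DataMgr sizes its buffer pools.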
239 
240  try {
241  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
242  mapd_parameters,
243  !cpu_mode_only_,
244  num_gpus,
245  start_gpu,
246  total_reserved,
247  num_reader_threads));
248  } catch (const std::exception& e) {
249  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
250  }
251 
252  std::string udf_ast_filename("");
253 
254  try {
255  if (!udf_filename.empty()) {
256  UdfCompiler compiler(udf_filename);
257  int compile_result = compiler.compileUdf();
258 
259  if (compile_result == 0) {
260  udf_ast_filename = compiler.getAstFileName();
261  }
262  }
263  } catch (const std::exception& e) {
264  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
265  }
266 
267  try {
268  calcite_ =
269  std::make_shared<Calcite>(mapd_parameters, base_data_path_, udf_ast_filename);
270  } catch (const std::exception& e) {
271  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
272  }
273 
274  try {
275  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
276  if (!udf_filename.empty()) {
277  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
278  }
279  } catch (const std::exception& e) {
280  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
281  }
282 
283  try {
284  table_functions::TableFunctionsFactory::init();
285  } catch (const std::exception& e) {
286  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
287  }
288 
289  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
290  executor_device_type_ = ExecutorDeviceType::CPU;
291  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
292  cpu_mode_only_ = true;
293  }
294 
295  switch (executor_device_type_) {
296  case ExecutorDeviceType::GPU:
297  LOG(INFO) << "Started in GPU mode" << std::endl;
298  break;
299  case ExecutorDeviceType::CPU:
300  LOG(INFO) << "Started in CPU mode" << std::endl;
301  break;
302  }
303 
304  try {
305  SysCatalog::instance().init(base_data_path_,
306  data_mgr_,
307  authMetadata,
308  calcite_,
309  false,
310  !db_leaves.empty(),
311  string_leaves_);
312  } catch (const std::exception& e) {
313  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
314  }
315 
316  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
317  start_time_ = std::time(nullptr);
318 
319  if (is_rendering_enabled) {
320  try {
321  render_handler_.reset(new MapDRenderHandler(
322  this, render_mem_bytes, num_gpus, start_gpu, mapd_parameters_));
323  } catch (const std::exception& e) {
324  LOG(ERROR) << "Backend rendering disabled: " << e.what();
325  }
326  }
327 
328  if (leaf_aggregator_.leafCount() > 0) {
329  try {
330  agg_handler_.reset(new MapDAggHandler(this));
331  } catch (const std::exception& e) {
332  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
333  }
334  } else if (g_cluster) {
335  try {
336  leaf_handler_.reset(new MapDLeafHandler(this));
337  } catch (const std::exception& e) {
338  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
339  }
340  }
341 }
342 
343 MapDHandler::~MapDHandler() {}
344 
345 void MapDHandler::check_read_only(const std::string& str) {
346  if (read_only_) {
347  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
348  }
349 }
350 
351 std::string MapDHandler::createInMemoryCalciteSession(
352  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
353  // Create an in-memory session for Calcite with super user privileges. It is used
354  // to fetch all table metadata when a user runs a query. The session is created
355  // under the name of a proxy user/password and persists only for the server's
356  // lifetime or the execution of the (in-memory) Calcite query, whichever ends
357  // first.
358  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
359  std::string session_id;
360  do {
361  session_id = generate_random_string(64);
362  } while (sessions_.find(session_id) != sessions_.end());
363  Catalog_Namespace::UserMetadata user_meta(-1,
364  calcite_->getInternalSessionProxyUserName(),
365  calcite_->getInternalSessionProxyPassword(),
366  true,
367  -1,
368  true);
369  const auto emplace_ret =
370  sessions_.emplace(session_id,
371  std::make_shared<Catalog_Namespace::SessionInfo>(
372  catalog_ptr, user_meta, executor_device_type_, session_id));
373  CHECK(emplace_ret.second);
374  return session_id;
375 }
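// Lifecycle sketch (an assumption for illustration, not from the original file): code that
// needs Calcite to see catalog metadata without a real user session creates a proxy session,
// issues the Calcite request, and then removes the session again:
//
//   const auto calcite_session_id = createInMemoryCalciteSession(catalog_ptr);
//   // ... pass calcite_session_id to calcite_ for the metadata lookup ...
//   removeInMemoryCalciteSession(calcite_session_id);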
376 
377 bool MapDHandler::isInMemoryCalciteSession(
378  const Catalog_Namespace::UserMetadata user_meta) {
379  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
380  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
381  user_meta.isSuper.load();
382 }
383 
384 void MapDHandler::removeInMemoryCalciteSession(const std::string& session_id) {
385  // Remove InMemory calcite Session.
386  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
387  const auto it = sessions_.find(session_id);
388  CHECK(it != sessions_.end());
389  sessions_.erase(it);
390 }
391 // internal connection for connections with no password
392 void MapDHandler::internal_connect(TSessionId& session,
393  const std::string& username,
394  const std::string& dbname) {
395  auto stdlog = STDLOG(); // session_info set by connect_impl()
396  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
397  std::string username2 = username; // login() may reset username given as argument
398  std::string dbname2 = dbname; // login() may reset dbname given as argument
399  Catalog_Namespace::UserMetadata user_meta;
400  std::shared_ptr<Catalog> cat = nullptr;
401  try {
402  cat = SysCatalog::instance().login(
403  dbname2, username2, std::string(""), user_meta, false);
404  } catch (std::exception& e) {
405  THROW_MAPD_EXCEPTION(e.what());
406  }
407 
408  DBObject dbObject(dbname2, DatabaseDBObjectType);
409  dbObject.loadKey(*cat);
411  std::vector<DBObject> dbObjects;
412  dbObjects.push_back(dbObject);
413  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
414  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
415  " is not allowed to access database " + dbname2 + ".");
416  }
417  connect_impl(session, std::string(""), dbname2, user_meta, cat, stdlog);
418 }
419 
420 void MapDHandler::krb5_connect(TKrb5Session& session,
421  const std::string& inputToken,
422  const std::string& dbname) {
423  THROW_MAPD_EXCEPTION("Unauthorized Access. Kerberos login not supported");
424 }
425 
426 void MapDHandler::connect(TSessionId& session,
427  const std::string& username,
428  const std::string& passwd,
429  const std::string& dbname) {
430  auto stdlog = STDLOG(); // session_info set by connect_impl()
431  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
432  std::string username2 = username; // login() may reset username given as argument
433  std::string dbname2 = dbname; // login() may reset dbname given as argument
434  Catalog_Namespace::UserMetadata user_meta;
435  std::shared_ptr<Catalog> cat = nullptr;
436  try {
437  cat = SysCatalog::instance().login(
438  dbname2, username2, passwd, user_meta, !super_user_rights_);
439  } catch (std::exception& e) {
440  THROW_MAPD_EXCEPTION(e.what());
441  }
442 
443  DBObject dbObject(dbname2, DatabaseDBObjectType);
444  dbObject.loadKey(*cat);
446  std::vector<DBObject> dbObjects;
447  dbObjects.push_back(dbObject);
448  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
449  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
450  " is not allowed to access database " + dbname2 + ".");
451  }
452  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
453  // if pki auth session will come back encrypted with user pubkey
454  SysCatalog::instance().check_for_session_encryption(passwd, session);
455 }
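// Client-side sketch (credentials and database name are placeholders, not values from this
// file): a Thrift client authenticates, runs queries against the returned session id, and
// disconnects when done:
//
//   TSessionId session;
//   handler.connect(session, "jdoe", "secret", "omnisci");
//   // ... issue sql_execute() calls with `session` ...
//   handler.disconnect(session);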
456 
457 void MapDHandler::connect_impl(TSessionId& session,
458  const std::string& passwd,
459  const std::string& dbname,
460  const Catalog_Namespace::UserMetadata& user_meta,
461  std::shared_ptr<Catalog> cat,
462  query_state::StdLog& stdlog) {
463  do {
464  session = generate_random_string(32);
465  } while (sessions_.find(session) != sessions_.end());
466  std::pair<SessionMap::iterator, bool> emplace_retval =
467  sessions_.emplace(session,
468  std::make_shared<Catalog_Namespace::SessionInfo>(
469  cat, user_meta, executor_device_type_, session));
470  CHECK(emplace_retval.second);
471  auto& session_ptr = emplace_retval.first->second;
472  stdlog.setSessionInfo(session_ptr);
473  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time while
474  // doing warmup
475  if (leaf_aggregator_.leafCount() > 0) {
476  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
477  return;
478  }
479  }
480  auto const roles = session_ptr->get_currentUser().isSuper
481  ? std::vector<std::string>{{"super"}}
482  : SysCatalog::instance().getRoles(
483  false, false, session_ptr->get_currentUser().userName);
484  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
485  LOG(INFO) << "User " << user_meta.userName << " connected to database " << dbname
486  << " with public_session_id " << session_ptr->get_public_session_id();
487 }
488 
489 void MapDHandler::disconnect(const TSessionId& session) {
490  auto stdlog = STDLOG();
491  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
492  auto session_it = get_session_it_unsafe(session, write_lock);
493  stdlog.setSessionInfo(session_it->second);
494  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
495  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
496  << " disconnected from database " << dbname << std::endl;
497  disconnect_impl(session_it);
498 }
499 
500 void MapDHandler::disconnect_impl(const SessionMap::iterator& session_it) {
501  // session_it existence should already have been checked (i.e. called via
502  // get_session_it_unsafe(...))
503  const auto session_id = session_it->second->get_session_id();
504  if (leaf_aggregator_.leafCount() > 0) {
505  leaf_aggregator_.disconnect(session_id);
506  }
507  if (render_handler_) {
508  render_handler_->disconnect(session_id);
509  }
510  sessions_.erase(session_it);
511 }
512 
513 void MapDHandler::switch_database(const TSessionId& session, const std::string& dbname) {
514  auto stdlog = STDLOG(get_session_ptr(session));
515  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
516  auto session_it = get_session_it_unsafe(session, write_lock);
517 
518  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
519 
520  try {
521  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
522  dbname2, session_it->second->get_currentUser().userName);
523  session_it->second->set_catalog_ptr(cat);
524  } catch (std::exception& e) {
525  THROW_MAPD_EXCEPTION(e.what());
526  }
527 }
528 
529 void MapDHandler::interrupt(const TSessionId& session) {
530  auto stdlog = STDLOG(get_session_ptr(session));
531  if (g_enable_dynamic_watchdog) {
532  // Shared lock to allow simultaneous interrupts of multiple sessions
533  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
534  if (leaf_aggregator_.leafCount() > 0) {
535  leaf_aggregator_.interrupt(session);
536  }
537 
538  auto session_it = get_session_it_unsafe(session, read_lock);
539  auto& cat = session_it->second.get()->getCatalog();
540  const auto dbname = cat.getCurrentDB().dbName;
541  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
542  jit_debug_ ? "/tmp" : "",
543  jit_debug_ ? "mapdquery" : "",
544  mapd_parameters_,
545  nullptr);
546  CHECK(executor);
547 
548  VLOG(1) << "Received interrupt: "
549  << "Session " << *session_it->second << ", Executor " << executor
550  << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
551  << session_it->second->get_currentUser().userName << ", Database " << dbname
552  << std::endl;
553 
554  executor->interrupt();
555 
556  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
557  << " interrupted session with database " << dbname << std::endl;
558  }
559 }
560 
561 void MapDHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
562  auto stdlog = STDLOG(get_session_ptr(session));
563  const auto rendering_enabled = bool(render_handler_);
564  _return.read_only = read_only_;
565  _return.version = MAPD_RELEASE;
566  _return.rendering_enabled = rendering_enabled;
567  _return.poly_rendering_enabled = rendering_enabled;
568  _return.start_time = start_time_;
569  _return.edition = MAPD_EDITION;
570  _return.host_name = get_hostname();
571 }
572 
573 void MapDHandler::get_status(std::vector<TServerStatus>& _return,
574  const TSessionId& session) {
575  auto stdlog = STDLOG(get_session_ptr(session));
576  const auto rendering_enabled = bool(render_handler_);
577  TServerStatus ret;
578  ret.read_only = read_only_;
579  ret.version = MAPD_RELEASE;
580  ret.rendering_enabled = rendering_enabled;
581  ret.poly_rendering_enabled = rendering_enabled;
582  ret.start_time = start_time_;
583  ret.edition = MAPD_EDITION;
584  ret.host_name = get_hostname();
585 
586  // TServicePort tcp_port{}
587 
588  if (g_cluster) {
589  ret.role =
590  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
591  } else {
592  ret.role = TRole::type::SERVER;
593  }
594 
595  _return.push_back(ret);
596  if (leaf_aggregator_.leafCount() > 0) {
597  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
598  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
599  }
600 }
601 
602 void MapDHandler::get_hardware_info(TClusterHardwareInfo& _return,
603  const TSessionId& session) {
604  auto stdlog = STDLOG(get_session_ptr(session));
605  THardwareInfo ret;
606  const auto cuda_mgr = data_mgr_->getCudaMgr();
607  if (cuda_mgr) {
608  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
609  ret.start_gpu = cuda_mgr->getStartGpu();
610  if (ret.start_gpu >= 0) {
611  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
612  // ^ This will break as soon as we allow non-contiguous GPU allocations to MapD
613  }
614  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
615  TGpuSpecification gpu_spec;
616  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
617  gpu_spec.num_sm = deviceProperties->numMPs;
618  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
619  gpu_spec.memory = deviceProperties->globalMem;
620  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
621  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
622  ret.gpu_info.push_back(gpu_spec);
623  }
624  }
625 
626  // start hardware/OS dependent code
627  ret.num_cpu_hw = std::thread::hardware_concurrency();
628  // ^ This might return different results in the case of hyperthreading
629  // end hardware/OS dependent code
630 
631  _return.hardware_info.push_back(ret);
632  if (leaf_aggregator_.leafCount() > 0) {
633  ret.host_name = "aggregator";
634  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
635  _return.hardware_info.insert(_return.hardware_info.end(),
636  leaf_hardware.hardware_info.begin(),
637  leaf_hardware.hardware_info.end());
638  }
639 }
640 
641 void MapDHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
642  auto session_ptr = get_session_ptr(session);
643  auto stdlog = STDLOG(session_ptr);
644  auto user_metadata = session_ptr->get_currentUser();
645 
646  _return.user = user_metadata.userName;
647  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
648  _return.start_time = session_ptr->get_start_time();
649  _return.is_super = user_metadata.isSuper;
650 }
651 
652 void MapDHandler::value_to_thrift_column(const TargetValue& tv,
653  const SQLTypeInfo& ti,
654  TColumn& column) {
655  if (ti.is_array()) {
656  TColumn tColumn;
657  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
658  CHECK(array_tv);
659  bool is_null = !array_tv->is_initialized();
660  if (!is_null) {
661  const auto& vec = array_tv->get();
662  for (const auto& elem_tv : vec) {
663  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
664  }
665  }
666  column.data.arr_col.push_back(tColumn);
667  column.nulls.push_back(is_null && !ti.get_notnull());
668  } else if (ti.is_geometry()) {
669  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
670  if (scalar_tv) {
671  auto s_n = boost::get<NullableString>(scalar_tv);
672  auto s = boost::get<std::string>(s_n);
673  if (s) {
674  column.data.str_col.push_back(*s);
675  } else {
676  column.data.str_col.emplace_back(""); // null string
677  auto null_p = boost::get<void*>(s_n);
678  CHECK(null_p && !*null_p);
679  }
680  column.nulls.push_back(!s && !ti.get_notnull());
681  } else {
682  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
683  CHECK(array_tv);
684  bool is_null = !array_tv->is_initialized();
685  if (!is_null) {
686  auto elem_type = SQLTypeInfo(kDOUBLE, false);
687  TColumn tColumn;
688  const auto& vec = array_tv->get();
689  for (const auto& elem_tv : vec) {
690  value_to_thrift_column(elem_tv, elem_type, tColumn);
691  }
692  column.data.arr_col.push_back(tColumn);
693  column.nulls.push_back(false);
694  } else {
695  TColumn tColumn;
696  column.data.arr_col.push_back(tColumn);
697  column.nulls.push_back(is_null && !ti.get_notnull());
698  }
699  }
700  } else {
701  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
702  CHECK(scalar_tv);
703  if (boost::get<int64_t>(scalar_tv)) {
704  int64_t data = *(boost::get<int64_t>(scalar_tv));
705 
706  if (is_member_of_typeset<kNUMERIC, kDECIMAL>(ti)) {
707  double val = static_cast<double>(data);
708  if (ti.get_scale() > 0) {
709  val /= pow(10.0, std::abs(ti.get_scale()));
710  }
711  column.data.real_col.push_back(val);
712  } else {
713  column.data.int_col.push_back(data);
714  }
715 
716  switch (ti.get_type()) {
717  case kBOOLEAN:
718  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
719  break;
720  case kTINYINT:
721  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
722  break;
723  case kSMALLINT:
724  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
725  break;
726  case kINT:
727  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
728  break;
729  case kNUMERIC:
730  case kDECIMAL:
731  case kBIGINT:
732  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
733  break;
734  case kTIME:
735  case kTIMESTAMP:
736  case kDATE:
737  case kINTERVAL_DAY_TIME:
738  case kINTERVAL_YEAR_MONTH:
739  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
740  break;
741  default:
742  column.nulls.push_back(false);
743  }
744  } else if (boost::get<double>(scalar_tv)) {
745  double data = *(boost::get<double>(scalar_tv));
746  column.data.real_col.push_back(data);
747  if (ti.get_type() == kFLOAT) {
748  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
749  } else {
750  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
751  }
752  } else if (boost::get<float>(scalar_tv)) {
753  CHECK_EQ(kFLOAT, ti.get_type());
754  float data = *(boost::get<float>(scalar_tv));
755  column.data.real_col.push_back(data);
756  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
757  } else if (boost::get<NullableString>(scalar_tv)) {
758  auto s_n = boost::get<NullableString>(scalar_tv);
759  auto s = boost::get<std::string>(s_n);
760  if (s) {
761  column.data.str_col.push_back(*s);
762  } else {
763  column.data.str_col.emplace_back(""); // null string
764  auto null_p = boost::get<void*>(s_n);
765  CHECK(null_p && !*null_p);
766  }
767  column.nulls.push_back(!s && !ti.get_notnull());
768  } else {
769  CHECK(false);
770  }
771  }
772 }
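// Worked example (illustrative): a DECIMAL(10, 2) target value reaches value_to_thrift_column
// above as the scaled integer 12345; because the scale is positive it is converted to
// 12345 / 10^2 = 123.45 and appended to real_col, whereas plain integer types are appended
// to int_col unchanged and only checked against their sentinel NULL values.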
773 
774 TDatum MapDHandler::value_to_thrift(const TargetValue& tv, const SQLTypeInfo& ti) {
775  TDatum datum;
776  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
777  if (!scalar_tv) {
778  CHECK(ti.is_array());
779  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
780  CHECK(array_tv);
781  if (array_tv->is_initialized()) {
782  const auto& vec = array_tv->get();
783  for (const auto& elem_tv : vec) {
784  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
785  datum.val.arr_val.push_back(scalar_col_val);
786  }
787  // Datum is not null, at worst it's an empty array Datum
788  datum.is_null = false;
789  } else {
790  datum.is_null = true;
791  }
792  return datum;
793  }
794  if (boost::get<int64_t>(scalar_tv)) {
795  int64_t data = *(boost::get<int64_t>(scalar_tv));
796 
797  if (is_member_of_typeset<kNUMERIC, kDECIMAL>(ti)) {
798  double val = static_cast<double>(data);
799  if (ti.get_scale() > 0) {
800  val /= pow(10.0, std::abs(ti.get_scale()));
801  }
802  datum.val.real_val = val;
803  } else {
804  datum.val.int_val = data;
805  }
806 
807  switch (ti.get_type()) {
808  case kBOOLEAN:
809  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
810  break;
811  case kTINYINT:
812  datum.is_null = (datum.val.int_val == NULL_TINYINT);
813  break;
814  case kSMALLINT:
815  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
816  break;
817  case kINT:
818  datum.is_null = (datum.val.int_val == NULL_INT);
819  break;
820  case kDECIMAL:
821  case kNUMERIC:
822  case kBIGINT:
823  datum.is_null = (datum.val.int_val == NULL_BIGINT);
824  break;
825  case kTIME:
826  case kTIMESTAMP:
827  case kDATE:
828  case kINTERVAL_DAY_TIME:
829  case kINTERVAL_YEAR_MONTH:
830  datum.is_null = (datum.val.int_val == NULL_BIGINT);
831  break;
832  default:
833  datum.is_null = false;
834  }
835  } else if (boost::get<double>(scalar_tv)) {
836  datum.val.real_val = *(boost::get<double>(scalar_tv));
837  if (ti.get_type() == kFLOAT) {
838  datum.is_null = (datum.val.real_val == NULL_FLOAT);
839  } else {
840  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
841  }
842  } else if (boost::get<float>(scalar_tv)) {
843  CHECK_EQ(kFLOAT, ti.get_type());
844  datum.val.real_val = *(boost::get<float>(scalar_tv));
845  datum.is_null = (datum.val.real_val == NULL_FLOAT);
846  } else if (boost::get<NullableString>(scalar_tv)) {
847  auto s_n = boost::get<NullableString>(scalar_tv);
848  auto s = boost::get<std::string>(s_n);
849  if (s) {
850  datum.val.str_val = *s;
851  } else {
852  auto null_p = boost::get<void*>(s_n);
853  CHECK(null_p && !*null_p);
854  }
855  datum.is_null = !s;
856  } else {
857  CHECK(false);
858  }
859  return datum;
860 }
861 
862 void MapDHandler::sql_execute(TQueryResult& _return,
863  const TSessionId& session,
864  const std::string& query_str,
865  const bool column_format,
866  const std::string& nonce,
867  const int32_t first_n,
868  const int32_t at_most_n) {
869  auto session_ptr = get_session_ptr(session);
870  auto query_state = create_query_state(session_ptr, query_str);
871  auto stdlog = STDLOG(query_state);
872 
873  ScopeGuard reset_was_geo_copy_from = [&] { _was_geo_copy_from = false; };
874 
875  if (first_n >= 0 && at_most_n >= 0) {
876  THROW_MAPD_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
877  }
878 
879  if (leaf_aggregator_.leafCount() > 0) {
880  if (!agg_handler_) {
881  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
882  }
883  _return.total_time_ms = measure<>::execution([&]() {
884  try {
885  agg_handler_->cluster_execute(_return,
886  query_state->createQueryStateProxy(),
887  query_state->getQueryStr(),
888  column_format,
889  nonce,
890  first_n,
891  at_most_n,
893  } catch (std::exception& e) {
894  const auto mapd_exception = dynamic_cast<const TMapDException*>(&e);
895  const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
896  THROW_MAPD_EXCEPTION(
897  thrift_exception ? std::string(thrift_exception->what())
898  : mapd_exception ? mapd_exception->error_msg
899  : std::string("Exception: ") + e.what());
900  }
901  _return.nonce = nonce;
902  });
903  } else {
904  _return.total_time_ms = measure<>::execution([&]() {
905  MapDHandler::sql_execute_impl(_return,
906  query_state->createQueryStateProxy(),
907  column_format,
908  nonce,
909  session_ptr->get_executor_device_type(),
910  first_n,
911  at_most_n);
912  });
913  }
914 
915  // if the SQL statement we just executed was a geo COPY FROM, the import
916  // parameters were captured, and this flag set, so we do the actual import here
917  if (_was_geo_copy_from) {
918  // import_geo_table() calls create_table() which calls this function to
919  // do the work, so reset the flag now to avoid executing this part a
920  // second time at the end of that, which would fail as the table was
921  // already created! Also reset the flag with a ScopeGuard on exiting
922  // this function any other way, such as an exception from the code above!
923  _was_geo_copy_from = false;
924 
925  // create table as replicated?
926  TCreateParams create_params;
927  if (_geo_copy_from_partitions == "REPLICATED") {
928  create_params.is_replicated = true;
929  }
930 
931  // now do (and time) the import
932  _return.total_time_ms = measure<>::execution([&]() {
933  import_geo_table(session,
937  TRowDescriptor(),
938  create_params);
939  });
940  }
941  stdlog.appendNameValuePairs("execution_time_ms",
942  _return.execution_time_ms,
943  "total_time_ms", // BE-3420 - Redundant with duration field
944  stdlog.duration<std::chrono::milliseconds>());
945 }
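// Client-side sketch (an assumption for illustration; the table name is hypothetical):
//
//   TQueryResult result;
//   handler.sql_execute(result, session, "SELECT COUNT(*) FROM flights;",
//                       /*column_format=*/true, /*nonce=*/"", /*first_n=*/-1,
//                       /*at_most_n=*/-1);
//
// Passing -1 for both first_n and at_most_n leaves the row limit unset, which is why the
// guard above only rejects the case where both values are non-negative.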
946 
947 void MapDHandler::sql_execute_df(TDataFrame& _return,
948  const TSessionId& session,
949  const std::string& query_str,
950  const TDeviceType::type device_type,
951  const int32_t device_id,
952  const int32_t first_n) {
953  auto session_ptr = get_session_ptr(session);
954  auto query_state = create_query_state(session_ptr, query_str);
955  auto stdlog = STDLOG(session_ptr, query_state);
956 
957  if (device_type == TDeviceType::GPU) {
958  const auto executor_device_type = session_ptr->get_executor_device_type();
959  if (executor_device_type != ExecutorDeviceType::GPU) {
960  THROW_MAPD_EXCEPTION(
961  std::string("Exception: GPU mode is not allowed in this session"));
962  }
963  if (!data_mgr_->gpusPresent()) {
964  THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
965  }
966  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
967  THROW_MAPD_EXCEPTION(
968  std::string("Exception: invalid device_id or unavailable GPU with this ID"));
969  }
970  }
971  _return.execution_time_ms = 0;
972 
973  SQLParser parser;
974  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
975  std::string last_parsed;
976  try {
977  ParserWrapper pw{query_str};
978  if (!pw.is_ddl && !pw.is_update_dml &&
979  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
980  std::string query_ra;
981  TableMap table_map;
982  OptionalTableMap tableNames(table_map);
983  _return.execution_time_ms += measure<>::execution([&]() {
984  query_ra = parse_to_ra(query_state->createQueryStateProxy(),
985  query_str,
986  {},
987  tableNames,
989  });
990 
991  // COPY_TO/SELECT: get read ExecutorOuterLock >> TableReadLock for each table (note
992  // for update/delete this would be a table write lock)
993  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
995  std::vector<Lock_Namespace::TableLock> table_locks;
997  session_ptr->getCatalog(), tableNames.value(), table_locks);
998 
999  if (pw.isCalciteExplain()) {
1000  throw std::runtime_error("explain is not supported by current thrift API");
1001  }
1002  execute_rel_alg_df(_return,
1003  query_ra,
1004  *session_ptr,
1005  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
1006  : ExecutorDeviceType::GPU,
1007  static_cast<size_t>(device_id),
1008  first_n);
1009  if (!_return.sm_size) {
1010  throw std::runtime_error("schema is missing in returned result");
1011  }
1012  return;
1013  }
1014  LOG(INFO) << "passing query to legacy processor";
1015  } catch (std::exception& e) {
1016  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1017  }
1019  "Exception: DDL or update DML are not unsupported by current thrift API");
1020 }
1021 
1022 void MapDHandler::sql_execute_gdf(TDataFrame& _return,
1023  const TSessionId& session,
1024  const std::string& query_str,
1025  const int32_t device_id,
1026  const int32_t first_n) {
1027  auto stdlog = STDLOG(get_session_ptr(session));
1028  sql_execute_df(_return, session, query_str, TDeviceType::GPU, device_id, first_n);
1029 }
1030 
1031 // For now we have only one user of a data frame in all cases.
1032 void MapDHandler::deallocate_df(const TSessionId& session,
1033  const TDataFrame& df,
1034  const TDeviceType::type device_type,
1035  const int32_t device_id) {
1036  auto stdlog = STDLOG(get_session_ptr(session));
1037  int8_t* dev_ptr{0};
1038  if (device_type == TDeviceType::GPU) {
1039  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1040  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1041  TMapDException ex;
1042  ex.error_msg = std::string(
1043  "Exception: current data frame handle is not tracked or has been inserted twice");
1044  LOG(ERROR) << ex.error_msg;
1045  throw ex;
1046  }
1047  dev_ptr = ipc_handle_to_dev_ptr_[df.df_handle];
1048  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1049  }
1050  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1051  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1052  ArrowResult result{sm_handle, df.sm_size, df_handle, df.df_size, dev_ptr};
1054  result,
1055  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1056  device_id,
1057  data_mgr_);
1058 }
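// Lifecycle sketch (an assumption for illustration): a client that fetches an Arrow data frame
// is expected to release it once the shared-memory / IPC buffers have been consumed:
//
//   TDataFrame df;
//   handler.sql_execute_df(df, session, query_str, TDeviceType::CPU, /*device_id=*/0,
//                          /*first_n=*/-1);
//   // ... map df.sm_handle / df.df_handle via Arrow IPC on the client ...
//   handler.deallocate_df(session, df, TDeviceType::CPU, /*device_id=*/0);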
1059 
1060 std::string MapDHandler::apply_copy_to_shim(const std::string& query_str) {
1061  auto result = query_str;
1062  {
1063  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
1064  // boost::regex::icase};
1065  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
1066  boost::regex::extended | boost::regex::icase};
1067  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
1068  result.replace(
1069  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
1070  });
1071  }
1072  return result;
1073 }
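// Worked example (illustrative): the shim wraps the inner query in #~# markers, which appears
// intended to let the legacy parser capture the parenthesized SELECT verbatim instead of
// re-parsing it:
//
//   COPY (SELECT a, b FROM t) TO '/tmp/out.csv';
//   becomes
//   COPY (#~#SELECT a, b FROM t#~#) TO '/tmp/out.csv';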
1074 
1075 void MapDHandler::sql_validate(TTableDescriptor& _return,
1076  const TSessionId& session,
1077  const std::string& query_str) {
1078  auto stdlog = STDLOG(get_session_ptr(session));
1079  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1080  stdlog.setQueryState(query_state);
1081 
1082  ParserWrapper pw{query_str};
1083  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1084  pw.is_update_dml) {
1085  THROW_MAPD_EXCEPTION("Can only validate SELECT statements.");
1086  }
1087  MapDHandler::validate_rel_alg(_return, query_state->createQueryStateProxy());
1088 }
1089 
1090 namespace {
1091 
1092 struct ProjectionTokensForCompletion {
1093  std::unordered_set<std::string> uc_column_names;
1094  std::unordered_set<std::string> uc_column_table_qualifiers;
1095 };
1096 
1097 // Extract what looks like a (qualified) identifier from the partial query.
1098 // The results will be used to rank the auto-completion results: tables which
1099 // contain at least one of the identifiers first.
1100 ProjectionTokensForCompletion extract_projection_tokens_for_completion(
1101  const std::string& sql) {
1102  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1103  boost::regex::extended | boost::regex::icase};
1104  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1105  boost::sregex_token_iterator end;
1106  std::unordered_set<std::string> uc_column_names;
1107  std::unordered_set<std::string> uc_column_table_qualifiers;
1108  for (; tok_it != end; ++tok_it) {
1109  std::string column_name = *tok_it;
1110  std::vector<std::string> column_tokens;
1111  boost::split(column_tokens, column_name, boost::is_any_of("."));
1112  if (column_tokens.size() == 2) {
1113  // If the column name is qualified, take user's word.
1114  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1115  } else {
1116  uc_column_names.insert(to_upper(column_name));
1117  }
1118  }
1119  return {uc_column_names, uc_column_table_qualifiers};
1120 }
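// Worked example (illustrative): for the partial query "SELECT t.x, y FRO" the regex yields
// the tokens {"SELECT", "t.x", "y", "FRO"}; "t.x" is qualified, so the result is
// uc_column_table_qualifiers = {"T"} and uc_column_names = {"SELECT", "Y", "FRO"}. The noise
// tokens are harmless because the sets are only used to rank table hints, not to filter them.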
1121 
1122 } // namespace
1123 
1124 void MapDHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1125  const TSessionId& session,
1126  const std::string& sql,
1127  const int cursor) {
1128  auto stdlog = STDLOG(get_session_ptr(session));
1129  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1130  get_completion_hints_unsorted(hints, visible_tables, session, sql, cursor);
1131  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1132  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1133  proj_tokens.uc_column_names, visible_tables, session);
1134  // Add the table qualifiers explicitly specified by the user.
1135  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1136  proj_tokens.uc_column_table_qualifiers.end());
1137  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1138  std::sort(
1139  hints.begin(),
1140  hints.end(),
1141  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1142  if (lhs.type == TCompletionHintType::TABLE &&
1143  rhs.type == TCompletionHintType::TABLE) {
1144  // Between two tables, one which is compatible with the specified projections
1145  // and one which isn't, pick the one which is compatible.
1146  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1147  compatible_table_names.end() &&
1148  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1149  compatible_table_names.end()) {
1150  return true;
1151  }
1152  }
1153  return lhs.type < rhs.type;
1154  });
1155 }
1156 
1157 void MapDHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1158  std::vector<std::string>& visible_tables,
1159  const TSessionId& session,
1160  const std::string& sql,
1161  const int cursor) {
1162  const auto session_info = get_session_copy(session);
1163  try {
1164  get_tables(visible_tables, session);
1165  // Filter out keywords suggested by Calcite which we don't support.
1167  hints = just_whitelisted_keyword_hints(calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1168  } catch (const std::exception& e) {
1169  TMapDException ex;
1170  ex.error_msg = "Exception: " + std::string(e.what());
1171  LOG(ERROR) << ex.error_msg;
1172  throw ex;
1173  }
1174  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1175  const size_t length_to_cursor =
1176  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1177  // Trust hints from Calcite after the FROM keyword.
1178  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1179  return;
1180  }
1181  // Before FROM, the query is too incomplete for context-sensitive completions.
1182  get_token_based_completions(hints, session, visible_tables, sql, cursor);
1183 }
1184 
1185 void MapDHandler::get_token_based_completions(
1186  std::vector<TCompletionHint>& hints,
1187  const TSessionId& session,
1188  const std::vector<std::string>& visible_tables,
1189  const std::string& sql,
1190  const int cursor) {
1191  const auto last_word =
1192  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1193  boost::regex select_expr{R"(\s*select\s+)",
1194  boost::regex::extended | boost::regex::icase};
1195  const size_t length_to_cursor =
1196  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1197  // After SELECT but before FROM, look for all columns in all tables which match the
1198  // prefix.
1199  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1200  const auto column_names_by_table =
1201  fill_column_names_by_table(visible_tables, session);
1202  // Trust the fully qualified columns the most.
1203  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1204  return;
1205  }
1206  // Not much information to use, just retrieve column names which match the prefix.
1207  if (should_suggest_column_hints(sql)) {
1208  get_column_hints(hints, last_word, column_names_by_table);
1209  return;
1210  }
1211  const std::string kFromKeyword{"FROM"};
1212  if (boost::istarts_with(kFromKeyword, last_word)) {
1213  TCompletionHint keyword_hint;
1214  keyword_hint.type = TCompletionHintType::KEYWORD;
1215  keyword_hint.replaced = last_word;
1216  keyword_hint.hints.emplace_back(kFromKeyword);
1217  hints.push_back(keyword_hint);
1218  }
1219  } else {
1220  const std::string kSelectKeyword{"SELECT"};
1221  if (boost::istarts_with(kSelectKeyword, last_word)) {
1222  TCompletionHint keyword_hint;
1223  keyword_hint.type = TCompletionHintType::KEYWORD;
1224  keyword_hint.replaced = last_word;
1225  keyword_hint.hints.emplace_back(kSelectKeyword);
1226  hints.push_back(keyword_hint);
1227  }
1228  }
1229 }
1230 
1231 std::unordered_map<std::string, std::unordered_set<std::string>>
1232 MapDHandler::fill_column_names_by_table(const std::vector<std::string>& table_names,
1233  const TSessionId& session) {
1234  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1235  for (const auto& table_name : table_names) {
1236  TTableDetails table_details;
1237  get_table_details(table_details, session, table_name);
1238  for (const auto& column_type : table_details.row_desc) {
1239  column_names_by_table[table_name].emplace(column_type.col_name);
1240  }
1241  }
1242  return column_names_by_table;
1243 }
1244 
1244 
1245 std::unordered_set<std::string> MapDHandler::get_uc_compatible_table_names_by_column(
1246  const std::unordered_set<std::string>& uc_column_names,
1247  const std::vector<std::string>& table_names,
1248  const TSessionId& session) {
1249  std::unordered_set<std::string> compatible_table_names_by_column;
1250  for (const auto& table_name : table_names) {
1251  TTableDetails table_details;
1252  get_table_details(table_details, session, table_name);
1253  for (const auto& column_type : table_details.row_desc) {
1254  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1255  compatible_table_names_by_column.emplace(to_upper(table_name));
1256  break;
1257  }
1258  }
1259  }
1260  return compatible_table_names_by_column;
1261 }
1262 
1263 void MapDHandler::validate_rel_alg(TTableDescriptor& _return,
1264  QueryStateProxy query_state_proxy) {
1265  try {
1266  const auto query_ra = parse_to_ra(query_state_proxy,
1267  query_state_proxy.getQueryState().getQueryStr(),
1268  {},
1269  boost::none,
1271  TQueryResult result;
1272  MapDHandler::execute_rel_alg(result,
1273  query_state_proxy,
1274  query_ra,
1275  true,
1276  ExecutorDeviceType::CPU,
1277  -1,
1278  -1,
1279  false,
1280  true,
1281  false,
1282  false,
1283  false);
1284  const auto& row_desc = fixup_row_descriptor(
1285  result.row_set.row_desc,
1286  query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog());
1287  for (const auto& col_desc : row_desc) {
1288  const auto it_ok = _return.insert(std::make_pair(col_desc.col_name, col_desc));
1289  CHECK(it_ok.second);
1290  }
1291  } catch (std::exception& e) {
1292  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1293  }
1294 }
1295 
1296 void MapDHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1297  auto stdlog = STDLOG(get_session_ptr(session));
1298  auto session_ptr = stdlog.getConstSessionInfo();
1299  if (!session_ptr->get_currentUser().isSuper) {
1300  // WARNING: This appears to not include roles a user is a member of,
1301  // if the role has no permissions granted to it.
1302  roles =
1303  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1304  session_ptr->getCatalog().getCurrentDB().dbId);
1305  } else {
1306  roles = SysCatalog::instance().getRoles(
1307  false, true, session_ptr->get_currentUser().userName);
1308  }
1309 }
1310 
1311 bool MapDHandler::has_role(const TSessionId& sessionId,
1312  const std::string& granteeName,
1313  const std::string& roleName) {
1314  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1315  const auto session_ptr = stdlog.getConstSessionInfo();
1316  const auto current_user = session_ptr->get_currentUser();
1317  if (!current_user.isSuper) {
1318  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1319  user && current_user.userName != granteeName) {
1320  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1321  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1322  current_user.userName, granteeName, true)) {
1324  "Only super users can check roles assignment that have not been directly "
1325  "granted to a user.");
1326  }
1327  }
1328  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1329 }
1330 
1331 static TDBObject serialize_db_object(const std::string& roleName,
1332  const DBObject& inObject) {
1333  TDBObject outObject;
1334  outObject.objectName = inObject.getName();
1335  outObject.grantee = roleName;
1336  const auto ap = inObject.getPrivileges();
1337  switch (inObject.getObjectKey().permissionType) {
1338  case DatabaseDBObjectType:
1339  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1340  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1341  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1342  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1343  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1344 
1345  break;
1346  case TableDBObjectType:
1347  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1348  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1349  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1350  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1351  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1352  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1353  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1354  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1355  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1356 
1357  break;
1358  case DashboardDBObjectType:
1359  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1360  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1361  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1362  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1363  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1364 
1365  break;
1366  case ViewDBObjectType:
1367  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1368  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1369  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1370  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1371  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1372  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1373  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1374 
1375  break;
1376  default:
1377  CHECK(false);
1378  }
1379  const int type_val = static_cast<int>(inObject.getType());
1380  CHECK(type_val >= 0 && type_val < 5);
1381  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1382  return outObject;
1383 }
1384 
1386  const TDBObjectPermissions& permissions) {
1387  if (!permissions.__isset.database_permissions_) {
1388  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1389  }
1390  auto perms = permissions.database_permissions_;
1391  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1392  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1393  (perms.view_sql_editor_ &&
1394  !privs.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR)) ||
1395  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1396  return false;
1397  } else {
1398  return true;
1399  }
1400 }
1401 
1403  const TDBObjectPermissions& permissions) {
1404  if (!permissions.__isset.table_permissions_) {
1405  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1406  }
1407  auto perms = permissions.table_permissions_;
1408  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1409  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1410  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1411  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1412  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1413  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1414  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1415  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1416  return false;
1417  } else {
1418  return true;
1419  }
1420 }
1421 
1423  const TDBObjectPermissions& permissions) {
1424  if (!permissions.__isset.dashboard_permissions_) {
1425  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1426  }
1427  auto perms = permissions.dashboard_permissions_;
1428  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1429  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1430  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1431  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1432  return false;
1433  } else {
1434  return true;
1435  }
1436 }
1437 
1439  const TDBObjectPermissions& permissions) {
1440  if (!permissions.__isset.view_permissions_) {
1441  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1442  }
1443  auto perms = permissions.view_permissions_;
1444  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1445  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1446  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1447  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1448  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1449  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1450  return false;
1451  } else {
1452  return true;
1453  }
1454 }
1455 
1456 bool MapDHandler::has_object_privilege(const TSessionId& sessionId,
1457  const std::string& granteeName,
1458  const std::string& objectName,
1459  const TDBObjectType::type objectType,
1460  const TDBObjectPermissions& permissions) {
1461  auto stdlog = STDLOG(get_session_ptr(sessionId));
1462  auto session_ptr = stdlog.getConstSessionInfo();
1463  auto const& cat = session_ptr->getCatalog();
1464  auto const& current_user = session_ptr->get_currentUser();
1465  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1466  current_user.userName, granteeName, false)) {
1468  "Users except superusers can only check privileges for self or roles granted to "
1469  "them.")
1470  }
1471  Catalog_Namespace::UserMetadata user_meta;
1472  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1473  user_meta.isSuper) {
1474  return true;
1475  }
1476  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1477  if (!grnt) {
1478  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1479  }
1480  DBObjectType type;
1481  std::string func_name;
1482  switch (objectType) {
1483  case TDBObjectType::DatabaseDBObjectType:
1484  type = DBObjectType::DatabaseDBObjectType;
1485  func_name = "database";
1486  break;
1487  case TDBObjectType::TableDBObjectType:
1488  type = DBObjectType::TableDBObjectType;
1489  func_name = "table";
1490  break;
1491  case TDBObjectType::DashboardDBObjectType:
1492  type = DBObjectType::DashboardDBObjectType;
1493  func_name = "dashboard";
1494  break;
1495  case TDBObjectType::ViewDBObjectType:
1496  type = DBObjectType::ViewDBObjectType;
1497  func_name = "view";
1498  break;
1499  default:
1500  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1501  }
1502  DBObject req_object(objectName, type);
1503  req_object.loadKey(cat);
1504 
1505  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1506  if (grantee_object) {
1507  // if grantee has privs on the object
1508  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1509  } else {
1510  // no privileges on that object
1511  return false;
1512  }
1513 }
1514 
1515 void MapDHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1516  const TSessionId& sessionId,
1517  const std::string& roleName) {
1518  auto stdlog = STDLOG(get_session_ptr(sessionId));
1519  auto session_ptr = stdlog.getConstSessionInfo();
1520  auto const& user = session_ptr->get_currentUser();
1521  if (!user.isSuper &&
1522  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1523  return;
1524  }
1525  auto* rl = SysCatalog::instance().getGrantee(roleName);
1526  if (rl) {
1527  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1528  for (auto& dbObject : *rl->getDbObjects(true)) {
1529  if (dbObject.first.dbId != dbId) {
1530  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1531  // usecase for now, though)
1532  continue;
1533  }
1534  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1535  TDBObjectsForRole.push_back(tdbObject);
1536  }
1537  } else {
1538  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
1539  }
1540 }
1541 
1542 void MapDHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
1543  const TSessionId& sessionId,
1544  const std::string& objectName,
1545  const TDBObjectType::type type) {
1546  auto stdlog = STDLOG(get_session_ptr(sessionId));
1547  auto session_ptr = stdlog.getConstSessionInfo();
1548  DBObjectType object_type;
1549  switch (type) {
1550  case TDBObjectType::DatabaseDBObjectType:
1551  object_type = DBObjectType::DatabaseDBObjectType;
1552  break;
1553  case TDBObjectType::TableDBObjectType:
1554  object_type = DBObjectType::TableDBObjectType;
1555  break;
1556  case TDBObjectType::DashboardDBObjectType:
1557  object_type = DBObjectType::DashboardDBObjectType;
1558  break;
1559  case TDBObjectType::ViewDBObjectType:
1560  object_type = DBObjectType::ViewDBObjectType;
1561  break;
1562  default:
1563  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
1564  ": unknown object type (" + std::to_string(type) + ").");
1565  }
1566  DBObject object_to_find(objectName, object_type);
1567 
1568  try {
1569  if (object_type == DashboardDBObjectType) {
1570  if (objectName == "") {
1571  object_to_find = DBObject(-1, object_type);
1572  } else {
1573  object_to_find = DBObject(std::stoi(objectName), object_type);
1574  }
1575  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
1576  !objectName.empty()) {
1577  // special handling for view / table
1578  auto const& cat = session_ptr->getCatalog();
1579  auto td = cat.getMetadataForTable(objectName, false);
1580  if (td) {
1581  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1582  object_to_find = DBObject(objectName, object_type);
1583  }
1584  }
1585  object_to_find.loadKey(session_ptr->getCatalog());
1586  } catch (const std::exception&) {
1587  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
1588  }
1589 
1590  // object type on database level
1591  DBObject object_to_find_dblevel("", object_type);
1592  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
1593  // if user is superuser respond with a full priv
1594  if (session_ptr->get_currentUser().isSuper) {
1595  // using ALL_TABLE here to set max permissions
1596  DBObject dbObj{object_to_find.getObjectKey(),
1597  AccessPrivileges::ALL_TABLE,
1598  session_ptr->get_currentUser().userId};
1599  dbObj.setName("super");
1600  TDBObjects.push_back(
1601  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
1602  };
1603 
1604  std::vector<std::string> grantees =
1605  SysCatalog::instance().getRoles(true,
1606  session_ptr->get_currentUser().isSuper,
1607  session_ptr->get_currentUser().userName);
1608  for (const auto& grantee : grantees) {
1609  DBObject* object_found;
1610  auto* gr = SysCatalog::instance().getGrantee(grantee);
1611  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
1612  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1613  }
1614  // check object permissions on Database level
1615  if (gr &&
1616  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
1617  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1618  }
1619  }
1620 }
1621 
1622 void MapDHandler::get_all_roles_for_user(std::vector<std::string>& roles,
1623  const TSessionId& sessionId,
1624  const std::string& granteeName) {
1625  auto stdlog = STDLOG(get_session_ptr(sessionId));
1626  auto session_ptr = stdlog.getConstSessionInfo();
1627  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
1628  if (grantee) {
1629  if (session_ptr->get_currentUser().isSuper) {
1630  roles = grantee->getRoles();
1631  } else if (grantee->isUser()) {
1632  if (session_ptr->get_currentUser().userName == granteeName) {
1633  roles = grantee->getRoles();
1634  } else {
1635  THROW_MAPD_EXCEPTION(
1636  "Only a superuser is authorized to request list of roles granted to another "
1637  "user.");
1638  }
1639  } else {
1640  CHECK(!grantee->isUser());
1641  // granteeName is actually a roleName here and we can check a role
1642  // only if it is granted to us
1643  if (SysCatalog::instance().isRoleGrantedToGrantee(
1644  session_ptr->get_currentUser().userName, granteeName, false)) {
1645  roles = grantee->getRoles();
1646  } else {
1647  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
1648  }
1649  }
1650  } else {
1651  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
1652  }
1653 }
1654 
1655  void MapDHandler::get_result_row_for_pixel(
1656  TPixelTableRowResult& _return,
1657  const TSessionId& session,
1658  const int64_t widget_id,
1659  const TPixel& pixel,
1660  const std::map<std::string, std::vector<std::string>>& table_col_names,
1661  const bool column_format,
1662  const int32_t pixel_radius,
1663  const std::string& nonce) {
1664  auto stdlog = STDLOG(get_session_ptr(session),
1665  "widget_id",
1666  widget_id,
1667  "pixel.x",
1668  pixel.x,
1669  "pixel.y",
1670  pixel.y,
1671  "column_format",
1672  column_format,
1673  "pixel_radius",
1674  pixel_radius,
1675  "table_col_names",
1676  MapDRenderHandler::dump_table_col_names(table_col_names),
1677  "nonce",
1678  nonce);
1679  auto session_ptr = stdlog.getSessionInfo();
1680  if (!render_handler_) {
1681  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
1682  }
1683 
1684  try {
1685  render_handler_->get_result_row_for_pixel(_return,
1686  session_ptr,
1687  widget_id,
1688  pixel,
1689  table_col_names,
1690  column_format,
1691  pixel_radius,
1692  nonce);
1693  } catch (std::exception& e) {
1694  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1695  }
1696 }
1697 
1698 namespace {
1699 
1700 inline void fixup_geo_column_descriptor(TColumnType& col_type,
1701  const SQLTypes subtype,
1702  const int output_srid) {
1703  col_type.col_type.precision = static_cast<int>(subtype);
1704  col_type.col_type.scale = output_srid;
1705 }
1706 
1707 } // namespace
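// Illustrative note (a sketch added for clarity, not part of the original source):
// for geo columns the generic precision/scale slots of the Thrift type info are
// repurposed -- precision carries the geo subtype (kGEOMETRY or kGEOGRAPHY) and
// scale carries the output SRID. Assuming a hypothetical GEOMETRY(POINT, 4326)
// column, the serialized column type would end up roughly as:
//
//   TColumnType col_type;
//   fixup_geo_column_descriptor(col_type, kGEOMETRY, 4326);
//   // col_type.col_type.precision == static_cast<int>(kGEOMETRY)
//   // col_type.col_type.scale     == 4326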
1708 
1709  TColumnType MapDHandler::populateThriftColumnType(const Catalog* cat,
1710  const ColumnDescriptor* cd) {
1711  TColumnType col_type;
1712  col_type.col_name = cd->columnName;
1713  col_type.src_name = cd->sourceName;
1714  col_type.col_id = cd->columnId;
1715  col_type.col_type.type = type_to_thrift(cd->columnType);
1716  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
1717  col_type.col_type.nullable = !cd->columnType.get_notnull();
1718  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
1719  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
1720  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
1721  }
1722  if (IS_GEO(cd->columnType.get_type())) {
1723  fixup_geo_column_descriptor(
1724  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
1725  } else {
1726  col_type.col_type.precision = cd->columnType.get_precision();
1727  col_type.col_type.scale = cd->columnType.get_scale();
1728  }
1729  col_type.is_system = cd->isSystemCol;
1730  if (cd->columnType.get_compression() == kENCODING_DICT &&
1731  cat != nullptr) {
1732  // have to get the actual size of the encoding from the dictionary definition
1733  const int dict_id = cd->columnType.get_comp_param();
1734  if (!cat->getMetadataForDict(dict_id, false)) {
1735  col_type.col_type.comp_param = 0;
1736  return col_type;
1737  }
1738  auto dd = cat->getMetadataForDict(dict_id, false);
1739  if (!dd) {
1740  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
1741  }
1742  col_type.col_type.comp_param = dd->dictNBits;
1743  } else {
1744  col_type.col_type.comp_param =
1745  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
1746  ? 32
1747  : cd->columnType.get_comp_param();
1748  }
1749  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
1750  return col_type;
1751 }
1752 
1753 void MapDHandler::get_internal_table_details(TTableDetails& _return,
1754  const TSessionId& session,
1755  const std::string& table_name) {
1756  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1757  get_table_details_impl(_return, stdlog, session, table_name, true, false);
1758 }
1759 
1760 void MapDHandler::get_table_details(TTableDetails& _return,
1761  const TSessionId& session,
1762  const std::string& table_name) {
1763  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1764  get_table_details_impl(_return, stdlog, session, table_name, false, false);
1765 }
1766 
1767 void MapDHandler::get_table_details_impl(TTableDetails& _return,
1768  query_state::StdLog& stdlog,
1769  const TSessionId& session,
1770  const std::string& table_name,
1771  const bool get_system,
1772  const bool get_physical) {
1773  auto session_copy =
1774  std::make_shared<Catalog_Namespace::SessionInfo>(*stdlog.getSessionInfo());
1775  auto& cat = session_copy->getCatalog();
1776  auto td = cat.getMetadataForTable(
1777  table_name,
1778  false); // don't populate fragmenter on this call since we only want metadata
1779  if (!td) {
1780  THROW_MAPD_EXCEPTION("Table " + table_name + " doesn't exist");
1781  }
1782  if (td->isView) {
1783  auto query_state = create_query_state(session_copy, td->viewSQL);
1784  stdlog.setQueryState(query_state);
1785  try {
1786  if (hasTableAccessPrivileges(td, session)) {
1787  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
1788  query_state->getQueryStr(),
1789  {},
1790  boost::none,
1791  mapd_parameters_);
1792  TQueryResult result;
1793  execute_rel_alg(result,
1794  query_state->createQueryStateProxy(),
1795  query_ra,
1796  true,
1797  ExecutorDeviceType::CPU,
1798  -1,
1799  -1,
1800  false,
1801  true,
1802  false,
1803  false,
1804  false);
1805  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
1806  } else {
1807  THROW_MAPD_EXCEPTION("User has no access privileges to table " + table_name);
1808  }
1809  } catch (const std::exception& e) {
1810  THROW_MAPD_EXCEPTION("View query has failed with an error: '" +
1811  std::string(e.what()) +
1812  "'.\nThe view must be dropped and re-created to "
1813  "resolve the error. \nQuery:\n" +
1814  query_state->getQueryStr());
1815  }
1816  } else {
1817  try {
1818  if (hasTableAccessPrivileges(td, session)) {
1819  const auto col_descriptors =
1820  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
1821  const auto deleted_cd = cat.getDeletedColumn(td);
1822  for (const auto cd : col_descriptors) {
1823  if (cd == deleted_cd) {
1824  continue;
1825  }
1826  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
1827  }
1828  } else {
1829  THROW_MAPD_EXCEPTION("User has no access privileges to table " + table_name);
1830  }
1831  } catch (const std::runtime_error& e) {
1832  THROW_MAPD_EXCEPTION(e.what());
1833  }
1834  }
1835  _return.fragment_size = td->maxFragRows;
1836  _return.page_size = td->fragPageSize;
1837  _return.max_rows = td->maxRows;
1838  _return.view_sql = td->viewSQL;
1839  _return.shard_count = td->nShards;
1840  _return.key_metainfo = td->keyMetainfo;
1841  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
1842  _return.partition_detail =
1843  td->partitions.empty()
1844  ? TPartitionDetail::DEFAULT
1845  : (table_is_replicated(td)
1846  ? TPartitionDetail::REPLICATED
1847  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
1848  : TPartitionDetail::OTHER));
1849 }
1850 
1851 void MapDHandler::get_link_view(TFrontendView& _return,
1852  const TSessionId& session,
1853  const std::string& link) {
1854  auto stdlog = STDLOG(get_session_ptr(session));
1855  auto session_ptr = stdlog.getConstSessionInfo();
1856  auto const& cat = session_ptr->getCatalog();
1857  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
1858  if (!ld) {
1859  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
1860  }
1861  _return.view_state = ld->viewState;
1862  _return.view_name = ld->link;
1863  _return.update_time = ld->updateTime;
1864  _return.view_metadata = ld->viewMetadata;
1865 }
1866 
1867  bool MapDHandler::hasTableAccessPrivileges(const TableDescriptor* td,
1868  const TSessionId& session) {
1869  const auto session_info = get_session_copy(session);
1870  auto& cat = session_info.getCatalog();
1871  auto user_metadata = session_info.get_currentUser();
1872 
1873  if (user_metadata.isSuper) {
1874  return true;
1875  }
1876 
1877  DBObject dbObject(td->tableName, TableDBObjectType);
1878  dbObject.loadKey(cat);
1879  std::vector<DBObject> privObjects = {dbObject};
1880 
1881  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
1882 }
1883 
1884 void MapDHandler::get_tables_impl(std::vector<std::string>& table_names,
1885  query_state::StdLog& stdlog,
1886  const GetTablesType get_tables_type) {
1887  auto const session_ptr = stdlog.getConstSessionInfo();
1888  auto const& cat = session_ptr->getCatalog();
1889  const auto tables = cat.getAllTableMetadata();
1890  for (const auto td : tables) {
1891  if (td->shard >= 0) {
1892  // skip shards, they're not standalone tables
1893  continue;
1894  }
1895  switch (get_tables_type) {
1896  case GET_PHYSICAL_TABLES: {
1897  if (td->isView) {
1898  continue;
1899  }
1900  break;
1901  }
1902  case GET_VIEWS: {
1903  if (!td->isView) {
1904  continue;
1905  }
1906  }
1907  default:
1908  break;
1909  }
1910  if (!hasTableAccessPrivileges(td, session_ptr->get_session_id())) {
1911  // skip table, as there are no privileges to access it
1912  continue;
1913  }
1914  table_names.push_back(td->tableName);
1915  }
1916 }
1917 
1918 void MapDHandler::get_tables(std::vector<std::string>& table_names,
1919  const TSessionId& session) {
1920  auto stdlog = STDLOG(get_session_ptr(session));
1921  get_tables_impl(table_names, stdlog, GET_PHYSICAL_TABLES_AND_VIEWS);
1922 }
1923 
1924 void MapDHandler::get_physical_tables(std::vector<std::string>& table_names,
1925  const TSessionId& session) {
1926  auto stdlog = STDLOG(get_session_ptr(session));
1927  get_tables_impl(table_names, stdlog, GET_PHYSICAL_TABLES);
1928 }
1929 
1930 void MapDHandler::get_views(std::vector<std::string>& table_names,
1931  const TSessionId& session) {
1932  auto stdlog = STDLOG(get_session_ptr(session));
1933  get_tables_impl(table_names, stdlog, GET_VIEWS);
1934 }
1935 
1936 void MapDHandler::get_tables_meta(std::vector<TTableMeta>& _return,
1937  const TSessionId& session) {
1938  auto stdlog = STDLOG(get_session_ptr(session));
1939  auto session_ptr = stdlog.getConstSessionInfo();
1940  auto query_state = create_query_state(session_ptr, "");
1941  stdlog.setQueryState(query_state);
1942 
1943  const auto& cat = session_ptr->getCatalog();
1944  const auto tables = cat.getAllTableMetadata();
1945  _return.reserve(tables.size());
1946 
1947  for (const auto td : tables) {
1948  if (td->shard >= 0) {
1949  // skip shards, they're not standalone tables
1950  continue;
1951  }
1952  if (!hasTableAccessPrivileges(td, session)) {
1953  // skip table, as there are no privileges to access it
1954  continue;
1955  }
1956 
1957  TTableMeta ret;
1958  ret.table_name = td->tableName;
1959  ret.is_view = td->isView;
1960  ret.is_replicated = table_is_replicated(td);
1961  ret.shard_count = td->nShards;
1962  ret.max_rows = td->maxRows;
1963  ret.table_id = td->tableId;
1964 
1965  std::vector<TTypeInfo> col_types;
1966  std::vector<std::string> col_names;
1967  size_t num_cols = 0;
1968  if (td->isView) {
1969  try {
1970  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
1971  td->viewSQL,
1972  {},
1973  boost::none,
1974  mapd_parameters_);
1975  TQueryResult result;
1976  execute_rel_alg(result,
1977  query_state->createQueryStateProxy(),
1978  query_ra,
1979  true,
1980  ExecutorDeviceType::CPU,
1981  -1,
1982  -1,
1983  false,
1984  true,
1985  false,
1986  false,
1987  false);
1988  num_cols = result.row_set.row_desc.size();
1989  for (const auto col : result.row_set.row_desc) {
1990  if (col.is_physical) {
1991  num_cols--;
1992  continue;
1993  }
1994  col_types.push_back(col.col_type);
1995  col_names.push_back(col.col_name);
1996  }
1997  } catch (std::exception& e) {
1998  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
1999  }
2000  } else {
2001  try {
2002  if (hasTableAccessPrivileges(td, session)) {
2003  const auto col_descriptors =
2004  cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
2005  const auto deleted_cd = cat.getDeletedColumn(td);
2006  for (const auto cd : col_descriptors) {
2007  if (cd == deleted_cd) {
2008  continue;
2009  }
2010  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2011  col_names.push_back(cd->columnName);
2012  }
2013  num_cols = col_descriptors.size();
2014  } else {
2015  continue;
2016  }
2017  } catch (const std::runtime_error& e) {
2018  THROW_MAPD_EXCEPTION(e.what());
2019  }
2020  }
2021 
2022  ret.num_cols = num_cols;
2023  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2024  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2025 
2026  _return.push_back(ret);
2027  }
2028 }
2029 
2030 void MapDHandler::get_users(std::vector<std::string>& user_names,
2031  const TSessionId& session) {
2032  auto stdlog = STDLOG(get_session_ptr(session));
2033  auto session_ptr = stdlog.getConstSessionInfo();
2034  std::list<Catalog_Namespace::UserMetadata> user_list;
2035 
2036  if (!session_ptr->get_currentUser().isSuper) {
2037  user_list = SysCatalog::instance().getAllUserMetadata(
2038  session_ptr->getCatalog().getCurrentDB().dbId);
2039  } else {
2040  user_list = SysCatalog::instance().getAllUserMetadata();
2041  }
2042  for (auto u : user_list) {
2043  user_names.push_back(u.userName);
2044  }
2045 }
2046 
2047 void MapDHandler::get_version(std::string& version) {
2048  version = MAPD_RELEASE;
2049 }
2050 
2051 void MapDHandler::clear_gpu_memory(const TSessionId& session) {
2052  auto stdlog = STDLOG(get_session_ptr(session));
2053  auto session_ptr = stdlog.getConstSessionInfo();
2054  if (!session_ptr->get_currentUser().isSuper) {
2055  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2056  }
2057  try {
2058  SysCatalog::instance().getDataMgr().clearMemory(MemoryLevel::GPU_LEVEL);
2059  } catch (const std::exception& e) {
2060  THROW_MAPD_EXCEPTION(e.what());
2061  }
2062  if (render_handler_) {
2063  render_handler_->clear_gpu_memory();
2064  }
2065 
2066  if (leaf_aggregator_.leafCount() > 0) {
2067  leaf_aggregator_.clear_leaf_gpu_memory(session);
2068  }
2069 }
2070 
2071 void MapDHandler::clear_cpu_memory(const TSessionId& session) {
2072  auto stdlog = STDLOG(get_session_ptr(session));
2073  auto session_ptr = stdlog.getConstSessionInfo();
2074  if (!session_ptr->get_currentUser().isSuper) {
2075  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2076  }
2077  try {
2078  SysCatalog::instance().getDataMgr().clearMemory(MemoryLevel::CPU_LEVEL);
2079  } catch (const std::exception& e) {
2080  THROW_MAPD_EXCEPTION(e.what());
2081  }
2082  if (render_handler_) {
2083  render_handler_->clear_cpu_memory();
2084  }
2085 
2086  if (leaf_aggregator_.leafCount() > 0) {
2087  leaf_aggregator_.clear_leaf_cpu_memory(session);
2088  }
2089 }
2090 
2091  TSessionId MapDHandler::getInvalidSessionId() const {
2092  return INVALID_SESSION_ID;
2093 }
2094 
2095 void MapDHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2096  const TSessionId& session,
2097  const std::string& memory_level) {
2098  auto stdlog = STDLOG(get_session_ptr(session));
2099  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2100  Data_Namespace::MemoryLevel mem_level;
2101  if (!memory_level.compare("gpu")) {
2102  mem_level = Data_Namespace::MemoryLevel::GPU_LEVEL;
2103  internal_memory =
2104  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2105  } else {
2106  mem_level = Data_Namespace::MemoryLevel::CPU_LEVEL;
2107  internal_memory =
2108  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2109  }
2110 
2111  for (auto memInfo : internal_memory) {
2112  TNodeMemoryInfo nodeInfo;
2113  if (leaf_aggregator_.leafCount() > 0) {
2114  nodeInfo.host_name = get_hostname();
2115  }
2116  nodeInfo.page_size = memInfo.pageSize;
2117  nodeInfo.max_num_pages = memInfo.maxNumPages;
2118  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2119  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2120  for (auto gpu : memInfo.nodeMemoryData) {
2121  TMemoryData md;
2122  md.slab = gpu.slabNum;
2123  md.start_page = gpu.startPage;
2124  md.num_pages = gpu.numPages;
2125  md.touch = gpu.touch;
2126  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2127  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2128  nodeInfo.node_memory_data.push_back(md);
2129  }
2130  _return.push_back(nodeInfo);
2131  }
2132  if (leaf_aggregator_.leafCount() > 0) {
2133  std::vector<TNodeMemoryInfo> leafSummary =
2134  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
2135  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2136  }
2137 }
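// Usage note (illustrative sketch, not from the original source): the memory_level
// argument is matched only against the literal string "gpu"; any other value falls
// through to the CPU branch. A Thrift client call, assuming a generated client
// object named `client`, would look roughly like:
//
//   std::vector<TNodeMemoryInfo> mem_info;
//   client.get_memory(mem_info, session, "gpu");   // or "cpu"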
2138 
2139 void MapDHandler::get_databases(std::vector<TDBInfo>& dbinfos,
2140  const TSessionId& session) {
2141  auto stdlog = STDLOG(get_session_ptr(session));
2142  auto session_ptr = stdlog.getConstSessionInfo();
2143  const auto& user = session_ptr->get_currentUser();
2144  Catalog_Namespace::DBSummaryList dbs =
2145  SysCatalog::instance().getDatabaseListForUser(user);
2146  for (auto& db : dbs) {
2147  TDBInfo dbinfo;
2148  dbinfo.db_name = std::move(db.dbName);
2149  dbinfo.db_owner = std::move(db.dbOwnerName);
2150  dbinfos.push_back(std::move(dbinfo));
2151  }
2152 }
2153 
2154 void MapDHandler::set_execution_mode(const TSessionId& session,
2155  const TExecuteMode::type mode) {
2156  auto stdlog = STDLOG(get_session_ptr(session));
2157  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
2158  auto session_it = get_session_it_unsafe(session, write_lock);
2159  if (leaf_aggregator_.leafCount() > 0) {
2160  leaf_aggregator_.set_execution_mode(session, mode);
2161  try {
2162  MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2163  } catch (const TMapDException& e) {
2164  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2165  }
2166  return;
2167  }
2168  MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2169 }
2170 
2171 namespace {
2172 
2173 void check_table_not_sharded(const Catalog& cat, const std::string& table_name) {
2174  const auto td = cat.getMetadataForTable(table_name);
2175  if (td && td->nShards) {
2176  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2177  }
2178 }
2179 
2180 } // namespace
2181 
2182 void MapDHandler::load_table_binary(const TSessionId& session,
2183  const std::string& table_name,
2184  const std::vector<TRow>& rows) {
2185  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2186  auto session_ptr = stdlog.getConstSessionInfo();
2187  check_read_only("load_table_binary");
2188  auto& cat = session_ptr->getCatalog();
2189  if (g_cluster && !leaf_aggregator_.leafCount()) {
2190  // Sharded table rows need to be routed to the leaf by an aggregator.
2191  check_table_not_sharded(cat, table_name);
2192  }
2193  const TableDescriptor* td = cat.getMetadataForTable(table_name);
2194  if (td == nullptr) {
2195  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
2196  }
2197  check_table_load_privileges(*session_ptr, table_name);
2198  if (rows.empty()) {
2199  return;
2200  }
2201  std::unique_ptr<Importer_NS::Loader> loader;
2202  if (leaf_aggregator_.leafCount() > 0) {
2203  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2204  } else {
2205  loader.reset(new Importer_NS::Loader(cat, td));
2206  }
2207  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2208  // Subtracting 1 (rowid) until TableDescriptor is updated.
2209  if (rows.front().cols.size() !=
2210  static_cast<size_t>(td->nColumns) - (td->hasDeletedCol ? 2 : 1)) {
2211  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
2212  }
2213  auto col_descs = loader->get_column_descs();
2214  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2215  for (auto cd : col_descs) {
2216  import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2217  new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
2218  }
2219  for (auto const& row : rows) {
2220  size_t col_idx = 0;
2221  try {
2222  for (auto cd : col_descs) {
2223  import_buffers[col_idx]->add_value(
2224  cd, row.cols[col_idx], row.cols[col_idx].is_null);
2225  col_idx++;
2226  }
2227  } catch (const std::exception& e) {
2228  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2229  import_buffers[col_idx_to_pop]->pop_value();
2230  }
2231  LOG(ERROR) << "Input exception thrown: " << e.what()
2232  << ". Row discarded, issue at column : " << (col_idx + 1)
2233  << " data :" << row;
2234  }
2235  }
2236  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2237  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2238  loader->load(import_buffers, rows.size());
2239 }
2240 
2241  void MapDHandler::prepare_columnar_loader(
2242  const Catalog_Namespace::SessionInfo& session_info,
2243  const std::string& table_name,
2244  size_t num_cols,
2245  std::unique_ptr<Importer_NS::Loader>* loader,
2246  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>>* import_buffers) {
2247  auto& cat = session_info.getCatalog();
2248  if (g_cluster && !leaf_aggregator_.leafCount()) {
2249  // Sharded table rows need to be routed to the leaf by an aggregator.
2250  check_table_not_sharded(cat, table_name);
2251  }
2252  const TableDescriptor* td = cat.getMetadataForTable(table_name);
2253  if (td == nullptr) {
2254  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
2255  }
2256  check_table_load_privileges(session_info, table_name);
2257  if (leaf_aggregator_.leafCount() > 0) {
2258  loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
2259  } else {
2260  loader->reset(new Importer_NS::Loader(cat, td));
2261  }
2262  auto col_descs = (*loader)->get_column_descs();
2263  auto geo_physical_cols = std::count_if(
2264  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2265  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2266  // Subtracting 1 (rowid) until TableDescriptor is updated.
2267  if (num_cols != static_cast<size_t>(td->nColumns) - geo_physical_cols -
2268  (td->hasDeletedCol ? 2 : 1) ||
2269  num_cols < 1) {
2270  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
2271  }
2272  for (auto cd : col_descs) {
2273  import_buffers->push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2274  new Importer_NS::TypedImportBuffer(cd, (*loader)->getStringDict(cd))));
2275  }
2276 }
2277 
2278 void MapDHandler::load_table_binary_columnar(const TSessionId& session,
2279  const std::string& table_name,
2280  const std::vector<TColumn>& cols) {
2281  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2282  auto session_ptr = stdlog.getConstSessionInfo();
2283  check_read_only("load_table_binary_columnar");
2284  auto const& cat = session_ptr->getCatalog();
2285 
2286  std::unique_ptr<Importer_NS::Loader> loader;
2287  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2288  prepare_columnar_loader(
2289  *session_ptr, table_name, cols.size(), &loader, &import_buffers);
2290 
2291  size_t numRows = 0;
2292  size_t import_idx = 0; // index into the TColumn vector being loaded
2293  size_t col_idx = 0; // index into column description vector
2294  try {
2295  size_t skip_physical_cols = 0;
2296  for (auto cd : loader->get_column_descs()) {
2297  if (skip_physical_cols > 0) {
2298  if (!cd->isGeoPhyCol) {
2299  throw std::runtime_error("Unexpected physical column");
2300  }
2301  skip_physical_cols--;
2302  continue;
2303  }
2304  size_t colRows = import_buffers[col_idx]->add_values(cd, cols[import_idx]);
2305  if (col_idx == 0) {
2306  numRows = colRows;
2307  } else if (colRows != numRows) {
2308  std::ostringstream oss;
2309  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
2310  << cd->columnName << " , expecting " << numRows << " rows, column "
2311  << col_idx << " has " << colRows << " rows";
2312  THROW_MAPD_EXCEPTION(oss.str());
2313  }
2314  // Advance to the next column in the table
2315  col_idx++;
2316 
2317  // For geometry columns: process WKT strings and fill physical columns
2318  if (cd->columnType.is_geometry()) {
2319  auto geo_col_idx = col_idx - 1;
2320  const auto wkt_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
2321  std::vector<std::vector<double>> coords_column, bounds_column;
2322  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2323  int render_group = 0;
2324  SQLTypeInfo ti = cd->columnType;
2325  if (numRows != wkt_column->size() ||
2327  ti,
2328  coords_column,
2329  bounds_column,
2330  ring_sizes_column,
2331  poly_rings_column,
2332  false)) {
2333  std::ostringstream oss;
2334  oss << "load_table_binary_columnar: Invalid geometry in column "
2335  << cd->columnName;
2336  THROW_MAPD_EXCEPTION(oss.str());
2337  }
2338  // Populate physical columns, advance col_idx
2339  Importer_NS::Importer::set_geo_physical_import_buffer_columnar(cat,
2340  cd,
2341  import_buffers,
2342  col_idx,
2343  coords_column,
2344  bounds_column,
2345  ring_sizes_column,
2346  poly_rings_column,
2347  render_group);
2348  skip_physical_cols = cd->columnType.get_physical_cols();
2349  }
2350  // Advance to the next column of values being loaded
2351  import_idx++;
2352  }
2353  } catch (const std::exception& e) {
2354  std::ostringstream oss;
2355  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
2356  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2357  THROW_MAPD_EXCEPTION(oss.str());
2358  }
2359  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2360  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2361  loader->load(import_buffers, numRows);
2362 }
2363 
2364 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
2365 
2366 #define ARROW_THRIFT_THROW_NOT_OK(s) \
2367  do { \
2368  ::arrow::Status _s = (s); \
2369  if (UNLIKELY(!_s.ok())) { \
2370  TMapDException ex; \
2371  ex.error_msg = _s.ToString(); \
2372  LOG(ERROR) << _s.ToString(); \
2373  throw ex; \
2374  } \
2375  } while (0)
2376 
2377 namespace {
2378 
2379 RecordBatchVector loadArrowStream(const std::string& stream) {
2380  RecordBatchVector batches;
2381  try {
2382  // TODO(wesm): Make this simpler in general, see ARROW-1600
2383  auto stream_buffer =
2384  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
2385  static_cast<int64_t>(stream.size()));
2386 
2387  arrow::io::BufferReader buf_reader(stream_buffer);
2388  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
2389  ARROW_THRIFT_THROW_NOT_OK(
2390  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader, &batch_reader));
2391 
2392  while (true) {
2393  std::shared_ptr<arrow::RecordBatch> batch;
2394  // Read batch (zero-copy) from the stream
2395  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
2396  if (batch == nullptr) {
2397  break;
2398  }
2399  batches.emplace_back(std::move(batch));
2400  }
2401  } catch (const std::exception& e) {
2402  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
2403  }
2404  return batches;
2405 }
2406 
2407 } // namespace
2408 
2409 void MapDHandler::load_table_binary_arrow(const TSessionId& session,
2410  const std::string& table_name,
2411  const std::string& arrow_stream) {
2412  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2413  auto session_ptr = stdlog.getConstSessionInfo();
2414  check_read_only("load_table_binary_arrow");
2415 
2416  RecordBatchVector batches = loadArrowStream(arrow_stream);
2417 
2418  // Assuming we have only one batch for now
2419  if (batches.size() != 1) {
2420  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
2421  }
2422 
2423  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
2424 
2425  std::unique_ptr<Importer_NS::Loader> loader;
2426  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2427  prepare_columnar_loader(*session_ptr,
2428  table_name,
2429  static_cast<size_t>(batch->num_columns()),
2430  &loader,
2431  &import_buffers);
2432 
2433  size_t numRows = 0;
2434  size_t col_idx = 0;
2435  try {
2436  for (auto cd : loader->get_column_descs()) {
2437  auto& array = *batch->column(col_idx);
2438  Importer_NS::ArraySliceRange row_slice(0, array.length());
2439  numRows =
2440  import_buffers[col_idx]->add_arrow_values(cd, array, true, row_slice, nullptr);
2441  col_idx++;
2442  }
2443  } catch (const std::exception& e) {
2444  LOG(ERROR) << "Input exception thrown: " << e.what()
2445  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2446  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
2447  // other import paths
2448  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2449  }
2450  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2451  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2452  loader->load(import_buffers, numRows);
2453 }
2454 
2455 void MapDHandler::load_table(const TSessionId& session,
2456  const std::string& table_name,
2457  const std::vector<TStringRow>& rows) {
2458  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2459  auto session_ptr = stdlog.getConstSessionInfo();
2460  check_read_only("load_table");
2461  auto& cat = session_ptr->getCatalog();
2462  if (g_cluster && !leaf_aggregator_.leafCount()) {
2463  // Sharded table rows need to be routed to the leaf by an aggregator.
2464  check_table_not_sharded(cat, table_name);
2465  }
2466  const TableDescriptor* td = cat.getMetadataForTable(table_name);
2467  if (td == nullptr) {
2468  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
2469  }
2470  check_table_load_privileges(*session_ptr, table_name);
2471  if (rows.empty()) {
2472  return;
2473  }
2474  std::unique_ptr<Importer_NS::Loader> loader;
2475  if (leaf_aggregator_.leafCount() > 0) {
2476  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2477  } else {
2478  loader.reset(new Importer_NS::Loader(cat, td));
2479  }
2480  auto col_descs = loader->get_column_descs();
2481  auto geo_physical_cols = std::count_if(
2482  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2483  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2484  // Subtracting 1 (rowid) until TableDescriptor is updated.
2485  if (rows.front().cols.size() != static_cast<size_t>(td->nColumns) - geo_physical_cols -
2486  (td->hasDeletedCol ? 2 : 1)) {
2487  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name +
2488  " (" + std::to_string(rows.front().cols.size()) + " vs " +
2489  std::to_string(td->nColumns - geo_physical_cols - 1) + ")");
2490  }
2491  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2492  for (auto cd : col_descs) {
2493  import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2494  new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
2495  }
2496  Importer_NS::CopyParams copy_params;
2497  size_t rows_completed = 0;
2498  for (auto const& row : rows) {
2499  size_t import_idx = 0; // index into the TStringRow being loaded
2500  size_t col_idx = 0; // index into column description vector
2501  try {
2502  size_t skip_physical_cols = 0;
2503  for (auto cd : col_descs) {
2504  if (skip_physical_cols > 0) {
2505  if (!cd->isGeoPhyCol) {
2506  throw std::runtime_error("Unexpected physical column");
2507  }
2508  skip_physical_cols--;
2509  continue;
2510  }
2511  import_buffers[col_idx]->add_value(
2512  cd, row.cols[import_idx].str_val, row.cols[import_idx].is_null, copy_params);
2513  // Advance to the next column within the table
2514  col_idx++;
2515 
2516  if (cd->columnType.is_geometry()) {
2517  // Populate physical columns
2518  std::vector<double> coords, bounds;
2519  std::vector<int> ring_sizes, poly_rings;
2520  int render_group = 0;
2521  SQLTypeInfo ti;
2522  if (row.cols[import_idx].is_null ||
2523  !Geo_namespace::GeoTypesFactory::getGeoColumns(row.cols[import_idx].str_val,
2524  ti,
2525  coords,
2526  bounds,
2527  ring_sizes,
2528  poly_rings,
2529  false)) {
2530  throw std::runtime_error("Invalid geometry");
2531  }
2532  if (cd->columnType.get_type() != ti.get_type()) {
2533  throw std::runtime_error("Geometry type mismatch");
2534  }
2535  Importer_NS::Importer::set_geo_physical_import_buffer(cat,
2536  cd,
2537  import_buffers,
2538  col_idx,
2539  coords,
2540  bounds,
2541  ring_sizes,
2542  poly_rings,
2543  render_group);
2544  skip_physical_cols = cd->columnType.get_physical_cols();
2545  }
2546  // Advance to the next field within the row
2547  import_idx++;
2548  }
2549  rows_completed++;
2550  } catch (const std::exception& e) {
2551  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2552  import_buffers[col_idx_to_pop]->pop_value();
2553  }
2554  LOG(ERROR) << "Input exception thrown: " << e.what()
2555  << ". Row discarded, issue at column : " << (col_idx + 1)
2556  << " data :" << row;
2557  }
2558  }
2559  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2560  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2561  loader->load(import_buffers, rows_completed);
2562 }
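// Minimal client-side sketch (added for illustration, not part of the original
// file) of the payload load_table() expects: one TStringRow per row, one value
// per non-physical column, in table column order. The element type TStringValue
// and the table name below are assumptions for the example; geo columns are
// passed as WKT strings, as the parsing above shows.
//
//   TStringValue v;
//   v.str_val = "POINT(-122.4 37.8)";
//   v.is_null = false;
//   TStringRow row;
//   row.cols.push_back(v);
//   std::vector<TStringRow> rows{row};
//   // client.load_table(session, "hypothetical_table", rows);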
2563 
2564 char MapDHandler::unescape_char(std::string str) {
2565  char out = str[0];
2566  if (str.size() == 2 && str[0] == '\\') {
2567  if (str[1] == 't') {
2568  out = '\t';
2569  } else if (str[1] == 'n') {
2570  out = '\n';
2571  } else if (str[1] == '0') {
2572  out = '\0';
2573  } else if (str[1] == '\'') {
2574  out = '\'';
2575  } else if (str[1] == '\\') {
2576  out = '\\';
2577  }
2578  }
2579  return out;
2580 }
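// Illustrative examples of unescape_char() on typical TCopyParams string fields
// (the input values are assumptions chosen for the example):
//
//   unescape_char("\\t");  // returns '\t' -- a tab sent as the two-char escape
//   unescape_char("\\n");  // returns '\n'
//   unescape_char("|");    // returns '|'  -- single characters pass through
//   unescape_char("\\x");  // returns '\\' -- unknown escapes keep the backslash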
2581 
2582  Importer_NS::CopyParams MapDHandler::thrift_to_copyparams(const TCopyParams& cp) {
2583  Importer_NS::CopyParams copy_params;
2584  switch (cp.has_header) {
2585  case TImportHeaderRow::AUTODETECT:
2586  copy_params.has_header = Importer_NS::ImportHeaderRow::AUTODETECT;
2587  break;
2588  case TImportHeaderRow::NO_HEADER:
2589  copy_params.has_header = Importer_NS::ImportHeaderRow::NO_HEADER;
2590  break;
2591  case TImportHeaderRow::HAS_HEADER:
2592  copy_params.has_header = Importer_NS::ImportHeaderRow::HAS_HEADER;
2593  break;
2594  default:
2595  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
2596  std::to_string((int)cp.has_header));
2597  break;
2598  }
2599  copy_params.quoted = cp.quoted;
2600  if (cp.delimiter.length() > 0) {
2601  copy_params.delimiter = unescape_char(cp.delimiter);
2602  } else {
2603  copy_params.delimiter = '\0';
2604  }
2605  if (cp.null_str.length() > 0) {
2606  copy_params.null_str = cp.null_str;
2607  }
2608  if (cp.quote.length() > 0) {
2609  copy_params.quote = unescape_char(cp.quote);
2610  }
2611  if (cp.escape.length() > 0) {
2612  copy_params.escape = unescape_char(cp.escape);
2613  }
2614  if (cp.line_delim.length() > 0) {
2615  copy_params.line_delim = unescape_char(cp.line_delim);
2616  }
2617  if (cp.array_delim.length() > 0) {
2618  copy_params.array_delim = unescape_char(cp.array_delim);
2619  }
2620  if (cp.array_begin.length() > 0) {
2621  copy_params.array_begin = unescape_char(cp.array_begin);
2622  }
2623  if (cp.array_end.length() > 0) {
2624  copy_params.array_end = unescape_char(cp.array_end);
2625  }
2626  if (cp.threads != 0) {
2627  copy_params.threads = cp.threads;
2628  }
2629  if (cp.s3_access_key.length() > 0) {
2630  copy_params.s3_access_key = cp.s3_access_key;
2631  }
2632  if (cp.s3_secret_key.length() > 0) {
2633  copy_params.s3_secret_key = cp.s3_secret_key;
2634  }
2635  if (cp.s3_region.length() > 0) {
2636  copy_params.s3_region = cp.s3_region;
2637  }
2638  if (cp.s3_endpoint.length() > 0) {
2639  copy_params.s3_endpoint = cp.s3_endpoint;
2640  }
2641  switch (cp.file_type) {
2642  case TFileType::POLYGON:
2643  copy_params.file_type = Importer_NS::FileType::POLYGON;
2644  break;
2645  case TFileType::DELIMITED:
2646  copy_params.file_type = Importer_NS::FileType::DELIMITED;
2647  break;
2648 #ifdef ENABLE_IMPORT_PARQUET
2649  case TFileType::PARQUET:
2650  copy_params.file_type = Importer_NS::FileType::PARQUET;
2651  break;
2652 #endif
2653  default:
2654  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
2655  std::to_string((int)cp.file_type));
2656  break;
2657  }
2658  switch (cp.geo_coords_encoding) {
2659  case TEncodingType::GEOINT:
2660  copy_params.geo_coords_encoding = kENCODING_GEOINT;
2661  break;
2662  case TEncodingType::NONE:
2663  copy_params.geo_coords_encoding = kENCODING_NONE;
2664  break;
2665  default:
2666  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
2667  std::to_string((int)cp.geo_coords_encoding));
2668  break;
2669  }
2670  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2671  switch (cp.geo_coords_type) {
2672  case TDatumType::GEOGRAPHY:
2673  copy_params.geo_coords_type = kGEOGRAPHY;
2674  break;
2675  case TDatumType::GEOMETRY:
2676  copy_params.geo_coords_type = kGEOMETRY;
2677  break;
2678  default:
2679  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
2680  std::to_string((int)cp.geo_coords_type));
2681  break;
2682  }
2683  switch (cp.geo_coords_srid) {
2684  case 4326:
2685  case 3857:
2686  case 900913:
2687  copy_params.geo_coords_srid = cp.geo_coords_srid;
2688  break;
2689  default:
2690  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
2691  std::to_string((int)cp.geo_coords_srid));
2692  break;
2693  }
2694  copy_params.sanitize_column_names = cp.sanitize_column_names;
2695  copy_params.geo_layer_name = cp.geo_layer_name;
2696  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2697  copy_params.geo_explode_collections = cp.geo_explode_collections;
2698  return copy_params;
2699 }
2700 
2701  TCopyParams MapDHandler::copyparams_to_thrift(const Importer_NS::CopyParams& cp) {
2702  TCopyParams copy_params;
2703  copy_params.delimiter = cp.delimiter;
2704  copy_params.null_str = cp.null_str;
2705  switch (cp.has_header) {
2706  case Importer_NS::ImportHeaderRow::AUTODETECT:
2707  copy_params.has_header = TImportHeaderRow::AUTODETECT;
2708  break;
2709  case Importer_NS::ImportHeaderRow::NO_HEADER:
2710  copy_params.has_header = TImportHeaderRow::NO_HEADER;
2711  break;
2712  case Importer_NS::ImportHeaderRow::HAS_HEADER:
2713  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
2714  break;
2715  default:
2716  CHECK(false);
2717  break;
2718  }
2719  copy_params.quoted = cp.quoted;
2720  copy_params.quote = cp.quote;
2721  copy_params.escape = cp.escape;
2722  copy_params.line_delim = cp.line_delim;
2723  copy_params.array_delim = cp.array_delim;
2724  copy_params.array_begin = cp.array_begin;
2725  copy_params.array_end = cp.array_end;
2726  copy_params.threads = cp.threads;
2727  copy_params.s3_access_key = cp.s3_access_key;
2728  copy_params.s3_secret_key = cp.s3_secret_key;
2729  copy_params.s3_region = cp.s3_region;
2730  copy_params.s3_endpoint = cp.s3_endpoint;
2731  switch (cp.file_type) {
2732  case Importer_NS::FileType::POLYGON:
2733  copy_params.file_type = TFileType::POLYGON;
2734  break;
2735  default:
2736  copy_params.file_type = TFileType::DELIMITED;
2737  break;
2738  }
2739  switch (cp.geo_coords_encoding) {
2740  case kENCODING_GEOINT:
2741  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
2742  break;
2743  default:
2744  copy_params.geo_coords_encoding = TEncodingType::NONE;
2745  break;
2746  }
2747  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2748  switch (cp.geo_coords_type) {
2749  case kGEOGRAPHY:
2750  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
2751  break;
2752  case kGEOMETRY:
2753  copy_params.geo_coords_type = TDatumType::GEOMETRY;
2754  break;
2755  default:
2756  CHECK(false);
2757  break;
2758  }
2759  copy_params.geo_coords_srid = cp.geo_coords_srid;
2760  copy_params.sanitize_column_names = cp.sanitize_column_names;
2761  copy_params.geo_layer_name = cp.geo_layer_name;
2762  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2763  copy_params.geo_explode_collections = cp.geo_explode_collections;
2764  return copy_params;
2765 }
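// Minimal sketch (added for illustration, not from the original source) of
// building a TCopyParams on the client side for a tab-delimited CSV with a
// header row. Only fields referenced in the conversion code above are set; the
// concrete values are assumptions, and everything else keeps its Thrift default.
//
//   TCopyParams cp;
//   cp.delimiter = "\\t";                         // unescaped to '\t' by unescape_char()
//   cp.null_str = "\\N";
//   cp.has_header = TImportHeaderRow::HAS_HEADER;
//   cp.quoted = true;
//   cp.quote = "\"";
//   cp.file_type = TFileType::DELIMITED;
//   // thrift_to_copyparams(cp) then yields the Importer_NS::CopyParams used by
//   // the import paths above.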
2766 
2767 void add_vsi_network_prefix(std::string& path) {
2768  // do we support network file access?
2769  bool gdal_network = Importer_NS::Importer::gdalSupportsNetworkFileAccess();
2770 
2771  // modify head of filename based on source location
2772  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
2773  if (!gdal_network) {
2774  THROW_MAPD_EXCEPTION(
2775  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
2776  }
2777  // invoke GDAL CURL virtual file reader
2778  path = "/vsicurl/" + path;
2779  } else if (boost::istarts_with(path, "s3://")) {
2780  if (!gdal_network) {
2781  THROW_MAPD_EXCEPTION(
2782  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
2783  }
2784  // invoke GDAL S3 virtual file reader
2785  boost::replace_first(path, "s3://", "/vsis3/");
2786  }
2787 }
2788 
2789 void add_vsi_geo_prefix(std::string& path) {
2790  // single gzip'd file (not an archive)?
2791  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
2792  path = "/vsigzip/" + path;
2793  }
2794 }
2795 
2796 void add_vsi_archive_prefix(std::string& path) {
2797  // check for compressed file or file bundle
2798  if (boost::iends_with(path, ".zip")) {
2799  // zip archive
2800  path = "/vsizip/" + path;
2801  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
2802  boost::iends_with(path, ".tar.gz")) {
2803  // tar archive (compressed or uncompressed)
2804  path = "/vsitar/" + path;
2805  }
2806 }
2807 
2808 std::string remove_vsi_prefixes(const std::string& path_in) {
2809  std::string path(path_in);
2810 
2811  // these will be first
2812  if (boost::istarts_with(path, "/vsizip/")) {
2813  boost::replace_first(path, "/vsizip/", "");
2814  } else if (boost::istarts_with(path, "/vsitar/")) {
2815  boost::replace_first(path, "/vsitar/", "");
2816  } else if (boost::istarts_with(path, "/vsigzip/")) {
2817  boost::replace_first(path, "/vsigzip/", "");
2818  }
2819 
2820  // then these
2821  if (boost::istarts_with(path, "/vsicurl/")) {
2822  boost::replace_first(path, "/vsicurl/", "");
2823  } else if (boost::istarts_with(path, "/vsis3/")) {
2824  boost::replace_first(path, "/vsis3/", "s3://");
2825  }
2826 
2827  return path;
2828 }
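// Example of the GDAL VSI prefix round trip implemented by the helpers above,
// assuming a hypothetical S3-hosted zip archive and that GDAL network file
// access is available (otherwise add_vsi_network_prefix() throws):
//
//   std::string path = "s3://bucket/shapes.zip";
//   add_vsi_network_prefix(path);   // "/vsis3/bucket/shapes.zip"
//   add_vsi_archive_prefix(path);   // "/vsizip//vsis3/bucket/shapes.zip"
//   remove_vsi_prefixes(path);      // back to "s3://bucket/shapes.zip"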
2829 
2830 bool path_is_relative(const std::string& path) {
2831  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
2832  boost::istarts_with(path, "https://")) {
2833  return false;
2834  }
2835  return !boost::filesystem::path(path).is_absolute();
2836 }
2837 
2838 bool path_has_valid_filename(const std::string& path) {
2839  auto filename = boost::filesystem::path(path).filename().string();
2840  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
2841  return false;
2842  }
2843  return true;
2844 }
2845 
2846 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
2847  if (!path_has_valid_filename(path)) {
2848  return false;
2849  }
2850  if (include_gz) {
2851  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
2852  return true;
2853  }
2854  }
2855  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
2856  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
2857  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
2858  boost::iends_with(path, ".gdb.zip")) {
2859  return true;
2860  }
2861  return false;
2862 }
2863 
2864 bool is_a_supported_archive_file(const std::string& path) {
2865  if (!path_has_valid_filename(path)) {
2866  return false;
2867  }
2868  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
2869  return true;
2870  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
2871  boost::iends_with(path, ".tar.gz")) {
2872  return true;
2873  }
2874  return false;
2875 }
2876 
2877 std::string find_first_geo_file_in_archive(const std::string& archive_path,
2878  const Importer_NS::CopyParams& copy_params) {
2879  // get the recursive list of all files in the archive
2880  std::vector<std::string> files =
2881  Importer_NS::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
2882 
2883  // report the list
2884  LOG(INFO) << "Found " << files.size() << " files in Archive "
2885  << remove_vsi_prefixes(archive_path);
2886  for (const auto& file : files) {
2887  LOG(INFO) << " " << file;
2888  }
2889 
2890  // scan the list for the first candidate file
2891  bool found_suitable_file = false;
2892  std::string file_name;
2893  for (const auto& file : files) {
2894  if (is_a_supported_geo_file(file, false)) {
2895  file_name = file;
2896  found_suitable_file = true;
2897  break;
2898  }
2899  }
2900 
2901  // if we didn't find anything
2902  if (!found_suitable_file) {
2903  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
2904  remove_vsi_prefixes(archive_path);
2905  file_name.clear();
2906  }
2907 
2908  // done
2909  return file_name;
2910 }
2911 
2912 void MapDHandler::detect_column_types(TDetectResult& _return,
2913  const TSessionId& session,
2914  const std::string& file_name_in,
2915  const TCopyParams& cp) {
2916  auto stdlog = STDLOG(get_session_ptr(session));
2917  check_read_only("detect_column_types");
2918 
2919  Importer_NS::CopyParams copy_params = thrift_to_copyparams(cp);
2920 
2921  std::string file_name{file_name_in};
2922 
2923  if (path_is_relative(file_name)) {
2924  // assume relative paths are relative to data_path / mapd_import / <session>
2925  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
2926  boost::filesystem::path(file_name).filename();
2927  file_name = file_path.string();
2928  }
2929 
2930  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
2931  if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
2932  if (is_a_supported_geo_file(file_name, true)) {
2933  // prepare to detect geo file directly
2934  add_vsi_network_prefix(file_name);
2935  add_vsi_geo_prefix(file_name);
2936  } else if (is_a_supported_archive_file(file_name)) {
2937  // find the archive file
2938  add_vsi_network_prefix(file_name);
2939  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
2940  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
2941  }
2942  // find geo file in archive
2943  add_vsi_archive_prefix(file_name);
2944  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
2945  // prepare to detect that geo file
2946  if (geo_file.size()) {
2947  file_name = file_name + std::string("/") + geo_file;
2948  }
2949  } else {
2950  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
2951  file_name_in);
2952  }
2953  }
2954 
2955  auto file_path = boost::filesystem::path(file_name);
2956  // can be a s3 url
2957  if (!boost::istarts_with(file_name, "s3://")) {
2958  if (!boost::filesystem::path(file_name).is_absolute()) {
2959  file_path = import_path_ / picosha2::hash256_hex_string(session) /
2960  boost::filesystem::path(file_name).filename();
2961  file_name = file_path.string();
2962  }
2963 
2964  if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
2965  // check for geo file
2966  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
2967  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
2968  }
2969  } else {
2970  // check for regular file
2971  if (!boost::filesystem::exists(file_path)) {
2972  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
2973  }
2974  }
2975  }
2976  try {
2977  if (copy_params.file_type == Importer_NS::FileType::DELIMITED
2978 #ifdef ENABLE_IMPORT_PARQUET
2979  || (copy_params.file_type == Importer_NS::FileType::PARQUET)
2980 #endif
2981  ) {
2982  Importer_NS::Detector detector(file_path, copy_params);
2983  std::vector<SQLTypes> best_types = detector.best_sqltypes;
2984  std::vector<EncodingType> best_encodings = detector.best_encodings;
2985  std::vector<std::string> headers = detector.get_headers();
2986  copy_params = detector.get_copy_params();
2987 
2988  _return.copy_params = copyparams_to_thrift(copy_params);
2989  _return.row_set.row_desc.resize(best_types.size());
2990  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
2991  TColumnType col;
2992  SQLTypes t = best_types[col_idx];
2993  EncodingType encodingType = best_encodings[col_idx];
2994  SQLTypeInfo ti(t, false, encodingType);
2995  if (IS_GEO(t)) {
2996  // set this so encoding_to_thrift does the right thing
2997  ti.set_compression(copy_params.geo_coords_encoding);
2998  // fill in these directly
2999  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3000  col.col_type.scale = copy_params.geo_coords_srid;
3001  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3002  }
3003  col.col_type.type = type_to_thrift(ti);
3004  col.col_type.encoding = encoding_to_thrift(ti);
3005  if (copy_params.sanitize_column_names) {
3006  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3007  } else {
3008  col.col_name = headers[col_idx];
3009  }
3010  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3011  _return.row_set.row_desc[col_idx] = col;
3012  }
3013  size_t num_samples = 100;
3014  auto sample_data = detector.get_sample_rows(num_samples);
3015 
3016  TRow sample_row;
3017  for (auto row : sample_data) {
3018  sample_row.cols.clear();
3019  for (const auto& s : row) {
3020  TDatum td;
3021  td.val.str_val = s;
3022  td.is_null = s.empty();
3023  sample_row.cols.push_back(td);
3024  }
3025  _return.row_set.rows.push_back(sample_row);
3026  }
3027  } else if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
3028  // @TODO simon.eves get this from somewhere!
3029  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
3030 
3031  check_geospatial_files(file_path, copy_params);
3032  std::list<ColumnDescriptor> cds = Importer_NS::Importer::gdalToColumnDescriptors(
3033  file_path.string(), geoColumnName, copy_params);
3034  for (auto cd : cds) {
3035  if (copy_params.sanitize_column_names) {
3036  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
3037  }
3038  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
3039  }
3040  std::map<std::string, std::vector<std::string>> sample_data;
3041  Importer_NS::Importer::readMetadataSampleGDAL(
3042  file_path.string(), geoColumnName, sample_data, 100, copy_params);
3043  if (sample_data.size() > 0) {
3044  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
3045  TRow sample_row;
3046  for (auto cd : cds) {
3047  TDatum td;
3048  td.val.str_val = sample_data[cd.sourceName].at(i);
3049  td.is_null = td.val.str_val.empty();
3050  sample_row.cols.push_back(td);
3051  }
3052  _return.row_set.rows.push_back(sample_row);
3053  }
3054  }
3055  _return.copy_params = copyparams_to_thrift(copy_params);
3056  }
3057  } catch (const std::exception& e) {
3058  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
3059  }
3060 }
3061 
3062  Planner::RootPlan* MapDHandler::parse_to_plan_legacy(
3063  const std::string& query_str,
3064  const Catalog_Namespace::SessionInfo& session_info,
3065  const std::string& action /* render or validate */) {
3066  auto& cat = session_info.getCatalog();
3067  LOG(INFO) << action << ": " << hide_sensitive_data_from_query(query_str);
3068  SQLParser parser;
3069  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
3070  std::string last_parsed;
3071  int num_parse_errors = 0;
3072  try {
3073  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
3074  } catch (std::exception& e) {
3075  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3076  }
3077  if (num_parse_errors > 0) {
3078  THROW_MAPD_EXCEPTION("Syntax error at: " + last_parsed);
3079  }
3080  if (parse_trees.size() != 1) {
3081  THROW_MAPD_EXCEPTION("Can only " + action + " a single query at a time.");
3082  }
3083  Parser::Stmt* stmt = parse_trees.front().get();
3084  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
3085  if (ddl != nullptr) {
3086  THROW_MAPD_EXCEPTION("Can only " + action + " SELECT statements.");
3087  }
3088  auto dml = static_cast<Parser::DMLStmt*>(stmt);
3089  Analyzer::Query query;
3090  dml->analyze(cat, query);
3091  Planner::Optimizer optimizer(query, cat);
3092  return optimizer.optimize();
3093 }
3094 
3095 void MapDHandler::render_vega(TRenderResult& _return,
3096  const TSessionId& session,
3097  const int64_t widget_id,
3098  const std::string& vega_json,
3099  const int compression_level,
3100  const std::string& nonce) {
3101  auto stdlog = STDLOG(get_session_ptr(session),
3102  "widget_id",
3103  widget_id,
3104  "compression_level",
3105  compression_level,
3106  "vega_json",
3107  vega_json,
3108  "nonce",
3109  nonce);
3110  auto session_ptr = stdlog.getConstSessionInfo();
3111  if (!render_handler_) {
3112  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
3113  }
3114 
3115  _return.total_time_ms = measure<>::execution([&]() {
3116  try {
3117  render_handler_->render_vega(
3118  _return,
3119  std::make_shared<Catalog_Namespace::SessionInfo>(*session_ptr),
3120  widget_id,
3121  vega_json,
3122  compression_level,
3123  nonce);
3124  } catch (std::exception& e) {
3125  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3126  }
3127  });
3128 }
3129 
3130  static bool is_allowed_on_dashboard(const Catalog_Namespace::SessionInfo& session_info,
3131  int32_t dashboard_id,
3132  AccessPrivileges requestedPermissions) {
3133  DBObject object(dashboard_id, DashboardDBObjectType);
3134  auto& catalog = session_info.getCatalog();
3135  auto& user = session_info.get_currentUser();
3136  object.loadKey(catalog);
3137  object.setPrivileges(requestedPermissions);
3138  std::vector<DBObject> privs = {object};
3139  return SysCatalog::instance().checkPrivileges(user, privs);
3140 }
3141 
3142 // dashboards
3143 void MapDHandler::get_dashboard(TDashboard& dashboard,
3144  const TSessionId& session,
3145  const int32_t dashboard_id) {
3146  auto stdlog = STDLOG(get_session_ptr(session));
3147  auto session_ptr = stdlog.getConstSessionInfo();
3148  auto const& cat = session_ptr->getCatalog();
3149  Catalog_Namespace::UserMetadata user_meta;
3150  auto dash = cat.getMetadataForDashboard(dashboard_id);
3151  if (!dash) {
3152  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
3153  " doesn't exist");
3154  }
3155  if (!is_allowed_on_dashboard(
3156  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3157  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
3158  std::to_string(dashboard_id));
3159  }
3160  user_meta.userName = "";
3161  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3162  auto objects_list = SysCatalog::instance().getMetadataForObject(
3163  cat.getCurrentDB().dbId,
3164  static_cast<int>(DBObjectType::DashboardDBObjectType),
3165  dashboard_id);
3166  dashboard.dashboard_name = dash->dashboardName;
3167  dashboard.dashboard_state = dash->dashboardState;
3168  dashboard.image_hash = dash->imageHash;
3169  dashboard.update_time = dash->updateTime;
3170  dashboard.dashboard_metadata = dash->dashboardMetadata;
3171  dashboard.dashboard_owner = dash->user;
3172  dashboard.dashboard_id = dash->dashboardId;
3173  if (objects_list.empty() ||
3174  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3175  dashboard.is_dash_shared = false;
3176  } else {
3177  dashboard.is_dash_shared = true;
3178  }
3179 }
3180 
3181 void MapDHandler::get_dashboards(std::vector<TDashboard>& dashboards,
3182  const TSessionId& session) {
3183  auto stdlog = STDLOG(get_session_ptr(session));
3184  auto session_ptr = stdlog.getConstSessionInfo();
3185  auto const& cat = session_ptr->getCatalog();
3186  Catalog_Namespace::UserMetadata user_meta;
3187  const auto dashes = cat.getAllDashboardsMetadata();
3188  user_meta.userName = "";
3189  for (const auto d : dashes) {
3190  SysCatalog::instance().getMetadataForUserById(d->userId, user_meta);
3191  if (is_allowed_on_dashboard(
3192  *session_ptr, d->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3193  auto objects_list = SysCatalog::instance().getMetadataForObject(
3194  cat.getCurrentDB().dbId,
3195  static_cast<int>(DBObjectType::DashboardDBObjectType),
3196  d->dashboardId);
3197  TDashboard dash;
3198  dash.dashboard_name = d->dashboardName;
3199  dash.image_hash = d->imageHash;
3200  dash.update_time = d->updateTime;
3201  dash.dashboard_metadata = d->dashboardMetadata;
3202  dash.dashboard_id = d->dashboardId;
3203  dash.dashboard_owner = d->user;
3204  // dashboardState is intentionally not populated here
3205  // for payload reasons
3206  // use get_dashboard call to get state
3207  if (objects_list.empty() ||
3208  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3209  dash.is_dash_shared = false;
3210  } else {
3211  dash.is_dash_shared = true;
3212  }
3213  dashboards.push_back(dash);
3214  }
3215  }
3216 }
3217 
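// create_dashboard: requires the CREATE_DASHBOARD database privilege, rejects a
// duplicate dashboard name for the same owner, then creates the catalog entry and
// registers the corresponding DBObject so privileges can be granted on it.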
3218 int32_t MapDHandler::create_dashboard(const TSessionId& session,
3219  const std::string& dashboard_name,
3220  const std::string& dashboard_state,
3221  const std::string& image_hash,
3222  const std::string& dashboard_metadata) {
3223  auto stdlog = STDLOG(get_session_ptr(session));
3224  auto session_ptr = stdlog.getConstSessionInfo();
3225  check_read_only("create_dashboard");
3226  auto& cat = session_ptr->getCatalog();
3227 
3228  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
3229  AccessPrivileges::CREATE_DASHBOARD)) {
3230  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
3231  }
3232 
3233  auto dash = cat.getMetadataForDashboard(
3234  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
3235  if (dash) {
3236  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
3237  }
3238 
3239  DashboardDescriptor dd;
3240  dd.dashboardName = dashboard_name;
3241  dd.dashboardState = dashboard_state;
3242  dd.imageHash = image_hash;
3243  dd.dashboardMetadata = dashboard_metadata;
3244  dd.userId = session_ptr->get_currentUser().userId;
3245  dd.user = session_ptr->get_currentUser().userName;
3246 
3247  try {
3248  auto id = cat.createDashboard(dd);
3249  // TODO: transactionally unsafe
3250  SysCatalog::instance().createDBObject(
3251  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
3252  return id;
3253  } catch (const std::exception& e) {
3254  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3255  }
3256 }
3257 
3258 void MapDHandler::replace_dashboard(const TSessionId& session,
3259  const int32_t dashboard_id,
3260  const std::string& dashboard_name,
3261  const std::string& dashboard_owner,
3262  const std::string& dashboard_state,
3263  const std::string& image_hash,
3264  const std::string& dashboard_metadata) {
3265  auto stdlog = STDLOG(get_session_ptr(session));
3266  auto session_ptr = stdlog.getConstSessionInfo();
3267  check_read_only("replace_dashboard");
3268  auto& cat = session_ptr->getCatalog();
3269 
3270  if (!is_allowed_on_dashboard(
3271  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
3272  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
3273  }
3274 
3275  DashboardDescriptor dd;
3276  dd.dashboardName = dashboard_name;
3277  dd.dashboardState = dashboard_state;
3278  dd.imageHash = image_hash;
3279  dd.dashboardMetadata = dashboard_metadata;
3280  Catalog_Namespace::UserMetadata user;
3281  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
3282  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
3283  " does not exist");
3284  }
3285  dd.userId = user.userId;
3286  dd.user = dashboard_owner;
3287  dd.dashboardId = dashboard_id;
3288 
3289  try {
3290  cat.replaceDashboard(dd);
3291  } catch (const std::exception& e) {
3292  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3293  }
3294 }
3295 
3296 void MapDHandler::delete_dashboard(const TSessionId& session,
3297  const int32_t dashboard_id) {
3298  auto stdlog = STDLOG(get_session_ptr(session));
3299  auto session_ptr = stdlog.getConstSessionInfo();
3300  check_read_only("delete_dashboard");
3301  auto& cat = session_ptr->getCatalog();
3302  auto dash = cat.getMetadataForDashboard(dashboard_id);
3303  if (!dash) {
3304  THROW_MAPD_EXCEPTION("Dashboard with id " + std::to_string(dashboard_id) +
3305  " doesn't exist, so cannot delete it");
3306  }
3307  if (!is_allowed_on_dashboard(
3308  *session_ptr, dash->dashboardId, AccessPrivileges::DELETE_DASHBOARD)) {
3309  THROW_MAPD_EXCEPTION("Not enough privileges to delete a dashboard.");
3310  }
3311  try {
3312  cat.deleteMetadataForDashboard(dashboard_id);
3313  } catch (const std::exception& e) {
3314  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3315  }
3316 }
3317 
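// get_valid_groups: shared helper for share/unshare. Only the dashboard owner or a
// superuser may share or unshare a dashboard, and every requested grantee must exist
// as a user or role; the validated names are returned.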
3318 std::vector<std::string> MapDHandler::get_valid_groups(const TSessionId& session,
3319  int32_t dashboard_id,
3320  std::vector<std::string> groups) {
3321  const auto session_info = get_session_copy(session);
3322  auto& cat = session_info.getCatalog();
3323  auto dash = cat.getMetadataForDashboard(dashboard_id);
3324  if (!dash) {
3325  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3326  " does not exist");
3327  } else if (session_info.get_currentUser().userId != dash->userId &&
3328  !session_info.get_currentUser().isSuper) {
3329  throw std::runtime_error(
3330  "User should be either owner of dashboard or super user to share/unshare it");
3331  }
3332  std::vector<std::string> valid_groups;
3333  Catalog_Namespace::UserMetadata user_meta;
3334  for (auto& group : groups) {
3335  user_meta.isSuper = false; // initialize default flag
3336  if (!SysCatalog::instance().getGrantee(group)) {
3337  THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
3338  } else if (!user_meta.isSuper) {
3339  valid_groups.push_back(group);
3340  }
3341  }
3342  return valid_groups;
3343 }
3344 
3345 // NOOP: the `objects` argument is currently ignored; grants on individual objects are not available as of now
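// share_dashboard: maps the requested TDashboardPermissions onto AccessPrivileges for
// the dashboard DBObject and grants them to each validated group (at least one
// permission flag must be set); optionally also grants the dashboard's system role.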
3346 void MapDHandler::share_dashboard(const TSessionId& session,
3347  const int32_t dashboard_id,
3348  const std::vector<std::string>& groups,
3349  const std::vector<std::string>& objects,
3350  const TDashboardPermissions& permissions,
3351  const bool grant_role = false) {
3352  auto stdlog = STDLOG(get_session_ptr(session));
3353  auto session_ptr = stdlog.getConstSessionInfo();
3354  check_read_only("share_dashboard");
3355  std::vector<std::string> valid_groups;
3356  valid_groups = get_valid_groups(session, dashboard_id, groups);
3357  auto const& cat = session_ptr->getCatalog();
3358  // By default object type can only be dashboard
3359  DBObjectType object_type = DBObjectType::DashboardDBObjectType;
3360  DBObject object(dashboard_id, object_type);
3361  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3362  !permissions.view_) {
3363  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for grants");
3364  } else {
3365  AccessPrivileges privs;
3366 
3367  object.resetPrivileges();
3368  if (permissions.delete_) {
3369  privs.add(AccessPrivileges::DELETE_DASHBOARD);
3370  }
3371 
3372  if (permissions.create_) {
3373  privs.add(AccessPrivileges::CREATE_DASHBOARD);
3374  }
3375 
3376  if (permissions.edit_) {
3377  privs.add(AccessPrivileges::EDIT_DASHBOARD);
3378  }
3379 
3380  if (permissions.view_) {
3381  privs.add(AccessPrivileges::VIEW_DASHBOARD);
3382  }
3383 
3384  object.setPrivileges(privs);
3385  }
3386  SysCatalog::instance().grantDBObjectPrivilegesBatch(valid_groups, {object}, cat);
3387  // grant system_role to grantees for underlying objects
3388  if (grant_role) {
3389  auto dash = cat.getMetadataForDashboard(dashboard_id);
3390  SysCatalog::instance().grantRoleBatch({dash->dashboardSystemRoleName}, valid_groups);
3391  }
3392 }
3393 
3394 void MapDHandler::unshare_dashboard(const TSessionId& session,
3395  const int32_t dashboard_id,
3396  const std::vector<std::string>& groups,
3397  const std::vector<std::string>& objects,
3398  const TDashboardPermissions& permissions) {
3399  auto stdlog = STDLOG(get_session_ptr(session));
3400  auto session_ptr = stdlog.getConstSessionInfo();
3401  check_read_only("unshare_dashboard");
3402  std::vector<std::string> valid_groups;
3403  valid_groups = get_valid_groups(session, dashboard_id, groups);
3404  auto const& cat = session_ptr->getCatalog();
3405  // By default object type can only be dashboard
3406  DBObjectType object_type = DBObjectType::DashboardDBObjectType;
3407  DBObject object(dashboard_id, object_type);
3408  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3409  !permissions.view_) {
3410  THROW_MAPD_EXCEPTION("At least one privilege should be assigned for revokes");
3411  } else {
3412  AccessPrivileges privs;
3413 
3414  object.resetPrivileges();
3415  if (permissions.delete_) {
3416  privs.add(AccessPrivileges::DELETE_DASHBOARD);
3417  }
3418 
3419  if (permissions.create_) {
3420  privs.add(AccessPrivileges::CREATE_DASHBOARD);
3421  }
3422 
3423  if (permissions.edit_) {
3424  privs.add(AccessPrivileges::EDIT_DASHBOARD);
3425  }
3426 
3427  if (permissions.view_) {
3428  privs.add(AccessPrivileges::VIEW_DASHBOARD);
3429  }
3430 
3431  object.setPrivileges(privs);
3432  }
3433  SysCatalog::instance().revokeDBObjectPrivilegesBatch(valid_groups, {object}, cat);
3434  // revoke system_role from grantees for underlying objects
3435  const auto dash = cat.getMetadataForDashboard(dashboard_id);
3436  SysCatalog::instance().revokeDashboardSystemRole(dash->dashboardSystemRoleName,
3437  valid_groups);
3438 }
3439 
3440 void MapDHandler::get_dashboard_grantees(
3441  std::vector<TDashboardGrantees>& dashboard_grantees,
3442  const TSessionId& session,
3443  int32_t dashboard_id) {
3444  auto stdlog = STDLOG(get_session_ptr(session));
3445  auto session_ptr = stdlog.getConstSessionInfo();
3446  auto const& cat = session_ptr->getCatalog();
3447  Catalog_Namespace::UserMetadata user_meta;
3448  auto dash = cat.getMetadataForDashboard(dashboard_id);
3449  if (!dash) {
3450  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3451  " does not exist");
3452  } else if (session_ptr->get_currentUser().userId != dash->userId &&
3453  !session_ptr->get_currentUser().isSuper) {
3455  "User should be either owner of dashboard or super user to access grantees");
3456  }
3457  std::vector<ObjectRoleDescriptor*> objectsList;
3458  objectsList = SysCatalog::instance().getMetadataForObject(
3459  cat.getCurrentDB().dbId,
3460  static_cast<int>(DBObjectType::DashboardDBObjectType),
3461  dashboard_id);  // By default, the object type can only be dashboards
3462  user_meta.userId = -1;
3463  user_meta.userName = "";
3464  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3465  for (auto object : objectsList) {
3466  if (user_meta.userName == object->roleName) {
3467  // Mask owner
3468  continue;
3469  }
3470  TDashboardGrantees grantee;
3471  TDashboardPermissions perm;
3472  grantee.name = object->roleName;
3473  grantee.is_user = object->roleType;
3474  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
3475  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
3476  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
3477  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
3478  grantee.permissions = perm;
3479  dashboard_grantees.push_back(grantee);
3480  }
3481 }
3482 
3483 void MapDHandler::create_link(std::string& _return,
3484  const TSessionId& session,
3485  const std::string& view_state,
3486  const std::string& view_metadata) {
3487  auto stdlog = STDLOG(get_session_ptr(session));
3488  auto session_ptr = stdlog.getConstSessionInfo();
3489  // check_read_only("create_link");
3490  auto& cat = session_ptr->getCatalog();
3491 
3492  LinkDescriptor ld;
3493  ld.userId = session_ptr->get_currentUser().userId;
3494  ld.viewState = view_state;
3495  ld.viewMetadata = view_metadata;
3496 
3497  try {
3498  _return = cat.createLink(ld, 6);
3499  } catch (const std::exception& e) {
3500  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3501  }
3502 }
3503 
3504 TColumnType MapDHandler::create_geo_column(const TDatumType::type type,
3505  const std::string& name,
3506  const bool is_array) {
3507  TColumnType ct;
3508  ct.col_name = name;
3509  ct.col_type.type = type;
3510  ct.col_type.is_array = is_array;
3511  return ct;
3512 }
3513 
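// check_geospatial_files: for shapefile imports, verify via GDAL that the companion
// .shp/.shx/.dbf files are all present next to the primary file, accepting either
// upper- or lower-case extensions.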
3514 void MapDHandler::check_geospatial_files(const boost::filesystem::path file_path,
3515  const Importer_NS::CopyParams& copy_params) {
3516  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
3517  if (std::find(shp_ext.begin(),
3518  shp_ext.end(),
3519  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
3520  shp_ext.end()) {
3521  for (auto ext : shp_ext) {
3522  auto aux_file = file_path;
3523  if (!Importer_NS::Importer::gdalFileExists(
3524  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
3525  copy_params) &&
3526  !Importer_NS::Importer::gdalFileExists(aux_file.replace_extension(ext).string(),
3527  copy_params)) {
3528  throw std::runtime_error("required file for shapefile does not exist: " +
3529  aux_file.filename().string());
3530  }
3531  }
3532  }
3533 }
3534 
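// create_table: builds a CREATE TABLE statement from the Thrift row descriptor and
// executes it through sql_execute. A descriptor roughly translates to, e.g.
// (illustrative): CREATE TABLE t (x BIGINT, s TEXT ENCODING DICT(32));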
3535 void MapDHandler::create_table(const TSessionId& session,
3536  const std::string& table_name,
3537  const TRowDescriptor& rd,
3538  const TFileType::type file_type,
3539  const TCreateParams& create_params) {
3540  auto stdlog = STDLOG("table_name", table_name);
3541  check_read_only("create_table");
3542 
3543  if (ImportHelpers::is_reserved_name(table_name)) {
3544  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
3545  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
3546  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
3547  }
3548 
3549  auto rds = rd;
3550 
3551  // no longer need to manually add the poly column for a TFileType::POLYGON table
3552  // a column of the correct geo type has already been added
3553  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
3554 
3555  std::string stmt{"CREATE TABLE " + table_name};
3556  std::vector<std::string> col_stmts;
3557 
3558  for (auto col : rds) {
3559  if (ImportHelpers::is_reserved_name(col.col_name)) {
3560  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
3561  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
3562  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
3563  }
3564  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
3565  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
3566  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
3567  " for column: " + col.col_name);
3568  }
3569 
3570  if (col.col_type.type == TDatumType::DECIMAL) {
3572  // if no precision or scale was passed in, set to the default of 14,7
3572  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
3573  col.col_type.precision = 14;
3574  col.col_type.scale = 7;
3575  }
3576  }
3577 
3578  std::string col_stmt;
3579  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
3580 
3581  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
3582  // `nullable` argument, leading this to default to false. Uncomment for v2.
3583  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
3584 
3585  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
3586  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
3587  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
3588  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
3589  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
3590  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
3591  }
3592  } else if (col.col_type.type == TDatumType::STR) {
3593  // non DICT encoded strings
3594  col_stmt.append(" ENCODING NONE");
3595  } else if (col.col_type.type == TDatumType::POINT ||
3596  col.col_type.type == TDatumType::LINESTRING ||
3597  col.col_type.type == TDatumType::POLYGON ||
3598  col.col_type.type == TDatumType::MULTIPOLYGON) {
3599  // non-encoded compressible geo
3600  if (col.col_type.scale == 4326) {
3601  col_stmt.append(" ENCODING NONE");
3602  }
3603  }
3604  col_stmts.push_back(col_stmt);
3605  }
3606 
3607  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
3608 
3609  if (create_params.is_replicated) {
3610  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
3611  }
3612 
3613  stmt.append(";");
3614 
3615  TQueryResult ret;
3616  sql_execute(ret, session, stmt, true, "", -1, -1);
3617 }
3618 
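// import_table: appends a delimited file (local or s3://) to an existing table.
// Relative paths are resolved against the per-session import directory, and the
// delimiter defaults to ',' (or '\t' for .tsv) when none is supplied.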
3619 void MapDHandler::import_table(const TSessionId& session,
3620  const std::string& table_name,
3621  const std::string& file_name_in,
3622  const TCopyParams& cp) {
3623  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3624  auto session_ptr = stdlog.getConstSessionInfo();
3625  check_read_only("import_table");
3626  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
3627  auto& cat = session_ptr->getCatalog();
3628 
3629  const TableDescriptor* td = cat.getMetadataForTable(table_name);
3630  if (td == nullptr) {
3631  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
3632  }
3633  check_table_load_privileges(*session_ptr, table_name);
3634 
3635  std::string file_name{file_name_in};
3636  auto file_path = boost::filesystem::path(file_name);
3637  Importer_NS::CopyParams copy_params = thrift_to_copyparams(cp);
3638  if (!boost::istarts_with(file_name, "s3://")) {
3639  if (!boost::filesystem::path(file_name).is_absolute()) {
3640  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3641  boost::filesystem::path(file_name).filename();
3642  file_name = file_path.string();
3643  }
3644  if (!boost::filesystem::exists(file_path)) {
3645  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3646  }
3647  }
3648 
3649  // TODO(andrew): add delimiter detection to Importer
3650  if (copy_params.delimiter == '\0') {
3651  copy_params.delimiter = ',';
3652  if (boost::filesystem::extension(file_path) == ".tsv") {
3653  copy_params.delimiter = '\t';
3654  }
3655  }
3656 
3657  try {
3658  std::unique_ptr<Importer_NS::Importer> importer;
3659  if (leaf_aggregator_.leafCount() > 0) {
3660  importer.reset(new Importer_NS::Importer(
3661  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
3662  file_path.string(),
3663  copy_params));
3664  } else {
3665  importer.reset(new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
3666  }
3667  auto ms = measure<>::execution([&]() { importer->import(); });
3668  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
3669  } catch (const std::exception& e) {
3670  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
3671  }
3672 }
3673 
3674 namespace {
3675 
3676 // helper functions for error checking below
3677 // these would usefully be added as methods of TDatumType
3678 // but that's not possible as it's auto-generated by Thrift
3679 
3680 bool TTypeInfo_IsGeo(const TDatumType::type& t) {
3681  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
3682  t == TDatumType::LINESTRING || t == TDatumType::POINT);
3683 }
3684 
3685 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3686 
3687 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
3688  std::stringstream ss;
3689  ss << t;
3690  return ss.str();
3691 }
3692 
3693 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
3694  std::string result;
3695  switch (p) {
3696  case SQLTypes::kGEOGRAPHY:
3697  result = "GEOGRAPHY";
3698  break;
3699  case SQLTypes::kGEOMETRY:
3700  result = "GEOMETRY";
3701  break;
3702  default:
3703  result = "INVALID";
3704  break;
3705  }
3706  return result;
3707 }
3708 
3709 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
3710  std::stringstream ss;
3711  ss << t;
3712  return ss.str();
3713 }
3714 
3715 #endif
3716 
3717 } // namespace
3718 
3719 #define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
3720  THROW_MAPD_EXCEPTION("Could not append geo file '" + file_path.filename().string() + \
3721  "' to table '" + table_name + "'. Column '" + cd->columnName + \
3722  "' " + attr + " mismatch (got '" + got + "', expected '" + \
3723  expected + "')");
3724 
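// import_geo_table: imports one or more layers from a geo file or archive. Each
// loadable layer becomes its own table (suffixed with the sanitized layer name when
// more than one layer is loaded); per-layer errors are collected and re-thrown at the
// end as a single combined exception.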
3725 void MapDHandler::import_geo_table(const TSessionId& session,
3726  const std::string& table_name,
3727  const std::string& file_name_in,
3728  const TCopyParams& cp,
3729  const TRowDescriptor& row_desc,
3730  const TCreateParams& create_params) {
3731  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3732  auto session_ptr = stdlog.getConstSessionInfo();
3733  check_read_only("import_table");
3734  auto& cat = session_ptr->getCatalog();
3735 
3736  Importer_NS::CopyParams copy_params = thrift_to_copyparams(cp);
3737 
3738  std::string file_name{file_name_in};
3739 
3740  if (path_is_relative(file_name)) {
3741  // assume relative paths are relative to data_path / mapd_import / <session>
3742  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3743  boost::filesystem::path(file_name).filename();
3744  file_name = file_path.string();
3745  }
3746 
3747  if (is_a_supported_geo_file(file_name, true)) {
3748  // prepare to load geo file directly
3749  add_vsi_network_prefix(file_name);
3750  add_vsi_geo_prefix(file_name);
3751  } else if (is_a_supported_archive_file(file_name)) {
3752  // find the archive file
3753  add_vsi_network_prefix(file_name);
3754  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
3755  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3756  }
3757  // find geo file in archive
3758  add_vsi_archive_prefix(file_name);
3759  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3760  // prepare to load that geo file
3761  if (geo_file.size()) {
3762  file_name = file_name + std::string("/") + geo_file;
3763  }
3764  } else {
3765  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
3766  file_name_in);
3767  }
3768 
3769  // log what we're about to try to do
3770  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
3771  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
3772 
3773  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
3774  auto file_path = boost::filesystem::path(file_name);
3775  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3776  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
3777  }
3778 
3779  // use GDAL to check any dependent files exist (ditto)
3780  try {
3781  check_geospatial_files(file_path, copy_params);
3782  } catch (const std::exception& e) {
3783  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3784  }
3785 
3786  // get layer info and deconstruct
3787  // in general, we will get a combination of layers of these four types:
3788  // EMPTY: no rows, report and skip
3789  // GEO: create a geo table from this
3790  // NON_GEO: create a regular table from this
3791  // UNSUPPORTED_GEO: report and skip
3792  std::vector<Importer_NS::Importer::GeoFileLayerInfo> layer_info;
3793  try {
3794  layer_info = Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
3795  } catch (const std::exception& e) {
3796  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3797  }
3798 
3799  // categorize the results
3800  using LayerNameToContentsMap =
3801  std::map<std::string, Importer_NS::Importer::GeoFileLayerContents>;
3802  LayerNameToContentsMap load_layers;
3803  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
3804  for (const auto& layer : layer_info) {
3805  switch (layer.contents) {
3806  case Importer_NS::Importer::GeoFileLayerContents::GEO:
3807  LOG(INFO) << "import_geo_table: '" << layer.name
3808  << "' (will import as geo table)";
3809  load_layers[layer.name] = layer.contents;
3810  break;
3811  case Importer_NS::Importer::GeoFileLayerContents::NON_GEO:
3812  LOG(INFO) << "import_geo_table: '" << layer.name
3813  << "' (will import as regular table)";
3814  load_layers[layer.name] = layer.contents;
3815  break;
3816  case Importer_NS::Importer::GeoFileLayerContents::UNSUPPORTED_GEO:
3817  LOG(WARNING) << "import_geo_table: '" << layer.name
3818  << "' (will not import, unsupported geo type)";
3819  break;
3820  case Importer_NS::Importer::GeoFileLayerContents::EMPTY:
3821  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
3822  break;
3823  default:
3824  break;
3825  }
3826  }
3827 
3828  // if nothing is loadable, stop now
3829  if (load_layers.size() == 0) {
3830  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
3831  }
3832 
3833  // if we've been given an explicit layer name, check that it exists and is loadable
3834  // scan the original list, as it may exist but not have been gathered as loadable
3835  if (copy_params.geo_layer_name.size()) {
3836  bool found = false;
3837  for (const auto& layer : layer_info) {
3838  if (copy_params.geo_layer_name == layer.name) {
3839  if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::GEO ||
3840  layer.contents == Importer_NS::Importer::GeoFileLayerContents::NON_GEO) {
3841  // forget all the other layers and just load this one
3842  load_layers.clear();
3843  load_layers[layer.name] = layer.contents;
3844  found = true;
3845  break;
3846  } else if (layer.contents ==
3847  Importer_NS::Importer::GeoFileLayerContents::UNSUPPORTED_GEO) {
3848  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3849  copy_params.geo_layer_name +
3850  "' has unsupported geo type!");
3851  } else if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::EMPTY) {
3852  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3853  copy_params.geo_layer_name + "' is empty!");
3854  }
3855  }
3856  }
3857  if (!found) {
3858  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3859  copy_params.geo_layer_name + "' not found!");
3860  }
3861  }
3862 
3863  // Immerse import of multiple layers is not yet supported
3864  // @TODO fix this!
3865  if (row_desc.size() > 0 && load_layers.size() > 1) {
3867  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
3868  }
3869 
3870  // one definition of layer table name construction
3871  // we append the layer name if we're loading more than one table
3872  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
3873  const std::string& layer_name) {
3874  if (load_layers.size() > 1) {
3875  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
3876  if (sanitized_layer_name != layer_name) {
3877  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
3878  << sanitized_layer_name << "' for table name";
3879  }
3880  return table_name + "_" + sanitized_layer_name;
3881  }
3882  return table_name;
3883  };
3884 
3885  // if we're importing multiple tables, then NONE of them must exist already
3886  if (load_layers.size() > 1) {
3887  for (const auto& layer : load_layers) {
3888  // construct table name
3889  auto this_table_name = construct_layer_table_name(table_name, layer.first);
3890 
3891  // table must not exist
3892  if (cat.getMetadataForTable(this_table_name)) {
3893  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
3894  "' already exists, aborting!");
3895  }
3896  }
3897  }
3898 
3899  // prepare to gather errors that would otherwise be exceptions, as we can only throw one
3900  std::vector<std::string> caught_exception_messages;
3901 
3902  // prepare to time multi-layer import
3903  double total_import_ms = 0.0;
3904 
3905  // now we're safe to start importing
3906  // we loop over the layers we're going to attempt to load
3907  for (const auto& layer : load_layers) {
3908  // unpack
3909  const auto& layer_name = layer.first;
3910  const auto& layer_contents = layer.second;
3911  bool is_geo_layer =
3912  (layer_contents == Importer_NS::Importer::GeoFileLayerContents::GEO);
3913 
3914  // construct table name again
3915  auto this_table_name = construct_layer_table_name(table_name, layer_name);
3916 
3917  // report
3918  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
3919 
3920  // we need a row descriptor
3921  TRowDescriptor rd;
3922  if (row_desc.size() > 0) {
3923  // we have a valid RowDescriptor
3924  // this is the case where Immerse has already detected and created
3925  // all we need to do is import and trust that the data will match
3926  // use the provided row descriptor
3927  // table must already exist (we check this below)
3928  rd = row_desc;
3929  } else {
3930  // we don't have a RowDescriptor
3931  // we have to detect the file ourselves
3932  TDetectResult cds;
3933  TCopyParams cp_copy = cp; // retain S3 auth tokens
3934  cp_copy.geo_layer_name = layer_name;
3935  cp_copy.file_type = TFileType::POLYGON;
3936  try {
3937  detect_column_types(cds, session, file_name_in, cp_copy);
3938  } catch (const std::exception& e) {
3939  // capture the error and abort this layer
3940  caught_exception_messages.emplace_back(
3941  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
3942  continue;
3943  }
3944  rd = cds.row_set.row_desc;
3945 
3946  // then, if the table does NOT already exist, create it
3947  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
3948  if (!td) {
3949  try {
3950  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
3951  } catch (const std::exception& e) {
3952  // capture the error and abort this layer
3953  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
3954  layer_name + "':" + e.what());
3955  continue;
3956  }
3957  }
3958  }
3959 
3960  // by this point, the table should exist, one way or another
3961  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
3962  if (!td) {
3963  // capture the error and abort this layer
3964  std::string exception_message =
3965  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
3966  this_table_name + "'; table does not exist or failed to create.";
3967  caught_exception_messages.emplace_back(exception_message);
3968  continue;
3969  }
3970 
3971  // then, we have to verify that the structure matches
3972  // get column descriptors (non-system, non-deleted, logical columns only)
3973  const auto col_descriptors =
3974  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
3975 
3976  // first, compare the column count
3977  if (col_descriptors.size() != rd.size()) {
3978  // capture the error and abort this layer
3979  std::string exception_message =
3980  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
3981  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
3982  ", expecting " + std::to_string(col_descriptors.size()) + ")";
3983  caught_exception_messages.emplace_back(exception_message);
3984  continue;
3985  }
3986 
3987  try {
3988  // then the names and types
3989  int rd_index = 0;
3990  for (auto cd : col_descriptors) {
3991  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
3992  std::string gname = rd[rd_index].col_name; // got
3993  std::string ename = cd->columnName; // expecting
3994 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3995  TTypeInfo gti = rd[rd_index].col_type; // got
3996 #endif
3997  TTypeInfo eti = cd_col_type.col_type; // expecting
3998  // check for name match
3999  if (gname != ename) {
4000  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
4001  gname == OMNISCI_GEO_PREFIX) {
4002  // rename incoming geo column to match existing legacy default geo column
4003  rd[rd_index].col_name = gname;
4004  LOG(INFO)
4005  << "import_geo_table: Renaming incoming geo column to match existing "
4006  "legacy default geo column";
4007  } else {
4008  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
4009  }
4010  }
4011 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4012  // check for type attributes match
4013  // these attrs must always match regardless of type
4014  if (gti.type != eti.type) {
4016  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
4017  }
4018  if (gti.is_array != eti.is_array) {
4020  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
4021  }
4022  if (gti.nullable != eti.nullable) {
4024  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
4025  }
4026  if (TTypeInfo_IsGeo(eti.type)) {
4027  // for geo, only these other attrs must also match
4028  // encoding and comp_param are allowed to differ
4029  // this allows appending to existing geo table
4030  // without needing to know the existing encoding
4031  if (gti.precision != eti.precision) {
4033  "geo sub-type",
4034  TTypeInfo_GeoSubTypeToString(gti.precision),
4035  TTypeInfo_GeoSubTypeToString(eti.precision));
4036  }
4037  if (gti.scale != eti.scale) {
4039  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
4040  }
4041  if (gti.encoding != eti.encoding) {
4042  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
4043  }
4044  if (gti.comp_param != eti.comp_param) {
4045  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
4046  }
4047  } else {
4048  // non-geo, all other attrs must also match
4049  // @TODO consider relaxing some of these dependent on type
4050  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
4051  if (gti.precision != eti.precision) {
4052  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("precision",
4053  std::to_string(gti.precision),
4054  std::to_string(eti.precision));
4055  }
4056  if (gti.scale != eti.scale) {
4058  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
4059  }
4060  if (gti.encoding != eti.encoding) {
4062  "encoding",
4063  TTypeInfo_EncodingToString(gti.encoding),
4064  TTypeInfo_EncodingToString(eti.encoding));
4065  }
4066  if (gti.comp_param != eti.comp_param) {
4067  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("comp param",
4068  std::to_string(gti.comp_param),
4069  std::to_string(eti.comp_param));
4070  }
4071  }
4072 #endif
4073  rd_index++;
4074  }
4075  } catch (const std::exception& e) {
4076  // capture the error and abort this layer
4077  caught_exception_messages.emplace_back(e.what());
4078  continue;
4079  }
4080 
4081  std::map<std::string, std::string> colname_to_src;
4082  for (auto r : rd) {
4083  colname_to_src[r.col_name] =
4084  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
4085  }
4086 
4087  try {
4088  check_table_load_privileges(*session_ptr, this_table_name);
4089  } catch (const std::exception& e) {
4090  // capture the error and abort this layer
4091  caught_exception_messages.emplace_back(e.what());
4092  continue;
4093  }
4094 
4095  if (is_geo_layer) {
4096  // Final check to ensure that we actually have a geo column
4097  // of the expected name and type before doing the actual import,
4098  // in case the user naively overrode the name or type in Immerse
4099  // Preview (which as of 6/8/18 it still allows you to do).
4100  // This avoids a fatal assert later when it fails to find the
4101  // column. We should make Immerse more robust and disallow this.
4102  bool have_geo_column_with_correct_name = false;
4103  for (const auto& r : rd) {
4104  if (TTypeInfo_IsGeo(r.col_type.type)) {
4105  // TODO(team): allow user to override the geo column name
4106  if (r.col_name == OMNISCI_GEO_PREFIX) {
4107  have_geo_column_with_correct_name = true;
4108  } else if (r.col_name == LEGACY_GEO_PREFIX) {
4109  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
4110  // Normalize column names for geo append with legacy column naming scheme
4111  colname_to_src[r.col_name] = r.col_name;
4112  have_geo_column_with_correct_name = true;
4113  }
4114  }
4115  }
4116  if (!have_geo_column_with_correct_name) {
4117  std::string exception_message = "Table " + this_table_name +
4118  " does not have a geo column with name '" +
4119  OMNISCI_GEO_PREFIX + "'. Import aborted!";
4120  caught_exception_messages.emplace_back(exception_message);
4121  continue;
4122  }
4123  }
4124 
4125  try {
4126  // import this layer only?
4127  copy_params.geo_layer_name = layer_name;
4128 
4129  // create an importer
4130  std::unique_ptr<Importer_NS::Importer> importer;
4131  if (leaf_aggregator_.leafCount() > 0) {
4132  importer.reset(new Importer_NS::Importer(
4133  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4134  file_path.string(),
4135  copy_params));
4136  } else {
4137  importer.reset(
4138  new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
4139  }
4140 
4141  // import
4142  auto ms = measure<>::execution([&]() { importer->importGDAL(colname_to_src); });
4143  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
4144  << "s";
4145  total_import_ms += ms;
4146  } catch (const std::exception& e) {
4147  std::string exception_message =
4148  "Import of Layer '" + this_table_name + "' failed: " + e.what();
4149  caught_exception_messages.emplace_back(exception_message);
4150  continue;
4151  }
4152  }
4153 
4154  // did we catch any exceptions?
4155  if (caught_exception_messages.size()) {
4156  // combine all the strings into one and throw a single Thrift exception
4157  std::string combined_exception_message = "Failed to import geo file:\n";
4158  for (const auto& message : caught_exception_messages) {
4159  combined_exception_message += message + "\n";
4160  }
4161  THROW_MAPD_EXCEPTION(combined_exception_message);
4162  } else {
4163  // report success and total time
4164  LOG(INFO) << "Import Successful!";
4165  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
4166  }
4167 }
4168 
4169 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
4170 
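// import_table_status: returns elapsed time and the completed/estimated/rejected row
// counts for the import identified by import_id.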
4171 void MapDHandler::import_table_status(TImportStatus& _return,
4172  const TSessionId& session,
4173  const std::string& import_id) {
4174  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
4175  auto is = Importer_NS::Importer::get_import_status(import_id);
4176  _return.elapsed = is.elapsed.count();
4177  _return.rows_completed = is.rows_completed;
4178  _return.rows_estimated = is.rows_estimated;
4179  _return.rows_rejected = is.rows_rejected;
4180 }
4181 
4182 void MapDHandler::get_first_geo_file_in_archive(std::string& _return,
4183  const TSessionId& session,
4184  const std::string& archive_path_in,
4185  const TCopyParams& copy_params) {
4186  auto stdlog =
4187  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
4188  std::string archive_path(archive_path_in);
4189 
4190  if (path_is_relative(archive_path)) {
4191  // assume relative paths are relative to data_path / mapd_import / <session>
4192  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4193  boost::filesystem::path(archive_path).filename();
4194  archive_path = file_path.string();
4195  }
4196 
4197  if (is_a_supported_archive_file(archive_path)) {
4198  // find the archive file
4199  add_vsi_network_prefix(archive_path);
4200  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4201  thrift_to_copyparams(copy_params))) {
4202  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4203  }
4204  // find geo file in archive
4205  add_vsi_archive_prefix(archive_path);
4206  std::string geo_file =
4207  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
4208  // what did we get?
4209  if (geo_file.size()) {
4210  // prepend it with the original path
4211  _return = archive_path_in + std::string("/") + geo_file;
4212  } else {
4213  // just return the original path
4214  _return = archive_path_in;
4215  }
4216  } else {
4217  // just return the original path
4218  _return = archive_path_in;
4219  }
4220 }
4221 
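// get_all_files_in_archive: lists every file inside a (possibly remote) archive via
// GDAL VSI and returns each entry prefixed with the original archive path.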
4222 void MapDHandler::get_all_files_in_archive(std::vector<std::string>& _return,
4223  const TSessionId& session,
4224  const std::string& archive_path_in,
4225  const TCopyParams& copy_params) {
4226  auto stdlog =
4227  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
4228 
4229  std::string archive_path(archive_path_in);
4230  if (path_is_relative(archive_path)) {
4231  // assume relative paths are relative to data_path / mapd_import / <session>
4232  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4233  boost::filesystem::path(archive_path).filename();
4234  archive_path = file_path.string();
4235  }
4236 
4237  if (is_a_supported_archive_file(archive_path)) {
4238  // find the archive file
4239  add_vsi_network_prefix(archive_path);
4240  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4241  thrift_to_copyparams(copy_params))) {
4242  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4243  }
4244  // find all files in archive
4245  add_vsi_archive_prefix(archive_path);
4246  _return = Importer_NS::Importer::gdalGetAllFilesInArchive(
4247  archive_path, thrift_to_copyparams(copy_params));
4248  // prepend them all with original path
4249  for (auto& s : _return) {
4250  s = archive_path_in + '/' + s;
4251  }
4252  }
4253 }
4254 
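// get_layers_in_geo_file: resolves the geo file or archive the same way import_geo_table
// does, then returns each layer's name together with its contents classification
// (EMPTY, GEO, NON_GEO, or UNSUPPORTED_GEO).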
4255 void MapDHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
4256  const TSessionId& session,
4257  const std::string& file_name_in,
4258  const TCopyParams& cp) {
4259  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
4260  std::string file_name(file_name_in);
4261 
4262  Importer_NS::CopyParams copy_params = thrift_to_copyparams(cp);
4263 
4264  // handle relative paths
4265  if (path_is_relative(file_name)) {
4266  // assume relative paths are relative to data_path / mapd_import / <session>
4267  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4268  boost::filesystem::path(file_name).filename();
4269  file_name = file_path.string();
4270  }
4271 
4272  // validate file_name
4273  if (is_a_supported_geo_file(file_name, true)) {
4274  // prepare to load geo file directly
4275  add_vsi_network_prefix(file_name);
4276  add_vsi_geo_prefix(file_name);
4277  } else if (is_a_supported_archive_file(file_name)) {
4278  // find the archive file
4279  add_vsi_network_prefix(file_name);
4280  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
4281  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4282  }
4283  // find geo file in archive
4284  add_vsi_archive_prefix(file_name);
4285  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4286  // prepare to load that geo file
4287  if (geo_file.size()) {
4288  file_name = file_name + std::string("/") + geo_file;
4289  }
4290  } else {
4291  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4292  file_name_in);
4293  }
4294 
4295  // check the file actually exists
4296  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4297  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
4298  }
4299 
4300  // find all layers
4301  auto internal_layer_info =
4302  Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4303 
4304  // convert to Thrift type
4305  for (const auto& internal_layer : internal_layer_info) {
4306  TGeoFileLayerInfo layer;
4307  layer.name = internal_layer.name;
4308  switch (internal_layer.contents) {
4309  case Importer_NS::Importer::GeoFileLayerContents::EMPTY:
4310  layer.contents = TGeoFileLayerContents::EMPTY;
4311  break;
4312  case Importer_NS::Importer::GeoFileLayerContents::GEO:
4313  layer.contents = TGeoFileLayerContents::GEO;
4314  break;
4315  case Importer_NS::Importer::GeoFileLayerContents::NON_GEO:
4316  layer.contents = TGeoFileLayerContents::NON_GEO;
4317  break;
4318  case Importer_NS::Importer::GeoFileLayerContents::UNSUPPORTED_GEO:
4319  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
4320  break;
4321  default:
4322  CHECK(false);
4323  }
4324  _return.emplace_back(layer); // no suitable constructor to just pass parameters
4325  }
4326 }
4327 
4328 void MapDHandler::start_heap_profile(const TSessionId& session) {
4329  auto stdlog = STDLOG(get_session_ptr(session));
4330 #ifdef HAVE_PROFILER
4331  if (IsHeapProfilerRunning()) {
4332  THROW_MAPD_EXCEPTION("Profiler already started");
4333  }
4334  HeapProfilerStart("omnisci");
4335 #else
4336  THROW_MAPD_EXCEPTION("Profiler not enabled");
4337 #endif // HAVE_PROFILER
4338 }
4339 
4340 void MapDHandler::stop_heap_profile(const TSessionId& session) {
4341  auto stdlog = STDLOG(get_session_ptr(session));
4342 #ifdef HAVE_PROFILER
4343  if (!IsHeapProfilerRunning()) {
4344  THROW_MAPD_EXCEPTION("Profiler not running");
4345  }
4346  HeapProfilerStop();
4347 #else
4348  THROW_MAPD_EXCEPTION("Profiler not enabled");
4349 #endif // HAVE_PROFILER
4350 }
4351 
4352 void MapDHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
4353  auto stdlog = STDLOG(get_session_ptr(session));
4354 #ifdef HAVE_PROFILER
4355  if (!IsHeapProfilerRunning()) {
4356  THROW_MAPD_EXCEPTION("Profiler not running");
4357  }
4358  auto profile_buff = GetHeapProfile();
4359  profile = profile_buff;
4360  free(profile_buff);
4361 #else
4362  THROW_MAPD_EXCEPTION("Profiler not enabled");
4363 #endif // HAVE_PROFILER
4364 }
4365 
4366 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
4367 void MapDHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
4368  if (session_it->second.use_count() > 2 ||
4369  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
4370  // SessionInfo is being used in more than one active operation. Original copy + one
4371  // stored in StdLog. Skip the checks.
4372  return;
4373  }
4374  time_t last_used_time = session_it->second->get_last_used_time();
4375  time_t start_time = session_it->second->get_start_time();
4376  if ((time(0) - last_used_time) > idle_session_duration_) {
4377  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
4378  } else if ((time(0) - start_time) > max_session_duration_) {
4379  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
4380  }
4381 }
4382 
4383 std::shared_ptr<const Catalog_Namespace::SessionInfo> MapDHandler::get_const_session_ptr(
4384  const TSessionId& session) {
4385  return get_session_ptr(session);
4386 }
4387 
4388 Catalog_Namespace::SessionInfo MapDHandler::get_session_copy(const TSessionId& session) {
4389  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4390  return *get_session_it_unsafe(session, read_lock)->second;
4391 }
4392 
4393 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_copy_ptr(
4394  const TSessionId& session) {
4395  // Note(Wamsi): We have `get_const_session_ptr`, which returns the const SessionInfo
4396  // stored in the map. You can use `get_const_session_ptr` instead of a copy of the
4397  // SessionInfo, but beware that the underlying object can still change in the map. If
4398  // you do not care about such changes, use `get_const_session_ptr`; if you do, use this
4399  // function to get a copy. We should eventually aim to merge `get_const_session_ptr`
4400  // and `get_session_copy_ptr`.
4401  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4402  auto& session_info_ref = *get_session_it_unsafe(session, read_lock)->second;
4403  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
4404 }
4405 
4406 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_ptr(
4407  const TSessionId& session_id) {
4408  // Note(Wamsi): This method will give you a shared_ptr to master SessionInfo itself.
4409  // Should be used only when you need to make updates to original SessionInfo object.
4410  // Currently used by `update_session_last_used_duration`
4411 
4412  // 1) `session_id` will be empty during the initial connect. 2) The session map
4413  // iterator will be invalid during disconnect, since the SessionInfo will have been
4414  // erased from the map by the time it reaches here. In both cases we return `nullptr`
4415  // and can skip the SessionInfo updates.
4416  if (session_id.empty()) {
4417  return {};
4418  }
4419  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4420  return get_session_it_unsafe(session_id, read_lock)->second;
4421 }
4422 
4423 void MapDHandler::check_table_load_privileges(
4424  const Catalog_Namespace::SessionInfo& session_info,
4425  const std::string& table_name) {
4426  auto user_metadata = session_info.get_currentUser();
4427  auto& cat = session_info.getCatalog();
4428  DBObject dbObject(table_name, TableDBObjectType);
4429  dbObject.loadKey(cat);
4430  dbObject.setPrivileges(AccessPrivileges::INSERT_INTO_TABLE);
4431  std::vector<DBObject> privObjects;
4432  privObjects.push_back(dbObject);
4433  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
4434  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
4435  user_metadata.userName + " has no insert privileges for table " +
4436  table_name + ".");
4437  }
4438 }
4439 
4440 void MapDHandler::check_table_load_privileges(const TSessionId& session,
4441  const std::string& table_name) {
4442  const auto session_info = get_session_copy(session);
4443  check_table_load_privileges(session_info, table_name);
4444 }
4445 
4446 void MapDHandler::set_execution_mode_nolock(Catalog_Namespace::SessionInfo* session_ptr,
4447  const TExecuteMode::type mode) {
4448  const std::string& user_name = session_ptr->get_currentUser().userName;
4449  switch (mode) {
4450  case TExecuteMode::GPU:
4451  if (cpu_mode_only_) {
4452  TMapDException e;
4453  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
4454  throw e;
4455  }
4456  session_ptr->set_executor_device_type(ExecutorDeviceType::GPU);
4457  LOG(INFO) << "User " << user_name << " sets GPU mode.";
4458  break;
4459  case TExecuteMode::CPU:
4460  session_ptr->set_executor_device_type(ExecutorDeviceType::CPU);
4461  LOG(INFO) << "User " << user_name << " sets CPU mode.";
4462  break;
4463  }
4464 }
4465 
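// execute_rel_alg: executes a Calcite-generated relational algebra plan via
// RelAlgExecutor. When filter push-down candidates are found (and requested) they are
// returned instead of rows; otherwise the ResultSet is converted into the Thrift
// row/column format, or into explain output for EXPLAIN queries.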
4466 std::vector<PushedDownFilterInfo> MapDHandler::execute_rel_alg(
4467  TQueryResult& _return,
4468  QueryStateProxy query_state_proxy,
4469  const std::string& query_ra,
4470  const bool column_format,
4471  const ExecutorDeviceType executor_device_type,
4472  const int32_t first_n,
4473  const int32_t at_most_n,
4474  const bool just_explain,
4475  const bool just_validate,
4476  const bool find_push_down_candidates,
4477  const bool just_calcite_explain,
4478  const bool explain_optimized_ir) const {
4479  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4480  const auto& cat = query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog();
4481  CompilationOptions co = {executor_device_type,
4482  true,
4485  explain_optimized_ir ? ExecutorExplainType::Optimized
4490  just_explain,
4491  allow_loop_joins_ || just_validate,
4493  jit_debug_,
4494  just_validate,
4497  find_push_down_candidates,
4498  just_calcite_explain,
4500  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4501  jit_debug_ ? "/tmp" : "",
4502  jit_debug_ ? "mapdquery" : "",
4504  nullptr);
4505  RelAlgExecutor ra_executor(executor.get(), cat);
4506  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4509  nullptr,
4510  nullptr),
4511  {}};
4512  _return.execution_time_ms += measure<>::execution(
4513  [&]() { result = ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr); });
4514  // reduce execution time by the time spent during queue waiting
4515  _return.execution_time_ms -= result.getRows()->getQueueTime();
4516  const auto& filter_push_down_info = result.getPushedDownFilterInfo();
4517  if (!filter_push_down_info.empty()) {
4518  return filter_push_down_info;
4519  }
4520  if (just_explain) {
4521  convert_explain(_return, *result.getRows(), column_format);
4522  } else if (!just_calcite_explain) {
4523  convert_rows(_return,
4524  timer.createQueryStateProxy(),
4525  result.getTargetsMeta(),
4526  *result.getRows(),
4527  column_format,
4528  first_n,
4529  at_most_n);
4530  }
4531  return {};
4532 }
4533 
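// execute_rel_alg_df: like execute_rel_alg, but serializes the ResultSet as an Arrow
// record batch and returns shared-memory / IPC handles in the TDataFrame; GPU results
// additionally record the device pointer so it can be released later.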
4534 void MapDHandler::execute_rel_alg_df(TDataFrame& _return,
4535  const std::string& query_ra,
4536  const Catalog_Namespace::SessionInfo& session_info,
4537  const ExecutorDeviceType device_type,
4538  const size_t device_id,
4539  const int32_t first_n) const {
4540  const auto& cat = session_info.getCatalog();
4541  CHECK(device_type == ExecutorDeviceType::CPU ||
4543  CompilationOptions co = {session_info.get_executor_device_type(),
4544  true,
4549  ExecutionOptions eo = {false,
4551  false,
4554  jit_debug_,
4555  false,
4558  false,
4559  false,
4561  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4562  jit_debug_ ? "/tmp" : "",
4563  jit_debug_ ? "mapdquery" : "",
4565  nullptr);
4566  RelAlgExecutor ra_executor(executor.get(), cat);
4567  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4570  nullptr,
4571  nullptr),
4572  {}};
4573  _return.execution_time_ms += measure<>::execution(
4574  [&]() { result = ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr); });
4575  _return.execution_time_ms -= result.getRows()->getQueueTime();
4576  const auto rs = result.getRows();
4577  const auto converter =
4578  std::make_unique<ArrowResultSetConverter>(rs,
4579  data_mgr_,
4580  device_type,
4581  device_id,
4582  getTargetNames(result.getTargetsMeta()),
4583  first_n);
4584  ArrowResult arrow_result;
4585 
4586  _return.arrow_conversion_time_ms +=
4587  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
4588  _return.sm_handle =
4589  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
4590  _return.sm_size = arrow_result.sm_size;
4591  _return.df_handle =
4592  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
4593  if (device_type == ExecutorDeviceType::GPU) {
4594  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
4595  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
4596  ipc_handle_to_dev_ptr_.insert(
4597  std::make_pair(_return.df_handle, arrow_result.df_dev_ptr));
4598  }
4599  _return.df_size = arrow_result.df_size;
4600 }
4601 
4602 void MapDHandler::execute_root_plan(TQueryResult& _return,
4603  QueryStateProxy query_state_proxy,
4604  const Planner::RootPlan* root_plan,
4605  const bool column_format,
4606  const Catalog_Namespace::SessionInfo& session_info,
4607  const ExecutorDeviceType executor_device_type,
4608  const int32_t first_n) const {
4609  auto executor = Executor::getExecutor(
4610  root_plan->getCatalog().getCurrentDB().dbId,
4611  jit_debug_ ? "/tmp" : "",
4612  jit_debug_ ? "mapdquery" : "",
4614  render_handler_ ? render_handler_->get_render_manager() : nullptr);
4615  std::shared_ptr<ResultSet> results;
4616  _return.execution_time_ms += measure<>::execution([&]() {
4617  results = executor->execute(root_plan,
4618  session_info,
4619  true,
4620  executor_device_type,
4624  });
4625  // reduce execution time by the time spent during queue waiting
4626  _return.execution_time_ms -= results->getQueueTime();
4627  if (root_plan->get_plan_dest() == Planner::RootPlan::Dest::kEXPLAIN) {
4628  convert_explain(_return, *results, column_format);
4629  return;
4630  }
4631  const auto plan = root_plan->get_plan();
4632  CHECK(plan);
4633  const auto& targets = plan->get_targetlist();
4634  convert_rows(_return,
4635  query_state_proxy,
4636  getTargetMetaInfo(targets),
4637  *results,
4638  column_format,
4639  -1,
4640  -1);
4641 }
4642 
4643 std::vector<TargetMetaInfo> MapDHandler::getTargetMetaInfo(
4644  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4645  std::vector<TargetMetaInfo> result;
4646  for (const auto target : targets) {
4647  CHECK(target);
4648  CHECK(target->get_expr());
4649  result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
4650  }
4651  return result;
4652 }
4653 
4654 std::vector<std::string> MapDHandler::getTargetNames(
4655  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4656  std::vector<std::string> names;
4657  for (const auto target : targets) {
4658  CHECK(target);
4659  CHECK(target->get_expr());
4660  names.push_back(target->get_resname());
4661  }
4662  return names;
4663 }
4664 
4665 std::vector<std::string> MapDHandler::getTargetNames(
4666  const std::vector<TargetMetaInfo>& targets) const {
4667  std::vector<std::string> names;
4668  for (const auto target : targets) {
4669  names.push_back(target.get_resname());
4670  }
4671  return names;
4672 }
4673 
4674 TColumnType MapDHandler::convert_target_metainfo(const TargetMetaInfo& target,
4675  const size_t idx) const {
4676  TColumnType proj_info;
4677  proj_info.col_name = target.get_resname();
4678  if (proj_info.col_name.empty()) {
4679  proj_info.col_name = "result_" + std::to_string(idx + 1);
4680  }
4681  const auto& target_ti = target.get_type_info();
4682  proj_info.col_type.type = type_to_thrift(target_ti);
4683  proj_info.col_type.encoding = encoding_to_thrift(target_ti);
4684  proj_info.col_type.nullable = !target_ti.get_notnull();
4685  proj_info.col_type.is_array = target_ti.get_type() == kARRAY;
4686  if (IS_GEO(target_ti.get_type())) {
4688  proj_info, target_ti.get_subtype(), target_ti.get_output_srid());
4689  } else {
4690  proj_info.col_type.precision = target_ti.get_precision();
4691  proj_info.col_type.scale = target_ti.get_scale();
4692  }
4693  if (target_ti.get_type() == kDATE) {
4694  proj_info.col_type.size = target_ti.get_size();
4695  }
4696  proj_info.col_type.comp_param =
4697  (target_ti.is_date_in_days() && target_ti.get_comp_param() == 0)
4698  ? 32
4699  : target_ti.get_comp_param();
4700  return proj_info;
4701 }
4702 
4703 TRowDescriptor MapDHandler::convert_target_metainfo(
4704  const std::vector<TargetMetaInfo>& targets) const {
4705  TRowDescriptor row_desc;
4706  size_t i = 0;
4707  for (const auto target : targets) {
4708  row_desc.push_back(convert_target_metainfo(target, i));
4709  ++i;
4710  }
4711  return row_desc;
4712 }
4713 
4714 template <class R>
4715 void MapDHandler::convert_rows(TQueryResult& _return,
4716  QueryStateProxy query_state_proxy,
4717  const std::vector<TargetMetaInfo>& targets,
4718  const R& results,
4719  const bool column_format,
4720  const int32_t first_n,
4721  const int32_t at_most_n) const {
4722  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4723  _return.row_set.row_desc = convert_target_metainfo(targets);
4724  int32_t fetched{0};
4725  if (column_format) {
4726  _return.row_set.is_columnar = true;
4727  std::vector<TColumn> tcolumns(results.colCount());
4728  while (first_n == -1 || fetched < first_n) {
4729  const auto crt_row = results.getNextRow(true, true);
4730  if (crt_row.empty()) {
4731  break;
4732  }
4733  ++fetched;
4734  if (at_most_n >= 0 && fetched > at_most_n) {
4735  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4736  std::to_string(at_most_n));
4737  }
4738  for (size_t i = 0; i < results.colCount(); ++i) {
4739  const auto agg_result = crt_row[i];
4740  value_to_thrift_column(agg_result, targets[i].get_type_info(), tcolumns[i]);
4741  }
4742  }
4743  for (size_t i = 0; i < results.colCount(); ++i) {
4744  _return.row_set.columns.push_back(tcolumns[i]);
4745  }
4746  } else {
4747  _return.row_set.is_columnar = false;
4748  while (first_n == -1 || fetched < first_n) {
4749  const auto crt_row = results.getNextRow(true, true);
4750  if (crt_row.empty()) {
4751  break;
4752  }
4753  ++fetched;
4754  if (at_most_n >= 0 && fetched > at_most_n) {
4755  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4756  std::to_string(at_most_n));
4757  }
4758  TRow trow;
4759  trow.cols.reserve(results.colCount());
4760  for (size_t i = 0; i < results.colCount(); ++i) {
4761  const auto agg_result = crt_row[i];
4762  trow.cols.push_back(value_to_thrift(agg_result, targets[i].get_type_info()));
4763  }
4764  _return.row_set.rows.push_back(trow);
4765  }
4766  }
4767 }
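// Illustrative sketch (not part of the original source): how a caller might
// read the two row_set layouts produced by convert_rows above. `result` is a
// hypothetical TQueryResult returned from sql_execute; only fields that this
// file populates are used, and a single text output column is assumed.
//
//   void print_first_column(const TQueryResult& result) {
//     if (result.row_set.is_columnar) {
//       // One TColumn per output column; each holds every row's value.
//       const TColumn& col = result.row_set.columns[0];
//       for (size_t r = 0; r < col.nulls.size(); ++r) {
//         if (!col.nulls[r]) {
//           std::cout << col.data.str_col[r] << '\n';
//         }
//       }
//     } else {
//       // One TRow per result row; cols[i] is the value of output column i.
//       for (const TRow& row : result.row_set.rows) {
//         std::cout << row.cols[0].val.str_val << '\n';
//       }
//     }
//   }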
4768 
4769 TRowDescriptor MapDHandler::fixup_row_descriptor(const TRowDescriptor& row_desc,
4770  const Catalog& cat) {
4771  TRowDescriptor fixedup_row_desc;
4772  for (const TColumnType& col_desc : row_desc) {
4773  auto fixedup_col_desc = col_desc;
4774  if (col_desc.col_type.encoding == TEncodingType::DICT &&
4775  col_desc.col_type.comp_param > 0) {
4776  const auto dd = cat.getMetadataForDict(col_desc.col_type.comp_param, false);
4777  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
4778  }
4779  fixedup_row_desc.push_back(fixedup_col_desc);
4780  }
4781  return fixedup_row_desc;
4782 }
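// Illustrative sketch (not part of the original source): what fixup_row_descriptor
// changes for a dictionary-encoded text column. The dictionary id (7) and bit
// width (8) below are made-up example values.
//
//   Before fixup: col_type.encoding == TEncodingType::DICT
//                 col_type.comp_param == 7   // server-side dictionary parameter
//   After fixup:  col_type.comp_param == 8   // dd->dictNBits: bits per encoded
//                                            // value, which is what external
//                                            // clients actually need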
4783 
4784 // create simple result set to return a single column result
4785 void MapDHandler::create_simple_result(TQueryResult& _return,
4786  const ResultSet& results,
4787  const bool column_format,
4788  const std::string label) const {
4789  CHECK_EQ(size_t(1), results.rowCount());
4790  TColumnType proj_info;
4791  proj_info.col_name = label;
4792  proj_info.col_type.type = TDatumType::STR;
4793  proj_info.col_type.nullable = false;
4794  proj_info.col_type.is_array = false;
4795  _return.row_set.row_desc.push_back(proj_info);
4796  const auto crt_row = results.getNextRow(true, true);
4797  const auto tv = crt_row[0];
4798  CHECK(results.getNextRow(true, true).empty());
4799  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
4800  CHECK(scalar_tv);
4801  const auto s_n = boost::get<NullableString>(scalar_tv);
4802  CHECK(s_n);
4803  const auto s = boost::get<std::string>(s_n);
4804  CHECK(s);
4805  if (column_format) {
4806  TColumn tcol;
4807  tcol.data.str_col.push_back(*s);
4808  tcol.nulls.push_back(false);
4809  _return.row_set.is_columnar = true;
4810  _return.row_set.columns.push_back(tcol);
4811  } else {
4812  TDatum explanation;
4813  explanation.val.str_val = *s;
4814  explanation.is_null = false;
4815  TRow trow;
4816  trow.cols.push_back(explanation);
4817  _return.row_set.is_columnar = false;
4818  _return.row_set.rows.push_back(trow);
4819  }
4820 }
4821 
4822 void MapDHandler::convert_explain(TQueryResult& _return,
4823  const ResultSet& results,
4824  const bool column_format) const {
4825  create_simple_result(_return, results, column_format, "Explanation");
4826 }
4827 
4828 void MapDHandler::convert_result(TQueryResult& _return,
4829  const ResultSet& results,
4830  const bool column_format) const {
4831  create_simple_result(_return, results, column_format, "Result");
4832 }
4833 
4834 // this all should be moved out of here to catalog
4835 bool MapDHandler::user_can_access_table(
4836  const Catalog_Namespace::SessionInfo& session_info,
4837  const TableDescriptor* td,
4838  const AccessPrivileges access_priv) {
4839  CHECK(td);
4840  auto& cat = session_info.getCatalog();
4841  std::vector<DBObject> privObjects;
4842  DBObject dbObject(td->tableName, TableDBObjectType);
4843  dbObject.loadKey(cat);
4844  dbObject.setPrivileges(access_priv);
4845  privObjects.push_back(dbObject);
4846  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
4847  privObjects);
4848 };
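// Illustrative sketch (not part of the original source): the same DBObject
// pattern used above, checking a different privilege. AccessPrivileges::
// SELECT_FROM_TABLE is assumed to exist alongside DELETE_FROM_TABLE, and the
// table name is made up.
//
//   bool user_can_select(const Catalog_Namespace::SessionInfo& session_info) {
//     auto& cat = session_info.getCatalog();
//     DBObject dbObject("flights", TableDBObjectType);
//     dbObject.loadKey(cat);
//     dbObject.setPrivileges(AccessPrivileges::SELECT_FROM_TABLE);
//     return SysCatalog::instance().checkPrivileges(
//         session_info.get_currentUser(), {dbObject});
//   }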
4849 
4850 void MapDHandler::check_and_invalidate_sessions(Parser::DDLStmt* ddl) {
4851  const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl);
4852  if (drop_db_stmt) {
4853  invalidate_sessions(*drop_db_stmt->getDatabaseName(), drop_db_stmt);
4854  return;
4855  }
4856  const auto rename_db_stmt = dynamic_cast<Parser::RenameDatabaseStmt*>(ddl);
4857  if (rename_db_stmt) {
4858  invalidate_sessions(*rename_db_stmt->getPreviousDatabaseName(), rename_db_stmt);
4859  return;
4860  }
4861  const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl);
4862  if (drop_user_stmt) {
4863  invalidate_sessions(*drop_user_stmt->getUserName(), drop_user_stmt);
4864  return;
4865  }
4866  const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl);
4867  if (rename_user_stmt) {
4868  invalidate_sessions(*rename_user_stmt->getOldUserName(), rename_user_stmt);
4869  return;
4870  }
4871 }
4872 
4873 void MapDHandler::sql_execute_impl(TQueryResult& _return,
4874  QueryStateProxy query_state_proxy,
4875  const bool column_format,
4876  const std::string& nonce,
4877  const ExecutorDeviceType executor_device_type,
4878  const int32_t first_n,
4879  const int32_t at_most_n) {
4880  if (leaf_handler_) {
4881  leaf_handler_->flush_queue();
4882  }
4883 
4884  _return.nonce = nonce;
4885  _return.execution_time_ms = 0;
4886  auto const& query_str = query_state_proxy.getQueryState().getQueryStr();
4887  auto session_ptr = query_state_proxy.getQueryState().getConstSessionInfo();
4888  // Call to DistributedValidate() below may change cat.
4889  auto& cat = session_ptr->getCatalog();
4890 
4891  SQLParser parser;
4892  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
4893  std::string last_parsed;
4894  int num_parse_errors = 0;
4895  std::unique_ptr<Planner::RootPlan> root_plan;
4896 
4897  /* Use this lock acquisition sequence to simplify locking:
4898  INSERT_VALUES: CheckpointLock [ >> TableWriteLock ]
4899  INSERT_SELECT: CheckpointLock >> TableReadLock [ >> TableWriteLock ]
4900  COPY_TO/SELECT: TableReadLock
4901  COPY_FROM: CheckpointLock [ >> TableWriteLock ]
4902  DROP/TRUNC: CheckpointLock >> TableWriteLock
4903  DELETE/UPDATE: CheckpointLock >> TableWriteLock
4904  */
4905 
4906  std::vector<Lock_Namespace::TableLock> table_locks;
4907 
4908  mapd_unique_lock<mapd_shared_mutex> chkptlLock;
4909  mapd_unique_lock<mapd_shared_mutex> executeWriteLock;
4910  mapd_shared_lock<mapd_shared_mutex> executeReadLock;
4911 
4912  try {
4913  ParserWrapper pw{query_str};
4914  TableMap table_map;
4915  OptionalTableMap tableNames(table_map);
4916  if (pw.isCalcitePathPermissable(read_only_)) {
4917  std::string query_ra;
4918  _return.execution_time_ms += measure<>::execution([&]() {
4919  query_ra =
4920  parse_to_ra(query_state_proxy, query_str, {}, tableNames, mapd_parameters_);
4921  });
4922 
4923  std::string query_ra_calcite_explain;
4924  if (pw.isCalciteExplain() && (!g_enable_filter_push_down || g_cluster)) {
4925  // return the ra as the result
4926  convert_explain(_return, ResultSet(query_ra), true);
4927  return;
4928  } else if (pw.isCalciteExplain()) {
4929  // remove the "explain calcite " prefix from the beginning of query_str:
4930  std::string temp_query_str =
4931  query_str.substr(std::string("explain calcite ").length());
4932  query_ra_calcite_explain = parse_to_ra(
4933  query_state_proxy, temp_query_str, {}, boost::none, mapd_parameters_);
4934  }
4935 
4936  // UPDATE/DELETE needs to get a checkpoint lock as the first lock
4937  for (const auto& table : tableNames.value()) {
4938  if (table.second) {
4939  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
4940  session_ptr->getCatalog(), table.first, LockType::CheckpointLock);
4941  }
4942  }
4943  // COPY_TO/SELECT: read ExecutorOuterLock >> TableReadLock locks
4944  executeReadLock = mapd_shared_lock<mapd_shared_mutex>(
4947  session_ptr->getCatalog(), tableNames.value(), table_locks);
4948 
4949  const auto filter_push_down_requests =
4950  execute_rel_alg(_return,
4951  query_state_proxy,
4952  pw.isCalciteExplain() ? query_ra_calcite_explain : query_ra,
4953  column_format,
4954  executor_device_type,
4955  first_n,
4956  at_most_n,
4957  pw.isIRExplain(),
4958  false,
4960  pw.isCalciteExplain(),
4961  pw.getExplainType() == ParserWrapper::ExplainType::OptimizedIR);
4962  if (pw.isCalciteExplain() && filter_push_down_requests.empty()) {
4963  // we only reach here if filter push down was enabled, but no filter
4964  // push down candidate was found
4965  convert_explain(_return, ResultSet(query_ra), true);
4966  return;
4967  }
4968  if (!filter_push_down_requests.empty()) {
4969  execute_rel_alg_with_filter_push_down(_return,
4970  query_state_proxy,
4971  query_ra,
4972  column_format,
4973  executor_device_type,
4974  first_n,
4975  at_most_n,
4976  pw.isIRExplain(),
4977  pw.isCalciteExplain(),
4978  filter_push_down_requests);
4979  }
4989  if (pw.isCalciteExplain()) {
4990  // If we reach here, the filter push-down candidates have been selected and
4991  // the proper explain output has already been created above.
4992  return;
4993  }
5004  return;
5005  } else if (pw.is_optimize || pw.is_validate) {
5006  // Get the Stmt object
5007  try {
5008  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
5009  } catch (std::exception& e) {
5010  throw std::runtime_error(e.what());
5011  }
5012  if (num_parse_errors > 0) {
5013  throw std::runtime_error("Syntax error at: " + last_parsed);
5014  }
5015  CHECK_EQ(parse_trees.size(), 1u);
5016 
5017  if (pw.is_optimize) {
5018  const auto optimize_stmt =
5019  dynamic_cast<Parser::OptimizeTableStmt*>(parse_trees.front().get());
5020  CHECK(optimize_stmt);
5021 
5022  _return.execution_time_ms += measure<>::execution([&]() {
5023  const auto td = cat.getMetadataForTable(optimize_stmt->getTableName(),
5024  /*populateFragmenter=*/true);
5025 
5026  if (!td || !user_can_access_table(
5027  *session_ptr, td, AccessPrivileges::DELETE_FROM_TABLE)) {
5028  throw std::runtime_error("Table " + optimize_stmt->getTableName() +
5029  " does not exist.");
5030  }
5031 
5032  auto chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5033  cat, td->tableName, LockType::CheckpointLock);
5034  auto table_write_lock = TableLockMgr::getWriteLockForTable(cat, td->tableName);
5035 
5036  auto executor = Executor::getExecutor(
5037  cat.getCurrentDB().dbId, "", "", mapd_parameters_, nullptr);
5038  const TableOptimizer optimizer(td, executor.get(), cat);
5039  if (optimize_stmt->shouldVacuumDeletedRows()) {
5040  optimizer.vacuumDeletedRows();
5041  }
5042  optimizer.recomputeMetadata();
5043  });
5044 
5045  return;
5046  }
5047  if (pw.is_validate) {
5048  // check user is superuser
5049  if (!session_ptr->get_currentUser().isSuper) {
5050  throw std::runtime_error("Superuser is required to run VALIDATE");
5051  }
5052  const auto validate_stmt =
5053  dynamic_cast<Parser::ValidateStmt*>(parse_trees.front().get());
5054  CHECK(validate_stmt);
5055 
5056  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5058 
5059  std::string output{"Result for validate"};
5060  if (g_cluster) {
5061  _return.execution_time_ms += measure<>::execution([&]() {
5062  const DistributedValidate validator(validate_stmt->getType(),
5063  validate_stmt->isRepairTypeRemove(),
5064  cat, // tables may be dropped here
5065  leaf_aggregator_,
5066  *session_ptr,
5067  *this);
5068  output = validator.validate();
5069  });
5070  } else {
5071  output = "Not running on a cluster, nothing to validate";
5072  }
5073  convert_result(_return, ResultSet(output), true);
5074  return;
5075  }
5076  }
5077  LOG(INFO) << "passing query to legacy processor";
5078  } catch (std::exception& e) {
5079  if (strstr(e.what(), "java.lang.NullPointerException")) {
5080  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
5081  "query failed from broken view or other schema related issue");
5082  } else {
5083  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5084  }
5085  }
5086  try {
5087  // Check for COPY TO statements and apply the shim rewrite; the parser expects #~# markers.
5088  const auto result = apply_copy_to_shim(query_str);
5089  num_parse_errors = parser.parse(result, parse_trees, last_parsed);
5090  } catch (std::exception& e) {
5091  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5092  }
5093  if (num_parse_errors > 0) {
5094  THROW_MAPD_EXCEPTION("Syntax error at: " + last_parsed);
5095  }
5096 
5097  auto get_legacy_plan =
5098  [&cat](const Parser::DMLStmt* dml,
5099  const bool is_explain) -> std::unique_ptr<Planner::RootPlan> {
5100  Analyzer::Query query;
5101  dml->analyze(cat, query);
5102  Planner::Optimizer optimizer(query, cat);
5103  auto root_plan_ptr = std::unique_ptr<Planner::RootPlan>(optimizer.optimize());
5104  CHECK(root_plan_ptr);
5105  if (is_explain) {
5106  root_plan_ptr->set_plan_dest(Planner::RootPlan::Dest::kEXPLAIN);
5107  }
5108  return root_plan_ptr;
5109  };
5110 
5111  auto handle_ddl =
5112  [&query_state_proxy, &session_ptr, &_return, &chkptlLock, &table_locks, this](
5113  Parser::DDLStmt* ddl) -> bool {
5114  if (!ddl) {
5115  return false;
5116  }
5117  const auto show_create_stmt = dynamic_cast<Parser::ShowCreateTableStmt*>(ddl);
5118  if (show_create_stmt) {
5119  // ParserNode ShowCreateTableStmt is currently unimplemented
5120  throw std::runtime_error(
5121  "SHOW CREATE TABLE is currently unsupported. Use `\\d` from omnisql for table "
5122  "DDL.");
5123  }
5124 
5125  const auto import_stmt = dynamic_cast<Parser::CopyTableStmt*>(ddl);
5126  if (import_stmt) {
5127  // Take the lock only if the table already exists, so that COPY FROM can
5128  // still create a new table. Doing this existence check without a lock is
5129  // acceptable: if the table does not exist, getTableLock would throw an
5130  // exception anyway.
5131  const TableDescriptor* td =
5132  session_ptr->getCatalog().getMetadataForTable(import_stmt->get_table());
5133  if (td) {
5134  // COPY_FROM: CheckpointLock [ >> TableWriteLocks ]
5135  chkptlLock =
5136  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_ptr->getCatalog(),
5137  import_stmt->get_table(),
5138  LockType::CheckpointLock);
5139  // [ TableWriteLocks ] lock is deferred in
5140  // InsertOrderFragmenter::deleteFragments
5141  }
5142 
5143  if (g_cluster && !leaf_aggregator_.leafCount()) {
5144  // Don't allow copy from imports directly on a leaf node
5145  throw std::runtime_error(
5146  "Cannot import on an individual leaf. Please import from the Aggregator.");
5147  } else if (leaf_aggregator_.leafCount() > 0) {
5148  _return.execution_time_ms += measure<>::execution(
5149  [&]() { execute_distributed_copy_statement(import_stmt, *session_ptr); });
5150  } else {
5151  _return.execution_time_ms +=
5152  measure<>::execution([&]() { ddl->execute(*session_ptr); });
5153  }
5154 
5155  // Read response message
5156  convert_result(_return, ResultSet(*import_stmt->return_message.get()), true);
5157 
5158  // get geo_copy_from info
5159  _was_geo_copy_from = import_stmt->was_geo_copy_from();
5160  import_stmt->get_geo_copy_from_payload(_geo_copy_from_table,
5164  return true;
5165  }
5166 
5167  // Check for DDL statements requiring locking and get locks
5168  auto export_stmt = dynamic_cast<Parser::ExportQueryStmt*>(ddl);
5169  if (export_stmt) {
5170  TableMap table_map;
5171  OptionalTableMap tableNames(table_map);
5172  const auto query_string = export_stmt->get_select_stmt();
5173  const auto query_ra =
5174  parse_to_ra(query_state_proxy, query_string, {}, tableNames, mapd_parameters_);
5176  session_ptr->getCatalog(), tableNames.value(), table_locks);
5177  }
5178  auto truncate_stmt = dynamic_cast<Parser::TruncateTableStmt*>(ddl);
5179  if (truncate_stmt) {
5180  chkptlLock =
5181  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_ptr->getCatalog(),
5182  *truncate_stmt->get_table(),
5183  LockType::CheckpointLock);
5184  table_locks.emplace_back();
5185  table_locks.back().write_lock = TableLockMgr::getWriteLockForTable(
5186  session_ptr->getCatalog(), *truncate_stmt->get_table());
5187  }
5188  auto add_col_stmt = dynamic_cast<Parser::AddColumnStmt*>(ddl);
5189  if (add_col_stmt) {
5190  add_col_stmt->check_executable(*session_ptr);
5191  chkptlLock =
5192  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_ptr->getCatalog(),
5193  *add_col_stmt->get_table(),
5194  LockType::CheckpointLock);
5195  table_locks.emplace_back();
5196  table_locks.back().write_lock = TableLockMgr::getWriteLockForTable(
5197  session_ptr->getCatalog(), *add_col_stmt->get_table());
5198  }
5199 
5200  _return.execution_time_ms += measure<>::execution([&]() {
5201  ddl->execute(*session_ptr);
5202  check_and_invalidate_sessions(ddl);
5203  });
5204  return true;
5205  };
5206 
5207  for (const auto& stmt : parse_trees) {
5208  try {
5209  auto select_stmt = dynamic_cast<Parser::SelectStmt*>(stmt.get());
5210  if (!select_stmt) {
5211  check_read_only("Non-SELECT statements");
5212  }
5213  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt.get());
5214  if (handle_ddl(ddl)) {
5215  if (render_handler_) {
5216  render_handler_->handle_ddl(ddl);
5217  }
5218  continue;
5219  }
5220  if (!root_plan) {
5221  // assume DML / non-explain plan (an insert or copy statement)
5222  const auto dml = dynamic_cast<Parser::DMLStmt*>(stmt.get());
5223  CHECK(dml);
5224  root_plan = get_legacy_plan(dml, false);
5225  CHECK(root_plan);
5226  }
5227  if (auto stmtp = dynamic_cast<Parser::InsertQueryStmt*>(stmt.get())) {
5228  // INSERT_SELECT: CheckpointLock >> TableReadLocks [ >> TableWriteLocks ]
5229  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5230  session_ptr->getCatalog(), *stmtp->get_table(), LockType::CheckpointLock);
5231  // >> TableReadLock locks
5232  TableMap table_map;
5233  OptionalTableMap tableNames(table_map);
5234  const auto query_string = stmtp->get_query()->to_string();
5235  const auto query_ra =
5236  parse_to_ra(query_state_proxy, query_str, {}, tableNames, mapd_parameters_);
5238  session_ptr->getCatalog(), tableNames.value(), table_locks);
5239 
5240  // [ TableWriteLocks ] lock is deferred in
5241  // InsertOrderFragmenter::deleteFragments
5242  // TODO: this statement is not yet supported. Once it is, it must not go through
5243  // InsertOrderFragmenter::insertData, or a deadlock will occur unless the
5244  // following lock is moved back here.
5245  } else if (auto stmtp = dynamic_cast<Parser::InsertValuesStmt*>(stmt.get())) {
5246  // INSERT_VALUES: CheckpointLock >> write ExecutorOuterLock [ >>
5247  // TableWriteLocks ]
5248  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5249  session_ptr->getCatalog(), *stmtp->get_table(), LockType::CheckpointLock);
5250  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5252  // [ TableWriteLocks ] lock is deferred in
5253  // InsertOrderFragmenter::deleteFragments
5254  }
5255 
5256  execute_root_plan(_return,
5257  query_state_proxy,
5258  root_plan.get(),
5259  column_format,
5260  *session_ptr,
5261  executor_device_type,
5262  first_n);
5263  } catch (std::exception& e) {
5264  const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
5265  THROW_MAPD_EXCEPTION(thrift_exception ? std::string(thrift_exception->what())
5266  : std::string("Exception: ") + e.what());
5267  }
5268  }
5269 }
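// Illustrative sketch (not part of the original source): how the parameters
// handled by sql_execute_impl typically arrive from a Thrift client. The
// client object and the exact argument order of sql_execute are assumptions
// based on the generated MapD Thrift interface.
//
//   TQueryResult result;
//   // column_format = true, nonce = "", first_n = -1 (no limit),
//   // at_most_n = -1 (no cap on returned rows)
//   client.sql_execute(result, session, "SELECT COUNT(*) FROM flights;",
//                      /*column_format=*/true, /*nonce=*/"",
//                      /*first_n=*/-1, /*at_most_n=*/-1);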
5270 
5271 void MapDHandler::execute_rel_alg_with_filter_push_down(
5272  TQueryResult& _return,
5273  QueryStateProxy query_state_proxy,
5274  std::string& query_ra,
5275  const bool column_format,
5276  const ExecutorDeviceType executor_device_type,
5277  const int32_t first_n,
5278  const int32_t at_most_n,
5279  const bool just_explain,
5280  const bool just_calcite_explain,
5281  const std::vector<PushedDownFilterInfo> filter_push_down_requests) {
5282  // collecting the selected filters' info to be sent to Calcite:
5283  std::vector<TFilterPushDownInfo> filter_push_down_info;
5284  for (const auto& req : filter_push_down_requests) {
5285  TFilterPushDownInfo filter_push_down_info_for_request;
5286  filter_push_down_info_for_request.input_prev = req.input_prev;
5287  filter_push_down_info_for_request.input_start = req.input_start;
5288  filter_push_down_info_for_request.input_next = req.input_next;
5289  filter_push_down_info.push_back(filter_push_down_info_for_request);
5290  }
5291  // deriving the new relational algebra plan with respect to the pushed down filters
5292  _return.execution_time_ms += measure<>::execution([&]() {
5293  query_ra = parse_to_ra(query_state_proxy,
5294  query_state_proxy.getQueryState().getQueryStr(),
5295  filter_push_down_info,
5296  boost::none,
5297  mapd_parameters_);
5298  });
5299 
5300  if (just_calcite_explain) {
5301  // return the new ra as the result
5302  convert_explain(_return, ResultSet(query_ra), true);
5303  return;
5304  }
5305 
5306  // execute the new relational algebra plan:
5307  execute_rel_alg(_return,
5308  query_state_proxy,
5309  query_ra,
5310  column_format,
5311  executor_device_type,
5312  first_n,
5313  at_most_n,
5314  just_explain,
5315  /*just_validate = */ false,
5316  /*find_push_down_candidates = */ false,
5317  /*just_calcite_explain = */ false,
5318  /*TODO: explain optimized*/ false);
5319 }
5320 
5321 void MapDHandler::execute_distributed_copy_statement(
5322  Parser::CopyTableStmt* copy_stmt,
5323  const Catalog_Namespace::SessionInfo& session_info) {
5324  auto importer_factory = [&session_info, this](
5325  const Catalog& catalog,
5326  const TableDescriptor* td,
5327  const std::string& file_path,
5328  const Importer_NS::CopyParams& copy_params) {
5329  return boost::make_unique<Importer_NS::Importer>(
5330  new DistributedLoader(session_info, td, &leaf_aggregator_),
5331  file_path,
5332  copy_params);
5333  };
5334  copy_stmt->execute(session_info, importer_factory);
5335 }
5336 
5337 std::string MapDHandler::parse_to_ra(
5338  QueryStateProxy query_state_proxy,
5339  const std::string& query_str,
5340  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
5341  OptionalTableMap tableNames,
5342  const MapDParameters mapd_parameters,
5343  RenderInfo* render_info) {
5344  query_state::Timer timer = query_state_proxy.createTimer(__func__);
5345  ParserWrapper pw{query_str};
5346  const std::string actual_query{pw.isSelectExplain() ? pw.actual_query : query_str};
5347  if (pw.isCalcitePathPermissable()) {
5348  TPlanResult result;
5349  auto session_cleanup_handler = [&](const auto& session_id) {
5350  removeInMemoryCalciteSession(session_id);
5351  };
5352  auto process_calcite_request = [&] {
5353  const auto& in_memory_session_id = createInMemoryCalciteSession(
5354  query_state_proxy.getQueryState().getConstSessionInfo()->get_catalog_ptr());
5355  try {
5356  result = calcite_->process(timer.createQueryStateProxy(),
5357  legacy_syntax_ ? pg_shim(actual_query) : actual_query,
5358  filter_push_down_info,
5359  legacy_syntax_,
5360  pw.isCalciteExplain(),
5361  mapd_parameters.enable_calcite_view_optimize,
5362  in_memory_session_id);
5363  session_cleanup_handler(in_memory_session_id);
5364  } catch (std::exception&) {
5365  session_cleanup_handler(in_memory_session_id);
5366  throw;
5367  }
5368  };
5369  process_calcite_request();
5370  if (tableNames) {
5371  for (const auto& table : result.resolved_accessed_objects.tables_selected_from) {
5372  (tableNames.value())[table] = false;
5373  }
5374  for (const auto& tables :
5375  std::vector<decltype(result.resolved_accessed_objects.tables_inserted_into)>{
5376  result.resolved_accessed_objects.tables_inserted_into,
5377  result.resolved_accessed_objects.tables_updated_in,
5378  result.resolved_accessed_objects.tables_deleted_from}) {
5379  for (const auto& table : tables) {
5380  (tableNames.value())[table] = true;
5381  }
5382  }
5383  }
5384  if (render_info) {
5385  // grabs all the selected-from tables, even views. This is used by the renderer to
5386  // resolve view hit-testing.
5387  // NOTE: the same table name could exist in both the primary and resolved tables.
5388  auto selected_tables = &result.primary_accessed_objects.tables_selected_from;
5389  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5390  selected_tables = &result.resolved_accessed_objects.tables_selected_from;
5391  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5392  }
5393  return result.plan_result;
5394  }
5395  return "";
5396 }
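// Illustrative sketch (not part of the original source): the tableNames map
// that parse_to_ra fills in for a hypothetical statement
//   INSERT INTO dst SELECT * FROM src;
// Per the loops above, tables that are only read map to false and tables that
// are written (inserted into, updated, or deleted from) map to true, which
// sql_execute_impl later uses to decide which tables need a CheckpointLock.
//
//   TableMap table_map;
//   OptionalTableMap tableNames(table_map);
//   parse_to_ra(query_state_proxy, "INSERT INTO dst SELECT * FROM src;", {},
//               tableNames, mapd_parameters_);
//   // tableNames.value() now holds { {"src", false}, {"dst", true} }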
5397 
5398 void MapDHandler::check_table_consistency(TTableMeta& _return,
5399  const TSessionId& session,
5400  const int32_t table_id) {
5401  auto stdlog = STDLOG(get_session_ptr(session));
5402  if (!leaf_handler_) {
5403  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5404  }
5405  try {
5406  leaf_handler_->check_table_consistency(_return, session, table_id);
5407  } catch (std::exception& e) {
5408  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5409  }
5410 }
5411 
5412 void MapDHandler::start_query(TPendingQuery& _return,
5413  const TSessionId& session,
5414  const std::string& query_ra,
5415  const bool just_explain) {
5416  auto stdlog = STDLOG(get_session_ptr(session));
5417  auto session_ptr = stdlog.getConstSessionInfo();
5418  if (!leaf_handler_) {
5419  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5420  }
5421  LOG(INFO) << "start_query :" << *session_ptr << " :" << just_explain;
5422  auto time_ms = measure<>::execution([&]() {
5423  try {
5424  leaf_handler_->start_query(_return, session, query_ra, just_explain);
5425  } catch (std::exception& e) {
5426  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5427  }
5428  });
5429  LOG(INFO) << "start_query-COMPLETED " << time_ms << "ms "
5430  << "id is " << _return.id;
5431 }
5432 
5433 void MapDHandler::execute_query_step(TStepResult& _return,
5434  const TPendingQuery& pending_query) {
5435  if (!leaf_handler_) {
5436  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5437  }
5438  LOG(INFO) << "execute_query_step : id:" << pending_query.id;
5439  auto time_ms = measure<>::execution([&]() {
5440  try {
5441  leaf_handler_->execute_query_step(_return, pending_query);
5442  } catch (std::exception& e) {
5443  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5444  }
5445  });
5446  LOG(INFO) << "execute_query_step-COMPLETED " << time_ms << "ms";
5447 }
5448 
5449 void MapDHandler::broadcast_serialized_rows(const TSerializedRows& serialized_rows,
5450  const TRowDescriptor& row_desc,
5451  const TQueryId query_id) {
5452  if (!leaf_handler_) {
5453  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5454  }
5455  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS id:" << query_id;
5456  auto time_ms = measure<>::execution([&]() {
5457  try {
5458  leaf_handler_->broadcast_serialized_rows(serialized_rows, row_desc, query_id);
5459  } catch (std::exception& e) {
5460  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5461  }
5462  });
5463  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS COMPLETED " << time_ms << "ms";
5464 }
5465 
5466 void MapDHandler::insert_data(const TSessionId& session,
5467  const TInsertData& thrift_insert_data) {
5468  auto stdlog = STDLOG(get_session_ptr(session));
5469  auto session_ptr = stdlog.getConstSessionInfo();
5470  CHECK_EQ(thrift_insert_data.column_ids.size(), thrift_insert_data.data.size());
5471  auto const& cat = session_ptr->getCatalog();
5472  Fragmenter_Namespace::InsertData insert_data;
5473  insert_data.databaseId = thrift_insert_data.db_id;
5474  insert_data.tableId = thrift_insert_data.table_id;
5475  insert_data.columnIds = thrift_insert_data.column_ids;
5476  std::vector<std::unique_ptr<std::vector<std::string>>> none_encoded_string_columns;
5477  std::vector<std::unique_ptr<std::vector<ArrayDatum>>> array_columns;
5478  for (size_t col_idx = 0; col_idx < insert_data.columnIds.size(); ++col_idx) {
5479  const int column_id = insert_data.columnIds[col_idx];
5480  DataBlockPtr p;
5481  const auto cd = cat.getMetadataForColumn(insert_data.tableId, column_id);
5482  CHECK(cd);
5483  const auto& ti = cd->columnType;
5484  if (ti.is_number() || ti.is_time() || ti.is_boolean()) {
5485  p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
5486  } else if (ti.is_string()) {
5487  if (ti.get_compression() == kENCODING_DICT) {
5488  p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
5489  } else {
5490  CHECK_EQ(kENCODING_NONE, ti.get_compression());
5491  none_encoded_string_columns.emplace_back(new std::vector<std::string>());
5492  auto& none_encoded_strings = none_encoded_string_columns.back();
5493  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5494  thrift_insert_data.data[col_idx].var_len_data.size());
5495  for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
5496  none_encoded_strings->push_back(varlen_str.payload);
5497  }
5498  p.stringsPtr = none_encoded_strings.get();
5499  }
5500  } else if (ti.is_geometry()) {
5501  none_encoded_string_columns.emplace_back(new std::vector<std::string>());
5502  auto& none_encoded_strings = none_encoded_string_columns.back();
5503  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5504  thrift_insert_data.data[col_idx].var_len_data.size());
5505  for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
5506  none_encoded_strings->push_back(varlen_str.payload);
5507  }
5508  p.stringsPtr = none_encoded_strings.get();
5509  } else {
5510  CHECK(ti.is_array());
5511  array_columns.emplace_back(new std::vector<ArrayDatum>());
5512  auto& array_column = array_columns.back();
5513  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5514  thrift_insert_data.data[col_idx].var_len_data.size());
5515  for (const auto& t_arr_datum : thrift_insert_data.data[col_idx].var_len_data) {
5516  if (t_arr_datum.is_null) {
5517  if (ti.get_size() > 0 && !ti.get_elem_type().is_string()) {
5518  array_column->push_back(Importer_NS::ImporterUtils::composeNullArray(ti));
5519  } else {
5520  array_column->emplace_back(0, nullptr, true);
5521  }
5522  } else {
5523  ArrayDatum arr_datum;
5524  arr_datum.length = t_arr_datum.payload.size();
5525  int8_t* ptr = (int8_t*)(t_arr_datum.payload.data());
5526  arr_datum.pointer = ptr;
5527  // In this special case, ArrayDatum does not handle freeing the underlying
5528  // memory
5529  arr_datum.data_ptr = std::shared_ptr<int8_t>(ptr, [](auto p) {});
5530  arr_datum.is_null = false;
5531  array_column->push_back(arr_datum);
5532  }
5533  }
5534  p.arraysPtr = array_column.get();
5535  }
5536  insert_data.data.push_back(p);
5537  }
5538  insert_data.numRows = thrift_insert_data.num_rows;
5539  const auto td = cat.getMetadataForTable(insert_data.tableId);
5540  try {
5541  // This should follow the same lock sequence as COPY FROM.
5542  ChunkKey chunkKey = {insert_data.databaseId, insert_data.tableId};
5543  mapd_unique_lock<mapd_shared_mutex> tableLevelWriteLock(
5546  // [ TableWriteLocks ] lock is deferred in
5547  // InsertOrderFragmenter::deleteFragments
5548  td->fragmenter->insertDataNoCheckpoint(insert_data);
5549  } catch (const std::exception& e) {
5550  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5551  }
5552 }
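// Illustrative sketch (not part of the original source): building a minimal
// TInsertData for a single fixed-length (e.g. 32-bit integer) column,
// mirroring the fields read by MapDHandler::insert_data above. The data-block
// struct name (TDataBlockPtr) and the ids/values are assumptions for the
// example; a real caller would take them from the catalog.
//
//   TInsertData insert_payload;
//   insert_payload.db_id = 1;
//   insert_payload.table_id = 42;
//   insert_payload.column_ids = {7};
//   insert_payload.num_rows = 2;
//   std::vector<int32_t> values = {10, 20};
//   TDataBlockPtr block;
//   block.fixed_len_data.assign(
//       reinterpret_cast<const char*>(values.data()),
//       reinterpret_cast<const char*>(values.data()) +
//           values.size() * sizeof(int32_t));
//   insert_payload.data.push_back(block);
//   // client.insert_data(session, insert_payload);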
5553 
5554 void MapDHandler::start_render_query(TPendingRenderQuery& _return,
5555  const TSessionId& session,
5556  const int64_t widget_id,
5557  const int16_t node_idx,
5558  const std::string& vega_json) {
5559  auto stdlog = STDLOG(get_session_ptr(session));
5560  auto session_ptr = stdlog.getConstSessionInfo();
5561  if (!render_handler_) {
5562  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5563  }
5564  LOG(INFO) << "start_render_query :" << *session_ptr << " :widget_id:" << widget_id
5565  << ":vega_json:" << vega_json;
5566  auto time_ms = measure<>::execution([&]() {
5567  try {
5568  render_handler_->start_render_query(
5569  _return, session, widget_id, node_idx, vega_json);
5570  } catch (std::exception& e) {
5571  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5572  }
5573  });
5574  LOG(INFO) << "start_render_query-COMPLETED " << time_ms << "ms "
5575  << "id is " << _return.id;
5576 }
5577 
5578 void MapDHandler::execute_next_render_step(TRenderStepResult& _return,
5579  const TPendingRenderQuery& pending_render,
5580  const TRenderAggDataMap& merged_data) {
5581  if (!render_handler_) {
5582  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5583  }
5584 
5585  LOG(INFO) << "execute_next_render_step: id:" << pending_render.id;
5586  auto time_ms = measure<>::execution([&]() {
5587  try {
5588  render_handler_->execute_next_render_step(_return, pending_render, merged_data);
5589  } catch (std::exception& e) {
5590  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5591  }
5592  });
5593  LOG(INFO) << "execute_next_render_step-COMPLETED id: " << pending_render.id
5594  << ", time: " << time_ms << "ms ";
5595 }
5596 
5597 void MapDHandler::checkpoint(const TSessionId& session,
5598  const int32_t db_id,
5599  const int32_t table_id) {
5600  auto stdlog = STDLOG(get_session_ptr(session));
5601  auto session_ptr = stdlog.getConstSessionInfo();
5602  auto& cat = session_ptr->getCatalog();
5603  cat.getDataMgr().checkpoint(db_id, table_id);
5604 }
5605 
5606 // check and reset epoch if a request has been made
5607 void MapDHandler::set_table_epoch(const TSessionId& session,
5608  const int db_id,
5609  const int table_id,
5610  const int new_epoch) {
5611  auto stdlog = STDLOG(get_session_ptr(session));
5612  auto session_ptr = stdlog.getConstSessionInfo();
5613  if (!session_ptr->get_currentUser().isSuper) {
5614  throw std::runtime_error("Only superuser can set_table_epoch");
5615  }
5616  auto& cat = session_ptr->getCatalog();
5617 
5618  if (leaf_aggregator_.leafCount() > 0) {
5619  return leaf_aggregator_.set_table_epochLeaf(*session_ptr, db_id, table_id, new_epoch);
5620  }
5621  cat.setTableEpoch(db_id, table_id, new_epoch);
5622 }
5623 
5624 // check and reset epoch if a request has been made
5625 void MapDHandler::set_table_epoch_by_name(const TSessionId& session,
5626  const std::string& table_name,
5627  const int new_epoch) {
5628  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
5629  auto session_ptr = stdlog.getConstSessionInfo();
5630  if (!session_ptr->get_currentUser().isSuper) {
5631  throw std::runtime_error("Only superuser can set_table_epoch");
5632  }
5633  auto& cat = session_ptr->getCatalog();
5634  auto td = cat.getMetadataForTable(
5635  table_name,
5636  false); // don't populate fragmenter on this call since we only want metadata
5637  int32_t db_id = cat.getCurrentDB().dbId;
5638  if (leaf_aggregator_.leafCount() > 0) {
5639  return leaf_aggregator_.set_table_epochLeaf(
5640  *session_ptr, db_id, td->tableId, new_epoch);
5641  }
5642  cat.setTableEpoch(db_id, td->tableId, new_epoch);
5643 }
5644 
5645 int32_t MapDHandler::get_table_epoch(const TSessionId& session,
5646  const int32_t db_id,
5647  const int32_t table_id) {
5648  auto stdlog = STDLOG(get_session_ptr(session));
5649  auto session_ptr = stdlog.getConstSessionInfo();
5650  auto const& cat = session_ptr->getCatalog();
5651 
5652  if (leaf_aggregator_.leafCount() > 0) {
5653  return leaf_aggregator_.get_table_epochLeaf(*session_ptr, db_id, table_id);
5654  }
5655  return cat.getTableEpoch(db_id, table_id);
5656 }
5657 
5658 int32_t MapDHandler::get_table_epoch_by_name(const TSessionId& session,
5659  const std::string& table_name) {
5660  auto stdlog = STDLOG(