OmniSciDB  29e35f4d58
MapDHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: MapDHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "MapDHandler.h"
25 #include "DistributedLoader.h"
26 #include "MapDServer.h"
28 #include "TokenCompletionHints.h"
29 
30 #ifdef HAVE_PROFILER
31 #include <gperftools/heap-profiler.h>
32 #endif // HAVE_PROFILER
33 
34 #include "MapDRelease.h"
35 
36 #include "Calcite/Calcite.h"
37 #include "gen-cpp/CalciteServer.h"
38 
40 
41 #include "Catalog/Catalog.h"
43 #include "Import/Importer.h"
44 #include "LockMgr/TableLockMgr.h"
45 #include "MapDDistributedHandler.h"
46 #include "Parser/ParserWrapper.h"
48 #include "Parser/parser.h"
49 #include "Planner/Planner.h"
52 #include "QueryEngine/Execute.h"
61 #include "Shared/StringTransform.h"
62 #include "Shared/SysInfo.h"
63 #include "Shared/geo_types.h"
64 #include "Shared/geosupport.h"
65 #include "Shared/import_helpers.h"
67 #include "Shared/measure.h"
68 #include "Shared/scope.h"
69 
70 #include <fcntl.h>
71 #include <picosha2.h>
72 #include <sys/time.h>
73 #include <sys/types.h>
74 #include <sys/wait.h>
75 #include <unistd.h>
76 #include <algorithm>
77 #include <boost/algorithm/string.hpp>
78 #include <boost/filesystem.hpp>
79 #include <boost/make_shared.hpp>
80 #include <boost/process/search_path.hpp>
81 #include <boost/program_options.hpp>
82 #include <boost/regex.hpp>
83 #include <boost/tokenizer.hpp>
84 #include <cmath>
85 #include <csignal>
86 #include <fstream>
87 #include <future>
88 #include <map>
89 #include <memory>
90 #include <random>
91 #include <regex>
92 #include <string>
93 #include <thread>
94 #include <typeinfo>
95 
96 #include <arrow/api.h>
97 #include <arrow/io/api.h>
98 #include <arrow/ipc/api.h>
99 
100 #include "QueryEngine/ArrowUtil.h"
101 
102 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
103 
106 using namespace Lock_Namespace;
107 
108 #define INVALID_SESSION_ID ""
109 
110 #define THROW_MAPD_EXCEPTION(errstr) \
111  { \
112  TMapDException ex; \
113  ex.error_msg = errstr; \
114  LOG(ERROR) << ex.error_msg; \
115  throw ex; \
116  }
117 
118 namespace {
119 
120 SessionMap::iterator get_session_from_map(const TSessionId& session,
121  SessionMap& session_map) {
122  auto session_it = session_map.find(session);
123  if (session_it == session_map.end()) {
124  THROW_MAPD_EXCEPTION("Session not valid.");
125  }
126  return session_it;
127 }
128 
129 struct ForceDisconnect : public std::runtime_error {
130  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
131 };
132 
133 } // namespace
134 
135 template <>
137  const TSessionId& session,
138  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
139  auto session_it = get_session_from_map(session, sessions_);
140  try {
141  check_session_exp_unsafe(session_it);
142  } catch (const ForceDisconnect& e) {
143  read_lock.unlock();
144  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
145  auto session_it2 = get_session_from_map(session, sessions_);
146  disconnect_impl(session_it2);
147  THROW_MAPD_EXCEPTION(e.what());
148  }
149  return session_it;
150 }
151 
152 template <>
154  const TSessionId& session,
155  mapd_lock_guard<mapd_shared_mutex>& write_lock) {
156  auto session_it = get_session_from_map(session, sessions_);
157  try {
158  check_session_exp_unsafe(session_it);
159  } catch (const ForceDisconnect& e) {
160  disconnect_impl(session_it);
161  THROW_MAPD_EXCEPTION(e.what());
162  }
163  return session_it;
164 }
165 
166 MapDHandler::MapDHandler(const std::vector<LeafHostInfo>& db_leaves,
167  const std::vector<LeafHostInfo>& string_leaves,
168  const std::string& base_data_path,
169  const bool cpu_only,
170  const bool allow_multifrag,
171  const bool jit_debug,
172  const bool intel_jit_profile,
173  const bool read_only,
174  const bool allow_loop_joins,
175  const bool enable_rendering,
176  const bool enable_auto_clear_render_mem,
177  const int render_oom_retry_threshold,
178  const size_t render_mem_bytes,
179  const int num_gpus,
180  const int start_gpu,
181  const size_t reserved_gpu_mem,
182  const size_t num_reader_threads,
183  const AuthMetadata& authMetadata,
184  const MapDParameters& mapd_parameters,
185  const bool legacy_syntax,
186  const int idle_session_duration,
187  const int max_session_duration,
188  const bool enable_runtime_udf_registration,
189  const std::string& udf_filename)
190  : leaf_aggregator_(db_leaves)
191  , string_leaves_(string_leaves)
192  , base_data_path_(base_data_path)
193  , random_gen_(std::random_device{}())
194  , session_id_dist_(0, INT32_MAX)
195  , jit_debug_(jit_debug)
196  , intel_jit_profile_(intel_jit_profile)
197  , allow_multifrag_(allow_multifrag)
198  , read_only_(read_only)
199  , allow_loop_joins_(allow_loop_joins)
200  , authMetadata_(authMetadata)
201  , mapd_parameters_(mapd_parameters)
202  , legacy_syntax_(legacy_syntax)
203  , super_user_rights_(false)
204  , idle_session_duration_(idle_session_duration * 60)
205  , max_session_duration_(max_session_duration * 60)
206  , runtime_udf_registration_enabled_(enable_runtime_udf_registration)
207  , _was_geo_copy_from(false) {
208  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
209  bool is_rendering_enabled = enable_rendering;
210 
211  try {
212  if (cpu_only) {
213  is_rendering_enabled = false;
215  cpu_mode_only_ = true;
216  } else {
217 #ifdef HAVE_CUDA
219  cpu_mode_only_ = false;
220 #else
222  LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
223  cpu_mode_only_ = true;
224  is_rendering_enabled = false;
225 #endif
226  }
227  } catch (const std::exception& e) {
228  LOG(FATAL) << "Failed to executor device type: " << e.what();
229  }
230 
231  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
232  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
233  // manage cannot ask for
234  size_t total_reserved = reserved_gpu_mem;
235  if (is_rendering_enabled) {
236  total_reserved += render_mem_bytes;
237  }
238 
239  try {
240  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
241  mapd_parameters,
243  num_gpus,
244  start_gpu,
245  total_reserved,
246  num_reader_threads));
247  } catch (const std::exception& e) {
248  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
249  }
250 
251  std::string udf_ast_filename("");
252 
253  try {
254  if (!udf_filename.empty()) {
255  UdfCompiler compiler(udf_filename);
256  int compile_result = compiler.compileUdf();
257 
258  if (compile_result == 0) {
259  udf_ast_filename = compiler.getAstFileName();
260  }
261  }
262  } catch (const std::exception& e) {
263  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
264  }
265 
266  try {
267  calcite_ =
268  std::make_shared<Calcite>(mapd_parameters, base_data_path_, udf_ast_filename);
269  } catch (const std::exception& e) {
270  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
271  }
272 
273  try {
274  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
275  if (!udf_filename.empty()) {
276  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
277  }
278  } catch (const std::exception& e) {
279  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
280  }
281 
282  try {
284  } catch (const std::exception& e) {
285  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
286  }
287 
288  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
290  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
291  cpu_mode_only_ = true;
292  }
293 
294  switch (executor_device_type_) {
296  LOG(INFO) << "Started in GPU mode" << std::endl;
297  break;
299  LOG(INFO) << "Started in CPU mode" << std::endl;
300  break;
301  }
302 
303  try {
304  SysCatalog::instance().init(base_data_path_,
305  data_mgr_,
306  authMetadata,
307  calcite_,
308  false,
309  !db_leaves.empty(),
311  } catch (const std::exception& e) {
312  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
313  }
314 
315  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
316  start_time_ = std::time(nullptr);
317 
318  if (is_rendering_enabled) {
319  try {
320  render_handler_.reset(
321  new MapDRenderHandler(this, render_mem_bytes, 0u, false, 0, mapd_parameters_));
322  } catch (const std::exception& e) {
323  LOG(ERROR) << "Backend rendering disabled: " << e.what();
324  }
325  }
326 
327  if (leaf_aggregator_.leafCount() > 0) {
328  try {
329  agg_handler_.reset(new MapDAggHandler(this));
330  } catch (const std::exception& e) {
331  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
332  }
333  } else if (g_cluster) {
334  try {
335  leaf_handler_.reset(new MapDLeafHandler(this));
336  } catch (const std::exception& e) {
337  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
338  }
339  }
340 }
341 
343 
344 void MapDHandler::check_read_only(const std::string& str) {
346  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
347  }
348 }
349 
351  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
352  // We would create an in memory session for calcite with super user privileges which
353  // would be used for getting all tables metadata when a user runs the query. The
354  // session would be under the name of a proxy user/password which would only persist
355  // till server's lifetime or execution of calcite query(in memory) whichever is the
356  // earliest.
357  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
358  std::string session_id;
359  do {
360  session_id = generate_random_string(64);
361  } while (sessions_.find(session_id) != sessions_.end());
362  Catalog_Namespace::UserMetadata user_meta(-1,
363  calcite_->getInternalSessionProxyUserName(),
364  calcite_->getInternalSessionProxyPassword(),
365  true,
366  -1,
367  true);
368  const auto emplace_ret =
369  sessions_.emplace(session_id,
370  std::make_shared<Catalog_Namespace::SessionInfo>(
371  catalog_ptr, user_meta, executor_device_type_, session_id));
372  CHECK(emplace_ret.second);
373  return session_id;
374 }
375 
377  const Catalog_Namespace::UserMetadata user_meta) {
378  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
379  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
380  user_meta.isSuper.load();
381 }
382 
383 void MapDHandler::removeInMemoryCalciteSession(const std::string& session_id) {
384  // Remove InMemory calcite Session.
385  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
386  const auto it = sessions_.find(session_id);
387  CHECK(it != sessions_.end());
388  sessions_.erase(it);
389 }
390 // internal connection for connections with no password
391 void MapDHandler::internal_connect(TSessionId& session,
392  const std::string& username,
393  const std::string& dbname) {
394  auto stdlog = STDLOG(); // session_info set by connect_impl()
395  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
396  std::string username2 = username; // login() may reset username given as argument
397  std::string dbname2 = dbname; // login() may reset dbname given as argument
399  std::shared_ptr<Catalog> cat = nullptr;
400  try {
401  cat = SysCatalog::instance().login(
402  dbname2, username2, std::string(""), user_meta, false);
403  } catch (std::exception& e) {
404  THROW_MAPD_EXCEPTION(e.what());
405  }
406 
407  DBObject dbObject(dbname2, DatabaseDBObjectType);
408  dbObject.loadKey(*cat);
409  dbObject.setPrivileges(AccessPrivileges::ACCESS);
410  std::vector<DBObject> dbObjects;
411  dbObjects.push_back(dbObject);
412  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
413  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
414  " is not allowed to access database " + dbname2 + ".");
415  }
416  connect_impl(session, std::string(""), dbname2, user_meta, cat, stdlog);
417 }
418 
419 void MapDHandler::krb5_connect(TKrb5Session& session,
420  const std::string& inputToken,
421  const std::string& dbname) {
422  THROW_MAPD_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
423 }
424 
425 void MapDHandler::connect(TSessionId& session,
426  const std::string& username,
427  const std::string& passwd,
428  const std::string& dbname) {
429  auto stdlog = STDLOG(); // session_info set by connect_impl()
430  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
431  std::string username2 = username; // login() may reset username given as argument
432  std::string dbname2 = dbname; // login() may reset dbname given as argument
434  std::shared_ptr<Catalog> cat = nullptr;
435  try {
436  cat = SysCatalog::instance().login(
437  dbname2, username2, passwd, user_meta, !super_user_rights_);
438  } catch (std::exception& e) {
439  THROW_MAPD_EXCEPTION(e.what());
440  }
441 
442  DBObject dbObject(dbname2, DatabaseDBObjectType);
443  dbObject.loadKey(*cat);
444  dbObject.setPrivileges(AccessPrivileges::ACCESS);
445  std::vector<DBObject> dbObjects;
446  dbObjects.push_back(dbObject);
447  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
448  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
449  " is not allowed to access database " + dbname2 + ".");
450  }
451  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
452  // if pki auth session will come back encrypted with user pubkey
453  SysCatalog::instance().check_for_session_encryption(passwd, session);
454 }
455 
456 void MapDHandler::connect_impl(TSessionId& session,
457  const std::string& passwd,
458  const std::string& dbname,
460  std::shared_ptr<Catalog> cat,
461  query_state::StdLog& stdlog) {
462  do {
463  session = generate_random_string(32);
464  } while (sessions_.find(session) != sessions_.end());
465  std::pair<SessionMap::iterator, bool> emplace_retval =
466  sessions_.emplace(session,
467  std::make_shared<Catalog_Namespace::SessionInfo>(
468  cat, user_meta, executor_device_type_, session));
469  CHECK(emplace_retval.second);
470  auto& session_ptr = emplace_retval.first->second;
471  stdlog.setSessionInfo(session_ptr);
472  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time while
473  // doing warmup
474  if (leaf_aggregator_.leafCount() > 0) {
475  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
476  return;
477  }
478  }
479  auto const roles = session_ptr->get_currentUser().isSuper
480  ? std::vector<std::string>{{"super"}}
481  : SysCatalog::instance().getRoles(
482  false, false, session_ptr->get_currentUser().userName);
483  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
484  LOG(INFO) << "User " << user_meta.userName << " connected to database " << dbname
485  << " with public_session_id " << session_ptr->get_public_session_id();
486 }
487 
488 void MapDHandler::disconnect(const TSessionId& session) {
489  auto stdlog = STDLOG();
490  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
491  auto session_it = get_session_it_unsafe(session, write_lock);
492  stdlog.setSessionInfo(session_it->second);
493  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
494  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
495  << " disconnected from database " << dbname << std::endl;
496  disconnect_impl(session_it);
497 }
498 
499 void MapDHandler::disconnect_impl(const SessionMap::iterator& session_it) {
500  // session_it existence should already have been checked (i.e. called via
501  // get_session_it_unsafe(...))
502  const auto session_id = session_it->second->get_session_id();
503  if (leaf_aggregator_.leafCount() > 0) {
504  leaf_aggregator_.disconnect(session_id);
505  }
506  if (render_handler_) {
507  render_handler_->disconnect(session_id);
508  }
509  sessions_.erase(session_it);
510 }
511 
512 void MapDHandler::switch_database(const TSessionId& session, const std::string& dbname) {
513  auto stdlog = STDLOG(get_session_ptr(session));
514  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
515  auto session_it = get_session_it_unsafe(session, write_lock);
516 
517  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
518 
519  try {
520  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
521  dbname2, session_it->second->get_currentUser().userName);
522  session_it->second->set_catalog_ptr(cat);
523  } catch (std::exception& e) {
524  THROW_MAPD_EXCEPTION(e.what());
525  }
526 }
527 
528 void MapDHandler::interrupt(const TSessionId& session) {
529  auto stdlog = STDLOG(get_session_ptr(session));
531  // Shared lock to allow simultaneous interrupts of multiple sessions
532  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
533  if (leaf_aggregator_.leafCount() > 0) {
534  leaf_aggregator_.interrupt(session);
535  }
536 
537  auto session_it = get_session_it_unsafe(session, read_lock);
538  auto& cat = session_it->second.get()->getCatalog();
539  const auto dbname = cat.getCurrentDB().dbName;
540  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
541  jit_debug_ ? "/tmp" : "",
542  jit_debug_ ? "mapdquery" : "",
544  CHECK(executor);
545 
546  VLOG(1) << "Received interrupt: "
547  << "Session " << *session_it->second << ", Executor " << executor
548  << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
549  << session_it->second->get_currentUser().userName << ", Database " << dbname
550  << std::endl;
551 
552  executor->interrupt();
553 
554  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
555  << " interrupted session with database " << dbname << std::endl;
556  }
557 }
558 
559 void MapDHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
560  auto stdlog = STDLOG(get_session_ptr(session));
561  const auto rendering_enabled = bool(render_handler_);
562  _return.read_only = read_only_;
563  _return.version = MAPD_RELEASE;
564  _return.rendering_enabled = rendering_enabled;
565  _return.poly_rendering_enabled = rendering_enabled;
566  _return.start_time = start_time_;
567  _return.edition = MAPD_EDITION;
568  _return.host_name = get_hostname();
569 }
570 
571 void MapDHandler::get_status(std::vector<TServerStatus>& _return,
572  const TSessionId& session) {
573  auto stdlog = STDLOG(get_session_ptr(session));
574  const auto rendering_enabled = bool(render_handler_);
575  TServerStatus ret;
576  ret.read_only = read_only_;
577  ret.version = MAPD_RELEASE;
578  ret.rendering_enabled = rendering_enabled;
579  ret.poly_rendering_enabled = rendering_enabled;
580  ret.start_time = start_time_;
581  ret.edition = MAPD_EDITION;
582  ret.host_name = get_hostname();
583 
584  // TSercivePort tcp_port{}
585 
586  if (g_cluster) {
587  ret.role =
588  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
589  } else {
590  ret.role = TRole::type::SERVER;
591  }
592 
593  _return.push_back(ret);
594  if (leaf_aggregator_.leafCount() > 0) {
595  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
596  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
597  }
598 }
599 
600 void MapDHandler::get_hardware_info(TClusterHardwareInfo& _return,
601  const TSessionId& session) {
602  auto stdlog = STDLOG(get_session_ptr(session));
603  THardwareInfo ret;
604  const auto cuda_mgr = data_mgr_->getCudaMgr();
605  if (cuda_mgr) {
606  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
607  ret.start_gpu = cuda_mgr->getStartGpu();
608  if (ret.start_gpu >= 0) {
609  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
610  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
611  }
612  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
613  TGpuSpecification gpu_spec;
614  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
615  gpu_spec.num_sm = deviceProperties->numMPs;
616  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
617  gpu_spec.memory = deviceProperties->globalMem;
618  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
619  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
620  ret.gpu_info.push_back(gpu_spec);
621  }
622  }
623 
624  // start hardware/OS dependent code
625  ret.num_cpu_hw = std::thread::hardware_concurrency();
626  // ^ This might return diffrent results in case of hyper threading
627  // end hardware/OS dependent code
628 
629  _return.hardware_info.push_back(ret);
630  if (leaf_aggregator_.leafCount() > 0) {
631  ret.host_name = "aggregator";
632  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
633  _return.hardware_info.insert(_return.hardware_info.end(),
634  leaf_hardware.hardware_info.begin(),
635  leaf_hardware.hardware_info.end());
636  }
637 }
638 
639 void MapDHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
640  auto session_ptr = get_session_ptr(session);
641  auto stdlog = STDLOG(session_ptr);
642  auto user_metadata = session_ptr->get_currentUser();
643 
644  _return.user = user_metadata.userName;
645  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
646  _return.start_time = session_ptr->get_start_time();
647  _return.is_super = user_metadata.isSuper;
648 }
649 
651  const SQLTypeInfo& ti,
652  TColumn& column) {
653  if (ti.is_array()) {
654  TColumn tColumn;
655  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
656  CHECK(array_tv);
657  bool is_null = !array_tv->is_initialized();
658  if (!is_null) {
659  const auto& vec = array_tv->get();
660  for (const auto& elem_tv : vec) {
661  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
662  }
663  }
664  column.data.arr_col.push_back(tColumn);
665  column.nulls.push_back(is_null && !ti.get_notnull());
666  } else if (ti.is_geometry()) {
667  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
668  if (scalar_tv) {
669  auto s_n = boost::get<NullableString>(scalar_tv);
670  auto s = boost::get<std::string>(s_n);
671  if (s) {
672  column.data.str_col.push_back(*s);
673  } else {
674  column.data.str_col.emplace_back(""); // null string
675  auto null_p = boost::get<void*>(s_n);
676  CHECK(null_p && !*null_p);
677  }
678  column.nulls.push_back(!s && !ti.get_notnull());
679  } else {
680  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
681  CHECK(array_tv);
682  bool is_null = !array_tv->is_initialized();
683  if (!is_null) {
684  auto elem_type = SQLTypeInfo(kDOUBLE, false);
685  TColumn tColumn;
686  const auto& vec = array_tv->get();
687  for (const auto& elem_tv : vec) {
688  value_to_thrift_column(elem_tv, elem_type, tColumn);
689  }
690  column.data.arr_col.push_back(tColumn);
691  column.nulls.push_back(false);
692  } else {
693  TColumn tColumn;
694  column.data.arr_col.push_back(tColumn);
695  column.nulls.push_back(is_null && !ti.get_notnull());
696  }
697  }
698  } else {
699  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
700  CHECK(scalar_tv);
701  if (boost::get<int64_t>(scalar_tv)) {
702  int64_t data = *(boost::get<int64_t>(scalar_tv));
703 
704  if (is_member_of_typeset<kNUMERIC, kDECIMAL>(ti)) {
705  double val = static_cast<double>(data);
706  if (ti.get_scale() > 0) {
707  val /= pow(10.0, std::abs(ti.get_scale()));
708  }
709  column.data.real_col.push_back(val);
710  } else {
711  column.data.int_col.push_back(data);
712  }
713 
714  switch (ti.get_type()) {
715  case kBOOLEAN:
716  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
717  break;
718  case kTINYINT:
719  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
720  break;
721  case kSMALLINT:
722  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
723  break;
724  case kINT:
725  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
726  break;
727  case kNUMERIC:
728  case kDECIMAL:
729  case kBIGINT:
730  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
731  break;
732  case kTIME:
733  case kTIMESTAMP:
734  case kDATE:
735  case kINTERVAL_DAY_TIME:
737  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
738  break;
739  default:
740  column.nulls.push_back(false);
741  }
742  } else if (boost::get<double>(scalar_tv)) {
743  double data = *(boost::get<double>(scalar_tv));
744  column.data.real_col.push_back(data);
745  if (ti.get_type() == kFLOAT) {
746  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
747  } else {
748  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
749  }
750  } else if (boost::get<float>(scalar_tv)) {
751  CHECK_EQ(kFLOAT, ti.get_type());
752  float data = *(boost::get<float>(scalar_tv));
753  column.data.real_col.push_back(data);
754  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
755  } else if (boost::get<NullableString>(scalar_tv)) {
756  auto s_n = boost::get<NullableString>(scalar_tv);
757  auto s = boost::get<std::string>(s_n);
758  if (s) {
759  column.data.str_col.push_back(*s);
760  } else {
761  column.data.str_col.emplace_back(""); // null string
762  auto null_p = boost::get<void*>(s_n);
763  CHECK(null_p && !*null_p);
764  }
765  column.nulls.push_back(!s && !ti.get_notnull());
766  } else {
767  CHECK(false);
768  }
769  }
770 }
771 
773  TDatum datum;
774  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
775  if (!scalar_tv) {
776  CHECK(ti.is_array());
777  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
778  CHECK(array_tv);
779  if (array_tv->is_initialized()) {
780  const auto& vec = array_tv->get();
781  for (const auto& elem_tv : vec) {
782  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
783  datum.val.arr_val.push_back(scalar_col_val);
784  }
785  // Datum is not null, at worst it's an empty array Datum
786  datum.is_null = false;
787  } else {
788  datum.is_null = true;
789  }
790  return datum;
791  }
792  if (boost::get<int64_t>(scalar_tv)) {
793  int64_t data = *(boost::get<int64_t>(scalar_tv));
794 
795  if (is_member_of_typeset<kNUMERIC, kDECIMAL>(ti)) {
796  double val = static_cast<double>(data);
797  if (ti.get_scale() > 0) {
798  val /= pow(10.0, std::abs(ti.get_scale()));
799  }
800  datum.val.real_val = val;
801  } else {
802  datum.val.int_val = data;
803  }
804 
805  switch (ti.get_type()) {
806  case kBOOLEAN:
807  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
808  break;
809  case kTINYINT:
810  datum.is_null = (datum.val.int_val == NULL_TINYINT);
811  break;
812  case kSMALLINT:
813  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
814  break;
815  case kINT:
816  datum.is_null = (datum.val.int_val == NULL_INT);
817  break;
818  case kDECIMAL:
819  case kNUMERIC:
820  case kBIGINT:
821  datum.is_null = (datum.val.int_val == NULL_BIGINT);
822  break;
823  case kTIME:
824  case kTIMESTAMP:
825  case kDATE:
826  case kINTERVAL_DAY_TIME:
828  datum.is_null = (datum.val.int_val == NULL_BIGINT);
829  break;
830  default:
831  datum.is_null = false;
832  }
833  } else if (boost::get<double>(scalar_tv)) {
834  datum.val.real_val = *(boost::get<double>(scalar_tv));
835  if (ti.get_type() == kFLOAT) {
836  datum.is_null = (datum.val.real_val == NULL_FLOAT);
837  } else {
838  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
839  }
840  } else if (boost::get<float>(scalar_tv)) {
841  CHECK_EQ(kFLOAT, ti.get_type());
842  datum.val.real_val = *(boost::get<float>(scalar_tv));
843  datum.is_null = (datum.val.real_val == NULL_FLOAT);
844  } else if (boost::get<NullableString>(scalar_tv)) {
845  auto s_n = boost::get<NullableString>(scalar_tv);
846  auto s = boost::get<std::string>(s_n);
847  if (s) {
848  datum.val.str_val = *s;
849  } else {
850  auto null_p = boost::get<void*>(s_n);
851  CHECK(null_p && !*null_p);
852  }
853  datum.is_null = !s;
854  } else {
855  CHECK(false);
856  }
857  return datum;
858 }
859 
860 void MapDHandler::sql_execute(TQueryResult& _return,
861  const TSessionId& session,
862  const std::string& query_str,
863  const bool column_format,
864  const std::string& nonce,
865  const int32_t first_n,
866  const int32_t at_most_n) {
867  auto session_ptr = get_session_ptr(session);
868  auto query_state = create_query_state(session_ptr, query_str);
869  auto stdlog = STDLOG(query_state);
870 
871  ScopeGuard reset_was_geo_copy_from = [&] { _was_geo_copy_from = false; };
872 
873  if (first_n >= 0 && at_most_n >= 0) {
874  THROW_MAPD_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
875  }
876 
877  if (leaf_aggregator_.leafCount() > 0) {
878  if (!agg_handler_) {
879  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
880  }
881  _return.total_time_ms = measure<>::execution([&]() {
882  try {
883  agg_handler_->cluster_execute(_return,
884  query_state->createQueryStateProxy(),
885  query_state->getQueryStr(),
886  column_format,
887  nonce,
888  first_n,
889  at_most_n,
891  } catch (std::exception& e) {
892  const auto mapd_exception = dynamic_cast<const TMapDException*>(&e);
893  const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
895  thrift_exception ? std::string(thrift_exception->what())
896  : mapd_exception ? mapd_exception->error_msg
897  : std::string("Exception: ") + e.what());
898  }
899  _return.nonce = nonce;
900  });
901  } else {
902  _return.total_time_ms = measure<>::execution([&]() {
904  query_state->createQueryStateProxy(),
905  column_format,
906  nonce,
907  session_ptr->get_executor_device_type(),
908  first_n,
909  at_most_n);
910  });
911  }
912 
913  // if the SQL statement we just executed was a geo COPY FROM, the import
914  // parameters were captured, and this flag set, so we do the actual import here
915  if (_was_geo_copy_from) {
916  // import_geo_table() calls create_table() which calls this function to
917  // do the work, so reset the flag now to avoid executing this part a
918  // second time at the end of that, which would fail as the table was
919  // already created! Also reset the flag with a ScopeGuard on exiting
920  // this function any other way, such as an exception from the code above!
921  _was_geo_copy_from = false;
922 
923  // create table as replicated?
924  TCreateParams create_params;
925  if (_geo_copy_from_partitions == "REPLICATED") {
926  create_params.is_replicated = true;
927  }
928 
929  // now do (and time) the import
930  _return.total_time_ms = measure<>::execution([&]() {
931  import_geo_table(session,
935  TRowDescriptor(),
936  create_params);
937  });
938  }
939  stdlog.appendNameValuePairs("execution_time_ms",
940  _return.execution_time_ms,
941  "total_time_ms", // BE-3420 - Redundant with duration field
942  stdlog.duration<std::chrono::milliseconds>());
943 }
944 
// Runs a read-only statement (not DDL, not update DML) through the
// relational-algebra path (parse_to_ra + execute_rel_alg_df) and returns the
// result as a TDataFrame in `_return` (presumably released later via
// deallocate_df).
//
// When device_type is GPU, the session must be in GPU mode, the server must have
// a GPU, and device_id must be a valid CUDA device index; otherwise a
// TMapDException is thrown before any parsing happens.
// Any std::exception raised while parsing/executing is rethrown as a
// TMapDException prefixed with "Exception: ".
945 void MapDHandler::sql_execute_df(TDataFrame& _return,
946  const TSessionId& session,
947  const std::string& query_str,
948  const TDeviceType::type device_type,
949  const int32_t device_id,
950  const int32_t first_n) {
951  auto session_ptr = get_session_ptr(session);
952  auto query_state = create_query_state(session_ptr, query_str);
953  auto stdlog = STDLOG(session_ptr, query_state);
954 
955  if (device_type == TDeviceType::GPU) {
956  const auto executor_device_type = session_ptr->get_executor_device_type();
957  if (executor_device_type != ExecutorDeviceType::GPU) {
959  std::string("Exception: GPU mode is not allowed in this session"));
960  }
961  if (!data_mgr_->gpusPresent()) {
962  THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
963  }
964  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
966  std::string("Exception: invalid device_id or unavailable GPU with this ID"));
967  }
968  }
969  _return.execution_time_ms = 0;
970 
// NOTE(review): `parser`, `parse_trees` and `last_parsed` are not referenced
// anywhere else in this function; they look like leftovers.
971  SQLParser parser;
972  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
973  std::string last_parsed;
974  try {
975  ParserWrapper pw{query_str};
// Only non-DDL, non-update statements whose explain type is not `Other` take
// this path; everything else falls through to the exception at the end.
976  if (!pw.is_ddl && !pw.is_update_dml &&
977  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
978  std::string query_ra;
979  TableMap table_map;
980  OptionalTableMap tableNames(table_map);
// Time spent translating the query to relational algebra counts toward
// execution_time_ms; parse_to_ra also fills tableNames.
981  _return.execution_time_ms += measure<>::execution([&]() {
982  query_ra = parse_to_ra(query_state->createQueryStateProxy(),
983  query_str,
984  {},
985  tableNames,
987  });
988 
989  // COPY_TO/SELECT: get read ExecutorOuterLock >> TableReadLock for each table (note
990  // for update/delete this would be a table write lock)
// Both lock objects live until this if-scope exits, i.e. for the whole execution.
991  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
993  std::vector<Lock_Namespace::TableLock> table_locks;
995  session_ptr->getCatalog(), tableNames.value(), table_locks);
996 
// NOTE(review): the message reads "not unsupported" (double negative);
// presumably "not supported" was meant. Left as-is because it is runtime text.
997  if (pw.isCalciteExplain()) {
998  throw std::runtime_error("explain is not unsupported by current thrift API");
999  }
1000  execute_rel_alg_df(_return,
1001  query_ra,
1002  query_state->createQueryStateProxy(),
1003  *session_ptr,
1004  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
1006  static_cast<size_t>(device_id),
1007  first_n);
// A zero-sized schema buffer is treated as a failed execution.
1008  if (!_return.sm_size) {
1009  throw std::runtime_error("schema is missing in returned result");
1010  }
1011  return;
1012  }
// NOTE(review): despite this log line, no legacy processor runs here; control
// falls through to the final exception, whose message has the same
// "not unsupported" double negative noted above.
1013  LOG(INFO) << "passing query to legacy processor";
1014  } catch (std::exception& e) {
1015  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1016  }
1018  "Exception: DDL or update DML are not unsupported by current thrift API");
1019 }
1020 
1021 void MapDHandler::sql_execute_gdf(TDataFrame& _return,
1022  const TSessionId& session,
1023  const std::string& query_str,
1024  const int32_t device_id,
1025  const int32_t first_n) {
1026  auto stdlog = STDLOG(get_session_ptr(session));
1027  sql_execute_df(_return, session, query_str, TDeviceType::GPU, device_id, first_n);
1028 }
1029 
1030 // For now we have only one user of a data frame in all cases.
// Releases the buffers behind a TDataFrame previously handed to a client.
// For GPU frames, the device pointer recorded for df.df_handle is looked up and
// removed from ipc_handle_to_dev_ptr_ while holding handle_to_dev_ptr_mutex_; a
// handle not recorded exactly once raises a TMapDException. The shared-memory and
// data-frame handles are then wrapped in an ArrowResult and passed, together with
// the executor device type, device_id and data_mgr_, to a deallocation call whose
// name is not visible in this listing.
1031 void MapDHandler::deallocate_df(const TSessionId& session,
1032  const TDataFrame& df,
1033  const TDeviceType::type device_type,
1034  const int32_t device_id) {
1035  auto stdlog = STDLOG(get_session_ptr(session));
1036  int8_t* dev_ptr{0};
1037  if (device_type == TDeviceType::GPU) {
1038  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
// NOTE(review): for a unique-key map count() is only ever 0 or 1, so this
// effectively detects a missing handle; "inserted twice" would only apply to a
// multimap — confirm the container type. count() + operator[] + erase() also
// performs three lookups where a single find() + erase(iterator) would do.
1039  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1040  TMapDException ex;
1041  ex.error_msg = std::string(
1042  "Exception: current data frame handle is not bookkept or been inserted twice");
1043  LOG(ERROR) << ex.error_msg;
1044  throw ex;
1045  }
1046  dev_ptr = ipc_handle_to_dev_ptr_[df.df_handle];
1047  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1048  }
// For CPU frames dev_ptr stays null.
1049  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1050  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1051  ArrowResult result{sm_handle, df.sm_size, df_handle, df.df_size, dev_ptr};
1053  result,
1054  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1055  device_id,
1056  data_mgr_);
1057 }
1058 
1059 std::string MapDHandler::apply_copy_to_shim(const std::string& query_str) {
1060  auto result = query_str;
1061  {
1062  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
1063  // boost::regex::icase};
1064  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
1065  boost::regex::extended | boost::regex::icase};
1066  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
1067  result.replace(
1068  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
1069  });
1070  }
1071  return result;
1072 }
1073 
1074 void MapDHandler::sql_validate(TTableDescriptor& _return,
1075  const TSessionId& session,
1076  const std::string& query_str) {
1077  auto stdlog = STDLOG(get_session_ptr(session));
1078  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1079  stdlog.setQueryState(query_state);
1080 
1081  ParserWrapper pw{query_str};
1082  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1083  pw.is_update_dml) {
1084  THROW_MAPD_EXCEPTION("Can only validate SELECT statements.");
1085  }
1086  MapDHandler::validate_rel_alg(_return, query_state->createQueryStateProxy());
1087 }
1088 
1089 namespace {
1090 
// Identifiers extracted from a partially typed query, all upper-cased:
// - uc_column_names: every token that is not of the form "a.b" (kept whole);
// - uc_column_table_qualifiers: the "a" part of tokens of the form "a.b".
1092  std::unordered_set<std::string> uc_column_names;
1093  std::unordered_set<std::string> uc_column_table_qualifiers;
1094 };
1095 
1096 // Extract what looks like a (qualified) identifier from the partial query.
1097 // The results will be used to rank the auto-completion results: tables which
1098 // contain at least one of the identifiers first.
// Tokens are maximal runs of alphanumerics, '_' and '.'. SQL keywords such as
// SELECT are therefore collected as "column names" too, and tokens with more than
// one '.' (e.g. "db.t.c") end up in uc_column_names unsplit.
1100  const std::string& sql) {
1101  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1102  boost::regex::extended | boost::regex::icase};
1103  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1104  boost::sregex_token_iterator end;
1105  std::unordered_set<std::string> uc_column_names;
1106  std::unordered_set<std::string> uc_column_table_qualifiers;
1107  for (; tok_it != end; ++tok_it) {
1108  std::string column_name = *tok_it;
1109  std::vector<std::string> column_tokens;
1110  boost::split(column_tokens, column_name, boost::is_any_of("."));
1111  if (column_tokens.size() == 2) {
1112  // If the column name is qualified, take user's word.
1113  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1114  } else {
1115  uc_column_names.insert(to_upper(column_name));
1116  }
1117  }
// NOTE(review): this copies both sets into the result; std::move would avoid it.
1118  return {uc_column_names, uc_column_table_qualifiers};
1119 }
1120 
1121 } // namespace
1122 
1123 void MapDHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1124  const TSessionId& session,
1125  const std::string& sql,
1126  const int cursor) {
1127  auto stdlog = STDLOG(get_session_ptr(session));
1128  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1129  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1130  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1131  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1132  proj_tokens.uc_column_names, visible_tables, stdlog);
1133  // Add the table qualifiers explicitly specified by the user.
1134  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1135  proj_tokens.uc_column_table_qualifiers.end());
1136  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1137  std::sort(
1138  hints.begin(),
1139  hints.end(),
1140  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1141  if (lhs.type == TCompletionHintType::TABLE &&
1142  rhs.type == TCompletionHintType::TABLE) {
1143  // Between two tables, one which is compatible with the specified projections
1144  // and one which isn't, pick the one which is compatible.
1145  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1146  compatible_table_names.end() &&
1147  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1148  compatible_table_names.end()) {
1149  return true;
1150  }
1151  }
1152  return lhs.type < rhs.type;
1153  });
1154 }
1155 
// Collects completion hints without ordering them.
// Outputs:
//   visible_tables - physical tables and views visible to the session
//                    (get_tables_impl with GET_PHYSICAL_TABLES_AND_VIEWS);
//   hints          - Calcite's hints for (sql, cursor), filtered by a call not
//                    fully visible in this listing, plus token-based hints when
//                    no FROM appears before the cursor.
// Any std::exception from the table listing or Calcite is rethrown as a
// TMapDException prefixed with "Exception: ".
1156 void MapDHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1157  std::vector<std::string>& visible_tables,
1158  query_state::StdLog& stdlog,
1159  const std::string& sql,
1160  const int cursor) {
1161  const auto& session_info = *stdlog.getConstSessionInfo();
1162  try {
1163  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1164  // Filter out keywords suggested by Calcite which we don't support.
1166  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1167  } catch (const std::exception& e) {
1168  TMapDException ex;
1169  ex.error_msg = "Exception: " + std::string(e.what());
1170  LOG(ERROR) << ex.error_msg;
1171  throw ex;
1172  }
// NOTE(review): the pattern requires whitespace on both sides of FROM, so a
// FROM at the very start of the text or one adjacent to punctuation is not
// detected. It also matches "from" inside string literals.
1173  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
// A negative cursor means "whole string"; otherwise it is clamped to sql.size().
1174  const size_t length_to_cursor =
1175  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1176  // Trust hints from Calcite after the FROM keyword.
1177  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1178  return;
1179  }
1180  // Before FROM, the query is too incomplete for context-sensitive completions.
1181  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1182 }
1183 
1184 void MapDHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1185  query_state::StdLog& stdlog,
1186  std::vector<std::string>& visible_tables,
1187  const std::string& sql,
1188  const int cursor) {
1189  const auto last_word =
1190  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1191  boost::regex select_expr{R"(\s*select\s+)",
1192  boost::regex::extended | boost::regex::icase};
1193  const size_t length_to_cursor =
1194  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1195  // After SELECT but before FROM, look for all columns in all tables which match the
1196  // prefix.
1197  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1198  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1199  // Trust the fully qualified columns the most.
1200  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1201  return;
1202  }
1203  // Not much information to use, just retrieve column names which match the prefix.
1204  if (should_suggest_column_hints(sql)) {
1205  get_column_hints(hints, last_word, column_names_by_table);
1206  return;
1207  }
1208  const std::string kFromKeyword{"FROM"};
1209  if (boost::istarts_with(kFromKeyword, last_word)) {
1210  TCompletionHint keyword_hint;
1211  keyword_hint.type = TCompletionHintType::KEYWORD;
1212  keyword_hint.replaced = last_word;
1213  keyword_hint.hints.emplace_back(kFromKeyword);
1214  hints.push_back(keyword_hint);
1215  }
1216  } else {
1217  const std::string kSelectKeyword{"SELECT"};
1218  if (boost::istarts_with(kSelectKeyword, last_word)) {
1219  TCompletionHint keyword_hint;
1220  keyword_hint.type = TCompletionHintType::KEYWORD;
1221  keyword_hint.replaced = last_word;
1222  keyword_hint.hints.emplace_back(kSelectKeyword);
1223  hints.push_back(keyword_hint);
1224  }
1225  }
1226 }
1227 
1228 std::unordered_map<std::string, std::unordered_set<std::string>>
1229 MapDHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1230  query_state::StdLog& stdlog) {
1231  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1232  for (auto it = table_names.begin(); it != table_names.end();) {
1233  TTableDetails table_details;
1234  try {
1235  get_table_details_impl(table_details, stdlog, *it, false, false);
1236  } catch (const TMapDException& e) {
1237  // Remove the corrupted Table/View name from the list for further processing.
1238  it = table_names.erase(it);
1239  continue;
1240  }
1241  for (const auto& column_type : table_details.row_desc) {
1242  column_names_by_table[*it].emplace(column_type.col_name);
1243  }
1244  ++it;
1245  }
1246  return column_names_by_table;
1247 }
1248 
1250  const std::unordered_set<std::string>& uc_column_names,
1251  std::vector<std::string>& table_names,
1252  query_state::StdLog& stdlog) {
// Returns the upper-cased names of the tables in `table_names` that have at
// least one column whose upper-cased name is in `uc_column_names`.
// Side effect: names whose details cannot be fetched (TMapDException from
// get_table_details_impl) are erased from `table_names`.
// NOTE(review): this repeats the per-table detail lookup that
// fill_column_names_by_table also performs.
1253  std::unordered_set<std::string> compatible_table_names_by_column;
1254  for (auto it = table_names.begin(); it != table_names.end();) {
1255  TTableDetails table_details;
1256  try {
1257  get_table_details_impl(table_details, stdlog, *it, false, false);
1258  } catch (const TMapDException& e) {
1259  // Remove the corrupted Table/View name from the list for further processing.
1260  it = table_names.erase(it);
1261  continue;
1262  }
1263  for (const auto& column_type : table_details.row_desc) {
1264  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1265  compatible_table_names_by_column.emplace(to_upper(*it));
1266  break;
1267  }
1268  }
1269  ++it;
1270  }
1271  return compatible_table_names_by_column;
1272 }
1273 
// Validates the query held by `query_state_proxy` without returning rows. It
// translates the query to relational algebra (parse_to_ra), runs execute_rel_alg
// on it, and fills `_return` with one descriptor per result column, keyed by
// column name, after fixup_row_descriptor. Any std::exception is rethrown as a
// TMapDException prefixed with "Exception: ".
1274 void MapDHandler::validate_rel_alg(TTableDescriptor& _return,
1275  QueryStateProxy query_state_proxy) {
1276  try {
1277  const auto query_ra = parse_to_ra(query_state_proxy,
1278  query_state_proxy.getQueryState().getQueryStr(),
1279  {},
1280  boost::none,
1282  TQueryResult result;
// NOTE(review): the flag arguments below are positional; their meaning (e.g.
// whether this is a metadata-only run) is defined by execute_rel_alg's
// signature, which is not visible here — confirm before changing.
1284  query_state_proxy,
1285  query_ra,
1286  true,
1288  -1,
1289  -1,
1290  false,
1291  true,
1292  false,
1293  false,
1294  false);
1295  const auto& row_desc = fixup_row_descriptor(
1296  result.row_set.row_desc,
1297  query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog());
// A duplicate column name fails this CHECK instead of producing a user-facing error.
1298  for (const auto& col_desc : row_desc) {
1299  const auto it_ok = _return.insert(std::make_pair(col_desc.col_name, col_desc));
1300  CHECK(it_ok.second);
1301  }
1302  } catch (std::exception& e) {
1303  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1304  }
1305 }
1306 
1307 void MapDHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1308  auto stdlog = STDLOG(get_session_ptr(session));
1309  auto session_ptr = stdlog.getConstSessionInfo();
1310  if (!session_ptr->get_currentUser().isSuper) {
1311  // WARNING: This appears to not include roles a user is a member of,
1312  // if the role has no permissions granted to it.
1313  roles =
1314  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1315  session_ptr->getCatalog().getCurrentDB().dbId);
1316  } else {
1317  roles = SysCatalog::instance().getRoles(
1318  false, true, session_ptr->get_currentUser().userName);
1319  }
1320 }
1321 
// Returns whether `roleName` is granted to `granteeName`, as answered by
// SysCatalog::isRoleGrantedToGrantee(granteeName, roleName, false). The meaning
// of the third argument (presumably "only direct grants") should be confirmed
// against SysCatalog.
// Non-superusers are restricted:
// - they may not query another existing user;
// - they may only query a grantee for which
//   isRoleGrantedToGrantee(<self>, granteeName, true) holds.
// Violations raise a TMapDException.
1322 bool MapDHandler::has_role(const TSessionId& sessionId,
1323  const std::string& granteeName,
1324  const std::string& roleName) {
1325  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1326  const auto session_ptr = stdlog.getConstSessionInfo();
// NOTE(review): this copies the user metadata; `const auto&` would avoid it.
1327  const auto current_user = session_ptr->get_currentUser();
1328  if (!current_user.isSuper) {
1329  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1330  user && current_user.userName != granteeName) {
1331  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1332  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1333  current_user.userName, granteeName, true)) {
1335  "Only super users can check roles assignment that have not been directly "
1336  "granted to a user.");
1337  }
1338  }
1339  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1340 }
1341 
1342 static TDBObject serialize_db_object(const std::string& roleName,
1343  const DBObject& inObject) {
1344  TDBObject outObject;
1345  outObject.objectName = inObject.getName();
1346  outObject.grantee = roleName;
1347  const auto ap = inObject.getPrivileges();
1348  switch (inObject.getObjectKey().permissionType) {
1349  case DatabaseDBObjectType:
1350  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1351  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1352  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1353  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1354  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1355 
1356  break;
1357  case TableDBObjectType:
1358  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1359  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1360  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1361  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1362  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1363  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1364  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1365  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1366  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1367 
1368  break;
1369  case DashboardDBObjectType:
1370  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1371  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1372  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1373  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1374  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1375 
1376  break;
1377  case ViewDBObjectType:
1378  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1379  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1380  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1381  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1382  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1383  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1384  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1385 
1386  break;
1387  default:
1388  CHECK(false);
1389  }
1390  const int type_val = static_cast<int>(inObject.getType());
1391  CHECK(type_val >= 0 && type_val < 5);
1392  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1393  return outObject;
1394 }
1395 
1397  const TDBObjectPermissions& permissions) {
// Returns true iff every database privilege requested in
// permissions.database_permissions_ (create_, delete_, view_sql_editor_, access_)
// is held in `privs`. Throws a TMapDException when that field is not set.
// NOTE(review): `perms` below is a copy; `const auto&` would avoid it.
1398  if (!permissions.__isset.database_permissions_) {
1399  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1400  }
1401  auto perms = permissions.database_permissions_;
1402  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1403  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1404  (perms.view_sql_editor_ &&
1406  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1407  return false;
1408  } else {
1409  return true;
1410  }
1411 }
1412 
1414  const TDBObjectPermissions& permissions) {
// Returns true iff every table privilege requested in permissions.table_permissions_
// (create_, drop_, select_, insert_, update_, delete_, truncate_, alter_) is held
// in `privs`. Throws a TMapDException when that field is not set.
1415  if (!permissions.__isset.table_permissions_) {
1416  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1417  }
1418  auto perms = permissions.table_permissions_;
1419  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1420  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1421  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1422  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1423  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1424  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1425  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1426  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1427  return false;
1428  } else {
1429  return true;
1430  }
1431 }
1432 
1434  const TDBObjectPermissions& permissions) {
// Returns true iff every dashboard privilege requested in
// permissions.dashboard_permissions_ (create_, delete_, view_, edit_) is held in
// `privs`. Throws a TMapDException when that field is not set.
1435  if (!permissions.__isset.dashboard_permissions_) {
1436  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1437  }
1438  auto perms = permissions.dashboard_permissions_;
1439  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1440  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1441  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1442  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1443  return false;
1444  } else {
1445  return true;
1446  }
1447 }
1448 
1450  const TDBObjectPermissions& permissions) {
// Returns true iff every view privilege requested in permissions.view_permissions_
// (create_, drop_, select_, insert_, update_, delete_) is held in `privs`.
// Throws a TMapDException when that field is not set.
1451  if (!permissions.__isset.view_permissions_) {
1452  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1453  }
1454  auto perms = permissions.view_permissions_;
1455  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1456  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1457  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1458  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1459  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1460  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1461  return false;
1462  } else {
1463  return true;
1464  }
1465 }
1466 
// Returns whether `granteeName` holds the privileges requested in `permissions`
// on object `objectName` of type `objectType` in the session's catalog.
// - Non-superusers may only ask about grantees for which
//   isRoleGrantedToGrantee(<self>, granteeName, false) holds.
// - A superuser grantee always yields true.
// - An unknown grantee or object type raises a TMapDException.
// - A grantee with no entry for the object yields false.
// - Otherwise the per-type checker registered in permissionFuncMap_ under
//   "database"/"table"/"dashboard"/"view" decides.
1467 bool MapDHandler::has_object_privilege(const TSessionId& sessionId,
1468  const std::string& granteeName,
1469  const std::string& objectName,
1470  const TDBObjectType::type objectType,
1471  const TDBObjectPermissions& permissions) {
1472  auto stdlog = STDLOG(get_session_ptr(sessionId));
1473  auto session_ptr = stdlog.getConstSessionInfo();
1474  auto const& cat = session_ptr->getCatalog();
1475  auto const& current_user = session_ptr->get_currentUser();
1476  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1477  current_user.userName, granteeName, false)) {
1479  "Users except superusers can only check privileges for self or roles granted to "
1480  "them.")
1481  }
1483  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1484  user_meta.isSuper) {
1485  return true;
1486  }
1487  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1488  if (!grnt) {
1489  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1490  }
// Map the Thrift object type to the catalog type (assignments partly not
// visible in this listing) and to the permissionFuncMap_ key.
1492  std::string func_name;
1493  switch (objectType) {
1496  func_name = "database";
1497  break;
1500  func_name = "table";
1501  break;
1504  func_name = "dashboard";
1505  break;
1508  func_name = "view";
1509  break;
1510  default:
1511  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1512  }
1513  DBObject req_object(objectName, type);
1514  req_object.loadKey(cat);
1515 
1516  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1517  if (grantee_object) {
1518  // if grantee has privs on the object
// NOTE(review): if permissionFuncMap_ is a std::map/unordered_map, operator[]
// would insert an empty entry on a missing key; func_name is always one of the
// four keys here, but find()/at() would make that explicit.
1519  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1520  } else {
1521  // no privileges on that object
1522  return false;
1523  }
1524 }
1525 
1526 void MapDHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1527  const TSessionId& sessionId,
1528  const std::string& roleName) {
1529  auto stdlog = STDLOG(get_session_ptr(sessionId));
1530  auto session_ptr = stdlog.getConstSessionInfo();
1531  auto const& user = session_ptr->get_currentUser();
1532  if (!user.isSuper &&
1533  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1534  return;
1535  }
1536  auto* rl = SysCatalog::instance().getGrantee(roleName);
1537  if (rl) {
1538  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1539  for (auto& dbObject : *rl->getDbObjects(true)) {
1540  if (dbObject.first.dbId != dbId) {
1541  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1542  // usecase for now, though)
1543  continue;
1544  }
1545  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1546  TDBObjectsForRole.push_back(tdbObject);
1547  }
1548  } else {
1549  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
1550  }
1551 }
1552 
// Appends to TDBObjects the privileges that the session's visible grantees hold
// on object `objectName` of Thrift type `type`. Both the object itself and the
// database-level object of the same type are looked up for each grantee.
// - Dashboards are addressed by numeric id (an empty name maps to id -1).
// - Tables and views are re-classified from catalog metadata when found.
// - Failures while resolving the object (including a non-numeric dashboard id
//   from std::stoi) are reported as "Object with name ... does not exist."
// - Superusers additionally get a synthetic full-privilege entry named "super".
1553 void MapDHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
1554  const TSessionId& sessionId,
1555  const std::string& objectName,
1556  const TDBObjectType::type type) {
1557  auto stdlog = STDLOG(get_session_ptr(sessionId));
1558  auto session_ptr = stdlog.getConstSessionInfo();
// Thrift -> catalog object type (some case labels are not visible in this listing).
1559  DBObjectType object_type;
1560  switch (type) {
1562  object_type = DBObjectType::DatabaseDBObjectType;
1563  break;
1565  object_type = DBObjectType::TableDBObjectType;
1566  break;
1569  break;
1571  object_type = DBObjectType::ViewDBObjectType;
1572  break;
1573  default:
1574  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
1575  ": unknown object type (" + std::to_string(type) + ").");
1576  }
1577  DBObject object_to_find(objectName, object_type);
1578 
1579  try {
1580  if (object_type == DashboardDBObjectType) {
1581  if (objectName == "") {
1582  object_to_find = DBObject(-1, object_type);
1583  } else {
1584  object_to_find = DBObject(std::stoi(objectName), object_type);
1585  }
1586  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
1587  !objectName.empty()) {
1588  // special handling for view / table
1589  auto const& cat = session_ptr->getCatalog();
1590  auto td = cat.getMetadataForTable(objectName, false);
1591  if (td) {
1592  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1593  object_to_find = DBObject(objectName, object_type);
1594  }
1595  }
1596  object_to_find.loadKey(session_ptr->getCatalog());
1597  } catch (const std::exception&) {
1598  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
1599  }
1600 
1601  // object type on database level
// NOTE(review): this loadKey is outside the try above, so an exception it
// raises is not translated into the "does not exist" message.
1602  DBObject object_to_find_dblevel("", object_type);
1603  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
1604  // if user is superuser respond with a full priv
1605  if (session_ptr->get_currentUser().isSuper) {
1606  // using ALL_TABLE here to set max permissions
1607  DBObject dbObj{object_to_find.getObjectKey(),
1609  session_ptr->get_currentUser().userId};
1610  dbObj.setName("super");
1611  TDBObjects.push_back(
1612  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
// NOTE(review): the ';' after the closing brace below is a stray empty statement.
1613  };
1614 
1615  std::vector<std::string> grantees =
1616  SysCatalog::instance().getRoles(true,
1617  session_ptr->get_currentUser().isSuper,
1618  session_ptr->get_currentUser().userName);
1619  for (const auto& grantee : grantees) {
1620  DBObject* object_found;
1621  auto* gr = SysCatalog::instance().getGrantee(grantee);
1622  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
1623  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1624  }
1625  // check object permissions on Database level
1626  if (gr &&
1627  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
1628  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1629  }
1630  }
1631 }
1632 
// Fills `roles` with grantee->getRoles() for `granteeName`.
// - Superusers may query any grantee.
// - A regular user may query only themself, or a role for which
//   isRoleGrantedToGrantee(<self>, granteeName, false) holds.
// Unknown grantees and unauthorized requests raise a TMapDException.
1633 void MapDHandler::get_all_roles_for_user(std::vector<std::string>& roles,
1634  const TSessionId& sessionId,
1635  const std::string& granteeName) {
1636  auto stdlog = STDLOG(get_session_ptr(sessionId));
1637  auto session_ptr = stdlog.getConstSessionInfo();
1638  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
1639  if (grantee) {
1640  if (session_ptr->get_currentUser().isSuper) {
1641  roles = grantee->getRoles();
1642  } else if (grantee->isUser()) {
1643  if (session_ptr->get_currentUser().userName == granteeName) {
1644  roles = grantee->getRoles();
1645  } else {
1647  "Only a superuser is authorized to request list of roles granted to another "
1648  "user.");
1649  }
1650  } else {
// NOTE(review): always true in this branch (the else of isUser()); kept as a guard.
1651  CHECK(!grantee->isUser());
1652  // granteeName is actually a roleName here and we can check a role
1653  // only if it is granted to us
1654  if (SysCatalog::instance().isRoleGrantedToGrantee(
1655  session_ptr->get_currentUser().userName, granteeName, false)) {
1656  roles = grantee->getRoles();
1657  } else {
1658  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
1659  }
1660  }
1661  } else {
1662  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
1663  }
1664 }
1665 
1666 namespace {
// Flattens a table -> column-names map into one string for the log record.
// Each table contributes ":<table>" followed by ",<column>" for each column, in
// std::map key order. Example: {t1: [a, b], t2: [c]} -> ":t1,a,b:t2,c".
1668  const std::map<std::string, std::vector<std::string>>& table_col_names) {
1669  std::ostringstream oss;
1670  for (const auto& [table_name, col_names] : table_col_names) {
1671  oss << ":" << table_name;
1672  for (const auto& col_name : col_names) {
1673  oss << "," << col_name;
1674  }
1675  }
1676  return oss.str();
1677 }
1678 } // namespace
1679 
1681  TPixelTableRowResult& _return,
1682  const TSessionId& session,
1683  const int64_t widget_id,
1684  const TPixel& pixel,
1685  const std::map<std::string, std::vector<std::string>>& table_col_names,
1686  const bool column_format,
1687  const int32_t pixel_radius,
1688  const std::string& nonce) {
// Logs every request argument, requires backend rendering (render_handler_) to
// be enabled, and forwards the request to
// render_handler_->get_result_row_for_pixel. A std::exception from the render
// handler is rethrown as a TMapDException prefixed with "Exception: ".
1689  auto stdlog = STDLOG(get_session_ptr(session),
1690  "widget_id",
1691  widget_id,
1692  "pixel.x",
1693  pixel.x,
1694  "pixel.y",
1695  pixel.y,
1696  "column_format",
1697  column_format,
1698  "pixel_radius",
1699  pixel_radius,
1700  "table_col_names",
1701  dump_table_col_names(table_col_names),
1702  "nonce",
1703  nonce);
1704  auto session_ptr = stdlog.getSessionInfo();
1705  if (!render_handler_) {
1706  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
1707  }
1708 
1709  try {
1710  render_handler_->get_result_row_for_pixel(_return,
1711  session_ptr,
1712  widget_id,
1713  pixel,
1714  table_col_names,
1715  column_format,
1716  pixel_radius,
1717  nonce);
1718  } catch (std::exception& e) {
1719  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1720  }
1721 }
1722 
1723 namespace {
1724 
1725 inline void fixup_geo_column_descriptor(TColumnType& col_type,
1726  const SQLTypes subtype,
1727  const int output_srid) {
1728  col_type.col_type.precision = static_cast<int>(subtype);
1729  col_type.col_type.scale = output_srid;
1730 }
1731 
1732 } // namespace
1733 
1735  const ColumnDescriptor* cd) {
// Builds the Thrift description of column `cd`: name, source name, id, type,
// encoding, nullability, array flag, size (arrays and dates only), precision
// and scale (or geo subtype/SRID), system flag, compression parameter and
// reserved-keyword flag.
// For the dictionary-encoded case (condition partly not visible in this
// listing, requires a non-null `cat`), comp_param is the dictionary's bit width.
// For dates stored in days with comp_param 0, comp_param is reported as 32.
1736  TColumnType col_type;
1737  col_type.col_name = cd->columnName;
1738  col_type.src_name = cd->sourceName;
1739  col_type.col_id = cd->columnId;
1740  col_type.col_type.type = type_to_thrift(cd->columnType);
1741  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
1742  col_type.col_type.nullable = !cd->columnType.get_notnull();
1743  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
1744  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
1745  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
1746  }
1747  if (IS_GEO(cd->columnType.get_type())) {
1749  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
1750  } else {
1751  col_type.col_type.precision = cd->columnType.get_precision();
1752  col_type.col_type.scale = cd->columnType.get_scale();
1753  }
1754  col_type.is_system = cd->isSystemCol;
1756  cat != nullptr) {
1757  // have to get the actual size of the encoding from the dictionary definition
1758  const int dict_id = cd->columnType.get_comp_param();
// NOTE(review): this early return skips the is_reserved_keyword assignment at
// the end of the function, so that flag keeps its default for columns whose
// dictionary metadata is missing. The dictionary is also looked up twice, which
// makes the `!dd` branch below unreachable.
1759  if (!cat->getMetadataForDict(dict_id, false)) {
1760  col_type.col_type.comp_param = 0;
1761  return col_type;
1762  }
1763  auto dd = cat->getMetadataForDict(dict_id, false);
1764  if (!dd) {
1765  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
1766  }
1767  col_type.col_type.comp_param = dd->dictNBits;
1768  } else {
1769  col_type.col_type.comp_param =
1770  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
1771  ? 32
1772  : cd->columnType.get_comp_param();
1773  }
1774  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
1775  return col_type;
1776 }
1777 
1778 void MapDHandler::get_internal_table_details(TTableDetails& _return,
1779  const TSessionId& session,
1780  const std::string& table_name) {
1781  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1782  get_table_details_impl(_return, stdlog, table_name, true, false);
1783 }
1784 
1785 void MapDHandler::get_table_details(TTableDetails& _return,
1786  const TSessionId& session,
1787  const std::string& table_name) {
1788  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1789  get_table_details_impl(_return, stdlog, table_name, false, false);
1790 }
1791 
1792 void MapDHandler::get_table_details_impl(TTableDetails& _return,
1793  query_state::StdLog& stdlog,
1794  const std::string& table_name,
1795  const bool get_system,
1796  const bool get_physical) {
1797  auto session_info = stdlog.getSessionInfo();
1798  auto& cat = session_info->getCatalog();
1799  auto td = cat.getMetadataForTable(
1800  table_name,
1801  false); // don't populate fragmenter on this call since we only want metadata
1802  if (!td) {
1803  THROW_MAPD_EXCEPTION("Table " + table_name + " doesn't exist");
1804  }
1805  if (td->isView) {
1806  auto query_state = create_query_state(session_info, td->viewSQL);
1807  stdlog.setQueryState(query_state);
1808  try {
1809  if (hasTableAccessPrivileges(td, *session_info)) {
1810  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
1811  query_state->getQueryStr(),
1812  {},
1813  boost::none,
1815  TQueryResult result;
1816  execute_rel_alg(result,
1817  query_state->createQueryStateProxy(),
1818  query_ra,
1819  true,
1821  -1,
1822  -1,
1823  false,
1824  true,
1825  false,
1826  false,
1827  false);
1828  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
1829  } else {
1830  THROW_MAPD_EXCEPTION("User has no access privileges to table " + table_name);
1831  }
1832  } catch (const std::exception& e) {
1833  THROW_MAPD_EXCEPTION("View '" + table_name + "' query has failed with an error: '" +
1834  std::string(e.what()) +
1835  "'.\nThe view must be dropped and re-created to "
1836  "resolve the error. \nQuery:\n" +
1837  query_state->getQueryStr());
1838  }
1839  } else {
1840  try {
1841  if (hasTableAccessPrivileges(td, *session_info)) {
1842  const auto col_descriptors =
1843  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
1844  const auto deleted_cd = cat.getDeletedColumn(td);
1845  for (const auto cd : col_descriptors) {
1846  if (cd == deleted_cd) {
1847  continue;
1848  }
1849  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
1850  }
1851  } else {
1852  THROW_MAPD_EXCEPTION("User has no access privileges to table " + table_name);
1853  }
1854  } catch (const std::runtime_error& e) {
1855  THROW_MAPD_EXCEPTION(e.what());
1856  }
1857  }
1858  _return.fragment_size = td->maxFragRows;
1859  _return.page_size = td->fragPageSize;
1860  _return.max_rows = td->maxRows;
1861  _return.view_sql = td->viewSQL;
1862  _return.shard_count = td->nShards;
1863  _return.key_metainfo = td->keyMetainfo;
1864  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
1865  _return.partition_detail =
1866  td->partitions.empty()
1867  ? TPartitionDetail::DEFAULT
1868  : (table_is_replicated(td)
1869  ? TPartitionDetail::REPLICATED
1870  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
1871  : TPartitionDetail::OTHER));
1872 }
1873 
1874 void MapDHandler::get_link_view(TFrontendView& _return,
1875  const TSessionId& session,
1876  const std::string& link) {
1877  auto stdlog = STDLOG(get_session_ptr(session));
1878  auto session_ptr = stdlog.getConstSessionInfo();
1879  auto const& cat = session_ptr->getCatalog();
1880  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
1881  if (!ld) {
1882  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
1883  }
1884  _return.view_state = ld->viewState;
1885  _return.view_name = ld->link;
1886  _return.update_time = ld->updateTime;
1887  _return.view_metadata = ld->viewMetadata;
1888 }
1889 
1891  const TableDescriptor* td,
1892  const Catalog_Namespace::SessionInfo& session_info) {
1893  auto& cat = session_info.getCatalog();
1894  auto user_metadata = session_info.get_currentUser();
1895 
1896  if (user_metadata.isSuper) {
1897  return true;
1898  }
1899 
1901  dbObject.loadKey(cat);
1902  std::vector<DBObject> privObjects = {dbObject};
1903 
1904  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
1905 }
1906 
1907 void MapDHandler::get_tables_impl(std::vector<std::string>& table_names,
1908  const Catalog_Namespace::SessionInfo& session_info,
1909  const GetTablesType get_tables_type) {
1910  auto const& cat = session_info.getCatalog();
1911  const auto tables = cat.getAllTableMetadata();
1912  for (const auto td : tables) {
1913  if (td->shard >= 0) {
1914  // skip shards, they're not standalone tables
1915  continue;
1916  }
1917  switch (get_tables_type) {
1918  case GET_PHYSICAL_TABLES: {
1919  if (td->isView) {
1920  continue;
1921  }
1922  break;
1923  }
1924  case GET_VIEWS: {
1925  if (!td->isView) {
1926  continue;
1927  }
1928  }
1929  default:
1930  break;
1931  }
1932  if (!hasTableAccessPrivileges(td, session_info)) {
1933  // skip table, as there are no privileges to access it
1934  continue;
1935  }
1936  table_names.push_back(td->tableName);
1937  }
1938 }
1939 
1940 void MapDHandler::get_tables(std::vector<std::string>& table_names,
1941  const TSessionId& session) {
1942  auto stdlog = STDLOG(get_session_ptr(session));
1944  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
1945 }
1946 
1947 void MapDHandler::get_physical_tables(std::vector<std::string>& table_names,
1948  const TSessionId& session) {
1949  auto stdlog = STDLOG(get_session_ptr(session));
1950  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
1951 }
1952 
1953 void MapDHandler::get_views(std::vector<std::string>& table_names,
1954  const TSessionId& session) {
1955  auto stdlog = STDLOG(get_session_ptr(session));
1956  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
1957 }
1958 
1959 void MapDHandler::get_tables_meta(std::vector<TTableMeta>& _return,
1960  const TSessionId& session) {
1961  auto stdlog = STDLOG(get_session_ptr(session));
1962  auto session_ptr = stdlog.getConstSessionInfo();
1963  auto query_state = create_query_state(session_ptr, "");
1964  stdlog.setQueryState(query_state);
1965 
1966  const auto& cat = session_ptr->getCatalog();
1967  const auto tables = cat.getAllTableMetadata();
1968  _return.reserve(tables.size());
1969 
1970  for (const auto td : tables) {
1971  if (td->shard >= 0) {
1972  // skip shards, they're not standalone tables
1973  continue;
1974  }
1975  if (!hasTableAccessPrivileges(td, *session_ptr)) {
1976  // skip table, as there are no privileges to access it
1977  continue;
1978  }
1979 
1980  TTableMeta ret;
1981  ret.table_name = td->tableName;
1982  ret.is_view = td->isView;
1983  ret.is_replicated = table_is_replicated(td);
1984  ret.shard_count = td->nShards;
1985  ret.max_rows = td->maxRows;
1986  ret.table_id = td->tableId;
1987 
1988  std::vector<TTypeInfo> col_types;
1989  std::vector<std::string> col_names;
1990  size_t num_cols = 0;
1991  if (td->isView) {
1992  try {
1993  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
1994  td->viewSQL,
1995  {},
1996  boost::none,
1998  TQueryResult result;
1999  execute_rel_alg(result,
2000  query_state->createQueryStateProxy(),
2001  query_ra,
2002  true,
2004  -1,
2005  -1,
2006  false,
2007  true,
2008  false,
2009  false,
2010  false);
2011  num_cols = result.row_set.row_desc.size();
2012  for (const auto col : result.row_set.row_desc) {
2013  if (col.is_physical) {
2014  num_cols--;
2015  continue;
2016  }
2017  col_types.push_back(col.col_type);
2018  col_names.push_back(col.col_name);
2019  }
2020  } catch (std::exception& e) {
2021  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
2022  }
2023  } else {
2024  try {
2025  if (hasTableAccessPrivileges(td, *session_ptr)) {
2026  const auto col_descriptors =
2027  cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
2028  const auto deleted_cd = cat.getDeletedColumn(td);
2029  for (const auto cd : col_descriptors) {
2030  if (cd == deleted_cd) {
2031  continue;
2032  }
2033  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2034  col_names.push_back(cd->columnName);
2035  }
2036  num_cols = col_descriptors.size();
2037  } else {
2038  continue;
2039  }
2040  } catch (const std::runtime_error& e) {
2041  THROW_MAPD_EXCEPTION(e.what());
2042  }
2043  }
2044 
2045  ret.num_cols = num_cols;
2046  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2047  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2048 
2049  _return.push_back(ret);
2050  }
2051 }
2052 
2053 void MapDHandler::get_users(std::vector<std::string>& user_names,
2054  const TSessionId& session) {
2055  auto stdlog = STDLOG(get_session_ptr(session));
2056  auto session_ptr = stdlog.getConstSessionInfo();
2057  std::list<Catalog_Namespace::UserMetadata> user_list;
2058 
2059  if (!session_ptr->get_currentUser().isSuper) {
2060  user_list = SysCatalog::instance().getAllUserMetadata(
2061  session_ptr->getCatalog().getCurrentDB().dbId);
2062  } else {
2063  user_list = SysCatalog::instance().getAllUserMetadata();
2064  }
2065  for (auto u : user_list) {
2066  user_names.push_back(u.userName);
2067  }
2068 }
2069 
2070 void MapDHandler::get_version(std::string& version) {
2071  version = MAPD_RELEASE;
2072 }
2073 
2074 void MapDHandler::clear_gpu_memory(const TSessionId& session) {
2075  auto stdlog = STDLOG(get_session_ptr(session));
2076  auto session_ptr = stdlog.getConstSessionInfo();
2077  if (!session_ptr->get_currentUser().isSuper) {
2078  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2079  }
2080  try {
2082  } catch (const std::exception& e) {
2083  THROW_MAPD_EXCEPTION(e.what());
2084  }
2085  if (render_handler_) {
2086  render_handler_->clear_gpu_memory();
2087  }
2088 
2089  if (leaf_aggregator_.leafCount() > 0) {
2091  }
2092 }
2093 
2094 void MapDHandler::clear_cpu_memory(const TSessionId& session) {
2095  auto stdlog = STDLOG(get_session_ptr(session));
2096  auto session_ptr = stdlog.getConstSessionInfo();
2097  if (!session_ptr->get_currentUser().isSuper) {
2098  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2099  }
2100  try {
2102  } catch (const std::exception& e) {
2103  THROW_MAPD_EXCEPTION(e.what());
2104  }
2105  if (render_handler_) {
2106  render_handler_->clear_cpu_memory();
2107  }
2108 
2109  if (leaf_aggregator_.leafCount() > 0) {
2111  }
2112 }
2113 
2115  return INVALID_SESSION_ID;
2116 }
2117 
2118 void MapDHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2119  const TSessionId& session,
2120  const std::string& memory_level) {
2121  auto stdlog = STDLOG(get_session_ptr(session));
2122  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2123  Data_Namespace::MemoryLevel mem_level;
2124  if (!memory_level.compare("gpu")) {
2126  internal_memory =
2127  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2128  } else {
2130  internal_memory =
2131  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2132  }
2133 
2134  for (auto memInfo : internal_memory) {
2135  TNodeMemoryInfo nodeInfo;
2136  if (leaf_aggregator_.leafCount() > 0) {
2137  nodeInfo.host_name = get_hostname();
2138  }
2139  nodeInfo.page_size = memInfo.pageSize;
2140  nodeInfo.max_num_pages = memInfo.maxNumPages;
2141  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2142  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2143  for (auto gpu : memInfo.nodeMemoryData) {
2144  TMemoryData md;
2145  md.slab = gpu.slabNum;
2146  md.start_page = gpu.startPage;
2147  md.num_pages = gpu.numPages;
2148  md.touch = gpu.touch;
2149  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2150  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2151  nodeInfo.node_memory_data.push_back(md);
2152  }
2153  _return.push_back(nodeInfo);
2154  }
2155  if (leaf_aggregator_.leafCount() > 0) {
2156  std::vector<TNodeMemoryInfo> leafSummary =
2157  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
2158  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2159  }
2160 }
2161 
2162 void MapDHandler::get_databases(std::vector<TDBInfo>& dbinfos,
2163  const TSessionId& session) {
2164  auto stdlog = STDLOG(get_session_ptr(session));
2165  auto session_ptr = stdlog.getConstSessionInfo();
2166  const auto& user = session_ptr->get_currentUser();
2168  SysCatalog::instance().getDatabaseListForUser(user);
2169  for (auto& db : dbs) {
2170  TDBInfo dbinfo;
2171  dbinfo.db_name = std::move(db.dbName);
2172  dbinfo.db_owner = std::move(db.dbOwnerName);
2173  dbinfos.push_back(std::move(dbinfo));
2174  }
2175 }
2176 
2177 void MapDHandler::set_execution_mode(const TSessionId& session,
2178  const TExecuteMode::type mode) {
2179  auto stdlog = STDLOG(get_session_ptr(session));
2180  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
2181  auto session_it = get_session_it_unsafe(session, write_lock);
2182  if (leaf_aggregator_.leafCount() > 0) {
2183  leaf_aggregator_.set_execution_mode(session, mode);
2184  try {
2185  MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2186  } catch (const TMapDException& e) {
2187  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2188  }
2189  return;
2190  }
2191  MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2192 }
2193 
2194 namespace {
2195 
2196 void check_table_not_sharded(const Catalog& cat, const std::string& table_name) {
2197  const auto td = cat.getMetadataForTable(table_name);
2198  if (td && td->nShards) {
2199  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2200  }
2201 }
2202 
2203 } // namespace
2204 
2205 void MapDHandler::load_table_binary(const TSessionId& session,
2206  const std::string& table_name,
2207  const std::vector<TRow>& rows) {
2208  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2209  auto session_ptr = stdlog.getConstSessionInfo();
2210  check_read_only("load_table_binary");
2211  auto& cat = session_ptr->getCatalog();
2212  if (g_cluster && !leaf_aggregator_.leafCount()) {
2213  // Sharded table rows need to be routed to the leaf by an aggregator.
2214  check_table_not_sharded(cat, table_name);
2215  }
2216  const TableDescriptor* td = cat.getMetadataForTable(table_name);
2217  if (td == nullptr) {
2218  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
2219  }
2220  check_table_load_privileges(*session_ptr, table_name);
2221  if (rows.empty()) {
2222  return;
2223  }
2224  std::unique_ptr<Importer_NS::Loader> loader;
2225  if (leaf_aggregator_.leafCount() > 0) {
2226  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2227  } else {
2228  loader.reset(new Importer_NS::Loader(cat, td));
2229  }
2230  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2231  // Subtracting 1 (rowid) until TableDescriptor is updated.
2232  if (rows.front().cols.size() !=
2233  static_cast<size_t>(td->nColumns) - (td->hasDeletedCol ? 2 : 1)) {
2234  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
2235  }
2236  auto col_descs = loader->get_column_descs();
2237  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2238  for (auto cd : col_descs) {
2239  import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2240  new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
2241  }
2242  for (auto const& row : rows) {
2243  size_t col_idx = 0;
2244  try {
2245  for (auto cd : col_descs) {
2246  import_buffers[col_idx]->add_value(
2247  cd, row.cols[col_idx], row.cols[col_idx].is_null);
2248  col_idx++;
2249  }
2250  } catch (const std::exception& e) {
2251  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2252  import_buffers[col_idx_to_pop]->pop_value();
2253  }
2254  LOG(ERROR) << "Input exception thrown: " << e.what()
2255  << ". Row discarded, issue at column : " << (col_idx + 1)
2256  << " data :" << row;
2257  }
2258  }
2259  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2260  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2261  loader->load(import_buffers, rows.size());
2262 }
2263 
2265  const Catalog_Namespace::SessionInfo& session_info,
2266  const std::string& table_name,
2267  size_t num_cols,
2268  std::unique_ptr<Importer_NS::Loader>* loader,
2269  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>>* import_buffers) {
2270  auto& cat = session_info.getCatalog();
2271  if (g_cluster && !leaf_aggregator_.leafCount()) {
2272  // Sharded table rows need to be routed to the leaf by an aggregator.
2273  check_table_not_sharded(cat, table_name);
2274  }
2275  const TableDescriptor* td = cat.getMetadataForTable(table_name);
2276  if (td == nullptr) {
2277  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
2278  }
2279  check_table_load_privileges(session_info, table_name);
2280  if (leaf_aggregator_.leafCount() > 0) {
2281  loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
2282  } else {
2283  loader->reset(new Importer_NS::Loader(cat, td));
2284  }
2285  auto col_descs = (*loader)->get_column_descs();
2286  auto geo_physical_cols = std::count_if(
2287  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2288  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2289  // Subtracting 1 (rowid) until TableDescriptor is updated.
2290  if (num_cols != static_cast<size_t>(td->nColumns) - geo_physical_cols -
2291  (td->hasDeletedCol ? 2 : 1) ||
2292  num_cols < 1) {
2293  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
2294  }
2295  for (auto cd : col_descs) {
2296  import_buffers->push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2297  new Importer_NS::TypedImportBuffer(cd, (*loader)->getStringDict(cd))));
2298  }
2299 }
2300 
2301 void MapDHandler::load_table_binary_columnar(const TSessionId& session,
2302  const std::string& table_name,
2303  const std::vector<TColumn>& cols) {
2304  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2305  auto session_ptr = stdlog.getConstSessionInfo();
2306  check_read_only("load_table_binary_columnar");
2307  auto const& cat = session_ptr->getCatalog();
2308 
2309  std::unique_ptr<Importer_NS::Loader> loader;
2310  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2312  *session_ptr, table_name, cols.size(), &loader, &import_buffers);
2313 
2314  size_t numRows = 0;
2315  size_t import_idx = 0; // index into the TColumn vector being loaded
2316  size_t col_idx = 0; // index into column description vector
2317  try {
2318  size_t skip_physical_cols = 0;
2319  for (auto cd : loader->get_column_descs()) {
2320  if (skip_physical_cols > 0) {
2321  if (!cd->isGeoPhyCol) {
2322  throw std::runtime_error("Unexpected physical column");
2323  }
2324  skip_physical_cols--;
2325  continue;
2326  }
2327  size_t colRows = import_buffers[col_idx]->add_values(cd, cols[import_idx]);
2328  if (col_idx == 0) {
2329  numRows = colRows;
2330  } else if (colRows != numRows) {
2331  std::ostringstream oss;
2332  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
2333  << cd->columnName << " , expecting " << numRows << " rows, column "
2334  << col_idx << " has " << colRows << " rows";
2335  THROW_MAPD_EXCEPTION(oss.str());
2336  }
2337  // Advance to the next column in the table
2338  col_idx++;
2339 
2340  // For geometry columns: process WKT strings and fill physical columns
2341  if (cd->columnType.is_geometry()) {
2342  auto geo_col_idx = col_idx - 1;
2343  const auto wkt_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
2344  std::vector<std::vector<double>> coords_column, bounds_column;
2345  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2346  int render_group = 0;
2347  SQLTypeInfo ti = cd->columnType;
2348  if (numRows != wkt_column->size() ||
2350  ti,
2351  coords_column,
2352  bounds_column,
2353  ring_sizes_column,
2354  poly_rings_column,
2355  false)) {
2356  std::ostringstream oss;
2357  oss << "load_table_binary_columnar: Invalid geometry in column "
2358  << cd->columnName;
2359  THROW_MAPD_EXCEPTION(oss.str());
2360  }
2361  // Populate physical columns, advance col_idx
2363  cd,
2364  import_buffers,
2365  col_idx,
2366  coords_column,
2367  bounds_column,
2368  ring_sizes_column,
2369  poly_rings_column,
2370  render_group);
2371  skip_physical_cols = cd->columnType.get_physical_cols();
2372  }
2373  // Advance to the next column of values being loaded
2374  import_idx++;
2375  }
2376  } catch (const std::exception& e) {
2377  std::ostringstream oss;
2378  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
2379  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2380  THROW_MAPD_EXCEPTION(oss.str());
2381  }
2382  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2383  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2384  loader->load(import_buffers, numRows);
2385 }
2386 
2387 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
2388 
2389 #define ARROW_THRIFT_THROW_NOT_OK(s) \
2390  do { \
2391  ::arrow::Status _s = (s); \
2392  if (UNLIKELY(!_s.ok())) { \
2393  TMapDException ex; \
2394  ex.error_msg = _s.ToString(); \
2395  LOG(ERROR) << s.ToString(); \
2396  throw ex; \
2397  } \
2398  } while (0)
2399 
2400 namespace {
2401 
2402 RecordBatchVector loadArrowStream(const std::string& stream) {
2403  RecordBatchVector batches;
2404  try {
2405  // TODO(wesm): Make this simpler in general, see ARROW-1600
2406  auto stream_buffer =
2407  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
2408  static_cast<int64_t>(stream.size()));
2409 
2410  arrow::io::BufferReader buf_reader(stream_buffer);
2411  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
2413  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader, &batch_reader));
2414 
2415  while (true) {
2416  std::shared_ptr<arrow::RecordBatch> batch;
2417  // Read batch (zero-copy) from the stream
2418  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
2419  if (batch == nullptr) {
2420  break;
2421  }
2422  batches.emplace_back(std::move(batch));
2423  }
2424  } catch (const std::exception& e) {
2425  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
2426  }
2427  return batches;
2428 }
2429 
2430 } // namespace
2431 
2432 void MapDHandler::load_table_binary_arrow(const TSessionId& session,
2433  const std::string& table_name,
2434  const std::string& arrow_stream) {
2435  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2436  auto session_ptr = stdlog.getConstSessionInfo();
2437  check_read_only("load_table_binary_arrow");
2438 
2439  RecordBatchVector batches = loadArrowStream(arrow_stream);
2440 
2441  // Assuming have one batch for now
2442  if (batches.size() != 1) {
2443  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
2444  }
2445 
2446  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
2447 
2448  std::unique_ptr<Importer_NS::Loader> loader;
2449  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2450  prepare_columnar_loader(*session_ptr,
2451  table_name,
2452  static_cast<size_t>(batch->num_columns()),
2453  &loader,
2454  &import_buffers);
2455 
2456  size_t numRows = 0;
2457  size_t col_idx = 0;
2458  try {
2459  for (auto cd : loader->get_column_descs()) {
2460  auto& array = *batch->column(col_idx);
2461  Importer_NS::ArraySliceRange row_slice(0, array.length());
2462  numRows =
2463  import_buffers[col_idx]->add_arrow_values(cd, array, true, row_slice, nullptr);
2464  col_idx++;
2465  }
2466  } catch (const std::exception& e) {
2467  LOG(ERROR) << "Input exception thrown: " << e.what()
2468  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2469  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
2470  // other import paths
2471  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2472  }
2473  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2474  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2475  loader->load(import_buffers, numRows);
2476 }
2477 
2478 void MapDHandler::load_table(const TSessionId& session,
2479  const std::string& table_name,
2480  const std::vector<TStringRow>& rows) {
2481  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2482  auto session_ptr = stdlog.getConstSessionInfo();
2483  check_read_only("load_table");
2484  auto& cat = session_ptr->getCatalog();
2485  if (g_cluster && !leaf_aggregator_.leafCount()) {
2486  // Sharded table rows need to be routed to the leaf by an aggregator.
2487  check_table_not_sharded(cat, table_name);
2488  }
2489  const TableDescriptor* td = cat.getMetadataForTable(table_name);
2490  if (td == nullptr) {
2491  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
2492  }
2493  check_table_load_privileges(*session_ptr, table_name);
2494  if (rows.empty()) {
2495  return;
2496  }
2497  std::unique_ptr<Importer_NS::Loader> loader;
2498  if (leaf_aggregator_.leafCount() > 0) {
2499  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2500  } else {
2501  loader.reset(new Importer_NS::Loader(cat, td));
2502  }
2503  auto col_descs = loader->get_column_descs();
2504  auto geo_physical_cols = std::count_if(
2505  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2506  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2507  // Subtracting 1 (rowid) until TableDescriptor is updated.
2508  if (rows.front().cols.size() != static_cast<size_t>(td->nColumns) - geo_physical_cols -
2509  (td->hasDeletedCol ? 2 : 1)) {
2510  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name +
2511  " (" + std::to_string(rows.front().cols.size()) + " vs " +
2512  std::to_string(td->nColumns - geo_physical_cols - 1) + ")");
2513  }
2514  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2515  for (auto cd : col_descs) {
2516  import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2517  new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
2518  }
2519  Importer_NS::CopyParams copy_params;
2520  size_t rows_completed = 0;
2521  for (auto const& row : rows) {
2522  size_t import_idx = 0; // index into the TStringRow being loaded
2523  size_t col_idx = 0; // index into column description vector
2524  try {
2525  size_t skip_physical_cols = 0;
2526  for (auto cd : col_descs) {
2527  if (skip_physical_cols > 0) {
2528  if (!cd->isGeoPhyCol) {
2529  throw std::runtime_error("Unexpected physical column");
2530  }
2531  skip_physical_cols--;
2532  continue;
2533  }
2534  import_buffers[col_idx]->add_value(
2535  cd, row.cols[import_idx].str_val, row.cols[import_idx].is_null, copy_params);
2536  // Advance to the next column within the table
2537  col_idx++;
2538 
2539  if (cd->columnType.is_geometry()) {
2540  // Populate physical columns
2541  std::vector<double> coords, bounds;
2542  std::vector<int> ring_sizes, poly_rings;
2543  int render_group = 0;
2544  SQLTypeInfo ti;
2545  if (row.cols[import_idx].is_null ||
2546  !Geo_namespace::GeoTypesFactory::getGeoColumns(row.cols[import_idx].str_val,
2547  ti,
2548  coords,
2549  bounds,
2550  ring_sizes,
2551  poly_rings,
2552  false)) {
2553  throw std::runtime_error("Invalid geometry");
2554  }
2555  if (cd->columnType.get_type() != ti.get_type()) {
2556  throw std::runtime_error("Geometry type mismatch");
2557  }
2559  cd,
2560  import_buffers,
2561  col_idx,
2562  coords,
2563  bounds,
2564  ring_sizes,
2565  poly_rings,
2566  render_group);
2567  skip_physical_cols = cd->columnType.get_physical_cols();
2568  }
2569  // Advance to the next field within the row
2570  import_idx++;
2571  }
2572  rows_completed++;
2573  } catch (const std::exception& e) {
2574  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2575  import_buffers[col_idx_to_pop]->pop_value();
2576  }
2577  LOG(ERROR) << "Input exception thrown: " << e.what()
2578  << ". Row discarded, issue at column : " << (col_idx + 1)
2579  << " data :" << row;
2580  }
2581  }
2582  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
2583  session_ptr->getCatalog(), table_name, LockType::CheckpointLock);
2584  loader->load(import_buffers, rows_completed);
2585 }
2586 
2587 char MapDHandler::unescape_char(std::string str) {
2588  char out = str[0];
2589  if (str.size() == 2 && str[0] == '\\') {
2590  if (str[1] == 't') {
2591  out = '\t';
2592  } else if (str[1] == 'n') {
2593  out = '\n';
2594  } else if (str[1] == '0') {
2595  out = '\0';
2596  } else if (str[1] == '\'') {
2597  out = '\'';
2598  } else if (str[1] == '\\') {
2599  out = '\\';
2600  }
2601  }
2602  return out;
2603 }
2604 
2606  Importer_NS::CopyParams copy_params;
2607  switch (cp.has_header) {
2608  case TImportHeaderRow::AUTODETECT:
2610  break;
2611  case TImportHeaderRow::NO_HEADER:
2613  break;
2614  case TImportHeaderRow::HAS_HEADER:
2616  break;
2617  default:
2618  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
2619  std::to_string((int)cp.has_header));
2620  break;
2621  }
2622  copy_params.quoted = cp.quoted;
2623  if (cp.delimiter.length() > 0) {
2624  copy_params.delimiter = unescape_char(cp.delimiter);
2625  } else {
2626  copy_params.delimiter = '\0';
2627  }
2628  if (cp.null_str.length() > 0) {
2629  copy_params.null_str = cp.null_str;
2630  }
2631  if (cp.quote.length() > 0) {
2632  copy_params.quote = unescape_char(cp.quote);
2633  }
2634  if (cp.escape.length() > 0) {
2635  copy_params.escape = unescape_char(cp.escape);
2636  }
2637  if (cp.line_delim.length() > 0) {
2638  copy_params.line_delim = unescape_char(cp.line_delim);
2639  }
2640  if (cp.array_delim.length() > 0) {
2641  copy_params.array_delim = unescape_char(cp.array_delim);
2642  }
2643  if (cp.array_begin.length() > 0) {
2644  copy_params.array_begin = unescape_char(cp.array_begin);
2645  }
2646  if (cp.array_end.length() > 0) {
2647  copy_params.array_end = unescape_char(cp.array_end);
2648  }
2649  if (cp.threads != 0) {
2650  copy_params.threads = cp.threads;
2651  }
2652  if (cp.s3_access_key.length() > 0) {
2653  copy_params.s3_access_key = cp.s3_access_key;
2654  }
2655  if (cp.s3_secret_key.length() > 0) {
2656  copy_params.s3_secret_key = cp.s3_secret_key;
2657  }
2658  if (cp.s3_region.length() > 0) {
2659  copy_params.s3_region = cp.s3_region;
2660  }
2661  if (cp.s3_endpoint.length() > 0) {
2662  copy_params.s3_endpoint = cp.s3_endpoint;
2663  }
2664  switch (cp.file_type) {
2665  case TFileType::POLYGON:
2667  break;
2668  case TFileType::DELIMITED:
2670  break;
2671 #ifdef ENABLE_IMPORT_PARQUET
2672  case TFileType::PARQUET:
2673  copy_params.file_type = Importer_NS::FileType::PARQUET;
2674  break;
2675 #endif
2676  default:
2677  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
2678  std::to_string((int)cp.file_type));
2679  break;
2680  }
2681  switch (cp.geo_coords_encoding) {
2682  case TEncodingType::GEOINT:
2683  copy_params.geo_coords_encoding = kENCODING_GEOINT;
2684  break;
2685  case TEncodingType::NONE:
2686  copy_params.geo_coords_encoding = kENCODING_NONE;
2687  break;
2688  default:
2689  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
2690  std::to_string((int)cp.geo_coords_encoding));
2691  break;
2692  }
2693  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2694  switch (cp.geo_coords_type) {
2695  case TDatumType::GEOGRAPHY:
2696  copy_params.geo_coords_type = kGEOGRAPHY;
2697  break;
2698  case TDatumType::GEOMETRY:
2699  copy_params.geo_coords_type = kGEOMETRY;
2700  break;
2701  default:
2702  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
2703  std::to_string((int)cp.geo_coords_type));
2704  break;
2705  }
2706  switch (cp.geo_coords_srid) {
2707  case 4326:
2708  case 3857:
2709  case 900913:
2710  copy_params.geo_coords_srid = cp.geo_coords_srid;
2711  break;
2712  default:
2713  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
2714  std::to_string((int)cp.geo_coords_srid));
2715  break;
2716  }
2717  copy_params.sanitize_column_names = cp.sanitize_column_names;
2718  copy_params.geo_layer_name = cp.geo_layer_name;
2719  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2720  copy_params.geo_explode_collections = cp.geo_explode_collections;
2721  return copy_params;
2722 }
2723 
2725  TCopyParams copy_params;
2726  copy_params.delimiter = cp.delimiter;
2727  copy_params.null_str = cp.null_str;
2728  switch (cp.has_header) {
2730  copy_params.has_header = TImportHeaderRow::AUTODETECT;
2731  break;
2733  copy_params.has_header = TImportHeaderRow::NO_HEADER;
2734  break;
2736  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
2737  break;
2738  default:
2739  CHECK(false);
2740  break;
2741  }
2742  copy_params.quoted = cp.quoted;
2743  copy_params.quote = cp.quote;
2744  copy_params.escape = cp.escape;
2745  copy_params.line_delim = cp.line_delim;
2746  copy_params.array_delim = cp.array_delim;
2747  copy_params.array_begin = cp.array_begin;
2748  copy_params.array_end = cp.array_end;
2749  copy_params.threads = cp.threads;
2750  copy_params.s3_access_key = cp.s3_access_key;
2751  copy_params.s3_secret_key = cp.s3_secret_key;
2752  copy_params.s3_region = cp.s3_region;
2753  copy_params.s3_endpoint = cp.s3_endpoint;
2754  switch (cp.file_type) {
2756  copy_params.file_type = TFileType::POLYGON;
2757  break;
2758  default:
2759  copy_params.file_type = TFileType::DELIMITED;
2760  break;
2761  }
2762  switch (cp.geo_coords_encoding) {
2763  case kENCODING_GEOINT:
2764  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
2765  break;
2766  default:
2767  copy_params.geo_coords_encoding = TEncodingType::NONE;
2768  break;
2769  }
2770  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2771  switch (cp.geo_coords_type) {
2772  case kGEOGRAPHY:
2773  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
2774  break;
2775  case kGEOMETRY:
2776  copy_params.geo_coords_type = TDatumType::GEOMETRY;
2777  break;
2778  default:
2779  CHECK(false);
2780  break;
2781  }
2782  copy_params.geo_coords_srid = cp.geo_coords_srid;
2783  copy_params.sanitize_column_names = cp.sanitize_column_names;
2784  copy_params.geo_layer_name = cp.geo_layer_name;
2785  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2786  copy_params.geo_explode_collections = cp.geo_explode_collections;
2787  return copy_params;
2788 }
2789 
2790 void add_vsi_network_prefix(std::string& path) {
2791  // do we support network file access?
2793 
2794  // modify head of filename based on source location
2795  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
2796  if (!gdal_network) {
2798  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
2799  }
2800  // invoke GDAL CURL virtual file reader
2801  path = "/vsicurl/" + path;
2802  } else if (boost::istarts_with(path, "s3://")) {
2803  if (!gdal_network) {
2805  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
2806  }
2807  // invoke GDAL S3 virtual file reader
2808  boost::replace_first(path, "s3://", "/vsis3/");
2809  }
2810 }
2811 
2812 void add_vsi_geo_prefix(std::string& path) {
2813  // single gzip'd file (not an archive)?
2814  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
2815  path = "/vsigzip/" + path;
2816  }
2817 }
2818 
2819 void add_vsi_archive_prefix(std::string& path) {
2820  // check for compressed file or file bundle
2821  if (boost::iends_with(path, ".zip")) {
2822  // zip archive
2823  path = "/vsizip/" + path;
2824  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
2825  boost::iends_with(path, ".tar.gz")) {
2826  // tar archive (compressed or uncompressed)
2827  path = "/vsitar/" + path;
2828  }
2829 }
2830 
2831 std::string remove_vsi_prefixes(const std::string& path_in) {
2832  std::string path(path_in);
2833 
2834  // these will be first
2835  if (boost::istarts_with(path, "/vsizip/")) {
2836  boost::replace_first(path, "/vsizip/", "");
2837  } else if (boost::istarts_with(path, "/vsitar/")) {
2838  boost::replace_first(path, "/vsitar/", "");
2839  } else if (boost::istarts_with(path, "/vsigzip/")) {
2840  boost::replace_first(path, "/vsigzip/", "");
2841  }
2842 
2843  // then these
2844  if (boost::istarts_with(path, "/vsicurl/")) {
2845  boost::replace_first(path, "/vsicurl/", "");
2846  } else if (boost::istarts_with(path, "/vsis3/")) {
2847  boost::replace_first(path, "/vsis3/", "s3://");
2848  }
2849 
2850  return path;
2851 }
2852 
2853 bool path_is_relative(const std::string& path) {
2854  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
2855  boost::istarts_with(path, "https://")) {
2856  return false;
2857  }
2858  return !boost::filesystem::path(path).is_absolute();
2859 }
2860 
2861 bool path_has_valid_filename(const std::string& path) {
2862  auto filename = boost::filesystem::path(path).filename().string();
2863  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
2864  return false;
2865  }
2866  return true;
2867 }
2868 
2869 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
2870  if (!path_has_valid_filename(path)) {
2871  return false;
2872  }
2873  if (include_gz) {
2874  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
2875  return true;
2876  }
2877  }
2878  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
2879  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
2880  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
2881  boost::iends_with(path, ".gdb.zip")) {
2882  return true;
2883  }
2884  return false;
2885 }
2886 
2887 bool is_a_supported_archive_file(const std::string& path) {
2888  if (!path_has_valid_filename(path)) {
2889  return false;
2890  }
2891  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
2892  return true;
2893  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
2894  boost::iends_with(path, ".tar.gz")) {
2895  return true;
2896  }
2897  return false;
2898 }
2899 
2900 std::string find_first_geo_file_in_archive(const std::string& archive_path,
2901  const Importer_NS::CopyParams& copy_params) {
2902  // get the recursive list of all files in the archive
2903  std::vector<std::string> files =
2904  Importer_NS::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
2905 
2906  // report the list
2907  LOG(INFO) << "Found " << files.size() << " files in Archive "
2908  << remove_vsi_prefixes(archive_path);
2909  for (const auto& file : files) {
2910  LOG(INFO) << " " << file;
2911  }
2912 
2913  // scan the list for the first candidate file
2914  bool found_suitable_file = false;
2915  std::string file_name;
2916  for (const auto& file : files) {
2917  if (is_a_supported_geo_file(file, false)) {
2918  file_name = file;
2919  found_suitable_file = true;
2920  break;
2921  }
2922  }
2923 
2924  // if we didn't find anything
2925  if (!found_suitable_file) {
2926  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
2927  remove_vsi_prefixes(archive_path);
2928  file_name.clear();
2929  }
2930 
2931  // done
2932  return file_name;
2933 }
2934 
2935 void MapDHandler::detect_column_types(TDetectResult& _return,
2936  const TSessionId& session,
2937  const std::string& file_name_in,
2938  const TCopyParams& cp) {
2939  auto stdlog = STDLOG(get_session_ptr(session));
2940  check_read_only("detect_column_types");
2941 
2943 
2944  std::string file_name{file_name_in};
2945 
2946  if (path_is_relative(file_name)) {
2947  // assume relative paths are relative to data_path / mapd_import / <session>
2948  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
2949  boost::filesystem::path(file_name).filename();
2950  file_name = file_path.string();
2951  }
2952 
2953  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
2954  if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
2955  if (is_a_supported_geo_file(file_name, true)) {
2956  // prepare to detect geo file directly
2957  add_vsi_network_prefix(file_name);
2958  add_vsi_geo_prefix(file_name);
2959  } else if (is_a_supported_archive_file(file_name)) {
2960  // find the archive file
2961  add_vsi_network_prefix(file_name);
2962  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
2963  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
2964  }
2965  // find geo file in archive
2966  add_vsi_archive_prefix(file_name);
2967  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
2968  // prepare to detect that geo file
2969  if (geo_file.size()) {
2970  file_name = file_name + std::string("/") + geo_file;
2971  }
2972  } else {
2973  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
2974  file_name_in);
2975  }
2976  }
2977 
2978  auto file_path = boost::filesystem::path(file_name);
2979  // can be a s3 url
2980  if (!boost::istarts_with(file_name, "s3://")) {
2981  if (!boost::filesystem::path(file_name).is_absolute()) {
2982  file_path = import_path_ / picosha2::hash256_hex_string(session) /
2983  boost::filesystem::path(file_name).filename();
2984  file_name = file_path.string();
2985  }
2986 
2987  if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
2988  // check for geo file
2989  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
2990  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
2991  }
2992  } else {
2993  // check for regular file
2994  if (!boost::filesystem::exists(file_path)) {
2995  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
2996  }
2997  }
2998  }
2999  try {
3000  if (copy_params.file_type == Importer_NS::FileType::DELIMITED
3001 #ifdef ENABLE_IMPORT_PARQUET
3002  || (copy_params.file_type == Importer_NS::FileType::PARQUET)
3003 #endif
3004  ) {
3005  Importer_NS::Detector detector(file_path, copy_params);
3006  std::vector<SQLTypes> best_types = detector.best_sqltypes;
3007  std::vector<EncodingType> best_encodings = detector.best_encodings;
3008  std::vector<std::string> headers = detector.get_headers();
3009  copy_params = detector.get_copy_params();
3010 
3011  _return.copy_params = copyparams_to_thrift(copy_params);
3012  _return.row_set.row_desc.resize(best_types.size());
3013  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
3014  TColumnType col;
3015  SQLTypes t = best_types[col_idx];
3016  EncodingType encodingType = best_encodings[col_idx];
3017  SQLTypeInfo ti(t, false, encodingType);
3018  if (IS_GEO(t)) {
3019  // set this so encoding_to_thrift does the right thing
3020  ti.set_compression(copy_params.geo_coords_encoding);
3021  // fill in these directly
3022  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3023  col.col_type.scale = copy_params.geo_coords_srid;
3024  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3025  }
3026  col.col_type.type = type_to_thrift(ti);
3027  col.col_type.encoding = encoding_to_thrift(ti);
3028  if (copy_params.sanitize_column_names) {
3029  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3030  } else {
3031  col.col_name = headers[col_idx];
3032  }
3033  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3034  _return.row_set.row_desc[col_idx] = col;
3035  }
3036  size_t num_samples = 100;
3037  auto sample_data = detector.get_sample_rows(num_samples);
3038 
3039  TRow sample_row;
3040  for (auto row : sample_data) {
3041  sample_row.cols.clear();
3042  for (const auto& s : row) {
3043  TDatum td;
3044  td.val.str_val = s;
3045  td.is_null = s.empty();
3046  sample_row.cols.push_back(td);
3047  }
3048  _return.row_set.rows.push_back(sample_row);
3049  }
3050  } else if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
3051  // @TODO simon.eves get this from somewhere!
3052  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
3053 
3054  check_geospatial_files(file_path, copy_params);
3055  std::list<ColumnDescriptor> cds = Importer_NS::Importer::gdalToColumnDescriptors(
3056  file_path.string(), geoColumnName, copy_params);
3057  for (auto cd : cds) {
3058  if (copy_params.sanitize_column_names) {
3059  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
3060  }
3061  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
3062  }
3063  std::map<std::string, std::vector<std::string>> sample_data;
3065  file_path.string(), geoColumnName, sample_data, 100, copy_params);
3066  if (sample_data.size() > 0) {
3067  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
3068  TRow sample_row;
3069  for (auto cd : cds) {
3070  TDatum td;
3071  td.val.str_val = sample_data[cd.sourceName].at(i);
3072  td.is_null = td.val.str_val.empty();
3073  sample_row.cols.push_back(td);
3074  }
3075  _return.row_set.rows.push_back(sample_row);
3076  }
3077  }
3078  _return.copy_params = copyparams_to_thrift(copy_params);
3079  }
3080  } catch (const std::exception& e) {
3081  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
3082  }
3083 }
3084 
3086  const std::string& query_str,
3087  const Catalog_Namespace::SessionInfo& session_info,
3088  const std::string& action /* render or validate */) {
3089  auto& cat = session_info.getCatalog();
3090  LOG(INFO) << action << ": " << hide_sensitive_data_from_query(query_str);
3091  SQLParser parser;
3092  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
3093  std::string last_parsed;
3094  int num_parse_errors = 0;
3095  try {
3096  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
3097  } catch (std::exception& e) {
3098  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3099  }
3100  if (num_parse_errors > 0) {
3101  THROW_MAPD_EXCEPTION("Syntax error at: " + last_parsed);
3102  }
3103  if (parse_trees.size() != 1) {
3104  THROW_MAPD_EXCEPTION("Can only " + action + " a single query at a time.");
3105  }
3106  Parser::Stmt* stmt = parse_trees.front().get();
3107  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
3108  if (ddl != nullptr) {
3109  THROW_MAPD_EXCEPTION("Can only " + action + " SELECT statements.");
3110  }
3111  auto dml = static_cast<Parser::DMLStmt*>(stmt);
3112  Analyzer::Query query;
3113  dml->analyze(cat, query);
3114  Planner::Optimizer optimizer(query, cat);
3115  return optimizer.optimize();
3116 }
3117 
3118 void MapDHandler::render_vega(TRenderResult& _return,
3119  const TSessionId& session,
3120  const int64_t widget_id,
3121  const std::string& vega_json,
3122  const int compression_level,
3123  const std::string& nonce) {
3124  auto stdlog = STDLOG(get_session_ptr(session),
3125  "widget_id",
3126  widget_id,
3127  "compression_level",
3128  compression_level,
3129  "vega_json",
3130  vega_json,
3131  "nonce",
3132  nonce);
3133  auto session_ptr = stdlog.getConstSessionInfo();
3134  if (!render_handler_) {
3135  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
3136  }
3137 
3138  _return.total_time_ms = measure<>::execution([&]() {
3139  try {
3140  render_handler_->render_vega(
3141  _return,
3142  std::make_shared<Catalog_Namespace::SessionInfo>(*session_ptr),
3143  widget_id,
3144  vega_json,
3145  compression_level,
3146  nonce);
3147  } catch (std::exception& e) {
3148  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3149  }
3150  });
3151 }
3152 
3154  int32_t dashboard_id,
3155  AccessPrivileges requestedPermissions) {
3156  DBObject object(dashboard_id, DashboardDBObjectType);
3157  auto& catalog = session_info.getCatalog();
3158  auto& user = session_info.get_currentUser();
3159  object.loadKey(catalog);
3160  object.setPrivileges(requestedPermissions);
3161  std::vector<DBObject> privs = {object};
3162  return SysCatalog::instance().checkPrivileges(user, privs);
3163 }
3164 
3165 // dashboards
3166 void MapDHandler::get_dashboard(TDashboard& dashboard,
3167  const TSessionId& session,
3168  const int32_t dashboard_id) {
3169  auto stdlog = STDLOG(get_session_ptr(session));
3170  auto session_ptr = stdlog.getConstSessionInfo();
3171  auto const& cat = session_ptr->getCatalog();
3173  auto dash = cat.getMetadataForDashboard(dashboard_id);
3174  if (!dash) {
3175  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
3176  " doesn't exist");
3177  }
3179  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3180  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
3181  std::to_string(dashboard_id));
3182  }
3183  user_meta.userName = "";
3184  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3185  auto objects_list = SysCatalog::instance().getMetadataForObject(
3186  cat.getCurrentDB().dbId,
3187  static_cast<int>(DBObjectType::DashboardDBObjectType),
3188  dashboard_id);
3189  dashboard.dashboard_name = dash->dashboardName;
3190  dashboard.dashboard_state = dash->dashboardState;
3191  dashboard.image_hash = dash->imageHash;
3192  dashboard.update_time = dash->updateTime;
3193  dashboard.dashboard_metadata = dash->dashboardMetadata;
3194  dashboard.dashboard_owner = dash->user;
3195  dashboard.dashboard_id = dash->dashboardId;
3196  if (objects_list.empty() ||
3197  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3198  dashboard.is_dash_shared = false;
3199  } else {
3200  dashboard.is_dash_shared = true;
3201  }
3202 }
3203 
3204 void MapDHandler::get_dashboards(std::vector<TDashboard>& dashboards,
3205  const TSessionId& session) {
3206  auto stdlog = STDLOG(get_session_ptr(session));
3207  auto session_ptr = stdlog.getConstSessionInfo();
3208  auto const& cat = session_ptr->getCatalog();
3210  const auto dashes = cat.getAllDashboardsMetadata();
3211  user_meta.userName = "";
3212  for (const auto d : dashes) {
3213  SysCatalog::instance().getMetadataForUserById(d->userId, user_meta);
3215  *session_ptr, d->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3216  auto objects_list = SysCatalog::instance().getMetadataForObject(
3217  cat.getCurrentDB().dbId,
3218  static_cast<int>(DBObjectType::DashboardDBObjectType),
3219  d->dashboardId);
3220  TDashboard dash;
3221  dash.dashboard_name = d->dashboardName;
3222  dash.image_hash = d->imageHash;
3223  dash.update_time = d->updateTime;
3224  dash.dashboard_metadata = d->dashboardMetadata;
3225  dash.dashboard_id = d->dashboardId;
3226  dash.dashboard_owner = d->user;
3227  // dashboardState is intentionally not populated here
3228  // for payload reasons
3229  // use get_dashboard call to get state
3230  if (objects_list.empty() ||
3231  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3232  dash.is_dash_shared = false;
3233  } else {
3234  dash.is_dash_shared = true;
3235  }
3236  dashboards.push_back(dash);
3237  }
3238  }
3239 }
3240 
3241 int32_t MapDHandler::create_dashboard(const TSessionId& session,
3242  const std::string& dashboard_name,
3243  const std::string& dashboard_state,
3244  const std::string& image_hash,
3245  const std::string& dashboard_metadata) {
3246  auto stdlog = STDLOG(get_session_ptr(session));
3247  auto session_ptr = stdlog.getConstSessionInfo();
3248  check_read_only("create_dashboard");
3249  auto& cat = session_ptr->getCatalog();
3250 
3251  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
3253  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
3254  }
3255 
3256  auto dash = cat.getMetadataForDashboard(
3257  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
3258  if (dash) {
3259  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
3260  }
3261 
3263  dd.dashboardName = dashboard_name;
3264  dd.dashboardState = dashboard_state;
3265  dd.imageHash = image_hash;
3266  dd.dashboardMetadata = dashboard_metadata;
3267  dd.userId = session_ptr->get_currentUser().userId;
3268  dd.user = session_ptr->get_currentUser().userName;
3269 
3270  try {
3271  auto id = cat.createDashboard(dd);
3272  // TODO: transactionally unsafe
3273  SysCatalog::instance().createDBObject(
3274  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
3275  return id;
3276  } catch (const std::exception& e) {
3277  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3278  }
3279 }
3280 
3281 void MapDHandler::replace_dashboard(const TSessionId& session,
3282  const int32_t dashboard_id,
3283  const std::string& dashboard_name,
3284  const std::string& dashboard_owner,
3285  const std::string& dashboard_state,
3286  const std::string& image_hash,
3287  const std::string& dashboard_metadata) {
3288  auto stdlog = STDLOG(get_session_ptr(session));
3289  auto session_ptr = stdlog.getConstSessionInfo();
3290  check_read_only("replace_dashboard");
3291  auto& cat = session_ptr->getCatalog();
3292 
3294  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
3295  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
3296  }
3297 
3299  dd.dashboardName = dashboard_name;
3300  dd.dashboardState = dashboard_state;
3301  dd.imageHash = image_hash;
3302  dd.dashboardMetadata = dashboard_metadata;
3304  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
3305  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
3306  " does not exist");
3307  }
3308  dd.userId = user.userId;
3309  dd.user = dashboard_owner;
3310  dd.dashboardId = dashboard_id;
3311 
3312  try {
3313  cat.replaceDashboard(dd);
3314  } catch (const std::exception& e) {
3315  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3316  }
3317 }
3318 
3319 void MapDHandler::delete_dashboard(const TSessionId& session,
3320  const int32_t dashboard_id) {
3321  auto stdlog = STDLOG(get_session_ptr(session));
3322  auto session_ptr = stdlog.getConstSessionInfo();
3323  check_read_only("delete_dashboard");
3324  auto& cat = session_ptr->getCatalog();
3325  auto dash = cat.getMetadataForDashboard(dashboard_id);
3326  if (!dash) {
3327  THROW_MAPD_EXCEPTION("Dashboard with id" + std::to_string(dashboard_id) +
3328  " doesn't exist, so cannot delete it");
3329  }
3331  *session_ptr, dash->dashboardId, AccessPrivileges::DELETE_DASHBOARD)) {
3332  THROW_MAPD_EXCEPTION("Not enough privileges to delete a dashboard.");
3333  }
3334  try {
3335  cat.deleteMetadataForDashboard(dashboard_id);
3336  } catch (const std::exception& e) {
3337  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3338  }
3339 }
3340 
3341 std::vector<std::string> MapDHandler::get_valid_groups(const TSessionId& session,
3342  int32_t dashboard_id,
3343  std::vector<std::string> groups) {
3344  const auto session_info = get_session_copy(session);
3345  auto& cat = session_info.getCatalog();
3346  auto dash = cat.getMetadataForDashboard(dashboard_id);
3347  if (!dash) {
3348  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3349  " does not exist");
3350  } else if (session_info.get_currentUser().userId != dash->userId &&
3351  !session_info.get_currentUser().isSuper) {
3352  throw std::runtime_error(
3353  "User should be either owner of dashboard or super user to share/unshare it");
3354  }
3355  std::vector<std::string> valid_groups;
3357  for (auto& group : groups) {
3358  user_meta.isSuper = false; // initialize default flag
3359  if (!SysCatalog::instance().getGrantee(group)) {
3360  THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
3361  } else if (!user_meta.isSuper) {
3362  valid_groups.push_back(group);
3363  }
3364  }
3365  return valid_groups;
3366 }
3367 
3368 // NOOP: Grants not available for objects as of now
3369 void MapDHandler::share_dashboard(const TSessionId& session,
3370  const int32_t dashboard_id,
3371  const std::vector<std::string>& groups,
3372  const std::vector<std::string>& objects,
3373  const TDashboardPermissions& permissions,
3374  const bool grant_role = false) {
3375  auto stdlog = STDLOG(get_session_ptr(session));
3376  auto session_ptr = stdlog.getConstSessionInfo();
3377  check_read_only("share_dashboard");
3378  std::vector<std::string> valid_groups;
3379  valid_groups = get_valid_groups(session, dashboard_id, groups);
3380  auto const& cat = session_ptr->getCatalog();
3381  // By default object type can only be dashboard
3383  DBObject object(dashboard_id, object_type);
3384  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3385  !permissions.view_) {
3386  THROW_MAPD_EXCEPTION("Atleast one privilege should be assigned for grants");
3387  } else {
3388  AccessPrivileges privs;
3389 
3390  object.resetPrivileges();
3391  if (permissions.delete_) {
3393  }
3394 
3395  if (permissions.create_) {
3397  }
3398 
3399  if (permissions.edit_) {
3401  }
3402 
3403  if (permissions.view_) {
3405  }
3406 
3407  object.setPrivileges(privs);
3408  }
3409  SysCatalog::instance().grantDBObjectPrivilegesBatch(valid_groups, {object}, cat);
3410  // grant system_role to grantees for underlying objects
3411  if (grant_role) {
3412  auto dash = cat.getMetadataForDashboard(dashboard_id);
3413  SysCatalog::instance().grantRoleBatch({dash->dashboardSystemRoleName}, valid_groups);
3414  }
3415 }
3416 
3417 void MapDHandler::unshare_dashboard(const TSessionId& session,
3418  const int32_t dashboard_id,
3419  const std::vector<std::string>& groups,
3420  const std::vector<std::string>& objects,
3421  const TDashboardPermissions& permissions) {
3422  auto stdlog = STDLOG(get_session_ptr(session));
3423  auto session_ptr = stdlog.getConstSessionInfo();
3424  check_read_only("unshare_dashboard");
3425  std::vector<std::string> valid_groups;
3426  valid_groups = get_valid_groups(session, dashboard_id, groups);
3427  auto const& cat = session_ptr->getCatalog();
3428  // By default object type can only be dashboard
3430  DBObject object(dashboard_id, object_type);
3431  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3432  !permissions.view_) {
3433  THROW_MAPD_EXCEPTION("Atleast one privilege should be assigned for revokes");
3434  } else {
3435  AccessPrivileges privs;
3436 
3437  object.resetPrivileges();
3438  if (permissions.delete_) {
3440  }
3441 
3442  if (permissions.create_) {
3444  }
3445 
3446  if (permissions.edit_) {
3448  }
3449 
3450  if (permissions.view_) {
3452  }
3453 
3454  object.setPrivileges(privs);
3455  }
3456  SysCatalog::instance().revokeDBObjectPrivilegesBatch(valid_groups, {object}, cat);
3457  // revoke system_role from grantees for underlying objects
3458  const auto dash = cat.getMetadataForDashboard(dashboard_id);
3459  SysCatalog::instance().revokeDashboardSystemRole(dash->dashboardSystemRoleName,
3460  valid_groups);
3461 }
3462 
3464  std::vector<TDashboardGrantees>& dashboard_grantees,
3465  const TSessionId& session,
3466  int32_t dashboard_id) {
3467  auto stdlog = STDLOG(get_session_ptr(session));
3468  auto session_ptr = stdlog.getConstSessionInfo();
3469  auto const& cat = session_ptr->getCatalog();
3471  auto dash = cat.getMetadataForDashboard(dashboard_id);
3472  if (!dash) {
3473  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3474  " does not exist");
3475  } else if (session_ptr->get_currentUser().userId != dash->userId &&
3476  !session_ptr->get_currentUser().isSuper) {
3478  "User should be either owner of dashboard or super user to access grantees");
3479  }
3480  std::vector<ObjectRoleDescriptor*> objectsList;
3481  objectsList = SysCatalog::instance().getMetadataForObject(
3482  cat.getCurrentDB().dbId,
3483  static_cast<int>(DBObjectType::DashboardDBObjectType),
3484  dashboard_id); // By default objecttypecan be only dashabaords
3485  user_meta.userId = -1;
3486  user_meta.userName = "";
3487  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3488  for (auto object : objectsList) {
3489  if (user_meta.userName == object->roleName) {
3490  // Mask owner
3491  continue;
3492  }
3493  TDashboardGrantees grantee;
3494  TDashboardPermissions perm;
3495  grantee.name = object->roleName;
3496  grantee.is_user = object->roleType;
3497  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
3498  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
3499  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
3500  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
3501  grantee.permissions = perm;
3502  dashboard_grantees.push_back(grantee);
3503  }
3504 }
3505 
3506 void MapDHandler::create_link(std::string& _return,
3507  const TSessionId& session,
3508  const std::string& view_state,
3509  const std::string& view_metadata) {
3510  auto stdlog = STDLOG(get_session_ptr(session));
3511  auto session_ptr = stdlog.getConstSessionInfo();
3512  // check_read_only("create_link");
3513  auto& cat = session_ptr->getCatalog();
3514 
3515  LinkDescriptor ld;
3516  ld.userId = session_ptr->get_currentUser().userId;
3517  ld.viewState = view_state;
3518  ld.viewMetadata = view_metadata;
3519 
3520  try {
3521  _return = cat.createLink(ld, 6);
3522  } catch (const std::exception& e) {
3523  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3524  }
3525 }
3526 
3528  const std::string& name,
3529  const bool is_array) {
3530  TColumnType ct;
3531  ct.col_name = name;
3532  ct.col_type.type = type;
3533  ct.col_type.is_array = is_array;
3534  return ct;
3535 }
3536 
3537 void MapDHandler::check_geospatial_files(const boost::filesystem::path file_path,
3538  const Importer_NS::CopyParams& copy_params) {
3539  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
3540  if (std::find(shp_ext.begin(),
3541  shp_ext.end(),
3542  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
3543  shp_ext.end()) {
3544  for (auto ext : shp_ext) {
3545  auto aux_file = file_path;
3547  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
3548  copy_params) &&
3549  !Importer_NS::Importer::gdalFileExists(aux_file.replace_extension(ext).string(),
3550  copy_params)) {
3551  throw std::runtime_error("required file for shapefile does not exist: " +
3552  aux_file.filename().string());
3553  }
3554  }
3555  }
3556 }
3557 
3558 void MapDHandler::create_table(const TSessionId& session,
3559  const std::string& table_name,
3560  const TRowDescriptor& rd,
3561  const TFileType::type file_type,
3562  const TCreateParams& create_params) {
3563  auto stdlog = STDLOG("table_name", table_name);
3564  check_read_only("create_table");
3565 
3566  if (ImportHelpers::is_reserved_name(table_name)) {
3567  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
3568  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
3569  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
3570  }
3571 
3572  auto rds = rd;
3573 
3574  // no longer need to manually add the poly column for a TFileType::POLYGON table
3575  // a column of the correct geo type has already been added
3576  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
3577 
3578  std::string stmt{"CREATE TABLE " + table_name};
3579  std::vector<std::string> col_stmts;
3580 
3581  for (auto col : rds) {
3582  if (ImportHelpers::is_reserved_name(col.col_name)) {
3583  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
3584  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
3585  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
3586  }
3587  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
3588  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
3589  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
3590  " for column: " + col.col_name);
3591  }
3592 
3593  if (col.col_type.type == TDatumType::DECIMAL) {
3594  // if no precision or scale passed in set to default 14,7
3595  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
3596  col.col_type.precision = 14;
3597  col.col_type.scale = 7;
3598  }
3599  }
3600 
3601  std::string col_stmt;
3602  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
3603 
3604  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
3605  // `nullable` argument, leading this to default to false. Uncomment for v2.
3606  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
3607 
3608  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
3609  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
3610  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
3611  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
3612  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
3613  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
3614  }
3615  } else if (col.col_type.type == TDatumType::STR) {
3616  // non DICT encoded strings
3617  col_stmt.append(" ENCODING NONE");
3618  } else if (col.col_type.type == TDatumType::POINT ||
3619  col.col_type.type == TDatumType::LINESTRING ||
3620  col.col_type.type == TDatumType::POLYGON ||
3621  col.col_type.type == TDatumType::MULTIPOLYGON) {
3622  // non encoded compressable geo
3623  if (col.col_type.scale == 4326) {
3624  col_stmt.append(" ENCODING NONE");
3625  }
3626  }
3627  col_stmts.push_back(col_stmt);
3628  }
3629 
3630  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
3631 
3632  if (create_params.is_replicated) {
3633  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
3634  }
3635 
3636  stmt.append(";");
3637 
3638  TQueryResult ret;
3639  sql_execute(ret, session, stmt, true, "", -1, -1);
3640 }
3641 
// Imports a delimited file into the existing table `table_name`.
//
// Throws (THROW_MAPD_EXCEPTION) if the table does not exist, if the caller
// lacks load privileges (check_table_load_privileges), or if a local input file
// is missing.
//
// Path handling: names starting with "s3://" (case-insensitive) are used as
// given and are not checked locally. Any other non-absolute name is reduced to
// its last path component and resolved under
// import_path_/<sha256 hex of session>/. Local paths must exist on disk.
//
// When no delimiter is set, ',' is used, or '\t' for a ".tsv" extension.
// Loading goes through a DistributedLoader when leaves are attached
// (leaf_aggregator_.leafCount() > 0), otherwise through a local Importer.
// Any std::exception from the import is rethrown via THROW_MAPD_EXCEPTION.
3642 void MapDHandler::import_table(const TSessionId& session,
3643  const std::string& table_name,
3644  const std::string& file_name_in,
3645  const TCopyParams& cp) {
3646  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3647  auto session_ptr = stdlog.getConstSessionInfo();
3648  check_read_only("import_table");
3649  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
3650  auto& cat = session_ptr->getCatalog();
3651
3652  const TableDescriptor* td = cat.getMetadataForTable(table_name);
3653  if (td == nullptr) {
3654  THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
3655  }
3656  check_table_load_privileges(*session_ptr, table_name);
3657
3658  std::string file_name{file_name_in};
3659  auto file_path = boost::filesystem::path(file_name);
3661  if (!boost::istarts_with(file_name, "s3://")) {
// Only the last component of a relative name is kept; it is looked up in the
// per-session import directory.
3662  if (!boost::filesystem::path(file_name).is_absolute()) {
3663  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3664  boost::filesystem::path(file_name).filename();
3665  file_name = file_path.string();
3666  }
3667  if (!boost::filesystem::exists(file_path)) {
3668  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3669  }
3670  }
3671
3672  // TODO(andrew): add delimiter detection to Importer
// Note: the ".tsv" comparison below is case-sensitive.
3673  if (copy_params.delimiter == '\0') {
3674  copy_params.delimiter = ',';
3675  if (boost::filesystem::extension(file_path) == ".tsv") {
3676  copy_params.delimiter = '\t';
3677  }
3678  }
3679
3680  try {
3681  std::unique_ptr<Importer_NS::Importer> importer;
3682  if (leaf_aggregator_.leafCount() > 0) {
// NOTE(review): the raw DistributedLoader is presumably owned by the Importer
// from here on -- confirm.
3683  importer.reset(new Importer_NS::Importer(
3684  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
3685  file_path.string(),
3686  copy_params));
3687  } else {
3688  importer.reset(new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
3689  }
3690  auto ms = measure<>::execution([&]() { importer->import(); });
// NOTE(review): timing goes to std::cout (flushed via std::endl) rather than
// LOG(INFO), which import_geo_table uses for the same report.
3691  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
3692  } catch (const std::exception& e) {
3693  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
3694  }
3695 }
3696 
3697 namespace {
3698 
3699 // helper functions for error checking below
3700 // these would usefully be added as methods of TDatumType
3701 // but that's not possible as it's auto-generated by Thrift
3702 
3704  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
3705  t == TDatumType::LINESTRING || t == TDatumType::POINT);
3706 }
3707 
3708 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3709 
3710 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
3711  std::stringstream ss;
3712  ss << t;
3713  return ss.str();
3714 }
3715 
3716 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
3717  std::string result;
3718  switch (p) {
3719  case SQLTypes::kGEOGRAPHY:
3720  result = "GEOGRAPHY";
3721  break;
3722  case SQLTypes::kGEOMETRY:
3723  result = "GEOMETRY";
3724  break;
3725  default:
3726  result = "INVALID";
3727  break;
3728  }
3729  return result;
3730 }
3731 
3732 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
3733  std::stringstream ss;
3734  ss << t;
3735  return ss.str();
3736 }
3737 
3738 #endif
3739 
3740 } // namespace
3741 
// Throws (via THROW_MAPD_EXCEPTION) a "column attribute mismatch" error raised
// while appending a geo file to an existing table.
//   attr     - the attribute being compared, e.g. "name", "type", "SRID"
//   got      - value taken from the incoming row descriptor (std::string)
//   expected - value taken from the existing table's column (std::string)
// Only usable inside import_geo_table's per-layer loop: the expansion refers to
// the locals `file_path`, `this_table_name` and `cd`. The per-layer table name
// is reported (as the column-count mismatch message does); the base
// `table_name` would name the wrong table in multi-layer imports.
// No trailing semicolon: call sites supply their own.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected)                 \
  THROW_MAPD_EXCEPTION("Could not append geo file '" +                           \
                       file_path.filename().string() + "' to table '" +          \
                       this_table_name + "'. Column '" + cd->columnName + "' " + \
                       (attr) + " mismatch (got '" + (got) + "', expected '" +   \
                       (expected) + "')")
3747 
// Imports a geo file (or the first geo file found inside a supported archive)
// into one table per loadable layer.
//
// * Relative names are reduced to their last component and resolved under
//   import_path_/<sha256 hex of session>/.
// * GEO and NON_GEO layers are loadable; EMPTY and UNSUPPORTED_GEO layers are
//   logged and skipped. If copy_params.geo_layer_name is set, only that layer
//   is loaded, and it must exist and be loadable.
// * With more than one layer, each table is named
//   <table_name>_<sanitized layer name> and none of them may already exist.
//   With a single layer, `table_name` is used as-is.
// * If `row_desc` is non-empty (the Immerse path), it is used as the row
//   descriptor, the target table must already exist, and multi-layer files are
//   rejected. Otherwise column types are detected per layer and the table is
//   created when missing.
// * Per-layer failures are collected and reported together in a single
//   THROW_MAPD_EXCEPTION after every layer has been attempted.
3748 void MapDHandler::import_geo_table(const TSessionId& session,
3749  const std::string& table_name,
3750  const std::string& file_name_in,
3751  const TCopyParams& cp,
3752  const TRowDescriptor& row_desc,
3753  const TCreateParams& create_params) {
3754  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3755  auto session_ptr = stdlog.getConstSessionInfo();
// NOTE(review): the read-only check is labelled "import_table", not
// "import_geo_table".
3756  check_read_only("import_table");
3757  auto& cat = session_ptr->getCatalog();
3758
3760
3761  std::string file_name{file_name_in};
3762
3763  if (path_is_relative(file_name)) {
3764  // assume relative paths are relative to data_path / mapd_import / <session>
3765  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3766  boost::filesystem::path(file_name).filename();
3767  file_name = file_path.string();
3768  }
3769
3770  if (is_a_supported_geo_file(file_name, true)) {
3771  // prepare to load geo file directly
3772  add_vsi_network_prefix(file_name);
3773  add_vsi_geo_prefix(file_name);
3774  } else if (is_a_supported_archive_file(file_name)) {
3775  // find the archive file
3776  add_vsi_network_prefix(file_name);
3777  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
3778  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3779  }
3780  // find geo file in archive
3781  add_vsi_archive_prefix(file_name);
3782  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3783  // prepare to load that geo file
3784  if (geo_file.size()) {
3785  file_name = file_name + std::string("/") + geo_file;
3786  }
3787  } else {
3788  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
3789  file_name_in);
3790  }
3791
3792  // log what we're about to try to do
3793  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
3794  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
3795
3796  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
3797  auto file_path = boost::filesystem::path(file_name);
3798  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3799  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
3800  }
3801
3802  // use GDAL to check any dependent files exist (ditto)
3803  try {
3804  check_geospatial_files(file_path, copy_params);
3805  } catch (const std::exception& e) {
3806  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3807  }
3808
3809  // get layer info and deconstruct
3810  // in general, we will get a combination of layers of these four types:
3811  // EMPTY: no rows, report and skip
3812  // GEO: create a geo table from this
3813  // NON_GEO: create a regular table from this
3814  // UNSUPPORTED_GEO: report and skip
3815  std::vector<Importer_NS::Importer::GeoFileLayerInfo> layer_info;
3816  try {
3817  layer_info = Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
3818  } catch (const std::exception& e) {
3819  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3820  }
3821
3822  // categorize the results
3823  using LayerNameToContentsMap =
3824  std::map<std::string, Importer_NS::Importer::GeoFileLayerContents>;
3825  LayerNameToContentsMap load_layers;
3826  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
3827  for (const auto& layer : layer_info) {
3828  switch (layer.contents) {
3830  LOG(INFO) << "import_geo_table: '" << layer.name
3831  << "' (will import as geo table)";
3832  load_layers[layer.name] = layer.contents;
3833  break;
3835  LOG(INFO) << "import_geo_table: '" << layer.name
3836  << "' (will import as regular table)";
3837  load_layers[layer.name] = layer.contents;
3838  break;
3840  LOG(WARNING) << "import_geo_table: '" << layer.name
3841  << "' (will not import, unsupported geo type)";
3842  break;
3844  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
3845  break;
3846  default:
3847  break;
3848  }
3849  }
3850
3851  // if nothing is loadable, stop now
3852  if (load_layers.size() == 0) {
3853  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
3854  }
3855
3856  // if we've been given an explicit layer name, check that it exists and is loadable
3857  // scan the original list, as it may exist but not have been gathered as loadable
3858  if (copy_params.geo_layer_name.size()) {
3859  bool found = false;
3860  for (const auto& layer : layer_info) {
3861  if (copy_params.geo_layer_name == layer.name) {
3862  if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::GEO ||
3864  // forget all the other layers and just load this one
3865  load_layers.clear();
3866  load_layers[layer.name] = layer.contents;
3867  found = true;
3868  break;
3869  } else if (layer.contents ==
3871  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3872  copy_params.geo_layer_name +
3873  "' has unsupported geo type!");
3874  } else if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::EMPTY) {
3875  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3876  copy_params.geo_layer_name + "' is empty!");
3877  }
3878  }
3879  }
3880  if (!found) {
3881  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3882  copy_params.geo_layer_name + "' not found!");
3883  }
3884  }
3885
3886  // Immerse import of multiple layers is not yet supported
3887  // @TODO fix this!
3888  if (row_desc.size() > 0 && load_layers.size() > 1) {
3890  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
3891  }
3892
3893  // one definition of layer table name construction
3894  // we append the layer name if we're loading more than one table
3895  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
3896  const std::string& layer_name) {
3897  if (load_layers.size() > 1) {
3898  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
3899  if (sanitized_layer_name != layer_name) {
3900  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
3901  << sanitized_layer_name << "' for table name";
3902  }
3903  return table_name + "_" + sanitized_layer_name;
3904  }
3905  return table_name;
3906  };
3907
3908  // if we're importing multiple tables, then NONE of them must exist already
3909  if (load_layers.size() > 1) {
3910  for (const auto& layer : load_layers) {
3911  // construct table name
3912  auto this_table_name = construct_layer_table_name(table_name, layer.first);
3913
3914  // table must not exist
3915  if (cat.getMetadataForTable(this_table_name)) {
3916  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
3917  "' already exists, aborting!");
3918  }
3919  }
3920  }
3921
3922  // prepare to gather errors that would otherwise be exceptions, as we can only throw one
3923  std::vector<std::string> caught_exception_messages;
3924
3925  // prepare to time multi-layer import
3926  double total_import_ms = 0.0;
3927
3928  // now we're safe to start importing
3929  // we loop over the layers we're going to attempt to load
3930  for (const auto& layer : load_layers) {
3931  // unpack
3932  const auto& layer_name = layer.first;
3933  const auto& layer_contents = layer.second;
3934  bool is_geo_layer =
3936
3937  // construct table name again
3938  auto this_table_name = construct_layer_table_name(table_name, layer_name);
3939
3940  // report
3941  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
3942
3943  // we need a row descriptor
3944  TRowDescriptor rd;
3945  if (row_desc.size() > 0) {
3946  // we have a valid RowDescriptor
3947  // this is the case where Immerse has already detected and created
3948  // all we need to do is import and trust that the data will match
3949  // use the provided row descriptor
3950  // table must already exist (we check this below)
3951  rd = row_desc;
3952  } else {
3953  // we don't have a RowDescriptor
3954  // we have to detect the file ourselves
3955  TDetectResult cds;
3956  TCopyParams cp_copy = cp; // retain S3 auth tokens
3957  cp_copy.geo_layer_name = layer_name;
3958  cp_copy.file_type = TFileType::POLYGON;
3959  try {
3960  detect_column_types(cds, session, file_name_in, cp_copy);
3961  } catch (const std::exception& e) {
3962  // capture the error and abort this layer
3963  caught_exception_messages.emplace_back(
3964  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
3965  continue;
3966  }
3967  rd = cds.row_set.row_desc;
3968
3969  // then, if the table does NOT already exist, create it
3970  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
3971  if (!td) {
3972  try {
3973  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
3974  } catch (const std::exception& e) {
3975  // capture the error and abort this layer
3976  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
3977  layer_name + "':" + e.what());
3978  continue;
3979  }
3980  }
3981  }
3982
3983  // by this point, the table should exist, one way or another
3984  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
3985  if (!td) {
3986  // capture the error and abort this layer
3987  std::string exception_message =
3988  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
3989  this_table_name + "'; table does not exist or failed to create.";
3990  caught_exception_messages.emplace_back(exception_message);
3991  continue;
3992  }
3993
3994  // then, we have to verify that the structure matches
3995  // get column descriptors (non-system, non-deleted, logical columns only)
3996  const auto col_descriptors =
3997  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
3998
3999  // first, compare the column count
4000  if (col_descriptors.size() != rd.size()) {
4001  // capture the error and abort this layer
4002  std::string exception_message =
4003  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
4004  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
4005  ", expecting " + std::to_string(col_descriptors.size()) + ")";
4006  caught_exception_messages.emplace_back(exception_message);
4007  continue;
4008  }
4009
4010  try {
4011  // then the names and types
4012  int rd_index = 0;
4013  for (auto cd : col_descriptors) {
4014  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
4015  std::string gname = rd[rd_index].col_name; // got
4016  std::string ename = cd->columnName; // expecting
4017 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4018  TTypeInfo gti = rd[rd_index].col_type; // got
4019 #endif
4020  TTypeInfo eti = cd_col_type.col_type; // expecting
4021  // check for name match
4022  if (gname != ename) {
4023  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
4024  gname == OMNISCI_GEO_PREFIX) {
4025  // rename incoming geo column to match existing legacy default geo column
// NOTE(review): gname was read from rd[rd_index].col_name above, so this
// assignment is a no-op and the incoming column keeps OMNISCI_GEO_PREFIX.
// Renaming it to match the existing legacy column presumably needs `= ename`
// -- confirm.
4026  rd[rd_index].col_name = gname;
4027  LOG(INFO)
4028  << "import_geo_table: Renaming incoming geo column to match existing "
4029  "legacy default geo column";
4030  } else {
4031  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
4032  }
4033  }
4034 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4035  // check for type attributes match
4036  // these attrs must always match regardless of type
4037  if (gti.type != eti.type) {
4039  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
4040  }
4041  if (gti.is_array != eti.is_array) {
4043  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
4044  }
4045  if (gti.nullable != eti.nullable) {
4047  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
4048  }
4049  if (TTypeInfo_IsGeo(eti.type)) {
4050  // for geo, only these other attrs must also match
4051  // encoding and comp_param are allowed to differ
4052  // this allows appending to existing geo table
4053  // without needing to know the existing encoding
4054  if (gti.precision != eti.precision) {
4056  "geo sub-type",
4057  TTypeInfo_GeoSubTypeToString(gti.precision),
4058  TTypeInfo_GeoSubTypeToString(eti.precision));
4059  }
4060  if (gti.scale != eti.scale) {
4062  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
4063  }
4064  if (gti.encoding != eti.encoding) {
4065  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
4066  }
4067  if (gti.comp_param != eti.comp_param) {
4068  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
4069  }
4070  } else {
4071  // non-geo, all other attrs must also match
4072  // @TODO consider relaxing some of these dependent on type
4073  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
4074  if (gti.precision != eti.precision) {
4076  std::to_string(gti.precision),
4077  std::to_string(eti.precision));
4078  }
4079  if (gti.scale != eti.scale) {
4081  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
4082  }
4083  if (gti.encoding != eti.encoding) {
4085  "encoding",
4086  TTypeInfo_EncodingToString(gti.encoding),
4087  TTypeInfo_EncodingToString(eti.encoding));
4088  }
4089  if (gti.comp_param != eti.comp_param) {
4091  std::to_string(gti.comp_param),
4092  std::to_string(eti.comp_param));
4093  }
4094  }
4095 #endif
4096  rd_index++;
4097  }
// Mismatch exceptions thrown above are presumably caught here, which assumes
// the Thrift exception type derives from std::exception -- confirm.
4098  } catch (const std::exception& e) {
4099  // capture the error and abort this layer
4100  caught_exception_messages.emplace_back(e.what());
4101  continue;
4102  }
4103
// Maps each table column name to a source field name (presumably the
// geo-file field name that importGDAL reads).
4104  std::map<std::string, std::string> colname_to_src;
4105  for (auto r : rd) {
// NOTE(review): when src_name is empty this falls back to
// sanitize_name(r.src_name), i.e. sanitizing the empty string;
// sanitize_name(r.col_name) may have been intended -- confirm.
4106  colname_to_src[r.col_name] =
4107  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
4108  }
4109
4110  try {
4111  check_table_load_privileges(*session_ptr, this_table_name);
4112  } catch (const std::exception& e) {
4113  // capture the error and abort this layer
4114  caught_exception_messages.emplace_back(e.what());
4115  continue;
4116  }
4117
4118  if (is_geo_layer) {
4119  // Final check to ensure that we actually have a geo column
4120  // of the expected name and type before doing the actual import,
4121  // in case the user naively overrode the name or type in Immerse
4122  // Preview (which as of 6/8/18 it still allows you to do).
4123  // This avoids a fatal assert later when it fails to find the
4124  // column. We should make Immerse more robust and disallow this.
4125  bool have_geo_column_with_correct_name = false;
4126  for (const auto& r : rd) {
4127  if (TTypeInfo_IsGeo(r.col_type.type)) {
4128  // TODO(team): allow user to override the geo column name
4129  if (r.col_name == OMNISCI_GEO_PREFIX) {
4130  have_geo_column_with_correct_name = true;
4131  } else if (r.col_name == LEGACY_GEO_PREFIX) {
4132  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
4133  // Normalize column names for geo append with legacy column naming scheme
4134  colname_to_src[r.col_name] = r.col_name;
4135  have_geo_column_with_correct_name = true;
4136  }
4137  }
4138  }
4139  if (!have_geo_column_with_correct_name) {
4140  std::string exception_message = "Table " + this_table_name +
4141  " does not have a geo column with name '" +
4142  OMNISCI_GEO_PREFIX + "'. Import aborted!";
4143  caught_exception_messages.emplace_back(exception_message);
4144  continue;
4145  }
4146  }
4147
4148  try {
// copy_params is shared across loop iterations; each layer overwrites
// geo_layer_name, presumably restricting the importer to this layer.
4149  // import this layer only?
4150  copy_params.geo_layer_name = layer_name;
4151
4152  // create an importer
4153  std::unique_ptr<Importer_NS::Importer> importer;
4154  if (leaf_aggregator_.leafCount() > 0) {
4155  importer.reset(new Importer_NS::Importer(
4156  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4157  file_path.string(),
4158  copy_params));
4159  } else {
4160  importer.reset(
4161  new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
4162  }
4163
4164  // import
4165  auto ms = measure<>::execution([&]() { importer->importGDAL(colname_to_src); });
4166  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
4167  << "s";
4168  total_import_ms += ms;
4169  } catch (const std::exception& e) {
// NOTE(review): the message says "Layer" but reports this_table_name, not
// layer_name.
4170  std::string exception_message =
4171  "Import of Layer '" + this_table_name + "' failed: " + e.what();
4172  caught_exception_messages.emplace_back(exception_message);
4173  continue;
4174  }
4175  }
4176
4177  // did we catch any exceptions?
4178  if (caught_exception_messages.size()) {
4179  // combine all the strings into one and throw a single Thrift exception
4180  std::string combined_exception_message = "Failed to import geo file:\n";
4181  for (const auto& message : caught_exception_messages) {
4182  combined_exception_message += message + "\n";
4183  }
4184  THROW_MAPD_EXCEPTION(combined_exception_message);
4185  } else {
4186  // report success and total time
4187  LOG(INFO) << "Import Successful!";
4188  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
4189  }
4190 }
4191 
4192 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
4193 
4194 void MapDHandler::import_table_status(TImportStatus& _return,
4195  const TSessionId& session,
4196  const std::string& import_id) {
4197  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
4198  auto is = Importer_NS::Importer::get_import_status(import_id);
4199  _return.elapsed = is.elapsed.count();
4200  _return.rows_completed = is.rows_completed;
4201  _return.rows_estimated = is.rows_estimated;
4202  _return.rows_rejected = is.rows_rejected;
4203 }
4204 
4206  const TSessionId& session,
4207  const std::string& archive_path_in,
4208  const TCopyParams& copy_params) {
4209  auto stdlog =
4210  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
4211  std::string archive_path(archive_path_in);
4212 
4213  if (path_is_relative(archive_path)) {
4214  // assume relative paths are relative to data_path / mapd_import / <session>
4215  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4216  boost::filesystem::path(archive_path).filename();
4217  archive_path = file_path.string();
4218  }
4219 
4220  if (is_a_supported_archive_file(archive_path)) {
4221  // find the archive file
4222  add_vsi_network_prefix(archive_path);
4223  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4224  thrift_to_copyparams(copy_params))) {
4225  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4226  }
4227  // find geo file in archive
4228  add_vsi_archive_prefix(archive_path);
4229  std::string geo_file =
4230  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
4231  // what did we get?
4232  if (geo_file.size()) {
4233  // prepend it with the original path
4234  _return = archive_path_in + std::string("/") + geo_file;
4235  } else {
4236  // just return the original path
4237  _return = archive_path_in;
4238  }
4239  } else {
4240  // just return the original path
4241  _return = archive_path_in;
4242  }
4243 }
4244 
// Lists the files inside a supported archive and appends them to `_return`,
// each as `archive_path_in + '/' + <entry>`.
//
// Relative names are reduced to their last component and resolved under
// import_path_/<sha256 hex of session>/. If the path is not a supported
// archive, `_return` is left unmodified. A supported archive that GDAL cannot
// find throws via THROW_MAPD_EXCEPTION.
4245 void MapDHandler::get_all_files_in_archive(std::vector<std::string>& _return,
4246  const TSessionId& session,
4247  const std::string& archive_path_in,
4248  const TCopyParams& copy_params) {
4249  auto stdlog =
4250  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
4251
4252  std::string archive_path(archive_path_in);
4253  if (path_is_relative(archive_path)) {
4254  // assume relative paths are relative to data_path / mapd_import / <session>
4255  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4256  boost::filesystem::path(archive_path).filename();
4257  archive_path = file_path.string();
4258  }
4259
4260  if (is_a_supported_archive_file(archive_path)) {
4261  // find the archive file
4262  add_vsi_network_prefix(archive_path);
// NOTE(review): copy_params goes through thrift_to_copyparams() twice in this
// branch; converting once would avoid the duplicate work.
4263  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4264  thrift_to_copyparams(copy_params))) {
4265  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4266  }
4267  // find all files in archive
4268  add_vsi_archive_prefix(archive_path);
4270  archive_path, thrift_to_copyparams(copy_params));
4271  // prepend them all with original path
4272  for (auto& s : _return) {
4273  s = archive_path_in + '/' + s;
4274  }
4275  }
4276 }
4277 
// Reports the layers of a geo file, or of the first geo file found inside a
// supported archive. Each layer is appended to `_return` as a TGeoFileLayerInfo
// tagged EMPTY, GEO, NON_GEO or UNSUPPORTED_GEO.
//
// Relative names are reduced to their last component and resolved under
// import_path_/<sha256 hex of session>/. Throws via THROW_MAPD_EXCEPTION for
// files that are neither a supported geo file nor a supported archive, and for
// archives/files that GDAL reports as missing.
4278 void MapDHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
4279  const TSessionId& session,
4280  const std::string& file_name_in,
4281  const TCopyParams& cp) {
4282  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
4283  std::string file_name(file_name_in);
4284
4286
4287  // handle relative paths
4288  if (path_is_relative(file_name)) {
4289  // assume relative paths are relative to data_path / mapd_import / <session>
4290  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4291  boost::filesystem::path(file_name).filename();
4292  file_name = file_path.string();
4293  }
4294
4295  // validate file_name
4296  if (is_a_supported_geo_file(file_name, true)) {
4297  // prepare to load geo file directly
4298  add_vsi_network_prefix(file_name);
4299  add_vsi_geo_prefix(file_name);
4300  } else if (is_a_supported_archive_file(file_name)) {
4301  // find the archive file
4302  add_vsi_network_prefix(file_name);
4303  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
4304  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4305  }
4306  // find geo file in archive
4307  add_vsi_archive_prefix(file_name);
4308  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4309  // prepare to load that geo file
4310  if (geo_file.size()) {
4311  file_name = file_name + std::string("/") + geo_file;
4312  }
4313  } else {
4314  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4315  file_name_in);
4316  }
4317
4318  // check the file actually exists
4319  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4320  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
4321  }
4322
4323  // find all layers
4324  auto internal_layer_info =
4325  Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4326
4327  // convert to Thrift type
4328  for (const auto& internal_layer : internal_layer_info) {
4329  TGeoFileLayerInfo layer;
4330  layer.name = internal_layer.name;
4331  switch (internal_layer.contents) {
4333  layer.contents = TGeoFileLayerContents::EMPTY;
4334  break;
4336  layer.contents = TGeoFileLayerContents::GEO;
4337  break;
4339  layer.contents = TGeoFileLayerContents::NON_GEO;
4340  break;
4342  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
4343  break;
4344  default:
// Any other internal contents value is treated as a programming error.
4345  CHECK(false);
4346  }
4347  _return.emplace_back(layer); // no suitable constructor to just pass parameters
4348  }
4349 }
4350 
4351 void MapDHandler::start_heap_profile(const TSessionId& session) {
4352  auto stdlog = STDLOG(get_session_ptr(session));
4353 #ifdef HAVE_PROFILER
4354  if (IsHeapProfilerRunning()) {
4355  THROW_MAPD_EXCEPTION("Profiler already started");
4356  }
4357  HeapProfilerStart("omnisci");
4358 #else
4359  THROW_MAPD_EXCEPTION("Profiler not enabled");
4360 #endif // HAVE_PROFILER
4361 }
4362 
4363 void MapDHandler::stop_heap_profile(const TSessionId& session) {
4364  auto stdlog = STDLOG(get_session_ptr(session));
4365 #ifdef HAVE_PROFILER
4366  if (!IsHeapProfilerRunning()) {
4367  THROW_MAPD_EXCEPTION("Profiler not running");
4368  }
4369  HeapProfilerStop();
4370 #else
4371  THROW_MAPD_EXCEPTION("Profiler not enabled");
4372 #endif // HAVE_PROFILER
4373 }
4374 
4375 void MapDHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
4376  auto stdlog = STDLOG(get_session_ptr(session));
4377 #ifdef HAVE_PROFILER
4378  if (!IsHeapProfilerRunning()) {
4379  THROW_MAPD_EXCEPTION("Profiler not running");
4380  }
4381  auto profile_buff = GetHeapProfile();
4382  profile = profile_buff;
4383  free(profile_buff);
4384 #else
4385  THROW_MAPD_EXCEPTION("Profiler not enabled");
4386 #endif // HAVE_PROFILER
4387 }
4388 
4389 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
4390 void MapDHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
4391  if (session_it->second.use_count() > 2 ||
4392  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
4393  // SessionInfo is being used in more than one active operation. Original copy + one
4394  // stored in StdLog. Skip the checks.
4395  return;
4396  }
4397  time_t last_used_time = session_it->second->get_last_used_time();
4398  time_t start_time = session_it->second->get_start_time();
4399  if ((time(0) - last_used_time) > idle_session_duration_) {
4400  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
4401  } else if ((time(0) - start_time) > max_session_duration_) {
4402  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
4403  }
4404 }
4405 
4406 std::shared_ptr<const Catalog_Namespace::SessionInfo> MapDHandler::get_const_session_ptr(
4407  const TSessionId& session) {
4408  return get_session_ptr(session);
4409 }
4410 
4412  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4413  return *get_session_it_unsafe(session, read_lock)->second;
4414 }
4415 
4416 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_copy_ptr(
4417  const TSessionId& session) {
4418  // Note(Wamsi): We have `get_const_session_ptr` which would return as const SessionInfo
4419  // stored in the map. You can use `get_const_session_ptr` instead of the copy of
4420  // SessionInfo but beware that it can be changed in teh map. So if you do not care about
4421  // the changes then use `get_const_session_ptr` if you do then use this function to get
4422  // a copy. We should eventually aim to merge both `get_const_session_ptr` and
4423  // `get_session_copy_ptr`.
4424  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4425  auto& session_info_ref = *get_session_it_unsafe(session, read_lock)->second;
4426  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
4427 }
4428 
4429 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_ptr(
4430  const TSessionId& session_id) {
4431  // Note(Wamsi): This method will give you a shared_ptr to master SessionInfo itself.
4432  // Should be used only when you need to make updates to original SessionInfo object.
4433  // Currently used by `update_session_last_used_duration`
4434 
4435  // 1) `session_id` will be empty during intial connect. 2)`sessionmapd iterator` will be
4436  // invalid during disconnect. SessionInfo will be erased from map by the time it reaches
4437  // here. In both the above cases, we would return `nullptr` and can skip SessionInfo
4438  // updates.
4439  if (session_id.empty()) {
4440  return {};
4441  }
4442  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4443  return get_session_it_unsafe(session_id, read_lock)->second;
4444 }
4445 
4447  const Catalog_Namespace::SessionInfo& session_info,
4448  const std::string& table_name) {
4449  auto user_metadata = session_info.get_currentUser();
4450  auto& cat = session_info.getCatalog();
4451  DBObject dbObject(table_name, TableDBObjectType);
4452  dbObject.loadKey(cat);
4454  std::vector<DBObject> privObjects;
4455  privObjects.push_back(dbObject);
4456  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
4457  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
4458  user_metadata.userName + " has no insert privileges for table " +
4459  table_name + ".");
4460  }
4461 }
4462 
4463 void MapDHandler::check_table_load_privileges(const TSessionId& session,
4464  const std::string& table_name) {
4465  const auto session_info = get_session_copy(session);
4466  check_table_load_privileges(session_info, table_name);
4467 }
4468 
4470  const TExecuteMode::type mode) {
4471  const std::string& user_name = session_ptr->get_currentUser().userName;
4472  switch (mode) {
4473  case TExecuteMode::GPU:
4474  if (cpu_mode_only_) {
4475  TMapDException e;
4476  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
4477  throw e;
4478  }
4480  LOG(INFO) << "User " << user_name << " sets GPU mode.";
4481  break;
4482  case TExecuteMode::CPU:
4484  LOG(INFO) << "User " << user_name << " sets CPU mode.";
4485  break;
4486  }
4487 }
4488 
4489 std::vector<PushedDownFilterInfo> MapDHandler::execute_rel_alg(
4490  TQueryResult& _return,
4491  QueryStateProxy query_state_proxy,
4492  const std::string& query_ra,
4493  const bool column_format,
4494  const ExecutorDeviceType executor_device_type,
4495  const int32_t first_n,
4496  const int32_t at_most_n,
4497  const bool just_explain,
4498  const bool just_validate,
4499  const bool find_push_down_candidates,
4500  const bool just_calcite_explain,
4501  const bool explain_optimized_ir) const {
4502  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4503  const auto& cat = query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog();
4504  CompilationOptions co = {executor_device_type,
4505  true,
4508  explain_optimized_ir ? ExecutorExplainType::Optimized
4513  just_explain,
4514  allow_loop_joins_ || just_validate,
4516  jit_debug_,
4517  just_validate,
4520  find_push_down_candidates,
4521  just_calcite_explain,
4523  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4524  jit_debug_ ? "/tmp" : "",
4525  jit_debug_ ? "mapdquery" : "",
4527  RelAlgExecutor ra_executor(executor.get(),
4528  cat,
4529  query_ra,
4530  query_state_proxy.getQueryState().shared_from_this());
4531  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4534  nullptr,
4535  nullptr),
4536  {}};
4537  _return.execution_time_ms += measure<>::execution(
4538  [&]() { result = ra_executor.executeRelAlgQuery(co, eo, nullptr); });
4539  // reduce execution time by the time spent during queue waiting
4540  _return.execution_time_ms -= result.getRows()->getQueueTime();
4541  const auto& filter_push_down_info = result.getPushedDownFilterInfo();
4542  if (!filter_push_down_info.empty()) {
4543  return filter_push_down_info;
4544  }
4545  if (just_explain) {
4546  convert_explain(_return, *result.getRows(), column_format);
4547  } else if (!just_calcite_explain) {
4548  convert_rows(_return,
4549  timer.createQueryStateProxy(),
4550  result.getTargetsMeta(),
4551  *result.getRows(),
4552  column_format,
4553  first_n,
4554  at_most_n);
4555  }
4556  return {};
4557 }
4558 
4559 void MapDHandler::execute_rel_alg_df(TDataFrame& _return,
4560  const std::string& query_ra,
4561  QueryStateProxy query_state_proxy,
4562  const Catalog_Namespace::SessionInfo& session_info,
4563  const ExecutorDeviceType device_type,
4564  const size_t device_id,
4565  const int32_t first_n) const {
4566  const auto& cat = session_info.getCatalog();
4567  CHECK(device_type == ExecutorDeviceType::CPU ||
4569  CompilationOptions co = {session_info.get_executor_device_type(),
4570  true,
4575  ExecutionOptions eo = {false,
4577  false,
4580  jit_debug_,
4581  false,
4584  false,
4585  false,
4587  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4588  jit_debug_ ? "/tmp" : "",
4589  jit_debug_ ? "mapdquery" : "",
4591  RelAlgExecutor ra_executor(executor.get(),
4592  cat,
4593  query_ra,
4594  query_state_proxy.getQueryState().shared_from_this());
4595  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4598  nullptr,
4599  nullptr),
4600  {}};
4601  _return.execution_time_ms += measure<>::execution(
4602  [&]() { result = ra_executor.executeRelAlgQuery(co, eo, nullptr); });
4603  _return.execution_time_ms -= result.getRows()->getQueueTime();
4604  const auto rs = result.getRows();
4605  const auto converter =
4606  std::make_unique<ArrowResultSetConverter>(rs,
4607  data_mgr_,
4608  device_type,
4609  device_id,
4610  getTargetNames(result.getTargetsMeta()),
4611  first_n);
4612  ArrowResult arrow_result;
4613 
4614  _return.arrow_conversion_time_ms +=
4615  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
4616  _return.sm_handle =
4617  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
4618  _return.sm_size = arrow_result.sm_size;
4619  _return.df_handle =
4620  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
4621  if (device_type == ExecutorDeviceType::GPU) {
4622  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
4623  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
4624  ipc_handle_to_dev_ptr_.insert(
4625  std::make_pair(_return.df_handle, arrow_result.df_dev_ptr));
4626  }
4627  _return.df_size = arrow_result.df_size;
4628 }
4629 
4630 void MapDHandler::execute_root_plan(TQueryResult& _return,
4631  QueryStateProxy query_state_proxy,
4632  const Planner::RootPlan* root_plan,
4633  const bool column_format,
4634  const Catalog_Namespace::SessionInfo& session_info,
4635  const ExecutorDeviceType executor_device_type,
4636  const int32_t first_n) const {
4637  auto executor = Executor::getExecutor(root_plan->getCatalog().getCurrentDB().dbId,
4638  jit_debug_ ? "/tmp" : "",
4639  jit_debug_ ? "mapdquery" : "",
4641  std::shared_ptr<ResultSet> results;
4642  _return.execution_time_ms += measure<>::execution([&]() {
4643  results = executor->execute(root_plan,
4644  session_info,
4645  true,
4646  executor_device_type,
4650  });
4651  // reduce execution time by the time spent during queue waiting
4652  _return.execution_time_ms -= results->getQueueTime();
4653  if (root_plan->get_plan_dest() == Planner::RootPlan::Dest::kEXPLAIN) {
4654  convert_explain(_return, *results, column_format);
4655  return;
4656  }
4657  const auto plan = root_plan->get_plan();
4658  CHECK(plan);
4659  const auto& targets = plan->get_targetlist();
4660  convert_rows(_return,
4661  query_state_proxy,
4662  getTargetMetaInfo(targets),
4663  *results,
4664  column_format,
4665  -1,
4666  -1);
4667 }
4668 
4669 std::vector<TargetMetaInfo> MapDHandler::getTargetMetaInfo(
4670  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4671  std::vector<TargetMetaInfo> result;
4672  for (const auto target : targets) {
4673  CHECK(target);
4674  CHECK(target->get_expr());
4675  result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
4676  }
4677  return result;
4678 }
4679 
4680 std::vector<std::string> MapDHandler::getTargetNames(
4681  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4682  std::vector<std::string> names;
4683  for (const auto target : targets) {
4684  CHECK(target);
4685  CHECK(target->get_expr());
4686  names.push_back(target->get_resname());
4687  }
4688  return names;
4689 }
4690 
4691 std::vector<std::string> MapDHandler::getTargetNames(
4692  const std::vector<TargetMetaInfo>& targets) const {
4693  std::vector<std::string> names;
4694  for (const auto target : targets) {
4695  names.push_back(target.get_resname());
4696  }
4697  return names;
4698 }
4699 
4701  const size_t idx) const {
4702  TColumnType proj_info;
4703  proj_info.col_name = target.get_resname();
4704  if (proj_info.col_name.empty()) {
4705  proj_info.col_name = "result_" + std::to_string(idx + 1);
4706  }
4707  const auto& target_ti = target.get_type_info();
4708  proj_info.col_type.type = type_to_thrift(target_ti);
4709  proj_info.col_type.encoding = encoding_to_thrift(target_ti);
4710  proj_info.col_type.nullable = !target_ti.get_notnull();
4711  proj_info.col_type.is_array = target_ti.get_type() == kARRAY;
4712  if (IS_GEO(target_ti.get_type())) {
4714  proj_info, target_ti.get_subtype(), target_ti.get_output_srid());
4715  } else {
4716  proj_info.col_type.precision = target_ti.get_precision();
4717  proj_info.col_type.scale = target_ti.get_scale();
4718  }
4719  if (target_ti.get_type() == kDATE) {
4720  proj_info.col_type.size = target_ti.get_size();
4721  }
4722  proj_info.col_type.comp_param =
4723  (target_ti.is_date_in_days() && target_ti.get_comp_param() == 0)
4724  ? 32
4725  : target_ti.get_comp_param();
4726  return proj_info;
4727 }
4728 
4730  const std::vector<TargetMetaInfo>& targets) const {
4731  TRowDescriptor row_desc;
4732  size_t i = 0;
4733  for (const auto target : targets) {
4734  row_desc.push_back(convert_target_metainfo(target, i));
4735  ++i;
4736  }
4737  return row_desc;
4738 }
4739 
4740 template <class R>
4741 void MapDHandler::convert_rows(TQueryResult& _return,
4742  QueryStateProxy query_state_proxy,
4743  const std::vector<TargetMetaInfo>& targets,
4744  const R& results,
4745  const bool column_format,
4746  const int32_t first_n,
4747  const int32_t at_most_n) const {
4748  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4749  _return.row_set.row_desc = convert_target_metainfo(targets);
4750  int32_t fetched{0};
4751  if (column_format) {
4752  _return.row_set.is_columnar = true;
4753  std::vector<TColumn> tcolumns(results.colCount());
4754  while (first_n == -1 || fetched < first_n) {
4755  const auto crt_row = results.getNextRow(true, true);
4756  if (crt_row.empty()) {
4757  break;
4758  }
4759  ++fetched;
4760  if (at_most_n >= 0 && fetched > at_most_n) {
4761  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4762  std::to_string(at_most_n));
4763  }
4764  for (size_t i = 0; i < results.colCount(); ++i) {
4765  const auto agg_result = crt_row[i];
4766  value_to_thrift_column(agg_result, targets[i].get_type_info(), tcolumns[i]);
4767  }
4768  }
4769  for (size_t i = 0; i < results.colCount(); ++i) {
4770  _return.row_set.columns.push_back(tcolumns[i]);
4771  }
4772  } else {
4773  _return.row_set.is_columnar = false;
4774  while (first_n == -1 || fetched < first_n) {
4775  const auto crt_row = results.getNextRow(true, true);
4776  if (crt_row.empty()) {
4777  break;
4778  }
4779  ++fetched;
4780  if (at_most_n >= 0 && fetched > at_most_n) {
4781  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4782  std::to_string(at_most_n));
4783  }
4784  TRow trow;
4785  trow.cols.reserve(results.colCount());
4786  for (size_t i = 0; i < results.colCount(); ++i) {
4787  const auto agg_result = crt_row[i];
4788  trow.cols.push_back(value_to_thrift(agg_result, targets[i].get_type_info()));
4789  }
4790  _return.row_set.rows.push_back(trow);
4791  }
4792  }
4793 }
4794 
4795 TRowDescriptor MapDHandler::fixup_row_descriptor(const TRowDescriptor& row_desc,
4796  const Catalog& cat) {
4797  TRowDescriptor fixedup_row_desc;
4798  for (const TColumnType& col_desc : row_desc) {
4799  auto fixedup_col_desc = col_desc;
4800  if (col_desc.col_type.encoding == TEncodingType::DICT &&
4801  col_desc.col_type.comp_param > 0) {
4802  const auto dd = cat.getMetadataForDict(col_desc.col_type.comp_param, false);
4803  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
4804  }
4805  fixedup_row_desc.push_back(fixedup_col_desc);
4806  }
4807  return fixedup_row_desc;
4808 }
4809 
4810 // create simple result set to return a single column result
4811 void MapDHandler::create_simple_result(TQueryResult& _return,
4812  const ResultSet& results,
4813  const bool column_format,
4814  const std::string label) const {
4815  CHECK_EQ(size_t(1), results.rowCount());
4816  TColumnType proj_info;
4817  proj_info.col_name = label;
4818  proj_info.col_type.type = TDatumType::STR;
4819  proj_info.col_type.nullable = false;
4820  proj_info.col_type.is_array = false;
4821  _return.row_set.row_desc.push_back(proj_info);
4822  const auto crt_row = results.getNextRow(true, true);
4823  const auto tv = crt_row[0];
4824  CHECK(results.getNextRow(true, true).empty());
4825  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
4826  CHECK(scalar_tv);
4827  const auto s_n = boost::get<NullableString>(scalar_tv);
4828  CHECK(s_n);
4829  const auto s = boost::get<std::string>(s_n);
4830  CHECK(s);
4831  if (column_format) {
4832  TColumn tcol;
4833  tcol.data.str_col.push_back(*s);
4834  tcol.nulls.push_back(false);
4835  _return.row_set.is_columnar = true;
4836  _return.row_set.columns.push_back(tcol);
4837  } else {
4838  TDatum explanation;
4839  explanation.val.str_val = *s;
4840  explanation.is_null = false;
4841  TRow trow;
4842  trow.cols.push_back(explanation);
4843  _return.row_set.is_columnar = false;
4844  _return.row_set.rows.push_back(trow);
4845  }
4846 }
4847 
4848 void MapDHandler::convert_explain(TQueryResult& _return,
4849  const ResultSet& results,
4850  const bool column_format) const {
4851  create_simple_result(_return, results, column_format, "Explanation");
4852 }
4853 
4854 void MapDHandler::convert_result(TQueryResult& _return,
4855  const ResultSet& results,
4856  const bool column_format) const {
4857  create_simple_result(_return, results, column_format, "Result");
4858 }
4859 
4860 // this all should be moved out of here to catalog
4862  const Catalog_Namespace::SessionInfo& session_info,
4863  const TableDescriptor* td,
4864  const AccessPrivileges access_priv) {
4865  CHECK(td);
4866  auto& cat = session_info.getCatalog();
4867  std::vector<DBObject> privObjects;
4868  DBObject dbObject(td->tableName, TableDBObjectType);
4869  dbObject.loadKey(cat);
4870  dbObject.setPrivileges(access_priv);
4871  privObjects.push_back(dbObject);
4872  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
4873  privObjects);
4874 };
4875 
4877  const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl);
4878  if (drop_db_stmt) {
4879  invalidate_sessions(*drop_db_stmt->getDatabaseName(), drop_db_stmt);
4880  return;
4881  }
4882  const auto rename_db_stmt = dynamic_cast<Parser::RenameDatabaseStmt*>(ddl);
4883  if (rename_db_stmt) {
4884  invalidate_sessions(*rename_db_stmt->getPreviousDatabaseName(), rename_db_stmt);
4885  return;
4886  }
4887  const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl);
4888  if (drop_user_stmt) {
4889  invalidate_sessions(*drop_user_stmt->getUserName(), drop_user_stmt);
4890  return;
4891  }
4892  const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl);
4893  if (rename_user_stmt) {
4894  invalidate_sessions(*rename_user_stmt->getOldUserName(), rename_user_stmt);
4895  return;
4896  }
4897 }
4898 
4899 void MapDHandler::sql_execute_impl(TQueryResult& _return,
4900  QueryStateProxy query_state_proxy,
4901  const bool column_format,
4902  const std::string& nonce,
4903  const ExecutorDeviceType executor_device_type,
4904  const int32_t first_n,
4905  const int32_t at_most_n) {
4906  if (leaf_handler_) {
4907  leaf_handler_->flush_queue();
4908  }
4909 
4910  _return.nonce = nonce;
4911  _return.execution_time_ms = 0;
4912  auto const& query_str = query_state_proxy.getQueryState().getQueryStr();
4913  auto session_ptr = query_state_proxy.getQueryState().getConstSessionInfo();
4914  // Call to DistributedValidate() below may change cat.
4915  auto& cat = session_ptr->getCatalog();
4916 
4917  SQLParser parser;
4918  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
4919  std::string last_parsed;
4920  int num_parse_errors = 0;
4921  std::unique_ptr<Planner::RootPlan> root_plan;
4922 
4923  /*
4924  Use this seq to simplify locking:
4925  INSERT_VALUES: CheckpointLock [ >> TableWriteLock ]
4926  INSERT_SELECT: CheckpointLock >> TableReadLock [ >>
4927  TableWriteLock ] COPY_TO/SELECT: TableReadLock COPY_FROM: CheckpointLock [
4928  >> TableWriteLock ] DROP/TRUNC: CheckpointLock >> TableWriteLock
4929  DELETE/UPDATE: CheckpointLock >> TableWriteLock
4930  */
4931 
4932  std::vector<Lock_Namespace::TableLock> table_locks;
4933 
4934  mapd_unique_lock<mapd_shared_mutex> chkptlLock;
4935  mapd_unique_lock<mapd_shared_mutex> executeWriteLock;
4936  mapd_shared_lock<mapd_shared_mutex> executeReadLock;
4937 
4938  try {
4939  ParserWrapper pw{query_str};
4940  TableMap table_map;
4941  OptionalTableMap tableNames(table_map);
4942  if (pw.isCalcitePathPermissable(read_only_)) {
4943  std::string query_ra;
4944  _return.execution_time_ms += measure<>::execution([&]() {
4945  query_ra =
4946  parse_to_ra(query_state_proxy, query_str, {}, tableNames, mapd_parameters_);
4947  });
4948 
4949  std::string query_ra_calcite_explain;
4950  if (pw.isCalciteExplain() && (!g_enable_filter_push_down || g_cluster)) {
4951  // return the ra as the result
4952  convert_explain(_return, ResultSet(query_ra), true);
4953  return;
4954  } else if (pw.isCalciteExplain()) {
4955  // removing the "explain calcite " from the beginning of the "query_str":
4956  std::string temp_query_str =
4957  query_str.substr(std::string("explain calcite ").length());
4958  query_ra_calcite_explain = parse_to_ra(
4959  query_state_proxy, temp_query_str, {}, boost::none, mapd_parameters_);
4960  }
4961 
4962  // UPDATE/DELETE needs to get a checkpoint lock as the first lock
4963  for (const auto& table : tableNames.value()) {
4964  if (table.second) {
4965  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
4966  session_ptr->getCatalog(), table.first, LockType::CheckpointLock);
4967  }
4968  }
4969  // COPY_TO/SELECT: read ExecutorOuterLock >> TableReadLock locks
4970  executeReadLock = mapd_shared_lock<mapd_shared_mutex>(
4973  session_ptr->getCatalog(), tableNames.value(), table_locks);
4974 
4975  const auto filter_push_down_requests =
4976  execute_rel_alg(_return,
4977  query_state_proxy,
4978  pw.isCalciteExplain() ? query_ra_calcite_explain : query_ra,
4979  column_format,
4980  executor_device_type,
4981  first_n,
4982  at_most_n,
4983  pw.isIRExplain(),
4984  false,
4986  pw.isCalciteExplain(),
4987  pw.getExplainType() == ParserWrapper::ExplainType::OptimizedIR);
4988  if (pw.isCalciteExplain() && filter_push_down_requests.empty()) {
4989  // we only reach here if filter push down was enabled, but no filter
4990  // push down candidate was found
4991  convert_explain(_return, ResultSet(query_ra), true);
4992  return;
4993  }
4994  if (!filter_push_down_requests.empty()) {
4996  query_state_proxy,
4997  query_ra,
4998  column_format,
4999  executor_device_type,
5000  first_n,
5001  at_most_n,
5002  pw.isIRExplain(),
5003  pw.isCalciteExplain(),
5004  filter_push_down_requests);
5005  } else if (pw.isCalciteExplain() && filter_push_down_requests.empty()) {
5006  // return the ra as the result:
5007  // If we reach here, the 'filter_push_down_request' turned out to be empty, i.e.,
5008  // no filter push down so we continue with the initial (unchanged) query's calcite
5009  // explanation.
5010  query_ra =
5011  parse_to_ra(query_state_proxy, query_str, {}, boost::none, mapd_parameters_);
5012  convert_explain(_return, ResultSet(query_ra), true);
5013  return;
5014  }
5015  if (pw.isCalciteExplain()) {
5016  // If we reach here, the filter push down candidates has been selected and
5017  // proper output result has been already created.
5018  return;
5019  }
5020  if (pw.isCalciteExplain()) {
5021  // return the ra as the result:
5022  // If we reach here, the 'filter_push_down_request' turned out to be empty, i.e.,
5023  // no filter push down so we continue with the initial (unchanged) query's calcite
5024  // explanation.
5025  query_ra =
5026  parse_to_ra(query_state_proxy, query_str, {}, boost::none, mapd_parameters_);
5027  convert_explain(_return, ResultSet(query_ra), true);
5028  return;
5029  }
5030  return;
5031  } else if (pw.is_optimize || pw.is_validate) {
5032  // Get the Stmt object
5033  try {
5034  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
5035  } catch (std::exception& e) {
5036  throw std::runtime_error(e.what());
5037  }
5038  if (num_parse_errors > 0) {
5039  throw std::runtime_error("Syntax error at: " + last_parsed);
5040  }
5041  CHECK_EQ(parse_trees.size(), 1u);
5042 
5043  if (pw.is_optimize) {
5044  const auto optimize_stmt =
5045  dynamic_cast<Parser::OptimizeTableStmt*>(parse_trees.front().get());
5046  CHECK(optimize_stmt);
5047 
5048  _return.execution_time_ms += measure<>::execution([&]() {
5049  const auto td = cat.getMetadataForTable(optimize_stmt->getTableName(),
5050  /*populateFragmenter=*/true);
5051 
5052  if (!td || !user_can_access_table(
5053  *session_ptr, td, AccessPrivileges::DELETE_FROM_TABLE)) {
5054  throw std::runtime_error("Table " + optimize_stmt->getTableName() +
5055  " does not exist.");
5056  }
5057 
5058  auto chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5059  cat, td->tableName, LockType::CheckpointLock);
5060  auto table_write_lock = TableLockMgr::getWriteLockForTable(cat, td->tableName);
5061 
5062  auto executor =
5063  Executor::getExecutor(cat.getCurrentDB().dbId, "", "", mapd_parameters_);
5064  const TableOptimizer optimizer(td, executor.get(), cat);
5065  if (optimize_stmt->shouldVacuumDeletedRows()) {
5066  optimizer.vacuumDeletedRows();
5067  }
5068  optimizer.recomputeMetadata();
5069  });
5070 
5071  return;
5072  }
5073  if (pw.is_validate) {
5074  // check user is superuser
5075  if (!session_ptr->get_currentUser().isSuper) {
5076  throw std::runtime_error("Superuser is required to run VALIDATE");
5077  }
5078  const auto validate_stmt =
5079  dynamic_cast<Parser::ValidateStmt*>(parse_trees.front().get());
5080  CHECK(validate_stmt);
5081 
5082  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5084 
5085  std::string output{"Result for validate"};
5087  _return.execution_time_ms += measure<>::execution([&]() {
5088  const DistributedValidate validator(validate_stmt->getType(),
5089  validate_stmt->isRepairTypeRemove(),
5090  cat, // tables may be dropped here
5092  *session_ptr,
5093  *this);
5094  output = validator.validate();
5095  });
5096  } else {
5097  output = "Not running on a cluster nothing to validate";
5098  }
5099  convert_result(_return, ResultSet(output), true);
5100  return;
5101  }
5102  }
5103  LOG(INFO) << "passing query to legacy processor";
5104  } catch (std::exception& e) {
5105  if (strstr(e.what(), "java.lang.NullPointerException")) {
5106  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
5107  "query failed from broken view or other schema related issue");
5108  } else {
5109  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5110  }
5111  }
5112  try {
5113  // check for COPY TO stmt replace as required parser expects #~# markers
5114  const auto result = apply_copy_to_shim(query_str);
5115  num_parse_errors = parser.parse(result, parse_trees, last_parsed);
5116  } catch (std::exception& e) {
5117  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5118  }
5119  if (num_parse_errors > 0) {
5120  THROW_MAPD_EXCEPTION("Syntax error at: " + last_parsed);
5121  }
5122 
5123  auto get_legacy_plan =
5124  [&cat](const Parser::DMLStmt* dml,
5125  const bool is_explain) -> std::unique_ptr<Planner::RootPlan> {
5126  Analyzer::Query query;
5127  dml->analyze(cat, query);
5128  Planner::Optimizer optimizer(query, cat);
5129  auto root_plan_ptr = std::unique_ptr<Planner::RootPlan>(optimizer.optimize());
5130  CHECK(root_plan_ptr);
5131  if (is_explain) {
5132  root_plan_ptr->set_plan_dest(Planner::RootPlan::Dest::kEXPLAIN);
5133  }
5134  return root_plan_ptr;
5135  };
5136 
5137  auto handle_ddl =
5138  [&query_state_proxy, &session_ptr, &_return, &chkptlLock, &table_locks, this](
5139  Parser::DDLStmt* ddl) -> bool {
5140  if (!ddl) {
5141  return false;
5142  }
5143  const auto show_create_stmt = dynamic_cast<Parser::ShowCreateTableStmt*>(ddl);
5144  if (show_create_stmt) {
5145  // ParserNode ShowCreateTableStmt is currently unimplemented
5146  throw std::runtime_error(
5147  "SHOW CREATE TABLE is currently unsupported. Use `\\d` from omnisql for table "
5148  "DDL.");
5149  }
5150 
5151  const auto import_stmt = dynamic_cast<Parser::CopyTableStmt*>(ddl);
5152  if (import_stmt) {
5153  // get the lock if the table exists
5154  // thus allowing COPY FROM to execute to create a table
5155  // is it safe to do this check without a lock?
5156  // if the table doesn't exist, the getTableLock will throw an exception anyway
5157  const TableDescriptor* td =
5158  session_ptr->getCatalog().getMetadataForTable(import_stmt->get_table());
5159  if (td) {
5160  // COPY_FROM: CheckpointLock [ >> TableWriteLocks ]
5161  chkptlLock =
5162  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_ptr->getCatalog(),
5163  import_stmt->get_table(),
5165  // [ TableWriteLocks ] lock is deferred in
5166  // InsertOrderFragmenter::deleteFragments
5167  }
5168 
5169  if (g_cluster && !leaf_aggregator_.leafCount()) {
5170  // Don't allow copy from imports directly on a leaf node
5171  throw std::runtime_error(
5172  "Cannot import on an individual leaf. Please import from the Aggregator.");
5173  } else if (leaf_aggregator_.leafCount() > 0) {
5174  _return.execution_time_ms += measure<>::execution(
5175  [&]() { execute_distributed_copy_statement(import_stmt, *session_ptr); });
5176  } else {
5177  _return.execution_time_ms +=
5178  measure<>::execution([&]() { ddl->execute(*session_ptr); });
5179  }
5180 
5181  // Read response message
5182  convert_result(_return, ResultSet(*import_stmt->return_message.get()), true);
5183 
5184  // get geo_copy_from info
5185  _was_geo_copy_from = import_stmt->was_geo_copy_from();
5186  import_stmt->get_geo_copy_from_payload(_geo_copy_from_table,
5190  return true;
5191  }
5192 
5193  // Check for DDL statements requiring locking and get locks
5194  auto export_stmt = dynamic_cast<Parser::ExportQueryStmt*>(ddl);
5195  if (export_stmt) {
5196  TableMap table_map;
5197  OptionalTableMap tableNames(table_map);
5198  const auto query_string = export_stmt->get_select_stmt();
5199  const auto query_ra =
5200  parse_to_ra(query_state_proxy, query_string, {}, tableNames, mapd_parameters_);
5202  session_ptr->getCatalog(), tableNames.value(), table_locks);
5203  }
5204  auto truncate_stmt = dynamic_cast<Parser::TruncateTableStmt*>(ddl);
5205  if (truncate_stmt) {
5206  chkptlLock =
5207  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_ptr->getCatalog(),
5208  *truncate_stmt->get_table(),
5210  table_locks.emplace_back();
5211  table_locks.back().write_lock = TableLockMgr::getWriteLockForTable(
5212  session_ptr->getCatalog(), *truncate_stmt->get_table());
5213  }
5214  auto add_col_stmt = dynamic_cast<Parser::AddColumnStmt*>(ddl);
5215  if (add_col_stmt) {
5216  add_col_stmt->check_executable(*session_ptr);
5217  chkptlLock =
5218  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_ptr->getCatalog(),
5219  *add_col_stmt->get_table(),
5221  table_locks.emplace_back();
5222  table_locks.back().write_lock = TableLockMgr::getWriteLockForTable(
5223  session_ptr->getCatalog(), *add_col_stmt->get_table());
5224  }
5225 
5226  _return.execution_time_ms += measure<>::execution([&]() {
5227  ddl->execute(*session_ptr);
5229  });
5230  return true;
5231  };
5232 
5233  for (const auto& stmt : parse_trees) {
5234  try {
5235  auto select_stmt = dynamic_cast<Parser::SelectStmt*>(stmt.get());
5236  if (!select_stmt) {
5237  check_read_only("Non-SELECT statements");
5238  }
5239  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt.get());
5240  if (handle_ddl(ddl)) {
5241  if (render_handler_) {
5242  render_handler_->handle_ddl(ddl);
5243  }
5244  continue;
5245  }
5246  if (!root_plan) {
5247  // assume DML / non-explain plan (an insert or copy statement)
5248  const auto dml = dynamic_cast<Parser::DMLStmt*>(stmt.get());
5249  CHECK(dml);
5250  root_plan = get_legacy_plan(dml, false);
5251  CHECK(root_plan);
5252  }
5253  if (auto stmtp = dynamic_cast<Parser::InsertQueryStmt*>(stmt.get())) {
5254  // INSERT_SELECT: CheckpointLock >> TableReadLocks [ >> TableWriteLocks ]
5255  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5256  session_ptr->getCatalog(), *stmtp->get_table(), LockType::CheckpointLock);
5257  // >> TableReadLock locks
5258  TableMap table_map;
5259  OptionalTableMap tableNames(table_map);
5260  const auto query_string = stmtp->get_query()->to_string();
5261  const auto query_ra =
5262  parse_to_ra(query_state_proxy, query_str, {}, tableNames, mapd_parameters_);
5264  session_ptr->getCatalog(), tableNames.value(), table_locks);
5265 
5266  // [ TableWriteLocks ] lock is deferred in
5267  // InsertOrderFragmenter::deleteFragments
5268  // TODO: this statement is not supported. once supported, it must not go thru
5269  // InsertOrderFragmenter::insertData, or deadlock will occur w/o moving the
5270  // following lock back to here!!!
5271  } else if (auto stmtp = dynamic_cast<Parser::InsertValuesStmt*>(stmt.get())) {
5272  // INSERT_VALUES: CheckpointLock >> write ExecutorOuterLock [ >>
5273  // TableWriteLocks ]
5274  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5275  session_ptr->getCatalog(), *stmtp->get_table(), LockType::CheckpointLock);
5276  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5278  // [ TableWriteLocks ] lock is deferred in
5279  // InsertOrderFragmenter::deleteFragments
5280  }
5281 
5282  execute_root_plan(_return,
5283  query_state_proxy,
5284  root_plan.get(),
5285  column_format,
5286  *session_ptr,
5287  executor_device_type,
5288  first_n);
5289  } catch (std::exception& e) {
5290  const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
5291  THROW_MAPD_EXCEPTION(thrift_exception ? std::string(thrift_exception->what())
5292  : std::string("Exception: ") + e.what());
5293  }
5294  }
5295 }
5296 
5298  TQueryResult& _return,
5299  QueryStateProxy query_state_proxy,
5300  std::string& query_ra,
5301  const bool column_format,
5302  const ExecutorDeviceType executor_device_type,
5303  const int32_t first_n,
5304  const int32_t at_most_n,
5305  const bool just_explain,
5306  const bool just_calcite_explain,
5307  const std::vector<PushedDownFilterInfo> filter_push_down_requests) {
5308  // collecting the selected filters' info to be sent to Calcite:
5309  std::vector<TFilterPushDownInfo> filter_push_down_info;
5310  for (const auto& req : filter_push_down_requests) {
5311  TFilterPushDownInfo filter_push_down_info_for_request;
5312  filter_push_down_info_for_request.input_prev = req.input_prev;
5313  filter_push_down_info_for_request.input_start = req.input_start;
5314  filter_push_down_info_for_request.input_next = req.input_next;
5315  filter_push_down_info.push_back(filter_push_down_info_for_request);
5316  }
5317  // deriving the new relational algebra plan with respect to the pushed down filters
5318  _return.execution_time_ms += measure<>::execution([&]() {
5319  query_ra = parse_to_ra(query_state_proxy,
5320  query_state_proxy.getQueryState().getQueryStr(),
5321  filter_push_down_info,
5322  boost::none,
5324  });
5325 
5326  if (just_calcite_explain) {
5327  // return the new ra as the result
5328  convert_explain(_return, ResultSet(query_ra), true);
5329  return;
5330  }
5331 
5332  // execute the new relational algebra plan:
5333  execute_rel_alg(_return,
5334  query_state_proxy,
5335  query_ra,
5336  column_format,
5337  executor_device_type,
5338  first_n,
5339  at_most_n,
5340  just_explain,
5341  /*just_validate = */ false,
5342  /*find_push_down_candidates = */ false,
5343  /*just_calcite_explain = */ false,
5344  /*TODO: explain optimized*/ false);
5345 }
5346 
5348  Parser::CopyTableStmt* copy_stmt,
5349  const Catalog_Namespace::SessionInfo& session_info) {
5350  auto importer_factory = [&session_info, this](
5351  const Catalog& catalog,
5352  const TableDescriptor* td,
5353  const std::string& file_path,
5354  const Importer_NS::CopyParams& copy_params) {
5355  return boost::make_unique<Importer_NS::Importer>(
5356  new DistributedLoader(session_info, td, &leaf_aggregator_),
5357  file_path,
5358  copy_params);
5359  };
5360  copy_stmt->execute(session_info, importer_factory);
5361 }
5362 
5364  QueryStateProxy query_state_proxy,
5365  const std::string& query_str,
5366  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
5367  OptionalTableMap tableNames,
5368  const MapDParameters mapd_parameters,
5369  RenderInfo* render_info) {
5370  query_state::Timer timer = query_state_proxy.createTimer(__func__);
5371  ParserWrapper pw{query_str};
5372  const std::string actual_query{pw.isSelectExplain() ? pw.actual_query : query_str};
5373  if (pw.isCalcitePathPermissable()) {
5374  TPlanResult result;
5375  auto session_cleanup_handler = [&](const auto& session_id) {
5376  removeInMemoryCalciteSession(session_id);
5377  };
5378  auto process_calcite_request = [&] {
5379  const auto& in_memory_session_id = createInMemoryCalciteSession(
5380  query_state_proxy.getQueryState().getConstSessionInfo()->get_catalog_ptr());
5381  try {
5382  result = calcite_->process(timer.createQueryStateProxy(),
5383  legacy_syntax_ ? pg_shim(actual_query) : actual_query,
5384  filter_push_down_info,
5386  pw.isCalciteExplain(),
5387  mapd_parameters.enable_calcite_view_optimize,
5388  in_memory_session_id);
5389  session_cleanup_handler(in_memory_session_id);
5390  } catch (std::exception&) {
5391  session_cleanup_handler(in_memory_session_id);
5392  throw;
5393  }
5394  };
5395  process_calcite_request();
5396  if (tableNames) {
5397  for (const auto& table : result.resolved_accessed_objects.tables_selected_from) {
5398  (tableNames.value())[table] = false;
5399  }
5400  for (const auto& tables :
5401  std::vector<decltype(result.resolved_accessed_objects.tables_inserted_into)>{
5402  result.resolved_accessed_objects.tables_inserted_into,
5403  result.resolved_accessed_objects.tables_updated_in,
5404  result.resolved_accessed_objects.tables_deleted_from}) {
5405  for (const auto& table : tables) {
5406  (tableNames.value())[table] = true;
5407  }
5408  }
5409  }
5410  if (render_info) {
5411  // grabs all the selected-from tables, even views. This is used by the renderer to
5412  // resolve view hit-testing.
5413  // NOTE: the same table name could exist in both the primary and resolved tables.
5414  auto selected_tables = &result.primary_accessed_objects.tables_selected_from;
5415  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5416  selected_tables = &result.resolved_accessed_objects.tables_selected_from;
5417  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5418  }
5419  return result.plan_result;
5420  }
5421  return "";
5422 }
5423 
5424 void MapDHandler::check_table_consistency(TTableMeta& _return,
5425  const TSessionId& session,
5426  const int32_t table_id) {
5427  auto stdlog = STDLOG(get_session_ptr(session));
5428  if (!leaf_handler_) {
5429  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5430  }
5431  try {
5432  leaf_handler_->check_table_consistency(_return, session, table_id);
5433  } catch (std::exception& e) {
5434  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5435  }
5436 }
5437 
5438 void MapDHandler::start_query(TPendingQuery& _return,
5439  const TSessionId& session,
5440  const std::string& query_ra,
5441  const bool just_explain) {
5442  auto stdlog = STDLOG(get_session_ptr(session));
5443  auto session_ptr = stdlog.getConstSessionInfo();
5444  if (!leaf_handler_) {
5445  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5446  }
5447  LOG(INFO) << "start_query :" << *session_ptr << " :" << just_explain;
5448  auto time_ms = measure<>::execution([&]() {
5449  try {
5450  leaf_handler_->start_query(_return, session, query_ra, just_explain);
5451  } catch (std::exception& e) {
5452  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5453  }
5454  });
5455  LOG(INFO) << "start_query-COMPLETED " << time_ms << "ms "
5456  << "id is " << _return.id;
5457 }
5458 
5459 void MapDHandler::execute_query_step(TStepResult& _return,
5460  const TPendingQuery& pending_query) {
5461  if (!leaf_handler_) {
5462  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5463  }
5464  LOG(INFO) << "execute_query_step : id:" << pending_query.id;
5465  auto time_ms = measure<>::execution([&]() {
5466  try {
5467  leaf_handler_->execute_query_step(_return, pending_query);
5468  } catch (std::exception& e) {
5469  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5470  }
5471  });
5472  LOG(INFO) << "execute_query_step-COMPLETED " << time_ms << "ms";
5473 }
5474 
5475 void MapDHandler::broadcast_serialized_rows(const TSerializedRows& serialized_rows,
5476  const TRowDescriptor& row_desc,
5477  const TQueryId query_id) {
5478  if (!leaf_handler_) {
5479  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5480  }
5481  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS id:" << query_id;
5482  auto time_ms = measure<>::execution([&]() {
5483  try {
5484  leaf_handler_->broadcast_serialized_rows(serialized_rows, row_desc, query_id);
5485  } catch (std::exception& e) {
5486  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5487  }
5488  });
5489  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS COMPLETED " << time_ms << "ms";
5490 }
5491 
// Converts a thrift TInsertData payload into the fragmenter's insert representation
// and inserts it into the target table without checkpointing
// (td->fragmenter->insertDataNoCheckpoint).
//
// Per-column handling, selected by the catalog column type:
//  - number / time / boolean, and dictionary-encoded strings: numbersPtr aliases the
//    thrift fixed_len_data buffer directly (no copy), so thrift_insert_data must stay
//    alive for the duration of the insert.
//  - none-encoded strings and geometry: payloads are copied into a std::vector<std::string>
//    owned by a function-local vector of unique_ptrs; stringsPtr points into it.
//  - arrays: each row becomes an ArrayDatum. Null rows use composeNullArray for
//    fixed-size, non-string element types, or an empty null datum otherwise.
//    Non-null rows alias the thrift payload bytes with a no-op deleter.
// CHECK-fails on column_ids/data size mismatch, unknown columns, unexpected string
// encodings, and var-length columns whose row count differs from num_rows.
// Any std::exception raised during the insert is re-thrown via THROW_MAPD_EXCEPTION.
5492 void MapDHandler::insert_data(const TSessionId& session,
5493  const TInsertData& thrift_insert_data) {
5494  auto stdlog = STDLOG(get_session_ptr(session));
5495  auto session_ptr = stdlog.getConstSessionInfo();
5496  CHECK_EQ(thrift_insert_data.column_ids.size(), thrift_insert_data.data.size());
5497  auto const& cat = session_ptr->getCatalog();
// NOTE(review): original line 5498 is missing from this listing; presumably it
// declares the local `insert_data` object used below. Confirm against the real source.
5499  insert_data.databaseId = thrift_insert_data.db_id;
5500  insert_data.tableId = thrift_insert_data.table_id;
5501  insert_data.columnIds = thrift_insert_data.column_ids;
// Owning storage for converted var-length columns. These must outlive the insert call
// because the DataBlockPtr entries only hold raw pointers into them.
5502  std::vector<std::unique_ptr<std::vector<std::string>>> none_encoded_string_columns;
5503  std::vector<std::unique_ptr<std::vector<ArrayDatum>>> array_columns;
5504  for (size_t col_idx = 0; col_idx < insert_data.columnIds.size(); ++col_idx) {
5505  const int column_id = insert_data.columnIds[col_idx];
5506  DataBlockPtr p;
5507  const auto cd = cat.getMetadataForColumn(insert_data.tableId, column_id);
5508  CHECK(cd);
5509  const auto& ti = cd->columnType;
5510  if (ti.is_number() || ti.is_time() || ti.is_boolean()) {
// Zero-copy: points straight at the thrift buffer. NOTE(review): the buffer size is
// not checked against num_rows here.
5511  p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
5512  } else if (ti.is_string()) {
5513  if (ti.get_compression() == kENCODING_DICT) {
// Dictionary-encoded strings arrive as fixed-width ids, so they are handled like numbers.
5514  p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
5515  } else {
5516  CHECK_EQ(kENCODING_NONE, ti.get_compression());
5517  none_encoded_string_columns.emplace_back(new std::vector<std::string>());
5518  auto& none_encoded_strings = none_encoded_string_columns.back();
5519  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5520  thrift_insert_data.data[col_idx].var_len_data.size());
5521  for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
5522  none_encoded_strings->push_back(varlen_str.payload);
5523  }
5524  p.stringsPtr = none_encoded_strings.get();
5525  }
5526  } else if (ti.is_geometry()) {
// Geometry values are passed to the fragmenter as strings, using the same copy path
// as none-encoded strings.
5527  none_encoded_string_columns.emplace_back(new std::vector<std::string>());
5528  auto& none_encoded_strings = none_encoded_string_columns.back();
5529  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5530  thrift_insert_data.data[col_idx].var_len_data.size());
5531  for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
5532  none_encoded_strings->push_back(varlen_str.payload);
5533  }
5534  p.stringsPtr = none_encoded_strings.get();
5535  } else {
5536  CHECK(ti.is_array());
5537  array_columns.emplace_back(new std::vector<ArrayDatum>());
5538  auto& array_column = array_columns.back();
5539  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5540  thrift_insert_data.data[col_idx].var_len_data.size());
5541  for (const auto& t_arr_datum : thrift_insert_data.data[col_idx].var_len_data) {
5542  if (t_arr_datum.is_null) {
// Fixed-size arrays of non-string elements get a composed null array; everything
// else gets an empty datum flagged as null.
5543  if (ti.get_size() > 0 && !ti.get_elem_type().is_string()) {
5544  array_column->push_back(Importer_NS::ImporterUtils::composeNullArray(ti));
5545  } else {
5546  array_column->emplace_back(0, nullptr, true);
5547  }
5548  } else {
5549  ArrayDatum arr_datum;
5550  arr_datum.length = t_arr_datum.payload.size();
5551  int8_t* ptr = (int8_t*)(t_arr_datum.payload.data());
5552  arr_datum.pointer = ptr;
5553  // In this special case, ArrayDatum does not handle freeing the underlying
5554  // memory
// (the no-op deleter below leaves ownership with thrift_insert_data)
5555  arr_datum.data_ptr = std::shared_ptr<int8_t>(ptr, [](auto p) {});
5556  arr_datum.is_null = false;
5557  array_column->push_back(arr_datum);
5558  }
5559  }
5560  p.arraysPtr = array_column.get();
5561  }
5562  insert_data.data.push_back(p);
5563  }
5564  insert_data.numRows = thrift_insert_data.num_rows;
// NOTE(review): td is dereferenced below without a null check; if getMetadataForTable
// can return nullptr for an unknown table id, this would crash. Confirm.
5565  const auto td = cat.getMetadataForTable(insert_data.tableId);
5566  try {
5567  // this should have the same lock seq as COPY FROM
5568  ChunkKey chunkKey = {insert_data.databaseId, insert_data.tableId};
// NOTE(review): original lines 5570-5571 (the lock constructor arguments, presumably
// keyed on chunkKey) are missing from this listing.
5569  mapd_unique_lock<mapd_shared_mutex> tableLevelWriteLock(
5572  // [ TableWriteLocks ] lock is deferred in
5573  // InsertOrderFragmenter::deleteFragments
5574  td->fragmenter->insertDataNoCheckpoint(insert_data);
5575  } catch (const std::exception& e) {
5576  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5577  }
5578 }
5579 
5580 void MapDHandler::start_render_query(TPendingRenderQuery& _return,
5581  const TSessionId& session,
5582  const int64_t widget_id,
5583  const int16_t node_idx,
5584  const std::string& vega_json) {
5585  auto stdlog = STDLOG(get_session_ptr(session));
5586  auto session_ptr = stdlog.getConstSessionInfo();
5587  if (!render_handler_) {
5588  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5589  }
5590  LOG(INFO) << "start_render_query :" << *session_ptr << " :widget_id:" << widget_id
5591  << ":vega_json:" << vega_json;
5592  auto time_ms = measure<>::execution([&]() {
5593  try {
5594  render_handler_->start_render_query(
5595  _return, session, widget_id, node_idx, vega_json);
5596  } catch (std::exception& e) {
5597  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5598  }
5599  });
5600  LOG(INFO) << "start_render_query-COMPLETED " << time_ms << "ms "
5601  << "id is " << _return.id;
5602 }
5603 
5604 void MapDHandler::execute_next_render_step(TRenderStepResult& _return,
5605  const TPendingRenderQuery& pending_render,
5606  const TRenderAggDataMap& merged_data) {
5607  if (!render_handler_) {
5608  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5609  }
5610 
5611  LOG(INFO) << "execute_next_render_step: id:" << pending_render.id;
5612  auto time_ms = measure<>::execution([&]() {
5613  try {
5614  render_handler_->execute_next_render_step(_return, pending_render, merged_data);
5615  } catch (std::exception& e) {
5616  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5617  }
5618  });
5619  LOG(INFO) << "execute_next_render_step-COMPLETED id: " << pending_render.id
5620  << ", time: " << time_ms << "ms ";
5621 }
5622 
5623 void MapDHandler::checkpoint(const TSessionId& session,
5624  const int32_t db_id,
5625  const int32_t table_id) {
5626  auto stdlog = STDLOG(get_session_ptr(session));
5627  auto session_ptr = stdlog.getConstSessionInfo();
5628  auto& cat = session_ptr->getCatalog();
5629  cat.getDataMgr().checkpoint(db_id, table_id);
5630 }
5631 
5632 // check and reset epoch if a request has been made
5633 void MapDHandler::set_table_epoch(const TSessionId& session,
5634  const int db_id,
5635  const int table_id,
5636  const int new_epoch) {
5637  auto stdlog = STDLOG(get_session_ptr(session));
5638  auto session_ptr = stdlog.getConstSessionInfo();
5639  if (!session_ptr->get_currentUser().isSuper) {
5640  throw std::runtime_error("Only superuser can set_table_epoch");
5641  }
5642  auto& cat = session_ptr->getCatalog();
5643 
5644  if (leaf_aggregator_.leafCount() > 0) {
5645  return leaf_aggregator_.set_table_epochLeaf(*session_ptr, db_id, table_id, new_epoch);
5646  }
5647  cat.setTableEpoch(db_id, table_id, new_epoch);
5648 }
5649 
5650 // check and reset epoch if a request has been made
// Resets the epoch of the table named `table_name` in the session's current database
// to `new_epoch`. Only a superuser may call this; anyone else gets a std::runtime_error.
// When leaves are attached (leafCount() > 0), the distributed branch is taken instead of
// the local catalog update.
// NOTE(review): the error message says "set_table_epoch" rather than
// "set_table_epoch_by_name"; confirm whether that is intentional.
5651 void MapDHandler::set_table_epoch_by_name(const TSessionId& session,
5652  const std::string& table_name,
5653  const int new_epoch) {
5654  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
5655  auto session_ptr = stdlog.getConstSessionInfo();
5656  if (!session_ptr->get_currentUser().isSuper) {
5657  throw std::runtime_error("Only superuser can set_table_epoch");
5658  }
5659  auto& cat = session_ptr->getCatalog();
// NOTE(review): td is dereferenced below without a null check; if getMetadataForTable
// returns nullptr for an unknown name, this would crash. Confirm.
5660  auto td = cat.getMetadataForTable(
5661  table_name,
5662  false); // don't populate fragmenter on this call since we only want metadata
5663  int32_t db_id = cat.getCurrentDB().dbId;
5664  if (leaf_aggregator_.leafCount() > 0) {
// NOTE(review): original line 5665 is missing from this listing; presumably it is the
// `return leaf_aggregator_...(` call whose arguments follow. Confirm against the source.
5666  *session_ptr, db_id, td->tableId, new_epoch);
5667  }
5668  cat.setTableEpoch(db_id, td->tableId, new_epoch);
5669 }
5670 
5671 int32_t MapDHandler::get_table_epoch(const TSessionId& session,
5672  const int32_t db_id,
5673  const int32_t table_id) {
5674  auto stdlog = STDLOG(get_session_ptr(session));
5675  auto session_ptr = stdlog.getConstSessionInfo();
5676  auto const& cat = session_ptr->getCatalog();
5677 
5678  if (leaf_aggregator_.leafCount() > 0) {
5679  return leaf_aggregator_.get_table_epochLeaf(*session_ptr, db_id, table_id);
5680  }
5681  return cat.getTableEpoch(db_id, table_id);
5682 }
5683 
5684 int32_t