OmniSciDB  0fdbebe030
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
MapDHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: MapDHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "MapDHandler.h"
25 #include "DistributedLoader.h"
26 #include "MapDServer.h"
28 #include "TokenCompletionHints.h"
29 
30 #ifdef HAVE_PROFILER
31 #include <gperftools/heap-profiler.h>
32 #endif // HAVE_PROFILER
33 
34 #include "MapDRelease.h"
35 
36 #include "Calcite/Calcite.h"
37 #include "gen-cpp/CalciteServer.h"
38 
40 
41 #include "Catalog/Catalog.h"
47 #include "Import/Importer.h"
48 #include "LockMgr/LockMgr.h"
49 #include "MapDDistributedHandler.h"
50 #include "Parser/ParserWrapper.h"
52 #include "Parser/parser.h"
55 #include "QueryEngine/Execute.h"
63 #include "Shared/StringTransform.h"
64 #include "Shared/SysInfo.h"
65 #include "Shared/geo_types.h"
66 #include "Shared/geosupport.h"
67 #include "Shared/import_helpers.h"
69 #include "Shared/measure.h"
70 #include "Shared/scope.h"
71 
72 #include <fcntl.h>
73 #include <picosha2.h>
74 #include <sys/time.h>
75 #include <sys/types.h>
76 #include <sys/wait.h>
77 #include <unistd.h>
78 #include <algorithm>
79 #include <boost/algorithm/string.hpp>
80 #include <boost/filesystem.hpp>
81 #include <boost/make_shared.hpp>
82 #include <boost/process/search_path.hpp>
83 #include <boost/program_options.hpp>
84 #include <boost/regex.hpp>
85 #include <boost/tokenizer.hpp>
86 #include <cmath>
87 #include <csignal>
88 #include <fstream>
89 #include <future>
90 #include <map>
91 #include <memory>
92 #include <random>
93 #include <regex>
94 #include <string>
95 #include <thread>
96 #include <typeinfo>
97 
98 #include <arrow/api.h>
99 #include <arrow/io/api.h>
100 #include <arrow/ipc/api.h>
101 
102 #include "Shared/ArrowUtil.h"
103 
104 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
105 
108 
109 #define INVALID_SESSION_ID ""
110 
111 #define THROW_MAPD_EXCEPTION(errstr) \
112  { \
113  TMapDException ex; \
114  ex.error_msg = errstr; \
115  LOG(ERROR) << ex.error_msg; \
116  throw ex; \
117  }
118 
119 thread_local std::string MapDTrackingProcessor::client_address;
121 
122 namespace {
123 
124 SessionMap::iterator get_session_from_map(const TSessionId& session,
125  SessionMap& session_map) {
126  auto session_it = session_map.find(session);
127  if (session_it == session_map.end()) {
128  THROW_MAPD_EXCEPTION("Session not valid.");
129  }
130  return session_it;
131 }
132 
133 struct ForceDisconnect : public std::runtime_error {
134  ForceDisconnect(const std::string& cause) : std::runtime_error(cause) {}
135 };
136 
137 } // namespace
138 
// get_session_it_unsafe() specialization for callers holding a SHARED (read) lock on
// sessions_mutex_ (name inferred from call sites such as interrupt()).
// Looks up `session` and runs the expiry check. If that check throws ForceDisconnect,
// the session is disconnected and the failure is rethrown as a TMapDException.
// NOTE: on that path the caller's read_lock is left unlocked.
139 template <>
141  const TSessionId& session,
142  mapd_shared_lock<mapd_shared_mutex>& read_lock) {
143  auto session_it = get_session_from_map(session, sessions_);
144  try {
145  check_session_exp_unsafe(session_it);
146  } catch (const ForceDisconnect& e) {
// Drop the shared lock before taking the exclusive one on the same mutex; acquiring
// the exclusive lock while still holding the shared one would presumably deadlock.
147  read_lock.unlock();
148  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
// Re-resolve the session: the map may have changed while no lock was held.
149  auto session_it2 = get_session_from_map(session, sessions_);
150  disconnect_impl(session_it2, write_lock);
151  THROW_MAPD_EXCEPTION(e.what());
152  }
153  return session_it;
154 }
155 
// get_session_it_unsafe() specialization for callers holding an EXCLUSIVE (write) lock
// on sessions_mutex_ (name inferred from call sites such as disconnect()).
// Looks up `session` and runs the expiry check. If that check throws ForceDisconnect,
// the session is disconnected in place and the failure is rethrown as a
// TMapDException. NOTE: disconnect_impl() unlocks write_lock, so on that path the
// caller's lock is no longer held when the exception propagates.
156 template <>
158  const TSessionId& session,
159  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
160  auto session_it = get_session_from_map(session, sessions_);
161  try {
162  check_session_exp_unsafe(session_it);
163  } catch (const ForceDisconnect& e) {
164  disconnect_impl(session_it, write_lock);
165  THROW_MAPD_EXCEPTION(e.what());
166  }
167  return session_it;
168 }
169 
// Builds the Thrift request handler. In order it:
//  1. picks the executor device type (CPU if requested or if this is not a CUDA build);
//  2. creates the DataMgr under <base_data_path>/mapd_data;
//  3. optionally compiles a UDF source file with clang;
//  4. starts Calcite and registers extension functions / UDFs;
//  5. initializes the SysCatalog singleton;
//  6. optionally creates the render handler and the distributed agg/leaf handler.
// Session durations are multiplied by 60 when stored (presumably minutes -> seconds).
// Most initialization failures are LOG(FATAL). Render and distributed-handler failures
// are logged as ERROR and only disable that feature.
// NOTE(review): enable_auto_clear_render_mem and render_oom_retry_threshold are not
// used anywhere in this constructor; confirm whether they should be forwarded.
170 MapDHandler::MapDHandler(const std::vector<LeafHostInfo>& db_leaves,
171  const std::vector<LeafHostInfo>& string_leaves,
172  const std::string& base_data_path,
173  const bool cpu_only,
174  const bool allow_multifrag,
175  const bool jit_debug,
176  const bool intel_jit_profile,
177  const bool read_only,
178  const bool allow_loop_joins,
179  const bool enable_rendering,
180  const bool enable_auto_clear_render_mem,
181  const int render_oom_retry_threshold,
182  const size_t render_mem_bytes,
183  const size_t max_concurrent_render_sessions,
184  const int num_gpus,
185  const int start_gpu,
186  const size_t reserved_gpu_mem,
187  const size_t num_reader_threads,
188  const AuthMetadata& authMetadata,
189  const MapDParameters& mapd_parameters,
190  const bool legacy_syntax,
191  const int idle_session_duration,
192  const int max_session_duration,
193  const bool enable_runtime_udf_registration,
194  const std::string& udf_filename,
195  const std::string& clang_path,
196  const std::vector<std::string>& clang_options)
197  : leaf_aggregator_(db_leaves)
198  , string_leaves_(string_leaves)
199  , base_data_path_(base_data_path)
200  , random_gen_(std::random_device{}())
201  , session_id_dist_(0, INT32_MAX)
202  , jit_debug_(jit_debug)
203  , intel_jit_profile_(intel_jit_profile)
204  , allow_multifrag_(allow_multifrag)
205  , read_only_(read_only)
206  , allow_loop_joins_(allow_loop_joins)
207  , authMetadata_(authMetadata)
208  , mapd_parameters_(mapd_parameters)
209  , legacy_syntax_(legacy_syntax)
210  , super_user_rights_(false)
211  , idle_session_duration_(idle_session_duration * 60)
212  , max_session_duration_(max_session_duration * 60)
213  , runtime_udf_registration_enabled_(enable_runtime_udf_registration) {
214  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
215  // Register foreign storage interfaces here
// Rendering may be switched off below when running CPU-only.
217  bool is_rendering_enabled = enable_rendering;
218 
// Choose the executor device: CPU when requested, GPU in CUDA builds, CPU otherwise.
// NOTE(review): the log text "Failed to executor device type" is missing a verb.
219  try {
220  if (cpu_only) {
221  is_rendering_enabled = false;
222  executor_device_type_ = ExecutorDeviceType::CPU;
223  cpu_mode_only_ = true;
224  } else {
225 #ifdef HAVE_CUDA
226  executor_device_type_ = ExecutorDeviceType::GPU;
227  cpu_mode_only_ = false;
228 #else
229  executor_device_type_ = ExecutorDeviceType::CPU;
230  LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
231  cpu_mode_only_ = true;
232  is_rendering_enabled = false;
233 #endif
234  }
235  } catch (const std::exception& e) {
236  LOG(FATAL) << "Failed to executor device type: " << e.what();
237  }
238 
239  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
240  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
241  // manage cannot ask for
242  size_t total_reserved = reserved_gpu_mem;
243  if (is_rendering_enabled) {
244  total_reserved += render_mem_bytes;
245  }
246 
// GPUs are requested from the DataMgr only when not in CPU-only mode.
247  try {
248  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
249  mapd_parameters,
250  !cpu_mode_only_,
251  num_gpus,
252  start_gpu,
253  total_reserved,
254  num_reader_threads));
255  } catch (const std::exception& e) {
256  LOG(FATAL) << "Failed to initialize data manager: " << e.what();
257  }
258 
// Stays empty unless a UDF file was given and compileUdf() returned 0; the result is
// handed to Calcite below.
259  std::string udf_ast_filename("");
260 
261  try {
262  if (!udf_filename.empty()) {
263  UdfCompiler compiler(udf_filename, clang_path, clang_options);
264  int compile_result = compiler.compileUdf();
265 
266  if (compile_result == 0) {
267  udf_ast_filename = compiler.getAstFileName();
268  }
269  }
270  } catch (const std::exception& e) {
271  LOG(FATAL) << "Failed to initialize UDF compiler: " << e.what();
272  }
273 
274  try {
275  calcite_ =
276  std::make_shared<Calcite>(mapd_parameters, base_data_path_, udf_ast_filename);
277  } catch (const std::exception& e) {
278  LOG(FATAL) << "Failed to initialize Calcite server: " << e.what();
279  }
280 
// Register the extension-function whitelist obtained from Calcite, plus the UDF
// whitelist when a UDF file was supplied.
281  try {
282  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
283  if (!udf_filename.empty()) {
284  ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
285  }
286  } catch (const std::exception& e) {
287  LOG(FATAL) << "Failed to initialize extension functions: " << e.what();
288  }
289 
290  try {
292  } catch (const std::exception& e) {
293  LOG(FATAL) << "Failed to initialize table functions factory: " << e.what();
294  }
295 
// Even in a CUDA build, fall back to CPU when the DataMgr found no GPUs.
// NOTE(review): is_rendering_enabled is not cleared here, so the render handler below
// may still be constructed without GPUs; confirm this is intended.
296  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
297  executor_device_type_ = ExecutorDeviceType::CPU;
298  LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
299  cpu_mode_only_ = true;
300  }
301 
302  switch (executor_device_type_) {
304  LOG(INFO) << "Started in GPU mode" << std::endl;
305  break;
307  LOG(INFO) << "Started in CPU mode" << std::endl;
308  break;
309  }
310 
// The flag derived from !db_leaves.empty() presumably tells the catalog it runs on
// an aggregator; verify against SysCatalog::init().
311  try {
312  SysCatalog::instance().init(base_data_path_,
313  data_mgr_,
314  authMetadata,
315  calcite_,
316  false,
317  !db_leaves.empty(),
318  string_leaves_);
319  } catch (const std::exception& e) {
320  LOG(FATAL) << "Failed to initialize system catalog: " << e.what();
321  }
322 
323  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
324  start_time_ = std::time(nullptr);
325 
// Rendering failures are non-fatal: the server keeps running without render support.
326  if (is_rendering_enabled) {
327  try {
328  render_handler_.reset(new MapDRenderHandler(this,
329  render_mem_bytes,
330  max_concurrent_render_sessions,
331  0u,
332  false,
333  0,
334  mapd_parameters_));
335  } catch (const std::exception& e) {
336  LOG(ERROR) << "Backend rendering disabled: " << e.what();
337  }
338  }
339 
// With leaves configured this server acts as aggregator; otherwise, in cluster mode,
// it acts as a leaf. Failures only disable distributed support.
340  if (leaf_aggregator_.leafCount() > 0) {
341  try {
342  agg_handler_.reset(new MapDAggHandler(this));
343  } catch (const std::exception& e) {
344  LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
345  }
346  } else if (g_cluster) {
347  try {
348  leaf_handler_.reset(new MapDLeafHandler(this));
349  } catch (const std::exception& e) {
350  LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
351  }
352  }
353 }
354 
356 
// Guards mutating operations: throws a TMapDException
// "<str> disabled: server running in read-only mode." — presumably only when the
// server was started read-only (read_only_); confirm against the guarding condition.
357 void MapDHandler::check_read_only(const std::string& str) {
359  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
360  }
361 }
362 
// (Presumably MapDHandler::createInMemoryCalciteSession — the declarator line is not
// visible.) Registers a session for Calcite's internal proxy user on `catalog_ptr` and
// returns its id, drawn from generate_random_string(64) until unused. sessions_mutex_
// is held exclusively for the whole call. Pair with removeInMemoryCalciteSession().
364  const std::shared_ptr<Catalog_Namespace::Catalog>& catalog_ptr) {
365  // We would create an in memory session for calcite with super user privileges which
366  // would be used for getting all tables metadata when a user runs the query. The
367  // session would be under the name of a proxy user/password which would only persist
368  // till server's lifetime or execution of calcite query(in memory) whichever is the
369  // earliest.
370  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
371  std::string session_id;
372  do {
373  session_id = generate_random_string(64);
374  } while (sessions_.find(session_id) != sessions_.end());
// Proxy identity: credentials come from calcite_; the -1 values and flags are the
// markers isInMemoryCalciteSession() checks for (userId/defaultDbId -1, superuser).
375  Catalog_Namespace::UserMetadata user_meta(-1,
376  calcite_->getInternalSessionProxyUserName(),
377  calcite_->getInternalSessionProxyPassword(),
378  true,
379  -1,
380  true);
381  const auto emplace_ret =
382  sessions_.emplace(session_id,
383  std::make_shared<Catalog_Namespace::SessionInfo>(
384  catalog_ptr, user_meta, executor_device_type_, session_id));
385  CHECK(emplace_ret.second);
386  return session_id;
387 }
388 
// (Presumably MapDHandler::isInMemoryCalciteSession — the declarator line is not
// visible.) True when `user_meta` is Calcite's internal proxy identity: proxy user
// name, userId == -1, defaultDbId == -1 and superuser.
// NOTE(review): `user_meta` is taken by value; a const reference would avoid a copy.
390  const Catalog_Namespace::UserMetadata user_meta) {
391  return user_meta.userName == calcite_->getInternalSessionProxyUserName() &&
392  user_meta.userId == -1 && user_meta.defaultDbId == -1 &&
393  user_meta.isSuper.load();
394 }
395 
396 void MapDHandler::removeInMemoryCalciteSession(const std::string& session_id) {
397  // Remove InMemory calcite Session.
398  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
399  const auto it = sessions_.find(session_id);
400  CHECK(it != sessions_.end());
401  sessions_.erase(it);
402 }
403 
404 // internal connection for connections with no password
// Logs `username` into `dbname` with an empty password via SysCatalog::login(), checks
// that the user has privileges on the database object, then registers the session
// through connect_impl() (which writes the new id into `session`).
// sessions_mutex_ is held exclusively for the whole call.
// NOTE(review): the trailing `false` passed to login() presumably disables the password
// check that connect() enables; confirm against SysCatalog::login().
405 void MapDHandler::internal_connect(TSessionId& session,
406  const std::string& username,
407  const std::string& dbname) {
408  auto stdlog = STDLOG(); // session_info set by connect_impl()
409  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
410  std::string username2 = username; // login() may reset username given as argument
411  std::string dbname2 = dbname; // login() may reset dbname given as argument
413  std::shared_ptr<Catalog> cat = nullptr;
414  try {
415  cat =
416  SysCatalog::instance().login(dbname2, username2, std::string(), user_meta, false);
417  } catch (std::exception& e) {
418  THROW_MAPD_EXCEPTION(e.what());
419  }
420 
// Access requires privileges on the (possibly rewritten) database dbname2.
421  DBObject dbObject(dbname2, DatabaseDBObjectType);
422  dbObject.loadKey(*cat);
424  std::vector<DBObject> dbObjects;
425  dbObjects.push_back(dbObject);
426  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
427  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
428  " is not allowed to access database " + dbname2 + ".");
429  }
430  connect_impl(session, std::string(), dbname2, user_meta, cat, stdlog);
431 }
432 
433 void MapDHandler::krb5_connect(TKrb5Session& session,
434  const std::string& inputToken,
435  const std::string& dbname) {
436  THROW_MAPD_EXCEPTION("Unauthrorized Access. Kerberos login not supported");
437 }
438 
// Thrift entry point for password login. The steps are:
//  1. authenticate via SysCatalog::login();
//  2. require privileges on the (possibly rewritten) database object;
//  3. register the session via connect_impl(), which writes the id into `session`;
//  4. let SysCatalog possibly rewrite `session` for PKI-encrypted sessions.
// sessions_mutex_ is held exclusively for the whole call. Failures are recorded in the
// request log (with the caller-supplied user/db names) and thrown as TMapDException.
// NOTE(review): the `!super_user_rights_` argument presumably enables password
// verification except during warmup; confirm against SysCatalog::login().
439 void MapDHandler::connect(TSessionId& session,
440  const std::string& username,
441  const std::string& passwd,
442  const std::string& dbname) {
443  auto stdlog = STDLOG(); // session_info set by connect_impl()
444  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
445  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
446  std::string username2 = username; // login() may reset username given as argument
447  std::string dbname2 = dbname; // login() may reset dbname given as argument
449  std::shared_ptr<Catalog> cat = nullptr;
450  try {
451  cat = SysCatalog::instance().login(
452  dbname2, username2, passwd, user_meta, !super_user_rights_);
453  } catch (std::exception& e) {
454  stdlog.appendNameValuePairs("user", username, "db", dbname, "exception", e.what());
455  THROW_MAPD_EXCEPTION(e.what());
456  }
457 
458  DBObject dbObject(dbname2, DatabaseDBObjectType);
459  dbObject.loadKey(*cat);
461  std::vector<DBObject> dbObjects;
462  dbObjects.push_back(dbObject);
463  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
464  stdlog.appendNameValuePairs(
465  "user", username, "db", dbname, "exception", "Missing Privileges");
466  THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
467  " is not allowed to access database " + dbname2 + ".");
468  }
469  connect_impl(session, passwd, dbname2, user_meta, cat, stdlog);
470  // if pki auth session will come back encrypted with user pubkey
471  SysCatalog::instance().check_for_session_encryption(passwd, session);
472 }
473 
474 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::create_new_session(
475  TSessionId& session,
476  const std::string& dbname,
477  const Catalog_Namespace::UserMetadata& user_meta,
478  std::shared_ptr<Catalog> cat) {
479  do {
480  session = generate_random_string(32);
481  } while (sessions_.find(session) != sessions_.end());
482  std::pair<SessionMap::iterator, bool> emplace_retval =
483  sessions_.emplace(session,
484  std::make_shared<Catalog_Namespace::SessionInfo>(
485  cat, user_meta, executor_device_type_, session));
486  CHECK(emplace_retval.second);
487  auto& session_ptr = emplace_retval.first->second;
488  LOG(INFO) << "User " << user_meta.userName << " connected to database " << dbname;
489  return session_ptr;
490 }
491 
492 void MapDHandler::connect_impl(TSessionId& session,
493  const std::string& passwd,
494  const std::string& dbname,
495  const Catalog_Namespace::UserMetadata& user_meta,
496  std::shared_ptr<Catalog> cat,
497  query_state::StdLog& stdlog) {
498  // TODO(sy): Is there any reason to have dbname as a parameter
499  // here when the cat parameter already provides cat->name()?
500  // Should dbname and cat->name() ever differ?
501  auto session_ptr = create_new_session(session, dbname, user_meta, cat);
502  stdlog.setSessionInfo(session_ptr);
503  session_ptr->set_connection_info(getConnectionInfo().toString());
504  if (!super_user_rights_) { // no need to connect to leaf_aggregator_ at this time while
505  // doing warmup
506  if (leaf_aggregator_.leafCount() > 0) {
507  leaf_aggregator_.connect(*session_ptr, user_meta.userName, passwd, dbname);
508  return;
509  }
510  }
511  auto const roles = session_ptr->get_currentUser().isSuper
512  ? std::vector<std::string>{{"super"}}
513  : SysCatalog::instance().getRoles(
514  false, false, session_ptr->get_currentUser().userName);
515  stdlog.appendNameValuePairs("roles", boost::algorithm::join(roles, ","));
516 }
517 
518 void MapDHandler::disconnect(const TSessionId& session) {
519  auto stdlog = STDLOG();
520  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
521 
522  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
523  auto session_it = get_session_it_unsafe(session, write_lock);
524  stdlog.setSessionInfo(session_it->second);
525  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
526 
527  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
528  << " disconnected from database " << dbname
529  << " with public_session_id: " << session_it->second->get_public_session_id();
530  disconnect_impl(session_it, write_lock);
531 }
532 
// Removes the session at `session_it` from sessions_. It also notifies the leaves
// (when this server has any) and the render handler (when present).
// The caller must hold `write_lock`. It is released before the render handler is
// notified, so it is no longer held when this function returns.
533 void MapDHandler::disconnect_impl(const SessionMap::iterator& session_it,
534  mapd_unique_lock<mapd_shared_mutex>& write_lock) {
535  // session_it existence should already have been checked (i.e. called via
536  // get_session_it_unsafe(...))
// Copy the id first: after the erase below the map entry (a shared_ptr that may be
// the last owner of the SessionInfo) is gone.
537  const auto session_id = session_it->second->get_session_id();
538  if (leaf_aggregator_.leafCount() > 0) {
539  leaf_aggregator_.disconnect(session_id);
540  }
541  sessions_.erase(session_it);
542  write_lock.unlock();
543 
544  if (render_handler_) {
545  render_handler_->disconnect(session_id);
546  }
547 }
548 
549 void MapDHandler::switch_database(const TSessionId& session, const std::string& dbname) {
550  auto stdlog = STDLOG(get_session_ptr(session));
551  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
552  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
553  auto session_it = get_session_it_unsafe(session, write_lock);
554 
555  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
556 
557  try {
558  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
559  dbname2, session_it->second->get_currentUser().userName);
560  session_it->second->set_catalog_ptr(cat);
561  } catch (std::exception& e) {
562  THROW_MAPD_EXCEPTION(e.what());
563  }
564 }
565 
566 void MapDHandler::clone_session(TSessionId& session2, const TSessionId& session1) {
567  auto stdlog = STDLOG(get_session_ptr(session1));
568  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
569  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
570  auto session_it = get_session_it_unsafe(session1, write_lock);
571 
572  try {
573  const Catalog_Namespace::UserMetadata& user_meta =
574  session_it->second->get_currentUser();
575  std::shared_ptr<Catalog> cat = session_it->second->get_catalog_ptr();
576  auto session2_ptr = create_new_session(session2, cat->name(), user_meta, cat);
577  if (leaf_aggregator_.leafCount() > 0) {
578  leaf_aggregator_.clone_session(session1, session2);
579  return;
580  }
581  } catch (std::exception& e) {
582  THROW_MAPD_EXCEPTION(e.what());
583  }
584 }
585 
// Thrift entry point: interrupts `query_session` through the executor of the database
// that `interrupt_session` is connected to. On an aggregator the request is first
// forwarded to the leaves. Only a shared lock on sessions_mutex_ is taken, so
// interrupts for different sessions can run concurrently.
586 void MapDHandler::interrupt(const TSessionId& query_session,
587  const TSessionId& interrupt_session) {
588  // if this is for distributed setting, query_session becomes a parent session (agg)
589  // and the interrupt session is one of existing session in the leaf node (leaf)
590  // so we can think there exists a logical mapping
591  // between query_session (agg) and interrupt_session (leaf)
592  auto stdlog = STDLOG(get_session_ptr(interrupt_session));
593  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
595  // Shared lock to allow simultaneous interrupts of multiple sessions
596  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
597 
598  auto session_it = get_session_it_unsafe(interrupt_session, read_lock);
599  auto& cat = session_it->second.get()->getCatalog();
600  const auto dbname = cat.getCurrentDB().dbName;
// The executor is looked up by database id; the jit_debug_ arguments select a debug
// output directory and name.
601  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
602  jit_debug_ ? "/tmp" : "",
603  jit_debug_ ? "mapdquery" : "",
605  CHECK(executor);
606 
607  VLOG(1) << "Received interrupt: "
608  << "Session " << *session_it->second << ", Executor " << executor
609  << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
610  << session_it->second->get_currentUser().userName << ", Database " << dbname
611  << std::endl;
612 
613  if (leaf_aggregator_.leafCount() > 0) {
614  leaf_aggregator_.interrupt(query_session, interrupt_session);
615  }
616  executor->interrupt(query_session, interrupt_session);
617 
618  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
619  << " interrupted session with database " << dbname << std::endl;
620  }
621 }
622 
623 void MapDHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
624  auto stdlog = STDLOG(get_session_ptr(session));
625  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
626  const auto rendering_enabled = bool(render_handler_);
627  _return.read_only = read_only_;
628  _return.version = MAPD_RELEASE;
629  _return.rendering_enabled = rendering_enabled;
630  _return.poly_rendering_enabled = rendering_enabled;
631  _return.start_time = start_time_;
632  _return.edition = MAPD_EDITION;
633  _return.host_name = get_hostname();
634 }
635 
636 void MapDHandler::get_status(std::vector<TServerStatus>& _return,
637  const TSessionId& session) {
638  auto stdlog = STDLOG(get_session_ptr(session));
639  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
640  const auto rendering_enabled = bool(render_handler_);
641  TServerStatus ret;
642  ret.read_only = read_only_;
643  ret.version = MAPD_RELEASE;
644  ret.rendering_enabled = rendering_enabled;
645  ret.poly_rendering_enabled = rendering_enabled;
646  ret.start_time = start_time_;
647  ret.edition = MAPD_EDITION;
648  ret.host_name = get_hostname();
649 
650  // TSercivePort tcp_port{}
651 
652  if (g_cluster) {
653  ret.role =
654  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
655  } else {
656  ret.role = TRole::type::SERVER;
657  }
658 
659  _return.push_back(ret);
660  if (leaf_aggregator_.leafCount() > 0) {
661  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
662  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
663  }
664 }
665 
666 void MapDHandler::get_hardware_info(TClusterHardwareInfo& _return,
667  const TSessionId& session) {
668  auto stdlog = STDLOG(get_session_ptr(session));
669  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
670  THardwareInfo ret;
671  const auto cuda_mgr = data_mgr_->getCudaMgr();
672  if (cuda_mgr) {
673  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
674  ret.start_gpu = cuda_mgr->getStartGpu();
675  if (ret.start_gpu >= 0) {
676  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
677  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
678  }
679  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
680  TGpuSpecification gpu_spec;
681  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
682  gpu_spec.num_sm = deviceProperties->numMPs;
683  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
684  gpu_spec.memory = deviceProperties->globalMem;
685  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
686  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
687  ret.gpu_info.push_back(gpu_spec);
688  }
689  }
690 
691  // start hardware/OS dependent code
692  ret.num_cpu_hw = std::thread::hardware_concurrency();
693  // ^ This might return diffrent results in case of hyper threading
694  // end hardware/OS dependent code
695 
696  _return.hardware_info.push_back(ret);
697  if (leaf_aggregator_.leafCount() > 0) {
698  ret.host_name = "aggregator";
699  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
700  _return.hardware_info.insert(_return.hardware_info.end(),
701  leaf_hardware.hardware_info.begin(),
702  leaf_hardware.hardware_info.end());
703  }
704 }
705 
706 void MapDHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
707  auto session_ptr = get_session_ptr(session);
708  auto stdlog = STDLOG(session_ptr);
709  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
710  auto user_metadata = session_ptr->get_currentUser();
711 
712  _return.user = user_metadata.userName;
713  _return.database = session_ptr->getCatalog().getCurrentDB().dbName;
714  _return.start_time = session_ptr->get_start_time();
715  _return.is_super = user_metadata.isSuper;
716 }
717 
// (MapDHandler::value_to_thrift_column — the declarator line with the `tv` parameter is
// not visible.) Appends one result value `tv` of SQL type `ti` to the columnar Thrift
// result `column`. Every path pushes exactly one entry onto `column.nulls`, and nulls
// are never reported for NOT NULL types.
//  - arrays: elements are converted recursively into a nested TColumn on arr_col;
//  - geometry: either a string (str_col) or an array converted as doubles (arr_col);
//  - scalars: integers go to int_col, except decimals, which are divided by
//    10^scale (when scale > 0) and go to real_col; float/double go to real_col,
//    strings to str_col.
719  const SQLTypeInfo& ti,
720  TColumn& column) {
721  if (ti.is_array()) {
// An unset (null) array still contributes an empty nested TColumn to arr_col.
722  TColumn tColumn;
723  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
724  CHECK(array_tv);
725  bool is_null = !array_tv->is_initialized();
726  if (!is_null) {
727  const auto& vec = array_tv->get();
728  for (const auto& elem_tv : vec) {
729  value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
730  }
731  }
732  column.data.arr_col.push_back(tColumn);
733  column.nulls.push_back(is_null && !ti.get_notnull());
734  } else if (ti.is_geometry()) {
735  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
736  if (scalar_tv) {
// Geometry delivered as a (nullable) string.
737  auto s_n = boost::get<NullableString>(scalar_tv);
738  auto s = boost::get<std::string>(s_n);
739  if (s) {
740  column.data.str_col.push_back(*s);
741  } else {
742  column.data.str_col.emplace_back(""); // null string
743  auto null_p = boost::get<void*>(s_n);
744  CHECK(null_p && !*null_p);
745  }
746  column.nulls.push_back(!s && !ti.get_notnull());
747  } else {
// Geometry delivered as an array; each element is converted as a kDOUBLE.
748  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
749  CHECK(array_tv);
750  bool is_null = !array_tv->is_initialized();
751  if (!is_null) {
752  auto elem_type = SQLTypeInfo(kDOUBLE, false);
753  TColumn tColumn;
754  const auto& vec = array_tv->get();
755  for (const auto& elem_tv : vec) {
756  value_to_thrift_column(elem_tv, elem_type, tColumn);
757  }
758  column.data.arr_col.push_back(tColumn);
759  column.nulls.push_back(false);
760  } else {
761  TColumn tColumn;
762  column.data.arr_col.push_back(tColumn);
763  column.nulls.push_back(is_null && !ti.get_notnull());
764  }
765  }
766  } else {
767  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
768  CHECK(scalar_tv);
769  if (boost::get<int64_t>(scalar_tv)) {
770  int64_t data = *(boost::get<int64_t>(scalar_tv));
771 
// Decimals are sent as doubles, scaled down by 10^scale.
772  if (ti.is_decimal()) {
773  double val = static_cast<double>(data);
774  if (ti.get_scale() > 0) {
775  val /= pow(10.0, std::abs(ti.get_scale()));
776  }
777  column.data.real_col.push_back(val);
778  } else {
779  column.data.int_col.push_back(data);
780  }
781 
// Nullness: compare the raw (unscaled) integer with the type's NULL sentinel.
782  switch (ti.get_type()) {
783  case kBOOLEAN:
784  column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
785  break;
786  case kTINYINT:
787  column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
788  break;
789  case kSMALLINT:
790  column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
791  break;
792  case kINT:
793  column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
794  break;
795  case kNUMERIC:
796  case kDECIMAL:
797  case kBIGINT:
798  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
799  break;
800  case kTIME:
801  case kTIMESTAMP:
802  case kDATE:
803  case kINTERVAL_DAY_TIME:
805  column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
806  break;
807  default:
808  column.nulls.push_back(false);
809  }
810  } else if (boost::get<double>(scalar_tv)) {
811  double data = *(boost::get<double>(scalar_tv));
812  column.data.real_col.push_back(data);
813  if (ti.get_type() == kFLOAT) {
814  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
815  } else {
816  column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
817  }
818  } else if (boost::get<float>(scalar_tv)) {
819  CHECK_EQ(kFLOAT, ti.get_type());
820  float data = *(boost::get<float>(scalar_tv));
821  column.data.real_col.push_back(data);
822  column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
823  } else if (boost::get<NullableString>(scalar_tv)) {
824  auto s_n = boost::get<NullableString>(scalar_tv);
825  auto s = boost::get<std::string>(s_n);
826  if (s) {
827  column.data.str_col.push_back(*s);
828  } else {
829  column.data.str_col.emplace_back(""); // null string
830  auto null_p = boost::get<void*>(s_n);
831  CHECK(null_p && !*null_p);
832  }
833  column.nulls.push_back(!s && !ti.get_notnull());
834  } else {
835  CHECK(false);
836  }
837  }
838 }
839 
// (MapDHandler::value_to_thrift — the declarator line taking `tv` and `ti` is not
// visible.) Converts one result value `tv` of SQL type `ti` into a row-format TDatum.
//  - arrays: a list of element datums. An unset array is null; an empty one is not.
//  - integers: stored in val.int_val, except decimals, which are divided by 10^scale
//    (when scale > 0) into val.real_val.
//  - float/double: go to val.real_val; strings go to val.str_val.
// Nullness is derived from the type's NULL sentinel. Unlike value_to_thrift_column,
// ti.get_notnull() is not consulted.
841  TDatum datum;
842  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
843  if (!scalar_tv) {
844  CHECK(ti.is_array());
845  const auto array_tv = boost::get<ArrayTargetValue>(&tv);
846  CHECK(array_tv);
847  if (array_tv->is_initialized()) {
848  const auto& vec = array_tv->get();
849  for (const auto& elem_tv : vec) {
850  const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
851  datum.val.arr_val.push_back(scalar_col_val);
852  }
853  // Datum is not null, at worst it's an empty array Datum
854  datum.is_null = false;
855  } else {
856  datum.is_null = true;
857  }
858  return datum;
859  }
860  if (boost::get<int64_t>(scalar_tv)) {
861  int64_t data = *(boost::get<int64_t>(scalar_tv));
862 
863  if (ti.is_decimal()) {
864  double val = static_cast<double>(data);
865  if (ti.get_scale() > 0) {
866  val /= pow(10.0, std::abs(ti.get_scale()));
867  }
868  datum.val.real_val = val;
869  } else {
870  datum.val.int_val = data;
871  }
872 
873  switch (ti.get_type()) {
874  case kBOOLEAN:
875  datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
876  break;
877  case kTINYINT:
878  datum.is_null = (datum.val.int_val == NULL_TINYINT);
879  break;
880  case kSMALLINT:
881  datum.is_null = (datum.val.int_val == NULL_SMALLINT);
882  break;
883  case kINT:
884  datum.is_null = (datum.val.int_val == NULL_INT);
885  break;
// NOTE(review): for kDECIMAL/kNUMERIC the value was written to val.real_val above and
// val.int_val was never assigned here, so a NULL decimal is likely not flagged as
// null. Comparing the raw `data` (as value_to_thrift_column does) looks intended —
// confirm and fix.
886  case kDECIMAL:
887  case kNUMERIC:
888  case kBIGINT:
889  datum.is_null = (datum.val.int_val == NULL_BIGINT);
890  break;
891  case kTIME:
892  case kTIMESTAMP:
893  case kDATE:
894  case kINTERVAL_DAY_TIME:
896  datum.is_null = (datum.val.int_val == NULL_BIGINT);
897  break;
898  default:
899  datum.is_null = false;
900  }
901  } else if (boost::get<double>(scalar_tv)) {
902  datum.val.real_val = *(boost::get<double>(scalar_tv));
903  if (ti.get_type() == kFLOAT) {
904  datum.is_null = (datum.val.real_val == NULL_FLOAT);
905  } else {
906  datum.is_null = (datum.val.real_val == NULL_DOUBLE);
907  }
908  } else if (boost::get<float>(scalar_tv)) {
909  CHECK_EQ(kFLOAT, ti.get_type());
910  datum.val.real_val = *(boost::get<float>(scalar_tv));
911  datum.is_null = (datum.val.real_val == NULL_FLOAT);
912  } else if (boost::get<NullableString>(scalar_tv)) {
913  auto s_n = boost::get<NullableString>(scalar_tv);
914  auto s = boost::get<std::string>(s_n);
915  if (s) {
916  datum.val.str_val = *s;
917  } else {
918  auto null_p = boost::get<void*>(s_n);
919  CHECK(null_p && !*null_p);
920  }
921  datum.is_null = !s;
922  } else {
923  CHECK(false);
924  }
925  return datum;
926 }
927 
928 void MapDHandler::sql_execute(TQueryResult& _return,
929  const TSessionId& session,
930  const std::string& query_str,
931  const bool column_format,
932  const std::string& nonce,
933  const int32_t first_n,
934  const int32_t at_most_n) {
935  auto session_ptr = get_session_ptr(session);
936  auto query_state = create_query_state(session_ptr, query_str);
937  auto stdlog = STDLOG(query_state);
938  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
939  auto timer = DEBUG_TIMER(__func__);
940 
941  ScopeGuard reset_was_geo_copy_from = [this, &session_ptr] {
942  geo_copy_from_sessions.remove(session_ptr->get_session_id());
943  };
944 
945  if (first_n >= 0 && at_most_n >= 0) {
946  THROW_MAPD_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
947  }
948 
949  if (leaf_aggregator_.leafCount() > 0) {
950  if (!agg_handler_) {
951  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
952  }
953  _return.total_time_ms = measure<>::execution([&]() {
954  try {
955  agg_handler_->cluster_execute(_return,
956  query_state->createQueryStateProxy(),
957  query_state->getQueryStr(),
958  column_format,
959  nonce,
960  first_n,
961  at_most_n,
963  } catch (std::exception& e) {
964  const auto mapd_exception = dynamic_cast<const TMapDException*>(&e);
965  const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
967  thrift_exception ? std::string(thrift_exception->what())
968  : mapd_exception ? mapd_exception->error_msg
969  : std::string("Exception: ") + e.what());
970  }
971  _return.nonce = nonce;
972  });
973  } else {
974  _return.total_time_ms = measure<>::execution([&]() {
976  query_state->createQueryStateProxy(),
977  column_format,
978  nonce,
979  session_ptr->get_executor_device_type(),
980  first_n,
981  at_most_n);
982  });
983  }
984 
985  // if the SQL statement we just executed was a geo COPY FROM, the import
986  // parameters were captured, and this flag set, so we do the actual import here
987  if (auto geo_copy_from_state = geo_copy_from_sessions(session_ptr->get_session_id())) {
988  // import_geo_table() calls create_table() which calls this function to
989  // do the work, so reset the flag now to avoid executing this part a
990  // second time at the end of that, which would fail as the table was
991  // already created! Also reset the flag with a ScopeGuard on exiting
992  // this function any other way, such as an exception from the code above!
993  geo_copy_from_sessions.remove(session_ptr->get_session_id());
994 
995  // create table as replicated?
996  TCreateParams create_params;
997  if (geo_copy_from_state->geo_copy_from_partitions == "REPLICATED") {
998  create_params.is_replicated = true;
999  }
1000 
1001  // now do (and time) the import
1002  _return.total_time_ms = measure<>::execution([&]() {
1004  session,
1005  geo_copy_from_state->geo_copy_from_table,
1006  geo_copy_from_state->geo_copy_from_file_name,
1007  copyparams_to_thrift(geo_copy_from_state->geo_copy_from_copy_params),
1008  TRowDescriptor(),
1009  create_params);
1010  });
1011  }
1012  std::string debug_json = timer.stopAndGetJson();
1013  if (!debug_json.empty()) {
1014  _return.__set_debug(std::move(debug_json));
1015  }
1016  stdlog.appendNameValuePairs("execution_time_ms",
1017  _return.execution_time_ms,
1018  "total_time_ms", // BE-3420 - Redundant with duration field
1019  stdlog.duration<std::chrono::milliseconds>());
1020 }
1021 
1022 void MapDHandler::sql_execute_df(TDataFrame& _return,
1023  const TSessionId& session,
1024  const std::string& query_str,
1025  const TDeviceType::type device_type,
1026  const int32_t device_id,
1027  const int32_t first_n) {
1028  auto session_ptr = get_session_ptr(session);
1029  auto query_state = create_query_state(session_ptr, query_str);
1030  auto stdlog = STDLOG(session_ptr, query_state);
1031 
1032  if (device_type == TDeviceType::GPU) {
1033  const auto executor_device_type = session_ptr->get_executor_device_type();
1034  if (executor_device_type != ExecutorDeviceType::GPU) {
1036  std::string("Exception: GPU mode is not allowed in this session"));
1037  }
1038  if (!data_mgr_->gpusPresent()) {
1039  THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
1040  }
1041  if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
1043  std::string("Exception: invalid device_id or unavailable GPU with this ID"));
1044  }
1045  }
1046  _return.execution_time_ms = 0;
1047 
1048  mapd_shared_lock<mapd_shared_mutex> executeReadLock(
1051 
1052  SQLParser parser;
1053  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
1054  std::string last_parsed;
1055  try {
1056  ParserWrapper pw{query_str};
1057  if (!pw.is_ddl && !pw.is_update_dml &&
1058  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
1059  std::string query_ra;
1061  _return.execution_time_ms += measure<>::execution([&]() {
1062  TPlanResult result;
1063  std::tie(result, locks) = parse_to_ra(
1064  query_state->createQueryStateProxy(), query_str, {}, true, mapd_parameters_);
1065  query_ra = result.plan_result;
1066  });
1067 
1068  if (pw.isCalciteExplain()) {
1069  throw std::runtime_error("explain is not unsupported by current thrift API");
1070  }
1071  execute_rel_alg_df(_return,
1072  query_ra,
1073  query_state->createQueryStateProxy(),
1074  *session_ptr,
1075  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU
1077  static_cast<size_t>(device_id),
1078  first_n);
1079  return;
1080  }
1081  } catch (std::exception& e) {
1082  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1083  }
1085  "Exception: DDL or update DML are not unsupported by current thrift API");
1086 }
1087 
1088 void MapDHandler::sql_execute_gdf(TDataFrame& _return,
1089  const TSessionId& session,
1090  const std::string& query_str,
1091  const int32_t device_id,
1092  const int32_t first_n) {
1093  auto stdlog = STDLOG(get_session_ptr(session));
1094  sql_execute_df(_return, session, query_str, TDeviceType::GPU, device_id, first_n);
1095 }
1096 
1097 // For now we have only one user of a data frame in all cases.
1098 void MapDHandler::deallocate_df(const TSessionId& session,
1099  const TDataFrame& df,
1100  const TDeviceType::type device_type,
1101  const int32_t device_id) {
1102  auto stdlog = STDLOG(get_session_ptr(session));
1103  std::string serialized_cuda_handle = "";
1104  if (device_type == TDeviceType::GPU) {
1105  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
1106  if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
1107  TMapDException ex;
1108  ex.error_msg = std::string(
1109  "Exception: current data frame handle is not bookkept or been inserted twice");
1110  LOG(ERROR) << ex.error_msg;
1111  throw ex;
1112  }
1113  serialized_cuda_handle = ipc_handle_to_dev_ptr_[df.df_handle];
1114  ipc_handle_to_dev_ptr_.erase(df.df_handle);
1115  }
1116  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
1117  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
1119  sm_handle, df.sm_size, df_handle, df.df_size, serialized_cuda_handle};
1121  result,
1122  device_type == TDeviceType::CPU ? ExecutorDeviceType::CPU : ExecutorDeviceType::GPU,
1123  device_id,
1124  data_mgr_);
1125 }
1126 
1127 std::string MapDHandler::apply_copy_to_shim(const std::string& query_str) {
1128  auto result = query_str;
1129  {
1130  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
1131  // boost::regex::icase};
1132  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
1133  boost::regex::extended | boost::regex::icase};
1134  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
1135  result.replace(
1136  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
1137  });
1138  }
1139  return result;
1140 }
1141 
1142 void MapDHandler::sql_validate(TTableDescriptor& _return,
1143  const TSessionId& session,
1144  const std::string& query_str) {
1145  auto stdlog = STDLOG(get_session_ptr(session));
1146  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1147  auto query_state = create_query_state(stdlog.getSessionInfo(), query_str);
1148  stdlog.setQueryState(query_state);
1149 
1150  ParserWrapper pw{query_str};
1151  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
1152  pw.is_update_dml) {
1153  THROW_MAPD_EXCEPTION("Can only validate SELECT statements.");
1154  }
1155  MapDHandler::validate_rel_alg(_return, query_state->createQueryStateProxy());
1156 }
1157 
1158 namespace {
1159 
1161  std::unordered_set<std::string> uc_column_names;
1162  std::unordered_set<std::string> uc_column_table_qualifiers;
1163 };
1164 
1165 // Extract what looks like a (qualified) identifier from the partial query.
1166 // The results will be used to rank the auto-completion results: tables which
1167 // contain at least one of the identifiers first.
1169  const std::string& sql) {
1170  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
1171  boost::regex::extended | boost::regex::icase};
1172  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
1173  boost::sregex_token_iterator end;
1174  std::unordered_set<std::string> uc_column_names;
1175  std::unordered_set<std::string> uc_column_table_qualifiers;
1176  for (; tok_it != end; ++tok_it) {
1177  std::string column_name = *tok_it;
1178  std::vector<std::string> column_tokens;
1179  boost::split(column_tokens, column_name, boost::is_any_of("."));
1180  if (column_tokens.size() == 2) {
1181  // If the column name is qualified, take user's word.
1182  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
1183  } else {
1184  uc_column_names.insert(to_upper(column_name));
1185  }
1186  }
1187  return {uc_column_names, uc_column_table_qualifiers};
1188 }
1189 
1190 } // namespace
1191 
1192 void MapDHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1193  const TSessionId& session,
1194  const std::string& sql,
1195  const int cursor) {
1196  auto stdlog = STDLOG(get_session_ptr(session));
1197  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1198  get_completion_hints_unsorted(hints, visible_tables, stdlog, sql, cursor);
1199  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1200  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1201  proj_tokens.uc_column_names, visible_tables, stdlog);
1202  // Add the table qualifiers explicitly specified by the user.
1203  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1204  proj_tokens.uc_column_table_qualifiers.end());
1205  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1206  std::sort(
1207  hints.begin(),
1208  hints.end(),
1209  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1210  if (lhs.type == TCompletionHintType::TABLE &&
1211  rhs.type == TCompletionHintType::TABLE) {
1212  // Between two tables, one which is compatible with the specified projections
1213  // and one which isn't, pick the one which is compatible.
1214  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1215  compatible_table_names.end() &&
1216  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1217  compatible_table_names.end()) {
1218  return true;
1219  }
1220  }
1221  return lhs.type < rhs.type;
1222  });
1223 }
1224 
1225 void MapDHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1226  std::vector<std::string>& visible_tables,
1227  query_state::StdLog& stdlog,
1228  const std::string& sql,
1229  const int cursor) {
1230  const auto& session_info = *stdlog.getConstSessionInfo();
1231  try {
1232  get_tables_impl(visible_tables, session_info, GET_PHYSICAL_TABLES_AND_VIEWS);
1233  // Filter out keywords suggested by Calcite which we don't support.
1235  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1236  } catch (const std::exception& e) {
1237  TMapDException ex;
1238  ex.error_msg = "Exception: " + std::string(e.what());
1239  LOG(ERROR) << ex.error_msg;
1240  throw ex;
1241  }
1242  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1243  const size_t length_to_cursor =
1244  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1245  // Trust hints from Calcite after the FROM keyword.
1246  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1247  return;
1248  }
1249  // Before FROM, the query is too incomplete for context-sensitive completions.
1250  get_token_based_completions(hints, stdlog, visible_tables, sql, cursor);
1251 }
1252 
1253 void MapDHandler::get_token_based_completions(std::vector<TCompletionHint>& hints,
1254  query_state::StdLog& stdlog,
1255  std::vector<std::string>& visible_tables,
1256  const std::string& sql,
1257  const int cursor) {
1258  const auto last_word =
1259  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1260  boost::regex select_expr{R"(\s*select\s+)",
1261  boost::regex::extended | boost::regex::icase};
1262  const size_t length_to_cursor =
1263  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1264  // After SELECT but before FROM, look for all columns in all tables which match the
1265  // prefix.
1266  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1267  const auto column_names_by_table = fill_column_names_by_table(visible_tables, stdlog);
1268  // Trust the fully qualified columns the most.
1269  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1270  return;
1271  }
1272  // Not much information to use, just retrieve column names which match the prefix.
1273  if (should_suggest_column_hints(sql)) {
1274  get_column_hints(hints, last_word, column_names_by_table);
1275  return;
1276  }
1277  const std::string kFromKeyword{"FROM"};
1278  if (boost::istarts_with(kFromKeyword, last_word)) {
1279  TCompletionHint keyword_hint;
1280  keyword_hint.type = TCompletionHintType::KEYWORD;
1281  keyword_hint.replaced = last_word;
1282  keyword_hint.hints.emplace_back(kFromKeyword);
1283  hints.push_back(keyword_hint);
1284  }
1285  } else {
1286  const std::string kSelectKeyword{"SELECT"};
1287  if (boost::istarts_with(kSelectKeyword, last_word)) {
1288  TCompletionHint keyword_hint;
1289  keyword_hint.type = TCompletionHintType::KEYWORD;
1290  keyword_hint.replaced = last_word;
1291  keyword_hint.hints.emplace_back(kSelectKeyword);
1292  hints.push_back(keyword_hint);
1293  }
1294  }
1295 }
1296 
1297 std::unordered_map<std::string, std::unordered_set<std::string>>
1298 MapDHandler::fill_column_names_by_table(std::vector<std::string>& table_names,
1299  query_state::StdLog& stdlog) {
1300  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1301  for (auto it = table_names.begin(); it != table_names.end();) {
1302  TTableDetails table_details;
1303  try {
1304  get_table_details_impl(table_details, stdlog, *it, false, false);
1305  } catch (const TMapDException& e) {
1306  // Remove the corrupted Table/View name from the list for further processing.
1307  it = table_names.erase(it);
1308  continue;
1309  }
1310  for (const auto& column_type : table_details.row_desc) {
1311  column_names_by_table[*it].emplace(column_type.col_name);
1312  }
1313  ++it;
1314  }
1315  return column_names_by_table;
1316 }
1317 
1321 }
1322 
1324  const std::unordered_set<std::string>& uc_column_names,
1325  std::vector<std::string>& table_names,
1326  query_state::StdLog& stdlog) {
1327  std::unordered_set<std::string> compatible_table_names_by_column;
1328  for (auto it = table_names.begin(); it != table_names.end();) {
1329  TTableDetails table_details;
1330  try {
1331  get_table_details_impl(table_details, stdlog, *it, false, false);
1332  } catch (const TMapDException& e) {
1333  // Remove the corrupted Table/View name from the list for further processing.
1334  it = table_names.erase(it);
1335  continue;
1336  }
1337  for (const auto& column_type : table_details.row_desc) {
1338  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
1339  compatible_table_names_by_column.emplace(to_upper(*it));
1340  break;
1341  }
1342  }
1343  ++it;
1344  }
1345  return compatible_table_names_by_column;
1346 }
1347 
1348 void MapDHandler::validate_rel_alg(TTableDescriptor& _return,
1349  QueryStateProxy query_state_proxy) {
1350  try {
1351  const auto execute_read_lock = mapd_shared_lock<mapd_shared_mutex>(
1354 
1355  // TODO(adb): for a validate query we do not need write locks, though the lock would
1356  // generally be short lived.
1357  TPlanResult parse_result;
1359  std::tie(parse_result, locks) =
1360  parse_to_ra(query_state_proxy,
1361  query_state_proxy.getQueryState().getQueryStr(),
1362  {},
1363  true,
1365  const auto query_ra = parse_result.plan_result;
1366 
1367  TQueryResult result;
1369  query_state_proxy,
1370  query_ra,
1371  true,
1373  -1,
1374  -1,
1375  /*just_validate=*/true,
1376  /*find_filter_push_down_candidates=*/false,
1378  const auto& row_desc = fixup_row_descriptor(
1379  result.row_set.row_desc,
1380  query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog());
1381  for (const auto& col_desc : row_desc) {
1382  const auto it_ok = _return.insert(std::make_pair(col_desc.col_name, col_desc));
1383  CHECK(it_ok.second);
1384  }
1385  } catch (std::exception& e) {
1386  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1387  }
1388 }
1389 
1390 void MapDHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1391  auto stdlog = STDLOG(get_session_ptr(session));
1392  auto session_ptr = stdlog.getConstSessionInfo();
1393  if (!session_ptr->get_currentUser().isSuper) {
1394  // WARNING: This appears to not include roles a user is a member of,
1395  // if the role has no permissions granted to it.
1396  roles =
1397  SysCatalog::instance().getRoles(session_ptr->get_currentUser().userName,
1398  session_ptr->getCatalog().getCurrentDB().dbId);
1399  } else {
1400  roles = SysCatalog::instance().getRoles(
1401  false, true, session_ptr->get_currentUser().userName);
1402  }
1403 }
1404 
1405 bool MapDHandler::has_role(const TSessionId& sessionId,
1406  const std::string& granteeName,
1407  const std::string& roleName) {
1408  const auto stdlog = STDLOG(get_session_ptr(sessionId));
1409  const auto session_ptr = stdlog.getConstSessionInfo();
1410  const auto current_user = session_ptr->get_currentUser();
1411  if (!current_user.isSuper) {
1412  if (const auto* user = SysCatalog::instance().getUserGrantee(granteeName);
1413  user && current_user.userName != granteeName) {
1414  THROW_MAPD_EXCEPTION("Only super users can check other user's roles.");
1415  } else if (!SysCatalog::instance().isRoleGrantedToGrantee(
1416  current_user.userName, granteeName, true)) {
1418  "Only super users can check roles assignment that have not been directly "
1419  "granted to a user.");
1420  }
1421  }
1422  return SysCatalog::instance().isRoleGrantedToGrantee(granteeName, roleName, false);
1423 }
1424 
1425 static TDBObject serialize_db_object(const std::string& roleName,
1426  const DBObject& inObject) {
1427  TDBObject outObject;
1428  outObject.objectName = inObject.getName();
1429  outObject.grantee = roleName;
1430  const auto ap = inObject.getPrivileges();
1431  switch (inObject.getObjectKey().permissionType) {
1432  case DatabaseDBObjectType:
1433  outObject.privilegeObjectType = TDBObjectType::DatabaseDBObjectType;
1434  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1435  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1436  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1437  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1438 
1439  break;
1440  case TableDBObjectType:
1441  outObject.privilegeObjectType = TDBObjectType::TableDBObjectType;
1442  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1443  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1444  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1445  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1446  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1447  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1448  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1449  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1450 
1451  break;
1452  case DashboardDBObjectType:
1453  outObject.privilegeObjectType = TDBObjectType::DashboardDBObjectType;
1454  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1455  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1456  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1457  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1458 
1459  break;
1460  case ViewDBObjectType:
1461  outObject.privilegeObjectType = TDBObjectType::ViewDBObjectType;
1462  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1463  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1464  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1465  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1466  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1467  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1468 
1469  break;
1470  default:
1471  CHECK(false);
1472  }
1473  const int type_val = static_cast<int>(inObject.getType());
1474  CHECK(type_val >= 0 && type_val < 5);
1475  outObject.objectType = static_cast<TDBObjectType::type>(type_val);
1476  return outObject;
1477 }
1478 
1480  const TDBObjectPermissions& permissions) {
1481  if (!permissions.__isset.database_permissions_) {
1482  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1483  }
1484  auto perms = permissions.database_permissions_;
1485  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1486  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1487  (perms.view_sql_editor_ &&
1489  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1490  return false;
1491  } else {
1492  return true;
1493  }
1494 }
1495 
1497  const TDBObjectPermissions& permissions) {
1498  if (!permissions.__isset.table_permissions_) {
1499  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1500  }
1501  auto perms = permissions.table_permissions_;
1502  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1503  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1504  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1505  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1506  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1507  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1508  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1509  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1510  return false;
1511  } else {
1512  return true;
1513  }
1514 }
1515 
1517  const TDBObjectPermissions& permissions) {
1518  if (!permissions.__isset.dashboard_permissions_) {
1519  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1520  }
1521  auto perms = permissions.dashboard_permissions_;
1522  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1523  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1524  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1525  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1526  return false;
1527  } else {
1528  return true;
1529  }
1530 }
1531 
1533  const TDBObjectPermissions& permissions) {
1534  if (!permissions.__isset.view_permissions_) {
1535  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1536  }
1537  auto perms = permissions.view_permissions_;
1538  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1539  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1540  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1541  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1542  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1543  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1544  return false;
1545  } else {
1546  return true;
1547  }
1548 }
1549 
1550 bool MapDHandler::has_object_privilege(const TSessionId& sessionId,
1551  const std::string& granteeName,
1552  const std::string& objectName,
1553  const TDBObjectType::type objectType,
1554  const TDBObjectPermissions& permissions) {
1555  auto stdlog = STDLOG(get_session_ptr(sessionId));
1556  auto session_ptr = stdlog.getConstSessionInfo();
1557  auto const& cat = session_ptr->getCatalog();
1558  auto const& current_user = session_ptr->get_currentUser();
1559  if (!current_user.isSuper && !SysCatalog::instance().isRoleGrantedToGrantee(
1560  current_user.userName, granteeName, false)) {
1562  "Users except superusers can only check privileges for self or roles granted to "
1563  "them.")
1564  }
1566  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1567  user_meta.isSuper) {
1568  return true;
1569  }
1570  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1571  if (!grnt) {
1572  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1573  }
1575  std::string func_name;
1576  switch (objectType) {
1579  func_name = "database";
1580  break;
1583  func_name = "table";
1584  break;
1587  func_name = "dashboard";
1588  break;
1591  func_name = "view";
1592  break;
1593  default:
1594  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1595  }
1596  DBObject req_object(objectName, type);
1597  req_object.loadKey(cat);
1598 
1599  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1600  if (grantee_object) {
1601  // if grantee has privs on the object
1602  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1603  } else {
1604  // no privileges on that object
1605  return false;
1606  }
1607 }
1608 
1609 void MapDHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1610  const TSessionId& sessionId,
1611  const std::string& roleName) {
1612  auto stdlog = STDLOG(get_session_ptr(sessionId));
1613  auto session_ptr = stdlog.getConstSessionInfo();
1614  auto const& user = session_ptr->get_currentUser();
1615  if (!user.isSuper &&
1616  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1617  return;
1618  }
1619  auto* rl = SysCatalog::instance().getGrantee(roleName);
1620  if (rl) {
1621  auto dbId = session_ptr->getCatalog().getCurrentDB().dbId;
1622  for (auto& dbObject : *rl->getDbObjects(true)) {
1623  if (dbObject.first.dbId != dbId) {
1624  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1625  // usecase for now, though)
1626  continue;
1627  }
1628  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1629  TDBObjectsForRole.push_back(tdbObject);
1630  }
1631  } else {
1632  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
1633  }
1634 }
1635 
1636 void MapDHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
1637  const TSessionId& sessionId,
1638  const std::string& objectName,
1639  const TDBObjectType::type type) {
1640  auto stdlog = STDLOG(get_session_ptr(sessionId));
1641  auto session_ptr = stdlog.getConstSessionInfo();
1642  DBObjectType object_type;
1643  switch (type) {
1645  object_type = DBObjectType::DatabaseDBObjectType;
1646  break;
1648  object_type = DBObjectType::TableDBObjectType;
1649  break;
1652  break;
1654  object_type = DBObjectType::ViewDBObjectType;
1655  break;
1656  default:
1657  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
1658  ": unknown object type (" + std::to_string(type) + ").");
1659  }
1660  DBObject object_to_find(objectName, object_type);
1661 
1662  // TODO(adb): Use DatabaseLock to protect method
1663  try {
1664  if (object_type == DashboardDBObjectType) {
1665  if (objectName == "") {
1666  object_to_find = DBObject(-1, object_type);
1667  } else {
1668  object_to_find = DBObject(std::stoi(objectName), object_type);
1669  }
1670  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
1671  !objectName.empty()) {
1672  // special handling for view / table
1673  auto const& cat = session_ptr->getCatalog();
1674  auto td = cat.getMetadataForTable(objectName, false);
1675  if (td) {
1676  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1677  object_to_find = DBObject(objectName, object_type);
1678  }
1679  }
1680  object_to_find.loadKey(session_ptr->getCatalog());
1681  } catch (const std::exception&) {
1682  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
1683  }
1684 
1685  // object type on database level
1686  DBObject object_to_find_dblevel("", object_type);
1687  object_to_find_dblevel.loadKey(session_ptr->getCatalog());
1688  // if user is superuser respond with a full priv
1689  if (session_ptr->get_currentUser().isSuper) {
1690  // using ALL_TABLE here to set max permissions
1691  DBObject dbObj{object_to_find.getObjectKey(),
1693  session_ptr->get_currentUser().userId};
1694  dbObj.setName("super");
1695  TDBObjects.push_back(
1696  serialize_db_object(session_ptr->get_currentUser().userName, dbObj));
1697  };
1698 
1699  std::vector<std::string> grantees =
1700  SysCatalog::instance().getRoles(true,
1701  session_ptr->get_currentUser().isSuper,
1702  session_ptr->get_currentUser().userName);
1703  for (const auto& grantee : grantees) {
1704  DBObject* object_found;
1705  auto* gr = SysCatalog::instance().getGrantee(grantee);
1706  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
1707  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1708  }
1709  // check object permissions on Database level
1710  if (gr &&
1711  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
1712  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1713  }
1714  }
1715 }
1716 
1717 void MapDHandler::get_all_roles_for_user(std::vector<std::string>& roles,
1718  const TSessionId& sessionId,
1719  const std::string& granteeName) {
// Fills `roles` with the roles granted to the grantee named `granteeName`.
// Authorization rules visible below:
//  - a superuser may query any existing grantee;
//  - a non-superuser may query a user grantee only when it is themselves;
//  - a non-superuser may query a role only when
//    isRoleGrantedToGrantee(<current user>, granteeName, false) is true
//    (meaning of the `false` flag is not visible here — TODO confirm).
// Throws via THROW_MAPD_EXCEPTION when the grantee does not exist or when a
// role check fails.
1720  auto stdlog = STDLOG(get_session_ptr(sessionId));
1721  auto session_ptr = stdlog.getConstSessionInfo();
1722  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
1723  if (grantee) {
1724  if (session_ptr->get_currentUser().isSuper) {
1725  roles = grantee->getRoles();
1726  } else if (grantee->isUser()) {
1727  if (session_ptr->get_currentUser().userName == granteeName) {
1728  roles = grantee->getRoles();
1729  } else {
// NOTE(review): source line 1730 is missing from this listing; presumably it is
// the THROW_MAPD_EXCEPTION( call that consumes the message below — confirm.
1731  "Only a superuser is authorized to request list of roles granted to another "
1732  "user.");
1733  }
1734  } else {
1735  CHECK(!grantee->isUser());
1736  // granteeName is actually a roleName here and we can check a role
1737  // only if it is granted to us
1738  if (SysCatalog::instance().isRoleGrantedToGrantee(
1739  session_ptr->get_currentUser().userName, granteeName, false)) {
1740  roles = grantee->getRoles();
1741  } else {
1742  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
1743  }
1744  }
1745  } else {
1746  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
1747  }
1748 }
1749 
1750 namespace {
// Serializes a table -> column-names map into a compact string for the stdlog
// entry: each table is written as ":<table>" followed by ",<column>" for each of
// its columns, e.g. ":t1,a,b:t2,c" (std::map iterates tables in key order).
// The signature line (1751) is not visible in this listing; presumably this is
// dump_table_col_names, which the pixel-lookup handler below calls.
1752  const std::map<std::string, std::vector<std::string>>& table_col_names) {
1753  std::ostringstream oss;
1754  for (const auto& [table_name, col_names] : table_col_names) {
1755  oss << ":" << table_name;
1756  for (const auto& col_name : col_names) {
1757  oss << "," << col_name;
1758  }
1759  }
1760  return oss.str();
1761 }
1762 } // namespace
1763 
1765  TPixelTableRowResult& _return,
1766  const TSessionId& session,
1767  const int64_t widget_id,
1768  const TPixel& pixel,
1769  const std::map<std::string, std::vector<std::string>>& table_col_names,
1770  const bool column_format,
1771  const int32_t pixel_radius,
1772  const std::string& nonce) {
// Pixel lookup for backend rendering. The handler's name is on line 1764, which
// is missing from this listing; presumably it is get_result_row_for_pixel,
// matching the render-handler call below — confirm.
// Logs the request parameters and client connection info, then delegates to
// render_handler_. Throws "Backend rendering is disabled." when render_handler_
// is null. Any std::exception from the render handler is rethrown via
// THROW_MAPD_EXCEPTION with an "Exception: " prefix.
1773  auto stdlog = STDLOG(get_session_ptr(session),
1774  "widget_id",
1775  widget_id,
1776  "pixel.x",
1777  pixel.x,
1778  "pixel.y",
1779  pixel.y,
1780  "column_format",
1781  column_format,
1782  "pixel_radius",
1783  pixel_radius,
1784  "table_col_names",
1785  dump_table_col_names(table_col_names),
1786  "nonce",
1787  nonce);
1788  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1789  auto session_ptr = stdlog.getSessionInfo();
1790  if (!render_handler_) {
1791  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
1792  }
1793 
1794  try {
1795  render_handler_->get_result_row_for_pixel(_return,
1796  session_ptr,
1797  widget_id,
1798  pixel,
1799  table_col_names,
1800  column_format,
1801  pixel_radius,
1802  nonce);
1803  } catch (std::exception& e) {
1804  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1805  }
1806 }
1807 
1808 namespace {
1809 
1810 inline void fixup_geo_column_descriptor(TColumnType& col_type,
1811  const SQLTypes subtype,
1812  const int output_srid) {
1813  col_type.col_type.precision = static_cast<int>(subtype);
1814  col_type.col_type.scale = output_srid;
1815 }
1816 
1817 } // namespace
1818 
1820  const ColumnDescriptor* cd) {
// Builds the Thrift column descriptor (TColumnType) for catalog column `cd`.
// The signature line (1819) is not visible in this listing; the body uses a
// catalog pointer `cat`, which appears to be the first parameter — confirm.
// - Array and date columns also report their storage size.
// - Geo columns carry subtype and output SRID in precision/scale (presumably
//   through fixup_geo_column_descriptor, whose call head is on missing line
//   1833). Other columns carry their SQL precision/scale.
// - In the dictionary branch (condition head on missing line 1840),
//   comp_param becomes the dictionary's dictNBits, or 0 if the dictionary
//   metadata cannot be found.
// - Otherwise comp_param is the column's own comp_param, except that
//   date-in-days columns with comp_param 0 report 32.
// - is_reserved_keyword is computed with ImportHelpers::is_reserved_name.
1821  TColumnType col_type;
1822  col_type.col_name = cd->columnName;
1823  col_type.src_name = cd->sourceName;
1824  col_type.col_id = cd->columnId;
1825  col_type.col_type.type = type_to_thrift(cd->columnType);
1826  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
1827  col_type.col_type.nullable = !cd->columnType.get_notnull();
1828  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
1829  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
1830  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
1831  }
1832  if (IS_GEO(cd->columnType.get_type())) {
1834  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
1835  } else {
1836  col_type.col_type.precision = cd->columnType.get_precision();
1837  col_type.col_type.scale = cd->columnType.get_scale();
1838  }
1839  col_type.is_system = cd->isSystemCol;
1841  cat != nullptr) {
1842  // have to get the actual size of the encoding from the dictionary definition
1843  const int dict_id = cd->columnType.get_comp_param();
// NOTE(review): getMetadataForDict(dict_id, false) is called twice. Because the
// first call already returns early when it yields null, the `!dd` throw below
// looks unreachable; a single lookup would express the intent more clearly.
1844  if (!cat->getMetadataForDict(dict_id, false)) {
1845  col_type.col_type.comp_param = 0;
1846  return col_type;
1847  }
1848  auto dd = cat->getMetadataForDict(dict_id, false);
1849  if (!dd) {
1850  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
1851  }
1852  col_type.col_type.comp_param = dd->dictNBits;
1853  } else {
1854  col_type.col_type.comp_param =
1855  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
1856  ? 32
1857  : cd->columnType.get_comp_param();
1858  }
1859  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
1860  return col_type;
1861 }
1862 
1863 void MapDHandler::get_internal_table_details(TTableDetails& _return,
1864  const TSessionId& session,
1865  const std::string& table_name) {
1866  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1867  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1868  get_table_details_impl(_return, stdlog, table_name, true, false);
1869 }
1870 
1871 void MapDHandler::get_table_details(TTableDetails& _return,
1872  const TSessionId& session,
1873  const std::string& table_name) {
1874  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
1875  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1876  get_table_details_impl(_return, stdlog, table_name, false, false);
1877 }
1878 
1879 void MapDHandler::get_table_details_impl(TTableDetails& _return,
1880  query_state::StdLog& stdlog,
1881  const std::string& table_name,
1882  const bool get_system,
1883  const bool get_physical) {
// Shared implementation of get_table_details / get_internal_table_details.
// Resolves `table_name` under a table schema lock (the lock type is named on
// line 1888, which is not visible here) and fills `_return`.
// - Views: if the user can access the view, its SQL is parsed (parse_to_ra) and
//   validated with execute_rel_alg(just_validate=true), and row_desc comes from
//   the validation result. If checkAccessedObjectsPrivileges fails, view_sql is
//   replaced by a placeholder instead of failing the call.
// - Tables: row_desc lists the columns from getAllColumnMetadataForTable
//   (honoring get_system / get_physical), except the column returned by
//   getDeletedColumn.
// Table-level properties (fragment/page size, max rows, shards, partitioning)
// are copied from the TableDescriptor. A std::runtime_error is converted via
// THROW_MAPD_EXCEPTION with an "Exception: " prefix.
1884  try {
1885  auto session_info = stdlog.getSessionInfo();
1886  auto& cat = session_info->getCatalog();
1887  const auto td_with_lock =
1889  cat, table_name, false);
1890  const auto td = td_with_lock();
1891  CHECK(td);
1892 
1893  bool have_privileges_on_view_sources = true;
1894  if (td->isView) {
1895  auto query_state = create_query_state(session_info, td->viewSQL);
1896  stdlog.setQueryState(query_state);
1897  try {
1898  if (hasTableAccessPrivileges(td, *session_info)) {
1899  // TODO(adb): we should take schema read locks on all tables making up the view,
1900  // at minimum
1901  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
1902  query_state->getQueryStr(),
1903  {},
1904  false,
1906  nullptr,
1907  false);
1908  try {
1909  calcite_->checkAccessedObjectsPrivileges(query_state->createQueryStateProxy(),
1910  query_ra.first);
1911  } catch (const std::runtime_error&) {
1912  have_privileges_on_view_sources = false;
1913  }
1914 
1915  TQueryResult result;
1916  execute_rel_alg(result,
1917  query_state->createQueryStateProxy(),
1918  query_ra.first.plan_result,
1919  true,
1921  -1,
1922  -1,
1923  /*just_validate=*/true,
1924  /*find_filter_push_down_candidates=*/false,
1926  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
1927  } else {
1928  throw std::runtime_error(
1929  "Unable to access view " + table_name +
1930  ". The view may not exist, or the logged in user may not "
1931  "have permission to access the view.");
1932  }
// NOTE(review): the handler below also catches the "Unable to access view ..."
// error thrown just above. A permission failure is therefore reported as
// "View ... query has failed ... must be dropped and re-created", which is
// misleading. Consider letting the access error propagate unchanged.
1933  } catch (const std::exception& e) {
1934  throw std::runtime_error("View '" + table_name +
1935  "' query has failed with an error: '" +
1936  std::string(e.what()) +
1937  "'.\nThe view must be dropped and re-created to "
1938  "resolve the error. \nQuery:\n" +
1939  query_state->getQueryStr());
1940  }
1941  } else {
1942  if (hasTableAccessPrivileges(td, *session_info)) {
1943  const auto col_descriptors =
1944  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
1945  const auto deleted_cd = cat.getDeletedColumn(td);
1946  for (const auto cd : col_descriptors) {
1947  if (cd == deleted_cd) {
1948  continue;
1949  }
1950  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
1951  }
1952  } else {
1953  throw std::runtime_error(
1954  "Unable to access table " + table_name +
1955  ". The table may not exist, or the logged in user may not "
1956  "have permission to access the table.");
1957  }
1958  }
1959  _return.fragment_size = td->maxFragRows;
1960  _return.page_size = td->fragPageSize;
1961  _return.max_rows = td->maxRows;
1962  _return.view_sql =
1963  (have_privileges_on_view_sources ? td->viewSQL
1964  : "[Not enough privileges to see the view SQL]");
1965  _return.shard_count = td->nShards;
1966  _return.key_metainfo = td->keyMetainfo;
1967  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
1968  _return.partition_detail =
1969  td->partitions.empty()
1970  ? TPartitionDetail::DEFAULT
1971  : (table_is_replicated(td)
1972  ? TPartitionDetail::REPLICATED
1973  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
1974  : TPartitionDetail::OTHER));
1975  } catch (const std::runtime_error& e) {
1976  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
1977  }
1978 }
1979 
1980 void MapDHandler::get_link_view(TFrontendView& _return,
1981  const TSessionId& session,
1982  const std::string& link) {
1983  auto stdlog = STDLOG(get_session_ptr(session));
1984  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
1985  auto session_ptr = stdlog.getConstSessionInfo();
1986  auto const& cat = session_ptr->getCatalog();
1987  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
1988  if (!ld) {
1989  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
1990  }
1991  _return.view_state = ld->viewState;
1992  _return.view_name = ld->link;
1993  _return.update_time = ld->updateTime;
1994  _return.view_metadata = ld->viewMetadata;
1995 }
1996 
1998  const TableDescriptor* td,
1999  const Catalog_Namespace::SessionInfo& session_info) {
// Returns whether the session's current user may access table `td`.
// Superusers always may. Otherwise a DBObject for the table (constructed on
// line 2007, which is not visible in this listing) has its key loaded from the
// session catalog, and SysCatalog::hasAnyPrivileges decides. The signature line
// (1997) is also missing here; callers invoke this as hasTableAccessPrivileges.
2000  auto& cat = session_info.getCatalog();
2001  auto user_metadata = session_info.get_currentUser();
2002 
2003  if (user_metadata.isSuper) {
2004  return true;
2005  }
2006 
2008  dbObject.loadKey(cat);
2009  std::vector<DBObject> privObjects = {dbObject};
2010 
2011  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
2012 }
2013 
2014 void MapDHandler::get_tables_impl(std::vector<std::string>& table_names,
2015  const Catalog_Namespace::SessionInfo& session_info,
2016  const GetTablesType get_tables_type) {
2017  table_names = session_info.getCatalog().getTableNamesForUser(
2018  session_info.get_currentUser(), get_tables_type);
2019 }
2020 
2021 void MapDHandler::get_tables(std::vector<std::string>& table_names,
2022  const TSessionId& session) {
// Returns the names of physical tables and views (GET_PHYSICAL_TABLES_AND_VIEWS)
// for the session's current user. The head of the delegating call is on line
// 2025, which is not visible in this listing; presumably it is get_tables_impl,
// as in get_physical_tables / get_views — confirm.
2023  auto stdlog = STDLOG(get_session_ptr(session));
2024  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2026  table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES_AND_VIEWS);
2027 }
2028 
2029 void MapDHandler::get_physical_tables(std::vector<std::string>& table_names,
2030  const TSessionId& session) {
2031  auto stdlog = STDLOG(get_session_ptr(session));
2032  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2033  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_PHYSICAL_TABLES);
2034 }
2035 
2036 void MapDHandler::get_views(std::vector<std::string>& table_names,
2037  const TSessionId& session) {
2038  auto stdlog = STDLOG(get_session_ptr(session));
2039  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2040  get_tables_impl(table_names, *stdlog.getConstSessionInfo(), GET_VIEWS);
2041 }
2042 
2043 void MapDHandler::get_tables_meta(std::vector<TTableMeta>& _return,
2044  const TSessionId& session) {
// Returns one TTableMeta per table or view in the session's database. Shard
// tables (td->shard >= 0) and objects the user has no privileges on are skipped.
// Views are re-validated (parse_to_ra + execute_rel_alg with just_validate) to
// obtain their column types, and physical columns are dropped from the count.
// A view that fails validation is logged and still returned, with no columns.
// Tables report their columns except the one returned by getDeletedColumn.
2045  auto stdlog = STDLOG(get_session_ptr(session));
2046  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2047  auto session_ptr = stdlog.getConstSessionInfo();
2048  auto query_state = create_query_state(session_ptr, "");
2049  stdlog.setQueryState(query_state);
2050 
2051  const auto& cat = session_ptr->getCatalog();
2052  const auto tables = cat.getAllTableMetadata();
2053  _return.reserve(tables.size());
2054 
2055  for (const auto td : tables) {
2056  if (td->shard >= 0) {
2057  // skip shards, they're not standalone tables
2058  continue;
2059  }
2060  if (!hasTableAccessPrivileges(td, *session_ptr)) {
2061  // skip table, as there are no privileges to access it
2062  continue;
2063  }
2064 
2065  TTableMeta ret;
2066  ret.table_name = td->tableName;
2067  ret.is_view = td->isView;
2068  ret.is_replicated = table_is_replicated(td);
2069  ret.shard_count = td->nShards;
2070  ret.max_rows = td->maxRows;
2071  ret.table_id = td->tableId;
2072 
2073  std::vector<TTypeInfo> col_types;
2074  std::vector<std::string> col_names;
2075  size_t num_cols = 0;
2076  if (td->isView) {
2077  try {
2078  const auto query_ra = parse_to_ra(query_state->createQueryStateProxy(),
2079  td->viewSQL,
2080  {},
2081  false,
2083  .first.plan_result;
2084  TQueryResult result;
2086  query_state->createQueryStateProxy(),
2087  query_ra,
2088  true,
2090  -1,
2091  -1,
2092  /*just_validate=*/true,
2093  /*find_push_down_candidates=*/false,
2095  num_cols = result.row_set.row_desc.size();
2096  for (const auto col : result.row_set.row_desc) {
2097  if (col.is_physical) {
2098  num_cols--;
2099  continue;
2100  }
2101  col_types.push_back(col.col_type);
2102  col_names.push_back(col.col_name);
2103  }
2104  } catch (std::exception& e) {
2105  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
2106  }
2107  } else {
2108  try {
// NOTE(review): tables without privileges were already skipped at the top of
// the loop, so this re-check (and its `continue`) appears redundant.
2109  if (hasTableAccessPrivileges(td, *session_ptr)) {
2110  const auto col_descriptors =
2111  cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
2112  const auto deleted_cd = cat.getDeletedColumn(td);
2113  for (const auto cd : col_descriptors) {
2114  if (cd == deleted_cd) {
2115  continue;
2116  }
2117  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
2118  col_names.push_back(cd->columnName);
2119  }
// NOTE(review): this counts every descriptor, including the deleted column the
// loop above skipped, while col_types/col_names exclude it. If
// getAllColumnMetadataForTable can return the deleted column here, num_cols is
// one too high — confirm.
2120  num_cols = col_descriptors.size();
2121  } else {
2122  continue;
2123  }
2124  } catch (const std::runtime_error& e) {
2125  THROW_MAPD_EXCEPTION(e.what());
2126  }
2127  }
2128 
2129  ret.num_cols = num_cols;
2130  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
2131  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
2132 
2133  _return.push_back(ret);
2134  }
2135 }
2136 
2137 void MapDHandler::get_users(std::vector<std::string>& user_names,
2138  const TSessionId& session) {
2139  auto stdlog = STDLOG(get_session_ptr(session));
2140  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2141  auto session_ptr = stdlog.getConstSessionInfo();
2142  std::list<Catalog_Namespace::UserMetadata> user_list;
2143 
2144  if (!session_ptr->get_currentUser().isSuper) {
2145  user_list = SysCatalog::instance().getAllUserMetadata(
2146  session_ptr->getCatalog().getCurrentDB().dbId);
2147  } else {
2148  user_list = SysCatalog::instance().getAllUserMetadata();
2149  }
2150  for (auto u : user_list) {
2151  user_names.push_back(u.userName);
2152  }
2153 }
2154 
2155 void MapDHandler::get_version(std::string& version) {
2156  version = MAPD_RELEASE;
2157 }
2158 
2159 void MapDHandler::clear_gpu_memory(const TSessionId& session) {
// Superuser-only request to clear GPU memory. The primary clearing call is on
// line 2167, which is not visible in this listing; any std::exception it throws
// is rethrown via THROW_MAPD_EXCEPTION. The render handler's GPU memory is
// cleared too when rendering is enabled. On an aggregator with leaves, the
// request is presumably forwarded to them (line 2176, not visible) — confirm.
2160  auto stdlog = STDLOG(get_session_ptr(session));
2161  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2162  auto session_ptr = stdlog.getConstSessionInfo();
2163  if (!session_ptr->get_currentUser().isSuper) {
2164  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
2165  }
2166  try {
2168  } catch (const std::exception& e) {
2169  THROW_MAPD_EXCEPTION(e.what());
2170  }
2171  if (render_handler_) {
2172  render_handler_->clear_gpu_memory();
2173  }
2174 
2175  if (leaf_aggregator_.leafCount() > 0) {
2177  }
2178 }
2179 
2180 void MapDHandler::clear_cpu_memory(const TSessionId& session) {
// Superuser-only request to clear CPU memory. The primary clearing call is on
// line 2188, which is not visible in this listing; any std::exception it throws
// is rethrown via THROW_MAPD_EXCEPTION. The render handler's CPU memory is
// cleared too when rendering is enabled. On an aggregator with leaves, the
// request is presumably forwarded to them (line 2197, not visible) — confirm.
2181  auto stdlog = STDLOG(get_session_ptr(session));
2182  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2183  auto session_ptr = stdlog.getConstSessionInfo();
2184  if (!session_ptr->get_currentUser().isSuper) {
2185  THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
2186  }
2187  try {
2189  } catch (const std::exception& e) {
2190  THROW_MAPD_EXCEPTION(e.what());
2191  }
2192  if (render_handler_) {
2193  render_handler_->clear_cpu_memory();
2194  }
2195 
2196  if (leaf_aggregator_.leafCount() > 0) {
2198  }
2199 }
2200 
// Returns the INVALID_SESSION_ID sentinel. The function's signature (line 2201)
// is not visible in this listing.
2202  return INVALID_SESSION_ID;
2203 }
2204 
2205 void MapDHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
2206  const TSessionId& session,
2207  const std::string& memory_level) {
// Reports buffer-pool memory usage. memory_level "gpu" selects the GPU level;
// any other value selects the CPU level. Each MemoryInfo becomes a
// TNodeMemoryInfo listing its memory segments (slab, start page, page count,
// touch, chunk key, free flag). host_name is set only when leaves are attached,
// and on an aggregator the leaf summaries are inserted ahead of the local
// entries.
// NOTE(review): the assignments to mem_level (lines 2213 and 2217) are not
// visible in this listing. mem_level is declared without an initializer and is
// passed to getLeafMemoryInfo, so confirm both branches set it.
2208  auto stdlog = STDLOG(get_session_ptr(session));
2209  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2210  std::vector<Data_Namespace::MemoryInfo> internal_memory;
2211  Data_Namespace::MemoryLevel mem_level;
2212  if (!memory_level.compare("gpu")) {
2214  internal_memory =
2215  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
2216  } else {
2218  internal_memory =
2219  SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
2220  }
2221 
2222  for (auto memInfo : internal_memory) {
2223  TNodeMemoryInfo nodeInfo;
2224  if (leaf_aggregator_.leafCount() > 0) {
2225  nodeInfo.host_name = get_hostname();
2226  }
2227  nodeInfo.page_size = memInfo.pageSize;
2228  nodeInfo.max_num_pages = memInfo.maxNumPages;
2229  nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
2230  nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
2231  for (auto gpu : memInfo.nodeMemoryData) {
2232  TMemoryData md;
2233  md.slab = gpu.slabNum;
2234  md.start_page = gpu.startPage;
2235  md.num_pages = gpu.numPages;
2236  md.touch = gpu.touch;
2237  md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
2238  md.is_free = gpu.memStatus == Buffer_Namespace::MemStatus::FREE;
2239  nodeInfo.node_memory_data.push_back(md);
2240  }
2241  _return.push_back(nodeInfo);
2242  }
2243  if (leaf_aggregator_.leafCount() > 0) {
2244  std::vector<TNodeMemoryInfo> leafSummary =
2245  leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
2246  _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
2247  }
2248 }
2249 
2250 void MapDHandler::get_databases(std::vector<TDBInfo>& dbinfos,
2251  const TSessionId& session) {
// Appends the name and owner of every database listed by
// SysCatalog::getDatabaseListForUser for the session's current user. The
// declaration of `dbs` is on line 2256, which is not visible in this listing.
// Strings are moved out of the local list as entries are converted.
2252  auto stdlog = STDLOG(get_session_ptr(session));
2253  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2254  auto session_ptr = stdlog.getConstSessionInfo();
2255  const auto& user = session_ptr->get_currentUser();
2257  SysCatalog::instance().getDatabaseListForUser(user);
2258  for (auto& db : dbs) {
2259  TDBInfo dbinfo;
2260  dbinfo.db_name = std::move(db.dbName);
2261  dbinfo.db_owner = std::move(db.dbOwnerName);
2262  dbinfos.push_back(std::move(dbinfo));
2263  }
2264 }
2265 
2266 void MapDHandler::set_execution_mode(const TSessionId& session,
2267  const TExecuteMode::type mode) {
2268  auto stdlog = STDLOG(get_session_ptr(session));
2269  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2270  mapd_unique_lock<mapd_shared_mutex> write_lock(sessions_mutex_);
2271  auto session_it = get_session_it_unsafe(session, write_lock);
2272  if (leaf_aggregator_.leafCount() > 0) {
2273  leaf_aggregator_.set_execution_mode(session, mode);
2274  try {
2275  MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2276  } catch (const TMapDException& e) {
2277  LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
2278  }
2279  return;
2280  }
2281  MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
2282 }
2283 
2284 namespace {
2285 
// Throws std::runtime_error when `td` is non-null and sharded (nShards != 0).
// The signature line (2286) is not visible in this listing. Presumably this
// helper is called from the load_* handlers at their
// `g_cluster && !leaf_aggregator_.leafCount()` checks, whose call lines are also
// missing here — confirm.
2287  if (td && td->nShards) {
2288  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2289  }
2290 }
2291 
2292 } // namespace
2293 
2294 void MapDHandler::load_table_binary(const TSessionId& session,
2295  const std::string& table_name,
2296  const std::vector<TRow>& rows) {
// Loads row-wise binary rows into `table_name`.
// - Requires a writable server (check_read_only) and load privileges.
// - Uses a DistributedLoader on an aggregator and a local Importer_NS::Loader
//   otherwise.
// - The first row must have nColumns - 1 (rowid) - 1 (deleted column, if any)
//   values.
// - A row whose value fails to import is logged and its already-added values
//   are popped.
// - The buffers are loaded while holding the table's insert-data write lock.
// - std::exceptions are rethrown via THROW_MAPD_EXCEPTION with an
//   "Exception: " prefix.
2297  try {
2298  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2299  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2300  auto session_ptr = stdlog.getConstSessionInfo();
2301  check_read_only("load_table_binary");
2302  auto& cat = session_ptr->getCatalog();
2303  const auto td_with_lock =
2305  cat, table_name, true);
2306  const auto td = td_with_lock();
2307  CHECK(td);
2308  if (g_cluster && !leaf_aggregator_.leafCount()) {
2309  // Sharded table rows need to be routed to the leaf by an aggregator.
2311  }
2312  check_table_load_privileges(*session_ptr, table_name);
2313  if (rows.empty()) {
2314  return;
2315  }
2316  std::unique_ptr<Importer_NS::Loader> loader;
2317  if (leaf_aggregator_.leafCount() > 0) {
2318  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2319  } else {
2320  loader.reset(new Importer_NS::Loader(cat, td));
2321  }
2322  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2323  // Subtracting 1 (rowid) until TableDescriptor is updated.
// NOTE(review): unlike load_table and prepare_columnar_loader, this count does
// not subtract geo physical columns, and only the first row is checked.
2324  if (rows.front().cols.size() !=
2325  static_cast<size_t>(td->nColumns) - (td->hasDeletedCol ? 2 : 1)) {
2326  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
2327  }
2328  auto col_descs = loader->get_column_descs();
2329  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2330  for (auto cd : col_descs) {
2331  import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2332  new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
2333  }
2334  for (auto const& row : rows) {
2335  size_t col_idx = 0;
2336  try {
2337  for (auto cd : col_descs) {
2338  import_buffers[col_idx]->add_value(
2339  cd, row.cols[col_idx], row.cols[col_idx].is_null);
2340  col_idx++;
2341  }
2342  } catch (const std::exception& e) {
2343  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2344  import_buffers[col_idx_to_pop]->pop_value();
2345  }
2346  LOG(ERROR) << "Input exception thrown: " << e.what()
2347  << ". Row discarded, issue at column : " << (col_idx + 1)
2348  << " data :" << row;
2349  }
2350  }
2351  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2352  session_ptr->getCatalog(), table_name);
// NOTE(review): rows.size() still counts rows discarded above, so the row count
// passed to load() can exceed the number of values held by import_buffers.
// Confirm how Loader::load uses this argument.
2353  loader->load(import_buffers, rows.size());
2354  } catch (const std::exception& e) {
2355  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2356  }
2357 }
2358 
2359 std::unique_ptr<lockmgr::AbstractLockContainer<const TableDescriptor*>>
2361  const Catalog_Namespace::SessionInfo& session_info,
2362  const std::string& table_name,
2363  size_t num_cols,
2364  std::unique_ptr<Importer_NS::Loader>* loader,
2365  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>>* import_buffers) {
// Shared setup for the columnar load paths (called as prepare_columnar_loader;
// the name line, 2360, is not visible here). It:
// - takes a table schema read lock and returns it, so the caller holds it for
//   the whole load;
// - checks load privileges;
// - creates the loader (DistributedLoader on an aggregator,
//   Importer_NS::Loader otherwise);
// - verifies that `num_cols` equals nColumns minus geo physical columns, rowid
//   and the deleted column (if any);
// - populates `import_buffers` via Importer_NS::setup_column_loaders.
// Throws std::runtime_error on a column-count mismatch.
2366  auto& cat = session_info.getCatalog();
2367  auto td_with_lock =
2368  std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::ReadLock>>(
2370  cat, table_name, true));
2371  const auto td = (*td_with_lock)();
2372  CHECK(td);
2373  if (g_cluster && !leaf_aggregator_.leafCount()) {
2374  // Sharded table rows need to be routed to the leaf by an aggregator.
2376  }
2377  check_table_load_privileges(session_info, table_name);
2378 
2379  if (leaf_aggregator_.leafCount() > 0) {
2380  loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
2381  } else {
2382  loader->reset(new Importer_NS::Loader(cat, td));
2383  }
2384 
2385  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2386  // Subtracting 1 (rowid) until TableDescriptor is updated.
2387  auto col_descs = (*loader)->get_column_descs();
2388  const auto geo_physical_cols = std::count_if(
2389  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2390  const auto num_table_cols =
2391  static_cast<size_t>(td->nColumns) - geo_physical_cols - (td->hasDeletedCol ? 2 : 1);
2392  if (num_cols != num_table_cols) {
2393  throw std::runtime_error("Number of columns to load (" + std::to_string(num_cols) +
2394  ") does not match number of columns in table " +
2395  td->tableName + " (" + std::to_string(num_table_cols) + ")");
2396  }
2397 
2398  *import_buffers = Importer_NS::setup_column_loaders(td, loader->get());
2399  return std::move(td_with_lock);
2400 }
2401 
2402 void MapDHandler::load_table_binary_columnar(const TSessionId& session,
2403  const std::string& table_name,
2404  const std::vector<TColumn>& cols) {
// Loads columnar binary data (one TColumn per user-visible column) into
// `table_name`, holding the schema read lock from prepare_columnar_loader and
// the table's insert-data write lock. Geo columns arrive as WKT strings. After
// such a column is added, its geometries are parsed and the table's geo
// physical columns are filled, which is why the descriptor loop skips the
// following isGeoPhyCol entries. All columns must have the same row count.
// Errors are rethrown via THROW_MAPD_EXCEPTION.
2405  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2406  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2407  auto session_ptr = stdlog.getConstSessionInfo();
2408  check_read_only("load_table_binary_columnar");
// NOTE(review): `cat` is not used below.
2409  auto const& cat = session_ptr->getCatalog();
2410 
2411  std::unique_ptr<Importer_NS::Loader> loader;
2412  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2413  auto read_lock = prepare_columnar_loader(
2414  *session_ptr, table_name, cols.size(), &loader, &import_buffers);
2415  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2416  session_ptr->getCatalog(), table_name);
2417 
2418  size_t numRows = 0;
2419  size_t import_idx = 0; // index into the TColumn vector being loaded
2420  size_t col_idx = 0; // index into column description vector
2421  try {
2422  size_t skip_physical_cols = 0;
2423  for (auto cd : loader->get_column_descs()) {
2424  if (skip_physical_cols > 0) {
2425  if (!cd->isGeoPhyCol) {
2426  throw std::runtime_error("Unexpected physical column");
2427  }
2428  skip_physical_cols--;
2429  continue;
2430  }
2431  size_t colRows = import_buffers[col_idx]->add_values(cd, cols[import_idx]);
2432  if (col_idx == 0) {
2433  numRows = colRows;
2434  } else if (colRows != numRows) {
2435  std::ostringstream oss;
2436  oss << "load_table_binary_columnar: Inconsistent number of rows in column "
2437  << cd->columnName << " , expecting " << numRows << " rows, column "
2438  << col_idx << " has " << colRows << " rows";
2439  THROW_MAPD_EXCEPTION(oss.str());
2440  }
2441  // Advance to the next column in the table
2442  col_idx++;
2443 
2444  // For geometry columns: process WKT strings and fill physical columns
2445  if (cd->columnType.is_geometry()) {
2446  auto geo_col_idx = col_idx - 1;
2447  const auto wkt_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
2448  std::vector<std::vector<double>> coords_column, bounds_column;
2449  std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
2450  int render_group = 0;
2451  SQLTypeInfo ti = cd->columnType;
2452  if (numRows != wkt_column->size() ||
2454  ti,
2455  coords_column,
2456  bounds_column,
2457  ring_sizes_column,
2458  poly_rings_column,
2459  false)) {
2460  std::ostringstream oss;
2461  oss << "load_table_binary_columnar: Invalid geometry in column "
2462  << cd->columnName;
2463  THROW_MAPD_EXCEPTION(oss.str());
2464  }
2465  // Populate physical columns, advance col_idx
2467  cd,
2468  import_buffers,
2469  col_idx,
2470  coords_column,
2471  bounds_column,
2472  ring_sizes_column,
2473  poly_rings_column,
2474  render_group);
2475  skip_physical_cols = cd->columnType.get_physical_cols();
2476  }
2477  // Advance to the next column of values being loaded
2478  import_idx++;
2479  }
// NOTE(review): if TMapDException derives from std::exception, the handler
// below also catches the THROW_MAPD_EXCEPTION calls made inside this try block
// and wraps their message a second time — confirm.
2480  } catch (const std::exception& e) {
2481  std::ostringstream oss;
2482  oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
2483  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2484  THROW_MAPD_EXCEPTION(oss.str());
2485  }
2486  loader->load(import_buffers, numRows);
2487 }
2488 
2489 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
2490 
2491 #define ARROW_THRIFT_THROW_NOT_OK(s) \
2492  do { \
2493  ::arrow::Status _s = (s); \
2494  if (UNLIKELY(!_s.ok())) { \
2495  TMapDException ex; \
2496  ex.error_msg = _s.ToString(); \
2497  LOG(ERROR) << s.ToString(); \
2498  throw ex; \
2499  } \
2500  } while (0)
2501 
2502 namespace {
2503 
2504 RecordBatchVector loadArrowStream(const std::string& stream) {
// Decodes the Arrow IPC stream held in `stream` into its record batches.
// The arrow::Buffer wraps stream's bytes without copying (hence "zero-copy"
// below), so the returned batches presumably reference `stream` and must not
// outlive it — confirm.
// Errors are not propagated. Any std::exception is logged, and the batches read
// so far (possibly none) are returned. The macro call head for Open is on line
// 2514, which is not visible in this listing.
2505  RecordBatchVector batches;
2506  try {
2507  // TODO(wesm): Make this simpler in general, see ARROW-1600
2508  auto stream_buffer =
2509  std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
2510  static_cast<int64_t>(stream.size()));
2511 
2512  arrow::io::BufferReader buf_reader(stream_buffer);
2513  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
2515  arrow::ipc::RecordBatchStreamReader::Open(&buf_reader, &batch_reader));
2516 
2517  while (true) {
2518  std::shared_ptr<arrow::RecordBatch> batch;
2519  // Read batch (zero-copy) from the stream
2520  ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
2521  if (batch == nullptr) {
2522  break;
2523  }
2524  batches.emplace_back(std::move(batch));
2525  }
2526  } catch (const std::exception& e) {
2527  LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
2528  }
2529  return batches;
2530 }
2531 
2532 } // namespace
2533 
2534 void MapDHandler::load_table_binary_arrow(const TSessionId& session,
2535  const std::string& table_name,
2536  const std::string& arrow_stream) {
2537  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2538  auto session_ptr = stdlog.getConstSessionInfo();
2539  check_read_only("load_table_binary_arrow");
2540 
2541  RecordBatchVector batches = loadArrowStream(arrow_stream);
2542 
2543  // Assuming have one batch for now
2544  if (batches.size() != 1) {
2545  THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
2546  }
2547 
2548  std::shared_ptr<arrow::RecordBatch> batch = batches[0];
2549 
2550  std::unique_ptr<Importer_NS::Loader> loader;
2551  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2552  auto read_lock = prepare_columnar_loader(*session_ptr,
2553  table_name,
2554  static_cast<size_t>(batch->num_columns()),
2555  &loader,
2556  &import_buffers);
2557  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2558  session_ptr->getCatalog(), table_name);
2559 
2560  size_t numRows = 0;
2561  size_t col_idx = 0;
2562  try {
2563  for (auto cd : loader->get_column_descs()) {
2564  auto& array = *batch->column(col_idx);
2565  Importer_NS::ArraySliceRange row_slice(0, array.length());
2566  numRows =
2567  import_buffers[col_idx]->add_arrow_values(cd, array, true, row_slice, nullptr);
2568  col_idx++;
2569  }
2570  } catch (const std::exception& e) {
2571  LOG(ERROR) << "Input exception thrown: " << e.what()
2572  << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
2573  // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
2574  // other import paths
2575  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2576  }
2577  loader->load(import_buffers, numRows);
2578 }
2579 
// Loads a batch of rows, each sent as strings (TStringRow), into `table_name`.
//
// Every row is parsed column-by-column into one TypedImportBuffer per column
// descriptor. For a geometry column the same input string is also decoded into
// coords/bounds/ring data that fills the trailing physical (isGeoPhyCol)
// columns, which the column loop then skips. A row that fails to parse is
// rolled back (pop_value on every buffer it already filled), logged, and
// dropped; the remaining rows are loaded under the table's insert-data write
// lock. Any std::exception escaping the body is re-raised via
// THROW_MAPD_EXCEPTION.
2580 void MapDHandler::load_table(const TSessionId& session,
2581  const std::string& table_name,
2582  const std::vector<TStringRow>& rows) {
2583  try {
2584  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
2585  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
2586  auto session_ptr = stdlog.getConstSessionInfo();
2587  check_read_only("load_table");
2588  auto& cat = session_ptr->getCatalog();
2589  const auto td_with_lock =
2591  cat, table_name, true);
2592  const auto td = td_with_lock();
2593  CHECK(td);
2594 
2595  if (g_cluster && !leaf_aggregator_.leafCount()) {
2596  // Sharded table rows need to be routed to the leaf by an aggregator.
2598  }
2599  check_table_load_privileges(*session_ptr, table_name);
2600  if (rows.empty()) {
2601  return;
2602  }
// When leaves are registered, rows are forwarded through a DistributedLoader;
// otherwise they are loaded into the local catalog.
2603  std::unique_ptr<Importer_NS::Loader> loader;
2604  if (leaf_aggregator_.leafCount() > 0) {
2605  loader.reset(new DistributedLoader(*session_ptr, td, &leaf_aggregator_));
2606  } else {
2607  loader.reset(new Importer_NS::Loader(cat, td));
2608  }
2609  auto col_descs = loader->get_column_descs();
// Physical geo columns get no field of their own in a TStringRow (they are
// derived from the geo column's value), so they are excluded from the expected
// number of fields per row.
2610  auto geo_physical_cols = std::count_if(
2611  col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
2612  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
2613  // Subtracting 1 (rowid) until TableDescriptor is updated.
// NOTE(review): the check subtracts 2 when td->hasDeletedCol, but the message
// below always subtracts 1, so the reported expected count can be off by one.
// Only rows.front() is validated; a later row with fewer fields would be read
// past the end of row.cols (operator[] is unchecked).
2614  if (rows.front().cols.size() != static_cast<size_t>(td->nColumns) -
2615  geo_physical_cols - (td->hasDeletedCol ? 2 : 1)) {
2616  THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name +
2617  " (" + std::to_string(rows.front().cols.size()) + " vs " +
2618  std::to_string(td->nColumns - geo_physical_cols - 1) + ")");
2619  }
2620  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
2621  for (auto cd : col_descs) {
2622  import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
2623  new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
2624  }
2625  Importer_NS::CopyParams copy_params;
// Only successfully parsed rows are counted and handed to the loader.
2626  size_t rows_completed = 0;
2627  for (auto const& row : rows) {
2628  size_t import_idx = 0; // index into the TStringRow being loaded
2629  size_t col_idx = 0; // index into column description vector
2630  try {
// Number of physical geo columns still to skip after the most recent geo column.
2631  size_t skip_physical_cols = 0;
2632  for (auto cd : col_descs) {
2633  if (skip_physical_cols > 0) {
2634  if (!cd->isGeoPhyCol) {
2635  throw std::runtime_error("Unexpected physical column");
2636  }
2637  skip_physical_cols--;
2638  continue;
2639  }
2640  import_buffers[col_idx]->add_value(cd,
2641  row.cols[import_idx].str_val,
2642  row.cols[import_idx].is_null,
2643  copy_params);
2644  // Advance to the next column within the table
2645  col_idx++;
2646 
2647  if (cd->columnType.is_geometry()) {
2648  // Populate physical columns
2649  std::vector<double> coords, bounds;
2650  std::vector<int> ring_sizes, poly_rings;
2651  int render_group = 0;
2652  SQLTypeInfo ti;
// A null or unparsable geometry rejects the whole row.
2653  if (row.cols[import_idx].is_null ||
2655  row.cols[import_idx].str_val,
2656  ti,
2657  coords,
2658  bounds,
2659  ring_sizes,
2660  poly_rings,
2661  false)) {
2662  throw std::runtime_error("Invalid geometry");
2663  }
2664  if (cd->columnType.get_type() != ti.get_type()) {
2665  throw std::runtime_error("Geometry type mismatch");
2666  }
2668  cd,
2669  import_buffers,
2670  col_idx,
2671  coords,
2672  bounds,
2673  ring_sizes,
2674  poly_rings,
2675  render_group);
2676  skip_physical_cols = cd->columnType.get_physical_cols();
2677  }
2678  // Advance to the next field within the row
2679  import_idx++;
2680  }
2681  rows_completed++;
2682  } catch (const std::exception& e) {
// Roll back this row by popping the value appended to each buffer filled so far.
// NOTE(review): col_idx is also handed to the physical-column helper above,
// which presumably advances it past the physical buffers it fills; confirm.
2683  for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
2684  import_buffers[col_idx_to_pop]->pop_value();
2685  }
2686  LOG(ERROR) << "Input exception thrown: " << e.what()
2687  << ". Row discarded, issue at column : " << (col_idx + 1)
2688  << " data :" << row;
2689  }
2690  }
// The insert-data write lock is taken only for the final load.
2691  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
2692  session_ptr->getCatalog(), table_name);
2693  loader->load(import_buffers, rows_completed);
2694  } catch (const std::exception& e) {
2695  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
2696  }
2697 }
2698 
2699 char MapDHandler::unescape_char(std::string str) {
2700  char out = str[0];
2701  if (str.size() == 2 && str[0] == '\\') {
2702  if (str[1] == 't') {
2703  out = '\t';
2704  } else if (str[1] == 'n') {
2705  out = '\n';
2706  } else if (str[1] == '0') {
2707  out = '\0';
2708  } else if (str[1] == '\'') {
2709  out = '\'';
2710  } else if (str[1] == '\\') {
2711  out = '\\';
2712  }
2713  }
2714  return out;
2715 }
2716 
// Converts Thrift TCopyParams into Importer_NS::CopyParams.
// The enum-valued fields (has_header, file_type, geo_coords_encoding,
// geo_coords_type) and geo_coords_srid are validated; an unrecognised value is
// reported through THROW_MAPD_EXCEPTION. Single-character options go through
// unescape_char, so clients may send either the raw character or a backslash
// escape such as "\t". Empty strings and threads == 0 leave the CopyParams
// defaults in place. The exception is `delimiter`: an empty string sets it to '\0'.
2718  Importer_NS::CopyParams copy_params;
2719  switch (cp.has_header) {
2720  case TImportHeaderRow::AUTODETECT:
2722  break;
2723  case TImportHeaderRow::NO_HEADER:
2725  break;
2726  case TImportHeaderRow::HAS_HEADER:
2728  break;
2729  default:
2730  THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
2731  std::to_string((int)cp.has_header));
2732  break;
2733  }
2734  copy_params.quoted = cp.quoted;
2735  if (cp.delimiter.length() > 0) {
2736  copy_params.delimiter = unescape_char(cp.delimiter);
2737  } else {
2738  copy_params.delimiter = '\0';
2739  }
2740  if (cp.null_str.length() > 0) {
2741  copy_params.null_str = cp.null_str;
2742  }
2743  if (cp.quote.length() > 0) {
2744  copy_params.quote = unescape_char(cp.quote);
2745  }
2746  if (cp.escape.length() > 0) {
2747  copy_params.escape = unescape_char(cp.escape);
2748  }
2749  if (cp.line_delim.length() > 0) {
2750  copy_params.line_delim = unescape_char(cp.line_delim);
2751  }
2752  if (cp.array_delim.length() > 0) {
2753  copy_params.array_delim = unescape_char(cp.array_delim);
2754  }
2755  if (cp.array_begin.length() > 0) {
2756  copy_params.array_begin = unescape_char(cp.array_begin);
2757  }
2758  if (cp.array_end.length() > 0) {
2759  copy_params.array_end = unescape_char(cp.array_end);
2760  }
2761  if (cp.threads != 0) {
2762  copy_params.threads = cp.threads;
2763  }
2764  if (cp.s3_access_key.length() > 0) {
2765  copy_params.s3_access_key = cp.s3_access_key;
2766  }
2767  if (cp.s3_secret_key.length() > 0) {
2768  copy_params.s3_secret_key = cp.s3_secret_key;
2769  }
2770  if (cp.s3_region.length() > 0) {
2771  copy_params.s3_region = cp.s3_region;
2772  }
2773  if (cp.s3_endpoint.length() > 0) {
2774  copy_params.s3_endpoint = cp.s3_endpoint;
2775  }
// PARQUET is only accepted when the server is built with ENABLE_IMPORT_PARQUET.
2776  switch (cp.file_type) {
2777  case TFileType::POLYGON:
2779  break;
2780  case TFileType::DELIMITED:
2782  break;
2783 #ifdef ENABLE_IMPORT_PARQUET
2784  case TFileType::PARQUET:
2785  copy_params.file_type = Importer_NS::FileType::PARQUET;
2786  break;
2787 #endif
2788  default:
2789  THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
2790  std::to_string((int)cp.file_type));
2791  break;
2792  }
2793  switch (cp.geo_coords_encoding) {
2794  case TEncodingType::GEOINT:
2795  copy_params.geo_coords_encoding = kENCODING_GEOINT;
2796  break;
2797  case TEncodingType::NONE:
2798  copy_params.geo_coords_encoding = kENCODING_NONE;
2799  break;
2800  default:
2801  THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
2802  std::to_string((int)cp.geo_coords_encoding));
2803  break;
2804  }
2805  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2806  switch (cp.geo_coords_type) {
2807  case TDatumType::GEOGRAPHY:
2808  copy_params.geo_coords_type = kGEOGRAPHY;
2809  break;
2810  case TDatumType::GEOMETRY:
2811  copy_params.geo_coords_type = kGEOMETRY;
2812  break;
2813  default:
2814  THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
2815  std::to_string((int)cp.geo_coords_type));
2816  break;
2817  }
// Only SRIDs 4326, 3857 and 900913 are accepted.
// NOTE(review): the error message below opens "(" without closing it.
2818  switch (cp.geo_coords_srid) {
2819  case 4326:
2820  case 3857:
2821  case 900913:
2822  copy_params.geo_coords_srid = cp.geo_coords_srid;
2823  break;
2824  default:
2825  THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
2826  std::to_string((int)cp.geo_coords_srid));
2827  break;
2828  }
2829  copy_params.sanitize_column_names = cp.sanitize_column_names;
2830  copy_params.geo_layer_name = cp.geo_layer_name;
2831  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2832  copy_params.geo_explode_collections = cp.geo_explode_collections;
2833  return copy_params;
2834 }
2835 
// Converts Importer_NS::CopyParams back into Thrift TCopyParams, e.g. to return
// the parameters chosen by the detector to the client. Character options are
// copied verbatim, not re-escaped. An unexpected has_header or geo_coords_type
// value trips CHECK(false).
2837  TCopyParams copy_params;
2838  copy_params.delimiter = cp.delimiter;
2839  copy_params.null_str = cp.null_str;
2840  switch (cp.has_header) {
2842  copy_params.has_header = TImportHeaderRow::AUTODETECT;
2843  break;
2845  copy_params.has_header = TImportHeaderRow::NO_HEADER;
2846  break;
2848  copy_params.has_header = TImportHeaderRow::HAS_HEADER;
2849  break;
2850  default:
2851  CHECK(false);
2852  break;
2853  }
2854  copy_params.quoted = cp.quoted;
2855  copy_params.quote = cp.quote;
2856  copy_params.escape = cp.escape;
2857  copy_params.line_delim = cp.line_delim;
2858  copy_params.array_delim = cp.array_delim;
2859  copy_params.array_begin = cp.array_begin;
2860  copy_params.array_end = cp.array_end;
2861  copy_params.threads = cp.threads;
2862  copy_params.s3_access_key = cp.s3_access_key;
2863  copy_params.s3_secret_key = cp.s3_secret_key;
2864  copy_params.s3_region = cp.s3_region;
2865  copy_params.s3_endpoint = cp.s3_endpoint;
// NOTE(review): only POLYGON is mapped explicitly. Every other file type,
// including PARQUET when ENABLE_IMPORT_PARQUET is defined, is reported as
// DELIMITED; confirm this is intended.
2866  switch (cp.file_type) {
2868  copy_params.file_type = TFileType::POLYGON;
2869  break;
2870  default:
2871  copy_params.file_type = TFileType::DELIMITED;
2872  break;
2873  }
// Any encoding other than GEOINT is reported as NONE.
2874  switch (cp.geo_coords_encoding) {
2875  case kENCODING_GEOINT:
2876  copy_params.geo_coords_encoding = TEncodingType::GEOINT;
2877  break;
2878  default:
2879  copy_params.geo_coords_encoding = TEncodingType::NONE;
2880  break;
2881  }
2882  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
2883  switch (cp.geo_coords_type) {
2884  case kGEOGRAPHY:
2885  copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
2886  break;
2887  case kGEOMETRY:
2888  copy_params.geo_coords_type = TDatumType::GEOMETRY;
2889  break;
2890  default:
2891  CHECK(false);
2892  break;
2893  }
2894  copy_params.geo_coords_srid = cp.geo_coords_srid;
2895  copy_params.sanitize_column_names = cp.sanitize_column_names;
2896  copy_params.geo_layer_name = cp.geo_layer_name;
2897  copy_params.geo_assign_render_groups = cp.geo_assign_render_groups;
2898  copy_params.geo_explode_collections = cp.geo_explode_collections;
2899  return copy_params;
2900 }
2901 
// Rewrites a remote geo-file location into a GDAL virtual file system path.
// http:// and https:// URLs get a /vsicurl/ prefix, and an s3:// scheme is
// replaced by /vsis3/. Local paths are left untouched. If GDAL network file
// access is unavailable, an error asking for GDAL 2.2 or later is raised.
2902 void add_vsi_network_prefix(std::string& path) {
2903  // do we support network file access?
2905 
2906  // modify head of filename based on source location
2907  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
2908  if (!gdal_network) {
2910  "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
2911  }
2912  // invoke GDAL CURL virtual file reader
2913  path = "/vsicurl/" + path;
2914  } else if (boost::istarts_with(path, "s3://")) {
2915  if (!gdal_network) {
2917  "S3 geo file import not supported! Update to GDAL 2.2 or later!");
2918  }
2919  // invoke GDAL S3 virtual file reader
// NOTE(review): the scheme test above ignores case, but replace_first is
// case-sensitive, so a path such as "S3://bucket/x" is left unchanged.
2920  boost::replace_first(path, "s3://", "/vsis3/");
2921  }
2922 }
2923 
2924 void add_vsi_geo_prefix(std::string& path) {
2925  // single gzip'd file (not an archive)?
2926  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
2927  path = "/vsigzip/" + path;
2928  }
2929 }
2930 
2931 void add_vsi_archive_prefix(std::string& path) {
2932  // check for compressed file or file bundle
2933  if (boost::iends_with(path, ".zip")) {
2934  // zip archive
2935  path = "/vsizip/" + path;
2936  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
2937  boost::iends_with(path, ".tar.gz")) {
2938  // tar archive (compressed or uncompressed)
2939  path = "/vsitar/" + path;
2940  }
2941 }
2942 
2943 std::string remove_vsi_prefixes(const std::string& path_in) {
2944  std::string path(path_in);
2945 
2946  // these will be first
2947  if (boost::istarts_with(path, "/vsizip/")) {
2948  boost::replace_first(path, "/vsizip/", "");
2949  } else if (boost::istarts_with(path, "/vsitar/")) {
2950  boost::replace_first(path, "/vsitar/", "");
2951  } else if (boost::istarts_with(path, "/vsigzip/")) {
2952  boost::replace_first(path, "/vsigzip/", "");
2953  }
2954 
2955  // then these
2956  if (boost::istarts_with(path, "/vsicurl/")) {
2957  boost::replace_first(path, "/vsicurl/", "");
2958  } else if (boost::istarts_with(path, "/vsis3/")) {
2959  boost::replace_first(path, "/vsis3/", "s3://");
2960  }
2961 
2962  return path;
2963 }
2964 
2965 bool path_is_relative(const std::string& path) {
2966  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
2967  boost::istarts_with(path, "https://")) {
2968  return false;
2969  }
2970  return !boost::filesystem::path(path).is_absolute();
2971 }
2972 
2973 bool path_has_valid_filename(const std::string& path) {
2974  auto filename = boost::filesystem::path(path).filename().string();
2975  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
2976  return false;
2977  }
2978  return true;
2979 }
2980 
2981 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
2982  if (!path_has_valid_filename(path)) {
2983  return false;
2984  }
2985  if (include_gz) {
2986  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
2987  return true;
2988  }
2989  }
2990  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
2991  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
2992  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
2993  boost::iends_with(path, ".gdb.zip")) {
2994  return true;
2995  }
2996  return false;
2997 }
2998 
2999 bool is_a_supported_archive_file(const std::string& path) {
3000  if (!path_has_valid_filename(path)) {
3001  return false;
3002  }
3003  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
3004  return true;
3005  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
3006  boost::iends_with(path, ".tar.gz")) {
3007  return true;
3008  }
3009  return false;
3010 }
3011 
3012 std::string find_first_geo_file_in_archive(const std::string& archive_path,
3013  const Importer_NS::CopyParams& copy_params) {
3014  // get the recursive list of all files in the archive
3015  std::vector<std::string> files =
3016  Importer_NS::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
3017 
3018  // report the list
3019  LOG(INFO) << "Found " << files.size() << " files in Archive "
3020  << remove_vsi_prefixes(archive_path);
3021  for (const auto& file : files) {
3022  LOG(INFO) << " " << file;
3023  }
3024 
3025  // scan the list for the first candidate file
3026  bool found_suitable_file = false;
3027  std::string file_name;
3028  for (const auto& file : files) {
3029  if (is_a_supported_geo_file(file, false)) {
3030  file_name = file;
3031  found_suitable_file = true;
3032  break;
3033  }
3034  }
3035 
3036  // if we didn't find anything
3037  if (!found_suitable_file) {
3038  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
3039  remove_vsi_prefixes(archive_path);
3040  file_name.clear();
3041  }
3042 
3043  // done
3044  return file_name;
3045 }
3046 
// Inspects a file and reports the detected column names and types, up to 100
// sample rows, and the effective copy params.
// Relative paths resolve into the session's upload directory
// (import_path_ / <SHA-256 hex of the session id> / <file name>). For geo
// (POLYGON) imports the path may also be an S3/HTTP location or an archive; in
// that case the first supported geo file inside the archive is used. Errors
// during detection are re-raised as "detect_column_types error: ...".
3047 void MapDHandler::detect_column_types(TDetectResult& _return,
3048  const TSessionId& session,
3049  const std::string& file_name_in,
3050  const TCopyParams& cp) {
3051  auto stdlog = STDLOG(get_session_ptr(session));
3052  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3053  check_read_only("detect_column_types");
3054 
3056 
3057  std::string file_name{file_name_in};
3058 
3059  if (path_is_relative(file_name)) {
3060  // assume relative paths are relative to data_path / mapd_import / <session>
3061  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3062  boost::filesystem::path(file_name).filename();
3063  file_name = file_path.string();
3064  }
3065 
3066  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
3067  if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
3068  if (is_a_supported_geo_file(file_name, true)) {
3069  // prepare to detect geo file directly
3070  add_vsi_network_prefix(file_name);
3071  add_vsi_geo_prefix(file_name);
3072  } else if (is_a_supported_archive_file(file_name)) {
3073  // find the archive file
3074  add_vsi_network_prefix(file_name);
3075  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
3076  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3077  }
3078  // find geo file in archive
3079  add_vsi_archive_prefix(file_name);
3080  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3081  // prepare to detect that geo file
// NOTE(review): if the archive holds no supported geo file, file_name stays the
// archive path itself and detection proceeds on it.
3082  if (geo_file.size()) {
3083  file_name = file_name + std::string("/") + geo_file;
3084  }
3085  } else {
3086  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
3087  file_name_in);
3088  }
3089  }
3090 
3091  auto file_path = boost::filesystem::path(file_name);
3092  // can be a s3 url
// NOTE(review): this repeats the relative-path rewrite done above. A URL such
// as http://... is not "relative" per path_is_relative() but is also not
// absolute here, so for non-geo imports it would be rewritten into the upload
// directory. Confirm intended.
3093  if (!boost::istarts_with(file_name, "s3://")) {
3094  if (!boost::filesystem::path(file_name).is_absolute()) {
3095  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3096  boost::filesystem::path(file_name).filename();
3097  file_name = file_path.string();
3098  }
3099 
3100  if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
3101  // check for geo file
3102  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3103  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3104  }
3105  } else {
3106  // check for regular file
3107  if (!boost::filesystem::exists(file_path)) {
3108  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3109  }
3110  }
3111  }
3112  try {
// Delimited (and, when enabled, Parquet) files go through the Detector.
3113  if (copy_params.file_type == Importer_NS::FileType::DELIMITED
3114 #ifdef ENABLE_IMPORT_PARQUET
3115  || (copy_params.file_type == Importer_NS::FileType::PARQUET)
3116 #endif
3117  ) {
3118  Importer_NS::Detector detector(file_path, copy_params);
3119  std::vector<SQLTypes> best_types = detector.best_sqltypes;
3120  std::vector<EncodingType> best_encodings = detector.best_encodings;
3121  std::vector<std::string> headers = detector.get_headers();
3122  copy_params = detector.get_copy_params();
3123 
3124  _return.copy_params = copyparams_to_thrift(copy_params);
3125  _return.row_set.row_desc.resize(best_types.size());
3126  for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
3127  TColumnType col;
3128  SQLTypes t = best_types[col_idx];
3129  EncodingType encodingType = best_encodings[col_idx];
3130  SQLTypeInfo ti(t, false, encodingType);
3131  if (IS_GEO(t)) {
3132  // set this so encoding_to_thrift does the right thing
3133  ti.set_compression(copy_params.geo_coords_encoding);
3134  // fill in these directly
3135  col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
3136  col.col_type.scale = copy_params.geo_coords_srid;
3137  col.col_type.comp_param = copy_params.geo_coords_comp_param;
3138  }
3139  col.col_type.type = type_to_thrift(ti);
3140  col.col_type.encoding = encoding_to_thrift(ti);
3141  if (copy_params.sanitize_column_names) {
3142  col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
3143  } else {
3144  col.col_name = headers[col_idx];
3145  }
3146  col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
3147  _return.row_set.row_desc[col_idx] = col;
3148  }
3149  size_t num_samples = 100;
3150  auto sample_data = detector.get_sample_rows(num_samples);
3151 
// Sample values are returned as strings; an empty string is flagged as null.
3152  TRow sample_row;
3153  for (auto row : sample_data) {
3154  sample_row.cols.clear();
3155  for (const auto& s : row) {
3156  TDatum td;
3157  td.val.str_val = s;
3158  td.is_null = s.empty();
3159  sample_row.cols.push_back(td);
3160  }
3161  _return.row_set.rows.push_back(sample_row);
3162  }
3163  } else if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
3164  // @TODO simon.eves get this from somewhere!
3165  const std::string geoColumnName(OMNISCI_GEO_PREFIX);
3166 
3167  check_geospatial_files(file_path, copy_params);
3168  std::list<ColumnDescriptor> cds = Importer_NS::Importer::gdalToColumnDescriptors(
3169  file_path.string(), geoColumnName, copy_params);
3170  for (auto cd : cds) {
3171  if (copy_params.sanitize_column_names) {
3172  cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
3173  }
3174  _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
3175  }
3176  std::map<std::string, std::vector<std::string>> sample_data;
3178  file_path.string(), geoColumnName, sample_data, 100, copy_params);
// Pivot the per-column samples (keyed by source column name) into rows.
// NOTE(review): operator[] inserts an empty vector for a sourceName missing from
// sample_data, and .at(i) then throws std::out_of_range, which is reported via
// the catch below.
3179  if (sample_data.size() > 0) {
3180  for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
3181  TRow sample_row;
3182  for (auto cd : cds) {
3183  TDatum td;
3184  td.val.str_val = sample_data[cd.sourceName].at(i);
3185  td.is_null = td.val.str_val.empty();
3186  sample_row.cols.push_back(td);
3187  }
3188  _return.row_set.rows.push_back(sample_row);
3189  }
3190  }
3191  _return.copy_params = copyparams_to_thrift(copy_params);
3192  }
3193  } catch (const std::exception& e) {
3194  THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
3195  }
3196 }
3197 
3198 void MapDHandler::render_vega(TRenderResult& _return,
3199  const TSessionId& session,
3200  const int64_t widget_id,
3201  const std::string& vega_json,
3202  const int compression_level,
3203  const std::string& nonce) {
3204  auto stdlog = STDLOG(get_session_ptr(session),
3205  "widget_id",
3206  widget_id,
3207  "compression_level",
3208  compression_level,
3209  "vega_json",
3210  vega_json,
3211  "nonce",
3212  nonce);
3213  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3214  auto session_ptr = stdlog.getConstSessionInfo();
3215  if (!render_handler_) {
3216  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
3217  }
3218 
3219  _return.total_time_ms = measure<>::execution([&]() {
3220  try {
3221  render_handler_->render_vega(
3222  _return,
3223  std::make_shared<Catalog_Namespace::SessionInfo>(*session_ptr),
3224  widget_id,
3225  vega_json,
3226  compression_level,
3227  nonce);
3228  } catch (std::exception& e) {
3229  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3230  }
3231  });
3232 }
3233 
3235  int32_t dashboard_id,
3236  AccessPrivileges requestedPermissions) {
// Checks whether the session's current user holds `requestedPermissions` on the
// dashboard object `dashboard_id` in the session's catalog.
// Returns the result of SysCatalog::checkPrivileges.
3237  DBObject object(dashboard_id, DashboardDBObjectType);
3238  auto& catalog = session_info.getCatalog();
3239  auto& user = session_info.get_currentUser();
3240  object.loadKey(catalog);
3241  object.setPrivileges(requestedPermissions);
3242  std::vector<DBObject> privs = {object};
3243  return SysCatalog::instance().checkPrivileges(user, privs);
3244 }
3245 
3246 // dashboards
// Fills `dashboard` with the full metadata of one dashboard, including its
// state. Raises an error if the id is unknown or the caller lacks
// VIEW_DASHBOARD on it. is_dash_shared is false when the dashboard has no
// object-privilege entries, or its only entry belongs to the dashboard's owner.
3247 void MapDHandler::get_dashboard(TDashboard& dashboard,
3248  const TSessionId& session,
3249  const int32_t dashboard_id) {
3250  auto stdlog = STDLOG(get_session_ptr(session));
3251  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3252  auto session_ptr = stdlog.getConstSessionInfo();
3253  auto const& cat = session_ptr->getCatalog();
3255  auto dash = cat.getMetadataForDashboard(dashboard_id);
3256  if (!dash) {
3257  THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
3258  " doesn't exist");
3259  }
3261  *session_ptr, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3262  THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
3263  std::to_string(dashboard_id));
3264  }
// Resolve the owner's user name for the is_dash_shared comparison below. The
// lookup result is not checked, so userName stays "" if the owner is not found.
3265  user_meta.userName = "";
3266  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3267  auto objects_list = SysCatalog::instance().getMetadataForObject(
3268  cat.getCurrentDB().dbId,
3269  static_cast<int>(DBObjectType::DashboardDBObjectType),
3270  dashboard_id);
3271  dashboard.dashboard_name = dash->dashboardName;
3272  dashboard.dashboard_state = dash->dashboardState;
3273  dashboard.image_hash = dash->imageHash;
3274  dashboard.update_time = dash->updateTime;
3275  dashboard.dashboard_metadata = dash->dashboardMetadata;
3276  dashboard.dashboard_owner = dash->user;
3277  dashboard.dashboard_id = dash->dashboardId;
3278  if (objects_list.empty() ||
3279  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3280  dashboard.is_dash_shared = false;
3281  } else {
3282  dashboard.is_dash_shared = true;
3283  }
3284 }
3285 
// Appends to `dashboards` every dashboard in the session's catalog that the
// caller may view (VIEW_DASHBOARD). dashboard_state is deliberately left empty;
// use get_dashboard for it. is_dash_shared is computed as in get_dashboard.
3286 void MapDHandler::get_dashboards(std::vector<TDashboard>& dashboards,
3287  const TSessionId& session) {
3288  auto stdlog = STDLOG(get_session_ptr(session));
3289  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3290  auto session_ptr = stdlog.getConstSessionInfo();
3291  auto const& cat = session_ptr->getCatalog();
3293  const auto dashes = cat.getAllDashboardsMetadata();
// NOTE(review): user_meta is cleared only once, before the loop, and the owner
// lookup's result is ignored. If a lookup does not overwrite user_meta (e.g. the
// owner no longer exists), the previous dashboard owner's name is reused in the
// is_dash_shared comparison.
3294  user_meta.userName = "";
3295  for (const auto d : dashes) {
3296  SysCatalog::instance().getMetadataForUserById(d->userId, user_meta);
3298  *session_ptr, d->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
3299  auto objects_list = SysCatalog::instance().getMetadataForObject(
3300  cat.getCurrentDB().dbId,
3301  static_cast<int>(DBObjectType::DashboardDBObjectType),
3302  d->dashboardId);
3303  TDashboard dash;
3304  dash.dashboard_name = d->dashboardName;
3305  dash.image_hash = d->imageHash;
3306  dash.update_time = d->updateTime;
3307  dash.dashboard_metadata = d->dashboardMetadata;
3308  dash.dashboard_id = d->dashboardId;
3309  dash.dashboard_owner = d->user;
3310  // dashboardState is intentionally not populated here
3311  // for payload reasons
3312  // use get_dashboard call to get state
3313  if (objects_list.empty() ||
3314  (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
3315  dash.is_dash_shared = false;
3316  } else {
3317  dash.is_dash_shared = true;
3318  }
3319  dashboards.push_back(dash);
3320  }
3321  }
3322 }
3323 
// Creates a dashboard owned by the session's current user and returns its id.
// Steps:
// - requires a database-level dashboard privilege (checked via
//   checkDBAccessPrivileges);
// - rejects a name the same user already uses;
// - registers the new dashboard as a DB object for privilege tracking.
// Failures are re-raised as "Exception: <what>".
3324 int32_t MapDHandler::create_dashboard(const TSessionId& session,
3325  const std::string& dashboard_name,
3326  const std::string& dashboard_state,
3327  const std::string& image_hash,
3328  const std::string& dashboard_metadata) {
3329  auto stdlog = STDLOG(get_session_ptr(session));
3330  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3331  auto session_ptr = stdlog.getConstSessionInfo();
3332  check_read_only("create_dashboard");
3333  auto& cat = session_ptr->getCatalog();
3334 
3335  if (!session_ptr->checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
3337  THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
3338  }
3339 
// Dashboard names are looked up per owner (keyed by the user id as a string).
3340  auto dash = cat.getMetadataForDashboard(
3341  std::to_string(session_ptr->get_currentUser().userId), dashboard_name);
3342  if (dash) {
3343  THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
3344  }
3345 
3347  dd.dashboardName = dashboard_name;
3348  dd.dashboardState = dashboard_state;
3349  dd.imageHash = image_hash;
3350  dd.dashboardMetadata = dashboard_metadata;
3351  dd.userId = session_ptr->get_currentUser().userId;
3352  dd.user = session_ptr->get_currentUser().userName;
3353 
3354  try {
3355  auto id = cat.createDashboard(dd);
3356  // TODO: transactionally unsafe
// If createDBObject throws, the dashboard created on the previous line is not
// rolled back here.
3357  SysCatalog::instance().createDBObject(
3358  session_ptr->get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
3359  return id;
3360  } catch (const std::exception& e) {
3361  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3362  }
3363 }
3364 
// Overwrites an existing dashboard's name, state, image hash, metadata and
// owner. Requires EDIT_DASHBOARD on the dashboard. The new owner must be an
// existing user. Catalog failures are re-raised as "Exception: <what>".
// NOTE(review): unlike get_dashboard and delete_dashboard, there is no explicit
// "dashboard exists" check before the privilege check.
3365 void MapDHandler::replace_dashboard(const TSessionId& session,
3366  const int32_t dashboard_id,
3367  const std::string& dashboard_name,
3368  const std::string& dashboard_owner,
3369  const std::string& dashboard_state,
3370  const std::string& image_hash,
3371  const std::string& dashboard_metadata) {
3372  auto stdlog = STDLOG(get_session_ptr(session));
3373  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3374  auto session_ptr = stdlog.getConstSessionInfo();
3375  check_read_only("replace_dashboard");
3376  auto& cat = session_ptr->getCatalog();
3377 
3379  *session_ptr, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
3380  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
3381  }
3382 
3384  dd.dashboardName = dashboard_name;
3385  dd.dashboardState = dashboard_state;
3386  dd.imageHash = image_hash;
3387  dd.dashboardMetadata = dashboard_metadata;
3389  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
3390  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
3391  " does not exist");
3392  }
3393  dd.userId = user.userId;
3394  dd.user = dashboard_owner;
3395  dd.dashboardId = dashboard_id;
3396 
3397  try {
3398  cat.replaceDashboard(dd);
3399  } catch (const std::exception& e) {
3400  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3401  }
3402 }
3403 
// Deletes a dashboard's catalog metadata. Raises an error if the id is unknown
// or the caller lacks DELETE_DASHBOARD. Catalog failures are re-raised as
// "Exception: <what>".
3404 void MapDHandler::delete_dashboard(const TSessionId& session,
3405  const int32_t dashboard_id) {
3406  auto stdlog = STDLOG(get_session_ptr(session));
3407  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3408  auto session_ptr = stdlog.getConstSessionInfo();
3409  check_read_only("delete_dashboard");
3410  auto& cat = session_ptr->getCatalog();
3411  auto dash = cat.getMetadataForDashboard(dashboard_id);
// NOTE(review): the message below lacks a space after "id", producing e.g.
// "Dashboard with id42 doesn't exist...".
3412  if (!dash) {
3413  THROW_MAPD_EXCEPTION("Dashboard with id" + std::to_string(dashboard_id) +
3414  " doesn't exist, so cannot delete it");
3415  }
3417  *session_ptr, dash->dashboardId, AccessPrivileges::DELETE_DASHBOARD)) {
3418  THROW_MAPD_EXCEPTION("Not enough privileges to delete a dashboard.");
3419  }
3420  try {
3421  cat.deleteMetadataForDashboard(dashboard_id);
3422  } catch (const std::exception& e) {
3423  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3424  }
3425 }
3426 
// Validates a share/unshare request and returns the grantees to act on.
// The dashboard must exist, and the caller must be its owner or a superuser.
// Every name in `groups` must be an existing user or role; otherwise an error
// is raised.
// NOTE(review): the ownership failure throws a plain std::runtime_error rather
// than using THROW_MAPD_EXCEPTION like the other errors here.
3427 std::vector<std::string> MapDHandler::get_valid_groups(const TSessionId& session,
3428  int32_t dashboard_id,
3429  std::vector<std::string> groups) {
3430  const auto session_info = get_session_copy(session);
3431  auto& cat = session_info.getCatalog();
3432  auto dash = cat.getMetadataForDashboard(dashboard_id);
3433  if (!dash) {
3434  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3435  " does not exist");
3436  } else if (session_info.get_currentUser().userId != dash->userId &&
3437  !session_info.get_currentUser().isSuper) {
3438  throw std::runtime_error(
3439  "User should be either owner of dashboard or super user to share/unshare it");
3440  }
3441  std::vector<std::string> valid_groups;
// NOTE(review): user_meta.isSuper is reset to false each iteration and nothing
// in this loop writes it, so the !user_meta.isSuper filter always passes. Every
// existing grantee, superusers included, is returned.
3443  for (auto& group : groups) {
3444  user_meta.isSuper = false; // initialize default flag
3445  if (!SysCatalog::instance().getGrantee(group)) {
3446  THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
3447  } else if (!user_meta.isSuper) {
3448  valid_groups.push_back(group);
3449  }
3450  }
3451  return valid_groups;
3452 }
3453 
3454 // NOOP: Grants not available for objects as of now
// Grants the selected dashboard privileges (create/delete/edit/view) to every
// grantee accepted by get_valid_groups. When grant_role is set, it also grants
// the dashboard's system role. At least one permission flag must be set.
// NOTE(review): despite the "NOOP" remark above, privileges are granted here.
// The `objects` parameter does not appear to be used. The literal "Atleast" in
// the error message is a typo.
3455 void MapDHandler::share_dashboard(const TSessionId& session,
3456  const int32_t dashboard_id,
3457  const std::vector<std::string>& groups,
3458  const std::vector<std::string>& objects,
3459  const TDashboardPermissions& permissions,
3460  const bool grant_role = false) {
3461  auto stdlog = STDLOG(get_session_ptr(session));
3462  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3463  auto session_ptr = stdlog.getConstSessionInfo();
3464  check_read_only("share_dashboard");
3465  std::vector<std::string> valid_groups;
3466  valid_groups = get_valid_groups(session, dashboard_id, groups);
3467  auto const& cat = session_ptr->getCatalog();
3468  // By default object type can only be dashboard
3470  DBObject object(dashboard_id, object_type);
3471  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3472  !permissions.view_) {
3473  THROW_MAPD_EXCEPTION("Atleast one privilege should be assigned for grants");
3474  } else {
3475  AccessPrivileges privs;
3476 
3477  object.resetPrivileges();
3478  if (permissions.delete_) {
3480  }
3481 
3482  if (permissions.create_) {
3484  }
3485 
3486  if (permissions.edit_) {
3488  }
3489 
3490  if (permissions.view_) {
3492  }
3493 
3494  object.setPrivileges(privs);
3495  }
3496  SysCatalog::instance().grantDBObjectPrivilegesBatch(valid_groups, {object}, cat);
3497  // grant system_role to grantees for underlying objects
// get_valid_groups has already rejected a missing dashboard, so `dash` is
// expected to be non-null here.
3498  if (grant_role) {
3499  auto dash = cat.getMetadataForDashboard(dashboard_id);
3500  SysCatalog::instance().grantRoleBatch({dash->dashboardSystemRoleName}, valid_groups);
3501  }
3502 }
3503 
// Revokes the selected dashboard privileges from every grantee accepted by
// get_valid_groups, then always revokes the dashboard's system role from them.
// share_dashboard, by contrast, grants that role only when grant_role is set.
// At least one permission flag must be set.
// NOTE(review): the `objects` parameter does not appear to be used, and the
// literal "Atleast" in the error message is a typo.
3504 void MapDHandler::unshare_dashboard(const TSessionId& session,
3505  const int32_t dashboard_id,
3506  const std::vector<std::string>& groups,
3507  const std::vector<std::string>& objects,
3508  const TDashboardPermissions& permissions) {
3509  auto stdlog = STDLOG(get_session_ptr(session));
3510  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3511  auto session_ptr = stdlog.getConstSessionInfo();
3512  check_read_only("unshare_dashboard");
3513  std::vector<std::string> valid_groups;
3514  valid_groups = get_valid_groups(session, dashboard_id, groups);
3515  auto const& cat = session_ptr->getCatalog();
3516  // By default object type can only be dashboard
3518  DBObject object(dashboard_id, object_type);
3519  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3520  !permissions.view_) {
3521  THROW_MAPD_EXCEPTION("Atleast one privilege should be assigned for revokes");
3522  } else {
3523  AccessPrivileges privs;
3524 
3525  object.resetPrivileges();
3526  if (permissions.delete_) {
3528  }
3529 
3530  if (permissions.create_) {
3532  }
3533 
3534  if (permissions.edit_) {
3536  }
3537 
3538  if (permissions.view_) {
3540  }
3541 
3542  object.setPrivileges(privs);
3543  }
3544  SysCatalog::instance().revokeDBObjectPrivilegesBatch(valid_groups, {object}, cat);
3545  // revoke system_role from grantees for underlying objects
// get_valid_groups has already rejected a missing dashboard, so `dash` is
// expected to be non-null here.
3546  const auto dash = cat.getMetadataForDashboard(dashboard_id);
3547  SysCatalog::instance().revokeDashboardSystemRole(dash->dashboardSystemRoleName,
3548  valid_groups);
3549 }
3550 
3552  std::vector<TDashboardGrantees>& dashboard_grantees,
3553  const TSessionId& session,
3554  int32_t dashboard_id) {
// Appends (it never clears) one TDashboardGrantees entry to `dashboard_grantees`
// for each role/user that holds privileges on the dashboard in the current
// database, skipping the dashboard owner (matched by user name).
// An unknown dashboard id is rejected; callers that are neither the owner nor
// a super user get the "User should be either owner..." error.
3555  auto stdlog = STDLOG(get_session_ptr(session));
3556  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3557  auto session_ptr = stdlog.getConstSessionInfo();
3558  auto const& cat = session_ptr->getCatalog();
3560  auto dash = cat.getMetadataForDashboard(dashboard_id);
3561  if (!dash) {
3562  THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
3563  " does not exist");
3564  } else if (session_ptr->get_currentUser().userId != dash->userId &&
3565  !session_ptr->get_currentUser().isSuper) {
3567  "User should be either owner of dashboard or super user to access grantees");
3568  }
3569  std::vector<ObjectRoleDescriptor*> objectsList;
3570  objectsList = SysCatalog::instance().getMetadataForObject(
3571  cat.getCurrentDB().dbId,
3572  static_cast<int>(DBObjectType::DashboardDBObjectType),
3573  dashboard_id); // By default the object type can only be dashboards
// Resolve the owner's user name so the owner's own grant can be hidden below.
3574  user_meta.userId = -1;
3575  user_meta.userName = "";
3576  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
3577  for (auto object : objectsList) {
3578  if (user_meta.userName == object->roleName) {
3579  // Mask owner
3580  continue;
3581  }
// Translate the stored privilege bits into the Thrift permission flags.
3582  TDashboardGrantees grantee;
3583  TDashboardPermissions perm;
3584  grantee.name = object->roleName;
3585  grantee.is_user = object->roleType;
3586  perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
3587  perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
3588  perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
3589  perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
3590  grantee.permissions = perm;
3591  dashboard_grantees.push_back(grantee);
3592  }
3593 }
3594 
3595 void MapDHandler::create_link(std::string& _return,
3596  const TSessionId& session,
3597  const std::string& view_state,
3598  const std::string& view_metadata) {
3599  auto stdlog = STDLOG(get_session_ptr(session));
3600  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3601  auto session_ptr = stdlog.getConstSessionInfo();
3602  // check_read_only("create_link");
3603  auto& cat = session_ptr->getCatalog();
3604 
3605  LinkDescriptor ld;
3606  ld.userId = session_ptr->get_currentUser().userId;
3607  ld.viewState = view_state;
3608  ld.viewMetadata = view_metadata;
3609 
3610  try {
3611  _return = cat.createLink(ld, 6);
3612  } catch (const std::exception& e) {
3613  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3614  }
3615 }
3616 
3618  const std::string& name,
3619  const bool is_array) {
// Builds a TColumnType with only the column name, datum type and array flag
// set; every other field of `ct` is left as default-constructed.
3620  TColumnType ct;
3621  ct.col_name = name;
3622  ct.col_type.type = type;
3623  ct.col_type.is_array = is_array;
3624  return ct;
3625 }
3626 
// Verifies that a shapefile's companion files are present.
// If `file_path` has a .shp/.shx/.dbf extension (compared case-insensitively),
// a file with each of those three extensions must be found next to it, in
// either upper- or lower-case form. Other paths are not checked at all.
// Throws std::runtime_error naming the missing file (lower-case extension).
3627 void MapDHandler::check_geospatial_files(const boost::filesystem::path file_path,
3628  const Importer_NS::CopyParams& copy_params) {
3629  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
3630  if (std::find(shp_ext.begin(),
3631  shp_ext.end(),
3632  boost::algorithm::to_lower_copy(file_path.extension().string())) !=
3633  shp_ext.end()) {
// For each required extension, the upper-case variant is probed first; the
// lower-case gdalFileExists probe is the second operand of `&&`, so it only runs
// when the first probe fails. On the throw path `aux_file` therefore carries
// the lower-case extension.
3634  for (auto ext : shp_ext) {
3635  auto aux_file = file_path;
3637  aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
3638  copy_params) &&
3639  !Importer_NS::Importer::gdalFileExists(aux_file.replace_extension(ext).string(),
3640  copy_params)) {
3641  throw std::runtime_error("required file for shapefile does not exist: " +
3642  aux_file.filename().string());
3643  }
3644  }
3645  }
3646 }
3647 
3648 void MapDHandler::create_table(const TSessionId& session,
3649  const std::string& table_name,
3650  const TRowDescriptor& rd,
3651  const TFileType::type file_type,
3652  const TCreateParams& create_params) {
3653  auto stdlog = STDLOG("table_name", table_name);
3654  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3655  check_read_only("create_table");
3656 
3657  if (ImportHelpers::is_reserved_name(table_name)) {
3658  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
3659  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
3660  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
3661  }
3662 
3663  auto rds = rd;
3664 
3665  // no longer need to manually add the poly column for a TFileType::POLYGON table
3666  // a column of the correct geo type has already been added
3667  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
3668 
3669  std::string stmt{"CREATE TABLE " + table_name};
3670  std::vector<std::string> col_stmts;
3671 
3672  for (auto col : rds) {
3673  if (ImportHelpers::is_reserved_name(col.col_name)) {
3674  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
3675  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
3676  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
3677  }
3678  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
3679  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
3680  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
3681  " for column: " + col.col_name);
3682  }
3683 
3684  if (col.col_type.type == TDatumType::DECIMAL) {
3685  // if no precision or scale passed in set to default 14,7
3686  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
3687  col.col_type.precision = 14;
3688  col.col_type.scale = 7;
3689  }
3690  }
3691 
3692  std::string col_stmt;
3693  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
3694 
3695  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
3696  // `nullable` argument, leading this to default to false. Uncomment for v2.
3697  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
3698 
3699  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
3700  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
3701  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
3702  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
3703  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
3704  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
3705  }
3706  } else if (col.col_type.type == TDatumType::STR) {
3707  // non DICT encoded strings
3708  col_stmt.append(" ENCODING NONE");
3709  } else if (col.col_type.type == TDatumType::POINT ||
3710  col.col_type.type == TDatumType::LINESTRING ||
3711  col.col_type.type == TDatumType::POLYGON ||
3712  col.col_type.type == TDatumType::MULTIPOLYGON) {
3713  // non encoded compressable geo
3714  if (col.col_type.scale == 4326) {
3715  col_stmt.append(" ENCODING NONE");
3716  }
3717  }
3718  col_stmts.push_back(col_stmt);
3719  }
3720 
3721  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
3722 
3723  if (create_params.is_replicated) {
3724  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
3725  }
3726 
3727  stmt.append(";");
3728 
3729  TQueryResult ret;
3730  sql_execute(ret, session, stmt, true, "", -1, -1);
3731 }
3732 
// Imports `file_name_in` into the existing table `table_name`.
// Copy parameters come from `copy_params`, which is presumably converted from
// `cp`. CHECK-fails if the locked table descriptor is null.
// For paths that do not start with "s3://" (case-insensitive), a relative path
// is resolved to import_path_/<sha256(session)>/<basename>, and the file must
// exist locally.
// If no delimiter was supplied, ',' is used, or '\t' when the extension is
// exactly ".tsv".
// Any std::exception escaping the body is re-thrown through
// THROW_MAPD_EXCEPTION with an "Exception: " prefix.
3733 void MapDHandler::import_table(const TSessionId& session,
3734  const std::string& table_name,
3735  const std::string& file_name_in,
3736  const TCopyParams& cp) {
3737  try {
3738  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3739  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3740  auto session_ptr = stdlog.getConstSessionInfo();
3741  check_read_only("import_table");
3742  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
3743  auto& cat = session_ptr->getCatalog();
3744  const auto td_with_lock =
3746  cat, table_name);
3747  const auto td = td_with_lock();
3748  CHECK(td);
3749  check_table_load_privileges(*session_ptr, table_name);
3750 
// Only the basename of a relative path is kept; it is looked up in this
// session's upload directory.
3751  std::string file_name{file_name_in};
3752  auto file_path = boost::filesystem::path(file_name);
3754  if (!boost::istarts_with(file_name, "s3://")) {
3755  if (!boost::filesystem::path(file_name).is_absolute()) {
3756  file_path = import_path_ / picosha2::hash256_hex_string(session) /
3757  boost::filesystem::path(file_name).filename();
3758  file_name = file_path.string();
3759  }
// NOTE(review): this THROW is inside the surrounding try. If TMapDException
// derives from std::exception, it is caught below and wrapped again with an
// "Exception: " prefix.
3760  if (!boost::filesystem::exists(file_path)) {
3761  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
3762  }
3763  }
3764 
3765  // TODO(andrew): add delimiter detection to Importer
3766  if (copy_params.delimiter == '\0') {
3767  copy_params.delimiter = ',';
3768  if (boost::filesystem::extension(file_path) == ".tsv") {
3769  copy_params.delimiter = '\t';
3770  }
3771  }
3772 
// Hold the table's insert-data write lock for the duration of the import.
3773  const auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(
3774  session_ptr->getCatalog(), table_name);
3775  std::unique_ptr<Importer_NS::Importer> importer;
// With leaves attached, rows are routed through a DistributedLoader. The raw
// `new` presumably hands ownership to the Importer — confirm.
3776  if (leaf_aggregator_.leafCount() > 0) {
3777  importer.reset(new Importer_NS::Importer(
3778  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
3779  file_path.string(),
3780  copy_params));
3781  } else {
3782  importer.reset(new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
3783  }
// NOTE(review): the timing is written to std::cout, not LOG(INFO) as
// import_geo_table does.
3784  auto ms = measure<>::execution([&]() { importer->import(); });
3785  std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
3786  } catch (const std::exception& e) {
3787  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
3788  }
3789 }
3790 
3791 namespace {
3792 
3793 // helper functions for error checking below
3794 // these would usefully be added as methods of TDatumType
3795 // but that's not possible as it's auto-generated by Thrift
3796 
// The predicate below is true exactly for the four geo datum types:
// POLYGON, MULTIPOLYGON, LINESTRING and POINT.
3798  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
3799  t == TDatumType::LINESTRING || t == TDatumType::POINT);
3800 }
3801 
// The string helpers below are compiled only when ENABLE_GEO_IMPORT_COLUMN_MATCHING
// is enabled. They format values for the column-mismatch messages in
// import_geo_table.
3802 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3803 
// Formats the datum type via operator<<. Whether this prints the symbolic name
// or the integer value depends on the Thrift-generated operator (TODO confirm).
3804 std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
3805  std::stringstream ss;
3806  ss << t;
3807  return ss.str();
3808 }
3809 
// Maps a geo sub-type code to "GEOGRAPHY"/"GEOMETRY", or "INVALID" for any other
// value. Callers pass TTypeInfo::precision, which carries the sub-type for
// geo columns.
3810 std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
3811  std::string result;
3812  switch (p) {
3813  case SQLTypes::kGEOGRAPHY:
3814  result = "GEOGRAPHY";
3815  break;
3816  case SQLTypes::kGEOMETRY:
3817  result = "GEOMETRY";
3818  break;
3819  default:
3820  result = "INVALID";
3821  break;
3822  }
3823  return result;
3824 }
3825 
// Formats the encoding enum via operator<< (same caveat as TTypeInfo_TypeToString).
3826 std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
3827  std::stringstream ss;
3828  ss << t;
3829  return ss.str();
3830 }
3831 
3832 #endif
3833 
3834 } // namespace
3835 
// Raises a THROW_MAPD_EXCEPTION describing a per-column attribute mismatch
// found while appending a geo file to an existing table.
// The expansion site must have these in scope:
//   `file_path`  - a path-like object with filename().string()
//   `table_name` - a std::string
//   `cd`         - a pointer to an object with a `columnName` member
// `attr`, `got` and `expected` are concatenated into the message.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected)                     \
  THROW_MAPD_EXCEPTION("Could not append geo file '" + file_path.filename().string() + \
                       "' to table '" + table_name + "'. Column '" + cd->columnName + \
                       "' " + (attr) + " mismatch (got '" + (got) + "', expected '" + \
                       (expected) + "')");
3841 
// Imports a geo file, or the first geo file found inside a supported archive,
// creating one table per loadable layer.
// - Relative paths are resolved to import_path_/<sha256(session)>/<basename>.
// - With more than one layer, each table is named
//   <table_name>_<sanitized layer name>, and none of those tables may already
//   exist.
// - With an empty `row_desc`, columns are detected per layer and missing tables
//   are created. Otherwise `row_desc` is trusted and the table must already
//   exist.
// - Per-layer failures are collected and reported together in a single
//   THROW_MAPD_EXCEPTION at the end.
3842 void MapDHandler::import_geo_table(const TSessionId& session,
3843  const std::string& table_name,
3844  const std::string& file_name_in,
3845  const TCopyParams& cp,
3846  const TRowDescriptor& row_desc,
3847  const TCreateParams& create_params) {
3848  auto stdlog = STDLOG(get_session_ptr(session), "table_name", table_name);
3849  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
3850  auto session_ptr = stdlog.getConstSessionInfo();
// NOTE: the read-only check reuses the "import_table" label.
3851  check_read_only("import_table");
3852  auto& cat = session_ptr->getCatalog();
3853 
3855 
3856  std::string file_name{file_name_in};
3857 
3858  if (path_is_relative(file_name)) {
3859  // assume relative paths are relative to data_path / mapd_import / <session>
3860  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3861  boost::filesystem::path(file_name).filename();
3862  file_name = file_path.string();
3863  }
3864 
3865  if (is_a_supported_geo_file(file_name, true)) {
3866  // prepare to load geo file directly
3867  add_vsi_network_prefix(file_name);
3868  add_vsi_geo_prefix(file_name);
3869  } else if (is_a_supported_archive_file(file_name)) {
3870  // find the archive file
3871  add_vsi_network_prefix(file_name);
3872  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
3873  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3874  }
3875  // find geo file in archive
3876  add_vsi_archive_prefix(file_name);
3877  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3878  // prepare to load that geo file
3879  if (geo_file.size()) {
3880  file_name = file_name + std::string("/") + geo_file;
3881  }
3882  } else {
3883  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
3884  file_name_in);
3885  }
3886 
3887  // log what we're about to try to do
3888  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
3889  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
3890 
3891  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
3892  auto file_path = boost::filesystem::path(file_name);
3893  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3894  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
3895  }
3896 
3897  // use GDAL to check any dependent files exist (ditto)
3898  try {
3899  check_geospatial_files(file_path, copy_params);
3900  } catch (const std::exception& e) {
3901  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3902  }
3903 
3904  // get layer info and deconstruct
3905  // in general, we will get a combination of layers of these four types:
3906  // EMPTY: no rows, report and skip
3907  // GEO: create a geo table from this
3908  // NON_GEO: create a regular table from this
3909  // UNSUPPORTED_GEO: report and skip
3910  std::vector<Importer_NS::Importer::GeoFileLayerInfo> layer_info;
3911  try {
3912  layer_info = Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
3913  } catch (const std::exception& e) {
3914  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3915  }
3916 
3917  // categorize the results
3918  using LayerNameToContentsMap =
3919  std::map<std::string, Importer_NS::Importer::GeoFileLayerContents>;
3920  LayerNameToContentsMap load_layers;
3921  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
3922  for (const auto& layer : layer_info) {
3923  switch (layer.contents) {
3925  LOG(INFO) << "import_geo_table: '" << layer.name
3926  << "' (will import as geo table)";
3927  load_layers[layer.name] = layer.contents;
3928  break;
3930  LOG(INFO) << "import_geo_table: '" << layer.name
3931  << "' (will import as regular table)";
3932  load_layers[layer.name] = layer.contents;
3933  break;
3935  LOG(WARNING) << "import_geo_table: '" << layer.name
3936  << "' (will not import, unsupported geo type)";
3937  break;
3939  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
3940  break;
3941  default:
3942  break;
3943  }
3944  }
3945 
3946  // if nothing is loadable, stop now
3947  if (load_layers.size() == 0) {
3948  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
3949  }
3950 
3951  // if we've been given an explicit layer name, check that it exists and is loadable
3952  // scan the original list, as it may exist but not have been gathered as loadable
3953  if (copy_params.geo_layer_name.size()) {
3954  bool found = false;
3955  for (const auto& layer : layer_info) {
3956  if (copy_params.geo_layer_name == layer.name) {
3957  if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::GEO ||
3959  // forget all the other layers and just load this one
3960  load_layers.clear();
3961  load_layers[layer.name] = layer.contents;
3962  found = true;
3963  break;
3964  } else if (layer.contents ==
3966  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3967  copy_params.geo_layer_name +
3968  "' has unsupported geo type!");
3969  } else if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::EMPTY) {
3970  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3971  copy_params.geo_layer_name + "' is empty!");
3972  }
3973  }
3974  }
3975  if (!found) {
3976  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3977  copy_params.geo_layer_name + "' not found!");
3978  }
3979  }
3980 
3981  // Immerse import of multiple layers is not yet supported
3982  // @TODO fix this!
3983  if (row_desc.size() > 0 && load_layers.size() > 1) {
3985  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
3986  }
3987 
3988  // one definition of layer table name construction
3989  // we append the layer name if we're loading more than one table
3990  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
3991  const std::string& layer_name) {
3992  if (load_layers.size() > 1) {
3993  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
3994  if (sanitized_layer_name != layer_name) {
3995  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
3996  << sanitized_layer_name << "' for table name";
3997  }
3998  return table_name + "_" + sanitized_layer_name;
3999  }
4000  return table_name;
4001  };
4002 
4003  // if we're importing multiple tables, then NONE of them must exist already
4004  if (load_layers.size() > 1) {
4005  for (const auto& layer : load_layers) {
4006  // construct table name
4007  auto this_table_name = construct_layer_table_name(table_name, layer.first);
4008 
4009  // table must not exist
4010  if (cat.getMetadataForTable(this_table_name)) {
4011  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
4012  "' already exists, aborting!");
4013  }
4014  }
4015  }
4016 
4017  // prepare to gather errors that would otherwise be exceptions, as we can only throw one
4018  std::vector<std::string> caught_exception_messages;
4019 
4020  // prepare to time multi-layer import
4021  double total_import_ms = 0.0;
4022 
4023  // now we're safe to start importing
4024  // we loop over the layers we're going to attempt to load
4025  for (const auto& layer : load_layers) {
4026  // unpack
4027  const auto& layer_name = layer.first;
4028  const auto& layer_contents = layer.second;
4029  bool is_geo_layer =
4031 
4032  // construct table name again
4033  auto this_table_name = construct_layer_table_name(table_name, layer_name);
4034 
4035  // report
4036  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
4037 
4038  // we need a row descriptor
4039  TRowDescriptor rd;
4040  if (row_desc.size() > 0) {
4041  // we have a valid RowDescriptor
4042  // this is the case where Immerse has already detected and created
4043  // all we need to do is import and trust that the data will match
4044  // use the provided row descriptor
4045  // table must already exist (we check this below)
4046  rd = row_desc;
4047  } else {
4048  // we don't have a RowDescriptor
4049  // we have to detect the file ourselves
4050  TDetectResult cds;
4051  TCopyParams cp_copy = cp; // retain S3 auth tokens
4052  cp_copy.geo_layer_name = layer_name;
4053  cp_copy.file_type = TFileType::POLYGON;
// NOTE: detection runs on the caller's original `file_name_in`, not on the
// resolved/prefixed `file_name`.
4054  try {
4055  detect_column_types(cds, session, file_name_in, cp_copy);
4056  } catch (const std::exception& e) {
4057  // capture the error and abort this layer
4058  caught_exception_messages.emplace_back(
4059  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
4060  continue;
4061  }
4062  rd = cds.row_set.row_desc;
4063 
4064  // then, if the table does NOT already exist, create it
4065  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4066  if (!td) {
4067  try {
4068  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
4069  } catch (const std::exception& e) {
4070  // capture the error and abort this layer
4071  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
4072  layer_name + "':" + e.what());
4073  continue;
4074  }
4075  }
4076  }
4077 
4078  // by this point, the table should exist, one way or another
4079  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
4080  if (!td) {
4081  // capture the error and abort this layer
4082  std::string exception_message =
4083  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
4084  this_table_name + "'; table does not exist or failed to create.";
4085  caught_exception_messages.emplace_back(exception_message);
4086  continue;
4087  }
4088 
4089  // then, we have to verify that the structure matches
4090  // get column descriptors (non-system, non-deleted, logical columns only)
4091  const auto col_descriptors =
4092  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
4093 
4094  // first, compare the column count
4095  if (col_descriptors.size() != rd.size()) {
4096  // capture the error and abort this layer
4097  std::string exception_message =
4098  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
4099  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
4100  ", expecting " + std::to_string(col_descriptors.size()) + ")";
4101  caught_exception_messages.emplace_back(exception_message);
4102  continue;
4103  }
4104 
4105  try {
4106  // then the names and types
4107  int rd_index = 0;
4108  for (auto cd : col_descriptors) {
4109  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
4110  std::string gname = rd[rd_index].col_name; // got
4111  std::string ename = cd->columnName; // expecting
4112 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4113  TTypeInfo gti = rd[rd_index].col_type; // got
4114 #endif
4115  TTypeInfo eti = cd_col_type.col_type; // expecting
4116  // check for name match
4117  if (gname != ename) {
4118  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
4119  gname == OMNISCI_GEO_PREFIX) {
4120  // rename incoming geo column to match existing legacy default geo column
// NOTE(review): `gname` was read from rd[rd_index].col_name above, so this
// assignment is a no-op. The comment suggests `ename` (the legacy name) was
// intended — confirm before relying on the legacy-rename path.
4121  rd[rd_index].col_name = gname;
4122  LOG(INFO)
4123  << "import_geo_table: Renaming incoming geo column to match existing "
4124  "legacy default geo column";
4125  } else {
4126  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
4127  }
4128  }
4129 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
4130  // check for type attributes match
4131  // these attrs must always match regardless of type
4132  if (gti.type != eti.type) {
4134  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
4135  }
4136  if (gti.is_array != eti.is_array) {
4138  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
4139  }
4140  if (gti.nullable != eti.nullable) {
4142  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
4143  }
4144  if (TTypeInfo_IsGeo(eti.type)) {
4145  // for geo, only these other attrs must also match
4146  // encoding and comp_param are allowed to differ
4147  // this allows appending to existing geo table
4148  // without needing to know the existing encoding
4149  if (gti.precision != eti.precision) {
4151  "geo sub-type",
4152  TTypeInfo_GeoSubTypeToString(gti.precision),
4153  TTypeInfo_GeoSubTypeToString(eti.precision));
4154  }
4155  if (gti.scale != eti.scale) {
4157  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
4158  }
4159  if (gti.encoding != eti.encoding) {
4160  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
4161  }
4162  if (gti.comp_param != eti.comp_param) {
4163  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
4164  }
4165  } else {
4166  // non-geo, all other attrs must also match
4167  // @TODO consider relaxing some of these dependent on type
4168  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
4169  if (gti.precision != eti.precision) {
4171  std::to_string(gti.precision),
4172  std::to_string(eti.precision));
4173  }
4174  if (gti.scale != eti.scale) {
4176  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
4177  }
4178  if (gti.encoding != eti.encoding) {
4180  "encoding",
4181  TTypeInfo_EncodingToString(gti.encoding),
4182  TTypeInfo_EncodingToString(eti.encoding));
4183  }
4184  if (gti.comp_param != eti.comp_param) {
4186  std::to_string(gti.comp_param),
4187  std::to_string(eti.comp_param));
4188  }
4189  }
4190 #endif
4191  rd_index++;
4192  }
4193  } catch (const std::exception& e) {
4194  // capture the error and abort this layer
4195  caught_exception_messages.emplace_back(e.what());
4196  continue;
4197  }
4198 
// Map each target column name to its source field name in the geo file.
// NOTE(review): the fallback branch runs only when src_name is empty, so it
// sanitizes an empty string. r.col_name may have been intended — confirm.
4199  std::map<std::string, std::string> colname_to_src;
4200  for (auto r : rd) {
4201  colname_to_src[r.col_name] =
4202  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
4203  }
4204 
4205  try {
4206  check_table_load_privileges(*session_ptr, this_table_name);
4207  } catch (const std::exception& e) {
4208  // capture the error and abort this layer
4209  caught_exception_messages.emplace_back(e.what());
4210  continue;
4211  }
4212 
4213  if (is_geo_layer) {
4214  // Final check to ensure that we actually have a geo column
4215  // of the expected name and type before doing the actual import,
4216  // in case the user naively overrode the name or type in Immerse
4217  // Preview (which as of 6/8/18 it still allows you to do).
4218  // This avoids a fatal assert later when it fails to find the
4219  // column. We should make Immerse more robust and disallow this.
4220  bool have_geo_column_with_correct_name = false;
4221  for (const auto& r : rd) {
4222  if (TTypeInfo_IsGeo(r.col_type.type)) {
4223  // TODO(team): allow user to override the geo column name
4224  if (r.col_name == OMNISCI_GEO_PREFIX) {
4225  have_geo_column_with_correct_name = true;
4226  } else if (r.col_name == LEGACY_GEO_PREFIX) {
4227  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
4228  // Normalize column names for geo append with legacy column naming scheme
4229  colname_to_src[r.col_name] = r.col_name;
4230  have_geo_column_with_correct_name = true;
4231  }
4232  }
4233  }
4234  if (!have_geo_column_with_correct_name) {
4235  std::string exception_message = "Table " + this_table_name +
4236  " does not have a geo column with name '" +
4237  OMNISCI_GEO_PREFIX + "'. Import aborted!";
4238  caught_exception_messages.emplace_back(exception_message);
4239  continue;
4240  }
4241  }
4242 
4243  try {
4244  // import this layer only?
// copy_params is shared across loop iterations; only geo_layer_name is
// overwritten for each layer.
4245  copy_params.geo_layer_name = layer_name;
4246 
4247  // create an importer
4248  std::unique_ptr<Importer_NS::Importer> importer;
4249  if (leaf_aggregator_.leafCount() > 0) {
4250  importer.reset(new Importer_NS::Importer(
4251  new DistributedLoader(*session_ptr, td, &leaf_aggregator_),
4252  file_path.string(),
4253  copy_params));
4254  } else {
4255  importer.reset(
4256  new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
4257  }
4258 
4259  // import
4260  auto ms = measure<>::execution([&]() { importer->importGDAL(colname_to_src); });
4261  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
4262  << "s";
4263  total_import_ms += ms;
4264  } catch (const std::exception& e) {
// NOTE(review): the message says "Layer" but reports the table name.
4265  std::string exception_message =
4266  "Import of Layer '" + this_table_name + "' failed: " + e.what();
4267  caught_exception_messages.emplace_back(exception_message);
4268  continue;
4269  }
4270  }
4271 
4272  // did we catch any exceptions?
4273  if (caught_exception_messages.size()) {
4274  // combine all the strings into one and throw a single Thrift exception
4275  std::string combined_exception_message = "Failed to import geo file:\n";
4276  for (const auto& message : caught_exception_messages) {
4277  combined_exception_message += message + "\n";
4278  }
4279  THROW_MAPD_EXCEPTION(combined_exception_message);
4280  } else {
4281  // report success and total time
4282  LOG(INFO) << "Import Successful!";
4283  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
4284  }
4285 }
4286 
4287 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
4288 
4289 void MapDHandler::import_table_status(TImportStatus& _return,
4290  const TSessionId& session,
4291  const std::string& import_id) {
4292  auto stdlog = STDLOG(get_session_ptr(session), "import_table_status", import_id);
4293  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4294  auto is = Importer_NS::Importer::get_import_status(import_id);
4295  _return.elapsed = is.elapsed.count();
4296  _return.rows_completed = is.rows_completed;
4297  _return.rows_estimated = is.rows_estimated;
4298  _return.rows_rejected = is.rows_rejected;
4299 }
4300 
4302  const TSessionId& session,
4303  const std::string& archive_path_in,
4304  const TCopyParams& copy_params) {
// Returns in `_return` the caller's original `archive_path_in` with
// "/<first geo file>" appended, when the path is a supported archive that
// contains a geo file. Otherwise the original path is returned unchanged.
// Relative paths are resolved under import_path_/<sha256(session)>/ only for the
// lookup; the returned value is always built from `archive_path_in`.
// Throws via THROW_MAPD_EXCEPTION if the archive does not exist.
4305  auto stdlog =
4306  STDLOG(get_session_ptr(session), "get_first_geo_file_in_archive", archive_path_in);
4307  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4308  std::string archive_path(archive_path_in);
4309 
4310  if (path_is_relative(archive_path)) {
4311  // assume relative paths are relative to data_path / mapd_import / <session>
4312  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4313  boost::filesystem::path(archive_path).filename();
4314  archive_path = file_path.string();
4315  }
4316 
4317  if (is_a_supported_archive_file(archive_path)) {
4318  // find the archive file
4319  add_vsi_network_prefix(archive_path);
4320  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4321  thrift_to_copyparams(copy_params))) {
4322  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4323  }
4324  // find geo file in archive
4325  add_vsi_archive_prefix(archive_path);
4326  std::string geo_file =
4327  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
4328  // what did we get?
4329  if (geo_file.size()) {
4330  // prepend it with the original path
4331  _return = archive_path_in + std::string("/") + geo_file;
4332  } else {
4333  // just return the original path
4334  _return = archive_path_in;
4335  }
4336  } else {
4337  // just return the original path
4338  _return = archive_path_in;
4339  }
4340 }
4341 
// Fills `_return` with the files found inside a supported archive, each prefixed
// with the caller's original `archive_path_in` and '/'.
// Relative paths are resolved under import_path_/<sha256(session)>/ only for the
// lookup.
// If the path is not a supported archive, `_return` is left untouched (it is
// not cleared).
// Throws via THROW_MAPD_EXCEPTION if the archive does not exist.
4342 void MapDHandler::get_all_files_in_archive(std::vector<std::string>& _return,
4343  const TSessionId& session,
4344  const std::string& archive_path_in,
4345  const TCopyParams& copy_params) {
4346  auto stdlog =
4347  STDLOG(get_session_ptr(session), "get_all_files_in_archive", archive_path_in);
4348  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4349 
4350  std::string archive_path(archive_path_in);
4351  if (path_is_relative(archive_path)) {
4352  // assume relative paths are relative to data_path / mapd_import / <session>
4353  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4354  boost::filesystem::path(archive_path).filename();
4355  archive_path = file_path.string();
4356  }
4357 
4358  if (is_a_supported_archive_file(archive_path)) {
4359  // find the archive file
4360  add_vsi_network_prefix(archive_path);
4361  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4362  thrift_to_copyparams(copy_params))) {
4363  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4364  }
4365  // find all files in archive
4366  add_vsi_archive_prefix(archive_path);
4368  archive_path, thrift_to_copyparams(copy_params));
4369  // prepend them all with original path
4370  for (auto& s : _return) {
4371  s = archive_path_in + '/' + s;
4372  }
4373  }
4374 }
4375 
// Describes the layers of a geo file (or of the first geo file found inside a supported
// archive) as TGeoFileLayerInfo entries appended to _return.
// Throws when the file is neither a supported geo file nor a supported archive, when an
// archive cannot be found, or when the resolved file/archive does not exist.
4376 void MapDHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
4377  const TSessionId& session,
4378  const std::string& file_name_in,
4379  const TCopyParams& cp) {
4380  auto stdlog = STDLOG(get_session_ptr(session), "get_layers_in_geo_file", file_name_in);
4381  stdlog.appendNameValuePairs("client", getConnectionInfo().toString());
4382  std::string file_name(file_name_in);
4383 
4385 
4386  // handle relative paths
  // (only the filename component is kept; it is placed under import_path_/<sha256(session)>)
4387  if (path_is_relative(file_name)) {
4388  // assume relative paths are relative to data_path / mapd_import / <session>
4389  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4390  boost::filesystem::path(file_name).filename();
4391  file_name = file_path.string();
4392  }
4393 
4394  // validate file_name
4395  if (is_a_supported_geo_file(file_name, true)) {
4396  // prepare to load geo file directly
4397  add_vsi_network_prefix(file_name);
4398  add_vsi_geo_prefix(file_name);
4399  } else if (is_a_supported_archive_file(file_name)) {
4400  // find the archive file
4401  add_vsi_network_prefix(file_name);
4402  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
4403  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4404  }
4405  // find geo file in archive
4406  add_vsi_archive_prefix(file_name);
4407  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4408  // prepare to load that geo file
  // (if no geo file was found, file_name stays the archive path itself)
4409  if (geo_file.size()) {
4410  file_name = file_name + std::string("/") + geo_file;
4411  }
4412  } else {
4413  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4414  file_name_in);
4415  }
4416 
4417  // check the file actually exists
4418  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4419  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
4420  }
4421 
4422  // find all layers
4423  auto internal_layer_info =
4424  Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4425 
4426  // convert to Thrift type
  // (an unrecognized layer-contents value trips CHECK(false) in the default case)
4427  for (const auto& internal_layer : internal_layer_info) {
4428  TGeoFileLayerInfo layer;
4429  layer.name = internal_layer.name;
4430  switch (internal_layer.contents) {
4432  layer.contents = TGeoFileLayerContents::EMPTY;
4433  break;
4435  layer.contents = TGeoFileLayerContents::GEO;
4436  break;
4438  layer.contents = TGeoFileLayerContents::NON_GEO;
4439  break;
4441  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
4442  break;
4443  default:
4444  CHECK(false);
4445  }
4446  _return.emplace_back(layer); // no suitable constructor to just pass parameters
4447  }
4448 }
4449 
4450 void MapDHandler::start_heap_profile(const TSessionId& session) {
4451  auto stdlog = STDLOG(get_session_ptr(session));
4452 #ifdef HAVE_PROFILER
4453  if (IsHeapProfilerRunning()) {
4454  THROW_MAPD_EXCEPTION("Profiler already started");
4455  }
4456  HeapProfilerStart("omnisci");
4457 #else
4458  THROW_MAPD_EXCEPTION("Profiler not enabled");
4459 #endif // HAVE_PROFILER
4460 }
4461 
4462 void MapDHandler::stop_heap_profile(const TSessionId& session) {
4463  auto stdlog = STDLOG(get_session_ptr(session));
4464 #ifdef HAVE_PROFILER
4465  if (!IsHeapProfilerRunning()) {
4466  THROW_MAPD_EXCEPTION("Profiler not running");
4467  }
4468  HeapProfilerStop();
4469 #else
4470  THROW_MAPD_EXCEPTION("Profiler not enabled");
4471 #endif // HAVE_PROFILER
4472 }
4473 
4474 void MapDHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
4475  auto stdlog = STDLOG(get_session_ptr(session));
4476 #ifdef HAVE_PROFILER
4477  if (!IsHeapProfilerRunning()) {
4478  THROW_MAPD_EXCEPTION("Profiler not running");
4479  }
4480  auto profile_buff = GetHeapProfile();
4481  profile = profile_buff;
4482  free(profile_buff);
4483 #else
4484  THROW_MAPD_EXCEPTION("Profiler not enabled");
4485 #endif // HAVE_PROFILER
4486 }
4487 
4488 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
4489 void MapDHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
4490  if (session_it->second.use_count() > 2 ||
4491  isInMemoryCalciteSession(session_it->second->get_currentUser())) {
4492  // SessionInfo is being used in more than one active operation. Original copy + one
4493  // stored in StdLog. Skip the checks.
4494  return;
4495  }
4496  time_t last_used_time = session_it->second->get_last_used_time();
4497  time_t start_time = session_it->second->get_start_time();
4498  if ((time(0) - last_used_time) > idle_session_duration_) {
4499  throw ForceDisconnect("Idle Session Timeout. User should re-authenticate.");
4500  } else if ((time(0) - start_time) > max_session_duration_) {
4501  throw ForceDisconnect("Maximum active Session Timeout. User should re-authenticate.");
4502  }
4503 }
4504 
4505 std::shared_ptr<const Catalog_Namespace::SessionInfo> MapDHandler::get_const_session_ptr(
4506  const TSessionId& session) {
4507  return get_session_ptr(session);
4508 }
4509 
  // Returns a by-value copy of the SessionInfo stored for `session`. The copy is made
  // while a shared (read) lock on sessions_mutex_ is held.
4511  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4512  return *get_session_it_unsafe(session, read_lock)->second;
4513 }
4514 
4515 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_copy_ptr(
4516  const TSessionId& session) {
4517  // Note(Wamsi): We have `get_const_session_ptr` which would return as const SessionInfo
4518  // stored in the map. You can use `get_const_session_ptr` instead of the copy of
4519  // SessionInfo but beware that it can be changed in teh map. So if you do not care about
4520  // the changes then use `get_const_session_ptr` if you do then use this function to get
4521  // a copy. We should eventually aim to merge both `get_const_session_ptr` and
4522  // `get_session_copy_ptr`.
4523  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4524  auto& session_info_ref = *get_session_it_unsafe(session, read_lock)->second;
4525  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
4526 }
4527 
4528 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_ptr(
4529  const TSessionId& session_id) {
4530  // Note(Wamsi): This method will give you a shared_ptr to master SessionInfo itself.
4531  // Should be used only when you need to make updates to original SessionInfo object.
4532  // Currently used by `update_session_last_used_duration`
4533 
4534  // 1) `session_id` will be empty during intial connect. 2)`sessionmapd iterator` will be
4535  // invalid during disconnect. SessionInfo will be erased from map by the time it reaches
4536  // here. In both the above cases, we would return `nullptr` and can skip SessionInfo
4537  // updates.
4538  if (session_id.empty()) {
4539  return {};
4540  }
4541  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4542  return get_session_it_unsafe(session_id, read_lock)->second;
4543 }
4544 
4546  const Catalog_Namespace::SessionInfo& session_info,
4547  const std::string& table_name) {
  // Verifies that the session's current user holds the privileges set on a table-level
  // DBObject for `table_name` (resolved in the session's catalog). On failure, throws a
  // TMapDException whose message reports missing insert privileges.
4548  auto user_metadata = session_info.get_currentUser();
4549  auto& cat = session_info.getCatalog();
4550  DBObject dbObject(table_name, TableDBObjectType);
4551  dbObject.loadKey(cat);
4553  std::vector<DBObject> privObjects;
4554  privObjects.push_back(dbObject);
4555  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
4556  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
4557  user_metadata.userName + " has no insert privileges for table " +
4558  table_name + ".");
4559  }
4560 }
4561 
4562 void MapDHandler::check_table_load_privileges(const TSessionId& session,
4563  const std::string& table_name) {
4564  const auto session_info = get_session_copy(session);
4565  check_table_load_privileges(session_info, table_name);
4566 }
4567 
4569  const TExecuteMode::type mode) {
  // Handles a request to switch the session's execution mode. GPU mode is rejected with
  // a TMapDException when the server runs CPU-only (cpu_mode_only_). Each accepted
  // switch is logged at INFO level with the requesting user's name.
4570  const std::string& user_name = session_ptr->get_currentUser().userName;
4571  switch (mode) {
4572  case TExecuteMode::GPU:
4573  if (cpu_mode_only_) {
4574  TMapDException e;
4575  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
4576  throw e;
4577  }
4579  LOG(INFO) << "User " << user_name << " sets GPU mode.";
4580  break;
4581  case TExecuteMode::CPU:
4583  LOG(INFO) << "User " << user_name << " sets CPU mode.";
4584  break;
4585  }
4586 }
4587 
// Executes the relational-algebra plan `query_ra` (as produced by Calcite) for the
// session in query_state_proxy and fills `_return`.
// Returns the executor's pushed-down filter candidates when there are any; in that case
// no rows are written to `_return`. Otherwise returns an empty vector and either writes
// an explanation (justExplain), the converted result rows, or nothing (justCalciteExplain).
// execution_time_ms is increased by the run time minus the time the query spent queued.
4588 std::vector<PushedDownFilterInfo> MapDHandler::execute_rel_alg(
4589  TQueryResult& _return,
4590  QueryStateProxy query_state_proxy,
4591  const std::string& query_ra,
4592  const bool column_format,
4593  const ExecutorDeviceType executor_device_type,
4594  const int32_t first_n,
4595  const int32_t at_most_n,
4596  const bool just_validate,
4597  const bool find_push_down_candidates,
4598  const ExplainInfo& explain_info) const {
4599  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4600  const auto& cat = query_state_proxy.getQueryState().getConstSessionInfo()->getCatalog();
4601  CompilationOptions co = {executor_device_type,
4602  /*hoist_literals=*/true,
4605  /*allow_lazy_fetch=*/true,
4606  /*add_delete_column=*/true,
4612  explain_info.justExplain(),
4613  allow_loop_joins_ || just_validate,
4615  jit_debug_,
4616  just_validate,
4619  find_push_down_candidates,
4620  explain_info.justCalciteExplain(),
4624  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4625  jit_debug_ ? "/tmp" : "",
4626  jit_debug_ ? "mapdquery" : "",
4628  RelAlgExecutor ra_executor(executor.get(),
4629  cat,
4630  query_ra,
4631  query_state_proxy.getQueryState().shared_from_this());
4632  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4635  nullptr,
4636  nullptr),
4637  {}};
4638  _return.execution_time_ms += measure<>::execution([&]() {
4639  result = ra_executor.executeRelAlgQuery(co, eo, explain_info.explain_plan, nullptr);
4640  });
4641  // reduce execution time by the time spent during queue waiting
4642  _return.execution_time_ms -= result.getRows()->getQueueTime();
4643  const auto& filter_push_down_info = result.getPushedDownFilterInfo();
  // Push-down candidates short-circuit: the caller re-plans and re-executes the query.
4644  if (!filter_push_down_info.empty()) {
4645  return filter_push_down_info;
4646  }
4647  if (explain_info.justExplain()) {
4648  convert_explain(_return, *result.getRows(), column_format);
4649  } else if (!explain_info.justCalciteExplain()) {
4650  convert_rows(_return,
4651  timer.createQueryStateProxy(),
4652  result.getTargetsMeta(),
4653  *result.getRows(),
4654  column_format,
4655  first_n,
4656  at_most_n);
4657  }
4658  return {};
4659 }
4660 
// Executes the relational-algebra plan `query_ra` and exports the result as an Arrow data
// frame placed on `device_type`/`device_id`, limited to `first_n` rows.
// Fills the shared-memory (sm_*) and data-frame (df_*) handles and sizes of `_return`,
// together with execution and Arrow-conversion timings. For GPU output, the serialized
// CUDA handle is recorded in ipc_handle_to_dev_ptr_, keyed by df_handle, under
// handle_to_dev_ptr_mutex_.
// NOTE(review): compilation uses session_info.get_executor_device_type() rather than
// `device_type`; the latter only selects where the Arrow output is placed — confirm
// this is intended.
4661 void MapDHandler::execute_rel_alg_df(TDataFrame& _return,
4662  const std::string& query_ra,
4663  QueryStateProxy query_state_proxy,
4664  const Catalog_Namespace::SessionInfo& session_info,
4665  const ExecutorDeviceType device_type,
4666  const size_t device_id,
4667  const int32_t first_n) const {
4668  const auto& cat = session_info.getCatalog();
4669  CHECK(device_type == ExecutorDeviceType::CPU ||
4671  CompilationOptions co = {session_info.get_executor_device_type(),
4672  /*hoist_literals=*/true,
4675  /*allow_lazy_fetch=*/true,
4676  /*add_delete_column=*/true,
4681  false,
4684  jit_debug_,
4685  false,
4688  false,
4689  false,
4693  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4694  jit_debug_ ? "/tmp" : "",
4695  jit_debug_ ? "mapdquery" : "",
4697  RelAlgExecutor ra_executor(executor.get(),
4698  cat,
4699  query_ra,
4700  query_state_proxy.getQueryState().shared_from_this());
4701  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4704  nullptr,
4705  nullptr),
4706  {}};
4707  _return.execution_time_ms += measure<>::execution(
4708  [&]() { result = ra_executor.executeRelAlgQuery(co, eo, false, nullptr); });
  // Exclude the time the query spent waiting in the execution queue.
4709  _return.execution_time_ms -= result.getRows()->getQueueTime();
4710  const auto rs = result.getRows();
4711  const auto converter =
4712  std::make_unique<ArrowResultSetConverter>(rs,
4713  data_mgr_,
4714  device_type,
4715  device_id,
4716  getTargetNames(result.getTargetsMeta()),
4717  first_n);
4718  ArrowResult arrow_result;
4719  _return.arrow_conversion_time_ms +=
4720  measure<>::execution([&] { arrow_result = converter->getArrowResult(); });
4721  _return.sm_handle =
4722  std::string(arrow_result.sm_handle.begin(), arrow_result.sm_handle.end());
4723  _return.sm_size = arrow_result.sm_size;
4724  _return.df_handle =
4725  std::string(arrow_result.df_handle.begin(), arrow_result.df_handle.end());
  // Remember the device buffer handle so it can be looked up by df_handle later;
  // a duplicate df_handle trips the CHECK.
4726  if (device_type == ExecutorDeviceType::GPU) {
4727  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
4728  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
4729  ipc_handle_to_dev_ptr_.insert(
4730  std::make_pair(_return.df_handle, arrow_result.serialized_cuda_handle));
4731  }
4732  _return.df_size = arrow_result.df_size;
4733 }
4734 
4735 std::vector<TargetMetaInfo> MapDHandler::getTargetMetaInfo(
4736  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4737  std::vector<TargetMetaInfo> result;
4738  for (const auto target : targets) {
4739  CHECK(target);
4740  CHECK(target->get_expr());
4741  result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
4742  }
4743  return result;
4744 }
4745 
4746 std::vector<std::string> MapDHandler::getTargetNames(
4747  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4748  std::vector<std::string> names;
4749  for (const auto target : targets) {
4750  CHECK(target);
4751  CHECK(target->get_expr());
4752  names.push_back(target->get_resname());
4753  }
4754  return names;
4755 }
4756 
4757 std::vector<std::string> MapDHandler::getTargetNames(
4758  const std::vector<TargetMetaInfo>& targets) const {
4759  std::vector<std::string> names;
4760  for (const auto target : targets) {
4761  names.push_back(target.get_resname());
4762  }
4763  return names;
4764 }
4765 
4767  const size_t idx) const {
  // Builds the Thrift column descriptor for one result target. Unnamed targets are
  // labelled "result_<idx + 1>". Geo types take their column type from the subtype and
  // output SRID; other types record precision and scale. DATE columns also record their
  // size. A date-in-days type with comp_param 0 is reported with comp_param 32.
4768  TColumnType proj_info;
4769  proj_info.col_name = target.get_resname();
4770  if (proj_info.col_name.empty()) {
4771  proj_info.col_name = "result_" + std::to_string(idx + 1);
4772  }
4773  const auto& target_ti = target.get_type_info();
4774  proj_info.col_type.type = type_to_thrift(target_ti);
4775  proj_info.col_type.encoding = encoding_to_thrift(target_ti);
4776  proj_info.col_type.nullable = !target_ti.get_notnull();
4777  proj_info.col_type.is_array = target_ti.get_type() == kARRAY;
4778  if (IS_GEO(target_ti.get_type())) {
4780  proj_info, target_ti.get_subtype(), target_ti.get_output_srid());
4781  } else {
4782  proj_info.col_type.precision = target_ti.get_precision();
4783  proj_info.col_type.scale = target_ti.get_scale();
4784  }
4785  if (target_ti.get_type() == kDATE) {
4786  proj_info.col_type.size = target_ti.get_size();
4787  }
4788  proj_info.col_type.comp_param =
4789  (target_ti.is_date_in_days() && target_ti.get_comp_param() == 0)
4790  ? 32
4791  : target_ti.get_comp_param();
4792  return proj_info;
4793 }
4794 
4796  const std::vector<TargetMetaInfo>& targets) const {
  // Builds a row descriptor with one column descriptor per target. Each target's
  // 0-based position is passed along so that unnamed targets get positional labels.
  // NOTE(review): `const auto target` copies each TargetMetaInfo; `const auto&` would
  // avoid the copy.
4797  TRowDescriptor row_desc;
4798  size_t i = 0;
4799  for (const auto target : targets) {
4800  row_desc.push_back(convert_target_metainfo(target, i));
4801  ++i;
4802  }
4803  return row_desc;
4804 }
4805 
4806 template <class R>
4807 void MapDHandler::convert_rows(TQueryResult& _return,
4808  QueryStateProxy query_state_proxy,
4809  const std::vector<TargetMetaInfo>& targets,
4810  const R& results,
4811  const bool column_format,
4812  const int32_t first_n,
4813  const int32_t at_most_n) const {
4814  query_state::Timer timer = query_state_proxy.createTimer(__func__);
4815  _return.row_set.row_desc = convert_target_metainfo(targets);
4816  int32_t fetched{0};
4817  if (column_format) {
4818  _return.row_set.is_columnar = true;
4819  std::vector<TColumn> tcolumns(results.colCount());
4820  while (first_n == -1 || fetched < first_n) {
4821  const auto crt_row = results.getNextRow(true, true);
4822  if (crt_row.empty()) {
4823  break;
4824  }
4825  ++fetched;
4826  if (at_most_n >= 0 && fetched > at_most_n) {
4827  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4828  std::to_string(at_most_n));
4829  }
4830  for (size_t i = 0; i < results.colCount(); ++i) {
4831  const auto agg_result = crt_row[i];
4832  value_to_thrift_column(agg_result, targets[i].get_type_info(), tcolumns[i]);
4833  }
4834  }
4835  for (size_t i = 0; i < results.colCount(); ++i) {
4836  _return.row_set.columns.push_back(tcolumns[i]);
4837  }
4838  } else {
4839  _return.row_set.is_columnar = false;
4840  while (first_n == -1 || fetched < first_n) {
4841  const auto crt_row = results.getNextRow(true, true);
4842  if (crt_row.empty()) {
4843  break;
4844  }
4845  ++fetched;
4846  if (at_most_n >= 0 && fetched > at_most_n) {
4847  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4848  std::to_string(at_most_n));
4849  }
4850  TRow trow;
4851  trow.cols.reserve(results.colCount());
4852  for (size_t i = 0; i < results.colCount(); ++i) {
4853  const auto agg_result = crt_row[i];
4854  trow.cols.push_back(value_to_thrift(agg_result, targets[i].get_type_info()));
4855  }
4856  _return.row_set.rows.push_back(trow);
4857  }
4858  }
4859 }
4860 
4861 TRowDescriptor MapDHandler::fixup_row_descriptor(const TRowDescriptor& row_desc,
4862  const Catalog& cat) {
4863  TRowDescriptor fixedup_row_desc;
4864  for (const TColumnType& col_desc : row_desc) {
4865  auto fixedup_col_desc = col_desc;
4866  if (col_desc.col_type.encoding == TEncodingType::DICT &&
4867  col_desc.col_type.comp_param > 0) {
4868  const auto dd = cat.getMetadataForDict(col_desc.col_type.comp_param, false);
4869  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
4870  }
4871  fixedup_row_desc.push_back(fixedup_col_desc);
4872  }
4873  return fixedup_row_desc;
4874 }
4875 
4876 // create simple result set to return a single column result
4877 void MapDHandler::create_simple_result(TQueryResult& _return,
4878  const ResultSet& results,
4879  const bool column_format,
4880  const std::string label) const {
4881  CHECK_EQ(size_t(1), results.rowCount());
4882  TColumnType proj_info;
4883  proj_info.col_name = label;
4884  proj_info.col_type.type = TDatumType::STR;
4885  proj_info.col_type.nullable = false;
4886  proj_info.col_type.is_array = false;
4887  _return.row_set.row_desc.push_back(proj_info);
4888  const auto crt_row = results.getNextRow(true, true);
4889  const auto tv = crt_row[0];
4890  CHECK(results.getNextRow(true, true).empty());
4891  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
4892  CHECK(scalar_tv);
4893  const auto s_n = boost::get<NullableString>(scalar_tv);
4894  CHECK(s_n);
4895  const auto s = boost::get<std::string>(s_n);
4896  CHECK(s);
4897  if (column_format) {
4898  TColumn tcol;
4899  tcol.data.str_col.push_back(*s);
4900  tcol.nulls.push_back(false);
4901  _return.row_set.is_columnar = true;
4902  _return.row_set.columns.push_back(tcol);
4903  } else {
4904  TDatum explanation;
4905  explanation.val.str_val = *s;
4906  explanation.is_null = false;
4907  TRow trow;
4908  trow.cols.push_back(explanation);
4909  _return.row_set.is_columnar = false;
4910  _return.row_set.rows.push_back(trow);
4911  }
4912 }
4913 
4914 void MapDHandler::convert_explain(TQueryResult& _return,
4915  const ResultSet& results,
4916  const bool column_format) const {
4917  create_simple_result(_return, results, column_format, "Explanation");
4918 }
4919 
4920 void MapDHandler::convert_result(TQueryResult& _return,
4921  const ResultSet& results,
4922  const bool column_format) const {
4923  create_simple_result(_return, results, column_format, "Result");
4924 }
4925 
4926 // this all should be moved out of here to catalog
4928  const Catalog_Namespace::SessionInfo& session_info,
4929  const TableDescriptor* td,
4930  const AccessPrivileges access_priv) {
  // Returns whether the session's current user holds `access_priv` on table `td`, as
  // decided by SysCatalog::checkPrivileges. `td` must be non-null (CHECK).
4931  CHECK(td);
4932  auto& cat = session_info.getCatalog();
4933  std::vector<DBObject> privObjects;
4934  DBObject dbObject(td->tableName, TableDBObjectType);
4935  dbObject.loadKey(cat);
4936  dbObject.setPrivileges(access_priv);
4937  privObjects.push_back(dbObject);
4938  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
4939  privObjects);
4940 };
4941 
  // Invalidates sessions affected by a DDL statement: DROP DATABASE and RENAME DATABASE
  // (keyed by the original database name) and DROP USER and RENAME USER (keyed by the
  // original user name). Any other statement type is ignored.
4943  const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl);
4944  if (drop_db_stmt) {
4945  invalidate_sessions(*drop_db_stmt->getDatabaseName(), drop_db_stmt);
4946  return;
4947  }
4948  const auto rename_db_stmt = dynamic_cast<Parser::RenameDatabaseStmt*>(ddl);
4949  if (rename_db_stmt) {
4950  invalidate_sessions(*rename_db_stmt->getPreviousDatabaseName(), rename_db_stmt);
4951  return;
4952  }
4953  const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl);
4954  if (drop_user_stmt) {
4955  invalidate_sessions(*drop_user_stmt->getUserName(), drop_user_stmt);
4956  return;
4957  }
4958  const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl);
4959  if (rename_user_stmt) {
4960  invalidate_sessions(*rename_user_stmt->getOldUserName(), rename_user_stmt);
4961  return;
4962  }
4963 }
4964 
// Executes one SQL request for the session held in query_state_proxy and fills
// `_return`: nonce, accumulated execution_time_ms, and the result rows or a message.
// Requests that ParserWrapper::isCalcitePathPermissable() accepts go through Calcite:
// plans are executed via execute_rel_alg(), with EXPLAIN CALCITE and Calcite DDL handled
// specially. OPTIMIZE and VALIDATE are parsed with the legacy parser inside the first try
// block. Everything else (legacy DDL such as COPY, and INSERT ... VALUES) falls through
// to the legacy parser further below. std::exceptions are rethrown to the client through
// THROW_MAPD_EXCEPTION.
4965 void MapDHandler::sql_execute_impl(TQueryResult& _return,
4966  QueryStateProxy query_state_proxy,
4967  const bool column_format,
4968  const std::string& nonce,
4969  const ExecutorDeviceType executor_device_type,
4970  const int32_t first_n,
4971  const int32_t at_most_n) {
4972  if (leaf_handler_) {
4973  leaf_handler_->flush_queue();
4974  }
4975 
4976  _return.nonce = nonce;
4977  _return.execution_time_ms = 0;
4978  auto const& query_str = query_state_proxy.getQueryState().getQueryStr();
4979  auto session_ptr = query_state_proxy.getQueryState().getConstSessionInfo();
4980  // Call to DistributedValidate() below may change cat.
4981  auto& cat = session_ptr->getCatalog();
4982 
4983  SQLParser parser;
4984  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
4985  std::string last_parsed;
4986  int num_parse_errors = 0;
4987 
4988  /*
4989  Use this seq to simplify locking:
4990  INSERT_VALUES: CheckpointLock [ >> TableWriteLock ]
4991  INSERT_SELECT: CheckpointLock >> TableReadLock [ >>
4992  TableWriteLock ] COPY_TO/SELECT: TableReadLock COPY_FROM: CheckpointLock [
4993  >> TableWriteLock ] DROP/TRUNC: CheckpointLock >> TableWriteLock
4994  DELETE/UPDATE: CheckpointLock >> TableWriteLock
4995  */
4996 
4997  mapd_unique_lock<mapd_shared_mutex> checkpoint_lock;
4998  mapd_unique_lock<mapd_shared_mutex> executeWriteLock;
4999  mapd_shared_lock<mapd_shared_mutex> executeReadLock;
5000 
5002  try {
5003  ParserWrapper pw{query_str};
5004  if (pw.isCalcitePathPermissable(read_only_)) {
5005  executeReadLock = mapd_shared_lock<mapd_shared_mutex>(
5008 
5009  std::string query_ra;
5010  _return.execution_time_ms += measure<>::execution([&]() {
5011  TPlanResult result;
5012  std::tie(result, locks) =
5013  parse_to_ra(query_state_proxy, query_str, {}, true, mapd_parameters_);
5014  query_ra = result.plan_result;
5015  });
5016 
5017  std::string query_ra_calcite_explain;
5018  if (pw.isCalciteExplain() && (!g_enable_filter_push_down || g_cluster)) {
5019  // return the ra as the result
5020  convert_explain(_return, ResultSet(query_ra), true);
5021  return;
5022  } else if (pw.isCalciteExplain()) {
5023  // removing the "explain calcite " from the beginning of the "query_str":
5024  std::string temp_query_str =
5025  query_str.substr(std::string("explain calcite ").length());
5026  query_ra_calcite_explain =
5027  parse_to_ra(query_state_proxy, temp_query_str, {}, false, mapd_parameters_)
5028  .first.plan_result;
5029  } else if (pw.isCalciteDdl()) {
5030  executeDdl(_return, query_ra, session_ptr);
5031  return;
5032  }
5033  const auto explain_info = pw.getExplainInfo();
5034  const auto filter_push_down_requests = execute_rel_alg(
5035  _return,
5036  query_state_proxy,
5037  explain_info.justCalciteExplain() ? query_ra_calcite_explain : query_ra,
5038  column_format,
5039  executor_device_type,
5040  first_n,
5041  at_most_n,
5042  /*just_validate=*/false,
5044  explain_info);
      // NOTE(review): because this branch returns, the later `else if` with the same
      // condition and the second `if (explain_info.justCalciteExplain())` block below can
      // never execute; they look like dead code left from a refactor.
5045  if (explain_info.justCalciteExplain() && filter_push_down_requests.empty()) {
5046  // we only reach here if filter push down was enabled, but no filter
5047  // push down candidate was found
5048  convert_explain(_return, ResultSet(query_ra), true);
5049  return;
5050  }
5051  if (!filter_push_down_requests.empty()) {
5053  query_state_proxy,
5054  query_ra,
5055  column_format,
5056  executor_device_type,
5057  first_n,
5058  at_most_n,
5059  explain_info.justExplain(),
5060  explain_info.justCalciteExplain(),
5061  filter_push_down_requests);
5062  } else if (explain_info.justCalciteExplain() && filter_push_down_requests.empty()) {
5063  // return the ra as the result:
5064  // If we reach here, the 'filter_push_down_request' turned out to be empty, i.e.,
5065  // no filter push down so we continue with the initial (unchanged) query's calcite
5066  // explanation.
5067  query_ra = parse_to_ra(query_state_proxy, query_str, {}, false, mapd_parameters_)
5068  .first.plan_result;
5069  convert_explain(_return, ResultSet(query_ra), true);
5070  return;
5071  }
5072  if (explain_info.justCalciteExplain()) {
5073  // If we reach here, the filter push down candidates has been selected and
5074  // proper output result has been already created.
5075  return;
5076  }
      // NOTE(review): unreachable — the identical condition directly above already returns.
5077  if (explain_info.justCalciteExplain()) {
5078  // return the ra as the result:
5079  // If we reach here, the 'filter_push_down_request' turned out to be empty, i.e.,
5080  // no filter push down so we continue with the initial (unchanged) query's calcite
5081  // explanation.
5082  query_ra = parse_to_ra(query_state_proxy, query_str, {}, false, mapd_parameters_)
5083  .first.plan_result;
5084  convert_explain(_return, ResultSet(query_ra), true);
5085  return;
5086  }
5087  return;
5088  } else if (pw.is_optimize || pw.is_validate) {
5089  // Get the Stmt object
5090  try {
5091  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
5092  } catch (std::exception& e) {
5093  throw std::runtime_error(e.what());
5094  }
5095  if (num_parse_errors > 0) {
5096  throw std::runtime_error("Syntax error at: " + last_parsed);
5097  }
5098  CHECK_EQ(parse_trees.size(), 1u);
5099 
5100  if (pw.is_optimize) {
5101  const auto optimize_stmt =
5102  dynamic_cast<Parser::OptimizeTableStmt*>(parse_trees.front().get());
5103  CHECK(optimize_stmt);
5104 
5105  _return.execution_time_ms += measure<>::execution([&]() {
5106  const auto td_with_lock = lockmgr::TableSchemaLockContainer<
5107  lockmgr::WriteLock>::acquireTableDescriptor(cat,
5108  optimize_stmt->getTableName());
5109  const auto td = td_with_lock();
5110 
          // A table the user may not delete from is reported as nonexistent.
5111  if (!td || !user_can_access_table(
5112  *session_ptr, td, AccessPrivileges::DELETE_FROM_TABLE)) {
5113  throw std::runtime_error("Table " + optimize_stmt->getTableName() +
5114  " does not exist.");
5115  }
5116 
5117  // acquire write lock on table data
5118  auto data_lock =
5120 
5121  auto executor =
5122  Executor::getExecutor(cat.getCurrentDB().dbId, "", "", mapd_parameters_);
5123  const TableOptimizer optimizer(td, executor.get(), cat);
5124  if (optimize_stmt->shouldVacuumDeletedRows()) {
5125  optimizer.vacuumDeletedRows();
5126  }
5127  optimizer.recomputeMetadata();
5128  });
5129 
5130  return;
5131  }
5132  if (pw.is_validate) {
5133  // check user is superuser
5134  if (!session_ptr->get_currentUser().isSuper) {
5135  throw std::runtime_error("Superuser is required to run VALIDATE");
5136  }
5137  const auto validate_stmt =
5138  dynamic_cast<Parser::ValidateStmt*>(parse_trees.front().get());
5139  CHECK(validate_stmt);
5140 
5141  // Prevent any other query from running while doing validate
5142  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5145 
5146  std::string output{"Result for validate"};
5148  _return.execution_time_ms += measure<>::execution([&]() {
5149  const DistributedValidate validator(validate_stmt->getType(),
5150  validate_stmt->isRepairTypeRemove(),
5151  cat, // tables may be dropped here
5153  *session_ptr,
5154  *this);
5155  output = validator.validate();
5156  });
5157  } else {
5158  output = "Not running on a cluster nothing to validate";
5159  }
5160  convert_result(_return, ResultSet(output), true);
5161  return;
5162  }
5163  }
5164  LOG(INFO) << "passing query to legacy processor";
5165  } catch (std::exception& e) {
5166  if (strstr(e.what(), "java.lang.NullPointerException")) {
5167  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
5168  "query failed from broken view or other schema related issue");
5169  } else {
5170  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5171  }
5172  }
  // Legacy parser path for everything not handled above.
5173  try {
5174  // check for COPY TO stmt replace as required parser expects #~# markers
5175  const auto result = apply_copy_to_shim(query_str);
5176  num_parse_errors = parser.parse(result, parse_trees, last_parsed);
5177  } catch (std::exception& e) {
5178  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5179  }
5180  if (num_parse_errors > 0) {
5181  THROW_MAPD_EXCEPTION("Syntax error at: " + last_parsed);
5182  }
5183 
  // Executes a legacy DDL statement. Returns false when `ddl` is null (the statement is
  // not DDL), otherwise true once the statement has run. COPY FROM is refused on a leaf,
  // distributed when leaves exist, and records geo COPY FROM state for the session.
  // EXPORT (COPY TO) first plans its SELECT through Calcite to obtain table locks.
5184  auto handle_ddl = [&query_state_proxy, &session_ptr, &_return, &locks, this](
5185  Parser::DDLStmt* ddl) -> bool {
5186  if (!ddl) {
5187  return false;
5188  }
5189  const auto show_create_stmt = dynamic_cast<Parser::ShowCreateTableStmt*>(ddl);
5190  if (show_create_stmt) {
5191  // ParserNode ShowCreateTableStmt is currently unimplemented
5192  throw std::runtime_error(
5193  "SHOW CREATE TABLE is currently unsupported. Use `\\d` from omnisql for table "
5194  "DDL.");
5195  }
5196 
5197  const auto import_stmt = dynamic_cast<Parser::CopyTableStmt*>(ddl);
5198  if (import_stmt) {
5199  if (g_cluster && !leaf_aggregator_.leafCount()) {
5200  // Don't allow copy from imports directly on a leaf node
5201  throw std::runtime_error(
5202  "Cannot import on an individual leaf. Please import from the Aggregator.");
5203  } else if (leaf_aggregator_.leafCount() > 0) {
5204  _return.execution_time_ms += measure<>::execution(
5205  [&]() { execute_distributed_copy_statement(import_stmt, *session_ptr); });
5206  } else {
5207  _return.execution_time_ms +=
5208  measure<>::execution([&]() { ddl->execute(*session_ptr); });
5209  }
5210 
5211  // Read response message
5212  convert_result(_return, ResultSet(*import_stmt->return_message.get()), true);
5213  _return.success = import_stmt->get_success();
5214 
5215  // get geo_copy_from info
5216  if (import_stmt->was_geo_copy_from()) {
5217  GeoCopyFromState geo_copy_from_state;
5218  import_stmt->get_geo_copy_from_payload(
5219  geo_copy_from_state.geo_copy_from_table,
5220  geo_copy_from_state.geo_copy_from_file_name,
5221  geo_copy_from_state.geo_copy_from_copy_params,
5222  geo_copy_from_state.geo_copy_from_partitions);
5223  geo_copy_from_sessions.add(session_ptr->get_session_id(), geo_copy_from_state);
5224  }
5225  return true;
5226  }
5227 
5228  // Check for DDL statements requiring locking and get locks
5229  auto export_stmt = dynamic_cast<Parser::ExportQueryStmt*>(ddl);
5230  if (export_stmt) {
5231  const auto query_string = export_stmt->get_select_stmt();
5232  TPlanResult result;
5233  CHECK(locks.empty());
5234  std::tie(result, locks) =
5235  parse_to_ra(query_state_proxy, query_string, {}, true, mapd_parameters_);
5236  }
5237  _return.execution_time_ms += measure<>::execution([&]() {
5238  ddl->execute(*session_ptr);
5240  });
5241  return true;
5242  };
5243 
  // Run each parsed statement. Anything other than SELECT requires a writable server.
  // Non-DDL statements must be INSERT ... VALUES, and only one may be sent per request.
5244  for (const auto& stmt : parse_trees) {
5245  try {
5246  auto select_stmt = dynamic_cast<Parser::SelectStmt*>(stmt.get());
5247  if (!select_stmt) {
5248  check_read_only("Non-SELECT statements");
5249  }
5250  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt.get());
5251  if (handle_ddl(ddl)) {
5252  if (render_handler_) {
5253  render_handler_->handle_ddl(ddl);
5254  }
5255  } else {
5256  auto stmtp = dynamic_cast<Parser::InsertValuesStmt*>(stmt.get());
5257  CHECK(stmtp); // no other statements supported
5258 
5259  if (parse_trees.size() != 1) {
5260  THROW_MAPD_EXCEPTION("Can only run one INSERT INTO query at a time.");
5261  }
5262 
5263  _return.execution_time_ms +=
5264  measure<>::execution([&]() { stmtp->execute(*session_ptr); });
5265  }
5266  } catch (std::exception& e) {
5267  const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
5268  THROW_MAPD_EXCEPTION(thrift_exception ? std::string(thrift_exception->what())
5269  : std::string("Exception: ") + e.what());
5270  }
5271  }
5272 }
5273 
5275  TQueryResult& _return,
5276  QueryStateProxy query_state_proxy,
5277  std::string& query_ra,
5278  const bool column_format,
5279  const ExecutorDeviceType executor_device_type,
5280  const int32_t first_n,
5281  const int32_t at_most_n,
5282  const bool just_explain,
5283  const bool just_calcite_explain,
5284  const std::vector<PushedDownFilterInfo> filter_push_down_requests) {
// Re-plans the current query (the query string held by query_state_proxy)
// through parse_to_ra, passing the given filter push-down requests to Calcite.
// The resulting relational-algebra plan is written into `query_ra`. Then:
//   - if `just_calcite_explain` is set, that plan is returned as an explain
//     result;
//   - otherwise it is executed via execute_rel_alg, with
//     ExplainInfo::explain = just_explain.
// Time spent re-planning is added to _return.execution_time_ms.
// NOTE(review): `filter_push_down_requests` is taken by value, which copies the
// vector. A const reference would avoid the copy if the declaration is updated
// to match.
5285  // collecting the selected filters' info to be sent to Calcite:
5286  std::vector<TFilterPushDownInfo> filter_push_down_info;
5287  for (const auto& req : filter_push_down_requests) {
5288  TFilterPushDownInfo filter_push_down_info_for_request;
5289  filter_push_down_info_for_request.input_prev = req.input_prev;
5290  filter_push_down_info_for_request.input_start = req.input_start;
5291  filter_push_down_info_for_request.input_next = req.input_next;
5292  filter_push_down_info.push_back(filter_push_down_info_for_request);
5293  }
5294  // deriving the new relational algebra plan with respect to the pushed down filters
// The fourth argument (acquire_locks) is false, so parse_to_ra takes no table
// locks here. Only the plan string (.first.plan_result) is kept.
5295  _return.execution_time_ms += measure<>::execution([&]() {
5296  query_ra = parse_to_ra(query_state_proxy,
5297  query_state_proxy.getQueryState().getQueryStr(),
5298  filter_push_down_info,
5299  false,
5301  .first.plan_result;
5302  });
5303 
5304  if (just_calcite_explain) {
5305  // return the new ra as the result
5306  convert_explain(_return, ResultSet(query_ra), true);
5307  return;
5308  }
5309 
5310  // execute the new relational algebra plan:
5311  auto explain_info = ExplainInfo::defaults();
5312  explain_info.explain = just_explain;
5313  execute_rel_alg(_return,
5314  query_state_proxy,
5315  query_ra,
5316  column_format,
5317  executor_device_type,
5318  first_n,
5319  at_most_n,
5320  /*just_validate=*/false,
5321  /*find_push_down_candidates=*/false,
5322  explain_info);
5323 }
5324 
5326  Parser::CopyTableStmt* copy_stmt,
5327  const Catalog_Namespace::SessionInfo& session_info) {
// Executes a COPY FROM statement when this node is an aggregator with leaves.
// `copy_stmt` runs with an importer factory whose Importer instances load rows
// through a DistributedLoader bound to this handler's leaf_aggregator_.
// NOTE(review): the factory's `catalog` parameter is unused. The
// DistributedLoader is created with a raw `new`; presumably the Importer takes
// ownership of it, so confirm against the Importer constructor.
// The factory captures `session_info` by reference (and `this`), so it is only
// valid while this call is active. Presumably CopyTableStmt::execute invokes it
// synchronously; confirm.
5328  auto importer_factory = [&session_info, this](
5329  const Catalog& catalog,
5330  const TableDescriptor* td,
5331  const std::string& file_path,
5332  const Importer_NS::CopyParams& copy_params) {
5333  return std::make_unique<Importer_NS::Importer>(
5334  new DistributedLoader(session_info, td, &leaf_aggregator_),
5335  file_path,
5336  copy_params);
5337  };
5338  copy_stmt->execute(session_info, importer_factory);
5339 }
5340 
// Parses `query_str` into a Calcite relational-algebra plan (TPlanResult).
// When `acquire_locks` is true, it also locks every table the resolved plan
// touches.
//
// @param query_state_proxy     query state; used for the timer and to reach the
//                              session's catalog.
// @param query_str             SQL text. When pw.isSelectExplain() is true,
//                              pw.actual_query is sent to Calcite instead
//                              (presumably the statement without its EXPLAIN
//                              prefix; confirm in ParserWrapper).
// @param filter_push_down_info forwarded unchanged to Calcite.
// @param acquire_locks         when true, each resolved table gets a schema
//                              lock followed by a data lock: read locks for
//                              tables selected from, write locks for all others.
// @param mapd_parameters       only enable_calcite_view_optimize is read here.
// @param render_info           optional. When non-null, receives the primary
//                              and resolved selected-from table names.
// @param check_privileges      forwarded to Calcite.
// @return the plan together with the acquired locks. If
//         pw.isCalcitePathPermissable() is false, returns a default-constructed
//         TPlanResult and an empty lock set.
// NOTE(review): `mapd_parameters` is taken by value (a copy per call).
5341 std::pair<TPlanResult, lockmgr::LockedTableDescriptors> MapDHandler::parse_to_ra(
5342  QueryStateProxy query_state_proxy,
5343  const std::string& query_str,
5344  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
5345  const bool acquire_locks,
5346  const MapDParameters mapd_parameters,
5347  RenderInfo* render_info,
5348  bool check_privileges) {
5349  query_state::Timer timer = query_state_proxy.createTimer(__func__);
5350  ParserWrapper pw{query_str};
5351  const std::string actual_query{pw.isSelectExplain() ? pw.actual_query : query_str};
5352  TPlanResult result;
5353  if (pw.isCalcitePathPermissable()) {
5354  auto cat = query_state_proxy.getQueryState().getConstSessionInfo()->get_catalog_ptr();
// Removes the temporary in-memory Calcite session. It runs on both the success
// path and the std::exception path of process_calcite_request below.
5355  auto session_cleanup_handler = [&](const auto& session_id) {
5356  removeInMemoryCalciteSession(session_id);
5357  };
5358  auto process_calcite_request = [&] {
5359  const auto& in_memory_session_id = createInMemoryCalciteSession(cat);
5360  try {
5361  result = calcite_->process(timer.createQueryStateProxy(),
5362  legacy_syntax_ ? pg_shim(actual_query) : actual_query,
5363  filter_push_down_info,
5365  pw.isCalciteExplain(),
5366  mapd_parameters.enable_calcite_view_optimize,
5367  check_privileges,
5368  in_memory_session_id);
5369  session_cleanup_handler(in_memory_session_id);
// NOTE(review): only std::exception is caught, so any other exception type
// would skip the session cleanup.
5370  } catch (std::exception&) {
5371  session_cleanup_handler(in_memory_session_id);
5372  throw;
5373  }
5374  };
5375  process_calcite_request();
5377  if (acquire_locks) {
5378  std::set<std::string> read_only_tables;
5379  for (const auto& table : result.resolved_accessed_objects.tables_selected_from) {
5380  read_only_tables.insert(table);
5381  }
5382  std::vector<std::string> tables;
5383  tables.insert(tables.end(),
5384  result.resolved_accessed_objects.tables_selected_from.begin(),
5385  result.resolved_accessed_objects.tables_selected_from.end());
5386  tables.insert(tables.end(),
5387  result.resolved_accessed_objects.tables_inserted_into.begin(),
5388  result.resolved_accessed_objects.tables_inserted_into.end());
5389  tables.insert(tables.end(),
5390  result.resolved_accessed_objects.tables_updated_in.begin(),
5391  result.resolved_accessed_objects.tables_updated_in.end());
5392  tables.insert(tables.end(),
5393  result.resolved_accessed_objects.tables_deleted_from.begin(),
5394  result.resolved_accessed_objects.tables_deleted_from.end());
5395  // avoid deadlocks by enforcing a deterministic locking sequence
5396  std::sort(tables.begin(), tables.end());
// NOTE(review): `read_only_tables` holds every selected-from table, even one
// that is also inserted into, updated, or deleted from. Such a table therefore
// gets only read locks. It also appears more than once in `tables` (nothing
// de-duplicates after the sort), so its locks are requested repeatedly.
// Consider std::unique after the sort, and write-locking any table that is
// also written.
5397  for (const auto& table : tables) {
5398  // first, obtain table schema locks
5399  // then, obtain table data locks
5400  if (read_only_tables.count(table)) {
5401  locks.emplace_back(
5404  lockmgr::ReadLock>::acquireTableDescriptor(*cat.get(), table)));
5405  locks.emplace_back(
5408  cat->getDatabaseId(), (*locks.back())())));
5409  } else {
5410  locks.emplace_back(
5413  lockmgr::WriteLock>::acquireTableDescriptor(*cat.get(), table)));
5414  // TODO(adb): Should we be taking this lock for inserts? Are inserts even going
5415  // down this path?
5416  locks.emplace_back(
5419  cat->getDatabaseId(), (*locks.back())())));
5420  }
5421  }
5422  }
5423  if (render_info) {
5424  // grabs all the selected-from tables, even views. This is used by the renderer to
5425  // resolve view hit-testing.
5426  // NOTE: the same table name could exist in both the primary and resolved tables.
5427  auto selected_tables = &result.primary_accessed_objects.tables_selected_from;
5428  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5429  selected_tables = &result.resolved_accessed_objects.tables_selected_from;
5430  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5431  }
5432  return std::make_pair(result, std::move(locks));
5433  }
// Non-Calcite path: `result` is still default-constructed and no locks are taken.
5434  return std::make_pair(result, lockmgr::LockedTableDescriptors{});
5435 }
5436 
5437 void MapDHandler::check_table_consistency(TTableMeta& _return,
5438  const TSessionId& session,
5439  const int32_t table_id) {
5440  auto stdlog = STDLOG(get_session_ptr(session));
5441  if (!leaf_handler_) {
5442  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5443  }
5444  try {
5445  leaf_handler_->check_table_consistency(_return, session, table_id);
5446  } catch (std::exception& e) {
5447  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5448  }
5449 }
5450 
5451 void MapDHandler::start_query(TPendingQuery& _return,
5452  const TSessionId& leaf_session,
5453  const TSessionId& parent_session,
5454  const std::string& query_ra,
5455  const bool just_explain) {
5456  auto stdlog = STDLOG(get_session_ptr(leaf_session));
5457  auto session_ptr = stdlog.getConstSessionInfo();
5458  if (!leaf_handler_) {
5459  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5460  }
5461  LOG(INFO) << "start_query :" << *session_ptr << " :" << just_explain;
5462  auto time_ms = measure<>::execution([&]() {
5463  try {
5464  leaf_handler_->start_query(
5465  _return, leaf_session, parent_session, query_ra, just_explain);
5466  } catch (std::exception& e) {
5467  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5468  }
5469  });
5470  LOG(INFO) << "start_query-COMPLETED " << time_ms << "ms "
5471  << "id is " << _return.id;
5472 }
5473 
5474 void MapDHandler::execute_query_step(TStepResult& _return,
5475  const TPendingQuery& pending_query) {
5476  if (!leaf_handler_) {
5477  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5478  }
5479  LOG(INFO) << "execute_query_step : id:" << pending_query.id;
5480  auto time_ms = measure<>::execution([&]() {
5481  try {
5482  leaf_handler_->execute_query_step(_return, pending_query);
5483  } catch (std::exception& e) {
5484  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5485  }
5486  });
5487  LOG(INFO) << "execute_query_step-COMPLETED " << time_ms << "ms";
5488 }
5489 
5490 void MapDHandler::broadcast_serialized_rows(const TSerializedRows& serialized_rows,
5491  const TRowDescriptor& row_desc,
5492  const TQueryId query_id) {
5493  if (!leaf_handler_) {
5494  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5495  }
5496  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS id:" << query_id;
5497  auto time_ms = measure<>::execution([&]() {
5498  try {
5499  leaf_handler_->broadcast_serialized_rows(serialized_rows, row_desc, query_id);
5500  } catch (std::exception& e) {
5501  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5502  }
5503  });
5504  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS COMPLETED " << time_ms << "ms";
5505 }
5506 
// Thrift entry point. Appends the rows carried column-by-column in
// `thrift_insert_data` to the table identified by its db_id/table_id.
//
// A DataBlockPtr is built for each column id, based on the column's type in the
// session catalog:
//   - numbers, time, boolean and dictionary-encoded strings: numbersPtr points
//     directly at fixed_len_data (no copy);
//   - none-encoded strings and geometry: payloads are copied into locally owned
//     std::vector<std::string>s;
//   - arrays: ArrayDatum entries alias the thrift payload bytes (no copy).
// The rows are then inserted with insertDataNoCheckpoint while the table's
// insert-data write lock is held.
// Any std::exception is rethrown as a MapD exception prefixed with
// "Exception: ".
// The non-copying paths rely on `thrift_insert_data` outliving the
// insertDataNoCheckpoint call, which holds because it is a parameter of this
// call.
5507 void MapDHandler::insert_data(const TSessionId& session,
5508  const TInsertData& thrift_insert_data) {
5509  try {
5510  auto stdlog = STDLOG(get_session_ptr(session));
5511  auto session_ptr = stdlog.getConstSessionInfo();
5512  CHECK_EQ(thrift_insert_data.column_ids.size(), thrift_insert_data.data.size());
// NOTE(review): the catalog comes from the session, while databaseId below is
// taken from the thrift payload. Nothing visible here checks that they agree.
5513  auto const& cat = session_ptr->getCatalog();
5515  insert_data.databaseId = thrift_insert_data.db_id;
5516  insert_data.tableId = thrift_insert_data.table_id;
5517  insert_data.columnIds = thrift_insert_data.column_ids;
// Owners for copied string payloads and array datums. DataBlockPtr only stores
// raw pointers into these, so they must live until the insert below finishes.
5518  std::vector<std::unique_ptr<std::vector<std::string>>> none_encoded_string_columns;
5519  std::vector<std::unique_ptr<std::vector<ArrayDatum>>> array_columns;
5520  for (size_t col_idx = 0; col_idx < insert_data.columnIds.size(); ++col_idx) {
5521  const int column_id = insert_data.columnIds[col_idx];
5522  DataBlockPtr p;
// NOTE(review): column metadata is read here, before the table descriptor lock
// further down is acquired.
5523  const auto cd = cat.getMetadataForColumn(insert_data.tableId, column_id);
5524  CHECK(cd);
5525  const auto& ti = cd->columnType;
// NOTE(review): the C-style casts below drop const from the thrift buffers.
// Presumably the fragmenter only reads through these pointers; confirm.
5526  if (ti.is_number() || ti.is_time() || ti.is_boolean()) {
5527  p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
5528  } else if (ti.is_string()) {
5529  if (ti.get_compression() == kENCODING_DICT) {
5530  p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
5531  } else {
5532  CHECK_EQ(kENCODING_NONE, ti.get_compression());
5533  none_encoded_string_columns.emplace_back(new std::vector<std::string>());
5534  auto& none_encoded_strings = none_encoded_string_columns.back();
5535  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5536  thrift_insert_data.data[col_idx].var_len_data.size());
5537  for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
5538  none_encoded_strings->push_back(varlen_str.payload);
5539  }
5540  p.stringsPtr = none_encoded_strings.get();
5541  }
5542  } else if (ti.is_geometry()) {
5543  none_encoded_string_columns.emplace_back(new std::vector<std::string>());
5544  auto& none_encoded_strings = none_encoded_string_columns.back();
5545  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5546  thrift_insert_data.data[col_idx].var_len_data.size());
5547  for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
5548  none_encoded_strings->push_back(varlen_str.payload);
5549  }
5550  p.stringsPtr = none_encoded_strings.get();
5551  } else {
5552  CHECK(ti.is_array());
5553  array_columns.emplace_back(new std::vector<ArrayDatum>());
5554  auto& array_column = array_columns.back();
5555  CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
5556  thrift_insert_data.data[col_idx].var_len_data.size());
5557  for (const auto& t_arr_datum : thrift_insert_data.data[col_idx].var_len_data) {
5558  if (t_arr_datum.is_null) {
// Fixed-size arrays with non-string elements get a composed null array. Other
// null arrays become an ArrayDatum built as (0, nullptr, true), presumably
// (length, pointer, is_null).
5559  if (ti.get_size() > 0 && !ti.get_elem_type().is_string()) {
5560  array_column->push_back(Importer_NS::ImporterUtils::composeNullArray(ti));
5561  } else {
5562  array_column->emplace_back(0, nullptr, true);
5563  }
5564  } else {
5565  ArrayDatum arr_datum;
5566  arr_datum.length = t_arr_datum.payload.size();
5567  int8_t* ptr = (int8_t*)(t_arr_datum.payload.data());
5568  arr_datum.pointer = ptr;
5569  // In this special case, ArrayDatum does not handle freeing the underlying
5570  // memory
5571  arr_datum.data_ptr = std::shared_ptr<int8_t>(ptr, [](auto p) {});
5572  arr_datum.is_null = false;
5573  array_column->push_back(arr_datum);
5574  }
5575  }
5576  p.arraysPtr = array_column.get();
5577  }
5578  insert_data.data.push_back(p);
5579  }
5580  insert_data.numRows = thrift_insert_data.num_rows;
// td_with_lock is a lock-holding container for the table descriptor. Calling it
// yields the TableDescriptor (presumably under a schema lock; confirm).
5581  const auto td_with_lock =
5583  cat, insert_data.tableId);
5584  const auto td = td_with_lock();
5585  CHECK(td);
5586 
5587  // this should have the same lock seq as COPY FROM
5588  ChunkKey chunkKey = {insert_data.databaseId, insert_data.tableId};
5589  auto insert_data_lock = lockmgr::InsertDataLockMgr::getWriteLockForTable(chunkKey);
5590  td->fragmenter->insertDataNoCheckpoint(insert_data);
5591  } catch (const std::exception& e) {
5592  THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
5593  }
5594 }
5595 
5596 void MapDHandler::start_render_query(TPendingRenderQuery& _return,
5597  const TSessionId& session,
5598  const int64_t widget_id,
5599  const int16_t node_idx,
5600  const std::string& vega_json) {
5601  auto stdlog = STDLOG(get_session_ptr(session));
5602  auto session_ptr = stdlog.getConstSessionInfo();
5603  if (!render_handler_) {
5604  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5605  }
5606  LOG(INFO) << "start_render_query :" << *session_ptr << " :widget_id:" << widget_id
5607  << ":vega_json:" << vega_json;
5608  auto time_ms = measure<>::execution([&]() {
5609  try {
5610  render_handler_->start_render_query(
5611  _return, session, widget_id, node_idx, vega_json);
5612  } catch (std::exception& e) {
5613  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5614  }
5615  });
5616  LOG(INFO) << "start_render_query-COMPLETED " << time_ms << "ms "
5617  << "id is " << _return.id;
5618 }
5619 
5620 void MapDHandler::execute_next_render_step(TRenderStepResult& _return,
5621  const TPendingRenderQuery& pending_render,
5622  const TRenderAggDataMap& merged_data) {
5623  if (!render_handler_) {
5624  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5625  }
5626 
5627  LOG(INFO) << "execute_next_render_step: id:" << pending_render.id;
5628  auto time_ms = measure<>::execution([&]() {
5629  try {