OmniSciDB  04ee39c94c
MapDHandler.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * File: MapDHandler.cpp
19  * Author: michael
20  *
21  * Created on Jan 1, 2017, 12:40 PM
22  */
23 
24 #include "MapDHandler.h"
25 #include "DistributedLoader.h"
26 #include "MapDServer.h"
28 #include "TokenCompletionHints.h"
29 #ifdef HAVE_PROFILER
30 #include <gperftools/heap-profiler.h>
31 #endif // HAVE_PROFILER
32 #include <thrift/concurrency/PlatformThreadFactory.h>
33 #include <thrift/concurrency/ThreadManager.h>
34 #include <thrift/protocol/TBinaryProtocol.h>
35 #include <thrift/protocol/TJSONProtocol.h>
36 #include <thrift/server/TThreadedServer.h>
37 #include <thrift/transport/TBufferTransports.h>
38 #include <thrift/transport/THttpServer.h>
39 #include <thrift/transport/TServerSocket.h>
40 #include <thrift/transport/TTransportException.h>
41 
42 #include "MapDRelease.h"
43 
44 #include "Calcite/Calcite.h"
45 #include "gen-cpp/CalciteServer.h"
46 
48 
49 #include "Catalog/Catalog.h"
51 #include "Import/Importer.h"
52 #include "LockMgr/TableLockMgr.h"
53 #include "MapDDistributedHandler.h"
54 #include "MapDRenderHandler.h"
55 #include "Parser/ParserWrapper.h"
57 #include "Parser/parser.h"
58 #include "Planner/Planner.h"
61 #include "QueryEngine/Execute.h"
69 #include "Shared/StringTransform.h"
70 #include "Shared/SysInfo.h"
71 #include "Shared/geo_types.h"
72 #include "Shared/geosupport.h"
73 #include "Shared/import_helpers.h"
75 #include "Shared/measure.h"
76 #include "Shared/scope.h"
77 
78 #include <fcntl.h>
79 #include <picosha2.h>
80 #include <sys/time.h>
81 #include <sys/types.h>
82 #include <sys/wait.h>
83 #include <unistd.h>
84 #include <boost/algorithm/string.hpp>
85 #include <boost/filesystem.hpp>
86 #include <boost/make_shared.hpp>
87 #include <boost/process/search_path.hpp>
88 #include <boost/program_options.hpp>
89 #include <boost/regex.hpp>
90 #include <boost/tokenizer.hpp>
91 #include <cmath>
92 #include <csignal>
93 #include <fstream>
94 #include <future>
95 #include <map>
96 #include <memory>
97 #include <random>
98 #include <regex>
99 #include <string>
100 #include <thread>
101 #include <typeinfo>
102 
103 #include <arrow/api.h>
104 #include <arrow/io/api.h>
105 #include <arrow/ipc/api.h>
106 
107 #include "QueryEngine/ArrowUtil.h"
108 
109 #define ENABLE_GEO_IMPORT_COLUMN_MATCHING 0
110 
113 using namespace Lock_Namespace;
114 
115 #define INVALID_SESSION_ID ""
116 
/// Logs `errstr` and throws it to the Thrift client as a TMapDException.
/// Wrapped in do { } while (0) so the macro expands to exactly one statement:
/// the original four-statement expansion would misbehave inside an unbraced
/// if/else branch. Callers keep the same `THROW_MAPD_EXCEPTION(msg);` syntax.
#define THROW_MAPD_EXCEPTION(errstr) \
  do {                               \
    TMapDException ex;               \
    ex.error_msg = errstr;           \
    LOG(ERROR) << ex.error_msg;      \
    throw ex;                        \
  } while (0)
122 
123 namespace {
124 
125 SessionMap::iterator get_session_from_map(const TSessionId& session,
126  SessionMap& session_map) {
127  auto session_it = session_map.find(session);
128  if (session_it == session_map.end()) {
129  THROW_MAPD_EXCEPTION("Session not valid.");
130  }
131  return session_it;
132 }
133 
134 } // namespace
135 
/// MapDHandler implements the server side of the Thrift MapD API.
/// The constructor wires up core services in order: data manager, optional
/// runtime-UDF compilation, Calcite, extension-function whitelists, the
/// system catalog, and optional rendering / distributed handlers.
/// NOTE(review): this listing comes from a documentation extraction; several
/// source lines are missing (marked inline), so some calls appear truncated.
MapDHandler::MapDHandler(const std::vector<LeafHostInfo>& db_leaves,
                         const std::vector<LeafHostInfo>& string_leaves,
                         const std::string& base_data_path,
                         const bool cpu_only,
                         const bool allow_multifrag,
                         const bool jit_debug,
                         const bool intel_jit_profile,
                         const bool read_only,
                         const bool allow_loop_joins,
                         const bool enable_rendering,
                         const bool enable_spirv,
                         const bool enable_auto_clear_render_mem,
                         const int render_oom_retry_threshold,
                         const size_t render_mem_bytes,
                         const int num_gpus,
                         const int start_gpu,
                         const size_t reserved_gpu_mem,
                         const size_t num_reader_threads,
                         const AuthMetadata& authMetadata,
                         const MapDParameters& mapd_parameters,
                         const bool legacy_syntax,
                         const int idle_session_duration,
                         const int max_session_duration,
                         const bool enable_runtime_udf_registration,
                         const std::string& udf_filename)
    : leaf_aggregator_(db_leaves)
    , string_leaves_(string_leaves)
    , base_data_path_(base_data_path)
    , random_gen_(std::random_device{}())
    , session_id_dist_(0, INT32_MAX)
    , jit_debug_(jit_debug)
    , intel_jit_profile_(intel_jit_profile)
    , allow_multifrag_(allow_multifrag)
    , read_only_(read_only)
    , allow_loop_joins_(allow_loop_joins)
    , authMetadata_(authMetadata)
    , mapd_parameters_(mapd_parameters)
    , legacy_syntax_(legacy_syntax)
    , super_user_rights_(false)
    // Session durations arrive in minutes and are stored in seconds.
    , idle_session_duration_(idle_session_duration * 60)
    , max_session_duration_(max_session_duration * 60)
    , runtime_udf_registration_enabled_(enable_runtime_udf_registration)
    , _was_geo_copy_from(false) {
  LOG(INFO) << "OmniSci Server " << MAPD_RELEASE;
  bool is_rendering_enabled = enable_rendering;
  // NOTE(review): the executor_device_type_ assignment lines are missing from
  // this extraction in the branches below.
  if (cpu_only) {
    // A CPU-only server cannot render.
    is_rendering_enabled = false;
    cpu_mode_only_ = true;
  } else {
#ifdef HAVE_CUDA
    cpu_mode_only_ = false;
#else
    LOG(ERROR) << "This build isn't CUDA enabled, will run on CPU";
    cpu_mode_only_ = true;
    is_rendering_enabled = false;
#endif
  }

  const auto data_path = boost::filesystem::path(base_data_path_) / "mapd_data";
  // calculate the total amount of memory we need to reserve from each gpu that the Buffer
  // manage cannot ask for
  size_t total_reserved = reserved_gpu_mem;
  if (is_rendering_enabled) {
    total_reserved += render_mem_bytes;
  }
  // NOTE(review): one DataMgr constructor argument line is missing here.
  data_mgr_.reset(new Data_Namespace::DataMgr(data_path.string(),
                                              mapd_parameters,
                                              num_gpus,
                                              start_gpu,
                                              total_reserved,
                                              num_reader_threads));

  std::string udf_ast_filename("");

  // Compile the runtime UDF module up front so its AST can be handed to
  // Calcite; on compile failure the AST filename stays empty.
  if (!udf_filename.empty()) {
    UdfCompiler compiler(udf_filename);
    int compile_result = compiler.compileUdf();

    if (compile_result == 0) {
      udf_ast_filename = compiler.getAstFileName();
    }
  }

  // Random prefix isolates this server's Calcite sessions from other servers.
  std::string calcite_session_prefix = "calcite-" + generate_random_string(64);

  calcite_ = std::make_shared<Calcite>(
      mapd_parameters, base_data_path_, calcite_session_prefix, udf_ast_filename);

  ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
  if (!udf_filename.empty()) {
    ExtensionFunctionsWhitelist::addUdfs(calcite_->getUserDefinedFunctionWhitelist());
  }

  // Fall back to CPU when the build is CUDA-enabled but no device is present.
  if (!data_mgr_->gpusPresent() && !cpu_mode_only_) {
    LOG(ERROR) << "No GPUs detected, falling back to CPU mode";
    cpu_mode_only_ = true;
  }

  // NOTE(review): the case labels (device-type enumerators) are missing from
  // this extraction.
  switch (executor_device_type_) {
      LOG(INFO) << "Started in GPU mode" << std::endl;
      break;
      LOG(INFO) << "Started in CPU mode" << std::endl;
      break;
  }
  // NOTE(review): the trailing arguments and closing of this init() call are
  // missing from this extraction.
  SysCatalog::instance().init(base_data_path_,
                              data_mgr_,
                              authMetadata,
                              calcite_,
                              false,
                              !db_leaves.empty(),
  import_path_ = boost::filesystem::path(base_data_path_) / "mapd_import";
  start_time_ = std::time(nullptr);

  // Rendering is best-effort: construction failure only logs and disables it.
  if (is_rendering_enabled) {
    try {
      // NOTE(review): the render handler construction line is missing here.
      this, render_mem_bytes, num_gpus, start_gpu, mapd_parameters_));
    } catch (const std::exception& e) {
      LOG(ERROR) << "Backend rendering disabled: " << e.what();
    }
  }

  // Aggregators (leaves configured) get an agg handler; cluster members
  // without leaves act as leaves themselves.
  if (leaf_aggregator_.leafCount() > 0) {
    try {
      agg_handler_.reset(new MapDAggHandler(this));
    } catch (const std::exception& e) {
      LOG(ERROR) << "Distributed aggregator support disabled: " << e.what();
    }
  } else if (g_cluster) {
    try {
      leaf_handler_.reset(new MapDLeafHandler(this));
    } catch (const std::exception& e) {
      LOG(ERROR) << "Distributed leaf support disabled: " << e.what();
    }
  }
}
280 
282 
283 void MapDHandler::check_read_only(const std::string& str) {
285  THROW_MAPD_EXCEPTION(str + " disabled: server running in read-only mode.");
286  }
287 }
288 
// internal connection for connections with no password
/// Authenticates `username` against `dbname` without a password (passes an
/// empty string to login()), checks database ACCESS privilege, then creates
/// the session via connect_impl().
/// NOTE(review): the function's signature line (name and first parameter) and
/// the declaration of `user_meta` are missing from this extraction.
    const std::string& username,
    const std::string& dbname) {
  LOG_SESSION(session);  // session not assumed valid here
  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
  std::string username2 = username;  // login() may reset username given as argument
  std::string dbname2 = dbname;      // login() may reset dbname given as argument
  std::shared_ptr<Catalog> cat = nullptr;
  try {
    // Final `false` presumably disables password validation — confirm against
    // SysCatalog::login().
    cat = SysCatalog::instance().login(
        dbname2, username2, std::string(""), user_meta, false);
  } catch (std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }

  // Require ACCESS privilege on the database before creating a session.
  DBObject dbObject(dbname2, DatabaseDBObjectType);
  dbObject.loadKey(*cat);
  dbObject.setPrivileges(AccessPrivileges::ACCESS);
  std::vector<DBObject> dbObjects;
  dbObjects.push_back(dbObject);
  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
    THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
                         " is not allowed to access database " + dbname2 + ".");
  }
  connect_impl(session, std::string(""), dbname2, user_meta, cat, log_session);
}
317 
/// Thrift entry point: authenticates `username`/`passwd` against `dbname`,
/// verifies database ACCESS privilege, then creates the session (written into
/// the `session` out-parameter) via connect_impl().
/// NOTE(review): the declaration of `user_meta` (filled in by login()) is
/// missing from this extraction.
void MapDHandler::connect(TSessionId& session,
                          const std::string& username,
                          const std::string& passwd,
                          const std::string& dbname) {
  LOG_SESSION(session);  // session not assumed valid here

  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
  std::string username2 = username;  // login() may reset username given as argument
  std::string dbname2 = dbname;      // login() may reset dbname given as argument
  std::shared_ptr<Catalog> cat = nullptr;
  try {
    // Final arg presumably toggles password validation (skipped when
    // super_user_rights_ is set) — confirm against SysCatalog::login().
    cat = SysCatalog::instance().login(
        dbname2, username2, passwd, user_meta, !super_user_rights_);
  } catch (std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }

  // Require ACCESS privilege on the target database before creating a session.
  DBObject dbObject(dbname2, DatabaseDBObjectType);
  dbObject.loadKey(*cat);
  dbObject.setPrivileges(AccessPrivileges::ACCESS);
  std::vector<DBObject> dbObjects;
  dbObjects.push_back(dbObject);
  if (!SysCatalog::instance().checkPrivileges(user_meta, dbObjects)) {
    THROW_MAPD_EXCEPTION("Unauthorized Access: user " + username +
                         " is not allowed to access database " + dbname2 + ".");
  }
  connect_impl(session, passwd, dbname2, user_meta, cat, log_session);
}
347 
/// Generates a unique session id, registers the SessionInfo in `sessions_`,
/// forwards the connection to the leaf aggregator (distributed setups), and
/// logs the user's roles and public session id.
/// NOTE(review): the signature's first lines (function name, `session` and
/// `user_meta` parameters) and the head of the leaf_aggregator_ connect call
/// are missing from this extraction.
    const std::string& passwd,
    const std::string& dbname,
    std::shared_ptr<Catalog> cat,
    LogSession& log_session) {
  // Re-roll until the random id is not already in use.
  do {
    session = generate_random_string(32);
  } while (sessions_.find(session) != sessions_.end());
  std::pair<SessionMap::iterator, bool> emplace_retval =
      sessions_.emplace(session,
                        std::make_shared<Catalog_Namespace::SessionInfo>(
                            cat, user_meta, executor_device_type_, session));
  CHECK(emplace_retval.second);
  SessionMap::mapped_type session_ptr = emplace_retval.first->second;
  log_session.set_session(session_ptr);
  if (!super_user_rights_) {  // no need to connect to leaf_aggregator_ at this time while
                              // doing warmup
    if (leaf_aggregator_.leafCount() > 0) {
      const auto parent_session_info_ptr = session_ptr;
      CHECK(parent_session_info_ptr);
      // NOTE(review): the leaf_aggregator_ connect call's first line is
      // missing from this extraction.
          *parent_session_info_ptr, user_meta.userName, passwd, dbname);
      return;
    }
  }
  // Super users log a fixed "super" role instead of enumerating roles.
  auto const roles = session_ptr->get_currentUser().isSuper
                         ? std::vector<std::string>{{"super"}}
                         : SysCatalog::instance().getRoles(
                               false, false, session_ptr->get_currentUser().userName);
  log_session.append_name_value_pairs("roles", boost::algorithm::join(roles, ","));
  LOG(INFO) << "User " << user_meta.userName << " connected to database " << dbname
            << " with public_session_id " << session_ptr->get_public_session_id();
}
382 
383 void MapDHandler::disconnect(const TSessionId& session) {
384  LOG_SESSION(session);
385  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
386  auto session_it = get_session_it_unsafe(session);
387  const auto dbname = session_it->second->getCatalog().getCurrentDB().dbName;
388  LOG(INFO) << "User " << session_it->second->get_currentUser().userName
389  << " disconnected from database " << dbname << std::endl;
390  disconnect_impl(session_it);
391 }
392 
393 void MapDHandler::disconnect_impl(const SessionMap::iterator& session_it) {
394  // session_it existence should already have been checked (i.e. called via
395  // get_session_it_unsafe(...))
396  const auto session_id = session_it->second->get_session_id();
397  if (leaf_aggregator_.leafCount() > 0) {
398  leaf_aggregator_.disconnect(session_id);
399  }
400  if (render_handler_) {
401  render_handler_->disconnect(session_id);
402  }
403  sessions_.erase(session_it);
404 }
405 
406 void MapDHandler::switch_database(const TSessionId& session, const std::string& dbname) {
407  LOG_SESSION(session);
408  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
409  auto session_it = get_session_it_unsafe(session);
410 
411  std::string dbname2 = dbname; // switchDatabase() may reset dbname given as argument
412 
413  try {
414  std::shared_ptr<Catalog> cat = SysCatalog::instance().switchDatabase(
415  dbname2, session_it->second->get_currentUser().userName);
416  session_it->second->set_catalog_ptr(cat);
417  } catch (std::exception& e) {
418  THROW_MAPD_EXCEPTION(e.what());
419  }
420 }
421 
/// Thrift entry point: interrupts running queries for `session` on this node
/// and, for aggregators, forwards the interrupt to every leaf.
/// NOTE(review): an enclosing conditional's opening line is missing from this
/// extraction (hence the extra closing brace before the end of the function).
void MapDHandler::interrupt(const TSessionId& session) {
  LOG_SESSION(session);
    // Shared lock to allow simultaneous interrupts of multiple sessions
    mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
    if (leaf_aggregator_.leafCount() > 0) {
      leaf_aggregator_.interrupt(session);
    }

    auto session_it = get_session_it_unsafe(session);
    auto& cat = session_it->second.get()->getCatalog();
    const auto dbname = cat.getCurrentDB().dbName;
    // NOTE(review): one Executor::getExecutor argument line is missing from
    // this extraction.
    auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
                                          jit_debug_ ? "/tmp" : "",
                                          jit_debug_ ? "mapdquery" : "",
                                          nullptr);
    CHECK(executor);

    VLOG(1) << "Received interrupt: "
            << "Session " << *session_it->second << ", Executor " << executor
            << ", leafCount " << leaf_aggregator_.leafCount() << ", User "
            << session_it->second->get_currentUser().userName << ", Database " << dbname
            << std::endl;

    executor->interrupt();

    LOG(INFO) << "User " << session_it->second->get_currentUser().userName
              << " interrupted session with database " << dbname << std::endl;
  }
}
453 
454 void MapDHandler::get_server_status(TServerStatus& _return, const TSessionId& session) {
455  LOG_SESSION(session);
456  const auto rendering_enabled = bool(render_handler_);
457  _return.read_only = read_only_;
458  _return.version = MAPD_RELEASE;
459  _return.rendering_enabled = rendering_enabled;
460  _return.poly_rendering_enabled = rendering_enabled;
461  _return.start_time = start_time_;
462  _return.edition = MAPD_EDITION;
463  _return.host_name = get_hostname();
464 }
465 
466 void MapDHandler::get_status(std::vector<TServerStatus>& _return,
467  const TSessionId& session) {
468  LOG_SESSION(session);
469  const auto rendering_enabled = bool(render_handler_);
470  TServerStatus ret;
471  ret.read_only = read_only_;
472  ret.version = MAPD_RELEASE;
473  ret.rendering_enabled = rendering_enabled;
474  ret.poly_rendering_enabled = rendering_enabled;
475  ret.start_time = start_time_;
476  ret.edition = MAPD_EDITION;
477  ret.host_name = get_hostname();
478 
479  // TSercivePort tcp_port{}
480 
481  if (g_cluster) {
482  ret.role =
483  (leaf_aggregator_.leafCount() > 0) ? TRole::type::AGGREGATOR : TRole::type::LEAF;
484  } else {
485  ret.role = TRole::type::SERVER;
486  }
487 
488  _return.push_back(ret);
489  if (leaf_aggregator_.leafCount() > 0) {
490  std::vector<TServerStatus> leaf_status = leaf_aggregator_.getLeafStatus(session);
491  _return.insert(_return.end(), leaf_status.begin(), leaf_status.end());
492  }
493 }
494 
495 void MapDHandler::get_hardware_info(TClusterHardwareInfo& _return,
496  const TSessionId& session) {
497  LOG_SESSION(session);
498  THardwareInfo ret;
499  const auto cuda_mgr = data_mgr_->getCudaMgr();
500  if (cuda_mgr) {
501  ret.num_gpu_hw = cuda_mgr->getDeviceCount();
502  ret.start_gpu = cuda_mgr->getStartGpu();
503  if (ret.start_gpu >= 0) {
504  ret.num_gpu_allocated = cuda_mgr->getDeviceCount() - cuda_mgr->getStartGpu();
505  // ^ This will break as soon as we allow non contiguous GPU allocations to MapD
506  }
507  for (int16_t device_id = 0; device_id < ret.num_gpu_hw; device_id++) {
508  TGpuSpecification gpu_spec;
509  auto deviceProperties = cuda_mgr->getDeviceProperties(device_id);
510  gpu_spec.num_sm = deviceProperties->numMPs;
511  gpu_spec.clock_frequency_kHz = deviceProperties->clockKhz;
512  gpu_spec.memory = deviceProperties->globalMem;
513  gpu_spec.compute_capability_major = deviceProperties->computeMajor;
514  gpu_spec.compute_capability_minor = deviceProperties->computeMinor;
515  ret.gpu_info.push_back(gpu_spec);
516  }
517  }
518 
519  // start hardware/OS dependent code
520  ret.num_cpu_hw = std::thread::hardware_concurrency();
521  // ^ This might return diffrent results in case of hyper threading
522  // end hardware/OS dependent code
523 
524  _return.hardware_info.push_back(ret);
525  if (leaf_aggregator_.leafCount() > 0) {
526  ret.host_name = "aggregator";
527  TClusterHardwareInfo leaf_hardware = leaf_aggregator_.getHardwareInfo(session);
528  _return.hardware_info.insert(_return.hardware_info.end(),
529  leaf_hardware.hardware_info.begin(),
530  leaf_hardware.hardware_info.end());
531  }
532 }
533 
534 void MapDHandler::get_session_info(TSessionInfo& _return, const TSessionId& session) {
535  LOG_SESSION(session);
536  auto session_info = get_session_copy(session);
537  auto user_metadata = session_info.get_currentUser();
538 
539  _return.user = user_metadata.userName;
540  _return.database = session_info.getCatalog().getCurrentDB().dbName;
541  _return.start_time = session_info.get_start_time();
542  _return.is_super = user_metadata.isSuper;
543 }
544 
/// Appends one TargetValue to a Thrift TColumn, recursing for arrays and
/// translating per-type null sentinel values (NULL_*) into the parallel
/// `nulls` vector.
/// NOTE(review): the signature's first line (function name and the
/// TargetValue parameter `tv`) is missing from this extraction.
    const SQLTypeInfo& ti,
    TColumn& column) {
  if (ti.is_array()) {
    // Arrays: recursively convert each element into a nested TColumn.
    TColumn tColumn;
    const auto array_tv = boost::get<ArrayTargetValue>(&tv);
    CHECK(array_tv);
    bool is_null = !array_tv->is_initialized();
    if (!is_null) {
      const auto& vec = array_tv->get();
      for (const auto& elem_tv : vec) {
        value_to_thrift_column(elem_tv, ti.get_elem_type(), tColumn);
      }
    }
    column.data.arr_col.push_back(tColumn);
    column.nulls.push_back(is_null && !ti.get_notnull());
  } else if (ti.is_geometry()) {
    // Geometries arrive either as serialized strings or as double
    // coordinate arrays; handle both representations.
    const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
    if (scalar_tv) {
      auto s_n = boost::get<NullableString>(scalar_tv);
      auto s = boost::get<std::string>(s_n);
      if (s) {
        column.data.str_col.push_back(*s);
      } else {
        column.data.str_col.emplace_back("");  // null string
        auto null_p = boost::get<void*>(s_n);
        CHECK(null_p && !*null_p);
      }
      column.nulls.push_back(!s && !ti.get_notnull());
    } else {
      const auto array_tv = boost::get<ArrayTargetValue>(&tv);
      CHECK(array_tv);
      bool is_null = !array_tv->is_initialized();
      if (!is_null) {
        // Coordinate arrays are typed as double elements.
        auto elem_type = SQLTypeInfo(kDOUBLE, false);
        TColumn tColumn;
        const auto& vec = array_tv->get();
        for (const auto& elem_tv : vec) {
          value_to_thrift_column(elem_tv, elem_type, tColumn);
        }
        column.data.arr_col.push_back(tColumn);
        column.nulls.push_back(false);
      } else {
        TColumn tColumn;
        column.data.arr_col.push_back(tColumn);
        column.nulls.push_back(is_null && !ti.get_notnull());
      }
    }
  } else {
    const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
    CHECK(scalar_tv);
    if (boost::get<int64_t>(scalar_tv)) {
      int64_t data = *(boost::get<int64_t>(scalar_tv));

      if (is_member_of_typeset<kNUMERIC, kDECIMAL>(ti)) {
        // Decimals are stored scaled; convert back to a double for Thrift.
        double val = static_cast<double>(data);
        if (ti.get_scale() > 0) {
          val /= pow(10.0, std::abs(ti.get_scale()));
        }
        column.data.real_col.push_back(val);
      } else {
        column.data.int_col.push_back(data);
      }

      // Each integral type has its own null sentinel value.
      switch (ti.get_type()) {
        case kBOOLEAN:
          column.nulls.push_back(data == NULL_BOOLEAN && !ti.get_notnull());
          break;
        case kTINYINT:
          column.nulls.push_back(data == NULL_TINYINT && !ti.get_notnull());
          break;
        case kSMALLINT:
          column.nulls.push_back(data == NULL_SMALLINT && !ti.get_notnull());
          break;
        case kINT:
          column.nulls.push_back(data == NULL_INT && !ti.get_notnull());
          break;
        case kNUMERIC:
        case kDECIMAL:
        case kBIGINT:
          column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
          break;
        case kTIME:
        case kTIMESTAMP:
        case kDATE:
        case kINTERVAL_DAY_TIME:
          // NOTE(review): one interval case label appears to be missing from
          // this extraction.
          column.nulls.push_back(data == NULL_BIGINT && !ti.get_notnull());
          break;
        default:
          column.nulls.push_back(false);
      }
    } else if (boost::get<double>(scalar_tv)) {
      double data = *(boost::get<double>(scalar_tv));
      column.data.real_col.push_back(data);
      if (ti.get_type() == kFLOAT) {
        column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
      } else {
        column.nulls.push_back(data == NULL_DOUBLE && !ti.get_notnull());
      }
    } else if (boost::get<float>(scalar_tv)) {
      CHECK_EQ(kFLOAT, ti.get_type());
      float data = *(boost::get<float>(scalar_tv));
      column.data.real_col.push_back(data);
      column.nulls.push_back(data == NULL_FLOAT && !ti.get_notnull());
    } else if (boost::get<NullableString>(scalar_tv)) {
      auto s_n = boost::get<NullableString>(scalar_tv);
      auto s = boost::get<std::string>(s_n);
      if (s) {
        column.data.str_col.push_back(*s);
      } else {
        column.data.str_col.emplace_back("");  // null string
        auto null_p = boost::get<void*>(s_n);
        CHECK(null_p && !*null_p);
      }
      column.nulls.push_back(!s && !ti.get_notnull());
    } else {
      CHECK(false);
    }
  }
}
666 
/// Converts a single TargetValue into a Thrift TDatum, setting `is_null`
/// from per-type sentinel values; arrays are converted element-by-element.
/// NOTE(review): the function's signature line is missing from this
/// extraction; the body references parameters `tv` and `ti` and returns a
/// TDatum.
  TDatum datum;
  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
  if (!scalar_tv) {
    // Non-scalar values must be arrays; recurse per element.
    CHECK(ti.is_array());
    const auto array_tv = boost::get<ArrayTargetValue>(&tv);
    CHECK(array_tv);
    if (array_tv->is_initialized()) {
      const auto& vec = array_tv->get();
      for (const auto& elem_tv : vec) {
        const auto scalar_col_val = value_to_thrift(elem_tv, ti.get_elem_type());
        datum.val.arr_val.push_back(scalar_col_val);
      }
      // Datum is not null, at worst it's an empty array Datum
      datum.is_null = false;
    } else {
      datum.is_null = true;
    }
    return datum;
  }
  if (boost::get<int64_t>(scalar_tv)) {
    int64_t data = *(boost::get<int64_t>(scalar_tv));

    if (is_member_of_typeset<kNUMERIC, kDECIMAL>(ti)) {
      // Decimals are stored scaled; convert back to a double.
      double val = static_cast<double>(data);
      if (ti.get_scale() > 0) {
        val /= pow(10.0, std::abs(ti.get_scale()));
      }
      datum.val.real_val = val;
    } else {
      datum.val.int_val = data;
    }

    // Each integral type has its own null sentinel value.
    switch (ti.get_type()) {
      case kBOOLEAN:
        datum.is_null = (datum.val.int_val == NULL_BOOLEAN);
        break;
      case kTINYINT:
        datum.is_null = (datum.val.int_val == NULL_TINYINT);
        break;
      case kSMALLINT:
        datum.is_null = (datum.val.int_val == NULL_SMALLINT);
        break;
      case kINT:
        datum.is_null = (datum.val.int_val == NULL_INT);
        break;
      case kDECIMAL:
      case kNUMERIC:
      case kBIGINT:
        datum.is_null = (datum.val.int_val == NULL_BIGINT);
        break;
      case kTIME:
      case kTIMESTAMP:
      case kDATE:
      case kINTERVAL_DAY_TIME:
        // NOTE(review): one interval case label appears to be missing from
        // this extraction.
        datum.is_null = (datum.val.int_val == NULL_BIGINT);
        break;
      default:
        datum.is_null = false;
    }
  } else if (boost::get<double>(scalar_tv)) {
    datum.val.real_val = *(boost::get<double>(scalar_tv));
    if (ti.get_type() == kFLOAT) {
      datum.is_null = (datum.val.real_val == NULL_FLOAT);
    } else {
      datum.is_null = (datum.val.real_val == NULL_DOUBLE);
    }
  } else if (boost::get<float>(scalar_tv)) {
    CHECK_EQ(kFLOAT, ti.get_type());
    datum.val.real_val = *(boost::get<float>(scalar_tv));
    datum.is_null = (datum.val.real_val == NULL_FLOAT);
  } else if (boost::get<NullableString>(scalar_tv)) {
    auto s_n = boost::get<NullableString>(scalar_tv);
    auto s = boost::get<std::string>(s_n);
    if (s) {
      datum.val.str_val = *s;
    } else {
      auto null_p = boost::get<void*>(s_n);
      CHECK(null_p && !*null_p);
    }
    datum.is_null = !s;
  } else {
    CHECK(false);
  }
  return datum;
}
754 
/// Thrift entry point: executes `query_str` for `session`, via the
/// distributed aggregator when leaves are configured or locally otherwise,
/// then performs any deferred geo COPY FROM import the statement captured.
/// NOTE(review): the signature's first line (function name and the
/// TQueryResult out-param `_return`) is missing from this extraction, as are
/// a few argument lines inside calls below (marked inline).
    const TSessionId& session,
    const std::string& query_str,
    const bool column_format,
    const std::string& nonce,
    const int32_t first_n,
    const int32_t at_most_n) {
  LOG_SESSION(session, "query_str", hide_sensitive_data_from_query(query_str));
  // Always clear the geo-copy-from flag on exit, however we leave.
  ScopeGuard reset_was_geo_copy_from = [&] { _was_geo_copy_from = false; };
  if (first_n >= 0 && at_most_n >= 0) {
    THROW_MAPD_EXCEPTION(std::string("At most one of first_n and at_most_n can be set"));
  }
  const auto session_info = get_session_copy(session);
  if (leaf_aggregator_.leafCount() > 0) {
    if (!agg_handler_) {
      THROW_MAPD_EXCEPTION("Distributed support is disabled.");
    }
    _return.total_time_ms = measure<>::execution([&]() {
      try {
        // NOTE(review): the tail of this call is missing from this
        // extraction.
        agg_handler_->cluster_execute(_return,
                                      session_info,
                                      query_str,
                                      column_format,
                                      nonce,
                                      first_n,
                                      at_most_n,
      } catch (std::exception& e) {
        const auto mapd_exception = dynamic_cast<const TMapDException*>(&e);
        const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
        // NOTE(review): the line consuming this ternary (the rethrow/log
        // call) is missing from this extraction.
        thrift_exception ? std::string(thrift_exception->what())
                         : mapd_exception ? mapd_exception->error_msg
                                          : std::string("Exception: ") + e.what());
      }
      _return.nonce = nonce;
    });
  } else {
    _return.total_time_ms = measure<>::execution([&]() {
      // NOTE(review): the local execution call's opening line is missing
      // from this extraction.
        session_info,
        query_str,
        column_format,
        nonce,
        session_info.get_executor_device_type(),
        first_n,
        at_most_n);
    });
  }

  // if the SQL statement we just executed was a geo COPY FROM, the import
  // parameters were captured, and this flag set, so we do the actual import here
  if (_was_geo_copy_from) {
    // import_geo_table() calls create_table() which calls this function to
    // do the work, so reset the flag now to avoid executing this part a
    // second time at the end of that, which would fail as the table was
    // already created! Also reset the flag with a ScopeGuard on exiting
    // this function any other way, such as an exception from the code above!
    _was_geo_copy_from = false;

    // create table as replicated?
    TCreateParams create_params;
    if (_geo_copy_from_partitions == "REPLICATED") {
      create_params.is_replicated = true;
    }

    // now do (and time) the import
    _return.total_time_ms = measure<>::execution([&]() {
      // NOTE(review): the captured-parameter argument lines are missing from
      // this extraction.
      import_geo_table(session,
                       TRowDescriptor(),
                       create_params);
    });
  }
  log_session.append_name_value_pairs(
      "execution_time_ms",
      _return.execution_time_ms,
      "total_time_ms",  // BE-3420 - Redundant with duration field
      log_session.duration<std::chrono::milliseconds>());
}
837 
/// Thrift entry point: executes a SELECT and returns the result as an Arrow
/// data frame on the requested device. DDL/DML and EXPLAIN are rejected.
/// NOTE(review): several lines are missing from this extraction (THROW call
/// heads, the lock arguments, two execute_rel_alg_df arguments); marked
/// inline.
void MapDHandler::sql_execute_df(TDataFrame& _return,
                                 const TSessionId& session,
                                 const std::string& query_str,
                                 const TDeviceType::type device_type,
                                 const int32_t device_id,
                                 const int32_t first_n) {
  LOG_SESSION(session, "query_str", hide_sensitive_data_from_query(query_str));
  const auto session_info = get_session_copy(session);
  if (device_type == TDeviceType::GPU) {
    // GPU requests require a GPU-mode session, an available GPU, and a valid
    // device id.
    const auto executor_device_type = session_info.get_executor_device_type();
    if (executor_device_type != ExecutorDeviceType::GPU) {
      // NOTE(review): the THROW_MAPD_EXCEPTION call's opening line is missing.
        std::string("Exception: GPU mode is not allowed in this session"));
    }
    if (!data_mgr_->gpusPresent()) {
      THROW_MAPD_EXCEPTION(std::string("Exception: no GPU is available in this server"));
    }
    if (device_id < 0 || device_id >= data_mgr_->getCudaMgr()->getDeviceCount()) {
      // NOTE(review): the THROW_MAPD_EXCEPTION call's opening line is missing.
        std::string("Exception: invalid device_id or unavailable GPU with this ID"));
    }
  }
  SQLParser parser;
  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
  std::string last_parsed;
  try {
    ParserWrapper pw{query_str};
    if (!pw.is_ddl && !pw.is_update_dml &&
        !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
      std::string query_ra;
      TableMap table_map;
      OptionalTableMap tableNames(table_map);
      query_ra = parse_to_ra(query_str, {}, session_info, tableNames, mapd_parameters_);

      // COPY_TO/SELECT: get read ExecutorOuterLock >> TableReadLock for each table (note
      // for update/delete this would be a table write lock)
      // NOTE(review): the lock argument line and the table-lock call's opening
      // line are missing from this extraction.
      mapd_shared_lock<mapd_shared_mutex> executeReadLock(
      std::vector<Lock_Namespace::TableLock> table_locks;
          session_info.getCatalog(), tableNames.value(), table_locks);

      if (pw.isCalciteExplain()) {
        throw std::runtime_error("explain is not unsupported by current thrift API");
      }
      // NOTE(review): two execute_rel_alg_df argument lines are missing here.
      execute_rel_alg_df(_return,
                         query_ra,
                         session_info,
                         static_cast<size_t>(device_id),
                         first_n);
      if (!_return.sm_size) {
        throw std::runtime_error("schema is missing in returned result");
      }
      return;
    }
    LOG(INFO) << "passing query to legacy processor";
  } catch (std::exception& e) {
    THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
  }
  // NOTE(review): the THROW_MAPD_EXCEPTION call's opening line is missing.
      "Exception: DDL or update DML are not unsupported by current thrift API");
}
902 
/// Thrift entry point: GPU-resident data frame execution. Thin wrapper that
/// delegates to sql_execute_df() with the device type pinned to GPU.
void MapDHandler::sql_execute_gdf(TDataFrame& _return,
                                  const TSessionId& session,
                                  const std::string& query_str,
                                  const int32_t device_id,
                                  const int32_t first_n) {
  LOG_SESSION(session);
  sql_execute_df(_return, session, query_str, TDeviceType::GPU, device_id, first_n);
}
911 
912 // For now we have only one user of a data frame in all cases.
// For now we have only one user of a data frame in all cases.
/// Thrift entry point: releases the shared-memory / device buffers backing a
/// previously returned Arrow data frame. For GPU frames the device pointer is
/// looked up (and removed) from the IPC handle bookkeeping map.
/// NOTE(review): the head of the final deallocation call and its device-type
/// argument line are missing from this extraction.
void MapDHandler::deallocate_df(const TSessionId& session,
                                const TDataFrame& df,
                                const TDeviceType::type device_type,
                                const int32_t device_id) {
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  int8_t* dev_ptr{0};
  if (device_type == TDeviceType::GPU) {
    std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
    // Each handle must have exactly one bookkeeping entry.
    if (ipc_handle_to_dev_ptr_.count(df.df_handle) != size_t(1)) {
      TMapDException ex;
      ex.error_msg = std::string(
          "Exception: current data frame handle is not bookkept or been inserted twice");
      LOG(ERROR) << ex.error_msg;
      throw ex;
    }
    dev_ptr = ipc_handle_to_dev_ptr_[df.df_handle];
    ipc_handle_to_dev_ptr_.erase(df.df_handle);
  }
  std::vector<char> sm_handle(df.sm_handle.begin(), df.sm_handle.end());
  std::vector<char> df_handle(df.df_handle.begin(), df.df_handle.end());
  ArrowResult result{sm_handle, df.sm_size, df_handle, df.df_size, dev_ptr};
  // NOTE(review): the deallocation call's opening line is missing from this
  // extraction.
      result,
      device_id,
      data_mgr_);
}
941 
942 std::string MapDHandler::apply_copy_to_shim(const std::string& query_str) {
943  auto result = query_str;
944  {
945  // boost::regex copy_to{R"(COPY\s\((.*)\)\sTO\s(.*))", boost::regex::extended |
946  // boost::regex::icase};
947  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
948  boost::regex::extended | boost::regex::icase};
949  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
950  result.replace(
951  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
952  });
953  }
954  return result;
955 }
956 
957 void MapDHandler::sql_validate(TTableDescriptor& _return,
958  const TSessionId& session,
959  const std::string& query_str) {
960  LOG_SESSION(session);
961  std::unique_ptr<const Planner::RootPlan> root_plan;
962  const auto session_info = get_session_copy(session);
963  ParserWrapper pw{query_str};
964  if ((pw.getExplainType() != ParserWrapper::ExplainType::None) || pw.is_ddl ||
965  pw.is_update_dml) {
966  THROW_MAPD_EXCEPTION("Can only validate SELECT statements.");
967  }
968  MapDHandler::validate_rel_alg(_return, query_str, session_info);
969 }
970 
971 namespace {
972 
// NOTE(review): the struct header is missing from this listing (doc line 973)
// — presumably a small aggregate (the completion-token result type) holding
// the two sets below; confirm against upstream.
974  std::unordered_set<std::string> uc_column_names;
975  std::unordered_set<std::string> uc_column_table_qualifiers;
976 };
977 
978 // Extract what looks like a (qualified) identifier from the partial query.
979 // The results will be used to rank the auto-completion results: tables which
980 // contain at least one of the identifiers first.
// NOTE(review): the function's return type and name are missing from this
// listing (doc line 981) — the callers below refer to it as
// extract_projection_tokens_for_completion; confirm upstream.
982  const std::string& sql) {
// Tokenize the SQL into identifier-like runs (alnum, '_', '.').
983  boost::regex id_regex{R"(([[:alnum:]]|_|\.)+)",
984  boost::regex::extended | boost::regex::icase};
985  boost::sregex_token_iterator tok_it(sql.begin(), sql.end(), id_regex, 0);
986  boost::sregex_token_iterator end;
987  std::unordered_set<std::string> uc_column_names;
988  std::unordered_set<std::string> uc_column_table_qualifiers;
989  for (; tok_it != end; ++tok_it) {
990  std::string column_name = *tok_it;
991  std::vector<std::string> column_tokens;
992  boost::split(column_tokens, column_name, boost::is_any_of("."));
993  if (column_tokens.size() == 2) {
994  // If the column name is qualified, take user's word.
995  uc_column_table_qualifiers.insert(to_upper(column_tokens.front()));
996  } else {
997  uc_column_names.insert(to_upper(column_name));
998  }
999  }
// Both sets are stored upper-cased for case-insensitive matching later.
1000  return {uc_column_names, uc_column_table_qualifiers};
1001 }
1002 
1003 } // namespace
1004 
1005 void MapDHandler::get_completion_hints(std::vector<TCompletionHint>& hints,
1006  const TSessionId& session,
1007  const std::string& sql,
1008  const int cursor) {
1009  LOG_SESSION(session);
1010  std::vector<std::string> visible_tables; // Tables allowed for the given session.
1011  get_completion_hints_unsorted(hints, visible_tables, session, sql, cursor);
1012  const auto proj_tokens = extract_projection_tokens_for_completion(sql);
1013  auto compatible_table_names = get_uc_compatible_table_names_by_column(
1014  proj_tokens.uc_column_names, visible_tables, session);
1015  // Add the table qualifiers explicitly specified by the user.
1016  compatible_table_names.insert(proj_tokens.uc_column_table_qualifiers.begin(),
1017  proj_tokens.uc_column_table_qualifiers.end());
1018  // Sort the hints by category, from COLUMN (most specific) to KEYWORD.
1019  std::sort(
1020  hints.begin(),
1021  hints.end(),
1022  [&compatible_table_names](const TCompletionHint& lhs, const TCompletionHint& rhs) {
1023  if (lhs.type == TCompletionHintType::TABLE &&
1024  rhs.type == TCompletionHintType::TABLE) {
1025  // Between two tables, one which is compatible with the specified projections
1026  // and one which isn't, pick the one which is compatible.
1027  if (compatible_table_names.find(to_upper(lhs.hints.back())) !=
1028  compatible_table_names.end() &&
1029  compatible_table_names.find(to_upper(rhs.hints.back())) ==
1030  compatible_table_names.end()) {
1031  return true;
1032  }
1033  }
1034  return lhs.type < rhs.type;
1035  });
1036 }
1037 
// Gathers raw (unsorted) completion hints: asks Calcite for suggestions over
// the tables visible to the session, then — for the pre-FROM part of a query,
// where Calcite has little context — falls back to token-based completions.
// `visible_tables` is an out-parameter reused by the sorting caller.
1038 void MapDHandler::get_completion_hints_unsorted(std::vector<TCompletionHint>& hints,
1039  std::vector<std::string>& visible_tables,
1040  const TSessionId& session,
1041  const std::string& sql,
1042  const int cursor) {
1043  const auto session_info = get_session_copy(session);
1044  try {
1045  get_tables(visible_tables, session);
1046  // Filter out keywords suggested by Calcite which we don't support.
// NOTE(review): the call wrapping the Calcite request below is missing from
// this listing (doc line 1047) — presumably the keyword-whitelisting filter
// the comment above refers to, assigned into `hints`; confirm upstream.
1048  calcite_->getCompletionHints(session_info, visible_tables, sql, cursor));
1049  } catch (const std::exception& e) {
1050  TMapDException ex;
1051  ex.error_msg = "Exception: " + std::string(e.what());
1052  LOG(ERROR) << ex.error_msg;
1053  throw ex;
1054  }
// cursor < 0 means "end of string"; otherwise clamp to the string length.
1055  boost::regex from_expr{R"(\s+from\s+)", boost::regex::extended | boost::regex::icase};
1056  const size_t length_to_cursor =
1057  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1058  // Trust hints from Calcite after the FROM keyword.
1059  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, from_expr)) {
1060  return;
1061  }
1062  // Before FROM, the query is too incomplete for context-sensitive completions.
1063  get_token_based_completions(hints, session, visible_tables, sql, cursor);
1064 }
1065 
// Heuristic completions for the pre-FROM portion of a query.
// NOTE(review): the signature's first line is missing from this listing (doc
// lines 1065-1066) — the caller above names it get_token_based_completions;
// confirm upstream.
1067  std::vector<TCompletionHint>& hints,
1068  const TSessionId& session,
1069  const std::vector<std::string>& visible_tables,
1070  const std::string& sql,
1071  const int cursor) {
// The word being typed at the cursor; cursor < 0 means end of string.
1072  const auto last_word =
1073  find_last_word_from_cursor(sql, cursor < 0 ? sql.size() : cursor);
1074  boost::regex select_expr{R"(\s*select\s+)",
1075  boost::regex::extended | boost::regex::icase};
1076  const size_t length_to_cursor =
1077  cursor < 0 ? sql.size() : std::min(sql.size(), static_cast<size_t>(cursor));
1078  // After SELECT but before FROM, look for all columns in all tables which match the
1079  // prefix.
1080  if (boost::regex_search(sql.cbegin(), sql.cbegin() + length_to_cursor, select_expr)) {
1081  const auto column_names_by_table =
1082  fill_column_names_by_table(visible_tables, session);
1083  // Trust the fully qualified columns the most.
1084  if (get_qualified_column_hints(hints, last_word, column_names_by_table)) {
1085  return;
1086  }
1087  // Not much information to use, just retrieve column names which match the prefix.
1088  if (should_suggest_column_hints(sql)) {
1089  get_column_hints(hints, last_word, column_names_by_table);
1090  return;
1091  }
// Otherwise, offer the FROM keyword itself if the typed prefix matches it.
1092  const std::string kFromKeyword{"FROM"};
1093  if (boost::istarts_with(kFromKeyword, last_word)) {
1094  TCompletionHint keyword_hint;
1095  keyword_hint.type = TCompletionHintType::KEYWORD;
1096  keyword_hint.replaced = last_word;
1097  keyword_hint.hints.emplace_back(kFromKeyword);
1098  hints.push_back(keyword_hint);
1099  }
1100  } else {
// No SELECT yet: the only sensible suggestion is the SELECT keyword.
1101  const std::string kSelectKeyword{"SELECT"};
1102  if (boost::istarts_with(kSelectKeyword, last_word)) {
1103  TCompletionHint keyword_hint;
1104  keyword_hint.type = TCompletionHintType::KEYWORD;
1105  keyword_hint.replaced = last_word;
1106  keyword_hint.hints.emplace_back(kSelectKeyword);
1107  hints.push_back(keyword_hint);
1108  }
1109  }
1110 }
1111 
1112 std::unordered_map<std::string, std::unordered_set<std::string>>
1113 MapDHandler::fill_column_names_by_table(const std::vector<std::string>& table_names,
1114  const TSessionId& session) {
1115  std::unordered_map<std::string, std::unordered_set<std::string>> column_names_by_table;
1116  for (const auto& table_name : table_names) {
1117  TTableDetails table_details;
1118  get_table_details(table_details, session, table_name);
1119  for (const auto& column_type : table_details.row_desc) {
1120  column_names_by_table[table_name].emplace(column_type.col_name);
1121  }
1122  }
1123  return column_names_by_table;
1124 }
1125 
// Returns the (upper-cased) names of tables containing at least one of the
// given upper-cased column names — used to rank table completion hints.
// NOTE(review): the return type and function name are missing from this
// listing (doc line 1126) — the caller refers to it as
// get_uc_compatible_table_names_by_column; confirm upstream.
1127  const std::unordered_set<std::string>& uc_column_names,
1128  const std::vector<std::string>& table_names,
1129  const TSessionId& session) {
1130  std::unordered_set<std::string> compatible_table_names_by_column;
1131  for (const auto& table_name : table_names) {
1132  TTableDetails table_details;
1133  get_table_details(table_details, session, table_name);
1134  for (const auto& column_type : table_details.row_desc) {
1135  if (uc_column_names.find(to_upper(column_type.col_name)) != uc_column_names.end()) {
// One matching column is enough; stop scanning this table.
1136  compatible_table_names_by_column.emplace(to_upper(table_name));
1137  break;
1138  }
1139  }
1140  }
1141  return compatible_table_names_by_column;
1142 }
1143 
// Validates `query_str` by translating it to relational algebra and running a
// validate-only execution, then fills `_return` (column name -> descriptor)
// from the resulting row descriptor. Errors are rethrown as TMapDException.
1144 void MapDHandler::validate_rel_alg(TTableDescriptor& _return,
1145  const std::string& query_str,
1146  const Catalog_Namespace::SessionInfo& session_info) {
1147  try {
1148  const auto query_ra =
1149  parse_to_ra(query_str, {}, session_info, boost::none, mapd_parameters_);
// NOTE(review): the result declaration and the execute call's name are
// missing from this listing (doc lines 1150-1151, 1155) — presumably a
// TQueryResult `result` passed to execute_rel_alg with the flags below;
// confirm upstream.
1152  query_ra,
1153  true,
1154  session_info,
1156  -1,
1157  -1,
1158  false,
1159  true,
1160  false,
1161  false,
1162  false);
1163  const auto& row_desc =
1164  fixup_row_descriptor(result.row_set.row_desc, session_info.getCatalog());
1165  for (const auto& col_desc : row_desc) {
// Column names are expected to be unique in the descriptor.
1166  const auto it_ok = _return.insert(std::make_pair(col_desc.col_name, col_desc));
1167  CHECK(it_ok.second);
1168  }
1169  } catch (std::exception& e) {
1170  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1171  }
1172 }
1173 
1174 void MapDHandler::get_roles(std::vector<std::string>& roles, const TSessionId& session) {
1175  LOG_SESSION(session);
1176  auto session_info = get_session_copy(session);
1177  if (!session_info.get_currentUser().isSuper) {
1178  // WARNING: This appears to not include roles a user is a member of,
1179  // if the role has no permissions granted to it.
1180  roles =
1181  SysCatalog::instance().getRoles(session_info.get_currentUser().userName,
1182  session_info.getCatalog().getCurrentDB().dbId);
1183  } else {
1184  roles = SysCatalog::instance().getRoles(
1185  false, true, session_info.get_currentUser().userName);
1186  }
1187 }
1188 
1189 static TDBObject serialize_db_object(const std::string& roleName,
1190  const DBObject& inObject) {
1191  TDBObject outObject;
1192  outObject.objectName = inObject.getName();
1193  outObject.grantee = roleName;
1194  const auto ap = inObject.getPrivileges();
1195  switch (inObject.getObjectKey().permissionType) {
1196  case DatabaseDBObjectType:
1197  outObject.objectType = TDBObjectType::DatabaseDBObjectType;
1198  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::CREATE_DATABASE));
1199  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::DROP_DATABASE));
1200  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::VIEW_SQL_EDITOR));
1201  outObject.privs.push_back(ap.hasPermission(DatabasePrivileges::ACCESS));
1202 
1203  break;
1204  case TableDBObjectType:
1205  outObject.objectType = TDBObjectType::TableDBObjectType;
1206  outObject.privs.push_back(ap.hasPermission(TablePrivileges::CREATE_TABLE));
1207  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DROP_TABLE));
1208  outObject.privs.push_back(ap.hasPermission(TablePrivileges::SELECT_FROM_TABLE));
1209  outObject.privs.push_back(ap.hasPermission(TablePrivileges::INSERT_INTO_TABLE));
1210  outObject.privs.push_back(ap.hasPermission(TablePrivileges::UPDATE_IN_TABLE));
1211  outObject.privs.push_back(ap.hasPermission(TablePrivileges::DELETE_FROM_TABLE));
1212  outObject.privs.push_back(ap.hasPermission(TablePrivileges::TRUNCATE_TABLE));
1213  outObject.privs.push_back(ap.hasPermission(TablePrivileges::ALTER_TABLE));
1214 
1215  break;
1216  case DashboardDBObjectType:
1217  outObject.objectType = TDBObjectType::DashboardDBObjectType;
1218  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::CREATE_DASHBOARD));
1219  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::DELETE_DASHBOARD));
1220  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::VIEW_DASHBOARD));
1221  outObject.privs.push_back(ap.hasPermission(DashboardPrivileges::EDIT_DASHBOARD));
1222 
1223  break;
1224  case ViewDBObjectType:
1225  outObject.objectType = TDBObjectType::ViewDBObjectType;
1226  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::CREATE_VIEW));
1227  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DROP_VIEW));
1228  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::SELECT_FROM_VIEW));
1229  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::INSERT_INTO_VIEW));
1230  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::UPDATE_IN_VIEW));
1231  outObject.privs.push_back(ap.hasPermission(ViewPrivileges::DELETE_FROM_VIEW));
1232 
1233  break;
1234  default:
1235  CHECK(false);
1236  }
1237  return outObject;
1238 }
1239 
// Checks that every database permission requested in `permissions` is present
// in `privs`; returns false as soon as one requested bit is not granted.
// NOTE(review): the signature's first line (function name and the AccessPrivileges
// parameter, presumably `privs`) is missing from this listing (doc line 1240);
// confirm upstream.
1241  const TDBObjectPermissions& permissions) {
1242  if (!permissions.__isset.database_permissions_) {
1243  THROW_MAPD_EXCEPTION("Database permissions not set for check.")
1244  }
1245  auto perms = permissions.database_permissions_;
1246  if ((perms.create_ && !privs.hasPermission(DatabasePrivileges::CREATE_DATABASE)) ||
1247  (perms.delete_ && !privs.hasPermission(DatabasePrivileges::DROP_DATABASE)) ||
1248  (perms.view_sql_editor_ &&
// NOTE(review): the VIEW_SQL_EDITOR hasPermission call is missing from this
// listing (doc line 1249); confirm upstream.
1250  (perms.access_ && !privs.hasPermission(DatabasePrivileges::ACCESS))) {
1251  return false;
1252  } else {
1253  return true;
1254  }
1255 }
1256 
// Checks that every table permission requested in `permissions` is present in
// `privs`; false if any requested bit is missing.
// NOTE(review): the signature's first line (function name and the
// AccessPrivileges parameter) is missing from this listing (doc line 1257);
// confirm upstream.
1258  const TDBObjectPermissions& permissions) {
1259  if (!permissions.__isset.table_permissions_) {
1260  THROW_MAPD_EXCEPTION("Table permissions not set for check.")
1261  }
1262  auto perms = permissions.table_permissions_;
1263  if ((perms.create_ && !privs.hasPermission(TablePrivileges::CREATE_TABLE)) ||
1264  (perms.drop_ && !privs.hasPermission(TablePrivileges::DROP_TABLE)) ||
1265  (perms.select_ && !privs.hasPermission(TablePrivileges::SELECT_FROM_TABLE)) ||
1266  (perms.insert_ && !privs.hasPermission(TablePrivileges::INSERT_INTO_TABLE)) ||
1267  (perms.update_ && !privs.hasPermission(TablePrivileges::UPDATE_IN_TABLE)) ||
1268  (perms.delete_ && !privs.hasPermission(TablePrivileges::DELETE_FROM_TABLE)) ||
1269  (perms.truncate_ && !privs.hasPermission(TablePrivileges::TRUNCATE_TABLE)) ||
1270  (perms.alter_ && !privs.hasPermission(TablePrivileges::ALTER_TABLE))) {
1271  return false;
1272  } else {
1273  return true;
1274  }
1275 }
1276 
// Checks that every dashboard permission requested in `permissions` is
// present in `privs`; false if any requested bit is missing.
// NOTE(review): the signature's first line (function name and the
// AccessPrivileges parameter) is missing from this listing (doc line 1277);
// confirm upstream.
1278  const TDBObjectPermissions& permissions) {
1279  if (!permissions.__isset.dashboard_permissions_) {
1280  THROW_MAPD_EXCEPTION("Dashboard permissions not set for check.")
1281  }
1282  auto perms = permissions.dashboard_permissions_;
1283  if ((perms.create_ && !privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD)) ||
1284  (perms.delete_ && !privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD)) ||
1285  (perms.view_ && !privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD)) ||
1286  (perms.edit_ && !privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD))) {
1287  return false;
1288  } else {
1289  return true;
1290  }
1291 }
1292 
// Checks that every view permission requested in `permissions` is present in
// `privs`; false if any requested bit is missing.
// NOTE(review): the signature's first line (function name and the
// AccessPrivileges parameter) is missing from this listing (doc line 1293);
// confirm upstream.
1294  const TDBObjectPermissions& permissions) {
1295  if (!permissions.__isset.view_permissions_) {
1296  THROW_MAPD_EXCEPTION("View permissions not set for check.")
1297  }
1298  auto perms = permissions.view_permissions_;
1299  if ((perms.create_ && !privs.hasPermission(ViewPrivileges::CREATE_VIEW)) ||
1300  (perms.drop_ && !privs.hasPermission(ViewPrivileges::DROP_VIEW)) ||
1301  (perms.select_ && !privs.hasPermission(ViewPrivileges::SELECT_FROM_VIEW)) ||
1302  (perms.insert_ && !privs.hasPermission(ViewPrivileges::INSERT_INTO_VIEW)) ||
1303  (perms.update_ && !privs.hasPermission(ViewPrivileges::UPDATE_IN_VIEW)) ||
1304  (perms.delete_ && !privs.hasPermission(ViewPrivileges::DELETE_FROM_VIEW))) {
1305  return false;
1306  } else {
1307  return true;
1308  }
1309 }
1310 
// Thrift endpoint: returns whether `granteeName` (a user or role) holds the
// requested `permissions` on the object `objectName` of type `objectType`.
// Non-superusers may only query themselves or roles granted to them.
1311 bool MapDHandler::has_object_privilege(const TSessionId& sessionId,
1312  const std::string& granteeName,
1313  const std::string& objectName,
1314  const TDBObjectType::type objectType,
1315  const TDBObjectPermissions& permissions) {
1316  LOG_SESSION(sessionId);
1317  auto session = get_session_copy(sessionId);
1318  auto& cat = session.getCatalog();
1319  auto current_user = session.get_currentUser();
1320  if (!current_user.isSuper && !current_user.isReallySuper &&
1321  !SysCatalog::instance().isRoleGrantedToGrantee(
1322  current_user.userName, granteeName, false)) {
// NOTE(review): the THROW_MAPD_EXCEPTION( line is missing from this listing
// (doc line 1323); confirm upstream.
1324  "Users except superusers can only check privileges for self or roles granted to "
1325  "them.")
1326  }
// Superusers implicitly hold every privilege.
// NOTE(review): the UserMetadata declaration (presumably `user_meta`) is
// missing from this listing (doc line 1327); confirm upstream.
1328  if (SysCatalog::instance().getMetadataForUser(granteeName, user_meta) &&
1329  (user_meta.isSuper || user_meta.isReallySuper)) {
1330  return true;
1331  }
1332  Grantee* grnt = SysCatalog::instance().getGrantee(granteeName);
1333  if (!grnt) {
1334  THROW_MAPD_EXCEPTION("User or Role " + granteeName + " does not exist.")
1335  }
// Map the Thrift object type to the checker registered in permissionFuncMap_.
// NOTE(review): the internal type declaration (doc line 1336) and the four
// `case TDBObjectType::...:` labels (doc lines 1339-1352) are missing from
// this listing; confirm upstream.
1337  std::string func_name;
1338  switch (objectType) {
1341  func_name = "database";
1342  break;
1345  func_name = "table";
1346  break;
1349  func_name = "dashboard";
1350  break;
1353  func_name = "view";
1354  break;
1355  default:
1356  THROW_MAPD_EXCEPTION("Invalid object type (" + std::to_string(objectType) + ").");
1357  }
1358  DBObject req_object(objectName, type);
1359  req_object.loadKey(cat);
1360 
1361  auto grantee_object = grnt->findDbObject(req_object.getObjectKey(), false);
1362  if (grantee_object) {
1363  // if grantee has privs on the object
1364  return permissionFuncMap_[func_name](grantee_object->getPrivileges(), permissions);
1365  } else {
1366  // no privileges on that object
1367  return false;
1368  }
1369 }
1370 
1371 void MapDHandler::get_db_objects_for_grantee(std::vector<TDBObject>& TDBObjectsForRole,
1372  const TSessionId& sessionId,
1373  const std::string& roleName) {
1374  LOG_SESSION(sessionId);
1375  auto session = get_session_copy(sessionId);
1376  auto user = session.get_currentUser();
1377  if (!user.isSuper &&
1378  !SysCatalog::instance().isRoleGrantedToGrantee(user.userName, roleName, false)) {
1379  return;
1380  }
1381  auto* rl = SysCatalog::instance().getGrantee(roleName);
1382  if (rl) {
1383  auto dbId = session.getCatalog().getCurrentDB().dbId;
1384  for (auto& dbObject : *rl->getDbObjects(true)) {
1385  if (dbObject.first.dbId != dbId) {
1386  // TODO (max): it doesn't scale well in case we have many DBs (not a typical
1387  // usecase for now, though)
1388  continue;
1389  }
1390  TDBObject tdbObject = serialize_db_object(roleName, *dbObject.second);
1391  TDBObjectsForRole.push_back(tdbObject);
1392  }
1393  } else {
1394  THROW_MAPD_EXCEPTION("User or role " + roleName + " does not exist.");
1395  }
1396 }
1397 
// Thrift endpoint: collect, for every grantee visible to the session user,
// the serialized privileges held on object `objectName` of Thrift type
// `type`, both at object level and at database level.
1398 void MapDHandler::get_db_object_privs(std::vector<TDBObject>& TDBObjects,
1399  const TSessionId& sessionId,
1400  const std::string& objectName,
1401  const TDBObjectType::type type) {
1402  LOG_SESSION(sessionId);
1403  auto session = get_session_copy(sessionId);
1404  DBObjectType object_type;
// NOTE(review): the `case TDBObjectType::...:` labels (doc lines 1406, 1409,
// 1412-1413, 1415) are missing from this listing, including the dashboard
// case's object_type assignment; confirm upstream.
1405  switch (type) {
1407  object_type = DBObjectType::DatabaseDBObjectType;
1408  break;
1410  object_type = DBObjectType::TableDBObjectType;
1411  break;
1414  break;
1416  object_type = DBObjectType::ViewDBObjectType;
1417  break;
1418  default:
1419  THROW_MAPD_EXCEPTION("Failed to get object privileges for " + objectName +
1420  ": unknown object type (" + std::to_string(type) + ").");
1421  }
1422  DBObject object_to_find(objectName, object_type);
1423 
1424  try {
// Dashboards are addressed by numeric id (-1 = all dashboards when the name
// is empty); tables/views are re-resolved so the actual kind wins.
1425  if (object_type == DashboardDBObjectType) {
1426  if (objectName == "") {
1427  object_to_find = DBObject(-1, object_type);
1428  } else {
1429  object_to_find = DBObject(std::stoi(objectName), object_type);
1430  }
1431  } else if ((object_type == TableDBObjectType || object_type == ViewDBObjectType) &&
1432  !objectName.empty()) {
1433  // special handling for view / table
1434  auto& cat = session.getCatalog();
1435  auto td = cat.getMetadataForTable(objectName, false);
1436  if (td) {
1437  object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1438  object_to_find = DBObject(objectName, object_type);
1439  }
1440  }
1441  object_to_find.loadKey(session.getCatalog());
1442  } catch (const std::exception&) {
1443  THROW_MAPD_EXCEPTION("Object with name " + objectName + " does not exist.");
1444  }
1445 
1446  // object type on database level
1447  DBObject object_to_find_dblevel("", object_type);
1448  object_to_find_dblevel.loadKey(session.getCatalog());
1449  // if user is superuser respond with a full priv
1450  if (session.get_currentUser().isSuper) {
1451  // using ALL_TABLE here to set max permissions
// NOTE(review): the privilege argument (presumably AccessPrivileges::ALL_TABLE,
// doc line 1453) is missing from this listing; confirm upstream.
1452  DBObject dbObj{object_to_find.getObjectKey(),
1454  session.get_currentUser().userId};
1455  dbObj.setName("super");
1456  TDBObjects.push_back(serialize_db_object(session.get_currentUser().userName, dbObj));
1457  };
1458 
1459  std::vector<std::string> grantees = SysCatalog::instance().getRoles(
1460  true, session.get_currentUser().isSuper, session.get_currentUser().userName);
1461  for (const auto& grantee : grantees) {
1462  DBObject* object_found;
1463  auto* gr = SysCatalog::instance().getGrantee(grantee);
1464  if (gr && (object_found = gr->findDbObject(object_to_find.getObjectKey(), true))) {
1465  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1466  }
1467  // check object permissions on Database level
1468  if (gr &&
1469  (object_found = gr->findDbObject(object_to_find_dblevel.getObjectKey(), true))) {
1470  TDBObjects.push_back(serialize_db_object(grantee, *object_found));
1471  }
1472  }
1473 }
1474 
// Thrift endpoint: list the roles granted to `granteeName`. Superusers may
// query anyone; a regular user may query only themself, or a role that has
// been granted to them.
1475 void MapDHandler::get_all_roles_for_user(std::vector<std::string>& roles,
1476  const TSessionId& sessionId,
1477  const std::string& granteeName) {
1478  LOG_SESSION(sessionId);
1479  auto session = get_session_copy(sessionId);
1480  auto* grantee = SysCatalog::instance().getGrantee(granteeName);
1481  if (grantee) {
1482  if (session.get_currentUser().isSuper) {
1483  roles = grantee->getRoles();
1484  } else if (grantee->isUser()) {
1485  if (session.get_currentUser().userName == granteeName) {
1486  roles = grantee->getRoles();
1487  } else {
// NOTE(review): the THROW_MAPD_EXCEPTION( line is missing from this listing
// (doc line 1488); confirm upstream.
1489  "Only a superuser is authorized to request list of roles granted to another "
1490  "user.");
1491  }
1492  } else {
1493  CHECK(!grantee->isUser());
1494  // granteeName is actually a roleName here and we can check a role
1495  // only if it is granted to us
1496  if (SysCatalog::instance().isRoleGrantedToGrantee(
1497  session.get_currentUser().userName, granteeName, false)) {
1498  roles = grantee->getRoles();
1499  } else {
1500  THROW_MAPD_EXCEPTION("A user can check only roles granted to him.");
1501  }
1502  }
1503  } else {
1504  THROW_MAPD_EXCEPTION("Grantee " + granteeName + " does not exist.");
1505  }
1506 }
1507 
// Thrift endpoint: resolve a rendered pixel back to the table row(s) that
// produced it, delegating to the render handler. Throws if backend rendering
// is not enabled.
// NOTE(review): the signature's first line (function name, presumably
// MapDHandler::get_result_row_for_pixel) is missing from this listing (doc
// line 1508); confirm upstream.
1509  TPixelTableRowResult& _return,
1510  const TSessionId& session,
1511  const int64_t widget_id,
1512  const TPixel& pixel,
1513  const std::map<std::string, std::vector<std::string>>& table_col_names,
1514  const bool column_format,
1515  const int32_t pixel_radius,
1516  const std::string& nonce) {
1517  LOG_SESSION(session,
1518  "widget_id",
1519  widget_id,
1520  "pixel.x",
1521  pixel.x,
1522  "pixel.y",
1523  pixel.y,
1524  "column_format",
1525  column_format,
1526  "pixel_radius",
1527  pixel_radius,
1528  "table_col_names",
1529  MapDRenderHandler::dump_table_col_names(table_col_names),
1530  "nonce",
1531  nonce);
1532  if (!render_handler_) {
1533  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
1534  }
1535 
1536  auto session_ptr = get_session_copy_ptr(session);
1537  try {
1538  render_handler_->get_result_row_for_pixel(_return,
1539  session_ptr,
1540  widget_id,
1541  pixel,
1542  table_col_names,
1543  column_format,
1544  pixel_radius,
1545  nonce);
1546  } catch (std::exception& e) {
1547  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
1548  }
1549 }
1550 
1551 namespace {
1552 
// For geo columns the Thrift type descriptor has no dedicated fields for the
// geo subtype and output SRID, so they are smuggled through the otherwise
// unused `precision` and `scale` fields.
1553 inline void fixup_geo_column_descriptor(TColumnType& col_type,
1554  const SQLTypes subtype,
1555  const int output_srid) {
1556  col_type.col_type.precision = static_cast<int>(subtype);
1557  col_type.col_type.scale = output_srid;
1558 }
1559 
1560 } // namespace
1561 
// Converts a catalog ColumnDescriptor into its Thrift TColumnType form,
// including geo fixups and dictionary-encoding width resolution.
// NOTE(review): the signature's first line (return type, name — presumably
// MapDHandler::populateThriftColumnType — and the Catalog* parameter `cat`)
// is missing from this listing (doc line 1562); confirm upstream.
1563  const ColumnDescriptor* cd) {
1564  TColumnType col_type;
1565  col_type.col_name = cd->columnName;
1566  col_type.src_name = cd->sourceName;
1567  col_type.col_id = cd->columnId;
1568  col_type.col_type.type = type_to_thrift(cd->columnType);
1569  col_type.col_type.encoding = encoding_to_thrift(cd->columnType);
1570  col_type.col_type.nullable = !cd->columnType.get_notnull();
1571  col_type.col_type.is_array = cd->columnType.get_type() == kARRAY;
1572  if (col_type.col_type.is_array || cd->columnType.get_type() == kDATE) {
1573  col_type.col_type.size = cd->columnType.get_size(); // only for arrays and dates
1574  }
1575  if (IS_GEO(cd->columnType.get_type())) {
// NOTE(review): the fixup_geo_column_descriptor( call line is missing from
// this listing (doc line 1576); confirm upstream.
1577  col_type, cd->columnType.get_subtype(), cd->columnType.get_output_srid());
1578  } else {
1579  col_type.col_type.precision = cd->columnType.get_precision();
1580  col_type.col_type.scale = cd->columnType.get_scale();
1581  }
1582  col_type.is_system = cd->isSystemCol;
// NOTE(review): the first half of this condition (doc line 1583, presumably a
// dictionary-encoding check on cd->columnType) is missing from this listing;
// confirm upstream.
1584  cat != nullptr) {
1585  // have to get the actual size of the encoding from the dictionary definition
1586  const int dict_id = cd->columnType.get_comp_param();
1587  if (!cat->getMetadataForDict(dict_id, false)) {
// Missing dictionary metadata: report comp_param 0 rather than failing.
1588  col_type.col_type.comp_param = 0;
1589  return col_type;
1590  }
1591  auto dd = cat->getMetadataForDict(dict_id, false);
1592  if (!dd) {
1593  THROW_MAPD_EXCEPTION("Dictionary doesn't exist");
1594  }
1595  col_type.col_type.comp_param = dd->dictNBits;
1596  } else {
// Dates stored in days with an unset comp_param default to 32 bits.
1597  col_type.col_type.comp_param =
1598  (cd->columnType.is_date_in_days() && cd->columnType.get_comp_param() == 0)
1599  ? 32
1600  : cd->columnType.get_comp_param();
1601  }
1602  col_type.is_reserved_keyword = ImportHelpers::is_reserved_name(col_type.col_name);
1603  return col_type;
1604 }
1605 
// Thrift endpoint: like get_table_details but including system columns
// (get_system=true), still excluding physical shard columns.
1606 void MapDHandler::get_internal_table_details(TTableDetails& _return,
1607  const TSessionId& session,
1608  const std::string& table_name) {
1609  LOG_SESSION(session, "table_name", table_name);
1610  get_table_details_impl(_return, session, table_name, true, false);
1611 }
1612 
// Thrift endpoint: user-facing table details — no system columns, no
// physical shard columns.
1613 void MapDHandler::get_table_details(TTableDetails& _return,
1614  const TSessionId& session,
1615  const std::string& table_name) {
1616  LOG_SESSION(session, "table_name", table_name);
1617  get_table_details_impl(_return, session, table_name, false, false);
1618 }
1619 
// Shared implementation behind get_table_details / get_internal_table_details:
// fills `_return` with the row descriptor and table-level metadata for
// `table_name`. For views the descriptor is derived by validating the view's
// SQL; `get_system` / `get_physical` control which column classes appear.
1620 void MapDHandler::get_table_details_impl(TTableDetails& _return,
1621  const TSessionId& session,
1622  const std::string& table_name,
1623  const bool get_system,
1624  const bool get_physical) {
1625  auto session_info = get_session_copy(session);
1626  auto& cat = session_info.getCatalog();
1627  auto td = cat.getMetadataForTable(
1628  table_name,
1629  false); // don't populate fragmenter on this call since we only want metadata
1630  if (!td) {
1631  THROW_MAPD_EXCEPTION("Table " + table_name + " doesn't exist");
1632  }
1633  if (td->isView) {
1634  try {
1635  if (hasTableAccessPrivileges(td, session)) {
// Elevate so the view body can be validated regardless of what the view's
// underlying tables would allow directly.
1636  session_info.make_superuser();
1637  const auto query_ra =
1638  parse_to_ra(td->viewSQL, {}, session_info, boost::none, mapd_parameters_);
// NOTE(review): the result declaration (doc line 1639, presumably
// `TQueryResult result;`) and an execute_rel_alg argument line (doc line
// 1644) are missing from this listing; confirm upstream.
1640  execute_rel_alg(result,
1641  query_ra,
1642  true,
1643  session_info,
1645  -1,
1646  -1,
1647  false,
1648  true,
1649  false,
1650  false,
1651  false);
1652  _return.row_desc = fixup_row_descriptor(result.row_set.row_desc, cat);
1653  } else {
1654  THROW_MAPD_EXCEPTION("User has no access privileges to table " + table_name);
1655  }
1656  } catch (std::exception& e) {
// A view whose SQL no longer validates is reported with a sentinel column
// instead of failing the whole call.
1657  TColumnType tColumnType;
1658  tColumnType.col_name = "BROKEN_VIEW_PLEASE_FIX";
1659  _return.row_desc.push_back(tColumnType);
1660  }
1661  } else {
1662  try {
1663  if (hasTableAccessPrivileges(td, session)) {
1664  const auto col_descriptors =
1665  cat.getAllColumnMetadataForTable(td->tableId, get_system, true, get_physical);
1666  const auto deleted_cd = cat.getDeletedColumn(td);
1667  for (const auto cd : col_descriptors) {
1668  if (cd == deleted_cd) {
// the internal delete-marker column is never exposed
1669  continue;
1670  }
1671  _return.row_desc.push_back(populateThriftColumnType(&cat, cd));
1672  }
1673  } else {
1674  THROW_MAPD_EXCEPTION("User has no access privileges to table " + table_name);
1675  }
1676  } catch (const std::runtime_error& e) {
1677  THROW_MAPD_EXCEPTION(e.what());
1678  }
1679  }
// Table-level metadata is reported for tables and views alike.
1680  _return.fragment_size = td->maxFragRows;
1681  _return.page_size = td->fragPageSize;
1682  _return.max_rows = td->maxRows;
1683  _return.view_sql = td->viewSQL;
1684  _return.shard_count = td->nShards;
1685  _return.key_metainfo = td->keyMetainfo;
1686  _return.is_temporary = td->persistenceLevel == Data_Namespace::MemoryLevel::CPU_LEVEL;
1687  _return.partition_detail =
1688  td->partitions.empty()
1689  ? TPartitionDetail::DEFAULT
1690  : (table_is_replicated(td)
1691  ? TPartitionDetail::REPLICATED
1692  : (td->partitions == "SHARDED" ? TPartitionDetail::SHARDED
1693  : TPartitionDetail::OTHER));
1694 }
1695 
1696 void MapDHandler::get_link_view(TFrontendView& _return,
1697  const TSessionId& session,
1698  const std::string& link) {
1699  LOG_SESSION(session);
1700  const auto session_info = get_session_copy(session);
1701  auto& cat = session_info.getCatalog();
1702  auto ld = cat.getMetadataForLink(std::to_string(cat.getCurrentDB().dbId) + link);
1703  if (!ld) {
1704  THROW_MAPD_EXCEPTION("Link " + link + " is not valid.");
1705  }
1706  _return.view_state = ld->viewState;
1707  _return.view_name = ld->link;
1708  _return.update_time = ld->updateTime;
1709  _return.view_metadata = ld->viewMetadata;
1710 }
1711 
// Returns whether the session user holds any privilege on the given table
// (superusers always do).
// NOTE(review): the signature's first line (function name, presumably
// MapDHandler::hasTableAccessPrivileges, and the TableDescriptor* parameter
// `td`) is missing from this listing (doc line 1712); confirm upstream.
1713  const TSessionId& session) {
1714  const auto session_info = get_session_copy(session);
1715  auto& cat = session_info.getCatalog();
1716  auto user_metadata = session_info.get_currentUser();
1717 
1718  if (user_metadata.isSuper) {
1719  return true;
1720  }
1721 
// NOTE(review): the DBObject declaration (doc line 1722, presumably built
// from td->tableName and the table/view object type) is missing from this
// listing; confirm upstream.
1723  dbObject.loadKey(cat);
1724  std::vector<DBObject> privObjects = {dbObject};
1725 
1726  return SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects);
1727 }
1728 
1729 void MapDHandler::get_tables_impl(std::vector<std::string>& table_names,
1730  const TSessionId& session,
1731  const GetTablesType get_tables_type) {
1732  const auto session_info = get_session_copy(session);
1733  auto& cat = session_info.getCatalog();
1734  const auto tables = cat.getAllTableMetadata();
1735  for (const auto td : tables) {
1736  if (td->shard >= 0) {
1737  // skip shards, they're not standalone tables
1738  continue;
1739  }
1740  switch (get_tables_type) {
1741  case GET_PHYSICAL_TABLES: {
1742  if (td->isView) {
1743  continue;
1744  }
1745  break;
1746  }
1747  case GET_VIEWS: {
1748  if (!td->isView) {
1749  continue;
1750  }
1751  }
1752  default: {
1753  break;
1754  }
1755  }
1756  if (!hasTableAccessPrivileges(td, session)) {
1757  // skip table, as there are no privileges to access it
1758  continue;
1759  }
1760  table_names.push_back(td->tableName);
1761  }
1762 }
1763 
// Thrift endpoint: list all accessible tables AND views.
1764 void MapDHandler::get_tables(std::vector<std::string>& table_names,
1765  const TSessionId& session) {
1766  LOG_SESSION(session);
1767  get_tables_impl(table_names, session, GET_PHYSICAL_TABLES_AND_VIEWS);
1768 }
1769 
// Thrift endpoint: list accessible physical tables only (views excluded).
1770 void MapDHandler::get_physical_tables(std::vector<std::string>& table_names,
1771  const TSessionId& session) {
1772  LOG_SESSION(session);
1773  get_tables_impl(table_names, session, GET_PHYSICAL_TABLES);
1774 }
1775 
// Thrift endpoint: list accessible views only.
1776 void MapDHandler::get_views(std::vector<std::string>& table_names,
1777  const TSessionId& session) {
1778  LOG_SESSION(session);
1779  get_tables_impl(table_names, session, GET_VIEWS);
1780 }
1781 
// Thrift endpoint: return summary metadata (name, kind, sharding, column
// names/types) for every table and view the session user can access. View
// column info is obtained by validating the view SQL; broken views are logged
// and skipped rather than failing the call.
1782 void MapDHandler::get_tables_meta(std::vector<TTableMeta>& _return,
1783  const TSessionId& session) {
1784  LOG_SESSION(session);
1785  const auto session_info = get_session_copy(session);
1786  auto& cat = session_info.getCatalog();
1787  const auto tables = cat.getAllTableMetadata();
1788  _return.reserve(tables.size());
1789 
1790  for (const auto td : tables) {
1791  if (td->shard >= 0) {
1792  // skip shards, they're not standalone tables
1793  continue;
1794  }
1795  if (!hasTableAccessPrivileges(td, session)) {
1796  // skip table, as there are no privileges to access it
1797  continue;
1798  }
1799 
1800  TTableMeta ret;
1801  ret.table_name = td->tableName;
1802  ret.is_view = td->isView;
1803  ret.is_replicated = table_is_replicated(td);
1804  ret.shard_count = td->nShards;
1805  ret.max_rows = td->maxRows;
1806  ret.table_id = td->tableId;
1807 
1808  std::vector<TTypeInfo> col_types;
1809  std::vector<std::string> col_names;
1810  size_t num_cols = 0;
1811  if (td->isView) {
1812  try {
1813  const auto query_ra =
1814  parse_to_ra(td->viewSQL, {}, session_info, boost::none, mapd_parameters_);
// NOTE(review): the result declaration (doc line 1815, presumably
// `TQueryResult result;`) and an execute_rel_alg argument line (doc line
// 1820) are missing from this listing; confirm upstream.
1816  execute_rel_alg(result,
1817  query_ra,
1818  true,
1819  session_info,
1821  -1,
1822  -1,
1823  false,
1824  true,
1825  false,
1826  false,
1827  false);
1828  num_cols = result.row_set.row_desc.size();
1829  for (const auto col : result.row_set.row_desc) {
1830  if (col.is_physical) {
// physical (internal) columns are not part of the user-visible count
1831  num_cols--;
1832  continue;
1833  }
1834  col_types.push_back(col.col_type);
1835  col_names.push_back(col.col_name);
1836  }
1837  } catch (std::exception& e) {
1838  LOG(WARNING) << "get_tables_meta: Ignoring broken view: " << td->tableName;
1839  }
1840  } else {
1841  try {
1842  if (hasTableAccessPrivileges(td, session)) {
1843  const auto col_descriptors =
1844  cat.getAllColumnMetadataForTable(td->tableId, false, true, false);
1845  const auto deleted_cd = cat.getDeletedColumn(td);
1846  for (const auto cd : col_descriptors) {
1847  if (cd == deleted_cd) {
// the internal delete-marker column is never exposed
1848  continue;
1849  }
1850  col_types.push_back(ThriftSerializers::type_info_to_thrift(cd->columnType));
1851  col_names.push_back(cd->columnName);
1852  }
1853  num_cols = col_descriptors.size();
1854  } else {
1855  continue;
1856  }
1857  } catch (const std::runtime_error& e) {
1858  THROW_MAPD_EXCEPTION(e.what());
1859  }
1860  }
1861 
1862  ret.num_cols = num_cols;
1863  std::copy(col_types.begin(), col_types.end(), std::back_inserter(ret.col_types));
1864  std::copy(col_names.begin(), col_names.end(), std::back_inserter(ret.col_names));
1865 
1866  _return.push_back(ret);
1867  }
1868 }
1869 
1870 void MapDHandler::get_users(std::vector<std::string>& user_names,
1871  const TSessionId& session) {
1872  LOG_SESSION(session);
1873  std::list<Catalog_Namespace::UserMetadata> user_list;
1874  const auto session_info = get_session_copy(session);
1875 
1876  if (!session_info.get_currentUser().isSuper) {
1877  user_list = SysCatalog::instance().getAllUserMetadata(
1878  session_info.getCatalog().getCurrentDB().dbId);
1879  } else {
1880  user_list = SysCatalog::instance().getAllUserMetadata();
1881  }
1882  for (auto u : user_list) {
1883  user_names.push_back(u.userName);
1884  }
1885 }
1886 
// Thrift endpoint: report the server release string (build-time constant).
void MapDHandler::get_version(std::string& version) {
  version = MAPD_RELEASE;
}
1890 
// Thrift endpoint: drop GPU buffer-pool allocations on this node, the render
// handler, and (in distributed mode) the leaves. Superuser only.
void MapDHandler::clear_gpu_memory(const TSessionId& session) {
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  if (!session_info.get_currentUser().isSuper) {
    THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_gpu_memory");
  }
  try {
    // NOTE(review): the statement that actually clears GPU memory (presumably a
    // DataMgr clearMemory call) was lost in doc extraction here — confirm
    // against the original source.
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_gpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
    // NOTE(review): the leaf-aggregator broadcast call was lost in doc
    // extraction here — confirm against the original source.
  }
}
1910 
// Thrift endpoint: drop CPU buffer-pool allocations on this node, the render
// handler, and (in distributed mode) the leaves. Superuser only.
void MapDHandler::clear_cpu_memory(const TSessionId& session) {
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  if (!session_info.get_currentUser().isSuper) {
    THROW_MAPD_EXCEPTION("Superuser privilege is required to run clear_cpu_memory");
  }
  try {
    // NOTE(review): the statement that actually clears CPU memory (presumably a
    // DataMgr clearMemory call) was lost in doc extraction here — confirm
    // against the original source.
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(e.what());
  }
  if (render_handler_) {
    render_handler_->clear_cpu_memory();
  }

  if (leaf_aggregator_.leafCount() > 0) {
    // NOTE(review): the leaf-aggregator broadcast call was lost in doc
    // extraction here — confirm against the original source.
  }
}
1930 
1932  return INVALID_SESSION_ID;
1933 }
1934 
// Thrift endpoint: report buffer-pool usage for the requested level ("gpu" or
// anything else => CPU) for this node and, in distributed mode, for each leaf.
void MapDHandler::get_memory(std::vector<TNodeMemoryInfo>& _return,
                             const TSessionId& session,
                             const std::string& memory_level) {
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  std::vector<Data_Namespace::MemoryInfo> internal_memory;
  Data_Namespace::MemoryLevel mem_level;
  if (!memory_level.compare("gpu")) {  // compare() == 0 means the strings match
    // NOTE(review): the assignment of mem_level (presumably GPU_LEVEL) was lost
    // in doc extraction here — confirm against the original source.
    internal_memory =
        SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::GPU_LEVEL);
  } else {
    // NOTE(review): the assignment of mem_level (presumably CPU_LEVEL) was lost
    // in doc extraction here — confirm against the original source.
    internal_memory =
        SysCatalog::instance().getDataMgr().getMemoryInfo(MemoryLevel::CPU_LEVEL);
  }

  for (auto memInfo : internal_memory) {
    TNodeMemoryInfo nodeInfo;
    if (leaf_aggregator_.leafCount() > 0) {
      // label this node's entry so it can be told apart from leaf entries below
      nodeInfo.host_name = get_hostname();
    }
    nodeInfo.page_size = memInfo.pageSize;
    nodeInfo.max_num_pages = memInfo.maxNumPages;
    nodeInfo.num_pages_allocated = memInfo.numPageAllocated;
    nodeInfo.is_allocation_capped = memInfo.isAllocationCapped;
    for (auto gpu : memInfo.nodeMemoryData) {
      TMemoryData md;
      md.slab = gpu.slabNum;
      md.start_page = gpu.startPage;
      md.num_pages = gpu.numPages;
      md.touch = gpu.touch;
      md.chunk_key.insert(md.chunk_key.end(), gpu.chunk_key.begin(), gpu.chunk_key.end());
      md.is_free = gpu.isFree == Buffer_Namespace::MemStatus::FREE;
      nodeInfo.node_memory_data.push_back(md);
    }
    _return.push_back(nodeInfo);
  }
  if (leaf_aggregator_.leafCount() > 0) {
    // prepend per-leaf summaries gathered from the cluster
    std::vector<TNodeMemoryInfo> leafSummary =
        leaf_aggregator_.getLeafMemoryInfo(session, mem_level);
    _return.insert(_return.begin(), leafSummary.begin(), leafSummary.end());
  }
}
1979 
// Thrift endpoint: list the databases (name and owner) visible to the current
// user.
void MapDHandler::get_databases(std::vector<TDBInfo>& dbinfos,
                                const TSessionId& session) {
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  const auto& user = session_info.get_currentUser();
  // NOTE(review): the declaration of `dbs` receiving this call's result was
  // lost in doc extraction here — confirm against the original source.
  SysCatalog::instance().getDatabaseListForUser(user);
  for (auto& db : dbs) {
    TDBInfo dbinfo;
    dbinfo.db_name = std::move(db.dbName);
    dbinfo.db_owner = std::move(db.dbOwnerName);
    dbinfos.push_back(std::move(dbinfo));
  }
}
1994 
// Thrift endpoint: switch a session's execution mode (e.g. CPU vs GPU).
// NOTE(review): the first line of this signature was lost in doc extraction
// (presumably `void MapDHandler::set_execution_mode(const TSessionId& session,`).
                                     const TExecuteMode::type mode) {
  LOG_SESSION(session);
  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
  auto session_it = get_session_it_unsafe(session);
  if (leaf_aggregator_.leafCount() > 0) {
    // distributed: propagate to the leaves first, then apply locally; a local
    // failure on the aggregator is logged but not raised to the client
    leaf_aggregator_.set_execution_mode(session, mode);
    try {
      MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
    } catch (const TMapDException& e) {
      LOG(INFO) << "Aggregator failed to set execution mode: " << e.error_msg;
    }
    return;
  }
  MapDHandler::set_execution_mode_nolock(session_it->second.get(), mode);
}
2011 
2012 namespace {
2013 
2014 void check_table_not_sharded(const Catalog& cat, const std::string& table_name) {
2015  const auto td = cat.getMetadataForTable(table_name);
2016  if (td && td->nShards) {
2017  throw std::runtime_error("Cannot import a sharded table directly to a leaf");
2018  }
2019 }
2020 
2021 } // namespace
2022 
// Thrift endpoint: bulk-insert typed (binary) rows into a table. Bad rows are
// dropped and logged; the surviving rows are flushed under a checkpoint lock.
// NOTE(review): the first line of this signature was lost in doc extraction
// (presumably `void MapDHandler::load_table_binary(const TSessionId& session,`).
                                  const std::string& table_name,
                                  const std::vector<TRow>& rows) {
  LOG_SESSION(session, "table_name", table_name);
  check_read_only("load_table_binary");
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  if (g_cluster && !leaf_aggregator_.leafCount()) {
    // Sharded table rows need to be routed to the leaf by an aggregator.
    check_table_not_sharded(cat, table_name);
  }
  const TableDescriptor* td = cat.getMetadataForTable(table_name);
  if (td == nullptr) {
    THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
  }
  check_table_load_privileges(session_info, table_name);
  if (rows.empty()) {
    return;
  }
  // on an aggregator, route rows through the leaves; otherwise load locally
  std::unique_ptr<Importer_NS::Loader> loader;
  if (leaf_aggregator_.leafCount() > 0) {
    loader.reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
  } else {
    loader.reset(new Importer_NS::Loader(cat, td));
  }
  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
  // Subtracting 1 (rowid) until TableDescriptor is updated.
  if (rows.front().cols.size() !=
      static_cast<size_t>(td->nColumns) - (td->hasDeletedCol ? 2 : 1)) {
    THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
  }
  auto col_descs = loader->get_column_descs();
  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
  for (auto cd : col_descs) {
    import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
        new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
  }
  for (auto const& row : rows) {
    size_t col_idx = 0;
    try {
      for (auto cd : col_descs) {
        import_buffers[col_idx]->add_value(
            cd, row.cols[col_idx], row.cols[col_idx].is_null);
        col_idx++;
      }
    } catch (const std::exception& e) {
      // a bad row is dropped: undo its partially-buffered values and continue
      for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
        import_buffers[col_idx_to_pop]->pop_value();
      }
      LOG(ERROR) << "Input exception thrown: " << e.what()
                 << ". Row discarded, issue at column : " << (col_idx + 1)
                 << " data :" << row;
    }
  }
  // serialize against checkpoints on this table while flushing the buffers
  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
      session_info.getCatalog(), table_name, LockType::CheckpointLock);
  loader->load(import_buffers, rows.size());
}
2081 
// Shared setup for the columnar load paths (load_table_binary_columnar /
// load_table_binary_arrow): validates table existence, sharding and load
// privileges, creates the appropriate Loader (distributed or local), checks
// the client-supplied column count, and allocates one TypedImportBuffer per
// column.
// NOTE(review): the first line of this signature was lost in doc extraction
// (presumably `void MapDHandler::prepare_columnar_loader(`).
    const Catalog_Namespace::SessionInfo& session_info,
    const std::string& table_name,
    size_t num_cols,
    std::unique_ptr<Importer_NS::Loader>* loader,
    std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>>* import_buffers) {
  auto& cat = session_info.getCatalog();
  if (g_cluster && !leaf_aggregator_.leafCount()) {
    // Sharded table rows need to be routed to the leaf by an aggregator.
    check_table_not_sharded(cat, table_name);
  }
  const TableDescriptor* td = cat.getMetadataForTable(table_name);
  if (td == nullptr) {
    THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
  }
  check_table_load_privileges(session_info, table_name);
  if (leaf_aggregator_.leafCount() > 0) {
    loader->reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
  } else {
    loader->reset(new Importer_NS::Loader(cat, td));
  }
  auto col_descs = (*loader)->get_column_descs();
  // geo physical (helper) columns are filled in server-side, so the client
  // does not send them and they are excluded from the expected count
  auto geo_physical_cols = std::count_if(
      col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
  // Subtracting 1 (rowid) until TableDescriptor is updated.
  if (num_cols != static_cast<size_t>(td->nColumns) - geo_physical_cols -
                      (td->hasDeletedCol ? 2 : 1) ||
      num_cols < 1) {
    THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name);
  }
  for (auto cd : col_descs) {
    import_buffers->push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
        new Importer_NS::TypedImportBuffer(cd, (*loader)->getStringDict(cd))));
  }
}
2118 
// Thrift endpoint: bulk-insert column-major (binary) data into a table. All
// columns must carry the same row count; geo columns arrive as WKT strings and
// their physical helper columns are populated server-side.
// NOTE(review): the first line of this signature was lost in doc extraction
// (presumably `void MapDHandler::load_table_binary_columnar(const TSessionId& session,`).
                                           const std::string& table_name,
                                           const std::vector<TColumn>& cols) {
  LOG_SESSION(session, "table_name", table_name);
  check_read_only("load_table_binary_columnar");

  std::unique_ptr<Importer_NS::Loader> loader;
  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  // NOTE(review): the opening line of the prepare_columnar_loader(...) call
  // was lost in doc extraction here — confirm against the original source.
      session_info, table_name, cols.size(), &loader, &import_buffers);

  size_t numRows = 0;
  size_t import_idx = 0;  // index into the TColumn vector being loaded
  size_t col_idx = 0;     // index into column description vector
  try {
    size_t skip_physical_cols = 0;
    for (auto cd : loader->get_column_descs()) {
      if (skip_physical_cols > 0) {
        // these buffers were already filled by the geo processing below
        if (!cd->isGeoPhyCol) {
          throw std::runtime_error("Unexpected physical column");
        }
        skip_physical_cols--;
        continue;
      }
      size_t colRows = import_buffers[col_idx]->add_values(cd, cols[import_idx]);
      if (col_idx == 0) {
        numRows = colRows;
      } else if (colRows != numRows) {
        // every column must contribute the same number of rows
        std::ostringstream oss;
        oss << "load_table_binary_columnar: Inconsistent number of rows in column "
            << cd->columnName << " , expecting " << numRows << " rows, column "
            << col_idx << " has " << colRows << " rows";
        THROW_MAPD_EXCEPTION(oss.str());
      }
      // Advance to the next column in the table
      col_idx++;

      // For geometry columns: process WKT strings and fill physical columns
      if (cd->columnType.is_geometry()) {
        auto geo_col_idx = col_idx - 1;
        const auto wkt_column = import_buffers[geo_col_idx]->getGeoStringBuffer();
        std::vector<std::vector<double>> coords_column, bounds_column;
        std::vector<std::vector<int>> ring_sizes_column, poly_rings_column;
        int render_group = 0;
        SQLTypeInfo ti = cd->columnType;
        // NOTE(review): the opening line of the geo parse call (presumably a
        // negated GeoTypesFactory::getGeoColumns(...) over wkt_column) was lost
        // in doc extraction here — confirm against the original source.
        if (numRows != wkt_column->size() ||
                                           ti,
                                           coords_column,
                                           bounds_column,
                                           ring_sizes_column,
                                           poly_rings_column,
                                           false)) {
          std::ostringstream oss;
          oss << "load_table_binary_columnar: Invalid geometry in column "
              << cd->columnName;
          THROW_MAPD_EXCEPTION(oss.str());
        }
        // Populate physical columns, advance col_idx
        // NOTE(review): the opening line of the call that fills the geo
        // physical import buffers was lost in doc extraction here.
            cd,
            import_buffers,
            col_idx,
            coords_column,
            bounds_column,
            ring_sizes_column,
            poly_rings_column,
            render_group);
        skip_physical_cols = cd->columnType.get_physical_cols();
      }
      // Advance to the next column of values being loaded
      import_idx++;
    }
  } catch (const std::exception& e) {
    std::ostringstream oss;
    oss << "load_table_binary_columnar: Input exception thrown: " << e.what()
        << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
    THROW_MAPD_EXCEPTION(oss.str());
  }
  // serialize against checkpoints on this table while flushing the buffers
  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
      session_info.getCatalog(), table_name, LockType::CheckpointLock);
  loader->load(import_buffers, numRows);
}
2204 
2205 using RecordBatchVector = std::vector<std::shared_ptr<arrow::RecordBatch>>;
2206 
// Evaluate an arrow::Status-returning expression exactly once; on failure, log
// the status text and rethrow it as a TMapDException for the Thrift client.
// The argument is captured into `_s` so that side-effecting expressions (e.g.
// `reader->ReadNext(&batch)`) are not re-executed on the error path — the
// original logged `s.ToString()`, which re-evaluated the whole expression.
#define ARROW_THRIFT_THROW_NOT_OK(s)  \
  do {                                \
    ::arrow::Status _s = (s);         \
    if (UNLIKELY(!_s.ok())) {         \
      TMapDException ex;              \
      ex.error_msg = _s.ToString();   \
      LOG(ERROR) << _s.ToString();    \
      throw ex;                       \
    }                                 \
  } while (0)
2217 
namespace {

// Deserialize an in-memory Arrow IPC stream into its record batches. Parse
// failures are logged and the batches read so far (possibly none) returned.
RecordBatchVector loadArrowStream(const std::string& stream) {
  RecordBatchVector batches;
  try {
    // TODO(wesm): Make this simpler in general, see ARROW-1600
    // zero-copy view over the Thrift string payload; the buffer must not
    // outlive `stream`
    auto stream_buffer =
        std::make_shared<arrow::Buffer>(reinterpret_cast<const uint8_t*>(stream.c_str()),
                                        static_cast<int64_t>(stream.size()));

    arrow::io::BufferReader buf_reader(stream_buffer);
    std::shared_ptr<arrow::RecordBatchReader> batch_reader;
    // NOTE(review): the opening line of this statement (presumably
    // ARROW_THRIFT_THROW_NOT_OK( wrapping the Open call) was lost in doc
    // extraction here — confirm against the original source.
        arrow::ipc::RecordBatchStreamReader::Open(&buf_reader, &batch_reader));

    while (true) {
      std::shared_ptr<arrow::RecordBatch> batch;
      // Read batch (zero-copy) from the stream
      ARROW_THRIFT_THROW_NOT_OK(batch_reader->ReadNext(&batch));
      if (batch == nullptr) {
        // end of stream
        break;
      }
      batches.emplace_back(std::move(batch));
    }
  } catch (const std::exception& e) {
    LOG(ERROR) << "Error parsing Arrow stream: " << e.what() << ". Import aborted";
  }
  return batches;
}

}  // namespace
2249 
// Thrift endpoint: bulk-insert data shipped as an Arrow IPC stream. Only a
// single record batch per request is currently supported.
// NOTE(review): the first line of this signature was lost in doc extraction
// (presumably `void MapDHandler::load_table_binary_arrow(const TSessionId& session,`).
                                        const std::string& table_name,
                                        const std::string& arrow_stream) {
  LOG_SESSION(session, "table_name", table_name);
  check_read_only("load_table_binary_arrow");

  RecordBatchVector batches = loadArrowStream(arrow_stream);

  // Assuming have one batch for now
  if (batches.size() != 1) {
    THROW_MAPD_EXCEPTION("Expected a single Arrow record batch. Import aborted");
  }

  std::shared_ptr<arrow::RecordBatch> batch = batches[0];

  std::unique_ptr<Importer_NS::Loader> loader;
  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
  const auto session_info = get_session_copy(session);
  prepare_columnar_loader(session_info,
                          table_name,
                          static_cast<size_t>(batch->num_columns()),
                          &loader,
                          &import_buffers);

  size_t numRows = 0;
  size_t col_idx = 0;
  try {
    for (auto cd : loader->get_column_descs()) {
      auto& array = *batch->column(col_idx);
      Importer_NS::ArraySliceRange row_slice(0, array.length());
      numRows =
          import_buffers[col_idx]->add_arrow_values(cd, array, true, row_slice, nullptr);
      col_idx++;
    }
  } catch (const std::exception& e) {
    // unlike the row-wise paths, any column failure aborts the whole import
    LOG(ERROR) << "Input exception thrown: " << e.what()
               << ". Issue at column : " << (col_idx + 1) << ". Import aborted";
    // TODO(tmostak): Go row-wise on binary columnar import to be consistent with our
    // other import paths
    THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
  }
  // serialize against checkpoints on this table while flushing the buffers
  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
      session_info.getCatalog(), table_name, LockType::CheckpointLock);
  loader->load(import_buffers, numRows);
}
2295 
// Thrift endpoint: bulk-insert rows whose values arrive as strings; each value
// is parsed per the column's type, and geo columns arrive as WKT. Bad rows are
// dropped and logged; the surviving rows are flushed under a checkpoint lock.
void MapDHandler::load_table(const TSessionId& session,
                             const std::string& table_name,
                             const std::vector<TStringRow>& rows) {
  LOG_SESSION(session, "table_name", table_name);
  check_read_only("load_table");
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  if (g_cluster && !leaf_aggregator_.leafCount()) {
    // Sharded table rows need to be routed to the leaf by an aggregator.
    check_table_not_sharded(cat, table_name);
  }
  const TableDescriptor* td = cat.getMetadataForTable(table_name);
  if (td == nullptr) {
    THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
  }
  check_table_load_privileges(session_info, table_name);
  if (rows.empty()) {
    return;
  }
  // on an aggregator, route rows through the leaves; otherwise load locally
  std::unique_ptr<Importer_NS::Loader> loader;
  if (leaf_aggregator_.leafCount() > 0) {
    loader.reset(new DistributedLoader(session_info, td, &leaf_aggregator_));
  } else {
    loader.reset(new Importer_NS::Loader(cat, td));
  }
  auto col_descs = loader->get_column_descs();
  // geo physical (helper) columns are populated server-side from the WKT
  auto geo_physical_cols = std::count_if(
      col_descs.begin(), col_descs.end(), [](auto cd) { return cd->isGeoPhyCol; });
  // TODO(andrew): nColumns should be number of non-virtual/non-system columns.
  // Subtracting 1 (rowid) until TableDescriptor is updated.
  if (rows.front().cols.size() != static_cast<size_t>(td->nColumns) - geo_physical_cols -
                                      (td->hasDeletedCol ? 2 : 1)) {
    THROW_MAPD_EXCEPTION("Wrong number of columns to load into Table " + table_name +
                         " (" + std::to_string(rows.front().cols.size()) + " vs " +
                         std::to_string(td->nColumns - geo_physical_cols - 1) + ")");
  }
  std::vector<std::unique_ptr<Importer_NS::TypedImportBuffer>> import_buffers;
  for (auto cd : col_descs) {
    import_buffers.push_back(std::unique_ptr<Importer_NS::TypedImportBuffer>(
        new Importer_NS::TypedImportBuffer(cd, loader->getStringDict(cd))));
  }
  Importer_NS::CopyParams copy_params;
  size_t rows_completed = 0;
  for (auto const& row : rows) {
    size_t import_idx = 0;  // index into the TStringRow being loaded
    size_t col_idx = 0;     // index into column description vector
    try {
      size_t skip_physical_cols = 0;
      for (auto cd : col_descs) {
        if (skip_physical_cols > 0) {
          // these buffers were already filled by the geo processing below
          if (!cd->isGeoPhyCol) {
            throw std::runtime_error("Unexpected physical column");
          }
          skip_physical_cols--;
          continue;
        }
        import_buffers[col_idx]->add_value(
            cd, row.cols[import_idx].str_val, row.cols[import_idx].is_null, copy_params);
        // Advance to the next column within the table
        col_idx++;

        if (cd->columnType.is_geometry()) {
          // Populate physical columns
          std::vector<double> coords, bounds;
          std::vector<int> ring_sizes, poly_rings;
          int render_group = 0;
          SQLTypeInfo ti;
          if (row.cols[import_idx].is_null ||
              !Geo_namespace::GeoTypesFactory::getGeoColumns(row.cols[import_idx].str_val,
                                                             ti,
                                                             coords,
                                                             bounds,
                                                             ring_sizes,
                                                             poly_rings,
                                                             false)) {
            throw std::runtime_error("Invalid geometry");
          }
          if (cd->columnType.get_type() != ti.get_type()) {
            throw std::runtime_error("Geometry type mismatch");
          }
          // NOTE(review): the opening line of the call that fills the geo
          // physical import buffers was lost in doc extraction here — confirm
          // against the original source.
              cd,
              import_buffers,
              col_idx,
              coords,
              bounds,
              ring_sizes,
              poly_rings,
              render_group);
          skip_physical_cols = cd->columnType.get_physical_cols();
        }
        // Advance to the next field within the row
        import_idx++;
      }
      rows_completed++;
    } catch (const std::exception& e) {
      // a bad row is dropped: undo its partially-buffered values and continue
      for (size_t col_idx_to_pop = 0; col_idx_to_pop < col_idx; ++col_idx_to_pop) {
        import_buffers[col_idx_to_pop]->pop_value();
      }
      LOG(ERROR) << "Input exception thrown: " << e.what()
                 << ". Row discarded, issue at column : " << (col_idx + 1)
                 << " data :" << row;
    }
  }
  // serialize against checkpoints on this table while flushing the buffers
  auto checkpoint_lock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
      session_info.getCatalog(), table_name, LockType::CheckpointLock);
  loader->load(import_buffers, rows_completed);
}
2404 
2405 char MapDHandler::unescape_char(std::string str) {
2406  char out = str[0];
2407  if (str.size() == 2 && str[0] == '\\') {
2408  if (str[1] == 't') {
2409  out = '\t';
2410  } else if (str[1] == 'n') {
2411  out = '\n';
2412  } else if (str[1] == '0') {
2413  out = '\0';
2414  } else if (str[1] == '\'') {
2415  out = '\'';
2416  } else if (str[1] == '\\') {
2417  out = '\\';
2418  }
2419  }
2420  return out;
2421 }
2422 
// Convert Thrift-level TCopyParams (client import options) into the importer's
// internal CopyParams, validating every enum value and throwing on anything
// out of range.
// NOTE(review): this function's signature line was lost in doc extraction
// (presumably `Importer_NS::CopyParams MapDHandler::thrift_to_copyparams(const TCopyParams& cp) {`).
  Importer_NS::CopyParams copy_params;
  switch (cp.has_header) {
    case TImportHeaderRow::AUTODETECT:
      // NOTE(review): the assignment for this case was lost in doc extraction.
      break;
    case TImportHeaderRow::NO_HEADER:
      // NOTE(review): the assignment for this case was lost in doc extraction.
      break;
    case TImportHeaderRow::HAS_HEADER:
      // NOTE(review): the assignment for this case was lost in doc extraction.
      break;
    default:
      THROW_MAPD_EXCEPTION("Invalid has_header in TCopyParams: " +
                           std::to_string((int)cp.has_header));
      break;
  }
  copy_params.quoted = cp.quoted;
  // single-character options arrive as (possibly escaped) strings; an empty
  // string keeps the importer default, except delimiter which becomes '\0'
  if (cp.delimiter.length() > 0) {
    copy_params.delimiter = unescape_char(cp.delimiter);
  } else {
    copy_params.delimiter = '\0';
  }
  if (cp.null_str.length() > 0) {
    copy_params.null_str = cp.null_str;
  }
  if (cp.quote.length() > 0) {
    copy_params.quote = unescape_char(cp.quote);
  }
  if (cp.escape.length() > 0) {
    copy_params.escape = unescape_char(cp.escape);
  }
  if (cp.line_delim.length() > 0) {
    copy_params.line_delim = unescape_char(cp.line_delim);
  }
  if (cp.array_delim.length() > 0) {
    copy_params.array_delim = unescape_char(cp.array_delim);
  }
  if (cp.array_begin.length() > 0) {
    copy_params.array_begin = unescape_char(cp.array_begin);
  }
  if (cp.array_end.length() > 0) {
    copy_params.array_end = unescape_char(cp.array_end);
  }
  if (cp.threads != 0) {
    copy_params.threads = cp.threads;
  }
  if (cp.s3_access_key.length() > 0) {
    copy_params.s3_access_key = cp.s3_access_key;
  }
  if (cp.s3_secret_key.length() > 0) {
    copy_params.s3_secret_key = cp.s3_secret_key;
  }
  if (cp.s3_region.length() > 0) {
    copy_params.s3_region = cp.s3_region;
  }
  if (cp.s3_endpoint.length() > 0) {
    copy_params.s3_endpoint = cp.s3_endpoint;
  }
  switch (cp.file_type) {
    case TFileType::POLYGON:
      // NOTE(review): the assignment for this case was lost in doc extraction.
      break;
    case TFileType::DELIMITED:
      // NOTE(review): the assignment for this case was lost in doc extraction.
      break;
#ifdef ENABLE_IMPORT_PARQUET
    case TFileType::PARQUET:
      copy_params.file_type = Importer_NS::FileType::PARQUET;
      break;
#endif
    default:
      THROW_MAPD_EXCEPTION("Invalid file_type in TCopyParams: " +
                           std::to_string((int)cp.file_type));
      break;
  }
  switch (cp.geo_coords_encoding) {
    case TEncodingType::GEOINT:
      copy_params.geo_coords_encoding = kENCODING_GEOINT;
      break;
    case TEncodingType::NONE:
      copy_params.geo_coords_encoding = kENCODING_NONE;
      break;
    default:
      THROW_MAPD_EXCEPTION("Invalid geo_coords_encoding in TCopyParams: " +
                           std::to_string((int)cp.geo_coords_encoding));
      break;
  }
  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
  switch (cp.geo_coords_type) {
    case TDatumType::GEOGRAPHY:
      copy_params.geo_coords_type = kGEOGRAPHY;
      break;
    case TDatumType::GEOMETRY:
      copy_params.geo_coords_type = kGEOMETRY;
      break;
    default:
      THROW_MAPD_EXCEPTION("Invalid geo_coords_type in TCopyParams: " +
                           std::to_string((int)cp.geo_coords_type));
      break;
  }
  // only WGS84 / web-mercator SRIDs are accepted
  switch (cp.geo_coords_srid) {
    case 4326:
    case 3857:
    case 900913:
      copy_params.geo_coords_srid = cp.geo_coords_srid;
      break;
    default:
      THROW_MAPD_EXCEPTION("Invalid geo_coords_srid in TCopyParams (" +
                           std::to_string((int)cp.geo_coords_srid));
      break;
  }
  copy_params.sanitize_column_names = cp.sanitize_column_names;
  copy_params.geo_layer_name = cp.geo_layer_name;
  return copy_params;
}
2539 
// Convert the importer's internal CopyParams back into the Thrift TCopyParams
// sent to clients (inverse of thrift_to_copyparams).
// NOTE(review): this function's signature line was lost in doc extraction
// (presumably `TCopyParams MapDHandler::copyparams_to_thrift(const Importer_NS::CopyParams& cp) {`).
  TCopyParams copy_params;
  copy_params.delimiter = cp.delimiter;
  copy_params.null_str = cp.null_str;
  switch (cp.has_header) {
    // NOTE(review): the three ImportHeaderRow case labels below were lost in
    // doc extraction; the assignment order suggests AUTODETECT / NO_HEADER /
    // HAS_HEADER — confirm against the original source.
      copy_params.has_header = TImportHeaderRow::AUTODETECT;
      break;
      copy_params.has_header = TImportHeaderRow::NO_HEADER;
      break;
      copy_params.has_header = TImportHeaderRow::HAS_HEADER;
      break;
    default:
      CHECK(false);
      break;
  }
  copy_params.quoted = cp.quoted;
  copy_params.quote = cp.quote;
  copy_params.escape = cp.escape;
  copy_params.line_delim = cp.line_delim;
  copy_params.array_delim = cp.array_delim;
  copy_params.array_begin = cp.array_begin;
  copy_params.array_end = cp.array_end;
  copy_params.threads = cp.threads;
  copy_params.s3_access_key = cp.s3_access_key;
  copy_params.s3_secret_key = cp.s3_secret_key;
  copy_params.s3_region = cp.s3_region;
  copy_params.s3_endpoint = cp.s3_endpoint;
  switch (cp.file_type) {
    // NOTE(review): this case label (presumably FileType::POLYGON) was lost in
    // doc extraction here — confirm against the original source.
      copy_params.file_type = TFileType::POLYGON;
      break;
    default:
      copy_params.file_type = TFileType::DELIMITED;
      break;
  }
  switch (cp.geo_coords_encoding) {
    case kENCODING_GEOINT:
      copy_params.geo_coords_encoding = TEncodingType::GEOINT;
      break;
    default:
      copy_params.geo_coords_encoding = TEncodingType::NONE;
      break;
  }
  copy_params.geo_coords_comp_param = cp.geo_coords_comp_param;
  switch (cp.geo_coords_type) {
    case kGEOGRAPHY:
      copy_params.geo_coords_type = TDatumType::GEOGRAPHY;
      break;
    case kGEOMETRY:
      copy_params.geo_coords_type = TDatumType::GEOMETRY;
      break;
    default:
      CHECK(false);
      break;
  }
  copy_params.geo_coords_srid = cp.geo_coords_srid;
  copy_params.sanitize_column_names = cp.sanitize_column_names;
  copy_params.geo_layer_name = cp.geo_layer_name;
  return copy_params;
}
2603 
// If path points at an HTTP(S) or S3 location, rewrite it to go through the
// matching GDAL network virtual file reader (/vsicurl/ or /vsis3/); throws if
// the linked GDAL build lacks network support.
void add_vsi_network_prefix(std::string& path) {
  // do we support network file access?
  // NOTE(review): the declaration/initialization of `gdal_network` was lost in
  // doc extraction here — confirm against the original source.

  // modify head of filename based on source location
  if (boost::istarts_with(path, "http://") || boost::istarts_with(path, "https://")) {
    if (!gdal_network) {
      // NOTE(review): the opening of this THROW_MAPD_EXCEPTION( call was lost
      // in doc extraction here.
          "HTTP geo file import not supported! Update to GDAL 2.2 or later!");
    }
    // invoke GDAL CURL virtual file reader
    path = "/vsicurl/" + path;
  } else if (boost::istarts_with(path, "s3://")) {
    if (!gdal_network) {
      // NOTE(review): the opening of this THROW_MAPD_EXCEPTION( call was lost
      // in doc extraction here.
          "S3 geo file import not supported! Update to GDAL 2.2 or later!");
    }
    // invoke GDAL S3 virtual file reader
    boost::replace_first(path, "s3://", "/vsis3/");
  }
}
2625 
2626 void add_vsi_geo_prefix(std::string& path) {
2627  // single gzip'd file (not an archive)?
2628  if (boost::iends_with(path, ".gz") && !boost::iends_with(path, ".tar.gz")) {
2629  path = "/vsigzip/" + path;
2630  }
2631 }
2632 
2633 void add_vsi_archive_prefix(std::string& path) {
2634  // check for compressed file or file bundle
2635  if (boost::iends_with(path, ".zip")) {
2636  // zip archive
2637  path = "/vsizip/" + path;
2638  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
2639  boost::iends_with(path, ".tar.gz")) {
2640  // tar archive (compressed or uncompressed)
2641  path = "/vsitar/" + path;
2642  }
2643 }
2644 
2645 std::string remove_vsi_prefixes(const std::string& path_in) {
2646  std::string path(path_in);
2647 
2648  // these will be first
2649  if (boost::istarts_with(path, "/vsizip/")) {
2650  boost::replace_first(path, "/vsizip/", "");
2651  } else if (boost::istarts_with(path, "/vsitar/")) {
2652  boost::replace_first(path, "/vsitar/", "");
2653  } else if (boost::istarts_with(path, "/vsigzip/")) {
2654  boost::replace_first(path, "/vsigzip/", "");
2655  }
2656 
2657  // then these
2658  if (boost::istarts_with(path, "/vsicurl/")) {
2659  boost::replace_first(path, "/vsicurl/", "");
2660  } else if (boost::istarts_with(path, "/vsis3/")) {
2661  boost::replace_first(path, "/vsis3/", "s3://");
2662  }
2663 
2664  return path;
2665 }
2666 
2667 bool path_is_relative(const std::string& path) {
2668  if (boost::istarts_with(path, "s3://") || boost::istarts_with(path, "http://") ||
2669  boost::istarts_with(path, "https://")) {
2670  return false;
2671  }
2672  return !boost::filesystem::path(path).is_absolute();
2673 }
2674 
2675 bool path_has_valid_filename(const std::string& path) {
2676  auto filename = boost::filesystem::path(path).filename().string();
2677  if (filename.size() == 0 || filename[0] == '.' || filename[0] == '/') {
2678  return false;
2679  }
2680  return true;
2681 }
2682 
2683 bool is_a_supported_geo_file(const std::string& path, bool include_gz) {
2684  if (!path_has_valid_filename(path)) {
2685  return false;
2686  }
2687  if (include_gz) {
2688  if (boost::iends_with(path, ".geojson.gz") || boost::iends_with(path, ".json.gz")) {
2689  return true;
2690  }
2691  }
2692  if (boost::iends_with(path, ".shp") || boost::iends_with(path, ".geojson") ||
2693  boost::iends_with(path, ".json") || boost::iends_with(path, ".kml") ||
2694  boost::iends_with(path, ".kmz") || boost::iends_with(path, ".gdb") ||
2695  boost::iends_with(path, ".gdb.zip")) {
2696  return true;
2697  }
2698  return false;
2699 }
2700 
2701 bool is_a_supported_archive_file(const std::string& path) {
2702  if (!path_has_valid_filename(path)) {
2703  return false;
2704  }
2705  if (boost::iends_with(path, ".zip") && !boost::iends_with(path, ".gdb.zip")) {
2706  return true;
2707  } else if (boost::iends_with(path, ".tar") || boost::iends_with(path, ".tgz") ||
2708  boost::iends_with(path, ".tar.gz")) {
2709  return true;
2710  }
2711  return false;
2712 }
2713 
2714 std::string find_first_geo_file_in_archive(const std::string& archive_path,
2715  const Importer_NS::CopyParams& copy_params) {
2716  // get the recursive list of all files in the archive
2717  std::vector<std::string> files =
2718  Importer_NS::Importer::gdalGetAllFilesInArchive(archive_path, copy_params);
2719 
2720  // report the list
2721  LOG(INFO) << "Found " << files.size() << " files in Archive "
2722  << remove_vsi_prefixes(archive_path);
2723  for (const auto& file : files) {
2724  LOG(INFO) << " " << file;
2725  }
2726 
2727  // scan the list for the first candidate file
2728  bool found_suitable_file = false;
2729  std::string file_name;
2730  for (const auto& file : files) {
2731  if (is_a_supported_geo_file(file, false)) {
2732  file_name = file;
2733  found_suitable_file = true;
2734  break;
2735  }
2736  }
2737 
2738  // if we didn't find anything
2739  if (!found_suitable_file) {
2740  LOG(INFO) << "Failed to find any supported geo files in Archive: " +
2741  remove_vsi_prefixes(archive_path);
2742  file_name.clear();
2743  }
2744 
2745  // done
2746  return file_name;
2747 }
2748 
// Thrift endpoint: inspect an import candidate file and return detected
// column names/types plus up to 100 sample rows in '_return'. Handles
// delimited (and, when built in, Parquet) files as well as geo files,
// including geo files located on S3/HTTP or inside archives.
void MapDHandler::detect_column_types(TDetectResult& _return,
                                      const TSessionId& session,
                                      const std::string& file_name_in,
                                      const TCopyParams& cp) {
  LOG_SESSION(session);
  check_read_only("detect_column_types");
  get_session_copy(session);  // validates the session; the copy itself is unused

  std::string file_name{file_name_in};

  if (path_is_relative(file_name)) {
    // assume relative paths are relative to data_path / mapd_import / <session>
    auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
                     boost::filesystem::path(file_name).filename();
    file_name = file_path.string();
  }

  // if it's a geo table, handle alternative paths (S3, HTTP, archive etc.)
  if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
    if (is_a_supported_geo_file(file_name, true)) {
      // prepare to detect geo file directly
      add_vsi_network_prefix(file_name);
      add_vsi_geo_prefix(file_name);
    } else if (is_a_supported_archive_file(file_name)) {
      // find the archive file
      add_vsi_network_prefix(file_name);
      if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
        THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
      }
      // find geo file in archive
      add_vsi_archive_prefix(file_name);
      std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
      // prepare to detect that geo file
      if (geo_file.size()) {
        file_name = file_name + std::string("/") + geo_file;
      }
    } else {
      THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive format: " +
                           file_name_in);
    }
  }

  auto file_path = boost::filesystem::path(file_name);
  // can be a s3 url
  if (!boost::istarts_with(file_name, "s3://")) {
    if (!boost::filesystem::path(file_name).is_absolute()) {
      file_path = import_path_ / picosha2::hash256_hex_string(session) /
                  boost::filesystem::path(file_name).filename();
      file_name = file_path.string();
    }

    if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
      // check for geo file (GDAL can see through the VSI prefixes added above)
      if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
        THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
      }
    } else {
      // check for regular file
      if (!boost::filesystem::exists(file_path)) {
        THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
      }
    }
  }
  try {
    if (copy_params.file_type == Importer_NS::FileType::DELIMITED
#ifdef ENABLE_IMPORT_PARQUET
        || (copy_params.file_type == Importer_NS::FileType::PARQUET)
#endif
    ) {
      // delimited/Parquet path: let the Detector sample the file and infer
      // the best type and encoding per column
      Importer_NS::Detector detector(file_path, copy_params);
      std::vector<SQLTypes> best_types = detector.best_sqltypes;
      std::vector<EncodingType> best_encodings = detector.best_encodings;
      std::vector<std::string> headers = detector.get_headers();
      copy_params = detector.get_copy_params();

      _return.copy_params = copyparams_to_thrift(copy_params);
      _return.row_set.row_desc.resize(best_types.size());
      for (size_t col_idx = 0; col_idx < best_types.size(); col_idx++) {
        TColumnType col;
        SQLTypes t = best_types[col_idx];
        EncodingType encodingType = best_encodings[col_idx];
        SQLTypeInfo ti(t, false, encodingType);
        if (IS_GEO(t)) {
          // set this so encoding_to_thrift does the right thing
          ti.set_compression(copy_params.geo_coords_encoding);
          // fill in these directly
          col.col_type.precision = static_cast<int>(copy_params.geo_coords_type);
          col.col_type.scale = copy_params.geo_coords_srid;
          col.col_type.comp_param = copy_params.geo_coords_comp_param;
        }
        col.col_type.type = type_to_thrift(ti);
        col.col_type.encoding = encoding_to_thrift(ti);
        if (copy_params.sanitize_column_names) {
          col.col_name = ImportHelpers::sanitize_name(headers[col_idx]);
        } else {
          col.col_name = headers[col_idx];
        }
        col.is_reserved_keyword = ImportHelpers::is_reserved_name(col.col_name);
        _return.row_set.row_desc[col_idx] = col;
      }
      // attach up to 100 sample rows (empty string => NULL)
      size_t num_samples = 100;
      auto sample_data = detector.get_sample_rows(num_samples);

      TRow sample_row;
      for (auto row : sample_data) {
        sample_row.cols.clear();
        for (const auto& s : row) {
          TDatum td;
          td.val.str_val = s;
          td.is_null = s.empty();
          sample_row.cols.push_back(td);
        }
        _return.row_set.rows.push_back(sample_row);
      }
    } else if (copy_params.file_type == Importer_NS::FileType::POLYGON) {
      // geo path: derive column descriptors from the file via GDAL
      // @TODO simon.eves get this from somewhere!
      const std::string geoColumnName(OMNISCI_GEO_PREFIX);

      check_geospatial_files(file_path, copy_params);
      std::list<ColumnDescriptor> cds = Importer_NS::Importer::gdalToColumnDescriptors(
          file_path.string(), geoColumnName, copy_params);
      for (auto cd : cds) {
        if (copy_params.sanitize_column_names) {
          cd.columnName = ImportHelpers::sanitize_name(cd.columnName);
        }
        _return.row_set.row_desc.push_back(populateThriftColumnType(nullptr, &cd));
      }
      // sample up to 100 rows, keyed by source column name
      std::map<std::string, std::vector<std::string>> sample_data;
          file_path.string(), geoColumnName, sample_data, 100, copy_params);
      if (sample_data.size() > 0) {
        for (size_t i = 0; i < sample_data.begin()->second.size(); i++) {
          TRow sample_row;
          for (auto cd : cds) {
            TDatum td;
            td.val.str_val = sample_data[cd.sourceName].at(i);
            td.is_null = td.val.str_val.empty();
            sample_row.cols.push_back(td);
          }
          _return.row_set.rows.push_back(sample_row);
        }
      }
      _return.copy_params = copyparams_to_thrift(copy_params);
    }
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION("detect_column_types error: " + std::string(e.what()));
  }
}
2899 
    const std::string& query_str,
    const Catalog_Namespace::SessionInfo& session_info,
    const std::string& action /* render or validate */) {
  // Parse a single SELECT with the legacy (non-Calcite) parser, analyze it
  // against the session's catalog, and return the optimized plan.
  auto& cat = session_info.getCatalog();
  LOG(INFO) << action << ": " << hide_sensitive_data_from_query(query_str);
  SQLParser parser;
  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
  std::string last_parsed;
  int num_parse_errors = 0;
  try {
    num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
  } catch (std::exception& e) {
    THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
  }
  if (num_parse_errors > 0) {
    THROW_MAPD_EXCEPTION("Syntax error at: " + last_parsed);
  }
  // exactly one statement is allowed, and it must be DML, not DDL
  if (parse_trees.size() != 1) {
    THROW_MAPD_EXCEPTION("Can only " + action + " a single query at a time.");
  }
  Parser::Stmt* stmt = parse_trees.front().get();
  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
  if (ddl != nullptr) {
    THROW_MAPD_EXCEPTION("Can only " + action + " SELECT statements.");
  }
  // safe: anything that is not a DDLStmt is a DMLStmt here
  auto dml = static_cast<Parser::DMLStmt*>(stmt);
  dml->analyze(cat, query);
  Planner::Optimizer optimizer(query, cat);
  return optimizer.optimize();
}
2932 
2933 void MapDHandler::render_vega(TRenderResult& _return,
2934  const TSessionId& session,
2935  const int64_t widget_id,
2936  const std::string& vega_json,
2937  const int compression_level,
2938  const std::string& nonce) {
2939  LOG_SESSION(session,
2940  "widget_id",
2941  widget_id,
2942  "compression_level",
2943  compression_level,
2944  "vega_json",
2945  vega_json,
2946  "nonce",
2947  nonce);
2948  if (!render_handler_) {
2949  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
2950  }
2951 
2952  auto session_ptr = get_session_copy_ptr(session);
2953  _return.total_time_ms = measure<>::execution([&]() {
2954  try {
2955  render_handler_->render_vega(
2956  _return, session_ptr, widget_id, vega_json, compression_level, nonce);
2957  } catch (std::exception& e) {
2958  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
2959  }
2960  });
2961 }
2962 
    int32_t dashboard_id,
    AccessPrivileges requestedPermissions) {
  // Returns whether the session's current user holds 'requestedPermissions'
  // on the dashboard object identified by 'dashboard_id'.
  DBObject object(dashboard_id, DashboardDBObjectType);
  auto& catalog = session_info.getCatalog();
  auto& user = session_info.get_currentUser();
  object.loadKey(catalog);  // resolve the object key within this catalog
  object.setPrivileges(requestedPermissions);
  std::vector<DBObject> privs = {object};
  return SysCatalog::instance().checkPrivileges(user, privs);
}
2974 
2975 // dashboards
// Thrift endpoint: fetch full metadata (including state) for one dashboard.
// Requires VIEW privileges on the dashboard.
void MapDHandler::get_dashboard(TDashboard& dashboard,
                                const TSessionId& session,
                                const int32_t dashboard_id) {
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  auto dash = cat.getMetadataForDashboard(dashboard_id);
  if (!dash) {
    THROW_MAPD_EXCEPTION("Dashboard with dashboard id " + std::to_string(dashboard_id) +
                         " doesn't exist");
  }
      session_info, dash->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
    THROW_MAPD_EXCEPTION("User has no view privileges for the dashboard with id " +
                         std::to_string(dashboard_id));
  }
  // look up the owner's user record to distinguish owner grants below
  user_meta.userName = "";
  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
  auto objects_list = SysCatalog::instance().getMetadataForObject(
      cat.getCurrentDB().dbId,
      static_cast<int>(DBObjectType::DashboardDBObjectType),
      dashboard_id);
  dashboard.dashboard_name = dash->dashboardName;
  dashboard.dashboard_state = dash->dashboardState;
  dashboard.image_hash = dash->imageHash;
  dashboard.update_time = dash->updateTime;
  dashboard.dashboard_metadata = dash->dashboardMetadata;
  dashboard.dashboard_owner = dash->user;
  dashboard.dashboard_id = dash->dashboardId;
  // a dashboard counts as shared when any grant exists beyond the owner's own
  if (objects_list.empty() ||
      (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
    dashboard.is_dash_shared = false;
  } else {
    dashboard.is_dash_shared = true;
  }
}
3013 
// Thrift endpoint: list metadata for all dashboards the session's user may
// view. Dashboard state is intentionally omitted (see comment below).
void MapDHandler::get_dashboards(std::vector<TDashboard>& dashboards,
                                 const TSessionId& session) {
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  const auto dashes = cat.getAllDashboardsMetadata();
  user_meta.userName = "";
  for (const auto d : dashes) {
    // resolve each dashboard's owner so owner-only grants are not counted as shares
    SysCatalog::instance().getMetadataForUserById(d->userId, user_meta);
        session_info, d->dashboardId, AccessPrivileges::VIEW_DASHBOARD)) {
      auto objects_list = SysCatalog::instance().getMetadataForObject(
          cat.getCurrentDB().dbId,
          static_cast<int>(DBObjectType::DashboardDBObjectType),
          d->dashboardId);
      TDashboard dash;
      dash.dashboard_name = d->dashboardName;
      dash.image_hash = d->imageHash;
      dash.update_time = d->updateTime;
      dash.dashboard_metadata = d->dashboardMetadata;
      dash.dashboard_id = d->dashboardId;
      dash.dashboard_owner = d->user;
      // dashboardState is intentionally not populated here
      // for payload reasons
      // use get_dashboard call to get state
      if (objects_list.empty() ||
          (objects_list.size() == 1 && objects_list[0]->roleName == user_meta.userName)) {
        dash.is_dash_shared = false;
      } else {
        dash.is_dash_shared = true;
      }
      dashboards.push_back(dash);
    }
  }
}
3050 
// Thrift endpoint: create a new dashboard owned by the session's user and
// return its id. Fails if the user lacks CREATE privileges or if the user
// already owns a dashboard with the same name.
int32_t MapDHandler::create_dashboard(const TSessionId& session,
                                      const std::string& dashboard_name,
                                      const std::string& dashboard_state,
                                      const std::string& image_hash,
                                      const std::string& dashboard_metadata) {
  LOG_SESSION(session);
  check_read_only("create_dashboard");
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();

  if (!session_info.checkDBAccessPrivileges(DBObjectType::DashboardDBObjectType,
    THROW_MAPD_EXCEPTION("Not enough privileges to create a dashboard.");
  }

  // name uniqueness is scoped per owner, hence the userId in the lookup key
  auto dash = cat.getMetadataForDashboard(
      std::to_string(session_info.get_currentUser().userId), dashboard_name);
  if (dash) {
    THROW_MAPD_EXCEPTION("Dashboard with name: " + dashboard_name + " already exists.");
  }

  dd.dashboardName = dashboard_name;
  dd.dashboardState = dashboard_state;
  dd.imageHash = image_hash;
  dd.dashboardMetadata = dashboard_metadata;
  dd.userId = session_info.get_currentUser().userId;
  dd.user = session_info.get_currentUser().userName;

  try {
    auto id = cat.createDashboard(dd);
    // TODO: transactionally unsafe
    SysCatalog::instance().createDBObject(
        session_info.get_currentUser(), dashboard_name, DashboardDBObjectType, cat, id);
    return id;
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
  }
}
3090 
3092  const int32_t dashboard_id,
3093  const std::string& dashboard_name,
3094  const std::string& dashboard_owner,
3095  const std::string& dashboard_state,
3096  const std::string& image_hash,
3097  const std::string& dashboard_metadata) {
3098  LOG_SESSION(session);
3099  check_read_only("replace_dashboard");
3100  const auto session_info = get_session_copy(session);
3101  auto& cat = session_info.getCatalog();
3102 
3104  session_info, dashboard_id, AccessPrivileges::EDIT_DASHBOARD)) {
3105  THROW_MAPD_EXCEPTION("Not enough privileges to replace a dashboard.");
3106  }
3107 
3109  dd.dashboardName = dashboard_name;
3110  dd.dashboardState = dashboard_state;
3111  dd.imageHash = image_hash;
3112  dd.dashboardMetadata = dashboard_metadata;
3114  if (!SysCatalog::instance().getMetadataForUser(dashboard_owner, user)) {
3115  THROW_MAPD_EXCEPTION(std::string("Dashboard owner ") + dashboard_owner +
3116  " does not exist");
3117  }
3118  dd.userId = user.userId;
3119  dd.user = dashboard_owner;
3120  dd.dashboardId = dashboard_id;
3121 
3122  try {
3123  cat.replaceDashboard(dd);
3124  } catch (const std::exception& e) {
3125  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3126  }
3127 }
3128 
3130  const int32_t dashboard_id) {
3131  LOG_SESSION(session);
3132  check_read_only("delete_dashboard");
3133  const auto session_info = get_session_copy(session);
3134  auto& cat = session_info.getCatalog();
3135  auto dash = cat.getMetadataForDashboard(dashboard_id);
3136  if (!dash) {
3137  THROW_MAPD_EXCEPTION("Dashboard with id" + std::to_string(dashboard_id) +
3138  " doesn't exist, so cannot delete it");
3139  }
3141  session_info, dash->dashboardId, AccessPrivileges::DELETE_DASHBOARD)) {
3142  THROW_MAPD_EXCEPTION("Not enough privileges to delete a dashboard.");
3143  }
3144  try {
3145  cat.deleteMetadataForDashboard(dashboard_id);
3146  } catch (const std::exception& e) {
3147  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3148  }
3149 }
3150 
// Validates a list of grantee names for sharing/unsharing a dashboard.
// Throws if the dashboard doesn't exist, if the caller is neither owner nor
// superuser, or if any named user/role doesn't exist. Returns the subset of
// 'groups' eligible to receive grants (superusers are skipped).
std::vector<std::string> MapDHandler::get_valid_groups(const TSessionId& session,
                                                       int32_t dashboard_id,
                                                       std::vector<std::string> groups) {
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  auto dash = cat.getMetadataForDashboard(dashboard_id);
  if (!dash) {
    THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
                         " does not exist");
  } else if (session_info.get_currentUser().userId != dash->userId &&
             !session_info.get_currentUser().isSuper) {
    throw std::runtime_error(
        "User should be either owner of dashboard or super user to share/unshare it");
  }
  std::vector<std::string> valid_groups;
  for (auto& group : groups) {
    user_meta.isSuper = false;  // initialize default flag
    // NOTE(review): getGrantee() only checks existence here; user_meta does
    // not appear to be refreshed per group, so the isSuper skip below seems
    // to rely on state set elsewhere — confirm intended behavior
    if (!SysCatalog::instance().getGrantee(group)) {
      THROW_MAPD_EXCEPTION("Exception: User/Role " + group + " does not exist");
    } else if (!user_meta.isSuper) {
      valid_groups.push_back(group);
    }
  }
  return valid_groups;
}
3177 
// NOOP: Grants not available for objects as of now
// Thrift endpoint: grant the requested dashboard permissions to each valid
// grantee in 'groups'. At least one permission flag must be set. When
// 'grant_role' is set, grantees also receive the dashboard's system role.
void MapDHandler::share_dashboard(const TSessionId& session,
                                  const int32_t dashboard_id,
                                  const std::vector<std::string>& groups,
                                  const std::vector<std::string>& objects,
                                  const TDashboardPermissions& permissions,
                                  const bool grant_role = false) {
  LOG_SESSION(session);
  check_read_only("share_dashboard");
  std::vector<std::string> valid_groups;
  // throws unless the caller is the owner or a superuser
  valid_groups = get_valid_groups(session, dashboard_id, groups);
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  // By default object type can only be dashboard
  DBObject object(dashboard_id, object_type);
  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
      !permissions.view_) {
    THROW_MAPD_EXCEPTION("Atleast one privilege should be assigned for grants");
  } else {
    // translate the requested permission flags into AccessPrivileges
    AccessPrivileges privs;

    object.resetPrivileges();
    if (permissions.delete_) {
    }

    if (permissions.create_) {
    }

    if (permissions.edit_) {
    }

    if (permissions.view_) {
    }

    object.setPrivileges(privs);
  }
  SysCatalog::instance().grantDBObjectPrivilegesBatch(valid_groups, {object}, cat);
  // grant system_role to grantees for underlying objects
  if (grant_role) {
    auto dash = cat.getMetadataForDashboard(dashboard_id);
    SysCatalog::instance().grantRoleBatch({dash->dashboardSystemRoleName}, valid_groups);
  }
}
3226 
3228  const int32_t dashboard_id,
3229  const std::vector<std::string>& groups,
3230  const std::vector<std::string>& objects,
3231  const TDashboardPermissions& permissions) {
3232  LOG_SESSION(session);
3233  check_read_only("unshare_dashboard");
3234  std::vector<std::string> valid_groups;
3235  valid_groups = get_valid_groups(session, dashboard_id, groups);
3236  const auto session_info = get_session_copy(session);
3237  auto& cat = session_info.getCatalog();
3238  // By default object type can only be dashboard
3240  DBObject object(dashboard_id, object_type);
3241  if (!permissions.create_ && !permissions.delete_ && !permissions.edit_ &&
3242  !permissions.view_) {
3243  THROW_MAPD_EXCEPTION("Atleast one privilege should be assigned for revokes");
3244  } else {
3245  AccessPrivileges privs;
3246 
3247  object.resetPrivileges();
3248  if (permissions.delete_) {
3250  }
3251 
3252  if (permissions.create_) {
3254  }
3255 
3256  if (permissions.edit_) {
3258  }
3259 
3260  if (permissions.view_) {
3262  }
3263 
3264  object.setPrivileges(privs);
3265  }
3266  SysCatalog::instance().revokeDBObjectPrivilegesBatch(valid_groups, {object}, cat);
3267  // revoke system_role from grantees for underlying objects
3268  const auto dash = cat.getMetadataForDashboard(dashboard_id);
3269  SysCatalog::instance().revokeDashboardSystemRole(dash->dashboardSystemRoleName,
3270  valid_groups);
3271 }
3272 
    std::vector<TDashboardGrantees>& dashboard_grantees,
    const TSessionId& session,
    int32_t dashboard_id) {
  // List every grantee of the dashboard (except the owner) together with the
  // dashboard permissions each one holds. Only the owner or a superuser may
  // call this.
  LOG_SESSION(session);
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  auto dash = cat.getMetadataForDashboard(dashboard_id);
  if (!dash) {
    THROW_MAPD_EXCEPTION("Exception: Dashboard id " + std::to_string(dashboard_id) +
                         " does not exist");
  } else if (session_info.get_currentUser().userId != dash->userId &&
             !session_info.get_currentUser().isSuper) {
        "User should be either owner of dashboard or super user to access grantees");
  }
  std::vector<ObjectRoleDescriptor*> objectsList;
  objectsList = SysCatalog::instance().getMetadataForObject(
      cat.getCurrentDB().dbId,
      static_cast<int>(DBObjectType::DashboardDBObjectType),
      dashboard_id);  // By default object type can only be dashboards
  user_meta.userId = -1;
  user_meta.userName = "";
  SysCatalog::instance().getMetadataForUserById(dash->userId, user_meta);
  for (auto object : objectsList) {
    if (user_meta.userName == object->roleName) {
      // Mask owner
      continue;
    }
    // translate stored privilege bits into the Thrift permissions struct
    TDashboardGrantees grantee;
    TDashboardPermissions perm;
    grantee.name = object->roleName;
    grantee.is_user = object->roleType;
    perm.create_ = object->privs.hasPermission(DashboardPrivileges::CREATE_DASHBOARD);
    perm.delete_ = object->privs.hasPermission(DashboardPrivileges::DELETE_DASHBOARD);
    perm.edit_ = object->privs.hasPermission(DashboardPrivileges::EDIT_DASHBOARD);
    perm.view_ = object->privs.hasPermission(DashboardPrivileges::VIEW_DASHBOARD);
    grantee.permissions = perm;
    dashboard_grantees.push_back(grantee);
  }
}
3315 
3316 void MapDHandler::create_link(std::string& _return,
3317  const TSessionId& session,
3318  const std::string& view_state,
3319  const std::string& view_metadata) {
3320  LOG_SESSION(session);
3321  // check_read_only("create_link");
3322  const auto session_info = get_session_copy(session);
3323  auto& cat = session_info.getCatalog();
3324 
3325  LinkDescriptor ld;
3326  ld.userId = session_info.get_currentUser().userId;
3327  ld.viewState = view_state;
3328  ld.viewMetadata = view_metadata;
3329 
3330  try {
3331  _return = cat.createLink(ld, 6);
3332  } catch (const std::exception& e) {
3333  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
3334  }
3335 }
3336 
    const std::string& name,
    const bool is_array) {
  // Build a minimal TColumnType with the given name, type and array flag;
  // all other attributes keep their Thrift defaults.
  TColumnType ct;
  ct.col_name = name;
  ct.col_type.type = type;
  ct.col_type.is_array = is_array;
  return ct;
}
3346 
// If 'file_path' is a shapefile component (.shp/.shx/.dbf), verify via GDAL
// that all three companion files exist (checking upper-case extensions as a
// fallback for lower-case). Throws std::runtime_error on a missing file.
void MapDHandler::check_geospatial_files(const boost::filesystem::path file_path,
                                         const Importer_NS::CopyParams& copy_params) {
  const std::list<std::string> shp_ext{".shp", ".shx", ".dbf"};
  if (std::find(shp_ext.begin(),
                shp_ext.end(),
                boost::algorithm::to_lower_copy(file_path.extension().string())) !=
      shp_ext.end()) {
    for (auto ext : shp_ext) {
      auto aux_file = file_path;
          aux_file.replace_extension(boost::algorithm::to_upper_copy(ext)).string(),
          copy_params) &&
          !Importer_NS::Importer::gdalFileExists(aux_file.replace_extension(ext).string(),
                                                 copy_params)) {
        throw std::runtime_error("required file for shapefile does not exist: " +
                                 aux_file.filename().string());
      }
    }
  }
}
3367 
3368 void MapDHandler::create_table(const TSessionId& session,
3369  const std::string& table_name,
3370  const TRowDescriptor& rd,
3371  const TFileType::type file_type,
3372  const TCreateParams& create_params) {
3373  // sql_execute() below also logs session info.
3374  LOG_SESSION(session, "table_name", table_name);
3375  check_read_only("create_table");
3376 
3377  if (ImportHelpers::is_reserved_name(table_name)) {
3378  THROW_MAPD_EXCEPTION("Invalid table name (reserved keyword): " + table_name);
3379  } else if (table_name != ImportHelpers::sanitize_name(table_name)) {
3380  THROW_MAPD_EXCEPTION("Invalid characters in table name: " + table_name);
3381  }
3382 
3383  auto rds = rd;
3384 
3385  // no longer need to manually add the poly column for a TFileType::POLYGON table
3386  // a column of the correct geo type has already been added
3387  // @TODO simon.eves rename TFileType::POLYGON to TFileType::GEO or something!
3388 
3389  std::string stmt{"CREATE TABLE " + table_name};
3390  std::vector<std::string> col_stmts;
3391 
3392  for (auto col : rds) {
3393  if (ImportHelpers::is_reserved_name(col.col_name)) {
3394  THROW_MAPD_EXCEPTION("Invalid column name (reserved keyword): " + col.col_name);
3395  } else if (col.col_name != ImportHelpers::sanitize_name(col.col_name)) {
3396  THROW_MAPD_EXCEPTION("Invalid characters in column name: " + col.col_name);
3397  }
3398  if (col.col_type.type == TDatumType::INTERVAL_DAY_TIME ||
3399  col.col_type.type == TDatumType::INTERVAL_YEAR_MONTH) {
3400  THROW_MAPD_EXCEPTION("Unsupported type: " + thrift_to_name(col.col_type) +
3401  " for column: " + col.col_name);
3402  }
3403 
3404  if (col.col_type.type == TDatumType::DECIMAL) {
3405  // if no precision or scale passed in set to default 14,7
3406  if (col.col_type.precision == 0 && col.col_type.scale == 0) {
3407  col.col_type.precision = 14;
3408  col.col_type.scale = 7;
3409  }
3410  }
3411 
3412  std::string col_stmt;
3413  col_stmt.append(col.col_name + " " + thrift_to_name(col.col_type));
3414 
3415  // As of 2016-06-27 the Immerse v1 frontend does not explicitly set the
3416  // `nullable` argument, leading this to default to false. Uncomment for v2.
3417  // if (!col.col_type.nullable) col_stmt.append(" NOT NULL");
3418 
3419  if (thrift_to_encoding(col.col_type.encoding) != kENCODING_NONE) {
3420  col_stmt.append(" ENCODING " + thrift_to_encoding_name(col.col_type));
3421  if (thrift_to_encoding(col.col_type.encoding) == kENCODING_DICT ||
3422  thrift_to_encoding(col.col_type.encoding) == kENCODING_FIXED ||
3423  thrift_to_encoding(col.col_type.encoding) == kENCODING_GEOINT) {
3424  col_stmt.append("(" + std::to_string(col.col_type.comp_param) + ")");
3425  }
3426  } else if (col.col_type.type == TDatumType::STR) {
3427  // non DICT encoded strings
3428  col_stmt.append(" ENCODING NONE");
3429  } else if (col.col_type.type == TDatumType::POINT ||
3430  col.col_type.type == TDatumType::LINESTRING ||
3431  col.col_type.type == TDatumType::POLYGON ||
3432  col.col_type.type == TDatumType::MULTIPOLYGON) {
3433  // non encoded compressable geo
3434  if (col.col_type.scale == 4326) {
3435  col_stmt.append(" ENCODING NONE");
3436  }
3437  }
3438  col_stmts.push_back(col_stmt);
3439  }
3440 
3441  stmt.append(" (" + boost::algorithm::join(col_stmts, ", ") + ")");
3442 
3443  if (create_params.is_replicated) {
3444  stmt.append(" WITH (PARTITIONS = 'REPLICATED')");
3445  }
3446 
3447  stmt.append(";");
3448 
3449  TQueryResult ret;
3450  sql_execute(ret, session, stmt, true, "", -1, -1);
3451 }
3452 
// Thrift endpoint: bulk-load a delimited file into an existing table.
// Relative paths resolve under import_path_/<session-hash>/; s3:// URLs are
// passed through untouched. Requires load privileges on the table.
void MapDHandler::import_table(const TSessionId& session,
                               const std::string& table_name,
                               const std::string& file_name_in,
                               const TCopyParams& cp) {
  LOG_SESSION(session, "table_name", table_name);
  check_read_only("import_table");
  LOG(INFO) << "import_table " << table_name << " from " << file_name_in;
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();

  const TableDescriptor* td = cat.getMetadataForTable(table_name);
  if (td == nullptr) {
    THROW_MAPD_EXCEPTION("Table " + table_name + " does not exist.");
  }
  check_table_load_privileges(session_info, table_name);

  std::string file_name{file_name_in};
  auto file_path = boost::filesystem::path(file_name);
  if (!boost::istarts_with(file_name, "s3://")) {
    if (!boost::filesystem::path(file_name).is_absolute()) {
      // relative paths live in this session's import sandbox
      file_path = import_path_ / picosha2::hash256_hex_string(session) /
                  boost::filesystem::path(file_name).filename();
      file_name = file_path.string();
    }
    if (!boost::filesystem::exists(file_path)) {
      THROW_MAPD_EXCEPTION("File does not exist: " + file_path.string());
    }
  }

  // TODO(andrew): add delimiter detection to Importer
  if (copy_params.delimiter == '\0') {
    copy_params.delimiter = ',';
    if (boost::filesystem::extension(file_path) == ".tsv") {
      copy_params.delimiter = '\t';
    }
  }

  try {
    std::unique_ptr<Importer_NS::Importer> importer;
    if (leaf_aggregator_.leafCount() > 0) {
      // distributed: route rows to leaves through a DistributedLoader
      importer.reset(new Importer_NS::Importer(
          new DistributedLoader(session_info, td, &leaf_aggregator_),
          file_path.string(),
          copy_params));
    } else {
      importer.reset(new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
    }
    auto ms = measure<>::execution([&]() { importer->import(); });
    std::cout << "Total Import Time: " << (double)ms / 1000.0 << " Seconds." << std::endl;
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION("Exception: " + std::string(e.what()));
  }
}
3507 
namespace {

// helper functions for error checking below
// these would usefully be added as methods of TDatumType
// but that's not possible as it's auto-generated by Thrift

  // true for the four geo Thrift datum types
  return (t == TDatumType::POLYGON || t == TDatumType::MULTIPOLYGON ||
          t == TDatumType::LINESTRING || t == TDatumType::POINT);
}

#if ENABLE_GEO_IMPORT_COLUMN_MATCHING

// stream-format a Thrift datum type for error messages
std::string TTypeInfo_TypeToString(const TDatumType::type& t) {
  std::stringstream ss;
  ss << t;
  return ss.str();
}

// map a geo subtype code (stored as an int) to its display name
std::string TTypeInfo_GeoSubTypeToString(const int32_t p) {
  std::string result;
  switch (p) {
    case SQLTypes::kGEOGRAPHY:
      result = "GEOGRAPHY";
      break;
    case SQLTypes::kGEOMETRY:
      result = "GEOMETRY";
      break;
    default:
      result = "INVALID";
      break;
  }
  return result;
}

// stream-format a Thrift encoding type for error messages
std::string TTypeInfo_EncodingToString(const TEncodingType::type& t) {
  std::stringstream ss;
  ss << t;
  return ss.str();
}

#endif

}  // namespace
3552 
// Raises (via THROW_MAPD_EXCEPTION) an error describing a mismatch between a
// table column attribute and the one detected in a geo file being appended.
// Expects 'file_path', 'table_name' and 'cd' to be in scope at the expansion site.
#define THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(attr, got, expected) \
  THROW_MAPD_EXCEPTION("Could not append geo file '" + file_path.filename().string() + \
                       "' to table '" + table_name + "'. Column '" + cd->columnName + \
                       "' " + attr + " mismatch (got '" + got + "', expected '" + \
                       expected + "')");
3558 
// NOTE(review): the opening line of this definition (return type, name, first
// parameter) was lost in this rendering; the log messages below identify it as
// MapDHandler::import_geo_table(). Each line still carries its original
// documentation-generator line number. The handler imports one or more layers
// of a geo file (possibly inside an archive, possibly on S3) into new or
// existing tables, collecting per-layer errors and throwing them as one
// combined Thrift exception at the end.
3560  const std::string& table_name,
3561  const std::string& file_name_in,
3562  const TCopyParams& cp,
3563  const TRowDescriptor& row_desc,
3564  const TCreateParams& create_params) {
3565  LOG_SESSION(session, "table_name", table_name);
3566  check_read_only("import_table");
3567  const auto session_info = get_session_copy(session);
3568  auto& cat = session_info.getCatalog();
3569 
  // NOTE(review): line 3570 is missing here; `copy_params` is used (and mutated)
  // below but never declared in the visible text — presumably it was built via
  // `thrift_to_copyparams(cp)`, as in the sibling handlers. TODO confirm upstream.
3571 
3572  std::string file_name{file_name_in};
3573 
3574  if (path_is_relative(file_name)) {
3575  // assume relative paths are relative to data_path / mapd_import / <session>
3576  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
3577  boost::filesystem::path(file_name).filename();
3578  file_name = file_path.string();
3579  }
3580 
  // Classify the input: a directly-loadable geo file, an archive to search for
  // a geo file, or unsupported (rejected outright).
3581  if (is_a_supported_geo_file(file_name, true)) {
3582  // prepare to load geo file directly
3583  add_vsi_network_prefix(file_name);
3584  add_vsi_geo_prefix(file_name);
3585  } else if (is_a_supported_archive_file(file_name)) {
3586  // find the archive file
3587  add_vsi_network_prefix(file_name);
3588  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
3589  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
3590  }
3591  // find geo file in archive
3592  add_vsi_archive_prefix(file_name);
3593  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
3594  // prepare to load that geo file
3595  if (geo_file.size()) {
3596  file_name = file_name + std::string("/") + geo_file;
3597  }
3598  } else {
3599  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
3600  file_name_in);
3601  }
3602 
3603  // log what we're about to try to do
3604  LOG(INFO) << "import_geo_table: Original filename: " << file_name_in;
3605  LOG(INFO) << "import_geo_table: Actual filename: " << file_name;
3606 
3607  // use GDAL to check the primary file exists (even if on S3 and/or in archive)
3608  auto file_path = boost::filesystem::path(file_name);
3609  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
3610  THROW_MAPD_EXCEPTION("File does not exist: " + file_path.filename().string());
3611  }
3612 
3613  // use GDAL to check any dependent files exist (ditto)
3614  try {
3615  check_geospatial_files(file_path, copy_params);
3616  } catch (const std::exception& e) {
3617  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3618  }
3619 
3620  // get layer info and deconstruct
3621  // in general, we will get a combination of layers of these four types:
3622  // EMPTY: no rows, report and skip
3623  // GEO: create a geo table from this
3624  // NON_GEO: create a regular table from this
3625  // UNSUPPORTED_GEO: report and skip
3626  std::vector<Importer_NS::Importer::GeoFileLayerInfo> layer_info;
3627  try {
3628  layer_info = Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
3629  } catch (const std::exception& e) {
3630  THROW_MAPD_EXCEPTION("import_geo_table error: " + std::string(e.what()));
3631  }
3632 
3633  // categorize the results
3634  using LayerNameToContentsMap =
3635  std::map<std::string, Importer_NS::Importer::GeoFileLayerContents>;
3636  LayerNameToContentsMap load_layers;
3637  LOG(INFO) << "import_geo_table: Found the following layers in the geo file:";
3638  for (const auto& layer : layer_info) {
3639  switch (layer.contents) {
  // NOTE(review): the `case Importer_NS::Importer::GeoFileLayerContents::...:`
  // labels (lines 3640/3645/3650/3654) were dropped by this rendering; the log
  // text under each arm identifies them as GEO, NON_GEO, UNSUPPORTED_GEO, EMPTY.
3641  LOG(INFO) << "import_geo_table: '" << layer.name
3642  << "' (will import as geo table)";
3643  load_layers[layer.name] = layer.contents;
3644  break;
3646  LOG(INFO) << "import_geo_table: '" << layer.name
3647  << "' (will import as regular table)";
3648  load_layers[layer.name] = layer.contents;
3649  break;
3651  LOG(WARNING) << "import_geo_table: '" << layer.name
3652  << "' (will not import, unsupported geo type)";
3653  break;
3655  LOG(INFO) << "import_geo_table: '" << layer.name << "' (ignoring, empty)";
3656  break;
3657  default:
3658  break;
3659  }
3660  }
3661 
3662  // if nothing is loadable, stop now
3663  if (load_layers.size() == 0) {
3664  THROW_MAPD_EXCEPTION("import_geo_table: No loadable layers found, aborting!");
3665  }
3666 
3667  // if we've been given an explicit layer name, check that it exists and is loadable
3668  // scan the original list, as it may exist but not have been gathered as loadable
3669  if (copy_params.geo_layer_name.size()) {
3670  bool found = false;
3671  for (const auto& layer : layer_info) {
3672  if (copy_params.geo_layer_name == layer.name) {
  // NOTE(review): line 3674 missing — the second half of this condition
  // (presumably `... == GeoFileLayerContents::NON_GEO) {`). TODO confirm.
3673  if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::GEO ||
3675  // forget all the other layers and just load this one
3676  load_layers.clear();
3677  load_layers[layer.name] = layer.contents;
3678  found = true;
3679  break;
3680  } else if (layer.contents ==
  // NOTE(review): line 3681 missing — the rest of this comparison (presumably
  // `GeoFileLayerContents::UNSUPPORTED_GEO) {`). TODO confirm.
3682  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3683  copy_params.geo_layer_name +
3684  "' has unsupported geo type!");
3685  } else if (layer.contents == Importer_NS::Importer::GeoFileLayerContents::EMPTY) {
3686  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3687  copy_params.geo_layer_name + "' is empty!");
3688  }
3689  }
3690  }
3691  if (!found) {
3692  THROW_MAPD_EXCEPTION("import_geo_table: Explicit geo layer '" +
3693  copy_params.geo_layer_name + "' not found!");
3694  }
3695  }
3696 
3697  // Immerse import of multiple layers is not yet supported
3698  // @TODO fix this!
3699  if (row_desc.size() > 0 && load_layers.size() > 1) {
  // NOTE(review): line 3700 missing — presumably `THROW_MAPD_EXCEPTION(` opening
  // the message on the next line. TODO confirm.
3701  "import_geo_table: Multi-layer geo import not yet supported from Immerse!");
3702  }
3703 
3704  // one definition of layer table name construction
3705  // we append the layer name if we're loading more than one table
3706  auto construct_layer_table_name = [&load_layers](const std::string& table_name,
3707  const std::string& layer_name) {
3708  if (load_layers.size() > 1) {
3709  auto sanitized_layer_name = ImportHelpers::sanitize_name(layer_name);
3710  if (sanitized_layer_name != layer_name) {
3711  LOG(INFO) << "import_geo_table: Using sanitized layer name '"
3712  << sanitized_layer_name << "' for table name";
3713  }
3714  return table_name + "_" + sanitized_layer_name;
3715  }
3716  return table_name;
3717  };
3718 
3719  // if we're importing multiple tables, then NONE of them must exist already
3720  if (load_layers.size() > 1) {
3721  for (const auto& layer : load_layers) {
3722  // construct table name
3723  auto this_table_name = construct_layer_table_name(table_name, layer.first);
3724 
3725  // table must not exist
3726  if (cat.getMetadataForTable(this_table_name)) {
3727  THROW_MAPD_EXCEPTION("import_geo_table: Table '" + this_table_name +
3728  "' already exists, aborting!");
3729  }
3730  }
3731  }
3732 
3733  // prepare to gather errors that would otherwise be exceptions, as we can only throw one
3734  std::vector<std::string> caught_exception_messages;
3735 
3736  // prepare to time multi-layer import
3737  double total_import_ms = 0.0;
3738 
3739  // now we're safe to start importing
3740  // we loop over the layers we're going to attempt to load
3741  for (const auto& layer : load_layers) {
3742  // unpack
3743  const auto& layer_name = layer.first;
3744  const auto& layer_contents = layer.second;
3745  bool is_geo_layer =
  // NOTE(review): line 3746 missing — the initializer (presumably
  // `(layer_contents == GeoFileLayerContents::GEO);`). TODO confirm.
3747 
3748  // construct table name again
3749  auto this_table_name = construct_layer_table_name(table_name, layer_name);
3750 
3751  // report
3752  LOG(INFO) << "import_geo_table: Creating table: " << this_table_name;
3753 
3754  // we need a row descriptor
3755  TRowDescriptor rd;
3756  if (row_desc.size() > 0) {
3757  // we have a valid RowDescriptor
3758  // this is the case where Immerse has already detected and created
3759  // all we need to do is import and trust that the data will match
3760  // use the provided row descriptor
3761  // table must already exist (we check this below)
3762  rd = row_desc;
3763  } else {
3764  // we don't have a RowDescriptor
3765  // we have to detect the file ourselves
3766  TDetectResult cds;
3767  TCopyParams cp_copy = cp; // retain S3 auth tokens
3768  cp_copy.geo_layer_name = layer_name;
3769  cp_copy.file_type = TFileType::POLYGON;
3770  try {
3771  detect_column_types(cds, session, file_name_in, cp_copy);
3772  } catch (const std::exception& e) {
3773  // capture the error and abort this layer
3774  caught_exception_messages.emplace_back(
3775  "Invalid/Unsupported Column Types in Layer '" + layer_name + "':" + e.what());
3776  continue;
3777  }
3778  rd = cds.row_set.row_desc;
3779 
3780  // then, if the table does NOT already exist, create it
3781  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
3782  if (!td) {
3783  try {
3784  create_table(session, this_table_name, rd, TFileType::POLYGON, create_params);
3785  } catch (const std::exception& e) {
3786  // capture the error and abort this layer
3787  caught_exception_messages.emplace_back("Failed to create table for Layer '" +
3788  layer_name + "':" + e.what());
3789  continue;
3790  }
3791  }
3792  }
3793 
3794  // by this point, the table should exist, one way or another
3795  const TableDescriptor* td = cat.getMetadataForTable(this_table_name);
3796  if (!td) {
3797  // capture the error and abort this layer
3798  std::string exception_message =
3799  "Could not import geo file '" + file_path.filename().string() + "' to table '" +
3800  this_table_name + "'; table does not exist or failed to create.";
3801  caught_exception_messages.emplace_back(exception_message);
3802  continue;
3803  }
3804 
3805  // then, we have to verify that the structure matches
3806  // get column descriptors (non-system, non-deleted, logical columns only)
3807  const auto col_descriptors =
3808  cat.getAllColumnMetadataForTable(td->tableId, false, false, false);
3809 
3810  // first, compare the column count
3811  if (col_descriptors.size() != rd.size()) {
3812  // capture the error and abort this layer
3813  std::string exception_message =
3814  "Could not append geo file '" + file_path.filename().string() + "' to table '" +
3815  this_table_name + "'. Column count mismatch (got " + std::to_string(rd.size()) +
3816  ", expecting " + std::to_string(col_descriptors.size()) + ")";
3817  caught_exception_messages.emplace_back(exception_message);
3818  continue;
3819  }
3820 
3821  try {
3822  // then the names and types
3823  int rd_index = 0;
3824  for (auto cd : col_descriptors) {
3825  TColumnType cd_col_type = populateThriftColumnType(&cat, cd);
3826  std::string gname = rd[rd_index].col_name; // got
3827  std::string ename = cd->columnName; // expecting
3828 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3829  TTypeInfo gti = rd[rd_index].col_type; // got
3830 #endif
3831  TTypeInfo eti = cd_col_type.col_type; // expecting
3832  // check for name match
3833  if (gname != ename) {
3834  if (TTypeInfo_IsGeo(eti.type) && ename == LEGACY_GEO_PREFIX &&
3835  gname == OMNISCI_GEO_PREFIX) {
3836  // rename incoming geo column to match existing legacy default geo column
3837  rd[rd_index].col_name = gname;
3838  LOG(INFO)
3839  << "import_geo_table: Renaming incoming geo column to match existing "
3840  "legacy default geo column";
3841  } else {
3842  THROW_COLUMN_ATTR_MISMATCH_EXCEPTION("name", gname, ename);
3843  }
3844  }
  // NOTE(review): throughout the #if block below, the opening
  // `THROW_COLUMN_ATTR_MISMATCH_EXCEPTION(` lines (3849/3853/3857/3866/3872/
  // 3886/3891/3895/3901) were dropped by this rendering; only their argument
  // lines remain. TODO confirm against upstream source.
3845 #if ENABLE_GEO_IMPORT_COLUMN_MATCHING
3846  // check for type attributes match
3847  // these attrs must always match regardless of type
3848  if (gti.type != eti.type) {
3850  "type", TTypeInfo_TypeToString(gti.type), TTypeInfo_TypeToString(eti.type));
3851  }
3852  if (gti.is_array != eti.is_array) {
3854  "array-ness", std::to_string(gti.is_array), std::to_string(eti.is_array));
3855  }
3856  if (gti.nullable != eti.nullable) {
3858  "nullability", std::to_string(gti.nullable), std::to_string(eti.nullable));
3859  }
3860  if (TTypeInfo_IsGeo(eti.type)) {
3861  // for geo, only these other attrs must also match
3862  // encoding and comp_param are allowed to differ
3863  // this allows appending to existing geo table
3864  // without needing to know the existing encoding
3865  if (gti.precision != eti.precision) {
3867  "geo sub-type",
3868  TTypeInfo_GeoSubTypeToString(gti.precision),
3869  TTypeInfo_GeoSubTypeToString(eti.precision));
3870  }
3871  if (gti.scale != eti.scale) {
3873  "SRID", std::to_string(gti.scale), std::to_string(eti.scale));
3874  }
3875  if (gti.encoding != eti.encoding) {
3876  LOG(INFO) << "import_geo_table: Ignoring geo encoding mismatch";
3877  }
3878  if (gti.comp_param != eti.comp_param) {
3879  LOG(INFO) << "import_geo_table: Ignoring geo comp_param mismatch";
3880  }
3881  } else {
3882  // non-geo, all other attrs must also match
3883  // @TODO consider relaxing some of these dependent on type
3884  // e.g. DECIMAL precision/scale, TEXT dict or non-dict, INTEGER up-sizing
3885  if (gti.precision != eti.precision) {
3887  std::to_string(gti.precision),
3888  std::to_string(eti.precision));
3889  }
3890  if (gti.scale != eti.scale) {
3892  "scale", std::to_string(gti.scale), std::to_string(eti.scale));
3893  }
3894  if (gti.encoding != eti.encoding) {
3896  "encoding",
3897  TTypeInfo_EncodingToString(gti.encoding),
3898  TTypeInfo_EncodingToString(eti.encoding));
3899  }
3900  if (gti.comp_param != eti.comp_param) {
3902  std::to_string(gti.comp_param),
3903  std::to_string(eti.comp_param));
3904  }
3905  }
3906 #endif
3907  rd_index++;
3908  }
3909  } catch (const std::exception& e) {
3910  // capture the error and abort this layer
3911  caught_exception_messages.emplace_back(e.what());
3912  continue;
3913  }
3914 
  // Map each destination column name to its source-file column name so the
  // importer can match columns by name.
3915  std::map<std::string, std::string> colname_to_src;
3916  for (auto r : rd) {
3917  colname_to_src[r.col_name] =
3918  r.src_name.length() > 0 ? r.src_name : ImportHelpers::sanitize_name(r.src_name);
3919  }
3920 
3921  try {
3922  check_table_load_privileges(session_info, this_table_name);
3923  } catch (const std::exception& e) {
3924  // capture the error and abort this layer
3925  caught_exception_messages.emplace_back(e.what());
3926  continue;
3927  }
3928 
3929  if (is_geo_layer) {
3930  // Final check to ensure that we actually have a geo column
3931  // of the expected name and type before doing the actual import,
3932  // in case the user naively overrode the name or type in Immerse
3933  // Preview (which as of 6/8/18 it still allows you to do).
3934  // This avoids a fatal assert later when it fails to find the
3935  // column. We should make Immerse more robust and disallow this.
3936  bool have_geo_column_with_correct_name = false;
3937  for (const auto& r : rd) {
3938  if (TTypeInfo_IsGeo(r.col_type.type)) {
3939  // TODO(team): allow user to override the geo column name
3940  if (r.col_name == OMNISCI_GEO_PREFIX) {
3941  have_geo_column_with_correct_name = true;
3942  } else if (r.col_name == LEGACY_GEO_PREFIX) {
3943  CHECK(colname_to_src.find(r.col_name) != colname_to_src.end());
3944  // Normalize column names for geo append with legacy column naming scheme
3945  colname_to_src[r.col_name] = r.col_name;
3946  have_geo_column_with_correct_name = true;
3947  }
3948  }
3949  }
3950  if (!have_geo_column_with_correct_name) {
3951  std::string exception_message = "Table " + this_table_name +
3952  " does not have a geo column with name '" +
3953  OMNISCI_GEO_PREFIX + "'. Import aborted!";
3954  caught_exception_messages.emplace_back(exception_message);
3955  continue;
3956  }
3957  }
3958 
3959  try {
3960  // import this layer only?
3961  copy_params.geo_layer_name = layer_name;
3962 
3963  // create an importer
3964  std::unique_ptr<Importer_NS::Importer> importer;
3965  if (leaf_aggregator_.leafCount() > 0) {
3966  importer.reset(new Importer_NS::Importer(
3967  new DistributedLoader(session_info, td, &leaf_aggregator_),
3968  file_path.string(),
3969  copy_params));
3970  } else {
3971  importer.reset(
3972  new Importer_NS::Importer(cat, td, file_path.string(), copy_params));
3973  }
3974 
3975  // import
3976  auto ms = measure<>::execution([&]() { importer->importGDAL(colname_to_src); });
3977  LOG(INFO) << "Import of Layer '" << layer_name << "' took " << (double)ms / 1000.0
3978  << "s";
3979  total_import_ms += ms;
3980  } catch (const std::exception& e) {
3981  std::string exception_message =
3982  "Import of Layer '" + this_table_name + "' failed: " + e.what();
3983  caught_exception_messages.emplace_back(exception_message);
3984  continue;
3985  }
3986  }
3987 
3988  // did we catch any exceptions?
3989  if (caught_exception_messages.size()) {
3990  // combine all the strings into one and throw a single Thrift exception
3991  std::string combined_exception_message = "Failed to import geo file:\n";
3992  for (const auto& message : caught_exception_messages) {
3993  combined_exception_message += message + "\n";
3994  }
3995  THROW_MAPD_EXCEPTION(combined_exception_message);
3996  } else {
3997  // report success and total time
3998  LOG(INFO) << "Import Successful!";
3999  LOG(INFO) << "Total Import Time: " << total_import_ms / 1000.0 << "s";
4000  }
4001 }
4002 
4003 #undef THROW_COLUMN_ATTR_MISMATCH_EXCEPTION
4004 
4005 void MapDHandler::import_table_status(TImportStatus& _return,
4006  const TSessionId& session,
4007  const std::string& import_id) {
4008  LOG_SESSION(session, "import_table_status", import_id);
4009  auto is = Importer_NS::Importer::get_import_status(import_id);
4010  _return.elapsed = is.elapsed.count();
4011  _return.rows_completed = is.rows_completed;
4012  _return.rows_estimated = is.rows_estimated;
4013  _return.rows_rejected = is.rows_rejected;
4014 }
4015 
// NOTE(review): the defining line (4016) of this handler was lost in this
// rendering; the LOG_SESSION tag identifies it as
// MapDHandler::get_first_geo_file_in_archive(std::string& _return, ...).
// It resolves the archive path and returns "<original path>/<first geo file>"
// if one is found, otherwise the original path unchanged.
4017  const TSessionId& session,
4018  const std::string& archive_path_in,
4019  const TCopyParams& copy_params) {
4020  LOG_SESSION(session, "get_first_geo_file_in_archive", archive_path_in);
4021  std::string archive_path(archive_path_in);
4022 
4023  if (path_is_relative(archive_path)) {
4024  // assume relative paths are relative to data_path / mapd_import / <session>
4025  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4026  boost::filesystem::path(archive_path).filename();
4027  archive_path = file_path.string();
4028  }
4029 
4030  if (is_a_supported_archive_file(archive_path)) {
4031  // find the archive file
4032  add_vsi_network_prefix(archive_path);
4033  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4034  thrift_to_copyparams(copy_params))) {
4035  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4036  }
4037  // find geo file in archive
4038  add_vsi_archive_prefix(archive_path);
4039  std::string geo_file =
4040  find_first_geo_file_in_archive(archive_path, thrift_to_copyparams(copy_params));
4041  // what did we get?
4042  if (geo_file.size()) {
4043  // prepend it with the original path
4044  _return = archive_path_in + std::string("/") + geo_file;
4045  } else {
4046  // just return the original path
4047  _return = archive_path_in;
4048  }
4049  } else {
4050  // just return the original path
4051  _return = archive_path_in;
4052  }
4053 }
4054 
// Thrift handler: list every file inside a supported archive, each entry
// prefixed with the caller-supplied archive path. Non-archive inputs yield an
// empty _return (no exception).
4055 void MapDHandler::get_all_files_in_archive(std::vector<std::string>& _return,
4056  const TSessionId& session,
4057  const std::string& archive_path_in,
4058  const TCopyParams& copy_params) {
4059  LOG_SESSION(session, "get_all_files_in_archive", archive_path_in);
4060  std::string archive_path(archive_path_in);
4061 
4062  if (path_is_relative(archive_path)) {
4063  // assume relative paths are relative to data_path / mapd_import / <session>
4064  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4065  boost::filesystem::path(archive_path).filename();
4066  archive_path = file_path.string();
4067  }
4068 
4069  if (is_a_supported_archive_file(archive_path)) {
4070  // find the archive file
4071  add_vsi_network_prefix(archive_path);
4072  if (!Importer_NS::Importer::gdalFileExists(archive_path,
4073  thrift_to_copyparams(copy_params))) {
4074  THROW_MAPD_EXCEPTION("Archive does not exist: " + archive_path_in);
4075  }
4076  // find all files in archive
4077  add_vsi_archive_prefix(archive_path);
  // NOTE(review): line 4078 is missing from this rendering — given the argument
  // list on 4079 it presumably read
  // `_return = Importer_NS::Importer::gdalGetAllFilesInArchive(`. TODO confirm.
4079  archive_path, thrift_to_copyparams(copy_params));
4080  // prepend them all with original path
4081  for (auto& s : _return) {
4082  s = archive_path_in + std::string("/") + s;
4083  }
4084  }
4085 }
4086 
// Thrift handler: enumerate the layers of a geo file (or the first geo file in
// an archive) and return each layer's name with its contents classification
// (EMPTY / GEO / NON_GEO / UNSUPPORTED_GEO) as Thrift TGeoFileLayerInfo.
4087 void MapDHandler::get_layers_in_geo_file(std::vector<TGeoFileLayerInfo>& _return,
4088  const TSessionId& session,
4089  const std::string& file_name_in,
4090  const TCopyParams& cp) {
4091  LOG_SESSION(session, "get_layers_in_geo_file", file_name_in);
4092  std::string file_name(file_name_in);
4093 
  // NOTE(review): line 4094 is missing — `copy_params` is used below but never
  // declared in the visible text; presumably `thrift_to_copyparams(cp)`, as in
  // the sibling handlers. TODO confirm upstream.
4095 
4096  // handle relative paths
4097  if (path_is_relative(file_name)) {
4098  // assume relative paths are relative to data_path / mapd_import / <session>
4099  auto file_path = import_path_ / picosha2::hash256_hex_string(session) /
4100  boost::filesystem::path(file_name).filename();
4101  file_name = file_path.string();
4102  }
4103 
4104  // validate file_name
4105  if (is_a_supported_geo_file(file_name, true)) {
4106  // prepare to load geo file directly
4107  add_vsi_network_prefix(file_name);
4108  add_vsi_geo_prefix(file_name);
4109  } else if (is_a_supported_archive_file(file_name)) {
4110  // find the archive file
4111  add_vsi_network_prefix(file_name);
4112  if (!Importer_NS::Importer::gdalFileExists(file_name, copy_params)) {
4113  THROW_MAPD_EXCEPTION("Archive does not exist: " + file_name_in);
4114  }
4115  // find geo file in archive
4116  add_vsi_archive_prefix(file_name);
4117  std::string geo_file = find_first_geo_file_in_archive(file_name, copy_params);
4118  // prepare to load that geo file
4119  if (geo_file.size()) {
4120  file_name = file_name + std::string("/") + geo_file;
4121  }
4122  } else {
4123  THROW_MAPD_EXCEPTION("File is not a supported geo or geo archive file: " +
4124  file_name_in);
4125  }
4126 
4127  // check the file actually exists
4128  if (!Importer_NS::Importer::gdalFileOrDirectoryExists(file_name, copy_params)) {
4129  THROW_MAPD_EXCEPTION("Geo file/archive does not exist: " + file_name_in);
4130  }
4131 
4132  // find all layers
4133  auto internal_layer_info =
4134  Importer_NS::Importer::gdalGetLayersInGeoFile(file_name, copy_params);
4135 
4136  // convert to Thrift type
4137  for (const auto& internal_layer : internal_layer_info) {
4138  TGeoFileLayerInfo layer;
4139  layer.name = internal_layer.name;
4140  switch (internal_layer.contents) {
  // NOTE(review): the `case Importer_NS::Importer::GeoFileLayerContents::...:`
  // labels (lines 4141/4144/4147/4150) were dropped by this rendering; each arm
  // maps the internal enum value to the matching Thrift TGeoFileLayerContents.
4142  layer.contents = TGeoFileLayerContents::EMPTY;
4143  break;
4145  layer.contents = TGeoFileLayerContents::GEO;
4146  break;
4148  layer.contents = TGeoFileLayerContents::NON_GEO;
4149  break;
4151  layer.contents = TGeoFileLayerContents::UNSUPPORTED_GEO;
4152  break;
4153  default:
4154  CHECK(false);
4155  }
4156  _return.emplace_back(layer); // no suitable constructor to just pass parameters
4157  }
4158 }
4159 
4160 void MapDHandler::start_heap_profile(const TSessionId& session) {
4161  LOG_SESSION(session);
4162  const auto session_info = get_session_copy(session);
4163 #ifdef HAVE_PROFILER
4164  if (IsHeapProfilerRunning()) {
4165  THROW_MAPD_EXCEPTION("Profiler already started");
4166  }
4167  HeapProfilerStart("omnisci");
4168 #else
4169  THROW_MAPD_EXCEPTION("Profiler not enabled");
4170 #endif // HAVE_PROFILER
4171 }
4172 
4173 void MapDHandler::stop_heap_profile(const TSessionId& session) {
4174  LOG_SESSION(session);
4175  const auto session_info = get_session_copy(session);
4176 #ifdef HAVE_PROFILER
4177  if (!IsHeapProfilerRunning()) {
4178  THROW_MAPD_EXCEPTION("Profiler not running");
4179  }
4180  HeapProfilerStop();
4181 #else
4182  THROW_MAPD_EXCEPTION("Profiler not enabled");
4183 #endif // HAVE_PROFILER
4184 }
4185 
4186 void MapDHandler::get_heap_profile(std::string& profile, const TSessionId& session) {
4187  LOG_SESSION(session);
4188  const auto session_info = get_session_copy(session);
4189 #ifdef HAVE_PROFILER
4190  if (!IsHeapProfilerRunning()) {
4191  THROW_MAPD_EXCEPTION("Profiler not running");
4192  }
4193  auto profile_buff = GetHeapProfile();
4194  profile = profile_buff;
4195  free(profile_buff);
4196 #else
4197  THROW_MAPD_EXCEPTION("Profiler not enabled");
4198 #endif // HAVE_PROFILER
4199 }
4200 
4201 // NOTE: Only call check_session_exp_unsafe() when you hold a lock on sessions_mutex_.
// Expires a session that has been idle longer than idle_session_duration_ or
// alive longer than max_session_duration_, throwing a Thrift exception in
// either case; sessions with other in-flight operations (use_count > 2) are
// exempt from the checks.
4202 void MapDHandler::check_session_exp_unsafe(const SessionMap::iterator& session_it) {
4203  if (session_it->second.use_count() > 2) {
4204  // SessionInfo is being used in more than one active operation. Original copy + one
4205  // stored in LogSession. Skip the checks.
4206  return;
4207  }
4208  time_t last_used_time = session_it->second->get_last_used_time();
4209  time_t start_time = session_it->second->get_start_time();
4210  if ((time(0) - last_used_time) > idle_session_duration_) {
  // NOTE(review): line 4211 is missing — the call whose argument list continues
  // below (likely an eviction/disconnect of the expired session, given the
  // trailing `session_it);`). TODO confirm upstream.
4212  session_it); // Already checked session existance in get_session_it_unsafe
4213  THROW_MAPD_EXCEPTION("Idle Session Timeout. User should re-authenticate.")
4214  } else if ((time(0) - start_time) > max_session_duration_) {
  // NOTE(review): line 4215 is missing — presumably the same dropped call as above.
4216  session_it); // Already checked session existance in get_session_it_unsafe
4217  THROW_MAPD_EXCEPTION("Maximum active Session Timeout. User should re-authenticate.")
4218  }
4219 }
4220 
4221 // NOTE: Only call get_session_it_unsafe() while holding a lock on sessions_mutex_.
4222 SessionMap::iterator MapDHandler::get_session_it_unsafe(const TSessionId& session) {
4223  SessionMap::iterator session_it;
4224  const auto calcite_session_prefix = calcite_->get_session_prefix();
4225  const auto prefix_length = calcite_session_prefix.size();
4226  if (prefix_length && 0 == session.compare(0, prefix_length, calcite_session_prefix)) {
4227  session_it = get_session_from_map(session.substr(prefix_length + 1), sessions_);
4228  check_session_exp_unsafe(session_it);
4229  session_it->second->make_superuser();
4230  } else {
4231  session_it = get_session_from_map(session, sessions_);
4232  check_session_exp_unsafe(session_it);
4233  session_it->second->reset_superuser();
4234  }
4235  return session_it;
4236 }
4237 
4238 std::shared_ptr<const Catalog_Namespace::SessionInfo> MapDHandler::get_const_session_ptr(
4239  const TSessionId& session) {
4240  if (session.empty()) {
4241  return {};
4242  }
4243  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4244  return get_session_it_unsafe(session)->second;
4245 }
4246 
// NOTE(review): the defining line (4247) of this function was lost in this
// rendering; callers elsewhere in the file invoke it as
// `get_session_copy(session)` and bind the result by value, so it presumably
// returns a Catalog_Namespace::SessionInfo copy. TODO confirm upstream.
// Body: dereference the mapped shared_ptr under a shared lock (lookup also
// expiry-checks the session).
4248  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4249  return *get_session_it_unsafe(session)->second;
4250 }
4251 
4252 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_copy_ptr(
4253  const TSessionId& session) {
4254  // Note(Wamsi): We have `get_const_session_ptr` which would return as const SessionInfo
4255  // stored in the map. You can use `get_const_session_ptr` instead of the copy of
4256  // SessionInfo but beware that it can be changed in teh map. So if you do not care about
4257  // the changes then use `get_const_session_ptr` if you do then use this function to get
4258  // a copy. We should eventually aim to merge both `get_const_session_ptr` and
4259  // `get_session_copy_ptr`.
4260  mapd_shared_lock<mapd_shared_mutex> read_lock(sessions_mutex_);
4261  auto& session_info_ref = *get_session_it_unsafe(session)->second;
4262  return std::make_shared<Catalog_Namespace::SessionInfo>(session_info_ref);
4263 }
4264 
4265 std::shared_ptr<Catalog_Namespace::SessionInfo> MapDHandler::get_session_ptr(
4266  const TSessionId& session_id) {
4267  // Note(Wamsi): This method will give you a shared_ptr to master SessionInfo itself.
4268  // Should be used only when you need to make updates to original SessionInfo object.
4269  // Currently used by `update_session_last_used_duration`
4270 
4271  // 1) `session_id` will be empty during intial connect. 2)`sessionmapd iterator` will be
4272  // invalid during disconnect. SessionInfo will be erased from map by the time it reaches
4273  // here. In both the above cases, we would return `nullptr` and can skip SessionInfo
4274  // updates.
4275  if (!session_id.empty()) {
4276  try {
4277  mapd_lock_guard<mapd_shared_mutex> write_lock(sessions_mutex_);
4278  return get_session_it_unsafe(session_id)->second;
4279  } catch (TMapDException&) {
4280  return nullptr;
4281  }
4282  }
4283  return nullptr;
4284 }
4285 
// NOTE(review): the defining line (4286) was lost in this rendering — from the
// delegating overload below this is
// MapDHandler::check_table_load_privileges(SessionInfo&, table_name). Throws
// unless the session's user may insert into the table.
4287  const Catalog_Namespace::SessionInfo& session_info,
4288  const std::string& table_name) {
4289  auto user_metadata = session_info.get_currentUser();
4290  auto& cat = session_info.getCatalog();
4291  DBObject dbObject(table_name, TableDBObjectType);
4292  dbObject.loadKey(cat);
  // NOTE(review): line 4293 is missing — given the "no insert privileges" error
  // text below, it presumably set INSERT privileges on dbObject before the
  // check. TODO confirm upstream.
4294  std::vector<DBObject> privObjects;
4295  privObjects.push_back(dbObject);
4296  if (!SysCatalog::instance().checkPrivileges(user_metadata, privObjects)) {
4297  THROW_MAPD_EXCEPTION("Violation of access privileges: user " +
4298  user_metadata.userName + " has no insert privileges for table " +
4299  table_name + ".");
4300  }
4301 }
4302 
// NOTE(review): the defining line (4303) was lost in this rendering — this is
// the session-id overload of check_table_load_privileges: resolve the session
// to a SessionInfo copy, then delegate to the overload above.
4304  const std::string& table_name) {
4305  const auto session_info = get_session_copy(session);
4306  check_table_load_privileges(session_info, table_name);
4307 }
4308 
// NOTE(review): the defining line (4309) was lost in this rendering — from the
// `session_ptr` usage below this is the lock-free helper that applies a
// TExecuteMode (GPU/CPU) to a session. Switching to GPU on a CPU-only server
// throws.
4310  const TExecuteMode::type mode) {
4311  const std::string& user_name = session_ptr->get_currentUser().userName;
4312  switch (mode) {
4313  case TExecuteMode::GPU:
4314  if (cpu_mode_only_) {
4315  TMapDException e;
4316  e.error_msg = "Cannot switch to GPU mode in a server started in CPU-only mode.";
4317  throw e;
4318  }
  // NOTE(review): line 4319 is missing — presumably sets the session's executor
  // device type to GPU before logging. TODO confirm upstream.
4320  LOG(INFO) << "User " << user_name << " sets GPU mode.";
4321  break;
4322  case TExecuteMode::CPU:
  // NOTE(review): line 4323 is missing — presumably the CPU counterpart of the
  // dropped line 4319.
4324  LOG(INFO) << "User " << user_name << " sets CPU mode.";
4325  break;
4326  }
4327 }
4328 
// Execute a relational-algebra query (Calcite output) and populate _return.
// Returns pushed-down filter candidates instead of results when filter
// push-down is requested and candidates are found; otherwise converts the
// ResultSet into _return (explain text or rows) and returns empty.
4329 std::vector<PushedDownFilterInfo> MapDHandler::execute_rel_alg(
4330  TQueryResult& _return,
4331  const std::string& query_ra,
4332  const bool column_format,
4333  const Catalog_Namespace::SessionInfo& session_info,
4334  const ExecutorDeviceType executor_device_type,
4335  const int32_t first_n,
4336  const int32_t at_most_n,
4337  const bool just_explain,
4338  const bool just_validate,
4339  const bool find_push_down_candidates,
4340  const bool just_calcite_explain,
4341  const bool explain_optimized_ir) const {
  // NOTE(review): many lines of the CompilationOptions and ExecutionOptions
  // braced initializers below (4342, 4346-4352, 4355, 4358-4359, 4362, 4366,
  // 4370-4371) were dropped by this rendering — including the declaration of
  // `eo` itself, which is referenced in the executeRelAlgQuery call. TODO
  // consult upstream for the full option lists.
4343  const auto& cat = session_info.getCatalog();
4344  CompilationOptions co = {executor_device_type,
4345  true,
4348  explain_optimized_ir ? ExecutorExplainType::Optimized
4353  just_explain,
4354  allow_loop_joins_ || just_validate,
4356  jit_debug_,
4357  just_validate,
4360  find_push_down_candidates,
4361  just_calcite_explain,
4363  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4364  jit_debug_ ? "/tmp" : "",
4365  jit_debug_ ? "mapdquery" : "",
4367  nullptr);
4368  RelAlgExecutor ra_executor(executor.get(), cat);
4369  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
4372  nullptr,
4373  nullptr),
4374  {}};
4375  _return.execution_time_ms += measure<>::execution(
4376  [&]() { result = ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr); });
4377  // reduce execution time by the time spent during queue waiting
4378  _return.execution_time_ms -= result.getRows()->getQueueTime();
4379  const auto& filter_push_down_info = result.getPushedDownFilterInfo();
4380  if (!filter_push_down_info.empty()) {
4381  return filter_push_down_info;
4382  }
4383  if (just_explain) {
4384  convert_explain(_return, *result.getRows(), column_format);
4385  } else if (!just_calcite_explain) {
4386  convert_rows(_return,
4387  result.getTargetsMeta(),
4388  *result.getRows(),
4389  column_format,
4390  first_n,
4391  at_most_n);
4392  }
4393  return {};
4394 }
4395 
// Execute a relational-algebra query and return the results as an Arrow data
// frame (shared-memory/IPC handles in _return). For GPU results the device
// pointer is remembered in ipc_handle_to_dev_ptr_ for later release.
4396 void MapDHandler::execute_rel_alg_df(TDataFrame& _return,
4397  const std::string& query_ra,
4398  const Catalog_Namespace::SessionInfo& session_info,
4399  const ExecutorDeviceType device_type,
4400  const size_t device_id,
4401  const int32_t first_n) const {
4402  const auto& cat = session_info.getCatalog();
  // NOTE(review): several lines of the CHECK and the CompilationOptions /
  // ExecutionOptions initializers below (4404, 4407-4410, 4412, 4414-4415,
  // 4418-4419, 4422, 4426) were dropped by this rendering. TODO consult
  // upstream for the full option lists.
4403  CHECK(device_type == ExecutorDeviceType::CPU ||
4405  CompilationOptions co = {session_info.get_executor_device_type(),
4406  true,
4411  ExecutionOptions eo = {false,
4413  false,
4416  jit_debug_,
4417  false,
4420  false,
4421  false,
4423  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId,
4424  jit_debug_ ? "/tmp" : "",
4425  jit_debug_ ? "mapdquery" : "",
4427  nullptr);
4428  RelAlgExecutor ra_executor(executor.get(), cat);
4429  const auto result = ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr);
4430  const auto rs = result.getRows();
4431  const auto converter =
4432  std::make_unique<ArrowResultSetConverter>(rs,
4433  data_mgr_,
4434  device_type,
4435  device_id,
4436  getTargetNames(result.getTargetsMeta()),
4437  first_n);
4438  const auto copy = converter->getArrowResult();
4439  _return.sm_handle = std::string(copy.sm_handle.begin(), copy.sm_handle.end());
4440  _return.sm_size = copy.sm_size;
4441  _return.df_handle = std::string(copy.df_handle.begin(), copy.df_handle.end());
4442  if (device_type == ExecutorDeviceType::GPU) {
  // Track the device pointer behind this handle so it can be freed later.
4443  std::lock_guard<std::mutex> map_lock(handle_to_dev_ptr_mutex_);
4444  CHECK(!ipc_handle_to_dev_ptr_.count(_return.df_handle));
4445  ipc_handle_to_dev_ptr_.insert(std::make_pair(_return.df_handle, copy.df_dev_ptr));
4446  }
4447  _return.df_size = copy.df_size;
4448 }
4449 
// NOTE(review): the defining line (4450) was lost in this rendering — from the
// Planner::RootPlan parameter this is the legacy-Planner execution path
// (execute_root_plan): run the plan, then convert the ResultSet into _return
// (explain text or rows).
4451  const Planner::RootPlan* root_plan,
4452  const bool column_format,
4453  const Catalog_Namespace::SessionInfo& session_info,
4454  const ExecutorDeviceType executor_device_type,
4455  const int32_t first_n) const {
4456  auto executor = Executor::getExecutor(
4457  root_plan->getCatalog().getCurrentDB().dbId,
4458  jit_debug_ ? "/tmp" : "",
4459  jit_debug_ ? "mapdquery" : "",
  // NOTE(review): line 4460 is missing — another getExecutor() argument
  // preceding the render-manager one. TODO confirm upstream.
4461  render_handler_ ? render_handler_->get_render_manager() : nullptr);
4462  std::shared_ptr<ResultSet> results;
4463  _return.execution_time_ms += measure<>::execution([&]() {
4464  results = executor->execute(root_plan,
4465  session_info,
4466  true,
4467  executor_device_type,
  // NOTE(review): lines 4468-4470 are missing — the remaining execute()
  // arguments. TODO confirm upstream.
4471  });
4472  // reduce execution time by the time spent during queue waiting
4473  _return.execution_time_ms -= results->getQueueTime();
4474  if (root_plan->get_plan_dest() == Planner::RootPlan::Dest::kEXPLAIN) {
4475  convert_explain(_return, *results, column_format);
4476  return;
4477  }
4478  const auto plan = root_plan->get_plan();
4479  CHECK(plan);
4480  const auto& targets = plan->get_targetlist();
4481  convert_rows(_return, getTargetMetaInfo(targets), *results, column_format, -1, -1);
4482 }
4483 
4484 std::vector<TargetMetaInfo> MapDHandler::getTargetMetaInfo(
4485  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4486  std::vector<TargetMetaInfo> result;
4487  for (const auto target : targets) {
4488  CHECK(target);
4489  CHECK(target->get_expr());
4490  result.emplace_back(target->get_resname(), target->get_expr()->get_type_info());
4491  }
4492  return result;
4493 }
4494 
4495 std::vector<std::string> MapDHandler::getTargetNames(
4496  const std::vector<std::shared_ptr<Analyzer::TargetEntry>>& targets) const {
4497  std::vector<std::string> names;
4498  for (const auto target : targets) {
4499  CHECK(target);
4500  CHECK(target->get_expr());
4501  names.push_back(target->get_resname());
4502  }
4503  return names;
4504 }
4505 
4506 std::vector<std::string> MapDHandler::getTargetNames(
4507  const std::vector<TargetMetaInfo>& targets) const {
4508  std::vector<std::string> names;
4509  for (const auto target : targets) {
4510  names.push_back(target.get_resname());
4511  }
4512  return names;
4513 }
4514 
// Builds the Thrift column descriptor (TColumnType) for a single result
// target. When the target has no result name, a synthetic "result_<idx+1>"
// name is generated.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
4516  const size_t idx) const {
4517  TColumnType proj_info;
4518  proj_info.col_name = target.get_resname();
4519  if (proj_info.col_name.empty()) {
4520  proj_info.col_name = "result_" + std::to_string(idx + 1);
4521  }
4522  const auto& target_ti = target.get_type_info();
4523  proj_info.col_type.type = type_to_thrift(target_ti);
4524  proj_info.col_type.encoding = encoding_to_thrift(target_ti);
4525  proj_info.col_type.nullable = !target_ti.get_notnull();
4526  proj_info.col_type.is_array = target_ti.get_type() == kARRAY;
// Geo columns carry subtype/SRID instead of precision/scale.
4527  if (IS_GEO(target_ti.get_type())) {
4529  proj_info, target_ti.get_subtype(), target_ti.get_output_srid());
4530  } else {
4531  proj_info.col_type.precision = target_ti.get_precision();
4532  proj_info.col_type.scale = target_ti.get_scale();
4533  }
4534  if (target_ti.get_type() == kDATE) {
4535  proj_info.col_type.size = target_ti.get_size();
4536  }
// Date-in-days columns with no explicit comp_param are reported as 32-bit.
4537  proj_info.col_type.comp_param =
4538  (target_ti.is_date_in_days() && target_ti.get_comp_param() == 0)
4539  ? 32
4540  : target_ti.get_comp_param();
4541  return proj_info;
4542 }
4543 
// Vector overload: builds a full Thrift row descriptor by converting each
// target's metadata, passing the positional index for synthetic column names.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
4545  const std::vector<TargetMetaInfo>& targets) const {
4546  TRowDescriptor row_desc;
4547  size_t i = 0;
4548  for (const auto target : targets) {
4549  row_desc.push_back(convert_target_metainfo(target, i));
4550  ++i;
4551  }
4552  return row_desc;
4553 }
4554 
// Converts result rows into the Thrift response, in either columnar or
// row-wise layout. first_n == -1 means "no limit"; a non-negative at_most_n
// is a hard cap — exceeding it raises a Thrift exception rather than
// truncating. Templated on the result type R (must expose colCount() and
// getNextRow()).
// NOTE(review): the signature line between the template header and the first
// parameter is not visible in this extracted chunk.
4555 template <class R>
4557  const std::vector<TargetMetaInfo>& targets,
4558  const R& results,
4559  const bool column_format,
4560  const int32_t first_n,
4561  const int32_t at_most_n) const {
4563  _return.row_set.row_desc = convert_target_metainfo(targets);
4564  int32_t fetched{0};
4565  if (column_format) {
4566  _return.row_set.is_columnar = true;
4567  std::vector<TColumn> tcolumns(results.colCount());
4568  while (first_n == -1 || fetched < first_n) {
4569  const auto crt_row = results.getNextRow(true, true);
4570  if (crt_row.empty()) {
4571  break;
4572  }
4573  ++fetched;
4574  if (at_most_n >= 0 && fetched > at_most_n) {
4575  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4576  std::to_string(at_most_n));
4577  }
4578  for (size_t i = 0; i < results.colCount(); ++i) {
4579  const auto agg_result = crt_row[i];
4580  value_to_thrift_column(agg_result, targets[i].get_type_info(), tcolumns[i]);
4581  }
4582  }
// Columns are buffered locally and appended once all rows are consumed.
4583  for (size_t i = 0; i < results.colCount(); ++i) {
4584  _return.row_set.columns.push_back(tcolumns[i]);
4585  }
4586  } else {
4587  _return.row_set.is_columnar = false;
4588  while (first_n == -1 || fetched < first_n) {
4589  const auto crt_row = results.getNextRow(true, true);
4590  if (crt_row.empty()) {
4591  break;
4592  }
4593  ++fetched;
4594  if (at_most_n >= 0 && fetched > at_most_n) {
4595  THROW_MAPD_EXCEPTION("The result contains more rows than the specified cap of " +
4596  std::to_string(at_most_n));
4597  }
4598  TRow trow;
4599  trow.cols.reserve(results.colCount());
4600  for (size_t i = 0; i < results.colCount(); ++i) {
4601  const auto agg_result = crt_row[i];
4602  trow.cols.push_back(value_to_thrift(agg_result, targets[i].get_type_info()));
4603  }
4604  _return.row_set.rows.push_back(trow);
4605  }
4606  }
4607 }
4608 
4609 TRowDescriptor MapDHandler::fixup_row_descriptor(const TRowDescriptor& row_desc,
4610  const Catalog& cat) {
4611  TRowDescriptor fixedup_row_desc;
4612  for (const TColumnType& col_desc : row_desc) {
4613  auto fixedup_col_desc = col_desc;
4614  if (col_desc.col_type.encoding == TEncodingType::DICT &&
4615  col_desc.col_type.comp_param > 0) {
4616  const auto dd = cat.getMetadataForDict(col_desc.col_type.comp_param, false);
4617  fixedup_col_desc.col_type.comp_param = dd->dictNBits;
4618  }
4619  fixedup_row_desc.push_back(fixedup_col_desc);
4620  }
4621  return fixedup_row_desc;
4622 }
4623 
4624 // create simple result set to return a single column result
// Packages exactly one non-null string value (e.g. an EXPLAIN plan or a
// status message) as a one-row, one-column Thrift result set under the given
// column label. The CHECKs enforce the single-row / scalar-string contract.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
4626  const ResultSet& results,
4627  const bool column_format,
4628  const std::string label) const {
4629  CHECK_EQ(size_t(1), results.rowCount());
4630  TColumnType proj_info;
4631  proj_info.col_name = label;
4632  proj_info.col_type.type = TDatumType::STR;
4633  proj_info.col_type.nullable = false;
4634  proj_info.col_type.is_array = false;
4635  _return.row_set.row_desc.push_back(proj_info);
4636  const auto crt_row = results.getNextRow(true, true);
4637  const auto tv = crt_row[0];
// Confirm there really is only one row before unpacking the value.
4638  CHECK(results.getNextRow(true, true).empty());
4639  const auto scalar_tv = boost::get<ScalarTargetValue>(&tv);
4640  CHECK(scalar_tv);
4641  const auto s_n = boost::get<NullableString>(scalar_tv);
4642  CHECK(s_n);
4643  const auto s = boost::get<std::string>(s_n);
4644  CHECK(s);
4645  if (column_format) {
4646  TColumn tcol;
4647  tcol.data.str_col.push_back(*s);
4648  tcol.nulls.push_back(false);
4649  _return.row_set.is_columnar = true;
4650  _return.row_set.columns.push_back(tcol);
4651  } else {
4652  TDatum explanation;
4653  explanation.val.str_val = *s;
4654  explanation.is_null = false;
4655  TRow trow;
4656  trow.cols.push_back(explanation);
4657  _return.row_set.is_columnar = false;
4658  _return.row_set.rows.push_back(trow);
4659  }
4660 }
4661 
// Emits a single-row EXPLAIN text result under the column label "Explanation".
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
4663  const ResultSet& results,
4664  const bool column_format) const {
4665  create_simple_result(_return, results, column_format, "Explanation");
4666 }
4667 
// Emits a single-row status/message result under the column label "Result".
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
4669  const ResultSet& results,
4670  const bool column_format) const {
4671  create_simple_result(_return, results, column_format, "Result");
4672 }
4673 
4674 // this all should be moved out of here to catalog
// Returns true when the session's current user holds access_priv on table td:
// builds a single-table DBObject, resolves its key against the catalog, and
// delegates the check to SysCatalog::checkPrivileges().
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk; the trailing "};" below carries a stray semicolon after
// the function body.
4676  const Catalog_Namespace::SessionInfo& session_info,
4677  const TableDescriptor* td,
4678  const AccessPrivileges access_priv) {
4679  CHECK(td);
4680  auto& cat = session_info.getCatalog();
4681  std::vector<DBObject> privObjects;
4682  DBObject dbObject(td->tableName, TableDBObjectType);
4683  dbObject.loadKey(cat);
4684  dbObject.setPrivileges(access_priv);
4685  privObjects.push_back(dbObject);
4686  return SysCatalog::instance().checkPrivileges(session_info.get_currentUser(),
4687  privObjects);
4688 };
4689 
// Invalidates any live sessions affected by a DDL statement that removes or
// renames a database or user. Each branch downcasts, invalidates by the
// relevant name, and returns; statements of other kinds fall through and do
// nothing.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
4691  const auto drop_db_stmt = dynamic_cast<Parser::DropDBStmt*>(ddl);
4692  if (drop_db_stmt) {
4693  invalidate_sessions(*drop_db_stmt->getDatabaseName(), drop_db_stmt);
4694  return;
4695  }
4696  const auto rename_db_stmt = dynamic_cast<Parser::RenameDatabaseStmt*>(ddl);
4697  if (rename_db_stmt) {
4698  invalidate_sessions(*rename_db_stmt->getPreviousDatabaseName(), rename_db_stmt);
4699  return;
4700  }
4701  const auto drop_user_stmt = dynamic_cast<Parser::DropUserStmt*>(ddl);
4702  if (drop_user_stmt) {
4703  invalidate_sessions(*drop_user_stmt->getUserName(), drop_user_stmt);
4704  return;
4705  }
4706  const auto rename_user_stmt = dynamic_cast<Parser::RenameUserStmt*>(ddl);
4707  if (rename_user_stmt) {
4708  invalidate_sessions(*rename_user_stmt->getOldUserName(), rename_user_stmt);
4709  return;
4710  }
4711 }
4712 
// Top-level SQL execution dispatcher. Routes Calcite-eligible queries through
// the relational-algebra path (handling EXPLAIN CALCITE and optional filter
// push-down), handles OPTIMIZE and VALIDATE as special parsed statements, and
// falls back to the legacy parser/planner for DDL and legacy DML. Table and
// checkpoint locks are acquired per the locking sequence documented below.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
4714  const Catalog_Namespace::SessionInfo& session_info,
4715  const std::string& query_str,
4716  const bool column_format,
4717  const std::string& nonce,
4718  const ExecutorDeviceType executor_device_type,
4719  const int32_t first_n,
4720  const int32_t at_most_n) {
// Drain any pending leaf work before starting a new statement.
4721  if (leaf_handler_) {
4722  leaf_handler_->flush_queue();
4723  }
4724 
4725  _return.nonce = nonce;
4726  _return.execution_time_ms = 0;
4727  auto& cat = session_info.getCatalog();
4728 
4729  SQLParser parser;
4730  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
4731  std::string last_parsed;
4732  int num_parse_errors = 0;
4733  std::unique_ptr<Planner::RootPlan> root_plan;
4734 
4735  /*
4736  Use this seq to simplify locking:
4737  INSERT_VALUES: CheckpointLock [ >> TableWriteLock ]
4738  INSERT_SELECT: CheckpointLock >> TableReadLock [ >>
4739  TableWriteLock ] COPY_TO/SELECT: TableReadLock COPY_FROM: CheckpointLock [
4740  >> TableWriteLock ] DROP/TRUNC: CheckpointLock >> TableWriteLock
4741  DELETE/UPDATE: CheckpointLock >> TableWriteLock
4742  */
4743 
4744  std::vector<Lock_Namespace::TableLock> table_locks;
4745 
4746  mapd_unique_lock<mapd_shared_mutex> chkptlLock;
4747  mapd_unique_lock<mapd_shared_mutex> executeWriteLock;
4748  mapd_shared_lock<mapd_shared_mutex> executeReadLock;
4749 
4750  try {
4751  ParserWrapper pw{query_str};
4752  TableMap table_map;
4753  OptionalTableMap tableNames(table_map);
4754  if (pw.isCalcitePathPermissable(read_only_)) {
4755  std::string query_ra;
4756  _return.execution_time_ms += measure<>::execution([&]() {
4757  // query_ra = TIME_WRAP(parse_to_ra)(query_str, session_info);
4758  query_ra = parse_to_ra(query_str, {}, session_info, tableNames, mapd_parameters_);
4759  });
4760 
4761  std::string query_ra_calcite_explain;
// EXPLAIN CALCITE with filter push-down disabled (or on a cluster) just
// returns the relational algebra text directly.
4762  if (pw.isCalciteExplain() && (!g_enable_filter_push_down || g_cluster)) {
4763  // return the ra as the result
4764  convert_explain(_return, ResultSet(query_ra), true);
4765  return;
4766  } else if (pw.isCalciteExplain()) {
4767  // removing the "explain calcite " from the beginning of the "query_str":
4768  std::string temp_query_str =
4769  query_str.substr(std::string("explain calcite ").length());
4770  query_ra_calcite_explain =
4771  parse_to_ra(temp_query_str, {}, session_info, boost::none, mapd_parameters_);
4772  }
4773 
4774  // UPDATE/DELETE needs to get a checkpoint lock as the first lock
4775  for (const auto& table : tableNames.value()) {
4776  if (table.second) {
4777  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
4778  session_info.getCatalog(), table.first, LockType::CheckpointLock);
4779  }
4780  }
4781  // COPY_TO/SELECT: read ExecutorOuterLock >> TableReadLock locks
4782  executeReadLock = mapd_shared_lock<mapd_shared_mutex>(
4785  session_info.getCatalog(), tableNames.value(), table_locks);
4786 
4787  const auto filter_push_down_requests =
4788  execute_rel_alg(_return,
4789  pw.isCalciteExplain() ? query_ra_calcite_explain : query_ra,
4790  column_format,
4791  session_info,
4792  executor_device_type,
4793  first_n,
4794  at_most_n,
4795  pw.isIRExplain(),
4796  false,
4798  pw.isCalciteExplain(),
4799  pw.getExplainType() == ParserWrapper::ExplainType::OptimizedIR);
4800  if (pw.isCalciteExplain() && filter_push_down_requests.empty()) {
4801  // we only reach here if filter push down was enabled, but no filter
4802  // push down candidate was found
4803  convert_explain(_return, ResultSet(query_ra), true);
4804  return;
4805  }
4806  if (!filter_push_down_requests.empty()) {
4808  query_ra,
4809  column_format,
4810  session_info,
4811  executor_device_type,
4812  first_n,
4813  at_most_n,
4814  pw.isIRExplain(),
4815  pw.isCalciteExplain(),
4816  query_str,
4817  filter_push_down_requests);
// NOTE(review): this else-if is unreachable — the identical condition at
// line 4800 above has already returned.
4818  } else if (pw.isCalciteExplain() && filter_push_down_requests.empty()) {
4819  // return the ra as the result:
4820  // If we reach here, the 'filter_push_down_request' turned out to be empty, i.e.,
4821  // no filter push down so we continue with the initial (unchanged) query's calcite
4822  // explanation.
4823  query_ra =
4824  parse_to_ra(query_str, {}, session_info, boost::none, mapd_parameters_);
4825  convert_explain(_return, ResultSet(query_ra), true);
4826  return;
4827  }
4828  if (pw.isCalciteExplain()) {
4829  // If we reach here, the filter push down candidates has been selected and
4830  // proper output result has been already created.
4831  return;
4832  }
// NOTE(review): dead code — the identical isCalciteExplain() check directly
// above always returns, so this duplicate block can never execute.
4833  if (pw.isCalciteExplain()) {
4834  // return the ra as the result:
4835  // If we reach here, the 'filter_push_down_request' turned out to be empty, i.e.,
4836  // no filter push down so we continue with the initial (unchanged) query's calcite
4837  // explanation.
4838  query_ra =
4839  parse_to_ra(query_str, {}, session_info, boost::none, mapd_parameters_);
4840  convert_explain(_return, ResultSet(query_ra), true);
4841  return;
4842  }
4843  return;
4844  } else if (pw.is_optimize || pw.is_validate) {
4845  // Get the Stmt object
4846  try {
4847  num_parse_errors = parser.parse(query_str, parse_trees, last_parsed);
4848  } catch (std::exception& e) {
4849  throw std::runtime_error(e.what());
4850  }
4851  if (num_parse_errors > 0) {
4852  throw std::runtime_error("Syntax error at: " + last_parsed);
4853  }
4854  CHECK_EQ(parse_trees.size(), 1u);
4855 
4856  if (pw.is_optimize) {
4857  const auto optimize_stmt =
4858  dynamic_cast<Parser::OptimizeTableStmt*>(parse_trees.front().get());
4859  CHECK(optimize_stmt);
4860 
4861  _return.execution_time_ms += measure<>::execution([&]() {
4862  const auto td = cat.getMetadataForTable(optimize_stmt->getTableName(),
4863  /*populateFragmenter=*/true);
4864 
// A missing table and a permission failure are deliberately reported with
// the same message, so existence is not leaked to unprivileged users.
4865  if (!td || !user_can_access_table(
4866  session_info, td, AccessPrivileges::DELETE_FROM_TABLE)) {
4867  throw std::runtime_error("Table " + optimize_stmt->getTableName() +
4868  " does not exist.");
4869  }
4870 
4871  auto chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
4872  cat, td->tableName, LockType::CheckpointLock);
4873  auto table_write_lock = TableLockMgr::getWriteLockForTable(cat, td->tableName);
4874 
4875  auto executor = Executor::getExecutor(
4876  cat.getCurrentDB().dbId, "", "", mapd_parameters_, nullptr);
4877  const TableOptimizer optimizer(td, executor.get(), cat);
4878  if (optimize_stmt->shouldVacuumDeletedRows()) {
4879  optimizer.vacuumDeletedRows();
4880  }
4881  optimizer.recomputeMetadata();
4882  });
4883 
4884  return;
4885  }
4886  if (pw.is_validate) {
4887  // check user is superuser
4888  if (!session_info.get_currentUser().isSuper) {
4889  throw std::runtime_error("Superuser is required to run VALIDATE");
4890  }
4891  const auto validate_stmt =
4892  dynamic_cast<Parser::ValidateStmt*>(parse_trees.front().get());
4893  CHECK(validate_stmt);
4894 
4895  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
4897 
4898  std::string output{"Result for validate"};
4900  _return.execution_time_ms += measure<>::execution([&]() {
4901  const DistributedValidate validator(validate_stmt->getType(),
4902  validate_stmt->isRepairTypeRemove(),
4903  cat,
4905  session_info,
4906  *this);
4907  output = validator.validate();
4908  });
4909  } else {
4910  output = "Not running on a cluster nothing to validate";
4911  }
4912  convert_result(_return, ResultSet(output), true);
4913  return;
4914  }
4915  }
4916  LOG(INFO) << "passing query to legacy processor";
4917  } catch (std::exception& e) {
// Calcite failures surfacing a Java NPE usually indicate a broken view or
// other schema problem; rewrite that opaque message for the client.
4918  if (strstr(e.what(), "java.lang.NullPointerException")) {
4919  THROW_MAPD_EXCEPTION(std::string("Exception: ") +
4920  "query failed from broken view or other schema related issue");
4921  } else {
4922  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
4923  }
4924  }
4925  try {
4926  // check for COPY TO stmt replace as required parser expects #~# markers
4927  const auto result = apply_copy_to_shim(query_str);
4928  num_parse_errors = parser.parse(result, parse_trees, last_parsed);
4929  } catch (std::exception& e) {
4930  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
4931  }
4932  if (num_parse_errors > 0) {
4933  THROW_MAPD_EXCEPTION("Syntax error at: " + last_parsed);
4934  }
4935 
// Builds a legacy Planner::RootPlan for a DML statement via analyze/optimize.
4936  auto get_legacy_plan =
4937  [&cat](const Parser::DMLStmt* dml,
4938  const bool is_explain) -> std::unique_ptr<Planner::RootPlan> {
4940  dml->analyze(cat, query);
4941  Planner::Optimizer optimizer(query, cat);
4942  auto root_plan_ptr = std::unique_ptr<Planner::RootPlan>(optimizer.optimize());
4943  CHECK(root_plan_ptr);
4944  if (is_explain) {
4945  root_plan_ptr->set_plan_dest(Planner::RootPlan::Dest::kEXPLAIN);
4946  }
4947  return root_plan_ptr;
4948  };
4949 
// Executes DDL statements (returning true when handled), acquiring the
// locks each statement kind requires before running it.
4950  auto handle_ddl = [&session_info, &_return, &chkptlLock, &table_locks, this](
4951  Parser::DDLStmt* ddl) -> bool {
4952  if (!ddl) {
4953  return false;
4954  }
4955  const auto show_create_stmt = dynamic_cast<Parser::ShowCreateTableStmt*>(ddl);
4956  if (show_create_stmt) {
4957  // ParserNode ShowCreateTableStmt is currently unimplemented
4958  throw std::runtime_error(
4959  "SHOW CREATE TABLE is currently unsupported. Use `\\d` from omnisql for table "
4960  "DDL.");
4961  }
4962 
4963  const auto import_stmt = dynamic_cast<Parser::CopyTableStmt*>(ddl);
4964  if (import_stmt) {
4965  // get the lock if the table exists
4966  // thus allowing COPY FROM to execute to create a table
4967  // is it safe to do this check without a lock?
4968  // if the table doesn't exist, the getTableLock will throw an exception anyway
4969  const TableDescriptor* td =
4970  session_info.getCatalog().getMetadataForTable(import_stmt->get_table());
4971  if (td) {
4972  // COPY_FROM: CheckpointLock [ >> TableWriteLocks ]
4973  chkptlLock =
4974  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_info.getCatalog(),
4975  import_stmt->get_table(),
4977  // [ TableWriteLocks ] lock is deferred in
4978  // InsertOrderFragmenter::deleteFragments
4979  }
4980 
4981  if (g_cluster && !leaf_aggregator_.leafCount()) {
4982  // Don't allow copy from imports directly on a leaf node
4983  throw std::runtime_error(
4984  "Cannot import on an individual leaf. Please import from the Aggregator.");
4985  } else if (leaf_aggregator_.leafCount() > 0) {
4986  _return.execution_time_ms += measure<>::execution(
4987  [&]() { execute_distributed_copy_statement(import_stmt, session_info); });
4988  } else {
4989  _return.execution_time_ms +=
4990  measure<>::execution([&]() { ddl->execute(session_info); });
4991  }
4992 
4993  // Read response message
4994  convert_result(_return, ResultSet(*import_stmt->return_message.get()), true);
4995 
4996  // get geo_copy_from info
4997  _was_geo_copy_from = import_stmt->was_geo_copy_from();
4998  import_stmt->get_geo_copy_from_payload(_geo_copy_from_table,
5002  return true;
5003  }
5004 
5005  // Check for DDL statements requiring locking and get locks
5006  auto export_stmt = dynamic_cast<Parser::ExportQueryStmt*>(ddl);
5007  if (export_stmt) {
5008  TableMap table_map;
5009  OptionalTableMap tableNames(table_map);
5010  const auto query_string = export_stmt->get_select_stmt();
5011  const auto query_ra =
5012  parse_to_ra(query_string, {}, session_info, tableNames, mapd_parameters_);
5014  session_info.getCatalog(), tableNames.value(), table_locks);
5015  }
5016  auto truncate_stmt = dynamic_cast<Parser::TruncateTableStmt*>(ddl);
5017  if (truncate_stmt) {
5018  chkptlLock =
5019  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_info.getCatalog(),
5020  *truncate_stmt->get_table(),
5022  table_locks.emplace_back();
5023  table_locks.back().write_lock = TableLockMgr::getWriteLockForTable(
5024  session_info.getCatalog(), *truncate_stmt->get_table());
5025  }
5026  auto add_col_stmt = dynamic_cast<Parser::AddColumnStmt*>(ddl);
5027  if (add_col_stmt) {
5028  add_col_stmt->check_executable(session_info);
5029  chkptlLock =
5030  getTableLock<mapd_shared_mutex, mapd_unique_lock>(session_info.getCatalog(),
5031  *add_col_stmt->get_table(),
5033  table_locks.emplace_back();
5034  table_locks.back().write_lock = TableLockMgr::getWriteLockForTable(
5035  session_info.getCatalog(), *add_col_stmt->get_table());
5036  }
5037 
5038  _return.execution_time_ms += measure<>::execution([&]() {
5039  ddl->execute(session_info);
5041  });
5042  return true;
5043  };
5044 
5045  for (const auto& stmt : parse_trees) {
5046  try {
5047  auto select_stmt = dynamic_cast<Parser::SelectStmt*>(stmt.get());
5048  if (!select_stmt) {
5049  check_read_only("Non-SELECT statements");
5050  }
5051  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt.get());
5052  if (handle_ddl(ddl)) {
5053  if (render_handler_) {
5054  render_handler_->handle_ddl(ddl);
5055  }
5056  continue;
5057  }
5058  if (!root_plan) {
5059  // assume DML / non-explain plan (an insert or copy statement)
5060  const auto dml = dynamic_cast<Parser::DMLStmt*>(stmt.get());
5061  CHECK(dml);
5062  root_plan = get_legacy_plan(dml, false);
5063  CHECK(root_plan);
5064  }
5065  if (auto stmtp = dynamic_cast<Parser::InsertQueryStmt*>(stmt.get())) {
5066  // INSERT_SELECT: CheckpointLock >> TableReadLocks [ >> TableWriteLocks ]
5067  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5068  session_info.getCatalog(), *stmtp->get_table(), LockType::CheckpointLock);
5069  // >> TableReadLock locks
5070  TableMap table_map;
5071  OptionalTableMap tableNames(table_map);
5072  const auto query_string = stmtp->get_query()->to_string();
// NOTE(review): query_string is built above but query_str (the full original
// statement) is what gets parsed here — confirm this is intentional.
5073  const auto query_ra =
5074  parse_to_ra(query_str, {}, session_info, tableNames, mapd_parameters_);
5076  session_info.getCatalog(), tableNames.value(), table_locks);
5077 
5078  // [ TableWriteLocks ] lock is deferred in
5079  // InsertOrderFragmenter::deleteFragments
5080  // TODO: this statement is not supported. once supported, it must not go thru
5081  // InsertOrderFragmenter::insertData, or deadlock will occur w/o moving the
5082  // following lock back to here!!!
5083  } else if (auto stmtp = dynamic_cast<Parser::InsertValuesStmt*>(stmt.get())) {
5084  // INSERT_VALUES: CheckpointLock >> write ExecutorOuterLock [ >>
5085  // TableWriteLocks ]
5086  chkptlLock = getTableLock<mapd_shared_mutex, mapd_unique_lock>(
5087  session_info.getCatalog(), *stmtp->get_table(), LockType::CheckpointLock);
5088  executeWriteLock = mapd_unique_lock<mapd_shared_mutex>(
5090  // [ TableWriteLocks ] lock is deferred in
5091  // InsertOrderFragmenter::deleteFragments
5092  }
5093 
5094  execute_root_plan(_return,
5095  root_plan.get(),
5096  column_format,
5097  session_info,
5098  executor_device_type,
5099  first_n);
5100  } catch (std::exception& e) {
// Preserve the original Thrift exception message verbatim; wrap everything
// else with the generic "Exception: " prefix.
5100  } catch (std::exception& e) {
5101  const auto thrift_exception = dynamic_cast<const apache::thrift::TException*>(&e);
5102  THROW_MAPD_EXCEPTION(thrift_exception ? std::string(thrift_exception->what())
5103  : std::string("Exception: ") + e.what());
5104  }
5105  }
5106 }
5107 
// Re-plans and re-executes a query after filter push-down candidates were
// selected: converts the requests to Thrift, asks Calcite for a new plan with
// those filters pushed down, then either returns the new RA text (calcite
// explain) or executes the new plan with push-down search disabled.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
5109  TQueryResult& _return,
5110  std::string& query_ra,
5111  const bool column_format,
5112  const Catalog_Namespace::SessionInfo& session_info,
5113  const ExecutorDeviceType executor_device_type,
5114  const int32_t first_n,
5115  const int32_t at_most_n,
5116  const bool just_explain,
5117  const bool just_calcite_explain,
5118  const std::string& query_str,
5119  const std::vector<PushedDownFilterInfo> filter_push_down_requests) {
5120  // collecting the selected filters' info to be sent to Calcite:
5121  std::vector<TFilterPushDownInfo> filter_push_down_info;
5122  for (const auto& req : filter_push_down_requests) {
5123  TFilterPushDownInfo filter_push_down_info_for_request;
5124  filter_push_down_info_for_request.input_prev = req.input_prev;
5125  filter_push_down_info_for_request.input_start = req.input_start;
5126  filter_push_down_info_for_request.input_next = req.input_next;
5127  filter_push_down_info.push_back(filter_push_down_info_for_request);
5128  }
5129  // deriving the new relational algebra plan with respect to the pushed down filters
5130  _return.execution_time_ms += measure<>::execution([&]() {
5131  query_ra = parse_to_ra(
5132  query_str, filter_push_down_info, session_info, boost::none, mapd_parameters_);
5133  });
5134 
5135  if (just_calcite_explain) {
5136  // return the new ra as the result
5137  convert_explain(_return, ResultSet(query_ra), true);
5138  return;
5139  }
5140 
5141  // execute the new relational algebra plan:
5142  execute_rel_alg(_return,
5143  query_ra,
5144  column_format,
5145  session_info,
5146  executor_device_type,
5147  first_n,
5148  at_most_n,
5149  just_explain,
5150  /*just_validate = */ false,
5151  /*find_push_down_candidates = */ false,
5152  /*just_calcite_explain = */ false,
5153  /*TODO: explain optimized*/ false);
5154 }
5155 
// Executes a COPY statement in distributed mode by injecting an importer
// factory whose loader (DistributedLoader) fans rows out through the leaf
// aggregator instead of loading locally.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
5157  Parser::CopyTableStmt* copy_stmt,
5158  const Catalog_Namespace::SessionInfo& session_info) {
5159  auto importer_factory = [&session_info, this](
5160  const Catalog& catalog,
5161  const TableDescriptor* td,
5162  const std::string& file_path,
5163  const Importer_NS::CopyParams& copy_params) {
5164  return boost::make_unique<Importer_NS::Importer>(
5165  new DistributedLoader(session_info, td, &leaf_aggregator_),
5166  file_path,
5167  copy_params);
5168  };
5169  copy_stmt->execute(session_info, importer_factory);
5170 }
5171 
// Builds a legacy Planner::RootPlan for Calcite-eligible queries: runs the
// query (or, for SELECT EXPLAIN, the wrapped actual query) through Calcite,
// translates the resulting plan, and marks the plan destination as kEXPLAIN
// when the statement was an explain. Returns nullptr for DDL/update-DML or
// other statements that should not take this path.
// NOTE(review): the first line of this signature (return type / function
// name) is not visible in this extracted chunk.
5173  const std::string& query_str,
5174  const Catalog_Namespace::SessionInfo& session_info) {
5175  auto& cat = session_info.getCatalog();
5176  ParserWrapper pw{query_str};
5177  // if this is a calcite select or explain select run in calcite
5178  if (!pw.is_ddl && !pw.is_update_dml &&
5179  !(pw.getExplainType() == ParserWrapper::ExplainType::Other)) {
5180  const std::string actual_query{pw.isSelectExplain() ? pw.actual_query : query_str};
5181  const auto query_ra =
5182  calcite_
5183  ->process(session_info,
5184  legacy_syntax_ ? pg_shim(actual_query) : actual_query,
5185  {},
5187  pw.isCalciteExplain(),
5189  .plan_result;
5190  auto root_plan = translate_query(query_ra, cat);
5191  CHECK(root_plan);
5192  if (pw.isSelectExplain()) {
5193  root_plan->set_plan_dest(Planner::RootPlan::Dest::kEXPLAIN);
5194  }
5195  return root_plan;
5196  }
5197  return nullptr;
5198 }
5199 
// Runs a query string through Calcite and returns the relational-algebra plan
// as a string ("" when the query is not Calcite-eligible). When tableNames is
// provided it is populated with every accessed table, mapped to false for
// tables only selected from and true for tables inserted into / updated /
// deleted from (callers use this flag to pick read vs. checkpoint locks).
// When render_info is provided, all selected-from tables (primary and
// resolved, i.e. including views) are recorded for render hit-testing.
// NOTE(review): the first line of this signature is not visible in this
// extracted chunk.
5201  const std::string& query_str,
5202  const std::vector<TFilterPushDownInfo>& filter_push_down_info,
5203  const Catalog_Namespace::SessionInfo& session_info,
5204  OptionalTableMap tableNames,
5205  const MapDParameters mapd_parameters,
5206  RenderInfo* render_info) {
5208  ParserWrapper pw{query_str};
5209  const std::string actual_query{pw.isSelectExplain() ? pw.actual_query : query_str};
5210  if (pw.isCalcitePathPermissable()) {
5211  auto result = calcite_->process(session_info,
5212  legacy_syntax_ ? pg_shim(actual_query) : actual_query,
5213  filter_push_down_info,
5215  pw.isCalciteExplain(),
5216  mapd_parameters.enable_calcite_view_optimize);
5217  if (tableNames) {
5218  for (const auto& table : result.resolved_accessed_objects.tables_selected_from) {
5219  (tableNames.value())[table] = false;
5220  }
5221  for (const auto& tables :
5222  std::vector<decltype(result.resolved_accessed_objects.tables_inserted_into)>{
5223  result.resolved_accessed_objects.tables_inserted_into,
5224  result.resolved_accessed_objects.tables_updated_in,
5225  result.resolved_accessed_objects.tables_deleted_from}) {
5226  for (const auto& table : tables) {
5227  (tableNames.value())[table] = true;
5228  }
5229  }
5230  }
5231  if (render_info) {
5232  // grabs all the selected-from tables, even views. This is used by the renderer to
5233  // resolve view hit-testing.
5234  // NOTE: the same table name could exist in both the primary and resolved tables.
5235  auto selected_tables = &result.primary_accessed_objects.tables_selected_from;
5236  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5237  selected_tables = &result.resolved_accessed_objects.tables_selected_from;
5238  render_info->table_names.insert(selected_tables->begin(), selected_tables->end());
5239  }
5240  return result.plan_result;
5241  }
5242  return "";
5243 }
5244 
5245 void MapDHandler::check_table_consistency(TTableMeta& _return,
5246  const TSessionId& session,
5247  const int32_t table_id) {
5248  LOG_SESSION(session);
5249  if (!leaf_handler_) {
5250  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5251  }
5252  try {
5253  leaf_handler_->check_table_consistency(_return, session, table_id);
5254  } catch (std::exception& e) {
5255  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5256  }
5257 }
5258 
5259 void MapDHandler::start_query(TPendingQuery& _return,
5260  const TSessionId& session,
5261  const std::string& query_ra,
5262  const bool just_explain) {
5263  LOG_SESSION(session);
5264  if (!leaf_handler_) {
5265  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5266  }
5267  const auto session_info = get_session_copy(session);
5268  LOG(INFO) << "start_query :" << session_info << " :" << just_explain;
5269  auto time_ms = measure<>::execution([&]() {
5270  try {
5271  leaf_handler_->start_query(_return, session, query_ra, just_explain);
5272  } catch (std::exception& e) {
5273  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5274  }
5275  });
5276  LOG(INFO) << "start_query-COMPLETED " << time_ms << "ms "
5277  << "id is " << _return.id;
5278 }
5279 
5280 void MapDHandler::execute_first_step(TStepResult& _return,
5281  const TPendingQuery& pending_query) {
5282  if (!leaf_handler_) {
5283  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5284  }
5285  LOG(INFO) << "execute_first_step : id:" << pending_query.id;
5286  auto time_ms = measure<>::execution([&]() {
5287  try {
5288  leaf_handler_->execute_first_step(_return, pending_query);
5289  } catch (std::exception& e) {
5290  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5291  }
5292  });
5293  LOG(INFO) << "execute_first_step-COMPLETED " << time_ms << "ms";
5294 }
5295 
5296 void MapDHandler::broadcast_serialized_rows(const TSerializedRows& serialized_rows,
5297  const TRowDescriptor& row_desc,
5298  const TQueryId query_id) {
5299  if (!leaf_handler_) {
5300  THROW_MAPD_EXCEPTION("Distributed support is disabled.");
5301  }
5302  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS id:" << query_id;
5303  auto time_ms = measure<>::execution([&]() {
5304  try {
5305  leaf_handler_->broadcast_serialized_rows(serialized_rows, row_desc, query_id);
5306  } catch (std::exception& e) {
5307  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5308  }
5309  });
5310  LOG(INFO) << "BROADCAST-SERIALIZED-ROWS COMPLETED " << time_ms << "ms";
5311 }
5312 
void MapDHandler::insert_data(const TSessionId& session,
                              const TInsertData& thrift_insert_data) {
  // Converts a Thrift-encoded batch of column data into the catalog's native
  // InsertData representation and hands it to the table's fragmenter without
  // checkpointing. Errors from the fragmenter surface as Thrift exceptions.
  LOG_SESSION(session);
  CHECK_EQ(thrift_insert_data.column_ids.size(), thrift_insert_data.data.size());
  const auto session_info = get_session_copy(session);
  auto& cat = session_info.getCatalog();
  // NOTE(review): the declaration of `insert_data` (a
  // Catalog_Namespace::InsertData, presumably) is elided in this listing —
  // confirm against the full source.
  insert_data.databaseId = thrift_insert_data.db_id;
  insert_data.tableId = thrift_insert_data.table_id;
  insert_data.columnIds = thrift_insert_data.column_ids;
  // Owning storage for variable-length column payloads; DataBlockPtr entries
  // below hold raw pointers into these vectors, so they must outlive the
  // insertDataNoCheckpoint call.
  std::vector<std::unique_ptr<std::vector<std::string>>> none_encoded_string_columns;
  std::vector<std::unique_ptr<std::vector<ArrayDatum>>> array_columns;
  for (size_t col_idx = 0; col_idx < insert_data.columnIds.size(); ++col_idx) {
    const int column_id = insert_data.columnIds[col_idx];
    DataBlockPtr p;
    const auto cd = cat.getMetadataForColumn(insert_data.tableId, column_id);
    CHECK(cd);
    const auto& ti = cd->columnType;
    if (ti.is_number() || ti.is_time() || ti.is_boolean()) {
      // Fixed-width scalar columns arrive as a flat byte buffer.
      p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
    } else if (ti.is_string()) {
      if (ti.get_compression() == kENCODING_DICT) {
        // Dictionary-encoded strings are already fixed-width ids.
        p.numbersPtr = (int8_t*)thrift_insert_data.data[col_idx].fixed_len_data.data();
      } else {
        CHECK_EQ(kENCODING_NONE, ti.get_compression());
        // Unencoded strings: copy each row's payload into an owned vector.
        none_encoded_string_columns.emplace_back(new std::vector<std::string>());
        auto& none_encoded_strings = none_encoded_string_columns.back();
        CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
                 thrift_insert_data.data[col_idx].var_len_data.size());
        for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
          none_encoded_strings->push_back(varlen_str.payload);
        }
        p.stringsPtr = none_encoded_strings.get();
      }
    } else if (ti.is_geometry()) {
      // Geometry arrives as variable-length strings (WKT/serialized form —
      // same handling as unencoded strings).
      none_encoded_string_columns.emplace_back(new std::vector<std::string>());
      auto& none_encoded_strings = none_encoded_string_columns.back();
      CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
               thrift_insert_data.data[col_idx].var_len_data.size());
      for (const auto& varlen_str : thrift_insert_data.data[col_idx].var_len_data) {
        none_encoded_strings->push_back(varlen_str.payload);
      }
      p.stringsPtr = none_encoded_strings.get();
    } else {
      CHECK(ti.is_array());
      array_columns.emplace_back(new std::vector<ArrayDatum>());
      auto& array_column = array_columns.back();
      CHECK_EQ(static_cast<size_t>(thrift_insert_data.num_rows),
               thrift_insert_data.data[col_idx].var_len_data.size());
      for (const auto& t_arr_datum : thrift_insert_data.data[col_idx].var_len_data) {
        if (t_arr_datum.is_null) {
          // Fixed-size, non-string-element arrays use a typed null sentinel;
          // everything else uses an empty null datum.
          if (ti.get_size() > 0 && !ti.get_elem_type().is_string()) {
            array_column->push_back(Importer_NS::ImporterUtils::composeNullArray(ti));
          } else {
            array_column->emplace_back(0, nullptr, true);
          }
        } else {
          ArrayDatum arr_datum;
          arr_datum.length = t_arr_datum.payload.size();
          int8_t* ptr = (int8_t*)(t_arr_datum.payload.data());
          arr_datum.pointer = ptr;
          // In this special case, ArrayDatum does not handle freeing the underlying
          // memory (the Thrift request object owns the payload; the no-op
          // deleter prevents a double free).
          arr_datum.data_ptr = std::shared_ptr<int8_t>(ptr, [](auto p) {});
          arr_datum.is_null = false;
          array_column->push_back(arr_datum);
        }
      }
      p.arraysPtr = array_column.get();
    }
    insert_data.data.push_back(p);
  }
  insert_data.numRows = thrift_insert_data.num_rows;
  const auto td = cat.getMetadataForTable(insert_data.tableId);
  try {
    // this should have the same lock seq as COPY FROM
    ChunkKey chunkKey = {insert_data.databaseId, insert_data.tableId};
    // NOTE(review): the constructor arguments for this write lock are elided
    // in this listing — confirm against the full source.
    mapd_unique_lock<mapd_shared_mutex> tableLevelWriteLock(
    // [ TableWriteLocks ] lock is deferred in
    // InsertOrderFragmenter::deleteFragments
    td->fragmenter->insertDataNoCheckpoint(insert_data);
  } catch (const std::exception& e) {
    THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
  }
}
5400 
5401 void MapDHandler::start_render_query(TPendingRenderQuery& _return,
5402  const TSessionId& session,
5403  const int64_t widget_id,
5404  const int16_t node_idx,
5405  const std::string& vega_json) {
5406  LOG_SESSION(session);
5407  if (!render_handler_) {
5408  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5409  }
5410  const auto session_info = get_session_copy(session);
5411  LOG(INFO) << "start_render_query :" << session_info << " :widget_id:" << widget_id
5412  << ":vega_json:" << vega_json;
5413  auto time_ms = measure<>::execution([&]() {
5414  try {
5415  render_handler_->start_render_query(
5416  _return, session, widget_id, node_idx, vega_json);
5417  } catch (std::exception& e) {
5418  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5419  }
5420  });
5421  LOG(INFO) << "start_render_query-COMPLETED " << time_ms << "ms "
5422  << "id is " << _return.id;
5423 }
5424 
5425 void MapDHandler::execute_next_render_step(TRenderStepResult& _return,
5426  const TPendingRenderQuery& pending_render,
5427  const TRenderAggDataMap& merged_data) {
5428  if (!render_handler_) {
5429  THROW_MAPD_EXCEPTION("Backend rendering is disabled.");
5430  }
5431 
5432  LOG(INFO) << "execute_next_render_step: id:" << pending_render.id;
5433  auto time_ms = measure<>::execution([&]() {
5434  try {
5435  render_handler_->execute_next_render_step(_return, pending_render, merged_data);
5436  } catch (std::exception& e) {
5437  THROW_MAPD_EXCEPTION(std::string("Exception: ") + e.what());
5438  }
5439  });
5440  LOG(INFO) << "execute_next_render_step-COMPLETED id: " << pending_render.id
5441  << ", time: " << time_ms << "ms ";
5442 }
5443 
5444 void MapDHandler::checkpoint(const TSessionId& session,
5445  const int32_t db_id,
5446  const int32_t table_id) {
5447  LOG_SESSION(session);
5448  const auto session_info = get_session_copy(session);
5449  auto& cat = session_info.getCatalog();
5450  cat.getDataMgr().checkpoint(db_id, table_id);
5451 }
5452 
5453 // check and reset epoch if a request has been made
5454 void MapDHandler::set_table_epoch(const TSessionId& session,
5455  const int db_id,
5456  const int table_id,
5457  const int new_epoch) {
5458  LOG_SESSION(session);
5459  const auto session_info = get_session_copy(session);
5460  if (!session_info.get_currentUser().isSuper) {
5461  throw std::runtime_error("Only superuser can set_table_epoch");
5462  }
5463  auto& cat = session_info.getCatalog();
5464 
5465  if (leaf_aggregator_.leafCount() > 0) {
5466  return leaf_aggregator_.set_table_epochLeaf(session_info, db_id, table_id, new_epoch);
5467  }
5468  cat.setTableEpoch(db_id, table_id, new_epoch);
5469 }