OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
QueryRunner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryRunner.h"
18 
19 #include "Calcite/Calcite.h"
20 #include "Catalog/Catalog.h"
21 #include "DistributedLoader.h"
22 #include "Geospatial/Transforms.h"
24 #include "Logger/Logger.h"
25 #include "Parser/ParserWrapper.h"
26 #include "Parser/parser.h"
33 #include "Shared/StringTransform.h"
35 #include "Shared/import_helpers.h"
37 #include "gen-cpp/CalciteServer.h"
38 #include "include/bcrypt.h"
39 
40 #include <boost/filesystem/operations.hpp>
41 #include <csignal>
42 #include <random>
43 
// Port value passed as the second argument to the Calcite constructor in
// QueryRunner's constructor.
44 #define CALCITEPORT 3279
45 
// Globals defined in other translation units.
// NOTE(review): original source lines 49 and 52 are absent from this listing.
46 extern size_t g_leaf_count;
47 extern bool g_enable_filter_push_down;
48 
50 
51 extern bool g_serialize_temp_tables;
// Locked around calcite_mgr->process() in runSQLWithAllowingInterrupt().
53 std::mutex calcite_lock;
54 
55 using namespace Catalog_Namespace;
56 namespace {
57 
58 std::shared_ptr<Calcite> g_calcite = nullptr;
59 
60 void calcite_shutdown_handler() noexcept {
61  if (g_calcite) {
62  g_calcite->close_calcite_server();
63  g_calcite.reset();
64  }
65 }
66 
70 }
71 
72 } // namespace
73 
74 namespace QueryRunner {
75 
// Static QueryRunner instance: (re)created by init() and released by reset().
76 std::unique_ptr<QueryRunner> QueryRunner::qr_instance_ = nullptr;
77 
// Definition of the static QueryStates member of QueryRunner.
78 query_state::QueryStates QueryRunner::query_states_;
79 
80 QueryRunner* QueryRunner::init(const char* db_path,
81  const std::string& udf_filename,
82  const size_t max_gpu_mem,
83  const int reserved_gpu_mem) {
84  return QueryRunner::init(db_path,
85  std::string{OMNISCI_ROOT_USER},
86  "HyperInteractive",
87  std::string{OMNISCI_DEFAULT_DB},
88  {},
89  {},
90  udf_filename,
91  true,
92  max_gpu_mem,
93  reserved_gpu_mem);
94 }
95 
// Full init() overload: constructs a QueryRunner from the given arguments,
// stores it in qr_instance_ (replacing any previous instance) and returns a
// non-owning pointer to it. A non-empty leaf_servers is rejected via a FATAL
// log followed by a CHECK.
// NOTE(review): original source line 109 is absent from this listing.
96 QueryRunner* QueryRunner::init(const char* db_path,
97  const std::string& user,
98  const std::string& pass,
99  const std::string& db_name,
100  const std::vector<LeafHostInfo>& string_servers,
101  const std::vector<LeafHostInfo>& leaf_servers,
102  const std::string& udf_filename,
103  bool uses_gpus,
104  const size_t max_gpu_mem,
105  const int reserved_gpu_mem,
106  const bool create_user,
107  const bool create_db) {
108  // Whitelist root path for tests by default
110  ddl_utils::FilePathWhitelist::initialize(db_path, "[\"/\"]", "[\"/\"]");
111  LOG_IF(FATAL, !leaf_servers.empty()) << "Distributed test runner not supported.";
112  CHECK(leaf_servers.empty());
113  qr_instance_.reset(new QueryRunner(db_path,
114  user,
115  pass,
116  db_name,
117  string_servers,
118  leaf_servers,
119  udf_filename,
120  uses_gpus,
121  max_gpu_mem,
122  reserved_gpu_mem,
123  create_user,
124  create_db));
125  return qr_instance_.get();
126 }
127 
// Main constructor. In order, the visible code: checks the on-disk layout
// under db_path, creates the global Calcite instance and registers its
// extension-function whitelists, builds the DataMgr, initializes the
// SysCatalog, optionally creates the user and database, and finally opens a
// session for that user on that database.
// NOTE(review): original source lines 141, 148-150, 152-153, 161 and 163 are
// absent from this listing; the declarations of `user` and `db` are presumably
// among them -- confirm against the real file.
128 QueryRunner::QueryRunner(const char* db_path,
129  const std::string& user_name,
130  const std::string& passwd,
131  const std::string& db_name,
132  const std::vector<LeafHostInfo>& string_servers,
133  const std::vector<LeafHostInfo>& leaf_servers,
134  const std::string& udf_filename,
135  bool uses_gpus,
136  const size_t max_gpu_mem,
137  const int reserved_gpu_mem,
138  const bool create_user,
139  const bool create_db)
140  : dispatch_queue_(std::make_unique<QueryDispatchQueue>(1)) {
// db_path and its mapd_catalogs/<default db> entry must already exist.
142  boost::filesystem::path base_path{db_path};
143  CHECK(boost::filesystem::exists(base_path));
144  auto system_db_file = base_path / "mapd_catalogs" / OMNISCI_DEFAULT_DB;
145  CHECK(boost::filesystem::exists(system_db_file));
146  auto data_dir = base_path / "mapd_data";
147  DiskCacheConfig disk_cache_config{(base_path / "omnisci_disk_cache").string(),
// NOTE(review): the rest of the DiskCacheConfig initializer is truncated here.
151 
154  g_calcite =
155  std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024, 5000, true, udf_filename);
156  ExtensionFunctionsWhitelist::add(g_calcite->getExtensionFunctionWhitelist());
// UDF signatures are only registered when a UDF file was supplied.
157  if (!udf_filename.empty()) {
158  ExtensionFunctionsWhitelist::addUdfs(g_calcite->getUserDefinedFunctionWhitelist());
159  }
160 
162  auto udtfs = ThriftSerializers::to_thrift(
164  std::vector<TUserDefinedFunction> udfs = {};
165  g_calcite->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
166 
167  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
168 #ifdef HAVE_CUDA
169  if (uses_gpus) {
170  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(-1, 0);
171  }
172 #else
// Built without CUDA: force GPU usage off regardless of the caller's request.
173  uses_gpus = false;
174 #endif
175  SystemParameters mapd_params;
176  mapd_params.gpu_buffer_mem_bytes = max_gpu_mem;
// Aggregator mode is enabled exactly when leaf servers were supplied.
177  mapd_params.aggregator = !leaf_servers.empty();
178 
179  data_mgr_.reset(new Data_Namespace::DataMgr(data_dir.string(),
180  mapd_params,
181  std::move(cuda_mgr),
182  uses_gpus,
183  reserved_gpu_mem,
184  0,
185  disk_cache_config));
186 
187  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
188 
189  g_base_path = base_path.string();
190  sys_cat.init(g_base_path,
191  data_mgr_,
192  {},
193  g_calcite,
194  false,
195  mapd_params.aggregator,
196  string_servers);
197 
// Create the user on demand; afterwards the user must exist and the supplied
// password must pass bcrypt_checkpw against the stored hash.
198  if (create_user) {
199  if (!sys_cat.getMetadataForUser(user_name, user)) {
200  sys_cat.createUser(user_name, passwd, false, "", true);
201  }
202  }
203  CHECK(sys_cat.getMetadataForUser(user_name, user));
204  CHECK(bcrypt_checkpw(passwd.c_str(), user.passwd_hash.c_str()) == 0);
205 
// Create the database on demand; afterwards it must exist and the user must be
// either a superuser or the database owner.
206  if (create_db) {
207  if (!sys_cat.getMetadataForDB(db_name, db)) {
208  sys_cat.createDatabase(db_name, user.userId);
209  }
210  }
211  CHECK(sys_cat.getMetadataForDB(db_name, db));
212  CHECK(user.isSuper || (user.userId == db.dbOwner));
213  auto cat = sys_cat.getCatalog(db, create_db);
214  CHECK(cat);
// The session is always created with ExecutorDeviceType::GPU, independent of
// the value of uses_gpus.
215  session_info_ = std::make_unique<Catalog_Namespace::SessionInfo>(
216  cat, user, ExecutorDeviceType::GPU, "");
217 }
218 
// Destructor.
// NOTE(review): the body (original source line 220) is absent from this
// listing, so its behavior cannot be documented from here.
219 QueryRunner::~QueryRunner() {
221 }
222 
223 void QueryRunner::resizeDispatchQueue(const size_t num_executors) {
224  dispatch_queue_ = std::make_unique<QueryDispatchQueue>(num_executors);
225 }
226 
227 QueryRunner::QueryRunner(std::unique_ptr<Catalog_Namespace::SessionInfo> session)
228  : session_info_(std::move(session))
229  , dispatch_queue_(std::make_unique<QueryDispatchQueue>(1)) {}
230 
// Returns the catalog pointer held by the current session.
// NOTE(review): original source line 232 is absent from this listing.
231 std::shared_ptr<Catalog_Namespace::Catalog> QueryRunner::getCatalog() const {
233  return session_info_->get_catalog_ptr();
234 }
235 
236 std::shared_ptr<Calcite> QueryRunner::getCalcite() const {
237  // TODO: Embed Calcite shared_ptr ownership in QueryRunner
238  return g_calcite;
239 }
240 
// Forwards to gpusPresent() on the DataMgr of the session's catalog.
// NOTE(review): original source line 242 is absent from this listing.
241 bool QueryRunner::gpusPresent() const {
243  return session_info_->getCatalog().getDataMgr().gpusPresent();
244 }
245 
// NOTE(review): the body (original source lines 247-248) is absent from this
// listing; presumably it clears GPU memory -- confirm against the real file.
246 void QueryRunner::clearGpuMemory() const {
249 }
250 
// NOTE(review): the body (original source lines 252-253) is absent from this
// listing; presumably it clears CPU memory -- confirm against the real file.
251 void QueryRunner::clearCpuMemory() const {
254 }
255 
256 std::string apply_copy_to_shim(const std::string& query_str) {
257  auto result = query_str;
258  {
259  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
260  boost::regex::extended | boost::regex::icase};
261  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
262  result.replace(
263  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
264  });
265  }
266  return result;
267 }
268 
// Runs query_str (after pg_shim) through the catalog's Calcite manager and
// returns the query hints that a RelAlgExecutor built on the resulting plan
// reports via getParsedQueryHints().
// NOTE(review): original source lines 270-271 and 274 are absent from this
// listing; `executor` is presumably declared on one of them.
269 RegisteredQueryHint QueryRunner::getParsedQueryHint(const std::string& query_str) {
272  auto query_state = create_query_state(session_info_, query_str);
273  const auto& cat = session_info_->getCatalog();
275  auto calcite_mgr = cat.getCalciteMgr();
276  const auto query_ra = calcite_mgr
277  ->process(query_state->createQueryStateProxy(),
278  pg_shim(query_str),
279  {},
280  true,
281  false,
282  false,
283  true)
284  .plan_result;
285  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra, query_state);
286  const auto& query_hints = ra_executor.getParsedQueryHints();
// Returned by value, so the caller receives a copy of the hints.
287  return query_hints;
288 }
289 
// Parses stmt_str_in with SQLParser and executes it as a DDL statement against
// the current session. Parse failure, more than one statement, or a non-DDL
// statement each trip a CHECK.
// NOTE(review): original source lines 291-292 are absent from this listing.
290 void QueryRunner::runDDLStatement(const std::string& stmt_str_in) {
293 
294  std::string stmt_str = stmt_str_in;
295  // First strip leading newline characters
296  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
297  // Then strip remaining leading whitespace
298  boost::algorithm::trim_left(stmt_str);
299 
// For COPY ... TO statements the shim is applied to the original, untrimmed
// input (stmt_str_in), which replaces the trimmed copy built above.
300  ParserWrapper pw{stmt_str};
301  if (pw.is_copy_to) {
302  stmt_str = apply_copy_to_shim(stmt_str_in);
303  }
304 
305  auto query_state = create_query_state(session_info_, stmt_str);
306  auto stdlog = STDLOG(query_state);
307 
308  SQLParser parser;
309  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
310  std::string last_parsed;
311  CHECK_EQ(parser.parse(stmt_str, parse_trees, last_parsed), 0) << stmt_str_in;
312  CHECK_EQ(parse_trees.size(), size_t(1));
313  auto stmt = parse_trees.front().get();
314  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
315  CHECK(ddl);
316  ddl->execute(*session_info_);
317 }
318 
// Runs a single SQL statement. Statements that ParserWrapper reports as
// Calcite-permissible go through runSelectQuery() and return its rows. All
// other statements are parsed with SQLParser and must be INSERT ... VALUES,
// CREATE TABLE AS SELECT or INSERT INTO ... SELECT; those are executed and
// nullptr is returned. Any other statement type reaches UNREACHABLE().
// NOTE(review): original source lines 320 and 322-323 are absent from this
// listing; the `co` parameter is presumably declared on line 320.
319 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
321  ExecutionOptions eo) {
324 
325  ParserWrapper pw{query_str};
326  if (pw.isCalcitePathPermissable()) {
327  const auto execution_result = runSelectQuery(query_str, std::move(co), std::move(eo));
328  VLOG(1) << session_info_->getCatalog().getDataMgr().getSystemMemoryUsage();
329  return execution_result->getRows();
330  }
331 
332  auto query_state = create_query_state(session_info_, query_str);
333  auto stdlog = STDLOG(query_state);
334 
335  SQLParser parser;
336  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
337  std::string last_parsed;
338  CHECK_EQ(parser.parse(query_str, parse_trees, last_parsed), 0) << query_str;
339  CHECK_EQ(parse_trees.size(), size_t(1));
340  auto stmt = parse_trees.front().get();
// Try each supported non-Calcite statement type in turn.
341  auto insert_values_stmt = dynamic_cast<Parser::InsertValuesStmt*>(stmt);
342  if (insert_values_stmt) {
343  insert_values_stmt->execute(*session_info_);
344  return nullptr;
345  }
346  auto ctas_stmt = dynamic_cast<Parser::CreateTableAsSelectStmt*>(stmt);
347  if (ctas_stmt) {
348  ctas_stmt->execute(*session_info_);
349  return nullptr;
350  }
351  auto itas_stmt = dynamic_cast<Parser::InsertIntoTableAsSelectStmt*>(stmt);
352  if (itas_stmt) {
353  itas_stmt->execute(*session_info_);
354  return nullptr;
355  }
356  UNREACHABLE();
357  return nullptr;
358 }
359 
360 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
361  const ExecutorDeviceType device_type,
362  const bool hoist_literals,
363  const bool allow_loop_joins) {
364  auto co = CompilationOptions::defaults(device_type);
365  co.hoist_literals = hoist_literals;
366  return runSQL(
367  query_str, std::move(co), defaultExecutionOptionsForRunSQL(allow_loop_joins));
368 }
369 
// Builds the ExecutionOptions used by the runSQL()/runSelectQuery()
// convenience overloads. The fields are initialized positionally; only
// just_explain and allow_loop_joins come from the arguments, and the first
// field comes from g_enable_columnar_output.
// NOTE(review): original source line 383 (one initializer) is absent from this
// listing, so the field-to-value mapping cannot be fully verified from here.
370 ExecutionOptions QueryRunner::defaultExecutionOptionsForRunSQL(bool allow_loop_joins,
371  bool just_explain) {
372  return {g_enable_columnar_output,
373  true,
374  just_explain,
375  allow_loop_joins,
376  false,
377  false,
378  false,
379  false,
380  10000,
381  false,
382  false,
384  false,
385  1000};
386 }
387 
// Returns an Executor; a query state with an empty query string is created and
// logged via STDLOG first.
// NOTE(review): original source lines 389-390 and 393 are absent from this
// listing, including the line that obtains `executor`.
388 std::shared_ptr<Executor> QueryRunner::getExecutor() const {
391  auto query_state = create_query_state(session_info_, "");
392  auto stdlog = STDLOG(query_state);
394  return executor;
395 }
396 
// Runs query_str under a fresh session carrying session_id and device_type,
// dispatching the work through dispatch_queue_ and blocking until the task
// finishes. Returns the rows of the execution result.
// NOTE(review): several original source lines (403-404, 425-426, 428, 439,
// 464, 469, 473, 475) are absent from this listing, including the declarations
// of `co`, `eo` and the outer `executor`, and the body of the cpu-mode branch.
397 std::shared_ptr<ResultSet> QueryRunner::runSQLWithAllowingInterrupt(
398  const std::string& query_str,
399  const std::string& session_id,
400  const ExecutorDeviceType device_type,
401  const double running_query_check_freq,
402  const unsigned pending_query_check_freq) {
// Same catalog and user as the runner's own session, but with the caller's
// device type and session id.
405  auto session_info =
406  std::make_shared<Catalog_Namespace::SessionInfo>(session_info_->get_catalog_ptr(),
407  session_info_->get_currentUser(),
408  device_type,
409  session_id);
410  auto query_state = create_query_state(session_info, query_str);
411  auto stdlog = STDLOG(query_state);
412  const auto& cat = query_state->getConstSessionInfo()->getCatalog();
413  std::string query_ra{""};
414 
// The task captures locals by reference; this function waits on the task's
// future below before returning, so those locals are still alive while it runs.
415  std::shared_ptr<ExecutionResult> result;
416  auto query_launch_task = std::make_shared<QueryDispatchQueue::Task>(
417  [&cat,
418  &query_ra,
419  &device_type,
420  &query_state,
421  &result,
422  &running_query_check_freq,
423  &pending_query_check_freq](const size_t worker_id) {
424  auto executor = Executor::getExecutor(worker_id);
427 
429  true,
430  false,
431  true,
432  false,
433  false,
434  false,
435  false,
436  10000,
437  false,
438  false,
440  true,
441  running_query_check_freq,
442  pending_query_check_freq};
443  {
444  // async query initiation for interrupt test
445  // incurs data race warning in TSAN since
446  // calcite_mgr is shared across multiple query threads
447  // so here we lock the manager during query parsing
448  std::lock_guard<std::mutex> calcite_lock_guard(calcite_lock);
449  auto calcite_mgr = cat.getCalciteMgr();
450  query_ra = calcite_mgr
451  ->process(query_state->createQueryStateProxy(),
452  pg_shim(query_state->getQueryStr()),
453  {},
454  true,
455  false,
456  false,
457  true)
458  .plan_result;
459  }
460  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra, query_state);
461  const auto& query_hints = ra_executor.getParsedQueryHints();
462  const bool cpu_mode_enabled = query_hints.isHintRegistered(QueryHint::kCpuMode);
463  if (cpu_mode_enabled) {
465  }
466  result = std::make_shared<ExecutionResult>(
467  ra_executor.executeRelAlgQuery(co, eo, false, nullptr));
468  });
// The session is enrolled with status PENDING_QUEUE before the task is
// submitted to the dispatch queue.
470  executor->enrollQuerySession(session_id,
471  query_str,
472  query_state->getQuerySubmittedTime(),
474  QuerySessionStatus::QueryStatus::PENDING_QUEUE);
476  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
// Block until the task has run; the task must have produced a result.
477  auto result_future = query_launch_task->get_future();
478  result_future.get();
479  CHECK(result);
480  return result->getRows();
481 }
482 
483 std::vector<std::shared_ptr<ResultSet>> QueryRunner::runMultipleStatements(
484  const std::string& sql,
485  const ExecutorDeviceType dt) {
486  std::vector<std::shared_ptr<ResultSet>> results;
487  // TODO: Need to properly handle escaped semicolons instead of doing a naive split().
488  auto fields = split(sql, ";");
489  for (const auto& field : fields) {
490  auto text = strip(field) + ";";
491  if (text == ";") {
492  continue;
493  }
494  // TODO: Maybe remove this redundant parsing after enhancing Parser::Stmt?
495  SQLParser parser;
496  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
497  std::string last_parsed;
498  CHECK_EQ(parser.parse(text, parse_trees, last_parsed), 0);
499  CHECK_EQ(parse_trees.size(), size_t(1));
500  auto stmt = parse_trees.front().get();
501  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
502  Parser::DMLStmt* dml = dynamic_cast<Parser::DMLStmt*>(stmt);
503  if (ddl != nullptr && dml == nullptr) {
504  runDDLStatement(text);
505  results.push_back(nullptr);
506  } else if (ddl == nullptr && dml != nullptr) {
507  results.push_back(runSQL(text, dt, true, true));
508  } else {
509  throw std::runtime_error("Unexpected SQL statement type: " + text);
510  }
511  }
512  return results;
513 }
514 
515 void QueryRunner::runImport(Parser::CopyTableStmt* import_stmt) {
516  CHECK(import_stmt);
517  import_stmt->execute(*session_info_);
518 }
519 
520 std::unique_ptr<import_export::Loader> QueryRunner::getLoader(
521  const TableDescriptor* td) const {
522  auto cat = getCatalog();
523  return std::make_unique<import_export::Loader>(*cat, td);
524 }
525 
526 namespace {
527 
// Executes the query held by query_state_proxy. If the first execution reports
// pushed-down filter info, the query is re-planned through Calcite with that
// info and executed a second time with find_push_down_candidates and
// just_calcite_explain set to false; otherwise the first result is returned.
// NOTE(review): several original source lines (537-539, 541, 552, 568,
// 598-599, 602-605) are absent from this listing, including the declarations
// of `executor`, `co` and `eo`, the body of the cpu-mode branch, and the tail
// of the eo_modified initializer.
528 std::shared_ptr<ExecutionResult> run_select_query_with_filter_push_down(
529  QueryStateProxy query_state_proxy,
530  const ExecutorDeviceType device_type,
531  const bool hoist_literals,
532  const bool allow_loop_joins,
533  const bool just_explain,
534  const bool with_filter_push_down) {
535  auto const& query_state = query_state_proxy.getQueryState();
536  const auto& cat = query_state.getConstSessionInfo()->getCatalog();
540 
542  true,
543  just_explain,
544  allow_loop_joins,
545  false,
546  false,
547  false,
548  false,
549  10000,
550  with_filter_push_down,
551  false,
553  false};
// First pass: plan without any filter push-down info.
554  auto calcite_mgr = cat.getCalciteMgr();
555  const auto query_ra = calcite_mgr
556  ->process(query_state_proxy,
557  pg_shim(query_state.getQueryStr()),
558  {},
559  true,
560  false,
561  false,
562  true)
563  .plan_result;
564  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra);
565  const auto& query_hints = ra_executor.getParsedQueryHints();
566  const bool cpu_mode_enabled = query_hints.isHintRegistered(QueryHint::kCpuMode);
567  if (cpu_mode_enabled) {
569  }
570  auto result = std::make_shared<ExecutionResult>(
571  ra_executor.executeRelAlgQuery(co, eo, false, nullptr));
572  const auto& filter_push_down_requests = result->getPushedDownFilterInfo();
573  if (!filter_push_down_requests.empty()) {
// Copy each request's input_prev/input_start/input_next into the Thrift
// struct that is handed back to Calcite.
574  std::vector<TFilterPushDownInfo> filter_push_down_info;
575  for (const auto& req : filter_push_down_requests) {
576  TFilterPushDownInfo filter_push_down_info_for_request;
577  filter_push_down_info_for_request.input_prev = req.input_prev;
578  filter_push_down_info_for_request.input_start = req.input_start;
579  filter_push_down_info_for_request.input_next = req.input_next;
580  filter_push_down_info.push_back(filter_push_down_info_for_request);
581  }
// Second pass: re-plan the same query with the collected push-down info.
582  const auto new_query_ra = calcite_mgr
583  ->process(query_state_proxy,
584  pg_shim(query_state.getQueryStr()),
585  filter_push_down_info,
586  true,
587  false,
588  false,
589  true)
590  .plan_result;
591  const ExecutionOptions eo_modified{eo.output_columnar_hint,
592  eo.allow_multifrag,
593  eo.just_explain,
594  eo.allow_loop_joins,
595  eo.with_watchdog,
596  eo.jit_debug,
597  eo.just_validate,
600  /*find_push_down_candidates=*/false,
601  /*just_calcite_explain=*/false,
606  auto new_ra_executor = RelAlgExecutor(executor.get(), cat, new_query_ra);
607  return std::make_shared<ExecutionResult>(
608  new_ra_executor.executeRelAlgQuery(co, eo_modified, false, nullptr));
609  } else {
610  return result;
611  }
612 }
613 
614 } // namespace
615 
// Plans query_str through Calcite and executes it on the dispatch queue,
// blocking until the task finishes; returns the ExecutionResult. An earlier
// branch instead delegates to run_select_query_with_filter_push_down().
// NOTE(review): several original source lines (617, 619-620, 623, 629, 642,
// 650, 662) are absent from this listing, including the `co` parameter and the
// condition guarding the filter-push-down branch (presumably
// g_enable_filter_push_down -- confirm against the real file).
616 std::shared_ptr<ExecutionResult> QueryRunner::runSelectQuery(const std::string& query_str,
618  ExecutionOptions eo) {
621  auto query_state = create_query_state(session_info_, query_str);
622  auto stdlog = STDLOG(query_state);
624  return run_select_query_with_filter_push_down(query_state->createQueryStateProxy(),
625  co.device_type,
626  co.hoist_literals,
627  eo.allow_loop_joins,
628  eo.just_explain,
630  }
631 
632  const auto& cat = session_info_->getCatalog();
633 
// The task captures locals by reference; this function waits on the task's
// future below before returning, so those locals are still alive while it runs.
634  std::shared_ptr<ExecutionResult> result;
635  auto query_launch_task = std::make_shared<QueryDispatchQueue::Task>(
636  [&cat, &query_str, &co, &eo, &query_state, &result](const size_t worker_id) {
637  auto executor = Executor::getExecutor(worker_id);
638  // TODO The next line should be deleted since it overwrites co, but then
639  // NycTaxiTest.RunSelectsEncodingDictWhereGreater fails due to co not getting
640  // reset to its default values.
641  co = CompilationOptions::defaults(co.device_type);
643  auto calcite_mgr = cat.getCalciteMgr();
644  const auto query_ra = calcite_mgr
645  ->process(query_state->createQueryStateProxy(),
646  pg_shim(query_str),
647  {},
648  true,
649  false,
651  true)
652  .plan_result;
653  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra);
654  const auto& query_hints = ra_executor.getParsedQueryHints();
// A registered kCpuMode hint forces execution on the CPU.
655  const bool cpu_mode_enabled = query_hints.isHintRegistered(QueryHint::kCpuMode);
656  if (cpu_mode_enabled) {
657  co.device_type = ExecutorDeviceType::CPU;
658  }
659  result = std::make_shared<ExecutionResult>(
660  ra_executor.executeRelAlgQuery(co, eo, false, nullptr));
661  });
663  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
// Block until the task has run; the task must have produced a result.
664  auto result_future = query_launch_task->get_future();
665  result_future.get();
666  CHECK(result);
667  return result;
668 }
669 
670 std::shared_ptr<ExecutionResult> QueryRunner::runSelectQuery(
671  const std::string& query_str,
672  const ExecutorDeviceType device_type,
673  const bool hoist_literals,
674  const bool allow_loop_joins,
675  const bool just_explain) {
676  auto co = CompilationOptions::defaults(device_type);
677  co.hoist_literals = hoist_literals;
678  return runSelectQuery(query_str,
679  std::move(co),
680  defaultExecutionOptionsForRunSQL(allow_loop_joins, just_explain));
681 }
682 
683 const int32_t* QueryRunner::getCachedJoinHashTable(size_t idx) {
684  auto hash_table_cache = PerfectJoinHashTable::getHashTableCache();
685  CHECK(hash_table_cache);
686  auto hash_table = hash_table_cache->getCachedHashTable(idx);
687  CHECK(hash_table);
688  return reinterpret_cast<int32_t*>(hash_table->getCpuBuffer());
689 };
690 
691 const int8_t* QueryRunner::getCachedBaselineHashTable(size_t idx) {
692  auto hash_table_cache = BaselineJoinHashTable::getHashTableCache();
693  CHECK(hash_table_cache);
694  auto hash_table = hash_table_cache->getCachedHashTable(idx);
695  CHECK(hash_table);
696  return hash_table->getCpuBuffer();
697 };
698 
699 size_t QueryRunner::getEntryCntCachedBaselineHashTable(size_t idx) {
700  auto hash_table_cache = BaselineJoinHashTable::getHashTableCache();
701  CHECK(hash_table_cache);
702  auto hash_table = hash_table_cache->getCachedHashTable(idx);
703  CHECK(hash_table);
704  return hash_table->getEntryCount();
705 }
706 
707 size_t QueryRunner::getNumberOfCachedJoinHashTables() {
708  auto hash_table_cache = PerfectJoinHashTable::getHashTableCache();
709  CHECK(hash_table_cache);
710  return hash_table_cache->getNumberOfCachedHashTables();
711 };
712 
713 size_t QueryRunner::getNumberOfCachedBaselineJoinHashTables() {
714  auto hash_table_cache = BaselineJoinHashTable::getHashTableCache();
715  CHECK(hash_table_cache);
716  return hash_table_cache->getNumberOfCachedHashTables();
717 };
718 
// NOTE(review): the body (original source line 720) is absent from this
// listing; presumably it returns the size of the overlaps-join hash table
// cache -- confirm against the real file.
719 size_t QueryRunner::getNumberOfCachedOverlapsHashTables() {
721 }
722 
// Destroys the instance held in qr_instance_, if any.
// NOTE(review): original source line 725 is absent from this listing, so any
// further cleanup performed there is not documented here.
723 void QueryRunner::reset() {
724  qr_instance_.reset(nullptr);
726 }
727 
// Builds an ImportDriver on top of QueryRunner by creating a SessionInfo from
// (cat, user, dt, session_id) and passing it to the session-taking
// QueryRunner constructor.
// NOTE(review): original source line 729 (presumably the `user` parameter) is
// absent from this listing.
728 ImportDriver::ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> cat,
730  const ExecutorDeviceType dt,
731  const std::string session_id)
732  : QueryRunner(
733  std::make_unique<Catalog_Namespace::SessionInfo>(cat, user, dt, session_id)) {}
734 
// Imports the geo file at file_path into table_name via Importer::importGDAL.
// When create_table is true the table is first created from the column
// descriptors that Importer::gdalToColumnDescriptors reports for the file;
// otherwise the table must already exist. Throws std::runtime_error when the
// table already exists (create_table), the table name contains characters
// that sanitize_name would change, a column has an INTERVAL type, or the
// table cannot be found before the import starts.
// NOTE(review): original source lines 742 and 747 are absent from this
// listing; line 747 presumably sets geo_coords_encoding for the compressed
// case -- confirm against the real file.
735 void ImportDriver::importGeoTable(const std::string& file_path,
736  const std::string& table_name,
737  const bool compression,
738  const bool create_table,
739  const bool explode_collections) {
740  using namespace import_export;
741 
743  const std::string geo_column_name(OMNISCI_GEO_PREFIX);
744 
745  CopyParams copy_params;
746  if (compression) {
748  copy_params.geo_coords_comp_param = 32;
749  } else {
750  copy_params.geo_coords_encoding = EncodingType::kENCODING_NONE;
751  copy_params.geo_coords_comp_param = 0;
752  }
753  copy_params.geo_assign_render_groups = true;
754  copy_params.geo_explode_collections = explode_collections;
755 
// Sanitize every column name and remember sanitized -> original so the
// importer can map table columns back to source fields. Two columns that
// sanitize to the same name trip the CHECK.
756  auto cds = Importer::gdalToColumnDescriptors(file_path, geo_column_name, copy_params);
757  std::map<std::string, std::string> colname_to_src;
758  for (auto& cd : cds) {
759  const auto col_name_sanitized = ImportHelpers::sanitize_name(cd.columnName);
760  const auto ret =
761  colname_to_src.insert(std::make_pair(col_name_sanitized, cd.columnName));
762  CHECK(ret.second);
763  cd.columnName = col_name_sanitized;
764  }
765 
766  auto& cat = session_info_->getCatalog();
767 
768  if (create_table) {
769  const auto td = cat.getMetadataForTable(table_name);
770  if (td != nullptr) {
771  throw std::runtime_error("Error: Table " + table_name +
772  " already exists. Possible failure to correctly re-create "
773  "mapd_data directory.");
774  }
775  if (table_name != ImportHelpers::sanitize_name(table_name)) {
776  throw std::runtime_error("Invalid characters in table name: " + table_name);
777  }
778 
// Assemble a CREATE TABLE statement with one column clause per descriptor.
779  std::string stmt{"CREATE TABLE " + table_name};
780  std::vector<std::string> col_stmts;
781 
782  for (auto& cd : cds) {
783  if (cd.columnType.get_type() == SQLTypes::kINTERVAL_DAY_TIME ||
784  cd.columnType.get_type() == SQLTypes::kINTERVAL_YEAR_MONTH) {
785  throw std::runtime_error(
786  "Unsupported type: INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH for col " +
787  cd.columnName + " (table: " + table_name + ")");
788  }
789 
// DECIMAL columns reported with precision 0 and scale 0 get (14, 7).
790  if (cd.columnType.get_type() == SQLTypes::kDECIMAL) {
791  if (cd.columnType.get_precision() == 0 && cd.columnType.get_scale() == 0) {
792  cd.columnType.set_precision(14);
793  cd.columnType.set_scale(7);
794  }
795  }
796 
797  std::string col_stmt;
798  col_stmt.append(cd.columnName + " " + cd.columnType.get_type_name() + " ");
799 
// Emit the column's own encoding when it has one; otherwise spell out
// ENCODING NONE for strings and for geometry columns with output SRID 4326.
800  if (cd.columnType.get_compression() != EncodingType::kENCODING_NONE) {
801  col_stmt.append("ENCODING " + cd.columnType.get_compression_name() + " ");
802  } else {
803  if (cd.columnType.is_string()) {
804  col_stmt.append("ENCODING NONE");
805  } else if (cd.columnType.is_geometry()) {
806  if (cd.columnType.get_output_srid() == 4326) {
807  col_stmt.append("ENCODING NONE");
808  }
809  }
810  }
811  col_stmts.push_back(col_stmt);
812  }
813 
814  stmt.append(" (" + boost::algorithm::join(col_stmts, ",") + ");");
815  runDDLStatement(stmt);
816 
817  LOG(INFO) << "Created table: " << table_name;
818  } else {
819  LOG(INFO) << "Not creating table: " << table_name;
820  }
821 
// Whether newly created or pre-existing, the table must be resolvable now.
822  const auto td = cat.getMetadataForTable(table_name);
823  if (td == nullptr) {
824  throw std::runtime_error("Error: Failed to create table " + table_name);
825  }
826 
// Run the import and log the measured time divided by 1000 as seconds.
827  import_export::Importer importer(cat, td, file_path, copy_params);
828  auto ms = measure<>::execution(
829  [&]() { importer.importGDAL(colname_to_src, session_info_.get()); });
830  LOG(INFO) << "Import Time for " << table_name << ": " << (double)ms / 1000.0 << " s";
831 }
832 
833 } // namespace QueryRunner
bool g_enable_calcite_view_optimize
Definition: QueryRunner.cpp:52
Classes used to wrap parser calls for calcite redirection.
ImportStatus importGDAL(std::map< std::string, std::string > colname_to_src, const Catalog_Namespace::SessionInfo *session_info)
Definition: Importer.cpp:4906
static void addUdfs(const std::string &json_func_sigs)
#define CHECK_EQ(x, y)
Definition: Logger.h:211
#define CALCITEPORT
Definition: QueryRunner.cpp:44
std::string cat(Ts &&...args)
std::unique_ptr< QueryDispatchQueue > dispatch_queue_
Definition: QueryRunner.h:209
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
ExecutorDeviceType
static void initialize(const std::string &data_dir, const std::string &allowed_import_paths, const std::string &allowed_export_paths)
Definition: DdlUtils.cpp:690
std::string strip(std::string_view str)
trim any whitespace from the left and right ends of a string
ImportDriver(std::shared_ptr< Catalog_Namespace::Catalog > cat, const Catalog_Namespace::UserMetadata &user, const ExecutorDeviceType dt=ExecutorDeviceType::GPU, const std::string session_id="")
#define LOG(tag)
Definition: Logger.h:194
std::string join(T const &container, std::string const &delim)
static void add(const std::string &json_func_sigs)
#define UNREACHABLE()
Definition: Logger.h:247
std::mutex calcite_lock
Definition: QueryRunner.cpp:53
ExecutorOptLevel opt_level
void set_once_fatal_func(FatalFunc fatal_func)
Definition: Logger.cpp:307
void execute(const Catalog_Namespace::SessionInfo &session) override
static ExecutionOptions defaultExecutionOptionsForRunSQL(bool allow_loop_joins=true, bool just_explain=false)
virtual std::shared_ptr< ResultSet > runSQL(const std::string &query_str, CompilationOptions co, ExecutionOptions eo)
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
double running_query_interrupt_freq
#define LOG_IF(severity, condition)
Definition: Logger.h:293
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:185
static void addShutdownCallback(std::function< void()> shutdown_callback)
static auto * getHashTableCache()
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters &system_parameters=SystemParameters())
Definition: Execute.cpp:163
This file contains the class specification and related data structures for Catalog.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool g_enable_columnar_output
Definition: Execute.cpp:93
static SysCatalog & instance()
Definition: SysCatalog.h:292
std::string g_base_path
Definition: SysCatalog.cpp:62
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
QueryState & getQueryState()
Definition: QueryState.h:175
const std::string OMNISCI_DEFAULT_DB
Definition: SysCatalog.h:59
void execute(const Catalog_Namespace::SessionInfo &session) override
virtual void runDDLStatement(const std::string &)
static std::unique_ptr< QueryRunner > qr_instance_
Definition: QueryRunner.h:206
double g_gpu_mem_limit_percent
Definition: QueryRunner.cpp:49
std::string sanitize_name(const std::string &name)
void execute(const Catalog_Namespace::SessionInfo &session) override
unsigned pending_query_interrupt_freq
bool g_serialize_temp_tables
Definition: Catalog.cpp:98
const std::string OMNISCI_GEO_PREFIX
Definition: Transforms.h:22
ExecutorDeviceType device_type
void apply_shim(std::string &result, const boost::regex &reg_expr, const std::function< void(std::string &, const boost::smatch &)> &shim_fn)
void importGeoTable(const std::string &file_path, const std::string &table_name, const bool compression, const bool create_table, const bool explode_collections)
TExtArgumentType::type to_thrift(const ExtArgumentType &t)
const std::string OMNISCI_ROOT_USER
Definition: SysCatalog.h:60
static size_t getCombinedHashTableCacheSize()
RegisteredQueryHint getParsedQueryHints()
std::shared_ptr< Catalog_Namespace::SessionInfo > session_info_
Definition: QueryRunner.h:208
virtual std::shared_ptr< ExecutionResult > runSelectQuery(const std::string &query_str, CompilationOptions co, ExecutionOptions eo)
std::shared_ptr< Calcite > g_calcite
Definition: QueryRunner.cpp:58
static std::shared_ptr< query_state::QueryState > create_query_state(Ts &&...args)
Definition: QueryRunner.h:188
static CompilationOptions defaults(const ExecutorDeviceType device_type=ExecutorDeviceType::GPU)
bool g_enable_filter_push_down
Definition: Execute.cpp:89
#define CHECK(condition)
Definition: Logger.h:203
double gpu_input_mem_limit_percent
static std::vector< TableFunction > get_table_funcs(const std::string &name, const bool is_gpu)
Serializers for query engine types to/from thrift.
std::shared_ptr< ExecutionResult > run_select_query_with_filter_push_down(QueryStateProxy query_state_proxy, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain, const bool with_filter_push_down)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
unsigned dynamic_watchdog_time_limit
void execute(const Catalog_Namespace::SessionInfo &session)
std::shared_ptr< Data_Namespace::DataMgr > data_mgr_
Definition: QueryRunner.h:210
size_t g_leaf_count
Definition: ParserNode.cpp:76
std::string pg_shim(const std::string &query)
#define STDLOG(...)
Definition: QueryState.h:229
#define VLOG(n)
Definition: Logger.h:297
std::atomic< bool > isSuper
Definition: SysCatalog.h:99
static void nukeCacheOfExecutors()
Definition: Execute.h:381
std::string apply_copy_to_shim(const std::string &query_str)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:366
EncodingType geo_coords_encoding
Definition: CopyParams.h:74