OmniSciDB  a575cb28ea
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
QueryRunner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryRunner.h"
18 
19 #include "Calcite/Calcite.h"
20 #include "Catalog/Catalog.h"
21 #include "DistributedLoader.h"
22 #include "Geospatial/Transforms.h"
24 #include "Logger/Logger.h"
25 #include "Parser/ParserWrapper.h"
26 #include "Parser/parser.h"
33 #include "Shared/StringTransform.h"
35 #include "Shared/import_helpers.h"
37 #include "bcrypt.h"
38 #include "gen-cpp/CalciteServer.h"
39 
40 #include <boost/filesystem/operations.hpp>
41 #include <csignal>
42 #include <random>
43 
44 #define CALCITEPORT 3279
45 
46 extern size_t g_leaf_count;
47 extern bool g_enable_filter_push_down;
48 
50 
51 extern bool g_serialize_temp_tables;
53 std::mutex calcite_lock;
54 
55 using namespace Catalog_Namespace;
56 namespace {
57 
58 std::shared_ptr<Calcite> g_calcite = nullptr;
59 
60 void calcite_shutdown_handler() noexcept {
61  if (g_calcite) {
62  g_calcite->close_calcite_server();
63  g_calcite.reset();
64  }
65 }
66 
70 }
71 
72 } // namespace
73 
74 namespace QueryRunner {
75 
76 std::unique_ptr<QueryRunner> QueryRunner::qr_instance_ = nullptr;
77 
78 query_state::QueryStates QueryRunner::query_states_;
79 
80 QueryRunner* QueryRunner::init(const char* db_path,
81  const std::string& user,
82  const std::string& pass,
83  const std::string& db_name,
84  const std::vector<LeafHostInfo>& string_servers,
85  const std::vector<LeafHostInfo>& leaf_servers,
86  const std::string& udf_filename,
87  bool uses_gpus,
88  const size_t max_gpu_mem,
89  const int reserved_gpu_mem,
90  const bool create_user,
91  const bool create_db) {
92  LOG_IF(FATAL, !leaf_servers.empty()) << "Distributed test runner not supported.";
93  CHECK(leaf_servers.empty());
94  qr_instance_.reset(new QueryRunner(db_path,
95  user,
96  pass,
97  db_name,
98  string_servers,
99  leaf_servers,
100  udf_filename,
101  uses_gpus,
102  max_gpu_mem,
103  reserved_gpu_mem,
104  create_user,
105  create_db));
106  return qr_instance_.get();
107 }
108 
109 QueryRunner::QueryRunner(const char* db_path,
110  const std::string& user_name,
111  const std::string& passwd,
112  const std::string& db_name,
113  const std::vector<LeafHostInfo>& string_servers,
114  const std::vector<LeafHostInfo>& leaf_servers,
115  const std::string& udf_filename,
116  bool uses_gpus,
117  const size_t max_gpu_mem,
118  const int reserved_gpu_mem,
119  const bool create_user,
120  const bool create_db)
121  : dispatch_queue_(std::make_unique<QueryDispatchQueue>(1)) {
123 
124  boost::filesystem::path base_path{db_path};
125  CHECK(boost::filesystem::exists(base_path));
126  auto system_db_file = base_path / "mapd_catalogs" / OMNISCI_DEFAULT_DB;
127  CHECK(boost::filesystem::exists(system_db_file));
128  auto data_dir = base_path / "mapd_data";
129  DiskCacheConfig disk_cache_config{(base_path / "omnisci_disk_cache").string(),
133 
136  g_calcite =
137  std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024, 5000, true, udf_filename);
138  ExtensionFunctionsWhitelist::add(g_calcite->getExtensionFunctionWhitelist());
139  if (!udf_filename.empty()) {
140  ExtensionFunctionsWhitelist::addUdfs(g_calcite->getUserDefinedFunctionWhitelist());
141  }
142 
144  auto udtfs = ThriftSerializers::to_thrift(
146  std::vector<TUserDefinedFunction> udfs = {};
147  g_calcite->setRuntimeExtensionFunctions(udfs, udtfs, /*is_runtime=*/false);
148 
149  std::unique_ptr<CudaMgr_Namespace::CudaMgr> cuda_mgr;
150 #ifdef HAVE_CUDA
151  if (uses_gpus) {
152  cuda_mgr = std::make_unique<CudaMgr_Namespace::CudaMgr>(-1, 0);
153  }
154 #else
155  uses_gpus = false;
156 #endif
157  SystemParameters mapd_params;
158  mapd_params.gpu_buffer_mem_bytes = max_gpu_mem;
159  mapd_params.aggregator = !leaf_servers.empty();
160 
161  auto data_mgr = std::make_shared<Data_Namespace::DataMgr>(data_dir.string(),
162  mapd_params,
163  std::move(cuda_mgr),
164  uses_gpus,
165  reserved_gpu_mem,
166  0,
167  disk_cache_config);
168 
169  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
170 
171  sys_cat.init(base_path.string(),
172  data_mgr,
173  {},
174  g_calcite,
175  false,
176  mapd_params.aggregator,
177  string_servers);
178 
179  if (create_user) {
180  if (!sys_cat.getMetadataForUser(user_name, user)) {
181  sys_cat.createUser(user_name, passwd, false, "", true);
182  }
183  }
184  CHECK(sys_cat.getMetadataForUser(user_name, user));
185  CHECK(bcrypt_checkpw(passwd.c_str(), user.passwd_hash.c_str()) == 0);
186 
187  if (create_db) {
188  if (!sys_cat.getMetadataForDB(db_name, db)) {
189  sys_cat.createDatabase(db_name, user.userId);
190  }
191  }
192  CHECK(sys_cat.getMetadataForDB(db_name, db));
193  CHECK(user.isSuper || (user.userId == db.dbOwner));
194  auto cat = sys_cat.getCatalog(
195  base_path.string(), db, data_mgr, string_servers, g_calcite, create_db);
196  session_info_ = std::make_unique<Catalog_Namespace::SessionInfo>(
197  cat, user, ExecutorDeviceType::GPU, "");
198 }
199 
200 QueryRunner::~QueryRunner() {
202 }
203 
204 void QueryRunner::resizeDispatchQueue(const size_t num_executors) {
205  dispatch_queue_ = std::make_unique<QueryDispatchQueue>(num_executors);
206 }
207 
208 QueryRunner::QueryRunner(std::unique_ptr<Catalog_Namespace::SessionInfo> session)
209  : session_info_(std::move(session))
210  , dispatch_queue_(std::make_unique<QueryDispatchQueue>(1)) {}
211 
212 std::shared_ptr<Catalog_Namespace::Catalog> QueryRunner::getCatalog() const {
214  return session_info_->get_catalog_ptr();
215 }
216 
217 std::shared_ptr<Calcite> QueryRunner::getCalcite() const {
218  // TODO: Embed Calcite shared_ptr ownership in QueryRunner
219  return g_calcite;
220 }
221 
222 bool QueryRunner::gpusPresent() const {
224  return session_info_->getCatalog().getDataMgr().gpusPresent();
225 }
226 
227 void QueryRunner::clearGpuMemory() const {
230 }
231 
232 void QueryRunner::clearCpuMemory() const {
235 }
236 
237 std::string apply_copy_to_shim(const std::string& query_str) {
238  auto result = query_str;
239  {
240  boost::regex copy_to{R"(COPY\s*\(([^#])(.+)\)\s+TO\s)",
241  boost::regex::extended | boost::regex::icase};
242  apply_shim(result, copy_to, [](std::string& result, const boost::smatch& what) {
243  result.replace(
244  what.position(), what.length(), "COPY (#~#" + what[1] + what[2] + "#~#) TO ");
245  });
246  }
247  return result;
248 }
249 
250 QueryHint QueryRunner::getParsedQueryHint(const std::string& query_str) {
253  auto query_state = create_query_state(session_info_, query_str);
254  const auto& cat = session_info_->getCatalog();
256  auto calcite_mgr = cat.getCalciteMgr();
257  const auto query_ra = calcite_mgr
258  ->process(query_state->createQueryStateProxy(),
259  pg_shim(query_str),
260  {},
261  true,
262  false,
263  false,
264  true)
265  .plan_result;
266  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra);
267  const auto& query_hints = ra_executor.getParsedQueryHints();
268  return query_hints;
269 }
270 
271 void QueryRunner::runDDLStatement(const std::string& stmt_str_in) {
274 
275  std::string stmt_str = stmt_str_in;
276  // First remove special chars
277  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
278  // Then remove spaces
279  boost::algorithm::trim_left(stmt_str);
280 
281  ParserWrapper pw{stmt_str};
282  if (pw.is_copy_to) {
283  stmt_str = apply_copy_to_shim(stmt_str_in);
284  }
285 
286  auto query_state = create_query_state(session_info_, stmt_str);
287  auto stdlog = STDLOG(query_state);
288 
289  SQLParser parser;
290  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
291  std::string last_parsed;
292  CHECK_EQ(parser.parse(stmt_str, parse_trees, last_parsed), 0) << stmt_str_in;
293  CHECK_EQ(parse_trees.size(), size_t(1));
294  auto stmt = parse_trees.front().get();
295  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
296  CHECK(ddl);
297  ddl->execute(*session_info_);
298 }
299 
300 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
301  const ExecutorDeviceType device_type,
302  const bool hoist_literals,
303  const bool allow_loop_joins) {
306 
307  ParserWrapper pw{query_str};
308  if (pw.isCalcitePathPermissable()) {
309  const auto execution_result =
310  runSelectQuery(query_str, device_type, hoist_literals, allow_loop_joins);
311  VLOG(1) << session_info_->getCatalog().getDataMgr().getSystemMemoryUsage();
312  return execution_result->getRows();
313  }
314 
315  auto query_state = create_query_state(session_info_, query_str);
316  auto stdlog = STDLOG(query_state);
317 
318  SQLParser parser;
319  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
320  std::string last_parsed;
321  CHECK_EQ(parser.parse(query_str, parse_trees, last_parsed), 0) << query_str;
322  CHECK_EQ(parse_trees.size(), size_t(1));
323  auto stmt = parse_trees.front().get();
324  auto insert_values_stmt = dynamic_cast<InsertValuesStmt*>(stmt);
325  CHECK(insert_values_stmt);
326  insert_values_stmt->execute(*session_info_);
327  return nullptr;
328 }
329 
330 std::shared_ptr<Executor> QueryRunner::getExecutor() const {
333  auto query_state = create_query_state(session_info_, "");
334  auto stdlog = STDLOG(query_state);
336  return executor;
337 }
338 
339 std::shared_ptr<ResultSet> QueryRunner::runSQLWithAllowingInterrupt(
340  const std::string& query_str,
341  std::shared_ptr<Executor> executor,
342  const std::string& session_id,
343  const ExecutorDeviceType device_type,
344  const double running_query_check_freq,
345  const unsigned pending_query_check_freq) {
348  auto session_info =
349  std::make_shared<Catalog_Namespace::SessionInfo>(session_info_->get_catalog_ptr(),
350  session_info_->get_currentUser(),
352  session_id);
353  auto query_state = create_query_state(session_info, query_str);
354  auto stdlog = STDLOG(query_state);
355  const auto& cat = query_state->getConstSessionInfo()->getCatalog();
356  std::string query_ra{""};
357 
358  std::shared_ptr<ExecutionResult> result;
359  auto query_launch_task = std::make_shared<QueryDispatchQueue::Task>(
360  [&cat, &query_ra, &device_type, &query_state, &result](const size_t worker_id) {
361  auto executor = Executor::getExecutor(worker_id);
364 
366  true,
367  false,
368  true,
369  false,
370  false,
371  false,
372  false,
373  10000,
374  false,
375  false,
377  true,
378  1000};
379  {
380  // async query initiation for interrupt test
381  // incurs data race warning in TSAN since
382  // calcite_mgr is shared across multiple query threads
383  // so here we lock the manager during query parsing
384  std::lock_guard<std::mutex> calcite_lock_guard(calcite_lock);
385  auto calcite_mgr = cat.getCalciteMgr();
386  query_ra = calcite_mgr
387  ->process(query_state->createQueryStateProxy(),
388  pg_shim(query_state->getQueryStr()),
389  {},
390  true,
391  false,
392  false,
393  true)
394  .plan_result;
395  }
396  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra, query_state);
397  const auto& query_hints = ra_executor.getParsedQueryHints();
398  if (query_hints.cpu_mode) {
400  }
401  result = std::make_shared<ExecutionResult>(
402  ra_executor.executeRelAlgQuery(co, eo, false, nullptr));
403  });
404  {
406  auto submitted_time = std::chrono::system_clock::now();
407  query_state->setQuerySubmittedTime(submitted_time);
408  executor->enrollQuerySession(session_id,
409  query_str,
410  submitted_time,
412  QuerySessionStatus::QueryStatus::PENDING_QUEUE);
413  }
415  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
416  auto result_future = query_launch_task->get_future();
417  result_future.get();
418  CHECK(result);
419  return result->getRows();
420 }
421 
422 std::vector<std::shared_ptr<ResultSet>> QueryRunner::runMultipleStatements(
423  const std::string& sql,
424  const ExecutorDeviceType dt) {
425  std::vector<std::shared_ptr<ResultSet>> results;
426  // TODO: Need to properly handle escaped semicolons instead of doing a naive split().
427  auto fields = split(sql, ";");
428  for (const auto& field : fields) {
429  auto text = strip(field) + ";";
430  if (text == ";") {
431  continue;
432  }
433  // TODO: Maybe remove this redundant parsing after enhancing Parser::Stmt?
434  SQLParser parser;
435  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
436  std::string last_parsed;
437  CHECK_EQ(parser.parse(text, parse_trees, last_parsed), 0);
438  CHECK_EQ(parse_trees.size(), size_t(1));
439  auto stmt = parse_trees.front().get();
440  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
441  Parser::DMLStmt* dml = dynamic_cast<Parser::DMLStmt*>(stmt);
442  if (ddl != nullptr && dml == nullptr) {
443  runDDLStatement(text);
444  results.push_back(nullptr);
445  } else if (ddl == nullptr && dml != nullptr) {
446  results.push_back(runSQL(text, dt, true, true));
447  } else {
448  throw std::runtime_error("Unexpected SQL statement type: " + text);
449  }
450  }
451  return results;
452 }
453 
454 void QueryRunner::runImport(Parser::CopyTableStmt* import_stmt) {
455  CHECK(import_stmt);
456  import_stmt->execute(*session_info_);
457 }
458 
459 std::unique_ptr<import_export::Loader> QueryRunner::getLoader(
460  const TableDescriptor* td) const {
461  auto cat = getCatalog();
462  return std::make_unique<import_export::Loader>(*cat, td);
463 }
464 
465 namespace {
466 
467 std::shared_ptr<ExecutionResult> run_select_query_with_filter_push_down(
468  QueryStateProxy query_state_proxy,
469  const ExecutorDeviceType device_type,
470  const bool hoist_literals,
471  const bool allow_loop_joins,
472  const bool just_explain,
473  const bool with_filter_push_down) {
474  auto const& query_state = query_state_proxy.getQueryState();
475  const auto& cat = query_state.getConstSessionInfo()->getCatalog();
479 
481  true,
482  just_explain,
483  allow_loop_joins,
484  false,
485  false,
486  false,
487  false,
488  10000,
489  with_filter_push_down,
490  false,
492  false,
493  1000};
494  auto calcite_mgr = cat.getCalciteMgr();
495  const auto query_ra = calcite_mgr
496  ->process(query_state_proxy,
497  pg_shim(query_state.getQueryStr()),
498  {},
499  true,
500  false,
501  false,
502  true)
503  .plan_result;
504  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra);
505  const auto& query_hints = ra_executor.getParsedQueryHints();
506  if (query_hints.cpu_mode) {
508  }
509  auto result = std::make_shared<ExecutionResult>(
510  ra_executor.executeRelAlgQuery(co, eo, false, nullptr));
511  const auto& filter_push_down_requests = result->getPushedDownFilterInfo();
512  if (!filter_push_down_requests.empty()) {
513  std::vector<TFilterPushDownInfo> filter_push_down_info;
514  for (const auto& req : filter_push_down_requests) {
515  TFilterPushDownInfo filter_push_down_info_for_request;
516  filter_push_down_info_for_request.input_prev = req.input_prev;
517  filter_push_down_info_for_request.input_start = req.input_start;
518  filter_push_down_info_for_request.input_next = req.input_next;
519  filter_push_down_info.push_back(filter_push_down_info_for_request);
520  }
521  const auto new_query_ra = calcite_mgr
522  ->process(query_state_proxy,
523  pg_shim(query_state.getQueryStr()),
524  filter_push_down_info,
525  true,
526  false,
527  false,
528  true)
529  .plan_result;
530  const ExecutionOptions eo_modified{eo.output_columnar_hint,
531  eo.allow_multifrag,
532  eo.just_explain,
533  eo.allow_loop_joins,
534  eo.with_watchdog,
535  eo.jit_debug,
536  eo.just_validate,
539  /*find_push_down_candidates=*/false,
540  /*just_calcite_explain=*/false,
543  auto new_ra_executor = RelAlgExecutor(executor.get(), cat, new_query_ra);
544  return std::make_shared<ExecutionResult>(
545  new_ra_executor.executeRelAlgQuery(co, eo_modified, false, nullptr));
546  } else {
547  return result;
548  }
549 }
550 
551 } // namespace
552 
553 std::shared_ptr<ExecutionResult> QueryRunner::runSelectQuery(
554  const std::string& query_str,
555  const ExecutorDeviceType device_type,
556  const bool hoist_literals,
557  const bool allow_loop_joins,
558  const bool just_explain) {
561  auto query_state = create_query_state(session_info_, query_str);
562  auto stdlog = STDLOG(query_state);
564  return run_select_query_with_filter_push_down(query_state->createQueryStateProxy(),
565  device_type,
566  hoist_literals,
567  allow_loop_joins,
568  just_explain,
570  }
571 
572  const auto& cat = session_info_->getCatalog();
573 
574  std::shared_ptr<ExecutionResult> result;
575  auto query_launch_task =
576  std::make_shared<QueryDispatchQueue::Task>([&cat,
577  &query_str,
578  &device_type,
579  &allow_loop_joins,
580  &just_explain,
581  &query_state,
582  &result](const size_t worker_id) {
583  auto executor = Executor::getExecutor(worker_id);
586 
588  true,
589  just_explain,
590  allow_loop_joins,
591  false,
592  false,
593  false,
594  false,
595  10000,
596  false,
597  false,
599  false,
600  1000};
601  auto calcite_mgr = cat.getCalciteMgr();
602  const auto query_ra = calcite_mgr
603  ->process(query_state->createQueryStateProxy(),
604  pg_shim(query_str),
605  {},
606  true,
607  false,
609  true)
610  .plan_result;
611  auto ra_executor = RelAlgExecutor(executor.get(), cat, query_ra);
612  const auto& query_hints = ra_executor.getParsedQueryHints();
613  if (query_hints.cpu_mode) {
615  }
616  result = std::make_shared<ExecutionResult>(
617  ra_executor.executeRelAlgQuery(co, eo, false, nullptr));
618  });
620  dispatch_queue_->submit(query_launch_task, /*is_update_delete=*/false);
621  auto result_future = query_launch_task->get_future();
622  result_future.get();
623  CHECK(result);
624  return result;
625 }
626 
627 const int32_t* QueryRunner::getCachedJoinHashTable(size_t idx) {
628  auto hash_table_cache = PerfectJoinHashTable::getHashTableCache();
629  CHECK(hash_table_cache);
630  auto hash_table = hash_table_cache->getCachedHashTable(idx);
631  CHECK(hash_table);
632  return reinterpret_cast<int32_t*>(hash_table->getCpuBuffer());
633 };
634 
635 const int8_t* QueryRunner::getCachedBaselineHashTable(size_t idx) {
636  auto hash_table_cache = BaselineJoinHashTable::getHashTableCache();
637  CHECK(hash_table_cache);
638  auto hash_table = hash_table_cache->getCachedHashTable(idx);
639  CHECK(hash_table);
640  return hash_table->getCpuBuffer();
641 };
642 
643 size_t QueryRunner::getEntryCntCachedBaselineHashTable(size_t idx) {
644  auto hash_table_cache = BaselineJoinHashTable::getHashTableCache();
645  CHECK(hash_table_cache);
646  auto hash_table = hash_table_cache->getCachedHashTable(idx);
647  CHECK(hash_table);
648  return hash_table->getEntryCount();
649 }
650 
651 size_t QueryRunner::getNumberOfCachedJoinHashTables() {
652  auto hash_table_cache = PerfectJoinHashTable::getHashTableCache();
653  CHECK(hash_table_cache);
654  return hash_table_cache->getNumberOfCachedHashTables();
655 };
656 
657 size_t QueryRunner::getNumberOfCachedBaselineJoinHashTables() {
658  auto hash_table_cache = BaselineJoinHashTable::getHashTableCache();
659  CHECK(hash_table_cache);
660  return hash_table_cache->getNumberOfCachedHashTables();
661 };
662 
663 size_t QueryRunner::getNumberOfCachedOverlapsHashTables() {
665 }
666 
667 void QueryRunner::reset() {
668  qr_instance_.reset(nullptr);
670 }
671 
672 ImportDriver::ImportDriver(std::shared_ptr<Catalog_Namespace::Catalog> cat,
674  const ExecutorDeviceType dt)
675  : QueryRunner(std::make_unique<Catalog_Namespace::SessionInfo>(cat, user, dt, "")) {}
676 
677 void ImportDriver::importGeoTable(const std::string& file_path,
678  const std::string& table_name,
679  const bool compression,
680  const bool create_table,
681  const bool explode_collections) {
682  using namespace import_export;
683 
685  const std::string geo_column_name(OMNISCI_GEO_PREFIX);
686 
687  CopyParams copy_params;
688  if (compression) {
690  copy_params.geo_coords_comp_param = 32;
691  } else {
692  copy_params.geo_coords_encoding = EncodingType::kENCODING_NONE;
693  copy_params.geo_coords_comp_param = 0;
694  }
695  copy_params.geo_assign_render_groups = true;
696  copy_params.geo_explode_collections = explode_collections;
697 
698  auto cds = Importer::gdalToColumnDescriptors(file_path, geo_column_name, copy_params);
699  std::map<std::string, std::string> colname_to_src;
700  for (auto& cd : cds) {
701  const auto col_name_sanitized = ImportHelpers::sanitize_name(cd.columnName);
702  const auto ret =
703  colname_to_src.insert(std::make_pair(col_name_sanitized, cd.columnName));
704  CHECK(ret.second);
705  cd.columnName = col_name_sanitized;
706  }
707 
708  auto& cat = session_info_->getCatalog();
709 
710  if (create_table) {
711  const auto td = cat.getMetadataForTable(table_name);
712  if (td != nullptr) {
713  throw std::runtime_error("Error: Table " + table_name +
714  " already exists. Possible failure to correctly re-create "
715  "mapd_data directory.");
716  }
717  if (table_name != ImportHelpers::sanitize_name(table_name)) {
718  throw std::runtime_error("Invalid characters in table name: " + table_name);
719  }
720 
721  std::string stmt{"CREATE TABLE " + table_name};
722  std::vector<std::string> col_stmts;
723 
724  for (auto& cd : cds) {
725  if (cd.columnType.get_type() == SQLTypes::kINTERVAL_DAY_TIME ||
726  cd.columnType.get_type() == SQLTypes::kINTERVAL_YEAR_MONTH) {
727  throw std::runtime_error(
728  "Unsupported type: INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH for col " +
729  cd.columnName + " (table: " + table_name + ")");
730  }
731 
732  if (cd.columnType.get_type() == SQLTypes::kDECIMAL) {
733  if (cd.columnType.get_precision() == 0 && cd.columnType.get_scale() == 0) {
734  cd.columnType.set_precision(14);
735  cd.columnType.set_scale(7);
736  }
737  }
738 
739  std::string col_stmt;
740  col_stmt.append(cd.columnName + " " + cd.columnType.get_type_name() + " ");
741 
742  if (cd.columnType.get_compression() != EncodingType::kENCODING_NONE) {
743  col_stmt.append("ENCODING " + cd.columnType.get_compression_name() + " ");
744  } else {
745  if (cd.columnType.is_string()) {
746  col_stmt.append("ENCODING NONE");
747  } else if (cd.columnType.is_geometry()) {
748  if (cd.columnType.get_output_srid() == 4326) {
749  col_stmt.append("ENCODING NONE");
750  }
751  }
752  }
753  col_stmts.push_back(col_stmt);
754  }
755 
756  stmt.append(" (" + boost::algorithm::join(col_stmts, ",") + ");");
757  runDDLStatement(stmt);
758 
759  LOG(INFO) << "Created table: " << table_name;
760  } else {
761  LOG(INFO) << "Not creating table: " << table_name;
762  }
763 
764  const auto td = cat.getMetadataForTable(table_name);
765  if (td == nullptr) {
766  throw std::runtime_error("Error: Failed to create table " + table_name);
767  }
768 
769  import_export::Importer importer(cat, td, file_path, copy_params);
770  auto ms = measure<>::execution([&]() { importer.importGDAL(colname_to_src); });
771  LOG(INFO) << "Import Time for " << table_name << ": " << (double)ms / 1000.0 << " s";
772 }
773 
774 } // namespace QueryRunner
bool g_enable_calcite_view_optimize
Definition: QueryRunner.cpp:52
Classes used to wrap parser calls for calcite redirection.
static void addUdfs(const std::string &json_func_sigs)
#define CHECK_EQ(x, y)
Definition: Logger.h:205
#define CALCITEPORT
Definition: QueryRunner.cpp:44
std::string cat(Ts &&...args)
std::unique_ptr< QueryDispatchQueue > dispatch_queue_
Definition: QueryRunner.h:190
ImportStatus importGDAL(std::map< std::string, std::string > colname_to_src)
Definition: Importer.cpp:4745
static TimeT::rep execution(F func, Args &&...args)
Definition: sample.cpp:29
ExecutorDeviceType
std::string strip(std::string_view str)
trim any whitespace from the left and right ends of a string
#define LOG(tag)
Definition: Logger.h:188
std::string join(T const &container, std::string const &delim)
static void add(const std::string &json_func_sigs)
std::mutex calcite_lock
Definition: QueryRunner.cpp:53
static std::shared_ptr< Executor > getExecutor(const ExecutorId id, const std::string &debug_dir="", const std::string &debug_file="", const SystemParameters system_parameters=SystemParameters())
Definition: Execute.cpp:157
ExecutorOptLevel opt_level
void set_once_fatal_func(FatalFunc fatal_func)
Definition: Logger.cpp:307
void execute(const Catalog_Namespace::SessionInfo &session) override
std::vector< std::string > split(std::string_view str, std::string_view delim, std::optional< size_t > maxsplit)
split apart a string into a vector of substrings
#define LOG_IF(severity, condition)
Definition: Logger.h:287
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:179
virtual std::shared_ptr< ResultSet > runSQL(const std::string &query_str, const ExecutorDeviceType device_type, const bool hoist_literals=true, const bool allow_loop_joins=true)
static void addShutdownCallback(std::function< void()> shutdown_callback)
ImportDriver(std::shared_ptr< Catalog_Namespace::Catalog > cat, const Catalog_Namespace::UserMetadata &user, const ExecutorDeviceType dt=ExecutorDeviceType::GPU)
static auto * getHashTableCache()
This file contains the class specification and related data structures for Catalog.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool g_enable_columnar_output
Definition: Execute.cpp:93
static SysCatalog & instance()
Definition: SysCatalog.h:286
const bool allow_multifrag
const bool just_validate
void init(LogOptions const &log_opts)
Definition: Logger.cpp:280
QueryState & getQueryState()
Definition: QueryState.h:181
const std::string OMNISCI_DEFAULT_DB
Definition: SysCatalog.h:58
const bool with_dynamic_watchdog
const double gpu_input_mem_limit_percent
virtual std::shared_ptr< ExecutionResult > runSelectQuery(const std::string &query_str, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain=false)
virtual void runDDLStatement(const std::string &)
static std::unique_ptr< QueryRunner > qr_instance_
Definition: QueryRunner.h:187
double g_gpu_mem_limit_percent
Definition: QueryRunner.cpp:49
std::string sanitize_name(const std::string &name)
bool g_serialize_temp_tables
Definition: Catalog.cpp:98
QueryHint getParsedQueryHints()
const std::string OMNISCI_GEO_PREFIX
Definition: Transforms.h:22
ExecutorDeviceType device_type
void apply_shim(std::string &result, const boost::regex &reg_expr, const std::function< void(std::string &, const boost::smatch &)> &shim_fn)
void importGeoTable(const std::string &file_path, const std::string &table_name, const bool compression, const bool create_table, const bool explode_collections)
TExtArgumentType::type to_thrift(const ExtArgumentType &t)
static size_t getCombinedHashTableCacheSize()
std::shared_ptr< Catalog_Namespace::SessionInfo > session_info_
Definition: QueryRunner.h:189
std::shared_ptr< Calcite > g_calcite
Definition: QueryRunner.cpp:58
static std::shared_ptr< query_state::QueryState > create_query_state(Ts &&...args)
Definition: QueryRunner.h:169
static CompilationOptions defaults(const ExecutorDeviceType device_type=ExecutorDeviceType::GPU)
bool g_enable_filter_push_down
Definition: Execute.cpp:89
const bool allow_loop_joins
#define CHECK(condition)
Definition: Logger.h:197
static std::vector< TableFunction > get_table_funcs(const std::string &name, const bool is_gpu)
Serializers for query engine types to/from thrift.
std::shared_ptr< ExecutionResult > run_select_query_with_filter_push_down(QueryStateProxy query_state_proxy, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain, const bool with_filter_push_down)
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
size_t g_leaf_count
Definition: ParserNode.cpp:74
const bool allow_runtime_query_interrupt
const unsigned dynamic_watchdog_time_limit
std::string pg_shim(const std::string &query)
#define STDLOG(...)
Definition: QueryState.h:234
#define VLOG(n)
Definition: Logger.h:291
const bool with_watchdog
std::atomic< bool > isSuper
Definition: SysCatalog.h:96
static void nukeCacheOfExecutors()
Definition: Execute.h:392
std::string apply_copy_to_shim(const std::string &query_str)
static const ExecutorId UNITARY_EXECUTOR_ID
Definition: Execute.h:377
EncodingType geo_coords_encoding
Definition: CopyParams.h:73