OmniSciDB  addbbd5075
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
QueryRunner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryRunner.h"
18 
19 #include "Calcite/Calcite.h"
20 #include "Catalog/Catalog.h"
21 #include "DistributedLoader.h"
22 #include "Parser/ParserWrapper.h"
23 #include "Parser/parser.h"
28 #include "Shared/ConfigResolve.h"
29 #include "Shared/Logger.h"
30 #include "Shared/MapDParameters.h"
31 #include "Shared/StringTransform.h"
32 #include "bcrypt.h"
33 #include "gen-cpp/CalciteServer.h"
34 
35 #include <boost/filesystem/operations.hpp>
36 #include <csignal>
37 #include <random>
38 
// Port handed to the Calcite server that the QueryRunner constructor starts.
39 #define CALCITEPORT 3279
40 
// Globals defined in other translation units; these declarations make them
// visible in this file.
41 extern size_t g_leaf_count;
42 extern bool g_enable_filter_push_down;
43 
// NOTE(review): source line 44 is absent from this rendering.
45 
46 extern bool g_serialize_temp_tables;
47 
48 using namespace Catalog_Namespace;
// File-local Calcite state and process signal handling for the QueryRunner.
49 namespace {
50 
// Calcite instance used by the runner: assigned in the QueryRunner
// constructor and returned by QueryRunner::getCalcite(). It is process-wide,
// not per-runner.
51 std::shared_ptr<Calcite> g_calcite = nullptr;
52 
// NOTE(review): source line 53, which holds this function's signature, is
// absent from this rendering. The body closes the Calcite server and drops
// the shared reference; the null guard makes a second call a no-op.
54  if (g_calcite) {
55  g_calcite->close_calcite_server();
56  g_calcite.reset();
57  }
58 }
59 
// Logs the received signal and terminates the process: SIGTERM exits with
// EXIT_SUCCESS, and any other signal exits with the signal number as status.
// NOTE(review): source lines 62 and 64 are absent from this rendering. Judging
// by the surviving comments, they perform the shutdown/log-flush steps; confirm.
// NOTE(review): LOG() and std::exit() are not async-signal-safe, so running
// them from a SIGSEGV/SIGABRT handler is best-effort at most.
60 void mapd_signal_handler(int signal_number) {
61  LOG(ERROR) << "Interrupt signal (" << signal_number << ") received.";
63  // shut down logging to force a flush
65  // terminate program
66  if (signal_number == SIGTERM) {
67  std::exit(EXIT_SUCCESS);
68  } else {
69  std::exit(signal_number);
70  }
71 }
72 
// NOTE(review): source line 73, which holds this function's signature, is
// absent from this rendering. The body installs mapd_signal_handler for
// SIGTERM, SIGSEGV and SIGABRT.
74  std::signal(SIGTERM, mapd_signal_handler);
75  std::signal(SIGSEGV, mapd_signal_handler);
76  std::signal(SIGABRT, mapd_signal_handler);
77 }
78 
79 } // namespace
80 
81 namespace QueryRunner {
82 
83 std::unique_ptr<QueryRunner> QueryRunner::qr_instance_ = nullptr;
84 
85 query_state::QueryStates QueryRunner::query_states_;
86 
87 QueryRunner* QueryRunner::init(const char* db_path,
88  const std::string& user,
89  const std::string& pass,
90  const std::string& db_name,
91  const std::vector<LeafHostInfo>& string_servers,
92  const std::vector<LeafHostInfo>& leaf_servers,
93  const std::string& udf_filename,
94  bool uses_gpus,
95  const size_t max_gpu_mem,
96  const int reserved_gpu_mem,
97  const bool create_user,
98  const bool create_db) {
99  LOG_IF(FATAL, !leaf_servers.empty()) << "Distributed test runner not supported.";
100  CHECK(leaf_servers.empty());
101  qr_instance_.reset(new QueryRunner(db_path,
102  user,
103  pass,
104  db_name,
105  string_servers,
106  leaf_servers,
107  udf_filename,
108  uses_gpus,
109  max_gpu_mem,
110  reserved_gpu_mem,
111  create_user,
112  create_db));
113  return qr_instance_.get();
114 }
115 
// Constructs a single-node runner rooted at `db_path`:
//  - checks that `db_path` and its "mapd_catalogs/<OMNISCI_DEFAULT_DB>" file exist;
//  - starts Calcite (stored in the file-scope g_calcite) and registers its
//    extension functions, plus user-defined functions when `udf_filename` is set;
//  - builds a DataMgr over "<db_path>/mapd_data" and initializes the SysCatalog;
//  - optionally creates `user_name` / `db_name`, then CHECKs that the password
//    matches and that the user is a superuser or the database owner;
//  - registers the database Catalog and opens the session kept in session_info_.
// The visible precondition checks use CHECK rather than exceptions.
// NOTE(review): source lines 128, 135-136, 138-139 and 146 are absent from this
// rendering. `user` and `db` are used below without a visible declaration,
// presumably because they are declared in those lines.
116 QueryRunner::QueryRunner(const char* db_path,
117  const std::string& user_name,
118  const std::string& passwd,
119  const std::string& db_name,
120  const std::vector<LeafHostInfo>& string_servers,
121  const std::vector<LeafHostInfo>& leaf_servers,
122  const std::string& udf_filename,
123  bool uses_gpus,
124  const size_t max_gpu_mem,
125  const int reserved_gpu_mem,
126  const bool create_user,
127  const bool create_db) {
129 
130  boost::filesystem::path base_path{db_path};
131  CHECK(boost::filesystem::exists(base_path));
132  auto system_db_file = base_path / "mapd_catalogs" / OMNISCI_DEFAULT_DB;
133  CHECK(boost::filesystem::exists(system_db_file));
134  auto data_dir = base_path / "mapd_data";
137 
// NOTE(review): the meaning of the -1 and 1024 Calcite arguments is not
// visible here.
140  g_calcite = std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024, udf_filename);
141  ExtensionFunctionsWhitelist::add(g_calcite->getExtensionFunctionWhitelist());
142  if (!udf_filename.empty()) {
143  ExtensionFunctionsWhitelist::addUdfs(g_calcite->getUserDefinedFunctionWhitelist());
144  }
145 
147 
// Disables GPU use when CudaBuildSelector is PreprocessorFalse (presumably a
// build without CUDA).
148  if (std::is_same<CudaBuildSelector, PreprocessorFalse>::value) {
149  uses_gpus = false;
150  }
151  MapDParameters mapd_params;
152  mapd_params.gpu_buffer_mem_bytes = max_gpu_mem;
// NOTE(review): init() rejects a non-empty leaf_servers, so this is always
// false when the runner is created through init().
153  mapd_params.aggregator = !leaf_servers.empty();
154 
// NOTE(review): the meaning of the -1 and 0 DataMgr arguments is not visible here.
155  auto data_mgr = std::make_shared<Data_Namespace::DataMgr>(
156  data_dir.string(), mapd_params, uses_gpus, -1, 0, reserved_gpu_mem);
157 
158  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
159 
160  sys_cat.init(base_path.string(),
161  data_mgr,
162  {},
163  g_calcite,
164  false,
165  mapd_params.aggregator,
166  string_servers);
167 
// Create the user on demand, then require that it exists and that `passwd`
// matches the stored bcrypt hash.
168  if (create_user) {
169  if (!sys_cat.getMetadataForUser(user_name, user)) {
170  sys_cat.createUser(user_name, passwd, false, "", true);
171  }
172  }
173  CHECK(sys_cat.getMetadataForUser(user_name, user));
174  CHECK(bcrypt_checkpw(passwd.c_str(), user.passwd_hash.c_str()) == 0);
175 
// Create the database on demand (owned by this user), then require that it
// exists and that the user may access it as superuser or owner.
176  if (create_db) {
177  if (!sys_cat.getMetadataForDB(db_name, db)) {
178  sys_cat.createDatabase(db_name, user.userId);
179  }
180  }
181  CHECK(sys_cat.getMetadataForDB(db_name, db));
182  CHECK(user.isSuper || (user.userId == db.dbOwner));
183  auto cat = std::make_shared<Catalog_Namespace::Catalog>(
184  base_path.string(), db, data_mgr, string_servers, g_calcite, create_db);
185  Catalog_Namespace::Catalog::set(cat->getCurrentDB().dbName, cat);
// NOTE(review): the session is opened with ExecutorDeviceType::GPU even when
// uses_gpus is false.
186  session_info_ = std::make_unique<Catalog_Namespace::SessionInfo>(
187  cat, user, ExecutorDeviceType::GPU, "");
188 }
189 
190 QueryRunner::QueryRunner(std::unique_ptr<Catalog_Namespace::SessionInfo> session)
191  : session_info_(std::move(session)) {}
192 
// Returns a shared pointer to the Catalog of the runner's current session.
// NOTE(review): source line 194 is absent from this rendering.
193 std::shared_ptr<Catalog_Namespace::Catalog> QueryRunner::getCatalog() const {
195  return session_info_->get_catalog_ptr();
196 }
197 
198 std::shared_ptr<Calcite> QueryRunner::getCalcite() const {
199  // TODO: Embed Calcite shared_ptr ownership in QueryRunner
200  return g_calcite;
201 }
202 
// Reports whether the DataMgr of the current session's catalog sees any GPUs.
// NOTE(review): source line 204 is absent from this rendering.
203 bool QueryRunner::gpusPresent() const {
205  return session_info_->getCatalog().getDataMgr().gpusPresent();
206 }
207 
// NOTE(review): the body (source lines 209-210) is absent from this rendering.
// Judging by the name, it presumably releases GPU-level memory (e.g. via
// Executor::clearMemory); confirm.
208 void QueryRunner::clearGpuMemory() const {
211 }
212 
// NOTE(review): the body (source lines 214-215) is absent from this rendering.
// Judging by the name, it presumably releases CPU-level memory (e.g. via
// Executor::clearMemory); confirm.
213 void QueryRunner::clearCpuMemory() const {
216 }
217 
// Parses `stmt_str_in` and executes it as a DDL statement in the current
// session. Leading newlines and whitespace are trimmed first. A parse failure,
// more than one statement, or a non-DDL statement trips a CHECK.
// NOTE(review): source lines 219-220 are absent from this rendering.
218 void QueryRunner::runDDLStatement(const std::string& stmt_str_in) {
221 
222  auto stmt_str = stmt_str_in;
223  // First strip leading newline characters
224  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
225  // Then strip any remaining leading whitespace
// NOTE(review): trim_left also removes leading '\n', so the trim_left_if above
// appears redundant.
226  boost::algorithm::trim_left(stmt_str);
227 
// Create the query state and STDLOG entry for this statement.
228  auto query_state = create_query_state(session_info_, stmt_str);
229  auto stdlog = STDLOG(query_state);
230 
231  SQLParser parser;
232  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
233  std::string last_parsed;
234  CHECK_EQ(parser.parse(stmt_str, parse_trees, last_parsed), 0) << stmt_str_in;
235  CHECK_EQ(parse_trees.size(), size_t(1));
236  auto stmt = parse_trees.front().get();
237  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
238  CHECK(ddl);
239  ddl->execute(*session_info_);
240 }
241 
// Runs `query_str` and returns its rows.
// Statements accepted by the Calcite path go through runSelectQuery(). When an
// IR file writer is configured and the statement is not DML, runSelectQuery()
// is first called with just_explain=true, and its single string cell is handed
// to the writer. Exceptions from that pre-pass are logged as warnings and do
// not stop execution. All other statements are planned with parsePlan() and
// run through Executor::execute.
// NOTE(review): source lines 246-247, 289 and 297 are absent from this rendering.
242 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
243  const ExecutorDeviceType device_type,
244  const bool hoist_literals,
245  const bool allow_loop_joins) {
248 
249  ParserWrapper pw{query_str};
250  if (pw.isCalcitePathPermissable()) {
251  if (ir_file_writer_ && (pw.getDMLType() == ParserWrapper::DMLType::NotDML)) {
// Best-effort EXPLAIN pre-pass; the result is expected to be exactly one
// non-null string cell.
252  try {
253  const auto result = runSelectQuery(
254  query_str, device_type, hoist_literals, allow_loop_joins, true);
255  const auto crt_row = result.getRows()->getNextRow(true, true);
256  CHECK_EQ(size_t(1), crt_row.size());
257  const auto scalar_ir = boost::get<ScalarTargetValue>(&crt_row[0]);
258  CHECK(scalar_ir);
259  const auto ir_ns = boost::get<NullableString>(scalar_ir);
260  CHECK(ir_ns);
261  const auto ir_str = boost::get<std::string>(ir_ns);
262  CHECK(ir_str);
263  (*ir_file_writer_)(query_str, *ir_str);
264  } catch (const std::exception& e) {
265  LOG(WARNING) << "Failed to run EXPLAIN on SELECT query: " << query_str << " ("
266  << e.what() << "). Proceeding with query execution.";
267  }
268  }
269  const auto execution_result =
270  runSelectQuery(query_str, device_type, hoist_literals, allow_loop_joins);
271 
272  return execution_result.getRows();
273  }
274 
275  auto query_state = create_query_state(session_info_, query_str);
276  auto stdlog = STDLOG(query_state);
277 
278  const auto& cat = session_info_->getCatalog();
279  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
280 
281  auto plan =
282  std::unique_ptr<Planner::RootPlan>(parsePlan(query_state->createQueryStateProxy()));
283 
// The two branches differ in the visible boolean argument: true when
// HAVE_CUDA is defined, false otherwise.
284 #ifdef HAVE_CUDA
285  return executor->execute(plan.get(),
286  *session_info_,
287  hoist_literals,
288  device_type,
290  true,
291  allow_loop_joins);
292 #else
293  return executor->execute(plan.get(),
294  *session_info_,
295  hoist_literals,
296  device_type,
298  false,
299  allow_loop_joins);
300 #endif
301 }
302 
303 std::vector<std::shared_ptr<ResultSet>> QueryRunner::runMultipleStatements(
304  const std::string& sql,
305  const ExecutorDeviceType dt) {
306  std::vector<std::shared_ptr<ResultSet>> results;
307  // TODO: Need to properly handle escaped semicolons instead of doing a naive split().
308  auto fields = split(sql, ";");
309  for (const auto& field : fields) {
310  auto text = strip(field) + ";";
311  if (text == ";") {
312  continue;
313  }
314  // TODO: Maybe remove this redundant parsing after enhancing Parser::Stmt?
315  SQLParser parser;
316  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
317  std::string last_parsed;
318  CHECK_EQ(parser.parse(text, parse_trees, last_parsed), 0);
319  CHECK_EQ(parse_trees.size(), size_t(1));
320  auto stmt = parse_trees.front().get();
321  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
322  Parser::DMLStmt* dml = dynamic_cast<Parser::DMLStmt*>(stmt);
323  if (ddl != nullptr && dml == nullptr) {
324  runDDLStatement(text);
325  results.push_back(nullptr);
326  } else if (ddl == nullptr && dml != nullptr) {
327  results.push_back(runSQL(text, dt, true, true));
328  } else {
329  throw std::runtime_error("Unexpected SQL statement type: " + text);
330  }
331  }
332  return results;
333 }
334 
335 void QueryRunner::runImport(Parser::CopyTableStmt* import_stmt) {
336  CHECK(import_stmt);
337  import_stmt->execute(*session_info_);
338 }
339 
340 std::unique_ptr<Importer_NS::Loader> QueryRunner::getLoader(
341  const TableDescriptor* td) const {
342  auto cat = getCatalog();
343  return std::make_unique<Importer_NS::Loader>(*cat, td);
344 }
345 
// Helpers private to this translation unit.
346 namespace {
347 
// Executes the query held by `query_state_proxy` through Calcite and the
// RelAlgExecutor. If the first execution reports pushed-down filter requests,
// the query is re-planned by Calcite with that filter information and executed
// again, with find_push_down_candidates and just_calcite_explain disabled.
// Otherwise the first result is returned.
// NOTE(review): source line 348 (presumably
// `ExecutionResult run_select_query_with_filter_push_down(`) and lines 360,
// 371, 408-409 and 412 are absent from this rendering.
// NOTE(review): `hoist_literals` is not referenced in the visible lines, and
// the second CompilationOptions field is the literal `true`. If that field is
// the literal-hoisting flag, this parameter is ignored; confirm.
349  QueryStateProxy query_state_proxy,
350  const ExecutorDeviceType device_type,
351  const bool hoist_literals,
352  const bool allow_loop_joins,
353  const bool just_explain,
354  const bool with_filter_push_down) {
355  auto const& query_state = query_state_proxy.getQueryState();
356  const auto& cat = query_state.getConstSessionInfo()->getCatalog();
357  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
358  CompilationOptions co = {
359  device_type, true, ExecutorOptLevel::LoopStrengthReduction, false};
361  true,
362  just_explain,
363  allow_loop_joins,
364  false,
365  false,
366  false,
367  false,
368  10000,
369  with_filter_push_down,
370  false,
// First pass: plan the query with no filter push-down information.
372  auto calcite_mgr = cat.getCalciteMgr();
373  const auto query_ra = calcite_mgr
374  ->process(query_state_proxy,
375  pg_shim(query_state.getQueryStr()),
376  {},
377  true,
378  false,
379  false)
380  .plan_result;
381  auto result =
382  RelAlgExecutor(executor.get(), cat, query_ra).executeRelAlgQuery(co, eo, nullptr);
383  const auto& filter_push_down_requests = result.getPushedDownFilterInfo();
384  if (!filter_push_down_requests.empty()) {
// Copy each request's input_prev/input_start/input_next into a
// TFilterPushDownInfo for the second Calcite pass.
// NOTE(review): this vector could reserve(filter_push_down_requests.size()).
385  std::vector<TFilterPushDownInfo> filter_push_down_info;
386  for (const auto& req : filter_push_down_requests) {
387  TFilterPushDownInfo filter_push_down_info_for_request;
388  filter_push_down_info_for_request.input_prev = req.input_prev;
389  filter_push_down_info_for_request.input_start = req.input_start;
390  filter_push_down_info_for_request.input_next = req.input_next;
391  filter_push_down_info.push_back(filter_push_down_info_for_request);
392  }
393  const auto new_query_ra = calcite_mgr
394  ->process(query_state_proxy,
395  pg_shim(query_state.getQueryStr()),
396  filter_push_down_info,
397  true,
398  false,
399  false)
400  .plan_result;
// Reuse the original options, but turn off candidate discovery for the second
// execution.
401  const ExecutionOptions eo_modified{eo.output_columnar_hint,
402  eo.allow_multifrag,
403  eo.just_explain,
404  eo.allow_loop_joins,
405  eo.with_watchdog,
406  eo.jit_debug,
407  eo.just_validate,
410  /*find_push_down_candidates=*/false,
411  /*just_calcite_explain=*/false,
413  return RelAlgExecutor(executor.get(), cat, new_query_ra)
414  .executeRelAlgQuery(co, eo_modified, nullptr);
415  } else {
416  return result;
417  }
418 }
419 
420 } // namespace
421 
// Executes `query_str` through Calcite and the RelAlgExecutor and returns the
// ExecutionResult. `just_explain` is forwarded in the execution options
// (runSQL() calls this with just_explain=true for its EXPLAIN pre-pass).
// NOTE(review): source lines 427-428, 431, 437, 444 and 455 are absent from
// this rendering. Line 431 presumably opens the condition for the push-down
// branch that closes at line 438 (perhaps `if (g_enable_filter_push_down) {`);
// confirm.
// NOTE(review): `hoist_literals` is only forwarded on the push-down branch. In
// the direct path, the second CompilationOptions field is the literal `true`.
422 ExecutionResult QueryRunner::runSelectQuery(const std::string& query_str,
423  const ExecutorDeviceType device_type,
424  const bool hoist_literals,
425  const bool allow_loop_joins,
426  const bool just_explain) {
429  auto query_state = create_query_state(session_info_, query_str);
430  auto stdlog = STDLOG(query_state);
432  return run_select_query_with_filter_push_down(query_state->createQueryStateProxy(),
433  device_type,
434  hoist_literals,
435  allow_loop_joins,
436  just_explain,
438  }
439 
// Direct path: plan once with Calcite and execute without push-down handling.
440  const auto& cat = session_info_->getCatalog();
441  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
442  CompilationOptions co = {
443  device_type, true, ExecutorOptLevel::LoopStrengthReduction, false};
445  true,
446  just_explain,
447  allow_loop_joins,
448  false,
449  false,
450  false,
451  false,
452  10000,
453  false,
454  false,
456  auto calcite_mgr = cat.getCalciteMgr();
457  const auto query_ra = calcite_mgr
458  ->process(query_state->createQueryStateProxy(),
459  pg_shim(query_str),
460  {},
461  true,
462  false,
463  false)
464  .plan_result;
465  return RelAlgExecutor(executor.get(), cat, query_ra)
466  .executeRelAlgQuery(co, eo, nullptr);
467 }
468 
// Destroys the singleton runner created by init(). Pointers previously
// returned by init() dangle afterwards.
// NOTE(review): source line 471 is absent from this rendering.
469 void QueryRunner::reset() {
470  qr_instance_.reset(nullptr);
472 }
473 
474 } // namespace QueryRunner
Classes used to wrap parser calls for calcite redirection.
static void addUdfs(const std::string &json_func_sigs)
#define CHECK_EQ(x, y)
Definition: Logger.h:201
virtual ExecutionResult runSelectQuery(const std::string &query_str, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain=false)
#define CALCITEPORT
Definition: QueryRunner.cpp:39
ExecutionResult run_select_query_with_filter_push_down(QueryStateProxy query_state_proxy, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain, const bool with_filter_push_down)
ExecutorDeviceType
static std::shared_ptr< Executor > getExecutor(const int db_id, const std::string &debug_dir="", const std::string &debug_file="", const MapDParameters mapd_parameters=MapDParameters())
Definition: Execute.cpp:127
#define LOG(tag)
Definition: Logger.h:188
size_t gpu_buffer_mem_bytes
std::string strip(const std::string &str)
trim any whitespace from the left and right ends of a string
void mapd_signal_handler(int signal_number)
Definition: QueryRunner.cpp:60
static void add(const std::string &json_func_sigs)
std::unique_ptr< IRFileWriter > ir_file_writer_
Definition: QueryRunner.h:191
void set_once_fatal_func(FatalFunc fatal_func)
Definition: Logger.cpp:299
void execute(const Catalog_Namespace::SessionInfo &session) override
#define LOG_IF(severity, condition)
Definition: Logger.h:279
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:157
virtual std::shared_ptr< ResultSet > runSQL(const std::string &query_str, const ExecutorDeviceType device_type, const bool hoist_literals=true, const bool allow_loop_joins=true)
Planner::RootPlan * parsePlan(QueryStateProxy)
Definition: ParserUtils.cpp:69
This file contains the class specification and related data structures for Catalog.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool g_enable_columnar_output
Definition: Execute.cpp:86
static SysCatalog & instance()
Definition: SysCatalog.h:257
const bool allow_multifrag
CHECK(cgen_state)
const bool just_validate
size_t g_leaf_count
Definition: ParserNode.cpp:66
void init(LogOptions const &log_opts)
Definition: Logger.cpp:272
QueryState & getQueryState()
Definition: QueryState.h:172
const std::string OMNISCI_DEFAULT_DB
Definition: SysCatalog.h:59
const bool with_dynamic_watchdog
const double gpu_input_mem_limit_percent
virtual void runDDLStatement(const std::string &)
static std::unique_ptr< QueryRunner > qr_instance_
Definition: QueryRunner.h:186
double g_gpu_mem_limit_percent
Definition: QueryRunner.cpp:44
const bool output_columnar_hint
bool g_serialize_temp_tables
Definition: Catalog.cpp:85
static void set(const std::string &dbName, std::shared_ptr< Catalog > cat)
Definition: Catalog.cpp:3131
void shutdown()
Definition: Logger.cpp:306
std::vector< std::string > split(const std::string &str, const std::string &delim)
split apart a string into a vector of substrings
std::shared_ptr< Catalog_Namespace::SessionInfo > session_info_
Definition: QueryRunner.h:188
void register_signal_handler(int signum, void(*handler)(int))
Definition: MapDServer.cpp:116
std::shared_ptr< Calcite > g_calcite
Definition: QueryRunner.cpp:51
static std::shared_ptr< query_state::QueryState > create_query_state(Ts &&...args)
Definition: QueryRunner.h:164
bool g_enable_filter_push_down
Definition: Execute.cpp:82
const bool allow_loop_joins
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
specifies the content in-memory of a row in the table metadata table
const unsigned dynamic_watchdog_time_limit
static unsigned pass
Definition: testing.h:29
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:77
std::string pg_shim(const std::string &query)
#define STDLOG(...)
Definition: QueryState.h:225
const bool with_watchdog
std::atomic< bool > isSuper
Definition: SysCatalog.h:97