OmniSciDB  1dac507f6e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
QueryRunner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryRunner.h"
18 
19 #include "Calcite/Calcite.h"
20 #include "Catalog/Catalog.h"
21 #include "DistributedLoader.h"
22 #include "Parser/ParserWrapper.h"
23 #include "Parser/parser.h"
28 #include "Shared/ConfigResolve.h"
29 #include "Shared/Logger.h"
30 #include "Shared/MapDParameters.h"
31 #include "Shared/StringTransform.h"
32 #include "bcrypt.h"
33 #include "gen-cpp/CalciteServer.h"
34 
35 #include <boost/filesystem/operations.hpp>
36 #include <csignal>
37 #include <random>
38 
39 #define CALCITEPORT 3279
40 
41 extern size_t g_leaf_count;
42 extern bool g_enable_filter_push_down;
43 
45 
46 using namespace Catalog_Namespace;
47 namespace {
48 
// File-local Calcite handle. It is assigned in the QueryRunner constructor and
// handed out by QueryRunner::getCalcite().
49 std::shared_ptr<Calcite> g_calcite = nullptr;
50 
// NOTE(review): the header line of this function (embedded line 51) is absent
// from this extract, so only the body is documented here.
// If a Calcite handle is held, call close_calcite_server() on it and then drop
// the file-local shared_ptr; when no handle is held nothing happens.
52  if (g_calcite) {
53  g_calcite->close_calcite_server();
54  g_calcite.reset();
55  }
56 }
57 
// Signal handler: logs the received signal number at ERROR severity and then
// ends the process with std::exit().
// The exit status is EXIT_SUCCESS for SIGTERM and the raw signal number for
// every other signal.
// NOTE(review): embedded lines 60 and 62 are absent from this extract, so the
// statements that belonged to the step comments below are not visible here.
// NOTE(review): logging and std::exit() are not among the operations the C++
// standard allows inside a signal handler — confirm this is acceptable here.
58 void mapd_signal_handler(int signal_number) {
59  LOG(ERROR) << "Interrupt signal (" << signal_number << ") received.";
61  // shut down logging to force a flush
63  // terminate program
64  if (signal_number == SIGTERM) {
65  std::exit(EXIT_SUCCESS);
66  } else {
67  std::exit(signal_number);
68  }
69 }
70 
// NOTE(review): the header line of this function (embedded line 71) is absent
// from this extract, so only the body is documented here.
// Installs mapd_signal_handler for SIGTERM, SIGSEGV and SIGABRT.
72  std::signal(SIGTERM, mapd_signal_handler);
73  std::signal(SIGSEGV, mapd_signal_handler);
74  std::signal(SIGABRT, mapd_signal_handler);
75 }
76 
77 } // namespace
78 
79 namespace QueryRunner {
80 
// Owner of the runner created by QueryRunner::init(); released again by
// QueryRunner::reset().
81 std::unique_ptr<QueryRunner> QueryRunner::qr_instance_ = nullptr;
82 
// Definition of the static query_states_ member.
83 query_state::QueryStates QueryRunner::query_states_;
84 
85 QueryRunner* QueryRunner::init(const char* db_path,
86  const std::string& user,
87  const std::string& pass,
88  const std::string& db_name,
89  const std::vector<LeafHostInfo>& string_servers,
90  const std::vector<LeafHostInfo>& leaf_servers,
91  const std::string& udf_filename,
92  bool uses_gpus,
93  const size_t max_gpu_mem,
94  const int reserved_gpu_mem,
95  const bool create_user,
96  const bool create_db) {
97  LOG_IF(FATAL, !leaf_servers.empty()) << "Distributed test runner not supported.";
98  CHECK(leaf_servers.empty());
99  qr_instance_.reset(new QueryRunner(db_path,
100  user,
101  pass,
102  db_name,
103  string_servers,
104  leaf_servers,
105  udf_filename,
106  uses_gpus,
107  max_gpu_mem,
108  reserved_gpu_mem,
109  create_user,
110  create_db));
111  return qr_instance_.get();
112 }
113 
// Builds a QueryRunner on top of an existing data directory.
//
// Steps, in order:
//   1. CHECK that db_path and the system catalog file under "mapd_catalogs" exist.
//   2. Create the Calcite handle (g_calcite) and register its extension-function
//      whitelist, plus the UDF whitelist when udf_filename is non-empty.
//   3. Create the DataMgr over "<db_path>/mapd_data".
//   4. Initialise the SysCatalog singleton.
//   5. Optionally create the user and the database if they do not exist yet.
//   6. Open the catalog, register it under its database name, and create
//      session_info_.
//
// CHECK failures: a missing path, an unknown user, bcrypt_checkpw() returning
// non-zero for passwd against the stored hash, an unknown database, or a user
// who is neither a superuser nor the owner of the database.
//
// NOTE(review): embedded lines 131-135 and 142 are absent from this extract, and
// the `user` / `db` objects used below have no visible declaration, so some
// statements of the original are not shown here.
114 QueryRunner::QueryRunner(const char* db_path,
115  const std::string& user_name,
116  const std::string& passwd,
117  const std::string& db_name,
118  const std::vector<LeafHostInfo>& string_servers,
119  const std::vector<LeafHostInfo>& leaf_servers,
120  const std::string& udf_filename,
121  bool uses_gpus,
122  const size_t max_gpu_mem,
123  const int reserved_gpu_mem,
124  const bool create_user,
125  const bool create_db) {
126  boost::filesystem::path base_path{db_path};
127  CHECK(boost::filesystem::exists(base_path));
128  auto system_db_file = base_path / "mapd_catalogs" / OMNISCI_DEFAULT_DB;
129  CHECK(boost::filesystem::exists(system_db_file));
130  auto data_dir = base_path / "mapd_data";
133 
136  g_calcite = std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024, udf_filename);
137  ExtensionFunctionsWhitelist::add(g_calcite->getExtensionFunctionWhitelist());
138  if (!udf_filename.empty()) {
139  ExtensionFunctionsWhitelist::addUdfs(g_calcite->getUserDefinedFunctionWhitelist());
140  }
141 
143 
// When CudaBuildSelector is PreprocessorFalse the caller's GPU request is
// overridden (presumably a build without CUDA — TODO confirm).
144  if (std::is_same<CudaBuildSelector, PreprocessorFalse>::value) {
145  uses_gpus = false;
146  }
147  MapDParameters mapd_params;
148  mapd_params.gpu_buffer_mem_bytes = max_gpu_mem;
// The aggregator flag is derived from leaf_servers being non-empty.
149  mapd_params.aggregator = !leaf_servers.empty();
150 
151  auto data_mgr = std::make_shared<Data_Namespace::DataMgr>(
152  data_dir.string(), mapd_params, uses_gpus, -1, 0, reserved_gpu_mem);
153 
154  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
155 
156  sys_cat.init(base_path.string(),
157  data_mgr,
158  {},
159  g_calcite,
160  false,
161  mapd_params.aggregator,
162  string_servers);
163 
// Create the user only when asked to and only if it is not already known.
164  if (create_user) {
165  if (!sys_cat.getMetadataForUser(user_name, user)) {
166  sys_cat.createUser(user_name, passwd, false, "");
167  }
168  }
169  CHECK(sys_cat.getMetadataForUser(user_name, user));
170  CHECK(bcrypt_checkpw(passwd.c_str(), user.passwd_hash.c_str()) == 0);
171 
// Same pattern for the database; a newly created one is owned by `user`.
172  if (create_db) {
173  if (!sys_cat.getMetadataForDB(db_name, db)) {
174  sys_cat.createDatabase(db_name, user.userId);
175  }
176  }
177  CHECK(sys_cat.getMetadataForDB(db_name, db));
178  CHECK(user.isSuper || (user.userId == db.dbOwner));
179  auto cat = std::make_shared<Catalog_Namespace::Catalog>(
180  base_path.string(), db, data_mgr, string_servers, g_calcite, create_db);
181  Catalog_Namespace::Catalog::set(cat->getCurrentDB().dbName, cat);
// NOTE(review): the session is created with ExecutorDeviceType::GPU even when
// uses_gpus is false — confirm this is intended.
182  session_info_ = std::make_unique<Catalog_Namespace::SessionInfo>(
183  cat, user, ExecutorDeviceType::GPU, "");
184 }
185 
186 QueryRunner::QueryRunner(std::unique_ptr<Catalog_Namespace::SessionInfo> session)
187  : session_info_(std::move(session)) {}
188 
// Returns the catalog pointer held by the current session.
// NOTE(review): embedded line 190 is absent from this extract; as shown,
// session_info_ is dereferenced without a visible null check.
189 std::shared_ptr<Catalog_Namespace::Catalog> QueryRunner::getCatalog() const {
191  return session_info_->get_catalog_ptr();
192 }
193 
// Returns a shared_ptr copy of the file-local g_calcite handle. The handle is
// not a member of QueryRunner, so the value does not depend on which instance
// is asked.
194 std::shared_ptr<Calcite> QueryRunner::getCalcite() const {
195  // TODO: Embed Calcite shared_ptr ownership in QueryRunner
196  return g_calcite;
197 }
198 
// Reports whether the DataMgr behind the session's catalog says GPUs are present.
// NOTE(review): embedded line 200 is absent from this extract; as shown,
// session_info_ is dereferenced without a visible null check.
199 bool QueryRunner::gpusPresent() const {
201  return session_info_->getCatalog().getDataMgr().gpusPresent();
202 }
203 
// NOTE(review): the body (embedded lines 205-206) is absent from this extract,
// so the function has no visible statements. Presumably it forwards to
// Executor::clearMemory for the GPU level — TODO confirm against the full file.
204 void QueryRunner::clearGpuMemory() const {
207 }
208 
// NOTE(review): the body (embedded lines 210-211) is absent from this extract,
// so the function has no visible statements. Presumably it forwards to
// Executor::clearMemory for the CPU level — TODO confirm against the full file.
209 void QueryRunner::clearCpuMemory() const {
212 }
213 
// Parses stmt_str_in and executes it as a DDL statement in the current session.
//
// Leading newlines and then leading whitespace are trimmed from a copy of the
// input before it is used for the query state and handed to the parser.
// CHECK failures: the parser returns non-zero (the original, untrimmed text is
// attached to that failure), the parse yields anything other than exactly one
// statement, or the statement is not a Parser::DDLStmt.
// NOTE(review): embedded lines 215-216 are absent from this extract.
214 void QueryRunner::runDDLStatement(const std::string& stmt_str_in) {
217 
218  auto stmt_str = stmt_str_in;
219  // Strip leading newline characters first
220  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
221  // Then strip any remaining leading whitespace
222  boost::algorithm::trim_left(stmt_str);
223 
224  auto query_state = create_query_state(session_info_, stmt_str);
225  auto stdlog = STDLOG(query_state);
226 
227  SQLParser parser;
228  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
229  std::string last_parsed;
230  CHECK_EQ(parser.parse(stmt_str, parse_trees, last_parsed), 0) << stmt_str_in;
231  CHECK_EQ(parse_trees.size(), size_t(1));
// parse_trees keeps ownership; stmt and ddl are non-owning views into it.
232  auto stmt = parse_trees.front().get();
233  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
234  CHECK(ddl);
235  ddl->execute(*session_info_);
236 }
237 
// Runs one SQL statement and returns its result rows.
//
// Two paths:
//   * If ParserWrapper reports the Calcite path as permissible, the statement
//     goes through runSelectQuery(). When ir_file_writer_ is set and the
//     statement's DML type is NotDML, the query is first run with
//     just_explain=true and the single string it returns is passed to
//     ir_file_writer_ together with the query text. A std::exception from that
//     extra run is logged at WARNING severity and otherwise ignored.
//   * Otherwise the statement is planned with parsePlan() and executed through
//     Executor::execute().
// NOTE(review): embedded lines 242-243, 285 and 293 are absent from this extract.
238 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
239  const ExecutorDeviceType device_type,
240  const bool hoist_literals,
241  const bool allow_loop_joins) {
244 
245  ParserWrapper pw{query_str};
246  if (pw.isCalcitePathPermissable()) {
247  if (ir_file_writer_ && (pw.getDMLType() == ParserWrapper::DMLType::NotDML)) {
248  try {
// Final argument `true` is just_explain: only the explain output is wanted here.
249  const auto result = runSelectQuery(
250  query_str, device_type, hoist_literals, allow_loop_joins, true);
251  const auto crt_row = result.getRows()->getNextRow(true, true);
252  CHECK_EQ(size_t(1), crt_row.size());
// Unwrap the single column step by step down to the std::string it holds.
253  const auto scalar_ir = boost::get<ScalarTargetValue>(&crt_row[0]);
254  CHECK(scalar_ir);
255  const auto ir_ns = boost::get<NullableString>(scalar_ir);
256  CHECK(ir_ns);
257  const auto ir_str = boost::get<std::string>(ir_ns);
258  CHECK(ir_str);
259  (*ir_file_writer_)(query_str, *ir_str);
260  } catch (const std::exception& e) {
261  LOG(WARNING) << "Failed to run EXPLAIN on SELECT query: " << query_str << " ("
262  << e.what() << "). Proceeding with query execution.";
263  }
264  }
// The real execution happens regardless of how the explain run above went.
265  const auto execution_result =
266  runSelectQuery(query_str, device_type, hoist_literals, allow_loop_joins);
267 
268  return execution_result.getRows();
269  }
270 
271  auto query_state = create_query_state(session_info_, query_str);
272  auto stdlog = STDLOG(query_state);
273 
274  const auto& cat = session_info_->getCatalog();
275  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
276 
// parsePlan() returns a raw pointer; wrap it so the plan is released on return.
277  auto plan =
278  std::unique_ptr<Planner::RootPlan>(parsePlan(query_state->createQueryStateProxy()));
279 
// The two branches differ only in one boolean argument (true with HAVE_CUDA,
// false without). NOTE(review): one argument line of each call (embedded 285 /
// 293) is missing from this extract.
280 #ifdef HAVE_CUDA
281  return executor->execute(plan.get(),
282  *session_info_,
283  hoist_literals,
284  device_type,
286  true,
287  allow_loop_joins);
288 #else
289  return executor->execute(plan.get(),
290  *session_info_,
291  hoist_literals,
292  device_type,
294  false,
295  allow_loop_joins);
296 #endif
297 }
298 
299 std::vector<std::shared_ptr<ResultSet>> QueryRunner::runMultipleStatements(
300  const std::string& sql,
301  const ExecutorDeviceType dt) {
302  std::vector<std::shared_ptr<ResultSet>> results;
303  // TODO: Need to properly handle escaped semicolons instead of doing a naive split().
304  auto fields = split(sql, ";");
305  for (const auto& field : fields) {
306  auto text = strip(field) + ";";
307  if (text == ";") {
308  continue;
309  }
310  // TODO: Maybe remove this redundant parsing after enhancing Parser::Stmt?
311  SQLParser parser;
312  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
313  std::string last_parsed;
314  CHECK_EQ(parser.parse(text, parse_trees, last_parsed), 0);
315  CHECK_EQ(parse_trees.size(), size_t(1));
316  auto stmt = parse_trees.front().get();
317  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
318  Parser::DMLStmt* dml = dynamic_cast<Parser::DMLStmt*>(stmt);
319  if (ddl != nullptr && dml == nullptr) {
320  runDDLStatement(text);
321  results.push_back(nullptr);
322  } else if (ddl == nullptr && dml != nullptr) {
323  results.push_back(runSQL(text, dt, true, true));
324  } else {
325  throw std::runtime_error("Unexpected SQL statement type: " + text);
326  }
327  }
328  return results;
329 }
330 
331 void QueryRunner::runImport(Parser::CopyTableStmt* import_stmt) {
332  CHECK(import_stmt);
333  import_stmt->execute(*session_info_);
334 }
335 
336 std::unique_ptr<Importer_NS::Loader> QueryRunner::getLoader(
337  const TableDescriptor* td) const {
338  auto cat = getCatalog();
339  return std::make_unique<Importer_NS::Loader>(*cat, td);
340 }
341 
342 namespace {
343 
// NOTE(review): the line carrying the return type and function name (embedded
// line 344) is absent from this extract; the cross-reference listing at the end
// of the page names it run_select_query_with_filter_push_down, returning
// ExecutionResult. Embedded lines 356, 367, 405-406 and 409 are missing as well,
// so the ExecutionOptions initialisers below are incomplete.
//
// Visible behaviour: the query text is planned through the catalog's Calcite
// manager and executed once. If that result reports pushed-down filter
// requests, each request's input_prev / input_start / input_next is copied into
// a TFilterPushDownInfo, the query is planned again with that list, and it is
// executed a second time with find_push_down_candidates set to false; the second
// result is returned. Otherwise the first result is returned.
//
// NOTE(review): hoist_literals is not referenced in the visible lines; the
// second CompilationOptions field is the literal `true`.
345  QueryStateProxy query_state_proxy,
346  const ExecutorDeviceType device_type,
347  const bool hoist_literals,
348  const bool allow_loop_joins,
349  const bool just_explain,
350  const bool with_filter_push_down) {
351  auto const& query_state = query_state_proxy.getQueryState();
352  const auto& cat = query_state.getConstSessionInfo()->getCatalog();
353  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
354  CompilationOptions co = {
355  device_type, true, ExecutorOptLevel::LoopStrengthReduction, false};
// Remaining initialisers of the ExecutionOptions object `eo`; its opening line
// is missing from this extract.
357  true,
358  just_explain,
359  allow_loop_joins,
360  false,
361  false,
362  false,
363  false,
364  10000,
365  with_filter_push_down,
366  false,
368  auto calcite_mgr = cat.getCalciteMgr();
// First planning pass: no filter push-down info is supplied ({}).
369  const auto query_ra = calcite_mgr
370  ->process(query_state_proxy,
371  pg_shim(query_state.getQueryStr()),
372  {},
373  true,
374  false,
375  false)
376  .plan_result;
377  RelAlgExecutor ra_executor(executor.get(), cat);
378 
379  auto result = ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr);
380  const auto& filter_push_down_requests = result.getPushedDownFilterInfo();
381  if (!filter_push_down_requests.empty()) {
// Convert each request into the TFilterPushDownInfo form that process() takes.
382  std::vector<TFilterPushDownInfo> filter_push_down_info;
383  for (const auto& req : filter_push_down_requests) {
384  TFilterPushDownInfo filter_push_down_info_for_request;
385  filter_push_down_info_for_request.input_prev = req.input_prev;
386  filter_push_down_info_for_request.input_start = req.input_start;
387  filter_push_down_info_for_request.input_next = req.input_next;
388  filter_push_down_info.push_back(filter_push_down_info_for_request);
389  }
// Second planning pass, this time with the collected push-down info.
390  const auto new_query_ra = calcite_mgr
391  ->process(query_state_proxy,
392  pg_shim(query_state.getQueryStr()),
393  filter_push_down_info,
394  true,
395  false,
396  false)
397  .plan_result;
// Same options as `eo` for the fields shown, with candidate search switched off.
398  const ExecutionOptions eo_modified{eo.output_columnar_hint,
399  eo.allow_multifrag,
400  eo.just_explain,
401  eo.allow_loop_joins,
402  eo.with_watchdog,
403  eo.jit_debug,
404  eo.just_validate,
407  /*find_push_down_candidates=*/false,
408  /*just_calcite_explain=*/false,
410  return ra_executor.executeRelAlgQuery(new_query_ra, co, eo_modified, nullptr);
411  } else {
412  return result;
413  }
414 }
415 
416 } // namespace
417 
// Plans query_str through the catalog's Calcite manager and executes the
// resulting relational-algebra plan with a RelAlgExecutor.
//
// An early return delegates to run_select_query_with_filter_push_down(). The
// line opening that conditional (embedded 427) is missing from this extract;
// presumably it tests g_enable_filter_push_down — TODO confirm.
// NOTE(review): embedded lines 423-424, 433, 440 and 451 are also absent, so the
// delegating call and the ExecutionOptions initialiser are incomplete as shown.
// NOTE(review): on the non-delegating path hoist_literals is not referenced in
// the visible lines; the second CompilationOptions field is the literal `true`.
418 ExecutionResult QueryRunner::runSelectQuery(const std::string& query_str,
419  const ExecutorDeviceType device_type,
420  const bool hoist_literals,
421  const bool allow_loop_joins,
422  const bool just_explain) {
425  auto query_state = create_query_state(session_info_, query_str);
426  auto stdlog = STDLOG(query_state);
428  return run_select_query_with_filter_push_down(query_state->createQueryStateProxy(),
429  device_type,
430  hoist_literals,
431  allow_loop_joins,
432  just_explain,
434  }
435 
436  const auto& cat = session_info_->getCatalog();
437  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
438  CompilationOptions co = {
439  device_type, true, ExecutorOptLevel::LoopStrengthReduction, false};
// Remaining initialisers of the ExecutionOptions object `eo`; its opening line
// is missing from this extract.
441  true,
442  just_explain,
443  allow_loop_joins,
444  false,
445  false,
446  false,
447  false,
448  10000,
449  false,
450  false,
452  auto calcite_mgr = cat.getCalciteMgr();
// Plan without any filter push-down info ({}).
453  const auto query_ra = calcite_mgr
454  ->process(query_state->createQueryStateProxy(),
455  pg_shim(query_str),
456  {},
457  true,
458  false,
459  false)
460  .plan_result;
461  RelAlgExecutor ra_executor(executor.get(), cat);
462  return ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr);
463 }
464 
// Destroys the runner held in qr_instance_ (if any) and leaves the pointer null.
// NOTE(review): embedded line 467 is absent from this extract, so a further
// statement of the original is not visible here.
465 void QueryRunner::reset() {
466  qr_instance_.reset(nullptr);
468 }
469 
470 } // namespace QueryRunner
Classes used to wrap parser calls for calcite redirection.
static void addUdfs(const std::string &json_func_sigs)
#define CHECK_EQ(x, y)
Definition: Logger.h:198
virtual ExecutionResult runSelectQuery(const std::string &query_str, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain=false)
#define CALCITEPORT
Definition: QueryRunner.cpp:39
ExecutionResult run_select_query_with_filter_push_down(QueryStateProxy query_state_proxy, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain, const bool with_filter_push_down)
static std::shared_ptr< Executor > getExecutor(const int db_id, const std::string &debug_dir="", const std::string &debug_file="", const MapDParameters mapd_parameters=MapDParameters(),::QueryRenderer::QueryRenderManager *render_manager=nullptr)
Definition: Execute.cpp:127
ExecutorDeviceType
#define LOG(tag)
Definition: Logger.h:185
size_t gpu_buffer_mem_bytes
std::string strip(const std::string &str)
trim any whitespace from the left and right ends of a string
void mapd_signal_handler(int signal_number)
Definition: QueryRunner.cpp:58
static void add(const std::string &json_func_sigs)
std::unique_ptr< IRFileWriter > ir_file_writer_
Definition: QueryRunner.h:191
void set_once_fatal_func(FatalFunc fatal_func)
Definition: Logger.cpp:292
void execute(const Catalog_Namespace::SessionInfo &session) override
#define LOG_IF(severity, condition)
Definition: Logger.h:276
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:160
virtual std::shared_ptr< ResultSet > runSQL(const std::string &query_str, const ExecutorDeviceType device_type, const bool hoist_literals=true, const bool allow_loop_joins=true)
Planner::RootPlan * parsePlan(QueryStateProxy)
Definition: ParserUtils.cpp:69
This file contains the class specification and related data structures for Catalog.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool g_enable_columnar_output
Definition: Execute.cpp:86
static SysCatalog & instance()
Definition: SysCatalog.h:242
const bool allow_multifrag
CHECK(cgen_state)
const bool just_validate
size_t g_leaf_count
Definition: ParserNode.cpp:66
void init(LogOptions const &log_opts)
Definition: Logger.cpp:265
QueryState & getQueryState()
Definition: QueryState.h:172
const std::string OMNISCI_DEFAULT_DB
Definition: SysCatalog.h:58
const bool with_dynamic_watchdog
const double gpu_input_mem_limit_percent
virtual void runDDLStatement(const std::string &)
static std::unique_ptr< QueryRunner > qr_instance_
Definition: QueryRunner.h:186
double g_gpu_mem_limit_percent
Definition: QueryRunner.cpp:44
const bool output_columnar_hint
static void set(const std::string &dbName, std::shared_ptr< Catalog > cat)
Definition: Catalog.cpp:2986
void shutdown()
Definition: Logger.cpp:299
std::vector< std::string > split(const std::string &str, const std::string &delim)
split apart a string into a vector of substrings
std::shared_ptr< Catalog_Namespace::SessionInfo > session_info_
Definition: QueryRunner.h:188
void register_signal_handler(int signum, void(*handler)(int))
Definition: MapDServer.cpp:116
std::shared_ptr< Calcite > g_calcite
Definition: QueryRunner.cpp:49
static std::shared_ptr< query_state::QueryState > create_query_state(Ts &&...args)
Definition: QueryRunner.h:164
bool g_enable_filter_push_down
Definition: Execute.cpp:82
const bool allow_loop_joins
std::shared_ptr< Catalog_Namespace::Catalog > getCatalog() const
ExecutionResult executeRelAlgQuery(const std::string &query_ra, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
specifies the content in-memory of a row in the table metadata table
const unsigned dynamic_watchdog_time_limit
static unsigned pass
Definition: testing.h:29
std::shared_ptr< Catalog_Namespace::SessionInfo const > getConstSessionInfo() const
Definition: QueryState.cpp:75
std::string pg_shim(const std::string &query)
#define STDLOG(...)
Definition: QueryState.h:225
const bool with_watchdog
std::atomic< bool > isSuper
Definition: SysCatalog.h:85