OmniSciDB  04ee39c94c
QueryRunner.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "QueryRunner.h"
18 
19 #include "Calcite/Calcite.h"
20 #include "Catalog/Catalog.h"
21 #include "Parser/ParserWrapper.h"
22 #include "Parser/parser.h"
24 #include "Shared/ConfigResolve.h"
25 #include "Shared/Logger.h"
26 #include "Shared/MapDParameters.h"
27 #include "Shared/StringTransform.h"
28 #include "bcrypt.h"
29 #include "gen-cpp/CalciteServer.h"
30 
33 
34 #include "DistributedLoader.h"
35 
36 #include <boost/filesystem/operations.hpp>
37 #include <csignal>
38 #include <random>
39 
40 #define CALCITEPORT 36279
41 
42 extern size_t g_leaf_count;
43 extern bool g_enable_filter_push_down;
44 
46 
47 using namespace Catalog_Namespace;
48 namespace {
49 
// Process-wide Calcite handle. It starts out null, is assigned by the full
// QueryRunner constructor and is handed out by QueryRunner::getCalcite().
50 std::shared_ptr<Calcite> g_calcite = nullptr;
51 
// NOTE(review): the signature of this helper (original listing line 52) is
// absent from this view. The visible body closes the Calcite server, but only
// when g_calcite has been set.
53  if (g_calcite) {
54  g_calcite->close_calcite_server();
55  }
56 }
57 
// Signal handler: logs the received signal number and then terminates the
// process. SIGTERM exits with EXIT_SUCCESS; every other signal exits with the
// signal number as the process status.
// NOTE(review): original listing lines 60 and 62 are absent from this view, so
// the statements that belong to the two inline comments below are not shown.
// NOTE(review): LOG(...) and std::exit() are not async-signal-safe; calling
// them from a handler installed through std::signal() is formally undefined
// behaviour. Confirm this is acceptable for this runner.
58 void mapd_signal_handler(int signal_number) {
59  LOG(ERROR) << "Interrupt signal (" << signal_number << ") received.";
61  // shut down logging force a flush
63  // terminate program
64  if (signal_number == SIGTERM) {
65  std::exit(EXIT_SUCCESS);
66  } else {
67  std::exit(signal_number);
68  }
69 }
70 
// NOTE(review): the signature of this helper (original listing line 71) is
// absent from this view. The visible body routes SIGTERM, SIGSEGV and SIGABRT
// to mapd_signal_handler.
72  std::signal(SIGTERM, mapd_signal_handler);
73  std::signal(SIGSEGV, mapd_signal_handler);
74  std::signal(SIGABRT, mapd_signal_handler);
75 }
76 
77 } // namespace
78 
79 namespace QueryRunner {
80 
81 std::unique_ptr<QueryRunner> QueryRunner::qr_instance_ = nullptr;
82 
83 QueryRunner* QueryRunner::init(const char* db_path,
84  const std::string& user,
85  const std::string& pass,
86  const std::string& db_name,
87  const std::vector<LeafHostInfo>& string_servers,
88  const std::vector<LeafHostInfo>& leaf_servers,
89  const std::string& udf_filename,
90  bool uses_gpus,
91  const size_t max_gpu_mem,
92  const int reserved_gpu_mem,
93  const bool create_user,
94  const bool create_db) {
95  LOG_IF(FATAL, !leaf_servers.empty()) << "Distributed test runner not supported.";
96  CHECK(leaf_servers.empty());
97  qr_instance_.reset(new QueryRunner(db_path,
98  user,
99  pass,
100  db_name,
101  string_servers,
102  leaf_servers,
103  udf_filename,
104  uses_gpus,
105  max_gpu_mem,
106  reserved_gpu_mem,
107  create_user,
108  create_db));
109  return qr_instance_.get();
110 }
111 
// Full constructor. In order, the visible code:
//   1. CHECKs that `db_path` and `<db_path>/mapd_catalogs/<OMNISCI_DEFAULT_DB>` exist;
//   2. creates the global Calcite handle and registers its extension functions
//      (plus UDFs when `udf_filename` is non-empty);
//   3. builds a DataMgr rooted at `<db_path>/mapd_data`;
//   4. initialises `sys_cat` and optionally creates the user / database;
//   5. CHECKs the credentials and database ownership, registers the catalog and
//      stores a new SessionInfo in `session_info_`.
// NOTE(review): original listing lines 129-130, 132-133 and 150 are absent from
// this view. `user`, `db` and `sys_cat` are used below without a visible
// declaration, so they are presumably declared on those lines -- confirm
// against the real file.
112 QueryRunner::QueryRunner(const char* db_path,
113  const std::string& user_name,
114  const std::string& passwd,
115  const std::string& db_name,
116  const std::vector<LeafHostInfo>& string_servers,
117  const std::vector<LeafHostInfo>& leaf_servers,
118  const std::string& udf_filename,
119  bool uses_gpus,
120  const size_t max_gpu_mem,
121  const int reserved_gpu_mem,
122  const bool create_user,
123  const bool create_db) {
// Both the base directory and the default system database file must already
// exist; this constructor does not create them.
124  boost::filesystem::path base_path{db_path};
125  CHECK(boost::filesystem::exists(base_path));
126  auto system_db_file = base_path / "mapd_catalogs" / OMNISCI_DEFAULT_DB;
127  CHECK(boost::filesystem::exists(system_db_file));
128  auto data_dir = base_path / "mapd_data";
131 
// The Calcite handle is stored in the file-level g_calcite, not in a member.
134  g_calcite = std::make_shared<Calcite>(-1, CALCITEPORT, db_path, 1024, "", udf_filename);
135  ExtensionFunctionsWhitelist::add(g_calcite->getExtensionFunctionWhitelist());
136  if (!udf_filename.empty()) {
137  ExtensionFunctionsWhitelist::addUdfs(g_calcite->getUserDefinedFunctionWhitelist());
138  }
139 
// GPU use is forced off when CudaBuildSelector is PreprocessorFalse, whatever
// the caller requested.
140  if (std::is_same<CudaBuildSelector, PreprocessorFalse>::value) {
141  uses_gpus = false;
142  }
143  MapDParameters mapd_params;
144  mapd_params.gpu_buffer_mem_bytes = max_gpu_mem;
// Aggregator mode is derived purely from whether any leaf servers were given.
145  mapd_params.aggregator = !leaf_servers.empty();
146 
147  auto data_mgr = std::make_shared<Data_Namespace::DataMgr>(
148  data_dir.string(), mapd_params, uses_gpus, -1, 0, reserved_gpu_mem);
149 
151 
152  sys_cat.init(base_path.string(),
153  data_mgr,
154  {},
155  g_calcite,
156  false,
157  mapd_params.aggregator,
158  string_servers);
159 
// The user is created only when requested AND not already known to sys_cat.
160  if (create_user) {
161  if (!sys_cat.getMetadataForUser(user_name, user)) {
162  sys_cat.createUser(user_name, passwd, false, "");
163  }
164  }
// Re-read the user metadata (this also covers the just-created case) and
// require bcrypt_checkpw to return 0 for the supplied password.
// NOTE(review): 0 presumably means "password matches" -- confirm in bcrypt.h.
165  CHECK(sys_cat.getMetadataForUser(user_name, user));
166  CHECK(bcrypt_checkpw(passwd.c_str(), user.passwd_hash.c_str()) == 0);
167 
// Same create-if-missing pattern for the database, owned by the resolved user.
168  if (create_db) {
169  if (!sys_cat.getMetadataForDB(db_name, db)) {
170  sys_cat.createDatabase(db_name, user.userId);
171  }
172  }
173  CHECK(sys_cat.getMetadataForDB(db_name, db));
// Only a superuser or the database owner may open the session.
174  CHECK(user.isSuper || (user.userId == db.dbOwner));
175  auto cat = std::make_shared<Catalog_Namespace::Catalog>(
176  base_path.string(), db, data_mgr, string_servers, g_calcite, create_db);
177  Catalog_Namespace::Catalog::set(cat->getCurrentDB().dbName, cat);
// NOTE(review): the session is always created with ExecutorDeviceType::GPU,
// even when uses_gpus was cleared above -- confirm this is intentional.
178  session_info_ = std::make_unique<Catalog_Namespace::SessionInfo>(
179  cat, user, ExecutorDeviceType::GPU, "");
180 }
181 
182 QueryRunner::QueryRunner(std::unique_ptr<Catalog_Namespace::SessionInfo> session)
183  : session_info_(std::move(session)) {}
184 
185 std::shared_ptr<Catalog_Namespace::Catalog> QueryRunner::getCatalog() const {
186  CHECK(session_info_);
187  return session_info_->get_catalog_ptr();
188 }
189 
190 std::shared_ptr<Calcite> QueryRunner::getCalcite() const {
191  // TODO: Embed Calcite shared_ptr ownership in QueryRunner
192  return g_calcite;
193 }
194 
195 bool QueryRunner::gpusPresent() const {
196  CHECK(session_info_);
197  return session_info_->getCatalog().getDataMgr().gpusPresent();
198 }
199 
// NOTE(review): the body (original listing lines 201-202) is absent from this
// view, so what this method actually does cannot be confirmed here. From the
// name it presumably clears GPU-level memory -- confirm against the real file.
200 void QueryRunner::clearGpuMemory() const {
203 }
204 
// NOTE(review): the body (original listing lines 206-207) is absent from this
// view, so what this method actually does cannot be confirmed here. From the
// name it presumably clears CPU-level memory -- confirm against the real file.
205 void QueryRunner::clearCpuMemory() const {
208 }
209 
// Parses `stmt_str_in` as exactly one statement, requires it to be a DDL
// statement, and executes it against the runner's session.
// CHECK-fails when there is no session, when parsing returns non-zero, when
// the text does not yield exactly one parse tree, or when that tree is not a
// Parser::DDLStmt.
// NOTE(review): original listing line 212 is absent from this view.
210 void QueryRunner::runDDLStatement(const std::string& stmt_str_in) {
211  CHECK(session_info_);
213 
// Work on a copy so the caller's string is untouched and the original text can
// still be reported if parsing fails.
214  auto stmt_str = stmt_str_in;
// Only leading '\n' characters are stripped by this first pass.
215  // First remove special chars
216  boost::algorithm::trim_left_if(stmt_str, boost::algorithm::is_any_of("\n"));
217  // Then remove spaces
218  boost::algorithm::trim_left(stmt_str);
219 
220  SQLParser parser;
221  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
222  std::string last_parsed;
// On parse failure the untrimmed input is streamed into the CHECK message.
223  CHECK_EQ(parser.parse(stmt_str, parse_trees, last_parsed), 0) << stmt_str_in;
224  CHECK_EQ(parse_trees.size(), size_t(1));
// `stmt` is a non-owning view; parse_trees keeps the statement alive.
225  auto stmt = parse_trees.front().get();
226  auto ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
227  CHECK(ddl);
228  ddl->execute(*session_info_);
229 }
230 
// Runs one SQL statement and returns its result set.
// Two paths are visible:
//   * when ParserWrapper::isCalcitePathPermissable() is true, the query goes
//     through runSelectQuery();
//   * otherwise a plan is built with parsePlan() and executed directly on the
//     executor obtained for the current database.
// NOTE(review): original listing lines 236, 274 and 282 are absent from this
// view; 274 and 282 each fall inside one of the executor->execute(...) calls.
231 std::shared_ptr<ResultSet> QueryRunner::runSQL(const std::string& query_str,
232  const ExecutorDeviceType device_type,
233  const bool hoist_literals,
234  const bool allow_loop_joins) {
235  CHECK(session_info_);
237 
238  ParserWrapper pw{query_str};
239  if (pw.isCalcitePathPermissable()) {
// Optional IR capture: only when an IR writer is installed and the statement
// is not DML. The query is first run with just_explain = true, the single
// string cell of the first row is extracted, and it is handed to the writer
// together with the query text.
240  if (ir_file_writer_ && (pw.getDMLType() == ParserWrapper::DMLType::NotDML)) {
241  try {
242  const auto result = runSelectQuery(
243  query_str, device_type, hoist_literals, allow_loop_joins, true);
244  const auto crt_row = result.getRows()->getNextRow(true, true);
245  CHECK_EQ(size_t(1), crt_row.size());
// Unwrap the cell step by step; each boost::get returns null on a type
// mismatch, which the following CHECK turns into a failure.
246  const auto scalar_ir = boost::get<ScalarTargetValue>(&crt_row[0]);
247  CHECK(scalar_ir);
248  const auto ir_ns = boost::get<NullableString>(scalar_ir);
249  CHECK(ir_ns);
250  const auto ir_str = boost::get<std::string>(ir_ns);
251  CHECK(ir_str);
252  (*ir_file_writer_)(query_str, *ir_str);
// Best effort: an exception in the explain step is logged and the real
// execution below still happens.
253  } catch (const std::exception& e) {
254  LOG(WARNING) << "Failed to run EXPLAIN on SELECT query: " << query_str << " ("
255  << e.what() << "). Proceeding with query execution.";
256  }
257  }
258  const auto execution_result =
259  runSelectQuery(query_str, device_type, hoist_literals, allow_loop_joins);
260 
261  return execution_result.getRows();
262  }
263 
// Non-Calcite path: plan the statement and run it on the executor directly.
264  const auto& cat = session_info_->getCatalog();
265  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
266 
// parsePlan() returns a raw pointer; it is wrapped at once so the plan is
// released when this function returns.
267  auto plan = std::unique_ptr<Planner::RootPlan>(parsePlan(query_str));
268 
// In the visible arguments the two branches differ only in one boolean literal
// (true with HAVE_CUDA, false without).
269 #ifdef HAVE_CUDA
270  return executor->execute(plan.get(),
271  *session_info_,
272  hoist_literals,
273  device_type,
275  true,
276  allow_loop_joins);
277 #else
278  return executor->execute(plan.get(),
279  *session_info_,
280  hoist_literals,
281  device_type,
283  false,
284  allow_loop_joins);
285 #endif
286 }
287 
288 std::vector<std::shared_ptr<ResultSet>> QueryRunner::runMultipleStatements(
289  const std::string& sql,
290  const ExecutorDeviceType dt) {
291  std::vector<std::shared_ptr<ResultSet>> results;
292  // TODO: Need to properly handle escaped semicolons instead of doing a naive split().
293  auto fields = split(sql, ";");
294  for (const auto& field : fields) {
295  auto text = strip(field) + ";";
296  if (text == ";") {
297  continue;
298  }
299  // TODO: Maybe remove this redundant parsing after enhancing Parser::Stmt?
300  SQLParser parser;
301  std::list<std::unique_ptr<Parser::Stmt>> parse_trees;
302  std::string last_parsed;
303  CHECK_EQ(parser.parse(text, parse_trees, last_parsed), 0);
304  CHECK_EQ(parse_trees.size(), size_t(1));
305  auto stmt = parse_trees.front().get();
306  Parser::DDLStmt* ddl = dynamic_cast<Parser::DDLStmt*>(stmt);
307  Parser::DMLStmt* dml = dynamic_cast<Parser::DMLStmt*>(stmt);
308  if (ddl != nullptr && dml == nullptr) {
309  runDDLStatement(text);
310  results.push_back(nullptr);
311  } else if (ddl == nullptr && dml != nullptr) {
312  results.push_back(runSQL(text, dt, true, true));
313  } else {
314  throw std::runtime_error("Unexpected SQL statement type: " + text);
315  }
316  }
317  return results;
318 }
319 
320 void QueryRunner::runImport(Parser::CopyTableStmt* import_stmt) {
321  CHECK(import_stmt);
322  import_stmt->execute(*session_info_);
323 }
324 
325 std::unique_ptr<Importer_NS::Loader> QueryRunner::getLoader(
326  const TableDescriptor* td) const {
327  auto cat = getCatalog();
328  return std::make_unique<Importer_NS::Loader>(*cat, td);
329 }
330 
331 namespace {
332 
// NOTE(review): the first line of this function's signature (original listing
// line 333) is absent from this view, as are lines 345, 356, 386-387 and 390.
// The absent lines fall inside the two option-struct initialisers below.
334  const std::string& query_str,
335  const std::unique_ptr<Catalog_Namespace::SessionInfo>& session,
336  const ExecutorDeviceType device_type,
337  const bool hoist_literals,
338  const bool allow_loop_joins,
339  const bool just_explain,
340  const bool with_filter_push_down) {
// Visible flow: translate the query through Calcite, execute it once, and if
// that execution reports filter push-down requests, re-translate with those
// requests attached and execute a second time. Otherwise the first result is
// returned unchanged.
341  const auto& cat = session->getCatalog();
342  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
// NOTE(review): `hoist_literals` is not referenced in any visible line of this
// function; `co` is built from literals apart from `device_type`. Confirm
// whether the parameter is meant to feed into `co`.
343  CompilationOptions co = {
344  device_type, true, ExecutorOptLevel::LoopStrengthReduction, false};
// The values below belong to an initialiser whose opening line (345) is absent
// from this view; it presumably declares the `eo` used further down.
346  true,
347  just_explain,
348  allow_loop_joins,
349  false,
350  false,
351  false,
352  false,
353  10000,
354  with_filter_push_down,
355  false,
// First pass: plan with an empty filter push-down list.
357  auto calcite_mgr = cat.getCalciteMgr();
358  const auto query_ra =
359  calcite_mgr->process(*session, pg_shim(query_str), {}, true, false, false)
360  .plan_result;
361  RelAlgExecutor ra_executor(executor.get(), cat);
362 
363  auto result = ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr);
364  const auto& filter_push_down_requests = result.getPushedDownFilterInfo();
365  if (!filter_push_down_requests.empty()) {
// Copy each request's three input_* fields into the Thrift-side
// TFilterPushDownInfo structure that Calcite's process() takes.
366  std::vector<TFilterPushDownInfo> filter_push_down_info;
367  for (const auto& req : filter_push_down_requests) {
368  TFilterPushDownInfo filter_push_down_info_for_request;
369  filter_push_down_info_for_request.input_prev = req.input_prev;
370  filter_push_down_info_for_request.input_start = req.input_start;
371  filter_push_down_info_for_request.input_next = req.input_next;
372  filter_push_down_info.push_back(filter_push_down_info_for_request);
373  }
// Second pass: same query text, now planned with the collected requests.
374  const auto new_query_ra =
375  calcite_mgr
376  ->process(
377  *session, pg_shim(query_str), filter_push_down_info, true, false, false)
378  .plan_result;
// Options for the re-run copy the listed fields from `eo` and explicitly turn
// off candidate discovery and Calcite-only explain.
379  const ExecutionOptions eo_modified{eo.output_columnar_hint,
380  eo.allow_multifrag,
381  eo.just_explain,
382  eo.allow_loop_joins,
383  eo.with_watchdog,
384  eo.jit_debug,
385  eo.just_validate,
388  /*find_push_down_candidates=*/false,
389  /*just_calcite_explain=*/false,
391  return ra_executor.executeRelAlgQuery(new_query_ra, co, eo_modified, nullptr);
392  } else {
393  return result;
394  }
395 }
396 
397 } // namespace
398 
// Translates `query_str` to relational algebra through the catalog's Calcite
// manager and executes it with RelAlgExecutor, returning the ExecutionResult.
// CHECK-fails when the runner has no session.
// NOTE(review): original listing lines 405-407, 413, 420 and 431 are absent
// from this view.
399 ExecutionResult QueryRunner::runSelectQuery(const std::string& query_str,
400  const ExecutorDeviceType device_type,
401  const bool hoist_literals,
402  const bool allow_loop_joins,
403  const bool just_explain) {
404  CHECK(session_info_);
// The next lines are the tail of a call whose opening lines (405-407) and
// final argument (413) are absent. It presumably delegates to the filter
// push-down helper under some condition -- confirm against the real file.
408  session_info_,
409  device_type,
410  hoist_literals,
411  allow_loop_joins,
412  just_explain,
414  }
415 
// Default path.
// NOTE(review): `hoist_literals` is not referenced below this point; `co` is
// built from literals apart from `device_type`. Confirm whether that is
// intended.
416  const auto& cat = session_info_->getCatalog();
417  auto executor = Executor::getExecutor(cat.getCurrentDB().dbId);
418  CompilationOptions co = {
419  device_type, true, ExecutorOptLevel::LoopStrengthReduction, false};
// The values below belong to an initialiser whose opening line (420) is absent
// from this view; it presumably declares the `eo` used further down.
421  true,
422  just_explain,
423  allow_loop_joins,
424  false,
425  false,
426  false,
427  false,
428  10000,
429  false,
430  false,
// Plan with an empty filter push-down list, then execute without render info.
432  auto calcite_mgr = cat.getCalciteMgr();
433  const auto query_ra =
434  calcite_mgr->process(*session_info_, pg_shim(query_str), {}, true, false, false)
435  .plan_result;
436  RelAlgExecutor ra_executor(executor.get(), cat);
437  return ra_executor.executeRelAlgQuery(query_ra, co, eo, nullptr);
438 }
439 
441  const Catalog_Namespace::SessionInfo* session) {
// NOTE(review): the first line of this function's signature (original listing
// line 440) is absent from this view.
// Returns the session's current-user metadata. `session` is dereferenced
// without a null check, so callers must pass a valid pointer.
442  return session->get_currentUser();
443 }
444 
445 } // namespace QueryRunner
Classes used to wrap parser calls for calcite redirection.
static void addUdfs(const std::string &json_func_sigs)
#define CHECK_EQ(x, y)
Definition: Logger.h:195
#define CALCITEPORT
Definition: QueryRunner.cpp:40
static std::shared_ptr< Executor > getExecutor(const int db_id, const std::string &debug_dir="", const std::string &debug_file="", const MapDParameters mapd_parameters=MapDParameters(), ::QueryRenderer::QueryRenderManager *render_manager=nullptr)
Definition: Execute.cpp:122
ExecutorDeviceType
ExecutionResult run_select_query_with_filter_push_down(const std::string &query_str, const std::unique_ptr< Catalog_Namespace::SessionInfo > &session, const ExecutorDeviceType device_type, const bool hoist_literals, const bool allow_loop_joins, const bool just_explain, const bool with_filter_push_down)
#define LOG(tag)
Definition: Logger.h:182
size_t gpu_buffer_mem_bytes
auto sql(const std::string &sql_stmts)
std::string strip(const std::string &str)
void mapd_signal_handler(int signal_number)
Definition: QueryRunner.cpp:58
static void add(const std::string &json_func_sigs)
const UserMetadata & get_currentUser() const
Definition: SessionInfo.h:93
void set_once_fatal_func(FatalFunc fatal_func)
Definition: Logger.cpp:287
void execute(const Catalog_Namespace::SessionInfo &session) override
#define LOG_IF(severity, condition)
Definition: Logger.h:273
static void clearMemory(const Data_Namespace::MemoryLevel memory_level)
Definition: Execute.cpp:155
This file contains the class specification and related data structures for Catalog.
const rapidjson::Value & field(const rapidjson::Value &obj, const char field[]) noexcept
Definition: JsonAccessors.h:31
bool g_enable_columnar_output
Definition: Execute.cpp:84
static SysCatalog & instance()
Definition: SysCatalog.h:240
const bool allow_multifrag
const bool just_validate
void init(LogOptions const &log_opts)
Definition: Logger.cpp:260
const std::string OMNISCI_DEFAULT_DB
Definition: SysCatalog.h:58
const bool with_dynamic_watchdog
const double gpu_input_mem_limit_percent
size_t g_leaf_count
Definition: ParserNode.cpp:63
bool g_enable_filter_push_down
Definition: Execute.cpp:80
double g_gpu_mem_limit_percent
Definition: QueryRunner.cpp:45
const bool output_columnar_hint
static void set(const std::string &dbName, std::shared_ptr< Catalog > cat)
Definition: Catalog.cpp:2984
void shutdown()
Definition: Logger.cpp:294
Catalog_Namespace::UserMetadata get_user_metadata(const Catalog_Namespace::SessionInfo *session)
std::vector< std::string > split(const std::string &str, const std::string &delim)
void register_signal_handler(int signum, void(*handler)(int))
Definition: MapDServer.cpp:100
std::shared_ptr< Calcite > g_calcite
Definition: QueryRunner.cpp:50
const bool allow_loop_joins
#define CHECK(condition)
Definition: Logger.h:187
ExecutionResult executeRelAlgQuery(const std::string &query_ra, const CompilationOptions &co, const ExecutionOptions &eo, RenderInfo *render_info)
specifies the content in-memory of a row in the table metadata table
const unsigned dynamic_watchdog_time_limit
static unsigned pass
Definition: testing.h:29
TSessionId session
std::string pg_shim(const std::string &query)
const bool with_watchdog