OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DBEngine.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "DBEngine.h"

#include <stdexcept>
#include <utility>

#include <boost/filesystem.hpp>

#include "Parser/ParserWrapper.h"
#include "Parser/parser.h"
#include "QueryEngine/Execute.h"
30 
31 extern bool g_enable_union;
32 extern bool g_serialize_temp_tables;
33 
34 namespace EmbeddedDatabase {
35 
36 class DBEngineImpl;
37 
41 class CursorImpl : public Cursor {
42  public:
43  CursorImpl(std::shared_ptr<ResultSet> result_set, std::vector<std::string> col_names)
44  : result_set_(result_set), col_names_(col_names) {}
45 
47  col_names_.clear();
48  record_batch_.reset();
49  result_set_.reset();
50  }
51 
52  size_t getColCount() { return result_set_ ? result_set_->colCount() : 0; }
53 
54  size_t getRowCount() { return result_set_ ? result_set_->rowCount() : 0; }
55 
57  if (result_set_) {
58  auto row = result_set_->getNextRow(true, false);
59  return row.empty() ? Row() : Row(row);
60  }
61  return Row();
62  }
63 
64  ColumnType getColType(uint32_t col_num) {
65  if (col_num < getColCount()) {
66  SQLTypeInfo type_info = result_set_->getColType(col_num);
67  return sqlToColumnType(type_info.get_type());
68  }
69  return ColumnType::UNKNOWN;
70  }
71 
72  std::shared_ptr<arrow::RecordBatch> getArrowRecordBatch() {
73  if (record_batch_) {
74  return record_batch_;
75  }
76  auto col_count = getColCount();
77  if (col_count > 0) {
78  auto row_count = getRowCount();
79  if (row_count > 0) {
80  auto converter =
81  std::make_unique<ArrowResultSetConverter>(result_set_, col_names_, -1);
82  record_batch_ = converter->convertToArrow();
83  return record_batch_;
84  }
85  }
86  return nullptr;
87  }
88 
89  private:
90  std::shared_ptr<ResultSet> result_set_;
91  std::vector<std::string> col_names_;
92  std::shared_ptr<arrow::RecordBatch> record_batch_;
93 };
94 
98 class DBEngineImpl : public DBEngine {
99  public:
101 
103 
104  bool init(const std::string& cmd_line) {
105  static bool initialized{false};
106  if (initialized) {
107  throw std::runtime_error("Database engine already initialized");
108  }
109 
111 
112  // Split the command line into parameters
113  std::vector<std::string> parameters;
114  if (!cmd_line.empty()) {
115  parameters = boost::program_options::split_unix(cmd_line);
116  }
117 
118  // Generate command line to initialize CommandLineOptions for DBHandler
119  const char* log_option = "omnisci_dbe";
120  std::vector<const char*> cstrings;
121  cstrings.push_back(log_option);
122  for (auto& param : parameters) {
123  cstrings.push_back(param.c_str());
124  }
125  int argc = cstrings.size();
126  const char** argv = cstrings.data();
127 
128  CommandLineOptions prog_config_opts(log_option);
129  if (prog_config_opts.parse_command_line(argc, argv, false)) {
130  throw std::runtime_error("DBE paramerameters parsing failed");
131  }
132 
133  auto base_path = prog_config_opts.base_path;
134 
135  // Check path to the database
136  bool is_new_db = base_path.empty() || !catalogExists(base_path);
137  if (is_new_db) {
138  base_path = createCatalog(base_path);
139  if (base_path.empty()) {
140  throw std::runtime_error("Database directory could not be created");
141  }
142  }
143  prog_config_opts.base_path = base_path;
144  prog_config_opts.init_logging();
145 
146  prog_config_opts.system_parameters.omnisci_server_port = -1;
147  prog_config_opts.system_parameters.calcite_keepalive = true;
148 
149  try {
150  db_handler_ =
151  mapd::make_shared<DBHandler>(prog_config_opts.db_leaves,
152  prog_config_opts.string_leaves,
153  prog_config_opts.base_path,
154  prog_config_opts.allow_multifrag,
155  prog_config_opts.jit_debug,
156  prog_config_opts.intel_jit_profile,
157  prog_config_opts.read_only,
158  prog_config_opts.allow_loop_joins,
159  prog_config_opts.enable_rendering,
160  prog_config_opts.renderer_use_vulkan_driver,
161  prog_config_opts.enable_auto_clear_render_mem,
162  prog_config_opts.render_oom_retry_threshold,
163  prog_config_opts.render_mem_bytes,
164  prog_config_opts.max_concurrent_render_sessions,
165  prog_config_opts.reserved_gpu_mem,
166  prog_config_opts.render_compositor_use_last_gpu,
167  prog_config_opts.num_reader_threads,
168  prog_config_opts.authMetadata,
169  prog_config_opts.system_parameters,
170  prog_config_opts.enable_legacy_syntax,
171  prog_config_opts.idle_session_duration,
172  prog_config_opts.max_session_duration,
173  prog_config_opts.enable_runtime_udf,
174  prog_config_opts.udf_file_name,
175  prog_config_opts.udf_compiler_path,
176  prog_config_opts.udf_compiler_options,
177 #ifdef ENABLE_GEOS
178  prog_config_opts.libgeos_so_filename,
179 #endif
180  prog_config_opts.disk_cache_config,
181  is_new_db);
182  } catch (const std::exception& e) {
183  LOG(FATAL) << "Failed to initialize database handler: " << e.what();
184  }
185  db_handler_->connect(
187  base_path_ = base_path;
188  initialized = true;
189  return true;
190  }
191 
192  std::shared_ptr<CursorImpl> sql_execute_dbe(const TSessionId& session_id,
193  const std::string& query_str,
194  const bool column_format,
195  const int32_t first_n,
196  const int32_t at_most_n) {
197  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
200  nullptr,
201  nullptr,
202  0,
203  0),
204  {}};
205  db_handler_->sql_execute(
206  result, session_id, query_str, column_format, first_n, at_most_n);
207  auto& targets = result.getTargetsMeta();
208  std::vector<std::string> col_names;
209  for (const auto target : targets) {
210  col_names.push_back(target.get_resname());
211  }
212  return std::make_shared<CursorImpl>(result.getRows(), col_names);
213  }
214 
215  void executeDDL(const std::string& query) {
216  auto res = sql_execute_dbe(session_id_, query, false, -1, -1);
217  }
218 
219  void importArrowTable(const std::string& name,
220  std::shared_ptr<arrow::Table>& table,
221  uint64_t fragment_size) {
222  setArrowTable(name, table);
223  try {
224  auto session = db_handler_->get_session_copy(session_id_);
225  TableDescriptor td;
226  td.tableName = name;
227  td.userId = session.get_currentUser().userId;
228  td.storageType = "ARROW:" + name;
230  td.isView = false;
231  td.fragmenter = nullptr;
233  td.maxFragRows = fragment_size > 0 ? fragment_size : DEFAULT_FRAGMENT_ROWS;
237  td.keyMetainfo = "[]";
238 
239  std::list<ColumnDescriptor> cols;
240  std::vector<Parser::SharedDictionaryDef> dictionaries;
241  auto catalog = session.get_catalog_ptr();
242  // nColumns
243  catalog->createTable(td, cols, dictionaries, false);
245  session.get_currentUser(), td.tableName, TableDBObjectType, *catalog);
246  } catch (...) {
247  releaseArrowTable(name);
248  throw;
249  }
250  releaseArrowTable(name);
251  }
252 
253  std::shared_ptr<CursorImpl> executeDML(const std::string& query) {
254  return sql_execute_dbe(session_id_, query, false, -1, -1);
255  }
256 
257  std::shared_ptr<CursorImpl> executeRA(const std::string& query) {
258  return sql_execute_dbe(session_id_, query, false, -1, -1);
259  }
260 
261  std::vector<std::string> getTables() {
262  std::vector<std::string> table_names;
263  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
264  if (catalog) {
265  const auto tables = catalog->getAllTableMetadata();
266  for (const auto td : tables) {
267  if (td->shard >= 0) {
268  // skip shards, they're not standalone tables
269  continue;
270  }
271  table_names.push_back(td->tableName);
272  }
273  } else {
274  throw std::runtime_error("System catalog uninitialized");
275  }
276  return table_names;
277  }
278 
279  std::vector<ColumnDetails> getTableDetails(const std::string& table_name) {
280  std::vector<ColumnDetails> result;
281  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
282  if (catalog) {
283  auto metadata = catalog->getMetadataForTable(table_name, false);
284  if (metadata) {
285  const auto col_descriptors =
286  catalog->getAllColumnMetadataForTable(metadata->tableId, false, true, false);
287  const auto deleted_cd = catalog->getDeletedColumn(metadata);
288  for (const auto cd : col_descriptors) {
289  if (cd == deleted_cd) {
290  continue;
291  }
292  ColumnDetails col_details;
293  col_details.col_name = cd->columnName;
294  auto ct = cd->columnType;
295  SQLTypes sql_type = ct.get_type();
296  EncodingType sql_enc = ct.get_compression();
297  col_details.col_type = sqlToColumnType(sql_type);
298  col_details.encoding = sqlToColumnEncoding(sql_enc);
299  col_details.nullable = !ct.get_notnull();
300  col_details.is_array = (sql_type == kARRAY);
301  if (IS_GEO(sql_type)) {
302  col_details.precision = static_cast<int>(ct.get_subtype());
303  col_details.scale = ct.get_output_srid();
304  } else {
305  col_details.precision = ct.get_precision();
306  col_details.scale = ct.get_scale();
307  }
308  if (col_details.encoding == ColumnEncoding::DICT) {
309  // have to get the actual size of the encoding from the dictionary
310  // definition
311  const int dict_id = ct.get_comp_param();
312  auto dd = catalog->getMetadataForDict(dict_id, false);
313  if (dd) {
314  col_details.comp_param = dd->dictNBits;
315  } else {
316  throw std::runtime_error("Dictionary definition for column doesn't exist");
317  }
318  } else {
319  col_details.comp_param = ct.get_comp_param();
320  if (ct.is_date_in_days() && col_details.comp_param == 0) {
321  col_details.comp_param = 32;
322  }
323  }
324  result.push_back(col_details);
325  }
326  }
327  }
328  return result;
329  }
330 
331  bool setDatabase(std::string& db_name) {
332  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
333  auto& user = db_handler_->get_session_copy(session_id_).get_currentUser();
334  sys_cat.switchDatabase(db_name, user.userName);
335  return true;
336  }
337 
338  bool login(std::string& db_name, std::string& user_name, const std::string& password) {
339  db_handler_->disconnect(session_id_);
340  db_handler_->connect(session_id_, user_name, password, db_name);
341  return true;
342  }
343 
344  protected:
345  void reset() {
346  if (db_handler_) {
347  db_handler_->disconnect(session_id_);
348  db_handler_->shutdown();
349  }
351  db_handler_.reset();
352 
354  if (is_temp_db_) {
355  boost::filesystem::remove_all(base_path_);
356  }
357  base_path_.clear();
358  }
359 
360  bool catalogExists(const std::string& base_path) {
361  if (!boost::filesystem::exists(base_path)) {
362  return false;
363  }
364  for (auto& subdir : system_folders_) {
365  std::string path = base_path + "/" + subdir;
366  if (!boost::filesystem::exists(path)) {
367  return false;
368  }
369  }
370  return true;
371  }
372 
373  void cleanCatalog(const std::string& base_path) {
374  if (boost::filesystem::exists(base_path)) {
375  for (auto& subdir : system_folders_) {
376  std::string path = base_path + "/" + subdir;
377  if (boost::filesystem::exists(path)) {
378  boost::filesystem::remove_all(path);
379  }
380  }
381  }
382  }
383 
384  std::string createCatalog(const std::string& base_path) {
385  std::string root_dir = base_path;
386  if (base_path.empty()) {
387  boost::system::error_code error;
388  auto tmp_path = boost::filesystem::temp_directory_path(error);
389  if (boost::system::errc::success != error.value()) {
390  std::cerr << error.message() << std::endl;
391  return "";
392  }
393  tmp_path /= "omnidbe_%%%%-%%%%-%%%%";
394  auto uniq_path = boost::filesystem::unique_path(tmp_path, error);
395  if (boost::system::errc::success != error.value()) {
396  std::cerr << error.message() << std::endl;
397  return "";
398  }
399  root_dir = uniq_path.string();
400  is_temp_db_ = true;
401  }
402  if (!boost::filesystem::exists(root_dir)) {
403  if (!boost::filesystem::create_directory(root_dir)) {
404  std::cerr << "Cannot create database directory: " << root_dir << std::endl;
405  return "";
406  }
407  }
408  size_t absent_count = 0;
409  for (auto& sub_dir : system_folders_) {
410  std::string path = root_dir + "/" + sub_dir;
411  if (!boost::filesystem::exists(path)) {
412  if (!boost::filesystem::create_directory(path)) {
413  std::cerr << "Cannot create database subdirectory: " << path << std::endl;
414  return "";
415  }
416  ++absent_count;
417  }
418  }
419  if ((absent_count > 0) && (absent_count < system_folders_.size())) {
420  std::cerr << "Database directory structure is broken: " << root_dir << std::endl;
421  return "";
422  }
423  return root_dir;
424  }
425 
426  private:
427  std::string base_path_;
428  std::string session_id_;
429  mapd::shared_ptr<DBHandler> db_handler_;
431  std::string udf_filename_;
432 
433  std::vector<std::string> system_folders_ = {"mapd_catalogs",
434  "mapd_data",
435  "mapd_export"};
436 };
437 
438 namespace {
440 }
441 
442 std::shared_ptr<DBEngine> DBEngine::create(const std::string& cmd_line) {
443  const std::lock_guard<std::mutex> lock(engine_create_mutex);
444  auto engine = std::make_shared<DBEngineImpl>();
445  if (!engine->init(cmd_line)) {
446  throw std::runtime_error("DBE initialization failed");
447  }
448  return engine;
449 }
450 
454  return (DBEngineImpl*)ptr;
455 }
456 inline const DBEngineImpl* getImpl(const DBEngine* ptr) {
457  return (const DBEngineImpl*)ptr;
458 }
459 
462 void DBEngine::executeDDL(const std::string& query) {
463  DBEngineImpl* engine = getImpl(this);
464  engine->executeDDL(query);
465 }
466 
467 std::shared_ptr<Cursor> DBEngine::executeDML(const std::string& query) {
468  DBEngineImpl* engine = getImpl(this);
469  return engine->executeDML(query);
470 }
471 
472 std::shared_ptr<Cursor> DBEngine::executeRA(const std::string& query) {
473  DBEngineImpl* engine = getImpl(this);
474  return engine->executeRA(query);
475 }
476 
477 void DBEngine::importArrowTable(const std::string& name,
478  std::shared_ptr<arrow::Table>& table,
479  uint64_t fragment_size) {
480  DBEngineImpl* engine = getImpl(this);
481  engine->importArrowTable(name, table, fragment_size);
482 }
483 
484 std::vector<std::string> DBEngine::getTables() {
485  DBEngineImpl* engine = getImpl(this);
486  return engine->getTables();
487 }
488 
489 std::vector<ColumnDetails> DBEngine::getTableDetails(const std::string& table_name) {
490  DBEngineImpl* engine = getImpl(this);
491  return engine->getTableDetails(table_name);
492 }
493 
494 bool DBEngine::setDatabase(std::string& db_name) {
495  DBEngineImpl* engine = getImpl(this);
496  return engine->setDatabase(db_name);
497 }
498 
499 bool DBEngine::login(std::string& db_name,
500  std::string& user_name,
501  const std::string& password) {
502  DBEngineImpl* engine = getImpl(this);
503  return engine->login(db_name, user_name, password);
504 }
505 
508 inline CursorImpl* getImpl(Cursor* ptr) {
509  return (CursorImpl*)ptr;
510 }
511 
512 inline const CursorImpl* getImpl(const Cursor* ptr) {
513  return (const CursorImpl*)ptr;
514 }
515 
519  CursorImpl* cursor = getImpl(this);
520  return cursor->getColCount();
521 }
522 
524  CursorImpl* cursor = getImpl(this);
525  return cursor->getRowCount();
526 }
527 
529  CursorImpl* cursor = getImpl(this);
530  return cursor->getNextRow();
531 }
532 
533 ColumnType Cursor::getColType(uint32_t col_num) {
534  CursorImpl* cursor = getImpl(this);
535  return cursor->getColType(col_num);
536 }
537 
538 std::shared_ptr<arrow::RecordBatch> Cursor::getArrowRecordBatch() {
539  CursorImpl* cursor = getImpl(this);
540  return cursor->getArrowRecordBatch();
541 }
542 } // namespace EmbeddedDatabase
Classes used to wrap parser calls for calcite redirection.
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:331
std::shared_ptr< CursorImpl > sql_execute_dbe(const TSessionId &session_id, const std::string &query_str, const bool column_format, const int32_t first_n, const int32_t at_most_n)
Definition: DBEngine.cpp:192
SQLTypes
Definition: sqltypes.h:37
std::string tableName
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size=0)
Definition: DBEngine.cpp:477
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:72
#define LOG(tag)
Definition: Logger.h:194
DiskCacheConfig disk_cache_config
std::string storageType
string name
Definition: setup.in.py:72
#define DEFAULT_MAX_CHUNK_SIZE
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:494
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:279
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:215
bool init(const std::string &cmd_line)
Definition: DBEngine.cpp:104
bool catalogExists(const std::string &base_path)
Definition: DBEngine.cpp:360
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
void createDBObject(const UserMetadata &user, const std::string &objectName, DBObjectType type, const Catalog_Namespace::Catalog &catalog, int32_t objectId=-1)
void releaseArrowTable(std::string name)
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:64
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:489
ColumnType sqlToColumnType(const SQLTypes &type)
Definition: DBETypes.cpp:111
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:338
EncodingType
Definition: sqltypes.h:227
#define DEFAULT_MAX_ROWS
size_t max_concurrent_render_sessions
static SysCatalog & instance()
Definition: SysCatalog.h:292
std::shared_ptr< Cursor > executeDML(const std::string &query)
Definition: DBEngine.cpp:467
std::shared_ptr< ResultSet > result_set_
Definition: DBEngine.cpp:90
std::vector< LeafHostInfo > db_leaves
const std::string OMNISCI_DEFAULT_DB
Definition: SysCatalog.h:59
CursorImpl(std::shared_ptr< ResultSet > result_set, std::vector< std::string > col_names)
Definition: DBEngine.cpp:43
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size)
Definition: DBEngine.cpp:219
std::shared_ptr< arrow::RecordBatch > record_batch_
Definition: DBEngine.cpp:92
std::shared_ptr< CursorImpl > executeRA(const std::string &query)
Definition: DBEngine.cpp:257
mapd::shared_ptr< DBHandler > db_handler_
Definition: DBEngine.cpp:429
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:538
std::vector< std::string > col_names_
Definition: DBEngine.cpp:91
std::string createCatalog(const std::string &base_path)
Definition: DBEngine.cpp:384
std::string keyMetainfo
AuthMetadata authMetadata
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool g_serialize_temp_tables
Definition: Catalog.cpp:98
#define DEFAULT_PAGE_SIZE
const std::string OMNISCI_ROOT_USER
Definition: SysCatalog.h:60
const std::string OMNISCI_ROOT_PASSWD_DEFAULT
Definition: SysCatalog.h:63
std::vector< std::string > getTables()
Definition: DBEngine.cpp:484
static std::shared_ptr< DBEngine > create(const std::string &cmd_line)
Definition: DBEngine.cpp:442
#define DEFAULT_FRAGMENT_ROWS
void shutdown()
Definition: Logger.cpp:314
Fragmenter_Namespace::FragmenterType fragType
Data_Namespace::MemoryLevel persistenceLevel
std::vector< std::string > udf_compiler_options
ColumnEncoding sqlToColumnEncoding(const EncodingType &type)
Definition: DBETypes.cpp:162
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:499
bool g_enable_watchdog false
Definition: Execute.cpp:76
bool g_enable_union
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:533
DBEngineImpl * getImpl(DBEngine *ptr)
Definition: DBEngine.cpp:453
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:462
#define IS_GEO(T)
Definition: sqltypes.h:245
std::vector< std::string > system_folders_
Definition: DBEngine.cpp:433
std::shared_ptr< CursorImpl > executeDML(const std::string &query)
Definition: DBEngine.cpp:253
std::shared_ptr< Cursor > executeRA(const std::string &query)
Definition: DBEngine.cpp:472
void cleanCatalog(const std::string &base_path)
Definition: DBEngine.cpp:373
SystemParameters system_parameters
std::vector< std::string > getTables()
Definition: DBEngine.cpp:261