OmniSciDB  471d68cefb
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBEngine.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "DBEngine.h"
#include <boost/filesystem.hpp>
#include <stdexcept>
#include <utility>
22 #include "Parser/ParserWrapper.h"
23 #include "Parser/parser.h"
25 #include "QueryEngine/Execute.h"
30 
31 extern bool g_enable_union;
32 extern bool g_serialize_temp_tables;
33 
34 namespace EmbeddedDatabase {
35 
36 class DBEngineImpl;
37 
41 class CursorImpl : public Cursor {
42  public:
43  CursorImpl(std::shared_ptr<ResultSet> result_set, std::vector<std::string> col_names)
44  : result_set_(result_set), col_names_(col_names) {}
45 
47  col_names_.clear();
48  record_batch_.reset();
49  result_set_.reset();
50  }
51 
52  size_t getColCount() { return result_set_ ? result_set_->colCount() : 0; }
53 
54  size_t getRowCount() { return result_set_ ? result_set_->rowCount() : 0; }
55 
57  if (result_set_) {
58  auto row = result_set_->getNextRow(true, false);
59  return row.empty() ? Row() : Row(row);
60  }
61  return Row();
62  }
63 
64  ColumnType getColType(uint32_t col_num) {
65  if (col_num < getColCount()) {
66  SQLTypeInfo type_info = result_set_->getColType(col_num);
67  return sqlToColumnType(type_info.get_type());
68  }
69  return ColumnType::UNKNOWN;
70  }
71 
72  std::shared_ptr<arrow::RecordBatch> getArrowRecordBatch() {
73  if (record_batch_) {
74  return record_batch_;
75  }
76  auto col_count = getColCount();
77  if (col_count > 0) {
78  auto row_count = getRowCount();
79  if (row_count > 0) {
80  auto converter =
81  std::make_unique<ArrowResultSetConverter>(result_set_, col_names_, -1);
82  record_batch_ = converter->convertToArrow();
83  return record_batch_;
84  }
85  }
86  return nullptr;
87  }
88 
89  private:
90  std::shared_ptr<ResultSet> result_set_;
91  std::vector<std::string> col_names_;
92  std::shared_ptr<arrow::RecordBatch> record_batch_;
93 };
94 
98 class DBEngineImpl : public DBEngine {
99  public:
101 
103 
104  bool init(const std::string& cmd_line) {
105  static bool initialized{false};
106  if (initialized) {
107  throw std::runtime_error("Database engine already initialized");
108  }
109 
111 
112  // Split the command line into parameters
113  std::vector<std::string> parameters;
114  if (!cmd_line.empty()) {
115  parameters = boost::program_options::split_unix(cmd_line);
116  }
117 
118  // Generate command line to initialize CommandLineOptions for DBHandler
119  const char* log_option = "omnisci_dbe";
120  std::vector<const char*> cstrings;
121  cstrings.push_back(log_option);
122  for (auto& param : parameters) {
123  cstrings.push_back(param.c_str());
124  }
125  int argc = cstrings.size();
126  const char** argv = cstrings.data();
127 
128  CommandLineOptions prog_config_opts(log_option);
129  if (prog_config_opts.parse_command_line(argc, argv, false)) {
130  throw std::runtime_error("DBE paramerameters parsing failed");
131  }
132 
133  auto base_path = prog_config_opts.base_path;
134 
135  // Check path to the database
136  bool is_new_db = base_path.empty() || !catalogExists(base_path);
137  if (is_new_db) {
138  base_path = createCatalog(base_path);
139  if (base_path.empty()) {
140  throw std::runtime_error("Database directory could not be created");
141  }
142  }
143  prog_config_opts.base_path = base_path;
144  prog_config_opts.init_logging();
145 
146  prog_config_opts.system_parameters.omnisci_server_port = -1;
147  prog_config_opts.system_parameters.calcite_keepalive = true;
148 
149  try {
150  db_handler_ =
151  std::make_shared<DBHandler>(prog_config_opts.db_leaves,
152  prog_config_opts.string_leaves,
153  prog_config_opts.base_path,
154  prog_config_opts.allow_multifrag,
155  prog_config_opts.jit_debug,
156  prog_config_opts.intel_jit_profile,
157  prog_config_opts.read_only,
158  prog_config_opts.allow_loop_joins,
159  prog_config_opts.enable_rendering,
160  prog_config_opts.renderer_use_vulkan_driver,
161  prog_config_opts.renderer_prefer_igpu,
162  prog_config_opts.enable_auto_clear_render_mem,
163  prog_config_opts.render_oom_retry_threshold,
164  prog_config_opts.render_mem_bytes,
165  prog_config_opts.max_concurrent_render_sessions,
166  prog_config_opts.reserved_gpu_mem,
167  prog_config_opts.render_compositor_use_last_gpu,
168  prog_config_opts.num_reader_threads,
169  prog_config_opts.authMetadata,
170  prog_config_opts.system_parameters,
171  prog_config_opts.enable_legacy_syntax,
172  prog_config_opts.idle_session_duration,
173  prog_config_opts.max_session_duration,
174  prog_config_opts.enable_runtime_udf,
175  prog_config_opts.udf_file_name,
176  prog_config_opts.udf_compiler_path,
177  prog_config_opts.udf_compiler_options,
178 #ifdef ENABLE_GEOS
179  prog_config_opts.libgeos_so_filename,
180 #endif
181  prog_config_opts.disk_cache_config,
182  is_new_db);
183  } catch (const std::exception& e) {
184  LOG(FATAL) << "Failed to initialize database handler: " << e.what();
185  }
186  db_handler_->connect(
188  base_path_ = base_path;
189  initialized = true;
190  return true;
191  }
192 
193  std::shared_ptr<CursorImpl> sql_execute_dbe(const TSessionId& session_id,
194  const std::string& query_str,
195  const bool column_format,
196  const int32_t first_n,
197  const int32_t at_most_n) {
198  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
201  nullptr,
202  nullptr,
203  0,
204  0),
205  {}};
206  db_handler_->sql_execute(
207  result, session_id, query_str, column_format, first_n, at_most_n);
208  auto& targets = result.getTargetsMeta();
209  std::vector<std::string> col_names;
210  for (const auto target : targets) {
211  col_names.push_back(target.get_resname());
212  }
213  return std::make_shared<CursorImpl>(result.getRows(), col_names);
214  }
215 
216  void executeDDL(const std::string& query) {
217  auto res = sql_execute_dbe(session_id_, query, false, -1, -1);
218  }
219 
220  void importArrowTable(const std::string& name,
221  std::shared_ptr<arrow::Table>& table,
222  uint64_t fragment_size) {
223  setArrowTable(name, table);
224  try {
225  auto session = db_handler_->get_session_copy(session_id_);
226  TableDescriptor td;
227  td.tableName = name;
228  td.userId = session.get_currentUser().userId;
229  td.storageType = "ARROW:" + name;
231  td.isView = false;
232  td.fragmenter = nullptr;
234  td.maxFragRows = fragment_size > 0 ? fragment_size : DEFAULT_FRAGMENT_ROWS;
238  td.keyMetainfo = "[]";
239 
240  std::list<ColumnDescriptor> cols;
241  std::vector<Parser::SharedDictionaryDef> dictionaries;
242  auto catalog = session.get_catalog_ptr();
243  // nColumns
244  catalog->createTable(td, cols, dictionaries, false);
246  session.get_currentUser(), td.tableName, TableDBObjectType, *catalog);
247  } catch (...) {
248  releaseArrowTable(name);
249  throw;
250  }
251  releaseArrowTable(name);
252  }
253 
254  std::shared_ptr<CursorImpl> executeDML(const std::string& query) {
255  return sql_execute_dbe(session_id_, query, false, -1, -1);
256  }
257 
258  std::shared_ptr<CursorImpl> executeRA(const std::string& query) {
259  return sql_execute_dbe(session_id_, query, false, -1, -1);
260  }
261 
262  std::vector<std::string> getTables() {
263  std::vector<std::string> table_names;
264  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
265  if (catalog) {
266  const auto tables = catalog->getAllTableMetadata();
267  for (const auto td : tables) {
268  if (td->shard >= 0) {
269  // skip shards, they're not standalone tables
270  continue;
271  }
272  table_names.push_back(td->tableName);
273  }
274  } else {
275  throw std::runtime_error("System catalog uninitialized");
276  }
277  return table_names;
278  }
279 
280  std::vector<ColumnDetails> getTableDetails(const std::string& table_name) {
281  std::vector<ColumnDetails> result;
282  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
283  if (catalog) {
284  auto metadata = catalog->getMetadataForTable(table_name, false);
285  if (metadata) {
286  const auto col_descriptors =
287  catalog->getAllColumnMetadataForTable(metadata->tableId, false, true, false);
288  const auto deleted_cd = catalog->getDeletedColumn(metadata);
289  for (const auto cd : col_descriptors) {
290  if (cd == deleted_cd) {
291  continue;
292  }
293  ColumnDetails col_details;
294  col_details.col_name = cd->columnName;
295  auto ct = cd->columnType;
296  SQLTypes sql_type = ct.get_type();
297  EncodingType sql_enc = ct.get_compression();
298  col_details.col_type = sqlToColumnType(sql_type);
299  col_details.encoding = sqlToColumnEncoding(sql_enc);
300  col_details.nullable = !ct.get_notnull();
301  col_details.is_array = (sql_type == kARRAY);
302  if (IS_GEO(sql_type)) {
303  col_details.precision = static_cast<int>(ct.get_subtype());
304  col_details.scale = ct.get_output_srid();
305  } else {
306  col_details.precision = ct.get_precision();
307  col_details.scale = ct.get_scale();
308  }
309  if (col_details.encoding == ColumnEncoding::DICT) {
310  // have to get the actual size of the encoding from the dictionary
311  // definition
312  const int dict_id = ct.get_comp_param();
313  auto dd = catalog->getMetadataForDict(dict_id, false);
314  if (dd) {
315  col_details.comp_param = dd->dictNBits;
316  } else {
317  throw std::runtime_error("Dictionary definition for column doesn't exist");
318  }
319  } else {
320  col_details.comp_param = ct.get_comp_param();
321  if (ct.is_date_in_days() && col_details.comp_param == 0) {
322  col_details.comp_param = 32;
323  }
324  }
325  result.push_back(col_details);
326  }
327  }
328  }
329  return result;
330  }
331 
332  bool setDatabase(std::string& db_name) {
333  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
334  auto& user = db_handler_->get_session_copy(session_id_).get_currentUser();
335  sys_cat.switchDatabase(db_name, user.userName);
336  return true;
337  }
338 
339  bool login(std::string& db_name, std::string& user_name, const std::string& password) {
340  db_handler_->disconnect(session_id_);
341  db_handler_->connect(session_id_, user_name, password, db_name);
342  return true;
343  }
344 
345  protected:
346  void reset() {
347  if (db_handler_) {
348  db_handler_->disconnect(session_id_);
349  db_handler_->shutdown();
350  }
352  db_handler_.reset();
353 
355  if (is_temp_db_) {
356  boost::filesystem::remove_all(base_path_);
357  }
358  base_path_.clear();
359  }
360 
361  bool catalogExists(const std::string& base_path) {
362  if (!boost::filesystem::exists(base_path)) {
363  return false;
364  }
365  for (auto& subdir : system_folders_) {
366  std::string path = base_path + "/" + subdir;
367  if (!boost::filesystem::exists(path)) {
368  return false;
369  }
370  }
371  return true;
372  }
373 
374  void cleanCatalog(const std::string& base_path) {
375  if (boost::filesystem::exists(base_path)) {
376  for (auto& subdir : system_folders_) {
377  std::string path = base_path + "/" + subdir;
378  if (boost::filesystem::exists(path)) {
379  boost::filesystem::remove_all(path);
380  }
381  }
382  }
383  }
384 
385  std::string createCatalog(const std::string& base_path) {
386  std::string root_dir = base_path;
387  if (base_path.empty()) {
388  boost::system::error_code error;
389  auto tmp_path = boost::filesystem::temp_directory_path(error);
390  if (boost::system::errc::success != error.value()) {
391  std::cerr << error.message() << std::endl;
392  return "";
393  }
394  tmp_path /= "omnidbe_%%%%-%%%%-%%%%";
395  auto uniq_path = boost::filesystem::unique_path(tmp_path, error);
396  if (boost::system::errc::success != error.value()) {
397  std::cerr << error.message() << std::endl;
398  return "";
399  }
400  root_dir = uniq_path.string();
401  is_temp_db_ = true;
402  }
403  if (!boost::filesystem::exists(root_dir)) {
404  if (!boost::filesystem::create_directory(root_dir)) {
405  std::cerr << "Cannot create database directory: " << root_dir << std::endl;
406  return "";
407  }
408  }
409  size_t absent_count = 0;
410  for (auto& sub_dir : system_folders_) {
411  std::string path = root_dir + "/" + sub_dir;
412  if (!boost::filesystem::exists(path)) {
413  if (!boost::filesystem::create_directory(path)) {
414  std::cerr << "Cannot create database subdirectory: " << path << std::endl;
415  return "";
416  }
417  ++absent_count;
418  }
419  }
420  if ((absent_count > 0) && (absent_count < system_folders_.size())) {
421  std::cerr << "Database directory structure is broken: " << root_dir << std::endl;
422  return "";
423  }
424  return root_dir;
425  }
426 
427  private:
428  std::string base_path_;
429  std::string session_id_;
430  std::shared_ptr<DBHandler> db_handler_;
432  std::string udf_filename_;
433 
434  std::vector<std::string> system_folders_ = {"mapd_catalogs",
435  "mapd_data",
436  "mapd_export"};
437 };
438 
439 namespace {
441 }
442 
443 std::shared_ptr<DBEngine> DBEngine::create(const std::string& cmd_line) {
444  const std::lock_guard<std::mutex> lock(engine_create_mutex);
445  auto engine = std::make_shared<DBEngineImpl>();
446  if (!engine->init(cmd_line)) {
447  throw std::runtime_error("DBE initialization failed");
448  }
449  return engine;
450 }
451 
455  return (DBEngineImpl*)ptr;
456 }
457 inline const DBEngineImpl* getImpl(const DBEngine* ptr) {
458  return (const DBEngineImpl*)ptr;
459 }
460 
463 void DBEngine::executeDDL(const std::string& query) {
464  DBEngineImpl* engine = getImpl(this);
465  engine->executeDDL(query);
466 }
467 
468 std::shared_ptr<Cursor> DBEngine::executeDML(const std::string& query) {
469  DBEngineImpl* engine = getImpl(this);
470  return engine->executeDML(query);
471 }
472 
473 std::shared_ptr<Cursor> DBEngine::executeRA(const std::string& query) {
474  DBEngineImpl* engine = getImpl(this);
475  return engine->executeRA(query);
476 }
477 
478 void DBEngine::importArrowTable(const std::string& name,
479  std::shared_ptr<arrow::Table>& table,
480  uint64_t fragment_size) {
481  DBEngineImpl* engine = getImpl(this);
482  engine->importArrowTable(name, table, fragment_size);
483 }
484 
485 std::vector<std::string> DBEngine::getTables() {
486  DBEngineImpl* engine = getImpl(this);
487  return engine->getTables();
488 }
489 
490 std::vector<ColumnDetails> DBEngine::getTableDetails(const std::string& table_name) {
491  DBEngineImpl* engine = getImpl(this);
492  return engine->getTableDetails(table_name);
493 }
494 
495 bool DBEngine::setDatabase(std::string& db_name) {
496  DBEngineImpl* engine = getImpl(this);
497  return engine->setDatabase(db_name);
498 }
499 
500 bool DBEngine::login(std::string& db_name,
501  std::string& user_name,
502  const std::string& password) {
503  DBEngineImpl* engine = getImpl(this);
504  return engine->login(db_name, user_name, password);
505 }
506 
509 inline CursorImpl* getImpl(Cursor* ptr) {
510  return (CursorImpl*)ptr;
511 }
512 
513 inline const CursorImpl* getImpl(const Cursor* ptr) {
514  return (const CursorImpl*)ptr;
515 }
516 
520  CursorImpl* cursor = getImpl(this);
521  return cursor->getColCount();
522 }
523 
525  CursorImpl* cursor = getImpl(this);
526  return cursor->getRowCount();
527 }
528 
530  CursorImpl* cursor = getImpl(this);
531  return cursor->getNextRow();
532 }
533 
534 ColumnType Cursor::getColType(uint32_t col_num) {
535  CursorImpl* cursor = getImpl(this);
536  return cursor->getColType(col_num);
537 }
538 
539 std::shared_ptr<arrow::RecordBatch> Cursor::getArrowRecordBatch() {
540  CursorImpl* cursor = getImpl(this);
541  return cursor->getArrowRecordBatch();
542 }
543 } // namespace EmbeddedDatabase
Classes used to wrap parser calls for calcite redirection.
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:332
std::shared_ptr< DBHandler > db_handler_
Definition: DBEngine.cpp:430
std::shared_ptr< CursorImpl > sql_execute_dbe(const TSessionId &session_id, const std::string &query_str, const bool column_format, const int32_t first_n, const int32_t at_most_n)
Definition: DBEngine.cpp:193
SQLTypes
Definition: sqltypes.h:38
std::string tableName
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size=0)
Definition: DBEngine.cpp:478
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:72
#define LOG(tag)
Definition: Logger.h:203
std::string storageType
string name
Definition: setup.in.py:72
#define DEFAULT_MAX_CHUNK_SIZE
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:495
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:280
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:216
bool init(const std::string &cmd_line)
Definition: DBEngine.cpp:104
bool catalogExists(const std::string &base_path)
Definition: DBEngine.cpp:361
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
void createDBObject(const UserMetadata &user, const std::string &objectName, DBObjectType type, const Catalog_Namespace::Catalog &catalog, int32_t objectId=-1)
void releaseArrowTable(std::string name)
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:64
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:490
ColumnType sqlToColumnType(const SQLTypes &type)
Definition: DBETypes.cpp:111
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:339
EncodingType
Definition: sqltypes.h:233
#define DEFAULT_MAX_ROWS
size_t max_concurrent_render_sessions
static SysCatalog & instance()
Definition: SysCatalog.h:325
std::shared_ptr< Cursor > executeDML(const std::string &query)
Definition: DBEngine.cpp:468
std::shared_ptr< ResultSet > result_set_
Definition: DBEngine.cpp:90
std::vector< LeafHostInfo > db_leaves
const std::string OMNISCI_DEFAULT_DB
Definition: SysCatalog.h:59
CursorImpl(std::shared_ptr< ResultSet > result_set, std::vector< std::string > col_names)
Definition: DBEngine.cpp:43
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size)
Definition: DBEngine.cpp:220
std::shared_ptr< arrow::RecordBatch > record_batch_
Definition: DBEngine.cpp:92
std::shared_ptr< CursorImpl > executeRA(const std::string &query)
Definition: DBEngine.cpp:258
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:539
std::vector< std::string > col_names_
Definition: DBEngine.cpp:91
std::string createCatalog(const std::string &base_path)
Definition: DBEngine.cpp:385
std::string keyMetainfo
AuthMetadata authMetadata
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool g_serialize_temp_tables
Definition: Catalog.cpp:100
#define DEFAULT_PAGE_SIZE
const std::string OMNISCI_ROOT_USER
Definition: SysCatalog.h:60
const std::string OMNISCI_ROOT_PASSWD_DEFAULT
Definition: SysCatalog.h:63
std::vector< std::string > getTables()
Definition: DBEngine.cpp:485
static std::shared_ptr< DBEngine > create(const std::string &cmd_line)
Definition: DBEngine.cpp:443
#define DEFAULT_FRAGMENT_ROWS
void shutdown()
Definition: Logger.cpp:325
Fragmenter_Namespace::FragmenterType fragType
Data_Namespace::MemoryLevel persistenceLevel
std::vector< std::string > udf_compiler_options
ColumnEncoding sqlToColumnEncoding(const EncodingType &type)
Definition: DBETypes.cpp:162
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:500
bool g_enable_watchdog false
Definition: Execute.cpp:76
bool g_enable_union
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:534
DBEngineImpl * getImpl(DBEngine *ptr)
Definition: DBEngine.cpp:454
File_Namespace::DiskCacheConfig disk_cache_config
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:463
#define IS_GEO(T)
Definition: sqltypes.h:251
std::vector< std::string > system_folders_
Definition: DBEngine.cpp:434
std::shared_ptr< CursorImpl > executeDML(const std::string &query)
Definition: DBEngine.cpp:254
std::shared_ptr< Cursor > executeRA(const std::string &query)
Definition: DBEngine.cpp:473
void cleanCatalog(const std::string &base_path)
Definition: DBEngine.cpp:374
SystemParameters system_parameters
std::vector< std::string > getTables()
Definition: DBEngine.cpp:262