OmniSciDB  a987f07e93
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBEngine.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "DBEngine.h"
18 #include <boost/filesystem.hpp>
19 #include <stdexcept>
22 #include "LockMgr/LockMgr.h"
23 #include "Parser/ParserWrapper.h"
25 #include "QueryEngine/Execute.h"
28 #include "Shared/SysDefinitions.h"
31 
32 extern bool g_enable_thrift_logs;
33 extern bool g_enable_union;
34 extern bool g_serialize_temp_tables;
35 
36 namespace EmbeddedDatabase {
37 
38 class DBEngineImpl;
39 
43 class CursorImpl : public Cursor {
44  public:
45  CursorImpl(std::shared_ptr<ResultSet> result_set, std::vector<std::string> col_names)
46  : result_set_(result_set), col_names_(col_names) {}
47 
49  col_names_.clear();
50  record_batch_.reset();
51  result_set_.reset();
52  }
53 
54  size_t getColCount() { return result_set_ ? result_set_->colCount() : 0; }
55 
56  size_t getRowCount() { return result_set_ ? result_set_->rowCount() : 0; }
57 
59  if (result_set_) {
60  auto row = result_set_->getNextRow(true, false);
61  return row.empty() ? Row() : Row(row);
62  }
63  return Row();
64  }
65 
66  ColumnType getColType(uint32_t col_num) {
67  if (col_num < getColCount()) {
68  SQLTypeInfo type_info = result_set_->getColType(col_num);
69  return sqlToColumnType(type_info.get_type());
70  }
71  return ColumnType::UNKNOWN;
72  }
73 
74  std::shared_ptr<arrow::RecordBatch> getArrowRecordBatch() {
75  if (record_batch_) {
76  return record_batch_;
77  }
78  auto col_count = getColCount();
79  if (col_count > 0) {
80  auto row_count = getRowCount();
81  if (row_count > 0) {
82  auto converter =
83  std::make_unique<ArrowResultSetConverter>(result_set_, col_names_, -1);
84  record_batch_ = converter->convertToArrow();
85  return record_batch_;
86  }
87  }
88  return nullptr;
89  }
90 
91  private:
92  std::shared_ptr<ResultSet> result_set_;
93  std::vector<std::string> col_names_;
94  std::shared_ptr<arrow::RecordBatch> record_batch_;
95 };
96 
100 class DBEngineImpl : public DBEngine {
101  public:
103 
105 
106  bool init(const std::string& cmd_line) {
107  static bool initialized{false};
108  if (initialized) {
109  throw std::runtime_error("Database engine already initialized");
110  }
111 
113 
114  // Split the command line into parameters
115  std::vector<std::string> parameters;
116  if (!cmd_line.empty()) {
117  parameters = boost::program_options::split_unix(cmd_line);
118  }
119 
120  // Generate command line to initialize CommandLineOptions for DBHandler
121  const char* log_option = "omnisci_dbe";
122  std::vector<const char*> cstrings;
123  cstrings.push_back(log_option);
124  for (auto& param : parameters) {
125  cstrings.push_back(param.c_str());
126  }
127  int argc = cstrings.size();
128  const char** argv = cstrings.data();
129 
130  CommandLineOptions prog_config_opts(log_option);
131  if (prog_config_opts.parse_command_line(argc, argv, false)) {
132  throw std::runtime_error("DBE paramerameters parsing failed");
133  }
134 
135  if (!g_enable_thrift_logs) {
136  apache::thrift::GlobalOutput.setOutputFunction([](const char* msg) {});
137  }
138 
139  auto base_path = prog_config_opts.base_path;
140 
141  // Check path to the database
142  bool is_new_db = base_path.empty() || !catalogExists(base_path);
143  if (is_new_db) {
144  base_path = createCatalog(base_path);
145  if (base_path.empty()) {
146  throw std::runtime_error("Database directory could not be created");
147  }
148  }
149  prog_config_opts.base_path = base_path;
150  prog_config_opts.init_logging();
151 
152  prog_config_opts.system_parameters.omnisci_server_port = -1;
153  prog_config_opts.system_parameters.calcite_keepalive = true;
154 
155  try {
156  db_handler_ =
157  std::make_shared<DBHandler>(prog_config_opts.db_leaves,
158  prog_config_opts.string_leaves,
159  prog_config_opts.base_path,
160  prog_config_opts.allow_multifrag,
161  prog_config_opts.jit_debug,
162  prog_config_opts.intel_jit_profile,
163  prog_config_opts.read_only,
164  prog_config_opts.allow_loop_joins,
165  prog_config_opts.enable_rendering,
166  prog_config_opts.renderer_use_ppll_polys,
167  prog_config_opts.renderer_prefer_igpu,
168  prog_config_opts.renderer_vulkan_timeout_ms,
169  prog_config_opts.enable_auto_clear_render_mem,
170  prog_config_opts.render_oom_retry_threshold,
171  prog_config_opts.render_mem_bytes,
172  prog_config_opts.max_concurrent_render_sessions,
173  prog_config_opts.reserved_gpu_mem,
174  prog_config_opts.render_compositor_use_last_gpu,
175  prog_config_opts.num_reader_threads,
176  prog_config_opts.authMetadata,
177  prog_config_opts.system_parameters,
178  prog_config_opts.enable_legacy_syntax,
179  prog_config_opts.idle_session_duration,
180  prog_config_opts.max_session_duration,
181  prog_config_opts.udf_file_name,
182  prog_config_opts.udf_compiler_path,
183  prog_config_opts.udf_compiler_options,
184 #ifdef ENABLE_GEOS
185  prog_config_opts.libgeos_so_filename,
186 #endif
187  prog_config_opts.disk_cache_config,
188  is_new_db);
189  } catch (const std::exception& e) {
190  LOG(FATAL) << "Failed to initialize database handler: " << e.what();
191  }
192  db_handler_->connect(session_id_,
196  base_path_ = base_path;
197  initialized = true;
198  return true;
199  }
200 
201  std::shared_ptr<CursorImpl> sql_execute_dbe(const TSessionId& session_id,
202  const std::string& query_str,
203  const bool column_format,
204  const int32_t first_n,
205  const int32_t at_most_n) {
206  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
209  nullptr,
210  nullptr,
211  0,
212  0),
213  {}};
215  db_handler_->sql_execute(
216  result, session_id, query_str, column_format, first_n, at_most_n, locks);
217  auto& targets = result.getTargetsMeta();
218  std::vector<std::string> col_names;
219  for (const auto target : targets) {
220  col_names.push_back(target.get_resname());
221  }
222  return std::make_shared<CursorImpl>(result.getRows(), col_names);
223  }
224 
225  void executeDDL(const std::string& query) {
226  auto res = sql_execute_dbe(session_id_, query, false, -1, -1);
227  }
228 
229  void importArrowTable(const std::string& name,
230  std::shared_ptr<arrow::Table>& table,
231  uint64_t fragment_size) {
232  setArrowTable(name, table);
233  try {
234  auto session = db_handler_->get_session_copy(session_id_);
235  TableDescriptor td;
236  td.tableName = name;
237  td.userId = session.get_currentUser().userId;
238  td.storageType = "ARROW:" + name;
240  td.isView = false;
241  td.fragmenter = nullptr;
243  td.maxFragRows = fragment_size > 0 ? fragment_size : DEFAULT_FRAGMENT_ROWS;
247  td.keyMetainfo = "[]";
248 
249  std::list<ColumnDescriptor> cols;
250  std::vector<Parser::SharedDictionaryDef> dictionaries;
251  auto catalog = session.get_catalog_ptr();
252  // nColumns
253  catalog->createTable(td, cols, dictionaries, false);
255  session.get_currentUser(), td.tableName, TableDBObjectType, *catalog);
256  } catch (...) {
257  releaseArrowTable(name);
258  throw;
259  }
260  releaseArrowTable(name);
261  }
262 
263  std::shared_ptr<CursorImpl> executeDML(const std::string& query) {
264  return sql_execute_dbe(session_id_, query, false, -1, -1);
265  }
266 
267  std::shared_ptr<CursorImpl> executeRA(const std::string& query) {
268  return sql_execute_dbe(session_id_, query, false, -1, -1);
269  }
270 
271  std::vector<std::string> getTables() {
272  std::vector<std::string> table_names;
273  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
274  if (catalog) {
275  const auto tables = catalog->getAllTableMetadata();
276  for (const auto td : tables) {
277  if (td->shard >= 0) {
278  // skip shards, they're not standalone tables
279  continue;
280  }
281  table_names.push_back(td->tableName);
282  }
283  } else {
284  throw std::runtime_error("System catalog uninitialized");
285  }
286  return table_names;
287  }
288 
289  std::vector<ColumnDetails> getTableDetails(const std::string& table_name) {
290  std::vector<ColumnDetails> result;
291  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
292  if (catalog) {
293  auto metadata = catalog->getMetadataForTable(table_name, false);
294  if (metadata) {
295  const auto col_descriptors =
296  catalog->getAllColumnMetadataForTable(metadata->tableId, false, true, false);
297  const auto deleted_cd = catalog->getDeletedColumn(metadata);
298  for (const auto cd : col_descriptors) {
299  if (cd == deleted_cd) {
300  continue;
301  }
302  ColumnDetails col_details;
303  col_details.col_name = cd->columnName;
304  auto ct = cd->columnType;
305  SQLTypes sql_type = ct.get_type();
306  EncodingType sql_enc = ct.get_compression();
307  col_details.col_type = sqlToColumnType(sql_type);
308  col_details.encoding = sqlToColumnEncoding(sql_enc);
309  col_details.nullable = !ct.get_notnull();
310  col_details.is_array = (sql_type == kARRAY);
311  if (IS_GEO(sql_type)) {
312  col_details.precision = static_cast<int>(ct.get_subtype());
313  col_details.scale = ct.get_output_srid();
314  } else {
315  col_details.precision = ct.get_precision();
316  col_details.scale = ct.get_scale();
317  }
318  if (col_details.encoding == ColumnEncoding::DICT) {
319  // have to get the actual size of the encoding from the dictionary
320  // definition
321  const int dict_id = ct.get_comp_param();
322  auto dd = catalog->getMetadataForDict(dict_id, false);
323  if (dd) {
324  col_details.comp_param = dd->dictNBits;
325  } else {
326  throw std::runtime_error("Dictionary definition for column doesn't exist");
327  }
328  } else {
329  col_details.comp_param = ct.get_comp_param();
330  if (ct.is_date_in_days() && col_details.comp_param == 0) {
331  col_details.comp_param = 32;
332  }
333  }
334  result.push_back(col_details);
335  }
336  }
337  }
338  return result;
339  }
340 
341  bool setDatabase(std::string& db_name) {
342  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
343  auto& user = db_handler_->get_session_copy(session_id_).get_currentUser();
344  sys_cat.switchDatabase(db_name, user.userName);
345  return true;
346  }
347 
348  bool login(std::string& db_name, std::string& user_name, const std::string& password) {
349  db_handler_->disconnect(session_id_);
350  db_handler_->connect(session_id_, user_name, password, db_name);
351  return true;
352  }
353 
354  protected:
355  void reset() {
356  if (db_handler_) {
357  db_handler_->disconnect(session_id_);
358  db_handler_->shutdown();
359  }
361  db_handler_.reset();
362 
364  if (is_temp_db_) {
365  boost::filesystem::remove_all(base_path_);
366  }
367  base_path_.clear();
368  }
369 
370  bool catalogExists(const std::string& base_path) {
371  if (!boost::filesystem::exists(base_path)) {
372  return false;
373  }
374  for (auto& subdir : system_folders_) {
375  std::string path = base_path + "/" + subdir;
376  if (!boost::filesystem::exists(path)) {
377  return false;
378  }
379  }
380  return true;
381  }
382 
383  void cleanCatalog(const std::string& base_path) {
384  if (boost::filesystem::exists(base_path)) {
385  for (auto& subdir : system_folders_) {
386  std::string path = base_path + "/" + subdir;
387  if (boost::filesystem::exists(path)) {
388  boost::filesystem::remove_all(path);
389  }
390  }
391  }
392  }
393 
394  std::string createCatalog(const std::string& base_path) {
395  std::string root_dir = base_path;
396  if (base_path.empty()) {
397  boost::system::error_code error;
398  auto tmp_path = boost::filesystem::temp_directory_path(error);
399  if (boost::system::errc::success != error.value()) {
400  std::cerr << error.message() << std::endl;
401  return "";
402  }
403  tmp_path /= "omnidbe_%%%%-%%%%-%%%%";
404  auto uniq_path = boost::filesystem::unique_path(tmp_path, error);
405  if (boost::system::errc::success != error.value()) {
406  std::cerr << error.message() << std::endl;
407  return "";
408  }
409  root_dir = uniq_path.string();
410  is_temp_db_ = true;
411  }
412  if (!boost::filesystem::exists(root_dir)) {
413  if (!boost::filesystem::create_directory(root_dir)) {
414  std::cerr << "Cannot create database directory: " << root_dir << std::endl;
415  return "";
416  }
417  }
418  size_t absent_count = 0;
419  for (auto& sub_dir : system_folders_) {
420  std::string path = root_dir + "/" + sub_dir;
421  if (!boost::filesystem::exists(path)) {
422  if (!boost::filesystem::create_directory(path)) {
423  std::cerr << "Cannot create database subdirectory: " << path << std::endl;
424  return "";
425  }
426  ++absent_count;
427  }
428  }
429  if ((absent_count > 0) && (absent_count < system_folders_.size())) {
430  std::cerr << "Database directory structure is broken: " << root_dir << std::endl;
431  return "";
432  }
433  return root_dir;
434  }
435 
436  private:
437  std::string base_path_;
438  std::string session_id_;
439  std::shared_ptr<DBHandler> db_handler_;
441  std::string udf_filename_;
442 
443  std::vector<std::string> system_folders_ = {shared::kCatalogDirectoryName,
446 };
447 
448 namespace {
450 }
451 
452 std::shared_ptr<DBEngine> DBEngine::create(const std::string& cmd_line) {
453  const std::lock_guard<std::mutex> lock(engine_create_mutex);
454  auto engine = std::make_shared<DBEngineImpl>();
455  if (!engine->init(cmd_line)) {
456  throw std::runtime_error("DBE initialization failed");
457  }
458  return engine;
459 }
460 
464  return (DBEngineImpl*)ptr;
465 }
466 inline const DBEngineImpl* getImpl(const DBEngine* ptr) {
467  return (const DBEngineImpl*)ptr;
468 }
469 
472 void DBEngine::executeDDL(const std::string& query) {
473  DBEngineImpl* engine = getImpl(this);
474  engine->executeDDL(query);
475 }
476 
477 std::shared_ptr<Cursor> DBEngine::executeDML(const std::string& query) {
478  DBEngineImpl* engine = getImpl(this);
479  return engine->executeDML(query);
480 }
481 
482 std::shared_ptr<Cursor> DBEngine::executeRA(const std::string& query) {
483  DBEngineImpl* engine = getImpl(this);
484  return engine->executeRA(query);
485 }
486 
487 void DBEngine::importArrowTable(const std::string& name,
488  std::shared_ptr<arrow::Table>& table,
489  uint64_t fragment_size) {
490  DBEngineImpl* engine = getImpl(this);
491  engine->importArrowTable(name, table, fragment_size);
492 }
493 
494 std::vector<std::string> DBEngine::getTables() {
495  DBEngineImpl* engine = getImpl(this);
496  return engine->getTables();
497 }
498 
499 std::vector<ColumnDetails> DBEngine::getTableDetails(const std::string& table_name) {
500  DBEngineImpl* engine = getImpl(this);
501  return engine->getTableDetails(table_name);
502 }
503 
504 bool DBEngine::setDatabase(std::string& db_name) {
505  DBEngineImpl* engine = getImpl(this);
506  return engine->setDatabase(db_name);
507 }
508 
509 bool DBEngine::login(std::string& db_name,
510  std::string& user_name,
511  const std::string& password) {
512  DBEngineImpl* engine = getImpl(this);
513  return engine->login(db_name, user_name, password);
514 }
515 
518 inline CursorImpl* getImpl(Cursor* ptr) {
519  return (CursorImpl*)ptr;
520 }
521 
522 inline const CursorImpl* getImpl(const Cursor* ptr) {
523  return (const CursorImpl*)ptr;
524 }
525 
529  CursorImpl* cursor = getImpl(this);
530  return cursor->getColCount();
531 }
532 
534  CursorImpl* cursor = getImpl(this);
535  return cursor->getRowCount();
536 }
537 
539  CursorImpl* cursor = getImpl(this);
540  return cursor->getNextRow();
541 }
542 
543 ColumnType Cursor::getColType(uint32_t col_num) {
544  CursorImpl* cursor = getImpl(this);
545  return cursor->getColType(col_num);
546 }
547 
548 std::shared_ptr<arrow::RecordBatch> Cursor::getArrowRecordBatch() {
549  CursorImpl* cursor = getImpl(this);
550  return cursor->getArrowRecordBatch();
551 }
552 } // namespace EmbeddedDatabase
Classes used to wrap parser calls for calcite redirection.
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
Definition: LockMgr.h:271
const std::string kDataDirectoryName
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:341
std::shared_ptr< DBHandler > db_handler_
Definition: DBEngine.cpp:439
std::shared_ptr< CursorImpl > sql_execute_dbe(const TSessionId &session_id, const std::string &query_str, const bool column_format, const int32_t first_n, const int32_t at_most_n)
Definition: DBEngine.cpp:201
SQLTypes
Definition: sqltypes.h:53
std::string tableName
unsigned renderer_vulkan_timeout_ms
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size=0)
Definition: DBEngine.cpp:487
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:74
#define LOG(tag)
Definition: Logger.h:283
std::string storageType
#define DEFAULT_MAX_CHUNK_SIZE
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:504
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:289
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:225
bool init(const std::string &cmd_line)
Definition: DBEngine.cpp:106
bool catalogExists(const std::string &base_path)
Definition: DBEngine.cpp:370
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
const std::string kDefaultExportDirName
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:380
void createDBObject(const UserMetadata &user, const std::string &objectName, DBObjectType type, const Catalog_Namespace::Catalog &catalog, int32_t objectId=-1)
void releaseArrowTable(std::string name)
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:66
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:499
ColumnType sqlToColumnType(const SQLTypes &type)
Definition: DBETypes.cpp:111
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:348
EncodingType
Definition: sqltypes.h:228
#define DEFAULT_MAX_ROWS
size_t max_concurrent_render_sessions
Supported runtime functions management and retrieval.
static SysCatalog & instance()
Definition: SysCatalog.h:341
std::shared_ptr< Cursor > executeDML(const std::string &query)
Definition: DBEngine.cpp:477
std::shared_ptr< ResultSet > result_set_
Definition: DBEngine.cpp:92
std::vector< LeafHostInfo > db_leaves
const std::string kDefaultDbName
CursorImpl(std::shared_ptr< ResultSet > result_set, std::vector< std::string > col_names)
Definition: DBEngine.cpp:45
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size)
Definition: DBEngine.cpp:229
std::shared_ptr< arrow::RecordBatch > record_batch_
Definition: DBEngine.cpp:94
std::shared_ptr< CursorImpl > executeRA(const std::string &query)
Definition: DBEngine.cpp:267
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:548
std::vector< std::string > col_names_
Definition: DBEngine.cpp:93
std::string createCatalog(const std::string &base_path)
Definition: DBEngine.cpp:394
std::string keyMetainfo
AuthMetadata authMetadata
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool g_serialize_temp_tables
Definition: Catalog.cpp:106
#define DEFAULT_PAGE_SIZE
const std::string kRootUsername
std::vector< std::string > getTables()
Definition: DBEngine.cpp:494
static std::shared_ptr< DBEngine > create(const std::string &cmd_line)
Definition: DBEngine.cpp:452
const std::string kDefaultRootPasswd
#define DEFAULT_FRAGMENT_ROWS
void shutdown()
Definition: Logger.cpp:397
Fragmenter_Namespace::FragmenterType fragType
Data_Namespace::MemoryLevel persistenceLevel
std::vector< std::string > udf_compiler_options
ColumnEncoding sqlToColumnEncoding(const EncodingType &type)
Definition: DBETypes.cpp:162
const std::string kCatalogDirectoryName
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:509
bool g_enable_watchdog false
Definition: Execute.cpp:79
bool g_enable_union
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:543
DBEngineImpl * getImpl(DBEngine *ptr)
Definition: DBEngine.cpp:463
File_Namespace::DiskCacheConfig disk_cache_config
string name
Definition: setup.in.py:72
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:472
bool g_enable_thrift_logs
Definition: HeavyDB.cpp:289
#define IS_GEO(T)
Definition: sqltypes.h:298
std::vector< std::string > system_folders_
Definition: DBEngine.cpp:443
std::shared_ptr< CursorImpl > executeDML(const std::string &query)
Definition: DBEngine.cpp:263
std::shared_ptr< Cursor > executeRA(const std::string &query)
Definition: DBEngine.cpp:482
void cleanCatalog(const std::string &base_path)
Definition: DBEngine.cpp:383
SystemParameters system_parameters
std::vector< std::string > getTables()
Definition: DBEngine.cpp:271