OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
DBEngine.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "DBEngine.h"

#include <stdexcept>
#include <utility>

#include <boost/filesystem.hpp>

#include "LockMgr/LockMgr.h"
#include "Parser/ParserWrapper.h"
#include "QueryEngine/Execute.h"
#include "Shared/SysDefinitions.h"
31 
32 extern bool g_enable_thrift_logs;
33 extern bool g_enable_union;
34 extern bool g_serialize_temp_tables;
35 
36 namespace EmbeddedDatabase {
37 
38 class DBEngineImpl;
39 
43 class CursorImpl : public Cursor {
44  public:
45  CursorImpl(std::shared_ptr<ResultSet> result_set, std::vector<std::string> col_names)
46  : result_set_(result_set), col_names_(col_names) {}
47 
49  col_names_.clear();
50  record_batch_.reset();
51  result_set_.reset();
52  }
53 
54  size_t getColCount() { return result_set_ ? result_set_->colCount() : 0; }
55 
56  size_t getRowCount() { return result_set_ ? result_set_->rowCount() : 0; }
57 
59  if (result_set_) {
60  auto row = result_set_->getNextRow(true, false);
61  return row.empty() ? Row() : Row(row);
62  }
63  return Row();
64  }
65 
66  ColumnType getColType(uint32_t col_num) {
67  if (col_num < getColCount()) {
68  SQLTypeInfo type_info = result_set_->getColType(col_num);
69  return sqlToColumnType(type_info.get_type());
70  }
71  return ColumnType::UNKNOWN;
72  }
73 
74  std::shared_ptr<arrow::RecordBatch> getArrowRecordBatch() {
75  if (record_batch_) {
76  return record_batch_;
77  }
78  auto col_count = getColCount();
79  if (col_count > 0) {
80  auto row_count = getRowCount();
81  if (row_count > 0) {
82  auto converter =
83  std::make_unique<ArrowResultSetConverter>(result_set_, col_names_, -1);
84  record_batch_ = converter->convertToArrow();
85  return record_batch_;
86  }
87  }
88  return nullptr;
89  }
90 
91  private:
92  std::shared_ptr<ResultSet> result_set_;
93  std::vector<std::string> col_names_;
94  std::shared_ptr<arrow::RecordBatch> record_batch_;
95 };
96 
100 class DBEngineImpl : public DBEngine {
101  public:
103 
105 
106  bool init(const std::string& cmd_line) {
107  static bool initialized{false};
108  if (initialized) {
109  throw std::runtime_error("Database engine already initialized");
110  }
111 
113 
114  // Split the command line into parameters
115  std::vector<std::string> parameters;
116  if (!cmd_line.empty()) {
117  parameters = boost::program_options::split_unix(cmd_line);
118  }
119 
120  // Generate command line to initialize CommandLineOptions for DBHandler
121  const char* log_option = "omnisci_dbe";
122  std::vector<const char*> cstrings;
123  cstrings.push_back(log_option);
124  for (auto& param : parameters) {
125  cstrings.push_back(param.c_str());
126  }
127  int argc = cstrings.size();
128  const char** argv = cstrings.data();
129 
130  CommandLineOptions prog_config_opts(log_option);
131  if (prog_config_opts.parse_command_line(argc, argv, false)) {
132  throw std::runtime_error("DBE paramerameters parsing failed");
133  }
134 
135  if (!g_enable_thrift_logs) {
136  apache::thrift::GlobalOutput.setOutputFunction([](const char* msg) {});
137  }
138 
139  auto base_path = prog_config_opts.base_path;
140 
141  // Check path to the database
142  bool is_new_db = base_path.empty() || !catalogExists(base_path);
143  if (is_new_db) {
144  base_path = createCatalog(base_path);
145  if (base_path.empty()) {
146  throw std::runtime_error("Database directory could not be created");
147  }
148  }
149  prog_config_opts.base_path = base_path;
150  prog_config_opts.init_logging();
151 
152  prog_config_opts.system_parameters.omnisci_server_port = -1;
153  prog_config_opts.system_parameters.calcite_keepalive = true;
154 
155  try {
156  db_handler_ =
157  std::make_shared<DBHandler>(prog_config_opts.db_leaves,
158  prog_config_opts.string_leaves,
159  prog_config_opts.base_path,
160  prog_config_opts.allow_multifrag,
161  prog_config_opts.jit_debug,
162  prog_config_opts.intel_jit_profile,
163  prog_config_opts.read_only,
164  prog_config_opts.allow_loop_joins,
165  prog_config_opts.enable_rendering,
166  prog_config_opts.renderer_use_ppll_polys,
167  prog_config_opts.renderer_prefer_igpu,
168  prog_config_opts.renderer_vulkan_timeout_ms,
169  prog_config_opts.enable_auto_clear_render_mem,
170  prog_config_opts.render_oom_retry_threshold,
171  prog_config_opts.render_mem_bytes,
172  prog_config_opts.max_concurrent_render_sessions,
173  prog_config_opts.reserved_gpu_mem,
174  prog_config_opts.render_compositor_use_last_gpu,
175  prog_config_opts.num_reader_threads,
176  prog_config_opts.authMetadata,
177  prog_config_opts.system_parameters,
178  prog_config_opts.enable_legacy_syntax,
179  prog_config_opts.idle_session_duration,
180  prog_config_opts.max_session_duration,
181  prog_config_opts.udf_file_name,
182  prog_config_opts.udf_compiler_path,
183  prog_config_opts.udf_compiler_options,
184 #ifdef ENABLE_GEOS
185  prog_config_opts.libgeos_so_filename,
186 #endif
187  prog_config_opts.disk_cache_config,
188  is_new_db);
189  } catch (const std::exception& e) {
190  LOG(FATAL) << "Failed to initialize database handler: " << e.what();
191  }
192  db_handler_->connect(session_id_,
196  base_path_ = base_path;
197  initialized = true;
198  return true;
199  }
200 
201  std::shared_ptr<CursorImpl> sql_execute_dbe(const TSessionId& session_id,
202  const std::string& query_str,
203  const bool column_format,
204  const int32_t first_n,
205  const int32_t at_most_n) {
206  ExecutionResult result{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
209  nullptr,
210  0,
211  0),
212  {}};
214  db_handler_->sql_execute(
215  result, session_id, query_str, column_format, first_n, at_most_n, locks);
216  auto& targets = result.getTargetsMeta();
217  std::vector<std::string> col_names;
218  for (const auto target : targets) {
219  col_names.push_back(target.get_resname());
220  }
221  return std::make_shared<CursorImpl>(result.getRows(), col_names);
222  }
223 
224  void executeDDL(const std::string& query) {
225  auto res = sql_execute_dbe(session_id_, query, false, -1, -1);
226  }
227 
228  void importArrowTable(const std::string& name,
229  std::shared_ptr<arrow::Table>& table,
230  uint64_t fragment_size) {
231  setArrowTable(name, table);
232  try {
233  auto session = db_handler_->get_session_copy(session_id_);
234  TableDescriptor td;
235  td.tableName = name;
236  td.userId = session.get_currentUser().userId;
237  td.storageType = "ARROW:" + name;
239  td.isView = false;
240  td.fragmenter = nullptr;
242  td.maxFragRows = fragment_size > 0 ? fragment_size : DEFAULT_FRAGMENT_ROWS;
246  td.keyMetainfo = "[]";
247 
248  std::list<ColumnDescriptor> cols;
249  std::vector<Parser::SharedDictionaryDef> dictionaries;
250  auto catalog = session.get_catalog_ptr();
251  // nColumns
252  catalog->createTable(td, cols, dictionaries, false);
254  session.get_currentUser(), td.tableName, TableDBObjectType, *catalog);
255  } catch (...) {
256  releaseArrowTable(name);
257  throw;
258  }
259  releaseArrowTable(name);
260  }
261 
262  std::shared_ptr<CursorImpl> executeDML(const std::string& query) {
263  return sql_execute_dbe(session_id_, query, false, -1, -1);
264  }
265 
266  std::shared_ptr<CursorImpl> executeRA(const std::string& query) {
267  return sql_execute_dbe(session_id_, query, false, -1, -1);
268  }
269 
270  std::vector<std::string> getTables() {
271  std::vector<std::string> table_names;
272  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
273  if (catalog) {
274  const auto tables = catalog->getAllTableMetadata();
275  for (const auto td : tables) {
276  if (td->shard >= 0) {
277  // skip shards, they're not standalone tables
278  continue;
279  }
280  table_names.push_back(td->tableName);
281  }
282  } else {
283  throw std::runtime_error("System catalog uninitialized");
284  }
285  return table_names;
286  }
287 
288  std::vector<ColumnDetails> getTableDetails(const std::string& table_name) {
289  std::vector<ColumnDetails> result;
290  auto catalog = db_handler_->get_session_copy(session_id_).get_catalog_ptr();
291  if (catalog) {
292  auto metadata = catalog->getMetadataForTable(table_name, false);
293  if (metadata) {
294  const auto col_descriptors =
295  catalog->getAllColumnMetadataForTable(metadata->tableId, false, true, false);
296  const auto deleted_cd = catalog->getDeletedColumn(metadata);
297  for (const auto cd : col_descriptors) {
298  if (cd == deleted_cd) {
299  continue;
300  }
301  ColumnDetails col_details;
302  col_details.col_name = cd->columnName;
303  auto ct = cd->columnType;
304  SQLTypes sql_type = ct.get_type();
305  EncodingType sql_enc = ct.get_compression();
306  col_details.col_type = sqlToColumnType(sql_type);
307  col_details.encoding = sqlToColumnEncoding(sql_enc);
308  col_details.nullable = !ct.get_notnull();
309  col_details.is_array = (sql_type == kARRAY);
310  if (IS_GEO(sql_type)) {
311  col_details.precision = static_cast<int>(ct.get_subtype());
312  col_details.scale = ct.get_output_srid();
313  } else {
314  col_details.precision = ct.get_precision();
315  col_details.scale = ct.get_scale();
316  }
317  if (col_details.encoding == ColumnEncoding::DICT) {
318  // have to get the actual size of the encoding from the dictionary
319  // definition
320  const int dict_id = ct.get_comp_param();
321  auto dd = catalog->getMetadataForDict(dict_id, false);
322  if (dd) {
323  col_details.comp_param = dd->dictNBits;
324  } else {
325  throw std::runtime_error("Dictionary definition for column doesn't exist");
326  }
327  } else {
328  col_details.comp_param = ct.get_comp_param();
329  if (ct.is_date_in_days() && col_details.comp_param == 0) {
330  col_details.comp_param = 32;
331  }
332  }
333  result.push_back(col_details);
334  }
335  }
336  }
337  return result;
338  }
339 
340  bool setDatabase(std::string& db_name) {
341  auto& sys_cat = Catalog_Namespace::SysCatalog::instance();
342  auto& user = db_handler_->get_session_copy(session_id_).get_currentUser();
343  sys_cat.switchDatabase(db_name, user.userName);
344  return true;
345  }
346 
347  bool login(std::string& db_name, std::string& user_name, const std::string& password) {
348  db_handler_->disconnect(session_id_);
349  db_handler_->connect(session_id_, user_name, password, db_name);
350  return true;
351  }
352 
353  protected:
354  void reset() {
355  if (db_handler_) {
356  db_handler_->disconnect(session_id_);
357  db_handler_->shutdown();
358  }
360  db_handler_.reset();
361 
363  if (is_temp_db_) {
364  boost::filesystem::remove_all(base_path_);
365  }
366  base_path_.clear();
367  }
368 
369  bool catalogExists(const std::string& base_path) {
370  if (!boost::filesystem::exists(base_path)) {
371  return false;
372  }
373  for (auto& subdir : system_folders_) {
374  std::string path = base_path + "/" + subdir;
375  if (!boost::filesystem::exists(path)) {
376  return false;
377  }
378  }
379  return true;
380  }
381 
382  void cleanCatalog(const std::string& base_path) {
383  if (boost::filesystem::exists(base_path)) {
384  for (auto& subdir : system_folders_) {
385  std::string path = base_path + "/" + subdir;
386  if (boost::filesystem::exists(path)) {
387  boost::filesystem::remove_all(path);
388  }
389  }
390  }
391  }
392 
393  std::string createCatalog(const std::string& base_path) {
394  std::string root_dir = base_path;
395  if (base_path.empty()) {
396  boost::system::error_code error;
397  auto tmp_path = boost::filesystem::temp_directory_path(error);
398  if (boost::system::errc::success != error.value()) {
399  std::cerr << error.message() << std::endl;
400  return "";
401  }
402  tmp_path /= "omnidbe_%%%%-%%%%-%%%%";
403  auto uniq_path = boost::filesystem::unique_path(tmp_path, error);
404  if (boost::system::errc::success != error.value()) {
405  std::cerr << error.message() << std::endl;
406  return "";
407  }
408  root_dir = uniq_path.string();
409  is_temp_db_ = true;
410  }
411  if (!boost::filesystem::exists(root_dir)) {
412  if (!boost::filesystem::create_directory(root_dir)) {
413  std::cerr << "Cannot create database directory: " << root_dir << std::endl;
414  return "";
415  }
416  }
417  size_t absent_count = 0;
418  for (auto& sub_dir : system_folders_) {
419  std::string path = root_dir + "/" + sub_dir;
420  if (!boost::filesystem::exists(path)) {
421  if (!boost::filesystem::create_directory(path)) {
422  std::cerr << "Cannot create database subdirectory: " << path << std::endl;
423  return "";
424  }
425  ++absent_count;
426  }
427  }
428  if ((absent_count > 0) && (absent_count < system_folders_.size())) {
429  std::cerr << "Database directory structure is broken: " << root_dir << std::endl;
430  return "";
431  }
432  return root_dir;
433  }
434 
435  private:
436  std::string base_path_;
437  std::string session_id_;
438  std::shared_ptr<DBHandler> db_handler_;
440  std::string udf_filename_;
441 
442  std::vector<std::string> system_folders_ = {shared::kCatalogDirectoryName,
445 };
446 
447 namespace {
449 }
450 
451 std::shared_ptr<DBEngine> DBEngine::create(const std::string& cmd_line) {
452  const std::lock_guard<std::mutex> lock(engine_create_mutex);
453  auto engine = std::make_shared<DBEngineImpl>();
454  if (!engine->init(cmd_line)) {
455  throw std::runtime_error("DBE initialization failed");
456  }
457  return engine;
458 }
459 
463  return (DBEngineImpl*)ptr;
464 }
465 inline const DBEngineImpl* getImpl(const DBEngine* ptr) {
466  return (const DBEngineImpl*)ptr;
467 }
468 
471 void DBEngine::executeDDL(const std::string& query) {
472  DBEngineImpl* engine = getImpl(this);
473  engine->executeDDL(query);
474 }
475 
476 std::shared_ptr<Cursor> DBEngine::executeDML(const std::string& query) {
477  DBEngineImpl* engine = getImpl(this);
478  return engine->executeDML(query);
479 }
480 
481 std::shared_ptr<Cursor> DBEngine::executeRA(const std::string& query) {
482  DBEngineImpl* engine = getImpl(this);
483  return engine->executeRA(query);
484 }
485 
486 void DBEngine::importArrowTable(const std::string& name,
487  std::shared_ptr<arrow::Table>& table,
488  uint64_t fragment_size) {
489  DBEngineImpl* engine = getImpl(this);
490  engine->importArrowTable(name, table, fragment_size);
491 }
492 
493 std::vector<std::string> DBEngine::getTables() {
494  DBEngineImpl* engine = getImpl(this);
495  return engine->getTables();
496 }
497 
498 std::vector<ColumnDetails> DBEngine::getTableDetails(const std::string& table_name) {
499  DBEngineImpl* engine = getImpl(this);
500  return engine->getTableDetails(table_name);
501 }
502 
503 bool DBEngine::setDatabase(std::string& db_name) {
504  DBEngineImpl* engine = getImpl(this);
505  return engine->setDatabase(db_name);
506 }
507 
508 bool DBEngine::login(std::string& db_name,
509  std::string& user_name,
510  const std::string& password) {
511  DBEngineImpl* engine = getImpl(this);
512  return engine->login(db_name, user_name, password);
513 }
514 
517 inline CursorImpl* getImpl(Cursor* ptr) {
518  return (CursorImpl*)ptr;
519 }
520 
521 inline const CursorImpl* getImpl(const Cursor* ptr) {
522  return (const CursorImpl*)ptr;
523 }
524 
528  CursorImpl* cursor = getImpl(this);
529  return cursor->getColCount();
530 }
531 
533  CursorImpl* cursor = getImpl(this);
534  return cursor->getRowCount();
535 }
536 
538  CursorImpl* cursor = getImpl(this);
539  return cursor->getNextRow();
540 }
541 
542 ColumnType Cursor::getColType(uint32_t col_num) {
543  CursorImpl* cursor = getImpl(this);
544  return cursor->getColType(col_num);
545 }
546 
547 std::shared_ptr<arrow::RecordBatch> Cursor::getArrowRecordBatch() {
548  CursorImpl* cursor = getImpl(this);
549  return cursor->getArrowRecordBatch();
550 }
551 } // namespace EmbeddedDatabase
Classes used to wrap parser calls for calcite redirection.
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
Definition: LockMgr.h:271
const std::string kDataDirectoryName
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:340
std::shared_ptr< DBHandler > db_handler_
Definition: DBEngine.cpp:438
std::shared_ptr< CursorImpl > sql_execute_dbe(const TSessionId &session_id, const std::string &query_str, const bool column_format, const int32_t first_n, const int32_t at_most_n)
Definition: DBEngine.cpp:201
SQLTypes
Definition: sqltypes.h:55
std::string tableName
unsigned renderer_vulkan_timeout_ms
std::vector< LeafHostInfo > string_leaves
std::string udf_compiler_path
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size=0)
Definition: DBEngine.cpp:486
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:74
#define LOG(tag)
Definition: Logger.h:285
std::string storageType
#define DEFAULT_MAX_CHUNK_SIZE
bool setDatabase(std::string &db_name)
Definition: DBEngine.cpp:503
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:288
void setArrowTable(std::string name, std::shared_ptr< arrow::Table > table)
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:224
bool init(const std::string &cmd_line)
Definition: DBEngine.cpp:106
bool catalogExists(const std::string &base_path)
Definition: DBEngine.cpp:369
boost::optional< int > parse_command_line(int argc, char const *const *argv, const bool should_init_logging=false)
const std::string kDefaultExportDirName
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:381
void createDBObject(const UserMetadata &user, const std::string &objectName, DBObjectType type, const Catalog_Namespace::Catalog &catalog, int32_t objectId=-1)
void releaseArrowTable(std::string name)
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:66
std::vector< ColumnDetails > getTableDetails(const std::string &table_name)
Definition: DBEngine.cpp:498
ColumnType sqlToColumnType(const SQLTypes &type)
Definition: DBETypes.cpp:111
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:347
EncodingType
Definition: sqltypes.h:230
#define DEFAULT_MAX_ROWS
size_t max_concurrent_render_sessions
Supported runtime functions management and retrieval.
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::shared_ptr< Cursor > executeDML(const std::string &query)
Definition: DBEngine.cpp:476
std::shared_ptr< ResultSet > result_set_
Definition: DBEngine.cpp:92
std::vector< LeafHostInfo > db_leaves
const std::string kDefaultDbName
CursorImpl(std::shared_ptr< ResultSet > result_set, std::vector< std::string > col_names)
Definition: DBEngine.cpp:45
void importArrowTable(const std::string &name, std::shared_ptr< arrow::Table > &table, uint64_t fragment_size)
Definition: DBEngine.cpp:228
std::shared_ptr< arrow::RecordBatch > record_batch_
Definition: DBEngine.cpp:94
std::shared_ptr< CursorImpl > executeRA(const std::string &query)
Definition: DBEngine.cpp:266
std::shared_ptr< arrow::RecordBatch > getArrowRecordBatch()
Definition: DBEngine.cpp:547
std::vector< std::string > col_names_
Definition: DBEngine.cpp:93
std::string createCatalog(const std::string &base_path)
Definition: DBEngine.cpp:393
std::string keyMetainfo
AuthMetadata authMetadata
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
bool g_serialize_temp_tables
Definition: Catalog.cpp:106
#define DEFAULT_PAGE_SIZE
const std::string kRootUsername
std::vector< std::string > getTables()
Definition: DBEngine.cpp:493
static std::shared_ptr< DBEngine > create(const std::string &cmd_line)
Definition: DBEngine.cpp:451
const std::string kDefaultRootPasswd
#define DEFAULT_FRAGMENT_ROWS
void shutdown()
Definition: Logger.cpp:397
Fragmenter_Namespace::FragmenterType fragType
Data_Namespace::MemoryLevel persistenceLevel
std::vector< std::string > udf_compiler_options
ColumnEncoding sqlToColumnEncoding(const EncodingType &type)
Definition: DBETypes.cpp:162
const std::string kCatalogDirectoryName
bool login(std::string &db_name, std::string &user_name, const std::string &password)
Definition: DBEngine.cpp:508
bool g_enable_watchdog false
Definition: Execute.cpp:79
bool g_enable_union
ColumnType getColType(uint32_t col_num)
Definition: DBEngine.cpp:542
DBEngineImpl * getImpl(DBEngine *ptr)
Definition: DBEngine.cpp:462
File_Namespace::DiskCacheConfig disk_cache_config
string name
Definition: setup.in.py:72
void executeDDL(const std::string &query)
Definition: DBEngine.cpp:471
bool g_enable_thrift_logs
Definition: HeavyDB.cpp:289
#define IS_GEO(T)
Definition: sqltypes.h:300
std::vector< std::string > system_folders_
Definition: DBEngine.cpp:442
std::shared_ptr< CursorImpl > executeDML(const std::string &query)
Definition: DBEngine.cpp:262
std::shared_ptr< Cursor > executeRA(const std::string &query)
Definition: DBEngine.cpp:481
void cleanCatalog(const std::string &base_path)
Definition: DBEngine.cpp:382
SystemParameters system_parameters
std::vector< std::string > getTables()
Definition: DBEngine.cpp:270