26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/range/adaptor/map.hpp>
29 #include <boost/version.hpp>
42 #if BOOST_VERSION >= 106600
43 #include <boost/uuid/detail/sha1.hpp>
45 #include <boost/uuid/sha1.hpp>
47 #include <rapidjson/document.h>
48 #include <rapidjson/istreamwrapper.h>
49 #include <rapidjson/ostreamwrapper.h>
50 #include <rapidjson/writer.h>
80 #include "MapDRelease.h"
92 using std::runtime_error;
108 namespace Catalog_Namespace {
134 "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
141 "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
142 "userid integer references mapd_users, state text, image_hash text, update_time "
144 "metadata text, UNIQUE(userid, name) )");
147 "insert into mapd_dashboards (id, name , "
148 "userid, state, image_hash, update_time , "
150 "SELECT viewid , name , userid, view_state, image_hash, update_time, "
152 "from mapd_frontend_views");
153 }
catch (
const std::exception& e) {
163 const std::string& db_name) {
165 db_name +
"_temp_tables.json");
175 std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
176 const std::vector<LeafHostInfo>& string_dict_hosts,
177 std::shared_ptr<Calcite> calcite,
179 : basePath_(basePath)
183 , string_dict_hosts_(string_dict_hosts)
184 , calciteMgr_(calcite)
187 , dcatalogMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
199 ,
dsqliteMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
212 CheckAndExecuteMigrations();
218 createDefaultServersIfNotExists();
221 CheckAndExecuteMigrationsPostBuildMaps();
226 conditionallyInitializeSystemObjects();
238 tableDescIt->second->fragmenter =
nullptr;
239 delete tableDescIt->second;
247 delete columnDescIt->second;
270 std::vector<std::string> cols;
274 if (std::find(cols.begin(), cols.end(), std::string(
"max_chunk_size")) ==
276 string queryString(
"ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
280 if (std::find(cols.begin(), cols.end(), std::string(
"shard_column_id")) ==
282 string queryString(
"ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
286 if (std::find(cols.begin(), cols.end(), std::string(
"shard")) == cols.end()) {
287 string queryString(
"ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
291 if (std::find(cols.begin(), cols.end(), std::string(
"num_shards")) == cols.end()) {
292 string queryString(
"ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
296 if (std::find(cols.begin(), cols.end(), std::string(
"key_metainfo")) == cols.end()) {
297 string queryString(
"ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
300 if (std::find(cols.begin(), cols.end(), std::string(
"userid")) == cols.end()) {
301 string queryString(
"ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
305 if (std::find(cols.begin(), cols.end(), std::string(
"sort_column_id")) ==
308 "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
310 if (std::find(cols.begin(), cols.end(), std::string(
"storage_type")) == cols.end()) {
311 string queryString(
"ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
314 if (std::find(cols.begin(), cols.end(), std::string(
"max_rollback_epochs")) ==
316 string queryString(
"ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
320 if (std::find(cols.begin(), cols.end(), std::string(
"is_system_table")) ==
322 string queryString(
"ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
325 }
catch (std::exception& e) {
337 "select name from sqlite_master WHERE type='table' AND "
338 "name='mapd_version_history'");
341 "CREATE TABLE mapd_version_history(version integer, migration_history text "
345 "select * from mapd_version_history where migration_history = "
346 "'notnull_fixlen_arrays'");
356 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
358 LOG(
INFO) <<
"Updating mapd_columns, legacy fixlen arrays";
360 string queryString(
"UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
363 }
catch (std::exception& e) {
375 "select name from sqlite_master WHERE type='table' AND "
376 "name='mapd_version_history'");
379 "CREATE TABLE mapd_version_history(version integer, migration_history text "
383 "select * from mapd_version_history where migration_history = "
384 "'notnull_geo_columns'");
394 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
396 LOG(
INFO) <<
"Updating mapd_columns, legacy geo columns";
405 }
catch (std::exception& e) {
418 "SELECT name FROM sqlite_master WHERE type='table' AND "
419 "name='mapd_frontend_views'");
427 std::vector<std::string> cols;
431 if (std::find(cols.begin(), cols.end(), std::string(
"image_hash")) == cols.end()) {
434 if (std::find(cols.begin(), cols.end(), std::string(
"update_time")) == cols.end()) {
437 if (std::find(cols.begin(), cols.end(), std::string(
"view_metadata")) == cols.end()) {
440 }
catch (std::exception& e) {
452 "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
453 "integer references mapd_users, "
454 "link text unique, view_state text, update_time timestamp, view_metadata text)");
456 std::vector<std::string> cols;
460 if (std::find(cols.begin(), cols.end(), std::string(
"view_metadata")) == cols.end()) {
463 }
catch (
const std::exception& e) {
477 "SELECT name FROM sqlite_master WHERE type='table' AND "
478 "name='mapd_frontend_views'");
486 "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
487 }
catch (
const std::exception& e) {
507 std::vector<std::string> cols;
511 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
512 LOG(
INFO) <<
"Updating mapd_tables updatePageSize";
517 string queryString(
"ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
521 }
catch (std::exception& e) {
533 std::vector<std::string> cols;
537 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
538 LOG(
INFO) <<
"Updating mapd_columns updateDeletedColumnIndicator";
540 string queryString(
"ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
546 "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
548 }
catch (std::exception& e) {
560 std::vector<std::string> cols;
564 if (std::find(cols.begin(), cols.end(), std::string(
"default_value")) == cols.end()) {
565 LOG(
INFO) <<
"Adding support for default values to mapd_columns";
568 }
catch (std::exception& e) {
570 LOG(
ERROR) <<
"Failed to make metadata update for default values` support";
588 std::vector<std::string> cols;
592 if (std::find(cols.begin(), cols.end(), std::string(
"version_num")) == cols.end()) {
595 string dictQuery(
"SELECT dictid, name from mapd_dictionaries");
598 for (
size_t r = 0; r < numRows; ++r) {
608 int result = rename(oldName.c_str(), newName.c_str());
611 LOG(
INFO) <<
"Dictionary upgrade: successfully renamed " << oldName <<
" to "
614 LOG(
ERROR) <<
"Failed to rename old dictionary directory " << oldName <<
" to "
620 string queryString(
"ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
624 }
catch (std::exception& e) {
636 "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
637 "logical_table_id integer, physical_table_id integer)");
638 }
catch (
const std::exception& e) {
655 const auto physicalTables = physicalTableIt->second;
656 CHECK(!physicalTables.empty());
657 for (
size_t i = 0; i < physicalTables.size(); i++) {
658 int32_t physical_tb_id = physicalTables[i];
660 "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
661 "physical_table_id) VALUES (?1, ?2)",
666 }
catch (std::exception& e) {
678 std::vector<std::string> cols;
682 if (std::find(cols.begin(), cols.end(), std::string(
"refcount")) == cols.end()) {
685 }
catch (std::exception& e) {
698 }
catch (std::exception& e) {
711 "select name from sqlite_master WHERE type='table' AND "
712 "name='mapd_version_history'");
713 static const std::string migration_name{
"rename_legacy_data_wrappers"};
716 "CREATE TABLE mapd_version_history(version integer, migration_history text "
720 "select * from mapd_version_history where migration_history = "
722 migration_name +
"'");
729 LOG(
INFO) <<
"Executing " << migration_name <<
" migration.";
734 std::map<std::string, std::string> old_to_new_wrapper_names{
735 {
"OMNISCI_CSV", DataWrapperType::CSV},
736 {
"OMNISCI_PARQUET", DataWrapperType::PARQUET},
737 {
"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
738 {
"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
739 {
"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
740 {
"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
744 for (
const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
746 "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
747 "data_wrapper_type = ?",
748 std::vector<std::string>{new_wrapper_name, old_wrapper_name});
753 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
755 LOG(
INFO) << migration_name <<
" migration completed.";
756 }
catch (std::exception& e) {
768 }
catch (
const std::exception& e) {
776 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
777 "omnisci_foreign_servers(id integer primary key, name text unique, " +
778 "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
783 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
784 "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
785 "options text, last_refresh_time integer, next_refresh_time integer, " +
786 "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
787 "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
791 return "CREATE TABLE " + (if_not_exists ? std::string{
"IF NOT EXISTS "} :
"") +
792 "omnisci_custom_expressions(id integer primary key, name text, " +
793 "expression_json text, data_source_type text, " +
794 "data_source_id integer, is_deleted boolean)";
800 std::vector<DBObject> objects;
803 "SELECT name FROM sqlite_master WHERE type='table' AND "
804 "name='mapd_record_ownership_marker'");
831 "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
834 static const std::map<const DBObjectType, const AccessPrivileges>
835 object_level_all_privs_lookup{
845 auto _key_place = [&key](
auto type) {
849 for (
auto& it : object_level_all_privs_lookup) {
850 objects.emplace_back(_key_place(it.first), it.second, db.
dbOwner);
857 "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
860 for (
size_t r = 0; r < numRows; ++r) {
879 objects.push_back(obj);
885 string tableQuery(
"SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
888 for (
size_t r = 0; r < numRows; ++r) {
904 objects.push_back(obj);
907 }
catch (
const std::exception& e) {
917 }
catch (
const std::exception& e) {
918 LOG(
ERROR) <<
" Issue during migration of DB " <<
name() <<
" issue was " << e.what();
919 throw std::runtime_error(
" Issue during migration of DB " +
name() +
" issue was " +
937 std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
938 std::vector<std::string> dashboard_ids;
939 static const std::string migration_name{
"dashboard_roles_migration"};
947 "select * from mapd_version_history where migration_history = '" +
948 migration_name +
"'");
954 LOG(
INFO) <<
"Performing dashboard internal roles Migration.";
968 }
catch (
const std::exception& e) {
975 const auto active_grantees =
980 for (
auto dash : dashboards) {
985 auto result = active_grantees.find(dash.first);
986 if (
result != active_grantees.end()) {
996 "select * from mapd_version_history where migration_history = '" +
997 migration_name +
"'");
1002 "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
1004 }
catch (
const std::exception& e) {
1005 LOG(
ERROR) <<
"Failed to create dashboard system roles during migration: "
1009 LOG(
INFO) <<
"Successfully created dashboard system roles during migration.";
1044 std::map<int32_t, std::string> user_name_by_user_id;
1045 for (
const auto& user : users) {
1046 user_name_by_user_id[user.userId] = user.userName;
1048 return user_name_by_user_id;
1053 const std::map<int32_t, std::string>& user_name_by_user_id) {
1054 auto entry = user_name_by_user_id.find(
id);
1055 if (entry != user_name_by_user_id.end()) {
1056 return entry->second;
1065 if (column_type.is_dict_encoded_string() ||
1066 column_type.is_subtype_dict_encoded_string()) {
1073 std::string dictQuery(
1074 "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1077 for (
size_t r = 0; r < numRows; ++r) {
1087 dict_ref, dictName, dictNBits, is_shared, refcount, fname,
false);
1108 std::list<ColumnDescriptor*> original_cds;
1111 original_td = it1->second;
1112 if (dynamic_cast<foreign_storage::ForeignTable*>(original_td)) {
1123 CHECK_EQ(original_td, it2->second);
1130 for (
int column_id = 0; column_id < original_td->
nColumns; ++column_id) {
1134 original_cds.push_back(original_cd);
1142 td = createTableFromDiskUnlocked(table_id);
1143 }
catch (
const NoTableFoundException& e) {
1148 if (
auto tableDescIt = tableDescriptorMapById_.find(table_id);
1149 tableDescIt != tableDescriptorMapById_.end()) {
1150 tableDescIt->second->fragmenter =
nullptr;
1151 delete tableDescIt->second;
1155 auto cds = sqliteGetColumnsForTableUnlocked(table_id);
1159 td->mutex_ = original_td->mutex_;
1161 original_td =
nullptr;
1166 original_cds.clear();
1167 tableDescriptorMap_[
to_upper(td->tableName)] = td;
1168 tableDescriptorMapById_[td->tableId] = td;
1169 int32_t skip_physical_cols = 0;
1173 if (skip_physical_cols <= 0) {
1174 skip_physical_cols = cd->columnType.get_physical_cols();
1177 if (cd->isDeletedCol) {
1178 td->hasDeletedCol =
true;
1179 setDeletedColumnUnlocked(td, cd);
1180 }
else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1181 td->columnIdBySpi_.push_back(cd->columnId);
1186 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
1190 void Catalog::reloadCatalogMetadata(
1191 const std::map<int32_t, std::string>& user_name_by_user_id) {
// Refreshes the in-memory catalog caches from the SQLite catalog tables.
// @param user_name_by_user_id user id -> user name lookup; forwarded to
//        buildDashboardsMapUnlocked().
// NOTE(review): the "Unlocked" suffix presumably means the caller already holds
// the catalog locks -- confirm against the locking wrapper.
1198 void Catalog::reloadCatalogMetadataUnlocked(
1199 const std::map<int32_t, std::string>& user_name_by_user_id) {
// Collect every table id currently present in mapd_tables.
1209 std::set<int> cluster_table_ids;
1210 std::string tableQuery(
"SELECT tableid from mapd_tables");
1211 sqliteConnector_.query(tableQuery);
1212 auto numRows = sqliteConnector_.getNumRows();
1213 for (
size_t r = 0; r < numRows; ++r) {
1214 const auto table_id = sqliteConnector_.getData<
int>(r, 0);
1215 cluster_table_ids.insert(table_id);
// NOTE(review): how ignored_table_ids gets populated is not visible in this
// excerpt -- confirm before relying on it.
1220 std::set<int> ignored_table_ids;
// Tables to reload: every id from mapd_tables that is not ignored ...
1233 std::set<int> reload_table_ids;
1234 for (
auto const& cluster_table_id : cluster_table_ids) {
1235 if (ignored_table_ids.find(cluster_table_id) == ignored_table_ids.end()) {
1236 reload_table_ids.insert(cluster_table_id);
// ... plus every cached table id that no longer appears in mapd_tables.
1239 for (
auto const& [cached_table_id, td] : tableDescriptorMapById_) {
1240 if (cluster_table_ids.find(cached_table_id) == cluster_table_ids.end()) {
1241 reload_table_ids.insert(cached_table_id);
1246 for (
auto const& reload_table_id : reload_table_ids) {
1247 reloadTableMetadataUnlocked(reload_table_id);
// Drop the dashboard, link, foreign-server and custom-expression caches, then
// rebuild them from SQLite.
1252 dashboardDescriptorMap_.clear();
1253 linkDescriptorMap_.clear();
1254 linkDescriptorMapById_.clear();
1255 foreignServerMap_.clear();
1256 foreignServerMapById_.clear();
1257 custom_expr_map_by_id_.clear();
1260 buildForeignServerMapUnlocked();
1263 updateViewsInMapUnlocked();
1264 buildDashboardsMapUnlocked(user_name_by_user_id);
1265 buildLinksMapUnlocked();
1266 buildCustomExpressionsMapUnlocked();
// Notify calcite about the metadata change for this database; the table
// argument is passed as an empty value.
1270 calciteMgr_->updateMetadata(currentDB_.dbName, {});
// Reads every row of mapd_tables and fills a table descriptor per row, which is
// then registered in tableDescriptorMapById_.
// Result columns are addressed by position in the SELECT list below
// (0 = tableid, 1 = name, ... 17 = storage_type).
1274 void Catalog::buildTablesMapUnlocked() {
1275 std::string tableQuery(
1276 "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1277 "max_chunk_size, frag_page_size, "
1278 "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1279 "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1280 "from mapd_tables");
1281 sqliteConnector_.query(tableQuery);
1282 auto numRows = sqliteConnector_.getNumRows();
1283 for (
size_t r = 0; r < numRows; ++r) {
1285 const auto& storage_type = sqliteConnector_.getData<
string>(r, 17);
// NOTE(review): the condition guarding this fatal log is not visible in this
// excerpt; the message indicates it fires for an unsupported storage type.
1287 const auto table_id = sqliteConnector_.getData<
int>(r, 0);
1288 const auto& table_name = sqliteConnector_.getData<
string>(r, 1);
1289 LOG(
FATAL) <<
"Unable to read Catalog metadata: storage type is currently not a "
1290 "supported table option (table "
1291 << table_name <<
" [" << table_id <<
"] in database "
1292 << currentDB_.dbName <<
").";
// Copy the row's columns into the descriptor fields.
1302 td->
tableId = sqliteConnector_.getData<
int>(r, 0);
1303 td->
tableName = sqliteConnector_.getData<
string>(r, 1);
1304 td->
nColumns = sqliteConnector_.getData<
int>(r, 2);
1305 td->
isView = sqliteConnector_.getData<
bool>(r, 3);
1306 td->
fragments = sqliteConnector_.getData<
string>(r, 4);
1309 td->
maxFragRows = sqliteConnector_.getData<
int>(r, 6);
1310 td->
maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1311 td->
fragPageSize = sqliteConnector_.getData<
int>(r, 8);
1312 td->
maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1313 td->
partitions = sqliteConnector_.getData<
string>(r, 10);
1315 td->
shard = sqliteConnector_.getData<
int>(r, 12);
1316 td->
nShards = sqliteConnector_.getData<
int>(r, 13);
1317 td->
keyMetainfo = sqliteConnector_.getData<
string>(r, 14);
1318 td->
userId = sqliteConnector_.getData<
int>(r, 15);
// Column 16 (sort_column_id) may be NULL; 0 is substituted in that case.
1320 sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<
int>(r, 16);
// Register the descriptor under its table id.
1329 tableDescriptorMapById_[td->
tableId] = td;
// Reads every row of mapd_columns (ordered by table id), fills a column
// descriptor per row, and records column ids in the owning table's
// columnIdBySpi_ list.
// Requires tableDescriptorMapById_ to already contain every referenced table
// (enforced by the CHECK below).
1333 void Catalog::buildColumnsMapUnlocked() {
1334 std::string columnQuery(
1335 "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1336 "is_notnull, compression, comp_param, "
1337 "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1338 "default_value from "
1339 "mapd_columns ORDER BY tableid, "
1341 sqliteConnector_.query(columnQuery);
1342 auto numRows = sqliteConnector_.getNumRows();
// NOTE(review): presumably counts the physical columns that follow a column and
// should be skipped; the update logic is only partly visible -- confirm.
1343 int32_t skip_physical_cols = 0;
1344 for (
size_t r = 0; r < numRows; ++r) {
// Result columns are addressed by position in the SELECT list above.
1346 cd->
tableId = sqliteConnector_.getData<
int>(r, 0);
1347 cd->
columnId = sqliteConnector_.getData<
int>(r, 1);
1348 cd->
columnName = sqliteConnector_.getData<
string>(r, 2);
1357 cd->
chunks = sqliteConnector_.getData<
string>(r, 11);
1358 cd->
isSystemCol = sqliteConnector_.getData<
bool>(r, 12);
1359 cd->
isVirtualCol = sqliteConnector_.getData<
bool>(r, 13);
1360 cd->
virtualExpr = sqliteConnector_.getData<
string>(r, 14);
1361 cd->
isDeletedCol = sqliteConnector_.getData<
bool>(r, 15);
// Column 16 (default_value) is nullable, so NULL is checked before the value
// is read into the optional.
1362 if (sqliteConnector_.isNull(r, 16)) {
1365 cd->
default_value = std::make_optional(sqliteConnector_.getData<
string>(r, 16));
1368 cd->
db_id = getDatabaseId();
1372 if (skip_physical_cols <= 0) {
// The owning table must already be cached.
1376 auto td_itr = tableDescriptorMapById_.find(cd->
tableId);
1377 CHECK(td_itr != tableDescriptorMapById_.end());
// Flags the owning table as having a deleted-column and registers that column.
1380 td_itr->second->hasDeletedCol =
true;
1381 setDeletedColumnUnlocked(td_itr->second, cd);
1383 tableDescriptorMapById_[cd->
tableId]->columnIdBySpi_.push_back(cd->
columnId);
// Sort each table's columnIdBySpi_ list into ascending order.
1388 for (
auto& tit : tableDescriptorMapById_) {
1389 std::sort(tit.second->columnIdBySpi_.begin(),
1390 tit.second->columnIdBySpi_.end(),
1391 [](
const size_t a,
const size_t b) ->
bool {
return a < b; });
1396 std::string viewQuery(
"SELECT sql FROM mapd_views where tableid = " +
1398 sqliteConnector_.query(viewQuery);
1399 auto num_rows = sqliteConnector_.getNumRows();
1400 CHECK_EQ(num_rows, 1U) <<
"Expected single entry in mapd_views for view '"
1401 << td.
tableName <<
"', instead got " << num_rows;
1402 td.
viewSQL = sqliteConnector_.getData<
string>(0, 0);
// Loads the SQL text of every view from mapd_views into the matching cached
// table descriptor and clears that descriptor's fragmenter.
1405 void Catalog::updateViewsInMapUnlocked() {
1406 std::string viewQuery(
"SELECT tableid, sql FROM mapd_views");
1407 sqliteConnector_.query(viewQuery);
1408 auto numRows = sqliteConnector_.getNumRows();
1409 for (
size_t r = 0; r < numRows; ++r) {
1410 auto tableId = sqliteConnector_.getData<
int>(r, 0);
// NOTE(review): operator[] is used without a presence check; if the container
// is a std::map and the id is absent, this would insert a null entry that is
// dereferenced on the next line -- confirm mapd_views can never reference a
// table id that is missing from the cache.
1411 auto td = tableDescriptorMapById_[tableId];
1412 td->viewSQL = sqliteConnector_.getData<
string>(r, 1);
1413 td->fragmenter =
nullptr;
// Reads every row of mapd_dashboards into a shared DashboardDescriptor and
// caches it in dashboardDescriptorMap_.
// @param user_name_by_user_id user id -> user name lookup.
// NOTE(review): the use of user_name_by_user_id is not visible in this excerpt.
1417 void Catalog::buildDashboardsMapUnlocked(
1418 const std::map<int32_t, std::string>& user_name_by_user_id) {
// update_time is formatted by SQLite as an ISO-8601 style string with a
// trailing 'Z'.
1419 std::string frontendViewQuery(
1420 "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1423 "FROM mapd_dashboards");
1424 sqliteConnector_.query(frontendViewQuery);
1425 auto numRows = sqliteConnector_.getNumRows();
1426 for (
size_t r = 0; r < numRows; ++r) {
1427 auto vd = std::make_shared<DashboardDescriptor>();
1428 vd->dashboardId = sqliteConnector_.getData<
int>(r, 0);
1429 vd->dashboardState = sqliteConnector_.getData<
string>(r, 1);
1430 vd->dashboardName = sqliteConnector_.getData<
string>(r, 2);
1431 vd->imageHash = sqliteConnector_.getData<
string>(r, 3);
1432 vd->updateTime = sqliteConnector_.getData<
string>(r, 4);
1433 vd->userId = sqliteConnector_.getData<
int>(r, 5);
1434 vd->dashboardMetadata = sqliteConnector_.getData<
string>(r, 6);
// NOTE(review): the call receiving these arguments is not visible here; it is
// given the database id and the dashboard id (column 0 read as a string).
1437 std::to_string(currentDB_.dbId), sqliteConnector_.getData<
string>(r, 0));
// Cache key is "<userId>:<dashboardName>".
1438 dashboardDescriptorMap_[
std::to_string(vd->userId) +
":" + vd->dashboardName] = vd;
// Reads link rows from SQLite into link descriptors and caches each one in
// both linkDescriptorMap_ and linkDescriptorMapById_.
1442 void Catalog::buildLinksMapUnlocked() {
// update_time is formatted by SQLite as an ISO-8601 style string with a
// trailing 'Z'.
1443 std::string linkQuery(
1444 "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1445 "update_time), view_metadata "
1447 sqliteConnector_.query(linkQuery);
1448 auto numRows = sqliteConnector_.getNumRows();
1449 for (
size_t r = 0; r < numRows; ++r) {
// Result columns are addressed by position in the SELECT list above.
1451 ld->linkId = sqliteConnector_.getData<
int>(r, 0);
1452 ld->userId = sqliteConnector_.getData<
int>(r, 1);
1453 ld->link = sqliteConnector_.getData<
string>(r, 2);
1454 ld->viewState = sqliteConnector_.getData<
string>(r, 3);
1455 ld->updateTime = sqliteConnector_.getData<
string>(r, 4);
1456 ld->viewMetadata = sqliteConnector_.getData<
string>(r, 5);
// Keyed by "<dbId><link>" (no separator) and, separately, by link id.
1457 linkDescriptorMap_[
std::to_string(currentDB_.dbId) + ld->link] = ld;
1458 linkDescriptorMapById_[ld->linkId] = ld;
// Populates logicalToPhysicalTableMapById_ from mapd_logical_to_physical,
// grouping physical table ids under their logical table id.
1462 void Catalog::buildLogicalToPhysicalMapUnlocked() {
1464 std::string logicalToPhysicalTableMapQuery(
1465 "SELECT logical_table_id, physical_table_id "
1466 "FROM mapd_logical_to_physical");
1467 sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1468 auto numRows = sqliteConnector_.getNumRows();
1469 for (
size_t r = 0; r < numRows; ++r) {
1470 auto logical_tb_id = sqliteConnector_.getData<
int>(r, 0);
1471 auto physical_tb_id = sqliteConnector_.getData<
int>(r, 1);
1472 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
// First physical table seen for this logical table: create a new entry.
1473 if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1475 std::vector<int32_t> physicalTables{physical_tb_id};
1477 logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1478 CHECK(it_ok.second);
// Otherwise append to the existing list for this logical table.
1481 physicalTableIt->second.push_back(physical_tb_id);
// Builds all in-memory catalog caches by calling the individual build/update
// helpers in sequence.
// NOTE(review): the order looks significant -- buildColumnsMapUnlocked() CHECKs
// that each column's table is already cached, so buildTablesMapUnlocked() has
// to run first. Do not reorder without confirming the other dependencies.
1489 void Catalog::buildMaps() {
1498 buildDictionaryMapUnlocked();
1499 buildTablesMapUnlocked();
1502 buildForeignServerMapUnlocked();
1503 updateForeignTablesInMapUnlocked();
1506 buildColumnsMapUnlocked();
1507 updateViewsInMapUnlocked();
1508 buildDashboardsMapUnlocked(user_name_by_user_id);
1509 buildLinksMapUnlocked();
1510 buildLogicalToPhysicalMapUnlocked();
1511 buildCustomExpressionsMapUnlocked();
// Loads every row of omnisci_custom_expressions and caches the resulting
// CustomExpression objects in custom_expr_map_by_id_, keyed by expression id.
1514 void Catalog::buildCustomExpressionsMapUnlocked() {
1515 sqliteConnector_.query(
1516 "SELECT id, name, expression_json, data_source_type, data_source_id, "
1518 "FROM omnisci_custom_expressions");
1519 auto num_rows = sqliteConnector_.getNumRows();
1520 for (
size_t row = 0; row < num_rows; row++) {
// The id is read before std::move on the right-hand side transfers ownership
// into the map.
1521 auto custom_expr = getCustomExpressionFromConnector(row);
1522 custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
// Builds a CustomExpression from one row of the most recent sqliteConnector_
// query result.
// @param row zero-based row index into the current result set.
// @return newly allocated CustomExpression owned by the caller.
// Expects the result columns in this order: id, name, expression_json,
// data_source_type, data_source_id, then a boolean read into is_deleted.
1526 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(
size_t row) {
1527 auto id = sqliteConnector_.getData<
int>(row, 0);
1528 auto name = sqliteConnector_.getData<
string>(row, 1);
1529 auto expression_json = sqliteConnector_.getData<
string>(row, 2);
1530 auto data_source_type_str = sqliteConnector_.getData<
string>(row, 3);
1531 auto data_source_id = sqliteConnector_.getData<
int>(row, 4);
1532 auto is_deleted = sqliteConnector_.getData<
bool>(row, 5);
// The textual data source type is converted with
// CustomExpression::dataSourceTypeFromString() before construction.
1533 return std::make_unique<CustomExpression>(
1537 CustomExpression::dataSourceTypeFromString(data_source_type_str),
1543 const list<ColumnDescriptor>& columns,
1544 const list<DictDescriptor>& dicts) {
1549 if (foreign_table) {
1551 *new_foreign_table = *foreign_table;
1552 new_td = new_foreign_table;
1558 new_td->
mutex_ = std::make_shared<std::mutex>();
1560 tableDescriptorMapById_[td->
tableId] = new_td;
1561 for (
auto cd : columns) {
1564 addToColumnMap(new_cd);
1567 if (cd.isDeletedCol) {
1569 setDeletedColumnUnlocked(new_td, new_cd);
1575 [](
const size_t a,
const size_t b) ->
bool {
return a < b; });
1579 std::unique_ptr<StringDictionaryClient> client;
1580 DictRef dict_ref(currentDB_.dbId, -1);
1581 if (!string_dict_hosts_.empty()) {
1584 for (
auto dd : dicts) {
1585 if (!dd.dictRef.dictId) {
1589 dict_ref.
dictId = dd.dictRef.dictId;
1591 client->create(dict_ref, dd.dictIsTemp);
1594 dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1595 if (!dd.dictIsTemp) {
// Removes a table and its columns (and, where applicable, its dictionaries)
// from the in-memory catalog caches.
// @param tableName   table name; upper-cased to form the by-name map key.
// @param is_on_error NOTE(review): its effect is not visible in this excerpt --
//                    presumably relaxes consistency checks during error
//                    cleanup; confirm.
1601 void Catalog::removeTableFromMap(
const string& tableName,
1603 const bool is_on_error) {
1605 TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1606 if (tableDescIt == tableDescriptorMapById_.end()) {
// Drop the table's entries from the deleted-column, by-id, by-name and
// dictionary-column caches.
1613 const auto ret = deletedColumnPerTable_.erase(td);
1617 tableDescriptorMapById_.erase(tableDescIt);
1618 tableDescriptorMap_.erase(
to_upper(tableName));
1620 dict_columns_by_table_id_.erase(tableId);
// A string dictionary client is prepared only when this node is an aggregator.
1625 std::unique_ptr<StringDictionaryClient> client;
1626 if (SysCatalog::instance().isAggregator()) {
1627 CHECK(!string_dict_hosts_.empty());
1628 DictRef dict_ref(currentDB_.dbId, -1);
// Walk the column cache; entries belonging to other tables are left alone.
// The iterator is advanced (cit++) before the matching entry is erased below.
1634 for (
auto cit = columnDescriptorMapById_.begin();
1635 cit != columnDescriptorMapById_.end();) {
1636 if (tableId != std::get<0>(cit->first)) {
1639 int i = std::get<1>(cit++->first);
1641 ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1643 columnDescriptorMapById_.erase(colDescIt);
1645 columnDescriptorMap_.erase(cnameKey);
// Dictionary cleanup for the column's dictionary reference.
1651 DictRef dict_ref(currentDB_.dbId, dictId);
1652 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1657 CHECK(dictIt != dictDescriptorMapByRef_.end());
1659 if (dictIt == dictDescriptorMapByRef_.end()) {
1663 const auto& dd = dictIt->second;
// Once the refcount is zero the dictionary is released and its cache entry
// erased.
// NOTE(review): the refcount decrement is not visible in this excerpt.
1666 if (!dd->refcount) {
1667 dd->stringDict.reset();
1672 client->drop(dict_ref);
1674 dictDescriptorMapByRef_.erase(dictIt);
1685 addFrontendViewToMapNoLock(vd);
1691 std::make_shared<DashboardDescriptor>(vd);
// Builds the DBObject list for the tables/views that a dashboard references.
// @param view_meta dashboard metadata string.
//                  NOTE(review): its parsing is not visible in this excerpt.
// @param user_id   passed as the last constructor argument of each DBObject.
// @return DBObjects for the sources that still exist in the current database.
1694 std::vector<DBObject> Catalog::parseDashboardObjects(
const std::string& view_meta,
1695 const int& user_id) {
1696 std::vector<DBObject> objects;
1698 key.
dbId = currentDB_.dbId;
1699 auto _key_place = [&key](
auto type,
auto id) {
// Look up the source without instantiating its fragmenter
// (populateFragmenter = false).
1705 auto td = getMetadataForTable(object_name,
false);
// A source that no longer exists is logged rather than treated as an error.
1709 LOG(
INFO) <<
"Ignoring dashboard source Table/View: " << object_name
1710 <<
" no longer exists in current DB.";
1717 objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1719 objects.back().setName(td->tableName);
// Creates the dashboard's system role, or updates the privileges of an
// existing one, so that it matches the objects the dashboard references.
// @param view_meta      dashboard metadata, passed to parseDashboardObjects().
// @param user_id        passed to parseDashboardObjects().
// @param dash_role_name name of the role to create or update.
1724 void Catalog::createOrUpdateDashboardSystemRole(
const std::string& view_meta,
1725 const int32_t& user_id,
1726 const std::string& dash_role_name) {
1727 auto objects = parseDashboardObjects(view_meta, user_id);
1728 Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
// Creation path: create the role and grant it privileges on all objects.
// NOTE(review): the branch condition on `rl` is not visible in this excerpt.
1733 SysCatalog::instance().createRole(
1734 dash_role_name,
false,
false);
1735 SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *
this);
// Update path: collect keys from the role's existing objects that have no
// match in the new object list, then revoke them.
1739 std::set<DBObjectKey> revoke_keys;
1741 for (
auto key : *ex_objects | boost::adaptors::map_keys) {
1747 for (
auto obj : objects) {
1748 found = key == obj.getObjectKey() ?
true :
false;
1754 revoke_keys.insert(key);
1757 for (
auto& key : revoke_keys) {
1761 SysCatalog::instance().revokeDBObjectPrivileges(
// Finally grant privileges on the current object list.
1766 SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *
this);
1775 linkDescriptorMapById_[ld.
linkId] = new_ld;
1783 vector<Chunk> chunkVec;
1784 auto columnDescs = getAllColumnMetadataForTable(td->
tableId,
true,
false,
true);
1785 Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1788 td->
fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1800 td->
fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1814 LOG(
INFO) <<
"Instantiating Fragmenter for table " << td->
tableName <<
" took "
1819 const std::string& tableName)
const {
1820 auto tableDescIt = tableDescriptorMap_.find(
to_upper(tableName));
1821 if (tableDescIt == tableDescriptorMap_.end()) {
1828 const std::string& tableName)
const {
1830 return getForeignTableUnlocked(tableName);
// Looks up a table descriptor by name.
// @param tableName          name of the table to look up.
// @param populateFragmenter when true, instantiate the table's fragmenter if it
//                           is not yet set and the table is not a view.
// NOTE(review): the return statements and the not-found handling are not
// visible in this excerpt.
1833 const TableDescriptor* Catalog::getMetadataForTable(
const string& tableName,
1834 const bool populateFragmenter)
const {
1838 auto td = getMutableMetadataForTableUnlocked(tableName);
1843 if (populateFragmenter) {
// The descriptor's own mutex is held around the check and the instantiation.
1844 std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1845 if (td->fragmenter ==
nullptr && !td->isView) {
1846 instantiateFragmenter(td);
1853 bool populateFragmenter)
const {
1855 auto td = getMutableMetadataForTableUnlocked(table_id);
1860 if (populateFragmenter) {
1861 std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1862 if (td->fragmenter ==
nullptr && !td->isView) {
1863 instantiateFragmenter(td);
// Returns the name of the table with the given id.
// @param table_id id of the table to look up.
// NOTE(review): the not-found branch is not visible in this excerpt; given the
// optional return type it presumably yields an empty optional -- confirm.
1869 std::optional<std::string> Catalog::getTableName(int32_t table_id)
const {
1871 auto td = getMutableMetadataForTableUnlocked(table_id);
1875 return td->tableName;
// Returns the id of the table with the given name.
// @param table_name name of the table to look up.
// NOTE(review): the return statements are not visible in this excerpt; given
// the optional return type, a missing table presumably yields an empty
// optional -- confirm.
1878 std::optional<int32_t> Catalog::getTableId(
const std::string& table_name)
const {
1880 auto td = getMutableMetadataForTableUnlocked(table_name);
1888 const std::string& table_name)
const {
1889 auto it = tableDescriptorMap_.find(
to_upper(table_name));
1890 if (it == tableDescriptorMap_.end()) {
1897 auto tableDescIt = tableDescriptorMapById_.find(table_id);
1898 if (tableDescIt == tableDescriptorMapById_.end()) {
1901 return tableDescIt->second;
1905 const bool load_dict)
const {
1907 const DictRef dictRef(currentDB_.dbId, dict_id);
1908 auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1910 dictDescriptorMapByRef_.end()) {
1913 auto& dd = dictDescIt->second;
1917 if (!dd->stringDict) {
1919 if (string_dict_hosts_.empty()) {
1920 if (dd->dictIsTemp) {
1921 dd->stringDict = std::make_shared<StringDictionary>(
1924 dd->stringDict = std::make_shared<StringDictionary>(
1929 std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1932 LOG(
INFO) <<
"Time to load Dictionary " << dd->dictRef.dbId <<
"_"
1933 << dd->dictRef.dictId <<
" was " << time_ms <<
"ms";
// Returns a const reference to the string dictionary host list held in
// string_dict_hosts_.
1940 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts()
const {
1941 return string_dict_hosts_;
1945 const string& columnName)
const {
1949 auto colDescIt = columnDescriptorMap_.find(columnKey);
1951 columnDescriptorMap_.end()) {
1954 return colDescIt->second;
1960 auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
1961 if (colDescIt == columnDescriptorMapById_
1965 return colDescIt->second;
// Returns the name of the column identified by (table_id, column_id), looked
// up in columnDescriptorMapById_.
// @param table_id  id of the owning table.
// @param column_id id of the column within that table.
// NOTE(review): the not-found branch body is not visible in this excerpt;
// presumably it returns an empty optional -- confirm.
1968 const std::optional<std::string> Catalog::getColumnName(
int table_id,
1969 int column_id)
const {
1971 auto it = columnDescriptorMapById_.find(
ColumnIdKey{table_id, column_id});
1972 if (it == columnDescriptorMapById_.end()) {
1975 return it->second->columnName;
// Translates `spi` into a column id using the table's columnIdBySpi_ list.
// @param table_id id of the table; CHECK-fails if it is not cached.
// @param spi      index value to translate.
// NOTE(review): the derivation of `spx` and `phi` from `spi` is not visible in
// this excerpt -- confirm before documenting the encoding.
1978 const int Catalog::getColumnIdBySpiUnlocked(
const int table_id,
const size_t spi)
const {
1979 const auto tabDescIt = tableDescriptorMapById_.find(table_id);
1980 CHECK(tableDescriptorMapById_.end() != tabDescIt);
1981 const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
// spx is treated as 1-based: it must be within [1, size], and entry spx - 1 is
// read.
1991 CHECK(0 < spx && spx <= columnIdBySpi.size())
1992 <<
"spx = " << spx <<
", size = " << columnIdBySpi.size();
1993 return columnIdBySpi[spx - 1] + phi;
// Public entry point that forwards to getColumnIdBySpiUnlocked().
// @param table_id id of the table.
// @param spi      index value to translate into a column id.
// NOTE(review): presumably takes a catalog read lock before delegating; that
// statement is not visible in this excerpt -- confirm.
1996 const int Catalog::getColumnIdBySpi(
const int table_id,
const size_t spi)
const {
1998 return getColumnIdBySpiUnlocked(table_id, spi);
2002 const size_t spi)
const {
2005 const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
2007 const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2008 return columnDescriptorMapById_.end() == colDescIt ?
nullptr : colDescIt->second;
// Deletes the dashboards named by dashboard_ids.
// Every id is validated first: ids with no dashboard metadata and ids for which
// the privilege check fails are collected, and a std::runtime_error listing both
// groups is thrown before any deletion statement is issued.
2011 void Catalog::deleteMetadataForDashboards(
const std::vector<int32_t> dashboard_ids,
2013 std::stringstream invalid_ids, restricted_ids;
// Validation pass over all requested ids.
2015 for (int32_t dashboard_id : dashboard_ids) {
2016 if (!getMetadataForDashboard(dashboard_id)) {
// Unknown id: append to the comma-separated list of invalid ids.
2017 invalid_ids << (!invalid_ids.str().empty() ?
", " :
"") << dashboard_id;
2021 object.loadKey(*
this);
2023 std::vector<DBObject> privs = {
object};
// Privilege check failed: append to the comma-separated list of restricted ids.
2024 if (!SysCatalog::instance().checkPrivileges(user, privs)) {
2025 restricted_ids << (!restricted_ids.str().empty() ?
", " :
"") << dashboard_id;
// Report every validation failure in a single exception.
2029 if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
2030 std::stringstream error_message;
2031 error_message <<
"Delete dashboard(s) failed with error(s):";
2032 if (invalid_ids.str().size() > 0) {
2033 error_message <<
"\nDashboard id: " << invalid_ids.str()
2034 <<
" - Dashboard id does not exist";
2036 if (restricted_ids.str().size() > 0) {
2038 <<
"\nDashboard id: " << restricted_ids.str()
2039 <<
" - User should be either owner of dashboard or super user to delete it";
2041 throw std::runtime_error(error_message.str());
// Revoke privileges on the dashboard objects in one batch call.
2043 std::vector<DBObject> dash_objs;
2045 for (int32_t dashboard_id : dashboard_ids) {
2049 SysCatalog::instance().revokeDBObjectPrivilegesFromAllBatch(dash_objs,
this);
// Remove the dashboards from the in-memory map and from mapd_dashboards
// inside a single SQLite transaction.
2054 sqliteConnector_.query(
"BEGIN TRANSACTION");
2056 for (int32_t dashboard_id : dashboard_ids) {
2057 auto dash = getMetadataForDashboard(dashboard_id);
2061 throw std::runtime_error(
2062 std::string(
"Delete dashboard(s) failed with error(s):\nDashboard id: ") +
2063 std::to_string(dashboard_id) +
" - Dashboard id does not exist ");
2066 std::string dash_name = dash->dashboardName;
// The in-memory map is keyed by "<user_id>:<dashboard name>".
// NOTE(review): the iterator is erased with no visible end() check - confirm
// the key is guaranteed to be present at this point.
2067 auto viewDescIt = dashboardDescriptorMap_.find(user_id +
":" + dash_name);
2068 dashboardDescriptorMap_.erase(viewDescIt);
2069 sqliteConnector_.query_with_text_params(
2070 "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
2071 std::vector<std::string>{dash_name, user_id});
// Any exception in the deletion loop rolls the SQLite transaction back.
2073 }
catch (std::exception& e) {
2074 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2077 sqliteConnector_.query(
"END TRANSACTION");
2082 const string& userId,
2083 const string& dashName)
const {
2086 auto viewDescIt = dashboardDescriptorMap_.find(userId +
":" + dashName);
2087 if (viewDescIt == dashboardDescriptorMap_.end()) {
2090 return viewDescIt->second.get();
2099 for (
auto descp : dashboardDescriptorMap_) {
2100 auto dash = descp.second.get();
2101 if (dash->dashboardId ==
id) {
2103 name = dash->dashboardName;
2110 return getMetadataForDashboard(userId, name);
// Looks up a link descriptor in linkDescriptorMap_ using the link string as key
// and returns the mapped descriptor pointer when found.
// NOTE(review): the not-found branch body is not shown here; presumably it
// returns nullptr - confirm.
2115 const LinkDescriptor* Catalog::getMetadataForLink(
const string& link)
const {
2117 auto linkDescIt = linkDescriptorMap_.find(link);
2118 if (linkDescIt == linkDescriptorMap_.end()) {
2121 return linkDescIt->second;
2126 auto linkDescIt = linkDescriptorMapById_.find(linkId);
2127 if (linkDescIt == linkDescriptorMapById_.end()) {
2130 return linkDescIt->second;
2135 const auto table = getMutableMetadataForTableUnlocked(table_id);
2138 CHECK(foreign_table);
2139 return foreign_table;
// Appends to columnDescriptors the descriptors from columnDescriptorMapById_
// that belong to table td, filtered by the three fetch flags.
//   fetchSystemColumns   - when false, descriptors with isSystemCol set are filtered.
//   fetchVirtualColumns  - when false, descriptors with isVirtualCol set are filtered.
//   fetchPhysicalColumns - when false, the entries following a column are
//                          filtered according to its get_physical_cols() count.
2142 void Catalog::getAllColumnMetadataForTableImpl(
2144 list<const ColumnDescriptor*>& columnDescriptors,
2145 const bool fetchSystemColumns,
2146 const bool fetchVirtualColumns,
2147 const bool fetchPhysicalColumns)
const {
// Number of upcoming map entries still to be treated as physical columns.
2148 int32_t skip_physical_cols = 0;
2149 for (
const auto& columnDescriptor : columnDescriptorMapById_) {
2150 if (!fetchPhysicalColumns && skip_physical_cols > 0) {
2151 --skip_physical_cols;
2154 auto cd = columnDescriptor.second;
// The map holds columns of every table; only td's columns are of interest.
2155 if (cd->tableId != td->
tableId) {
2158 if (!fetchSystemColumns && cd->isSystemCol) {
2161 if (!fetchVirtualColumns && cd->isVirtualCol) {
// Remember how many physical columns this column's type reports.
2164 if (!fetchPhysicalColumns) {
2165 const auto& col_ti = cd->columnType;
2166 skip_physical_cols = col_ti.get_physical_cols();
2168 columnDescriptors.push_back(cd);
// Returns, by value, the list of column descriptors for tableId. The table
// descriptor is obtained via getMutableMetadataForTableUnlocked and the list is
// filled by getAllColumnMetadataForTableImpl using the supplied fetch flags.
2172 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
2174 const bool fetchSystemColumns,
2175 const bool fetchVirtualColumns,
2176 const bool fetchPhysicalColumns)
const {
2178 std::list<const ColumnDescriptor*> columnDescriptors;
2179 const TableDescriptor* td = getMutableMetadataForTableUnlocked(tableId);
2180 getAllColumnMetadataForTableImpl(td,
2183 fetchVirtualColumns,
2184 fetchPhysicalColumns);
2185 return columnDescriptors;
// Collects the descriptor pointer of every entry in tableDescriptorMapById_
// into a list. The pointers refer to the catalog's own descriptors (no copy of
// the descriptors themselves is made here).
2188 list<const TableDescriptor*> Catalog::getAllTableMetadata()
const {
2190 list<const TableDescriptor*> table_list;
2191 for (
auto p : tableDescriptorMapById_) {
2192 table_list.push_back(p.second);
// Builds a vector holding a copy of every TableDescriptor in
// tableDescriptorMapById_. Capacity is reserved up front from the map size.
2197 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy()
const {
2199 std::vector<TableDescriptor>
tables;
2200 tables.reserve(tableDescriptorMapById_.size());
2201 for (
auto table_entry : tableDescriptorMapById_) {
2202 tables.emplace_back(*table_entry.second);
// The copied descriptor's fragmenter member is cleared rather than carried over.
2203 tables.back().fragmenter =
nullptr;
// Collects a raw pointer (via .get()) to every dashboard descriptor held in
// dashboardDescriptorMap_ into a list.
2208 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata()
const {
2210 list<const DashboardDescriptor*> dashboards;
2211 for (
auto dashboard_entry : dashboardDescriptorMap_) {
2212 dashboards.push_back(dashboard_entry.second.get());
// Builds a vector holding a copy of every DashboardDescriptor referenced by
// dashboardDescriptorMap_. Capacity is reserved up front from the map size.
2217 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataCopy()
const {
2219 std::vector<DashboardDescriptor> dashboards;
2220 dashboards.reserve(dashboardDescriptorMap_.size());
2221 for (
auto dashboard_entry : dashboardDescriptorMap_) {
2222 dashboards.emplace_back(*dashboard_entry.second);
2231 sqliteConnector_.query(
"BEGIN TRANSACTION");
2233 ref = addDictionaryNontransactional(cd);
2234 }
catch (std::exception& e) {
2235 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2238 sqliteConnector_.query(
"END TRANSACTION");
2244 const auto& td = *tableDescriptorMapById_[cd.
tableId];
2245 list<DictDescriptor> dds;
2246 setColumnDictionary(cd, dds, td,
true);
2247 auto& dd = dds.back();
2248 CHECK(dd.dictRef.dictId);
2250 std::unique_ptr<StringDictionaryClient> client;
2251 if (!string_dict_hosts_.empty()) {
2253 string_dict_hosts_.front(),
DictRef(currentDB_.dbId, -1),
true));
2256 client->create(dd.dictRef, dd.dictIsTemp);
2260 dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
2261 if (!dd.dictIsTemp) {
2270 sqliteConnector_.query(
"BEGIN TRANSACTION");
2272 delDictionaryNontransactional(cd);
2273 }
catch (std::exception& e) {
2274 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2277 sqliteConnector_.query(
"END TRANSACTION");
2292 const auto td = getMetadataForTable(cd.
tableId,
false);
2294 sqliteConnector_.query_with_text_param(
2295 "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
2297 sqliteConnector_.query_with_text_param(
2298 "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?",
std::to_string(dictId));
2299 const auto refcount = sqliteConnector_.getData<
int>(0, 0);
2300 VLOG(3) <<
"Dictionary " << dictId <<
"from dropped table has reference count "
2305 const DictRef dictRef(currentDB_.dbId, dictId);
2306 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_dictionaries WHERE dictid = ?",
2312 std::unique_ptr<StringDictionaryClient> client;
2313 if (!string_dict_hosts_.empty()) {
2317 client->drop(dictRef);
2320 dictDescriptorMapByRef_.erase(dictRef);
// Returns the dictionary descriptors whose dictName contains the substring
// "<table name>_<column name>_dict" for the given column descriptor cd.
// The table name is resolved from cd's tableId and must exist (CHECK).
2323 std::list<const DictDescriptor*> Catalog::getAllDictionariesWithColumnInName(
2326 std::list<const DictDescriptor*> dds;
2328 auto table_name_opt = getTableName(cd->
tableId);
2329 CHECK(table_name_opt.has_value());
2330 auto table_name = table_name_opt.value();
// Substring match, so the pattern may occur anywhere inside dictName.
2332 for (
const auto& [dkey, dd] : dictDescriptorMapByRef_) {
2333 if (dd->dictName.find(table_name +
"_" + cd->
columnName +
"_dict") !=
2334 std::string::npos) {
2335 dds.push_back(dd.get());
2343 std::map<int, StringDictionary*>& stringDicts) {
2346 CHECK(cit != columnDescriptorMap_.end());
2347 auto& ccd = *cit->second;
2349 if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2355 if (!(ccd.columnType.get_comp_param() > 0)) {
2359 auto dictId = ccd.columnType.get_comp_param();
2360 getMetadataForDict(dictId);
2362 const DictRef dictRef(currentDB_.dbId, dictId);
2363 auto dit = dictDescriptorMapByRef_.find(dictRef);
2364 CHECK(dit != dictDescriptorMapByRef_.end());
2366 CHECK(dit->second.get()->stringDict);
2367 stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
// Sums computeCacheSize() over the string dictionaries in
// dictDescriptorMapByRef_ whose key's dbId matches the current database.
// NOTE(review): the declaration of ret and any null check on the dictionary
// pointer are not visible here - confirm before relying on them.
2370 size_t Catalog::getTotalMemorySizeForDictionariesForDatabase()
const {
2372 for (
auto const& kv : dictDescriptorMapByRef_) {
2373 if (kv.first.dbId == currentDB_.dbId) {
2374 auto dictionary = kv.second.get()->stringDict.get();
2376 ret += dictionary->computeCacheSize();
2387 sqliteConnector_.query(
"BEGIN TRANSACTION");
2389 const auto table_id = cd.
tableId;
2391 auto catalog_cd = getMetadataForColumn(table_id, cd.
columnId);
2393 CHECK(catalog_cd) <<
" can not alter non existing column";
2396 std::vector<BindType> types(11, BindType::TEXT);
2398 types[8] = BindType::NULL_TYPE;
2400 sqliteConnector_.query_with_text_params(
2401 "UPDATE mapd_columns SET "
2410 "default_value = ? "
2411 "WHERE tableid = ? and columnid = ?",
2428 ColumnDescriptorMap::iterator columnDescIt =
2430 CHECK(columnDescIt != columnDescriptorMap_.end());
2431 auto ocd = columnDescIt->second;
2433 updateInColumnMap(ncd, ocd);
2434 }
catch (std::exception& e) {
2435 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2438 sqliteConnector_.query(
"END TRANSACTION");
2443 sqliteConnector_.query_with_text_params(
2444 "SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?",
2446 return sqliteConnector_.getData<
int>(0, 0);
2452 sqliteConnector_.query(
"BEGIN TRANSACTION");
2454 addColumnNontransactional(td, cd);
2455 }
catch (std::exception& e) {
2456 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2459 sqliteConnector_.query(
"END TRANSACTION");
2466 cd.
db_id = getDatabaseId();
2468 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2470 addColumnNontransactional(*shard, shard_cd);
2474 addDictionaryNontransactional(cd);
2478 std::vector<BindType> types(17, BindType::TEXT);
2480 types[16] = BindType::NULL_TYPE;
2482 sqliteConnector_.query_with_text_params(
2483 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2484 "colscale, is_notnull, "
2485 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2486 "is_deletedcol, default_value) "
2488 "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2491 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2511 sqliteConnector_.query_with_text_params(
2512 "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2515 sqliteConnector_.query_with_text_params(
2516 "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2518 cd.
columnId = sqliteConnector_.getData<
int>(0, 0);
2520 ++tableDescriptorMapById_[td.
tableId]->nColumns;
2522 addToColumnMap(ncd);
2523 addColumnDescriptor(ncd);
2524 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
2531 cd.
db_id = getDatabaseId();
2533 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2535 addColumn(*shard, shard_cd);
2539 addDictionaryNontransactional(cd);
2543 std::vector<BindType> types(17, BindType::TEXT);
2545 types[16] = BindType::NULL_TYPE;
2547 sqliteConnector_.query_with_text_params(
2548 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2549 "colscale, is_notnull, "
2550 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2551 "is_deletedcol, default_value) "
2553 "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2556 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2576 sqliteConnector_.query_with_text_params(
2577 "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2580 sqliteConnector_.query_with_text_params(
2581 "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2583 cd.
columnId = sqliteConnector_.getData<
int>(0, 0);
2585 ++tableDescriptorMapById_[td.
tableId]->nColumns;
2587 addToColumnMap(ncd);
2588 columnDescriptorsForRoll.emplace_back(
nullptr, ncd);
2593 dropColumnPolicies(td, cd);
2597 sqliteConnector_.query(
"BEGIN TRANSACTION");
2599 dropColumnNontransactional(td, cd);
2600 }
catch (std::exception& e) {
2601 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
2604 sqliteConnector_.query(
"END TRANSACTION");
2612 sqliteConnector_.query_with_text_params(
2613 "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2616 sqliteConnector_.query_with_text_params(
2617 "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2620 ColumnDescriptorMap::iterator columnDescIt =
2622 CHECK(columnDescIt != columnDescriptorMap_.end());
2624 auto ocd = columnDescIt->second;
2625 removeFromColumnMap(ocd);
2626 --tableDescriptorMapById_[td.
tableId]->nColumns;
2627 removeColumnDescriptor(ocd);
2628 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
2632 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2633 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2635 dropColumnNontransactional(*shard, *shard_cd);
2644 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2645 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2647 dropColumnPolicies(*shard, *shard_cd);
2658 sqliteConnector_.query_with_text_params(
2659 "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2663 sqliteConnector_.query_with_text_params(
2664 "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2667 ColumnDescriptorMap::iterator columnDescIt =
2669 CHECK(columnDescIt != columnDescriptorMap_.end());
2671 columnDescriptorsForRoll.emplace_back(columnDescIt->second,
nullptr);
2672 removeFromColumnMap(columnDescIt->second);
2673 --tableDescriptorMapById_[td.
tableId]->nColumns;
2678 for (
const auto shard : getPhysicalTablesDescriptors(&td)) {
2679 const auto shard_cd = getMetadataForColumn(shard->tableId, cd.
columnId);
2681 dropColumn(*shard, *shard_cd);
2691 auto tabDescIt = tableDescriptorMapById_.find(cd->
tableId);
2692 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2693 auto td = tabDescIt->second;
2694 auto& cd_by_spi = td->columnIdBySpi_;
2695 cd_by_spi.erase(std::remove(cd_by_spi.begin(), cd_by_spi.end(), cd->
columnId),
2698 std::sort(cd_by_spi.begin(), cd_by_spi.end());
2706 auto tabDescIt = tableDescriptorMapById_.find(cd->
tableId);
2707 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2708 auto td = tabDescIt->second;
2709 auto& cd_by_spi = td->columnIdBySpi_;
2711 if (cd_by_spi.end() == std::find(cd_by_spi.begin(), cd_by_spi.end(), cd->
columnId)) {
2714 std::sort(cd_by_spi.begin(), cd_by_spi.end());
// Processes the pending (old descriptor, new descriptor) pairs stored in
// columnDescriptorsForRoll, then clears that container and refreshes Calcite
// metadata for the tables gathered in tds.
// NOTE(review): the branching on `forward` is not visible here; presumably the
// first group of statements below applies the change and the second group
// undoes it - confirm against the full function.
2718 void Catalog::rollLegacy(
const bool forward) {
2720 std::set<const TableDescriptor*> tds;
2722 for (
const auto& cdr : columnDescriptorsForRoll) {
2723 auto ocd = cdr.first;
2724 auto ncd = cdr.second;
// Either side of the pair may be null; use whichever descriptor exists to find the table.
2726 auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2727 CHECK(tableDescriptorMapById_.end() != tabDescIt);
2728 auto td = tabDescIt->second;
2729 auto& vc = td->columnIdBySpi_;
// The old column's dictionary is dropped when there is no new descriptor or
// when the two descriptors' comp_param values differ.
2732 if (
nullptr == ncd ||
2733 ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2734 delDictionaryNontransactional(*ocd);
// Remove the old column id from the table's columnIdBySpi_ vector.
2737 vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
// Add the new column id if it is absent and the column is not a geo physical column.
2743 if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2744 if (!ncd->isGeoPhyCol) {
2745 vc.push_back(ncd->columnId);
2752 addToColumnMap(ocd);
2756 removeFromColumnMap(ncd);
// Mirror of the check above with the roles of old and new swapped.
2757 if (
nullptr == ocd ||
2758 ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2759 delDictionaryNontransactional(*ncd);
2765 columnDescriptorsForRoll.clear();
2768 for (
const auto td : tds) {
2769 calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2775 list<ColumnDescriptor>& columns) {
2777 if (
IS_GEO(col_ti.get_type())) {
2778 switch (col_ti.get_type()) {
2787 col_ti.get_comp_param() == 32) {
2788 unit_size = 4 *
sizeof(int8_t);
2791 unit_size = 8 *
sizeof(int8_t);
2795 columns.push_back(physical_cd_coords);
2809 columns.push_back(physical_cd_coords);
2815 bounds_ti.
set_size(4 *
sizeof(
double));
2817 columns.push_back(physical_cd_bounds);
2830 columns.push_back(physical_cd_coords);
2836 physical_cd_linestring_sizes.
columnType = linestring_sizes_ti;
2837 columns.push_back(physical_cd_linestring_sizes);
2843 bounds_ti.
set_size(4 *
sizeof(
double));
2845 columns.push_back(physical_cd_bounds);
2858 columns.push_back(physical_cd_coords);
2864 physical_cd_ring_sizes.
columnType = ring_sizes_ti;
2865 columns.push_back(physical_cd_ring_sizes);
2871 bounds_ti.
set_size(4 *
sizeof(
double));
2873 columns.push_back(physical_cd_bounds);
2886 columns.push_back(physical_cd_coords);
2892 physical_cd_ring_sizes.
columnType = ring_sizes_ti;
2893 columns.push_back(physical_cd_ring_sizes);
2899 physical_cd_poly_rings.
columnType = poly_rings_ti;
2900 columns.push_back(physical_cd_poly_rings);
2906 bounds_ti.
set_size(4 *
sizeof(
double));
2908 columns.push_back(physical_cd_bounds);
2915 throw runtime_error(
"Unrecognized geometry type.");
2923 auto timing_type_entry =
2925 CHECK(timing_type_entry != foreign_table.
options.end());
2926 if (timing_type_entry->second ==
// Creates table td with the columns in cols.
// Visible flow: copy and expand the column list (geo columns are expanded via
// expandGeoColumn, extra columns are appended), stamp every column with the
// database id, then either insert rows into mapd_tables / mapd_columns (plus
// mapd_views or omnisci_foreign_tables) inside a SQLite transaction, or assign
// a temporary table id and serialize the table to JSON. Finally the table is
// added to the in-memory maps and Calcite metadata is updated.
// NOTE(review): the conditions selecting between these paths are not visible
// here; the description follows only the statements shown.
2935 void Catalog::createTable(
2937 const list<ColumnDescriptor>& cols,
2938 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2939 bool isLogicalTable) {
2941 list<ColumnDescriptor> cds = cols;
2942 list<DictDescriptor> dds;
2943 std::set<std::string> toplevel_column_names;
2944 list<ColumnDescriptor> columns;
2949 throw std::runtime_error(
"Only temporary tables can be backed by foreign storage.");
2951 dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
// Build the full column list; the name "rowid" is rejected for user columns.
2954 for (
auto cd : cds) {
2955 if (cd.columnName ==
"rowid") {
2956 throw std::runtime_error(
2957 "Cannot create column with name rowid. rowid is a system defined column.");
2959 columns.push_back(cd);
2960 toplevel_column_names.insert(cd.columnName);
// Geometry columns contribute additional columns to the list.
2961 if (cd.columnType.is_geometry()) {
2962 expandGeoColumn(cd, columns);
2972 #ifdef MATERIALIZED_ROWID
2976 cd.
virtualExpr =
"MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2978 columns.push_back(cd);
2989 columns.push_back(cd_del);
2992 for (
auto& column : columns) {
2993 column.db_id = getDatabaseId();
// Persistent path: record the table in SQLite under a transaction.
2999 sqliteConnector_.query(
"BEGIN TRANSACTION");
3002 sqliteConnector_.query_with_text_params(
3003 R
"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
// Read back the id SQLite assigned to the new table row.
3025 sqliteConnector_.query_with_text_param(
3026 "SELECT tableid FROM mapd_tables WHERE name = ?", td.
tableName);
3027 td.
tableId = sqliteConnector_.getData<
int>(0, 0);
3029 for (
auto cd : columns) {
// Columns not resolved through a shared dictionary get their own dictionary.
3031 const bool is_foreign_col =
3032 setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3033 if (!is_foreign_col) {
3040 auto use_temp_dictionary =
false;
3041 setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
3045 if (toplevel_column_names.count(cd.
columnName)) {
// 17 bind slots default to TEXT; the last one (default_value) is set to NULL_TYPE here.
3052 std::vector<BindType> types(17, BindType::TEXT);
3054 types[16] = BindType::NULL_TYPE;
3056 sqliteConnector_.query_with_text_params(
3057 "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
3058 "coldim, colscale, is_notnull, "
3059 "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
3060 "virtual_expr, is_deletedcol, default_value) "
3061 "VALUES (?, ?, ?, ?, ?, "
3063 "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
3087 sqliteConnector_.query_with_text_params(
3088 "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
3094 sqliteConnector_.query_with_text_params(
3095 "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
3096 "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
3099 foreign_table.getOptionsAsJsonString(),
3103 }
catch (std::exception& e) {
3104 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
// Temporary-table path: ids come from in-memory counters instead of SQLite.
3108 td.
tableId = nextTempTableId_++;
3110 for (
auto cd : columns) {
3112 const bool is_foreign_col =
3113 setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
3115 if (!is_foreign_col) {
3117 std::string fileName(
"");
3118 std::string folderPath(
"");
3119 DictRef dict_ref(currentDB_.dbId, nextTempDictId_);
3136 if (toplevel_column_names.count(cd.
columnName)) {
3147 serializeTableJsonUnlocked(&td, cds);
// The disk cache must not already hold metadata under the new table's key prefix.
3152 auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
3154 CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.
tableId}))
3155 <<
"Disk cache at " + cache->getCacheDirectory()
3156 <<
" contains preexisting data for new table. Please "
3157 "delete or clear cache before continuing";
// Publish the table in the in-memory maps and notify Calcite.
3160 addTableToMap(&td, cds, dds);
3161 calciteMgr_->updateMetadata(currentDB_.dbName, td.
tableName);
3163 dataMgr_->getForeignStorageInterface()->registerTable(
this, td, cds);
// On failure, undo both the SQLite transaction and the in-memory registration.
3165 }
catch (std::exception& e) {
3166 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3167 removeTableFromMap(td.tableName, td.tableId,
true);
3170 sqliteConnector_.query(
"END TRANSACTION");
// Both locks are released explicitly before the final metadata lookup.
3173 write_lock.unlock();
3174 sqlite_lock.unlock();
3175 getMetadataForTable(td.tableName,
3181 const std::list<ColumnDescriptor>& cds)
const {
3183 using namespace rapidjson;
3185 VLOG(1) <<
"Serializing temporary table " << td->
tableName <<
" to JSON for Calcite.";
3187 const auto db_name = currentDB_.dbName;
3191 if (boost::filesystem::exists(file_path)) {
3193 std::ifstream reader(file_path.string());
3194 CHECK(reader.is_open());
3195 IStreamWrapper json_read_wrapper(reader);
3196 d.ParseStream(json_read_wrapper);
3200 CHECK(d.IsObject());
3203 Value table(kObjectType);
3205 "name",
Value().SetString(StringRef(td->
tableName.c_str())), d.GetAllocator());
3206 table.AddMember(
"id",
Value().SetInt(td->
tableId), d.GetAllocator());
3207 table.AddMember(
"columns",
Value(kArrayType), d.GetAllocator());
3209 for (
const auto& cd : cds) {
3210 Value column(kObjectType);
3212 "name",
Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
3213 column.AddMember(
"coltype",
3214 Value().SetInt(static_cast<int>(cd.columnType.get_type())),
3216 column.AddMember(
"colsubtype",
3217 Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
3219 column.AddMember(
"compression",
3220 Value().SetInt(static_cast<int>(cd.columnType.get_compression())),
3222 column.AddMember(
"comp_param",
3223 Value().SetInt(static_cast<int>(cd.columnType.get_comp_param())),
3225 column.AddMember(
"size",
3226 Value().SetInt(static_cast<int>(cd.columnType.get_size())),
3229 "coldim",
Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
3231 "colscale",
Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
3233 "is_notnull",
Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
3234 column.AddMember(
"is_systemcol",
Value().SetBool(cd.isSystemCol), d.GetAllocator());
3235 column.AddMember(
"is_virtualcol",
Value().SetBool(cd.isVirtualCol), d.GetAllocator());
3236 column.AddMember(
"is_deletedcol",
Value().SetBool(cd.isDeletedCol), d.GetAllocator());
3237 table[
"columns"].PushBack(column, d.GetAllocator());
3239 d.AddMember(StringRef(td->
tableName.c_str()), table, d.GetAllocator());
3242 std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3243 CHECK(writer.is_open());
3244 OStreamWrapper json_wrapper(writer);
3246 Writer<OStreamWrapper> json_writer(json_wrapper);
3247 d.Accept(json_writer);
// Removes the member named table_name from the JSON document stored at
// file_path and rewrites the file. The file must exist, parse to a JSON
// object, and contain the member; each of these is enforced with CHECK.
// NOTE(review): the VLOG text reads "to JSON" although this routine removes
// the entry - the message wording may be a copy/paste leftover.
3251 void Catalog::dropTableFromJsonUnlocked(
const std::string& table_name)
const {
3253 using namespace rapidjson;
3255 VLOG(1) <<
"Dropping temporary table " << table_name <<
" to JSON for Calcite.";
3257 const auto db_name = currentDB_.dbName;
3260 CHECK(boost::filesystem::exists(file_path));
// Load the existing document from disk.
3263 std::ifstream reader(file_path.string());
3264 CHECK(reader.is_open());
3265 IStreamWrapper json_read_wrapper(reader);
3266 d.ParseStream(json_read_wrapper);
3268 CHECK(d.IsObject());
// StringRef wraps table_name's buffer without copying it.
3269 auto table_name_ref = StringRef(table_name.c_str());
3270 CHECK(d.HasMember(table_name_ref));
3271 CHECK(d.RemoveMember(table_name_ref));
// Truncate the file and write the updated document back.
3274 std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3275 CHECK(writer.is_open());
3276 OStreamWrapper json_wrapper(writer);
3278 Writer<OStreamWrapper> json_writer(json_wrapper);
3279 d.Accept(json_writer);
// Takes ownership of foreign_server and hands it, together with
// if_not_exists, to createForeignServerNoLocks.
// NOTE(review): judging by the callee's "NoLocks" suffix this wrapper
// presumably acquires the catalog locks first; those lines are not visible here.
3283 void Catalog::createForeignServer(
3284 std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3285 bool if_not_exists) {
3288 createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
// Registers foreign_server in omnisci_foreign_servers and in the in-memory
// maps when no row with the same name exists yet.
// If a row already exists and if_not_exists is false, throws
// std::runtime_error; otherwise the existing in-memory entries are verified
// with CHECKs.
3291 void Catalog::createForeignServerNoLocks(
3292 std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3293 bool if_not_exists) {
// Reference into the server object; it stays valid after the std::move below
// because ownership passes to foreign_server_shared, which keeps the object alive.
3294 const auto&
name = foreign_server->name;
3296 sqliteConnector_.query_with_text_params(
3297 "SELECT name from omnisci_foreign_servers where name = ?",
3298 std::vector<std::string>{
name});
3300 if (sqliteConnector_.getNumRows() == 0) {
// New server: stamp the creation time and insert its row.
3301 foreign_server->creation_time = std::time(
nullptr);
3302 sqliteConnector_.query_with_text_params(
3303 "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
3306 "VALUES (?, ?, ?, ?, ?)",
3307 std::vector<std::string>{
name,
3308 foreign_server->data_wrapper_type,
3311 foreign_server->getOptionsAsJsonString()});
// Read back the id assigned to the inserted row; exactly one row is expected.
3312 sqliteConnector_.query_with_text_params(
3313 "SELECT id from omnisci_foreign_servers where name = ?",
3314 std::vector<std::string>{
name});
3315 CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
3316 foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
// Convert to shared ownership so both maps can reference the same object.
3317 std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
3318 std::move(foreign_server);
3319 CHECK(foreignServerMap_.find(
name) == foreignServerMap_.end())
3320 <<
"Attempting to insert a foreign server into foreign server map that already "
3322 foreignServerMap_[
name] = foreign_server_shared;
3323 foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
3324 }
else if (!if_not_exists) {
3325 throw std::runtime_error{
"A foreign server with name \"" + foreign_server->name +
3326 "\" already exists."};
// Row already exists and if_not_exists is set: both in-memory maps must know the server.
3329 const auto& server_it = foreignServerMap_.find(
name);
3330 CHECK(server_it != foreignServerMap_.end());
3331 CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
3335 const std::string& server_name)
const {
3339 if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
3340 foreign_server = foreignServerMap_.find(server_name)->second.get();
3342 return foreign_server;
// Reads the foreign server named server_name from the
// omnisci_foreign_servers table and returns a newly constructed ForeignServer.
// Returns a null unique_ptr when the query yields no rows.
3345 const std::unique_ptr<const foreign_storage::ForeignServer>
3346 Catalog::getForeignServerFromStorage(
const std::string& server_name) {
3347 std::unique_ptr<foreign_storage::ForeignServer> foreign_server =
nullptr;
3349 sqliteConnector_.query_with_text_params(
3350 "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
3351 "FROM omnisci_foreign_servers WHERE name = ?",
3352 std::vector<std::string>{server_name});
// Constructor arguments follow the SELECT column order (columns 0 through 5 of row 0).
3353 if (sqliteConnector_.getNumRows() > 0) {
3354 foreign_server = std::make_unique<foreign_storage::ForeignServer>(
3355 sqliteConnector_.getData<
int>(0, 0),
3356 sqliteConnector_.getData<std::string>(0, 1),
3357 sqliteConnector_.getData<std::string>(0, 2),
3358 sqliteConnector_.getData<std::string>(0, 3),
3359 sqliteConnector_.getData<std::int32_t>(0, 4),
3360 sqliteConnector_.getData<std::int32_t>(0, 5));
3362 return foreign_server;
// Reads the row for table_id from omnisci_foreign_tables and returns a newly
// constructed ForeignTable built from it. The result starts out null.
// NOTE(review): the condition on num_rows guarding construction is not
// visible here - confirm the no-row behavior.
3365 const std::unique_ptr<const foreign_storage::ForeignTable>
3366 Catalog::getForeignTableFromStorage(
int table_id) {
3367 std::unique_ptr<foreign_storage::ForeignTable> foreign_table =
nullptr;
3369 sqliteConnector_.query_with_text_params(
3370 "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
3371 "omnisci_foreign_tables WHERE table_id = ?",
3372 std::vector<std::string>{
to_string(table_id)});
3373 auto num_rows = sqliteConnector_.getNumRows();
// The server pointer is taken from the in-memory map using the stored server_id.
// NOTE(review): operator[] is used for that lookup; if foreignServerMapById_ is
// a standard map an unknown id would insert an empty entry - confirm.
3376 foreign_table = std::make_unique<foreign_storage::ForeignTable>(
3377 sqliteConnector_.getData<
int>(0, 0),
3378 foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
3379 sqliteConnector_.getData<std::string>(0, 2),
3380 sqliteConnector_.getData<int64_t>(0, 3),
3381 sqliteConnector_.getData<int64_t>(0, 4));
3383 return foreign_table;
// Changes the owner of the foreign server named server_name to new_owner_id:
// the "owner_user_id" property is written through setForeignServerProperty and
// the in-memory server object's user_id is then updated.
// NOTE(review): the find() result is dereferenced without a visible end()
// check; the CHECK only tests the resulting pointer - confirm callers
// guarantee the server exists.
3386 void Catalog::changeForeignServerOwner(
const std::string& server_name,
3387 const int new_owner_id) {
3390 foreignServerMap_.find(server_name)->second.get();
3391 CHECK(foreign_server);
3392 setForeignServerProperty(server_name,
"owner_user_id",
std::to_string(new_owner_id));
3394 foreign_server->
user_id = new_owner_id;
// Sets the data wrapper type of the foreign server named server_name. The
// supplied value is upper-cased before being written as the
// "data_wrapper_type" property through setForeignServerProperty.
// NOTE(review): the body of the try block and of its exception handler is not
// visible here.
3397 void Catalog::setForeignServerDataWrapper(
const std::string& server_name,
3398 const std::string& data_wrapper) {
3400 auto data_wrapper_type =
to_upper(data_wrapper);
3403 foreignServerMap_.find(server_name)->second.get();
3404 CHECK(foreign_server);
3409 }
catch (
const std::exception& e) {
3415 setForeignServerProperty(server_name,
"data_wrapper_type", data_wrapper_type);
// Replaces the options of the foreign server named server_name and writes the
// new value as the "options" property through setForeignServerProperty.
// The current options are saved first and restored inside the exception
// handler.
// NOTE(review): the try-block body is not visible here; presumably it parses
// and validates the new options - confirm.
3418 void Catalog::setForeignServerOptions(
const std::string& server_name,
3419 const std::string& options) {
3423 foreignServerMap_.find(server_name)->second.get();
3424 CHECK(foreign_server);
// Snapshot used to undo the in-memory change on failure.
3425 auto saved_options = foreign_server->
options;
3429 }
catch (
const std::exception& e) {
3432 foreign_server->
options = saved_options;
3435 setForeignServerProperty(server_name,
"options", options);
// Renames the foreign server server_name to name. The server must be present
// in foreignServerMap_ (CHECK). The "name" property is written through
// setForeignServerProperty, then the in-memory object is renamed and re-keyed.
3438 void Catalog::renameForeignServer(
const std::string& server_name,
3439 const std::string&
name) {
3441 auto foreign_server_it = foreignServerMap_.find(server_name);
3442 CHECK(foreign_server_it != foreignServerMap_.end());
3443 setForeignServerProperty(server_name,
"name", name);
// Hold a shared reference so the object survives removal of the old map entry.
3444 auto foreign_server_shared = foreign_server_it->second;
3445 foreign_server_shared->name =
name;
// Insert under the new key, then drop the old key.
// NOTE(review): if name equals server_name these two statements would assign
// and then erase the same entry - confirm callers reject identical names.
3446 foreignServerMap_[
name] = foreign_server_shared;
3447 foreignServerMap_.erase(foreign_server_it);
// Drops the foreign server named server_name.
// Throws std::runtime_error when any row in omnisci_foreign_tables still
// references the server's id. Otherwise the server row is deleted inside a
// SQLite transaction and the server is removed from both in-memory maps.
// NOTE(review): the condition applied to num_rows is not visible here.
3450 void Catalog::dropForeignServer(
const std::string& server_name) {
3454 sqliteConnector_.query_with_text_params(
3455 "SELECT id from omnisci_foreign_servers where name = ?",
3456 std::vector<std::string>{server_name});
3457 auto num_rows = sqliteConnector_.getNumRows();
3460 auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
// Refuse to drop a server that foreign tables still depend on.
3461 sqliteConnector_.query_with_text_param(
3462 "SELECT table_id from omnisci_foreign_tables where server_id = ?",
3464 if (sqliteConnector_.getNumRows() > 0) {
3465 throw std::runtime_error{
"Foreign server \"" + server_name +
3467 "by existing foreign tables and cannot be dropped."};
3469 sqliteConnector_.query(
"BEGIN TRANSACTION");
3471 sqliteConnector_.query_with_text_params(
3472 "DELETE FROM omnisci_foreign_servers WHERE name = ?",
3473 std::vector<std::string>{server_name});
3474 }
catch (
const std::exception& e) {
3475 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
// In-memory maps are updated after END TRANSACTION has been issued.
3478 sqliteConnector_.query(
"END TRANSACTION");
3479 foreignServerMap_.erase(server_name);
3480 foreignServerMapById_.erase(server_id);
// Appends to `results` the foreign servers that match the optional JSON
// `filters` and on which the user holds at least one privilege.
//
// Each filter entry is read through the keys "attribute", "operation",
// "value" and (for every entry after the first) "chain".
// Throws std::runtime_error for an unknown attribute name, or when a non-EQUALS
// operation is applied to the "created_at" attribute.
// NOTE(review): a number of lines are elided in this excerpt (including the
// `user` parameter and the bodies of some branches).
3484 void Catalog::getForeignServersForUser(
3485 const rapidjson::Value* filters,
3487 std::vector<const foreign_storage::ForeignServer*>& results) {
// Maps the attribute names accepted in filters to catalog column names.
3492 std::map<std::string, std::string> col_names{{
"server_name",
"name"},
3493 {
"data_wrapper",
"data_wrapper_type"},
3494 {
"created_at",
"creation_time"},
3495 {
"options",
"options"}};
// filter_string accumulates the WHERE clause; arguments holds the values that
// are bound to its "?" placeholders.
3498 std::stringstream filter_string;
3499 std::vector<std::string> arguments;
3501 if (filters !=
nullptr) {
3503 int num_filters = 0;
3504 filter_string <<
" WHERE";
3505 for (
auto& filter_def : filters->GetArray()) {
// NOTE(review): the "chain" string is streamed into the SQL text verbatim
// rather than bound as a parameter — confirm it is validated upstream.
3506 if (num_filters > 0) {
3507 filter_string <<
" " << std::string(filter_def[
"chain"].GetString());
// Reject attributes that have no entry in col_names.
3511 if (col_names.find(std::string(filter_def[
"attribute"].GetString())) ==
3513 throw std::runtime_error{
"Attribute with name \"" +
3514 std::string(filter_def[
"attribute"].GetString()) +
3515 "\" does not exist."};
// Emit the mapped column name, never the caller-supplied attribute text.
3518 filter_string <<
" " << col_names[std::string(filter_def[
"attribute"].GetString())];
// "EQUALS" becomes "= ?"; the other visible branch emits "LIKE ?".
3520 bool equals_operator =
false;
3521 if (std::strcmp(filter_def[
"operation"].GetString(),
"EQUALS") == 0) {
3522 filter_string <<
" = ? ";
3523 equals_operator =
true;
3525 filter_string <<
" LIKE ? ";
3528 bool timestamp_column =
3529 (std::strcmp(filter_def[
"attribute"].GetString(),
"created_at") == 0);
3531 if (timestamp_column && !equals_operator) {
3532 throw std::runtime_error{
"LIKE operator is incompatible with TIMESTAMP data"};
// For created_at the value goes through dateTimeParse<kTIMESTAMP>; the
// other visible path binds the raw "value" string.
3535 if (timestamp_column && equals_operator) {
3537 dateTimeParse<kTIMESTAMP>(filter_def[
"value"].GetString(), 0)));
3539 arguments.emplace_back(filter_def[
"value"].GetString());
// Run the name query with the accumulated filter clause and bound arguments.
3546 std::string query = std::string(
"SELECT name from omnisci_foreign_servers ");
3547 query += filter_string.str();
3549 sqliteConnector_.query_with_text_params(query, arguments);
3550 auto num_rows = sqliteConnector_.getNumRows();
3552 if (sqliteConnector_.getNumRows() == 0) {
3556 CHECK(sqliteConnector_.getNumCols() == 1);
// One result slot per returned row is reserved up front.
3558 results.reserve(num_rows);
3559 for (
size_t row = 0; row < num_rows; ++row) {
3560 const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
3565 CHECK(foreign_server !=
nullptr);
// Privilege gate: hasAnyPrivileges is consulted before the server is added.
3569 std::vector<DBObject> privObjects = {dbObject};
3570 if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
3574 results.push_back(foreign_server);
// Returns the epoch of table `table_id` in database `db_id`.
// For a table with an entry in logicalToPhysicalTableMapById_, the epoch of
// every physical table is queried and compared against the first one;
// disagreement is logged at ERROR level.
// Throws std::runtime_error on the table-not-found path.
// NOTE(review): the return statements and several conditions are elided in
// this excerpt, so what is returned on inconsistency is not visible here.
3579 int32_t Catalog::getTableEpoch(
const int32_t db_id,
const int32_t table_id)
const {
3581 const auto td = getMetadataForTable(table_id,
false);
3583 std::stringstream table_not_found_error_message;
3584 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3586 throw std::runtime_error(table_not_found_error_message.str());
// Sharded case: the logical id maps to a list of physical table ids.
3588 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3589 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3591 const auto physicalTables = physicalTableIt->second;
3592 CHECK(!physicalTables.empty());
// first_epoch / first_table_id remember the reference shard against which
// every other shard's epoch is compared.
3593 size_t curr_epoch{0}, first_epoch{0};
3594 int32_t first_table_id{0};
3595 bool are_epochs_inconsistent{
false};
3596 for (
size_t i = 0; i < physicalTables.size(); i++) {
3597 int32_t physical_tb_id = physicalTables[i];
3598 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id,
false);
3601 curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3602 LOG(
INFO) <<
"Got sharded table epoch for db id: " << db_id
3603 <<
", table id: " << physical_tb_id <<
", epoch: " << curr_epoch;
3605 first_epoch = curr_epoch;
3606 first_table_id = physical_tb_id;
3607 }
else if (first_epoch != curr_epoch) {
3608 are_epochs_inconsistent =
true;
3609 LOG(
ERROR) <<
"Epochs on shards do not all agree on table id: " << table_id
3610 <<
", db id: " << db_id
3611 <<
". First table (table id: " << first_table_id
3612 <<
") has epoch: " << first_epoch <<
". Table id: " << physical_tb_id
3613 <<
", has inconsistent epoch: " << curr_epoch
3614 <<
". See previous INFO logs for all epochs and their table ids.";
3617 if (are_epochs_inconsistent) {
// Unsharded case: ask the data manager for the table's epoch directly.
3623 auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3624 LOG(
INFO) <<
"Got table epoch for db id: " << db_id <<
", table id: " << table_id
3625 <<
", epoch: " << epoch;
// Returns the foreign tables whose foreign_server->id equals
// `foreign_server_id`, found by scanning tableDescriptorMapById_.
// NOTE(review): the lines that derive `foreign_table` from `table_descriptor`
// are elided in this excerpt.
3630 std::vector<const foreign_storage::ForeignTable*>
3631 Catalog::getAllForeignTablesForForeignServer(
const int32_t foreign_server_id) {
3633 std::vector<const foreign_storage::ForeignTable*> foreign_tables;
// NOTE(review): `auto entry` copies each map element per iteration.
3634 for (
auto entry : tableDescriptorMapById_) {
3635 auto table_descriptor = entry.second;
3638 CHECK(foreign_table);
3639 if (foreign_table->foreign_server->id == foreign_server_id) {
3640 foreign_tables.emplace_back(foreign_table);
3644 return foreign_tables;
// Sets the epoch of table `table_id` (and of each of its physical tables) in
// database `db_id` to `new_epoch`.
// Throws std::runtime_error on the table-not-found path and on the
// temporary-table path ("Cannot set epoch on temporary table").
// NOTE(review): the guarding conditions and the declaration of
// `file_mgr_params` are elided in this excerpt.
3647 void Catalog::setTableEpoch(
const int db_id,
const int table_id,
int new_epoch) {
3648 LOG(
INFO) <<
"Set table epoch db:" << db_id <<
" Table ID " << table_id
3649 <<
" back to new epoch " << new_epoch;
3650 const auto td = getMetadataForTable(table_id,
false);
3652 std::stringstream table_not_found_error_message;
3653 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3655 throw std::runtime_error(table_not_found_error_message.str());
3658 std::stringstream is_temp_table_error_message;
3659 is_temp_table_error_message <<
"Cannot set epoch on temporary table";
3660 throw std::runtime_error(is_temp_table_error_message.str());
// The requested epoch travels to the file manager inside file_mgr_params.
3664 file_mgr_params.
epoch = new_epoch;
// Apply to every physical table; getPhysicalTablesDescriptors is called with
// populate_fragmenter = false.
3667 const auto physical_tables = getPhysicalTablesDescriptors(td,
false);
3668 CHECK(!physical_tables.empty());
3669 for (
const auto table : physical_tables) {
// Note: this local deliberately shadows the `table_id` parameter, so the
// calls below operate on the physical table, not the logical one.
3670 auto table_id = table->tableId;
3671 LOG(
INFO) <<
"Set sharded table epoch db:" << db_id <<
" Table ID " << table_id
3672 <<
" back to new epoch " << new_epoch;
// Chunks are removed before the file manager parameters are changed.
3675 removeChunks(table_id);
3676 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
// Issues UPDATE statements against mapd_tables for a single table row,
// covering the max_rollback_epochs and max_rows columns.
// NOTE(review): the parameter list, the conditions guarding each UPDATE and
// the bound argument vectors are elided in this excerpt.
3680 void Catalog::alterPhysicalTableMetadata(
3691 sqliteConnector_.query_with_text_params(
3692 "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3699 sqliteConnector_.query_with_text_params(
3700 "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3711 sqliteConnector_.query(
"BEGIN TRANSACTION");
3713 const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->
tableId);
3714 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3715 const auto physical_tables = physical_table_it->second;
3716 CHECK(!physical_tables.empty());
3717 for (
size_t i = 0; i < physical_tables.size(); i++) {
3718 int32_t physical_tb_id = physical_tables[i];
3719 const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id,
false);
3721 alterPhysicalTableMetadata(phys_td, table_update_params);
3724 alterPhysicalTableMetadata(td, table_update_params);
3725 }
catch (std::exception& e) {
3726 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
3729 sqliteConnector_.query(
"END TRANSACTION");
// Sets max_rollback_epochs for table `table_id`.
// Throws std::runtime_error when `max_rollback_epochs` is negative.
// When the requested parameters compare equal to the table's current ones, an
// INFO line is logged ("skipping operation").
// NOTE(review): the construction of `table_update_params` / `file_mgr_params`
// and the early-return statement are elided in this excerpt.
3732 void Catalog::setMaxRollbackEpochs(
const int32_t table_id,
3733 const int32_t max_rollback_epochs) {
// For an int32_t, `<= -1` is the same test as `< 0`.
3736 if (max_rollback_epochs <= -1) {
3737 throw std::runtime_error(
"Cannot set max_rollback_epochs < 0.");
3739 const auto td = getMetadataForTable(
3745 if (table_update_params == td) {
3746 LOG(
INFO) <<
"Setting max_rollback_epochs for table " << table_id
3747 <<
" to existing value, skipping operation";
// NOTE(review): epoch = -1 presumably means "leave the epoch unchanged" —
// confirm against the FileMgrParams definition.
3751 file_mgr_params.
epoch = -1;
// Storage parameters are applied first, then the catalog metadata.
3753 setTableFileMgrParams(table_id, file_mgr_params);
3754 alterTableMetadata(td, table_update_params);
// Sets the max_rows limit of table `table_id` and asks the table's fragmenter
// to drop fragments down to that size.
// Throws std::runtime_error("Max rows cannot be a negative number.").
// NOTE(review): the condition guarding the throw, the declaration of
// `table_update_params` and the early return are elided in this excerpt.
3757 void Catalog::setMaxRows(
const int32_t table_id,
const int64_t max_rows) {
3759 throw std::runtime_error(
"Max rows cannot be a negative number.");
3761 const auto td = getMetadataForTable(table_id);
3764 table_update_params.
max_rows = max_rows;
// Unchanged value: only an INFO line is logged on this path.
3765 if (table_update_params == td) {
3766 LOG(
INFO) <<
"Max rows value of " << max_rows
3767 <<
" is the same as the existing value. Skipping update.";
// Update catalog metadata first, then trim existing data through the
// fragmenter, which must be populated at this point (CHECK).
3770 alterTableMetadata(td, table_update_params);
3771 CHECK(td->fragmenter);
3772 td->fragmenter->dropFragmentsToSize(max_rows);
// Updates the metadata and file-manager parameters of the table named
// `table_name`; the table must exist in tableDescriptorMap_ (CHECK).
// NOTE(review): the lines that populate `table_update_params` and
// `file_mgr_params` are elided in this excerpt, so the exact values written
// (presumably an "uncapped" rollback setting, per the function name) cannot be
// confirmed from here.
3776 void Catalog::setUncappedTableEpoch(
const std::string& table_name) {
// tableDescriptorMap_ is looked up by upper-cased table name.
3778 auto td_entry = tableDescriptorMap_.find(
to_upper(table_name));
3779 CHECK(td_entry != tableDescriptorMap_.end());
3780 auto td = td_entry->second;
3788 alterTableMetadata(td, table_update_params);
3791 setTableFileMgrParams(td->tableId, file_mgr_params);
// Applies `file_mgr_params` to every physical table behind `table_id` in the
// current database.
// Throws std::runtime_error on the table-not-found path and on the
// temporary-table path ("Cannot set storage params on temporary table").
// NOTE(review): the parameter list and the guarding conditions are elided in
// this excerpt.
3794 void Catalog::setTableFileMgrParams(
3798 const auto td = getMetadataForTable(table_id,
false);
3799 const auto db_id = this->getDatabaseId();
3801 std::stringstream table_not_found_error_message;
3802 table_not_found_error_message <<
"Table (" << db_id <<
"," << table_id
3804 throw std::runtime_error(table_not_found_error_message.str());
3807 std::stringstream is_temp_table_error_message;
3808 is_temp_table_error_message <<
"Cannot set storage params on temporary table";
3809 throw std::runtime_error(is_temp_table_error_message.str());
3812 const auto physical_tables = getPhysicalTablesDescriptors(td,
false);
3813 CHECK(!physical_tables.empty());
3814 for (
const auto table : physical_tables) {
// Shadows the outer `table_id` so the calls below target the physical table.
3815 auto table_id = table->tableId;
// Chunks are removed before the file manager parameters are changed.
3816 removeChunks(table_id);
3817 dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
// Returns one TableEpochInfo (table id, epoch) per physical table of
// `table_id`, or a single entry for `table_id` itself when it has no entry in
// logicalToPhysicalTableMapById_. Each epoch read is also logged at INFO.
// NOTE(review): some lines (e.g. the `else` separating the two cases) are
// elided in this excerpt.
3821 std::vector<TableEpochInfo> Catalog::getTableEpochs(
const int32_t db_id,
3822 const int32_t table_id)
const {
3824 std::vector<TableEpochInfo> table_epochs;
3825 const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3826 if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3827 const auto physical_tables = physical_table_it->second;
3828 CHECK(!physical_tables.empty());
3830 for (
const auto physical_tb_id : physical_tables) {
3831 const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
// Shadows the `table_id` parameter with the physical table's id.
3834 auto table_id = phys_td->tableId;
3835 auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3836 table_epochs.emplace_back(table_id, epoch);
3837 LOG(
INFO) <<
"Got sharded table epoch for db id: " << db_id
3838 <<
", table id: " << table_id <<
", epoch: " << epoch;
// Unsharded case: a single entry for the table itself.
3841 auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3842 LOG(
INFO) <<
"Got table epoch for db id: " << db_id <<
", table id: " << table_id
3843 <<
", epoch: " << epoch;
3844 table_epochs.emplace_back(table_id, epoch);
3846 return table_epochs;
// Restores the epoch of every table listed in `table_epochs` (database
// `db_id`): for each entry the table's chunks are removed, then the recorded
// epoch is pushed to the global file manager and logged.
// NOTE(review): `table_epochs[0]` is read on the first statement with no
// emptiness check in front of it — callers must pass a non-empty vector.
// NOTE(review): the declaration of `file_mgr_params` is elided in this excerpt.
3849 void Catalog::setTableEpochs(
const int32_t db_id,
3850 const std::vector<TableEpochInfo>& table_epochs)
const {
3851 const auto td = getMetadataForTable(table_epochs[0].table_id,
false);
3856 for (
const auto& table_epoch_info : table_epochs) {
3857 removeChunks(table_epoch_info.table_id);
// The same file_mgr_params object is reused; only its epoch changes per
// table.
3858 file_mgr_params.
epoch = table_epoch_info.table_epoch;
3859 dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3860 db_id, table_epoch_info.table_id, file_mgr_params);
3861 LOG(
INFO) <<
"Set table epoch for db id: " << db_id
3862 <<
", table id: " << table_epoch_info.table_id
3863 <<
", back to epoch: " << table_epoch_info.table_epoch;
3869 std::string table_epochs_str{
"["};
3870 bool first_entry{
true};
3871 for (
const auto& table_epoch : table_epochs) {
3873 first_entry =
false;
3875 table_epochs_str +=
", ";
3877 table_epochs_str +=
"(table_id: " +
std::to_string(table_epoch.table_id) +
3880 table_epochs_str +=
"]";
3881 return table_epochs_str;
// Wrapper around setTableEpochs() that catches std::exception and logs it at
// ERROR level together with the database id and e.what().
// NOTE(review): the `try` opener, part of the log statement and the end of the
// handler are elided in this excerpt; whether the exception is rethrown after
// logging is not visible here.
3885 void Catalog::setTableEpochsLogExceptions(
3886 const int32_t db_id,
3887 const std::vector<TableEpochInfo>& table_epochs)
const {
3889 setTableEpochs(db_id, table_epochs);
3890 }
catch (std::exception& e) {
3891 LOG(
ERROR) <<
"An error occurred when attempting to set table epochs. DB id: "
3893 <<
", Error: " << e.what();
3899 const auto it = deletedColumnPerTable_.find(td);
3900 return it != deletedColumnPerTable_.end() ? it->second :
nullptr;
3904 int delete_column_id)
const {
3909 return fragmenter->hasDeletedRows(delete_column_id);
3917 std::vector<const TableDescriptor*> tds;
3922 const auto it = deletedColumnPerTable_.find(td);
3924 if (it == deletedColumnPerTable_.end()) {
3928 tds = getPhysicalTablesDescriptors(td,
false);
3931 for (
auto tdd : tds) {
3932 if (checkMetadataForDeletedRecs(tdd, cd->
columnId)) {
3942 setDeletedColumnUnlocked(td, cd);
3947 const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3948 CHECK(it_ok.second);
3967 const bool persist_reference) {
3970 CHECK(foreign_ref_col);
3971 referencing_column.
columnType = foreign_ref_col->columnType;
3973 const DictRef dict_ref(currentDB_.dbId, dict_id);
3974 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3975 CHECK(dictIt != dictDescriptorMapByRef_.end());
3976 const auto& dd = dictIt->second;
3979 if (persist_reference) {
3981 sqliteConnector_.query_with_text_params(
3982 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
// Resolves a shared-dictionary definition for a column being created.
// Two cases are visible: the definition references a column of the table being
// created (`td.tableName`), resolved against the in-flight lists `cdd` / `dds`;
// or it references another table, handled via addReferenceToForeignDict().
// NOTE(review): the first and fourth parameters, all return statements and
// several conditions are elided in this excerpt, so the meaning of the bool
// result cannot be stated from here.
3987 bool Catalog::setColumnSharedDictionary(
3989 std::list<ColumnDescriptor>& cdd,
3990 std::list<DictDescriptor>& dds,
3992 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3996 if (shared_dict_defs.empty()) {
3999 for (
const auto& shared_dict_def : shared_dict_defs) {
4001 const auto& column = shared_dict_def.get_column();
// compare() returns 0 on equality, so `!compare(...)` means "names match":
// the referenced table is the one currently being created.
4003 if (!shared_dict_def.get_foreign_table().compare(td.
tableName)) {
4005 const auto& ref_column = shared_dict_def.get_foreign_column();
// Find the referenced column among the columns being created.
// NOTE(review): the lambda takes ColumnDescriptor by value (one copy per
// comparison).
4007 std::find_if(cdd.begin(), cdd.end(), [ref_column](
const ColumnDescriptor it) {
4008 return !ref_column.compare(it.columnName);
4010 CHECK(colIt != cdd.end());
// Find the matching dictionary (same db id and dict id) among those being
// created. NOTE(review): DictDescriptor is likewise taken by value.
4015 auto dictIt = std::find_if(
4016 dds.begin(), dds.end(), [
this, dict_id](
const DictDescriptor it) {
4017 return it.dictRef.dbId == this->currentDB_.dbId &&
4018 it.dictRef.dictId == dict_id;
4020 if (dictIt != dds.end()) {
// A new reference to the dictionary bumps its persisted refcount.
4026 sqliteConnector_.query_with_text_params(
4027 "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
// Otherwise the definition points at a different, already existing table.
4037 const auto& foreign_table_name = shared_dict_def.get_foreign_table();
4038 const auto foreign_td = getMetadataForTable(foreign_table_name,
false);
4041 throw std::runtime_error(
4042 "Only temporary tables can share dictionaries with other temporary "
4045 addReferenceToForeignDict(cd, shared_dict_def,
false);
4057 std::list<DictDescriptor>& dds,
4059 bool is_logical_table,
4060 bool use_temp_dictionary) {
4063 std::string dictName{
"Initial_key"};
4065 std::string folderPath;
4066 if (is_logical_table) {
4069 sqliteConnector_.query_with_text_params(
4070 "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
4072 std::vector<std::string>{
4074 sqliteConnector_.query_with_text_param(
4075 "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
4076 dictId = sqliteConnector_.getData<
int>(0, 0);
4078 sqliteConnector_.query_with_text_param(
4079 "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
4090 use_temp_dictionary);
// Creates a sharded table: one table via createTable(..., true), then
// `td.nShards` further tables via createTable(..., false), and finally records
// the logical-id -> physical-ids mapping.
// NOTE(review): the first parameter and the construction of `tdl` / `tdp` are
// elided in this excerpt. The final bool passed to createTable is presumably
// an "is logical table" flag — confirm against createTable's signature.
4099 void Catalog::createShardedTable(
4101 const list<ColumnDescriptor>& cols,
4102 const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
4105 createTable(*tdl, cols, shared_dict_defs,
true);
4106 int32_t logical_tb_id = tdl->
tableId;
4107 std::string logical_table_name = tdl->
tableName;
// Ids of the per-shard tables, collected in creation order.
4110 std::vector<int32_t> physicalTables;
// Shard numbering is 1-based: names come from
// generatePhysicalTableName(logical_table_name, i) for i in [1, nShards].
4111 for (int32_t i = 1; i <= td.
nShards; i++) {
4113 tdp->
tableName = generatePhysicalTableName(logical_table_name, i);
4115 createTable(*tdp, cols, shared_dict_defs,
false);
4116 int32_t physical_tb_id = tdp->
tableId;
4119 physicalTables.push_back(physical_tb_id);
4122 if (!physicalTables.empty()) {
// The logical id must not already be present in the map (CHECK on the
// emplace result).
4126 logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
4127 CHECK(it_ok.second);
4130 updateLogicalToPhysicalTableMap(logical_tb_id);
4137 const auto physical_tables = getPhysicalTablesDescriptors(td);
4138 for (
const auto table : physical_tables) {
4139 doTruncateTable(table);
4145 removeFragmenterForTable(td->
tableId);
4147 const int tableId = td->
tableId;
4148 ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4153 dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4156 std::unique_ptr<StringDictionaryClient> client;
4157 if (SysCatalog::instance().isAggregator()) {
4158 CHECK(!string_dict_hosts_.empty());
4159 DictRef dict_ref(currentDB_.dbId, -1);
4164 for (
const auto& columnDescriptor : columnDescriptorMapById_) {
4165 auto cd = columnDescriptor.second;
4172 const DictRef dict_ref(currentDB_.dbId, dict_id);
4173 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
4174 CHECK(dictIt != dictDescriptorMapByRef_.end());
4175 const auto& dd = dictIt->second;
4178 if (dd->refcount == 1) {
4180 dd->stringDict.reset();
4183 client->drop(dd->dictRef);
4185 if (!dd->dictIsTemp) {
4186 boost::filesystem::create_directory(dd->dictFolderPath);
4197 dictDescriptorMapByRef_.erase(dictIt);
4202 dictDescriptorMapByRef_[new_dd->
dictRef].reset(new_dd);
4210 for (
auto col_id = 0; col_id < td.
nColumns; ++col_id) {
4211 if (
auto it = columnDescriptorMapById_.find({td.
tableId, col_id});
4212 it != columnDescriptorMapById_.end()) {
4213 auto cd = it->second;
4216 DictRef dict_ref(currentDB_.dbId, dict_id);
4217 if (
auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4218 dict_it != dictDescriptorMapByRef_.end()) {
4220 dict_it->second->stringDict =
nullptr;
4222 getMetadataForDict(dict_id,
true);
// Invalidates cached state for table `table_id`: refreshes its dictionary
// caches, clears its fragmenter, refreshes foreign-table storage or removes
// mutable-table disk cache data, and finally re-instantiates the fragmenter.
// NOTE(review): locking and some intermediate lines are elided in this
// excerpt.
4229 void Catalog::invalidateCachesForTable(
const int table_id) {
4232 ChunkKey const table_key{getDatabaseId(), table_id};
4233 auto td = getMutableMetadataForTableUnlocked(table_id);
4239 refreshDictionaryCachesForTableUnlocked(*td);
// Clear the fragmenter through the descriptor stored in the by-id map; the
// trailing CHECK shows `td` is expected to alias that same descriptor.
4243 if (td->fragmenter !=
nullptr) {
4244 auto tableDescIt = tableDescriptorMapById_.find(table_id);
4245 CHECK(tableDescIt != tableDescriptorMapById_.end());
4246 tableDescIt->second->fragmenter =
nullptr;
4247 CHECK(td->fragmenter ==
nullptr);
// Foreign tables are refreshed via the foreign storage manager; the other
// visible call removes mutable-table disk cache data.
4251 if (dynamic_cast<foreign_storage::ForeignTable*>(td)) {
4252 dataMgr_->getPersistentStorageMgr()->getForeignStorageMgr()->refreshTable(table_key,
4255 dataMgr_->removeMutableTableDiskCacheData(currentDB_.dbId, table_id);
// Rebuild the fragmenter that was cleared above.
4257 instantiateFragmenter(td);
// Clears the fragmenter of table `table_id`, if one is set.
// NOTE(review): `td` is dereferenced immediately after getMetadataForTable()
// with no null check, so an unknown table id is presumably a caller error.
4260 void Catalog::removeFragmenterForTable(
const int table_id)
const {
// Metadata is fetched with populate_fragmenter = false.
4262 auto td = getMetadataForTable(table_id,
false);
4263 if (td->fragmenter !=
nullptr) {
// The reset goes through the descriptor stored in the by-id map; the CHECK
// afterwards shows `td` is expected to alias that same descriptor.
4264 auto tableDescIt = tableDescriptorMapById_.find(table_id);
4265 CHECK(tableDescIt != tableDescriptorMapById_.end());
4266 tableDescIt->second->fragmenter =
nullptr;
4267 CHECK(td->fragmenter ==
nullptr);
// Removes the fragmenter of table `table_id` and builds the chunk-key prefix
// {database id, table id} for it.
// NOTE(review): the statements that use `chunkKey` (presumably the actual
// chunk deletion through the data manager) are elided in this excerpt.
4272 void Catalog::removeChunks(
const int table_id)
const {
4273 removeFragmenterForTable(table_id);
4276 ChunkKey chunkKey = {currentDB_.dbId, table_id};
4283 SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
4285 std::vector<const TableDescriptor*> tables_to_drop;
4288 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->
tableId);
4289 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4291 const auto physicalTables = physicalTableIt->second;
4292 CHECK(!physicalTables.empty());
4293 for (
size_t i = 0; i < physicalTables.size(); i++) {
4294 int32_t physical_tb_id = physicalTables[i];
4296 getMutableMetadataForTableUnlocked(physical_tb_id);
4298 tables_to_drop.emplace_back(phys_td);
4301 tables_to_drop.emplace_back(td);
4304 for (
auto table : tables_to_drop) {
4305 eraseTablePhysicalData(table);
4307 deleteTableCatalogMetadata(td, tables_to_drop);
// Deletes the catalog metadata of a logical table and of each table in
// `physical_tables`, inside one SQLite transaction.
// NOTE(review): the `logical_table` parameter, the `try` opener and the
// remainder of the catch handler are elided in this excerpt.
// NOTE(review): logicalToPhysicalTableMapById_ is modified inside the
// transaction; the visible rollback path does not restore that in-memory
// entry — confirm this is acceptable.
4310 void Catalog::deleteTableCatalogMetadata(
4312 const std::vector<const TableDescriptor*>& physical_tables) {
4315 sqliteConnector_.query(
"BEGIN TRANSACTION");
// Drop the logical -> physical mapping rows, then the in-memory mapping.
4318 sqliteConnector_.query_with_text_param(
4319 "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
4321 logicalToPhysicalTableMapById_.erase(logical_table->
tableId);
4322 for (
auto table : physical_tables) {
4323 eraseTableMetadata(table);
4325 }
catch (std::exception& e) {
4326 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4329 sqliteConnector_.query(
"END TRANSACTION");
4333 executeDropTableSqliteQueries(td);
4335 dropTableFromJsonUnlocked(td->
tableName);
4337 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4345 const int tableId = td->
tableId;
4346 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_tables WHERE tableid = ?",
4348 sqliteConnector_.query_with_text_params(
4349 "select comp_param from mapd_columns where compression = ? and tableid = ?",
4351 int numRows = sqliteConnector_.getNumRows();
4352 std::vector<int> dict_id_list;
4353 for (
int r = 0; r < numRows; ++r) {
4354 dict_id_list.push_back(sqliteConnector_.getData<
int>(r, 0));
4356 for (
auto dict_id : dict_id_list) {
4357 sqliteConnector_.query_with_text_params(
4358 "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
4361 sqliteConnector_.query_with_text_params(
4362 "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
4363 "mapd_columns where compression = ? "
4364 "and tableid = ?) and refcount = 0",
4366 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_columns WHERE tableid = ?",
4369 sqliteConnector_.query_with_text_param(
"DELETE FROM mapd_views WHERE tableid = ?",
4373 sqliteConnector_.query_with_text_param(
4374 "DELETE FROM omnisci_foreign_tables WHERE table_id = ?",
std::to_string(tableId));
// Renames one table: updates its row in mapd_tables inside a SQLite
// transaction, then re-keys its descriptor in tableDescriptorMap_ under the
// upper-cased new name, notifying Calcite before and after the in-memory
// change.
// NOTE(review): the bound SQL arguments, the `try` opener and the lines that
// obtain `changeTd` are elided in this excerpt.
4378 void Catalog::renamePhysicalTable(
const TableDescriptor* td,
const string& newTableName) {
4382 sqliteConnector_.query(
"BEGIN TRANSACTION");
4384 sqliteConnector_.query_with_text_params(
4385 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4387 }
catch (std::exception& e) {
4388 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4391 sqliteConnector_.query(
"END TRANSACTION");
4392 TableDescriptorMap::iterator tableDescIt =
4394 CHECK(tableDescIt != tableDescriptorMap_.end());
// First Calcite notification, issued before the map is re-keyed.
4395 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
// Re-key the descriptor: erase the old entry, insert under the new name.
4399 tableDescriptorMap_.erase(tableDescIt);
4400 tableDescriptorMap_[
to_upper(newTableName)] = changeTd;
// Second Calcite notification, issued after the map is re-keyed.
// NOTE(review): both calls pass td->tableName; which name that holds at
// each point depends on elided lines.
4401 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4409 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->
tableId);
4410 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4411 const auto physicalTables = physicalTableIt->second;
4412 CHECK(!physicalTables.empty());
4413 for (
size_t i = 0; i < physicalTables.size(); i++) {
4414 int32_t physical_tb_id = physicalTables[i];
4417 std::string newPhysTableName =
4418 generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
4419 renamePhysicalTable(phys_td, newPhysTableName);
4422 renamePhysicalTable(td, newTableName);
4428 key.
dbId = currentDB_.dbId;
4431 object.setObjectKey(key);
4432 auto objdescs = SysCatalog::instance().getMetadataForObject(
4434 for (
auto obj : objdescs) {
4435 Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4440 SysCatalog::instance().renameObjectsInDescriptorMap(
object, *
this);
// Batch rename. `names[i]` is a (current name, new name) pair and
// `tableIds[i]` is the id of the table it refers to.
// Pass 1 updates every row in mapd_tables; pass 2 re-keys the in-memory
// descriptors and notifies Calcite. The visible code opens no transaction
// itself (renameTables() invokes this function through execInTransaction).
// NOTE(review): the lines that obtain `changeTd` are elided in this excerpt.
4444 void Catalog::renamePhysicalTables(
4445 std::vector<std::pair<std::string, std::string>>& names,
4446 std::vector<int>& tableIds) {
// Pass 1: persist every new name.
4451 for (
size_t i = 0; i < names.size(); i++) {
4452 int tableId = tableIds[i];
4453 const std::string& newTableName = names[i].second;
4454 sqliteConnector_.query_with_text_params(
4455 "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4456 std::vector<std::string>{newTableName,
std::to_string(tableId)});
// Pass 2: move each descriptor from its old key to its new key
// (tableDescriptorMap_ keys are upper-cased table names).
4460 for (
size_t i = 0; i < names.size(); i++) {
4461 const auto& [curTableName, newTableName] = names[i];
4463 TableDescriptorMap::iterator tableDescIt =
4464 tableDescriptorMap_.find(
to_upper(curTableName));
4465 CHECK(tableDescIt != tableDescriptorMap_.end());
4466 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4471 tableDescriptorMap_.erase(tableDescIt);
4472 tableDescriptorMap_[
to_upper(newTableName)] = changeTd;
// NOTE(review): the post-rename notification also passes curTableName.
4473 calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4481 const std::map<std::string, int>& cached_table_map,
4482 const std::string& cur_table_name) {
4483 if (
auto it = cached_table_map.find(cur_table_name); it != cached_table_map.end()) {
4484 auto table_id = it->second;
4485 return (table_id == -1) ? NULL : getMetadataForTable(table_id);
4487 return getMetadataForTable(cur_table_name);
4492 const std::string& curTableName,
4493 const std::string& newTableName,
4496 cachedTableMap[curTableName] = -1;
4499 cachedTableMap[newTableName] = tableId;
// Renames several tables in one operation. `names` holds (current name, new
// name) pairs.
// Visible phases:
//   1. resolve every current name to a table id;
//   2. collect a lock per distinct table;
//   3. expand each rename to its physical tables and apply them all through
//      execInTransaction(&Catalog::renamePhysicalTables, ...);
//   4. update the privilege objects / grantees in SysCatalog.
// NOTE(review): validation, error handling and several declarations
// (`tableLocks`, `phys_td`, `key`, `object`) are elided in this excerpt.
4503 void Catalog::renameTables(
4504 const std::vector<std::pair<std::string, std::string>>& names) {
// tableIds[i] is the id resolved for names[i].
4507 std::vector<int> tableIds;
// Table id -> index of the first `names` entry that referenced it.
4514 std::map<int, size_t> uniqueOrderedTableIds;
// Name -> id view consulted by getCachedTableDescriptor() below.
4517 std::map<std::string, int> cachedTableMap;
4522 for (
size_t i = 0; i < names.size(); i++) {
4523 const auto& [curTableName, newTableName] = names[i];
4527 auto td = getCachedTableDescriptor(cachedTableMap, curTableName);
4530 tableIds.push_back(td->tableId);
4531 if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
4533 uniqueOrderedTableIds[td->tableId] = i;
4538 CHECK_EQ(tableIds.size(), names.size());
// One lock per distinct table; iterating a std::map keyed by table id visits
// the tables in ascending id order.
4552 for (
auto& idPair : uniqueOrderedTableIds) {
4553 const std::string& tableName = names[idPair.second].first;
4554 tableLocks.emplace_back(
4557 *
this, tableName,
false)));
// Flattened rename list: logical tables plus all of their physical tables.
4565 std::vector<std::pair<std::string, std::string>> allNames;
4566 std::vector<int> allTableIds;
4568 for (
size_t i = 0; i < names.size(); i++) {
4569 int tableId = tableIds[i];
4570 const auto& [curTableName, newTableName] = names[i];
4573 const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
4574 if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4575 const auto physicalTables = physicalTableIt->second;
4576 CHECK(!physicalTables.empty());
4577 for (
size_t k = 0; k < physicalTables.size(); k++) {
4578 int32_t physical_tb_id = physicalTables[k];
// Physical names use a 1-based shard number, hence (k + 1).
4581 std::string newPhysTableName = generatePhysicalTableName(newTableName, (k + 1));
4582 allNames.emplace_back(phys_td->
tableName, newPhysTableName);
4583 allTableIds.push_back(phys_td->
tableId);
4586 allNames.emplace_back(curTableName, newTableName);
4587 allTableIds.push_back(tableId);
// Apply every rename through execInTransaction.
4591 execInTransaction(&Catalog::renamePhysicalTables, allNames, allTableIds);
// Propagate the new names to the privilege objects held in SysCatalog.
4595 for (
size_t i = 0; i < names.size(); i++) {
4596 int tableId = tableIds[i];
4597 const std::string& newTableName = names[i].second;
4601 key.
dbId = currentDB_.dbId;
4606 object.setObjectKey(key);
4608 auto objdescs = SysCatalog::instance().getMetadataForObject(
4610 for (
auto obj : objdescs) {
4611 Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4616 SysCatalog::instance().renameObjectsInDescriptorMap(
object, *
this);
4623 const string& newColumnName) {
4626 sqliteConnector_.query(
"BEGIN TRANSACTION");
4631 std::string new_column_name = cdx->columnName;
4632 new_column_name.replace(0, cd->
columnName.size(), newColumnName);
4633 sqliteConnector_.query_with_text_params(
4634 "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4635 std::vector<std::string>{new_column_name,
4639 }
catch (std::exception& e) {
4640 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4643 sqliteConnector_.query(
"END TRANSACTION");
4644 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4648 ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4650 CHECK(columnDescIt != columnDescriptorMap_.end());
4653 columnDescriptorMap_.erase(columnDescIt);
4654 columnDescriptorMap_[std::make_tuple(td->
tableId,
to_upper(changeCd->columnName))] =
4657 calciteMgr_->updateMetadata(currentDB_.dbName, td->
tableName);
4663 sqliteConnector_.query(
"BEGIN TRANSACTION");
4666 sqliteConnector_.query_with_text_params(
4667 "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4669 if (sqliteConnector_.getNumRows() > 0) {
4670 sqliteConnector_.query_with_text_params(
4671 "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4673 "datetime('now') where name = ? "
4681 sqliteConnector_.query_with_text_params(
4682 "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4687 "datetime('now'), ?)",
4694 }
catch (std::exception& e) {
4695 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4698 sqliteConnector_.query(
"END TRANSACTION");
4702 sqliteConnector_.query_with_text_params(
4703 "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4704 "WHERE name = ? and userid = ?",
4706 vd.
dashboardId = sqliteConnector_.getData<
int>(0, 0);
4707 vd.
updateTime = sqliteConnector_.getData<std::string>(0, 1);
4708 }
catch (std::exception& e) {
4713 addFrontendViewToMap(vd);
4716 if (!isInfoSchemaDb()) {
4718 createOrUpdateDashboardSystemRole(
4728 CHECK(sqliteConnector_.getSqlitePtr());
4729 sqliteConnector_.query(
"BEGIN TRANSACTION");
4731 sqliteConnector_.query_with_text_params(
4732 "SELECT id FROM mapd_dashboards WHERE id = ?",
4734 if (sqliteConnector_.getNumRows() > 0) {
4735 sqliteConnector_.query_with_text_params(
4736 "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4737 "?, userid = ?, update_time = datetime('now') where id = ? ",
4746 <<
" does not exist in db";
4747 throw runtime_error(
"Error replacing dashboard id " +
4750 }
catch (std::exception& e) {
4751 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4754 sqliteConnector_.query(
"END TRANSACTION");
4757 for (
auto descp : dashboardDescriptorMap_) {
4758 auto dash = descp.second.get();
4761 auto viewDescIt = dashboardDescriptorMap_.find(
std::to_string(dash->userId) +
":" +
4762 dash->dashboardName);
4764 dashboardDescriptorMap_.end()) {
4765 LOG(
ERROR) <<
"No metadata for dashboard for user " << dash->userId
4766 <<
" dashboard " << dash->dashboardName <<
" does not exist in map";
4767 throw runtime_error(
"No metadata for dashboard for user " +
4769 dash->dashboardName +
" does not exist in map");
4771 dashboardDescriptorMap_.erase(viewDescIt);
4777 <<
" does not exist in map";
4779 " does not exist in map");
4783 sqliteConnector_.query_with_text_params(
4784 "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4788 vd.
updateTime = sqliteConnector_.getData<
string>(0, 1);
4791 addFrontendViewToMapNoLock(vd);
4794 if (!isInfoSchemaDb()) {
4796 createOrUpdateDashboardSystemRole(
// Computes the SHA-1 digest of `data` with boost::uuids::detail::sha1 and
// streams the five digest words in hexadecimal into a string stream.
// NOTE(review): each word is written with std::hex only; no setw/setfill is
// visible, so a word with leading zero nibbles would contribute fewer than 8
// characters. Stored values may already depend on this format — do not change
// it without a migration plan.
// NOTE(review): the return statement is elided in this excerpt.
4801 std::string Catalog::calculateSHA1(
const std::string& data) {
4802 boost::uuids::detail::sha1 sha1;
4803 unsigned int digest[5];
4804 sha1.process_bytes(data.c_str(), data.length());
4805 sha1.get_digest(digest);
4806 std::stringstream ss;
4807 for (
size_t i = 0; i < 5; i++) {
4808 ss << std::hex << digest[i];
4816 sqliteConnector_.query(
"BEGIN TRANSACTION");
4820 sqliteConnector_.query_with_text_params(
4821 "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4823 if (sqliteConnector_.getNumRows() > 0) {
4824 sqliteConnector_.query_with_text_params(
4825 "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4829 sqliteConnector_.query_with_text_params(
4830 "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4831 "update_time) VALUES (?,?,?,?, datetime('now'))",
4832 std::vector<std::string>{
4836 sqliteConnector_.query_with_text_param(
4837 "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4840 ld.
linkId = sqliteConnector_.getData<
int>(0, 0);
4841 ld.
updateTime = sqliteConnector_.getData<std::string>(0, 1);
4842 }
catch (std::exception& e) {
4843 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
4846 sqliteConnector_.query(
"END TRANSACTION");
4855 const auto column_descriptors =
4856 getAllColumnMetadataForTable(td->
tableId,
false,
true,
true);
4860 for (
auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
// Returns the descriptors of the physical tables behind `logical_table_desc`.
// When the table has no entry in logicalToPhysicalTableMapById_, the result is
// a one-element vector holding `logical_table_desc` itself.
// `populate_fragmenter` is forwarded unchanged to getMetadataForTable().
// NOTE(review): the first parameter line and any locking are elided in this
// excerpt.
4869 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4871 bool populate_fragmenter)
const {
4873 const auto physicalTableIt =
4874 logicalToPhysicalTableMapById_.find(logical_table_desc->
tableId);
// No mapping entry: the logical table stands for itself.
4875 if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4876 return {logical_table_desc};
4878 const auto physicalTablesIds = physicalTableIt->second;
4879 CHECK(!physicalTablesIds.empty());
// Resolve every physical id to its descriptor, preserving the stored order.
4881 std::vector<const TableDescriptor*> physicalTables;
4882 for (
size_t i = 0; i < physicalTablesIds.size(); i++) {
4883 physicalTables.push_back(
4884 getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4886 return physicalTables;
// Returns (table id, shard) pairs for every table in tableDescriptorMapById_
// that is not a view, not temporary, not a foreign table, and has no entry of
// its own in logicalToPhysicalTableMapById_ (i.e. is not the logical parent of
// physical tables).
// NOTE(review): the qualifier/brace after the signature and any locking are
// elided in this excerpt.
4889 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4892 std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
// Upper bound on the result size: one slot per known table.
4893 table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4894 for (
const auto [table_id, td] : tableDescriptorMapById_) {
4896 if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4897 logicalToPhysicalTableMapById_.find(table_id) ==
4898 logicalToPhysicalTableMapById_.end()) {
4899 table_and_shard_ids.emplace_back(table_id, td->shard);
4902 return table_and_shard_ids;
// Builds a map from dictionary id to a column descriptor that uses that dictionary.
// Iterates every table returned by getAllTableMetadata() and every column returned by
// getAllColumnMetadataForTable(); only the first column encountered for a given
// dictionary id is recorded.
4905 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4908 std::map<int, const ColumnDescriptor*> mapping;
4910 const auto tables = getAllTableMetadata();
4911 for (
const auto td :
tables) {
// NOTE(review): the body of this shard check is not visible in this view;
// presumably tables with shard >= 0 are skipped -- confirm.
4912 if (td->shard >= 0) {
4917 for (
auto& cd : getAllColumnMetadataForTable(td->tableId,
false,
false,
true)) {
4918 const auto& ti = cd->columnType;
4919 if (ti.is_string()) {
// For string columns the comp_param value is used as the dictionary id.
4922 const auto dict_id = ti.get_comp_param();
// Keep the first column seen for each positive dictionary id.
4925 if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4926 mapping[dict_id] = cd;
4939 if (td->
shard >= 0) {
4943 switch (get_tables_type) {
4961 std::vector<DBObject> privObjects = {dbObject};
4962 if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
// Returns the names of all tables from getAllTableMetadata() that pass
// filterTableByTypeAndUser() for the given user metadata and table-type selector.
4969 std::vector<std::string> Catalog::getTableNamesForUser(
4974 std::vector<std::string> table_names;
4975 const auto tables = getAllTableMetadata();
4976 for (
const auto td :
tables) {
4977 if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4978 table_names.push_back(td->tableName);
// Returns TableMetadata entries for the tables that pass filterTableByTypeAndUser().
// When filter_table_name is non-empty, table names are additionally compared against
// it (the action taken on a mismatch is not visible in this view; presumably the
// table is skipped -- confirm).
4984 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4987 const std::string& filter_table_name)
const {
4991 std::vector<TableMetadata> tables_metadata;
4992 const auto tables = getAllTableMetadata();
4993 for (
const auto td :
tables) {
4994 if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
// An empty filter name disables the name comparison below.
4995 if (!filter_table_name.empty()) {
4996 if (td->tableName != filter_table_name) {
5002 tables_metadata.emplace_back(table_metadata);
5005 return tables_metadata;
// Searches logicalToPhysicalTableMapById_ for an entry whose physical-id list
// contains physicalTableId. The value returned on a match is not visible in this
// view (presumably the entry's logical id -- confirm). When the loop finishes without
// returning, physicalTableId itself is returned.
5008 int Catalog::getLogicalTableId(
const int physicalTableId)
const {
5010 for (
const auto& l : logicalToPhysicalTableMapById_) {
// Linear scan of this entry's physical ids for an exact match.
5011 if (l.second.end() != std::find_if(l.second.begin(),
5013 [&](decltype(*l.second.begin()) tid) ->
bool {
5014 return physicalTableId == tid;
// Fallback: the id is returned unchanged.
5019 return physicalTableId;
// Checkpoints every physical table behind the given logical table id by calling
// DataMgr::checkpoint() once per descriptor returned by
// getPhysicalTablesDescriptors(), using the current database id.
5022 void Catalog::checkpoint(
const int logicalTableId)
const {
5023 const auto td = getMetadataForTable(logicalTableId);
5024 const auto shards = getPhysicalTablesDescriptors(td);
5025 for (
const auto shard : shards) {
5026 getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
// Captures the table epochs for logical_table_id, runs checkpoint(), and calls
// setTableEpochsLogExceptions() with the captured epochs.
// NOTE(review): the control flow between the checkpoint and the epoch restore is not
// visible in this view; presumably the restore runs only when checkpoint() throws --
// confirm against the full source.
5030 void Catalog::checkpointWithAutoRollback(
const int logical_table_id)
const {
// Snapshot of epochs taken before the checkpoint is attempted.
5031 auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
5033 checkpoint(logical_table_id);
5035 setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
// Calls DataMgr::resetTableEpochFloor() for every physical table behind the given
// logical table id. Both metadata lookups pass false as their second argument
// (populate_fragmenter in getPhysicalTablesDescriptors()).
5040 void Catalog::resetTableEpochFloor(
const int logicalTableId)
const {
5042 const auto td = getMetadataForTable(logicalTableId,
false);
5043 const auto shards = getPhysicalTablesDescriptors(td,
false);
5044 for (
const auto shard : shards) {
5045 getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
// Calls eraseTableMetadata() for every table returned by getAllTableMetadata(), then
// notifies calciteMgr_ via updateMetadata() with the current database name and an
// empty second argument.
5049 void Catalog::eraseDbMetadata() {
5050 const auto tables = getAllTableMetadata();
5051 for (
const auto table :
tables) {
5052 eraseTableMetadata(table);
5057 calciteMgr_->updateMetadata(currentDB_.dbName,
"");
// Calls eraseTablePhysicalData() for every table returned by getAllTableMetadata().
5060 void Catalog::eraseDbPhysicalData() {
5061 const auto tables = getAllTableMetadata();
5062 for (
const auto table :
tables) {
5063 eraseTablePhysicalData(table);
5068 const int tableId = td->
tableId;
5070 removeFragmenterForTable(tableId);
5072 ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
5081 dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
// Builds a physical table name by concatenating the logical table name,
// physicalTableNameTag_, and the decimal shard number, in that order.
5085 std::string Catalog::generatePhysicalTableName(
const std::string& logicalTableName,
5086 const size_t shardNumber) {
5087 std::string physicalTableName =
5088 logicalTableName + physicalTableNameTag_ +
std::to_string(shardNumber);
5089 return (physicalTableName);
// Loads every row of omnisci_foreign_servers and registers a ForeignServer object
// for it in both foreignServerMap_ (keyed by name) and foreignServerMapById_ (keyed
// by id). Both maps share ownership of the same object.
5092 void Catalog::buildForeignServerMapUnlocked() {
5094 sqliteConnector_.query(
5095 "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
5096 "omnisci_foreign_servers");
5097 auto num_rows = sqliteConnector_.getNumRows();
5099 for (
size_t row = 0; row < num_rows; row++) {
// Constructor arguments follow the SELECT column order: id, name,
// data_wrapper_type, options, owner_user_id, creation_time.
5100 auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
5101 sqliteConnector_.getData<
int>(row, 0),
5102 sqliteConnector_.getData<std::string>(row, 1),
5103 sqliteConnector_.getData<std::string>(row, 2),
5104 sqliteConnector_.getData<std::string>(row, 3),
5105 sqliteConnector_.getData<std::int32_t>(row, 4),
5106 sqliteConnector_.getData<std::int32_t>(row, 5));
5107 foreignServerMap_[foreign_server->name] = foreign_server;
5108 foreignServerMapById_[foreign_server->id] = foreign_server;
// Reads every row of omnisci_foreign_tables and applies it to the matching in-memory
// foreign table descriptor: server pointer, options map, and last/next refresh times.
// Each table_id read from SQLite must already be present in tableDescriptorMapById_.
5112 void Catalog::updateForeignTablesInMapUnlocked() {
5114 sqliteConnector_.query(
5115 "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
5116 "omnisci_foreign_tables");
5117 auto num_rows = sqliteConnector_.getNumRows();
5118 for (
size_t r = 0; r < num_rows; r++) {
5119 const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
5120 const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
5121 const auto& options = sqliteConnector_.getData<std::string>(r, 2);
5122 const auto last_refresh_time = sqliteConnector_.getData<int64_t>(r, 3);
5123 const auto next_refresh_time = sqliteConnector_.getData<int64_t>(r, 4);
5125 CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
5126 auto foreign_table =
5128 CHECK(foreign_table);
// NOTE(review): if foreignServerMapById_ is a std::map-like container, operator[]
// inserts an empty entry for an unknown server_id before the CHECK below fails.
5129 foreign_table->foreign_server = foreignServerMapById_[server_id].get();
5130 CHECK(foreign_table->foreign_server);
5131 foreign_table->populateOptionsMap(options);
5132 foreign_table->last_refresh_time = last_refresh_time;
5133 foreign_table->next_refresh_time = next_refresh_time;
// For system tables, is_in_memory_system_table is derived from the server's
// data wrapper type (the deriving call is not visible in this view).
5134 if (foreign_table->is_system_table) {
5135 foreign_table->is_in_memory_system_table =
5137 foreign_table->foreign_server->data_wrapper_type);
5145 <<
"reloadForeignTable expects a table with valid id";
5146 sqliteConnector_.query(
5147 "SELECT server_id, options, last_refresh_time, next_refresh_time from "
5148 "omnisci_foreign_tables WHERE table_id == " +
5150 auto num_rows = sqliteConnector_.getNumRows();
5151 CHECK_EQ(num_rows, 1U) <<
"Expected single entry in omnisci_foreign_tables for table'"
5152 << foreign_table.
tableName <<
"', instead got " << num_rows;
5153 const auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
5154 const auto& options = sqliteConnector_.getData<std::string>(0, 1);
5155 const auto last_refresh_time = sqliteConnector_.getData<int64_t>(0, 2);
5156 const auto next_refresh_time = sqliteConnector_.getData<int64_t>(0, 3);
5158 foreign_table.
foreign_server = foreignServerMapById_[server_id].get();
// Re-reads mapd_dictionaries from SQLite and adds a DictDescriptor to
// dictDescriptorMapByRef_ for every dictionary that is not already present there.
// What happens for dictionaries that are already in the map is not visible here.
5170 void Catalog::reloadDictionariesFromDiskUnlocked() {
5171 std::string dictQuery(
5172 "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
5173 sqliteConnector_.query(dictQuery);
5174 auto numRows = sqliteConnector_.getNumRows();
5175 for (
size_t r = 0; r < numRows; ++r) {
5176 auto dictId = sqliteConnector_.getData<
int>(r, 0);
5177 auto dictName = sqliteConnector_.getData<
string>(r, 1);
5178 auto dictNBits = sqliteConnector_.getData<
int>(r, 2);
5179 auto is_shared = sqliteConnector_.getData<
bool>(r, 3);
5180 auto refcount = sqliteConnector_.getData<
int>(r, 4);
// Dictionaries are keyed by (current database id, dictionary id).
5183 DictRef dict_ref(currentDB_.dbId, dictId);
// NOTE(review): fname is defined on a line not visible in this view.
5184 DictDescriptor dd(dict_ref, dictName, dictNBits, is_shared, refcount, fname,
false);
// Only insert when the reference is not yet known.
5185 if (
auto it = dictDescriptorMapByRef_.find(dict_ref);
5186 it == dictDescriptorMapByRef_.end()) {
5187 dictDescriptorMapByRef_[dict_ref] = std::make_unique<DictDescriptor>(dd);
// Reads the rows of mapd_columns for a table id from SQLite and returns one
// heap-allocated ColumnDescriptor per row. The returned raw pointers are released
// from unique_ptrs at the end, so the caller receives ownership.
5194 std::list<ColumnDescriptor*> Catalog::sqliteGetColumnsForTableUnlocked(int32_t table_id) {
5195 std::list<ColumnDescriptor*> cds;
// Descriptors are held in unique_ptrs while rows are being read and are only
// released into cds once the loop has finished.
5198 std::list<std::unique_ptr<ColumnDescriptor>> smart_cds;
5199 std::string columnQuery(
5200 "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
5201 "is_notnull, compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
5202 "virtual_expr, is_deletedcol, default_value from mapd_columns WHERE tableid = " +
5204 sqliteConnector_.query(columnQuery);
5205 auto numRows = sqliteConnector_.getNumRows();
// NOTE(review): counter used for isGeoPhyCol below; the line that sets it to a
// positive value is not visible in this view.
5206 int32_t skip_physical_cols = 0;
5207 for (
size_t r = 0; r < numRows; ++r) {
5208 std::unique_ptr<ColumnDescriptor> cd = std::make_unique<ColumnDescriptor>();
// Column indexes below follow the SELECT list order of columnQuery.
5209 cd->tableId = sqliteConnector_.getData<
int>(r, 0);
5210 cd->columnId = sqliteConnector_.getData<
int>(r, 1);
5211 cd->columnName = sqliteConnector_.getData<
string>(r, 2);
5212 cd->columnType.set_type((
SQLTypes)sqliteConnector_.getData<
int>(r, 3));
5213 cd->columnType.set_subtype((
SQLTypes)sqliteConnector_.getData<
int>(r, 4));
5214 cd->columnType.set_dimension(sqliteConnector_.getData<
int>(r, 5));
5215 cd->columnType.set_scale(sqliteConnector_.getData<
int>(r, 6));
5216 cd->columnType.set_notnull(sqliteConnector_.getData<
bool>(r, 7));
5217 cd->columnType.set_compression((
EncodingType)sqliteConnector_.getData<
int>(r, 8));
5218 cd->columnType.set_comp_param(sqliteConnector_.getData<
int>(r, 9));
5219 cd->columnType.set_size(sqliteConnector_.getData<
int>(r, 10));
5220 cd->chunks = sqliteConnector_.getData<
string>(r, 11);
5221 cd->isSystemCol = sqliteConnector_.getData<
bool>(r, 12);
5222 cd->isVirtualCol = sqliteConnector_.getData<
bool>(r, 13);
5223 cd->virtualExpr = sqliteConnector_.getData<
string>(r, 14);
5224 cd->isDeletedCol = sqliteConnector_.getData<
bool>(r, 15);
// A SQL NULL default_value maps to an empty optional rather than an empty string.
5225 if (sqliteConnector_.isNull(r, 16)) {
5226 cd->default_value = std::nullopt;
5228 cd->default_value = std::make_optional(sqliteConnector_.getData<
string>(r, 16));
// True while the counter is still positive; the counter is decremented on every row.
5230 cd->isGeoPhyCol = skip_physical_cols-- > 0;
5231 cd->db_id = getDatabaseId();
5233 smart_cds.emplace_back(std::move(cd));
// Hand ownership of every descriptor over to the raw-pointer result list.
5237 for (
auto& cd : smart_cds) {
5238 cds.emplace_back(cd.release());
5245 "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
5246 "max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, "
5247 "num_shards, key_metainfo, userid, sort_column_id, storage_type, "
5248 "max_rollback_epochs, is_system_table from mapd_tables WHERE tableid = " +
5250 sqliteConnector_.query(query);
5251 auto numRows = sqliteConnector_.getNumRows();
5256 const auto& storage_type = sqliteConnector_.getData<
string>(0, 17);
5258 const auto& table_name = sqliteConnector_.getData<
string>(0, 1);
5259 LOG(
FATAL) <<
"Unable to read Catalog metadata: storage type is currently not a "
5260 "supported table option (table "
5261 << table_name <<
" [" << table_id <<
"] in database " << currentDB_.dbName
5268 std::unique_ptr<TableDescriptor> td;
5270 ? std::make_unique<foreign_storage::ForeignTable>()
5271 : std::make_unique<TableDescriptor>();
5273 td->tableId = sqliteConnector_.getData<
int>(0, 0);
5274 td->tableName = sqliteConnector_.getData<
string>(0, 1);
5275 td->nColumns = sqliteConnector_.getData<
int>(0, 2);
5276 td->isView = sqliteConnector_.getData<
bool>(0, 3);
5277 td->fragments = sqliteConnector_.getData<
string>(0, 4);
5279 sqliteConnector_.getData<
int>(0, 5));
5280 td->maxFragRows = sqliteConnector_.getData<
int>(0, 6);
5281 td->maxChunkSize = sqliteConnector_.getData<int64_t>(0, 7);
5282 td->fragPageSize = sqliteConnector_.getData<
int>(0, 8);
5283 td->maxRows = sqliteConnector_.getData<int64_t>(0, 9);
5284 td->partitions = sqliteConnector_.getData<
string>(0, 10);
5285 td->shardedColumnId = sqliteConnector_.getData<
int>(0, 11);
5286 td->shard = sqliteConnector_.getData<
int>(0, 12);
5287 td->nShards = sqliteConnector_.getData<
int>(0, 13);
5288 td->keyMetainfo = sqliteConnector_.getData<
string>(0, 14);
5289 td->userId = sqliteConnector_.getData<
int>(0, 15);
5290 td->sortedColumnId =
5291 sqliteConnector_.isNull(0, 16) ? 0 : sqliteConnector_.getData<
int>(0, 16);
5292 td->storageType = storage_type;
5293 td->maxRollbackEpochs = sqliteConnector_.getData<
int>(0, 18);
5294 td->is_system_table = sqliteConnector_.getData<
bool>(0, 19);
5295 td->hasDeletedCol =
false;
5298 updateViewUnlocked(*td);
5300 td->fragmenter =
nullptr;
5303 if (
auto ftd = dynamic_cast<foreign_storage::ForeignTable*>(td.get())) {
5304 reloadForeignTableUnlocked(*ftd);
5307 return td.release();
// Updates a single column of the omnisci_foreign_servers row whose name matches
// server_name. Throws std::runtime_error when the server is not found (the condition
// guarding the throw is not visible in this view).
//   server_name - name used to look up the server id
//   property    - column name, spliced directly into the UPDATE statement
//   value       - new value, bound as a text parameter
5310 void Catalog::setForeignServerProperty(
const std::string& server_name,
5311 const std::string& property,
5312 const std::string& value) {
5314 sqliteConnector_.query_with_text_params(
5315 "SELECT id from omnisci_foreign_servers where name = ?",
5316 std::vector<std::string>{server_name});
5317 auto num_rows = sqliteConnector_.getNumRows();
5320 auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
// NOTE(review): property is concatenated into the SQL text (column names cannot be
// bound as parameters); callers must only pass trusted column names.
5321 sqliteConnector_.query_with_text_params(
5322 "UPDATE omnisci_foreign_servers SET " + property +
" = ? WHERE id = ?",
5325 throw std::runtime_error{
"Can not change property \"" +
property +
5326 "\" for foreign server." +
" Foreign server \"" +
5327 server_name +
"\" is not found."};
// Creates the built-in foreign servers: "default_local_delimited",
// "default_local_parquet" (only when ENABLE_IMPORT_PARQUET is defined) and
// "default_local_regex_parsed". Each server is validated and then passed to
// createForeignServerNoLocks() with true as the second argument (presumably the
// if-not-exists flag, given this function's name -- confirm).
5331 void Catalog::createDefaultServersIfNotExists() {
5337 auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
5338 "default_local_delimited",
5342 local_csv_server->validate();
5343 createForeignServerNoLocks(std::move(local_csv_server),
true);
// The Parquet server is only compiled in when Parquet import support is enabled.
5345 #ifdef ENABLE_IMPORT_PARQUET
5346 auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
5347 "default_local_parquet",
5351 local_parquet_server->validate();
5352 createForeignServerNoLocks(std::move(local_parquet_server),
true);
5355 auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
5356 "default_local_regex_parsed",
5360 local_regex_parser_server->validate();
5361 createForeignServerNoLocks(std::move(local_regex_parser_server),
true);
// For every physical table behind tableId, reads the current table epoch and writes
// the same value back through setTableEpoch().
// NOTE(review): the epoch value is unchanged, so the call is made for whatever side
// effect setTableEpoch() has (presumably forcing a reload, per the function name).
5365 void Catalog::setForReload(
const int32_t tableId) {
5366 const auto td = getMetadataForTable(tableId);
5367 for (
const auto shard : getPhysicalTablesDescriptors(td)) {
5368 const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
5369 setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
// Returns, for every physical table behind td, the final path component of that
// table's file manager base path (directory name only, not the full path).
5374 std::vector<std::string> Catalog::getTableDataDirectories(
5376 const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
5377 std::vector<std::string> file_paths;
5378 for (
auto shard : getPhysicalTablesDescriptors(td)) {
5380 global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
5381 boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
// filename() keeps only the last component of the base path.
5382 file_paths.push_back(file_path.filename().string());
5389 bool file_name_only)
const {
5394 const DictRef dictRef(currentDB_.dbId, dictId);
5395 const auto dit = dictDescriptorMapByRef_.find(dictRef);
5396 CHECK(dit != dictDescriptorMapByRef_.end());
5398 if (file_name_only) {
5399 boost::filesystem::path file_path(dit->second->dictFolderPath);
5400 return file_path.filename().string();
5402 return dit->second->dictFolderPath;
5405 return std::string();
// Collects the distinct, non-empty results of getColumnDictDirectory() over the
// columns of td, preserving first-seen order.
5409 std::vector<std::string> Catalog::getTableDictDirectories(
5411 std::vector<std::string> file_paths;
5412 for (
auto cd : getAllColumnMetadataForTable(td->
tableId,
false,
false,
true)) {
5413 auto file_base = getColumnDictDirectory(cd);
// Skip empty results and entries that were already collected.
5414 if (!file_base.empty() &&
5415 file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
5416 file_paths.push_back(file_base);
// Returns the set of non-empty dictionary directory paths for the columns recorded
// under table_id in dict_columns_by_table_id_. getColumnDictDirectory() is called
// with false as its second argument. An unknown table_id yields an empty set.
5422 std::set<std::string> Catalog::getTableDictDirectoryPaths(int32_t table_id)
const {
5424 std::set<std::string> directory_paths;
5425 auto it = dict_columns_by_table_id_.find(table_id);
5426 if (it != dict_columns_by_table_id_.end()) {
5427 for (
auto cd : it->second) {
5428 auto directory_path = getColumnDictDirectory(cd,
false);
5429 if (!directory_path.empty()) {
5430 directory_paths.emplace(directory_path);
5434 return directory_paths;
5444 std::ostringstream os;
5445 os <<
"CREATE TABLE @T (";
5447 const auto cds = getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5449 std::vector<std::string> shared_dicts;
5450 std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5451 for (
const auto cd : cds) {
5452 if (!(cd->isSystemCol || cd->isVirtualCol)) {
5453 const auto& ti = cd->columnType;
5454 os << comma << quoteIfRequired(cd->columnName);
5463 os <<
" " << ti.get_type_name();
5465 os << (ti.get_notnull() ?
" NOT NULL" :
"");
5466 if (cd->default_value.has_value()) {
5467 os <<
" DEFAULT " << cd->getDefaultValueLiteral();
5469 if (ti.is_string() || (ti.is_array() && ti.get_subtype() ==
kTEXT)) {
5470 auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5473 const auto dict_id = ti.get_comp_param();
5474 const DictRef dict_ref(currentDB_.dbId, dict_id);
5475 const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
5476 CHECK(dict_it != dictDescriptorMapByRef_.end());
5477 const auto dict_name = dict_it->second->dictName;
5480 if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
5481 dict_root_cds[dict_name] = cd;
5482 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << (size * 8) <<
")";
5484 const auto dict_root_cd = dict_root_cds[dict_name];
5485 shared_dicts.push_back(
"SHARED DICTIONARY (" + cd->columnName +
5486 ") REFERENCES @T(" + dict_root_cd->columnName +
")");
5491 os <<
" ENCODING NONE";
5493 }
else if (ti.is_date_in_days() ||
5494 (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
5495 const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5496 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << comp_param <<
")";
5497 }
else if (ti.is_geometry()) {
5499 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << ti.get_comp_param()
5502 os <<
" ENCODING NONE";
5509 if (shared_dicts.size()) {
5513 std::vector<std::string> with_options;
5518 with_options.emplace_back(td->
hasDeletedCol ?
"VACUUM='DELAYED'"
5519 :
"VACUUM='IMMEDIATE'");
5521 with_options.push_back(
"PARTITIONS='" + td->
partitions +
"'");
5526 os <<
", SHARD KEY(" << shard_cd->columnName <<
")";
5527 with_options.push_back(
5534 with_options.push_back(
"SORT_COLUMN='" + sort_cd->columnName +
"'");
5538 with_options.push_back(
"MAX_ROLLBACK_EPOCHS=" +
5549 return std::find_if(str.begin(), str.end(), [](
const unsigned char& ch) {
5550 return std::isspace(ch);
5556 std::string_view str,
5557 std::string_view chars =
"`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
5558 return str.find_first_of(chars) != std::string_view::npos;
5568 bool multiline_formatting,
5569 bool dump_defaults)
const {
5571 return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
// Overload taking a table id: looks up the descriptor with
// getMutableMetadataForTableUnlocked() and delegates to dumpCreateTableUnlocked().
// NOTE(review): the handling of a failed lookup is not visible in this view; the
// std::optional return type suggests an empty optional is returned -- confirm.
5574 std::optional<std::string> Catalog::dumpCreateTable(int32_t table_id,
5575 bool multiline_formatting,
5576 bool dump_defaults)
const {
5578 const auto td = getMutableMetadataForTableUnlocked(table_id);
5582 return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5586 bool multiline_formatting,
5587 bool dump_defaults)
const {
5589 std::ostringstream os;
5592 os <<
"CREATE FOREIGN TABLE " << td->
tableName <<
" (";
5593 }
else if (!td->
isView) {
5604 std::vector<std::string> additional_info;
5605 std::set<std::string> shared_dict_column_names;
5607 gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
5610 const auto cds = getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5611 std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5613 for (
const auto cd : cds) {
5614 if (!(cd->isSystemCol || cd->isVirtualCol)) {
5615 const auto& ti = cd->columnType;
5618 if (!multiline_formatting) {
5624 if (multiline_formatting) {
5628 os << quoteIfRequired(cd->columnName);
5637 os <<
" " << ti.get_type_name();
5639 os << (ti.get_notnull() ?
" NOT NULL" :
"");
5640 if (cd->default_value.has_value()) {
5641 os <<
" DEFAULT " << cd->getDefaultValueLiteral();
5643 if (shared_dict_column_names.find(cd->columnName) ==
5644 shared_dict_column_names.end()) {
5647 if (ti.is_string() || (ti.is_array() && ti.get_subtype() ==
kTEXT)) {
5648 auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5650 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << (size * 8) <<
")";
5652 os <<
" ENCODING NONE";
5654 }
else if (ti.is_date_in_days() ||
5655 (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
5656 const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5657 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << comp_param <<
")";
5658 }
else if (ti.is_geometry()) {
5660 os <<
" ENCODING " << ti.get_compression_name() <<
"(" << ti.get_comp_param()
5663 os <<
" ENCODING NONE";
5670 if (additional_info.size()) {
5672 if (!multiline_formatting) {
5682 std::vector<std::string> with_options;
5684 if (multiline_formatting) {
5689 os <<
"SERVER " << foreign_table->foreign_server->name;
5692 for (
const auto& [option, value] : foreign_table->options) {
5693 with_options.emplace_back(option +
"='" + value +
"'");
5711 with_options.push_back(
"MAX_ROLLBACK_EPOCHS=" +
5714 if (!foreign_table && (dump_defaults || !td->
hasDeletedCol)) {
5715 with_options.emplace_back(td->
hasDeletedCol ?
"VACUUM='DELAYED'"
5716 :
"VACUUM='IMMEDIATE'");
5718 if (!foreign_table && !td->
partitions.empty()) {
5719 with_options.push_back(
"PARTITIONS='" + td->
partitions +
"'");
5721 if (!foreign_table && td->
nShards > 0) {
5724 with_options.push_back(
5731 with_options.push_back(
"SORT_COLUMN='" + sort_cd->columnName +
"'");
5734 if (!with_options.empty()) {
5735 if (!multiline_formatting) {
// Produces a CREATE SERVER statement for the named foreign server, including its
// data wrapper type and its options rendered as option='value' pairs.
// Throws std::runtime_error when no server with that name is in foreignServerMap_.
// multiline_formatting affects how the option list is laid out (details not visible
// in this view).
5746 std::string Catalog::dumpCreateServer(
const std::string&
name,
5747 bool multiline_formatting)
const {
5749 auto server_it = foreignServerMap_.find(name);
5750 if (server_it == foreignServerMap_.end()) {
5751 throw std::runtime_error(
"Foreign server " + name +
" does not exist.");
5753 auto server = server_it->second.get();
5754 std::ostringstream os;
5755 os <<
"CREATE SERVER " << name <<
" FOREIGN DATA WRAPPER " << server->data_wrapper_type;
5756 std::vector<std::string> with_options;
// NOTE(review): option values are wrapped in single quotes without escaping, so a
// value containing a single quote would produce an invalid statement.
5757 for (
const auto& [option, value] : server->options) {
5758 with_options.emplace_back(option +
"='" + value +
"'");
5760 if (!with_options.empty()) {
5761 if (!multiline_formatting) {
// Checks whether a table or view with the given name already exists (lookup made
// with false as the second argument of getMetadataForTable()).
// When it exists and if_not_exists is false, throws std::runtime_error.
// NOTE(review): the return statements are not visible in this view; presumably false
// is returned when the object exists and if_not_exists is set, true otherwise.
5772 bool Catalog::validateNonExistentTableOrView(
const std::string&
name,
5773 const bool if_not_exists) {
5774 if (getMetadataForTable(name,
false)) {
5775 if (if_not_exists) {
5778 throw std::runtime_error(
"Table or View with name \"" + name +
"\" already exists.");
// Scans tableDescriptorMapById_ and collects the foreign tables whose refresh timing
// option matches the expected value and whose next_refresh_time is at or before
// current_time. The option key, the compared value, the origin of current_time and
// the foreign-table cast are on lines not visible in this view.
5783 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh()
const {
5785 std::vector<const TableDescriptor*>
tables;
5786 for (
auto entry : tableDescriptorMapById_) {
5787 auto table_descriptor = entry.second;
5790 CHECK(foreign_table);
5791 auto timing_type_entry = foreign_table->options.find(
// The timing option must be present on every foreign table reaching this point.
5793 CHECK(timing_type_entry != foreign_table->options.end());
5795 if (timing_type_entry->second ==
5797 foreign_table->next_refresh_time <= current_time) {
5798 tables.emplace_back(foreign_table);
// Persists new last/next refresh times for a foreign table in omnisci_foreign_tables
// and then mirrors them on the in-memory descriptor. table_id must be present in
// tableDescriptorMapById_ and refer to a foreign table (enforced with CHECKs).
// How last_refresh_time and next_refresh_time are computed is not visible here.
5805 void Catalog::updateForeignTableRefreshTimes(
const int32_t table_id) {
5808 CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
5809 auto table_descriptor = tableDescriptorMapById_.find(table_id)->second;
5810 CHECK(table_descriptor);
5812 CHECK(foreign_table);
5815 sqliteConnector_.query_with_text_params(
5816 "UPDATE omnisci_foreign_tables SET last_refresh_time = ?, next_refresh_time = ? "
5817 "WHERE table_id = ?",
// In-memory state is updated after the SQLite update has been issued.
5821 foreign_table->last_refresh_time = last_refresh_time;
5822 foreign_table->next_refresh_time = next_refresh_time;
// Applies a new options map to the named foreign table, validates it, and persists
// the result through setForeignTableProperty() under the "options" property.
// If validation throws, the previously saved options are restored on the in-memory
// descriptor (whether the exception is rethrown is not visible in this view).
// clear_existing_options is forwarded to ForeignTable::populateOptionsMap().
5827 void Catalog::setForeignTableOptions(
const std::string& table_name,
5829 bool clear_existing_options) {
5832 auto foreign_table = getForeignTableUnlocked(table_name);
// Copy taken before mutation so the options can be rolled back on failure.
5833 auto saved_options = foreign_table->options;
5834 foreign_table->populateOptionsMap(std::move(options_map), clear_existing_options);
5836 foreign_table->validateOptionValues();
5837 }
catch (
const std::exception& e) {
5840 foreign_table->options = saved_options;
5843 setForeignTableProperty(
5844 foreign_table,
"options", foreign_table->getOptionsAsJsonString());
5848 const std::string& property,
5849 const std::string& value) {
5851 sqliteConnector_.query_with_text_params(
5852 "SELECT table_id from omnisci_foreign_tables where table_id = ?",
5854 auto num_rows = sqliteConnector_.getNumRows();
5857 sqliteConnector_.query_with_text_params(
5858 "UPDATE omnisci_foreign_tables SET " + property +
" = ? WHERE table_id = ?",
5861 throw std::runtime_error{
"Can not change property \"" +
property +
5862 "\" for foreign table." +
" Foreign table \"" +
5867 std::string Catalog::quoteIfRequired(
const std::string& column_name)
const {
// Collects extra CREATE TABLE clauses for td into additional_info:
//   - a "SHARD KEY (...)" clause (the condition guarding it is not visible here);
//   - a "SHARED DICTIONARY (...) REFERENCES table(column)" clause for each column
//     whose dictionary has refcount > 1 and for which another column using the same
//     dictionary sorts lower by table id / column id.
// Names of columns that received a SHARED DICTIONARY clause are added to
// shared_dict_column_names.
5878 void Catalog::gatherAdditionalInfo(std::vector<std::string>& additional_info,
5879 std::set<std::string>& shared_dict_column_names,
5883 auto scd = columnDescriptorMapById_.find(columnIdKey)->second;
5885 std::string txt =
"SHARD KEY (" + quoteIfRequired(scd->columnName) +
")";
5886 additional_info.emplace_back(txt);
5888 const auto cds = getAllColumnMetadataForTable(td->
tableId,
false,
false,
false);
5889 for (
const auto cd : cds) {
5890 if (!(cd->isSystemCol || cd->isVirtualCol)) {
5899 DictRef dict_ref(currentDB_.dbId, dictId);
5900 const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
// A missing dictionary is logged as an error rather than treated as fatal.
5901 if (dictIt == dictDescriptorMapByRef_.end()) {
5902 LOG(
ERROR) <<
"missing dictionary " << dictId <<
" for table " << td->
tableName;
5906 const auto& dd = dictIt->second;
// refcount > 1 means more than one column references this dictionary.
5907 if (dd->refcount > 1) {
// Start from this column and search for a lower (table id, column id) user of the
// same dictionary.
5908 auto lowest_table = td->
tableId;
5909 auto lowest_column = cd->columnId;
5910 std::string lowest_column_name;
5913 for (
auto const& [key, val] : columnDescriptorMap_) {
5915 val->columnType.get_comp_param() == dictId &&
5916 !(val->tableId == td->
tableId && val->columnId == cd->columnId)) {
5917 if (val->tableId < lowest_table) {
5918 lowest_table = val->tableId;
5919 lowest_column = val->columnId;
5920 lowest_column_name = val->columnName;
5922 if (val->columnId < lowest_column) {
5923 lowest_column = val->columnId;
5924 lowest_column_name = val->columnName;
// Only emit a reference when a different column was selected as the lowest one.
5928 if (lowest_table != td->
tableId || lowest_column != cd->columnId) {
5930 auto lowest_td = tableDescriptorMapById_.find(lowest_table)->second;
5932 std::string txt =
"SHARED DICTIONARY (" + quoteIfRequired(cd->columnName) +
5933 ") REFERENCES " + lowest_td->tableName +
"(" +
5934 quoteIfRequired(lowest_column_name) +
")";
5936 additional_info.emplace_back(txt);
5937 shared_dict_column_names.insert(cd->columnName);
// Inserts a new custom expression into omnisci_custom_expressions inside a SQLite
// transaction, registers it in custom_expr_map_by_id_, and returns its new id.
// Throws std::runtime_error when a row with the same name, data source type and data
// source id already matches the lookup query. On any std::exception the transaction
// is rolled back (the rethrow, if any, is not visible in this view).
// Ownership of custom_expression is moved into custom_expr_map_by_id_ on success.
5944 int32_t Catalog::createCustomExpression(
5945 std::unique_ptr<CustomExpression> custom_expression) {
5948 sqliteConnector_.query(
"BEGIN TRANSACTION");
5949 int32_t custom_expression_id{-1};
5951 auto data_source_type_str =
5952 CustomExpression::dataSourceTypeToString(custom_expression->data_source_type);
5953 auto data_source_id_str =
std::to_string(custom_expression->data_source_id);
// The same SELECT is used twice: first to detect duplicates, then to read back the
// id assigned to the newly inserted row.
5954 std::string custom_expr_select_query{
5955 "SELECT id FROM omnisci_custom_expressions WHERE name = ? and data_source_type = "
5956 "? and data_source_id = ? and is_deleted = ?"};
5957 std::vector<std::string> custom_expr_select_params{custom_expression->name,
5958 data_source_type_str,
5961 sqliteConnector_.query_with_text_params(custom_expr_select_query,
5962 custom_expr_select_params);
5963 if (sqliteConnector_.getNumRows() > 0) {
5964 throw std::runtime_error{
5965 "A custom expression with the given "
5966 "name and data source already exists."};
5968 sqliteConnector_.query_with_text_params(
5969 "INSERT INTO omnisci_custom_expressions(name, expression_json, "
5970 "data_source_type, data_source_id, is_deleted) VALUES (?,?,?,?,?)",
5971 std::vector<std::string>{custom_expression->name,
5972 custom_expression->expression_json,
5973 data_source_type_str,
5976 sqliteConnector_.query_with_text_params(custom_expr_select_query,
5977 custom_expr_select_params);
5978 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
5979 custom_expression->id = sqliteConnector_.getData<int32_t>(0, 0);
// The id is copied out first because custom_expression is moved from below.
5980 custom_expression_id = custom_expression->id;
5981 CHECK(custom_expr_map_by_id_.find(custom_expression->id) ==
5982 custom_expr_map_by_id_.end());
5983 custom_expr_map_by_id_[custom_expression->id] = std::move(custom_expression);
5984 }
catch (std::exception& e) {
5985 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
5988 sqliteConnector_.query(
"END TRANSACTION");
5990 return custom_expression_id;
5995 auto it = custom_expr_map_by_id_.find(custom_expression_id);
5996 if (it != custom_expr_map_by_id_.end()) {
5997 return it->second.get();
// Reads the custom expression with the given id directly from SQLite (not from
// custom_expr_map_by_id_). When a row is found, exactly one row is expected and the
// result of getCustomExpressionFromConnector(0) is returned. The value returned when
// no row is found is not visible in this view (presumably a null pointer -- confirm).
6002 const std::unique_ptr<const CustomExpression> Catalog::getCustomExpressionFromStorage(
6003 int32_t custom_expression_id) {
6005 sqliteConnector_.query_with_text_params(
6006 "SELECT id, name, expression_json, data_source_type, data_source_id, "
6007 "is_deleted FROM omnisci_custom_expressions WHERE id = ?",
6008 std::vector<std::string>{
to_string(custom_expression_id)});
6009 if (sqliteConnector_.getNumRows() > 0) {
6010 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
6011 return getCustomExpressionFromConnector(0);
// Returns the custom expressions from custom_expr_map_by_id_ for which
// SysCatalog::checkPrivileges() succeeds for the given user on the expression's
// data source object. Every expression must have a TABLE data source (CHECKed).
// The construction of db_object is on a line not visible in this view.
6016 std::vector<const CustomExpression*> Catalog::getCustomExpressionsForUser(
// First pass: snapshot raw pointers to all registered expressions.
6018 std::vector<const CustomExpression*> all_custom_expressions;
6023 for (
const auto& [
id, custom_expression] : custom_expr_map_by_id_) {
6024 all_custom_expressions.emplace_back(custom_expression.get());
// Second pass: keep only the expressions the user has privileges for.
6028 std::vector<const CustomExpression*> filtered_custom_expressions;
6029 for (
const auto custom_expression : all_custom_expressions) {
6030 CHECK(custom_expression->data_source_type == DataSourceType::TABLE);
6032 db_object.loadKey(*
this);
6034 if (SysCatalog::instance().checkPrivileges(user, {db_object})) {
6035 filtered_custom_expressions.emplace_back(custom_expression);
6038 return filtered_custom_expressions;
// Replaces the expression_json of an existing, non-deleted custom expression both in
// SQLite (inside a transaction) and in custom_expr_map_by_id_.
// Throws std::runtime_error when the id is unknown or the expression is marked
// deleted. On any std::exception during the update, the transaction is rolled back
// and the in-memory JSON is restored to its previous value.
6041 void Catalog::updateCustomExpression(int32_t custom_expression_id,
6042 const std::string& expression_json) {
6045 auto it = custom_expr_map_by_id_.find(custom_expression_id);
6046 if (it == custom_expr_map_by_id_.end() || it->second->is_deleted) {
6047 throw std::runtime_error{
"Custom expression with id \"" +
// Saved so the in-memory value can be restored if the database update fails.
6050 auto old_expression_json = it->second->expression_json;
6051 sqliteConnector_.query(
"BEGIN TRANSACTION");
6053 sqliteConnector_.query_with_text_params(
6054 "SELECT id FROM omnisci_custom_expressions WHERE id = ?",
// The row must exist in storage because the expression is present in memory.
6056 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
6057 sqliteConnector_.query_with_text_params(
6058 "UPDATE omnisci_custom_expressions SET expression_json = ? WHERE id = ?",
6059 std::vector<std::string>{expression_json,
std::to_string(custom_expression_id)});
6060 it->second->expression_json = expression_json;
6061 }
catch (std::exception& e) {
6062 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6063 it->second->expression_json = old_expression_json;
6066 sqliteConnector_.query(
"END TRANSACTION");
// Deletes the custom expressions with the given ids, in SQLite and in memory.
//   do_soft_delete == true  : rows are kept and updated via
//                             "SET is_deleted = ?"; in-memory entries get
//                             is_deleted = true.
//   do_soft_delete == false : rows are removed with DELETE and the entries are erased
//                             from custom_expr_map_by_id_.
// All ids are validated against custom_expr_map_by_id_ before any change is made; if
// any are unknown a std::runtime_error listing them is thrown. On any std::exception
// during the update, the SQLite transaction is rolled back.
6069 void Catalog::deleteCustomExpressions(
const std::vector<int32_t>& custom_expression_ids,
6070 bool do_soft_delete) {
// Validation pass: gather every id that is not currently registered.
6074 std::vector<int32_t> invalid_ids;
6075 for (
const auto id : custom_expression_ids) {
6076 if (custom_expr_map_by_id_.find(
id) == custom_expr_map_by_id_.end()) {
6077 invalid_ids.emplace_back(
id);
6080 if (!invalid_ids.empty()) {
6081 throw std::runtime_error{
"Custom expressions with ids: " +
join(invalid_ids,
",") +
6084 sqliteConnector_.query(
"BEGIN TRANSACTION");
// Storage pass: update or delete each row; each id must match exactly one row.
6086 for (
const auto id : custom_expression_ids) {
6087 sqliteConnector_.query_with_text_params(
6088 "SELECT id FROM omnisci_custom_expressions WHERE id = ?",
6090 CHECK_EQ(sqliteConnector_.getNumRows(),
static_cast<size_t>(1));
6091 if (do_soft_delete) {
6092 sqliteConnector_.query_with_text_params(
6093 "UPDATE omnisci_custom_expressions SET is_deleted = ? WHERE id = ?",
6096 sqliteConnector_.query_with_text_params(
6097 "DELETE FROM omnisci_custom_expressions WHERE id = ?",
// In-memory pass: runs only after all storage statements have been issued.
6102 for (
const auto id : custom_expression_ids) {
6103 if (do_soft_delete) {
6104 auto it = custom_expr_map_by_id_.find(
id);
6105 CHECK(it != custom_expr_map_by_id_.end());
6106 it->second->is_deleted =
true;
6108 custom_expr_map_by_id_.erase(
id);
6111 }
catch (std::exception& e) {
6112 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6115 sqliteConnector_.query(
"END TRANSACTION");
6121 if (!SysCatalog::instance().getMetadataForUser(user_name, user)) {
6122 throw std::runtime_error{
"User with username \"" + user_name +
"\" does not exist."};
6129 int32_t new_owner_id,
6130 const std::map<int32_t, std::vector<DBObject>>& old_owner_db_objects) {
6131 std::stringstream
result;
6132 for (
const auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6133 result <<
"db_id: " << db_id <<
", new_owner_user_id: " << new_owner_id
6134 <<
", old_owner_user_id: " << old_owner_id <<
", db_objects: [";
6135 bool first_object{
true};
6136 for (
const auto& db_object : db_objects) {
6138 first_object =
false;
6142 result <<
"\"object_id: " << db_object.getObjectKey().objectId
6147 return result.str();
6154 std::map<int32_t, std::vector<DBObject>>& db_objects) {
6155 DBObject db_object{object_name, object_type};
6157 db_objects[user_id].emplace_back(db_object);
// Transfers ownership of this catalog's tables, dashboards and foreign servers
// from the users named in old_owners to new_owner.
//
// Visible steps:
//   1. Collect the ids of old owners that differ from the new owner id.
//   2. Inside one SQLite transaction, update the owner columns of mapd_tables,
//      mapd_dashboards and omnisci_foreign_servers for each old owner id.
//   3. Update the in-memory descriptors, recording affected objects in
//      old_owner_db_objects (keyed by the previous owner id).
//   4. After the transaction ends, call SysCatalog::reassignObjectOwners(); if
//      that throws, restoreOldOwners() is invoked.
//
// old_owners must be non-empty (enforced with CHECK).
//
// NOTE(review): many original lines of this definition are not present in this
// listing (id lookups, query parameters, loop conditions, closing braces), so
// comments below describe only what the visible lines establish.
6161 void Catalog::reassignOwners(
const std::set<std::string>& old_owners,
6162 const std::string& new_owner) {
6163 CHECK(!old_owners.empty());
6165 std::map<int32_t, std::string> old_owners_user_name_by_id;
6166 std::set<int32_t> old_owner_ids;
6167 for (
const auto& old_owner : old_owners) {
// Old owners whose id equals the new owner id are left out.
6169 if (old_owner_id != new_owner_id) {
6170 old_owner_ids.emplace(old_owner_id);
6171 old_owners_user_name_by_id[old_owner_id] = old_owner;
6177 if (old_owner_ids.empty()) {
6181 std::map<int32_t, std::vector<DBObject>> old_owner_db_objects;
6185 sqliteConnector_.query(
"BEGIN TRANSACTION");
6187 for (
const auto old_user_id : old_owner_ids) {
6188 sqliteConnector_.query_with_text_params(
6189 "UPDATE mapd_tables SET userid = ? WHERE userid = ?",
6193 sqliteConnector_.query_with_text_params(
6194 "UPDATE mapd_dashboards SET userid = ? WHERE userid = ?",
6199 sqliteConnector_.query_with_text_params(
6200 "UPDATE omnisci_foreign_servers SET owner_user_id = ? "
6201 "WHERE owner_user_id = ?",
// In-memory table descriptors.
6207 for (
const auto& [table_name, td] : tableDescriptorMap_) {
6214 old_owner_db_objects);
6220 old_owner_db_objects);
6222 td->userId = new_owner_id;
// In-memory dashboards: matching entries are erased while iterating and
// collected under new_key in new_owner_dashboard_map, which is merged back
// once the loop is done.
6227 for (
auto it = dashboardDescriptorMap_.begin();
6228 it != dashboardDescriptorMap_.end();) {
6229 if (
auto dashboard = it->second;
6233 old_owner_db_objects[dashboard->userId].emplace_back(db_object);
6239 dashboard->dashboardName};
6242 dashboard->dashboardName};
6243 CHECK(dashboardDescriptorMap_.find(new_key) == dashboardDescriptorMap_.end());
6244 new_owner_dashboard_map[new_key] = dashboard;
6245 dashboard->userId = new_owner_id;
6246 dashboard->user = new_owner;
6247 it = dashboardDescriptorMap_.erase(it);
6252 dashboardDescriptorMap_.merge(new_owner_dashboard_map);
// In-memory foreign servers.
6255 for (
const auto& [server_name, server] : foreignServerMap_) {
6261 old_owner_db_objects);
6262 server->user_id = new_owner_id;
// Reload each recorded object's key and check that it now reports the new
// owner and this catalog's database id.
6268 for (
auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6269 for (
auto& db_object : db_objects) {
6270 db_object.loadKey(*
this);
6271 CHECK_EQ(db_object.getOwner(), new_owner_id);
6272 const auto& object_key = db_object.getObjectKey();
6273 CHECK_EQ(object_key.dbId, getDatabaseId());
6277 }
catch (std::exception& e) {
// A failure inside the transaction: roll SQLite back and revert only the
// in-memory state.
6278 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6279 restoreOldOwnersInMemory(
6280 old_owners_user_name_by_id, old_owner_db_objects, new_owner_id);
6283 sqliteConnector_.query(
"END TRANSACTION");
6287 SysCatalog::instance().reassignObjectOwners(
6288 old_owner_db_objects, new_owner_id, *
this);
6289 }
catch (std::exception& e) {
// The SQLite transaction above has already ended at this point, so the full
// restoreOldOwners() path is used rather than the in-memory-only one.
6290 restoreOldOwners(old_owners_user_name_by_id, old_owner_db_objects, new_owner_id);
// Reverts an ownership reassignment: for every object recorded in
// old_owner_db_objects, sets the owner column of the matching SQLite row back
// to the old owner id, then calls restoreOldOwnersInMemory() for the
// in-memory state. All SQLite updates run inside one transaction.
//
// The UPDATE statements are restricted by both the current owner and the
// object id. An object type that matches none of the handled cases hits
// UNREACHABLE().
//
// If anything throws, the transaction is rolled back and a message is logged
// that includes convert_object_owners_map_to_string() output.
//
// NOTE(review): some original lines of this definition are not present in this
// listing (branch conditions on object_type, query parameters, the log macro
// line), so those details cannot be confirmed from here.
6295 void Catalog::restoreOldOwners(
6296 const std::map<int32_t, std::string>& old_owners_user_name_by_id,
6297 const std::map<int32_t, std::vector<DBObject>>& old_owner_db_objects,
6298 int32_t new_owner_id) {
6301 sqliteConnector_.query(
"BEGIN TRANSACTION");
6303 for (
const auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6304 for (
const auto& db_object : db_objects) {
6305 auto object_id = db_object.getObjectKey().objectId;
6307 std::vector<std::string> query_params{
std::to_string(old_owner_id),
6310 auto object_type = db_object.getType();
6313 sqliteConnector_.query_with_text_params(
6314 "UPDATE mapd_tables SET userid = ? WHERE userid = ? AND tableid = ?",
6317 sqliteConnector_.query_with_text_params(
6318 "UPDATE mapd_dashboards SET userid = ? WHERE userid = ? AND id = ?",
6322 sqliteConnector_.query_with_text_params(
6323 "UPDATE omnisci_foreign_servers SET owner_user_id = ? "
6324 "WHERE owner_user_id = ? AND id = ?",
6327 UNREACHABLE() <<
"Unexpected DB object type: " <<
static_cast<int>(object_type);
6331 restoreOldOwnersInMemory(
6332 old_owners_user_name_by_id, old_owner_db_objects, new_owner_id);
6333 }
catch (std::exception& e) {
6334 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
6336 <<
"Unable to restore database objects ownership after an error occurred. "
6337 "Database object ownership information may be in an inconsistent state. " +
6339 getDatabaseId(), new_owner_id, old_owner_db_objects);
6341 sqliteConnector_.query(
"END TRANSACTION");
// Reverts the in-memory side of an ownership reassignment. For every object in
// old_owner_db_objects the owner field of the matching in-memory descriptor is
// set back to the old owner id:
//   - tables are looked up in tableDescriptorMapById_ by object id;
//   - dashboards are looked up in dashboardDescriptorMap_ under the key
//     "<new_owner_id>:<name>", get their userId and user name restored, are
//     stored under another key and the looked-up entry is erased;
//   - foreign servers are looked up in foreignServerMapById_ by object id.
// Every lookup is expected to succeed (CHECK). An object type that matches
// none of the handled cases hits UNREACHABLE().
//
// NOTE(review): the branch conditions on object_type and the start of the
// dashboard re-insert expression are not present in this listing.
6344 void Catalog::restoreOldOwnersInMemory(
6345 const std::map<int32_t, std::string>& old_owners_user_name_by_id,
6346 const std::map<int32_t, std::vector<DBObject>>& old_owner_db_objects,
6347 int32_t new_owner_id) {
6348 for (
const auto& [old_owner_id, db_objects] : old_owner_db_objects) {
6349 for (
const auto& db_object : db_objects) {
6350 auto object_id = db_object.getObjectKey().objectId;
6351 auto object_type = db_object.getType();
6354 auto it = tableDescriptorMapById_.find(object_id);
6355 CHECK(it != tableDescriptorMapById_.end());
6357 it->second->userId = old_owner_id;
6359 auto it = dashboardDescriptorMap_.find(
std::to_string(new_owner_id) +
":" +
6360 db_object.getName());
6361 CHECK(it != dashboardDescriptorMap_.end());
6363 it->second->userId = old_owner_id;
6364 auto user_name_it = old_owners_user_name_by_id.find(old_owner_id);
6365 CHECK(user_name_it != old_owners_user_name_by_id.end());
6366 it->second->user = user_name_it->second;
6368 db_object.getName()] = it->second;
6369 dashboardDescriptorMap_.erase(it);
6371 auto it = foreignServerMapById_.find(object_id);
6372 CHECK(it != foreignServerMapById_.end());
6374 it->second->user_id = old_owner_id;
6376 UNREACHABLE() <<
"Unexpected DB object type: " <<
static_cast<int>(object_type);
// Sets up the system foreign servers first and then the system tables.
// NOTE(review): the condition guarding these two calls (original line 6383) is
// not present in this listing.
6382 void Catalog::conditionallyInitializeSystemObjects() {
6384 initializeSystemServers();
6385 initializeSystemTables();
6389 bool Catalog::isInfoSchemaDb()
const {
// Ensures one system-table foreign server exists per category by calling
// createSystemTableServer() for the catalog, memory stats, storage stats,
// executor stats, ML metadata and logs server names. Only the logs server is
// given explicit options (log_server_options).
// NOTE(review): the data-wrapper-type arguments and the construction of
// log_server_options are not present in this listing.
6393 void Catalog::initializeSystemServers() {
6394 createSystemTableServer(CATALOG_SERVER_NAME,
6396 createSystemTableServer(MEMORY_STATS_SERVER_NAME,
6398 createSystemTableServer(STORAGE_STATS_SERVER_NAME,
6400 createSystemTableServer(EXECUTOR_STATS_SERVER_NAME,
6402 createSystemTableServer(ML_METADATA_SERVER_NAME,
6412 createSystemTableServer(LOGS_SERVER_NAME,
6414 log_server_options);
6425 sql_type_info.set_size(-1);
6426 return sql_type_info;
6432 sql_type_info.set_comp_param(32);
6433 return sql_type_info;
6438 foreign_table.
options[ForeignTable::REFRESH_TIMING_TYPE_KEY] =
6439 ForeignTable::MANUAL_REFRESH_TIMING_TYPE;
6440 foreign_table.
options[ForeignTable::REFRESH_UPDATE_TYPE_KEY] =
6441 ForeignTable::APPEND_REFRESH_UPDATE_TYPE;
6443 foreign_table.
options[AbstractFileStorageDataWrapper::ALLOW_FILE_ROLL_OFF_KEY] =
"TRUE";
6450 foreign_table.
options[RegexFileBufferParser::LINE_START_REGEX_KEY] =
6451 "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}";
6453 foreign_table.
options[AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY] =
6454 ".*heavydb\\.INFO\\..*";
6468 for (
const auto& table_name : table_names) {
// Creates (or refreshes) every system table by calling the per-table
// initialize*SystemTable() helpers in a fixed order.
// NOTE(review): the condition around the log-table initializers and the
// branch containing the LOGS_SERVER_NAME drop are not present in this listing;
// presumably the server is dropped when log system tables are disabled —
// confirm against the full source.
6476 void Catalog::initializeSystemTables() {
6477 initializeUsersSystemTable();
6478 initializeDatabasesSystemTable();
6479 initializePermissionsSystemTable();
6480 initializeRolesSystemTable();
6481 initializeTablesSystemTable();
6482 initializeDashboardsSystemTable();
6483 initializeRoleAssignmentsSystemTable();
6484 initializeMemorySummarySystemTable();
6485 initializeMemoryDetailsSystemTable();
6486 initializeStorageDetailsSystemTable();
6487 initializeExecutorResourcePoolSummarySystemTable();
6488 initializeMLModelMetadataSystemTable();
6491 initializeServerLogsSystemTables();
6492 initializeRequestLogsSystemTables();
// Remove the logs foreign server if one is currently stored.
6499 if (getForeignServer(LOGS_SERVER_NAME)) {
6500 dropForeignServer(LOGS_SERVER_NAME);
// Declares the schema of the users system table (judging by the function
// name) on CATALOG_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing; only the kINT columns user_id and default_db_id
// are visible.
6505 void Catalog::initializeUsersSystemTable() {
6506 auto [foreign_table, columns] =
6508 CATALOG_SERVER_NAME,
6509 {{
"user_id", {
kINT}},
6512 {
"default_db_id", {
kINT}},
6516 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the databases system table (judging by the function
// name) on CATALOG_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing; only the kINT columns database_id and owner_id are
// visible.
6519 void Catalog::initializeDatabasesSystemTable() {
6520 auto [foreign_table, columns] =
6522 CATALOG_SERVER_NAME,
6523 {{
"database_id", {
kINT}},
6525 {
"owner_id", {
kINT}},
6528 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the permissions system table (judging by the
// function name) on CATALOG_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing; the visible columns are all kINT ids.
6531 void Catalog::initializePermissionsSystemTable() {
6532 auto [foreign_table, columns] =
6534 CATALOG_SERVER_NAME,
6537 {
"database_id", {
kINT}},
6540 {
"object_id", {
kINT}},
6541 {
"object_owner_id", {
kINT}},
6546 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the roles system table (judging by the function
// name) on CATALOG_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// NOTE(review): the table-name argument and the column list are not present
// in this listing.
6549 void Catalog::initializeRolesSystemTable() {
6550 auto [foreign_table, columns] =
6552 CATALOG_SERVER_NAME,
6555 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the tables system table (judging by the function
// name) on CATALOG_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// Among the visible columns, max_chunk_size is the only kBIGINT one; the rest
// are kINT.
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing.
6558 void Catalog::initializeTablesSystemTable() {
6559 auto [foreign_table, columns] =
6561 CATALOG_SERVER_NAME,
6562 {{
"database_id", {
kINT}},
6564 {
"table_id", {
kINT}},
6566 {
"owner_id", {
kINT}},
6568 {
"column_count", {
kINT}},
6571 {
"max_fragment_size", {
kINT}},
6572 {
"max_chunk_size", {
kBIGINT}},
6573 {
"fragment_page_size", {
kINT}},
6575 {
"max_rollback_epochs", {
kINT}},
6576 {
"shard_count", {
kINT}},
6579 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the dashboards system table (judging by the function
// name) on CATALOG_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing; the visible columns are all kINT ids.
6582 void Catalog::initializeDashboardsSystemTable() {
6583 auto [foreign_table, columns] =
6585 CATALOG_SERVER_NAME,
6586 {{
"database_id", {
kINT}},
6588 {
"dashboard_id", {
kINT}},
6590 {
"owner_id", {
kINT}},
6595 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the role assignments system table (judging by the
// function name) via getSystemTableSchema() on CATALOG_SERVER_NAME and passes
// it to recreateSystemTableIfUpdated().
// NOTE(review): the table-name argument and the column list are not present
// in this listing.
6598 void Catalog::initializeRoleAssignmentsSystemTable() {
6599 auto [foreign_table, columns] = getSystemTableSchema(
6601 CATALOG_SERVER_NAME,
6604 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the memory summary system table (judging by the
// function name) on MEMORY_STATS_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// The visible page-count columns are kBIGINT; device_id is kINT.
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing.
6607 void Catalog::initializeMemorySummarySystemTable() {
6608 auto [foreign_table, columns] =
6610 MEMORY_STATS_SERVER_NAME,
6612 {
"device_id", {
kINT}},
6614 {
"max_page_count", {
kBIGINT}},
6616 {
"allocated_page_count", {
kBIGINT}},
6617 {
"used_page_count", {
kBIGINT}},
6618 {
"free_page_count", {
kBIGINT}}},
6620 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the memory details system table (judging by the
// function name) on MEMORY_STATS_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// Among the visible columns, last_touch_epoch is kBIGINT; the id columns are
// kINT.
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing.
6623 void Catalog::initializeMemoryDetailsSystemTable() {
6624 auto [foreign_table, columns] =
6626 MEMORY_STATS_SERVER_NAME,
6628 {
"database_id", {
kINT}},
6630 {
"table_id", {
kINT}},
6632 {
"column_id", {
kINT}},
6635 {
"device_id", {
kINT}},
6640 {
"slab_id", {
kINT}},
6642 {
"last_touch_epoch", {
kBIGINT}}},
6644 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the storage details system table (judging by the
// function name) on STORAGE_STATS_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// Among the visible columns, ids and counts are kINT, while the total_* file
// size and page count columns are kBIGINT.
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing.
6647 void Catalog::initializeStorageDetailsSystemTable() {
6648 auto [foreign_table, columns] =
6650 STORAGE_STATS_SERVER_NAME,
6652 {
"database_id", {
kINT}},
6654 {
"table_id", {
kINT}},
6657 {
"epoch_floor", {
kINT}},
6658 {
"fragment_count", {
kINT}},
6659 {
"shard_id", {
kINT}},
6660 {
"data_file_count", {
kINT}},
6661 {
"metadata_file_count", {
kINT}},
6662 {
"total_data_file_size", {
kBIGINT}},
6663 {
"total_data_page_count", {
kBIGINT}},
6664 {
"total_free_data_page_count", {
kBIGINT}},
6665 {
"total_metadata_file_size", {
kBIGINT}},
6666 {
"total_metadata_page_count", {
kBIGINT}},
6667 {
"total_free_metadata_page_count", {
kBIGINT}},
6668 {
"total_dictionary_data_file_size", {
kBIGINT}}},
6670 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the executor resource pool summary system table
// (judging by the function name) on EXECUTOR_STATS_SERVER_NAME and passes it
// to recreateSystemTableIfUpdated().
// The columns fall into three groups by name prefix: total_*, allocated_* and
// outstanding_*. Slot and buffer counts and the outstanding_* columns are
// kINT; the *_mem columns of the total_/allocated_ groups and total_requests
// are kBIGINT.
// NOTE(review): the table-name argument is not present in this listing.
6673 void Catalog::initializeExecutorResourcePoolSummarySystemTable() {
6674 auto [foreign_table, columns] =
6676 EXECUTOR_STATS_SERVER_NAME,
6677 {{
"total_cpu_slots", {
kINT}},
6678 {
"total_gpu_slots", {
kINT}},
6679 {
"total_cpu_result_mem", {
kBIGINT}},
6680 {
"total_cpu_buffer_pool_mem", {
kBIGINT}},
6681 {
"total_gpu_buffer_pool_mem", {
kBIGINT}},
6682 {
"allocated_cpu_slots", {
kINT}},
6683 {
"allocated_gpu_slots", {
kINT}},
6684 {
"allocated_cpu_result_mem", {
kBIGINT}},
6685 {
"allocated_cpu_buffer_pool_mem", {
kBIGINT}},
6686 {
"allocated_gpu_buffer_pool_mem", {
kBIGINT}},
6687 {
"allocated_cpu_buffers", {
kINT}},
6688 {
"allocated_gpu_buffers", {
kINT}},
6689 {
"allocated_temp_cpu_buffer_pool_mem", {
kBIGINT}},
6690 {
"allocated_temp_gpu_buffer_pool_mem", {
kBIGINT}},
6691 {
"total_requests", {
kBIGINT}},
6692 {
"outstanding_requests", {
kINT}},
6693 {
"outstanding_cpu_slots_requests", {
kINT}},
6694 {
"outstanding_gpu_slots_requests", {
kINT}},
6695 {
"outstanding_cpu_result_mem_requests", {
kINT}},
6696 {
"outstanding_cpu_buffer_pool_mem_requests", {
kINT}},
6697 {
"outstanding_gpu_buffer_pool_mem_requests", {
kINT}}},
6699 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the ML model metadata system table (judging by the
// function name) on ML_METADATA_SERVER_NAME and passes it to
// recreateSystemTableIfUpdated().
// The visible feature-count columns are kBIGINT and the two *_fraction
// columns are kDOUBLE.
// NOTE(review): the table-name argument and part of the column list are not
// present in this listing.
6702 void Catalog::initializeMLModelMetadataSystemTable() {
6703 auto [foreign_table, columns] =
6705 ML_METADATA_SERVER_NAME,
6711 {
"num_logical_features", {
kBIGINT}},
6712 {
"num_physical_features", {
kBIGINT}},
6713 {
"num_categorical_features", {
kBIGINT}},
6714 {
"num_numeric_features", {
kBIGINT}},
6715 {
"train_fraction", {
kDOUBLE}},
6716 {
"eval_fraction", {
kDOUBLE}}},
6718 recreateSystemTableIfUpdated(foreign_table, columns);
// Declares the schema of the server logs system table (judging by the
// function name), sets the regex used to split each log line into columns,
// and passes the result to recreateSystemTableIfUpdated().
// The LINE_REGEX_KEY pattern has seven capture groups; the third to fifth are
// numeric (\d+), which lines up with the three visible kINT columns
// process_id, query_id and thread_id — presumably one group per column.
// NOTE(review): the table/server arguments, part of the column list and the
// body of the final `if` are not present in this listing.
6721 void Catalog::initializeServerLogsSystemTables() {
6722 auto [foreign_table, columns] =
6728 {
"process_id", {
kINT}},
6729 {
"query_id", {
kINT}},
6730 {
"thread_id", {
kINT}},
6738 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6739 "^([^\\s]+)\\s(\\w)\\s(\\d+)\\s(\\d+)\\s(\\d+)\\s([^\\s]+)\\s(.+)$";
6740 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Declares the schema of the request logs system table (judging by the
// function name), sets the regex used to split each log line into columns,
// and passes the result to recreateSystemTableIfUpdated().
// The LINE_REGEX_KEY pattern contains a non-capturing (?:stdlog) group, so it
// only matches lines carrying that literal token.
// NOTE(review): the table/server arguments, part of the column list, the end
// of the regex literal and the body of the final `if` are not present in this
// listing.
6746 void Catalog::initializeRequestLogsSystemTables() {
6747 auto [foreign_table, columns] =
6752 {
"process_id", {
kINT}},
6753 {
"query_id", {
kINT}},
6754 {
"thread_id", {
kINT}},
6757 {
"request_duration_ms", {
kBIGINT}},
6763 {
"dashboard_id", {
kINT}},
6765 {
"chart_id", {
kINT}},
6766 {
"execution_time_ms", {
kBIGINT}},
6767 {
"total_time_ms", {
kBIGINT}}},
6773 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6774 "^([^\\s]+)\\s(\\w)\\s(\\d+)\\s(\\d+)\\s(\\d+)\\s([^\\s]+)\\s(?:stdlog)\\s(\\w+)"
6775 "\\s(?:\\d+)\\s(\\d+)\\s(\\w+)\\s([^\\s]+)\\s([^\\s]+)\\s(\\{[^\\}]+\\})\\s(\\{[^"
6777 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Declares the schema of the web server logs system table (judging by the
// function name) and passes it to recreateSystemTableIfUpdated().
// Two options are set on the foreign table: a path filter regex selecting
// files whose path matches ".*heavy_web_server.*ALL\..*", and a line regex
// with three capture groups for lines of the form
// time="..." level=... msg="...".
// NOTE(review): the schema arguments and the body of the final `if` are not
// present in this listing.
6783 void Catalog::initializeWebServerLogsSystemTables() {
6784 auto [foreign_table, columns] =
6793 foreign_table.options[AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY] =
6794 ".*heavy_web_server.*ALL\\..*";
6798 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6799 "^time=\"([^\"]+)\"\\slevel=([^\\s]+)\\smsg=\"([^\"]+)\"$";
6800 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Declares the schema of the web server access logs system table (judging by
// the function name) and passes it to recreateSystemTableIfUpdated().
// Two options are set on the foreign table: a path filter regex selecting
// files whose path matches ".*heavy_web_server.*ACCESS\..*", and a line regex
// with six capture groups. The line regex hard-codes the literal "HTTP/1.1",
// so lines with any other protocol token do not match it.
// NOTE(review): most of the column list (only the kBIGINT response_size is
// visible) and the body of the final `if` are not present in this listing.
6806 void Catalog::initializeWebServerAccessLogsSystemTables() {
6807 auto [foreign_table, columns] =
6815 {
"response_size", {
kBIGINT}}},
6819 foreign_table.options[AbstractFileStorageDataWrapper::REGEX_PATH_FILTER_KEY] =
6820 ".*heavy_web_server.*ACCESS\\..*";
6824 foreign_table.options[RegexFileBufferParser::LINE_REGEX_KEY] =
6825 "^(\\d+\\.\\d+\\.\\d+\\.\\d+)\\s+\\-\\s+\\-\\s+\\[([^\\]]+)\\]\\s+\"(\\w+)\\s+([^"
6826 "\\s]+)\\s+HTTP\\/1\\.1\"\\s+(\\d+)\\s+(\\d+)$";
6827 if (recreateSystemTableIfUpdated(foreign_table, columns)) {
// Ensures a system-table foreign server named server_name exists with the
// requested configuration.
//
// A candidate ForeignServer is built from the arguments and compared against
// the stored server (if any). When a stored server exists but its options
// differ, the foreign tables attached to it are enumerated (a log message is
// emitted per table), the server itself is dropped and it is then treated as
// absent. When no stored server remains, the candidate is created.
//
// NOTE(review): the third parameter, the ForeignServer constructor arguments
// and the statement that acts on each enumerated table (between the two log
// statements) are not present in this listing.
6833 void Catalog::createSystemTableServer(
const std::string& server_name,
6834 const std::string& data_wrapper_type,
6836 auto server = std::make_unique<foreign_storage::ForeignServer>(
6839 auto stored_server = getForeignServer(server_name);
6840 if (stored_server && stored_server->options != server->options) {
6842 auto tables = getAllForeignTablesForForeignServer(stored_server->id);
6843 for (
const auto table :
tables) {
6844 LOG(
INFO) <<
"Dropping existing \"" << table->tableName <<
"\" system table for \""
6845 << server_name <<
"\" foreign server.";
6848 LOG(
INFO) <<
"Dropping existing \"" << server_name
6849 <<
"\" system table foreign server.";
6850 dropForeignServer(server_name);
// Clear the pointer so the creation branch below runs.
6851 stored_server =
nullptr;
6853 if (!stored_server) {
6854 LOG(
INFO) <<
"Creating a new \"" << server_name <<
"\" system table foreign server.";
6855 createForeignServer(std::move(server),
true);
// Builds the in-memory description of a system table: a ForeignTable plus its
// list of ColumnDescriptors.
//
// column_type_by_name supplies (column name, SQL type) pairs; one
// ColumnDescriptor is appended per pair, in the given order, with isSystemCol
// and isVirtualCol both false. The ForeignTable's nColumns is set to the
// number of pairs and isView to false.
//
// Returns the {foreign_table, columns} pair.
//
// NOTE(review): the declaration of foreign_table and the remaining field
// assignments (original lines 6865-6885), including any use of table_name,
// server_name and is_in_memory_system_table, are not present in this listing.
6859 std::pair<foreign_storage::ForeignTable, std::list<ColumnDescriptor>>
6860 Catalog::getSystemTableSchema(
6861 const std::string& table_name,
6862 const std::string& server_name,
6863 const std::vector<std::pair<std::string, SQLTypeInfo>>& column_type_by_name,
6864 bool is_in_memory_system_table) {
6867 foreign_table.
nColumns = column_type_by_name.size();
6868 foreign_table.
isView =
false;
6886 list<ColumnDescriptor> columns;
6887 for (
const auto& [column_name, column_type] : column_type_by_name) {
// Append a default-constructed descriptor and fill it in place.
6888 columns.emplace_back();
6889 auto& cd = columns.back();
6890 cd.columnName = column_name;
6891 cd.columnType = column_type;
6892 cd.isSystemCol =
false;
6893 cd.isVirtualCol =
false;
6895 return {foreign_table, columns};
6899 const std::list<ColumnDescriptor>& columns) {
6900 auto stored_td = getMetadataForTable(foreign_table.
tableName,
false);
6901 bool should_recreate{
false};
6903 auto stored_foreign_table =
6905 CHECK(stored_foreign_table);
6906 if (stored_foreign_table->foreign_server->name !=
6908 stored_foreign_table->options != foreign_table.
options) {
6909 should_recreate =
true;
6911 auto stored_columns =
6912 getAllColumnMetadataForTable(stored_td->tableId,
false,
false,
false);
6913 if (stored_columns.size() != columns.size()) {
6914 should_recreate =
true;
6916 auto it_1 = stored_columns.begin();
6917 auto it_2 = columns.begin();
6918 for (; it_1 != stored_columns.end() && it_2 != columns.end(); it_1++, it_2++) {
6922 if ((*it_1)->columnName != it_2->columnName ||
6923 (*it_1)->columnType.get_type() != it_2->columnType.get_type() ||
6924 (*it_1)->columnType.get_subtype() != it_2->columnType.get_subtype() ||
6925 (*it_1)->columnType.get_dimension() != it_2->columnType.get_dimension() ||
6926 (*it_1)->columnType.get_scale() != it_2->columnType.get_scale() ||
6927 (*it_1)->columnType.get_notnull() != it_2->columnType.get_notnull() ||
6928 (*it_1)->columnType.get_compression() !=
6929 it_2->columnType.get_compression() ||
6930 (*it_1)->columnType.get_size() != it_2->columnType.get_size()) {
6931 should_recreate =
true;
6938 should_recreate =
true;
6940 if (should_recreate) {
6943 <<
"\" system table.";
6944 deleteTableCatalogMetadata(stored_td, {stored_td});
6946 LOG(
INFO) <<
"Creating a new \"" << foreign_table.
tableName <<
"\" system table.";
6947 createTable(foreign_table, columns, {},
true);
6949 return should_recreate;
6955 CHECK(table_name_opt.has_value());
6956 return table_name_opt.value();
6965 auto dict_it = dict_columns_by_table_id_.find(cd->
tableId);
6966 if (dict_it != dict_columns_by_table_id_.end()) {
6967 auto& set = dict_it->second;
6968 for (
auto it = set.begin(); it != set.end(); ++it) {
6969 if ((*it)->columnId == cd->
columnId) {
6976 dict_columns_by_table_id_[cd->
tableId].emplace(cd);
6979 removeColumnDescriptor(old_cd);
6980 addColumnDescriptor(cd);
6988 dict_columns_by_table_id_[cd->
tableId].emplace(cd);
6994 dict_columns_by_table_id_[cd->
tableId].erase(cd);
// Runs the Catalog member function `f` with the forwarded `args` between a
// SQLite "BEGIN TRANSACTION" and "END TRANSACTION". If the call throws a
// std::exception, "ROLLBACK TRANSACTION" is issued instead.
//
// `f` is invoked as (this->*f)(...), so it must be a pointer to a Catalog
// member function.
//
// NOTE(review): the lines before BEGIN, the `try` opener and whatever follows
// the rollback inside the handler are not present in this listing.
7001 template <
typename F,
typename... Args>
7002 void Catalog::execInTransaction(F&&
f, Args&&...
args) {
7005 sqliteConnector_.query(
"BEGIN TRANSACTION");
7007 (this->*
f)(std::forward<Args>(
args)...);
7008 }
catch (std::exception&) {
7009 sqliteConnector_.query(
"ROLLBACK TRANSACTION");
7012 sqliteConnector_.query(
"END TRANSACTION");
std::lock_guard< T > lock_guard
static constexpr const char * MEMORY_DETAILS_SYS_TABLE_NAME
bool contains(const T &container, const U &element)
int32_t maxRollbackEpochs
int64_t get_next_refresh_time(const foreign_storage::ForeignTable &foreign_table)
const Parser::SharedDictionaryDef compress_reference_path(Parser::SharedDictionaryDef cur_node, const std::vector< Parser::SharedDictionaryDef > &shared_dict_defs)
static std::set< std::string > reserved_keywords
void removeFromColumnMap(ColumnDescriptor *cd)
static constexpr const char * WS_SERVER_ACCESS_LOGS_SYS_TABLE_NAME
HOST DEVICE SQLTypes get_subtype() const
void set_compression(EncodingType c)
const int MAPD_TEMP_TABLE_START_ID
std::vector< int > ChunkKey
std::vector< std::unique_ptr< lockmgr::AbstractLockContainer< const TableDescriptor * >>> LockedTableDescriptors
static constexpr const char * SERVER_LOGS_SYS_TABLE_NAME
specifies the content in-memory of a row in the link metadata view
static constexpr const char * EXECUTOR_RESOURCE_POOL_SUMMARY_SYS_TABLE_NAME
int64_t next_refresh_time
static constexpr char const * REGEX_PARSER
void add_db_object(const std::string &object_name, DBObjectType object_type, int32_t user_id, const AccessPrivileges &privileges, std::map< int32_t, std::vector< DBObject >> &db_objects)
void CheckAndExecuteMigrations()
const std::string kDataDirectoryName
std::string dictFolderPath
std::tuple< int, std::string > ColumnKey
HOST DEVICE int get_size() const
static TableSchemaLockMgr & instance()
~Catalog()
Destructor - deletes all ColumnDescriptor and TableDescriptor structures which were allocated on the ...
SQLTypeInfo get_encoded_text_type()
SQLTypeInfo get_var_array_type(SQLTypes type)
T getData(const int row, const int col)
class for a per-database catalog. also includes metadata for the current database and the current use...
ColumnDescriptorMap columnDescriptorMap_
static const AccessPrivileges ALL_DATABASE
static TimeT::rep execution(F func, Args &&...args)
bool g_enable_logs_system_tables
void updateFrontendViewAndLinkUsers()
static bool dropRenderGroupColumns(const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, Catalog_Namespace::Catalog *cat)
virtual void query_with_text_params(std::string const &query_only)
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
int64_t last_refresh_time
static const std::string LOCAL_FILE_STORAGE_TYPE
static constexpr char const * INTERNAL_STORAGE_STATS
void set_common_log_system_table_options(foreign_storage::ForeignTable &foreign_table)
void clearForTablePrefix(const ChunkKey &)
static constexpr const char * DASHBOARDS_SYS_TABLE_NAME
SqliteConnector sqliteConnector_
const DBMetadata currentDB_
DictDescriptorMapById dictDescriptorMapByRef_
HOST DEVICE int get_scale() const
PersistentStorageMgr * getPersistentStorageMgr() const
void updateDictionarySchema()
#define DEFAULT_MAX_CHUNK_SIZE
void updateDictionaryNames()
std::shared_ptr< Data_Namespace::DataMgr > dataMgr_
This file includes the class specification for the FILE manager (FileMgr), and related data structure...
thread_holding_write_lock()
HOST DEVICE void set_subtype(SQLTypes st)
DEVICE void sort(ARGS &&...args)
std::shared_ptr< Catalog > getDummyCatalog()
const std::string & get_foreign_table() const
virtual void query(const std::string &queryString)
fs::path get_log_dir_path()
void setObjectKey(const DBObjectKey &objectKey)
auto table_json_filepath(const std::string &base_path, const std::string &db_name)
const int MAPD_TEMP_DICT_START_ID
The InsertOrderFragmenter is a child class of AbstractFragmenter, and fragments data in insert order...
void checkDateInDaysColumnMigration()
void clear_cached_table_data(const Data_Namespace::DataMgr *data_mgr, int32_t db_id, int32_t table_id)
int32_t max_rollback_epochs
void setPrivileges(const AccessPrivileges &privs)
bool contains_spaces(std::string_view str)
returns true if the string contains one or more spaces
void reloadCatalogMetadataUnlocked(const std::map< int32_t, std::string > &user_name_by_user_id)
HOST DEVICE SQLTypes get_type() const
const std::string kInfoSchemaDbName
dsqliteMutex_(std::make_unique< heavyai::DistributedSharedMutex >(std::filesystem::path(basePath_)/shared::kLockfilesDirectoryName/shared::kCatalogDirectoryName/(currentDB_.dbName+".sqlite.lockfile")))
static const AccessPrivileges SELECT_FROM_TABLE
std::vector< int > columnIdBySpi_
static constexpr char const * INTERNAL_CATALOG
void updateFrontendViewSchema()
DeletedColumnPerTableMap deletedColumnPerTable_
const ColumnDescriptor * get_foreign_col(const Catalog &cat, const Parser::SharedDictionaryDef &shared_dict_def)
ColumnDescriptorMapById columnDescriptorMapById_
DBObject * findDbObject(const DBObjectKey &objectKey, bool only_direct) const
std::shared_ptr< std::mutex > mutex_
static constexpr std::array< char const *, 5 > IN_MEMORY_DATA_WRAPPERS
bool contains_sql_reserved_chars(std::string_view str, std::string_view chars="`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?")
returns true if the string contains one or more OmniSci SQL reserved characters
bool is_in_memory_system_table
std::string dashboardMetadata
static const AccessPrivileges ALL_VIEW
void grantRoleBatch(const std::vector< std::string > &roles, const std::vector< std::string > &grantees)
static constexpr const char * REQUEST_LOGS_SYS_TABLE_NAME
void recordOwnershipOfObjectsInObjectPermissions()
void updateFrontendViewsToDashboards()
std::string dashboardSystemRoleName
This file contains the class specification and related data structures for Catalog.
void set_common_db_log_system_table_options(foreign_storage::ForeignTable &foreign_table)
foreign_storage::ForeignStorageCache * getDiskCache() const
int get_physical_cols() const
std::string dashboardState
static constexpr const char * ROLES_SYS_TABLE_NAME
static SysCatalog & instance()
This file contains the class specification and related data structures for SysCatalog.
static const std::string STORAGE_TYPE_KEY
std::string get_user_name_from_id(int32_t id, const std::map< int32_t, std::string > &user_name_by_user_id)
Classes representing a parse tree.
static int64_t getNextRefreshTime(const std::map< std::string, std::string, std::less<>> &foreign_table_options)
const DBMetadata & getCurrentDB() const
bool g_enable_system_tables
static const std::string getForeignTableSchema(bool if_not_exists=false)
#define INJECT_TIMER(DESC)
void populateOptionsMap(OptionsMap &&options_map, bool clear=false)
static constexpr char const * INTERNAL_ML_MODEL_METADATA
const std::string & get_foreign_column() const
void dropTable(const TableDescriptor *td)
std::string table_epochs_to_string(const std::vector< TableEpochInfo > &table_epochs)
void updateDefaultColumnValues()
std::vector< std::string > parse_underlying_dashboard_objects(const std::string &meta)
Encapsulates an enumeration of foreign data wrapper type strings.
std::unique_lock< T > unique_lock
const ColumnDescriptor * getMetadataForColumn(int tableId, const std::string &colName) const
#define CHUNK_KEY_TABLE_IDX
static const int32_t MAPD_VERSION
Role * getRoleGrantee(const std::string &name) const
static const std::string getForeignServerSchema(bool if_not_exists=false)
static const AccessPrivileges ALL_SERVER
void set_dict_key(ColumnDescriptor &cd)
static constexpr const char * MEMORY_SUMMARY_SYS_TABLE_NAME
TableDescriptorMapById tableDescriptorMapById_
void setOwner(int32_t userId)
bool is_dict_encoded_type() const
std::string get_checked_table_name(const Catalog *catalog, const ColumnDescriptor *cd)
specifies the content in-memory of a row in the column metadata table
#define DEFAULT_MAX_ROLLBACK_EPOCHS
specifies the content in-memory of a row in the table metadata table
static constexpr const char * ROLE_ASSIGNMENTS_SYS_TABLE_NAME
void updateDeletedColumnIndicator()
std::list< UserMetadata > getAllUserMetadata()
void buildDictionaryMapUnlocked()
static constexpr const char * USERS_SYS_TABLE_NAME
void createOrUpdateDashboardSystemRole(const std::string &view_meta, const int32_t &user_id, const std::string &dash_role_name)
std::shared_ptr< Fragmenter_Namespace::AbstractFragmenter > fragmenter
void replace_cached_table_name(std::map< std::string, int > &cachedTableMap, const std::string &curTableName, const std::string &newTableName, int tableId)
bool g_serialize_temp_tables
static constexpr char const * INTERNAL_EXECUTOR_STATS
std::string DBObjectTypeToString(DBObjectType type)
struct dict_ref_t DictRef
static constexpr const char * DATABASES_SYS_TABLE_NAME
#define DEFAULT_PAGE_SIZE
const DBObjectMap * getDbObjects(bool only_direct) const
void set_comp_param(int p)
void updateFixlenArrayColumns()
static constexpr const char * TABLES_SYS_TABLE_NAME
const int DEFAULT_INITIAL_VERSION
std::optional< std::string > default_value
V & get_from_map(std::map< K, V, comp > &map, const K &key)
static const std::string physicalTableNameTag_
int32_t g_distributed_leaf_idx
void reloadTableMetadata(int table_id)
HOST DEVICE EncodingType get_compression() const
static const AccessPrivileges SELECT_FROM_VIEW
bool table_is_temporary(const TableDescriptor *const td)
static constexpr const char * ML_MODEL_METADATA_SYS_TABLE_NAME
void set_dimension(int d)
#define DEFAULT_FRAGMENT_ROWS
void setStringDictKey(const shared::StringDictKey &dict_key)
std::map< std::string, std::shared_ptr< DashboardDescriptor >> DashboardDescriptorMap
static const std::string BASE_PATH_KEY
static const std::string getCustomExpressionsSchema(bool if_not_exists=false)
static constexpr const char * PERMISSIONS_SYS_TABLE_NAME
Fragmenter_Namespace::FragmenterType fragType
static constexpr char const * INTERNAL_MEMORY_STATS
Data_Namespace::MemoryLevel persistenceLevel
const Catalog * getObjForLock()
HOST DEVICE int get_dimension() const
static constexpr const char * STORAGE_DETAILS_SYS_TABLE_NAME
std::string convert_object_owners_map_to_string(int32_t db_id, int32_t new_owner_id, const std::map< int32_t, std::vector< DBObject >> &old_owner_db_objects)
static const AccessPrivileges ALL_DASHBOARD
torch::Tensor f(torch::Tensor x, torch::Tensor W_target, torch::Tensor b_target)
bool checkDropRenderGroupColumnsMigration()
std::string dashboardName
static const AccessPrivileges ALL_TABLE
std::string data_wrapper_type
const std::string kCatalogDirectoryName
HOST DEVICE int get_comp_param() const
int32_t g_distributed_num_leaves
void updateCustomExpressionsSchema()
const ForeignServer * foreign_server
static constexpr const char * REFRESH_TIMING_TYPE_KEY
std::unique_ptr< heavyai::DistributedSharedMutex > dsqliteMutex_
static constexpr char const * CSV
std::map< std::string, std::string, std::less<>> OptionsMap
std::optional< std::string > getTableName(int32_t table_id) const
void reloadTableMetadataUnlocked(int table_id)
static void migrateDateInDaysMetadata(const Catalog_Namespace::TableDescriptorMapById &table_descriptors_by_id, const int database_id, Catalog_Namespace::Catalog *cat, SqliteConnector &sqlite)
static void clearExternalCaches(bool for_update, const TableDescriptor *td, const int current_db_id)
Descriptor for a dictionary for a string column.
void updateLogicalToPhysicalTableLinkSchema()
static constexpr int NULL_REFRESH_TIME
FileBuffer Chunk
A Chunk is the fundamental unit of execution in Map-D.
static int64_t getCurrentTime()
std::unordered_map< std::string, std::vector< std::string > > getGranteesOfSharedDashboards(const std::vector< std::string > &dashboard_ids)
const std::string kLockfilesDirectoryName
void populateRoleDbObjects(const std::vector< DBObject > &objects)
static const AccessPrivileges DELETE_DASHBOARD
const TableDescriptor * getMetadataForTable(const std::string &tableName, const bool populateFragmenter=true) const
Returns a pointer to a const TableDescriptor struct matching the provided tableName.
void drop_tables(Catalog &catalog, const std::vector< std::string > &table_names)
HOST DEVICE bool get_notnull() const
int32_t validate_and_get_user_id(const std::string &user_name)
void renameLegacyDataWrappers()
void updateTableDescriptorSchema()
void renameForDelete(const std::string directoryName)
Renames a directory to DELETE_ME_<EPOCH>_<oldname>.
void reloadDictionariesFromDiskUnlocked()
static constexpr char const * FOREIGN_TABLE
bool is_string_array() const
int32_t max_rollback_epochs
static constexpr char const * INTERNAL_LOGS
static constexpr const char * WS_SERVER_LOGS_SYS_TABLE_NAME
SQLTypeInfo get_var_encoded_text_array_type()
bool is_reserved_sql_keyword(std::string_view str)
returns true if the string equals an OmniSci SQL reserved keyword
thread_holding_sqlite_lock()
std::map< int32_t, std::string > get_user_id_to_user_name_map()
static constexpr char const * PARQUET
virtual void renameDbObject(const DBObject &object)
virtual size_t getNumRows() const
A selection of helper methods for File I/O.
LogicalToPhysicalTableMapById logicalToPhysicalTableMapById_
Catalog()
Constructor builds a hollow catalog used during constructor of other catalogs.
TableDescriptorMap tableDescriptorMap_
static thread_local bool thread_holds_read_lock
void CheckAndExecuteMigrationsPostBuildMaps()
static constexpr const char * SCHEDULE_REFRESH_TIMING_TYPE
std::string generate_dashboard_system_rolename(const std::string &db_id, const std::string &dash_id)
std::tuple< int, int > ColumnIdKey
void createDashboardSystemRoles()
void updateLogicalToPhysicalTableMap(const int32_t logical_tb_id)
HOST DEVICE void set_type(SQLTypes t)