OmniSciDB  c1a53651b2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "Catalog/Catalog.h"
24 
25 #include <algorithm>
26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/range/adaptor/map.hpp>
29 #include <boost/version.hpp>
30 #include <cassert>
31 #include <cerrno>
32 #include <cstdio>
33 #include <cstring>
34 #include <exception>
35 #include <fstream>
36 #include <list>
37 #include <memory>
38 #include <random>
39 #include <regex>
40 #include <sstream>
41 
42 #if BOOST_VERSION >= 106600
43 #include <boost/uuid/detail/sha1.hpp>
44 #else
45 #include <boost/uuid/sha1.hpp>
46 #endif
47 #include <rapidjson/document.h>
48 #include <rapidjson/istreamwrapper.h>
49 #include <rapidjson/ostreamwrapper.h>
50 #include <rapidjson/writer.h>
51 
52 #include "Catalog/SysCatalog.h"
53 
54 #include "QueryEngine/Execute.h"
56 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
68 #include "Parser/ParserNode.h"
69 #include "QueryEngine/Execute.h"
71 #include "RefreshTimeCalculator.h"
72 #include "Shared/DateTimeParser.h"
73 #include "Shared/File.h"
74 #include "Shared/StringTransform.h"
75 #include "Shared/SysDefinitions.h"
76 #include "Shared/measure.h"
77 #include "Shared/misc.h"
79 
80 #include "MapDRelease.h"
81 #include "RWLocks.h"
83 
84 #include "Shared/distributed.h"
85 
86 using Chunk_NS::Chunk;
89 using std::list;
90 using std::map;
91 using std::pair;
92 using std::runtime_error;
93 using std::string;
94 using std::vector;
95 
96 bool g_enable_fsi{true};
97 bool g_enable_s3_fsi{false};
101 extern bool g_cache_string_hash;
102 extern bool g_enable_system_tables;
103 
104 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
105 // under unit testing.
107 
108 namespace Catalog_Namespace {
109 
110 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
112  1073741824; // 2^30, give room for over a billion non-temp tables
114  1073741824; // 2^30, give room for over a billion non-temp dictionaries
115 
116 const std::string Catalog::physicalTableNameTag_("_shard_#");
117 
118 thread_local bool Catalog::thread_holds_read_lock = false;
119 
124 
125 // Migration is done as a two-step process: this release
126 // creates and uses the new table;
127 // the next release will remove the old table. This keeps a fallback path
128 // in case of migration failure.
131  sqliteConnector_.query("BEGIN TRANSACTION");
132  try {
134  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
135  if (sqliteConnector_.getNumRows() != 0) {
136  // already done
137  sqliteConnector_.query("END TRANSACTION");
138  return;
139  }
141  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
142  "userid integer references mapd_users, state text, image_hash text, update_time "
143  "timestamp, "
144  "metadata text, UNIQUE(userid, name) )");
145  // now copy content from old table to new table
147  "insert into mapd_dashboards (id, name , "
148  "userid, state, image_hash, update_time , "
149  "metadata) "
150  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
151  "view_metadata "
152  "from mapd_frontend_views");
153  } catch (const std::exception& e) {
154  sqliteConnector_.query("ROLLBACK TRANSACTION");
155  throw;
156  }
157  sqliteConnector_.query("END TRANSACTION");
158 }
159 
160 namespace {
161 
162 inline auto table_json_filepath(const std::string& base_path,
163  const std::string& db_name) {
164  return boost::filesystem::path(base_path + "/" + shared::kCatalogDirectoryName + "/" +
165  db_name + "_temp_tables.json");
166 }
167 
168 std::map<int32_t, std::string> get_user_id_to_user_name_map();
169 } // namespace
170 
172 
173 Catalog::Catalog(const string& basePath,
174  const DBMetadata& curDB,
175  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
176  const std::vector<LeafHostInfo>& string_dict_hosts,
177  std::shared_ptr<Calcite> calcite,
178  bool is_new_db)
179  : basePath_(basePath)
180  , sqliteConnector_(curDB.dbName, basePath + "/" + shared::kCatalogDirectoryName + "/")
181  , currentDB_(curDB)
182  , dataMgr_(dataMgr)
183  , string_dict_hosts_(string_dict_hosts)
184  , calciteMgr_(calcite)
185  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
186  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
187  , dcatalogMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
188  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
189  shared::kCatalogDirectoryName / (currentDB_.dbName + ".lockfile"),
190  [this](size_t) {
191  if (!initialized_) {
192  return;
193  }
194  const auto user_name_by_user_id = get_user_id_to_user_name_map();
196  *dsqliteMutex_);
197  reloadCatalogMetadataUnlocked(user_name_by_user_id);
198  }))
199  , dsqliteMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
200  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
201  shared::kCatalogDirectoryName / (currentDB_.dbName + ".sqlite.lockfile")))
202  , sqliteMutex_()
203  , sharedMutex_()
206  if (!g_enable_fsi) {
207  CHECK(!g_enable_system_tables) << "System tables require FSI to be enabled";
208  CHECK(!g_enable_s3_fsi) << "S3 FSI requires FSI to be enabled";
209  }
210 
211  if (!is_new_db && !g_multi_instance) {
212  CheckAndExecuteMigrations();
213  }
214 
215  buildMaps();
216 
217  if (g_enable_fsi) {
218  createDefaultServersIfNotExists();
219  }
220  if (!is_new_db) {
221  CheckAndExecuteMigrationsPostBuildMaps();
222  }
224  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
225  }
226  conditionallyInitializeSystemObjects();
227  // once all initialized use real object
228  initialized_ = true;
229 }
230 
232  // cat_write_lock write_lock(this);
233 
234  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
235  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
236  tableDescIt != tableDescriptorMap_.end();
237  ++tableDescIt) {
238  tableDescIt->second->fragmenter = nullptr;
239  delete tableDescIt->second;
240  }
241 
242  // TableDescriptorMapById points to the same descriptors. No need to delete
243 
244  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
245  columnDescIt != columnDescriptorMap_.end();
246  ++columnDescIt) {
247  delete columnDescIt->second;
248  }
249 
250  // ColumnDescriptorMapById points to the same descriptors. No need to delete
251 
253  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
254  }
255 }
256 
258  if (initialized_) {
259  return this;
260  } else {
261  return SysCatalog::instance().getDummyCatalog().get();
262  }
263 }
264 
267  sqliteConnector_.query("BEGIN TRANSACTION");
268  try {
269  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
270  std::vector<std::string> cols;
271  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
272  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
273  }
274  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
275  cols.end()) {
276  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
278  sqliteConnector_.query(queryString);
279  }
280  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
281  cols.end()) {
282  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
283  std::to_string(0));
284  sqliteConnector_.query(queryString);
285  }
286  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
287  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
288  std::to_string(-1));
289  sqliteConnector_.query(queryString);
290  }
291  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
292  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
293  std::to_string(0));
294  sqliteConnector_.query(queryString);
295  }
296  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
297  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
298  sqliteConnector_.query(queryString);
299  }
300  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
301  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
303  sqliteConnector_.query(queryString);
304  }
305  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
306  cols.end()) {
308  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
309  }
310  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
311  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
312  sqliteConnector_.query(queryString);
313  }
314  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
315  cols.end()) {
316  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
317  std::to_string(-1));
318  sqliteConnector_.query(queryString);
319  }
320  if (std::find(cols.begin(), cols.end(), std::string("is_system_table")) ==
321  cols.end()) {
322  string queryString("ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
323  sqliteConnector_.query(queryString);
324  }
325  } catch (std::exception& e) {
326  sqliteConnector_.query("ROLLBACK TRANSACTION");
327  throw;
328  }
329  sqliteConnector_.query("END TRANSACTION");
330 }
331 
334  sqliteConnector_.query("BEGIN TRANSACTION");
335  try {
337  "select name from sqlite_master WHERE type='table' AND "
338  "name='mapd_version_history'");
339  if (sqliteConnector_.getNumRows() == 0) {
341  "CREATE TABLE mapd_version_history(version integer, migration_history text "
342  "unique)");
343  } else {
345  "select * from mapd_version_history where migration_history = "
346  "'notnull_fixlen_arrays'");
347  if (sqliteConnector_.getNumRows() != 0) {
348  // legacy fixlen arrays had migrated
349  // no need for further execution
350  sqliteConnector_.query("END TRANSACTION");
351  return;
352  }
353  }
354  // Insert check for migration
356  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
357  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
358  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
359  // Upating all fixlen array columns
360  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
361  std::to_string(kARRAY) + " AND size>0;");
362  sqliteConnector_.query(queryString);
363  } catch (std::exception& e) {
364  sqliteConnector_.query("ROLLBACK TRANSACTION");
365  throw;
366  }
367  sqliteConnector_.query("END TRANSACTION");
368 }
369 
372  sqliteConnector_.query("BEGIN TRANSACTION");
373  try {
375  "select name from sqlite_master WHERE type='table' AND "
376  "name='mapd_version_history'");
377  if (sqliteConnector_.getNumRows() == 0) {
379  "CREATE TABLE mapd_version_history(version integer, migration_history text "
380  "unique)");
381  } else {
383  "select * from mapd_version_history where migration_history = "
384  "'notnull_geo_columns'");
385  if (sqliteConnector_.getNumRows() != 0) {
386  // legacy geo columns had migrated
387  // no need for further execution
388  sqliteConnector_.query("END TRANSACTION");
389  return;
390  }
391  }
392  // Insert check for migration
394  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
395  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
396  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
397  // Upating all geo columns
398  string queryString(
399  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
400  " OR coltype=" + std::to_string(kMULTIPOINT) + " OR coltype=" +
402  " OR coltype=" + std::to_string(kPOLYGON) +
403  " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
404  sqliteConnector_.query(queryString);
405  } catch (std::exception& e) {
406  sqliteConnector_.query("ROLLBACK TRANSACTION");
407  throw;
408  }
409  sqliteConnector_.query("END TRANSACTION");
410 }
411 
414  sqliteConnector_.query("BEGIN TRANSACTION");
415  try {
416  // check table still exists
418  "SELECT name FROM sqlite_master WHERE type='table' AND "
419  "name='mapd_frontend_views'");
420  if (sqliteConnector_.getNumRows() == 0) {
421  // table does not exists
422  // no need to migrate
423  sqliteConnector_.query("END TRANSACTION");
424  return;
425  }
426  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
427  std::vector<std::string> cols;
428  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
429  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
430  }
431  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
432  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
433  }
434  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
435  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
436  }
437  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
438  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
439  }
440  } catch (std::exception& e) {
441  sqliteConnector_.query("ROLLBACK TRANSACTION");
442  throw;
443  }
444  sqliteConnector_.query("END TRANSACTION");
445 }
446 
449  sqliteConnector_.query("BEGIN TRANSACTION");
450  try {
452  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
453  "integer references mapd_users, "
454  "link text unique, view_state text, update_time timestamp, view_metadata text)");
455  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
456  std::vector<std::string> cols;
457  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
458  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
459  }
460  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
461  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
462  }
463  } catch (const std::exception& e) {
464  sqliteConnector_.query("ROLLBACK TRANSACTION");
465  throw;
466  }
467  sqliteConnector_.query("END TRANSACTION");
468 }
469 
472  sqliteConnector_.query("BEGIN TRANSACTION");
473  try {
474  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
475  // check table still exists
477  "SELECT name FROM sqlite_master WHERE type='table' AND "
478  "name='mapd_frontend_views'");
479  if (sqliteConnector_.getNumRows() == 0) {
480  // table does not exists
481  // no need to migrate
482  sqliteConnector_.query("END TRANSACTION");
483  return;
484  }
486  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
487  } catch (const std::exception& e) {
488  sqliteConnector_.query("ROLLBACK TRANSACTION");
489  throw;
490  }
491  sqliteConnector_.query("END TRANSACTION");
492 }
493 
494 // Introduce a DB version into the tables table.
495 // If the DB does not have a version, reset all page sizes to 2097152 to be compatible
496 // with the old value.
497 
500  if (currentDB_.dbName.length() == 0) {
501  // updateDictionaryNames dbName length is zero nothing to do here
502  return;
503  }
504  sqliteConnector_.query("BEGIN TRANSACTION");
505  try {
506  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
507  std::vector<std::string> cols;
508  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
509  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
510  }
511  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
512  LOG(INFO) << "Updating mapd_tables updatePageSize";
513  // No version number
514  // need to update the defaul tpagesize to old correct value
515  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
516  // need to add new version info
517  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
519  sqliteConnector_.query(queryString);
520  }
521  } catch (std::exception& e) {
522  sqliteConnector_.query("ROLLBACK TRANSACTION");
523  throw;
524  }
525  sqliteConnector_.query("END TRANSACTION");
526 }
527 
530  sqliteConnector_.query("BEGIN TRANSACTION");
531  try {
532  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
533  std::vector<std::string> cols;
534  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
535  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
536  }
537  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
538  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
539  // need to add new version info
540  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
542  sqliteConnector_.query(queryString);
543  // need to add new column to table defintion to indicate deleted column, column used
544  // as bitmap for deleted rows.
546  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
547  }
548  } catch (std::exception& e) {
549  sqliteConnector_.query("ROLLBACK TRANSACTION");
550  throw;
551  }
552  sqliteConnector_.query("END TRANSACTION");
553 }
554 
557  sqliteConnector_.query("BEGIN TRANSACTION");
558  try {
559  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
560  std::vector<std::string> cols;
561  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
562  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
563  }
564  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
565  LOG(INFO) << "Adding support for default values to mapd_columns";
566  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
567  }
568  } catch (std::exception& e) {
569  LOG(ERROR) << "Failed to make metadata update for default values` support";
570  sqliteConnector_.query("ROLLBACK TRANSACTION");
571  throw;
572  }
573  sqliteConnector_.query("END TRANSACTION");
574 }
575 
576 // introduce DB version into the dictionary tables
577 // if the DB does not have a version rename all dictionary tables
578 
581  if (currentDB_.dbName.length() == 0) {
582  // updateDictionaryNames dbName length is zero nothing to do here
583  return;
584  }
585  sqliteConnector_.query("BEGIN TRANSACTION");
586  try {
587  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
588  std::vector<std::string> cols;
589  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
590  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
591  }
592  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
593  // No version number
594  // need to rename dictionaries
595  string dictQuery("SELECT dictid, name from mapd_dictionaries");
596  sqliteConnector_.query(dictQuery);
597  size_t numRows = sqliteConnector_.getNumRows();
598  for (size_t r = 0; r < numRows; ++r) {
599  int dictId = sqliteConnector_.getData<int>(r, 0);
600  std::string dictName = sqliteConnector_.getData<string>(r, 1);
601 
602  std::string oldName = g_base_path + "/" + shared::kDataDirectoryName + "/" +
603  currentDB_.dbName + "_" + dictName;
604  std::string newName = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
605  std::to_string(currentDB_.dbId) + "_DICT_" +
606  std::to_string(dictId);
607 
608  int result = rename(oldName.c_str(), newName.c_str());
609 
610  if (result == 0) {
611  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
612  << newName;
613  } else {
614  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
615  << newName + " dbname '" << currentDB_.dbName << "' error code "
616  << std::to_string(result);
617  }
618  }
619  // need to add new version info
620  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
622  sqliteConnector_.query(queryString);
623  }
624  } catch (std::exception& e) {
625  sqliteConnector_.query("ROLLBACK TRANSACTION");
626  throw;
627  }
628  sqliteConnector_.query("END TRANSACTION");
629 }
630 
633  sqliteConnector_.query("BEGIN TRANSACTION");
634  try {
636  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
637  "logical_table_id integer, physical_table_id integer)");
638  } catch (const std::exception& e) {
639  sqliteConnector_.query("ROLLBACK TRANSACTION");
640  throw;
641  }
642  sqliteConnector_.query("END TRANSACTION");
643 }
644 
645 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
646  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
647  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
648  */
649 
651  sqliteConnector_.query("BEGIN TRANSACTION");
652  try {
653  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
654  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
655  const auto physicalTables = physicalTableIt->second;
656  CHECK(!physicalTables.empty());
657  for (size_t i = 0; i < physicalTables.size(); i++) {
658  int32_t physical_tb_id = physicalTables[i];
660  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
661  "physical_table_id) VALUES (?1, ?2)",
662  std::vector<std::string>{std::to_string(logical_tb_id),
663  std::to_string(physical_tb_id)});
664  }
665  }
666  } catch (std::exception& e) {
667  sqliteConnector_.query("ROLLBACK TRANSACTION");
668  throw;
669  }
670  sqliteConnector_.query("END TRANSACTION");
671 }
672 
675  sqliteConnector_.query("BEGIN TRANSACTION");
676  try {
677  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
678  std::vector<std::string> cols;
679  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
680  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
681  }
682  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
683  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
684  }
685  } catch (std::exception& e) {
686  sqliteConnector_.query("ROLLBACK TRANSACTION");
687  throw;
688  }
689  sqliteConnector_.query("END TRANSACTION");
690 }
691 
694  sqliteConnector_.query("BEGIN TRANSACTION");
695  try {
698  } catch (std::exception& e) {
699  sqliteConnector_.query("ROLLBACK TRANSACTION");
700  throw;
701  }
702  sqliteConnector_.query("END TRANSACTION");
703 }
704 
706  // TODO: Move common migration logic to a shared function.
708  sqliteConnector_.query("BEGIN TRANSACTION");
709  try {
711  "select name from sqlite_master WHERE type='table' AND "
712  "name='mapd_version_history'");
713  static const std::string migration_name{"rename_legacy_data_wrappers"};
714  if (sqliteConnector_.getNumRows() == 0) {
716  "CREATE TABLE mapd_version_history(version integer, migration_history text "
717  "unique)");
718  } else {
720  "select * from mapd_version_history where migration_history = "
721  "'" +
722  migration_name + "'");
723  if (sqliteConnector_.getNumRows() != 0) {
724  // Migration already done.
725  sqliteConnector_.query("END TRANSACTION");
726  return;
727  }
728  }
729  LOG(INFO) << "Executing " << migration_name << " migration.";
730 
731  // Update legacy data wrapper names
733  // clang-format off
734  std::map<std::string, std::string> old_to_new_wrapper_names{
735  {"OMNISCI_CSV", DataWrapperType::CSV},
736  {"OMNISCI_PARQUET", DataWrapperType::PARQUET},
737  {"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
738  {"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
739  {"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
740  {"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
741  };
742  // clang-format on
743 
744  for (const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
746  "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
747  "data_wrapper_type = ?",
748  std::vector<std::string>{new_wrapper_name, old_wrapper_name});
749  }
750 
751  // Record migration.
753  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
754  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
755  LOG(INFO) << migration_name << " migration completed.";
756  } catch (std::exception& e) {
757  sqliteConnector_.query("ROLLBACK TRANSACTION");
758  throw;
759  }
760  sqliteConnector_.query("END TRANSACTION");
761 }
762 
765  sqliteConnector_.query("BEGIN TRANSACTION");
766  try {
768  } catch (const std::exception& e) {
769  sqliteConnector_.query("ROLLBACK TRANSACTION");
770  throw;
771  }
772  sqliteConnector_.query("END TRANSACTION");
773 }
774 
775 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
776  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
777  "omnisci_foreign_servers(id integer primary key, name text unique, " +
778  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
779  "options text)";
780 }
781 
782 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
783  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
784  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
785  "options text, last_refresh_time integer, next_refresh_time integer, " +
786  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
787  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
788 }
789 
790 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
791  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
792  "omnisci_custom_expressions(id integer primary key, name text, " +
793  "expression_json text, data_source_type text, " +
794  "data_source_id integer, is_deleted boolean)";
795 }
796 
799  sqliteConnector_.query("BEGIN TRANSACTION");
800  std::vector<DBObject> objects;
801  try {
803  "SELECT name FROM sqlite_master WHERE type='table' AND "
804  "name='mapd_record_ownership_marker'");
805  // check if mapd catalog - marker exists
806  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
807  // already done
808  sqliteConnector_.query("END TRANSACTION");
809  return;
810  }
811  // check if different catalog - marker exists
812  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
813  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
814  // Check if migration is being performed on existing non mapd catalogs
815  // Older non mapd dbs will have table but no record in them
816  if (sqliteConnector_.getNumRows() != 0) {
817  // already done
818  sqliteConnector_.query("END TRANSACTION");
819  return;
820  }
821  }
822  // marker not exists - create one
823  else {
824  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
825  }
826 
827  DBMetadata db;
828  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
829  // place dbId as a refernce for migration being performed
831  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
832  std::vector<std::string>{std::to_string(db.dbOwner)});
833 
834  static const std::map<const DBObjectType, const AccessPrivileges>
835  object_level_all_privs_lookup{
841 
842  // grant owner all permissions on DB
843  DBObjectKey key;
844  key.dbId = currentDB_.dbId;
845  auto _key_place = [&key](auto type) {
846  key.permissionType = type;
847  return key;
848  };
849  for (auto& it : object_level_all_privs_lookup) {
850  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
851  objects.back().setName(currentDB_.dbName);
852  }
853 
854  {
855  // other users tables and views
856  string tableQuery(
857  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
858  sqliteConnector_.query(tableQuery);
859  size_t numRows = sqliteConnector_.getNumRows();
860  for (size_t r = 0; r < numRows; ++r) {
861  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
862  std::string tableName = sqliteConnector_.getData<string>(r, 1);
863  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
864  bool isview = sqliteConnector_.getData<bool>(r, 3);
865 
868  DBObjectKey key;
869  key.dbId = currentDB_.dbId;
870  key.objectId = tableid;
871  key.permissionType = type;
872 
873  DBObject obj(tableName, type);
874  obj.setObjectKey(key);
875  obj.setOwner(ownerid);
878 
879  objects.push_back(obj);
880  }
881  }
882 
883  {
884  // other users dashboards
885  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
886  sqliteConnector_.query(tableQuery);
887  size_t numRows = sqliteConnector_.getNumRows();
888  for (size_t r = 0; r < numRows; ++r) {
889  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
890  std::string dashName = sqliteConnector_.getData<string>(r, 1);
891  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
892 
894  DBObjectKey key;
895  key.dbId = currentDB_.dbId;
896  key.objectId = dashId;
897  key.permissionType = type;
898 
899  DBObject obj(dashName, type);
900  obj.setObjectKey(key);
901  obj.setOwner(ownerid);
903 
904  objects.push_back(obj);
905  }
906  }
907  } catch (const std::exception& e) {
908  sqliteConnector_.query("ROLLBACK TRANSACTION");
909  throw;
910  }
911  sqliteConnector_.query("END TRANSACTION");
912 
913  // now apply the objects to the syscat to track the permisisons
914  // moved outside transaction to avoid lock in sqlite
915  try {
917  } catch (const std::exception& e) {
918  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
919  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
920  e.what());
921  // will need to remove the mapd_record_ownership_marker table and retry
922  }
923 }
924 
929 }
930 
932  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
933  std::vector<std::string> dashboard_ids;
934  static const std::string migration_name{"dashboard_roles_migration"};
935  {
937  sqliteConnector_.query("BEGIN TRANSACTION");
938  try {
939  // migration_history should be present in all catalogs by now
940  // if not then would be created before this migration
942  "select * from mapd_version_history where migration_history = '" +
943  migration_name + "'");
944  if (sqliteConnector_.getNumRows() != 0) {
945  // no need for further execution
946  sqliteConnector_.query("END TRANSACTION");
947  return;
948  }
949  LOG(INFO) << "Performing dashboard internal roles Migration.";
950  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
951  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
954  sqliteConnector_.getData<string>(i, 0)))) {
955  // Successfully created roles during previous migration/crash
956  // No need to include them
957  continue;
958  }
959  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
960  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
961  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
962  }
963  } catch (const std::exception& e) {
964  sqliteConnector_.query("ROLLBACK TRANSACTION");
965  throw;
966  }
967  sqliteConnector_.query("END TRANSACTION");
968  }
969  // All current grantees with shared dashboards.
970  const auto active_grantees =
972 
973  try {
974  // NOTE(wamsi): Transactionally unsafe
975  for (auto dash : dashboards) {
976  createOrUpdateDashboardSystemRole(dash.second.second,
977  dash.second.first,
979  std::to_string(currentDB_.dbId), dash.first));
980  auto result = active_grantees.find(dash.first);
981  if (result != active_grantees.end()) {
984  dash.first)},
985  result->second);
986  }
987  }
989  // check if this has already been completed
991  "select * from mapd_version_history where migration_history = '" +
992  migration_name + "'");
993  if (sqliteConnector_.getNumRows() != 0) {
994  return;
995  }
997  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
998  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
999  } catch (const std::exception& e) {
1000  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
1001  << e.what();
1002  throw;
1003  }
1004  LOG(INFO) << "Successfully created dashboard system roles during migration.";
1005 }
1006 
1008  cat_write_lock write_lock(this);
1012  updateGeoColumns();
1015  updateLinkSchema();
1019  updatePageSize();
1023  if (g_enable_fsi) {
1024  updateFsiSchemas();
1026  }
1029 }
1030 
1034 }
1035 
1036 namespace {
1037 std::map<int32_t, std::string> get_user_id_to_user_name_map() {
1038  auto users = SysCatalog::instance().getAllUserMetadata();
1039  std::map<int32_t, std::string> user_name_by_user_id;
1040  for (const auto& user : users) {
1041  user_name_by_user_id[user.userId] = user.userName;
1042  }
1043  return user_name_by_user_id;
1044 }
1045 
/**
 * Resolves a user id to its user name.
 *
 * @param id user id to look up
 * @param user_name_by_user_id mapping of user ids to user names
 * @return the mapped user name, or "Unknown" when the id has no entry
 */
std::string get_user_name_from_id(
    int32_t id,
    const std::map<int32_t, std::string>& user_name_by_user_id) {
  const auto match = user_name_by_user_id.find(id);
  // a user could be deleted and a dashboard still exist?
  return match == user_name_by_user_id.end() ? std::string{"Unknown"} : match->second;
}
1056 
1058  CHECK_GT(cd.db_id, 0);
1059  auto& column_type = cd.columnType;
1060  if (column_type.is_dict_encoded_string() ||
1061  column_type.is_subtype_dict_encoded_string()) {
1062  column_type.setStringDictKey({cd.db_id, column_type.get_comp_param()});
1063  }
1064 }
1065 } // namespace
1066 
1068  std::string dictQuery(
1069  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1070  sqliteConnector_.query(dictQuery);
1071  auto numRows = sqliteConnector_.getNumRows();
1072  for (size_t r = 0; r < numRows; ++r) {
1073  auto dictId = sqliteConnector_.getData<int>(r, 0);
1074  auto dictName = sqliteConnector_.getData<string>(r, 1);
1075  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1076  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1077  auto refcount = sqliteConnector_.getData<int>(r, 4);
1078  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1079  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1080  DictRef dict_ref(currentDB_.dbId, dictId);
1081  auto dd = new DictDescriptor(
1082  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
1083  dictDescriptorMapByRef_[dict_ref].reset(dd);
1084  }
1085 }
1086 
1087 // NOTE(sy): Only used by --multi-instance clusters.
/**
 * Reloads the cached metadata of a single table. Acquires the catalog write lock and
 * then delegates to reloadTableMetadataUnlocked().
 *
 * @param table_id id of the table whose cached metadata is reloaded
 */
1088 void Catalog::reloadTableMetadata(int table_id) {
1089  cat_write_lock write_lock(this);
1091  reloadTableMetadataUnlocked(table_id);
1092 }
1093 
1094 // NOTE(sy): Only used by --multi-instance clusters.
1096  // Reload dictionaries first.
1097  // TODO(sy): Does dictionary reloading really belong here?
1098  // We don't have dictionary locks in the system but maybe we need them.
1099  list<DictDescriptor> dicts;
1100  std::string dictQuery(
1101  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1102  sqliteConnector_.query(dictQuery);
1103  auto numRows = sqliteConnector_.getNumRows();
1104  for (size_t r = 0; r < numRows; ++r) {
1105  auto dictId = sqliteConnector_.getData<int>(r, 0);
1106  auto dictName = sqliteConnector_.getData<string>(r, 1);
1107  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1108  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1109  auto refcount = sqliteConnector_.getData<int>(r, 4);
1110  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1111  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1112  DictRef dict_ref(currentDB_.dbId, dictId);
1113  DictDescriptor dd(dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
1114  if (auto it = dictDescriptorMapByRef_.find(dict_ref);
1115  it == dictDescriptorMapByRef_.end()) {
1116  dictDescriptorMapByRef_[dict_ref] = std::make_unique<DictDescriptor>(dd);
1117  } else {
1118  *it->second = dd;
1119  }
1120  }
1121 
1122  // Delete this table's metadata from the in-memory cache before reloading.
1123  TableDescriptor* original_td;
1124  std::list<ColumnDescriptor*> original_cds;
1125  if (auto it1 = tableDescriptorMapById_.find(table_id);
1126  it1 != tableDescriptorMapById_.end()) {
1127  original_td = it1->second;
1128  tableDescriptorMapById_.erase(it1);
1129  if (auto it2 = tableDescriptorMap_.find(to_upper(original_td->tableName));
1130  it2 != tableDescriptorMap_.end()) {
1131  CHECK_EQ(original_td, it2->second);
1132  tableDescriptorMap_.erase(it2);
1133  }
1134  if (original_td->hasDeletedCol) {
1135  const auto ret = deletedColumnPerTable_.erase(original_td);
1136  CHECK_EQ(ret, size_t(1));
1137  }
1138  for (int column_id = 0; column_id < original_td->nColumns; ++column_id) {
1139  if (auto it3 = columnDescriptorMapById_.find({table_id, column_id});
1140  it3 != columnDescriptorMapById_.end()) {
1141  ColumnDescriptor* original_cd = it3->second;
1142  original_cds.push_back(original_cd);
1143  removeFromColumnMap(original_cd);
1144  }
1145  }
1146  }
1147 
1148  // Reload the table descriptor.
1149  std::string tableQuery(
1150  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1151  "max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, "
1152  "num_shards, key_metainfo, userid, sort_column_id, storage_type, "
1153  "max_rollback_epochs, is_system_table from mapd_tables WHERE tableid = " +
1154  std::to_string(table_id));
1155  sqliteConnector_.query(tableQuery);
1156  numRows = sqliteConnector_.getNumRows();
1157  if (!numRows) {
1158  return; // Table was deleted by another node.
1159  }
1160 
1161  TableDescriptor* td;
1162  const auto& storage_type = sqliteConnector_.getData<string>(0, 17);
1163  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1164  const auto& table_name = sqliteConnector_.getData<string>(0, 1);
1165  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1166  "supported table option (table "
1167  << table_name << " [" << table_id << "] in database " << currentDB_.dbName
1168  << ").";
1169  }
1170 
1171  if (storage_type == StorageType::FOREIGN_TABLE) {
1172  td = new foreign_storage::ForeignTable();
1173  } else {
1174  td = new TableDescriptor();
1175  }
1176 
1177  td->storageType = storage_type;
1178  td->tableId = sqliteConnector_.getData<int>(0, 0);
1179  td->tableName = sqliteConnector_.getData<string>(0, 1);
1180  td->nColumns = sqliteConnector_.getData<int>(0, 2);
1181  td->isView = sqliteConnector_.getData<bool>(0, 3);
1182  td->fragments = sqliteConnector_.getData<string>(0, 4);
1183  td->fragType =
1184  (Fragmenter_Namespace::FragmenterType)sqliteConnector_.getData<int>(0, 5);
1185  td->maxFragRows = sqliteConnector_.getData<int>(0, 6);
1186  td->maxChunkSize = sqliteConnector_.getData<int64_t>(0, 7);
1187  td->fragPageSize = sqliteConnector_.getData<int>(0, 8);
1188  td->maxRows = sqliteConnector_.getData<int64_t>(0, 9);
1189  td->partitions = sqliteConnector_.getData<string>(0, 10);
1190  td->shardedColumnId = sqliteConnector_.getData<int>(0, 11);
1191  td->shard = sqliteConnector_.getData<int>(0, 12);
1192  td->nShards = sqliteConnector_.getData<int>(0, 13);
1193  td->keyMetainfo = sqliteConnector_.getData<string>(0, 14);
1194  td->userId = sqliteConnector_.getData<int>(0, 15);
1195  td->sortedColumnId =
1196  sqliteConnector_.isNull(0, 16) ? 0 : sqliteConnector_.getData<int>(0, 16);
1197  if (!td->isView) {
1198  td->fragmenter = nullptr;
1199  }
1200  td->maxRollbackEpochs = sqliteConnector_.getData<int>(0, 18);
1201  td->is_system_table = sqliteConnector_.getData<bool>(0, 19);
1202  td->hasDeletedCol = false;
1203 
1204  if (td->isView) {
1205  // If we have a view, then we need to refresh the viewSQL from the mapd_views table,
1206  // since this value is not contained in the mapd_tables table.
1207  updateViewUnlocked(*td);
1208  }
1209 
1210  if (auto tableDescIt = tableDescriptorMapById_.find(table_id);
1211  tableDescIt != tableDescriptorMapById_.end()) {
1212  tableDescIt->second->fragmenter = nullptr;
1213  delete tableDescIt->second;
1214  }
1215 
1216  // Reload the column descriptors.
1217  std::list<ColumnDescriptor*> cds;
1218  std::string columnQuery(
1219  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1220  "is_notnull, compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
1221  "virtual_expr, is_deletedcol, default_value from mapd_columns WHERE tableid = " +
1222  std::to_string(table_id) + " ORDER BY tableid, columnid");
1223  sqliteConnector_.query(columnQuery);
1224  numRows = sqliteConnector_.getNumRows();
1225  int32_t skip_physical_cols = 0;
1226  for (size_t r = 0; r < numRows; ++r) {
1227  ColumnDescriptor* cd = new ColumnDescriptor();
1228  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1229  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1230  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1231  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
1232  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
1233  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
1234  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1235  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
1236  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
1237  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
1238  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1239  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1240  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1241  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1242  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1243  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1244  if (sqliteConnector_.isNull(r, 16)) {
1245  cd->default_value = std::nullopt;
1246  } else {
1247  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1248  }
1249  cd->isGeoPhyCol = skip_physical_cols-- > 0;
1250  cd->db_id = getDatabaseId();
1251  set_dict_key(*cd);
1252  cds.push_back(cd);
1253  }
1254 
1255  // Store the descriptors into the cache.
1256  if (original_td) {
1257  td->mutex_ = original_td->mutex_; // TODO(sy): Unnecessary?
1258  delete original_td;
1259  original_td = nullptr;
1260  }
1261  for (ColumnDescriptor* original_cd : original_cds) {
1262  delete original_cd;
1263  }
1264  original_cds.clear();
1265  tableDescriptorMap_[to_upper(td->tableName)] = td;
1266  tableDescriptorMapById_[td->tableId] = td;
1267  skip_physical_cols = 0;
1268  for (ColumnDescriptor* cd : cds) {
1269  addToColumnMap(cd);
1270 
1271  if (skip_physical_cols <= 0) {
1272  skip_physical_cols = cd->columnType.get_physical_cols();
1273  }
1274 
1275  if (cd->isDeletedCol) {
1276  td->hasDeletedCol = true;
1277  setDeletedColumnUnlocked(td, cd);
1278  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1279  td->columnIdBySpi_.push_back(cd->columnId);
1280  }
1281  }
1282 
1283  // Notify Calcite about the reloaded table.
1284  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
1285 }
1286 
1287 // NOTE(sy): Only used by --multi-instance clusters.
1288 void Catalog::reloadCatalogMetadata(
1289  const std::map<int32_t, std::string>& user_name_by_user_id) {
1290  cat_write_lock write_lock(this);
1291  cat_sqlite_lock sqlite_lock(getObjForLock());
1292  reloadCatalogMetadataUnlocked(get_user_id_to_user_name_map());
1293 }
1294 
1295 // NOTE(sy): Only used by --multi-instance clusters.
/**
 * Brings this catalog's in-memory caches in line with the SQLite catalog tables.
 *
 * Table ids are read from mapd_tables. Every such id that is not currently reported
 * as locked by TableSchemaLockMgr for this database is reloaded through
 * reloadTableMetadataUnlocked(); ids that are cached but have no row in mapd_tables
 * any more are passed to the same function. Afterwards the dashboard, link, foreign
 * server and custom expression maps are cleared and rebuilt, and Calcite is notified.
 *
 * @param user_name_by_user_id mapping of user ids to user names, forwarded to
 *        buildDashboardsMapUnlocked()
 */
1296 void Catalog::reloadCatalogMetadataUnlocked(
1297  const std::map<int32_t, std::string>& user_name_by_user_id) {
1299 
1300  // Notice when tables or columns have been created, dropped, or changed by other nodes.
1301  // Needed so that users will see reasonably-correct lists of what objects exist.
1302 
1303  // Load the list of table ID's that exist on disk storage.
1304  std::set<int> cluster_table_ids;
1305  std::string tableQuery("SELECT tableid from mapd_tables");
1306  sqliteConnector_.query(tableQuery);
1307  auto numRows = sqliteConnector_.getNumRows();
1308  for (size_t r = 0; r < numRows; ++r) {
1309  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1310  cluster_table_ids.insert(table_id);
1311  }
1312 
1313  // Ignore any table ID's locked by other threads on this node.
1314  // Those other threads are already handling any necessary reloading for those tables.
1315  std::set<int> ignored_table_ids;
1316  for (ChunkKey const& k : lockmgr::TableSchemaLockMgr::instance().getLockedTables()) {
1317  CHECK_EQ(k.size(), 2U);
// Locked tables that belong to a different database are irrelevant here.
1318  if (k[CHUNK_KEY_DB_IDX] != currentDB_.dbId) {
1319  continue;
1320  }
1321  ignored_table_ids.insert(k[CHUNK_KEY_TABLE_IDX]);
1322  }
1323 
1324  // For this node's Catalog cache:
1325  // Newly created table schemas created by other nodes need to be loaded.
1326  // Unlocked table schemas might have been renamed by other nodes; just reload them all.
1327  // Deleted table schemas still in this node's cache need to be flushed.
1328  std::set<int> reload_table_ids;
1329  for (auto const& cluster_table_id : cluster_table_ids) {
1330  if (ignored_table_ids.find(cluster_table_id) == ignored_table_ids.end()) {
1331  reload_table_ids.insert(cluster_table_id);
1332  }
1333  }
// Cached tables without a row in mapd_tables are queued as well; this loop does not
// consult ignored_table_ids.
1334  for (auto const& [cached_table_id, td] : tableDescriptorMapById_) {
1335  if (cluster_table_ids.find(cached_table_id) == cluster_table_ids.end()) {
1336  reload_table_ids.insert(cached_table_id);
1337  }
1338  }
1339 
1340  // Reload tables.
1341  for (auto const& reload_table_id : reload_table_ids) {
1342  reloadTableMetadataUnlocked(reload_table_id);
1343  }
1344 
1346 
// Drop the remaining cached objects; they are rebuilt from sqlite just below.
// NOTE(review): buildLinksMapUnlocked() fills the link maps with `new`-allocated
// LinkDescriptor objects; if those maps hold raw pointers, clear() does not free
// them - confirm ownership.
1347  dashboardDescriptorMap_.clear();
1348  linkDescriptorMap_.clear();
1349  linkDescriptorMapById_.clear();
1350  foreignServerMap_.clear();
1351  foreignServerMapById_.clear();
1352  custom_expr_map_by_id_.clear();
1353 
1354  if (g_enable_fsi) {
1355  buildForeignServerMapUnlocked();
1356  }
1357 
1358  updateViewsInMapUnlocked();
1359  buildDashboardsMapUnlocked(user_name_by_user_id);
1360  buildLinksMapUnlocked();
1361  buildCustomExpressionsMapUnlocked();
1362 
1363  // Notify Calcite about the reloaded database.
1364  if (calciteMgr_) {
1365  calciteMgr_->updateMetadata(currentDB_.dbName, {});
1366  }
1367 }
1368 
1369 void Catalog::buildTablesMapUnlocked() {
1370  std::string tableQuery(
1371  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1372  "max_chunk_size, frag_page_size, "
1373  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1374  "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1375  "from mapd_tables");
1376  sqliteConnector_.query(tableQuery);
1377  auto numRows = sqliteConnector_.getNumRows();
1378  for (size_t r = 0; r < numRows; ++r) {
1379  TableDescriptor* td;
1380  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
1381  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1382  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1383  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
1384  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1385  "supported table option (table "
1386  << table_name << " [" << table_id << "] in database "
1387  << currentDB_.dbName << ").";
1388  }
1389 
1390  if (storage_type == StorageType::FOREIGN_TABLE) {
1391  td = new foreign_storage::ForeignTable();
1392  } else {
1393  td = new TableDescriptor();
1394  }
1395 
1396  td->storageType = storage_type;
1397  td->tableId = sqliteConnector_.getData<int>(r, 0);
1398  td->tableName = sqliteConnector_.getData<string>(r, 1);
1399  td->nColumns = sqliteConnector_.getData<int>(r, 2);
1400  td->isView = sqliteConnector_.getData<bool>(r, 3);
1401  td->fragments = sqliteConnector_.getData<string>(r, 4);
1402  td->fragType =
1403  (Fragmenter_Namespace::FragmenterType)sqliteConnector_.getData<int>(r, 5);
1404  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1405  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1406  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1407  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1408  td->partitions = sqliteConnector_.getData<string>(r, 10);
1409  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1410  td->shard = sqliteConnector_.getData<int>(r, 12);
1411  td->nShards = sqliteConnector_.getData<int>(r, 13);
1412  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1413  td->userId = sqliteConnector_.getData<int>(r, 15);
1414  td->sortedColumnId =
1415  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1416  if (!td->isView) {
1417  td->fragmenter = nullptr;
1418  }
1419  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1420  td->is_system_table = sqliteConnector_.getData<bool>(r, 19);
1421  td->hasDeletedCol = false;
1422 
1423  tableDescriptorMap_[to_upper(td->tableName)] = td;
1424  tableDescriptorMapById_[td->tableId] = td;
1425  }
1426 }
1427 
1428 void Catalog::buildColumnsMapUnlocked() {
1429  std::string columnQuery(
1430  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1431  "is_notnull, compression, comp_param, "
1432  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1433  "default_value from "
1434  "mapd_columns ORDER BY tableid, "
1435  "columnid");
1436  sqliteConnector_.query(columnQuery);
1437  auto numRows = sqliteConnector_.getNumRows();
1438  int32_t skip_physical_cols = 0;
1439  for (size_t r = 0; r < numRows; ++r) {
1440  ColumnDescriptor* cd = new ColumnDescriptor();
1441  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1442  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1443  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1444  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
1445  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
1446  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
1447  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1448  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
1449  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
1450  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
1451  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1452  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1453  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1454  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1455  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1456  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1457  if (sqliteConnector_.isNull(r, 16)) {
1458  cd->default_value = std::nullopt;
1459  } else {
1460  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1461  }
1462  cd->isGeoPhyCol = skip_physical_cols > 0;
1463  cd->db_id = getDatabaseId();
1464  set_dict_key(*cd);
1465  addToColumnMap(cd);
1466 
1467  if (skip_physical_cols <= 0) {
1468  skip_physical_cols = cd->columnType.get_physical_cols();
1469  }
1470 
1471  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1472  CHECK(td_itr != tableDescriptorMapById_.end());
1473 
1474  if (cd->isDeletedCol) {
1475  td_itr->second->hasDeletedCol = true;
1476  setDeletedColumnUnlocked(td_itr->second, cd);
1477  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1478  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1479  }
1480  }
1481 
1482  // sort columnIdBySpi_ based on columnId
1483  for (auto& tit : tableDescriptorMapById_) {
1484  std::sort(tit.second->columnIdBySpi_.begin(),
1485  tit.second->columnIdBySpi_.end(),
1486  [](const size_t a, const size_t b) -> bool { return a < b; });
1487  }
1488 }
1489 
1490 void Catalog::updateViewUnlocked(TableDescriptor& td) {
1491  std::string viewQuery("SELECT sql FROM mapd_views where tableid = " +
1492  std::to_string(td.tableId));
1493  sqliteConnector_.query(viewQuery);
1494  auto num_rows = sqliteConnector_.getNumRows();
1495  CHECK_EQ(num_rows, 1U) << "Expected single entry in mapd_views for view '"
1496  << td.tableName << "', instead got " << num_rows;
1497  td.viewSQL = sqliteConnector_.getData<string>(0, 0);
1498 }
1499 
1500 void Catalog::updateViewsInMapUnlocked() {
1501  std::string viewQuery("SELECT tableid, sql FROM mapd_views");
1502  sqliteConnector_.query(viewQuery);
1503  auto numRows = sqliteConnector_.getNumRows();
1504  for (size_t r = 0; r < numRows; ++r) {
1505  auto tableId = sqliteConnector_.getData<int>(r, 0);
1506  auto td = tableDescriptorMapById_[tableId];
1507  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1508  td->fragmenter = nullptr;
1509  }
1510 }
1511 
1512 void Catalog::buildDashboardsMapUnlocked(
1513  const std::map<int32_t, std::string>& user_name_by_user_id) {
1514  std::string frontendViewQuery(
1515  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1516  "userid, "
1517  "metadata "
1518  "FROM mapd_dashboards");
1519  sqliteConnector_.query(frontendViewQuery);
1520  auto numRows = sqliteConnector_.getNumRows();
1521  for (size_t r = 0; r < numRows; ++r) {
1522  auto vd = std::make_shared<DashboardDescriptor>();
1523  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1524  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1525  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1526  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1527  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1528  vd->userId = sqliteConnector_.getData<int>(r, 5);
1529  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
1530  vd->user = get_user_name_from_id(vd->userId, user_name_by_user_id);
1531  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1532  std::to_string(currentDB_.dbId), sqliteConnector_.getData<string>(r, 0));
1533  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1534  }
1535 }
1536 
1537 void Catalog::buildLinksMapUnlocked() {
1538  std::string linkQuery(
1539  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1540  "update_time), view_metadata "
1541  "FROM mapd_links");
1542  sqliteConnector_.query(linkQuery);
1543  auto numRows = sqliteConnector_.getNumRows();
1544  for (size_t r = 0; r < numRows; ++r) {
1545  auto ld = new LinkDescriptor();
1546  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1547  ld->userId = sqliteConnector_.getData<int>(r, 1);
1548  ld->link = sqliteConnector_.getData<string>(r, 2);
1549  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1550  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1551  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1552  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld->link] = ld;
1553  linkDescriptorMapById_[ld->linkId] = ld;
1554  }
1555 }
1556 
1557 void Catalog::buildLogicalToPhysicalMapUnlocked() {
1558  /* rebuild map linking logical tables to corresponding physical ones */
1559  std::string logicalToPhysicalTableMapQuery(
1560  "SELECT logical_table_id, physical_table_id "
1561  "FROM mapd_logical_to_physical");
1562  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1563  auto numRows = sqliteConnector_.getNumRows();
1564  for (size_t r = 0; r < numRows; ++r) {
1565  auto logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1566  auto physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1567  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1568  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1569  /* add new entity to the map logicalToPhysicalTableMapById_ */
1570  std::vector<int32_t> physicalTables{physical_tb_id};
1571  const auto it_ok =
1572  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1573  CHECK(it_ok.second);
1574  } else {
1575  /* update map logicalToPhysicalTableMapById_ */
1576  physicalTableIt->second.push_back(physical_tb_id);
1577  }
1578  }
1579 }
1580 
1581 // The catalog uses a series of maps to cache data that have been read from the sqlite
1582 // tables. Usually we update these maps whenever we write using sqlite, so this function
1583 // is responsible for initializing all of them based on the sqlite db state.
1584 void Catalog::buildMaps() {
1585  // Get all user id to username mapping here in order to avoid making a call to
1586  // SysCatalog (and attempting to acquire SysCatalog locks) while holding locks for this
1587  // catalog.
1588  const auto user_name_by_user_id = get_user_id_to_user_name_map();
1589 
1590  cat_write_lock write_lock(this);
1591  cat_sqlite_lock sqlite_lock(getObjForLock());
1592 
1593  buildDictionaryMapUnlocked();
1594  buildTablesMapUnlocked();
1595 
1596  if (g_enable_fsi) {
1597  buildForeignServerMapUnlocked();
1598  updateForeignTablesInMapUnlocked();
1599  }
1600 
1601  buildColumnsMapUnlocked();
1602  updateViewsInMapUnlocked();
1603  buildDashboardsMapUnlocked(user_name_by_user_id);
1604  buildLinksMapUnlocked();
1605  buildLogicalToPhysicalMapUnlocked();
1606  buildCustomExpressionsMapUnlocked();
1607 }
1608 
1609 void Catalog::buildCustomExpressionsMapUnlocked() {
1610  sqliteConnector_.query(
1611  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1612  "is_deleted "
1613  "FROM omnisci_custom_expressions");
1614  auto num_rows = sqliteConnector_.getNumRows();
1615  for (size_t row = 0; row < num_rows; row++) {
1616  auto custom_expr = getCustomExpressionFromConnector(row);
1617  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1618  }
1619 }
1620 
1621 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1622  auto id = sqliteConnector_.getData<int>(row, 0);
1623  auto name = sqliteConnector_.getData<string>(row, 1);
1624  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1625  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1626  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1627  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1628  return std::make_unique<CustomExpression>(
1629  id,
1630  name,
1631  expression_json,
1632  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1633  data_source_id,
1634  is_deleted);
1635 }
1636 
1637 void Catalog::addTableToMap(const TableDescriptor* td,
1638  const list<ColumnDescriptor>& columns,
1639  const list<DictDescriptor>& dicts) {
1640  cat_write_lock write_lock(this);
1641  TableDescriptor* new_td;
1642 
1643  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1644  if (foreign_table) {
1645  auto new_foreign_table = new foreign_storage::ForeignTable();
1646  *new_foreign_table = *foreign_table;
1647  new_td = new_foreign_table;
1648  } else {
1649  new_td = new TableDescriptor();
1650  *new_td = *td;
1651  }
1652 
1653  new_td->mutex_ = std::make_shared<std::mutex>();
1654  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1655  tableDescriptorMapById_[td->tableId] = new_td;
1656  for (auto cd : columns) {
1657  ColumnDescriptor* new_cd = new ColumnDescriptor();
1658  *new_cd = cd;
1659  addToColumnMap(new_cd);
1660 
1661  // Add deleted column to the map
1662  if (cd.isDeletedCol) {
1663  CHECK(new_td->hasDeletedCol);
1664  setDeletedColumnUnlocked(new_td, new_cd);
1665  }
1666  }
1667 
1668  std::sort(new_td->columnIdBySpi_.begin(),
1669  new_td->columnIdBySpi_.end(),
1670  [](const size_t a, const size_t b) -> bool { return a < b; });
1671  // TODO(sy): Why does addTableToMap() sort columnIdBySpi_ but not insert into it while
1672  // buildColumnsMapUnlocked() does both?
1673 
1674  std::unique_ptr<StringDictionaryClient> client;
1675  DictRef dict_ref(currentDB_.dbId, -1);
1676  if (!string_dict_hosts_.empty()) {
1677  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1678  }
1679  for (auto dd : dicts) {
1680  if (!dd.dictRef.dictId) {
1681  // Dummy entry created for a shard of a logical table, nothing to do.
1682  continue;
1683  }
1684  dict_ref.dictId = dd.dictRef.dictId;
1685  if (client) {
1686  client->create(dict_ref, dd.dictIsTemp);
1687  }
1688  DictDescriptor* new_dd = new DictDescriptor(dd);
1689  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1690  if (!dd.dictIsTemp) {
1691  boost::filesystem::create_directory(new_dd->dictFolderPath);
1692  }
1693  }
1694 }
1695 
/**
 * Removes a table, its column descriptors and, where their reference count drops to
 * zero, its string dictionaries from the in-memory caches. Holds the catalog write
 * lock.
 *
 * @param tableName name of the table; used for the error message and to erase the
 *        upper-cased entry from tableDescriptorMap_
 * @param tableId id of the table to remove
 * @param is_on_error when true, a dictionary encoded column whose dictionary is not
 *        present in dictDescriptorMapByRef_ is tolerated; when false this is a CHECK
 *        failure
 * @throws runtime_error if tableId is not present in tableDescriptorMapById_
 */
1696 void Catalog::removeTableFromMap(const string& tableName,
1697  const int tableId,
1698  const bool is_on_error) {
1699  cat_write_lock write_lock(this);
1700  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1701  if (tableDescIt == tableDescriptorMapById_.end()) {
1702  throw runtime_error("Table " + tableName + " does not exist.");
1703  }
1704 
1705  TableDescriptor* td = tableDescIt->second;
1706 
// Drop the table's deleted-column entry; exactly one entry is expected.
1707  if (td->hasDeletedCol) {
1708  const auto ret = deletedColumnPerTable_.erase(td);
1709  CHECK_EQ(ret, size_t(1));
1710  }
1711 
1712  tableDescriptorMapById_.erase(tableDescIt);
1713  tableDescriptorMap_.erase(to_upper(tableName));
1714  td->fragmenter = nullptr;
1715  dict_columns_by_table_id_.erase(tableId);
1716 
1718  delete td;
1719 
// A dictionary client is only created on an aggregator; it is used below to drop
// dictionaries whose reference count reaches zero.
1720  std::unique_ptr<StringDictionaryClient> client;
1721  if (SysCatalog::instance().isAggregator()) {
1722  CHECK(!string_dict_hosts_.empty());
1723  DictRef dict_ref(currentDB_.dbId, -1);
1724  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1725  }
1726 
1727  // delete all column descriptors for the table
1728  // no more link columnIds to sequential indexes!
1729  for (auto cit = columnDescriptorMapById_.begin();
1730  cit != columnDescriptorMapById_.end();) {
1731  if (tableId != std::get<0>(cit->first)) {
1732  ++cit;
1733  } else {
// Read the column id and advance the iterator first, so that erasing this entry
// below does not affect the loop iterator.
1734  int i = std::get<1>(cit++->first);
1735  ColumnIdKey cidKey(tableId, i);
1736  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1737  ColumnDescriptor* cd = colDescIt->second;
1738  columnDescriptorMapById_.erase(colDescIt);
1739  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1740  columnDescriptorMap_.erase(cnameKey);
1741  const int dictId = cd->columnType.get_comp_param();
1742  // Dummy dictionaries created for a shard of a logical table have the id set to
1743  // zero.
1744  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1745  INJECT_TIMER(removingDicts);
1746  DictRef dict_ref(currentDB_.dbId, dictId);
1747  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1748  // If we're removing this table due to an error, it is possible that the string
1749  // dictionary reference was never populated. Don't crash, just continue cleaning
1750  // up the TableDescriptor and ColumnDescriptors
1751  if (!is_on_error) {
1752  CHECK(dictIt != dictDescriptorMapByRef_.end());
1753  } else {
1754  if (dictIt == dictDescriptorMapByRef_.end()) {
// NOTE(review): this continue jumps past the `delete cd` at the end of the branch,
// although cd was already erased from both column maps above - looks like the
// descriptor is leaked on this path; confirm.
1755  continue;
1756  }
1757  }
1758  const auto& dd = dictIt->second;
1759  CHECK_GE(dd->refcount, 1);
1760  --dd->refcount;
// Last reference gone: release the dictionary, rename its folder for deletion
// unless isTemp is set, drop it through the client if one exists, and forget it.
1761  if (!dd->refcount) {
1762  dd->stringDict.reset();
1763  if (!isTemp) {
1764  File_Namespace::renameForDelete(dd->dictFolderPath);
1765  }
1766  if (client) {
1767  client->drop(dict_ref);
1768  }
1769  dictDescriptorMapByRef_.erase(dictIt);
1770  }
1771  }
1772 
1773  delete cd;
1774  }
1775  }
1776 }
1777 
1778 void Catalog::addFrontendViewToMap(DashboardDescriptor& vd) {
1779  cat_write_lock write_lock(this);
1780  addFrontendViewToMapNoLock(vd);
1781 }
1782 
1783 void Catalog::addFrontendViewToMapNoLock(DashboardDescriptor& vd) {
1784  cat_write_lock write_lock(this);
1785  dashboardDescriptorMap_[std::to_string(vd.userId) + ":" + vd.dashboardName] =
1786  std::make_shared<DashboardDescriptor>(vd);
1787 }
1788 
1789 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1790  const int& user_id) {
1791  std::vector<DBObject> objects;
1792  DBObjectKey key;
1793  key.dbId = currentDB_.dbId;
1794  auto _key_place = [&key](auto type, auto id) {
1795  key.permissionType = type;
1796  key.objectId = id;
1797  return key;
1798  };
1799  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
1800  auto td = getMetadataForTable(object_name, false);
1801  if (!td) {
1802  // Parsed object source is not present in current database
1803  // LOG the info and ignore
1804  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1805  << " no longer exists in current DB.";
1806  continue;
1807  }
1808  // Dashboard source can be Table or View
1809  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1810  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1812  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1813  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1814  objects.back().setName(td->tableName);
1815  }
1816  return objects;
1817 }
1818 
1819 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1820  const int32_t& user_id,
1821  const std::string& dash_role_name) {
1822  auto objects = parseDashboardObjects(view_meta, user_id);
1823  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1824  if (!rl) {
1825  // Dashboard role does not exist
1826  // create role and grant privileges
1827  // NOTE(wamsi): Transactionally unsafe
1828  SysCatalog::instance().createRole(
1829  dash_role_name, /*user_private_role=*/false, /*is_temporary=*/false);
1830  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1831  } else {
1832  // Dashboard system role already exists
1833  // Add/remove privileges on objects
1834  std::set<DBObjectKey> revoke_keys;
1835  auto ex_objects = rl->getDbObjects(true);
1836  for (auto key : *ex_objects | boost::adaptors::map_keys) {
1837  if (key.permissionType != TableDBObjectType &&
1838  key.permissionType != ViewDBObjectType) {
1839  continue;
1840  }
1841  bool found = false;
1842  for (auto obj : objects) {
1843  found = key == obj.getObjectKey() ? true : false;
1844  if (found) {
1845  break;
1846  }
1847  }
1848  if (!found) {
1849  revoke_keys.insert(key);
1850  }
1851  }
1852  for (auto& key : revoke_keys) {
1853  // revoke privs on object since the object is no
1854  // longer used by the dashboard as source
1855  // NOTE(wamsi): Transactionally unsafe
1856  SysCatalog::instance().revokeDBObjectPrivileges(
1857  dash_role_name, *rl->findDbObject(key, true), *this);
1858  }
1859  // Update privileges on remaining objects
1860  // NOTE(wamsi): Transactionally unsafe
1861  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1862  }
1863 }
1864 
1865 void Catalog::addLinkToMap(LinkDescriptor& ld) {
1866  cat_write_lock write_lock(this);
1867  LinkDescriptor* new_ld = new LinkDescriptor();
1868  *new_ld = ld;
1869  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld.link] = new_ld;
1870  linkDescriptorMapById_[ld.linkId] = new_ld;
1871 }
1872 
1873 void Catalog::instantiateFragmenter(TableDescriptor* td) const {
1874  auto time_ms = measure<>::execution([&]() {
1875  // instanciate table fragmenter upon first use
1876  // assume only insert order fragmenter is supported
1878  vector<Chunk> chunkVec;
1879  auto columnDescs = getAllColumnMetadataForTable(td->tableId, true, false, true);
1880  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1881  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1882  if (td->sortedColumnId > 0) {
1883  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1884  chunkVec,
1885  dataMgr_.get(),
1886  const_cast<Catalog*>(this),
1887  td->tableId,
1888  td->shard,
1889  td->maxFragRows,
1890  td->maxChunkSize,
1891  td->fragPageSize,
1892  td->maxRows,
1893  td->persistenceLevel);
1894  } else {
1895  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1896  chunkVec,
1897  dataMgr_.get(),
1898  const_cast<Catalog*>(this),
1899  td->tableId,
1900  td->shard,
1901  td->maxFragRows,
1902  td->maxChunkSize,
1903  td->fragPageSize,
1904  td->maxRows,
1905  td->persistenceLevel,
1906  !td->storageType.empty());
1907  }
1908  });
1909  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1910  << time_ms << "ms";
1911 }
1912 
1913 foreign_storage::ForeignTable* Catalog::getForeignTableUnlocked(
1914  const std::string& tableName) const {
1915  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1916  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1917  return nullptr;
1918  }
1919  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1920 }
1921 
1922 const foreign_storage::ForeignTable* Catalog::getForeignTable(
1923  const std::string& tableName) const {
1924  cat_read_lock read_lock(this);
1925  return getForeignTableUnlocked(tableName);
1926 }
1927 
1928 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1929  const bool populateFragmenter) const {
1930  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1931  // pure metadata calls
1932  cat_read_lock read_lock(this);
1933  auto td = getMutableMetadataForTableUnlocked(tableName);
1934  if (!td) {
1935  return nullptr;
1936  }
1937  read_lock.unlock();
1938  if (populateFragmenter) {
1939  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1940  if (td->fragmenter == nullptr && !td->isView) {
1941  instantiateFragmenter(td);
1942  }
1943  }
1944  return td; // returns pointer to table descriptor
1945 }
1946 
1947 const TableDescriptor* Catalog::getMetadataForTable(int table_id,
1948  bool populateFragmenter) const {
1949  cat_read_lock read_lock(this);
1950  auto td = getMutableMetadataForTableUnlocked(table_id);
1951  if (!td) {
1952  return nullptr;
1953  }
1954  read_lock.unlock();
1955  if (populateFragmenter) {
1956  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1957  if (td->fragmenter == nullptr && !td->isView) {
1958  instantiateFragmenter(td);
1959  }
1960  }
1961  return td;
1962 }
1963 
1964 std::optional<std::string> Catalog::getTableName(int32_t table_id) const {
1965  cat_read_lock read_lock(this);
1966  auto td = getMutableMetadataForTableUnlocked(table_id);
1967  if (!td) {
1968  return {};
1969  }
1970  return td->tableName;
1971 }
1972 
1973 std::optional<int32_t> Catalog::getTableId(const std::string& table_name) const {
1974  cat_read_lock read_lock(this);
1975  auto td = getMutableMetadataForTableUnlocked(table_name);
1976  if (!td) {
1977  return {};
1978  }
1979  return td->tableId;
1980 }
1981 
1982 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(
1983  const std::string& table_name) const {
1984  auto it = tableDescriptorMap_.find(to_upper(table_name));
1985  if (it == tableDescriptorMap_.end()) {
1986  return nullptr;
1987  }
1988  return it->second;
1989 }
1990 
1991 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(int table_id) const {
1992  auto tableDescIt = tableDescriptorMapById_.find(table_id);
1993  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1994  return nullptr;
1995  }
1996  return tableDescIt->second;
1997 }
1998 
1999 const DictDescriptor* Catalog::getMetadataForDict(const int dict_id,
2000  const bool load_dict) const {
2001  cat_read_lock read_lock(this);
2002  const DictRef dictRef(currentDB_.dbId, dict_id);
2003  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
2004  if (dictDescIt ==
2005  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
2006  return nullptr;
2007  }
2008  auto& dd = dictDescIt->second;
2009 
2010  if (load_dict) {
2011  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
2012  if (!dd->stringDict) {
2013  auto time_ms = measure<>::execution([&]() {
2014  if (string_dict_hosts_.empty()) {
2015  if (dd->dictIsTemp) {
2016  dd->stringDict = std::make_shared<StringDictionary>(
2017  dd->dictRef, dd->dictFolderPath, true, true, g_cache_string_hash);
2018  } else {
2019  dd->stringDict = std::make_shared<StringDictionary>(
2020  dd->dictRef, dd->dictFolderPath, false, true, g_cache_string_hash);
2021  }
2022  } else {
2023  dd->stringDict =
2024  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
2025  }
2026  });
2027  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
2028  << dd->dictRef.dictId << " was " << time_ms << "ms";
2029  }
2030  }
2031 
2032  return dd.get();
2033 }
2034 
2035 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
2036  return string_dict_hosts_;
2037 }
2038 
2039 const ColumnDescriptor* Catalog::getMetadataForColumn(int tableId,
2040  const string& columnName) const {
2041  cat_read_lock read_lock(this);
2042 
2043  ColumnKey columnKey(tableId, to_upper(columnName));
2044  auto colDescIt = columnDescriptorMap_.find(columnKey);
2045  if (colDescIt ==
2046  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
2047  return nullptr;
2048  }
2049  return colDescIt->second;
2050 }
2051 
2052 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
2053  cat_read_lock read_lock(this);
2054  ColumnIdKey columnIdKey(table_id, column_id);
2055  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2056  if (colDescIt == columnDescriptorMapById_
2057  .end()) { // need to check to make sure column exists for table
2058  return nullptr;
2059  }
2060  return colDescIt->second;
2061 }
2062 
2063 const std::optional<std::string> Catalog::getColumnName(int table_id,
2064  int column_id) const {
2065  cat_read_lock read_lock(this);
2066  auto it = columnDescriptorMapById_.find(ColumnIdKey{table_id, column_id});
2067  if (it == columnDescriptorMapById_.end()) {
2068  return {};
2069  }
2070  return it->second->columnName;
2071 }
2072 
2073 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
2074  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
2075  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2076  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
2077 
2078  auto spx = spi;
2079  int phi = 0;
2080  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
2081  {
2082  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
2083  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
2084  }
2085 
2086  CHECK(0 < spx && spx <= columnIdBySpi.size())
2087  << "spx = " << spx << ", size = " << columnIdBySpi.size();
2088  return columnIdBySpi[spx - 1] + phi;
2089 }
2090 
2091 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
2092  cat_read_lock read_lock(this);
2093  return getColumnIdBySpiUnlocked(table_id, spi);
2094 }
2095 
2096 const ColumnDescriptor* Catalog::getMetadataForColumnBySpi(const int tableId,
2097  const size_t spi) const {
2098  cat_read_lock read_lock(this);
2099 
2100  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
2101  ColumnIdKey columnIdKey(tableId, columnId);
2102  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2103  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
2104 }
2105 
2106 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
2107  const UserMetadata& user) {
2108  std::stringstream invalid_ids, restricted_ids;
2109 
2110  for (int32_t dashboard_id : dashboard_ids) {
2111  if (!getMetadataForDashboard(dashboard_id)) {
2112  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
2113  continue;
2114  }
2115  DBObject object(dashboard_id, DashboardDBObjectType);
2116  object.loadKey(*this);
2117  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
2118  std::vector<DBObject> privs = {object};
2119  if (!SysCatalog::instance().checkPrivileges(user, privs)) {
2120  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
2121  }
2122  }
2123 
2124  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
2125  std::stringstream error_message;
2126  error_message << "Delete dashboard(s) failed with error(s):";
2127  if (invalid_ids.str().size() > 0) {
2128  error_message << "\nDashboard id: " << invalid_ids.str()
2129  << " - Dashboard id does not exist";
2130  }
2131  if (restricted_ids.str().size() > 0) {
2132  error_message
2133  << "\nDashboard id: " << restricted_ids.str()
2134  << " - User should be either owner of dashboard or super user to delete it";
2135  }
2136  throw std::runtime_error(error_message.str());
2137  }
2138  std::vector<DBObject> dash_objs;
2139 
2140  for (int32_t dashboard_id : dashboard_ids) {
2141  dash_objs.emplace_back(dashboard_id, DashboardDBObjectType);
2142  }
2143  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
2144  SysCatalog::instance().revokeDBObjectPrivilegesFromAllBatch(dash_objs, this);
2145  {
2146  cat_write_lock write_lock(this);
2147  cat_sqlite_lock sqlite_lock(getObjForLock());
2148 
2149  sqliteConnector_.query("BEGIN TRANSACTION");
2150  try {
2151  for (int32_t dashboard_id : dashboard_ids) {
2152  auto dash = getMetadataForDashboard(dashboard_id);
2153  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
2154  // rollback if already deleted
2155  if (!dash) {
2156  throw std::runtime_error(
2157  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
2158  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
2159  }
2160  std::string user_id = std::to_string(dash->userId);
2161  std::string dash_name = dash->dashboardName;
2162  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
2163  dashboardDescriptorMap_.erase(viewDescIt);
2164  sqliteConnector_.query_with_text_params(
2165  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
2166  std::vector<std::string>{dash_name, user_id});
2167  }
2168  } catch (std::exception& e) {
2169  sqliteConnector_.query("ROLLBACK TRANSACTION");
2170  throw;
2171  }
2172  sqliteConnector_.query("END TRANSACTION");
2173  }
2174 }
2175 
2176 const DashboardDescriptor* Catalog::getMetadataForDashboard(
2177  const string& userId,
2178  const string& dashName) const {
2179  cat_read_lock read_lock(this);
2180 
2181  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
2182  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
2183  return nullptr;
2184  }
2185  return viewDescIt->second.get(); // returns pointer to view descriptor
2186 }
2187 
2188 const DashboardDescriptor* Catalog::getMetadataForDashboard(const int32_t id) const {
2189  cat_read_lock read_lock(this);
2190  std::string userId;
2191  std::string name;
2192  bool found{false};
2193  {
2194  for (auto descp : dashboardDescriptorMap_) {
2195  auto dash = descp.second.get();
2196  if (dash->dashboardId == id) {
2197  userId = std::to_string(dash->userId);
2198  name = dash->dashboardName;
2199  found = true;
2200  break;
2201  }
2202  }
2203  }
2204  if (found) {
2205  return getMetadataForDashboard(userId, name);
2206  }
2207  return nullptr;
2208 }
2209 
2210 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
2211  cat_read_lock read_lock(this);
2212  auto linkDescIt = linkDescriptorMap_.find(link);
2213  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
2214  return nullptr;
2215  }
2216  return linkDescIt->second; // returns pointer to view descriptor
2217 }
2218 
2219 const LinkDescriptor* Catalog::getMetadataForLink(int linkId) const {
2220  cat_read_lock read_lock(this);
2221  auto linkDescIt = linkDescriptorMapById_.find(linkId);
2222  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
2223  return nullptr;
2224  }
2225  return linkDescIt->second;
2226 }
2227 
2228 const foreign_storage::ForeignTable* Catalog::getForeignTable(int table_id) const {
2229  cat_read_lock read_lock(this);
2230  const auto table = getMutableMetadataForTableUnlocked(table_id);
2231  CHECK(table);
2232  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
2233  CHECK(foreign_table);
2234  return foreign_table;
2235 }
2236 
2237 void Catalog::getAllColumnMetadataForTableImpl(
2238  const TableDescriptor* td,
2239  list<const ColumnDescriptor*>& columnDescriptors,
2240  const bool fetchSystemColumns,
2241  const bool fetchVirtualColumns,
2242  const bool fetchPhysicalColumns) const {
2243  int32_t skip_physical_cols = 0;
2244  for (const auto& columnDescriptor : columnDescriptorMapById_) {
2245  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
2246  --skip_physical_cols;
2247  continue;
2248  }
2249  auto cd = columnDescriptor.second;
2250  if (cd->tableId != td->tableId) {
2251  continue;
2252  }
2253  if (!fetchSystemColumns && cd->isSystemCol) {
2254  continue;
2255  }
2256  if (!fetchVirtualColumns && cd->isVirtualCol) {
2257  continue;
2258  }
2259  if (!fetchPhysicalColumns) {
2260  const auto& col_ti = cd->columnType;
2261  skip_physical_cols = col_ti.get_physical_cols();
2262  }
2263  columnDescriptors.push_back(cd);
2264  }
2265 }
2266 
2267 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
2268  const int tableId,
2269  const bool fetchSystemColumns,
2270  const bool fetchVirtualColumns,
2271  const bool fetchPhysicalColumns) const {
2272  cat_read_lock read_lock(this);
2273  std::list<const ColumnDescriptor*> columnDescriptors;
2274  const TableDescriptor* td = getMutableMetadataForTableUnlocked(tableId);
2275  getAllColumnMetadataForTableImpl(td,
2276  columnDescriptors,
2277  fetchSystemColumns,
2278  fetchVirtualColumns,
2279  fetchPhysicalColumns);
2280  return columnDescriptors;
2281 }
2282 
2283 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
2284  cat_read_lock read_lock(this);
2285  list<const TableDescriptor*> table_list;
2286  for (auto p : tableDescriptorMapById_) {
2287  table_list.push_back(p.second);
2288  }
2289  return table_list;
2290 }
2291 
2292 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy() const {
2293  cat_read_lock read_lock(this);
2294  std::vector<TableDescriptor> tables;
2295  tables.reserve(tableDescriptorMapById_.size());
2296  for (auto table_entry : tableDescriptorMapById_) {
2297  tables.emplace_back(*table_entry.second);
2298  tables.back().fragmenter = nullptr;
2299  }
2300  return tables;
2301 }
2302 
2303 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
2304  cat_read_lock read_lock(this);
2305  list<const DashboardDescriptor*> dashboards;
2306  for (auto dashboard_entry : dashboardDescriptorMap_) {
2307  dashboards.push_back(dashboard_entry.second.get());
2308  }
2309  return dashboards;
2310 }
2311 
2312 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataCopy() const {
2313  cat_read_lock read_lock(this);
2314  std::vector<DashboardDescriptor> dashboards;
2315  dashboards.reserve(dashboardDescriptorMap_.size());
2316  for (auto dashboard_entry : dashboardDescriptorMap_) {
2317  dashboards.emplace_back(*dashboard_entry.second);
2318  }
2319  return dashboards;
2320 }
2321 
2322 DictRef Catalog::addDictionary(ColumnDescriptor& cd) {
2323  cat_write_lock write_lock(this);
2324  const auto& td = *tableDescriptorMapById_[cd.tableId];
2325  list<DictDescriptor> dds;
2326  setColumnDictionary(cd, dds, td, true);
2327  auto& dd = dds.back();
2328  CHECK(dd.dictRef.dictId);
2329 
2330  std::unique_ptr<StringDictionaryClient> client;
2331  if (!string_dict_hosts_.empty()) {
2332  client.reset(new StringDictionaryClient(
2333  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
2334  }
2335  if (client) {
2336  client->create(dd.dictRef, dd.dictIsTemp);
2337  }
2338 
2339  DictDescriptor* new_dd = new DictDescriptor(dd);
2340  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
2341  if (!dd.dictIsTemp) {
2342  boost::filesystem::create_directory(new_dd->dictFolderPath);
2343  }
2344  return dd.dictRef;
2345 }
2346 
2347 void Catalog::delDictionary(const ColumnDescriptor& cd) {
2348  cat_write_lock write_lock(this);
2349  cat_sqlite_lock sqlite_lock(getObjForLock());
2350  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
2351  return;
2352  }
2353  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
2354  return;
2355  }
2356  const auto dictId = cd.columnType.get_comp_param();
2357  CHECK_GT(dictId, 0);
2358  // decrement and zero check dict ref count
2359  const auto td = getMetadataForTable(cd.tableId, false);
2360  CHECK(td);
2361  sqliteConnector_.query_with_text_param(
2362  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
2363  std::to_string(dictId));
2364  sqliteConnector_.query_with_text_param(
2365  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
2366  const auto refcount = sqliteConnector_.getData<int>(0, 0);
2367  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
2368  << refcount;
2369  if (refcount > 0) {
2370  return;
2371  }
2372  const DictRef dictRef(currentDB_.dbId, dictId);
2373  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
2374  std::to_string(dictId));
2376  "/DB_" + std::to_string(currentDB_.dbId) + "_DICT_" +
2377  std::to_string(dictId));
2378 
2379  std::unique_ptr<StringDictionaryClient> client;
2380  if (!string_dict_hosts_.empty()) {
2381  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
2382  }
2383  if (client) {
2384  client->drop(dictRef);
2385  }
2386 
2387  dictDescriptorMapByRef_.erase(dictRef);
2388 }
2389 
2390 void Catalog::getDictionary(const ColumnDescriptor& cd,
2391  std::map<int, StringDictionary*>& stringDicts) {
2392  // learn 'committed' ColumnDescriptor of this column
2393  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2394  CHECK(cit != columnDescriptorMap_.end());
2395  auto& ccd = *cit->second;
2396 
2397  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2398  return;
2399  }
2400  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
2401  return;
2402  }
2403  if (!(ccd.columnType.get_comp_param() > 0)) {
2404  return;
2405  }
2406 
2407  auto dictId = ccd.columnType.get_comp_param();
2408  getMetadataForDict(dictId);
2409 
2410  const DictRef dictRef(currentDB_.dbId, dictId);
2411  auto dit = dictDescriptorMapByRef_.find(dictRef);
2412  CHECK(dit != dictDescriptorMapByRef_.end());
2413  CHECK(dit->second);
2414  CHECK(dit->second.get()->stringDict);
2415  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
2416 }
2417 
2418 void Catalog::addColumn(const TableDescriptor& td, ColumnDescriptor& cd) {
2419  // caller must handle sqlite/chunk transaction TOGETHER
2420  cd.tableId = td.tableId;
2421  cd.db_id = getDatabaseId();
2422  if (td.nShards > 0 && td.shard < 0) {
2423  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2424  auto shard_cd = cd;
2425  addColumn(*shard, shard_cd);
2426  }
2427  }
2429  addDictionary(cd);
2430  }
2431 
2432  using BindType = SqliteConnector::BindType;
2433  std::vector<BindType> types(17, BindType::TEXT);
2434  if (!cd.default_value.has_value()) {
2435  types[16] = BindType::NULL_TYPE;
2436  }
2437  sqliteConnector_.query_with_text_params(
2438  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2439  "colscale, is_notnull, "
2440  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2441  "is_deletedcol, default_value) "
2442  "VALUES (?, "
2443  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2444  "?, ?, ?, "
2445  "?, "
2446  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2447  std::vector<std::string>{std::to_string(td.tableId),
2448  std::to_string(td.tableId),
2449  cd.columnName,
2458  "",
2461  cd.virtualExpr,
2463  cd.default_value.value_or("NULL")},
2464  types);
2465 
2466  sqliteConnector_.query_with_text_params(
2467  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2468  std::vector<std::string>{std::to_string(td.tableId)});
2469 
2470  sqliteConnector_.query_with_text_params(
2471  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2472  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
2473  cd.columnId = sqliteConnector_.getData<int>(0, 0);
2474 
2475  ++tableDescriptorMapById_[td.tableId]->nColumns;
2476  auto ncd = new ColumnDescriptor(cd);
2477  addToColumnMap(ncd);
2478  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
2479 }
2480 
2481 void Catalog::dropColumn(const TableDescriptor& td, const ColumnDescriptor& cd) {
2482  {
2483  cat_write_lock write_lock(this);
2484  cat_sqlite_lock sqlite_lock(getObjForLock());
2485  // caller must handle sqlite/chunk transaction TOGETHER
2486  sqliteConnector_.query_with_text_params(
2487  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2488  std::vector<std::string>{std::to_string(td.tableId),
2489  std::to_string(cd.columnId)});
2490 
2491  sqliteConnector_.query_with_text_params(
2492  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2493  std::vector<std::string>{std::to_string(td.tableId)});
2494 
2495  ColumnDescriptorMap::iterator columnDescIt =
2496  columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2497  CHECK(columnDescIt != columnDescriptorMap_.end());
2498 
2499  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2500  removeFromColumnMap(columnDescIt->second);
2501  --tableDescriptorMapById_[td.tableId]->nColumns;
2502  }
2503 
2504  // for each shard
2505  if (td.nShards > 0 && td.shard < 0) {
2506  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2507  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2508  CHECK(shard_cd);
2509  dropColumn(*shard, *shard_cd);
2510  }
2511  }
2512 }
2513 
2514 void Catalog::roll(const bool forward) {
2515  cat_write_lock write_lock(this);
2516  std::set<const TableDescriptor*> tds;
2517 
2518  for (const auto& cdr : columnDescriptorsForRoll) {
2519  auto ocd = cdr.first;
2520  auto ncd = cdr.second;
2521  CHECK(ocd || ncd);
2522  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2523  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2524  auto td = tabDescIt->second;
2525  auto& vc = td->columnIdBySpi_;
2526  if (forward) {
2527  if (ocd) {
2528  if (nullptr == ncd ||
2529  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2530  delDictionary(*ocd);
2531  }
2532 
2533  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2534 
2535  delete ocd;
2536  }
2537  if (ncd) {
2538  // append columnId if its new and not phy geo
2539  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2540  if (!ncd->isGeoPhyCol) {
2541  vc.push_back(ncd->columnId);
2542  }
2543  }
2544  }
2545  tds.insert(td);
2546  } else {
2547  if (ocd) {
2548  addToColumnMap(ocd);
2549  }
2550  // roll back the dict of new column
2551  if (ncd) {
2552  removeFromColumnMap(ncd);
2553  if (nullptr == ocd ||
2554  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2555  delDictionary(*ncd);
2556  }
2557  delete ncd;
2558  }
2559  }
2560  }
2561  columnDescriptorsForRoll.clear();
2562 
2563  if (forward) {
2564  for (const auto td : tds) {
2565  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2566  }
2567  }
2568 }
2569 
2570 void Catalog::expandGeoColumn(const ColumnDescriptor& cd,
2571  list<ColumnDescriptor>& columns) {
2572  const auto& col_ti = cd.columnType;
2573  if (IS_GEO(col_ti.get_type())) {
2574  switch (col_ti.get_type()) {
2575  case kPOINT: {
2576  ColumnDescriptor physical_cd_coords(true);
2577  physical_cd_coords.columnName = cd.columnName + "_coords";
2578  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2579  // Raw data: compressed/uncompressed coords
2580  coords_ti.set_subtype(kTINYINT);
2581  size_t unit_size;
2582  if (col_ti.get_compression() == kENCODING_GEOINT &&
2583  col_ti.get_comp_param() == 32) {
2584  unit_size = 4 * sizeof(int8_t);
2585  } else {
2586  CHECK(col_ti.get_compression() == kENCODING_NONE);
2587  unit_size = 8 * sizeof(int8_t);
2588  }
2589  coords_ti.set_size(2 * unit_size);
2590  physical_cd_coords.columnType = coords_ti;
2591  columns.push_back(physical_cd_coords);
2592 
2593  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2594 
2595  break;
2596  }
2597  case kMULTIPOINT:
2598  case kLINESTRING: {
2599  ColumnDescriptor physical_cd_coords(true);
2600  physical_cd_coords.columnName = cd.columnName + "_coords";
2601  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2602  // Raw data: compressed/uncompressed coords
2603  coords_ti.set_subtype(kTINYINT);
2604  physical_cd_coords.columnType = coords_ti;
2605  columns.push_back(physical_cd_coords);
2606 
2607  ColumnDescriptor physical_cd_bounds(true);
2608  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2609  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2610  bounds_ti.set_subtype(kDOUBLE);
2611  bounds_ti.set_size(4 * sizeof(double));
2612  physical_cd_bounds.columnType = bounds_ti;
2613  columns.push_back(physical_cd_bounds);
2614 
2615  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2616 
2617  break;
2618  }
2619  case kMULTILINESTRING: {
2620  ColumnDescriptor physical_cd_coords(true);
2621  physical_cd_coords.columnName = cd.columnName + "_coords";
2622  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2623  // Raw data: compressed/uncompressed coords
2624  coords_ti.set_subtype(kTINYINT);
2625  physical_cd_coords.columnType = coords_ti;
2626  columns.push_back(physical_cd_coords);
2627 
2628  ColumnDescriptor physical_cd_linestring_sizes(true);
2629  physical_cd_linestring_sizes.columnName = cd.columnName + "_linestring_sizes";
2630  SQLTypeInfo linestring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2631  linestring_sizes_ti.set_subtype(kINT);
2632  physical_cd_linestring_sizes.columnType = linestring_sizes_ti;
2633  columns.push_back(physical_cd_linestring_sizes);
2634 
2635  ColumnDescriptor physical_cd_bounds(true);
2636  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2637  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2638  bounds_ti.set_subtype(kDOUBLE);
2639  bounds_ti.set_size(4 * sizeof(double));
2640  physical_cd_bounds.columnType = bounds_ti;
2641  columns.push_back(physical_cd_bounds);
2642 
2643  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2644 
2645  break;
2646  }
2647  case kPOLYGON: {
2648  ColumnDescriptor physical_cd_coords(true);
2649  physical_cd_coords.columnName = cd.columnName + "_coords";
2650  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2651  // Raw data: compressed/uncompressed coords
2652  coords_ti.set_subtype(kTINYINT);
2653  physical_cd_coords.columnType = coords_ti;
2654  columns.push_back(physical_cd_coords);
2655 
2656  ColumnDescriptor physical_cd_ring_sizes(true);
2657  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2658  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2659  ring_sizes_ti.set_subtype(kINT);
2660  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2661  columns.push_back(physical_cd_ring_sizes);
2662 
2663  ColumnDescriptor physical_cd_bounds(true);
2664  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2665  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2666  bounds_ti.set_subtype(kDOUBLE);
2667  bounds_ti.set_size(4 * sizeof(double));
2668  physical_cd_bounds.columnType = bounds_ti;
2669  columns.push_back(physical_cd_bounds);
2670 
2671  ColumnDescriptor physical_cd_render_group(true);
2672  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2673  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2674  physical_cd_render_group.columnType = render_group_ti;
2675  columns.push_back(physical_cd_render_group);
2676 
2677  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2678 
2679  break;
2680  }
2681  case kMULTIPOLYGON: {
2682  ColumnDescriptor physical_cd_coords(true);
2683  physical_cd_coords.columnName = cd.columnName + "_coords";
2684  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2685  // Raw data: compressed/uncompressed coords
2686  coords_ti.set_subtype(kTINYINT);
2687  physical_cd_coords.columnType = coords_ti;
2688  columns.push_back(physical_cd_coords);
2689 
2690  ColumnDescriptor physical_cd_ring_sizes(true);
2691  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2692  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2693  ring_sizes_ti.set_subtype(kINT);
2694  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2695  columns.push_back(physical_cd_ring_sizes);
2696 
2697  ColumnDescriptor physical_cd_poly_rings(true);
2698  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2699  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2700  poly_rings_ti.set_subtype(kINT);
2701  physical_cd_poly_rings.columnType = poly_rings_ti;
2702  columns.push_back(physical_cd_poly_rings);
2703 
2704  ColumnDescriptor physical_cd_bounds(true);
2705  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2706  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2707  bounds_ti.set_subtype(kDOUBLE);
2708  bounds_ti.set_size(4 * sizeof(double));
2709  physical_cd_bounds.columnType = bounds_ti;
2710  columns.push_back(physical_cd_bounds);
2711 
2712  ColumnDescriptor physical_cd_render_group(true);
2713  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2714  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2715  physical_cd_render_group.columnType = render_group_ti;
2716  columns.push_back(physical_cd_render_group);
2717 
2718  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2719 
2720  break;
2721  }
2722  default:
2723  throw runtime_error("Unrecognized geometry type.");
2724  break;
2725  }
2726  }
2727 }
2728 
2729 namespace {
// NOTE(review): this listing is missing several lines of the helper below (2730, 2732,
// 2735-2736 and 2739), including its signature, so only fragments are visible here.
// From what is shown: an iterator named timing_type_entry is obtained, it is CHECKed
// against foreign_table.options.end() (so the entry is required to exist), and the
// entry's value selects between two code paths.
// Presumably this is the get_next_refresh_time() helper that Catalog::createTable
// calls for foreign tables -- confirm against the complete source file.
2731  auto timing_type_entry =
2733  CHECK(timing_type_entry != foreign_table.options.end());
2734  if (timing_type_entry->second ==
2737  foreign_table.options);
2738  }
2740 }
2741 } // namespace
2742 
// Creates a new table in this catalog from the descriptor `td` and the user-supplied
// column list `cols`.
//
// Visible steps:
//  1. Takes the catalog write lock and copies `cols` into a working list.
//  2. Rejects any user column named "rowid", copies each column into `columns`, and
//     calls expandGeoColumn() for geometry columns.
//  3. Appends the system "rowid" column (virtual unless MATERIALIZED_ROWID is defined)
//     and, when td.hasDeletedCol is set, the system "$deleted$" column.
//  4. Stamps every column with the database id and sets td.nColumns.
//  5. Takes the sqlite lock, begins a transaction and either inserts the table and its
//     columns into the sqlite metadata tables, or (branch commented "Temporary table")
//     assigns ids from nextTempTableId_ / nextTempDictId_ without the INSERT statements.
//  6. Adds the table to the in-memory maps, notifies Calcite, and ends the transaction;
//     on an exception the transaction is rolled back and the exception is rethrown.
//
// td is modified in place (tableId, nColumns, columnIdBySpi_).
// Throws std::runtime_error for a user column named "rowid"; other exceptions raised
// inside the try blocks are rethrown after "ROLLBACK TRANSACTION".
//
// NOTE(review): this listing is missing a number of source lines (the embedded line
// numbers have gaps, e.g. 2755-2756, 2808, 2814-2829 in part, 2875-2887 in part, 2899,
// 2954). The conditions selecting the persistent vs. temporary branch, several bind
// parameters and a few other statements are therefore not visible -- consult the
// complete file before changing this function.
2743 void Catalog::createTable(
2744  TableDescriptor& td,
2745  const list<ColumnDescriptor>& cols,
2746  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2747  bool isLogicalTable) {
2748  cat_write_lock write_lock(this);
2749  list<ColumnDescriptor> cds = cols;
2750  list<DictDescriptor> dds;
2751  std::set<std::string> toplevel_column_names;
2752  list<ColumnDescriptor> columns;
2753 
2754  if (!td.storageType.empty() &&
2757  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2758  }
2759  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2760  }
2761 
// Copy the user columns into `columns`; geometry columns are followed by the
// columns that expandGeoColumn() appends for them.
2762  for (auto cd : cds) {
2763  if (cd.columnName == "rowid") {
2764  throw std::runtime_error(
2765  "Cannot create column with name rowid. rowid is a system defined column.");
2766  }
2767  columns.push_back(cd);
2768  toplevel_column_names.insert(cd.columnName);
2769  if (cd.columnType.is_geometry()) {
2770  expandGeoColumn(cd, columns);
2771  }
2772  }
// cds is emptied here and refilled below with the finalized column descriptors
// (table id and column id assigned).
2773  cds.clear();
2774 
2775  ColumnDescriptor cd;
2776  // add row_id column -- Must be last column in the table
2777  cd.columnName = "rowid";
2778  cd.isSystemCol = true;
2779  cd.columnType = SQLTypeInfo(kBIGINT, true);
2780 #ifdef MATERIALIZED_ROWID
2781  cd.isVirtualCol = false;
2782 #else
2783  cd.isVirtualCol = true;
2784  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2785 #endif
2786  columns.push_back(cd);
2787  toplevel_column_names.insert(cd.columnName);
2788 
2789  if (td.hasDeletedCol) {
2790  ColumnDescriptor cd_del;
2791  cd_del.columnName = "$deleted$";
2792  cd_del.isSystemCol = true;
2793  cd_del.isVirtualCol = false;
2794  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2795  cd_del.isDeletedCol = true;
2796 
2797  columns.push_back(cd_del);
2798  }
2799 
2800  for (auto& column : columns) {
2801  column.db_id = getDatabaseId();
2802  }
2803 
2804  td.nColumns = columns.size();
2805  // TODO(sy): don't take disk locks or touch sqlite connector for temporary tables
2806  cat_sqlite_lock sqlite_lock(getObjForLock());
// The transaction is opened before the (elided) persistent/temporary branch, so both
// branches reach the "END TRANSACTION" near the end of the function.
2807  sqliteConnector_.query("BEGIN TRANSACTION");
2809  try {
2810  sqliteConnector_.query_with_text_params(
2811  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2812  std::vector<std::string>{td.tableName,
2813  std::to_string(td.userId),
2815  std::to_string(td.isView),
2816  "",
2821  std::to_string(td.maxRows),
2822  td.partitions,
2824  std::to_string(td.shard),
2825  std::to_string(td.nShards),
2827  td.storageType,
2830  td.keyMetainfo});
2831 
2832  // now get the auto generated tableid
2833  sqliteConnector_.query_with_text_param(
2834  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2835  td.tableId = sqliteConnector_.getData<int>(0, 0);
2836  int colId = 1;
2837  for (auto cd : columns) {
2839  const bool is_foreign_col =
2840  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2841  if (!is_foreign_col) {
2842  // Ideally we would like to not persist string dictionaries for system tables,
2843  // since system table content can be highly dynamic and string dictionaries
2844  // are not currently vacuumed. However, in distributed this causes issues
2845  // when the cluster is out of sync (when the agg resets but leaves persist) so
2846  // for the sake of testing we need to leave this as normal dictionaries until
2847  // we solve the distributed issue.
2848  auto use_temp_dictionary = false; // td.is_system_table;
2849  setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
2850  }
2851  }
2852 
// Only top-level columns that are not geo physical columns get an entry in
// td.columnIdBySpi_.
2853  if (toplevel_column_names.count(cd.columnName)) {
2854  if (!cd.isGeoPhyCol) {
2855  td.columnIdBySpi_.push_back(colId);
2856  }
2857  }
2858 
2859  using BindType = SqliteConnector::BindType;
2860  std::vector<BindType> types(17, BindType::TEXT);
// Bind position 16 (default_value) is bound as NULL_TYPE when the column has no default.
2861  if (!cd.default_value.has_value()) {
2862  types[16] = BindType::NULL_TYPE;
2863  }
2864  sqliteConnector_.query_with_text_params(
2865  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2866  "coldim, colscale, is_notnull, "
2867  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2868  "virtual_expr, is_deletedcol, default_value) "
2869  "VALUES (?, ?, ?, ?, ?, "
2870  "?, "
2871  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2872  std::vector<std::string>{std::to_string(td.tableId),
2873  std::to_string(colId),
2874  cd.columnName,
2883  "",
2886  cd.virtualExpr,
2888  cd.default_value.value_or("NULL")},
2889  types);
2890  cd.tableId = td.tableId;
2891  cd.columnId = colId++;
2892  cds.push_back(cd);
2893  }
2894  if (td.isView) {
2895  sqliteConnector_.query_with_text_params(
2896  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2897  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2898  }
2900  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2901  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
2902  sqliteConnector_.query_with_text_params(
2903  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2904  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2905  std::vector<std::string>{std::to_string(foreign_table.tableId),
2906  std::to_string(foreign_table.foreign_server->id),
2907  foreign_table.getOptionsAsJsonString(),
2908  std::to_string(foreign_table.last_refresh_time),
2909  std::to_string(foreign_table.next_refresh_time)});
2910  }
2911  } catch (std::exception& e) {
2912  sqliteConnector_.query("ROLLBACK TRANSACTION");
2913  throw;
2914  }
2915  } else { // Temporary table
2916  td.tableId = nextTempTableId_++;
2917  int colId = 1;
2918  for (auto cd : columns) {
2920  const bool is_foreign_col =
2921  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2922 
2923  if (!is_foreign_col) {
2924  // Create a new temporary dictionary
2925  std::string fileName("");
2926  std::string folderPath("");
2927  DictRef dict_ref(currentDB_.dbId, nextTempDictId_);
2928  nextTempDictId_++;
2929  DictDescriptor dd(dict_ref,
2930  fileName,
2932  false,
2933  1,
2934  folderPath,
2935  true); // Is dictName (2nd argument) used?
2936  dds.push_back(dd);
2937  if (!cd.columnType.is_array()) {
2939  }
2940  cd.columnType.set_comp_param(dict_ref.dictId);
2941  set_dict_key(cd);
2942  }
2943  }
2944  if (toplevel_column_names.count(cd.columnName)) {
2945  if (!cd.isGeoPhyCol) {
2946  td.columnIdBySpi_.push_back(colId);
2947  }
2948  }
2949  cd.tableId = td.tableId;
2950  cd.columnId = colId++;
2951  cds.push_back(cd);
2952  }
2953 
2955  serializeTableJsonUnlocked(&td, cds);
2956  }
2957  }
2958 
// Publish the table in memory and to Calcite. Any failure rolls back the sqlite
// transaction and removes the table from the in-memory maps again.
2959  try {
2960  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
2961  if (cache) {
2962  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
2963  << "Disk cache at " + cache->getCacheDirectory()
2964  << " contains preexisting data for new table. Please "
2965  "delete or clear cache before continuing";
2966  }
2967 
2968  addTableToMap(&td, cds, dds);
2969  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2970  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2971  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2972  }
2973  } catch (std::exception& e) {
2974  sqliteConnector_.query("ROLLBACK TRANSACTION");
2975  removeTableFromMap(td.tableName, td.tableId, true);
2976  throw;
2977  }
2978  sqliteConnector_.query("END TRANSACTION");
2979 
// Both locks are released explicitly before getMetadataForTable() is called for
// tables whose storage type is not FOREIGN_TABLE.
2980  if (td.storageType != StorageType::FOREIGN_TABLE) {
2981  write_lock.unlock();
2982  sqlite_lock.unlock();
2983  getMetadataForTable(td.tableName,
2984  true); // cause instantiateFragmenter() to be called
2985  }
2986 }
2987 
2988 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
2989  const std::list<ColumnDescriptor>& cds) const {
2990  // relies on the catalog write lock
2991  using namespace rapidjson;
2992 
2993  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2994 
2995  const auto db_name = currentDB_.dbName;
2996  const auto file_path = table_json_filepath(basePath_, db_name);
2997 
2998  Document d;
2999  if (boost::filesystem::exists(file_path)) {
3000  // look for an existing file for this database
3001  std::ifstream reader(file_path.string());
3002  CHECK(reader.is_open());
3003  IStreamWrapper json_read_wrapper(reader);
3004  d.ParseStream(json_read_wrapper);
3005  } else {
3006  d.SetObject();
3007  }
3008  CHECK(d.IsObject());
3009  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
3010 
3011  Value table(kObjectType);
3012  table.AddMember(
3013  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
3014  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
3015  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
3016 
3017  for (const auto& cd : cds) {
3018  Value column(kObjectType);
3019  column.AddMember(
3020  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
3021  column.AddMember("coltype",
3022  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
3023  d.GetAllocator());
3024  column.AddMember("colsubtype",
3025  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
3026  d.GetAllocator());
3027  column.AddMember("compression",
3028  Value().SetInt(static_cast<int>(cd.columnType.get_compression())),
3029  d.GetAllocator());
3030  column.AddMember("comp_param",
3031  Value().SetInt(static_cast<int>(cd.columnType.get_comp_param())),
3032  d.GetAllocator());
3033  column.AddMember("size",
3034  Value().SetInt(static_cast<int>(cd.columnType.get_size())),
3035  d.GetAllocator());
3036  column.AddMember(
3037  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
3038  column.AddMember(
3039  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
3040  column.AddMember(
3041  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
3042  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
3043  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
3044  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
3045  table["columns"].PushBack(column, d.GetAllocator());
3046  }
3047  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
3048 
3049  // Overwrite the existing file
3050  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3051  CHECK(writer.is_open());
3052  OStreamWrapper json_wrapper(writer);
3053 
3054  Writer<OStreamWrapper> json_writer(json_wrapper);
3055  d.Accept(json_writer);
3056  writer.close();
3057 }
3058 
3059 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
3060  // relies on the catalog write lock
3061  using namespace rapidjson;
3062 
3063  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
3064 
3065  const auto db_name = currentDB_.dbName;
3066  const auto file_path = table_json_filepath(basePath_, db_name);
3067 
3068  CHECK(boost::filesystem::exists(file_path));
3069  Document d;
3070 
3071  std::ifstream reader(file_path.string());
3072  CHECK(reader.is_open());
3073  IStreamWrapper json_read_wrapper(reader);
3074  d.ParseStream(json_read_wrapper);
3075 
3076  CHECK(d.IsObject());
3077  auto table_name_ref = StringRef(table_name.c_str());
3078  CHECK(d.HasMember(table_name_ref));
3079  CHECK(d.RemoveMember(table_name_ref));
3080 
3081  // Overwrite the existing file
3082  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3083  CHECK(writer.is_open());
3084  OStreamWrapper json_wrapper(writer);
3085 
3086  Writer<OStreamWrapper> json_writer(json_wrapper);
3087  d.Accept(json_writer);
3088  writer.close();
3089 }
3090 
3091 void Catalog::createForeignServer(
3092  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3093  bool if_not_exists) {
3094  cat_write_lock write_lock(this);
3095  cat_sqlite_lock sqlite_lock(getObjForLock());
3096  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
3097 }
3098 
3099 void Catalog::createForeignServerNoLocks(
3100  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3101  bool if_not_exists) {
3102  const auto& name = foreign_server->name;
3103 
3104  sqliteConnector_.query_with_text_params(
3105  "SELECT name from omnisci_foreign_servers where name = ?",
3106  std::vector<std::string>{name});
3107 
3108  if (sqliteConnector_.getNumRows() == 0) {
3109  foreign_server->creation_time = std::time(nullptr);
3110  sqliteConnector_.query_with_text_params(
3111  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
3112  "creation_time, "
3113  "options) "
3114  "VALUES (?, ?, ?, ?, ?)",
3115  std::vector<std::string>{name,
3116  foreign_server->data_wrapper_type,
3117  std::to_string(foreign_server->user_id),
3118  std::to_string(foreign_server->creation_time),
3119  foreign_server->getOptionsAsJsonString()});
3120  sqliteConnector_.query_with_text_params(
3121  "SELECT id from omnisci_foreign_servers where name = ?",
3122  std::vector<std::string>{name});
3123  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
3124  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
3125  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
3126  std::move(foreign_server);
3127  CHECK(foreignServerMap_.find(name) == foreignServerMap_.end())
3128  << "Attempting to insert a foreign server into foreign server map that already "
3129  "exists.";
3130  foreignServerMap_[name] = foreign_server_shared;
3131  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
3132  } else if (!if_not_exists) {
3133  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
3134  "\" already exists."};
3135  }
3136 
3137  const auto& server_it = foreignServerMap_.find(name);
3138  CHECK(server_it != foreignServerMap_.end());
3139  CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
3140 }
3141 
3142 const foreign_storage::ForeignServer* Catalog::getForeignServer(
3143  const std::string& server_name) const {
3144  foreign_storage::ForeignServer* foreign_server = nullptr;
3145  cat_read_lock read_lock(this);
3146 
3147  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
3148  foreign_server = foreignServerMap_.find(server_name)->second.get();
3149  }
3150  return foreign_server;
3151 }
3152 
3153 const std::unique_ptr<const foreign_storage::ForeignServer>
3154 Catalog::getForeignServerFromStorage(const std::string& server_name) {
3155  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
3156  cat_sqlite_lock sqlite_lock(getObjForLock());
3157  sqliteConnector_.query_with_text_params(
3158  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
3159  "FROM omnisci_foreign_servers WHERE name = ?",
3160  std::vector<std::string>{server_name});
3161  if (sqliteConnector_.getNumRows() > 0) {
3162  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
3163  sqliteConnector_.getData<int>(0, 0),
3164  sqliteConnector_.getData<std::string>(0, 1),
3165  sqliteConnector_.getData<std::string>(0, 2),
3166  sqliteConnector_.getData<std::string>(0, 3),
3167  sqliteConnector_.getData<std::int32_t>(0, 4),
3168  sqliteConnector_.getData<std::int32_t>(0, 5));
3169  }
3170  return foreign_server;
3171 }
3172 
3173 const std::unique_ptr<const foreign_storage::ForeignTable>
3174 Catalog::getForeignTableFromStorage(int table_id) {
3175  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
3176  cat_sqlite_lock sqlite_lock(getObjForLock());
3177  sqliteConnector_.query_with_text_params(
3178  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
3179  "omnisci_foreign_tables WHERE table_id = ?",
3180  std::vector<std::string>{to_string(table_id)});
3181  auto num_rows = sqliteConnector_.getNumRows();
3182  if (num_rows > 0) {
3183  CHECK_EQ(size_t(1), num_rows);
3184  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
3185  sqliteConnector_.getData<int>(0, 0),
3186  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
3187  sqliteConnector_.getData<std::string>(0, 2),
3188  sqliteConnector_.getData<int64_t>(0, 3),
3189  sqliteConnector_.getData<int64_t>(0, 4));
3190  }
3191  return foreign_table;
3192 }
3193 
3194 void Catalog::changeForeignServerOwner(const std::string& server_name,
3195  const int new_owner_id) {
3196  cat_write_lock write_lock(this);
3197  foreign_storage::ForeignServer* foreign_server =
3198  foreignServerMap_.find(server_name)->second.get();
3199  CHECK(foreign_server);
3200  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
3201  // update in-memory server
3202  foreign_server->user_id = new_owner_id;
3203 }
3204 
3205 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
3206  const std::string& data_wrapper) {
3207  cat_write_lock write_lock(this);
3208  auto data_wrapper_type = to_upper(data_wrapper);
3209  // update in-memory server
3210  foreign_storage::ForeignServer* foreign_server =
3211  foreignServerMap_.find(server_name)->second.get();
3212  CHECK(foreign_server);
3213  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
3214  foreign_server->data_wrapper_type = data_wrapper_type;
3215  try {
3216  foreign_server->validate();
3217  } catch (const std::exception& e) {
3218  // validation did not succeed:
3219  // revert to saved data_wrapper_type & throw exception
3220  foreign_server->data_wrapper_type = saved_data_wrapper_type;
3221  throw;
3222  }
3223  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
3224 }
3225 
3226 void Catalog::setForeignServerOptions(const std::string& server_name,
3227  const std::string& options) {
3228  cat_write_lock write_lock(this);
3229  // update in-memory server
3230  foreign_storage::ForeignServer* foreign_server =
3231  foreignServerMap_.find(server_name)->second.get();
3232  CHECK(foreign_server);
3233  auto saved_options = foreign_server->options;
3234  foreign_server->populateOptionsMap(options, true);
3235  try {
3236  foreign_server->validate();
3237  } catch (const std::exception& e) {
3238  // validation did not succeed:
3239  // revert to saved options & throw exception
3240  foreign_server->options = saved_options;
3241  throw;
3242  }
3243  setForeignServerProperty(server_name, "options", options);
3244 }
3245 
3246 void Catalog::renameForeignServer(const std::string& server_name,
3247  const std::string& name) {
3248  cat_write_lock write_lock(this);
3249  auto foreign_server_it = foreignServerMap_.find(server_name);
3250  CHECK(foreign_server_it != foreignServerMap_.end());
3251  setForeignServerProperty(server_name, "name", name);
3252  auto foreign_server_shared = foreign_server_it->second;
3253  foreign_server_shared->name = name;
3254  foreignServerMap_[name] = foreign_server_shared;
3255  foreignServerMap_.erase(foreign_server_it);
3256 }
3257 
3258 void Catalog::dropForeignServer(const std::string& server_name) {
3259  cat_write_lock write_lock(this);
3260  cat_sqlite_lock sqlite_lock(getObjForLock());
3261 
3262  sqliteConnector_.query_with_text_params(
3263  "SELECT id from omnisci_foreign_servers where name = ?",
3264  std::vector<std::string>{server_name});
3265  auto num_rows = sqliteConnector_.getNumRows();
3266  if (num_rows > 0) {
3267  CHECK_EQ(size_t(1), num_rows);
3268  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
3269  sqliteConnector_.query_with_text_param(
3270  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
3271  std::to_string(server_id));
3272  if (sqliteConnector_.getNumRows() > 0) {
3273  throw std::runtime_error{"Foreign server \"" + server_name +
3274  "\" is referenced "
3275  "by existing foreign tables and cannot be dropped."};
3276  }
3277  sqliteConnector_.query("BEGIN TRANSACTION");
3278  try {
3279  sqliteConnector_.query_with_text_params(
3280  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
3281  std::vector<std::string>{server_name});
3282  } catch (const std::exception& e) {
3283  sqliteConnector_.query("ROLLBACK TRANSACTION");
3284  throw;
3285  }
3286  sqliteConnector_.query("END TRANSACTION");
3287  foreignServerMap_.erase(server_name);
3288  foreignServerMapById_.erase(server_id);
3289  }
3290 }
3291 
3292 void Catalog::getForeignServersForUser(
3293  const rapidjson::Value* filters,
3294  const UserMetadata& user,
3295  std::vector<const foreign_storage::ForeignServer*>& results) {
3296  sys_read_lock syscat_read_lock(&SysCatalog::instance());
3297  cat_read_lock read_lock(this);
3298  cat_sqlite_lock sqlite_lock(getObjForLock());
3299  // Customer facing and internal SQlite names
3300  std::map<std::string, std::string> col_names{{"server_name", "name"},
3301  {"data_wrapper", "data_wrapper_type"},
3302  {"created_at", "creation_time"},
3303  {"options", "options"}};
3304 
3305  // TODO add "owner" when FSI privilege is implemented
3306  std::stringstream filter_string;
3307  std::vector<std::string> arguments;
3308 
3309  if (filters != nullptr) {
3310  // Create SQL WHERE clause for SQLite query
3311  int num_filters = 0;
3312  filter_string << " WHERE";
3313  for (auto& filter_def : filters->GetArray()) {
3314  if (num_filters > 0) {
3315  filter_string << " " << std::string(filter_def["chain"].GetString());
3316  ;
3317  }
3318 
3319  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
3320  col_names.end()) {
3321  throw std::runtime_error{"Attribute with name \"" +
3322  std::string(filter_def["attribute"].GetString()) +
3323  "\" does not exist."};
3324  }
3325 
3326  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
3327 
3328  bool equals_operator = false;
3329  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
3330  filter_string << " = ? ";
3331  equals_operator = true;
3332  } else {
3333  filter_string << " LIKE ? ";
3334  }
3335 
3336  bool timestamp_column =
3337  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
3338 
3339  if (timestamp_column && !equals_operator) {
3340  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
3341  }
3342 
3343  if (timestamp_column && equals_operator) {
3344  arguments.push_back(std::to_string(
3345  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
3346  } else {
3347  arguments.emplace_back(filter_def["value"].GetString());
3348  }
3349 
3350  num_filters++;
3351  }
3352  }
3353  // Create select query for the omnisci_foreign_servers table
3354  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
3355  query += filter_string.str();
3356 
3357  sqliteConnector_.query_with_text_params(query, arguments);
3358  auto num_rows = sqliteConnector_.getNumRows();
3359 
3360  if (sqliteConnector_.getNumRows() == 0) {
3361  return;
3362  }
3363 
3364  CHECK(sqliteConnector_.getNumCols() == 1);
3365  // Return pointers to objects
3366  results.reserve(num_rows);
3367  for (size_t row = 0; row < num_rows; ++row) {
3368  const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
3369  if (shared::contains(INTERNAL_SERVERS, server_name)) {
3370  continue;
3371  }
3372  const foreign_storage::ForeignServer* foreign_server = getForeignServer(server_name);
3373  CHECK(foreign_server != nullptr);
3374 
3375  DBObject dbObject(foreign_server->name, ServerDBObjectType);
3376  dbObject.loadKey(*this);
3377  std::vector<DBObject> privObjects = {dbObject};
3378  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
3379  // skip server, as there are no privileges to access it
3380  continue;
3381  }
3382  results.push_back(foreign_server);
3383  }
3384 }
3385 
3386 // returns the table epoch or -1 if there is something wrong with the shared epoch
3387 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
3388  cat_read_lock read_lock(this);
3389  const auto td = getMetadataForTable(table_id, false);
3390  if (!td) {
3391  std::stringstream table_not_found_error_message;
3392  table_not_found_error_message << "Table (" << db_id << "," << table_id
3393  << ") not found";
3394  throw std::runtime_error(table_not_found_error_message.str());
3395  }
3396  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3397  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3398  // check all shards have same checkpoint
3399  const auto physicalTables = physicalTableIt->second;
3400  CHECK(!physicalTables.empty());
3401  size_t curr_epoch{0}, first_epoch{0};
3402  int32_t first_table_id{0};
3403  bool are_epochs_inconsistent{false};
3404  for (size_t i = 0; i < physicalTables.size(); i++) {
3405  int32_t physical_tb_id = physicalTables[i];
3406  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3407  CHECK(phys_td);
3408 
3409  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3410  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3411  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
3412  if (i == 0) {
3413  first_epoch = curr_epoch;
3414  first_table_id = physical_tb_id;
3415  } else if (first_epoch != curr_epoch) {
3416  are_epochs_inconsistent = true;
3417  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
3418  << ", db id: " << db_id
3419  << ". First table (table id: " << first_table_id
3420  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
3421  << ", has inconsistent epoch: " << curr_epoch
3422  << ". See previous INFO logs for all epochs and their table ids.";
3423  }
3424  }
3425  if (are_epochs_inconsistent) {
3426  // oh dear the shards do not agree on the epoch for this table
3427  return -1;
3428  }
3429  return curr_epoch;
3430  } else {
3431  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3432  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3433  << ", epoch: " << epoch;
3434  return epoch;
3435  }
3436 }
3437 
3438 std::vector<const foreign_storage::ForeignTable*>
3439 Catalog::getAllForeignTablesForForeignServer(const int32_t foreign_server_id) {
3440  cat_read_lock read_lock(this);
3441  std::vector<const foreign_storage::ForeignTable*> foreign_tables;
3442  for (auto entry : tableDescriptorMapById_) {
3443  auto table_descriptor = entry.second;
3444  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
3445  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
3446  CHECK(foreign_table);
3447  if (foreign_table->foreign_server->id == foreign_server_id) {
3448  foreign_tables.emplace_back(foreign_table);
3449  }
3450  }
3451  }
3452  return foreign_tables;
3453 }
3454 
3455 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
3456  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
3457  << " back to new epoch " << new_epoch;
3458  const auto td = getMetadataForTable(table_id, false);
3459  if (!td) {
3460  std::stringstream table_not_found_error_message;
3461  table_not_found_error_message << "Table (" << db_id << "," << table_id
3462  << ") not found";
3463  throw std::runtime_error(table_not_found_error_message.str());
3464  }
3465  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3466  std::stringstream is_temp_table_error_message;
3467  is_temp_table_error_message << "Cannot set epoch on temporary table";
3468  throw std::runtime_error(is_temp_table_error_message.str());
3469  }
3470 
3471  File_Namespace::FileMgrParams file_mgr_params;
3472  file_mgr_params.epoch = new_epoch;
3473  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3474 
3475  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3476  CHECK(!physical_tables.empty());
3477  for (const auto table : physical_tables) {
3478  auto table_id = table->tableId;
3479  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID " << table_id
3480  << " back to new epoch " << new_epoch;
3481  // Should have table lock from caller so safe to do this after, avoids
3482  // having to repopulate data on error
3483  removeChunks(table_id);
3484  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3485  }
3486 }
3487 
3488 void Catalog::alterPhysicalTableMetadata(
3489  const TableDescriptor* td,
3490  const TableDescriptorUpdateParams& table_update_params) {
3491  // Only called from parent alterTableParamMetadata, expect already to have catalog and
3492  // sqlite write locks
3493 
3494  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
3495 
3496  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
3497  CHECK(mutable_td);
3498  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
3499  sqliteConnector_.query_with_text_params(
3500  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3501  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
3502  std::to_string(td->tableId)});
3503  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
3504  }
3505 
3506  if (td->maxRows != table_update_params.max_rows) {
3507  sqliteConnector_.query_with_text_params(
3508  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3509  std::vector<std::string>{std::to_string(table_update_params.max_rows),
3510  std::to_string(td->tableId)});
3511  mutable_td->maxRows = table_update_params.max_rows;
3512  }
3513 }
3514 
3515 void Catalog::alterTableMetadata(const TableDescriptor* td,
3516  const TableDescriptorUpdateParams& table_update_params) {
3517  cat_write_lock write_lock(this);
3518  cat_sqlite_lock sqlite_lock(getObjForLock());
3519  sqliteConnector_.query("BEGIN TRANSACTION");
3520  try {
3521  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
3522  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3523  const auto physical_tables = physical_table_it->second;
3524  CHECK(!physical_tables.empty());
3525  for (size_t i = 0; i < physical_tables.size(); i++) {
3526  int32_t physical_tb_id = physical_tables[i];
3527  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3528  CHECK(phys_td);
3529  alterPhysicalTableMetadata(phys_td, table_update_params);
3530  }
3531  }
3532  alterPhysicalTableMetadata(td, table_update_params);
3533  } catch (std::exception& e) {
3534  sqliteConnector_.query("ROLLBACK TRANSACTION");
3535  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
3536  }
3537  sqliteConnector_.query("END TRANSACTION");
3538 }
3539 
3540 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3541  const int32_t max_rollback_epochs) {
3542  // Must be called from AlterTableParamStmt or other method that takes executor and
3543  // TableSchema locks
3544  if (max_rollback_epochs <= -1) {
3545  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3546  }
3547  const auto td = getMetadataForTable(
3548  table_id, false); // Deep copy as there will be gap between read and write locks
3549  CHECK(td); // Existence should have already been checked in
3550  // ParserNode::AlterTableParmStmt
3551  TableDescriptorUpdateParams table_update_params(td);
3552  table_update_params.max_rollback_epochs = max_rollback_epochs;
3553  if (table_update_params == td) { // Operator is overloaded to test for equality
3554  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3555  << " to existing value, skipping operation";
3556  return;
3557  }
3558  File_Namespace::FileMgrParams file_mgr_params;
3559  file_mgr_params.epoch = -1; // Use existing epoch
3560  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3561  setTableFileMgrParams(table_id, file_mgr_params);
3562  alterTableMetadata(td, table_update_params);
3563 }
3564 
3565 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3566  if (max_rows < 0) {
3567  throw std::runtime_error("Max rows cannot be a negative number.");
3568  }
3569  const auto td = getMetadataForTable(table_id);
3570  CHECK(td);
3571  TableDescriptorUpdateParams table_update_params(td);
3572  table_update_params.max_rows = max_rows;
3573  if (table_update_params == td) {
3574  LOG(INFO) << "Max rows value of " << max_rows
3575  << " is the same as the existing value. Skipping update.";
3576  return;
3577  }
3578  alterTableMetadata(td, table_update_params);
3579  CHECK(td->fragmenter);
3580  td->fragmenter->dropFragmentsToSize(max_rows);
3581 }
3582 
3583 // For testing purposes only
3584 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3585  cat_write_lock write_lock(this);
3586  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3587  CHECK(td_entry != tableDescriptorMap_.end());
3588  auto td = td_entry->second;
3589 
3590  std::vector<int> table_key{getCurrentDB().dbId, td->tableId};
3591  ResultSetCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
3592 
3593  TableDescriptorUpdateParams table_update_params(td);
3594  table_update_params.max_rollback_epochs = -1;
3595  write_lock.unlock();
3596 
3597  alterTableMetadata(td, table_update_params);
3598  File_Namespace::FileMgrParams file_mgr_params;
3599  file_mgr_params.max_rollback_epochs = -1;
3600  setTableFileMgrParams(td->tableId, file_mgr_params);
3601 }
3602 
3603 void Catalog::setTableFileMgrParams(
3604  const int table_id,
3605  const File_Namespace::FileMgrParams& file_mgr_params) {
3606  // Expects parent to have write lock
3607  const auto td = getMetadataForTable(table_id, false);
3608  const auto db_id = this->getDatabaseId();
3609  if (!td) {
3610  std::stringstream table_not_found_error_message;
3611  table_not_found_error_message << "Table (" << db_id << "," << table_id
3612  << ") not found";
3613  throw std::runtime_error(table_not_found_error_message.str());
3614  }
3615  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3616  std::stringstream is_temp_table_error_message;
3617  is_temp_table_error_message << "Cannot set storage params on temporary table";
3618  throw std::runtime_error(is_temp_table_error_message.str());
3619  }
3620 
3621  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3622  CHECK(!physical_tables.empty());
3623  for (const auto table : physical_tables) {
3624  auto table_id = table->tableId;
3625  removeChunks(table_id);
3626  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3627  }
3628 }
3629 
3630 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3631  const int32_t table_id) const {
3632  cat_read_lock read_lock(this);
3633  std::vector<TableEpochInfo> table_epochs;
3634  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3635  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3636  const auto physical_tables = physical_table_it->second;
3637  CHECK(!physical_tables.empty());
3638 
3639  for (const auto physical_tb_id : physical_tables) {
3640  const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3641  CHECK(phys_td);
3642 
3643  auto table_id = phys_td->tableId;
3644  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3645  table_epochs.emplace_back(table_id, epoch);
3646  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3647  << ", table id: " << table_id << ", epoch: " << epoch;
3648  }
3649  } else {
3650  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3651  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3652  << ", epoch: " << epoch;
3653  table_epochs.emplace_back(table_id, epoch);
3654  }
3655  return table_epochs;
3656 }
3657 
3658 void Catalog::setTableEpochs(const int32_t db_id,
3659  const std::vector<TableEpochInfo>& table_epochs) const {
3660  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3661  CHECK(td);
3662  File_Namespace::FileMgrParams file_mgr_params;
3663  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3664 
3665  for (const auto& table_epoch_info : table_epochs) {
3666  removeChunks(table_epoch_info.table_id);
3667  file_mgr_params.epoch = table_epoch_info.table_epoch;
3668  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3669  db_id, table_epoch_info.table_id, file_mgr_params);
3670  LOG(INFO) << "Set table epoch for db id: " << db_id
3671  << ", table id: " << table_epoch_info.table_id
3672  << ", back to epoch: " << table_epoch_info.table_epoch;
3673  }
3674 }
3675 
3676 namespace {
3677 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3678  std::string table_epochs_str{"["};
3679  bool first_entry{true};
3680  for (const auto& table_epoch : table_epochs) {
3681  if (first_entry) {
3682  first_entry = false;
3683  } else {
3684  table_epochs_str += ", ";
3685  }
3686  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3687  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3688  }
3689  table_epochs_str += "]";
3690  return table_epochs_str;
3691 }
3692 } // namespace
3693 
3694 void Catalog::setTableEpochsLogExceptions(
3695  const int32_t db_id,
3696  const std::vector<TableEpochInfo>& table_epochs) const {
3697  try {
3698  setTableEpochs(db_id, table_epochs);
3699  } catch (std::exception& e) {
3700  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3701  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3702  << ", Error: " << e.what();
3703  }
3704 }
3705 
3706 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3707  cat_read_lock read_lock(this);
3708  const auto it = deletedColumnPerTable_.find(td);
3709  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3710 }
3711 
3712 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3713  int delete_column_id) const {
3714  // check if there are rows deleted by examining the deletedColumn metadata
3715  CHECK(td);
3716  auto fragmenter = td->fragmenter;
3717  if (fragmenter) {
3718  return fragmenter->hasDeletedRows(delete_column_id);
3719  } else {
3720  return false;
3721  }
3722 }
3723 
3724 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3725  const TableDescriptor* td) const {
3726  std::vector<const TableDescriptor*> tds;
3727  const ColumnDescriptor* cd;
3728  {
3729  cat_read_lock read_lock(this);
3730 
3731  const auto it = deletedColumnPerTable_.find(td);
3732  // if not a table that supports delete return nullptr, nothing more to do
3733  if (it == deletedColumnPerTable_.end()) {
3734  return nullptr;
3735  }
3736  cd = it->second;
3737  tds = getPhysicalTablesDescriptors(td, false);
3738  }
3739  // individual tables are still protected by higher level locks
3740  for (auto tdd : tds) {
3741  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3742  return cd;
3743  }
3744  }
3745  // no deletes so far recorded in metadata
3746  return nullptr;
3747 }
3748 
3749 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3750  cat_write_lock write_lock(this);
3751  setDeletedColumnUnlocked(td, cd);
3752 }
3753 
3754 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3755  const ColumnDescriptor* cd) {
3756  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3757  CHECK(it_ok.second);
3758 }
3759 
3760 namespace {
3761 
3763  const Catalog& cat,
3764  const Parser::SharedDictionaryDef& shared_dict_def) {
3765  const auto& table_name = shared_dict_def.get_foreign_table();
3766  const auto td = cat.getMetadataForTable(table_name, false);
3767  CHECK(td);
3768  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3769  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3770 }
3771 
3772 } // namespace
3773 
3774 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3775  Parser::SharedDictionaryDef shared_dict_def,
3776  const bool persist_reference) {
3777  cat_write_lock write_lock(this);
3778  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3779  CHECK(foreign_ref_col);
3780  referencing_column.columnType = foreign_ref_col->columnType;
3781  const int dict_id = referencing_column.columnType.get_comp_param();
3782  const DictRef dict_ref(currentDB_.dbId, dict_id);
3783  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3784  CHECK(dictIt != dictDescriptorMapByRef_.end());
3785  const auto& dd = dictIt->second;
3786  CHECK_GE(dd->refcount, 1);
3787  ++dd->refcount;
3788  if (persist_reference) {
3789  cat_sqlite_lock sqlite_lock(getObjForLock());
3790  sqliteConnector_.query_with_text_params(
3791  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3792  {std::to_string(dict_id)});
3793  }
3794 }
3795 
3796 bool Catalog::setColumnSharedDictionary(
3797  ColumnDescriptor& cd,
3798  std::list<ColumnDescriptor>& cdd,
3799  std::list<DictDescriptor>& dds,
3800  const TableDescriptor td,
3801  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3802  cat_write_lock write_lock(this);
3803  cat_sqlite_lock sqlite_lock(getObjForLock());
3804 
3805  if (shared_dict_defs.empty()) {
3806  return false;
3807  }
3808  for (const auto& shared_dict_def : shared_dict_defs) {
3809  // check if the current column is a referencing column
3810  const auto& column = shared_dict_def.get_column();
3811  if (cd.columnName == column) {
3812  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3813  // Dictionaries are being shared in table to be created
3814  const auto& ref_column = shared_dict_def.get_foreign_column();
3815  auto colIt =
3816  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3817  return !ref_column.compare(it.columnName);
3818  });
3819  CHECK(colIt != cdd.end());
3820  cd.columnType = colIt->columnType;
3821 
3822  const int dict_id = colIt->columnType.get_comp_param();
3823  CHECK_GE(dict_id, 1);
3824  auto dictIt = std::find_if(
3825  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3826  return it.dictRef.dbId == this->currentDB_.dbId &&
3827  it.dictRef.dictId == dict_id;
3828  });
3829  if (dictIt != dds.end()) {
3830  // There exists dictionary definition of a dictionary column
3831  CHECK_GE(dictIt->refcount, 1);
3832  ++dictIt->refcount;
3833  if (!table_is_temporary(&td)) {
3834  // Persist reference count
3835  sqliteConnector_.query_with_text_params(
3836  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3837  {std::to_string(dict_id)});
3838  }
3839  } else {
3840  // The dictionary is referencing a column which is referencing a column in
3841  // diffrent table
3842  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3843  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3844  }
3845  } else {
3846  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3847  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3848  if (table_is_temporary(foreign_td)) {
3849  if (!table_is_temporary(&td)) {
3850  throw std::runtime_error(
3851  "Only temporary tables can share dictionaries with other temporary "
3852  "tables.");
3853  }
3854  addReferenceToForeignDict(cd, shared_dict_def, false);
3855  } else {
3856  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3857  }
3858  }
3859  return true;
3860  }
3861  }
3862  return false;
3863 }
3864 
3865 void Catalog::setColumnDictionary(ColumnDescriptor& cd,
3866  std::list<DictDescriptor>& dds,
3867  const TableDescriptor& td,
3868  bool is_logical_table,
3869  bool use_temp_dictionary) {
3870  cat_write_lock write_lock(this);
3871 
3872  std::string dictName{"Initial_key"};
3873  int dictId{0};
3874  std::string folderPath;
3875  if (is_logical_table) {
3876  cat_sqlite_lock sqlite_lock(getObjForLock());
3877 
3878  sqliteConnector_.query_with_text_params(
3879  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3880  "?, 1)",
3881  std::vector<std::string>{
3882  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
3883  sqliteConnector_.query_with_text_param(
3884  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3885  dictId = sqliteConnector_.getData<int>(0, 0);
3886  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3887  sqliteConnector_.query_with_text_param(
3888  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3889  folderPath = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
3890  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
3891  }
3892  DictDescriptor dd(currentDB_.dbId,
3893  dictId,
3894  dictName,
3896  false,
3897  1,
3898  folderPath,
3899  use_temp_dictionary);
3900  dds.push_back(dd);
3901  if (!cd.columnType.is_array()) {
3903  }
3904  cd.columnType.set_comp_param(dictId);
3905  set_dict_key(cd);
3906 }
3907 
3908 void Catalog::createShardedTable(
3909  TableDescriptor& td,
3910  const list<ColumnDescriptor>& cols,
3911  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3912  /* create logical table */
3913  TableDescriptor* tdl = &td;
3914  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3915  int32_t logical_tb_id = tdl->tableId;
3916  std::string logical_table_name = tdl->tableName;
3917 
3918  /* create physical tables and link them to the logical table */
3919  std::vector<int32_t> physicalTables;
3920  for (int32_t i = 1; i <= td.nShards; i++) {
3921  TableDescriptor* tdp = &td;
3922  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3923  tdp->shard = i - 1;
3924  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3925  int32_t physical_tb_id = tdp->tableId;
3926 
3927  /* add physical table to the vector of physical tables */
3928  physicalTables.push_back(physical_tb_id);
3929  }
3930 
3931  if (!physicalTables.empty()) {
3932  cat_write_lock write_lock(this);
3933  /* add logical to physical tables correspondence to the map */
3934  const auto it_ok =
3935  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3936  CHECK(it_ok.second);
3937  /* update sqlite mapd_logical_to_physical in sqlite database */
3938  if (!table_is_temporary(&td)) {
3939  updateLogicalToPhysicalTableMap(logical_tb_id);
3940  }
3941  }
3942 }
3943 
3944 void Catalog::truncateTable(const TableDescriptor* td) {
3945  // truncate all corresponding physical tables
3946  const auto physical_tables = getPhysicalTablesDescriptors(td);
3947  for (const auto table : physical_tables) {
3948  doTruncateTable(table);
3949  }
3950 }
3951 
3952 void Catalog::doTruncateTable(const TableDescriptor* td) {
3953  // must destroy fragmenter before deleteChunks is called.
3954  removeFragmenterForTable(td->tableId);
3955 
3956  const int tableId = td->tableId;
3957  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3958  // assuming deleteChunksWithPrefix is atomic
3959  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3960  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3961 
3962  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3963 
3964  cat_write_lock write_lock(this);
3965  std::unique_ptr<StringDictionaryClient> client;
3966  if (SysCatalog::instance().isAggregator()) {
3967  CHECK(!string_dict_hosts_.empty());
3968  DictRef dict_ref(currentDB_.dbId, -1);
3969  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3970  }
3971  // clean up any dictionaries
3972  // delete all column descriptors for the table
3973  for (const auto& columnDescriptor : columnDescriptorMapById_) {
3974  auto cd = columnDescriptor.second;
3975  if (cd->tableId != td->tableId) {
3976  continue;
3977  }
3978  const int dict_id = cd->columnType.get_comp_param();
3979  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3980  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3981  const DictRef dict_ref(currentDB_.dbId, dict_id);
3982  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3983  CHECK(dictIt != dictDescriptorMapByRef_.end());
3984  const auto& dd = dictIt->second;
3985  CHECK_GE(dd->refcount, 1);
3986  // if this is the only table using this dict reset the dict
3987  if (dd->refcount == 1) {
3988  // close the dictionary
3989  dd->stringDict.reset();
3990  File_Namespace::renameForDelete(dd->dictFolderPath);
3991  if (client) {
3992  client->drop(dd->dictRef);
3993  }
3994  if (!dd->dictIsTemp) {
3995  boost::filesystem::create_directory(dd->dictFolderPath);
3996  }
3997  }
3998 
3999  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
4000  dd->dictName,
4001  dd->dictNBits,
4002  dd->dictIsShared,
4003  dd->refcount,
4004  dd->dictFolderPath,
4005  dd->dictIsTemp);
4006  dictDescriptorMapByRef_.erase(dictIt);
4007  // now create new Dict -- need to figure out what to do here for temp tables
4008  if (client) {
4009  client->create(new_dd->dictRef, new_dd->dictIsTemp);
4010  }
4011  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
4012  getMetadataForDict(new_dd->dictRef.dictId);
4013  }
4014  }
4015 }
4016 
4017 // NOTE(Misiu): Only used by --multi-instance clusters.
4018 void Catalog::refreshDictionaryCachesForTableUnlocked(const TableDescriptor& td) {
4019  for (auto col_id = 0; col_id < td.nColumns; ++col_id) {
4020  if (auto it = columnDescriptorMapById_.find({td.tableId, col_id});
4021  it != columnDescriptorMapById_.end()) {
4022  auto cd = it->second;
4023  auto dict_id = cd->columnType.get_comp_param();
4024  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
4025  DictRef dict_ref(currentDB_.dbId, dict_id);
4026  if (auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
4027  dict_it != dictDescriptorMapByRef_.end()) {
4028  // getMetadataForDict() will only reload if the stringDict is null.
4029  dict_it->second->stringDict = nullptr;
4030  }
4031  getMetadataForDict(dict_id, true);
4032  }
4033  }
4034  }
4035 }
4036 
4037 // NOTE(sy): Only used by --multi-instance clusters.
4038 void Catalog::invalidateCachesForTable(const int table_id) {
4039  // When called, exactly one thread has a LockMgr data or insert lock for the table.
4040  cat_read_lock read_lock(this);
4041  ChunkKey const table_key{getDatabaseId(), table_id};
4042  auto td = getMutableMetadataForTableUnlocked(table_id);
4043  CHECK(td);
4044  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::GPU_LEVEL);
4045  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::CPU_LEVEL);
4046  DeleteTriggeredCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
4047 
4048  refreshDictionaryCachesForTableUnlocked(*td);
4049 
4050  // TODO(sy): doTruncateTable() says "destroy fragmenter before deleteChunks is called"
4051  // removeFragmenterForTable(table_key[CHUNK_KEY_TABLE_IDX]);
4052  if (td->fragmenter != nullptr) {
4053  auto tableDescIt = tableDescriptorMapById_.find(table_id);
4054  CHECK(tableDescIt != tableDescriptorMapById_.end());
4055  tableDescIt->second->fragmenter = nullptr;
4056  CHECK(td->fragmenter == nullptr);
4057  }
4058  getDataMgr().getGlobalFileMgr()->closeFileMgr(table_key[CHUNK_KEY_DB_IDX],
4059  table_key[CHUNK_KEY_TABLE_IDX]);
4060  // getMetadataForTable(table_key[CHUNK_KEY_TABLE_IDX], /*populateFragmenter=*/true);
4061  instantiateFragmenter(td);
4062 }
4063 
4064 void Catalog::removeFragmenterForTable(const int table_id) const {
4065  cat_write_lock write_lock(this);
4066  auto td = getMetadataForTable(table_id, false);
4067  if (td->fragmenter != nullptr) {
4068  auto tableDescIt = tableDescriptorMapById_.find(table_id);
4069  CHECK(tableDescIt != tableDescriptorMapById_.end());
4070  tableDescIt->second->fragmenter = nullptr;
4071  CHECK(td->fragmenter == nullptr);
4072  }
4073 }
4074 
4075 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
4076 void Catalog::removeChunks(const int table_id) const {
4077  removeFragmenterForTable(table_id);
4078 
4079  // remove the chunks from in memory structures
4080  ChunkKey chunkKey = {currentDB_.dbId, table_id};
4081 
4082  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
4083  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
4084 }
4085 
4086 void Catalog::dropTable(const TableDescriptor* td) {
4087  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
4089  std::vector<const TableDescriptor*> tables_to_drop;
4090  {
4091  cat_read_lock read_lock(this);
4092  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
4093  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4094  // remove all corresponding physical tables if this is a logical table
4095  const auto physicalTables = physicalTableIt->second;
4096  CHECK(!physicalTables.empty());
4097  for (size_t i = 0; i < physicalTables.size(); i++) {
4098  int32_t physical_tb_id = physicalTables[i];
4099  const TableDescriptor* phys_td =
4100  getMutableMetadataForTableUnlocked(physical_tb_id);
4101  CHECK(phys_td);
4102  tables_to_drop.emplace_back(phys_td);
4103  }
4104  }
4105  tables_to_drop.emplace_back(td);
4106  }
4107 
4108  for (auto table : tables_to_drop) {
4109  eraseTablePhysicalData(table);
4110  }
4111  deleteTableCatalogMetadata(td, tables_to_drop);
4112 }
4113 
4114 void Catalog::deleteTableCatalogMetadata(
4115  const TableDescriptor* logical_table,
4116  const std::vector<const TableDescriptor*>& physical_tables) {
4117  cat_write_lock write_lock(this);
4118  cat_sqlite_lock sqlite_lock(getObjForLock());
4119  sqliteConnector_.query("BEGIN TRANSACTION");
4120  try {
4121  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
4122  sqliteConnector_.query_with_text_param(
4123  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
4124  std::to_string(logical_table->tableId));
4125  logicalToPhysicalTableMapById_.erase(logical_table->tableId);
4126  for (auto table : physical_tables) {
4127  eraseTableMetadata(table);
4128  }
4129  } catch (std::exception& e) {
4130  sqliteConnector_.query("ROLLBACK TRANSACTION");
4131  throw;
4132  }
4133  sqliteConnector_.query("END TRANSACTION");
4134 }
4135 
4136 void Catalog::eraseTableMetadata(const TableDescriptor* td) {
4137  executeDropTableSqliteQueries(td);
4139  dropTableFromJsonUnlocked(td->tableName);
4140  }
4141  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4142  {
4143  INJECT_TIMER(removeTableFromMap_);
4144  removeTableFromMap(td->tableName, td->tableId);
4145  }
4146 }
4147 
4148 void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
4149  const int tableId = td->tableId;
4150  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
4151  std::to_string(tableId));
4152  sqliteConnector_.query_with_text_params(
4153  "select comp_param from mapd_columns where compression = ? and tableid = ?",
4154  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
4155  int numRows = sqliteConnector_.getNumRows();
4156  std::vector<int> dict_id_list;
4157  for (int r = 0; r < numRows; ++r) {
4158  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
4159  }
4160  for (auto dict_id : dict_id_list) {
4161  sqliteConnector_.query_with_text_params(
4162  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
4163  std::vector<std::string>{std::to_string(dict_id)});
4164  }
4165  sqliteConnector_.query_with_text_params(
4166  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
4167  "mapd_columns where compression = ? "
4168  "and tableid = ?) and refcount = 0",
4169  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
4170  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
4171  std::to_string(tableId));
4172  if (td->isView) {
4173  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
4174  std::to_string(tableId));
4175  }
4177  sqliteConnector_.query_with_text_param(
4178  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
4179  }
4180 }
4181 
4182 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
4183  cat_write_lock write_lock(this);
4184  cat_sqlite_lock sqlite_lock(getObjForLock());
4185 
4186  sqliteConnector_.query("BEGIN TRANSACTION");
4187  try {
4188  sqliteConnector_.query_with_text_params(
4189  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4190  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
4191  } catch (std::exception& e) {
4192  sqliteConnector_.query("ROLLBACK TRANSACTION");
4193  throw;
4194  }
4195  sqliteConnector_.query("END TRANSACTION");
4196  TableDescriptorMap::iterator tableDescIt =
4197  tableDescriptorMap_.find(to_upper(td->tableName));
4198  CHECK(tableDescIt != tableDescriptorMap_.end());
4199  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4200  // Get table descriptor to change it
4201  TableDescriptor* changeTd = tableDescIt->second;
4202  changeTd->tableName = newTableName;
4203  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4204  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4205  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4206 }
4207 
4208 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
4209  {
4210  cat_write_lock write_lock(this);
4211  cat_sqlite_lock sqlite_lock(getObjForLock());
4212  // rename all corresponding physical tables if this is a logical table
4213  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
4214  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4215  const auto physicalTables = physicalTableIt->second;
4216  CHECK(!physicalTables.empty());
4217  for (size_t i = 0; i < physicalTables.size(); i++) {
4218  int32_t physical_tb_id = physicalTables[i];
4219  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4220  CHECK(phys_td);
4221  std::string newPhysTableName =
4222  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
4223  renamePhysicalTable(phys_td, newPhysTableName);
4224  }
4225  }
4226  renamePhysicalTable(td, newTableName);
4227  }
4228  {
4229  DBObject object(newTableName, TableDBObjectType);
4230  // update table name in direct and effective priv map
4231  DBObjectKey key;
4232  key.dbId = currentDB_.dbId;
4233  key.objectId = td->tableId;
4234  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4235  object.setObjectKey(key);
4236  auto objdescs = SysCatalog::instance().getMetadataForObject(
4237  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
4238  for (auto obj : objdescs) {
4239  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4240  if (grnt) {
4241  grnt->renameDbObject(object);
4242  }
4243  }
4244  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4245  }
4246 }
4247 
4248 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
4249  std::vector<int>& tableIds) {
4250  cat_write_lock write_lock(this);
4251  cat_sqlite_lock sqlite_lock(getObjForLock());
4252 
4253  // execute the SQL query
4254  try {
4255  for (size_t i = 0; i < names.size(); i++) {
4256  int tableId = tableIds[i];
4257  std::string& newTableName = names[i].second;
4258 
4259  sqliteConnector_.query_with_text_params(
4260  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4261  std::vector<std::string>{newTableName, std::to_string(tableId)});
4262  }
4263  } catch (std::exception& e) {
4264  sqliteConnector_.query("ROLLBACK TRANSACTION");
4265  throw;
4266  }
4267 
4268  // reset the table descriptors, give Calcite a kick
4269  for (size_t i = 0; i < names.size(); i++) {
4270  std::string& curTableName = names[i].first;
4271  std::string& newTableName = names[i].second;
4272 
4273  TableDescriptorMap::iterator tableDescIt =
4274  tableDescriptorMap_.find(to_upper(curTableName));
4275  CHECK(tableDescIt != tableDescriptorMap_.end());
4276  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4277 
4278  // Get table descriptor to change it
4279  TableDescriptor* changeTd = tableDescIt->second;
4280  changeTd->tableName = newTableName;
4281  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4282  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4283  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4284  }
4285 }
4286 
4287 // Collect an 'overlay' mapping of the tableNames->tableId
4288 // to account for possible chained renames
4289 // (for swap: a->b, b->c, c->d, d->a)
4290 
// lookupTableDescriptor (name as used by the call in Catalog::renameTable): resolves a
// table name to its descriptor, consulting the rename overlay before the catalog.
// NOTE(review): listing line 4291, which carries the return type, the function name and
// the first parameter (`cat`), is missing from this listing.
//   cachedTableMap - overlay of table name -> table id; -1 marks a name as deleted
//   curTableName   - name to resolve
// Returns NULL when the overlay marks the name as deleted; otherwise the result of
// cat->getMetadataForTable() for the overlay id or, without an overlay entry, the name.
4292  std::map<std::string, int>& cachedTableMap,
4293  std::string& curTableName) {
4294  auto iter = cachedTableMap.find(curTableName);
4295  if ((iter != cachedTableMap.end())) {
4296  // get the cached tableId
4297  // and use that to lookup the TableDescriptor
4298  int tableId = (*iter).second;
// -1 is the "deleted" marker written by replaceTableName()
4299  if (tableId == -1) {
4300  return NULL;
4301  } else {
4302  return cat->getMetadataForTable(tableId);
4303  }
4304  }
4305 
4306  // else ... lookup in standard location
4307  return cat->getMetadataForTable(curTableName);
4308 }
4309 
// Records one rename in the table-name overlay: the current name is marked as deleted
// (id -1) and the new name is mapped to the table id. When both names are equal the
// second write wins, so the name ends up mapped to tableId.
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  constexpr int kDeletedNameMarker = -1;
  cachedTableMap.insert_or_assign(curTableName, kDeletedNameMarker);
  cachedTableMap.insert_or_assign(newTableName, tableId);
}
4320 
// Renames a batch of tables. `names` holds (current name, new name) pairs that are
// applied in order, so a later pair may refer to a name produced by an earlier one
// (chained renames / swaps).
// Phases:
//   1. Setup   - resolve every current name to a table id through a name overlay.
//   2. Locking - take one lock per distinct table id.
//   3. Rename  - inside one SQLite transaction, rename the tables and their physical
//                tables through renamePhysicalTable(allNames, allTableIds).
//   4. SysCatalog - push each new name to grantees and the object descriptor map.
// CHECK-fails when a current name cannot be resolved.
4321  void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
4322  // tableId of all tables being renamed
4323  // ... in matching order to 'names'
4324  std::vector<int> tableIds;
4325 
4326  // (sorted & unique) list of tables ids for locking
4327  // (with names index of src in case of error)
4328  // <tableId, strIndex>
4329  // std::map is by definition/implementation sorted
4330  // std::map current usage below tests to avoid over-write
4331  std::map<int, size_t> uniqueOrderedTableIds;
4332 
4333  // mapping of modified tables names -> tableId
4334  std::map<std::string, int> cachedTableMap;
4335 
4336  // -------- Setup --------
4337 
4338  // gather tableIds pre-execute; build maps
4339  for (size_t i = 0; i < names.size(); i++) {
4340  std::string& curTableName = names[i].first;
4341  std::string& newTableName = names[i].second;
4342 
4343  // make sure the table being renamed exists,
4344  // or will exist when executed in 'name' order
4345  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
4346  CHECK(td);
4347 
4348  tableIds.push_back(td->tableId);
4349  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
4350  // don't overwrite as it should map to the first names index 'i'
4351  uniqueOrderedTableIds[td->tableId] = i;
4352  }
// record the rename in the overlay so later pairs resolve against the new name
4353  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
4354  }
4355 
4356  CHECK_EQ(tableIds.size(), names.size());
4357 
4358  // The outer Stmt created a write lock before calling the catalog rename table
4359  // -> TODO: might want to sort out which really should set the lock :
4360  // the comment in the outer scope indicates it should be in here
4361  // but it's not clear if the access done there *requires* it out there
4362  //
4363  // Lock tables pre-execute (may/will be in different order than rename occurs)
4364  // const auto execute_write_lock = heavyai::unique_lock<heavyai::shared_mutex>(
4365  // *legacylockmgr::LockMgr<heavyai::shared_mutex, bool>::getMutex(
4366  // legacylockmgr::ExecutorOuterLock, true));
4367 
4368  // acquire the locks for all tables being renamed
// NOTE(review): listing lines 4369 and 4373-4374 (the declaration of tableLocks and
// part of the emplace_back argument) are missing from this listing.
// Iterating the std::map visits the table ids in ascending order.
4370  for (auto& idPair : uniqueOrderedTableIds) {
4371  std::string& tableName = names[idPair.second].first;
4372  tableLocks.emplace_back(
4375  *this, tableName, false)));
4376  }
4377 
4378  // -------- Rename --------
4379 
4380  {
4381  cat_write_lock write_lock(this);
4382  cat_sqlite_lock sqlite_lock(getObjForLock());
4383 
4384  sqliteConnector_.query("BEGIN TRANSACTION");
4385 
4386  // collect all (tables + physical tables) into a single list
4387  std::vector<std::pair<std::string, std::string>> allNames;
4388  std::vector<int> allTableIds;
4389 
4390  for (size_t i = 0; i < names.size(); i++) {
4391  int tableId = tableIds[i];
4392  std::string& curTableName = names[i].first;
4393  std::string& newTableName = names[i].second;
4394 
4395  // rename all corresponding physical tables if this is a logical table
4396  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
4397  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4398  const auto physicalTables = physicalTableIt->second;
4399  CHECK(!physicalTables.empty());
4400  for (size_t k = 0; k < physicalTables.size(); k++) {
4401  int32_t physical_tb_id = physicalTables[k];
4402  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4403  CHECK(phys_td);
// physical table names are numbered from 1, hence k + 1
4404  std::string newPhysTableName =
4405  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
4406  allNames.emplace_back(phys_td->tableName, newPhysTableName);
4407  allTableIds.push_back(phys_td->tableId);
4408  }
4409  }
4410  allNames.emplace_back(curTableName, newTableName);
4411  allTableIds.push_back(tableId);
4412  }
4413  // rename all tables in one shot
// renamePhysicalTable() issues ROLLBACK and rethrows if a SQLite update fails
4414  renamePhysicalTable(allNames, allTableIds);
4415 
4416  sqliteConnector_.query("END TRANSACTION");
4417  // cat write/sqlite locks are released when they go out scope
4418  }
4419  {
4420  // now update the SysCatalog
4421  for (size_t i = 0; i < names.size(); i++) {
4422  int tableId = tableIds[i];
4423  std::string& newTableName = names[i].second;
4424  {
4425  // update table name in direct and effective priv map
4426  DBObjectKey key;
4427  key.dbId = currentDB_.dbId;
4428  key.objectId = tableId;
4429  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4430 
4431  DBObject object(newTableName, TableDBObjectType);
4432  object.setObjectKey(key);
4433 
4434  auto objdescs = SysCatalog::instance().getMetadataForObject(
4435  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
4436  for (auto obj : objdescs) {
4437  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4438  if (grnt) {
4439  grnt->renameDbObject(object);
4440  }
4441  }
4442  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4443  }
4444  }
4445  }
4446 
4447  // -------- Cleanup --------
4448 
4449  // table locks are released when 'tableLocks' goes out of scope
4450 }
4451 
4452 void Catalog::renameColumn(const TableDescriptor* td,
4453  const ColumnDescriptor* cd,
4454  const string& newColumnName) {
4455  cat_write_lock write_lock(this);
4456  cat_sqlite_lock sqlite_lock(getObjForLock());
4457  sqliteConnector_.query("BEGIN TRANSACTION");
4458  try {
4459  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4460  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4461  CHECK(cdx);
4462  std::string new_column_name = cdx->columnName;
4463  new_column_name.replace(0, cd->columnName.size(), newColumnName);
4464  sqliteConnector_.query_with_text_params(
4465  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4466  std::vector<std::string>{new_column_name,
4467  std::to_string(td->tableId),
4468  std::to_string(cdx->columnId)});
4469  }
4470  } catch (std::exception& e) {
4471  sqliteConnector_.query("ROLLBACK TRANSACTION");
4472  throw;
4473  }
4474  sqliteConnector_.query("END TRANSACTION");
4475  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4476  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4477  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4478  CHECK(cdx);
4479  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4480  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
4481  CHECK(columnDescIt != columnDescriptorMap_.end());
4482  ColumnDescriptor* changeCd = columnDescIt->second;
4483  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
4484  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
4485  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
4486  changeCd;
4487  }
4488  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4489 }
4490 
// Stores the dashboard described by `vd`: updates the mapd_dashboards row matching
// (name, userid) when one exists, inserts a new row otherwise. Afterwards the row's id
// and formatted update_time are read back into vd.dashboardId / vd.updateTime, the
// dashboard is registered through addFrontendViewToMap(), and the id is returned.
// Exceptions from the write are rethrown after a ROLLBACK.
// NOTE(review): listing lines 4542 and 4550 are missing from this listing, so the
// statement ending on line 4543 and the createOrUpdateDashboardSystemRole() call are
// incomplete here.
4491  int32_t Catalog::createDashboard(DashboardDescriptor& vd) {
4492  cat_write_lock write_lock(this);
4493  cat_sqlite_lock sqlite_lock(getObjForLock());
4494  sqliteConnector_.query("BEGIN TRANSACTION");
4495  try {
4496  // TODO(andrew): this should be an upsert
4497  sqliteConnector_.query_with_text_params(
4498  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4499  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
// a row already exists for this (name, userid): update it in place
4500  if (sqliteConnector_.getNumRows() > 0) {
4501  sqliteConnector_.query_with_text_params(
4502  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4503  "update_time = "
4504  "datetime('now') where name = ? "
4505  "and userid = ?",
4506  std::vector<std::string>{vd.dashboardState,
4507  vd.imageHash,
4508  vd.dashboardMetadata,
4509  vd.dashboardName,
4510  std::to_string(vd.userId)});
4511  } else {
4512  sqliteConnector_.query_with_text_params(
4513  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4514  "update_time, "
4515  "userid) "
4516  "VALUES "
4517  "(?,?,?,?, "
4518  "datetime('now'), ?)",
4519  std::vector<std::string>{vd.dashboardName,
4520  vd.dashboardState,
4521  vd.imageHash,
4522  vd.dashboardMetadata,
4523  std::to_string(vd.userId)});
4524  }
4525  } catch (std::exception& e) {
4526  sqliteConnector_.query("ROLLBACK TRANSACTION");
4527  throw;
4528  }
4529  sqliteConnector_.query("END TRANSACTION");
4530 
4531  // now get the auto generated dashboardId
// this read runs after END TRANSACTION; the catch below only rethrows
4532  try {
4533  sqliteConnector_.query_with_text_params(
4534  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4535  "WHERE name = ? and userid = ?",
4536  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4537  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
4538  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4539  } catch (std::exception& e) {
4540  throw;
4541  }
4543  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4544  addFrontendViewToMap(vd);
// both catalog locks are released explicitly before the system-role update below
4545  sqlite_lock.unlock();
4546  write_lock.unlock();
4547  if (!isInfoSchemaDb()) {
4548  // NOTE(wamsi): Transactionally unsafe
4549  createOrUpdateDashboardSystemRole(
4551  }
4552  return vd.dashboardId;
4553 }
4554 
// Overwrites the stored dashboard whose id is vd.dashboardId with the contents of `vd`,
// then replaces the matching entry of dashboardDescriptorMap_ and refreshes
// vd.updateTime from the database.
// Throws runtime_error when the id is absent from mapd_dashboards or from the
// in-memory map; exceptions raised during the write are rethrown after a ROLLBACK.
// NOTE(review): listing lines 4574, 4620 and 4628 are missing from this listing, so the
// UPDATE parameter list, the statement ending on line 4621 and the
// createOrUpdateDashboardSystemRole() call are incomplete here.
4555  void Catalog::replaceDashboard(DashboardDescriptor& vd) {
4556  cat_write_lock write_lock(this);
4557  cat_sqlite_lock sqlite_lock(getObjForLock());
4558 
4559  CHECK(sqliteConnector_.getSqlitePtr());
4560  sqliteConnector_.query("BEGIN TRANSACTION");
4561  try {
4562  sqliteConnector_.query_with_text_params(
4563  "SELECT id FROM mapd_dashboards WHERE id = ?",
4564  std::vector<std::string>{std::to_string(vd.dashboardId)});
4565  if (sqliteConnector_.getNumRows() > 0) {
4566  sqliteConnector_.query_with_text_params(
4567  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4568  "?, userid = ?, update_time = datetime('now') where id = ? ",
4569  std::vector<std::string>{vd.dashboardName,
4570  vd.dashboardState,
4571  vd.imageHash,
4572  vd.dashboardMetadata,
4573  std::to_string(vd.userId),
4575  } else {
4576  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4577  << " does not exist in db";
4578  throw runtime_error("Error replacing dashboard id " +
4579  std::to_string(vd.dashboardId) + " does not exist in db");
4580  }
4581  } catch (std::exception& e) {
4582  sqliteConnector_.query("ROLLBACK TRANSACTION");
4583  throw;
4584  }
4585  sqliteConnector_.query("END TRANSACTION");
4586 
// locate the in-memory entry by dashboard id; the map itself is keyed "<userId>:<name>"
4587  bool found{false};
4588  for (auto descp : dashboardDescriptorMap_) {
4589  auto dash = descp.second.get();
4590  if (dash->dashboardId == vd.dashboardId) {
4591  found = true;
4592  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
4593  dash->dashboardName);
4594  if (viewDescIt ==
4595  dashboardDescriptorMap_.end()) { // check to make sure view exists
4596  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4597  << " dashboard " << dash->dashboardName << " does not exist in map";
4598  throw runtime_error("No metadata for dashboard for user " +
4599  std::to_string(dash->userId) + " dashboard " +
4600  dash->dashboardName + " does not exist in map");
4601  }
// erase is followed immediately by break, so the loop does not continue over the
// modified map
4602  dashboardDescriptorMap_.erase(viewDescIt);
4603  break;
4604  }
4605  }
4606  if (!found) {
4607  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4608  << " does not exist in map";
4609  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4610  " does not exist in map");
4611  }
4612 
4613  // now reload the object
4614  sqliteConnector_.query_with_text_params(
4615  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4616  "mapd_dashboards "
4617  "WHERE id = ?",
4618  std::vector<std::string>{std::to_string(vd.dashboardId)});
4619  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
4621  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4622  addFrontendViewToMapNoLock(vd);
// both catalog locks are released explicitly before the system-role update below
4623  sqlite_lock.unlock();
4624  write_lock.unlock();
4625  if (!isInfoSchemaDb()) {
4626  // NOTE(wamsi): Transactionally unsafe
4627  createOrUpdateDashboardSystemRole(
4629  }
4630 }
4631 
4632 std::string Catalog::calculateSHA1(const std::string& data) {
4633  boost::uuids::detail::sha1 sha1;
4634  unsigned int digest[5];
4635  sha1.process_bytes(data.c_str(), data.length());
4636  sha1.get_digest(digest);
4637  std::stringstream ss;
4638  for (size_t i = 0; i < 5; i++) {
4639  ss << std::hex << digest[i];
4640  }
4641  return ss.str();
4642 }
4643 
4644 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4645  cat_write_lock write_lock(this);
4646  cat_sqlite_lock sqlite_lock(getObjForLock());
4647  sqliteConnector_.query("BEGIN TRANSACTION");
4648  try {
4649  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4650  .substr(0, 8);
4651  sqliteConnector_.query_with_text_params(
4652  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4653  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4654  if (sqliteConnector_.getNumRows() > 0) {
4655  sqliteConnector_.query_with_text_params(
4656  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4657  "link = ?",
4658  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4659  } else {
4660  sqliteConnector_.query_with_text_params(
4661  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4662  "update_time) VALUES (?,?,?,?, datetime('now'))",
4663  std::vector<std::string>{
4664  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4665  }
4666  // now get the auto generated dashid
4667  sqliteConnector_.query_with_text_param(
4668  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4669  "WHERE link = ?",
4670  ld.link);
4671  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4672  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4673  } catch (std::exception& e) {
4674  sqliteConnector_.query("ROLLBACK TRANSACTION");
4675  throw;
4676  }
4677  sqliteConnector_.query("END TRANSACTION");
4678  addLinkToMap(ld);
4679  return ld.link;
4680 }
4681 
4682 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4683  const TableDescriptor* td) const {
4684  cat_read_lock read_lock(this);
4685 
4686  const auto column_descriptors =
4687  getAllColumnMetadataForTable(td->tableId, false, true, true);
4688 
4689  const ColumnDescriptor* shard_cd{nullptr};
4690  int i = 1;
4691  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4692  ++cd_itr, ++i) {
4693  if (i == td->shardedColumnId) {
4694  shard_cd = *cd_itr;
4695  }
4696  }
4697  return shard_cd;
4698 }
4699 
4700 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4701  const TableDescriptor* logical_table_desc,
4702  bool populate_fragmenter) const {
4703  cat_read_lock read_lock(this);
4704  const auto physicalTableIt =
4705  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4706  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4707  return {logical_table_desc};
4708  }
4709  const auto physicalTablesIds = physicalTableIt->second;
4710  CHECK(!physicalTablesIds.empty());
4711  read_lock.unlock();
4712  std::vector<const TableDescriptor*> physicalTables;
4713  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4714  physicalTables.push_back(
4715  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4716  }
4717  return physicalTables;
4718 }
4719 
4720 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4721  const {
4722  cat_read_lock read_lock(this);
4723  std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
4724  table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4725  for (const auto [table_id, td] : tableDescriptorMapById_) {
4726  // Only include ids for physical persisted tables
4727  if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4728  logicalToPhysicalTableMapById_.find(table_id) ==
4729  logicalToPhysicalTableMapById_.end()) {
4730  table_and_shard_ids.emplace_back(table_id, td->shard);
4731  }
4732  }
4733  return table_and_shard_ids;
4734 }
4735 
4736 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4737  cat_read_lock read_lock(this);
4738 
4739  std::map<int, const ColumnDescriptor*> mapping;
4740 
4741  const auto tables = getAllTableMetadata();
4742  for (const auto td : tables) {
4743  if (td->shard >= 0) {
4744  // skip shards, they're not standalone tables
4745  continue;
4746  }
4747 
4748  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4749  const auto& ti = cd->columnType;
4750  if (ti.is_string()) {
4751  if (ti.get_compression() == kENCODING_DICT) {
4752  // if foreign reference, get referenced tab.col
4753  const auto dict_id = ti.get_comp_param();
4754 
4755  // ignore temp (negative) dictionaries
4756  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4757  mapping[dict_id] = cd;
4758  }
4759  }
4760  }
4761  }
4762  }
4763 
4764  return mapping;
4765 }
4766 
// Decides whether a table should be listed for a user.
// Returns false for shards (td->shard >= 0), for views when only physical tables are
// requested, for non-views when only views are requested, and when
// SysCatalog::hasAnyPrivileges() reports no privileges for the user on the table.
// Returns true otherwise.
// NOTE(review): listing line 4790, which declares `dbObject`, is missing from this
// listing.
4767  bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4768  const UserMetadata& user_metadata,
4769  const GetTablesType get_tables_type) const {
4770  if (td->shard >= 0) {
4771  // skip shards, they're not standalone tables
4772  return false;
4773  }
// any other get_tables_type value applies no view / non-view filtering
4774  switch (get_tables_type) {
4775  case GET_PHYSICAL_TABLES: {
4776  if (td->isView) {
4777  return false;
4778  }
4779  break;
4780  }
4781  case GET_VIEWS: {
4782  if (!td->isView) {
4783  return false;
4784  }
4785  break;
4786  }
4787  default:
4788  break;
4789  }
4791  dbObject.loadKey(*this);
4792  std::vector<DBObject> privObjects = {dbObject};
4793  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4794  // skip table, as there are no privileges to access it
4795  return false;
4796  }
4797  return true;
4798 }
4799 
4800 std::vector<std::string> Catalog::getTableNamesForUser(
4801  const UserMetadata& user_metadata,
4802  const GetTablesType get_tables_type) const {
4803  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4804  cat_read_lock read_lock(this);
4805  std::vector<std::string> table_names;
4806  const auto tables = getAllTableMetadata();
4807  for (const auto td : tables) {
4808  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4809  table_names.push_back(td->tableName);
4810  }
4811  }
4812  return table_names;
4813 }
4814 
4815 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4816  const UserMetadata& user_metadata,
4817  const GetTablesType get_tables_type,
4818  const std::string& filter_table_name) const {
4819  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4820  cat_read_lock read_lock(this);
4821 
4822  std::vector<TableMetadata> tables_metadata;
4823  const auto tables = getAllTableMetadata();
4824  for (const auto td : tables) {
4825  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4826  if (!filter_table_name.empty()) {
4827  if (td->tableName != filter_table_name) {
4828  continue;
4829  }
4830  }
4831  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4832  // descriptor outside catalog lock
4833  tables_metadata.emplace_back(table_metadata);
4834  }
4835  }
4836  return tables_metadata;
4837 }
4838 
4839 int Catalog::getLogicalTableId(const int physicalTableId) const {
4840  cat_read_lock read_lock(this);
4841  for (const auto& l : logicalToPhysicalTableMapById_) {
4842  if (l.second.end() != std::find_if(l.second.begin(),
4843  l.second.end(),
4844  [&](decltype(*l.second.begin()) tid) -> bool {
4845  return physicalTableId == tid;
4846  })) {
4847  return l.first;
4848  }
4849  }
4850  return physicalTableId;
4851 }
4852 
4853 void Catalog::checkpoint(const int logicalTableId) const {
4854  const auto td = getMetadataForTable(logicalTableId);
4855  const auto shards = getPhysicalTablesDescriptors(td);
4856  for (const auto shard : shards) {
4857  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4858  }
4859 }
4860 
4861 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4862  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4863  try {
4864  checkpoint(logical_table_id);
4865  } catch (...) {
4866  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4867  throw;
4868  }
4869 }
4870 
4871 void Catalog::resetTableEpochFloor(const int logicalTableId) const {
4872  cat_read_lock read_lock(this);
4873  const auto td = getMetadataForTable(logicalTableId, false);
4874  const auto shards = getPhysicalTablesDescriptors(td, false);
4875  for (const auto shard : shards) {
4876  getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
4877  }
4878 }
4879 
4880 void Catalog::eraseDbMetadata() {
4881  const auto tables = getAllTableMetadata();
4882  for (const auto table : tables) {
4883  eraseTableMetadata(table);
4884  }
4885  // Physically erase database metadata
4886  boost::filesystem::remove(basePath_ + "/" + shared::kCatalogDirectoryName + "/" +
4887  currentDB_.dbName);
4888  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4889 }
4890 
4891 void Catalog::eraseDbPhysicalData() {
4892  const auto tables = getAllTableMetadata();
4893  for (const auto table : tables) {
4894  eraseTablePhysicalData(table);
4895  }
4896 }
4897 
4898 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
4899  const int tableId = td->tableId;
4900  // must destroy fragmenter before deleteChunks is called.
4901  removeFragmenterForTable(tableId);
4902 
4903  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4904  {
4905  INJECT_TIMER(deleteChunksWithPrefix);
4906  // assuming deleteChunksWithPrefix is atomic
4907  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4908  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4909  }
4910  if (!td->isView) {
4911  INJECT_TIMER(Remove_Table);
4912  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4913  }
4914 }
4915 
4916 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4917  const int32_t& shardNumber) {
4918  std::string physicalTableName =
4919  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4920  return (physicalTableName);
4921 }
4922 
// Loads every row of omnisci_foreign_servers into a shared ForeignServer object and
// registers it in both foreignServerMap_ (by name) and foreignServerMapById_ (by id).
// NOTE(review): listing line 4924 is missing from this listing. The "Unlocked" suffix
// suggests the caller holds the required locks — confirm.
4923  void Catalog::buildForeignServerMapUnlocked() {
4925  sqliteConnector_.query(
4926  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4927  "omnisci_foreign_servers");
4928  auto num_rows = sqliteConnector_.getNumRows();
4929 
4930  for (size_t row = 0; row < num_rows; row++) {
// constructor arguments follow the SELECT column order
4931  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4932  sqliteConnector_.getData<int>(row, 0),
4933  sqliteConnector_.getData<std::string>(row, 1),
4934  sqliteConnector_.getData<std::string>(row, 2),
4935  sqliteConnector_.getData<std::string>(row, 3),
4936  sqliteConnector_.getData<std::int32_t>(row, 4),
4937  sqliteConnector_.getData<std::int32_t>(row, 5));
4938  foreignServerMap_[foreign_server->name] = foreign_server;
4939  foreignServerMapById_[foreign_server->id] = foreign_server;
4940  }
4941 }
4942 
// For every row of omnisci_foreign_tables, completes the matching in-memory
// ForeignTable descriptor: attaches its foreign server, parses its options and copies
// the last/next refresh times. CHECK-fails when the table id is unknown, the descriptor
// is not a ForeignTable, or the server id has no loaded server.
// NOTE(review): listing lines 4944 and 4967 are missing from this listing, so the
// expression assigned to is_in_memory_system_table is incomplete here.
4943  void Catalog::updateForeignTablesInMapUnlocked() {
4945  sqliteConnector_.query(
4946  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4947  "omnisci_foreign_tables");
4948  auto num_rows = sqliteConnector_.getNumRows();
4949  for (size_t r = 0; r < num_rows; r++) {
4950  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4951  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4952  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
4953  const auto last_refresh_time = sqliteConnector_.getData<int64_t>(r, 3);
4954  const auto next_refresh_time = sqliteConnector_.getData<int64_t>(r, 4);
4955 
4956  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4957  auto foreign_table =
4958  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4959  CHECK(foreign_table);
// operator[] yields an empty pointer for an unknown server id; the CHECK below catches it
4960  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4961  CHECK(foreign_table->foreign_server);
4962  foreign_table->populateOptionsMap(options);
4963  foreign_table->last_refresh_time = last_refresh_time;
4964  foreign_table->next_refresh_time = next_refresh_time;
4965  if (foreign_table->is_system_table) {
4966  foreign_table->is_in_memory_system_table =
4968  foreign_table->foreign_server->data_wrapper_type);
4969  }
4970  }
4971 }
4972 
4973 void Catalog::setForeignServerProperty(const std::string& server_name,
4974  const std::string& property,
4975  const std::string& value) {
4976  cat_sqlite_lock sqlite_lock(getObjForLock());
4977  sqliteConnector_.query_with_text_params(
4978  "SELECT id from omnisci_foreign_servers where name = ?",
4979  std::vector<std::string>{server_name});
4980  auto num_rows = sqliteConnector_.getNumRows();
4981  if (num_rows > 0) {
4982  CHECK_EQ(size_t(1), num_rows);
4983  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4984  sqliteConnector_.query_with_text_params(
4985  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4986  std::vector<std::string>{value, std::to_string(server_id)});
4987  } else {
4988  throw std::runtime_error{"Can not change property \"" + property +
4989  "\" for foreign server." + " Foreign server \"" +
4990  server_name + "\" is not found."};
4991  }
4992 }
4993 
// Creates the default local foreign servers "default_local_delimited",
// "default_local_regex_parsed" and, when ENABLE_IMPORT_PARQUET is defined,
// "default_local_parquet". Each server is validated and then passed to
// createForeignServerNoLocks() with `true` as second argument.
// NOTE(review): listing lines 4995-4998, 5002, 5004, 5011, 5013, 5020 and 5022 are
// missing from this listing; they include the definition of `options` and two of the
// constructor arguments of each server. The meaning of the `true` flag (presumably
// "if not exists", going by the function name) should be confirmed.
4994  void Catalog::createDefaultServersIfNotExists() {
4999 
5000  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
5001  "default_local_delimited",
5003  options,
5005  local_csv_server->validate();
5006  createForeignServerNoLocks(std::move(local_csv_server), true);
5007 
5008 #ifdef ENABLE_IMPORT_PARQUET
5009  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
5010  "default_local_parquet",
5012  options,
5014  local_parquet_server->validate();
5015  createForeignServerNoLocks(std::move(local_parquet_server), true);
5016 #endif
5017 
5018  auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
5019  "default_local_regex_parsed",
5021  options,
5023  local_regex_parser_server->validate();
5024  createForeignServerNoLocks(std::move(local_regex_parser_server), true);
5025 }
5026 
5027 // prepare a fresh file reload on next table access
5028 void Catalog::setForReload(const int32_t tableId) {
5029  const auto td = getMetadataForTable(tableId);
5030  for (const auto shard : getPhysicalTablesDescriptors(td)) {
5031  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
5032  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
5033  }
5034 }
5035 
5036 // get a table's data dirs
5037 std::vector<std::string> Catalog::getTableDataDirectories(
5038  const TableDescriptor* td) const {
5039  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
5040  std::vector<std::string> file_paths;
5041  for (auto shard : getPhysicalTablesDescriptors(td)) {
5042  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
5043  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
5044  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
5045  file_paths.push_back(file_path.filename().string());
5046  }
5047  return file_paths;
5048 }
5049 
5050 // get a column's dict dir basename
// Returns the dictionary folder of a string or string-array column whose comp_param
// (used as the dictionary id) is positive: the full dictFolderPath, or only its last
// path component when file_name_only is true. Returns an empty string when the
// condition below does not hold. CHECK-fails when the dictionary id has no descriptor
// in dictDescriptorMapByRef_.
// NOTE(review): listing line 5054, one operand of the condition, is missing from this
// listing.
5051  std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd,
5052  bool file_name_only) const {
5053  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
5055  cd->columnType.get_comp_param() > 0) {
5056  const auto dictId = cd->columnType.get_comp_param();
5057  const DictRef dictRef(currentDB_.dbId, dictId);
5058  const auto dit = dictDescriptorMapByRef_.find(dictRef);
5059  CHECK(dit != dictDescriptorMapByRef_.end());
5060  CHECK(dit->second);
5061  if (file_name_only) {
5062  boost::filesystem::path file_path(dit->second->dictFolderPath);
5063  return file_path.filename().string();
5064  } else {
5065  return dit->second->dictFolderPath;
5066  }
5067  }
5068  return std::string();
5069 }
5070 
5071 // get a table's dict dirs
5072 std::vector<std::string> Catalog::getTableDictDirectories(
5073  const TableDescriptor* td) const {
5074  std::vector<std::string> file_paths;
5075  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
5076  auto file_base = getColumnDictDirectory(cd);
5077  if (!file_base.empty() &&
5078  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
5079  file_paths.push_back(file_base);
5080  }
5081  }
5082  return file_paths;
5083 }
5084 
5085 std::set<std::string> Catalog::getTableDictDirectoryPaths(int32_t table_id) const {
5086  cat_read_lock read_lock(this);
5087  std::set<std::string> directory_paths;
5088  auto it = dict_columns_by_table_id_.find(table_id);
5089  if (it != dict_columns_by_table_id_.end()) {
5090  for (auto cd : it->second) {
5091  auto directory_path = getColumnDictDirectory(cd, false);
5092  if (!directory_path.empty()) {
5093  directory_paths.emplace(directory_path);
5094  }
5095  }
5096  }
5097  return directory_paths;
5098 }
5099 
5100 // returns table schema in a string
5101 // NOTE(sy): Might be able to replace dumpSchema() later with
5102 // dumpCreateTable() after a deeper review of the TableArchiver code.
5103 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
5104  CHECK(!td->is_system_table);
5105  cat_read_lock read_lock(this);
5106 
5107  std::ostringstream os;
5108  os << "CREATE TABLE @T (";
5109  // gather column defines
5110  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
5111  std::string comma;
5112  std::vector<std::string> shared_dicts;
5113  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5114  for (const auto cd : cds) {
5115  if (!(cd->isSystemCol || cd->isVirtualCol)) {
5116  const auto& ti = cd->columnType;
5117  os << comma << cd->columnName;
5118  // CHAR is perculiar... better dump it as TEXT(32) like \d does
5119  if (ti.get_type() == SQLTypes::kCHAR) {
5120  os << " "
5121  << "TEXT";
5122  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
5123  os << " "
5124  << "TEXT[]";
5125  } else {
5126  os << " " << ti.get_type_name();
5127  }
5128  os << (ti.get_notnull() ? " NOT NULL" : "");
5129  if (cd->default_value.has_value()) {
5130  os << " DEFAULT " << cd->getDefaultValueLiteral();
5131  }
5132  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
5133  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5134  if (ti.get_compression() == kENCODING_DICT) {
5135  // if foreign reference, get referenced tab.col
5136  const auto dict_id = ti.get_comp_param();
5137  const DictRef dict_ref(currentDB_.dbId, dict_id);
5138  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
5139  CHECK(dict_it != dictDescriptorMapByRef_.end());
5140  const auto dict_name = dict_it->second->dictName;
5141  // when migrating a table, any foreign dict ref will be dropped
5142  // and the first cd of a dict will become root of the dict
5143  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
5144  dict_root_cds[dict_name] = cd;
5145  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
5146  } else {
5147  const auto dict_root_cd = dict_root_cds[dict_name];
5148  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
5149  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
5150  // "... shouldn't specify an encoding, it borrows from the referenced
5151  // column"
5152  }
5153  } else {
5154  os << " ENCODING NONE";
5155  }
5156  } else if (ti.is_date_in_days() ||
5157  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
5158  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5159  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
5160  } else if (ti.is_geometry()) {
5161  if (ti.get_compression() == kENCODING_GEOINT) {
5162  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
5163  << ")";
5164  } else {
5165  os << " ENCODING NONE";
5166  }
5167  }
5168  comma = ", ";
5169  }
5170  }
5171  // gather SHARED DICTIONARYs
5172  if (shared_dicts.size()) {
5173  os << ", " << boost::algorithm::join(shared_dicts, ", ");
5174  }
5175  // gather WITH options ...
5176  std::vector<std::string> with_options;
5177  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
5178  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
5179  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
5180  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
5181  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
5182  : "VACUUM='IMMEDIATE'");
5183  if (!td->partitions.empty()) {
5184  with_options.push_back("PARTITIONS='" + td->partitions + "'");
5185  }
5186  if (td->nShards > 0) {
5187  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
5188  CHECK(shard_cd);
5189  os << ", SHARD KEY(" << shard_cd->columnName << ")";
5190  with_options.push_back(
5191  "SHARD_COUNT=" +
5192  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
5193  }
5194  if (td->sortedColumnId > 0) {
5195  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
5196  CHECK(sort_cd);
5197  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
5198  }
5200  td->maxRollbackEpochs != -1) {
5201  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
5203  }
5204  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
5205  return os.str();
5206 }
5207 
5208 #include "Parser/ReservedKeywords.h"
5209 
//! returns true if the string contains at least one character for which
//! std::isspace() is true
inline bool contains_spaces(std::string_view str) {
  // each char is converted to unsigned char before the std::isspace() call
  for (const unsigned char ch : str) {
    if (std::isspace(ch)) {
      return true;
    }
  }
  return false;
}
5216 
//! returns true if the string contains at least one character from `chars`;
//! `chars` defaults to a fixed set of punctuation characters
inline bool contains_sql_reserved_chars(
    std::string_view str,
    std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
  return str.find_first_of(chars) != std::string_view::npos;
}
5223 
5225 inline bool is_reserved_sql_keyword(std::string_view str) {
5226  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
5227 }
5228 
5229 // returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE"
5230 std::string Catalog::dumpCreateTable(const TableDescriptor* td,
5231  bool multiline_formatting,
5232  bool dump_defaults) const {
5233  cat_read_lock read_lock(this);
5234  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5235 }
5236 
5237 std::optional<std::string> Catalog::dumpCreateTable(int32_t table_id,
5238  bool multiline_formatting,
5239  bool dump_defaults) const {
5240  cat_read_lock read_lock(this);
5241  const auto td = getMutableMetadataForTableUnlocked(table_id);
5242  if (!td) {
5243  return {};
5244  }
5245  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5246 }
5247 
5248 std::string Catalog::dumpCreateTableUnlocked(const TableDescriptor* td,
5249  bool multiline_formatting,
5250  bool dump_defaults) const {
5251  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
5252  std::ostringstream os;
5253 
5254  if (foreign_table && !td->is_system_table) {
5255  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
5256  } else if (!td->isView) {
5257  os << "CREATE ";
5259  os << "TEMPORARY ";
5260  }
5261  os << "TABLE " + td->tableName + " (";
5262  } else {
5263  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
5264  return os.str();
5265  }
5266  // scan column defines
5267  std::vector<std::string> additional_info;
5268  std::set<std::string> shared_dict_column_names;
5269 
5270  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
5271 
5272  // gather column defines
5273  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
5274  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5275  bool first = true;
5276  for (const auto cd : cds) {
5277  if (!(cd->isSystemCol || cd->isVirtualCol)) {
5278  const auto& ti = cd->columnType;
5279  if (!first) {
5280  os << ",";
5281  if (!multiline_formatting) {
5282  os << " ";
5283  }
5284  } else {
5285  first = false;
5286  }
5287  if (multiline_formatting) {
5288  os << "\n ";
5289  }
5290  // column name
5291  os << quoteIfRequired(cd->columnName);
5292  // CHAR is perculiar... better dump it as TEXT(32) like \d does
5293  if (ti.get_type() == SQLTypes::kCHAR) {
5294  os << " "
5295  << "TEXT";
5296  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
5297  os << " "
5298  << "TEXT[]";
5299  } else {
5300  os << " " << ti.get_type_name();
5301  }
5302  os << (ti.get_notnull() ? " NOT NULL" : "");
5303  if (cd->default_value.has_value()) {
5304  os << " DEFAULT " << cd->getDefaultValueLiteral();
5305  }
5306  if (shared_dict_column_names.find(cd->columnName) ==
5307  shared_dict_column_names.end()) {
5308  // avoids "Column ... shouldn't specify an encoding, it borrows it
5309  // from the referenced column"
5310  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
5311  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5312  if (ti.get_compression() == kENCODING_DICT) {
5313  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
5314  } else {
5315  os << " ENCODING NONE";
5316  }
5317  } else if (ti.is_date_in_days() ||
5318  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
5319  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5320  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
5321  } else if (ti.is_geometry()) {
5322  if (ti.get_compression() == kENCODING_GEOINT) {
5323  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
5324  << ")";
5325  } else {
5326  os << " ENCODING NONE";
5327  }
5328  }
5329  }
5330  }
5331  }
5332  // gather SHARED DICTIONARYs
5333  if (additional_info.size()) {
5334  std::string comma;
5335  if (!multiline_formatting) {
5336  comma = ", ";
5337  } else {
5338  comma = ",\n ";
5339  }
5340  os << comma;
5341  os << boost::algorithm::join(additional_info, comma);
5342  }
5343  os << ")";
5344 
5345  std::vector<std::string> with_options;
5346  if (foreign_table && !td->is_system_table) {
5347  if (multiline_formatting) {
5348  os << "\n";
5349  } else {
5350  os << " ";
5351  }
5352  os << "SERVER " << foreign_table->foreign_server->name;
5353 
5354  // gather WITH options ...
5355  for (const auto& [option, value] : foreign_table->options) {
5356  with_options.emplace_back(option + "='" + value + "'");
5357  }
5358  }
5359 
5360  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
5361  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
5362  }
5363  if (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE) {
5364  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
5365  }
5366  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
5367  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
5368  }
5369  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
5370  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
5371  }
5372  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
5373