OmniSciDB  ca0c39ec8f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "Catalog/Catalog.h"
24 
25 #include <algorithm>
26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/range/adaptor/map.hpp>
29 #include <boost/version.hpp>
30 #include <cassert>
31 #include <cerrno>
32 #include <cstdio>
33 #include <cstring>
34 #include <exception>
35 #include <fstream>
36 #include <list>
37 #include <memory>
38 #include <random>
39 #include <regex>
40 #include <sstream>
41 
42 #if BOOST_VERSION >= 106600
43 #include <boost/uuid/detail/sha1.hpp>
44 #else
45 #include <boost/uuid/sha1.hpp>
46 #endif
47 #include <rapidjson/document.h>
48 #include <rapidjson/istreamwrapper.h>
49 #include <rapidjson/ostreamwrapper.h>
50 #include <rapidjson/writer.h>
51 
52 #include "Catalog/SysCatalog.h"
53 
54 #include "QueryEngine/Execute.h"
56 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
68 #include "Parser/ParserNode.h"
69 #include "QueryEngine/Execute.h"
71 #include "RefreshTimeCalculator.h"
72 #include "Shared/DateTimeParser.h"
73 #include "Shared/File.h"
74 #include "Shared/StringTransform.h"
75 #include "Shared/SysDefinitions.h"
76 #include "Shared/measure.h"
77 #include "Shared/misc.h"
79 
80 #include "MapDRelease.h"
81 #include "RWLocks.h"
83 
84 #include "Shared/distributed.h"
85 
86 using Chunk_NS::Chunk;
89 using std::list;
90 using std::map;
91 using std::pair;
92 using std::runtime_error;
93 using std::string;
94 using std::vector;
95 
// FSI feature flags. When g_enable_fsi is false, the Catalog constructor CHECKs
// that system tables and S3 FSI are disabled too, since both depend on FSI.
bool g_enable_fsi = true;
bool g_enable_s3_fsi = false;
101 extern bool g_cache_string_hash;
102 extern bool g_enable_system_tables;
103 
104 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
105 // under unit testing.
107 
108 namespace Catalog_Namespace {
109 
// Version numbering starts at 1.
const int DEFAULT_INITIAL_VERSION{1};
112  1073741824; // 2^30, give room for over a billion non-temp tables
114  1073741824; // 2^30, give room for over a billion non-temp dictionaries
115 
116 const std::string Catalog::physicalTableNameTag_("_shard_#");
117 
118 thread_local bool Catalog::thread_holds_read_lock = false;
119 
124 
// Migration is done as a two-step process: this release creates and uses the new
// table, and the next release will remove the old table. Keeping the old table
// around provides a fallback path in case the migration fails.
131  sqliteConnector_.query("BEGIN TRANSACTION");
132  try {
134  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
135  if (sqliteConnector_.getNumRows() != 0) {
136  // already done
137  sqliteConnector_.query("END TRANSACTION");
138  return;
139  }
141  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
142  "userid integer references mapd_users, state text, image_hash text, update_time "
143  "timestamp, "
144  "metadata text, UNIQUE(userid, name) )");
145  // now copy content from old table to new table
147  "insert into mapd_dashboards (id, name , "
148  "userid, state, image_hash, update_time , "
149  "metadata) "
150  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
151  "view_metadata "
152  "from mapd_frontend_views");
153  } catch (const std::exception& e) {
154  sqliteConnector_.query("ROLLBACK TRANSACTION");
155  throw;
156  }
157  sqliteConnector_.query("END TRANSACTION");
158 }
159 
160 namespace {
161 
162 inline auto table_json_filepath(const std::string& base_path,
163  const std::string& db_name) {
164  return boost::filesystem::path(base_path + "/" + shared::kCatalogDirectoryName + "/" +
165  db_name + "_temp_tables.json");
166 }
167 
168 std::map<int32_t, std::string> get_user_id_to_user_name_map();
169 } // namespace
170 
171 Catalog::Catalog(const string& basePath,
172  const DBMetadata& curDB,
173  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
174  const std::vector<LeafHostInfo>& string_dict_hosts,
175  std::shared_ptr<Calcite> calcite,
176  bool is_new_db)
177  : basePath_(basePath)
178  , sqliteConnector_(curDB.dbName, basePath + "/" + shared::kCatalogDirectoryName + "/")
179  , currentDB_(curDB)
180  , dataMgr_(dataMgr)
181  , string_dict_hosts_(string_dict_hosts)
182  , calciteMgr_(calcite)
183  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
184  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
185  , dcatalogMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
186  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
187  shared::kCatalogDirectoryName / (currentDB_.dbName + ".lockfile"),
188  [this](size_t) {
189  if (!initialized_) {
190  return;
191  }
192  const auto user_name_by_user_id = get_user_id_to_user_name_map();
194  *dsqliteMutex_);
195  reloadCatalogMetadataUnlocked(user_name_by_user_id);
196  }))
197  , dsqliteMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
198  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
199  shared::kCatalogDirectoryName / (currentDB_.dbName + ".sqlite.lockfile")))
200  , sqliteMutex_()
201  , sharedMutex_()
204  if (!g_enable_fsi) {
205  CHECK(!g_enable_system_tables) << "System tables require FSI to be enabled";
206  CHECK(!g_enable_s3_fsi) << "S3 FSI requires FSI to be enabled";
207  }
208 
209  if (!is_new_db && !g_multi_instance) {
210  CheckAndExecuteMigrations();
211  }
212 
213  buildMaps();
214 
215  if (g_enable_fsi) {
216  createDefaultServersIfNotExists();
217  }
218  if (!is_new_db) {
219  CheckAndExecuteMigrationsPostBuildMaps();
220  }
222  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
223  }
224  conditionallyInitializeSystemObjects();
225  // once all initialized use real object
226  initialized_ = true;
227 }
228 
230  // cat_write_lock write_lock(this);
231 
232  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
233  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
234  tableDescIt != tableDescriptorMap_.end();
235  ++tableDescIt) {
236  tableDescIt->second->fragmenter = nullptr;
237  delete tableDescIt->second;
238  }
239 
240  // TableDescriptorMapById points to the same descriptors. No need to delete
241 
242  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
243  columnDescIt != columnDescriptorMap_.end();
244  ++columnDescIt) {
245  delete columnDescIt->second;
246  }
247 
248  // ColumnDescriptorMapById points to the same descriptors. No need to delete
249 
251  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
252  }
253 }
254 
257  sqliteConnector_.query("BEGIN TRANSACTION");
258  try {
259  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
260  std::vector<std::string> cols;
261  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
262  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
263  }
264  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
265  cols.end()) {
266  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
268  sqliteConnector_.query(queryString);
269  }
270  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
271  cols.end()) {
272  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
273  std::to_string(0));
274  sqliteConnector_.query(queryString);
275  }
276  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
277  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
278  std::to_string(-1));
279  sqliteConnector_.query(queryString);
280  }
281  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
282  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
283  std::to_string(0));
284  sqliteConnector_.query(queryString);
285  }
286  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
287  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
288  sqliteConnector_.query(queryString);
289  }
290  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
291  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
293  sqliteConnector_.query(queryString);
294  }
295  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
296  cols.end()) {
298  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
299  }
300  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
301  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
302  sqliteConnector_.query(queryString);
303  }
304  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
305  cols.end()) {
306  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
307  std::to_string(-1));
308  sqliteConnector_.query(queryString);
309  }
310  if (std::find(cols.begin(), cols.end(), std::string("is_system_table")) ==
311  cols.end()) {
312  string queryString("ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
313  sqliteConnector_.query(queryString);
314  }
315  } catch (std::exception& e) {
316  sqliteConnector_.query("ROLLBACK TRANSACTION");
317  throw;
318  }
319  sqliteConnector_.query("END TRANSACTION");
320 }
321 
324  sqliteConnector_.query("BEGIN TRANSACTION");
325  try {
327  "select name from sqlite_master WHERE type='table' AND "
328  "name='mapd_version_history'");
329  if (sqliteConnector_.getNumRows() == 0) {
331  "CREATE TABLE mapd_version_history(version integer, migration_history text "
332  "unique)");
333  } else {
335  "select * from mapd_version_history where migration_history = "
336  "'notnull_fixlen_arrays'");
337  if (sqliteConnector_.getNumRows() != 0) {
338  // legacy fixlen arrays had migrated
339  // no need for further execution
340  sqliteConnector_.query("END TRANSACTION");
341  return;
342  }
343  }
344  // Insert check for migration
346  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
347  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
348  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
349  // Upating all fixlen array columns
350  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
351  std::to_string(kARRAY) + " AND size>0;");
352  sqliteConnector_.query(queryString);
353  } catch (std::exception& e) {
354  sqliteConnector_.query("ROLLBACK TRANSACTION");
355  throw;
356  }
357  sqliteConnector_.query("END TRANSACTION");
358 }
359 
362  sqliteConnector_.query("BEGIN TRANSACTION");
363  try {
365  "select name from sqlite_master WHERE type='table' AND "
366  "name='mapd_version_history'");
367  if (sqliteConnector_.getNumRows() == 0) {
369  "CREATE TABLE mapd_version_history(version integer, migration_history text "
370  "unique)");
371  } else {
373  "select * from mapd_version_history where migration_history = "
374  "'notnull_geo_columns'");
375  if (sqliteConnector_.getNumRows() != 0) {
376  // legacy geo columns had migrated
377  // no need for further execution
378  sqliteConnector_.query("END TRANSACTION");
379  return;
380  }
381  }
382  // Insert check for migration
384  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
385  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
386  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
387  // Upating all geo columns
388  string queryString(
389  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
390  " OR coltype=" + std::to_string(kMULTIPOINT) + " OR coltype=" +
392  " OR coltype=" + std::to_string(kPOLYGON) +
393  " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
394  sqliteConnector_.query(queryString);
395  } catch (std::exception& e) {
396  sqliteConnector_.query("ROLLBACK TRANSACTION");
397  throw;
398  }
399  sqliteConnector_.query("END TRANSACTION");
400 }
401 
404  sqliteConnector_.query("BEGIN TRANSACTION");
405  try {
406  // check table still exists
408  "SELECT name FROM sqlite_master WHERE type='table' AND "
409  "name='mapd_frontend_views'");
410  if (sqliteConnector_.getNumRows() == 0) {
411  // table does not exists
412  // no need to migrate
413  sqliteConnector_.query("END TRANSACTION");
414  return;
415  }
416  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
417  std::vector<std::string> cols;
418  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
419  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
420  }
421  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
422  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
423  }
424  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
425  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
426  }
427  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
428  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
429  }
430  } catch (std::exception& e) {
431  sqliteConnector_.query("ROLLBACK TRANSACTION");
432  throw;
433  }
434  sqliteConnector_.query("END TRANSACTION");
435 }
436 
439  sqliteConnector_.query("BEGIN TRANSACTION");
440  try {
442  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
443  "integer references mapd_users, "
444  "link text unique, view_state text, update_time timestamp, view_metadata text)");
445  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
446  std::vector<std::string> cols;
447  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
448  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
449  }
450  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
451  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
452  }
453  } catch (const std::exception& e) {
454  sqliteConnector_.query("ROLLBACK TRANSACTION");
455  throw;
456  }
457  sqliteConnector_.query("END TRANSACTION");
458 }
459 
462  sqliteConnector_.query("BEGIN TRANSACTION");
463  try {
464  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
465  // check table still exists
467  "SELECT name FROM sqlite_master WHERE type='table' AND "
468  "name='mapd_frontend_views'");
469  if (sqliteConnector_.getNumRows() == 0) {
470  // table does not exists
471  // no need to migrate
472  sqliteConnector_.query("END TRANSACTION");
473  return;
474  }
476  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
477  } catch (const std::exception& e) {
478  sqliteConnector_.query("ROLLBACK TRANSACTION");
479  throw;
480  }
481  sqliteConnector_.query("END TRANSACTION");
482 }
483 
// Introduce a DB version into the tables table. If the DB does not have a version,
// reset all page sizes to 2097152 to stay compatible with the old value.
487 
490  if (currentDB_.dbName.length() == 0) {
491  // updateDictionaryNames dbName length is zero nothing to do here
492  return;
493  }
494  sqliteConnector_.query("BEGIN TRANSACTION");
495  try {
496  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
497  std::vector<std::string> cols;
498  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
499  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
500  }
501  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
502  LOG(INFO) << "Updating mapd_tables updatePageSize";
503  // No version number
504  // need to update the defaul tpagesize to old correct value
505  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
506  // need to add new version info
507  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
509  sqliteConnector_.query(queryString);
510  }
511  } catch (std::exception& e) {
512  sqliteConnector_.query("ROLLBACK TRANSACTION");
513  throw;
514  }
515  sqliteConnector_.query("END TRANSACTION");
516 }
517 
520  sqliteConnector_.query("BEGIN TRANSACTION");
521  try {
522  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
523  std::vector<std::string> cols;
524  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
525  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
526  }
527  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
528  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
529  // need to add new version info
530  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
532  sqliteConnector_.query(queryString);
533  // need to add new column to table defintion to indicate deleted column, column used
534  // as bitmap for deleted rows.
536  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
537  }
538  } catch (std::exception& e) {
539  sqliteConnector_.query("ROLLBACK TRANSACTION");
540  throw;
541  }
542  sqliteConnector_.query("END TRANSACTION");
543 }
544 
547  sqliteConnector_.query("BEGIN TRANSACTION");
548  try {
549  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
550  std::vector<std::string> cols;
551  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
552  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
553  }
554  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
555  LOG(INFO) << "Adding support for default values to mapd_columns";
556  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
557  }
558  } catch (std::exception& e) {
559  LOG(ERROR) << "Failed to make metadata update for default values` support";
560  sqliteConnector_.query("ROLLBACK TRANSACTION");
561  throw;
562  }
563  sqliteConnector_.query("END TRANSACTION");
564 }
565 
// Introduce a DB version into the dictionary tables. If the DB does not have a
// version, rename each dictionary directory from "<db name>_<dict name>" to
// "DB_<db id>_DICT_<dict id>".
568 
571  if (currentDB_.dbName.length() == 0) {
572  // updateDictionaryNames dbName length is zero nothing to do here
573  return;
574  }
575  sqliteConnector_.query("BEGIN TRANSACTION");
576  try {
577  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
578  std::vector<std::string> cols;
579  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
580  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
581  }
582  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
583  // No version number
584  // need to rename dictionaries
585  string dictQuery("SELECT dictid, name from mapd_dictionaries");
586  sqliteConnector_.query(dictQuery);
587  size_t numRows = sqliteConnector_.getNumRows();
588  for (size_t r = 0; r < numRows; ++r) {
589  int dictId = sqliteConnector_.getData<int>(r, 0);
590  std::string dictName = sqliteConnector_.getData<string>(r, 1);
591 
592  std::string oldName = g_base_path + "/" + shared::kDataDirectoryName + "/" +
593  currentDB_.dbName + "_" + dictName;
594  std::string newName = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
595  std::to_string(currentDB_.dbId) + "_DICT_" +
596  std::to_string(dictId);
597 
598  int result = rename(oldName.c_str(), newName.c_str());
599 
600  if (result == 0) {
601  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
602  << newName;
603  } else {
604  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
605  << newName + " dbname '" << currentDB_.dbName << "' error code "
606  << std::to_string(result);
607  }
608  }
609  // need to add new version info
610  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
612  sqliteConnector_.query(queryString);
613  }
614  } catch (std::exception& e) {
615  sqliteConnector_.query("ROLLBACK TRANSACTION");
616  throw;
617  }
618  sqliteConnector_.query("END TRANSACTION");
619 }
620 
623  sqliteConnector_.query("BEGIN TRANSACTION");
624  try {
626  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
627  "logical_table_id integer, physical_table_id integer)");
628  } catch (const std::exception& e) {
629  sqliteConnector_.query("ROLLBACK TRANSACTION");
630  throw;
631  }
632  sqliteConnector_.query("END TRANSACTION");
633 }
634 
635 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
636  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
637  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
638  */
639 
641  sqliteConnector_.query("BEGIN TRANSACTION");
642  try {
643  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
644  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
645  const auto physicalTables = physicalTableIt->second;
646  CHECK(!physicalTables.empty());
647  for (size_t i = 0; i < physicalTables.size(); i++) {
648  int32_t physical_tb_id = physicalTables[i];
650  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
651  "physical_table_id) VALUES (?1, ?2)",
652  std::vector<std::string>{std::to_string(logical_tb_id),
653  std::to_string(physical_tb_id)});
654  }
655  }
656  } catch (std::exception& e) {
657  sqliteConnector_.query("ROLLBACK TRANSACTION");
658  throw;
659  }
660  sqliteConnector_.query("END TRANSACTION");
661 }
662 
665  sqliteConnector_.query("BEGIN TRANSACTION");
666  try {
667  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
668  std::vector<std::string> cols;
669  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
670  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
671  }
672  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
673  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
674  }
675  } catch (std::exception& e) {
676  sqliteConnector_.query("ROLLBACK TRANSACTION");
677  throw;
678  }
679  sqliteConnector_.query("END TRANSACTION");
680 }
681 
684  sqliteConnector_.query("BEGIN TRANSACTION");
685  try {
688  } catch (std::exception& e) {
689  sqliteConnector_.query("ROLLBACK TRANSACTION");
690  throw;
691  }
692  sqliteConnector_.query("END TRANSACTION");
693 }
694 
696  // TODO: Move common migration logic to a shared function.
698  sqliteConnector_.query("BEGIN TRANSACTION");
699  try {
701  "select name from sqlite_master WHERE type='table' AND "
702  "name='mapd_version_history'");
703  static const std::string migration_name{"rename_legacy_data_wrappers"};
704  if (sqliteConnector_.getNumRows() == 0) {
706  "CREATE TABLE mapd_version_history(version integer, migration_history text "
707  "unique)");
708  } else {
710  "select * from mapd_version_history where migration_history = "
711  "'" +
712  migration_name + "'");
713  if (sqliteConnector_.getNumRows() != 0) {
714  // Migration already done.
715  sqliteConnector_.query("END TRANSACTION");
716  return;
717  }
718  }
719  LOG(INFO) << "Executing " << migration_name << " migration.";
720 
721  // Update legacy data wrapper names
723  // clang-format off
724  std::map<std::string, std::string> old_to_new_wrapper_names{
725  {"OMNISCI_CSV", DataWrapperType::CSV},
726  {"OMNISCI_PARQUET", DataWrapperType::PARQUET},
727  {"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
728  {"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
729  {"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
730  {"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
731  };
732  // clang-format on
733 
734  for (const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
736  "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
737  "data_wrapper_type = ?",
738  std::vector<std::string>{new_wrapper_name, old_wrapper_name});
739  }
740 
741  // Record migration.
743  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
744  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
745  LOG(INFO) << migration_name << " migration completed.";
746  } catch (std::exception& e) {
747  sqliteConnector_.query("ROLLBACK TRANSACTION");
748  throw;
749  }
750  sqliteConnector_.query("END TRANSACTION");
751 }
752 
755  sqliteConnector_.query("BEGIN TRANSACTION");
756  try {
758  } catch (const std::exception& e) {
759  sqliteConnector_.query("ROLLBACK TRANSACTION");
760  throw;
761  }
762  sqliteConnector_.query("END TRANSACTION");
763 }
764 
765 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
766  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
767  "omnisci_foreign_servers(id integer primary key, name text unique, " +
768  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
769  "options text)";
770 }
771 
772 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
773  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
774  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
775  "options text, last_refresh_time integer, next_refresh_time integer, " +
776  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
777  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
778 }
779 
780 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
781  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
782  "omnisci_custom_expressions(id integer primary key, name text, " +
783  "expression_json text, data_source_type text, " +
784  "data_source_id integer, is_deleted boolean)";
785 }
786 
789  sqliteConnector_.query("BEGIN TRANSACTION");
790  std::vector<DBObject> objects;
791  try {
793  "SELECT name FROM sqlite_master WHERE type='table' AND "
794  "name='mapd_record_ownership_marker'");
795  // check if mapd catalog - marker exists
796  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
797  // already done
798  sqliteConnector_.query("END TRANSACTION");
799  return;
800  }
801  // check if different catalog - marker exists
802  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
803  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
804  // Check if migration is being performed on existing non mapd catalogs
805  // Older non mapd dbs will have table but no record in them
806  if (sqliteConnector_.getNumRows() != 0) {
807  // already done
808  sqliteConnector_.query("END TRANSACTION");
809  return;
810  }
811  }
812  // marker not exists - create one
813  else {
814  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
815  }
816 
817  DBMetadata db;
818  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
819  // place dbId as a refernce for migration being performed
821  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
822  std::vector<std::string>{std::to_string(db.dbOwner)});
823 
824  static const std::map<const DBObjectType, const AccessPrivileges>
825  object_level_all_privs_lookup{
831 
832  // grant owner all permissions on DB
833  DBObjectKey key;
834  key.dbId = currentDB_.dbId;
835  auto _key_place = [&key](auto type) {
836  key.permissionType = type;
837  return key;
838  };
839  for (auto& it : object_level_all_privs_lookup) {
840  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
841  objects.back().setName(currentDB_.dbName);
842  }
843 
844  {
845  // other users tables and views
846  string tableQuery(
847  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
848  sqliteConnector_.query(tableQuery);
849  size_t numRows = sqliteConnector_.getNumRows();
850  for (size_t r = 0; r < numRows; ++r) {
851  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
852  std::string tableName = sqliteConnector_.getData<string>(r, 1);
853  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
854  bool isview = sqliteConnector_.getData<bool>(r, 3);
855 
858  DBObjectKey key;
859  key.dbId = currentDB_.dbId;
860  key.objectId = tableid;
861  key.permissionType = type;
862 
863  DBObject obj(tableName, type);
864  obj.setObjectKey(key);
865  obj.setOwner(ownerid);
868 
869  objects.push_back(obj);
870  }
871  }
872 
873  {
874  // other users dashboards
875  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
876  sqliteConnector_.query(tableQuery);
877  size_t numRows = sqliteConnector_.getNumRows();
878  for (size_t r = 0; r < numRows; ++r) {
879  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
880  std::string dashName = sqliteConnector_.getData<string>(r, 1);
881  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
882 
884  DBObjectKey key;
885  key.dbId = currentDB_.dbId;
886  key.objectId = dashId;
887  key.permissionType = type;
888 
889  DBObject obj(dashName, type);
890  obj.setObjectKey(key);
891  obj.setOwner(ownerid);
893 
894  objects.push_back(obj);
895  }
896  }
897  } catch (const std::exception& e) {
898  sqliteConnector_.query("ROLLBACK TRANSACTION");
899  throw;
900  }
901  sqliteConnector_.query("END TRANSACTION");
902 
903  // now apply the objects to the syscat to track the permisisons
904  // moved outside transaction to avoid lock in sqlite
905  try {
907  } catch (const std::exception& e) {
908  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
909  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
910  e.what());
911  // will need to remove the mapd_record_ownership_marker table and retry
912  }
913 }
914 
919 }
920 
922  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
923  std::vector<std::string> dashboard_ids;
924  static const std::string migration_name{"dashboard_roles_migration"};
925  {
927  sqliteConnector_.query("BEGIN TRANSACTION");
928  try {
929  // migration_history should be present in all catalogs by now
930  // if not then would be created before this migration
932  "select * from mapd_version_history where migration_history = '" +
933  migration_name + "'");
934  if (sqliteConnector_.getNumRows() != 0) {
935  // no need for further execution
936  sqliteConnector_.query("END TRANSACTION");
937  return;
938  }
939  LOG(INFO) << "Performing dashboard internal roles Migration.";
940  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
941  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
944  sqliteConnector_.getData<string>(i, 0)))) {
945  // Successfully created roles during previous migration/crash
946  // No need to include them
947  continue;
948  }
949  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
950  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
951  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
952  }
953  } catch (const std::exception& e) {
954  sqliteConnector_.query("ROLLBACK TRANSACTION");
955  throw;
956  }
957  sqliteConnector_.query("END TRANSACTION");
958  }
959  // All current grantees with shared dashboards.
960  const auto active_grantees =
962 
963  try {
964  // NOTE(wamsi): Transactionally unsafe
965  for (auto dash : dashboards) {
966  createOrUpdateDashboardSystemRole(dash.second.second,
967  dash.second.first,
969  std::to_string(currentDB_.dbId), dash.first));
970  auto result = active_grantees.find(dash.first);
971  if (result != active_grantees.end()) {
974  dash.first)},
975  result->second);
976  }
977  }
979  // check if this has already been completed
981  "select * from mapd_version_history where migration_history = '" +
982  migration_name + "'");
983  if (sqliteConnector_.getNumRows() != 0) {
984  return;
985  }
987  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
988  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
989  } catch (const std::exception& e) {
990  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
991  << e.what();
992  throw;
993  }
994  LOG(INFO) << "Successfully created dashboard system roles during migration.";
995 }
996 
1002  updateGeoColumns();
1005  updateLinkSchema();
1009  updatePageSize();
1013  if (g_enable_fsi) {
1014  updateFsiSchemas();
1016  }
1019 }
1020 
1024 }
1025 
1026 namespace {
1027 std::map<int32_t, std::string> get_user_id_to_user_name_map() {
1028  auto users = SysCatalog::instance().getAllUserMetadata();
1029  std::map<int32_t, std::string> user_name_by_user_id;
1030  for (const auto& user : users) {
1031  user_name_by_user_id[user.userId] = user.userName;
1032  }
1033  return user_name_by_user_id;
1034 }
1035 
1037  int32_t id,
1038  const std::map<int32_t, std::string>& user_name_by_user_id) {
1039  auto entry = user_name_by_user_id.find(id);
1040  if (entry != user_name_by_user_id.end()) {
1041  return entry->second;
1042  }
1043  // a user could be deleted and a dashboard still exist?
1044  return "Unknown";
1045 }
1046 } // namespace
1047 
1049  std::string dictQuery(
1050  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1051  sqliteConnector_.query(dictQuery);
1052  auto numRows = sqliteConnector_.getNumRows();
1053  for (size_t r = 0; r < numRows; ++r) {
1054  auto dictId = sqliteConnector_.getData<int>(r, 0);
1055  auto dictName = sqliteConnector_.getData<string>(r, 1);
1056  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1057  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1058  auto refcount = sqliteConnector_.getData<int>(r, 4);
1059  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1060  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1061  DictRef dict_ref(currentDB_.dbId, dictId);
1062  auto dd = new DictDescriptor(
1063  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
1064  dictDescriptorMapByRef_[dict_ref].reset(dd);
1065  }
1066 }
1067 
1068 // NOTE(sy): Only used by --multi-instance clusters.
1069 void Catalog::reloadTableMetadata(int table_id) {
1070  cat_write_lock write_lock(this);
1072  reloadTableMetadataUnlocked(table_id);
1073 }
1074 
1075 // NOTE(sy): Only used by --multi-instance clusters.
1077  // Reload dictionaries first.
1078  // TODO(sy): Does dictionary reloading really belong here?
1079  // We don't have dictionary locks in the system but maybe we need them.
1080  list<DictDescriptor> dicts;
1081  std::string dictQuery(
1082  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1083  sqliteConnector_.query(dictQuery);
1084  auto numRows = sqliteConnector_.getNumRows();
1085  for (size_t r = 0; r < numRows; ++r) {
1086  auto dictId = sqliteConnector_.getData<int>(r, 0);
1087  auto dictName = sqliteConnector_.getData<string>(r, 1);
1088  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1089  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1090  auto refcount = sqliteConnector_.getData<int>(r, 4);
1091  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1092  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1093  DictRef dict_ref(currentDB_.dbId, dictId);
1094  DictDescriptor dd(dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
1095  if (auto it = dictDescriptorMapByRef_.find(dict_ref);
1096  it == dictDescriptorMapByRef_.end()) {
1097  dictDescriptorMapByRef_[dict_ref] = std::make_unique<DictDescriptor>(dd);
1098  } else {
1099  *it->second = dd;
1100  }
1101  }
1102 
1103  // Delete this table's metadata from the in-memory cache before reloading.
1104  TableDescriptor* original_td;
1105  std::list<ColumnDescriptor*> original_cds;
1106  if (auto it1 = tableDescriptorMapById_.find(table_id);
1107  it1 != tableDescriptorMapById_.end()) {
1108  original_td = it1->second;
1109  tableDescriptorMapById_.erase(it1);
1110  if (auto it2 = tableDescriptorMap_.find(original_td->tableName);
1111  it2 != tableDescriptorMap_.end()) {
1112  CHECK_EQ(original_td, it2->second);
1113  tableDescriptorMap_.erase(it2);
1114  }
1115  if (original_td->hasDeletedCol) {
1116  const auto ret = deletedColumnPerTable_.erase(original_td);
1117  CHECK_EQ(ret, size_t(1));
1118  }
1119  for (int column_id = 0; column_id < original_td->nColumns; ++column_id) {
1120  if (auto it3 = columnDescriptorMapById_.find({table_id, column_id});
1121  it3 != columnDescriptorMapById_.end()) {
1122  ColumnDescriptor* original_cd = it3->second;
1123  original_cds.push_back(original_cd);
1124  removeFromColumnMap(original_cd);
1125  }
1126  }
1127  }
1128 
1129  // Reload the table descriptor.
1130  std::string tableQuery(
1131  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1132  "max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, "
1133  "num_shards, key_metainfo, userid, sort_column_id, storage_type, "
1134  "max_rollback_epochs, is_system_table from mapd_tables WHERE tableid = " +
1135  std::to_string(table_id));
1136  sqliteConnector_.query(tableQuery);
1137  numRows = sqliteConnector_.getNumRows();
1138  if (!numRows) {
1139  return; // Table was deleted by another node.
1140  }
1141 
1142  TableDescriptor* td;
1143  const auto& storage_type = sqliteConnector_.getData<string>(0, 17);
1144  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1145  const auto& table_name = sqliteConnector_.getData<string>(0, 1);
1146  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1147  "supported table option (table "
1148  << table_name << " [" << table_id << "] in database " << currentDB_.dbName
1149  << ").";
1150  }
1151 
1152  if (storage_type == StorageType::FOREIGN_TABLE) {
1153  td = new foreign_storage::ForeignTable();
1154  } else {
1155  td = new TableDescriptor();
1156  }
1157 
1158  td->storageType = storage_type;
1159  td->tableId = sqliteConnector_.getData<int>(0, 0);
1160  td->tableName = sqliteConnector_.getData<string>(0, 1);
1161  td->nColumns = sqliteConnector_.getData<int>(0, 2);
1162  td->isView = sqliteConnector_.getData<bool>(0, 3);
1163  td->fragments = sqliteConnector_.getData<string>(0, 4);
1164  td->fragType =
1165  (Fragmenter_Namespace::FragmenterType)sqliteConnector_.getData<int>(0, 5);
1166  td->maxFragRows = sqliteConnector_.getData<int>(0, 6);
1167  td->maxChunkSize = sqliteConnector_.getData<int64_t>(0, 7);
1168  td->fragPageSize = sqliteConnector_.getData<int>(0, 8);
1169  td->maxRows = sqliteConnector_.getData<int64_t>(0, 9);
1170  td->partitions = sqliteConnector_.getData<string>(0, 10);
1171  td->shardedColumnId = sqliteConnector_.getData<int>(0, 11);
1172  td->shard = sqliteConnector_.getData<int>(0, 12);
1173  td->nShards = sqliteConnector_.getData<int>(0, 13);
1174  td->keyMetainfo = sqliteConnector_.getData<string>(0, 14);
1175  td->userId = sqliteConnector_.getData<int>(0, 15);
1176  td->sortedColumnId =
1177  sqliteConnector_.isNull(0, 16) ? 0 : sqliteConnector_.getData<int>(0, 16);
1178  if (!td->isView) {
1179  td->fragmenter = nullptr;
1180  }
1181  td->maxRollbackEpochs = sqliteConnector_.getData<int>(0, 18);
1182  td->is_system_table = sqliteConnector_.getData<bool>(0, 19);
1183  td->hasDeletedCol = false;
1184 
1185  if (auto tableDescIt = tableDescriptorMapById_.find(table_id);
1186  tableDescIt != tableDescriptorMapById_.end()) {
1187  tableDescIt->second->fragmenter = nullptr;
1188  delete tableDescIt->second;
1189  }
1190 
1191  // Reload the column descriptors.
1192  std::list<ColumnDescriptor*> cds;
1193  std::string columnQuery(
1194  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1195  "is_notnull, compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
1196  "virtual_expr, is_deletedcol, default_value from mapd_columns WHERE tableid = " +
1197  std::to_string(table_id) + " ORDER BY tableid, columnid");
1198  sqliteConnector_.query(columnQuery);
1199  numRows = sqliteConnector_.getNumRows();
1200  int32_t skip_physical_cols = 0;
1201  for (size_t r = 0; r < numRows; ++r) {
1202  ColumnDescriptor* cd = new ColumnDescriptor();
1203  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1204  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1205  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1206  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
1207  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
1208  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
1209  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1210  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
1211  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
1212  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
1213  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1214  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1215  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1216  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1217  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1218  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1219  if (sqliteConnector_.isNull(r, 16)) {
1220  cd->default_value = std::nullopt;
1221  } else {
1222  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1223  }
1224  cd->isGeoPhyCol = skip_physical_cols-- > 0;
1225  cds.push_back(cd);
1226  }
1227 
1228  // Store the descriptors into the cache.
1229  if (original_td) {
1230  td->mutex_ = original_td->mutex_; // TODO(sy): Unnecessary?
1231  delete original_td;
1232  original_td = nullptr;
1233  }
1234  for (ColumnDescriptor* original_cd : original_cds) {
1235  delete original_cd;
1236  }
1237  original_cds.clear();
1238  tableDescriptorMap_[to_upper(td->tableName)] = td;
1239  tableDescriptorMapById_[td->tableId] = td;
1240  skip_physical_cols = 0;
1241  for (ColumnDescriptor* cd : cds) {
1242  addToColumnMap(cd);
1243 
1244  if (skip_physical_cols <= 0) {
1245  skip_physical_cols = cd->columnType.get_physical_cols();
1246  }
1247 
1248  if (cd->isDeletedCol) {
1249  td->hasDeletedCol = true;
1250  setDeletedColumnUnlocked(td, cd);
1251  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1252  td->columnIdBySpi_.push_back(cd->columnId);
1253  }
1254  }
1255 
1256  // Notify Calcite about the reloaded table.
1257  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
1258 }
1259 
1260 // NOTE(sy): Only used by --multi-instance clusters.
1261 void Catalog::reloadCatalogMetadata(
1262  const std::map<int32_t, std::string>& user_name_by_user_id) {
// Takes the catalog write lock and rebuilds the cached catalog metadata via
// reloadCatalogMetadataUnlocked().
// NOTE(review): the user_name_by_user_id argument is never used. The map is instead
// rebuilt below with get_user_id_to_user_name_map(), which queries SysCatalog while
// this catalog's write lock is already held. buildMaps() deliberately fetches that
// map *before* locking to avoid exactly this. Forwarding user_name_by_user_id looks
// like the intent; confirm with callers before changing.
1263  cat_write_lock write_lock(this);
1265  reloadCatalogMetadataUnlocked(get_user_id_to_user_name_map());
1266 }
1267 
1268 // NOTE(sy): Only used by --multi-instance clusters.
1269 void Catalog::reloadCatalogMetadataUnlocked(
1270  const std::map<int32_t, std::string>& user_name_by_user_id) {
1272 
1273  // Notice when tables or columns have been created, dropped, or changed by other nodes.
1274  // Needed so that users will see reasonably-correct lists of what objects exist.
1275 
1276  // Load the list of table ID's that exist on disk storage.
1277  std::set<int> cluster_table_ids;
1278  std::string tableQuery("SELECT tableid from mapd_tables");
1279  sqliteConnector_.query(tableQuery);
1280  auto numRows = sqliteConnector_.getNumRows();
1281  for (size_t r = 0; r < numRows; ++r) {
1282  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1283  cluster_table_ids.insert(table_id);
1284  }
1285 
1286  // Ignore any table ID's locked by other threads on this node.
1287  // Those other threads are already handling any necessary reloading for those tables.
1288  std::set<int> ignored_table_ids;
1289  for (ChunkKey const& k : lockmgr::TableSchemaLockMgr::instance().getLockedTables()) {
1290  CHECK_EQ(k.size(), 2U);
1291  if (k[CHUNK_KEY_DB_IDX] != currentDB_.dbId) {
1292  continue;
1293  }
1294  ignored_table_ids.insert(k[CHUNK_KEY_TABLE_IDX]);
1295  }
1296 
1297  // For this node's Catalog cache:
1298  // Newly created table schemas created by other nodes need to be loaded.
1299  // Unlocked table schemas might have been renamed by other nodes; just reload them all.
1300  // Deleted table schemas still in this node's cache need to be flushed.
1301  std::set<int> reload_table_ids;
1302  for (auto const& cluster_table_id : cluster_table_ids) {
1303  if (ignored_table_ids.find(cluster_table_id) == ignored_table_ids.end()) {
1304  reload_table_ids.insert(cluster_table_id);
1305  }
1306  }
1307  for (auto const& [cached_table_id, td] : tableDescriptorMapById_) {
1308  if (cluster_table_ids.find(cached_table_id) == cluster_table_ids.end()) {
1309  reload_table_ids.insert(cached_table_id);
1310  }
1311  }
1312 
1313  // Reload tables.
1314  for (auto const& reload_table_id : reload_table_ids) {
1315  reloadTableMetadataUnlocked(reload_table_id);
1316  }
1317 
1319 
1320  dashboardDescriptorMap_.clear();
1321  linkDescriptorMap_.clear();
1322  linkDescriptorMapById_.clear();
1323  foreignServerMap_.clear();
1324  foreignServerMapById_.clear();
1325  custom_expr_map_by_id_.clear();
1326 
1327  if (g_enable_fsi) {
1328  buildForeignServerMapUnlocked();
1329  }
1330 
1331  updateViewsInMapUnlocked();
1332  buildDashboardsMapUnlocked(user_name_by_user_id);
1333  buildLinksMapUnlocked();
1334  buildCustomExpressionsMapUnlocked();
1335 
1336  // Notify Calcite about the reloaded database.
1337  if (calciteMgr_) {
1338  calciteMgr_->updateMetadata(currentDB_.dbName, {});
1339  }
1340 }
1341 
1342 void Catalog::buildTablesMapUnlocked() {
1343  std::string tableQuery(
1344  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1345  "max_chunk_size, frag_page_size, "
1346  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1347  "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1348  "from mapd_tables");
1349  sqliteConnector_.query(tableQuery);
1350  auto numRows = sqliteConnector_.getNumRows();
1351  for (size_t r = 0; r < numRows; ++r) {
1352  TableDescriptor* td;
1353  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
1354  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1355  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1356  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
1357  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1358  "supported table option (table "
1359  << table_name << " [" << table_id << "] in database "
1360  << currentDB_.dbName << ").";
1361  }
1362 
1363  if (storage_type == StorageType::FOREIGN_TABLE) {
1364  td = new foreign_storage::ForeignTable();
1365  } else {
1366  td = new TableDescriptor();
1367  }
1368 
1369  td->storageType = storage_type;
1370  td->tableId = sqliteConnector_.getData<int>(r, 0);
1371  td->tableName = sqliteConnector_.getData<string>(r, 1);
1372  td->nColumns = sqliteConnector_.getData<int>(r, 2);
1373  td->isView = sqliteConnector_.getData<bool>(r, 3);
1374  td->fragments = sqliteConnector_.getData<string>(r, 4);
1375  td->fragType =
1376  (Fragmenter_Namespace::FragmenterType)sqliteConnector_.getData<int>(r, 5);
1377  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1378  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1379  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1380  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1381  td->partitions = sqliteConnector_.getData<string>(r, 10);
1382  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1383  td->shard = sqliteConnector_.getData<int>(r, 12);
1384  td->nShards = sqliteConnector_.getData<int>(r, 13);
1385  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1386  td->userId = sqliteConnector_.getData<int>(r, 15);
1387  td->sortedColumnId =
1388  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1389  if (!td->isView) {
1390  td->fragmenter = nullptr;
1391  }
1392  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1393  td->is_system_table = sqliteConnector_.getData<bool>(r, 19);
1394  td->hasDeletedCol = false;
1395 
1396  tableDescriptorMap_[to_upper(td->tableName)] = td;
1397  tableDescriptorMapById_[td->tableId] = td;
1398  }
1399 }
1400 
1401 void Catalog::buildColumnsMapUnlocked() {
1402  std::string columnQuery(
1403  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1404  "is_notnull, compression, comp_param, "
1405  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1406  "default_value from "
1407  "mapd_columns ORDER BY tableid, "
1408  "columnid");
1409  sqliteConnector_.query(columnQuery);
1410  auto numRows = sqliteConnector_.getNumRows();
1411  int32_t skip_physical_cols = 0;
1412  for (size_t r = 0; r < numRows; ++r) {
1413  ColumnDescriptor* cd = new ColumnDescriptor();
1414  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1415  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1416  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1417  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
1418  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
1419  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
1420  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1421  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
1422  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
1423  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
1424  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1425  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1426  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1427  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1428  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1429  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1430  if (sqliteConnector_.isNull(r, 16)) {
1431  cd->default_value = std::nullopt;
1432  } else {
1433  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1434  }
1435  cd->isGeoPhyCol = skip_physical_cols > 0;
1436  addToColumnMap(cd);
1437 
1438  if (skip_physical_cols <= 0) {
1439  skip_physical_cols = cd->columnType.get_physical_cols();
1440  }
1441 
1442  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1443  CHECK(td_itr != tableDescriptorMapById_.end());
1444 
1445  if (cd->isDeletedCol) {
1446  td_itr->second->hasDeletedCol = true;
1447  setDeletedColumnUnlocked(td_itr->second, cd);
1448  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1449  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1450  }
1451  }
1452 
1453  // sort columnIdBySpi_ based on columnId
1454  for (auto& tit : tableDescriptorMapById_) {
1455  std::sort(tit.second->columnIdBySpi_.begin(),
1456  tit.second->columnIdBySpi_.end(),
1457  [](const size_t a, const size_t b) -> bool { return a < b; });
1458  }
1459 } // namespace Catalog_Namespace
1460 
1461 void Catalog::updateViewsInMapUnlocked() {
1462  std::string viewQuery("SELECT tableid, sql FROM mapd_views");
1463  sqliteConnector_.query(viewQuery);
1464  auto numRows = sqliteConnector_.getNumRows();
1465  for (size_t r = 0; r < numRows; ++r) {
1466  auto tableId = sqliteConnector_.getData<int>(r, 0);
1467  auto td = tableDescriptorMapById_[tableId];
1468  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1469  td->fragmenter = nullptr;
1470  }
1471 }
1472 
1473 void Catalog::buildDashboardsMapUnlocked(
1474  const std::map<int32_t, std::string>& user_name_by_user_id) {
1475  std::string frontendViewQuery(
1476  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1477  "userid, "
1478  "metadata "
1479  "FROM mapd_dashboards");
1480  sqliteConnector_.query(frontendViewQuery);
1481  auto numRows = sqliteConnector_.getNumRows();
1482  for (size_t r = 0; r < numRows; ++r) {
1483  auto vd = std::make_shared<DashboardDescriptor>();
1484  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1485  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1486  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1487  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1488  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1489  vd->userId = sqliteConnector_.getData<int>(r, 5);
1490  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
1491  vd->user = get_user_name_from_id(vd->userId, user_name_by_user_id);
1492  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1493  std::to_string(currentDB_.dbId), sqliteConnector_.getData<string>(r, 0));
1494  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1495  }
1496 }
1497 
1498 void Catalog::buildLinksMapUnlocked() {
1499  std::string linkQuery(
1500  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1501  "update_time), view_metadata "
1502  "FROM mapd_links");
1503  sqliteConnector_.query(linkQuery);
1504  auto numRows = sqliteConnector_.getNumRows();
1505  for (size_t r = 0; r < numRows; ++r) {
1506  auto ld = new LinkDescriptor();
1507  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1508  ld->userId = sqliteConnector_.getData<int>(r, 1);
1509  ld->link = sqliteConnector_.getData<string>(r, 2);
1510  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1511  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1512  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1513  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld->link] = ld;
1514  linkDescriptorMapById_[ld->linkId] = ld;
1515  }
1516 }
1517 
1518 void Catalog::buildLogicalToPhysicalMapUnlocked() {
1519  /* rebuild map linking logical tables to corresponding physical ones */
1520  std::string logicalToPhysicalTableMapQuery(
1521  "SELECT logical_table_id, physical_table_id "
1522  "FROM mapd_logical_to_physical");
1523  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1524  auto numRows = sqliteConnector_.getNumRows();
1525  for (size_t r = 0; r < numRows; ++r) {
1526  auto logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1527  auto physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1528  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1529  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1530  /* add new entity to the map logicalToPhysicalTableMapById_ */
1531  std::vector<int32_t> physicalTables{physical_tb_id};
1532  const auto it_ok =
1533  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1534  CHECK(it_ok.second);
1535  } else {
1536  /* update map logicalToPhysicalTableMapById_ */
1537  physicalTableIt->second.push_back(physical_tb_id);
1538  }
1539  }
1540 }
1541 
1542 // The catalog uses a series of maps to cache data that have been read from the sqlite
1543 // tables. Usually we update these maps whenever we write using sqlite, so this function
1544 // is responsible for initializing all of them based on the sqlite db state.
1545 void Catalog::buildMaps() {
1546  // Get all user id to username mapping here in order to avoid making a call to
1547  // SysCatalog (and attempting to acquire SysCatalog locks) while holding locks for this
1548  // catalog.
1549  const auto user_name_by_user_id = get_user_id_to_user_name_map();
1550 
1551  cat_write_lock write_lock(this);
1553 
1554  buildDictionaryMapUnlocked();
1555  buildTablesMapUnlocked();
1556 
1557  if (g_enable_fsi) {
1558  buildForeignServerMapUnlocked();
1559  updateForeignTablesInMapUnlocked();
1560  }
1561 
1562  buildColumnsMapUnlocked();
1563  updateViewsInMapUnlocked();
1564  buildDashboardsMapUnlocked(user_name_by_user_id);
1565  buildLinksMapUnlocked();
1566  buildLogicalToPhysicalMapUnlocked();
1567  buildCustomExpressionsMapUnlocked();
1568 }
1569 
1570 void Catalog::buildCustomExpressionsMapUnlocked() {
1571  sqliteConnector_.query(
1572  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1573  "is_deleted "
1574  "FROM omnisci_custom_expressions");
1575  auto num_rows = sqliteConnector_.getNumRows();
1576  for (size_t row = 0; row < num_rows; row++) {
1577  auto custom_expr = getCustomExpressionFromConnector(row);
1578  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1579  }
1580 }
1581 
1582 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1583  auto id = sqliteConnector_.getData<int>(row, 0);
1584  auto name = sqliteConnector_.getData<string>(row, 1);
1585  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1586  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1587  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1588  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1589  return std::make_unique<CustomExpression>(
1590  id,
1591  name,
1592  expression_json,
1593  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1594  data_source_id,
1595  is_deleted);
1596 }
1597 
1598 void Catalog::addTableToMap(const TableDescriptor* td,
1599  const list<ColumnDescriptor>& columns,
1600  const list<DictDescriptor>& dicts) {
1601  cat_write_lock write_lock(this);
1602  TableDescriptor* new_td;
1603 
1604  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1605  if (foreign_table) {
1606  auto new_foreign_table = new foreign_storage::ForeignTable();
1607  *new_foreign_table = *foreign_table;
1608  new_td = new_foreign_table;
1609  } else {
1610  new_td = new TableDescriptor();
1611  *new_td = *td;
1612  }
1613 
1614  new_td->mutex_ = std::make_shared<std::mutex>();
1615  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1616  tableDescriptorMapById_[td->tableId] = new_td;
1617  for (auto cd : columns) {
1618  ColumnDescriptor* new_cd = new ColumnDescriptor();
1619  *new_cd = cd;
1620  addToColumnMap(new_cd);
1621 
1622  // Add deleted column to the map
1623  if (cd.isDeletedCol) {
1624  CHECK(new_td->hasDeletedCol);
1625  setDeletedColumnUnlocked(new_td, new_cd);
1626  }
1627  }
1628 
1629  std::sort(new_td->columnIdBySpi_.begin(),
1630  new_td->columnIdBySpi_.end(),
1631  [](const size_t a, const size_t b) -> bool { return a < b; });
1632  // TODO(sy): Why does addTableToMap() sort columnIdBySpi_ but not insert into it while
1633  // buildColumnsMapUnlocked() does both?
1634 
1635  std::unique_ptr<StringDictionaryClient> client;
1636  DictRef dict_ref(currentDB_.dbId, -1);
1637  if (!string_dict_hosts_.empty()) {
1638  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1639  }
1640  for (auto dd : dicts) {
1641  if (!dd.dictRef.dictId) {
1642  // Dummy entry created for a shard of a logical table, nothing to do.
1643  continue;
1644  }
1645  dict_ref.dictId = dd.dictRef.dictId;
1646  if (client) {
1647  client->create(dict_ref, dd.dictIsTemp);
1648  }
1649  DictDescriptor* new_dd = new DictDescriptor(dd);
1650  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1651  if (!dd.dictIsTemp) {
1652  boost::filesystem::create_directory(new_dd->dictFolderPath);
1653  }
1654  }
1655 }
1656 
1657 void Catalog::removeTableFromMap(const string& tableName,
1658  const int tableId,
1659  const bool is_on_error) {
1660  cat_write_lock write_lock(this);
1661  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1662  if (tableDescIt == tableDescriptorMapById_.end()) {
1663  throw runtime_error("Table " + tableName + " does not exist.");
1664  }
1665 
1666  TableDescriptor* td = tableDescIt->second;
1667 
1668  if (td->hasDeletedCol) {
1669  const auto ret = deletedColumnPerTable_.erase(td);
1670  CHECK_EQ(ret, size_t(1));
1671  }
1672 
1673  tableDescriptorMapById_.erase(tableDescIt);
1674  tableDescriptorMap_.erase(to_upper(tableName));
1675  td->fragmenter = nullptr;
1676  dict_columns_by_table_id_.erase(tableId);
1677 
1679  delete td;
1680 
1681  std::unique_ptr<StringDictionaryClient> client;
1682  if (SysCatalog::instance().isAggregator()) {
1683  CHECK(!string_dict_hosts_.empty());
1684  DictRef dict_ref(currentDB_.dbId, -1);
1685  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1686  }
1687 
1688  // delete all column descriptors for the table
1689  // no more link columnIds to sequential indexes!
1690  for (auto cit = columnDescriptorMapById_.begin();
1691  cit != columnDescriptorMapById_.end();) {
1692  if (tableId != std::get<0>(cit->first)) {
1693  ++cit;
1694  } else {
1695  int i = std::get<1>(cit++->first);
1696  ColumnIdKey cidKey(tableId, i);
1697  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1698  ColumnDescriptor* cd = colDescIt->second;
1699  columnDescriptorMapById_.erase(colDescIt);
1700  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1701  columnDescriptorMap_.erase(cnameKey);
1702  const int dictId = cd->columnType.get_comp_param();
1703  // Dummy dictionaries created for a shard of a logical table have the id set to
1704  // zero.
1705  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1706  INJECT_TIMER(removingDicts);
1707  DictRef dict_ref(currentDB_.dbId, dictId);
1708  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1709  // If we're removing this table due to an error, it is possible that the string
1710  // dictionary reference was never populated. Don't crash, just continue cleaning
1711  // up the TableDescriptor and ColumnDescriptors
1712  if (!is_on_error) {
1713  CHECK(dictIt != dictDescriptorMapByRef_.end());
1714  } else {
1715  if (dictIt == dictDescriptorMapByRef_.end()) {
1716  continue;
1717  }
1718  }
1719  const auto& dd = dictIt->second;
1720  CHECK_GE(dd->refcount, 1);
1721  --dd->refcount;
1722  if (!dd->refcount) {
1723  dd->stringDict.reset();
1724  if (!isTemp) {
1725  File_Namespace::renameForDelete(dd->dictFolderPath);
1726  }
1727  if (client) {
1728  client->drop(dict_ref);
1729  }
1730  dictDescriptorMapByRef_.erase(dictIt);
1731  }
1732  }
1733 
1734  delete cd;
1735  }
1736  }
1737 }
1738 
1739 void Catalog::addFrontendViewToMap(DashboardDescriptor& vd) {
1740  cat_write_lock write_lock(this);
1741  addFrontendViewToMapNoLock(vd);
1742 }
1743 
1744 void Catalog::addFrontendViewToMapNoLock(DashboardDescriptor& vd) {
1745  cat_write_lock write_lock(this);
1746  dashboardDescriptorMap_[std::to_string(vd.userId) + ":" + vd.dashboardName] =
1747  std::make_shared<DashboardDescriptor>(vd);
1748 }
1749 
1750 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1751  const int& user_id) {
1752  std::vector<DBObject> objects;
1753  DBObjectKey key;
1754  key.dbId = currentDB_.dbId;
1755  auto _key_place = [&key](auto type, auto id) {
1756  key.permissionType = type;
1757  key.objectId = id;
1758  return key;
1759  };
1760  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
1761  auto td = getMetadataForTable(object_name, false);
1762  if (!td) {
1763  // Parsed object source is not present in current database
1764  // LOG the info and ignore
1765  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1766  << " no longer exists in current DB.";
1767  continue;
1768  }
1769  // Dashboard source can be Table or View
1770  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
1771  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1773  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1774  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1775  objects.back().setName(td->tableName);
1776  }
1777  return objects;
1778 }
1779 
1780 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1781  const int32_t& user_id,
1782  const std::string& dash_role_name) {
1783  auto objects = parseDashboardObjects(view_meta, user_id);
1784  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1785  if (!rl) {
1786  // Dashboard role does not exist
1787  // create role and grant privileges
1788  // NOTE(wamsi): Transactionally unsafe
1789  SysCatalog::instance().createRole(
1790  dash_role_name, /*user_private_role=*/false, /*is_temporary=*/false);
1791  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1792  } else {
1793  // Dashboard system role already exists
1794  // Add/remove privileges on objects
1795  std::set<DBObjectKey> revoke_keys;
1796  auto ex_objects = rl->getDbObjects(true);
1797  for (auto key : *ex_objects | boost::adaptors::map_keys) {
1798  if (key.permissionType != TableDBObjectType &&
1799  key.permissionType != ViewDBObjectType) {
1800  continue;
1801  }
1802  bool found = false;
1803  for (auto obj : objects) {
1804  found = key == obj.getObjectKey() ? true : false;
1805  if (found) {
1806  break;
1807  }
1808  }
1809  if (!found) {
1810  revoke_keys.insert(key);
1811  }
1812  }
1813  for (auto& key : revoke_keys) {
1814  // revoke privs on object since the object is no
1815  // longer used by the dashboard as source
1816  // NOTE(wamsi): Transactionally unsafe
1817  SysCatalog::instance().revokeDBObjectPrivileges(
1818  dash_role_name, *rl->findDbObject(key, true), *this);
1819  }
1820  // Update privileges on remaining objects
1821  // NOTE(wamsi): Transactionally unsafe
1822  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1823  }
1824 }
1825 
1826 void Catalog::addLinkToMap(LinkDescriptor& ld) {
1827  cat_write_lock write_lock(this);
1828  LinkDescriptor* new_ld = new LinkDescriptor();
1829  *new_ld = ld;
1830  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld.link] = new_ld;
1831  linkDescriptorMapById_[ld.linkId] = new_ld;
1832 }
1833 
// Builds the fragmenter for table `td` and stores it in td->fragmenter.
//
// Although this member function is const, it mutates *td. The getMetadataForTable
// overloads in this file call it while holding td->mutex_, and only when
// td->fragmenter is null and the table is not a view.
//
// The fragmenter is constructed over chunks for the columns returned by
// getAllColumnMetadataForTable(tableId, /*fetchSystemColumns=*/true,
// /*fetchVirtualColumns=*/false, /*fetchPhysicalColumns=*/true).
// A SortedOrderFragmenter is used when td->sortedColumnId > 0; otherwise an
// InsertOrderFragmenter is used. The elapsed time is logged at INFO level.
1834 void Catalog::instantiateFragmenter(TableDescriptor* td) const {
1835  auto time_ms = measure<>::execution([&]() {
1836  // instantiate table fragmenter upon first use
1837  // assume only insert order fragmenter is supported
// NOTE(review): this listing appears to omit one statement here (between the comments
// above and the chunk vector declaration) — confirm against the upstream source.
1839  vector<Chunk> chunkVec;
1840  auto columnDescs = getAllColumnMetadataForTable(td->tableId, true, false, true);
1841  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1842  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1843  if (td->sortedColumnId > 0) {
1844  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1845  chunkVec,
1846  dataMgr_.get(),
1847  const_cast<Catalog*>(this),
1848  td->tableId,
1849  td->shard,
1850  td->maxFragRows,
1851  td->maxChunkSize,
1852  td->fragPageSize,
1853  td->maxRows,
1854  td->persistenceLevel);
1855  } else {
// The trailing flag passed below is true exactly when td->storageType is non-empty;
// presumably it marks tables with non-default storage — confirm in InsertOrderFragmenter.
1856  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1857  chunkVec,
1858  dataMgr_.get(),
1859  const_cast<Catalog*>(this),
1860  td->tableId,
1861  td->shard,
1862  td->maxFragRows,
1863  td->maxChunkSize,
1864  td->fragPageSize,
1865  td->maxRows,
1866  td->persistenceLevel,
1867  !td->storageType.empty());
1868  }
1869  });
1870  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1871  << time_ms << "ms";
1872 }
1873 
1874 foreign_storage::ForeignTable* Catalog::getForeignTableUnlocked(
1875  const std::string& tableName) const {
1876  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1877  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1878  return nullptr;
1879  }
1880  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1881 }
1882 
1883 const foreign_storage::ForeignTable* Catalog::getForeignTable(
1884  const std::string& tableName) const {
1885  cat_read_lock read_lock(this);
1886  return getForeignTableUnlocked(tableName);
1887 }
1888 
1889 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1890  const bool populateFragmenter) const {
1891  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1892  // pure metadata calls
1893  cat_read_lock read_lock(this);
1894  auto td = getMutableMetadataForTableUnlocked(tableName);
1895  if (!td) {
1896  return nullptr;
1897  }
1898  read_lock.unlock();
1899  if (populateFragmenter) {
1900  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1901  if (td->fragmenter == nullptr && !td->isView) {
1902  instantiateFragmenter(td);
1903  }
1904  }
1905  return td; // returns pointer to table descriptor
1906 }
1907 
1908 const TableDescriptor* Catalog::getMetadataForTable(int table_id,
1909  bool populateFragmenter) const {
1910  cat_read_lock read_lock(this);
1911  auto td = getMutableMetadataForTableUnlocked(table_id);
1912  if (!td) {
1913  return nullptr;
1914  }
1915  read_lock.unlock();
1916  if (populateFragmenter) {
1917  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1918  if (td->fragmenter == nullptr && !td->isView) {
1919  instantiateFragmenter(td);
1920  }
1921  }
1922  return td;
1923 }
1924 
1925 std::optional<std::string> Catalog::getTableName(int32_t table_id) const {
1926  cat_read_lock read_lock(this);
1927  auto td = getMutableMetadataForTableUnlocked(table_id);
1928  if (!td) {
1929  return {};
1930  }
1931  return td->tableName;
1932 }
1933 
1934 std::optional<int32_t> Catalog::getTableId(const std::string& table_name) const {
1935  cat_read_lock read_lock(this);
1936  auto td = getMutableMetadataForTableUnlocked(table_name);
1937  if (!td) {
1938  return {};
1939  }
1940  return td->tableId;
1941 }
1942 
1943 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(
1944  const std::string& table_name) const {
1945  auto it = tableDescriptorMap_.find(to_upper(table_name));
1946  if (it == tableDescriptorMap_.end()) {
1947  return nullptr;
1948  }
1949  return it->second;
1950 }
1951 
1952 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(int table_id) const {
1953  auto tableDescIt = tableDescriptorMapById_.find(table_id);
1954  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1955  return nullptr;
1956  }
1957  return tableDescIt->second;
1958 }
1959 
1960 const DictDescriptor* Catalog::getMetadataForDict(const int dict_id,
1961  const bool load_dict) const {
1962  cat_read_lock read_lock(this);
1963  const DictRef dictRef(currentDB_.dbId, dict_id);
1964  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1965  if (dictDescIt ==
1966  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1967  return nullptr;
1968  }
1969  auto& dd = dictDescIt->second;
1970 
1971  if (load_dict) {
1972  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1973  if (!dd->stringDict) {
1974  auto time_ms = measure<>::execution([&]() {
1975  if (string_dict_hosts_.empty()) {
1976  if (dd->dictIsTemp) {
1977  dd->stringDict = std::make_shared<StringDictionary>(
1978  dd->dictRef, dd->dictFolderPath, true, true, g_cache_string_hash);
1979  } else {
1980  dd->stringDict = std::make_shared<StringDictionary>(
1981  dd->dictRef, dd->dictFolderPath, false, true, g_cache_string_hash);
1982  }
1983  } else {
1984  dd->stringDict =
1985  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1986  }
1987  });
1988  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1989  << dd->dictRef.dictId << " was " << time_ms << "ms";
1990  }
1991  }
1992 
1993  return dd.get();
1994 }
1995 
1996 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1997  return string_dict_hosts_;
1998 }
1999 
2000 const ColumnDescriptor* Catalog::getMetadataForColumn(int tableId,
2001  const string& columnName) const {
2002  cat_read_lock read_lock(this);
2003 
2004  ColumnKey columnKey(tableId, to_upper(columnName));
2005  auto colDescIt = columnDescriptorMap_.find(columnKey);
2006  if (colDescIt ==
2007  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
2008  return nullptr;
2009  }
2010  return colDescIt->second;
2011 }
2012 
2013 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
2014  cat_read_lock read_lock(this);
2015  ColumnIdKey columnIdKey(table_id, column_id);
2016  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2017  if (colDescIt == columnDescriptorMapById_
2018  .end()) { // need to check to make sure column exists for table
2019  return nullptr;
2020  }
2021  return colDescIt->second;
2022 }
2023 
2024 const std::optional<std::string> Catalog::getColumnName(int table_id,
2025  int column_id) const {
2026  cat_read_lock read_lock(this);
2027  auto it = columnDescriptorMapById_.find(ColumnIdKey{table_id, column_id});
2028  if (it == columnDescriptorMapById_.end()) {
2029  return {};
2030  }
2031  return it->second->columnName;
2032 }
2033 
2034 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
2035  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
2036  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2037  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
2038 
2039  auto spx = spi;
2040  int phi = 0;
2041  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
2042  {
2043  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
2044  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
2045  }
2046 
2047  CHECK(0 < spx && spx <= columnIdBySpi.size())
2048  << "spx = " << spx << ", size = " << columnIdBySpi.size();
2049  return columnIdBySpi[spx - 1] + phi;
2050 }
2051 
2052 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
2053  cat_read_lock read_lock(this);
2054  return getColumnIdBySpiUnlocked(table_id, spi);
2055 }
2056 
2057 const ColumnDescriptor* Catalog::getMetadataForColumnBySpi(const int tableId,
2058  const size_t spi) const {
2059  cat_read_lock read_lock(this);
2060 
2061  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
2062  ColumnIdKey columnIdKey(tableId, columnId);
2063  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2064  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
2065 }
2066 
// Deletes the dashboards with the given ids on behalf of `user`.
//
// 1. Validation: every id must resolve via getMetadataForDashboard(), and `user` must
//    pass SysCatalog::checkPrivileges() for DELETE_DASHBOARD on it. All failures are
//    gathered into comma-separated id lists and reported in a single
//    std::runtime_error before anything is modified.
// 2. Privileges on all the dashboards are revoked from every grantee via SysCatalog.
//    This happens outside the sqlite transaction (see the BE-5245 comment in the body).
// 3. Under the catalog write lock, each dashboard is removed from
//    dashboardDescriptorMap_ and its mapd_dashboards row is deleted, all inside one
//    sqlite transaction. The transaction is rolled back if a std::exception escapes.
2067 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
2068  const UserMetadata& user) {
2069  std::stringstream invalid_ids, restricted_ids;
2070 
2071  for (int32_t dashboard_id : dashboard_ids) {
2072  if (!getMetadataForDashboard(dashboard_id)) {
// Prefix ", " for every id after the first one.
2073  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
2074  continue;
2075  }
2076  DBObject object(dashboard_id, DashboardDBObjectType);
2077  object.loadKey(*this);
2078  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
2079  std::vector<DBObject> privs = {object};
2080  if (!SysCatalog::instance().checkPrivileges(user, privs)) {
2081  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
2082  }
2083  }
2084 
2085  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
2086  std::stringstream error_message;
2087  error_message << "Delete dashboard(s) failed with error(s):";
2088  if (invalid_ids.str().size() > 0) {
2089  error_message << "\nDashboard id: " << invalid_ids.str()
2090  << " - Dashboard id does not exist";
2091  }
2092  if (restricted_ids.str().size() > 0) {
2093  error_message
2094  << "\nDashboard id: " << restricted_ids.str()
2095  << " - User should be either owner of dashboard or super user to delete it";
2096  }
2097  throw std::runtime_error(error_message.str());
2098  }
2099  std::vector<DBObject> dash_objs;
2100 
2101  for (int32_t dashboard_id : dashboard_ids) {
2102  dash_objs.emplace_back(dashboard_id, DashboardDBObjectType);
2103  }
2104  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
2105  SysCatalog::instance().revokeDBObjectPrivilegesFromAllBatch(dash_objs, this);
2106  {
2107  cat_write_lock write_lock(this);
// NOTE(review): one statement after the write lock appears to be missing from this
// listing (presumably additional locking) — confirm against the upstream source.
2109 
2110  sqliteConnector_.query("BEGIN TRANSACTION");
2111  try {
2112  for (int32_t dashboard_id : dashboard_ids) {
2113  auto dash = getMetadataForDashboard(dashboard_id);
2114  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
2115  // rollback if already deleted
2116  if (!dash) {
2117  throw std::runtime_error(
2118  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
2119  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
2120  }
// Copy the key parts first: erasing the map entry may destroy *dash.
2121  std::string user_id = std::to_string(dash->userId);
2122  std::string dash_name = dash->dashboardName;
2123  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
// NOTE(review): viewDescIt is not compared with end() before erase(). This relies on
// the entry found by id always being stored under "<userId>:<dashboardName>".
2124  dashboardDescriptorMap_.erase(viewDescIt);
2125  sqliteConnector_.query_with_text_params(
2126  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
2127  std::vector<std::string>{dash_name, user_id});
2128  }
// NOTE(review): only std::exception-derived errors trigger the ROLLBACK below; any other
// exception type would propagate with the transaction still open. `e` is unused.
2129  } catch (std::exception& e) {
2130  sqliteConnector_.query("ROLLBACK TRANSACTION");
2131  throw;
2132  }
2133  sqliteConnector_.query("END TRANSACTION");
2134  }
2135 }
2136 
2137 const DashboardDescriptor* Catalog::getMetadataForDashboard(
2138  const string& userId,
2139  const string& dashName) const {
2140  cat_read_lock read_lock(this);
2141 
2142  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
2143  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
2144  return nullptr;
2145  }
2146  return viewDescIt->second.get(); // returns pointer to view descriptor
2147 }
2148 
2149 const DashboardDescriptor* Catalog::getMetadataForDashboard(const int32_t id) const {
2150  cat_read_lock read_lock(this);
2151  std::string userId;
2152  std::string name;
2153  bool found{false};
2154  {
2155  for (auto descp : dashboardDescriptorMap_) {
2156  auto dash = descp.second.get();
2157  if (dash->dashboardId == id) {
2158  userId = std::to_string(dash->userId);
2159  name = dash->dashboardName;
2160  found = true;
2161  break;
2162  }
2163  }
2164  }
2165  if (found) {
2166  return getMetadataForDashboard(userId, name);
2167  }
2168  return nullptr;
2169 }
2170 
2171 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
2172  cat_read_lock read_lock(this);
2173  auto linkDescIt = linkDescriptorMap_.find(link);
2174  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
2175  return nullptr;
2176  }
2177  return linkDescIt->second; // returns pointer to view descriptor
2178 }
2179 
2180 const LinkDescriptor* Catalog::getMetadataForLink(int linkId) const {
2181  cat_read_lock read_lock(this);
2182  auto linkDescIt = linkDescriptorMapById_.find(linkId);
2183  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
2184  return nullptr;
2185  }
2186  return linkDescIt->second;
2187 }
2188 
2189 const foreign_storage::ForeignTable* Catalog::getForeignTable(int table_id) const {
2190  cat_read_lock read_lock(this);
2191  const auto table = getMutableMetadataForTableUnlocked(table_id);
2192  CHECK(table);
2193  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
2194  CHECK(foreign_table);
2195  return foreign_table;
2196 }
2197 
2198 void Catalog::getAllColumnMetadataForTableImpl(
2199  const TableDescriptor* td,
2200  list<const ColumnDescriptor*>& columnDescriptors,
2201  const bool fetchSystemColumns,
2202  const bool fetchVirtualColumns,
2203  const bool fetchPhysicalColumns) const {
2204  int32_t skip_physical_cols = 0;
2205  for (const auto& columnDescriptor : columnDescriptorMapById_) {
2206  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
2207  --skip_physical_cols;
2208  continue;
2209  }
2210  auto cd = columnDescriptor.second;
2211  if (cd->tableId != td->tableId) {
2212  continue;
2213  }
2214  if (!fetchSystemColumns && cd->isSystemCol) {
2215  continue;
2216  }
2217  if (!fetchVirtualColumns && cd->isVirtualCol) {
2218  continue;
2219  }
2220  if (!fetchPhysicalColumns) {
2221  const auto& col_ti = cd->columnType;
2222  skip_physical_cols = col_ti.get_physical_cols();
2223  }
2224  columnDescriptors.push_back(cd);
2225  }
2226 }
2227 
2228 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
2229  const int tableId,
2230  const bool fetchSystemColumns,
2231  const bool fetchVirtualColumns,
2232  const bool fetchPhysicalColumns) const {
2233  cat_read_lock read_lock(this);
2234  std::list<const ColumnDescriptor*> columnDescriptors;
2235  const TableDescriptor* td = getMutableMetadataForTableUnlocked(tableId);
2236  getAllColumnMetadataForTableImpl(td,
2237  columnDescriptors,
2238  fetchSystemColumns,
2239  fetchVirtualColumns,
2240  fetchPhysicalColumns);
2241  return columnDescriptors;
2242 }
2243 
2244 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
2245  cat_read_lock read_lock(this);
2246  list<const TableDescriptor*> table_list;
2247  for (auto p : tableDescriptorMapById_) {
2248  table_list.push_back(p.second);
2249  }
2250  return table_list;
2251 }
2252 
2253 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy() const {
2254  cat_read_lock read_lock(this);
2255  std::vector<TableDescriptor> tables;
2256  tables.reserve(tableDescriptorMapById_.size());
2257  for (auto table_entry : tableDescriptorMapById_) {
2258  tables.emplace_back(*table_entry.second);
2259  tables.back().fragmenter = nullptr;
2260  }
2261  return tables;
2262 }
2263 
2264 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
2265  cat_read_lock read_lock(this);
2266  list<const DashboardDescriptor*> dashboards;
2267  for (auto dashboard_entry : dashboardDescriptorMap_) {
2268  dashboards.push_back(dashboard_entry.second.get());
2269  }
2270  return dashboards;
2271 }
2272 
2273 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataCopy() const {
2274  cat_read_lock read_lock(this);
2275  std::vector<DashboardDescriptor> dashboards;
2276  dashboards.reserve(dashboardDescriptorMap_.size());
2277  for (auto dashboard_entry : dashboardDescriptorMap_) {
2278  dashboards.emplace_back(*dashboard_entry.second);
2279  }
2280  return dashboards;
2281 }
2282 
2283 DictRef Catalog::addDictionary(ColumnDescriptor& cd) {
2284  cat_write_lock write_lock(this);
2285  const auto& td = *tableDescriptorMapById_[cd.tableId];
2286  list<DictDescriptor> dds;
2287  setColumnDictionary(cd, dds, td, true);
2288  auto& dd = dds.back();
2289  CHECK(dd.dictRef.dictId);
2290 
2291  std::unique_ptr<StringDictionaryClient> client;
2292  if (!string_dict_hosts_.empty()) {
2293  client.reset(new StringDictionaryClient(
2294  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
2295  }
2296  if (client) {
2297  client->create(dd.dictRef, dd.dictIsTemp);
2298  }
2299 
2300  DictDescriptor* new_dd = new DictDescriptor(dd);
2301  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
2302  if (!dd.dictIsTemp) {
2303  boost::filesystem::create_directory(new_dd->dictFolderPath);
2304  }
2305  return dd.dictRef;
2306 }
2307 
// Releases one reference to the dictionary used by column `cd`, if `cd` is a
// dictionary-encoded string or string-array column; otherwise this is a no-op.
//
// The refcount is decremented in mapd_dictionaries and read back. When it drops to zero:
// - the mapd_dictionaries row is deleted;
// - a statement (partly missing from this listing) builds the on-disk path
//   ".../DB_<dbId>_DICT_<dictId>", presumably to rename/remove the folder — confirm;
// - the dictionary is dropped on the first remote host, if hosts are configured;
// - the in-memory descriptor is erased from dictDescriptorMapByRef_.
// Only the sqlite refcount is changed here; DictDescriptor::refcount is not touched.
2308 void Catalog::delDictionary(const ColumnDescriptor& cd) {
2309  cat_write_lock write_lock(this);
// NOTE(review): one statement after the write lock appears to be missing from this
// listing — confirm against the upstream source.
2311  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
2312  return;
2313  }
2314  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
2315  return;
2316  }
2317  const auto dictId = cd.columnType.get_comp_param();
2318  CHECK_GT(dictId, 0);
2319  // decrement and zero check dict ref count
// td is used only to CHECK that the owning table is still registered.
2320  const auto td = getMetadataForTable(cd.tableId, false);
2321  CHECK(td);
2322  sqliteConnector_.query_with_text_param(
2323  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
2324  std::to_string(dictId));
2325  sqliteConnector_.query_with_text_param(
2326  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
2327  const auto refcount = sqliteConnector_.getData<int>(0, 0);
// NOTE(review): the log text has no space between the dictionary id and "from".
2328  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
2329  << refcount;
2330  if (refcount > 0) {
2331  return;
2332  }
2333  const DictRef dictRef(currentDB_.dbId, dictId);
2334  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
2335  std::to_string(dictId));
// NOTE(review): the first line of the following statement is missing from this listing.
2337  "/DB_" + std::to_string(currentDB_.dbId) + "_DICT_" +
2338  std::to_string(dictId));
2339 
2340  std::unique_ptr<StringDictionaryClient> client;
2341  if (!string_dict_hosts_.empty()) {
2342  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
2343  }
2344  if (client) {
2345  client->drop(dictRef);
2346  }
2347 
2348  dictDescriptorMapByRef_.erase(dictRef);
2349 }
2350 
2351 void Catalog::getDictionary(const ColumnDescriptor& cd,
2352  std::map<int, StringDictionary*>& stringDicts) {
2353  // learn 'committed' ColumnDescriptor of this column
2354  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2355  CHECK(cit != columnDescriptorMap_.end());
2356  auto& ccd = *cit->second;
2357 
2358  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2359  return;
2360  }
2361  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
2362  return;
2363  }
2364  if (!(ccd.columnType.get_comp_param() > 0)) {
2365  return;
2366  }
2367 
2368  auto dictId = ccd.columnType.get_comp_param();
2369  getMetadataForDict(dictId);
2370 
2371  const DictRef dictRef(currentDB_.dbId, dictId);
2372  auto dit = dictDescriptorMapByRef_.find(dictRef);
2373  CHECK(dit != dictDescriptorMapByRef_.end());
2374  CHECK(dit->second);
2375  CHECK(dit->second.get()->stringDict);
2376  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
2377 }
2378 
// Adds column `cd` to table `td` in the sqlite catalog and in the in-memory maps.
//
// - For a logical sharded table (nShards > 0, shard < 0), the column is first added to
//   every physical shard, each with its own copy of `cd`.
// - addDictionary() is then called under a condition whose line is missing from this
//   listing (presumably dictionary encoding — confirm).
// - The new mapd_columns row gets columnid = max(columnid) + 1 within the table, and the
//   assigned id is read back into cd.columnId.
// - A heap copy of `cd` is registered in the column maps and recorded as
//   (nullptr, new descriptor) in columnDescriptorsForRoll, so roll() can commit or undo.
// NOTE(review): the visible body takes no catalog lock although it modifies in-memory
// maps; presumably callers hold it — confirm.
2379 void Catalog::addColumn(const TableDescriptor& td, ColumnDescriptor& cd) {
2380  // caller must handle sqlite/chunk transaction TOGETHER
2381  cd.tableId = td.tableId;
2382  if (td.nShards > 0 && td.shard < 0) {
2383  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2384  auto shard_cd = cd;
2385  addColumn(*shard, shard_cd);
2386  }
2387  }
// NOTE(review): the `if (...) {` line opening the block below is missing from this
// listing.
2389  addDictionary(cd);
2390  }
2391 
2392  using BindType = SqliteConnector::BindType;
// 17 bound values in total; the last one (index 16, default_value) is bound as SQL NULL
// when the column has no default.
2393  std::vector<BindType> types(17, BindType::TEXT);
2394  if (!cd.default_value.has_value()) {
2395  types[16] = BindType::NULL_TYPE;
2396  }
2397  sqliteConnector_.query_with_text_params(
2398  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2399  "colscale, is_notnull, "
2400  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2401  "is_deletedcol, default_value) "
2402  "VALUES (?, "
2403  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2404  "?, ?, ?, "
2405  "?, "
2406  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2407  std::vector<std::string>{std::to_string(td.tableId),
2408  std::to_string(td.tableId),
2409  cd.columnName,
// NOTE(review): several bound values (type/size/flag fields) are missing from this
// listing around here — confirm against the upstream source.
2418  "",
2421  cd.virtualExpr,
2423  cd.default_value.value_or("NULL")},
2424  types);
2425 
2426  sqliteConnector_.query_with_text_params(
2427  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2428  std::vector<std::string>{std::to_string(td.tableId)});
2429 
// Read back the column id assigned by the INSERT's subselect.
2430  sqliteConnector_.query_with_text_params(
2431  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2432  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
2433  cd.columnId = sqliteConnector_.getData<int>(0, 0);
2434 
2435  ++tableDescriptorMapById_[td.tableId]->nColumns;
2436  auto ncd = new ColumnDescriptor(cd);
2437  addToColumnMap(ncd);
// roll(true) keeps ncd; roll(false) removes it from the maps and deletes it.
2438  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
2439 }
2440 
// Drops column `cd` from table `td`.
//
// Under the catalog write lock, this:
// - deletes the mapd_columns row;
// - decrements mapd_tables.ncolumns and the in-memory nColumns;
// - removes the descriptor from the column maps;
// - records (descriptor, nullptr) in columnDescriptorsForRoll.
// The descriptor is not freed here: roll(true) deletes it, and roll(false) re-adds it
// to the column maps. After the lock scope ends, a logical sharded table
// (nShards > 0, shard < 0) recurses into each physical shard's column with the same id.
2441 void Catalog::dropColumn(const TableDescriptor& td, const ColumnDescriptor& cd) {
2442  {
2443  cat_write_lock write_lock(this);
// NOTE(review): one statement after the write lock appears to be missing from this
// listing — confirm against the upstream source.
2445  // caller must handle sqlite/chunk transaction TOGETHER
2446  sqliteConnector_.query_with_text_params(
2447  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2448  std::vector<std::string>{std::to_string(td.tableId),
2449  std::to_string(cd.columnId)});
2450 
2451  sqliteConnector_.query_with_text_params(
2452  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2453  std::vector<std::string>{std::to_string(td.tableId)});
2454 
2455  ColumnDescriptorMap::iterator columnDescIt =
2456  columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2457  CHECK(columnDescIt != columnDescriptorMap_.end());
2458 
// Hand the committed descriptor to roll() before unlinking it from the maps.
2459  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2460  removeFromColumnMap(columnDescIt->second);
2461  --tableDescriptorMapById_[td.tableId]->nColumns;
2462  }
2463 
2464  // for each shard
2465  if (td.nShards > 0 && td.shard < 0) {
2466  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2467  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2468  CHECK(shard_cd);
2469  dropColumn(*shard, *shard_cd);
2470  }
2471  }
2472 }
2473 
// Commits (forward == true) or reverts (forward == false) the column changes recorded in
// columnDescriptorsForRoll, then clears that list. Runs under the catalog write lock.
//
// Each entry is (old descriptor, new descriptor); addColumn records (nullptr, new) and
// dropColumn records (old, nullptr).
//
// Forward (commit):
// - An old descriptor has its dictionary released unless the new descriptor keeps the
//   same dictionary id. Its id is then removed from the table's columnIdBySpi_, and it
//   is deleted.
// - A new descriptor's id is appended to columnIdBySpi_ unless it is already present or
//   the column is a physical geo column.
// - Every touched table's metadata is refreshed in calciteMgr_.
//
// Backward (rollback):
// - The old descriptor is put back into the column maps.
// - The new descriptor is removed from them, has its dictionary released unless it
//   shares the old one's dictionary id, and is deleted.
//
// delDictionary() takes cat_write_lock again while this lock is held; the lock
// presumably permits re-acquisition by the owning thread — confirm.
// NOTE(review): rollback walks the entries in insertion order. If one transaction added
// and then dropped the same descriptor X, the entries are [(nullptr, X), (X, nullptr)]:
// the first deletes X, and the second then passes the dangling X to addToColumnMap().
// Iterating in reverse may be needed — confirm whether that sequence can occur.
// NOTE(review): when an entry has both an old and a new descriptor with the same column
// key, addToColumnMap(ocd) runs before removeFromColumnMap(ncd). If the latter erases by
// key, it would also remove the just-restored ocd entry — confirm.
2474 void Catalog::roll(const bool forward) {
2475  cat_write_lock write_lock(this);
2476  std::set<const TableDescriptor*> tds;
2477 
2478  for (const auto& cdr : columnDescriptorsForRoll) {
2479  auto ocd = cdr.first;
2480  auto ncd = cdr.second;
2481  CHECK(ocd || ncd);
2482  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2483  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2484  auto td = tabDescIt->second;
2485  auto& vc = td->columnIdBySpi_;
2486  if (forward) {
2487  if (ocd) {
// Release the old dictionary only if the new descriptor does not reuse it.
2488  if (nullptr == ncd ||
2489  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2490  delDictionary(*ocd);
2491  }
2492 
2493  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2494 
2495  delete ocd;
2496  }
2497  if (ncd) {
2498  // append columnId if it's new and not a physical geo column
2499  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2500  if (!ncd->isGeoPhyCol) {
2501  vc.push_back(ncd->columnId);
2502  }
2503  }
2504  }
2505  tds.insert(td);
2506  } else {
2507  if (ocd) {
2508  addToColumnMap(ocd);
2509  }
2510  // roll back the dict of new column
2511  if (ncd) {
2512  removeFromColumnMap(ncd);
2513  if (nullptr == ocd ||
2514  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2515  delDictionary(*ncd);
2516  }
2517  delete ncd;
2518  }
2519  }
2520  }
2521  columnDescriptorsForRoll.clear();
2522 
2523  if (forward) {
2524  for (const auto td : tds) {
2525  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2526  }
2527  }
2528 }
2529 
2530 void Catalog::expandGeoColumn(const ColumnDescriptor& cd,
2531  list<ColumnDescriptor>& columns) {
2532  const auto& col_ti = cd.columnType;
2533  if (IS_GEO(col_ti.get_type())) {
2534  switch (col_ti.get_type()) {
2535  case kPOINT: {
2536  ColumnDescriptor physical_cd_coords(true);
2537  physical_cd_coords.columnName = cd.columnName + "_coords";
2538  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2539  // Raw data: compressed/uncompressed coords
2540  coords_ti.set_subtype(kTINYINT);
2541  size_t unit_size;
2542  if (col_ti.get_compression() == kENCODING_GEOINT &&
2543  col_ti.get_comp_param() == 32) {
2544  unit_size = 4 * sizeof(int8_t);
2545  } else {
2546  CHECK(col_ti.get_compression() == kENCODING_NONE);
2547  unit_size = 8 * sizeof(int8_t);
2548  }
2549  coords_ti.set_size(2 * unit_size);
2550  physical_cd_coords.columnType = coords_ti;
2551  columns.push_back(physical_cd_coords);
2552 
2553  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2554 
2555  break;
2556  }
2557  case kMULTIPOINT:
2558  case kLINESTRING: {
2559  ColumnDescriptor physical_cd_coords(true);
2560  physical_cd_coords.columnName = cd.columnName + "_coords";
2561  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2562  // Raw data: compressed/uncompressed coords
2563  coords_ti.set_subtype(kTINYINT);
2564  physical_cd_coords.columnType = coords_ti;
2565  columns.push_back(physical_cd_coords);
2566 
2567  ColumnDescriptor physical_cd_bounds(true);
2568  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2569  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2570  bounds_ti.set_subtype(kDOUBLE);
2571  bounds_ti.set_size(4 * sizeof(double));
2572  physical_cd_bounds.columnType = bounds_ti;
2573  columns.push_back(physical_cd_bounds);
2574 
2575  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2576 
2577  break;
2578  }
2579  case kMULTILINESTRING: {
2580  ColumnDescriptor physical_cd_coords(true);
2581  physical_cd_coords.columnName = cd.columnName + "_coords";
2582  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2583  // Raw data: compressed/uncompressed coords
2584  coords_ti.set_subtype(kTINYINT);
2585  physical_cd_coords.columnType = coords_ti;
2586  columns.push_back(physical_cd_coords);
2587 
2588  ColumnDescriptor physical_cd_linestring_sizes(true);
2589  physical_cd_linestring_sizes.columnName = cd.columnName + "_linestring_sizes";
2590  SQLTypeInfo linestring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2591  linestring_sizes_ti.set_subtype(kINT);
2592  physical_cd_linestring_sizes.columnType = linestring_sizes_ti;
2593  columns.push_back(physical_cd_linestring_sizes);
2594 
2595  ColumnDescriptor physical_cd_bounds(true);
2596  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2597  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2598  bounds_ti.set_subtype(kDOUBLE);
2599  bounds_ti.set_size(4 * sizeof(double));
2600  physical_cd_bounds.columnType = bounds_ti;
2601  columns.push_back(physical_cd_bounds);
2602 
2603  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2604 
2605  break;
2606  }
2607  case kPOLYGON: {
2608  ColumnDescriptor physical_cd_coords(true);
2609  physical_cd_coords.columnName = cd.columnName + "_coords";
2610  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2611  // Raw data: compressed/uncompressed coords
2612  coords_ti.set_subtype(kTINYINT);
2613  physical_cd_coords.columnType = coords_ti;
2614  columns.push_back(physical_cd_coords);
2615 
2616  ColumnDescriptor physical_cd_ring_sizes(true);
2617  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2618  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2619  ring_sizes_ti.set_subtype(kINT);
2620  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2621  columns.push_back(physical_cd_ring_sizes);
2622 
2623  ColumnDescriptor physical_cd_bounds(true);
2624  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2625  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2626  bounds_ti.set_subtype(kDOUBLE);
2627  bounds_ti.set_size(4 * sizeof(double));
2628  physical_cd_bounds.columnType = bounds_ti;
2629  columns.push_back(physical_cd_bounds);
2630 
2631  ColumnDescriptor physical_cd_render_group(true);
2632  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2633  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2634  physical_cd_render_group.columnType = render_group_ti;
2635  columns.push_back(physical_cd_render_group);
2636 
2637  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2638 
2639  break;
2640  }
2641  case kMULTIPOLYGON: {
2642  ColumnDescriptor physical_cd_coords(true);
2643  physical_cd_coords.columnName = cd.columnName + "_coords";
2644  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2645  // Raw data: compressed/uncompressed coords
2646  coords_ti.set_subtype(kTINYINT);
2647  physical_cd_coords.columnType = coords_ti;
2648  columns.push_back(physical_cd_coords);
2649 
2650  ColumnDescriptor physical_cd_ring_sizes(true);
2651  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2652  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2653  ring_sizes_ti.set_subtype(kINT);
2654  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2655  columns.push_back(physical_cd_ring_sizes);
2656 
2657  ColumnDescriptor physical_cd_poly_rings(true);
2658  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2659  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2660  poly_rings_ti.set_subtype(kINT);
2661  physical_cd_poly_rings.columnType = poly_rings_ti;
2662  columns.push_back(physical_cd_poly_rings);
2663 
2664  ColumnDescriptor physical_cd_bounds(true);
2665  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2666  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2667  bounds_ti.set_subtype(kDOUBLE);
2668  bounds_ti.set_size(4 * sizeof(double));
2669  physical_cd_bounds.columnType = bounds_ti;
2670  columns.push_back(physical_cd_bounds);
2671 
2672  ColumnDescriptor physical_cd_render_group(true);
2673  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2674  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2675  physical_cd_render_group.columnType = render_group_ti;
2676  columns.push_back(physical_cd_render_group);
2677 
2678  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2679 
2680  break;
2681  }
2682  default:
2683  throw runtime_error("Unrecognized geometry type.");
2684  break;
2685  }
2686  }
2687 }
2688 
// File-local helper(s). The function defined here is presumably
// get_next_refresh_time(), which createTable() calls as
// `get_next_refresh_time(foreign_table)` to compute a foreign table's next_refresh_time.
// Visible logic: look up a refresh-timing-type entry in foreign_table.options (CHECKed to
// exist), and when it matches a particular timing type, derive the result from
// foreign_table.options.
// NOTE(review): this Doxygen listing drops several lines of this definition (the
// signature, the option key/type constants, and the return statements), so the text
// below is not compilable as shown; consult the original source before editing.
2689 namespace {
2691  auto timing_type_entry =
2693  CHECK(timing_type_entry != foreign_table.options.end());
2694  if (timing_type_entry->second ==
2697  foreign_table.options);
2698  }
2700 }
2701 } // namespace
2702 
// Creates a new table and registers it in the catalog.
//
// Visible flow in this listing:
//  - takes the catalog write lock and works on a copy of `cols`;
//  - rejects user columns named "rowid" and expands geometry columns into their
//    physical columns via expandGeoColumn();
//  - appends the system "rowid" column (virtual unless MATERIALIZED_ROWID is defined)
//    and, when td.hasDeletedCol, the "$deleted$" column;
//  - persistent path: inserts rows into mapd_tables and mapd_columns (plus mapd_views
//    for views and omnisci_foreign_tables for foreign tables), reading the generated
//    tableid back by table name; any failure issues ROLLBACK TRANSACTION and rethrows;
//  - temporary path: assigns ids from nextTempTableId_ / nextTempDictId_, creates
//    temporary dictionaries for non-shared dictionary columns, and may call
//    serializeTableJsonUnlocked() (its guarding line is missing from this listing);
//  - adds the table to the in-memory maps, updates Calcite metadata, registers
//    non-foreign external storage, and issues END TRANSACTION.
// NOTE(review): this Doxygen listing drops several source lines (e.g. the rest of the
// condition after `!td.storageType.empty() &&`, the declaration of `sqlite_lock`, and
// the persistent/temporary branch condition), so the text below is not compilable as
// shown; consult the original Catalog.cpp before editing.
2703 void Catalog::createTable(
2704  TableDescriptor& td,
2705  const list<ColumnDescriptor>& cols,
2706  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2707  bool isLogicalTable) {
2708  cat_write_lock write_lock(this);
2709  list<ColumnDescriptor> cds = cols;
2710  list<DictDescriptor> dds;
2711  std::set<std::string> toplevel_column_names;
2712  list<ColumnDescriptor> columns;
2713 
2714  if (!td.storageType.empty() &&
2717  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2718  }
2719  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2720  }
2721 
// Build the full column list: user columns, each geometry column followed by its
// physical columns. Only the user-declared names are recorded as top-level names.
2722  for (auto cd : cds) {
2723  if (cd.columnName == "rowid") {
2724  throw std::runtime_error(
2725  "Cannot create column with name rowid. rowid is a system defined column.");
2726  }
2727  columns.push_back(cd);
2728  toplevel_column_names.insert(cd.columnName);
2729  if (cd.columnType.is_geometry()) {
2730  expandGeoColumn(cd, columns);
2731  }
2732  }
2733  cds.clear();
2734 
2735  ColumnDescriptor cd;
2736  // add row_id column -- Must be last column in the table
2737  cd.columnName = "rowid";
2738  cd.isSystemCol = true;
2739  cd.columnType = SQLTypeInfo(kBIGINT, true);
2740 #ifdef MATERIALIZED_ROWID
2741  cd.isVirtualCol = false;
2742 #else
2743  cd.isVirtualCol = true;
2744  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2745 #endif
2746  columns.push_back(cd);
2747  toplevel_column_names.insert(cd.columnName);
2748 
2749  if (td.hasDeletedCol) {
2750  ColumnDescriptor cd_del;
2751  cd_del.columnName = "$deleted$";
2752  cd_del.isSystemCol = true;
2753  cd_del.isVirtualCol = false;
2754  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2755  cd_del.isDeletedCol = true;
2756 
2757  columns.push_back(cd_del);
2758  }
2759 
// nColumns counts every column, including geo physical and system columns.
2760  td.nColumns = columns.size();
2761  // TODO(sy): don't take disk locks or touch sqlite connector for temporary tables
2763  sqliteConnector_.query("BEGIN TRANSACTION");
2765  try {
2766  sqliteConnector_.query_with_text_params(
2767  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2768  std::vector<std::string>{td.tableName,
2769  std::to_string(td.userId),
2771  std::to_string(td.isView),
2772  "",
2777  std::to_string(td.maxRows),
2778  td.partitions,
2780  std::to_string(td.shard),
2781  std::to_string(td.nShards),
2783  td.storageType,
2786  td.keyMetainfo});
2787 
2788  // now get the auto generated tableid
2789  sqliteConnector_.query_with_text_param(
2790  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2791  td.tableId = sqliteConnector_.getData<int>(0, 0);
2792  int colId = 1;
2793  for (auto cd : columns) {
2795  const bool is_foreign_col =
2796  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2797  if (!is_foreign_col) {
2798  // Ideally we would like to not persist string dictionaries for system tables,
2799  // since system table content can be highly dynamic and string dictionaries
2800  // are not currently vacuumed. However, in distributed this causes issues
2801  // when the cluster is out of sync (when the agg resets but leaves persist) so
2802  // for the sake of testing we need to leave this as normal dictionaries until
2803  // we solve the distributed issue.
2804  auto use_temp_dictionary = false; // td.is_system_table;
2805  setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
2806  }
2807  }
2808 
// Record column ids for top-level columns that are not geo physical columns.
2809  if (toplevel_column_names.count(cd.columnName)) {
2810  if (!cd.isGeoPhyCol) {
2811  td.columnIdBySpi_.push_back(colId);
2812  }
2813  }
2814 
// All 17 parameters are bound as text, except default_value (index 16), which is
// bound as SQL NULL when the column has no default.
2815  using BindType = SqliteConnector::BindType;
2816  std::vector<BindType> types(17, BindType::TEXT);
2817  if (!cd.default_value.has_value()) {
2818  types[16] = BindType::NULL_TYPE;
2819  }
2820  sqliteConnector_.query_with_text_params(
2821  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2822  "coldim, colscale, is_notnull, "
2823  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2824  "virtual_expr, is_deletedcol, default_value) "
2825  "VALUES (?, ?, ?, ?, ?, "
2826  "?, "
2827  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2828  std::vector<std::string>{std::to_string(td.tableId),
2829  std::to_string(colId),
2830  cd.columnName,
2839  "",
2842  cd.virtualExpr,
2844  cd.default_value.value_or("NULL")},
2845  types);
2846  cd.tableId = td.tableId;
2847  cd.columnId = colId++;
2848  cds.push_back(cd);
2849  }
2850  if (td.isView) {
2851  sqliteConnector_.query_with_text_params(
2852  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2853  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2854  }
2856  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2857  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
2858  sqliteConnector_.query_with_text_params(
2859  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2860  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2861  std::vector<std::string>{std::to_string(foreign_table.tableId),
2862  std::to_string(foreign_table.foreign_server->id),
2863  foreign_table.getOptionsAsJsonString(),
2864  std::to_string(foreign_table.last_refresh_time),
2865  std::to_string(foreign_table.next_refresh_time)});
2866  }
2867  } catch (std::exception& e) {
2868  sqliteConnector_.query("ROLLBACK TRANSACTION");
2869  throw;
2870  }
2871  } else { // Temporary table
2872  td.tableId = nextTempTableId_++;
2873  int colId = 1;
2874  for (auto cd : columns) {
2876  const bool is_foreign_col =
2877  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2878 
2879  if (!is_foreign_col) {
2880  // Create a new temporary dictionary
2881  std::string fileName("");
2882  std::string folderPath("");
2883  DictRef dict_ref(currentDB_.dbId, nextTempDictId_);
2884  nextTempDictId_++;
2885  DictDescriptor dd(dict_ref,
2886  fileName,
2888  false,
2889  1,
2890  folderPath,
2891  true); // Is dictName (2nd argument) used?
2892  dds.push_back(dd);
2893  if (!cd.columnType.is_array()) {
2895  }
2896  cd.columnType.set_comp_param(dict_ref.dictId);
2897  }
2898  }
2899  if (toplevel_column_names.count(cd.columnName)) {
2900  if (!cd.isGeoPhyCol) {
2901  td.columnIdBySpi_.push_back(colId);
2902  }
2903  }
2904  cd.tableId = td.tableId;
2905  cd.columnId = colId++;
2906  cds.push_back(cd);
2907  }
2908 
2910  serializeTableJsonUnlocked(&td, cds);
2911  }
2912  }
2913 
// Publish the table in memory. On failure, roll back the SQLite transaction and
// remove the partially registered table from the maps before rethrowing.
2914  try {
2915  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
2916  if (cache) {
2917  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
2918  << "Disk cache at " + cache->getCacheDirectory()
2919  << " contains preexisting data for new table. Please "
2920  "delete or clear cache before continuing";
2921  }
2922 
2923  addTableToMap(&td, cds, dds);
2924  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2925  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2926  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2927  }
2928  } catch (std::exception& e) {
2929  sqliteConnector_.query("ROLLBACK TRANSACTION");
2930  removeTableFromMap(td.tableName, td.tableId, true);
2931  throw;
2932  }
2933  sqliteConnector_.query("END TRANSACTION");
2934 
// Locks are released before getMetadataForTable(..., true) is called; per the
// comment on that call, it causes instantiateFragmenter() to run.
2935  if (td.storageType != StorageType::FOREIGN_TABLE) {
2936  write_lock.unlock();
2937  sqlite_lock.unlock();
2938  getMetadataForTable(td.tableName,
2939  true); // cause instantiateFragmenter() to be called
2940  }
2941 }
2942 
2943 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
2944  const std::list<ColumnDescriptor>& cds) const {
2945  // relies on the catalog write lock
2946  using namespace rapidjson;
2947 
2948  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2949 
2950  const auto db_name = currentDB_.dbName;
2951  const auto file_path = table_json_filepath(basePath_, db_name);
2952 
2953  Document d;
2954  if (boost::filesystem::exists(file_path)) {
2955  // look for an existing file for this database
2956  std::ifstream reader(file_path.string());
2957  CHECK(reader.is_open());
2958  IStreamWrapper json_read_wrapper(reader);
2959  d.ParseStream(json_read_wrapper);
2960  } else {
2961  d.SetObject();
2962  }
2963  CHECK(d.IsObject());
2964  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
2965 
2966  Value table(kObjectType);
2967  table.AddMember(
2968  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
2969  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
2970  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
2971 
2972  for (const auto& cd : cds) {
2973  Value column(kObjectType);
2974  column.AddMember(
2975  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
2976  column.AddMember("coltype",
2977  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
2978  d.GetAllocator());
2979  column.AddMember("colsubtype",
2980  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
2981  d.GetAllocator());
2982  column.AddMember("compression",
2983  Value().SetInt(static_cast<int>(cd.columnType.get_compression())),
2984  d.GetAllocator());
2985  column.AddMember("comp_param",
2986  Value().SetInt(static_cast<int>(cd.columnType.get_comp_param())),
2987  d.GetAllocator());
2988  column.AddMember("size",
2989  Value().SetInt(static_cast<int>(cd.columnType.get_size())),
2990  d.GetAllocator());
2991  column.AddMember(
2992  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
2993  column.AddMember(
2994  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
2995  column.AddMember(
2996  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
2997  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
2998  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
2999  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
3000  table["columns"].PushBack(column, d.GetAllocator());
3001  }
3002  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
3003 
3004  // Overwrite the existing file
3005  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3006  CHECK(writer.is_open());
3007  OStreamWrapper json_wrapper(writer);
3008 
3009  Writer<OStreamWrapper> json_writer(json_wrapper);
3010  d.Accept(json_writer);
3011  writer.close();
3012 }
3013 
3014 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
3015  // relies on the catalog write lock
3016  using namespace rapidjson;
3017 
3018  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
3019 
3020  const auto db_name = currentDB_.dbName;
3021  const auto file_path = table_json_filepath(basePath_, db_name);
3022 
3023  CHECK(boost::filesystem::exists(file_path));
3024  Document d;
3025 
3026  std::ifstream reader(file_path.string());
3027  CHECK(reader.is_open());
3028  IStreamWrapper json_read_wrapper(reader);
3029  d.ParseStream(json_read_wrapper);
3030 
3031  CHECK(d.IsObject());
3032  auto table_name_ref = StringRef(table_name.c_str());
3033  CHECK(d.HasMember(table_name_ref));
3034  CHECK(d.RemoveMember(table_name_ref));
3035 
3036  // Overwrite the existing file
3037  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3038  CHECK(writer.is_open());
3039  OStreamWrapper json_wrapper(writer);
3040 
3041  Writer<OStreamWrapper> json_writer(json_wrapper);
3042  d.Accept(json_writer);
3043  writer.close();
3044 }
3045 
// Locking wrapper around createForeignServerNoLocks(): takes the catalog write lock
// and forwards ownership of `foreign_server` together with `if_not_exists`.
// NOTE(review): source line 3050 (presumably the SQLite lock) is missing from this
// Doxygen listing.
3046 void Catalog::createForeignServer(
3047  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3048  bool if_not_exists) {
3049  cat_write_lock write_lock(this);
3051  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
3052 }
3053 
3054 void Catalog::createForeignServerNoLocks(
3055  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3056  bool if_not_exists) {
3057  const auto& name = foreign_server->name;
3058 
3059  sqliteConnector_.query_with_text_params(
3060  "SELECT name from omnisci_foreign_servers where name = ?",
3061  std::vector<std::string>{name});
3062 
3063  if (sqliteConnector_.getNumRows() == 0) {
3064  foreign_server->creation_time = std::time(nullptr);
3065  sqliteConnector_.query_with_text_params(
3066  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
3067  "creation_time, "
3068  "options) "
3069  "VALUES (?, ?, ?, ?, ?)",
3070  std::vector<std::string>{name,
3071  foreign_server->data_wrapper_type,
3072  std::to_string(foreign_server->user_id),
3073  std::to_string(foreign_server->creation_time),
3074  foreign_server->getOptionsAsJsonString()});
3075  sqliteConnector_.query_with_text_params(
3076  "SELECT id from omnisci_foreign_servers where name = ?",
3077  std::vector<std::string>{name});
3078  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
3079  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
3080  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
3081  std::move(foreign_server);
3082  CHECK(foreignServerMap_.find(name) == foreignServerMap_.end())
3083  << "Attempting to insert a foreign server into foreign server map that already "
3084  "exists.";
3085  foreignServerMap_[name] = foreign_server_shared;
3086  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
3087  } else if (!if_not_exists) {
3088  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
3089  "\" already exists."};
3090  }
3091 
3092  const auto& server_it = foreignServerMap_.find(name);
3093  CHECK(server_it != foreignServerMap_.end());
3094  CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
3095 }
3096 
3097 const foreign_storage::ForeignServer* Catalog::getForeignServer(
3098  const std::string& server_name) const {
3099  foreign_storage::ForeignServer* foreign_server = nullptr;
3100  cat_read_lock read_lock(this);
3101 
3102  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
3103  foreign_server = foreignServerMap_.find(server_name)->second.get();
3104  }
3105  return foreign_server;
3106 }
3107 
// Loads the foreign server named `server_name` directly from the
// omnisci_foreign_servers table, without consulting the in-memory maps, and returns
// it; returns nullptr when no row matches.
// NOTE(review): creation_time is read as std::int32_t here, while
// createForeignServerNoLocks() writes it from std::time(nullptr); confirm the
// ForeignServer constructor's parameter type and post-2038 behavior.
// NOTE(review): source line 3111 (presumably the SQLite lock) is missing from this
// Doxygen listing.
3108 const std::unique_ptr<const foreign_storage::ForeignServer>
3109 Catalog::getForeignServerFromStorage(const std::string& server_name) {
3110  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
3112  sqliteConnector_.query_with_text_params(
3113  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
3114  "FROM omnisci_foreign_servers WHERE name = ?",
3115  std::vector<std::string>{server_name});
3116  if (sqliteConnector_.getNumRows() > 0) {
// Column order matches the SELECT list: id, name, data_wrapper_type, options,
// owner_user_id, creation_time.
3117  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
3118  sqliteConnector_.getData<int>(0, 0),
3119  sqliteConnector_.getData<std::string>(0, 1),
3120  sqliteConnector_.getData<std::string>(0, 2),
3121  sqliteConnector_.getData<std::string>(0, 3),
3122  sqliteConnector_.getData<std::int32_t>(0, 4),
3123  sqliteConnector_.getData<std::int32_t>(0, 5));
3124  }
3125  return foreign_server;
3126 }
3127 
// Loads the omnisci_foreign_tables row for `table_id` directly from SQLite and
// returns it as a ForeignTable; returns nullptr when no row exists. At most one row
// is expected (CHECKed).
// NOTE(review): the server pointer is resolved with foreignServerMapById_[...]. For
// an id missing from the map, operator[] inserts an empty entry and .get() yields
// nullptr; a find() + CHECK may be intended. Source line 3131 (presumably the SQLite
// lock) is missing from this Doxygen listing.
3128 const std::unique_ptr<const foreign_storage::ForeignTable>
3129 Catalog::getForeignTableFromStorage(int table_id) {
3130  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
3132  sqliteConnector_.query_with_text_params(
3133  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
3134  "omnisci_foreign_tables WHERE table_id = ?",
3135  std::vector<std::string>{to_string(table_id)});
3136  auto num_rows = sqliteConnector_.getNumRows();
3137  if (num_rows > 0) {
3138  CHECK_EQ(size_t(1), num_rows);
3139  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
3140  sqliteConnector_.getData<int>(0, 0),
3141  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
3142  sqliteConnector_.getData<std::string>(0, 2),
3143  sqliteConnector_.getData<int64_t>(0, 3),
3144  sqliteConnector_.getData<int64_t>(0, 4));
3145  }
3146  return foreign_table;
3147 }
3148 
3149 void Catalog::changeForeignServerOwner(const std::string& server_name,
3150  const int new_owner_id) {
3151  cat_write_lock write_lock(this);
3152  foreign_storage::ForeignServer* foreign_server =
3153  foreignServerMap_.find(server_name)->second.get();
3154  CHECK(foreign_server);
3155  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
3156  // update in-memory server
3157  foreign_server->user_id = new_owner_id;
3158 }
3159 
3160 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
3161  const std::string& data_wrapper) {
3162  cat_write_lock write_lock(this);
3163  auto data_wrapper_type = to_upper(data_wrapper);
3164  // update in-memory server
3165  foreign_storage::ForeignServer* foreign_server =
3166  foreignServerMap_.find(server_name)->second.get();
3167  CHECK(foreign_server);
3168  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
3169  foreign_server->data_wrapper_type = data_wrapper_type;
3170  try {
3171  foreign_server->validate();
3172  } catch (const std::exception& e) {
3173  // validation did not succeed:
3174  // revert to saved data_wrapper_type & throw exception
3175  foreign_server->data_wrapper_type = saved_data_wrapper_type;
3176  throw;
3177  }
3178  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
3179 }
3180 
3181 void Catalog::setForeignServerOptions(const std::string& server_name,
3182  const std::string& options) {
3183  cat_write_lock write_lock(this);
3184  // update in-memory server
3185  foreign_storage::ForeignServer* foreign_server =
3186  foreignServerMap_.find(server_name)->second.get();
3187  CHECK(foreign_server);
3188  auto saved_options = foreign_server->options;
3189  foreign_server->populateOptionsMap(options, true);
3190  try {
3191  foreign_server->validate();
3192  } catch (const std::exception& e) {
3193  // validation did not succeed:
3194  // revert to saved options & throw exception
3195  foreign_server->options = saved_options;
3196  throw;
3197  }
3198  setForeignServerProperty(server_name, "options", options);
3199 }
3200 
3201 void Catalog::renameForeignServer(const std::string& server_name,
3202  const std::string& name) {
3203  cat_write_lock write_lock(this);
3204  auto foreign_server_it = foreignServerMap_.find(server_name);
3205  CHECK(foreign_server_it != foreignServerMap_.end());
3206  setForeignServerProperty(server_name, "name", name);
3207  auto foreign_server_shared = foreign_server_it->second;
3208  foreign_server_shared->name = name;
3209  foreignServerMap_[name] = foreign_server_shared;
3210  foreignServerMap_.erase(foreign_server_it);
3211 }
3212 
// Drops foreign server `server_name` from omnisci_foreign_servers and from both
// in-memory maps (by name and by id). Does nothing if no such server is stored.
// Throws std::runtime_error if any row in omnisci_foreign_tables still references the
// server. The DELETE runs in a SQLite transaction that is rolled back on failure.
// NOTE(review): source line 3215 (presumably the SQLite lock) is missing from this
// Doxygen listing.
3213 void Catalog::dropForeignServer(const std::string& server_name) {
3214  cat_write_lock write_lock(this);
3216 
3217  sqliteConnector_.query_with_text_params(
3218  "SELECT id from omnisci_foreign_servers where name = ?",
3219  std::vector<std::string>{server_name});
3220  auto num_rows = sqliteConnector_.getNumRows();
3221  if (num_rows > 0) {
3222  CHECK_EQ(size_t(1), num_rows);
3223  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
// Refuse to drop a server that foreign tables still depend on.
3224  sqliteConnector_.query_with_text_param(
3225  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
3226  std::to_string(server_id));
3227  if (sqliteConnector_.getNumRows() > 0) {
3228  throw std::runtime_error{"Foreign server \"" + server_name +
3229  "\" is referenced "
3230  "by existing foreign tables and cannot be dropped."};
3231  }
3232  sqliteConnector_.query("BEGIN TRANSACTION");
3233  try {
3234  sqliteConnector_.query_with_text_params(
3235  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
3236  std::vector<std::string>{server_name});
3237  } catch (const std::exception& e) {
3238  sqliteConnector_.query("ROLLBACK TRANSACTION");
3239  throw;
3240  }
3241  sqliteConnector_.query("END TRANSACTION");
// In-memory maps are updated only after the storage change is committed.
3242  foreignServerMap_.erase(server_name);
3243  foreignServerMapById_.erase(server_id);
3244  }
3245 }
3246 
// Appends to `results` the foreign servers that match `filters` and on which `user`
// holds at least one privilege. Servers listed in INTERNAL_SERVERS are skipped.
//
// `filters`, when non-null, is iterated as an array of objects. Each object is read
// for "attribute" (one of server_name, data_wrapper, created_at, options), "operation"
// ("EQUALS", otherwise LIKE), "value", and, from the second filter on, "chain". The
// filters become a WHERE clause over omnisci_foreign_servers.
// Throws std::runtime_error for an unknown attribute or for LIKE on created_at.
// NOTE(review): source line 3253 (presumably the SQLite lock) is missing from this
// Doxygen listing.
3247 void Catalog::getForeignServersForUser(
3248  const rapidjson::Value* filters,
3249  const UserMetadata& user,
3250  std::vector<const foreign_storage::ForeignServer*>& results) {
3251  sys_read_lock syscat_read_lock(&SysCatalog::instance());
3252  cat_read_lock read_lock(this);
3254  // Customer facing and internal SQlite names
3255  std::map<std::string, std::string> col_names{{"server_name", "name"},
3256  {"data_wrapper", "data_wrapper_type"},
3257  {"created_at", "creation_time"},
3258  {"options", "options"}};
3259 
3260  // TODO add "owner" when FSI privilege is implemented
3261  std::stringstream filter_string;
3262  std::vector<std::string> arguments;
3263 
3264  if (filters != nullptr) {
3265  // Create SQL WHERE clause for SQLite query
3266  int num_filters = 0;
3267  filter_string << " WHERE";
3268  for (auto& filter_def : filters->GetArray()) {
// NOTE(review): the "chain" value is written into the SQL text verbatim (only
// "value" is bound as a parameter); confirm callers restrict it to AND/OR.
3269  if (num_filters > 0) {
3270  filter_string << " " << std::string(filter_def["chain"].GetString());
3271  ;
3272  }
3273 
// Attribute names are mapped through col_names, so only known column names reach
// the SQL text.
3274  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
3275  col_names.end()) {
3276  throw std::runtime_error{"Attribute with name \"" +
3277  std::string(filter_def["attribute"].GetString()) +
3278  "\" does not exist."};
3279  }
3280 
3281  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
3282 
// Any operation other than "EQUALS" is treated as LIKE.
3283  bool equals_operator = false;
3284  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
3285  filter_string << " = ? ";
3286  equals_operator = true;
3287  } else {
3288  filter_string << " LIKE ? ";
3289  }
3290 
3291  bool timestamp_column =
3292  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
3293 
3294  if (timestamp_column && !equals_operator) {
3295  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
3296  }
3297 
// created_at values are parsed with dateTimeParse<kTIMESTAMP> and bound as their
// numeric string form; other values are bound as given.
3298  if (timestamp_column && equals_operator) {
3299  arguments.push_back(std::to_string(
3300  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
3301  } else {
3302  arguments.emplace_back(filter_def["value"].GetString());
3303  }
3304 
3305  num_filters++;
3306  }
3307  }
3308  // Create select query for the omnisci_foreign_servers table
3309  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
3310  query += filter_string.str();
3311 
3312  sqliteConnector_.query_with_text_params(query, arguments);
3313  auto num_rows = sqliteConnector_.getNumRows();
3314 
3315  if (sqliteConnector_.getNumRows() == 0) {
3316  return;
3317  }
3318 
3319  CHECK(sqliteConnector_.getNumCols() == 1);
3320  // Return pointers to objects
3321  results.reserve(num_rows);
3322  for (size_t row = 0; row < num_rows; ++row) {
3323  const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
3324  if (shared::contains(INTERNAL_SERVERS, server_name)) {
3325  continue;
3326  }
3327  const foreign_storage::ForeignServer* foreign_server = getForeignServer(server_name);
3328  CHECK(foreign_server != nullptr);
3329 
3330  DBObject dbObject(foreign_server->name, ServerDBObjectType);
3331  dbObject.loadKey(*this);
3332  std::vector<DBObject> privObjects = {dbObject};
3333  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
3334  // skip server, as there are no privileges to access it
3335  continue;
3336  }
3337  results.push_back(foreign_server);
3338  }
3339 }
3340 
3341 // returns the table epoch or -1 if there is something wrong with the shared epoch
3342 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
3343  cat_read_lock read_lock(this);
3344  const auto td = getMetadataForTable(table_id, false);
3345  if (!td) {
3346  std::stringstream table_not_found_error_message;
3347  table_not_found_error_message << "Table (" << db_id << "," << table_id
3348  << ") not found";
3349  throw std::runtime_error(table_not_found_error_message.str());
3350  }
3351  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3352  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3353  // check all shards have same checkpoint
3354  const auto physicalTables = physicalTableIt->second;
3355  CHECK(!physicalTables.empty());
3356  size_t curr_epoch{0}, first_epoch{0};
3357  int32_t first_table_id{0};
3358  bool are_epochs_inconsistent{false};
3359  for (size_t i = 0; i < physicalTables.size(); i++) {
3360  int32_t physical_tb_id = physicalTables[i];
3361  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3362  CHECK(phys_td);
3363 
3364  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3365  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3366  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
3367  if (i == 0) {
3368  first_epoch = curr_epoch;
3369  first_table_id = physical_tb_id;
3370  } else if (first_epoch != curr_epoch) {
3371  are_epochs_inconsistent = true;
3372  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
3373  << ", db id: " << db_id
3374  << ". First table (table id: " << first_table_id
3375  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
3376  << ", has inconsistent epoch: " << curr_epoch
3377  << ". See previous INFO logs for all epochs and their table ids.";
3378  }
3379  }
3380  if (are_epochs_inconsistent) {
3381  // oh dear the shards do not agree on the epoch for this table
3382  return -1;
3383  }
3384  return curr_epoch;
3385  } else {
3386  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3387  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3388  << ", epoch: " << epoch;
3389  return epoch;
3390  }
3391 }
3392 
3393 std::vector<const foreign_storage::ForeignTable*>
3394 Catalog::getAllForeignTablesForForeignServer(const int32_t foreign_server_id) {
3395  cat_read_lock read_lock(this);
3396  std::vector<const foreign_storage::ForeignTable*> foreign_tables;
3397  for (auto entry : tableDescriptorMapById_) {
3398  auto table_descriptor = entry.second;
3399  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
3400  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
3401  CHECK(foreign_table);
3402  if (foreign_table->foreign_server->id == foreign_server_id) {
3403  foreign_tables.emplace_back(foreign_table);
3404  }
3405  }
3406  }
3407  return foreign_tables;
3408 }
3409 
3410 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
3411  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
3412  << " back to new epoch " << new_epoch;
3413  const auto td = getMetadataForTable(table_id, false);
3414  if (!td) {
3415  std::stringstream table_not_found_error_message;
3416  table_not_found_error_message << "Table (" << db_id << "," << table_id
3417  << ") not found";
3418  throw std::runtime_error(table_not_found_error_message.str());
3419  }
3420  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3421  std::stringstream is_temp_table_error_message;
3422  is_temp_table_error_message << "Cannot set epoch on temporary table";
3423  throw std::runtime_error(is_temp_table_error_message.str());
3424  }
3425 
3426  File_Namespace::FileMgrParams file_mgr_params;
3427  file_mgr_params.epoch = new_epoch;
3428  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3429 
3430  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3431  CHECK(!physical_tables.empty());
3432  for (const auto table : physical_tables) {
3433  auto table_id = table->tableId;
3434  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID " << table_id
3435  << " back to new epoch " << new_epoch;
3436  // Should have table lock from caller so safe to do this after, avoids
3437  // having to repopulate data on error
3438  removeChunks(table_id);
3439  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3440  }
3441 }
3442 
3443 void Catalog::alterPhysicalTableMetadata(
3444  const TableDescriptor* td,
3445  const TableDescriptorUpdateParams& table_update_params) {
3446  // Only called from parent alterTableParamMetadata, expect already to have catalog and
3447  // sqlite write locks
3448 
3449  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
3450 
3451  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
3452  CHECK(mutable_td);
3453  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
3454  sqliteConnector_.query_with_text_params(
3455  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3456  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
3457  std::to_string(td->tableId)});
3458  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
3459  }
3460 
3461  if (td->maxRows != table_update_params.max_rows) {
3462  sqliteConnector_.query_with_text_params(
3463  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3464  std::vector<std::string>{std::to_string(table_update_params.max_rows),
3465  std::to_string(td->tableId)});
3466  mutable_td->maxRows = table_update_params.max_rows;
3467  }
3468 }
3469 
// Applies table_update_params to td and, when td is a sharded logical table, first to
// each of its physical tables. All updates run inside one sqlite
// "BEGIN TRANSACTION" / "END TRANSACTION" pair taken under the catalog write lock.
// On std::exception the transaction is rolled back and LOG(FATAL) is emitted.
// NOTE(review): assuming LOG(FATAL) aborts the process, "END TRANSACTION" is never
// reached on that path, and the caught exception `e` is unused — confirm.
// NOTE(review): source line 3473 is missing from this listing; presumably it acquires
// the sqlite lock — verify against the full file.
3470 void Catalog::alterTableMetadata(const TableDescriptor* td,
3471  const TableDescriptorUpdateParams& table_update_params) {
3472  cat_write_lock write_lock(this);
3474  sqliteConnector_.query("BEGIN TRANSACTION");
3475  try {
// Sharded table: update every physical shard before the logical table itself.
3476  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
3477  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3478  const auto physical_tables = physical_table_it->second;
3479  CHECK(!physical_tables.empty());
3480  for (size_t i = 0; i < physical_tables.size(); i++) {
3481  int32_t physical_tb_id = physical_tables[i];
3482  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3483  CHECK(phys_td);
3484  alterPhysicalTableMetadata(phys_td, table_update_params);
3485  }
3486  }
3487  alterPhysicalTableMetadata(td, table_update_params);
3488  } catch (std::exception& e) {
3489  sqliteConnector_.query("ROLLBACK TRANSACTION");
3490  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
3491  }
3492  sqliteConnector_.query("END TRANSACTION");
3493 }
3494 
3495 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3496  const int32_t max_rollback_epochs) {
3497  // Must be called from AlterTableParamStmt or other method that takes executor and
3498  // TableSchema locks
3499  if (max_rollback_epochs <= -1) {
3500  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3501  }
3502  const auto td = getMetadataForTable(
3503  table_id, false); // Deep copy as there will be gap between read and write locks
3504  CHECK(td); // Existence should have already been checked in
3505  // ParserNode::AlterTableParmStmt
3506  TableDescriptorUpdateParams table_update_params(td);
3507  table_update_params.max_rollback_epochs = max_rollback_epochs;
3508  if (table_update_params == td) { // Operator is overloaded to test for equality
3509  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3510  << " to existing value, skipping operation";
3511  return;
3512  }
3513  File_Namespace::FileMgrParams file_mgr_params;
3514  file_mgr_params.epoch = -1; // Use existing epoch
3515  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3516  setTableFileMgrParams(table_id, file_mgr_params);
3517  alterTableMetadata(td, table_update_params);
3518 }
3519 
3520 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3521  if (max_rows < 0) {
3522  throw std::runtime_error("Max rows cannot be a negative number.");
3523  }
3524  const auto td = getMetadataForTable(table_id);
3525  CHECK(td);
3526  TableDescriptorUpdateParams table_update_params(td);
3527  table_update_params.max_rows = max_rows;
3528  if (table_update_params == td) {
3529  LOG(INFO) << "Max rows value of " << max_rows
3530  << " is the same as the existing value. Skipping update.";
3531  return;
3532  }
3533  alterTableMetadata(td, table_update_params);
3534  CHECK(td->fragmenter);
3535  td->fragmenter->dropFragmentsToSize(max_rows);
3536 }
3537 
3538 // For testing purposes only
3539 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3540  cat_write_lock write_lock(this);
3541  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3542  CHECK(td_entry != tableDescriptorMap_.end());
3543  auto td = td_entry->second;
3544 
3545  std::vector<int> table_key{getCurrentDB().dbId, td->tableId};
3546  ResultSetCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
3547 
3548  TableDescriptorUpdateParams table_update_params(td);
3549  table_update_params.max_rollback_epochs = -1;
3550  write_lock.unlock();
3551 
3552  alterTableMetadata(td, table_update_params);
3553  File_Namespace::FileMgrParams file_mgr_params;
3554  file_mgr_params.max_rollback_epochs = -1;
3555  setTableFileMgrParams(td->tableId, file_mgr_params);
3556 }
3557 
3558 void Catalog::setTableFileMgrParams(
3559  const int table_id,
3560  const File_Namespace::FileMgrParams& file_mgr_params) {
3561  // Expects parent to have write lock
3562  const auto td = getMetadataForTable(table_id, false);
3563  const auto db_id = this->getDatabaseId();
3564  if (!td) {
3565  std::stringstream table_not_found_error_message;
3566  table_not_found_error_message << "Table (" << db_id << "," << table_id
3567  << ") not found";
3568  throw std::runtime_error(table_not_found_error_message.str());
3569  }
3570  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3571  std::stringstream is_temp_table_error_message;
3572  is_temp_table_error_message << "Cannot set storage params on temporary table";
3573  throw std::runtime_error(is_temp_table_error_message.str());
3574  }
3575 
3576  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3577  CHECK(!physical_tables.empty());
3578  for (const auto table : physical_tables) {
3579  auto table_id = table->tableId;
3580  removeChunks(table_id);
3581  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3582  }
3583 }
3584 
3585 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3586  const int32_t table_id) const {
3587  cat_read_lock read_lock(this);
3588  std::vector<TableEpochInfo> table_epochs;
3589  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3590  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3591  const auto physical_tables = physical_table_it->second;
3592  CHECK(!physical_tables.empty());
3593 
3594  for (const auto physical_tb_id : physical_tables) {
3595  const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3596  CHECK(phys_td);
3597 
3598  auto table_id = phys_td->tableId;
3599  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3600  table_epochs.emplace_back(table_id, epoch);
3601  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3602  << ", table id: " << table_id << ", epoch: " << epoch;
3603  }
3604  } else {
3605  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3606  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3607  << ", epoch: " << epoch;
3608  table_epochs.emplace_back(table_id, epoch);
3609  }
3610  return table_epochs;
3611 }
3612 
3613 void Catalog::setTableEpochs(const int32_t db_id,
3614  const std::vector<TableEpochInfo>& table_epochs) const {
3615  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3616  CHECK(td);
3617  File_Namespace::FileMgrParams file_mgr_params;
3618  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3619 
3620  for (const auto& table_epoch_info : table_epochs) {
3621  removeChunks(table_epoch_info.table_id);
3622  file_mgr_params.epoch = table_epoch_info.table_epoch;
3623  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3624  db_id, table_epoch_info.table_id, file_mgr_params);
3625  LOG(INFO) << "Set table epoch for db id: " << db_id
3626  << ", table id: " << table_epoch_info.table_id
3627  << ", back to epoch: " << table_epoch_info.table_epoch;
3628  }
3629 }
3630 
3631 namespace {
3632 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3633  std::string table_epochs_str{"["};
3634  bool first_entry{true};
3635  for (const auto& table_epoch : table_epochs) {
3636  if (first_entry) {
3637  first_entry = false;
3638  } else {
3639  table_epochs_str += ", ";
3640  }
3641  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3642  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3643  }
3644  table_epochs_str += "]";
3645  return table_epochs_str;
3646 }
3647 } // namespace
3648 
3649 void Catalog::setTableEpochsLogExceptions(
3650  const int32_t db_id,
3651  const std::vector<TableEpochInfo>& table_epochs) const {
3652  try {
3653  setTableEpochs(db_id, table_epochs);
3654  } catch (std::exception& e) {
3655  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3656  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3657  << ", Error: " << e.what();
3658  }
3659 }
3660 
3661 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3662  cat_read_lock read_lock(this);
3663  const auto it = deletedColumnPerTable_.find(td);
3664  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3665 }
3666 
3667 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3668  int delete_column_id) const {
3669  // check if there are rows deleted by examining the deletedColumn metadata
3670  CHECK(td);
3671  auto fragmenter = td->fragmenter;
3672  if (fragmenter) {
3673  return fragmenter->hasDeletedRows(delete_column_id);
3674  } else {
3675  return false;
3676  }
3677 }
3678 
3679 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3680  const TableDescriptor* td) const {
3681  std::vector<const TableDescriptor*> tds;
3682  const ColumnDescriptor* cd;
3683  {
3684  cat_read_lock read_lock(this);
3685 
3686  const auto it = deletedColumnPerTable_.find(td);
3687  // if not a table that supports delete return nullptr, nothing more to do
3688  if (it == deletedColumnPerTable_.end()) {
3689  return nullptr;
3690  }
3691  cd = it->second;
3692  tds = getPhysicalTablesDescriptors(td, false);
3693  }
3694  // individual tables are still protected by higher level locks
3695  for (auto tdd : tds) {
3696  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3697  return cd;
3698  }
3699  }
3700  // no deletes so far recorded in metadata
3701  return nullptr;
3702 }
3703 
3704 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3705  cat_write_lock write_lock(this);
3706  setDeletedColumnUnlocked(td, cd);
3707 }
3708 
3709 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3710  const ColumnDescriptor* cd) {
3711  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3712  CHECK(it_ok.second);
3713 }
3714 
3715 namespace {
3716 
// get_foreign_col (called as get_foreign_col(*this, shared_dict_def) in
// addReferenceToForeignDict): looks up the foreign table named by shared_dict_def
// (CHECK-fails if it is missing) and returns the descriptor of its foreign column, as
// given by cat.getMetadataForColumn.
// NOTE(review): the signature line (source line 3717) is missing from this listing.
3718  const Catalog& cat,
3719  const Parser::SharedDictionaryDef& shared_dict_def) {
3720  const auto& table_name = shared_dict_def.get_foreign_table();
3721  const auto td = cat.getMetadataForTable(table_name, false);
3722  CHECK(td);
3723  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3724  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3725 }
3726 
3727 } // namespace
3728 
// Makes referencing_column share the dictionary of the column named by shared_dict_def:
// - copies the foreign column's type, and with it the dictionary id held in comp_param;
// - increments the in-memory refcount of that dictionary;
// - when persist_reference is true, also increments the refcount stored in
//   mapd_dictionaries.
// Runs under the catalog write lock.
// NOTE(review): source line 3744 inside the persist branch is missing from this
// listing (presumably a sqlite lock) — verify against the full file.
3729 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3730  Parser::SharedDictionaryDef shared_dict_def,
3731  const bool persist_reference) {
3732  cat_write_lock write_lock(this);
3733  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3734  CHECK(foreign_ref_col);
3735  referencing_column.columnType = foreign_ref_col->columnType;
3736  const int dict_id = referencing_column.columnType.get_comp_param();
3737  const DictRef dict_ref(currentDB_.dbId, dict_id);
3738  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3739  CHECK(dictIt != dictDescriptorMapByRef_.end());
3740  const auto& dd = dictIt->second;
3741  CHECK_GE(dd->refcount, 1);
3742  ++dd->refcount;
3743  if (persist_reference) {
3745  sqliteConnector_.query_with_text_params(
3746  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3747  {std::to_string(dict_id)});
3748  }
3749 }
3750 
// If cd is a referencing column in shared_dict_defs, points cd at the shared
// dictionary, bumps that dictionary's refcount and returns true. Returns false when
// shared_dict_defs is empty or no definition names cd.
// The first matching definition is handled in one of three cases:
//  1. The foreign table is td itself ("Dictionaries are being shared in table to be
//     created"). cd copies the referenced column's type from cdd. The dictionary is
//     then either found in dds, in which case its refcount is bumped (and persisted
//     unless td is temporary), or resolved through compress_reference_path to the
//     root definition.
//  2. The foreign table is temporary. This is only allowed when td is also temporary,
//     and the reference is not persisted.
//  3. Otherwise the foreign dictionary is referenced, with the refcount persisted
//     unless td is temporary.
// NOTE(review): source line 3758 is missing from this listing (presumably a sqlite
// lock) — verify against the full file.
3751 bool Catalog::setColumnSharedDictionary(
3752  ColumnDescriptor& cd,
3753  std::list<ColumnDescriptor>& cdd,
3754  std::list<DictDescriptor>& dds,
3755  const TableDescriptor td,
3756  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3757  cat_write_lock write_lock(this);
3759 
3760  if (shared_dict_defs.empty()) {
3761  return false;
3762  }
3763  for (const auto& shared_dict_def : shared_dict_defs) {
3764  // check if the current column is a referencing column
3765  const auto& column = shared_dict_def.get_column();
3766  if (cd.columnName == column) {
3767  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3768  // Dictionaries are being shared in table to be created
3769  const auto& ref_column = shared_dict_def.get_foreign_column();
// NOTE(review): this predicate and the one below take ColumnDescriptor /
// DictDescriptor by value, copying each element; const& would avoid the copies.
3770  auto colIt =
3771  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3772  return !ref_column.compare(it.columnName);
3773  });
3774  CHECK(colIt != cdd.end());
3775  cd.columnType = colIt->columnType;
3776 
3777  const int dict_id = colIt->columnType.get_comp_param();
3778  CHECK_GE(dict_id, 1);
3779  auto dictIt = std::find_if(
3780  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3781  return it.dictRef.dbId == this->currentDB_.dbId &&
3782  it.dictRef.dictId == dict_id;
3783  });
3784  if (dictIt != dds.end()) {
3785  // There exists dictionary definition of a dictionary column
3786  CHECK_GE(dictIt->refcount, 1);
3787  ++dictIt->refcount;
3788  if (!table_is_temporary(&td)) {
3789  // Persist reference count
3790  sqliteConnector_.query_with_text_params(
3791  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3792  {std::to_string(dict_id)});
3793  }
3794  } else {
3795  // The dictionary is referencing a column which is referencing a column in
3796  // a different table
3797  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3798  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3799  }
3800  } else {
3801  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3802  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3803  if (table_is_temporary(foreign_td)) {
3804  if (!table_is_temporary(&td)) {
3805  throw std::runtime_error(
3806  "Only temporary tables can share dictionaries with other temporary "
3807  "tables.");
3808  }
3809  addReferenceToForeignDict(cd, shared_dict_def, false);
3810  } else {
3811  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3812  }
3813  }
3814  return true;
3815  }
3816  }
3817  return false;
3818 }
3819 
// Creates the dictionary descriptor for column cd, appends it to dds, and stores the
// dictionary id in cd's comp_param.
// For a logical table:
//  - a mapd_dictionaries row is inserted under the placeholder name 'Initial_key';
//  - its generated dictid is read back;
//  - the row is renamed to <table>_<column>_dict<id>, with folder
//    <g_base_path>/<data dir>/DB_<db>_DICT_<id>.
// Otherwise (presumably a physical shard) the dummy id 0, the name "Initial_key" and
// an empty folder path are used.
// NOTE(review): on entry, cd's comp_param appears to hold the bit width written to
// the nbits column (inferred from the INSERT) — confirm.
// NOTE(review): source lines 3831, 3850 and 3857 are missing from this listing —
// verify against the full file.
3820 void Catalog::setColumnDictionary(ColumnDescriptor& cd,
3821  std::list<DictDescriptor>& dds,
3822  const TableDescriptor& td,
3823  bool is_logical_table,
3824  bool use_temp_dictionary) {
3825  cat_write_lock write_lock(this);
3826 
3827  std::string dictName{"Initial_key"};
3828  int dictId{0};
3829  std::string folderPath;
3830  if (is_logical_table) {
3832 
// Insert under a placeholder name so the generated dictid can be read back and then
// used to build the final dictionary name.
3833  sqliteConnector_.query_with_text_params(
3834  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3835  "?, 1)",
3836  std::vector<std::string>{
3837  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
3838  sqliteConnector_.query_with_text_param(
3839  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3840  dictId = sqliteConnector_.getData<int>(0, 0);
3841  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3842  sqliteConnector_.query_with_text_param(
3843  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3844  folderPath = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
3845  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
3846  }
3847  DictDescriptor dd(currentDB_.dbId,
3848  dictId,
3849  dictName,
3851  false,
3852  1,
3853  folderPath,
3854  use_temp_dictionary);
3855  dds.push_back(dd);
3856  if (!cd.columnType.is_array()) {
3858  }
3859  cd.columnType.set_comp_param(dictId);
3860 }
3861 
3862 void Catalog::createShardedTable(
3863  TableDescriptor& td,
3864  const list<ColumnDescriptor>& cols,
3865  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3866  /* create logical table */
3867  TableDescriptor* tdl = &td;
3868  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3869  int32_t logical_tb_id = tdl->tableId;
3870  std::string logical_table_name = tdl->tableName;
3871 
3872  /* create physical tables and link them to the logical table */
3873  std::vector<int32_t> physicalTables;
3874  for (int32_t i = 1; i <= td.nShards; i++) {
3875  TableDescriptor* tdp = &td;
3876  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3877  tdp->shard = i - 1;
3878  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3879  int32_t physical_tb_id = tdp->tableId;
3880 
3881  /* add physical table to the vector of physical tables */
3882  physicalTables.push_back(physical_tb_id);
3883  }
3884 
3885  if (!physicalTables.empty()) {
3886  cat_write_lock write_lock(this);
3887  /* add logical to physical tables correspondence to the map */
3888  const auto it_ok =
3889  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3890  CHECK(it_ok.second);
3891  /* update sqlite mapd_logical_to_physical in sqlite database */
3892  if (!table_is_temporary(&td)) {
3893  updateLogicalToPhysicalTableMap(logical_tb_id);
3894  }
3895  }
3896 }
3897 
3898 void Catalog::truncateTable(const TableDescriptor* td) {
3899  // truncate all corresponding physical tables
3900  const auto physical_tables = getPhysicalTablesDescriptors(td);
3901  for (const auto table : physical_tables) {
3902  doTruncateTable(table);
3903  }
3904 }
3905 
3906 void Catalog::doTruncateTable(const TableDescriptor* td) {
3907  // must destroy fragmenter before deleteChunks is called.
3908  removeFragmenterForTable(td->tableId);
3909 
3910  const int tableId = td->tableId;
3911  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3912  // assuming deleteChunksWithPrefix is atomic
3913  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3914  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3915 
3916  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3917 
3918  cat_write_lock write_lock(this);
3919  std::unique_ptr<StringDictionaryClient> client;
3920  if (SysCatalog::instance().isAggregator()) {
3921  CHECK(!string_dict_hosts_.empty());
3922  DictRef dict_ref(currentDB_.dbId, -1);
3923  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3924  }
3925  // clean up any dictionaries
3926  // delete all column descriptors for the table
3927  for (const auto& columnDescriptor : columnDescriptorMapById_) {
3928  auto cd = columnDescriptor.second;
3929  if (cd->tableId != td->tableId) {
3930  continue;
3931  }
3932  const int dict_id = cd->columnType.get_comp_param();
3933  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3934  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3935  const DictRef dict_ref(currentDB_.dbId, dict_id);
3936  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3937  CHECK(dictIt != dictDescriptorMapByRef_.end());
3938  const auto& dd = dictIt->second;
3939  CHECK_GE(dd->refcount, 1);
3940  // if this is the only table using this dict reset the dict
3941  if (dd->refcount == 1) {
3942  // close the dictionary
3943  dd->stringDict.reset();
3944  File_Namespace::renameForDelete(dd->dictFolderPath);
3945  if (client) {
3946  client->drop(dd->dictRef);
3947  }
3948  if (!dd->dictIsTemp) {
3949  boost::filesystem::create_directory(dd->dictFolderPath);
3950  }
3951  }
3952 
3953  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
3954  dd->dictName,
3955  dd->dictNBits,
3956  dd->dictIsShared,
3957  dd->refcount,
3958  dd->dictFolderPath,
3959  dd->dictIsTemp);
3960  dictDescriptorMapByRef_.erase(dictIt);
3961  // now create new Dict -- need to figure out what to do here for temp tables
3962  if (client) {
3963  client->create(new_dd->dictRef, new_dd->dictIsTemp);
3964  }
3965  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
3966  getMetadataForDict(new_dd->dictRef.dictId);
3967  }
3968  }
3969 }
3970 
3971 // NOTE(sy): Only used by --multi-instance clusters.
3972 void Catalog::invalidateCachesForTable(const int table_id) {
3973  // When called, exactly one thread has a LockMgr data or insert lock for the table.
3974  cat_read_lock read_lock(this);
3975  ChunkKey const table_key{getDatabaseId(), table_id};
3976  auto td = getMutableMetadataForTableUnlocked(table_id);
3977  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::GPU_LEVEL);
3978  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::CPU_LEVEL);
3979  DeleteTriggeredCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
3980  // TODO(sy): doTruncateTable() says "destroy fragmenter before deleteChunks is called"
3981  // removeFragmenterForTable(table_key[CHUNK_KEY_TABLE_IDX]);
3982  if (td->fragmenter != nullptr) {
3983  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3984  CHECK(tableDescIt != tableDescriptorMapById_.end());
3985  tableDescIt->second->fragmenter = nullptr;
3986  CHECK(td->fragmenter == nullptr);
3987  }
3988  getDataMgr().getGlobalFileMgr()->closeFileMgr(table_key[CHUNK_KEY_DB_IDX],
3989  table_key[CHUNK_KEY_TABLE_IDX]);
3990  // getMetadataForTable(table_key[CHUNK_KEY_TABLE_IDX], /*populateFragmenter=*/true);
3991  instantiateFragmenter(td);
3992 }
3993 
3994 void Catalog::removeFragmenterForTable(const int table_id) const {
3995  cat_write_lock write_lock(this);
3996  auto td = getMetadataForTable(table_id, false);
3997  if (td->fragmenter != nullptr) {
3998  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3999  CHECK(tableDescIt != tableDescriptorMapById_.end());
4000  tableDescIt->second->fragmenter = nullptr;
4001  CHECK(td->fragmenter == nullptr);
4002  }
4003 }
4004 
4005 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
4006 void Catalog::removeChunks(const int table_id) const {
4007  removeFragmenterForTable(table_id);
4008 
4009  // remove the chunks from in memory structures
4010  ChunkKey chunkKey = {currentDB_.dbId, table_id};
4011 
4012  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
4013  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
4014 }
4015 
// Drops td and, if it is a sharded logical table, all of its physical tables:
//  1. revokes privileges on the table from all grantees;
//  2. erases each table's physical data (eraseTablePhysicalData);
//  3. removes the catalog metadata via deleteTableCatalogMetadata.
// Physical shards are queued before the logical table itself.
// NOTE(review): source line 4018 (the remaining arguments of
// revokeDBObjectPrivilegesFromAll) is missing from this listing.
4016 void Catalog::dropTable(const TableDescriptor* td) {
4017  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
4019  std::vector<const TableDescriptor*> tables_to_drop;
4020  {
4021  cat_read_lock read_lock(this);
4022  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
4023  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4024  // remove all corresponding physical tables if this is a logical table
4025  const auto physicalTables = physicalTableIt->second;
4026  CHECK(!physicalTables.empty());
4027  for (size_t i = 0; i < physicalTables.size(); i++) {
4028  int32_t physical_tb_id = physicalTables[i];
4029  const TableDescriptor* phys_td =
4030  getMutableMetadataForTableUnlocked(physical_tb_id);
4031  CHECK(phys_td);
4032  tables_to_drop.emplace_back(phys_td);
4033  }
4034  }
4035  tables_to_drop.emplace_back(td);
4036  }
4037 
// The read lock is released here; physical data is erased outside the catalog lock.
4038  for (auto table : tables_to_drop) {
4039  eraseTablePhysicalData(table);
4040  }
4041  deleteTableCatalogMetadata(td, tables_to_drop);
4042 }
4043 
// Removes the catalog metadata of a dropped table:
//  - deletes logical_table's mapd_logical_to_physical rows;
//  - erases its in-memory logical -> physical entry;
//  - calls eraseTableMetadata on every descriptor in physical_tables (dropTable
//    passes the shards followed by the logical table itself).
// Runs under the catalog write lock inside one sqlite transaction. On std::exception
// the transaction is rolled back and the exception rethrown.
// NOTE(review): the rollback does not undo the in-memory
// logicalToPhysicalTableMapById_ erase, nor any in-memory changes made by
// eraseTableMetadata.
// NOTE(review): source line 4048 is missing from this listing (presumably a sqlite
// lock).
4044 void Catalog::deleteTableCatalogMetadata(
4045  const TableDescriptor* logical_table,
4046  const std::vector<const TableDescriptor*>& physical_tables) {
4047  cat_write_lock write_lock(this);
4049  sqliteConnector_.query("BEGIN TRANSACTION");
4050  try {
4051  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
4052  sqliteConnector_.query_with_text_param(
4053  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
4054  std::to_string(logical_table->tableId));
4055  logicalToPhysicalTableMapById_.erase(logical_table->tableId);
4056  for (auto table : physical_tables) {
4057  eraseTableMetadata(table);
4058  }
4059  } catch (std::exception& e) {
4060  sqliteConnector_.query("ROLLBACK TRANSACTION");
4061  throw;
4062  }
4063  sqliteConnector_.query("END TRANSACTION");
4064 }
4065 
// Removes one table's metadata, in this order:
//  1. its sqlite rows (executeDropTableSqliteQueries);
//  2. conditionally, its serialized JSON entry (dropTableFromJsonUnlocked);
//  3. a calciteMgr_->updateMetadata notification for its name;
//  4. its in-memory entries, via removeTableFromMap (timed by INJECT_TIMER).
// NOTE(review): the condition guarding dropTableFromJsonUnlocked (source line 4068)
// is missing from this listing.
4066 void Catalog::eraseTableMetadata(const TableDescriptor* td) {
4067  executeDropTableSqliteQueries(td);
4069  dropTableFromJsonUnlocked(td->tableName);
4070  }
4071  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4072  {
4073  INJECT_TIMER(removeTableFromMap_);
4074  removeTableFromMap(td->tableName, td->tableId);
4075  }
4076 }
4077 
// Deletes td's sqlite catalog rows:
//  - its mapd_tables row;
//  - one reference from each dictionary used by its dictionary-encoded columns,
//    deleting dictionaries whose refcount drops to 0;
//  - its mapd_columns rows;
//  - its mapd_views row when td is a view;
//  - conditionally, its omnisci_foreign_tables row.
// NOTE(review): the condition for the omnisci_foreign_tables delete (source line 4106)
// is missing from this listing.
4078 void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
4079  const int tableId = td->tableId;
4080  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
4081  std::to_string(tableId));
4082  sqliteConnector_.query_with_text_params(
4083  "select comp_param from mapd_columns where compression = ? and tableid = ?",
4084  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
// Copy the dictionary ids out first; presumably each following query replaces the
// connector's current result set.
4085  int numRows = sqliteConnector_.getNumRows();
4086  std::vector<int> dict_id_list;
4087  for (int r = 0; r < numRows; ++r) {
4088  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
4089  }
4090  for (auto dict_id : dict_id_list) {
4091  sqliteConnector_.query_with_text_params(
4092  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
4093  std::vector<std::string>{std::to_string(dict_id)});
4094  }
// Must run before the mapd_columns rows are deleted: the subquery reads mapd_columns.
4095  sqliteConnector_.query_with_text_params(
4096  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
4097  "mapd_columns where compression = ? "
4098  "and tableid = ?) and refcount = 0",
4099  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
4100  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
4101  std::to_string(tableId));
4102  if (td->isView) {
4103  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
4104  std::to_string(tableId));
4105  }
4107  sqliteConnector_.query_with_text_param(
4108  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
4109  }
4110 }
4111 
// Renames a single table:
//  1. updates its mapd_tables row inside its own sqlite transaction (rolled back and
//     rethrown on std::exception);
//  2. re-keys tableDescriptorMap_ from the old upper-cased name to the new one;
//  3. sets the descriptor's tableName.
// calciteMgr_->updateMetadata is called before and after the re-keying. If td is the
// descriptor held in the map, the second call sees the new name.
// NOTE(review): source line 4114 is missing from this listing (presumably a sqlite
// lock).
4112 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
4113  cat_write_lock write_lock(this);
4115 
4116  sqliteConnector_.query("BEGIN TRANSACTION");
4117  try {
4118  sqliteConnector_.query_with_text_params(
4119  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4120  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
4121  } catch (std::exception& e) {
4122  sqliteConnector_.query("ROLLBACK TRANSACTION");
4123  throw;
4124  }
4125  sqliteConnector_.query("END TRANSACTION");
4126  TableDescriptorMap::iterator tableDescIt =
4127  tableDescriptorMap_.find(to_upper(td->tableName));
4128  CHECK(tableDescIt != tableDescriptorMap_.end());
4129  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4130  // Get table descriptor to change it
4131  TableDescriptor* changeTd = tableDescIt->second;
4132  changeTd->tableName = newTableName;
4133  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4134  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4135  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4136 }
4137 
4138 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
4139  {
4140  cat_write_lock write_lock(this);
4141  cat_sqlite_lock sqlite_lock(this);
4142  // rename all corresponding physical tables if this is a logical table
4143  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
4144  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4145  const auto physicalTables = physicalTableIt->second;
4146  CHECK(!physicalTables.empty());
4147  for (size_t i = 0; i < physicalTables.size(); i++) {
4148  int32_t physical_tb_id = physicalTables[i];
4149  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4150  CHECK(phys_td);
4151  std::string newPhysTableName =
4152  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
4153  renamePhysicalTable(phys_td, newPhysTableName);
4154  }
4155  }
4156  renamePhysicalTable(td, newTableName);
4157  }
4158  {
4159  DBObject object(newTableName, TableDBObjectType);
4160  // update table name in direct and effective priv map
4161  DBObjectKey key;
4162  key.dbId = currentDB_.dbId;
4163  key.objectId = td->tableId;
4164  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4165  object.setObjectKey(key);
4166  auto objdescs = SysCatalog::instance().getMetadataForObject(
4167  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
4168  for (auto obj : objdescs) {
4169  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4170  if (grnt) {
4171  grnt->renameDbObject(object);
4172  }
4173  }
4174  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4175  }
4176 }
4177 
// Batch rename: issues one mapd_tables UPDATE per (old name, new name) pair in names,
// with tableIds[i] the id for names[i]. It then re-keys tableDescriptorMap_ entries in
// names order and notifies calciteMgr_.
// Unlike the single-table overload, this does not BEGIN/END a transaction itself. On
// std::exception it issues ROLLBACK TRANSACTION and rethrows, so the caller is
// expected to have begun the transaction (the multi-name renameTable issues
// BEGIN TRANSACTION before renaming).
// NOTE(review): both updateMetadata calls pass curTableName (the old name), whereas the
// single-table overload's second call sees the new name — confirm which is intended.
// NOTE(review): entries are re-keyed one at a time. If a new name is still held by an
// entry not yet processed, tableDescriptorMap_[new name] overwrites that slot;
// presumably the caller orders names to avoid this — confirm.
// NOTE(review): source line 4181 is missing from this listing.
4178 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
4179  std::vector<int>& tableIds) {
4180  cat_write_lock write_lock(this);
4182 
4183  // execute the SQL query
4184  try {
4185  for (size_t i = 0; i < names.size(); i++) {
4186  int tableId = tableIds[i];
4187  std::string& newTableName = names[i].second;
4188 
4189  sqliteConnector_.query_with_text_params(
4190  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4191  std::vector<std::string>{newTableName, std::to_string(tableId)});
4192  }
4193  } catch (std::exception& e) {
4194  sqliteConnector_.query("ROLLBACK TRANSACTION");
4195  throw;
4196  }
4197 
4198  // reset the table descriptors, give Calcite a kick
4199  for (size_t i = 0; i < names.size(); i++) {
4200  std::string& curTableName = names[i].first;
4201  std::string& newTableName = names[i].second;
4202 
4203  TableDescriptorMap::iterator tableDescIt =
4204  tableDescriptorMap_.find(to_upper(curTableName));
4205  CHECK(tableDescIt != tableDescriptorMap_.end());
4206  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4207 
4208  // Get table descriptor to change it
4209  TableDescriptor* changeTd = tableDescIt->second;
4210  changeTd->tableName = newTableName;
4211  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4212  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4213  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4214  }
4215 }
4216 
4217 // Collect an 'overlay' mapping of the tableNames->tableId
4218 // to account for possible chained renames
4219 // (for swap: a->b, b->c, c->d, d->a)
4220 
// lookupTableDescriptor (called with this name in renameTable) resolves curTableName
// against the rename overlay first. A cached id of -1 marks a name that has been
// renamed away (replaceTableName writes it), and NULL is returned. Any other cached id
// is looked up by id. Names absent from the overlay fall back to the catalog lookup by
// name.
// NOTE(review): the signature line (source line 4221) is missing from this listing.
4222  std::map<std::string, int>& cachedTableMap,
4223  std::string& curTableName) {
4224  auto iter = cachedTableMap.find(curTableName);
4225  if ((iter != cachedTableMap.end())) {
4226  // get the cached tableId
4227  // and use that to lookup the TableDescriptor
4228  int tableId = (*iter).second;
4229  if (tableId == -1) {
4230  return NULL;
4231  } else {
4232  return cat->getMetadataForTable(tableId);
4233  }
4234  }
4235 
4236  // else ... lookup in standard location
4237  return cat->getMetadataForTable(curTableName);
4238 }
4239 
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  // Records one rename in the overlay used by renameTable. The old name is tombstoned
  // with -1 (lookupTableDescriptor treats -1 as "no such table"), and the new name is
  // mapped to tableId. If both names are equal, the new mapping wins.
  // References into a std::map stay valid across later insertions.
  int& old_name_slot = cachedTableMap[curTableName];
  old_name_slot = -1;
  int& new_name_slot = cachedTableMap[newTableName];
  new_name_slot = tableId;
}
4250 
// Rename a batch of tables (and any physical tables mapped to them in
// logicalToPhysicalTableMapById_) inside a single sqlite transaction, then
// propagate the new names to SysCatalog privilege objects.
// `names` holds (current name, new name) pairs applied in order; chained
// renames (e.g. a swap a->tmp, b->a, tmp->b) are resolved against earlier
// pairs through the cachedTableMap overlay.
4251 void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
4252  // tableId of all tables being renamed
4253  // ... in matching order to 'names'
4254  std::vector<int> tableIds;
4255 
4256  // (sorted & unique) list of tables ids for locking
4257  // (with names index of src in case of error)
4258  // <tableId, strIndex>
4259  // std::map is by definition/implementation sorted
4260  // std::map current usage below tests to avoid over-write
4261  std::map<int, size_t> uniqueOrderedTableIds;
4262 
4263  // mapping of modified tables names -> tableId
4264  std::map<std::string, int> cachedTableMap;
4265 
4266  // -------- Setup --------
4267 
4268  // gather tableIds pre-execute; build maps
4269  for (size_t i = 0; i < names.size(); i++) {
4270  std::string& curTableName = names[i].first;
4271  std::string& newTableName = names[i].second;
4272 
4273  // make sure the table being renamed exists,
4274  // or will exist when executed in 'name' order
4275  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
4276  CHECK(td);
4277 
4278  tableIds.push_back(td->tableId);
4279  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
4280  // don't overwrite as it should map to the first names index 'i'
4281  uniqueOrderedTableIds[td->tableId] = i;
4282  }
4283  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
4284  }
4285 
4286  CHECK_EQ(tableIds.size(), names.size());
4287 
4288  // The outer Stmt created a write lock before calling the catalog rename table
4289  // -> TODO: might want to sort out which really should set the lock :
4290  // the comment in the outer scope indicates it should be in here
4291  // but it's not clear if the access done there *requires* it out there
4292  //
4293  // Lock tables pre-execute (may/will be in different order than rename occurs)
4294  // const auto execute_write_lock = heavyai::unique_lock<heavyai::shared_mutex>(
4295  // *legacylockmgr::LockMgr<heavyai::shared_mutex, bool>::getMutex(
4296  // legacylockmgr::ExecutorOuterLock, true));
4297 
4298  // acquire the locks for all tables being renamed
// Locks are taken in ascending tableId order (std::map iteration order),
// presumably so concurrent batches lock in a consistent order — confirm.
// Each table is locked by the name at its first index in `names`, which is
// its name before this batch (a later occurrence would resolve via the overlay).
4300  for (auto& idPair : uniqueOrderedTableIds) {
4301  std::string& tableName = names[idPair.second].first;
4302  tableLocks.emplace_back(
4305  *this, tableName, false)));
4306  }
4307 
4308  // -------- Rename --------
4309 
4310  {
4311  cat_write_lock write_lock(this);
4313 
// NOTE(review): nothing in this scope issues ROLLBACK if renamePhysicalTable
// throws between BEGIN and END; confirm renamePhysicalTable rolls back itself.
4314  sqliteConnector_.query("BEGIN TRANSACTION");
4315 
4316  // collect all (tables + physical tables) into a single list
4317  std::vector<std::pair<std::string, std::string>> allNames;
4318  std::vector<int> allTableIds;
4319 
4320  for (size_t i = 0; i < names.size(); i++) {
4321  int tableId = tableIds[i];
4322  std::string& curTableName = names[i].first;
4323  std::string& newTableName = names[i].second;
4324 
4325  // rename all corresponding physical tables if this is a logical table
// Physical table k (0-based) is renamed to generatePhysicalTableName(new, k + 1).
4326  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
4327  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4328  const auto physicalTables = physicalTableIt->second;
4329  CHECK(!physicalTables.empty());
4330  for (size_t k = 0; k < physicalTables.size(); k++) {
4331  int32_t physical_tb_id = physicalTables[k];
4332  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4333  CHECK(phys_td);
4334  std::string newPhysTableName =
4335  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
4336  allNames.emplace_back(phys_td->tableName, newPhysTableName);
4337  allTableIds.push_back(phys_td->tableId);
4338  }
4339  }
4340  allNames.emplace_back(curTableName, newTableName);
4341  allTableIds.push_back(tableId);
4342  }
4343  // rename all tables in one shot
4344  renamePhysicalTable(allNames, allTableIds);
4345 
4346  sqliteConnector_.query("END TRANSACTION");
4347  // cat write/sqlite locks are released when they go out of scope
4348  }
4349  {
4350  // now update the SysCatalog
// The catalog transaction has already committed and the catalog write lock
// has been released by this point; privilege objects are renamed afterwards.
4351  for (size_t i = 0; i < names.size(); i++) {
4352  int tableId = tableIds[i];
4353  std::string& newTableName = names[i].second;
4354  {
4355  // update table name in direct and effective priv map
4356  DBObjectKey key;
4357  key.dbId = currentDB_.dbId;
4358  key.objectId = tableId;
4359  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4360 
4361  DBObject object(newTableName, TableDBObjectType);
4362  object.setObjectKey(key);
4363 
4364  auto objdescs = SysCatalog::instance().getMetadataForObject(
4365  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
4366  for (auto obj : objdescs) {
4367  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4368  if (grnt) {
4369  grnt->renameDbObject(object);
4370  }
4371  }
4372  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4373  }
4374  }
4375  }
4376 
4377  // -------- Cleanup --------
4378 
4379  // table locks are released when 'tableLocks' goes out of scope
4380 }
4381 
// Rename column `cd` of table `td` to newColumnName, together with the
// physical columns that follow it (columnIds cd->columnId + 1 ..
// cd->columnId + get_physical_cols()). Names are rewritten first in sqlite
// (mapd_columns, inside a transaction that is rolled back on error), then in
// the in-memory columnDescriptorMap_, which is re-keyed under the new names.
// Each name is rewritten by replacing its leading cd->columnName.size()
// characters with newColumnName, so any suffix after the base name is kept.
4382 void Catalog::renameColumn(const TableDescriptor* td,
4383  const ColumnDescriptor* cd,
4384  const string& newColumnName) {
4385  cat_write_lock write_lock(this);
4387  sqliteConnector_.query("BEGIN TRANSACTION");
4388  try {
4389  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4390  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4391  CHECK(cdx);
4392  std::string new_column_name = cdx->columnName;
4393  new_column_name.replace(0, cd->columnName.size(), newColumnName);
4394  sqliteConnector_.query_with_text_params(
4395  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4396  std::vector<std::string>{new_column_name,
4397  std::to_string(td->tableId),
4398  std::to_string(cdx->columnId)});
4399  }
4400  } catch (std::exception& e) {
4401  sqliteConnector_.query("ROLLBACK TRANSACTION");
4402  throw;
4403  }
4404  sqliteConnector_.query("END TRANSACTION");
4405  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
// In-memory update: move each affected descriptor to its new map key.
// NOTE(review): if `cd` is the same object as the catalog's descriptor for
// cd->columnId (e.g. the caller obtained it via getMetadataForColumn), the
// i == 0 iteration renames cd itself. For i >= 1, cd->columnName.size() is then
// the NEW name's length, so physical column names are rewritten incorrectly
// when old and new names differ in length. Capturing the old length before this
// loop would avoid that — confirm against callers.
4406  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4407  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4408  CHECK(cdx);
4409  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4410  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
4411  CHECK(columnDescIt != columnDescriptorMap_.end());
4412  ColumnDescriptor* changeCd = columnDescIt->second;
4413  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
4414  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
4415  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
4416  changeCd;
4417  }
// Calcite metadata is refreshed again now that the in-memory map is updated.
4418  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4419 }
4420 
// Create or update (keyed by dashboard name + user id) a dashboard row in
// mapd_dashboards, then read back its id and update time into `vd`, add it to
// the in-memory dashboard map, and (outside the catalog locks) create or update
// its dashboard system role unless this is the information-schema db.
// Returns the dashboard id.
4421 int32_t Catalog::createDashboard(DashboardDescriptor& vd) {
4422  cat_write_lock write_lock(this);
4424  sqliteConnector_.query("BEGIN TRANSACTION");
4425  try {
4426  // TODO(andrew): this should be an upsert
4427  sqliteConnector_.query_with_text_params(
4428  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4429  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4430  if (sqliteConnector_.getNumRows() > 0) {
4431  sqliteConnector_.query_with_text_params(
4432  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4433  "update_time = "
4434  "datetime('now') where name = ? "
4435  "and userid = ?",
4436  std::vector<std::string>{vd.dashboardState,
4437  vd.imageHash,
4438  vd.dashboardMetadata,
4439  vd.dashboardName,
4440  std::to_string(vd.userId)});
4441  } else {
4442  sqliteConnector_.query_with_text_params(
4443  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4444  "update_time, "
4445  "userid) "
4446  "VALUES "
4447  "(?,?,?,?, "
4448  "datetime('now'), ?)",
4449  std::vector<std::string>{vd.dashboardName,
4450  vd.dashboardState,
4451  vd.imageHash,
4452  vd.dashboardMetadata,
4453  std::to_string(vd.userId)});
4454  }
4455  } catch (std::exception& e) {
4456  sqliteConnector_.query("ROLLBACK TRANSACTION");
4457  throw;
4458  }
4459  sqliteConnector_.query("END TRANSACTION");
4460 
4461  // now get the auto generated dashboardId
// Read back by the same (name, userid) key used for the insert/update above.
// NOTE(review): this try/catch only rethrows and has no effect.
4462  try {
4463  sqliteConnector_.query_with_text_params(
4464  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4465  "WHERE name = ? and userid = ?",
4466  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4467  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
4468  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4469  } catch (std::exception& e) {
4470  throw;
4471  }
4473  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4474  addFrontendViewToMap(vd);
// Locks are released before the system-role update below.
4475  sqlite_lock.unlock();
4476  write_lock.unlock();
4477  if (!isInfoSchemaDb()) {
4478  // NOTE(wamsi): Transactionally unsafe
4479  createOrUpdateDashboardSystemRole(
4481  }
4482  return vd.dashboardId;
4483 }
4484 
// Overwrite the dashboard with id vd.dashboardId (name, state, image hash,
// metadata, owner) in mapd_dashboards, then replace its entry in the in-memory
// dashboard map and refresh vd.updateTime. Throws runtime_error if the id is
// missing from the database or from the map.
// NOTE(review): the sqlite transaction is committed before the map lookup; a
// "does not exist in map" failure leaves the database already updated.
4485 void Catalog::replaceDashboard(DashboardDescriptor& vd) {
4486  cat_write_lock write_lock(this);
4488 
4489  CHECK(sqliteConnector_.getSqlitePtr());
4490  sqliteConnector_.query("BEGIN TRANSACTION");
4491  try {
4492  sqliteConnector_.query_with_text_params(
4493  "SELECT id FROM mapd_dashboards WHERE id = ?",
4494  std::vector<std::string>{std::to_string(vd.dashboardId)});
4495  if (sqliteConnector_.getNumRows() > 0) {
4496  sqliteConnector_.query_with_text_params(
4497  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4498  "?, userid = ?, update_time = datetime('now') where id = ? ",
4499  std::vector<std::string>{vd.dashboardName,
4500  vd.dashboardState,
4501  vd.imageHash,
4502  vd.dashboardMetadata,
4503  std::to_string(vd.userId),
4505  } else {
4506  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4507  << " does not exist in db";
4508  throw runtime_error("Error replacing dashboard id " +
4509  std::to_string(vd.dashboardId) + " does not exist in db");
4510  }
4511  } catch (std::exception& e) {
4512  sqliteConnector_.query("ROLLBACK TRANSACTION");
4513  throw;
4514  }
4515  sqliteConnector_.query("END TRANSACTION");
4516 
// Find the existing map entry by id (the map is keyed by "userId:name", and the
// old name/owner may differ from vd) and remove it. `descp` is a by-value copy
// of the entry; the loop breaks immediately after erasing.
4517  bool found{false};
4518  for (auto descp : dashboardDescriptorMap_) {
4519  auto dash = descp.second.get();
4520  if (dash->dashboardId == vd.dashboardId) {
4521  found = true;
4522  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
4523  dash->dashboardName);
4524  if (viewDescIt ==
4525  dashboardDescriptorMap_.end()) { // check to make sure view exists
4526  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4527  << " dashboard " << dash->dashboardName << " does not exist in map";
4528  throw runtime_error("No metadata for dashboard for user " +
4529  std::to_string(dash->userId) + " dashboard " +
4530  dash->dashboardName + " does not exist in map");
4531  }
4532  dashboardDescriptorMap_.erase(viewDescIt);
4533  break;
4534  }
4535  }
4536  if (!found) {
4537  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4538  << " does not exist in map";
4539  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4540  " does not exist in map");
4541  }
4542 
4543  // now reload the object
// Only update_time (column 1) is read back into vd.
4544  sqliteConnector_.query_with_text_params(
4545  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4546  "mapd_dashboards "
4547  "WHERE id = ?",
4548  std::vector<std::string>{std::to_string(vd.dashboardId)});
4549  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
4551  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4552  addFrontendViewToMapNoLock(vd);
// Locks are released before the system-role update below.
4553  sqlite_lock.unlock();
4554  write_lock.unlock();
4555  if (!isInfoSchemaDb()) {
4556  // NOTE(wamsi): Transactionally unsafe
4557  createOrUpdateDashboardSystemRole(
4559  }
4560 }
4561 
4562 std::string Catalog::calculateSHA1(const std::string& data) {
4563  boost::uuids::detail::sha1 sha1;
4564  unsigned int digest[5];
4565  sha1.process_bytes(data.c_str(), data.length());
4566  sha1.get_digest(digest);
4567  std::stringstream ss;
4568  for (size_t i = 0; i < 5; i++) {
4569  ss << std::hex << digest[i];
4570  }
4571  return ss.str();
4572 }
4573 
// Create (or touch) a shareable link for a view state. The link is the first 8
// characters of calculateSHA1(viewState + viewMetadata + userId). An existing
// (link, userid) row only has its update_time refreshed; otherwise a new row is
// inserted. ld.link, ld.linkId and ld.updateTime are filled in, the link is
// added to the in-memory map, and ld.link is returned.
// NOTE(review): `min_length` is not used; the link length is always 8.
4574 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4575  cat_write_lock write_lock(this);
4577  sqliteConnector_.query("BEGIN TRANSACTION");
4578  try {
4579  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4580  .substr(0, 8);
4581  sqliteConnector_.query_with_text_params(
4582  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4583  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4584  if (sqliteConnector_.getNumRows() > 0) {
4585  sqliteConnector_.query_with_text_params(
4586  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4587  "link = ?",
4588  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4589  } else {
4590  sqliteConnector_.query_with_text_params(
4591  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4592  "update_time) VALUES (?,?,?,?, datetime('now'))",
4593  std::vector<std::string>{
4594  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4595  }
4596  // now get the auto generated linkid
// NOTE(review): unlike the lookup above, this read-back filters on link only
// (not userid); if two users ever produce the same 8-char link, row 0 may
// belong to the other user — confirm whether userid should be added.
4597  sqliteConnector_.query_with_text_param(
4598  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4599  "WHERE link = ?",
4600  ld.link);
4601  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4602  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4603  } catch (std::exception& e) {
4604  sqliteConnector_.query("ROLLBACK TRANSACTION");
4605  throw;
4606  }
4607  sqliteConnector_.query("END TRANSACTION");
4608  addLinkToMap(ld);
4609  return ld.link;
4610 }
4611 
4612 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4613  const TableDescriptor* td) const {
4614  cat_read_lock read_lock(this);
4615 
4616  const auto column_descriptors =
4617  getAllColumnMetadataForTable(td->tableId, false, true, true);
4618 
4619  const ColumnDescriptor* shard_cd{nullptr};
4620  int i = 1;
4621  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4622  ++cd_itr, ++i) {
4623  if (i == td->shardedColumnId) {
4624  shard_cd = *cd_itr;
4625  }
4626  }
4627  return shard_cd;
4628 }
4629 
4630 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4631  const TableDescriptor* logical_table_desc,
4632  bool populate_fragmenter) const {
4633  cat_read_lock read_lock(this);
4634  const auto physicalTableIt =
4635  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4636  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4637  return {logical_table_desc};
4638  }
4639  const auto physicalTablesIds = physicalTableIt->second;
4640  CHECK(!physicalTablesIds.empty());
4641  read_lock.unlock();
4642  std::vector<const TableDescriptor*> physicalTables;
4643  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4644  physicalTables.push_back(
4645  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4646  }
4647  return physicalTables;
4648 }
4649 
4650 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4651  const {
4652  cat_read_lock read_lock(this);
4653  std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
4654  table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4655  for (const auto [table_id, td] : tableDescriptorMapById_) {
4656  // Only include ids for physical persisted tables
4657  if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4658  logicalToPhysicalTableMapById_.find(table_id) ==
4659  logicalToPhysicalTableMapById_.end()) {
4660  table_and_shard_ids.emplace_back(table_id, td->shard);
4661  }
4662  }
4663  return table_and_shard_ids;
4664 }
4665 
4666 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4667  cat_read_lock read_lock(this);
4668 
4669  std::map<int, const ColumnDescriptor*> mapping;
4670 
4671  const auto tables = getAllTableMetadata();
4672  for (const auto td : tables) {
4673  if (td->shard >= 0) {
4674  // skip shards, they're not standalone tables
4675  continue;
4676  }
4677 
4678  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4679  const auto& ti = cd->columnType;
4680  if (ti.is_string()) {
4681  if (ti.get_compression() == kENCODING_DICT) {
4682  // if foreign reference, get referenced tab.col
4683  const auto dict_id = ti.get_comp_param();
4684 
4685  // ignore temp (negative) dictionaries
4686  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4687  mapping[dict_id] = cd;
4688  }
4689  }
4690  }
4691  }
4692  }
4693 
4694  return mapping;
4695 }
4696 
// Returns true if `td` should be listed for `user_metadata`: it is not a shard,
// it matches get_tables_type (GET_PHYSICAL_TABLES excludes views, GET_VIEWS
// keeps only views, anything else keeps both), and the user has at least one
// privilege on it according to SysCatalog::hasAnyPrivileges.
4697 bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4698  const UserMetadata& user_metadata,
4699  const GetTablesType get_tables_type) const {
4700  if (td->shard >= 0) {
4701  // skip shards, they're not standalone tables
4702  return false;
4703  }
4704  switch (get_tables_type) {
4705  case GET_PHYSICAL_TABLES: {
4706  if (td->isView) {
4707  return false;
4708  }
4709  break;
4710  }
4711  case GET_VIEWS: {
4712  if (!td->isView) {
4713  return false;
4714  }
4715  break;
4716  }
4717  default:
4718  break;
4719  }
4721  dbObject.loadKey(*this);
4722  std::vector<DBObject> privObjects = {dbObject};
4723  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4724  // skip table, as there are no privileges to access it
4725  return false;
4726  }
4727  return true;
4728 }
4729 
4730 std::vector<std::string> Catalog::getTableNamesForUser(
4731  const UserMetadata& user_metadata,
4732  const GetTablesType get_tables_type) const {
4733  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4734  cat_read_lock read_lock(this);
4735  std::vector<std::string> table_names;
4736  const auto tables = getAllTableMetadata();
4737  for (const auto td : tables) {
4738  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4739  table_names.push_back(td->tableName);
4740  }
4741  }
4742  return table_names;
4743 }
4744 
4745 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4746  const UserMetadata& user_metadata,
4747  const GetTablesType get_tables_type,
4748  const std::string& filter_table_name) const {
4749  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4750  cat_read_lock read_lock(this);
4751 
4752  std::vector<TableMetadata> tables_metadata;
4753  const auto tables = getAllTableMetadata();
4754  for (const auto td : tables) {
4755  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4756  if (!filter_table_name.empty()) {
4757  if (td->tableName != filter_table_name) {
4758  continue;
4759  }
4760  }
4761  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4762  // descriptor outside catalog lock
4763  tables_metadata.emplace_back(table_metadata);
4764  }
4765  }
4766  return tables_metadata;
4767 }
4768 
4769 int Catalog::getLogicalTableId(const int physicalTableId) const {
4770  cat_read_lock read_lock(this);
4771  for (const auto& l : logicalToPhysicalTableMapById_) {
4772  if (l.second.end() != std::find_if(l.second.begin(),
4773  l.second.end(),
4774  [&](decltype(*l.second.begin()) tid) -> bool {
4775  return physicalTableId == tid;
4776  })) {
4777  return l.first;
4778  }
4779  }
4780  return physicalTableId;
4781 }
4782 
4783 void Catalog::checkpoint(const int logicalTableId) const {
4784  const auto td = getMetadataForTable(logicalTableId);
4785  const auto shards = getPhysicalTablesDescriptors(td);
4786  for (const auto shard : shards) {
4787  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4788  }
4789 }
4790 
4791 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4792  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4793  try {
4794  checkpoint(logical_table_id);
4795  } catch (...) {
4796  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4797  throw;
4798  }
4799 }
4800 
4801 void Catalog::resetTableEpochFloor(const int logicalTableId) const {
4802  cat_read_lock read_lock(this);
4803  const auto td = getMetadataForTable(logicalTableId, false);
4804  const auto shards = getPhysicalTablesDescriptors(td, false);
4805  for (const auto shard : shards) {
4806  getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
4807  }
4808 }
4809 
4810 void Catalog::eraseDbMetadata() {
4811  const auto tables = getAllTableMetadata();
4812  for (const auto table : tables) {
4813  eraseTableMetadata(table);
4814  }
4815  // Physically erase database metadata
4816  boost::filesystem::remove(basePath_ + "/" + shared::kCatalogDirectoryName + "/" +
4817  currentDB_.dbName);
4818  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4819 }
4820 
4821 void Catalog::eraseDbPhysicalData() {
4822  const auto tables = getAllTableMetadata();
4823  for (const auto table : tables) {
4824  eraseTablePhysicalData(table);
4825  }
4826 }
4827 
4828 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
4829  const int tableId = td->tableId;
4830  // must destroy fragmenter before deleteChunks is called.
4831  removeFragmenterForTable(tableId);
4832 
4833  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4834  {
4835  INJECT_TIMER(deleteChunksWithPrefix);
4836  // assuming deleteChunksWithPrefix is atomic
4837  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4838  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4839  }
4840  if (!td->isView) {
4841  INJECT_TIMER(Remove_Table);
4842  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4843  }
4844 }
4845 
4846 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4847  const int32_t& shardNumber) {
4848  std::string physicalTableName =
4849  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4850  return (physicalTableName);
4851 }
4852 
// Loads every row of omnisci_foreign_servers into foreignServerMap_ (keyed by
// name) and foreignServerMapById_ (keyed by id); both maps share the same
// ForeignServer object per row. Takes no catalog locks itself ("Unlocked").
4853 void Catalog::buildForeignServerMapUnlocked() {
4855  sqliteConnector_.query(
4856  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4857  "omnisci_foreign_servers");
4858  auto num_rows = sqliteConnector_.getNumRows();
4859 
4860  for (size_t row = 0; row < num_rows; row++) {
// NOTE(review): creation_time (column 5) is read as a 32-bit integer; confirm
// this matches the ForeignServer constructor's time type.
4861  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4862  sqliteConnector_.getData<int>(row, 0),
4863  sqliteConnector_.getData<std::string>(row, 1),
4864  sqliteConnector_.getData<std::string>(row, 2),
4865  sqliteConnector_.getData<std::string>(row, 3),
4866  sqliteConnector_.getData<std::int32_t>(row, 4),
4867  sqliteConnector_.getData<std::int32_t>(row, 5));
4868  foreignServerMap_[foreign_server->name] = foreign_server;
4869  foreignServerMapById_[foreign_server->id] = foreign_server;
4870  }
4871 }
4872 
// For each row of omnisci_foreign_tables, attach the foreign server, options and
// refresh times to the already-loaded ForeignTable descriptor with that id.
// CHECK-fails if the table id is unknown, is not a ForeignTable, or references
// an unknown server. Takes no catalog locks itself ("Unlocked").
4873 void Catalog::updateForeignTablesInMapUnlocked() {
4875  sqliteConnector_.query(
4876  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4877  "omnisci_foreign_tables");
4878  auto num_rows = sqliteConnector_.getNumRows();
4879  for (size_t r = 0; r < num_rows; r++) {
4880  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4881  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4882  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
4883  const auto last_refresh_time = sqliteConnector_.getData<int64_t>(r, 3);
4884  const auto next_refresh_time = sqliteConnector_.getData<int64_t>(r, 4);
4885 
4886  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4887  auto foreign_table =
4888  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4889  CHECK(foreign_table);
// NOTE(review): operator[] inserts an empty entry for an unknown server_id
// before the CHECK below fires.
4890  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4891  CHECK(foreign_table->foreign_server);
4892  foreign_table->populateOptionsMap(options);
4893  foreign_table->last_refresh_time = last_refresh_time;
4894  foreign_table->next_refresh_time = next_refresh_time;
4895  if (foreign_table->is_system_table) {
4896  foreign_table->is_in_memory_system_table =
4898  foreign_table->foreign_server->data_wrapper_type);
4899  }
4900  }
4901 }
4902 
// Sets column `property` to `value` for the foreign server named server_name in
// omnisci_foreign_servers. Throws std::runtime_error if no such server exists.
// NOTE(review): `property` is concatenated into the SQL text (only `value` and
// the id are bound), so it must be a trusted column name, never user input.
4903 void Catalog::setForeignServerProperty(const std::string& server_name,
4904  const std::string& property,
4905  const std::string& value) {
4907  sqliteConnector_.query_with_text_params(
4908  "SELECT id from omnisci_foreign_servers where name = ?",
4909  std::vector<std::string>{server_name});
4910  auto num_rows = sqliteConnector_.getNumRows();
4911  if (num_rows > 0) {
4912  CHECK_EQ(size_t(1), num_rows);
4913  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4914  sqliteConnector_.query_with_text_params(
4915  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4916  std::vector<std::string>{value, std::to_string(server_id)});
4917  } else {
4918  throw std::runtime_error{"Can not change property \"" + property +
4919  "\" for foreign server." + " Foreign server \"" +
4920  server_name + "\" is not found."};
4921  }
4922 }
4923 
// Registers the built-in local foreign servers "default_local_delimited",
// "default_local_parquet" (only when ENABLE_IMPORT_PARQUET is defined) and
// "default_local_regex_parsed". Each server is validated before being passed to
// createForeignServerNoLocks with `true` (presumably "if not exists", per the
// function name — confirm).
4924 void Catalog::createDefaultServersIfNotExists() {
4929 
4930  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
4931  "default_local_delimited",
4933  options,
4935  local_csv_server->validate();
4936  createForeignServerNoLocks(std::move(local_csv_server), true);
4937 
4938 #ifdef ENABLE_IMPORT_PARQUET
4939  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
4940  "default_local_parquet",
4942  options,
4944  local_parquet_server->validate();
4945  createForeignServerNoLocks(std::move(local_parquet_server), true);
4946 #endif
4947 
4948  auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
4949  "default_local_regex_parsed",
4951  options,
4953  local_regex_parser_server->validate();
4954  createForeignServerNoLocks(std::move(local_regex_parser_server), true);
4955 }
4956 
4957 // prepare a fresh file reload on next table access
4958 void Catalog::setForReload(const int32_t tableId) {
4959  const auto td = getMetadataForTable(tableId);
4960  for (const auto shard : getPhysicalTablesDescriptors(td)) {
4961  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
4962  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
4963  }
4964 }
4965 
4966 // get a table's data dirs
4967 std::vector<std::string> Catalog::getTableDataDirectories(
4968  const TableDescriptor* td) const {
4969  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
4970  std::vector<std::string> file_paths;
4971  for (auto shard : getPhysicalTablesDescriptors(td)) {
4972  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
4973  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
4974  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
4975  file_paths.push_back(file_path.filename().string());
4976  }
4977  return file_paths;
4978 }
4979 
4980 // get a column's dictionary directory: the basename when file_name_only is
// true, otherwise the full dictFolderPath. Returns an empty string when the
// column has no dictionary (not a string/string-array column, or comp_param <= 0,
// or the further condition on the next line fails).
// NOTE(review): no catalog lock is taken here; getTableDictDirectoryPaths calls
// this under a read lock — confirm other callers hold one.
4981 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd,
4982  bool file_name_only) const {
4983  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
4985  cd->columnType.get_comp_param() > 0) {
4986  const auto dictId = cd->columnType.get_comp_param();
4987  const DictRef dictRef(currentDB_.dbId, dictId);
4988  const auto dit = dictDescriptorMapByRef_.find(dictRef);
4989  CHECK(dit != dictDescriptorMapByRef_.end());
4990  CHECK(dit->second);
4991  if (file_name_only) {
4992  boost::filesystem::path file_path(dit->second->dictFolderPath);
4993  return file_path.filename().string();
4994  } else {
4995  return dit->second->dictFolderPath;
4996  }
4997  }
4998  return std::string();
4999 }
5000 
5001 // get a table's dict dirs
5002 std::vector<std::string> Catalog::getTableDictDirectories(
5003  const TableDescriptor* td) const {
5004  std::vector<std::string> file_paths;
5005  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
5006  auto file_base = getColumnDictDirectory(cd);
5007  if (!file_base.empty() &&
5008  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
5009  file_paths.push_back(file_base);
5010  }
5011  }
5012  return file_paths;
5013 }
5014 
5015 std::set<std::string> Catalog::getTableDictDirectoryPaths(int32_t table_id) const {
5016  cat_read_lock read_lock(this);
5017  std::set<std::string> directory_paths;
5018  auto it = dict_columns_by_table_id_.find(table_id);
5019  if (it != dict_columns_by_table_id_.end()) {
5020  for (auto cd : it->second) {
5021  auto directory_path = getColumnDictDirectory(cd, false);
5022  if (!directory_path.empty()) {
5023  directory_paths.emplace(directory_path);
5024  }
5025  }
5026  }
5027  return directory_paths;
5028 }
5029 
5030 // returns table schema in a string
5031 // NOTE(sy): Might be able to replace dumpSchema() later with
5032 // dumpCreateTable() after a deeper review of the TableArchiver code.
// Returns a CREATE TABLE statement for `td` that uses the placeholder "@T" for
// the table name (presumably substituted by the caller). System and virtual
// columns are omitted. Columns sharing a dictionary are emitted once with an
// ENCODING clause (the first such column becomes the dictionary root); the
// others become "SHARED DICTIONARY (...) REFERENCES @T(root)" clauses.
// Table options are emitted in a trailing WITH (...) clause.
5033 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
5034  CHECK(!td->is_system_table);
5035  cat_read_lock read_lock(this);
5036 
5037  std::ostringstream os;
5038  os << "CREATE TABLE @T (";
5039  // gather column defines
5040  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
5041  std::string comma;
5042  std::vector<std::string> shared_dicts;
5043  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5044  for (const auto cd : cds) {
5045  if (!(cd->isSystemCol || cd->isVirtualCol)) {
5046  const auto& ti = cd->columnType;
5047  os << comma << cd->columnName;
5048  // CHAR is peculiar... dump it as TEXT (and CHAR arrays as TEXT[])
5049  if (ti.get_type() == SQLTypes::kCHAR) {
5050  os << " "
5051  << "TEXT";
5052  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
5053  os << " "
5054  << "TEXT[]";
5055  } else {
5056  os << " " << ti.get_type_name();
5057  }
5058  os << (ti.get_notnull() ? " NOT NULL" : "");
5059  if (cd->default_value.has_value()) {
5060  os << " DEFAULT " << cd->getDefaultValueLiteral();
5061  }
5062  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
// presumably a byte size; it is multiplied by 8 for the DICT encoding width
5063  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5064  if (ti.get_compression() == kENCODING_DICT) {
5065  // look up the column's dictionary by its id (comp_param)
5066  const auto dict_id = ti.get_comp_param();
5067  const DictRef dict_ref(currentDB_.dbId, dict_id);
5068  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
5069  CHECK(dict_it != dictDescriptorMapByRef_.end());
5070  const auto dict_name = dict_it->second->dictName;
5071  // when migrating a table, any foreign dict ref will be dropped
5072  // and the first cd of a dict will become root of the dict
5073  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
5074  dict_root_cds[dict_name] = cd;
5075  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
5076  } else {
5077  const auto dict_root_cd = dict_root_cds[dict_name];
5078  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
5079  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
5080  // "... shouldn't specify an encoding, it borrows from the referenced
5081  // column"
5082  }
5083  } else {
5084  os << " ENCODING NONE";
5085  }
5086  } else if (ti.is_date_in_days() ||
5087  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
// fixed-width encodings: fall back to 32 when no comp_param is recorded
5088  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5089  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
5090  } else if (ti.is_geometry()) {
5091  if (ti.get_compression() == kENCODING_GEOINT) {
5092  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
5093  << ")";
5094  } else {
5095  os << " ENCODING NONE";
5096  }
5097  }
5098  comma = ", ";
5099  }
5100  }
5101  // gather SHARED DICTIONARYs
5102  if (shared_dicts.size()) {
5103  os << ", " << boost::algorithm::join(shared_dicts, ", ");
5104  }
5105  // gather WITH options ...
5106  std::vector<std::string> with_options;
5107  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
5108  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
5109  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
5110  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
5111  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
5112  : "VACUUM='IMMEDIATE'");
5113  if (!td->partitions.empty()) {
5114  with_options.push_back("PARTITIONS='" + td->partitions + "'");
5115  }
5116  if (td->nShards > 0) {
// SHARD_COUNT is nShards scaled by g_leaf_count (treated as at least 1)
5117  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
5118  CHECK(shard_cd);
5119  os << ", SHARD KEY(" << shard_cd->columnName << ")";
5120  with_options.push_back(
5121  "SHARD_COUNT=" +
5122  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
5123  }
5124  if (td->sortedColumnId > 0) {
5125  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
5126  CHECK(sort_cd);
5127  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
5128  }
5130  td->maxRollbackEpochs != -1) {
5131  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
5133  }
5134  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
5135  return os.str();
5136 }
5137 
5138 #include "Parser/ReservedKeywords.h"
5139 
5141 inline bool contains_spaces(std::string_view str) {
5142  return std::find_if(str.begin(), str.end(), [](const unsigned char& ch) {
5143  return std::isspace(ch);
5144  }) != str.end();
5145 }
5146 
5149  std::string_view str,
5150  std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
5151  return str.find_first_of(chars) != std::string_view::npos;
5152 }
5153 
5155 inline bool is_reserved_sql_keyword(std::string_view str) {
5156  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
5157 }
5158 
5159 // returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE"
5160 std::string Catalog::dumpCreateTable(const TableDescriptor* td,
5161  bool multiline_formatting,
5162  bool dump_defaults) const {
5163  cat_read_lock read_lock(this);
5164  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5165 }
5166 
5167 std::optional<std::string> Catalog::dumpCreateTable(int32_t table_id,
5168  bool multiline_formatting,
5169  bool dump_defaults) const {
5170  cat_read_lock read_lock(this);
5171  const auto td = getMutableMetadataForTableUnlocked(table_id);
5172  if (!td) {
5173  return {};
5174  }
5175  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5176 }
5177 
5178 std::string Catalog::dumpCreateTableUnlocked(const TableDescriptor* td,
5179  bool multiline_formatting,
5180  bool dump_defaults) const {
5181  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
5182  std::ostringstream os;
5183 
5184  if (foreign_table && !td->is_system_table) {
5185  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
5186  } else if (!td->isView) {
5187  os << "CREATE ";
5189  os << "TEMPORARY ";
5190  }
5191  os << "TABLE " + td->tableName + " (";
5192  } else {
5193  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
5194  return os.str();
5195  }
5196  // scan column defines
5197  std::vector<std::string> additional_info;
5198  std::set<std::string> shared_dict_column_names;
5199 
5200  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
5201 
5202  // gather column defines
5203  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
5204  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5205  bool first = true;
5206  for (const auto cd : cds) {
5207  if (!(cd->isSystemCol || cd->isVirtualCol)) {
5208  const auto& ti = cd->columnType;
5209  if (!first) {
5210  os << ",";
5211  if (!multiline_formatting) {
5212  os << " ";
5213  }
5214  } else {
5215  first = false;
5216  }
5217  if (multiline_formatting) {
5218  os << "\n ";
5219  }
5220  // column name
5221  os << quoteIfRequired(cd->columnName);
5222  // CHAR is perculiar... better dump it as TEXT(32) like \d does
5223  if (ti.get_type() == SQLTypes::kCHAR) {
5224  os << " "
5225  << "TEXT";
5226  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
5227  os << " "
5228  << "TEXT[]";
5229  } else {
5230  os << " " << ti.get_type_name();
5231  }
5232  os << (ti.get_notnull() ? " NOT NULL" : "");
5233  if (cd->default_value.has_value()) {
5234  os << " DEFAULT " << cd->getDefaultValueLiteral();
5235  }
5236  if (shared_dict_column_names.find(cd->columnName) ==
5237  shared_dict_column_names.end()) {
5238  // avoids "Column ... shouldn't specify an encoding, it borrows it
5239  // from the referenced column"
5240  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
5241  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5242  if (ti.get_compression() == kENCODING_DICT) {
5243  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
5244  } else {
5245  os << " ENCODING NONE";
5246  }
5247  } else if (ti.is_date_in_days() ||
5248  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
5249  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5250  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
5251  } else if (ti.is_geometry()) {
5252  if (ti.get_compression() == kENCODING_GEOINT) {
5253  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
5254  << ")";
5255  } else {
5256  os << " ENCODING NONE";
5257  }
5258  }
5259  }
5260  }
5261  }
5262  // gather SHARED DICTIONARYs
5263  if (additional_info.size()) {
5264  std::string comma;
5265  if (!multiline_formatting) {
5266  comma = ", ";
5267  } else {
5268  comma = ",\n ";
5269  }
5270  os << comma;
5271  os << boost::algorithm::join(additional_info, comma);
5272  }
5273  os << ")";
5274 
5275  std::vector<std::string> with_options;
5276  if (foreign_table && !td->is_system_table) {
5277  if (multiline_formatting) {
5278  os << "\n";
5279  } else {
5280  os << " ";
5281  }
5282  os << "SERVER " << foreign_table->foreign_server->name;
5283 
5284  // gather WITH options ...
5285  for (const auto& [option, value] : foreign_table->options) {
5286  with_options.emplace_back(option + "='" + value + "'");
5287  }
5288  }
5289 
5290  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
5291  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
5292  }
5293  if (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE) {
5294  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
5295  }
5296  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
5297  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
5298  }
5299  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
5300  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
5301  }
5302  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
5303  td->maxRollbackEpochs != -1) {
5304  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
5306  }
5307  if (!foreign_table && (dump_defaults || !td->hasDeletedCol)) {
5308  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
5309  : "VACUUM='IMMEDIATE'");
5310  }
5311  if (!foreign_table && !td->partitions.empty()) {
5312  with_options.push_back("PARTITIONS='" + td->partitions + "'");
5313  }
5314  if (!foreign_table && td->nShards > 0) {
5315  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
5316  CHECK(shard_cd);
5317  with_options.push_back(
5318  "SHARD_COUNT=" +
5319  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
5320  }
5321  if (!foreign_table && td->sortedColumnId > 0) {
5322  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
5323  CHECK(sort_cd);
5324  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
5325  }
5326 
5327  if (!with_options.empty()) {
5328  if (!multiline_formatting) {
5329  os << " ";
5330  } else {
5331  os << "\n";
5332  }
5333  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
5334  }
5335  os << ";";
5336  return os.str();
5337 }
5338 
5339 std::string Catalog::dumpCreateServer(const std::string& name,
5340  bool multiline_formatting) const {
5341  cat_read_lock read_lock(this);
5342  auto server_it = foreignServerMap_.find(name);
5343  if (server_it == foreignServerMap_.end()) {
5344  throw std::runtime_error("Foreign server " + name + " does not exist.");
5345  }
5346  auto server = server_it->second.get();
5347  std::ostringstream os;
5348  os << "CREATE SERVER " << name << " FOREIGN DATA WRAPPER " << server->data_wrapper_type;
5349  std::vector<std::string> with_options;
5350  for (const auto& [option, value] : server->options) {
5351  with_options.emplace_back(option + "='" + value + "'");
5352  }
5353  if (!with_options.empty()) {
5354  if (!multiline_formatting) {
5355  os << " ";
5356  } else {
5357  os << "\n";
5358  }
5359  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
5360  }
5361  os << ";";
5362  return os.str();
5363 }
5364 
5365 bool Catalog::validateNonExistentTableOrView(const std::string& name,
5366  const bool if_not_exists) {
5367  if (getMetadataForTable(name, false)) {
5368  if (if_not_exists) {
5369  return false;
5370  }
5371  throw std::runtime_error("Table or View with name \"" + name + "\" already exists.");
5372  }
5373  return true;
5374 }
5375 
5376 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh() const {
5377  cat_read_lock read_lock(this);
5378  std::vector<const TableDescriptor*> tables;
5379