OmniSciDB  cde582ebc3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Catalog.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
23 #include "Catalog/Catalog.h"
24 
25 #include <algorithm>
26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/range/adaptor/map.hpp>
29 #include <boost/version.hpp>
30 #include <cassert>
31 #include <cerrno>
32 #include <cstdio>
33 #include <cstring>
34 #include <exception>
35 #include <fstream>
36 #include <list>
37 #include <memory>
38 #include <random>
39 #include <regex>
40 #include <sstream>
41 
42 #if BOOST_VERSION >= 106600
43 #include <boost/uuid/detail/sha1.hpp>
44 #else
45 #include <boost/uuid/sha1.hpp>
46 #endif
47 #include <rapidjson/document.h>
48 #include <rapidjson/istreamwrapper.h>
49 #include <rapidjson/ostreamwrapper.h>
50 #include <rapidjson/writer.h>
51 
52 #include "Catalog/SysCatalog.h"
53 
54 #include "QueryEngine/Execute.h"
56 
63 #include "Fragmenter/Fragmenter.h"
65 #include "LockMgr/LockMgr.h"
68 #include "Parser/ParserNode.h"
69 #include "QueryEngine/Execute.h"
71 #include "RefreshTimeCalculator.h"
72 #include "Shared/DateTimeParser.h"
73 #include "Shared/File.h"
74 #include "Shared/StringTransform.h"
75 #include "Shared/SysDefinitions.h"
76 #include "Shared/measure.h"
77 #include "Shared/misc.h"
79 
80 #include "MapDRelease.h"
81 #include "RWLocks.h"
83 
84 #include "Shared/distributed.h"
85 
86 using Chunk_NS::Chunk;
89 using std::list;
90 using std::map;
91 using std::pair;
92 using std::runtime_error;
93 using std::string;
94 using std::vector;
95 
96 bool g_enable_fsi{true};
97 bool g_enable_s3_fsi{false};
101 // 10 minutes refresh interval by default
103 extern bool g_cache_string_hash;
104 extern bool g_enable_system_tables;
105 
106 // Serialize temp tables to a json file in the Catalogs directory for Calcite parsing
107 // under unit testing.
109 
110 namespace Catalog_Namespace {
111 
112 const int DEFAULT_INITIAL_VERSION = 1; // start at version 1
114  1073741824; // 2^30, give room for over a billion non-temp tables
116  1073741824; // 2^30, give room for over a billion non-temp dictionaries
117 
118 const std::string Catalog::physicalTableNameTag_("_shard_#");
119 
120 thread_local bool Catalog::thread_holds_read_lock = false;
121 
126 
127 // Migration is done as a two-step process: this release
128 // creates and uses the new table;
129 // the next release will remove the old table. This keeps a fallback path
130 // in case of migration failure.
133  sqliteConnector_.query("BEGIN TRANSACTION");
134  try {
136  "SELECT name FROM sqlite_master WHERE type='table' AND name='mapd_dashboards'");
137  if (sqliteConnector_.getNumRows() != 0) {
138  // already done
139  sqliteConnector_.query("END TRANSACTION");
140  return;
141  }
143  "CREATE TABLE mapd_dashboards (id integer primary key autoincrement, name text , "
144  "userid integer references mapd_users, state text, image_hash text, update_time "
145  "timestamp, "
146  "metadata text, UNIQUE(userid, name) )");
147  // now copy content from old table to new table
149  "insert into mapd_dashboards (id, name , "
150  "userid, state, image_hash, update_time , "
151  "metadata) "
152  "SELECT viewid , name , userid, view_state, image_hash, update_time, "
153  "view_metadata "
154  "from mapd_frontend_views");
155  } catch (const std::exception& e) {
156  sqliteConnector_.query("ROLLBACK TRANSACTION");
157  throw;
158  }
159  sqliteConnector_.query("END TRANSACTION");
160 }
161 
162 namespace {
163 
164 inline auto table_json_filepath(const std::string& base_path,
165  const std::string& db_name) {
166  return boost::filesystem::path(base_path + "/" + shared::kCatalogDirectoryName + "/" +
167  db_name + "_temp_tables.json");
168 }
169 
170 std::map<int32_t, std::string> get_user_id_to_user_name_map();
171 } // namespace
172 
/**
 * Constructs the catalog object for one database.
 *
 * The initializer list stores the constructor arguments, opens the SQLite
 * connector on "<basePath>/<catalog directory>/" for curDB.dbName, seeds the
 * temporary table/dictionary id counters and creates two
 * heavyai::DistributedSharedMutex objects whose lockfile paths are derived
 * from basePath_ and currentDB_.dbName. The body then validates the FSI
 * related flags, runs the migrations, builds the in-memory maps, conditionally
 * initializes system objects and finally sets initialized_.
 *
 * NOTE(review): this listing is missing lines 195, 204-205 and 223 of the
 * original file, so some statements below appear without their opening part.
 *
 * @param basePath Base directory of the installation.
 * @param curDB Metadata of the database this catalog represents.
 * @param dataMgr Data manager, stored in dataMgr_.
 * @param string_dict_hosts Stored in string_dict_hosts_.
 * @param calcite Stored in calciteMgr_.
 * @param is_new_db When true, both migration passes are skipped.
 */
173 Catalog::Catalog(const string& basePath,
174  const DBMetadata& curDB,
175  std::shared_ptr<Data_Namespace::DataMgr> dataMgr,
176  const std::vector<LeafHostInfo>& string_dict_hosts,
177  std::shared_ptr<Calcite> calcite,
178  bool is_new_db)
179  : basePath_(basePath)
180  , sqliteConnector_(curDB.dbName, basePath + "/" + shared::kCatalogDirectoryName + "/")
181  , currentDB_(curDB)
182  , dataMgr_(dataMgr)
183  , string_dict_hosts_(string_dict_hosts)
184  , calciteMgr_(calcite)
185  , nextTempTableId_(MAPD_TEMP_TABLE_START_ID)
186  , nextTempDictId_(MAPD_TEMP_DICT_START_ID)
// The lambda is passed as the second constructor argument of the mutex. When
// it is invoked is not visible here - presumably when another instance has
// changed the catalog (TODO confirm against DistributedSharedMutex).
187  , dcatalogMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
188  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
189  shared::kCatalogDirectoryName / (currentDB_.dbName + ".lockfile"),
190  [this](size_t) {
// Ignore callbacks until the constructor body has set initialized_.
191  if (!initialized_) {
192  return;
193  }
194  const auto user_name_by_user_id = get_user_id_to_user_name_map();
// NOTE(review): listing line 195 is absent; the fragment below is the tail of
// a statement that references *dsqliteMutex_ (presumably a lock acquisition).
196  *dsqliteMutex_);
197  reloadCatalogMetadataUnlocked(user_name_by_user_id);
198  }))
199  , dsqliteMutex_(std::make_unique<heavyai::DistributedSharedMutex>(
200  std::filesystem::path(basePath_) / shared::kLockfilesDirectoryName /
201  shared::kCatalogDirectoryName / (currentDB_.dbName + ".sqlite.lockfile")))
202  , sqliteMutex_()
203  , sharedMutex_()
// NOTE(review): listing lines 204-205 are absent; the end of the initializer
// list and the opening brace of the constructor body presumably sit there.
// With FSI disabled, neither system tables nor S3 FSI may be enabled.
206  if (!g_enable_fsi) {
207  CHECK(!g_enable_system_tables) << "System tables require FSI to be enabled";
208  CHECK(!g_enable_s3_fsi) << "S3 FSI requires FSI to be enabled";
209  }
210 
// The pre-buildMaps migrations are skipped for a new database and whenever
// g_multi_instance is set.
211  if (!is_new_db && !g_multi_instance) {
212  CheckAndExecuteMigrations();
213  }
214 
215  buildMaps();
216 
217  if (g_enable_fsi) {
218  createDefaultServersIfNotExists();
219  }
// The post-buildMaps migrations only depend on is_new_db.
220  if (!is_new_db) {
221  CheckAndExecuteMigrationsPostBuildMaps();
222  }
// NOTE(review): listing line 223 is absent; it presumably holds the condition
// that guards the removal of the temp-tables JSON file below.
224  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
225  }
226  conditionallyInitializeSystemObjects();
227  // once all initialized use real object
228  initialized_ = true;
229 }
230 
232  // cat_write_lock write_lock(this);
233 
234  // must clean up heap-allocated TableDescriptor and ColumnDescriptor structs
235  for (TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.begin();
236  tableDescIt != tableDescriptorMap_.end();
237  ++tableDescIt) {
238  tableDescIt->second->fragmenter = nullptr;
239  delete tableDescIt->second;
240  }
241 
242  // TableDescriptorMapById points to the same descriptors. No need to delete
243 
244  for (ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.begin();
245  columnDescIt != columnDescriptorMap_.end();
246  ++columnDescIt) {
247  delete columnDescIt->second;
248  }
249 
250  // ColumnDescriptorMapById points to the same descriptors. No need to delete
251 
253  boost::filesystem::remove(table_json_filepath(basePath_, currentDB_.dbName));
254  }
255 }
256 
259  sqliteConnector_.query("BEGIN TRANSACTION");
260  try {
261  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
262  std::vector<std::string> cols;
263  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
264  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
265  }
266  if (std::find(cols.begin(), cols.end(), std::string("max_chunk_size")) ==
267  cols.end()) {
268  string queryString("ALTER TABLE mapd_tables ADD max_chunk_size BIGINT DEFAULT " +
270  sqliteConnector_.query(queryString);
271  }
272  if (std::find(cols.begin(), cols.end(), std::string("shard_column_id")) ==
273  cols.end()) {
274  string queryString("ALTER TABLE mapd_tables ADD shard_column_id BIGINT DEFAULT " +
275  std::to_string(0));
276  sqliteConnector_.query(queryString);
277  }
278  if (std::find(cols.begin(), cols.end(), std::string("shard")) == cols.end()) {
279  string queryString("ALTER TABLE mapd_tables ADD shard BIGINT DEFAULT " +
280  std::to_string(-1));
281  sqliteConnector_.query(queryString);
282  }
283  if (std::find(cols.begin(), cols.end(), std::string("num_shards")) == cols.end()) {
284  string queryString("ALTER TABLE mapd_tables ADD num_shards BIGINT DEFAULT " +
285  std::to_string(0));
286  sqliteConnector_.query(queryString);
287  }
288  if (std::find(cols.begin(), cols.end(), std::string("key_metainfo")) == cols.end()) {
289  string queryString("ALTER TABLE mapd_tables ADD key_metainfo TEXT DEFAULT '[]'");
290  sqliteConnector_.query(queryString);
291  }
292  if (std::find(cols.begin(), cols.end(), std::string("userid")) == cols.end()) {
293  string queryString("ALTER TABLE mapd_tables ADD userid integer DEFAULT " +
295  sqliteConnector_.query(queryString);
296  }
297  if (std::find(cols.begin(), cols.end(), std::string("sort_column_id")) ==
298  cols.end()) {
300  "ALTER TABLE mapd_tables ADD sort_column_id INTEGER DEFAULT 0");
301  }
302  if (std::find(cols.begin(), cols.end(), std::string("storage_type")) == cols.end()) {
303  string queryString("ALTER TABLE mapd_tables ADD storage_type TEXT DEFAULT ''");
304  sqliteConnector_.query(queryString);
305  }
306  if (std::find(cols.begin(), cols.end(), std::string("max_rollback_epochs")) ==
307  cols.end()) {
308  string queryString("ALTER TABLE mapd_tables ADD max_rollback_epochs INT DEFAULT " +
309  std::to_string(-1));
310  sqliteConnector_.query(queryString);
311  }
312  if (std::find(cols.begin(), cols.end(), std::string("is_system_table")) ==
313  cols.end()) {
314  string queryString("ALTER TABLE mapd_tables ADD is_system_table BOOLEAN DEFAULT 0");
315  sqliteConnector_.query(queryString);
316  }
317  } catch (std::exception& e) {
318  sqliteConnector_.query("ROLLBACK TRANSACTION");
319  throw;
320  }
321  sqliteConnector_.query("END TRANSACTION");
322 }
323 
326  sqliteConnector_.query("BEGIN TRANSACTION");
327  try {
329  "select name from sqlite_master WHERE type='table' AND "
330  "name='mapd_version_history'");
331  if (sqliteConnector_.getNumRows() == 0) {
333  "CREATE TABLE mapd_version_history(version integer, migration_history text "
334  "unique)");
335  } else {
337  "select * from mapd_version_history where migration_history = "
338  "'notnull_fixlen_arrays'");
339  if (sqliteConnector_.getNumRows() != 0) {
340  // legacy fixlen arrays had migrated
341  // no need for further execution
342  sqliteConnector_.query("END TRANSACTION");
343  return;
344  }
345  }
346  // Insert check for migration
348  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
349  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_fixlen_arrays"});
350  LOG(INFO) << "Updating mapd_columns, legacy fixlen arrays";
351  // Updating all fixlen array columns
352  string queryString("UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" +
353  std::to_string(kARRAY) + " AND size>0;");
354  sqliteConnector_.query(queryString);
355  } catch (std::exception& e) {
356  sqliteConnector_.query("ROLLBACK TRANSACTION");
357  throw;
358  }
359  sqliteConnector_.query("END TRANSACTION");
360 }
361 
364  sqliteConnector_.query("BEGIN TRANSACTION");
365  try {
367  "select name from sqlite_master WHERE type='table' AND "
368  "name='mapd_version_history'");
369  if (sqliteConnector_.getNumRows() == 0) {
371  "CREATE TABLE mapd_version_history(version integer, migration_history text "
372  "unique)");
373  } else {
375  "select * from mapd_version_history where migration_history = "
376  "'notnull_geo_columns'");
377  if (sqliteConnector_.getNumRows() != 0) {
378  // legacy geo columns had migrated
379  // no need for further execution
380  sqliteConnector_.query("END TRANSACTION");
381  return;
382  }
383  }
384  // Insert check for migration
386  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
387  std::vector<std::string>{std::to_string(MAPD_VERSION), "notnull_geo_columns"});
388  LOG(INFO) << "Updating mapd_columns, legacy geo columns";
389  // Updating all geo columns
390  string queryString(
391  "UPDATE mapd_columns SET is_notnull=1 WHERE coltype=" + std::to_string(kPOINT) +
392  " OR coltype=" + std::to_string(kLINESTRING) + " OR coltype=" +
393  std::to_string(kPOLYGON) + " OR coltype=" + std::to_string(kMULTIPOLYGON) + ";");
394  sqliteConnector_.query(queryString);
395  } catch (std::exception& e) {
396  sqliteConnector_.query("ROLLBACK TRANSACTION");
397  throw;
398  }
399  sqliteConnector_.query("END TRANSACTION");
400 }
401 
404  sqliteConnector_.query("BEGIN TRANSACTION");
405  try {
406  // check table still exists
408  "SELECT name FROM sqlite_master WHERE type='table' AND "
409  "name='mapd_frontend_views'");
410  if (sqliteConnector_.getNumRows() == 0) {
411  // table does not exists
412  // no need to migrate
413  sqliteConnector_.query("END TRANSACTION");
414  return;
415  }
416  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_frontend_views)");
417  std::vector<std::string> cols;
418  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
419  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
420  }
421  if (std::find(cols.begin(), cols.end(), std::string("image_hash")) == cols.end()) {
422  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD image_hash text");
423  }
424  if (std::find(cols.begin(), cols.end(), std::string("update_time")) == cols.end()) {
425  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD update_time timestamp");
426  }
427  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
428  sqliteConnector_.query("ALTER TABLE mapd_frontend_views ADD view_metadata text");
429  }
430  } catch (std::exception& e) {
431  sqliteConnector_.query("ROLLBACK TRANSACTION");
432  throw;
433  }
434  sqliteConnector_.query("END TRANSACTION");
435 }
436 
439  sqliteConnector_.query("BEGIN TRANSACTION");
440  try {
442  "CREATE TABLE IF NOT EXISTS mapd_links (linkid integer primary key, userid "
443  "integer references mapd_users, "
444  "link text unique, view_state text, update_time timestamp, view_metadata text)");
445  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_links)");
446  std::vector<std::string> cols;
447  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
448  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
449  }
450  if (std::find(cols.begin(), cols.end(), std::string("view_metadata")) == cols.end()) {
451  sqliteConnector_.query("ALTER TABLE mapd_links ADD view_metadata text");
452  }
453  } catch (const std::exception& e) {
454  sqliteConnector_.query("ROLLBACK TRANSACTION");
455  throw;
456  }
457  sqliteConnector_.query("END TRANSACTION");
458 }
459 
462  sqliteConnector_.query("BEGIN TRANSACTION");
463  try {
464  sqliteConnector_.query("UPDATE mapd_links SET userid = 0 WHERE userid IS NULL");
465  // check table still exists
467  "SELECT name FROM sqlite_master WHERE type='table' AND "
468  "name='mapd_frontend_views'");
469  if (sqliteConnector_.getNumRows() == 0) {
470  // table does not exists
471  // no need to migrate
472  sqliteConnector_.query("END TRANSACTION");
473  return;
474  }
476  "UPDATE mapd_frontend_views SET userid = 0 WHERE userid IS NULL");
477  } catch (const std::exception& e) {
478  sqliteConnector_.query("ROLLBACK TRANSACTION");
479  throw;
480  }
481  sqliteConnector_.query("END TRANSACTION");
482 }
483 
484 // Introduce a DB version into the tables table.
485 // If the DB does not have a version, reset all page sizes to 2097152 to be compatible
486 // with the old value.
487 
490  if (currentDB_.dbName.length() == 0) {
491  // updateDictionaryNames dbName length is zero nothing to do here
492  return;
493  }
494  sqliteConnector_.query("BEGIN TRANSACTION");
495  try {
496  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_tables)");
497  std::vector<std::string> cols;
498  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
499  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
500  }
501  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
502  LOG(INFO) << "Updating mapd_tables updatePageSize";
503  // No version number
504  // need to update the default page size to the old correct value
505  sqliteConnector_.query("UPDATE mapd_tables SET frag_page_size = 2097152 ");
506  // need to add new version info
507  string queryString("ALTER TABLE mapd_tables ADD version_num BIGINT DEFAULT " +
509  sqliteConnector_.query(queryString);
510  }
511  } catch (std::exception& e) {
512  sqliteConnector_.query("ROLLBACK TRANSACTION");
513  throw;
514  }
515  sqliteConnector_.query("END TRANSACTION");
516 }
517 
520  sqliteConnector_.query("BEGIN TRANSACTION");
521  try {
522  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
523  std::vector<std::string> cols;
524  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
525  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
526  }
527  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
528  LOG(INFO) << "Updating mapd_columns updateDeletedColumnIndicator";
529  // need to add new version info
530  string queryString("ALTER TABLE mapd_columns ADD version_num BIGINT DEFAULT " +
532  sqliteConnector_.query(queryString);
533  // need to add new column to table definition to indicate deleted column, column used
534  // as bitmap for deleted rows.
536  "ALTER TABLE mapd_columns ADD is_deletedcol boolean default 0 ");
537  }
538  } catch (std::exception& e) {
539  sqliteConnector_.query("ROLLBACK TRANSACTION");
540  throw;
541  }
542  sqliteConnector_.query("END TRANSACTION");
543 }
544 
547  sqliteConnector_.query("BEGIN TRANSACTION");
548  try {
549  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_columns)");
550  std::vector<std::string> cols;
551  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
552  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
553  }
554  if (std::find(cols.begin(), cols.end(), std::string("default_value")) == cols.end()) {
555  LOG(INFO) << "Adding support for default values to mapd_columns";
556  sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
557  }
558  } catch (std::exception& e) {
559  LOG(ERROR) << "Failed to make metadata update for default values` support";
560  sqliteConnector_.query("ROLLBACK TRANSACTION");
561  throw;
562  }
563  sqliteConnector_.query("END TRANSACTION");
564 }
565 
566 // introduce DB version into the dictionary tables
567 // if the DB does not have a version rename all dictionary tables
568 
571  if (currentDB_.dbName.length() == 0) {
572  // updateDictionaryNames dbName length is zero nothing to do here
573  return;
574  }
575  sqliteConnector_.query("BEGIN TRANSACTION");
576  try {
577  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
578  std::vector<std::string> cols;
579  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
580  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
581  }
582  if (std::find(cols.begin(), cols.end(), std::string("version_num")) == cols.end()) {
583  // No version number
584  // need to rename dictionaries
585  string dictQuery("SELECT dictid, name from mapd_dictionaries");
586  sqliteConnector_.query(dictQuery);
587  size_t numRows = sqliteConnector_.getNumRows();
588  for (size_t r = 0; r < numRows; ++r) {
589  int dictId = sqliteConnector_.getData<int>(r, 0);
590  std::string dictName = sqliteConnector_.getData<string>(r, 1);
591 
592  std::string oldName = g_base_path + "/" + shared::kDataDirectoryName + "/" +
593  currentDB_.dbName + "_" + dictName;
594  std::string newName = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
595  std::to_string(currentDB_.dbId) + "_DICT_" +
596  std::to_string(dictId);
597 
598  int result = rename(oldName.c_str(), newName.c_str());
599 
600  if (result == 0) {
601  LOG(INFO) << "Dictionary upgrade: successfully renamed " << oldName << " to "
602  << newName;
603  } else {
604  LOG(ERROR) << "Failed to rename old dictionary directory " << oldName << " to "
605  << newName + " dbname '" << currentDB_.dbName << "' error code "
606  << std::to_string(result);
607  }
608  }
609  // need to add new version info
610  string queryString("ALTER TABLE mapd_dictionaries ADD version_num BIGINT DEFAULT " +
612  sqliteConnector_.query(queryString);
613  }
614  } catch (std::exception& e) {
615  sqliteConnector_.query("ROLLBACK TRANSACTION");
616  throw;
617  }
618  sqliteConnector_.query("END TRANSACTION");
619 }
620 
623  sqliteConnector_.query("BEGIN TRANSACTION");
624  try {
626  "CREATE TABLE IF NOT EXISTS mapd_logical_to_physical("
627  "logical_table_id integer, physical_table_id integer)");
628  } catch (const std::exception& e) {
629  sqliteConnector_.query("ROLLBACK TRANSACTION");
630  throw;
631  }
632  sqliteConnector_.query("END TRANSACTION");
633 }
634 
/**
 * Writes the (logical, physical) table id pairs recorded in
 * logicalToPhysicalTableMapById_ for `logical_tb_id` into the SQLite table
 * mapd_logical_to_physical.
 *
 * The work is wrapped in BEGIN/END TRANSACTION. If a std::exception is thrown
 * the transaction is rolled back and the exception is rethrown. When the map
 * has no entry for `logical_tb_id`, no rows are written.
 *
 * NOTE(review): this listing is missing lines 640 and 649 of the original
 * file, so one statement below appears without its opening part.
 *
 * @param logical_tb_id Id of the logical table whose physical table ids are
 *        to be persisted.
 */
635 void Catalog::updateLogicalToPhysicalTableMap(const int32_t logical_tb_id) {
636  /* this proc inserts/updates all pairs of (logical_tb_id, physical_tb_id) in
637  * sqlite mapd_logical_to_physical table for given logical_tb_id as needed
638  */
639 
641  sqliteConnector_.query("BEGIN TRANSACTION");
642  try {
643  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
644  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
// `auto` without a reference: this is a copy of the mapped container.
645  const auto physicalTables = physicalTableIt->second;
646  CHECK(!physicalTables.empty());
647  for (size_t i = 0; i < physicalTables.size(); i++) {
648  int32_t physical_tb_id = physicalTables[i];
// NOTE(review): listing line 649 is absent; the SQL text and the string vector
// below are presumably the arguments of a parameterized sqliteConnector_
// query call, binding ?1/?2 to the logical and physical ids - confirm.
650  "INSERT OR REPLACE INTO mapd_logical_to_physical (logical_table_id, "
651  "physical_table_id) VALUES (?1, ?2)",
652  std::vector<std::string>{std::to_string(logical_tb_id),
653  std::to_string(physical_tb_id)});
654  }
655  }
656  } catch (std::exception& e) {
// Undo any partial writes before propagating the original exception.
657  sqliteConnector_.query("ROLLBACK TRANSACTION");
658  throw;
659  }
660  sqliteConnector_.query("END TRANSACTION");
661 }
662 
665  sqliteConnector_.query("BEGIN TRANSACTION");
666  try {
667  sqliteConnector_.query("PRAGMA TABLE_INFO(mapd_dictionaries)");
668  std::vector<std::string> cols;
669  for (size_t i = 0; i < sqliteConnector_.getNumRows(); i++) {
670  cols.push_back(sqliteConnector_.getData<std::string>(i, 1));
671  }
672  if (std::find(cols.begin(), cols.end(), std::string("refcount")) == cols.end()) {
673  sqliteConnector_.query("ALTER TABLE mapd_dictionaries ADD refcount DEFAULT 1");
674  }
675  } catch (std::exception& e) {
676  sqliteConnector_.query("ROLLBACK TRANSACTION");
677  throw;
678  }
679  sqliteConnector_.query("END TRANSACTION");
680 }
681 
684  sqliteConnector_.query("BEGIN TRANSACTION");
685  try {
688  } catch (std::exception& e) {
689  sqliteConnector_.query("ROLLBACK TRANSACTION");
690  throw;
691  }
692  sqliteConnector_.query("END TRANSACTION");
693 }
694 
696  // TODO: Move common migration logic to a shared function.
698  sqliteConnector_.query("BEGIN TRANSACTION");
699  try {
701  "select name from sqlite_master WHERE type='table' AND "
702  "name='mapd_version_history'");
703  static const std::string migration_name{"rename_legacy_data_wrappers"};
704  if (sqliteConnector_.getNumRows() == 0) {
706  "CREATE TABLE mapd_version_history(version integer, migration_history text "
707  "unique)");
708  } else {
710  "select * from mapd_version_history where migration_history = "
711  "'" +
712  migration_name + "'");
713  if (sqliteConnector_.getNumRows() != 0) {
714  // Migration already done.
715  sqliteConnector_.query("END TRANSACTION");
716  return;
717  }
718  }
719  LOG(INFO) << "Executing " << migration_name << " migration.";
720 
721  // Update legacy data wrapper names
723  // clang-format off
724  std::map<std::string, std::string> old_to_new_wrapper_names{
725  {"OMNISCI_CSV", DataWrapperType::CSV},
726  {"OMNISCI_PARQUET", DataWrapperType::PARQUET},
727  {"OMNISCI_REGEX_PARSER", DataWrapperType::REGEX_PARSER},
728  {"OMNISCI_INTERNAL_CATALOG", DataWrapperType::INTERNAL_CATALOG},
729  {"INTERNAL_OMNISCI_MEMORY_STATS", DataWrapperType::INTERNAL_MEMORY_STATS},
730  {"INTERNAL_OMNISCI_STORAGE_STATS", DataWrapperType::INTERNAL_STORAGE_STATS}
731  };
732  // clang-format on
733 
734  for (const auto& [old_wrapper_name, new_wrapper_name] : old_to_new_wrapper_names) {
736  "UPDATE omnisci_foreign_servers SET data_wrapper_type = ? WHERE "
737  "data_wrapper_type = ?",
738  std::vector<std::string>{new_wrapper_name, old_wrapper_name});
739  }
740 
741  // Record migration.
743  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
744  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
745  LOG(INFO) << migration_name << " migration completed.";
746  } catch (std::exception& e) {
747  sqliteConnector_.query("ROLLBACK TRANSACTION");
748  throw;
749  }
750  sqliteConnector_.query("END TRANSACTION");
751 }
752 
755  sqliteConnector_.query("BEGIN TRANSACTION");
756  try {
758  } catch (const std::exception& e) {
759  sqliteConnector_.query("ROLLBACK TRANSACTION");
760  throw;
761  }
762  sqliteConnector_.query("END TRANSACTION");
763 }
764 
765 const std::string Catalog::getForeignServerSchema(bool if_not_exists) {
766  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
767  "omnisci_foreign_servers(id integer primary key, name text unique, " +
768  "data_wrapper_type text, owner_user_id integer, creation_time integer, " +
769  "options text)";
770 }
771 
772 const std::string Catalog::getForeignTableSchema(bool if_not_exists) {
773  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
774  "omnisci_foreign_tables(table_id integer unique, server_id integer, " +
775  "options text, last_refresh_time integer, next_refresh_time integer, " +
776  "FOREIGN KEY(table_id) REFERENCES mapd_tables(tableid), " +
777  "FOREIGN KEY(server_id) REFERENCES omnisci_foreign_servers(id))";
778 }
779 
780 const std::string Catalog::getCustomExpressionsSchema(bool if_not_exists) {
781  return "CREATE TABLE " + (if_not_exists ? std::string{"IF NOT EXISTS "} : "") +
782  "omnisci_custom_expressions(id integer primary key, name text, " +
783  "expression_json text, data_source_type text, " +
784  "data_source_id integer, is_deleted boolean)";
785 }
786 
789  sqliteConnector_.query("BEGIN TRANSACTION");
790  std::vector<DBObject> objects;
791  try {
793  "SELECT name FROM sqlite_master WHERE type='table' AND "
794  "name='mapd_record_ownership_marker'");
795  // check if mapd catalog - marker exists
796  if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId == 1) {
797  // already done
798  sqliteConnector_.query("END TRANSACTION");
799  return;
800  }
801  // check if different catalog - marker exists
802  else if (sqliteConnector_.getNumRows() != 0 && currentDB_.dbId != 1) {
803  sqliteConnector_.query("SELECT dummy FROM mapd_record_ownership_marker");
804  // Check if migration is being performed on existing non mapd catalogs
805  // Older non mapd dbs will have table but no record in them
806  if (sqliteConnector_.getNumRows() != 0) {
807  // already done
808  sqliteConnector_.query("END TRANSACTION");
809  return;
810  }
811  }
812  // marker not exists - create one
813  else {
814  sqliteConnector_.query("CREATE TABLE mapd_record_ownership_marker (dummy integer)");
815  }
816 
817  DBMetadata db;
818  CHECK(SysCatalog::instance().getMetadataForDB(currentDB_.dbName, db));
819  // place dbId as a reference for migration being performed
821  "INSERT INTO mapd_record_ownership_marker (dummy) VALUES (?1)",
822  std::vector<std::string>{std::to_string(db.dbOwner)});
823 
824  static const std::map<const DBObjectType, const AccessPrivileges>
825  object_level_all_privs_lookup{
831 
832  // grant owner all permissions on DB
833  DBObjectKey key;
834  key.dbId = currentDB_.dbId;
835  auto _key_place = [&key](auto type) {
836  key.permissionType = type;
837  return key;
838  };
839  for (auto& it : object_level_all_privs_lookup) {
840  objects.emplace_back(_key_place(it.first), it.second, db.dbOwner);
841  objects.back().setName(currentDB_.dbName);
842  }
843 
844  {
845  // other users tables and views
846  string tableQuery(
847  "SELECT tableid, name, userid, isview FROM mapd_tables WHERE userid > 0");
848  sqliteConnector_.query(tableQuery);
849  size_t numRows = sqliteConnector_.getNumRows();
850  for (size_t r = 0; r < numRows; ++r) {
851  int32_t tableid = sqliteConnector_.getData<int>(r, 0);
852  std::string tableName = sqliteConnector_.getData<string>(r, 1);
853  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
854  bool isview = sqliteConnector_.getData<bool>(r, 3);
855 
858  DBObjectKey key;
859  key.dbId = currentDB_.dbId;
860  key.objectId = tableid;
861  key.permissionType = type;
862 
863  DBObject obj(tableName, type);
864  obj.setObjectKey(key);
865  obj.setOwner(ownerid);
868 
869  objects.push_back(obj);
870  }
871  }
872 
873  {
874  // other users dashboards
875  string tableQuery("SELECT id, name, userid FROM mapd_dashboards WHERE userid > 0");
876  sqliteConnector_.query(tableQuery);
877  size_t numRows = sqliteConnector_.getNumRows();
878  for (size_t r = 0; r < numRows; ++r) {
879  int32_t dashId = sqliteConnector_.getData<int>(r, 0);
880  std::string dashName = sqliteConnector_.getData<string>(r, 1);
881  int32_t ownerid = sqliteConnector_.getData<int>(r, 2);
882 
884  DBObjectKey key;
885  key.dbId = currentDB_.dbId;
886  key.objectId = dashId;
887  key.permissionType = type;
888 
889  DBObject obj(dashName, type);
890  obj.setObjectKey(key);
891  obj.setOwner(ownerid);
893 
894  objects.push_back(obj);
895  }
896  }
897  } catch (const std::exception& e) {
898  sqliteConnector_.query("ROLLBACK TRANSACTION");
899  throw;
900  }
901  sqliteConnector_.query("END TRANSACTION");
902 
903  // now apply the objects to the syscat to track the permissions
904  // moved outside transaction to avoid lock in sqlite
905  try {
907  } catch (const std::exception& e) {
908  LOG(ERROR) << " Issue during migration of DB " << name() << " issue was " << e.what();
909  throw std::runtime_error(" Issue during migration of DB " + name() + " issue was " +
910  e.what());
911  // will need to remove the mapd_record_ownership_marker table and retry
912  }
913 }
914 
919 }
920 
922  std::unordered_map<std::string, std::pair<int, std::string>> dashboards;
923  std::vector<std::string> dashboard_ids;
924  static const std::string migration_name{"dashboard_roles_migration"};
925  {
927  sqliteConnector_.query("BEGIN TRANSACTION");
928  try {
929  // migration_history should be present in all catalogs by now
930  // if not then would be created before this migration
932  "select * from mapd_version_history where migration_history = '" +
933  migration_name + "'");
934  if (sqliteConnector_.getNumRows() != 0) {
935  // no need for further execution
936  sqliteConnector_.query("END TRANSACTION");
937  return;
938  }
939  LOG(INFO) << "Performing dashboard internal roles Migration.";
940  sqliteConnector_.query("select id, userid, metadata from mapd_dashboards");
941  for (size_t i = 0; i < sqliteConnector_.getNumRows(); ++i) {
944  sqliteConnector_.getData<string>(i, 0)))) {
945  // Successfully created roles during previous migration/crash
946  // No need to include them
947  continue;
948  }
949  dashboards[sqliteConnector_.getData<string>(i, 0)] = std::make_pair(
950  sqliteConnector_.getData<int>(i, 1), sqliteConnector_.getData<string>(i, 2));
951  dashboard_ids.push_back(sqliteConnector_.getData<string>(i, 0));
952  }
953  } catch (const std::exception& e) {
954  sqliteConnector_.query("ROLLBACK TRANSACTION");
955  throw;
956  }
957  sqliteConnector_.query("END TRANSACTION");
958  }
959  // All current grantees with shared dashboards.
960  const auto active_grantees =
962 
963  try {
964  // NOTE(wamsi): Transactionally unsafe
965  for (auto dash : dashboards) {
966  createOrUpdateDashboardSystemRole(dash.second.second,
967  dash.second.first,
969  std::to_string(currentDB_.dbId), dash.first));
970  auto result = active_grantees.find(dash.first);
971  if (result != active_grantees.end()) {
974  dash.first)},
975  result->second);
976  }
977  }
979  // check if this has already been completed
981  "select * from mapd_version_history where migration_history = '" +
982  migration_name + "'");
983  if (sqliteConnector_.getNumRows() != 0) {
984  return;
985  }
987  "INSERT INTO mapd_version_history(version, migration_history) values(?,?)",
988  std::vector<std::string>{std::to_string(MAPD_VERSION), migration_name});
989  } catch (const std::exception& e) {
990  LOG(ERROR) << "Failed to create dashboard system roles during migration: "
991  << e.what();
992  throw;
993  }
994  LOG(INFO) << "Successfully created dashboard system roles during migration.";
995 }
996 
1002  updateGeoColumns();
1005  updateLinkSchema();
1009  updatePageSize();
1013  if (g_enable_fsi) {
1014  updateFsiSchemas();
1016  }
1019 }
1020 
1024 }
1025 
1026 namespace {
1027 std::map<int32_t, std::string> get_user_id_to_user_name_map() {
1028  auto users = SysCatalog::instance().getAllUserMetadata();
1029  std::map<int32_t, std::string> user_name_by_user_id;
1030  for (const auto& user : users) {
1031  user_name_by_user_id[user.userId] = user.userName;
1032  }
1033  return user_name_by_user_id;
1034 }
1035 
1037  int32_t id,
1038  const std::map<int32_t, std::string>& user_name_by_user_id) {
1039  auto entry = user_name_by_user_id.find(id);
1040  if (entry != user_name_by_user_id.end()) {
1041  return entry->second;
1042  }
1043  // a user could be deleted and a dashboard still exist?
1044  return "Unknown";
1045 }
1046 } // namespace
1047 
1049  std::string dictQuery(
1050  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1051  sqliteConnector_.query(dictQuery);
1052  auto numRows = sqliteConnector_.getNumRows();
1053  for (size_t r = 0; r < numRows; ++r) {
1054  auto dictId = sqliteConnector_.getData<int>(r, 0);
1055  auto dictName = sqliteConnector_.getData<string>(r, 1);
1056  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1057  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1058  auto refcount = sqliteConnector_.getData<int>(r, 4);
1059  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1060  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1061  DictRef dict_ref(currentDB_.dbId, dictId);
1062  auto dd = new DictDescriptor(
1063  dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
1064  dictDescriptorMapByRef_[dict_ref].reset(dd);
1065  }
1066 }
1067 
1068 // NOTE(sy): Only used by --multi-instance clusters.
1069 void Catalog::reloadTableMetadata(int table_id) {
1070  cat_write_lock write_lock(this);
1072  reloadTableMetadataUnlocked(table_id);
1073 }
1074 
1075 // NOTE(sy): Only used by --multi-instance clusters.
1077  // Reload dictionaries first.
1078  // TODO(sy): Does dictionary reloading really belong here?
1079  // We don't have dictionary locks in the system but maybe we need them.
1080  list<DictDescriptor> dicts;
1081  std::string dictQuery(
1082  "SELECT dictid, name, nbits, is_shared, refcount from mapd_dictionaries");
1083  sqliteConnector_.query(dictQuery);
1084  auto numRows = sqliteConnector_.getNumRows();
1085  for (size_t r = 0; r < numRows; ++r) {
1086  auto dictId = sqliteConnector_.getData<int>(r, 0);
1087  auto dictName = sqliteConnector_.getData<string>(r, 1);
1088  auto dictNBits = sqliteConnector_.getData<int>(r, 2);
1089  auto is_shared = sqliteConnector_.getData<bool>(r, 3);
1090  auto refcount = sqliteConnector_.getData<int>(r, 4);
1091  auto fname = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
1092  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
1093  DictRef dict_ref(currentDB_.dbId, dictId);
1094  DictDescriptor dd(dict_ref, dictName, dictNBits, is_shared, refcount, fname, false);
1095  if (auto it = dictDescriptorMapByRef_.find(dict_ref);
1096  it == dictDescriptorMapByRef_.end()) {
1097  dictDescriptorMapByRef_[dict_ref] = std::make_unique<DictDescriptor>(dd);
1098  } else {
1099  *it->second = dd;
1100  }
1101  }
1102 
1103  // Delete this table's metadata from the in-memory cache before reloading.
1104  TableDescriptor* original_td;
1105  std::list<ColumnDescriptor*> original_cds;
1106  if (auto it1 = tableDescriptorMapById_.find(table_id);
1107  it1 != tableDescriptorMapById_.end()) {
1108  original_td = it1->second;
1109  tableDescriptorMapById_.erase(it1);
1110  if (auto it2 = tableDescriptorMap_.find(original_td->tableName);
1111  it2 != tableDescriptorMap_.end()) {
1112  CHECK_EQ(original_td, it2->second);
1113  tableDescriptorMap_.erase(it2);
1114  }
1115  if (original_td->hasDeletedCol) {
1116  const auto ret = deletedColumnPerTable_.erase(original_td);
1117  CHECK_EQ(ret, size_t(1));
1118  }
1119  for (int column_id = 0; column_id < original_td->nColumns; ++column_id) {
1120  if (auto it3 = columnDescriptorMapById_.find({table_id, column_id});
1121  it3 != columnDescriptorMapById_.end()) {
1122  ColumnDescriptor* original_cd = it3->second;
1123  original_cds.push_back(original_cd);
1124  removeFromColumnMap(original_cd);
1125  }
1126  }
1127  }
1128 
1129  // Reload the table descriptor.
1130  std::string tableQuery(
1131  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1132  "max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, "
1133  "num_shards, key_metainfo, userid, sort_column_id, storage_type, "
1134  "max_rollback_epochs, is_system_table from mapd_tables WHERE tableid = " +
1135  std::to_string(table_id));
1136  sqliteConnector_.query(tableQuery);
1137  numRows = sqliteConnector_.getNumRows();
1138  if (!numRows) {
1139  return; // Table was deleted by another node.
1140  }
1141 
1142  TableDescriptor* td;
1143  const auto& storage_type = sqliteConnector_.getData<string>(0, 17);
1144  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1145  const auto& table_name = sqliteConnector_.getData<string>(0, 1);
1146  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1147  "supported table option (table "
1148  << table_name << " [" << table_id << "] in database " << currentDB_.dbName
1149  << ").";
1150  }
1151 
1152  if (storage_type == StorageType::FOREIGN_TABLE) {
1153  td = new foreign_storage::ForeignTable();
1154  } else {
1155  td = new TableDescriptor();
1156  }
1157 
1158  td->storageType = storage_type;
1159  td->tableId = sqliteConnector_.getData<int>(0, 0);
1160  td->tableName = sqliteConnector_.getData<string>(0, 1);
1161  td->nColumns = sqliteConnector_.getData<int>(0, 2);
1162  td->isView = sqliteConnector_.getData<bool>(0, 3);
1163  td->fragments = sqliteConnector_.getData<string>(0, 4);
1164  td->fragType =
1165  (Fragmenter_Namespace::FragmenterType)sqliteConnector_.getData<int>(0, 5);
1166  td->maxFragRows = sqliteConnector_.getData<int>(0, 6);
1167  td->maxChunkSize = sqliteConnector_.getData<int64_t>(0, 7);
1168  td->fragPageSize = sqliteConnector_.getData<int>(0, 8);
1169  td->maxRows = sqliteConnector_.getData<int64_t>(0, 9);
1170  td->partitions = sqliteConnector_.getData<string>(0, 10);
1171  td->shardedColumnId = sqliteConnector_.getData<int>(0, 11);
1172  td->shard = sqliteConnector_.getData<int>(0, 12);
1173  td->nShards = sqliteConnector_.getData<int>(0, 13);
1174  td->keyMetainfo = sqliteConnector_.getData<string>(0, 14);
1175  td->userId = sqliteConnector_.getData<int>(0, 15);
1176  td->sortedColumnId =
1177  sqliteConnector_.isNull(0, 16) ? 0 : sqliteConnector_.getData<int>(0, 16);
1178  if (!td->isView) {
1179  td->fragmenter = nullptr;
1180  }
1181  td->maxRollbackEpochs = sqliteConnector_.getData<int>(0, 18);
1182  td->is_system_table = sqliteConnector_.getData<bool>(0, 19);
1183  td->hasDeletedCol = false;
1184 
1185  if (auto tableDescIt = tableDescriptorMapById_.find(table_id);
1186  tableDescIt != tableDescriptorMapById_.end()) {
1187  tableDescIt->second->fragmenter = nullptr;
1188  delete tableDescIt->second;
1189  }
1190 
1191  // Reload the column descriptors.
1192  std::list<ColumnDescriptor*> cds;
1193  std::string columnQuery(
1194  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1195  "is_notnull, compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
1196  "virtual_expr, is_deletedcol, default_value from mapd_columns WHERE tableid = " +
1197  std::to_string(table_id) + " ORDER BY tableid, columnid");
1198  sqliteConnector_.query(columnQuery);
1199  numRows = sqliteConnector_.getNumRows();
1200  int32_t skip_physical_cols = 0;
1201  for (size_t r = 0; r < numRows; ++r) {
1202  ColumnDescriptor* cd = new ColumnDescriptor();
1203  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1204  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1205  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1206  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
1207  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
1208  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
1209  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1210  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
1211  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
1212  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
1213  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1214  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1215  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1216  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1217  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1218  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1219  if (sqliteConnector_.isNull(r, 16)) {
1220  cd->default_value = std::nullopt;
1221  } else {
1222  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1223  }
1224  cd->isGeoPhyCol = skip_physical_cols-- > 0;
1225  cds.push_back(cd);
1226  }
1227 
1228  // Store the descriptors into the cache.
1229  if (original_td) {
1230  td->mutex_ = original_td->mutex_; // TODO(sy): Unnecessary?
1231  delete original_td;
1232  original_td = nullptr;
1233  }
1234  for (ColumnDescriptor* original_cd : original_cds) {
1235  delete original_cd;
1236  }
1237  original_cds.clear();
1238  tableDescriptorMap_[to_upper(td->tableName)] = td;
1239  tableDescriptorMapById_[td->tableId] = td;
1240  skip_physical_cols = 0;
1241  for (ColumnDescriptor* cd : cds) {
1242  addToColumnMap(cd);
1243 
1244  if (skip_physical_cols <= 0) {
1245  skip_physical_cols = cd->columnType.get_physical_cols();
1246  }
1247 
1248  if (cd->isDeletedCol) {
1249  td->hasDeletedCol = true;
1250  setDeletedColumnUnlocked(td, cd);
1251  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1252  td->columnIdBySpi_.push_back(cd->columnId);
1253  }
1254  }
1255 
1256  // Notify Calcite about the reloaded table.
1257  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
1258 }
1259 
1260 // NOTE(sy): Only used by --multi-instance clusters.
1261 void Catalog::reloadCatalogMetadata(
1262  const std::map<int32_t, std::string>& user_name_by_user_id) {
1263  cat_write_lock write_lock(this);
1265  reloadCatalogMetadataUnlocked(get_user_id_to_user_name_map());
1266 }
1267 
1268 // NOTE(sy): Only used by --multi-instance clusters.
1269 void Catalog::reloadCatalogMetadataUnlocked(
1270  const std::map<int32_t, std::string>& user_name_by_user_id) {
1272 
1273  // Notice when tables or columns have been created, dropped, or changed by other nodes.
1274  // Needed so that users will see reasonably-correct lists of what objects exist.
1275 
1276  // Load the list of table ID's that exist on disk storage.
1277  std::set<int> cluster_table_ids;
1278  std::string tableQuery("SELECT tableid from mapd_tables");
1279  sqliteConnector_.query(tableQuery);
1280  auto numRows = sqliteConnector_.getNumRows();
1281  for (size_t r = 0; r < numRows; ++r) {
1282  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1283  cluster_table_ids.insert(table_id);
1284  }
1285 
1286  // Ignore any table ID's locked by other threads on this node.
1287  // Those other threads are already handling any necessary reloading for those tables.
1288  std::set<int> ignored_table_ids;
1289  for (ChunkKey const& k : lockmgr::TableSchemaLockMgr::instance().getLockedTables()) {
1290  CHECK_EQ(k.size(), 2U);
1291  if (k[CHUNK_KEY_DB_IDX] != currentDB_.dbId) {
1292  continue;
1293  }
1294  ignored_table_ids.insert(k[CHUNK_KEY_TABLE_IDX]);
1295  }
1296 
1297  // For this node's Catalog cache:
1298  // Newly created table schemas created by other nodes need to be loaded.
1299  // Unlocked table schemas might have been renamed by other nodes; just reload them all.
1300  // Deleted table schemas still in this node's cache need to be flushed.
1301  std::set<int> reload_table_ids;
1302  for (auto const& cluster_table_id : cluster_table_ids) {
1303  if (ignored_table_ids.find(cluster_table_id) == ignored_table_ids.end()) {
1304  reload_table_ids.insert(cluster_table_id);
1305  }
1306  }
1307  for (auto const& [cached_table_id, td] : tableDescriptorMapById_) {
1308  if (cluster_table_ids.find(cached_table_id) == cluster_table_ids.end()) {
1309  reload_table_ids.insert(cached_table_id);
1310  }
1311  }
1312 
1313  // Reload tables.
1314  for (auto const& reload_table_id : reload_table_ids) {
1315  reloadTableMetadataUnlocked(reload_table_id);
1316  }
1317 
1319 
1320  dashboardDescriptorMap_.clear();
1321  linkDescriptorMap_.clear();
1322  linkDescriptorMapById_.clear();
1323  foreignServerMap_.clear();
1324  foreignServerMapById_.clear();
1325  custom_expr_map_by_id_.clear();
1326 
1327  if (g_enable_fsi) {
1328  buildForeignServerMapUnlocked();
1329  }
1330 
1331  updateViewsInMapUnlocked();
1332  buildDashboardsMapUnlocked(user_name_by_user_id);
1333  buildLinksMapUnlocked();
1334  buildCustomExpressionsMapUnlocked();
1335 
1336  // Notify Calcite about the reloaded database.
1337  if (calciteMgr_) {
1338  calciteMgr_->updateMetadata(currentDB_.dbName, {});
1339  }
1340 }
1341 
1342 void Catalog::buildTablesMapUnlocked() {
1343  std::string tableQuery(
1344  "SELECT tableid, name, ncolumns, isview, fragments, frag_type, max_frag_rows, "
1345  "max_chunk_size, frag_page_size, "
1346  "max_rows, partitions, shard_column_id, shard, num_shards, key_metainfo, userid, "
1347  "sort_column_id, storage_type, max_rollback_epochs, is_system_table "
1348  "from mapd_tables");
1349  sqliteConnector_.query(tableQuery);
1350  auto numRows = sqliteConnector_.getNumRows();
1351  for (size_t r = 0; r < numRows; ++r) {
1352  TableDescriptor* td;
1353  const auto& storage_type = sqliteConnector_.getData<string>(r, 17);
1354  if (!storage_type.empty() && storage_type != StorageType::FOREIGN_TABLE) {
1355  const auto table_id = sqliteConnector_.getData<int>(r, 0);
1356  const auto& table_name = sqliteConnector_.getData<string>(r, 1);
1357  LOG(FATAL) << "Unable to read Catalog metadata: storage type is currently not a "
1358  "supported table option (table "
1359  << table_name << " [" << table_id << "] in database "
1360  << currentDB_.dbName << ").";
1361  }
1362 
1363  if (storage_type == StorageType::FOREIGN_TABLE) {
1364  td = new foreign_storage::ForeignTable();
1365  } else {
1366  td = new TableDescriptor();
1367  }
1368 
1369  td->storageType = storage_type;
1370  td->tableId = sqliteConnector_.getData<int>(r, 0);
1371  td->tableName = sqliteConnector_.getData<string>(r, 1);
1372  td->nColumns = sqliteConnector_.getData<int>(r, 2);
1373  td->isView = sqliteConnector_.getData<bool>(r, 3);
1374  td->fragments = sqliteConnector_.getData<string>(r, 4);
1375  td->fragType =
1376  (Fragmenter_Namespace::FragmenterType)sqliteConnector_.getData<int>(r, 5);
1377  td->maxFragRows = sqliteConnector_.getData<int>(r, 6);
1378  td->maxChunkSize = sqliteConnector_.getData<int64_t>(r, 7);
1379  td->fragPageSize = sqliteConnector_.getData<int>(r, 8);
1380  td->maxRows = sqliteConnector_.getData<int64_t>(r, 9);
1381  td->partitions = sqliteConnector_.getData<string>(r, 10);
1382  td->shardedColumnId = sqliteConnector_.getData<int>(r, 11);
1383  td->shard = sqliteConnector_.getData<int>(r, 12);
1384  td->nShards = sqliteConnector_.getData<int>(r, 13);
1385  td->keyMetainfo = sqliteConnector_.getData<string>(r, 14);
1386  td->userId = sqliteConnector_.getData<int>(r, 15);
1387  td->sortedColumnId =
1388  sqliteConnector_.isNull(r, 16) ? 0 : sqliteConnector_.getData<int>(r, 16);
1389  if (!td->isView) {
1390  td->fragmenter = nullptr;
1391  }
1392  td->maxRollbackEpochs = sqliteConnector_.getData<int>(r, 18);
1393  td->is_system_table = sqliteConnector_.getData<bool>(r, 19);
1394  td->hasDeletedCol = false;
1395 
1396  tableDescriptorMap_[to_upper(td->tableName)] = td;
1397  tableDescriptorMapById_[td->tableId] = td;
1398  }
1399 }
1400 
1401 void Catalog::buildColumnsMapUnlocked() {
1402  std::string columnQuery(
1403  "SELECT tableid, columnid, name, coltype, colsubtype, coldim, colscale, "
1404  "is_notnull, compression, comp_param, "
1405  "size, chunks, is_systemcol, is_virtualcol, virtual_expr, is_deletedcol, "
1406  "default_value from "
1407  "mapd_columns ORDER BY tableid, "
1408  "columnid");
1409  sqliteConnector_.query(columnQuery);
1410  auto numRows = sqliteConnector_.getNumRows();
1411  int32_t skip_physical_cols = 0;
1412  for (size_t r = 0; r < numRows; ++r) {
1413  ColumnDescriptor* cd = new ColumnDescriptor();
1414  cd->tableId = sqliteConnector_.getData<int>(r, 0);
1415  cd->columnId = sqliteConnector_.getData<int>(r, 1);
1416  cd->columnName = sqliteConnector_.getData<string>(r, 2);
1417  cd->columnType.set_type((SQLTypes)sqliteConnector_.getData<int>(r, 3));
1418  cd->columnType.set_subtype((SQLTypes)sqliteConnector_.getData<int>(r, 4));
1419  cd->columnType.set_dimension(sqliteConnector_.getData<int>(r, 5));
1420  cd->columnType.set_scale(sqliteConnector_.getData<int>(r, 6));
1421  cd->columnType.set_notnull(sqliteConnector_.getData<bool>(r, 7));
1422  cd->columnType.set_compression((EncodingType)sqliteConnector_.getData<int>(r, 8));
1423  cd->columnType.set_comp_param(sqliteConnector_.getData<int>(r, 9));
1424  cd->columnType.set_size(sqliteConnector_.getData<int>(r, 10));
1425  cd->chunks = sqliteConnector_.getData<string>(r, 11);
1426  cd->isSystemCol = sqliteConnector_.getData<bool>(r, 12);
1427  cd->isVirtualCol = sqliteConnector_.getData<bool>(r, 13);
1428  cd->virtualExpr = sqliteConnector_.getData<string>(r, 14);
1429  cd->isDeletedCol = sqliteConnector_.getData<bool>(r, 15);
1430  if (sqliteConnector_.isNull(r, 16)) {
1431  cd->default_value = std::nullopt;
1432  } else {
1433  cd->default_value = std::make_optional(sqliteConnector_.getData<string>(r, 16));
1434  }
1435  cd->isGeoPhyCol = skip_physical_cols > 0;
1436  addToColumnMap(cd);
1437 
1438  if (skip_physical_cols <= 0) {
1439  skip_physical_cols = cd->columnType.get_physical_cols();
1440  }
1441 
1442  auto td_itr = tableDescriptorMapById_.find(cd->tableId);
1443  CHECK(td_itr != tableDescriptorMapById_.end());
1444 
1445  if (cd->isDeletedCol) {
1446  td_itr->second->hasDeletedCol = true;
1447  setDeletedColumnUnlocked(td_itr->second, cd);
1448  } else if (cd->columnType.is_geometry() || skip_physical_cols-- <= 0) {
1449  tableDescriptorMapById_[cd->tableId]->columnIdBySpi_.push_back(cd->columnId);
1450  }
1451  }
1452 
1453  // sort columnIdBySpi_ based on columnId
1454  for (auto& tit : tableDescriptorMapById_) {
1455  std::sort(tit.second->columnIdBySpi_.begin(),
1456  tit.second->columnIdBySpi_.end(),
1457  [](const size_t a, const size_t b) -> bool { return a < b; });
1458  }
1459 } // namespace Catalog_Namespace
1460 
1461 void Catalog::updateViewsInMapUnlocked() {
1462  std::string viewQuery("SELECT tableid, sql FROM mapd_views");
1463  sqliteConnector_.query(viewQuery);
1464  auto numRows = sqliteConnector_.getNumRows();
1465  for (size_t r = 0; r < numRows; ++r) {
1466  auto tableId = sqliteConnector_.getData<int>(r, 0);
1467  auto td = tableDescriptorMapById_[tableId];
1468  td->viewSQL = sqliteConnector_.getData<string>(r, 1);
1469  td->fragmenter = nullptr;
1470  }
1471 }
1472 
1473 void Catalog::buildDashboardsMapUnlocked(
1474  const std::map<int32_t, std::string>& user_name_by_user_id) {
1475  std::string frontendViewQuery(
1476  "SELECT id, state, name, image_hash, strftime('%Y-%m-%dT%H:%M:%SZ', update_time), "
1477  "userid, "
1478  "metadata "
1479  "FROM mapd_dashboards");
1480  sqliteConnector_.query(frontendViewQuery);
1481  auto numRows = sqliteConnector_.getNumRows();
1482  for (size_t r = 0; r < numRows; ++r) {
1483  auto vd = std::make_shared<DashboardDescriptor>();
1484  vd->dashboardId = sqliteConnector_.getData<int>(r, 0);
1485  vd->dashboardState = sqliteConnector_.getData<string>(r, 1);
1486  vd->dashboardName = sqliteConnector_.getData<string>(r, 2);
1487  vd->imageHash = sqliteConnector_.getData<string>(r, 3);
1488  vd->updateTime = sqliteConnector_.getData<string>(r, 4);
1489  vd->userId = sqliteConnector_.getData<int>(r, 5);
1490  vd->dashboardMetadata = sqliteConnector_.getData<string>(r, 6);
1491  vd->user = get_user_name_from_id(vd->userId, user_name_by_user_id);
1492  vd->dashboardSystemRoleName = generate_dashboard_system_rolename(
1493  std::to_string(currentDB_.dbId), sqliteConnector_.getData<string>(r, 0));
1494  dashboardDescriptorMap_[std::to_string(vd->userId) + ":" + vd->dashboardName] = vd;
1495  }
1496 }
1497 
1498 void Catalog::buildLinksMapUnlocked() {
1499  std::string linkQuery(
1500  "SELECT linkid, userid, link, view_state, strftime('%Y-%m-%dT%H:%M:%SZ', "
1501  "update_time), view_metadata "
1502  "FROM mapd_links");
1503  sqliteConnector_.query(linkQuery);
1504  auto numRows = sqliteConnector_.getNumRows();
1505  for (size_t r = 0; r < numRows; ++r) {
1506  auto ld = new LinkDescriptor();
1507  ld->linkId = sqliteConnector_.getData<int>(r, 0);
1508  ld->userId = sqliteConnector_.getData<int>(r, 1);
1509  ld->link = sqliteConnector_.getData<string>(r, 2);
1510  ld->viewState = sqliteConnector_.getData<string>(r, 3);
1511  ld->updateTime = sqliteConnector_.getData<string>(r, 4);
1512  ld->viewMetadata = sqliteConnector_.getData<string>(r, 5);
1513  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld->link] = ld;
1514  linkDescriptorMapById_[ld->linkId] = ld;
1515  }
1516 }
1517 
1518 void Catalog::buildLogicalToPhysicalMapUnlocked() {
1519  /* rebuild map linking logical tables to corresponding physical ones */
1520  std::string logicalToPhysicalTableMapQuery(
1521  "SELECT logical_table_id, physical_table_id "
1522  "FROM mapd_logical_to_physical");
1523  sqliteConnector_.query(logicalToPhysicalTableMapQuery);
1524  auto numRows = sqliteConnector_.getNumRows();
1525  for (size_t r = 0; r < numRows; ++r) {
1526  auto logical_tb_id = sqliteConnector_.getData<int>(r, 0);
1527  auto physical_tb_id = sqliteConnector_.getData<int>(r, 1);
1528  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(logical_tb_id);
1529  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
1530  /* add new entity to the map logicalToPhysicalTableMapById_ */
1531  std::vector<int32_t> physicalTables{physical_tb_id};
1532  const auto it_ok =
1533  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
1534  CHECK(it_ok.second);
1535  } else {
1536  /* update map logicalToPhysicalTableMapById_ */
1537  physicalTableIt->second.push_back(physical_tb_id);
1538  }
1539  }
1540 }
1541 
1542 // The catalog uses a series of maps to cache data that have been read from the sqlite
1543 // tables. Usually we update these maps whenever we write using sqlite, so this function
1544 // is responsible for initializing all of them based on the sqlite db state.
1545 void Catalog::buildMaps() {
1546  // Get all user id to username mapping here in order to avoid making a call to
1547  // SysCatalog (and attempting to acquire SysCatalog locks) while holding locks for this
1548  // catalog.
1549  const auto user_name_by_user_id = get_user_id_to_user_name_map();
1550 
1551  cat_write_lock write_lock(this);
1553 
1554  buildDictionaryMapUnlocked();
1555  buildTablesMapUnlocked();
1556 
1557  if (g_enable_fsi) {
1558  buildForeignServerMapUnlocked();
1559  updateForeignTablesInMapUnlocked();
1560  }
1561 
1562  buildColumnsMapUnlocked();
1563  updateViewsInMapUnlocked();
1564  buildDashboardsMapUnlocked(user_name_by_user_id);
1565  buildLinksMapUnlocked();
1566  buildLogicalToPhysicalMapUnlocked();
1567  buildCustomExpressionsMapUnlocked();
1568 }
1569 
1570 void Catalog::buildCustomExpressionsMapUnlocked() {
1571  sqliteConnector_.query(
1572  "SELECT id, name, expression_json, data_source_type, data_source_id, "
1573  "is_deleted "
1574  "FROM omnisci_custom_expressions");
1575  auto num_rows = sqliteConnector_.getNumRows();
1576  for (size_t row = 0; row < num_rows; row++) {
1577  auto custom_expr = getCustomExpressionFromConnector(row);
1578  custom_expr_map_by_id_[custom_expr->id] = std::move(custom_expr);
1579  }
1580 }
1581 
1582 std::unique_ptr<CustomExpression> Catalog::getCustomExpressionFromConnector(size_t row) {
1583  auto id = sqliteConnector_.getData<int>(row, 0);
1584  auto name = sqliteConnector_.getData<string>(row, 1);
1585  auto expression_json = sqliteConnector_.getData<string>(row, 2);
1586  auto data_source_type_str = sqliteConnector_.getData<string>(row, 3);
1587  auto data_source_id = sqliteConnector_.getData<int>(row, 4);
1588  auto is_deleted = sqliteConnector_.getData<bool>(row, 5);
1589  return std::make_unique<CustomExpression>(
1590  id,
1591  name,
1592  expression_json,
1593  CustomExpression::dataSourceTypeFromString(data_source_type_str),
1594  data_source_id,
1595  is_deleted);
1596 }
1597 
1598 void Catalog::addTableToMap(const TableDescriptor* td,
1599  const list<ColumnDescriptor>& columns,
1600  const list<DictDescriptor>& dicts) {
1601  cat_write_lock write_lock(this);
1602  TableDescriptor* new_td;
1603 
1604  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
1605  if (foreign_table) {
1606  auto new_foreign_table = new foreign_storage::ForeignTable();
1607  *new_foreign_table = *foreign_table;
1608  new_td = new_foreign_table;
1609  } else {
1610  new_td = new TableDescriptor();
1611  *new_td = *td;
1612  }
1613 
1614  new_td->mutex_ = std::make_shared<std::mutex>();
1615  tableDescriptorMap_[to_upper(td->tableName)] = new_td;
1616  tableDescriptorMapById_[td->tableId] = new_td;
1617  for (auto cd : columns) {
1618  ColumnDescriptor* new_cd = new ColumnDescriptor();
1619  *new_cd = cd;
1620  addToColumnMap(new_cd);
1621 
1622  // Add deleted column to the map
1623  if (cd.isDeletedCol) {
1624  CHECK(new_td->hasDeletedCol);
1625  setDeletedColumnUnlocked(new_td, new_cd);
1626  }
1627  }
1628 
1629  std::sort(new_td->columnIdBySpi_.begin(),
1630  new_td->columnIdBySpi_.end(),
1631  [](const size_t a, const size_t b) -> bool { return a < b; });
1632  // TODO(sy): Why does addTableToMap() sort columnIdBySpi_ but not insert into it while
1633  // buildColumnsMapUnlocked() does both?
1634 
1635  std::unique_ptr<StringDictionaryClient> client;
1636  DictRef dict_ref(currentDB_.dbId, -1);
1637  if (!string_dict_hosts_.empty()) {
1638  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1639  }
1640  for (auto dd : dicts) {
1641  if (!dd.dictRef.dictId) {
1642  // Dummy entry created for a shard of a logical table, nothing to do.
1643  continue;
1644  }
1645  dict_ref.dictId = dd.dictRef.dictId;
1646  if (client) {
1647  client->create(dict_ref, dd.dictIsTemp);
1648  }
1649  DictDescriptor* new_dd = new DictDescriptor(dd);
1650  dictDescriptorMapByRef_[dict_ref].reset(new_dd);
1651  if (!dd.dictIsTemp) {
1652  boost::filesystem::create_directory(new_dd->dictFolderPath);
1653  }
1654  }
1655 }
1656 
1657 void Catalog::removeTableFromMap(const string& tableName,
1658  const int tableId,
1659  const bool is_on_error) {
1660  cat_write_lock write_lock(this);
1661  TableDescriptorMapById::iterator tableDescIt = tableDescriptorMapById_.find(tableId);
1662  if (tableDescIt == tableDescriptorMapById_.end()) {
1663  throw runtime_error("Table " + tableName + " does not exist.");
1664  }
1665 
1666  TableDescriptor* td = tableDescIt->second;
1667 
1668  if (td->hasDeletedCol) {
1669  const auto ret = deletedColumnPerTable_.erase(td);
1670  CHECK_EQ(ret, size_t(1));
1671  }
1672 
1673  tableDescriptorMapById_.erase(tableDescIt);
1674  tableDescriptorMap_.erase(to_upper(tableName));
1675  td->fragmenter = nullptr;
1676  dict_columns_by_table_id_.erase(tableId);
1677 
1679  delete td;
1680 
1681  std::unique_ptr<StringDictionaryClient> client;
1682  if (SysCatalog::instance().isAggregator()) {
1683  CHECK(!string_dict_hosts_.empty());
1684  DictRef dict_ref(currentDB_.dbId, -1);
1685  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
1686  }
1687 
1688  // delete all column descriptors for the table
1689  // no more link columnIds to sequential indexes!
1690  for (auto cit = columnDescriptorMapById_.begin();
1691  cit != columnDescriptorMapById_.end();) {
1692  if (tableId != std::get<0>(cit->first)) {
1693  ++cit;
1694  } else {
1695  int i = std::get<1>(cit++->first);
1696  ColumnIdKey cidKey(tableId, i);
1697  ColumnDescriptorMapById::iterator colDescIt = columnDescriptorMapById_.find(cidKey);
1698  ColumnDescriptor* cd = colDescIt->second;
1699  columnDescriptorMapById_.erase(colDescIt);
1700  ColumnKey cnameKey(tableId, to_upper(cd->columnName));
1701  columnDescriptorMap_.erase(cnameKey);
1702  const int dictId = cd->columnType.get_comp_param();
1703  // Dummy dictionaries created for a shard of a logical table have the id set to
1704  // zero.
1705  if (cd->columnType.get_compression() == kENCODING_DICT && dictId) {
1706  INJECT_TIMER(removingDicts);
1707  DictRef dict_ref(currentDB_.dbId, dictId);
1708  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
1709  // If we're removing this table due to an error, it is possible that the string
1710  // dictionary reference was never populated. Don't crash, just continue cleaning
1711  // up the TableDescriptor and ColumnDescriptors
1712  if (!is_on_error) {
1713  CHECK(dictIt != dictDescriptorMapByRef_.end());
1714  } else {
1715  if (dictIt == dictDescriptorMapByRef_.end()) {
1716  continue;
1717  }
1718  }
1719  const auto& dd = dictIt->second;
1720  CHECK_GE(dd->refcount, 1);
1721  --dd->refcount;
1722  if (!dd->refcount) {
1723  dd->stringDict.reset();
1724  if (!isTemp) {
1725  File_Namespace::renameForDelete(dd->dictFolderPath);
1726  }
1727  if (client) {
1728  client->drop(dict_ref);
1729  }
1730  dictDescriptorMapByRef_.erase(dictIt);
1731  }
1732  }
1733 
1734  delete cd;
1735  }
1736  }
1737 }
1738 
1739 void Catalog::addFrontendViewToMap(DashboardDescriptor& vd) {
1740  cat_write_lock write_lock(this);
1741  addFrontendViewToMapNoLock(vd);
1742 }
1743 
1744 void Catalog::addFrontendViewToMapNoLock(DashboardDescriptor& vd) {
1745  cat_write_lock write_lock(this);
1746  dashboardDescriptorMap_[std::to_string(vd.userId) + ":" + vd.dashboardName] =
1747  std::make_shared<DashboardDescriptor>(vd);
1748 }
1749 
// Builds one DBObject per table/view named by
// parse_underlying_dashboard_objects(view_meta) that still exists in this database.
// Each object is keyed by (currentDB_.dbId, object type, table id), carries the
// privilege chosen below, and is created with `user_id`.
// Names that cannot be resolved through getMetadataForTable() are logged and skipped.
// Returns the objects in the order they were parsed.
1750 std::vector<DBObject> Catalog::parseDashboardObjects(const std::string& view_meta,
1751  const int& user_id) {
1752  std::vector<DBObject> objects;
1753  DBObjectKey key;
1754  key.dbId = currentDB_.dbId;
// Reuses the single `key` above: overwrites its type/id fields and returns a copy.
1755  auto _key_place = [&key](auto type, auto id) {
1756  key.permissionType = type;
1757  key.objectId = id;
1758  return key;
1759  };
1760  for (auto object_name : parse_underlying_dashboard_objects(view_meta)) {
// Metadata-only lookup: the fragmenter is not populated (second argument false).
1761  auto td = getMetadataForTable(object_name, false);
1762  if (!td) {
1763  // Parsed object source is not present in current database
1764  // LOG the info and ignore
1765  LOG(INFO) << "Ignoring dashboard source Table/View: " << object_name
1766  << " no longer exists in current DB.";
1767  continue;
1768  }
1769  // Dashboard source can be Table or View
1770  const auto object_type = td->isView ? ViewDBObjectType : TableDBObjectType;
// NOTE(review): the listing skips source line 1772 here, so the non-view branch of
// this conditional expression is not visible in this chunk.
1771  const auto priv = td->isView ? AccessPrivileges::SELECT_FROM_VIEW
1773  objects.emplace_back(_key_place(object_type, td->tableId), priv, user_id);
1774  objects.back().setObjectType(td->isView ? ViewDBObjectType : TableDBObjectType);
1775  objects.back().setName(td->tableName);
1776  }
1777  return objects;
1778 }
1779 
1780 void Catalog::createOrUpdateDashboardSystemRole(const std::string& view_meta,
1781  const int32_t& user_id,
1782  const std::string& dash_role_name) {
1783  auto objects = parseDashboardObjects(view_meta, user_id);
1784  Role* rl = SysCatalog::instance().getRoleGrantee(dash_role_name);
1785  if (!rl) {
1786  // Dashboard role does not exist
1787  // create role and grant privileges
1788  // NOTE(wamsi): Transactionally unsafe
1789  SysCatalog::instance().createRole(
1790  dash_role_name, /*user_private_role=*/false, /*is_temporary=*/false);
1791  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1792  } else {
1793  // Dashboard system role already exists
1794  // Add/remove privileges on objects
1795  std::set<DBObjectKey> revoke_keys;
1796  auto ex_objects = rl->getDbObjects(true);
1797  for (auto key : *ex_objects | boost::adaptors::map_keys) {
1798  if (key.permissionType != TableDBObjectType &&
1799  key.permissionType != ViewDBObjectType) {
1800  continue;
1801  }
1802  bool found = false;
1803  for (auto obj : objects) {
1804  found = key == obj.getObjectKey() ? true : false;
1805  if (found) {
1806  break;
1807  }
1808  }
1809  if (!found) {
1810  revoke_keys.insert(key);
1811  }
1812  }
1813  for (auto& key : revoke_keys) {
1814  // revoke privs on object since the object is no
1815  // longer used by the dashboard as source
1816  // NOTE(wamsi): Transactionally unsafe
1817  SysCatalog::instance().revokeDBObjectPrivileges(
1818  dash_role_name, *rl->findDbObject(key, true), *this);
1819  }
1820  // Update privileges on remaining objects
1821  // NOTE(wamsi): Transactionally unsafe
1822  SysCatalog::instance().grantDBObjectPrivilegesBatch({dash_role_name}, objects, *this);
1823  }
1824 }
1825 
1826 void Catalog::addLinkToMap(LinkDescriptor& ld) {
1827  cat_write_lock write_lock(this);
1828  LinkDescriptor* new_ld = new LinkDescriptor();
1829  *new_ld = ld;
1830  linkDescriptorMap_[std::to_string(currentDB_.dbId) + ld.link] = new_ld;
1831  linkDescriptorMapById_[ld.linkId] = new_ld;
1832 }
1833 
// Creates the fragmenter for `td` and stores it in td->fragmenter, then logs how long
// construction took. A SortedOrderFragmenter is used when td->sortedColumnId > 0,
// otherwise an InsertOrderFragmenter.
// The visible callers (getMetadataForTable) invoke this while holding td's mutex and
// only when td->fragmenter is null and the table is not a view.
1834 void Catalog::instantiateFragmenter(TableDescriptor* td) const {
1835  auto time_ms = measure<>::execution([&]() {
1836  // instantiate table fragmenter upon first use
1837  // assume only insert order fragmenter is supported
// NOTE(review): the listing skips source line 1838 here; one statement of this
// lambda is not visible in this chunk.
1839  vector<Chunk> chunkVec;
// Column set used to build the chunk vector: system columns included, virtual
// columns excluded, physical columns included.
1840  auto columnDescs = getAllColumnMetadataForTable(td->tableId, true, false, true);
1841  Chunk::translateColumnDescriptorsToChunkVec(columnDescs, chunkVec);
1842  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId};
1843  if (td->sortedColumnId > 0) {
1844  td->fragmenter = std::make_shared<SortedOrderFragmenter>(chunkKeyPrefix,
1845  chunkVec,
1846  dataMgr_.get(),
1847  const_cast<Catalog*>(this),
1848  td->tableId,
1849  td->shard,
1850  td->maxFragRows,
1851  td->maxChunkSize,
1852  td->fragPageSize,
1853  td->maxRows,
1854  td->persistenceLevel);
1855  } else {
1856  td->fragmenter = std::make_shared<InsertOrderFragmenter>(chunkKeyPrefix,
1857  chunkVec,
1858  dataMgr_.get(),
1859  const_cast<Catalog*>(this),
1860  td->tableId,
1861  td->shard,
1862  td->maxFragRows,
1863  td->maxChunkSize,
1864  td->fragPageSize,
1865  td->maxRows,
1866  td->persistenceLevel,
// Extra flag passed only to InsertOrderFragmenter: true when td->storageType is
// non-empty. NOTE(review): presumably marks foreign-storage-backed tables - confirm
// against the InsertOrderFragmenter constructor.
1867  !td->storageType.empty());
1868  }
1869  });
1870  LOG(INFO) << "Instantiating Fragmenter for table " << td->tableName << " took "
1871  << time_ms << "ms";
1872 }
1873 
1874 foreign_storage::ForeignTable* Catalog::getForeignTableUnlocked(
1875  const std::string& tableName) const {
1876  auto tableDescIt = tableDescriptorMap_.find(to_upper(tableName));
1877  if (tableDescIt == tableDescriptorMap_.end()) { // check to make sure table exists
1878  return nullptr;
1879  }
1880  return dynamic_cast<foreign_storage::ForeignTable*>(tableDescIt->second);
1881 }
1882 
1883 const foreign_storage::ForeignTable* Catalog::getForeignTable(
1884  const std::string& tableName) const {
1885  cat_read_lock read_lock(this);
1886  return getForeignTableUnlocked(tableName);
1887 }
1888 
1889 const TableDescriptor* Catalog::getMetadataForTable(const string& tableName,
1890  const bool populateFragmenter) const {
1891  // we give option not to populate fragmenter (default true/yes) as it can be heavy for
1892  // pure metadata calls
1893  cat_read_lock read_lock(this);
1894  auto td = getMutableMetadataForTableUnlocked(tableName);
1895  if (!td) {
1896  return nullptr;
1897  }
1898  read_lock.unlock();
1899  if (populateFragmenter) {
1900  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1901  if (td->fragmenter == nullptr && !td->isView) {
1902  instantiateFragmenter(td);
1903  }
1904  }
1905  return td; // returns pointer to table descriptor
1906 }
1907 
1908 const TableDescriptor* Catalog::getMetadataForTable(int table_id,
1909  bool populateFragmenter) const {
1910  cat_read_lock read_lock(this);
1911  auto td = getMutableMetadataForTableUnlocked(table_id);
1912  if (!td) {
1913  return nullptr;
1914  }
1915  read_lock.unlock();
1916  if (populateFragmenter) {
1917  std::unique_lock<std::mutex> td_lock(*td->mutex_.get());
1918  if (td->fragmenter == nullptr && !td->isView) {
1919  instantiateFragmenter(td);
1920  }
1921  }
1922  return td;
1923 }
1924 
1925 std::optional<std::string> Catalog::getTableName(int32_t table_id) const {
1926  cat_read_lock read_lock(this);
1927  auto td = getMutableMetadataForTableUnlocked(table_id);
1928  if (!td) {
1929  return {};
1930  }
1931  return td->tableName;
1932 }
1933 
1934 std::optional<int32_t> Catalog::getTableId(const std::string& table_name) const {
1935  cat_read_lock read_lock(this);
1936  auto td = getMutableMetadataForTableUnlocked(table_name);
1937  if (!td) {
1938  return {};
1939  }
1940  return td->tableId;
1941 }
1942 
1943 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(
1944  const std::string& table_name) const {
1945  auto it = tableDescriptorMap_.find(to_upper(table_name));
1946  if (it == tableDescriptorMap_.end()) {
1947  return nullptr;
1948  }
1949  return it->second;
1950 }
1951 
1952 TableDescriptor* Catalog::getMutableMetadataForTableUnlocked(int table_id) const {
1953  auto tableDescIt = tableDescriptorMapById_.find(table_id);
1954  if (tableDescIt == tableDescriptorMapById_.end()) { // check to make sure table exists
1955  return nullptr;
1956  }
1957  return tableDescIt->second;
1958 }
1959 
1960 const DictDescriptor* Catalog::getMetadataForDict(const int dict_id,
1961  const bool load_dict) const {
1962  cat_read_lock read_lock(this);
1963  const DictRef dictRef(currentDB_.dbId, dict_id);
1964  auto dictDescIt = dictDescriptorMapByRef_.find(dictRef);
1965  if (dictDescIt ==
1966  dictDescriptorMapByRef_.end()) { // check to make sure dictionary exists
1967  return nullptr;
1968  }
1969  auto& dd = dictDescIt->second;
1970 
1971  if (load_dict) {
1972  std::lock_guard string_dict_lock(*dd->string_dict_mutex);
1973  if (!dd->stringDict) {
1974  auto time_ms = measure<>::execution([&]() {
1975  if (string_dict_hosts_.empty()) {
1976  if (dd->dictIsTemp) {
1977  dd->stringDict = std::make_shared<StringDictionary>(
1978  dd->dictRef, dd->dictFolderPath, true, true, g_cache_string_hash);
1979  } else {
1980  dd->stringDict = std::make_shared<StringDictionary>(
1981  dd->dictRef, dd->dictFolderPath, false, true, g_cache_string_hash);
1982  }
1983  } else {
1984  dd->stringDict =
1985  std::make_shared<StringDictionary>(string_dict_hosts_.front(), dd->dictRef);
1986  }
1987  });
1988  LOG(INFO) << "Time to load Dictionary " << dd->dictRef.dbId << "_"
1989  << dd->dictRef.dictId << " was " << time_ms << "ms";
1990  }
1991  }
1992 
1993  return dd.get();
1994 }
1995 
1996 const std::vector<LeafHostInfo>& Catalog::getStringDictionaryHosts() const {
1997  return string_dict_hosts_;
1998 }
1999 
2000 const ColumnDescriptor* Catalog::getMetadataForColumn(int tableId,
2001  const string& columnName) const {
2002  cat_read_lock read_lock(this);
2003 
2004  ColumnKey columnKey(tableId, to_upper(columnName));
2005  auto colDescIt = columnDescriptorMap_.find(columnKey);
2006  if (colDescIt ==
2007  columnDescriptorMap_.end()) { // need to check to make sure column exists for table
2008  return nullptr;
2009  }
2010  return colDescIt->second;
2011 }
2012 
2013 const ColumnDescriptor* Catalog::getMetadataForColumn(int table_id, int column_id) const {
2014  cat_read_lock read_lock(this);
2015  ColumnIdKey columnIdKey(table_id, column_id);
2016  auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2017  if (colDescIt == columnDescriptorMapById_
2018  .end()) { // need to check to make sure column exists for table
2019  return nullptr;
2020  }
2021  return colDescIt->second;
2022 }
2023 
2024 const std::optional<std::string> Catalog::getColumnName(int table_id,
2025  int column_id) const {
2026  cat_read_lock read_lock(this);
2027  auto it = columnDescriptorMapById_.find(ColumnIdKey{table_id, column_id});
2028  if (it == columnDescriptorMapById_.end()) {
2029  return {};
2030  }
2031  return it->second->columnName;
2032 }
2033 
2034 const int Catalog::getColumnIdBySpiUnlocked(const int table_id, const size_t spi) const {
2035  const auto tabDescIt = tableDescriptorMapById_.find(table_id);
2036  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2037  const auto& columnIdBySpi = tabDescIt->second->columnIdBySpi_;
2038 
2039  auto spx = spi;
2040  int phi = 0;
2041  if (spx >= SPIMAP_MAGIC1) // see Catalog.h
2042  {
2043  phi = (spx - SPIMAP_MAGIC1) % SPIMAP_MAGIC2;
2044  spx = (spx - SPIMAP_MAGIC1) / SPIMAP_MAGIC2;
2045  }
2046 
2047  CHECK(0 < spx && spx <= columnIdBySpi.size())
2048  << "spx = " << spx << ", size = " << columnIdBySpi.size();
2049  return columnIdBySpi[spx - 1] + phi;
2050 }
2051 
2052 const int Catalog::getColumnIdBySpi(const int table_id, const size_t spi) const {
2053  cat_read_lock read_lock(this);
2054  return getColumnIdBySpiUnlocked(table_id, spi);
2055 }
2056 
2057 const ColumnDescriptor* Catalog::getMetadataForColumnBySpi(const int tableId,
2058  const size_t spi) const {
2059  cat_read_lock read_lock(this);
2060 
2061  const auto columnId = getColumnIdBySpiUnlocked(tableId, spi);
2062  ColumnIdKey columnIdKey(tableId, columnId);
2063  const auto colDescIt = columnDescriptorMapById_.find(columnIdKey);
2064  return columnDescriptorMapById_.end() == colDescIt ? nullptr : colDescIt->second;
2065 }
2066 
// Deletes the given dashboards on behalf of `user`.
// Phase 1 validates every id: ids with no dashboard and ids for which `user` lacks
// DELETE_DASHBOARD privilege are collected, and if either list is non-empty a single
// std::runtime_error describing both lists is thrown before anything is modified.
// Phase 2 revokes all privileges on the dashboards through SysCatalog.
// Phase 3, under the catalog write lock, removes each dashboard from the in-memory
// map and from the mapd_dashboards table inside one SQLite transaction, which is
// rolled back (and the exception rethrown) on failure.
2067 void Catalog::deleteMetadataForDashboards(const std::vector<int32_t> dashboard_ids,
2068  const UserMetadata& user) {
2069  std::stringstream invalid_ids, restricted_ids;
2070 
// Phase 1: validation only; the two streams accumulate comma-separated id lists.
2071  for (int32_t dashboard_id : dashboard_ids) {
2072  if (!getMetadataForDashboard(dashboard_id)) {
2073  invalid_ids << (!invalid_ids.str().empty() ? ", " : "") << dashboard_id;
2074  continue;
2075  }
2076  DBObject object(dashboard_id, DashboardDBObjectType);
2077  object.loadKey(*this);
2078  object.setPrivileges(AccessPrivileges::DELETE_DASHBOARD);
2079  std::vector<DBObject> privs = {object};
2080  if (!SysCatalog::instance().checkPrivileges(user, privs)) {
2081  restricted_ids << (!restricted_ids.str().empty() ? ", " : "") << dashboard_id;
2082  }
2083  }
2084 
2085  if (invalid_ids.str().size() > 0 || restricted_ids.str().size() > 0) {
2086  std::stringstream error_message;
2087  error_message << "Delete dashboard(s) failed with error(s):";
2088  if (invalid_ids.str().size() > 0) {
2089  error_message << "\nDashboard id: " << invalid_ids.str()
2090  << " - Dashboard id does not exist";
2091  }
2092  if (restricted_ids.str().size() > 0) {
2093  error_message
2094  << "\nDashboard id: " << restricted_ids.str()
2095  << " - User should be either owner of dashboard or super user to delete it";
2096  }
2097  throw std::runtime_error(error_message.str());
2098  }
2099  std::vector<DBObject> dash_objs;
2100 
2101  for (int32_t dashboard_id : dashboard_ids) {
2102  dash_objs.emplace_back(dashboard_id, DashboardDBObjectType);
2103  }
2104  // BE-5245: Transactionally unsafe (like other combined Catalog/Syscatalog operations)
2105  SysCatalog::instance().revokeDBObjectPrivilegesFromAllBatch(dash_objs, this);
2106  {
2107  cat_write_lock write_lock(this);
// NOTE(review): the listing skips source line 2108 here; one statement between the
// write lock and the transaction start is not visible in this chunk.
2109 
2110  sqliteConnector_.query("BEGIN TRANSACTION");
2111  try {
2112  for (int32_t dashboard_id : dashboard_ids) {
2113  auto dash = getMetadataForDashboard(dashboard_id);
2114  // Dash should still exist if revokeDBObjectPrivileges passed but throw and
2115  // rollback if already deleted
2116  if (!dash) {
2117  throw std::runtime_error(
2118  std::string("Delete dashboard(s) failed with error(s):\nDashboard id: ") +
2119  std::to_string(dashboard_id) + " - Dashboard id does not exist ");
2120  }
// The name and user id are copied out before the erase below; the erase may destroy
// the descriptor that `dash` points to.
2121  std::string user_id = std::to_string(dash->userId);
2122  std::string dash_name = dash->dashboardName;
// NOTE(review): viewDescIt is not compared against end() before erase(); this relies
// on the map key always being "<userId>:<dashboardName>" of the stored descriptor.
2123  auto viewDescIt = dashboardDescriptorMap_.find(user_id + ":" + dash_name);
2124  dashboardDescriptorMap_.erase(viewDescIt);
2125  sqliteConnector_.query_with_text_params(
2126  "DELETE FROM mapd_dashboards WHERE name = ? and userid = ?",
2127  std::vector<std::string>{dash_name, user_id});
2128  }
2129  } catch (std::exception& e) {
// NOTE(review): only the SQLite changes are rolled back here; entries already erased
// from dashboardDescriptorMap_ earlier in the loop are not restored.
2130  sqliteConnector_.query("ROLLBACK TRANSACTION");
2131  throw;
2132  }
2133  sqliteConnector_.query("END TRANSACTION");
2134  }
2135 }
2136 
2137 const DashboardDescriptor* Catalog::getMetadataForDashboard(
2138  const string& userId,
2139  const string& dashName) const {
2140  cat_read_lock read_lock(this);
2141 
2142  auto viewDescIt = dashboardDescriptorMap_.find(userId + ":" + dashName);
2143  if (viewDescIt == dashboardDescriptorMap_.end()) { // check to make sure view exists
2144  return nullptr;
2145  }
2146  return viewDescIt->second.get(); // returns pointer to view descriptor
2147 }
2148 
2149 const DashboardDescriptor* Catalog::getMetadataForDashboard(const int32_t id) const {
2150  cat_read_lock read_lock(this);
2151  std::string userId;
2152  std::string name;
2153  bool found{false};
2154  {
2155  for (auto descp : dashboardDescriptorMap_) {
2156  auto dash = descp.second.get();
2157  if (dash->dashboardId == id) {
2158  userId = std::to_string(dash->userId);
2159  name = dash->dashboardName;
2160  found = true;
2161  break;
2162  }
2163  }
2164  }
2165  if (found) {
2166  return getMetadataForDashboard(userId, name);
2167  }
2168  return nullptr;
2169 }
2170 
2171 const LinkDescriptor* Catalog::getMetadataForLink(const string& link) const {
2172  cat_read_lock read_lock(this);
2173  auto linkDescIt = linkDescriptorMap_.find(link);
2174  if (linkDescIt == linkDescriptorMap_.end()) { // check to make sure view exists
2175  return nullptr;
2176  }
2177  return linkDescIt->second; // returns pointer to view descriptor
2178 }
2179 
2180 const LinkDescriptor* Catalog::getMetadataForLink(int linkId) const {
2181  cat_read_lock read_lock(this);
2182  auto linkDescIt = linkDescriptorMapById_.find(linkId);
2183  if (linkDescIt == linkDescriptorMapById_.end()) { // check to make sure view exists
2184  return nullptr;
2185  }
2186  return linkDescIt->second;
2187 }
2188 
2189 const foreign_storage::ForeignTable* Catalog::getForeignTable(int table_id) const {
2190  cat_read_lock read_lock(this);
2191  const auto table = getMutableMetadataForTableUnlocked(table_id);
2192  CHECK(table);
2193  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(table);
2194  CHECK(foreign_table);
2195  return foreign_table;
2196 }
2197 
2198 void Catalog::getAllColumnMetadataForTableImpl(
2199  const TableDescriptor* td,
2200  list<const ColumnDescriptor*>& columnDescriptors,
2201  const bool fetchSystemColumns,
2202  const bool fetchVirtualColumns,
2203  const bool fetchPhysicalColumns) const {
2204  int32_t skip_physical_cols = 0;
2205  for (const auto& columnDescriptor : columnDescriptorMapById_) {
2206  if (!fetchPhysicalColumns && skip_physical_cols > 0) {
2207  --skip_physical_cols;
2208  continue;
2209  }
2210  auto cd = columnDescriptor.second;
2211  if (cd->tableId != td->tableId) {
2212  continue;
2213  }
2214  if (!fetchSystemColumns && cd->isSystemCol) {
2215  continue;
2216  }
2217  if (!fetchVirtualColumns && cd->isVirtualCol) {
2218  continue;
2219  }
2220  if (!fetchPhysicalColumns) {
2221  const auto& col_ti = cd->columnType;
2222  skip_physical_cols = col_ti.get_physical_cols();
2223  }
2224  columnDescriptors.push_back(cd);
2225  }
2226 }
2227 
2228 std::list<const ColumnDescriptor*> Catalog::getAllColumnMetadataForTable(
2229  const int tableId,
2230  const bool fetchSystemColumns,
2231  const bool fetchVirtualColumns,
2232  const bool fetchPhysicalColumns) const {
2233  cat_read_lock read_lock(this);
2234  std::list<const ColumnDescriptor*> columnDescriptors;
2235  const TableDescriptor* td = getMutableMetadataForTableUnlocked(tableId);
2236  getAllColumnMetadataForTableImpl(td,
2237  columnDescriptors,
2238  fetchSystemColumns,
2239  fetchVirtualColumns,
2240  fetchPhysicalColumns);
2241  return columnDescriptors;
2242 }
2243 
2244 list<const TableDescriptor*> Catalog::getAllTableMetadata() const {
2245  cat_read_lock read_lock(this);
2246  list<const TableDescriptor*> table_list;
2247  for (auto p : tableDescriptorMapById_) {
2248  table_list.push_back(p.second);
2249  }
2250  return table_list;
2251 }
2252 
2253 std::vector<TableDescriptor> Catalog::getAllTableMetadataCopy() const {
2254  cat_read_lock read_lock(this);
2255  std::vector<TableDescriptor> tables;
2256  tables.reserve(tableDescriptorMapById_.size());
2257  for (auto table_entry : tableDescriptorMapById_) {
2258  tables.emplace_back(*table_entry.second);
2259  tables.back().fragmenter = nullptr;
2260  }
2261  return tables;
2262 }
2263 
2264 list<const DashboardDescriptor*> Catalog::getAllDashboardsMetadata() const {
2265  cat_read_lock read_lock(this);
2266  list<const DashboardDescriptor*> dashboards;
2267  for (auto dashboard_entry : dashboardDescriptorMap_) {
2268  dashboards.push_back(dashboard_entry.second.get());
2269  }
2270  return dashboards;
2271 }
2272 
2273 std::vector<DashboardDescriptor> Catalog::getAllDashboardsMetadataCopy() const {
2274  cat_read_lock read_lock(this);
2275  std::vector<DashboardDescriptor> dashboards;
2276  dashboards.reserve(dashboardDescriptorMap_.size());
2277  for (auto dashboard_entry : dashboardDescriptorMap_) {
2278  dashboards.emplace_back(*dashboard_entry.second);
2279  }
2280  return dashboards;
2281 }
2282 
2283 DictRef Catalog::addDictionary(ColumnDescriptor& cd) {
2284  cat_write_lock write_lock(this);
2285  const auto& td = *tableDescriptorMapById_[cd.tableId];
2286  list<DictDescriptor> dds;
2287  setColumnDictionary(cd, dds, td, true);
2288  auto& dd = dds.back();
2289  CHECK(dd.dictRef.dictId);
2290 
2291  std::unique_ptr<StringDictionaryClient> client;
2292  if (!string_dict_hosts_.empty()) {
2293  client.reset(new StringDictionaryClient(
2294  string_dict_hosts_.front(), DictRef(currentDB_.dbId, -1), true));
2295  }
2296  if (client) {
2297  client->create(dd.dictRef, dd.dictIsTemp);
2298  }
2299 
2300  DictDescriptor* new_dd = new DictDescriptor(dd);
2301  dictDescriptorMapByRef_[dd.dictRef].reset(new_dd);
2302  if (!dd.dictIsTemp) {
2303  boost::filesystem::create_directory(new_dd->dictFolderPath);
2304  }
2305  return dd.dictRef;
2306 }
2307 
// Drops one reference to the dictionary used by column `cd`.
// Does nothing unless the column is a dictionary-encoded string or string array.
// Otherwise the refcount stored in mapd_dictionaries is decremented; when it reaches
// zero (or below) the dictionary row is deleted, the remote dictionary is dropped if
// string dictionary hosts are configured, and the entry is erased from
// dictDescriptorMapByRef_.
2308 void Catalog::delDictionary(const ColumnDescriptor& cd) {
2309  cat_write_lock write_lock(this);
// NOTE(review): the listing skips source line 2310 here; one statement after the
// write lock is not visible in this chunk.
2311  if (!(cd.columnType.is_string() || cd.columnType.is_string_array())) {
2312  return;
2313  }
2314  if (!(cd.columnType.get_compression() == kENCODING_DICT)) {
2315  return;
2316  }
2317  const auto dictId = cd.columnType.get_comp_param();
2318  CHECK_GT(dictId, 0);
2319  // decrement and zero check dict ref count
// `td` is only used to assert that the owning table still exists.
2320  const auto td = getMetadataForTable(cd.tableId, false);
2321  CHECK(td);
2322  sqliteConnector_.query_with_text_param(
2323  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
2324  std::to_string(dictId));
2325  sqliteConnector_.query_with_text_param(
2326  "SELECT refcount FROM mapd_dictionaries WHERE dictid = ?", std::to_string(dictId));
2327  const auto refcount = sqliteConnector_.getData<int>(0, 0);
2328  VLOG(3) << "Dictionary " << dictId << "from dropped table has reference count "
2329  << refcount;
// Still referenced by another column: keep the dictionary.
2330  if (refcount > 0) {
2331  return;
2332  }
2333  const DictRef dictRef(currentDB_.dbId, dictId);
2334  sqliteConnector_.query_with_text_param("DELETE FROM mapd_dictionaries WHERE dictid = ?",
2335  std::to_string(dictId));
// NOTE(review): the listing skips source line 2336 here; the two lines below are the
// tail of a call whose beginning is not visible. They build a path ending in
// "/DB_<dbId>_DICT_<dictId>".
2337  "/DB_" + std::to_string(currentDB_.dbId) + "_DICT_" +
2338  std::to_string(dictId));
2339 
2340  std::unique_ptr<StringDictionaryClient> client;
2341  if (!string_dict_hosts_.empty()) {
2342  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dictRef, true));
2343  }
2344  if (client) {
2345  client->drop(dictRef);
2346  }
2347 
2348  dictDescriptorMapByRef_.erase(dictRef);
2349 }
2350 
2351 void Catalog::getDictionary(const ColumnDescriptor& cd,
2352  std::map<int, StringDictionary*>& stringDicts) {
2353  // learn 'committed' ColumnDescriptor of this column
2354  auto cit = columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2355  CHECK(cit != columnDescriptorMap_.end());
2356  auto& ccd = *cit->second;
2357 
2358  if (!(ccd.columnType.is_string() || ccd.columnType.is_string_array())) {
2359  return;
2360  }
2361  if (!(ccd.columnType.get_compression() == kENCODING_DICT)) {
2362  return;
2363  }
2364  if (!(ccd.columnType.get_comp_param() > 0)) {
2365  return;
2366  }
2367 
2368  auto dictId = ccd.columnType.get_comp_param();
2369  getMetadataForDict(dictId);
2370 
2371  const DictRef dictRef(currentDB_.dbId, dictId);
2372  auto dit = dictDescriptorMapByRef_.find(dictRef);
2373  CHECK(dit != dictDescriptorMapByRef_.end());
2374  CHECK(dit->second);
2375  CHECK(dit->second.get()->stringDict);
2376  stringDicts[ccd.columnId] = dit->second.get()->stringDict.get();
2377 }
2378 
// Adds column `cd` to table `td`: inserts a row into mapd_columns, bumps the table's
// ncolumns in mapd_tables and in memory, reads back the assigned column id into
// cd.columnId, and registers a heap copy of `cd` in the column maps.
// For a sharded logical table (nShards > 0 and shard < 0) the column is first added,
// recursively, to every physical shard table using a copy of `cd`.
// The new descriptor is queued in columnDescriptorsForRoll as (nullptr, new) so that
// roll() can later commit or undo the addition.
2379 void Catalog::addColumn(const TableDescriptor& td, ColumnDescriptor& cd) {
2380  // caller must handle sqlite/chunk transaction TOGETHER
2381  cd.tableId = td.tableId;
2382  if (td.nShards > 0 && td.shard < 0) {
2383  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2384  auto shard_cd = cd;
2385  addColumn(*shard, shard_cd);
2386  }
2387  }
// NOTE(review): the listing skips source line 2388 here; the condition guarding the
// addDictionary() call below is not visible in this chunk.
2389  addDictionary(cd);
2390  }
2391 
2392  using BindType = SqliteConnector::BindType;
// 17 bind parameters, all TEXT by default; the last one (default_value) is bound as
// NULL when the column has no default.
2393  std::vector<BindType> types(17, BindType::TEXT);
2394  if (!cd.default_value.has_value()) {
2395  types[16] = BindType::NULL_TYPE;
2396  }
// The new column id is computed by SQLite as max(columnid) + 1 within the table.
2397  sqliteConnector_.query_with_text_params(
2398  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, coldim, "
2399  "colscale, is_notnull, "
2400  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, virtual_expr, "
2401  "is_deletedcol, default_value) "
2402  "VALUES (?, "
2403  "(SELECT max(columnid) + 1 FROM mapd_columns WHERE tableid = ?), "
2404  "?, ?, ?, "
2405  "?, "
2406  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2407  std::vector<std::string>{std::to_string(td.tableId),
2408  std::to_string(td.tableId),
2409  cd.columnName,
// NOTE(review): the listing skips source lines 2410-2417, 2419-2420 and 2422, so
// most of the bound parameter values are not visible in this chunk.
2418  "",
2421  cd.virtualExpr,
2423  cd.default_value.value_or("NULL")},
2424  types);
2425 
2426  sqliteConnector_.query_with_text_params(
2427  "UPDATE mapd_tables SET ncolumns = ncolumns + 1 WHERE tableid = ?",
2428  std::vector<std::string>{std::to_string(td.tableId)});
2429 
// Read back the column id SQLite assigned above.
2430  sqliteConnector_.query_with_text_params(
2431  "SELECT columnid FROM mapd_columns WHERE tableid = ? AND name = ?",
2432  std::vector<std::string>{std::to_string(td.tableId), cd.columnName});
2433  cd.columnId = sqliteConnector_.getData<int>(0, 0);
2434 
2435  ++tableDescriptorMapById_[td.tableId]->nColumns;
2436  auto ncd = new ColumnDescriptor(cd);
2437  addToColumnMap(ncd);
2438  columnDescriptorsForRoll.emplace_back(nullptr, ncd);
2439 }
2440 
// Removes column `cd` from table `td`: deletes its row from mapd_columns, decrements
// ncolumns in mapd_tables and in memory, and removes the descriptor from the column
// maps. The removed descriptor is queued in columnDescriptorsForRoll as
// (old, nullptr) so that roll() can later commit or undo the removal.
// For a sharded logical table (nShards > 0 and shard < 0) the column with the same
// column id is then dropped, recursively, from every physical shard table.
2441 void Catalog::dropColumn(const TableDescriptor& td, const ColumnDescriptor& cd) {
2442  {
2443  cat_write_lock write_lock(this);
// NOTE(review): the listing skips source line 2444 here; one statement after the
// write lock is not visible in this chunk.
2445  // caller must handle sqlite/chunk transaction TOGETHER
2446  sqliteConnector_.query_with_text_params(
2447  "DELETE FROM mapd_columns where tableid = ? and columnid = ?",
2448  std::vector<std::string>{std::to_string(td.tableId),
2449  std::to_string(cd.columnId)});
2450 
2451  sqliteConnector_.query_with_text_params(
2452  "UPDATE mapd_tables SET ncolumns = ncolumns - 1 WHERE tableid = ?",
2453  std::vector<std::string>{std::to_string(td.tableId)});
2454 
2455  ColumnDescriptorMap::iterator columnDescIt =
2456  columnDescriptorMap_.find(ColumnKey(cd.tableId, to_upper(cd.columnName)));
2457  CHECK(columnDescIt != columnDescriptorMap_.end());
2458 
// Queue the catalog's own descriptor for roll() before taking it out of the maps.
2459  columnDescriptorsForRoll.emplace_back(columnDescIt->second, nullptr);
2460  removeFromColumnMap(columnDescIt->second);
2461  --tableDescriptorMapById_[td.tableId]->nColumns;
2462  }
2463 
// The shard recursion runs after the scope holding the write lock has ended.
2464  // for each shard
2465  if (td.nShards > 0 && td.shard < 0) {
2466  for (const auto shard : getPhysicalTablesDescriptors(&td)) {
2467  const auto shard_cd = getMetadataForColumn(shard->tableId, cd.columnId);
2468  CHECK(shard_cd);
2469  dropColumn(*shard, *shard_cd);
2470  }
2471  }
2472 }
2473 
// Finalizes the pending column changes queued in columnDescriptorsForRoll.
// Each queued pair is (old descriptor, new descriptor); at least one is non-null.
//  - forward == true (commit): the old descriptor's dictionary reference is dropped
//    if the column is gone or now uses a different dictionary id, its column id is
//    removed from the table's columnIdBySpi_ and the descriptor is deleted; a new
//    non-geo-physical column id is appended to columnIdBySpi_ if not already there.
//    Afterwards Calcite metadata is refreshed once per affected table.
//  - forward == false (rollback): the old descriptor is put back into the column
//    maps; the new descriptor is removed from them, its dictionary reference is
//    dropped if it differs from the old one (or there is no old one), and it is
//    deleted.
// The queue is cleared in both cases.
2474 void Catalog::roll(const bool forward) {
2475  cat_write_lock write_lock(this);
// Tables touched by a forward roll; used for the Calcite refresh at the end.
2476  std::set<const TableDescriptor*> tds;
2477 
2478  for (const auto& cdr : columnDescriptorsForRoll) {
2479  auto ocd = cdr.first;
2480  auto ncd = cdr.second;
2481  CHECK(ocd || ncd);
2482  auto tabDescIt = tableDescriptorMapById_.find((ncd ? ncd : ocd)->tableId);
2483  CHECK(tableDescriptorMapById_.end() != tabDescIt);
2484  auto td = tabDescIt->second;
2485  auto& vc = td->columnIdBySpi_;
2486  if (forward) {
2487  if (ocd) {
2488  if (nullptr == ncd ||
2489  ncd->columnType.get_comp_param() != ocd->columnType.get_comp_param()) {
2490  delDictionary(*ocd);
2491  }
2492 
2493  vc.erase(std::remove(vc.begin(), vc.end(), ocd->columnId), vc.end());
2494 
2495  delete ocd;
2496  }
2497  if (ncd) {
2498  // append columnId if its new and not phy geo
2499  if (vc.end() == std::find(vc.begin(), vc.end(), ncd->columnId)) {
2500  if (!ncd->isGeoPhyCol) {
2501  vc.push_back(ncd->columnId);
2502  }
2503  }
2504  }
2505  tds.insert(td);
2506  } else {
2507  if (ocd) {
2508  addToColumnMap(ocd);
2509  }
2510  // roll back the dict of new column
2511  if (ncd) {
2512  removeFromColumnMap(ncd);
2513  if (nullptr == ocd ||
2514  ocd->columnType.get_comp_param() != ncd->columnType.get_comp_param()) {
2515  delDictionary(*ncd);
2516  }
2517  delete ncd;
2518  }
2519  }
2520  }
2521  columnDescriptorsForRoll.clear();
2522 
2523  if (forward) {
2524  for (const auto td : tds) {
2525  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
2526  }
2527  }
2528 }
2529 
2530 void Catalog::expandGeoColumn(const ColumnDescriptor& cd,
2531  list<ColumnDescriptor>& columns) {
2532  const auto& col_ti = cd.columnType;
2533  if (IS_GEO(col_ti.get_type())) {
2534  switch (col_ti.get_type()) {
2535  case kPOINT: {
2536  ColumnDescriptor physical_cd_coords(true);
2537  physical_cd_coords.columnName = cd.columnName + "_coords";
2538  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2539  // Raw data: compressed/uncompressed coords
2540  coords_ti.set_subtype(kTINYINT);
2541  size_t unit_size;
2542  if (col_ti.get_compression() == kENCODING_GEOINT &&
2543  col_ti.get_comp_param() == 32) {
2544  unit_size = 4 * sizeof(int8_t);
2545  } else {
2546  CHECK(col_ti.get_compression() == kENCODING_NONE);
2547  unit_size = 8 * sizeof(int8_t);
2548  }
2549  coords_ti.set_size(2 * unit_size);
2550  physical_cd_coords.columnType = coords_ti;
2551  columns.push_back(physical_cd_coords);
2552 
2553  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2554 
2555  break;
2556  }
2557  case kLINESTRING: {
2558  ColumnDescriptor physical_cd_coords(true);
2559  physical_cd_coords.columnName = cd.columnName + "_coords";
2560  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2561  // Raw data: compressed/uncompressed coords
2562  coords_ti.set_subtype(kTINYINT);
2563  physical_cd_coords.columnType = coords_ti;
2564  columns.push_back(physical_cd_coords);
2565 
2566  ColumnDescriptor physical_cd_bounds(true);
2567  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2568  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2569  bounds_ti.set_subtype(kDOUBLE);
2570  bounds_ti.set_size(4 * sizeof(double));
2571  physical_cd_bounds.columnType = bounds_ti;
2572  columns.push_back(physical_cd_bounds);
2573 
2574  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2575 
2576  break;
2577  }
2578  case kPOLYGON: {
2579  ColumnDescriptor physical_cd_coords(true);
2580  physical_cd_coords.columnName = cd.columnName + "_coords";
2581  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2582  // Raw data: compressed/uncompressed coords
2583  coords_ti.set_subtype(kTINYINT);
2584  physical_cd_coords.columnType = coords_ti;
2585  columns.push_back(physical_cd_coords);
2586 
2587  ColumnDescriptor physical_cd_ring_sizes(true);
2588  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2589  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2590  ring_sizes_ti.set_subtype(kINT);
2591  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2592  columns.push_back(physical_cd_ring_sizes);
2593 
2594  ColumnDescriptor physical_cd_bounds(true);
2595  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2596  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2597  bounds_ti.set_subtype(kDOUBLE);
2598  bounds_ti.set_size(4 * sizeof(double));
2599  physical_cd_bounds.columnType = bounds_ti;
2600  columns.push_back(physical_cd_bounds);
2601 
2602  ColumnDescriptor physical_cd_render_group(true);
2603  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2604  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2605  physical_cd_render_group.columnType = render_group_ti;
2606  columns.push_back(physical_cd_render_group);
2607 
2608  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2609 
2610  break;
2611  }
2612  case kMULTIPOLYGON: {
2613  ColumnDescriptor physical_cd_coords(true);
2614  physical_cd_coords.columnName = cd.columnName + "_coords";
2615  SQLTypeInfo coords_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2616  // Raw data: compressed/uncompressed coords
2617  coords_ti.set_subtype(kTINYINT);
2618  physical_cd_coords.columnType = coords_ti;
2619  columns.push_back(physical_cd_coords);
2620 
2621  ColumnDescriptor physical_cd_ring_sizes(true);
2622  physical_cd_ring_sizes.columnName = cd.columnName + "_ring_sizes";
2623  SQLTypeInfo ring_sizes_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2624  ring_sizes_ti.set_subtype(kINT);
2625  physical_cd_ring_sizes.columnType = ring_sizes_ti;
2626  columns.push_back(physical_cd_ring_sizes);
2627 
2628  ColumnDescriptor physical_cd_poly_rings(true);
2629  physical_cd_poly_rings.columnName = cd.columnName + "_poly_rings";
2630  SQLTypeInfo poly_rings_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2631  poly_rings_ti.set_subtype(kINT);
2632  physical_cd_poly_rings.columnType = poly_rings_ti;
2633  columns.push_back(physical_cd_poly_rings);
2634 
2635  ColumnDescriptor physical_cd_bounds(true);
2636  physical_cd_bounds.columnName = cd.columnName + "_bounds";
2637  SQLTypeInfo bounds_ti = SQLTypeInfo(kARRAY, col_ti.get_notnull());
2638  bounds_ti.set_subtype(kDOUBLE);
2639  bounds_ti.set_size(4 * sizeof(double));
2640  physical_cd_bounds.columnType = bounds_ti;
2641  columns.push_back(physical_cd_bounds);
2642 
2643  ColumnDescriptor physical_cd_render_group(true);
2644  physical_cd_render_group.columnName = cd.columnName + "_render_group";
2645  SQLTypeInfo render_group_ti = SQLTypeInfo(kINT, col_ti.get_notnull());
2646  physical_cd_render_group.columnType = render_group_ti;
2647  columns.push_back(physical_cd_render_group);
2648 
2649  // If adding more physical columns - update SQLTypeInfo::get_physical_cols()
2650 
2651  break;
2652  }
2653  default:
2654  throw runtime_error("Unrecognized geometry type.");
2655  break;
2656  }
2657  }
2658 }
2659 
// File-local helper used when persisting foreign tables.
// NOTE(review): this listing omits several lines of the helper below (its signature,
// the initializer of timing_type_entry, the value it is compared against and the return
// statements), so only the visible steps are described: an entry is looked up, it is
// CHECKed to exist in foreign_table.options, and its value selects between two outcomes,
// one of which is computed from foreign_table.options. Presumably this is the
// get_next_refresh_time() called from Catalog::createTable -- confirm against the
// full source.
2660 namespace {
2662  auto timing_type_entry =
2664  CHECK(timing_type_entry != foreign_table.options.end());
2665  if (timing_type_entry->second ==
2668  foreign_table.options);
2669  }
2671 }
2672 } // namespace
2673 
// Creates a table in this catalog from the descriptor `td` and the column list `cols`.
//
// Visible steps:
//  1. Takes the catalog write lock.
//  2. Copies the user columns, rejecting any column named "rowid", and expands every
//     geometry column into its physical columns via expandGeoColumn().
//  3. Appends the system "rowid" column and, when td.hasDeletedCol is set, the
//     "$deleted$" system column; td.nColumns is set to the resulting column count.
//  4. Either writes rows to mapd_tables / mapd_columns (plus mapd_views for views and
//     omnisci_foreign_tables for foreign tables) and reads back the generated table id,
//     or, in the branch commented "Temporary table", assigns an id from
//     nextTempTableId_ and creates in-memory dictionaries.
//  5. Registers the table via addTableToMap(), notifies calciteMgr_, and registers it
//     with the foreign storage interface when td.storageType is non-empty and is not
//     FOREIGN_TABLE.
//  6. On an exception the SQLite transaction is rolled back and the exception rethrown;
//     otherwise the transaction is ended.
//
// td is updated in place (tableId, nColumns, columnIdBySpi_). shared_dict_defs and
// isLogicalTable are forwarded to the dictionary set-up helpers.
//
// NOTE(review): this listing omits a number of source lines (for example the
// declaration of sqlite_lock, the condition that opens the block closed by
// "} else { // Temporary table", and several bind values of the INSERT statements).
// Consult the full source before modifying this function.
2674 void Catalog::createTable(
2675  TableDescriptor& td,
2676  const list<ColumnDescriptor>& cols,
2677  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs,
2678  bool isLogicalTable) {
2679  cat_write_lock write_lock(this);
2680  list<ColumnDescriptor> cds = cols;
2681  list<DictDescriptor> dds;
2682  std::set<std::string> toplevel_column_names;
2683  list<ColumnDescriptor> columns;
2684
// Tables with a non-empty storage type are prepared through the foreign storage
// interface (the rest of this condition is not visible in this listing).
2685  if (!td.storageType.empty() &&
2688  throw std::runtime_error("Only temporary tables can be backed by foreign storage.");
2689  }
2690  dataMgr_->getForeignStorageInterface()->prepareTable(getCurrentDB().dbId, td, cds);
2691  }
2692
// Build the full column list: user columns plus the physical columns of geo columns.
2693  for (auto cd : cds) {
2694  if (cd.columnName == "rowid") {
2695  throw std::runtime_error(
2696  "Cannot create column with name rowid. rowid is a system defined column.");
2697  }
2698  columns.push_back(cd);
2699  toplevel_column_names.insert(cd.columnName);
2700  if (cd.columnType.is_geometry()) {
2701  expandGeoColumn(cd, columns);
2702  }
2703  }
// cds is refilled further down with the final descriptors (table id and column id set).
2704  cds.clear();
2705
2706  ColumnDescriptor cd;
2707  // add row_id column -- Must be last column in the table
2708  cd.columnName = "rowid";
2709  cd.isSystemCol = true;
2710  cd.columnType = SQLTypeInfo(kBIGINT, true);
2711 #ifdef MATERIALIZED_ROWID
2712  cd.isVirtualCol = false;
2713 #else
2714  cd.isVirtualCol = true;
2715  cd.virtualExpr = "MAPD_FRAG_ID * MAPD_ROWS_PER_FRAG + MAPD_FRAG_ROW_ID";
2716 #endif
2717  columns.push_back(cd);
2718  toplevel_column_names.insert(cd.columnName);
2719
// The "$deleted$" column is appended after rowid and is not added to
// toplevel_column_names.
2720  if (td.hasDeletedCol) {
2721  ColumnDescriptor cd_del;
2722  cd_del.columnName = "$deleted$";
2723  cd_del.isSystemCol = true;
2724  cd_del.isVirtualCol = false;
2725  cd_del.columnType = SQLTypeInfo(kBOOLEAN, true);
2726  cd_del.isDeletedCol = true;
2727
2728  columns.push_back(cd_del);
2729  }
2730
2731  td.nColumns = columns.size();
2732  // TODO(sy): don't take disk locks or touch sqlite connector for temporary tables
2734  sqliteConnector_.query("BEGIN TRANSACTION");
2736  try {
2737  sqliteConnector_.query_with_text_params(
2738  R"(INSERT INTO mapd_tables (name, userid, ncolumns, isview, fragments, frag_type, max_frag_rows, max_chunk_size, frag_page_size, max_rows, partitions, shard_column_id, shard, num_shards, sort_column_id, storage_type, max_rollback_epochs, is_system_table, key_metainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))",
2739  std::vector<std::string>{td.tableName,
2740  std::to_string(td.userId),
2742  std::to_string(td.isView),
2743  "",
2748  std::to_string(td.maxRows),
2749  td.partitions,
2751  std::to_string(td.shard),
2752  std::to_string(td.nShards),
2754  td.storageType,
2757  td.keyMetainfo});
2758
2759  // now get the auto generated tableid
2760  sqliteConnector_.query_with_text_param(
2761  "SELECT tableid FROM mapd_tables WHERE name = ?", td.tableName);
2762  td.tableId = sqliteConnector_.getData<int>(0, 0);
2763  int colId = 1;
// cd is taken by value: it is modified below (dictionary info, table id, column id)
// before being appended to cds.
2764  for (auto cd : columns) {
2766  const bool is_foreign_col =
2767  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2768  if (!is_foreign_col) {
2769  // Ideally we would like to not persist string dictionaries for system tables,
2770  // since system table content can be highly dynamic and string dictionaries
2771  // are not currently vacuumed. However, in distributed this causes issues
2772  // when the cluster is out of sync (when the agg resets but leaves persist) so
2773  // for the sake of testing we need to leave this as normal dictionaries until
2774  // we solve the distributed issue.
2775  auto use_temp_dictionary = false; // td.is_system_table;
2776  setColumnDictionary(cd, dds, td, isLogicalTable, use_temp_dictionary);
2777  }
2778  }
2779
// Only top-level columns that are not physical geo columns are recorded in
// td.columnIdBySpi_.
2780  if (toplevel_column_names.count(cd.columnName)) {
2781  if (!cd.isGeoPhyCol) {
2782  td.columnIdBySpi_.push_back(colId);
2783  }
2784  }
2785
// All 17 parameters are bound as text, except default_value (index 16), which is bound
// as NULL when the column has no default.
2786  using BindType = SqliteConnector::BindType;
2787  std::vector<BindType> types(17, BindType::TEXT);
2788  if (!cd.default_value.has_value()) {
2789  types[16] = BindType::NULL_TYPE;
2790  }
2791  sqliteConnector_.query_with_text_params(
2792  "INSERT INTO mapd_columns (tableid, columnid, name, coltype, colsubtype, "
2793  "coldim, colscale, is_notnull, "
2794  "compression, comp_param, size, chunks, is_systemcol, is_virtualcol, "
2795  "virtual_expr, is_deletedcol, default_value) "
2796  "VALUES (?, ?, ?, ?, ?, "
2797  "?, "
2798  "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
2799  std::vector<std::string>{std::to_string(td.tableId),
2800  std::to_string(colId),
2801  cd.columnName,
2810  "",
2813  cd.virtualExpr,
2815  cd.default_value.value_or("NULL")},
2816  types);
2817  cd.tableId = td.tableId;
2818  cd.columnId = colId++;
2819  cds.push_back(cd);
2820  }
2821  if (td.isView) {
2822  sqliteConnector_.query_with_text_params(
2823  "INSERT INTO mapd_views (tableid, sql) VALUES (?,?)",
2824  std::vector<std::string>{std::to_string(td.tableId), td.viewSQL});
2825  }
// Foreign tables: compute the next refresh time and persist the foreign table row
// (the condition guarding this block is not visible in this listing).
2827  auto& foreign_table = dynamic_cast<foreign_storage::ForeignTable&>(td);
2828  foreign_table.next_refresh_time = get_next_refresh_time(foreign_table);
2829  sqliteConnector_.query_with_text_params(
2830  "INSERT INTO omnisci_foreign_tables (table_id, server_id, options, "
2831  "last_refresh_time, next_refresh_time) VALUES (?, ?, ?, ?, ?)",
2832  std::vector<std::string>{std::to_string(foreign_table.tableId),
2833  std::to_string(foreign_table.foreign_server->id),
2834  foreign_table.getOptionsAsJsonString(),
2835  std::to_string(foreign_table.last_refresh_time),
2836  std::to_string(foreign_table.next_refresh_time)});
2837  }
2838  } catch (std::exception& e) {
2839  sqliteConnector_.query("ROLLBACK TRANSACTION");
2840  throw;
2841  }
2842  } else { // Temporary table
2843  td.tableId = nextTempTableId_++;
2844  int colId = 1;
2845  for (auto cd : columns) {
2847  const bool is_foreign_col =
2848  setColumnSharedDictionary(cd, cds, dds, td, shared_dict_defs);
2849
2850  if (!is_foreign_col) {
2851  // Create a new temporary dictionary
2852  std::string fileName("");
2853  std::string folderPath("");
2854  DictRef dict_ref(currentDB_.dbId, nextTempDictId_);
2855  nextTempDictId_++;
2856  DictDescriptor dd(dict_ref,
2857  fileName,
2859  false,
2860  1,
2861  folderPath,
2862  true); // Is dictName (2nd argument) used?
2863  dds.push_back(dd);
2864  if (!cd.columnType.is_array()) {
2866  }
2867  cd.columnType.set_comp_param(dict_ref.dictId);
2868  }
2869  }
2870  if (toplevel_column_names.count(cd.columnName)) {
2871  if (!cd.isGeoPhyCol) {
2872  td.columnIdBySpi_.push_back(colId);
2873  }
2874  }
2875  cd.tableId = td.tableId;
2876  cd.columnId = colId++;
2877  cds.push_back(cd);
2878  }
2879
// The temporary table is also written to the JSON file read by Calcite (the guard
// around this call is not visible in this listing).
2881  serializeTableJsonUnlocked(&td, cds);
2882  }
2883  }
2884
// Register the new table in memory; any failure rolls back the SQLite transaction and
// removes the table from the in-memory maps again.
2885  try {
2886  auto cache = dataMgr_->getPersistentStorageMgr()->getDiskCache();
2887  if (cache) {
2888  CHECK(!cache->hasCachedMetadataForKeyPrefix({getCurrentDB().dbId, td.tableId}))
2889  << "Disk cache at " + cache->getCacheDirectory()
2890  << " contains preexisting data for new table. Please "
2891  "delete or clear cache before continuing";
2892  }
2893
2894  addTableToMap(&td, cds, dds);
2895  calciteMgr_->updateMetadata(currentDB_.dbName, td.tableName);
2896  if (!td.storageType.empty() && td.storageType != StorageType::FOREIGN_TABLE) {
2897  dataMgr_->getForeignStorageInterface()->registerTable(this, td, cds);
2898  }
2899  } catch (std::exception& e) {
2900  sqliteConnector_.query("ROLLBACK TRANSACTION");
2901  removeTableFromMap(td.tableName, td.tableId, true);
2902  throw;
2903  }
2904  sqliteConnector_.query("END TRANSACTION");
2905
// Both locks are released before getMetadataForTable() is called for non-foreign
// tables.
2906  if (td.storageType != StorageType::FOREIGN_TABLE) {
2907  write_lock.unlock();
2908  sqlite_lock.unlock();
2909  getMetadataForTable(td.tableName,
2910  true); // cause instantiateFragmenter() to be called
2911  }
2912 }
2913 
2914 void Catalog::serializeTableJsonUnlocked(const TableDescriptor* td,
2915  const std::list<ColumnDescriptor>& cds) const {
2916  // relies on the catalog write lock
2917  using namespace rapidjson;
2918 
2919  VLOG(1) << "Serializing temporary table " << td->tableName << " to JSON for Calcite.";
2920 
2921  const auto db_name = currentDB_.dbName;
2922  const auto file_path = table_json_filepath(basePath_, db_name);
2923 
2924  Document d;
2925  if (boost::filesystem::exists(file_path)) {
2926  // look for an existing file for this database
2927  std::ifstream reader(file_path.string());
2928  CHECK(reader.is_open());
2929  IStreamWrapper json_read_wrapper(reader);
2930  d.ParseStream(json_read_wrapper);
2931  } else {
2932  d.SetObject();
2933  }
2934  CHECK(d.IsObject());
2935  CHECK(!d.HasMember(StringRef(td->tableName.c_str())));
2936 
2937  Value table(kObjectType);
2938  table.AddMember(
2939  "name", Value().SetString(StringRef(td->tableName.c_str())), d.GetAllocator());
2940  table.AddMember("id", Value().SetInt(td->tableId), d.GetAllocator());
2941  table.AddMember("columns", Value(kArrayType), d.GetAllocator());
2942 
2943  for (const auto& cd : cds) {
2944  Value column(kObjectType);
2945  column.AddMember(
2946  "name", Value().SetString(StringRef(cd.columnName)), d.GetAllocator());
2947  column.AddMember("coltype",
2948  Value().SetInt(static_cast<int>(cd.columnType.get_type())),
2949  d.GetAllocator());
2950  column.AddMember("colsubtype",
2951  Value().SetInt(static_cast<int>(cd.columnType.get_subtype())),
2952  d.GetAllocator());
2953  column.AddMember("compression",
2954  Value().SetInt(static_cast<int>(cd.columnType.get_compression())),
2955  d.GetAllocator());
2956  column.AddMember("comp_param",
2957  Value().SetInt(static_cast<int>(cd.columnType.get_comp_param())),
2958  d.GetAllocator());
2959  column.AddMember("size",
2960  Value().SetInt(static_cast<int>(cd.columnType.get_size())),
2961  d.GetAllocator());
2962  column.AddMember(
2963  "coldim", Value().SetInt(cd.columnType.get_dimension()), d.GetAllocator());
2964  column.AddMember(
2965  "colscale", Value().SetInt(cd.columnType.get_scale()), d.GetAllocator());
2966  column.AddMember(
2967  "is_notnull", Value().SetBool(cd.columnType.get_notnull()), d.GetAllocator());
2968  column.AddMember("is_systemcol", Value().SetBool(cd.isSystemCol), d.GetAllocator());
2969  column.AddMember("is_virtualcol", Value().SetBool(cd.isVirtualCol), d.GetAllocator());
2970  column.AddMember("is_deletedcol", Value().SetBool(cd.isDeletedCol), d.GetAllocator());
2971  table["columns"].PushBack(column, d.GetAllocator());
2972  }
2973  d.AddMember(StringRef(td->tableName.c_str()), table, d.GetAllocator());
2974 
2975  // Overwrite the existing file
2976  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
2977  CHECK(writer.is_open());
2978  OStreamWrapper json_wrapper(writer);
2979 
2980  Writer<OStreamWrapper> json_writer(json_wrapper);
2981  d.Accept(json_writer);
2982  writer.close();
2983 }
2984 
2985 void Catalog::dropTableFromJsonUnlocked(const std::string& table_name) const {
2986  // relies on the catalog write lock
2987  using namespace rapidjson;
2988 
2989  VLOG(1) << "Dropping temporary table " << table_name << " to JSON for Calcite.";
2990 
2991  const auto db_name = currentDB_.dbName;
2992  const auto file_path = table_json_filepath(basePath_, db_name);
2993 
2994  CHECK(boost::filesystem::exists(file_path));
2995  Document d;
2996 
2997  std::ifstream reader(file_path.string());
2998  CHECK(reader.is_open());
2999  IStreamWrapper json_read_wrapper(reader);
3000  d.ParseStream(json_read_wrapper);
3001 
3002  CHECK(d.IsObject());
3003  auto table_name_ref = StringRef(table_name.c_str());
3004  CHECK(d.HasMember(table_name_ref));
3005  CHECK(d.RemoveMember(table_name_ref));
3006 
3007  // Overwrite the existing file
3008  std::ofstream writer(file_path.string(), std::ios::trunc | std::ios::out);
3009  CHECK(writer.is_open());
3010  OStreamWrapper json_wrapper(writer);
3011 
3012  Writer<OStreamWrapper> json_writer(json_wrapper);
3013  d.Accept(json_writer);
3014  writer.close();
3015 }
3016 
// Locking entry point for foreign server creation: takes the catalog write lock and
// forwards ownership of `foreign_server` together with `if_not_exists` to
// createForeignServerNoLocks().
// NOTE(review): one source line between the write lock and the forwarding call is
// missing from this listing.
3017 void Catalog::createForeignServer(
3018  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3019  bool if_not_exists) {
3020  cat_write_lock write_lock(this);
3022  createForeignServerNoLocks(std::move(foreign_server), if_not_exists);
3023 }
3024 
3025 void Catalog::createForeignServerNoLocks(
3026  std::unique_ptr<foreign_storage::ForeignServer> foreign_server,
3027  bool if_not_exists) {
3028  const auto& name = foreign_server->name;
3029 
3030  sqliteConnector_.query_with_text_params(
3031  "SELECT name from omnisci_foreign_servers where name = ?",
3032  std::vector<std::string>{name});
3033 
3034  if (sqliteConnector_.getNumRows() == 0) {
3035  foreign_server->creation_time = std::time(nullptr);
3036  sqliteConnector_.query_with_text_params(
3037  "INSERT INTO omnisci_foreign_servers (name, data_wrapper_type, owner_user_id, "
3038  "creation_time, "
3039  "options) "
3040  "VALUES (?, ?, ?, ?, ?)",
3041  std::vector<std::string>{name,
3042  foreign_server->data_wrapper_type,
3043  std::to_string(foreign_server->user_id),
3044  std::to_string(foreign_server->creation_time),
3045  foreign_server->getOptionsAsJsonString()});
3046  sqliteConnector_.query_with_text_params(
3047  "SELECT id from omnisci_foreign_servers where name = ?",
3048  std::vector<std::string>{name});
3049  CHECK_EQ(sqliteConnector_.getNumRows(), size_t(1));
3050  foreign_server->id = sqliteConnector_.getData<int32_t>(0, 0);
3051  std::shared_ptr<foreign_storage::ForeignServer> foreign_server_shared =
3052  std::move(foreign_server);
3053  CHECK(foreignServerMap_.find(name) == foreignServerMap_.end())
3054  << "Attempting to insert a foreign server into foreign server map that already "
3055  "exists.";
3056  foreignServerMap_[name] = foreign_server_shared;
3057  foreignServerMapById_[foreign_server_shared->id] = foreign_server_shared;
3058  } else if (!if_not_exists) {
3059  throw std::runtime_error{"A foreign server with name \"" + foreign_server->name +
3060  "\" already exists."};
3061  }
3062 
3063  const auto& server_it = foreignServerMap_.find(name);
3064  CHECK(server_it != foreignServerMap_.end());
3065  CHECK(foreignServerMapById_.find(server_it->second->id) != foreignServerMapById_.end());
3066 }
3067 
3068 const foreign_storage::ForeignServer* Catalog::getForeignServer(
3069  const std::string& server_name) const {
3070  foreign_storage::ForeignServer* foreign_server = nullptr;
3071  cat_read_lock read_lock(this);
3072 
3073  if (foreignServerMap_.find(server_name) != foreignServerMap_.end()) {
3074  foreign_server = foreignServerMap_.find(server_name)->second.get();
3075  }
3076  return foreign_server;
3077 }
3078 
// Reads the foreign server named `server_name` from the omnisci_foreign_servers table
// and returns a newly constructed ForeignServer, or a null pointer when the query
// returns no row. The in-memory server maps are not consulted.
// NOTE(review): one source line after the foreign_server declaration is missing from
// this listing.
3079 const std::unique_ptr<const foreign_storage::ForeignServer>
3080 Catalog::getForeignServerFromStorage(const std::string& server_name) {
3081  std::unique_ptr<foreign_storage::ForeignServer> foreign_server = nullptr;
3083  sqliteConnector_.query_with_text_params(
3084  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time "
3085  "FROM omnisci_foreign_servers WHERE name = ?",
3086  std::vector<std::string>{server_name});
3087  if (sqliteConnector_.getNumRows() > 0) {
// Constructor arguments follow the SELECT column order.
// NOTE(review): creation_time (column 5) is read as a 32-bit integer here -- confirm
// that this matches the constructor parameter and the range of stored values.
3088  foreign_server = std::make_unique<foreign_storage::ForeignServer>(
3089  sqliteConnector_.getData<int>(0, 0),
3090  sqliteConnector_.getData<std::string>(0, 1),
3091  sqliteConnector_.getData<std::string>(0, 2),
3092  sqliteConnector_.getData<std::string>(0, 3),
3093  sqliteConnector_.getData<std::int32_t>(0, 4),
3094  sqliteConnector_.getData<std::int32_t>(0, 5));
3095  }
3096  return foreign_server;
3097 }
3098 
// Reads the row for `table_id` from the omnisci_foreign_tables table and returns a newly
// constructed ForeignTable, or a null pointer when the query returns no row. At most one
// matching row is expected (CHECKed).
// NOTE(review): one source line after the foreign_table declaration is missing from
// this listing.
3099 const std::unique_ptr<const foreign_storage::ForeignTable>
3100 Catalog::getForeignTableFromStorage(int table_id) {
3101  std::unique_ptr<foreign_storage::ForeignTable> foreign_table = nullptr;
3103  sqliteConnector_.query_with_text_params(
3104  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
3105  "omnisci_foreign_tables WHERE table_id = ?",
3106  std::vector<std::string>{to_string(table_id)});
3107  auto num_rows = sqliteConnector_.getNumRows();
3108  if (num_rows > 0) {
3109  CHECK_EQ(size_t(1), num_rows);
// The server pointer is taken from the in-memory map keyed by the stored server_id.
// NOTE(review): operator[] is used for the lookup; if foreignServerMapById_ is a
// std::map-like container, an unknown id would insert an empty entry and pass a null
// server pointer -- TODO confirm the id is always present.
3110  foreign_table = std::make_unique<foreign_storage::ForeignTable>(
3111  sqliteConnector_.getData<int>(0, 0),
3112  foreignServerMapById_[sqliteConnector_.getData<int32_t>(0, 1)].get(),
3113  sqliteConnector_.getData<std::string>(0, 2),
3114  sqliteConnector_.getData<int64_t>(0, 3),
3115  sqliteConnector_.getData<int64_t>(0, 4));
3116  }
3117  return foreign_table;
3118 }
3119 
3120 void Catalog::changeForeignServerOwner(const std::string& server_name,
3121  const int new_owner_id) {
3122  cat_write_lock write_lock(this);
3123  foreign_storage::ForeignServer* foreign_server =
3124  foreignServerMap_.find(server_name)->second.get();
3125  CHECK(foreign_server);
3126  setForeignServerProperty(server_name, "owner_user_id", std::to_string(new_owner_id));
3127  // update in-memory server
3128  foreign_server->user_id = new_owner_id;
3129 }
3130 
3131 void Catalog::setForeignServerDataWrapper(const std::string& server_name,
3132  const std::string& data_wrapper) {
3133  cat_write_lock write_lock(this);
3134  auto data_wrapper_type = to_upper(data_wrapper);
3135  // update in-memory server
3136  foreign_storage::ForeignServer* foreign_server =
3137  foreignServerMap_.find(server_name)->second.get();
3138  CHECK(foreign_server);
3139  std::string saved_data_wrapper_type = foreign_server->data_wrapper_type;
3140  foreign_server->data_wrapper_type = data_wrapper_type;
3141  try {
3142  foreign_server->validate();
3143  } catch (const std::exception& e) {
3144  // validation did not succeed:
3145  // revert to saved data_wrapper_type & throw exception
3146  foreign_server->data_wrapper_type = saved_data_wrapper_type;
3147  throw;
3148  }
3149  setForeignServerProperty(server_name, "data_wrapper_type", data_wrapper_type);
3150 }
3151 
3152 void Catalog::setForeignServerOptions(const std::string& server_name,
3153  const std::string& options) {
3154  cat_write_lock write_lock(this);
3155  // update in-memory server
3156  foreign_storage::ForeignServer* foreign_server =
3157  foreignServerMap_.find(server_name)->second.get();
3158  CHECK(foreign_server);
3159  auto saved_options = foreign_server->options;
3160  foreign_server->populateOptionsMap(options, true);
3161  try {
3162  foreign_server->validate();
3163  } catch (const std::exception& e) {
3164  // validation did not succeed:
3165  // revert to saved options & throw exception
3166  foreign_server->options = saved_options;
3167  throw;
3168  }
3169  setForeignServerProperty(server_name, "options", options);
3170 }
3171 
3172 void Catalog::renameForeignServer(const std::string& server_name,
3173  const std::string& name) {
3174  cat_write_lock write_lock(this);
3175  auto foreign_server_it = foreignServerMap_.find(server_name);
3176  CHECK(foreign_server_it != foreignServerMap_.end());
3177  setForeignServerProperty(server_name, "name", name);
3178  auto foreign_server_shared = foreign_server_it->second;
3179  foreign_server_shared->name = name;
3180  foreignServerMap_[name] = foreign_server_shared;
3181  foreignServerMap_.erase(foreign_server_it);
3182 }
3183 
// Drops the foreign server named `server_name`.
//
// Takes the catalog write lock, looks up the server id in omnisci_foreign_servers and,
// if the server is found:
//  - throws std::runtime_error when any row of omnisci_foreign_tables still references
//    the server;
//  - otherwise deletes the server row inside a SQLite transaction (rolled back and
//    rethrown on failure) and removes the server from both in-memory maps.
// If no server with that name is stored, the function does nothing.
// NOTE(review): one source line after the write lock is missing from this listing.
3184 void Catalog::dropForeignServer(const std::string& server_name) {
3185  cat_write_lock write_lock(this);
3187
3188  sqliteConnector_.query_with_text_params(
3189  "SELECT id from omnisci_foreign_servers where name = ?",
3190  std::vector<std::string>{server_name});
3191  auto num_rows = sqliteConnector_.getNumRows();
3192  if (num_rows > 0) {
3193  CHECK_EQ(size_t(1), num_rows);
3194  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
// Refuse to drop a server that is still referenced by foreign tables.
3195  sqliteConnector_.query_with_text_param(
3196  "SELECT table_id from omnisci_foreign_tables where server_id = ?",
3197  std::to_string(server_id));
3198  if (sqliteConnector_.getNumRows() > 0) {
3199  throw std::runtime_error{"Foreign server \"" + server_name +
3200  "\" is referenced "
3201  "by existing foreign tables and cannot be dropped."};
3202  }
3203  sqliteConnector_.query("BEGIN TRANSACTION");
3204  try {
3205  sqliteConnector_.query_with_text_params(
3206  "DELETE FROM omnisci_foreign_servers WHERE name = ?",
3207  std::vector<std::string>{server_name});
3208  } catch (const std::exception& e) {
3209  sqliteConnector_.query("ROLLBACK TRANSACTION");
3210  throw;
3211  }
3212  sqliteConnector_.query("END TRANSACTION");
// The in-memory maps are only updated after the delete has been committed.
3213  foreignServerMap_.erase(server_name);
3214  foreignServerMapById_.erase(server_id);
3215  }
3216 }
3217 
// Appends to `results` the foreign servers that match the optional `filters` and on
// which `user` has any privilege.
//
// filters: null, or a JSON array of filter objects. The members read from each object
//          are "attribute" (server_name, data_wrapper, created_at or options),
//          "operation" ("EQUALS" selects '=', anything else selects LIKE), "value" and,
//          for every filter after the first, "chain".
// user:    the user whose privileges are checked through SysCatalog.
// results: output vector; receives non-owning pointers to the in-memory servers.
//
// Throws std::runtime_error for an unknown attribute and for a LIKE filter on
// created_at. Servers listed in INTERNAL_SERVERS are always skipped.
// NOTE(review): one source line after the read locks is missing from this listing.
3218 void Catalog::getForeignServersForUser(
3219  const rapidjson::Value* filters,
3220  const UserMetadata& user,
3221  std::vector<const foreign_storage::ForeignServer*>& results) {
3222  sys_read_lock syscat_read_lock(&SysCatalog::instance());
3223  cat_read_lock read_lock(this);
3225  // Customer facing and internal SQlite names
3226  std::map<std::string, std::string> col_names{{"server_name", "name"},
3227  {"data_wrapper", "data_wrapper_type"},
3228  {"created_at", "creation_time"},
3229  {"options", "options"}};
3230
3231  // TODO add "owner" when FSI privilege is implemented
3232  std::stringstream filter_string;
3233  std::vector<std::string> arguments;
3234
3235  if (filters != nullptr) {
3236  // Create SQL WHERE clause for SQLite query
// NOTE(review): " WHERE" is emitted before the loop, so a non-null but empty filter
// array would produce a query that ends in "WHERE" -- confirm callers never pass one.
3237  int num_filters = 0;
3238  filter_string << " WHERE";
3239  for (auto& filter_def : filters->GetArray()) {
// NOTE(review): the "chain" text is written into the SQL string as-is rather than bound
// as a parameter -- confirm it is validated before it reaches this function.
3240  if (num_filters > 0) {
3241  filter_string << " " << std::string(filter_def["chain"].GetString());
3242  ;
3243  }
3244
3245  if (col_names.find(std::string(filter_def["attribute"].GetString())) ==
3246  col_names.end()) {
3247  throw std::runtime_error{"Attribute with name \"" +
3248  std::string(filter_def["attribute"].GetString()) +
3249  "\" does not exist."};
3250  }
3251
// Only the mapped internal column name is written into the SQL text.
3252  filter_string << " " << col_names[std::string(filter_def["attribute"].GetString())];
3253
3254  bool equals_operator = false;
3255  if (std::strcmp(filter_def["operation"].GetString(), "EQUALS") == 0) {
3256  filter_string << " = ? ";
3257  equals_operator = true;
3258  } else {
3259  filter_string << " LIKE ? ";
3260  }
3261
3262  bool timestamp_column =
3263  (std::strcmp(filter_def["attribute"].GetString(), "created_at") == 0);
3264
3265  if (timestamp_column && !equals_operator) {
3266  throw std::runtime_error{"LIKE operator is incompatible with TIMESTAMP data"};
3267  }
3268
// Timestamp values are parsed with dateTimeParse<kTIMESTAMP> and bound as the decimal
// string of the result; all other values are bound as given.
3269  if (timestamp_column && equals_operator) {
3270  arguments.push_back(std::to_string(
3271  dateTimeParse<kTIMESTAMP>(filter_def["value"].GetString(), 0)));
3272  } else {
3273  arguments.emplace_back(filter_def["value"].GetString());
3274  }
3275
3276  num_filters++;
3277  }
3278  }
3279  // Create select query for the omnisci_foreign_servers table
3280  std::string query = std::string("SELECT name from omnisci_foreign_servers ");
3281  query += filter_string.str();
3282
3283  sqliteConnector_.query_with_text_params(query, arguments);
3284  auto num_rows = sqliteConnector_.getNumRows();
3285
3286  if (sqliteConnector_.getNumRows() == 0) {
3287  return;
3288  }
3289
3290  CHECK(sqliteConnector_.getNumCols() == 1);
3291  // Return pointers to objects
3292  results.reserve(num_rows);
3293  for (size_t row = 0; row < num_rows; ++row) {
3294  const auto& server_name = sqliteConnector_.getData<std::string>(row, 0);
3295  if (shared::contains(INTERNAL_SERVERS, server_name)) {
3296  continue;
3297  }
// Every stored server is expected to be present in the in-memory map (CHECKed).
3298  const foreign_storage::ForeignServer* foreign_server = getForeignServer(server_name);
3299  CHECK(foreign_server != nullptr);
3300
3301  DBObject dbObject(foreign_server->name, ServerDBObjectType);
3302  dbObject.loadKey(*this);
3303  std::vector<DBObject> privObjects = {dbObject};
3304  if (!SysCatalog::instance().hasAnyPrivileges(user, privObjects)) {
3305  // skip server, as there are no privileges to access it
3306  continue;
3307  }
3308  results.push_back(foreign_server);
3309  }
3310 }
3311 
3312 // returns the table epoch or -1 if there is something wrong with the shared epoch
3313 int32_t Catalog::getTableEpoch(const int32_t db_id, const int32_t table_id) const {
3314  cat_read_lock read_lock(this);
3315  const auto td = getMetadataForTable(table_id, false);
3316  if (!td) {
3317  std::stringstream table_not_found_error_message;
3318  table_not_found_error_message << "Table (" << db_id << "," << table_id
3319  << ") not found";
3320  throw std::runtime_error(table_not_found_error_message.str());
3321  }
3322  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(table_id);
3323  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3324  // check all shards have same checkpoint
3325  const auto physicalTables = physicalTableIt->second;
3326  CHECK(!physicalTables.empty());
3327  size_t curr_epoch{0}, first_epoch{0};
3328  int32_t first_table_id{0};
3329  bool are_epochs_inconsistent{false};
3330  for (size_t i = 0; i < physicalTables.size(); i++) {
3331  int32_t physical_tb_id = physicalTables[i];
3332  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3333  CHECK(phys_td);
3334 
3335  curr_epoch = dataMgr_->getTableEpoch(db_id, physical_tb_id);
3336  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3337  << ", table id: " << physical_tb_id << ", epoch: " << curr_epoch;
3338  if (i == 0) {
3339  first_epoch = curr_epoch;
3340  first_table_id = physical_tb_id;
3341  } else if (first_epoch != curr_epoch) {
3342  are_epochs_inconsistent = true;
3343  LOG(ERROR) << "Epochs on shards do not all agree on table id: " << table_id
3344  << ", db id: " << db_id
3345  << ". First table (table id: " << first_table_id
3346  << ") has epoch: " << first_epoch << ". Table id: " << physical_tb_id
3347  << ", has inconsistent epoch: " << curr_epoch
3348  << ". See previous INFO logs for all epochs and their table ids.";
3349  }
3350  }
3351  if (are_epochs_inconsistent) {
3352  // oh dear the shards do not agree on the epoch for this table
3353  return -1;
3354  }
3355  return curr_epoch;
3356  } else {
3357  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3358  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3359  << ", epoch: " << epoch;
3360  return epoch;
3361  }
3362 }
3363 
3364 std::vector<const foreign_storage::ForeignTable*>
3365 Catalog::getAllForeignTablesForForeignServer(const int32_t foreign_server_id) {
3366  cat_read_lock read_lock(this);
3367  std::vector<const foreign_storage::ForeignTable*> foreign_tables;
3368  for (auto entry : tableDescriptorMapById_) {
3369  auto table_descriptor = entry.second;
3370  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
3371  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
3372  CHECK(foreign_table);
3373  if (foreign_table->foreign_server->id == foreign_server_id) {
3374  foreign_tables.emplace_back(foreign_table);
3375  }
3376  }
3377  }
3378  return foreign_tables;
3379 }
3380 
3381 void Catalog::setTableEpoch(const int db_id, const int table_id, int new_epoch) {
3382  LOG(INFO) << "Set table epoch db:" << db_id << " Table ID " << table_id
3383  << " back to new epoch " << new_epoch;
3384  const auto td = getMetadataForTable(table_id, false);
3385  if (!td) {
3386  std::stringstream table_not_found_error_message;
3387  table_not_found_error_message << "Table (" << db_id << "," << table_id
3388  << ") not found";
3389  throw std::runtime_error(table_not_found_error_message.str());
3390  }
3391  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3392  std::stringstream is_temp_table_error_message;
3393  is_temp_table_error_message << "Cannot set epoch on temporary table";
3394  throw std::runtime_error(is_temp_table_error_message.str());
3395  }
3396 
3397  File_Namespace::FileMgrParams file_mgr_params;
3398  file_mgr_params.epoch = new_epoch;
3399  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3400 
3401  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3402  CHECK(!physical_tables.empty());
3403  for (const auto table : physical_tables) {
3404  auto table_id = table->tableId;
3405  LOG(INFO) << "Set sharded table epoch db:" << db_id << " Table ID " << table_id
3406  << " back to new epoch " << new_epoch;
3407  // Should have table lock from caller so safe to do this after, avoids
3408  // having to repopulate data on error
3409  removeChunks(table_id);
3410  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3411  }
3412 }
3413 
3414 void Catalog::alterPhysicalTableMetadata(
3415  const TableDescriptor* td,
3416  const TableDescriptorUpdateParams& table_update_params) {
3417  // Only called from parent alterTableParamMetadata, expect already to have catalog and
3418  // sqlite write locks
3419 
3420  // Sqlite transaction should have already been begun in parent alterTableCatalogMetadata
3421 
3422  TableDescriptor* mutable_td = getMutableMetadataForTableUnlocked(td->tableId);
3423  CHECK(mutable_td);
3424  if (td->maxRollbackEpochs != table_update_params.max_rollback_epochs) {
3425  sqliteConnector_.query_with_text_params(
3426  "UPDATE mapd_tables SET max_rollback_epochs = ? WHERE tableid = ?",
3427  std::vector<std::string>{std::to_string(table_update_params.max_rollback_epochs),
3428  std::to_string(td->tableId)});
3429  mutable_td->maxRollbackEpochs = table_update_params.max_rollback_epochs;
3430  }
3431 
3432  if (td->maxRows != table_update_params.max_rows) {
3433  sqliteConnector_.query_with_text_params(
3434  "UPDATE mapd_tables SET max_rows = ? WHERE tableid = ?",
3435  std::vector<std::string>{std::to_string(table_update_params.max_rows),
3436  std::to_string(td->tableId)});
3437  mutable_td->maxRows = table_update_params.max_rows;
3438  }
3439 }
3440 
// Applies `table_update_params` to the table `td` and to each of its physical tables
// (if any are registered in logicalToPhysicalTableMapById_) inside a single SQLite
// transaction, under the catalog write lock.
//
// A std::exception raised during the update rolls the transaction back and is reported
// through LOG(FATAL); it is not rethrown.
// NOTE(review): one source line after the write lock is missing from this listing.
3441 void Catalog::alterTableMetadata(const TableDescriptor* td,
3442  const TableDescriptorUpdateParams& table_update_params) {
3443  cat_write_lock write_lock(this);
3445  sqliteConnector_.query("BEGIN TRANSACTION");
3446  try {
3447  const auto physical_table_it = logicalToPhysicalTableMapById_.find(td->tableId);
3448  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3449  const auto physical_tables = physical_table_it->second;
3450  CHECK(!physical_tables.empty());
// Update every physical table first ...
3451  for (size_t i = 0; i < physical_tables.size(); i++) {
3452  int32_t physical_tb_id = physical_tables[i];
3453  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id, false);
3454  CHECK(phys_td);
3455  alterPhysicalTableMetadata(phys_td, table_update_params);
3456  }
3457  }
// ... and then the table itself.
3458  alterPhysicalTableMetadata(td, table_update_params);
3459  } catch (std::exception& e) {
3460  sqliteConnector_.query("ROLLBACK TRANSACTION");
// NOTE(review): presumably LOG(FATAL) terminates the process, so the END TRANSACTION
// below is not reached after a rollback -- confirm against the logger implementation.
3461  LOG(FATAL) << "Table '" << td->tableName << "' catalog update failed";
3462  }
3463  sqliteConnector_.query("END TRANSACTION");
3464 }
3465 
3466 void Catalog::setMaxRollbackEpochs(const int32_t table_id,
3467  const int32_t max_rollback_epochs) {
3468  // Must be called from AlterTableParamStmt or other method that takes executor and
3469  // TableSchema locks
3470  if (max_rollback_epochs <= -1) {
3471  throw std::runtime_error("Cannot set max_rollback_epochs < 0.");
3472  }
3473  const auto td = getMetadataForTable(
3474  table_id, false); // Deep copy as there will be gap between read and write locks
3475  CHECK(td); // Existence should have already been checked in
3476  // ParserNode::AlterTableParmStmt
3477  TableDescriptorUpdateParams table_update_params(td);
3478  table_update_params.max_rollback_epochs = max_rollback_epochs;
3479  if (table_update_params == td) { // Operator is overloaded to test for equality
3480  LOG(INFO) << "Setting max_rollback_epochs for table " << table_id
3481  << " to existing value, skipping operation";
3482  return;
3483  }
3484  File_Namespace::FileMgrParams file_mgr_params;
3485  file_mgr_params.epoch = -1; // Use existing epoch
3486  file_mgr_params.max_rollback_epochs = max_rollback_epochs;
3487  setTableFileMgrParams(table_id, file_mgr_params);
3488  alterTableMetadata(td, table_update_params);
3489 }
3490 
3491 void Catalog::setMaxRows(const int32_t table_id, const int64_t max_rows) {
3492  if (max_rows < 0) {
3493  throw std::runtime_error("Max rows cannot be a negative number.");
3494  }
3495  const auto td = getMetadataForTable(table_id);
3496  CHECK(td);
3497  TableDescriptorUpdateParams table_update_params(td);
3498  table_update_params.max_rows = max_rows;
3499  if (table_update_params == td) {
3500  LOG(INFO) << "Max rows value of " << max_rows
3501  << " is the same as the existing value. Skipping update.";
3502  return;
3503  }
3504  alterTableMetadata(td, table_update_params);
3505  CHECK(td->fragmenter);
3506  td->fragmenter->dropFragmentsToSize(max_rows);
3507 }
3508 
3509 // For testing purposes only
3510 void Catalog::setUncappedTableEpoch(const std::string& table_name) {
3511  cat_write_lock write_lock(this);
3512  auto td_entry = tableDescriptorMap_.find(to_upper(table_name));
3513  CHECK(td_entry != tableDescriptorMap_.end());
3514  auto td = td_entry->second;
3515 
3516  std::vector<int> table_key{getCurrentDB().dbId, td->tableId};
3517  ResultSetCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
3518 
3519  TableDescriptorUpdateParams table_update_params(td);
3520  table_update_params.max_rollback_epochs = -1;
3521  write_lock.unlock();
3522 
3523  alterTableMetadata(td, table_update_params);
3524  File_Namespace::FileMgrParams file_mgr_params;
3525  file_mgr_params.max_rollback_epochs = -1;
3526  setTableFileMgrParams(td->tableId, file_mgr_params);
3527 }
3528 
3529 void Catalog::setTableFileMgrParams(
3530  const int table_id,
3531  const File_Namespace::FileMgrParams& file_mgr_params) {
3532  // Expects parent to have write lock
3533  const auto td = getMetadataForTable(table_id, false);
3534  const auto db_id = this->getDatabaseId();
3535  if (!td) {
3536  std::stringstream table_not_found_error_message;
3537  table_not_found_error_message << "Table (" << db_id << "," << table_id
3538  << ") not found";
3539  throw std::runtime_error(table_not_found_error_message.str());
3540  }
3541  if (td->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) {
3542  std::stringstream is_temp_table_error_message;
3543  is_temp_table_error_message << "Cannot set storage params on temporary table";
3544  throw std::runtime_error(is_temp_table_error_message.str());
3545  }
3546 
3547  const auto physical_tables = getPhysicalTablesDescriptors(td, false);
3548  CHECK(!physical_tables.empty());
3549  for (const auto table : physical_tables) {
3550  auto table_id = table->tableId;
3551  removeChunks(table_id);
3552  dataMgr_->getGlobalFileMgr()->setFileMgrParams(db_id, table_id, file_mgr_params);
3553  }
3554 }
3555 
3556 std::vector<TableEpochInfo> Catalog::getTableEpochs(const int32_t db_id,
3557  const int32_t table_id) const {
3558  cat_read_lock read_lock(this);
3559  std::vector<TableEpochInfo> table_epochs;
3560  const auto physical_table_it = logicalToPhysicalTableMapById_.find(table_id);
3561  if (physical_table_it != logicalToPhysicalTableMapById_.end()) {
3562  const auto physical_tables = physical_table_it->second;
3563  CHECK(!physical_tables.empty());
3564 
3565  for (const auto physical_tb_id : physical_tables) {
3566  const auto phys_td = getMutableMetadataForTableUnlocked(physical_tb_id);
3567  CHECK(phys_td);
3568 
3569  auto table_id = phys_td->tableId;
3570  auto epoch = dataMgr_->getTableEpoch(db_id, phys_td->tableId);
3571  table_epochs.emplace_back(table_id, epoch);
3572  LOG(INFO) << "Got sharded table epoch for db id: " << db_id
3573  << ", table id: " << table_id << ", epoch: " << epoch;
3574  }
3575  } else {
3576  auto epoch = dataMgr_->getTableEpoch(db_id, table_id);
3577  LOG(INFO) << "Got table epoch for db id: " << db_id << ", table id: " << table_id
3578  << ", epoch: " << epoch;
3579  table_epochs.emplace_back(table_id, epoch);
3580  }
3581  return table_epochs;
3582 }
3583 
3584 void Catalog::setTableEpochs(const int32_t db_id,
3585  const std::vector<TableEpochInfo>& table_epochs) const {
3586  const auto td = getMetadataForTable(table_epochs[0].table_id, false);
3587  CHECK(td);
3588  File_Namespace::FileMgrParams file_mgr_params;
3589  file_mgr_params.max_rollback_epochs = td->maxRollbackEpochs;
3590 
3591  for (const auto& table_epoch_info : table_epochs) {
3592  removeChunks(table_epoch_info.table_id);
3593  file_mgr_params.epoch = table_epoch_info.table_epoch;
3594  dataMgr_->getGlobalFileMgr()->setFileMgrParams(
3595  db_id, table_epoch_info.table_id, file_mgr_params);
3596  LOG(INFO) << "Set table epoch for db id: " << db_id
3597  << ", table id: " << table_epoch_info.table_id
3598  << ", back to epoch: " << table_epoch_info.table_epoch;
3599  }
3600 }
3601 
3602 namespace {
3603 std::string table_epochs_to_string(const std::vector<TableEpochInfo>& table_epochs) {
3604  std::string table_epochs_str{"["};
3605  bool first_entry{true};
3606  for (const auto& table_epoch : table_epochs) {
3607  if (first_entry) {
3608  first_entry = false;
3609  } else {
3610  table_epochs_str += ", ";
3611  }
3612  table_epochs_str += "(table_id: " + std::to_string(table_epoch.table_id) +
3613  ", epoch: " + std::to_string(table_epoch.table_epoch) + ")";
3614  }
3615  table_epochs_str += "]";
3616  return table_epochs_str;
3617 }
3618 } // namespace
3619 
3620 void Catalog::setTableEpochsLogExceptions(
3621  const int32_t db_id,
3622  const std::vector<TableEpochInfo>& table_epochs) const {
3623  try {
3624  setTableEpochs(db_id, table_epochs);
3625  } catch (std::exception& e) {
3626  LOG(ERROR) << "An error occurred when attempting to set table epochs. DB id: "
3627  << db_id << ", Table epochs: " << table_epochs_to_string(table_epochs)
3628  << ", Error: " << e.what();
3629  }
3630 }
3631 
3632 const ColumnDescriptor* Catalog::getDeletedColumn(const TableDescriptor* td) const {
3633  cat_read_lock read_lock(this);
3634  const auto it = deletedColumnPerTable_.find(td);
3635  return it != deletedColumnPerTable_.end() ? it->second : nullptr;
3636 }
3637 
3638 const bool Catalog::checkMetadataForDeletedRecs(const TableDescriptor* td,
3639  int delete_column_id) const {
3640  // check if there are rows deleted by examining the deletedColumn metadata
3641  CHECK(td);
3642  auto fragmenter = td->fragmenter;
3643  if (fragmenter) {
3644  return fragmenter->hasDeletedRows(delete_column_id);
3645  } else {
3646  return false;
3647  }
3648 }
3649 
3650 const ColumnDescriptor* Catalog::getDeletedColumnIfRowsDeleted(
3651  const TableDescriptor* td) const {
3652  std::vector<const TableDescriptor*> tds;
3653  const ColumnDescriptor* cd;
3654  {
3655  cat_read_lock read_lock(this);
3656 
3657  const auto it = deletedColumnPerTable_.find(td);
3658  // if not a table that supports delete return nullptr, nothing more to do
3659  if (it == deletedColumnPerTable_.end()) {
3660  return nullptr;
3661  }
3662  cd = it->second;
3663  tds = getPhysicalTablesDescriptors(td, false);
3664  }
3665  // individual tables are still protected by higher level locks
3666  for (auto tdd : tds) {
3667  if (checkMetadataForDeletedRecs(tdd, cd->columnId)) {
3668  return cd;
3669  }
3670  }
3671  // no deletes so far recorded in metadata
3672  return nullptr;
3673 }
3674 
3675 void Catalog::setDeletedColumn(const TableDescriptor* td, const ColumnDescriptor* cd) {
3676  cat_write_lock write_lock(this);
3677  setDeletedColumnUnlocked(td, cd);
3678 }
3679 
3680 void Catalog::setDeletedColumnUnlocked(const TableDescriptor* td,
3681  const ColumnDescriptor* cd) {
3682  const auto it_ok = deletedColumnPerTable_.emplace(td, cd);
3683  CHECK(it_ok.second);
3684 }
3685 
3686 namespace {
3687 
3689  const Catalog& cat,
3690  const Parser::SharedDictionaryDef& shared_dict_def) {
3691  const auto& table_name = shared_dict_def.get_foreign_table();
3692  const auto td = cat.getMetadataForTable(table_name, false);
3693  CHECK(td);
3694  const auto& foreign_col_name = shared_dict_def.get_foreign_column();
3695  return cat.getMetadataForColumn(td->tableId, foreign_col_name);
3696 }
3697 
3698 } // namespace
3699 
// Makes `referencing_column` share the dictionary of the column named by
// `shared_dict_def`: the referencing column takes over the foreign column's
// columnType, and the in-memory refcount of the dictionary identified by that type's
// comp_param is incremented. When `persist_reference` is true the refcount stored in
// mapd_dictionaries is incremented as well.
// CHECK-fails if the foreign column or its dictionary descriptor cannot be found, or
// if the dictionary's refcount is below 1.
// NOTE(review): the embedded listing numbering jumps from 3714 to 3716, so one
// source line inside the `persist_reference` branch is missing from this view.
3700 void Catalog::addReferenceToForeignDict(ColumnDescriptor& referencing_column,
3701  Parser::SharedDictionaryDef shared_dict_def,
3702  const bool persist_reference) {
3703  cat_write_lock write_lock(this);
3704  const auto foreign_ref_col = get_foreign_col(*this, shared_dict_def);
3705  CHECK(foreign_ref_col);
3706  referencing_column.columnType = foreign_ref_col->columnType;
// The dictionary id is carried in the column type's comp_param.
3707  const int dict_id = referencing_column.columnType.get_comp_param();
3708  const DictRef dict_ref(currentDB_.dbId, dict_id);
3709  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3710  CHECK(dictIt != dictDescriptorMapByRef_.end());
3711  const auto& dd = dictIt->second;
3712  CHECK_GE(dd->refcount, 1);
3713  ++dd->refcount;
3714  if (persist_reference) {
3716  sqliteConnector_.query_with_text_params(
3717  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3718  {std::to_string(dict_id)});
3719  }
3720 }
3721 
// Wires column `cd` to a shared dictionary if one of `shared_dict_defs` names it as
// the referencing column.
//   cd               - column being defined; its columnType is overwritten on a match
//   cdd              - columns already defined for the table being created
//   dds              - dictionary descriptors already created for that table
//   td               - descriptor of the table being created (taken by value)
//   shared_dict_defs - shared dictionary definitions to search
// Returns true if `cd` matched a definition and was set up to share a dictionary,
// false if the list is empty or no definition names `cd`.
// Throws std::runtime_error when a non-temporary table tries to share a dictionary
// with a temporary table.
// NOTE(review): the embedded listing numbering jumps from 3728 to 3730, so one
// source line after the write lock is missing from this view.
3722 bool Catalog::setColumnSharedDictionary(
3723  ColumnDescriptor& cd,
3724  std::list<ColumnDescriptor>& cdd,
3725  std::list<DictDescriptor>& dds,
3726  const TableDescriptor td,
3727  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3728  cat_write_lock write_lock(this);
3730 
3731  if (shared_dict_defs.empty()) {
3732  return false;
3733  }
3734  for (const auto& shared_dict_def : shared_dict_defs) {
3735  // check if the current column is a referencing column
3736  const auto& column = shared_dict_def.get_column();
3737  if (cd.columnName == column) {
// compare() returns 0 on equality: this branch handles a reference to a column of
// the very table that is being created.
3738  if (!shared_dict_def.get_foreign_table().compare(td.tableName)) {
3739  // Dictionaries are being shared in table to be created
3740  const auto& ref_column = shared_dict_def.get_foreign_column();
3741  auto colIt =
3742  std::find_if(cdd.begin(), cdd.end(), [ref_column](const ColumnDescriptor it) {
3743  return !ref_column.compare(it.columnName);
3744  });
3745  CHECK(colIt != cdd.end());
3746  cd.columnType = colIt->columnType;
3747 
3748  const int dict_id = colIt->columnType.get_comp_param();
3749  CHECK_GE(dict_id, 1);
3750  auto dictIt = std::find_if(
3751  dds.begin(), dds.end(), [this, dict_id](const DictDescriptor it) {
3752  return it.dictRef.dbId == this->currentDB_.dbId &&
3753  it.dictRef.dictId == dict_id;
3754  });
3755  if (dictIt != dds.end()) {
3756  // There exists dictionary definition of a dictionary column
3757  CHECK_GE(dictIt->refcount, 1);
3758  ++dictIt->refcount;
3759  if (!table_is_temporary(&td)) {
3760  // Persist reference count
3761  sqliteConnector_.query_with_text_params(
3762  "UPDATE mapd_dictionaries SET refcount = refcount + 1 WHERE dictid = ?",
3763  {std::to_string(dict_id)});
3764  }
3765  } else {
3766  // The dictionary is referencing a column which is referencing a column in
3767  // different table
3768  auto root_dict_def = compress_reference_path(shared_dict_def, shared_dict_defs);
3769  addReferenceToForeignDict(cd, root_dict_def, !table_is_temporary(&td));
3770  }
3771  } else {
// The referenced column lives in another, already existing table.
3772  const auto& foreign_table_name = shared_dict_def.get_foreign_table();
3773  const auto foreign_td = getMetadataForTable(foreign_table_name, false);
3774  if (table_is_temporary(foreign_td)) {
3775  if (!table_is_temporary(&td)) {
3776  throw std::runtime_error(
3777  "Only temporary tables can share dictionaries with other temporary "
3778  "tables.");
3779  }
// Both tables are temporary: the reference is not persisted.
3780  addReferenceToForeignDict(cd, shared_dict_def, false);
3781  } else {
3782  addReferenceToForeignDict(cd, shared_dict_def, !table_is_temporary(&td));
3783  }
3784  }
3785  return true;
3786  }
3787  }
3788  return false;
3789 }
3790 
// Creates a dictionary descriptor for column `cd`, appends it to `dds`, and stores
// the dictionary id in the column type's comp_param.
// For a logical table a row is inserted into mapd_dictionaries under the placeholder
// name "Initial_key", the generated dictid is read back, the row is renamed to
// "<table>_<column>_dict<id>", and the dictionary folder path is derived from the
// base path, database id and dictionary id. Otherwise the dictionary id stays 0, the
// name stays "Initial_key" and the folder path stays empty.
// NOTE(review): the embedded listing numbering skips 3802, 3821 and 3828, so three
// source lines are missing from this view: one at the start of the
// `is_logical_table` branch, one argument of the DictDescriptor constructor call,
// and the body of the `!is_array()` branch. Do not treat the code below as complete.
3791 void Catalog::setColumnDictionary(ColumnDescriptor& cd,
3792  std::list<DictDescriptor>& dds,
3793  const TableDescriptor& td,
3794  bool is_logical_table,
3795  bool use_temp_dictionary) {
3796  cat_write_lock write_lock(this);
3797 
3798  std::string dictName{"Initial_key"};
3799  int dictId{0};
3800  std::string folderPath;
3801  if (is_logical_table) {
3803 
// At this point comp_param still holds the value written as nbits for the new row.
3804  sqliteConnector_.query_with_text_params(
3805  "INSERT INTO mapd_dictionaries (name, nbits, is_shared, refcount) VALUES (?, ?, "
3806  "?, 1)",
3807  std::vector<std::string>{
3808  dictName, std::to_string(cd.columnType.get_comp_param()), "0"});
// Read back the id generated for the placeholder row, then give the row its final name.
3809  sqliteConnector_.query_with_text_param(
3810  "SELECT dictid FROM mapd_dictionaries WHERE name = ?", dictName);
3811  dictId = sqliteConnector_.getData<int>(0, 0);
3812  dictName = td.tableName + "_" + cd.columnName + "_dict" + std::to_string(dictId);
3813  sqliteConnector_.query_with_text_param(
3814  "UPDATE mapd_dictionaries SET name = ? WHERE name = 'Initial_key'", dictName);
3815  folderPath = g_base_path + "/" + shared::kDataDirectoryName + "/DB_" +
3816  std::to_string(currentDB_.dbId) + "_DICT_" + std::to_string(dictId);
3817  }
3818  DictDescriptor dd(currentDB_.dbId,
3819  dictId,
3820  dictName,
3822  false,
3823  1,
3824  folderPath,
3825  use_temp_dictionary);
3826  dds.push_back(dd);
3827  if (!cd.columnType.is_array()) {
3829  }
// From here on comp_param carries the dictionary id.
3830  cd.columnType.set_comp_param(dictId);
3831 }
3832 
3833 void Catalog::createShardedTable(
3834  TableDescriptor& td,
3835  const list<ColumnDescriptor>& cols,
3836  const std::vector<Parser::SharedDictionaryDef>& shared_dict_defs) {
3837  /* create logical table */
3838  TableDescriptor* tdl = &td;
3839  createTable(*tdl, cols, shared_dict_defs, true); // create logical table
3840  int32_t logical_tb_id = tdl->tableId;
3841  std::string logical_table_name = tdl->tableName;
3842 
3843  /* create physical tables and link them to the logical table */
3844  std::vector<int32_t> physicalTables;
3845  for (int32_t i = 1; i <= td.nShards; i++) {
3846  TableDescriptor* tdp = &td;
3847  tdp->tableName = generatePhysicalTableName(logical_table_name, i);
3848  tdp->shard = i - 1;
3849  createTable(*tdp, cols, shared_dict_defs, false); // create physical table
3850  int32_t physical_tb_id = tdp->tableId;
3851 
3852  /* add physical table to the vector of physical tables */
3853  physicalTables.push_back(physical_tb_id);
3854  }
3855 
3856  if (!physicalTables.empty()) {
3857  cat_write_lock write_lock(this);
3858  /* add logical to physical tables correspondence to the map */
3859  const auto it_ok =
3860  logicalToPhysicalTableMapById_.emplace(logical_tb_id, physicalTables);
3861  CHECK(it_ok.second);
3862  /* update sqlite mapd_logical_to_physical in sqlite database */
3863  if (!table_is_temporary(&td)) {
3864  updateLogicalToPhysicalTableMap(logical_tb_id);
3865  }
3866  }
3867 }
3868 
3869 void Catalog::truncateTable(const TableDescriptor* td) {
3870  // truncate all corresponding physical tables
3871  const auto physical_tables = getPhysicalTablesDescriptors(td);
3872  for (const auto table : physical_tables) {
3873  doTruncateTable(table);
3874  }
3875 }
3876 
3877 void Catalog::doTruncateTable(const TableDescriptor* td) {
3878  // must destroy fragmenter before deleteChunks is called.
3879  removeFragmenterForTable(td->tableId);
3880 
3881  const int tableId = td->tableId;
3882  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
3883  // assuming deleteChunksWithPrefix is atomic
3884  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
3885  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
3886 
3887  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
3888 
3889  cat_write_lock write_lock(this);
3890  std::unique_ptr<StringDictionaryClient> client;
3891  if (SysCatalog::instance().isAggregator()) {
3892  CHECK(!string_dict_hosts_.empty());
3893  DictRef dict_ref(currentDB_.dbId, -1);
3894  client.reset(new StringDictionaryClient(string_dict_hosts_.front(), dict_ref, true));
3895  }
3896  // clean up any dictionaries
3897  // delete all column descriptors for the table
3898  for (const auto& columnDescriptor : columnDescriptorMapById_) {
3899  auto cd = columnDescriptor.second;
3900  if (cd->tableId != td->tableId) {
3901  continue;
3902  }
3903  const int dict_id = cd->columnType.get_comp_param();
3904  // Dummy dictionaries created for a shard of a logical table have the id set to zero.
3905  if (cd->columnType.get_compression() == kENCODING_DICT && dict_id) {
3906  const DictRef dict_ref(currentDB_.dbId, dict_id);
3907  const auto dictIt = dictDescriptorMapByRef_.find(dict_ref);
3908  CHECK(dictIt != dictDescriptorMapByRef_.end());
3909  const auto& dd = dictIt->second;
3910  CHECK_GE(dd->refcount, 1);
3911  // if this is the only table using this dict reset the dict
3912  if (dd->refcount == 1) {
3913  // close the dictionary
3914  dd->stringDict.reset();
3915  File_Namespace::renameForDelete(dd->dictFolderPath);
3916  if (client) {
3917  client->drop(dd->dictRef);
3918  }
3919  if (!dd->dictIsTemp) {
3920  boost::filesystem::create_directory(dd->dictFolderPath);
3921  }
3922  }
3923 
3924  DictDescriptor* new_dd = new DictDescriptor(dd->dictRef,
3925  dd->dictName,
3926  dd->dictNBits,
3927  dd->dictIsShared,
3928  dd->refcount,
3929  dd->dictFolderPath,
3930  dd->dictIsTemp);
3931  dictDescriptorMapByRef_.erase(dictIt);
3932  // now create new Dict -- need to figure out what to do here for temp tables
3933  if (client) {
3934  client->create(new_dd->dictRef, new_dd->dictIsTemp);
3935  }
3936  dictDescriptorMapByRef_[new_dd->dictRef].reset(new_dd);
3937  getMetadataForDict(new_dd->dictRef.dictId);
3938  }
3939  }
3940 }
3941 
3942 // NOTE(sy): Only used by --multi-instance clusters.
3943 void Catalog::invalidateCachesForTable(const int table_id) {
3944  // When called, exactly one thread has a LockMgr data or insert lock for the table.
3945  cat_read_lock read_lock(this);
3946  ChunkKey const table_key{getDatabaseId(), table_id};
3947  auto td = getMutableMetadataForTableUnlocked(table_id);
3948  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::GPU_LEVEL);
3949  getDataMgr().deleteChunksWithPrefix(table_key, MemoryLevel::CPU_LEVEL);
3950  DeleteTriggeredCacheInvalidator::invalidateCachesByTable(boost::hash_value(table_key));
3951  // TODO(sy): doTruncateTable() says "destroy fragmenter before deleteChunks is called"
3952  // removeFragmenterForTable(table_key[CHUNK_KEY_TABLE_IDX]);
3953  if (td->fragmenter != nullptr) {
3954  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3955  CHECK(tableDescIt != tableDescriptorMapById_.end());
3956  tableDescIt->second->fragmenter = nullptr;
3957  CHECK(td->fragmenter == nullptr);
3958  }
3959  getDataMgr().getGlobalFileMgr()->closeFileMgr(table_key[CHUNK_KEY_DB_IDX],
3960  table_key[CHUNK_KEY_TABLE_IDX]);
3961  // getMetadataForTable(table_key[CHUNK_KEY_TABLE_IDX], /*populateFragmenter=*/true);
3962  instantiateFragmenter(td);
3963 }
3964 
3965 void Catalog::removeFragmenterForTable(const int table_id) const {
3966  cat_write_lock write_lock(this);
3967  auto td = getMetadataForTable(table_id, false);
3968  if (td->fragmenter != nullptr) {
3969  auto tableDescIt = tableDescriptorMapById_.find(table_id);
3970  CHECK(tableDescIt != tableDescriptorMapById_.end());
3971  tableDescIt->second->fragmenter = nullptr;
3972  CHECK(td->fragmenter == nullptr);
3973  }
3974 }
3975 
3976 // used by rollback_table_epoch to clean up in memory artifacts after a rollback
3977 void Catalog::removeChunks(const int table_id) const {
3978  removeFragmenterForTable(table_id);
3979 
3980  // remove the chunks from in memory structures
3981  ChunkKey chunkKey = {currentDB_.dbId, table_id};
3982 
3983  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::CPU_LEVEL);
3984  dataMgr_->deleteChunksWithPrefix(chunkKey, MemoryLevel::GPU_LEVEL);
3985 }
3986 
// Drops table `td`: collects the physical tables registered for it in
// logicalToPhysicalTableMapById_ (if any) followed by `td` itself, erases the
// physical data of each collected table, and then deletes the catalog metadata for
// all of them via deleteTableCatalogMetadata.
// NOTE(review): the embedded listing numbering jumps from 3988 to 3990, so the line
// carrying the arguments of revokeDBObjectPrivilegesFromAll is missing from this
// view; the call expression below is incomplete as shown.
3987 void Catalog::dropTable(const TableDescriptor* td) {
3988  SysCatalog::instance().revokeDBObjectPrivilegesFromAll(
3990  std::vector<const TableDescriptor*> tables_to_drop;
3991  {
// The read lock is held only while the list of tables to drop is collected.
3992  cat_read_lock read_lock(this);
3993  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
3994  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
3995  // remove all corresponding physical tables if this is a logical table
3996  const auto physicalTables = physicalTableIt->second;
3997  CHECK(!physicalTables.empty());
3998  for (size_t i = 0; i < physicalTables.size(); i++) {
3999  int32_t physical_tb_id = physicalTables[i];
4000  const TableDescriptor* phys_td =
4001  getMutableMetadataForTableUnlocked(physical_tb_id);
4002  CHECK(phys_td);
4003  tables_to_drop.emplace_back(phys_td);
4004  }
4005  }
// The table itself is always appended last.
4006  tables_to_drop.emplace_back(td);
4007  }
4008 
4009  for (auto table : tables_to_drop) {
4010  eraseTablePhysicalData(table);
4011  }
4012  deleteTableCatalogMetadata(td, tables_to_drop);
4013 }
4014 
// Deletes the catalog metadata of a dropped table inside one sqlite transaction:
// removes the logical table's rows from mapd_logical_to_physical and its entry in
// logicalToPhysicalTableMapById_, then erases the metadata of every table in
// `physical_tables`. On a std::exception the transaction is rolled back and the
// exception is rethrown.
// NOTE(review): the in-memory erase from logicalToPhysicalTableMapById_ happens
// inside the try block and is not undone by the sqlite rollback.
// NOTE(review): the embedded listing numbering jumps from 4018 to 4020, so one
// source line after the write lock is missing from this view.
4015 void Catalog::deleteTableCatalogMetadata(
4016  const TableDescriptor* logical_table,
4017  const std::vector<const TableDescriptor*>& physical_tables) {
4018  cat_write_lock write_lock(this);
4020  sqliteConnector_.query("BEGIN TRANSACTION");
4021  try {
4022  // remove corresponding record from the logicalToPhysicalTableMap in sqlite database
4023  sqliteConnector_.query_with_text_param(
4024  "DELETE FROM mapd_logical_to_physical WHERE logical_table_id = ?",
4025  std::to_string(logical_table->tableId));
4026  logicalToPhysicalTableMapById_.erase(logical_table->tableId);
4027  for (auto table : physical_tables) {
4028  eraseTableMetadata(table);
4029  }
4030  } catch (std::exception& e) {
4031  sqliteConnector_.query("ROLLBACK TRANSACTION");
4032  throw;
4033  }
4034  sqliteConnector_.query("END TRANSACTION");
4035 }
4036 
// Erases the metadata of one table: runs the sqlite delete statements for it,
// notifies Calcite through updateMetadata, and removes the table from the in-memory
// maps via removeTableFromMap.
// NOTE(review): the embedded listing numbering jumps from 4038 to 4040, so the line
// that opens the block around dropTableFromJsonUnlocked (presumably a condition) is
// missing from this view; the braces below are unbalanced as shown.
4037 void Catalog::eraseTableMetadata(const TableDescriptor* td) {
4038  executeDropTableSqliteQueries(td);
4040  dropTableFromJsonUnlocked(td->tableName);
4041  }
4042  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4043  {
4044  INJECT_TIMER(removeTableFromMap_);
4045  removeTableFromMap(td->tableName, td->tableId);
4046  }
4047 }
4048 
// Issues the sqlite statements that remove a table from the catalog database:
//   1. deletes the table's row from mapd_tables;
//   2. decrements the refcount of every dictionary referenced by the table's
//      dictionary-encoded columns and deletes those whose refcount reached 0;
//   3. deletes the table's rows from mapd_columns;
//   4. for views, deletes the row from mapd_views.
// It does not begin or commit a transaction itself.
// NOTE(review): the embedded listing numbering jumps from 4076 to 4078, so the line
// that opens the block around the omnisci_foreign_tables delete (presumably a
// condition) is missing from this view; the braces below are unbalanced as shown.
4049 void Catalog::executeDropTableSqliteQueries(const TableDescriptor* td) {
4050  const int tableId = td->tableId;
4051  sqliteConnector_.query_with_text_param("DELETE FROM mapd_tables WHERE tableid = ?",
4052  std::to_string(tableId));
// Collect the dictionary ids first: the connector's result set is replaced by the
// UPDATE statements issued in the next loop.
4053  sqliteConnector_.query_with_text_params(
4054  "select comp_param from mapd_columns where compression = ? and tableid = ?",
4055  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
4056  int numRows = sqliteConnector_.getNumRows();
4057  std::vector<int> dict_id_list;
4058  for (int r = 0; r < numRows; ++r) {
4059  dict_id_list.push_back(sqliteConnector_.getData<int>(r, 0));
4060  }
4061  for (auto dict_id : dict_id_list) {
4062  sqliteConnector_.query_with_text_params(
4063  "UPDATE mapd_dictionaries SET refcount = refcount - 1 WHERE dictid = ?",
4064  std::vector<std::string>{std::to_string(dict_id)});
4065  }
// Must run before the mapd_columns rows are deleted, because the subquery reads them.
4066  sqliteConnector_.query_with_text_params(
4067  "DELETE FROM mapd_dictionaries WHERE dictid in (select comp_param from "
4068  "mapd_columns where compression = ? "
4069  "and tableid = ?) and refcount = 0",
4070  std::vector<std::string>{std::to_string(kENCODING_DICT), std::to_string(tableId)});
4071  sqliteConnector_.query_with_text_param("DELETE FROM mapd_columns WHERE tableid = ?",
4072  std::to_string(tableId));
4073  if (td->isView) {
4074  sqliteConnector_.query_with_text_param("DELETE FROM mapd_views WHERE tableid = ?",
4075  std::to_string(tableId));
4076  }
4078  sqliteConnector_.query_with_text_param(
4079  "DELETE FROM omnisci_foreign_tables WHERE table_id = ?", std::to_string(tableId));
4080  }
4081 }
4082 
// Renames a single table to `newTableName`: updates its row in mapd_tables inside a
// sqlite transaction (rolled back and rethrown on a std::exception), then re-keys
// the descriptor in tableDescriptorMap_ under the upper-cased new name and notifies
// Calcite before and after the in-memory change.
// NOTE(review): both updateMetadata calls pass td->tableName; if `td` points at the
// same descriptor as `changeTd`, the second call sees the new name - confirm intent.
// NOTE(review): the embedded listing numbering jumps from 4084 to 4086, so one
// source line after the write lock is missing from this view.
4083 void Catalog::renamePhysicalTable(const TableDescriptor* td, const string& newTableName) {
4084  cat_write_lock write_lock(this);
4086 
4087  sqliteConnector_.query("BEGIN TRANSACTION");
4088  try {
4089  sqliteConnector_.query_with_text_params(
4090  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4091  std::vector<std::string>{newTableName, std::to_string(td->tableId)});
4092  } catch (std::exception& e) {
4093  sqliteConnector_.query("ROLLBACK TRANSACTION");
4094  throw;
4095  }
4096  sqliteConnector_.query("END TRANSACTION");
// Descriptors are keyed by upper-cased table name.
4097  TableDescriptorMap::iterator tableDescIt =
4098  tableDescriptorMap_.find(to_upper(td->tableName));
4099  CHECK(tableDescIt != tableDescriptorMap_.end());
4100  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4101  // Get table descriptor to change it
4102  TableDescriptor* changeTd = tableDescIt->second;
4103  changeTd->tableName = newTableName;
4104  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4105  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4106  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4107 }
4108 
4109 void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) {
4110  {
4111  cat_write_lock write_lock(this);
4112  cat_sqlite_lock sqlite_lock(this);
4113  // rename all corresponding physical tables if this is a logical table
4114  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(td->tableId);
4115  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4116  const auto physicalTables = physicalTableIt->second;
4117  CHECK(!physicalTables.empty());
4118  for (size_t i = 0; i < physicalTables.size(); i++) {
4119  int32_t physical_tb_id = physicalTables[i];
4120  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4121  CHECK(phys_td);
4122  std::string newPhysTableName =
4123  generatePhysicalTableName(newTableName, static_cast<int32_t>(i + 1));
4124  renamePhysicalTable(phys_td, newPhysTableName);
4125  }
4126  }
4127  renamePhysicalTable(td, newTableName);
4128  }
4129  {
4130  DBObject object(newTableName, TableDBObjectType);
4131  // update table name in direct and effective priv map
4132  DBObjectKey key;
4133  key.dbId = currentDB_.dbId;
4134  key.objectId = td->tableId;
4135  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4136  object.setObjectKey(key);
4137  auto objdescs = SysCatalog::instance().getMetadataForObject(
4138  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), td->tableId);
4139  for (auto obj : objdescs) {
4140  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4141  if (grnt) {
4142  grnt->renameDbObject(object);
4143  }
4144  }
4145  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4146  }
4147 }
4148 
// Batch rename: for each index i, the table with id tableIds[i] is renamed from
// names[i].first to names[i].second. First all mapd_tables rows are updated, then
// each descriptor in tableDescriptorMap_ is re-keyed under its upper-cased new name
// and Calcite is notified.
// This overload does not begin or end a sqlite transaction itself; it only rolls
// back (and rethrows) when an update throws. In this file, renameTable(names) issues
// BEGIN TRANSACTION before and END TRANSACTION after calling it.
// NOTE(review): `tableIds` is indexed with the indices of `names`; the two vectors
// are presumably expected to have equal length - verify against the caller.
// NOTE(review): the embedded listing numbering jumps from 4151 to 4153, so one
// source line after the write lock is missing from this view.
4149 void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
4150  std::vector<int>& tableIds) {
4151  cat_write_lock write_lock(this);
4153 
4154  // execute the SQL query
4155  try {
4156  for (size_t i = 0; i < names.size(); i++) {
4157  int tableId = tableIds[i];
4158  std::string& newTableName = names[i].second;
4159 
4160  sqliteConnector_.query_with_text_params(
4161  "UPDATE mapd_tables SET name = ? WHERE tableid = ?",
4162  std::vector<std::string>{newTableName, std::to_string(tableId)});
4163  }
4164  } catch (std::exception& e) {
4165  sqliteConnector_.query("ROLLBACK TRANSACTION");
4166  throw;
4167  }
4168 
4169  // reset the table descriptors, give Calcite a kick
4170  for (size_t i = 0; i < names.size(); i++) {
4171  std::string& curTableName = names[i].first;
4172  std::string& newTableName = names[i].second;
4173 
// Descriptors are looked up by the upper-cased current name.
4174  TableDescriptorMap::iterator tableDescIt =
4175  tableDescriptorMap_.find(to_upper(curTableName));
4176  CHECK(tableDescIt != tableDescriptorMap_.end());
4177  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4178 
4179  // Get table descriptor to change it
4180  TableDescriptor* changeTd = tableDescIt->second;
4181  changeTd->tableName = newTableName;
4182  tableDescriptorMap_.erase(tableDescIt); // erase entry under old name
4183  tableDescriptorMap_[to_upper(newTableName)] = changeTd;
4184  calciteMgr_->updateMetadata(currentDB_.dbName, curTableName);
4185  }
4186 }
4187 
4188 // Collect an 'overlay' mapping of the tableNames->tableId
4189 // to account for possible chained renames
4190 // (for swap: a->b, b->c, c->d, d->a)
4191 
4193  std::map<std::string, int>& cachedTableMap,
4194  std::string& curTableName) {
4195  auto iter = cachedTableMap.find(curTableName);
4196  if ((iter != cachedTableMap.end())) {
4197  // get the cached tableId
4198  // and use that to lookup the TableDescriptor
4199  int tableId = (*iter).second;
4200  if (tableId == -1) {
4201  return NULL;
4202  } else {
4203  return cat->getMetadataForTable(tableId);
4204  }
4205  }
4206 
4207  // else ... lookup in standard location
4208  return cat->getMetadataForTable(curTableName);
4209 }
4210 
// Records a rename in the name -> table id overlay: the current name is marked as
// deleted (-1) and the new name is mapped to `tableId`. The new-name entry is written
// last, so it wins when both names are equal.
void replaceTableName(std::map<std::string, int>& cachedTableMap,
                      std::string& curTableName,
                      std::string& newTableName,
                      int tableId) {
  constexpr int kDeletedMarker = -1;
  cachedTableMap.insert_or_assign(curTableName, kDeletedMarker);
  cachedTableMap.insert_or_assign(newTableName, tableId);
}
4221 
// Renames a batch of tables. 'names' holds (current name, new name) pairs that are
// processed in order, so a later pair may refer to a name produced by an earlier one.
// Steps: resolve every table id up front (CHECK-fails if a name cannot be resolved),
// acquire the table locks, rename the tables (plus the physical tables behind any
// logical table) inside one SQLite transaction, then update the SysCatalog privilege
// objects so that they carry the new names.
4222 void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
4223  // tableId of all tables being renamed
4224  // ... in matching order to 'names'
4225  std::vector<int> tableIds;
4226 
4227  // (sorted & unique) list of tables ids for locking
4228  // (with names index of src in case of error)
4229  // <tableId, strIndex>
4230  // std::map is by definition/implementation sorted
4231  // std::map current usage below tests to avoid over-write
4232  std::map<int, size_t> uniqueOrderedTableIds;
4233 
4234  // mapping of modified tables names -> tableId
4235  std::map<std::string, int> cachedTableMap;
4236 
4237  // -------- Setup --------
4238 
4239  // gather tableIds pre-execute; build maps
4240  for (size_t i = 0; i < names.size(); i++) {
4241  std::string& curTableName = names[i].first;
4242  std::string& newTableName = names[i].second;
4243 
4244  // make sure the table being renamed exists,
4245  // or will exist when executed in 'name' order
4246  auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
4247  CHECK(td);
4248 
4249  tableIds.push_back(td->tableId);
4250  if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
4251  // don't overwrite as it should map to the first names index 'i'
4252  uniqueOrderedTableIds[td->tableId] = i;
4253  }
      // record the rename in the local name -> id cache so later pairs can see it
4254  replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
4255  }
4256 
4257  CHECK_EQ(tableIds.size(), names.size());
4258 
4259  // The outer Stmt created a write lock before calling the catalog rename table
4260  // -> TODO: might want to sort out which really should set the lock :
4261  // the comment in the outer scope indicates it should be in here
4262  // but it's not clear if the access done there *requires* it out there
4263  //
4264  // Lock tables pre-execute (may/will be in different order than rename occurs)
4265  // const auto execute_write_lock = heavyai::unique_lock<heavyai::shared_mutex>(
4266  // *legacylockmgr::LockMgr<heavyai::shared_mutex, bool>::getMutex(
4267  // legacylockmgr::ExecutorOuterLock, true));
4268 
4269  // acquire the locks for all tables being renamed
      // (iterating the std::map visits the tables in ascending tableId order; each
      // table is locked once, under the first name it appears with in 'names')
4271  for (auto& idPair : uniqueOrderedTableIds) {
4272  std::string& tableName = names[idPair.second].first;
4273  tableLocks.emplace_back(
4276  *this, tableName, false)));
4277  }
4278 
4279  // -------- Rename --------
4280 
4281  {
4282  cat_write_lock write_lock(this);
4284 
      // NOTE(review): no try/catch with "ROLLBACK TRANSACTION" is visible around this
      // transaction, unlike renameColumn -- confirm how a failure in
      // renamePhysicalTable is expected to be handled.
4285  sqliteConnector_.query("BEGIN TRANSACTION");
4286 
4287  // collect all (tables + physical tables) into a single list
4288  std::vector<std::pair<std::string, std::string>> allNames;
4289  std::vector<int> allTableIds;
4290 
4291  for (size_t i = 0; i < names.size(); i++) {
4292  int tableId = tableIds[i];
4293  std::string& curTableName = names[i].first;
4294  std::string& newTableName = names[i].second;
4295 
4296  // rename all corresponding physical tables if this is a logical table
4297  const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
4298  if (physicalTableIt != logicalToPhysicalTableMapById_.end()) {
4299  const auto physicalTables = physicalTableIt->second;
4300  CHECK(!physicalTables.empty());
4301  for (size_t k = 0; k < physicalTables.size(); k++) {
4302  int32_t physical_tb_id = physicalTables[k];
4303  const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
4304  CHECK(phys_td);
      // physical table names are derived from the new logical name and a 1-based index
4305  std::string newPhysTableName =
4306  generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
4307  allNames.emplace_back(phys_td->tableName, newPhysTableName);
4308  allTableIds.push_back(phys_td->tableId);
4309  }
4310  }
      // the logical table itself is queued after its physical tables
4311  allNames.emplace_back(curTableName, newTableName);
4312  allTableIds.push_back(tableId);
4313  }
4314  // rename all tables in one shot
4315  renamePhysicalTable(allNames, allTableIds);
4316 
4317  sqliteConnector_.query("END TRANSACTION");
4318  // cat write/sqlite locks are released when they go out scope
4319  }
4320  {
4321  // now update the SysCatalog
4322  for (size_t i = 0; i < names.size(); i++) {
4323  int tableId = tableIds[i];
4324  std::string& newTableName = names[i].second;
4325  {
4326  // update table name in direct and effective priv map
4327  DBObjectKey key;
4328  key.dbId = currentDB_.dbId;
4329  key.objectId = tableId;
4330  key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);
4331 
4332  DBObject object(newTableName, TableDBObjectType);
4333  object.setObjectKey(key);
4334 
4335  auto objdescs = SysCatalog::instance().getMetadataForObject(
4336  currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
      // grantees that cannot be found are skipped silently
4337  for (auto obj : objdescs) {
4338  Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
4339  if (grnt) {
4340  grnt->renameDbObject(object);
4341  }
4342  }
4343  SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
4344  }
4345  }
4346  }
4347 
4348  // -------- Cleanup --------
4349 
4350  // table locks are released when 'tableLocks' goes out of scope
4351 }
4352 
// Renames column 'cd' of table 'td' to 'newColumnName'. The
// cd->columnType.get_physical_cols() columns that follow 'cd' by column id are renamed
// with it: in each name the leading cd->columnName.size() characters are replaced by
// 'newColumnName'. The SQLite rows are updated in one transaction (rolled back and
// rethrown on any std::exception); afterwards the in-memory column map is re-keyed and
// Calcite is told to refresh its metadata for the table.
4353 void Catalog::renameColumn(const TableDescriptor* td,
4354  const ColumnDescriptor* cd,
4355  const string& newColumnName) {
4356  cat_write_lock write_lock(this);
4358  sqliteConnector_.query("BEGIN TRANSACTION");
4359  try {
      // i == 0 is the column itself, i >= 1 are its physical columns
4360  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4361  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4362  CHECK(cdx);
4363  std::string new_column_name = cdx->columnName;
4364  new_column_name.replace(0, cd->columnName.size(), newColumnName);
4365  sqliteConnector_.query_with_text_params(
4366  "UPDATE mapd_columns SET name = ? WHERE tableid = ? AND columnid = ?",
4367  std::vector<std::string>{new_column_name,
4368  std::to_string(td->tableId),
4369  std::to_string(cdx->columnId)});
4370  }
4371  } catch (std::exception& e) {
4372  sqliteConnector_.query("ROLLBACK TRANSACTION");
4373  throw;
4374  }
4375  sqliteConnector_.query("END TRANSACTION");
4376  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
      // Second pass: apply the same prefix replacement to the cached descriptors and
      // re-key them under the upper-cased new name.
      // NOTE(review): if 'cd' points at the descriptor stored in columnDescriptorMap_,
      // the i == 0 iteration changes cd->columnName, so cd->columnName.size() would
      // differ for i >= 1 when old and new names have different lengths -- confirm.
4377  for (int i = 0; i <= cd->columnType.get_physical_cols(); ++i) {
4378  auto cdx = getMetadataForColumn(td->tableId, cd->columnId + i);
4379  CHECK(cdx);
4380  ColumnDescriptorMap::iterator columnDescIt = columnDescriptorMap_.find(
4381  std::make_tuple(td->tableId, to_upper(cdx->columnName)));
4382  CHECK(columnDescIt != columnDescriptorMap_.end());
4383  ColumnDescriptor* changeCd = columnDescIt->second;
4384  changeCd->columnName.replace(0, cd->columnName.size(), newColumnName);
4385  columnDescriptorMap_.erase(columnDescIt); // erase entry under old name
4386  columnDescriptorMap_[std::make_tuple(td->tableId, to_upper(changeCd->columnName))] =
4387  changeCd;
4388  }
      // Calcite is notified again now that the in-memory map reflects the new names
4389  calciteMgr_->updateMetadata(currentDB_.dbName, td->tableName);
4390 }
4391 
// Creates the dashboard described by 'vd', or updates the existing row that has the
// same (name, userid). After the write transaction the generated id and the formatted
// update_time are read back into 'vd', the descriptor is added to the in-memory map and,
// unless this is the information schema database, createOrUpdateDashboardSystemRole is
// called. Returns vd.dashboardId. Any std::exception raised inside the write
// transaction triggers a ROLLBACK and is rethrown.
4392 int32_t Catalog::createDashboard(DashboardDescriptor& vd) {
4393  cat_write_lock write_lock(this);
4395  sqliteConnector_.query("BEGIN TRANSACTION");
4396  try {
4397  // TODO(andrew): this should be an upsert
4398  sqliteConnector_.query_with_text_params(
4399  "SELECT id FROM mapd_dashboards WHERE name = ? and userid = ?",
4400  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4401  if (sqliteConnector_.getNumRows() > 0) {
      // a dashboard with this name already exists for the user: refresh it in place
4402  sqliteConnector_.query_with_text_params(
4403  "UPDATE mapd_dashboards SET state = ?, image_hash = ?, metadata = ?, "
4404  "update_time = "
4405  "datetime('now') where name = ? "
4406  "and userid = ?",
4407  std::vector<std::string>{vd.dashboardState,
4408  vd.imageHash,
4409  vd.dashboardMetadata,
4410  vd.dashboardName,
4411  std::to_string(vd.userId)});
4412  } else {
4413  sqliteConnector_.query_with_text_params(
4414  "INSERT INTO mapd_dashboards (name, state, image_hash, metadata, "
4415  "update_time, "
4416  "userid) "
4417  "VALUES "
4418  "(?,?,?,?, "
4419  "datetime('now'), ?)",
4420  std::vector<std::string>{vd.dashboardName,
4421  vd.dashboardState,
4422  vd.imageHash,
4423  vd.dashboardMetadata,
4424  std::to_string(vd.userId)});
4425  }
4426  } catch (std::exception& e) {
4427  sqliteConnector_.query("ROLLBACK TRANSACTION");
4428  throw;
4429  }
4430  sqliteConnector_.query("END TRANSACTION");
4431 
4432  // now get the auto generated dashboardId
      // (this read runs after the transaction has ended; the catch below only rethrows)
4433  try {
4434  sqliteConnector_.query_with_text_params(
4435  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_dashboards "
4436  "WHERE name = ? and userid = ?",
4437  std::vector<std::string>{vd.dashboardName, std::to_string(vd.userId)});
4438  vd.dashboardId = sqliteConnector_.getData<int>(0, 0);
4439  vd.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4440  } catch (std::exception& e) {
4441  throw;
4442  }
4444  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4445  addFrontendViewToMap(vd);
      // both locks are dropped explicitly before the system role is created/updated
4446  sqlite_lock.unlock();
4447  write_lock.unlock();
4448  if (!isInfoSchemaDb()) {
4449  // NOTE(wamsi): Transactionally unsafe
4450  createOrUpdateDashboardSystemRole(
4452  }
4453  return vd.dashboardId;
4454 }
4455 
// Overwrites the stored dashboard whose id equals vd.dashboardId with the contents of
// 'vd'. Throws runtime_error when the id is missing from the SQLite table or from the
// in-memory descriptor map. On success the old map entry is removed, vd.updateTime is
// reloaded from SQLite, 'vd' is re-added to the map and, unless this is the information
// schema database, createOrUpdateDashboardSystemRole is called.
4456 void Catalog::replaceDashboard(DashboardDescriptor& vd) {
4457  cat_write_lock write_lock(this);
4459 
4460  CHECK(sqliteConnector_.getSqlitePtr());
4461  sqliteConnector_.query("BEGIN TRANSACTION");
4462  try {
4463  sqliteConnector_.query_with_text_params(
4464  "SELECT id FROM mapd_dashboards WHERE id = ?",
4465  std::vector<std::string>{std::to_string(vd.dashboardId)});
4466  if (sqliteConnector_.getNumRows() > 0) {
4467  sqliteConnector_.query_with_text_params(
4468  "UPDATE mapd_dashboards SET name = ?, state = ?, image_hash = ?, metadata = "
4469  "?, userid = ?, update_time = datetime('now') where id = ? ",
4470  std::vector<std::string>{vd.dashboardName,
4471  vd.dashboardState,
4472  vd.imageHash,
4473  vd.dashboardMetadata,
4474  std::to_string(vd.userId),
4476  } else {
4477  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4478  << " does not exist in db";
4479  throw runtime_error("Error replacing dashboard id " +
4480  std::to_string(vd.dashboardId) + " does not exist in db");
4481  }
4482  } catch (std::exception& e) {
      // also reached for the runtime_error thrown just above
4483  sqliteConnector_.query("ROLLBACK TRANSACTION");
4484  throw;
4485  }
4486  sqliteConnector_.query("END TRANSACTION");
4487 
      // Locate the cached descriptor by dashboard id (the map is keyed by
      // "<userId>:<dashboardName>") and drop it. Erasing while iterating is only safe
      // here because the loop breaks immediately afterwards.
4488  bool found{false};
4489  for (auto descp : dashboardDescriptorMap_) {
4490  auto dash = descp.second.get();
4491  if (dash->dashboardId == vd.dashboardId) {
4492  found = true;
4493  auto viewDescIt = dashboardDescriptorMap_.find(std::to_string(dash->userId) + ":" +
4494  dash->dashboardName);
4495  if (viewDescIt ==
4496  dashboardDescriptorMap_.end()) { // check to make sure view exists
4497  LOG(ERROR) << "No metadata for dashboard for user " << dash->userId
4498  << " dashboard " << dash->dashboardName << " does not exist in map";
4499  throw runtime_error("No metadata for dashboard for user " +
4500  std::to_string(dash->userId) + " dashboard " +
4501  dash->dashboardName + " does not exist in map");
4502  }
4503  dashboardDescriptorMap_.erase(viewDescIt);
4504  break;
4505  }
4506  }
      // NOTE(review): by this point the SQLite UPDATE has already been committed, so
      // the throw below leaves the database updated but the map unchanged.
4507  if (!found) {
4508  LOG(ERROR) << "Error replacing dashboard id " << vd.dashboardId
4509  << " does not exist in map";
4510  throw runtime_error("Error replacing dashboard id " + std::to_string(vd.dashboardId) +
4511  " does not exist in map");
4512  }
4513 
4514  // now reload the object
4515  sqliteConnector_.query_with_text_params(
4516  "SELECT id, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM "
4517  "mapd_dashboards "
4518  "WHERE id = ?",
4519  std::vector<std::string>{std::to_string(vd.dashboardId)});
4520  vd.updateTime = sqliteConnector_.getData<string>(0, 1);
4522  std::to_string(currentDB_.dbId), std::to_string(vd.dashboardId));
4523  addFrontendViewToMapNoLock(vd);
      // both locks are dropped explicitly before the system role is created/updated
4524  sqlite_lock.unlock();
4525  write_lock.unlock();
4526  if (!isInfoSchemaDb()) {
4527  // NOTE(wamsi): Transactionally unsafe
4528  createOrUpdateDashboardSystemRole(
4530  }
4531 }
4532 
4533 std::string Catalog::calculateSHA1(const std::string& data) {
4534  boost::uuids::detail::sha1 sha1;
4535  unsigned int digest[5];
4536  sha1.process_bytes(data.c_str(), data.length());
4537  sha1.get_digest(digest);
4538  std::stringstream ss;
4539  for (size_t i = 0; i < 5; i++) {
4540  ss << std::hex << digest[i];
4541  }
4542  return ss.str();
4543 }
4544 
// Creates (or refreshes) the link record for 'ld' and returns the link string.
// The link is the first 8 characters of calculateSHA1(viewState + viewMetadata +
// userId). If a row with that link already exists for the user only its update_time is
// refreshed; otherwise a new row is inserted. ld.link, ld.linkId and ld.updateTime are
// filled in and the descriptor is added to the in-memory link map. Any std::exception
// inside the transaction triggers a ROLLBACK and is rethrown.
// 'min_length' is not referenced anywhere in this body.
4545 std::string Catalog::createLink(LinkDescriptor& ld, size_t min_length) {
4546  cat_write_lock write_lock(this);
4548  sqliteConnector_.query("BEGIN TRANSACTION");
4549  try {
4550  ld.link = calculateSHA1(ld.viewState + ld.viewMetadata + std::to_string(ld.userId))
4551  .substr(0, 8);
4552  sqliteConnector_.query_with_text_params(
4553  "SELECT linkid FROM mapd_links WHERE link = ? and userid = ?",
4554  std::vector<std::string>{ld.link, std::to_string(ld.userId)});
4555  if (sqliteConnector_.getNumRows() > 0) {
4556  sqliteConnector_.query_with_text_params(
4557  "UPDATE mapd_links SET update_time = datetime('now') WHERE userid = ? AND "
4558  "link = ?",
4559  std::vector<std::string>{std::to_string(ld.userId), ld.link});
4560  } else {
4561  sqliteConnector_.query_with_text_params(
4562  "INSERT INTO mapd_links (userid, link, view_state, view_metadata, "
4563  "update_time) VALUES (?,?,?,?, datetime('now'))",
4564  std::vector<std::string>{
4565  std::to_string(ld.userId), ld.link, ld.viewState, ld.viewMetadata});
4566  }
4567  // now get the auto generated dashid
      // NOTE(review): this lookup filters on link only, whereas the queries above also
      // filter on userid -- confirm that row 0 is always the intended row.
4568  sqliteConnector_.query_with_text_param(
4569  "SELECT linkid, strftime('%Y-%m-%dT%H:%M:%SZ', update_time) FROM mapd_links "
4570  "WHERE link = ?",
4571  ld.link);
4572  ld.linkId = sqliteConnector_.getData<int>(0, 0);
4573  ld.updateTime = sqliteConnector_.getData<std::string>(0, 1);
4574  } catch (std::exception& e) {
4575  sqliteConnector_.query("ROLLBACK TRANSACTION");
4576  throw;
4577  }
4578  sqliteConnector_.query("END TRANSACTION");
4579  addLinkToMap(ld);
4580  return ld.link;
4581 }
4582 
4583 const ColumnDescriptor* Catalog::getShardColumnMetadataForTable(
4584  const TableDescriptor* td) const {
4585  cat_read_lock read_lock(this);
4586 
4587  const auto column_descriptors =
4588  getAllColumnMetadataForTable(td->tableId, false, true, true);
4589 
4590  const ColumnDescriptor* shard_cd{nullptr};
4591  int i = 1;
4592  for (auto cd_itr = column_descriptors.begin(); cd_itr != column_descriptors.end();
4593  ++cd_itr, ++i) {
4594  if (i == td->shardedColumnId) {
4595  shard_cd = *cd_itr;
4596  }
4597  }
4598  return shard_cd;
4599 }
4600 
4601 std::vector<const TableDescriptor*> Catalog::getPhysicalTablesDescriptors(
4602  const TableDescriptor* logical_table_desc,
4603  bool populate_fragmenter) const {
4604  cat_read_lock read_lock(this);
4605  const auto physicalTableIt =
4606  logicalToPhysicalTableMapById_.find(logical_table_desc->tableId);
4607  if (physicalTableIt == logicalToPhysicalTableMapById_.end()) {
4608  return {logical_table_desc};
4609  }
4610  const auto physicalTablesIds = physicalTableIt->second;
4611  CHECK(!physicalTablesIds.empty());
4612  read_lock.unlock();
4613  std::vector<const TableDescriptor*> physicalTables;
4614  for (size_t i = 0; i < physicalTablesIds.size(); i++) {
4615  physicalTables.push_back(
4616  getMetadataForTable(physicalTablesIds[i], populate_fragmenter));
4617  }
4618  return physicalTables;
4619 }
4620 
4621 std::vector<std::pair<int32_t, int32_t>> Catalog::getAllPersistedTableAndShardIds()
4622  const {
4623  cat_read_lock read_lock(this);
4624  std::vector<std::pair<int32_t, int32_t>> table_and_shard_ids;
4625  table_and_shard_ids.reserve(tableDescriptorMapById_.size());
4626  for (const auto [table_id, td] : tableDescriptorMapById_) {
4627  // Only include ids for physical persisted tables
4628  if (!td->isView && !td->isTemporaryTable() && !td->isForeignTable() &&
4629  logicalToPhysicalTableMapById_.find(table_id) ==
4630  logicalToPhysicalTableMapById_.end()) {
4631  table_and_shard_ids.emplace_back(table_id, td->shard);
4632  }
4633  }
4634  return table_and_shard_ids;
4635 }
4636 
4637 const std::map<int, const ColumnDescriptor*> Catalog::getDictionaryToColumnMapping() {
4638  cat_read_lock read_lock(this);
4639 
4640  std::map<int, const ColumnDescriptor*> mapping;
4641 
4642  const auto tables = getAllTableMetadata();
4643  for (const auto td : tables) {
4644  if (td->shard >= 0) {
4645  // skip shards, they're not standalone tables
4646  continue;
4647  }
4648 
4649  for (auto& cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4650  const auto& ti = cd->columnType;
4651  if (ti.is_string()) {
4652  if (ti.get_compression() == kENCODING_DICT) {
4653  // if foreign reference, get referenced tab.col
4654  const auto dict_id = ti.get_comp_param();
4655 
4656  // ignore temp (negative) dictionaries
4657  if (dict_id > 0 && mapping.end() == mapping.find(dict_id)) {
4658  mapping[dict_id] = cd;
4659  }
4660  }
4661  }
4662  }
4663  }
4664 
4665  return mapping;
4666 }
4667 
// Decides whether table 'td' should be listed for 'user_metadata'.
// Returns false for shard tables (shard >= 0), for views when GET_PHYSICAL_TABLES is
// requested, for non-views when GET_VIEWS is requested, and for tables on which
// SysCatalog reports the user has no privileges at all; returns true otherwise.
4668 bool Catalog::filterTableByTypeAndUser(const TableDescriptor* td,
4669  const UserMetadata& user_metadata,
4670  const GetTablesType get_tables_type) const {
4671  if (td->shard >= 0) {
4672  // skip shards, they're not standalone tables
4673  return false;
4674  }
4675  switch (get_tables_type) {
4676  case GET_PHYSICAL_TABLES: {
4677  if (td->isView) {
4678  return false;
4679  }
4680  break;
4681  }
4682  case GET_VIEWS: {
4683  if (!td->isView) {
4684  return false;
4685  }
4686  break;
4687  }
      // any other requested type applies no view/table filtering
4688  default:
4689  break;
4690  }
4692  dbObject.loadKey(*this);
4693  std::vector<DBObject> privObjects = {dbObject};
4694  if (!SysCatalog::instance().hasAnyPrivileges(user_metadata, privObjects)) {
4695  // skip table, as there are no privileges to access it
4696  return false;
4697  }
4698  return true;
4699 }
4700 
4701 std::vector<std::string> Catalog::getTableNamesForUser(
4702  const UserMetadata& user_metadata,
4703  const GetTablesType get_tables_type) const {
4704  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4705  cat_read_lock read_lock(this);
4706  std::vector<std::string> table_names;
4707  const auto tables = getAllTableMetadata();
4708  for (const auto td : tables) {
4709  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4710  table_names.push_back(td->tableName);
4711  }
4712  }
4713  return table_names;
4714 }
4715 
4716 std::vector<TableMetadata> Catalog::getTablesMetadataForUser(
4717  const UserMetadata& user_metadata,
4718  const GetTablesType get_tables_type,
4719  const std::string& filter_table_name) const {
4720  sys_read_lock syscat_read_lock(&SysCatalog::instance());
4721  cat_read_lock read_lock(this);
4722 
4723  std::vector<TableMetadata> tables_metadata;
4724  const auto tables = getAllTableMetadata();
4725  for (const auto td : tables) {
4726  if (filterTableByTypeAndUser(td, user_metadata, get_tables_type)) {
4727  if (!filter_table_name.empty()) {
4728  if (td->tableName != filter_table_name) {
4729  continue;
4730  }
4731  }
4732  TableMetadata table_metadata(td); // Makes a copy, not safe to access raw table
4733  // descriptor outside catalog lock
4734  tables_metadata.emplace_back(table_metadata);
4735  }
4736  }
4737  return tables_metadata;
4738 }
4739 
4740 int Catalog::getLogicalTableId(const int physicalTableId) const {
4741  cat_read_lock read_lock(this);
4742  for (const auto& l : logicalToPhysicalTableMapById_) {
4743  if (l.second.end() != std::find_if(l.second.begin(),
4744  l.second.end(),
4745  [&](decltype(*l.second.begin()) tid) -> bool {
4746  return physicalTableId == tid;
4747  })) {
4748  return l.first;
4749  }
4750  }
4751  return physicalTableId;
4752 }
4753 
4754 void Catalog::checkpoint(const int logicalTableId) const {
4755  const auto td = getMetadataForTable(logicalTableId);
4756  const auto shards = getPhysicalTablesDescriptors(td);
4757  for (const auto shard : shards) {
4758  getDataMgr().checkpoint(getCurrentDB().dbId, shard->tableId);
4759  }
4760 }
4761 
4762 void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
4763  auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
4764  try {
4765  checkpoint(logical_table_id);
4766  } catch (...) {
4767  setTableEpochsLogExceptions(getDatabaseId(), table_epochs);
4768  throw;
4769  }
4770 }
4771 
4772 void Catalog::resetTableEpochFloor(const int logicalTableId) const {
4773  cat_read_lock read_lock(this);
4774  const auto td = getMetadataForTable(logicalTableId, false);
4775  const auto shards = getPhysicalTablesDescriptors(td, false);
4776  for (const auto shard : shards) {
4777  getDataMgr().resetTableEpochFloor(getCurrentDB().dbId, shard->tableId);
4778  }
4779 }
4780 
4781 void Catalog::eraseDbMetadata() {
4782  const auto tables = getAllTableMetadata();
4783  for (const auto table : tables) {
4784  eraseTableMetadata(table);
4785  }
4786  // Physically erase database metadata
4787  boost::filesystem::remove(basePath_ + "/" + shared::kCatalogDirectoryName + "/" +
4788  currentDB_.dbName);
4789  calciteMgr_->updateMetadata(currentDB_.dbName, "");
4790 }
4791 
4792 void Catalog::eraseDbPhysicalData() {
4793  const auto tables = getAllTableMetadata();
4794  for (const auto table : tables) {
4795  eraseTablePhysicalData(table);
4796  }
4797 }
4798 
4799 void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
4800  const int tableId = td->tableId;
4801  // must destroy fragmenter before deleteChunks is called.
4802  removeFragmenterForTable(tableId);
4803 
4804  ChunkKey chunkKeyPrefix = {currentDB_.dbId, tableId};
4805  {
4806  INJECT_TIMER(deleteChunksWithPrefix);
4807  // assuming deleteChunksWithPrefix is atomic
4808  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::CPU_LEVEL);
4809  dataMgr_->deleteChunksWithPrefix(chunkKeyPrefix, MemoryLevel::GPU_LEVEL);
4810  }
4811  if (!td->isView) {
4812  INJECT_TIMER(Remove_Table);
4813  dataMgr_->removeTableRelatedDS(currentDB_.dbId, tableId);
4814  }
4815 }
4816 
4817 std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
4818  const int32_t& shardNumber) {
4819  std::string physicalTableName =
4820  logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
4821  return (physicalTableName);
4822 }
4823 
// Loads every row of omnisci_foreign_servers from SQLite and registers the resulting
// ForeignServer object in both foreignServerMap_ (keyed by name) and
// foreignServerMapById_ (keyed by id). The two maps share the same object.
4824 void Catalog::buildForeignServerMapUnlocked() {
4826  sqliteConnector_.query(
4827  "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
4828  "omnisci_foreign_servers");
4829  auto num_rows = sqliteConnector_.getNumRows();
4830 
4831  for (size_t row = 0; row < num_rows; row++) {
      // constructor arguments follow the SELECT column order above
4832  auto foreign_server = std::make_shared<foreign_storage::ForeignServer>(
4833  sqliteConnector_.getData<int>(row, 0),
4834  sqliteConnector_.getData<std::string>(row, 1),
4835  sqliteConnector_.getData<std::string>(row, 2),
4836  sqliteConnector_.getData<std::string>(row, 3),
4837  sqliteConnector_.getData<std::int32_t>(row, 4),
4838  sqliteConnector_.getData<std::int32_t>(row, 5));
4839  foreignServerMap_[foreign_server->name] = foreign_server;
4840  foreignServerMapById_[foreign_server->id] = foreign_server;
4841  }
4842 }
4843 
// Reads every row of omnisci_foreign_tables and completes the matching in-memory
// ForeignTable descriptor: its foreign server pointer, options map and last/next
// refresh times. CHECK-fails if the table id is not in tableDescriptorMapById_, if the
// descriptor is not a ForeignTable, or if no server is found for the row's server id.
4844 void Catalog::updateForeignTablesInMapUnlocked() {
4846  sqliteConnector_.query(
4847  "SELECT table_id, server_id, options, last_refresh_time, next_refresh_time from "
4848  "omnisci_foreign_tables");
4849  auto num_rows = sqliteConnector_.getNumRows();
4850  for (size_t r = 0; r < num_rows; r++) {
4851  const auto table_id = sqliteConnector_.getData<int32_t>(r, 0);
4852  const auto server_id = sqliteConnector_.getData<int32_t>(r, 1);
4853  const auto& options = sqliteConnector_.getData<std::string>(r, 2);
4854  const auto last_refresh_time = sqliteConnector_.getData<int64_t>(r, 3);
4855  const auto next_refresh_time = sqliteConnector_.getData<int64_t>(r, 4);
4856 
4857  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
4858  auto foreign_table =
4859  dynamic_cast<foreign_storage::ForeignTable*>(tableDescriptorMapById_[table_id]);
4860  CHECK(foreign_table);
      // operator[] default-inserts an empty entry for an unknown server id; the CHECK
      // on the next line then fails
4861  foreign_table->foreign_server = foreignServerMapById_[server_id].get();
4862  CHECK(foreign_table->foreign_server);
4863  foreign_table->populateOptionsMap(options);
4864  foreign_table->last_refresh_time = last_refresh_time;
4865  foreign_table->next_refresh_time = next_refresh_time;
4866  if (foreign_table->is_system_table) {
4867  foreign_table->is_in_memory_system_table =
4869  foreign_table->foreign_server->data_wrapper_type);
4870  }
4871  }
4872 }
4873 
// Updates a single column ('property') of the omnisci_foreign_servers row whose name is
// 'server_name' to 'value'. Throws std::runtime_error when no such server row exists.
// NOTE(review): 'property' is concatenated into the SQL text rather than bound as a
// parameter, so it must only ever be a trusted column name -- confirm at the call sites.
4874 void Catalog::setForeignServerProperty(const std::string& server_name,
4875  const std::string& property,
4876  const std::string& value) {
4878  sqliteConnector_.query_with_text_params(
4879  "SELECT id from omnisci_foreign_servers where name = ?",
4880  std::vector<std::string>{server_name});
4881  auto num_rows = sqliteConnector_.getNumRows();
4882  if (num_rows > 0) {
      // more than one row for a server name is treated as a fatal inconsistency
4883  CHECK_EQ(size_t(1), num_rows);
4884  auto server_id = sqliteConnector_.getData<int32_t>(0, 0);
4885  sqliteConnector_.query_with_text_params(
4886  "UPDATE omnisci_foreign_servers SET " + property + " = ? WHERE id = ?",
4887  std::vector<std::string>{value, std::to_string(server_id)});
4888  } else {
4889  throw std::runtime_error{"Can not change property \"" + property +
4890  "\" for foreign server." + " Foreign server \"" +
4891  server_name + "\" is not found."};
4892  }
4893 }
4894 
// Creates the built-in local foreign servers: "default_local_delimited",
// "default_local_regex_parsed" and, when ENABLE_IMPORT_PARQUET is defined,
// "default_local_parquet". Each server is validated and then handed to
// createForeignServerNoLocks.
// NOTE(review): the 'true' argument is presumably an "if not exists" flag -- confirm
// against the createForeignServerNoLocks declaration.
4895 void Catalog::createDefaultServersIfNotExists() {
4900 
4901  auto local_csv_server = std::make_unique<foreign_storage::ForeignServer>(
4902  "default_local_delimited",
4904  options,
4906  local_csv_server->validate();
4907  createForeignServerNoLocks(std::move(local_csv_server), true);
4908 
4909 #ifdef ENABLE_IMPORT_PARQUET
4910  auto local_parquet_server = std::make_unique<foreign_storage::ForeignServer>(
4911  "default_local_parquet",
4913  options,
4915  local_parquet_server->validate();
4916  createForeignServerNoLocks(std::move(local_parquet_server), true);
4917 #endif
4918 
4919  auto local_regex_parser_server = std::make_unique<foreign_storage::ForeignServer>(
4920  "default_local_regex_parsed",
4922  options,
4924  local_regex_parser_server->validate();
4925  createForeignServerNoLocks(std::move(local_regex_parser_server), true);
4926 }
4927 
4928 // prepare a fresh file reload on next table access
4929 void Catalog::setForReload(const int32_t tableId) {
4930  const auto td = getMetadataForTable(tableId);
4931  for (const auto shard : getPhysicalTablesDescriptors(td)) {
4932  const auto tableEpoch = getTableEpoch(currentDB_.dbId, shard->tableId);
4933  setTableEpoch(currentDB_.dbId, shard->tableId, tableEpoch);
4934  }
4935 }
4936 
4937 // get a table's data dirs
4938 std::vector<std::string> Catalog::getTableDataDirectories(
4939  const TableDescriptor* td) const {
4940  const auto global_file_mgr = getDataMgr().getGlobalFileMgr();
4941  std::vector<std::string> file_paths;
4942  for (auto shard : getPhysicalTablesDescriptors(td)) {
4943  const auto file_mgr = dynamic_cast<File_Namespace::FileMgr*>(
4944  global_file_mgr->getFileMgr(currentDB_.dbId, shard->tableId));
4945  boost::filesystem::path file_path(file_mgr->getFileMgrBasePath());
4946  file_paths.push_back(file_path.filename().string());
4947  }
4948  return file_paths;
4949 }
4950 
4951 // get a column's dict dir basename
// Returns the dictionary folder of column 'cd': only the last path component when
// 'file_name_only' is true, otherwise the whole stored dictFolderPath. Returns an empty
// string when the condition below does not hold (the column is neither a string nor a
// string array, or its comp_param is not positive). CHECK-fails if the dictionary
// descriptor is missing from dictDescriptorMapByRef_.
4952 std::string Catalog::getColumnDictDirectory(const ColumnDescriptor* cd,
4953  bool file_name_only) const {
4954  if ((cd->columnType.is_string() || cd->columnType.is_string_array()) &&
4956  cd->columnType.get_comp_param() > 0) {
4957  const auto dictId = cd->columnType.get_comp_param();
4958  const DictRef dictRef(currentDB_.dbId, dictId);
4959  const auto dit = dictDescriptorMapByRef_.find(dictRef);
4960  CHECK(dit != dictDescriptorMapByRef_.end());
4961  CHECK(dit->second);
4962  if (file_name_only) {
4963  boost::filesystem::path file_path(dit->second->dictFolderPath);
4964  return file_path.filename().string();
4965  } else {
4966  return dit->second->dictFolderPath;
4967  }
4968  }
4969  return std::string();
4970 }
4971 
4972 // get a table's dict dirs
4973 std::vector<std::string> Catalog::getTableDictDirectories(
4974  const TableDescriptor* td) const {
4975  std::vector<std::string> file_paths;
4976  for (auto cd : getAllColumnMetadataForTable(td->tableId, false, false, true)) {
4977  auto file_base = getColumnDictDirectory(cd);
4978  if (!file_base.empty() &&
4979  file_paths.end() == std::find(file_paths.begin(), file_paths.end(), file_base)) {
4980  file_paths.push_back(file_base);
4981  }
4982  }
4983  return file_paths;
4984 }
4985 
4986 std::set<std::string> Catalog::getTableDictDirectoryPaths(int32_t table_id) const {
4987  cat_read_lock read_lock(this);
4988  std::set<std::string> directory_paths;
4989  auto it = dict_columns_by_table_id_.find(table_id);
4990  if (it != dict_columns_by_table_id_.end()) {
4991  for (auto cd : it->second) {
4992  auto directory_path = getColumnDictDirectory(cd, false);
4993  if (!directory_path.empty()) {
4994  directory_paths.emplace(directory_path);
4995  }
4996  }
4997  }
4998  return directory_paths;
4999 }
5000 
5001 // returns table schema in a string
5002 // NOTE(sy): Might be able to replace dumpSchema() later with
5003 // dumpCreateTable() after a deeper review of the TableArchiver code.
// The result is a "CREATE TABLE @T (...) WITH (...);" statement in which "@T" stands in
// for the table name. System and virtual columns are left out. CHECK-fails when called
// on a system table.
5004 std::string Catalog::dumpSchema(const TableDescriptor* td) const {
5005  CHECK(!td->is_system_table);
5006  cat_read_lock read_lock(this);
5007 
5008  std::ostringstream os;
5009  os << "CREATE TABLE @T (";
5010  // gather column defines
5011  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
      // 'comma' is empty for the first emitted column and ", " afterwards
5012  std::string comma;
5013  std::vector<std::string> shared_dicts;
      // dictionary name -> first column seen that uses that dictionary
5014  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
5015  for (const auto cd : cds) {
5016  if (!(cd->isSystemCol || cd->isVirtualCol)) {
5017  const auto& ti = cd->columnType;
5018  os << comma << cd->columnName;
5019  // CHAR is peculiar... better dump it as TEXT(32) like \d does
5020  if (ti.get_type() == SQLTypes::kCHAR) {
5021  os << " "
5022  << "TEXT";
5023  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
5024  os << " "
5025  << "TEXT[]";
5026  } else {
5027  os << " " << ti.get_type_name();
5028  }
5029  os << (ti.get_notnull() ? " NOT NULL" : "");
5030  if (cd->default_value.has_value()) {
5031  os << " DEFAULT " << cd->getDefaultValueLiteral();
5032  }
5033  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
5034  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5035  if (ti.get_compression() == kENCODING_DICT) {
5036  // if foreign reference, get referenced tab.col
5037  const auto dict_id = ti.get_comp_param();
5038  const DictRef dict_ref(currentDB_.dbId, dict_id);
5039  const auto dict_it = dictDescriptorMapByRef_.find(dict_ref);
5040  CHECK(dict_it != dictDescriptorMapByRef_.end());
5041  const auto dict_name = dict_it->second->dictName;
5042  // when migrating a table, any foreign dict ref will be dropped
5043  // and the first cd of a dict will become root of the dict
5044  if (dict_root_cds.end() == dict_root_cds.find(dict_name)) {
5045  dict_root_cds[dict_name] = cd;
      // the encoding width is printed in bits (size * 8)
5046  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
5047  } else {
5048  const auto dict_root_cd = dict_root_cds[dict_name];
5049  shared_dicts.push_back("SHARED DICTIONARY (" + cd->columnName +
5050  ") REFERENCES @T(" + dict_root_cd->columnName + ")");
5051  // "... shouldn't specify an encoding, it borrows from the referenced
5052  // column"
5053  }
5054  } else {
5055  os << " ENCODING NONE";
5056  }
5057  } else if (ti.is_date_in_days() ||
5058  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
      // a comp_param of 0 is printed as 32
5059  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5060  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
5061  } else if (ti.is_geometry()) {
5062  if (ti.get_compression() == kENCODING_GEOINT) {
5063  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
5064  << ")";
5065  } else {
5066  os << " ENCODING NONE";
5067  }
5068  }
5069  comma = ", ";
5070  }
5071  }
5072  // gather SHARED DICTIONARYs
5073  if (shared_dicts.size()) {
5074  os << ", " << boost::algorithm::join(shared_dicts, ", ");
5075  }
5076  // gather WITH options ...
5077  std::vector<std::string> with_options;
5078  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
5079  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
5080  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
5081  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
5082  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
5083  : "VACUUM='IMMEDIATE'");
5084  if (!td->partitions.empty()) {
5085  with_options.push_back("PARTITIONS='" + td->partitions + "'");
5086  }
5087  if (td->nShards > 0) {
5088  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
5089  CHECK(shard_cd);
      // SHARD KEY goes inside the column list; SHARD_COUNT is nShards multiplied by
      // max(g_leaf_count, 1)
5090  os << ", SHARD KEY(" << shard_cd->columnName << ")";
5091  with_options.push_back(
5092  "SHARD_COUNT=" +
5093  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
5094  }
5095  if (td->sortedColumnId > 0) {
5096  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
5097  CHECK(sort_cd);
5098  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
5099  }
5101  td->maxRollbackEpochs != -1) {
5102  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
5104  }
5105  os << ") WITH (" + boost::algorithm::join(with_options, ", ") + ");";
5106  return os.str();
5107 }
5108 
5109 #include "Parser/ReservedKeywords.h"
5110 
// Returns true if 'str' contains at least one character for which std::isspace is
// true. An empty string yields false.
inline bool contains_spaces(std::string_view str) {
  for (const unsigned char ch : str) {
    if (std::isspace(ch)) {
      return true;
    }
  }
  return false;
}
5117 
// Parameter list and body of a predicate whose signature line is not part of this
// listing: the result is true when 'str' contains at least one character from 'chars',
// which defaults to a set of punctuation characters.
5120  std::string_view str,
5121  std::string_view chars = "`~!@#$%^&*()-=+[{]}\\|;:'\",<.>/?") {
5122  return str.find_first_of(chars) != std::string_view::npos;
5123 }
5124 
5126 inline bool is_reserved_sql_keyword(std::string_view str) {
5127  return reserved_keywords.find(to_upper(std::string(str))) != reserved_keywords.end();
5128 }
5129 
5130 // returns a "CREATE TABLE" statement in a string for "SHOW CREATE TABLE"
5131 std::string Catalog::dumpCreateTable(const TableDescriptor* td,
5132  bool multiline_formatting,
5133  bool dump_defaults) const {
5134  cat_read_lock read_lock(this);
5135  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5136 }
5137 
5138 std::optional<std::string> Catalog::dumpCreateTable(int32_t table_id,
5139  bool multiline_formatting,
5140  bool dump_defaults) const {
5141  cat_read_lock read_lock(this);
5142  const auto td = getMutableMetadataForTableUnlocked(table_id);
5143  if (!td) {
5144  return {};
5145  }
5146  return dumpCreateTableUnlocked(td, multiline_formatting, dump_defaults);
5147 }
5148 
// Builds the CREATE statement text for the table described by `td`.
//
// Three shapes are produced:
//   * "CREATE FOREIGN TABLE ..." when `td` is a foreign_storage::ForeignTable that is
//     not flagged as a system table,
//   * "CREATE [TEMPORARY ]TABLE ..." for any other non-view descriptor,
//   * "CREATE VIEW <name> AS <viewSQL>" for views, which is returned immediately,
//     without a column list, WITH clause, or trailing ';'.
//
// Parameters:
//   td                   - descriptor of the table or view to dump.
//   multiline_formatting - when true, columns and clauses are separated by newlines
//                          instead of single spaces.
//   dump_defaults        - when true, WITH options are emitted even if they equal their
//                          DEFAULT_* values; when false only non-default values appear.
//
// This function does not take a catalog lock itself; both dumpCreateTable() overloads
// acquire a cat_read_lock before calling it.
//
// NOTE(review): listing lines 5159 and 5276 are absent from this copy (see the notes
// at each gap), so the block as shown is not syntactically complete.
5149 std::string Catalog::dumpCreateTableUnlocked(const TableDescriptor* td,
5150  bool multiline_formatting,
5151  bool dump_defaults) const {
// Null unless `td` is actually a ForeignTable; used below to select the
// foreign-table-specific syntax and to suppress options that do not apply to it.
5152  auto foreign_table = dynamic_cast<const foreign_storage::ForeignTable*>(td);
5153  std::ostringstream os;
5154 
5155  if (foreign_table && !td->is_system_table) {
5156  os << "CREATE FOREIGN TABLE " << td->tableName << " (";
5157  } else if (!td->isView) {
5158  os << "CREATE ";
// NOTE(review): listing line 5159 is missing here. It presumably opened the `if` that
// the `}` two lines below closes and decides whether "TEMPORARY " is emitted --
// confirm the exact condition against the original file.
5160  os << "TEMPORARY ";
5161  }
5162  os << "TABLE " + td->tableName + " (";
5163  } else {
// Views carry their defining SQL; nothing else is appended for them.
5164  os << "CREATE VIEW " + td->tableName + " AS " << td->viewSQL;
5165  return os.str();
5166  }
5167  // scan column defines
// additional_info receives extra clauses to append after the column list, and
// shared_dict_column_names receives the columns whose ENCODING must be omitted below.
5168  std::vector<std::string> additional_info;
5169  std::set<std::string> shared_dict_column_names;
5170 
5171  gatherAdditionalInfo(additional_info, shared_dict_column_names, td);
5172 
5173  // gather column defines
5174  const auto cds = getAllColumnMetadataForTable(td->tableId, false, false, false);
// NOTE(review): dict_root_cds is not referenced again within this function.
5175  std::map<const std::string, const ColumnDescriptor*> dict_root_cds;
// `first` suppresses the separator in front of the first emitted column.
5176  bool first = true;
5177  for (const auto cd : cds) {
// System and virtual columns are never part of the dumped definition.
5178  if (!(cd->isSystemCol || cd->isVirtualCol)) {
5179  const auto& ti = cd->columnType;
5180  if (!first) {
5181  os << ",";
5182  if (!multiline_formatting) {
5183  os << " ";
5184  }
5185  } else {
5186  first = false;
5187  }
5188  if (multiline_formatting) {
5189  os << "\n ";
5190  }
5191  // column name
5192  os << quoteIfRequired(cd->columnName);
5193  // CHAR is peculiar... dump it as TEXT (and arrays of CHAR as TEXT[]) like \d does
5194  if (ti.get_type() == SQLTypes::kCHAR) {
5195  os << " "
5196  << "TEXT";
5197  } else if (ti.get_subtype() == SQLTypes::kCHAR) {
5198  os << " "
5199  << "TEXT[]";
5200  } else {
5201  os << " " << ti.get_type_name();
5202  }
5203  os << (ti.get_notnull() ? " NOT NULL" : "");
5204  if (cd->default_value.has_value()) {
5205  os << " DEFAULT " << cd->getDefaultValueLiteral();
5206  }
// Only columns that were not reported in shared_dict_column_names get an explicit
// ENCODING clause.
5207  if (shared_dict_column_names.find(cd->columnName) ==
5208  shared_dict_column_names.end()) {
5209  // avoids "Column ... shouldn't specify an encoding, it borrows it
5210  // from the referenced column"
5211  if (ti.is_string() || (ti.is_array() && ti.get_subtype() == kTEXT)) {
// The dictionary width is printed in bits: the byte size times 8.
5212  auto size = ti.is_array() ? ti.get_logical_size() : ti.get_size();
5213  if (ti.get_compression() == kENCODING_DICT) {
5214  os << " ENCODING " << ti.get_compression_name() << "(" << (size * 8) << ")";
5215  } else {
5216  os << " ENCODING NONE";
5217  }
5218  } else if (ti.is_date_in_days() ||
5219  (ti.get_size() > 0 && ti.get_size() != ti.get_logical_size())) {
// A comp_param of 0 is printed as 32.
5220  const auto comp_param = ti.get_comp_param() ? ti.get_comp_param() : 32;
5221  os << " ENCODING " << ti.get_compression_name() << "(" << comp_param << ")";
5222  } else if (ti.is_geometry()) {
5223  if (ti.get_compression() == kENCODING_GEOINT) {
5224  os << " ENCODING " << ti.get_compression_name() << "(" << ti.get_comp_param()
5225  << ")";
5226  } else {
5227  os << " ENCODING NONE";
5228  }
5229  }
5230  }
5231  }
5232  }
5233  // append the extra clauses collected by gatherAdditionalInfo (e.g. SHARED DICTIONARYs)
5234  if (additional_info.size()) {
5235  std::string comma;
5236  if (!multiline_formatting) {
5237  comma = ", ";
5238  } else {
5239  comma = ",\n ";
5240  }
5241  os << comma;
5242  os << boost::algorithm::join(additional_info, comma);
5243  }
5244  os << ")";
5245 
5246  std::vector<std::string> with_options;
5247  if (foreign_table && !td->is_system_table) {
5248  if (multiline_formatting) {
5249  os << "\n";
5250  } else {
5251  os << " ";
5252  }
5253  os << "SERVER " << foreign_table->foreign_server->name;
5254 
5255  // gather WITH options ...
// Every foreign-table option is emitted as option='value'.
5256  for (const auto& [option, value] : foreign_table->options) {
5257  with_options.emplace_back(option + "='" + value + "'");
5258  }
5259  }
5260 
// The options below are emitted when dump_defaults is set or the stored value differs
// from its DEFAULT_* constant. PAGE_SIZE, MAX_ROWS, VACUUM, PARTITIONS, SHARD_COUNT and
// SORT_COLUMN are additionally skipped whenever `foreign_table` is non-null.
5261  if (dump_defaults || td->maxFragRows != DEFAULT_FRAGMENT_ROWS) {
5262  with_options.push_back("FRAGMENT_SIZE=" + std::to_string(td->maxFragRows));
5263  }
5264  if (dump_defaults || td->maxChunkSize != DEFAULT_MAX_CHUNK_SIZE) {
5265  with_options.push_back("MAX_CHUNK_SIZE=" + std::to_string(td->maxChunkSize));
5266  }
5267  if (!foreign_table && (dump_defaults || td->fragPageSize != DEFAULT_PAGE_SIZE)) {
5268  with_options.push_back("PAGE_SIZE=" + std::to_string(td->fragPageSize));
5269  }
5270  if (!foreign_table && (dump_defaults || td->maxRows != DEFAULT_MAX_ROWS)) {
5271  with_options.push_back("MAX_ROWS=" + std::to_string(td->maxRows));
5272  }
// MAX_ROLLBACK_EPOCHS is never emitted when the stored value is -1.
5273  if ((dump_defaults || td->maxRollbackEpochs != DEFAULT_MAX_ROLLBACK_EPOCHS) &&
5274  td->maxRollbackEpochs != -1) {
5275  with_options.push_back("MAX_ROLLBACK_EPOCHS=" +
// NOTE(review): listing line 5276 is missing here. It presumably supplied the value
// appended to "MAX_ROLLBACK_EPOCHS=" and closed the push_back call -- confirm against
// the original file.
5277  }
// With dump_defaults false this only ever emits VACUUM='IMMEDIATE', because the branch
// is entered only when hasDeletedCol is false.
5278  if (!foreign_table && (dump_defaults || !td->hasDeletedCol)) {
5279  with_options.emplace_back(td->hasDeletedCol ? "VACUUM='DELAYED'"
5280  : "VACUUM='IMMEDIATE'");
5281  }
5282  if (!foreign_table && !td->partitions.empty()) {
5283  with_options.push_back("PARTITIONS='" + td->partitions + "'");
5284  }
5285  if (!foreign_table && td->nShards > 0) {
// shard_cd is only validated here; its name is not written by this block.
5286  const auto shard_cd = getMetadataForColumn(td->tableId, td->shardedColumnId);
5287  CHECK(shard_cd);
// The reported count is nShards multiplied by g_leaf_count, treating a leaf count of 0
// as 1.
5288  with_options.push_back(
5289  "SHARD_COUNT=" +
5290  std::to_string(td->nShards * std::max(g_leaf_count, static_cast<size_t>(1))));
5291  }
5292  if (!foreign_table && td->sortedColumnId > 0) {
5293  const auto sort_cd = getMetadataForColumn(td->tableId, td->sortedColumnId);
5294  CHECK(sort_cd);
5295  with_options.push_back("SORT_COLUMN='" + sort_cd->columnName + "'");
5296  }
5297 
// The WITH clause is omitted entirely when no option qualified.
5298  if (!with_options.empty()) {
5299  if (!multiline_formatting) {
5300  os << " ";
5301  } else {
5302  os << "\n";
5303  }
5304  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
5305  }
5306  os << ";";
5307  return os.str();
5308 }
5309 
5310 std::string Catalog::dumpCreateServer(const std::string& name,
5311  bool multiline_formatting) const {
5312  cat_read_lock read_lock(this);
5313  auto server_it = foreignServerMap_.find(name);
5314  if (server_it == foreignServerMap_.end()) {
5315  throw std::runtime_error("Foreign server " + name + " does not exist.");
5316  }
5317  auto server = server_it->second.get();
5318  std::ostringstream os;
5319  os << "CREATE SERVER " << name << " FOREIGN DATA WRAPPER " << server->data_wrapper_type;
5320  std::vector<std::string> with_options;
5321  for (const auto& [option, value] : server->options) {
5322  with_options.emplace_back(option + "='" + value + "'");
5323  }
5324  if (!with_options.empty()) {
5325  if (!multiline_formatting) {
5326  os << " ";
5327  } else {
5328  os << "\n";
5329  }
5330  os << "WITH (" + boost::algorithm::join(with_options, ", ") + ")";
5331  }
5332  os << ";";
5333  return os.str();
5334 }
5335 
5336 bool Catalog::validateNonExistentTableOrView(const std::string& name,
5337  const bool if_not_exists) {
5338  if (getMetadataForTable(name, false)) {
5339  if (if_not_exists) {
5340  return false;
5341  }
5342  throw std::runtime_error("Table or View with name \"" + name + "\" already exists.");
5343  }
5344  return true;
5345 }
5346 
// Collects the foreign tables whose next_refresh_time is not later than `current_time`
// and whose refresh-timing option matches the value tested below.
//
// Walks tableDescriptorMapById_ while holding a cat_read_lock; entries whose
// storageType is not StorageType::FOREIGN_TABLE are skipped. Returns the matching
// descriptors (possibly an empty vector).
//
// NOTE(review): listing lines 5356, 5358 and 5360 are absent from this copy (see the
// notes at each gap), so the option key, the definition of `current_time`, and the
// value the timing type is compared against cannot be read from here.
5347 std::vector<const TableDescriptor*> Catalog::getAllForeignTablesForRefresh() const {
5348  cat_read_lock read_lock(this);
5349  std::vector<const TableDescriptor*> tables;
5350  for (auto entry : tableDescriptorMapById_) {
5351  auto table_descriptor = entry.second;
5352  if (table_descriptor->storageType == StorageType::FOREIGN_TABLE) {
// A descriptor tagged FOREIGN_TABLE is required to really be a ForeignTable.
5353  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
5354  CHECK(foreign_table);
5355  auto timing_type_entry = foreign_table->options.find(
// NOTE(review): listing line 5356 is missing here; it presumably named the option key
// passed to find() and closed the call -- confirm against the original file.
// The option looked up above must be present on every foreign table.
5357  CHECK(timing_type_entry != foreign_table->options.end());
// NOTE(review): listing line 5358 is missing here; it presumably defined
// `current_time`, which is used in the condition below -- confirm.
5359  if (timing_type_entry->second ==
// NOTE(review): listing line 5360 is missing here; it presumably held the value
// compared with the timing type and the operator joining it to the time check --
// confirm.
5361  foreign_table->next_refresh_time <= current_time) {
5362  tables.emplace_back(foreign_table);
5363  }
5364  }
5365  }
5366  return tables;
5367 }
5368 
5369 void Catalog::updateForeignTableRefreshTimes(const int32_t table_id) {
5370  cat_write_lock write_lock(this);
5372  CHECK(tableDescriptorMapById_.find(table_id) != tableDescriptorMapById_.end());
5373  auto table_descriptor = tableDescriptorMapById_.find(table_id)->second;
5374  CHECK(table_descriptor);
5375  auto foreign_table = dynamic_cast<foreign_storage::ForeignTable*>(table_descriptor);
5376  CHECK(foreign_table);
5377  auto last_refresh_time = foreign_storage::RefreshTimeCalculator::getCurrentTime();
5378  auto next_refresh_time = get_next_refresh_time(*foreign_table);
5379  sqliteConnector_.query_with_text_params(
5380  "UPDATE omnisci_foreign_tables SET last_refresh_time = ?, next_refresh_time = ? "
5381  "WHERE table_id = ?",
5382  std::vector<std::string>{std::to_string(last_refresh_time),
5383